# Example Code for Existing Functions

In [1]:
import crantpy as cp
# set logging level to INFO
cp.set_logging_level("INFO")

## Cave Client

#### Authentication

In [2]:
# Generate a token for accessing the CAVE API
cp.generate_cave_token()

New Tokens need to be acquired by hand. Please follow the following steps:
                1) Go to: https://proofreading.zetta.ai/auth/api/v1/create_token to create a new token.
                2) Log in with your Google credentials and copy the token shown afterward.
                3a) Save it to your computer with: client.auth.save_token(token="PASTE_YOUR_TOKEN_HERE")
                or
                3b) Set it for the current session only with client.auth.token = "PASTE_YOUR_TOKEN_HERE"
                Note: If you need to save or load multiple tokens, please read the documentation for details.




In [3]:
# Set the token
cp.set_cave_token("db865e00b18c304342661b919197d0b0")

In [None]:
# combine both processes using save argument
cp.generate_cave_token(save=True)

In [7]:
cp.get_current_cave_token()

'db865e00b18c304342661b919197d0b0'

#### Getting Cave Client

In [12]:
client = cp.get_cave_client()

2025-06-20 08:55:47 - INFO - Cached cave_clients is stale.
2025-06-20 08:55:47 - INFO - Fetching new cave_clients...


In [11]:
cp.get_cave_client() # cached client is returned if it exists and is not stale

2025-06-20 08:54:11 - INFO - Cached cave_clients is stale.
2025-06-20 08:54:11 - INFO - Fetching new cave_clients...


CAVEclient<datastack_name=kronauer_ant, server_address=https://proofreading.zetta.ai>

In [15]:
cp.validate_cave_client(client)

False

In [14]:
cp.get_cave_client(clear_cache=True) # a new client is created and cached

2025-06-20 08:55:52 - INFO - Fetching new cave_clients...


CAVEclient<datastack_name=kronauer_ant, server_address=https://proofreading.zetta.ai>

In [None]:
# you can force clearing the cache
cp.clear_cave_client_cache()

In [None]:
client = cp.get_cave_client(dataset="latest") # you can specify a dataset to use
client_sandbox = cp.get_cave_client(dataset="sandbox") # you can specify a dataset to use

In [None]:
client = cp.get_cave_client(dataset="latest") # you can specify a dataset to use

In [None]:
client = cp.get_cave_client(dataset="sandbox") # you can specify a dataset to use

In [None]:
# get cave datastacks
datastacks = cp.get_cave_datastacks()
print(datastacks)


In [None]:
# get datastack segmentation source
print(cp.get_datastack_segmentation_source(datastack="kronauer_ant"))

In [None]:
# or get it via dataset
print(cp.get_dataset_segmentation_source(dataset="sandbox"))

In [16]:
cp.get_cloudvolume(dataset="latest")

2025-06-20 09:11:03 - INFO - Fetching new cloudvolumes...
2025-06-20 09:11:04 - INFO - Cached cave_clients is stale.
2025-06-20 09:11:04 - INFO - Fetching new cave_clients...


<cloudvolume.frontends.graphene.CloudVolumeGraphene at 0x13edea850>

In [18]:
cp.supervoxels_to_roots(74100212167388307)

AttributeError: No crantpy attribute supervoxels_to_roots

In [None]:
cp.roots_to_supervoxels(576460752688452655)

In [17]:
from crantpy.queries import NeuronCriteria, get_annotations
from crantpy.utils import parse_root_ids

ImportError: cannot import name 'parse_root_ids' from 'crantpy.utils' (/Users/giacomo.glotzer/Desktop/Rockefeller/Kronauer/CRANTb/crantpy/src/crantpy/utils/__init__.py)

In [None]:
%timeit cp.is_latest_roots(cp.get_proofread_neurons(), use_http_session=True)

In [None]:
%timeit cp.is_latest_roots(cp.get_proofread_neurons(), use_http_session=False)

In [None]:
# make sure the results are the same
import numpy as np
assert np.array_equal(
    cp.is_latest_roots(cp.get_proofread_neurons(), use_http_session=True),
    cp.is_latest_roots(cp.get_proofread_neurons(), use_http_session=False)
), "Results are not the same!"

In [27]:
import crantpy as cp
cp.get_proofread_neurons()

AttributeError: No crantpy attribute get_proofread_neurons

In [28]:
import crantpy as cp
df = cp.get_all_seatable_annotations()

KeyError: 'proofread_only'

In [None]:
cp.update_ids(df)

In [None]:
cp.is_proofread(cp.NeuronCriteria(tract='mALT'))

In [None]:
all_roots = NeuronCriteria().get_roots()

In [None]:
parse_root_ids(NeuronCriteria(tract='mALT'))

In [None]:
get_annotations(NeuronCriteria(tract='mALT'),proofread_only=False,clear_cache=True)

In [None]:
from crantpy.utils.cave import get_cave_client
import pcg_skel
import numpy as np
import pandas as pd
import cloudvolume as cv
import navis

In [None]:
client = get_cave_client()

# Choose a neuron to work with, best way: read seatable/CAVE table
root_id = 576460752653449509

# Get skeleton!
skel = pcg_skel.pcg_skeleton(root_id=root_id, client=client)

##############################################
### Optionally, get skeleton in SWC format ###
##############################################

# Extract vertices and edges
vertices = skel.vertices
edges = skel.edges

# Create a dictionary to store node information
node_info = {}

# Initialize all nodes as endpoints (type 6)
for i, coord in enumerate(vertices):
    node_info[i] = {
        'PointNo': i + 1,  # SWC index (1-based)
        'type': 6,   # Default to endpoint
        'X': coord[0],
        'Y': coord[1],
        'Z': coord[2],
        'W': 1.0,  # Default W (radius), adjust if you have actual data
        'Parent': -1  # Default parent to -1 (root)
    }

# Update node types and parent information based on edges
for edge in edges:
    parent, child = edge
    node_info[parent]['type'] = 3  # Set as basal dendrite, adjust as needed
    node_info[child]['type'] = 3   # Set as basal dendrite, adjust as needed
    node_info[child]['Parent'] = parent + 1  # Set parent (convert to 1-based index)

# Find the root node (the one without a parent or with parent -1)
root = [n for n, info in node_info.items() if info['Parent'] == -1][0]
node_info[root]['type'] = 1  # Set root type

# Create DataFrame
df = pd.DataFrame.from_dict(node_info, orient='index')

# Reorder columns to match desired SWC format
df = df[['PointNo', 'type', 'X', 'Y', 'Z', 'W', 'Parent']]

# Sort by node index
df = df.sort_values('PointNo')

# write as .SWC file if you want!
df.to_csv('neuron.swc', index=False)

In [None]:
import datetime as dt
import pytz
current_time = pytz.UTC.localize(dt.datetime.utcnow())
mds = client.materialize.get_versions_metadata()
expired = [version for version in mds if version['expires_on'] > current_time]
print(f"Expired versions: {expired}")

In [19]:
from crantpy.utils.cave import get_cloudvolume, get_cave_datastacks

In [None]:
from caveclient import CAVEclient
CAVEclient(server_address='https://proofreading.zetta.ai').info.get_datastacks()

In [20]:
root_id = 576460752653449509
volume = get_cloudvolume()

2025-06-20 09:12:04 - INFO - Using cached cloudvolumes.


In [21]:
mesh = volume.mesh.get(root_id, deduplicate_chunk_boundaries=False)[root_id]



In [22]:
import navis
navis.patch_cloudvolume()
volume.mesh.get_navis(root_id)

INFO  : cloud-volume successfully patched! (navis)
2025-06-20 09:12:13 - INFO - cloud-volume successfully patched!




Unnamed: 0,type,name,id,units,n_vertices,n_faces
0,navis.MeshNeuron,,576460752653449509,1 nanometer,295374,589728


In [23]:

n = navis.MeshNeuron(mesh, id=id, units="nm")
s = navis.skeletonize(n)

In [24]:
s

Unnamed: 0,Unnamed: 1
type,navis.TreeNeuron
name,
id,<built-in function id>
n_nodes,13362
n_connectors,
n_branches,439
n_leafs,857
cable_length,1744192.0
soma,
units,1 nanometer


In [25]:
s.plot3d(backend='plotly')