# SkeletonService integration tests

## This integration test suite can be run by simply restarting with a fresh kernel and then running all cells.
## Any failures will be indicated with large bold red output, while all successes will be indicated by corresponding green output.
## Note that there is a corresponding command-line version of this test in a separate file. Please refer to that file for instructions on its use.

In [31]:
from IPython.display import display, HTML
# Let the notebook use the full width of the browser window.
notebook_width_css = "<style>:root { --jp-notebook-max-width: 100% !important; }</style>"
display(HTML(notebook_width_css))

In [64]:
import logging

import urllib3

# Suppress urllib3's InsecureRequestWarning messages (presumably triggered by the
# verify=False passed to the SkeletonClient later in this notebook).
urllib3.disable_warnings(category=urllib3.exceptions.InsecureRequestWarning)

# Restrict the root logger to WARNING and above.
logger = logging.getLogger()
logger.setLevel(logging.WARNING)

from IPython.display import Markdown, display
def printmd(string):
    """Render ``string`` as Markdown in the notebook's output area."""
    rendered = Markdown(string)
    display(rendered)

def test_passed():
    """Display the large green "TEST PASSED" banner."""
    banner = "# <font color='green'><b>TEST PASSED</b></font>"
    printmd(banner)

def test_failed():
    """Display the large red "TEST FAILED" banner."""
    banner = "# <font color='red'><b>TEST FAILED</b></font>"
    printmd(banner)

def print_test_result(result):
    """Display the pass banner when ``result`` is truthy, otherwise the fail banner."""
    report = test_passed if result else test_failed
    report()

def test(result):
    """Display a pass/fail banner for ``result`` and return ``result`` unchanged.

    The result is deliberately not asserted: an assertion would stop the
    notebook at the first failure instead of letting every cell run. Whether to
    assert and stop, or to gather all results at the end, is still undecided.
    """
    if result:
        test_passed()
    else:
        test_failed()
    return result

# Set things up

In [33]:
import caveclient as cc
import importlib.metadata

# Show both the module's own version attribute and the installed distribution's version.
installed_version = importlib.metadata.version('CAVEclient')
print(f"CAVEclient version: v{cc.__version__} , v{installed_version}")

datastack_name = "minnie65_phase3_v1"

# Create the client for the datastack and pin its materialization version.
client = cc.CAVEclient(datastack_name)
client.materialize.version = 1078

CAVEclient version: v7.7.0 , v7.7.0


## Pick a test server:
* **localhost:5000 — Test SkeletonService on the local machine, say via the VS Code Debugger**
* **ltv5 — The SkeletonService on the test cluster**
* **minniev6 — Test SkeletonService "in the wild"**

In [34]:
# Leave exactly one server_address assignment uncommented to choose the server under test.
# server_address = "https://localhost:5000"
server_address = "https://ltv5.microns-daf.com"
# server_address = "https://minniev6.microns-daf.com"

# Build the skeleton client on top of the CAVEclient created earlier in this notebook.
skclient = cc.skeletonservice.SkeletonClient(
    server_address,
    datastack_name,
    over_client=client,
    verify=False,
)
print(f"SkeletonService server and version: {server_address} , v{skclient._server_version}")

SkeletonService server and version: https://ltv5.microns-daf.com , v0.18.5


In [67]:
# The expected service version is hard-coded rather than read back from skclient so that we can
# manually determine when an intended version has fully deployed on a new pod.
expected_skeleton_service_version = "0.18.5"
expected_available_skeleton_versions = [-1, 0, 1, 2, 3, 4]

# Skeleton version requested throughout the skeleton tests.
skvn = 4

# Root ids used by the cache, single-skeleton, and bulk tests.
bulk_rids = [
    864691135463611454,
    864691135687456480,
]
single_rid = bulk_rids[0]
# Twelve rids will exceed the ten-rid limit of get_bulk_skeletons()
larger_bulk_rids = 6 * bulk_rids

# Root ids that the invalid-request tests expect the service to reject.
sample_refusal_list_rid = 112233445566778899
sample_invalid_node_rid = 864691135687000000
sample_supervoxel_rid = 88891049011371731

# Delete the test rid files from the bucket so we can test regenerating them from scratch

In [36]:
from cloudfiles import CloudFiles

# Pick the skeleton bucket that matches the server under test. Local and "ltv" servers
# use the "ltv" sub-path; "minnie" servers use the top-level path.
bucket = None
if "localhost" in server_address or "ltv" in server_address:
    bucket = f"gs://minnie65_skeletons/ltv/{datastack_name}/{skvn}"
elif "minnie" in server_address:
    bucket = f"gs://minnie65_skeletons/{datastack_name}/{skvn}"
else:
    # Stop here with a clear message instead of handing CloudFiles a None path.
    raise ValueError(f"No skeleton bucket is known for server address: {server_address}")
print(f"Testing bucket: {bucket}")

# Report whether each cached file for the test rids currently exists in the bucket.
cf = CloudFiles(bucket)
for rid in bulk_rids:
    for output_format in ["h5", "flatdict", "swccompressed"]:
        filename = f"skeleton__v{skvn}__rid-{rid}__ds-{datastack_name}__res-1x1x1__cs-True__cr-7500.{output_format}.gz"
        print(filename)
        print(cf.exists(filename))

Testing bucket: gs://minnie65_skeletons/ltv/minnie65_phase3_v1/4
skeleton__v4__rid-864691135463611454__ds-minnie65_phase3_v1__res-1x1x1__cs-True__cr-7500.h5.gz
True
skeleton__v4__rid-864691135463611454__ds-minnie65_phase3_v1__res-1x1x1__cs-True__cr-7500.flatdict.gz
False
skeleton__v4__rid-864691135463611454__ds-minnie65_phase3_v1__res-1x1x1__cs-True__cr-7500.swccompressed.gz
False
skeleton__v4__rid-864691135687456480__ds-minnie65_phase3_v1__res-1x1x1__cs-True__cr-7500.h5.gz
True
skeleton__v4__rid-864691135687456480__ds-minnie65_phase3_v1__res-1x1x1__cs-True__cr-7500.flatdict.gz
False
skeleton__v4__rid-864691135687456480__ds-minnie65_phase3_v1__res-1x1x1__cs-True__cr-7500.swccompressed.gz
False


In [37]:
from cloudfiles import CloudFiles

# Remove every cached file for the test rids, printing its existence before and after the delete.
cf = CloudFiles(bucket)
for rid in bulk_rids:
    for output_format in ("h5", "flatdict", "swccompressed"):
        filename = f"skeleton__v{skvn}__rid-{rid}__ds-{datastack_name}__res-1x1x1__cs-True__cr-7500.{output_format}.gz"
        print(filename)
        existed_before = cf.exists(filename)
        print(existed_before)
        cf.delete(filename)
        exists_after = cf.exists(filename)
        print(exists_after)

skeleton__v4__rid-864691135463611454__ds-minnie65_phase3_v1__res-1x1x1__cs-True__cr-7500.h5.gz
True
False
skeleton__v4__rid-864691135463611454__ds-minnie65_phase3_v1__res-1x1x1__cs-True__cr-7500.flatdict.gz
False
False
skeleton__v4__rid-864691135463611454__ds-minnie65_phase3_v1__res-1x1x1__cs-True__cr-7500.swccompressed.gz
False
False
skeleton__v4__rid-864691135687456480__ds-minnie65_phase3_v1__res-1x1x1__cs-True__cr-7500.h5.gz
True
False
skeleton__v4__rid-864691135687456480__ds-minnie65_phase3_v1__res-1x1x1__cs-True__cr-7500.flatdict.gz
False
False
skeleton__v4__rid-864691135687456480__ds-minnie65_phase3_v1__res-1x1x1__cs-True__cr-7500.swccompressed.gz
False
False


## Metadata tests

In [69]:
# Import the submodule explicitly: a bare `import packaging` does not load
# `packaging.version`, so the attribute access below would only work if some
# other library had already imported it.
import packaging.version

# The version reported by the server must equal the hard-coded expected version.
skeleton_service_version = skclient.get_version()
if not test(skeleton_service_version == packaging.version.Version(expected_skeleton_service_version)):
    print("Make sure you have assigned the expected version near the top of this test suite. Search for 'expected_skeleton_service_version'.")

# <font color='green'><b>TEST PASSED</b></font>

In [39]:
# The server must report exactly the expected list of skeleton versions.
skeleton_versions = skclient.get_versions()
versions_match = skeleton_versions == expected_available_skeleton_versions
test(versions_match)

# <font color='green'><b>TEST PASSED</b></font>

In [40]:
# The precomputed skeleton info expected for the skeleton version under test.
expected_skeleton_info = {
    '@type': 'neuroglancer_skeletons',
    'transform': [1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0],
    'vertex_attributes': [
        {'id': 'radius', 'data_type': 'float32', 'num_components': 1},
        {'id': 'compartment', 'data_type': 'uint8', 'num_components': 1}
    ]
}
precomputed_skeleton_info = skclient.get_precomputed_skeleton_info(skvn=skvn)
test(precomputed_skeleton_info == expected_skeleton_info)

# <font color='green'><b>TEST PASSED</b></font>

## Cache status tests

In [41]:
# %%time
import json

# The cached files were deleted earlier in this notebook, so neither rid should be reported as existing.
expected_rids_exist = {rid: False for rid in bulk_rids}
rids_exist = skclient.skeletons_exist(skeleton_version=skvn, root_ids=bulk_rids)
# print(json.dumps(rids_exist, indent=4))
test(rids_exist == expected_rids_exist)

# <font color='green'><b>TEST PASSED</b></font>

In [42]:
# Same check as the previous cell, this time passing log_warning=False.
expected_rids_exist = {rid: False for rid in bulk_rids}
rids_exist = skclient.skeletons_exist(skeleton_version=skvn, root_ids=bulk_rids, log_warning=False)
# print(json.dumps(rids_exist, indent=4))
test(rids_exist == expected_rids_exist)

# <font color='green'><b>TEST PASSED</b></font>

In [43]:
# Requires CAVEclient version >= v7.6.1
# Same check again, this time passing verbose_level=1.
expected_rids_exist = {rid: False for rid in bulk_rids}
rids_exist = skclient.skeletons_exist(skeleton_version=skvn, root_ids=bulk_rids, verbose_level=1)
print(json.dumps(rids_exist, indent=4))
test(rids_exist == expected_rids_exist)

{
    "864691135463611454": false,
    "864691135687456480": false
}


# <font color='green'><b>TEST PASSED</b></font>

## Cache contents tests

In [44]:
# %%time
import json

# With the cached files deleted, the cache listing for the test rids should be empty.
empty_cache = {"num_found": 0, "files": []}
cache_contents = skclient.get_cache_contents(skeleton_version=skvn, root_id_prefixes=bulk_rids)
print(json.dumps(cache_contents, indent=4))
test(cache_contents == empty_cache)

{
    "num_found": 0,
    "files": []
}


# <font color='green'><b>TEST PASSED</b></font>

In [45]:
# Same empty-cache check, this time passing log_warning=False.
empty_cache = {"num_found": 0, "files": []}
cache_contents = skclient.get_cache_contents(skeleton_version=skvn, root_id_prefixes=bulk_rids, log_warning=False)
print(json.dumps(cache_contents, indent=4))
test(cache_contents == empty_cache)

{
    "num_found": 0,
    "files": []
}


# <font color='green'><b>TEST PASSED</b></font>

In [46]:
# Requires CAVEclient version >= v7.6.1
# Same empty-cache check, this time passing verbose_level=1.
empty_cache = {"num_found": 0, "files": []}
cache_contents = skclient.get_cache_contents(skeleton_version=skvn, root_id_prefixes=bulk_rids, verbose_level=1)
print(json.dumps(cache_contents, indent=4))
test(cache_contents == empty_cache)

{
    "num_found": 0,
    "files": []
}


# <font color='green'><b>TEST PASSED</b></font>

# Invalid skeleton request tests

In [47]:
# %%time
import requests

# The request must raise ValueError carrying exactly this message.
expected_message = f"Invalid root id: {sample_refusal_list_rid} (perhaps it doesn't exist; the error is unclear)"
try:
    sk = skclient.get_skeleton(
        sample_refusal_list_rid,
        datastack_name,
        skeleton_version=skvn,
        output_format='dict',
        verbose_level=1,
    )
    # Reaching this line means no exception was raised, which is a failure.
    test_failed()
except ValueError as e:
    message = e.args[0]
    print(message)
    test(message == expected_message)
# except requests.HTTPError as e:
#     print(e)
#     test(e.response.text == '{\n    "Error": "Problematic root id: ' + str(sample_refusal_list_rid) + ' is in the refusal list"\n}\n')

Invalid root id: 112233445566778899 (perhaps it doesn't exist; the error is unclear)


# <font color='green'><b>TEST PASSED</b></font>

In [48]:
# %%time
import requests

# The request must raise ValueError carrying exactly this message.
expected_message = f"Invalid root id: {sample_invalid_node_rid} (perhaps it doesn't exist; the error is unclear)"
try:
    sk = skclient.get_skeleton(
        sample_invalid_node_rid,
        datastack_name,
        skeleton_version=skvn,
        output_format='dict',
        verbose_level=1,
    )
    # Reaching this line means no exception was raised, which is a failure.
    test_failed()
except ValueError as e:
    message = e.args[0]
    print(message)
    test(message == expected_message)
# except requests.HTTPError as e:
#     print(e)
#     test(e.response.text == '{\n    "Error": "Invalid root id: ' + str(sample_invalid_node_rid) + ' (perhaps it doesn\'t exist; the error is unclear)"\n}\n')

Invalid root id: 864691135687000000 (perhaps it doesn't exist; the error is unclear)


# <font color='green'><b>TEST PASSED</b></font>

In [49]:
# %%time
import requests

# The request must raise ValueError carrying exactly this message.
expected_message = f"Invalid root id: {sample_supervoxel_rid} (perhaps this is an id corresponding to a different level of the PCG, e.g., a supervoxel id)"
try:
    sk = skclient.get_skeleton(
        sample_supervoxel_rid,
        datastack_name,
        skeleton_version=skvn,
        output_format='dict',
        verbose_level=1,
    )
    # Reaching this line means no exception was raised, which is a failure.
    test_failed()
except ValueError as e:
    message = e.args[0]
    print(message)
    test(message == expected_message)
# except requests.HTTPError as e:
#     print(e)
#     test(e.response.text == '{\n    "Error": "Invalid root id: ' + str(sample_supervoxel_rid) + ' (perhaps this is an id corresponding to a different level of the PCG, e.g., a supervoxel id)"\n}\n')

Invalid root id: 88891049011371731 (perhaps this is an id corresponding to a different level of the PCG, e.g., a supervoxel id)


# <font color='green'><b>TEST PASSED</b></font>

# Skeleton request tests

In [50]:
%%time
from timeit import default_timer

# First request since the cached files were deleted: the response is expected to
# take more than 5 s but less than 90 s.
start_time = default_timer()
sk = skclient.get_skeleton(
    single_rid,
    datastack_name,
    skeleton_version=skvn,
    output_format='dict',
    verbose_level=1,
)
elapsed_time = default_timer() - start_time
print(f"Elapsed time: {elapsed_time:>.1f}s")
# display(sk)
test(isinstance(sk, dict))
test(5 < elapsed_time < 90)

Elapsed time: 7.4s


# <font color='green'><b>TEST PASSED</b></font>

# <font color='green'><b>TEST PASSED</b></font>

CPU times: user 42.4 ms, sys: 8.82 ms, total: 51.2 ms
Wall time: 7.4 s


In [51]:
%%time
# Repeat of the previous request: this time the response is expected in under 5 s.
start_time = default_timer()
sk = skclient.get_skeleton(
    single_rid,
    datastack_name,
    skeleton_version=skvn,
    output_format='dict',
    verbose_level=1,
)
elapsed_time = default_timer() - start_time
print(f"Elapsed time: {elapsed_time:>.1f}s")
# display(sk)
test(isinstance(sk, dict))
test(elapsed_time < 5)

Elapsed time: 2.0s


# <font color='green'><b>TEST PASSED</b></font>

# <font color='green'><b>TEST PASSED</b></font>

CPU times: user 34.6 ms, sys: 6.92 ms, total: 41.5 ms
Wall time: 1.99 s


In [52]:
%%time
import pandas as pd

# Request the same skeleton in 'swc' format: expect a DataFrame back in under 5 s.
start_time = default_timer()
sk = skclient.get_skeleton(
    single_rid,
    datastack_name,
    skeleton_version=skvn,
    output_format='swc',
    verbose_level=1,
)
elapsed_time = default_timer() - start_time
print(f"Elapsed time: {elapsed_time:>.1f}s")
# display(sk)
test(isinstance(sk, pd.DataFrame))
test(elapsed_time < 5)

Elapsed time: 1.9s


# <font color='green'><b>TEST PASSED</b></font>

# <font color='green'><b>TEST PASSED</b></font>

CPU times: user 27.2 ms, sys: 11.3 ms, total: 38.5 ms
Wall time: 1.91 s


### Inspect the cache after generating new skeletons

In [53]:
# %%time
# Only the first rid has been requested so far, so only it should now be reported as existing.
expected_rids_exist = {bulk_rids[0]: True, bulk_rids[1]: False}
rids_exist = skclient.skeletons_exist(skeleton_version=skvn, root_ids=bulk_rids)
print(json.dumps(rids_exist, indent=4))
test(rids_exist == expected_rids_exist)

{
    "864691135463611454": true,
    "864691135687456480": false
}


# <font color='green'><b>TEST PASSED</b></font>

In [54]:
# %%time
import json
cache_contents = skclient.get_cache_contents(skeleton_version=skvn, root_id_prefixes=bulk_rids)
print(json.dumps(cache_contents, indent=4))
# Only the first rid has been requested so far, and only its h5 file is expected in the cache.
# The expected name is built from skvn (not a literal "v4") so it follows the version under test,
# matching how the bucket filenames are built elsewhere in this notebook.
test(cache_contents == {
    "num_found": 1,
    "files": [
        f"skeleton__v{skvn}__rid-{bulk_rids[0]}__ds-{datastack_name}__res-1x1x1__cs-True__cr-7500.h5.gz"
    ]
})

{
    "num_found": 1,
    "files": [
        "skeleton__v4__rid-864691135463611454__ds-minnie65_phase3_v1__res-1x1x1__cs-True__cr-7500.h5.gz"
    ]
}


# <font color='green'><b>TEST PASSED</b></font>

# Small bulk skeleton request tests
## This routine truncates the request list to a small number (10 at the time of this writing), returns any skeletons that are available, and submits the rest to the asynchronous queue

In [55]:
# %%time
result = skclient.get_bulk_skeletons(bulk_rids, skeleton_version=skvn, output_format='dict')
# Only the first root id can be checked here, because it is the only one generated by the tests above.
# The other root id is asynchronously triggered by this request but won't be available for 20-60 seconds afterwards.
test(str(bulk_rids[0]) in result)

# <font color='green'><b>TEST PASSED</b></font>

In [56]:
# %%time
result = skclient.get_bulk_skeletons(bulk_rids, skeleton_version=skvn, output_format='dict', verbose_level=1)
# Only the first root id can be checked here, because it is the only one generated by the tests above.
# The other root id is asynchronously triggered by this request but won't be available for 20-60 seconds afterwards.
test(str(bulk_rids[0]) in result)

# <font color='green'><b>TEST PASSED</b></font>

In [57]:
# %%time
# larger_bulk_rids holds more rids than get_bulk_skeletons() accepts, so this exercises the
# truncation of an oversized request list.
result = skclient.get_bulk_skeletons(larger_bulk_rids, skeleton_version=skvn, output_format='dict', verbose_level=1)
# Only the first root id can be checked here, because it is the only one generated by the tests above.
# The other root id is asynchronously triggered by the bulk requests but won't be available for 20-60 seconds afterwards.
test(str(bulk_rids[0]) in result)

# <font color='green'><b>TEST PASSED</b></font>

# Asynchronous bulk skeleton request tests
## This routine submits a large number of requests and returns only the estimated time to complete the job; it doesn't return any skeletons.
### The estimated job time depends on the number of parallel workers available on the server with each skeleton allocated 60s for estimation purposes.
### For example, with 10 workers, 1–10 skeletons would take 60s, 11–20 skeletons would take 120s, etc.
### At the time of this writing, all servers are configured to use 30 workers.

In [58]:
# %%time
# The async call returns only an estimated completion time; 60.0 is expected for this request.
expected_estimate = 60.0
result = skclient.generate_bulk_skeletons_async(bulk_rids, skeleton_version=skvn)
print(type(result), result)
test(result == expected_estimate)

<class 'float'> 60.0


# <font color='green'><b>TEST PASSED</b></font>

In [59]:
# %%time
# Same async request, this time passing verbose_level=1; the estimate should still be 60.0.
expected_estimate = 60.0
result = skclient.generate_bulk_skeletons_async(bulk_rids, skeleton_version=skvn, verbose_level=1)
print(type(result), result)
test(result == expected_estimate)

<class 'float'> 60.0


# <font color='green'><b>TEST PASSED</b></font>

In [60]:
# %%time
# Async request with the twelve-rid list; the estimate is still expected to be 60.0.
expected_estimate = 60.0
result = skclient.generate_bulk_skeletons_async(larger_bulk_rids, skeleton_version=skvn, verbose_level=1)
print(type(result), result)
test(result == expected_estimate)

<class 'float'> 60.0


# <font color='green'><b>TEST PASSED</b></font>

## Meshwork tests
### I implemented meshwork generation and caching but then removed it at the suggestion of other team members, saving the code for possible future use. Consequently, the meshwork routines can't be tested until the code is added back, and the following tests are currently disabled.

In [61]:
# Switch for the meshwork cells below: their bodies only run when this is True.
RUN_MESHWORK_TESTS = False

In [62]:
%%time
from io import BytesIO

if RUN_MESHWORK_TESTS:
    import pcg_skel

    # Fetch the meshwork as raw bytes and load it from an in-memory stream.
    mw_bytes = skclient.get_meshwork(single_rid, datastack_name, verbose_level=1)
    print(len(mw_bytes))
    mw_stream = BytesIO(mw_bytes)
    nrn = pcg_skel.meshwork.load_meshwork(mw_stream)
    print(nrn)

    # Summarize the loaded meshwork: vertex, edge, and synapse-annotation counts.
    vertex_count = len(nrn.vertices)
    edge_count = len(nrn.edges)
    pre_syn_count = len(nrn.anno['pre_syn'])
    post_syn_count = len(nrn.anno['post_syn'])
    print(vertex_count, edge_count, pre_syn_count, post_syn_count)
    post_syn_distances = nrn.distance_to_root(nrn.anno.post_syn.mesh_index)
    print(post_syn_distances / 1000)

    print("This test doesn't appear to be fully implemented yet, as it doesn't contain an assertion clause.")

    # No condition is checked yet, so reaching this point without an exception counts as a pass.
    test_passed()

CPU times: user 7 µs, sys: 0 ns, total: 7 µs
Wall time: 9.06 µs


In [63]:
%%time
if RUN_MESHWORK_TESTS:
    estimated_time = skclient.generate_bulk_meshworks_async(bulk_rids, datastack_name, verbose_level=1)
    # Check the value returned by this call, not the `result` variable left over from earlier cells.
    print(type(estimated_time), estimated_time)
    # Report through test() like the other cells so a failure does not stop the notebook.
    test(estimated_time == 60.0)

CPU times: user 2 µs, sys: 0 ns, total: 2 µs
Wall time: 4.05 µs
