# CBCT scan fusion

This API takes oral scans and a zip archive of raw CBCT files (DICOM or NRRD), and returns the reconstructed organ meshes along with fused teeth meshes, each built from the CBCT root and the scan crown.

For API specification, see https://www.chohotech.com/docs/cloud-en/#/workflow/scan-seg-and-cbct-fusion-1

In [None]:
import os
import glob
import base64
import time
import requests
import json
import trimesh
import urllib
import numpy as np

def _create_colors():
    # 20 high contrast colors
    colors = [[230, 25, 75,255],[60, 180, 75,255],[255, 225, 25,255],\
            [0, 130, 200,255],[245, 130, 48,255],[145, 30, 180,255],[70, 240, 240,255],\
            [240, 50, 230,255],[210, 245, 60,255],[250, 190, 190,255],[0, 128, 128,255],\
            [230, 190, 255,255],[170, 110, 40,255],[255, 250, 200,255],[128, 0, 0,255],\
            [170, 255, 195,255],[128, 128, 0, 255]]
    #np.random.shuffle(colors)
    # gum color
    colors = [[255,255,255,255]] + colors
    return colors

def colored_mesh(mesh, cid):
    """Return a copy of ``mesh`` with its faces painted in one palette color.

    Args:
        mesh: Object exposing ``copy()`` and ``visual.face_colors`` (in this
            notebook, meshes loaded through trimesh).
        cid: Integer color index; it wraps around the palette length.

    Returns:
        The tinted copy. The color is assigned on the copy, not on ``mesh``.
    """
    palette = _create_colors()
    tinted = mesh.copy()
    # Wrap with the actual palette size instead of a hard-coded 18, so the
    # lookup stays in range if the palette is ever extended or shortened.
    tinted.visual.face_colors = palette[cid % len(palette)]
    return tinted

# Define call rules

Please modify the following code blocks based on the credentials you obtained from us.

In [None]:
# Chohotech service request URL, sent with the API documentation.
base_url = "<service request URL>"

# Chohotech file service URL, sent with the API documentation.
file_server_url = "<service file server URL>"

# The authentication token must be passed in a header on every API call.
# Keep the TOKEN confidential. If it is leaked, contact us immediately to
# reset it: all tasks using this TOKEN are charged to your account.
zh_token = "<your company's service Token, sent with the contact>"

user_group = "APIClient" # User group, usually named APIClient.

# Your company's user_id, sent with the API documentation.
user_id = "<your company's user_id>"

# If you have received creds.json, its values replace the placeholders above
# (user_group is not read from the file).
creds_path = '../../creds.json'
if os.path.exists(creds_path):
    # Use a context manager so the file handle is closed deterministically.
    with open(creds_path, 'r') as creds_file:
        creds = json.load(creds_file)
    base_url = creds['base_url']
    file_server_url = creds['file_server_url']
    zh_token = creds['zh_token']
    user_id = creds['user_id']
    print("loaded creds from creds.json")

In [None]:
# Request skeleton: selects the workflow spec to run and identifies the
# caller. Later cells add "input_data" and "output_config" (and optionally
# "metadata" / "notification") before the request is submitted.
json_call = {
    "spec_group": "cbct",
    "spec_name": "scan-seg-and-cbct-fusion",
    "spec_version": "1.0-snapshot",
    "user_group": user_group,
    "user_id": user_id,
}

### Define Callback URL

After the workflow finishes, the service sends a POST request to the specified URL. The request body is a JSON object with 4 fields:

1. workflow_id (str): the corresponding workflow_id
2. metadata (dict): the metadata you passed when starting the workflow
3. success (bool): whether the workflow succeeded (true for success)
4. reason (str or null): If success is true, this item will be null. Otherwise, it will be a string representing the reason for failure.

If you need a callback, please uncomment the following code block.

In [1]:
# json_call['metadata'] = { # (optional) metadata can be added here, which will be attached to the callback information. Each item in the dictionary is limited to 128 characters
#     "case_id": "CH-123",
#     "case_name": "ABCDE"
# }
# json_call['notification'] =[ # (optional) callback URL can be added here
#     {"url": "https://www.baidu.com"} # multiple callback URLs can be added here, each in the format {"url": "xxx"}
# ]

## Define Input/Output Blocks

The input CBCT file must be a zip archive containing all files of the CBCT sequence. The algorithm automatically searches for the first CBCT sequence contained in the zip. If the top level of the zip contains further archives (zip / rar / tar), the algorithm will decompress at most one additional level.

In [None]:
def upload_file(file_name):
    """Upload a file from ``../../data/`` to the file service and return its URN.

    The upload is a two-step process: request a signed upload URL from the
    file service (authenticated with the token), then PUT the file bytes to
    that URL.

    Args:
        file_name: Name of the file, relative to ``../../data/``. The text
            after the last ``.`` is sent as the required ``postfix``.

    Returns:
        A string of the form
        ``urn:zhfile:o:s:{user_group}:{user_id}:{path}``.

    Raises:
        OSError: If the local file cannot be read.
        requests.HTTPError: If either HTTP request returns an error status.
    """
    # Import the submodule explicitly: a bare `import urllib` does not
    # guarantee that `urllib.parse` has been loaded.
    from urllib.parse import urlparse

    ext = file_name.split('.')[-1]

    # Read the payload first so a missing file fails before any network call;
    # the context manager closes the handle deterministically.
    with open('../../data/' + file_name, 'rb') as src:
        data = src.read()

    resp = requests.get(file_server_url + f"/scratch/{user_group}/{user_id}/upload_url?" +
                        f"postfix={ext}", # Must specify postfix, i.e., file extension
                        headers={"X-ZH-TOKEN": zh_token}) # Get signed upload URL
    resp.raise_for_status()

    # The body is a JSON-encoded string; decode it properly instead of
    # slicing off the quotes, so JSON escape sequences are handled too.
    upload_url = resp.json()

    resp = requests.put(upload_url, data) # No auth header is needed for uploading to the cloud storage service
    resp.raise_for_status()

    # Drop the first three path segments of the signed URL; the remainder is
    # the object path used in the URN.
    path = "/".join(urlparse(upload_url).path.lstrip("/").split("/")[3:])
    urn = f"urn:zhfile:o:s:{user_group}:{user_id}:{path}"
    return urn

# Upload the three inputs in order: upper-jaw scan, lower-jaw scan, CBCT zip.
upper_scan_urn = upload_file('upper_jaw_scan.drc')
lower_scan_urn = upload_file('lower_jaw_scan.ply')
cbct_zip_urn = upload_file("cbct.zip")

# Scans are passed as {"type", "data"} objects; the CBCT archive is passed
# as a bare URN.
json_call["input_data"] = {
    "upper_jaw_scan": {"type": "drc", "data": upper_scan_urn},
    "lower_jaw_scan": {"type": "ply", "data": lower_scan_urn},
    "raw_ct_file": cbct_zip_urn,
}

# Request every output as PLY.
json_call["output_config"] = {
    output_name: {"type": "ply"}
    for output_name in (
        "teeth",
        "root",
        "fusion",
        "reconstructions",
        "cbct_lower_jaw",
        "cbct_upper_jaw",
    )
}

# Submit request

Submit the request using the `/run` POST method.

In [None]:
# Headers shared by every call to the service: JSON body + auth token.
headers = {"Content-Type": "application/json", "X-ZH-TOKEN": zh_token}

# Start the workflow by POSTing the serialized request to /run.
url = f"{base_url}/run"
response = requests.post(url, headers=headers, data=json.dumps(json_call))
response.raise_for_status()

# The response carries the run_id that the polling and data cells use.
create_result = response.json()
run_id = create_result['run_id']
print(run_id)

# Wait for the request to complete

Use the polling API to wait for the request to complete; the API URL is `/run/{run_id}`.

**It is strongly recommended to use the callback method instead.** See [Define Callback URL](#Define-Callback-URL): the callback method sends a message to the given URL when the workflow ends, whether or not it succeeded, so you do not need to poll for the result.

In [None]:
# Poll the run status until it completes, fails, or the wait budget runs out.
url = base_url + f"/run/{run_id}"

start_time = time.time()
while time.time()-start_time < 600: # Maximum wait time: 10 minutes
    time.sleep(3) # Polling interval
    response = requests.request("GET", url, headers=headers)
    # Surface HTTP errors here, as the other cells do, instead of letting
    # them show up as a confusing KeyError / JSON decode error below.
    response.raise_for_status()
    result = response.json()
    if result['completed'] or result['failed']:
        break

# The loop exits on completion, failure, or timeout; tell the cases apart.
if not result['completed']:
    if result['failed']:
        raise ValueError("API error, reason: " + str(result['reason_public']))
    raise TimeoutError("API timeout")

print(f"API execution time: {time.time()-start_time}s")

# Get the request result

Use `/data/{run_id}` to get all output data of this workflow, or `/data/{run_id}/{key}` to get a specific item.

In [None]:
# Fetch all output data of the finished run.
url = base_url + f"/data/{run_id}"
response = requests.request("GET", url, headers=headers)
# Fail fast on an HTTP error rather than parsing an error body as results.
response.raise_for_status()
result = response.json()

In [None]:
def retrieve_mesh(mesh_file_json):
    """Download one output mesh from the file service and load it with trimesh.

    Args:
        mesh_file_json: Mapping with a ``'data'`` entry (the file URN to
            download) and a ``'type'`` entry (passed to trimesh as the
            file type).

    Returns:
        Whatever ``trimesh.load`` returns for the downloaded content.

    Raises:
        requests.HTTPError: If the download returns an error status.
    """
    # Import the submodule explicitly: a bare `import urllib` does not
    # guarantee that `urllib.parse` has been loaded.
    from urllib.parse import urlencode

    query = urlencode({"urn": mesh_file_json['data']})
    resp = requests.get(file_server_url + "/file/download?" + query,
                        headers={"X-ZH-TOKEN": zh_token})
    # Without this check an error response body would be handed to the
    # mesh parser and fail with an unrelated-looking error.
    resp.raise_for_status()
    return trimesh.load(trimesh.util.wrap_as_stream(resp.content), file_type=mesh_file_json['type'])

# Each output group is downloaded, tinted by its position in the group, and
# merged into a single object with sum(..., None).
# NOTE(review): this relies on the loaded meshes supporting `+` with a None
# start value — confirm against the installed trimesh version.

# "reconstructions" and "fusion" are mappings: iterate their values in order.
reconstructions = sum([
    colored_mesh(retrieve_mesh(entry), idx)
    for idx, entry in enumerate(result["reconstructions"].values())
], None)

fusion = sum([
    colored_mesh(retrieve_mesh(entry), idx)
    for idx, entry in enumerate(result["fusion"].values())
], None)

# The two CBCT jaw outputs are iterated directly; each item is a mesh entry.
cbct_lower_jaw = sum([
    colored_mesh(retrieve_mesh(entry), idx)
    for idx, entry in enumerate(result["cbct_lower_jaw"])
], None)

cbct_upper_jaw = sum([
    colored_mesh(retrieve_mesh(entry), idx)
    for idx, entry in enumerate(result["cbct_upper_jaw"])
], None)

# Visualization

In [None]:
# Display the merged fusion mesh (one palette color per entry of result["fusion"]).
fusion.show()

Visualize all reconstructions (bones etc.)

In [None]:
# Display the merged reconstructions (one palette color per entry of result["reconstructions"]).
reconstructions.show()

Visualize mesh reconstructed from CBCT teeth instance segmentation.

In [None]:
# Combine the lower- and upper-jaw CBCT meshes with `+` and display the result.
(cbct_lower_jaw + cbct_upper_jaw).show()