<table class="tfo-notebook-buttons" align="left">
  <td>
    <a target="_blank" href="https://colab.research.google.com/github/google-health/imaging-research/blob/master/ct-foundation/CT_Foundation_NIfTI_Demo.ipynb"><img src="https://www.tensorflow.org/images/colab_logo_32px.png" />Run in Google Colab</a>
  </td>
  <td>
    <a target="_blank" href="https://github.com/google-health/imaging-research/tree/master/ct-foundation"><img src="https://www.tensorflow.org/images/GitHub-Mark-32px.png" />View source on GitHub</a>
  </td>
</table>


In [None]:
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

## CT Foundation API Demo with NIfTI images
This notebook demonstrates how to use the
[CT Foundation API](https://github.com/google-health/imaging-research/tree/master/ct-foundation),
which computes embeddings from CT volumes.

It shows how to:

-   Check NIfTI files to ensure they are in the proper format.
-   Run inference on CTs stored as NIfTI files in a Google Cloud Storage bucket.

See the CT_Foundation_Demo Colab for running inference on DICOMs and training downstream models.


### This notebook is for API demonstration purposes only

Use evaluation datasets that reflect the expected distribution of images and patients for any downstream model you plan to build.

The best way to determine whether this API is right for you is to try it with data from the downstream task you are interested in.

# Data Attribution

This notebook makes use of public datasets provided by The Cancer Imaging Archive, which is managed by the United States National Cancer Institute.


### LIDC-IDRI Data Access CC BY 3.0
https://www.cancerimagingarchive.net/collection/lidc-idri/

#### LIDC-IDRI Data Citation

Armato III, S. G., McLennan, G., Bidaut, L., McNitt-Gray, M. F., Meyer, C. R., Reeves, A. P., Zhao, B., Aberle, D. R., Henschke, C. I., Hoffman, E. A., Kazerooni, E. A., MacMahon, H., Van Beek, E. J. R., Yankelevitz, D., Biancardi, A. M., Bland, P. H., Brown, M. S., Engelmann, R. M., Laderach, G. E., Max, D., Pais, R. C. , Qing, D. P. Y. , Roberts, R. Y., Smith, A. R., Starkey, A., Batra, P., Caligiuri, P., Farooqi, A., Gladish, G. W., Jude, C. M., Munden, R. F., Petkovska, I., Quint, L. E., Schwartz, L. H., Sundaram, B., Dodd, L. E., Fenimore, C., Gur, D., Petrick, N., Freymann, J., Kirby, J., Hughes, B., Casteele, A. V., Gupte, S., Sallam, M., Heath, M. D., Kuhn, M. H., Dharaiya, E., Burns, R., Fryd, D. S., Salganicoff, M., Anand, V., Shreter, U., Vastagh, S., Croft, B. Y., Clarke, L. P. (2015). Data From LIDC-IDRI [Data set]. The Cancer Imaging Archive. https://doi.org/10.7937/K9/TCIA.2015.LO9QL9SX

# Installation & Setup



In [None]:
# Notebook specific dependencies

!pip install google-auth requests-toolbelt
!pip install nibabel



In [8]:
import gzip
import http
import io
from typing import Iterable, Optional
from google.auth import credentials as gcredentials
from google.auth.transport import requests
from google.cloud import storage
from google.colab import auth
from google.oauth2 import credentials
import matplotlib
import nibabel as nib
import numpy as np
from requests_toolbelt.multipart import decoder
import pandas as pd

**IMPORTANT**: If you are using Colab, you must restart the runtime after installing new packages.

NOTE: There will be some ERROR messages due to the protobuf library - this is normal.

In [9]:
# @title Authenticate
# Authenticate user for access. There will be a popup asking you to sign in with your user and approve access.
auth.authenticate_user()
TOKEN_ = !gcloud beta auth application-default print-access-token
TOKEN = TOKEN_[0]

# This is your token for accessing the API and CT volumes.
# It is valid for 1 hour; after that, request a new one.
# Treat it as a secret: do not echo it in cell output or share it.

## Collect the stored NIfTI data from the cloud bucket

In [53]:
# #@title Create token and call the API for the NIfTI volumes
# # Requires Endpoint and get_ct_embeddings, which are defined in a later cell,
# # and gcs_bucket, which must be a Cloud Storage bucket object for
# # 'lidc_perceptualct'.

# nifti_urls = []

# for a in gcs_bucket.list_blobs(prefix='LIDC-IDRI-0208.nii.gz'):
#   if a.name.endswith('.gz'):
#     nifti_urls.append('gs://lidc_perceptualct/' + a.name)
# for a in gcs_bucket.list_blobs(prefix='LIDC-IDRI-0208_nobs360_guidance10.nii.gz'):
#   if a.name.endswith('.gz'):
#     nifti_urls.append('gs://lidc_perceptualct/' + a.name)

# print('Files to process:')
# print(nifti_urls)
# print(len(nifti_urls))
# # Credentials to access the API
# credentials = google.auth.default()[0]

# # Token to access the NIfTI files in the cloud bucket
# TOKEN_ = !gcloud beta auth application-default print-access-token
# TOKEN = TOKEN_[0]


# # Call the API with 2 parallel calls and a batch size of 1.
# my_embeddings = get_ct_embeddings(
#     caller=Endpoint(), credentials=credentials, urls=nifti_urls,
#     access_token=TOKEN, batch_size=1, parallel_size=2)
# # Total passed urls are 2
# print(f'Total return results: {len(my_embeddings)}')
# print('Example from first result....')
# print(f'Embeddings or error message for the CT: {my_embeddings[0][1]}')
# print(my_embeddings[0][0])
# df = pd.DataFrame(my_embeddings, columns=['Embeddings', 'Name'])

Files to process:
['gs://lidc_perceptualct/LIDC-IDRI-0208.nii.gz', 'gs://lidc_perceptualct/LIDC-IDRI-0208_nobs360_guidance10.nii.gz']
2
Total return results: 2
Example from first result....
Embeddings or error message for the CT: gs://lidc_perceptualct/LIDC-IDRI-0208.nii.gz
[-0.1827561110258102, -1.241934180259705, -1.284136533737183, 1.310749888420105, -0.7993499636650085, 0.9644224047660828, -1.476088643074036, 1.168237924575806, -0.4147839546203613, -0.2353465706110001, -1.270089745521545, -0.603443443775177, -0.02113744057714939, -0.6256497502326965, -0.3766814470291138, -0.4883481860160828, 2.177797317504883, 2.294134855270386, 0.1932857632637024, -1.751186609268188, 2.031693458557129, -0.3508084416389465, 0.0298360101878643, -0.7040283679962158, 1.536370992660522, -1.217443585395813, -1.729920148849487, 0.662085771560669, -2.450458288192749, -0.466509222984314, 1.408177614212036, -0.3496276438236237, 0.1209481358528137, 1.626335024833679, -1.89916718006134, -0.6230788826942444, -

In [54]:
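rec_embs = np.array(my_embeddings[1][0])
gt_embs = np.array(my_embeddings[0][0])
# Cosine similarity between the two embeddings: dot product divided by the
# product of their L2 norms.
cossim = (rec_embs * gt_embs).sum() / (
    (rec_embs**2).sum() * (gt_embs**2).sum()
) ** 0.5
cossim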

0.9937066965693846
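
The comparison above can be wrapped in a small reusable helper. The sketch below is an illustration, not part of the CT Foundation API: `cosine_similarity` is a hypothetical name, and it is written with the standard library only so that it also works on the plain Python lists returned in `my_embeddings`. It rejects mismatched lengths and zero vectors, for which the similarity is undefined.

```python
import math
from typing import Sequence


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
  """Returns the cosine similarity of two equal-length, non-zero vectors."""
  if len(a) != len(b):
    raise ValueError(f'Length mismatch: {len(a)} vs {len(b)}')
  dot = sum(x * y for x, y in zip(a, b))
  norm_a = math.sqrt(sum(x * x for x in a))
  norm_b = math.sqrt(sum(y * y for y in b))
  if norm_a == 0.0 or norm_b == 0.0:
    raise ValueError('Cosine similarity is undefined for a zero vector.')
  return dot / (norm_a * norm_b)


# Toy vectors (not real embeddings) to show the range of the measure.
orthogonal = cosine_similarity([1.0, 0.0], [0.0, 1.0])
opposite = cosine_similarity([1.0, 0.0], [-1.0, 0.0])
parallel = cosine_similarity([1.0, 2.0, 3.0], [2.0, 4.0, 6.0])
```

With real results, the call would be `cosine_similarity(my_embeddings[0][0], my_embeddings[1][0])`, provided both entries hold embeddings rather than error strings.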

In [68]:
df_final = pd.concat([df_final, df], ignore_index=True)
df_final.to_csv('/content/sample_data/lidc_test_embeddings21.csv', index=False)

df_final


Unnamed: 0,Embeddings,Name
0,"[0.3478351533412933, 0.2323243021965027, -0.94...",gs://lidc_perceptualct/LIDC-IDRI-0200.nii.gz
1,"[0.3198219835758209, -0.2165238112211227, -0.9...",gs://lidc_perceptualct/LIDC-IDRI-0200_nobs100_...
2,"[0.3680700957775116, -1.043951869010925, -1.21...",gs://lidc_perceptualct/LIDC-IDRI-0200_nobs10_g...
3,"[0.3922318816184998, -0.1925630271434784, -0.9...",gs://lidc_perceptualct/LIDC-IDRI-0200_nobs120_...
4,"[0.4454675316810608, -0.05436502769589424, -0....",gs://lidc_perceptualct/LIDC-IDRI-0200_nobs140_...
...,...,...
709,"[0.91888427734375, -0.876690685749054, -0.9932...",gs://lidc_perceptualct/LIDC-IDRI-0220_nobs7_gu...
710,"[0.8841715455055237, -0.6646436452865601, -0.9...",gs://lidc_perceptualct/LIDC-IDRI-0220_nobs80_g...
711,"[0.7458922863006592, -1.014414072036743, -1.08...",gs://lidc_perceptualct/LIDC-IDRI-0220_nobs8_gu...
712,"[0.8286287188529968, -0.6488018035888672, -0.9...",gs://lidc_perceptualct/LIDC-IDRI-0220_nobs90_g...
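
A CSV cell can only hold text, so each embedding list is stored as a string and has to be parsed again when the file is read back. The sketch below shows one way to make that round trip explicit by JSON-encoding the embedding column. It uses only the standard library; `write_embeddings_csv` and `read_embeddings_csv` are hypothetical helper names, and the bucket path is a placeholder.

```python
import csv
import io
import json


def write_embeddings_csv(rows, handle):
  """Writes (embedding, name) rows, JSON-encoding each embedding list."""
  writer = csv.writer(handle)
  writer.writerow(['Embeddings', 'Name'])
  for embedding, name in rows:
    writer.writerow([json.dumps(embedding), name])


def read_embeddings_csv(handle):
  """Reads rows back as (list of floats, name) tuples."""
  reader = csv.DictReader(handle)
  return [(json.loads(row['Embeddings']), row['Name']) for row in reader]


# Round trip through an in-memory buffer instead of a file on disk.
rows = [([0.25, -1.5, 3.0], 'gs://example-bucket/scan_a.nii.gz')]
buffer = io.StringIO()
write_embeddings_csv(rows, buffer)
buffer.seek(0)
restored = read_embeddings_csv(buffer)
```

The same idea applies to a pandas workflow: parse the `Embeddings` column after loading the CSV before using it as numeric input for training.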


In [39]:
# Save DataFrame to a CSV file
df_final.to_csv('/content/sample_data/lidc_test_embeddings.csv', index=False)


In [4]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [5]:
# OPTIONAL
# Download and check each NIfTI file to make sure it can run through CT Foundation.
# Assumes nifti_urls is a list of gs:// URLs to .nii.gz files.

client = storage.Client(
    project=None, credentials=credentials.Credentials(TOKEN)
)

for a_nifti in nifti_urls:
  print(f'Checking: {a_nifti}')
  blob = storage.Blob.from_string(a_nifti, client=client)
  with blob.open('rb') as f:
    the_bytes = f.read()

  # Decompress the .nii.gz payload in memory.
  the_bytes = gzip.decompress(the_bytes)

  # Load the NIfTI image and reorient it to the closest canonical axes.
  nifti_image = nib.Nifti1Image.from_bytes(the_bytes)
  reoriented_img = nib.as_closest_canonical(nifti_image)
  volume = reoriented_img.get_fdata()

  # For reference only: get_fdata() already applies the header scaling.
  slope, intercept = nifti_image.header.get_slope_inter()

  print(f'Should be (512,512, x) {volume.shape}')
  print('Min and Max should be in Hounsfield Units')
  # Note: Some scanners set -3024 for outside of the imaging area
  print(f'Min: {np.min(volume)}')
  print(f'Max: {np.max(volume)}')

Checking: gs://lidc_perceptualct/LIDC-IDRI-0200.nii.gz
Should be (512,512, x) (128, 128, 128)
Min and Max should be in Hounsfield Units
Min: -1000.0
Max: 1000.0
Checking: gs://lidc_perceptualct/LIDC-IDRI-0200_nobs100_guidance10.nii.gz
Should be (512,512, x) (128, 128, 128)
Min and Max should be in Hounsfield Units
Min: -1000.0
Max: 1000.0
Checking: gs://lidc_perceptualct/LIDC-IDRI-0200_nobs10_guidance10.nii.gz
Should be (512,512, x) (128, 128, 128)
Min and Max should be in Hounsfield Units
Min: -1000.0
Max: 813.4373779296875
Checking: gs://lidc_perceptualct/LIDC-IDRI-0200_nobs120_guidance10.nii.gz
Should be (512,512, x) (128, 128, 128)
Min and Max should be in Hounsfield Units
Min: -1000.0
Max: 1000.0
Checking: gs://lidc_perceptualct/LIDC-IDRI-0200_nobs140_guidance10.nii.gz
Should be (512,512, x) (128, 128, 128)
Min and Max should be in Hounsfield Units
Min: -1000.0
Max: 1000.0
Checking: gs://lidc_perceptualct/LIDC-IDRI-0200_nobs15_guidance10.nii.gz
Should be (512,512, x) (128, 128, 12

KeyboardInterrupt: 
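
The shape check above loads every volume in full, which is slow enough that the run was interrupted. When only the dimensions are needed, they can be read from the fixed 348-byte NIfTI-1 header. The sketch below is a standard-library illustration under that assumption: `read_nifti1_dims` is a hypothetical helper, it handles NIfTI-1 only (not NIfTI-2), and it is not a substitute for the `nibabel` check of orientation and intensity range.

```python
import gzip
import struct


def read_nifti1_dims(raw: bytes) -> tuple[int, ...]:
  """Returns the dimensions stored in a (possibly gzipped) NIfTI-1 header."""
  if raw[:2] == b'\x1f\x8b':  # gzip magic number
    raw = gzip.decompress(raw)
  if len(raw) < 348:
    raise ValueError('Too short to contain a NIfTI-1 header.')
  # sizeof_hdr (offset 0) must be 348; use it to detect the byte order.
  for order in ('<', '>'):
    if struct.unpack_from(order + 'i', raw, 0)[0] == 348:
      break
  else:
    raise ValueError('Not a NIfTI-1 header (sizeof_hdr != 348).')
  if raw[344:347] not in (b'n+1', b'ni1'):
    raise ValueError('Missing NIfTI-1 magic string.')
  # dim[0] (offset 40) is the number of dimensions; dim[1..7] are the sizes.
  dim = struct.unpack_from(order + '8h', raw, 40)
  if not 1 <= dim[0] <= 7:
    raise ValueError(f'Invalid number of dimensions: {dim[0]}')
  return tuple(dim[1 : 1 + dim[0]])


# Synthetic header for illustration only; real files carry voxel data too.
header = bytearray(348)
struct.pack_into('<i', header, 0, 348)
struct.pack_into('<8h', header, 40, 3, 128, 128, 128, 1, 1, 1, 1)
header[344:348] = b'n+1\x00'
dims = read_nifti1_dims(gzip.compress(bytes(header)))
```

For the files checked above this would report `(128, 128, 128)`, which makes the mismatch with the expected `(512, 512, x)` shape visible without decoding the voxel data.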

## Call the API to compute embeddings for the selected NIfTI.

**NOTE:** *The API can take up to 10 minutes to scale individual instances. If you get errors, wait and try again.*

Errors result in an error string (such as `FAIL STATUS <code>`) instead of embeddings in the returned list.

**NOTE:** Up to 300 parallel requests can be made if the system is fully scaled. Please start at 50 and reduce the number of requests if you get endpoint errors.
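
Because failures come back as strings in the result list, a retry pass can re-request only the URLs that failed. The sketch below is one possible approach, not part of the API: `retry_failures` and `flaky_call` are hypothetical names, and it assumes every failed entry is a `str` while every successful entry is a list of floats. The demo uses a fake caller so it runs without network access.

```python
import time
from typing import Callable

Result = tuple[object, str]  # (embedding or error string, url)


def retry_failures(
    results: list[Result],
    call_fn: Callable[[list[str]], list[Result]],
    max_attempts: int = 3,
    wait_seconds: float = 60.0,
) -> list[Result]:
  """Re-requests URLs whose result is an error string, keeping input order."""
  results = list(results)
  for _ in range(max_attempts):
    failed = [
        i for i, (value, _url) in enumerate(results) if isinstance(value, str)
    ]
    if not failed:
      break
    time.sleep(wait_seconds)  # Give the endpoint time to scale up.
    retried = call_fn([results[i][1] for i in failed])
    for i, new_result in zip(failed, retried):
      results[i] = new_result
  return results


# Fake caller that fails on its first invocation and succeeds afterwards.
attempts = {'count': 0}


def flaky_call(urls: list[str]) -> list[Result]:
  attempts['count'] += 1
  if attempts['count'] == 1:
    return [('FAIL STATUS 503', u) for u in urls]
  return [([0.1, 0.2], u) for u in urls]


initial = [
    ([0.5, 0.6], 'gs://example-bucket/a.nii.gz'),
    ('FAIL STATUS 503', 'gs://example-bucket/b.nii.gz'),
]
final = retry_failures(initial, flaky_call, max_attempts=3, wait_seconds=0.0)
```

In this notebook, `call_fn` could wrap `get_ct_embeddings` with `batch_size=1` and `parallel_size=len(urls)`, which satisfies that function's length requirement for any number of failed URLs.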

In [6]:
# @title Python methods to call CT Foundation's API with NIfTI URLs.

from concurrent.futures import ThreadPoolExecutor
import dataclasses
import functools
import json
from typing import Any, Tuple
import google.auth
import google.auth.transport.requests
import numpy as np


@dataclasses.dataclass(eq=False, frozen=True)
class Response:
  """Response from a Vertex Endpoint."""

  status_code: int
  response_json: dict[str, Any] | None  # json_types.JSONObject


class Endpoint:
  """Calling utility for a Vertex Endpoint using default credentials."""

  def __init__(self):
    self._endpoint_url = (
        'https://us-central1-aiplatform.googleapis.com/v1/projects/'
        'hai-cd3-foundations/locations/us-central1/endpoints/300'
    )

  def predict(
      self,
      instances: list[Any],
      parameters: dict[str, Any] | None = None,
      credentials: google.auth.credentials.Credentials | None = None,
  ) -> Response:
    """Calls the Vertex Endpoint with the given instances and parameters."""
    if credentials is None:
      credentials = google.auth.default()[0]
    session = google.auth.transport.requests.AuthorizedSession(
        credentials=credentials
    )
    response = session.post(
        self._endpoint_url + ':predict',
        json=(
            {'instances': instances}
            | ({'parameters': parameters} if parameters is not None else {})
        ),
        headers={
            'Content-Type': 'application/json',
        },
        timeout=400,
    )
    try:
      response_json = response.json()
    except json.JSONDecodeError:
      # Not expected, handling in case server incorrectly returns non-JSON.
      response_json = None
    return Response(
        status_code=response.status_code,
        response_json=response_json,
    )


def call_single_batch(
    caller: Endpoint, credentials, urls: list[str], access_token: str
) -> list[Tuple[np.ndarray | str, str]]:
  """Handles calls for a single batch and returns embeddings."""
  if not credentials.valid:
    credentials.refresh(google.auth.transport.requests.Request())
  instances = [
      {'gcs_uri': a_url, 'bearer_token': access_token} for a_url in urls
  ]
  # Pass the refreshed credentials through so predict() uses them instead of
  # looking up the default credentials again.
  returns = caller.predict(instances=instances, credentials=credentials)
  if returns.status_code != 200:
    # The whole batch failed: report the status code for every URL.
    return [(f'FAIL STATUS {returns.status_code}', a_url) for a_url in urls]
  return_data = []
  for prediction, a_url in zip(returns.response_json['predictions'], urls):
    if prediction['error_response']:
      return_data.append((prediction['error_response'], a_url))
    else:
      return_data.append((prediction['embedding_result']['embedding'], a_url))
  return return_data


def get_ct_embeddings(
    caller: Endpoint,
    credentials,
    urls: list[str],
    access_token: str,
    batch_size: int,
    parallel_size: int,
) -> list[Tuple[np.ndarray | str, str]]:
  """Handles calls and returns for parallel requests.

  Args:
    caller: CT foundation API caller.
    credentials: The credentials for the API.
    urls: List of urls to the NIfTI files in the cloud bucket. This must be of
      length batch_size * parallel_size.
    access_token: Access token for the cloud bucket that holds the NIfTI files.
    batch_size: The number of volumes to pass in a batch (max 5).
    parallel_size: The number of parallel calls.

  Returns:
    Tuple list of embeddings | errors and the corresponding urls from which
      the embeddings were computed.
  """
  assert batch_size < 6, 'Batch size must be 5 or less.'
  assert (
      len(urls) == batch_size * parallel_size
  ), 'Error in batch, parallel sizes versus requests'

  # Set up parallel batches
  p_urls = []
  for i in range(parallel_size):
    p_urls.append(urls[i * batch_size : (i + 1) * batch_size])

  # Check for correct sizing
  assert len(p_urls) == parallel_size, 'Error in batch, parallel dimensions'

  call_batch = functools.partial(call_single_batch, caller, credentials)

  # Launch parallel calls
  with ThreadPoolExecutor(max_workers=parallel_size) as executor:
    futures = [
        executor.submit(call_batch, b_urls, access_token) for b_urls in p_urls
    ]
    results = [f.result() for f in futures]
  # Unpack results into a single list
  return_results = []
  for b_result in results:
    for a_result in b_result:
      return_results.append(a_result)
  return return_results
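
`get_ct_embeddings` asserts that the number of URLs equals `batch_size * parallel_size`, so an arbitrary list has to be split before calling it. The sketch below shows one way to plan those calls. `plan_calls` is a hypothetical helper, and the choice to send leftover URLs one per request is an assumption made for simplicity, not a recommendation from the API.

```python
from typing import Iterator


def plan_calls(
    urls: list[str], batch_size: int, parallel_size: int
) -> Iterator[tuple[list[str], int, int]]:
  """Yields (chunk, batch, parallel) with len(chunk) == batch * parallel."""
  if not 1 <= batch_size <= 5:
    raise ValueError('batch_size must be between 1 and 5.')
  if parallel_size < 1:
    raise ValueError('parallel_size must be at least 1.')
  start = 0
  while start < len(urls):
    remaining = len(urls) - start
    if remaining >= batch_size * parallel_size:
      batch, parallel = batch_size, parallel_size
    else:
      # Too few URLs left for a full call: send one URL per request.
      batch, parallel = 1, min(parallel_size, remaining)
    yield urls[start : start + batch * parallel], batch, parallel
    start += batch * parallel


# Placeholder URLs: 7 files planned with batch_size=2 and parallel_size=2.
example_urls = [f'gs://example-bucket/scan_{i}.nii.gz' for i in range(7)]
plan = [(len(chunk), b, p) for chunk, b, p in plan_calls(example_urls, 2, 2)]
```

Each yielded tuple can be passed on as `urls`, `batch_size`, and `parallel_size`, and the returned lists concatenated in order.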

In [None]:
#@title Create token and call the API for the NIfTI volumes

# Credentials to access the API. Named api_credentials so that it does not
# shadow the google.oauth2 `credentials` module imported earlier.
api_credentials = google.auth.default()[0]

# Token to access the NIfTI files in the cloud bucket
TOKEN_ = !gcloud beta auth application-default print-access-token
TOKEN = TOKEN_[0]


# Call the API with a single call and a batch size of 3.
my_embeddings = get_ct_embeddings(
    caller=Endpoint(), credentials=api_credentials, urls=nifti_urls,
    access_token=TOKEN, batch_size=3, parallel_size=1)
# Total passed urls are 3
print(f'Total return results: {len(my_embeddings)}')
print('Example from first result....')
print(f'Embeddings or error message for the CT: {my_embeddings[0][1]}')
print(my_embeddings[0][0])

Total return results: 3
Example from first result....
Embeddings or error message for the CT: gs://hai-cd3-foundations-ct3d-vault-entry/lidc/1.3.6.1.4.1.14519.5.2.1.6279.6001.153985109349433321657655488650.nii.gz
[-0.7965675592422485, 0.5134899020195007, -0.8446145057678223, 1.650375247001648, -0.8611537218093872, -0.2983881235122681, -0.2350313514471054, -0.03746968880295753, -0.3491234481334686, -1.801878094673157, -1.586135625839233, -0.4713811874389648, -2.144275426864624, -0.7525808811187744, -1.480953454971313, -1.538024544715881, 0.5738796591758728, 1.313102841377258, -0.2632680535316467, 0.3683145642280579, 1.029897928237915, 0.8377450704574585, -0.3613732159137726, -0.5696321129798889, 1.217603325843811, 1.230955719947815, -0.5118405818939209, -0.7924652695655823, -3.048054456710815, -0.7912593483924866, 0.3353587687015533, 0.2827237546443939, 1.442557692527771, 0.8849602937698364, -0.7764493227005005, 0.1428226083517075, 0.3671017587184906, 1.952146053314209, -0.3514912724494

# Trying CT Foundation on your own NIfTI files.


1.   [Create your own cloud bucket](https://cloud.google.com/storage/docs/creating-buckets)
2.   Upload your NIfTIs to the bucket.
3.   Call the API for the NIfTI files in your cloud bucket.
4.   Collect and store your embeddings for training.

**NOTE**: If performing parallel calls, i.e. parallel_size > 1, please start at
50 or less.
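
For step 3, the API is called with `gs://` URLs rather than bare object names. A minimal sketch of building that list from the object names in a bucket is shown below. `build_nifti_uris` is a hypothetical helper, the bucket and object names are placeholders, and filtering on the `.nii.gz` suffix is an assumption based on the files used in this notebook.

```python
def build_nifti_uris(bucket_name: str, blob_names: list[str]) -> list[str]:
  """Returns gs:// URIs for the names that look like gzipped NIfTI files."""
  return [
      f'gs://{bucket_name}/{name}'
      for name in blob_names
      if name.endswith('.nii.gz')
  ]


# Placeholder names; in practice these would come from listing your bucket.
example_names = ['ct/scan_1.nii.gz', 'ct/readme.txt', 'ct/scan_2.nii.gz']
example_uris = build_nifti_uris('example-bucket', example_names)
```

The resulting list can be used as the `urls` argument of `get_ct_embeddings`.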

If you have any feedback or questions please email us at: ct-foundation@google.com


