[![Licence](https://img.shields.io/badge/license-MIT-blue)](https://opensource.org/license/mit/)

# IDC on AWS Tutorial

This notebook walks through how to study the [NCI Imaging Data Commons](https://portal.imaging.datacommons.cancer.gov/) (IDC) data in the [AWS Open Data repository](https://registry.opendata.aws/nci-imaging-data-commons/) using native AWS services such as [Amazon SageMaker](https://aws.amazon.com/sagemaker/), [AWS HealthImaging](https://aws.amazon.com/healthimaging/), [Amazon Athena](https://aws.amazon.com/athena/), and [AWS Glue](https://aws.amazon.com/glue/).

If you are not familiar with IDC data, please run through the existing [tutorials](https://github.com/ImagingDataCommons/IDC-Tutorials/tree/master/notebooks/getting_started) first.

Here is the architecture diagram of this tutorial:

![arch diagram](img/arch_diagram.png)

## Prerequisites
    
1. Download the [CloudFormation template](https://github.com/aws-solutions-library-samples/guidance-for-multi-modal-data-analysis-with-aws-health-and-ml-services/blob/main/cfn_template) and deploy it to create the [Amazon SageMaker Domain](https://aws.amazon.com/sagemaker/) and the necessary [AWS IAM](https://aws.amazon.com/iam/) roles.

<img src="img/deploy_template.png" width="888">

2. Launch the SageMaker Studio application.

<img src="img/launch_studio.png" width="888">

3. Set up the notebook environment using the "PyTorch 1.12 Python 3.8 CPU optimized" kernel with the "t3.medium" instance type.

<img src="img/studio_setup.png" width="800">

## Install and import libraries

In [None]:
%env PIP_DISABLE_PIP_VERSION_CHECK True
%env PIP_ROOT_USER_ACTION ignore

!pip install -q --upgrade pip
!pip install -q --upgrade boto3 botocore 
!pip install -q tqdm nibabel pydicom numpy pylibjpeg-openjpeg AHItoDICOMInterface
!pip install -q "itk>=5.3rc4" "itkwidgets[all]>=1.0a23"
!pip3 install -q sqlalchemy==1.4.47 
!pip3 install -q "PyAthena[SQLAlchemy]==2.25.2"

%load_ext autoreload
%autoreload 2

import warnings
warnings.filterwarnings('ignore')

In [None]:
import pydicom
from pydicom.sequence import Sequence
from pydicom import Dataset , DataElement 
from pydicom.dataset import FileDataset, FileMetaDataset
from pydicom.uid import UID
from pydicom.pixel_data_handlers.util import convert_color_space , apply_color_lut
from openjpeg import decode
import array
import json
import logging
import importlib  
import boto3
import sagemaker
from sagemaker import get_execution_role
import io
import sys
import time
import os
import pandas as pd
from botocore.exceptions import ClientError
logging.basicConfig( level="INFO" )
# logging.basicConfig( level="DEBUG" )
from Api import MedicalImaging 
medicalimaging = MedicalImaging()

account_id = boto3.client("sts").get_caller_identity()["Account"]
session = sagemaker.session.Session()
region = boto3.Session().region_name
bucket = sagemaker.Session().default_bucket()
role = f"arn:aws:iam::{account_id}:role/HealthImagingImportJobRole"  ## use this role if you have deployed the CloudFormation template described above
# role = get_execution_role()                ## use this role if you want to use SageMaker Execution role to import image into AWS HealthImaging
print(f"S3 Bucket is {bucket}")
print(f"IAM role for image import job is {role}")

suffix = int(time.time())
ahi_input_prefix = f'idc_tutorial_lidc_images_{suffix}/'
ahi_output_prefix = f'idc_tutorial_importjobs_{suffix}/'
dicom_header_prefix = f'dicom_headers_{suffix}/'

print(f"S3 prefix for input images is {ahi_input_prefix}")
print(f"S3 prefix for import job outputs is {ahi_output_prefix}")

## Download Sample Data from IDC

We will use a subset of the IDC [Lung Image Database Consortium (LIDC)](https://wiki.cancerimagingarchive.net/pages/viewpage.action?pageId=1966254) dataset for the following experiments. You can select the lung nodule segmentations of CT scans from [the IDC portal](https://portal.imaging.datacommons.cancer.gov/explore/filters/?analysis_results_id=DICOM-LIDC-IDRI-Nodules); [this demo video](https://app.screencast.com/7QraNK83RLWTo) shows how to download the S3 paths of DICOM studies. To copy the sample images to your own staging S3 bucket, run the following CLI commands (about 2 minutes):

In [None]:
!aws s3 sync --quiet s3://idc-open-data/70e1a841-e82a-41db-84d6-4d6ab266569e/ s3://{bucket}/{ahi_input_prefix} 
!aws s3 sync --quiet s3://idc-open-data/173d42c3-4bc2-4cd6-aade-f931116576d2/ s3://{bucket}/{ahi_input_prefix} 
!aws s3 sync --quiet s3://idc-open-data/9fe8c918-31d4-4d7a-a06f-ee339f99ffeb/ s3://{bucket}/{ahi_input_prefix} 
!aws s3 sync --quiet s3://idc-open-data/f45a1e23-fd38-4750-bd73-e0c0f41eb744/ s3://{bucket}/{ahi_input_prefix} 
!aws s3 sync --quiet s3://idc-open-data/57dd6d89-d2c7-439c-b109-3bca4f647742/ s3://{bucket}/{ahi_input_prefix} 
!aws s3 sync --quiet s3://idc-open-data/85b44dd6-3a9e-441b-a371-4327e339f684/ s3://{bucket}/{ahi_input_prefix} 
!aws s3 sync --quiet s3://idc-open-data/b315752b-f2d5-4475-8ccf-76eb07399389/ s3://{bucket}/{ahi_input_prefix} 
!aws s3 sync --quiet s3://idc-open-data/98ad9818-baa3-4d58-ad63-5f502d3ff318/ s3://{bucket}/{ahi_input_prefix} 
!aws s3 sync --quiet s3://idc-open-data/54f1f220-1c97-4a46-8f65-d30923e629fe/ s3://{bucket}/{ahi_input_prefix} 
!aws s3 sync --quiet s3://idc-open-data/80ceded3-e2d3-4b67-90ad-4d195e1364d8/ s3://{bucket}/{ahi_input_prefix} 
!aws s3 sync --quiet s3://idc-open-data/8f0c16e8-9295-4e2b-8883-d52a0e4bc555/ s3://{bucket}/{ahi_input_prefix} 
!aws s3 sync --quiet s3://idc-open-data/9abe335e-2df9-46c2-9ef2-a16c4ee5693d/ s3://{bucket}/{ahi_input_prefix} 
!aws s3 sync --quiet s3://idc-open-data/af3d99bf-e941-4fb1-9854-38b7115d0880/ s3://{bucket}/{ahi_input_prefix} 
!aws s3 sync --quiet s3://idc-open-data/525ab988-fd90-46fc-9a81-ebce58e7acac/ s3://{bucket}/{ahi_input_prefix} 
!aws s3 sync --quiet s3://idc-open-data/2bbb3397-bcc2-4885-9acc-adf58c5defd0/ s3://{bucket}/{ahi_input_prefix} 
!aws s3 sync --quiet s3://idc-open-data/c31bf2ea-b506-4a11-8aac-d001331dd257/ s3://{bucket}/{ahi_input_prefix} 
!aws s3 sync --quiet s3://idc-open-data/417c3a82-87e8-4b23-a55c-406e7aa67f39/ s3://{bucket}/{ahi_input_prefix} 
!aws s3 sync --quiet s3://idc-open-data/bb965e17-15ca-42f8-9ae6-8b9990fe059d/ s3://{bucket}/{ahi_input_prefix} 
!aws s3 sync --quiet s3://idc-open-data/30ca7460-9213-44d0-9998-d8ef92ea1a3d/ s3://{bucket}/{ahi_input_prefix} 
!aws s3 sync --quiet s3://idc-open-data/2072ddb8-020b-4137-951b-b45148fc62e1/ s3://{bucket}/{ahi_input_prefix} 
!aws s3 sync --quiet s3://idc-open-data/4f0bd76f-485d-4ca1-8458-59844595ba8b/ s3://{bucket}/{ahi_input_prefix} 
!aws s3 sync --quiet s3://idc-open-data/739a1b6c-c11f-480b-a35e-7265c7c7406a/ s3://{bucket}/{ahi_input_prefix} 
!aws s3 sync --quiet s3://idc-open-data/8f2399ec-b1b7-427f-b5d1-a1b09a6b4f70/ s3://{bucket}/{ahi_input_prefix} 
!aws s3 sync --quiet s3://idc-open-data/417bcd53-295e-46ac-8cf4-ec2aa55949a6/ s3://{bucket}/{ahi_input_prefix} 
!aws s3 sync --quiet s3://idc-open-data/e0459c0c-8269-4044-ada4-6d96359dfeee/ s3://{bucket}/{ahi_input_prefix} 
!aws s3 sync --quiet s3://idc-open-data/cf86839f-5e89-4aab-927b-103c658290e8/ s3://{bucket}/{ahi_input_prefix} 
!aws s3 sync --quiet s3://idc-open-data/6949b1e0-7a1c-416b-87f6-d682398c8e1b/ s3://{bucket}/{ahi_input_prefix} 
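
The commands above differ only in the source folder UUID, so the same copy can be driven from a list. The following is a minimal sketch rather than part of the original notebook: `build_sync_commands` and `run_all` are hypothetical helpers, the bucket and prefix values are placeholders for the `bucket` and `ahi_input_prefix` variables defined earlier, and running the commands assumes the AWS CLI is available on the PATH.

```python
import subprocess

IDC_BUCKET = "idc-open-data"  # public source bucket used in the commands above


def build_sync_commands(folder_uuids, bucket, prefix):
    """Return one `aws s3 sync` argument list per source folder (hypothetical helper)."""
    return [
        ["aws", "s3", "sync", "--quiet",
         f"s3://{IDC_BUCKET}/{uuid}/", f"s3://{bucket}/{prefix}"]
        for uuid in folder_uuids
    ]


def run_all(commands):
    """Run each command in turn and stop at the first failure."""
    for cmd in commands:
        subprocess.run(cmd, check=True)


# Two of the folders listed above; extend the list with the remaining UUIDs.
folder_uuids = [
    "70e1a841-e82a-41db-84d6-4d6ab266569e",
    "173d42c3-4bc2-4cd6-aade-f931116576d2",
]

# Placeholder destination values; in the notebook use `bucket` and `ahi_input_prefix`.
commands = build_sync_commands(folder_uuids, "my-staging-bucket", "idc_tutorial_lidc_images_0/")
print(len(commands))
```

Calling `run_all(commands)` would then perform the copies; it is left out of the sketch so the block has no side effects when run.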

## Create an AWS HealthImaging (AHI) datastore

In [None]:
DatastoreName = "IDCWorkshopDataStore"

datastoreList = medicalimaging.listDatastores()

res_createstore = None
for datastore in datastoreList["datastoreSummaries"]:
    if datastore["datastoreName"] == DatastoreName:
        res_createstore = datastore
        break
if res_createstore is None:        
    res_createstore = medicalimaging.createDatastore(DatastoreName)

datastoreId = res_createstore['datastoreId']
res_getstore = medicalimaging.getDatastore(res_createstore['datastoreId'])    
status = res_getstore['datastoreProperties']['datastoreStatus']
while status!='ACTIVE':
    time.sleep(30)
    res_getstore = medicalimaging.getDatastore(res_createstore['datastoreId'])    
    status = res_getstore['datastoreProperties']['datastoreStatus']
    print(status)
print(f"datastoreId: {datastoreId}; status: {status}")

## Import DICOM studies from staging S3 buckets to AHI datastore

A unique import JobId is generated for each combination of datastoreId, input S3 path, and output S3 path; submitting the same combination again returns the existing import JobId. Importing the 9 sample DICOM studies takes about 15 minutes.

In [None]:
res_startimportjob = medicalimaging.startImportJob(
    res_createstore['datastoreId'],
    role,
    f"s3://{bucket}/{ahi_input_prefix}", 
    f"s3://{bucket}/{ahi_output_prefix}"
)

jobId = res_startimportjob['jobId']
jobstatus = medicalimaging.getImportJob(datastoreId, jobId)['jobProperties']['jobStatus']
while jobstatus not in ['COMPLETED', 'FAILED']:
    time.sleep(30)
    jobstatus = medicalimaging.getImportJob(datastoreId, jobId)['jobProperties']['jobStatus']
print(f"jobstatus is {jobstatus}")
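
The polling loops in this notebook wait indefinitely, which can leave a cell running if a job never reaches a terminal state. One possible variant adds a timeout. This is a sketch only: `wait_for_status` is a hypothetical helper, and the status source here is a stand-in iterator rather than a real AWS HealthImaging call.

```python
import time


def wait_for_status(get_status, done_states, poll_seconds=30,
                    timeout_seconds=3600, sleep=time.sleep):
    """Poll get_status() until it returns a value in done_states or the timeout elapses."""
    waited = 0
    status = get_status()
    while status not in done_states:
        if waited >= timeout_seconds:
            raise TimeoutError(f"status still {status!r} after {waited} seconds")
        sleep(poll_seconds)
        waited += poll_seconds
        status = get_status()
    return status


# Stand-in status source for illustration. In the notebook, get_status could be
# a lambda wrapping medicalimaging.getImportJob(datastoreId, jobId).
fake_statuses = iter(["SUBMITTED", "IN_PROGRESS", "COMPLETED"])
final = wait_for_status(
    lambda: next(fake_statuses),
    {"COMPLETED", "FAILED"},
    sleep=lambda seconds: None,  # skip real sleeping in this demo
)
print(final)
```

The `sleep` parameter exists only so the demo (and any test) can run without waiting; the default uses `time.sleep`.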

In [None]:
imageSetIds = {}
s3=boto3.client('s3')
try:
    response = s3.head_object(Bucket=bucket, Key=f"{ahi_output_prefix}{datastoreId}-DicomImport-{jobId}/job-output-manifest.json")
    if response['ResponseMetadata']['HTTPStatusCode'] == 200:
        data = s3.get_object(Bucket=bucket, Key=f"{ahi_output_prefix}{datastoreId}-DicomImport-{jobId}/SUCCESS/success.ndjson")
        contents = data['Body'].read().decode("utf-8")
        for l in contents.splitlines():
            isid = json.loads(l)['importResponse']['imageSetId']
            if isid in imageSetIds:
                imageSetIds[isid]+=1
            else:
                imageSetIds[isid]=1
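except ClientError as err:
    # The job output is missing or unreadable; report it instead of failing silently
    logging.warning("Could not read import job output: %s", err)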


print("Number of image sets imported: {}".format(len(imageSetIds)))
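
The counting step can be tried in isolation on a small NDJSON string. This sketch assumes only the `importResponse.imageSetId` field used above; the sample records and IDs are made up, and real `success.ndjson` lines carry additional fields.

```python
import json
from collections import Counter


def count_image_sets(ndjson_text):
    """Count how many NDJSON records refer to each image set ID."""
    counts = Counter()
    for line in ndjson_text.splitlines():
        if not line.strip():
            continue  # tolerate blank lines
        record = json.loads(line)
        counts[record["importResponse"]["imageSetId"]] += 1
    return counts


# Made-up sample: two records for one image set, one record for another.
sample = "\n".join([
    json.dumps({"importResponse": {"imageSetId": "imageset-a"}}),
    json.dumps({"importResponse": {"imageSetId": "imageset-a"}}),
    json.dumps({"importResponse": {"imageSetId": "imageset-b"}}),
])

counts = count_image_sets(sample)
print(len(counts), counts["imageset-a"])
```

`Counter` replaces the explicit `if isid in imageSetIds` branch, and `len(counts)` gives the number of distinct image sets, matching the value printed above.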

## Save DICOM Header JSON to S3

The DICOM header includes metadata for a given ImageSetId, which is equivalent to a DICOM series. The DICOM header can be retrieved through the native AHI API as a nested JSON object. We need to parse the JSON object and save each level of information (patient, study, series, instance) into a separate S3 folder.

In [None]:
for s in imageSetIds.keys():
    json_dicom_header = medicalimaging.getMetadata(datastoreId, s)
    patient = json_dicom_header['Patient']['DICOM']
    patient['datastoreid'] = datastoreId
    patient['imagesetid'] = s
    PatientID = patient["PatientID"]
    s3.put_object(
        Body=json.dumps(patient),
        Bucket=bucket,
        Key=f'{dicom_header_prefix}json/patient/{s}.json'
    )
    study=json_dicom_header['Study']['DICOM']
    study['datastoreid'] = datastoreId
    study['imagesetid'] = s
    study['PatientID'] = PatientID
    StudyInstanceUID = study['StudyInstanceUID']
    s3.put_object(
        Body=json.dumps(study),
        Bucket=bucket,
        Key=f'{dicom_header_prefix}json/study/{s}.json'
    )
    for se in list(json_dicom_header['Study']['Series'].keys()):
        series = json_dicom_header['Study']['Series'][se]['DICOM']
        series['datastoreid'] = datastoreId
        series['imagesetid'] = s
        series['PatientID'] = PatientID
        series['StudyInstanceUID'] = StudyInstanceUID
        s3.put_object(
            Body=json.dumps(series),
            Bucket=bucket,
            Key=f'{dicom_header_prefix}json/series/{s}.json'
        )
        for i in list(json_dicom_header['Study']['Series'][se]['Instances']):
            instance = json_dicom_header['Study']['Series'][se]['Instances'][i]['DICOM']
            instance['datastoreid'] = datastoreId
            instance['imagesetid'] = s
            instance['PatientID'] = PatientID
            instance['StudyInstanceUID'] = StudyInstanceUID
            instance['SeriesInstanceUID'] = se
            instance['DICOMVRs'] = json_dicom_header['Study']['Series'][se]['Instances'][i]['DICOMVRs']
            instance['ImageFrames'] = json_dicom_header['Study']['Series'][se]['Instances'][i]['ImageFrames']
            s3.put_object(
                Body=json.dumps(instance),
                Bucket=bucket,
                Key=f'{dicom_header_prefix}json/instances/{i}.json'
            )
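
The flattening logic above can be separated from the S3 writes and checked on a toy header. This is a sketch under assumptions: `flatten_header` is a hypothetical helper, and the toy dictionary only mimics the nesting used in the loop above (`Patient`, `Study`, `Series`, `Instances`) with made-up values.

```python
def flatten_header(header, datastore_id, image_set_id):
    """Yield (level, record) pairs for the patient, study, series, and instance levels."""
    ids = {"datastoreid": datastore_id, "imagesetid": image_set_id}
    patient = {**header["Patient"]["DICOM"], **ids}
    yield "patient", patient

    # Lower levels carry the identifiers of their parents, as in the loop above.
    ids["PatientID"] = patient["PatientID"]
    study = {**header["Study"]["DICOM"], **ids}
    yield "study", study

    ids["StudyInstanceUID"] = study["StudyInstanceUID"]
    for series_uid, series in header["Study"]["Series"].items():
        yield "series", {**series["DICOM"], **ids}
        for instance in series["Instances"].values():
            yield "instances", {
                **instance["DICOM"],
                **ids,
                "SeriesInstanceUID": series_uid,
                "DICOMVRs": instance["DICOMVRs"],
                "ImageFrames": instance["ImageFrames"],
            }


# Toy header with made-up values, for illustration only.
toy = {
    "Patient": {"DICOM": {"PatientID": "P1"}},
    "Study": {
        "DICOM": {"StudyInstanceUID": "1.2.3"},
        "Series": {
            "1.2.3.4": {
                "DICOM": {"Modality": "CT"},
                "Instances": {
                    "1.2.3.4.5": {
                        "DICOM": {"Rows": 512},
                        "DICOMVRs": {},
                        "ImageFrames": [{"ID": "frame-1"}],
                    },
                },
            },
        },
    },
}

records = list(flatten_header(toy, "datastore-x", "imageset-a"))
print([level for level, _ in records])
```

Each yielded record could then be passed to `s3.put_object` under the matching `json/<level>/` prefix.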

## Create Glue Data Catalogs using Crawler

An [AWS Glue crawler can be used to produce a data catalog](https://docs.aws.amazon.com/glue/latest/dg/catalog-and-crawler.html), which Amazon Athena then uses for interactive SQL queries.

In [None]:
glue_client = boto3.client('glue')
glue_crawler_name = 'DICOMHeaderJSONCrawler'
glue_database_name = 'idc_dicom_headers'

try:
    glue_client.get_crawler(Name=glue_crawler_name)
except ClientError as err:
    logging.info( "Could not get crawler Here's why: %s: %s", err.response['Error']['Code'], err.response['Error']['Message'])
    logging.info("Creating Crawler")
    glue_client.create_crawler(
        Name=glue_crawler_name,
        Role=get_execution_role(),
        DatabaseName=glue_database_name,
        Targets={'S3Targets': [{'Path': f"s3://{bucket}/{dicom_header_prefix}json/"}]})

try:
    logging.info("Starting Crawler")
    glue_client.start_crawler(Name=glue_crawler_name)
except ClientError as err:
    logging.error(
        "Couldn't start crawler %s. Here's why: %s: %s", glue_crawler_name,
        err.response['Error']['Code'], err.response['Error']['Message'])
    raise

jobstatus = glue_client.get_crawler(Name=glue_crawler_name)['Crawler']['State']
while jobstatus != 'READY':
    time.sleep(30)
    jobstatus = glue_client.get_crawler(Name=glue_crawler_name)['Crawler']['State']
print(f"crawler jobstatus is {jobstatus}")

## Query the DICOM header using Amazon Athena

In [None]:
import requests
from pyathena import connect

# This code uses the connect() function from the pyathena library to establish a connection to Amazon Athena. 
# s3_staging_dir: The S3 URL to the staging directory for query results.
# region_name: The AWS region where the Athena resources are located.
# work_group: The name of the workgroup to use for the connection.
# schema_name: The name of the schema within Athena that you want to interact with.
conn = connect(
    s3_staging_dir=f's3://{bucket}/athena-results/',
    region_name=region,
    work_group='primary', #REPLACE WORKGROUP NAME IF ANY ERROR
    schema_name=glue_database_name,
)

print(conn)

In [None]:
## count image sets per study and modality
sql = "SELECT distinct series.studyinstanceuid, series.modality, count(series.imagesetid) as ImageSetCount from series GROUP BY 1,2 ORDER BY 3 DESC"
try:        
    # Execute the SQL query using pd.read_sql() and the established connection (conn)
    athen_result_df = pd.read_sql(sql, conn)

except pd.io.sql.DatabaseError as e:
    # Handle the exception if there's an error while executing the SQL query
    logging.error( "SQL query failed: " + sql + " Database error " + str(e) )

athen_result_df
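
To see what this aggregation returns without an Athena round trip, the same grouping can be run locally. The sketch below uses Python's built-in `sqlite3` on made-up rows; it is not Athena, whose SQL dialect differs, and the table contents are purely illustrative.

```python
import sqlite3

# In-memory database standing in for the Glue/Athena `series` table.
demo_conn = sqlite3.connect(":memory:")
demo_conn.execute(
    "CREATE TABLE series (studyinstanceuid TEXT, modality TEXT, imagesetid TEXT)"
)
demo_conn.executemany(
    "INSERT INTO series VALUES (?, ?, ?)",
    [
        ("study-1", "CT", "imageset-a"),
        ("study-1", "SEG", "imageset-b"),
        ("study-1", "SEG", "imageset-c"),
        ("study-2", "CT", "imageset-d"),
    ],
)

# Count image sets per (study, modality), largest groups first.
rows = demo_conn.execute(
    "SELECT studyinstanceuid, modality, COUNT(imagesetid) AS imagesetcount "
    "FROM series "
    "GROUP BY studyinstanceuid, modality "
    "ORDER BY imagesetcount DESC, studyinstanceuid, modality"
).fetchall()

for row in rows:
    print(row)
demo_conn.close()
```

Because `GROUP BY` already yields one row per (study, modality) pair, the `DISTINCT` in the Athena query above does not change the result.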

In [None]:
## count image frames (first frame of each instance) per image set
sql = "SELECT distinct series.imagesetid, series.modality, count(element_at(ImageFrames, 1).id) as ImageFrameCounts from series join instances on series.imagesetid=instances.imagesetid GROUP BY 1,2 ORDER BY 3 DESC"
try:        
    # Execute the SQL query using pd.read_sql() and the established connection (conn)
    athen_result_df = pd.read_sql(sql, conn)

except pd.io.sql.DatabaseError as e:
    # Handle the exception if there's an error while executing the SQL query
    logging.error( "SQL query failed: " + sql + " Database error " + str(e) )

athen_result_df

## Image Visualization

We will use the open-source library [itkwidgets](https://github.com/InsightSoftwareConsortium/itkwidgets) to visualize 3D CT scans, as described in this [blog post](https://aws.amazon.com/blogs/machine-learning/share-medical-image-research-on-amazon-sagemaker-studio-lab-for-free/). First, install [imjoy-jupyter-extension](https://github.com/imjoy-team/imjoy-jupyter-extension) on Studio:

![install imjoy](img/imjoy.png)

In [None]:
import itk
from itkwidgets import view

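# basicConfig() was already called above, so calling it again has no effect; set the root level directly
logging.getLogger().setLevel(logging.ERROR)
# logging.getLogger().setLevel(logging.INFO)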
logging.getLogger('AHItoDICOMInterface').setLevel(logging.CRITICAL)


import warnings
warnings.filterwarnings('ignore')

from AHItoDICOMInterface.AHItoDICOM import AHItoDICOM
helper = AHItoDICOM()
imagesets = helper.DICOMizeImageSet(datastore_id=datastoreId , image_set_id=athen_result_df['imagesetid'][0])

In [None]:
img = itk.image_view_from_array([ins.pixel_array for ins in imagesets])
viewer = view(img).set_image_gradient_opacity(0.5)

## Clean Up

In [None]:
## S3 bucket
s3 = boto3.client('s3')
paginator = s3.get_paginator('list_objects_v2')
for prefix in [dicom_header_prefix, ahi_input_prefix, ahi_output_prefix]:
    try:
        # Paginate so that prefixes holding more than 1,000 objects are fully deleted;
        # an empty prefix simply yields no 'Contents'
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
            for obj in page.get('Contents', []):
                s3.delete_object(Bucket=bucket, Key=obj['Key'])
    except ClientError as err:
        logging.error(
            "Couldn't delete S3 folder %s. Here's why: %s: %s", prefix, err.response['Error']['Code'], err.response['Error']['Message'])
        raise


## Glue Crawler and Database
try:
    glue_client.delete_database(Name=glue_database_name)
    glue_client.delete_crawler(Name=glue_crawler_name)
except ClientError as err:
    logging.error(
        "Couldn't delete database %s and crawler %s. Here's why: %s: %s", glue_database_name, glue_crawler_name, err.response['Error']['Code'], err.response['Error']['Message'])
    raise

## AHI Datastore images
for s in imageSetIds.keys():
    medicalimaging.deleteImageSet(datastoreId, s)
# medicalimaging.deleteDatastore(datastoreId)