In [4]:
 %%sh
pip install -q --upgrade pip
pip install -q boto3 tqdm nibabel pydicom numpy pylibjpeg-openjpeg #pathlib2 monai 



In [5]:
import pydicom
from pydicom.sequence import Sequence
from pydicom import Dataset , DataElement 
from pydicom.dataset import FileDataset, FileMetaDataset
from pydicom.uid import UID
from pydicom.pixel_data_handlers.util import convert_color_space , apply_color_lut
from openjpeg import decode
import array
import json
import logging
import importlib  
import boto3
import io
import sys
import time
import os
import pandas as pd
from botocore.exceptions import ClientError
# Notebook-wide logging level; use level="DEBUG" instead for verbose output.
logging.basicConfig(level="INFO")

# AWS_DATA_PATH is pointed at ./src/ before src.Api is imported.
# NOTE(review): presumably this lets botocore find a service model shipped in
# ./src -- confirm against src/Api.py. Keep this ordering.
os.environ["AWS_DATA_PATH"] = f"{os.getcwd()}/src/"
from src.Api import MedicalImaging

# Service clients shared by the cells below.
s3 = boto3.client("s3")
iam = boto3.client("iam")

# Account id and region of the credentials this kernel is running with.
sts_client = boto3.client("sts")
account_id = sts_client.get_caller_identity()["Account"]
default_session = boto3.Session()
region = default_session.region_name

# Project wrapper used for all datastore / import-job / metadata calls.
medicalimaging = MedicalImaging()

In [6]:
# Create the input/output buckets and the IAM role + policy that the import job
# assumes. The cell is written to be safely re-runnable.
InputBucketName = "multimodal-hcls-synthea-coherent-data-set"
OutputBucketName = "ahli-import-job-results-workshop"

# Outside us-east-1, CreateBucket must carry the region as a LocationConstraint;
# in us-east-1 the constraint must be omitted.
bucket_region = s3.meta.region_name
for bucket_name in (InputBucketName, OutputBucketName):
    create_bucket_kwargs = {"Bucket": bucket_name}
    if bucket_region and bucket_region != "us-east-1":
        create_bucket_kwargs["CreateBucketConfiguration"] = {
            "LocationConstraint": bucket_region
        }
    try:
        s3.create_bucket(**create_bucket_kwargs)
    except ClientError as err:
        # Re-running the cell is fine when we already own the bucket; any other
        # failure (e.g. the name is taken by another account) must surface.
        if err.response["Error"]["Code"] != "BucketAlreadyOwnedByYou":
            raise

# Trust policy: lets the medical-imaging service assume the role.
assume_role_policy_document = json.dumps({
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": [
                    "medical-imaging.amazonaws.com"
                ]
            },
            "Action": "sts:AssumeRole"
        }
    ]
})

# Permissions policy: read from the input bucket, write to the output bucket,
# and call the listed medical-imaging actions.
import_policy_document = json.dumps({
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "s3:ListBucket",
                "s3:GetEncryptionConfiguration"
            ],
            "Resource": [
                "arn:aws:s3:::{}".format(InputBucketName),
                "arn:aws:s3:::{}".format(OutputBucketName)
            ],
            "Effect": "Allow"
        },
        {
            "Action": [
                "s3:GetObject"
            ],
            "Resource": [
                "arn:aws:s3:::{}/*".format(InputBucketName)
            ],
            "Effect": "Allow"
        },
        {
            "Action": [
                "s3:PutObject"
            ],
            "Resource": [
                "arn:aws:s3:::{}/*".format(OutputBucketName)
            ],
            "Effect": "Allow"
        },
        {
            "Action": [
                "medical-imaging:CreateDatastore",
                "medical-imaging:GetDatastore",
                "medical-imaging:ListDatastores",
                "medical-imaging:UpdateDatastore",
                "medical-imaging:StartDICOMImportJob",
                "medical-imaging:GetDICOMImportJob",
                "medical-imaging:UpdateDICOMImportJob",
                # NOTE(review): "Metdata" looks like a typo for "Metadata" --
                # confirm the exact action name against the service's IAM reference.
                "medical-imaging:GetDICOMStudyMetdata",
                "medical-imaging:GetImageFrame"
            ],
            "Resource": "*",
            "Effect": "Allow"
        }
    ]
})

# Reuse the managed policy if it already exists; create it only on NoSuchEntity.
# NOTE: an existing policy is reused as-is; its document is not updated here.
try:
    respons_createpolicy = iam.get_policy(
        PolicyArn=f"arn:aws:iam::{account_id}:policy/AHLI-Service-Import-Policy"
    )
except ClientError as err:
    if err.response["Error"]["Code"] != "NoSuchEntity":
        raise
    respons_createpolicy = iam.create_policy(
        PolicyName='AHLI-Service-Import-Policy',
        PolicyDocument=import_policy_document,
    )

# Same get-or-create pattern for the role.
try:
    response_createrole = iam.get_role(RoleName='AHLI-Service-Import-Role')
except ClientError as err:
    if err.response["Error"]["Code"] != "NoSuchEntity":
        raise
    response_createrole = iam.create_role(
        RoleName='AHLI-Service-Import-Role',
        AssumeRolePolicyDocument=assume_role_policy_document
    )

# Always attach: covers the case where the role pre-existed but the policy was
# only just created. Attaching an already-attached policy is harmless.
response = iam.attach_role_policy(
    RoleName=response_createrole['Role']['RoleName'],
    PolicyArn=respons_createpolicy['Policy']['Arn']
)
print(response_createrole['Role']['RoleName'])
print(respons_createpolicy['Policy']['Arn'])

AHLI-Service-Import-Role
arn:aws:iam::659535263284:policy/AHLI-Service-Import-Policy


In [7]:
# Find (or create) the datastore named DatastoreName and wait until it is ACTIVE.
DatastoreName = "CoherentDataStoreDemo"
datastoreList = medicalimaging.listDatastores()

# Statuses from which a datastore will never become ACTIVE.
DATASTORE_DEAD_STATUSES = {"CREATE_FAILED", "DELETING", "DELETED"}

# Reuse a datastore with the same name, ignoring ones that are failed or being
# deleted so a fresh one gets created instead of waiting on them forever.
res_createstore = next(
    (
        candidate
        for candidate in datastoreList["datastoreSummaries"]
        if candidate["datastoreName"] == DatastoreName
        and candidate.get("datastoreStatus") not in DATASTORE_DEAD_STATUSES
    ),
    None,
)
if res_createstore is None:
    res_createstore = medicalimaging.createDatastore(DatastoreName)

datastoreId = res_createstore['datastoreId']
res_getstore = medicalimaging.getDatastore(datastoreId)
status = res_getstore['datastoreProperties']['datastoreStatus']
while status != 'ACTIVE':
    # Fail fast instead of polling forever on a status that cannot recover.
    if status in DATASTORE_DEAD_STATUSES:
        raise RuntimeError(
            f"datastore {datastoreId} is in status {status} and will not become ACTIVE"
        )
    time.sleep(30)
    res_getstore = medicalimaging.getDatastore(datastoreId)
    status = res_getstore['datastoreProperties']['datastoreStatus']
    print(status)
print(f"datastoreId: {datastoreId}; status: {status}")

INFO:root:List Datastores  : 179.22043800354004 ms
INFO:root:Get Datastore  : 108.44206809997559 ms


datastoreId: 14950750aa7c383498ab67b365e22bb8; status: ACTIVE


In [8]:
# Start the DICOM import from the input bucket and poll until the job finishes.
res_startimportjob = medicalimaging.startImportJob(
    res_createstore['datastoreId'],
    response_createrole['Role']['Arn'],
    's3://'+InputBucketName+'/unzipped/dicom/', 
    's3://'+OutputBucketName+'/coherent/'
)

# Both COMPLETED and FAILED end the job; polling only for COMPLETED would spin
# forever on a failed import.
IMPORT_JOB_FINAL_STATUSES = ('COMPLETED', 'FAILED')

jobstatus = medicalimaging.getImportJob(datastoreId, res_startimportjob['jobId'])['jobProperties']['jobStatus']
while jobstatus not in IMPORT_JOB_FINAL_STATUSES:
    time.sleep(30)
    jobstatus = medicalimaging.getImportJob(datastoreId, res_startimportjob['jobId'])['jobProperties']['jobStatus']
print(f"jobstatus is {jobstatus}")

# Stop the notebook here on failure so later cells do not run on missing output.
if jobstatus != 'COMPLETED':
    raise RuntimeError(
        f"import job {res_startimportjob['jobId']} ended with status {jobstatus}"
    )

INFO:root:Start Import Job  : 498.69656562805176 ms
INFO:root:Get Import Job  : 166.4907932281494 ms


jobstatus is COMPLETED


In [9]:
# Build {imageSetId: number of successfully imported entries} from the import
# job's SUCCESS/success.ndjson in the output bucket.
imageSetIds = {}
job_output_prefix = f"coherent/{res_createstore['datastoreId']}-DicomImport-{res_startimportjob['jobId']}"
try:
    # The manifest is only used as an existence check before reading results.
    response = s3.head_object(Bucket=OutputBucketName, Key=f"{job_output_prefix}/job-output-manifest.json")
    if response['ResponseMetadata']['HTTPStatusCode'] == 200:
        data = s3.get_object(Bucket=OutputBucketName, Key=f"{job_output_prefix}/SUCCESS/success.ndjson")
        contents = data['Body'].read().decode("utf-8")
        for record_line in contents.splitlines():
            if not record_line.strip():
                continue  # tolerate blank lines in the ndjson
            isid = json.loads(record_line)['importResponse']['imageSetId']
            imageSetIds[isid] = imageSetIds.get(isid, 0) + 1
except ClientError as err:
    # Still best-effort (the cell goes on to report 0 image sets), but say why
    # instead of silently swallowing the error.
    logging.warning("Could not read import job output under %s: %s", job_output_prefix, err)


print("number of image sets: {}".format(len(imageSetIds)))

number of image sets: 298


In [10]:
# Persist the datastore id and the image-set dict with IPython's %store magic;
# a later kernel can bring them back with `%store -r`.
%store datastoreId
%store imageSetIds

Stored 'datastoreId' (str)
Stored 'imageSetIds' (dict)


## (Optional) Save JSON to S3

In [9]:
# Write the patient / study / series / instance DICOM headers of every image set
# to the output bucket as JSON objects.
for image_set_id in imageSetIds.keys():
    json_dicom_header = medicalimaging.getMetadata(datastoreId, image_set_id)

    # Patient and study headers are keyed by image set id and tagged with it.
    patient = json_dicom_header['Patient']['DICOM']
    patient['imagesetid'] = image_set_id
    s3.put_object(
        Body=json.dumps(patient),
        Bucket=OutputBucketName,
        Key='dicom_header/json/patient/{}'.format(image_set_id)
    )
    study = json_dicom_header['Study']['DICOM']
    study['imagesetid'] = image_set_id
    s3.put_object(
        Body=json.dumps(study),
        Bucket=OutputBucketName,
        Key='dicom_header/json/study/{}'.format(image_set_id)
    )

    for series_id, series_node in json_dicom_header['Study']['Series'].items():
        # Key by the series id: keying by the image set id made every series of
        # an image set overwrite the previous one.
        s3.put_object(
            Body=json.dumps(series_node['DICOM']),
            Bucket=OutputBucketName,
            Key='dicom_header/json/series/{}'.format(series_id)
        )
        for instance_id, instance_node in series_node['Instances'].items():
            # Instances get their own prefix and per-instance key; previously
            # they were written over the series object itself.
            s3.put_object(
                Body=json.dumps(instance_node),
                Bucket=OutputBucketName,
                Key='dicom_header/json/instance/{}'.format(instance_id)
            )

INFO:root:Metadata fetch  : 248.78931045532227 ms
INFO:root:Metadata fetch  : 217.93746948242188 ms
INFO:root:Metadata fetch  : 263.5033130645752 ms
INFO:root:Metadata fetch  : 228.8510799407959 ms
INFO:root:Metadata fetch  : 224.68209266662598 ms
INFO:root:Metadata fetch  : 216.078519821167 ms
INFO:root:Metadata fetch  : 206.32648468017578 ms
INFO:root:Metadata fetch  : 214.79153633117676 ms
INFO:root:Metadata fetch  : 186.0511302947998 ms
INFO:root:Metadata fetch  : 243.12877655029297 ms
INFO:root:Metadata fetch  : 282.14311599731445 ms
INFO:root:Metadata fetch  : 227.65445709228516 ms
INFO:root:Metadata fetch  : 245.194673538208 ms
INFO:root:Metadata fetch  : 211.95077896118164 ms
INFO:root:Metadata fetch  : 225.4354953765869 ms
INFO:root:Metadata fetch  : 195.9846019744873 ms
INFO:root:Metadata fetch  : 213.026762008667 ms
INFO:root:Metadata fetch  : 226.2578010559082 ms
INFO:root:Metadata fetch  : 230.38601875305176 ms
INFO:root:Metadata fetch  : 178.40170860290527 ms
INFO:root:Me

## Save Parquet to S3

In [111]:
# Flatten the study / series / instance headers of every image set into one-row
# DataFrames and write them as gzip-compressed parquet files under ./dicomheader
# (uploaded to S3 in a later cell).

# to_parquet does not create missing directories, so make sure they exist.
for header_level in ('study', 'series', 'instance'):
    os.makedirs(os.path.join('dicomheader', header_level), exist_ok=True)

for image_set_id in imageSetIds.keys():
    json_dicom_header = medicalimaging.getMetadata(datastoreId, image_set_id)

    dfstudy = pd.json_normalize(json_dicom_header, max_level=2)
    # The last column is dropped, as before.
    # NOTE(review): presumably that column is the nested Study.Series -- confirm.
    dfstudy[dfstudy.columns[:-1]].to_parquet('dicomheader/study/{}.parquet.gzip'.format(image_set_id), compression='gzip')

    for series_id, series_node in json_dicom_header['Study']['Series'].items():
        dfseries = pd.json_normalize(series_node, max_level=2)
        # Keep only series-level columns; copy before adding a column so pandas
        # does not warn about assigning on a slice.
        series_columns = ['Instances' not in column for column in dfseries.columns]
        dfseries = dfseries.loc[:, series_columns].copy()
        dfseries['ImageSetID'] = json_dicom_header['ImageSetID']
        dfseries.to_parquet('dicomheader/series/{}.parquet.gzip'.format(series_id), compression='gzip')

        for instance_id, instance_node in series_node['Instances'].items():
            dfinstance = pd.json_normalize(instance_node, max_level=2)
            dfinstance['ImageSetID'] = json_dicom_header['ImageSetID']
            # Name the file after the instance: naming it after the series made
            # every instance of a series overwrite the previous one.
            dfinstance.to_parquet('dicomheader/instance/{}.parquet.gzip'.format(instance_id), compression='gzip')
        

INFO:root:Metadata fetch  : 306.6284656524658 ms
INFO:root:Metadata fetch  : 192.8119659423828 ms
INFO:root:Metadata fetch  : 233.66928100585938 ms
INFO:root:Metadata fetch  : 345.07155418395996 ms
INFO:root:Metadata fetch  : 295.3059673309326 ms
INFO:root:Metadata fetch  : 251.16991996765137 ms
INFO:root:Metadata fetch  : 223.10543060302734 ms
INFO:root:Metadata fetch  : 298.1562614440918 ms
INFO:root:Metadata fetch  : 236.17196083068848 ms
INFO:root:Metadata fetch  : 243.20650100708008 ms
INFO:root:Metadata fetch  : 578.5226821899414 ms
INFO:root:Metadata fetch  : 220.12734413146973 ms
INFO:root:Metadata fetch  : 196.2592601776123 ms
INFO:root:Metadata fetch  : 268.4319019317627 ms
INFO:root:Metadata fetch  : 215.01636505126953 ms
INFO:root:Metadata fetch  : 178.2703399658203 ms
INFO:root:Metadata fetch  : 212.42356300354004 ms
INFO:root:Metadata fetch  : 207.2618007659912 ms
INFO:root:Metadata fetch  : 214.339017868042 ms
INFO:root:Metadata fetch  : 259.479284286499 ms
INFO:root:Met

In [112]:
import sagemaker
import time
from time import gmtime, strftime

# SageMaker context for this kernel: execution role, session, region, account
# and the session's default bucket.
role = sagemaker.get_execution_role()
sagemaker_session = sagemaker.Session()
region = sagemaker_session.boto_region_name
boto_session = boto3.Session(region_name=region)
caller_identity = boto3.client('sts').get_caller_identity()
account_id = caller_identity.get('Account')
default_bucket = sagemaker_session.default_bucket()

# Upload each local parquet directory to its matching prefix, in the same order
# as before: study, series, instance.
for header_level in ('study', 'series', 'instance'):
    uploaded_uri = sagemaker_session.upload_data(
        path=f'./dicomheader/{header_level}',
        bucket='synthea-coherent-data',
        key_prefix=f'dicom_headers/{header_level}/',
    )

# Last expression of the cell: shows the URI returned by the final upload.
uploaded_uri

's3://synthea-coherent-data/dicom_headers/instance/'

In [5]:
whos

Variable                              Type        Data/Info
-----------------------------------------------------------
DiscoveryWidget                       type        <class '__main__.DiscoveryWidget'>
Loader                                ABCMeta     <class 'importlib.abc.Loader'>
MetaPathFinder                        ABCMeta     <class 'importlib.abc.MetaPathFinder'>
ModuleSpec                            type        <class '_frozen_importlib.ModuleSpec'>
PACKAGE_DIRS                          type        <class '__main__.PACKAGE_DIRS'>
SageMakerDataWranglerPandasFinder     ABCMeta     <class '__main__.SageMake<...>ataWranglerPandasFinder'>
display                               function    <function display at 0x7f89fd75daf0>
has_pandas_finder                     function    <function has_pandas_finder at 0x7f89f85eeb80>
is_sagemaker_datawrangler_installed   function    <function is_sagemaker_da<...>talled at 0x7f89f85eeaf0>
os                                    module      <module 

## Clean Up

In [None]:
# try:
#     s3res = boto3.resource('s3')
#     bucket = s3res.Bucket(InputBucketName)
#     bucket.object_versions.delete()
#     s3.delete_bucket(Bucket=InputBucketName)
#     bucket = s3res.Bucket(OutputBucketName)
#     bucket.object_versions.delete()
#     s3.delete_bucket(Bucket=OutputBucketName)
# except ClientError  as e:
#     if e.response['Error']['Code'] == 'NoSuchBucket':
#         print("Bucket already deleted")
    
# Remove the IAM policy and role created for the import job. Each step runs in
# its own try block: with a single shared try, a NoSuchEntity on the detach
# would skip deleting the policy and the role.
cleanup_policy_arn = respons_createpolicy['Policy']['Arn']
cleanup_role_name = response_createrole['Role']['RoleName']

iam_cleanup_steps = [
    ("Policy not attached", lambda: iam.detach_role_policy(PolicyArn=cleanup_policy_arn, RoleName=cleanup_role_name)),
    ("Policy not found", lambda: iam.delete_policy(PolicyArn=cleanup_policy_arn)),
    ("Role not found", lambda: iam.delete_role(RoleName=cleanup_role_name)),
]

for already_gone_message, cleanup_step in iam_cleanup_steps:
    try:
        resp = cleanup_step()
    except ClientError as ee:
        if ee.response['Error']['Code'] == 'NoSuchEntity':
            # Already cleaned up; nothing to do for this step.
            print(f"{already_gone_message}, ignore")
        else:
            # Best-effort cleanup: report the error and carry on.
            print(ee)