In [1]:
# imports are excessive, but some are used when moving to Azure Functions later
import logging
import requests
import json
import os
import azure.functions as func
import azure.storage.queue as asq
import azure.storage.blob as blob
import zipfile
from io import BytesIO
import pandas as pd
import httpx
import asyncio
import time
import azure.storage.blob.aio as blobaio


In [2]:
# Read in all the series in all of TCIA.
# The CSV location can be overridden with the TCIA_SERIES_CSV environment variable so the
# notebook is not tied to one machine; the original local path stays as the default.
tcia_series_csv = os.environ.get('TCIA_SERIES_CSV', 'C:\\githealth\\data\\TciaSeriesDataForAllCollections.csv')

# Only these columns are used; usecols skips parsing everything else in the file
tcia_columns = ['Collection', 'StudyInstanceUID', 'SeriesCount', 'SeriesInstanceUID', 'ImageCount']
df = pd.read_csv(tcia_series_csv, usecols=tcia_columns)

# usecols keeps the file's own column order, so re-select to get the order listed above
df = df[tcia_columns]


In [None]:
# Choose the subset of series to work with - pick exactly one of the options below

# Option 1:  Sample a handful of series from every Collection
#df_samples = df.groupby('Collection').nth([0,1,2,3,4]) # Grabs first 5 rows
#len(df_samples)

# Option 2: Restrict to one Collection. The available names are printed first so you can choose.
cols = df['Collection'].unique()
print(cols)
collection_name = "ACRIN-FLT-Breast" # replace with desired collection
in_collection = df['Collection'] == collection_name  # boolean mask, one entry per series row
df_samples = df[in_collection]
len(df_samples)


# Option 3: Build df_samples however you like



In [None]:
# Collect every SeriesInstanceUID in the chosen subset; the upload cells iterate over this list
series_list = df_samples.SeriesInstanceUID.tolist()
len(series_list)

# Uncomment to trial the pipeline on just a few series first
#series_list = series_list[1:5]

## Upload with httpx - an async library - plus with async blob uploads
In this code, the zip file for each series is downloaded serially, but the blob uploads run in parallel, sometimes hundreds at a time. This is because the code is meant to run inside an Azure Function, where a single function call is responsible for downloading (as a zip) and uploading the instances of a single series.


In [None]:
# Define a few variables

# Use one of the two ways below to define the container name. This code currently only uploads to a single container.
#container_name = 'samples'
container_name = collection_name.lower() # containers only take lowercase names

# Never commit storage account keys to a notebook: the connection string is read from the
# environment instead. Set TCIA_BLOB_CONNECTION_STRING before starting the kernel.
# Any key that has ever been saved in this notebook should be rotated in the Azure portal.
storageConnString = os.environ['TCIA_BLOB_CONNECTION_STRING']

print(container_name)

In [None]:

# Set up a connection to the storage container. Ensure storageConnString is set up above

blob_container_sync = blob.ContainerClient.from_connection_string(conn_str=storageConnString,container_name=container_name)
blob_container = blobaio.ContainerClient.from_connection_string(conn_str=storageConnString,container_name=container_name)

# Create the blob store container if it doesn't exist...  by exception
#   Which is hacky, but effective
try:
    await blob_container.get_container_properties()
except:
    await blob_container.create_container()
    

# Get list of all series already downloaded in storage container
#  This gets a list of blob metadata, takes the name, splits it on / to get the folder name (series) using [0]

# Only do this if you want to avoid re-uploading any series. But beware, if only a single file in a series is uploaded
#   the series will show up in the items below
series_blobs = [bl.name.split('/')[0] for bl in blob_container_sync.list_blobs()]
series_blobs = list(set(series_blobs)) # Converting a list to set removes all duplicates but need to make it a list again
len(series_blobs)

In [None]:
# Sanity check: display which container the async client is bound to
blob_container.container_name


In [None]:
# Upload a single file async, throw exception on failure to be caught elsewhere
async def uploadSingleFileToBlob(file_name, file_bytes, blob_container):
    """Upload one blob and return whatever the container client's upload_blob returns.

    Args:
        file_name: Blob name to store the data under.
        file_bytes: The data to upload.
        blob_container: Async container client; must provide an awaitable
            upload_blob(data=..., name=...).

    Raises:
        Exception: Any error raised by upload_blob is logged and re-raised unchanged,
            so the caller can decide whether to retry.
    """
    print(f'uploading {file_name}')
    try:
        return await blob_container.upload_blob(data=file_bytes, name=file_name)
    except Exception as e:
        print(e)
        # Only some exceptions (e.g. Azure storage errors) carry an error code; reading the
        # attribute directly would raise AttributeError and hide the real failure.
        error_code = getattr(e, 'error_code', None)
        if error_code is not None:
            print(error_code)
        raise

In [None]:
# Do retries of the upload.
async def uploadSingleFileToBlobWithRetries(file_name, file_bytes, blob_container, max_retries=10):
    """Upload one blob, retrying on failure with an increasing pause between attempts.

    Args:
        file_name: Blob name to store the data under.
        file_bytes: The data to upload.
        blob_container: Async container client passed through to uploadSingleFileToBlob.
        max_retries: Maximum number of upload attempts (default 10).

    Returns:
        The value returned by uploadSingleFileToBlob on the first successful attempt,
        or None if every attempt failed.
    """
    print(f'Uploading {file_name}')
    for i in range(max_retries):
        try:
            stored_blob = await uploadSingleFileToBlob(file_bytes=file_bytes, file_name=file_name, blob_container=blob_container)
        except Exception:
            # Exception (not a bare except) so task cancellation / kernel interrupt still propagate
            print(f'sleeping {i}')
            if i < max_retries - 1:
                # asyncio.sleep yields to the other uploads; time.sleep would stall the whole event loop
                await asyncio.sleep(i)
            continue
        print(f'Blob stored as: {stored_blob}')
        return stored_blob

    return None # at this point, we've failed, but no sense throwing exception for the testing data use case

In [None]:
# Define an async upload.
async def uploadSeriesInstancesToBlob(series_id, async_client, blob_container):
    """Download one series from TCIA as a zip and upload its DICOM files to blob storage in parallel.

    Args:
        series_id: SeriesInstanceUID to fetch; also used as the blob "folder" prefix.
        async_client: Async HTTP client providing an awaitable get(url, params=..., timeout=...).
        blob_container: Async container client handed to uploadSingleFileToBlobWithRetries.

    Returns:
        A list with one upload result per DICOM file (an entry is None when that file's
        upload gave up), or None if the download, unzip or upload step raised.
    """
    urlGetImageBase = 'https://services.cancerimagingarchive.net/services/v3/TCIA/query/getImage'
    params = {'SeriesInstanceUID': series_id}

    # Not using streaming, since it looks like perf is higher with a full download then upload async to blob
    #  But does use a lot more memory

    try:
        r = await async_client.get(urlGetImageBase,params=params,timeout=None) #timeout=15.0)
        # Fail on an HTTP error status rather than trying to unzip an error page
        r.raise_for_status()

        files_to_upload = [] # (blob name, file bytes) tuples
        with zipfile.ZipFile(BytesIO(r.content)) as z:
            print(f'Files in zip: {len(z.filelist)}')

            for f in z.filelist:
                # Keep entries whose path has exactly one component containing '.dcm'
                dcm_parts = [p for p in f.filename.split('/') if '.dcm' in p]
                if len(dcm_parts) == 1:
                    # Decompress only the entries that will actually be uploaded
                    files_to_upload.append((f'{series_id}/{dcm_parts[0]}', z.read(f)))

        # Upload them all concurrently. Even small series can have hundreds of files, so this
        # may need batching if it proves too aggressive.
        print('About to call upload on zip files')
        responses = await asyncio.gather(*[uploadSingleFileToBlobWithRetries(fname, fbytes, blob_container) for fname, fbytes in files_to_upload])

        return responses
    except Exception as e:
        # Still returns None (callers check for it), but now says what went wrong
        print(f'Failed to upload series {series_id}: {e!r}')
        return None
    

In [None]:
# Upload a whole collection, but only one series at a time.
#   Ideally replace this with a call to an Azure Function to trigger the download

series_list = df_samples['SeriesInstanceUID'].tolist()
print(len(series_list))

# get an async client to reuse. 
async_client = httpx.AsyncClient()

print(f'Starting upload of {len(series_list)} series from {collection_name}')
for series in series_list:
    if series in series_blobs: # already uploaded at least one instance to blob
        print(f'  Already uploaded {series}')
        continue 
    print(f' Uploading series: {series}')
    start = time.time()
    uploaded_blobs = await uploadSeriesInstancesToBlob(series, async_client, blob_container)
    duration = time.time() - start
    if uploaded_blobs is not None:
        print(f'Uploaded {len(uploaded_blobs)} instances from series uid: {series} in {duration}')
    else:
        print('None returned!  Problem!')
    

In [None]:
# Never commit storage account keys to a notebook: the queue account's connection string is
# read from the environment instead. Set TCIA_QUEUE_CONNECTION_STRING before starting the kernel.
# Any key that has ever been saved in this notebook should be rotated in the Azure portal.
# NOTE: this rebinds storageConnString, the same name the blob cells use for the blob account.
storageConnString = os.environ['TCIA_QUEUE_CONNECTION_STRING']

series_queue = asq.QueueClient.from_connection_string(conn_str=storageConnString,queue_name='series')

# Create the queue if it doesn't exist...  by exception
#   Which is hacky, but effective
try:
    series_queue.get_queue_properties()
except Exception as e:
    # Exception rather than a bare except, so a kernel interrupt is not treated as "queue missing"
    print(f'Could not read queue properties ({type(e).__name__}); trying to create the queue')
    series_queue.create_queue()

# Must base-64 encode since... functions...
enc = asq.TextBase64EncodePolicy()


## The following uploads a single series; use it for troubleshooting


In [None]:
# Troubleshooting helper for pushing one series through the pipeline.
# Skip this cell if you're uploading a whole collection
series_list = df_samples.SeriesInstanceUID.tolist()
print(len(series_list), series_list[0], sep='\n')
async_client = httpx.AsyncClient()
# Uncomment to upload a single series (index 3 here) through the full download/upload path
#uploaded_blobs = await uploadSeriesInstancesToBlob(series_list[3], async_client, blob_container)

## The following uploads using httpx - an asynchronous library

# Please skip this one and use the one with parallel blob uploads. It is much faster


In [None]:
# Define an async upload.

# Don't use this one.  Just for reference, as it doesn't have the parallel uploads of blobs.
# NOTE: running this cell replaces any function of the same name already defined in the kernel.

async def uploadSeriesInstancesToBlob(series_id, async_client, blob_container):
    """Download one series from TCIA as a zip and upload its DICOM files one at a time.

    Args:
        series_id: SeriesInstanceUID to fetch; also used as the blob "folder" prefix.
        async_client: Async HTTP client providing an awaitable get(url, params=..., timeout=...).
        blob_container: Async container client providing an awaitable upload_blob(data=..., name=...).

    Returns:
        True if the download and every upload completed, False if anything raised.
    """
    urlGetImageBase = 'https://services.cancerimagingarchive.net/services/v3/TCIA/query/getImage'
    params = {'SeriesInstanceUID': series_id}

    # Not using streaming, since it looks like perf is higher with a full download then upload async to blob
    #  But does use a lot more memory

    try:
        r = await async_client.get(urlGetImageBase,params=params,timeout=None) #timeout=15.0)
        # Fail on an HTTP error status rather than trying to unzip an error page
        r.raise_for_status()

        with zipfile.ZipFile(BytesIO(r.content)) as z:
            files = z.filelist
            print(files)

            for f in files:
                # Keep entries whose path has exactly one component containing '.dcm'
                dcm_parts = [p for p in f.filename.split('/') if '.dcm' in p]
                if len(dcm_parts) == 1: # we have a dicom file, and only one
                    dcm_name = f'{series_id}/{dcm_parts[0]}'
                    print(dcm_name)
                    # upload_blob on the async client returns a coroutine: without `await`
                    # it is never executed and nothing reaches blob storage.
                    await blob_container.upload_blob(data=z.read(f), name=dcm_name)

        return True
    except Exception as e:
        # Still returns False, but now says what went wrong
        print(f'Failed to upload series {series_id}: {e!r}')
        return False
    

## The following uploads using requests - a synchronous library

# Please skip this one and use the Async one above, it is many times faster

In [None]:

# Upload using requests - a synchronous library

# Never commit storage account keys to a notebook: the connection string is read from the
# environment instead. Set TCIA_BLOB_CONNECTION_STRING before starting the kernel.
# Any key that has ever been saved in this notebook should be rotated in the Azure portal.
storageConnString = os.environ['TCIA_BLOB_CONNECTION_STRING']

b = blob.ContainerClient.from_connection_string(conn_str=storageConnString,container_name='samples')

# Create the blob store container if it doesn't exist...  by exception
#   Which is hacky, but effective
try:
    b.get_container_properties()
except Exception as e:
    # Exception rather than a bare except, so a kernel interrupt is not treated as "container missing"
    print(f'Could not read container properties ({type(e).__name__}); trying to create the container')
    b.create_container()


# Get list of all series already downloaded in storage container
#  Blob names look like <series>/<file>, so the part before the first / is the series id.
series_blobs = list({bl.name.split('/')[0] for bl in b.list_blobs()})
already_uploaded = set(series_blobs)  # set lookup instead of a linear scan per series

# Series that are skipped outright (reported as "bad" in the message below)
bad_series = {
    '1.2.840.113713.4.2.20015998712384536720405088092843300246',
    '1.2.840.113713.4.2.277260943311504281313806422270862087705',
    '1.3.6.1.4.1.14519.5.2.1.4429.7055.273220878682060174312749196256',
}

urlGetImage = 'https://services.cancerimagingarchive.net/services/v3/TCIA/query/getImage'

for series_id in series_list:
    print(series_id)

    # if the series has been downloaded (or even started) then move on.
    if series_id in already_uploaded:
        print(f'Already uploaded {series_id}. Skipping.')
        continue

    if series_id in bad_series:
        print(f'No way! {series_id} is bad. Skipping.')
        continue

    #get a single series (likely with multiple instances) and upload to blob
    params = {'SeriesInstanceUID': series_id}
    # r.content reads the whole body into memory, so streaming is not requested
    with requests.get(urlGetImage,params=params,timeout=None) as r:
        # Fail on an HTTP error status rather than trying to unzip an error page
        r.raise_for_status()
        archive_bytes = r.content

    with zipfile.ZipFile(BytesIO(archive_bytes)) as z:
        files = z.filelist
        print(f'Uploading {len(files)} files')

        for f in files:
            # Keep entries whose path has exactly one component containing '.dcm'
            dcm_parts = [p for p in f.filename.split('/') if '.dcm' in p]
            if len(dcm_parts) == 1: # we have a dicom file, and only one
                dcm_name = f'{series_id}/{dcm_parts[0]}'
                print(dcm_name)
                # Decompress each entry once, at upload time
                up = b.upload_blob(data=z.read(f), name=dcm_name)
                #print(up)