In [1]:
import PIL
import pydicom
import numpy as np
import pyodbc
import urllib
import sqlalchemy
from shutil import move, copytree, copy2
from datetime import datetime
from fastprogress.fastprogress import master_bar, progress_bar

import pandas as pd
from pathlib import Path
import os
import hashlib

pd.set_option('display.max_colwidth', 250)
pd.set_option('display.max_columns', None)

In [2]:
## Functions

def hash_md5(file_to_hash):
    """Takes a list of full paths and returns a list of MD5 hashes"""
    file_md5 = []
    if type(file_to_hash) is str:
        file_to_hash = [file_to_hash]

    BLOCK_SIZE = 65536*10 # The size of each read from the file

    for i, file_name in progress_bar(enumerate(file_to_hash),total=len(file_to_hash)):
        file_hash = hashlib.md5() # Create the hash object, can use something other than `.sha256()` if you wish
        with open(file_name, 'rb') as f: # Open the file to read it's bytes
            fb = f.read(BLOCK_SIZE) # Read from the file. Take in the amount declared above
            while len(fb) > 0: # While there is still data being read from the file
                file_hash.update(fb) # Update the hash
                fb = f.read(BLOCK_SIZE) # Read the next block from the file
        file_md5.append(file_hash.hexdigest())
    return file_md5

def connect_sql(server = '*****', database = '*****'):
    username = pd.read_csv('*****',"|", header=None).iloc[0][0]
    password = pd.read_csv('*****',"|", header=None).iloc[0][1]
    params = urllib.parse.quote_plus('DRIVER={ODBC Driver 17 for SQL Server};SERVER=tcp:'+server+';DATABASE='+database+';UID='+username+';PWD='+ password)
    engine = sqlalchemy.create_engine("mssql+pyodbc:///?odbc_connect=%s" % params)
    print(f'Connected to {database}')
    return engine

def upload_sql(df_to_upload, chunksize, table_name, schema):
    for i in progress_bar(range(len(df_to_upload)//chunksize+1),total=(len(df_to_upload)//chunksize+1)):
        df_to_upload[i*chunksize:(i+1)*chunksize].to_sql(table_name, engine, schema=schema, if_exists="append", index = False, method = "multi")

In [5]:
folder_to_import = '*****'

resolution = 224
working_directory = '*****'

folder_to_import = Path(folder_to_import)
working_directory = Path(working_directory)
dicom_path = Path(os.path.join(working_directory, 'dicom'))
image_path = Path(os.path.join(working_directory, 'jpg', str(resolution)))
image_processing_path = Path(os.path.join(working_directory, 'jpg', 'being_processed'))

In [None]:
engine = connect_sql()

# 1) DICOM Manipulation

In [None]:
statement = 'SELECT **** )'

pd_dicom_paths = pd.read_sql_query(statement, con=engine)
pd_dicom_paths["filename"] = pd_dicom_paths.source_path.str.split('/').str[-1]

print(f'Rows loaded: {pd_dicom_paths.shape[0]}')

pd_dicom_paths = pd_dicom_paths[~pd_dicom_paths.duplicated(subset='path', keep='first')].reset_index(drop=True)
print(f'Rows to insert after having removing duplicates: {pd_dicom_paths.shape[0]}')

In [38]:
##### Columns to extract from DICOM files
dcm_fields = ['AquisitionDate',
 'AcquisitionDeviceProcessingDescription',
 'BodyPartExamined',
 'BurnedInAnnotation',
 'DetectorManufacturerName',
 'DetectorManufacturerModelName',
 'DeviceSerialNumber',
 'Manufacturer',
 'ManufacturerModelName',
 'Modality',
 'PerformedProcedureStepDescription',
 'ProtocolName',
 'RequestedProcedureDescription',
 'StudyInstanceUID',
 'StudyDate',
 'StudyTime',
 'ViewPosition',
 'PhotometricInterpretation'
]

In [39]:
data=[]
for p in progress_bar(pd_dicom_paths.source_path.tolist()):
    dcm = pydicom.dcmread(str(p),stop_before_pixels=True)
    datar = {f:dcm[f].value for f in dcm_fields if f in dcm }
    data.append(datar)

In [83]:
dicom_data = pd.DataFrame(data)
final_df = pd.concat([pd_dicom_paths.reset_index(drop=True), dicom_data.drop(columns={"StudyInstanceUID"})], axis=1)

In [84]:
# We change "path" for its future location
final_df["path"] = str(dicom_path) + '/' + final_df.StudyDate.str[:4]+'_'+final_df.StudyDate.str[4:6]+'/'+final_df.StudyInstanceUID+'/'+final_df.SeriesInstanceUID+'/'+final_df.filename

In [85]:
if 'AcquisitionDeviceProcessingDescription' in final_df.columns:
    final_df["AcquisitionDeviceProcessingDescription"] = final_df["AcquisitionDeviceProcessingDescription"].astype('str')

final_df = final_df.replace('',np.NaN).replace('nan',np.NaN)

In [88]:
chunksize = 100

upload_sql(final_df.drop(columns=['source_path', 'filename']), chunksize, '****', '****')

In [27]:
final_df["src_copy"] = ''
final_df["dst_copy"] = ''

for i in final_df.index:
    final_df["src_copy"][i] = '/'+os.path.join(*(final_df.source_path[i].split('/')[:-2]))
    final_df["dst_copy"][i] = '/'+os.path.join(*(final_df.path[i].split('/')[:-2]))

to_copy = final_df[["src_copy", "dst_copy"]].drop_duplicates().values.tolist()

In [None]:
for i in range(len(to_copy)):
    copytree(to_copy[i][0], to_copy[i][1], dirs_exist_ok=True)
    if i%500 == 0:
        print(f'Copied {i} files / {len(to_copy)}')
        if i > 0:
            duration = (datetime.now() - start).total_seconds()
            print(f'Speed: {500*60*60/duration} folders/h')
        start = datetime.now()
        
print(f'Copied {i+1} folders / {len(to_copy)}')

# 3) JPG EXTRACTION

In [5]:
# We'll just select only Dicoms that are NOT in the folder_results as jpg, just to increase speed and allow resume of job
# We remove all Dicoms that don't have property "PhotometricInterpretation", since they'll return errors.

df = pd.read_sql_query("****", con=engine)

In [7]:
def crop_outside(img,mask):
    m,n = mask.shape
    mask0,mask1 = mask.any(0),mask.any(1)
    col_start,col_end = mask0.argmax(),n-mask0[::-1].argmax()
    row_start,row_end = mask1.argmax(),m-mask1[::-1].argmax()
    return img[row_start:row_end,col_start:col_end]

def open_dcm(row):
    dcm = pydicom.dcmread(row.path)
    w = dcm.WindowWidth if 'WindowWidth' in dcm else 4000
    c = dcm.WindowCenter if 'WindowCenter' in dcm else 2000
    i = dcm.RescaleIntercept if 'RescaleIntercept' in dcm else 0
    s = dcm.RescaleSlope if 'RescaleSlope' in dcm else 1
    phi = dcm.PhotometricInterpretation

    #remove black border surrounding image
    arr = dcm.pixel_array
    amx, amn = arr.max(),arr.min()
    tolerance = 30 #hounsfield units to exclude black border
    if abs(c+w/2-amx)>100: w,c = amx-amn, (amx+amn)/2 #sometimes w,c are wrong
    if phi == 'MONOCHROME2':
        arr = crop_outside(arr,arr>amn+tolerance)
        arr = (c-w/2+arr)/w*255
    elif phi== 'MONOCHROME1' :
        arr = crop_outside(arr,arr<amx-tolerance)
        arr = (c+w/2-arr)/w*255
    else:
        raise Exception('Image must be monochrome')             
    arr = (arr+i)/s
    return PIL.Image.fromarray(arr.astype(np.ubyte)) 

In [None]:
os.makedirs(image_processing_path, exist_ok=True)

size = (resolution,resolution)

def convert_img(_,idx):
    row = df.iloc[idx]
    img = open_dcm(row)
    img= img.resize(size, PIL.Image.LANCZOS)
    image_name = str(row.path.split('/')[-1].strip('.dcm'))+'.jpg' 
    img.save(image_processing_path / image_name)
    
parallel(convert_img, df.index)
print(f'Done converting {len(df)} dicoms to {size} size')

In [None]:
jpg_paths = pd.DataFrame({'path':list(image_processing_path.rglob("*.jpg"))})
print("List of converted jpgs read")
jpg_paths["SOPInstanceUID"] = jpg_paths.path.astype(str).str.split('/').str[-1].str[:-4]

converted_jpgs = df[df.SOPInstanceUID.isin(jpg_paths.SOPInstanceUID)]

if len(converted_jpgs) > 0:
    chunksize = 1000

    engine.execute('DROP TABLE IF EXISTS ****')

    upload_sql(converted_jpgs.SOPInstanceUID, chunksize, '****', '****')

In [11]:
if len(converted_jpgs) > 0:
    statement = 'UPDATE *****'
    engine.execute(statement)
    statement = 'UPDATE *****'
    engine.execute(statement)
    statement = 'DROP TABLE ****'
    engine.execute(statement)

In [12]:
for index, row in converted_jpgs.iterrows():
    os.makedirs(os.path.join(image_path, row['StudyMonth']), exist_ok=True)
    move(os.path.join(image_processing_path, row['SOPInstanceUID']+'.jpg'), os.path.join(image_path, row['StudyMonth'], row['SOPInstanceUID']+'.jpg'))

# 4) HASH FUNCTION (DUPLICATE CHECK)

In [13]:
# We select all images that don't have MD5 calculated

md5_calculation = pd.read_sql_query("SELECT *****", con=engine)

In [16]:
file_md5 = hash_md5(md5_calculation.path_jpg)

md5_calculated = pd.DataFrame(list(zip(md5_calculation.SOPInstanceUID, file_md5)), columns =['SOPInstanceUID', 'MD5'])

In [None]:
if len(md5_calculated) > 0:
    chunksize = 1000

    engine.execute('DROP TABLE IF EXISTS ****')
    
    upload_sql(md5_calculated, chunksize, '****', '*****')

    statement = 'UPDATE ****'
    engine.execute(statement)
    statement = 'DROP TABLE ****'
    engine.execute(statement)