# Preprocessing on AWS

*By Danny Luo*

The following is the notebook that I used to preprocess the Lung CT image data of the Kaggle Data Science Bowl that uses Python's `multiprocessing` module to use all CPU cores to efficiently preprocess the 150GB stage 1 dicom dataset. I ran this on `c4.xlarge` and `c4.2xlarge`. I preprocessed about 73% of the data in 1h 30m on `c4.2xlarge`, much faster than the regular preprocessing, which took 5h to finish just 27%.

This notebook uses preprocessing functions in the `preprocessing_aws.py`, prepared by Cem Anil from Guido Zuidhof's `Full_Preprocessing_Tutorial.ipynb` on kaggle. It was slightly modifed for AWS by Danny Luo.

In [1]:
%matplotlib inline

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import dicom
import os
#import scipy.ndimage
import matplotlib.pyplot as plt

import multiprocessing

from skimage import measure, morphology
from mpl_toolkits.mplot3d.art3d import Poly3DCollection
from io import BytesIO

from preprocessing_aws import preprocess

In [2]:
import boto3
#Let's use Amazon S3

client = boto3.client('s3') #functional oriented API
resource = boto3.resource('s3') #object oriented API
udst_dsb =resource.Bucket('udst-dsb') #subsitute this for your s3 bucket where you keep your data.


## Thoughts on Improvement

* Memory Issues: Try to see where you can free up memory immediately -> DUMP object right after np.save! So you don't hold onto the object as you save or potentially going on to the next process
* Saving progress: Save the name of the data that you've processed to files

In [72]:
def preprocess_aws(patients, rootpath, savepath='tmp/', process_num=-1):
    #rootpath is the path in which the patient files are . Ex. 'data/sample_images/'
    #savepath is the path to the directory of the saved files in your S3 bucket
    #process_num to keep track of which process it is
    count = 0
    for patient in patients:
        path = rootpath + patient #Which patient
    
        objs= list(udst_dsb.objects.filter(Prefix=path)) #lists all slices of that ONE patient
        img = preprocess(objs, aws=True)
    
        np.save(patient+'.npy', img)
        
        udst_dsb.upload_file(patient+'.npy',savepath+patient+'.npy')
        #Important: remove the file so your disk isn't swamped
        os.remove(patient+'.npy')
        
        count += 1
        print('Process ', process_num, ': ',count, ' out of ', len(patients), ' completed. '+patient)
        if count == len(patients):
            print('Process ', process_num,'COMPLETED: ', len(patients))

def chunkIt(seq, num): #To divide the data about approximately # NEED FIXING try (list of 12 numbers, 10) -> you will get 11!
  avg = len(seq) / float(num)
  out = []
  last = 0.0

  while last < len(seq):
    out.append(seq[int(last):int(last + avg)])
    last += avg

  return out

def multipreprocess_aws(patients, num_processes, path, savepath):
    #Cutting up the processes and then sorting them by size. We want the larger jobs first to optimize workflow.
     jobs = []
    patients = sorted(chunkIt(patients, num_processes), key=lambda chunk:-len(chunk))#USE NUMPY ARRAY_SPLIT INSTEAD, this is needlessly complicated, and also fails for certain combinations.
    print(patients, '\n')
    for i in range(num_processes):
        print('Process ', i+1, ': ', patients[i], '\n')
        #patient[i] chunk of patients for this i-th process
        p = multiprocessing.Process(target=preprocess_aws, args=(patients[i], path, savepath, i+1))
        jobs.append(p)
        p.start()
        
    return jobs

In [77]:
%%time
path = 'data/stage1/'
savepath = 'pp_data/stage1/' #on udst-dsb bucket

patients_full = list(set([objsum.key.split('/')[2] for objsum in udst_dsb.objects.filter(Prefix=path)]))
patients_full.sort()


CPU times: user 52.6 s, sys: 1.07 s, total: 53.7 s
Wall time: 1min 34s


In [79]:
#Multiprocessing here
patients = patients_full[428:] #starting where we left off
print(patients_full[427], patients[0])

#jobs = multipreprocess_aws(patients, 12, path, savepath)

#Fix oops somehow error with process 7 and 10
#  Last one for 7: Process  7 :  91  out of  97  completed. 933cc0dec1c737d9654820453ce64284
unproc_7_patients = ['934bc93ae41ee1d98c7c44d483327f8d', '934f01786c3bdf8eeaf7d8411a3a8f82', '935908ab6e4c329756638887f2dedca2', '9397a41c9e819a92eb5c86e0e652d7c1', '93a6f37a72f60498986374f57bfc30c4', '93b60dbb5cc3fa4c4cc9247cb1143a09'] #unprocessed patients of process 7 #last one was 
# Last one for 10: Process  10 :  91  out of  97  completed. d032116d73789ff9c805f493357b4037
unproc_10_patients = ['d08383958039c53ea3347ce1719be43e', 'd09e4124b97b22ef45692b62b4ca7f03', 'd0baf38ff8d974e7b59c23fd43713de9', 'd0c1c175814c610bc333080152a605f7', 'd0c86e0c3b3fe3e1dd936b7553ecc29c', 'd104e584e51830ea51ee60e69186f83b'] 

unproc_patients = unproc_7_patients + unproc_10_patients

sorted(chunkIt(unproc_patients, 10))
#jobs = multipreprocess_aws(unproc_7_patients + unproc_10_patients, 10, path, savepath) #Fixing this

48ab0b98fc7789304c21430978624f32 48e592418247393234dd658f9112c543
[['763288341ee363a264fe45a28ea28c21', '763ce10dfdd4662f15de3f5931d5534b', '7692b05abf70dbac5292e91918e98913', '76a77d945cd4c568f3b602957e1ec031', '76e54792d7f5543f734e0906ea8f36c5', '76f36ea4858c572b425eb915e1b27c8d', '76fff2029e577190ce0bf070192b889e', '77033e4c1591403d1b1255607a20a983', '775c5f8043e72b2284b5885254566271', '77941d758eae521a00ef225e306eda08', '7797bfd33eb9f06da2b2f5d5c6501af4', '77d6f5203d46073369d7038b2d58e320', '781cf9cc66e59f4586d9338630b95c1a', '7842c108866fccf9b1b56dca68fc355e', '78459ff46c9f3b3fa26be2a467515c7e', '7852cb521d7029ca08133476054e7bec', '7869cc6bfc3678fec1a81e93b34648cf', '787bd094dd0586ea0f51f9f8557424a8', '78c0a0104c0428e260cbd9e50eb7eea6', '7917af5df3fe5577f912c5168c6307e3', '7921bbb92d5390784f3ba046be0d59a3', '797d6bffdecfc88e990e820dc5771a0b', '799b283083f1b6547d558f063b318e78', '799c0026d66479f7447ed0df5955f051', '79e0e507b1cd1d0c8107de4fd6b9e444', '79e7773230a96a6789c2c64ec39312c


From scipy 0.13.0, the output shape of zoom() is calculated with round() instead of int() - for these inputs the size of the returned array has changed.



Process  6 :  2  out of  97  completed. 66cef72d8428dbba31f2ab01abdaf6ca
Process  12 :  2  out of  97  completed. e1e47812eecd80466cf7f5b0160de446
Process  2 :  1  out of  98  completed. b42263b9b84f10464a94092e0cdc13b1
Process  3 :  1  out of  98  completed. f0310ffc724faf9f7aef2c418127ee68
Process  5 :  2  out of  97  completed. 580cffecce8d3d53cde1abb922adf21a
Process  10 :  2  out of  97  completed. c25876fb40d6f8dafd1ecb243193dd3f
Process  11 :  2  out of  97  completed. d13d9be7873c99a9fde9d24a95f1590d
Process  4 :  2  out of  97  completed. 48fac79fead32f2b10e37752cb077af6
Process  1 :  2  out of  98  completed. 763ce10dfdd4662f15de3f5931d5534b
Process  8 :  2  out of  97  completed. 9462c59a652187dc2d581b8772bcb3f0
Process  9 :  2  out of  97  completed. a590753c175011e42bf17b90e3aec3b0
Process  6 :  3  out of  97  completed. 66fb40934f0be4b301fe27db7fb62299
Process  3 :  2  out of  98  completed. f0459473753b99105e9723f122022ad0
Process  2 :  2  out of  98  completed. b4581f4f


invalid value encountered in true_divide



Process  5 :  30  out of  97  completed. 5bf71718aa1bdcada76f0683f8d6b41b
Process  2 :  30  out of  98  completed. b8bb02d229361a623a4dc57aa0e5c485
Process  1 :  28  out of  98  completed. 7a705cc36536145911a2ef9ace7d9930
Process  3 :  27  out of  98  completed. f467795ce3b50a771085d79ae8d29ecc
Process  6 :  31  out of  97  completed. 6c56e9802fb6346db58bd5daf160f9be
Process  12 :  31  out of  97  completed. e58cc57cab8a1738041b72b156fedc56
Process  11 :  32  out of  97  completed. d68bfca920fd548909bd9c7eb5694b9d
Process  8 :  30  out of  97  completed. 991a2fe88aeb9ef23bb011e3843c9105
Process  9 :  32  out of  97  completed. a906cd7c6ad05c27216125485d2b7322
Process  4 :  29  out of  97  completed. 4cd70a98baca46b116071b32788d3c2d
Process  5 :  31  out of  97  completed. 5c0d992d01e6383d5684b8ac5d7143b1
Process  7 :  30  out of  97  completed. 89f003dbfbdbd18a5cdeb9b128cb075b
Process  2 :  31  out of  98  completed. b8dc33b670bb078d10954345c3ffbb3a
Process  12 :  32  out of  97  compl

Process Process-160:
Traceback (most recent call last):
  File "/home/ubuntu/anaconda3/envs/tensorflow/lib/python3.5/site-packages/botocore/vendored/requests/packages/urllib3/response.py", line 239, in read
    data = self._fp.read()
  File "/home/ubuntu/anaconda3/envs/tensorflow/lib/python3.5/http/client.py", line 461, in read
    s = self._safe_read(self.length)
  File "/home/ubuntu/anaconda3/envs/tensorflow/lib/python3.5/http/client.py", line 609, in _safe_read
    raise IncompleteRead(b''.join(s), amt)
http.client.IncompleteRead: IncompleteRead(503065 bytes read, 22349 more expected)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/ubuntu/anaconda3/envs/tensorflow/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
    self.run()
  File "/home/ubuntu/anaconda3/envs/tensorflow/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-in

Process  2 :  92  out of  98  completed. c187d7be84f64e29e6623c15575f373f
Process  3 :  92  out of  98  completed. fdf73dcce35167f3ed952a58f5a6f738
Process  1 :  89  out of  98  completed. 8369f716ca2d51c934e7f6d44cb156e9
Process  12 :  96  out of  97  completed. eff5b5a7c7245fa8f4661d7e88f8bc7f
Process  4 :  97  out of  97  completed. 57af0020213d64598ede82fe9d6bb8b3
Process  4 COMPLETED:  97
Process  8 :  92  out of  97  completed. a3cb12d3b8d5c64fa55f27208fe13a07
Process  5 :  94  out of  97  completed. 6675d3cc20585f44d4c45746cb1002d4
Process  6 :  94  out of  97  completed. 7577cbd6961b0cab27f88727dcd2d6d3
Process  3 :  93  out of  98  completed. fe26fd2bb25112b3c2ca93eb8e34f8ed
Process  12 :  97  out of  97  completed. f029c73f6f8753e7447c4e9f22e917ad
Process  12 COMPLETED:  97
Process  2 :  93  out of  98  completed. c19197af81f94110ddc23317e182f37d
Process  8 :  93  out of  97  completed. a4ae73433e63549558a3d0eed9128a69
Process  5 :  95  out of  97  completed. 668bb968918c63fa

In [107]:
#Check we have all the patients
pp_patients_full = list(set([objsum.key.split('/')[2][:-4] for objsum in udst_dsb.objects.filter(Prefix=savepath)]))

print(pp_patients_full[0])
print(len(pp_patients_full))
print(len(patients_full))

d654966fd2498de023552b830c07a659
1595
1595


In [108]:
pp_patients_full.sort()
patients.sort()
print(set(patients_full) - set(pp_patients_full))

# missing 'd104e584e51830ea51ee60e69186f83b'
#multipreprocess_aws(['d104e584e51830ea51ee60e69186f83b'], 1, path, savepath)

#Preprocessing is complete

set()
1595
1595


### Preprocessing Tests

Below code is a sandbox where I played around with multiprocessing.

For smaller dataset `sample_images`, here are the times it took to preprocess all 20 sample patients.

* 2 processes on 4 CPU c4.xlarge: 6m 47s
* 4 processes on 4 CPU c4.xlarge: 4m 29s
* 6 processes on 4 CPU c4.xlarge: 3m 40s
* 8 processes on 8 CPU c4.2xlarge: 2m 09s

For the first fifty images (by path name) in `stage_1` full dataset:

* 10 processes on 8CPU c4.2xlarge: 4m 45s
* 14 processes on 8CPU c4.2xlarge: 4m 14s 

In [4]:
%%time
#test run for patient 0

path = 'data/sample_images/' + patients[0] #Which patient
objs= list(udst_dsb.objects.filter(Prefix=path)) #lists all slices of that ONE patient
img = preprocess(objs, aws=True)
np.save(patients[0]+'.npy', img)
udst_dsb.upload_file(patients[0]+'.npy','pp_data/sample_images/'+patients[0]+'.npy')
#How big was saved file?
#stat = os.stat(patients[0]+'.npy')
#print(stat)
#Important! Delete file!
#os.remove(patients[0]+'.npy')


CPU times: user 23.8 s, sys: 1.41 s, total: 25.3 s
Wall time: 44.5 s


In [43]:
%%time
#Sample Patients Trial
path = 'data/sample_images/'

patients = list(set([objsum.key.split('/')[2] for objsum in udst_dsb.objects.filter(Prefix=path)]))

patients.sort() #Sorting alphabetically #opposite direction as preprocessing_aws -> hopefully no conflicts
print(patients)

#preprocess_aws(patients, path, savepath='pp_data/sample_images')

['00cba091fa4ad62cc3200a657aeb957e', '0a099f2549429d29b32f349e95fb2244', '0a0c32c9e08cc2ea76a71649de56be6d', '0a38e7597ca26f9374f8ea2770ba870d', '0acbebb8d463b4b9ca88cf38431aac69', '0b20184e0cd497028bdd155d9fb42dc9', '0bd0e3056cbf23a1cb7f0f0b18446068', '0c0de3749d4fe175b7a5098b060982a1', '0c37613214faddf8701ca41e6d43f56e', '0c59313f52304e25d5a7dcf9877633b1', '0c60f4b87afcb3e2dfa65abbbf3ef2f9', '0c98fcb55e3f36d0c2b6507f62f4c5f1', '0c9d8314f9c69840e25febabb1229fa4', '0ca943d821204ceb089510f836a367fd', '0d06d764d3c07572074d468b4cff954f', '0d19f1c627df49eb223771c28548350e', '0d2fcf787026fece4e57be167d079383', '0d941a3ad6c889ac451caf89c46cb92a', '0ddeb08e9c97227853422bd71a2a695e', '0de72529c30fe642bc60dcb75c87f6bd']
CPU times: user 636 ms, sys: 16 ms, total: 652 ms
Wall time: 1.11 s


In [63]:

#On 4 CPU machine
#4 Multiprocesses: 4m 29

p1 = multiprocessing.Process(target=preprocess_aws, args=(patients[:5], path, 'pp_data/sample_images/'))
p2 = multiprocessing.Process(target=preprocess_aws, args=(patients[5:10], path, 'pp_data/sample_images/'))

p3 = multiprocessing.Process(target=preprocess_aws, args=(patients[10:15], path, 'pp_data/sample_images/'))
p4 = multiprocessing.Process(target=preprocess_aws, args=(patients[15:20], path, 'pp_data/sample_images/'))


p1.start()
p2.start()
p3.start()
p4.start()

1  out of  5  completed. 0c60f4b87afcb3e2dfa65abbbf3ef2f9
1  out of  5  completed. 00cba091fa4ad62cc3200a657aeb957e


In [59]:
%%time

# 6 multiprocesses
# On 4 CPU machine machine: 
p1 = multiprocessing.Process(target=preprocess_aws, args=(patients[:3], path, 'pp_data/sample_images/'))
p2 = multiprocessing.Process(target=preprocess_aws, args=(patients[3:6], path, 'pp_data/sample_images/'))

p3 = multiprocessing.Process(target=preprocess_aws, args=(patients[6:9], path, 'pp_data/sample_images/'))
p4 = multiprocessing.Process(target=preprocess_aws, args=(patients[9:12], path, 'pp_data/sample_images/'))

p5 = multiprocessing.Process(target=preprocess_aws, args=(patients[12:16], path, 'pp_data/sample_images/'))
p6 = multiprocessing.Process(target=preprocess_aws, args=(patients[16:20], path, 'pp_data/sample_images/'))

p5.start()
p6.start()
p1.start()
p2.start()
p3.start()
p4.start()

CPU times: user 0 ns, sys: 52 ms, total: 52 ms
Wall time: 51 ms
1  out of  3  completed. 0a38e7597ca26f9374f8ea2770ba870d
1  out of  3  completed. 00cba091fa4ad62cc3200a657aeb957e
1  out of  4  completed. 0d2fcf787026fece4e57be167d079383


In [45]:
%%time
# 8 multiprocesses MAY CAUSE MEMORY ERROR 
# On 4 CPU machine machine: 

p1 = multiprocessing.Process(target=preprocess_aws, args=(patients[:3], path, 'pp_data/sample_images'))
p2 = multiprocessing.Process(target=preprocess_aws, args=(patients[3:6], path, 'pp_data/sample_images'))

p3 = multiprocessing.Process(target=preprocess_aws, args=(patients[6:9], path, 'pp_data/sample_images'))
p4 = multiprocessing.Process(target=preprocess_aws, args=(patients[9:12], path, 'pp_data/sample_images'))

p5 = multiprocessing.Process(target=preprocess_aws, args=(patients[12:14], path, 'pp_data/sample_images'))
p6 = multiprocessing.Process(target=preprocess_aws, args=(patients[14:16], path, 'pp_data/sample_images'))

p7 = multiprocessing.Process(target=preprocess_aws, args=(patients[16:18], path, 'pp_data/sample_images'))
p8 = multiprocessing.Process(target=preprocess_aws, args=(patients[18:20], path, 'pp_data/sample_images'))
#Larger processes at the beginning
p1.start()
p2.start()
p3.start()
p4.start()
p5.start()
p6.start()
p7.start()
p8.start()

CPU times: user 8 ms, sys: 48 ms, total: 56 ms
Wall time: 53.6 ms
1  out of  3  completed. 0a38e7597ca26f9374f8ea2770ba870d
1  out of  2  completed. 0d2fcf787026fece4e57be167d079383
1  out of  3  completed. 00cba091fa4ad62cc3200a657aeb957e
1  out of  2  completed. 0c9d8314f9c69840e25febabb1229fa4
1  out of  3  completed. 0c59313f52304e25d5a7dcf9877633b1
1  out of  3  completed. 0bd0e3056cbf23a1cb7f0f0b18446068
1  out of  2  completed. 0ddeb08e9c97227853422bd71a2a695e
2  out of  3  completed. 0acbebb8d463b4b9ca88cf38431aac69
2  out of  2  completed. 0d941a3ad6c889ac451caf89c46cb92a
COMPLETED:  2
2  out of  3  completed. 0a099f2549429d29b32f349e95fb2244
2  out of  2  completed. 0ca943d821204ceb089510f836a367fd
COMPLETED:  2
2  out of  2  completed. 0de72529c30fe642bc60dcb75c87f6bd
COMPLETED:  2
2  out of  3  completed. 0c60f4b87afcb3e2dfa65abbbf3ef2f9
2  out of  3  completed. 0c0de3749d4fe175b7a5098b060982a1
1  out of  2  completed. 0d06d764d3c07572074d468b4cff954f
3  out of  3  complete

In [32]:
%%time

#Compare vs time to read all slices in native dicom format

count = 0

for patient in patients:
    obj = client.get_object(Bucket='udst-dsb', Key='pp_data/'+patient+'.npy')
    img_re = np.load(BytesIO(obj['Body'].read()))
    
    print(img_re.shape)
    count += 1
    print(count, ' out of ', len(patients), ' completed.') 

(335, 306, 306)
1  out of  20  completed.
(282, 308, 308)
2  out of  20  completed.
(272, 330, 330)
3  out of  20  completed.
(275, 320, 320)
4  out of  20  completed.
(365, 279, 279)
5  out of  20  completed.
(398, 309, 309)
6  out of  20  completed.
(315, 310, 310)
7  out of  20  completed.
(360, 350, 350)
8  out of  20  completed.
(320, 347, 347)
9  out of  20  completed.
(305, 390, 390)
10  out of  20  completed.
(308, 355, 355)
11  out of  20  completed.
(294, 300, 300)
12  out of  20  completed.
(353, 360, 360)
13  out of  20  completed.
(366, 370, 370)
14  out of  20  completed.
(354, 259, 259)
15  out of  20  completed.
(332, 340, 340)
16  out of  20  completed.
(304, 424, 424)
17  out of  20  completed.
(350, 340, 340)
18  out of  20  completed.
(342, 392, 392)
19  out of  20  completed.
(328, 360, 360)
20  out of  20  completed.
CPU times: user 46.2 s, sys: 6.05 s, total: 52.3 s
Wall time: 1min 45s
