In [1]:
# For extracting files from Amazon S3 Buckets
import boto3
from botocore import UNSIGNED
from botocore.client import Config

# For PySpark
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

# For Delta Lake
from delta import *

# Working with images
from PIL import Image
import io
import base64

# Parallel downloads and timing
import multiprocessing
import time
#import tqdm

# To cache intermediate results (e.g. the S3 key listing) on disk
import pickle

#to group flight ids and get counts
from itertools import groupby

import os
import sys
# Point PySpark at the local Spark install and make driver and workers use this
# kernel's interpreter.
os.environ.update({
    "SPARK_HOME": "/opt/spark-3.0.1-bin-hadoop2.7",
    "PYSPARK_PYTHON": sys.executable,
    "PYSPARK_DRIVER_PYTHON": sys.executable,
})

In [None]:
from multiprocessing import Pool
from functools import partial
import inspect

def parallel_task(func, iterable, *params, processes=16):
    """Map ``func(params, item)`` over ``iterable`` with a process pool.

    The source of ``func`` is written to ``./tmp_func.py`` and the function is
    re-imported from that module before being handed to the pool (presumably so
    workers can resolve it by module name instead of from the notebook's
    ``__main__``). Only ``func``'s own source text is copied, so it cannot rely on
    imports, globals or helpers defined elsewhere in the notebook.

    Args:
        func: Function called as ``func(params, item)``.
        iterable: Items to process, one call per item.
        *params: Extra values bundled into one tuple and passed as the first
            argument of every call.
        processes: Number of worker processes (default 16).

    Returns:
        List of results, in the order of ``iterable``.

    Raises:
        RuntimeError: If not called from the ``__main__`` module.
    """
    import importlib
    import sys

    # Check the calling context before writing anything to disk.
    if __name__ != '__main__':
        raise RuntimeError("Not in Jupyter Notebook")

    with open('./tmp_func.py', 'w') as file:
        file.write(inspect.getsource(func))

    # A plain `from tmp_func import ...` would return the module cached by a
    # previous call, so force a fresh load of the file just written.
    importlib.invalidate_caches()
    if 'tmp_func' in sys.modules:
        module = importlib.reload(sys.modules['tmp_func'])
    else:
        module = importlib.import_module('tmp_func')
    # Look the function up under its own name rather than textually renaming it,
    # which would also rewrite any identifier containing that name.
    task = getattr(module, func.__name__)

    worker = partial(task, params)
    # The context manager terminates the workers even if map() raises or is interrupted.
    with Pool(processes=processes) as pool:
        return pool.map(worker, iterable)

In [None]:
def long_running_task(params, id):
    """Demo worker for the process pool: hand back the shared params with the item.

    Args:
        params: Tuple of extra arguments shared by every call.
        id: The current item being processed.

    Returns:
        The pair ``(params, id)`` unchanged.
    """
    # A real workload would do its heavy per-item work here.
    output = (params, id)
    return output

# Smoke test: eight dummy items, with ("a", 1, "b") as the shared params tuple.
data_list = range(8)

results = parallel_task(long_running_task, data_list, "a", 1, "b")
for res in results:
    print(res)

In [None]:
# NOTE(review): scratch cell — `cpus`, `download_images` and `images` are only
# defined by later cells, so this raises NameError on a fresh kernel.
with multiprocessing.Pool(processes=cpus) as worker_pool:
    img_content = worker_pool.map(download_images, images)

In [None]:
# These 2 links include the jar files needed to interact with AWS S3
!wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk/1.7.4/aws-java-sdk-1.7.4.jar -P $SPARK_HOME/jars/
!wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.7.3/hadoop-aws-2.7.3.jar -P $SPARK_HOME/jars/

In [None]:
# Stop the current SparkSession (e.g. before re-creating it with new settings).
# NOTE(review): `spark` is only defined by the session cell further down, so this
# raises NameError on a fresh kernel; run it only when a session already exists.
spark.stop()

In [None]:
    # Off-heap memory settings kept for reference; they are not part of the session builder chain. To apply them, call builder.config(key, value) for each entry.
OFFHEAP_CONFIG = {
    "spark.memory.offHeap.enabled": "true",
    "spark.memory.offHeap.size": "4g",
}

In [2]:
# Create (or reuse) the Spark session. If a session already exists, getOrCreate()
# reuses it, so JVM-level settings such as driver memory are not re-applied until
# the session is stopped and recreated.
spark = (
    SparkSession.builder
    .appName("drones")
    .config("spark.executor.memory", "25g")
    .config("spark.executor.cores", "2")
    .config("spark.driver.memory", "15g")
    # "0" disables the limit on results collected to the driver (Spark configuration docs).
    .config("spark.driver.maxResultSize", "0")
    .config("spark.driver.memoryOverhead", "15g")
    # Delta Lake 0.7.0 artifact built for Scala 2.12.
    .config("spark.jars.packages", "io.delta:delta-core_2.12:0.7.0")
    # Delta transaction-log store implementation used for S3 paths.
    .config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore")
    .getOrCreate()
)

sc = spark.sparkContext

# Two string columns for the per-flight Delta tables.
schema = StructType([
    StructField("img_path", StringType()),
    StructField("img_content", StringType()),
])

In [None]:
# Display the active SparkSession (rendered through its notebook repr).
spark

## Using Multiprocessing Pool

In [None]:
# Unsigned (anonymous) requests: no AWS credentials are used to read the source bucket.
unsigned_config = Config(signature_version=UNSIGNED)
s3 = boto3.resource("s3", config=unsigned_config)
bucket = s3.Bucket("airborne-obj-detection-challenge-training")

In [None]:
def list_image_paths(prefix="part1/Images", suffix=".png", s3_bucket=None):
    """Return the keys of all objects under ``prefix`` whose key ends with ``suffix``.

    Args:
        prefix: Key prefix passed to ``objects.filter`` (default ``"part1/Images"``).
        suffix: Required key ending (default ``".png"``).
        s3_bucket: boto3 ``Bucket`` to list; defaults to the module-level ``bucket``.

    Returns:
        List of matching object keys, in the order the listing yields them.
    """
    if s3_bucket is None:
        s3_bucket = bucket
    return [
        obj.key
        for obj in s3_bucket.objects.filter(Prefix=prefix)
        if obj.key.endswith(suffix)
    ]

In [3]:
# Cache the S3 key listing on disk so re-runs load it instead of listing the bucket again.
IMG_PATH_CACHE = '../img_path.pickle'

if os.path.exists(IMG_PATH_CACHE):
    # Only load pickles this notebook wrote itself: unpickling can execute arbitrary code.
    with open(IMG_PATH_CACHE, 'rb') as file:
        img_path = pickle.load(file)
else:
    img_path = list_image_paths()
    with open(IMG_PATH_CACHE, 'wb') as file:
        pickle.dump(img_path, file)
len(img_path)

1570557

In [4]:
# Flight id = the folder name in each key. Keys presumably look like
# "part1/Images/<flight_id>/..."; len("part1/Images/") == 13 and the ids are
# 32 characters long, hence the [13:45] slice.
flight_ids = [path_name[13:45] for path_name in img_path]
len(flight_ids)

1570557

#### Group by flight ID and count the images in each flight

In [5]:
# groupby only merges *adjacent* equal ids, so this relies on each flight's keys
# being contiguous in img_path.
_flight_groups = [(key, sum(1 for _ in group)) for key, group in groupby(flight_ids)]

flight_id_order = [key for key, _ in _flight_groups]
print(flight_id_order[0:10])

flight_img_counts = [count for _, count in _flight_groups]
print(flight_img_counts[0:10])

['0001ba865c8e410e88609541b8f55ffc', '0011f3f114a741b5b02326c9e96e597a', '001578c6c6b340738c9277fcb1307e34', '0022217d1ed446e9a3a5fef13d33facf', '0036dcc16c474b1abaf14d1735a7e1cf', '003e7a4bf58b4849a90556cb26de248c', '004474050bdc46c2805ae42048c24c2f', '004c26c2de5a4d2a85248be48844cb48', '005c047b72a344fca02a490168085adf', '006345553ed64e77a52d94035e1e747a']
[1199, 1199, 1199, 1199, 1199, 1199, 1199, 1199, 1198, 1199]


#### Limit to the first 400 flights (≈1 TB, 479,426 images)

In [6]:
# Keep only the first 400 flights (about 1 TB of images).
N_FLIGHTS = 400
img_counts_1TB = flight_img_counts[:N_FLIGHTS]
sum(img_counts_1TB)

479426

In [7]:
# Per-image flight ids for the selected flights. The slice length is derived from
# the per-flight counts instead of hard-coding their sum (479426), so it stays in
# sync if the flight selection changes.
flights_1TB = flight_ids[:sum(img_counts_1TB)]
len(flights_1TB)

479426

#### For each flight, download all images and store them as a Delta table (one table per flight ID)

In [None]:
def download_images(image_name, size=(224, 224)):
    """Fetch one image from S3, resize it, and return it as base64-encoded PNG text.

    Args:
        image_name: Object key inside the module-level ``bucket``.
        size: Target ``(width, height)`` passed to ``Image.resize`` (default 224x224).

    Returns:
        The RGB-converted, resized image, PNG-encoded, as a base64 ASCII ``str``.

    Note:
        This definition is shadowed by ``from DownloadImages import download_images``
        in a later cell, and the upload loop calls ``DownloadImages.download_images``.
        NOTE(review): if that module still uses ``str(bytes)``, tables it wrote hold
        values wrapped in ``b'...'``; confirm before mixing the two formats.
    """
    raw_bytes = bucket.Object(image_name).get()['Body'].read()
    # Close the decoded source image once the resized copy exists.
    with Image.open(io.BytesIO(raw_bytes)) as img_PIL:
        img_smaller = img_PIL.convert('RGB').resize(size)
    temp_img = io.BytesIO()
    img_smaller.save(temp_img, format="png")
    # decode() yields the bare base64 text; str(bytes) would embed the "b'...'" repr.
    return base64.b64encode(temp_img.getvalue()).decode('ascii')

In [None]:
from DownloadImages import download_images


In [8]:
# Size of the download pool: one worker process per CPU reported by multiprocessing.
cpus = multiprocessing.cpu_count()
cpus

16

In [None]:
# NOTE(review): leftover scratch cell. img_counts_1TB[5:5] is always empty, so the
# loop runs zero iterations and this cell does nothing.
for i in range(len(img_counts_1TB[5:5])):
    starting_time2 = time.time()

In [9]:
import DownloadImages

# Resume point: flights before this index are skipped (presumably uploaded by an
# earlier run). The loop then covers every remaining selected flight.
START_FLIGHT = 49

starting_time = time.time()
# Position in img_path / flights_1TB of the current flight's first image.
prv_n_images = sum(img_counts_1TB[:START_FLIGHT])

# One worker pool for all flights instead of forking a new one per flight.
with multiprocessing.Pool(cpus) as p:
    # Stops at the last selected flight; never indexes past the end of img_counts_1TB.
    for i in range(START_FLIGHT, len(img_counts_1TB)):
        starting_time2 = time.time()
        n_images = img_counts_1TB[i]
        images = img_path[prv_n_images:prv_n_images + n_images]
        flight_id = flights_1TB[prv_n_images]
        # Every image in this slice must belong to the same flight.
        assert flights_1TB[prv_n_images + n_images - 1] == flight_id
        prv_n_images += n_images

        print("Starting download for flight", i)
        img_content = p.map(DownloadImages.download_images, images)

        print("Downloading flight", i, "(ID #:", flight_id, ") took", time.time()-starting_time2, "seconds.")

        # Pair each image's S3 key with its content. Zipping locally avoids a
        # parallelize/zip/collect round trip, and the "img_path" column gets the
        # image key rather than the flight id repeated on every row.
        df = spark.createDataFrame(list(zip(images, img_content)), schema)

        s3_bucket_name = "s3a://drones-project-test/" + flight_id + "/"
        df.write.format("delta").mode("overwrite").save(s3_bucket_name)
        print("Total time to download and upload flight", i, "to our own s3 bucket took", time.time()-starting_time2, "seconds.")

        # Release this flight's data before the next iteration.
        del img_content
        del df

print("Multiprocessing time for", len(img_counts_1TB) - START_FLIGHT, "flights with", cpus, "cpus:", time.time()-starting_time)

Starting download for flight 49




Downloading flight 49 (ID #: 0317fe0622404ccb82a7d53d2a94ae63 ) took 21.096070766448975 seconds.
Total time to download and upload flight 49 to our own s3 bucket took 39.62532877922058 seconds.
Starting download for flight 50


Process ForkPoolWorker-21:
Process ForkPoolWorker-17:
Exception ignored in: Process ForkPoolWorker-22:
Process ForkPoolWorker-26:
Exception ignored in: Process ForkPoolWorker-19:
Process ForkPoolWorker-27:
Process ForkPoolWorker-25:
Process ForkPoolWorker-30:
Process ForkPoolWorker-23:
Process ForkPoolWorker-24:
Process ForkPoolWorker-28:
Process ForkPoolWorker-32:
Traceback (most recent call last):
Process ForkPoolWorker-18:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
<function JavaObject.__init__.<locals>.<lambda> at 0x7f8dd4aa32f0>Process ForkPoolWorker-20:
<function JavaObject.__init__.<locals>.<lambda> at 0x7f8dd4aa32f0>Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
Traceback (most recent call last):
 

  File "/usr/lib/python3.6/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt
  File "/usr/lib/python3.6/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()

KeyboardInterrupt
KeyboardInterrupt
KeyboardInterrupt
  File "/usr/lib/python3.6/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()

KeyboardInterrupt
  File "/usr/lib/python3.6/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt
KeyboardInterrupt
KeyboardInterrupt
  File "/usr/lib/python3.6/multiprocessing/connection.py", line 407, in _recv_bytes
    buf = self._recv(4)
KeyboardInterrupt
KeyboardInterrupt
  File "/usr/local/lib/python3.6/dist-packages/py4j/java_gateway.py", line 645, in _garbage_collect_object
  File "/usr/lib/python3.6/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
  File "/usr/

  File "/usr/lib/python3.6/ssl.py", line 1012, in recv_into
    return self.read(nbytes, buffer)
  File "/usr/lib/python3.6/http/client.py", line 268, in _read_status
    line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
  File "/usr/lib/python3.6/ssl.py", line 874, in read
    return self._sslobj.read(len, buffer)
  File "/usr/lib/python3.6/socket.py", line 586, in readinto
    return self._sock.recv_into(b)
  File "/usr/lib/python3.6/ssl.py", line 631, in read
    v = self._sslobj.read(len, buffer)
  File "/usr/lib/python3.6/ssl.py", line 1012, in recv_into
    return self.read(nbytes, buffer)
KeyboardInterrupt
  File "/usr/lib/python3.6/ssl.py", line 874, in read
    return self._sslobj.read(len, buffer)
  File "/usr/lib/python3.6/ssl.py", line 631, in read
    v = self._sslobj.read(len, buffer)
KeyboardInterrupt


KeyboardInterrupt: 

<hr>

### Do NOT run: exploratory cells below (uploading the image list in tenths)

In [None]:
# Number of image keys in img_path.
len(img_path)

In [None]:
# Split img_path into ten contiguous slices; the last one also absorbs the remainder.
one_tenth = len(img_path)//10
cut_points = [k * one_tenth for k in range(10)] + [len(img_path)]
(tenth1, tenth2, tenth3, tenth4, tenth5,
 tenth6, tenth7, tenth8, tenth9, last_tenth) = [
    img_path[lo:hi] for lo, hi in zip(cut_points, cut_points[1:])
]

In [None]:
# Sanity check: the ten slices together should cover every key in img_path.
sum(len(part) for part in (tenth1, tenth2, tenth3, tenth4, tenth5,
                           tenth6, tenth7, tenth8, tenth9, last_tenth))

In [None]:
# NOTE(review): duplicate of the earlier `cpus` cell; recomputes the same value.
cpus = multiprocessing.cpu_count()
cpus

In [None]:
starting_time = time.time()

# NOTE(review): the download below is commented out (as is the tqdm import in the
# imports cell), so this cell currently times nothing and prints roughly 0 seconds.
#with multiprocessing.Pool(cpus) as p:
#    img_content = list(tqdm.tqdm(p.imap(download_images, tenth1), total = len(tenth1))) #switch to tenth2...

print("Multiprocessing time for Part 1.0 with", cpus," Cores:", time.time()-starting_time)

In [None]:
# Load previously pickled image content (by its file name, presumably for tenth1).
# Un-comment the two dump lines to (re)write the file from the current img_content.
# NOTE: pickle.load can execute arbitrary code; only load files this notebook wrote.
#with open('../tenth1.0.pickle','wb') as file:
 #   pickle.dump(img_content,file)
with open('../tenth1.0.pickle','rb') as file:
    img_content = pickle.load(file)
len(img_content)

In [None]:
# Size of one twentieth: half of one tenth (157055 // 2 == 78527 for the full listing),
# derived from the data instead of a hard-coded count.
one_tenth // 2

In [None]:
# First half of tenth1 (78527 keys for the full listing).
part_20th1 = tenth1[:len(tenth1) // 2]

In [None]:
# Content matching part_20th1 item-for-item: same length, so paths and images stay aligned.
img_20th1 = img_content[:len(part_20th1)]

In [None]:
# Write to our own S3 Bucket
starting_time = time.time()

# Four partitions per CPU for the parallelized RDD.
partition_num = 4*cpus

img_content_rdd = sc.parallelize(img_20th1, partition_num)
print("Image content RDD created in:", time.time()-starting_time)

schema2 = StructType([StructField("img_content", StringType())])

# A StructType schema needs row-like records, not bare strings, so wrap each
# encoded image in a 1-tuple. Passing the strings directly fails schema
# verification as soon as an action runs.
df = spark.createDataFrame(img_content_rdd.map(lambda content: (content,)), schema2)
print("Spark DataFrame created in:", time.time()-starting_time)

In [None]:
# Alternative to the StructType schema: with an atomic StringType schema, PySpark
# wraps each string into a single-column row (column name "value", per the
# createDataFrame docs).
df = spark.createDataFrame(img_content_rdd, StringType())

In [None]:
# Print the optimized logical plan along with statistics, if available.
df.explain(mode='cost')

In [None]:
# Runs a Spark job and returns the number of rows in df.
df.count()

In [None]:
# Overwrite the Delta table at this S3 prefix (rename the prefix for each slice).
delta_writer = df.write.format("delta").mode("overwrite")
delta_writer.save("s3a://drones-project-test/Part1-Twentieth1/")

ending_time = time.time()
# Measured from starting_time as last set in the RDD/DataFrame cell, so this includes that work.
print("Content written into S3 bucket as a delta lake (parquet files):", ending_time - starting_time)

In [None]:
starting_time = time.time()

df = spark.read.format("delta").load("s3a://drones-project-test/Part1-Twentieth1/") #change name
# load() only resolves the table; rows are scanned when an action runs, so count()
# must happen before the clock stops for the timing to include the read.
n_rows = df.count()

ending_time = time.time()
print("Elapsed time for reading ", n_rows, " images from table:", ending_time - starting_time)