### References
[1] https://nirvan66.github.io/geoguessr.html
We took inspiration from this previous project [1] and attempted to improve its results and collect more data. Our main contributions to the training portion of the project were:
- Creating a custom training loop and a custom loss function.
- Parallelizing the model itself with distributed TensorFlow.
- Attempting to train the entire model in Spark so that the whole pipeline is distributed.

# Attempt at Using Spark for the Entire Pipeline
This notebook contains our attempt at using Spark for the entire preprocessing and modeling pipeline.
We successfully implemented the preprocessing portion, but we consistently ran out of memory.
Additionally, we could not get the libraries needed to run Keras on Spark to work.
A working version of this pipeline, written in pure NumPy and TensorFlow, is in main.ipynb.

## Import All Packages


In [None]:
%load_ext autoreload
%autoreload 2

import os
import gdown
import sys
import pandas as pd
import numpy as np
import tensorflow as tf
from functools import reduce
from pyspark import SparkContext
from pyspark.sql import SparkSession
from numpy import asarray
from pyspark.ml.functions import array_to_vector
from PIL import Image
from pyspark.ml.feature import OneHotEncoder
from pyspark.sql.functions import rand
from tensorflow.keras.applications.resnet50 import ResNet50
from tensorflow.keras.layers import Conv2D, BatchNormalization, MaxPool2D, GlobalMaxPool2D, Reshape
from tensorflow.keras.layers import TimeDistributed, Dense, Dropout, LSTM
from pyspark.ml import Pipeline
from distkeras import predictors, trainers
from pyspark.sql import functions, types
from pyspark import ml


# Download image data
The following cell downloads the Data tar file from the team's Google Drive. It contains all of the images collected by our data scraping and used to train and validate the model. After running the cell, check the working directory for a tar file named "Data.tar" and a directory named "Data".

**(NOTE: If you receive an error that prevents you from accessing the Google Drive, restart the kernel and rerun the cell once. If this does not fix the issue, please email Hunter Price for access information.)**

In [3]:

data_tar = 'Data.tar'
data_url = 'https://drive.google.com/uc?id=13JA-0Pafd5VbCXEdnxJZw8XbMqcurxSg'

if not os.path.exists(data_tar) and not os.path.exists('Data'):
    gdown.download(data_url, data_tar, quiet=False)
if not os.path.exists('Data'):
    !tar -xf Data.tar

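The `!tar -xf` lines above rely on a notebook shell escape. As a hedged alternative, here is a minimal pure-Python sketch of the same "extract only if not already extracted" step using the standard `tarfile` module; the helper name `extract_if_missing` is our own and not part of the project.

```python
import os
import tarfile


def extract_if_missing(tar_path, target_dir, dest="."):
    """Extract tar_path into dest unless dest/target_dir already exists.

    Returns True if the archive was extracted, False if extraction was skipped.
    """
    if os.path.exists(os.path.join(dest, target_dir)):
        return False  # data already unpacked, nothing to do
    with tarfile.open(tar_path) as archive:
        if hasattr(tarfile, "data_filter"):
            # Newer Pythons support extraction filters that reject unsafe members
            archive.extractall(path=dest, filter="data")
        else:
            archive.extractall(path=dest)
    return True
```

Usage would be `extract_if_missing('Data.tar', 'Data')` after the download.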


# Create CSV
The following code walks the data directory and creates a CSV of the image file paths and their metadata. First, specify the directory that contains the images in `dir_name`; it is preset to 'Data', the directory downloaded above. Running the cell writes the labels to a CSV file named "DataLabels.csv". The labels come from the directory names: each grid folder name encodes the grid number and its bounding box, and each location folder name encodes the latitude and longitude. The code also skips any location folder that does not contain exactly three images. Please verify that the CSV file has been created before continuing.

In [4]:
# Directory containing the downloaded images (fill in manually, or set to '' to pass it on the command line)
dir_name = 'Data'
CSV_COLUMNS = ['File Path', 'Data Folder', 'Grid Number', 'Min X', 'Min Y', 'Max X', 'Max Y',
               'Latitude', 'Longitude', 'Angle', 'File Name']

# Return the number of kept images and their full paths.
# Location folders that do not hold exactly three images are reported and skipped.
def get_filepaths(directory):
    file_paths = []
    print(" ")
    print("The following paths will not be included because they contain less than 3 directions:")
    for root, _, files in os.walk(directory):
        if len(files) == 3:
            for filename in files:
                file_paths.append(os.path.join(root, filename))
        elif len(files) != 0:  # folders with no files (e.g. grid folders) are skipped silently
            print(root)
    return len(file_paths), file_paths

# Split every path on the path separator, '_' and ',' to recover the labels encoded
# in the directory and file names; the full path is kept as the first field.
def data_to_labels(dir_name):
    _, file_paths = get_filepaths(dir_name)
    labels = {}
    for i, path in enumerate(file_paths):
        fields = [path]
        for part in path.split(os.path.sep):
            fields.extend(part.replace('_', ',').split(','))
        labels["File " + str(i)] = fields
    return labels

# Store the labels in DataLabels.csv
def main():
    directory = dir_name
    if directory == '':
        if len(sys.argv) != 2:
            print('Please specify a directory or define "dir_name" in the code')
            return
        directory = sys.argv[1]
    print("Created a CSV file named DataLabels.csv")
    data_frame = pd.DataFrame.from_dict(data_to_labels(directory), orient='columns').T
    data_frame.columns = CSV_COLUMNS
    data_frame.to_csv('DataLabels.csv')

if __name__ == "__main__":
    main()


Created a CSV file named DataLabels.csv
 
The following paths will not be included because they contain less than 3 directions:
Data/39_-116.0_44.0_-114.0_46.0/45.5970459,-115.861618
Data/0_-124.552441_39.97720877884329_-124.0_45.08734407897483/40.07824520959485,-124.0863631072246
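Each CSV row is built by splitting an image path on the path separator, `_` and `,`. The sketch below applies the same rule to a single path so the mapping to columns is easy to see. It assumes `/` as the separator and reuses the grid and location folders from the first excluded path above; the file name `90_view.jpg` is hypothetical (the column names only imply that file names have the form `<angle>_<name>`), and `parse_image_path` is our own helper name.

```python
# Column order used for DataLabels.csv
COLUMNS = ['File Path', 'Data Folder', 'Grid Number', 'Min X', 'Min Y', 'Max X', 'Max Y',
           'Latitude', 'Longitude', 'Angle', 'File Name']


def parse_image_path(path, sep="/"):
    """Split one image path into the labeled fields stored in DataLabels.csv."""
    fields = [path]  # the full path is kept as the first field
    for part in path.split(sep):
        # '_' and ',' both act as field separators inside a path component
        fields.extend(part.replace('_', ',').split(','))
    if len(fields) != len(COLUMNS):
        raise ValueError(f"unexpected path layout: {path}")
    return dict(zip(COLUMNS, fields))


row = parse_image_path("Data/39_-116.0_44.0_-114.0_46.0/45.5970459,-115.861618/90_view.jpg")
print(row['Grid Number'], row['Latitude'], row['Longitude'])
```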


# Create the Spark Context and Session
This creates a local Spark context and then a SparkSession on top of it. The session is needed because it provides the DataFrame and SQL functionality that is not available through the Spark context alone.

In [6]:

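from pyspark import SparkConf

# Memory settings must be part of the configuration used to create the context;
# calling .set() on sc.getConf() afterwards only changes a copy and has no effect.
# Note: getOrCreate() ignores this conf if a SparkContext is already running.
conf = (SparkConf()
        .set('spark.executor.memory', '6g')
        .set('spark.driver.memory', '16g')
        .set('spark.memory.offHeap.enabled', 'true')
        .set('spark.memory.offHeap.size', '6g'))
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sc)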

# Define map functions for Spark
The following functions are applied with `rdd.map` to turn each group of three CSV rows into one training example.

In [11]:
# Convert a group of three CSV rows (the three images of one location) into a single dict of values
def get_row(r):
    row = {
            'File Path 1': r['File Path'].iloc[0],
            'File Path 2': r['File Path'].iloc[1],
            'File Path 3': r['File Path'].iloc[2],
            'Latitude': r['Latitude'].iloc[0],
            'Longitude': r['Longitude'].iloc[0],
            'Min Y': r['Min Y'].iloc[0],
            'Min X': r['Min X'].iloc[0],
            'Max Y': r['Max Y'].iloc[0],
            'Max X': r['Max X'].iloc[0],
            'Grid Number': int(r['Grid Number'].iloc[0])
        }
    return row

# Load the three images of one location, resize them, and return (features, grid label, coordinates)
def load_data(r):
    dsize = (200, 100)  # PIL sizes are (width, height)
    # One 100x200 RGB array per image
    X = np.zeros((3, dsize[1], dsize[0], 3))
    for i, col in enumerate(['File Path 1', 'File Path 2', 'File Path 3']):
        # convert('RGB') guarantees 3 channels even for grayscale or RGBA files
        img = Image.open(r[col]).convert('RGB')
        X[i] = asarray(img.resize(dsize, resample=Image.LANCZOS))

    # Scale pixel values to [0, 1] and flatten, since a Spark vector column is one-dimensional
    X /= 255
    X = X.flatten()

    # Grid label and exact coordinates
    y = int(r['Grid Number'])
    loc = np.array([r['Latitude'], r['Longitude']])

    return X.tolist(), y, loc.tolist()
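A back-of-the-envelope estimate helps explain the memory problems mentioned above. `load_data` produces 3 × 100 × 200 × 3 = 180,000 values per sample, which is about 1.44 MB as float64, and `X.tolist()` turns every value into a separate Python float. The sketch below is our own estimate, not a measurement from this project; on a typical 64-bit CPython the list form costs roughly 32 bytes per value, or about 5.8 MB per sample, before Spark serializes it.

```python
import struct
import sys


def sample_values(n_images=3, height=100, width=200, channels=3):
    """Number of pixel values in one flattened sample produced by load_data."""
    return n_images * height * width * channels


def dense_bytes(n_values, bytes_per_value=8):
    """Bytes for a packed numeric array (8 for float64, 4 for float32)."""
    return n_values * bytes_per_value


def python_list_bytes(n_values):
    """Rough cost of a list of floats: one float object plus one list pointer per value."""
    return n_values * (sys.getsizeof(0.5) + struct.calcsize("P"))


n = sample_values()
print(n, dense_bytes(n), python_list_bytes(n))
```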


# Load in all data
Read every image group listed in DataLabels.csv into a Spark DataFrame.

In [12]:
# Read the CSV and split it into groups of 3 consecutive rows (the 3 images of one location)
data = pd.read_csv('DataLabels.csv')
data = [data[i:i+3] for i in range(0, len(data), 3)]

# create an rdd to read in the data
rdd = sc.parallelize(data)
rdd = rdd.map(get_row)
rdd = rdd.map(load_data)

# use the rdd to create a structured dataframe
columns = ['X', 'y', 'loc']
df = spark.createDataFrame(rdd, columns)

# Convert the array column types to vectors
df = df.withColumn('X', array_to_vector('X'))
df = df.withColumn('loc', array_to_vector('loc'))

# TODO - throw this parquet into the Google Drive for download to speed up the process
# if we want to save the partially preprocessed data do it here
checkpoint_data = False
if checkpoint_data:
    df.write.parquet('loaded_data.parquet')
df.printSchema()


root
 |-- X: vector (nullable = true)
 |-- y: long (nullable = true)
 |-- loc: vector (nullable = true)
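Slicing the CSV with `data[i:i+3]` assumes that every three consecutive rows belong to the same location. That holds for the CSV written above, but breaks if the rows are ever reordered or filtered. A hedged stdlib sketch of grouping by the location folder instead (the helper name `group_by_location` is our own):

```python
import os
from collections import defaultdict


def group_by_location(file_paths, images_per_location=3):
    """Group image paths by their parent (location) folder, independent of row order.

    Folders that do not contain exactly images_per_location images are dropped.
    """
    groups = defaultdict(list)
    for path in file_paths:
        groups[os.path.dirname(path)].append(path)
    return {folder: sorted(paths)
            for folder, paths in groups.items()
            if len(paths) == images_per_location}
```

With pandas, the same idea is a `groupby` on the directory part of the 'File Path' column.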



# Preprocessing Checkpoint
We checkpoint the loaded data to Parquet so that we can verify it and avoid rerunning the preprocessing, which consistently ran out of memory or took too long. Set `download_data = True` to fetch a copy of this checkpoint from Google Drive instead.
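The checkpoint cells follow a compute-once, reload-afterwards pattern. A minimal sketch of that pattern with a hypothetical `load_or_compute` helper, using `pickle` instead of Parquet purely so that it runs without Spark:

```python
import os
import pickle


def load_or_compute(path, compute):
    """Return the cached result stored at path, or run compute() and cache its result there."""
    if os.path.exists(path):
        with open(path, "rb") as f:
            return pickle.load(f)
    result = compute()
    with open(path, "wb") as f:
        pickle.dump(result, f)
    return result
```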

In [13]:

# download the partially preprocessed data
download_data = False
checkpoint_data = False

if download_data:
    data_tar = 'loaded_data.tar'
    data_url = 'https://drive.google.com/uc?id=17YmXWu_K1b9o-VBjDFXLdX8WMfrRXA6b'

    if not os.path.exists(data_tar) and not os.path.exists('loaded_data.parquet'):
        gdown.download(data_url, data_tar, quiet=False)
    if not os.path.exists('loaded_data.parquet'):
        !tar -xf 'loaded_data.tar'
    checkpoint_data = True

# load in the partially preprocessed data
if checkpoint_data:
    df = spark.read.parquet("loaded_data.parquet")
df.printSchema()


root
 |-- X: vector (nullable = true)
 |-- y: long (nullable = true)
 |-- loc: vector (nullable = true)



# Build Preprocessing Pipeline
This builds a Spark ML pipeline that one-hot encodes the grid label `y` into a new vector column `y_enc`.
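Spark's `OneHotEncoder` uses `dropLast=True` by default, so for labels 0 to N - 1 it produces vectors of length N - 1 and encodes the last grid as all zeros. Because `gridCount` is later read from the length of `y_enc`, the softmax layer would then have one unit too few. The pure-Python function below only mimics the encoding to make the difference visible (it is not Spark code); passing `dropLast=False` to `OneHotEncoder` keeps one slot per grid.

```python
def one_hot(label, num_categories, drop_last=True):
    """Mimic the vector Spark's OneHotEncoder produces for one integer label.

    With drop_last=True the vector has num_categories - 1 slots and the
    last category is encoded as all zeros.
    """
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if label < size:
        vec[label] = 1.0
    return vec


print(one_hot(2, 3))                   # last grid with the default
print(one_hot(2, 3, drop_last=False))  # last grid with one slot per category
```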

In [23]:

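stages = [
    # dropLast=False keeps one slot per grid; the default (True) encodes the last grid as all zeros
    OneHotEncoder(inputCols=['y'], outputCols=['y_enc'], dropLast=False),
]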
pipeline = Pipeline(stages=stages)
pipeline_model = pipeline.fit(df)
df_transform = pipeline_model.transform(df)

checkpoint_data = False
if checkpoint_data:
    df_transform.write.parquet('preprocessed_data.parquet')
df_transform.printSchema()


ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/lib/python3.7/dist-packages/py4j/clientserver.py", line 475, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.7/socket.py", line 589, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: ignored

# Preprocessing Checkpoint After Encoding
This second checkpoint stores the data after one-hot encoding. The encoding cell above was interrupted before it finished, so here we download a copy of the preprocessed data from Google Drive and load it.

In [25]:
# download the partially preprocessed data
download_data = True
checkpoint_data = False

if download_data:
    data_tar = 'preprocessed_data.tar'

    # Google Drive copy of preprocessed_data.parquet, stored as a tar archive
    data_url = 'https://drive.google.com/uc?id=1d3heMiNKlonRTfY4FbDuLSRe-Bdu-4Hq'

    if not os.path.exists(data_tar) and not os.path.exists('preprocessed_data.parquet'):
        gdown.download(data_url, data_tar, quiet=False)
    if not os.path.exists('preprocessed_data.parquet'):
        !tar -xf 'preprocessed_data.tar'
    checkpoint_data = True

# load in the partially preprocessed data
if checkpoint_data:
    df_transform = spark.read.parquet("preprocessed_data.parquet")
df_transform.printSchema()


Downloading...
From: https://drive.google.com/uc?id=1d3heMiNKlonRTfY4FbDuLSRe-Bdu-4Hq
To: /content/preprocessed_data.tar
100%|██████████| 138M/138M [00:02<00:00, 59.0MB/s]


root
 |-- X: vector (nullable = true)
 |-- y: long (nullable = true)
 |-- loc: vector (nullable = true)
 |-- y_enc: vector (nullable = true)



# Final Data Preparation
Shuffles the data and splits it into training and test sets (80% / 20%).

In [26]:

seed = 0  # seed both the shuffle and the split so that runs are repeatable
val_percent = 0.2
df_transform = df_transform.orderBy(rand(seed=seed))
df_transform_fin = df_transform.select('X', 'y_enc')
train, test = df_transform_fin.randomSplit([1 - val_percent, val_percent], seed=seed)
# Number of grid classes = length of the one-hot label vector
gridCount = len(train.select('y_enc').first()[0])
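`randomSplit` assigns each row to a split by per-row random sampling against cumulative weight bounds, so the 80/20 sizes are only approximate, and a seed makes the split repeatable only if the input order is also fixed, which is why the shuffle before it should be seeded too. A stdlib sketch of the same idea (the helper name `random_split` is our own):

```python
import random


def random_split(rows, weights=(0.8, 0.2), seed=0):
    """Assign each row independently to a split with probability proportional to weights."""
    rng = random.Random(seed)
    total = sum(weights)
    # Cumulative upper bounds, e.g. [0.8, 1.0]
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    splits = [[] for _ in weights]
    for row in rows:
        r = rng.random()
        for i, b in enumerate(bounds):
            # The last split also catches any floating-point rounding at the top
            if r < b or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


train_rows, test_rows = random_split(range(1000))
print(len(train_rows), len(test_rows))
```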

# Define the Model
This defines the model that we train on our data.
We use ResNet50 as the backbone and feed its output for each of the 3 images into an LSTM configured as a many-to-one architecture. The LSTM output then passes through several dense layers, and the final layer outputs a softmax over the grids.

In [29]:
def get_model(useRestnet = True, inputShape=(3, 100, 200, 3), gridCount=39):
    '''
    Build a new (untrained) model.
    useRestnet : set to True to use a pretrained, frozen ResNet50 backbone
                 set to False to use a trainable CNN instead
    inputShape: shape of one input image set
                (<number-of-images>, <image-height>, <image-width>, <RGB-values>)
    gridCount: number of output grids to predict over
    '''
    size = 1

    for i in inputShape:
        size *= i

    convnet = tf.keras.Sequential()
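    if useRestnet:
        # Use ResNet50 pretrained on ImageNet, without its classification head
        restnet = ResNet50(include_top=False, weights='imagenet', input_shape=inputShape[1:])
        # Freeze the backbone so that only the layers after it are trained
        restnet.trainable = False
        convnet.add(restnet)
    else:
        # Use a trainable CNN instead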
        convnet.add(Conv2D(128, (3,3), input_shape=inputShape[1:],
                           padding='same', activation='relu'))
        convnet.add(Conv2D(128, (3,3), padding='same', activation='relu'))
        convnet.add(BatchNormalization(momentum=.6))
        convnet.add(MaxPool2D())
        convnet.add(Conv2D(64, (3,3), padding='same', activation='relu'))
        convnet.add(Conv2D(64, (3,3), padding='same', activation='relu'))
        convnet.add(BatchNormalization(momentum=.6))
        convnet.add(MaxPool2D())
        convnet.add(Conv2D(64, (3,3), padding='same', activation='relu'))
        convnet.add(Conv2D(64, (3,3), padding='same', activation='relu'))
        convnet.add(BatchNormalization(momentum=.6))
        convnet.add(MaxPool2D())
        convnet.add(Conv2D(512, (3,3), padding='same', activation='relu'))
        convnet.add(Conv2D(512, (3,3), padding='same', activation='relu'))
        convnet.add(BatchNormalization(momentum=.6))
    convnet.add(GlobalMaxPool2D())
    model = tf.keras.Sequential()
    # Connect the CNN to an LSTM
    model.add(Reshape(inputShape,input_shape=(1, size)))
    model.add(TimeDistributed(convnet))
    model.add(LSTM(64))
    model.add(Dense(1024, activation='relu'))
    model.add(Dropout(.5))
    model.add(Dense(512, activation='relu'))
    model.add(Dropout(.5))
    model.add(Dense(128, activation='relu'))
    model.add(Dropout(.5))
    model.add(Dense(64, activation='relu'))
    model.add(Dense(gridCount, activation='softmax'))

    return model



# Define the training parameters and get the model object

In [None]:
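# Training Params
# NOTE: these values are not passed to the DistKeras estimator below,
# which is configured with its own parameters
lr = 0.001
momentum = 0.1  # TODO - potentially use momentum here
num_workers = 1
epochs = 20
batch_size = 5
verbosity = 1
mode = 'synchronous'

# Get model
model = get_model(useRestnet=True, inputShape=(3, 100, 200, 3), gridCount=gridCount)
model.summary()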


Model: "sequential_20"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 reshape_14 (Reshape)        (None, 3, 100, 200, 3)    0         
                                                                 
 time_distributed_5 (TimeDis  (None, 3, 2048)          23587712  
 tributed)                                                       
                                                                 
 lstm_5 (LSTM)               (None, 64)                540928    
                                                                 
 dense_25 (Dense)            (None, 1024)              66560     
                                                                 
 dropout_15 (Dropout)        (None, 1024)              0         
                                                                 
 dense_26 (Dense)            (None, 512)               524800    
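As a sanity check on the summary above, the parameter counts follow from standard formulas: a dense layer has `n_in * n_out` weights plus `n_out` biases, and a Keras LSTM has four gates, each with input weights, recurrent weights and a bias. The helper names are our own.

```python
def dense_params(n_in, n_out):
    """Weights plus biases of a fully connected layer."""
    return n_in * n_out + n_out


def lstm_params(n_in, units):
    """Keras LSTM: 4 gates, each with input weights, recurrent weights and a bias."""
    return 4 * (units * (n_in + units) + units)


print(lstm_params(2048, 64))    # lstm_5: 2048 ResNet features per image -> 64 units
print(dense_params(64, 1024))   # dense_25
print(dense_params(1024, 512))  # dense_26
```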
                                                     

# Create Spark Keras Wrapper
This wraps the Keras model in a Spark ML `Estimator` (and the fitted result in a Spark ML `Model`) that we then use to train and evaluate the model.

In [43]:
class DistKeras(ml.Estimator):
    def __init__(self, *args, **kwargs):
        self.__trainer_klass = args[0]
        self.__trainer_params = args[1]
        self.__build_trainer(**kwargs)
        super().__init__()

    @classmethod
    def __build_keras_model(klass, *args, **kwargs):
        keras_model = get_model(useRestnet=True, inputShape=(3, 100, 200, 3), gridCount=gridCount)
        return keras_model

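    def __build_trainer(self, *args, **kwargs):
        print(kwargs)
        loss = kwargs['loss']
        keras_model = DistKeras.__build_keras_model(**kwargs)
        # Only 'loss' is used; other keyword arguments (e.g. lr_decay, learning_rate)
        # are ignored, and the workers use the 'adam' optimizer with its default settings
        self._trainer = self.__trainer_klass(keras_model, 'adam',
                                             loss, **self.__trainer_params)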

    def _fit(self, *args, **kwargs):
        data_frame = args[0]
        if len(args) > 1:
            params = args[1]
            self.__build_trainer(**params)
        keras_model = self._trainer.train(data_frame)
        return DistKerasModel(keras_model)


class DistKerasModel(ml.Model):

    def __init__(self, *args, **kwargs):
        self._keras_model = args[0]
        # Read features from the 'X' column (ModelPredictor defaults to 'features')
        self._predictor = predictors.ModelPredictor(self._keras_model, features_col='X')
        super().__init__()

    def _transform(self, *args, **kwargs):
        data_frame = args[0]
        pred_col = self._predictor.output_column
        preds = self._predictor.predict(data_frame)
        return preds.withColumn(pred_col,
                                cast_to_double(preds[pred_col]))


# Replace each softmax probability vector with the index of the most likely grid
cast_to_double = functions.udf(lambda row: float(np.argmax(np.asarray(row))), types.DoubleType())
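The model ends in a softmax, so each prediction is a vector of grid probabilities; its element 0 is the probability of grid 0, not a class label. A pure-Python sketch (the helper names are hypothetical) of turning a probability vector into a predicted grid index and scoring a batch of predictions against the true grids:

```python
def predicted_grid(probabilities):
    """Index of the most probable grid, as a float to match a DoubleType column."""
    best = max(range(len(probabilities)), key=lambda i: probabilities[i])
    return float(best)


def accuracy(prediction_rows, true_grids):
    """Fraction of rows whose most probable grid equals the true grid."""
    if not true_grids:
        raise ValueError("no labels to score")
    hits = sum(predicted_grid(p) == float(t) for p, t in zip(prediction_rows, true_grids))
    return hits / len(true_grids)


print(predicted_grid([0.1, 0.7, 0.2]))
print(accuracy([[0.1, 0.7, 0.2], [0.6, 0.3, 0.1]], [1, 2]))
```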


# Create the Spark Keras Estimator
We create the distributed estimator, fit it to the training data, and then use the fitted model to transform the training data.

In [61]:
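# Port for the distributed trainer's master; change it if a previous run still holds the port
port = 6980
port += 1
# Note: lr_decay and learning_rate are ignored by DistKeras.__build_trainer
estimator = DistKeras(trainers.ADAG,
                      {'batch_size': 256,
                       'communication_window': 3,
                       'num_epoch': 10,
                       'num_workers': 1,
                       'master_port': port,
                       'features_col': 'X',
                       'label_col': 'y_enc'},
                      loss='categorical_crossentropy', lr_decay=0.90, learning_rate=1e-2)
estimator = estimator.fit(train)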


{'loss': 'categorical_crossentropy', 'lr_decay': 0.9, 'learning_rate': 0.01}


  self.center_variable = np.asarray(self.model.get_weights())


The transform step below failed with a `Py4JJavaError`; we were unable to get the distributed estimator to work.

In [64]:
estimator.transform(train)

Py4JJavaError: ignored