# Introduction  
### The following code was tested on:  
Python 3+ running on...  
  
Microsoft Surface Go:
- CPU - Intel Pentium 4415Y @ 1.60 GHz, 2 cores, 4 threads  
- RAM - 8 GB

ULB Virtual Machine on Surface Go with:
- CPU - 2 cores dedicated  
- RAM - 4096 MB dedicated

ULB Cluster - user epb123

# Data integration

For each sub-dataset, write (and execute) code that converts a file (using possibly an old schema) into a file that has the new, latest schema version.

Your conversion code should not modify the original files, but instead create a new file. 

Be sure to explain the design behind your conversion functions!

The data integration step is highly parallelizable. Therefore, your solution on this part
**must** be written in Spark

Here we import various packages to read files as well as to deal with the shapefile of TLC location data.

In [1]:
# Various imports
import os 
import glob
import math
import pickle
import ntpath
import numpy as np
import pandas as pd
import geopandas as gpd
from shapely.geometry import Point, Polygon

In [2]:
def _unpickle(path):
    # Load one object saved by an earlier notebook. The `with` block closes the
    # file handle (a bare pickle.load(open(...)) leaves it open). These files are
    # our own intermediate outputs, so unpickling them is trusted.
    with open(path, 'rb') as handle:
        return pickle.load(handle)


v_direc = _unpickle("v_direc")

data_directory = _unpickle(v_direc + "data_folder")
save_directory = _unpickle(v_direc + "save_directory")

# Set this = True if you are running the code on the ULB cluster
cluster = True

if cluster:
    # Please set the directory of your geographic metadata
    shapefile_directory = r"/home/epb123/shapefile"
    # Keep the local data path; on the cluster the inputs are read from HDFS instead
    old_data_directory = data_directory
    # NOTE(review): this HDFS path is under user epb199 while the shapefile lives
    # under epb123 - confirm that is intended.
    data_directory = r"hdfs:///user/epb199/data/"
else:
    shapefile_directory = r"/media/sf_Distributed/Project cleaner/shapefile"

The below code will start Spark in local mode, using all available CPU cores.

In [3]:
# -------------------------------
# Start Spark in LOCAL mode
# -------------------------------

if not cluster:

    # This is needed to start a Spark session from the notebook
    os.environ['PYSPARK_SUBMIT_ARGS'] ="--conf spark.driver.memory=4g  pyspark-shell"

    from pyspark.sql import SparkSession

    # Stop any previously running spark session so this cell can be re-executed.
    # Only a missing `spark` name (NameError) means "no session yet"; a bare
    # `except:` would also hide genuine failures.
    try:
        spark
    except NameError:
        pass
    else:
        print("Spark application already started. Terminating existing application and starting new one")
        spark.stop()

    # Create a new spark session (note, the * indicates to use all available CPU cores)
    spark = SparkSession \
        .builder \
        .master("local[*]") \
        .appName("H600Project") \
        .getOrCreate()

    # When dealing with RDDs, we work with the sparkContext object.
    # See https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.SparkContext
    sc = spark.sparkContext

    from pyspark.sql.types import *
    from pyspark.sql.functions import col,udf,lit,struct
    from pyspark.sql import SQLContext
    sql_sc = SQLContext(sc)

    print(sc.version)

The below code will start Spark on the ULB cluster.

In [4]:
# ----------------------------------------------------------------
# Start Spark on the ULB cluster, with YARN as resource manager
# ----------------------------------------------------------------

if cluster:

    # This is needed to start a Spark session from the notebook
    os.environ['PYSPARK_SUBMIT_ARGS'] ="--conf spark.driver.memory=4g  pyspark-shell"

    from pyspark.sql import SparkSession

    # We need to set the following environment variable, so that Spark knows where YARN runs
    os.environ['HADOOP_CONF_DIR']="/etc/hadoop/conf"

    # Since we are accessing spark through its python API, we need to make sure that all executor
    # instances run the same version of python.
    # (and we want Anaconda to be used, so we have access to numpy, pandas, and so forth)
    # You will likely need to adjust this path if you run on a different cluster
    os.environ['PYSPARK_PYTHON']="/usr/local/anaconda3/bin/python"
    os.environ['PYSPARK_DRIVER_PYTHON']="/usr/local/anaconda3/bin/python"

    # Allow this cell to be re-executed: if a spark session was already started,
    # stop it before starting a new one (only one spark context per notebook).
    # Only a missing `spark` name (NameError) means "no session yet"; a bare
    # `except:` would also hide genuine failures.
    try:
        spark
    except NameError:
        pass
    else:
        print("Spark application already started. Terminating existing application and starting new one")
        spark.stop()

    spark = SparkSession \
        .builder \
        .master("yarn") \
        .config("spark.executor.instances","4") \
        .appName("H600Project") \
        .getOrCreate()

    # When dealing with RDDs, we work with the sparkContext object.
    # See https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.SparkContext
    sc = spark.sparkContext

    from pyspark.sql.types import *
    from pyspark.sql.functions import col,udf,lit,struct
    from pyspark.sql import SQLContext
    sql_sc = SQLContext(sc)

    print(sc.version)

2.4.4


In [5]:
# Check that the spark context is working, print its configuration
# (getConf() is the public accessor; _conf is a private attribute)
sc.getConf().getAll()

[('spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_HOSTS',
  'public00.hpda.ulb.ac.be'),
 ('spark.executor.instances', '4'),
 ('spark.driver.port', '44961'),
 ('spark.driver.memory', '4g'),
 ('spark.driver.appUIAddress', 'http://publiclogin.hpda.ulb.ac.be:4043'),
 ('spark.executor.id', 'driver'),
 ('spark.driver.host', 'publiclogin.hpda.ulb.ac.be'),
 ('spark.app.name', 'H600Project'),
 ('spark.app.id', 'application_1610647739131_0066'),
 ('spark.master', 'yarn'),
 ('spark.executorEnv.PYTHONPATH',
  '{{PWD}}/pyspark.zip<CPS>{{PWD}}/py4j-0.10.7-src.zip'),
 ('spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_URI_BASES',
  'http://public00.hpda.ulb.ac.be:8088/proxy/application_1610647739131_0066'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.yarn.isPython', 'true'),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.filters',
  'org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter')

# Various helpful functions for later use:

We first start with a function to create a PySpark dataframe from a file at a given filepath/address. We chose to work with dataframes instead of RDDs in our implementation because of the additional optimisation given to Spark dataframes vs. RDDs when running in Python.  

We import the CSVs without inferring any schema - this appeared to be an expensive operation and while the data is still not cleaned, casting everything manually is preferable to having PySpark guess for us.  

We also directly change variable names to be lowercase and stripped for simplicity's sake. We use the select() method to do this rather than withColumnRenamed() as withColumnRenamed() appears to have significant performance issues as the number of columns increases.  

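We initially had to change tactics and load the dataframes with Pandas first, before converting, due to limitations running on the ULB cluster. This was later resolved by reading an HDFS copy of the files directly with Spark, which is what `create_df` below does.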

In [6]:
def create_df(address):
    """Read the CSV at `address` into a Spark DataFrame with normalised column names.

    The header row supplies the column names and no schema is inferred, so
    every column is read as a string. Column names are lowercased and stripped
    of surrounding whitespace.

    In YARN mode Spark resolves paths against HDFS by default, so on the
    cluster `address` must point at the HDFS copy of the data (an earlier
    workaround loaded the file with pandas and converted it; reading the HDFS
    copy replaced it).

    Returns the DataFrame with renamed columns.
    """
    address_df = spark.read.format('csv').option("header", True).load(address)

    print("opened", address)

    # Rename every column in a single projection instead of chaining
    # withColumnRenamed(). toDF() renames positionally, so header names that
    # contain dots are not misread as nested-field references (as col() would).
    normalised_names = [name.lower().strip() for name in address_df.columns]
    return address_df.toDF(*normalised_names)

As the comments say, this function will be very useful when renaming columns later on.

In [7]:
def replace_colNames(df, namesOg, newNames):
    """Rename selected columns of `df` and return the resulting DataFrame.

    namesOg and newNames are parallel sequences: namesOg[k] is renamed to
    newNames[k]. Columns not listed keep their names; names in namesOg that do
    not exist in `df` are ignored. Not every column has to be listed.

    Raises:
        ValueError: if namesOg and newNames have different lengths (the pairing
            would otherwise be ambiguous).
    """
    namesOg = list(namesOg)
    newNames = list(newNames)
    if len(namesOg) != len(newNames):
        raise ValueError(
            "namesOg and newNames must have the same length (%d != %d)"
            % (len(namesOg), len(newNames))
        )

    replacements = dict(zip(namesOg, newNames))
    # Positional rename in one projection; unaffected by dots in column names
    return df.toDF(*[replacements.get(name, name) for name in df.columns])

This function is just to make it easy to add additional blank columns (for instance to add "congestion_surcharge" to early datasets)

In [8]:
def add_empty_cols(df, newNames):
    """Return `df` with one extra all-null string column per name in `newNames`.

    All new columns are added in a single select() rather than one
    withColumn() call per column.
    """
    null_columns = [lit(None).cast(StringType()).alias(name) for name in newNames]
    return df.select(["*"] + null_columns)

We don't actually use this function aside from testing, but as you can imagine it is very useful for that purpose. I have left it uncommented for usage convenience, but it could easily be deleted.

In [9]:
def create_df_fromSchema(schema):
    """Return an empty DataFrame whose columns are the names in `schema`.

    Every column is a nullable string.
    """
    string_fields = [StructField(name, StringType(), True) for name in schema]
    return spark.createDataFrame([], StructType(string_fields))

Again, a function for testing further code, which has been kept for its usefulness.

In [10]:
def file_schema(address):
    """Return the CSV header at `address` as lowercased, stripped column names.

    Uses pandas (not Spark) and reads only the header row (nrows=0).
    """
    header_only = pd.read_csv(address, nrows=0)
    return [name.lower().strip() for name in header_only.columns]

We are trying not to modify the original dataset, and making a folder to store the new files is necessary. We use os.path.join() because it is less error prone than working directly with the strings.

In [11]:
def create_folder(newFolderName, newFolderPath):
    """Create the directory `newFolderName` inside `newFolderPath` if needed.

    Prints whether the folder was created or already existed, and returns the
    full folder path in both cases.
    """
    target = os.path.join(newFolderPath, newFolderName)

    try:
        os.mkdir(target)
    except FileExistsError:
        # Folder may already exist from a previous run
        print("folder already exists at " + target)
    else:
        print("new folder created at " + target)

    return target

Obviously, we need to save the modified dataframes. By default, PySpark writes one CSV per partition, i.e. if the dataframe has been split into 20 partitions, you will end up with 20 CSV files. Here we are using coalesce() to avoid that - if the datasets were too big for local memory, we would omit this and continue working with the partitioned CSVs, but our maximum file size here is ~5 MB.  

Due to limitations in getting the code to run on the cluster, we had to revert to converting our PySpark dataframes to Pandas and saving them through Pandas methods.

In [12]:
def save_df(saveDF, saveName, savePath):
    """Write `saveDF` as a single CSV file at os.path.join(savePath, saveName).

    The DataFrame is collected to the driver with toPandas() and written with
    pandas. It must therefore fit in driver memory, which holds for the small
    monthly samples used here.

    WARNING: an existing file with the same name is overwritten.
    """
    # The Spark-native alternative below is not used because there are no HDFS
    # write permissions on the cluster. It would create a folder named saveName
    # holding part-*.csv file(s) and a _SUCCESS marker:
    #   saveDF.coalesce(1).write.options(header=True).mode("overwrite").csv(os.path.join(savePath, saveName))

    # index=False: without it pandas writes its row index as an extra unnamed
    # first column, which is not part of the target schema.
    saveDF.toPandas().to_csv(os.path.join(savePath, saveName), index=False)

The base code here was taken from the H600 shapefile usage example, but it has been modified for work with a PySpark UDF. Essentially, the easiest and quickest way to update the coordinate data in older schemas is to pass this function to all rows in a given set of columns in our PySpark dataframe, and in order to do that we need to cast it as a UDF.  

Of course, UDFs are not optimal for use with PySpark, but given the use of the spatial index this would be very complicated to write with PySpark functions and might end up being slower regardless.  

One very interesting point is that the spatial index seems to require generation within the UDF itself. This is unfortunate as it would have been better to generate it once and then broadcast it to the worker nodes. We have instead settled for broadcasting the zones themselves to avoid having to pass at least this to each node with every function call.

In [13]:
# Load the shapefile, this yields a GeoDataFrame that has a row for each zone
zones = gpd.read_file(os.path.join(shapefile_directory, "taxi_zones.shp"))

# Re-project the coordinates to EPSG 4326, the CRS used in GPS (https://epsg.io/4326).
# The epsg= keyword replaces the deprecated {'init': 'epsg:4326'} dict form.
zones = zones.to_crs(epsg=4326)

# Broadcast the zones so they are shipped to each executor once, instead of
# being passed across to the spark nodes with every use
zonesBC = sc.broadcast(zones)

# Ideally the R-tree index (zones.sindex) would be built here and broadcast as
# well, but that did not work outside the UDF (presumably the index object does
# not serialise), so latLong_ID builds it from the broadcast zones instead.

print("TLC shapefile loaded")

def latLong_ID(lat, long):
    """Return the TLC taxi-zone LocationID (as a string) containing a point.

    Applied row by row through the Spark UDF udflatLong_ID on CSV columns read
    as strings, so `lat` and `long` are expected to be strings or None.

    Returns:
        str(LocationID) of the first zone whose geometry contains the point;
        None if either coordinate is missing, not a number, non-finite, or the
        resulting point is invalid;
        "failed to match" if the point is valid but lies in no zone.
    """
    # Missing or placeholder values cannot be located
    if lat in (None, "", "NaN") or long in (None, "", "NaN"):
        return None

    try:
        latfloat = float(lat)
        longfloat = float(long)
    except (TypeError, ValueError):
        return None

    # float() also accepts "nan", "inf", ...; such points would hand the R-tree
    # non-finite bounds (a likely source of the "RTreeError: Coordinates must
    # not have minimums more than maximums" seen on yellow 2010-08), so reject them
    if not (math.isfinite(latfloat) and math.isfinite(longfloat)):
        return None

    # shapely points are (x, y) = (longitude, latitude)
    query_point = Point(longfloat, latfloat)
    if not query_point.is_valid:
        return None

    zones_gdf = zonesBC.value
    # Spatial index: only test zones whose bounding box intersects the point.
    # It is built here because building and broadcasting it outside the UDF did not work.
    rtree = zones_gdf.sindex
    for position in rtree.intersection(query_point.bounds):
        zone = zones_gdf.iloc[position]
        if zone.geometry.contains(query_point):
            return str(zone.LocationID)

    return "failed to match"

# Wrap latLong_ID as a Spark UDF returning strings so it can be applied to columns
udflatLong_ID = udf(latLong_ID, returnType=StringType())

print("Location conversion function defined as UDF")

def add_locationID(dfLocMod):
    """Return `dfLocMod` with pulocationid and dolocationid columns added.

    The zone IDs are computed by udflatLong_ID from the pickup_/dropoff_
    latitude and longitude columns. The coordinate columns are left in place.
    """
    pickup_zone = udflatLong_ID('pickup_latitude', 'pickup_longitude')
    dropoff_zone = udflatLong_ID('dropoff_latitude', 'dropoff_longitude')
    return (dfLocMod
            .withColumn('pulocationid', pickup_zone)
            .withColumn('dolocationid', dropoff_zone))

TLC shapefile loaded
Location conversion function defined as UDF


If you want to test the location function, this will let you check that it's working properly.

In [14]:
# # location test
# print(latLong_ID("40.677341461181641","-73.943748474121094"))
# print(latLong_ID("40.683746337890625","-73.927146911621094"))
# dfxxx = spark.createDataFrame([('40.677341461181641','-73.943748474121094'),('40.683746337890625','-73.927146911621094')],['latitude','longitude'])
# dfxxx = dfxxx.withColumn('pulocationid', udflatLong_ID('latitude', 'longitude'))
# dfxxx.show()

Of course, we also need the filepaths, and the files grouped by schema, from part 2.1.

In [15]:
# Recover 2.1 variables from storage (replacing the `%store -r` magics).
def _unpickle(path):
    # Load one object saved by an earlier notebook; `with` closes the file
    # handle (a bare pickle.load(open(...)) leaves it open). Trusted local files.
    with open(path, 'rb') as handle:
        return pickle.load(handle)


filepaths = _unpickle(v_direc + "filepaths")
fhv_sameSchema = _unpickle(v_direc + "fhv_sameSchema")
fhvhv_sameSchema = _unpickle(v_direc + "fhvhv_sameSchema")
green_sameSchema = _unpickle(v_direc + "green_sameSchema")
yellow_sameSchema = _unpickle(v_direc + "yellow_sameSchema")

# Updating the respective schemas

Below, we use the variables we just loaded from storage to iterate through lists of lists, where each sub-list contains the filepaths of files that share the same schema. We can then apply one block of functions to each sub-list to bring it into line with the desired schema.  

Given the analysis just completed in 2.1, the function blocks here have been created manually by tracking which transformations are required in each area. Since we have completed 2.1 results and analysis, this was simple. Given the structure of the code below, converting it to a function which takes a general file from the dataset as an argument would use the lists of which file belongs in which schema and it would be a matter of matching them. If the schema of this general file was unknown at the time of running 2.1, we could always just re-run 2.1's code - assuming the file belongs to an already existing schema (which appears to be a safe assumption), 2.1 would correctly classify it for use here.

We could also have first merged all months of a particular taxi type or sub-schema set into one dataframe and proceeded from there, which might make better use of Spark's parallelisation. However, given that these monthly data sets are a small sample of the real data (so the full dataset would make use of the parallelisation anyway), we proceed on a monthly basis. 

## Updating FHV schemas

In [16]:
# Create the folder that will hold the integrated FHV files
fhv_folderPath = create_folder("integrated FHV", save_directory)
# Persist the folder path for later notebooks (replaces `%store fhv_folderPath`)
with open(v_direc + "fhv_folderPath", 'wb') as path_store:
    pickle.dump(fhv_folderPath, path_store)

folder already exists at /home/epb123/output/integrated FHV


In [17]:
# Match every FHV file to the sorted version of the final schema (fhv_sameSchema[4]).
# Each schema group gets a transform that renames/adds/drops columns as needed;
# _integrate_files then reads each file, applies the transform, sorts the
# columns (so every output has the same column order) and saves it.


def _integrate_files(paths, out_folder, transform=None):
    """Convert each CSV in `paths` and save it under its original file name in `out_folder`.

    transform, if given, takes and returns a DataFrame. Columns are sorted
    alphabetically after the transform.
    """
    for path in paths:
        file_name = ntpath.basename(path)
        if cluster:
            # On the cluster the inputs are read from the HDFS data folder
            path = data_directory + file_name
        df = create_df(path)
        if transform is not None:
            df = transform(df)
        save_df(df.select(sorted(df.columns)), file_name, out_folder)


def _fhv_from_schema0(df):
    # Oldest schema: locationid -> pulocationid, pickup_date -> pickup_datetime;
    # dolocationid, dropoff_datetime and sr_flag did not exist yet
    df = replace_colNames(df, ['locationid', 'pickup_date'], ['pulocationid', 'pickup_datetime'])
    return add_empty_cols(df, ['dolocationid', 'dropoff_datetime', 'sr_flag'])


def _fhv_from_schema1(df):
    # Same as the final schema, but without sr_flag
    return add_empty_cols(df, ['sr_flag'])


def _fhv_from_schema3(df):
    # dispatching_base_number holds the data previously in dispatching_base_num:
    # drop the old column and give the new one the final name
    df = df.drop("dispatching_base_num")
    return replace_colNames(df, ['dispatching_base_number'], ['dispatching_base_num'])


# Groups [2] and [4] already match the final schema and only need sorting
_fhv_transforms = [
    (0, _fhv_from_schema0),
    (1, _fhv_from_schema1),
    (2, None),
    (3, _fhv_from_schema3),
    (4, None),
]
for schema_index, transform in _fhv_transforms:
    _integrate_files(fhv_sameSchema[schema_index], fhv_folderPath, transform)

print("FHV schema integration complete")

opened hdfs:///user/epb199/data/fhv_tripdata_2015-01.csv
opened hdfs:///user/epb199/data/fhv_tripdata_2015-02.csv
opened hdfs:///user/epb199/data/fhv_tripdata_2015-03.csv
opened hdfs:///user/epb199/data/fhv_tripdata_2015-04.csv
opened hdfs:///user/epb199/data/fhv_tripdata_2015-05.csv
opened hdfs:///user/epb199/data/fhv_tripdata_2015-06.csv
opened hdfs:///user/epb199/data/fhv_tripdata_2015-07.csv
opened hdfs:///user/epb199/data/fhv_tripdata_2015-08.csv
opened hdfs:///user/epb199/data/fhv_tripdata_2015-09.csv
opened hdfs:///user/epb199/data/fhv_tripdata_2015-10.csv
opened hdfs:///user/epb199/data/fhv_tripdata_2015-11.csv
opened hdfs:///user/epb199/data/fhv_tripdata_2015-12.csv
opened hdfs:///user/epb199/data/fhv_tripdata_2016-01.csv
opened hdfs:///user/epb199/data/fhv_tripdata_2016-02.csv
opened hdfs:///user/epb199/data/fhv_tripdata_2016-03.csv
opened hdfs:///user/epb199/data/fhv_tripdata_2016-04.csv
opened hdfs:///user/epb199/data/fhv_tripdata_2016-05.csv
opened hdfs:///user/epb199/data

## Updating FHVHV schemas

In [18]:
# Create the folder that will hold the integrated FHVHV files
fhvhv_folderPath = create_folder("integrated FHVHV", save_directory)
# Persist the folder path for later notebooks (replaces `%store fhvhv_folderPath`)
with open(v_direc + "fhvhv_folderPath", 'wb') as path_store:
    pickle.dump(fhvhv_folderPath, path_store)

folder already exists at /home/epb123/output/integrated FHVHV


In [19]:
# The FHVHV schema never changed: create_df already lowercases and strips the
# column names, so each file only needs its columns put into sorted order.
for source_path in fhvhv_sameSchema[0]:
    read_path = data_directory + ntpath.basename(source_path) if cluster == True else source_path
    frame = create_df(read_path)
    sorted_frame = frame.select(sorted(frame.columns))
    save_df(sorted_frame, ntpath.basename(read_path), fhvhv_folderPath)

print("FHVHV schema integration complete")

opened hdfs:///user/epb199/data/fhvhv_tripdata_2019-02.csv
opened hdfs:///user/epb199/data/fhvhv_tripdata_2019-03.csv
opened hdfs:///user/epb199/data/fhvhv_tripdata_2019-04.csv
opened hdfs:///user/epb199/data/fhvhv_tripdata_2019-05.csv
opened hdfs:///user/epb199/data/fhvhv_tripdata_2019-06.csv
opened hdfs:///user/epb199/data/fhvhv_tripdata_2020-01.csv
opened hdfs:///user/epb199/data/fhvhv_tripdata_2020-03.csv
opened hdfs:///user/epb199/data/fhvhv_tripdata_2020-04.csv
opened hdfs:///user/epb199/data/fhvhv_tripdata_2020-05.csv
opened hdfs:///user/epb199/data/fhvhv_tripdata_2020-06.csv
FHVHV schema integration complete


## Updating Green schemas

In [20]:
# Create the folder that will hold the integrated Green files
green_folderPath = create_folder("integrated Green", save_directory)
# Persist the folder path for later notebooks (replaces `%store green_folderPath`)
with open(v_direc + "green_folderPath", 'wb') as path_store:
    pickle.dump(green_folderPath, path_store)

folder already exists at /home/epb123/output/integrated Green


In [21]:
# Match every Green file to the sorted version of the final schema (green_sameSchema[3]).
# Each schema group gets a transform that converts coordinates to zone IDs and/or
# adds the missing surcharge columns; _integrate_files then reads each file,
# applies the transform, sorts the columns and saves it.

_COORD_COLS = ["pickup_longitude", "pickup_latitude", "dropoff_longitude", "dropoff_latitude"]


def _integrate_files(paths, out_folder, transform=None):
    """Convert each CSV in `paths` and save it under its original file name in `out_folder`.

    transform, if given, takes and returns a DataFrame. Columns are sorted
    alphabetically after the transform.
    """
    for path in paths:
        file_name = ntpath.basename(path)
        if cluster:
            # On the cluster the inputs are read from the HDFS data folder
            path = data_directory + file_name
        df = create_df(path)
        if transform is not None:
            df = transform(df)
        save_df(df.select(sorted(df.columns)), file_name, out_folder)


def _coords_to_zones(df):
    # Replace the raw pickup/dropoff coordinates by pulocationid/dolocationid
    return add_locationID(df).drop(*_COORD_COLS)


def _green_from_schema0(df):
    # Coordinates instead of zone IDs; no improvement_surcharge or congestion_surcharge yet
    return add_empty_cols(_coords_to_zones(df), ["improvement_surcharge", "congestion_surcharge"])


def _green_from_schema1(df):
    # Coordinates instead of zone IDs; no congestion_surcharge yet
    return add_empty_cols(_coords_to_zones(df), ["congestion_surcharge"])


def _green_from_schema2(df):
    # Zone IDs already present; only congestion_surcharge is missing
    return add_empty_cols(df, ["congestion_surcharge"])


# Group [3] is already the final schema and only needs sorting
_green_transforms = [
    (0, _green_from_schema0),
    (1, _green_from_schema1),
    (2, _green_from_schema2),
    (3, None),
]
for schema_index, transform in _green_transforms:
    _integrate_files(green_sameSchema[schema_index], green_folderPath, transform)

print("Green schema integration complete")

opened hdfs:///user/epb199/data/green_tripdata_2013-08.csv
opened hdfs:///user/epb199/data/green_tripdata_2013-09.csv
opened hdfs:///user/epb199/data/green_tripdata_2013-10.csv
opened hdfs:///user/epb199/data/green_tripdata_2013-11.csv
opened hdfs:///user/epb199/data/green_tripdata_2013-12.csv
opened hdfs:///user/epb199/data/green_tripdata_2014-01.csv
opened hdfs:///user/epb199/data/green_tripdata_2014-02.csv
opened hdfs:///user/epb199/data/green_tripdata_2014-03.csv
opened hdfs:///user/epb199/data/green_tripdata_2014-04.csv
opened hdfs:///user/epb199/data/green_tripdata_2014-05.csv
opened hdfs:///user/epb199/data/green_tripdata_2014-06.csv
opened hdfs:///user/epb199/data/green_tripdata_2014-07.csv
opened hdfs:///user/epb199/data/green_tripdata_2014-08.csv
opened hdfs:///user/epb199/data/green_tripdata_2014-09.csv
opened hdfs:///user/epb199/data/green_tripdata_2014-10.csv
opened hdfs:///user/epb199/data/green_tripdata_2014-11.csv
opened hdfs:///user/epb199/data/green_tripdata_2014-12.c

## Updating Yellow schemas

In [22]:
# Create the folder that will hold the integrated Yellow files
yellow_folderPath = create_folder("integrated Yellow", save_directory)
# Persist the folder path for later notebooks (replaces `%store yellow_folderPath`)
with open(v_direc + "yellow_folderPath", 'wb') as path_store:
    pickle.dump(yellow_folderPath, path_store)

folder already exists at /home/epb123/output/integrated Yellow


In [23]:
# Match every Yellow file to the sorted version of the final schema (yellow_sameSchema[4]).
# Each schema group gets a transform that renames columns, converts coordinates
# to zone IDs and/or adds the missing surcharge columns; _integrate_files then
# reads each file, applies the transform, sorts the columns and saves it.

_COORD_COLS = ["pickup_longitude", "pickup_latitude", "dropoff_longitude", "dropoff_latitude"]

# (old name, final name) pairs, kept side by side so the two never drift out of alignment.
# surcharge -> extra: the values appear to match, based on the 2.3 analysis.
_YELLOW_SCHEMA0_RENAMES = [
    ('surcharge', 'extra'),
    ('rate_code', 'ratecodeid'),
    ('start_lat', 'pickup_latitude'),
    ('end_lat', 'dropoff_latitude'),
    ('store_and_forward', 'store_and_fwd_flag'),
    ('start_lon', 'pickup_longitude'),
    ('total_amt', 'total_amount'),
    ('tip_amt', 'tip_amount'),
    ('end_lon', 'dropoff_longitude'),
    ('tolls_amt', 'tolls_amount'),
    ('fare_amt', 'fare_amount'),
    ('vendor_name', 'vendorid'),
    ('trip_pickup_datetime', 'tpep_pickup_datetime'),
    ('trip_dropoff_datetime', 'tpep_dropoff_datetime'),
]
_YELLOW_SCHEMA1_RENAMES = [
    ('vendor_id', 'vendorid'),
    ('pickup_datetime', 'tpep_pickup_datetime'),
    ('dropoff_datetime', 'tpep_dropoff_datetime'),
    ('rate_code', 'ratecodeid'),
    ('surcharge', 'extra'),
]


def _integrate_files(paths, out_folder, transform=None):
    """Convert each CSV in `paths` and save it under its original file name in `out_folder`.

    transform, if given, takes and returns a DataFrame. Columns are sorted
    alphabetically after the transform.
    """
    for path in paths:
        file_name = ntpath.basename(path)
        if cluster:
            # On the cluster the inputs are read from the HDFS data folder
            path = data_directory + file_name
        df = create_df(path)
        if transform is not None:
            df = transform(df)
        save_df(df.select(sorted(df.columns)), file_name, out_folder)


def _coords_to_zones(df):
    # Replace the raw pickup/dropoff coordinates by pulocationid/dolocationid
    return add_locationID(df).drop(*_COORD_COLS)


def _rename_pairs(df, pairs):
    # Apply a list of (old name, new name) pairs via replace_colNames
    old_names = [old for old, _ in pairs]
    new_names = [new for _, new in pairs]
    return replace_colNames(df, old_names, new_names)


def _yellow_from_schema0(df):
    # Oldest naming scheme; coordinates instead of zone IDs; no improvement/congestion surcharges
    df = _rename_pairs(df, _YELLOW_SCHEMA0_RENAMES)
    return add_empty_cols(_coords_to_zones(df), ["improvement_surcharge", "congestion_surcharge"])


def _yellow_from_schema1(df):
    # Second naming scheme; coordinates instead of zone IDs; no improvement/congestion surcharges
    df = _rename_pairs(df, _YELLOW_SCHEMA1_RENAMES)
    return add_empty_cols(_coords_to_zones(df), ["improvement_surcharge", "congestion_surcharge"])


def _yellow_from_schema2(df):
    # Final names, but coordinates instead of zone IDs and no congestion_surcharge
    return add_empty_cols(_coords_to_zones(df), ["congestion_surcharge"])


def _yellow_from_schema3(df):
    # Zone IDs already present; only congestion_surcharge is missing
    return add_empty_cols(df, ["congestion_surcharge"])


# Group [4] is already the final schema and only needs sorting
_yellow_transforms = [
    (0, _yellow_from_schema0),
    (1, _yellow_from_schema1),
    (2, _yellow_from_schema2),
    (3, _yellow_from_schema3),
    (4, None),
]
for schema_index, transform in _yellow_transforms:
    _integrate_files(yellow_sameSchema[schema_index], yellow_folderPath, transform)

print("Yellow schema integration complete")

opened hdfs:///user/epb199/data/yellow_tripdata_2009-01.csv
opened hdfs:///user/epb199/data/yellow_tripdata_2009-02.csv
opened hdfs:///user/epb199/data/yellow_tripdata_2009-03.csv
opened hdfs:///user/epb199/data/yellow_tripdata_2009-04.csv
opened hdfs:///user/epb199/data/yellow_tripdata_2009-05.csv
opened hdfs:///user/epb199/data/yellow_tripdata_2009-06.csv
opened hdfs:///user/epb199/data/yellow_tripdata_2009-07.csv
opened hdfs:///user/epb199/data/yellow_tripdata_2009-08.csv
opened hdfs:///user/epb199/data/yellow_tripdata_2009-09.csv
opened hdfs:///user/epb199/data/yellow_tripdata_2009-10.csv
opened hdfs:///user/epb199/data/yellow_tripdata_2009-11.csv
opened hdfs:///user/epb199/data/yellow_tripdata_2009-12.csv
opened hdfs:///user/epb199/data/yellow_tripdata_2010-01.csv
opened hdfs:///user/epb199/data/yellow_tripdata_2010-02.csv
opened hdfs:///user/epb199/data/yellow_tripdata_2010-03.csv
opened hdfs:///user/epb199/data/yellow_tripdata_2010-04.csv
opened hdfs:///user/epb199/data/yellow_t

# Cleanup

As mentioned previously, PySpark's save method leaves the file system rather messy. The integration code above produces a new folder containing one sub-folder per integrated file. Inside each sub-folder, the CSV has an auto-generated, near-gibberish name and sits next to an additional marker file signalling success.  

The code below cleans this up: it renames each CSV after the sub-folder that contained it and moves it into a common folder.

This step is not needed when running on the cluster and opening/saving files with pandas methods.

In [24]:
def cleanup(folderpath):
    """Gather Spark's per-file CSV outputs into one folder with readable names.

    PySpark saves each integrated file as a folder (named after the original
    file) that holds a CSV with a generated name plus marker files. Every
    ``.csv`` found under ``folderpath`` is moved into the folder returned by
    ``create_folder("all CSV", folderpath)`` and renamed after the folder that
    contained it. Files already inside that destination folder are left alone.

    DO NOT CLEAN MORE THAN ONCE: the CSVs are moved, not copied, so a second
    run no longer finds them in their original folders.

    Args:
        folderpath: folder holding the Spark-written integrated data.

    Returns:
        Sorted list of the paths of the moved CSV files.

    Raises:
        FileExistsError: if a CSV would be moved onto a path that already
            exists (e.g. a Spark output folder containing more than one part
            file). Without this check ``os.rename`` silently overwrites the
            earlier file on POSIX systems, losing data.
    """
    newPath = create_folder("all CSV", folderpath)
    target_name = os.path.basename(newPath)
    testFilePath = []

    for root, _dirs, files in os.walk(folderpath):
        # Files already gathered into the destination folder are skipped
        if os.path.basename(root) == target_name:
            continue
        for file in files:
            if not file.endswith('.csv'):
                continue
            # The containing folder carries the original file name
            destination = os.path.join(newPath, os.path.basename(root))
            if os.path.exists(destination):
                raise FileExistsError(
                    "cleanup: refusing to overwrite " + destination
                    + " with " + os.path.join(root, file))
            os.rename(os.path.join(root, file), destination)
            testFilePath.append(destination)

    testFilePath.sort()
    return testFilePath

In [25]:
# fhv_integratedPaths = cleanup(fhv_folderPath)
# fhvhv_integratedPaths = cleanup(fhvhv_folderPath)
# green_integratedPaths = cleanup(green_folderPath)
# yellow_integratedPaths = cleanup(yellow_folderPath)

# elif cluster == True:
def _list_integrated(folder):
    """Return the sorted full paths of folder's entries, ignoring Jupyter's .ipynb_checkpoints."""
    return sorted(os.path.join(folder, entry)
                  for entry in os.listdir(folder)
                  if entry != ".ipynb_checkpoints")


fhv_integratedPaths = _list_integrated(fhv_folderPath)
fhvhv_integratedPaths = _list_integrated(fhvhv_folderPath)
green_integratedPaths = _list_integrated(green_folderPath)
yellow_integratedPaths = _list_integrated(yellow_folderPath)

# Pickle each list into v_direc, in a file named after the variable
for pickle_name, paths in (("fhv_integratedPaths", fhv_integratedPaths),
                           ("fhvhv_integratedPaths", fhvhv_integratedPaths),
                           ("green_integratedPaths", green_integratedPaths),
                           ("yellow_integratedPaths", yellow_integratedPaths)):
    with open(v_direc + pickle_name, 'wb') as handle:
        pickle.dump(paths, handle)

print("Paths cleaned and stored")

Paths cleaned and stored


# Testing

The testing code below verifies that the integration above has in fact produced new files which all share an identical schema. It is a modified version of the code from section 2.1.

In [26]:
def _record_order_changes(order_changesDf, l0, l1, l0_name, l1_name, jdate):
    """Add a row to order_changesDf for each label of l0 found at a different index in l1.

    Rows are labelled '<label>, y:<year>, m:<month>' (date of the newer file)
    and hold [l0_name, l1_name, index in l0, index in l1]. Labels absent from
    l1 produce no row.
    """
    for k, label in enumerate(l0):
        # Label still in the same position: nothing to record
        if k < len(l1) and l1[k] == label:
            continue
        for idx, candidate in enumerate(l1):
            if candidate == label:
                order_changesDf.loc[label + ', y:' + str(jdate[0]) + ", m:" + str(jdate[1])] = [l0_name, l1_name, k, idx]


def column_analysis(sub_data):
    """Analyse how column labels change across a date-sorted list of CSV files.

    Only each file's header is read (``nrows=0``). Labels are lower-cased and
    stripped of surrounding whitespace before comparison.

    Args:
        sub_data: list of CSV file paths, expected to be sorted by date.

    Returns:
        A 3-tuple:
        1. DataFrame indexed by 'dropped from <previous file>' / 'added to <file>'
           holding the labels dropped/added whenever the set of labels changes
           between consecutive files.
        2. List of lists of file paths; each sub-list is a run of consecutive
           files sharing the same set of labels (the last run is included too).
        3. DataFrame (columns 'file 0', 'file 1', 'index 0', 'index 1') of
           labels whose position changed between consecutive files.
    """
    col_changes = {}
    same_schema = []
    order_changesDf = pd.DataFrame(columns=['file 0', 'file 1', 'index 0', 'index 1'])

    # Consecutive files sharing the schema currently being tracked
    current_schema = []
    # Normalised labels and path of the previously read file (None before the first file)
    l0 = None
    prev_path = None

    for path in sub_data:
        header = pd.read_csv(path, nrows=0)
        l1 = [item.lower().strip() for item in header.columns]
        jdate = get_pathdate(path)

        if prev_path is not None:
            # Names are computed for every pair so order changes within a
            # schema group are attributed to the right files
            l0_name = ntpath.basename(prev_path)
            l1_name = ntpath.basename(path)

            # Sets are compared because column order alone does not change the schema
            if set(l1) != set(l0):
                col_changes['dropped from ' + l0_name] = set(l0) - set(l1)
                col_changes['added to ' + l1_name] = set(l1) - set(l0)
                # Close the current schema group; this file opens a new one
                same_schema.append(current_schema)
                current_schema = []

            if l0 != l1:
                _record_order_changes(order_changesDf, l0, l1, l0_name, l1_name, jdate)

        current_schema.append(path)
        l0 = l1
        prev_path = path

    # Close the final group, including when the last file opened a new schema
    # or when sub_data holds a single file
    if current_schema:
        same_schema.append(current_schema)

    return (pd.DataFrame.from_dict(col_changes, orient='index'), same_schema, order_changesDf)

In [27]:
def get_pathdate(givenpath):
    """Return the (year, month) encoded at the end of a path like '..._YYYY-MM.csv'.

    Parsing is purely positional (a 4-character extension is assumed), e.g.
    '.../yellow_tripdata_2009-01.csv' -> (2009, 1). int() raises ValueError
    when the sliced text is not numeric.
    """
    stem = givenpath[:-4]  # drop the extension, e.g. '.csv'
    year, month = stem[-7:-3], stem[-2:]
    return int(year), int(month)

In [28]:
# Compare headers of all integrated FHV files; display added/dropped columns (expected: empty)
fhv_test = column_analysis(sub_data=fhv_integratedPaths)
fhv_test[0]

In [29]:
# Integration succeeded when there is no second schema group. Only IndexError
# means "no second group"; any other error must not be reported as success.
try:
    fhv_test[1][1]
except IndexError:
    print("SUCCESS")
else:
    print("FAILURE")

SUCCESS


In [30]:
# Column-position changes between consecutive integrated FHV files (empty frame = none)
fhv_test[2]

Unnamed: 0,file 0,file 1,index 0,index 1


In [31]:
# Compare headers of all integrated FHVHV files; display added/dropped columns (expected: empty)
fhvhv_test = column_analysis(sub_data=fhvhv_integratedPaths)
fhvhv_test[0]

In [32]:
# Integration succeeded when there is no second schema group. Only IndexError
# means "no second group"; any other error must not be reported as success.
try:
    fhvhv_test[1][1]
except IndexError:
    print("SUCCESS")
else:
    print("FAILURE")

SUCCESS


In [33]:
# Column-position changes between consecutive integrated FHVHV files (empty frame = none)
fhvhv_test[2]

Unnamed: 0,file 0,file 1,index 0,index 1


In [34]:
# Compare headers of all integrated green-taxi files; display added/dropped columns (expected: empty)
green_test = column_analysis(sub_data=green_integratedPaths)
green_test[0]

In [35]:
# Integration succeeded when there is no second schema group. Only IndexError
# means "no second group"; any other error must not be reported as success.
try:
    green_test[1][1]
except IndexError:
    print("SUCCESS")
else:
    print("FAILURE")

SUCCESS


In [36]:
# Column-position changes between consecutive integrated green-taxi files (empty frame = none)
green_test[2]

Unnamed: 0,file 0,file 1,index 0,index 1


In [37]:
# Compare headers of all integrated yellow-taxi files; display added/dropped columns (expected: empty)
yellow_test = column_analysis(sub_data=yellow_integratedPaths)
yellow_test[0]

In [38]:
# Integration succeeded when there is no second schema group. Only IndexError
# means "no second group"; any other error must not be reported as success.
try:
    yellow_test[1][1]
except IndexError:
    print("SUCCESS")
else:
    print("FAILURE")

SUCCESS


In [39]:
# Column-position changes between consecutive integrated yellow-taxi files (empty frame = none)
yellow_test[2]

Unnamed: 0,file 0,file 1,index 0,index 1
