# Data integration

For each sub-dataset, write (and execute) code that converts a file (possibly using an old schema) into a file that has the new, latest schema version.

Your conversion code should not modify the original files, but instead create a new file. Be sure to explain the design behind your conversion functions!

The data integration step is highly parallelizable. Therefore, your solution on this part **must** be written in Spark.

WARNING: this notebook assumes that:

- The data are in the "MY_PARENT_FOLDER/data/sampled/" folder. You can run the bash script "download_metadata.sh" to download the data and metadata into the correct folders before executing the Jupyter notebooks.
- The data are sampled so that the notebook can run on a personal computer.
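Creating a new file instead of modifying the original also makes it possible to re-run the conversion and skip files that already exist. That skip is only safe if a file never exists in a half-written state. A minimal sketch of a hypothetical helper `write_new_file` (not part of the notebook) that writes to a temporary file in the target folder and renames it into place, refusing to overwrite an existing output:

```python
import os
import tempfile


def write_new_file(target_path, text):
    """Hypothetical helper: create `target_path` with `text`, never overwriting an existing file."""
    if os.path.exists(target_path):
        raise FileExistsError(target_path)
    directory = os.path.dirname(target_path) or "."
    # The temporary file lives in the same folder, so the final rename stays on one filesystem
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as tmp:
            tmp.write(text)
        # os.replace renames in a single step (atomic on POSIX): readers see no file or the full file
        os.replace(tmp_path, target_path)
    except BaseException:
        # Also covers a notebook interrupt: never leave a partial temporary file behind
        os.remove(tmp_path)
        raise
```

With pandas, `DataFrame.to_csv(index=False)` called without a path returns the CSV text, so a converted (hypothetical) `converted_DF` could be written with `write_new_file(out_path, converted_DF.toPandas().to_csv(index=False))`.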

In [1]:
# Imports go here
import os
import glob
import shutil
from datetime import date, datetime

import pandas as pd
import geopandas as gpd
from shapely.geometry import Point
import pyspark.sql.functions as f
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit

# Give the Spark driver 3 GB of memory; only effective if set before the first Spark session of this Python process starts
os.environ['PYSPARK_SUBMIT_ARGS'] = "--conf spark.driver.memory=3g pyspark-shell"

# Stop the Spark application left over from a previous run of this cell, if any
try:
    spark
    print("Spark application already started. Terminating existing application and starting new one")
    spark.stop()
except NameError:
    pass

# Create a new spark session (note, the * indicates to use all available CPU cores)
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("H600 L-Group") \
    .getOrCreate()
# When dealing with RDDs, we work with the sparkContext object. See https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.SparkContext
sc = spark.sparkContext
# In local mode, the Spark GUI is available at http://localhost:4040

In [2]:
# Create the cleaned data directories (os.path.isdir never raises, so the old try/except never created them)
os.makedirs("data/cleaned", exist_ok=True)
print("Successfully created the directory data/cleaned")

list_taxi = ["yellow", "green", "fhv", "fhvhv"]
for taxi_brand in list_taxi:
    # One output sub-directory per taxi company brand; no-op if it already exists
    path = "data/cleaned/%s" % (taxi_brand)
    os.makedirs(path, exist_ok=True)
    print("Successfully created the directory %s " % path)

Successfully created the directory data/cleaned
Successfully created the directory data/cleaned/yellow 
Successfully created the directory data/cleaned/green 
Successfully created the directory data/cleaned/fhv 
Successfully created the directory data/cleaned/fhvhv 


## 1. FHVHV files

Previous analyses showed that the header is consistent across all the fhvhv files.
We therefore do not need to modify them: they are simply copied into the cleaned folder.

In [123]:
source_dir = 'data/sampled/'
# The fhvhv schema never changed: copy the files unchanged (the originals are not modified)
for filename in glob.glob(os.path.join(source_dir, 'fhvhv_*.csv')):
    shutil.copy(filename, 'data/cleaned/fhvhv')
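Copying the files unchanged relies on the fhvhv header being identical in every file. A minimal standard-library sketch (the function name `distinct_headers` is hypothetical) that re-checks this on the sampled files by reading only the first line of each one:

```python
import csv
import glob
import os


def distinct_headers(pattern):
    """Map each distinct header (tuple of column names) to the list of files that use it."""
    headers = {}
    for path in sorted(glob.glob(pattern)):
        with open(path, newline="") as handle:
            # Only the first line is parsed, so this stays cheap even for large trip files
            header = tuple(next(csv.reader(handle), []))
        headers.setdefault(header, []).append(path)
    return headers
```

If `len(distinct_headers(os.path.join('data/sampled/', 'fhvhv_*.csv')))` is 1, every fhvhv file shares one header and the plain copy is enough.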

## 2. FHV files

Based on the previous analysis, we use the following schema as the reference for the FHV taxi files:

['dispatching_base_num', 'pickup_datetime', 'dropoff_datetime', 'pulocationid', 'dolocationid', 'sr_flag'] 
 
We therefore apply the following transformations to create new, uniform files, depending on the time period (defined previously and saved in the file Change_date_fhv.csv) that each file belongs to:

- Schema 1:
    - Add the empty columns 'dropoff_datetime', 'DOLocationID' and 'SR_Flag'.
    - Rename 'Pickup_date' to 'pickup_datetime', 'locationID' to 'PULocationID' and 'Dispatching_base_num' to 'dispatching_base_num'.

- Schema 2:
    - Add the empty columns 'DOLocationID' and 'SR_Flag'.
    - Rename 'Pickup_DateTime' to 'pickup_datetime', 'Dropoff_datetime' to 'dropoff_datetime' and 'Dispatching_base_num' to 'dispatching_base_num'.

- Schema 3:
    - Rename 'Pickup_DateTime' to 'pickup_datetime', 'Dropoff_datetime' to 'dropoff_datetime' and 'Dispatching_base_num' to 'dispatching_base_num'.

- Schema 4:
    - Rename 'Pickup_DateTime' to 'pickup_datetime', 'Dropoff_datetime' to 'dropoff_datetime' and 'Dispatching_base_number' to 'dispatching_base_num'.
    - Drop the duplicate 'Dispatching_base_num' column, which holds no values.

- Schema 5 (final):
    - No change.
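Choosing the schema of a file means finding which interval of change dates contains the file's month. A minimal sketch with the standard `bisect` module, assuming the change dates are sorted chronologically and, as in the conversion code, that the last schema also covers the last change date; the dates below are made up for illustration and are not the content of Change_date_fhv.csv:

```python
import bisect
from datetime import date


def schema_number(file_month, change_dates):
    """Return the 1-based schema number in force for `file_month`.

    change_dates[k] starts a new period: schema k covers [change_dates[k-1], change_dates[k]),
    and the last schema also covers change_dates[-1] and every later month.
    Returns 0 for months before change_dates[0].
    """
    return min(bisect.bisect_right(change_dates, file_month), len(change_dates) - 1)


# Hypothetical change dates, for illustration only
changes = [date(2015, 1, 1), date(2017, 1, 1), date(2017, 7, 1),
           date(2018, 1, 1), date(2019, 1, 1), date(2019, 12, 1)]
print(schema_number(date(2016, 6, 1), changes))   # 1
print(schema_number(date(2019, 12, 1), changes))  # 5
```

A binary search replaces the scan over consecutive date pairs and makes the "before the first schema" case explicit (result 0) instead of silently skipping the file.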


In [3]:
import bisect
import re

source_dir = 'data/sampled/'
clean_dir = 'data/cleaned/'
taxi_brand = 'fhv'
# List the files of this taxi company brand, ordered by date (the date is in the file name)
list_files = sorted(glob.glob(os.path.join(source_dir, "%s_*.csv" % taxi_brand)))
nb_files = len(list_files)


def null_col():
    # A real null, written as an empty CSV field (lit('null') would write the string "null")
    return lit(None).cast("string")


def fhv_to_reference(fhv_DF, nb_schema):
    """Convert a DataFrame read with FHV schema `nb_schema` (1 to 5) to the reference schema."""
    if nb_schema == 5:
        return fhv_DF  # final schema: no change
    return fhv_DF.select(
        # Schema 4 renamed the base column; its duplicate, empty 'Dispatching_base_num' is dropped here
        col("Dispatching_base_number" if nb_schema == 4 else "Dispatching_base_num").alias("dispatching_base_num"),
        col("Pickup_date" if nb_schema == 1 else "Pickup_DateTime").alias("pickup_datetime"),
        (null_col() if nb_schema == 1 else col("Dropoff_datetime")).alias("dropoff_datetime"),
        col("locationID" if nb_schema == 1 else "PULocationID").alias("PULocationID"),
        (null_col() if nb_schema <= 2 else col("DOLocationID")).alias("DOLocationID"),
        (null_col() if nb_schema <= 2 else col("SR_Flag")).alias("SR_Flag"))


# Open the date change file: column 1 holds the first month of each schema, in chronological order
df = pd.read_csv("data/Change_date_%s.csv" % taxi_brand, sep=',', header=None)
dating_schema = [datetime.strptime(x, '%Y-%m-%d').date() for x in df[1]]

for file in list_files:
    # e.g. "fhv_tripdata_2015-01.csv" -> "tripdata_2015-01.csv"
    out_name = os.path.basename(file)[len(taxi_brand) + 1:]
    out_path = os.path.join(clean_dir, taxi_brand, out_name)
    if os.path.isfile(out_path):
        continue  # already converted in a previous run
    year, month = map(int, re.search(r"(\d{4})-(\d{2})", out_name).groups())
    date_file = date(year, month, 1)
    # Schema k covers [dating_schema[k-1], dating_schema[k]); the last schema also covers later months
    nb_schema = min(bisect.bisect_right(dating_schema, date_file), len(dating_schema) - 1)
    if nb_schema == 0:
        print("[WARNING] %s is older than the first known schema, skipped" % file)
        continue
    fhv_DF = (spark.read
              .option("sep", ",")
              .option("header", True)
              .option("inferSchema", True)
              .csv(file))
    # The sampled files are small: collect the result and write one CSV file, without the pandas index
    fhv_to_reference(fhv_DF, nb_schema).toPandas().to_csv(out_path, index=False)

new_files = len(os.listdir(os.path.join(clean_dir, taxi_brand)))
if new_files == nb_files:
    print("All the %i files are well integrated !" % (new_files))
else:
    print("[ERROR] %i files on %i files have been integrated ..." % (new_files, nb_files))

All the 64 files are well integrated !
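Counting files only shows that every input produced an output, not that the outputs share the reference schema. A minimal sketch of a stronger check (the name `files_with_wrong_header` is hypothetical) that compares each cleaned header with the reference list, ignoring case because files in the final schema are written unchanged. Note that a file written with pandas' default `index=True` starts its header with an empty field for the index, which this check flags:

```python
import csv
import glob
import os

FHV_REFERENCE = ["dispatching_base_num", "pickup_datetime", "dropoff_datetime",
                 "pulocationid", "dolocationid", "sr_flag"]


def files_with_wrong_header(folder, reference):
    """Return the CSV files in `folder` whose header differs (case-insensitively) from `reference`."""
    wrong = []
    for path in sorted(glob.glob(os.path.join(folder, "*.csv"))):
        with open(path, newline="") as handle:
            header = [name.strip().lower() for name in next(csv.reader(handle), [])]
        if header != reference:
            wrong.append(path)
    return wrong
```

An empty result from `files_with_wrong_header('data/cleaned/fhv', FHV_REFERENCE)` would confirm that all cleaned FHV files have the reference columns, in the reference order.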


## 3. Green files

Based on the previous analysis, we use the following schema as the reference for the GREEN taxi files:

['vendorid', 'lpep_pickup_datetime', 'lpep_dropoff_datetime', 'store_and_fwd_flag', 'ratecodeid', 'pulocationid', 'dolocationid', 'passenger_count', 'trip_distance', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'ehail_fee', 'improvement_surcharge', 'total_amount', 'payment_type', 'trip_type', 'congestion_surcharge']
 
We therefore apply the following transformations to create new, uniform files, depending on the time period (defined previously and saved in the file Change_date_green.csv) that each file belongs to:

- Schema 1:
    - Add two new empty columns: 'congestion_surcharge' and 'improvement_surcharge'.
    - Replace 'pickup_longitude', 'pickup_latitude' by 'pulocationid' and 'dropoff_longitude', 'dropoff_latitude' by 'dolocationid'. Each lat-lon position is mapped to the ID of the taxi zone that contains it, using the taxi zone shapefile loaded with geopandas; positions with a zero longitude or latitude get the placeholder ID 9999.
    - Convert all the other column names to lower case.

- Schema 2:
    - Add one new empty column: 'congestion_surcharge'.
    - Replace the lat-lon columns by 'pulocationid' and 'dolocationid', as for schema 1.
    - Convert all the other column names to lower case.

- Schema 3:
    - Add one new empty column: 'congestion_surcharge'.
    - Convert all the other column names to lower case.

- Schema 4 (final):
    - No change.
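Mapping a position to a zone ID, as for schemas 1 and 2, is a point-in-polygon lookup: shapely's `contains` performs it on the real zone polygons. The dependency-free sketch below only illustrates the idea with the even-odd (ray casting) rule on two made-up square zones, using x as longitude and y as latitude. Real taxi zones include multipolygons and holes, which this toy version does not handle:

```python
def point_in_polygon(x, y, polygon):
    """Even-odd rule: is (x, y) inside the polygon given as a list of (x, y) vertices?"""
    inside = False
    n = len(polygon)
    for i in range(n):
        x1, y1 = polygon[i]
        x2, y2 = polygon[(i + 1) % n]
        # Does the horizontal ray from (x, y) towards +x cross the edge (x1, y1)-(x2, y2)?
        if (y1 > y) != (y2 > y):
            x_cross = x1 + (y - y1) * (x2 - x1) / (y2 - y1)
            if x < x_cross:
                inside = not inside
    return inside


def zone_of(lon, lat, zone_polygons, unknown=9999):
    """Return the ID of the first zone containing (lon, lat), or `unknown` if none does."""
    for zone_id, polygon in zone_polygons.items():
        xs = [vx for vx, _ in polygon]
        ys = [vy for _, vy in polygon]
        # Cheap bounding-box rejection before the exact test (the role of the spatial index)
        if min(xs) <= lon <= max(xs) and min(ys) <= lat <= max(ys) and point_in_polygon(lon, lat, polygon):
            return zone_id
    return unknown


# Two hypothetical unit-square zones side by side
toy_zones = {1: [(0, 0), (1, 0), (1, 1), (0, 1)], 2: [(1, 0), (2, 0), (2, 1), (1, 1)]}
```

Returning an explicit `unknown` ID when no zone matches keeps every row's answer independent of the rows processed before it.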
            

In [5]:
import bisect
import re

source_dir = 'data/sampled/'
clean_dir = 'data/cleaned/'
taxi_brand = 'green'
# List the files of this taxi company brand, ordered by date (the date is in the file name)
list_files = sorted(glob.glob(os.path.join(source_dir, "%s_*.csv" % taxi_brand)))
nb_files = len(list_files)

# Reference schema. Spark resolves column names case-insensitively by default
# (spark.sql.caseSensitive=false), so col('vendorid') also matches 'VendorID'
green_columns = ['vendorid', 'lpep_pickup_datetime', 'lpep_dropoff_datetime', 'store_and_fwd_flag',
                 'ratecodeid', 'pulocationid', 'dolocationid', 'passenger_count', 'trip_distance',
                 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'ehail_fee',
                 'improvement_surcharge', 'total_amount', 'payment_type', 'trip_type',
                 'congestion_surcharge']

# Load the shapefile (one row per zone), reproject it to lon/lat (WGS84) and broadcast the
# (LocationID, polygon) pairs so that every Spark task can do the lookup locally
zones = gpd.read_file('data/metadata/taxi_zones.shp').to_crs(epsg=4326)
zones_bc = sc.broadcast(list(zip(zones['LocationID'].astype(int), zones.geometry)))


def convertlocID(lon, lat):
    """Return the LocationID of the taxi zone containing (lon, lat), or 9999 if there is none."""
    # Missing positions, or a zero longitude/latitude, get the placeholder ID 9999
    if lon is None or lat is None or abs(lon) < 1 or abs(lat) < 1:
        return 9999
    query_point = Point(lon, lat)
    for location_id, polygon in zones_bc.value:
        minx, miny, maxx, maxy = polygon.bounds
        # Cheap bounding-box test first, then the exact point-in-polygon test
        if minx <= lon <= maxx and miny <= lat <= maxy and polygon.contains(query_point):
            # The zone's own LocationID attribute, not its row position in the shapefile
            return int(location_id)
    return 9999  # outside every zone


# Applied row by row on the executors, so each row keeps its own IDs (no collect, no row-index matching)
loc_id_udf = f.udf(convertlocID, "int")


def green_to_reference(green_DF, nb_schema):
    """Convert a DataFrame read with green schema `nb_schema` (1 to 4) to the reference schema."""
    if nb_schema == 4:
        return green_DF  # final schema: no change
    if nb_schema <= 2:
        # Schemas 1 and 2 store lat-lon positions: replace them by zone IDs
        green_DF = (green_DF
                    .withColumn("pulocationid", loc_id_udf(col("Pickup_longitude").cast("double"),
                                                           col("Pickup_latitude").cast("double")))
                    .withColumn("dolocationid", loc_id_udf(col("Dropoff_longitude").cast("double"),
                                                           col("Dropoff_latitude").cast("double"))))
    if nb_schema == 1:
        green_DF = green_DF.withColumn("improvement_surcharge", lit(None).cast("double"))
    green_DF = green_DF.withColumn("congestion_surcharge", lit(None).cast("double"))
    # Select in the reference order; aliasing to the lower-case name normalises the header
    return green_DF.select([col(c).alias(c) for c in green_columns])


# Open the date change file: column 1 holds the first month of each schema, in chronological order
df = pd.read_csv("data/Change_date_%s.csv" % taxi_brand, sep=',', header=None)
dating_schema = [datetime.strptime(x, '%Y-%m-%d').date() for x in df[1]]

for file in list_files:
    # e.g. "green_tripdata_2014-01.csv" -> "tripdata_2014-01.csv"
    out_name = os.path.basename(file)[len(taxi_brand) + 1:]
    out_path = os.path.join(clean_dir, taxi_brand, out_name)
    if os.path.isfile(out_path):
        continue  # already converted in a previous run
    year, month = map(int, re.search(r"(\d{4})-(\d{2})", out_name).groups())
    date_file = date(year, month, 1)
    # Schema k covers [dating_schema[k-1], dating_schema[k]); the last schema also covers later months
    nb_schema = min(bisect.bisect_right(dating_schema, date_file), len(dating_schema) - 1)
    if nb_schema == 0:
        print("[WARNING] %s is older than the first known schema, skipped" % file)
        continue
    green_DF = (spark.read
                .option("sep", ",")
                .option("header", True)
                .option("inferSchema", True)
                .csv(file))
    # The sampled files are small: collect the result and write one CSV file, without the pandas index
    green_to_reference(green_DF, nb_schema).toPandas().to_csv(out_path, index=False)

new_files = len(os.listdir(os.path.join(clean_dir, taxi_brand)))
if new_files == nb_files:
    print("All the %i files are well integrated !" % (new_files))
else:
    print("[ERROR] %i files on %i files have been integrated ..." % (new_files, nb_files))

All the 76 files are well integrated !


## 4. Yellow files

There are 131 yellow files. The previous analysis found the following schema changes (year-month of the change):

- 2010-01: 12 differences out of 18 columns: ['vendor_id', 'pickup_datetime', 'dropoff_datetime', 'pickup_longitude', 'pickup_latitude', 'store_and_fwd_flag', 'dropoff_longitude', 'dropoff_latitude', 'fare_amount', 'tip_amount', 'tolls_amount', 'total_amount']
    - 12/12 column names changed

- 2015-01: 6 differences out of 19 columns: ['vendorid', 'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'ratecodeid', 'extra', 'improvement_surcharge']
    - 1/6 column added
    - 5/6 column names changed

- 2016-07: 2 differences out of 17 columns: ['pulocationid', 'dolocationid']
    - 2/2 columns removed

- 2019-01: 1 difference out of 18 columns: ['congestion_surcharge']
    - 1/1 column added
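The counts above come from the previous analysis, whose code is not shown here. A minimal sketch, assuming two headers are compared case-insensitively as sets of names, of how the added and removed columns between two consecutive schemas can be computed (`header_diff` is a hypothetical name). A rename shows up as one removed plus one added name, so classifying it as a name change still needs a human decision or an explicit mapping:

```python
def header_diff(old_header, new_header):
    """Compare two headers case-insensitively; return (added, removed) column names, sorted."""
    old = {name.strip().lower() for name in old_header}
    new = {name.strip().lower() for name in new_header}
    return sorted(new - old), sorted(old - new)


# Shortened, illustrative headers (not the full yellow schemas)
added, removed = header_diff(["VendorID", "tpep_pickup_datetime", "total_amount"],
                             ["vendorid", "tpep_pickup_datetime", "total_amount", "congestion_surcharge"])
```

Lower-casing before comparing keeps pure case changes (such as 'VendorID' versus 'vendorid') from being reported as differences, consistent with the lower-case reference schemas used above.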