# Info-H-600 group work

## 1. Metadata information.

### 1.1 Imports

In [1]:
#imports go here
import os 
import numpy as np
from pyspark.sql import SparkSession
from datetime import datetime as dt
import os
import math
import random


In [2]:
def choose_mode():
    """
    Write "cluster" to set up in cluster mode or in "local" to run the notebook locally
    """
    mode = input("Choose between Cluster or Local mode (write local or cluster in the field): ")
    
    
    if mode.lower() == "local":
        basedir = "./data/sampled"
        print("\nYou have chosen local mode!")
        return(basedir)
        
        
    elif mode.lower() == "cluster":
        basedir = '/home/epb199/data/'
        print("\nYou have chosen cluster mode!")
        return(basedir)
    
    else:
        print("\nError, incorrect entries, select between \"local\" or \"cluster\" !")

        
        
basedir = choose_mode()



if basedir == '/home/epb199/data/':
    
    os.environ['PYSPARK_SUBMIT_ARGS'] ="--conf spark.driver.memory=3g  pyspark-shell"

    os.environ['HADOOP_CONF_DIR']="/etc/hadoop/conf"

    os.environ['PYSPARK_PYTHON']="/usr/local/anaconda3/bin/python"
    os.environ['PYSPARK_DRIVER_PYTHON']="/usr/local/anaconda3/bin/python"


    try: 
        spark
        print("Spark application already started. Terminating existing application and starting new one")
        spark.stop()
    except: 
        pass

    spark = SparkSession \
        .builder \
        .master("yarn") \
        .config("spark.executor.instances","4") \
        .appName("demoRDD") \
        .getOrCreate()


    sc=spark.sparkContext    
    
elif basedir == './data/sampled':
    os.environ['PYSPARK_SUBMIT_ARGS'] ="--conf spark.driver.memory=4g  pyspark-shell"

    try: 
        spark
        print("Spark application already started. Terminating existing application and starting new one")
        spark.stop()
    except: 
        pass

    spark = SparkSession \
        .builder \
        .master("local[*]") \
        .appName("demoRDD") \
        .getOrCreate()
    sc=spark.sparkContext    



Choose between Cluster or Local mode (write local or cluster in the field): local

You have chosen local mode!


### 1.2 general statistics

#### 1.21 Function to get general statistics

In [3]:
def info_all_sub_data(sub_data, basedir = basedir):
    """
    This function computes the meta-data information about a defined sub-database (i.e. yellow data)
    @sub_data: This parameter must be a string specifying which sub-data we are interested by (i.e. 'yellow')
    @basedir: This parameter specifies where the data folder is. It must be a string.
    """
    start_name = sub_data + "_"
    names = os.listdir(basedir)
    paths = [basedir +"/"+name for name in names if name.startswith(start_name)]
    nbr_files = len(paths)
    sizes = [os.stat(path).st_size for path in paths]
    lengths = [sc.textFile(path).count() for path in paths]
    print("*************************information about the", sub_data, "files*************************", 
          "\nNumber of files", "files:", 
          nbr_files,
          "\n\n**************size information about", sub_data, "files in bytes" + "***********",
          "\nmean:" ,np.mean(sizes),
          "\nmin:", min(sizes),
          "\nmax:", max(sizes),
          "\n 25th quantile:", np.percentile(sizes,25),
          "\n50th quantile:", np.percentile(sizes,50),
          "\n75th quantile:", np.percentile(sizes,75),
          "\n90th quantile:", np.percentile(sizes,90),
          "\n\n**************Number of lines in", sub_data, 
          "files" + "**************",
          "\nmean:" ,np.mean(lengths),
          "\nmin:", min(lengths),
          "\nmax:", max(lengths),
          "\n25th quantile:", np.percentile(lengths,25),
          "\n50th quantile:", np.percentile(lengths,50),
          "\n75th quantile:", np.percentile(lengths,75),
          "\n90th quantile:", np.percentile(lengths,90),
          "\n","\n","\n")

#### 1.22 Metadata statistics

In [4]:
info_all_sub_data("yellow")
info_all_sub_data("green")
info_all_sub_data("fhv")
info_all_sub_data("fhvhv")

*************************information about the yellow files************************* 
Number of files files: 131 

**************size information about yellow files in bytes*********** 
mean: 3750759.6870229007 
min: 43103 
max: 5959352 
 25th quantile: 1756967.0 
50th quantile: 4442047.0 
75th quantile: 5123591.5 
90th quantile: 5491438.0 

**************Number of lines in yellow files************** 
mean: 24204.51145038168 
min: 477 
max: 32301 
25th quantile: 19990.0 
50th quantile: 26295.0 
75th quantile: 29081.0 
90th quantile: 30203.0 
 
 

*************************information about the green files************************* 
Number of files files: 76 

**************size information about green files in bytes*********** 
mean: 262437.2894736842 
min: 2512 
max: 570765 
 25th quantile: 121494.75 
50th quantile: 190194.5 
75th quantile: 456955.0 
90th quantile: 499751.0 

**************Number of lines in green files************** 
mean: 2027.5 
min: 16 
max: 3547 
25th quantile: 135

## 1.3 Analysis of the schema evolution.

Over time, the relational schema associated to each type of trip data (yellow, green, fhv, hvfhv) has changed. Let us analyze the changes.

#### 1.31 Auxiliary functions

In [43]:
# Code to help analyze the schema changes goes here
import os 

def get_schemas(sub_data, basedir = basedir):
    """It outputs the list of tuples which are the month and the schema for a given files in a subdatabase"""
    start_name = sub_data + "_"
    names = os.listdir(basedir)
    paths = [basedir +"/"+name for name in names if name.startswith(start_name)]
    list_of_schemas = []
    for path in paths:
        x = open(path)
        list_of_schemas.append((path[-11: -4], x.readlines()[0].strip().lower()))
    return(sorted(list_of_schemas))


def unique_schema(list_of_schemas):
    """ Compute the unique schemas of a sub-dataset along with the first date where it changed
    The only input is a list of multiple schemas along with their dates (see get_schemas)"""
    schemas = list_of_schemas
    list_of_unique_schema = [schemas[0]]
    for date, schema in schemas:
        if schema.replace(" ", "") != list_of_unique_schema[-1][1]: #important to remove whitespaces
            list_of_unique_schema.append((date, schema))
    return(list_of_unique_schema)


def diff_schemas(sub_data, basedir = basedir):
    """This function returns the different schemas over a period and subdatabase. The date indicates the first
    time that this schema was used
    The only input is the name of the directory that contains the subdatabase"""
    schemas = get_schemas(sub_data, basedir)
    different_schemas = unique_schema(schemas)
    print("There have been", len(different_schemas)-1, "changes in this subdatabase")
    return(different_schemas)

#### 1.32 Analysis of schema changes for fhv cab data files

Analyze the schema changes for the FHV cab data files. Write down your conclusions

In [None]:
fhv_all_schemas = get_schemas('fhv')
unique_schema_fhv = diff_schemas("fhv")
print(unique_schema_fhv)

The changes to `fhv` are the following:
- From January 2015 to December 2016, the database had only 3 columns: the `dispatching base number` which is the TLC Base License number of the base that provided the ride. The `pickup_date`which is the date and time of a given trip and `the location ID` which is the TLC taxi zone where the trip began
- From January 2017 to June 2017, they made some changes & added some columns: they changed `pickup_date` to `pickup_datetime` which is just a change of name, they added a `drop_off_datetime`. They added also the `pulocationid` and the `dolocationid`which are the TLC taxi zone in which the trip began and ended to replace the `locationid`. 
- From July 2017 to December 2017, they added the column `sr_flag` which denotes if the trip was done by a shared ride chain. Note that for Lyft, it does not necessarly indicates that the trip was shared because they also recorded as "1" even if the trip was not shared in the end.
- From January 2018 to December 2018, they only sent to the end of the columns the `dispatching_base_number` and added a `dispatching_base_num` but this column is a blank one and cannot really count as a column.
- From January 2019 to now, they removed the blank column `dispatching_base_num` and moved back the `dispatching base_number` to the start of the columns

#### 1.33 Analysis of schema changes for fhvhv data files

Analyze the schema changes for the FHV cab data files. Write down your conclusions

In [None]:
diff_schemas("fhvhv")

This dataset is pretty recent (started in february 2019) and has not undergo changes. <br>
The colums are the same as `fhv` (the schema of 2019), except that it has a supplementary column at the start `hvfhs_license_num` which gives the information on which companies provided the ride and also the number of the car that provided it

#### 1.34 Analysis of schema changes for green cab data files

Analyze the schema changes for the green taxi data files. Write down your conclusions

In [44]:
diff_schemas("green")

There have been 3 changes in this subdatabase


[('2013-08',
  'vendorid,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,ratecodeid,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,total_amount,payment_type,trip_type'),
 ('2015-01',
  'vendorid,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,ratecodeid,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type'),
 ('2016-07',
  'vendorid,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,ratecodeid,pulocationid,dolocationid,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type'),
 ('2019-01',
  'vendorid,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,ratecodeid,pulocationid,do

The Green database is the second largest and underwent 3 changes of schemas:
- From August 2013 to December to 2014, its first schema was the one used at the same as the schema of yellow between January 2015 to June 2016, at the exceptions of  `vendor_id` was here `vendorid`, `pickup_datetime` was `lpep_pickup_datetime`, `dropoff_datetime` was `lpep_dropoff_datetime`. There was also two supplementary variables, namely `ehail_fee`, which is a blank column (I am not sure though!!!!!!!!) and finally `trip_type` which is 1 if the trip was a street hail or 2 if it was a dispatch.
- From January 2015 to June 2016, the green database underwent the same process as yellow one. Indeed the latitude and longitude columns were replaced by `pulocationid`, `dolocationid`.
- In January 2019, the last change happened to the database where a new column was added, the `congestion_surchage` (same as yellow database).

#### 1.35 Analysis of schema changes for yellow cab data files

Analyze the schema changes for the Yellow taxi data files. Write down your conclusions

In [None]:
diff_schemas("yellow")

The `yellow` database is the oldest and underwent several changes:
- From January 2009 to December 2009, they used `the vendor_name` which is the name of the data_provider (either CMT or LLC). Then, they have the columns `trip_pickup_datetime`, `trip_dropoff_date_time`, `passenger_count` and `trip_distance` which are all self-explanatory. `start_lon` and `start_lat` are respectively the longitute and latitude of the pick-up. Then, there was the rate code which is an integer representing the last fare/rate applied to the trip. The variable `store_and_forward` stores wheter the vehicule downloaded the record because it was offline or not (so it can sent it later). `end_lon` and `end_lat` are the longitude and latitude of the drop-oof. `payment_type` specified how the passenger paid for the trip. `fare_amt` gives the time-and-distance fare in dollar per meter. `surcharge` represent the eventual surchages for rush hour or overnight;  `mta_tax` is the amount of a tax that is automatically triggered when the trip is longer than a given distance. `tip_amt` is the amount of tip (when paid by card, cash tips are not recorded). `tolls_amt` gives the amount of tolls paid and  `total_amt` is the total amount paid for this trip. 
- From January 2010 to December 2014, they changed nearly all the names to make them more explicit, but the order and the meaning of the data.  <br>
`the vendor_name` &#8594; `vendor_id` which is now an integer & not a string anymore; `trip_pickup_datetime` &#8594; `pickup_datetime` ; `trip_dropoff_date_time` &#8594; `dropoff_datetime` ; `start_lon` &#8594; `pickup_longitude` ; `start_lat` &#8594; `pickup_latitude`; `store_and_forward` &#8594; `store_and_fwd_flag`; `end_lon` &#8594; `dropoff_longitude` ; `end_lat` &#8594; `dropoff_latitude` ; `fare_amt` &#8594; `fare_amount`; `tip_amt` &#8594; `tip_amount`; `tolls_amt` &#8594; `tolls_amount`; `total_amt` &#8594; `total_amount`
- From January 2015 to June 2016, the main changes were renaming. `vendor_id` became `vendorid`, `pickup_datetime` and `dropoff_datetime`became `tpep_pickup_datetime` and `tpep_dropoff_datetime`. `rate_code` was also changed to `ratecodeid`. `surchage` was renamed into `extra` to include a new variable `improvement_surchage` which is an extra amount depending on the dropoff location which started in 2015. This new variable was placed between `tolls_amount` and `total_amount`.
- From July 2016 to December 2018, the two variables `pickup_longitude` and `pickup_latitude`  were replaced by a single variable `pulocationid` which reprensent the TLC taxi zone of the pickup. The same happened to `dropoff_longitude` and `dropoff_latitude` which were replaced by `dolocationid`. They wer both placed after `store_and_forward_flag`.
- From January 2019 to now, it was added the variable `congestion surcharge` which is a new surchage if there is a congestion. 


<u> Side note: </u> in the year 2014, the columns of the data were encoded in this manner: vendor_id, trip_pickup_datetime, with a whitespace after the ",", which is the details that might have its importance in the merging section.

## 2. Data integration

For each sub-dataset, write (and execute) code that converts a file (using possibly an old schema) into a file that has the new, latest schema version.

Your conversion code should not modify the original files, but instead create a new file. 2

Be sure to explain the design behind your conversion functions!

The data integration step is highly parallellizable. Therefore, your solution on this part
**must** be written in Spark

In [15]:
test = "2,2013-08-24 22:35:15,2013-08-24 22:45:34,N,1,-73.844230651855469,40.721363067626953,-73.86566162109375,40.720394134521484,3,2.47,10,0.5,0.5,0,0,,11,2".split(",")
test1 = "2,2013-08-31 23:11:03,2013-08-31 23:19:57,N,1,-73.847312927246094,40.754188537597656,-73.857452392578125,40.733768463134766,1,2.75,10.5,0.5,0.5,0,0,,11.5,2".split(",")
test2 = [test, test1]
test3 = sc.parallelize(test2)


In [38]:
test3.map(lambda x: transfo_lon_lat(x[5], x[6])).collect()

[94, 92]

In [39]:
zones = gpd.read_file('metadata/taxi_zones.shp')
zones = zones.to_crs({'init':'epsg:4326'})
rtree = zones.sindex

In [42]:
rtree = zones.sindex

### 2.1 Imports

In [45]:
import glob
import pandas as pd
import geopandas as gpd
from shapely.geometry import Point, Polygon

### 2.2 Auxillary functions

In [85]:
zones = gpd.read_file('metadata/taxi_zones.shp')
zones = zones.to_crs({'init':'epsg:4326'})

def transfo_lon_lat(lon, lat, zones = zones):
    """
    This function computes from a list (or a row in a RDD) that specifies the latitude & longitude of a point
    and returns the index of the geographic zone that contains it.
    @lon: it represents the longitude of a given point
    @lat: it represents the latitude of a given point
    """
    rtree = zones.sindex
    
    try: 
        point = Point(float(lon), float(lat))
        psb_zones = list(rtree.intersection(point.bounds))
        out = [zone for zone in psb_zones if zones.iloc[zone].geometry.contains(point)]
        if out == []:
            return("") #if not in any zone
        elif len(out) >1:
            return(out[0]) #if there are multiple zones matching the given point (on a border for example)
        #it selects the first one.
        else:
            return(out[0])
    except:
        return("") #if the input cannot be converted to a float




def separating_schema(sub_data, unique_schema, basedir = basedir):
    """This function returns a list where each element of the list is the input which will be able to separate to
    select the various files that have the same schemas together. Therefore, the output is a list for which the 
    number of element is the number of different schemas for a given datatabase
    The first input is the name of the directory which contains the data about a given subfiles
    The second input is the output of the function unique_schema on the given databases"""
    start_name = sub_data + "_"
    #basedir =  "./home/epb199/data/"
    names = os.listdir(basedir)
    paths = [basedir +"/"+name for name in names if name.startswith(start_name)]
    list_ofpaths = []
    for path in paths:
        if path != './data/'+ sub_data + '/.ipynb_checkpoints':     #j'ai simplifié ici
            #x = open(path)
            list_ofpaths.append((path[-11: -4], path))
   
    separating_list = []
    list_paths = sc.parallelize(sorted(list_ofpaths)).map(lambda x: (dt.strptime(x[0],'%Y-%m'), x[1]))
    
    for sche_idx in range(1,len(unique_schema)):
        list_paths1 = list_paths.filter(lambda x: dt.strptime(unique_schema[sche_idx - 1][0],'%Y-%m' ) <= x[0] <  dt.strptime(unique_schema[sche_idx][0],'%Y-%m' ))
        list_paths1 = list_paths1.map(lambda x : x[1]).reduce(lambda x, y: x+","+y)
        separating_list.append(list_paths1)
    
    list_paths2 = list_paths.filter(lambda x: dt.strptime(unique_schema[len(unique_schema) - 1][0],'%Y-%m' ) <= x[0] )
    list_paths2 = list_paths2.map(lambda x : x[1]).reduce(lambda x, y: x+","+y)
    separating_list.append(list_paths2)
    return(separating_list)



def unified_sub_schema(sub_data, unique_schema, idx, basedir):
    """This function returns a unified RDD with  data that shares the same schema.
    The two inputs are the name of the directory with the subdata & the index that correspond to the number of the
    schema (i.e. the first schema putted in place between 2010 & 2013 will have an index of 0, the following schema
    will have the index 1)"""
    schema = separating_schema(sub_data, unique_schema, basedir )[idx]
    RDD_unified =  sc.textFile(schema)
    RDD_clean = RDD_unified.filter(lambda x: (x.replace(" ", "").lower() != unique_schema[idx][1])) #remove the column names
    return(RDD_clean)




def flatten_nan(x):
    """This function allows to flatten a list of list & encode missing values"""
    z = [val.upper() for sublist in x for val in sublist]
    #w = [None if (val == "") else val for val in z]
    return(z)



def sr_flag_switch(x):
    """
    This function changes the shared flag variable into a binary variable (== 1 if ride was shared 0 if not)
    In the data dictionnary, the ride recorded with "" means that the ride was not shared, so we just switch
    to a binary variable to diffentiate with the recording when the shared_flag was missing.
    """
    if x == "1":
        return(x)
    elif (x == "0") or (x == ""):
        return("0")
    else:
        return(x)

###  2.3 FHV integration

In [None]:
fhv_all_schemas = get_schemas('fhv') 
unique_schema_fhv = diff_schemas("fhv")
print(unique_schema_fhv)

In [None]:
#modifying & joining the various schema


fhv0 = (unified_sub_schema("fhv",unique_schema_fhv,  0, basedir)
        .map(lambda x : x.replace("\"", "").split(","))
        .map(lambda x : [x[0].upper(), x[1].upper(), "", x[2].upper(), "", ""]))


fhv1 = (unified_sub_schema("fhv",unique_schema_fhv,  1, basedir)
        .map(lambda x : x.replace("\"", "").split(","))
        .map(lambda x : flatten_nan([x[0:5], [""]])))


fhv2 = (unified_sub_schema("fhv",unique_schema_fhv,  2, basedir)
        .map(lambda x : x.replace("\"", "").split(","))
        .map(lambda x : flatten_nan([x[:5], [sr_flag_switch(x[5])]])))


fhv3 = (unified_sub_schema("fhv",unique_schema_fhv, 3, basedir)
        .map(lambda x : x.replace("\"", "").split(","))
        .map(lambda x : flatten_nan([[x[5]], x[0:4], [sr_flag_switch(x[4])]]) ))


fhv4 = (unified_sub_schema("fhv",unique_schema_fhv,  4, basedir)
        .map(lambda x : x.split(","))
        .map(lambda x : flatten_nan([x[:5], [sr_flag_switch(x[5])]] )))
        
        

fhv_data = fhv0.union(fhv1).union(fhv2).union(fhv3).union(fhv4)

In [None]:
fhv_data.sample(False, 0.0003).collect() #first look at the data

### 2.4 FHVHV Integration

In [8]:
#loading the different schemas

fhvhv_all_schemas = get_schemas('fhvhv') 
unique_schema_fhvhv = diff_schemas("fhvhv")
print(unique_schema_fhvhv)

There have been 0 changes in this subdatabase
[('2019-02', 'hvfhs_license_num,dispatching_base_num,pickup_datetime,dropoff_datetime,pulocationid,dolocationid,sr_flag')]


In [9]:
fhvhv_data = (unified_sub_schema("fhvhv",unique_schema_fhvhv,  0, basedir)
             .map(lambda x : x.replace("\"", "").split(",")).
              map(lambda x: flatten_nan([x[:6], [sr_flag_switch(x[6])]])))

In [None]:
fhvhv_data.sample(False, 0.0003).collect() #first look at the data

### 2.5 Green Integration

In [47]:
#loading the different schemas

green_all_schemas = get_schemas('green') 
unique_schema_green = diff_schemas("green")
unique_schema_green

There have been 3 changes in this subdatabase


[('2013-08',
  'vendorid,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,ratecodeid,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,total_amount,payment_type,trip_type'),
 ('2015-01',
  'vendorid,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,ratecodeid,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type'),
 ('2016-07',
  'vendorid,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,ratecodeid,pulocationid,dolocationid,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type'),
 ('2019-01',
  'vendorid,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,ratecodeid,pulocationid,do

In [48]:
#modifying & joining the various schema

green0 = (unified_sub_schema("green",unique_schema_green,  0, basedir)
        .map(lambda x : x.replace("\"", "").split(","))
        .map(lambda x : flatten_nan([x[0:5], [str(transfo_lon_lat(x[5], x[6]))], [str(transfo_lon_lat(x[7],x[8]))],
                        x[9:17],["0"], x[17:20], ["0"]])))


green1 = (unified_sub_schema("green",unique_schema_green,  1, basedir)
        .map(lambda x : x.replace("\"", "").split(","))
        .map(lambda x : flatten_nan([x[0:5], [str(transfo_lon_lat(x[5], x[6]))], [str(transfo_lon_lat(x[7],x[8]))],
                        x[9:21], ["0"]])))


green2 = (unified_sub_schema("green",unique_schema_green,  2, basedir)
        .map(lambda x : x.replace("\"", "").split(","))
        .map(lambda x : flatten_nan([x[0:19], ["0"]])))


green3 = (unified_sub_schema("green",unique_schema_green, 3, basedir)
        .map(lambda x : x.replace("\"", "").split(","))
        .map(lambda x : flatten_nan([x[0:20]])))

green_data = green0.union(green1).union(green2).union(green3)

In [None]:
green_data.sample(False, 0.0003).collect() #first look at the data

### Yellow integration

In [76]:
#loading the different schemas

yellow_all_schemas = get_schemas('yellow') 
unique_schema_yellow = diff_schemas("yellow")
unique_schema_yellow

There have been 4 changes in this subdatabase


[('2009-01',
  'vendor_name,trip_pickup_datetime,trip_dropoff_datetime,passenger_count,trip_distance,start_lon,start_lat,rate_code,store_and_forward,end_lon,end_lat,payment_type,fare_amt,surcharge,mta_tax,tip_amt,tolls_amt,total_amt'),
 ('2010-01',
  'vendor_id,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,rate_code,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,surcharge,mta_tax,tip_amount,tolls_amount,total_amount'),
 ('2015-01',
  'vendorid,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,ratecodeid,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount'),
 ('2016-07',
  'vendorid,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,ratecodeid,store_and_fwd_flag,pulocationid,dolocationid,payment_type,fare_amount,extra,mta_tax,tip_a

In [77]:
strip_val_list = lambda x: [val.strip() for val in x] #because one period had whitespace in front of every
#values

In [86]:
yellow0 = (unified_sub_schema("yellow",unique_schema_yellow,  0, basedir)
           .map(lambda x : strip_val_list( x.replace("\"", "").split(","))) 
           .map(lambda x : flatten_nan([x[:5], x[7:9], [str(transfo_lon_lat(x[5],x[6]))], 
                                        [str(transfo_lon_lat(x[9],x[10]))], x[11:17], ["0"], [x[17]], ["0"]])))


                
#[x[0], x[1], x[2], x[3], x[4], x[7], x[8], str(transfo_lon_lat(x[5],x[6])), str(transfo_lon_lat(x[9],x[10])), x[11], x[12], x[13], x[14],  x[15], x[16],'0', x[17], '0']))

yellow1 = (unified_sub_schema("yellow",unique_schema_yellow,  1, basedir)
           .map(lambda x : strip_val_list( x.replace("\"", "").split(",")))
           .map(lambda x : flatten_nan([x[:5], x[7:9], [str(transfo_lon_lat(x[5],x[6]))], 
                                        [str(transfo_lon_lat(x[9],x[10]))], x[11:17], ["0"], [x[17]], ["0"]])))



yellow2 = (unified_sub_schema("yellow",unique_schema_yellow,  2, basedir)
           .map(lambda x : strip_val_list( x.replace("\"", "").split(",")))
           .map(lambda x : flatten_nan([x[:5], x[7:9], [str(transfo_lon_lat(x[5],x[6]))], 
                            [str(transfo_lon_lat(x[9],x[10]))], x[11:19], ["0"]]))) #I removed the x[19], he was out of range



#[x[0], x[1], x[2], x[3], x[4], x[7], x[8], str(transfo_lon_lat(x[5],x[6])), str(transfo_lon_lat(x[9],x[10])), x[11], x[12], x[13], x[14], x[15], x[16], x[17], x[18], '0']


yellow3 = (unified_sub_schema("yellow",unique_schema_yellow, 3, basedir)
           .map(lambda x : strip_val_list( x.replace("\"", "").split(",")))
           .map(lambda x : flatten_nan([x[:17], ["0"]]))) # I completely changed bc there was no more lat & lon variable in that schema
#it should go to 17 no?


yellow4 = (unified_sub_schema("yellow",unique_schema_yellow, 4, basedir) #I replaced the 3 by 4 here
           .map(lambda x : strip_val_list( x.replace("\"", "").split(",")))
           .map(lambda x : flatten_nan([x[ :18]]))) 


yellow_data = yellow0.union(yellow1).union(yellow2).union(yellow3).union(yellow4) #I added the .union(yellow4)

In [None]:
yellow_data.sample(False, 0.0003).collect() #first look at the data

## 3. Data Cleaning

### 3.1 Auxilliary functions

In [49]:
def time_object(x, formatting, min_val, max_val):
    """
    Determine if an object can be converted to a datetime object given an inputted format and if it is the case
    if this object is between two dates.
    
    @x: a python object
    @formatting: the desired datetime format
    @min_val: a string in the format %Y-%m that is the minimum of the date range
    @max_val: a string in the format %Y-%m that is the maximum of the date range
    """
    try: 
        return((type(dt.strptime(x, formatting)) is dt) and (
            dt.strptime(min_val,'%Y-%m-%d') <= dt.strptime(x, formatting) < dt.strptime(max_val,'%Y-%m-%d')))
    except:
        return(False)


    
def integer(x, values):
    """
    Determine if the input is an integer and if it is, determine if it belongs to some specified values.
    
    @x: A python object
    @values: A list of excepted values or None if we only want to determine if the input is an integer and
    nothing else.
    """
    if isinstance(values, list):
        try:
            return(int(x) in values)
        except:
            return(False)
        
        
    elif values is None:
        try:
            return(isinstance(int(x), int))
        except:
            return(False)
        
    
    else:
        try: 
            return(int(x) == values)
        except:
            return(False)

        

def limits(x, limit_min, limit_max):
    """
    Given an input x, it determine if x fits between two numerical values (both not comprised).
    
    @x: a python object
    @limit_min: a float that represents the first value (lower limit) that we should not expect from x
    @limit_max: a float that represents the first value (upper limit) that we should not expect from x
    """
    try:
        return(limit_min < float(x) < limit_max)
    except:
        return(False)
    
    
    
    
def visu_dirty(schemas, total_data, validation_func):
    """
    This function prints for every column present in a grouped dataset all the possible
    broken values that the values in this column can take and count the number of time a particular value
    has been observed. The validity of a value is defined by a validation function.
    
    The function is mainly use to see which columns have broken value & what values they take when they
    are broken and how often do they take each value.
    
    @schemas: is a list of tuples where the first value of the tuple is a date & the second is a schema.
    It is the product of the diff_schema function.
    @total_data: is an rdd where each "row" is a list or a tuple. It is intended to be used on the merged data
    where all the rows share the same schema (aka the last schema)
    @validation func: is a function object which output an iterable of two elements, the first one is a list
    of conditions. The number of those conditions is the same as the number of column in the total_data. 
    Its second element is a "general" rule which is true if all the element in the first element is true.
    (it check the overall validity of the full row.)
    """
    last_schema = schemas[-1][1].split(",")
    dirty_records = total_data.filter(lambda x: not (validation_func(x)[1]))
    for idx, col_name in enumerate(last_schema):
        
        print(col_name+":", "\n", dirty_records.map(lambda x : (x[idx], not (validation_func(x)[0][idx]))).
        filter(lambda x: x[1]).map(lambda x: (x[0], 1)).reduceByKey(lambda x,y : x + y).collect())

        
        
        
def visu_all(schemas, total_data, idxs):
    """
    This function prints the unique values along with the number of observations for each 
    specificed columns of an rdd.
    
    This function is mainly used after the visu_dirty function on the columns that have been identified
    with broken data.
    
    @schemas: is the result of diff_schema on a particular data
    @total_data: is a rdd where each row is an iterable.
    @idx: is the indexes of the columns we want to look at in this particular dataset.
    """
    last_schema = schemas[-1][1].split(",")
    for idx in idxs:
        
        print(last_schema[idx]+":", "\n", total_data.map(lambda x: (x[idx], 1)).
              reduceByKey(lambda x,y : x + y).take(50))

### 3.2 Cleaning of FHV data

#### 3.21 Analysis of valid values

In [None]:
#loading the different schemas

print(unique_schema_fhv)

#### 3.22 Validity rules

In [None]:
#defining the validity rules based on the data dictionnary

def validation_fhv(x):
    dispatching_base_num,pickup_datetime,dropoff_datetime,pulocationid,dolocationid,sr_flag = x
    zone_id = np.arange(1, 264).astype(str)
    
    valid0 = (integer(dispatching_base_num[1:], None)) and (len(dispatching_base_num)== 6) and (
        dispatching_base_num[0] == 'B')
    
    valid1 = time_object(pickup_datetime,'%Y-%m-%d %H:%M:%S', "2014-12-30", "2020-07-02" )
    
    valid2 = (time_object(dropoff_datetime,'%Y-%m-%d %H:%M:%S',  "2014-12-30", "2020-07-02")) or (
        (dropoff_datetime == "") and (time_object(pickup_datetime,'%Y-%m-%d %H:%M:%S', "2014-12-30", "2017-01-01")) )
    
    valid3 = pulocationid in zone_id
    
    valid4 = (dolocationid in zone_id) or (
        (dolocationid == "") and (time_object(pickup_datetime,'%Y-%m-%d %H:%M:%S', "2014-12-30", "2017-01-01")))
    
    valid5 = (sr_flag in ["0", "1"])  or (
        (sr_flag == "") and (time_object(pickup_datetime,'%Y-%m-%d %H:%M:%S', "2014-12-30", "2017-07-01")))
   
    out = [valid0, valid1, valid2, valid3, valid4, valid5]
   
    rules = (valid0 and valid1 and valid2 and valid3 and valid4 and valid5)
    return(out, rules)

#### 3.2.3 Identifying & repairing dirty records

In [None]:
visu_dirty(unique_schema_fhv, fhv_data, validation_fhv)

In [None]:
visu_all(unique_schema_fhv, fhv_data, [5]) #we only check those with an issue here #it also offers a count
#of all the rows in the rdd

In [None]:
def change_sr(x):
    
    if x == "0":
        return(x)
    elif (x > "0") and (integer(x, None)):
        return("1")
    else:
        return("")

In [None]:
repaired_fhv = (fhv_data.
                  map(lambda x: [x[0].strip(), x[1], x[2], x[3], x[4], change_sr(x[5])]))

visu_dirty(unique_schema_fhv, repaired_fhv, validation_fhv)

In [None]:
final_fhv = repaired_fhv.filter(lambda x: validation_fhv(x))
final_fhv.count()

### 3.3 Cleaning FHVHV data

#### 3.31 Analysis of valid values

In [11]:
#loading the different schemas

print(unique_schema_fhvhv)

[('2019-02', 'hvfhs_license_num,dispatching_base_num,pickup_datetime,dropoff_datetime,pulocationid,dolocationid,sr_flag')]


#### 3.32 Validity rules

In [20]:
def license_number(proposed_number):
    available_numbers = ['HV0002','HV0003','HV0004','HV0005']
    return(proposed_number in available_numbers)

 

def validation_fhvhv(x):
    hvfhs_license_num,dispatching_base_num,pickup_datetime,dropoff_datetime,pulocationid,dolocationid,sr_flag = x
    zone_id = np.arange(1, 264).astype(str)
    valid0 = license_number(hvfhs_license_num)
    valid1 = (integer(dispatching_base_num[1:], None)) and (len(dispatching_base_num)== 6) and (dispatching_base_num[0] == 'B')
    valid2 = time_object(pickup_datetime,'%Y-%m-%d %H:%M:%S', "2018-12-30", "2020-07-01" )
    valid3 = time_object(dropoff_datetime,'%Y-%m-%d %H:%M:%S',  "2018-12-30", "2020-07-02")
    valid4 = pulocationid in zone_id
    valid5 = dolocationid in zone_id
    valid6 = (sr_flag == "1") or (sr_flag == "0")
 
    out = [valid0, valid1, valid2, valid3, valid4, valid5, valid6]
    
    rules = (valid0 and valid1 and valid2 and valid3 and valid4 and valid5 and valid6)
    
    return(out, rules)

#### 3.33 Identifying & repairing dirty records

In [15]:
visu_dirty(unique_schema_fhvhv, fhvhv_data, validation_fhvhv)

hvfhs_license_num: 
 []
dispatching_base_num: 
 [('', 3)]
pickup_datetime: 
 []
dropoff_datetime: 
 []
pulocationid: 
 [('265', 20)]
dolocationid: 
 [('265', 9794), ('264', 2)]
sr_flag: 
 []


In [17]:
a = fhvhv_data.count()
a

321819

We cannot fix any of the data above. It will lead to the removal of 10 000 rows on 320 000 in total, which would lead to the removal of 3% of the data.

In [22]:
final_fhvhv = fhvhv_data.filter(lambda x: (validation_fhvhv(x)[1]))
final_fhvhv.count() #we get indeed 310 000 rows after cleaning

312006

### 3.4 Cleaning of GREEN dataset

#### 3.41 Analysis of valid values

In [26]:
print(unique_schema_green)

[('2013-08', 'vendorid,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,ratecodeid,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,total_amount,payment_type,trip_type'), ('2015-01', 'vendorid,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,ratecodeid,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type'), ('2016-07', 'vendorid,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,ratecodeid,pulocationid,dolocationid,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type'), ('2019-01', 'vendorid,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,ratecodeid,pulocationid,dolocationid,

#### 3.42 Validity rules

In [50]:
def validation_green(x):
    """ 
    This function returns True if all the dimension of a record is conformed with the data definitions
    of the green data. 
    
    @x is a list
    """
    vendorid, putime, dotime, store_flag, ratecodeid, puloc, doloc, nbr_pass, distance, fare_mt, extra, mta, tip, tolls, ehail, improve, total, pay_type, trip_type, congestion = x
    zone_id = np.arange(1, 264).astype(str)
    valid0 = integer(vendorid, [1,2])
    valid1 = time_object(putime,'%Y-%m-%d %H:%M:%S', "2013-07-30", "2020-07-01" )
    valid2 = time_object(dotime,'%Y-%m-%d %H:%M:%S',  "2013-07-30", "2020-07-02")
    valid3 = (store_flag == "Y") or (store_flag == "N")
    valid4 = integer(ratecodeid, [1,2,3,4,5,6])
    valid5 = puloc in zone_id
    valid6 = doloc in zone_id
    valid7 = (integer(nbr_pass, None)) and (limits(x[7], 0, 10)) 
    valid8 = limits(distance, 0, 60)
    valid9 = limits(fare_mt, -0.1, 501)
    valid10 = extra in ["0", "0.5", "1"] #extra, je sais pas, je sais que 
    valid11 = mta in ["0", "0.5"]
    valid12 = limits(tip, -0.001, 100)
    valid13 = limits(tolls, -0.001, 50)
    valid14 = limits(ehail, -0.001, 5)
    valid15 = (improve == "0.3") or ((improve == "0") and (time_object(putime,'%Y-%m-%d %H:%M:%S', "2013-07-30", "2015-01-01")))
    valid16 = limits(total, -0.1, 750)
    valid17 = integer(pay_type, [1,2,3,4,5,6])
    valid18 = integer(trip_type, [1,2])
    valid19 = (limits(congestion, -0.0001, 5))
    
    out = [valid0, valid1, valid2, valid3, valid4, valid5, valid6, valid7, valid8, valid9, valid10, valid11, valid12, valid13, valid14, valid15, valid16, valid17, valid18, valid19]
    
    rules = (valid0 and valid1 and valid2 and valid3 and valid4 and valid5 and valid6 and valid7 and valid8
            and valid9 and valid10 and valid11 and valid12 and valid13 and valid14 and valid15 and valid16 and valid17 and 
            valid18 and valid19)
    
    return(out, rules)


a = "2,2020-06-09 09:23:05,2020-06-09 09:33:07,N,1,74,151,1,2.01,9,0,0.5,0,0,,0.3,9.8,2,1,0".split(",")
validation_green(a)[0]


a = "2,2020-06-09 09:23:05,2020-06-09 09:33:07,N,1,74,151,1,2.01,9,0,0.5,0,0,,0.3,9.8,2,1,0".split(",")
validation_green(a)[0]
    



[True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 False,
 True,
 True,
 True,
 True,
 True]

#### 3.43 Identifying dirty records & repairing

In [51]:
#we identify first what are the possible "dirty values" that each columns can take
visu_dirty(unique_schema_green, green_data, validation_green)

vendorid: 
 [('', 512)]
lpep_pickup_datetime: 
 [('2009-01-01 01:41:47', 1)]
lpep_dropoff_datetime: 
 [('2009-01-01 02:24:53', 1)]
store_and_fwd_flag: 
 [('', 512)]
ratecodeid: 
 [('', 512), ('99', 1)]
pulocationid: 
 [('', 209), ('265', 24), ('264', 90), ('0', 1)]
dolocationid: 
 [('', 392), ('265', 114), ('264', 89), ('0', 32)]
passenger_count: 
 [('', 512), ('0', 72)]
trip_distance: 
 [('-15.34', 1), ('.00', 2166), ('56.35', 1)]
fare_amount: 
 [('-140', 1), ('200', 2), ('499', 1), ('190', 1), ('131.5', 1), ('170', 1), ('150', 8), ('114', 2), ('130', 1), ('109', 1), ('165', 1), ('108.5', 1), ('-3', 33), ('145.5', 1), ('400', 2), ('103.5', 1), ('419.88', 1), ('-12', 1), ('-2.5', 86), ('1200.5', 1), ('115.5', 1), ('300', 1), ('-25', 2), ('-52', 8), ('-5', 21), ('-30', 2), ('124.5', 2), ('169.5', 1), ('125', 3), ('110.5', 1), ('495', 1), ('-4', 22), ('-32', 1), ('107', 1), ('-7.5', 3), ('160', 2), ('-22.3', 1), ('160.5', 1), ('-5.5', 14), ('-3.5', 23), ('112.5', 1), ('100.5', 3), ('134.

In [73]:
#visu_all(unique_schema_green, green_data, [8, 14, 18,19]) #we only check those with an issue here
green_data.count()

154014

In [74]:
for i in green_data:
    print(i)

TypeError: 'RDD' object is not iterable

We see the only value we can really repair is the trip distance problem & we can also remove the column ehail fee because it is always a missing value. Instead of removing it, we will instead put all its values to 0. We can not unfortunatly fix the issue for trip type & congestion surchage. We are now gonna apply this knowledge to fix partly the data.

In [66]:
def abs_str(x):
    try:
        z = str(abs(int(x)))
        return(z)
    except:
        z = str(abs(float(x)))
        return(z)

In [72]:
#we essentially drop the columns "ehail" & we also drop all rows with missing/non-conform values
def abs_str(x):
    """convert negative values to positive values & then retransform them to a string"""
    try:
        z = str(abs(int(x)))
        return(z)
    except:
        z = str(abs(float(x)))
        return(z)
    
    
    
repaired_green = (green_data.
                  map(lambda x: flatten_nan([x[:8], ["0" if x[8] == ".00" else abs_str(x[8])], 
                                             [abs_str(x[9])] ,[x[10]], [abs_str(x[11])], [abs_str(x[12])], 
                                            [abs_str(x[13])], ["0"] , [abs_str(x[15])], [abs_str(x[16])], x[17:]])))



In [None]:
visu_dirty(unique_schema_green, repaired_green, validation_green)

In [None]:
final_green = repaired_green.filter(lambda x: validation_green(x)[1])
final_green.count()

### 3.5 Cleaning of yellow data

#### 3.51 Analysis of valid values

#### 3.52 Validity rules

In [75]:
def validation_yellow(x):
    vendorid, putime, dotime, nbr_pass, distance, ratecodeid, store_flag, puloc, doloc, pay_type,  fare_mt, extra, mta, tip, tolls, improve, total, congestion = x
    zone_id = np.arange(1, 264).astype(str)
    valid0 = integer(vendorid, [1,2])
    valid1 = time_object(putime,'%Y-%m-%d %H:%M:%S', "2008-12-30", "2020-07-02" )
    valid2 = time_object(dotime,'%Y-%m-%d %H:%M:%S',  "2008-12-30", "2020-07-02")
    valid3 = (integer(nbr_pass, None)) and (limits(nbr_pass, 0, 10))
    valid4 = limits(distance, 0, 100)
    valid5 = integer(ratecodeid, [1,2,3,4,5,6])
    valid6 = (store_flag == "Y") or (store_flag == "N")
    valid7 = puloc in zone_id
    valid8 = doloc in zone_id
    valid9 = pay_type in ["1", "2", "3", "4", "5"," 6"]
    valid10 = limits(fare_mt, -0.1 , 100)
    valid11 = extra in ["0", "0.5", "1"]
    valid12 = mta in ["0", "0.5"]
    valid13 = limits(tip, -0.001, 20)
    valid14 = limits(tolls, -0.001, 20)
    valid15 = (improve == "0.3") or (improve == "0" and time_object(putime,'%Y-%m-%d %H:%M:%S', "2008-12-30", "2015-01-01"))
    valid16 = limits(total, -0.1 , 100)
    valid17 = (limits(congestion, -0.0001, 5))
    
    out = [valid0, valid1, valid2, valid3, valid4, valid5, valid6, valid7, valid8, valid9, 
           valid10, valid11, valid12, valid13, valid14, valid15, valid16, valid17]
    
    rules = (valid0 and valid1 and valid2 and valid3 and valid4 and valid5 and valid6 and valid7 and valid8
            and valid9 and valid10 and valid11 and valid12 and valid13 and valid14 and valid15 and valid16 and 
             valid17)
    
    return(out, rules)


a = "1,2020-06-09 09:23:05,2020-06-09 09:33:07,1,10,3,Y,50,50,1,0.5,0,0,0.5,0.5,0.3,5.2,3.9".split(",")
not (validation_yellow(a)[1])

False

#### 3.53 Identifying dirty records & repairing

In [None]:
visu_dirty(unique_schema_yellow, yellow_data, validation_yellow)

vendorid: 
 [('', 505), ('CMT', 1011211), ('DDS', 28264), ('3', 8), ('VTS', 1028129), ('4', 1508)]
tpep_pickup_datetime: 
 []
tpep_dropoff_datetime: 
 []
passenger_count: 
 [('', 505), ('0', 12055), ('208', 3)]
trip_distance: 
 [('60.259999999999998', 1), ('50.60', 1), ('96.26', 1), ('500.00', 1), ('81.700000000000003', 1), ('62.799999999999997', 2), ('61.200000000000003', 1), ('54.75', 1), ('57.20', 1), ('71.90', 1), ('494.40', 1), ('55.17', 1), ('83.10', 1), ('93.930000000000007', 1), ('70', 1), ('250.10', 1), ('51.95', 1), ('60.200000000000003', 1), ('71.700000000000003', 1), ('50.299999999999997', 1), ('84.5', 1), ('58.700000000000003', 1), ('59', 1), ('55.219999999999999', 1), ('71.07', 1), ('-7.64', 1), ('88.099999999999994', 1), ('0', 13986), ('77.299999999999997', 1), ('59.700000000000003', 1), ('300833.10', 1), ('64', 1), ('57.100000000000001', 1), ('145.90', 1), ('54.70', 1), ('86.00', 1), ('74.629999999999995', 1), ('90', 1), ('50.020000000000003', 1), ('79.299999999999997',

In [None]:
#I think DDS is just another third suppliers of data (that existed before, I would not diregard it for that,
#trip distance converted to positive
#store  & fwd flag, I should watch the visu_all on it, bu I would assume if missing, it is bc Not forwarded
#otherwise It would be weird, also 0 are not forwarded (or atleast the most observed value) & 1 to be forwarded

visu_all(unique_schema_yellow, yellow_data, [8, 14, 18,19]) #needs to be adapted to yellow data

In [None]:
def name_to_id(name):
    """
    This function transforms the vendor name to its corresponding ID"""
    x = name.upper()
    
    if (x == 'CMT') or (x == '1') :
        return("1")
    
    elif (x == 'VTS') or (x == "2"):
        return("2")
    
    else:
        return("")

    

def convert_pay_type(pay_type):
    x = pay_type.upper()
    
    if (x == 'CREDIT') or (x == 'CRE') or (x ==  "1"):
        return('1')
    
    elif (x == 'CASH') or (x == "CAS") or (x == "2"):
        return('2')
    
    elif (x == 'NO CHARGE') or (x == 'NO') or (x =='3'):
        return('3')
    
    elif (x == 'DISPUTE') or (x == "DIS") or (x == "4"):
        return('4')
    
    elif (x == '5'):
        return('5')
    
    elif (x == '6'):
        return('6')

    else:
        return("")



    

In [None]:
#repairement of broken data in the vendor_id & pay_type & dropping all the missingg/ill-defined records

repaired_yellow = (yellow_data.map
                   (lambda x: flatten_nan([[name_to_id(x[0])], x[1:9], [convert_pay_type(x[9])], x[10:]])))

visu_dirty(unique_schema_yellow, repaired_yellow, validation_yellow)

In [None]:
final_yellow = repaired_yellow.filter(lambda x: validation_yellow(x)[1])
final_yellow.count()

## Part 4: Analysis

### 4.1 Auxillary functions

In [None]:
from matplotlib import pyplot as plt
import seaborn as sns
import pandas as pd


def to_month(x, idx_date,  idx):
    """
    This function changes the date from normal frequency to monthly frequency & select only the
    columns of interest to solve the particular problem at hand
    x is list
    idx_date is the index (an integer) in the list x where there is a date object
    idx is a list with the indexes of the particular values we want to keep for the analysis
    """
    val = [x[i] for i in idx]
    datum = dt.strptime(x[idx_date], '%Y-%m-%d %H:%M:%S')
    date_month = dt.strftime(datum, '%Y-%m')
    out = (date_month, val)
    return(out)

b = ["2020-06-15 09:23:05", "truc_imp", "pasimp", "superimp", "notimp"]
to_month(b, 0, [1,3])



def to_month(x, idx_date,  idx):
    """
    This function changes the date from normal frequency to monthly frequency & select only the
    columns of interest to solve the particular problem at hand
    x is list
    idx_date is the index (an integer) in the list x where there is a date object
    idx is a list with the indexes of the particular values we want to keep for the analysis
    """
    val = [x[i] for i in idx]
    datum = dt.strptime(x[idx_date], '%Y-%m-%d %H:%M:%S')
    date_month = dt.strftime(datum, '%Y-%m')
    out = (date_month, val)
    return(out)

def to_month(x):
    datum = dt.strptime(x, '%Y-%m-%d %H:%M:%S')
    date_month = dt.strftime(datum, '%Y-%m')
    return(date_month)
def plotting(datas, title, colors = ["yellow", "green", "orange", "red"], 
             labels = ["Yellow Taxis", "Green Taxis", "FHV", "FHVHV"], title_position = (0.4, 1.05)
            , legend_position = "lower right"):
    """
    This function plot time series with some predefined aestetics parameters.
    
    @datas is a list containing lists of tuples with tuples of the following form: (date, value). 
    It is important to always put the dataset with the most values first.
    @title is the title of the plot
    @colors is list containing the color of the different datasets. It must have the same number
    of elements as the data arguments (same number of lists)
    @labels are the labels of the different series to be printed in the legend of the graph.
    It must have the same number of elements as the data arguments (same number of lists).
    @title position is a tuple with two elements, the first one specify its position on the x-axis
    & the second on the y-axis (represented by floats)
    @legend_position specifies the position of the legend position among different possibilities
    (e.g. "upper right")
    """
    fig, axs = plt.subplots()
    sns.set_style("white")
    axs.spines["bottom"].set_color("Gray")
    axs.spines["left"].set_color("Gray")
    axs.spines["bottom"].set_alpha(0.7)
    axs.spines["left"].set_alpha(0.7)
    sns.despine()
    axs.set_title(title, x = title_position[0], y = title_position[1], 
                  fontsize = 16, alpha = 0.7, color = "gray")
    plt.xticks(fontsize = 8, color = "gray", alpha = 0.7)
    plt.yticks(fontsize = 8, color = "gray", alpha = 0.7)
    
    for data, color, label in zip(datas, colors, labels):
        date, count = map(tuple, zip(*data))
        axs.plot(date, count, alpha = 0.35, c = color, linewidth = 4, label = label )
        
    plt.legend(loc = legend_position);


### 4.2 Answers

In [None]:
#1. Monthly total number of trips per service

grouped_data = (final_yellow, final_green, final_fhv, final_fhvhv)
idx_dates = (1,1,1,1) #je mets 1 pcq les autres pas tjrs des do_time
idx_nbr_trip = (1, 1, 1, 1)
nbr_trip_per_month = []
#je pense que dans les to_month, il faut pas mettre les x d'abord et puis prier pour qu'ils acceptent
#les valeurs de la loop, pls jesus ==> it does, but you have to provide a lambda function, idk why
for data, idx_date, idx_val in zip(grouped_data, idx_dates, idx_nbr_trip):
    nbr_trip_per_month.append(data.map(lambda x: to_month(x, idx_date = idx_date, idx = [idx_val]))
                              .countByKey().sortByKey().collect())

    

plotting(nbr_trip_per_month, "Number of trips per month per service")




In [None]:
#2. Monthly number of trips in Manhattan and Brooklyn grouped per dataset type

#we first create a lookup table for the locationID that are in Manhattan & Brooklyn

zones = gpd.read_file('taxi_zones.shp') 
zones = zones.to_crs({'init':'epsg:4326'})
man_bro_lookup = list(zones.loc[(zones.borough == "Manhattan") | (zones.borough == "Brooklyn"), 
                                "LocationID"].astype(str))


In [None]:
idx_pulocs = [7, 5, 3, 5]#careful green has one col less so the index can be different than in validation green
idx_dolocs = [8, 6, 4, 6] #this is not true anymore, i kept the ehail column, just set it to 0

nbr_trip_man_bro = []

for data, idx_date, idx_puloc, idx_doloc in zip(grouped_data, idx_dates, idx_pulocs, idx_dolocs):
    (nbr_trip_man_bro.append(data.
     filter(lambda x: (x[idx_puloc] in man_bro_lookup) and (x[idx_doloc] in man_bro_lookup)). 
     map(lambda x: to_month(x, idx_date = idx_date, idx = [idx_puloc])).countByKey().sortByKey().collect())) 



plotting(nbr_trip_man_bro, "Monthly number of trips in Manhattan and Brooklyn")

In [None]:
#3. Monthly total receipts per dataset type

grouped_taxi_data = [final_yellow, final_green]
idx_dates_small = [1,1]
idx_totals = [[13,16], [12,16]]#careful green has one col less so the index can be different than in validation green


#maybe we should check what is included in the total for each
total_receipts = []

for data, idx_date, idx_val in zip(grouped_taxi_data, idx_dates_small, idx_totals):
    (total_receipts.append(data.
     map(lambda x: to_month(x, idx_date = idx_date, idx = idx_val)).
                           map(lambda x: (x[0], float(x[1][1]) - float(x[1][0]))).
                           reduceByKey(lambda x,y : x+y).sortByKey().collect()))
    
plotting(total_receipts, "Monthly Total Receipts Per Service")

In [None]:
#4. average trip receipts per dataset type


#maybe we should check what is included in the total for each
avg_receipts = []



for data, idx_date, idx_val in zip(grouped_taxi_data, idx_dates_small, idx_totals):
    (avg_receipts.append(data.
     map(lambda x: to_month(x, idx_date = idx_date, idx = idx_val)).
                           map(lambda x: (x[0], float(x[1][1]) - float(x[1][0]))).
                           aggregateByKey((0,0), lambda x,y : (x[0] + y, x[1] + 1), lambda x,y : (x[0] + y[0], x[1]+ y[1]))
                           mapValues(lambda x : x[0]/x[1].sortByKey().collect()))
    
plotting(avg_receipts, "Monthly Average Receipts Per Trip")

In [None]:
#5 Average cost per in-progress-minute 

def cost_per_min(x):
    cost = float(x[1][3]) - float(x[1][2])
    duration = dt.strptime(x[1][1], "%Y-%m-%d %H:%M:%S") - dt.strptime(x[1][0], "%Y-%m-%d %H:%M:%S")
    duration_in_minutes = duration.days * 60 * 24 + duration.seconds/ 60
    try:
        cost_min = cost/ duration_in_minutes #if duration is 0
    except:
        cost_min = "div_by_0"
    return((x[0], cost_min))


def agg_cost_min(x, y):
    
    if (y == "div_by_0"):
        out = (x[0], x[1])
        return(out)
    else :
        out = (x[0] + y, x[1] + 1)
        return(out)
    


avg_cost_per_min = []
idx_cost_min = [[1,2,13,16], [1,2, 12,16]]


for data, idx_date, idx_val in zip(grouped_taxi_data, idx_dates_small, idx_cost_min):
    (avg_cost_per_min.append(data.
     map(lambda x: to_month(x, idx_date = idx_date, idx = idx_val)).
                           map(cost_per_min).
                           aggregateByKey((0,0), agg_cost_min, lambda x,y : (x[0] + y[0], x[1]+ y[1]))
                           mapValues(lambda x : x[0]/x[1].sortByKey().collect()))
    
plotting(avg_cost_per_min, "Monthly Average Cost Per Minute")

In [None]:
#6 Average tip per trip

avg_tips = []

idx_tips = [13, 12]

for data, idx_date, idx_val in zip(grouped_taxi_data, idx_dates_small, idx_tips):
    (avg_receipts.append(data.
     map(lambda x: to_month(x, idx_date = idx_date, idx = [idx_val])).
                           map(lambda x: (x[0], float(x[1][0])))).
                           aggregateByKey((0,0), lambda x,y : (x[0] + y, x[1] + 1), lambda x,y : (x[0] + y[0], x[1]+ y[1]))
                           mapValues(lambda x : x[0]/x[1].sortByKey().collect()))
    
plotting(avg_receipts, "Monthly Average Tips Per Trip")

In [None]:
#7 Median Monthly  trip speed grouped per service & borough (il faut aussi la locationid)

idx_dist_time = [[1, 2, 4, 7, 8], [1, 2, 8, 5, 6]] #in order pu_time, do_time, distance, pu_id, do_id

#how to do the borough thing? if both puloc & doloc are in the borough? donc first filtrer que les intraborough?

#map(if (name of borough x & y) in list[id in borough] (list id in borough à faire (tu fais un for in unique_value et
#puis tu automatises)), else (donc ceux qui sont pas dans les 2), puis filter if diff de la valeur que tu 
#mets dans le else, puis tu fais un key rdd (où la key est month (reconverti en str) + borough)
#for the median there is a function that can help us (which approximate basically)
#then I will have for one of the dataset, all the month with all the neighboorhood
#then group by neighboorhood, cut that big rdd into smaller one based on that neighboorhood (so you 
# remove the col neighboorhood, bc it is in the name now, sortby date, & then you plot (date & median) that by facetgrid
#so basically, all that concerns the yellow taxi are on graph, & those on the green are on another one


#je pense que pour le plot, il faut faire le facet_grid this time

In [None]:
def finding_boro(x, y,  zones_id):
        borough1 = zones_id.loc[int(x) == (zones_id.LocationID)].borough.iloc[0]
        borough2 = zones_id.loc[int(y) == (zones_id.LocationID)].borough.iloc[0]
        if borough1 == borough2:
            return(borough1)
        
        else: 
            return("remove")
    

def time_diff(x, y):
    """ Gives the time difference between two datetime object in hours"""
    diff = dt.strptime(x, "%Y-%m-%d %H:%M:%S") - dt.strptime(y, "%Y-%m-%d %H:%M:%S")
    diff_in_hour = diff.days *24 + diff.seconds/3600
    return(diff_in_hour)

In [None]:
zones_id = zones[["LocationID", "borough"]]


In [None]:
idx_dist_time = [[1,2,4,7,8], [1,2,8,5,6]] #pu_time, do_time, dist, pu_id, do_id

yellow_neigh = (final_yellow.filter(lambda x: to_month(x, idx_small_dates[0], idx = idx_dist_time[0])).
                map(lambda x: (x[0], [ time_diff(x[1][1], x[1][0]), float(x[1][3]), finding_boro(x[1][3], x[1][4], zones_id)])).
                filter(lambda x: x[1][2] != "remove" ).map(lambda x: (x[0],  x[1][2], (x[1][1])/ (x[1][0]))).
                toDF(["date", "neighborhood", "speed"]))

fig, axs = plt.subplots()
sns.set_style("white")
axs.spines["bottom"].set_color("Gray")
axs.spines["left"].set_color("Gray")
axs.spines["bottom"].set_alpha(0.7)
axs.spines["left"].set_alpha(0.7)
sns.despine()
axs.set_title("Median Trip Speed Per Neighborhood \nof the Yellow Taxis", x = 0.4, y = 1.05,
                 fontsize = 16, alpha = 0.7, color = "gray")
plt.xticks(fontsize = 8, color = "gray", alpha = 0.7)
plt.yticks(fontsize = 8, color = "gray", alpha = 0.7)


colors = ["yellow", "green", "orange", "red", "brown", "salmon"]

for i, color  in zip(zones_id.borough.unique(), colors):
    (yellow_neigh[yellow_neigh.neighborhood == i].grouby(["date"]).
     approxQuantile("speed", [0.5], 0.5).toPandas().sortby("date").
     plot(x = "date", y = "speed", alpha = 0.35, c = color,  label = i))

In [None]:
idx_dist_time = [[1,2,4,7,8], [1,2,8,5,6]] #pu_time, do_time, dist, pu_id, do_id

green_neigh = (final_green.filter(lambda x: to_month(x, idx_small_dates[1], idx = idx_dist_time[1])).
                map(lambda x: (x[0], [ time_diff(x[1][1], x[1][0]), float(x[1][3]), finding_boro(x[1][3], x[1][4], zones_id)])).
                filter(lambda x: x[1][2] != "remove" ).map(lambda x: (x[0],  x[1][2], (x[1][1])/ (x[1][0]))).
                toDF(["date", "neighborhood", "speed"]))

fig, axs = plt.subplots()
sns.set_style("white")
axs.spines["bottom"].set_color("Gray")
axs.spines["left"].set_color("Gray")
axs.spines["bottom"].set_alpha(0.7)
axs.spines["left"].set_alpha(0.7)
sns.despine()
axs.set_title("Median Trip Speed Per Neighborhood \nof the Green Taxis", x = 0.4, y = 1.05,
                 fontsize = 16, alpha = 0.7, color = "gray")
plt.xticks(fontsize = 8, color = "gray", alpha = 0.7)
plt.yticks(fontsize = 8, color = "gray", alpha = 0.7)


colors = ["yellow", "green", "orange", "red", "brown", "salmon"]

for i, color  in zip(zones_id.borough.unique(), colors):
    (green_neigh[green_neigh.neighborhood == i].grouby(["date"]). #j'ai peur que certains n'ait aucun trip ds certains neighboorhood
     approxQuantile("speed", [0.5], 0.5).toPandas().sortby("date").
     plot(x = "date", y = "speed", alpha = 0.35, c = color,  label = i))

il faudrait réflechir au potentiel bug, genre division par 0 et d'autres choses de  genre!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

In [30]:
sc.stop()