# Data integration

For each sub-dataset, write (and execute) code that converts a file (using possibly an old schema) into a file that has the new, latest schema version.

Your conversion code should not modify the original files, but instead create a new file.

Be sure to explain the design behind your conversion functions!

The data integration step is highly parallelizable. Therefore, your solution on this part
**must** be written in Spark.

$\color{red}{\text{ASSUMPTIONS!!!.}}$ 
<br>
$\color{red}{\text{You must run the t1_explore notebook before running this notebook. }}$ 
<br>
$\color{red}{\text{The taxi zone folder must be in the same location as this notebook. }}$ 

In [None]:
from pyspark.sql import SparkSession

# If a Spark session was already started, stop it before starting a new one
# (there can be only one Spark context per Jupyter notebook).
try:
    spark
except NameError:
    # `spark` has not been defined yet in this kernel: nothing to stop.
    pass
else:
    print("Spark application already started. Terminating existing application and starting new one")
    spark.stop()

# Create a new spark session (note, the * indicates to use all available CPU cores)
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("TLC")
    .getOrCreate()
)

sc = spark.sparkContext

In [None]:
# defining some key functions
#generator function
def file_content(main_folder):
    '''Generator that yields the name of every entry found in main_folder.

    Only the bare entry names are produced (as returned by os.listdir);
    the caller is responsible for joining them with the folder path.
    '''
    yield from os.listdir(main_folder)

In [None]:
from pyspark.sql import *
from pyspark.sql.functions import *
# Function to fix column names

def column_name_fix(df):
    '''Return the dataframe with every column name converted to lowercase.

    The column order is kept; only the names passed to toDF change.
    '''
    lowered = [name.lower() for name in df.columns]
    return df.toDF(*lowered)

In [None]:
# Function to drop column
def column_drop(df,drop_list):
    '''Drop columns from a spark dataframe by position.

    drop_list holds the positional indexes (into df.columns) of the columns
    to remove. The dataframe without those columns is returned.
    '''
    doomed = [df.columns[position] for position in drop_list]
    return df.drop(*doomed)

In [None]:
# Function to rename column
def column_rename(df,col_list1,col_list2):
    '''Rename columns of a spark dataframe.

    col_list1 holds the positional indexes of the columns to rename and
    col_list2 the new names, in the same order: the first entry of
    col_list2 replaces the column at the first entry of col_list1, and so on.

    Each renamed column is appended at the end of the dataframe under its new
    name, after which the original columns are dropped by position.

    Raises ValueError when the two lists differ in length (previously a
    message was printed and None was returned, which only failed later with
    an unrelated error).
    '''
    if len(col_list1) != len(col_list2):
        raise ValueError('length of two list must be the same.')

    for old_position, new_name in zip(col_list1, col_list2):
        # '*' keeps every existing column; the aliased copy is appended last,
        # so the positions in col_list1 stay valid for the drop below.
        df = df.select('*', df[old_position].alias(new_name))
    return column_drop(df, col_list1)

In [None]:
# Add columns to a dataframe
from pyspark.sql.functions import lit
def column_add(df,col_list):
    '''Add one column per name in col_list, each filled with the empty string.

    Returns the resulting dataframe; with an empty col_list the input
    dataframe is returned as is.
    '''
    result = df
    for new_name in col_list:
        result = result.withColumn(new_name, lit(""))
    return result


In [None]:
# Function to convert latitude and longitude to location ID

Loading the shapefile using GeoPandas (note: GeoPandas is not installed by default. If you use Anaconda, you can install it by simply running `conda install geopandas`).

In [None]:
pip install geopandas

In [None]:
import matplotlib.pyplot as plt 
import geopandas as gpd
from shapely.geometry import Point, Polygon

In [None]:
# Load the shapefile, this yields a GeoDataFrame that has a row for each zone.
# The path is relative, so the taxi_zones folder must be reachable from the
# current working directory.
zones = gpd.read_file('./taxi_zones/taxi_zones.shp')

In [None]:
# Now re-project the coordinates to the CRS EPSG 4326, which is the CRS used in GPS (https://epsg.io/4326)
# The EPSG code is passed directly: the {'init': 'epsg:4326'} dict form is deprecated.
zones = zones.to_crs(epsg=4326)

In [None]:
pip install pygeos

In [None]:
# Create a spatial index on the zones' geometry; location_id queries it to
# shortlist the zones whose bounding box may contain a given point.

rtree = zones.sindex

In [None]:
def _zone_for_point(zones, spatial_index, longitude, latitude):
    '''Return the LocationID of the zone containing (longitude, latitude), or None.'''
    try:
        query_point = Point(float(longitude), float(latitude))
    except (TypeError, ValueError):
        # Missing or non-numeric coordinate: no zone can be determined.
        return None

    found = None
    # The index lookup only shortlists candidates, so each one is confirmed
    # with an exact containment test. As before, the last match wins.
    for position in spatial_index.intersection(query_point.bounds):
        if zones.geometry.iloc[position].contains(query_point):
            found = zones.LocationID.iloc[position]
    return found


def location_id(df1,zones):
    '''Fill pulocationid / dolocationid from the pickup / dropoff coordinates.

    df1 is a pandas dataframe that must contain the columns pickup_longitude,
    pickup_latitude, dropoff_longitude, dropoff_latitude, pulocationid and
    dolocationid. zones is the GeoDataFrame of taxi zones (geometry and
    LocationID columns are used).

    For every row the zone containing the pickup point is written to
    pulocationid and the zone containing the dropoff point to dolocationid.
    Rows whose point lies in no zone, or whose coordinates are missing or not
    numeric, keep their existing value. df1 is modified in place and returned.
    '''
    # Use the index of the zones that were actually passed in, rather than a
    # notebook-level global that may belong to another GeoDataFrame.
    spatial_index = zones.sindex

    pickup_col = df1.columns.get_loc('pulocationid')
    dropoff_col = df1.columns.get_loc('dolocationid')

    coordinates = zip(
        df1['pickup_longitude'],
        df1['pickup_latitude'],
        df1['dropoff_longitude'],
        df1['dropoff_latitude'],
    )
    for row, (pu_lon, pu_lat, do_lon, do_lat) in enumerate(coordinates):
        pickup_zone = _zone_for_point(zones, spatial_index, pu_lon, pu_lat)
        if pickup_zone is not None:
            # Positional scalar assignment: chained `df1.col[i] = ...` is not
            # guaranteed to write back into df1.
            df1.iat[row, pickup_col] = pickup_zone

        dropoff_zone = _zone_for_point(zones, spatial_index, do_lon, do_lat)
        if dropoff_zone is not None:
            df1.iat[row, dropoff_col] = dropoff_zone

    return df1

In [None]:
# Creating a folder to hold integrated files.
# makedirs creates the folder (and any missing parent) and, with exist_ok,
# does nothing when it is already there.
import os
os.makedirs('Files/integrated_files', exist_ok=True)

### Integrating FHV files

In [None]:
#FHV taxi files
# Schema One
import os

# Create the output folder (and its FHV parent) if it does not exist yet;
# exist_ok makes re-running the cell harmless.
output_path = 'Files/integrated_files/FHV/Schema_v_1'
os.makedirs(output_path, exist_ok=True)

# Columns to be renamed: position of the column -> new name
col_list1 = [1]
col_list2 = ['pickup_datetime']

# columns to be added
col_add = ['dropoff_datetime', 'pulocationid', 'dolocationid', 'sr_flag', 'dispatching_base_number']

# columns to be dropped (by position)
col_drop = [2]

folder_path = './Files/FHV/v_1'
for file in file_content(folder_path):
    file_path = os.path.join(folder_path, file)
    # The read result is passed through column_name_fix to lower-case the header
    df = column_name_fix(spark.read.csv(file_path, header=True))
    df = column_drop(df, col_drop)
    df = column_rename(df, col_list1, col_list2)
    df = column_add(df, col_add)
    # Written to a new file with the same name; the original is left untouched
    df.toPandas().to_csv(os.path.join(output_path, file), index=False)
        
     

In [None]:
#FHV taxi files
# Schema Two
import os

# Create the output folder (and any missing parent) if it does not exist yet
output_path = 'Files/integrated_files/FHV/Schema_v_2'
os.makedirs(output_path, exist_ok=True)

# Columns to be renamed (none for this schema)
col_list1 = []
col_list2 = []

# columns to be added
col_add = ['sr_flag', 'dispatching_base_number']

# columns to be dropped (none for this schema)
col_drop = []

folder_path = './Files/FHV/v_2'
for file in file_content(folder_path):
    file_path = os.path.join(folder_path, file)
    # The read result is passed through column_name_fix to lower-case the header
    df = column_name_fix(spark.read.csv(file_path, header=True))
    df = column_drop(df, col_drop)
    df = column_rename(df, col_list1, col_list2)
    df = column_add(df, col_add)
    # Written to a new file with the same name; the original is left untouched
    df.toPandas().to_csv(os.path.join(output_path, file), index=False)


In [None]:
#FHV taxi files
# Schema Three
import os

# Create the output folder (and any missing parent) if it does not exist yet
output_path = 'Files/integrated_files/FHV/Schema_v_3'
os.makedirs(output_path, exist_ok=True)

# Columns to be renamed (none for this schema)
col_list1 = []
col_list2 = []

# columns to be added
col_add = ['dispatching_base_number']

# columns to be dropped (none for this schema)
col_drop = []

folder_path = './Files/FHV/v_3'
for file in file_content(folder_path):
    file_path = os.path.join(folder_path, file)
    # The read result is passed through column_name_fix to lower-case the header
    df = column_name_fix(spark.read.csv(file_path, header=True))
    df = column_drop(df, col_drop)
    df = column_rename(df, col_list1, col_list2)
    df = column_add(df, col_add)
    # Written to a new file with the same name; the original is left untouched
    df.toPandas().to_csv(os.path.join(output_path, file), index=False)
 

In [None]:
# FHV taxi files
# Schema Four
import os
import shutil

# Create the output folder (and any missing parent) if it does not exist yet
output_path = 'Files/integrated_files/FHV/Schema_v_4'
os.makedirs(output_path, exist_ok=True)

# No conversion is applied to these files: they are copied unchanged
# (copy2 also preserves file metadata) into the integrated folder.
source_path = './Files/FHV/v_4'
for file in os.listdir(source_path):
    shutil.copy2(os.path.join(source_path, file), output_path)

### Integrating green files

In [None]:
# green taxi files
# schema one
import os

# Create the output folder (and its green parent) if it does not exist yet;
# exist_ok makes re-running the cell harmless.
output_path = 'Files/integrated_files/green/Schema_v_1'
os.makedirs(output_path, exist_ok=True)

# Columns to be renamed (none for this schema)
col_list1 = []
col_list2 = []

# columns to be added
col_add = ['pulocationid', 'dolocationid','improvement_surcharge', 'congestion_surcharge']

# columns to be dropped once the location ids have been derived from them
col_drop = ['pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude']

folder_path = './Files/green/v_1'
for file in file_content(folder_path):
    file_path = os.path.join(folder_path, file)
    # The read result is passed through column_name_fix to lower-case the header
    df = column_name_fix(spark.read.csv(file_path, header=True))
    df = column_add(df, col_add)
    # location_id works on a pandas dataframe, so convert before the lookup
    df = df.toPandas()
    df = location_id(df, zones)
    df = df.drop(columns=col_drop)
    # Written to a new file with the same name; the original is left untouched
    df.to_csv(os.path.join(output_path, file), index=False)

In [None]:
# green taxi files
# schema Two
import os

# Create the output folder (and any missing parent) if it does not exist yet
output_path = 'Files/integrated_files/green/Schema_v_2'
os.makedirs(output_path, exist_ok=True)

# Columns to be renamed (none for this schema)
col_list1 = []
col_list2 = []

# columns to be added
col_add = ['pulocationid', 'dolocationid', 'congestion_surcharge']

# columns to be dropped once the location ids have been derived from them
col_drop = ['pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude']

folder_path = './Files/green/v_2'
for file in file_content(folder_path):
    file_path = os.path.join(folder_path, file)
    # The read result is passed through column_name_fix to lower-case the header
    df = column_name_fix(spark.read.csv(file_path, header=True))
    df = column_add(df, col_add)
    # location_id works on a pandas dataframe, so convert before the lookup
    df = df.toPandas()
    df = location_id(df, zones)
    df = df.drop(columns=col_drop)
    # Written to a new file with the same name; the original is left untouched
    df.to_csv(os.path.join(output_path, file), index=False)

In [None]:
# green taxi files
# schema Three
import os

# Create the output folder (and any missing parent) if it does not exist yet
output_path = 'Files/integrated_files/green/Schema_v_3'
os.makedirs(output_path, exist_ok=True)

# Columns to be renamed (none for this schema)
col_list1 = []
col_list2 = []

# columns to be added
col_add = ['congestion_surcharge']
# NOTE(review): col_drop is defined but never applied in this cell -
# presumably this schema no longer carries coordinate columns; confirm.
col_drop = ['pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude']

folder_path = './Files/green/v_3'
for file in file_content(folder_path):
    file_path = os.path.join(folder_path, file)
    # The read result is passed through column_name_fix to lower-case the header
    df = column_name_fix(spark.read.csv(file_path, header=True))
    df = column_add(df, col_add)
    # Written to a new file with the same name; the original is left untouched
    df.toPandas().to_csv(os.path.join(output_path, file), index=False)

### Integrating yellow files