In [1]:
from pyspark.sql import SparkSession
from urllib.request import urlretrieve
# Create a spark session (which will run spark jobs)
spark = (
    SparkSession.builder.appName("ADS project 1")
    .config("spark.sql.repl.eagerEval.enabled", True)  
    .config("spark.executor.memory","2G")
    .config("spark.driver.memory","4G")
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .getOrCreate()
)

22/08/22 01:37:40 WARN Utils: Your hostname, Luo resolves to a loopback address: 127.0.1.1; using 172.17.1.121 instead (on interface eth0)
22/08/22 01:37:40 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/08/22 01:37:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
spark.conf.set("spark.sql.parquet.compression.codec", "gzip")

In [3]:
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import seaborn as sbs
import geopandas as gpd
import folium

In [4]:
YEARS = ['2019', '2021']
MONTHS = range(1, 13)
path = f"../data/curated/tlc_data/"
cols = ['trip_distance', 'fare_amount', 'tip_amount', 'average_speed', 'time_duration']

In [6]:
# create a empty dataframe to store the quantiles
def create_quan_df():
    colu = cols
    inde = ['Q1', 'Q3', 'IQR']
    return pd.DataFrame(index = inde, columns = colu)

In [5]:
# use the quantile found to clean the outliers
def clean_feature(sdf, year):
    quan_df = pd.read_csv(f'../data/curated/{year}_quantile.csv', index_col= 0 )
    for fea in cols:
        sdf = sdf\
        .where((F.col(fea) >= quan_df.loc['Q1', fea] - 1.5* quan_df.loc['IQR', fea]) & (F.col(fea) <= quan_df.loc['Q3', fea] + 1.5*quan_df.loc['IQR', fea]))
    return sdf


In [7]:
for year in YEARS:
    # read in the data
    sdf = spark.read.parquet(f'{path}{year}/firstclean/*/')
    # initialise the quantile dataframe
    quan_df = create_quan_df()
    
    for fea in cols:
        # for each feature, find Q1 and Q3
        Q1 = sdf.approxQuantile(fea,[0.25],relativeError=0.05)[0]
        Q3 = sdf.approxQuantile(fea,[0.75],relativeError=0.05)[0]
        IQR = Q3 - Q1
        # store in the dataframe
        quan_df.loc["Q1", fea] = Q1
        quan_df.loc["Q3", fea] = Q3
        quan_df.loc["IQR", fea] = IQR
    # save the quantile dataframe
    quan_df.to_csv(f'../data/curated/{year}_quantile.csv')

                                                                                

In [8]:
for year in YEARS:
    for month in MONTHS:
        # turn '1' to '01' to keep consistent with the format of the file names
        month = str(month).zfill(2)
        # read in the data
        sdf = spark.read.parquet(f'{path}{year}/firstclean/{year}--{month}-firstclean.parquet')
        # clean the dataset using the quantiles
        sdf = clean_feature(sdf, year)
        # add Tip rate as a feature
        sdf = sdf.withColumn("tip_rate", F.col('tip_amount') / F.col('fare_amount'))
        # only select the columns that will be used later
        final_sdf = sdf.select(['trip_distance', 'PULocationID', 'DOLocationID', 'fare_amount', \
        'is_airport', 'Month,date', 'Month', 'Pickup_time', 'time_duration', 'is_weekend', 'average_speed', 'tip_rate', 'tip_amount'])
        # save the data files to the folder '/finalclean'
        final_sdf.write.mode("overwrite").parquet(f'{path}{year}/finalclean/{year}--{month}-finalclean.parquet')
        print(f"finished {year} {month}")

                                                                                

finished 2019 01


                                                                                

finished 2019 02


                                                                                

finished 2019 03


                                                                                

finished 2019 04


                                                                                

finished 2019 05


                                                                                

finished 2019 06


                                                                                

finished 2019 07


                                                                                

finished 2019 08


                                                                                

finished 2019 09


                                                                                

finished 2019 10


                                                                                

finished 2019 11


                                                                                

finished 2019 12


                                                                                

finished 2021 01


                                                                                

finished 2021 02


                                                                                

finished 2021 03


                                                                                

finished 2021 04


                                                                                

finished 2021 05


                                                                                

finished 2021 06


                                                                                

finished 2021 07


                                                                                

finished 2021 08


                                                                                

finished 2021 09


                                                                                

finished 2021 10


                                                                                

finished 2021 11




finished 2021 12


                                                                                