# 3. Iteratively Clean and Aggregate

### *NOTE: the cleaning step could have been wrapped in a function, but because some column names differ between the green and yellow datasets, it was more convenient and time-efficient to copy and paste the code for each.*

In [1]:
from pyspark.sql import SparkSession
import os
from pyspark.sql.functions import *

# Build the Spark session that runs every Spark job in this notebook.
# The builder is configured one option at a time, then materialised.
session_builder = SparkSession.builder.appName("MAST30034 Tutorial 1")
session_builder = session_builder.config("spark.sql.repl.eagerEval.enabled", True)
session_builder = session_builder.config("spark.sql.parquet.cacheMetadata", "true")
# all timestamps in this session are interpreted in UTC
session_builder = session_builder.config("spark.sql.session.timeZone", "Etc/UTC")
spark = session_builder.getOrCreate()


22/08/08 16:26:36 WARN Utils: Your hostname, modaxuexiweiyuanzhangde-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.3.59 instead (on interface en0)
22/08/08 16:26:36 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/08/08 16:26:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/08/08 16:26:38 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
# Output directories for the curated data produced by this notebook.
output_relative_dirs = ['../data/curated/taxi_aggregated',
                        '../data/curated/taxi_parquet']

# `exist_ok=True` makes the cell safe to re-run: an already existing directory
# is left untouched instead of raising, without a separate existence check.
for output_relative_dir in output_relative_dirs:
    os.makedirs(output_relative_dir, exist_ok=True)

### **Green**

In [4]:
# collect list of raw files to clean
# Select by extension instead of slicing off the first sorted entry: a
# positional slice silently drops the first month whenever the folder holds no
# stray non-data entry (presumably a hidden placeholder file - TODO confirm).
to_clean_green = sorted(
    name for name in os.listdir('../data/raw/NYC_Taxi/green')
    if name.endswith('.parquet')
)
to_clean_green

['2016-01.parquet',
 '2016-02.parquet',
 '2016-03.parquet',
 '2016-04.parquet',
 '2016-05.parquet',
 '2016-06.parquet']

In [5]:
# Expected column layout of the green-taxi files, kept as reference only.
# It is deliberately not handed to the reader: parquet files carry their own
# schema, and `spark.read.parquet` takes only paths plus reader options, so
# passing this string as `schema=` does not apply it.
# NOTE(review): if the layout must be enforced, use `spark.read.schema(...)`
# and first confirm these types match the physical parquet types.
ddl_green = """VendorID int,
lpep_pickup_datetime timestamp,
lpep_dropoff_datetime timestamp,
store_and_fwd_flag string,
RatecodeID int,
PULocationID int,
DOLocationID int,
passenger_count int,
trip_distance double,
fare_amount double,
extra double,
mta_tax double,
tip_amount double,
tolls_amount double,
ehail_fee double,
improvement_surcharge double,
total_amount double,
payment_type int,
trip_type int,
congestion_surcharge double"""

# months covered by the analysis; any other file in the folder is skipped
months_to_clean = ('2016-01', '2016-02', '2016-03', '2016-04', '2016-05', '2016-06')

# iteratively remove outliers and aggregate each monthly file
for file in to_clean_green:
    if not any(month in file for month in months_to_clean):
        continue

    sdf_taxi = spark.read.parquet(f'../data/raw/NYC_Taxi/green/{file}')

    # make sure both trip timestamps are timestamp-typed; no explicit pattern is
    # given, so values already stored as timestamps pass through unchanged
    sdf_taxi = (
        sdf_taxi
        .withColumn("lpep_pickup_datetime", to_timestamp(col("lpep_pickup_datetime")))
        .withColumn("lpep_dropoff_datetime", to_timestamp(col("lpep_dropoff_datetime")))
    )

    # outlier rules: location IDs within 1..263, strictly positive trip
    # duration, pickup inside the accepted date window
    valid_locations = (
        (col('PULocationID') >= 1) & (col('PULocationID') <= 263) &
        (col('DOLocationID') >= 1) & (col('DOLocationID') <= 263)
    )
    positive_duration = col('lpep_dropoff_datetime') > col('lpep_pickup_datetime')
    # NOTE(review): the upper bound is 2019-12-31 although only 2016-01..2016-06
    # files are processed - confirm this window is intended.
    in_date_window = (
        (col('lpep_pickup_datetime') >= lit('2016-01-01 00:00:00').cast('timestamp')) &
        (col('lpep_pickup_datetime') <= lit('2019-12-31 00:00:00').cast('timestamp'))
    )
    sdf_taxi_wo_outliers = sdf_taxi.filter(valid_locations & positive_duration & in_date_window)

    # drop columns that are not needed downstream - saves space when saving
    sdf_taxi_wo_outliers = sdf_taxi_wo_outliers.drop(
        'store_and_fwd_flag', 'passenger_count', 'lpep_dropoff_datetime',
        'trip_distance', 'payment_type', 'fare_amount', 'extra', 'mta_tax',
        'tolls_amount', 'improvement_surcharge', 'total_amount', 'VendorID',
        'tip_amount')

    # for the report: total number of rows, then number of rows removed
    length = sdf_taxi.count()
    print(length)
    print(length - sdf_taxi_wo_outliers.count())

    # aggregation: trips per (pickup hour, drop-off zone, pick-up zone);
    # the alias (including its spelling) is the column name written to the CSV
    pickup_hour = date_trunc('hour', col('lpep_pickup_datetime')).alias('rounded_lepep_pickup_datetime')
    sdf_groupby = (
        sdf_taxi_wo_outliers
        .groupBy(pickup_hour, 'DOLocationID', 'PULocationID')
        .count()
        .withColumn("weekday", date_format('rounded_lepep_pickup_datetime', 'EEEE'))
        .withColumn("hour", date_format('rounded_lepep_pickup_datetime', 'HH'))
    )

    # output: one CSV per month, named after the source file
    sdf_groupby.toPandas().to_csv(f'../data/curated/taxi_aggregated/green_{file.split(".")[0]}.csv', index = False)

                                                                                

1445292


                                                                                

8179


[Stage 9:>                                                          (0 + 8) / 8]

22/08/08 16:21:16 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
22/08/08 16:21:16 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
22/08/08 16:21:18 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers


                                                                                

1510722
8328




22/08/08 16:21:55 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
22/08/08 16:21:55 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers


[Stage 22:>                                                         (0 + 8) / 8]

22/08/08 16:21:56 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers


                                                                                

1576393
8321




22/08/08 16:22:31 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
22/08/08 16:22:31 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers


                                                                                

22/08/08 16:22:32 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers


                                                                                

1543926
8547




22/08/08 16:23:09 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
22/08/08 16:23:09 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
22/08/08 16:23:10 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers


                                                                                

1536979
8272




22/08/08 16:23:46 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
22/08/08 16:23:46 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
22/08/08 16:23:47 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers


                                                                                

1404727
7306




22/08/08 16:24:24 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
22/08/08 16:24:24 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
22/08/08 16:24:24 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers


                                                                                

### **Yellow**

In [4]:
# collect list of raw files to clean
# Select by extension instead of slicing off the first sorted entry: a
# positional slice silently drops the first month whenever the folder holds no
# stray non-data entry (presumably a hidden placeholder file - TODO confirm).
to_clean_yellow = sorted(
    name for name in os.listdir('../data/raw/NYC_Taxi/yellow')
    if name.endswith('.parquet')
)
to_clean_yellow

['2016-01.parquet',
 '2016-02.parquet',
 '2016-03.parquet',
 '2016-04.parquet',
 '2016-05.parquet',
 '2016-06.parquet']

In [5]:
# Expected column layout of the yellow-taxi files, kept as reference only
# (a plain DDL string, not a list). It is deliberately not handed to the
# reader: parquet files carry their own schema, and `spark.read.parquet` takes
# only paths plus reader options, so passing it as `schema=` does not apply it.
# NOTE(review): if the layout must be enforced, use `spark.read.schema(...)`
# and first confirm these types match the physical parquet types.
ddl_yellow = """VendorID int,
tpep_pickup_datetime timestamp,
tpep_dropoff_datetime timestamp,
passenger_count int,
trip_distance double,
RatecodeID int,
store_and_fwd_flag string,
PULocationID int,
DOLocationID int,
payment_type double,
fare_amount double,
extra double,
mta_tax double,
tip_amount double,
tolls_amount double,
improvement_surcharge double,
total_amount double,
congestion_surcharge double,
airport_fee double"""

# months covered by the analysis; any other file in the folder is skipped
months_to_clean = ('2016-01', '2016-02', '2016-03', '2016-04', '2016-05', '2016-06')

# iteratively remove outliers and aggregate each monthly file
for file in to_clean_yellow:
    if not any(month in file for month in months_to_clean):
        continue

    sdf_taxi = spark.read.parquet(f'../data/raw/NYC_Taxi/yellow/{file}')

    # rename the yellow (tpep_*) timestamp columns to the green (lpep_*) names
    # so the remaining steps and the output columns match the green pipeline
    sdf_taxi = (
        sdf_taxi
        .withColumnRenamed('tpep_pickup_datetime', 'lpep_pickup_datetime')
        .withColumnRenamed('tpep_dropoff_datetime', 'lpep_dropoff_datetime')
    )

    # make sure both trip timestamps are timestamp-typed; no explicit pattern is
    # given, so values already stored as timestamps pass through unchanged
    sdf_taxi = (
        sdf_taxi
        .withColumn("lpep_pickup_datetime", to_timestamp(col("lpep_pickup_datetime")))
        .withColumn("lpep_dropoff_datetime", to_timestamp(col("lpep_dropoff_datetime")))
    )

    # outlier rules: location IDs within 1..263, strictly positive trip
    # duration, pickup inside the accepted date window
    valid_locations = (
        (col('PULocationID') >= 1) & (col('PULocationID') <= 263) &
        (col('DOLocationID') >= 1) & (col('DOLocationID') <= 263)
    )
    positive_duration = col('lpep_dropoff_datetime') > col('lpep_pickup_datetime')
    # NOTE(review): the upper bound is 2019-12-31 although only 2016-01..2016-06
    # files are processed - confirm this window is intended.
    in_date_window = (
        (col('lpep_pickup_datetime') >= lit('2016-01-01 00:00:00').cast('timestamp')) &
        (col('lpep_pickup_datetime') <= lit('2019-12-31 00:00:00').cast('timestamp'))
    )
    sdf_taxi_wo_outliers = sdf_taxi.filter(valid_locations & positive_duration & in_date_window)

    # drop columns that are not needed downstream - saves space when saving
    sdf_taxi_wo_outliers = sdf_taxi_wo_outliers.drop(
        'store_and_fwd_flag', 'passenger_count', 'lpep_dropoff_datetime',
        'trip_distance', 'payment_type', 'fare_amount', 'extra', 'mta_tax',
        'tolls_amount', 'improvement_surcharge', 'total_amount', 'VendorID',
        'tip_amount')

    # for the report: total number of rows, then number of rows removed
    length = sdf_taxi.count()
    print(length)
    print(length - sdf_taxi_wo_outliers.count())

    # aggregation: trips per (pickup hour, drop-off zone, pick-up zone);
    # the alias (including its spelling) is the column name written to the CSV
    pickup_hour = date_trunc('hour', col('lpep_pickup_datetime')).alias('rounded_lepep_pickup_datetime')
    sdf_groupby = (
        sdf_taxi_wo_outliers
        .groupBy(pickup_hour, 'DOLocationID', 'PULocationID')
        .count()
        .withColumn("weekday", date_format('rounded_lepep_pickup_datetime', 'EEEE'))
        .withColumn("hour", date_format('rounded_lepep_pickup_datetime', 'HH'))
    )

    # output: one CSV per month, named after the source file
    sdf_groupby.toPandas().to_csv(f'../data/curated/taxi_aggregated/yellow_{file.split(".")[0]}.csv', index = False)

                                                                                

10905067


                                                                                

211781


                                                                                

11375412


                                                                                

218386


                                                                                

12203824


                                                                                

226635


                                                                                

11927996


                                                                                

221059


                                                                                

11832049


                                                                                

193408


                                                                                

11131645


                                                                                

182132


                                                                                