This notebook summarises how yellow taxi data from 2019 and 2020 was cleaned. Steps and analysis can be found in **2_Preprocessing_Taxi.ipynb**. 

In [1]:
# Imports 
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import time
import gc
%matplotlib inline

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import col

import warnings
warnings.filterwarnings("ignore")

In [2]:
# Only run this cell below if you have installed SparkMonitor
from pyspark import SparkContext

# Start the spark context
sc = SparkContext.getOrCreate(conf=swan_spark_conf)

21/08/14 11:32:51 WARN Utils: Your hostname, LAPTOP-38BBQ5BL resolves to a loopback address: 127.0.1.1; using 172.18.108.55 instead (on interface eth0)
21/08/14 11:32:51 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
21/08/14 11:32:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
INFO:SparkMonitorKernel:Client Connected ('127.0.0.1', 55178)


In [3]:
# create a spark session (which will run spark jobs)
spark = SparkSession.builder.getOrCreate()

# Directly use Apache Arrow with pip3 install pyarrow
spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', True)

# Data Cleaning

In [4]:
# Construct schema
schema = StructType()

customSchema = StructType([
    StructField("VendorID", StringType(), True),
    StructField("tpep_pickup_datetime", TimestampType(), True),
    StructField("tpep_dropoff_datetime", TimestampType(), True),
    StructField("passenger_count", IntegerType(), True),
    StructField("trip_distance", DoubleType(), True),
    StructField("RatecodeID", StringType(), True), 
    StructField("store_and_fwd_flag", StringType(), True), 
    StructField("PULocationID", IntegerType(), True),
    StructField("DOLocationID", IntegerType(), True),
    StructField("payment_type", IntegerType(), True),
    StructField("fare_amount", DoubleType(), True),
    StructField("extra", StringType(), True), 
    StructField("mta_tax", StringType(), True), 
    StructField("tip_amount", DoubleType(), True),
    StructField("tolls_amount", StringType(), True),
    StructField("improvement_surcharge", StringType(), True),
    StructField("total_amount", DoubleType(), True),
    StructField("congestion_surcharge", StringType(), True)]
)

In [5]:
def preprocessing(sdf, year):
    drop_cols = ['VendorID', 'RateCodeID', 
           'extra', 'mta_tax', 'tolls_amount', 'improvement_surcharge', 
           'congestion_surcharge', 'store_and_fwd_flag']
    sdf.drop(*drop_cols)
    if year == '2019':
        query = """
        SELECT
            *
        FROM
            sdf
        WHERE 
            sdf.total_amount < 34
            AND sdf.total_amount > 0
            AND sdf.trip_distance < 10
            AND sdf.trip_distance > 0
            AND sdf.passenger_count > 0
            AND sdf.tpep_pickup_datetime >= '2019-01-01'
            AND sdf.tpep_pickup_datetime < '2020-01-01'
            AND sdf.tpep_dropoff_datetime >= '2019-01-01'
            AND sdf.tpep_dropoff_datetime < '2020-01-01'
            AND sdf.PULocationID >= 1
            AND sdf.PULocationID <= 263
            AND sdf.DOLocationID >= 1
            AND sdf.DOLocationID <= 263
        """
    else:
        query = """
        SELECT
            *
        FROM
            sdf
        WHERE 
            sdf.total_amount < 34
            AND sdf.total_amount > 0
            AND sdf.trip_distance < 10
            AND sdf.trip_distance > 0
            AND sdf.passenger_count > 0
            AND sdf.tpep_pickup_datetime >= '2020-01-01'
            AND sdf.tpep_pickup_datetime < '2021-01-01'
            AND sdf.tpep_dropoff_datetime >= '2020-01-01'
            AND sdf.tpep_dropoff_datetime < '2021-01-01'
            AND sdf.PULocationID >= 1
            AND sdf.PULocationID <= 263
            AND sdf.DOLocationID >= 1
            AND sdf.DOLocationID <= 263
        """
    sdf.createOrReplaceTempView("sdf")
    result = spark.sql(query)
    return result 

In [6]:
sdf_2019 = spark.read.csv('../raw_data/2019', header=True, schema = customSchema)
cleaned_19 = preprocessing(sdf_2019, '2019')

In [20]:
sdf_2020 = spark.read.csv('../raw_data/2020', header=True, schema = customSchema)
cleaned_20 = preprocessing(sdf_2020, '2020')

# Aggregating 

Aggregate by day:

In [7]:
def aggregate_daily(sdf):
    query = """
        SELECT
            TO_DATE(tpep_pickup_datetime) AS pickup_date,
            ROUND(COUNT(*)/1000, 2) AS trip_count_k,
            MEAN(total_amount) AS Average_Trip_Amount_USD,
            MEAN(trip_distance) AS Average_Distance_in_Miles,
            MEAN(passenger_count) AS Average_passenger_count
        FROM
            sdf
        GROUP BY
            pickup_date
    """
    sdf.createOrReplaceTempView("sdf")
    result = spark.sql(query)
    return result


In [8]:
daily_19 = aggregate_daily(cleaned_19)

In [9]:
daily_19.show()

                                                                                

+-----------+------------+-----------------------+-------------------------+-----------------------+
|pickup_date|trip_count_k|Average_Trip_Amount_USD|Average_Distance_in_Miles|Average_passenger_count|
+-----------+------------+-----------------------+-------------------------+-----------------------+
| 2019-05-08|      247.37|       19.7860646717899|       3.0158937147292924|     1.5651650146338307|
| 2019-06-04|      218.33|     18.957261484875914|       3.0430321531626063|     1.5280950854211515|
| 2019-11-18|      217.47|     19.228767830013346|       2.9233298845818085|     1.5257322849128616|
| 2019-09-22|      195.67|      19.73755049366429|        3.371832900627628|     1.6246090316249975|
| 2019-11-01|      262.95|     19.229471154167534|        2.837496824491321|      1.580174938201179|
| 2019-11-21|      248.96|     19.738632690074414|       2.9294314658011764|     1.5180915503141117|
| 2019-05-27|      139.98|     19.920432130065947|       3.8215078474935873|      1.6726269

In [21]:
daily_20 = aggregate_daily(cleaned_20)

Aggregate by zone:

In [14]:
def aggregate_by_pickup(sdf):
    query = """
        SELECT
            PULocationID AS Pickup_location,
            ROUND(COUNT(*)/1000, 2) AS trip_count_k
        FROM
            sdf
        GROUP BY
            PULocationID
    """
    sdf.createOrReplaceTempView("sdf")
    result = spark.sql(query)
    return result

In [15]:
def aggregate_by_dropoff(sdf):
    query = """
        SELECT
            DOLocationID AS Dropoff_location,
            ROUND(COUNT(*)/1000, 2) AS trip_count_k
        FROM
            sdf
        GROUP BY
            DOLocationID
    """
    sdf.createOrReplaceTempView("sdf")
    result = spark.sql(query)
    return result

In [16]:
pickup_19 = aggregate_by_pickup(cleaned_19)
pickup_19.show()

                                                                                

+---------------+------------+
|Pickup_location|trip_count_k|
+---------------+------------+
|            148|     1004.57|
|            243|       13.44|
|             31|        0.42|
|            137|      963.36|
|             85|        3.25|
|            251|        0.09|
|             65|       93.99|
|            255|       59.62|
|             53|        1.07|
|            133|        3.71|
|             78|        2.48|
|            155|        2.94|
|            108|        1.02|
|            211|      643.92|
|             34|        1.83|
|            193|       30.05|
|            126|         1.6|
|            101|        1.06|
|            115|        0.11|
|             81|         1.8|
+---------------+------------+
only showing top 20 rows



In [25]:
dropoff_19 = aggregate_by_dropoff(cleaned_19)

In [26]:
pickup_20 = aggregate_by_pickup(cleaned_20)

In [27]:
dropoff_20 = aggregate_by_dropoff(cleaned_20)

## 6. Writing to Disk
- Optional: csv, pickle, feather, or parquet

Although it is more time-consuming to convert to pd and then csv, it will save time when reading the processed data later.

In [18]:
daily_19.orderBy(col("pickup_date").asc())

DataFrame[pickup_date: date, trip_count_k: double, Average_Trip_Amount_USD: double, Average_Distance_in_Miles: double, Average_passenger_count: double]

In [19]:
daily_19.toPandas().to_csv('../processed_data/daily_summary_2019.csv',header=True)

                                                                                

In [22]:
daily_20_new = daily_20.sort(col("pickup_date"))
daily_20_new.toPandas().to_csv('../processed_data/daily_summary_2020.csv',header=True)

                                                                                

In [28]:
pickup_19.toPandas().to_csv('../processed_data/pickup_location_tripcounts_2019.csv',header=True)
dropoff_19.toPandas().to_csv('../processed_data/dropoff_location_tripcounts_2019.csv',header=True)

                                                                                

In [29]:
pickup_20.toPandas().to_csv('../processed_data/pickup_location_tripcounts_2020.csv',header=True)
dropoff_20.toPandas().to_csv('../processed_data/dropoff_location_tripcounts_2020.csv',header=True)

                                                                                