# Challenge 3

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import when, count, col, lit, year, month

In [None]:
spark = (SparkSession
  .builder
  .appName("nyctlc-dev")
  .getOrCreate())

In [None]:
s3_bucket = "s3://your_bucket_name"

# Green taxi

In [None]:
# Raw green-taxi CSV inputs: one glob per pickup year (2013-2020) over that year's .csv.gz files.
green_path = f'{s3_bucket}/raw/nyc-tlc/green'
(green_2013, green_2014, green_2015, green_2016,
 green_2017, green_2018, green_2019, green_2020) = (
    f'{green_path}/green_tripdata_{yr}*.csv.gz' for yr in range(2013, 2021)
)
# Destination for the unified, partitioned Parquet output.
green_output_path = f'{s3_bucket}/staging/nyc-tlc/green'

## Explore 2013-2014

Start exploring the 2013-2014 data by letting Spark infer the schema.

In [None]:
df1_raw = spark.read.csv(
    [green_2013, green_2014],
    header=True,
    inferSchema=True,
    timestampFormat='yyyy-MM-dd HH:mm:ss',
)

In [None]:
# Cross check with the S3 select result
df1_raw.printSchema()

In [None]:
df1_raw.summary().show()

In [None]:
# Counting null values
df1_raw.select([count(when(col(c).isNull(), c)).alias(c) for c in df1_raw.columns]).show()

## Data load
### 2013-2014
The data looks good. Now define an explicit schema and load the files again.

In [None]:
# Changes:
# Upper case > lower case
# Ehail_fee - double


df1_schema = StructType([
    StructField('VendorID',IntegerType(),True),
    StructField('lpep_pickup_datetime',TimestampType(),True),
    StructField('lpep_dropoff_datetime',TimestampType(),True),
    StructField('store_and_fwd_flag',StringType(),True),
    StructField('RatecodeID',IntegerType(),True),
    StructField('Pickup_longitude',DoubleType(),True),
    StructField('Pickup_latitude',DoubleType(),True),
    StructField('Dropoff_longitude',DoubleType(),True),
    StructField('Dropoff_latitude',DoubleType(),True),
    StructField('passenger_count',IntegerType(),True),
    StructField('trip_distance',DoubleType(),True),
    StructField('fare_amount',DoubleType(),True),
    StructField('extra',DoubleType(),True),
    StructField('mta_tax',DoubleType(),True),
    StructField('tip_amount',DoubleType(),True),
    StructField('tolls_amount',DoubleType(),True),
    StructField('ehail_fee',DoubleType(),True),
    StructField('total_amount',DoubleType(),True),
    StructField('payment_type',IntegerType(),True),
    StructField('trip_type' ,IntegerType(),True)])

In [None]:
df1_raw = spark.read.csv(
    [green_2013, green_2014],
    schema=df1_schema,
    header=True,
    timestampFormat='yyyy-MM-dd HH:mm:ss',
)

In [None]:
df1_raw.printSchema()

In [None]:
# Cross-check with the one with schema inferred 
df1_raw.summary().show()

In [None]:
# Add more columns to make the schema the same across all years
# Add: improvement_surcharge, PULocationID, DOLocationID, congestion_surcharge

df_2013_2014 = (df1_raw.withColumn('improvement_surcharge', lit(None).astype(DoubleType()))
                .withColumn('PULocationID', lit(None).astype(IntegerType()))
                .withColumn('DOLocationID', lit(None).astype(IntegerType()))
                .withColumn('congestion_surcharge', lit(None).astype(DoubleType()))
               )

In [None]:
df_2013_2014.printSchema()

### 2015-2016

In [None]:
# Changes from 2013-2014:
# Add: improvement_surcharge


df2_schema = StructType([
    StructField('VendorID',IntegerType(),True),
    StructField('lpep_pickup_datetime',TimestampType(),True),
    StructField('lpep_dropoff_datetime',TimestampType(),True),
    StructField('store_and_fwd_flag',StringType(),True),
    StructField('RatecodeID',IntegerType(),True),
    StructField('Pickup_longitude',DoubleType(),True),
    StructField('Pickup_latitude',DoubleType(),True),
    StructField('Dropoff_longitude',DoubleType(),True),
    StructField('Dropoff_latitude',DoubleType(),True),
    StructField('passenger_count',IntegerType(),True),
    StructField('trip_distance',DoubleType(),True),
    StructField('fare_amount',DoubleType(),True),
    StructField('extra',DoubleType(),True),
    StructField('mta_tax',DoubleType(),True),
    StructField('tip_amount',DoubleType(),True),
    StructField('tolls_amount',DoubleType(),True),
    StructField('ehail_fee',DoubleType(),True),
    StructField('improvement_surcharge',DoubleType(),True),
    StructField('total_amount',DoubleType(),True),
    StructField('payment_type',IntegerType(),True),
    StructField('trip_type' ,IntegerType(),True)])

In [None]:
df2_raw = spark.read.csv(
    [green_2015, green_2016],
    schema=df2_schema,
    header=True,
    timestampFormat='yyyy-MM-dd HH:mm:ss',
)

In [None]:
# Cross check with the S3 select result
df2_raw.printSchema()

In [None]:
# Add more columns to make the schema the same across all years
# Add: improvement_surcharge, PULocationID, DOLocationID, congestion_surcharge

df_2015_2016 = (df2_raw.withColumn('PULocationID', lit(None).astype(IntegerType()))
                .withColumn('DOLocationID', lit(None).astype(IntegerType()))
                .withColumn('congestion_surcharge', lit(None).astype(DoubleType()))
               )

In [None]:
df_2015_2016.summary().show()

### 2017-2018

In [None]:
# Changes from 2015-2016:
# Remove: Pickup_longitude, Pickup_latitude, Dropoff_longitude, Dropoff_latitude
# Add: PULocationID, DOLocationID


df3_schema = StructType([
    StructField('VendorID',IntegerType(),True),
    StructField('lpep_pickup_datetime',TimestampType(),True),
    StructField('lpep_dropoff_datetime',TimestampType(),True),
    StructField('store_and_fwd_flag',StringType(),True),
    StructField('RatecodeID',IntegerType(),True),
    StructField('PULocationID',IntegerType(),True),
    StructField('DOLocationID',IntegerType(),True),
    StructField('passenger_count',IntegerType(),True),
    StructField('trip_distance',DoubleType(),True),
    StructField('fare_amount',DoubleType(),True),
    StructField('extra',DoubleType(),True),
    StructField('mta_tax',DoubleType(),True),
    StructField('tip_amount',DoubleType(),True),
    StructField('tolls_amount',DoubleType(),True),
    StructField('ehail_fee',DoubleType(),True),
    StructField('improvement_surcharge',DoubleType(),True),
    StructField('total_amount',DoubleType(),True),
    StructField('payment_type',IntegerType(),True),
    StructField('trip_type' ,IntegerType(),True)])

In [None]:
df3_raw = spark.read.csv(
    [green_2017, green_2018],
    schema=df3_schema,
    header=True,
    timestampFormat='yyyy-MM-dd HH:mm:ss',
)

In [None]:
# Cross check with the S3 select result
df3_raw.printSchema()

In [None]:
# Add more columns to make the schema the same across all years
# Add: Pickup_longitude, Pickup_latitude, Dropoff_longitude, Dropoff_latitude, congestion_surcharge

df_2017_2018 = (df3_raw.withColumn('Pickup_longitude', lit(None).astype(DoubleType()))
                .withColumn('Pickup_latitude', lit(None).astype(DoubleType()))
                .withColumn('Dropoff_longitude', lit(None).astype(DoubleType()))
                .withColumn('Dropoff_latitude', lit(None).astype(DoubleType()))
                .withColumn('congestion_surcharge', lit(None).astype(DoubleType()))
               )

In [None]:
df_2017_2018.summary().show()

### 2019-2020

In [None]:
# Changes from 2017-2018:
# Add: congestion_surcharge


df4_schema = StructType([
    StructField('VendorID',IntegerType(),True),
    StructField('lpep_pickup_datetime',TimestampType(),True),
    StructField('lpep_dropoff_datetime',TimestampType(),True),
    StructField('store_and_fwd_flag',StringType(),True),
    StructField('RatecodeID',IntegerType(),True),
    StructField('PULocationID',IntegerType(),True),
    StructField('DOLocationID',IntegerType(),True),
    StructField('passenger_count',IntegerType(),True),
    StructField('trip_distance',DoubleType(),True),
    StructField('fare_amount',DoubleType(),True),
    StructField('extra',DoubleType(),True),
    StructField('mta_tax',DoubleType(),True),
    StructField('tip_amount',DoubleType(),True),
    StructField('tolls_amount',DoubleType(),True),
    StructField('ehail_fee',DoubleType(),True),
    StructField('improvement_surcharge',DoubleType(),True),
    StructField('total_amount',DoubleType(),True),
    StructField('payment_type',IntegerType(),True),
    StructField('trip_type' ,IntegerType(),True),
    StructField('congestion_surcharge',DoubleType(),True)
])

In [None]:
df4_raw = spark.read.csv(
    [green_2019, green_2020],
    schema=df4_schema,
    header=True,
    timestampFormat='yyyy-MM-dd HH:mm:ss',
)

In [None]:
# Cross check with the S3 select result
df4_raw.printSchema()

In [None]:
# Add more columns to make the schema the same across all years
# Add: Pickup_longitude, Pickup_latitude, Dropoff_longitude, Dropoff_latitude

df_2019_2020 = (df4_raw.withColumn('Pickup_longitude', lit(None).astype(DoubleType()))
                .withColumn('Pickup_latitude', lit(None).astype(DoubleType()))
                .withColumn('Dropoff_longitude', lit(None).astype(DoubleType()))
                .withColumn('Dropoff_latitude', lit(None).astype(DoubleType()))
               )

In [None]:
df_2019_2020.summary().show()

## Merge all years

In [None]:
df_green = (df_2013_2014
            .unionByName(df_2015_2016)
            .unionByName(df_2017_2018)
            .unionByName(df_2019_2020)
           )

In [None]:
df_green.printSchema()

In [None]:
df_green.summary().show()

Looks good. Write the DataFrame to staging, partitioned by pickup year and month.

In [None]:
df_green_final = (df_green
                  .withColumn("year", year('lpep_pickup_datetime'))
                  .withColumn("month", month('lpep_pickup_datetime'))
                 )

In [None]:
partitions = ['year', 'month']

(df_green_final
 .repartition(col(partitions[0]), col(partitions[1]))
 .write.mode("OVERWRITE")
 .option("maxRecordsPerFile", 1000000)
 .partitionBy(partitions)
 .parquet(green_output_path, compression="gzip"))

# FHV

In [None]:
# Raw FHV CSV inputs: one glob per pickup year (2015-2020) over that year's .csv.gz files.
fhv_path = f'{s3_bucket}/raw/nyc-tlc/fhv'
(fhv_2015, fhv_2016, fhv_2017,
 fhv_2018, fhv_2019, fhv_2020) = (
    f'{fhv_path}/fhv_tripdata_{yr}*.csv.gz' for yr in range(2015, 2021)
)
# Destination for the unified, partitioned Parquet output.
fhv_output_path = f'{s3_bucket}/staging/nyc-tlc/fhv'

## Explore 2020

Infer the schema, then adjust it manually.

In [None]:
df4_raw = spark.read.csv(
    [fhv_2020],
    header=True,
    inferSchema=True,
    timestampFormat='yyyy-MM-dd HH:mm:ss',
)

In [None]:
df4_raw.printSchema()

In [None]:
df4_raw.show(5)

In [None]:
fhv_2020_schema = StructType([
    StructField('dispatching_base_num',StringType(),True),
    StructField('pickup_datetime',TimestampType(),True),
    StructField('dropoff_datetime',TimestampType(),True),
    StructField('PULocationID',IntegerType(),True),
    StructField('DOLocationID',IntegerType(),True),
    StructField('SR_Flag',StringType(),True)
])

## Explore 2015: Does Pickup_date include a time component?

In [None]:
df1_raw = spark.read.csv(
    [fhv_2015],
    header=True,
    inferSchema=True,
    timestampFormat='yyyy-MM-dd HH:mm:ss',
)

In [None]:
df1_raw.show(5)

## Explore 2018: Duplicated dispatching base number columns?

In [None]:
fhv_2018_schema = StructType([
    StructField('pickup_datetime',TimestampType(),True),
    StructField('dropoff_datetime',TimestampType(),True),
    StructField('PULocationID',IntegerType(),True),
    StructField('DOLocationID',IntegerType(),True),
    StructField('dispatching_base_number',StringType(),True),
    StructField('dispatching_base_num',StringType(),True)
])

In [None]:
df2_raw = spark.read.csv(
    [fhv_2018],
    schema=fhv_2018_schema,
    header=True,
    timestampFormat='yyyy-MM-dd HH:mm:ss',
)

In [None]:
df2_raw.show(5)

In [None]:
df2_raw.count()

In [None]:
# Is the column all null?
df2_raw.select(count(when(col('dispatching_base_number').isNull(), 1))).show()

In [None]:
# Is the column all null?
df2_raw.select(count(when(col('dispatching_base_num').isNull(), 1))).show()

In [None]:
# What are those not null values?
df2_raw.filter(df2_raw['dispatching_base_number'].isNotNull()).show(5)

In [None]:
# Does the column only contain 1 and null? Yes
df2_raw.select('dispatching_base_number').summary().show()

## Data loading

### 2015-2016

In [None]:
fhv_2015_schema = StructType([
    StructField('dispatching_base_num',StringType(),True),
    StructField('pickup_datetime',TimestampType(),True),
    StructField('PULocationID',IntegerType(),True)
])

In [None]:
fhv_2015_2016_raw = spark.read.csv(
    [fhv_2015, fhv_2016],
    schema=fhv_2015_schema,
    header=True,
    timestampFormat='yyyy-MM-dd HH:mm:ss',
)

In [None]:
# Add more columns to make the schema the same across all years
# Add: dropoff_datetime, DOlocationID, SR_Flag

fhv_2015_2016 = (fhv_2015_2016_raw.withColumn('dropoff_datetime', lit(None).astype(TimestampType()))
                 .withColumn('DOlocationID', lit(None).astype(IntegerType()))
                 .withColumn('SR_Flag', lit(None).astype(StringType()))
               )

### 2017

In [None]:
fhv_2017_schema = StructType([
    StructField('dispatching_base_num',StringType(),True),
    StructField('pickup_datetime',TimestampType(),True),
    StructField('dropoff_datetime',TimestampType(),True),
    StructField('PULocationID',IntegerType(),True),
    StructField('DOLocationID',IntegerType(),True)
])

In [None]:
fhv_2017_raw = spark.read.csv(
    [fhv_2017],
    schema=fhv_2017_schema,
    header=True,
    timestampFormat='yyyy-MM-dd HH:mm:ss',
)

In [None]:
# Add more columns to make the schema the same across all years
# Add: SR_Flag

fhv_2017 = fhv_2017_raw.withColumn('SR_Flag', lit(None).astype(StringType()))

### 2018

In [None]:
fhv_2018_schema = StructType([
    StructField('pickup_datetime',TimestampType(),True),
    StructField('dropoff_datetime',TimestampType(),True),
    StructField('PULocationID',IntegerType(),True),
    StructField('DOLocationID',IntegerType(),True),
    StructField('dispatching_base_number',StringType(),True),
    StructField('dispatching_base_num',StringType(),True)
])

In [None]:
fhv_2018_raw = spark.read.csv(
    [fhv_2018],
    schema=fhv_2018_schema,
    header=True,
    timestampFormat='yyyy-MM-dd HH:mm:ss',
)

In [None]:
# Add more columns to make the schema the same across all years
# Add: SR_Flag
# Drop: dispatching_base_number

fhv_2018 = (fhv_2018_raw.withColumn('SR_Flag', lit(None).astype(StringType()))
            .drop('dispatching_base_number')
           )

### 2019-2020

In [None]:
fhv_2020_schema = StructType([
    StructField('dispatching_base_num',StringType(),True),
    StructField('pickup_datetime',TimestampType(),True),
    StructField('dropoff_datetime',TimestampType(),True),
    StructField('PULocationID',IntegerType(),True),
    StructField('DOLocationID',IntegerType(),True),
    StructField('SR_Flag',StringType(),True)
])

In [None]:
fhv_2019_2020_raw = spark.read.csv(
    [fhv_2019, fhv_2020],
    schema=fhv_2020_schema,
    header=True,
    timestampFormat='yyyy-MM-dd HH:mm:ss',
)

## Merge all years

In [None]:
df_fhv = (fhv_2015_2016
            .unionByName(fhv_2017)
            .unionByName(fhv_2018)
            .unionByName(fhv_2019_2020_raw)
           )

In [None]:
df_fhv.printSchema()

In [None]:
df_fhv.summary().show()

Looks good. Write the DataFrame to staging, partitioned by pickup year and month.

In [None]:
df_fhv_final = (df_fhv
                  .withColumn("year", year('pickup_datetime'))
                  .withColumn("month", month('pickup_datetime'))
                 )

In [None]:
partitions = ['year', 'month']

(df_fhv_final
 .repartition(col(partitions[0]), col(partitions[1]))
 .write.mode("OVERWRITE")
 .option("maxRecordsPerFile", 1000000)
 .partitionBy(partitions)
 .parquet(fhv_output_path, compression="gzip"))