In [None]:
# Install dependencies
!pip install -r requirements.txt

In [1]:
# Initial Spark

from pyspark.sql import SparkSession, Row, DataFrame
import pyspark.sql.functions as f
import pandas as pd
import numpy as np

spark = SparkSession.builder\
    .master("local[*]")\
    .appName("main")\
    .config("spark.dynamicAllocation.enabled", "true")\
    .config("spark.shuffle.service.enabled", "true")\
    .getOrCreate()


23/03/19 10:13:47 WARN Utils: Your hostname, Hais-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.31.78 instead (on interface en0)
23/03/19 10:13:47 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/19 10:13:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Define input directory
INPUT_PATH = "<input_your_input_data_path_here>"

In [5]:
# Read zone lookup csv file
df_zone = spark.read.option("header", True).csv(f"{INPUT_PATH}/taxi_zones_lookup.csv")
df_zone.createOrReplaceTempView("zoneTable")
df_zone.printSchema()
print(f"Number of rows in zone table: {df_zone.count()}")

root
 |-- LocationID: string (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)

Number of rows in zone table: 265


In [12]:
# Read taxi parquet files
df_trip = spark.read.parquet(f"{INPUT_PATH}/yellow_tripdata_2019-01.parquet")
df_trip.createOrReplaceTempView("taxiTable")
df_trip.printSchema()
print(f"Number of rows in trip table: {df_trip.count()}")

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: integer (nullable = true)

Number of rows in trip table: 7696617


In [27]:
# Clean up zone table column names

df_zone = df_zone.withColumnRenamed("LocationID", "location_id") \
                    .withColumnRenamed("Borough", "borough") \
                    .withColumnRenamed("Zone", "zone").dropna()

df_zone.printSchema()
df_zone.show(1)

root
 |-- location_id: string (nullable = true)
 |-- borough: string (nullable = true)
 |-- zone: string (nullable = true)
 |-- service_zone: string (nullable = true)

+-----------+-------+--------------+------------+
|location_id|borough|          zone|service_zone|
+-----------+-------+--------------+------------+
|          1|    EWR|Newark Airport|         EWR|
+-----------+-------+--------------+------------+
only showing top 1 row



In [75]:
# Clean up trip table column names
import pyspark.sql.types as t

df_trip = df_trip.withColumn("uuid", f.expr("uuid()")).withColumnRenamed("VendorId", "vendor_id") \
                        .withColumnRenamed("Passenger_count", "passenger_count") \
                        .withColumnRenamed("Trip_distance", "trip_distance") \
                        .withColumnRenamed("PULocationID","pick_up_location_id") \
                        .withColumnRenamed("DOLocationID","drop_off_location_id") \
                        .withColumnRenamed("RateCodeID","rate_code_id") \
                        .withColumnRenamed("Store_and_fwd_flag","store_and_fwd_flag") \
                        .withColumnRenamed("Payment_type","payment_type") \
                        .withColumnRenamed("Fare_amount","fare_amount") \
                        .withColumnRenamed("Extra","extra") \
                        .withColumnRenamed("MTA_tax","mta_tax") \
                        .withColumnRenamed("Improvement_surcharge","improvement_surcharge") \
                        .withColumnRenamed("Tip_amount","tip_amount") \
                        .withColumnRenamed("Tolls_amount","tolls_amount") \
                        .withColumnRenamed("Total_amount","total_amount") \
                        .withColumnRenamed("Congestion_Surcharge","congestion_surcharge") \
                        .withColumnRenamed("Airport_fee","airport_fee") \
                        .withColumn("passenger_count", f.col("passenger_count").cast(t.LongType()))
df_trip.printSchema()
df_trip.show(1)

root
 |-- vendor_id: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- rate_code_id: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- pick_up_location_id: long (nullable = true)
 |-- drop_off_location_id: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: integer (nullable = true)
 |-- uuid: string (nullable = false)

+---------+--------------------+---------------------+---------------+-------------+

In [31]:
# Create date table from tpep_pickup_datetime
df_date = df_trip.select(f.col("tpep_pickup_datetime")) \
                .withColumnRenamed("tpep_pickup_datetime", "timestamp") \
                .withColumn("year", f.year(f.col("timestamp"))) \
                .withColumn("month", f.month(f.col("timestamp"))) \
                .withColumn("day", f.dayofmonth(f.col("timestamp"))) \
                .withColumn("hour", f.hour(f.col("timestamp"))) \
                .withColumn("minute", f.minute(f.col("timestamp"))) \
                .withColumn("second", f.second(f.col("timestamp"))) \
                .withColumn("day_of_week", f.dayofweek(f.col("timestamp")))

df_date.show(1)

+-------------------+----+-----+---+----+------+------+-----------+
|          timestamp|year|month|day|hour|minute|second|day_of_week|
+-------------------+----+-----+---+----+------+------+-----------+
|2019-01-01 02:46:40|2019|    1|  1|   2|    46|    40|          3|
+-------------------+----+-----+---+----+------+------+-----------+
only showing top 1 row



In [58]:
# Create pickup_location_gain table
def create_location_gain_table(df_trip: DataFrame, df_zone: DataFrame, type_location: str) -> DataFrame:
    df_location_gain = df_trip.select(f.col(type_location), 
                                  f.col("fare_amount"), f.col("extra"), f.col("mta_tax"), 
                                  f.col("tip_amount"), f.col("tolls_amount"), f.col("total_amount")) \
                                    .groupBy(type_location).agg(f.sum("fare_amount").alias("total_fare_amount"), 
                                                                f.sum("extra").alias("total_extra"),
                                                                f.sum("mta_tax").alias("total_mta_tax"),
                                                                f.sum("tip_amount").alias("total_tip_amount"),
                                                                f.sum("tolls_amount").alias("total_tolls_amount"))

    df_zone_simplified = df_zone.select(f.col("zone"), f.col("location_id"))
    df_location_gain = df_location_gain.join(df_zone_simplified) \
                        .where(df_zone_simplified["location_id"] == df_location_gain[type_location]) \
                        .drop(type_location)
    
    return df_location_gain

                        
df_pu_location_gain = create_location_gain_table(df_trip, df_zone, "pick_up_location_id")
df_do_location_gain = create_location_gain_table(df_trip, df_zone, "drop_off_location_id")

print("Pick up location gain table")
df_pu_location_gain.show(2)

print("Drop off location gain table")
df_do_location_gain.show(2)

Pick up location gain table


                                                                                

+-----------------+-----------+-------------+------------------+------------------+--------------+-----------+
|total_fare_amount|total_extra|total_mta_tax|  total_tip_amount|total_tolls_amount|          zone|location_id|
+-----------------+-----------+-------------+------------------+------------------+--------------+-----------+
| 7225.38999999999|     174.75|        135.5|            226.16| 295.4799999999998|Brighton Beach|         29|
|18777.30000000003|      447.5|        376.5|104.55999999999997| 712.4399999999991|  Borough Park|         26|
+-----------------+-----------+-------------+------------------+------------------+--------------+-----------+
only showing top 2 rows

Drop off location gain table




+------------------+-----------+-------------+------------------+------------------+--------------+-----------+
| total_fare_amount|total_extra|total_mta_tax|  total_tip_amount|total_tolls_amount|          zone|location_id|
+------------------+-----------+-------------+------------------+------------------+--------------+-----------+
| 91443.14999999995|    1136.25|       1192.0| 4649.979999999999| 3225.010000000037|  Borough Park|         26|
|45342.759999999995|      536.0|        512.5|2450.4899999999984| 1502.959999999998|Brighton Beach|         29|
+------------------+-----------+-------------+------------------+------------------+--------------+-----------+
only showing top 2 rows



                                                                                

In [82]:
# Create daily passenger/distance table

df_trip_date = df_trip.join(df_date).where(df_trip["tpep_pickup_datetime"] == df_date["timestamp"]) \
                    .select(f.col("day_of_week"), f.col("passenger_count"), f.col("trip_distance")).dropna()
                            
df_daily_passenger = df_trip_date.groupBy("day_of_week").agg(f.sum("passenger_count").alias("total_passenger_count"),
                                                             f.sum("trip_distance").alias("total_trip_distance"))
df_daily_passenger.show(2)

23/03/20 01:48:56 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/03/20 01:48:56 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/03/20 01:48:56 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/03/20 01:48:56 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/03/20 01:48:56 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/03/20 01:48:56 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/03/20 01:48:56 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/03/20 01:48:56 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/03/20 01:48:56 WARN RowBasedKeyValueBatch: Calling spill() on

In [None]:
spark.stop()