In [10]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.appName("NYC_Taxi_Analysis").getOrCreate()

df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("multiline", "true") \
    .format('parquet').load("data/yellow_tripdata_2018-01.parquet")

df.show(5)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1| 2018-01-01 00:21:05|  2018-01-01 00:24:23|              1|          0.5|         1|                 N|          41|          24|           2|        4.5|  0.5|    0.5|       0.

In [11]:
df.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)



In [19]:
from pyspark.sql.functions import col

df = df.withColumn("Revenue", 
    col("fare_amount") + 
    col("extra") + 
    col("mta_tax") + 
    col("tip_amount") + 
    col("tolls_amount") + 
    col("improvement_surcharge") + 
    col("total_amount")
)

In [20]:
df.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|           Revenue|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|       1| 2018-01-01 00:21:05|  2018-01-01 00:24:23|              1|          0.5|         1|                 N|          41|    

In [22]:
passenger_by_area = df.groupBy("PULocationID").agg(sum("passenger_count").alias("total_passengers")).orderBy("total_passengers")

passenger_by_area.show()

+------------+----------------+
|PULocationID|total_passengers|
+------------+----------------+
|          44|               1|
|         204|               1|
|         184|               2|
|         187|               2|
|           5|               3|
|         176|               3|
|          46|               3|
|          58|               4|
|           2|               4|
|         199|               4|
|          30|               4|
|         245|               5|
|         221|               6|
|         214|               6|
|          84|               8|
|         105|               8|
|         156|               8|
|         109|               8|
|         251|               9|
|         115|              12|
+------------+----------------+
only showing top 20 rows


In [23]:
vendor_earnings = df.groupBy("VendorID") \
    .agg(
        avg("fare_amount").alias("avg_fare"),
        avg("total_amount").alias("avg_total_earning"),
        avg("Revenue").alias("avg_revenue")
    ).orderBy("VendorID")

vendor_earnings.show()

+--------+------------------+------------------+------------------+
|VendorID|          avg_fare| avg_total_earning|       avg_revenue|
+--------+------------------+------------------+------------------+
|       1|11.959975333672647|15.127384289902137|30.254768579804274|
|       2|12.467037370438032|15.775723474073514|31.545275988143338|
+--------+------------------+------------------+------------------+



In [26]:
from pyspark.sql.window import Window

window_spec = Window.partitionBy("payment_type").orderBy("tpep_pickup_datetime").rowsBetween(Window.unboundedPreceding, Window.currentRow)

payment_analysis = df.select(
    "payment_type",
    "tpep_pickup_datetime",
    "total_amount"
).withColumn(
    "running_count", 
    row_number().over(window_spec)
)

payment_analysis.orderBy("payment_type", "tpep_pickup_datetime").show()

+------------+--------------------+------------+-------------+
|payment_type|tpep_pickup_datetime|total_amount|running_count|
+------------+--------------------+------------+-------------+
|           1| 2002-12-31 23:01:55|        7.56|            1|
|           1| 2003-01-01 00:06:34|       48.96|            2|
|           1| 2008-12-31 18:30:07|      180.06|            3|
|           1| 2008-12-31 23:02:27|       22.88|            4|
|           1| 2008-12-31 23:45:58|       28.56|            5|
|           1| 2009-01-01 00:36:33|       12.36|            6|
|           1| 2009-01-01 00:48:49|       20.38|            7|
|           1| 2009-01-01 01:14:52|       18.36|            8|
|           1| 2009-01-01 01:37:21|        6.36|            9|
|           1| 2009-01-01 03:32:24|       38.47|           10|
|           1| 2009-01-01 08:04:56|       49.86|           11|
|           1| 2009-01-01 10:05:05|        12.0|           12|
|           1| 2009-01-01 13:25:15|         9.0|       

In [27]:
from pyspark.sql.functions import date_format, desc

specific_date = "2009-01-01"

top_vendors = df.filter(date_format("tpep_pickup_datetime", "yyyy-MM-dd") == specific_date) \
                    .groupBy("VendorID") \
                    .agg(sum("Revenue").alias("total_revenue"), sum("passenger_count").alias("total_passenger_count"), sum("trip_distance").alias("total_trip_distance")) \
                    .orderBy(desc("total_revenue")) \
                    .limit(2)

top_vendors.show()

+--------+-----------------+---------------------+-------------------+
|VendorID|    total_revenue|total_passenger_count|total_trip_distance|
+--------+-----------------+---------------------+-------------------+
|       2|544.9800000000001|                   21|  48.89999999999999|
+--------+-----------------+---------------------+-------------------+



In [28]:
route_passengers = df.groupBy("PULocationID", "DOLocationID").agg(sum("passenger_count").alias("total_passengers")).orderBy(desc("total_passengers"))

route_passengers.show()

+------------+------------+----------------+
|PULocationID|DOLocationID|total_passengers|
+------------+------------+----------------+
|         264|         264|          186705|
|         237|         236|           86737|
|         236|         236|           78691|
|         236|         237|           71752|
|         237|         237|           64499|
|         239|         238|           44479|
|         239|         142|           42383|
|         142|         239|           40365|
|         238|         239|           37758|
|         141|         236|           36201|
|         230|         186|           35109|
|         237|         162|           34384|
|         186|         230|           33942|
|         263|         236|           33661|
|         161|         237|           33641|
|         237|         161|           33593|
|         239|         239|           32184|
|          48|          48|           31495|
|          79|          79|           31263|
|         

In [29]:
from datetime import timedelta

latest_time = df.agg(max("tpep_pickup_datetime").alias("max_time")).collect()[0]["max_time"]

time_window_10s = latest_time - timedelta(seconds=10)
time_window_5s = latest_time - timedelta(seconds=5)

recent_pickups_10s = df.filter(col("tpep_pickup_datetime") >= time_window_10s) \
                        .groupBy("PULocationID") \
                        .agg(
                            sum("passenger_count").alias("passengers_last_10s")
                        ).orderBy(desc("passengers_last_10s"))

recent_pickups_5s = df.filter(col("tpep_pickup_datetime") >= time_window_5s) \
                        .groupBy("PULocationID") \
                        .agg(
                            sum("passenger_count").alias("passengers_last_5s")
                        ).orderBy(desc("passengers_last_5s"))

In [30]:
recent_pickups_10s.show()

+------------+-------------------+
|PULocationID|passengers_last_10s|
+------------+-------------------+
|          48|                  2|
+------------+-------------------+



In [31]:
recent_pickups_5s.show()

+------------+------------------+
|PULocationID|passengers_last_5s|
+------------+------------------+
|          48|                 2|
+------------+------------------+

