In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

In [2]:
# Configure the application name, then obtain the session used by every later cell.
session_builder = SparkSession.builder.appName("NYC Yellow Taxi Analysis")
spark = session_builder.getOrCreate()

# Read the data files into DataFrames

In [3]:
# Load every monthly trip file (.parquet) from `path`, plus the taxi-zone
# lookup table (.csv).
import os

path = 'raw_data'

# os.listdir() returns entries in arbitrary order, but a later cell treats
# raw_data[0] as one specific month, so sort the names to make the load order
# deterministic (assumes the file names sort chronologically -- TODO confirm).
# Hidden/metadata entries (names starting with '.' or '_', such as
# .ipynb_checkpoints) are skipped because they are not trip data.
parquet_file_name = sorted(
    entry for entry in os.listdir(path)
    if not entry.startswith(('.', '_'))
)

# One DataFrame per file, in the same order as parquet_file_name.
raw_data = []

for file_name in parquet_file_name:
    parquet_path = os.path.join(path, file_name)
    raw_data.append(spark.read.parquet(parquet_path))

# The zone lookup is read with its first row used as the header.
zone_table_path = 'taxi_zone_lookup.csv'
zone_df = spark.read.option('header', 'true').csv(zone_table_path)

In [4]:
# Cast selected columns of the first loaded file (the January data, per the
# original note) so that its column types are consistent with the other files.
from pyspark.sql.types import IntegerType, DoubleType, LongType

# Column name -> type it is cast to.
january_casts = {
    'VendorID': IntegerType(),
    'passenger_count': LongType(),
    'RatecodeID': LongType(),
    'PULocationID': IntegerType(),
    'DOLocationID': IntegerType(),
}

january_df = raw_data[0]
for column_name, target_type in january_casts.items():
    # withColumn with an existing name replaces that column in place.
    january_df = january_df.withColumn(column_name, january_df[column_name].cast(target_type))
raw_data[0] = january_df

# Cast the zone lookup key to integer so it has the same type as the
# PULocationID / DOLocationID columns cast above.
zone_df = zone_df.withColumn('LocationID', zone_df['LocationID'].cast(IntegerType()))

In [5]:
# Display the column names and types of the first trip DataFrame and of the zone lookup
for frame in (raw_data[0], zone_df):
    frame.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)

root
 |-- LocationID: integer (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |

In [6]:
# Combine all loaded DataFrames into a single DataFrame `df`.
# unionByName matches columns by name, whereas union() matches them by
# position and would silently misalign values if any file listed its
# columns in a different order.
df = raw_data[0]

for monthly_df in raw_data[1:]:
    df = df.unionByName(monthly_df)

In [7]:
# Count the rows of the combined DataFrame and report the figure
total_trips = df.count()
trip_summary = f"Total trips: {total_trips}"
print(trip_summary)

Total trips: 38310226


# Analysis by Month

In [8]:
# Group trips by the calendar month of the pickup timestamp; the grouped
# object is reused by the per-month aggregations in the following cells
from pyspark.sql.functions import month

pickup_month = month(col('tpep_pickup_datetime')).alias('Month')
group_by_month = df.groupBy(pickup_month)

In [9]:
# Number of trips in each pickup month, listed in ascending month order
from pyspark.sql.functions import count

trip_total = count('*').alias('Trips')
trips_month = group_by_month.agg(trip_total).orderBy('Month', ascending=True)
trips_month.show()

+-----+-------+
|Month|  Trips|
+-----+-------+
|    1|3066759|
|    2|2914003|
|    3|3403660|
|    4|3288248|
|    5|3513664|
|    6|3307259|
|    7|2907093|
|    8|2824201|
|    9|2846741|
|   10|3522280|
|   11|3339732|
|   12|3376586|
+-----+-------+



In [10]:
# Sum of total_amount for each pickup month, listed in ascending month order
from pyspark.sql.functions import sum as spark_sum

monthly_revenue = spark_sum(col('total_amount')).alias('Total Revenue')
revenue_month = (
    group_by_month
    .agg(monthly_revenue)
    .orderBy('Month', ascending=True)
)
revenue_month.show()

+-----+--------------------+
|Month|       Total Revenue|
+-----+--------------------+
|    1| 8.286512795978236E7|
|    2| 7.838160042983432E7|
|    3| 9.463402765971829E7|
|    4| 9.295684310977364E7|
|    5|1.0176602127973004E8|
|    6| 9.613736022974902E7|
|    7| 8.305003029981662E7|
|    8| 8.085130960007511E7|
|    9| 8.478139693005617E7|
|   10|1.0274983841009632E8|
|   11| 9.583627967009455E7|
|   12| 9.637357631009677E7|
+-----+--------------------+



In [14]:
# Average total amount and average tip per two-hour pickup window, for each month
from pyspark.sql.functions import hour, when, avg as spark_avg, round as spark_round

time_interval = df.withColumn('hour', hour(col('tpep_pickup_datetime')))

# Build the twelve-branch conditional one window at a time: hours [0, 2),
# [2, 4), ... [22, 24). The label of the last window wraps its end to "00:00".
interval_label = None
for start_hour in range(0, 24, 2):
    end_hour = start_hour + 2
    in_window = (col("hour") >= start_hour) & (col("hour") < end_hour)
    window_name = f"{start_hour:02d}:00 - {end_hour % 24:02d}:00"
    if interval_label is None:
        interval_label = when(in_window, window_name)
    else:
        interval_label = interval_label.when(in_window, window_name)

time_interval = time_interval.withColumn('Time Interval', interval_label)

# Aggregations, each rounded to two decimals.
month_key = month(col('tpep_pickup_datetime')).alias('Month')
avg_income = spark_round(spark_avg(col('total_amount')), 2).alias('Avg Income')
avg_tip = spark_round(spark_avg(col('tip_amount')), 2).alias('Avg Tip')

income_time_interval = (
    time_interval
    .groupBy(month_key, 'Time Interval')
    .agg(avg_income, avg_tip)
    .orderBy('Month', 'Time Interval')
)

# Show every row rather than the default first 20.
interval_row_total = income_time_interval.count()
income_time_interval.show(n=interval_row_total, truncate=False)

+-----+-------------+----------+-------+
|Month|Time Interval|Avg Income|Avg Tip|
+-----+-------------+----------+-------+
|1    |00:00 - 02:00|27.46     |3.41   |
|1    |02:00 - 04:00|25.08     |3.03   |
|1    |04:00 - 06:00|33.51     |3.69   |
|1    |06:00 - 08:00|27.87     |3.3    |
|1    |08:00 - 10:00|25.17     |3.14   |
|1    |10:00 - 12:00|25.09     |3.09   |
|1    |12:00 - 14:00|25.77     |3.16   |
|1    |14:00 - 16:00|27.43     |3.37   |
|1    |16:00 - 18:00|28.76     |3.58   |
|1    |18:00 - 20:00|26.75     |3.41   |
|1    |20:00 - 22:00|26.95     |3.5    |
|1    |22:00 - 00:00|28.8      |3.66   |
|2    |00:00 - 02:00|26.47     |3.31   |
|2    |02:00 - 04:00|24.38     |2.94   |
|2    |04:00 - 06:00|35.09     |3.97   |
|2    |06:00 - 08:00|28.06     |3.36   |
|2    |08:00 - 10:00|25.2      |3.16   |
|2    |10:00 - 12:00|25.08     |3.11   |
|2    |12:00 - 14:00|26.21     |3.24   |
|2    |14:00 - 16:00|27.41     |3.4    |
|2    |16:00 - 18:00|28.4      |3.57   |
|2    |18:00 - 2

# Borough trips

In [37]:
# Attach the pickup and drop-off borough to every trip by joining the zone
# lookup twice, then summarise the trips whose two boroughs differ.
# Each DataFrame is aliased first so the two joins against the same lookup
# table can be told apart.
trips_df = df.alias('trips')
locations_df_alias_pu = zone_df.alias("pu_locations")
locations_df_alias_do = zone_df.alias("do_locations")

# Left joins keep trips whose location ID has no match in the lookup table.
trips_with_pu_borough = trips_df.join(
    locations_df_alias_pu,
    trips_df.PULocationID == locations_df_alias_pu.LocationID,
    how='left',
).select(trips_df['*'], col("pu_locations.Borough").alias('PU Borough'))

trips_with_boroughs = trips_with_pu_borough.join(
    locations_df_alias_do,
    trips_with_pu_borough.DOLocationID == locations_df_alias_do.LocationID,
    how='left',
).select(trips_with_pu_borough['*'], col("do_locations.Borough").alias('DO Borough'))

# Keep only the trips whose pickup and drop-off boroughs differ. Rows where
# either borough is null are dropped too, because the comparison is then null.
cross_borough_trips = trips_with_boroughs.filter(col('PU Borough') != col('DO Borough'))
# Backward-compatible alias for the original (misspelled) variable name.
corss_borough_trips = cross_borough_trips

# Trip count, total revenue and average revenue for every borough pair.
cross_borough_group = cross_borough_trips.groupBy('PU Borough', 'DO Borough')
result_df = cross_borough_group.agg(
    count('*').alias('Trips'),
    spark_round(spark_sum(col('total_amount')), 2).alias('Total Revenue'),
    spark_round(spark_avg(col('total_amount')), 2).alias('Avg Trip Revenue'),
)

# Order by revenue (highest first), with the borough names as tie-breakers.
# A single orderBy is used because chaining a second orderBy re-sorts the
# whole result and does not keep the first ordering as a tie-break.
result_df = result_df.orderBy(col('Total Revenue').desc(), 'PU Borough', 'DO Borough')

# Show every row of this result; the limit comes from result_df itself
# rather than from the unrelated income_time_interval DataFrame.
result_df.show(n=result_df.count(), truncate=False)

+-------------+-------------+-------+--------------+----------------+
|PU Borough   |DO Borough   |Trips  |Total Revenue |Avg Trip Revenue|
+-------------+-------------+-------+--------------+----------------+
|Queens       |Manhattan    |2125164|1.7057280433E8|80.26           |
|Manhattan    |Queens       |1184488|7.814936782E7 |65.98           |
|Queens       |Brooklyn     |552317 |3.773648694E7 |68.32           |
|Manhattan    |Brooklyn     |800033 |3.44390308E7  |43.05           |
|Manhattan    |EWR          |105643 |1.299695386E7 |123.03          |
|Queens       |N/A          |101888 |1.296407725E7 |127.24          |
|Queens       |Bronx        |79539  |5993290.4     |75.35           |
|Manhattan    |Bronx        |112602 |5074039.86    |45.06           |
|Manhattan    |N/A          |42767  |4483475.32    |104.83          |
|Brooklyn     |Manhattan    |104248 |4130135.78    |39.62           |
|Manhattan    |Unknown      |87450  |2315583.62    |26.48           |
|Unknown      |Manha

In [10]:
# # Save the results to a CSV file
# results_path = "CSVs"
# df.repartition(1).write.option("header", "true").csv(results_path)

In [38]:
# Stop the Spark session that was created near the top of the notebook
spark.stop()