In [35]:
import os
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

from pyspark.sql import Row
from pyspark.sql.functions import hour, dayofweek, month, col
from datetime import timedelta

# Start spark session

In [5]:
spark = SparkSession.builder.appName("NYC_Taxi_Analysis").getOrCreate()

# Load Data

In [25]:
cwd = os.getcwd()

# For convenience, I save the data in the NYC folder which is in the same directory as this notebook
parquet_file_path = os.path.join(cwd, "NYC", "*.parquet")
taxi_df = spark.read.parquet(parquet_file_path)

# Trip Analysis

In [26]:
def trip_analysis(taxi_df = taxi_df):
    # cwd = os.getcwd()

    # # For convenience, I save the data in the NYC folder which is in the same directory as this notebook
    # parquet_file_path = os.path.join(cwd, "NYC", "*.parquet")
    # taxi_df = spark.read.parquet(parquet_file_path)
    
    # Calculate Duration and Distance:
    # Create a new column for trip duration and calculate it using the difference between pickup and dropoff times. Also,calculate the average distance for each record:
    taxi_df = taxi_df.withColumn("trip_duration", F.unix_timestamp("tpep_dropoff_datetime") - F.unix_timestamp("tpep_pickup_datetime"))
    taxi_df = taxi_df.withColumn("avg_distance", (F.col("trip_distance") / F.col("passenger_count")))
    
    # Extract Time of Day, Day of Week, and Month of Year:
    taxi_df = taxi_df.withColumn("pickup_hour", F.hour("tpep_pickup_datetime"))
    taxi_df = taxi_df.withColumn("pickup_day_of_week", F.dayofweek("tpep_pickup_datetime"))
    taxi_df = taxi_df.withColumn("pickup_month", F.month("tpep_pickup_datetime"))
    
    # Group the data by time of day, day of week, and month of year, and calculate the average duration and distance for each group:
    agg_df = taxi_df.groupBy("pickup_hour", "pickup_day_of_week", "pickup_month").agg(F.avg("trip_duration").alias("avg_duration"),F.avg("avg_distance").alias("avg_distance")).orderBy("pickup_hour", "pickup_day_of_week", "pickup_month")
    
    # Group and Count Pickup Locations:
    pickup_locations = taxi_df.groupBy("PULocationID").count().orderBy(F.desc("count"))
    top_pickup_locations = pickup_locations.limit(10)
    dropoff_locations = taxi_df.groupBy("DOLocationID").count().orderBy(F.desc("count"))
    top_dropoff_locations = dropoff_locations.limit(10)
    
    agg_df.show()
    print("Top 10 Pickup Locations:")
    top_pickup_locations.show()
    print("Top 10 Dropoff Locations:")
    top_dropoff_locations.show()
    return taxi_df

In [27]:
taxi_df = trip_analysis()

                                                                                

+-----------+------------------+------------+------------------+------------------+
|pickup_hour|pickup_day_of_week|pickup_month|      avg_duration|      avg_distance|
+-----------+------------------+------------+------------------+------------------+
|          0|                 1|           1| 888.9417608770127|3.2521391261294736|
|          0|                 1|           2| 726.9726522187823|  2.89541996285979|
|          0|                 1|           3| 888.4640534063677|2.8471578638497625|
|          0|                 1|           4| 854.6067429406037|2.9782131248083417|
|          0|                 1|           5| 883.9497659700705|2.8323654661521616|
|          0|                 1|           6| 954.6639178045153| 2.721734596752614|
|          0|                 1|           7|1011.6342042755344| 2.835302943232113|
|          0|                 1|           8| 997.7019489609131|2.8669667749237213|
|          0|                 1|           9|1057.5752172184677|2.5259332031

# Tip analysis

In [28]:
def tip_analysis(taxi_df=taxi_df):
    
    # Calculate the tip percentage for each trip. This involves dividing the tip amount by the total fare amount
    tip_analysis_df = taxi_df.withColumn("tip_percentage", F.col("tip_amount") / F.col("total_amount") * 100)
    tip_analysis_df = tip_analysis_df.withColumn("tip_percentage", F.when(F.col("tip_percentage") <= 100, F.col("tip_percentage")).otherwise(0))  # Handle possible outliers

    # Group by pickup and dropoff locations and calculate average tip percentage and average distance
    tip_location_analysis = tip_analysis_df.groupBy("PULocationID", "DOLocationID").agg(F.avg("tip_percentage").alias("avg_tip_percentage"), F.avg("trip_distance").alias("avg_distance")).orderBy(F.desc("avg_tip_percentage"))
    # Calculate the average tip percentage and total tip amount for each time interval
    tip_time_analysis = tip_analysis_df.groupBy("pickup_hour", "pickup_day_of_week", "pickup_month").agg(F.avg("tip_percentage").alias("avg_tip_percentage"), F.sum("tip_amount").alias("total_tip_amount")).orderBy("pickup_hour", "pickup_day_of_week", "pickup_month")

    # Group the data by payment type and calculate statistics such as the average tip percentage, average tip amount, and total tip amount for each payment type
    payment_tip_analysis = tip_analysis_df.groupBy("payment_type").agg(F.avg("tip_percentage").alias("avg_tip_percentage"), F.avg("tip_amount").alias("avg_tip_amount"), F.sum("tip_amount").alias("total_tip_amount"))
    
    print("Tip Analysis by Location:")
    tip_location_analysis.show()
    print("Tip Analysis by Time:")
    tip_time_analysis.show()
    print("Payment and Tip Analysis:")
    payment_tip_analysis.show()

In [29]:
tip_analysis()

Tip Analysis by Location:


                                                                                

+------------+------------+------------------+-----------------+
|PULocationID|DOLocationID|avg_tip_percentage|     avg_distance|
+------------+------------+------------------+-----------------+
|         187|         251|56.179775280898866|             1.54|
|         176|         176|          53.90625|             0.32|
|          96|         236| 48.85197850512946|            11.31|
|         109|         172|46.948356807511736|             2.09|
|         251|         161|46.200737170399776|            19.58|
|         120|         151| 43.01075268817204|             0.73|
|         118|         214|  41.3564929693962|             4.16|
|         172|         214| 39.96670910603205|3.293333333333333|
|         208|         114| 38.55192080359299|            8.975|
|          34|           1| 38.41764929631039|            15.11|
|          82|         253| 38.28483920367534|             2.48|
|         112|         214| 37.67972235994051|            16.03|
|          96|         17

                                                                                

+-----------+------------------+------------+------------------+------------------+
|pickup_hour|pickup_day_of_week|pickup_month|avg_tip_percentage|  total_tip_amount|
+-----------+------------------+------------+------------------+------------------+
|          0|                 1|           1|10.509465545793379| 6285.209999999994|
|          0|                 1|           2|10.960206819588734|  8032.64000000002|
|          0|                 1|           3|11.105551868685199|12657.859999999993|
|          0|                 1|           4|11.314341113424645|18623.729999999974|
|          0|                 1|           5|12.117369325323516|36288.720000000074|
|          0|                 1|           6| 12.19923946046261| 36665.33000000003|
|          0|                 1|           7|11.893981566402744| 34842.08999999987|
|          0|                 1|           8|12.147081300758224|47726.230000000294|
|          0|                 1|           9|12.178405026405924|43128.000000

# Fare Analysis:

In [30]:
def fare_analysis(taxi_df = taxi_df):
    # Calculate the average fare by pickup and dropoff location
    fare_location_analysis = taxi_df.groupBy("PULocationID", "DOLocationID").agg(F.avg("fare_amount").alias("avg_fare")).orderBy(F.desc("avg_fare"))
    # Calculate the average fare for different passenger counts to analyze the correlation between passenger count and fare amount
    fare_passenger_analysis = taxi_df.groupBy("passenger_count").agg(F.avg("fare_amount").alias("avg_fare")).orderBy("passenger_count")
    # Calculate the Pearson correlation coefficient to investigate the correlation between fare amount and trip distance
    fare_distance_correlation = taxi_df.select(F.corr("fare_amount", "trip_distance").alias("correlation")).collect()[0]["correlation"]
    
    print("Average Fare by Pickup & Drop Location:")
    fare_location_analysis.show()
    
    print("Average Fare by Passenger Count:")
    fare_passenger_analysis.show()
    
    print("Correlation between Fare Amount and Trip Distance:")
    print("Correlation coefficient:", fare_distance_correlation)

fare_analysis()

                                                                                

Average Fare by Pickup & Drop Location:


                                                                                

+------------+------------+------------------+
|PULocationID|DOLocationID|          avg_fare|
+------------+------------+------------------+
|         154|          28|            1164.0|
|         234|         189| 843.4665424430641|
|           1|         247|             420.0|
|          83|         136|             378.5|
|           5|          74|             306.0|
|          54|         265|             275.5|
|          29|         264|213.75227272727273|
|           2|         265|            200.25|
|           6|         265|            192.25|
|         123|         265|            177.35|
|         235|         115|             170.0|
|         221|         265|             160.0|
|         253|         208|             160.0|
|         112|         109|             155.0|
|         204|         265|           152.375|
|          44|         138|             151.5|
|         118|         265|            151.25|
|          55|           1|             150.0|
|          10

                                                                                

# Traffic Analysis

In [31]:
def trafic_analysis(taxi_df = taxi_df):
    
    # Calculate the speed per hour
    trip_speed_df = taxi_df.withColumn("trip_speed", (F.col("trip_distance") / (F.col("trip_duration") / 3600)))
    # Group by Trip Time Intervals:
    trip_time_speed_analysis = trip_speed_df.groupBy("PULocationID", "DOLocationID", "pickup_hour", "pickup_day_of_week", "pickup_month").agg(F.avg("trip_speed").alias("avg_trip_speed")).orderBy("PULocationID", "DOLocationID", "pickup_hour", "pickup_day_of_week", "pickup_month")
    
    trip_time_speed_analysis.show()
    

trafic_analysis()

23/08/05 11:41:10 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/08/05 11:41:10 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/08/05 11:41:10 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/08/05 11:41:10 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/08/05 11:41:10 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/08/05 11:41:10 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/08/05 11:41:10 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/08/05 11:41:10 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/08/05 11:41:10 WARN RowBasedKeyValueBatch: Calling spill() on



23/08/05 11:41:11 WARN TaskMemoryManager: Failed to allocate a page (8388608 bytes), try again.
23/08/05 11:41:12 WARN TaskMemoryManager: Failed to allocate a page (8388608 bytes), try again.
23/08/05 11:41:12 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
[Stage 91:>                                                         (0 + 8) / 9]

+------------+------------+-----------+------------------+------------+-----------------+
|PULocationID|DOLocationID|pickup_hour|pickup_day_of_week|pickup_month|   avg_trip_speed|
+------------+------------+-----------+------------------+------------+-----------------+
|           1|           1|          0|                 1|           4|              0.0|
|           1|           1|          0|                 2|           3|              0.0|
|           1|           1|          0|                 2|          11|              0.0|
|           1|           1|          0|                 4|           6|              0.0|
|           1|           1|          0|                 7|           5|2438.181818181818|
|           1|           1|          1|                 1|           6|              0.0|
|           1|           1|          1|                 1|          11|              0.0|
|           1|           1|          1|                 2|           3|              0.0|
|         

                                                                                

# Demand Analysis

In [33]:
def demand_analysis(taxi_df = taxi_df):
    
    # Create features from the date and time of pickups
    demand_df = taxi_df.withColumn("pickup_hour", F.hour("tpep_pickup_datetime"))
    demand_df = demand_df.withColumn("pickup_day_of_week", F.dayofweek("tpep_pickup_datetime"))
    demand_df = demand_df.withColumn("pickup_month", F.month("tpep_pickup_datetime"))

    # Regression Model (Linear Regression):
    feature_columns = ["pickup_hour", "pickup_day_of_week", "pickup_month"]
    assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
    demand_df = assembler.transform(demand_df)
    
    feature_columns = ["pickup_hour", "pickup_day_of_week", "pickup_month"]
    assembler = VectorAssembler(inputCols=feature_columns, outputCol="input_features")
    demand_df = assembler.transform(demand_df)
    
    # Aggregate data for regression
    regression_df = demand_df.groupBy("pickup_hour").agg(F.sum("passenger_count").alias("total_pickups"),F.first("input_features").alias("features"))
    
    lr = LinearRegression(featuresCol="features", labelCol="total_pickups")
    lr_model = lr.fit(regression_df)
    
    return lr_model

lr_model = demand_analysis()

23/08/05 11:44:57 WARN Instrumentation: [ece564a5] regParam is zero, which might cause numerical instability and overfitting.
23/08/05 11:44:58 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/08/05 11:44:58 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
23/08/05 11:44:58 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
                                                                                

- Making prediction

In [36]:
def prediction_hour(lr_model = lr_model, taxi_df = taxi_df):
    
    # Get the last record in the Parquet data
    last_record = taxi_df.orderBy(F.desc("tpep_pickup_datetime")).limit(1).collect()[0]

    # Extract relevant information from the last record
    last_pickup_datetime = last_record["tpep_pickup_datetime"]
    next_pickup_datetime = last_pickup_datetime + timedelta(hours=1)

    # Extract features for the next hour
    next_hour = next_pickup_datetime.hour
    next_day_of_week = next_pickup_datetime.weekday()
    next_month = next_pickup_datetime.month

    # Create a DataFrame for the next hour
    next_pickup_row_hour = Row(pickup_hour=next_hour, pickup_day_of_week=next_day_of_week, pickup_month=next_month)
    next_pickup_df_hour = spark.createDataFrame([next_pickup_row_hour])

    # Convert columns to feature vector using VectorAssembler
    assembler = VectorAssembler(inputCols=["pickup_hour", "pickup_day_of_week", "pickup_month"], outputCol="features")

    # Transform the features and make predictions for the next hour
    next_pickup_df_hour = assembler.transform(next_pickup_df_hour)
    predictions_hour = lr_model.transform(next_pickup_df_hour)
    predicted_pickups_hour = predictions_hour.select("prediction").collect()[0]["prediction"]

    print("Predicted Pickups for Next Hour:", predicted_pickups_hour)

prediction_hour()

                                                                                

Predicted Pickups for Next Hour: 1341885.7375189392


In [39]:
def prediction_day(lr_model=lr_model, taxi_df=taxi_df):
    
    # Get the last record in the Parquet data
    last_record = taxi_df.orderBy(F.desc("tpep_pickup_datetime")).limit(1).collect()[0]

    # Extract relevant information from the last record
    last_pickup_datetime = last_record["tpep_pickup_datetime"]
    next_pickup_datetime = last_pickup_datetime + timedelta(hours=1)
    
    # Calculate the next day and month
    next_day_datetime = last_pickup_datetime + timedelta(days=1)
    next_day_hour = next_day_datetime.hour
    next_day_of_week = next_day_datetime.weekday()
    next_month = next_day_datetime.month

    # Create a DataFrame for the next day
    next_pickup_row_day = Row(pickup_hour=next_day_hour, pickup_day_of_week=next_day_of_week, pickup_month=next_month)
    next_pickup_df_day = spark.createDataFrame([next_pickup_row_day])

    # Convert columns to feature vector using VectorAssembler
    assembler = VectorAssembler(inputCols=["pickup_hour", "pickup_day_of_week", "pickup_month"], outputCol="features")

    # Convert columns to feature vector using VectorAssembler
    next_pickup_df_day = assembler.transform(next_pickup_df_day)

    # Make predictions for the next day
    predictions_day = lr_model.transform(next_pickup_df_day)
    predicted_pickups_day = predictions_day.select("prediction").collect()[0]["prediction"]

    print("Predicted Pickups for Next Day:", predicted_pickups_day)
    
prediction_day()



Predicted Pickups for Next Day: 1461753.3968590284


                                                                                

In [40]:
def prediction_month(lr_model=lr_model, taxi_df=taxi_df):
        
    # Get the last record in the Parquet data
    last_record = taxi_df.orderBy(F.desc("tpep_pickup_datetime")).limit(1).collect()[0]

    # Extract relevant information from the last record
    last_pickup_datetime = last_record["tpep_pickup_datetime"]
    next_pickup_datetime = last_pickup_datetime + timedelta(hours=1)
    
    # Calculate the next month
    next_month_datetime = last_pickup_datetime + timedelta(days=30)  # Assuming 30 days in a month
    next_month_hour = next_month_datetime.hour
    next_day_of_week = next_month_datetime.weekday()
    next_month = next_month_datetime.month

    # Create a DataFrame for the next month
    next_pickup_row_month = Row(pickup_hour=next_month_hour, pickup_day_of_week=next_day_of_week, pickup_month=next_month)
    next_pickup_df_month = spark.createDataFrame([next_pickup_row_month])

    # Convert columns to feature vector using VectorAssembler
    assembler = VectorAssembler(inputCols=["pickup_hour", "pickup_day_of_week", "pickup_month"], outputCol="features")

    # Convert columns to feature vector using VectorAssembler
    next_pickup_df_month = assembler.transform(next_pickup_df_month)

    # Make predictions for the next month
    predictions_month = lr_model.transform(next_pickup_df_month)
    predicted_pickups_month = predictions_month.select("prediction").collect()[0]["prediction"]

    print("Predicted Pickups for Next Month:", predicted_pickups_month)

prediction_month()



Predicted Pickups for Next Month: 1198863.0639412608


                                                                                