## AUTHORS : BEN MADANI YAZID, LIYANG FAANG, MOHAMMED SAMEER

In [2]:
# Importing required functions using %run
%run src/data_cleaning.py
%run src/trip_analysis.py
%run src/tip_analysis.py
%run src/fare_analysis.py
%run src/traffic_analysis.py
%run src/demand_prediction.py

In [3]:
import os
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression


In [4]:
# Function to set environment variables
def set_environment_variables(
    java_home=r'C:\Program Files\Java\jdk-11',
    spark_home=r'C:\spark',
    pyspark_python=r'C:\Users\yazid\.conda\envs\spark_project\python.exe',
    pyspark_driver_python=None,
):
    """Set the Java/Spark/PySpark environment variables for this process.

    Called with no arguments it writes the same values as before. Each
    location can be overridden so the notebook can run on another machine
    without editing the function body.

    Args:
        java_home: Value written to JAVA_HOME.
        spark_home: Value written to SPARK_HOME.
        pyspark_python: Value written to PYSPARK_PYTHON.
        pyspark_driver_python: Value written to PYSPARK_DRIVER_PYTHON.
            When None, the value of ``pyspark_python`` is used.

    Returns:
        None. A confirmation message is printed.
    """
    if pyspark_driver_python is None:
        # By default the driver uses the same interpreter as the workers.
        pyspark_driver_python = pyspark_python

    os.environ['JAVA_HOME'] = java_home
    os.environ['SPARK_HOME'] = spark_home
    os.environ['PYSPARK_PYTHON'] = pyspark_python
    os.environ['PYSPARK_DRIVER_PYTHON'] = pyspark_driver_python
    print("Environment variables set successfully")

# Function to delete environment variables
def delete_environment_variables():
    """Remove the Java/Spark/PySpark variables from this process's environment.

    Variables that are not currently set are skipped without error.
    A confirmation message is printed once all four have been handled.
    """
    for variable in ('JAVA_HOME', 'SPARK_HOME', 'PYSPARK_PYTHON', 'PYSPARK_DRIVER_PYTHON'):
        if variable in os.environ:
            del os.environ[variable]
    print("Environment variables deleted successfully")

# Set environment variables now, before the Spark session is created in a later cell
set_environment_variables()


Environment variables set successfully


In [5]:
# Initialize Spark session with increased memory settings
try:
    spark = (
        SparkSession.builder
        .appName('NYC_taxi_analysis')
        .config("spark.driver.memory", "8g")
        .config("spark.executor.memory", "8g")
        .config("spark.memory.offHeap.enabled", "true")
        .config("spark.memory.offHeap.size", "8g")
        .getOrCreate()
    )
except Exception as e:
    # Only session start-up failures are reported with this message. The error
    # is re-raised because nothing below can run without a session.
    print(f"Error creating Spark Session: {e}")
    raise
print("Spark Session created successfully")

# Input folder holding the monthly Parquet files, and the combined output location.
folder_path = r'C:\Users\yazid\Desktop\spark_assigment\NYC'
output_path = r"C:\Users\yazid\Desktop\spark_assigment\combined_nyc_taxi_2021.parquet"
dataframes = []

# os.listdir returns entries in arbitrary order; sort for a stable read/log order.
for filename in sorted(os.listdir(folder_path)):
    if not filename.endswith('.parquet'):
        continue
    file_path = os.path.join(folder_path, filename)
    print(f"Reading {file_path}")
    try:
        df = spark.read.parquet(file_path)
        dataframes.append(df)
    except Exception as e:
        # Best effort: report an unreadable file and keep loading the others.
        print(f"Error reading {file_path}: {e}")

if dataframes:
    # Union by column name; columns missing from a file are filled with nulls.
    combined_df = dataframes[0]
    for df in dataframes[1:]:
        combined_df = combined_df.unionByName(df, allowMissingColumns=True)

    combined_df = combined_df.dropDuplicates()
    combined_df.show(5)
    combined_df.printSchema()
    print(f"Total number of rows after removing duplicates: {combined_df.count()}")

    # Overwrite any previous output: the default save mode raises when the path
    # already exists, which made this cell fail on every run after the first.
    combined_df.write.mode("overwrite").parquet(output_path)
else:
    print("No Parquet files found.")



Spark Session created successfully
Reading C:\Users\yazid\Desktop\spark_assigment\NYC\1.parquet
Reading C:\Users\yazid\Desktop\spark_assigment\NYC\10.parquet
Reading C:\Users\yazid\Desktop\spark_assigment\NYC\11.parquet
Reading C:\Users\yazid\Desktop\spark_assigment\NYC\12.parquet
Reading C:\Users\yazid\Desktop\spark_assigment\NYC\2.parquet
Reading C:\Users\yazid\Desktop\spark_assigment\NYC\3.parquet
Reading C:\Users\yazid\Desktop\spark_assigment\NYC\4.parquet
Reading C:\Users\yazid\Desktop\spark_assigment\NYC\5.parquet
Reading C:\Users\yazid\Desktop\spark_assigment\NYC\6.parquet
Reading C:\Users\yazid\Desktop\spark_assigment\NYC\7.parquet
Reading C:\Users\yazid\Desktop\spark_assigment\NYC\8.parquet
Reading C:\Users\yazid\Desktop\spark_assigment\NYC\9.parquet
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------

## Handling Missing Values

In [6]:
# Handle missing values
# calculate_nan_percentage prints one figure per column (see the output below).
# handle_missing_values returns the frame that replaces combined_df for every later cell,
# so re-running this cell applies the helper to its own previous result.
# NOTE(review): how missing values are treated is decided inside the helper
# (presumably defined in src/data_cleaning.py) - confirm there.
calculate_nan_percentage(combined_df)
combined_df = handle_missing_values(combined_df)


-RECORD 0-------------------------------------
 VendorID              | 0.0                  
 tpep_pickup_datetime  | 0.0                  
 tpep_dropoff_datetime | 0.0                  
 passenger_count       | 0.047847536337005184 
 trip_distance         | 0.0                  
 RatecodeID            | 0.047847536337005184 
 store_and_fwd_flag    | 0.047847536337005184 
 PULocationID          | 0.0                  
 DOLocationID          | 0.0                  
 payment_type          | 0.0                  
 fare_amount           | 0.0                  
 extra                 | 0.0                  
 mta_tax               | 0.0                  
 tip_amount            | 0.0                  
 tolls_amount          | 0.0                  
 improvement_surcharge | 0.0                  
 total_amount          | 0.0                  
 congestion_surcharge  | 0.047847536337005184 
 airport_fee           | 0.18254471188935859  



In [7]:
# Preview the first 5 rows after the missing-value handling step
combined_df.show(5)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2021-10-01 00:50:24|  2021-10-01 01:02:11|            1.0|         4.05|       1.0|                 N|          24|          50|           4|      -14.5| -0.5|   -0.5|       0.

## Calling Functions to Run the Analysis Jobs

In [8]:
# Run the trip analysis job; the output below lists average trip distance and
# average trip duration per pickup hour.
# NOTE(review): analyze_trip is presumably defined in src/trip_analysis.py (loaded via %run) - confirm.
analyze_trip(combined_df)


+----+------------------+------------------+
|hour|avg(trip_distance)|avg(trip_duration)|
+----+------------------+------------------+
|0   |5.697302468454763 |15.730267774016255|
|1   |5.339389276880548 |15.850162623066096|
|2   |5.281638347063696 |14.82795115797    |
|3   |8.981665757846745 |15.549170610945112|
|4   |45.895842964455724|16.76969686715944 |
|5   |43.51459739253192 |16.754208948062846|
|6   |28.825264044061754|16.470007003437665|
|7   |14.787098091862468|16.430668381762306|
|8   |12.428777984690617|15.92411902626665 |
|9   |8.48698598152477  |15.937159524944068|
|10  |6.890357112173898 |15.968707707679075|
|11  |7.591890247088315 |15.990086114978126|
|12  |5.918490297216475 |16.550210286584242|
|13  |6.369747729357942 |16.92855739941454 |
|14  |5.343977336571386 |18.008710551099977|
|15  |5.5549453596293095|18.798311988582775|
|16  |5.726637253255657 |18.979807123045916|
|17  |4.738831453140698 |17.867833479002556|
|18  |3.8434327806086697|16.217714356261908|
|19  |4.47

In [9]:
# Perform tip analysis; the output below shows average tip percentage per pickup
# location, two correlation figures, and average tip amount per hour.
# NOTE(review): analyze_tips is presumably defined in src/tip_analysis.py (loaded via %run) - confirm.
analyze_tips(combined_df)


+------------+-------------------+
|PULocationID|avg(tip_percentage)|
+------------+-------------------+
|252         |1989.087674627638  |
|1           |859.4629608102979  |
|130         |122.02896363647231 |
|62          |99.15102255958752  |
|176         |97.12963522296104  |
|265         |83.61822178057028  |
|251         |80.10595263395518  |
|214         |77.50651795756703  |
|243         |52.63534475327919  |
|92          |47.99232259803468  |
+------------+-------------------+
only showing top 10 rows

Correlation between PULocationID and avg(tip_percentage): 0.05990458014090846
Correlation between trip distance and tip amount: 0.0016487551576155291
+----+------------------+
|hour|avg(tip_amount)   |
+----+------------------+
|0   |2.583079471352376 |
|1   |2.4375389658809925|
|2   |2.3408283832060093|
|3   |2.3831183065413164|
|4   |2.6947065546774502|
|5   |2.878895230713873 |
|6   |2.4898286533567093|
|7   |2.295978216787362 |
|8   |2.250740438488405 |
|9   |2.17063126231812

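Certain pickup locations tend to tip more than others. The highest average tip percentages are observed at specific locations such as PULocationID 252 and 1, suggesting significant variation in tipping behavior by pickup location. Note, however, that averages far above 100% (1989% and 859% for these two locations) are probably driven by a few outlier trips, so these figures should be interpreted with caution.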

Applying the Pearson correlation test: the correlation coefficient between trip distance and tip amount is approximately 0.0016. This indicates a very weak linear correlation, implying that the distance of the trip does not significantly influence the tip amount. There is also a low correlation between PULocationID and avg(tip_percentage): approximately 0.060.

In [10]:
# Perform fare analysis; the output below starts with the average fare amount
# per (pickup location, dropoff location) pair.
# NOTE(review): analyze_fares is presumably defined in src/fare_analysis.py (loaded via %run) - confirm.
analyze_fares(combined_df)


+------------+------------+------------------+
|PULocationID|DOLocationID|avg(fare_amount)  |
+------------+------------+------------------+
|154         |28          |1164.0            |
|234         |189         |843.4665424430642 |
|1           |247         |420.0             |
|83          |136         |378.5             |
|5           |74          |306.0             |
|54          |265         |275.5             |
|29          |264         |213.75227272727273|
|2           |265         |200.25            |
|6           |265         |192.25            |
|123         |265         |177.35            |
|235         |115         |170.0             |
|253         |208         |160.0             |
|221         |265         |160.0             |
|112         |109         |155.0             |
|204         |265         |152.375           |
|44          |138         |151.5             |
|118         |265         |151.25            |
|55          |1           |150.0             |
|10          

* The average fare between various pickup and dropoff locations varies significantly, with some routes, like from 192 to 44, having notably high fares.
* Average fares for different passenger counts show little variation, though larger groups tend to have higher fares.
* The correlation between fare amount and trip distance is very weak (0.000873), indicating other factors like traffic or fixed pricing zones play a significant role in fare determination.
* Further analysis is recommended for high-fare routes and additional factors affecting fares.

In [11]:
# Perform traffic analysis; the output below shows per-trip duration (hours) with
# average speed (mph), followed by average speed per pickup/dropoff pair.
# NOTE(review): analyze_traffic is presumably defined in src/traffic_analysis.py (loaded via %run) - confirm.
analyze_traffic(combined_df)

+-------------------+------------------+
|trip_duration_hours|     avg_speed_mph|
+-------------------+------------------+
| 0.1963888888888889| 20.62234794908062|
|              0.075|19.333333333333332|
|0.22527777777777777|27.166461159062887|
|0.05694444444444444|23.356097560975613|
|0.12361111111111112|11.892134831460673|
+-------------------+------------------+
only showing top 5 rows

+------------+------------+------------------+
|PULocationID|DOLocationID|avg_speed         |
+------------+------------+------------------+
|53          |70          |363264.56557343114|
|131         |185         |273688.16         |
|206         |206         |200809.50217849136|
|26          |56          |185347.1707317073 |
|128         |205         |161908.14193548384|
|159         |228         |143596.55666307133|
|127         |182         |136132.36834431795|
|240         |18          |127313.17426499365|
|212         |62          |124956.90172891079|
|59          |18          |112381.24201331

In [16]:
# Preview the first 5 rows again before the demand-prediction step.
# NOTE(review): this cell's execution count (16) is out of sequence with its
# neighbours (11 and 13), i.e. it was executed out of order - re-run the notebook top to bottom.
combined_df.show(5)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2021-10-01 00:50:24|  2021-10-01 01:02:11|            1.0|         4.05|       1.0|                 N|          24|          50|           4|      -14.5| -0.5|   -0.5|       0.

In [13]:
# Perform demand prediction
# Disabled on purpose: the full-dataset call is kept for reference only; the
# model is run on a 1% sample in the cells below instead.
#predictions = demand_prediction(combined_df)


We did not run the model on the entire dataset (30M+ rows) because it takes too long: the run had not finished after more than 10 minutes. We therefore decided to use sampling.
We took a 1% sample of the dataset and ran the model on it.

In [14]:
# Take a 1% sample of the data for demand prediction.
# A fixed seed is passed so that re-runs draw the sample deterministically
# instead of getting a different random subset (and different metrics) each time.
sampled_df = combined_df.sample(fraction=0.01, seed=42)


In [15]:
# Run demand prediction on the 1% sample; the output below reports R2 and RMSE,
# and the cell's result is a DataFrame with features, label and prediction columns.
# NOTE(review): demand_prediction is presumably defined in src/demand_prediction.py (loaded via %run) - confirm.
demand_prediction(sampled_df)

R2: 0.38202320428784275, RMSE: 15.816717750828882
+---------------+-----+------------------+
|       features|label|        prediction|
+---------------+-----+------------------+
|[23.0,4.0,12.0]|    1| 77.97679508046733|
|  [0.0,5.0,1.0]|    1|13.925180822234791|
|  [0.0,6.0,1.0]|    1|15.187972547831363|
|  [0.0,6.0,1.0]|    2|15.187972547831363|
|  [0.0,6.0,1.0]|    3|15.187972547831363|
|  [0.0,6.0,1.0]|    4|15.187972547831363|
|  [0.0,6.0,1.0]|    5|15.187972547831363|
|  [0.0,6.0,1.0]|    6|15.187972547831363|
|  [0.0,6.0,1.0]|    7|15.187972547831363|
|  [0.0,6.0,1.0]|    8|15.187972547831363|
+---------------+-----+------------------+
only showing top 10 rows



DataFrame[features: vector, label: bigint, prediction: double]