## Data Loading and Initial Exploration

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark=SparkSession.builder.appName("nyc_taxi_data_2014").getOrCreate()
df=spark.read.csv(r"C:\Users\shuba\Downloads\nyc_taxi_data_2014.csv\nyc_taxi_data_2014.csv",header=True,inferSchema=True)

In [3]:
df_partitioned = df.repartition(4)  # Adjust number of partitions based on data size

# Display the schema
df_partitioned.printSchema()

root
 |-- vendor_id: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- rate_code: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- surcharge: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- total_amount: double (nullable = true)



Brief summary of each column in your dataset:

1.vendor_id: Identifies the taxi vendor (company or fleet).
2.pickup_datetime: Timestamp when the taxi trip started.
3.dropoff_datetime: Timestamp when the taxi trip ended.
4.passenger_count: The number of passengers in the taxi.
5.trip_distance: The total distance of the trip in miles.
6.pickup_longitude: Longitude coordinate where the trip started.
7.pickup_latitude: Latitude coordinate where the trip started.
8.rate_code: Code indicating the rate structure used for the fare.
9.store_and_fwd_flag: Indicates if trip data was stored and forwarded.
10.dropoff_longitude: Longitude coordinate where the trip ended.
11.dropoff_latitude: Latitude coordinate where the trip ended.
12.payment_type: The method used to pay for the trip (e.g., cash, credit card).
13.fare_amount: Base fare for the trip before additional charges.
14.surcharge: Extra charges added to the fare (e.g., tolls, taxes).
15.mta_tax: The MTA tax applied to the trip.
16.tip_amount: The tip given for the trip.
17.tolls_amount: The tolls charged during the trip.
18.total_amount: The total cost of the trip including all charges (fare, surcharges, tips, tolls).

In [4]:
df_partitioned.show(10)

+---------+-------------------+-------------------+---------------+-------------+----------------+---------------+---------+------------------+-----------------+----------------+------------+-----------+---------+-------+----------+------------+------------+
|vendor_id|    pickup_datetime|   dropoff_datetime|passenger_count|trip_distance|pickup_longitude|pickup_latitude|rate_code|store_and_fwd_flag|dropoff_longitude|dropoff_latitude|payment_type|fare_amount|surcharge|mta_tax|tip_amount|tolls_amount|total_amount|
+---------+-------------------+-------------------+---------------+-------------+----------------+---------------+---------+------------------+-----------------+----------------+------------+-----------+---------+-------+----------+------------+------------+
|      CMT|2014-01-06 13:08:54|2014-01-06 13:11:36|              1|          0.4|      -73.981085|       40.73781|        1|                 N|       -73.985924|       40.736733|         CSH|        4.0|      0.0|    0.5|  

In [5]:
total_rows=df_partitioned.count()
print(f"Total Rows : {total_rows}")

Total Rows : 14999999


## Data Cleaning and Transformation

In [6]:
from pyspark.sql.functions import col, sum

# Count missing values for each column and collect as dictionary
missing_values = df.select(
    [sum((col(column).isNull()).cast("int")).alias(column) for column in df_partitioned.columns]
).collect()[0].asDict()

# Print the missing values count for each column
print("Missing Values Count:")
for column, count in missing_values.items():
    print(f"{column}: {count}")



Missing Values Count:
vendor_id: 0
pickup_datetime: 0
dropoff_datetime: 0
passenger_count: 0
trip_distance: 0
pickup_longitude: 0
pickup_latitude: 0
rate_code: 0
store_and_fwd_flag: 7636077
dropoff_longitude: 145
dropoff_latitude: 145
payment_type: 0
fare_amount: 0
surcharge: 0
mta_tax: 0
tip_amount: 0
tolls_amount: 0
total_amount: 0


In [7]:
# Remove rows with any missing values (NULL or NaN) in any column
df_cleaned = df_partitioned.dropna()

In [8]:
# Count missing values for each column and collect as dictionary
missing_values = df_cleaned.select(
    [sum((col(column).isNull()).cast("int")).alias(column) for column in df_partitioned.columns]
).collect()[0].asDict()

# Print the missing values count for each column
print("Missing Values Count:")
for column, count in missing_values.items():
    print(f"{column}: {count}")



Missing Values Count:
vendor_id: 0
pickup_datetime: 0
dropoff_datetime: 0
passenger_count: 0
trip_distance: 0
pickup_longitude: 0
pickup_latitude: 0
rate_code: 0
store_and_fwd_flag: 0
dropoff_longitude: 0
dropoff_latitude: 0
payment_type: 0
fare_amount: 0
surcharge: 0
mta_tax: 0
tip_amount: 0
tolls_amount: 0
total_amount: 0


In [9]:
# Get unique values in the 'store_and_fwd_flag' column
df_cleaned.select('store_and_fwd_flag').distinct().show()

# Get unique values in the 'payment_type' column
df_cleaned.select('payment_type').distinct().show()

+------------------+
|store_and_fwd_flag|
+------------------+
|                 Y|
|                 N|
+------------------+

+------------+
|payment_type|
+------------+
|         CSH|
|         DIS|
|         CRD|
|         NOC|
+------------+



In [10]:
from pyspark.sql.functions import when

# Replace 'Y' with 'yes' and 'N' with 'no' in the 'store_and_fwd_flag' column
df_cleaned = df_cleaned.withColumn(
    'store_and_fwd_flag', 
    when(df_cleaned['store_and_fwd_flag'] == 'Y', 'yes')
    .otherwise(when(df_cleaned['store_and_fwd_flag'] == 'N', 'no'))
)

# Show the updated column values
df_cleaned.select('store_and_fwd_flag').distinct().show()


+------------------+
|store_and_fwd_flag|
+------------------+
|                no|
|               yes|
+------------------+



In [11]:
from pyspark.sql import functions as F

# Replace values in the 'payment_type' column
df_cleaned = df_cleaned.withColumn(
    "payment_type",
    F.when(df_cleaned["payment_type"] == "CSH", "Cash")
    .when(df_cleaned["payment_type"] == "DIS", "Discount")
    .when(df_cleaned["payment_type"] == "CRD", "Credit")
    .when(df_cleaned["payment_type"] == "NOC", "No Charge")
    .otherwise(df_cleaned["payment_type"])  # Keep original value if not one of the above
)

# Show the updated values to verify
df_cleaned.select("payment_type").distinct().show()


+------------+
|payment_type|
+------------+
|   No Charge|
|      Credit|
|    Discount|
|        Cash|
+------------+



In [12]:
# Count duplicate records
duplicate_count = df_cleaned.count() - df_cleaned.distinct().count()

In [13]:
if duplicate_count > 0:
    # Remove duplicate records
    df_cleaned = df_cleaned.dropDuplicates()
    print(f"Duplicate records handled: {duplicate_count} duplicates removed.")
else:
    df_cleaned = df_cleaned
    print("No duplicate records found. No rows removed.")

No duplicate records found. No rows removed.


In [14]:
#  Filtering Out Invalid Rows
df_cleaned = df_cleaned.filter(df_cleaned['trip_distance'] > 0)
df_cleaned = df_cleaned.filter(df_cleaned['fare_amount'] > 0)

In [15]:
# Step 7: Split the Data into Training and Testing Sets
train_set, test_set = df_cleaned.randomSplit([0.7, 0.3], seed=101)

In [16]:
train_set.show()

+---------+-------------------+-------------------+---------------+-------------+----------------+---------------+---------+------------------+-----------------+----------------+------------+-----------+---------+-------+----------+------------+------------+
|vendor_id|    pickup_datetime|   dropoff_datetime|passenger_count|trip_distance|pickup_longitude|pickup_latitude|rate_code|store_and_fwd_flag|dropoff_longitude|dropoff_latitude|payment_type|fare_amount|surcharge|mta_tax|tip_amount|tolls_amount|total_amount|
+---------+-------------------+-------------------+---------------+-------------+----------------+---------------+---------+------------------+-----------------+----------------+------------+-----------+---------+-------+----------+------------+------------+
|      CMT|2014-01-01 00:00:06|2014-01-01 00:10:11|              1|          6.0|      -73.870885|      40.773728|        1|                no|       -73.829123|       40.709735|        Cash|       18.0|      0.5|    0.5|  

In [17]:
#Normalization

from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler

# Assemble the features
assembler = VectorAssembler(inputCols=["trip_distance", "fare_amount", "pickup_longitude", "pickup_latitude",
                                       "dropoff_longitude", "dropoff_latitude", "surcharge", "mta_tax", 
                                       "tip_amount", "tolls_amount", "total_amount"], outputCol="features")
df_assembled = assembler.transform(df_cleaned)

# Normalize using Min-Max Scaler
scaler = MinMaxScaler(inputCol="features", outputCol="scaled_features")
scaler_model = scaler.fit(df_assembled)
df_scaled = scaler_model.transform(df_assembled)

## Data Analysis Using Spark SQL

In [18]:
df_cleaned.createOrReplaceTempView("taxi_data")

In [19]:
# 3.1 Aggregation: Summary statistics (mean, median, standard deviation)
summary = spark.sql("""
    SELECT 
        AVG(fare_amount) AS avg_fare,
        MEDIAN(fare_amount) AS median_fare,
        STDDEV(fare_amount) AS stddev_fare
    FROM taxi_data
""")
summary.show()


+-----------------+-----------+-----------------+
|         avg_fare|median_fare|      stddev_fare|
+-----------------+-----------+-----------------+
|11.84016095530172|        9.0|9.532408714536784|
+-----------------+-----------+-----------------+



The table provides summary statistics for the fare_amount column in your dataset:

Average Fare (avg_fare): The mean fare amount is approximately 11.84. This is the average value of all the fares in your dataset.

Median Fare (median_fare): The median fare amount is 9.00. This means that half of the fares are below $9.00, and half are above. It's a measure of central tendency that is less sensitive to outliers compared to the average.

Standard Deviation of Fare (stddev_fare): The standard deviation is approximately 9.53. This indicates the spread or variability in the fare amounts. A higher value suggests that the fares vary widely from the mean.

Interpretation:
The average fare is higher than the median fare, suggesting that there may be some higher fare outliers pulling the average up.
The high standard deviation indicates that the fare amounts vary significantly, meaning there are both low and high fare trips in the data.

In [20]:
# 3.2 Grouping and filtering: Group by 'payment_type' and calculate the total fare amount per payment type
payment_grouped = spark.sql("""
    SELECT payment_type, SUM(fare_amount) AS total_fare
    FROM taxi_data
    GROUP BY payment_type
    ORDER BY total_fare DESC
""")
payment_grouped.show()


+------------+--------------------+
|payment_type|          total_fare|
+------------+--------------------+
|      Credit| 5.354956746000001E7|
|        Cash|3.2626811770000003E7|
|   No Charge|           302186.05|
|    Discount|           113966.42|
+------------+--------------------+



This table shows the total fare amount (total_fare) for each payment_type in your dataset:

Credit: The total fare amount for credit card payments is approximately 53.55 million. This suggests that the majority of transactions are paid using credit cards.

Cash: The total fare amount for cash payments is about 32.63 million. While cash payments are still significant, they are less than credit card payments.

No Charge: The total fare amount for no-charge transactions is 302,186.05. These transactions likely represent rides that were not charged, such as free rides or zero-fare promotions.

Discount: The total fare amount for discounted transactions is 113,966.42. This is the smallest amount, indicating fewer discounted rides compared to the other payment types.

Interpretation:
Credit is the dominant payment method, with the highest total fare amount.
Cash also makes up a significant portion of the total fare, but much less than credit.
No Charge and Discount payments are relatively minor, which may indicate special cases, such as promotional rides or errors.

In [21]:
# 3.3 Time-based analysis: Group by day of the week and average fare amount
time_based_analysis = spark.sql("""
    SELECT DAYOFWEEK(pickup_datetime) AS day_of_week, AVG(fare_amount) AS avg_fare
    FROM taxi_data
    GROUP BY day_of_week
    ORDER BY day_of_week
""")
time_based_analysis.show()


+-----------+------------------+
|day_of_week|          avg_fare|
+-----------+------------------+
|          1|11.678300983485261|
|          2|11.981475070327736|
|          3|11.884797757753606|
|          4|12.040015236101613|
|          5|12.043679669072347|
|          6|12.016992251563929|
|          7|11.162100062868168|
+-----------+------------------+



This table shows the average fare (avg_fare) for each day of the week (day_of_week) in your dataset:

Day 1 (Sunday): The average fare is 11.68, which is the second-lowest among the days of the week.
Day 2 (Monday): The average fare is 11.98, the highest on Mondays.
Day 3 (Tuesday): The average fare is 11.88, showing a slight decrease compared to Monday.
Day 4 (Wednesday): The average fare is 12.04, the highest value recorded, indicating slightly higher fares midweek.
Day 5 (Thursday): The average fare is 12.04, tied with Wednesday for the highest fare.
Day 6 (Friday): The average fare is 12.02, remaining high but slightly lower than Thursday.
Day 7 (Saturday): The average fare is 11.16, the lowest among the days of the week, which might indicate more short trips or discounted rides during the weekend.
Interpretation:
Midweek (Wednesday and Thursday) shows the highest average fares, suggesting longer or more expensive trips.
Sunday and Saturday have the lowest average fares, with Saturday being the least expensive day, possibly reflecting shorter or more routine trips.
Monday also shows relatively high average fares, but not as high as the midweek days.

## Machine Learning Model (Regression)

In [22]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml import Pipeline

In [23]:
# StringIndexer for payment_type
payment_type_encoder = StringIndexer(
    inputCol="payment_type", outputCol="encoded_payment_type"
)

# StringIndexer for 'store_and_fwd_flag'
store_and_fwd_flag_indexer = StringIndexer(
    inputCol="store_and_fwd_flag", outputCol="indexed_store_and_fwd_flag"
)

# StringIndexer for 'vendor_id'
vendor_id_indexer = StringIndexer(
    inputCol="vendor_id", outputCol="indexed_vendor_id"
)


In [24]:
# Combine features into a single vector using VectorAssembler
feature_columns = [ 'passenger_count', 'trip_distance', 'pickup_longitude', 
    'pickup_latitude', 'rate_code','dropoff_longitude', 'dropoff_latitude', 'surcharge', 'mta_tax', 
    'tip_amount', 'tolls_amount', 'total_amount']
feature_assembler = VectorAssembler(
    inputCols=feature_columns, outputCol="feature_vector1"
)

In [25]:
train_set_assembled = assembler.transform(train_set)

In [26]:
# Initialize the Decision Tree Regressor model
dt_regressor = DecisionTreeRegressor(featuresCol="feature_vector1", labelCol="fare_amount")


In [27]:
pipeline_dt = Pipeline(stages=[feature_assembler, dt_regressor])
model_dt = pipeline_dt.fit(train_set)
predictions_dt = model_dt.transform(test_set)


In [29]:
# Show some predictions
predictions_dt.select("fare_amount", "prediction").show(10)


+-----------+------------------+
|fare_amount|        prediction|
+-----------+------------------+
|       25.0|22.978673215898823|
|        5.0|  4.80000631760609|
|        3.5| 3.850213034118448|
|        7.0| 6.423970538432056|
|        7.5| 8.572593510026767|
|       19.0|19.203894432932486|
|        6.5|7.3112291635237465|
|       17.0|15.905049095233503|
|       10.5|11.259448323865549|
|       20.5|18.896964048395727|
+-----------+------------------+
only showing top 10 rows



In [31]:
from pyspark.ml.evaluation import RegressionEvaluator

# Initialize the evaluator with the predicted and actual values for MSE
mse_evaluator = RegressionEvaluator(labelCol="fare_amount", predictionCol="prediction", metricName="mse")

# Compute MSE
mse = mse_evaluator.evaluate(predictions_dt)
print(f"Mean Squared Error (MSE): {mse}")


Mean Squared Error (MSE): 4.364309227422931


The Mean Squared Error (MSE) of 4.36 indicates the average squared difference between predicted and actual values. A lower MSE means better model performance. In this case, the MSE suggests reasonable predictions, but there is room for improvement. To improve accuracy, consider tuning the model or trying different approaches.

## Model Tuning and Evaluation

In [34]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline

# Step 1: Define a parameter grid for hyperparameter tuning
paramGrid = (ParamGridBuilder()
    .addGrid(dt_regressor.maxDepth, [5, 10, 15])
    .addGrid(dt_regressor.maxBins, [32, 64, 128])
    .addGrid(dt_regressor.minInstancesPerNode, [1, 5, 10])
    .build())



In [35]:
# Step 2: Initialize the evaluator
evaluator = RegressionEvaluator(labelCol="fare_amount", predictionCol="prediction", metricName="rmse")



In [36]:
# Step 3: Set up Cross-Validation
crossval = CrossValidator(estimator=pipeline_dt,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5)  # 5-fold cross-validation



In [None]:
# Step 4: Fit the model using cross-validation
cv_model = crossval.fit(train_set)
# Step 5: Get the best model from cross-validation
best_model = cv_model.bestModel

# Step 6: Evaluate the performance of the best model on the test set
predictions_dt = best_model.transform(test_set)

# Step 7: Compute Evaluation Metrics
rmse = evaluator.evaluate(predictions_dt)
print(f"Root Mean Squared Error (RMSE) on test data: {rmse}")

# Optionally, you can compute additional evaluation metrics such as MSE or R2
mse_evaluator = RegressionEvaluator(labelCol="fare_amount", predictionCol="prediction", metricName="mse")
mse = mse_evaluator.evaluate(predictions_dt)
print(f"Mean Squared Error (MSE) on test data: {mse}")

