# Team 15
# Jayveersinh Raj, Mark Zakharov
# BS20-DS-01
# Big data project spark ML

In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=30fed2f4fcc4b8d9f5242415a7fede19bb7cd315052cafaaa79216a4c1f6c948
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


# Reading the data

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, hour, minute, dayofweek, month
from pyspark.sql.types import TimestampType

# Start (or reuse) the Spark session used throughout the notebook
spark = (
    SparkSession.builder
    .appName("NYC Taxi Trip Duration")
    .getOrCreate()
)

# Load the train and test CSV files; the first row holds the column names
# and column types are inferred from the data
train = spark.read.csv(path="new_train.csv", header=True, inferSchema=True)
test = spark.read.csv(path="new_test.csv", header=True, inferSchema=True)

# Having a look at the data

In [None]:
# Preview the training rows as loaded from new_train.csv
train.show()

+---------+---------+-------------------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+-------------+
|       id|vendor_id|    pickup_datetime|   dropoff_datetime|passenger_count|  pickup_longitude|   pickup_latitude| dropoff_longitude|  dropoff_latitude|store_and_fwd_flag|trip_duration|
+---------+---------+-------------------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+-------------+
|id2875421|        2|2016-03-14 17:24:55|2016-03-14 17:32:30|              1| -73.9821548461914| 40.76793670654297|-73.96463012695312|40.765602111816406|                 N|          455|
|id2377394|        1|2016-06-12 00:43:35|2016-06-12 00:54:38|              1|-73.98041534423828|40.738563537597656|-73.99948120117188| 40.73115158081055|                 N|          663|
|id3858529|        2|2016-01-19 11:35:24|2016-01-19 12:10:48|    

In [None]:
# Preview the test rows as loaded from new_test.csv
test.show()

+---------+---------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+
|       id|vendor_id|    pickup_datetime|passenger_count|  pickup_longitude|   pickup_latitude| dropoff_longitude|  dropoff_latitude|store_and_fwd_flag|
+---------+---------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+
|id3004672|        1|2016-06-30 23:59:58|              1|-73.98812866210938| 40.73202896118164|-73.99017333984375| 40.75667953491211|                 N|
|id3505355|        1|2016-06-30 23:59:53|              1|-73.96420288085938| 40.67999267578125|-73.95980834960938| 40.65540313720703|                 N|
|id1217141|        1|2016-06-30 23:59:47|              1| -73.9974365234375| 40.73758316040039|-73.98616027832031|40.729522705078125|                 N|
|id2150126|        2|2016-06-30 23:59:41|              1|-73.95606994628906| 40.77

# Value counts

## Train

In [None]:
from pyspark.sql.functions import count

# Number of training trips per vendor, ordered by vendor id
vendor_id_counts = (
    train.groupBy("vendor_id")
    .agg(count("*").alias("count"))
    .orderBy("vendor_id")
)
vendor_id_counts.show()

+---------+------+
|vendor_id| count|
+---------+------+
|        1|678342|
|        2|780302|
+---------+------+



## Test

In [None]:
from pyspark.sql.functions import count

# Number of test trips per vendor, ordered by vendor id
vendor_id_counts = (
    test.groupBy("vendor_id")
    .agg(count("*").alias("count"))
    .orderBy("vendor_id")
)
vendor_id_counts.show()

+---------+------+
|vendor_id| count|
+---------+------+
|        1|290760|
|        2|334374|
+---------+------+



# Sampling the data and extracting date-time features

In [None]:
def add_pickup_time_features(df):
    """Return `df` with time features derived from its `pickup_datetime` column.

    `pickup_datetime` is cast to a timestamp and the following integer columns
    are added: `hour`, `minute`, `minute_oftheday` (hour * 60 + minute),
    `day_week` (Spark's `dayofweek`) and `month`.

    The same function is applied to train and test so both frames are
    guaranteed to receive identical feature definitions.
    """
    return (
        df.withColumn("pickup_datetime", col("pickup_datetime").cast(TimestampType()))
        .withColumn("hour", hour("pickup_datetime"))
        .withColumn("minute", minute("pickup_datetime"))
        .withColumn("minute_oftheday", (col("hour") * 60 + col("minute")))
        .withColumn("day_week", dayofweek("pickup_datetime"))
        .withColumn("month", month("pickup_datetime"))
    )


# Work on a 15% random sample of each frame (fixed seed for reproducibility).
# NOTE: `train`/`test` are overwritten, so re-running this cell without
# re-running the loading cell samples the already-sampled data again.
train = train.sample(fraction=0.15, seed=1)
test = test.sample(fraction=0.15, seed=1)

# Convert pickup_datetime to timestamp and create the time feature columns
train = add_pickup_time_features(train)
test = add_pickup_time_features(test)

## Having a look

In [None]:
# Preview the sampled training rows with the new time feature columns
train.show()

+---------+---------+-------------------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+-------------+----+------+---------------+--------+-----+
|       id|vendor_id|    pickup_datetime|   dropoff_datetime|passenger_count|  pickup_longitude|   pickup_latitude| dropoff_longitude|  dropoff_latitude|store_and_fwd_flag|trip_duration|hour|minute|minute_oftheday|day_week|month|
+---------+---------+-------------------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+-------------+----+------+---------------+--------+-----+
|id3858529|        2|2016-01-19 11:35:24|2016-01-19 12:10:48|              1| -73.9790267944336|40.763938903808594|-74.00533294677734|40.710086822509766|                 N|         2124|  11|    35|            695|       3|    1|
|id3504673|        2|2016-04-06 19:32:31|2016-04-06 19:39:40|              1|-74

In [None]:
# Preview the sampled test rows with the new time feature columns
test.show()

+---------+---------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+----+------+---------------+--------+-----+
|       id|vendor_id|    pickup_datetime|passenger_count|  pickup_longitude|   pickup_latitude| dropoff_longitude|  dropoff_latitude|store_and_fwd_flag|hour|minute|minute_oftheday|day_week|month|
+---------+---------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+----+------+---------------+--------+-----+
|id1217141|        1|2016-06-30 23:59:47|              1| -73.9974365234375| 40.73758316040039|-73.98616027832031|40.729522705078125|                 N|  23|    59|           1439|       5|    6|
|id2150126|        2|2016-06-30 23:59:41|              1|-73.95606994628906| 40.77190017700195| -73.9864273071289|       40.73046875|                 N|  23|    59|           1439|       5|    6|
|id0898117|        1

## The original date-time columns are no longer needed

In [None]:
# Remove the raw timestamp columns now that the time features exist.
# Only the train frame has dropoff_datetime; the test frame just has pickup_datetime.
train, test = (
    train.drop("pickup_datetime", "dropoff_datetime"),
    test.drop("pickup_datetime"),
)

In [None]:
# Preview the training rows after the raw timestamp columns were dropped
train.show()

+---------+---------+---------------+------------------+------------------+------------------+------------------+------------------+-------------+----+------+---------------+--------+-----+
|       id|vendor_id|passenger_count|  pickup_longitude|   pickup_latitude| dropoff_longitude|  dropoff_latitude|store_and_fwd_flag|trip_duration|hour|minute|minute_oftheday|day_week|month|
+---------+---------+---------------+------------------+------------------+------------------+------------------+------------------+-------------+----+------+---------------+--------+-----+
|id3858529|        2|              1| -73.9790267944336|40.763938903808594|-74.00533294677734|40.710086822509766|                 N|         2124|  11|    35|            695|       3|    1|
|id3504673|        2|              1|-74.01004028320312|   40.719970703125|-74.01226806640625| 40.70671844482422|                 N|          429|  19|    32|           1172|       4|    4|
|id1324603|        2|              1|-73.969276428

In [None]:
# Preview the test rows after the raw timestamp column was dropped
test.show()

+---------+---------+---------------+------------------+------------------+------------------+------------------+------------------+----+------+---------------+--------+-----+
|       id|vendor_id|passenger_count|  pickup_longitude|   pickup_latitude| dropoff_longitude|  dropoff_latitude|store_and_fwd_flag|hour|minute|minute_oftheday|day_week|month|
+---------+---------+---------------+------------------+------------------+------------------+------------------+------------------+----+------+---------------+--------+-----+
|id1217141|        1|              1| -73.9974365234375| 40.73758316040039|-73.98616027832031|40.729522705078125|                 N|  23|    59|           1439|       5|    6|
|id2150126|        2|              1|-73.95606994628906| 40.77190017700195| -73.9864273071289|       40.73046875|                 N|  23|    59|           1439|       5|    6|
|id0898117|        1|              2|-74.01271057128906|  40.7015266418457|-73.98648071289062| 40.71950912475586|       

# Feature engineering

In [None]:
from pyspark.sql.functions import log2, mean as _mean, stddev as _stddev, col
from pyspark.sql.window import Window

# Drop the id columns (identifiers, not model inputs)
train = train.drop("id")
test = test.drop("id")

# Filter 1: log2(trip_duration + 1); used only to detect outliers
log_trip_duration = log2(col("trip_duration") + 1)
trip_duration_fil1 = train.withColumn("trip_duration", log_trip_duration)

# Calculate mean and standard deviation of the log-transformed duration
stats = trip_duration_fil1.select(
    _mean(col("trip_duration")).alias("mean"),
    _stddev(col("trip_duration")).alias("stddev"),
).collect()
mean = stats[0]["mean"]
sd = stats[0]["stddev"]

# Calculate lower and upper bounds (mean +/- 3 standard deviations, log scale)
lower_bound = mean - (3 * sd)
upper_bound = mean + (3 * sd)

# Filter 2: Remove outliers (log-scale view, kept for inspection)
trip_duration_fil2 = trip_duration_fil1.filter(
    (col("trip_duration") > lower_bound) & (col("trip_duration") < upper_bound)
)

# Apply the outlier filter to the training frame itself. Previously the filtered
# frame was computed but never used, so the outliers stayed in `train`.
# trip_duration is kept in its original unit; only the row selection uses the
# log-scale bounds.
train = train.filter((log_trip_duration > lower_bound) & (log_trip_duration < upper_bound))

## Computing the trip distance from the pickup and drop-off coordinates

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
from geopy import distance

# Define a UDF to calculate the distance
def get_distance(pickup_latitude, pickup_longitude, dropoff_latitude, dropoff_longitude):
    """Return the geodesic distance in kilometres between pickup and drop-off.

    Args:
        pickup_latitude: latitude of the pickup point, or None.
        pickup_longitude: longitude of the pickup point, or None.
        dropoff_latitude: latitude of the drop-off point, or None.
        dropoff_longitude: longitude of the drop-off point, or None.

    Returns:
        The distance in kilometres as a float, or None when any coordinate is
        missing, so a null input yields a null distance instead of an exception
        inside the Spark UDF.
    """
    coordinates = (pickup_latitude, pickup_longitude, dropoff_latitude, dropoff_longitude)
    if any(value is None for value in coordinates):
        return None

    pick = (pickup_latitude, pickup_longitude)
    drop = (dropoff_latitude, dropoff_longitude)
    return float(distance.geodesic(pick, drop).km)

# Wrap the Python function as a Spark UDF that returns a float column
get_distance_udf = udf(get_distance, FloatType())

# Add a "distance" column to both the train and test DataFrames
train, test = (
    frame.withColumn(
        "distance",
        get_distance_udf(
            "pickup_latitude",
            "pickup_longitude",
            "dropoff_latitude",
            "dropoff_longitude",
        ),
    )
    for frame in (train, test)
)

## Making all the columns numerical and separating the label

In [None]:
from pyspark.sql.functions import when

# Encode store_and_fwd_flag numerically in both frames: "N" -> 0, anything else -> 1
train, test = (
    frame.withColumn(
        "store_and_fwd_flag",
        when(col("store_and_fwd_flag") == "N", 0).otherwise(1),
    )
    for frame in (train, test)
)

# Show how many training rows fall under each encoded flag value
store_and_fwd_flag_counts_updated = (
    train.groupBy("store_and_fwd_flag")
    .agg(count("*").alias("count"))
    .orderBy("store_and_fwd_flag")
)
store_and_fwd_flag_counts_updated.show()

# minute_oftheday is dropped from both frames before the feature list is built
train, test = train.drop("minute_oftheday"), test.drop("minute_oftheday")

# Target column, and every remaining training column as a model feature
label = "trip_duration"
features = [name for name in train.columns if name != label]

+------------------+------+
|store_and_fwd_flag| count|
+------------------+------+
|                 0|217374|
|                 1|  1197|
+------------------+------+



# Saving the processed training data

In [None]:
# Convert the processed Spark training DataFrame to a pandas DataFrame.
# NOTE: toPandas() collects every row onto the driver, so the frame must fit in driver memory.
pandas_df = train.toPandas()

# pandas_df is written to a CSV file in the next cell


In [None]:
# Write the processed training data to a CSV file without the pandas index column
pandas_df.to_csv('/content/taxi_data.csv', index=False)

In [None]:
# Preview the processed training rows (numeric flag, distance column added)
train.show()

+---------+---------------+------------------+------------------+------------------+------------------+------------------+-------------+----+------+--------+-----+---------+
|vendor_id|passenger_count|  pickup_longitude|   pickup_latitude| dropoff_longitude|  dropoff_latitude|store_and_fwd_flag|trip_duration|hour|minute|day_week|month| distance|
+---------+---------------+------------------+------------------+------------------+------------------+------------------+-------------+----+------+--------+-----+---------+
|        2|              1| -73.9790267944336|40.763938903808594|-74.00533294677734|40.710086822509766|                 0|         2124|  11|    35|       3|    1|6.3796873|
|        2|              1|-74.01004028320312|   40.719970703125|-74.01226806640625| 40.70671844482422|                 0|          429|  19|    32|       4|    4|1.4836324|
|        2|              1|-73.96927642822266| 40.79777908325195|-73.92247009277344| 40.76055908203125|                 0|        

# Vector assembler for training and test

In [None]:
from pyspark.ml.feature import VectorAssembler

# Combine all feature columns into a single vector column named "features"
assembler = VectorAssembler(outputCol="features", inputCols=features)

# Assemble the training frame; the target column is exposed as "label",
# renaming it only when it does not already carry that name
train_with_features = (
    assembler.transform(train)
    if label == "label"
    else assembler.transform(train).withColumnRenamed(label, "label")
)

# Print the schema to check the "features" and "label" columns
train_with_features.printSchema()

root
 |-- vendor_id: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- store_and_fwd_flag: integer (nullable = false)
 |-- label: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- minute: integer (nullable = true)
 |-- day_week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- distance: float (nullable = true)
 |-- features: vector (nullable = true)



In [None]:
# Preview the assembled training rows, including the "label" and "features" columns
train_with_features.show()

+---------+---------------+------------------+------------------+------------------+------------------+------------------+-----+----+------+--------+-----+---------+--------------------+
|vendor_id|passenger_count|  pickup_longitude|   pickup_latitude| dropoff_longitude|  dropoff_latitude|store_and_fwd_flag|label|hour|minute|day_week|month| distance|            features|
+---------+---------------+------------------+------------------+------------------+------------------+------------------+-----+----+------+--------+-----+---------+--------------------+
|        2|              1| -73.9790267944336|40.763938903808594|-74.00533294677734|40.710086822509766|                 0| 2124|  11|    35|       3|    1|6.3796873|[2.0,1.0,-73.9790...|
|        2|              1|-74.01004028320312|   40.719970703125|-74.01226806640625| 40.70671844482422|                 0|  429|  19|    32|       4|    4|1.4836324|[2.0,1.0,-74.0100...|
|        2|              1|-73.96927642822266| 40.79777908325195|

In [None]:
# Assemble the test frame with the same assembler used for training,
# applying the same conditional rename of the target column
test_with_features = (
    assembler.transform(test)
    if label == "label"
    else assembler.transform(test).withColumnRenamed(label, "label")
)

In [None]:
# Preview the assembled test rows, including the "features" column
test_with_features.show()

+---------+---------------+------------------+------------------+------------------+------------------+------------------+----+------+--------+-----+----------+--------------------+
|vendor_id|passenger_count|  pickup_longitude|   pickup_latitude| dropoff_longitude|  dropoff_latitude|store_and_fwd_flag|hour|minute|day_week|month|  distance|            features|
+---------+---------------+------------------+------------------+------------------+------------------+------------------+----+------+--------+-----+----------+--------------------+
|        1|              1| -73.9974365234375| 40.73758316040039|-73.98616027832031|40.729522705078125|                 0|  23|    59|       5|    6| 1.3071119|[1.0,1.0,-73.9974...|
|        2|              1|-73.95606994628906| 40.77190017700195| -73.9864273071289|       40.73046875|                 0|  23|    59|       5|    6| 5.2669783|[2.0,1.0,-73.9560...|
|        1|              2|-74.01271057128906|  40.7015266418457|-73.98648071289062| 40.71

## The test set has no labels (i.e. no ground truth), so it cannot be used for evaluation

# Modeling

In [None]:
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import Pipeline
import time
import os

# Create a directory to save the models (no error if it already exists)
model_save_path = "best_models"
os.makedirs(model_save_path, exist_ok=True)

# Prepare the validation data: 90% train, 10% validation, seed 42
train_data, validation_data = train_with_features.randomSplit([0.9, 0.1], seed=42)

# Initialize the models to compare
models = [
    ("Linear Regression", LinearRegression(maxIter=20)),
    ("Decision Tree Regressor", DecisionTreeRegressor(maxDepth=8)),
    ("Gradient Boosting Regressor", GBTRegressor(maxIter=20)),
]

# Set up the evaluation metrics
evaluator_mae = RegressionEvaluator(metricName="mae")
evaluator_r2 = RegressionEvaluator(metricName="r2")

# Train and evaluate the models
results = []
for name, model in models:
    start_time = time.time()

    # Create a pipeline with the model as its only stage
    pipeline = Pipeline(stages=[model])

    # Train the model using 3-fold cross-validation. The parameter grid is
    # empty, so each model is fitted with the parameters given above.
    param_grid = ParamGridBuilder().build()
    cross_validator = CrossValidator(
        estimator=pipeline,
        estimatorParamMaps=param_grid,
        evaluator=evaluator_mae,
        numFolds=3
    )
    # Fit on the training split only. Fitting on train_with_features would
    # include the validation rows and leak them into the metrics below.
    cv_model = cross_validator.fit(train_data)

    # Make predictions on the held-out validation data
    predictions = cv_model.transform(validation_data)

    # Evaluate the model; duration covers fitting, prediction and evaluation
    mae = evaluator_mae.evaluate(predictions)
    r2 = evaluator_r2.evaluate(predictions)
    duration = time.time() - start_time

    results.append((name, mae, r2, duration))

    # Save the best model, overwriting any copy left by a previous run so the
    # cell can be re-executed without failing on an existing path
    best_model = cv_model.bestModel
    best_model.write().overwrite().save(os.path.join(model_save_path, name.replace(" ", "_")))

# Create a comparison table
comparison_table = spark.createDataFrame(results, ["Model", "MAE", "R2 Score", "Duration"])
comparison_table.show(truncate=False)

+---------------------------+------------------+---------------------+-----------------+
|Model                      |MAE               |R2 Score             |Duration         |
+---------------------------+------------------+---------------------+-----------------+
|Linear Regression          |561.9390271170785 |0.0037072279662681007|467.6174705028534|
|Decision Tree Regressor    |407.37455527081164|0.9511363268774429   |533.0892181396484|
|Gradient Boosting Regressor|434.8726869260014 |0.7780102701133474   |628.8232140541077|
+---------------------------+------------------+---------------------+-----------------+



## The Decision Tree Regressor is the best of the three models: in the table above it has the lowest MAE (≈407) and the highest R² (≈0.95)

In [None]:
# Archive the saved models directory into a single zip file
!zip -r /content/best_models.zip /content/best_models

  adding: content/best_models/ (stored 0%)
  adding: content/best_models/Gradient_Boosting_Regressor/ (stored 0%)
  adding: content/best_models/Gradient_Boosting_Regressor/metadata/ (stored 0%)
  adding: content/best_models/Gradient_Boosting_Regressor/metadata/_SUCCESS (stored 0%)
  adding: content/best_models/Gradient_Boosting_Regressor/metadata/.part-00000.crc (stored 0%)
  adding: content/best_models/Gradient_Boosting_Regressor/metadata/._SUCCESS.crc (stored 0%)
  adding: content/best_models/Gradient_Boosting_Regressor/metadata/part-00000 (deflated 20%)
  adding: content/best_models/Gradient_Boosting_Regressor/stages/ (stored 0%)
  adding: content/best_models/Gradient_Boosting_Regressor/stages/0_GBTRegressor_fa81a156afef/ (stored 0%)
  adding: content/best_models/Gradient_Boosting_Regressor/stages/0_GBTRegressor_fa81a156afef/metadata/ (stored 0%)
  adding: content/best_models/Gradient_Boosting_Regressor/stages/0_GBTRegressor_fa81a156afef/metadata/_SUCCESS (stored 0%)
  adding: conte