In [1]:
# Let's import the libraries we will need
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

In [2]:
import os
os.environ["SPARK_HOME"] = "/home/alejandro/Escritorio/venv/lib/python3.12/site-packages/pyspark"
os.environ["PYSPARK_PYTHON"] = "python3"
os.environ["PYSPARK_DRIVER_PYTHON"] = "python3"

from pyspark.sql import SparkSession
from pyspark import SparkContext

# Detener contextos previos si existen
try:
   sc.stop()
except:
   pass

# Crear SparkSession con configuración de memoria
spark = SparkSession.builder \
   .appName("DataProcessing") \
   .config("spark.driver.memory", "4g") \
   .config("spark.executor.memory", "4g") \
   .config("spark.driver.maxResultSize", "2g") \
   .config("spark.sql.adaptive.enabled", "true") \
   .getOrCreate()

# Obtener SparkContext desde la sesión
sc = spark.sparkContext

spark

25/05/22 12:44:28 WARN Utils: Your hostname, Alejandro resolves to a loopback address: 127.0.1.1; using 192.168.1.109 instead (on interface enp46s0)
25/05/22 12:44:28 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/22 12:44:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
#Library Imports and Setup
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.regression import RandomForestRegressor, GBTRegressor, LinearRegression
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import RegressionEvaluator, MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import Pipeline
from pyspark.ml.stat import Correlation
import random
import math
from datetime import datetime, timedelta

In [4]:
# Haversine Distance UDF
@udf(returnType=DoubleType())
def calculate_haversine_distance(lat1, lon1, lat2, lon2):
    if any(coord is None for coord in [lat1, lon1, lat2, lon2]):
        return None
    
    lat1_rad, lon1_rad = math.radians(lat1), math.radians(lon1)
    lat2_rad, lon2_rad = math.radians(lat2), math.radians(lon2)
    
    dlat = lat2_rad - lat1_rad
    dlon = lon2_rad - lon1_rad
    a = math.sin(dlat/2)**2 + math.cos(lat1_rad) * math.cos(lat2_rad) * math.sin(dlon/2)**2
    c = 2 * math.asin(math.sqrt(a))
    
    r = 3959
    return c * r

In [5]:
# Data Loading (CSV or Sample Data Generation)
def load_or_create_data(spark, csv_path=None, num_records=5000):
    if csv_path:
        schema = StructType([
            StructField("VendorID", IntegerType(), True),
            StructField("tpep_pickup_datetime", StringType(), True),
            StructField("tpep_dropoff_datetime", StringType(), True),
            StructField("passenger_count", IntegerType(), True),
            StructField("trip_distance", DoubleType(), True),
            StructField("pickup_longitude", DoubleType(), True),
            StructField("pickup_latitude", DoubleType(), True),
            StructField("RatecodeID", IntegerType(), True),
            StructField("store_and_fwd_flag", StringType(), True),
            StructField("dropoff_longitude", DoubleType(), True),
            StructField("dropoff_latitude", DoubleType(), True),
            StructField("payment_type", IntegerType(), True),
            StructField("fare_amount", DoubleType(), True),
            StructField("extra", DoubleType(), True),
            StructField("mta_tax", DoubleType(), True),
            StructField("tip_amount", DoubleType(), True),
            StructField("tolls_amount", DoubleType(), True),
            StructField("improvement_surcharge", DoubleType(), True),
            StructField("total_amount", DoubleType(), True)
        ])
        
        try:
            df = spark.read.format("csv") \
                .option("header", "true") \
                .schema(schema) \
                .load(csv_path)
            
            print(f"CSV data loaded successfully from: {csv_path}")
            print("Sample of original data:")
            df.show(5)
            return df
            
        except Exception as e:
            print(f"Error loading CSV: {e}")
            print("Falling back to sample data generation...")
    
    print("Generating sample data...")
    sample_schema = StructType([
        StructField("VendorID", IntegerType(), True),
        StructField("tpep_pickup_datetime", StringType(), False),
        StructField("tpep_dropoff_datetime", StringType(), False),
        StructField("passenger_count", IntegerType(), False),
        StructField("trip_distance", DoubleType(), False),
        StructField("pickup_longitude", DoubleType(), False),
        StructField("pickup_latitude", DoubleType(), False),
        StructField("RatecodeID", IntegerType(), True),
        StructField("store_and_fwd_flag", StringType(), True),
        StructField("dropoff_longitude", DoubleType(), False),
        StructField("dropoff_latitude", DoubleType(), False),
        StructField("payment_type", IntegerType(), True),
        StructField("fare_amount", DoubleType(), False),
        StructField("extra", DoubleType(), True),
        StructField("mta_tax", DoubleType(), True),
        StructField("tip_amount", DoubleType(), True),
        StructField("tolls_amount", DoubleType(), True),
        StructField("improvement_surcharge", DoubleType(), True),
        StructField("total_amount", DoubleType(), False)
    ])
    
    sample_data = []
    base_date = datetime(2016, 3, 1)
    
    nyc_bounds = {
        'lat_min': 40.4774, 'lat_max': 40.9176,
        'lon_min': -74.2591, 'lon_max': -73.7004
    }
    
    for i in range(num_records):
        pickup_time = base_date + timedelta(
            days=random.randint(0, 30),
            hours=random.randint(0, 23),
            minutes=random.randint(0, 59),
            seconds=random.randint(0, 59)
        )
        
        passenger_count = random.choices([1, 2, 3, 4, 5, 6], weights=[40, 25, 15, 10, 7, 3])[0]
        trip_distance = random.uniform(0.5, 25.0)
        
        pickup_lat = random.uniform(nyc_bounds['lat_min'], nyc_bounds['lat_max'])
        pickup_lon = random.uniform(nyc_bounds['lon_min'], nyc_bounds['lon_max'])
        dropoff_lat = random.uniform(nyc_bounds['lat_min'], nyc_bounds['lat_max'])
        dropoff_lon = random.uniform(nyc_bounds['lon_min'], nyc_bounds['lon_max'])
        
        base_duration = trip_distance * random.uniform(2.5, 6.0)
        
        hour = pickup_time.hour
        if 7 <= hour <= 9 or 17 <= hour <= 19:
            base_duration *= random.uniform(1.8, 2.8)
        elif 22 <= hour or hour <= 5:
            base_duration *= random.uniform(0.8, 1.3)
        else:
            base_duration *= random.uniform(1.0, 1.5)
        
        if pickup_time.weekday() in [5, 6]:
            base_duration *= random.uniform(0.9, 1.4)
        
        base_duration = max(1, min(180, base_duration))
        
        dropoff_time = pickup_time + timedelta(minutes=base_duration)
        
        base_fare = 2.50
        distance_fare = trip_distance * random.uniform(2.50, 3.50)
        time_fare = base_duration * random.uniform(0.30, 0.60)
        fare_amount = base_fare + distance_fare + time_fare
        
        tip = fare_amount * random.uniform(0, 0.25)
        extras = random.uniform(0, 3.0)
        total_amount = fare_amount + tip + extras
        
        sample_data.append((
            random.choice([1, 2]),
            pickup_time.strftime('%Y-%m-%d %H:%M:%S'),
            dropoff_time.strftime('%Y-%m-%d %H:%M:%S'),
            passenger_count,
            round(trip_distance, 2),
            round(pickup_lon, 6),
            round(pickup_lat, 6),
            random.choice([1, 2, 3, 4, 5]),
            random.choice(['N', 'Y']),
            round(dropoff_lon, 6),
            round(dropoff_lat, 6),
            random.choice([1, 2, 3, 4]),
            round(fare_amount, 2),
            round(random.uniform(0, 1), 2),
            0.50,
            round(tip, 2),
            round(random.uniform(0, 5), 2),
            0.30,
            round(total_amount, 2)
        ))
    
    df = spark.createDataFrame(sample_data, sample_schema)
    return df

# Load data - Change csv_path to your actual file path or set to None for sample data
csv_path = "data/yellow_tripdata_2016-03.csv"  # Set to None to use sample data
df_raw = load_or_create_data(spark, csv_path=csv_path, num_records=5000)
print(f"Dataset loaded: {df_raw.count()} records")
df_raw.printSchema()

CSV data loaded successfully from: data/yellow_tripdata_2016-03.csv
Sample of original data:


                                                                                

+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+------------------+-----------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|  pickup_longitude|   pickup_latitude|RatecodeID|store_and_fwd_flag| dropoff_longitude| dropoff_latitude|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|
+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+------------------+-----------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|       1| 2016-03-01 00:00:00|  2016-03-01 00:07:55|              1|          2.5|-73.97674560546875| 40.76515197753906|         1|       

[Stage 1:>                                                        (0 + 20) / 20]

Dataset loaded: 12210952 records
root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)



                                                                                

In [6]:
#  Data Preprocessing
def preprocess_data(df):
    df = df.withColumn("pickup_datetime", 
                      to_timestamp(col("tpep_pickup_datetime"), "yyyy-MM-dd HH:mm:ss"))
    df = df.withColumn("dropoff_datetime", 
                      to_timestamp(col("tpep_dropoff_datetime"), "yyyy-MM-dd HH:mm:ss"))
    
    df = df.withColumn("trip_duration_minutes", 
                      (unix_timestamp("dropoff_datetime") - unix_timestamp("pickup_datetime")) / 60.0)
    
    df = df.withColumn("pickup_hour", hour("pickup_datetime")) \
           .withColumn("pickup_day_of_week", dayofweek("pickup_datetime")) \
           .withColumn("pickup_month", month("pickup_datetime")) \
           .withColumn("pickup_day_of_year", dayofyear("pickup_datetime"))
    
    df = df.withColumn("hour_category", 
                      when(col("pickup_hour").between(6, 12), "Morning")
                      .when(col("pickup_hour").between(13, 18), "Afternoon") 
                      .when(col("pickup_hour").between(19, 22), "Evening")
                      .otherwise("Night"))
    
    df = df.withColumn("is_weekend", 
                      when(col("pickup_day_of_week").isin([1, 7]), 1).otherwise(0))
    
    df = df.withColumn("calculated_distance", 
                      calculate_haversine_distance(
                          col("pickup_latitude"), col("pickup_longitude"),
                          col("dropoff_latitude"), col("dropoff_longitude")))
    
    initial_count = df.count()
    
    df = df.filter(col("trip_duration_minutes") > 0) \
           .filter(col("trip_duration_minutes") < 180) \
           .filter(col("passenger_count") > 0) \
           .filter(col("passenger_count") <= 6) \
           .filter(col("trip_distance") > 0) \
           .filter(col("trip_distance") < 50) \
           .filter(col("pickup_latitude").between(40.4, 41.0)) \
           .filter(col("pickup_longitude").between(-74.3, -73.7)) \
           .filter(col("dropoff_latitude").between(40.4, 41.0)) \
           .filter(col("dropoff_longitude").between(-74.3, -73.7))
    
    final_count = df.count()
    print(f"Records after filtering: {final_count} (removed: {initial_count - final_count})")
    
    df = df.withColumn("trip_category",
                      when(col("trip_duration_minutes") <= 10, "Short")
                      .when(col("trip_duration_minutes") <= 30, "Medium")
                      .otherwise("Long"))
    
    df = df.withColumn("is_long_trip", 
                      when(col("trip_duration_minutes") > 20, 1.0).otherwise(0.0))
    
    return df

df_processed = preprocess_data(df_raw)

25/05/22 12:44:42 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors

Records after filtering: 11937106 (removed: 273846)


                                                                                

In [7]:
# Exploratory Data Analysis
def exploratory_data_analysis(df):
    print("EXPLORATORY DATA ANALYSIS")
    print("=" * 50)
    
    print("\nTrip Duration Statistics:")
    df.select("trip_duration_minutes").describe().show()
    
    print("\nTrip Category Distribution:")
    df.groupBy("trip_category").count().orderBy("count", ascending=False).show()
    
    print("\nAverage Duration by Hour:")
    hourly_stats = df.groupBy("pickup_hour") \
                     .agg(avg("trip_duration_minutes").alias("avg_duration"),
                          count("*").alias("trip_count")) \
                     .orderBy("pickup_hour")
    hourly_stats.show()
    
    print("\nAverage Duration by Day of Week:")
    daily_stats = df.groupBy("pickup_day_of_week") \
                    .agg(avg("trip_duration_minutes").alias("avg_duration"),
                         count("*").alias("trip_count")) \
                    .orderBy("pickup_day_of_week")
    daily_stats.show()
    
    print("\nAverage Duration by Passenger Count:")
    passenger_stats = df.groupBy("passenger_count") \
                        .agg(avg("trip_duration_minutes").alias("avg_duration"),
                             avg("trip_distance").alias("avg_distance"),
                             count("*").alias("trip_count")) \
                        .orderBy("passenger_count")
    passenger_stats.show()

exploratory_data_analysis(df_processed)

# Feature Preparation
def prepare_features_for_ml(df):
    numeric_features = [
        "pickup_hour", "pickup_day_of_week", "pickup_month", 
        "is_weekend", "passenger_count", "trip_distance", 
        "calculated_distance", "pickup_longitude", "pickup_latitude",
        "dropoff_longitude", "dropoff_latitude"
    ]
    
    categorical_features = ["hour_category"]
    
    indexers = []
    for cat_col in categorical_features:
        indexer = StringIndexer(inputCol=cat_col, outputCol=f"{cat_col}_index", handleInvalid="keep")
        indexers.append(indexer)
    
    encoders = []
    encoded_cols = []
    for cat_col in categorical_features:
        encoder = OneHotEncoder(inputCol=f"{cat_col}_index", outputCol=f"{cat_col}_encoded")
        encoders.append(encoder)
        encoded_cols.append(f"{cat_col}_encoded")
    
    all_feature_cols = numeric_features + encoded_cols
    
    assembler = VectorAssembler(
        inputCols=all_feature_cols, 
        outputCol="features", 
        handleInvalid="skip"
    )
    
    print(f"Features prepared: {len(all_feature_cols)} total features")
    
    return indexers, encoders, assembler

indexers, encoders, assembler = prepare_features_for_ml(df_processed)

#  Data Pipeline and Split
def create_preprocessing_pipeline_and_split_data(df, indexers, encoders, assembler):
    preprocessing_stages = indexers + encoders + [assembler]
    preprocessing_pipeline = Pipeline(stages=preprocessing_stages)
    
    preprocessing_model = preprocessing_pipeline.fit(df)
    df_features = preprocessing_model.transform(df)
    
    train_data, test_data = df_features.randomSplit([0.7, 0.3], seed=42)
    
    print(f"Training set: {train_data.count()} records")
    print(f"Test set: {test_data.count()} records")
    
    return train_data, test_data, preprocessing_model

train_data, test_data, preprocessing_model = create_preprocessing_pipeline_and_split_data(
    df_processed, indexers, encoders, assembler
)


EXPLORATORY DATA ANALYSIS

Trip Duration Statistics:


                                                                                

+-------+---------------------+
|summary|trip_duration_minutes|
+-------+---------------------+
|  count|             11937106|
|   mean|   13.673365486017442|
| stddev|   10.454997064136354|
|    min| 0.016666666666666666|
|    max|   179.63333333333333|
+-------+---------------------+


Trip Category Distribution:


                                                                                

+-------------+-------+
|trip_category|  count|
+-------------+-------+
|       Medium|5708960|
|        Short|5399060|
|         Long| 829086|
+-------------+-------+


Average Duration by Hour:


                                                                                

+-----------+------------------+----------+
|pickup_hour|      avg_duration|trip_count|
+-----------+------------------+----------+
|          0|12.752213704866902|    419666|
|          1|12.776843214230256|    300580|
|          2|11.503745206805428|    188027|
|          3|11.656112473601748|    176148|
|          4|12.251764408224137|    125160|
|          5|11.809048572256774|    121894|
|          6|11.384076072352196|    270418|
|          7|12.432140464444485|    459732|
|          8|13.784295976064938|    561631|
|          9|13.930470123624328|    560030|
|         10|13.995830316589045|    537279|
|         11| 14.19687779314925|    550081|
|         12|14.204344865576635|    576673|
|         13|14.404576607820777|    577837|
|         14|15.243962770787855|    603809|
|         15|15.569020053570203|    586396|
|         16|15.471575336468138|    532938|
|         17|14.995256439712275|    635458|
|         18|13.874890487597169|    752274|
|         19|12.903943260910784|

                                                                                

+------------------+------------------+----------+
|pickup_day_of_week|      avg_duration|trip_count|
+------------------+------------------+----------+
|                 1| 12.64893460540238|   1448978|
|                 2|13.348102517711938|   1390641|
|                 3|13.715500564562475|   1833573|
|                 4| 14.18349718853027|   1915309|
|                 5|14.577880846312045|   1991070|
|                 6| 14.02597495567632|   1658819|
|                 7|12.788294384307413|   1698716|
+------------------+------------------+----------+


Average Duration by Passenger Count:


                                                                                

+---------------+------------------+------------------+----------+
|passenger_count|      avg_duration|      avg_distance|trip_count|
+---------------+------------------+------------------+----------+
|              1|13.474862508849416|2.8986088638699505|   8481758|
|              2| 14.32388176098397|3.1536527257232687|   1695605|
|              3|14.209151493976186| 3.045415419444609|    489421|
|              4|14.588819098690495|  3.14508470024553|    234592|
|              5|13.820332701637945| 3.054876866581224|    635855|
|              6|13.698880233406294|2.9757775554860877|    399875|
+---------------+------------------+------------------+----------+

Features prepared: 12 total features


25/05/22 12:45:27 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

Training set: 8356184 records




Test set: 3580922 records


                                                                                

In [8]:
#Regression Models Training
def train_regression_models(train_data, test_data):
    print("REGRESSION MODELS TRAINING")
    print("=" * 50)
    
    evaluator = RegressionEvaluator(
        labelCol="trip_duration_minutes", 
        predictionCol="prediction"
    )
    
    results = {}
    
    print("\n1. Linear Regression")
    lr = LinearRegression(featuresCol="features", labelCol="trip_duration_minutes")
    lr_model = lr.fit(train_data)
    lr_predictions = lr_model.transform(test_data)
    
    lr_rmse = evaluator.evaluate(lr_predictions, {evaluator.metricName: "rmse"})
    lr_mae = evaluator.evaluate(lr_predictions, {evaluator.metricName: "mae"})
    lr_r2 = evaluator.evaluate(lr_predictions, {evaluator.metricName: "r2"})
    
    results['Linear_Regression'] = {
        'RMSE': lr_rmse, 'MAE': lr_mae, 'R²': lr_r2, 'model': lr_model
    }
    
    print(f"   RMSE: {lr_rmse:.3f}, MAE: {lr_mae:.3f}, R²: {lr_r2:.3f}")
    
    print("\n2. Random Forest Regression")
    rf = RandomForestRegressor(
        featuresCol="features", 
        labelCol="trip_duration_minutes",
        numTrees=50, 
        maxDepth=10, 
        seed=42
    )
    rf_model = rf.fit(train_data)
    rf_predictions = rf_model.transform(test_data)
    
    rf_rmse = evaluator.evaluate(rf_predictions, {evaluator.metricName: "rmse"})
    rf_mae = evaluator.evaluate(rf_predictions, {evaluator.metricName: "mae"})
    rf_r2 = evaluator.evaluate(rf_predictions, {evaluator.metricName: "r2"})
    
    results['Random_Forest'] = {
        'RMSE': rf_rmse, 'MAE': rf_mae, 'R²': rf_r2, 'model': rf_model
    }
    
    print(f"   RMSE: {rf_rmse:.3f}, MAE: {rf_mae:.3f}, R²: {rf_r2:.3f}")
    
    feature_importance = rf_model.featureImportances.toArray()
    print("   Top 5 Feature Importances:")
    for i, importance in enumerate(feature_importance[:5]):
        print(f"     Feature {i}: {importance:.4f}")
    
    print("\n3. Gradient Boosted Trees")
    gbt = GBTRegressor(
        featuresCol="features", 
        labelCol="trip_duration_minutes",
        maxIter=50, 
        maxDepth=8, 
        seed=42
    )
    gbt_model = gbt.fit(train_data)
    gbt_predictions = gbt_model.transform(test_data)
    
    gbt_rmse = evaluator.evaluate(gbt_predictions, {evaluator.metricName: "rmse"})
    gbt_mae = evaluator.evaluate(gbt_predictions, {evaluator.metricName: "mae"})
    gbt_r2 = evaluator.evaluate(gbt_predictions, {evaluator.metricName: "r2"})
    
    results['Gradient_Boosted_Trees'] = {
        'RMSE': gbt_rmse, 'MAE': gbt_mae, 'R²': gbt_r2, 'model': gbt_model
    }
    
    print(f"   RMSE: {gbt_rmse:.3f}, MAE: {gbt_mae:.3f}, R²: {gbt_r2:.3f}")
    
    return results, rf_predictions

regression_results, best_predictions = train_regression_models(train_data, test_data)

REGRESSION MODELS TRAINING

1. Linear Regression


25/05/22 12:46:06 WARN Instrumentation: [7d4f1755] regParam is zero, which might cause numerical instability and overfitting.
25/05/22 12:46:22 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
25/05/22 12:46:25 WARN Instrumentation: [7d4f1755] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.
25/05/22 12:46:25 ERROR LBFGS: Failure! Resetting history: breeze.optimize.FirstOrderException: Line search zoom failed
25/05/22 12:46:25 ERROR LBFGS: Failure again! Giving up and returning. Maybe the objective is just poorly behaved?
                                                                                

   RMSE: 5.999, MAE: 4.090, R²: 0.670

2. Random Forest Regression


25/05/22 12:48:33 WARN MemoryStore: Not enough space to cache rdd_193_6 in memory! (computed 28.7 MiB so far)
25/05/22 12:48:33 WARN MemoryStore: Not enough space to cache rdd_193_18 in memory! (computed 5.3 MiB so far)
25/05/22 12:48:33 WARN BlockManager: Persisting block rdd_193_6 to disk instead.
25/05/22 12:48:33 WARN BlockManager: Persisting block rdd_193_18 to disk instead.
25/05/22 12:48:33 WARN MemoryStore: Not enough space to cache rdd_193_15 in memory! (computed 43.1 MiB so far)
25/05/22 12:48:33 WARN BlockManager: Persisting block rdd_193_15 to disk instead.
25/05/22 12:48:33 WARN MemoryStore: Not enough space to cache rdd_193_3 in memory! (computed 43.1 MiB so far)
25/05/22 12:48:33 WARN BlockManager: Persisting block rdd_193_3 to disk instead.
25/05/22 12:48:33 WARN MemoryStore: Not enough space to cache rdd_193_2 in memory! (computed 43.1 MiB so far)
25/05/22 12:48:33 WARN MemoryStore: Not enough space to cache rdd_193_16 in memory! (computed 43.1 MiB so far)
25/05/22 12:

   RMSE: 4.752, MAE: 3.022, R²: 0.793
   Top 5 Feature Importances:
     Feature 0: 0.0332
     Feature 1: 0.0058
     Feature 2: 0.0000
     Feature 3: 0.0100
     Feature 4: 0.0001

3. Gradient Boosted Trees


25/05/22 12:54:19 WARN DAGScheduler: Broadcasting large task binary with size 1001.7 KiB
25/05/22 12:54:19 WARN DAGScheduler: Broadcasting large task binary with size 1006.3 KiB
25/05/22 12:54:20 WARN DAGScheduler: Broadcasting large task binary with size 1015.5 KiB
25/05/22 12:54:20 WARN DAGScheduler: Broadcasting large task binary with size 1034.4 KiB
25/05/22 12:54:20 WARN DAGScheduler: Broadcasting large task binary with size 1041.4 KiB
25/05/22 12:54:22 WARN DAGScheduler: Broadcasting large task binary with size 1041.9 KiB
25/05/22 12:54:22 WARN DAGScheduler: Broadcasting large task binary with size 1042.5 KiB
25/05/22 12:54:22 WARN DAGScheduler: Broadcasting large task binary with size 1043.6 KiB
25/05/22 12:54:22 WARN DAGScheduler: Broadcasting large task binary with size 1045.9 KiB
25/05/22 12:54:23 WARN DAGScheduler: Broadcasting large task binary with size 1050.4 KiB
25/05/22 12:54:23 WARN DAGScheduler: Broadcasting large task binary with size 1059.6 KiB
25/05/22 12:54:23 WAR

   RMSE: 4.221, MAE: 2.615, R²: 0.837


                                                                                

In [9]:
#Classification Models Training
def train_classification_models(train_data, test_data):
    print("\nCLASSIFICATION MODELS TRAINING")
    print("=" * 50)
    
    accuracy_evaluator = MulticlassClassificationEvaluator(
        labelCol="is_long_trip", predictionCol="prediction", metricName="accuracy"
    )
    precision_evaluator = MulticlassClassificationEvaluator(
        labelCol="is_long_trip", predictionCol="prediction", metricName="weightedPrecision"
    )
    recall_evaluator = MulticlassClassificationEvaluator(
        labelCol="is_long_trip", predictionCol="prediction", metricName="weightedRecall"
    )
    f1_evaluator = MulticlassClassificationEvaluator(
        labelCol="is_long_trip", predictionCol="prediction", metricName="f1"
    )
    auc_evaluator = BinaryClassificationEvaluator(
        labelCol="is_long_trip", rawPredictionCol="rawPrediction", metricName="areaUnderROC"
    )
    
    results = {}
    
    print("\n1. Logistic Regression")
    lr_clf = LogisticRegression(featuresCol="features", labelCol="is_long_trip")
    lr_clf_model = lr_clf.fit(train_data)
    lr_clf_predictions = lr_clf_model.transform(test_data)
    
    lr_accuracy = accuracy_evaluator.evaluate(lr_clf_predictions)
    lr_precision = precision_evaluator.evaluate(lr_clf_predictions)
    lr_recall = recall_evaluator.evaluate(lr_clf_predictions)
    lr_f1 = f1_evaluator.evaluate(lr_clf_predictions)
    lr_auc = auc_evaluator.evaluate(lr_clf_predictions)
    
    results['Logistic_Regression'] = {
        'Accuracy': lr_accuracy, 'Precision': lr_precision, 'Recall': lr_recall,
        'F1': lr_f1, 'AUC': lr_auc, 'model': lr_clf_model
    }
    
    print(f"   Accuracy: {lr_accuracy:.3f}, Precision: {lr_precision:.3f}, Recall: {lr_recall:.3f}")
    print(f"   F1: {lr_f1:.3f}, AUC: {lr_auc:.3f}")
    
    print("\n2. Random Forest Classification")
    rf_clf = RandomForestClassifier(
        featuresCol="features", 
        labelCol="is_long_trip",
        numTrees=50, 
        maxDepth=10, 
        seed=42
    )
    rf_clf_model = rf_clf.fit(train_data)
    rf_clf_predictions = rf_clf_model.transform(test_data)
    
    rf_accuracy = accuracy_evaluator.evaluate(rf_clf_predictions)
    rf_precision = precision_evaluator.evaluate(rf_clf_predictions)
    rf_recall = recall_evaluator.evaluate(rf_clf_predictions)
    rf_f1 = f1_evaluator.evaluate(rf_clf_predictions)
    rf_auc = auc_evaluator.evaluate(rf_clf_predictions)
    
    results['Random_Forest_Clf'] = {
        'Accuracy': rf_accuracy, 'Precision': rf_precision, 'Recall': rf_recall,
        'F1': rf_f1, 'AUC': rf_auc, 'model': rf_clf_model
    }
    
    print(f"   Accuracy: {rf_accuracy:.3f}, Precision: {rf_precision:.3f}, Recall: {rf_recall:.3f}")
    print(f"   F1: {rf_f1:.3f}, AUC: {rf_auc:.3f}")
    
    print("\n3. Gradient Boosted Trees Classification")
    gbt_clf = GBTClassifier(
        featuresCol="features", 
        labelCol="is_long_trip",
        maxIter=50, 
        maxDepth=8, 
        seed=42
    )
    gbt_clf_model = gbt_clf.fit(train_data)
    gbt_clf_predictions = gbt_clf_model.transform(test_data)
    
    gbt_accuracy = accuracy_evaluator.evaluate(gbt_clf_predictions)
    gbt_precision = precision_evaluator.evaluate(gbt_clf_predictions)
    gbt_recall = recall_evaluator.evaluate(gbt_clf_predictions)
    gbt_f1 = f1_evaluator.evaluate(gbt_clf_predictions)
    gbt_auc = auc_evaluator.evaluate(gbt_clf_predictions)
    
    results['Gradient_Boosted_Trees_Clf'] = {
        'Accuracy': gbt_accuracy, 'Precision': gbt_precision, 'Recall': gbt_recall,
        'F1': gbt_f1, 'AUC': gbt_auc, 'model': gbt_clf_model
    }
    
    print(f"   Accuracy: {gbt_accuracy:.3f}, Precision: {gbt_precision:.3f}, Recall: {gbt_recall:.3f}")
    print(f"   F1: {gbt_f1:.3f}, AUC: {gbt_auc:.3f}")
    
    return results, rf_clf_predictions

classification_results, clf_predictions = train_classification_models(train_data, test_data)


CLASSIFICATION MODELS TRAINING

1. Logistic Regression


                                                                                

   Accuracy: 0.896, Precision: 0.892, Recall: 0.896
   F1: 0.890, AUC: 0.940

2. Random Forest Classification


25/05/22 13:00:51 WARN MemoryStore: Not enough space to cache rdd_2146_12 in memory! (computed 64.9 MiB so far)
25/05/22 13:00:51 WARN BlockManager: Persisting block rdd_2146_12 to disk instead.
25/05/22 13:00:51 WARN MemoryStore: Not enough space to cache rdd_2146_19 in memory! (computed 43.1 MiB so far)
25/05/22 13:00:51 WARN BlockManager: Persisting block rdd_2146_19 to disk instead.
25/05/22 13:00:51 WARN MemoryStore: Not enough space to cache rdd_2146_17 in memory! (computed 43.1 MiB so far)
25/05/22 13:00:51 WARN BlockManager: Persisting block rdd_2146_17 to disk instead.
25/05/22 13:00:51 WARN MemoryStore: Not enough space to cache rdd_2146_2 in memory! (computed 28.7 MiB so far)
25/05/22 13:00:51 WARN BlockManager: Persisting block rdd_2146_2 to disk instead.
25/05/22 13:00:51 WARN MemoryStore: Not enough space to cache rdd_2146_14 in memory! (computed 43.1 MiB so far)
25/05/22 13:00:51 WARN BlockManager: Persisting block rdd_2146_14 to disk instead.
25/05/22 13:00:51 WARN Memo

   Accuracy: 0.912, Precision: 0.909, Recall: 0.912
   F1: 0.910, AUC: 0.950

3. Gradient Boosted Trees Classification


25/05/22 13:06:52 WARN DAGScheduler: Broadcasting large task binary with size 1001.0 KiB
25/05/22 13:06:53 WARN DAGScheduler: Broadcasting large task binary with size 1003.3 KiB
25/05/22 13:06:53 WARN DAGScheduler: Broadcasting large task binary with size 1007.8 KiB
25/05/22 13:06:53 WARN DAGScheduler: Broadcasting large task binary with size 1016.9 KiB
25/05/22 13:06:54 WARN DAGScheduler: Broadcasting large task binary with size 1035.7 KiB
25/05/22 13:06:54 WARN DAGScheduler: Broadcasting large task binary with size 1042.5 KiB
25/05/22 13:06:56 WARN DAGScheduler: Broadcasting large task binary with size 1042.9 KiB
25/05/22 13:06:56 WARN DAGScheduler: Broadcasting large task binary with size 1043.5 KiB
25/05/22 13:06:56 WARN DAGScheduler: Broadcasting large task binary with size 1044.7 KiB
25/05/22 13:06:57 WARN DAGScheduler: Broadcasting large task binary with size 1047.0 KiB
25/05/22 13:06:57 WARN DAGScheduler: Broadcasting large task binary with size 1051.5 KiB
25/05/22 13:06:57 WAR

   Accuracy: 0.926, Precision: 0.924, Recall: 0.926
   F1: 0.925, AUC: 0.968


In [None]:
def test_models_quick():
    print("\nQUICK MODEL TESTING")
    print("=" * 40)
    
    best_reg_model = regression_results['Random_Forest']['model']
    
    # Test scenarios
    test_cases = [
        ("Rush hour Manhattan", 8, 2, 1, 2.5, -73.9851, 40.7589, -73.9934, 40.7505),
        ("Evening long trip", 22, 6, 3, 12.8, -73.7949, 40.7282, -74.0445, 40.6892),
        ("Short night trip", 2, 3, 1, 1.1, -73.9851, 40.7589, -73.9934, 40.7505)
    ]
    
    for desc, hour, day, passengers, distance, p_lon, p_lat, d_lon, d_lat in test_cases:
        # Create test data
        test_data = spark.createDataFrame([
            (hour, day, passengers, distance, p_lon, p_lat, d_lon, d_lat)
        ], StructType([
            StructField("pickup_hour", IntegerType(), True),
            StructField("pickup_day_of_week", IntegerType(), True),
            StructField("passenger_count", IntegerType(), True),
            StructField("trip_distance", DoubleType(), True),
            StructField("pickup_longitude", DoubleType(), True),
            StructField("pickup_latitude", DoubleType(), True),
            StructField("dropoff_longitude", DoubleType(), True),
            StructField("dropoff_latitude", DoubleType(), True)
        ]))
        
        # Add features
        test_data = test_data.withColumn("pickup_month", lit(3)) \
                           .withColumn("pickup_day_of_year", lit(60)) \
                           .withColumn("is_weekend", when(col("pickup_day_of_week").isin([1, 7]), 1).otherwise(0)) \
                           .withColumn("hour_category", when(col("pickup_hour").between(6, 12), "Morning")
                                                      .when(col("pickup_hour").between(13, 18), "Afternoon")
                                                      .when(col("pickup_hour").between(19, 22), "Evening")
                                                      .otherwise("Night")) \
                           .withColumn("calculated_distance", calculate_haversine_distance(
                               col("pickup_latitude"), col("pickup_longitude"),
                               col("dropoff_latitude"), col("dropoff_longitude"))) \
                           .withColumn("trip_duration_minutes", lit(0.0)) \
                           .withColumn("trip_category", lit("Unknown")) \
                           .withColumn("is_long_trip", lit(0.0))
        
        # Predict
        test_processed = preprocessing_model.transform(test_data)
        prediction = best_reg_model.transform(test_processed)
        duration = prediction.select("prediction").collect()[0]["prediction"]
        
        print(f"{desc}: {duration:.1f} minutes ({distance:.1f} miles)")

test_models_quick()

# Results Summary
def final_summary():
    print("\nFINAL RESULTS")
    print("=" * 40)
    
    # Fix: Use Python's built-in min/max explicitly
    import builtins
    
    best_reg = builtins.min(regression_results.items(), key=lambda x: x[1]['RMSE'])
    best_clf = builtins.max(classification_results.items(), key=lambda x: x[1]['F1'])
    
    print("BEST REGRESSION MODEL:")
    print(f"  {best_reg[0]}")
    print(f"  RMSE: {best_reg[1]['RMSE']:.2f} minutes")
    print(f"  R²: {best_reg[1]['R²']:.3f} ({best_reg[1]['R²']*100:.1f}% accuracy)")
    
    print("\nBEST CLASSIFICATION MODEL:")
    print(f"  {best_clf[0]}")
    print(f"  Accuracy: {best_clf[1]['Accuracy']:.3f} ({best_clf[1]['Accuracy']*100:.1f}%)")
    print(f"  F1-Score: {best_clf[1]['F1']:.3f}")
    
    print(f"\nDATASET: {df_processed.count():,} taxi trips processed")
    print("ALGORITHMS: Linear Regression, Random Forest, Gradient Boosted Trees")
    print("EVALUATION: Train-Test Split (70/30), RMSE, MAE, R², Accuracy, F1")

final_summary()


QUICK MODEL TESTING
Rush hour Manhattan: 18.0 minutes (2.5 miles)
Evening long trip: 33.6 minutes (12.8 miles)
Short night trip: 5.7 minutes (1.1 miles)

FINAL RESULTS
BEST REGRESSION MODEL:
  Gradient_Boosted_Trees
  RMSE: 4.22 minutes
  R²: 0.837 (83.7% accuracy)

BEST CLASSIFICATION MODEL:
  Gradient_Boosted_Trees_Clf
  Accuracy: 0.926 (92.6%)
  F1-Score: 0.925





DATASET: 11,937,106 taxi trips processed
ALGORITHMS: Linear Regression, Random Forest, Gradient Boosted Trees
EVALUATION: Train-Test Split (70/30), RMSE, MAE, R², Accuracy, F1


                                                                                