# In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import hour, dayofweek, to_timestamp, when, col, sqrt, pow
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import DecisionTreeClassifier, LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Initialize Spark Session: 4g for both executors and the driver.
spark = (
    SparkSession.builder
    .appName("HighFarePrediction")
    .config("spark.executor.memory", "4g")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

# Task 1: Data Preparation
# a) Load Dataset
df = spark.read.csv("yellow_tripdata_2015-01.csv", header=True, inferSchema=True)

# b) Data Exploration
df.printSchema()
df.show(5)
# Drop every row that has a null in any column (not only the columns used below).
df = df.dropna()

# c) Feature Engineering
# Parse the pickup timestamp once and reuse the expression for both derived
# columns. (If the column is already inferred as a timestamp, this is a no-op cast.)
pickup_ts = to_timestamp(col("tpep_pickup_datetime"))
df = df.withColumn("pickup_hour", hour(pickup_ts))
df = df.withColumn("pickup_day", dayofweek(pickup_ts))  # 1 = Sunday ... 7 = Saturday

# Straight-line distance in raw coordinate degrees (not miles/km): the
# longitude and latitude differences are combined without any scaling.
# NOTE(review): away from the equator a degree of longitude is shorter than a
# degree of latitude, so this over-weights east-west travel; a planar/haversine
# distance (or the trip_distance column) may be more faithful — confirm before
# changing, since it alters the feature both models are trained on.
df = df.withColumn(
    "distance",
    sqrt(
        pow(col("pickup_longitude") - col("dropoff_longitude"), 2) +
        pow(col("pickup_latitude") - col("dropoff_latitude"), 2)
    )
)

# d) Target Variable Creation
# A trip is labelled "high fare" (1) when fare_amount strictly exceeds this value.
HIGH_FARE_THRESHOLD = 20
df = df.withColumn(
    "high_fare", when(col("fare_amount") > HIGH_FARE_THRESHOLD, 1).otherwise(0)
)

# e) Data Splitting
# Split first (so the row assignment for seed=42 is unchanged), then keep only
# the columns the pipelines consume and cache both halves. Without caching, every
# CrossValidator fold/refit and each test-set evaluation re-reads and re-parses
# the whole CSV.
model_cols = ["pickup_hour", "pickup_day", "distance", "high_fare"]
train_data, test_data = df.randomSplit([0.7, 0.3], seed=42)
train_data = train_data.select(*model_cols).cache()
test_data = test_data.select(*model_cols).cache()

# Task 2: Decision Tree Classifier Pipeline
# a) Define the pipeline stages.
# Shared preprocessing: pack the engineered columns into one vector, then
# rescale it (StandardScaler defaults: unit std, no mean-centering). The
# assembler and scaler are reused by the logistic-regression pipeline in Task 3.
feature_cols = ["pickup_hour", "pickup_day", "distance"]
assembler = VectorAssembler(outputCol="features", inputCols=feature_cols)
scaler = StandardScaler(outputCol="scaled_features", inputCol="features")
dt = DecisionTreeClassifier(featuresCol="scaled_features", labelCol="high_fare")

pipeline_dt = Pipeline(stages=[assembler, scaler, dt])

# b) Hyperparameter Tuning: 3 depths x 3 minimum leaf sizes = 9 candidates.
param_grid_dt = (
    ParamGridBuilder()
    .addGrid(dt.maxDepth, [5, 10, 15])
    .addGrid(dt.minInstancesPerNode, [1, 2, 4])
    .build()
)

# Shared by both CrossValidators and the test-set scoring below.
# For MulticlassClassificationEvaluator, metricName="f1" is the weighted F1.
evaluator = MulticlassClassificationEvaluator(metricName="f1", labelCol="high_fare")

cv_dt = CrossValidator(
    estimator=pipeline_dt,
    estimatorParamMaps=param_grid_dt,
    evaluator=evaluator,
    numFolds=3,
    parallelism=1,
)

# c) Model Training: grid x folds, then the best setting is refit on train_data.
model_dt = cv_dt.fit(train_data)

# d) Model Evaluation on the held-out split.
predictions_dt = model_dt.transform(test_data)
f1_dt = evaluator.evaluate(predictions_dt)

# e) Save the fitted CrossValidatorModel (it wraps the best pipeline model).
model_dt.write().overwrite().save("decision_tree_pipeline")

# Task 3: Logistic Regression Pipeline
# a) Define the pipeline: same assembler + scaler stages as Task 2, so both
# models are trained on identical scaled features.
lr = LogisticRegression(featuresCol="scaled_features", labelCol="high_fare")
pipeline_lr = Pipeline(stages=[assembler, scaler, lr])

# b) Hyperparameter Tuning: 3 regularization strengths x 2 iteration caps = 6 candidates.
param_grid_lr = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.01, 0.1, 1.0])
    .addGrid(lr.maxIter, [50, 100])
    .build()
)

# Same evaluator and fold settings as the decision-tree search, so the two
# F1 scores are directly comparable.
cv_lr = CrossValidator(
    estimator=pipeline_lr,
    estimatorParamMaps=param_grid_lr,
    evaluator=evaluator,
    numFolds=3,
    parallelism=1,
)

# c) Model Training
model_lr = cv_lr.fit(train_data)

# d) Model Evaluation on the held-out split.
predictions_lr = model_lr.transform(test_data)
f1_lr = evaluator.evaluate(predictions_lr)

# Save the fitted Logistic Regression CrossValidatorModel.
model_lr.write().overwrite().save("logistic_regression_pipeline")

# Task 4: Report Findings
# a) Discuss performance.
# The comparison is strict, so a tie is reported as Logistic Regression.
best_model_name = "Decision Tree" if f1_dt > f1_lr else "Logistic Regression"
results = (
    "\n"
    f"Decision Tree F1 Score: {f1_dt}\n"
    f"Logistic Regression F1 Score: {f1_lr}\n"
    f"Best Model: {best_model_name}\n"
)

# b) State the best-performing pipeline.
print(results)

# Stop the Spark Session.
spark.stop()

                                                                                

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- RateCodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)

+--------+--------------------+---------------------+---------------+-------------+------------------+---------------

24/11/27 15:10:59 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/11/27 15:10:59 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
                                                                                


Decision Tree F1 Score: 0.9616559334817972
Logistic Regression F1 Score: 0.8317299367110219
Best Model: Decision Tree

