In [None]:
#==================================
# Islamic University of Gaza
# Faculty of Information Technology
# Course: Cloud and Distributed Systems (SICT 4313)
# Project: Spark Cluster Simulation for Taxi Trip Analytics
#==================================

# ^_^ Colab single machine => simulate virtual cluster using Spark local[N] ^_^
# ^_^ Descriptive Stats  + Map/Reduce + 4 ML Jobs + Scalability metrics ^_^


import time

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, mean, min as spark_min, max as spark_max, count as spark_count,
    collect_set, slice, array_distinct, sort_array
)
from pyspark.sql.types import DoubleType

from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator, RegressionEvaluator
from pyspark.ml.regression import LinearRegression, RandomForestRegressor
from pyspark.ml.fpm import FPGrowth

from google.colab import files


# Thread counts to benchmark: each value N is turned into a Spark master of
# local[N], which stands in for an N-node cluster on the single Colab machine.
NODES = [1, 2, 4, 8]

# ML settings
KMEANS_K = 4          # `k` (number of clusters) passed to KMeans
KMEANS_MAXITER = 12   # `maxIter` passed to KMeans
RF_TREES = 12         # `numTrees` passed to RandomForestRegressor
RF_DEPTH = 6          # `maxDepth` passed to RandomForestRegressor

# FPGrowth safety
MAX_ITEMS_PER_BASKET = 30  # each basket is truncated to at most this many items
FPG_SUPPORT = 0.02         # `minSupport` passed to FPGrowth
FPG_CONF = 0.2             # `minConfidence` passed to FPGrowth

def build_spark(app_name, n):
    """Return a local Spark session that runs on ``n`` worker threads.

    The session uses master ``local[n]``, 4g of driver memory,
    ``max(16, n * 4)`` shuffle partitions, no console progress bar, and the
    ERROR log level.

    Args:
        app_name: Application name given to the session.
        n: Number of local worker threads (the simulated node count).

    Returns:
        The configured ``SparkSession``.
    """
    master = f"local[{n}]"

    # getOrCreate() returns any session that is still alive and silently
    # ignores the requested master. A session left over from a failed or
    # interrupted run would therefore make every "node count" execute on the
    # old thread count, so stop it first when its master does not match.
    active = SparkSession.getActiveSession()
    if active is not None and active.sparkContext.master != master:
        active.stop()

    spark = (
        SparkSession.builder
        .appName(app_name)
        .master(master)
        .config("spark.driver.memory", "4g")
        .config("spark.sql.shuffle.partitions", str(max(16, n * 4)))
        .config("spark.ui.showConsoleProgress", "false")
        .getOrCreate()
    )
    spark.sparkContext.setLogLevel("ERROR")
    return spark

# ------------------------------------------
#  Upload
# ------------------------------------------
# Banner and upload prompt, printed before the Colab file picker is shown.
for banner_line in (
    "Cloud-Based Data Processing Service",
    "-----------------------------------",
    "^_^ Please upload the CSV file ... ^_^\n",
):
    print(banner_line)

# files.upload() blocks until the user picks a file (or cancels).
uploaded = files.upload()
if not uploaded:
    raise Exception("No file uploaded.")

# Only the first uploaded file is analysed.
file_name = list(uploaded)[0]
print(f"File uploaded: {file_name}")

# ------------------------------------------
# 1) Descriptive Statistics
# ------------------------------------------
spark = build_spark("Taxi_Stats", 2)
try:
    df = spark.read.csv(file_name, header=True, inferSchema=True)

    # Keep only the two numeric columns and drop every row in which either one
    # is null, so both columns are summarised over exactly the same rows.
    df_stats = df.select(
        col("trip_distance").cast(DoubleType()).alias("trip_distance"),
        col("fare_amount").cast(DoubleType()).alias("fare_amount")
    ).na.drop()

    # A single aggregation pass computes count/mean/min/max for both columns.
    stats = df_stats.agg(
        spark_count("trip_distance").alias("count_td"),
        mean("trip_distance").alias("mean_td"),
        spark_min("trip_distance").alias("min_td"),
        spark_max("trip_distance").alias("max_td"),
        spark_count("fare_amount").alias("count_fa"),
        mean("fare_amount").alias("mean_fa"),
        spark_min("fare_amount").alias("min_fa"),
        spark_max("fare_amount").alias("max_fa")
    ).collect()[0]
finally:
    # Release the session even when the read or the aggregation fails, so the
    # next build_spark() call does not inherit a half-dead session.
    spark.stop()

print("\nDescriptive Statistics (Count/Mean/Min/Max)")
print("-----------------------------------")
if stats["count_td"] == 0:
    # With no usable rows the mean/min/max are None and cannot be formatted
    # with ':.4f'; report the situation instead of crashing.
    print("No rows with both trip_distance and fare_amount present.")
else:
    print(f"trip_distance -> Count: {stats['count_td']}, Mean: {stats['mean_td']:.4f}, Min: {stats['min_td']}, Max: {stats['max_td']}")
    print(f"fare_amount   -> Count: {stats['count_fa']}, Mean: {stats['mean_fa']:.4f}, Min: {stats['min_fa']}, Max: {stats['max_fa']}")
print("-----------------------------------")
time.sleep(1)

# ------------------------------------------
# 2) Workload setting
# ------------------------------------------
# Describe the benchmark set-up. The node list is taken from NODES so the
# message cannot drift from the values the scalability loop actually runs.
print("\nWorkload setting (for fairness across N):")
print(f"- Same Map/Reduce aggregation job on the FULL dataset for N = {NODES} using local[N].")
print("- ML jobs are executed once on the FULL dataset (no limit).")

# ------------------------------------------
# 3) Performance Table (Scalability) - FULL DATASET
# ------------------------------------------
print("\nPerformance Table (Scalability) - FULL dataset")
print("------------------------------------------------")
print(f"{'Nodes':<8} | {'Time(s)':<10} | {'Speedup':<10} | {'Efficiency':<10}")
print("------------------------------------------------")

# The first entry of NODES is the baseline every other run is compared with.
base_time = None
base_nodes = None

for n in NODES:
    spark = build_spark(f"Taxi_Sim_{n}", n)
    try:
        # perf_counter() is monotonic, so the interval cannot be skewed by a
        # wall-clock adjustment while the job runs.
        start = time.perf_counter()

        raw = spark.read.csv(file_name, header=True, inferSchema=True)

        # Map/Reduce on FULL dataset: count trips per pickup location and
        # fetch the ten busiest locations.
        full = raw.select(col("pickup_location_id").cast("string").alias("pickup_location_id")) \
                  .na.drop() \
                  .repartition(max(16, n * 4))

        _ = full.groupBy("pickup_location_id").count().orderBy(col("count").desc()).limit(10).collect()

        t = time.perf_counter() - start
    finally:
        # Stop the session even if the job fails; otherwise the next
        # iteration could reuse it together with its old thread count.
        spark.stop()

    if base_time is None:
        base_time, base_nodes = t, n
        speedup = 1.0
    else:
        speedup = base_time / t if t > 0 else 0.0

    # Efficiency relative to the baseline run. This equals speedup / n when
    # the baseline used one thread, and stays meaningful if NODES starts at a
    # larger value.
    efficiency = speedup * base_nodes / n
    print(f"{n:<8} | {t:<10.2f} | {speedup:<10.2f} | {efficiency:<10.2f}")

    time.sleep(1)

print("------------------------------------------------")

# ------------------------------------------
# 4) ML Jobs (4)
# ------------------------------------------
print("\nMachine Learning Jobs & Results (FULL dataset)")
print("------------------------------------------------")

# One 4-thread session is shared by all four ML jobs.
spark = build_spark("Taxi_ML_Full", 4)
raw = spark.read.csv(file_name, header=True, inferSchema=True)

# Columns used by the ML jobs: two numeric measures cast to double and the
# pickup location cast to string.
ml_columns = [
    col("trip_distance").cast(DoubleType()).alias("trip_distance"),
    col("fare_amount").cast(DoubleType()).alias("fare_amount"),
    col("pickup_location_id").cast("string").alias("pickup_location_id"),
]

# Rows with a null in any of the three columns are dropped; the result is
# spread over 32 partitions and cached for reuse by every job.
ml_df = raw.select(*ml_columns).na.drop().repartition(32).cache()

# Run an action now so the cache is populated before the first ML job.
ml_df.count()

# ---- ML Job 1: KMeans (features: trip_distance + fare_amount) ----
# Step 1: pack the two numeric columns into one vector column.
assembler = VectorAssembler(
    inputCols=["trip_distance", "fare_amount"],
    outputCol="features_raw",
    handleInvalid="skip"
)
feat_vec = assembler.transform(ml_df)

# Step 2: scale each feature to unit standard deviation (no mean centering).
scaler = StandardScaler(
    inputCol="features_raw",
    outputCol="features",
    withStd=True,
    withMean=False
)
scaler_model = scaler.fit(feat_vec)
scaled = scaler_model.transform(feat_vec).cache()

# Populate the cache before KMeans starts iterating over the data.
scaled.count()

# Step 3: fit KMeans and label every row with its cluster.
kmeans = KMeans(
    k=KMEANS_K,
    seed=42,
    maxIter=KMEANS_MAXITER,
    featuresCol="features"
)
kmeans_model = kmeans.fit(scaled)
km_pred = kmeans_model.transform(scaled)

# Step 4: score the clustering with the silhouette metric.
silhouette_eval = ClusteringEvaluator(metricName="silhouette", featuresCol="features")
sil = silhouette_eval.evaluate(km_pred)

print("ML Job 1: KMeans Clustering")
print(f"  Silhouette Score: {sil:.4f}")
print("------------------------------------------------")

# ---- ML Job 2: Linear Regression (NO leakage) ----
# fare_amount is the label; trip_distance is the only feature, so the label
# never appears among the model inputs.
distance_assembler = VectorAssembler(
    inputCols=["trip_distance"],
    outputCol="features",
    handleInvalid="skip"
)
assembled = distance_assembler.transform(ml_df)
reg_df = assembled.select(
    col("fare_amount").alias("label"),
    col("features")
)

# Fixed-seed 80/20 split; train/test are reused by the Random Forest job.
train, test = reg_df.randomSplit([0.8, 0.2], seed=42)

# Evaluators shared by both regression jobs.
rmse_eval = RegressionEvaluator(metricName="rmse", labelCol="label")
r2_eval = RegressionEvaluator(metricName="r2", labelCol="label")

lr = LinearRegression(featuresCol="features", labelCol="label")
lr_model = lr.fit(train)
lr_pred = lr_model.transform(test)

lr_rmse = rmse_eval.evaluate(lr_pred)
lr_r2 = r2_eval.evaluate(lr_pred)

print("ML Job 2: Linear Regression (fare_amount prediction)")
print(f"  RMSE: {lr_rmse:.4f} , R2: {lr_r2:.4f}")
print("------------------------------------------------")

# ---- ML Job 3: Random Forest Regression ----
# Trained on `train` and scored on `test` with the shared RMSE/R2 evaluators.
rf = RandomForestRegressor(
    featuresCol="features",
    labelCol="label",
    numTrees=RF_TREES,
    maxDepth=RF_DEPTH,
    seed=42
)
rf_model = rf.fit(train)
rf_pred = rf_model.transform(test)

rf_rmse = rmse_eval.evaluate(rf_pred)
rf_r2 = r2_eval.evaluate(rf_pred)

print("ML Job 3: Random Forest Regression (fare_amount prediction)")
print(f"  RMSE: {rf_rmse:.4f} , R2: {rf_r2:.4f}")
print("------------------------------------------------")

# ---- ML Job 4: FPGrowth Frequent Itemsets (FULL dataset, capped baskets) ----
# One basket per pickup location. An item is the integer part of
# trip_distance / 3, rendered as a string.
basket_base = ml_df.select(
    col("pickup_location_id").alias("id"),
    (col("trip_distance") / 3).cast("int").cast("string").alias("item")
)

baskets = basket_base.groupBy("id").agg(collect_set(col("item")).alias("items"))

# collect_set() already returns distinct items, but in no guaranteed order, so
# slicing it directly keeps an arbitrary subset that can differ between runs.
# Sorting first (lexicographically, since items are strings) makes the capped
# basket, and therefore the itemset count, reproducible.
baskets = baskets.select(
    col("id"),
    slice(sort_array(col("items")), 1, MAX_ITEMS_PER_BASKET).alias("items")
).cache()

# Populate the cache before FPGrowth scans the baskets.
_ = baskets.count()

fpg_model = FPGrowth(itemsCol="items", minSupport=FPG_SUPPORT, minConfidence=FPG_CONF).fit(baskets)
itemsets_count = fpg_model.freqItemsets.count()

print("ML Job 4: FPGrowth Frequent Itemsets")
print(f"  Itemsets found: {itemsets_count}")
print("------------------------------------------------")

# All ML jobs are done; stopping the session also releases the cached data.
spark.stop()

print("\nSimulation Finished.")

Cloud-Based Data Processing Service
-----------------------------------
^_^ Please upload the CSV file ... ^_^



Saving taxi_trip_data.csv to taxi_trip_data.csv
File uploaded: taxi_trip_data.csv

Descriptive Statistics (Count/Mean/Min/Max)
-----------------------------------
trip_distance -> Count: 1048575, Mean: 8.8492, Min: 0.0, Max: 7655.76
fare_amount   -> Count: 1048575, Mean: 31.7606, Min: -317.0, Max: 19269.65
-----------------------------------

Workload setting (for fairness across N):
- Same Map/Reduce aggregation job on the FULL dataset for N = [1, 2, 4, 8] using local[N].
- ML jobs are executed once on the FULL dataset (no limit).

Performance Table (Scalability) - FULL dataset
------------------------------------------------
Nodes    | Time(s)    | Speedup    | Efficiency
------------------------------------------------
1        | 13.19      | 1.00       | 1.00      
2        | 10.36      | 1.27       | 0.64      
4        | 9.08       | 1.45       | 0.36      
8        | 10.26      | 1.29       | 0.16      
------------------------------------------------

Machine Learning Jobs & Re