# Decision Tree Model

In [0]:
# ===
# Decision Tree Regressor (PySpark + MLlib)
# Hyperparameter tuning + 5-fold CV + Feature Importances
# ===

# Step 1: Install dependencies
!pip install pyspark matplotlib
!apt-get update -qq > /dev/null
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]

# Step 1.5: load libraries
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
import matplotlib.pyplot as plt
import pandas as pd

# --- Step 2: Start Spark session ---
# Local-mode session using all available cores ("local[*]").
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("DecisionTreeTuning")
    .getOrCreate()
)

# --- Step 2.5: Mount Google Drive ---
# The dataset in Step 3 is read from a /content/drive/... path, so Drive has
# to be mounted before the CSV is loaded.
from google.colab import drive
drive.mount('/content/drive')

# --- Step 3: Load dataset ---
# The CSV carries a header row; inferSchema asks Spark to detect column types
# instead of reading every column as a string.
data_path = "/content/drive/MyDrive/ait614_rutting2/data/processed/rutting_climate_traffic.csv"
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(data_path)
)

print("Total rows: {}".format(df.count()))
df.printSchema()

# --- Step 4: Define features and target ---
# Climate predictors, in the column order fed to the VectorAssembler.
climate_features = [
    "REL_HUM_AVG_AVG",
    "PRECIPITATION",
    "EVAPORATION",
    "PRECIP_DAYS",
    "CLOUD_COVER_AVG",
    "SHORTWAVE_SURFACE_AVG",
    "TEMP_AVG",
    "FREEZE_INDEX",
    "FREEZE_THAW",
    "WIND_VELOCITY_AVG",
]
# Traffic-trend predictors: one AADTT_VEH_CLASS_<n>_TREND column per vehicle
# class 4 through 13, in ascending class order.
traffic_features = [
    f"AADTT_VEH_CLASS_{vehicle_class}_TREND" for vehicle_class in range(4, 14)
]
features = climate_features + traffic_features

# Regression label column.
target = "MAX_MEAN_DEPTH_1_8"

# --- Step 5: Split dataset into train/test (80/20) ---
# Drop rows with a null/NaN in any model column before splitting. Otherwise
# VectorAssembler (default handleInvalid="error") raises on them, and only
# once cross-validation is already running. The number dropped is reported
# so the data loss is visible.
model_df = df.dropna(subset=features + [target])
dropped_rows = df.count() - model_df.count()
if dropped_rows:
    print(f"Dropped {dropped_rows} rows with missing feature/target values")

train_df, test_df = model_df.randomSplit([0.8, 0.2], seed=42)
# train_df is evaluated repeatedly (the count below and fold construction
# inside CrossValidator). Cache it so each pass does not re-read the CSV and
# recompute the split.
train_df = train_df.cache()

print(f"Training set size: {train_df.count()}")
print(f"Test set size: {test_df.count()}")
print(f"Number of features: {len(features)}")
# Single regression label; print its name rather than the constant 1.
print(f"Target column: {target}")

# Step 6: Assemble feature vector
# Packs the scalar columns listed in `features` into one vector column named
# "features", in list order.
assembler = VectorAssembler(inputCols=features, outputCol="features")

# Step 7: Define Decision Tree Regressor
# Reads the assembler's output column; seed fixed for reproducible fits.
dt = DecisionTreeRegressor(
    seed=42,
    featuresCol=assembler.getOutputCol(),
    labelCol=target,
)

# Step 8: Build pipeline
# Chaining both stages lets CrossValidator refit assembler + tree together
# for every fold and parameter setting.
pipeline = Pipeline(stages=[assembler, dt])

# Step 9: Define hyperparameter grid
# Criterion hyperparameter -> only "variance" is supported for regression,
# so it is not searched. The grid is the cartesian product of the lists below
# (3 x 3 x 4 x 2 = 72 settings), built in this insertion order.
grid_values = {
    dt.maxDepth: [5, 10, 15],
    dt.minInstancesPerNode: [1, 2, 4],          # similar to min_samples_leaf
    dt.minInfoGain: [0.0, 0.001, 0.01, 0.05],   # minimum info gain required for split
    dt.maxBins: [32, 64],                       # affects feature splits
}

grid_builder = ParamGridBuilder()
for grid_param, candidates in grid_values.items():
    grid_builder.addGrid(grid_param, candidates)
paramGrid = grid_builder.build()

# Step 10: Define evaluator (RMSE)
# Compares the "prediction" column against the target column using RMSE.
evaluator = RegressionEvaluator(
    metricName="rmse",
    predictionCol="prediction",
    labelCol=target,
)

# Step 11: Cross-validation for hyperparameter tuning
# 5 folds, fixed fold-assignment seed, and 2 threads for evaluating grid
# settings concurrently.
cv_settings = dict(
    numFolds=5,
    seed=42,
    parallelism=2,
)
cv = CrossValidator(
    estimator=pipeline,
    evaluator=evaluator,
    estimatorParamMaps=paramGrid,
    **cv_settings,
)

print("Training Decision Tree with 5-fold CV (this may take several minutes)...")
cv_model = cv.fit(train_df)

# Step 12: Get 5-fold CV RMSE for best model config
# avgMetrics holds one fold-averaged score per param map. Choose the best one
# in the evaluator's own direction rather than hard-coding min(), so this
# stays correct if metricName is changed to a larger-is-better metric
# (e.g. "r2").
pick_best = max if evaluator.isLargerBetter() else min
best_cv_rmse = pick_best(cv_model.avgMetrics)
print("\n=== Decision Tree Results ===")
print(f"5-Fold CV RMSE (best hyperparameters): {best_cv_rmse:.3f}")

# The CV score above was also used to pick the hyperparameters. Score the best
# model (refit on the full training split) on the 20% hold-out split, which
# played no part in model selection.
test_rmse = evaluator.evaluate(cv_model.bestModel.transform(test_df))
print(f"Test RMSE (held-out 20%): {test_rmse:.3f}")

# Step 13: Get best model
# The CV estimator is a Pipeline, so bestModel is the fitted pipeline.
best_model = cv_model.bestModel

# Step 14: Show best hyperparameters
# The tree is the last pipeline stage (stages were [assembler, dt]).
best_dt = best_model.stages[-1]

# (printed label, Spark param name) pairs, reported in this order.
hyperparam_report = [
    ("Max Depth", "maxDepth"),
    ("Min Instances per Node (min_samples_leaf)", "minInstancesPerNode"),
    ("Min Info Gain", "minInfoGain"),
    ("Max Bins", "maxBins"),
]

print("\n=== Best Hyperparameters ===")
for label, param_name in hyperparam_report:
    print(f"{label}: {best_dt.getOrDefault(param_name)}")
print("Criterion: variance (default and only supported for regression)")

# Step 15: Feature Importance Plot
# featureImportances has one entry per position of the assembled vector, i.e.
# in the order of the VectorAssembler's inputCols (`features`). Names and
# scores are therefore paired positionally.
importances = best_dt.featureImportances
importance_values = importances.toArray()
# Positional pairing is only valid if the sizes match. Fail loudly instead of
# silently truncating or mislabelling (e.g. if a vector-typed input column
# ever expands to several slots).
if len(importance_values) != len(features):
    raise ValueError(
        f"Expected {len(features)} feature importances, "
        f"got {len(importance_values)}"
    )

# Highest importance first; sorted() is stable, so ties keep feature order.
feature_importances = sorted(
    zip(features, importance_values),
    key=lambda pair: pair[1],
    reverse=True,
)

# Convert to Pandas DataFrame for plotting
fi_df = pd.DataFrame(feature_importances, columns=["Feature", "Importance"])

plt.figure(figsize=(10, 6))
plt.barh(fi_df["Feature"], fi_df["Importance"], color="steelblue")
plt.gca().invert_yaxis()  # put the most important feature at the top
plt.title("Decision Tree Feature Importances")
plt.xlabel("Importance")
plt.ylabel("Feature")
plt.tight_layout()
plt.show()

# Step 16: Stop Spark session
spark.stop()
