## CIS5560: Final Term Project -> Linear Regression

### Andrew Pang (apang5@calstatela.edu)

In [1]:
%pyspark
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, CountVectorizer
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.linalg import Vectors, SparseVector
from pyspark.ml.clustering import LDA, BisectingKMeans
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import regexp_extract, col
from pyspark.ml.tuning import CrossValidator, TrainValidationSplit, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml import Pipeline
import time




from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, MinMaxScaler
from pyspark.ml.regression import LinearRegression


from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

import re

# Configure Settings for spark-submit

In [3]:
%pyspark
# Set to True when this notebook is exported as Python source code and run
# with spark-submit; leave False inside the notebook.
IS_SPARK_SUBMIT_CLI = False

if IS_SPARK_SUBMIT_CLI:
    # Create (or reuse) the context and wrap it in a session so the
    # names `sc` and `spark` used by the following cells exist.
    sc = SparkContext.getOrCreate()
    spark = SparkSession(sc)

# Import and Parse the Dataset

In [5]:
%pyspark
# Where the sample dataset lives and which reader format to use.
file_location = "/user/apang5/used_cars_sample_data--01percent.csv"
file_type = "csv"

# Reader options, gathered in one place:
#   header / inferSchema - first line holds column names; column types are inferred
#   sep / quote / escape - comma-separated, double-quoted fields, quote char also escapes
#   multiLine            - allow quoted fields that span several physical lines
#   mode                 - PERMISSIVE parse mode
csv_options = {
    "header": "true",
    "inferSchema": "true",
    "sep": ",",
    "quote": "\"",
    "escape": "\"",
    "multiLine": "true",
    "mode": "PERMISSIVE",
}

df = spark.read.format(file_type).options(**csv_options).load(file_location)

# Show the inferred schema.
df.printSchema()

In [6]:
%pyspark
# Create a view or table

#temp_table_name = "used_cars_sample_data_csv"

#df.createOrReplaceTempView(temp_table_name)

# Select Columns/Features

In [8]:
%pyspark

# Columns that will be string-indexed and one-hot encoded.
categorical_cols = [
    "body_type",
    "engine_type",
    "make_name",
    "fuel_type",
    "maximum_seating",
    "model_name",
    "wheel_system",
    "year",
]

# Columns that will be cast to double and scaled.
numeric_cols = [
    "city_fuel_economy",
    "highway_fuel_economy",
    "mileage",
]

# Every feature column, categorical first.
columns = categorical_cols + numeric_cols

# Keep only the features plus the regression label.
data = df.select(*columns, "price")

print(data.columns)

print(f'# of Rows: {data.count()}\n# of Columns: {len(data.columns)}')

# Split the Dataset

In [10]:
%pyspark

# Fixed seed so the 70/30 partition is the same on every run. Without it each
# run trains and evaluates on a different random subset, so the reported
# RMSE / R2 and the TVS-vs-CV comparison cannot be reproduced.
SPLIT_SEED = 42

# 70% of the rows for training, 30% held out for testing.
splits = data.randomSplit([0.7, 0.3], seed=SPLIT_SEED)
train, test = splits

# Row counts are reused by the summary cell at the end of the notebook.
train_rows = train.count()
test_rows = test.count()
print("Training Rows:", train_rows, "\nTesting Rows:", test_rows)

# Preprocess the Categorical Features

In [12]:
%pyspark

# Handle nulls in the categorical columns by replacing them with 'unknown'.
#
# The fill is applied to `train` and `test` as well as `data`: the split has
# already been taken, so changing `data` alone would never reach the frames
# that are actually fitted and scored, and rows with a null category would be
# dropped by the indexers' handleInvalid="skip" instead of being kept.
#
# Each column is cast to string first because a string fill value cannot
# replace nulls in a non-string column (e.g. an integer-inferred `year`).
# StringIndexer indexes the string form of numeric input anyway, so the cast
# does not change how non-null values are indexed.
categorical_fill_values = {name: 'unknown' for name in categorical_cols}


def _fill_categorical_nulls(frame):
    """Return `frame` with categorical columns as strings and nulls set to 'unknown'."""
    for name in categorical_cols:
        frame = frame.withColumn(name, frame[name].cast("string"))
    return frame.fillna(categorical_fill_values)


data = _fill_categorical_nulls(data)
train = _fill_categorical_nulls(train)
test = _fill_categorical_nulls(test)

# Step 1: Convert string categories to numeric indices.
# handleInvalid="skip" drops rows whose category was not seen while fitting.
indexers = [
    StringIndexer(
        inputCol=name,
        outputCol=name + "_index",
        handleInvalid="skip",
    )
    for name in categorical_cols
]

# Step 2: One-hot encode the indices
encoded_cols = [name + "_vec" for name in categorical_cols]
encoder = OneHotEncoder(
    inputCols=[name + "_index" for name in categorical_cols],
    outputCols=encoded_cols,
)

# Step 3: Assemble the one-hot encoded vectors into a single feature vector
categorical_VectorAssembler = VectorAssembler(
    inputCols=encoded_cols,
    outputCol="catFeatures",
)

# Preprocess the Numerical Features

In [14]:
%pyspark

# Cast the numeric feature columns to double.
#
# The cast and the null handling below are applied to `train` and `test` as
# well as `data`: the split has already been taken, so changing `data` alone
# would never reach the frames that are fitted and scored, and rows with a
# null numeric value would be dropped by the assembler's handleInvalid="skip".
for col_name in numeric_cols:
    data = data.withColumn(col_name, col(col_name).cast('double'))
    train = train.withColumn(col_name, col(col_name).cast('double'))
    test = test.withColumn(col_name, col(col_name).cast('double'))

# Handle the nulls in numeric columns with the mean.
# The means are computed from the training rows only, in a single pass, so no
# information from the held-out test rows leaks into the imputed values.
mean_row = train.selectExpr(
    *[f"avg({col_name}) as {col_name}" for col_name in numeric_cols]
).first()

# avg() over a column with no non-null values is null; leave such a column
# untouched rather than passing None as a fill value.
numeric_fill_values = {
    col_name: mean_row[col_name]
    for col_name in numeric_cols
    if mean_row[col_name] is not None
}

if numeric_fill_values:
    data = data.na.fill(numeric_fill_values)
    train = train.na.fill(numeric_fill_values)
    test = test.na.fill(numeric_fill_values)

# Assemble numeric features
numeric_assembler = VectorAssembler(
    inputCols=numeric_cols,
    outputCol="numeric_features",
    handleInvalid="skip",
)

# Scale numeric features
scaler = MinMaxScaler(inputCol="numeric_features", outputCol="scaled_features")

# Combine Categorical & Numeric Features

In [16]:
%pyspark

# Last assembler stage: the categorical vector followed by the scaled numeric
# vector become the single "features" column read by the model.
featVect = VectorAssembler(
    inputCols=["catFeatures", "scaled_features"],
    outputCol="features",
)

# Define the Model

In [18]:
%pyspark

# Linear regression estimator: predicts "price" from the assembled "features".
lr = LinearRegression(
    featuresCol="features",
    labelCol="price",
)

# Define the Pipeline

In [20]:
%pyspark
# Stage order: index every categorical column, one-hot encode, assemble the
# categorical vector, assemble and scale the numerics, combine both vectors,
# then fit the regression.
pipeline = Pipeline(
    stages=[
        *indexers,
        encoder,
        categorical_VectorAssembler,
        numeric_assembler,
        scaler,
        featVect,
        lr,
    ]
)

# Build the ParamGridBuilder

In [22]:
%pyspark

# Hyper-parameter grid for the regression stage:
# 3 regParam x 3 elasticNetParam x 2 maxIter = 18 combinations.
paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.0, 0.01, 0.1])
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
    .addGrid(lr.maxIter, [10, 50])
    .build()
)

# Train Validation Split

In [24]:
%pyspark

# Model selection with a single train/validation split, choosing by RMSE.
tvs = TrainValidationSplit(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=RegressionEvaluator(
        labelCol="price",
        predictionCol="prediction",
        metricName="rmse",
    ),
)

# Time only the fit; scoring the test set is not part of the measurement.
start = time.time()
tvsModel = tvs.fit(train)
end = time.time()

tvs_time = end - start

# Score the held-out test rows with the selected model.
tvs_predictions = tvsModel.transform(test)

print(f"tvsModel fit time: {tvs_time:.2f} seconds")

# Evaluate the test-set predictions: RMSE and R2.
evaluator_rmse = RegressionEvaluator(
    labelCol="price",
    predictionCol="prediction",
    metricName="rmse",
)
evaluator_r2 = RegressionEvaluator(
    labelCol="price",
    predictionCol="prediction",
    metricName="r2",
)

tvs_rmse = evaluator_rmse.evaluate(tvs_predictions)
tvs_r2 = evaluator_r2.evaluate(tvs_predictions)

print("TVS RMSE =", tvs_rmse)
print("TVS R² =", tvs_r2)

# Cross Validator

In [26]:
%pyspark

# Model selection with 3-fold cross-validation, choosing by RMSE.
cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=RegressionEvaluator(
        labelCol="price",
        predictionCol="prediction",
        metricName="rmse",
    ),
    numFolds=3,
)

# Time only the fit; scoring the test set is not part of the measurement.
start = time.time()
cvModel = cv.fit(train)
end = time.time()

cv_time = end - start

# Score the held-out test rows with the selected model.
cv_predictions = cvModel.transform(test)

print(f"cvModel fit time: {cv_time:.2f} seconds")

# Evaluate the test-set predictions: RMSE and R2.
evaluator_rmse = RegressionEvaluator(
    labelCol="price",
    predictionCol="prediction",
    metricName="rmse",
)
evaluator_r2 = RegressionEvaluator(
    labelCol="price",
    predictionCol="prediction",
    metricName="r2",
)

cv_rmse = evaluator_rmse.evaluate(cv_predictions)
cv_r2 = evaluator_r2.evaluate(cv_predictions)

print("CV RMSE =", cv_rmse)
print("CV R² =", cv_r2)

# Extract the Coefficients/Hyper-Parameters from the Best Model

In [28]:
%pyspark

# Note: LinearRegression uses coefficients, not featureImportances
import pandas as pd

# The fitted regression is the last stage of the best cross-validated pipeline.
finalStageModel = cvModel.bestModel.stages[-1]

# 1. Get coefficients: one per slot of the assembled "features" vector.
coefficients = finalStageModel.coefficients.toArray()

# 2. Recover a name for every slot.
# The "features" vector is the one-hot categorical slots (one per encoded
# category) followed by the scaled numeric slots, so a hand-written list of
# the 11 input column names neither has the right length nor the right order:
# zipping it with the coefficients would truncate the table and mislabel
# every row. The slot names are instead read from the ML attribute metadata
# on the transformed "features" column.
feature_attrs = (
    cv_predictions.schema["features"].metadata
    .get("ml_attr", {})
    .get("attrs", {})
)
names_by_slot = {
    attr["idx"]: attr["name"]
    for attr_group in feature_attrs.values()
    for attr in attr_group
    if "name" in attr
}
# Any slot without metadata falls back to a generic positional name so the
# table always has exactly one row per coefficient.
all_feature_names = [
    names_by_slot.get(slot, f"feature_{slot}")
    for slot in range(len(coefficients))
]

# 3. Create DataFrame
featureImp = pd.DataFrame(
    zip(all_feature_names, coefficients),
    columns=["feature", "coefficient"]
)

# 4. Sort by absolute coefficient size, largest first
featureImp = featureImp.reindex(
    featureImp["coefficient"].abs().sort_values(ascending=False).index
)

# 5. Print
print("Best regParam:", finalStageModel.getRegParam())
print("Best elasticNetParam:", finalStageModel.getElasticNetParam())
print(featureImp)


# Summarize the Evaluation Results

In [30]:
%pyspark

# Final report: dataset sizes first, then the test-set metrics and fit time
# of each model-selection strategy.
print("\n--- Summary of All Results ---")
print(f"Training Rows: {train_rows:,}")
print(f"Testing Rows: {test_rows:,}")

# Cross-validation
print("\n--- Summary of Cross-Validation Results ---")
print(f"Cross-Validated RMSE: {cv_rmse:.4f}")
print(f"Cross-Validated R2:   {cv_r2:.4f}")
print(f"CV Time: {cv_time:.2f} seconds")

# Train-validation split
print("\n--- Summary of Train-Validation-Split Results ---")
print(f"TrainValidationSplit RMSE: {tvs_rmse:.4f}")
print(f"TrainValidationSplit R2:   {tvs_r2:.4f}")
print(f"TVS Time: {tvs_time:.2f} seconds")
