<a href="https://colab.research.google.com/github/SanmeetGulati/Big-Data-Analytics-Basics/blob/main/car_evaluation_ml_model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Q1 — Car Price Classification**

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

Create Spark Session

In [None]:
# Start (or reuse) the Spark session used by the car-price section.
spark = (
    SparkSession.builder
    .appName("CarPriceClassification")
    .getOrCreate()
)

Load dataset

In [None]:
# Colab-only step: mount Google Drive at /content/drive so the dataset paths
# used by the loading cells resolve.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Location of the car dataset on the mounted Drive.
data_path = "/content/drive/My Drive/car_data.csv"

# Read the CSV using its header row for column names and let Spark infer types.
df = spark.read.csv(data_path, header=True, inferSchema=True)

# Quick sanity check: schema plus the first few rows.
print("Dataset Loaded Successfully!")
df.printSchema()
df.show(5)

Dataset Loaded Successfully!
root
 |-- Make: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Engine Fuel Type: string (nullable = true)
 |-- Engine HP: integer (nullable = true)
 |-- Engine Cylinders: integer (nullable = true)
 |-- Transmission Type: string (nullable = true)
 |-- Driven_Wheels: string (nullable = true)
 |-- Number of Doors: integer (nullable = true)
 |-- Market Category: string (nullable = true)
 |-- Vehicle Size: string (nullable = true)
 |-- Vehicle Style: string (nullable = true)
 |-- highway MPG: integer (nullable = true)
 |-- city mpg: integer (nullable = true)
 |-- Popularity: integer (nullable = true)
 |-- MSRP: integer (nullable = true)

+----+----------+----+--------------------+---------+----------------+-----------------+----------------+---------------+--------------------+------------+-------------+-----------+--------+----------+-----+
|Make|     Model|Year|    Engine Fuel Type|Engine HP|Engine Cylin

Drop rows with missing MSRP

In [None]:
# Rows without an MSRP cannot be assigned a price category, so discard them.
df = df.dropna(subset=["MSRP"])

Create target variable (price category)

In [None]:
# Bucket MSRP into three price bands; this string column is later indexed
# into the classification label.
msrp = col("MSRP")
price_band = (
    when(msrp < 20000, "Low")
    .when(msrp.between(20000, 40000), "Medium")  # between() is inclusive on both ends
    .otherwise("High")
)
df = df.withColumn("price_category", price_band)


Drop unnecessary columns (Make, Model, Market Category); Year is kept in the DataFrame but is not used as a feature

In [None]:
# These columns do not appear in the categorical or numeric feature lists.
dropped_cols = ["Make", "Model", "Market Category"]
df = df.drop(*dropped_cols)

Identify categorical and numerical features

In [None]:
# String-typed inputs: indexed and one-hot encoded further down.
categorical_cols = [
    "Engine Fuel Type",
    "Transmission Type",
    "Driven_Wheels",
    "Vehicle Size",
    "Vehicle Style",
]

# Integer-typed inputs: passed to the assembler as-is.
numeric_cols = [
    "Engine HP",
    "Engine Cylinders",
    "Number of Doors",
    "highway MPG",
    "city mpg",
    "Popularity",
]

Handle missing numeric values

In [None]:
# Replace nulls in every numeric feature with 0 in a single pass; the previous
# per-column loop rebuilt the DataFrame once per column for the same result.
# NOTE(review): 0 is an implausible value for columns such as "Engine HP";
# a median/mean imputation may be a better choice — confirm with the data owner.
df = df.na.fill(0, subset=numeric_cols)

StringIndexer + OneHotEncoder for categorical columns

In [None]:
# Derived column names for the two encoding stages.
idx_cols = [f"{name}_idx" for name in categorical_cols]
ohe_cols = [f"{name}_ohe" for name in categorical_cols]

# One StringIndexer per categorical column. handleInvalid="keep" routes
# values that were not seen during fit to an extra index instead of failing.
indexers = [
    StringIndexer(inputCol=src, outputCol=dst, handleInvalid="keep")
    for src, dst in zip(categorical_cols, idx_cols)
]

# A single encoder turns all index columns into one-hot vectors.
encoder = OneHotEncoder(inputCols=idx_cols, outputCols=ohe_cols)

Index target variable

In [None]:
# Turn the "Low"/"Medium"/"High" strings into the numeric "label" column
# that the classifier and evaluators read.
label_indexer = StringIndexer(
    inputCol="price_category",
    outputCol="label",
)

Assemble all features

In [None]:
# Feature vector = one-hot encoded categoricals followed by the numeric columns.
feature_inputs = [f"{name}_ohe" for name in categorical_cols] + numeric_cols
assembler = VectorAssembler(inputCols=feature_inputs, outputCol="features_raw")

# Scale the assembled vector; the classifier reads the "features" column.
scaler = StandardScaler(inputCol="features_raw", outputCol="features")

Build model

In [None]:
# Random forest over the scaled features; the fixed seed keeps runs repeatable.
rf = RandomForestClassifier(
    labelCol="label",
    featuresCol="features",
    numTrees=100,
    seed=42,
)

Create pipeline

In [None]:
# Stage order: index categoricals -> one-hot encode -> index the label ->
# assemble -> scale -> classify.
stages = [*indexers, encoder, label_indexer, assembler, scaler, rf]
pipeline = Pipeline(stages=stages)

Split data

In [None]:
# 80/20 random split with a fixed seed.
train, test = df.randomSplit(weights=[0.8, 0.2], seed=42)

Train model

In [None]:
# Fit every pipeline stage (indexers, encoder, scaler, random forest) on the training split only.
model = pipeline.fit(train)

In [None]:
# Run the fitted pipeline over the held-out split to obtain predictions.
preds = model.transform(test)

Evaluate

In [None]:
# Overall accuracy of the predicted label index against the true label index.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(preds)

print(f"\nModel Accuracy: {accuracy:.4f}")


Model Accuracy: 0.7899


Classification report with safe check for empty RDD

In [None]:
# Cheap emptiness check: head(1) fetches at most one row, whereas count()
# would process every prediction just to compare the total with zero.
if not preds.head(1):
    print("No predictions available; test set may be empty or has issues.")
else:
    # MulticlassMetrics (RDD-based API) takes an RDD of (prediction, label) float pairs.
    preds_and_labels = preds.select("prediction", "label").rdd.map(lambda r: (float(r[0]), float(r[1])))
    metrics = MulticlassMetrics(preds_and_labels)

    # Recover the original category names from the fitted label indexer so the
    # report shows the price band next to each numeric class index. The lookup
    # falls back to "?" if the stage cannot be found.
    label_stage = next(
        (s for s in model.stages if hasattr(s, "labels") and s.getOutputCol() == "label"),
        None,
    )
    class_names = list(label_stage.labels) if label_stage is not None else []

    print("\nClassification Report")
    # distinct() returns classes in arbitrary order; sort for a stable report
    # that lines up with the confusion matrix below.
    labels = sorted(preds_and_labels.map(lambda x: x[1]).distinct().collect())
    for label in labels:
        idx = int(label)
        name = class_names[idx] if idx < len(class_names) else "?"
        print(
            f"Class {label} [{name}]: "
            f"Precision={metrics.precision(label):.3f}, "
            f"Recall={metrics.recall(label):.3f}, "
            f"F1={metrics.fMeasure(label):.3f}"
        )

    # Per the MulticlassMetrics documentation: rows are actual classes, columns
    # are predicted classes, both ordered by ascending class label.
    print("\nConfusion Matrix")
    print(metrics.confusionMatrix().toArray())




Classification Report
Class 2.0: Precision=0.844, Recall=0.613, F1=0.710
Class 0.0: Precision=0.742, Recall=0.899, F1=0.813
Class 1.0: Precision=0.879, Recall=0.724, F1=0.794

Confusion Matrix
[[1053.   64.   54.]
 [ 185.  486.    0.]
 [ 181.    3.  292.]]


Save model

In [None]:
# Persist the fitted pipeline. overwrite() lets this cell be re-run: a plain
# save() raises when the target directory already exists.
model.write().overwrite().save("models/car_price_rf_model")
print("\nModel saved successfully as 'models/car_price_rf_model'")


Model saved successfully as 'models/car_price_rf_model'


In [None]:
# Shut down the car-price Spark session; the churn section creates its own.
spark.stop()

**Q2 — Telecom Churn Classification**

Create Spark session

In [None]:
# Start a new Spark session for the churn section.
spark = (
    SparkSession.builder
    .appName("TelecomChurnClassification")
    .getOrCreate()
)

Load dataset

In [None]:
# Location of the churn dataset on the mounted Drive.
data_path = "/content/drive/My Drive/Churn_Modelling.csv"

# Read the CSV using its header row for column names and let Spark infer types.
df = spark.read.csv(data_path, header=True, inferSchema=True)

# Quick sanity check: schema plus the first few rows.
print("Dataset Loaded Successfully!")
df.printSchema()
df.show(5)

Dataset Loaded Successfully!
root
 |-- RowNumber: integer (nullable = true)
 |-- CustomerId: integer (nullable = true)
 |-- Surname: string (nullable = true)
 |-- CreditScore: integer (nullable = true)
 |-- Geography: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tenure: integer (nullable = true)
 |-- Balance: double (nullable = true)
 |-- NumOfProducts: integer (nullable = true)
 |-- HasCrCard: integer (nullable = true)
 |-- IsActiveMember: integer (nullable = true)
 |-- EstimatedSalary: double (nullable = true)
 |-- Exited: integer (nullable = true)

+---------+----------+--------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
|RowNumber|CustomerId| Surname|CreditScore|Geography|Gender|Age|Tenure|  Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|
+---------+----------+--------+-----------+---------+------+---+------+---------+-------------+

Drop unnecessary columns

In [None]:
# Row number, customer id and surname are excluded from the model inputs.
cols_to_drop = [
    'RowNumber',
    'CustomerId',
    'Surname',
]
df = df.drop(*cols_to_drop)

Handle categorical variables

In [None]:
# String-typed columns that must be converted to numeric indices.
cat_cols = ['Geography', 'Gender']

# One indexer per categorical column; handleInvalid="keep" routes values
# not seen during fit to an extra index instead of failing.
indexers = [
    StringIndexer(inputCol=name, outputCol=f"{name}_index", handleInvalid="keep")
    for name in cat_cols
]

Define feature columns

In [None]:
# Every remaining column except the target and the raw categoricals is a
# feature; the categoricals are represented by their "_index" counterparts.
excluded = {'Exited', *cat_cols}
feature_cols = [name for name in df.columns if name not in excluded]
feature_cols.extend(f"{name}_index" for name in cat_cols)

Assemble features

In [None]:
# Pack all feature columns into the single vector column the classifier reads.
assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features",
)

Label encoding

In [None]:
# Index the target. StringIndexer's default ordering is by descending
# frequency, which would give Exited=1 the index 0.0 whenever it happened to be
# the majority class. Alphabetical ordering pins "0" -> 0.0 and "1" -> 1.0
# regardless of class balance, so label 1.0 always means the customer exited.
label_indexer = StringIndexer(
    inputCol="Exited",
    outputCol="label",
    stringOrderType="alphabetAsc",
)

Apply transformations

In [None]:
# Pipeline is already imported in the notebook's import cell, so it is not
# re-imported here.
# Preprocessing only: index the categoricals and the target, then assemble
# the feature vector. The classifier is trained separately on the result.
# NOTE(review): the indexers are fitted on the full DataFrame before the
# train/test split that follows, so they see the test rows' categories.
# Consider splitting first and fitting on the training split only.
pipeline = Pipeline(stages=indexers + [label_indexer, assembler])
data = pipeline.fit(df).transform(df)

Split into train/test

In [None]:
# 80/20 random split of the preprocessed data with a fixed seed.
train, test = data.randomSplit(weights=[0.8, 0.2], seed=42)

Train model

In [None]:
# Random forest for churn. The seed is fixed so repeated runs produce the same
# model, matching the car-price forest, which also uses seed=42.
rf = RandomForestClassifier(
    featuresCol="features",
    labelCol="label",
    numTrees=100,
    maxDepth=6,
    seed=42,
)
model = rf.fit(train)

Predictions

In [None]:
# Score the held-out split and preview true label, predicted label and
# class probabilities for the first ten rows.
predictions = model.transform(test)
(
    predictions
    .select("label", "prediction", "probability")
    .show(10, truncate=False)
)

+-----+----------+----------------------------------------+
|label|prediction|probability                             |
+-----+----------+----------------------------------------+
|1.0  |0.0       |[0.7164837894083183,0.28351621059168175]|
|1.0  |1.0       |[0.04095023955194143,0.9590497604480587]|
|1.0  |1.0       |[0.11107404822002012,0.8889259517799799]|
|1.0  |1.0       |[0.21093233017033824,0.7890676698296617]|
|1.0  |1.0       |[0.2587129226338771,0.741287077366123]  |
|1.0  |0.0       |[0.8811319736741992,0.11886802632580082]|
|0.0  |0.0       |[0.9300995464137289,0.06990045358627103]|
|0.0  |0.0       |[0.8706012646387378,0.12939873536126226]|
|0.0  |0.0       |[0.8574074121879817,0.14259258781201833]|
|0.0  |0.0       |[0.9258531729579172,0.07414682704208271]|
+-----+----------+----------------------------------------+
only showing top 10 rows



Evaluate

In [None]:
multi_eval = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")
binary_eval = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction")

# Overall / support-weighted metrics. Weighted recall is mathematically equal
# to accuracy, so these alone say little about the minority class.
accuracy = multi_eval.evaluate(predictions, {multi_eval.metricName: "accuracy"})
f1 = multi_eval.evaluate(predictions, {multi_eval.metricName: "f1"})
precision = multi_eval.evaluate(predictions, {multi_eval.metricName: "weightedPrecision"})
recall = multi_eval.evaluate(predictions, {multi_eval.metricName: "weightedRecall"})

# Per-class metrics for label 1.0.
# NOTE(review): label 1.0 is presumably the "Exited = 1" (churned) class —
# confirm against the label indexer's ordering.
label1_precision = multi_eval.evaluate(
    predictions,
    {multi_eval.metricName: "precisionByLabel", multi_eval.metricLabel: 1.0},
)
label1_recall = multi_eval.evaluate(
    predictions,
    {multi_eval.metricName: "recallByLabel", multi_eval.metricLabel: 1.0},
)

# The evaluator's default metric is the area under the ROC curve.
auc = binary_eval.evaluate(predictions)

print("\nClassification Report:")
print(f"Accuracy            : {accuracy:.4f}")
print(f"Precision (weighted): {precision:.4f}")
print(f"Recall (weighted)   : {recall:.4f}")
print(f"F1 Score (weighted) : {f1:.4f}")
print(f"Precision (label 1) : {label1_precision:.4f}")
print(f"Recall (label 1)    : {label1_recall:.4f}")
print(f"AUC-ROC             : {auc:.4f}")



Classification Report:
Accuracy :  0.8511
Precision: 0.8444
Recall   : 0.8511
F1 Score : 0.8311
AUC-ROC  : 0.8484


Stop Spark session

In [None]:
# Shut down the churn Spark session now that evaluation is complete.
spark.stop()