**Step 1: import all necessary packages**

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=8ffd6b0d197134508ce4be7b9d93269a7965490bbf983107a658a24b4ff63846
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col, when, mean, max, sum

**Step 2: Create Spark Session and Download CSV**

In [None]:
spark = SparkSession.builder.appName("Car_Predictive_Model").getOrCreate()

In [None]:
!wget https://raw.githubusercontent.com/lschenk3/badm358final/main/car_data.csv
df = spark.read.csv("car_data.csv", header=True, inferSchema=True)

--2024-04-11 01:54:32--  https://raw.githubusercontent.com/lschenk3/badm358final/main/car_data.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.109.133, 185.199.111.133, 185.199.108.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.109.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1041622 (1017K) [text/plain]
Saving to: ‘car_data.csv.5’


2024-04-11 01:54:33 (15.1 MB/s) - ‘car_data.csv.5’ saved [1041622/1041622]



**Step 3: Explore the Dataset and Clean**

We want to get rid of any Null values and outliers. We also want to change the Year column to Age, as that is easier for predictive models later.Since the same types of cars can havde the same features, we dont need to remove duplicates. We also must remove labels from some numerical columns for easy regression later on

In [None]:
df.columns

['car_name',
 'year',
 'selling_price',
 'km_driven',
 'fuel',
 'seller_type',
 'transmission',
 'owner',
 'mileage',
 'engine',
 'max_power',
 'torque',
 'seats']

In [None]:
df.printSchema()

root
 |-- car_name: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- selling_price: integer (nullable = true)
 |-- km_driven: integer (nullable = true)
 |-- fuel: string (nullable = true)
 |-- seller_type: string (nullable = true)
 |-- transmission: string (nullable = true)
 |-- owner: string (nullable = true)
 |-- mileage: string (nullable = true)
 |-- engine: string (nullable = true)
 |-- max_power: string (nullable = true)
 |-- torque: string (nullable = true)
 |-- seats: integer (nullable = true)



In [None]:
df_rows=df.count()
print(df_rows)

8128


In [None]:
from pyspark.sql.functions import regexp_extract
from pyspark.sql.types import FloatType, IntegerType

# Clean the "mileage" column by extracting the numeric value and casting it to FloatType
df = df.withColumn("mileage_clean", regexp_extract("mileage", r"(\d+\.\d+)", 1).cast(FloatType()))

# Clean the "engine" column by extracting the numeric value and casting it to IntegerType
df = df.withColumn("engine_clean", regexp_extract("engine", r"(\d+)", 1).cast(IntegerType()))

# Clean the "max_power" column by extracting the numeric value and casting it to FloatType
df = df.withColumn("max_power_clean", regexp_extract("max_power", r"(\d+\.\d+)", 1).cast(FloatType()))

# Clean the "torque" column by extracting the numeric value and casting it to FloatType
df = df.withColumn("torque_clean", regexp_extract("torque", r"(\d+\.\d+)", 1).cast(FloatType()))

#delete original columns
df = df.drop("mileage", "engine", "max_power", "torque")
df.show()

+--------------------+----+-------------+---------+------+-----------+------------+------------+-----+-------------+------------+---------------+------------+
|            car_name|year|selling_price|km_driven|  fuel|seller_type|transmission|       owner|seats|mileage_clean|engine_clean|max_power_clean|torque_clean|
+--------------------+----+-------------+---------+------+-----------+------------+------------+-----+-------------+------------+---------------+------------+
|Maruti Swift Dzir...|2014|       450000|   145500|Diesel| Individual|      Manual| First Owner|    5|         23.4|        1248|           NULL|        NULL|
|Skoda Rapid 1.5 T...|2014|       370000|   120000|Diesel| Individual|      Manual|Second Owner|    5|        21.14|        1498|         103.52|        NULL|
|Honda City 2017-2...|2006|       158000|   140000|Petrol| Individual|      Manual| Third Owner|    5|         17.7|        1497|           NULL|        12.7|
|Hyundai i20 Sport...|2010|       225000|   12

In [None]:
age_column = 2024 - df["year"]
new_df = df.withColumn("car_age", age_column)
cleaned_df = new_df.drop("year")
cleaned_df.show()

+--------------------+-------------+---------+------+-----------+------------+------------+-----+-------------+------------+---------------+------------+-------+
|            car_name|selling_price|km_driven|  fuel|seller_type|transmission|       owner|seats|mileage_clean|engine_clean|max_power_clean|torque_clean|car_age|
+--------------------+-------------+---------+------+-----------+------------+------------+-----+-------------+------------+---------------+------------+-------+
|Maruti Swift Dzir...|       450000|   145500|Diesel| Individual|      Manual| First Owner|    5|         23.4|        1248|           NULL|        NULL|     10|
|Skoda Rapid 1.5 T...|       370000|   120000|Diesel| Individual|      Manual|Second Owner|    5|        21.14|        1498|         103.52|        NULL|     10|
|Honda City 2017-2...|       158000|   140000|Petrol| Individual|      Manual| Third Owner|    5|         17.7|        1497|           NULL|        12.7|     18|
|Hyundai i20 Sport...|      

In [None]:

col_null_cnt_df =  cleaned_df.select([count(when(col(c).isNull(),c)).alias(c) for c in cleaned_df.columns])
col_null_cnt_df.show()
print(cleaned_df.count())

+--------+-------------+---------+----+-----------+------------+-----+-----+-------------+------------+---------------+------------+-------+
|car_name|selling_price|km_driven|fuel|seller_type|transmission|owner|seats|mileage_clean|engine_clean|max_power_clean|torque_clean|car_age|
+--------+-------------+---------+----+-----------+------------+-----+-----+-------------+------------+---------------+------------+-------+
|       0|            0|        0|   0|          0|           0|    0|  221|          221|         221|           3129|        6570|      0|
+--------+-------------+---------+----+-----------+------------+-----+-----+-------------+------------+---------------+------------+-------+

8128


In [None]:
#torque and max power has a lot of Null values so we will drop the column
cleaned_df = cleaned_df.drop("max_power_clean")
cleaned_df = cleaned_df.drop("torque_clean")

In [None]:
complete_df = cleaned_df.na.drop(how="any")
complete_df_rows=complete_df.count()
print(complete_df_rows)
complete_cnt_df =  complete_df.select([count(when(col(c).isNull(),c)).alias(c) for c in complete_df.columns])
complete_cnt_df.show()

7907
+--------+-------------+---------+----+-----------+------------+-----+-----+-------------+------------+-------+
|car_name|selling_price|km_driven|fuel|seller_type|transmission|owner|seats|mileage_clean|engine_clean|car_age|
+--------+-------------+---------+----+-----------+------------+-----+-----+-------------+------------+-------+
|       0|            0|        0|   0|          0|           0|    0|    0|            0|           0|      0|
+--------+-------------+---------+----+-----------+------------+-----+-----+-------------+------------+-------+



In [None]:
mean_price = cleaned_df.select(mean("selling_price")).collect()[0][0]
std_price= cleaned_df.select(stddev("selling_price")).collect()[0][0]
print(mean_price)
print(std_price)

638271.8077017716
806253.4035082327


In [None]:
upperbound = mean_price + (1.5*std_price)
lowerbound = mean_price - (1.5*std_price)
print(upperbound)
print(lowerbound)

1847651.9129641205
-571108.2975605773


In [None]:
new_df = cleaned_df.filter((complete_df["selling_price"] <= upperbound) & (complete_df["selling_price"] >= lowerbound))
new_df.count()

7692

In [None]:
selected = new_df.select("car_age", "selling_price", "km_driven", "seats")
stats = selected.describe()
stats.show()

+-------+------------------+------------------+-----------------+------------------+
|summary|           car_age|     selling_price|        km_driven|             seats|
+-------+------------------+------------------+-----------------+------------------+
|  count|              7692|              7692|             7692|              7473|
|   mean|10.406136245449819|476099.48725949036|72099.53003120126| 5.426736250501807|
| stddev|  4.03912780462313|301606.69051502965|56982.02957785775|0.9709469439850772|
|    min|                 4|             29999|                1|                 2|
|    max|                41|           1825000|          2360457|                14|
+-------+------------------+------------------+-----------------+------------------+



**Step 4: Vectorize Dataset**

We must import all packages.

Then vectorize the dataset

In [None]:
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline

In [None]:
new_df.printSchema()

root
 |-- car_name: string (nullable = true)
 |-- selling_price: integer (nullable = true)
 |-- km_driven: integer (nullable = true)
 |-- fuel: string (nullable = true)
 |-- seller_type: string (nullable = true)
 |-- transmission: string (nullable = true)
 |-- owner: string (nullable = true)
 |-- seats: integer (nullable = true)
 |-- mileage_clean: float (nullable = true)
 |-- engine_clean: integer (nullable = true)
 |-- car_age: integer (nullable = true)



In [None]:
#Convert string into numerical columns
string_cols = ["car_name", "fuel", "seller_type", "transmission", "owner"]

indexers = [StringIndexer(inputCol=col, outputCol=col+"_index", handleInvalid="keep") for col in string_cols]

encoder = OneHotEncoder(inputCols=[indexer.getOutputCol() for indexer in indexers],
                        outputCols=[col+"_encoded" for col in string_cols])

In [None]:
pipeline = Pipeline(stages=indexers + [encoder])
transformed_df = pipeline.fit(new_df).transform(new_df)
transformed_df = transformed_df.drop(*string_cols)

In [None]:
cnt_df =  transformed_df.select([count(when(col(c).isNull(),c)).alias(c) for c in transformed_df.columns])
cnt_df.show()

+-------------+---------+-----+-------------+------------+-------+--------------+----------+-----------------+------------------+-----------+----------------+------------+-------------------+--------------------+-------------+
|selling_price|km_driven|seats|mileage_clean|engine_clean|car_age|car_name_index|fuel_index|seller_type_index|transmission_index|owner_index|car_name_encoded|fuel_encoded|seller_type_encoded|transmission_encoded|owner_encoded|
+-------------+---------+-----+-------------+------------+-------+--------------+----------+-----------------+------------------+-----------+----------------+------------+-------------------+--------------------+-------------+
|            0|        0|  219|          219|         219|      0|             0|         0|                0|                 0|          0|               0|           0|                  0|                   0|            0|
+-------------+---------+-----+-------------+------------+-------+--------------+----------+

In [None]:
transformed_df = transformed_df.na.drop(how="any")
transformed_df_rows=transformed_df.count()
print(transformed_df_rows)
transformed_cnt_df =  transformed_df.select([count(when(col(c).isNull(),c)).alias(c) for c in transformed_df.columns])
transformed_cnt_df.show()

7473
+-------------+---------+-----+-------------+------------+-------+--------------+----------+-----------------+------------------+-----------+----------------+------------+-------------------+--------------------+-------------+
|selling_price|km_driven|seats|mileage_clean|engine_clean|car_age|car_name_index|fuel_index|seller_type_index|transmission_index|owner_index|car_name_encoded|fuel_encoded|seller_type_encoded|transmission_encoded|owner_encoded|
+-------------+---------+-----+-------------+------------+-------+--------------+----------+-----------------+------------------+-----------+----------------+------------+-------------------+--------------------+-------------+
|            0|        0|    0|            0|           0|      0|             0|         0|                0|                 0|          0|               0|           0|                  0|                   0|            0|
+-------------+---------+-----+-------------+------------+-------+--------------+------

In [None]:
feature_cols = [col for col in transformed_df.columns if col != "selling_price"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
transformed_df = assembler.transform(transformed_df)

**Step 5: Train the Dataset and Create the Model**

In [None]:
train, test = transformed_df.randomSplit([0.7, 0.3], seed=42)

In [None]:
lr = LinearRegression(featuresCol="features", labelCol="selling_price", predictionCol="prediction")
lr_model = lr.fit(train)

In [None]:
train_predictions = lr_model.transform(train)
test_predictions = lr_model.transform(test)

In [None]:
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("R2: %f" % trainingSummary.r2)
print("Intercept: %s" % str(lr_model.intercept))
print("Coefficients: %s" % str(lr_model.coefficients))

RMSE: 64636.840737
R2: 0.954237
Intercept: 1026352.1586668593
Coefficients: [-0.1565561434386861,-96134.23313917291,8171.427611381418,121.39833474609362,-27895.819975949988,23.524368099590006,-46038.65987312768,-13382.819267821602,63190.44029996329,-8133.9026942583605,-79138.40893848873,-177627.54602149548,-69238.38989325642,-155684.24426404748,-121381.95304764222,-33209.939928590466,-185598.6407640247,-147706.1632690297,223568.34341405917,-58212.277126496396,-28571.72607941987,482623.379929772,-220068.2027093626,-61893.0097217847,48755.670476132356,82333.05178581638,-14586.115099802135,-225588.7316528132,-126228.74295547168,-183451.9357583742,190698.16350182347,199217.7340348309,-139630.67325144107,-111795.41158205751,-51370.11746273204,140627.8604044834,-76227.96236296896,-145126.2254392501,-258205.27402812324,-107427.93092963753,-12988.083186678816,374626.35124676913,-192793.7496530959,-1869.671365853111,-73963.11073508182,-14875.967202557498,-86438.21922556996,-57631.1865292869,-18