In [1]:
from google.colab import files
file = files.upload()

Saving Car-Price-Hst.csv to Car-Price-Hst (1).csv


In [2]:
import pyspark

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Car-sales-Regression-analysis').getOrCreate()

In [4]:
spark

In [5]:
df = spark.read.csv('Car-Price-Hst.csv', header=True, inferSchema=True)

In [6]:
df.show()

+-------------+------+---------+-------+-------+-----------+------------+----+----------------+
|        Brand| Price|     Body|Mileage|EngineV|Engine Type|Registration|Year|           Model|
+-------------+------+---------+-------+-------+-----------+------------+----+----------------+
|          BMW|  4200|    sedan|    277|      2|     Petrol|         yes|1991|             320|
|Mercedes-Benz|  7900|      van|    427|    2.9|     Diesel|         yes|1999|    Sprinter 212|
|Mercedes-Benz| 13300|    sedan|    358|      5|        Gas|         yes|2003|           S 500|
|         Audi| 23000|crossover|    240|    4.2|     Petrol|         yes|2007|              Q7|
|       Toyota| 18300|crossover|    120|      2|     Petrol|         yes|2011|           Rav 4|
|Mercedes-Benz|199999|crossover|      0|    5.5|     Petrol|         yes|2016|          GLS 63|
|          BMW|  6100|    sedan|    438|      2|        Gas|         yes|1997|             320|
|         Audi| 14200|    vagon|    200|

In [7]:
df.describe().show()

+-------+----------+-----------------+---------+------------------+-----------------+-----------+------------+------------------+------------------+
|summary|     Brand|            Price|     Body|           Mileage|          EngineV|Engine Type|Registration|              Year|             Model|
+-------+----------+-----------------+---------+------------------+-----------------+-----------+------------+------------------+------------------+
|  count|      4345|             4345|     4345|              4345|             4345|       4345|        4345|              4345|              4345|
|   mean|      NULL|19418.74693505871|     NULL| 161.2372842347526|2.790734207389744|       NULL|        NULL|2006.5500575373992| 447.7392120075047|
| stddev|      NULL|25584.24262025089|     NULL|105.70579715642641|5.066437281595026|       NULL|        NULL|  6.71909682043085|185.61755668714557|
|    min|      Audi|            10000|crossover|                 0|              0.6|     Diesel|         

In [8]:
num_rows = df.count()
num_cols = len(df.columns)
print(f"Number of rows: {num_rows}")
print(f"Number of columns: {num_cols}")

Number of rows: 4345
Number of columns: 9


In [9]:
# Finding the nan values columnwise

In [10]:
from pyspark.sql.functions import isnan, when, count

In [11]:
nan_counts = df.select([count(when(isnan(c), c)).alias(c) for c in df.columns]).collect()[0]

for column, count in nan_counts.asDict().items():
    print(f"Column '{column}' has {count} NaN values.")

Column 'Brand' has 0 NaN values.
Column 'Price' has 0 NaN values.
Column 'Body' has 0 NaN values.
Column 'Mileage' has 0 NaN values.
Column 'EngineV' has 0 NaN values.
Column 'Engine Type' has 0 NaN values.
Column 'Registration' has 0 NaN values.
Column 'Year' has 0 NaN values.
Column 'Model' has 0 NaN values.


In [12]:
# Encoding

In [13]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

In [14]:
df.show(1)

+-----+-----+-----+-------+-------+-----------+------------+----+-----+
|Brand|Price| Body|Mileage|EngineV|Engine Type|Registration|Year|Model|
+-----+-----+-----+-------+-------+-----------+------------+----+-----+
|  BMW| 4200|sedan|    277|      2|     Petrol|         yes|1991|  320|
+-----+-----+-----+-------+-------+-----------+------------+----+-----+
only showing top 1 row



In [15]:
df.describe()

DataFrame[summary: string, Brand: string, Price: string, Body: string, Mileage: string, EngineV: string, Engine Type: string, Registration: string, Year: string, Model: string]

In [16]:
df.printSchema()

root
 |-- Brand: string (nullable = true)
 |-- Price: string (nullable = true)
 |-- Body: string (nullable = true)
 |-- Mileage: integer (nullable = true)
 |-- EngineV: string (nullable = true)
 |-- Engine Type: string (nullable = true)
 |-- Registration: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Model: string (nullable = true)



In [17]:
indexer = StringIndexer(inputCols=['Brand','Price','Body','EngineV','Engine Type','Registration','Model'], outputCols=['i_Brand','i_Price','i_Body','i_ngineV','i_Engine_Type','i_Registration','i_Model'])
indexed_df = indexer.fit(df).transform(df)

In [18]:
indexed_df.show(2)

+-------------+-----+-----+-------+-------+-----------+------------+----+------------+-------+-------+------+--------+-------------+--------------+-------+
|        Brand|Price| Body|Mileage|EngineV|Engine Type|Registration|Year|       Model|i_Brand|i_Price|i_Body|i_ngineV|i_Engine_Type|i_Registration|i_Model|
+-------------+-----+-----+-------+-------+-----------+------------+----+------------+-------+-------+------+--------+-------------+--------------+-------+
|          BMW| 4200|sedan|    277|      2|     Petrol|         yes|1991|         320|    2.0|  109.0|   0.0|     0.0|          1.0|           0.0|   14.0|
|Mercedes-Benz| 7900|  van|    427|    2.9|     Diesel|         yes|1999|Sprinter 212|    1.0|   41.0|   2.0|    29.0|          0.0|           0.0|  189.0|
+-------------+-----+-----+-------+-------+-----------+------------+----+------------+-------+-------+------+--------+-------------+--------------+-------+
only showing top 2 rows



In [19]:
encoder = OneHotEncoder(inputCols=['i_Brand','i_Price','i_Body','i_ngineV','i_Engine_Type','i_Registration','i_Model'], outputCols=['encoded_Brand','encoded_Price','encoded_Body','encoded_EngineV','encoded_Engine_Type','encoded_Registration','encoded_Model'])
encoded_df = encoder.fit(indexed_df).transform(indexed_df)

In [20]:
encoded_df.show(2)

+-------------+-----+-----+-------+-------+-----------+------------+----+------------+-------+-------+------+--------+-------------+--------------+-------+-------------+-----------------+-------------+---------------+-------------------+--------------------+-----------------+
|        Brand|Price| Body|Mileage|EngineV|Engine Type|Registration|Year|       Model|i_Brand|i_Price|i_Body|i_ngineV|i_Engine_Type|i_Registration|i_Model|encoded_Brand|    encoded_Price| encoded_Body|encoded_EngineV|encoded_Engine_Type|encoded_Registration|    encoded_Model|
+-------------+-----+-----+-------+-------+-----------+------------+----+------------+-------+-------+------+--------+-------------+--------------+-------+-------------+-----------------+-------------+---------------+-------------------+--------------------+-----------------+
|          BMW| 4200|sedan|    277|      2|     Petrol|         yes|1991|         320|    2.0|  109.0|   0.0|     0.0|          1.0|           0.0|   14.0|(6,[2],[1.0])|

In [21]:
# Spliting the dataset

In [22]:
(training_data, test_data) = encoded_df.randomSplit([0.9, 0.1], seed=42)

# Print the number of rows in each set
print("Number of rows in training set:", training_data.count())
print("Number of rows in test set:", test_data.count())

Number of rows in training set: 3912
Number of rows in test set: 433


In [23]:
# 'Price' is the target variable
feature_cols = [col for col in training_data.columns if col != 'Price']

# Select the features and target variable
X_train = training_data.select(feature_cols)
y_train = training_data.select('Price')

X_test = test_data.select(feature_cols)
y_test = test_data.select('Price')

In [24]:
X_train.count()

3912

In [25]:
y_test.count()

433

In [26]:
# Implementing the ML Regression Alg.


In [27]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler

In [35]:
from pyspark.ml.linalg import DenseVector
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# Define a UDF to convert sparse vectors to a scalar value
def vector_to_double(vector):
    return float(vector.values[0]) if vector.values else 0.0

# Register the UDF
vector_to_double_udf = udf(vector_to_double, DoubleType())

# Apply the UDF to convert encoded_Price into a numeric column
assembled_data = assembled_data.withColumn("label", vector_to_double_udf("encoded_Price"))

# Verify the structure of assembled_data
assembled_data.select("features", "label").show(5)

# Step 2: Initialize and train the Linear Regression model
lr = LinearRegression(featuresCol="features", labelCol="label")
lr_model = lr.fit(assembled_data)

# Step 3: Make predictions on the data
predictions = lr_model.transform(assembled_data)

# Step 4: Evaluate the model using RMSE
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse}")

# Optional: Show some example predictions
predictions.select("features", "label", "prediction").show(5)


+--------------------+-----+
|            features|label|
+--------------------+-----+
|(399,[2,6,11,85,8...|  1.0|
|(399,[1,8,40,84,8...|  1.0|
|(399,[1,6,29,86,8...|  1.0|
|(399,[5,7,30,85,8...|  1.0|
|(399,[3,7,11,85,8...|  1.0|
+--------------------+-----+
only showing top 5 rows

Root Mean Squared Error (RMSE): 0.015078073278277338
+--------------------+-----+------------------+
|            features|label|        prediction|
+--------------------+-----+------------------+
|(399,[2,6,11,85,8...|  1.0|1.0003893979883913|
|(399,[1,8,40,84,8...|  1.0|0.9997226973455153|
|(399,[1,6,29,86,8...|  1.0|1.0002176853716014|
|(399,[5,7,30,85,8...|  1.0|1.0001904181015626|
|(399,[3,7,11,85,8...|  1.0| 1.000163938387686|
+--------------------+-----+------------------+
only showing top 5 rows

