In [None]:
!pip install pyspark


Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=6efc03e3756fe1b607399aca65889bc2f6a9df78192b1fee54af6762996080a5
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import col, sum, when
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.feature import FeatureHasher
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import ChiSqSelector
from pyspark.ml.linalg import Vectors
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.regression import DecisionTreeRegressor


In [None]:
spark = SparkSession.builder.appName("Colab").getOrCreate()

In [None]:
df = spark.read.csv("/content/car_prices.csv",header=True,inferSchema=True)

In [None]:
check_missig = df.select([sum(col(i).isNull().cast("int")).alias(i) for i in df.columns])
check_missig.show()

+----+----+-----+----+-----+---+-----+---------+--------+-----+--------+------+---+------------+--------+
|year|make|model|trim| body|vin|state|condition|odometer|color|interior|seller|mmr|sellingprice|saledate|
+----+----+-----+----+-----+---+-----+---------+--------+-----+--------+------+---+------------+--------+
|   0|7588| 7659|7917|10461|  0|    0|    11797|      88|  594|     594|     0|  7|           3|       3|
+----+----+-----+----+-----+---+-----+---------+--------+-----+--------+------+---+------------+--------+



In [None]:
df = df.drop(df.columns[5])

In [None]:
df = df.na.drop()

In [None]:
df.show()

+----+---------+-------------------+--------------------+-----------+-----------------+-----+---------+--------+------+--------+--------------------+-----+------------+--------------------+
|year|     make|              model|                trim|       body|              vin|state|condition|odometer| color|interior|              seller|  mmr|sellingprice|            saledate|
+----+---------+-------------------+--------------------+-----------+-----------------+-----+---------+--------+------+--------+--------------------+-----+------------+--------------------+
|2015|      Kia|            Sorento|                  LX|        SUV|5xyktca69fg566472|   ca|        5|   16639| white|   black|kia motors americ...|20500|       21500|Tue Dec 16 2014 1...|
|2015|      Kia|            Sorento|                  LX|        SUV|5xyktca69fg561319|   ca|        5|    9393| white|   beige|kia motors americ...|20800|       21500|Tue Dec 16 2014 1...|
|2014|      BMW|           3 Series|          328i

In [None]:
selected_features = ['year', 'odometer', 'make', 'model', 'trim', 'body', 'condition', 'color', 'interior']


In [None]:
hasher = FeatureHasher(inputCols=selected_features, outputCol="features")
df_transformed = hasher.transform(df)

In [None]:
from pyspark.ml.stat import Correlation

In [None]:
# Calculate correlation matrix
correlation_matrix = Correlation.corr(df_transformed, "features").head()[0]

# Convert to Pandas DataFrame for better readability
import pandas as pd
corr_df = pd.DataFrame(correlation_matrix.toArray(), columns=selected_features, index=selected_features)
print(corr_df)

ConnectionRefusedError: [Errno 111] Connection refused

In [None]:
df_transformed.show()

ConnectionRefusedError: [Errno 111] Connection refused

----- Model  --

In [None]:
train_data = df_transformed.sample(0.8,seed=41)
test_data = df_transformed.subtract(train_data)

In [None]:
ln_reg = LinearRegression(featuresCol="features", labelCol="sellingprice")

In [None]:
lm = ln_reg.fit(train_data)

In [None]:
prediction = lm.transform(test_data)

In [None]:
evaluator = RegressionEvaluator(labelCol="sellingprice", predictionCol="prediction", metricName="rmse")

In [None]:
r_squared = evaluator.evaluate(prediction, {evaluator.metricName: "r2"})
print("R-squared:", r_squared)

R-squared: 0.7916480652546654


In [None]:
rmse = evaluator.evaluate(prediction)
print("Root Mean Squared Error (RMSE) on test data:", rmse)

Root Mean Squared Error (RMSE) on test data: 5506.302844581191


In [None]:
df.describe("sellingprice").show()

+-------+------------------+
|summary|      sellingprice|
+-------+------------------+
|  count|              5706|
|   mean|15531.962320364528|
| stddev|11945.861723709193|
|    min|               150|
|    max|            154000|
+-------+------------------+



----- Lasso Regression ----------

In [None]:
#### Use Lasso Regression ###
lasso = LinearRegression(featuresCol="features", labelCol="sellingprice", regParam=0.1, elasticNetParam=1.0)
lass_reg= lasso.fit(train_data)

In [None]:
model_lasso = lass_reg.transform(test_data)

In [None]:
rmse_lasso = evaluator.evaluate(model_lasso)
print("Root Mean Squared Error (RMSE) on test data:", rmse_lasso)

Root Mean Squared Error (RMSE) on test data: 5503.384911502671


In [None]:
lasso_r2 = evaluator.evaluate(model_lasso, {evaluator.metricName: "r2"})
print("R-squared:", lasso_r2)

R-squared: 0.791868828962514


--- Ridge Regression

In [None]:
Ridge = LinearRegression(featuresCol="features", labelCol="sellingprice", regParam=0.1, elasticNetParam=1.0)
ridge_reg= Ridge.fit(train_data)

In [None]:
ridge_model = ridge_reg.transform(test_data)

In [None]:
rmse_ridge = evaluator.evaluate(ridge_model)
print("Root Mean Squared Error (RMSE) on test data:", rmse_ridge)

Root Mean Squared Error (RMSE) on test data: 5503.384911502671


In [None]:
ridge_r2 = evaluator.evaluate(ridge_model, {evaluator.metricName: "r2"})
print("R-squared:", ridge_r2)

R-squared: 0.791868828962514
