In [None]:
!wget -q https://downloads.apache.org/spark/spark-3.1.1/spark-3.1.1-bin-hadoop2.7.tgz
!tar -xvf spark-3.1.1-bin-hadoop2.7.tgz

In [1]:
import os
# Point Spark at the JDK and at the Spark distribution unpacked by the download cell.
# SPARK_HOME is stored as an absolute path: a relative one would be resolved
# against whatever the working directory is when it is later used, and would
# break if the notebook changes directory after this cell runs.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = os.path.abspath("spark-3.1.1-bin-hadoop2.7")

In [None]:
# Imports. findspark.init() locates the Spark install (via the SPARK_HOME
# environment variable) and makes `pyspark` importable, so it must run before
# any of the pyspark imports below.
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import isnan, when, count, col, lit
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator
# NOTE(review): duplicate of the RegressionEvaluator import above; redundant but harmless.
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder

In [None]:
# import necessary libraries
import pandas as pd 
import numpy
import matplotlib.pyplot as plt 
from pyspark.sql import SparkSession
# Create (or reuse, via getOrCreate) the SparkSession used by every later cell.
# The placeholder "spark.some.config.option" setting copied from the Spark docs
# example has been dropped: it set a key nothing reads.
# NOTE(review): `sc` conventionally names a SparkContext, but this is a
# SparkSession; the name is kept because later cells call `sc.read`.
sc = (
    SparkSession.builder
    .appName("Pysparkexample")
    .getOrCreate()
)

In [None]:
# Load the raw CSV: the first row supplies column names and Spark infers each
# column's type from the data, then print the resulting schema.
data = sc.read.options(header=True, inferSchema=True).csv("data.csv")
data.printSchema()

In [None]:
# Summary statistics from DataFrame.describe(), collected to pandas and
# transposed so that each dataset column appears as one row of the table.
summary_stats = data.describe().toPandas()
summary_stats.T

In [None]:
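# Clean the data: treat "N/A" in Market Category as missing, report the
# missing-value count per column, then drop Market Category and rows with nulls.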
def replace(column, value):
    """Null out the cells of ``column`` that equal ``value``.

    Args:
        column: Spark Column to clean.
        value: sentinel value to treat as missing (e.g. "N/A").

    Returns:
        A Column that keeps each original cell when it differs from ``value``
        and is null otherwise (a null input cell also stays null, since the
        comparison itself is null).
    """
    differs = column != value
    return when(differs, column).otherwise(lit(None))
# Mark "N/A" Market Category entries as null so they show up in the per-column
# missing-value report below. The guard keeps this cell re-runnable: after the
# first run the column has been dropped and col("Market Category") would fail
# to resolve.
if "Market Category" in data.columns:
    data = data.withColumn("Market Category", replace(col("Market Category"), "N/A"))
data.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in data.columns]).show()
data = data.drop("Market Category")
# NOTE(review): na.drop() with no subset removes rows with a null in ANY column,
# including columns the model never uses; consider na.drop(subset=[model columns]).
data = data.na.drop()
# (row count, column count) of the cleaned data.
print((data.count(), len(data.columns)))

In [None]:
# Pack the predictor columns (expected to be numeric after inferSchema) into a
# single vector column, "Attributes", which the regressor reads as its features.
feature_cols = [
    "Year",
    "Engine HP",
    "Engine Cylinders",
    "Number of Doors",
    "highway MPG",
    "city mpg",
    "Popularity",
]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="Attributes")

In [None]:
# Chain feature assembly and a random-forest regressor that predicts MSRP.
# An explicit seed makes the forest reproducible: PySpark's default seed is
# derived from hash() of the class name, and Python randomizes string hashes
# per process, so unseeded results differ after every kernel restart.
# 123 matches the seed used for the train/test split.
regressor = RandomForestRegressor(featuresCol="Attributes", labelCol="MSRP", seed=123)
pipeline = Pipeline(stages=[assembler, regressor])

In [None]:
# Persist the (unfitted) pipeline definition to the "pipeline" directory,
# overwriting any copy left there by a previous run.
pipeline_writer = pipeline.write()
pipeline_writer.overwrite().save("pipeline")


In [None]:
# Reload the persisted pipeline. Despite its name, `pipelineModel` is an
# unfitted Pipeline (an estimator); it is what the cross-validator tunes.
pipelineModel = Pipeline.load("pipeline")
# Key the grid on the reloaded regressor stage (the last stage) so the ParamMap
# targets the exact estimator being cross-validated. A grid keyed on a
# different object's Param would be silently ignored if the uids ever differed.
paramGrid = (
    ParamGridBuilder()
    .addGrid(pipelineModel.getStages()[-1].numTrees, [100, 500])
    .build()
)

In [None]:
# 3-fold cross-validation over paramGrid, scored by RMSE (RegressionEvaluator's
# default metric) against the MSRP label. The explicit seed fixes the random
# fold assignment; without it PySpark falls back to a per-process hash-based seed.
crossval = CrossValidator(estimator=pipelineModel,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(labelCol="MSRP"),
                          numFolds=3,
                          seed=123)

In [None]:
# A 80/20 train test split is made
train_data, test_data = data.randomSplit([0.8,0.2], seed=123)

In [None]:
# Run the cross-validated hyper-parameter search on the training split only;
# test_data stays unseen until the evaluation cells.
cvModel = crossval.fit(dataset=train_data)

In [None]:
# Print every stage of the model that cross-validation selected as best.
bestModel = cvModel.bestModel
for stage in bestModel.stages:
    print(stage)

In [None]:
# Score the held-out split with the tuned model and preview actual vs. predicted MSRP.
pred = cvModel.transform(test_data)
pred.select(["MSRP", "prediction"]).show()

In [None]:
# Evaluate the held-out predictions with four regression metrics.
# The evaluator is named `evaluator`, not `eval`, so the builtin eval() is not
# shadowed for the rest of the session; the rmse metric is now explicit rather
# than relying on the default.
evaluator = RegressionEvaluator(labelCol="MSRP", metricName="rmse")
rmse = evaluator.evaluate(pred)
mse = evaluator.evaluate(pred, {evaluator.metricName: "mse"})
mae = evaluator.evaluate(pred, {evaluator.metricName: "mae"})
r2 = evaluator.evaluate(pred, {evaluator.metricName: "r2"})

In [None]:
# Write the test metrics one per line, in the order rmse, mse, mae, r2.
# The `with` block guarantees the file is flushed and closed instead of
# relying on the unnamed file object being garbage-collected.
with open("metrics.txt", "w") as metrics_file:
    metrics_file.write("{}\n{}\n{}\n{}\n".format(rmse, mse, mae, r2))