In [37]:
# findspark.init() is called before any pyspark import in this notebook so that
# the pyspark package can be located (it adds the Spark installation to sys.path).
import findspark
findspark.init()

In [53]:
from pyspark.ml.feature import StringIndexer, StandardScaler,  VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession

from pyspark.ml.evaluation import RegressionEvaluator

In [39]:
# Obtain the SparkSession for this notebook, reusing an already-running one if present.
spark = (
    SparkSession.builder
    .appName("Machine Learning session")
    .getOrCreate()
)

In [40]:
# Load the MPG dataset from a Parquet file; the path is relative to the working directory.
df = spark.read.parquet('mpg-cleaned.parquet')

In [41]:
# Count the rows of the loaded DataFrame (runs a Spark job) and keep the result.
rowcount = df.count()
# Bare expression so the notebook displays the value as the cell output.
rowcount

385

In [42]:
# Preview the first five rows to sanity-check the loaded columns and values.
df.show(n=5)

+----+---------+-----------+----------+------+----------+----+--------+
| MPG|Cylinders|Engine_Disp|Horsepower|Weight|Accelerate|Year|  Origin|
+----+---------+-----------+----------+------+----------+----+--------+
|24.0|        4|      134.0|        96|  2702|      13.5|  75|Japanese|
|18.0|        6|      250.0|        88|  3139|      14.5|  71|American|
|29.0|        4|       68.0|        49|  1867|      19.5|  73|European|
|22.4|        6|      231.0|       110|  3415|      15.8|  81|American|
|20.5|        6|      231.0|       105|  3425|      16.9|  77|American|
+----+---------+-----------+----------+------+----------+----+--------+
only showing top 5 rows



In [43]:
# Print column names, data types and nullability of the loaded DataFrame.
df.printSchema()

root
 |-- MPG: double (nullable = true)
 |-- Cylinders: integer (nullable = true)
 |-- Engine_Disp: double (nullable = true)
 |-- Horsepower: integer (nullable = true)
 |-- Weight: integer (nullable = true)
 |-- Accelerate: double (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Origin: string (nullable = true)



In [44]:
# Encode the string column 'Origin' as a numeric label index in 'OriginIndex'.
#
# handleInvalid='keep' sends values that were not seen during fit (or are null)
# to one extra index instead of raising at transform time. With the default
# ('error'), the fitted pipeline would fail on any row whose Origin did not
# occur in the training split. Index values for labels seen during fit are
# unchanged.
#
# NOTE(review): the VectorAssembler defined in this notebook does not list
# 'OriginIndex' among its inputCols, so this column is produced but not used
# by the regression.
indexer = StringIndexer(
    inputCol='Origin',
    outputCol='OriginIndex',
    handleInvalid='keep',
)

In [45]:
# Combine the six numeric columns into a single 'features' vector column.
# 'OriginIndex' (the StringIndexer output) is not among the inputs, so it is
# not part of the feature vector.
assembler = VectorAssembler(
    inputCols=[
        'Cylinders',
        'Engine_Disp',
        'Horsepower',
        'Weight',
        'Accelerate',
        'Year',
    ],
    outputCol='features',
)

In [46]:
# Standardize the assembled vector: withMean=True centers each feature and
# withStd=True scales it to unit standard deviation. Output goes to 'scaledFeatures'.
scaler = StandardScaler(
    inputCol='features',
    outputCol='scaledFeatures',
    withMean=True,
    withStd=True,
)

In [47]:
# Linear regression that predicts 'MPG' from the standardized feature vector.
# No other hyperparameters are passed, so the library defaults apply.
lr = LinearRegression(
    featuresCol='scaledFeatures',
    labelCol="MPG",
)

In [48]:
# Chain the stages in execution order: index -> assemble -> scale -> regress.
pipeline = Pipeline(
    stages=[
        indexer,
        assembler,
        scaler,
        lr,
    ]
)

In [49]:
# Random 70/30 train/test split; the seed is fixed so the split can be repeated.
training_data, testing_data = df.randomSplit([0.7, 0.3], seed=42)

In [50]:
# Fit every pipeline stage on the training split only; the result is the fitted pipeline model.
pipelineModel = pipeline.fit(training_data)

In [51]:
# Run the fitted pipeline on the held-out test split; the output keeps the input
# columns and adds the intermediate stage columns plus 'prediction'.
predictions = pipelineModel.transform(testing_data)

In [52]:
# Show five predicted rows without truncating the (wide) vector columns.
predictions.show(n=5, truncate=False)

+----+---------+-----------+----------+------+----------+----+--------+-----------+----------------------------------+---------------------------------------------------------------------------------------------------------------------+------------------+
|MPG |Cylinders|Engine_Disp|Horsepower|Weight|Accelerate|Year|Origin  |OriginIndex|features                          |scaledFeatures                                                                                                       |prediction        |
+----+---------+-----------+----------+------+----------+----+--------+-----------+----------------------------------+---------------------------------------------------------------------------------------------------------------------+------------------+
|10.0|8        |360.0      |215       |4615  |14.0      |70  |American|0.0        |[8.0,360.0,215.0,4615.0,14.0,70.0]|[1.5745319883009272,1.7319454976406607,3.050166362711152,2.0931977237704493,-0.5832806936985389,-1.671670079144657

In [55]:
# Mean squared error of 'prediction' against the true 'MPG' on the test-split predictions.
evaluator = RegressionEvaluator(
    labelCol='MPG',
    predictionCol='prediction',
    metricName='mse',
)
mse = evaluator.evaluate(predictions)
print(mse)

12.226745835567343


In [56]:
# Mean absolute error of 'prediction' against the true 'MPG' on the test-split predictions.
evaluator = RegressionEvaluator(
    labelCol='MPG',
    predictionCol='prediction',
    metricName='mae',
)
mae = evaluator.evaluate(predictions)
print(mae)

2.8457151130131373


In [57]:
# R-squared (coefficient of determination) of 'prediction' against 'MPG' on the test-split predictions.
evaluator = RegressionEvaluator(
    labelCol='MPG',
    predictionCol='prediction',
    metricName='r2',
)
r2 = evaluator.evaluate(predictions)
print(r2)

0.8018737394896357


In [58]:
# Persist the fitted pipeline to 'project_1_model', replacing any existing copy at that path.
(
    pipelineModel
    .write()
    .overwrite()
    .save('project_1_model')
)

In [59]:
# Shut down the Spark session; this is the last step, after the model has been saved.
spark.stop()