## Understanding the data
| column name | description |
| --- | --- |
| manufacturer | car make and model |
| mpg | fuel economy in miles per (US) gallon |
| cyl | number of cylinders |
| disp | engine displacement (cubic inches) |
| hp | gross horsepower |
| drat | rear axle ratio |
| wt | weight (1000 lbs) |
| qsec | 1/4 mile time (seconds) |
| vs | engine shape (0 = V-shaped, 1 = straight) |
| am | transmission (0 = automatic, 1 = manual) |
| gear | number of forward gears |
| carb | number of carburetors |

## Downloading the data

In [None]:
%%bash
# download the mtcars CSV into the tmp folder (create the folder first; wget -O fails if it does not exist)
mkdir -p tmp
wget https://raw.githubusercontent.com/plotly/datasets/master/mtcars.csv -O tmp/mtcars.csv

## Initializations

In [46]:
#!pip install pyspark
#!pip install numpy
#!pip install pandas

In [22]:
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.mllib.evaluation import RegressionMetrics

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.evaluation import RegressionEvaluator

spark = SparkSession.builder.appName("ML").getOrCreate()

**Linear regression** models the relationship between a dependent variable and one or more independent variables by fitting a linear equation to observed data. It is one of the most common baseline methods in machine learning.

**Train/test split** divides the data into two parts: the training set is used to fit the model, and the held-out test set is used to evaluate how well the model generalizes to data it has not seen.

**Mean squared error (MSE)** is a metric for evaluating a regression model: the average of the squared differences between the predicted and actual values. Its square root (RMSE) is in the same units as the target. This notebook reports RMSE, MAE (mean absolute error) and R².

**Feature scaling** puts features on comparable ranges, for example by standardization (dividing by the standard deviation, optionally after subtracting the mean) or min-max normalization (mapping values to [0, 1]). This keeps features with large numeric ranges, such as `disp`, from dominating features with small ranges, such as `wt`, so that regularization penalizes all coefficients on a comparable scale.

**Feature engineering** creates new features from the existing ones to give the model more useful information and improve its performance.
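To make the evaluation metrics above concrete, here is a minimal plain-Python sketch that computes MSE, RMSE, MAE and R² by hand on a few toy numbers (not the mtcars data). The helper name `regression_metrics` is hypothetical; R² is computed as 1 − SS_res / SS_tot, which is the usual definition and assumes the true values are not all identical.

```python
def regression_metrics(y_true, y_pred):
    """Return MSE, RMSE, MAE and R^2 for two equal-length, non-empty sequences."""
    if len(y_true) != len(y_pred) or not y_true:
        raise ValueError("inputs must be non-empty and of equal length")
    n = len(y_true)
    errors = [t - p for t, p in zip(y_true, y_pred)]
    ss_res = sum(e * e for e in errors)          # residual sum of squares
    mse = ss_res / n                             # mean squared error
    rmse = mse ** 0.5                            # same units as the target
    mae = sum(abs(e) for e in errors) / n        # mean absolute error
    mean_true = sum(y_true) / n
    ss_tot = sum((t - mean_true) ** 2 for t in y_true)  # total sum of squares
    r2 = 1 - ss_res / ss_tot
    return {"mse": mse, "rmse": rmse, "mae": mae, "r2": r2}


# toy values for illustration only
toy_true = [3.0, 5.0, 7.0, 9.0]
toy_pred = [2.5, 5.0, 7.5, 10.0]
toy_metrics = regression_metrics(toy_true, toy_pred)
for name, value in toy_metrics.items():
    print(f"{name}: {value:.4f}")
# prints mse: 0.3750, rmse: 0.6124, mae: 0.5000, r2: 0.9250 (one per line)
```

Squaring the errors makes MSE and RMSE punish large misses more heavily than MAE does, which is why the two are usually reported together.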

In [35]:
mtcars_df = spark.read.csv('tmp/mtcars.csv', header=True, inferSchema=True)

In [6]:
mtcars_df.show(5)

+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|     manufacturer| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|        Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|
|    Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|
|       Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
|   Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|
|Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
only showing top 5 rows



In [7]:
mtcars_df.printSchema()

root
 |-- manufacturer: string (nullable = true)
 |-- mpg: double (nullable = true)
 |-- cyl: integer (nullable = true)
 |-- disp: double (nullable = true)
 |-- hp: integer (nullable = true)
 |-- drat: double (nullable = true)
 |-- wt: double (nullable = true)
 |-- qsec: double (nullable = true)
 |-- vs: integer (nullable = true)
 |-- am: integer (nullable = true)
 |-- gear: integer (nullable = true)
 |-- carb: integer (nullable = true)



In [11]:
mtcars_df.count()

32

In [12]:
mtcars_df.columns 

['manufacturer',
 'mpg',
 'cyl',
 'disp',
 'hp',
 'drat',
 'wt',
 'qsec',
 'vs',
 'am',
 'gear',
 'carb']

In [13]:
mtcars_df.describe().show()

+-------+------------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+-------------------+------------------+------------------+
|summary|manufacturer|               mpg|               cyl|              disp|               hp|              drat|                wt|              qsec|                vs|                 am|              gear|              carb|
+-------+------------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+-------------------+------------------+------------------+
|  count|          32|                32|                32|                32|               32|                32|                32|                32|                32|                 32|                32|                32|
|   mean|        null|20.090624999999996|            6.1875|230.72187500

In [36]:
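## scale the mpg column by its range (preprocessing)
# note: this divides by (max - min) WITHOUT subtracting the minimum,
# so it is neither standardization nor full min-max normalization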
from pyspark.sql.functions import col
max_mpg = mtcars_df.agg({"mpg": "max"}).collect()[0][0]
min_mpg = mtcars_df.agg({"mpg": "min"}).collect()[0][0]
mtcars_df = mtcars_df.withColumn("mpg", col("mpg") / (max_mpg - min_mpg))
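The cell above only divides `mpg` by its range. The sketch below contrasts that with conventional min-max normalization, using toy values rather than the real column. With mtcars' mpg running from 10.4 to 33.9 (range 23.5, consistent with 21.0 becoming 0.8936 in the output below), range division leaves the largest value at about 33.9 / 23.5 ≈ 1.44, whereas min-max scaling would map it to exactly 1. Both helper names are hypothetical.

```python
def divide_by_range(values):
    """What the cell above does: x / (max - min). The minimum is not subtracted."""
    lo, hi = min(values), max(values)
    return [v / (hi - lo) for v in values]


def min_max_scale(values):
    """Conventional min-max normalization: (x - min) / (max - min), giving values in [0, 1]."""
    lo, hi = min(values), max(values)
    return [(v - lo) / (hi - lo) for v in values]


toy_mpg = [10.0, 15.0, 20.0]  # toy values, not the mtcars column
range_divided = divide_by_range(toy_mpg)
min_max = min_max_scale(toy_mpg)
print(range_divided)  # [1.0, 1.5, 2.0]
print(min_max)        # [0.0, 0.5, 1.0]
```

Either version is a linear rescaling, so a linear model fits equally well on both; only the units of the coefficients and error metrics change.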

In [27]:
mtcars_df.show(5)

+-----------------+------------------+---+-----+---+----+-----+-----+---+---+----+----+
|     manufacturer|               mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-----------------+------------------+---+-----+---+----+-----+-----+---+---+----+----+
|        Mazda RX4|0.8936170212765957|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|
|    Mazda RX4 Wag|0.8936170212765957|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|
|       Datsun 710|0.9702127659574469|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
|   Hornet 4 Drive|0.9106382978723404|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|
|Hornet Sportabout|0.7957446808510638|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2|
+-----------------+------------------+---+-----+---+----+-----+-----+---+---+----+----+
only showing top 5 rows



In [8]:
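# split the data into training (70%) and test (30%) sets
# note: these two DataFrames are not used below; the model is trained on an 80/20 split of scaled_df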
train, test = mtcars_df.randomSplit([0.7, 0.3], seed=1111)
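Spark's `randomSplit` normalizes the weights and assigns rows to splits at random, so with only 32 rows the resulting sizes are only approximately 70/30. The plain-Python sketch below illustrates that idea (independent per-row assignment with a fixed seed); it is not Spark's actual implementation, and the helper name `random_split` is hypothetical. The toy results use new names so they do not overwrite the Spark `train`/`test` DataFrames in a live session.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row to a split with probability proportional to its weight.

    As with Spark's randomSplit, split sizes are only approximately
    proportional to the weights.
    """
    total = sum(weights)
    # cumulative upper bounds, e.g. [0.7, 1.0] for weights [0.7, 0.3]
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)  # fixed seed -> reproducible split
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, bound in enumerate(bounds):
            if u < bound:
                splits[i].append(row)
                break
        else:
            splits[-1].append(row)  # guard against float rounding in the last bound
    return splits


toy_train, toy_test = random_split(list(range(32)), [0.7, 0.3], seed=1111)
print(len(toy_train), len(toy_test))  # sizes near 22/10, but not guaranteed
```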

In [31]:
## select the feature columns and the target column
features_col = ['cyl', 'disp', 'hp', 'drat', 'wt', 'qsec', 'vs', 'am', 'gear', 'carb']
target_col = 'mpg'

In [37]:
# vector assembler is used to combine all features into a single vector column 
assembler = VectorAssembler(inputCols=features_col, outputCol="features")
assembled_df = assembler.transform(mtcars_df)
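Conceptually, `VectorAssembler` gathers the listed input columns of each row, in the order given by `inputCols`, into one numeric vector. The plain-Python analogue below shows that per-row behaviour on values taken from the first row of the output above; the helper name `assemble` is hypothetical, and the real transformer produces Spark ML vectors (which it may store in sparse form) rather than Python lists.

```python
def assemble(row, input_cols):
    """Plain-Python analogue of VectorAssembler for one row: collect the
    input columns, in order, into a single list of floats."""
    return [float(row[c]) for c in input_cols]


# values from the "Mazda RX4" row shown above
sample_row = {"manufacturer": "Mazda RX4", "mpg": 21.0, "cyl": 6, "disp": 160.0, "hp": 110}
sample_vector = assemble(sample_row, ["cyl", "disp", "hp"])
print(sample_vector)  # [6.0, 160.0, 110.0]
```

Non-feature columns such as `manufacturer` and the target `mpg` are simply ignored because they are not listed in `input_cols`.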

In [38]:
# show a sample of the assembled data (all rows, before the train/test split)
assembled_df.show(5)

+-----------------+------------------+---+-----+---+----+-----+-----+---+---+----+----+--------------------+
|     manufacturer|               mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|            features|
+-----------------+------------------+---+-----+---+----+-----+-----+---+---+----+----+--------------------+
|        Mazda RX4|0.8936170212765957|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|[6.0,160.0,110.0,...|
|    Mazda RX4 Wag|0.8936170212765957|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|[6.0,160.0,110.0,...|
|       Datsun 710|0.9702127659574469|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|[4.0,108.0,93.0,3...|
|   Hornet 4 Drive|0.9106382978723404|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|[6.0,258.0,110.0,...|
|Hornet Sportabout|0.7957446808510638|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2|[8.0,360.0,175.0,...|
+-----------------+------------------+---+-----+---+----+-----+-----+---+---+----+----+--------------------+
only showing top 5 rows

In [39]:
## scale the features
# StandardScaler's defaults are withMean=False, withStd=True: each feature is divided by its
# sample standard deviation but is NOT centered, so the scaled features have a standard
# deviation of 1 but not a mean of 0 (pass withMean=True to also center them)
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
scaler_model = scaler.fit(assembled_df)
scaled_df = scaler_model.transform(assembled_df)
scaled_df.show(5)

+-----------------+------------------+---+-----+---+----+-----+-----+---+---+----+----+--------------------+--------------------+
|     manufacturer|               mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|            features|     scaled_features|
+-----------------+------------------+---+-----+---+----+-----+-----+---+---+----+----+--------------------+--------------------+
|        Mazda RX4|0.8936170212765957|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|[6.0,160.0,110.0,...|[3.35960987440765...|
|    Mazda RX4 Wag|0.8936170212765957|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|[6.0,160.0,110.0,...|[3.35960987440765...|
|       Datsun 710|0.9702127659574469|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|[4.0,108.0,93.0,3...|[2.23973991627177...|
|   Hornet 4 Drive|0.9106382978723404|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|[6.0,258.0,110.0,...|[3.35960987440765...|
|Hornet Sportabout|0.7957446808510638|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2|[8.
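The sketch below mimics, for a single feature, what `StandardScaler` does with and without centering. Spark documents that it uses the corrected (n − 1) sample standard deviation, which is consistent with the output above: `cyl` = 6.0 becomes 6.0 / 1.7859 ≈ 3.3596 with no mean subtracted. The helper name `scale_like_spark` is hypothetical, the values are toy numbers, and the sketch assumes the feature is not constant. Note also that the scaler above is fitted on all 32 rows before the train/test split, so the test rows influence the scaling statistics; fitting on the training rows only avoids that leakage.

```python
import statistics


def scale_like_spark(values, with_mean=False, with_std=True):
    """Mimic StandardScaler on one feature (defaults match Spark's:
    divide by the sample std, no centering). Assumes a non-constant feature."""
    mean = statistics.mean(values)
    std = statistics.stdev(values)  # corrected sample standard deviation (n - 1)
    scaled = []
    for v in values:
        x = v - mean if with_mean else v
        scaled.append(x / std if with_std else x)
    return scaled


toy_cyl = [4.0, 6.0, 8.0]  # toy values: mean 6, sample std 2
toy_scaled = scale_like_spark(toy_cyl)                    # default: no centering
toy_centered = scale_like_spark(toy_cyl, with_mean=True)  # z-scores
print(toy_scaled)    # [2.0, 3.0, 4.0]
print(toy_centered)  # [-1.0, 0.0, 1.0]
```

Without centering the values keep their sign and offset, so their mean is 3.0 here rather than 0; only the spread is normalized.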

## Building A Machine Learning Model With Spark ML

In [40]:
train_data, test_data = scaled_df.randomSplit([.8,.2], seed=123)

In [55]:
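# create a linear regression model
lr = LinearRegression(featuresCol='scaled_features',
                      labelCol='mpg',
                      maxIter=10,
                      regParam=0.3,            # overall regularization strength
                      elasticNetParam=0.01,    # L1/L2 mix: 0 = pure L2 (ridge), 1 = pure L1 (lasso)
                      standardization=False,   # features were already scaled above
                      predictionCol='mpg_pred')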
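Spark ML documents the elastic-net penalty added to the least-squares loss as regParam × (elasticNetParam × ‖w‖₁ + (1 − elasticNetParam) / 2 × ‖w‖₂²), where w are the coefficients (the intercept is not penalized). With `elasticNetParam=0.01` the model above is therefore almost pure ridge regression. The plain-Python sketch below just evaluates that penalty formula for a toy coefficient vector; the helper name `elastic_net_penalty` is hypothetical, and it does not fit a model.

```python
def elastic_net_penalty(weights, reg_param, elastic_net_param):
    """Evaluate reg_param * (alpha * ||w||_1 + (1 - alpha) / 2 * ||w||_2^2),
    with alpha = elastic_net_param. The intercept should not be included in weights."""
    l1 = sum(abs(w) for w in weights)      # L1 norm
    l2_sq = sum(w * w for w in weights)    # squared L2 norm
    alpha = elastic_net_param
    return reg_param * (alpha * l1 + (1 - alpha) / 2 * l2_sq)


toy_weights = [0.5, -1.0]  # toy coefficients: L1 = 1.5, squared L2 = 1.25
ridge = elastic_net_penalty(toy_weights, reg_param=0.3, elastic_net_param=0.0)   # pure L2
lasso = elastic_net_penalty(toy_weights, reg_param=0.3, elastic_net_param=1.0)   # pure L1
as_used = elastic_net_penalty(toy_weights, reg_param=0.3, elastic_net_param=0.01)
print(ridge, lasso, as_used)
```

The L1 term can drive coefficients exactly to zero (feature selection), while the L2 term shrinks them smoothly; a mix of 0.01 keeps nearly all ten features in the model, as the coefficients below show.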


In [56]:
# Fit the data to the model
linearModel = lr.fit(train_data)

In [57]:
# find the coefficients and intercept for the linear regression model
print("Coefficients: " + str(linearModel.coefficients))
print("Intercept: " + str(linearModel.intercept))

Coefficients: [-0.031761725720689075,-0.03121534111809282,-0.029250429781173887,0.023076416262198418,-0.044528198406411104,0.011014346311797568,0.020445354653571845,0.02681427368069747,0.011503827526204179,-0.03008937701326752]
Intercept: 0.9296455474427316


In [58]:
import pandas as pd
import numpy as np
coeff_df = pd.DataFrame({"Feature": ["Intercept"] + features_col, "Co-efficients": np.insert(linearModel.coefficients.toArray(), 0, linearModel.intercept)})
coeff_df = coeff_df[["Feature", "Co-efficients"]]

In [59]:
coeff_df

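|   | Feature | Co-efficients |
| --- | --- | --- |
| 0 | Intercept | 0.929646 |
| 1 | cyl | -0.031762 |
| 2 | disp | -0.031215 |
| 3 | hp | -0.02925 |
| 4 | drat | 0.023076 |
| 5 | wt | -0.044528 |
| 6 | qsec | 0.011014 |
| 7 | vs | 0.020445 |
| 8 | am | 0.026814 |
| 9 | gear | 0.011504 |
| 10 | carb | -0.030089 |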
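Because every feature was divided by its standard deviation and the model was fitted with `standardization=False`, each coefficient can be read as the change in the (range-scaled) `mpg` label per one-standard-deviation increase in that feature, holding the others fixed. That makes magnitudes roughly comparable, as the plain-Python sketch below shows by ranking the coefficients printed above. Treat the ranking cautiously: several mtcars features (`cyl`, `disp`, `hp`, `wt`) are likely strongly correlated, and regularization shrinks coefficients, so this is not a measure of causal importance.

```python
# coefficients copied from the printed output above (intercept excluded)
coefficients = {
    "cyl": -0.031762, "disp": -0.031215, "hp": -0.029250, "drat": 0.023076,
    "wt": -0.044528, "qsec": 0.011014, "vs": 0.020445, "am": 0.026814,
    "gear": 0.011504, "carb": -0.030089,
}

# sort by absolute size: larger magnitude = larger effect per 1-SD change in the feature
ranked = sorted(coefficients.items(), key=lambda kv: abs(kv[1]), reverse=True)
for name, value in ranked:
    direction = "lower" if value < 0 else "higher"
    print(f"{name:>5}: {value:+.6f} ({direction} mpg per 1-SD increase)")
```

In this fit `wt` has the largest magnitude and `qsec` the smallest, and the signs match intuition: heavier, more powerful cars are predicted to get lower mileage.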


In [60]:
# Generate predictions
predictions = linearModel.transform(test_data)

In [61]:
predandlabels = predictions.select("mpg_pred", "mpg")

In [62]:
predandlabels.show(5)

+------------------+------------------+
|          mpg_pred|               mpg|
+------------------+------------------+
|0.6481718499182795|0.5659574468085107|
|0.6381480920022511|0.6085106382978723|
|0.8861869159704069|0.9106382978723404|
|0.7158692308691824|0.7957446808510638|
|0.9196939955526666|0.8936170212765957|
+------------------+------------------+
only showing top 5 rows



In [63]:
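## Evaluate with RegressionMetrics from the older pyspark.mllib package
# pyspark.mllib is the RDD-based API, so it takes an RDD of (prediction, label) pairs rather than a DataFrame;
# the DataFrame-based alternative is pyspark.ml.evaluation.RegressionEvaluator (imported above)
metrics = RegressionMetrics(predandlabels.rdd)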

In [64]:
print("RMSE: {0}".format(metrics.rootMeanSquaredError))

RMSE: 0.06996297820988183


In [65]:
print("MAE: {0}".format(metrics.meanAbsoluteError))

MAE: 0.05892555706367346


In [66]:
print("R2: {0}".format(metrics.r2))

R2: 0.6979231047092722
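The RMSE and MAE above are in the units of the range-scaled label, not in miles per gallon. Since the scaling cell divided `mpg` by a constant (max − min, which is 33.9 − 10.4 = 23.5 for mtcars), every error is divided by that same constant, so multiplying the metrics by 23.5 converts them back to mpg. R² is unchanged by such a rescaling because both of its sums of squares scale by the same factor. The sketch below does that arithmetic with the printed values; `mpg_range` is assumed to equal `max_mpg - min_mpg` from the scaling cell.

```python
# assumed: equals max_mpg - min_mpg from the scaling cell (33.9 - 10.4 for mtcars)
mpg_range = 33.9 - 10.4

rmse_scaled = 0.06996297820988183  # printed RMSE above
mae_scaled = 0.05892555706367346   # printed MAE above

# dividing the label by a constant divides every error by that constant,
# so multiplying by the same constant restores the original units
rmse_mpg = rmse_scaled * mpg_range
mae_mpg = mae_scaled * mpg_range
print(f"RMSE ~ {rmse_mpg:.2f} mpg, MAE ~ {mae_mpg:.2f} mpg")  # RMSE ~ 1.64 mpg, MAE ~ 1.38 mpg
```

With a test set of only a handful of cars (the 80/20 split of 32 rows), these numbers are noisy and would likely shift with a different `seed`.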


In [67]:
spark.stop()