**Libraries**

In [1]:
import os
import sys
import pandas as pd

**Creating a Spark session**

In [2]:
import glob

# On the author's Windows machine, switch to the course resources folder and, only if
# SPARK_HOME is not already configured, fall back to the local Spark download.
if sys.platform.startswith('win'):
    os.chdir(r"C:\Users\Thatoi\SparkPythonDoBigDataAnalytics-Resources\SparkPythonDoBigDataAnalytics-Resources")
    os.environ.setdefault('SPARK_HOME', 'C:/Users/Thatoi/Downloads/spark-3.0.0-preview2-bin-hadoop2.7')

# Root of the Spark installation; fail with an actionable message instead of a bare KeyError.
SPARK_HOME = os.environ.get('SPARK_HOME')
if not SPARK_HOME:
    raise EnvironmentError("SPARK_HOME is not set; point it at your Spark installation directory.")

# Make the PySpark sources bundled with the installation importable.
spark_python_lib = os.path.join(SPARK_HOME, "python", "lib")
sys.path.insert(0, os.path.join(SPARK_HOME, "python"))
sys.path.insert(0, spark_python_lib)
sys.path.insert(0, os.path.join(spark_python_lib, "pyspark.zip"))
# The bundled py4j archive name embeds its version, so a hard-coded file name breaks
# across Spark releases; add whichever py4j-*-src.zip this installation ships (if any).
for py4j_zip in glob.glob(os.path.join(spark_python_lib, "py4j-*-src.zip")):
    sys.path.insert(0, py4j_zip)

# Initialize SparkSession and SparkContext
from pyspark.sql import SparkSession
from pyspark import SparkContext

# Create a local Spark session with 2 worker threads.
# NOTE(review): in local[...] mode the executor runs inside the driver, so
# spark.executor.memory / spark.cores.max likely have little effect — confirm.
SpSession = (
    SparkSession
    .builder
    .master("local[2]")
    .appName("V2 Maestros")
    .config("spark.executor.memory", "1g")
    .config("spark.cores.max", "2")
    .config("spark.sql.warehouse.dir", "file:///c:/Users/temp/spark-warehouse")
    .getOrCreate()
)

# Get the Spark Context from Spark Session
SpContext = SpSession.sparkContext

**Loading the file**

In [3]:
# Read the raw CSV as an RDD of text lines; cache() returns the same RDD, marked for reuse.
auto = SpContext.textFile('auto-miles-per-gallon.csv').cache()
auto.take(5)

['MPG,CYLINDERS,DISPLACEMENT,HORSEPOWER,WEIGHT,ACCELERATION,MODELYEAR,NAME',
 '18,8,307,130,3504,12,70,chevrolet chevelle malibu',
 '15,8,350,165,3693,11.5,70,buick skylark 320',
 '18,8,318,150,3436,11,70,plymouth satellite',
 '16,8,304,150,3433,12,70,amc rebel sst']

In [12]:
# Remove header line
# Remove the header line. Match the exact header text rather than a substring so that a
# data row which merely contains "CYLINDERS" can never be dropped by accident.
header = auto.first()
datalines = auto.filter(lambda line: line != header)

**Data Cleanup**

In [38]:
from pyspark.sql import Row

# Fallback values broadcast to the workers and substituted for fields recorded as '?'.
# NOTE: these are fixed constants, not averages computed from the data.
avgHP = SpContext.broadcast(80.0)
# The earlier fallback of 140.0 is not a plausible MPG (observed range is 9-46.6);
# 23.5 matches the observed mean MPG reported by describe() below.
avgMPG = SpContext.broadcast(23.5)


def cleanup(inputstr):
    """Parse one data line of the auto-MPG CSV into a typed Row.

    A '?' in the MPG or HORSEPOWER field is replaced by the broadcast fallback
    (``avgMPG`` / ``avgHP``). Reading the broadcasts needs no ``global`` statement.

    Parameters
    ----------
    inputstr : str
        One comma-separated line (header already removed) in the column order
        MPG,CYLINDERS,DISPLACEMENT,HORSEPOWER,WEIGHT,ACCELERATION,MODELYEAR,NAME.

    Returns
    -------
    Row
        All numeric columns converted to float; NAME kept as a string.
    """
    # maxsplit=7 keeps any commas inside the trailing NAME field intact.
    attlist = inputstr.split(",", 7)

    hpvalue = attlist[3]
    mpgvalue = attlist[0]
    if hpvalue.strip() == '?':
        hpvalue = avgHP.value
    if mpgvalue.strip() == '?':
        mpgvalue = avgMPG.value

    return Row(MPG=float(mpgvalue),
               CYLINDERS=float(attlist[1]),
               DISPLACEMENT=float(attlist[2]),
               HORSEPOWER=float(hpvalue),
               WEIGHT=float(attlist[4]),
               ACCELERATION=float(attlist[5]),
               MODELYEAR=float(attlist[6]),
               NAME=attlist[7])

In [39]:
# Running the map
# Lazily parse every data line into a typed Row (no job runs until an action).
cleandata = datalines.map(cleanup, preservesPartitioning=False)

In [40]:
cleandata.take(num=5)

[Row(ACCELERATION=12.0, CYLINDERS=8.0, DISPLACEMENT=307.0, HORSEPOWER=130.0, MODELYEAR=70.0, MPG=18.0, NAME='chevrolet chevelle malibu', WEIGHT=3504.0),
 Row(ACCELERATION=11.5, CYLINDERS=8.0, DISPLACEMENT=350.0, HORSEPOWER=165.0, MODELYEAR=70.0, MPG=15.0, NAME='buick skylark 320', WEIGHT=3693.0),
 Row(ACCELERATION=11.0, CYLINDERS=8.0, DISPLACEMENT=318.0, HORSEPOWER=150.0, MODELYEAR=70.0, MPG=18.0, NAME='plymouth satellite', WEIGHT=3436.0),
 Row(ACCELERATION=12.0, CYLINDERS=8.0, DISPLACEMENT=304.0, HORSEPOWER=150.0, MODELYEAR=70.0, MPG=16.0, NAME='amc rebel sst', WEIGHT=3433.0),
 Row(ACCELERATION=10.5, CYLINDERS=8.0, DISPLACEMENT=302.0, HORSEPOWER=140.0, MODELYEAR=70.0, MPG=17.0, NAME='ford torino', WEIGHT=3449.0)]

**Descriptive analysis**

In [41]:
# Schema is inferred from the Row fields produced by cleanup().
cleandf = SpSession.createDataFrame(data=cleandata)

In [43]:
cleandf.describe('MPG', 'CYLINDERS').show()

+-------+-----------------+------------------+
|summary|              MPG|         CYLINDERS|
+-------+-----------------+------------------+
|  count|              398|               398|
|   mean|23.51457286432161| 5.454773869346734|
| stddev|7.815984312565782|1.7010042445332125|
|    min|              9.0|               3.0|
|    max|             46.6|               8.0|
+-------+-----------------+------------------+



**Correlation Analysis**

In [45]:
#Find correlation between predictors and target
# Find correlation between every numeric column and the target (MPG).
# Column types come from the DataFrame schema: this avoids launching a Spark job per
# column just to inspect a sample value, and a leading null cannot mislead the check.
for colname, coltype in cleandf.dtypes:
    if coltype != 'string':
        print( "Correlation to MPG for ", colname, cleandf.stat.corr('MPG', colname))

Correlation to MPG for  ACCELERATION 0.4202889121016501
Correlation to MPG for  CYLINDERS -0.7753962854205548
Correlation to MPG for  DISPLACEMENT -0.8042028248058979
Correlation to MPG for  HORSEPOWER -0.7746308409203807
Correlation to MPG for  MODELYEAR 0.5792671330833091
Correlation to MPG for  MPG 1.0
Correlation to MPG for  WEIGHT -0.8317409332443347


**Preparing data for ML**

In [46]:
from pyspark.ml.linalg import Vectors

In [50]:
def transformtolabelpoint(row, feature_cols=('ACCELERATION', 'DISPLACEMENT', 'WEIGHT'), label_col='MPG'):
    """Turn a cleaned Row into a ``(label, features)`` pair for Spark ML.

    Parameters
    ----------
    row : Row
        A row produced by ``cleanup``.
    feature_cols : sequence of str, optional
        Columns packed, in this order, into the dense feature vector. Defaults to
        the three predictors used by this notebook.
    label_col : str, optional
        Column used as the regression label (default ``'MPG'``).

    Returns
    -------
    tuple
        ``(row[label_col], DenseVector of row[c] for c in feature_cols)``.
    """
    return (row[label_col], Vectors.dense([row[col] for col in feature_cols]))

In [51]:
# Build (label, features) pairs, then name the two columns the way Spark ML expects.
autolp = cleandata.map(transformtolabelpoint)
autodf = SpSession.createDataFrame(autolp, schema=['label', 'features'])

In [52]:
autodf.show(n=10)

+-----+-------------------+
|label|           features|
+-----+-------------------+
| 18.0|[12.0,307.0,3504.0]|
| 15.0|[11.5,350.0,3693.0]|
| 18.0|[11.0,318.0,3436.0]|
| 16.0|[12.0,304.0,3433.0]|
| 17.0|[10.5,302.0,3449.0]|
| 15.0|[10.0,429.0,4341.0]|
| 14.0| [9.0,454.0,4354.0]|
| 14.0| [8.5,440.0,4312.0]|
| 14.0|[10.0,455.0,4425.0]|
| 15.0| [8.5,390.0,3850.0]|
+-----+-------------------+
only showing top 10 rows



**Training & test datasets**

In [54]:
# Fix the seed so the 70/30 split, and therefore every downstream metric,
# is reproducible under Restart & Run All.
SPLIT_SEED = 42
(training, test) = autodf.randomSplit([0.7, 0.3], seed=SPLIT_SEED)

In [56]:
n_training = training.count()
n_test = test.count()
print(n_training, n_test)

294 104


**Building the Model**

In [57]:
from pyspark.ml.regression import LinearRegression

In [58]:
# Fit an ordinary linear regression on the training split (at most 10 iterations).
lr = LinearRegression(maxIter=10)
lrModel = lr.fit(training)
# Coefficients follow the feature-vector order built in transformtolabelpoint.
# Separate each label from its value so the output is readable.
print('Coefficients: ' + str(lrModel.coefficients))
print('Intercept: ' + str(lrModel.intercept))

Coefficient[0.15740204680068842,-0.009686820784601998,-0.0063927133208949915]
Intercept42.03496486348603


**Prediction**

In [59]:
# Score the held-out split and show predictions next to the true labels.
prediction = lrModel.transform(test)
prediction.select(['prediction', 'label', 'features']).show()

+------------------+-----+-------------------+
|        prediction|label|           features|
+------------------+-----+-------------------+
|11.248966060308561| 10.0|[14.0,360.0,4615.0]|
| 11.85128594317737| 12.0|[12.5,350.0,4499.0]|
|11.511988846778252| 13.0|[12.0,400.0,4464.0]|
| 7.190514641853234| 13.0|[12.0,400.0,5140.0]|
|11.989509850015374| 13.0|[13.5,350.0,4502.0]|
| 14.17766673542415| 13.0|[16.0,302.0,4294.0]|
|11.219892849306127| 14.0| [9.0,454.0,4354.0]|
| 17.42625156884853| 14.0|[11.5,304.0,3672.0]|
|14.285604074524258| 14.0|[13.0,351.0,4129.0]|
|12.587160214562914| 14.0|[13.5,318.0,4457.0]|
| 14.20448726490223| 14.0|[13.5,351.0,4154.0]|
|10.988952464492048| 14.0|[13.5,351.0,4657.0]|
|15.095092299903357| 14.0|[14.0,318.0,4077.0]|
|15.552527422088655| 14.0|[14.5,302.0,4042.0]|
| 16.77680322597082| 15.0|[12.5,318.0,3777.0]|
| 22.49441091256257| 15.0|[19.5,250.0,3158.0]|
|20.978910532838377| 15.0|[21.0,250.0,3432.0]|
|12.307528407462996| 16.0| [9.5,400.0,4278.0]|
|10.129174305

**Performance Analysis**

In [60]:
from pyspark.ml.evaluation import RegressionEvaluator

In [62]:
# Coefficient of determination (R^2) on the test split.
evaluator = RegressionEvaluator(labelCol='label',
                                predictionCol='prediction',
                                metricName='r2')
evaluator.evaluate(prediction)

0.6756183014210286