# <font color='blue'>Machine Learning with PySpark Combo</font>

### <font color='blue'>Linear Regression</font>

We will use Linear Regression to predict the fuel consumption of automobiles.

The **consumption** variable in dataset1.csv is the target (dependent) variable, and the other variables are candidate features (predictor, or independent, variables).

This is, therefore, a Multiple Linear Regression problem: a regression with more than one predictor (independent variable).
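In general form, with $p$ predictors $x_1, \dots, x_p$, the model estimates the target $y$ (here, **consumption**) as an intercept plus a weighted sum of the features:

$$\hat{y} = \beta_0 + \beta_1 x_1 + \beta_2 x_2 + \cdots + \beta_p x_p$$

Without regularization, the coefficients $\beta_0, \dots, \beta_p$ are the values that minimize the sum of squared residuals $\sum_i (y_i - \hat{y}_i)^2$ over the training rows.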

In [1]:
# Import findspark and initialize
import findspark
findspark.init()

In [2]:
# Imports
import numpy as np
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

## Loading the Data

In [3]:
# Creating the Spark Context
sc = SparkContext(appName = "ML-Combo")

In [4]:
sc.setLogLevel("ERROR")

In [5]:
# Creating the Spark Session
spSession = SparkSession.builder.master("local").getOrCreate()

In [6]:
# Loading the data and generating an RDD
data = sc.textFile("data/dataset1.csv")

In [7]:
type(data)

pyspark.rdd.RDD

In [8]:
# Cache the RDD in memory so that later actions do not re-read the file from disk
data.cache()

data/dataset1.csv MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [9]:
# Number of records
data.count()

399

In [10]:
# Viewing the first lines of an RDD
data.take(5)

['consumption,number_cylinders,capacity,horsepower,weight,acceleration,year,name',
 '30,4,79,70,2074,19.5,71,peugeot 304',
 '30,4,88,76,2065,14.5,71,fiat 124b',
 '31,4,71,65,1773,19,71,toyota corolla 1200',
 '35,4,72,69,1613,18,71,datsun 1200']

In [11]:
# Remove the header line (the only line that contains "horsepower")
data2 = data.filter(lambda x: "horsepower" not in x)
data2.count()

398
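Filtering on the substring "horsepower" works here because only the header contains that word. A minimal pure-Python sketch of a slightly more robust variant, shown as an illustration rather than the notebook's method: take the first line as the header and drop only lines identical to it. On the RDD, the same idea would typically use `header = data.first()` followed by `data.filter(lambda x: x != header)`.

```python
# A few lines copied from the take(5) output above (header + data rows)
lines = [
    "consumption,number_cylinders,capacity,horsepower,weight,acceleration,year,name",
    "30,4,79,70,2074,19.5,71,peugeot 304",
    "30,4,88,76,2065,14.5,71,fiat 124b",
    "31,4,71,65,1773,19,71,toyota corolla 1200",
]

# Treat the first line as the header and keep every line that is not identical to it
header = lines[0]
rows = [line for line in lines if line != header]

print(len(rows))  # 3
```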

In [12]:
# Preview the first lines
data2.take(5)

['30,4,79,70,2074,19.5,71,peugeot 304',
 '30,4,88,76,2065,14.5,71,fiat 124b',
 '31,4,71,65,1773,19,71,toyota corolla 1200',
 '35,4,72,69,1613,18,71,datsun 1200',
 '27,4,97,60,1834,19,71,volkswagen model 111']

## Data Cleanup

Let's check for missing values. RDDs are well suited to distributed transformations but awkward for interactive exploration, so we'll convert the RDD to a Spark DataFrame and then to a Pandas DataFrame.

In [13]:
# Convert RDD to Spark DataFrame
df_spark = data2.map(lambda x: str(x)).map(lambda w: w.split(',')).toDF()

In [14]:
type(df_spark)

pyspark.sql.dataframe.DataFrame

In [15]:
# Convert Spark DataFrame to Pandas DataFrame
df_pandas = df_spark.toPandas()

In [16]:
type(df_pandas)

pandas.core.frame.DataFrame

In [17]:
df_pandas.head()

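|   | _1 | _2 | _3 | _4 | _5 | _6 | _7 | _8 |
|---|----|----|----|----|----|----|----|----|
| 0 | 30 | 4 | 79 | 70 | 2074 | 19.5 | 71 | peugeot 304 |
| 1 | 30 | 4 | 88 | 76 | 2065 | 14.5 | 71 | fiat 124b |
| 2 | 31 | 4 | 71 | 65 | 1773 | 19.0 | 71 | toyota corolla 1200 |
| 3 | 35 | 4 | 72 | 69 | 1613 | 18.0 | 71 | datsun 1200 |
| 4 | 27 | 4 | 97 | 60 | 1834 | 19.0 | 71 | volkswagen model 111 |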


In [18]:
# Does it have null values?
df_pandas.isnull().values.any()

False

There are no null values, but missing information may still be encoded in another way, for example with a placeholder character. Let's check.

In [19]:
# Count the cells that contain the placeholder character "?"
total = np.sum(df_pandas.apply(lambda x: x.str.contains('?', regex=False)).values)
total

6

In [20]:
# See which rows and columns contain the special character
np.where(df_pandas.apply(lambda x: x.str.contains('?', regex=False)).values)

(array([ 48, 126, 330, 336, 354, 374], dtype=int64),
 array([3, 3, 3, 3, 3, 3], dtype=int64))

In [21]:
# View one of the affected rows (column index 3 is _4, the horsepower field)
df_pandas.iloc[330]

_1                    40.9
_2                       4
_3                      85
_4                       ?
_5                    1835
_6                    17.3
_7                      80
_8    renault lecar deluxe
Name: 330, dtype: object

In [22]:
# Broadcast a fixed default horsepower value (75.0), hardcoded here rather than computed from the data, to fill in the missing values
avgHP = sc.broadcast(75.0)
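If you would rather fill the gaps with the actual mean of the known horsepower values, a minimal sketch (an assumed alternative, not what this notebook does) is to skip the "?" entries in column index 3 and average the rest. It is shown in plain Python on sample lines from this dataset; on the RDD, a chain such as `data2.map(lambda x: x.split(",")[3]).filter(lambda hp: hp != "?").map(float).mean()` would compute the same quantity. The helper name `known_horsepower` is hypothetical.

```python
# Sample data lines (horsepower is column index 3); the last one is the renault row with "?"
lines = [
    "30,4,79,70,2074,19.5,71,peugeot 304",
    "30,4,88,76,2065,14.5,71,fiat 124b",
    "31,4,71,65,1773,19,71,toyota corolla 1200",
    "35,4,72,69,1613,18,71,datsun 1200",
    "40.9,4,85,?,1835,17.3,80,renault lecar deluxe",
]

def known_horsepower(lines):
    """Return the horsepower values that are present, skipping the '?' placeholder."""
    values = []
    for line in lines:
        hp = line.split(",")[3]
        if hp != "?":
            values.append(float(hp))
    return values

hp_values = known_horsepower(lines)
avg_hp = sum(hp_values) / len(hp_values)
print(avg_hp)  # 70.0
```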

In [23]:
# Function to clean one line of the file
def cleardata(inputStr):

    # List of attributes (the line split on commas)
    attList = inputStr.split(",")

    # Replace "?" in column index 3 (horsepower) with the broadcast default value
    hpValue = attList[3]
    if hpValue == "?":
        hpValue = avgHP.value

    # Build a Row, converting the numeric fields from string to float
    rows = Row(consumption = float(attList[0]),
               number_cylinders = float(attList[1]),
               capacity = float(attList[2]),
               horsepower = float(hpValue),
               weight = float(attList[4]),
               acceleration = float(attList[5]),
               year = float(attList[6]),
               name = attList[7])
    return rows

In [24]:
# Apply the cleaning function to every line of the RDD, then cache and preview the result
data3 = data2.map(cleardata)
data3.cache()
data3.take(5)

[Row(consumption=30.0, number_cylinders=4.0, capacity=79.0, horsepower=70.0, weight=2074.0, acceleration=19.5, year=71.0, name='peugeot 304'),
 Row(consumption=30.0, number_cylinders=4.0, capacity=88.0, horsepower=76.0, weight=2065.0, acceleration=14.5, year=71.0, name='fiat 124b'),
 Row(consumption=31.0, number_cylinders=4.0, capacity=71.0, horsepower=65.0, weight=1773.0, acceleration=19.0, year=71.0, name='toyota corolla 1200'),
 Row(consumption=35.0, number_cylinders=4.0, capacity=72.0, horsepower=69.0, weight=1613.0, acceleration=18.0, year=71.0, name='datsun 1200'),
 Row(consumption=27.0, number_cylinders=4.0, capacity=97.0, horsepower=60.0, weight=1834.0, acceleration=19.0, year=71.0, name='volkswagen model 111')]

## Exploratory Data Analysis

In [25]:
# Create a Spark DataFrame from the RDD of Row objects
df_cars = spSession.createDataFrame(data3)

In [26]:
# Descriptive statistics for two of the variables (as an example)
df_cars.select("consumption", "number_cylinders").describe().show()

+-------+-----------------+-----------------+
|summary|      consumption| number_cylinders|
+-------+-----------------+-----------------+
|  count|              398|              398|
|   mean|23.51457286432161|5.454773869346734|
| stddev|7.815984312565782|1.701004244533212|
|    min|              9.0|              3.0|
|    max|             46.6|              8.0|
+-------+-----------------+-----------------+
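`describe()` reports the count, mean, sample standard deviation (dividing by n - 1), min and max of each column. As a small cross-check of what those statistics mean, here they are for the first five consumption values from the preview, computed with Python's `statistics` module:

```python
import statistics

# First five consumption values from the data preview
consumption = [30.0, 30.0, 31.0, 35.0, 27.0]

summary = {
    "count": len(consumption),
    "mean": statistics.mean(consumption),
    "stddev": statistics.stdev(consumption),  # sample standard deviation (n - 1)
    "min": min(consumption),
    "max": max(consumption),
}
print(summary)
```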



In [27]:
# Correlation between the target variable and each numeric column (the string column "name" is skipped)
for col_name, col_type in df_cars.dtypes:
    if col_type != "string":
        print("Correlation of the Target Variable with:", col_name, df_cars.stat.corr('consumption', col_name))

Correlation of the Target Variable with: consumption 1.0
Correlation of the Target Variable with: number_cylinders -0.7753962854205546
Correlation of the Target Variable with: capacity -0.8042028248058979
Correlation of the Target Variable with: horsepower -0.7747041523498721
Correlation of the Target Variable with: weight -0.8317409332443347
Correlation of the Target Variable with: acceleration 0.42028891210164976
Correlation of the Target Variable with: year 0.5792671330833099
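`df_cars.stat.corr` computes the Pearson correlation coefficient (currently the only method it supports). A plain-Python sketch of that formula, applied only for illustration to the first five rows of `consumption` and `weight` from the preview; the helper name `pearson` is hypothetical:

```python
import math

def pearson(xs, ys):
    """Pearson correlation: covariance divided by the product of the standard deviations.
    The 1/(n - 1) factors cancel, so plain sums of deviations are used."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)

consumption = [30.0, 30.0, 31.0, 35.0, 27.0]
weight = [2074.0, 2065.0, 1773.0, 1613.0, 1834.0]
print(pearson(consumption, weight))  # negative, like the full-data correlation with weight
```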


## Data Pre-Processing

In [28]:
# Convert each Row to a (label, features) tuple, packing the features into a DenseVector
# Keep only weight, capacity and number_cylinders, the features most strongly correlated with consumption
def transformVar(row) :
    obj = (row["consumption"], Vectors.dense([row["weight"], row["capacity"], row["number_cylinders"]]))
    return obj
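The three features kept (weight, capacity, number_cylinders) are exactly the three with the largest absolute correlation with consumption in the correlation output. A minimal sketch of that selection rule, offered as one plausible reading rather than the author's stated procedure; `top_k_features` is a hypothetical helper:

```python
# Correlations with consumption, copied from the correlation output (target itself excluded)
correlations = {
    "number_cylinders": -0.7753962854205546,
    "capacity": -0.8042028248058979,
    "horsepower": -0.7747041523498721,
    "weight": -0.8317409332443347,
    "acceleration": 0.42028891210164976,
    "year": 0.5792671330833099,
}

def top_k_features(corrs, k):
    """Return the k feature names with the largest absolute correlation, strongest first."""
    return sorted(corrs, key=lambda name: abs(corrs[name]), reverse=True)[:k]

print(top_k_features(correlations, 3))  # ['weight', 'capacity', 'number_cylinders']
```

Note that horsepower trails number_cylinders by less than 0.001, so this particular cut is a close call.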

In [29]:
# Apply the function to the RDD and create another RDD
data4 = data3.map(transformVar)

In [30]:
# View
data4.take(5)

[(30.0, DenseVector([2074.0, 79.0, 4.0])),
 (30.0, DenseVector([2065.0, 88.0, 4.0])),
 (31.0, DenseVector([1773.0, 71.0, 4.0])),
 (35.0, DenseVector([1613.0, 72.0, 4.0])),
 (27.0, DenseVector([1834.0, 97.0, 4.0]))]

In [31]:
# Convert RDD to Spark DataFrame
df_cars = spSession.createDataFrame(data4, ["label", "features"])

In [32]:
# View the label (y) and the features (x)
df_cars.select("label","features").show(10)

+-----+------------------+
|label|          features|
+-----+------------------+
| 30.0| [2074.0,79.0,4.0]|
| 30.0| [2065.0,88.0,4.0]|
| 31.0| [1773.0,71.0,4.0]|
| 35.0| [1613.0,72.0,4.0]|
| 27.0| [1834.0,97.0,4.0]|
| 26.0| [1955.0,91.0,4.0]|
| 24.0|[2278.0,113.0,4.0]|
| 25.0| [2126.0,97.5,4.0]|
| 23.0| [2254.0,97.0,4.0]|
| 20.0|[2408.0,140.0,4.0]|
+-----+------------------+
only showing top 10 rows



In [33]:
# Split into training and test data (approximately 70/30; without a seed the split changes on every run)
(training_data, test_data) = df_cars.randomSplit([0.7, 0.3])

In [34]:
training_data.count()

286

In [35]:
test_data.count()

112
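`randomSplit([0.7, 0.3])` assigns rows to splits at random, so the sizes are only approximately 70/30 (286 and 112 rows here rather than 278.6 and 119.4), and they change between runs unless a seed is passed, e.g. `df_cars.randomSplit([0.7, 0.3], seed=42)`. A simplified pure-Python sketch of weight-based random assignment; Spark's actual implementation works per partition and differs in detail, and `random_split` is a hypothetical helper:

```python
import random

def random_split(rows, weights, seed=None):
    """Assign each row independently to a split with probability proportional to its weight."""
    rng = random.Random(seed)
    total = sum(weights)
    # Cumulative boundaries in [0, 1], e.g. [0.7, 1.0] for weights [0.7, 0.3]
    bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, b in enumerate(bounds):
            # The last split also catches any floating-point rounding at the top boundary
            if u < b or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

train, test = random_split(list(range(398)), [0.7, 0.3], seed=42)
print(len(train), len(test))  # close to 70/30, and identical on every run with the same seed
```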

## Machine Learning

In [36]:
# Create the estimator (default parameters: no regularization, intercept fitted)
linearReg = LinearRegression()

In [37]:
# Fit the estimator to the training data, which returns a LinearRegressionModel
model = linearReg.fit(training_data)
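With its default parameters (`regParam=0.0`, `fitIntercept=True`), `LinearRegression` fits ordinary least squares: it looks for the intercept and coefficients that minimize the squared error on the training data. A small NumPy sketch of that objective on synthetic toy data (not the car data), solved with `np.linalg.lstsq`; Spark's own solver works differently internally, but without regularization it should reach the same solution up to numerical tolerance:

```python
import numpy as np

# Toy data generated from y = 1 + 2*x1 - 3*x2 (no noise), purely for illustration
X = np.array([[1.0, 2.0], [2.0, 1.0], [3.0, 5.0], [4.0, 3.0], [5.0, 4.0]])
y = 1.0 + 2.0 * X[:, 0] - 3.0 * X[:, 1]

# Prepend a column of ones so that the first fitted parameter is the intercept
X_design = np.column_stack([np.ones(len(X)), X])

# Solve min ||X_design @ beta - y||^2
beta, *_ = np.linalg.lstsq(X_design, y, rcond=None)
intercept, coefficients = beta[0], beta[1:]
print(intercept, coefficients)  # approximately 1.0 and [2.0, -3.0]
```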

In [38]:
print(model)

LinearRegressionModel: uid=LinearRegression_d9ff98be1278, numFeatures=3


In [39]:
# Printing the coefficients (what the model has learned)
print("Coefficients: " + str(model.coefficients))
print("Intercept: " + str(model.intercept))

Coefficients: [-0.005913037974367063,-0.006669421078096151,-0.6419921295574764]
Intercept: 45.75464773056361


In [40]:
# Predictions with test data
predictions = model.transform(test_data)

In [41]:
# View the predictions
predictions.select("features", "prediction").show()

+------------------+------------------+
|          features|        prediction|
+------------------+------------------+
|[4615.0,360.0,8.0]|10.929048854285192|
|[4997.0,400.0,8.0]|  8.40349150495313|
|[4499.0,350.0,8.0]|11.681655470092728|
|[4906.0,400.0,8.0]| 8.941577960620528|
|[3821.0,360.0,8.0]|15.624001005932637|
|[4100.0,350.0,8.0]| 14.04095762186519|
|[4464.0,400.0,8.0]|11.555140745290771|
|[4654.0,360.0,8.0]|10.698440373284875|
|[4735.0,440.0,8.0]| 9.685930611113449|
|[4746.0,400.0,8.0]| 9.887664036519261|
|[5140.0,400.0,8.0]|7.5579270746186396|
|[3672.0,304.0,8.0]|16.878531244486716|
|[4042.0,302.0,8.0]|14.704046036127092|
|[4129.0,351.0,8.0]| 13.86281009953045|
|[4154.0,351.0,8.0]|13.714984150171269|
|[4257.0,304.0,8.0]|13.419404029481989|
|[4385.0,400.0,8.0]|12.022270745265764|
|[4457.0,318.0,8.0]|12.143424539515223|
|[3693.0,350.0,8.0]|16.447564077432585|
|[3761.0,400.0,8.0]|15.712006441270816|
+------------------+------------------+
only showing top 20 rows
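Each prediction is the intercept plus the dot product of the coefficients with the feature vector [weight, capacity, number_cylinders]. For example, the predicted value falls by about 0.0059 for each additional unit of weight, holding the other two features fixed. Reproducing the first row of the predictions table by hand from the printed intercept and coefficients (the last digits may differ slightly because of floating-point rounding):

```python
# Intercept and coefficients printed by the trained model
intercept = 45.75464773056361
coefficients = [-0.005913037974367063, -0.006669421078096151, -0.6419921295574764]

# First test row: [weight, capacity, number_cylinders]
features = [4615.0, 360.0, 8.0]

# Linear model: intercept + dot product of coefficients and features
prediction = intercept + sum(c * x for c, x in zip(coefficients, features))
print(prediction)  # about 10.929, matching the first row of the predictions table
```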



In [42]:
# Coefficient of determination R2
evaluator = RegressionEvaluator(predictionCol = "prediction", labelCol = "label", metricName = "r2")

In [43]:
# Result
evaluator.evaluate(predictions) 

0.6858033570914506
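The R² metric compares the model's squared errors with those of a baseline that always predicts the mean label: $R^2 = 1 - SS_{res}/SS_{tot}$. A value of about 0.686 means the three features explain roughly 69% of the variance of consumption in this test split. A plain-Python sketch of the formula on made-up numbers (not the notebook's predictions); `r2_score` is a hypothetical helper name:

```python
def r2_score(labels, predictions):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean_label = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, predictions))  # residual sum of squares
    ss_tot = sum((y - mean_label) ** 2 for y in labels)              # total sum of squares
    return 1.0 - ss_res / ss_tot

labels = [10.0, 20.0, 30.0, 40.0]
predictions = [12.0, 18.0, 33.0, 37.0]
print(r2_score(labels, predictions))  # about 0.948
```

A perfect model scores 1.0, and a model no better than predicting the mean scores 0.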

## Disclaimer
Much of this project was developed in the Data Science Academy course Big Data Real-Time Analytics with Python and Spark (part of the Data Scientist training program).

# End