In [1]:
#We're gonna start doing some machine learning with spark 
# ----------- Linear regression -------------------
# we're gonna predict miles per gallon based on weight using linear regression



In [2]:
from pyspark import SparkContext
from pyspark.mllib.linalg import Vectors
import math

In [3]:
sc = SparkContext()

In [4]:
autoData = sc.textFile('auto-miles-per-gallon.csv')
autoData.cache()

auto-miles-per-gallon.csv MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [5]:
#let's now take a quick look at the data
autoData.take(20)

['MPG,CYLINDERS,DISPLACEMENT,HORSEPOWER,WEIGHT,ACCELERATION,MODELYEAR,NAME',
 '18,8,307,130,3504,12,70,chevrolet chevelle malibu',
 '15,8,350,165,3693,11.5,70,buick skylark 320',
 '18,8,318,150,3436,11,70,plymouth satellite',
 '16,8,304,150,3433,12,70,amc rebel sst',
 '17,8,302,140,3449,10.5,70,ford torino',
 '15,8,429,198,4341,10,70,ford galaxie 500',
 '14,8,454,220,4354,9,70,chevrolet impala',
 '14,8,440,215,4312,8.5,70,plymouth fury iii',
 '14,8,455,225,4425,10,70,pontiac catalina',
 '15,8,390,190,3850,8.5,70,amc ambassador dpl',
 '15,8,383,170,3563,10,70,dodge challenger se',
 "14,8,340,160,3609,8,70,plymouth 'cuda 340",
 '15,8,400,150,3761,9.5,70,chevrolet monte carlo',
 '14,8,455,225,3086,10,70,buick estate wagon (sw)',
 '24,4,113,95,2372,15,70,toyota corona mark ii',
 '22,6,198,95,2833,15.5,70,plymouth duster',
 '18,6,199,97,2774,15.5,70,amc hornet',
 '21,6,200,85,2587,16,70,ford maverick',
 '27,4,97,88,2130,14.5,70,datsun pl510']

In [6]:
#let's now remove the first line

dataLines = autoData.filter(lambda x: 'MPG' not in x)

We want to now turn our RDD into a 'dense vector'. In order to do that, we need to:
1. Remove unwanted columns
2. Change non-numeric values to numeric.

In order to do that, we will now create the `transformToNumeric`

In [7]:
avgHP = sc.broadcast(80.0)

def transformToNumeric(Str):
    #global avgHP
    att = Str.split(',')
    hp = att[3]
    if hp == '?':
        hp = 80.0
    #we now filter the unwanted columns
    values = Vectors.dense([float(att[0]), float(att[1]), hp, float(att[5]), float(att[6])])
    return values


In [8]:
autoVectors = dataLines.map(transformToNumeric)
autoVectors.take(29)

[DenseVector([18.0, 8.0, 130.0, 12.0, 70.0]),
 DenseVector([15.0, 8.0, 165.0, 11.5, 70.0]),
 DenseVector([18.0, 8.0, 150.0, 11.0, 70.0]),
 DenseVector([16.0, 8.0, 150.0, 12.0, 70.0]),
 DenseVector([17.0, 8.0, 140.0, 10.5, 70.0]),
 DenseVector([15.0, 8.0, 198.0, 10.0, 70.0]),
 DenseVector([14.0, 8.0, 220.0, 9.0, 70.0]),
 DenseVector([14.0, 8.0, 215.0, 8.5, 70.0]),
 DenseVector([14.0, 8.0, 225.0, 10.0, 70.0]),
 DenseVector([15.0, 8.0, 190.0, 8.5, 70.0]),
 DenseVector([15.0, 8.0, 170.0, 10.0, 70.0]),
 DenseVector([14.0, 8.0, 160.0, 8.0, 70.0]),
 DenseVector([15.0, 8.0, 150.0, 9.5, 70.0]),
 DenseVector([14.0, 8.0, 225.0, 10.0, 70.0]),
 DenseVector([24.0, 4.0, 95.0, 15.0, 70.0]),
 DenseVector([22.0, 6.0, 95.0, 15.5, 70.0]),
 DenseVector([18.0, 6.0, 97.0, 15.5, 70.0]),
 DenseVector([21.0, 6.0, 85.0, 16.0, 70.0]),
 DenseVector([27.0, 4.0, 88.0, 14.5, 70.0]),
 DenseVector([26.0, 4.0, 46.0, 20.5, 70.0]),
 DenseVector([25.0, 4.0, 87.0, 17.5, 70.0]),
 DenseVector([24.0, 4.0, 90.0, 14.5, 70.0]),
 

We will now do Statistical Analysis over the Data we have collected.

Remember that our columns are, respectively **MPG, CYLINDERS, HP, ACCELERATION, MODEL YEAR**

In [9]:
from pyspark.mllib.stat import Statistics
autoStats = Statistics.colStats(autoVectors)

In [10]:
#This are some of the operations we can do with this module
print(
autoStats.mean(),
autoStats.variance(),
autoStats.min(),
autoStats.max())

[ 23.51457286   5.45477387 104.10050251  15.56809045  76.01005025] [  61.08961077    2.89341544 1468.09062947    7.60484823   13.67244282] [ 9.  3. 46.  8. 70.] [ 46.6   8.  230.   24.8  82. ]


In [11]:
#with the following commnand, we can obtain a correlation matrix among the different attributes. In other words, we can see how 
# correlated these are among each other

Statistics.corr(autoVectors)

#so for instance we can see that there's a strong correlation between the values of HP and Cylinders.

array([[ 1.        , -0.77539629, -0.77463084,  0.42028891,  0.57926713],
       [-0.77539629,  1.        ,  0.84275215, -0.50541949, -0.3487458 ],
       [-0.77463084,  0.84275215,  1.        , -0.68829885, -0.41559383],
       [ 0.42028891, -0.50541949, -0.68829885,  1.        ,  0.28813695],
       [ 0.57926713, -0.3487458 , -0.41559383,  0.28813695,  1.        ]])

Next, we're gonna transform our data into a Labeled Point. Let's remember that a Labeled Point is a vector that contains first the value (outcome) and the rest are all the attributes that we use to predict our value - > (y, x_1, x_2,..., x_n)

In [12]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [13]:
#we create the function for our Labeled Point
# the first value will be the MPG, and the rest are the other attributes
#note that we decided to drop the value for the MODEL YEAR as it doesn't have a strong correlation (as we can see from the corr matrix)
#actually, in the video they got rid of ACCELERATION instead, but it doesn't really matter.

def transformToLabeledPoint(Str):
    lp = (float(Str[0]), Vectors.dense(Str[1], Str[2], Str[3]))
    return lp

In [14]:
autoLp = autoVectors.map(transformToLabeledPoint)


And next we're gonna use the `SQLContext` that we imported a couple of cells ago to create a DataFrame (as we've already learned in past lectures), which is what we're gonna use next for our modelling

In [15]:
autoDf = sqlContext.createDataFrame(autoLp, ['label', 'features'])
autoDf.show(10)

+-----+----------------+
|label|        features|
+-----+----------------+
| 18.0|[8.0,130.0,12.0]|
| 15.0|[8.0,165.0,11.5]|
| 18.0|[8.0,150.0,11.0]|
| 16.0|[8.0,150.0,12.0]|
| 17.0|[8.0,140.0,10.5]|
| 15.0|[8.0,198.0,10.0]|
| 14.0| [8.0,220.0,9.0]|
| 14.0| [8.0,215.0,8.5]|
| 14.0|[8.0,225.0,10.0]|
| 15.0| [8.0,190.0,8.5]|
+-----+----------------+
only showing top 10 rows



Next we need to find **correlations**

In [16]:
#we first find the number of features in each row (which we know to be 3)
numFeatures = autoDf.take(1)[0].features.size
labelRDD = autoDf.rdd.map(lambda x: float(x.label))
for i in range(numFeatures):
    featureRDD = autoDf.rdd.map(lambda x: x.features[i])
    corr = Statistics.corr(labelRDD, featureRDD, 'pearson')
    print('%d\t%g' % (i, corr))

0	-0.775396
1	-0.774631
2	0.420289


Obviously, we obtain the same result in the previous correlation numbers, as we did with the correlation matrix beforehand (as it's doing the same operations).

Now, we want to do some actual Machine Learning. In order to do that, we're gonna split our data into **Training Data** and **Test Data**, as we know already. We're gonna do respectively 90% and 10%, because our sample is small, so we want to use as much training data as possible. 

In [17]:
(trainingData, testData) = autoDf.randomSplit([0.9,0.1])
print(
trainingData.count(),
testData.count())

347 51


We now build the Model on training data

In [18]:
from pyspark.ml.regression import LinearRegression


In [19]:
trainingData.show(20)

+-----+----------------+
|label|        features|
+-----+----------------+
| 10.0|[8.0,200.0,15.0]|
| 10.0|[8.0,215.0,14.0]|
| 11.0|[8.0,150.0,14.0]|
| 11.0|[8.0,208.0,11.0]|
| 11.0|[8.0,210.0,13.5]|
| 12.0|[8.0,160.0,13.5]|
| 12.0|[8.0,167.0,12.5]|
| 12.0|[8.0,180.0,12.5]|
| 12.0|[8.0,198.0,11.5]|
| 13.0|[8.0,129.0,12.0]|
| 13.0|[8.0,130.0,14.0]|
| 13.0|[8.0,140.0,16.0]|
| 13.0|[8.0,145.0,13.0]|
| 13.0|[8.0,150.0,12.0]|
| 13.0|[8.0,150.0,14.5]|
| 13.0|[8.0,155.0,13.5]|
| 13.0|[8.0,158.0,13.0]|
| 13.0|[8.0,165.0,12.0]|
| 13.0|[8.0,170.0,12.0]|
| 13.0|[8.0,175.0,12.0]|
+-----+----------------+
only showing top 20 rows



In [21]:
lr = LinearRegression(maxIter = 10, featuresCol = 'features', labelCol = 'label') 
lrModel = lr.fit(trainingData)

IllegalArgumentException: requirement failed: Column features must be of type struct<type:tinyint,size:int,indices:array<int>,values:array<double>> but was actually struct<type:tinyint,size:int,indices:array<int>,values:array<double>>.