In [1]:
# Boiler Plate
import os

import findspark
import numpy as np

# Prefer the SPARK_HOME environment variable so the notebook is not tied to
# one machine; fall back to the original install location when it is unset.
findspark.init(os.environ.get('SPARK_HOME',
                              '/home/ubuntu/spark-2.1.1-bin-hadoop2.7'))

# These imports must come after findspark.init(), which puts pyspark on sys.path.
import pyspark
from pyspark.sql import SparkSession

# Reuse an existing session if one is already running, otherwise start one.
spark = SparkSession.builder.appName('lrex').getOrCreate()

In [2]:
from pyspark.ml.regression import LinearRegression

In [3]:
# Read the sample regression data, which is stored in libsvm format.
training = (
    spark.read
    .format('libsvm')
    .load('sample_linear_regression_data.txt')
)

In [4]:
# This is convenient because the libsvm reader already yields the 'label' and
# 'features' columns, so no extra feature assembly is needed before fitting.
training.show()

+-------------------+--------------------+
|              label|            features|
+-------------------+--------------------+
| -9.490009878824548|(10,[0,1,2,3,4,5,...|
| 0.2577820163584905|(10,[0,1,2,3,4,5,...|
| -4.438869807456516|(10,[0,1,2,3,4,5,...|
|-19.782762789614537|(10,[0,1,2,3,4,5,...|
| -7.966593841555266|(10,[0,1,2,3,4,5,...|
| -7.896274316726144|(10,[0,1,2,3,4,5,...|
| -8.464803554195287|(10,[0,1,2,3,4,5,...|
| 2.1214592666251364|(10,[0,1,2,3,4,5,...|
| 1.0720117616524107|(10,[0,1,2,3,4,5,...|
|-13.772441561702871|(10,[0,1,2,3,4,5,...|
| -5.082010756207233|(10,[0,1,2,3,4,5,...|
|  7.887786536531237|(10,[0,1,2,3,4,5,...|
| 14.323146365332388|(10,[0,1,2,3,4,5,...|
|-20.057482615789212|(10,[0,1,2,3,4,5,...|
|-0.8995693247765151|(10,[0,1,2,3,4,5,...|
| -19.16829262296376|(10,[0,1,2,3,4,5,...|
|  5.601801561245534|(10,[0,1,2,3,4,5,...|
|-3.2256352187273354|(10,[0,1,2,3,4,5,...|
| 1.5299675726687754|(10,[0,1,2,3,4,5,...|
| -0.250102447941961|(10,[0,1,2,3,4,5,...|
+----------

In [5]:
# Configure the estimator; the column names are spelled out so it is clear
# which DataFrame columns the model reads from and which one it writes to.
lr = LinearRegression(
    featuresCol='features',
    labelCol='label',
    predictionCol='prediction',
)

In [6]:
# Fit the regression on the full dataset (no hold-out set yet).
lrModel = lr.fit(training)

In [7]:
# Fitted weights, one per feature.
lrModel.coefficients

DenseVector([0.0073, 0.8314, -0.8095, 2.4412, 0.5192, 1.1535, -0.2989, -0.5129, -0.6197, 0.6956])

In [8]:
# Fitted intercept (bias) term.
lrModel.intercept

0.14228558260358093

In [9]:
# Summary object holding the fit metrics computed on the training data.
training_summary = lrModel.summary

In [10]:
# Grab the R-squared value: the share of variance in the label that the model
# explains on the data it was trained on.
training_summary.r2

0.027839179518600154

In [11]:
# Root mean squared error on the training data, in the same units as the label.
training_summary.rootMeanSquaredError

10.16309157133015

One mistake here is that we trained and evaluated the model on the same data. In practice you want to train on one portion of the data and then test the model on a different, held-out portion.

In [12]:
# Load the full dataset again under a new name so it can be split below.
all_data = spark.read.load('sample_linear_regression_data.txt', format='libsvm')

Splitting into training and testing data

In [13]:
# randomSplit returns a list with one DataFrame per weight. Pass a seed so the
# same rows land in each split every time the notebook is re-run.
split_object = all_data.randomSplit([0.7,0.3], seed=42)

In [14]:
# The first DataFrame in this list holds roughly 70% of the rows and the second
# roughly 30%; the weights are sampling proportions, not exact row counts.
split_object

[DataFrame[label: double, features: vector],
 DataFrame[label: double, features: vector]]

In [15]:
# Use tuple unpacking instead, and fix the seed so the train/test membership
# (and therefore every metric computed below) is reproducible across runs.
train_data, test_data = all_data.randomSplit([0.7,0.3], seed=42)

In [17]:
# Summary statistics of the training split (the row count shows its actual size).
train_data.describe().show()

+-------+-------------------+
|summary|              label|
+-------+-------------------+
|  count|                344|
|   mean|0.07974478261924665|
| stddev| 10.530185906896772|
|    min|-28.571478869743427|
|    max|  27.78383192005107|
+-------+-------------------+



In [18]:
# Summary statistics of the test split, for comparison with the training split.
test_data.describe().show()

+-------+-------------------+
|summary|              label|
+-------+-------------------+
|  count|                157|
|   mean| 0.6450260808832419|
| stddev|  9.858429606416498|
|    min|-26.805483428483072|
|    max| 27.111027963108548|
+-------+-------------------+



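Note that the split is roughly 70-30 (344 training rows vs. 157 test rows); `randomSplit` samples rows at random, so the proportions are approximate rather than exact.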

In [19]:
# Fit on the training split only, so the test split stays unseen by the model.
correct_model = lr.fit(train_data)

In [20]:
# Evaluate the fitted model on the held-out test split.
test_results = correct_model.evaluate(test_data)

In [21]:
# Per-row residuals on the test split (label minus predicted value).
test_results.residuals.show()

+-------------------+
|          residuals|
+-------------------+
|-27.839336717327704|
| -21.44146599878472|
| -20.38179924334598|
|-15.417291811688367|
|-16.473945771021583|
|-15.340106871083803|
|-18.934763680500065|
| -17.41379148351359|
| -17.58399232303062|
| -17.47440523773516|
| -14.21847642183023|
|-11.708250291600939|
|-14.921770319501807|
|-13.637847894718576|
| -5.679517365029663|
|-12.244644586657937|
|-13.056263953436032|
|  -9.39403674519666|
|  -7.05281797484432|
| -9.557033596170456|
+-------------------+
only showing top 20 rows



Now we can use any of the attributes and methods on the evaluation results to inspect how the model performed on the test data.

In [22]:
# Keep only the feature column to mimic new data whose labels are unknown.
unlabeled_data = test_data.select('features')

In [24]:
# Confirm that only the 'features' column remains.
unlabeled_data.show()

+--------------------+
|            features|
+--------------------+
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
+--------------------+
only showing top 20 rows



In [25]:
# transform() appends the model's 'prediction' column to the input DataFrame.
predictions = correct_model.transform(unlabeled_data)

In [26]:
# These are the model's predictions, computed from the feature vectors alone
# (no label column was supplied).
predictions.show()

+--------------------+-------------------+
|            features|         prediction|
+--------------------+-------------------+
|(10,[0,1,2,3,4,5,...| 1.0338532888446315|
|(10,[0,1,2,3,4,5,...| 1.3839833829955093|
|(10,[0,1,2,3,4,5,...|0.49723846907255587|
|(10,[0,1,2,3,4,5,...|-3.7510008112753948|
|(10,[0,1,2,3,4,5,...|-1.0202545858617604|
|(10,[0,1,2,3,4,5,...|-1.9866138615921454|
|(10,[0,1,2,3,4,5,...| 2.2425566591889585|
|(10,[0,1,2,3,4,5,...| 1.6902758704650214|
|(10,[0,1,2,3,4,5,...| 2.2244474431979433|
|(10,[0,1,2,3,4,5,...| 2.1396377578128183|
|(10,[0,1,2,3,4,5,...|0.24234549067752645|
|(10,[0,1,2,3,4,5,...|-2.1588376035578296|
|(10,[0,1,2,3,4,5,...| 1.8818422553971925|
|(10,[0,1,2,3,4,5,...| 0.6599991693264718|
|(10,[0,1,2,3,4,5,...| -5.961032312859163|
|(10,[0,1,2,3,4,5,...|  1.644514244748904|
|(10,[0,1,2,3,4,5,...|  2.700345353061351|
|(10,[0,1,2,3,4,5,...|-0.8946205071920484|
|(10,[0,1,2,3,4,5,...|-3.1806216121088333|
|(10,[0,1,2,3,4,5,...|-0.6311237790206556|
+----------