In [2]:
from __future__ import print_function

from pyspark.ml.regression import LinearRegression
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors


In [3]:
 # Create the Spark session for this notebook, or reuse one that already exists.
 # The warehouse directory is set explicitly to a local file:// location.
 spark = (
     SparkSession.builder
     .config('spark.sql.warehouse.dir', 'file:///home/sudipto21048867/Pyspark/spark-warehouse')
     .appName("LinearRegression")
     .getOrCreate()
 )

In [4]:
# Load the raw data as an RDD of text lines; it is converted to the form
# MLlib expects in the following steps.
inputLines = spark.sparkContext.textFile(
    '/user/sudipto21048867/data/regression_data/'
)

In [5]:
# Sanity check: how many lines were read from the input path.
inputLines.count()

1000

In [6]:
# Peek at the first five raw records to confirm the comma-separated layout.
inputLines.take(5)

[u'-1.74,1.66', u'1.24,-1.18', u'0.29,-0.40', u'-0.13,0.09', u'-0.39,0.38']

In [7]:
# Parse each comma-separated line into a (float, dense vector) pair:
# the first field stays a plain float, the second is wrapped in a
# one-element dense vector.
data = (
    inputLines
    .map(lambda line: line.split(","))
    .map(lambda fields: (float(fields[0]), Vectors.dense(float(fields[1]))))
)

In [8]:
# Turn the RDD of pairs into a DataFrame, naming the two tuple positions
# "label" and "features" respectively.
colNames = ["label", "features"]
df = data.toDF(schema=colNames)

In [9]:
# Preview the first five rows of the converted DataFrame.
df.show(5)

+-----+--------+
|label|features|
+-----+--------+
|-1.74|  [1.66]|
| 1.24| [-1.18]|
| 0.29|  [-0.4]|
|-0.13|  [0.09]|
|-0.39|  [0.38]|
+-----+--------+
only showing top 5 rows



In [10]:
# Note: in many cases you can avoid converting an RDD to a DataFrame at all,
# for example when you import data from a real database or use
# Structured Streaming to get your data.


In [11]:
# Split the data 50/50 into training and test sets.
# A fixed seed makes the split (and therefore the fitted model and the
# predictions shown later) repeatable across "Restart & Run All"; without it
# every run samples a different partition of the rows. Outputs captured from
# an earlier unseeded run will not match until the notebook is re-executed.
trainTest = df.randomSplit([0.5, 0.5], seed=42)
trainingDF, testingDF = trainTest

In [12]:
# Configure the linear regression estimator. Nothing is trained here;
# training happens when fit() is called on it.
lir = LinearRegression(
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8,
)


In [14]:
# Train the model on the training split. The fitted model returned by fit()
# is what gets applied to the test data afterwards.
model = lir.fit(trainingDF)

In [15]:
# Apply the fitted model to the held-out test DataFrame to generate a
# prediction for every row. The result is cached because it is read
# several times afterwards (preview, prediction extraction, label extraction).
fullPredictions = (
    model.transform(testingDF)
    .cache()
)

In [16]:
# Preview five test rows with the model's "prediction" column appended.
fullPredictions.show(5)

+-----+--------+-------------------+
|label|features|         prediction|
+-----+--------+-------------------+
|-2.09|  [1.97]| -1.423413486466285|
|-1.94|  [1.94]|-1.4018289558531445|
|-1.94|  [1.98]|-1.4306083300039985|
|-1.88|  [1.89]|-1.3658547381645771|
|-1.87|  [1.98]|-1.4306083300039985|
+-----+--------+-------------------+
only showing top 5 rows



In [17]:
# Pull the predicted values and the known correct labels out of the
# prediction DataFrame as two separate RDDs of plain values
# (each single-column Row is unwrapped to its only field).
predictions = (
    fullPredictions.select("prediction")
    .rdd
    .map(lambda row: row[0])
)
labels = (
    fullPredictions.select("label")
    .rdd
    .map(lambda row: row[0])
)


In [18]:
# Pair each prediction with its label positionally and bring the pairs back
# to the driver as a list of (prediction, label) tuples.
# NOTE(review): RDD.zip expects both RDDs to line up partition-for-partition;
# that holds here because both come from the same cached DataFrame.
predictionAndLabel = (
    predictions
    .zip(labels)
    .collect()
)

In [19]:
 # Print out the predicted and actual values for each point
# Each collected element is a (predicted, actual) tuple; print it as a tuple,
# one pair per line.
for predicted, actual in predictionAndLabel:
    print((predicted, actual))

(-1.423413486466285, -2.09)
(-1.4018289558531445, -1.94)
(-1.4306083300039985, -1.94)
(-1.3658547381645771, -1.88)
(-1.4306083300039985, -1.87)
(-1.3298805204760098, -1.8)
(-1.2507372415611615, -1.79)
(-1.2219578674103075, -1.75)
(-1.2003733367971672, -1.74)
(-1.1859836497217402, -1.66)
(-1.3298805204760098, -1.64)
(-1.2435423980234481, -1.61)
(-1.1787888061840266, -1.6)
(-1.1284249014200325, -1.57)
(-1.2147630238725942, -1.53)
(-1.0205022483543302, -1.46)
(-0.955748656514909, -1.39)
(-0.9053847517509146, -1.37)
(-1.0205022483543302, -1.36)
(-0.941358969439482, -1.34)
(-0.8262414728360663, -1.3)
(-0.8262414728360663, -1.29)
(-1.0636713095806112, -1.29)
(-0.8766053776000606, -1.26)
(-0.955748656514909, -1.25)
(-0.8694105340623471, -1.22)
(-0.8046569422229259, -1.2)
(-0.8694105340623471, -1.2)
(-0.9053847517509146, -1.17)
(-0.7974620986852125, -1.12)
(-0.6679549150063698, -1.1)
(-0.7830724116097856, -1.1)
(-0.8550208469869202, -1.09)
(-0.7470981939212181, -1.07)
(-0.7255136633080776, -1.