In [13]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.ml.regression import LinearRegression
from pyspark.sql.types import StructType

def zip_df(l, r):
    """Paste the first column of ``r`` next to the first two columns of ``l``.

    Rows are paired purely by position via ``RDD.zip``; no join key is used.
    The result has exactly three columns whose schema fields are taken from
    ``l.schema[0]``, ``l.schema[1]`` and ``r.schema[0]``.

    Note: ``RDD.zip`` is documented to assume both RDDs have the same number
    of partitions and the same number of elements in each partition.
    """
    paired_rows = l.rdd.zip(r.rdd)
    # Each element is (row_from_l, row_from_r); flatten it into one 3-tuple.
    flat_rows = paired_rows.map(
        lambda pair: (pair[0][0], pair[0][1], pair[1][0])
    )
    combined_schema = StructType([l.schema[0], l.schema[1], r.schema[0]])
    return flat_rows.toDF(combined_schema)

In [2]:
# Create the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName('lin_reg')
    .getOrCreate()
)

22/02/14 10:18:55 WARN Utils: Your hostname, ganesh-pi resolves to a loopback address: 127.0.1.1; using 192.168.1.119 instead (on interface eth0)
22/02/14 10:18:55 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/02/14 10:18:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Load the LIBSVM-formatted sample file into a DataFrame and inspect its columns.
# No 'numFeatures' option is given, so the reader scans the input to infer it.
training = (
    spark.read
    .format('libsvm')
    .load('sample_linear_regression_data.txt')
)
training.printSchema()

22/02/14 10:19:12 WARN LibSVMFileFormat: 'numFeatures' option not specified, determining the number of features by going though the input. If you know the number in advance, please specify it via 'numFeatures' option to avoid the extra scan.
[Stage 0:>                                                          (0 + 1) / 1]

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)



                                                                                

In [4]:
# Preview the first five rows; long cell values are truncated in the display.
training.show(n=5, truncate=True)

[Stage 1:>                                                          (0 + 1) / 1]

+-------------------+--------------------+
|              label|            features|
+-------------------+--------------------+
| -9.490009878824548|(10,[0,1,2,3,4,5,...|
| 0.2577820163584905|(10,[0,1,2,3,4,5,...|
| -4.438869807456516|(10,[0,1,2,3,4,5,...|
|-19.782762789614537|(10,[0,1,2,3,4,5,...|
| -7.966593841555266|(10,[0,1,2,3,4,5,...|
+-------------------+--------------------+
only showing top 5 rows



                                                                                

In [5]:
# Configure the estimator, naming the input, target and output columns explicitly.
lr = LinearRegression(
    featuresCol='features',
    labelCol='label',
    predictionCol='prediction',
)

# Fit the regression on the full `training` DataFrame.
lrModel = lr.fit(training)

22/02/14 10:19:32 WARN Instrumentation: [da38e8bd] regParam is zero, which might cause numerical instability and overfitting.
22/02/14 10:19:34 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
                                                                                

In [8]:
# Display the intercept term of the model fitted on the full dataset.
lrModel.intercept

0.14228558260358093

In [9]:
# Summary of the fit on the data the model was trained on.
training_summary = lrModel.summary

# Print R^2 first, then RMSE, one value per line.
print(training_summary.r2, training_summary.rootMeanSquaredError, sep='\n')

0.027839179518600154
10.16309157133015


In [7]:
# Split the data into training and test subsets.
all_data = training

# Randomised 70/30 split. A fixed seed makes the split, and every metric
# computed from it, reproducible across kernel restarts.
split_obj = all_data.randomSplit([0.7, 0.3], seed=42)
split_obj

[DataFrame[label: double, features: vector],
 DataFrame[label: double, features: vector]]

In [8]:
# Seeded 70/30 split so the train and test rows are the same on every run.
train_data, test_data = all_data.randomSplit([0.7, 0.3], seed=42)

In [14]:
# Show summary statistics (count, mean, stddev, min, max) for the test split.
test_data.describe().show()

                                                                                

+-------+-------------------+
|summary|              label|
+-------+-------------------+
|  count|                155|
|   mean| -0.997507602880807|
| stddev| 10.627032952242061|
|    min|-28.046018037776633|
|    max| 22.647750304177556|
+-------+-------------------+



In [9]:
# Fit on the training split only, keeping the test split out of the fit.
correct_model = lr.fit(dataset=train_data)

# Score the held-out split: predictions are compared with the actual labels.
test_results = correct_model.evaluate(dataset=test_data)
test_results.rootMeanSquaredError

22/02/14 10:20:05 WARN Instrumentation: [f336436b] regParam is zero, which might cause numerical instability and overfitting.
                                                                                

10.237310555715675

In [10]:
# Keep only the feature vectors, as if the labels were unknown.
unlabelled_data = test_data.select('features')

# Apply the fitted model; the result gains a 'prediction' column.
predictions = correct_model.transform(dataset=unlabelled_data)
predictions.show()

+--------------------+--------------------+
|            features|          prediction|
+--------------------+--------------------+
|(10,[0,1,2,3,4,5,...| -0.7642274306336959|
|(10,[0,1,2,3,4,5,...| -2.0622071452412563|
|(10,[0,1,2,3,4,5,...| -0.2900894514586674|
|(10,[0,1,2,3,4,5,...|  -2.710431609214994|
|(10,[0,1,2,3,4,5,...| -0.9680430791731777|
|(10,[0,1,2,3,4,5,...|  1.6811920401703473|
|(10,[0,1,2,3,4,5,...| -1.2232849817228173|
|(10,[0,1,2,3,4,5,...|  0.6519810757083847|
|(10,[0,1,2,3,4,5,...|  0.5977817790927897|
|(10,[0,1,2,3,4,5,...|  3.1810333904575585|
|(10,[0,1,2,3,4,5,...| -1.0644358794560151|
|(10,[0,1,2,3,4,5,...|-0.21507318695515246|
|(10,[0,1,2,3,4,5,...|  1.2745772611198392|
|(10,[0,1,2,3,4,5,...|  0.1526879012486029|
|(10,[0,1,2,3,4,5,...|   2.930424095171104|
|(10,[0,1,2,3,4,5,...|-0.40887776481490923|
|(10,[0,1,2,3,4,5,...|  0.5385669931816103|
|(10,[0,1,2,3,4,5,...|  -4.123482088084502|
|(10,[0,1,2,3,4,5,...| -2.1575761558254443|
|(10,[0,1,2,3,4,5,...|  0.958272

In [14]:
# Combine the actual and predicted values into one DataFrame.
# transform() keeps the input columns, so scoring the labelled test set
# directly keeps each label on the same row as its prediction. This avoids
# re-attaching labels by position with an RDD zip, which is only valid when
# both sides have identical partitioning.
predictions = (
    correct_model.transform(test_data)
    .select('features', 'prediction', 'label')
)
predictions.show()

[Stage 8:>                                                          (0 + 1) / 1]

+--------------------+--------------------+-------------------+
|            features|          prediction|              label|
+--------------------+--------------------+-------------------+
|(10,[0,1,2,3,4,5,...| -0.7642274306336959|-28.571478869743427|
|(10,[0,1,2,3,4,5,...| -2.0622071452412563|-26.736207182601724|
|(10,[0,1,2,3,4,5,...| -0.2900894514586674|-23.487440120936512|
|(10,[0,1,2,3,4,5,...|  -2.710431609214994|-22.837460416919342|
|(10,[0,1,2,3,4,5,...| -0.9680430791731777|-19.872991038068406|
|(10,[0,1,2,3,4,5,...|  1.6811920401703473| -18.27521356600463|
|(10,[0,1,2,3,4,5,...| -1.2232849817228173|-17.494200356883344|
|(10,[0,1,2,3,4,5,...|  0.6519810757083847|-17.065399625876015|
|(10,[0,1,2,3,4,5,...|  0.5977817790927897|-16.692207021311106|
|(10,[0,1,2,3,4,5,...|  3.1810333904575585| -16.26143027545273|
|(10,[0,1,2,3,4,5,...| -1.0644358794560151|-16.151349351277112|
|(10,[0,1,2,3,4,5,...|-0.21507318695515246| -16.08565904102149|
|(10,[0,1,2,3,4,5,...|  1.27457726111983

                                                                                

In [1]:
#Thanks for reading