**Import PySpark**

In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.regression  import LinearRegression

**Start Spark**

In [2]:
# Obtain the notebook's Spark session, registered under the app name 'lrex'.
spark = (
    SparkSession.builder
    .appName('lrex')
    .getOrCreate()
)

**Load Data**

In [3]:
# Read the sample file with the 'libsvm' reader; the resulting DataFrame has a
# `label` column and a `features` vector column.
training = (
    spark.read
    .format('libsvm')
    .load('sample_linear_regression_data.txt')
)

In [4]:
# Check the column names and types before wiring them into the estimator.
training.printSchema()

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)



In [6]:
# Preview the first 10 rows (long feature vectors are truncated in the display).
training.show(10)

+-------------------+--------------------+
|              label|            features|
+-------------------+--------------------+
| -9.490009878824548|(10,[0,1,2,3,4,5,...|
| 0.2577820163584905|(10,[0,1,2,3,4,5,...|
| -4.438869807456516|(10,[0,1,2,3,4,5,...|
|-19.782762789614537|(10,[0,1,2,3,4,5,...|
| -7.966593841555266|(10,[0,1,2,3,4,5,...|
| -7.896274316726144|(10,[0,1,2,3,4,5,...|
| -8.464803554195287|(10,[0,1,2,3,4,5,...|
| 2.1214592666251364|(10,[0,1,2,3,4,5,...|
| 1.0720117616524107|(10,[0,1,2,3,4,5,...|
|-13.772441561702871|(10,[0,1,2,3,4,5,...|
+-------------------+--------------------+
only showing top 10 rows



**Create Model instance**

In [7]:
# Unfitted estimator. The input column names match the schema of the loaded
# DataFrame; fitted models write their output to a 'prediction' column.
lr = LinearRegression(
    featuresCol='features',
    labelCol='label',
    predictionCol='prediction',
)

**Training model**

In [9]:
# Fit on the complete dataset (no hold-out yet), then display the type of the
# fitted model object.
lr_model = lr.fit(training)
type(lr_model)

pyspark.ml.regression.LinearRegressionModel

In [11]:
# Learned weights, one per feature (the feature vectors here have 10 entries).
lr_model.coefficients

DenseVector([0.0073, 0.8314, -0.8095, 2.4412, 0.5192, 1.1535, -0.2989, -0.5129, -0.6197, 0.6956])

In [12]:
# Learned bias term of the fitted linear model.
lr_model.intercept

0.14228558260358093

**Summary**

In [14]:
# Training summary attached to the fitted model (metrics computed on the data
# the model was fitted on).
# NOTE(review): `trainning_summary` is misspelled ("training"); renaming it
# also requires updating the later cell that reads this variable.
trainning_summary = lr_model.summary
type(trainning_summary)

pyspark.ml.regression.LinearRegressionTrainingSummary

In [17]:
# In-sample RMSE: measured on the same rows used for fitting, so it is not an
# estimate of performance on unseen data.
trainning_summary.rootMeanSquaredError

10.16309157133015

**Realistic situation: hold out a test set**

In [18]:
# Load the same sample file again under a new name, to be split into train and
# test subsets.
all_data = (
    spark.read
    .format('libsvm')
    .load('sample_linear_regression_data.txt')
)

In [22]:
# Randomly assign roughly 70% of the rows to training and 30% to testing.
# A fixed seed keeps the split, and every metric derived from it, repeatable
# across kernel restarts (for the same input file and partitioning).
# The weights are proportions, not exact counts, so split sizes are approximate.
train_data, test_data = all_data.randomSplit([0.7, 0.3], seed=42)

**Show Data**

In [25]:
# Preview the first 10 rows of the training split.
train_data.show(10)

+-------------------+--------------------+
|              label|            features|
+-------------------+--------------------+
|-28.571478869743427|(10,[0,1,2,3,4,5,...|
|-26.805483428483072|(10,[0,1,2,3,4,5,...|
|-26.736207182601724|(10,[0,1,2,3,4,5,...|
| -23.51088409032297|(10,[0,1,2,3,4,5,...|
|-23.487440120936512|(10,[0,1,2,3,4,5,...|
|-22.949825936196074|(10,[0,1,2,3,4,5,...|
|-21.432387764165806|(10,[0,1,2,3,4,5,...|
|-20.212077258958672|(10,[0,1,2,3,4,5,...|
|-19.782762789614537|(10,[0,1,2,3,4,5,...|
| -19.66731861537172|(10,[0,1,2,3,4,5,...|
+-------------------+--------------------+
only showing top 10 rows



**Describe**

In [26]:
# Summary statistics (count, mean, stddev, min, max) for the training split;
# only the numeric `label` column is reported.
train_data.describe().show()

+-------+--------------------+
|summary|               label|
+-------+--------------------+
|  count|                 366|
|   mean|0.040450423565125675|
| stddev|   9.918347708756695|
|    min| -28.571478869743427|
|    max|  26.903524792043335|
+-------+--------------------+



In [27]:
# Same statistics for the test split, to compare its label distribution and
# row count against the training split.
test_data.describe().show()

+-------+-------------------+
|summary|              label|
+-------+-------------------+
|  count|                135|
|   mean| 0.8436773695915133|
| stddev| 11.349062976422784|
|    min|-28.046018037776633|
|    max|  27.78383192005107|
+-------+-------------------+



**Correct model (fit on the training split only)**

In [29]:
# Fit the same estimator on the training split only, keeping the test split
# unseen for evaluation.
correct_model = lr.fit(train_data)

In [30]:
# Score the model on the held-out test split; the returned summary exposes the
# residuals and RMSE inspected in the following cells.
test_results = correct_model.evaluate(test_data)

In [33]:
# First 10 per-row residuals on the test split.
test_results.residuals.show(10)

+-------------------+
|          residuals|
+-------------------+
|-27.392656305761747|
| -20.40751362716858|
|-21.348770352976626|
| -21.14696833585335|
|-19.527986482715676|
|-20.953297114032214|
|-17.174247745234275|
| -19.82008550699919|
|-15.507236481222101|
| -15.78451883130861|
+-------------------+
only showing top 10 rows



In [34]:
# RMSE on the held-out test split (out-of-sample error).
test_results.rootMeanSquaredError

11.328332293356908

In [38]:
# Keep only the feature column to mimic unlabeled data at prediction time,
# then display the type of the result.
unlabeled_data = test_data.select('features')
type(unlabeled_data)

pyspark.sql.dataframe.DataFrame

In [36]:
# Preview 5 rows to confirm that only `features` remains.
unlabeled_data.show(5)

+--------------------+
|            features|
+--------------------+
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
+--------------------+
only showing top 5 rows



**Prediction**

In [37]:
# Apply the fitted model to the unlabeled rows; the result carries the input
# `features` column plus a `prediction` column.
predictions = correct_model.transform(unlabeled_data)

In [39]:
# Preview the first 10 predictions alongside their feature vectors.
predictions.show(10)

+--------------------+-------------------+
|            features|         prediction|
+--------------------+-------------------+
|(10,[0,1,2,3,4,5,...|-0.6533617320148861|
|(10,[0,1,2,3,4,5,...| -2.429946789750763|
|(10,[0,1,2,3,4,5,...| 1.2912877371874125|
|(10,[0,1,2,3,4,5,...| 1.2624075615799273|
|(10,[0,1,2,3,4,5,...|-0.3450045553527287|
|(10,[0,1,2,3,4,5,...| 1.5509610838176622|
|(10,[0,1,2,3,4,5,...| -1.994044877729487|
|(10,[0,1,2,3,4,5,...| 2.3914109360596867|
|(10,[0,1,2,3,4,5,...|-1.5192557829874476|
|(10,[0,1,2,3,4,5,...|-0.9345780022964791|
+--------------------+-------------------+
only showing top 10 rows

