In [1]:
import findspark 
findspark.init("/Users/valentinaporcu/spark/spark-2.4.1-bin-hadoop2.7")
import pyspark 
from pyspark.sql import DataFrameNaFunctions 
from pyspark.sql.functions import lit 
from pyspark.ml.feature import StringIndexer  
from pyspark.ml import Pipeline 
from pyspark.sql import SparkSession
from pyspark.sql import functions
import pandas as pd
import numpy as np

In [2]:
spark = SparkSession.builder.appName('linear_regression').getOrCreate()

In [3]:
df = spark.read.csv("iris.csv", inferSchema=True, header=True)

In [None]:
df.head()

In [5]:
df.show()

+---+------------+-----------+------------+-----------+-------+
|_c0|Sepal_Length|Sepal_Width|Petal_Length|Petal_Width|Species|
+---+------------+-----------+------------+-----------+-------+
|  1|         5.1|        3.5|         1.4|        0.2| setosa|
|  2|         4.9|        3.0|         1.4|        0.2| setosa|
|  3|         4.7|        3.2|         1.3|        0.2| setosa|
|  4|         4.6|        3.1|         1.5|        0.2| setosa|
|  5|         5.0|        3.6|         1.4|        0.2| setosa|
|  6|         5.4|        3.9|         1.7|        0.4| setosa|
|  7|         4.6|        3.4|         1.4|        0.3| setosa|
|  8|         5.0|        3.4|         1.5|        0.2| setosa|
|  9|         4.4|        2.9|         1.4|        0.2| setosa|
| 10|         4.9|        3.1|         1.5|        0.1| setosa|
| 11|         5.4|        3.7|         1.5|        0.2| setosa|
| 12|         4.8|        3.4|         1.6|        0.2| setosa|
| 13|         4.8|        3.0|         1

In [4]:
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- Sepal_Length: double (nullable = true)
 |-- Sepal_Width: double (nullable = true)
 |-- Petal_Length: double (nullable = true)
 |-- Petal_Width: double (nullable = true)
 |-- Species: string (nullable = true)



In [6]:
# per utilizzare Spark con Python dobbiamo sempre ricordarci che le colonne dei dati
# saranno solo due: la variabile da predire, o label, e tutte le altre variabili 
# aggregate e trasformate in un'unica colonna

In [7]:
# importiamo quindi i moduli per trasformare i dati

In [8]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [9]:
df.columns

['_c0',
 'Sepal_Length',
 'Sepal_Width',
 'Petal_Length',
 'Petal_Width',
 'Species']

In [10]:
assembler = VectorAssembler(
    inputCols=["Sepal_Length", "Sepal_Width", "Petal_Length"],
    outputCol="features")

In [11]:
transform = assembler.transform(df)

In [12]:
transform.select("features").show()

+-------------+
|     features|
+-------------+
|[5.1,3.5,1.4]|
|[4.9,3.0,1.4]|
|[4.7,3.2,1.3]|
|[4.6,3.1,1.5]|
|[5.0,3.6,1.4]|
|[5.4,3.9,1.7]|
|[4.6,3.4,1.4]|
|[5.0,3.4,1.5]|
|[4.4,2.9,1.4]|
|[4.9,3.1,1.5]|
|[5.4,3.7,1.5]|
|[4.8,3.4,1.6]|
|[4.8,3.0,1.4]|
|[4.3,3.0,1.1]|
|[5.8,4.0,1.2]|
|[5.7,4.4,1.5]|
|[5.4,3.9,1.3]|
|[5.1,3.5,1.4]|
|[5.7,3.8,1.7]|
|[5.1,3.8,1.5]|
+-------------+
only showing top 20 rows



In [13]:
transform.show()

+---+------------+-----------+------------+-----------+-------+-------------+
|_c0|Sepal_Length|Sepal_Width|Petal_Length|Petal_Width|Species|     features|
+---+------------+-----------+------------+-----------+-------+-------------+
|  1|         5.1|        3.5|         1.4|        0.2| setosa|[5.1,3.5,1.4]|
|  2|         4.9|        3.0|         1.4|        0.2| setosa|[4.9,3.0,1.4]|
|  3|         4.7|        3.2|         1.3|        0.2| setosa|[4.7,3.2,1.3]|
|  4|         4.6|        3.1|         1.5|        0.2| setosa|[4.6,3.1,1.5]|
|  5|         5.0|        3.6|         1.4|        0.2| setosa|[5.0,3.6,1.4]|
|  6|         5.4|        3.9|         1.7|        0.4| setosa|[5.4,3.9,1.7]|
|  7|         4.6|        3.4|         1.4|        0.3| setosa|[4.6,3.4,1.4]|
|  8|         5.0|        3.4|         1.5|        0.2| setosa|[5.0,3.4,1.5]|
|  9|         4.4|        2.9|         1.4|        0.2| setosa|[4.4,2.9,1.4]|
| 10|         4.9|        3.1|         1.5|        0.1| setosa|[

In [14]:
transformed_df = transform.select('features','Petal_Width')

In [15]:
train, test = transformed_df.randomSplit([0.7,0.3])

In [16]:
train.show()

+-------------+-----------+
|     features|Petal_Width|
+-------------+-----------+
|[4.4,2.9,1.4]|        0.2|
|[4.4,3.0,1.3]|        0.2|
|[4.4,3.2,1.3]|        0.2|
|[4.5,2.3,1.3]|        0.3|
|[4.6,3.1,1.5]|        0.2|
|[4.7,3.2,1.3]|        0.2|
|[4.7,3.2,1.6]|        0.2|
|[4.8,3.0,1.4]|        0.1|
|[4.8,3.0,1.4]|        0.3|
|[4.8,3.4,1.6]|        0.2|
|[4.8,3.4,1.9]|        0.2|
|[4.9,2.4,3.3]|        1.0|
|[4.9,2.5,4.5]|        1.7|
|[4.9,3.0,1.4]|        0.2|
|[4.9,3.1,1.5]|        0.1|
|[4.9,3.1,1.5]|        0.2|
|[4.9,3.6,1.4]|        0.1|
|[5.0,2.0,3.5]|        1.0|
|[5.0,2.3,3.3]|        1.0|
|[5.0,3.0,1.6]|        0.2|
+-------------+-----------+
only showing top 20 rows



In [17]:
train.describe().show()

+-------+------------------+
|summary|       Petal_Width|
+-------+------------------+
|  count|               105|
|   mean|1.1780952380952374|
| stddev|0.7544714084888214|
|    min|               0.1|
|    max|               2.5|
+-------+------------------+



In [18]:
test.describe().show()

+-------+------------------+
|summary|       Petal_Width|
+-------+------------------+
|  count|                45|
|   mean|1.2488888888888885|
| stddev|0.7864252325966189|
|    min|               0.1|
|    max|               2.5|
+-------+------------------+



In [19]:
# creiamo il modello di regressione

In [20]:
from pyspark.ml.regression import LinearRegression

In [21]:
# lr = LinearRegression()

In [22]:
lr = LinearRegression(featuresCol='features', labelCol='Petal_Width', 
                      predictionCol='prediction')

In [23]:
# adattiamolo ai dati

In [24]:
lr_model = lr.fit(train)

In [25]:
# stampiamo i coefficienti

In [26]:
print("Coefficients: {} Intercept: {}".format(lr_model.coefficients, lr_model.intercept))

Coefficients: [-0.20700703432769077,0.24044089331695623,0.5308134501427937] Intercept: -0.3126158582707814


In [27]:
# creiamo le predizioni sui dati di test

In [28]:
new_data = test.select('features')

In [29]:
predictions = lr_model.transform(new_data)

In [30]:
predictions.show()

+-------------+-------------------+
|     features|         prediction|
+-------------+-------------------+
|[4.3,3.0,1.1]|0.10247136922809019|
|[4.6,3.2,1.4]|0.24770147263601217|
|[4.6,3.4,1.4]| 0.2957896512994034|
|[4.6,3.6,1.0]|0.13155244990567727|
|[4.8,3.1,1.6]|0.28841866646733716|
|[5.0,3.2,1.2]|0.05873596887637711|
|[5.0,3.3,1.4]|0.18894274823663137|
|[5.0,3.4,1.6]|0.31914952759688586|
|[5.0,3.6,1.4]|0.26107501623171836|
|[5.1,2.5,3.0]| 0.8251908503787673|
|[5.1,3.4,1.5]|0.24536747914983736|
|[5.1,3.8,1.5]| 0.3415438364766198|
|[5.1,3.8,1.9]| 0.5538692165337371|
|[5.2,3.4,1.4]|0.17158543070278887|
|[5.5,2.4,3.7]|  1.089913362415951|
|[5.6,2.5,3.9]|  1.199419438343436|
|[5.6,2.7,4.2]|  1.406751652049666|
|[5.6,3.0,4.1]| 1.4258025750304728|
|[5.6,3.0,4.5]| 1.6381279550875905|
|[5.7,3.0,4.2]| 1.4581832166119837|
+-------------+-------------------+
only showing top 20 rows



In [31]:
test.show()

+-------------+-----------+
|     features|Petal_Width|
+-------------+-----------+
|[4.3,3.0,1.1]|        0.1|
|[4.6,3.2,1.4]|        0.2|
|[4.6,3.4,1.4]|        0.3|
|[4.6,3.6,1.0]|        0.2|
|[4.8,3.1,1.6]|        0.2|
|[5.0,3.2,1.2]|        0.2|
|[5.0,3.3,1.4]|        0.2|
|[5.0,3.4,1.6]|        0.4|
|[5.0,3.6,1.4]|        0.2|
|[5.1,2.5,3.0]|        1.1|
|[5.1,3.4,1.5]|        0.2|
|[5.1,3.8,1.5]|        0.3|
|[5.1,3.8,1.9]|        0.4|
|[5.2,3.4,1.4]|        0.2|
|[5.5,2.4,3.7]|        1.0|
|[5.6,2.5,3.9]|        1.1|
|[5.6,2.7,4.2]|        1.3|
|[5.6,3.0,4.1]|        1.3|
|[5.6,3.0,4.5]|        1.5|
|[5.7,3.0,4.2]|        1.2|
+-------------+-----------+
only showing top 20 rows



In [32]:
test_results = lr_model.evaluate(test)

In [33]:
print("RMSE: {}".format(test_results.rootMeanSquaredError))
print("MSE: {}".format(test_results.meanSquaredError))

RMSE: 0.173559973739051
MSE: 0.03012306448430007


In [34]:
trainingSummary = lr_model.summary

In [35]:
print("RMSE: {}".format(trainingSummary.rootMeanSquaredError))
print("r2: {}".format(trainingSummary.r2))

RMSE: 0.19649268677798304
r2: 0.9315200918396407


In [36]:
spark.stop()