<a href="https://colab.research.google.com/github/ernesto-miguez/Machine-Learning-training/blob/master/Linear_Regression_PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"
import findspark
findspark.init()

In [0]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
sc = SparkContext(appName='JSON Schema')
sqlContext = SQLContext(sc)

In [3]:
from google.colab import  files
uploaded = files.upload()

Saving Boston.csv to Boston (1).csv


In [4]:
!ls

'Boston (1).csv'	     spark-2.4.5-bin-hadoop2.7.tgz
 Boston.csv		     spark-2.4.5-bin-hadoop2.7.tgz.1
 sample_data		     spark-2.4.5-bin-hadoop2.7.tgz.2
 spark-2.4.5-bin-hadoop2.7   spark-2.4.5-bin-hadoop2.7.tgz.3


In [0]:
import io
import pandas as pd
data =  pd.read_csv(io.BytesIO(uploaded['Boston.csv']))

In [6]:
data.head()

Unnamed: 0.1,Unnamed: 0,crim,zn,indus,chas,nox,rm,age,dis,rad,tax,ptratio,black,lstat,medv
0,1,0.00632,18.0,2.31,0,0.538,6.575,65.2,4.09,1,296,15.3,396.9,4.98,24.0
1,2,0.02731,0.0,7.07,0,0.469,6.421,78.9,4.9671,2,242,17.8,396.9,9.14,21.6
2,3,0.02729,0.0,7.07,0,0.469,7.185,61.1,4.9671,2,242,17.8,392.83,4.03,34.7
3,4,0.03237,0.0,2.18,0,0.458,6.998,45.8,6.0622,3,222,18.7,394.63,2.94,33.4
4,5,0.06905,0.0,2.18,0,0.458,7.147,54.2,6.0622,3,222,18.7,396.9,5.33,36.2


## Converting pandas dataframe into spark dataframe

In [0]:
df = sqlContext.createDataFrame(data)

In [11]:
df.show()

+----------+--------------------+----+-----+----+-------------------+------------------+-----+------+---+---+-------+------+-----+----+
|Unnamed: 0|                crim|  zn|indus|chas|                nox|                rm|  age|   dis|rad|tax|ptratio| black|lstat|medv|
+----------+--------------------+----+-----+----+-------------------+------------------+-----+------+---+---+-------+------+-----+----+
|         1|             0.00632|18.0| 2.31|   0| 0.5379999999999999|             6.575| 65.2|  4.09|  1|296|   15.3| 396.9| 4.98|24.0|
|         2|             0.02731| 0.0| 7.07|   0|              0.469|             6.421| 78.9|4.9671|  2|242|   17.8| 396.9| 9.14|21.6|
|         3|             0.02729| 0.0| 7.07|   0|              0.469|             7.185| 61.1|4.9671|  2|242|   17.8|392.83| 4.03|34.7|
|         4|0.032369999999999996| 0.0| 2.18|   0|0.45799999999999996| 6.997999999999999| 45.8|6.0622|  3|222|   18.7|394.63| 2.94|33.4|
|         5|             0.06905| 0.0| 2.18|   0

## Creating a features array using VectorAssembler


In [0]:
feature_columns = list(df.columns[:-1]) # We omit the final column
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

In [0]:
data_2 = assembler.transform(df)

In [14]:
data_2.show()

+----------+--------------------+----+-----+----+-------------------+------------------+-----+------+---+---+-------+------+-----+----+--------------------+
|Unnamed: 0|                crim|  zn|indus|chas|                nox|                rm|  age|   dis|rad|tax|ptratio| black|lstat|medv|            features|
+----------+--------------------+----+-----+----+-------------------+------------------+-----+------+---+---+-------+------+-----+----+--------------------+
|         1|             0.00632|18.0| 2.31|   0| 0.5379999999999999|             6.575| 65.2|  4.09|  1|296|   15.3| 396.9| 4.98|24.0|[1.0,0.00632,18.0...|
|         2|             0.02731| 0.0| 7.07|   0|              0.469|             6.421| 78.9|4.9671|  2|242|   17.8| 396.9| 9.14|21.6|[2.0,0.02731,0.0,...|
|         3|             0.02729| 0.0| 7.07|   0|              0.469|             7.185| 61.1|4.9671|  2|242|   17.8|392.83| 4.03|34.7|[3.0,0.02729,0.0,...|
|         4|0.032369999999999996| 0.0| 2.18|   0|0.4579999

## Train/Test Split (70% / 30%)

In [0]:
train, test = data_2.randomSplit([0.7, 0.3])

## Training the Machine Learning Algorithm

In [0]:
from pyspark.ml.regression import LinearRegression

# We define the algorithm variable
algo = LinearRegression(featuresCol="features", labelCol="medv")
model = algo.fit(train)

## Evaluating Model

In [0]:
evaluation_summary = model.evaluate(test)

In [26]:
print("Mean Absolute Error = " + str(evaluation_summary.meanAbsoluteError))
print("RMSE = " + str(evaluation_summary.rootMeanSquaredError))
print("R2 = " + str(evaluation_summary.r2))

Mean Absolute Error = 3.5851106029673265
RMSE = 5.358209533209839
R2 = 0.6870630073903935


## Predicting Values

In [0]:
predictions = model.transform(test)

In [32]:
predictions.select(predictions.columns[14:]).show(10)

+----+--------------------+------------------+
|medv|            features|        prediction|
+----+--------------------+------------------+
|34.7|[3.0,0.02729,0.0,...|29.938981279230163|
|36.2|[5.0,0.06905,0.0,...|27.502399718871658|
|28.7|[6.0,0.02985,0.0,...| 24.77325812979857|
|22.9|[7.0,0.08829,12.5...| 22.83870433113871|
|27.1|[8.0,0.14455,12.5...| 19.43158073325713|
|16.5|[9.0,0.21124,12.5...| 11.91232218097015|
|21.7|[13.0,0.09378,12....|21.146656394455295|
|23.1|[17.0,1.05393,0.0...|20.790431099323925|
|13.6|[21.0,1.25179,0.0...|12.958062989088596|
|19.6|[22.0,0.852039999...|17.830814666069955|
+----+--------------------+------------------+
only showing top 10 rows

