<a href="https://colab.research.google.com/github/arieframasyarif/PySpark-Regression-Analysis/blob/master/PySpark_Regression_Analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Running Pyspark in Colab**




**Install the dependencies**
https://www-us.apache.org/dist/spark/



In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

**Set the location of Java and Spark**

In [0]:

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

**Run a local spark session to test our installation**

In [0]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

**Mount Google Drive**

In [0]:
from google.colab import drive
drive.mount('/content/gdrive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3Aietf%3Awg%3Aoauth%3A2.0%3Aoob&scope=email%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdocs.test%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdrive%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdrive.photos.readonly%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fpeopleapi.readonly&response_type=code

Enter your authorization code:
··········
Mounted at /content/gdrive


**Read dataset**

In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
dataset = spark.read.csv('/content/gdrive/My Drive/Dataset-PySpark/BostonHousing.csv',inferSchema=True, header =True)
dataset.head(10)

[Row(crim=0.00632, zn=18.0, indus=2.31, chas=0, nox=0.538, rm=6.575, age=65.2, dis=4.09, rad=1, tax=296, ptratio=15.3, b=396.9, lstat=4.98, medv=24.0),
 Row(crim=0.02731, zn=0.0, indus=7.07, chas=0, nox=0.469, rm=6.421, age=78.9, dis=4.9671, rad=2, tax=242, ptratio=17.8, b=396.9, lstat=9.14, medv=21.6),
 Row(crim=0.02729, zn=0.0, indus=7.07, chas=0, nox=0.469, rm=7.185, age=61.1, dis=4.9671, rad=2, tax=242, ptratio=17.8, b=392.83, lstat=4.03, medv=34.7),
 Row(crim=0.03237, zn=0.0, indus=2.18, chas=0, nox=0.458, rm=6.998, age=45.8, dis=6.0622, rad=3, tax=222, ptratio=18.7, b=394.63, lstat=2.94, medv=33.4),
 Row(crim=0.06905, zn=0.0, indus=2.18, chas=0, nox=0.458, rm=7.147, age=54.2, dis=6.0622, rad=3, tax=222, ptratio=18.7, b=396.9, lstat=5.33, medv=36.2),
 Row(crim=0.02985, zn=0.0, indus=2.18, chas=0, nox=0.458, rm=6.43, age=58.7, dis=6.0622, rad=3, tax=222, ptratio=18.7, b=394.12, lstat=5.21, medv=28.7),
 Row(crim=0.08829, zn=12.5, indus=7.87, chas=0, nox=0.524, rm=6.012, age=66.6, di

**Data types of each column**

In [0]:
dataset.printSchema()

root
 |-- crim: double (nullable = true)
 |-- zn: double (nullable = true)
 |-- indus: double (nullable = true)
 |-- chas: integer (nullable = true)
 |-- nox: double (nullable = true)
 |-- rm: double (nullable = true)
 |-- age: double (nullable = true)
 |-- dis: double (nullable = true)
 |-- rad: integer (nullable = true)
 |-- tax: integer (nullable = true)
 |-- ptratio: double (nullable = true)
 |-- b: double (nullable = true)
 |-- lstat: double (nullable = true)
 |-- medv: double (nullable = true)



In [0]:
#Input all the features in one vector column
assembler = VectorAssembler(inputCols=['crim', 'zn', 'indus', 'chas', 'nox', 'rm', 'age', 'dis', 'rad', 'tax', 'ptratio', 'b', 'lstat'], outputCol = 'Attributes')
output = assembler.transform(dataset)
#Input vs Output
finalized_data = output.select("Attributes","medv")
finalized_data.show()

+--------------------+----+
|          Attributes|medv|
+--------------------+----+
|[0.00632,18.0,2.3...|24.0|
|[0.02731,0.0,7.07...|21.6|
|[0.02729,0.0,7.07...|34.7|
|[0.03237,0.0,2.18...|33.4|
|[0.06905,0.0,2.18...|36.2|
|[0.02985,0.0,2.18...|28.7|
|[0.08829,12.5,7.8...|22.9|
|[0.14455,12.5,7.8...|27.1|
|[0.21124,12.5,7.8...|16.5|
|[0.17004,12.5,7.8...|18.9|
|[0.22489,12.5,7.8...|15.0|
|[0.11747,12.5,7.8...|18.9|
|[0.09378,12.5,7.8...|21.7|
|[0.62976,0.0,8.14...|20.4|
|[0.63796,0.0,8.14...|18.2|
|[0.62739,0.0,8.14...|19.9|
|[1.05393,0.0,8.14...|23.1|
|[0.7842,0.0,8.14,...|17.5|
|[0.80271,0.0,8.14...|20.2|
|[0.7258,0.0,8.14,...|18.2|
+--------------------+----+
only showing top 20 rows



**Split the training and testing data according to our dataset**

In [0]:
#Split training and testing data
train_data,test_data = finalized_data.randomSplit([0.8,0.2])
regressor = LinearRegression(featuresCol = 'Attributes', labelCol = 'medv')
#Learn to fit the model from training set
regressor = regressor.fit(train_data)
#To predict the prices on testing set
pred = regressor.evaluate(test_data)
#Predict the model
pred.predictions.show()

+--------------------+----+------------------+
|          Attributes|medv|        prediction|
+--------------------+----+------------------+
|[0.00906,90.0,2.9...|32.2|31.724112473341094|
|[0.01301,35.0,1.5...|32.7|30.104132932201132|
|[0.01311,90.0,1.2...|35.4| 30.70597051150918|
|[0.01778,95.0,1.4...|32.9| 30.76944399452809|
|[0.01951,17.5,1.3...|33.0|22.439125753064232|
|[0.03113,0.0,4.39...|17.5|16.145107954182393|
|[0.03427,0.0,5.19...|19.5|19.806257774036602|
|[0.03445,82.5,2.0...|24.1|29.877757091816925|
|[0.04301,80.0,1.9...|18.2| 13.71627214631959|
|[0.04462,25.0,4.8...|23.9|26.902608971887123|
|[0.05023,35.0,6.0...|17.1|20.147759496397402|
|[0.05083,0.0,5.19...|22.2|21.492470381896542|
|[0.05372,0.0,13.9...|27.1|27.301793388401173|
|[0.05425,0.0,4.05...|24.6|  29.3739807062421|
|[0.05515,33.0,2.1...|36.1| 33.12293230998227|
|[0.05644,40.0,6.4...|32.4| 37.06294146031155|
|[0.06263,0.0,11.9...|22.4|23.279243435906217|
|[0.06466,70.0,2.2...|22.5|  29.4445357735309|
|[0.06617,0.0

**Coefficient and intercept of the regression model**

In [0]:
#coefficient of the regression model
coeff = regressor.coefficients
#X and Y intercept
intr = regressor.intercept
print ("The coefficient of the model is : %a" %coeff)
print ("The Intercept of the model is : %f" %intr)

The coefficient of the model is : DenseVector([-0.1119, 0.0499, 0.0162, 3.3871, -20.7122, 3.5445, -0.0011, -1.6591, 0.3013, -0.0111, -1.0282, 0.0079, -0.531])
The Intercept of the model is : 42.138003


**Analyze our model statistically by importing RegressionEvaluator module from Pyspark**

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator
eval = RegressionEvaluator(labelCol="medv", predictionCol="prediction", metricName="rmse")
# Root Mean Square Error
rmse = eval.evaluate(pred.predictions)
print("RMSE: %.3f" % rmse)
# Mean Square Error
mse = eval.evaluate(pred.predictions, {eval.metricName: "mse"})
print("MSE: %.3f" % mse)
# Mean Absolute Error
mae = eval.evaluate(pred.predictions, {eval.metricName: "mae"})
print("MAE: %.3f" % mae)
# r2 - coefficient of determination
r2 = eval.evaluate(pred.predictions, {eval.metricName: "r2"})
print("r2: %.3f" %r2)

RMSE: 3.693
MSE: 13.639
MAE: 2.714
r2: 0.780
