<a href="https://colab.research.google.com/github/JBobrutsky/pyspark-ML-in-Colab/blob/master/PySpark_PCA_Regression_Analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Install Spark

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

In [0]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

## Load and Organize Data

In [0]:
from google.colab import files
files.upload()

In [0]:
dataset = spark.read.csv('BostonHousing.csv',inferSchema=True, header =True)

dataset.show()

dataset.printSchema()

+-------+----+-----+----+-----+-----+-----+------+---+---+-------+------+-----+----+
|   crim|  zn|indus|chas|  nox|   rm|  age|   dis|rad|tax|ptratio|     b|lstat|medv|
+-------+----+-----+----+-----+-----+-----+------+---+---+-------+------+-----+----+
|0.00632|18.0| 2.31|   0|0.538|6.575| 65.2|  4.09|  1|296|   15.3| 396.9| 4.98|24.0|
|0.02731| 0.0| 7.07|   0|0.469|6.421| 78.9|4.9671|  2|242|   17.8| 396.9| 9.14|21.6|
|0.02729| 0.0| 7.07|   0|0.469|7.185| 61.1|4.9671|  2|242|   17.8|392.83| 4.03|34.7|
|0.03237| 0.0| 2.18|   0|0.458|6.998| 45.8|6.0622|  3|222|   18.7|394.63| 2.94|33.4|
|0.06905| 0.0| 2.18|   0|0.458|7.147| 54.2|6.0622|  3|222|   18.7| 396.9| 5.33|36.2|
|0.02985| 0.0| 2.18|   0|0.458| 6.43| 58.7|6.0622|  3|222|   18.7|394.12| 5.21|28.7|
|0.08829|12.5| 7.87|   0|0.524|6.012| 66.6|5.5605|  5|311|   15.2| 395.6|12.43|22.9|
|0.14455|12.5| 7.87|   0|0.524|6.172| 96.1|5.9505|  5|311|   15.2| 396.9|19.15|27.1|
|0.21124|12.5| 7.87|   0|0.524|5.631|100.0|6.0821|  5|311|   15.2

In [0]:
from pyspark.ml.feature import VectorAssembler

#Input all the features in one vector column
assembler = VectorAssembler(inputCols=['crim', 'zn', 'indus', 'chas', 'nox', 'rm', 'age', 'dis', 'rad', 'tax', 'ptratio', 'b', 'lstat'], outputCol = 'Attributes')
output = assembler.transform(dataset)

#Input vs Output
finalized_data = output.select("Attributes","medv")
finalized_data.show()

#Split training and testing data
train_data,test_data = finalized_data.randomSplit([0.8,0.2])

+--------------------+----+
|          Attributes|medv|
+--------------------+----+
|[0.00632,18.0,2.3...|24.0|
|[0.02731,0.0,7.07...|21.6|
|[0.02729,0.0,7.07...|34.7|
|[0.03237,0.0,2.18...|33.4|
|[0.06905,0.0,2.18...|36.2|
|[0.02985,0.0,2.18...|28.7|
|[0.08829,12.5,7.8...|22.9|
|[0.14455,12.5,7.8...|27.1|
|[0.21124,12.5,7.8...|16.5|
|[0.17004,12.5,7.8...|18.9|
|[0.22489,12.5,7.8...|15.0|
|[0.11747,12.5,7.8...|18.9|
|[0.09378,12.5,7.8...|21.7|
|[0.62976,0.0,8.14...|20.4|
|[0.63796,0.0,8.14...|18.2|
|[0.62739,0.0,8.14...|19.9|
|[1.05393,0.0,8.14...|23.1|
|[0.7842,0.0,8.14,...|17.5|
|[0.80271,0.0,8.14...|20.2|
|[0.7258,0.0,8.14,...|18.2|
+--------------------+----+
only showing top 20 rows



## Linear Regression

In [0]:
from pyspark.ml.regression import LinearRegression

regressor = LinearRegression(featuresCol = 'Attributes', labelCol = 'medv')
#Learn to fit the model from training set
regressor = regressor.fit(train_data)
#To predict the prices on testing set
pred = regressor.evaluate(test_data)
#Predict the model
pred.predictions.show()

+--------------------+----+------------------+
|          Attributes|medv|        prediction|
+--------------------+----+------------------+
|[0.01778,95.0,1.4...|32.9|30.859829886607418|
|[0.0187,85.0,4.15...|23.1|25.729672608033585|
|[0.02498,0.0,1.89...|16.5| 21.91729418444487|
|[0.02899,40.0,1.2...|26.6| 22.12654263143647|
|[0.02985,0.0,2.18...|28.7|25.131048902410726|
|[0.03427,0.0,5.19...|19.5|19.881479595545645|
|[0.03584,80.0,3.3...|23.5| 30.66120330591262|
|[0.03768,80.0,1.5...|34.6|35.413377869114875|
|[0.04011,80.0,1.5...|33.3|36.747906202018605|
|[0.04417,70.0,2.2...|24.8|31.466869254658658|
|[0.0459,52.5,5.32...|22.3| 27.79818359238362|
|[0.05372,0.0,13.9...|27.1|27.667564807272303|
|[0.05602,0.0,2.46...|50.0|  35.3638504646294|
|[0.0578,0.0,2.46,...|37.2| 32.46005549730855|
|[0.06162,0.0,4.39...|17.2|14.973894127106412|
|[0.06263,0.0,11.9...|22.4| 22.93502027220878|
|[0.0686,0.0,2.89,...|33.2|31.843105648627095|
|[0.06911,45.0,3.4...|30.5| 30.44960375135788|
|[0.07013,0.0

In [0]:
#coefficient of the regression model
coeff = regressor.coefficients
#X and Y intercept
intr = regressor.intercept
print ("The coefficient of the model is : %a" %coeff)
print ("The Intercept of the model is : %f" %intr)

The coefficient of the model is : DenseVector([-0.1147, 0.0521, 0.042, 2.1887, -19.2016, 3.7468, 0.0087, -1.3656, 0.3327, -0.0132, -1.0335, 0.0084, -0.5051])
The Intercept of the model is : 38.103696


In [0]:
from pyspark.ml.evaluation import RegressionEvaluator
reval = RegressionEvaluator(labelCol="medv", predictionCol="prediction", metricName="rmse")
# Root Mean Square Error
rmse = reval.evaluate(pred.predictions)
print("RMSE: %.3f" % rmse)
# Mean Square Error
mse = reval.evaluate(pred.predictions, {reval.metricName: "mse"})
print("MSE: %.3f" % mse)
# Mean Absolute Error
mae = reval.evaluate(pred.predictions, {reval.metricName: "mae"})
print("MAE: %.3f" % mae)
# r2 - coefficient of determination
r2 = reval.evaluate(pred.predictions, {reval.metricName: "r2"})
print("r2: %.3f" %r2)

RMSE: 5.504
MSE: 30.298
MAE: 3.512
r2: 0.629


## PCA

In [0]:
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors

FEATURE_NUM = 10
pca = PCA(k=FEATURE_NUM, inputCol="Attributes", outputCol="pcaFeatures")
model = pca.fit(finalized_data)

result = model.transform(finalized_data).select("pcaFeatures")
result.show(truncate=False)

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|pcaFeatures                                                                                                                                                                                       |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[-170.48334946511406,466.4384020319935,20.078098061799402,50.75554287623922,-4.604172476281945,0.25409369238824464,-2.6753499433893433,-10.202479732479079,-15.550371374788469,1.2561467094913883]|
|[-121.41203669936277,450.7621375378928,47.68729219561534,46.76072566080398,-2.415764243841794,3.324343052394645,-2.271392148019748,-5.640134387359168,-17.953554732923944,2.885110662438193]      |
|[-120.99048471

### Join the new feature vector with the original data

In [0]:
from pyspark.sql.types import StructType, StructField, LongType

def with_column_index(sdf): 
    new_schema = StructType(sdf.schema.fields + [StructField("ColumnIndex", LongType(), False),])
    return sdf.rdd.zipWithIndex().map(lambda row: row[0] + (row[1],)).toDF(schema=new_schema)


In [0]:
df1_ci = with_column_index(finalized_data)
df2_ci = with_column_index(result)

finalized_data_pca = df1_ci.join(df2_ci, df1_ci.ColumnIndex == df2_ci.ColumnIndex, 'inner').drop("ColumnIndex")
finalized_data_pca.show()

+--------------------+----+--------------------+
|          Attributes|medv|         pcaFeatures|
+--------------------+----+--------------------+
|[0.67191,0.0,8.14...|16.6|[-190.21866129551...|
|[1.00245,0.0,8.14...|21.0|[-188.93883375276...|
|[8.05579,0.0,18.1...|13.8|[-540.16466466912...|
|[0.03584,80.0,3.3...|23.5|[-202.90294258659...|
|[0.06911,45.0,3.4...|30.5|[-265.58763370271...|
|[73.5341,0.0,18.1...| 8.8|[-640.41543598763...|
|[0.62356,0.0,6.2,...|27.5|[-185.22997145593...|
|[0.29916,20.0,6.9...|21.1|[-101.96707923547...|
|[0.08265,0.0,13.9...|23.9|[-161.24962550251...|
|[0.12757,30.0,4.9...|23.7|[-175.71654852674...|
|[0.07978,40.0,6.4...|29.1|[-127.18362140013...|
|[13.5222,0.0,18.1...|23.1|[-605.01293880659...|
|[5.66637,0.0,18.1...|18.4|[-527.89131680193...|
|[0.7258,0.0,8.14,...|18.2|[-184.30813795673...|
|[0.0136,75.0,4.0,...|18.9|[-331.20744297629...|
|[0.05372,0.0,13.9...|27.1|[-165.12386860461...|
|[0.00632,18.0,2.3...|24.0|[-170.48334946511...|
|[0.06127,40.0,6.4..

### Retrain the Linear Regression

In [0]:
# from pyspark.ml.regression import LinearRegression

#Split training and testing data
train_data,test_data = finalized_data_pca.randomSplit([0.8,0.2])

regressor = LinearRegression(featuresCol = 'pcaFeatures', labelCol = 'medv')
#Learn to fit the model from training set
regressor = regressor.fit(train_data)
#To predict the prices on testing set
pred_pca = regressor.evaluate(test_data)
#Predict the model
pred_pca.predictions.show()

+--------------------+----+--------------------+------------------+
|          Attributes|medv|         pcaFeatures|        prediction|
+--------------------+----+--------------------+------------------+
|[13.5222,0.0,18.1...|23.1|[-605.01293880659...|19.832507690301853|
|[0.00632,18.0,2.3...|24.0|[-170.48334946511...| 32.31148176654435|
|[0.22212,0.0,10.0...|18.7|[-303.71052712947...| 19.93380566568665|
|[0.34006,0.0,21.8...|19.2|[-309.46208298914...| 18.77910735222654|
|[0.14455,12.5,7.8...|27.1|[-188.20788381191...| 18.50751917513194|
|[9.91655,0.0,18.1...| 6.3|[-543.22367108108...| 9.106035085990989|
|[0.38735,0.0,25.6...|15.7|[-83.457430173030...|12.633149049504695|
|[0.04294,28.0,15....|20.6|[-147.01130615636...| 26.69886316420293|
|[7.02259,0.0,18.1...|14.2|[-549.56679995829...|19.873155229946406|
|[0.15936,0.0,6.91...|24.7|[-107.52105567350...|25.082230466216316|
|[0.21719,0.0,10.5...|22.4|[-154.65659173946...|20.082507492671688|
|[0.5405,20.0,3.97...|43.5|[-141.06773850939...|

In [0]:
# from pyspark.ml.evaluation import RegressionEvaluator
reval = RegressionEvaluator(labelCol="medv", predictionCol="prediction", metricName="rmse")
# Root Mean Square Error
rmse = reval.evaluate(pred_pca.predictions)
print("RMSE: %.3f" % rmse)
# Mean Square Error
mse = reval.evaluate(pred_pca.predictions, {reval.metricName: "mse"})
print("MSE: %.3f" % mse)
# Mean Absolute Error
mae = reval.evaluate(pred_pca.predictions, {reval.metricName: "mae"})
print("MAE: %.3f" % mae)
# r2 - coefficient of determination
r2 = reval.evaluate(pred_pca.predictions, {reval.metricName: "r2"})
print("r2: %.3f" %r2)

RMSE: 6.045
MSE: 36.541
MAE: 4.459
r2: 0.595


RegressionEvaluator:

*   RMSE: 5.504
*   MSE: 30.298
*   MAE: 3.512
*   r2: 0.629

## Visualization

In [0]:
import pandas as pd
df = pd.read_csv("BostonHousing.csv")
df.head()

In [0]:
import seaborn as sns
sns.set(style="ticks")

sns.pairplot(df)

In [0]:
sns.set(style="ticks")
df_pca = finalized_data_pca.toPandas()
# sns.pairplot(df_pca)
# df_pca_col = pd.DataFrame(df_pca.pcaFeatures.tolist(), columns=['pca1', 'pca2', 'pca3'])

# df_pca.pcaFeatures.tolist()

In [0]:
df_pca['pcaFeatures'].values.tolist()