<a href="https://colab.research.google.com/github/Sparrow0hawk/spark_in_colabs/blob/master/pyspark_in_colab_test1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Testing using apache spark in Colabs

Based on https://towardsdatascience.com/pyspark-in-google-colab-6821c2faf41c

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.mirror.anlx.net/spark/spark-2.4.3/spark-2.4.3-bin-hadoop2.7.tgz
!wget https://raw.githubusercontent.com/asifahmed90/pyspark-ML-in-Colab/master/BostonHousing.csv


In [0]:
! ls

BostonHousing.csv  sample_data		      spark-2.4.3-bin-hadoop2.7.tgz
policedata_dump    spark-2.4.3-bin-hadoop2.7


In [0]:
!tar xf spark-2.4.3-bin-hadoop2.7.tgz


In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.3-bin-hadoop2.7/"

In [0]:
import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

dataset = spark.read.csv('BostonHousing.csv',inferSchema=True, header =True)

In [0]:
dataset.printSchema()

root
 |-- crim: double (nullable = true)
 |-- zn: double (nullable = true)
 |-- indus: double (nullable = true)
 |-- chas: integer (nullable = true)
 |-- nox: double (nullable = true)
 |-- rm: double (nullable = true)
 |-- age: double (nullable = true)
 |-- dis: double (nullable = true)
 |-- rad: integer (nullable = true)
 |-- tax: integer (nullable = true)
 |-- ptratio: double (nullable = true)
 |-- b: double (nullable = true)
 |-- lstat: double (nullable = true)
 |-- medv: double (nullable = true)



In [0]:
print(dataset.count())

print(len(dataset.columns))

506
14


In [0]:
#Input all the features in one vector column
assembler = VectorAssembler(inputCols=['crim', 'zn', 'indus', 'chas', 'nox', 'rm', 'age', 'dis', 'rad', 'tax', 'ptratio', 'b', 'lstat'], outputCol = 'Attributes')

output = assembler.transform(dataset)

#Input vs Output
finalized_data = output.select("Attributes","medv")

finalized_data.show()

+--------------------+----+
|          Attributes|medv|
+--------------------+----+
|[0.00632,18.0,2.3...|24.0|
|[0.02731,0.0,7.07...|21.6|
|[0.02729,0.0,7.07...|34.7|
|[0.03237,0.0,2.18...|33.4|
|[0.06905,0.0,2.18...|36.2|
|[0.02985,0.0,2.18...|28.7|
|[0.08829,12.5,7.8...|22.9|
|[0.14455,12.5,7.8...|27.1|
|[0.21124,12.5,7.8...|16.5|
|[0.17004,12.5,7.8...|18.9|
|[0.22489,12.5,7.8...|15.0|
|[0.11747,12.5,7.8...|18.9|
|[0.09378,12.5,7.8...|21.7|
|[0.62976,0.0,8.14...|20.4|
|[0.63796,0.0,8.14...|18.2|
|[0.62739,0.0,8.14...|19.9|
|[1.05393,0.0,8.14...|23.1|
|[0.7842,0.0,8.14,...|17.5|
|[0.80271,0.0,8.14...|20.2|
|[0.7258,0.0,8.14,...|18.2|
+--------------------+----+
only showing top 20 rows



In [0]:
#Split training and testing data
train_data,test_data = finalized_data.randomSplit([0.8,0.2])

regressor = LinearRegression(featuresCol = 'Attributes', labelCol = 'medv')

#Learn to fit the model from training set
regressor = regressor.fit(train_data)

#To predict the prices on testing set
pred = regressor.evaluate(test_data)

#Predict the model
pred.predictions.show()

+--------------------+----+------------------+
|          Attributes|medv|        prediction|
+--------------------+----+------------------+
|[0.0136,75.0,4.0,...|18.9|15.372183075942722|
|[0.01965,80.0,1.7...|20.1| 20.04997191854074|
|[0.02055,85.0,0.7...|24.7|24.926136003338506|
|[0.03113,0.0,4.39...|17.5| 16.00796132822326|
|[0.0456,0.0,13.89...|23.3| 26.79203635634001|
|[0.04666,80.0,1.5...|30.3|32.487995147628524|
|[0.04684,0.0,3.41...|22.6|27.279320408625026|
|[0.04741,0.0,11.9...|11.9|22.910550365640923|
|[0.04981,21.0,5.6...|23.4|23.795571999400376|
|[0.05425,0.0,4.05...|24.6|29.617764280917914|
|[0.05479,33.0,2.1...|28.4| 31.08378498208328|
|[0.05602,0.0,2.46...|50.0| 35.00568222038368|
|[0.05644,40.0,6.4...|32.4| 36.29122925469472|
|[0.0566,0.0,3.41,...|23.6| 30.87433371324914|
|[0.06127,40.0,6.4...|33.1| 34.72777000293232|
|[0.06211,40.0,1.2...|22.9| 19.96631872128188|
|[0.06724,0.0,3.24...|22.6|23.355871084692332|
|[0.08387,0.0,12.8...|20.3|22.506683277334727|
|[0.08829,12.

In [0]:
#coefficient of the regression model
coeff = regressor.coefficients

#X and Y intercept
intr = regressor.intercept

print ("The coefficient of the model is : %a" %coeff)
print ("The Intercept of the model is : %f" %intr)

The coefficient of the model is : DenseVector([-0.0982, 0.0569, 0.0413, 2.7137, -22.0147, 2.9795, 0.0195, -1.6157, 0.3222, -0.0138, -0.9903, 0.0068, -0.5875])
The Intercept of the model is : 45.697935


In [0]:
from pyspark.ml.evaluation import RegressionEvaluator
eval = RegressionEvaluator(labelCol="medv", predictionCol="prediction", metricName="rmse")

# Root Mean Square Error
rmse = eval.evaluate(pred.predictions)
print("RMSE: %.3f" % rmse)

# Mean Square Error
mse = eval.evaluate(pred.predictions, {eval.metricName: "mse"})
print("MSE: %.3f" % mse)

# Mean Absolute Error
mae = eval.evaluate(pred.predictions, {eval.metricName: "mae"})
print("MAE: %.3f" % mae)

# r2 - coefficient of determination
r2 = eval.evaluate(pred.predictions, {eval.metricName: "r2"})
print("r2: %.3f" %r2)

RMSE: 4.740
MSE: 22.472
MAE: 3.378
r2: 0.737
