# **Big Data with Spark in Google Colab**

To run Spark in Colab, we first need to install the dependencies in the Colab environment: Apache Spark 2.4.5 built for Hadoop 2.7, Java 8, and findspark, which locates the Spark installation so that Python can import PySpark. All of these can be installed from inside the Colab notebook itself.
Run the following cell to install the dependencies:

In [None]:
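# Pinned to Spark 2.4.5 / Hadoop 2.7; to use another release, change the version in the wget and tar lines
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Older Spark releases are served from the Apache archive (the www-us.apache.org mirror is no longer available)
!wget -q https://archive.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz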
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

Now that Spark and Java are installed, it is time to set the environment variables that let PySpark run in your Colab environment. The first cell below mounts Google Drive, which is only needed if your data is stored there; the second sets the locations of Java and Spark:

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"
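Before calling `findspark.init()`, it can help to confirm that both variables point at real directories, since a mistyped path is a common cause of start-up errors. The helper below, `missing_homes`, is a hypothetical name introduced here for illustration; it uses only the Python standard library and assumes the two variable names set in the cell above.

```python
import os

def missing_homes(var_names=("JAVA_HOME", "SPARK_HOME")):
    """Return the names of variables that are unset or do not point to an existing directory."""
    missing = []
    for name in var_names:
        path = os.environ.get(name)
        if not path or not os.path.isdir(path):
            missing.append(name)
    return missing

problems = missing_homes()
if problems:
    print("Fix these before calling findspark.init():", problems)
else:
    print("JAVA_HOME and SPARK_HOME point to existing directories.")
```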

Start a local Spark session to test the installation (`local[*]` runs Spark on the Colab machine with as many worker threads as there are cores):

In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

Congrats! Your Colab is ready to run PySpark. Let's build our first ML model. The goal of this exercise is to predict the number of PPE (personal protective equipment) items needed, using `PPE Count` as the output variable and the four numeric columns as inputs.

Download the PPE dataset and save it on your computer, then load it into your Colab working directory from your local system:

In [None]:
# Importing the basic data-handling packages
import numpy as np
import pandas as pd
# Importing the plotting libraries
import seaborn as sns
import matplotlib.pyplot as plt
%matplotlib inline

In [None]:
# Option 1 - uploading the file from your local system to Colab
from google.colab import files
files.upload()

In [None]:
# Option 2 - reading the file from the Colab working directory into a pandas DataFrame
data = pd.read_csv("/content/Tarrant County COVID PPE Data.csv")

Check that the dataset was uploaded correctly by listing the files in the working directory:

In [None]:
!ls

 COVID_PPE_Count_Predictions_Py_Spark_May_2020.ipynb
 drive
 sample_data
 spark-2.4.5-bin-hadoop2.7
 spark-2.4.5-bin-hadoop2.7.tgz
'Tarrant County COVID PPE Data.csv'


Next, read the dataset into a Spark DataFrame. Note the `inferSchema=True` argument to `spark.read.csv`: it makes Spark scan the data and infer the type of each column automatically, while `header=True` takes the column names from the first row.

In [None]:
dataset = spark.read.csv('Tarrant County COVID PPE Data.csv', inferSchema=True, header=True)

Let us print the schema to see the data type of each column:

In [None]:
dataset.printSchema()

root
 |-- CaseId: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Prop of non-ER Patients: double (nullable = true)
 |-- Prop of LT50 Age Group: double (nullable = true)
 |-- Air Purity: double (nullable = true)
 |-- Living Score: double (nullable = true)
 |-- PPE Count: integer (nullable = true)



In [None]:
data.dtypes

CaseId                      object
State                       object
Prop of non-ER Patients    float64
Prop of LT50 Age Group     float64
Air Purity                 float64
Living Score               float64
PPE Count                    int64
dtype: object

In [None]:
# Computing the correlation matrix over the numeric columns only
# (CaseId and State are strings; pandas 2.0 and later raise an error on them otherwise)
corr = data.corr(numeric_only=True)

# Set up the matplotlib figure
f, ax = plt.subplots(figsize=(16, 12))

# Draw the heatmap with a yellow-green colormap and annotated cells
sns.heatmap(corr, cmap="YlGn", square=True, ax=ax, annot=True, linewidths=0.1)
# Setting the title of the graph
plt.title('Pearson Correlation of Features')
plt.show()

Now we can start building the ML model. As a first step, import `VectorAssembler` from PySpark.

`VectorAssembler` is a transformer that combines multiple columns into a single vector column. Spark ML estimators such as `LinearRegression` expect all the features in one such column.
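To make the idea concrete, here is a plain-Python analogue of what `VectorAssembler` does to each row: keep the existing columns and append one new column holding the selected feature values in the given order. This is only an illustration with made-up rows and a hypothetical `assemble` helper; Spark's transformer stores the result as an ML vector type rather than a Python list.

```python
# Feature columns, in the same order they are passed to VectorAssembler
FEATURE_COLS = ["Prop of non-ER Patients", "Prop of LT50 Age Group",
                "Air Purity", "Living Score"]

def assemble(rows, input_cols, output_col="Attributes"):
    """Plain-Python analogue of VectorAssembler: copy each row and add one
    list holding the values of input_cols, in order, as floats."""
    assembled = []
    for row in rows:
        new_row = dict(row)  # keep all original columns, as the transformer does
        new_row[output_col] = [float(row[c]) for c in input_cols]
        assembled.append(new_row)
    return assembled

# Made-up rows for illustration only (not taken from the PPE dataset)
rows = [
    {"CaseId": "C1", "State": "TX", "Prop of non-ER Patients": 10.0,
     "Prop of LT50 Age Group": 40.0, "Air Purity": 1010.0,
     "Living Score": 75.0, "PPE Count": 470},
    {"CaseId": "C2", "State": "TX", "Prop of non-ER Patients": 20.5,
     "Prop of LT50 Age Group": 55.0, "Air Purity": 1015.2,
     "Living Score": 60.0, "PPE Count": 452},
]

for r in assemble(rows, FEATURE_COLS):
    print(r["Attributes"], r["PPE Count"])
```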

In [None]:
from pyspark.ml.feature import VectorAssembler

The next step is to create a `VectorAssembler` that merges the four feature columns into a single vector column. We name this new column 'Attributes' through the `outputCol` parameter.

In [None]:
# Combine all the feature columns into one vector column
assembler = VectorAssembler(inputCols=['Prop of non-ER Patients', 'Prop of LT50 Age Group', 'Air Purity', 'Living Score'], 
                            outputCol = 'Attributes')
output = assembler.transform(dataset)
output

DataFrame[CaseId: string, State: string, Prop of non-ER Patients: double, Prop of LT50 Age Group: double, Air Purity: double, Living Score: double, PPE Count: int, Attributes: vector]

Select 'Attributes' as the input feature column and 'PPE Count' as the target column:

In [None]:
finalized_data = output.select("Attributes","PPE Count")
finalized_data.show()

+--------------------+---------+
|          Attributes|PPE Count|
+--------------------+---------+
|[14.96,41.76,1024...|      463|
|[25.18,62.96,1020...|      444|
|[5.11,39.4,1012.1...|      489|
|[20.86,57.32,1010...|      446|
|[10.82,37.5,1009....|      474|
|[26.27,59.44,1012...|      444|
|[15.89,43.96,1014...|      467|
|[9.48,44.71,1019....|      478|
|[14.64,45.0,1021....|      476|
|[11.74,43.56,1015...|      478|
|[17.99,43.72,1008...|      453|
|[20.14,46.93,1014...|      454|
|[24.34,73.5,1011....|      440|
|[25.71,58.59,1012...|      451|
|[26.19,69.34,1009...|      434|
|[21.42,43.79,1015...|      462|
|[18.21,45.0,1022....|      468|
|[11.04,41.74,1022...|      477|
|[14.45,52.75,1023...|      460|
|[13.97,38.47,1015...|      464|
+--------------------+---------+
only showing top 20 rows



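Split the data into training (80%) and test (20%) sets. `randomSplit` assigns rows randomly, so the split sizes only approximate 80/20, and they change between runs unless a seed is set.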
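The sketch below imitates the idea behind `randomSplit` in plain Python: the weights are normalized, each row receives one uniform random draw, and the draw decides which split the row lands in. It is a simplification of Spark's per-partition sampling, and `random_split` is a hypothetical helper, but it shows why the resulting sizes are close to, not exactly, 80/20 and why a fixed seed makes the split repeatable.

```python
import random

def random_split(rows, weights, seed=None):
    """Simplified randomSplit: normalize the weights, then give each row one
    uniform draw in [0, 1) and put it in the split whose range contains it."""
    total = float(sum(weights))
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)          # cumulative upper bounds, e.g. [0.8, 1.0]
    rng = random.Random(seed)       # same seed -> same assignment of rows
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, upper in enumerate(bounds):
            if u < upper:
                splits[i].append(row)
                break
        else:
            splits[-1].append(row)  # guard against round-off in the last bound
    return splits

train, test = random_split(list(range(10_000)), [0.8, 0.2], seed=42)
print(len(train), len(test))  # close to 8000 and 2000, but usually not exact
```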

In [None]:
# Split training and testing data (seed fixed so the split is reproducible; the value is arbitrary)
train_data, test_data = finalized_data.randomSplit([0.8, 0.2], seed=42)

# Modeling using the Linear Regression ML algorithm
from pyspark.ml.regression import LinearRegression
regressor = LinearRegression(featuresCol='Attributes', labelCol='PPE Count')

# Fit the model on the training set; `regressor` now holds the fitted LinearRegressionModel
regressor = regressor.fit(train_data)

# Evaluate the model on the test set; the returned summary contains the predictions
pred = regressor.evaluate(test_data)

# Show the test-set predictions next to the true labels
pred.predictions.show()

+--------------------+---------+------------------+
|          Attributes|PPE Count|        prediction|
+--------------------+---------+------------------+
|[1.81,39.42,1026....|      491| 493.4285055354441|
|[2.34,39.42,1028....|      490| 493.6138694112323|
|[2.58,39.42,1028....|      489| 493.2531179164478|
|[2.71,39.42,1026....|      489|490.98301752743583|
|[2.8,39.64,1011.0...|      483| 489.5035503882529|
|[2.8,39.64,1011.0...|      483| 489.5035503882529|
|[3.0,39.64,1011.0...|      485| 489.5477761274816|
|[3.0,39.64,1011.0...|      485| 489.5477761274816|
|[3.2,41.31,997.67...|      490| 485.0204110275682|
|[3.2,41.31,997.67...|      490| 485.0204110275682|
|[3.21,38.44,1016....|      491|488.80452337873373|
|[3.26,41.31,996.3...|      489| 484.6373597526065|
|[3.26,41.31,996.3...|      489| 484.6373597526065|
|[3.26,41.31,996.3...|      489| 484.6373597526065|
|[3.38,39.64,1011....|      489| 488.6264149260246|
|[3.38,41.31,998.7...|      489|484.90156773075734|
...

We can also print the coefficients and the intercept of the fitted regression model:

In [None]:
# Coefficients of the regression model (one per feature, in the order of 'Attributes')
coeff = regressor.coefficients

# Intercept (the constant term of the model)
intr = regressor.intercept

print("The coefficient of the model is : %a" % coeff)
print("The Intercept of the model is : %f" % intr)

The coefficient of the model is : DenseVector([-1.9803, -0.2331, 0.0614, -0.1563])
The Intercept of the model is : 455.197351
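A fitted linear regression predicts `intercept + sum(coefficient_i * feature_i)`, with the coefficients in the same order as the 'Attributes' columns. The sketch below plugs in the rounded values printed above; the feature vector is hypothetical, and because the printed coefficients are rounded, the result will differ slightly from what the Spark model itself returns for the same input.

```python
# Coefficients and intercept as printed above (rounded to 4 and 6 decimals)
coefficients = [-1.9803, -0.2331, 0.0614, -0.1563]
intercept = 455.197351

def predict(features):
    """Linear regression prediction: intercept plus the dot product of coefficients and features."""
    return intercept + sum(c * x for c, x in zip(coefficients, features))

# Hypothetical feature vector, ordered like 'Attributes':
# [Prop of non-ER Patients, Prop of LT50 Age Group, Air Purity, Living Score]
x = [10.0, 40.0, 1010.0, 75.0]
print("Predicted PPE Count: %.3f" % predict(x))
```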


# Model Evaluation

Evaluate the regression model with the `RegressionEvaluator` class from PySpark, which computes RMSE, MSE, MAE and R² from the label and prediction columns.

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
# Named `evaluator` rather than `eval` to avoid shadowing Python's built-in eval()
evaluator = RegressionEvaluator(labelCol="PPE Count", predictionCol="prediction", metricName="rmse")

# Root Mean Square Error
rmse = evaluator.evaluate(pred.predictions)
print("RMSE: %.3f" % rmse)

# Mean Square Error (the metric is overridden through a param map)
mse = evaluator.evaluate(pred.predictions, {evaluator.metricName: "mse"})
print("MSE: %.3f" % mse)

# Mean Absolute Error
mae = evaluator.evaluate(pred.predictions, {evaluator.metricName: "mae"})
print("MAE: %.3f" % mae)

# r2 - coefficient of determination
r2 = evaluator.evaluate(pred.predictions, {evaluator.metricName: "r2"})
print("r2: %.3f" % r2)

RMSE: 4.560
MSE: 20.796
MAE: 3.614
r2: 0.929
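For reference, the four metrics reported above follow the standard definitions: MSE is the mean squared error, RMSE its square root, MAE the mean absolute error, and R² = 1 − SS_res / SS_tot. The plain-Python sketch below, built around a hypothetical `regression_metrics` helper, computes them for the first five rows of the prediction table shown earlier. With so few rows it does not reproduce the full test-set numbers above, and because these five labels barely vary, R² comes out negative for this slice.

```python
import math

def regression_metrics(labels, preds):
    """Return RMSE, MSE, MAE and R^2 for paired lists of true labels and predictions."""
    n = len(labels)
    if n == 0 or n != len(preds):
        raise ValueError("labels and preds must be non-empty and of equal length")
    errors = [y - p for y, p in zip(labels, preds)]
    ss_res = sum(e * e for e in errors)                # residual sum of squares
    mean_y = sum(labels) / n
    ss_tot = sum((y - mean_y) ** 2 for y in labels)    # total sum of squares
    mse = ss_res / n
    return {
        "rmse": math.sqrt(mse),
        "mse": mse,
        "mae": sum(abs(e) for e in errors) / n,
        # R^2 is undefined when all labels are equal (ss_tot == 0)
        "r2": 1.0 - ss_res / ss_tot if ss_tot > 0 else float("nan"),
    }

# PPE Count labels and predictions from the first five rows of the prediction table above
labels = [491, 490, 489, 489, 483]
preds = [493.4285055354441, 493.6138694112323, 493.2531179164478,
         490.98301752743583, 489.5035503882529]

for name, value in regression_metrics(labels, preds).items():
    print("%s: %.3f" % (name, value))
```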
