# Jupyter Notebooks and Spark for Data Processing
![Spark](images/spark.jpeg)


## The Objectives of this Talk are

* Provide a broad explanation of:
    * What a Jupyter Notebook is and what it is good for.
    * What Spark is and what it is good for.
* Provide an example of how to process data and run a model with Spark.

## Before We Start

1. Download Docker: https://www.docker.com/products
2. Download the PySpark container by typing `docker pull jupyter/all-spark-notebook` in the terminal.
3. Run the container in the terminal: `docker run -it --rm -p 8888:8888 jupyter/all-spark-notebook`
4. Copy the URL displayed in the terminal.
5. Paste the URL into your browser.
6. Click on New.
7. Select [Python](images/create_python_notebook.png).

Make sure you have completed at least steps 1-3 before we start this crash course.

## A Jupyter Notebook is a Web Application
![Notebook](images/notebooks.jpeg)

> A [Jupyter Notebook](https://jupyter.org/) is a web application for creating and sharing computational documents. It offers a simple, streamlined, document-centric experience.

## A Jupyter Notebook:

* Provides a sandbox-like environment for experimenting
* Supports Markdown, which makes it easy to document code and processes
* Lets you add images
* Lets you run code
* Warning: its cells can be run out of order!
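
The last point deserves a concrete illustration. The snippet below is a minimal plain-Python sketch that imitates notebook cells sharing one namespace; the helper `run_cells` and the three toy cells are hypothetical and exist only for this example. It shows that re-running a cell changes the final result, even though every cell looks the same on screen.

```python
def run_cells(order):
    """Run notebook-like cells in the given order against shared state."""
    namespace = {}  # plays the role of the notebook kernel's memory
    cells = {
        1: "x = 10",
        2: "x = x * 2",
        3: "result = x + 1",
    }
    for number in order:
        exec(cells[number], namespace)
    return namespace["result"]

in_order = run_cells([1, 2, 3])        # every cell executed once, top to bottom
cell_2_twice = run_cells([1, 2, 2, 3]) # cell 2 was accidentally run twice

print(in_order, cell_2_twice)
```

When a notebook gives surprising results, restarting the kernel and running all cells from the top is usually the quickest way to rule this out.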

## Spark is a Unified Analytics Engine for Large-scale Data Processing
![Spark](images/Spark2.jpeg)


https://data-flair.training/blogs/apache-spark-features/

## Spark is a Unified Analytics Engine for Large-scale Data Processing

Apache Spark is an open-source, distributed processing system used for big data workloads. It utilizes in-memory caching and optimized query execution for fast queries against data of any size. Simply put, Spark is a fast and general engine for large-scale data processing.

It is a tool that can be used from Python, R, Scala, or Java, and it allows us to work with Big Data.

https://chartio.com/learn/data-analytics/what-is-spark/

# Big Data is a Ludicrous Amount of Data
For the sake of this talk, [Big Data](https://arxiv.org/pdf/1309.5821.pdf) is any amount of data that my computer or a dedicated server can't read or process on its own.

(A Ludicrous Amount of Data)

<img src="https://i1.sndcdn.com/artworks-000223795050-2qhpbr-t500x500.jpg" alt="drawing" width="400"/>

<p style="text-align: center;">(Don't worry, we won't use real Big Data)</p>
    
![sick](images/sick_comp.jpeg)

## Big Data Needs to be Processed by a Team

Ok, so my computer can't run it...

Then, how is Big Data processed?

<img src="https://spark.apache.org/docs/latest/img/cluster-overview.png" alt="drawing" width="600"/>

## MapReduce is an Approach to Process Big Data
   
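Spark is based on a divide-and-conquer approach called MapReduce: the data is split into pieces, each piece is processed separately, and the partial results are combined. MapReduce involves: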
* Keeping intermediate states
* Identifying dead nodes
* Labeling results to group them

<img src="https://i.stack.imgur.com/199Q1.png" alt="drawing" width="700">
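
The stages can be sketched in plain Python, with no Spark involved. This is a minimal illustration that assumes a word-count task; the function names `map_phase`, `shuffle_phase`, and `reduce_phase` are hypothetical and chosen only for this example. It shows how results are labeled with a key so that they can be grouped and then combined.

```python
from collections import defaultdict

def map_phase(chunk):
    """Emit a (key, 1) pair for every word in one chunk of text."""
    return [(word.lower(), 1) for word in chunk.split()]

def shuffle_phase(pairs):
    """Group the emitted values by their key (the 'label')."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups

def reduce_phase(groups):
    """Combine the grouped values for each key into one result."""
    return {key: sum(values) for key, values in groups.items()}

# Each chunk stands in for the piece of data handled by one worker.
chunks = ["spark is fast", "spark is general", "big data is big"]

mapped = [pair for chunk in chunks for pair in map_phase(chunk)]
word_counts = reduce_phase(shuffle_phase(mapped))
print(word_counts)
```

In a real cluster the map and reduce steps run on different machines, which is why the framework also has to keep intermediate state and notice when a node dies.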

# Example time!!!

## The first thing we need to work with Spark is to define the Spark Context

The SparkContext represents Spark's connection to the cluster.
    
https://sparkbyexamples.com/spark/spark-sparkcontext/

![sc](https://raw.githubusercontent.com/LiberPH/Spark_para_que/main/spark_context.png)

## Initialize Spark Context

(we will use Python)

In [28]:
import pyspark
# 'local[4]' runs Spark on this machine with 4 worker threads
sc = pyspark.SparkContext('local[4]')
sc

## Let's read some data!

In [8]:
from pyspark.sql import SparkSession

# https://www.kaggle.com/johnsmith88/heart-disease-dataset
# This may take a while on a local computer
spark = SparkSession.builder.appName("Basics").getOrCreate()

df = spark.read.csv('data/heart.csv', inferSchema=True, header=True)


## Why did it take so long?

![meme](images/meme.jpeg)

Remember: Spark divides the data into pieces for processing, and that takes time.
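
To make the idea of dividing the data concrete, here is a plain-Python sketch (not Spark code). It assumes four partitions to mirror `local[4]` and deals the rows out round-robin purely for illustration; Spark's own partitioning works differently. The helper names `split_into_partitions` and `partial_sum_count` are hypothetical. Each partition computes a small partial result, and only those partial results are combined at the end.

```python
def split_into_partitions(rows, num_partitions):
    """Deal rows round-robin into num_partitions lists."""
    partitions = [[] for _ in range(num_partitions)]
    for i, row in enumerate(rows):
        partitions[i % num_partitions].append(row)
    return partitions

def partial_sum_count(partition):
    """Work that can be done independently on each partition."""
    return sum(partition), len(partition)

# A handful of ages, standing in for a much larger column.
ages = [52, 53, 70, 61, 62, 58, 58, 55, 46]

partitions = split_into_partitions(ages, 4)
partials = [partial_sum_count(p) for p in partitions]

# Only the small (sum, count) pairs need to be brought together.
total = sum(s for s, _ in partials)
count = sum(n for _, n in partials)
mean_age = total / count

print(partitions)
print(mean_age)
```

Splitting, scheduling, and recombining are overhead, so on a small file a single-machine tool can easily be faster; the approach pays off once the data no longer fits on one machine.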

## You can run almost any SQL-like operation (join, count, select, etc.) using Spark, as well as some ML algorithms

https://github.com/LiberPH/taller_spark

![Spark](https://docs.snowflake.com/en/_images/spark-snowflake-data-source.png)

## Ok, I just want to know what kind of data I have here without displaying it

In [9]:
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- sex: integer (nullable = true)
 |-- cp: integer (nullable = true)
 |-- trestbps: integer (nullable = true)
 |-- chol: integer (nullable = true)
 |-- fbs: integer (nullable = true)
 |-- restecg: integer (nullable = true)
 |-- thalach: integer (nullable = true)
 |-- exang: integer (nullable = true)
 |-- oldpeak: double (nullable = true)
 |-- slope: integer (nullable = true)
 |-- ca: integer (nullable = true)
 |-- thal: integer (nullable = true)
 |-- target: integer (nullable = true)



## Heart
1. age
2. sex
3. chest pain type (4 values)
4. resting blood pressure
5. serum cholesterol in mg/dl
6. fasting blood sugar > 120 mg/dl
7. resting electrocardiographic results (values 0,1,2)
8. maximum heart rate achieved
9. exercise induced angina
10. oldpeak = ST depression induced by exercise relative to rest
11. the slope of the peak exercise ST segment
12. number of major vessels (0-3) colored by fluoroscopy
13. thal: 0 = normal; 1 = fixed defect; 2 = reversible defect
14. target: does the patient have heart disease or not?

The names and social security numbers of the patients were recently removed from the database and replaced with dummy values.


## Let's take a look at the data


In [11]:
df.show()


+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
|age|sex| cp|trestbps|chol|fbs|restecg|thalach|exang|oldpeak|slope| ca|thal|target|
+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
| 52|  1|  0|     125| 212|  0|      1|    168|    0|    1.0|    2|  2|   3|     0|
| 53|  1|  0|     140| 203|  1|      0|    155|    1|    3.1|    0|  0|   3|     0|
| 70|  1|  0|     145| 174|  0|      1|    125|    1|    2.6|    0|  0|   3|     0|
| 61|  1|  0|     148| 203|  0|      1|    161|    0|    0.0|    2|  1|   3|     0|
| 62|  0|  0|     138| 294|  1|      1|    106|    0|    1.9|    1|  3|   2|     0|
| 58|  0|  0|     100| 248|  0|      0|    122|    0|    1.0|    1|  0|   2|     1|
| 58|  1|  0|     114| 318|  0|      2|    140|    0|    4.4|    0|  3|   1|     0|
| 55|  1|  0|     160| 289|  0|      0|    145|    1|    0.8|    1|  1|   3|     0|
| 46|  1|  0|     120| 249|  0|      0|    144|    0|    0.8|    2|  0|   3|

## You can apply simple analytics with Spark!

In [12]:
from pyspark.sql.functions import col as c
import pyspark.sql.functions as F

df.select(
 F.avg(c("age")).alias("mean_age"), F.avg(c("chol")).alias("mean_chol")
).show()

+-----------------+---------+
|         mean_age|mean_chol|
+-----------------+---------+
|54.43414634146342|    246.0|
+-----------------+---------+



In [16]:
df.describe().toPandas()

| | summary | age | sex | cp | trestbps | chol | fbs | restecg | thalach | exang | oldpeak | slope | ca | thal | target |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | count | 1025.0 | 1025.0 | 1025.0 | 1025.0 | 1025.0 | 1025.0 | 1025.0 | 1025.0 | 1025.0 | 1025.0 | 1025.0 | 1025.0 | 1025.0 | 1025.0 |
| 1 | mean | 54.43414634146342 | 0.6956097560975609 | 0.942439024390244 | 131.61170731707318 | 246.0 | 0.1492682926829268 | 0.5297560975609756 | 149.11414634146342 | 0.3365853658536585 | 1.0715121951219524 | 1.3853658536585365 | 0.7541463414634146 | 2.32390243902439 | 0.5131707317073171 |
| 2 | stddev | 9.072290233244278 | 0.4603733241196495 | 1.029640743645865 | 17.516718005376408 | 51.59251020618203 | 0.3565266897271575 | 0.5278775668748918 | 23.00572374597721 | 0.4727723760037115 | 1.175053255150177 | 0.6177552671745918 | 1.0307976650242825 | 0.6206602380510303 | 0.5000704980788011 |
| 3 | min | 29.0 | 0.0 | 0.0 | 94.0 | 126.0 | 0.0 | 0.0 | 71.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| 4 | max | 77.0 | 1.0 | 3.0 | 200.0 | 564.0 | 1.0 | 2.0 | 202.0 | 1.0 | 6.2 | 2.0 | 4.0 | 3.0 | 1.0 |


## You can clean your data using Spark

The way to clean your data depends on many things:
* How dirty is the data source?
* How variable is your data?
* What kind of data does the ML algorithm you are planning to use support?
    * Null values
    * Deviation (outliers): https://deepnote.com/@rajshekar-2021/Outlier-Detection-Pyspark-069e69af-2c1d-4d4d-884a-92aad276d06f
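
As a rough illustration of the two sub-points above, the following plain-Python sketch drops rows with missing values and flags outliers with the common 1.5 × IQR rule. The rows are hypothetical (one value is deliberately missing and one is exaggerated), and the IQR rule is only one of several possible choices, not necessarily the one used in the linked article.

```python
import statistics

# Hypothetical rows: one missing value and one implausible cholesterol reading.
rows = [
    {"age": 52, "chol": 212},
    {"age": 53, "chol": 203},
    {"age": 70, "chol": None},
    {"age": 61, "chol": 203},
    {"age": 62, "chol": 294},
    {"age": 58, "chol": 248},
    {"age": 58, "chol": 318},
    {"age": 55, "chol": 289},
    {"age": 46, "chol": 900},
]

# 1. Drop rows that contain a missing value.
complete_rows = [r for r in rows if all(v is not None for v in r.values())]

# 2. Flag outliers in one column with the 1.5 * IQR rule.
chol = [r["chol"] for r in complete_rows]
q1, _, q3 = statistics.quantiles(chol, n=4)
iqr = q3 - q1
lower, upper = q1 - 1.5 * iqr, q3 + 1.5 * iqr

outliers = [r for r in complete_rows if not lower <= r["chol"] <= upper]
cleaned = [r for r in complete_rows if lower <= r["chol"] <= upper]

print(len(rows), len(complete_rows), len(cleaned))
print(outliers)
```

On a Spark DataFrame the same two steps are typically expressed with `df.na.drop()` and `df.approxQuantile(...)`; check the PySpark documentation for the exact arguments before relying on them.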


# Ok, let's imagine we already cleaned our data
![meme](images/wishful.jpeg)


## Now, we want to use this data to find out if a person has heart disease


What kind of ML algorithm should we use?
![ML learning](https://miro.medium.com/max/1200/1*FUZS9K4JPqzfXDcC83BQTw.png)

# The next step would be to select the features to use

## Ok, let's imagine we already selected them XD
![meme](images/wishful.jpeg)

https://github.com/LiberPH/How_to_feature_selector_in_spark

![Kinds of algorithms](https://cdn-images-1.medium.com/max/800/1*rbaxTrB_CZCqbty_zv2bEg.png)

## Let's run the random forest algorithm to solve a classification problem

![RandomForest](https://serokell.io/files/vz/vz1f8191.Ensemble-of-decision-trees.png)

https://serokell.io/blog/random-forest-classification

> Note: In this case we will skip some essential steps, such as data cleaning, exploratory analysis, and relevant-variable analysis, among others. The aim is to show the general process of applying the Random Forest algorithm to a data set in Spark.

## So, before we start, we need to assemble our data...

In [18]:
from pyspark.ml.feature import VectorAssembler

features = df.columns
features.remove("target")

assembler = VectorAssembler(inputCols = features,outputCol='features')

assembled = assembler.transform(df)

# Keep only the assembled feature vector and the target
data_rv = assembled.select('features','target')

data_rv.limit(3).toPandas()

| | features | target |
|---|---|---|
| 0 | [52.0, 1.0, 0.0, 125.0, 212.0, 0.0, 1.0, 168.0... | 0 |
| 1 | [53.0, 1.0, 0.0, 140.0, 203.0, 1.0, 0.0, 155.0... | 0 |
| 2 | [70.0, 1.0, 0.0, 145.0, 174.0, 0.0, 1.0, 125.0... | 0 |


## Steps to Run Random Forest

1. Convert any categorical data to numeric
2. Split the data set into two data sets: training and testing
3. Add any missing `import`
4. Train the model
5. Test the model
6. Check your metrics


# 1. Convert any categorical data to numeric
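
The heart data set is already numeric (see the schema printed earlier), so there is nothing to convert here. For a data set that does contain a string column, the idea can be sketched in plain Python as below. The column values and the helper `fit_string_index` are hypothetical; the most-frequent-first ordering mirrors the default behaviour of PySpark's `StringIndexer`, but this is an illustration of the idea, not Spark's implementation.

```python
from collections import Counter

def fit_string_index(values):
    """Map each category to an index, most frequent category first."""
    counts = Counter(values)
    # Sort by descending frequency, then alphabetically to break ties.
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

# Hypothetical string column; the heart data set has no such column.
chest_pain = ["typical", "atypical", "typical", "none", "typical", "atypical"]

index = fit_string_index(chest_pain)
encoded = [index[value] for value in chest_pain]

print(index)
print(encoded)
```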

# 2. Split the data set into two data sets: training and testing


In [20]:
(trainingData, testData) = data_rv.randomSplit([0.7, 0.3])
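
A random split like this one is approximate: the weights are targets, not exact sizes, and without a fixed seed each run produces a different split (and therefore slightly different metrics). `randomSplit` accepts an optional `seed` argument for reproducibility. The plain-Python sketch below illustrates the general idea of a per-row random draw; the helper `random_split` is hypothetical and is not how Spark implements it internally.

```python
import random

def random_split(rows, train_fraction, seed):
    """Send each row to train or test with an independent random draw."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        if rng.random() < train_fraction:
            train.append(row)
        else:
            test.append(row)
    return train, test

rows = list(range(1025))  # stand-in for the 1025 rows of the data set

train, test = random_split(rows, 0.7, seed=42)
train_again, test_again = random_split(rows, 0.7, seed=42)

print(len(train), len(test))   # close to 70% / 30%, but not exactly
print(train == train_again)    # the same seed gives the same split
```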


# 3. Add any missing `import`
https://github.com/LiberPH/taller_spark/blob/master/random_forest/RandomForest.ipynb

In [21]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# 4. Train the model

In [22]:
rf = RandomForestClassifier(labelCol="target", featuresCol="features", numTrees=10)

# Train model.
model = rf.fit(trainingData)

# 5. Test the model


In [23]:
# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "probability", "features").show(5)

+----------+--------------------+--------------------+
|prediction|         probability|            features|
+----------+--------------------+--------------------+
|       0.0|[0.58296473943002...|(13,[0,1,3,4,7,10...|
|       1.0|[0.13834998356016...|(13,[0,1,3,4,7,10...|
|       1.0|[0.13834998356016...|(13,[0,1,3,4,7,10...|
|       1.0|[0.05696006115714...|(13,[0,2,3,4,7,10...|
|       1.0|[0.12727358073759...|(13,[0,2,3,4,7,10...|
+----------+--------------------+--------------------+
only showing top 5 rows



# 6. Take a look at some metrics

In [24]:
# Compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="target", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

Test Error = 0.0673077
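
Accuracy alone does not say which kind of mistake the model makes, and in a medical setting a missed patient and a false alarm are not equally costly. The sketch below computes a confusion matrix, precision, and recall in plain Python from hypothetical labels and predictions (they are not taken from the run above); the helper `confusion_counts` is also hypothetical.

```python
def confusion_counts(labels, predictions):
    """Count true/false positives and negatives for binary 0/1 labels."""
    tp = fp = tn = fn = 0
    for label, predicted in zip(labels, predictions):
        if predicted == 1 and label == 1:
            tp += 1
        elif predicted == 1 and label == 0:
            fp += 1
        elif predicted == 0 and label == 0:
            tn += 1
        else:
            fn += 1
    return tp, fp, tn, fn

# Hypothetical values, for illustration only.
labels      = [0, 1, 1, 1, 0, 1, 0, 0, 1, 1]
predictions = [0, 1, 1, 0, 0, 1, 1, 0, 1, 1]

tp, fp, tn, fn = confusion_counts(labels, predictions)
accuracy = (tp + tn) / len(labels)
precision = tp / (tp + fp)  # of the predicted patients, how many are real
recall = tp / (tp + fn)     # of the real patients, how many were found

print(tp, fp, tn, fn)
print(accuracy, precision, recall)
```

With the evaluator used above, other values of `metricName` such as `"f1"`, `"weightedPrecision"`, and `"weightedRecall"` give related summaries without leaving Spark.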


In [25]:
predictions.limit(20).toPandas()


| | features | target | rawPrediction | probability | prediction |
|---|---|---|---|---|---|
| 0 | (41.0, 1.0, 0.0, 110.0, 172.0, 0.0, 0.0, 158.0... | 0 | [5.82964739430021, 4.17035260569979] | [0.582964739430021, 0.417035260569979] | 0.0 |
| 1 | (48.0, 1.0, 0.0, 122.0, 222.0, 0.0, 0.0, 186.0... | 1 | [1.383499835601649, 8.616500164398351] | [0.1383499835601649, 0.8616500164398351] | 1.0 |
| 2 | (48.0, 1.0, 0.0, 122.0, 222.0, 0.0, 0.0, 186.0... | 1 | [1.383499835601649, 8.616500164398351] | [0.1383499835601649, 0.8616500164398351] | 1.0 |
| 3 | (53.0, 0.0, 2.0, 128.0, 216.0, 0.0, 0.0, 115.0... | 1 | [0.5696006115714188, 9.430399388428581] | [0.05696006115714188, 0.9430399388428581] | 1.0 |
| 4 | (63.0, 0.0, 2.0, 135.0, 252.0, 0.0, 0.0, 172.0... | 1 | [1.2727358073759316, 8.727264192624068] | [0.12727358073759315, 0.8727264192624068] | 1.0 |
| 5 | (49.0, 0.0, 0.0, 130.0, 269.0, 0.0, 1.0, 163.0... | 1 | [0.8321779834464635, 9.167822016553536] | [0.08321779834464635, 0.9167822016553536] | 1.0 |
| 6 | (62.0, 0.0, 0.0, 124.0, 209.0, 0.0, 1.0, 163.0... | 1 | [1.8628797282981366, 8.137120271701864] | [0.18628797282981366, 0.8137120271701864] | 1.0 |
| 7 | (62.0, 0.0, 0.0, 124.0, 209.0, 0.0, 1.0, 163.0... | 1 | [1.8628797282981366, 8.137120271701864] | [0.18628797282981366, 0.8137120271701864] | 1.0 |
| 8 | (46.0, 0.0, 0.0, 138.0, 243.0, 0.0, 0.0, 152.0... | 1 | [3.04495228693082, 6.95504771306918] | [0.304495228693082, 0.695504771306918] | 1.0 |
| 9 | (62.0, 0.0, 0.0, 140.0, 268.0, 0.0, 0.0, 160.0... | 0 | [7.989272727240232, 2.010727272759768] | [0.7989272727240232, 0.20107272727597678] | 0.0 |
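
In the table above, each `rawPrediction` pair sums to 10, the number of trees, and `probability` is that pair divided by its sum. This is consistent with every tree contributing a class-probability vector that is summed across the forest and then normalized. The plain-Python sketch below illustrates that reading; the helper `forest_probability` and the three tree outputs are hypothetical, and the mechanics are an assumption inferred from the numbers, not a copy of Spark's code.

```python
def forest_probability(tree_probabilities):
    """Sum per-tree class probabilities, then normalize them to sum to 1."""
    num_classes = len(tree_probabilities[0])
    raw = [sum(tree[k] for tree in tree_probabilities) for k in range(num_classes)]
    total = sum(raw)
    probability = [value / total for value in raw]
    prediction = max(range(num_classes), key=lambda k: probability[k])
    return raw, probability, prediction

# Hypothetical outputs [P(target=0), P(target=1)] from three trees.
trees = [[0.9, 0.1], [0.4, 0.6], [0.1, 0.9]]
raw, probability, prediction = forest_probability(trees)
print(raw, probability, prediction)

# Row 0 of the table: normalizing rawPrediction reproduces probability.
raw_row0 = [5.82964739430021, 4.17035260569979]
prob_row0 = [value / sum(raw_row0) for value in raw_row0]
print(prob_row0)
```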


# The next step would be to feed the model with real data to predict whether patients suffer from heart disease or not

## Finally, you must stop your Spark session before closing


In [29]:
spark.stop()