<a href="https://colab.research.google.com/github/foukonana/Diabetes/blob/master/Pyspark_tutorial.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Introduction
\
PySpark is the Python interface to Apache Spark. It lets you write Spark applications using Python APIs.
\
Some of the main advantages of PySpark are:
* Spark is a general-purpose, in-memory processing engine that lets you process data efficiently in a **distributed** fashion.
* The Spark project reports speed-ups of up to **100x** over traditional disk-based systems such as Hadoop MapReduce, mainly for workloads that fit in memory.
* Using PySpark we can process data from Hadoop HDFS, AWS S3, and many other file systems.

### Architecture  
Apache Spark uses a master-slave architecture in which the master is called the “Driver” and the slaves are called “Workers”. When you run a Spark application, the Spark Driver creates a context that serves as the entry point to your application. All operations (transformations and actions) are executed on the worker nodes, while resources are managed by the Cluster Manager.
\
![architecture](https://i2.wp.com/sparkbyexamples.com/wp-content/uploads/2020/02/spark-cluster-overview.png?w=596&ssl=1)



### The basics  
Every Spark application consists of a _driver program_, which runs the main function and executes various parallel operations on a cluster.   
We initialize Spark with a **SparkContext** object, which holds the information needed to access the cluster. Before creating a SparkContext, a **SparkConf** object needs to be built; it contains information about the application.

In [1]:
!pip install pyspark

Successfully installed py4j-0.10.9 pyspark-3.1.2


In [2]:
from pyspark import SparkContext, SparkConf

In [3]:
conf = SparkConf()\
        .setAppName('tutorial')\
        .setMaster('local')
sc = SparkContext.getOrCreate(conf=conf)

### PySpark RDDs [Resilient Distributed Datasets]  
An RDD is a fault-tolerant, immutable collection of elements that can be operated on in parallel. Each RDD is divided into logical partitions, which can be computed on different cluster nodes.  
\
Parallelized collections are created by calling the SparkContext _parallelize_ method on an existing iterable or collection. The elements of the iterable are copied to form a distributed dataset that can be operated on in parallel. The number of **partitions** can also be set in parallelize; otherwise Spark chooses a default based on the cluster (typically, 2-4 partitions for each CPU in the cluster).  
\
_Spark's ability to process large quantities of data quickly comes from partitions. Spark tasks are created and run on each partition, so each task operates on a smaller dataset._
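
To make the idea of partitions concrete, here is a plain-Python sketch (no Spark involved) that splits a collection into partitions and runs one task per partition. The helper `split_into_partitions` is hypothetical and only approximates how Spark slices data; in PySpark the number of partitions is passed as the second argument of `sc.parallelize`.

```python
def split_into_partitions(data, num_partitions):
    """Split a list into contiguous, near-equal chunks (hypothetical helper)."""
    n = len(data)
    bounds = [i * n // num_partitions for i in range(num_partitions + 1)]
    return [data[bounds[i]:bounds[i + 1]] for i in range(num_partitions)]


data = list(range(100))
partitions = split_into_partitions(data, 4)

# One "task" per partition: each task only sees its own, smaller chunk.
partial_sums = [sum(part) for part in partitions]

# Combining the per-partition results gives the same answer as a single pass.
total = sum(partial_sums)

print([len(part) for part in partitions])  # [25, 25, 25, 25]
print(total)                               # 4950
```

Because each partial sum depends only on its own chunk, the per-partition tasks could run on different machines, which is the property Spark exploits.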

In [4]:
data = list(range(100))

data_rdd = sc.parallelize(data)

In [5]:
type(data_rdd)

pyspark.rdd.RDD

PySpark distributed datasets can be created from any Hadoop-supported storage (S3, local file system, HDFS, etc.).  
Text file RDDs are created using **SparkContext.textFile**.
A local file path or a URI such as s3:// can be used.  
The data in the file are not loaded into memory at this point; data_rdd is just a pointer to the file.

In [71]:
file_path = '/content/drive/MyDrive/Colab Notebooks/'
# read the data into 4 partitions
data_rdd = sc.textFile(file_path + 'water_potability.csv', 4)

In [72]:
data_rdd.take(5)

['ph,Hardness,Solids,Chloramines,Sulfate,Conductivity,Organic_carbon,Trihalomethanes,Turbidity,Potability',
 ',204.8904554713363,20791.318980747026,7.300211873184757,368.51644134980336,564.3086541722439,10.3797830780847,86.9909704615088,2.9631353806316407,0',
 '3.71608007538699,129.42292051494425,18630.057857970347,6.635245883862,,592.8853591348523,15.180013116357259,56.32907628451764,4.500656274942408,0',
 '8.099124189298397,224.23625939355776,19909.541732292393,9.275883602694089,,418.6062130644815,16.868636929550973,66.42009251176368,3.0559337496641685,0',
 '8.316765884214679,214.37339408562252,22018.417440775294,8.05933237743854,356.88613564305666,363.2665161642437,18.436524495493302,100.34167436508008,4.628770536837084,0']

### RDD Operations  
There are two types of operations: 
* **transformations** --> create a new dataset from an existing one
* **actions** --> return a value after running a computation on the dataset  


_Spark transformations are lazy. They are computed only when an action requires a result to be returned._  

By default, each transformed RDD is recomputed every time we run an action on it. If the data are needed for further computations, we can use the **persist** method to cache the results of the RDD evaluation and avoid repeated computation.    
Find examples [here](https://blog.knoldus.com/understanding-persistence-in-apache-spark/).
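
Laziness and recomputation can be illustrated without Spark. The sketch below is a loose plain-Python analogy, not Spark's implementation: `map` plays the role of a lazy transformation, `sum` and `len` play the role of actions, and materializing a list stands in for `persist`. The names `expensive` and `build_pipeline` are made up for the example. One difference to keep in mind: in Spark, `persist` is itself lazy, and the data are cached when the first action runs.

```python
calls = []  # records every time the "expensive" function really runs


def expensive(x):
    calls.append(x)
    return x * 2


data = [1, 2, 3]


def build_pipeline():
    # Like a transformation: map() returns a lazy iterator, nothing runs yet.
    return map(expensive, data)


pipeline = build_pipeline()
calls_after_define = len(calls)          # 0: defining the pipeline computed nothing

total = sum(pipeline)                    # like an action: forces the computation
calls_after_first_action = len(calls)    # 3

count = len(list(build_pipeline()))      # a second action recomputes everything
calls_after_second_action = len(calls)   # 6

cached = list(build_pipeline())          # like a persisted result: computed once ...
total_again = sum(cached)                # ... then reused without recomputing
count_again = len(cached)
calls_after_cached_actions = len(calls)  # 9, not 12
```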

#### Basic/Most common PySpark transformations and actions
Transformation category | Transformation      |  Explanation
-----------------------|---------------------|------------------
General | map(func)           | Each element of the RDD is passed through the function func. The input and output RDDs have the same number of elements.
General | flatMap(func)       | Can produce zero or more output elements for each input element. _Applies the function to all elements of the RDD and then flattens the results._
General | filter(func)        | Return an RDD with only the elements that satisfy the condition(s).
General | groupByKey(), reduceByKey(func) | These transformations are applied to an RDD of (key, value) pairs. groupByKey groups the values of each key; reduceByKey merges the values of each key using func.
Math/ Statistical | sample(withReplacement, fraction) | Sample a fraction _fraction_ of the data, with or without replacement.
Set theory/ Relational | union(otherDataset) | Return a new RDD that contains the union of the original RDD and the argument.
Set theory/ Relational | distinct()          | Return a new RDD that contains the distinct elements of the source dataset.
Data Structure/ I/O | coalesce(numPartitions) | Return a new RDD with the number of partitions reduced to numPartitions.

\

Action category | Action      |  Explanation
-----------------------|---------------------|------------------
General  | getNumPartitions() |  Return the number of partitions of the RDD
General | reduce(func) | Aggregate the elements of the dataset using a function func (which takes two arguments and returns one)
General | collect() | Return all the elements of the dataset as a list at the driver 
General | take(n) |  Return the first n elements of the RDD
General | first() | Return the first element of the RDD (RDDs have no last() method)
Math/ Statistical | count() | Count the number of elements in the RDD
Math/ Statistical | min(), max(), sum(), mean(), stdev() | Classic statistical measures

More on flatMap here: [map vs flatMap](https://data-flair.training/blogs/apache-spark-map-vs-flatmap/)
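
The difference between map and flatMap is easiest to see on a tiny example. The sketch below reproduces the semantics of map, flatMap, filter and reduce on a plain Python list (the sample strings are made up); on an RDD the same functions would be passed to the methods of the same name.

```python
from functools import reduce
from itertools import chain

lines = ["a,b,c", "d,e"]

# map: one output element per input element (here, one list per line)
mapped = [line.split(",") for line in lines]
# [['a', 'b', 'c'], ['d', 'e']]

# flatMap: apply the same function, then flatten the results into one sequence
flat_mapped = list(chain.from_iterable(line.split(",") for line in lines))
# ['a', 'b', 'c', 'd', 'e']

# filter: keep only the elements that satisfy the predicate
filtered = [x for x in flat_mapped if x != "c"]
# ['a', 'b', 'd', 'e']

# reduce: combine elements pairwise with a two-argument function
lengths = [len(row) for row in mapped]
total = reduce(lambda a, b: a + b, lengths)  # 3 + 2 = 5
```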

In [73]:
data_rdd.getNumPartitions()

4

In [76]:
import numpy as np 

# map the data to separate the values of each row
data_rdd = data_rdd.map(lambda s: s.split(sep=','))

# use the filter method to separate the header row from the data
header = data_rdd.first()
data_rdd = data_rdd.filter(lambda row: row!=header)

# all values are floats; replace missing values with NaN
# (the builtin float is used because the np.float alias was removed in NumPy 1.24)
data_rdd = data_rdd.map(lambda row: [float(x) if x!='' else np.nan for x in row])

# persist so that later actions can reuse the result without recalculating all previous steps
data_rdd.persist()

# all previous steps could be in the same pipeline

PythonRDD[146] at RDD at PythonRDD.scala:53
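
The parsing logic used above can be checked on a few raw lines without Spark. The sketch below uses a small made-up sample with the same layout as the CSV (a header row and an empty field for a missing value); `parse_row` is a hypothetical helper name. In PySpark, functions like these could be passed to `map` and `filter` in a single chained pipeline.

```python
import math

# Illustrative sample with the same layout as the CSV (not the real file).
lines = [
    "ph,Hardness,Potability",
    ",204.89,0",
    "3.72,129.42,0",
]


def parse_row(line):
    """Split a CSV line and convert each field to float, using NaN for empty fields."""
    return [float(x) if x != "" else float("nan") for x in line.split(",")]


header = lines[0]
# Drop the header, then parse every remaining line.
rows = [parse_row(line) for line in lines if line != header]

print(rows[1])                 # [3.72, 129.42, 0.0]
print(math.isnan(rows[0][0]))  # True: the missing pH became NaN
```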

In [77]:
# potability is a feature that shows whether the water is safe to drink
# it should be binary (values 0 and 1) --> find the values with distinct
potability = data_rdd.map(lambda row: row[-1]).distinct()
# use collect method(action) to have the values returned
potability.collect()

[0.0, 1.0]

In [94]:
# group water pH (1st column) and chloramines (4th column) by potability value,
# as a first step towards per-group statistics
grouped_rdd = data_rdd.map(lambda row: (row[-1], [row[0], row[3]])).groupByKey()
grouped_rdd.collect()

[(0.0, <pyspark.resultiterable.ResultIterable at 0x7ff3e8345b90>),
 (1.0, <pyspark.resultiterable.ResultIterable at 0x7ff3e819a590>)]
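
The `collect()` output above shows only `ResultIterable` wrappers, not the grouped values. In PySpark, one way to inspect them is `grouped_rdd.mapValues(list)`. For a per-key mean, a frequently used pattern is to carry (sum, count) pairs and merge them pairwise, which is the kind of two-argument function `reduceByKey` expects. The sketch below shows that pattern in plain Python on made-up (potability, ph) pairs; it is an illustration of the idea, not the notebook's own method.

```python
import math
from functools import reduce

# Illustrative (potability, ph) pairs; the values are made up.
pairs = [(0.0, 7.0), (1.0, 8.0), (0.0, float("nan")), (0.0, 9.0), (1.0, 6.0)]


def merge(a, b):
    """Merge two (sum, count) accumulators into one."""
    return (a[0] + b[0], a[1] + b[1])


# Skip missing values, then turn each value into a (sum, count) accumulator.
accumulators = {}
for key, value in pairs:
    if math.isnan(value):
        continue
    accumulators.setdefault(key, []).append((value, 1))

# Reduce the accumulators of each key pairwise, then divide sum by count.
means = {}
for key, accs in accumulators.items():
    total, count = reduce(merge, accs)
    means[key] = total / count

print(means)  # {0.0: 8.0, 1.0: 7.0}
```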

For someone who is comfortable using Python Pandas in their pipelines, switching to RDD-heavy logic can be quite frustrating.  
As an alternative to working with RDDs directly, PySpark offers a DataFrame API.  
An RDD can be converted to a PySpark DataFrame with the .toDF() method.  

\
Although the operations available on a Python Pandas DataFrame and a PySpark DataFrame are very similar, there are a few differences:

Pandas DataFrame | PySpark DataFrame
-----------------|--------------------
Operations run on a single machine | Operations run in parallel on different nodes in the cluster


In [132]:
df = data_rdd.toDF(header)
type(df)

pyspark.sql.dataframe.DataFrame

In [134]:
# datatypes of columns 
df.printSchema()

root
 |-- ph: double (nullable = true)
 |-- Hardness: double (nullable = true)
 |-- Solids: double (nullable = true)
 |-- Chloramines: double (nullable = true)
 |-- Sulfate: double (nullable = true)
 |-- Conductivity: double (nullable = true)
 |-- Organic_carbon: double (nullable = true)
 |-- Trihalomethanes: double (nullable = true)
 |-- Turbidity: double (nullable = true)
 |-- Potability: double (nullable = true)



In [136]:
# the DataFrame still has the partitions that we defined when we created the RDD
df.rdd.getNumPartitions()

4

In [137]:
# show values of selected columns
df.select('ph', 'Sulfate', 'Potability').show(5)

+-----------------+------------------+----------+
|               ph|           Sulfate|Potability|
+-----------------+------------------+----------+
|              NaN|368.51644134980336|       0.0|
| 3.71608007538699|               NaN|       0.0|
|8.099124189298397|               NaN|       0.0|
|8.316765884214679|356.88613564305666|       0.0|
|9.092223456290965|310.13573752420444|       0.0|
+-----------------+------------------+----------+
only showing top 5 rows



In [145]:
# just like pandas, column-wise summary statistics can be accessed with the describe or summary methods
df.summary().show()

+-------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+
|summary|               ph|          Hardness|            Solids|       Chloramines|           Sulfate|      Conductivity|    Organic_carbon|   Trihalomethanes|         Turbidity|         Potability|
+-------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+
|  count|             3276|              3276|              3276|              3276|              3276|              3276|              3276|              3276|              3276|               3276|
|   mean|              NaN|196.36949601730146|22014.092526077104| 7.122276793425785|               NaN| 426.2051106825533|14.284970247677315|               NaN| 3.966786169791058| 0.3901098901098901|


In [149]:
# count NaN values per column (missing values were stored as NaN during parsing)
from pyspark.sql.functions import when, count, isnan
df.select([count(when(isnan(c), c)).alias(c) for c in df.columns]).show()

+---+--------+------+-----------+-------+------------+--------------+---------------+---------+----------+
| ph|Hardness|Solids|Chloramines|Sulfate|Conductivity|Organic_carbon|Trihalomethanes|Turbidity|Potability|
+---+--------+------+-----------+-------+------------+--------------+---------------+---------+----------+
|491|       0|     0|          0|    781|           0|             0|            162|        0|         0|
+---+--------+------+-----------+-------+------------+--------------+---------------+---------+----------+



In [157]:
nrows = df.count()
df.select([(count(when(isnan(c), c))/nrows).alias(c) for c in df.columns]).show()

+-------------------+--------+------+-----------+-------------------+------------+--------------+-------------------+---------+----------+
|                 ph|Hardness|Solids|Chloramines|            Sulfate|Conductivity|Organic_carbon|    Trihalomethanes|Turbidity|Potability|
+-------------------+--------+------+-----------+-------------------+------------+--------------+-------------------+---------+----------+
|0.14987789987789987|     0.0|   0.0|        0.0|0.23840048840048841|         0.0|           0.0|0.04945054945054945|      0.0|       0.0|
+-------------------+--------+------+-----------+-------------------+------------+--------------+-------------------+---------+----------+



In [169]:
from pyspark.sql.functions import mean
df.select('ph').na.drop().select(mean('ph')).show()

+------------------+
|           avg(ph)|
+------------------+
|7.0807945042768345|
+------------------+
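
The `summary()` output above reports NaN as the mean of ph, while dropping the missing values first gives a usable number. The reason is that NaN propagates through arithmetic. A plain-Python sketch of the same effect, on made-up values:

```python
import math

values = [7.0, float("nan"), 8.0, 6.0]

# NaN propagates through arithmetic, so a naive mean is NaN.
naive_mean = sum(values) / len(values)

# Dropping NaN first gives the mean of the observed values only.
observed = [v for v in values if not math.isnan(v)]
clean_mean = sum(observed) / len(observed)

print(math.isnan(naive_mean))  # True
print(clean_mean)              # 7.0
```

Note that the cleaned mean is computed over fewer rows, so it describes only the observed measurements.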



In [173]:
df.groupby('potability').agg({'Chloramines': 'mean'}).show()

+----------+-----------------+
|potability| avg(Chloramines)|
+----------+-----------------+
|       0.0|7.092174563443739|
|       1.0|7.169338026214628|
+----------+-----------------+



In [175]:
df.groupby('potability').count().show()

+----------+-----+
|potability|count|
+----------+-----+
|       0.0| 1998|
|       1.0| 1278|
+----------+-----+

