## Introduction to PySpark

- **Spark** is a unified analytics engine for **large-scale data processing**.
- PySpark is the **Python API for Spark**: it lets you drive the Spark engine from Python code.

![PySpark](https://databricks.com/wp-content/uploads/2018/12/PySpark-1024x164.png)

Let's look at the Spark ecosystem in the diagram below:
![Spark](https://d1m75rqqgidzqn.cloudfront.net/wp-data/2020/08/14133246/Image-recreation-for-articles-A4-06-1-1024x578.jpg)

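Key data structure in the Spark API:
**RDD (Resilient Distributed Dataset)**, which supports two kinds of operations: **transformations** (lazy operations that describe a new RDD) and **actions** (operations that trigger the computation and return a result).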

PySpark machine learning libraries:

- `pyspark.ml` (DataFrame-based, current)
- `pyspark.mllib` (RDD-based, legacy)

![PySpark Pipeline](https://intellipaat.com/mediaFiles/2019/02/MLlib-algorithms.png)

## Public classes:

- SparkContext
**Main entry point for Spark functionality.**

A SparkContext represents the connection to **a Spark cluster, and can be used to create RDDs and broadcast variables on that cluster.**

In [3]:
from pyspark import SparkContext

In [4]:
# Create the Spark context (this machine has 8 CPU cores; local[2] runs Spark locally with 2 worker threads)
sc = SparkContext(master = 'local[2]')

In [5]:
# Displaying sc shows a link to the Spark web UI, where you can track what is happening behind the scenes
sc

### Basics of PySpark and its different functions

In [6]:
import pyspark

In [7]:
dir(pyspark)

['Accumulator',
 'AccumulatorParam',
 'BarrierTaskContext',
 'BarrierTaskInfo',
 'BasicProfiler',
 'Broadcast',
 'HiveContext',
 'MarshalSerializer',
 'PickleSerializer',
 'Profiler',
 'RDD',
 'RDDBarrier',
 'Row',
 'SQLContext',
 'SparkConf',
 'SparkContext',
 'SparkFiles',
 'SparkJobInfo',
 'SparkStageInfo',
 'StatusTracker',
 'StorageLevel',
 'TaskContext',
 '_NoValue',
 '__all__',
 '__builtins__',
 '__cached__',
 '__doc__',
 '__file__',
 '__loader__',
 '__name__',
 '__package__',
 '__path__',
 '__spec__',
 '__version__',
 '_globals',
 'accumulators',
 'broadcast',
 'cloudpickle',
 'conf',
 'context',
 'copy_func',
 'files',
 'find_spark_home',
 'heapq3',
 'java_gateway',
 'join',
 'keyword_only',
 'profiler',
 'rdd',
 'rddsampler',
 'resource',
 'resultiterable',
 'serializers',
 'shuffle',
 'since',
 'sql',
 'statcounter',
 'status',
 'storagelevel',
 'taskcontext',
 'traceback_utils',
 'types',
 'util',
 'version',
 'wraps']

#### The entry point to programming Spark with the Dataset and DataFrame API.

class pyspark.sql.SparkSession(sparkContext, jsparkSession=None)

A SparkSession can be used to create DataFrame, register DataFrame as tables, execute SQL over tables, cache tables, and read parquet files. To create a SparkSession, use the following builder pattern:

spark = SparkSession.builder \
    .master("local") \
    .appName("Word Count") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [8]:
#Deal with SQL

from pyspark.sql import SparkSession

In [9]:
#create a spark session i.e. the entry point

spark = SparkSession.builder.appName("pyspark basics").getOrCreate()

In [12]:
# Read a CSV file into a DataFrame without header or schema options (columns get default names _c0, _c1, ...)

data = spark.read.csv("dataset/hepatitis.csv")

In [13]:
#let's view the dataset

data.show(10)

+---+------+-------+----------+-------+-------+--------+---------+----------+---------------+-------+-------+-------+---------+-------------+----+-------+-------+---------+-----+
|_c0|   _c1|    _c2|       _c3|    _c4|    _c5|     _c6|      _c7|       _c8|            _c9|   _c10|   _c11|   _c12|     _c13|         _c14|_c15|   _c16|   _c17|     _c18| _c19|
+---+------+-------+----------+-------+-------+--------+---------+----------+---------------+-------+-------+-------+---------+-------------+----+-------+-------+---------+-----+
|age|   sex|steroid|antivirals|fatigue|malaise|anorexia|liver_big|liver_firm|spleen_palpable|spiders|ascites|varices|bilirubin|alk_phosphate|sgot|albumin|protime|histology|class|
| 30|  male|  False|     False|  False|  False|   False|    False|     False|          False|  False|  False|  False|        1|           85|  18|      4|   null|    False| live|
| 50|female|  False|     False|   True|  False|   False|    False|     False|          False|  False|  Fa

In [14]:
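# header=True uses the first row as column names; without inferSchema every column is still read as a string
new_data = spark.read.csv("dataset/hepatitis.csv", header=True)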

In [15]:
new_data.show(5)

+---+------+-------+----------+-------+-------+--------+---------+----------+---------------+-------+-------+-------+---------+-------------+----+-------+-------+---------+-----+
|age|   sex|steroid|antivirals|fatigue|malaise|anorexia|liver_big|liver_firm|spleen_palpable|spiders|ascites|varices|bilirubin|alk_phosphate|sgot|albumin|protime|histology|class|
+---+------+-------+----------+-------+-------+--------+---------+----------+---------------+-------+-------+-------+---------+-------------+----+-------+-------+---------+-----+
| 30|  male|  False|     False|  False|  False|   False|    False|     False|          False|  False|  False|  False|        1|           85|  18|      4|   null|    False| live|
| 50|female|  False|     False|   True|  False|   False|    False|     False|          False|  False|  False|  False|      0.9|          135|  42|    3.5|   null|    False| live|
| 78|female|   True|     False|   True|  False|   False|     True|     False|          False|  False|  Fa

In [16]:
# Get first row
new_data.first()

Row(age='30', sex='male', steroid='False', antivirals='False', fatigue='False', malaise='False', anorexia='False', liver_big='False', liver_firm='False', spleen_palpable='False', spiders='False', ascites='False', varices='False', bilirubin='1', alk_phosphate='85', sgot='18', albumin='4', protime=None, histology='False', class='live')

In [17]:
# Get the first 5 rows (head works like in pandas, but returns a list of Row objects)

new_data.head(5)

[Row(age='30', sex='male', steroid='False', antivirals='False', fatigue='False', malaise='False', anorexia='False', liver_big='False', liver_firm='False', spleen_palpable='False', spiders='False', ascites='False', varices='False', bilirubin='1', alk_phosphate='85', sgot='18', albumin='4', protime=None, histology='False', class='live'),
 Row(age='50', sex='female', steroid='False', antivirals='False', fatigue='True', malaise='False', anorexia='False', liver_big='False', liver_firm='False', spleen_palpable='False', spiders='False', ascites='False', varices='False', bilirubin='0.9', alk_phosphate='135', sgot='42', albumin='3.5', protime=None, histology='False', class='live'),
 Row(age='78', sex='female', steroid='True', antivirals='False', fatigue='True', malaise='False', anorexia='False', liver_big='True', liver_firm='False', spleen_palpable='False', spiders='False', ascites='False', varices='False', bilirubin='0.7', alk_phosphate='96', sgot='32', albumin='4', protime=None, histology='Fa

In [18]:
# check for all columns names
new_data.columns

['age',
 'sex',
 'steroid',
 'antivirals',
 'fatigue',
 'malaise',
 'anorexia',
 'liver_big',
 'liver_firm',
 'spleen_palpable',
 'spiders',
 'ascites',
 'varices',
 'bilirubin',
 'alk_phosphate',
 'sgot',
 'albumin',
 'protime',
 'histology',
 'class']

In [19]:
#check datatypes
new_data.dtypes

[('age', 'string'),
 ('sex', 'string'),
 ('steroid', 'string'),
 ('antivirals', 'string'),
 ('fatigue', 'string'),
 ('malaise', 'string'),
 ('anorexia', 'string'),
 ('liver_big', 'string'),
 ('liver_firm', 'string'),
 ('spleen_palpable', 'string'),
 ('spiders', 'string'),
 ('ascites', 'string'),
 ('varices', 'string'),
 ('bilirubin', 'string'),
 ('alk_phosphate', 'string'),
 ('sgot', 'string'),
 ('albumin', 'string'),
 ('protime', 'string'),
 ('histology', 'string'),
 ('class', 'string')]

In [20]:
# get the schema
new_data.printSchema()

root
 |-- age: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- steroid: string (nullable = true)
 |-- antivirals: string (nullable = true)
 |-- fatigue: string (nullable = true)
 |-- malaise: string (nullable = true)
 |-- anorexia: string (nullable = true)
 |-- liver_big: string (nullable = true)
 |-- liver_firm: string (nullable = true)
 |-- spleen_palpable: string (nullable = true)
 |-- spiders: string (nullable = true)
 |-- ascites: string (nullable = true)
 |-- varices: string (nullable = true)
 |-- bilirubin: string (nullable = true)
 |-- alk_phosphate: string (nullable = true)
 |-- sgot: string (nullable = true)
 |-- albumin: string (nullable = true)
 |-- protime: string (nullable = true)
 |-- histology: string (nullable = true)
 |-- class: string (nullable = true)



In [21]:
new_data.count() # for rows

155

In [22]:
len(new_data.columns) #for columns

20

In [24]:
print(new_data.count(), len(new_data.columns))

155 20


#### Descriptive summary

In [25]:
new_data.describe().show()

+-------+------------------+------+-------+----------+-------+-------+--------+---------+----------+---------------+-------+-------+-------+------------------+------------------+-----------------+------------------+-----------------+---------+-----+
|summary|               age|   sex|steroid|antivirals|fatigue|malaise|anorexia|liver_big|liver_firm|spleen_palpable|spiders|ascites|varices|         bilirubin|     alk_phosphate|             sgot|           albumin|          protime|histology|class|
+-------+------------------+------+-------+----------+-------+-------+--------+---------+----------+---------------+-------+-------+-------+------------------+------------------+-----------------+------------------+-----------------+---------+-----+
|  count|               155|   155|    154|       155|    154|    154|     154|      145|       144|            150|    150|    150|    150|               149|               126|              151|               139|               88|      155|  155|


In [27]:
new_data.describe('age').show()

+-------+------------------+
|summary|               age|
+-------+------------------+
|  count|               155|
|   mean|              41.2|
| stddev|12.565878349773197|
|    min|                20|
|    max|                78|
+-------+------------------+



In [28]:
# List the available column names before selecting one
new_data.columns

['age',
 'sex',
 'steroid',
 'antivirals',
 'fatigue',
 'malaise',
 'anorexia',
 'liver_big',
 'liver_firm',
 'spleen_palpable',
 'spiders',
 'ascites',
 'varices',
 'bilirubin',
 'alk_phosphate',
 'sgot',
 'albumin',
 'protime',
 'histology',
 'class']

In [29]:
new_data.select('age').show()

+---+
|age|
+---+
| 30|
| 50|
| 78|
| 31|
| 34|
| 34|
| 51|
| 23|
| 39|
| 30|
| 39|
| 32|
| 41|
| 30|
| 47|
| 38|
| 66|
| 40|
| 38|
| 38|
+---+
only showing top 20 rows



In [31]:
# select multiple columns
new_data.select('age', 'sex').show(5)

+---+------+
|age|   sex|
+---+------+
| 30|  male|
| 50|female|
| 78|female|
| 31|female|
| 34|female|
+---+------+
only showing top 5 rows



#### Conditions and Selections

- `dataframe.filter`
- `dataframe.where` (an alias for `filter`)

In [32]:
new_data.show(10)

+---+------+-------+----------+-------+-------+--------+---------+----------+---------------+-------+-------+-------+---------+-------------+----+-------+-------+---------+-----+
|age|   sex|steroid|antivirals|fatigue|malaise|anorexia|liver_big|liver_firm|spleen_palpable|spiders|ascites|varices|bilirubin|alk_phosphate|sgot|albumin|protime|histology|class|
+---+------+-------+----------+-------+-------+--------+---------+----------+---------------+-------+-------+-------+---------+-------------+----+-------+-------+---------+-----+
| 30|  male|  False|     False|  False|  False|   False|    False|     False|          False|  False|  False|  False|        1|           85|  18|      4|   null|    False| live|
| 50|female|  False|     False|   True|  False|   False|    False|     False|          False|  False|  False|  False|      0.9|          135|  42|    3.5|   null|    False| live|
| 78|female|   True|     False|   True|  False|   False|     True|     False|          False|  False|  Fa

In [36]:
# Select the patients who are exactly 34 years old
new_data.filter(new_data['age'] == 34).show()

+---+------+-------+----------+-------+-------+--------+---------+----------+---------------+-------+-------+-------+---------+-------------+----+-------+-------+---------+-----+
|age|   sex|steroid|antivirals|fatigue|malaise|anorexia|liver_big|liver_firm|spleen_palpable|spiders|ascites|varices|bilirubin|alk_phosphate|sgot|albumin|protime|histology|class|
+---+------+-------+----------+-------+-------+--------+---------+----------+---------------+-------+-------+-------+---------+-------------+----+-------+-------+---------+-----+
| 34|female|   True|     False|  False|  False|   False|     True|     False|          False|  False|  False|  False|        1|         null| 200|      4|   null|    False| live|
| 34|female|   True|     False|  False|  False|   False|     True|     False|          False|  False|  False|  False|      0.9|           95|  28|      4|     75|    False| live|
| 34|female|   True|     False|  False|  False|   False|     True|     False|          False|  False|  Fa

In [37]:
# Same filter as before, but show only the first 3 matching rows
new_data.filter(new_data['age'] == 34).show(3)

+---+------+-------+----------+-------+-------+--------+---------+----------+---------------+-------+-------+-------+---------+-------------+----+-------+-------+---------+-----+
|age|   sex|steroid|antivirals|fatigue|malaise|anorexia|liver_big|liver_firm|spleen_palpable|spiders|ascites|varices|bilirubin|alk_phosphate|sgot|albumin|protime|histology|class|
+---+------+-------+----------+-------+-------+--------+---------+----------+---------------+-------+-------+-------+---------+-------------+----+-------+-------+---------+-----+
| 34|female|   True|     False|  False|  False|   False|     True|     False|          False|  False|  False|  False|        1|         null| 200|      4|   null|    False| live|
| 34|female|   True|     False|  False|  False|   False|     True|     False|          False|  False|  False|  False|      0.9|           95|  28|      4|     75|    False| live|
| 34|female|   True|     False|  False|  False|   False|     True|     False|          False|  False|  Fa

In [38]:
#second method using where

new_data.where(new_data['sex'] == 'female').show(5)

+---+------+-------+----------+-------+-------+--------+---------+----------+---------------+-------+-------+-------+---------+-------------+----+-------+-------+---------+-----+
|age|   sex|steroid|antivirals|fatigue|malaise|anorexia|liver_big|liver_firm|spleen_palpable|spiders|ascites|varices|bilirubin|alk_phosphate|sgot|albumin|protime|histology|class|
+---+------+-------+----------+-------+-------+--------+---------+----------+---------------+-------+-------+-------+---------+-------------+----+-------+-------+---------+-----+
| 50|female|  False|     False|   True|  False|   False|    False|     False|          False|  False|  False|  False|      0.9|          135|  42|    3.5|   null|    False| live|
| 78|female|   True|     False|   True|  False|   False|     True|     False|          False|  False|  False|  False|      0.7|           96|  32|      4|   null|    False| live|
| 31|female|   null|      True|  False|  False|   False|     True|     False|          False|  False|  Fa

In [39]:
# Combine where with select to keep only a few columns of the matching rows

new_data.where(new_data['sex'] == 'female').select('age', 'sex', 'class').show(5)

+---+------+-----+
|age|   sex|class|
+---+------+-----+
| 50|female| live|
| 78|female| live|
| 31|female| live|
| 34|female| live|
| 34|female| live|
+---+------+-----+
only showing top 5 rows



In [44]:
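# Add a column. Despite its name, age_by_10 holds age * 5 here; withColumn returns a new DataFrame and leaves new_data unchanged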
new_data.withColumn('age_by_10', new_data['age'] * 5).show(5)

+---+------+-------+----------+-------+-------+--------+---------+----------+---------------+-------+-------+-------+---------+-------------+----+-------+-------+---------+-----+---------+
|age|   sex|steroid|antivirals|fatigue|malaise|anorexia|liver_big|liver_firm|spleen_palpable|spiders|ascites|varices|bilirubin|alk_phosphate|sgot|albumin|protime|histology|class|age_by_10|
+---+------+-------+----------+-------+-------+--------+---------+----------+---------------+-------+-------+-------+---------+-------------+----+-------+-------+---------+-----+---------+
| 30|  male|  False|     False|  False|  False|   False|    False|     False|          False|  False|  False|  False|        1|           85|  18|      4|   null|    False| live|    150.0|
| 50|female|  False|     False|   True|  False|   False|    False|     False|          False|  False|  False|  False|      0.9|          135|  42|    3.5|   null|    False| live|    250.0|
| 78|female|   True|     False|   True|  False|   False

In [43]:
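# Before dropping a column, look at the current DataFrame (new_data has no age_by_10 column, because the withColumn result was never assigned)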
new_data.show(5)

+---+------+-------+----------+-------+-------+--------+---------+----------+---------------+-------+-------+-------+---------+-------------+----+-------+-------+---------+-----+
|age|   sex|steroid|antivirals|fatigue|malaise|anorexia|liver_big|liver_firm|spleen_palpable|spiders|ascites|varices|bilirubin|alk_phosphate|sgot|albumin|protime|histology|class|
+---+------+-------+----------+-------+-------+--------+---------+----------+---------------+-------+-------+-------+---------+-------------+----+-------+-------+---------+-----+
| 30|  male|  False|     False|  False|  False|   False|    False|     False|          False|  False|  False|  False|        1|           85|  18|      4|   null|    False| live|
| 50|female|  False|     False|   True|  False|   False|    False|     False|          False|  False|  False|  False|      0.9|          135|  42|    3.5|   null|    False| live|
| 78|female|   True|     False|   True|  False|   False|     True|     False|          False|  False|  Fa

In [45]:
# drop returns a new DataFrame; since new_data has no age_by_10 column, this call changes nothing
final_data = new_data.drop('age_by_10')

In [46]:
final_data.columns

['age',
 'sex',
 'steroid',
 'antivirals',
 'fatigue',
 'malaise',
 'anorexia',
 'liver_big',
 'liver_firm',
 'spleen_palpable',
 'spiders',
 'ascites',
 'varices',
 'bilirubin',
 'alk_phosphate',
 'sgot',
 'albumin',
 'protime',
 'histology',
 'class']