# Learning PySpark - Session 01

In this session, we'll cover the following topics:

1. What is PySpark?
2. How to check the version of PySpark installed on the local PC?
3. What is SparkContext?
4. What is an RDD in PySpark?

    4.1 How to create an RDD using PySpark?
    
    4.2 How to read a `.txt` file as a PySpark RDD?

5. What is a PySpark DataFrame?

    5.1. How to check the schema?
    
    5.2. How to read a `.csv` file as a PySpark DataFrame?
    
    5.3. How to check the shape of a PySpark DataFrame?
    
    5.4. How to see the first 5 rows of a PySpark DataFrame?
    
    5.5. How to see the last 5 rows of a PySpark DataFrame?
    
    5.6. How to subset columns from a PySpark DataFrame?
    
    5.7. How to subset rows from a PySpark DataFrame?
    
    5.8. How to generate summary statistics from a PySpark DataFrame?
    
    5.9. How to add a new column to a PySpark DataFrame?
    
    5.10. How to perform groupby operations on a PySpark DataFrame?

## 1. What is PySpark?

PySpark is the Python API for Apache Spark. It lets you write Spark applications in Python and provides tools to analyze data in a distributed environment. PySpark supports the following features:

1. Spark SQL and DataFrame
2. pandas API on Spark
3. Streaming
4. MLlib
5. Spark Core

<img src='./images/pyspark-components.png'/>

#### 1. Spark SQL and DataFrame:

Spark SQL is a Spark module for structured data processing. It provides a programming abstraction called DataFrame and can also act as a distributed SQL query engine.

#### 2. pandas API on Spark:

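The pandas API on Spark lets you scale your pandas workload out by running it distributed across a cluster, and makes it easy to move between pandas and PySpark DataFrames. It ships with Spark 3.2 and later, so it is not available in the PySpark 3.1.2 installation shown below.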

#### 3. Streaming:

The streaming feature of Apache Spark lets you analyze streaming data while keeping Spark's fault-tolerance characteristics.

#### 4. MLlib:

MLlib is a scalable machine learning library that provides a set of high-level APIs to help users create and tune practical machine learning pipelines.

#### 5. Spark Core:

Spark Core is the general execution engine that the rest of Spark is built on. It provides RDDs (Resilient Distributed Datasets) and in-memory computing capabilities.

## 2. How to check the version of PySpark installed on the local PC?

In [1]:
import pyspark
print(pyspark.__version__)

3.1.2


## 3. What is SparkContext?

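**SparkContext** is the entry point to Spark functionality. It lets your code connect to a Spark cluster through a cluster manager (such as Spark's standalone manager, YARN, Mesos or Kubernetes). Creating a SparkContext is the first step in any Spark program, because every other Spark feature is reached through it. A `SparkSession`, the entry point for DataFrames used in this notebook, creates a SparkContext under the hood and exposes it through its `sparkContext` attribute.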

<img src='./images/spark_context_01.jpg'/>

The `SparkContext` constructor takes the following parameters:

1. **master**: URL of the cluster to connect to.
2. **appName**: name of your job.
3. **sparkHome**: Spark installation directory.
4. **pyFiles**: `.zip` or `.py` files to send to the cluster and add to the `PYTHONPATH`.
5. **environment**: dictionary of environment variables to set on worker nodes.
6. **batchSize**: number of Python objects represented as a single Java object. Set 1 to disable batching, 0 to choose the batch size automatically based on object sizes, or -1 to use an unlimited batch size.
7. **serializer**: the RDD serializer.
8. **conf**: a `SparkConf` object that sets all the Spark properties.
9. **gateway**: an existing gateway and JVM to use; otherwise a new JVM is initialized.
10. **jsc**: the JavaSparkContext instance (used internally).
11. **profiler_cls**: a class of custom profiler used for profiling (default: `pyspark.profiler.BasicProfiler`).

In [2]:
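from pyspark.sql import SparkSession
# Note: `sc` holds a SparkSession here (it is often named `spark`);
# its underlying SparkContext is available as sc.sparkContext.
sc = SparkSession.builder.appName('session 01').getOrCreate()
sc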

22/06/30 09:19:02 WARN Utils: Your hostname, amandeep resolves to a loopback address: 127.0.1.1; using 192.168.195.129 instead (on interface ens33)
22/06/30 09:19:02 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/06/30 09:19:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## 4. What is an RDD in PySpark?

Apache Spark revolves around the concept of a resilient distributed dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. There are two ways to create RDDs:

1. Parallelizing an existing collection in the driver program.
2. Referencing a dataset in an external storage system (such as a shared filesystem, HDFS or HBase).

### 4.1 How to create an RDD using PySpark?

Parallelized collections are created by calling the `parallelize` method of a `SparkContext` on an existing iterable or collection in the driver program.

In [3]:
shopping_list = sc.sparkContext.parallelize([
    'bread', 'eggs', 'mangoes'
])
print(type(shopping_list))

<class 'pyspark.rdd.RDD'>


The elements of the collection are copied to form a distributed dataset that can be operated on in parallel. An important parameter for parallelized collections is the number of partitions to cut the dataset into, because Spark runs one task for each partition. Spark normally sets this number automatically based on your cluster, but you can also pass it as the second argument to `parallelize`.

In [4]:
shopping_list = sc.sparkContext.parallelize([
    'bread', 'eggs', 'mangoes', 'bananas', 'chicken', 'ketchup'
], 3) # The RDD with the name shopping_list has 3 partitions.
print(shopping_list)

ParallelCollectionRDD[1] at readRDDFromFile at PythonRDD.scala:274


### 4.2 How to read a `.txt` file as a PySpark RDD?

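The `textFile` method reads a text file and returns an RDD with one element per line. Like `parallelize`, it takes an optional second argument that controls the number of partitions. By default, Spark creates one partition for each block of the file (blocks are 128 MB by default in HDFS); you can ask for more partitions by passing a larger value, but not for fewer partitions than blocks.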

In [5]:
text_rdd = sc.sparkContext.textFile('./data/rdd_text.txt')
print(type(text_rdd))

<class 'pyspark.rdd.RDD'>


In [30]:
text_rdd = sc.sparkContext.textFile('./data/rdd_text.txt', 3)
print(text_rdd)
text_rdd.foreach(print)  # print runs on the executors; text_rdd.collect() returns the lines to the driver instead

./data/rdd_text.txt MapPartitionsRDD[127] at textFile at NativeMethodAccessorImpl.java:0


Mankey
Arbok
Weedle
Metapod
Gloom
Geodude
Pikachu
Evee
Snorlax


Spark can also read a directory containing multiple small text files with the `wholeTextFiles` method, which returns an RDD of (filename, content) pairs, one per file.

In [28]:
rdd_files = sc.sparkContext.wholeTextFiles('./data/text_files_folder')
print(type(rdd_files))
rdd_files.foreach(print)

<class 'pyspark.rdd.RDD'>


('file:/home/amandeep/Projects/Learning-PySpark/data/text_files_folder/metal_bands.txt', 'Pantera\nMetallica\nSlayer\nBlack Sabbath\nDef Leppard\nExodus\nVenom\nIron Maiden\n')
('file:/home/amandeep/Projects/Learning-PySpark/data/text_files_folder/matrix_characters.txt', 'Neo\nMorphious\nTrinity\nAgent Smith\n')
('file:/home/amandeep/Projects/Learning-PySpark/data/text_files_folder/pokemon.txt', 'Mankey\nArbok\nWeedle\nMetapod\nGloom\nGeodude\nPikachu\nEvee\nSnorlax\n')


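## 5. What is a PySpark DataFrame?

A PySpark DataFrame is a distributed collection of data organized into named columns. DataFrames are lazily evaluated and are implemented on top of RDDs: when Spark transforms data, it does not compute the transformation immediately but plans how to compute it later. The computation starts only when an action such as `collect()`, `count()` or `show()` is called.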

A PySpark DataFrame is usually created with `SparkSession.createDataFrame`, by passing:

1. a list of lists, tuples, dictionaries or `pyspark.sql.Row` objects
2. a pandas DataFrame
3. an RDD consisting of such a list

In [7]:
from pyspark.sql import Row

df = sc.createDataFrame([
    Row(character='Neo', description='The chosen one.'),
    Row(character='Morpheus', description='The bald guy with cool glasses.'),
    Row(character='Trinity', description='The badass woman character.')
])
df.show()

+---------+--------------------+
|character|         description|
+---------+--------------------+
|      Neo|     The chosen one.|
| Morpheus|The bald guy with...|
|  Trinity|The badass woman ...|
+---------+--------------------+



In [8]:
df = sc.createDataFrame([
    ('Neo', 'The chosen one.'),
    ('Morpheus', 'The bald guy with cool glasses.'),
    ('Trinity', 'The badass woman character.')
], schema='Character String, Description String')
df.show()

+---------+--------------------+
|Character|         Description|
+---------+--------------------+
|      Neo|     The chosen one.|
| Morpheus|The bald guy with...|
|  Trinity|The badass woman ...|
+---------+--------------------+



In [9]:
import pandas as pd
pandas_df = pd.DataFrame({
    'Character': ['Neo', 'Morpheus', 'Trinity'],
    'Description': ['The chosen one.', 'The bald guy with cool glasses.', 'The badass woman character.']
})
df = sc.createDataFrame(pandas_df)  # convert the pandas DataFrame to a PySpark DataFrame
df.show()

+---------+--------------------+
|Character|         Description|
+---------+--------------------+
|      Neo|     The chosen one.|
| Morpheus|The bald guy with...|
|  Trinity|The badass woman ...|
+---------+--------------------+



In [10]:
rdd = sc.sparkContext.parallelize([
    ('Neo', 'The chosen one.'),
    ('Morpheus', 'The bald guy with cool glasses.'),
    ('Trinity', 'The badass woman character.')
])
df = sc.createDataFrame(rdd, schema='Character String, Description String')
df.show()

+---------+--------------------+
|Character|         Description|
+---------+--------------------+
|      Neo|     The chosen one.|
| Morpheus|The bald guy with...|
|  Trinity|The badass woman ...|
+---------+--------------------+



### 5.1 How to check the schema of a PySpark DataFrame? 

In [11]:
df.printSchema()

root
 |-- Character: string (nullable = true)
 |-- Description: string (nullable = true)



### 5.2 How to read a `.csv` file as a PySpark DataFrame?

In [12]:
iris = sc.read.csv('./data/iris.csv', inferSchema=True, header=True)
iris.show()

+---+-------------+------------+-------------+------------+-----------+
| Id|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|    Species|
+---+-------------+------------+-------------+------------+-----------+
|  1|          5.1|         3.5|          1.4|         0.2|Iris-setosa|
|  2|          4.9|         3.0|          1.4|         0.2|Iris-setosa|
|  3|          4.7|         3.2|          1.3|         0.2|Iris-setosa|
|  4|          4.6|         3.1|          1.5|         0.2|Iris-setosa|
|  5|          5.0|         3.6|          1.4|         0.2|Iris-setosa|
|  6|          5.4|         3.9|          1.7|         0.4|Iris-setosa|
|  7|          4.6|         3.4|          1.4|         0.3|Iris-setosa|
|  8|          5.0|         3.4|          1.5|         0.2|Iris-setosa|
|  9|          4.4|         2.9|          1.4|         0.2|Iris-setosa|
| 10|          4.9|         3.1|          1.5|         0.1|Iris-setosa|
| 11|          5.4|         3.7|          1.5|         0.2|Iris-

### 5.3 How to check the shape of a PySpark DataFrame?

In [13]:
print(iris.count(), len(iris.columns))

150 6


### 5.4 How to see the first 5 rows of a PySpark DataFrame?

In [14]:
iris.head(5)

[Row(Id=1, SepalLengthCm=5.1, SepalWidthCm=3.5, PetalLengthCm=1.4, PetalWidthCm=0.2, Species='Iris-setosa'),
 Row(Id=2, SepalLengthCm=4.9, SepalWidthCm=3.0, PetalLengthCm=1.4, PetalWidthCm=0.2, Species='Iris-setosa'),
 Row(Id=3, SepalLengthCm=4.7, SepalWidthCm=3.2, PetalLengthCm=1.3, PetalWidthCm=0.2, Species='Iris-setosa'),
 Row(Id=4, SepalLengthCm=4.6, SepalWidthCm=3.1, PetalLengthCm=1.5, PetalWidthCm=0.2, Species='Iris-setosa'),
 Row(Id=5, SepalLengthCm=5.0, SepalWidthCm=3.6, PetalLengthCm=1.4, PetalWidthCm=0.2, Species='Iris-setosa')]

### 5.5 How to see the last 5 rows of a PySpark DataFrame?

In [15]:
iris.tail(5)

[Row(Id=146, SepalLengthCm=6.7, SepalWidthCm=3.0, PetalLengthCm=5.2, PetalWidthCm=2.3, Species='Iris-virginica'),
 Row(Id=147, SepalLengthCm=6.3, SepalWidthCm=2.5, PetalLengthCm=5.0, PetalWidthCm=1.9, Species='Iris-virginica'),
 Row(Id=148, SepalLengthCm=6.5, SepalWidthCm=3.0, PetalLengthCm=5.2, PetalWidthCm=2.0, Species='Iris-virginica'),
 Row(Id=149, SepalLengthCm=6.2, SepalWidthCm=3.4, PetalLengthCm=5.4, PetalWidthCm=2.3, Species='Iris-virginica'),
 Row(Id=150, SepalLengthCm=5.9, SepalWidthCm=3.0, PetalLengthCm=5.1, PetalWidthCm=1.8, Species='Iris-virginica')]

### 5.6 How to subset columns from a PySpark DataFrame?

In [16]:
iris.select(['SepalLengthCm', 'Species']).show(n=3)

+-------------+-----------+
|SepalLengthCm|    Species|
+-------------+-----------+
|          5.1|Iris-setosa|
|          4.9|Iris-setosa|
|          4.7|Iris-setosa|
+-------------+-----------+
only showing top 3 rows



### 5.7 How to subset rows from a PySpark DataFrame on the basis of a condition?

In [17]:
print('Rows in the PySpark DataFrame before filtering rows:', iris.count())
filtered_df = iris.filter(iris['Species']=='Iris-virginica')
print('Rows in the PySpark DataFrame after filtering rows:', filtered_df.count())

Rows in the PySpark DataFrame before filtering rows: 150
Rows in the PySpark DataFrame after filtering rows: 50


In [18]:
iris.filter((iris['Species']=='Iris-virginica') & (iris['PetalLengthCm']>5)).show()

+---+-------------+------------+-------------+------------+--------------+
| Id|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|       Species|
+---+-------------+------------+-------------+------------+--------------+
|101|          6.3|         3.3|          6.0|         2.5|Iris-virginica|
|102|          5.8|         2.7|          5.1|         1.9|Iris-virginica|
|103|          7.1|         3.0|          5.9|         2.1|Iris-virginica|
|104|          6.3|         2.9|          5.6|         1.8|Iris-virginica|
|105|          6.5|         3.0|          5.8|         2.2|Iris-virginica|
|106|          7.6|         3.0|          6.6|         2.1|Iris-virginica|
|108|          7.3|         2.9|          6.3|         1.8|Iris-virginica|
|109|          6.7|         2.5|          5.8|         1.8|Iris-virginica|
|110|          7.2|         3.6|          6.1|         2.5|Iris-virginica|
|111|          6.5|         3.2|          5.1|         2.0|Iris-virginica|
|112|          6.4|      

In [19]:
iris.filter(((iris['Species']=='Iris-virginica') & (iris['PetalLengthCm']>5)) | (iris['SepalWidthCm']<3)).show()

+---+-------------+------------+-------------+------------+---------------+
| Id|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|        Species|
+---+-------------+------------+-------------+------------+---------------+
|  9|          4.4|         2.9|          1.4|         0.2|    Iris-setosa|
| 42|          4.5|         2.3|          1.3|         0.3|    Iris-setosa|
| 54|          5.5|         2.3|          4.0|         1.3|Iris-versicolor|
| 55|          6.5|         2.8|          4.6|         1.5|Iris-versicolor|
| 56|          5.7|         2.8|          4.5|         1.3|Iris-versicolor|
| 58|          4.9|         2.4|          3.3|         1.0|Iris-versicolor|
| 59|          6.6|         2.9|          4.6|         1.3|Iris-versicolor|
| 60|          5.2|         2.7|          3.9|         1.4|Iris-versicolor|
| 61|          5.0|         2.0|          3.5|         1.0|Iris-versicolor|
| 63|          6.0|         2.2|          4.0|         1.0|Iris-versicolor|
| 64|       

### 5.8 How to generate summary statistics from a PySpark DataFrame?

In [20]:
iris.describe().show()

+-------+------------------+------------------+-------------------+------------------+------------------+--------------+
|summary|                Id|     SepalLengthCm|       SepalWidthCm|     PetalLengthCm|      PetalWidthCm|       Species|
+-------+------------------+------------------+-------------------+------------------+------------------+--------------+
|  count|               150|               150|                150|               150|               150|           150|
|   mean|              75.5| 5.843333333333335| 3.0540000000000007|3.7586666666666693|1.1986666666666672|          null|
| stddev|43.445367992456916|0.8280661279778637|0.43359431136217375| 1.764420419952262|0.7631607417008414|          null|
|    min|                 1|               4.3|                2.0|               1.0|               0.1|   Iris-setosa|
|    max|               150|               7.9|                4.4|               6.9|               2.5|Iris-virginica|
+-------+------------------+------------------+-------------------+------------------+------------------+--------------+

### 5.9 How to add a new column to a PySpark DataFrame?

#### 5.9.1 How to add a new column to a PySpark DataFrame that contains a constant value?

In [21]:
from pyspark.sql.functions import lit
iris_updated = iris.withColumn('Code Writer', lit('Amandeep Singh Khanna'))
iris_updated.show(3)

+---+-------------+------------+-------------+------------+-----------+--------------------+
| Id|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|    Species|         Code Writer|
+---+-------------+------------+-------------+------------+-----------+--------------------+
|  1|          5.1|         3.5|          1.4|         0.2|Iris-setosa|Amandeep Singh Kh...|
|  2|          4.9|         3.0|          1.4|         0.2|Iris-setosa|Amandeep Singh Kh...|
|  3|          4.7|         3.2|          1.3|         0.2|Iris-setosa|Amandeep Singh Kh...|
+---+-------------+------------+-------------+------------+-----------+--------------------+
only showing top 3 rows



In [22]:
iris_updated = iris_updated.withColumn('My Coding Skills', lit(10))
iris_updated.show(3)

+---+-------------+------------+-------------+------------+-----------+--------------------+----------------+
| Id|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|    Species|         Code Writer|My Coding Skills|
+---+-------------+------------+-------------+------------+-----------+--------------------+----------------+
|  1|          5.1|         3.5|          1.4|         0.2|Iris-setosa|Amandeep Singh Kh...|              10|
|  2|          4.9|         3.0|          1.4|         0.2|Iris-setosa|Amandeep Singh Kh...|              10|
|  3|          4.7|         3.2|          1.3|         0.2|Iris-setosa|Amandeep Singh Kh...|              10|
+---+-------------+------------+-------------+------------+-----------+--------------------+----------------+
only showing top 3 rows



#### 5.9.2 How to add a new column to a PySpark DataFrame based on a condition?

In [23]:
from pyspark.sql.functions import when, col
# 5.83 approximates the mean SepalLengthCm (5.8433...) reported by describe() above
iris_updated = iris_updated.withColumn(
    'Conditional Column',
    when(col('SepalLengthCm') > 5.83, 'Above Average Sepal Length')
    .otherwise('Below Average Sepal Length')
)
iris_updated.show(3)

+---+-------------+------------+-------------+------------+-----------+--------------------+----------------+--------------------+
| Id|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|    Species|         Code Writer|My Coding Skills|  Conditional Column|
+---+-------------+------------+-------------+------------+-----------+--------------------+----------------+--------------------+
|  1|          5.1|         3.5|          1.4|         0.2|Iris-setosa|Amandeep Singh Kh...|              10|Below Average Sep...|
|  2|          4.9|         3.0|          1.4|         0.2|Iris-setosa|Amandeep Singh Kh...|              10|Below Average Sep...|
|  3|          4.7|         3.2|          1.3|         0.2|Iris-setosa|Amandeep Singh Kh...|              10|Below Average Sep...|
+---+-------------+------------+-------------+------------+-----------+--------------------+----------------+--------------------+
only showing top 3 rows



In [24]:
iris_updated.filter(iris_updated['Conditional Column'] == 'Above Average Sepal Length').show(3)

+---+-------------+------------+-------------+------------+---------------+--------------------+----------------+--------------------+
| Id|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|        Species|         Code Writer|My Coding Skills|  Conditional Column|
+---+-------------+------------+-------------+------------+---------------+--------------------+----------------+--------------------+
| 51|          7.0|         3.2|          4.7|         1.4|Iris-versicolor|Amandeep Singh Kh...|              10|Above Average Sep...|
| 52|          6.4|         3.2|          4.5|         1.5|Iris-versicolor|Amandeep Singh Kh...|              10|Above Average Sep...|
| 53|          6.9|         3.1|          4.9|         1.5|Iris-versicolor|Amandeep Singh Kh...|              10|Above Average Sep...|
+---+-------------+------------+-------------+------------+---------------+--------------------+----------------+--------------------+
only showing top 3 rows



### 5.10 How to perform groupby operations on a PySpark DataFrame?

In [25]:
iris.groupBy(['Species']).avg().show()

+---------------+-------+------------------+------------------+------------------+------------------+
|        Species|avg(Id)|avg(SepalLengthCm)| avg(SepalWidthCm)|avg(PetalLengthCm)| avg(PetalWidthCm)|
+---------------+-------+------------------+------------------+------------------+------------------+
| Iris-virginica|  125.5| 6.587999999999998|2.9739999999999998|             5.552|             2.026|
|    Iris-setosa|   25.5| 5.005999999999999|3.4180000000000006|             1.464|0.2439999999999999|
|Iris-versicolor|   75.5|             5.936|2.7700000000000005|              4.26|1.3259999999999998|
+---------------+-------+------------------+------------------+------------------+------------------+



In [24]:
iris.select(['Species', 'SepalLengthCm']).groupBy(['Species']).avg().show()

+---------------+------------------+
|        Species|avg(SepalLengthCm)|
+---------------+------------------+
| Iris-virginica| 6.587999999999998|
|    Iris-setosa| 5.005999999999999|
|Iris-versicolor|             5.936|
+---------------+------------------+

