<a href="https://colab.research.google.com/github/hinzle/cognizant/blob/main/2_Advanced_Spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Dataframe basics for PySpark

**Colab only code**

In [1]:
!pip install pyspark --quiet
!pip install -U -q PyDrive --quiet 
!apt install openjdk-8-jdk-headless &> /dev/null


**Setup Spark Session**

In [2]:
from pyspark.sql import SparkSession
import pandas as pd

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

sc = spark.sparkContext

Let's start with a subset of the Titanic data on Kaggle and load it into a pandas dataframe, then convert it into a Spark dataframe.

In [3]:
data1 = {'PassengerId': {0: 1, 1: 2, 2: 3, 3: 4, 4: 5},
         'Name': {0: 'Owen', 1: 'Florence', 2: 'Laina', 3: 'Lily', 4: 'William'},
         'Sex': {0: 'male', 1: 'female', 2: 'female', 3: 'female', 4: 'male'},
         'Survived': {0: 0, 1: 1, 2: 1, 3: 1, 4: 0}}

data2 = {'PassengerId': {0: 1, 1: 2, 2: 3, 3: 4, 4: 5},
         'Age': {0: 22, 1: 38, 2: 26, 3: 35, 4: 35},
         'Fare': {0: 7.3, 1: 71.3, 2: 7.9, 3: 53.1, 4: 8.0},
         'Pclass': {0: 3, 1: 1, 2: 3, 3: 1, 4: 3}}

df1_pd = pd.DataFrame(data1, columns=data1.keys())
df2_pd = pd.DataFrame(data2, columns=data2.keys())

Let's look at the contents of the first pandas dataframe, `df1_pd`:

In [4]:
df1_pd

| | PassengerId | Name | Sex | Survived |
|---|---|---|---|---|
| 0 | 1 | Owen | male | 0 |
| 1 | 2 | Florence | female | 1 |
| 2 | 3 | Laina | female | 1 |
| 3 | 4 | Lily | female | 1 |
| 4 | 5 | William | male | 0 |


Let's look at the other pandas dataframe, `df2_pd`:

In [5]:
df2_pd

| | PassengerId | Age | Fare | Pclass |
|---|---|---|---|---|
| 0 | 1 | 22 | 7.3 | 3 |
| 1 | 2 | 38 | 71.3 | 1 |
| 2 | 3 | 26 | 7.9 | 3 |
| 3 | 4 | 35 | 53.1 | 1 |
| 4 | 5 | 35 | 8.0 | 3 |


**We now convert the pandas dataframes to Spark dataframes and display the contents of the first one**

In [6]:
df1 = spark.createDataFrame(df1_pd)
df2 = spark.createDataFrame(df2_pd)
df1.show()

+-----------+--------+------+--------+
|PassengerId|    Name|   Sex|Survived|
+-----------+--------+------+--------+
|          1|    Owen|  male|       0|
|          2|Florence|female|       1|
|          3|   Laina|female|       1|
|          4|    Lily|female|       1|
|          5| William|  male|       0|
+-----------+--------+------+--------+



Let's see the schema

In [7]:
df1.printSchema()

root
 |-- PassengerId: long (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Survived: long (nullable = true)



# Basic dataframe transformations

![alt text](https://changhsinlee.com/figure/source/2018-03-04-pyspark-dataframe-basics/dataframe-verbs.png)

**Select**

Takes either a list of column names or an unpacked list.

In [8]:
cols1 = ['PassengerId', 'Name']
df1.select(cols1).show()

+-----------+--------+
|PassengerId|    Name|
+-----------+--------+
|          1|    Owen|
|          2|Florence|
|          3|   Laina|
|          4|    Lily|
|          5| William|
+-----------+--------+



**Filter**

Filter with column expression

In [9]:
df1.filter(df1.Sex == 'female').show()

+-----------+--------+------+--------+
|PassengerId|    Name|   Sex|Survived|
+-----------+--------+------+--------+
|          2|Florence|female|       1|
|          3|   Laina|female|       1|
|          4|    Lily|female|       1|
+-----------+--------+------+--------+



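Filter with a SQL expression. Note the quoting: the whole expression is a Python string in double quotes, and the SQL string literal inside it uses single quotes.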

In [10]:
df1.filter("Sex='male'").show()

+-----------+-------+----+--------+
|PassengerId|   Name| Sex|Survived|
+-----------+-------+----+--------+
|          1|   Owen|male|       0|
|          5|William|male|       0|
+-----------+-------+----+--------+



**Mutate, or creating new columns**

Creating new columns in Spark uses `.withColumn()`

In [11]:
df2.withColumn('AgeTimesFare', df2.Age*df2.Fare).show()

+-----------+---+----+------+------------+
|PassengerId|Age|Fare|Pclass|AgeTimesFare|
+-----------+---+----+------+------------+
|          1| 22| 7.3|     3|       160.6|
|          2| 38|71.3|     1|      2709.4|
|          3| 26| 7.9|     3|       205.4|
|          4| 35|53.1|     1|      1858.5|
|          5| 35| 8.0|     3|       280.0|
+-----------+---+----+------+------------+



**Summarize and group by**

To summarize or aggregate a dataframe, first you need to convert the dataframe to a *GroupedData* object with `groupby()`, then call aggregate transformations.

In [12]:
gdf2 = df2.groupby('Pclass')
gdf2

<pyspark.sql.group.GroupedData at 0x7f87d621bc50>

Note: We take the average of more than one column by passing an unpacked list of column names

In [13]:
avg_cols = ['Age', 'Fare']
gdf2.avg(*avg_cols).show() #Note the *, this is unpacking the list!

+------+------------------+-----------------+
|Pclass|          avg(Age)|        avg(Fare)|
+------+------------------+-----------------+
|     1|              36.5|             62.2|
|     3|27.666666666666668|7.733333333333333|
+------+------------------+-----------------+



Note: To call multiple aggregation functions at once, pass a dictionary

In [14]:
gdf2.agg({'*': 'count', 'Age': 'avg', 'Fare':'sum'}).show()

+------+--------+------------------+---------+
|Pclass|count(1)|          avg(Age)|sum(Fare)|
+------+--------+------------------+---------+
|     1|       2|              36.5|    124.4|
|     3|       3|27.666666666666668|     23.2|
+------+--------+------------------+---------+



These column names look ugly (at least to me), so let's make them read nicely using `toDF()`, which renames the columns by position:

In [15]:
(
  gdf2.agg({'*': 'count', 'Age': 'avg', 'Fare':'sum'})
    .toDF('Pclass', 'counts', 'average_age', 'total_fare')
    .show()
)

+------+------+------------------+----------+
|Pclass|counts|       average_age|total_fare|
+------+------+------------------+----------+
|     1|     2|              36.5|     124.4|
|     3|     3|27.666666666666668|      23.2|
+------+------+------------------+----------+



**Arrange (sort)**

Use the `.sort()` method to sort the dataframes.

In [16]:
df2.sort('Fare', ascending=False).show()

+-----------+---+----+------+
|PassengerId|Age|Fare|Pclass|
+-----------+---+----+------+
|          2| 38|71.3|     1|
|          4| 35|53.1|     1|
|          5| 35| 8.0|     3|
|          3| 26| 7.9|     3|
|          1| 22| 7.3|     3|
+-----------+---+----+------+



**Joins and unions**

There are two ways to combine dataframes: joins and unions. The idea here is the same as joining and unioning tables in SQL.

**Joins**

Let's join the two Titanic dataframes on the column *PassengerId*

In [17]:
df1.join(df2, ['PassengerId']).show()

+-----------+--------+------+--------+---+----+------+
|PassengerId|    Name|   Sex|Survived|Age|Fare|Pclass|
+-----------+--------+------+--------+---+----+------+
|          1|    Owen|  male|       0| 22| 7.3|     3|
|          2|Florence|female|       1| 38|71.3|     1|
|          3|   Laina|female|       1| 26| 7.9|     3|
|          4|    Lily|female|       1| 35|53.1|     1|
|          5| William|  male|       0| 35| 8.0|     3|
+-----------+--------+------+--------+---+----+------+



**Nonequi joins**

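Here is an example of a non-equi join, where the join condition is an inequality rather than an equality. Such joins can be very slow: Spark cannot use a hash or sort-merge join on an inequality, so it typically falls back to comparing every pair of rows, and skewed data makes this worse. Older versions of Hive (before 2.2.0) did not support non-equality join conditions at all.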

In [18]:
df1.join(df2, df1.PassengerId <= df2.PassengerId).show()

+-----------+--------+------+--------+-----------+---+----+------+
|PassengerId|    Name|   Sex|Survived|PassengerId|Age|Fare|Pclass|
+-----------+--------+------+--------+-----------+---+----+------+
|          1|    Owen|  male|       0|          1| 22| 7.3|     3|
|          1|    Owen|  male|       0|          2| 38|71.3|     1|
|          2|Florence|female|       1|          2| 38|71.3|     1|
|          1|    Owen|  male|       0|          3| 26| 7.9|     3|
|          1|    Owen|  male|       0|          4| 35|53.1|     1|
|          1|    Owen|  male|       0|          5| 35| 8.0|     3|
|          2|Florence|female|       1|          3| 26| 7.9|     3|
|          2|Florence|female|       1|          4| 35|53.1|     1|
|          2|Florence|female|       1|          5| 35| 8.0|     3|
|          3|   Laina|female|       1|          3| 26| 7.9|     3|
|          3|   Laina|female|       1|          4| 35|53.1|     1|
|          3|   Laina|female|       1|          5| 35| 8.0|     3|

**Unions**

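`union()` returns a new dataframe containing the rows of both dataframes. Like `UNION ALL` in SQL, it keeps duplicate rows (follow it with `.distinct()` to remove them) and matches columns by position, not by name.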

In [19]:
df1.union(df1).show()

+-----------+--------+------+--------+
|PassengerId|    Name|   Sex|Survived|
+-----------+--------+------+--------+
|          1|    Owen|  male|       0|
|          2|Florence|female|       1|
|          3|   Laina|female|       1|
|          4|    Lily|female|       1|
|          5| William|  male|       0|
|          1|    Owen|  male|       0|
|          2|Florence|female|       1|
|          3|   Laina|female|       1|
|          4|    Lily|female|       1|
|          5| William|  male|       0|
+-----------+--------+------+--------+



# Going deeper

**Explain(), transformations, and actions**

Unlike ordinary Python objects, PySpark dataframes are lazily evaluated. Most operations are *transformations*: they only modify the execution plan that describes how Spark should handle the data, and the plan is not executed until we call an *action*.

For example, consider joining df1 and df2 on the key *PassengerId* as before. First, use `explain()` to look at the plan of each dataframe on its own, then at the plan of the join:

In [20]:
df1.explain()

== Physical Plan ==
*(1) Scan ExistingRDD[PassengerId#0L,Name#1,Sex#2,Survived#3L]




In [21]:
df2.explain()

== Physical Plan ==
*(1) Scan ExistingRDD[PassengerId#8L,Age#9L,Fare#10,Pclass#11L]




In [22]:
dfj1 = df1.join(df2, ['PassengerId'])
dfj1.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [PassengerId#0L, Name#1, Sex#2, Survived#3L, Age#9L, Fare#10, Pclass#11L]
   +- SortMergeJoin [PassengerId#0L], [PassengerId#8L], Inner
      :- Sort [PassengerId#0L ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(PassengerId#0L, 200), ENSURE_REQUIREMENTS, [id=#417]
      :     +- Filter isnotnull(PassengerId#0L)
      :        +- Scan ExistingRDD[PassengerId#0L,Name#1,Sex#2,Survived#3L]
      +- Sort [PassengerId#8L ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(PassengerId#8L, 200), ENSURE_REQUIREMENTS, [id=#418]
            +- Filter isnotnull(PassengerId#8L)
               +- Scan ExistingRDD[PassengerId#8L,Age#9L,Fare#10,Pclass#11L]




In this case, `join()` is a transformation that lays out a plan for Spark to join the two dataframes, but the plan is not executed until you call an action such as `.count()`, which has to go through the actual data defined by df1 and df2 in order to return a Python object (an integer).

In [23]:
dfj1.count()

5

# Data persistence: cache() and checkpoint()

**caching**

Caching keeps a computed dataframe in memory (spilling to disk when it does not fit) so that later actions reuse it instead of recomputing it from the start of the plan.

A rule of thumb: **cache a dataframe when it is used multiple times in the script.**

Keep in mind that a dataframe is only cached after the *first action* that computes it. To make sure the data is in the cache before a later step, such as saving the dataframe, call an action like `.count()` first.

In [24]:
df1.cache()

DataFrame[PassengerId: bigint, Name: string, Sex: string, Survived: bigint]

Assigning the result also works, because `.cache()` marks the dataframe for caching in place and returns that same dataframe.

In [25]:
df1 = df1.cache()

To check if a dataframe is cached, check the `storageLevel` property.

In [26]:
df1.storageLevel

StorageLevel(True, True, False, True, 1)

To **un-cache** a dataframe, use `unpersist()`

In [27]:
df1.unpersist()
df1.storageLevel

StorageLevel(False, False, False, False, 1)

**Checkpointing**

`checkpoint()` truncates the execution plan and saves the checkpointed dataframe to a temporary location on the disk.

It is recommended to do caching before checkpointing so Spark doesn't have to read in the dataframe from disk after it's checkpointed.

To use `checkpoint()`, you need to specify the location to save the dataframe to by accessing the sparkContext object from SparkSession.

In [28]:
sc = spark.sparkContext
sc.setCheckpointDir("/checkpointdir")  # absolute path: checkpoint files are written under /checkpointdir

For example, let's join df1 to itself twice, so that three copies of df1 are combined:

In [29]:
df = df1.join(df1, ['PassengerId'])
df.join(df1, ['PassengerId']).explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [PassengerId#0L, Name#1, Sex#2, Survived#3L, Name#396, Sex#397, Survived#398L, Name#407, Sex#408, Survived#409L]
   +- SortMergeJoin [PassengerId#0L], [PassengerId#406L], Inner
      :- Project [PassengerId#0L, Name#1, Sex#2, Survived#3L, Name#396, Sex#397, Survived#398L]
      :  +- SortMergeJoin [PassengerId#0L], [PassengerId#395L], Inner
      :     :- Sort [PassengerId#0L ASC NULLS FIRST], false, 0
      :     :  +- Exchange hashpartitioning(PassengerId#0L, 200), ENSURE_REQUIREMENTS, [id=#663]
      :     :     +- Filter isnotnull(PassengerId#0L)
      :     :        +- Scan ExistingRDD[PassengerId#0L,Name#1,Sex#2,Survived#3L]
      :     +- Sort [PassengerId#395L ASC NULLS FIRST], false, 0
      :        +- Exchange hashpartitioning(PassengerId#395L, 200), ENSURE_REQUIREMENTS, [id=#664]
      :           +- Filter isnotnull(PassengerId#395L)
      :              +- Scan ExistingRDD[PassengerId#395L,Name#396,Sex#397

Let's `checkpoint()` after the first `join` to truncate the plan.

In [30]:
df = df1.join(df1, ['PassengerId']).checkpoint()
df.join(df1, ['PassengerId']).explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [PassengerId#0L, Name#1, Sex#2, Survived#3L, Name#421, Sex#422, Survived#423L, Name#439, Sex#440, Survived#441L]
   +- SortMergeJoin [PassengerId#0L], [PassengerId#438L], Inner
      :- Sort [PassengerId#0L ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(PassengerId#0L, 200), ENSURE_REQUIREMENTS, [id=#790]
      :     +- Filter isnotnull(PassengerId#0L)
      :        +- Scan ExistingRDD[PassengerId#0L,Name#1,Sex#2,Survived#3L,Name#421,Sex#422,Survived#423L]
      +- Sort [PassengerId#438L ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(PassengerId#438L, 200), ENSURE_REQUIREMENTS, [id=#791]
            +- Filter isnotnull(PassengerId#438L)
               +- Scan ExistingRDD[PassengerId#438L,Name#439,Sex#440,Survived#441L]




**Partitions and repartition()**

A common cause of performance problems is having too many partitions. 

To check the number of partitions, use `.rdd.getNumPartitions()`

In [31]:
df1.rdd.getNumPartitions()

2

This dataframe, despite having only 5 rows, has 2 partitions.

That is more than such a small dataframe needs, so let's repartition it to a single partition.

In [32]:
df1_repartitioned = df1.repartition(1)
df1_repartitioned.rdd.getNumPartitions()

1

Sources:
https://changhsinlee.com/pyspark-dataframe-basics/

More about unpacking: 
https://thispointer.com/python-how-to-unpack-list-tuple-or-dictionary-to-function-arguments-using/