In [7]:
import findspark
findspark.init()

In [8]:
import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
# Reuse the active SparkSession if one already exists; otherwise create a new one.
spark = SparkSession.builder.getOrCreate()

In [13]:
import pandas as pd
# SparkContext that backs the session.
sc = spark.sparkContext

# Make a sample dataframe from Titanic data

In [14]:
# First sample table: who each passenger is and whether they survived.
# Every column maps row label -> value, so both tables share the row labels 0..4.
data1 = {
    'PassengerId': {0: 1, 1: 2, 2: 3, 3: 4, 4: 5},
    'Name': {0: 'Owen', 1: 'Florence', 2: 'Laina', 3: 'Lily', 4: 'William'},
    'Sex': {0: 'male', 1: 'female', 2: 'female', 3: 'female', 4: 'male'},
    'Survived': {0: 0, 1: 1, 2: 1, 3: 1, 4: 0},
}

# Second sample table: age, fare and ticket class for the same PassengerIds.
data2 = {
    'PassengerId': {0: 1, 1: 2, 2: 3, 3: 4, 4: 5},
    'Age': {0: 22, 1: 38, 2: 26, 3: 35, 4: 35},
    'Fare': {0: 7.3, 1: 71.3, 2: 7.9, 3: 53.1, 4: 8.0},
    'Pclass': {0: 3, 1: 1, 2: 3, 3: 1, 4: 3},
}

# list(d) yields the dict's keys in insertion order, which fixes the column order.
df1_pd = pd.DataFrame(data1, columns=list(data1))
df2_pd = pd.DataFrame(data2, columns=list(data2))

In [15]:
df1_pd

Unnamed: 0,PassengerId,Name,Sex,Survived
0,1,Owen,male,0
1,2,Florence,female,1
2,3,Laina,female,1
3,4,Lily,female,1
4,5,William,male,0


# Convert pandas dataframe to Spark dataframe


In [16]:
# Promote both pandas frames to Spark DataFrames in one pass, then preview the first.
df1, df2 = map(spark.createDataFrame, (df1_pd, df2_pd))
df1.show()

+-----------+--------+------+--------+
|PassengerId|    Name|   Sex|Survived|
+-----------+--------+------+--------+
|          1|    Owen|  male|       0|
|          2|Florence|female|       1|
|          3|   Laina|female|       1|
|          4|    Lily|female|       1|
|          5| William|  male|       0|
+-----------+--------+------+--------+



In [17]:
df1.printSchema()

root
 |-- PassengerId: long (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Survived: long (nullable = true)



Basic dataframe verbs
In R’s dplyr package, Hadley Wickham defined the 5 basic verbs — select, filter, mutate, summarize, and arrange.
Here are the equivalents of the 5 basic verbs for Spark dataframes.

Select
I can select a subset of columns. The method select() takes either a list of column names or an unpacked list of names.

In [18]:
cols1 = ['PassengerId', 'Name']
df1.select(cols1).show()

+-----------+--------+
|PassengerId|    Name|
+-----------+--------+
|          1|    Owen|
|          2|Florence|
|          3|   Laina|
|          4|    Lily|
|          5| William|
+-----------+--------+



Filter
I can filter a subset of rows. The method filter() takes column expressions or SQL expressions. Think of the WHERE clause in SQL queries.

Filter with a column expression

In [19]:
# Column expression built with bracket access; keeps only rows whose Sex equals 'female'.
df1.filter(df1['Sex'] == 'female').show()

+-----------+--------+------+--------+
|PassengerId|    Name|   Sex|Survived|
+-----------+--------+------+--------+
|          2|Florence|female|       1|
|          3|   Laina|female|       1|
|          4|    Lily|female|       1|
+-----------+--------+------+--------+



Filter with a SQL expression. Note the double and single quotes as I’m passing a SQL where clause into filter().

In [20]:
df1.filter("Sex='female'").show()

+-----------+--------+------+--------+
|PassengerId|    Name|   Sex|Survived|
+-----------+--------+------+--------+
|          2|Florence|female|       1|
|          3|   Laina|female|       1|
|          4|    Lily|female|       1|
+-----------+--------+------+--------+



Mutate, or creating new columns
I can create new columns in Spark using .withColumn(). I have yet to find a convenient way to create multiple columns at once without chaining multiple .withColumn() calls.

In [21]:
# New column computed row by row as the product of two existing columns.
df2.withColumn('AgeTimesFare', df2['Age'] * df2['Fare']).show()

+-----------+---+----+------+------------+
|PassengerId|Age|Fare|Pclass|AgeTimesFare|
+-----------+---+----+------+------------+
|          1| 22| 7.3|     3|       160.6|
|          2| 38|71.3|     1|      2709.4|
|          3| 26| 7.9|     3|       205.4|
|          4| 35|53.1|     1|      1858.5|
|          5| 35| 8.0|     3|       280.0|
+-----------+---+----+------+------------+



Summarize and group by
To summarize or aggregate a dataframe, first I need to convert the dataframe to a GroupedData object with groupby(), then call the aggregate functions.

In [22]:
# groupby() returns a GroupedData handle, not a dataframe; aggregate
# methods are then called on that handle to produce results.
gdf2 = df2.groupby('Pclass')
gdf2

<pyspark.sql.group.GroupedData at 0x29f76dcb8d0>

I can take the average of columns by passing an unpacked list of column names.

In [24]:
# avg() takes the column names as separate positional arguments, hence the * unpacking.
avg_cols = ['Age', 'Fare']
gdf2.avg(*avg_cols).show()

+------+------------------+-----------------+
|Pclass|          avg(Age)|        avg(Fare)|
+------+------------------+-----------------+
|     1|              36.5|             62.2|
|     3|27.666666666666668|7.733333333333333|
+------+------------------+-----------------+



To call multiple aggregation functions at once, pass a dictionary.

In [26]:
# Dict form: {column name: aggregate function name}; '*' with 'count' counts the rows in each group.
gdf2.agg({'*': 'count', 'Age': 'avg', 'Fare':'sum'}).show()

+------+--------+------------------+---------+
|Pclass|count(1)|          avg(Age)|sum(Fare)|
+------+--------+------------------+---------+
|     1|       2|              36.5|    124.4|
|     3|       3|27.666666666666668|     23.2|
+------+--------+------------------+---------+



To rename the columns count(1), avg(Age) etc, use toDF().

In [27]:
# toDF() renames columns purely by position, so the aggregate columns must come
# out in a known order. Passing explicit Column expressions fixes that order to
# the one written here; with a dict, the result column order is not guaranteed
# to follow the dict's order, and toDF() could attach the names to the wrong columns.
(
    gdf2
    .agg(F.count('*'), F.avg('Age'), F.sum('Fare'))
    .toDF('Pclass', 'counts', 'average_age', 'total_fare')
    .show()
)

+------+------+------------------+----------+
|Pclass|counts|       average_age|total_fare|
+------+------+------------------+----------+
|     1|     2|              36.5|     124.4|
|     3|     3|27.666666666666668|      23.2|
+------+------+------------------+----------+



Arrange (sort)
Use the .sort() method to sort a dataframe. I haven’t seen a good use case for this in Spark, though.

In [28]:
df2.sort('Fare', ascending=False).show()

+-----------+---+----+------+
|PassengerId|Age|Fare|Pclass|
+-----------+---+----+------+
|          2| 38|71.3|     1|
|          4| 35|53.1|     1|
|          5| 35| 8.0|     3|
|          3| 26| 7.9|     3|
|          1| 22| 7.3|     3|
+-----------+---+----+------+



Joins and unions
There are two ways to combine dataframes — joins and unions. The idea here is the same as joining and unioning tables in SQL.

Joins
For example, I can join the two titanic dataframes by the column PassengerId

In [29]:
df1.join(df2, ['PassengerId']).show()

+-----------+--------+------+--------+---+----+------+
|PassengerId|    Name|   Sex|Survived|Age|Fare|Pclass|
+-----------+--------+------+--------+---+----+------+
|          5| William|  male|       0| 35| 8.0|     3|
|          1|    Owen|  male|       0| 22| 7.3|     3|
|          3|   Laina|female|       1| 26| 7.9|     3|
|          2|Florence|female|       1| 38|71.3|     1|
|          4|    Lily|female|       1| 35|53.1|     1|
+-----------+--------+------+--------+---+----+------+



I can also join by conditions, but it creates duplicate column names if the keys have the same name, which is frustrating. For now, the only way I know to avoid this is to pass a list of join keys as in the previous cell. If I want to make nonequi joins, then I need to rename the keys before I join.

Nonequi joins 

Here is an example of a nonequi join. Such joins can be very slow due to skewed data, but this is one thing that Spark can do that Hive cannot.

In [31]:
df1.join(df2, df1.PassengerId <= df2.PassengerId).show() # Note the duplicate col names


+-----------+--------+------+--------+-----------+---+----+------+
|PassengerId|    Name|   Sex|Survived|PassengerId|Age|Fare|Pclass|
+-----------+--------+------+--------+-----------+---+----+------+
|          1|    Owen|  male|       0|          1| 22| 7.3|     3|
|          1|    Owen|  male|       0|          2| 38|71.3|     1|
|          1|    Owen|  male|       0|          3| 26| 7.9|     3|
|          1|    Owen|  male|       0|          4| 35|53.1|     1|
|          1|    Owen|  male|       0|          5| 35| 8.0|     3|
|          2|Florence|female|       1|          2| 38|71.3|     1|
|          2|Florence|female|       1|          3| 26| 7.9|     3|
|          2|Florence|female|       1|          4| 35|53.1|     1|
|          2|Florence|female|       1|          5| 35| 8.0|     3|
|          3|   Laina|female|       1|          3| 26| 7.9|     3|
|          3|   Laina|female|       1|          4| 35|53.1|     1|
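|          3|   Laina|female|       1|          5| 35| 8.0|     3|
|          4|    Lily|female|       1|          4| 35|53.1|     1|
|          4|    Lily|female|       1|          5| 35| 8.0|     3|
|          5| William|  male|       0|          5| 35| 8.0|     3|
+-----------+--------+------+--------+-----------+---+----+------+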

Unions
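union() returns a dataframe containing the rows of both dataframes. Like UNION ALL in SQL, it keeps duplicate rows (as the output below shows), and it matches columns by position rather than by name.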

In [32]:
df1.union(df1).show()

+-----------+--------+------+--------+
|PassengerId|    Name|   Sex|Survived|
+-----------+--------+------+--------+
|          1|    Owen|  male|       0|
|          2|Florence|female|       1|
|          3|   Laina|female|       1|
|          4|    Lily|female|       1|
|          5| William|  male|       0|
|          1|    Owen|  male|       0|
|          2|Florence|female|       1|
|          3|   Laina|female|       1|
|          4|    Lily|female|       1|
|          5| William|  male|       0|
+-----------+--------+------+--------+



Some of my iterative algorithms create chained union() objects. There is a potential catch: the execution plan may grow too long, which can cause performance problems or errors.

One common symptom of performance issues caused by chained unions in a for loop is that each iteration takes longer and longer to run. In this case, repartition() and checkpoint() may help solve the problem.

Temp tables
As an example, I can register the two dataframes as temp tables then join them through spark.sql().

In [33]:
df1.createOrReplaceTempView('df1_temp')
df2.createOrReplaceTempView('df2_temp')

In [35]:
# Inner join of the two temp views on PassengerId, written as plain SQL.
# Aliases: a = df1_temp, b = df2_temp.
query = '''
    select
        a.PassengerId,
        a.Name,
        a.Sex,
        a.Survived,
        b.Age,
        b.Fare,
        b.Pclass
    from df1_temp a
    join df2_temp b
        on a.PassengerId = b.PassengerId'''
# spark.sql() returns a dataframe for the query; show() prints its rows.
dfj = spark.sql(query)
dfj.show()

+-----------+--------+------+--------+---+----+------+
|PassengerId|    Name|   Sex|Survived|Age|Fare|Pclass|
+-----------+--------+------+--------+---+----+------+
|          5| William|  male|       0| 35| 8.0|     3|
|          1|    Owen|  male|       0| 22| 7.3|     3|
|          3|   Laina|female|       1| 26| 7.9|     3|
|          2|Florence|female|       1| 38|71.3|     1|
|          4|    Lily|female|       1| 35|53.1|     1|
+-----------+--------+------+--------+---+----+------+



In this section, I will go through some ideas, and the tools associated with them, that I found helpful in tuning performance or debugging dataframes. The first is the difference between two types of operations, transformations and actions, together with the method explain(), which prints out the execution plan of a dataframe.

# Explain(), transformations, and actions

PySpark dataframes are lazily evaluated. This means that most operations are transformations, which only modify the execution plan for how Spark should handle the data; the plan is not executed until we call an action.

For example, suppose I want to join df1 and df2 on the key PassengerId as before.

In this case, join() is a transformation that lays out a plan for Spark to join the two dataframes, but the plan isn’t executed until I call an action, such as .count(), that has to go through the actual data defined by df1 and df2 in order to return a Python object (an integer).

In [36]:
# Same query text as in the temp-table section: inner join of the two temp views on PassengerId.
query = '''
    select
        a.PassengerId,
        a.Name,
        a.Sex,
        a.Survived,
        b.Age,
        b.Fare,
        b.Pclass
    from df1_temp a
    join df2_temp b
        on a.PassengerId = b.PassengerId'''
# Building the dataframe is a transformation; explain() prints its physical
# execution plan instead of returning rows.
dfj = spark.sql(query)
dfj.explain()

== Physical Plan ==
*(5) Project [PassengerId#24L, Name#25, Sex#26, Survived#27L, Age#33L, Fare#34, Pclass#35L]
+- *(5) SortMergeJoin [PassengerId#24L], [PassengerId#32L], Inner
   :- *(2) Sort [PassengerId#24L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(PassengerId#24L, 200), true, [id=#427]
   :     +- *(1) Filter isnotnull(PassengerId#24L)
   :        +- *(1) Scan ExistingRDD[PassengerId#24L,Name#25,Sex#26,Survived#27L]
   +- *(4) Sort [PassengerId#32L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(PassengerId#32L, 200), true, [id=#433]
         +- *(3) Filter isnotnull(PassengerId#32L)
            +- *(3) Scan ExistingRDD[PassengerId#32L,Age#33L,Fare#34,Pclass#35L]




In [44]:
dfj.show()

+-----------+--------+------+--------+---+----+------+
|PassengerId|    Name|   Sex|Survived|Age|Fare|Pclass|
+-----------+--------+------+--------+---+----+------+
|          5| William|  male|       0| 35| 8.0|     3|
|          1|    Owen|  male|       0| 22| 7.3|     3|
|          3|   Laina|female|       1| 26| 7.9|     3|
|          2|Florence|female|       1| 38|71.3|     1|
|          4|    Lily|female|       1| 35|53.1|     1|
+-----------+--------+------+--------+---+----+------+



# Data persistence: cache() and checkpoint()

In [37]:
df1.cache()

DataFrame[PassengerId: bigint, Name: string, Sex: string, Survived: bigint]

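This also works, because .cache() marks the dataframe as cached in place and returns the same dataframe. Note that cache() is lazy: the data is only materialized the first time an action runs on the dataframe.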

In [39]:
df1 = df1.cache()

To check if a dataframe is cached, check the storageLevel property.



In [41]:
df1.storageLevel

StorageLevel(True, True, False, True, 1)

To un-cache a dataframe, use unpersist()

In [42]:
# Release the cached copy, then re-inspect the storage level to confirm it was cleared.
df1.unpersist()
df1.storageLevel

StorageLevel(False, False, False, False, 1)

There are 4 caching levels that can be fine-tuned with persist(), but I have not seen a use case where fine tuning has been worthwhile. Refer to the persist()