## [Spark Tutorial](https://www.youtube.com/watch?v=5RosqOeJrrs)

### Spark Session

In [1]:
# Spark Session
from pyspark.sql import SparkSession

# `local[*]` means use all available cores on the local machine.

spark = (
    SparkSession.builder
    .appName("spark-intro")
    .master("local[*]")
    .getOrCreate()
)

In [2]:
spark

In [3]:
# Print the number of driver cores.

spark.sparkContext.defaultParallelism

24

In [4]:
# Read csv into dataframe

emp = spark.read.csv('data/emp.csv', header=True, inferSchema=True)
emp.show()

print(f"Number of rows in the Employees DataFrame: {emp.count()}")
print(f"Number of partitions in the Employees DataFrame: {emp.rdd.getNumPartitions()}")

+-----------+-------------+-------------+---+------+------+-------------------+
|employee_id|department_id|         name|age|gender|salary|          hire_date|
+-----------+-------------+-------------+---+------+------+-------------------+
|          1|          101|     John Doe| 30|  Male| 50000|2015-01-01 00:00:00|
|          2|          101|   Jane Smith| 25|Female| 45000|2016-02-15 00:00:00|
|          3|          102|    Bob Brown| 35|  Male| 55000|2014-05-01 00:00:00|
|          4|          102|    Alice Lee| 28|Female| 48000|2017-09-30 00:00:00|
|          5|          103|    Jack Chan| 40|  Male| 60000|2013-04-01 00:00:00|
|          6|          103|    Jill Wong| 32|Female| 52000|2018-07-01 00:00:00|
|          7|          101|James Johnson| 42|  Male| 70000|2012-03-15 00:00:00|
|          8|          102|     Kate Kim| 29|Female| 51000|2019-10-01 00:00:00|
|          9|          103|      Tom Tan| 33|  Male| 58000|2016-06-01 00:00:00|
|         10|          104|     Lisa Lee

In [5]:
# Print the schema of the dataframe

emp.printSchema()

root
 |-- employee_id: integer (nullable = true)
 |-- department_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- hire_date: timestamp (nullable = true)



In [6]:
# Increase the number of partitions to 10
# (use `coalesce()` to decrease the number of partitions)

emp_re = emp.repartition(10)
emp_re.rdd.getNumPartitions()

10

In [7]:
emp_re.show()

print(f"Number of rows in the Employees (Repartitioned) DataFrame: {emp_re.count()}")
print(f"Number of partitions in the Employees (Repartitioned) DataFrame: {emp_re.rdd.getNumPartitions()}")

+-----------+-------------+-------------+---+------+------+-------------------+
|employee_id|department_id|         name|age|gender|salary|          hire_date|
+-----------+-------------+-------------+---+------+------+-------------------+
|          2|          101|   Jane Smith| 25|Female| 45000|2016-02-15 00:00:00|
|         12|          105|   Susan Chen| 31|Female| 54000|2017-02-15 00:00:00|
|         20|          102|    Grace Kim| 32|Female| 53000|2018-11-01 00:00:00|
|          3|          102|    Bob Brown| 35|  Male| 55000|2014-05-01 00:00:00|
|         16|          107|  Kelly Zhang| 30|Female| 49000|2018-04-01 00:00:00|
|         15|          106|  Michael Lee| 37|  Male| 63000|2014-09-30 00:00:00|
|         19|          103|  Steven Chen| 36|  Male| 62000|2015-08-01 00:00:00|
|          6|          103|    Jill Wong| 32|Female| 52000|2018-07-01 00:00:00|
|         17|          105|  George Wang| 34|  Male| 57000|2016-03-15 00:00:00|
|          5|          103|    Jack Chan

In [8]:
# Load another (much larger) csv into a new dataframe and check its properties.

cities = spark.read.csv('data/cities.csv', header=True, inferSchema=True)

# Print the rows and partitions of the dataframe
print(f"Number of rows in the Cities-DataFrame: {cities.count()}")
print(f"Number of partitions in the Cities-DataFrame: {cities.rdd.getNumPartitions()}")

Number of rows in the Cities-DataFrame: 2349391
Number of partitions in the Cities-DataFrame: 19


### Basic Transformations 1

In [9]:
emp.schema

StructType([StructField('employee_id', IntegerType(), True), StructField('department_id', IntegerType(), True), StructField('name', StringType(), True), StructField('age', IntegerType(), True), StructField('gender', StringType(), True), StructField('salary', IntegerType(), True), StructField('hire_date', TimestampType(), True)])

In [10]:
emp.show()

+-----------+-------------+-------------+---+------+------+-------------------+
|employee_id|department_id|         name|age|gender|salary|          hire_date|
+-----------+-------------+-------------+---+------+------+-------------------+
|          1|          101|     John Doe| 30|  Male| 50000|2015-01-01 00:00:00|
|          2|          101|   Jane Smith| 25|Female| 45000|2016-02-15 00:00:00|
|          3|          102|    Bob Brown| 35|  Male| 55000|2014-05-01 00:00:00|
|          4|          102|    Alice Lee| 28|Female| 48000|2017-09-30 00:00:00|
|          5|          103|    Jack Chan| 40|  Male| 60000|2013-04-01 00:00:00|
|          6|          103|    Jill Wong| 32|Female| 52000|2018-07-01 00:00:00|
|          7|          101|James Johnson| 42|  Male| 70000|2012-03-15 00:00:00|
|          8|          102|     Kate Kim| 29|Female| 51000|2019-10-01 00:00:00|
|          9|          103|      Tom Tan| 33|  Male| 58000|2016-06-01 00:00:00|
|         10|          104|     Lisa Lee

In [None]:
# Creating a manual schema in Spark
from pyspark.sql.types import _parse_datatype_string
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# ! IMPLICT INFERENCE
# Spark can infer the schema from a string
schema_string = "name string, age int"
print(_parse_datatype_string(schema_string))

# ! EXPLICIT INFERENCE
# Template: StructType([StructField(name, dataType, nullable?)]) 
schema_spark = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])
print(schema_spark)

StructType([StructField('name', StringType(), True), StructField('age', IntegerType(), True)])
StructType([StructField('name', StringType(), True), StructField('age', IntegerType(), True)])


In [13]:
# Columns and Expressions
from pyspark.sql.functions import col, expr

# ? col("name") == expr("name"), since both are Column objects and hence treated as same.
# select employee_id, name, age, salary from emp

emp_filtered = emp.select(col("employee_id"), expr("name"), emp.age, emp.salary)    # ! TRANSFORMATION
emp_filtered.show() # ! ACTION

+-----------+-------------+---+------+
|employee_id|         name|age|salary|
+-----------+-------------+---+------+
|          1|     John Doe| 30| 50000|
|          2|   Jane Smith| 25| 45000|
|          3|    Bob Brown| 35| 55000|
|          4|    Alice Lee| 28| 48000|
|          5|    Jack Chan| 40| 60000|
|          6|    Jill Wong| 32| 52000|
|          7|James Johnson| 42| 70000|
|          8|     Kate Kim| 29| 51000|
|          9|      Tom Tan| 33| 58000|
|         10|     Lisa Lee| 27| 47000|
|         11|   David Park| 38| 65000|
|         12|   Susan Chen| 31| 54000|
|         13|    Brian Kim| 45| 75000|
|         14|    Emily Lee| 26| 46000|
|         15|  Michael Lee| 37| 63000|
|         16|  Kelly Zhang| 30| 49000|
|         17|  George Wang| 34| 57000|
|         18|    Nancy Liu| 29| 50000|
|         19|  Steven Chen| 36| 62000|
|         20|    Grace Kim| 32| 53000|
+-----------+-------------+---+------+



In [14]:
emp_casted = emp_filtered.select(expr("employee_id as emp_id"), emp_filtered.name, expr("cast(age as int) as age"), emp_filtered.salary)
emp_casted.show()

+------+-------------+---+------+
|emp_id|         name|age|salary|
+------+-------------+---+------+
|     1|     John Doe| 30| 50000|
|     2|   Jane Smith| 25| 45000|
|     3|    Bob Brown| 35| 55000|
|     4|    Alice Lee| 28| 48000|
|     5|    Jack Chan| 40| 60000|
|     6|    Jill Wong| 32| 52000|
|     7|James Johnson| 42| 70000|
|     8|     Kate Kim| 29| 51000|
|     9|      Tom Tan| 33| 58000|
|    10|     Lisa Lee| 27| 47000|
|    11|   David Park| 38| 65000|
|    12|   Susan Chen| 31| 54000|
|    13|    Brian Kim| 45| 75000|
|    14|    Emily Lee| 26| 46000|
|    15|  Michael Lee| 37| 63000|
|    16|  Kelly Zhang| 30| 49000|
|    17|  George Wang| 34| 57000|
|    18|    Nancy Liu| 29| 50000|
|    19|  Steven Chen| 36| 62000|
|    20|    Grace Kim| 32| 53000|
+------+-------------+---+------+



In [16]:
emp_casted.printSchema()

root
 |-- emp_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- salary: integer (nullable = true)



In [17]:
emp_casted_alt = emp_filtered.selectExpr("employee_id as emp_id", "name", "cast(age as int) as age", "salary")
emp_casted_alt.show()

+------+-------------+---+------+
|emp_id|         name|age|salary|
+------+-------------+---+------+
|     1|     John Doe| 30| 50000|
|     2|   Jane Smith| 25| 45000|
|     3|    Bob Brown| 35| 55000|
|     4|    Alice Lee| 28| 48000|
|     5|    Jack Chan| 40| 60000|
|     6|    Jill Wong| 32| 52000|
|     7|James Johnson| 42| 70000|
|     8|     Kate Kim| 29| 51000|
|     9|      Tom Tan| 33| 58000|
|    10|     Lisa Lee| 27| 47000|
|    11|   David Park| 38| 65000|
|    12|   Susan Chen| 31| 54000|
|    13|    Brian Kim| 45| 75000|
|    14|    Emily Lee| 26| 46000|
|    15|  Michael Lee| 37| 63000|
|    16|  Kelly Zhang| 30| 49000|
|    17|  George Wang| 34| 57000|
|    18|    Nancy Liu| 29| 50000|
|    19|  Steven Chen| 36| 62000|
|    20|    Grace Kim| 32| 53000|
+------+-------------+---+------+



In [18]:
# Filter emp_casted based on Age > 30

emp_casted.select("emp_id", "name", "age", "salary").where("age > 30").show()

+------+-------------+---+------+
|emp_id|         name|age|salary|
+------+-------------+---+------+
|     3|    Bob Brown| 35| 55000|
|     5|    Jack Chan| 40| 60000|
|     6|    Jill Wong| 32| 52000|
|     7|James Johnson| 42| 70000|
|     9|      Tom Tan| 33| 58000|
|    11|   David Park| 38| 65000|
|    12|   Susan Chen| 31| 54000|
|    13|    Brian Kim| 45| 75000|
|    15|  Michael Lee| 37| 63000|
|    17|  George Wang| 34| 57000|
|    19|  Steven Chen| 36| 62000|
|    20|    Grace Kim| 32| 53000|
+------+-------------+---+------+



### Basic Transformations 2

In [21]:
emp.show()

+-----------+-------------+-------------+---+------+------+-------------------+
|employee_id|department_id|         name|age|gender|salary|          hire_date|
+-----------+-------------+-------------+---+------+------+-------------------+
|          1|          101|     John Doe| 30|  Male| 50000|2015-01-01 00:00:00|
|          2|          101|   Jane Smith| 25|Female| 45000|2016-02-15 00:00:00|
|          3|          102|    Bob Brown| 35|  Male| 55000|2014-05-01 00:00:00|
|          4|          102|    Alice Lee| 28|Female| 48000|2017-09-30 00:00:00|
|          5|          103|    Jack Chan| 40|  Male| 60000|2013-04-01 00:00:00|
|          6|          103|    Jill Wong| 32|Female| 52000|2018-07-01 00:00:00|
|          7|          101|James Johnson| 42|  Male| 70000|2012-03-15 00:00:00|
|          8|          102|     Kate Kim| 29|Female| 51000|2019-10-01 00:00:00|
|          9|          103|      Tom Tan| 33|  Male| 58000|2016-06-01 00:00:00|
|         10|          104|     Lisa Lee

In [23]:
emp.printSchema()

root
 |-- employee_id: integer (nullable = true)
 |-- department_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- hire_date: timestamp (nullable = true)



In [24]:
from pyspark.sql.functions import col, cast

emp.select("employee_id", "name", "age", col("salary").cast("double")).printSchema()

root
 |-- employee_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- salary: double (nullable = true)



In [26]:
# Adding new columns to the DataFrame

emp_casted = emp.select("employee_id", "name", "age", col("salary").cast("double"))

emp_taxed = emp_casted.withColumn("tax", col("salary") * 0.2)
emp_taxed.show()

+-----------+-------------+---+-------+-------+
|employee_id|         name|age| salary|    tax|
+-----------+-------------+---+-------+-------+
|          1|     John Doe| 30|50000.0|10000.0|
|          2|   Jane Smith| 25|45000.0| 9000.0|
|          3|    Bob Brown| 35|55000.0|11000.0|
|          4|    Alice Lee| 28|48000.0| 9600.0|
|          5|    Jack Chan| 40|60000.0|12000.0|
|          6|    Jill Wong| 32|52000.0|10400.0|
|          7|James Johnson| 42|70000.0|14000.0|
|          8|     Kate Kim| 29|51000.0|10200.0|
|          9|      Tom Tan| 33|58000.0|11600.0|
|         10|     Lisa Lee| 27|47000.0| 9400.0|
|         11|   David Park| 38|65000.0|13000.0|
|         12|   Susan Chen| 31|54000.0|10800.0|
|         13|    Brian Kim| 45|75000.0|15000.0|
|         14|    Emily Lee| 26|46000.0| 9200.0|
|         15|  Michael Lee| 37|63000.0|12600.0|
|         16|  Kelly Zhang| 30|49000.0| 9800.0|
|         17|  George Wang| 34|57000.0|11400.0|
|         18|    Nancy Liu| 29|50000.0|1

In [28]:
# Literals (Adding a constant to the DataFrame)
from pyspark.sql.functions import lit

emp_new_cols = emp_taxed.withColumn("columnOne", lit(1)).withColumn("columnTwo", lit("two"))
emp_new_cols.show()

+-----------+-------------+---+-------+-------+---------+---------+
|employee_id|         name|age| salary|    tax|columnOne|columnTwo|
+-----------+-------------+---+-------+-------+---------+---------+
|          1|     John Doe| 30|50000.0|10000.0|        1|      two|
|          2|   Jane Smith| 25|45000.0| 9000.0|        1|      two|
|          3|    Bob Brown| 35|55000.0|11000.0|        1|      two|
|          4|    Alice Lee| 28|48000.0| 9600.0|        1|      two|
|          5|    Jack Chan| 40|60000.0|12000.0|        1|      two|
|          6|    Jill Wong| 32|52000.0|10400.0|        1|      two|
|          7|James Johnson| 42|70000.0|14000.0|        1|      two|
|          8|     Kate Kim| 29|51000.0|10200.0|        1|      two|
|          9|      Tom Tan| 33|58000.0|11600.0|        1|      two|
|         10|     Lisa Lee| 27|47000.0| 9400.0|        1|      two|
|         11|   David Park| 38|65000.0|13000.0|        1|      two|
|         12|   Susan Chen| 31|54000.0|10800.0| 

In [29]:
emp_new_cols.withColumnRenamed("employee_id", "emp_id").show()

+------+-------------+---+-------+-------+---------+---------+
|emp_id|         name|age| salary|    tax|columnOne|columnTwo|
+------+-------------+---+-------+-------+---------+---------+
|     1|     John Doe| 30|50000.0|10000.0|        1|      two|
|     2|   Jane Smith| 25|45000.0| 9000.0|        1|      two|
|     3|    Bob Brown| 35|55000.0|11000.0|        1|      two|
|     4|    Alice Lee| 28|48000.0| 9600.0|        1|      two|
|     5|    Jack Chan| 40|60000.0|12000.0|        1|      two|
|     6|    Jill Wong| 32|52000.0|10400.0|        1|      two|
|     7|James Johnson| 42|70000.0|14000.0|        1|      two|
|     8|     Kate Kim| 29|51000.0|10200.0|        1|      two|
|     9|      Tom Tan| 33|58000.0|11600.0|        1|      two|
|    10|     Lisa Lee| 27|47000.0| 9400.0|        1|      two|
|    11|   David Park| 38|65000.0|13000.0|        1|      two|
|    12|   Susan Chen| 31|54000.0|10800.0|        1|      two|
|    13|    Brian Kim| 45|75000.0|15000.0|        1|   

In [30]:
# Dropping columns from the DataFrame

emp_new_cols.drop("columnTwo").show()

+-----------+-------------+---+-------+-------+---------+
|employee_id|         name|age| salary|    tax|columnOne|
+-----------+-------------+---+-------+-------+---------+
|          1|     John Doe| 30|50000.0|10000.0|        1|
|          2|   Jane Smith| 25|45000.0| 9000.0|        1|
|          3|    Bob Brown| 35|55000.0|11000.0|        1|
|          4|    Alice Lee| 28|48000.0| 9600.0|        1|
|          5|    Jack Chan| 40|60000.0|12000.0|        1|
|          6|    Jill Wong| 32|52000.0|10400.0|        1|
|          7|James Johnson| 42|70000.0|14000.0|        1|
|          8|     Kate Kim| 29|51000.0|10200.0|        1|
|          9|      Tom Tan| 33|58000.0|11600.0|        1|
|         10|     Lisa Lee| 27|47000.0| 9400.0|        1|
|         11|   David Park| 38|65000.0|13000.0|        1|
|         12|   Susan Chen| 31|54000.0|10800.0|        1|
|         13|    Brian Kim| 45|75000.0|15000.0|        1|
|         14|    Emily Lee| 26|46000.0| 9200.0|        1|
|         15| 

In [31]:
emp_new_cols.show()

+-----------+-------------+---+-------+-------+---------+---------+
|employee_id|         name|age| salary|    tax|columnOne|columnTwo|
+-----------+-------------+---+-------+-------+---------+---------+
|          1|     John Doe| 30|50000.0|10000.0|        1|      two|
|          2|   Jane Smith| 25|45000.0| 9000.0|        1|      two|
|          3|    Bob Brown| 35|55000.0|11000.0|        1|      two|
|          4|    Alice Lee| 28|48000.0| 9600.0|        1|      two|
|          5|    Jack Chan| 40|60000.0|12000.0|        1|      two|
|          6|    Jill Wong| 32|52000.0|10400.0|        1|      two|
|          7|James Johnson| 42|70000.0|14000.0|        1|      two|
|          8|     Kate Kim| 29|51000.0|10200.0|        1|      two|
|          9|      Tom Tan| 33|58000.0|11600.0|        1|      two|
|         10|     Lisa Lee| 27|47000.0| 9400.0|        1|      two|
|         11|   David Park| 38|65000.0|13000.0|        1|      two|
|         12|   Susan Chen| 31|54000.0|10800.0| 

In [33]:
# Filter DataFrame where tax > 10000, along with LIMIT to 5 rows

emp_taxed.where("tax > 10000").limit(5).show()

+-----------+-------------+---+-------+-------+
|employee_id|         name|age| salary|    tax|
+-----------+-------------+---+-------+-------+
|          3|    Bob Brown| 35|55000.0|11000.0|
|          5|    Jack Chan| 40|60000.0|12000.0|
|          6|    Jill Wong| 32|52000.0|10400.0|
|          7|James Johnson| 42|70000.0|14000.0|
|          8|     Kate Kim| 29|51000.0|10200.0|
+-----------+-------------+---+-------+-------+



In [37]:
# Bonus: Adding multiple columns to the dataframe at once

columns = {
    'tax': col('salary') * 0.2,
    'bonus': col('salary') * 0.1
}

emp_casted.withColumns(columns).show()

+-----------+-------------+---+-------+-------+------+
|employee_id|         name|age| salary|    tax| bonus|
+-----------+-------------+---+-------+-------+------+
|          1|     John Doe| 30|50000.0|10000.0|5000.0|
|          2|   Jane Smith| 25|45000.0| 9000.0|4500.0|
|          3|    Bob Brown| 35|55000.0|11000.0|5500.0|
|          4|    Alice Lee| 28|48000.0| 9600.0|4800.0|
|          5|    Jack Chan| 40|60000.0|12000.0|6000.0|
|          6|    Jill Wong| 32|52000.0|10400.0|5200.0|
|          7|James Johnson| 42|70000.0|14000.0|7000.0|
|          8|     Kate Kim| 29|51000.0|10200.0|5100.0|
|          9|      Tom Tan| 33|58000.0|11600.0|5800.0|
|         10|     Lisa Lee| 27|47000.0| 9400.0|4700.0|
|         11|   David Park| 38|65000.0|13000.0|6500.0|
|         12|   Susan Chen| 31|54000.0|10800.0|5400.0|
|         13|    Brian Kim| 45|75000.0|15000.0|7500.0|
|         14|    Emily Lee| 26|46000.0| 9200.0|4600.0|
|         15|  Michael Lee| 37|63000.0|12600.0|6300.0|
|         

### String and Dates

In [38]:
spark = (
    SparkSession
    .builder
    .appName("String & Dates")
    .master("local[*]")
    .getOrCreate()
)

spark

In [45]:
# Add a "Case" column to the DataFrame based on conditions
from pyspark.sql.functions import when, col

emp.withColumn('new_gender', when(col('gender') == 'Male', 'M').when(col('gender') == 'Female', 'F').otherwise(None)).show()

+-----------+-------------+-------------+---+------+------+-------------------+----------+
|employee_id|department_id|         name|age|gender|salary|          hire_date|new_gender|
+-----------+-------------+-------------+---+------+------+-------------------+----------+
|          1|          101|     John Doe| 30|  Male| 50000|2015-01-01 00:00:00|         M|
|          2|          101|   Jane Smith| 25|Female| 45000|2016-02-15 00:00:00|         F|
|          3|          102|    Bob Brown| 35|  Male| 55000|2014-05-01 00:00:00|         M|
|          4|          102|    Alice Lee| 28|Female| 48000|2017-09-30 00:00:00|         F|
|          5|          103|    Jack Chan| 40|  Male| 60000|2013-04-01 00:00:00|         M|
|          6|          103|    Jill Wong| 32|Female| 52000|2018-07-01 00:00:00|         F|
|          7|          101|James Johnson| 42|  Male| 70000|2012-03-15 00:00:00|         M|
|          8|          102|     Kate Kim| 29|Female| 51000|2019-10-01 00:00:00|         F|

In [46]:
# Replace in Strings

from pyspark.sql.functions import regexp_replace

emp.withColumn("new_name", regexp_replace("name", "J", "Z")).show()

+-----------+-------------+-------------+---+------+------+-------------------+-------------+
|employee_id|department_id|         name|age|gender|salary|          hire_date|     new_name|
+-----------+-------------+-------------+---+------+------+-------------------+-------------+
|          1|          101|     John Doe| 30|  Male| 50000|2015-01-01 00:00:00|     Zohn Doe|
|          2|          101|   Jane Smith| 25|Female| 45000|2016-02-15 00:00:00|   Zane Smith|
|          3|          102|    Bob Brown| 35|  Male| 55000|2014-05-01 00:00:00|    Bob Brown|
|          4|          102|    Alice Lee| 28|Female| 48000|2017-09-30 00:00:00|    Alice Lee|
|          5|          103|    Jack Chan| 40|  Male| 60000|2013-04-01 00:00:00|    Zack Chan|
|          6|          103|    Jill Wong| 32|Female| 52000|2018-07-01 00:00:00|    Zill Wong|
|          7|          101|James Johnson| 42|  Male| 70000|2012-03-15 00:00:00|Zames Zohnson|
|          8|          102|     Kate Kim| 29|Female| 51000|2

In [53]:
# Convert Timestamp (String) type column to Date type

from pyspark.sql.functions import to_date, col

emp.withColumn("hire_date", to_date(col("hire_date"), 'yyyy-MM-dd')).printSchema()

root
 |-- employee_id: integer (nullable = true)
 |-- department_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- hire_date: date (nullable = true)



In [55]:
# Add Current Date and Timestamp columns

from pyspark.sql.functions import current_date, current_timestamp

columns = {
    "current_date": current_date(),
    "current_timestamp": current_timestamp()
}

emp.withColumns(columns).show(truncate=False)

+-----------+-------------+-------------+---+------+------+-------------------+------------+--------------------------+
|employee_id|department_id|name         |age|gender|salary|hire_date          |current_date|current_timestamp         |
+-----------+-------------+-------------+---+------+------+-------------------+------------+--------------------------+
|1          |101          |John Doe     |30 |Male  |50000 |2015-01-01 00:00:00|2025-04-30  |2025-04-30 02:52:20.972595|
|2          |101          |Jane Smith   |25 |Female|45000 |2016-02-15 00:00:00|2025-04-30  |2025-04-30 02:52:20.972595|
|3          |102          |Bob Brown    |35 |Male  |55000 |2014-05-01 00:00:00|2025-04-30  |2025-04-30 02:52:20.972595|
|4          |102          |Alice Lee    |28 |Female|48000 |2017-09-30 00:00:00|2025-04-30  |2025-04-30 02:52:20.972595|
|5          |103          |Jack Chan    |40 |Male  |60000 |2013-04-01 00:00:00|2025-04-30  |2025-04-30 02:52:20.972595|
|6          |103          |Jill Wong    

In [59]:
# Drop rows with null values in a column

temp = emp.withColumn("gender", when(col("name") == "Nancy Liu", None).otherwise(col("gender")))
temp.na.drop().show()

del temp

+-----------+-------------+-------------+---+------+------+-------------------+
|employee_id|department_id|         name|age|gender|salary|          hire_date|
+-----------+-------------+-------------+---+------+------+-------------------+
|          1|          101|     John Doe| 30|  Male| 50000|2015-01-01 00:00:00|
|          2|          101|   Jane Smith| 25|Female| 45000|2016-02-15 00:00:00|
|          3|          102|    Bob Brown| 35|  Male| 55000|2014-05-01 00:00:00|
|          4|          102|    Alice Lee| 28|Female| 48000|2017-09-30 00:00:00|
|          5|          103|    Jack Chan| 40|  Male| 60000|2013-04-01 00:00:00|
|          6|          103|    Jill Wong| 32|Female| 52000|2018-07-01 00:00:00|
|          7|          101|James Johnson| 42|  Male| 70000|2012-03-15 00:00:00|
|          8|          102|     Kate Kim| 29|Female| 51000|2019-10-01 00:00:00|
|          9|          103|      Tom Tan| 33|  Male| 58000|2016-06-01 00:00:00|
|         10|          104|     Lisa Lee

In [60]:
# Fix null values with coalesce
from pyspark.sql.functions import coalesce, lit

temp = emp.withColumn("gender", when(col("name") == "Nancy Liu", None).otherwise(col("gender")))

temp.withColumn("gender", coalesce(col("gender"), lit("Unknown"))).show()

+-----------+-------------+-------------+---+-------+------+-------------------+
|employee_id|department_id|         name|age| gender|salary|          hire_date|
+-----------+-------------+-------------+---+-------+------+-------------------+
|          1|          101|     John Doe| 30|   Male| 50000|2015-01-01 00:00:00|
|          2|          101|   Jane Smith| 25| Female| 45000|2016-02-15 00:00:00|
|          3|          102|    Bob Brown| 35|   Male| 55000|2014-05-01 00:00:00|
|          4|          102|    Alice Lee| 28| Female| 48000|2017-09-30 00:00:00|
|          5|          103|    Jack Chan| 40|   Male| 60000|2013-04-01 00:00:00|
|          6|          103|    Jill Wong| 32| Female| 52000|2018-07-01 00:00:00|
|          7|          101|James Johnson| 42|   Male| 70000|2012-03-15 00:00:00|
|          8|          102|     Kate Kim| 29| Female| 51000|2019-10-01 00:00:00|
|          9|          103|      Tom Tan| 33|   Male| 58000|2016-06-01 00:00:00|
|         10|          104| 

In [61]:
# Convert date/timestamp into string and extract information from it
from pyspark.sql.functions import date_format

emp.withColumn("hire_year", date_format(col("hire_date"), "yyyy")).show()

+-----------+-------------+-------------+---+------+------+-------------------+---------+
|employee_id|department_id|         name|age|gender|salary|          hire_date|hire_year|
+-----------+-------------+-------------+---+------+------+-------------------+---------+
|          1|          101|     John Doe| 30|  Male| 50000|2015-01-01 00:00:00|     2015|
|          2|          101|   Jane Smith| 25|Female| 45000|2016-02-15 00:00:00|     2016|
|          3|          102|    Bob Brown| 35|  Male| 55000|2014-05-01 00:00:00|     2014|
|          4|          102|    Alice Lee| 28|Female| 48000|2017-09-30 00:00:00|     2017|
|          5|          103|    Jack Chan| 40|  Male| 60000|2013-04-01 00:00:00|     2013|
|          6|          103|    Jill Wong| 32|Female| 52000|2018-07-01 00:00:00|     2018|
|          7|          101|James Johnson| 42|  Male| 70000|2012-03-15 00:00:00|     2012|
|          8|          102|     Kate Kim| 29|Female| 51000|2019-10-01 00:00:00|     2019|
|         

### Sort, Union & Aggregation

In [70]:
# Convert all columns to string type

emp_str = emp.select([col(c).cast("string") for c in emp.columns])
emp_str.printSchema()

root
 |-- employee_id: string (nullable = true)
 |-- department_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: string (nullable = true)
 |-- hire_date: string (nullable = true)



In [75]:
# Split the dataframe into 2 parts

emp_str1 = emp_str.filter(emp.employee_id < 11)
emp_str2 = emp_str.filter(emp.employee_id > 10)

In [None]:
# Union and Union All (remove duplicates)
# ! The columns must be in the same order and have the same data types
# ? UnionByName can used when the column names are different but data types are same

emp_str2.union(emp_str1).show()

+-----------+-------------+-------------+---+------+------+-------------------+
|employee_id|department_id|         name|age|gender|salary|          hire_date|
+-----------+-------------+-------------+---+------+------+-------------------+
|         11|          104|   David Park| 38|  Male| 65000|2015-11-01 00:00:00|
|         12|          105|   Susan Chen| 31|Female| 54000|2017-02-15 00:00:00|
|         13|          106|    Brian Kim| 45|  Male| 75000|2011-07-01 00:00:00|
|         14|          107|    Emily Lee| 26|Female| 46000|2019-01-01 00:00:00|
|         15|          106|  Michael Lee| 37|  Male| 63000|2014-09-30 00:00:00|
|         16|          107|  Kelly Zhang| 30|Female| 49000|2018-04-01 00:00:00|
|         17|          105|  George Wang| 34|  Male| 57000|2016-03-15 00:00:00|
|         18|          104|    Nancy Liu| 29|Female| 50000|2017-06-01 00:00:00|
|         19|          103|  Steven Chen| 36|  Male| 62000|2015-08-01 00:00:00|
|         20|          102|    Grace Kim

In [85]:
# Sorting the dataframe

from pyspark.sql.functions import asc, desc

emp.orderBy(desc("salary")).show(5)

emp.orderBy(asc("hire_date")).show(5)

+-----------+-------------+-------------+---+------+------+-------------------+
|employee_id|department_id|         name|age|gender|salary|          hire_date|
+-----------+-------------+-------------+---+------+------+-------------------+
|         13|          106|    Brian Kim| 45|  Male| 75000|2011-07-01 00:00:00|
|          7|          101|James Johnson| 42|  Male| 70000|2012-03-15 00:00:00|
|         11|          104|   David Park| 38|  Male| 65000|2015-11-01 00:00:00|
|         15|          106|  Michael Lee| 37|  Male| 63000|2014-09-30 00:00:00|
|         19|          103|  Steven Chen| 36|  Male| 62000|2015-08-01 00:00:00|
+-----------+-------------+-------------+---+------+------+-------------------+
only showing top 5 rows

+-----------+-------------+-------------+---+------+------+-------------------+
|employee_id|department_id|         name|age|gender|salary|          hire_date|
+-----------+-------------+-------------+---+------+------+-------------------+
|         13|  

In [92]:
# Aggregation functions
from pyspark.sql.functions import count, sum, avg, max, min

emp.groupBy("department_id").agg(count("employee_id").alias("dept_count"), sum("salary").alias("dept_pay")).orderBy(desc("dept_pay")).show()


+-------------+----------+--------+
|department_id|dept_count|dept_pay|
+-------------+----------+--------+
|          103|         4|  232000|
|          102|         4|  207000|
|          101|         3|  165000|
|          104|         3|  162000|
|          106|         2|  138000|
|          105|         2|  111000|
|          107|         2|   95000|
+-------------+----------+--------+



In [93]:
emp.groupby("department_id").agg(avg("salary").alias("avg_dept_salary")).where(col("avg_dept_salary") > 50000).show()

+-------------+---------------+
|department_id|avg_dept_salary|
+-------------+---------------+
|          101|        55000.0|
|          103|        58000.0|
|          102|        51750.0|
|          105|        55500.0|
|          106|        69000.0|
|          104|        54000.0|
+-------------+---------------+



### Unique Data and Window

In [95]:
# Get unique data from the dataframe

emp.distinct().show()

+-----------+-------------+-------------+---+------+------+-------------------+
|employee_id|department_id|         name|age|gender|salary|          hire_date|
+-----------+-------------+-------------+---+------+------+-------------------+
|         10|          104|     Lisa Lee| 27|Female| 47000|2018-08-01 00:00:00|
|         11|          104|   David Park| 38|  Male| 65000|2015-11-01 00:00:00|
|         13|          106|    Brian Kim| 45|  Male| 75000|2011-07-01 00:00:00|
|         16|          107|  Kelly Zhang| 30|Female| 49000|2018-04-01 00:00:00|
|         20|          102|    Grace Kim| 32|Female| 53000|2018-11-01 00:00:00|
|          8|          102|     Kate Kim| 29|Female| 51000|2019-10-01 00:00:00|
|         19|          103|  Steven Chen| 36|  Male| 62000|2015-08-01 00:00:00|
|          6|          103|    Jill Wong| 32|Female| 52000|2018-07-01 00:00:00|
|         12|          105|   Susan Chen| 31|Female| 54000|2017-02-15 00:00:00|
|          2|          101|   Jane Smith

In [104]:
emp.select('department_id').distinct().show()

+-------------+
|department_id|
+-------------+
|          101|
|          103|
|          107|
|          102|
|          105|
|          106|
|          104|
+-------------+



In [105]:
# Window functions
# ? Allows to compute values based on a "window" of rows without collapsing them into a single row, unlike groupBy()

from pyspark.sql.window import Window
from pyspark.sql.functions import max, col, desc

window_spec = Window.partitionBy("department_id").orderBy(desc("salary"))
max_func = max(col("salary")).over(window_spec)

emp.withColumn("max_dept_salary", max_func).show()

+-----------+-------------+-------------+---+------+------+-------------------+---------------+
|employee_id|department_id|         name|age|gender|salary|          hire_date|max_dept_salary|
+-----------+-------------+-------------+---+------+------+-------------------+---------------+
|          7|          101|James Johnson| 42|  Male| 70000|2012-03-15 00:00:00|          70000|
|          1|          101|     John Doe| 30|  Male| 50000|2015-01-01 00:00:00|          70000|
|          2|          101|   Jane Smith| 25|Female| 45000|2016-02-15 00:00:00|          70000|
|          3|          102|    Bob Brown| 35|  Male| 55000|2014-05-01 00:00:00|          55000|
|         20|          102|    Grace Kim| 32|Female| 53000|2018-11-01 00:00:00|          55000|
|          8|          102|     Kate Kim| 29|Female| 51000|2019-10-01 00:00:00|          55000|
|          4|          102|    Alice Lee| 28|Female| 48000|2017-09-30 00:00:00|          55000|
|         19|          103|  Steven Chen

In [107]:
# Get the 2nd highest salary in each department

from pyspark.sql.functions import rank, col

rank_spec = Window.partitionBy("department_id").orderBy(desc("salary"))
rank_func = rank().over(rank_spec)

emp.withColumn("rank", rank_func).filter(col("rank") == 2).show()

+-----------+-------------+-----------+---+------+------+-------------------+----+
|employee_id|department_id|       name|age|gender|salary|          hire_date|rank|
+-----------+-------------+-----------+---+------+------+-------------------+----+
|          1|          101|   John Doe| 30|  Male| 50000|2015-01-01 00:00:00|   2|
|         20|          102|  Grace Kim| 32|Female| 53000|2018-11-01 00:00:00|   2|
|          5|          103|  Jack Chan| 40|  Male| 60000|2013-04-01 00:00:00|   2|
|         18|          104|  Nancy Liu| 29|Female| 50000|2017-06-01 00:00:00|   2|
|         12|          105| Susan Chen| 31|Female| 54000|2017-02-15 00:00:00|   2|
|         15|          106|Michael Lee| 37|  Male| 63000|2014-09-30 00:00:00|   2|
|         14|          107|  Emily Lee| 26|Female| 46000|2019-01-01 00:00:00|   2|
+-----------+-------------+-----------+---+------+------+-------------------+----+



### Joins and Data Partitions

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = (
    SparkSession
    .builder
    .appName("Join & Partition")
    .master("local[*]")
    .getOrCreate()
)

spark

----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 36290)
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/local/lib/python3.10/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/usr/local/lib/python3.10/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/local/lib/python3.10/socketserver.py", line 747, in __init__
    self.handle()
  File "/spark/python/pyspark/accumulators.py", line 281, in handle
    poll(accum_updates)
  File "/spark/python/pyspark/accumulators.py", line 253, in poll
    if func():
  File "/spark/python/pyspark/accumulators.py", line 257, in accum_updates
    num_updates = read_int(self.rfile)
  File "/spark/python/pyspark/serializers.py", line 595, in re

In [2]:
emp = spark.read.csv("data/emp.csv", header=True, inferSchema=True)
emp = emp.select([col(c).cast("string") for c in emp.columns])

emp.printSchema()
emp.show()

root
 |-- employee_id: string (nullable = true)
 |-- department_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: string (nullable = true)
 |-- hire_date: string (nullable = true)

+-----------+-------------+-------------+---+------+------+-------------------+
|employee_id|department_id|         name|age|gender|salary|          hire_date|
+-----------+-------------+-------------+---+------+------+-------------------+
|          1|          101|     John Doe| 30|  Male| 50000|2015-01-01 00:00:00|
|          2|          101|   Jane Smith| 25|Female| 45000|2016-02-15 00:00:00|
|          3|          102|    Bob Brown| 35|  Male| 55000|2014-05-01 00:00:00|
|          4|          102|    Alice Lee| 28|Female| 48000|2017-09-30 00:00:00|
|          5|          103|    Jack Chan| 40|  Male| 60000|2013-04-01 00:00:00|
|          6|          103|    Jill Wong| 32|Female| 52000|2018-07-01 00:00:00

In [3]:
dept_data = [
    ["101", "Sales", "NYC", "US", "1000000"],
    ["102", "Marketing", "LA", "US", "900000"],
    ["103", "Finance", "London", "UK", "1200000"],
    ["104", "Engineering", "Beijing", "China", "1500000"],
    ["105", "Human Resources", "Tokyo", "Japan", "800000"],
    ["106", "Research and Development", "Perth", "Australia", "1100000"],
    ["107", "Customer Service", "Sydney", "Australia", "950000"]
]

dept_schema = "department_id string, department_name string, city string, country string, budget string"

dept = spark.createDataFrame(data=dept_data, schema=dept_schema)

dept.printSchema()
dept.show()

root
 |-- department_id: string (nullable = true)
 |-- department_name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- budget: string (nullable = true)

+-------------+--------------------+-------+---------+-------+
|department_id|     department_name|   city|  country| budget|
+-------------+--------------------+-------+---------+-------+
|          101|               Sales|    NYC|       US|1000000|
|          102|           Marketing|     LA|       US| 900000|
|          103|             Finance| London|       UK|1200000|
|          104|         Engineering|Beijing|    China|1500000|
|          105|     Human Resources|  Tokyo|    Japan| 800000|
|          106|Research and Deve...|  Perth|Australia|1100000|
|          107|    Customer Service| Sydney|Australia| 950000|
+-------------+--------------------+-------+---------+-------+



In [7]:
print(f"Partitions in Employee dataset: {emp.rdd.getNumPartitions()}")
print(f"Partitions in Department dataset: {dept.rdd.getNumPartitions()}")

Partitions in Employee dataset: 1
Partitions in Department dataset: 24


In [8]:
emp.repartition(4).rdd.getNumPartitions()

4

In [9]:
dept.repartition(8).rdd.getNumPartitions()

8

In [10]:
# `coalesce` can help *reduce* the number of partitions without reshuffle (shuffling b/w the executors).
# It also doesn't guarantee uniform data distribution, while repartition does.

emp.coalesce(4).rdd.getNumPartitions()

1

In [13]:
# Repartition the Employee dataframe on/using the department_id column

emp_part = emp.repartition(4, "department_id")
emp_part.withColumn("partition_num", spark_partition_id()).show()

+-----------+-------------+-------------+---+------+------+-------------------+-------------+
|employee_id|department_id|         name|age|gender|salary|          hire_date|partition_num|
+-----------+-------------+-------------+---+------+------+-------------------+-------------+
|          3|          102|    Bob Brown| 35|  Male| 55000|2014-05-01 00:00:00|            0|
|          4|          102|    Alice Lee| 28|Female| 48000|2017-09-30 00:00:00|            0|
|          8|          102|     Kate Kim| 29|Female| 51000|2019-10-01 00:00:00|            0|
|         14|          107|    Emily Lee| 26|Female| 46000|2019-01-01 00:00:00|            0|
|         16|          107|  Kelly Zhang| 30|Female| 49000|2018-04-01 00:00:00|            0|
|         20|          102|    Grace Kim| 32|Female| 53000|2018-11-01 00:00:00|            0|
|         12|          105|   Susan Chen| 31|Female| 54000|2017-02-15 00:00:00|            1|
|         17|          105|  George Wang| 34|  Male| 57000|2

In [None]:
# (Inner) join the Employee and Department dataframes and show a select set of columns

emp.join(dept, how="inner", on=emp.department_id==dept.department_id).select(emp.name, dept.department_id, dept.department_name, emp.salary).show()

+-------------+-------------+--------------------+------+
|         name|department_id|     department_name|salary|
+-------------+-------------+--------------------+------+
|James Johnson|          101|               Sales| 70000|
|   Jane Smith|          101|               Sales| 45000|
|     John Doe|          101|               Sales| 50000|
|    Grace Kim|          102|           Marketing| 53000|
|     Kate Kim|          102|           Marketing| 51000|
|    Alice Lee|          102|           Marketing| 48000|
|    Bob Brown|          102|           Marketing| 55000|
|  Steven Chen|          103|             Finance| 62000|
|      Tom Tan|          103|             Finance| 58000|
|    Jill Wong|          103|             Finance| 52000|
|    Jack Chan|          103|             Finance| 60000|
|    Nancy Liu|          104|         Engineering| 50000|
|   David Park|          104|         Engineering| 65000|
|     Lisa Lee|          104|         Engineering| 47000|
|  George Wang

### Reading from CSV File

In [19]:
df = spark.read.csv("data/emp.csv", header=True, inferSchema=True)

df.printSchema()
df.show()

root
 |-- employee_id: integer (nullable = true)
 |-- department_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- hire_date: timestamp (nullable = true)

+-----------+-------------+-------------+---+------+------+-------------------+
|employee_id|department_id|         name|age|gender|salary|          hire_date|
+-----------+-------------+-------------+---+------+------+-------------------+
|          1|          101|     John Doe| 30|  Male| 50000|2015-01-01 00:00:00|
|          2|          101|   Jane Smith| 25|Female| 45000|2016-02-15 00:00:00|
|          3|          102|    Bob Brown| 35|  Male| 55000|2014-05-01 00:00:00|
|          4|          102|    Alice Lee| 28|Female| 48000|2017-09-30 00:00:00|
|          5|          103|    Jack Chan| 40|  Male| 60000|2013-04-01 00:00:00|
|          6|          103|    Jill Wong| 32|Female| 52000|2018-07-01 0

In [23]:
# Proactively passing Spark the dataframe-schema helps optimise the process by preventing 
# Spark from having to read any data and infer the header/schema by itself.
# (No new "Job" was initiated for this transformation)

emp_schema = "employee_id int, department_id int, name string, age int, gender string, salary int, hire_date timestamp"

emp_give_schema = spark.read.csv("data/emp.csv", header=True, schema=emp_schema)

In [24]:
emp_give_schema.show()

+-----------+-------------+-------------+---+------+------+-------------------+
|employee_id|department_id|         name|age|gender|salary|          hire_date|
+-----------+-------------+-------------+---+------+------+-------------------+
|          1|          101|     John Doe| 30|  Male| 50000|2015-01-01 00:00:00|
|          2|          101|   Jane Smith| 25|Female| 45000|2016-02-15 00:00:00|
|          3|          102|    Bob Brown| 35|  Male| 55000|2014-05-01 00:00:00|
|          4|          102|    Alice Lee| 28|Female| 48000|2017-09-30 00:00:00|
|          5|          103|    Jack Chan| 40|  Male| 60000|2013-04-01 00:00:00|
|          6|          103|    Jill Wong| 32|Female| 52000|2018-07-01 00:00:00|
|          7|          101|James Johnson| 42|  Male| 70000|2012-03-15 00:00:00|
|          8|          102|     Kate Kim| 29|Female| 51000|2019-10-01 00:00:00|
|          9|          103|      Tom Tan| 33|  Male| 58000|2016-06-01 00:00:00|
|         10|          104|     Lisa Lee

In [30]:
# The `mode` attribute (PERMISSIVE, by default) helps with handling "bad" records.

emp_bad_schema = "employee_id int, department_id int, name string, age int, gender string, salary int, hire_date timestamp, _corrupt_record string"

bad_df = spark.read.csv("data/emp_new.csv", header=True, schema=emp_bad_schema)
bad_df.show(truncate=False)

+-----------+-------------+-------------+---+------+------+-------------------+--------------------------------------------+
|employee_id|department_id|name         |age|gender|salary|hire_date          |_corrupt_record                             |
+-----------+-------------+-------------+---+------+------+-------------------+--------------------------------------------+
|1          |101          |John Doe     |30 |Male  |50000 |2015-01-01 00:00:00|null                                        |
|2          |101          |Jane Smith   |25 |Female|45000 |2016-02-15 00:00:00|null                                        |
|3          |102          |Bob Brown    |35 |Male  |55000 |2014-05-01 00:00:00|null                                        |
|4          |102          |Alice Lee    |28 |Female|48000 |2017-09-30 00:00:00|null                                        |
|5          |103          |Jack Chan    |40 |Male  |60000 |2013-04-01 00:00:00|null                                        |


In [None]:

# ? `DROPMALFORMED` mode drops the corrupt rows in the dataframe
# ? `FAILFAST` mode fails as soon as it encounters any corrupt data (Usually used in scenarios involving payments processing)

spark.read.format("csv").option("header", True).option("mode", "DROPMALFORMED").schema(emp_bad_schema).load("data/emp_new.csv").show()

+-----------+-------------+-----------+---+------+------+-------------------+---------------+
|employee_id|department_id|       name|age|gender|salary|          hire_date|_corrupt_record|
+-----------+-------------+-----------+---+------+------+-------------------+---------------+
|          1|          101|   John Doe| 30|  Male| 50000|2015-01-01 00:00:00|           null|
|          2|          101| Jane Smith| 25|Female| 45000|2016-02-15 00:00:00|           null|
|          3|          102|  Bob Brown| 35|  Male| 55000|2014-05-01 00:00:00|           null|
|          4|          102|  Alice Lee| 28|Female| 48000|2017-09-30 00:00:00|           null|
|          5|          103|  Jack Chan| 40|  Male| 60000|2013-04-01 00:00:00|           null|
|          6|          103|  Jill Wong| 32|Female| 52000|2018-07-01 00:00:00|           null|
|          8|          102|   Kate Kim| 29|Female| 51000|2019-10-01 00:00:00|           null|
|          9|          103|    Tom Tan| 33|  Male| 58000|201

In [None]:
# Pass multiple arguments for Option using dict()

_options = {
    "header": True,
    "inferSchema": True,
    "mode": "PERMISSIVE",
}

spark.read.format("csv").options(**_options).load("data/emp.csv").show()

+-----------+-------------+-------------+---+------+------+-------------------+
|employee_id|department_id|         name|age|gender|salary|          hire_date|
+-----------+-------------+-------------+---+------+------+-------------------+
|          1|          101|     John Doe| 30|  Male| 50000|2015-01-01 00:00:00|
|          2|          101|   Jane Smith| 25|Female| 45000|2016-02-15 00:00:00|
|          3|          102|    Bob Brown| 35|  Male| 55000|2014-05-01 00:00:00|
|          4|          102|    Alice Lee| 28|Female| 48000|2017-09-30 00:00:00|
|          5|          103|    Jack Chan| 40|  Male| 60000|2013-04-01 00:00:00|
|          6|          103|    Jill Wong| 32|Female| 52000|2018-07-01 00:00:00|
|          7|          101|James Johnson| 42|  Male| 70000|2012-03-15 00:00:00|
|          8|          102|     Kate Kim| 29|Female| 51000|2019-10-01 00:00:00|
|          9|          103|      Tom Tan| 33|  Male| 58000|2016-06-01 00:00:00|
|         10|          104|     Lisa Lee

### Reading Complex Data Formats


`Columnar Format` benifits when extracting specific column information since it only needs to read, decompress and process a specific part for the current query. In comparison. `Row Format` needs to read, decompress and process the entire file to determine the result for a particular query.

- Row Format: <pre>A001, Dexter, <b>500</b>, A002, Tom, <b>600</b>, A003, Jerry, <b>1000</b></pre>
- Column Format: <pre>A001, A002, A003, Dexter, Tom, Jerry, <b>`500, 600, 1000`</b></pre>
- Query: Print the salary for all employees
<br>
<br>
<center><img src="images/format-comparison.png"/></center>

In [73]:
sales_pq = spark.read.parquet("data/sales_data.parquet")

sales_pq.printSchema()
sales_pq.show(truncate=False)
sales_pq.count()

root
 |-- transacted_at: timestamp (nullable = true)
 |-- trx_id: integer (nullable = true)
 |-- retailer_id: integer (nullable = true)
 |-- description: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- city_id: integer (nullable = true)

+-------------------+----------+-----------+-----------------------------------------------+-------+----------+
|transacted_at      |trx_id    |retailer_id|description                                    |amount |city_id   |
+-------------------+----------+-----------+-----------------------------------------------+-------+----------+
|2017-11-24 19:00:00|1995601912|2077350195 |Walgreen       11-25                           |197.23 |216510442 |
|2017-11-24 19:00:00|1734117021|644879053  |unkn    ppd id: 768641     11-26               |8.58   |930259917 |
|2017-11-24 19:00:00|1734117022|847200066  |Wal-Mart  ppd id: 555914     Algiers    11-26  |1737.26|1646415505|
|2017-11-24 19:00:00|1734117030|1953761884 |Home Depot     ppd id: 265

1102576

In [75]:
sales_orc = spark.read.orc("data/sales_data.orc")

sales_orc.printSchema()
sales_orc.show()
sales_orc.count()

root
 |-- transacted_at: timestamp (nullable = true)
 |-- trx_id: integer (nullable = true)
 |-- retailer_id: integer (nullable = true)
 |-- description: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- city_id: integer (nullable = true)

+-------------------+----------+-----------+--------------------+-------+----------+
|      transacted_at|    trx_id|retailer_id|         description| amount|   city_id|
+-------------------+----------+-----------+--------------------+-------+----------+
|2017-11-24 19:00:00|1995601912| 2077350195|Walgreen       11-25| 197.23| 216510442|
|2017-11-24 19:00:00|1734117021|  644879053|unkn    ppd id: 7...|   8.58| 930259917|
|2017-11-24 19:00:00|1734117022|  847200066|Wal-Mart  ppd id:...|1737.26|1646415505|
|2017-11-24 19:00:00|1734117030| 1953761884|Home Depot     pp...|  384.5| 287177635|
|2017-11-24 19:00:00|1734117089| 1898522855| Target        11-25|  66.33|1855530529|
|2017-11-24 19:00:00|1734117117|  997626433|Sears  ppd id: 85.

1102576

In [76]:
# Loading all files in a folder

sales_m_pq = spark.read.parquet('data/sales_total_parquet/*.parquet')
sales_m_pq.count()

1102576

In [79]:
# Initialise a decorator to track operation time

import time

def get_time(func):
    def inner_get_time() -> str:
        st = time.time()
        func()
        return (f"Execution Time: {(time.time() - st) * 1000:.4f} ms")
    print(inner_get_time())

In [80]:
@get_time
def x():
    spark.read.parquet("data/sales_data.parquet").count()

Execution Time: 377.0742 ms


In [83]:
@get_time
def x():
    spark.read.parquet("data/sales_data.parquet").select("trx_id").count()

Execution Time: 215.5883 ms


In [84]:
# Load nested files within subfolders

spark.read.parquet("data/sales_recursive/", recursiveFileLookup=True).show()

+-------------------+----------+-----------+--------------------+------+---------+
|      transacted_at|    trx_id|retailer_id|         description|amount|  city_id|
+-------------------+----------+-----------+--------------------+------+---------+
|2017-11-24 19:00:00|1734117123| 1953761884|unkn   ppd id: 15...| 19.55| 45522086|
|2017-11-24 19:00:00|1734117021|  644879053|unkn    ppd id: 7...|  8.58|930259917|
+-------------------+----------+-----------+--------------------+------+---------+



#### Experiment

Calculate the average amount per retailer and append a column displaying the corresponding value.

In [62]:
sales.groupBy("retailer_id").agg(avg("amount").alias("avg_amount")).orderBy(desc("avg_amount")).show()

+-----------+------------------+
|retailer_id|        avg_amount|
+-----------+------------------+
|  862777075| 400.6277299001569|
| 2092104004| 395.6865486725659|
|  495545430|394.33791799362956|
|  771821475| 393.7823759630201|
|  508452694| 390.6041735537194|
| 1295306792|389.76992566161067|
| 1006678445| 389.4951192145855|
|  316135668|386.18770249924586|
|  860355551|384.89784578313174|
|  162598651| 384.4054847207588|
|  386167994|  383.733645990921|
| 2145070162| 382.9899318729707|
|  304276488|381.42095686663316|
|  270266090| 381.3091858037574|
|  606497335| 380.2884948915997|
| 1720938479| 379.8883515482675|
|  997626433| 379.4370040204126|
| 1445595477| 379.0481333504646|
|  143327090| 379.0115743380849|
|  771066397| 378.9928698752224|
+-----------+------------------+
only showing top 20 rows



In [70]:
from pyspark.sql.window import Window

window = Window.partitionBy("retailer_id")
avg_amount = avg("amount").over(window)

sales.withColumn("retailer_avg_amount", avg_amount).orderBy(asc("transacted_at")).show()

+-------------------+----------+-----------+--------------------+-------+----------+-------------------+
|      transacted_at|    trx_id|retailer_id|         description| amount|   city_id|retailer_avg_amount|
+-------------------+----------+-----------+--------------------+-------+----------+-------------------+
|2017-01-01 19:00:00|1853124918|  582210968|Family Dollar Sto...|  757.1| 637093548| 367.40095518419776|
|2017-01-01 19:00:00|1853101949|  914585647|Whole Foods Marke...|    3.5| 903387909| 356.07879924407587|
|2017-01-01 19:00:00|1853158553|  582210968|Family Dollar Sto...| 1975.2|1912579202| 367.40095518419776|
|2017-01-01 19:00:00|1853160484|  304276488|Belk    arc id: 6...|   3.67| 333864585| 381.42095686663316|
|2017-01-01 19:00:00|1852974205|  386167994|unkn     ppd id: ...|1309.26| 352952442|   383.733645990921|
|2017-01-01 19:00:00|1853035514|  386167994|Wendy's  ppd id: ...| 207.15|1462628288|   383.733645990921|
|2017-01-01 19:00:00|1853039330|  304276488|           

### Read JSON files

In [87]:
# Loading a single line JSON file

spark.read.json("data/order_singleline.json").printSchema()

root
 |-- contact: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- customer_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- order_line_items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- amount: double (nullable = true)
 |    |    |-- item_id: string (nullable = true)
 |    |    |-- qty: long (nullable = true)



In [90]:
# Loading a multi-line JSON file

spark.read.json("data/order_multiline.json", multiLine=True).printSchema()

root
 |-- contact: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- customer_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- order_line_items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- amount: double (nullable = true)
 |    |    |-- item_id: string (nullable = true)
 |    |    |-- qty: long (nullable = true)



In [91]:
# Loading and storing a single-line JSON file

order = spark.read.json("data/order_singleline.json")
order.show()

+--------------------+-----------+--------+--------------------+
|             contact|customer_id|order_id|    order_line_items|
+--------------------+-----------+--------+--------------------+
|[9000010000, 9000...|       C001|    O101|[{102.45, I001, 6...|
+--------------------+-----------+--------+--------------------+



In [94]:
# Defining a complex schema for the JSON file

complex_schema = "contact array<string>, customer_id string, order_id string, order_line_items array<struct<amount double, item_id string, qty long>>"

spark.read.json("data/order_singleline.json", schema=complex_schema).printSchema()

root
 |-- contact: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- customer_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- order_line_items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- amount: double (nullable = true)
 |    |    |-- item_id: string (nullable = true)
 |    |    |-- qty: long (nullable = true)



In [118]:
# String to JSON using from_json function, with a schema

json_str = spark.read.text("data/order_singleline.json")
json_str.show()

json_expanded = json_str.withColumn("parsed", from_json(json_str.value, complex_schema))
json_expanded.show()

+--------------------+
|               value|
+--------------------+
|{"order_id":"O101...|
+--------------------+

+--------------------+--------------------+
|               value|              parsed|
+--------------------+--------------------+
|{"order_id":"O101...|{[9000010000, 900...|
+--------------------+--------------------+



In [122]:
# JSON to String using to_json

json_expanded.withColumn("unparsed", to_json(json_expanded.parsed)).select("unparsed").show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|unparsed                                                                                                                                                                               |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"contact":["9000010000","9000010001"],"customer_id":"C001","order_id":"O101","order_line_items":[{"amount":102.45,"item_id":"I001","qty":6},{"amount":2.01,"item_id":"I003","qty":2}]}|
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+



In [None]:
# Extract/Expand struct/dict data in JSON

json_expanded.select("parsed").show(truncate=False)

json_expanded.select("parsed.*").show(truncate=False)

+----------------------------------------------------------------------------+
|parsed                                                                      |
+----------------------------------------------------------------------------+
|{[9000010000, 9000010001], C001, O101, [{102.45, I001, 6}, {2.01, I003, 2}]}|
+----------------------------------------------------------------------------+

+------------------------+-----------+--------+------------------------------------+
|contact                 |customer_id|order_id|order_line_items                    |
+------------------------+-----------+--------+------------------------------------+
|[9000010000, 9000010001]|C001       |O101    |[{102.45, I001, 6}, {2.01, I003, 2}]|
+------------------------+-----------+--------+------------------------------------+



In [141]:
json_expanded.select("parsed.*").withColumn("exploded", explode("order_line_items")).show()

+--------------------+-----------+--------+--------------------+-----------------+
|             contact|customer_id|order_id|    order_line_items|         exploded|
+--------------------+-----------+--------+--------------------+-----------------+
|[9000010000, 9000...|       C001|    O101|[{102.45, I001, 6...|{102.45, I001, 6}|
|[9000010000, 9000...|       C001|    O101|[{102.45, I001, 6...|  {2.01, I003, 2}|
+--------------------+-----------+--------+--------------------+-----------------+



### Writing Data

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = (
    SparkSession
    .builder
    .appName("Writing Data")
    .master("local[*]")
    .getOrCreate()
)

spark

In [2]:
# Print the available number of cores

spark.sparkContext.defaultParallelism

24

In [3]:
emp = spark.read.csv("data/emp.csv", header=True)

emp.printSchema()
emp.show()

root
 |-- employee_id: string (nullable = true)
 |-- department_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: string (nullable = true)
 |-- hire_date: string (nullable = true)

+-----------+-------------+-------------+---+------+------+----------+
|employee_id|department_id|         name|age|gender|salary| hire_date|
+-----------+-------------+-------------+---+------+------+----------+
|        001|          101|     John Doe| 30|  Male| 50000|2015-01-01|
|        002|          101|   Jane Smith| 25|Female| 45000|2016-02-15|
|        003|          102|    Bob Brown| 35|  Male| 55000|2014-05-01|
|        004|          102|    Alice Lee| 28|Female| 48000|2017-09-30|
|        005|          103|    Jack Chan| 40|  Male| 60000|2013-04-01|
|        006|          103|    Jill Wong| 32|Female| 52000|2018-07-01|
|        007|          101|James Johnson| 42|  Male| 70000|2012-03-15|
|       

In [5]:
emp.rdd.getNumPartitions()

1

In [6]:
# Save/Write the data in parquet format

emp.write.parquet("data/output/emp.parquet")

In [7]:
emp.withColumn("partition_id", spark_partition_id()).show()

+-----------+-------------+-------------+---+------+------+----------+------------+
|employee_id|department_id|         name|age|gender|salary| hire_date|partition_id|
+-----------+-------------+-------------+---+------+------+----------+------------+
|        001|          101|     John Doe| 30|  Male| 50000|2015-01-01|           0|
|        002|          101|   Jane Smith| 25|Female| 45000|2016-02-15|           0|
|        003|          102|    Bob Brown| 35|  Male| 55000|2014-05-01|           0|
|        004|          102|    Alice Lee| 28|Female| 48000|2017-09-30|           0|
|        005|          103|    Jack Chan| 40|  Male| 60000|2013-04-01|           0|
|        006|          103|    Jill Wong| 32|Female| 52000|2018-07-01|           0|
|        007|          101|James Johnson| 42|  Male| 70000|2012-03-15|           0|
|        008|          102|     Kate Kim| 29|Female| 51000|2019-10-01|           0|
|        009|          103|      Tom Tan| 33|  Male| 58000|2016-06-01|      

In [12]:
# Save the file wrt partitions
# ! Write Modes: Append, Overwrite, Ignore, Error

emp.write.format("csv").partitionBy("department_id").option("header", True).save("data/output/emp.csv")

### Understanding Clusters


- Use the docker compose file to setup/deploy required containers
- Execute command from docker container, from the `/spark` dir

    - <pre>./bin/spark-submit --master spark://78962bfc976e:7077 <b>[Replace with master server address]</b> --num-executors 3 --executor-cores 2 </br>--executor-memory 512M /data/12_understand_cluster.py</pre>

In [22]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = (
    SparkSession
    .builder
    .appName("Cluster Execution")
    .master("spark://78962bfc976e:7077")
    .config("spark.executor.instances", 4)
    .config("spark.executor.cores", 4)
    .config("spark.exectuor.memory", "512M")
    .getOrCreate()
)

spark.sparkContext.getConf().getAll()

[('spark.driver.extraJavaOptions',
  '-XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED'),
 ('spark.app.startTime', '1746507577605'),
 ('spark.executor.instances', '4'),
 ('spark.exectuor.memory', '512M'),
 ('spark.app.name', 'Cluster Execution'),
 ('spark.master', 'spark://78962bfc976e:7077'),
 ('spark.app.submitTime', '1

In [6]:
spark.range(10).rdd.getNumPartitions()

48

In [24]:
spark.stop()

### User-Defined Functions (UDF)

- Extremely slow and expensive since python requires Serializing/De-Serailizing data.
- Can be mitigated by:
    - Using in-built higher-order functions
    - Writing the functions in Java or Scala

In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = (
    SparkSession
    .builder
    .appName("UDF")
    .master("spark://eb22766c6b88:7077")
    .config("spark.executor.cores", 2)
    .config("spark.cores.max", 6)
    .config("spark.executor.memory", "512M")
    .getOrCreate()
)

spark

In [8]:
emp = spark.read.csv("/data/emp.csv", header=True)

emp.show()
emp.rdd.getNumPartitions()

+-----------+-------------+-------------+---+------+------+----------+
|employee_id|department_id|         name|age|gender|salary| hire_date|
+-----------+-------------+-------------+---+------+------+----------+
|        001|          101|     John Doe| 30|  Male| 50000|2015-01-01|
|        002|          101|   Jane Smith| 25|Female| 45000|2016-02-15|
|        003|          102|    Bob Brown| 35|  Male| 55000|2014-05-01|
|        004|          102|    Alice Lee| 28|Female| 48000|2017-09-30|
|        005|          103|    Jack Chan| 40|  Male| 60000|2013-04-01|
|        006|          103|    Jill Wong| 32|Female| 52000|2018-07-01|
|        007|          101|James Johnson| 42|  Male| 70000|2012-03-15|
|        008|          102|     Kate Kim| 29|Female| 51000|2019-10-01|
|        009|          103|      Tom Tan| 33|  Male| 58000|2016-06-01|
|        010|          104|     Lisa Lee| 27|Female| 47000|2018-08-01|
|        011|          104|   David Park| 38|  Male| 65000|2015-11-01|
|     

1

In [9]:
# UDF

def bonus(salary):
    return int(salary) * 0.1

bonus_udf = udf(bonus)

In [12]:

# ! Doesn't run, config error
# emp.withColumn('bonus', bonus_udf("salary")).show()

In [14]:
# spark.sparkContext.getConf().getAll()
spark.stop()

### DAG Plan

In [47]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = (
    SparkSession
    .builder
    .appName("DAG Plan")
    .master("local[*]")
    .getOrCreate()
)

spark

In [48]:
# Disabling Spark features that help with Optimisation

spark.conf.set("spark.sql.adaptive.enabled", False)
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", False)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

In [49]:
spark.sparkContext.defaultParallelism

24

In [50]:
df1, df2 = spark.range(4, 200, 2), spark.range(2, 200, 4)
df3, df4 = df1.repartition(5), df2.repartition(7)

In [51]:
# df1.rdd.getNumPartitions(), df2.rdd.getNumPartitions(), df3.rdd.getNumPartitions(), df4.rdd.getNumPartitions()

In [52]:
df34 = df3.join(df4, on='id')
df34_sum = df34.selectExpr("sum(id) as total_sum")

In [53]:
df34_sum.show()

+---------+
|total_sum|
+---------+
|     4998|
+---------+



<center><img src='images/DAG.png/' width=1200></center>

<b>JOB 0 & 1</b>
*spark.range*
- 1 stage each
- 24 cores = 24 tasks

<b>JOB 2 & 3</b>
*Repartition (involves reshuffle)*
- 1 stage each
- 5/7 partitions = 5/7 tasks

<b>JOB 4</b>
*Join (involves shuffle)*
- 1 stage
- defualt spark shuffle is 200 partitions = 200 tasks

<b>JOB 5</b>
*Sum*
- 1 stage
- sum to a single value = 1 task

In [54]:
df34_sum.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[sum(id#384L)])
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#1296]
      +- HashAggregate(keys=[], functions=[partial_sum(id#384L)])
         +- Project [id#384L]
            +- SortMergeJoin [id#384L], [id#386L], Inner
               :- Sort [id#384L ASC NULLS FIRST], false, 0
               :  +- Exchange hashpartitioning(id#384L, 200), ENSURE_REQUIREMENTS, [id=#1288]
               :     +- Exchange RoundRobinPartitioning(5), REPARTITION_BY_NUM, [id=#1279]
               :        +- Range (4, 200, step=2, splits=24)
               +- Sort [id#386L ASC NULLS FIRST], false, 0
                  +- Exchange hashpartitioning(id#386L, 200), ENSURE_REQUIREMENTS, [id=#1289]
                     +- Exchange RoundRobinPartitioning(7), REPARTITION_BY_NUM, [id=#1281]
                        +- Range (2, 200, step=4, splits=24)




In [46]:
spark.stop()

### Optimising Shuffle

- PySpark can perform multiple *narrow-transformations* together by creating a pipeline.
- If a reshuffle is required, which is a *wide-transformation*, it will create 2 different pipleline (and hence 2 seperate stages).
- In the midst of shuffling and switching pipelines, the files are written in the **Tungsten Binary Format (Unsafe row)** which can be directly read in disk-memory, thus imporving the read performance.
- Since shuffling requires disk I/O access along with Network, it is a costly operation and should be avoided as much as possible. But it is also necssary for *narrow-transformations*.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = (
    SparkSession
    .builder
    .appName("Optimising Shuffle")
    .master("spark://eb22766c6b88:7077")
    .config("spark.cores.max", 16)
    .config("spark.executor.cores", 4)
    .config("spark.executor.memory", "512M")
    .getOrCreate()
)

spark

In [2]:
spark.sparkContext.defaultParallelism

16

In [3]:
# Disabling Spark features that help with Optimisation (Adaptive Query Engine)

spark.conf.set("spark.sql.adaptive.enabled", False)
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", False)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

In [5]:
_schema = "first_name string, last_name string, job_title string, dob string, email string, phone string, salary double, department_id int"

empRec = spark.read.csv("/data/employee_records.csv", header=True, schema=_schema)

In [6]:
# Compute average salary per department

emp_avg = empRec.groupBy("department_id").agg(avg("salary").alias("avg_sal"))

In [8]:
# Writing data for benchmarking (noop)

emp_avg.write.format("noop").mode("overwrite").save()

In [9]:
# Check the Spark Shuffle Default Partitions

spark.conf.get("spark.sql.shuffle.partitions")

'200'

In [10]:
# There are many partitions that are doing nothing, and increasing the computing overhead. We can optimise this by reducing the default shuffle partitions

spark.conf.set("spark.sql.shuffle.partitions", 16)

In [11]:
spark.conf.get("spark.sql.shuffle.partitions")

'16'

In [12]:
emp_avg.write.format("noop").mode("overwrite").save()

In [13]:
# Read partitioned data (benefits in computation around shuffle)

empPart = spark.read.csv("/data/emp.csv/", header=True, schema=_schema)

In [15]:
emp_avg = empPart.groupby("department_id").agg(avg("salary")).alias("avg_sal")
emp_avg.write.format("noop").mode("overwrite").save()

In [16]:
spark.stop()

### Spark Caching Techniques

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = (
    SparkSession
    .builder
    .appName("Spark Caching")
    .master("local[*]")
    .config("spark.executor.memory", "512M")
    .getOrCreate()
)

spark

In [2]:
_schema = "transacted_at string, trx_id string, retailer_id string, description string, amount double, city_id string"

sales = spark.read.csv("/data/new_sales.csv", header=True, schema=_schema)

In [3]:
sales.where("amount > 300").show()

+--------------------+----------+-----------+--------------------+-------+----------+
|       transacted_at|    trx_id|retailer_id|         description| amount|   city_id|
+--------------------+----------+-----------+--------------------+-------+----------+
|2017-11-24T19:00:...|1734117022|  847200066|Wal-Mart  ppd id:...|1737.26|1646415505|
|2017-11-24T19:00:...|1734117030| 1953761884|Home Depot     pp...|  384.5| 287177635|
|2017-11-24T19:00:...|1734117153|  847200066|unkn        Kings...|2907.57|1483931123|
|2017-11-24T19:00:...|1734117241|  486576507|              iTunes|2912.67|1663872965|
|2017-11-24T19:00:...|2076947146|  511877722|unkn     ccd id: ...|1915.35|1698762556|
|2017-11-24T19:00:...|2076947113| 1996661856|AutoZone  arc id:...| 1523.6|1759612211|
|2017-11-24T19:00:...|2076946994| 1898522855|Target    ppd id:...|2589.93|2074005445|
|2017-11-24T19:00:...|2076946121|  562903918|unkn    ccd id: 5...| 315.86|1773943669|
|2017-11-24T19:00:...|2076946063| 1070485878|Amazon.co

In [4]:
# Cache DataFrame (default storage level: MEMORY_AND_DISK)
# NOTE: For Cachine, we need an *Action* to be executed and Count/Write are preferred since they scan the whole dataset.

sales.cache().count()   # Caches data in de-serialised form

7202569

In [5]:
sales.where("amount > 300").show()

+--------------------+----------+-----------+--------------------+-------+----------+
|       transacted_at|    trx_id|retailer_id|         description| amount|   city_id|
+--------------------+----------+-----------+--------------------+-------+----------+
|2017-11-24T19:00:...|1734117022|  847200066|Wal-Mart  ppd id:...|1737.26|1646415505|
|2017-11-24T19:00:...|1734117030| 1953761884|Home Depot     pp...|  384.5| 287177635|
|2017-11-24T19:00:...|1734117153|  847200066|unkn        Kings...|2907.57|1483931123|
|2017-11-24T19:00:...|1734117241|  486576507|              iTunes|2912.67|1663872965|
|2017-11-24T19:00:...|2076947146|  511877722|unkn     ccd id: ...|1915.35|1698762556|
|2017-11-24T19:00:...|2076947113| 1996661856|AutoZone  arc id:...| 1523.6|1759612211|
|2017-11-24T19:00:...|2076946994| 1898522855|Target    ppd id:...|2589.93|2074005445|
|2017-11-24T19:00:...|2076946121|  562903918|unkn    ccd id: 5...| 315.86|1773943669|
|2017-11-24T19:00:...|2076946063| 1070485878|Amazon.co

In [6]:
# Remove Cache

sales.unpersist()
# spark.catalog.clearCache()  # Clears all session cache

DataFrame[transacted_at: string, trx_id: string, retailer_id: string, description: string, amount: double, city_id: string]

In [7]:
# To use other caching storage levels, we use `persist`. (eg: MEMORY_ONLY, DISK_ONLY, etc.)
import pyspark

sales_persist = sales.persist(pyspark.StorageLevel.MEMORY_ONLY)
sales_persist.write.format("noop").mode("overwrite").save()   # Caches data in Serialised form

In [8]:
spark.stop()

### Distributed Shared Variables

#### Broadcast

In [13]:
spark = (
    SparkSession
    .builder
    .appName("Distributed Shared Variables")
    .master("spark://eb22766c6b88:7077")
    .config("spark.cores.max", 16)
    .config("spark.executor.cores", 4)
    .config("spark.executor.memory", "512M")
    .getOrCreate()
)


spark

In [14]:
_schema = "first_name string, last_name string, job_title string, dob string, email string, phone string, salary double, department_id int"

emp = spark.read.csv("/data/employee_records.csv", header=True, schema=_schema)

What if we want to match deparment IDs to department names.
- Creating a new dataframe and using `Join` to assign the department names can Shuffle the data, if not optimised correctly, which is undesired.
- Using UDF or a map operation. But when we create a variable, it will be Serialised along with the task. Hence, also de-serialised everytime the task is de-serialised which will create an unnecessary overhead.

Solution: *Broadcast Variable*
- The variable is sent and cached individually at each executor. Will prevent shuffling between themselves.

In [15]:
# (Lookup) Variable

dept_names = {
    1: 'Department 1',
    2: 'Department 2',
    3: 'Department 3',
    4: 'Department 4',
    5: 'Department 5',
    6: 'Department 6',
    7: 'Department 7',
    8: 'Department 8',
    9: 'Department 9',
    10: 'Department 10',
}

In [16]:
# Broadcast the Variable

broadcastDeptNames = spark.sparkContext.broadcast(dept_names)

In [17]:
type(broadcastDeptNames)

pyspark.broadcast.Broadcast

In [18]:
broadcastDeptNames.value

{1: 'Department 1',
 2: 'Department 2',
 3: 'Department 3',
 4: 'Department 4',
 5: 'Department 5',
 6: 'Department 6',
 7: 'Department 7',
 8: 'Department 8',
 9: 'Department 9',
 10: 'Department 10'}

In [19]:
# Create UDF to return Department name

@udf
def get_dept_name(dept_id):
    return broadcastDeptNames.value.get(dept_id)

In [None]:
emp.withColumn("deptName", get_dept_name(col("department_id"))).show()

#### Accumulator

In [22]:
# Calculate total salary of Dept-6
# Remember: The data is partitioned between all executors, hence any sort of "aggregation" would require collection and hence, shuffling.
# The Accumulator helps avoid that.

emp.where("department_id = 6").groupBy("department_id").agg(sum("salary").cast("long")).show()

+-------------+---------------------------+
|department_id|CAST(sum(salary) AS BIGINT)|
+-------------+---------------------------+
|            6|                50294510721|
+-------------+---------------------------+



In [23]:
dept_sal = spark.sparkContext.accumulator(0)

In [27]:
# Use foreach (allows us to go through each record of that dataframe, row by row)

def calculate_salary(dept_id, salary):
    if dept_id == 6:
        dept_salary.add(salary)

emp.foreach(lambda row: calculate_salary(row.department_id, row.salary))    # Still not working
dept_sal.value

0

In [28]:
spark.stop()

### Optimising Joins

For joining 2 tables:
1. Spark **reads** and distributes the data between its executors.
2. Then **shuffles** the data to bring similar IDs into the same executor.
3. Finally, performs **join** between the rows with similar IDs (or any other primary key).

Join Strategies
1. Shuffle Hash: Reliable when smaller dataset can fit in the memory.
    - Shuffle the data.
    - Hash the "smaller" dataset.
    - Hashed dataset is matched with the "bigger" dataset.
    - Join the datasets.

2. Sort Merge: Useful when we need to join two BIG datasets
    - Shuffle the datasets.
    - Sort the primary keys
    - Merge the datasets

3. Broadcast Join: Most efficient for small dataset (default size of 10MB, can be incresed to 8G)
    - Broadcast the smaller dataset to all executors.
    - Join the datasets with the help of hashing.

In [10]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = (
    SparkSession
    .builder
    .appName("Optimising Joins")
    .master("spark://eb22766c6b88:7077")
    .config("spark.cores.max", 16)
    .config("spark.executor.cores", 4)
    .config("spark.executor.memory", "512M")
    .getOrCreate()
)


spark

In [11]:
# Disable AQE and Broadcast Join

spark.conf.set("spark.sql.adaptive.enabled", False)
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", True)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

#### Join Big and Small table - SortMerge vs BroadCast Join

In [12]:
_schema = "first_name string, last_name string, job_title string, dob string, email string, phone string, salary double, department_id int"

emp = spark.read.csv("/data/employee_records.csv", header=True, schema=_schema)

In [13]:
_dept_schema = "department_id int, department_name string, description string, city string, state string, country string"

dept = spark.read.csv("/data/department_data.csv", header=True, schema=_dept_schema)

In [14]:
df_joined = emp.join(dept, on=emp.department_id==dept.department_id, how="left_outer")
df_joined.write.format("noop").mode("overwrite").save()

In [15]:
df_joined.explain()

== Physical Plan ==
*(4) SortMergeJoin [department_id#135], [department_id#144], LeftOuter
:- *(1) Sort [department_id#135 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(department_id#135, 200), ENSURE_REQUIREMENTS, [id=#123]
:     +- FileScan csv [first_name#128,last_name#129,job_title#130,dob#131,email#132,phone#133,salary#134,department_id#135] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/data/employee_records.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<first_name:string,last_name:string,job_title:string,dob:string,email:string,phone:string,s...
+- *(3) Sort [department_id#144 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(department_id#144, 200), ENSURE_REQUIREMENTS, [id=#135]
      +- *(2) Filter isnotnull(department_id#144)
         +- FileScan csv [department_id#144,department_name#145,description#146,city#147,state#148,country#149] Batched: false, DataFilters: [isnotnull(department_id#144)

In [16]:
# Join using broadcast. Saves the need to shuffle the data

df_joined = emp.join(broadcast(dept), on=emp.department_id==dept.department_id, how="left_outer")
df_joined.write.format("noop").mode("overwrite").save()

In [17]:
df_joined.explain()

== Physical Plan ==
*(2) BroadcastHashJoin [department_id#135], [department_id#144], LeftOuter, BuildRight, false
:- FileScan csv [first_name#128,last_name#129,job_title#130,dob#131,email#132,phone#133,salary#134,department_id#135] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/data/employee_records.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<first_name:string,last_name:string,job_title:string,dob:string,email:string,phone:string,s...
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#213]
   +- *(1) Filter isnotnull(department_id#144)
      +- FileScan csv [department_id#144,department_name#145,description#146,city#147,state#148,country#149] Batched: false, DataFilters: [isnotnull(department_id#144)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/data/department_data.csv], PartitionFilters: [], PushedFilters: [IsNotNull(department_id)], ReadSchema: struct<dep

#### Join Big and Big tables - SortMerge without buckets

In [18]:
_sales_schema = "transacted_at string, trx_id string, retailer_id string, description string, amount double, city_id string"

sales = spark.read.csv("/data/new_sales.csv", header=True, schema=_sales_schema)

In [19]:
_city_schema = "city_id string, city string, state string, state_abv string, country string"

city = spark.read.csv("/data/cities.csv", header=True, schema=_city_schema)

In [20]:
sales_joined = sales.join(city, on=sales.city_id==city.city_id, how="left_outer")
sales_joined.write.format("noop").mode("overwrite").save()

In [21]:
sales_joined.explain()

== Physical Plan ==
*(4) SortMergeJoin [city_id#275], [city_id#282], LeftOuter
:- *(1) Sort [city_id#275 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(city_id#275, 200), ENSURE_REQUIREMENTS, [id=#293]
:     +- FileScan csv [transacted_at#270,trx_id#271,retailer_id#272,description#273,amount#274,city_id#275] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/data/new_sales.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<transacted_at:string,trx_id:string,retailer_id:string,description:string,amount:double,cit...
+- *(3) Sort [city_id#282 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(city_id#282, 200), ENSURE_REQUIREMENTS, [id=#305]
      +- *(2) Filter isnotnull(city_id#282)
         +- FileScan csv [city_id#282,city#283,state#284,state_abv#285,country#286] Batched: false, DataFilters: [isnotnull(city_id#282)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/data/cities.csv], PartitionFilters: [], 

#### Big & Big join - with buckets

- Bucketing utilises a hashing function to divide the data into a buckets. Based upon the joining-column, depositing the same "ids" in the same (separate set of) buckets for each dataset, and pass the same buckets to one exectuor.
- No shuffling is required if the data is bucketed properly.
- The number of buckets between the datasets should be the same.
- Bucketing only works when we save the data as tables.

In [22]:
sales.write.format("csv").mode("overwrite").bucketBy(4, "city_id").option("header", True).option("path", "/data/output/sales_bucket.csv").saveAsTable("sales_bucket")

In [23]:
city.write.format("csv").mode("overwrite").bucketBy(4, "city_id").option("header", True).option("path", "/data/output/city_bucket.csv").saveAsTable("city_bucket")

In [25]:
spark.sql("show tables in default").show()

+---------+------------+-----------+
|namespace|   tableName|isTemporary|
+---------+------------+-----------+
|  default| city_bucket|      false|
|  default|sales_bucket|      false|
+---------+------------+-----------+



In [26]:
sales_bucket = spark.read.table("sales_bucket")
city_bucket = spark.read.table("city_bucket")

In [27]:
buckets_joined = sales_bucket.join(city_bucket, on=sales_bucket.city_id==city_bucket.city_id, how="left_outer")
buckets_joined.write.format("noop").mode("overwrite").save()

In [28]:
buckets_joined.explain()

== Physical Plan ==
*(3) SortMergeJoin [city_id#752], [city_id#759], LeftOuter
:- *(1) Sort [city_id#752 ASC NULLS FIRST], false, 0
:  +- FileScan csv default.sales_bucket[transacted_at#747,trx_id#748,retailer_id#749,description#750,amount#751,city_id#752] Batched: false, Bucketed: true, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/data/output/sales_bucket.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<transacted_at:string,trx_id:string,retailer_id:string,description:string,amount:double,cit..., SelectedBucketsCount: 4 out of 4
+- *(2) Sort [city_id#759 ASC NULLS FIRST], false, 0
   +- *(2) Filter isnotnull(city_id#759)
      +- FileScan csv default.city_bucket[city_id#759,city#760,state#761,state_abv#762,country#763] Batched: false, Bucketed: true, DataFilters: [isnotnull(city_id#759)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/data/output/city_bucket.csv], PartitionFilters: [], PushedFilters: [IsNotNull(city_id)], ReadSchem

#### Points to note

1. Joining Column different than Bucket Column, Same Bucket Size - Shuffle on Both table
2. Joining Column Same, One table in Bucket - Shuffle on non Bucket table
3. Joining Column Same, Different Bucket Size - Shuffle on Smaller Bucket Side
4. Joining Column Same, Same Bucket Size - No Shuffle (Faster Join)

***

1. So its very importatant to choose correct Bucket column and Bucket Size
2. Decide effectively on number of Buckets, as too mant buckets with not enough data can lead to Small file issue.
3. Datasets are Small - you can prefer Shuffle Hash Join

In [29]:
spark.stop()

### Dynamic Allocation

When compared to a similar functionality in DataBricks ("scale up"), pyspark adds & removes clusters within the designated worker nodes whereas DataBricks adds/removed the worker nodes themselves

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = (
    SparkSession
    .builder
    .appName("Dynamic Allocation")
    .master("spark://eb22766c6b88:7077")
    .config("spark.executor.cores", 2)
    .config("spark.executor.memory", "512M")
    .config("spark.dynamicAllocation.enabled", True)
    .config("spark.dynamicAllocation.minExecutors", 0)
    .config("spark.dynamicAllocation.maxExecutors", 5)
    .config("spark.dynamicAllocation.initialExecutors", 1)
    .config("spark.dynamicAllocation.shuffleTracking.enabled", True)
    .config("spark.dynamicAllocation.executorIdleTimeout", "60s")
    .config("spark.dynamicAllocation.cachedExecutorIdleTimeout", "60s")
    .getOrCreate()
)

spark

In [2]:
_sales_schema = "transacted_at string, trx_id string, retailer_id string, description string, amount double, city_id string"

sales = spark.read.csv("/data/new_sales.csv", header=True, schema=_sales_schema)

In [3]:
_city_schema = "city_id string, city string, state string, state_abv string, country string"

city = spark.read.csv("/data/cities.csv", header=True, schema=_city_schema)

In [4]:
df_sales_joined = sales.join(city, on=sales.city_id==city.city_id, how="left_outer")

In [5]:
df_sales_joined.write.format("noop").mode("overwrite").save()

In [7]:
spark.stop()

### Skewness and Spillage

- Memory spillage can happen if there exists skewed data in the dataset, ie, when the column used for joining has skew towards a particular value.
- This can lead to more unserialized data being present in the memory than can be stored so the data needs be serialized and stored on the disk instead.
- This increased the I/O overhead for the operations, leading to longer processing and execution time.
- *Salting*: Technique to help repartition skewed data.
    - "Salting" helps create new keys from exisiting ones by appending values to the joining key.
    - This enables more equal distribution of data between the executors based upon the new keys.

In [8]:
spark = (
    SparkSession
    .builder
    .appName("Skewness and Spillage")
    .master("spark://eb22766c6b88:7077")
    .config("spark.cores.max", 8)
    .config("spark.executor.cores", 4)
    .config("spark.executor.memory", "512M")
    .getOrCreate()
)

spark

In [9]:
# Disabling Spark features that help with Optimisation (Adaptive Query Engine)

spark.conf.set("spark.sql.adaptive.enabled", False)
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", False)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

In [12]:
# Read Employee data (Missing Skewed Dataset. Refer to the actual notebook & video)
_schema = "first_name string, last_name string, job_title string, dob string, email string, phone string, salary double, department_id int"

emp = spark.read.format("csv").schema(_schema).option("header", True).load("/data/employee_records.csv")

In [13]:
# Read DEPT CSV data
_dept_schema = "department_id int, department_name string, description string, city string, state string, country string"

dept = spark.read.format("csv").schema(_dept_schema).option("header", True).load("/data/department_data.csv")

In [14]:
df_joined = emp.join(dept, on=emp.department_id==dept.department_id, how="left_outer")

In [18]:
df_joined.write.format("noop").mode("overwrite").save()

In [17]:
df_joined.withColumn("partition_id", spark_partition_id()).groupBy("partition_id").agg(count("partition_id")).show()

+------------+-------------------+
|partition_id|count(partition_id)|
+------------+-------------------+
|         103|             100417|
|         122|              99780|
|          43|              99451|
|         107|              99805|
|          49|              99706|
|          51|             100248|
|         102|             100214|
|          66|             100210|
|         174|             100155|
|          89|             100014|
+------------+-------------------+



In [19]:
spark.stop()

### Adaptive Query Execution

In [20]:
spark = (
    SparkSession
    .builder
    .appName("AQE")
    .master("spark://eb22766c6b88:7077")
    .config("spark.cores.max", 8)
    .config("spark.executor.cores", 4)
    .config("spark.executor.memory", "512M")
    .getOrCreate()
)

spark

**POST-SHUFFLE**
- *coalescePartitions*: Scales the Shuffle partitions to use only the necessary (and optimised) amount for a given dataset.
- *skewed join optimizations*: Balances partition sizes, ie, join smaller partitions and split bigger partitions. Automatically avoids spillage.
- *autoBroadcast*: Automatically converts SortMerge joins into Broadcast joins for smaller datasets.

In [22]:
# Disabling Spark features that help with Optimisation (Adaptive Query Engine)

spark.conf.set("spark.sql.adaptive.enabled", False) # True
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", False) # True
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) # 10MB (default)

In [23]:
spark.stop()

### Spark SQL

- *Catalog*: Stores the metadata of SQL objects. Catolog can be stored either in-memory (RAM) or hive (Disk).
    - Use `.enableHiveSupport()` while creating SparkSession to persist the tables.
- *Hints*: We can use hints in spark-sql queries to enforce a particular technique in the execution of task. (`/* <HINT(<TABLE>)> */`)

In [61]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = (
    SparkSession
    .builder
    .appName("Spark SQL")
    .master("local[*]")
    .config("spark.cores.max", 8)
    .config("spark.executor.cores", 4)
    .config("spark.executor.memory", "512M")
    .getOrCreate()
)

spark

In [63]:
# Read Employee data (Missing Skewed Dataset. Refer to the actual notebook & video)
_schema = "first_name string, last_name string, job_title string, dob string, email string, phone string, salary double, department_id int"

emp = spark.read.format("csv").schema(_schema).option("header", True).load("/data/employee_records.csv")

In [64]:
# Read DEPT CSV data
_dept_schema = "department_id int, department_name string, description string, city string, state string, country string"

dept = spark.read.format("csv").schema(_dept_schema).option("header", True).load("/data/department_data.csv")

In [65]:
spark.conf.get("spark.sql.catalogImplementation")

'in-memory'

In [66]:
spark.sql("show databases")

DataFrame[namespace: string]

In [67]:
db = spark.sql("show databases")
db.show()

+---------+
|namespace|
+---------+
|  default|
+---------+



In [68]:
spark.sql("show tables in default").show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
+---------+---------+-----------+



In [69]:
emp.createOrReplaceTempView("emp_temp")
dept.createOrReplaceTempView("dept_temp")

In [70]:
spark.sql("show tables in default").show()


+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|         |dept_temp|       true|
|         | emp_temp|       true|
+---------+---------+-----------+



In [71]:
spark.sql("""
select * from emp_temp
""").show()

+----------+----------+--------------------+----------+--------------------+--------------------+--------+-------------+
|first_name| last_name|           job_title|       dob|               email|               phone|  salary|department_id|
+----------+----------+--------------------+----------+--------------------+--------------------+--------+-------------+
|   Richard|  Morrison|Public relations ...|1973-05-05|melissagarcia@exa...|       (699)525-4827|512653.0|            8|
|     Bobby|  Mccarthy|   Barrister's clerk|1974-04-25|   llara@example.net|  (750)846-1602x7458|999836.0|            7|
|    Dennis|    Norman|Land/geomatics su...|1990-06-24| jturner@example.net|    873.820.0518x825|131900.0|           10|
|      John|    Monroe|        Retail buyer|1968-06-16|  erik33@example.net|    820-813-0557x624|485506.0|            1|
|  Michelle|   Elliott|      Air cabin crew|1975-03-31|tiffanyjohnston@e...|       (705)900-5337|604738.0|            8|
|    Ashley|   Montoya|        C

In [72]:
spark.sql("""
select * from emp_temp
where department_id = 1
""").show()

+-----------+---------+--------------------+----------+--------------------+--------------------+--------+-------------+
| first_name|last_name|           job_title|       dob|               email|               phone|  salary|department_id|
+-----------+---------+--------------------+----------+--------------------+--------------------+--------+-------------+
|       John|   Monroe|        Retail buyer|1968-06-16|  erik33@example.net|    820-813-0557x624|485506.0|            1|
|    Rachael|Rodriguez|         Media buyer|1966-12-02|griffinmary@examp...| +1-791-344-7586x548|544732.0|            1|
|Christopher| Callahan| Exhibition designer|1966-10-23| qwalter@example.com|001-947-745-3939x...|251057.0|            1|
|    Lindsey|   Huerta|Embryologist, cli...|1964-10-20|  psmith@example.net|   527.934.6665x1378|878257.0|            1|
|      David|   Harris|   Company secretary|1990-04-13|     nli@example.com|001-959-766-1180x...|249553.0|            1|
|      Brian|Hernandez|     Thea

In [73]:
spark.sql("""
select e.*, date_format(dob, 'yyyy') as dob_year from emp_temp e
""").show()

+----------+----------+--------------------+----------+--------------------+--------------------+--------+-------------+--------+
|first_name| last_name|           job_title|       dob|               email|               phone|  salary|department_id|dob_year|
+----------+----------+--------------------+----------+--------------------+--------------------+--------+-------------+--------+
|   Richard|  Morrison|Public relations ...|1973-05-05|melissagarcia@exa...|       (699)525-4827|512653.0|            8|    1973|
|     Bobby|  Mccarthy|   Barrister's clerk|1974-04-25|   llara@example.net|  (750)846-1602x7458|999836.0|            7|    1974|
|    Dennis|    Norman|Land/geomatics su...|1990-06-24| jturner@example.net|    873.820.0518x825|131900.0|           10|    1990|
|      John|    Monroe|        Retail buyer|1968-06-16|  erik33@example.net|    820-813-0557x624|485506.0|            1|    1968|
|  Michelle|   Elliott|      Air cabin crew|1975-03-31|tiffanyjohnston@e...|       (705)90

In [76]:
# Writing the data as table
emp = spark.sql("""
select * from emp_temp
where department_id = 1
""")

emp.write.format("parquet").mode("overwrite").saveAsTable("emp_table")

In [77]:
spark.sql("show tables in default").show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|  default|emp_table|      false|
|         |dept_temp|       true|
|         | emp_temp|       true|
+---------+---------+-----------+



In [78]:
spark.read.table("emp_table").show() # ???

+----------+----------+--------------------+----------+--------------------+--------------------+--------+-------------+
|first_name| last_name|           job_title|       dob|               email|               phone|  salary|department_id|
+----------+----------+--------------------+----------+--------------------+--------------------+--------+-------------+
|     Stacy|  Richmond|Chartered public ...|1963-06-22|  paul02@example.org|    705-270-1751x276|567073.0|            1|
|   Zachary|    Torres|Recruitment consu...|1963-09-04|perkinslisa@examp...|        465.879.5063|180863.0|            1|
|     Bryan|   Edwards|Engineer, biomedical|1983-03-05|   orush@example.com|    001-470-868-4085|815828.0|            1|
|   Allison|   Hoffman|Therapist, hortic...|2001-01-04|mathewspatrick@ex...|   691-800-7757x7378|657399.0|            1|
|    Travis|     Smith|Education officer...|1968-05-09|dawsonvictoria@ex...|    882-991-6093x704|464773.0|            1|
|      Troy|     Weber|Geophysic

In [79]:
spark.sql("select * from emp_table").show()

+----------+----------+--------------------+----------+--------------------+--------------------+--------+-------------+
|first_name| last_name|           job_title|       dob|               email|               phone|  salary|department_id|
+----------+----------+--------------------+----------+--------------------+--------------------+--------+-------------+
|     Stacy|  Richmond|Chartered public ...|1963-06-22|  paul02@example.org|    705-270-1751x276|567073.0|            1|
|   Zachary|    Torres|Recruitment consu...|1963-09-04|perkinslisa@examp...|        465.879.5063|180863.0|            1|
|     Bryan|   Edwards|Engineer, biomedical|1983-03-05|   orush@example.com|    001-470-868-4085|815828.0|            1|
|   Allison|   Hoffman|Therapist, hortic...|2001-01-04|mathewspatrick@ex...|   691-800-7757x7378|657399.0|            1|
|    Travis|     Smith|Education officer...|1968-05-09|dawsonvictoria@ex...|    882-991-6093x704|464773.0|            1|
|      Troy|     Weber|Geophysic

In [80]:
spark.sql("select * from emp_temp").show()

+----------+----------+--------------------+----------+--------------------+--------------------+--------+-------------+
|first_name| last_name|           job_title|       dob|               email|               phone|  salary|department_id|
+----------+----------+--------------------+----------+--------------------+--------------------+--------+-------------+
|   Richard|  Morrison|Public relations ...|1973-05-05|melissagarcia@exa...|       (699)525-4827|512653.0|            8|
|     Bobby|  Mccarthy|   Barrister's clerk|1974-04-25|   llara@example.net|  (750)846-1602x7458|999836.0|            7|
|    Dennis|    Norman|Land/geomatics su...|1990-06-24| jturner@example.net|    873.820.0518x825|131900.0|           10|
|      John|    Monroe|        Retail buyer|1968-06-16|  erik33@example.net|    820-813-0557x624|485506.0|            1|
|  Michelle|   Elliott|      Air cabin crew|1975-03-31|tiffanyjohnston@e...|       (705)900-5337|604738.0|            8|
|    Ashley|   Montoya|        C

In [81]:
# Show table metadata

spark.sql("describe extended emp_temp").show()

+-------------+---------+-------+
|     col_name|data_type|comment|
+-------------+---------+-------+
|   first_name|   string|   null|
|    last_name|   string|   null|
|    job_title|   string|   null|
|          dob|   string|   null|
|        email|   string|   null|
|        phone|   string|   null|
|       salary|   double|   null|
|department_id|      int|   null|
+-------------+---------+-------+



In [82]:
spark.stop()

### Online Implementations (Refer to Video)