# Chapter 5: Advanced Operations in Spark Code

In [0]:
salary_data = [("John", "Field-eng", 3500), 
    ("Michael", "Field-eng", 4500), 
    ("Robert", None, 4000), 
    ("Maria", "Finance", 3500), 
    ("John", "Sales", 3000), 
    ("Kelly", "Finance", 3500), 
    ("Kate", "Finance", 3000), 
    ("Martin", None, 3500), 
    ("Kiran", "Sales", 2200), 
    ("Michael", "Field-eng", 4500) 
  ]
columns= ["Employee", "Department", "Salary"]
salary_data = spark.createDataFrame(data = salary_data, schema = columns)
salary_data.printSchema()
salary_data.show()


root
 |-- Employee: string (nullable = true)
 |-- Department: string (nullable = true)
 |-- Salary: long (nullable = true)

+--------+----------+------+
|Employee|Department|Salary|
+--------+----------+------+
|    John| Field-eng|  3500|
| Michael| Field-eng|  4500|
|  Robert|      NULL|  4000|
|   Maria|   Finance|  3500|
|    John|     Sales|  3000|
|   Kelly|   Finance|  3500|
|    Kate|   Finance|  3000|
|  Martin|      NULL|  3500|
|   Kiran|     Sales|  2200|
| Michael| Field-eng|  4500|
+--------+----------+------+



In [0]:
salary_data.groupby('Department')

GroupedData[grouping expressions: [Department], value: [Employee: string, Department: string, Salary: bigint], type: GroupBy]

In [0]:
salary_data.groupby('Department').avg().show()

+----------+------------------+
|Department|       avg(Salary)|
+----------+------------------+
| Field-eng| 4166.666666666667|
|     Sales|            2600.0|
|      NULL|            3750.0|
|   Finance|3333.3333333333335|
+----------+------------------+



In [0]:
from pyspark.sql.functions import col, round

salary_data.groupBy('Department')\
  .sum('Salary')\
  .withColumn('sum(Salary)',round(col('sum(Salary)'), 2))\
  .withColumnRenamed('sum(Salary)', 'Salary')\
  .orderBy('Department')\
  .show()


+----------+------+
|Department|Salary|
+----------+------+
|      NULL|  7500|
| Field-eng| 12500|
|   Finance| 10000|
|     Sales|  5200|
+----------+------+



In [0]:
salary_data_with_id = [(1, "John", "Field-eng", 3500), \
    (2, "Robert", "Sales", 4000), \
    (3, "Maria", "Finance", 3500), \
    (4, "Michael", "Sales", 3000), \
    (5, "Kelly", "Finance", 3500), \
    (6, "Kate", "Finance", 3000), \
    (7, "Martin", "Finance", 3500), \
    (8, "Kiran", "Sales", 2200), \
  ]
columns= ["ID", "Employee", "Department", "Salary"]
salary_data_with_id = spark.createDataFrame(data = salary_data_with_id, schema = columns)
salary_data_with_id.show()


+---+--------+----------+------+
| ID|Employee|Department|Salary|
+---+--------+----------+------+
|  1|    John| Field-eng|  3500|
|  2|  Robert|     Sales|  4000|
|  3|   Maria|   Finance|  3500|
|  4| Michael|     Sales|  3000|
|  5|   Kelly|   Finance|  3500|
|  6|    Kate|   Finance|  3000|
|  7|  Martin|   Finance|  3500|
|  8|   Kiran|     Sales|  2200|
+---+--------+----------+------+



In [0]:
employee_data = [(1, "NY", "M"), \
    (2, "NC", "M"), \
    (3, "NY", "F"), \
    (4, "TX", "M"), \
    (5, "NY", "F"), \
    (6, "AZ", "F") \
  ]
columns= ["ID", "State", "Gender"]
employee_data = spark.createDataFrame(data = employee_data, schema = columns)
employee_data.show()


+---+-----+------+
| ID|State|Gender|
+---+-----+------+
|  1|   NY|     M|
|  2|   NC|     M|
|  3|   NY|     F|
|  4|   TX|     M|
|  5|   NY|     F|
|  6|   AZ|     F|
+---+-----+------+



In [0]:
salary_data_with_id.join(employee_data,salary_data_with_id.ID ==  employee_data.ID,"inner").show()

+---+--------+----------+------+---+-----+------+
| ID|Employee|Department|Salary| ID|State|Gender|
+---+--------+----------+------+---+-----+------+
|  1|    John| Field-eng|  3500|  1|   NY|     M|
|  2|  Robert|     Sales|  4000|  2|   NC|     M|
|  3|   Maria|   Finance|  3500|  3|   NY|     F|
|  4| Michael|     Sales|  3000|  4|   TX|     M|
|  5|   Kelly|   Finance|  3500|  5|   NY|     F|
|  6|    Kate|   Finance|  3000|  6|   AZ|     F|
+---+--------+----------+------+---+-----+------+



In [0]:
salary_data_with_id.join(employee_data,salary_data_with_id.ID ==  employee_data.ID,"outer").show()

+---+--------+----------+------+----+-----+------+
| ID|Employee|Department|Salary|  ID|State|Gender|
+---+--------+----------+------+----+-----+------+
|  1|    John| Field-eng|  3500|   1|   NY|     M|
|  2|  Robert|     Sales|  4000|   2|   NC|     M|
|  3|   Maria|   Finance|  3500|   3|   NY|     F|
|  4| Michael|     Sales|  3000|   4|   TX|     M|
|  5|   Kelly|   Finance|  3500|   5|   NY|     F|
|  6|    Kate|   Finance|  3000|   6|   AZ|     F|
|  7|  Martin|   Finance|  3500|NULL| NULL|  NULL|
|  8|   Kiran|     Sales|  2200|NULL| NULL|  NULL|
+---+--------+----------+------+----+-----+------+



In [0]:
salary_data_with_id.join(employee_data,salary_data_with_id.ID ==  employee_data.ID,"left").show()

+---+--------+----------+------+----+-----+------+
| ID|Employee|Department|Salary|  ID|State|Gender|
+---+--------+----------+------+----+-----+------+
|  1|    John| Field-eng|  3500|   1|   NY|     M|
|  2|  Robert|     Sales|  4000|   2|   NC|     M|
|  3|   Maria|   Finance|  3500|   3|   NY|     F|
|  4| Michael|     Sales|  3000|   4|   TX|     M|
|  5|   Kelly|   Finance|  3500|   5|   NY|     F|
|  6|    Kate|   Finance|  3000|   6|   AZ|     F|
|  7|  Martin|   Finance|  3500|NULL| NULL|  NULL|
|  8|   Kiran|     Sales|  2200|NULL| NULL|  NULL|
+---+--------+----------+------+----+-----+------+



In [0]:
salary_data_with_id.join(employee_data,salary_data_with_id.ID ==  employee_data.ID,"right").show()

+---+--------+----------+------+---+-----+------+
| ID|Employee|Department|Salary| ID|State|Gender|
+---+--------+----------+------+---+-----+------+
|  1|    John| Field-eng|  3500|  1|   NY|     M|
|  2|  Robert|     Sales|  4000|  2|   NC|     M|
|  3|   Maria|   Finance|  3500|  3|   NY|     F|
|  4| Michael|     Sales|  3000|  4|   TX|     M|
|  5|   Kelly|   Finance|  3500|  5|   NY|     F|
|  6|    Kate|   Finance|  3000|  6|   AZ|     F|
+---+--------+----------+------+---+-----+------+



In [0]:
salary_data_with_id_2 = [(1, "John", "Field-eng", 3500), \
    (2, "Robert", "Sales", 4000), \
    (3, "Aliya", "Finance", 3500), \
    (4, "Nate", "Sales", 3000), \
  ]
columns2= ["ID", "Employee", "Department", "Salary"]

salary_data_with_id_2 = spark.createDataFrame(data = salary_data_with_id_2, schema = columns2)

salary_data_with_id_2.printSchema()
salary_data_with_id_2.show(truncate=False)



root
 |-- ID: long (nullable = true)
 |-- Employee: string (nullable = true)
 |-- Department: string (nullable = true)
 |-- Salary: long (nullable = true)

+---+--------+----------+------+
|ID |Employee|Department|Salary|
+---+--------+----------+------+
|1  |John    |Field-eng |3500  |
|2  |Robert  |Sales     |4000  |
|3  |Aliya   |Finance   |3500  |
|4  |Nate    |Sales     |3000  |
+---+--------+----------+------+



In [0]:
unionDF = salary_data_with_id.union(salary_data_with_id_2)
unionDF.show(truncate=False)


+---+--------+----------+------+
|ID |Employee|Department|Salary|
+---+--------+----------+------+
|1  |John    |Field-eng |3500  |
|2  |Robert  |Sales     |4000  |
|3  |Maria   |Finance   |3500  |
|4  |Michael |Sales     |3000  |
|5  |Kelly   |Finance   |3500  |
|6  |Kate    |Finance   |3000  |
|7  |Martin  |Finance   |3500  |
|8  |Kiran   |Sales     |2200  |
|1  |John    |Field-eng |3500  |
|2  |Robert  |Sales     |4000  |
|3  |Aliya   |Finance   |3500  |
|4  |Nate    |Sales     |3000  |
+---+--------+----------+------+



Reading and Writing Data

In [0]:

salary_data_with_id.write.csv('salary_data.csv', mode='overwrite', header=True)
spark.read.csv('/salary_data.csv', header=True).show()


+---+--------+----------+------+
| ID|Employee|Department|Salary|
+---+--------+----------+------+
|  1|    John| Field-eng|  3500|
|  2|  Robert|     Sales|  4000|
|  3|   Maria|   Finance|  3500|
|  4| Michael|     Sales|  3000|
|  5|   Kelly|   Finance|  3500|
|  6|    Kate|   Finance|  3000|
|  7|  Martin|   Finance|  3500|
|  8|   Kiran|     Sales|  2200|
+---+--------+----------+------+



In [0]:
from pyspark.sql.types import *

filePath = '/salary_data.csv'
columns= ["ID", "State", "Gender"] 
schema = StructType([
      StructField("ID", IntegerType(),True),
  StructField("State",  StringType(),True),
  StructField("Gender",  StringType(),True)
])
 
read_data = spark.read.format("csv").option("header","true").schema(schema).load(filePath)
read_data.show()


+---+-------+---------+
| ID|  State|   Gender|
+---+-------+---------+
|  1|   John|Field-eng|
|  2| Robert|    Sales|
|  3|  Maria|  Finance|
|  4|Michael|    Sales|
|  5|  Kelly|  Finance|
|  6|   Kate|  Finance|
|  7| Martin|  Finance|
|  8|  Kiran|    Sales|
+---+-------+---------+



In [0]:
salary_data_with_id.write.parquet('salary_data.parquet', mode='overwrite')
spark.read.parquet('/salary_data.parquet').show()


+---+--------+----------+------+
| ID|Employee|Department|Salary|
+---+--------+----------+------+
|  5|   Kelly|   Finance|  3500|
|  6|    Kate|   Finance|  3000|
|  1|    John| Field-eng|  3500|
|  2|  Robert|     Sales|  4000|
|  3|   Maria|   Finance|  3500|
|  4| Michael|     Sales|  3000|
|  7|  Martin|   Finance|  3500|
|  8|   Kiran|     Sales|  2200|
+---+--------+----------+------+



In [0]:
salary_data_with_id.write.orc('salary_data.orc', mode='overwrite')
spark.read.orc('/salary_data.orc').show()

+---+--------+----------+------+
| ID|Employee|Department|Salary|
+---+--------+----------+------+
|  5|   Kelly|   Finance|  3500|
|  6|    Kate|   Finance|  3000|
|  1|    John| Field-eng|  3500|
|  2|  Robert|     Sales|  4000|
|  7|  Martin|   Finance|  3500|
|  8|   Kiran|     Sales|  2200|
|  3|   Maria|   Finance|  3500|
|  4| Michael|     Sales|  3000|
+---+--------+----------+------+



In [0]:
salary_data_with_id.write.format("delta").save("/FileStore/tables/salary_data_with_id", mode='overwrite')
df = spark.read.load("/FileStore/tables/salary_data_with_id")
df.show()


+---+--------+----------+------+
| ID|Employee|Department|Salary|
+---+--------+----------+------+
|  1|    John| Field-eng|  3500|
|  2|  Robert|     Sales|  4000|
|  3|   Maria|   Finance|  3500|
|  4| Michael|     Sales|  3000|
|  5|   Kelly|   Finance|  3500|
|  6|    Kate|   Finance|  3000|
|  7|  Martin|   Finance|  3500|
|  8|   Kiran|     Sales|  2200|
+---+--------+----------+------+



In [0]:
salary_data_with_id.createOrReplaceTempView("SalaryTable")
spark.sql("SELECT count(*) from SalaryTable").show()


+--------+
|count(1)|
+--------+
|       8|
+--------+



Catalyst Optimizer

In [0]:
# SparkSession setup 
from pyspark.sql import SparkSession 
spark = SparkSession.builder.appName("CatalystOptimizerExample").getOrCreate() 
# Load data 
df = spark.read.csv("/salary_data.csv", header=True, inferSchema=True) 
# Query with Catalyst Optimizer 
result_df = df.select("employee", "department").filter(df["salary"] > 3500) 
# Explain the optimized query plan 
result_df.explain() 


== Physical Plan ==
*(1) Project [employee#129490, department#129491]
+- *(1) Filter (isnotnull(salary#129492) AND (salary#129492 > 3500))
   +- FileScan csv [Employee#129490,Department#129491,Salary#129492] Batched: false, DataFilters: [isnotnull(Salary#129492), (Salary#129492 > 3500)], Format: CSV, Location: InMemoryFileIndex(1 paths)[dbfs:/salary_data.csv], PartitionFilters: [], PushedFilters: [IsNotNull(Salary), GreaterThan(Salary,3500)], ReadSchema: struct<Employee:string,Department:string,Salary:int>




In [0]:
# Cache a DataFrame 
df.cache() 
# Unpersist the cached DataFrame 
df.unpersist() 


DataFrame[ID: int, Employee: string, Department: string, Salary: int]

In [0]:
# Repartition a DataFrame into 8 partitions 
df.repartition(8) 


DataFrame[ID: int, Employee: string, Department: string, Salary: int]

In [0]:
# Coalesce a DataFrame to 4 partitions 
df.coalesce(4) 


DataFrame[ID: int, Employee: string, Department: string, Salary: int]