## PySpark Advanced Joins, Aggregations, and Optimizations

**I. Advanced Joins:**

* **Beyond Basic Joins:** Explore join types beyond inner, left, right, and full outer joins.  Consider `left semi join` (returns only the matching rows from the left DataFrame), `left anti join` (returns rows from the left DataFrame that *do not* have a match in the right DataFrame), and `cross join` (Cartesian product of two DataFrames). Understand when each join type is appropriate and its performance implications.

* **Join Strategies:**  PySpark's execution engine employs various join strategies (e.g., broadcast hash join, shuffle hash join, sort-merge join) depending on the DataFrame sizes and data distribution.  Grasp how these strategies work.  Broadcast joins are particularly efficient when one DataFrame is small enough to fit into the memory of each executor node.  Shuffle joins require shuffling data across the network, potentially causing performance bottlenecks. Sort-merge joins are effective for ordered datasets.

* **Join Optimization:** Optimize join performance by considering data partitioning, data skew (uneven data distribution), and null values. Understand how data skew can lead to inefficient join operations and techniques like salting or pre-aggregation to mitigate its effects.  Evaluate join key data types – choosing appropriate data types can enhance efficiency.

* **Multiple Joins:** Understand how to chain multiple joins in a single query and the order of joins impact performance. Analyze the query plan to identify potential bottlenecks with complex join operations.

**II. Advanced Aggregations:**

* **Window Functions:**  Delve into the power of window functions. These functions perform calculations across a set of rows related to the current row, without grouping the data explicitly. Explore various window functions like `rank`, `dense_rank`, `row_number`, `lag`, `lead`, `avg`, `sum`, etc. Understand partitioning and ordering within window functions.

* **Grouping Sets, Rollups, and Cubes:** Expand beyond simple `groupBy` operations. Grouping sets allow you to calculate aggregates for different combinations of grouping columns simultaneously. Rollups generate aggregates at different levels of hierarchy, while cubes provide aggregates for all possible combinations of grouping columns.  These features are beneficial for multi-dimensional analysis.

* **User-Defined Aggregation Functions (UDAFs):** Learn how to create custom aggregation functions to perform complex calculations that are not directly available in PySpark. Understand the requirements for writing a UDAF, including handling initialization, merging, and evaluating the final results.

* **Aggregation Optimization:**  Explore strategies for optimizing aggregation performance, including data partitioning, data skew mitigation, and using appropriate data structures for intermediate results.


**III. Optimizations:**

* **Data Serialization and Deserialization:**  Understand how PySpark serializes and deserializes data during processing. Choosing appropriate serialization formats (e.g., Parquet, ORC) can significantly impact performance.  Consider the trade-offs between different serialization methods in terms of speed, compression ratio, and schema evolution capabilities.

* **Caching and Persistence:** Leverage caching and persistence mechanisms (e.g., `cache`, `persist`) to store intermediate results in memory or on disk for faster access.  Understand different storage levels and when to use each.   Analyze when caching offers the most benefit and the trade-offs in memory consumption.

* **Query Plan Analysis:**  Learn how to analyze the query execution plan (using `explain()`) to identify bottlenecks and opportunities for optimization. Understanding query execution plans will greatly enhance your debugging and tuning abilities.

* **Data Skew Handling:**  Address data skew in joins and aggregations.  Strategies like salting, pre-aggregating, and using different join strategies can improve performance significantly in the presence of skewed data.

* **Partitioning and Bucketing:**  Learn the differences and the best use cases of partitioning and bucketing.  Optimize data organization to improve query execution by efficiently distributing data across nodes, especially for join operations.

* **Broadcast Variables and Accumulators:** Master the efficient use of broadcast variables to send read-only data to all executors without re-serializing it on every task.  Understand accumulators as shared variables for aggregating information from different tasks.

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, count, sum, when, lit

#Initialize Spark Session


In [None]:
spark = SparkSession.builder.appName("Advanced PySpark Operations").getOrCreate()

#Load Sample Data


In [None]:
#Sample Customer Data
data_customers = [
    ("C001", "Alice", "NY"),
    ("C002", "Bob", "CA"),
    ("C003", "Charlie", "TX"),
    ("C004", "David", "FL"),
    ("C005", "Eve", "NY")
]

schema_customers = ["customer_id", "name", "location"]
customer_df = spark.createDataFrame(data_customers, schema=schema_customers)
customer_df.show()

+-----------+-------+--------+
|customer_id|   name|location|
+-----------+-------+--------+
|       C001|  Alice|      NY|
|       C002|    Bob|      CA|
|       C003|Charlie|      TX|
|       C004|  David|      FL|
|       C005|    Eve|      NY|
+-----------+-------+--------+



In [None]:
#Sample Call Records Data
data_calls = [
    ("C001", "C002", 5, "Connected"),
    ("C002", "C003", 15, "Connected"),
    ("C003", "C004", 7, "Dropped"),
    ("C004", "C005", 10, "Connected"),
    ("C005", "C001", 12, "Connected"),
    ("C001", "C003", 0, "Failed"),
    ("C002", "C004", 20, "Connected")
]

schema_calls = ["caller_id", "receiver_id", "duration", "status"]
call_df = spark.createDataFrame(data_calls, schema=schema_calls)
call_df.show()

+---------+-----------+--------+---------+
|caller_id|receiver_id|duration|   status|
+---------+-----------+--------+---------+
|     C001|       C002|       5|Connected|
|     C002|       C003|      15|Connected|
|     C003|       C004|       7|  Dropped|
|     C004|       C005|      10|Connected|
|     C005|       C001|      12|Connected|
|     C001|       C003|       0|   Failed|
|     C002|       C004|      20|Connected|
+---------+-----------+--------+---------+



#Implementing Advanced Joins


In [None]:
#Example 1: Inner Join
inner_join_df = customer_df.join(call_df, customer_df.customer_id == call_df.caller_id, "inner")
inner_join_df.show()

+-----------+-------+--------+---------+-----------+--------+---------+
|customer_id|   name|location|caller_id|receiver_id|duration|   status|
+-----------+-------+--------+---------+-----------+--------+---------+
|       C001|  Alice|      NY|     C001|       C002|       5|Connected|
|       C001|  Alice|      NY|     C001|       C003|       0|   Failed|
|       C002|    Bob|      CA|     C002|       C003|      15|Connected|
|       C002|    Bob|      CA|     C002|       C004|      20|Connected|
|       C003|Charlie|      TX|     C003|       C004|       7|  Dropped|
|       C004|  David|      FL|     C004|       C005|      10|Connected|
|       C005|    Eve|      NY|     C005|       C001|      12|Connected|
+-----------+-------+--------+---------+-----------+--------+---------+



In [None]:
#Example 2: Left Outer Join
left_join_df = customer_df.join(call_df, customer_df.customer_id == call_df.caller_id, "left")
left_join_df.show()

+-----------+-------+--------+---------+-----------+--------+---------+
|customer_id|   name|location|caller_id|receiver_id|duration|   status|
+-----------+-------+--------+---------+-----------+--------+---------+
|       C001|  Alice|      NY|     C001|       C003|       0|   Failed|
|       C001|  Alice|      NY|     C001|       C002|       5|Connected|
|       C002|    Bob|      CA|     C002|       C004|      20|Connected|
|       C002|    Bob|      CA|     C002|       C003|      15|Connected|
|       C003|Charlie|      TX|     C003|       C004|       7|  Dropped|
|       C004|  David|      FL|     C004|       C005|      10|Connected|
|       C005|    Eve|      NY|     C005|       C001|      12|Connected|
+-----------+-------+--------+---------+-----------+--------+---------+



In [None]:
#Example 3: Right Outer Join
right_join_df = customer_df.join(call_df, customer_df.customer_id == call_df.caller_id, "right")
right_join_df.show()

+-----------+-------+--------+---------+-----------+--------+---------+
|customer_id|   name|location|caller_id|receiver_id|duration|   status|
+-----------+-------+--------+---------+-----------+--------+---------+
|       C003|Charlie|      TX|     C003|       C004|       7|  Dropped|
|       C001|  Alice|      NY|     C001|       C002|       5|Connected|
|       C002|    Bob|      CA|     C002|       C003|      15|Connected|
|       C004|  David|      FL|     C004|       C005|      10|Connected|
|       C005|    Eve|      NY|     C005|       C001|      12|Connected|
|       C001|  Alice|      NY|     C001|       C003|       0|   Failed|
|       C002|    Bob|      CA|     C002|       C004|      20|Connected|
+-----------+-------+--------+---------+-----------+--------+---------+



In [None]:
#Example 4: Full Outer Join
full_outer_join_df = customer_df.join(call_df, customer_df.customer_id == call_df.caller_id, "outer")
full_outer_join_df.show()

+-----------+-------+--------+---------+-----------+--------+---------+
|customer_id|   name|location|caller_id|receiver_id|duration|   status|
+-----------+-------+--------+---------+-----------+--------+---------+
|       C001|  Alice|      NY|     C001|       C002|       5|Connected|
|       C001|  Alice|      NY|     C001|       C003|       0|   Failed|
|       C002|    Bob|      CA|     C002|       C003|      15|Connected|
|       C002|    Bob|      CA|     C002|       C004|      20|Connected|
|       C003|Charlie|      TX|     C003|       C004|       7|  Dropped|
|       C004|  David|      FL|     C004|       C005|      10|Connected|
|       C005|    Eve|      NY|     C005|       C001|      12|Connected|
+-----------+-------+--------+---------+-----------+--------+---------+



#Aggregations


In [None]:
#Example 1: Total Duration of Calls per Customer
call_duration_agg = call_df.groupBy("caller_id").agg(sum("duration").alias("total_duration"))
call_duration_agg.show()

+---------+--------------+
|caller_id|total_duration|
+---------+--------------+
|     C003|             7|
|     C001|             5|
|     C002|            35|
|     C004|            10|
|     C005|            12|
+---------+--------------+



In [None]:
#Example 2: Average Call Duration by Call Status
avg_call_duration = call_df.groupBy("status").agg(avg("duration").alias("avg_duration"))
avg_call_duration.show()

+---------+------------+
|   status|avg_duration|
+---------+------------+
|  Dropped|         7.0|
|Connected|        12.4|
|   Failed|         0.0|
+---------+------------+



In [None]:
#Example 3: Count of Calls per Caller
call_count_agg = call_df.groupBy("caller_id").agg(count("caller_id").alias("call_count"))
call_count_agg.show()

+---------+----------+
|caller_id|call_count|
+---------+----------+
|     C003|         1|
|     C001|         2|
|     C002|         2|
|     C004|         1|
|     C005|         1|
+---------+----------+



#Optimizations


In [None]:
#Caching DataFrames
customer_df.cache()
call_df.cache()
customer_df.count()
call_df.count()

7

In [None]:
#Using Broadcast Joins
from pyspark.sql.functions import broadcast

broadcast_join_df = call_df.join(broadcast(customer_df), call_df.caller_id == customer_df.customer_id, "inner")
broadcast_join_df.show()

+---------+-----------+--------+---------+-----------+-------+--------+
|caller_id|receiver_id|duration|   status|customer_id|   name|location|
+---------+-----------+--------+---------+-----------+-------+--------+
|     C001|       C002|       5|Connected|       C001|  Alice|      NY|
|     C002|       C003|      15|Connected|       C002|    Bob|      CA|
|     C003|       C004|       7|  Dropped|       C003|Charlie|      TX|
|     C004|       C005|      10|Connected|       C004|  David|      FL|
|     C005|       C001|      12|Connected|       C005|    Eve|      NY|
|     C001|       C003|       0|   Failed|       C001|  Alice|      NY|
|     C002|       C004|      20|Connected|       C002|    Bob|      CA|
+---------+-----------+--------+---------+-----------+-------+--------+



In [None]:
#Partitioning for Efficient Joins
partitioned_customer_df = customer_df.repartition(4, "location")
partitioned_call_df = call_df.repartition(4, "caller_id")

In [None]:
#Performing a join on partitioned data
partitioned_join_df = partitioned_customer_df.join(partitioned_call_df, partitioned_customer_df.customer_id == partitioned_call_df.caller_id, "inner")
partitioned_join_df.show()

+-----------+-------+--------+---------+-----------+--------+---------+
|customer_id|   name|location|caller_id|receiver_id|duration|   status|
+-----------+-------+--------+---------+-----------+--------+---------+
|       C001|  Alice|      NY|     C001|       C002|       5|Connected|
|       C005|    Eve|      NY|     C005|       C001|      12|Connected|
|       C001|  Alice|      NY|     C001|       C003|       0|   Failed|
|       C002|    Bob|      CA|     C002|       C003|      15|Connected|
|       C004|  David|      FL|     C004|       C005|      10|Connected|
|       C002|    Bob|      CA|     C002|       C004|      20|Connected|
|       C003|Charlie|      TX|     C003|       C004|       7|  Dropped|
+-----------+-------+--------+---------+-----------+--------+---------+



##Practice Case study - Implementing Advance joins and Optimizations on custom telecom dataset



```
# data_orders = [
    ("O001", "C001", "P001", 2),
    ("O002", "C002", "P002", 1),
    ("O003", "C003", "P003", 3),
    ("O004", "C001", "P004", 1),
    ("O005", "C002", "P001", 1),
    ("O006", "C004", "P005", 2),
    ("O007", "C003", "P002", 2),
]
```



In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, count, sum, when, lit
from pyspark.sql.functions import broadcast

In [None]:
# Initialize Spark Session
spark = SparkSession.builder.appName("Advanced PySpark Operations").getOrCreate()

# New Sample Data: Products and Orders
data_products = [
    ("P001", "Laptop", 1200),
    ("P002", "Tablet", 300),
    ("P003", "Phone", 800),
    ("P004", "Keyboard", 75),
    ("P005", "Mouse", 25)
]

schema_products = ["product_id", "product_name", "price"]
product_df = spark.createDataFrame(data_products, schema=schema_products)
product_df.show()

+----------+------------+-----+
|product_id|product_name|price|
+----------+------------+-----+
|      P001|      Laptop| 1200|
|      P002|      Tablet|  300|
|      P003|       Phone|  800|
|      P004|    Keyboard|   75|
|      P005|       Mouse|   25|
+----------+------------+-----+



In [None]:
data_orders = [
    ("O001", "C001", "P001", 2),
    ("O002", "C002", "P002", 1),
    ("O003", "C003", "P003", 3),
    ("O004", "C001", "P004", 1),
    ("O005", "C002", "P001", 1),
    ("O006", "C004", "P005", 2),
    ("O007", "C003", "P002", 2),
]

schema_orders = ["order_id", "customer_id", "product_id", "quantity"]
order_df = spark.createDataFrame(data_orders, schema=schema_orders)
order_df.show()

+--------+-----------+----------+--------+
|order_id|customer_id|product_id|quantity|
+--------+-----------+----------+--------+
|    O001|       C001|      P001|       2|
|    O002|       C002|      P002|       1|
|    O003|       C003|      P003|       3|
|    O004|       C001|      P004|       1|
|    O005|       C002|      P001|       1|
|    O006|       C004|      P005|       2|
|    O007|       C003|      P002|       2|
+--------+-----------+----------+--------+



Implementing Advanced Joins


In [None]:
# Example 1: Inner Join (Orders and Products)
inner_join_df = order_df.join(product_df, order_df.product_id == product_df.product_id, "inner")
inner_join_df.show()

+--------+-----------+----------+--------+----------+------------+-----+
|order_id|customer_id|product_id|quantity|product_id|product_name|price|
+--------+-----------+----------+--------+----------+------------+-----+
|    O001|       C001|      P001|       2|      P001|      Laptop| 1200|
|    O005|       C002|      P001|       1|      P001|      Laptop| 1200|
|    O002|       C002|      P002|       1|      P002|      Tablet|  300|
|    O007|       C003|      P002|       2|      P002|      Tablet|  300|
|    O003|       C003|      P003|       3|      P003|       Phone|  800|
|    O004|       C001|      P004|       1|      P004|    Keyboard|   75|
|    O006|       C004|      P005|       2|      P005|       Mouse|   25|
+--------+-----------+----------+--------+----------+------------+-----+



In [None]:
# Example 2: Left Outer Join (Orders and Products)
left_join_df = order_df.join(product_df, order_df.product_id == product_df.product_id, "left")
left_join_df.show()

+--------+-----------+----------+--------+----------+------------+-----+
|order_id|customer_id|product_id|quantity|product_id|product_name|price|
+--------+-----------+----------+--------+----------+------------+-----+
|    O003|       C003|      P003|       3|      P003|       Phone|  800|
|    O002|       C002|      P002|       1|      P002|      Tablet|  300|
|    O001|       C001|      P001|       2|      P001|      Laptop| 1200|
|    O004|       C001|      P004|       1|      P004|    Keyboard|   75|
|    O007|       C003|      P002|       2|      P002|      Tablet|  300|
|    O005|       C002|      P001|       1|      P001|      Laptop| 1200|
|    O006|       C004|      P005|       2|      P005|       Mouse|   25|
+--------+-----------+----------+--------+----------+------------+-----+



Implementing Aggregations


In [None]:
# Example 1: Total Revenue per Product
revenue_per_product = inner_join_df.groupBy("product_name").agg(sum(col("price") * col("quantity")).alias("total_revenue"))
revenue_per_product.show()

+------------+-------------+
|product_name|total_revenue|
+------------+-------------+
|       Phone|         2400|
|      Laptop|         3600|
|       Mouse|           50|
|      Tablet|          900|
|    Keyboard|           75|
+------------+-------------+



In [None]:
# Example 2: Average Order Quantity
average_order_quantity = order_df.agg(avg("quantity").alias("avg_quantity"))
average_order_quantity.show()

+------------------+
|      avg_quantity|
+------------------+
|1.7142857142857142|
+------------------+



Optimizations


In [None]:
# Caching DataFrames
product_df.cache()
order_df.cache()
product_df.count()
order_df.count()

7

In [None]:
# Using Broadcast Joins
broadcast_join_df = order_df.join(broadcast(product_df), order_df.product_id == product_df.product_id, "inner")
broadcast_join_df.show()

+--------+-----------+----------+--------+----------+------------+-----+
|order_id|customer_id|product_id|quantity|product_id|product_name|price|
+--------+-----------+----------+--------+----------+------------+-----+
|    O001|       C001|      P001|       2|      P001|      Laptop| 1200|
|    O002|       C002|      P002|       1|      P002|      Tablet|  300|
|    O003|       C003|      P003|       3|      P003|       Phone|  800|
|    O004|       C001|      P004|       1|      P004|    Keyboard|   75|
|    O005|       C002|      P001|       1|      P001|      Laptop| 1200|
|    O006|       C004|      P005|       2|      P005|       Mouse|   25|
|    O007|       C003|      P002|       2|      P002|      Tablet|  300|
+--------+-----------+----------+--------+----------+------------+-----+



In [None]:
# Partitioning for Efficient Joins
partitioned_product_df = product_df.repartition(2, "product_id") #Using product_id for partitioning
partitioned_order_df = order_df.repartition(2, "product_id") #Using product_id for partitioning

In [None]:
partitioned_join_df = partitioned_product_df.join(partitioned_order_df, partitioned_product_df.product_id == partitioned_order_df.product_id, "inner")
partitioned_join_df.show()

+----------+------------+-----+--------+-----------+----------+--------+
|product_id|product_name|price|order_id|customer_id|product_id|quantity|
+----------+------------+-----+--------+-----------+----------+--------+
|      P002|      Tablet|  300|    O002|       C002|      P002|       1|
|      P004|    Keyboard|   75|    O004|       C001|      P004|       1|
|      P002|      Tablet|  300|    O007|       C003|      P002|       2|
|      P001|      Laptop| 1200|    O001|       C001|      P001|       2|
|      P003|       Phone|  800|    O003|       C003|      P003|       3|
|      P001|      Laptop| 1200|    O005|       C002|      P001|       1|
|      P005|       Mouse|   25|    O006|       C004|      P005|       2|
+----------+------------+-----+--------+-----------+----------+--------+



# Thank You