%md
# Joins 
- A join combines two DataFrames: a left DataFrame and a right DataFrame.
- To join, call `join` on the left DataFrame and pass the right DataFrame, a join expression, and the join type.
- Example: `df_1.join(df_2, df_1.col1 == df_2.col2, 'inner')`
- `inner` is the default join type.

## Join name ambiguity:
- When we join tables, Spark's SQL engine assigns a unique internal ID to every column behind the scenes.
- When two tables have a column with the same name, the names match but the underlying IDs are different.
- Selecting `*` works because it simply fetches every column.
- Selecting the common column by name (for example `col1`) fails as ambiguous: the name resolves to two IDs, one for `table1.col1` and one for `table2.col1`, so we must also specify which table it comes from.

### Approaches:
- Rename the column before the join with a `withColumnRenamed` transformation.
- Drop one copy of the ambiguous column.
----
## Internal join types
  - Shuffle Join
  - Broadcast Join

## Shuffle Sort and Merge Join:
- Most common join type in Spark.
- Follows the map, shuffle, and reduce pattern known from Hadoop MapReduce.
- Spark's default choice for large-to-large dataset joins, where neither side is small enough to broadcast.

## Understanding the Shuffle Process

### 1. Initial Setup
For a distributed join between two datasets (`d1` and `d2`):
- Both datasets are partitioned (example: 3 partitions each).
  
Example Partitioning:
```
d1:
  - part-001
  - part-002
  - part-003

d2:
  - part-001
  - part-002
  - part-003
```

- These partitions are distributed across **executors**.
  - Example: 3 executors, each holding one partition of `d1` and one partition of `d2`.

---

### 2. The Problem in Direct Joins
- While joining on a key, the corresponding rows might **not reside** on the **same executor**.
- Example:
  - The key `001` from `d1` might be in `part-001` (Executor 1).
  - The corresponding key in `d2` might be in `part-001` (Executor 2).
- **Direct join is not possible** unless matching keys are on the same executor.

---

### 3. Map Phase
- **Map phase** starts by **mapping all records based on the join key**.
- All records are sent to a structure called the **Map Exchange** (a buffer).
- Each record is **tagged** by its join key so Spark can track where it needs to go.

---
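The tagging step can be pictured as computing a target shuffle partition from each record's join key. The sketch below is plain Python, not Spark code: the names `target_partition` and `map_phase`, the partition count, and the use of `zlib.crc32` as the hash are all assumptions made for illustration.

```python
import zlib

NUM_SHUFFLE_PARTITIONS = 3  # hypothetical value chosen for the sketch


def target_partition(join_key: str, num_partitions: int = NUM_SHUFFLE_PARTITIONS) -> int:
    """Return the shuffle partition a record with this join key is sent to."""
    # crc32 is a stand-in hash, chosen only because it is deterministic across
    # runs; it is not the hash function Spark uses internally.
    return zlib.crc32(join_key.encode("utf-8")) % num_partitions


def map_phase(records):
    """Tag every (key, value) record with its target shuffle partition."""
    return [(target_partition(key), key, value) for key, value in records]


# One input partition of d1 and one of d2, living on different executors.
d1_part = [("001", "d1-row-a"), ("002", "d1-row-b")]
d2_part = [("001", "d2-row-x"), ("003", "d2-row-y")]

d1_tagged = map_phase(d1_part)
d2_tagged = map_phase(d2_part)
```

Because the tag depends only on the key, the `001` rows of `d1` and `d2` receive the same target partition even though they start on different executors.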

### 4. Shuffle and Reduce Phase
- After mapping, **Spark shuffles** the data:
  - Records are **moved across the cluster** so that **matching keys are grouped together**.
- The **Reduce phase** reorganizes the mapped records into new **shuffle partitions**:
  - All records with the same join key now sit together.
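  - Spark then sorts both sides of each shuffle partition by the join key and merges them, which keeps the **join efficient** and local to one executor.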

---
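To make the shuffle and the per-partition join concrete, here is a small plain-Python simulation. It is a sketch under simplifying assumptions: integer keys, `key % num_partitions` as the partitioning rule, and hypothetical helper names `shuffle` and `sort_merge_join`. It does not reflect Spark's internal implementation.

```python
from collections import defaultdict


def shuffle(records, num_partitions):
    """Group (key, value) records into shuffle partitions by key."""
    partitions = defaultdict(list)
    for key, value in records:
        # The same key always maps to the same partition number.
        partitions[key % num_partitions].append((key, value))
    return partitions


def sort_merge_join(left, right):
    """Inner-join two lists of (key, value) pairs by sorting and merging."""
    left, right = sorted(left), sorted(right)
    i = j = 0
    out = []
    while i < len(left) and j < len(right):
        lk, rk = left[i][0], right[j][0]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Find the run of right rows sharing this key, then pair each
            # left row with that key against the whole run.
            j_end = j
            while j_end < len(right) and right[j_end][0] == lk:
                j_end += 1
            while i < len(left) and left[i][0] == lk:
                out.extend((lk, left[i][1], right[k][1]) for k in range(j, j_end))
                i += 1
            j = j_end
    return out


d1 = [(1, "a"), (2, "b"), (3, "c"), (1, "d")]
d2 = [(3, "x"), (1, "y"), (4, "z")]

num_partitions = 3
d1_parts = shuffle(d1, num_partitions)
d2_parts = shuffle(d2, num_partitions)

# Each shuffle partition is joined independently of the others.
joined = []
for p in range(num_partitions):
    joined.extend(sort_merge_join(d1_parts[p], d2_parts[p]))
```

Keys 2 and 4 have no partner on the other side, so they drop out of the inner join, while both `d1` rows with key 1 are matched.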

### Key Terms

| Term | Meaning |
|:-----|:--------|
| **Map Exchange** | Buffer where mapped records are stored before being shuffled. |
| **Shuffle Operation** | Movement of records across the cluster based on join keys. |
| **Shuffle Partitions** | Reorganized partitions after shuffle, ready for join operation. |

---

### Why is Shuffle Expensive?
- **Heavy data movement** across the network.
- **Serialization/Deserialization** overhead.
- **Disk I/O** if shuffle spills to disk.
- **Memory pressure** if too much data gets shuffled at once.

---

## Final Summary
- **Shuffle Sort and Merge** is the default method for large-to-large joins.
- It is necessary because **distributed datasets may have matching keys on different executors**.
- The **map phase** prepares data based on the key, and the **reduce phase** rearranges it into **shuffle partitions** for joining.
- The **shuffle** itself is the main bottleneck for join performance in Spark.

## Broadcast join:
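- A broadcast join happens when Spark copies (broadcasts) the smaller table to every executor in the cluster. By default Spark does this automatically only for tables below `spark.sql.autoBroadcastJoinThreshold` (10 MB); tables of up to roughly 1-2 GB can still be broadcast explicitly if driver and executor memory allow.
- The larger table stays partitioned. Each executor joins its partitions of the large table with the full copy of the small table locally, so the large table is never shuffled.
- It is used when Spark automatically detects a small enough table or when we force it with the `broadcast()` hint.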
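
The idea can be sketched in plain Python: the small table becomes a lookup that every partition of the large table uses locally. The function name `broadcast_join` and the sample data are hypothetical, and the sketch only models the data flow, not Spark's implementation.

```python
def broadcast_join(large_partitions, small_table):
    """Inner-join each partition of the large table against a full copy of the small table."""
    # Building a lookup from the small table stands in for the broadcast copy
    # that every executor receives.
    lookup = {}
    for key, value in small_table:
        lookup.setdefault(key, []).append(value)

    joined_partitions = []
    for partition in large_partitions:
        # Rows of the large table never leave their partition: no shuffle.
        joined_partitions.append([
            (key, large_value, small_value)
            for key, large_value in partition
            for small_value in lookup.get(key, [])
        ])
    return joined_partitions


# Hypothetical data: orders stay in 2 partitions, products is the small table.
orders_partitions = [
    [("p1", "order-1"), ("p2", "order-2")],
    [("p1", "order-3"), ("p9", "order-4")],
]
products = [("p1", "keyboard"), ("p2", "mouse")]

result = broadcast_join(orders_partitions, products)
```

In PySpark the hint itself is written as `large_df.join(broadcast(small_df), ...)`, with `broadcast` imported from `pyspark.sql.functions`.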

## Optimizing Joins:
### Optimizing Shuffle Joins
- Filter the data before the join so fewer rows have to be shuffled.
- Look for opportunities to aggregate the data before joining, which also reduces the size of the DataFrames.
- Tune the number of shuffle partitions, the number of executors, and the number of unique join keys together.
  - What is the maximum possible parallelism? It is the smallest of three numbers: the available executor task slots, the shuffle partitions, and the unique join keys.
  - If the dataset has 200 unique keys, each key can land in its own shuffle partition at best, so at most 200 shuffle partitions hold data. Each shuffle partition is processed by one task, so running everything in parallel needs 200 task slots (for example, 200 single-core executors).
  - Even with more executors, the parallelism in this example is capped at 200.
  - So on a large cluster we should tune the number of shuffle partitions (`spark.sql.shuffle.partitions`). If the join is limited by the number of unique keys, we should try to increase the join key cardinality.
  - Join key cardinality = the number of distinct values of the join key. Higher cardinality → more shuffle partitions that hold data → better use of a large cluster.
- When two datasets, for example sales and product, are joined on a key, the keys can be skewed. All rows for fast-moving products cluster in a few shuffle partitions, whereas executors handling the keys of slow-moving products have a much lighter load. To handle this, we need to inspect the tasks and break the larger shuffle partitions into smaller ones.
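
The parallelism cap and one common way to split a hot key, salting, can be sketched in plain Python. Salting is named here as an illustrative technique rather than something these notes prescribe; `effective_parallelism`, `salt_key`, and the sample data are hypothetical.

```python
from collections import Counter


def effective_parallelism(task_slots, shuffle_partitions, distinct_keys):
    """Upper bound on join tasks that can do useful work at the same time."""
    return min(task_slots, shuffle_partitions, distinct_keys)


def salt_key(key, row_index, num_salts):
    """Spread one join key over num_salts sub-keys, assigned round-robin."""
    return (key, row_index % num_salts)


# Hypothetical skewed sales data: one fast-moving product dominates.
sales_keys = ["fast"] * 90 + ["slow_a"] * 5 + ["slow_b"] * 5

# Rows per join key before and after salting.
before = Counter(sales_keys)
after = Counter(salt_key(k, i, num_salts=3) for i, k in enumerate(sales_keys))
```

Salting turns 3 distinct keys into 9 and cuts the largest group from 90 rows to 30, so the work for the hot product can be spread over three tasks. In a real salted join the other side (here the product table) has to be replicated once per salt value so that every salted key still finds its match.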



In [None]:
from pyspark.sql import SparkSession
# Import only the functions this cell uses instead of wildcard imports.
from pyspark.sql.functions import col, count
 
if __name__ == '__main__':

    spark = (
        SparkSession.builder
                    .appName('Spark Joins')
                    .enableHiveSupport()
                    .getOrCreate()
    )

    customer_df = spark.sql(
        """
        SELECT
            *
        FROM
            samples.tpch.customer
        """
    )

    order_df = spark.sql(
        """
        SELECT
            *
        FROM
            samples.tpch.orders
        """
    )

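    # display() is the Databricks notebook helper; limit(5) keeps the result a DataFrame.
    display(customer_df.limit(5))
    display(order_df.limit(5))

    # Join the tables and find the top 5 customers by number of orders.
    # The left join keeps customers without orders; count('o_orderkey')
    # ignores the resulting nulls, so those customers get an order count of 0.
    (
        customer_df.join(
            order_df,
            customer_df.c_custkey == order_df.o_custkey,
            'left'
        ).groupBy(
            'c_name'
        ).agg(
            count('o_orderkey').alias('orderCounts')
        ).withColumnRenamed(
            'c_name',
            'customerName'
        ).orderBy(
            col('orderCounts').desc()
        ).show(5)
    )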