# 2 - DataFrame Relational Operations in Spark

This demonstration shows how to effectively use joins and set operations with DataFrames, focusing on performance optimization and best practices.

### Objectives
- Understand different types of DataFrame joins
- Implement performance optimizations for joins
- Handle complex join scenarios
- Use set operations effectively
- Apply best practices for data skew

## REQUIRED - SELECT CLASSIC COMPUTE

Before executing cells in this notebook, please select your classic compute cluster in the lab. Be aware that **Serverless** is enabled by default.

Follow these steps to select the classic compute cluster:

1. Navigate to the top-right of this notebook and click the drop-down menu to select your cluster. By default, the notebook will use **Serverless**.

1. If your cluster is available, select it and continue to the next cell. If the cluster is not shown:

    - In the drop-down, select **More**.

    - In the **Attach to an existing compute resource** pop-up, select the first drop-down. You will see a unique cluster name in that drop-down. Please select that cluster.

**NOTE:** If your cluster has terminated, you might need to restart it in order to select it. To do this:

1. Right-click on **Compute** in the left navigation pane and select *Open in new tab*.

1. Find the triangle icon to the right of your compute cluster name and click it.

1. Wait a few minutes for the cluster to start.

1. Once the cluster is running, complete the steps above to select your cluster.

## A. Setup and Data Loading

First, let's load our sample retail data tables and examine their structures.

In [0]:
# Import only what is used; a wildcard import would shadow Python built-ins such as sum and max
from pyspark.sql.functions import col

# Read the data 
transactions_df = spark.read.table("samples.bakehouse.sales_transactions")
customers_df = spark.read.table("samples.bakehouse.sales_customers")
franchises_df = spark.read.table("samples.bakehouse.sales_franchises")
suppliers_df = spark.read.table("samples.bakehouse.sales_suppliers")

In [0]:
# 1. Examine schemas for each of the dataframes
transactions_df.printSchema()

In [0]:
customers_df.printSchema()

In [0]:
franchises_df.printSchema()

In [0]:
suppliers_df.printSchema()

## B. Basic Join Operations

Let's start with simple join operations to combine our data.

In [0]:
# 2. Python Inner join example to enrich the transactions with store information
enriched_transactions = franchises_df.join(
    transactions_df,
    on="franchiseID",
    how="inner"
)

display(enriched_transactions)

In [0]:
# 2. The "on" clause can contain an expression
enriched_transactions = franchises_df.join(
    transactions_df,
    on=transactions_df.franchiseID == franchises_df.franchiseID,
    how="inner"
)

display(enriched_transactions)

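# This is particularly useful if the join key is named differently in the two DataFrames.
# Note that, unlike on="franchiseID", an expression join keeps both franchiseID columns in the result.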

In [0]:
# Create temporary views so the DataFrames can be queried from SQL cells
customers_df.createOrReplaceTempView("customers")
franchises_df.createOrReplaceTempView("franchises")
transactions_df.createOrReplaceTempView("transactions")

In [0]:
%sql
-- 2. SQL inner join example to enrich the transactions with store information
-- A temporary view is a session-scoped name for a DataFrame's query plan. It lets SQL reference the DataFrame but does not, by itself, cache the data in memory.

SELECT *
FROM transactions t
INNER JOIN franchises f
ON t.franchiseID = f.franchiseID

In [0]:
# 3. The results above contain every field from both DataFrames. A better practice is to project only the columns you need from each one.
# We also alias some columns so their names stay unambiguous after the join.
enriched_transactions = franchises_df \
    .select(
        "franchiseID", 
        col("name").alias("store_name"), 
        col("city").alias("store_city"), 
        col("country").alias("store_country")
        ) \
    .join(
        transactions_df,
        on="franchiseID",
        how="inner"
    )
    
display(enriched_transactions)

In [0]:
%sql
-- 3. SQL version: project only the columns you need from each table instead of using SELECT *
-- We also alias some columns so their names stay unambiguous after the join

SELECT 
    f.franchiseID,
    f.name as store_name,
    f.city as store_city,
    f.country as store_country
FROM franchises f
INNER JOIN transactions t
ON f.franchiseID = t.franchiseID

## C. Exercise: Write two INNER joins, one in Python and one in SQL, that return the following:
1. Only franchises whose country is US
2. Rows grouped by franchiseID, name, city, country, and paymentMethod
3. The sum of totalPrice for each group

In [0]:
%sql
-- Fill in the missing SQL below
SELECT 
    SUM(t.totalPrice) AS total_revenue
GROUP BY f.franchiseID, f.name, f.city, f.country, t.paymentMethod

In [0]:
# Imports
from pyspark.sql.functions import col, sum as spark_sum

# Fill in the Python code below

).filter(col("country") == "US")

# ------------------------------------------------------------------
# Group and aggregate
# ------------------------------------------------------------------
# Fill in the Python code below
agg = (
    
    .agg(spark_sum("totalPrice").alias("total_revenue"))   # sum(t.totalPrice)
)

# ------------------------------------------------------------------
# Inspect / export the final DataFrame
# ------------------------------------------------------------------
display(agg)               # quick look
# agg.write.csv("us_store_revenue.csv", header=True)   # or any other export


## Key Takeaways

1. **Join Strategy**
   - Use inner joins when you only want rows whose keys exist in both DataFrames
   - Use outer joins when keys may be missing from one side and those rows must still be kept
   - Handle column name conflicts by projecting or aliasing columns before the join

2. **Performance Optimization**
   - Filter before joining
   - Project only needed columns
   - Handle skewed keys appropriately
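   - Make sure the smaller DataFrame is the one that gets broadcast: Spark does this automatically for tables below `spark.sql.autoBroadcastJoinThreshold`, or you can wrap the small DataFrame in `broadcast()`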