In [0]:
# Spark Session
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("Optimizing Joins")
    .master("spark://17e348267994:7077")
    .config("spark.cores.max", 16)
    .config("spark.executor.cores", 4)
    .config("spark.executor.memory", "512M")
    .getOrCreate()
    
)



In [0]:
# link to storage account
storage_account_name = "joindatasets"
container_name = "datasets"
blob_url = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/"

strorage_account_key= dbutils.secrets.get(scope = "key-vault-secret", key = "blob-datasets-accesskey")
spark.conf.set(
    "fs.azure.account.key."+storage_account_name+".blob.core.windows.net",
    strorage_account_key)


In [0]:
# Disable AQE and Broadcast join

spark.conf.set("spark.sql.adaptive.enabled", False)
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", False)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
spark.conf.set("spark.sql.join.preferSortMergeJoin", True)

In [0]:

# Read CSV from Azure Blob Storage
_dept_schema = "department_id int, department_name string, description string, city string, state string, country string"

csv_path = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/department_data.csv"


#### Join Big and Small table - SortMerge vs BroadCast Join

In [0]:
# Read EMP CSV data

_schema = "first_name string, last_name string, job_title string, dob string, email string, phone string, salary double, department_id int"

emp = spark.read.format("csv") \
    .schema(_schema) \
    .option("header", "true") \
    .option("delimiter", ",") \
    .load(f"{blob_url}/employee_records.csv")


In [0]:
# Read DEPT CSV data

_dept_schema = "department_id int, department_name string, description string, city string, state string, country string"

dept = spark.read.format("csv") \
    .schema(_dept_schema) \
    .option("header", "true") \
    .option("delimiter", ",") \
    .load(f"{blob_url}/department_data.csv")

Join in noop format for performance benchmarking

In [0]:
# Join both datasets
df_joined = emp.join(dept, on=emp.department_id==dept.department_id, how="left_outer")

# df_joined.explain()
# Write data in noop format for performance beachmarking
df_joined.write.format("noop").mode("overwrite").save()

In [0]:
# Join Datasets
from pyspark.sql.functions import broadcast

df_joined = emp.join(broadcast(dept), on=emp.department_id==dept.department_id, how="left_outer")

In [0]:
df_joined.write.format("noop").mode("overwrite").save()

In [0]:
df_joined.explain()

#### Join Big and Big table - SortMerge without Buckets

In [0]:
# Read Sales data

sales_schema = "transacted_at string, trx_id string, retailer_id string, description string, amount double, city_id string"

sales = spark.read.format("csv") \
    .schema(sales_schema) \
    .option("header", "true") \
    .option("delimiter", ",") \
    .load(f"{blob_url}/new_sales.csv")

In [0]:
# Read City data

city_schema = "city_id string, city string, state string, state_abv string, country string"

city = spark.read.format("csv") \
    .schema(city_schema) \
    .option("header", "true") \
    .option("delimiter", ",") \
    .load(f"{blob_url}/cities.csv")

In [0]:
# Join Data

df_sales_joined = sales.join(city, on=sales.city_id==city.city_id, how="left_outer")

In [0]:
df_sales_joined.write.format("noop").mode("overwrite").save()

In [0]:
# Explain Plan
df_sales_joined.explain()


##### Write Sales and City data in Buckets

In [0]:
# Write Sales data in Buckets

sales.write.format("csv").mode("overwrite").bucketBy(4, "city_id").option("header", True).option("path", "/data/input/datasets/sales_bucket.csv").saveAsTable("sales_bucket")

In [0]:
# Write City data in Buckets

city.write.format("csv").mode("overwrite").bucketBy(4, "city_id").option("header", True).option("path", "/data/input/datasets/city_bucket.csv").saveAsTable("city_bucket")

In [0]:
# Check tables

spark.sql("show tables in default").show()

#### Join Sales and City data - SortMerge with Bucket

In [0]:
# Read Sales table

sales_bucket = spark.read.table("sales_bucket")

In [0]:
# Read City table

city_bucket = spark.read.table("city_bucket")

In [0]:
# Join datasets

df_joined_bucket = sales_bucket.join(city_bucket, on=sales_bucket.city_id==city_bucket.city_id, how="left_outer")

In [0]:
# Write dataset

df_joined_bucket.write.format("noop").mode("overwrite").save()

In [0]:
df_joined_bucket.explain()

In [0]:
# View how tasks are reading Bucket data



#### Points to note

1. Joining Column different than Bucket Column, Same Bucket Size - Shuffle on Both table
2. Joining Column Same, One table in Bucket - Shuffle on non Bucket table
3. Joining Column Same, Different Bucket Size - Shuffle on Smaller Bucket Side
4. Joining Column Same, Same Bucket Size - No Shuffle (Faster Join)

1. So its very importatant to choose correct Bucket column and Bucket Size
2. Decide effectively on number of Buckets, as too mant buckets with not enough data can lead to Small file issue.
3. Datasets are Small - you can prefer Shuffle Hash Join