In [1]:
# Q3.

In [12]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

# ✅ Step 1: Initialize Spark Session
spark = SparkSession.builder \
    .appName("Spark Joins") \
    .config("spark.driver.memory", "8g") \
    .getOrCreate()

# ✅ Step 2: Define Schema for Orders Dataset
orders_schema = StructType([
    StructField("order_id", IntegerType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("order_status", StringType(), True),
    StructField("order_amount", DoubleType(), True)
])

# ✅ Step 3: Load Orders and Customers DataFrames
orders_df = spark.read \
    .format("csv") \
    .schema(orders_schema) \
    .option("header", "true") \
    .load(r"C:\Users\sayed\Desktop\DDP\prac3\data\orders_1gb.csv")

customers_df = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load(r"C:\Users\sayed\Desktop\DDP\M2prac\data\part-00000 (1)")

# ✅ Step 4: Print Column Names
print("Orders Columns:", orders_df.columns)
print("Customers Columns:", customers_df.columns)

# ✅ Step 5: Rename Customer Column if Necessary
if "1" in customers_df.columns:
    customers_df = customers_df.withColumnRenamed("1", "customer_id")

# ✅ Step 6: Perform Inner Join (DataFrame API)
inner_join_df = orders_df.join(customers_df, "customer_id", "inner")
inner_join_df.show(5)

# ✅ Step 7: Perform Inner Join (Spark SQL)
orders_df.createOrReplaceTempView("orders")
customers_df.createOrReplaceTempView("customers")

inner_join_sql = spark.sql("""
    SELECT * FROM orders o
    INNER JOIN customers c
    ON o.customer_id = c.customer_id
""")
inner_join_sql.show(5)

# ✅ Step 8: Check Join Strategy
print("\n🔹 Join Strategy with Default Settings:")
inner_join_df.explain(True)

# ✅ Step 9: Disable Broadcast Join and Check Again
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")  # Disable Broadcast Join

inner_join_df_no_broadcast = orders_df.join(customers_df, "customer_id", "inner")
print("\n🔹 Join Strategy After Disabling Broadcast:")
inner_join_df_no_broadcast.explain(True)

# ✅ Step 10: Stop Spark Session
spark.stop()


Orders Columns: ['order_id', 'customer_id', 'order_status', 'order_amount']
Customers Columns: ['1', 'Richard', 'Hernandez', 'XXXXXXXXX3', 'XXXXXXXXX4', '6303 Heather Plaza', 'Brownsville', 'TX', '78521']
+-----------+--------+------------+------------+-------+---------+----------+----------+------------------+-----------+---+-----+
|customer_id|order_id|order_status|order_amount|Richard|Hernandez|XXXXXXXXX3|XXXXXXXXX4|6303 Heather Plaza|Brownsville| TX|78521|
+-----------+--------+------------+------------+-------+---------+----------+----------+------------------+-----------+---+-----+
+-----------+--------+------------+------------+-------+---------+----------+----------+------------------+-----------+---+-----+

+--------+-----------+------------+------------+-----------+-------+---------+----------+----------+------------------+-----------+---+-----+
|order_id|customer_id|order_status|order_amount|customer_id|Richard|Hernandez|XXXXXXXXX3|XXXXXXXXX4|6303 Heather Plaza|Brownsville| 