In [None]:
import getpass
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import broadcast
from pyspark.sql.window import Window

In [None]:
# ---------------------------------------------------------
# 1. Spark Session Configuration
# ---------------------------------------------------------
# - 8 shuffle partitions instead of Spark's default 200 (presumably sized for
#   a small / local dataset — revisit for larger data).
# - Tables smaller than 10 MiB are auto-broadcast in joins.
# - Adaptive query execution (AQE) lets Spark re-plan joins and shuffle
#   partitions at runtime.
spark = (
    SparkSession.builder
    .appName("Spark SQL External Integration")
    .config("spark.sql.shuffle.partitions", "8")
    .config("spark.sql.autoBroadcastJoinThreshold", 10 * 1024 * 1024)  # 10 MiB
    .config("spark.sql.adaptive.enabled", "true")
    .getOrCreate()
)

# Keep cell output readable: only WARN and above from Spark's own logging.
spark.sparkContext.setLogLevel("WARN")

In [None]:
# ---------------------------------------------------------
# 2. Load customers and orders from MySQL over JDBC
# ---------------------------------------------------------
# Credentials are read from the environment rather than hard-coded, so they
# never land in version control or in shared copies of this notebook.
# - MYSQL_JDBC_URL and MYSQL_USER fall back to the previous local defaults.
# - MYSQL_PASSWORD falls back to an interactive (non-echoing) prompt.
# NOTE(review): the MySQL Connector/J jar must be on Spark's classpath for the
# driver class below to load; the session builder above does not configure
# it, so it is presumably supplied by the cluster / spark-submit — confirm.
jdbc_url = os.environ.get("MYSQL_JDBC_URL", "jdbc:mysql://localhost:3306/salesdb")
connection_properties = {
    "user": os.environ.get("MYSQL_USER", "root"),
    "password": os.environ.get("MYSQL_PASSWORD") or getpass.getpass("MySQL password: "),
    "driver": "com.mysql.cj.jdbc.Driver",
}

# Load tables from MySQL.
# NOTE: without partitionColumn/lowerBound/upperBound/numPartitions, Spark
# reads each table through a single JDBC connection (one partition); add
# those options if the tables grow large.
customers_df = spark.read.jdbc(
    url=jdbc_url,
    table="customers",
    properties=connection_properties,
)

orders_df = spark.read.jdbc(
    url=jdbc_url,
    table="orders",
    properties=connection_properties,
)

customers_df.printSchema()
orders_df.printSchema()


In [None]:
# Raw sales CSVs from S3, using the first line of each file as the header.
# `inferSchema` makes Spark do one extra pass over the files to guess column
# types; an explicit schema would avoid that pass.
# NOTE(review): s3_sales_df is not used by any later cell, and reading s3a://
# presumably relies on hadoop-aws and S3 credentials configured outside this
# notebook — confirm.
s3_sales_df = spark.read.csv(
    "s3a://my-bucket/sales_data/",
    header=True,
    inferSchema=True,
)


In [None]:
# Register Temp Views so Spark SQL can refer to the JDBC-backed DataFrames.
customers_df.createOrReplaceTempView("customers")
orders_df.createOrReplaceTempView("orders")

# ---------------------------------------------------------
# Advanced SQL with CTE, JOIN, Window Function
# ---------------------------------------------------------
# For each customer, keep their top-3 orders by amount. On each kept row,
# show the customer's cumulative spend up to and including that order.
#
# * The inner JOIN drops customers that have no orders.
# * running_total is computed over ALL of a customer's orders before the
#   order_rank filter, so it includes amounts of orders not in the output.
# * order_id is a tiebreaker wherever order_date is the sort key. With a
#   ROWS frame, orders that share an order_date would otherwise be summed in
#   an arbitrary order, making running_total non-deterministic across runs.
#   (order_id is presumably unique — confirm.)
# * RANK() gives tied amounts the same rank, so a customer can yield more
#   than 3 rows when amounts tie; use ROW_NUMBER() for exactly 3.
query = """
WITH ranked_orders AS (
    SELECT
        c.customer_id,
        c.customer_name,
        o.order_id,
        o.order_date,
        o.amount,
        SUM(o.amount) OVER (
            PARTITION BY c.customer_id
            ORDER BY o.order_date, o.order_id
            ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
        ) AS running_total,
        RANK() OVER (
            PARTITION BY c.customer_id
            ORDER BY o.amount DESC
        ) AS order_rank
    FROM customers c
    JOIN orders o
        ON c.customer_id = o.customer_id
)
SELECT *
FROM ranked_orders
WHERE order_rank <= 3
ORDER BY customer_id, order_date, order_id
"""

result_df = spark.sql(query)
result_df.show(truncate=False)


In [None]:
# Print the parsed, analyzed and optimized logical plans plus the physical plan.
result_df.explain(extended=True)


In [None]:
# Explicit broadcast hint: ship customers_df to every executor so the join
# with orders_df does not need to shuffle orders. `broadcast` comes from the
# pyspark.sql.functions import at the top of the notebook.
# NOTE(review): only appropriate while customers_df comfortably fits in
# executor memory — confirm the table size.
optimized_df = orders_df.join(
    broadcast(customers_df),
    on="customer_id",
    how="inner",
)

# Check that the hint took effect: the physical plan should show a
# BroadcastHashJoin.
optimized_df.explain()


In [None]:
# Enable dynamic partition pruning (DPP).
# The key must end in ".enabled". The previous key
# "spark.sql.optimizer.dynamicPartitionPruning" is not a recognised Spark
# setting, so setting it had no effect.
# NOTE(review): DPP is already on by default in Spark 3.x and prunes only
# partitioned file-source scans; the JDBC-backed tables above presumably
# won't benefit — confirm before relying on it.
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")


In [None]:
# ---------------------------------------------------------
# Save Output with Partitioning and Compression
# ---------------------------------------------------------
# Relative path: resolved against the session's default filesystem / working
# directory (presumably local disk when running locally).
output_path = "output/customer_sales_summary"

# Snappy-compressed Parquet, laid out as one customer_id=<value>/ directory
# per customer; existing output at output_path is overwritten.
# NOTE(review): customer_id is likely high-cardinality, which yields many
# small directories and files — consider a coarser partition key.
(
    result_df.write
    .format("parquet")
    .mode("overwrite")
    .partitionBy("customer_id")
    .option("compression", "snappy")
    .save(output_path)
)


In [None]:
# Read the written output back as a sanity check. customer_id is rebuilt from
# the partition directory names, so it is appended after the data columns.
spark.read.format("parquet").load(output_path).show(n=5)
