<a href="https://colab.research.google.com/github/Kumyyy21/colab-week-4.1/blob/main/Spark_SQL_Advanced.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
from pyspark.sql import SparkSession

# Directory layout for the demo: everything lives under <base_dir>/.data,
# split into a Spark SQL warehouse directory and an output directory.
base_dir = "/content/megatech"
data_dir = base_dir + "/.data"
wh_dir = data_dir + "/warehouse"
out_dir = data_dir + "/out"

# Create both leaf directories up front; exist_ok makes re-running harmless.
os.makedirs(wh_dir, exist_ok=True)
os.makedirs(out_dir, exist_ok=True)

# Reuse an already-running session if there is one, otherwise start a new
# one whose SQL warehouse directory points at wh_dir.
spark = SparkSession.builder \
    .appName("megatech") \
    .config("spark.sql.warehouse.dir", wh_dir) \
    .getOrCreate()

In [None]:
import random
from datetime import datetime, timedelta
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import col
# Bootstrap the synthetic warehouse only when at least one of the two Parquet
# datasets is missing from the warehouse directory.
need_bootstrap = not (
    os.path.exists(f"{wh_dir}/customers.parquet")
    and os.path.exists(f"{wh_dir}/sales.parquet")
)
if need_bootstrap:
    print("Warehouse belum ada, perlu dibuat data sintetis..")

    # --- customers -------------------------------------------------------
    customer_schema = StructType([
        StructField("customer_id", IntegerType(), False),
        StructField("name", StringType(), True),
        StructField("age", IntegerType(), True),
        StructField("city", StringType(), True),
    ])
    customers_data = [
        (101, "Budi", 34, "Jakarta"),
        (102, "Sari", 41, "Surabaya"),
        (103, "Andi", 26, "Bandung"),
        (104, "Dewi", 29, "Jakarta"),
        (105, "Rudi", 38, "Bandung"),
        (106, "Maya", 31, "Surabaya"),
    ]
    customers_df = spark.createDataFrame(customers_data, schema=customer_schema)

    # --- sales -----------------------------------------------------------
    # "date" is stored as a "YYYY-MM-DD" string (see strftime below).
    sales_schema = StructType([
        StructField("tx_id", IntegerType(), False),
        StructField("customer_id", IntegerType(), False),
        StructField("product", StringType(), True),
        StructField("quantity", IntegerType(), True),
        StructField("price", IntegerType(), True),
        StructField("date", StringType(), True),
    ])

    # Fixed seed so the generated transactions are reproducible.
    random.seed(42)
    product = ["TV", "Laptop", "HP", "Kulkas", "AC", "Headphone", "Speaker"]
    price_map = {
        "TV": 5_000_000,
        "Laptop": 12_000_000,
        "HP": 3_000_000,
        "Kulkas": 6_000_000,
        # Was written as 5_500_00 (= 550,000): a mis-grouped literal.
        "AC": 5_500_000,
        "Headphone": 800_000,
        # NOTE(review): same price as Laptop -- confirm this is intended.
        "Speaker": 12_000_000,
    }
    start = datetime(2025, 1, 1)
    customer_ids = [customer[0] for customer in customers_data]

    # 50 transactions with ids 1001..1050. The order of the random calls is
    # kept as-is so the seeded sequence is unchanged.
    rows = []
    for i in range(1, 51):
        c = random.choice(customer_ids)
        p = random.choice(product)
        q = random.randint(1, 4)
        # The list price is doubled for one of the four multiplier choices.
        pr = price_map[p] * random.choice([1, 1, 1, 2])
        # Dates fall within 180 days starting at `start`.
        d = (start + timedelta(days=random.randint(0, 179))).strftime("%Y-%m-%d")
        rows.append((1000 + i, c, p, q, pr, d))

    # Everything below must stay inside the `if`: it uses names that only
    # exist when the bootstrap ran, and must not run when the warehouse
    # already exists.
    sales_df = spark.createDataFrame(rows, schema=sales_schema)
    sales_enriched_df = sales_df.withColumn(
        "total_value", col("quantity") * col("price")
    )
    customers_df.write.mode("overwrite").parquet(f"{wh_dir}/customers.parquet")
    sales_enriched_df.write.mode("overwrite").parquet(f"{wh_dir}/sales.parquet")
    print("Data Warehouse sintetis selesai dibuat")


In [None]:
from pyspark.sql.functions import to_date, col, year, month
# Load both datasets back from the Parquet warehouse.
customers_df = spark.read.parquet(f"{wh_dir}/customers.parquet")
sales_df = spark.read.parquet(f"{wh_dir}/sales.parquet")

# Parse the string "date" column into a real date, then derive the
# calendar year and month from it, one column per step.
sales_df = sales_df.withColumn("date", to_date("date", "yyyy-MM-dd"))
sales_df = sales_df.withColumn("year", year("date"))
sales_df = sales_df.withColumn("month", month("date"))

# Register temporary views so the DataFrames can be queried by name
# ("customers", "sales") from spark.sql().
customers_df.createOrReplaceTempView("customers")
sales_df.createOrReplaceTempView("sales")

# Preview the first five rows of each table without truncating values.
customers_df.show(n=5, truncate=False)
sales_df.show(n=5, truncate=False)


In [None]:
# Basic Spark SQL: filter the "sales" view on a product list and a minimum
# total_value, sort by total_value descending and keep the top 10 rows.
basic_sql = """SELECT tx_id, customer_id, product, quantity, price, total_value, date
                    FROM sales where product IN ('TV', 'Laptop', 'Kulkas') AND total_value > 10000000
                    ORDER BY total_value DESC limit 10"""
q_basic = spark.sql(basic_sql)

# Print the first five result rows, then the query plan in "formatted" mode.
q_basic.show(n=5, truncate=False)
q_basic.explain(mode="formatted")

In [None]:
# Join sales with customers and add one derived column, "bucket", via a
# CASE expression: BIG (>= 20,000,000), MEDIUM (>= 12,000,000), else SMALL.
join_sql = """SELECT s.tx_id, c.name, c.city, s.product, s.quantity, s.total_value,
                    CASE WHEN s.total_value >=20000000 THEN 'BIG'
                    WHEN s.total_value >=12000000 THEN 'MEDIUM' ELSE 'SMALL'
                    END AS bucket
                      FROM sales s INNER JOIN customers c ON s.customer_id = c.customer_id
                      ORDER BY s.total_value DESC limit 10"""
q_join = spark.sql(join_sql)

# Print the first five result rows without truncating values.
q_join.show(n=5, truncate=False)

In [None]:
# Same kind of join, expressed with CTEs to break the query into simpler
# steps: "base" attaches the customer's city to every sale, "filtered" keeps
# TV/Laptop sales with total_value >= 12,000,000, and the final SELECT
# counts those transactions and sums their revenue per city.
cte_sql = """
WITH base AS (
    SELECT s.*, c.city
    FROM sales s
    JOIN customers c USING (customer_id)
),
filtered AS (
    SELECT *
    FROM base
    WHERE product IN ('TV', 'Laptop')
      AND total_value >= 12000000
)
SELECT city,
       COUNT(*) AS big_tx,
       SUM(total_value) AS revenue
FROM filtered
GROUP BY city
ORDER BY revenue DESC
"""
q_cte = spark.sql(cte_sql)

# Print the result rows (default row limit) without truncating values.
q_cte.show(truncate=False)
