In [0]:
import re

from pyspark.sql import functions as F

# ---------- 1. Get pr_id parameter ----------
# The widget lets a job run override pr_id; "local_dev" is used when the
# notebook is run manually without a value.
dbutils.widgets.text("pr_id", "local_dev")  # default value for manual runs
pr_id = dbutils.widgets.get("pr_id")

print(f"Running pipeline for pr_id = '{pr_id}'")

# pr_id is interpolated unquoted into database names and SQL statements below,
# so reject anything that is not a plain identifier fragment (empty string,
# whitespace, hyphens, dots, quotes, ...) before it reaches spark.sql().
if not re.fullmatch(r"[A-Za-z0-9_]+", pr_id):
    raise ValueError(
        f"Invalid pr_id {pr_id!r}: only ASCII letters, digits and "
        "underscores are allowed"
    )

# ---------- 2. Build raw & clean database names ----------
# "prod" maps to the unprefixed databases; any other pr_id gets its own pair
# of databases prefixed with that pr_id.
if pr_id == "prod":
    raw_db_name = "raw"
    clean_db_name = "clean"
else:
    raw_db_name = f"{pr_id}_raw"
    clean_db_name = f"{pr_id}_clean"

print(f"Using raw DB   = {raw_db_name}")
print(f"Using clean DB = {clean_db_name}")

# Make sure the databases (schemas) exist
for db_name in (raw_db_name, clean_db_name):
    spark.sql(f"CREATE DATABASE IF NOT EXISTS {db_name}")

# ---------- 3. Read from raw table ----------
# The source table is `orders_raw` inside the raw database chosen above.
raw_table = f"{raw_db_name}.orders_raw"
print(f"Reading from raw table: {raw_table}")

orders_df = spark.table(raw_table)

# Show the input: schema first, then a 10-row preview.
print("Raw schema:")
orders_df.printSchema()

raw_preview_df = orders_df.limit(10)
print("Sample raw data:")
display(raw_preview_df)

# ---------- 4. Transformations (example logic) ----------
# Column expressions are named up front so the steps below read one at a time.
order_year_col = F.year(F.col("created_at"))
amount_eur_col = F.col("amount") * F.lit(0.93)  # fake FX rate
has_positive_amount = F.col("amount") > 0

# Same order as before: add order_year, add amount_eur, then keep only rows
# whose amount is strictly positive.
with_year_df = orders_df.withColumn("order_year", order_year_col)
with_eur_df = with_year_df.withColumn("amount_eur", amount_eur_col)
orders_enriched_df = with_eur_df.filter(has_positive_amount)

print("Enriched schema:")
orders_enriched_df.printSchema()

enriched_preview_df = orders_enriched_df.limit(10)
print("Sample enriched data:")
display(enriched_preview_df)

# ---------- 5. Write to clean table ----------
# The enriched orders are stored as `orders_enriched` in the clean database.
clean_table = f"{clean_db_name}.orders_enriched"

print(f"Writing to clean table: {clean_table}")

# Saved as a Delta table in "overwrite" mode, so each run replaces the data
# already in the table rather than adding to it.
(
    orders_enriched_df
    .write
    .format("delta")
    .mode("overwrite")
    .saveAsTable(clean_table)
)

# The trailing check-mark emoji had been stored mis-decoded (its UTF-8 bytes
# read through a single-byte code page); the intended character is restored.
print("Pipeline finished successfully ✅")
