# In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when, lit, upper

# Customer summary pipeline.
#
# Reads the customer, order, payment, geography and feedback tables, keeps
# completed orders dated 2023-01-01 or later, aggregates payment rows per
# customer, enriches the result with geography and feedback columns, and
# overwrites the Delta output at /mnt/output/customer_summary/.
spark = SparkSession.builder.appName("CustomerPipeline").getOrCreate()

# Source tables.
df_cust = spark.read.table("bircatalog.default.customers")
df_orders = spark.read.table("bd.birschema.orders")
df_payments = spark.read.table("hive_metastore.myschema.payments")
df_geo = spark.read.table("catalog.schema1.customer_geography")
df_feedback = spark.read.table("feedback")

# NOTE(review): df_orders is consumed only once in this section, so the cache
# brings no reuse benefit here. It is kept in case later code re-reads
# df_orders; drop it (or unpersist afterwards) if that is not the case.
df_orders.cache()

# Join on the column *name* rather than an equality expression: an expression
# join keeps both customer_id columns in the result, which makes any later
# reference to "customer_id" ambiguous. A name-based join emits the key once.
df_join1 = df_cust.join(df_orders, on="customer_id", how="inner")

# Keep only completed orders placed on or after 2023-01-01.
df_join1_filtered = df_join1.filter(
    (col("order_status") == "completed") & (col("order_date") >= "2023-01-01")
)

# Same reasoning as above: a name-based join yields a single order_id column.
df_join2 = df_join1_filtered.join(df_payments, on="order_id", how="inner")

# Group by the customers table's own customer_id column so the reference stays
# unambiguous even if the payments table also carries a customer_id column.
# The resulting grouping column is still named "customer_id".
#
# NOTE(review): count("*") counts joined order/payment rows, so an order with
# several payment rows is counted several times in total_orders — confirm this
# is intended (otherwise count distinct order_id values).
df_agg = df_join2.groupBy(df_cust["customer_id"]).agg(
    count("*").alias("total_orders"),
    # when() without otherwise() yields NULL for non-matching rows, and count()
    # skips NULLs, so this counts only the rows whose payment failed.
    count(when(col("payment_status") == "failed", True)).alias("failed_payments"),
)

# Left join: customers without a geography row are kept with NULL geo columns.
df_join3 = df_agg.join(df_geo, "customer_id", "left")

df_transformed = df_join3.withColumn("country_upper", upper(col("country")))

# NOTE(review): this is an inner join, so customers with no feedback row are
# dropped, and customers with several feedback rows appear once per row —
# confirm this is intended.
df_final = df_transformed.join(df_feedback, "customer_id")

# repartition(1) still produces a single output file, but unlike coalesce(1)
# it inserts a shuffle boundary, so the joins and aggregation above keep their
# parallelism instead of being collapsed into one task.
df_final.repartition(1).write.format("delta").mode("overwrite").save("/mnt/output/customer_summary/")