Silver To Gold

In [0]:
from pyspark.sql.functions import col,sum,split,concat,lit,current_timestamp,rank
from delta.tables import DeltaTable

In [0]:
## Read the silver-layer Delta tables.
# NOTE: the storage directory is literally spelled "sliver" on the mount, so the
# path string must keep that spelling even though the layer is "silver".
sliverlocation = "/mnt/bayerhackathon/squad2-srinivasu/sliver/"


def _read_silver_table(table_name):
    """Load one silver Delta table (by sub-directory name), dropping exact duplicate rows."""
    table_path = f"{sliverlocation}/{table_name}/"
    return spark.read.format("delta").load(table_path).distinct()


customerDf = _read_silver_table("customer")

OrderDF = _read_silver_table("order")
OrderLineDF = _read_silver_table("orderline")
CustomerBehaviourDF = _read_silver_table("customerbehaviour")



In [0]:
## Transformations: split the address and keep one latest record per customer.
from pyspark.sql.functions import concat_ws, row_number
from pyspark.sql.window import Window

# NOTE: the "Adrress_*" column names are misspelled but kept unchanged so any
# consumer of these columns keeps working.
customerDf_t = (
    customerDf
    .withColumn("Adrress_new", split("Address", " "))
    .withColumn("Adrress_Line_1", col("Adrress_new")[0])
    # concat_ws keeps a space between the tokens and skips missing ones. The
    # previous concat() glued the tokens together ("MainSt") and returned NULL
    # whenever either token was missing.
    .withColumn(
        "Adrress_Line_2",
        concat_ws(" ", col("Adrress_new")[1], col("Adrress_new")[2]),
    )
)

# Newest record first within each customer.
spec = Window.partitionBy("customer_id").orderBy(col("Created_date").desc())

# row_number (not rank) yields exactly one row per customer. rank() gave every
# record tied on the latest Created_date the value 1, leaving duplicates.
# NOTE(review): among tied records the surviving row is arbitrary; add a
# tie-breaker column to the orderBy if a specific one is required.
# The helper column keeps the name "rank" for compatibility.
customerDf_tl = (
    customerDf_t
    .withColumn("rank", row_number().over(spec))
    .filter(col("rank") == 1)
)

In [0]:
%python
from pyspark.sql.functions import lit, current_timestamp
from delta.tables import DeltaTable

# Read the source data
customercdcDf1 = spark.read.format("csv").option("header", "True").option("inferSchema", "True") \
    .option("delimiter", ";") \
    .load("/mnt/bayerhackathon/customer_SCD2_data.csv")

# Deduplicate the source DataFrame
customercdcDf1 = customercdcDf1.dropDuplicates(["customer_id"])

gold_path = "/mnt/bayerhackathon/squad2-srinivasu/gold/"

# Check if Gold table exists
if not DeltaTable.isDeltaTable(spark, f"{gold_path}/customer/"):
    # Initialize Gold table
    customercdcDf1.withColumn("IsCurrent", lit(True)) \
        .withColumn("StartDate", current_timestamp()) \
        .withColumn("EndDate", lit(None).cast("timestamp")) \
        .write.format("delta").save(f"{gold_path}/customer/")
else:
    # Perform SCD Type 2 merge
    gold_table = DeltaTable.forPath(spark, f"{gold_path}/customer/")

    gold_table.alias("gold").merge(
        customercdcDf1.alias("silver"),
        "gold.customer_id = silver.customer_id"
    ).whenMatchedUpdate(
        condition="gold.first_name != silver.first_name OR gold.last_name != silver.last_name OR gold.email != silver.email OR gold.gender != silver.gender OR gold.Address != silver.Address OR gold.city != silver.city OR gold.state != silver.state OR gold.country != silver.country OR gold.zipcode != silver.zipcode OR gold.phone != silver.phone OR gold.created_date != silver.created_date",
        set={
            "IsCurrent": lit(False),
            "EndDate": current_timestamp()
        }
    ).whenNotMatchedInsert(
        values={
            "customer_id": "silver.customer_id",
            "first_name": "silver.first_name",
            "last_name": "silver.last_name",
            "email": "silver.email",
            "gender": "silver.gender",
            "Address": "silver.Address",
            "city": "silver.city",
            "state": "silver.state",
            "country": "silver.country",
            "zipcode": "silver.zipcode",
            "phone": "silver.phone",
            "created_date": "silver.created_date",
            "IsCurrent": lit(True),
            "StartDate": current_timestamp(),
            "EndDate": lit(None).cast("timestamp")
        }
    ).execute()

In [0]:
# Scratch cell: it previously held two orphaned `.join(...)` continuation lines with
# no DataFrame to call them on (a SyntaxError that broke "Run All"). The orderline
# and customer-behaviour joins are built in the "final dataset" cell.

In [0]:
"behaviour.order_frequency","behaviour.average_order_value","behaviour.customer_lifetime_value","behaviour.website_visits","behaviour.seconds_spent_on_website","behaviour.page_views","behaviour.cart_abandonment_rate"

In [0]:
## Final dataset: current customer versions joined with behaviour, orders and order lines.
gold_analytics_cust = spark.read.format("delta").load(f"{gold_path}/customer/")

# The gold customer table is SCD Type 2 (IsCurrent/StartDate/EndDate), so one
# customer can have several rows. Joining every version to the orders would repeat
# each order once per version, so only the current version is used here.
current_customers = gold_analytics_cust.filter(col("IsCurrent"))

gold_analytics = (
    current_customers.alias("cust")
    .join(
        CustomerBehaviourDF.alias("behaviour"),
        col("cust.customer_id") == col("behaviour.customer_id"),
        "left",
    )
    .join(
        OrderDF.alias("order"),
        col("cust.customer_id") == col("order.customer_id"),
        "left",
    )
    .join(
        OrderLineDF.alias("orderline"),
        col("order.order_id") == col("orderline.order_id"),
        "left",
    )
    .selectExpr(
        "cust.*",
        "order.order_id",
        "order.order_date",
        "order.order_channel",
        "order.store_code",
        "order.state as order_state",
        "order.order_country",
        "order.total_purchase_value",
        # NOTE(review): these order-line columns are not alias-qualified; they only
        # resolve while no other joined frame has a column with the same name.
        "order_line_id",
        "product",
        "quantity",
        "price",
        "order_currency",
        "behaviour.order_frequency",
        "behaviour.average_order_value",
        "behaviour.customer_lifetime_value",
        "behaviour.website_visits",
        "behaviour.seconds_spent_on_website",
        "behaviour.page_views",
        "behaviour.cart_abandonment_rate",
    )
    .distinct()
)



In [0]:
# Row count of the gold customer table (all SCD2 versions, current and expired).
gold_customer_row_count = gold_analytics_cust.count()
gold_customer_row_count

In [0]:
# Preview a small sample instead of rendering the whole behaviour table;
# the table's size is reported by the count cell that follows.
display(CustomerBehaviourDF.limit(20))

In [0]:
# Number of (de-duplicated) customer-behaviour rows read from silver.
behaviour_row_count = CustomerBehaviourDF.count()
behaviour_row_count

In [0]:
# Number of rows in the assembled analytics dataset.
analytics_row_count = gold_analytics.count()
analytics_row_count

In [0]:
# Persist the analytics dataset as Delta, replacing any previous snapshot.
gold_analytics_path = f"{gold_path}/gold_analytics/"
(
    gold_analytics.write
    .mode("overwrite")
    .format("delta")
    .save(gold_analytics_path)
)