In [0]:
# Notebook: 04_Gold.ipynb
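# Sets up the Gold layer using DLT by defining two tables:
# 1. "transactions_per_product": total sales per day, pivoted into one column per product
# 2. "transactions_details": transactions joined with customer name, address, and email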

import dlt
from pyspark.sql import functions as F

In [0]:
# Table: transactions_per_product

@dlt.table(comment="Pivoted transactions per product aggregated by total sales per day")
def transactions_per_product():
    """Build the Gold table of total sales per day, pivoted by product.

    Reads the Silver ``sales_transactions`` table, derives a ``trans_date``
    column from ``dateTime``, then groups by that date and pivots on
    ``product`` while summing ``totalPrice``.

    Returns:
        A DataFrame with one row per ``trans_date`` and one column per
        distinct ``product`` value holding the summed ``totalPrice``.
    """
    silver_sales = spark.table("jp_assessment.latam_lab_silver.sales_transactions")

    # NOTE(review): pivot() is called without an explicit list of values, so
    # the output columns depend on the product values present in the data --
    # confirm this is supported by the target pipeline runtime.
    return (
        silver_sales
        # Truncate the transaction timestamp to a calendar date for grouping.
        .withColumn("trans_date", F.to_date("dateTime"))
        .groupBy("trans_date")
        .pivot("product")
        .agg(F.sum("totalPrice").alias("total_sales"))
    )

In [0]:
# Table: transactions_details

@dlt.table(comment="Detailed transactions enriched with customer information")
def transactions_details():
    """Build the Gold table of transactions enriched with customer details.

    Left-joins the Silver ``sales_transactions`` table to the Silver
    ``sales_customers`` table on ``customerID``, so every transaction row is
    kept even when no customer row matches.

    Returns:
        A DataFrame with the transaction columns plus ``customer_name``
        (``first_name`` and ``last_name`` joined by a single space),
        ``address`` and ``email_address``.
    """
    transactions = spark.table("jp_assessment.latam_lab_silver.sales_transactions")
    customers = spark.table("jp_assessment.latam_lab_silver.sales_customers")

    # NOTE(review): concat_ws skips NULL inputs, so a transaction without a
    # matching customer presumably gets an empty-string name rather than
    # NULL -- confirm that is the desired output.
    customer_name = F.concat_ws(
        " ", F.col("first_name"), F.col("last_name")
    ).alias("customer_name")

    # Output columns, in the order they appear in the Gold table.
    output_columns = [
        "transactionID",
        "customerID",
        "dateTime",
        "product",
        "quantity",
        "totalPrice",
        "paymentMethod",
        customer_name,
        "address",
        "email_address",
    ]

    enriched = transactions.join(customers, on="customerID", how="left")
    return enriched.select(*output_columns)