# Bronze layer

The bronze layer contains raw data. It is intended to serve as the single source of truth: it preserves the data’s fidelity and enables reprocessing and auditing by retaining all historical data.

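In the bronze layer we add the following fields to each table:
* Ingestion_Time - when the row was loaded (the `load_timestamp` column in the ingestion code)
* Hash - to detect changes in a row
* State - adopts the operation performed in the SQL database (like the CDC code)
* `end_timestamp` - when a row version was closed by an update or delete (null while the row is current)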

| **Operation** | **CDC Code** |  **Comments** |
|--------------|------------|------------|
| Delete   | `1`        | implemented with MERGE |
| Insert   | `2`        | implemented with MERGE |
| Update (before image) | `3` | implemented with MERGE |
| Update (after image)  | `4` | implemented with INSERT after MERGE |
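
As a rough illustration of how the codes in the table relate to a snapshot comparison, the sketch below classifies changes in plain Python. It is not the notebook's implementation (that uses a Delta `MERGE`); the function name `classify_changes` and the two dictionaries (key to row hash) are hypothetical stand-ins for the active bronze rows and the freshly read source table.

```python
from typing import Dict, List, Tuple

# CDC codes from the table above
DELETE, INSERT, UPDATE_BEFORE, UPDATE_AFTER = 1, 2, 3, 4


def classify_changes(
    active: Dict[int, str], snapshot: Dict[int, str]
) -> List[Tuple[int, int]]:
    """Compare active bronze rows with a new source snapshot.

    Both arguments map a primary key to a row hash (hypothetical layout).
    Returns (key, cdc_code) pairs ordered by key.
    """
    events: List[Tuple[int, int]] = []
    for key in sorted(active.keys() | snapshot.keys()):
        if key not in snapshot:
            # Row disappeared from the source: mark as deleted
            events.append((key, DELETE))
        elif key not in active:
            # Row is new in the source: insert
            events.append((key, INSERT))
        elif active[key] != snapshot[key]:
            # Hash changed: close the old version, then add the new one
            events.append((key, UPDATE_BEFORE))
            events.append((key, UPDATE_AFTER))
    return events


active_rows = {1: "a", 2: "b", 3: "c"}
source_rows = {2: "b", 3: "x", 4: "d"}
events = classify_changes(active_rows, source_rows)
print(events)
```

Key 2 is unchanged, so it produces no event; an update always yields a pair of events (codes 3 and 4) for the same key.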

Traditionally, Delta Lake has offered the **_MERGE INTO_** command to capture changes. **_MERGE INTO_** comes with a caveat about the order of the change data. Since we read directly from a SQL database, we only see a snapshot of the database's current state, so we do not need to worry about the order of changes.

We declare tables lazily (without declaring all columns). That way, we already take advantage of schema evolution.

* For now, we are just taking the Customer table.
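
A minimal sketch of what such a lazy declaration might look like, under stated assumptions: the helper `build_bronze_ddl` and the `TECHNICAL_COLUMNS` mapping are hypothetical, the keys are assumed to be integers, and declaring the technical columns up front is an assumption (made here because the merge condition refers to `target.state` and `target.hash`), not something the notebook states. All business columns are left for schema evolution to add.

```python
from typing import List

# Hypothetical: technical columns assumed to be declared up front.
TECHNICAL_COLUMNS = {
    "hash": "STRING",
    "load_timestamp": "TIMESTAMP",
    "end_timestamp": "TIMESTAMP",
    "state": "INTEGER",
}


def build_bronze_ddl(table: str, keys: List[str]) -> str:
    """Build a CREATE TABLE statement with only key and technical columns."""
    # Composite keys are supported, but every key is assumed to be an INTEGER
    columns = [f"{key} INTEGER" for key in keys]
    columns += [f"{name} {dtype}" for name, dtype in TECHNICAL_COLUMNS.items()]
    body = ",\n  ".join(columns)
    return f"CREATE TABLE IF NOT EXISTS bronze.{table} (\n  {body}\n) USING DELTA"


ddl = build_bronze_ddl("CUSTOMERS", ["CUSTOMER_ID"])
print(ddl)
```

In a notebook, the resulting string could then be passed to `spark.sql(ddl)` before the first ingestion run.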

# Imports

In [0]:
import pytz
from pyspark.sql.functions import col, concat_ws, sha2, lit
from delta.tables import DeltaTable
from datetime import datetime

# Credentials for Azure SQL database (using Azure Key Vault)

In [0]:
# Defining the credentials of Azure SQL database
sql_db_retail_key = dbutils.secrets.get(scope="keys", key="sqldbretailkey")
sql_db_retail_url = "jdbc:sqlserver://retail-oltp-server.database.windows.net:1433;databaseName=retail"
sql_db_retail_user = "andre"

spark.conf.set("spark.sql.retail_url", sql_db_retail_url)
spark.conf.set("spark.sql.retail_user", sql_db_retail_user)
spark.conf.set("spark.sql.retail_key", sql_db_retail_key)

properties = {
    "user": sql_db_retail_user,         # Replace with your username
    "password": sql_db_retail_key,      # Replace with your password
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}
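# Print the connection properties without exposing the password
print({k: ("***" if k == "password" else v) for k, v in properties.items()})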

# Defining what tables to ingest

In [0]:
# Dictionary with tables and their primary keys
primary_keys = {
    "CUSTOMERS": ["CUSTOMER_ID"] #,
#    "BRANDS": ["BRAND_ID"],
#    "CATEGORIES": ["CATEGORY_ID"],
#    "PRODUCTS": ["PRODUCT_ID"],
#    "STORES": ["STORE_ID"],
#    "PROMOTIONS": ["PROMOTION_ID"],
#    "PAYMENT_METHODS": ["PAYMENT_METHOD_ID"],
#    "INVENTORY": ["INVENTORY_ID"],
#    "SALES": ["SALE_ID"],
#    "TRANSACTION_ITEM": ["TRANSACTION_ID"] 
}

# Bronze Ingestion

In [0]:
for table in primary_keys:
    # Get the list of primary keys for the current table
    keys = primary_keys[table]

    # Convert the list of primary keys into a SQL column definition
    # (handles composite keys, but assumes every key is an INTEGER)
    primary_keys_str = ",\n  ".join([f"{key} INTEGER" for key in keys])
    ids = ", ".join(keys)

    # Build the merge condition: target.<key> = source.<key> for every key
    merge_condition = " AND ".join([f"target.{key} = source.{key}" for key in keys])

    # Frozen timestamp, shared by every row written in this iteration:
    freezed_timestamp_dt = datetime.now(pytz.timezone("America/Sao_Paulo"))
    freezed_timestamp = lit(freezed_timestamp_dt)

    # taking the bronze table inside delta lake
    df_bronze = DeltaTable.forName(spark, f"bronze.{table}")

    # reading the source table
    df_source = spark.read \
        .jdbc(
            url=sql_db_retail_url,
            table=f"dbo.{table}",
            properties=properties)

    # Add the basic columns to the source DataFrame:
    df_source = df_source.withColumn("hash", sha2(concat_ws("|", *[col(c) for c in df_source.columns]), 256)) 
    df_source = df_source.withColumn("load_timestamp", freezed_timestamp)
    df_source = df_source.withColumn("end_timestamp", lit(None).cast("timestamp"))
    df_source = df_source.withColumn("state", lit(2))   


    ################################################################################################
    #################################### Start Bronze ingestion ####################################

    #       | **Operation**         | **CDC Code** |
    #       |-----------------------|--------------|
    #       | Delete                |     `1`      |
    #       | Insert                |     `2`      |
    #       | Update (before image) |     `3`      |
    #       | Update (after image)  |     `4`      |   => inserting outside the merge


    # Mark 1 for delete and 3 for update (before image) in bronze.table
    # Insert: STATE = 2 
    df_bronze.alias("target").merge(
        df_source.alias("source"),
        merge_condition + " AND target.state in (2,4)"   # source.IDs = target.IDs
    ).withSchemaEvolution(
    ).whenMatchedUpdate(
        condition="target.hash != source.hash AND target.state in (2,4)",
        set={
            "state": "3",
            "end_timestamp": freezed_timestamp
        }
    ).whenNotMatchedBySourceUpdate(
        condition="target.state in (2,4)",
        set={
            "state": "1",
            "end_timestamp": freezed_timestamp
        }
    ).whenNotMatchedInsertAll(
    ).execute()

    # Filtering all Update (after image) STATE = 4:
    df_source_state4 = df_source \
        .withColumn("state", lit(4)) \
        .join(
            df_bronze.toDF() \
                .filter((col("state") == 3) & (col("end_timestamp") == freezed_timestamp_dt)) \
                .select(*keys),  
            on=keys,
            how="inner"
        )

    # Inserting all Update (after image) STATE = 4:
    df_source_state4.write \
        .format("delta") \
        .mode("append") \
        .option("mergeSchema", "true") \
        .saveAsTable(f"bronze.{table}")
    
    print(f"Table bronze.{table} updated successfully!")
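
One caveat about the hashing step in the loop above: Spark's `concat_ws` skips NULL values, so two rows whose NULL sits in a different column can produce the same hash, and such a change would go undetected. The plain-Python sketch below mimics that behaviour with `hashlib` and shows one possible mitigation, replacing NULL with a placeholder token. The function names and the `<NULL>` token are hypothetical; switching the hash formula on an existing bronze table would also make every row look updated on the next run.

```python
import hashlib
from typing import Optional, Sequence


def row_hash_skip_nulls(values: Sequence[Optional[str]]) -> str:
    """Mimic sha2(concat_ws("|", ...), 256): NULL values are dropped."""
    joined = "|".join(v for v in values if v is not None)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()


def row_hash_with_placeholder(
    values: Sequence[Optional[str]], null_token: str = "<NULL>"
) -> str:
    """Variant that keeps column positions by substituting a token for NULL."""
    joined = "|".join(null_token if v is None else v for v in values)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()


row_a = ["Ana", None, "SP"]   # NULL in the second column
row_b = ["Ana", "SP", None]   # NULL in the third column

# Both rows collapse to "Ana|SP" when NULLs are skipped
collides = row_hash_skip_nulls(row_a) == row_hash_skip_nulls(row_b)
# With a placeholder the two rows hash differently
distinct = row_hash_with_placeholder(row_a) != row_hash_with_placeholder(row_b)
print(collides, distinct)
```

In PySpark, the same idea could be expressed by wrapping each column in `coalesce(col(c).cast("string"), lit("<NULL>"))` before `concat_ws`; this is a suggestion, not part of the original notebook.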