# Bronze to Silver
### Overview
This notebook loads data from the bronze DLT tables, applies transformations, checks constraints, and writes the transformed data to silver tables.

### Process
- Define the transformation functions
- Define the silver table configurations
- Define the function that creates the silver DLT tables
- Loop through the silver configuration and load data into the tables

In [0]:
import re

import dlt
from pyspark.sql.functions import col, to_date

## 1. Function Definitions
Here we define the transformation functions that are applied later when building the silver tables.

- `camel_to_snake`: Converts a string from camelCase to snake_case format.
- `convert_columns_to_snake_case`: Converts all column names in a DataFrame from camelCase to snake_case.
- `convert_date_columns_to_date`: Converts every column whose name contains "date" (case-insensitive) to a date, parsing timestamp strings of the form `yyyy-MM-dd'T'HH:mm:ss.SSSXXX`.

In [0]:
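def camel_to_snake(name):
    """Convert a camelCase or PascalCase name to snake_case."""
    # Insert an underscore between a lowercase letter or digit and the uppercase letter that follows
    s1 = re.sub('([a-z0-9])([A-Z])', r'\1_\2', name)
    return s1.lower()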
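
As a quick check of how the regular expression behaves, the sketch below repeats `camel_to_snake` and runs it on a few names. The sample names are assumptions chosen for illustration, not a listing of the bronze schema. The pattern only inserts an underscore between a lowercase letter or digit and an uppercase letter, so a leading run of capitals such as `HTTPResponse` is lowercased without being split.

```python
import re

def camel_to_snake(name):
    # Same logic as the notebook cell: underscore between a lowercase
    # letter or digit and the uppercase letter that follows it
    s1 = re.sub('([a-z0-9])([A-Z])', r'\1_\2', name)
    return s1.lower()

# Illustrative column names (assumed for this example)
samples = ["SalesOrderID", "ModifiedDate", "AddressLine1", "rowguid", "HTTPResponse"]
converted = {name: camel_to_snake(name) for name in samples}

for original, snake in converted.items():
    print(f"{original} -> {snake}")
```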

In [0]:
def convert_columns_to_snake_case(df):
    """Rename every column of the DataFrame to snake_case."""
    for col_name in df.columns:
        df = df.withColumnRenamed(col_name, camel_to_snake(col_name))
    return df

In [0]:
def convert_date_columns_to_date(df):
    """Convert every column whose name contains "date" to a date, parsing timestamp strings."""
    date_columns = [col_name for col_name in df.columns if "date" in col_name.lower()]
    for col_name in date_columns:
        df = df.withColumn(col_name, to_date(col(col_name), "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"))
    return df
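
Because the column filter is a plain substring match, it can be checked without Spark. The sketch below applies the same test to a hypothetical list of column names (all of them assumptions for illustration) and shows that names such as `update_count` also match, since "update" contains "date".

```python
def find_date_columns(columns):
    # Same filter as convert_date_columns_to_date:
    # case-insensitive substring match on "date"
    return [c for c in columns if "date" in c.lower()]

# Hypothetical column names, chosen to show intended and unintended matches
columns = ["order_date", "ModifiedDate", "customer_id", "update_count", "validated_by"]
matched = find_date_columns(columns)
print(matched)
```

If a table does contain such columns, a stricter rule (for example, matching only names that end in `_date`) may be a safer choice.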


## 2. Silver Table Configurations
The code below defines the columns excluded from every table and the transformations applied to every table, along with the bronze source and the table-specific constraints (expectations) for each silver table.

In [0]:
# Define the common exclusion columns and transformations globally
COMMON_EXCLUDE_COLUMNS = ["_rescued_data", "ingestion_time", "source_file"]
COMMON_TRANSFORMATIONS = [convert_columns_to_snake_case, convert_date_columns_to_date]

silver_tables_config = {
    "adventureworks.silver.address": {
        "bronze_table": "adventureworks.bronze.address",
        "expectations": {
            "address_id": "address_id IS NOT NULL"
        }
    },
    "adventureworks.silver.customer": {
        "bronze_table": "adventureworks.bronze.customer",
        "expectations": {
            "customer_id": "customer_id IS NOT NULL",
            "modified_date": "modified_date > '2005-07-01'"
        }
    },
    "adventureworks.silver.customeraddress": {
        "bronze_table": "adventureworks.bronze.customeraddress",
        "expectations": {
            "customer_id": "customer_id IS NOT NULL",
            "address_id": "address_id IS NOT NULL"
        }
    },
    "adventureworks.silver.product": {
        "bronze_table": "adventureworks.bronze.product",
        "expectations": {
            "product_id": "product_id IS NOT NULL",
            "product_number": "product_number IS NOT NULL"
        }
    },
    "adventureworks.silver.productcategory": {
        "bronze_table": "adventureworks.bronze.productcategory",
        "expectations": {
            "product_category_id": "product_category_id IS NOT NULL",
            "name": "name IS NOT NULL"
        }
    },
    "adventureworks.silver.productdescription": {
        "bronze_table": "adventureworks.bronze.productdescription",
        "expectations": {
            "product_description_id": "product_description_id IS NOT NULL"
        }
    },
    "adventureworks.silver.productmodel": {
        "bronze_table": "adventureworks.bronze.productmodel",
        "expectations": {
            "product_model_id": "product_model_id IS NOT NULL",
            "name": "name IS NOT NULL"
        }
    },
    "adventureworks.silver.productmodelproductdescription": {
        "bronze_table": "adventureworks.bronze.productmodelproductdescription",
        "expectations": {
            "product_model_id": "product_model_id IS NOT NULL",
            "product_description_id": "product_description_id IS NOT NULL"
        }
    },
    "adventureworks.silver.salesorderdetail": {
        "bronze_table": "adventureworks.bronze.salesorderdetail",
        "expectations": {
            "sales_order_id": "sales_order_id IS NOT NULL",
            "sales_order_detail_id": "sales_order_detail_id IS NOT NULL",
            "product_id": "product_id IS NOT NULL"
        }
    },
    "adventureworks.silver.salesorderheader": {
        "bronze_table": "adventureworks.bronze.salesorderheader",
        "expectations": {
            "sales_order_id": "sales_order_id IS NOT NULL",
            "customer_id": "customer_id IS NOT NULL",
            "order_date": "order_date IS NOT NULL"
        }
    }
}
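
Since every entry follows the same shape, a small check can catch typos before the pipeline runs. The sketch below is one possible approach, not part of the original notebook: `validate_silver_config` is a hypothetical helper, and the second entry in `sample_config` is a deliberately broken, made-up example.

```python
def validate_silver_config(tables_config):
    """Return a list of problems found in a silver configuration dictionary."""
    problems = []
    for table_name, config in tables_config.items():
        # Table names in this notebook use three parts: catalog.schema.table
        if len(table_name.split(".")) != 3:
            problems.append(f"{table_name}: expected a catalog.schema.table name")
        if "bronze_table" not in config:
            problems.append(f"{table_name}: missing 'bronze_table'")
        # Each expectation maps a label to a non-empty SQL constraint string
        for label, constraint in config.get("expectations", {}).items():
            if not isinstance(constraint, str) or not constraint.strip():
                problems.append(f"{table_name}: expectation '{label}' is empty")
    return problems

sample_config = {
    "adventureworks.silver.address": {
        "bronze_table": "adventureworks.bronze.address",
        "expectations": {"address_id": "address_id IS NOT NULL"},
    },
    # Hypothetical broken entry: no bronze table and an empty constraint
    "adventureworks.silver.example": {
        "expectations": {"example_id": ""},
    },
}

problems = validate_silver_config(sample_config)
for problem in problems:
    print(problem)
```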


## 3. Creating Silver DLT Tables
`create_silver_table` dynamically creates a DLT table for each table in the silver schema. The main differences from the bronze tables are the constraints (`@dlt.expect_all_or_drop`), which drop any row that violates an expectation, and the transformations applied in the `transform_data()` function.

In [0]:
# Function to dynamically create DLT tables for the silver layer
def create_silver_table(table_name, config):
    """Register one silver DLT table that streams from its configured bronze table."""
    bronze_table_name = config["bronze_table"]
    table_expectations = config.get("expectations", {})
    
    @dlt.table(
        name=table_name,
        comment="Streaming Silver table from Bronze",
        table_properties={"quality": "silver"}
    )
    @dlt.expect_all_or_drop(table_expectations)
    def transform_data():
        # Read the streaming bronze table
        df = spark.readStream.table(bronze_table_name)

        # Drop excluded columns (using the global list)
        df = df.drop(*COMMON_EXCLUDE_COLUMNS)

        # Apply transformations (using the global list)
        for transform in COMMON_TRANSFORMATIONS:
            df = transform(df)

        return df
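
Wrapping the decorated function in `create_silver_table` also gives each table its own scope. The plain-Python sketch below involves no DLT; the `register` decorator and the table names are hypothetical stand-ins. It assumes the registered functions are invoked only after the loop has finished, and shows why that matters: functions defined directly in a loop all see the loop variable's final value, while a factory function binds the value at call time.

```python
registry = []

def register(fn):
    # Hypothetical stand-in for a decorator that stores a function to run later
    registry.append(fn)
    return fn

names = ["address", "customer"]

# Defined directly in the loop: each function looks up `name` when it is called
for name in names:
    @register
    def read_in_loop():
        return name

def make_reader(table_name):
    # Factory: `table_name` is a parameter, so each call has its own binding
    @register
    def read_from_factory():
        return table_name

for name in names:
    make_reader(name)

# Call everything only after both loops have finished
results = [fn() for fn in registry]
print(results)
```

The first two results are both `customer`, because the loop-defined functions share one variable. The factory-built functions return `address` and `customer` as intended.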

## 4. Loading Data to Silver Tables
Following the same process as before, this loop iterates through `silver_tables_config` and calls `create_silver_table` for each entry, registering one silver table per configuration.

In [0]:
# Loop through the dictionary to create silver tables
for table_name, config in silver_tables_config.items():
    create_silver_table(table_name, config)