# Databricks notebook for silver_to_gold

## Setup

Import the PySpark helpers used below and read the notebook parameters (widgets) that select the catalog, schema, layers and tables.

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import current_date, initcap

In [0]:
# Notebook parameters, declared as (widget name, default value) pairs.
# Each pair is registered as a text widget in the order listed, after which the
# current value of every widget is read back into a plain Python variable.
widget_defaults = [
    ("catalog_name", "fm_sandbox_demo"),    # Default catalog
    ("schema_name", "dabs_lakehouse"),      # Default schema
    ("layer", "03_gold"),                   # Default (destination) layer
    ("destination_table", "customers"),     # Default destination table
    ("source_layer", "02_silv"),            # Default source layer
    ("source_table", "customers"),          # Default source table
]
for widget_name, default_value in widget_defaults:
    dbutils.widgets.text(widget_name, default_value)

param_values = {widget_name: dbutils.widgets.get(widget_name) for widget_name, _ in widget_defaults}

catalog_name           = param_values["catalog_name"]
schema_name            = param_values["schema_name"]
layer                  = param_values["layer"]
destination_table_name = param_values["destination_table"]
source_layer           = param_values["source_layer"]
source_table_name      = param_values["source_table"]

# Root abfss:// location; external table paths are built underneath it.
external_storage_url = "abfss://main-unitycat@fmsandbox1adlseusdev.dfs.core.windows.net"

## Create Test Data

In [0]:
# Schema for the synthetic customer records: "ID" is an integer, every other
# column is a string, and no column is nullable.
test_columns = ["ID", "customer_name", "street", "city", "district", "state", "postcode", "region"]
schema = StructType([
    StructField(column, IntegerType() if column == "ID" else StringType(), False)
    for column in test_columns
])

# Ten sample customers. The `region` values are lower-case here; the
# transformation step title-cases them. A DataFrame is only built from this
# data when the layer is "01_bron" (see "Read Source Data").
data = [
    (1, 'Alice Johnson', '123 Maple St', 'London', 'Camden', 'Greater London', 'NW1 5DB', 'england'),
    (2, 'Bob Smith', '456 Oak Ave', 'Manchester', 'Didsbury', 'Greater Manchester', 'M20 2UF', 'england'),
    (3, 'Charlie Brown', '789 Pine Rd', 'Birmingham', 'Edgbaston', 'West Midlands', 'B15 2TT', 'england'),
    (4, 'David Miller', '101 Birch Blvd', 'Edinburgh', 'Leith', 'City of Edinburgh', 'EH6 6NZ', 'scotland'),
    (5, 'Eva White', '202 Cedar Ln', 'Glasgow', 'Hillhead', 'Glasgow City', 'G12 8QQ', 'scotland'),
    (6, 'Frank Harris', '303 Elm St', 'Bristol', 'Clifton', 'Bristol', 'BS8 1UD', 'england'),
    (7, 'Grace Lee', '404 Willow Dr', 'Liverpool', 'Wavertree', 'Merseyside', 'L15 1HN', 'england'),
    (8, 'Henry Clark', '505 Aspen Ct', 'Cardiff', 'Roath', 'Cardiff', 'CF24 3EX', 'wales'),
    (9, 'Isabella Davis', '606 Spruce Ave', 'Belfast', 'Botanic', 'Belfast', 'BT7 1NN', 'northern ireland'),
    (10, 'Jack Wilson', '707 Redwood Way', 'Oxford', 'Headington', 'Oxfordshire', 'OX3 9DU', 'england')
]

## Read Source Data
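If the target layer is bronze (`01_bron`), build the DataFrame from the test data above; otherwise, read it from the source table `<catalog>.<schema>.<source_layer>_<source_table>`.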

In [0]:
# Choose the source. The bronze layer ("01_bron") is seeded from the in-notebook
# test data; any other layer reads the table
# <catalog>.<schema>.<source_layer>_<source_table>.
seed_from_test_data = layer == "01_bron"

if not seed_from_test_data:
    source_full_table_name = f"{catalog_name}.{schema_name}.{source_layer}_{source_table_name}"
    df_source = spark.read.table(source_full_table_name)
else:
    df_source = spark.createDataFrame(data, schema=schema)

display(df_source)

## Apply Transformation

In [0]:
# Stamp every row with the processing date, then title-case the region
# (e.g. 'northern ireland' -> 'Northern Ireland').
with_ingest_date = df_source.withColumn("ingested_date", current_date())
df_transformed = with_ingest_date.withColumn("region", initcap("region"))

display(df_transformed)

In [0]:
def _quote_identifier(name: str) -> str:
    """Return `name` as a backtick-quoted Spark SQL identifier part.

    Embedded backticks are doubled, so widget-supplied names containing hyphens,
    spaces, dots or backticks can neither break nor alter the generated SQL.
    """
    return "`" + name.replace("`", "``") + "`"

# External location for the table's files: <root>/<catalog>/<schema>/<layer>_<table>.
storage_location = f"{external_storage_url}/{catalog_name}/{schema_name}/{layer}_{destination_table_name}"
# Unquoted three-part name, kept for readability and for any later cell that uses it.
destination_full_table_name = f"{catalog_name}.{schema_name}.{layer}_{destination_table_name}"

# Quoted forms actually sent to Spark SQL / saveAsTable.
destination_object_name = f"{layer}_{destination_table_name}"
quoted_catalog = _quote_identifier(catalog_name)
quoted_schema = f"{quoted_catalog}.{_quote_identifier(schema_name)}"
quoted_destination_table = f"{quoted_schema}.{_quote_identifier(destination_object_name)}"

# Use catalog
spark.sql(f"USE CATALOG {quoted_catalog}")

# Ensure schema exists
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {quoted_schema}")

# Write the DataFrame to Unity Catalog in Delta format. The explicit "path"
# option places the files at storage_location; overwrite replaces both the data
# and (because of overwriteSchema) the table schema.
(df_transformed.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .option("path", storage_location)
    .saveAsTable(quoted_destination_table)
)

## Clean Up

In [0]:
# Optional teardown, disabled by default (every line below is commented out).
# Uncomment to drop the destination table and then the schema, and to delete the
# files under the table's external storage location.
# NOTE(review): DROP SCHEMA without CASCADE presumably fails while other tables
# remain in the schema — confirm before relying on this cell.
# storage_location = f"{external_storage_url}/{catalog_name}/{schema_name}/{layer}_{destination_table_name}"
# spark.sql(f"DROP TABLE IF EXISTS {catalog_name}.{schema_name}.{layer}_{destination_table_name}")
# spark.sql(f"DROP SCHEMA IF EXISTS {catalog_name}.{schema_name}")
# dbutils.fs.rm(f"{storage_location}",True)

Notebook finished.