In [0]:
# Notebook parameters exposed as Databricks text widgets: (name, default, label).
# Defaults point at the dev raw stock layer.
for widget_name, default_value, widget_label in (
    ("external_location", "dev_raw_stock_layer", "External Location Name"),
    ("catalog_name", "dev-raw", "Catalog Name"),
    ("schema_name", "stock", "Schema Name"),
    ("table_name", "daily_prices", "Table Name"),
):
    dbutils.widgets.text(widget_name, default_value, widget_label)

In [0]:
# Read each widget once into a plain Python name used by the cells below.
catalog_name, schema_name, table_name, external_location = (
    dbutils.widgets.get(param)
    for param in ("catalog_name", "schema_name", "table_name", "external_location")
)

In [0]:
# Register the raw-stock ADLS container (abfss:// URL) as a Unity Catalog
# external location, authenticated through the shared storage credential.
external_location_ddl = f"""
CREATE EXTERNAL LOCATION IF NOT EXISTS `{external_location}`
  URL 'abfss://raw-stock-data@stadevm77kkznmognla.dfs.core.windows.net/'
  WITH (STORAGE CREDENTIAL `data_layers_storage_credential`)
  COMMENT 'External location for dev raw API table'
"""
spark.sql(external_location_ddl)

DataFrame[]

In [0]:
# Raw-layer catalog; its name comes from the catalog_name widget and is
# backquoted because the default ("dev-raw") contains a hyphen.
raw_catalog_ddl = f"""
CREATE CATALOG IF NOT EXISTS `{catalog_name}`
COMMENT 'for all dev raw objects'
"""
spark.sql(raw_catalog_ddl)

DataFrame[]

In [0]:
# Switch to the raw catalog, then create the raw schema with managed storage
# at the root of the raw container.
# NOTE(review): that root also contains the external raw table path
# (.../stock_raw) registered in the next cell; Unity Catalog may reject
# overlapping managed/external paths on a fresh setup — confirm.
spark.sql(f"USE CATALOG `{catalog_name}`")

raw_schema_ddl = f"""
    CREATE SCHEMA IF NOT EXISTS `{schema_name}`
    MANAGED LOCATION 'abfss://raw-stock-data@stadevm77kkznmognla.dfs.core.windows.net/'
    COMMENT 'for all dev raw delta tables'
    """
spark.sql(raw_schema_ddl)

DataFrame[]

In [0]:
# Register the raw daily-prices table as an external Parquet table over the
# files stored under .../stock_raw in the raw container.
# NOTE(review): `sources_system` looks like a typo for `source_system`. It is
# kept as-is because Parquet columns are matched by name; check the column
# name in the files before renaming it.
spark.sql(f"USE CATALOG `{catalog_name}`")
spark.sql(f"USE SCHEMA `{schema_name}`")

raw_table_ddl = f"""
    CREATE TABLE IF NOT EXISTS `{catalog_name}`.`{schema_name}`.`{table_name}` (
      load_id STRING,
      load_ts_utc TIMESTAMP,
      sources_system STRING,
      symbol STRING,
      price_date DATE,
      open_price DECIMAL(18,6),
      high_price DECIMAL(18,6),
      low_price DECIMAL(18,6),
      close_price DECIMAL(18,6),
      volume BIGINT
    )
    USING PARQUET
    LOCATION 'abfss://raw-stock-data@stadevm77kkznmognla.dfs.core.windows.net/stock_raw'
    """
spark.sql(raw_table_ddl)

DataFrame[]

In [0]:
# spark.sql(
#     f"""
#     REFRESH TABLE `{catalog_name}`.`{schema_name}`.`{table_name}`
#     """)

In [0]:
# Bronze-layer catalog. Its name is a widget (default "bronze", same as before)
# so the notebook can target another environment, like the raw catalog_name widget.
dbutils.widgets.text("bronze_catalog", "bronze", "Bronze Catalog Name")
bronze_catalog = dbutils.widgets.get("bronze_catalog")

spark.sql(f"""
CREATE CATALOG IF NOT EXISTS `{bronze_catalog}`
COMMENT 'for all dev bronze delta tables'
""")

DataFrame[]

In [0]:
# /**

#       load_id, STRING
#       ingestion_timestamp, TIMESTAMP
#       source_system, STRING
#       symbol, STRING
#       price_date, DATE
#       day_opening_price, DECIMAL(18,6)
#       day_highest_price, DECIMAL(18,6)
#       day_lowest_price, DECIMAL(18,6)
#       day_closing_price, DECIMAL(18,6)
#       volume, BIGINT
#       load_timestamp, TIMESTAMP

# **/

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, BooleanType

# RAW -> BRONZE column mapping. It drives both the bronze DDL and the INSERT.
#   bronze_col : target column name in the bronze table
#   raw_col    : raw column name or SQL expression (e.g. current_timestamp()),
#                or a constant value when is_literal is True
#   data_type  : Spark SQL type of the bronze column
#   is_literal : True -> raw_col is inserted as a quoted string constant
# To add a bronze column, add a row here; the DDL cell adds it on its next run.
schema = StructType([
    StructField("bronze_col", StringType(), True),
    StructField("raw_col", StringType(), True),
    StructField("data_type", StringType(), True),
    StructField("is_literal", BooleanType(), True)
])

data = [
    ("load_id", "load_id", "STRING", False),
    ("ingestion_timestamp", "load_ts_utc", "TIMESTAMP", False),
    ("source_system", "alpha_vantage", "STRING", True),
    ("symbol", "symbol", "STRING", False),
    ("price_date", "price_date", "DATE", False),
    ("day_opening_price", "open_price", "DECIMAL(18,6)", False),
    ("day_highest_price", "high_price", "DECIMAL(18,6)", False),
    ("day_lowest_price", "low_price", "DECIMAL(18,6)", False),
    ("day_closing_price", "close_price", "DECIMAL(18,6)", False),
    ("volume", "volume", "BIGINT", False),
    ("load_timestamp", "current_timestamp()", "TIMESTAMP", False)
]

# Catch mapping mistakes here instead of as a failing DDL/INSERT later.
# Duplicate target names are compared case-insensitively, matching Spark's
# default column resolution.
_bronze_names = [row[0].lower() for row in data]
assert len(_bronze_names) == len(set(_bronze_names)), "duplicate bronze_col in mapping"
assert all(row[1] and row[2] and row[3] is not None for row in data), "incomplete mapping row"

df_mapping = spark.createDataFrame(data, schema)
display(df_mapping)

bronze_col,raw_col,data_type,is_literal
load_id,load_id,STRING,False
ingestion_timestamp,load_ts_utc,TIMESTAMP,False
source_system,alpha_vantage,STRING,True
symbol,symbol,STRING,False
price_date,price_date,DATE,False
day_opening_price,open_price,"DECIMAL(18,6)",False
day_highest_price,high_price,"DECIMAL(18,6)",False
day_lowest_price,low_price,"DECIMAL(18,6)",False
day_closing_price,close_price,"DECIMAL(18,6)",False
volume,volume,BIGINT,False


In [0]:
# # Assume df_mapping is a DataFrame with columns: bronze_col, raw_col, data_type, is_literal
# spark.sql(f"USE CATALOG `{bronze_catalog}`")
# spark.sql(f"USE SCHEMA `{schema_name}`")

# # Collect mapping info
# mapping = df_mapping.collect()
# bronze_cols = [row[0] for row in mapping]
# data_types = [row[2] for row in mapping]

# # Script 1: CREATE TABLE
# columns_ddl = ",\n  ".join([f"{col} {dtype}" for col, dtype in zip(bronze_cols, data_types)])
# create_table_sql = f"""
# CREATE TABLE IF NOT EXISTS `{bronze_catalog}`.`{schema_name}`.`{table_name}` (
#   {columns_ddl}
# )
# USING DELTA
# """

# spark.sql(create_table_sql)

In [0]:
# Bronze table DDL driven by df_mapping.
# First run: creates the bronze table with every mapped column.
# Later runs: any mapping row whose bronze_col is not yet in the table is added
# with one ALTER TABLE ... ADD COLUMNS (...). Columns are never dropped or retyped.

# Nothing else in this notebook creates the schema inside the bronze catalog,
# so create it here; otherwise USE SCHEMA fails on a fresh workspace.
spark.sql(f"CREATE SCHEMA IF NOT EXISTS `{bronze_catalog}`.`{schema_name}`")
spark.sql(f"USE CATALOG `{bronze_catalog}`")
spark.sql(f"USE SCHEMA `{schema_name}`")

# Collect mapping info (small driver-side list, also used by the INSERT cell).
mapping = df_mapping.collect()
bronze_cols = [row["bronze_col"] for row in mapping]
data_types = [row["data_type"] for row in mapping]

full_bronze_name = f"{bronze_catalog}.{schema_name}.{table_name}"
# Backquoted form is safe for identifiers containing '-' or other special characters.
bronze_table_ref = f"`{bronze_catalog}`.`{schema_name}`.`{table_name}`"

# IF NOT EXISTS makes the create idempotent without probing the table. A
# query/except probe would scan data and would treat unrelated errors
# (e.g. permissions) as "table missing".
columns_ddl = ",\n  ".join(
    f"`{col}` {dtype}" for col, dtype in zip(bronze_cols, data_types)
)
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {bronze_table_ref} (
  {columns_ddl}
)
USING DELTA
""")

# Add any mapping columns the table does not have yet. Compare lower-cased,
# because Spark resolves column names case-insensitively by default
# (spark.sql.caseSensitive=false).
existing_cols = {field.name.lower() for field in spark.table(bronze_table_ref).schema.fields}
new_columns = [
    f"`{col}` {dtype}"
    for col, dtype in zip(bronze_cols, data_types)
    if col.lower() not in existing_cols
]
if new_columns:
    # Use a single ADD COLUMNS clause with a parenthesised list: repeating
    # "ADD COLUMN x T, ADD COLUMN y U" is not valid Spark SQL.
    spark.sql(f"ALTER TABLE {bronze_table_ref} ADD COLUMNS ({', '.join(new_columns)})")

In [0]:
# Script 2: load RAW -> BRONZE according to df_mapping.
# Each mapping row becomes one SELECT expression, cast to its mapped data_type:
#   is_literal=True  -> raw_col is a constant, emitted as an escaped string literal
#   is_literal=False -> raw_col is a raw column name or SQL expression
mapping = df_mapping.collect()

select_expr = []
for row in mapping:
    if row["is_literal"]:
        # Escape backslashes and quotes so the value cannot break out of the literal.
        escaped = row["raw_col"].replace("\\", "\\\\").replace("'", "\\'")
        source_expr = f"'{escaped}'"
    else:
        source_expr = row["raw_col"]
    select_expr.append(f"CAST({source_expr} AS {row['data_type']}) AS `{row['bronze_col']}`")

# List the target columns explicitly. New columns are appended at the end of
# the bronze table by the DDL cell, so a purely positional INSERT would
# misalign columns when a mapping row is added anywhere but last.
target_cols = ", ".join(f"`{row['bronze_col']}`" for row in mapping)

insert_sql = f"""
INSERT INTO `{bronze_catalog}`.`{schema_name}`.`{table_name}` ({target_cols})
SELECT
  {', '.join(select_expr)}
FROM `{catalog_name}`.`{schema_name}`.`{table_name}`
"""
spark.sql(insert_sql)

# NOTE: this appends the whole raw table on every run, so re-running the cell
# duplicates rows. For idempotent loads, switch to:
#   INSERT OVERWRITE ... for full reloads, or
#   INSERT INTO ... SELECT ... WHERE load_id = '<some_load_id>' for incremental runs

DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

**Possible next steps: automated schema evolution from RAW**

Bronze-side schema evolution is implemented: when a row is added to the mapping, the corresponding column is added to the bronze table on the next run.

To implement true auto-evolution from raw schema drift, we could add another step that:

- reads the raw table schema,

- compares it to the mapping, and

- optionally flags “unmapped raw columns” for review.

In [0]:
# %sql
# truncate table bronze.stock.daily_prices