# Parquet to Raw: Notebook Documentation
### Purpose
The Parquet to Raw notebook is designed to automate the process of ingesting raw data from a landing zone into a Unity Catalog (UC) table. It reads metadata from a configuration source to dynamically process multiple tables, ensuring schema consistency and proper data handling for each table.

### Functionality
The notebook follows a structured, parallelized workflow to process data:

- **Configuration Reading**: The process begins by reading a configuration source (a CSV file loaded into a DataFrame). This source acts as a manifest with one row per table, holding the metadata needed to process it: schema name, table name, refresh mode (`full_refresh`, `truncate`, or `merge`), merge key, and the columns to select.

- **Parallel Processing**: The notebook hands each manifest row to a worker in a thread pool (the `n_threads` widget sets the pool size, defaulting to 5), so several tables are processed concurrently instead of one after another.

- **Data Loading and Transformations**: For each table, the following steps are performed:

1. The raw data is loaded from its Parquet file in the landing zone.

2. **Column Renaming**: Special characters in column names are replaced with underscores to ensure they are compatible with Unity Catalog naming conventions.

3. **Schema Enforcement**: If the target UC table already exists, the input DataFrame's columns are cast to the types of the target table. If the table is new, a default rule applies: columns whose names end in `_date` are converted to `DateType` using the `yyyy-MM-dd` format.

4. **Audit Fields**: Audit columns (`aud_creationdate`, `aud_modifieddate`, `aud_load_id`, `aud_operation`) are added to the DataFrame for tracking.

5. **Writing to Unity Catalog**: The fully prepared DataFrame is then written to the target Unity Catalog table based on the configured refresh_mode:

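  - `full_refresh`: Uses a `CREATE OR REPLACE` statement to completely overwrite the table, ensuring the target is a clean copy of the source data.
  - `truncate`: Empties the existing table with a `TRUNCATE` command and then inserts the new data. The two statements are not atomic. If the table does not exist yet, it is created with `CREATE OR REPLACE` instead.
  - `merge`: Performs a `MERGE INTO` operation, updating existing records and inserting new ones based on the configured merge key. If the table does not exist yet, it is created with `CREATE OR REPLACE` instead.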
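
Steps 2 and 5 above can be sketched without a Spark session. The following is a minimal illustration, not the notebook's implementation: `sanitize_column_name` and `build_write_statements` are hypothetical helper names, and the second one only returns the SQL text that each refresh mode would issue, assuming the same statements the notebook uses.

```python
import re


def sanitize_column_name(name):
    """Replace every character outside [0-9a-zA-Z] with an underscore."""
    return re.sub(r"[^0-9a-zA-Z]", "_", name)


def build_write_statements(target, view, mode, table_exists, merge_key=None):
    """Return the SQL statements a refresh mode would run (hypothetical helper)."""
    create = f"CREATE OR REPLACE TABLE {target} AS SELECT * FROM {view}"
    if mode == "full_refresh":
        return [create]
    if mode not in ("truncate", "merge"):
        raise ValueError(f"Invalid write mode: '{mode}'")
    if mode == "merge" and not merge_key:
        raise ValueError("A 'merge_key' must be provided for the 'merge' mode.")
    if not table_exists:
        # First load: there is nothing to truncate or merge into yet.
        return [create]
    if mode == "truncate":
        # Two separate statements, so the pair is not atomic.
        return [
            f"TRUNCATE TABLE {target}",
            f"INSERT INTO {target} SELECT * FROM {view}",
        ]
    return [
        f"MERGE INTO {target} AS target USING {view} AS source "
        f"ON target.{merge_key} = source.{merge_key} "
        "WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT *"
    ]


# Placeholder names, for illustration only.
print(sanitize_column_name("order date (utc)"))
for statement in build_write_statements("cat.sch.orders", "tmp_orders", "truncate", True):
    print(statement)
```

Keeping the mode-to-SQL mapping in a pure function like this makes it possible to unit test the branching (including the first-load fallback) before any statement reaches the cluster.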

### Requirements
To run this notebook successfully, the following requirements must be met:

- **Databricks Cluster**: A Databricks cluster configured with access to Unity Catalog is required. The cluster must be in Single User or Shared access mode.

- **Configuration Source**: A DataFrame (or equivalent data source) must exist with the necessary metadata columns, including:

  - `schema_name`
  - `table_name`
  - `key` (for merge operations)
  - `refresh_mode`
  - `columns` (for column filtering)

- **Source Data**: The raw data must be available as Parquet files in the specified landing zone path, one file per table at `<table_name>/<table_name>.parquet`.

- **Permissions**: The user running the notebook must have the necessary permissions to read from the landing zone and write/modify tables in the target Unity Catalog schema.
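
Because every table depends on the manifest being well formed, it can help to validate it before any Spark work starts. The sketch below is an assumption-laden illustration: `validate_manifest` is a hypothetical helper, the sample rows are made up, and the rule that `merge` rows need a non-empty `key` mirrors the check the notebook performs at write time.

```python
import csv
import io

REQUIRED_COLUMNS = {"schema_name", "table_name", "key", "refresh_mode", "columns"}
VALID_MODES = {"full_refresh", "truncate", "merge"}


def validate_manifest(csv_text):
    """Parse manifest text and return (rows, problems). Hypothetical helper."""
    reader = csv.DictReader(io.StringIO(csv_text))
    missing = REQUIRED_COLUMNS - set(reader.fieldnames or [])
    if missing:
        return [], [f"missing columns: {sorted(missing)}"]
    rows, problems = [], []
    for line_no, row in enumerate(reader, start=2):  # line 1 is the header
        mode = (row["refresh_mode"] or "").strip()
        if mode not in VALID_MODES:
            problems.append(f"line {line_no}: unknown refresh_mode '{mode}'")
        elif mode == "merge" and not (row["key"] or "").strip():
            problems.append(f"line {line_no}: merge requires a key")
        rows.append(row)
    return rows, problems


# Made-up rows, for illustration only.
SAMPLE = """schema_name,table_name,key,refresh_mode,columns
sch_example,orders,order_id,merge,
sch_example,customers,,full_refresh,"id,name"
sch_example,events,,merge,
"""

rows, problems = validate_manifest(SAMPLE)
print(len(rows), problems)
```

An empty `columns` value is left as-is here, matching the notebook's behaviour of selecting all columns when no list is given.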

In [0]:
import json
import os
import re
import threading
from datetime import datetime
from multiprocessing.pool import ThreadPool

from delta.tables import DeltaTable
from pyspark.sql import SparkSession  # used explicitly in enforce_schema_delta
from pyspark.sql.functions import *  # provides current_timestamp, lit, to_date

In [0]:
dbutils.widgets.text("load_id","","")
load_id = dbutils.widgets.get("load_id") or datetime.now().strftime('%Y-%m-%d_%H-%M-%S')

dbutils.widgets.text("config_file", "","")
config_file = dbutils.widgets.get("config_file") or '/mnt/landing/it/dataverse/table_info_dataverse_beaver.csv'

dbutils.widgets.text("table_info_file", "","")
table_info_file = dbutils.widgets.get("table_info_file") or 'table_info.csv'

dbutils.widgets.text("n_threads", "","")
n_threads = dbutils.widgets.get("n_threads") or "5"
try:
  n_threads = int(n_threads)
except ValueError as e:
  n_threads = 5

#### Re-enable when porting to beaver
# env = dbutils.secrets.get(scope="kv-solution-01", key="environment")

# env_mapping = {
#     "development": "heiaepgb002dwe01",
#     "test": "heiaepgb002twe01",
#     "acceptance": "heiaepgb002awe01",
#     "production": "heiaepgb002pwe01"
# }

# uc_env = env_mapping[env]
# print(uc_env)

print( "table_info_file:", table_info_file )
print( "load_id:", load_id )
print( "n_threads:", n_threads )

topic: 
table_info_file: table_info.csv
load_id: 2025-08-12_14-16-11
n_threads: 5


In [0]:
def load_source_df(item):
    """
    Loads the source DataFrame from the landing zone based on config item.
    Args:
        item (Row): The configuration row.
    Returns:
        DataFrame: The loaded DataFrame with filtered columns.
    """
    # No trailing slash: the load path below joins the parts with "/".
    path_spark = "dbfs:/mnt/landing/it/dataverse"
    table_name = item["table_name"]
    columns_str = item["columns"]

    if columns_str and columns_str.strip() != "None":
        cols = [c.strip() for c in columns_str.split(",") if c.strip()]
    else:
        cols = "*"

    print(f"Loading DataFrame for table {table_name}, selecting columns: {cols}")

    df = spark.read.format("parquet").load(
        f"{path_spark}/{table_name}/{table_name}.parquet"
    )

    if cols != "*":
        df = df.select(*cols)

    return df


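def df_rename_cols(df):
    """
    Renames columns in a DataFrame by replacing special characters with underscores.
    Args:
        df (DataFrame): The source DataFrame.
    Returns:
        DataFrame: The DataFrame with sanitized column names.
    """
    # Iterate over the names only; this also avoids shadowing pyspark's col().
    for name in df.columns:
        df = df.withColumnRenamed(name, re.sub(r"[^0-9a-zA-Z]", "_", name))
    return df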


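def add_audit_fields(df):
    """
    Adds technical audit columns: creation and modification timestamps,
    the load id of the current run, and an operation flag ("I" for insert).
    Args:
        df (DataFrame): The source DataFrame.
    Returns:
        DataFrame: The DataFrame with the audit columns appended.
    """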
    df = (
        df.withColumn("aud_creationdate", current_timestamp())
        .withColumn("aud_modifieddate", current_timestamp())
        .withColumn("aud_load_id", lit(load_id))
        .withColumn("aud_operation", lit("I"))
    )
    return df


def enforce_schema_delta(df, catalog, schema, table):
    """
    Aligns an input DataFrame with the schema of an existing Delta Table.
    If the schema or the table does not exist yet, columns ending with
    '_date' are converted to DateType instead.

    Args:
        df (DataFrame): The source DataFrame.
        catalog (str): The Unity Catalog name.
        schema (str): The schema name.
        table (str): The table name.

    Returns:
        DataFrame: The DataFrame with the corrected schema.
    """
    from pyspark.sql.types import DateType  # local import keeps the function self-contained

    uc_table_path = f"{catalog}.{schema}.{table}"

    # spark.catalog.listDatabases() does not take a catalog name (its optional
    # argument is a name pattern), so read the target schema directly and treat
    # any lookup failure (missing schema or table) as "table does not exist".
    try:
        delta_schema = spark.table(uc_table_path).schema
    except Exception:
        delta_schema = None

    if delta_schema is not None:
        print(f"Delta Table '{uc_table_path}' exists. Aligning schema...")
        for field in delta_schema:
            if field.name not in df.columns:
                continue
            try:
                if isinstance(field.dataType, DateType):
                    df = df.withColumn(
                        field.name, to_date(df[field.name], "yyyy-MM-dd")
                    )
                else:
                    df = df.withColumn(
                        field.name, df[field.name].cast(field.dataType)
                    )
            except Exception as e:
                print(f"Error while casting column '{field.name}': {e}")
    else:
        print(
            f"Delta Table '{uc_table_path}' does not exist. Applying default schema logic..."
        )
        for col_name in df.columns:
            if col_name.endswith("_date"):
                try:
                    df = df.withColumn(col_name, to_date(df[col_name], "yyyy-MM-dd"))
                    print(f"Column '{col_name}' converted to DateType.")
                except Exception as e:
                    print(f"Could not convert column '{col_name}' to DateType: {e}")

    return df


def process_and_write_to_uc(
    source_df, catalog, schema, table, write_mode, merge_key=None
):
    """
    Processes a DataFrame and writes it to a Unity Catalog table using one of three modes:
    'full_refresh' (CREATE OR REPLACE), 'truncate' (truncate and insert), or 'merge'.

    Args:
        source_df (DataFrame): The input DataFrame from the landing zone.
        catalog (str): The Unity Catalog name.
        schema (str): The schema name.
        table (str): The table name.
        write_mode (str): The write mode ('full_refresh', 'truncate', or 'merge').
        merge_key (str, optional): The key column used to match rows. Required when write_mode is 'merge'.

    Returns:
        None

    Raises:
        ValueError: If write_mode is not supported, or if 'merge' is requested without a merge_key.
    """
    uc_table_path = f"{catalog}.{schema}.{table}"
    print(
        f"Starting data processing for table '{uc_table_path}' with mode '{write_mode}'"
    )

    # 1. Column Renaming
    processed_df = df_rename_cols(source_df)
    print("Columns renamed successfully.")

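    # 2. Schema Enforcement
    processed_df = enforce_schema_delta(processed_df, catalog, schema, table)
    print("Schema enforcement completed.")

    # Audit fields (aud_creationdate, aud_modifieddate, aud_load_id, aud_operation).
    # add_audit_fields was defined but never called, so the documented audit
    # columns were missing from the written tables.
    processed_df = add_audit_fields(processed_df)
    print("Audit fields added.")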

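    # Create a temporary view from the processed DataFrame. Temp views are shared
    # across threads in the session, so the name includes the schema to avoid
    # collisions between same-named tables in different schemas.
    temp_view_name = f"temp_view_{schema}_{table}"
    processed_df.createOrReplaceTempView(temp_view_name)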

    # Make sure the UC schema exists, then check whether the table exists.
    # spark.catalog.listDatabases() does not take a catalog name (its optional
    # argument is a name pattern), so the check relies on SQL instead.
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{schema}")
    try:
        spark.sql(f"DESCRIBE TABLE EXTENDED {uc_table_path}")
        uc_table_exists = True
    except Exception:
        uc_table_exists = False  # Table does not exist

    # 3. Write to Unity Catalog based on the specified mode
    if write_mode == "full_refresh":
        print(f"Writing to UC table '{uc_table_path}' using CREATE OR REPLACE...")
        spark.sql(
            f"CREATE OR REPLACE TABLE {catalog}.{schema}.{table} AS SELECT * FROM {temp_view_name}"
        )

    elif write_mode == "truncate":
        if not uc_table_exists:
            print(
                f"UC table '{uc_table_path}' not found. Creating table with CREATE OR REPLACE..."
            )
            spark.sql(
                f"CREATE OR REPLACE TABLE {catalog}.{schema}.{table} AS SELECT * FROM {temp_view_name}"
            )
        else:
            print(f"Writing to UC table '{uc_table_path}' using TRUNCATE INSERT...")
            # Note: TRUNCATE and INSERT are not an atomic operation.
            spark.sql(f"TRUNCATE TABLE {catalog}.{schema}.{table}")
            spark.sql(
                f"INSERT INTO {catalog}.{schema}.{table} SELECT * FROM {temp_view_name}"
            )

    elif write_mode == "merge":
        if not merge_key:
            raise ValueError("A 'merge_key' must be provided for the 'merge' mode.")

        if not uc_table_exists:
            print(
                f"UC table '{uc_table_path}' not found. Creating table with CREATE OR REPLACE..."
            )
            spark.sql(
                f"CREATE OR REPLACE TABLE {catalog}.{schema}.{table} AS SELECT * FROM {temp_view_name}"
            )
        else:
            print(f"Writing to UC table '{uc_table_path}' using MERGE INTO...")
            spark.sql(
                f"""
                MERGE INTO {catalog}.{schema}.{table} AS target
                USING {temp_view_name} AS source
                ON target.{merge_key} = source.{merge_key}
                WHEN MATCHED THEN UPDATE SET *
                WHEN NOT MATCHED THEN INSERT *
            """
            )

    else:
        raise ValueError(
            f"Invalid write mode: '{write_mode}'. Must be 'full_refresh', 'truncate', or 'merge'."
        )


def process_table_thread(item):
    """
    Worker run by the thread pool: loads one table and writes it to Unity Catalog.
    The outcome is recorded in the shared ok_tables / ko_tables lists.
    Args:
        item (Row): The configuration row.
    """
    # Only the counter is reassigned here; the lock and the result lists are
    # module-level objects that are used in place.
    global thread_id_count

    with mx:
        thread_id_count += 1
        thread_id = thread_id_count
        print(
            f"[{thread_id}] table: {item['table_name']} in schema = {item['schema_name']} is refreshing in {item['refresh_mode']} ... "
        )

    try:
        source_df = load_source_df(item)

        process_and_write_to_uc(
            source_df=source_df,
            catalog="heiaepit002dwe01",
            schema=item["schema_name"],
            table=item["table_name"],
            write_mode=item["refresh_mode"],
            merge_key=item["key"],
        )

        with mx:
            print(f"[{thread_id}] table: {item['table_name']} ... OK")
            ok_tables.append(item["table_name"])
    except Exception as e:
        # A failure in one table must not stop the others; record it and move on.
        err = str(e)
        with mx:
            print(f"[{thread_id}] table: {item['table_name']} ... ERROR -> {err}")
            ko_tables.append({"table": item["table_name"], "error": err})
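
The worker above follows a common pattern: a thread pool maps a function over the manifest rows, and a lock guards the shared result lists. A minimal, Spark-free sketch of that pattern is shown below; `run_in_parallel` and `fake_load` are hypothetical names, and the failing table name is made up to show how errors are collected without stopping the other workers.

```python
import threading
from multiprocessing.pool import ThreadPool


def run_in_parallel(items, worker, n_threads=5):
    """Run worker(item) on a thread pool and collect successes and failures."""
    lock = threading.Lock()
    ok, ko = [], []

    def safe_worker(item):
        try:
            worker(item)
            with lock:
                ok.append(item)
        except Exception as exc:  # one failure must not stop the other items
            with lock:
                ko.append({"item": item, "error": str(exc)})

    # map() blocks until every item is processed; the context manager then
    # releases the pool's threads.
    with ThreadPool(n_threads) as pool:
        pool.map(safe_worker, items)
    return ok, ko


def fake_load(table_name):
    """Stand-in for the real load; fails for one made-up table name."""
    if table_name == "broken_table":
        raise RuntimeError("parquet file not found")


ok, ko = run_in_parallel(["a", "b", "broken_table"], fake_load, n_threads=2)
print(sorted(ok), ko)
```

Returning the two lists, rather than mutating globals, makes it straightforward to fail the job at the end when `ko` is non-empty.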

In [0]:
df = spark.read.format("csv").option("header", "true").option("encoding", "utf-8").load(f"dbfs:{config_file}")

thread_id_count = 0
ok_tables = list()
ko_tables = list()
mx = threading.Lock()

# The context manager releases the worker threads once every table is processed.
with ThreadPool(n_threads) as pool:
    _ = pool.map(process_table_thread, df.collect())

[1] table: hki_product_replenishment in schema = sch_raw_dataverse_replenishment_it is refreshing in full_refresh ... 
Loading DataFrame for table hki_product_replenishment, selecting columns: *
Starting data processing for table 'heiaepit002dwe01.sch_raw_dataverse_replenishment_it.hki_product_replenishment' with mode 'full_refresh'
[2] table: hki_d_product_lookup in schema = sch_raw_dataverse_replenishment_it is refreshing in merge ... 
Loading DataFrame for table hki_d_product_lookup, selecting columns: *
Starting data processing for table 'heiaepit002dwe01.sch_raw_dataverse_replenishment_it.hki_d_product_lookup' with mode 'merge'
[3] table: hki_sales_product_custom in schema = sch_raw_dataverse_sales_it is refreshing in truncate ... 
Loading DataFrame for table hki_sales_product_custom, selecting columns: *
Starting data processing for table 'heiaepit002dwe01.sch_raw_dataverse_sales_it.hki_sales_product_custom' with mode 'truncate'
[4] table: hki_lookup_packtype_sas in schema = sch_