Dataflow Gen2 is suitable for static ingestion, but since we are designing a configuration-driven dynamic RDM framework with logic branching, a notebook provides the required programmability and scalability.”

🔹 When do we use a notebook?

When we need:

✔ Dynamic behaviour
✔ Reading the config and changing logic based on it
✔ For loops
✔ Applying conditional logic
✔ Conformed vs. Mapping branching
✔ Building _add detection

Notebook is programmable.

In [None]:
# Step 0: We already loaded the config CSV into config_df.
# Now we will loop through each config row and sanity-check what we will ingest.

def print_config_summary(cfg):
    """
    Display one config row as a labelled, human-readable block.

    Meant as a quick visual check of the configuration before any
    SharePoint list is read dynamically.

    Args:
        cfg: Mapping for a single config row. The keys read are
            'rdm_config_name', 'entity_name', 'sp_list_name',
            'silver_table_name', 'rdm_type', 'include_columns' and
            'business_key_columns'.
    """
    divider = "-" * 60

    # Leading newline keeps consecutive summaries visually separated.
    print("\n" + divider)

    # Identification of the config row
    print(f"Config Name     : {cfg['rdm_config_name']}")
    print(f"Entity          : {cfg['entity_name']}")

    # Source list and destination table
    print(f"SharePoint List : {cfg['sp_list_name']}")
    print(f"Target Table    : {cfg['silver_table_name']}")

    # Fields that steer the ingestion logic
    print(f"RDM Type        : {cfg['rdm_type']}")
    print(f"Include Columns : {cfg['include_columns']}")
    print(f"Business Keys   : {cfg['business_key_columns']}")

    print(divider)


# Bring every config row from the Spark DataFrame onto the driver as Row objects
config_rows = config_df.collect()

# Walk the rows and print one summary each, so the config can be checked by eye
for cfg in config_rows:
    row_as_dict = cfg.asDict()
    print_config_summary(row_as_dict)

____

In [None]:
# Step 1: Prepare config rows for dynamic processing
# Convert comma-separated strings into Python lists so we can use them safely later.

def _split_config_list(raw_value):
    """
    Split a comma-separated config field into a list of trimmed names.

    Args:
        raw_value: The raw cell value from the config row, or None.

    Returns:
        A list of non-empty, whitespace-trimmed entries. A missing (None)
        or blank field yields an empty list; empty entries caused by stray
        or trailing commas are dropped.
    """
    if raw_value is None:
        return []
    return [part.strip() for part in str(raw_value).split(",") if part.strip()]


prepared_configs = []

for cfg in config_df.collect():
    cfg_dict = cfg.asDict()

    # Depending on how config_df was loaded, a blank cell can arrive as None
    # instead of "", and calling .split() on None would fail. The helper
    # treats a missing value as "no columns".
    include_cols = _split_config_list(cfg_dict["include_columns"])
    business_keys = _split_config_list(cfg_dict["business_key_columns"])

    prepared_configs.append({
        "config_name": cfg_dict["rdm_config_name"],
        "entity_name": cfg_dict["entity_name"],
        "sp_list_name": cfg_dict["sp_list_name"],
        "target_table": cfg_dict["silver_table_name"],
        "rdm_type": cfg_dict["rdm_type"],
        "include_columns": include_cols,
        "business_keys": business_keys,
    })

# Human-readable check: echo each prepared config back, with the
# comma-separated fields now shown as Python lists.
divider = "-" * 60

for pc in prepared_configs:
    print("\n" + divider)

    # Identification
    print(f"Config Name   : {pc['config_name']}")
    print(f"Entity        : {pc['entity_name']}")

    # Source and destination
    print(f"SP List       : {pc['sp_list_name']}")
    print(f"Target Table  : {pc['target_table']}")

    # Fields that control ingestion logic
    print(f"RDM Type      : {pc['rdm_type']}")
    print(f"Include Cols  : {pc['include_columns']}")
    print(f"Business Keys : {pc['business_keys']}")

1. Read the SharePoint lists

In [None]:

-------------------old------------

# Step 2: Read each mock SharePoint list export (CSV) based on config
# For POC, we're reading exported SharePoint lists from Lakehouse Files.

from pyspark.sql import functions as F

# Lakehouse folder holding the exported SharePoint lists (one CSV per list)
base_path = "Files/mock_rdm_lists"

for cfg in prepared_configs:
    # Pull out the three config fields this loop needs
    list_name, include_cols, target_table = (
        cfg["sp_list_name"],
        cfg["include_columns"],
        cfg["target_table"],
    )

    # The export file is named after the SharePoint list
    file_path = f"{base_path}/{list_name}.csv"
    print(f"\nReading list export: {file_path}")

    # Read the CSV, treating the first line as the header row
    reader = spark.read.format("csv").option("header", "true")
    df = reader.load(file_path)

    # Keep only the configured columns; every other column is dropped
    projected_cols = [F.col(c) for c in include_cols]
    df_selected = df.select(projected_cols)

    print(f"Writing to target table: {target_table}")

    # POC only: the table is overwritten on every run. Production should use
    # a safer pattern than overwrite.
    writer = df_selected.write.mode("overwrite").format("delta")
    writer.saveAsTable(target_table)

    print(f"Done: {target_table}")

Updated version of the cell above:

In [None]:
# ============================================================
# Clean Professional Ingestion Loop (POC)
# ------------------------------------------------------------
# This loop dynamically reads SharePoint-exported CSV files
# based on config and writes them into Silver Delta tables.
#
# Original Config Fields:
# - sp_list_name      → tells us which SharePoint list export to read
# - include_columns   → tells us which columns to keep
# - silver_table_name → tells us where to write in Silver
#
# Additional Config Fields (Added for future scalability):
#
# - rdm_type:
#     Identifies whether the RDM is CONFORMED or MAPPING.
#     This allows future branching logic (e.g., different validation,
#     transformation or merge rules per RDM type).
#
# - business_keys:
#     Defines the natural key of the dataset.
#     These keys will be used in production for:
#         * duplicate detection
#         * null validation
#         * MERGE / UPSERT logic instead of overwrite
#     NOTE: rdm_type and business_keys are NOT written into the Silver table.
#           They are only used to control ingestion logic.
# ============================================================

from pyspark.sql import functions as F

# Lakehouse folder holding the exported SharePoint lists (one CSV per list)
base_path = "Files/mock_rdm_lists"

for cfg in prepared_configs:

    list_name = cfg["sp_list_name"]
    include_cols = cfg["include_columns"]
    target_table = cfg["target_table"]

    # Metadata fields: they steer the logic below and are not written to Silver
    rdm_type = cfg["rdm_type"]
    business_keys = cfg["business_keys"]

    # The export file is named after the SharePoint list
    file_path = f"{base_path}/{list_name}.csv"

    print("\n" + "="*70)
    print(f"Processing config   : {cfg['config_name']}")
    print(f"RDM Type            : {rdm_type}")
    print(f"Target Silver table : {target_table}")

    # Read the CSV, treating the first line as the header row
    df = (
        spark.read.format("csv")
        .option("header", "true")
        .load(file_path)
    )

    # Keep only configured columns
    df_selected = df.select([F.col(c) for c in include_cols])

    # Basic validation on business keys (for future production readiness).
    # All null counts are computed in one aggregation instead of running a
    # separate filter(...).count() action per key. count() ignores nulls and
    # when() without otherwise() yields null for non-matching rows, so each
    # expression counts exactly the rows where the key is null.
    if business_keys:
        null_count_exprs = [
            F.count(F.when(F.col(key).isNull(), 1)).alias(f"null_count_{idx}")
            for idx, key in enumerate(business_keys)
        ]
        null_counts = df_selected.agg(*null_count_exprs).first()

        # The result row is positional, in the same order as business_keys
        for key, null_count in zip(business_keys, null_counts):
            print(f"Null count in business key '{key}': {null_count}")

    # POC behavior: overwrite
    df_selected.write.mode("overwrite").format("delta").saveAsTable(target_table)

    print(f"Finished writing table : {target_table}")