# Synthetic Data Validation & Snowflake Load  
This notebook validates the synthetic TPC-DS–like dataset generated by our project.  It performs the following checks:  
- Ensures all CSVs defined in the lineage config are present and loaded  
- Verifies primary key uniqueness for each table  
- Checks foreign key consistency (normalizing any `_date` columns)  
- Stages the files to Snowflake  
- Loads each table into Snowflake using `schema_metadata.json`

## 1. Load environment variables, configurations, and build dynamic maps  
In this cell we read Snowflake credentials from `.env`, load our table/lineage YAML, and derive the list of CSVs, date columns, PKs, and FK rules entirely from config.


In [1]:
# %% 
import os, json, yaml
from pathlib import Path
import pandas as pd
import numpy as np
from dotenv import load_dotenv

# ——————————————————————————————
# Setup paths & env
# ——————————————————————————————
ROOT_DIR = Path.cwd().parent             # one level up from notebooks/
DATA_DIR = ROOT_DIR / "data"
META_DIR = ROOT_DIR / "metadata"
CFG_DIR  = ROOT_DIR / "data_generation_config"
load_dotenv(dotenv_path=ROOT_DIR / ".env")

# ——————————————————————————————
# Load YAML configs
# ——————————————————————————————
with open(CFG_DIR / "tables.yml")  as f: tbl_cfg = yaml.safe_load(f)
with open(CFG_DIR / "lineage.yml") as f: lin_cfg = yaml.safe_load(f)

# ——————————————————————————————
# 1.a) Build list of CSVs from lineage steps
# ——————————————————————————————
file_set = set()
for step in lin_cfg["steps"]:
    # Handle inputs
    for input_file in step.get("inputs", []):
        if isinstance(input_file, list):
            # If input_file is a list, add each item
            for nested_file in input_file:
                file_set.add(nested_file)
        else:
            # If input_file is a string, add it directly
            file_set.add(input_file)
    
    # Handle outputs similarly
    for output_file in step.get("outputs", []):
        if isinstance(output_file, list):
            for nested_file in output_file:
                file_set.add(nested_file)
        else:
            file_set.add(output_file)

# include any backups if present on disk
for p in DATA_DIR.glob("*_backup.csv"):
    file_set.add(p.name)
file_list = sorted(file_set)

# ——————————————————————————————
# 1.b) Derive date columns from tables.yml
# ——————————————————————————————
date_map = {}
from datetime import timedelta, datetime
for tname, tdef in tbl_cfg["tables"].items():
    dates = []
    for col, cdef in tdef["columns"].items():
        # faker.date_between or derived expr containing 'date'
        if (cdef["type"] == "faker"       and cdef.get("method")=="date_between") \
        or (cdef["type"] == "derived"     and "date" in cdef.get("expr","")):
            dates.append(col)
    if dates:
        date_map[tname] = dates

# 1.c) Derive PKs by convention: first int _id
PKS = {}
for tname, tdef in tbl_cfg["tables"].items():
    for col, cdef in tdef["columns"].items():
        if col.endswith("_id") and cdef["type"]=="int":
            PKS[tname] = col
            break

# 1.d) Load schema_metadata for FK definitions
schema_meta   = json.load(open(META_DIR/"schema_metadata.json"))
relationships = schema_meta.get("relationships", [])

print("→ Will load CSVs:", file_list)
print("→ Date parsing map:", date_map)
print("→ Primary keys:", PKS)
print("→ Foreign key rules:", relationships)


→ Will load CSVs: ['accounts_payable.csv', 'accounts_payable_clean.csv', 'accounts_payable_dw.csv', 'accounts_payable_enriched.csv', 'accounts_receivable.csv', 'accounts_receivable_clean.csv', 'accounts_receivable_dw.csv', 'accounts_receivable_enriched.csv', 'ad_spend.csv', 'ad_spend_clean.csv', 'ad_spend_dw.csv', 'ad_spend_enriched.csv', 'anomaly_detections.csv', 'anomaly_detections_clean.csv', 'anomaly_detections_dw.csv', 'anomaly_detections_enriched.csv', 'asset_management.csv', 'asset_management_clean.csv', 'asset_management_dw.csv', 'asset_management_enriched.csv', 'attendance_records.csv', 'attendance_records_clean.csv', 'attendance_records_dw.csv', 'attendance_records_enriched.csv', 'bank_reconciliations.csv', 'bank_reconciliations_clean.csv', 'bank_reconciliations_dw.csv', 'bank_reconciliations_enriched.csv', 'benefits_enrollment.csv', 'benefits_enrollment_clean.csv', 'benefits_enrollment_dw.csv', 'benefits_enrollment_enriched.csv', 'billing_cycles.csv', 'billing_cycles_clean.c

In [5]:
len(file_list)
len(date_map)

23

In [2]:
def audit_data_schema_integrity():
    """
    Audits the generated data files and schema_metadata.json for integrity issues
    that could cause validation failures or Snowflake load errors.
    """
    import os, json, yaml
    import pandas as pd
    from pathlib import Path
    
    # Set up paths
    ROOT_DIR = Path.cwd().parent
    DATA_DIR = ROOT_DIR / "data"
    META_DIR = ROOT_DIR / "metadata"
    schema_path = META_DIR / "schema_metadata.json"
    
    # Load schema metadata
    with open(schema_path, 'r') as f:
        schema = json.load(f)
    
    issues = {
        "type_mismatches": [],
        "malformed_references": [],
        "missing_columns": [],
        "missing_tables": [],
        "fk_violations": []
    }
    
    # 1. Check for malformed references in relationships
    for idx, rel in enumerate(schema.get("relationships", [])):
        if "references" not in rel or "." not in str(rel.get("references", "")):
            issues["malformed_references"].append({
                "index": idx,
                "relation": rel
            })
            continue
            
        # For valid references, check if tables and columns exist
        child_table, child_col = rel.get("table", ""), rel.get("column", "")
        parent_ref = rel.get("references", "")
        
        if "." in parent_ref:
            parent_table, parent_col = parent_ref.split(".", 1)
            
            # Check if tables exist
            child_path = DATA_DIR / f"{child_table}.csv"
            parent_path = DATA_DIR / f"{parent_table}.csv"
            
            if not child_path.exists():
                issues["missing_tables"].append(f"Child table {child_table} not found")
            if not parent_path.exists():
                issues["missing_tables"].append(f"Parent table {parent_table} not found")
            
            # If tables exist, check if columns exist
            if child_path.exists():
                child_df = pd.read_csv(child_path, nrows=0)
                if child_col not in child_df.columns:
                    issues["missing_columns"].append(f"{child_table}.{child_col}")
            
            if parent_path.exists():
                parent_df = pd.read_csv(parent_path, nrows=0)
                if parent_col not in parent_df.columns:
                    issues["missing_columns"].append(f"{parent_table}.{parent_col}")
    
    # 2. Check for type mismatches
    for table_name, table_meta in schema.get("tables", {}).items():
        table_path = DATA_DIR / f"{table_name}.csv"
        if not table_path.exists():
            continue
            
        # Read the actual CSV
        df = pd.read_csv(table_path)
        
        # Check each column's type against schema definition
        for col_name, col_meta in table_meta.get("columns", {}).items():
            if col_name not in df.columns:
                continue
                
            # Get expected type from schema
            if isinstance(col_meta, dict):
                expected_type = col_meta.get("type", "").upper()
            else:
                expected_type = str(col_meta).upper()
            
            # Check for numeric columns with non-numeric values
            if any(t in expected_type for t in ["INT", "FLOAT", "DECIMAL", "NUMERIC", "DOUBLE"]):
                try:
                    pd.to_numeric(df[col_name])
                except:
                    # Found non-numeric data in numeric column
                    # Get sample of problematic values
                    non_numeric = []
                    for val in df[col_name].dropna().unique():
                        try:
                            float(val)
                        except:
                            non_numeric.append(str(val))
                            if len(non_numeric) >= 3:
                                break
                    
                    issues["type_mismatches"].append({
                        "table": table_name,
                        "column": col_name,
                        "expected_type": expected_type,
                        "sample_values": non_numeric[:3]
                    })
    
    # 3. Print detailed report
    print("\n===== DATA INTEGRITY AUDIT REPORT =====\n")
    
    if issues["malformed_references"]:
        print(f"MALFORMED REFERENCES ({len(issues['malformed_references'])} issues):")
        for issue in issues["malformed_references"]:
            print(f"  • Relation #{issue['index']}: {issue['relation']}")
        print()
    
    if issues["missing_tables"]:
        print(f"MISSING TABLES ({len(issues['missing_tables'])} issues):")
        for table in sorted(set(issues["missing_tables"])):
            print(f"  • {table}")
        print()
    
    if issues["missing_columns"]:
        print(f"MISSING COLUMNS ({len(issues['missing_columns'])} issues):")
        for col in sorted(set(issues["missing_columns"])):
            print(f"  • {col}")
        print()
    
    if issues["type_mismatches"]:
        print(f"TYPE MISMATCHES ({len(issues['type_mismatches'])} issues):")
        for issue in issues["type_mismatches"]:
            print(f"  • {issue['table']}.{issue['column']} - Expected {issue['expected_type']}, " 
                  f"found non-numeric values: {', '.join(issue['sample_values'])}")
        print()
    
    if not any(issues.values()):
        print("✓ No integrity issues found!")
    
    return issues

# Run the audit
audit_results = audit_data_schema_integrity()


===== DATA INTEGRITY AUDIT REPORT =====

✓ No integrity issues found!


## 2. Load CSVs with proper date parsing  
Here we load each CSV present on disk, automatically parsing only those columns whose names end in `_date`, and skip any files declared in lineage but not generated.

In [3]:
# %% 
# 2) Load CSVs with _only_ real date columns
from pathlib import Path
import pandas as pd

dfs = {}
print("Loading CSVs…")
for fname in file_list:
    path  = DATA_DIR / fname
    table = path.stem

    if not path.exists():
        print(f" • {table:20s} MISSING on disk, skipping")
        continue

    # Peek at columns
    sample = pd.read_csv(path, nrows=0)
    # Only parse those ending in '_date'
    parse_dt = [c for c in sample.columns if c.lower().endswith("_date")]

    df = pd.read_csv(
        path,
        parse_dates=parse_dt,
        infer_datetime_format=True
    )
    dfs[table] = df
    print(f" • {table:20s} → {df.shape[0]:5d} rows × {df.shape[1]:2d} cols"
          + (f"  (dates: {parse_dt})" if parse_dt else ""))


Loading CSVs…
 • accounts_payable     → 15234 rows ×  3 cols
 • accounts_payable_clean MISSING on disk, skipping
 • accounts_payable_dw  MISSING on disk, skipping
 • accounts_payable_enriched MISSING on disk, skipping
 • accounts_receivable  → 16890 rows ×  3 cols
 • accounts_receivable_clean MISSING on disk, skipping
 • accounts_receivable_dw MISSING on disk, skipping
 • accounts_receivable_enriched MISSING on disk, skipping
 • ad_spend             → 15678 rows ×  4 cols
 • ad_spend_clean       MISSING on disk, skipping
 • ad_spend_dw          MISSING on disk, skipping
 • ad_spend_enriched    MISSING on disk, skipping
 • anomaly_detections   → 43210 rows ×  3 cols
 • anomaly_detections_clean MISSING on disk, skipping
 • anomaly_detections_dw MISSING on disk, skipping
 • anomaly_detections_enriched MISSING on disk, skipping
 • asset_management     →  2765 rows ×  4 cols  (dates: ['purchase_date'])
 • asset_management_clean MISSING on disk, skipping
 • asset_management_dw  MISSING on di

  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(


 • commissions          → 29415 rows ×  4 cols
 • commissions_clean    MISSING on disk, skipping
 • commissions_dw       MISSING on disk, skipping
 • commissions_enriched MISSING on disk, skipping
 • compensation_bands   →  2109 rows ×  4 cols
 • compensation_bands_clean MISSING on disk, skipping
 • compensation_bands_dw MISSING on disk, skipping
 • compensation_bands_enriched MISSING on disk, skipping
 • cost_centers         → 13456 rows ×  2 cols
 • cost_centers_clean   MISSING on disk, skipping
 • cost_centers_dw      MISSING on disk, skipping
 • cost_centers_enriched MISSING on disk, skipping
 • credit_notes         → 13245 rows ×  3 cols
 • credit_notes_clean   MISSING on disk, skipping
 • credit_notes_dw      MISSING on disk, skipping
 • credit_notes_enriched MISSING on disk, skipping
 • currency_rates       → 16789 rows ×  3 cols
 • currency_rates_clean MISSING on disk, skipping
 • currency_rates_dw    MISSING on disk, skipping
 • currency_rates_enriched MISSING on disk, skippin

  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(


 • dashboard_views      → 98765 rows ×  4 cols  (dates: ['view_date'])
 • dashboard_views_clean MISSING on disk, skipping
 • dashboard_views_dw   MISSING on disk, skipping
 • dashboard_views_enriched MISSING on disk, skipping
 • data_quality_metrics → 54321 rows ×  3 cols
 • data_quality_metrics_clean MISSING on disk, skipping
 • data_quality_metrics_dw MISSING on disk, skipping
 • data_quality_metrics_enriched MISSING on disk, skipping
 • dates                →   365 rows ×  6 cols  (dates: ['full_date'])
 • distribution_centers →  1456 rows ×  4 cols
 • distribution_centers_clean MISSING on disk, skipping
 • distribution_centers_dw MISSING on disk, skipping
 • distribution_centers_enriched MISSING on disk, skipping
 • email_campaigns      → 19876 rows ×  4 cols  (dates: ['sent_date'])
 • email_campaigns_clean MISSING on disk, skipping
 • email_campaigns_dw   MISSING on disk, skipping
 • email_campaigns_enriched MISSING on disk, skipping
 • employees            →   200 rows ×  5 cols 

  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(


 • job_postings         →  4321 rows ×  4 cols  (dates: ['post_date'])
 • job_postings_clean   MISSING on disk, skipping
 • job_postings_dw      MISSING on disk, skipping
 • job_postings_enriched MISSING on disk, skipping
 • knowledge_base_articles →  8765 rows ×  4 cols
 • knowledge_base_articles_clean MISSING on disk, skipping
 • knowledge_base_articles_dw MISSING on disk, skipping
 • knowledge_base_articles_enriched MISSING on disk, skipping
 • kpi_definitions      → 87654 rows ×  3 cols
 • kpi_definitions_clean MISSING on disk, skipping
 • kpi_definitions_dw   MISSING on disk, skipping
 • kpi_definitions_enriched MISSING on disk, skipping
 • leads                → 27109 rows ×  4 cols
 • leads_clean          MISSING on disk, skipping
 • leads_dw             MISSING on disk, skipping
 • leads_enriched       MISSING on disk, skipping
 • ledgers              → 18765 rows ×  3 cols
 • ledgers_clean        MISSING on disk, skipping
 • ledgers_dw           MISSING on disk, skipping
 • le

  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(


 • org_chart            →  3210 rows ×  3 cols
 • org_chart_clean      MISSING on disk, skipping
 • org_chart_dw         MISSING on disk, skipping
 • org_chart_enriched   MISSING on disk, skipping
 • packaging            →  1890 rows ×  3 cols
 • packaging_clean      MISSING on disk, skipping
 • packaging_dw         MISSING on disk, skipping
 • packaging_enriched   MISSING on disk, skipping
 • payments             → 34512 rows ×  5 cols  (dates: ['payment_date'])
 • payments_clean       MISSING on disk, skipping
 • payments_dw          MISSING on disk, skipping
 • payments_enriched    MISSING on disk, skipping
 • payments_fin         MISSING on disk, skipping
 • payments_fin_clean   MISSING on disk, skipping
 • payments_fin_dw      MISSING on disk, skipping
 • payments_fin_enriched MISSING on disk, skipping
 • payroll              → 10987 rows ×  5 cols  (dates: ['pay_date'])
 • payroll_clean        MISSING on disk, skipping
 • payroll_dw           MISSING on disk, skipping
 • payroll_

  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(


 • sales_channels       → 31245 rows ×  3 cols
 • sales_channels_clean MISSING on disk, skipping
 • sales_channels_dw    MISSING on disk, skipping
 • sales_channels_enriched MISSING on disk, skipping
 • sales_forecast       → 50123 rows ×  5 cols
 • sales_forecast_clean MISSING on disk, skipping
 • sales_forecast_dw    MISSING on disk, skipping
 • sales_forecast_enriched MISSING on disk, skipping
 • sales_pipeline       → 63452 rows ×  5 cols  (dates: ['close_date'])
 • sales_pipeline_clean MISSING on disk, skipping
 • sales_pipeline_dw    MISSING on disk, skipping
 • sales_pipeline_enriched MISSING on disk, skipping
 • sales_regions        → 26789 rows ×  3 cols
 • sales_regions_clean  MISSING on disk, skipping
 • sales_regions_dw     MISSING on disk, skipping
 • sales_regions_enriched MISSING on disk, skipping
 • sales_targets        → 42301 rows ×  5 cols
 • sales_targets_clean  MISSING on disk, skipping
 • sales_targets_dw     MISSING on disk, skipping
 • sales_targets_enriched MIS

  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(


 • seo_keywords         → 10987 rows ×  4 cols
 • seo_keywords_clean   MISSING on disk, skipping
 • seo_keywords_dw      MISSING on disk, skipping
 • seo_keywords_enriched MISSING on disk, skipping
 • session_data         → 65432 rows ×  4 cols
 • session_data_clean   MISSING on disk, skipping
 • session_data_dw      MISSING on disk, skipping
 • session_data_enriched MISSING on disk, skipping
 • shipments            →  4876 rows ×  5 cols  (dates: ['shipped_date'])
 • shipments_clean      MISSING on disk, skipping
 • shipments_dw         MISSING on disk, skipping
 • shipments_enriched   MISSING on disk, skipping
 • social_media_metrics → 17890 rows ×  5 cols
 • social_media_metrics_clean MISSING on disk, skipping
 • social_media_metrics_dw MISSING on disk, skipping
 • social_media_metrics_enriched MISSING on disk, skipping
 • store_staffing       →  1023 rows ×  5 cols  (dates: ['start_date', 'end_date'])
 • store_staffing_clean MISSING on disk, skipping
 • store_staffing_dw    MISSING

  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(


 • tickets              → 45321 rows ×  5 cols
 • tickets_clean        MISSING on disk, skipping
 • tickets_dw           MISSING on disk, skipping
 • tickets_enriched     MISSING on disk, skipping
 • training_sessions    →  6543 rows ×  4 cols  (dates: ['session_date'])
 • training_sessions_clean MISSING on disk, skipping
 • training_sessions_dw MISSING on disk, skipping
 • training_sessions_enriched MISSING on disk, skipping
 • transactions_backup  → 30095 rows ×  7 cols
 • transactions_dw      MISSING on disk, skipping
 • transfer_orders      →  2678 rows ×  4 cols  (dates: ['transfer_date'])
 • transfer_orders_clean MISSING on disk, skipping
 • transfer_orders_dw   MISSING on disk, skipping
 • transfer_orders_enriched MISSING on disk, skipping
 • trend_analysis       → 10987 rows ×  3 cols
 • trend_analysis_clean MISSING on disk, skipping
 • trend_analysis_dw    MISSING on disk, skipping
 • trend_analysis_enriched MISSING on disk, skipping
 • warehouse_locations  →    50 rows ×  7 c

  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(


## 3. Verify primary key uniqueness for each table  
We infer the primary key for each table by convention (`*_id`) and check that every value is unique, reporting PASS/FAIL.


In [6]:
# %% 
print("\nPrimary Key Checks:")
for tbl, pk in PKS.items():
    df = dfs.get(tbl)
    if df is None: 
        print(f" • {tbl:20s} MISSING") 
        continue
    unique = df[pk].nunique()
    total  = len(df)
    print(f" • {tbl:20s} {unique:5d}/{total:<5d} →"
          + (" PASS" if unique==total else " FAIL"))



Primary Key Checks:
 • customers            59229/59229 → PASS
 • products              8123/8123  → PASS
 • stores                4123/4123  → PASS
 • distribution_centers  1456/1456  → PASS
 • categories            2345/2345  → PASS
 • subcategories         4567/4567  → PASS
 • product_reviews       9123/9123  → PASS
 • product_images        6789/6789  → PASS
 • pricing_tiers         3456/3456  → PASS
 • inventory_levels      5678/5678  → PASS
 • product_attributes    2890/2890  → PASS
 • brand_partners        1345/1345  → PASS
 • dates                  365/365   → PASS
 • budgets              MISSING
 • ledgers              18765/18765 → PASS
 • orders               10000/10000 → PASS
 • warehouse_locations     50/50    → PASS
 • invoices_finance     MISSING
 • tax_records          15000/15000 → PASS
 • financial_statements 14234/14234 → PASS
 • accounts_receivable  16890/16890 → PASS
 • accounts_payable     15234/15234 → PASS
 • cost_centers         13456/13456 → PASS
 • profit_ce

## 4. Check foreign key consistency (with normalized dates)  
All `_date` columns are normalized to midnight, then we join each child→parent FK and count any mismatches, reporting PASS or the number of failures.

In [7]:
# %% 
# 3) Foreign Key Consistency with normalized dates and detailed debugging
import numpy as np

# 3.a) Normalize any datetime columns ending in '_date'
for tbl_name, df in dfs.items():
    for col in df.columns:
        if col.lower().endswith("_date") and np.issubdtype(df[col].dtype, np.datetime64):
            df[col] = df[col].dt.normalize()

def fk_issues(child_df, ckey, parent_df, pkey):
    return (~child_df[ckey].dropna().isin(parent_df[pkey].dropna())).sum()

print("\nForeign Key Checks (with debug):")
for idx, rel in enumerate(relationships):

    # ---- debug the raw references field ----
    raw_ref = rel.get("references", "")
    if "." not in raw_ref:
        print(f"⚠️  Rel#{idx}: malformed references value (no '.'): {raw_ref!r}  → full rel: {rel}")
        continue

    child_table, child_key = rel["table"], rel["column"]
    parent_table, parent_key = raw_ref.split(".", 1)

    df_child = dfs.get(child_table)
    df_parent = dfs.get(parent_table)

    # Table existence
    if df_child is None or df_parent is None:
        missing = []
        if df_child is None: missing.append(f"CHILD '{child_table}' missing")
        if df_parent is None: missing.append(f"PARENT '{parent_table}' missing")
        print(f"⚠️  Rel#{idx}: {', '.join(missing)}")
        continue

    # Column existence
    if child_key not in df_child.columns:
        print(f"⚠️  Rel#{idx}: CHILD column missing: {child_table}.{child_key}")
        continue
    if parent_key not in df_parent.columns:
        print(f"⚠️  Rel#{idx}: PARENT column missing: {parent_table}.{parent_key}")
        continue

    # Compute FK issues
    issues = fk_issues(df_child, child_key, df_parent, parent_key)
    status = "PASS" if issues == 0 else f"FAIL ({issues})"
    print(f" • Rel#{idx:2d}: {child_table:15s}.{child_key:15s} → {parent_table:15s}.{parent_key:15s} : {status}")



Foreign Key Checks (with debug):
 • Rel# 0: orders         .customer_id     → customers      .customer_id     : PASS
 • Rel# 1: orders         .store_id        → stores         .store_id        : FAIL (5877)
 • Rel# 2: order_items    .order_id        → orders         .order_id        : PASS
 • Rel# 3: order_items    .product_id      → products       .product_id      : PASS
 • Rel# 4: invoices       .order_id        → orders         .order_id        : PASS
 • Rel# 5: payments       .invoice_id      → invoices       .invoice_id      : PASS
 • Rel# 6: refunds        .payment_id      → payments       .payment_id      : PASS
 • Rel# 7: sales_pipeline .customer_id     → customers      .customer_id     : PASS
 • Rel# 8: sales_pipeline .opportunity_id  → opportunities  .opportunity_id  : FAIL (25108)
 • Rel# 9: opportunities  .lead_id         → leads          .lead_id         : PASS
 • Rel#10: commissions    .opportunity_id  → opportunities  .opportunity_id  : PASS
 • Rel#11: sales_targets  .

In [8]:
# %%
import numpy as np

# ——————————————————————————————
# 0) Build set of intentionally broken-FK pairs
# ——————————————————————————————
anomaly_rels = set()

# (a) from tables.yml: if you have an "anomalies" section per table
for tname, tdef in tbl_cfg["tables"].items():
    table_anoms = tdef.get("anomalies", {})
    # e.g. under table_anoms["foreign_key"] you list columns to break
    for col in table_anoms.get("foreign_key", []):
        anomaly_rels.add((tname, col))
    # or if you configured anomalies per-column:
    for col, cdef in tdef["columns"].items():
        rate = cdef.get("anomaly_rate", 0)
        if rate and cdef.get("type") == "int":  # or whatever your marker is
            anomaly_rels.add((tname, col))

# (b) from lineage.yml: if you injected anomalies at the step level
for step in lin_cfg["steps"]:
    for a in step.get("anomalies", []):
        if a.get("type") == "fk":
            anomaly_rels.add((a["table"], a["column"]))


# ——————————————————————————————
# 1) Normalize dates
# ——————————————————————————————
for tbl_name, df in dfs.items():
    for col in df.columns:
        if col.lower().endswith("_date") and np.issubdtype(df[col].dtype, np.datetime64):
            df[col] = df[col].dt.normalize()


# ——————————————————————————————
# 2) FK‐check with anomaly flags
# ——————————————————————————————
def fk_issues(child_df, ckey, parent_df, pkey):
    return (~child_df[ckey].dropna().isin(parent_df[pkey].dropna())).sum()

print("\nForeign Key Checks (flagged):")
for idx, rel in enumerate(relationships):
    raw_ref = rel.get("references", "")
    if "." not in raw_ref:
        print(f"⚠️ Rel#{idx}: malformed references (no '.'): {raw_ref!r}")
        continue

    child_table, child_key = rel["table"], rel["column"]
    parent_table, parent_key = raw_ref.split(".", 1)

    df_child = dfs.get(child_table)
    df_parent = dfs.get(parent_table)
    if df_child is None or df_parent is None:
        print(f"⚠️ Rel#{idx}: Missing table → child:{child_table}, parent:{parent_table}")
        continue

    if child_key not in df_child.columns:
        print(f"⚠️ Rel#{idx}: Missing child column {child_table}.{child_key}")
        continue
    if parent_key not in df_parent.columns:
        print(f"⚠️ Rel#{idx}: Missing parent column {parent_table}.{parent_key}")
        continue

    issues = fk_issues(df_child, child_key, df_parent, parent_key)
    status = "PASS" if issues == 0 else f"FAIL({issues})"

    intentional = (child_table, child_key) in anomaly_rels
    flag = "INTENTIONAL" if intentional else "UNINTENTIONAL"

    print(f" • Rel#{idx:2d}: "
          f"{child_table:15s}.{child_key:15s} → "
          f"{parent_table:15s}.{parent_key:15s} : "
          f"{status:10s} [{flag}]")



Foreign Key Checks (flagged):
 • Rel# 0: orders         .customer_id     → customers      .customer_id     : PASS       [UNINTENTIONAL]
 • Rel# 1: orders         .store_id        → stores         .store_id        : FAIL(5877) [UNINTENTIONAL]
 • Rel# 2: order_items    .order_id        → orders         .order_id        : PASS       [UNINTENTIONAL]
 • Rel# 3: order_items    .product_id      → products       .product_id      : PASS       [UNINTENTIONAL]
 • Rel# 4: invoices       .order_id        → orders         .order_id        : PASS       [UNINTENTIONAL]
 • Rel# 5: payments       .invoice_id      → invoices       .invoice_id      : PASS       [UNINTENTIONAL]
 • Rel# 6: refunds        .payment_id      → payments       .payment_id      : PASS       [UNINTENTIONAL]
 • Rel# 7: sales_pipeline .customer_id     → customers      .customer_id     : PASS       [UNINTENTIONAL]
 • Rel# 8: sales_pipeline .opportunity_id  → opportunities  .opportunity_id  : FAIL(25108) [UNINTENTIONAL]
 • Rel# 9: opp

## 5. Stage CSVs and load tables into Snowflake  
We PUT each CSV into the user stage, then use Snowpark (with `parse_header` option) and our `schema_metadata.json` to CREATE/OVERWRITE each table in Snowflake.

In [10]:
def diagnose_and_fix_csv_issues(data_dir, schema_meta, verbose=True):
    """
    Check for and fix common CSV issues that could cause Snowflake loading problems.
    Focuses on quoting issues, data type mismatches, and other common problems.
    
    Args:
        data_dir: Path to directory containing CSV files
        schema_meta: Dictionary containing schema metadata
        verbose: Whether to print detailed diagnostic information
    
    Returns:
        Dictionary with statistics of issues found and fixed
    """
    import pandas as pd
    import os
    from pathlib import Path
    import re
    
    stats = {
        "files_checked": 0,
        "files_fixed": 0,
        "total_issues": 0,
        "issues_by_type": {},
        "files_with_issues": []
    }
    
    if verbose:
        print("\n==== CSV File Diagnostic Tool ====")
    
    # Loop through each table in the schema
    for table_name, table_info in schema_meta["tables"].items():
        csv_path = Path(data_dir) / f"{table_name}.csv"
        
        # Skip if file doesn't exist
        if not csv_path.exists():
            if verbose:
                print(f"❌ File not found: {csv_path}")
            continue
        
        stats["files_checked"] += 1
        file_issues = []
        
        # Get expected column types from schema
        expected_types = {}
        for col_name, col_meta in table_info["columns"].items():
            raw_type = col_meta if isinstance(col_meta, str) else col_meta.get("type", "string")
            # Normalize type names
            if "int" in raw_type.lower():
                expected_types[col_name] = "int"
            elif any(t in raw_type.lower() for t in ["float", "decimal", "double", "numeric"]):
                expected_types[col_name] = "float"
            elif "date" in raw_type.lower():
                expected_types[col_name] = "date"
            else:
                expected_types[col_name] = "string"
        
        # Read the CSV file
        try:
            # First try with standard reading
            df = pd.read_csv(csv_path)
            
            # Check for file-level issues
            if len(df.columns) == 1 and df.columns[0].count(',') > 0:
                # Likely delimiter issue
                if verbose:
                    print(f"🔍 {table_name}.csv: Possible delimiter issue detected")
                # Try again with different parser
                df = pd.read_csv(csv_path, engine='python')
                file_issues.append("delimiter_issue")
            
            # Check for columns that should be numeric but have string values
            for col_name, expected_type in expected_types.items():
                if col_name not in df.columns:
                    continue
                    
                if expected_type in ["int", "float"]:
                    # Try converting to numeric to check for issues
                    try:
                        pd.to_numeric(df[col_name])
                    except Exception:
                        # Found non-numeric values
                        if verbose:
                            print(f"⚠️ {table_name}.csv: Column '{col_name}' should be {expected_type} but has non-numeric values")
                        
                        # Get problematic rows
                        problem_mask = pd.to_numeric(df[col_name], errors='coerce').isna() & ~df[col_name].isna()
                        problem_rows = df[problem_mask]
                        
                        if verbose and not problem_rows.empty:
                            print(f"   Found {len(problem_rows)} problem rows. Sample values:")
                            for idx, row in problem_rows.head(3).iterrows():
                                print(f"   - Row {idx}: '{row[col_name]}'")
                        
                        # Check for quoted values with commas
                        quote_comma_pattern = r'"[^"]*,[^"]*"'
                        if problem_rows.empty:
                            continue
                            
                        if any(problem_rows[col_name].astype(str).str.contains(quote_comma_pattern)):
                            if verbose:
                                print(f"   Issue likely caused by quoted values with commas")
                            file_issues.append(f"quoted_comma_in_{col_name}")
                            
                            # Fix: Extract just the numeric part if a pattern is detected
                            # This is a simplified fix - assumes numeric values are at start of string
                            numeric_extraction = df[col_name].astype(str).str.extract(r'(\d+)')
                            mask = ~numeric_extraction[0].isna()
                            if mask.sum() > 0:
                                df.loc[mask, col_name] = numeric_extraction[0]
                        
                        # Fix: Convert any remaining problematic values to NaN
                        df[col_name] = pd.to_numeric(df[col_name], errors='coerce')
                        file_issues.append(f"type_mismatch_in_{col_name}")
            
            # Check for quoting issues (unbalanced quotes)
            for col_name in df.columns:
                if df[col_name].dtype == 'object':
                    # Count quotes in each cell
                    quote_counts = df[col_name].astype(str).str.count('"')
                    if (quote_counts % 2 != 0).any():
                        if verbose:
                            print(f"⚠️ {table_name}.csv: Column '{col_name}' has unbalanced quotes")
                        file_issues.append(f"unbalanced_quotes_in_{col_name}")
                        
                        # Fix: Balance quotes by adding missing quote or removing extra
                        for idx, count in quote_counts[quote_counts % 2 != 0].items():
                            val = str(df.at[idx, col_name])
                            if count % 2 == 1:  # Odd number of quotes
                                df.at[idx, col_name] = val + '"' if val.count('"') % 2 == 1 else val[:-1]
            
            # If any issues were found, write the fixed file
            if file_issues:
                fixed_path = csv_path.with_suffix('.fixed.csv')
                df.to_csv(fixed_path, index=False, quoting=1)  # quoting=1 means quote all non-numeric
                
                if verbose:
                    print(f"✅ Fixed file written to: {fixed_path}")
                    print(f"   Issues fixed: {', '.join(file_issues)}")
                
                # Replace original with fixed version
                import shutil
                shutil.move(fixed_path, csv_path)
                
                stats["files_fixed"] += 1
                stats["total_issues"] += len(file_issues)
                stats["files_with_issues"].append(table_name)
                
                # Count by issue type
                for issue in file_issues:
                    stats["issues_by_type"][issue] = stats["issues_by_type"].get(issue, 0) + 1
        
        except Exception as e:
            if verbose:
                print(f"❌ Error processing {table_name}.csv: {str(e)}")
            stats["issues_by_type"]["process_error"] = stats["issues_by_type"].get("process_error", 0) + 1
    
    # Print summary
    if verbose:
        print("\n==== CSV Diagnostic Summary ====")
        print(f"Files checked: {stats['files_checked']}")
        print(f"Files with issues: {stats['files_fixed']}")
        print(f"Total issues fixed: {stats['total_issues']}")
        
        if stats["issues_by_type"]:
            print("\nIssues by type:")
            for issue_type, count in stats["issues_by_type"].items():
                print(f"  - {issue_type}: {count}")
        
        if stats["files_with_issues"]:
            print("\nTables fixed:")
            for table in stats["files_with_issues"]:
                print(f"  - {table}")
    
    return stats

def prepare_snowflake_loading(sess, schema_meta, data_dir):
    """
    Prepare for Snowflake loading by dropping existing tables and 
    creating a properly configured reader that handles CSV issues.
    
    Args:
        sess: Snowflake session
        schema_meta: Dictionary with schema metadata
        data_dir: Directory containing CSV files
    
    Returns:
        Configured reader for loading CSVs
    """
    # First drop existing tables to avoid conflicts
    print("\nDropping existing tables...")
    for tbl_name in schema_meta["tables"].keys():
        table_ident = f'"{tbl_name.upper()}"'
        try:
            sess.sql(f"DROP TABLE IF EXISTS {table_ident}").collect()
            print(f"  • Dropped {table_ident}")
        except Exception as e:
            print(f"  • Failed to drop {table_ident}: {str(e)}")
    
    # Create a robust reader with proper options
    reader = sess.read \
        .option("parse_header", True) \
        .option("field_delimiter", ",") \
        .option("quote", "\"") \
        .option("escape", "\"") \
        .option("encoding", "UTF-8") \
        .option("skip_blank_lines", True) \
        .option("empty_field_as_null", True)
    
    print("\nConfigured robust CSV reader with proper quoting and escaping")
    
    return reader

# Usage example:
# 1. First diagnose and fix CSV files
# diagnose_and_fix_csv_issues(DATA_DIR, schema_meta)
#
# 2. Then prepare for loading and get a robust reader
# reader = prepare_snowflake_loading(sess, schema_meta, DATA_DIR)
#
# 3. Use the reader in your loading code:
# for tbl_name, tbl_info in schema_meta["tables"].items():
#     ...
#     df_snow = reader.schema(schema).csv(f"@~/{tbl_name}.csv")
#     ...

In [11]:
# %%
# 5) Staging & Loading to Snowflake with CSV diagnostics
import os
from dotenv import load_dotenv
import snowflake.connector
from snowflake.snowpark import Session
from snowflake.snowpark.types import (
    StructType, StructField,
    IntegerType, FloatType,
    StringType, DateType,
)

# Load environment variables
load_dotenv()

# Configure Snowflake connection
SF_CONN = {
    "account":        os.getenv("SNOWFLAKE_ACCOUNT"),
    "user":           os.getenv("SNOWFLAKE_USER"),
    "password":       os.getenv("SNOWFLAKE_PASSWORD"),
    "role":           os.getenv("SNOWFLAKE_ROLE"),
    "warehouse":      os.getenv("SNOWFLAKE_WAREHOUSE"),
    "database":       os.getenv("SNOWFLAKE_DATABASE"),
    "schema":         os.getenv("SNOWFLAKE_SCHEMA"),
    "ocsp_fail_open": os.getenv("SNOWFLAKE_OCSP_FAIL_OPEN", "True").lower() == "true",
    "insecure_mode":  os.getenv("SNOWFLAKE_INSECURE_MODE",  "True").lower() == "true",
}

# Step 1: Diagnose and fix any CSV issues BEFORE staging
print("\n1. Diagnosing and fixing CSV issues...")
csv_stats = diagnose_and_fix_csv_issues(DATA_DIR, schema_meta)

# Step 2: ONLY AFTER verification, stage the fixed CSV files to Snowflake
print("\n2. Staging verified CSVs to Snowflake...")
conn = snowflake.connector.connect(**SF_CONN)
cur = conn.cursor()
for path in DATA_DIR.glob("*.csv"):
    uri = f"file:///{path.resolve().as_posix()}"
    print(f" • PUT {path.name}")
    cur.execute(f"PUT '{uri}' @~/ OVERWRITE=TRUE")
cur.close()
conn.close()

# Step 3: Prepare Snowflake environment
print("\n3. Preparing Snowflake environment...")
sess = Session.builder.configs(SF_CONN).create()
sess.use_warehouse(SF_CONN["warehouse"])
sess.use_database(SF_CONN["database"])
sess.use_schema(SF_CONN["schema"])

# Get a reader with FIXED options for the quote/escape conflict
# NOTE: Modified to avoid the ESCAPE parameter that's causing conflicts
print("\n3a. Creating Snowflake reader with compatible quote settings...")
reader = sess.read \
    .option("parse_header", True) \
    .option("field_delimiter", ",") \
    .option("field_optionally_enclosed_by", '"')  # Use this instead of quote/escape combination

# Drop existing tables if needed
print("\n3b. Dropping existing tables...")
for tbl_name in schema_meta["tables"].keys():
    table_ident = f'"{tbl_name.upper()}"'
    try:
        sess.sql(f"DROP TABLE IF EXISTS {table_ident}").collect()
        print(f"  • Dropped {table_ident}")
    except Exception as e:
        print(f"  • Failed to drop {table_ident}: {str(e)}")

# Helper function to convert schema types
def to_snow_type(tstr):
    t = str(tstr).upper()
    if t.startswith("INT"):    
        return IntegerType()
    if t.startswith(("FLOAT","DECIMAL","NUMERIC","DOUBLE")): 
        return FloatType()
    if t.startswith("DATE"):   
        return DateType()
    return StringType()

# Step 4: Load tables into Snowflake
print("\n4. Loading tables into Snowflake...")
loaded_tables = []
failed_tables = []

for tbl_name, tbl_info in schema_meta["tables"].items():
    # Build schema from metadata
    fields = []
    for col_name, col_meta in tbl_info.get("columns", {}).items():
        if isinstance(col_meta, dict):
            raw_t = col_meta.get("type", "string")
        else:
            raw_t = str(col_meta)
        fields.append(StructField(col_name, to_snow_type(raw_t)))
    
    schema = StructType(fields)
    table_ident = f'"{tbl_name.upper()}"'
    
    try:
        # Load the table with compatible reader options
        print(f" • {table_ident:30s}", end=" ")
        df_snow = reader.schema(schema).csv(f"@~/{tbl_name}.csv")
        df_snow.write.mode("overwrite").save_as_table(table_ident)
        print("✓")
        loaded_tables.append(tbl_name)
    except Exception as e:
        print("✗")
        print(f"   Error: {str(e)}")
        
        # Try with alternate options if the first attempt fails
        try:
            print(f"   Trying alternate loading approach...")
            # Create a different reader with simpler options
            alt_reader = sess.read \
                .option("parse_header", True) \
                .option("skip_blank_lines", True)
                
            df_snow = alt_reader.schema(schema).csv(f"@~/{tbl_name}.csv")
            print(f" • RETRY {table_ident:30s}", end=" ")
            df_snow.write.mode("overwrite").save_as_table(table_ident)
            print("✓")
            loaded_tables.append(tbl_name)
        except Exception as retry_error:
            print("✗")
            print(f"   Retry failed: {str(retry_error)}")
            failed_tables.append(tbl_name)

# Close the Snowflake session
sess.close()

# Print summary
print("\n===== Snowflake Load Summary =====")
print(f"Total tables processed: {len(loaded_tables) + len(failed_tables)}")
print(f"Successfully loaded: {len(loaded_tables)}")
if failed_tables:
    print(f"Failed to load: {len(failed_tables)}")
    print(" • " + "\n • ".join(failed_tables))
print("===================================")


1. Diagnosing and fixing CSV issues...

==== CSV File Diagnostic Tool ====

==== CSV Diagnostic Summary ====
Files checked: 69
Files with issues: 0
Total issues fixed: 0

2. Staging verified CSVs to Snowflake...
 • PUT accounts_payable.csv
 • PUT accounts_receivable.csv
 • PUT ad_spend.csv
 • PUT anomaly_detections.csv
 • PUT asset_management.csv
 • PUT attendance_records.csv
 • PUT bank_reconciliations.csv
 • PUT benefits_enrollment.csv
 • PUT billing_cycles.csv
 • PUT brand_partners.csv
 • PUT budgets.csv
 • PUT call_records.csv
 • PUT campaigns.csv
 • PUT carriers.csv
 • PUT categories.csv
 • PUT chat_logs.csv
 • PUT commissions.csv
 • PUT compensation_bands.csv
 • PUT cost_centers.csv
 • PUT credit_notes.csv
 • PUT currency_rates.csv
 • PUT customers.csv
 • PUT customer_segments.csv
 • PUT customer_segments_marketing.csv
 • PUT dashboard_views.csv
 • PUT data_quality_metrics.csv
 • PUT dates.csv
 • PUT distribution_centers.csv
 • PUT email_campaigns.csv
 • PUT employees.csv
 • PUT