In [0]:
# Databricks notebook source
# ============================================
# 00. Parameters
# ============================================

# Catalog and schema that receive the bronze tables.
CATALOG = "skills_intelligence"
BRONZE_SCHEMA = "01_bronze"

# Volume directory the source CSVs are read from.
BASE_VOLUME_PATH = "/Volumes/skills_intelligence/00_job_posting_landing_zone/financial_sector/data_or_bi_analyst"

# One spec per source system: the CSV to read, the table to write, and the system label.
# The table name is the shared prefix plus the source-system name.
INGEST_SPECS = [
    {
        "source_csv": f"{BASE_VOLUME_PATH}/{csv_name}",
        "target_table": f"{CATALOG}.{BRONZE_SCHEMA}.financial_sector_data_or_bi_analyst_{source_system}",
        "source_system": source_system,
    }
    for source_system, csv_name in (
        ("abnamro", "abnamro20251225.csv"),
        ("ing", "ing20260106.csv"),
        ("devolksbank", "devolksbank20251227.csv"),
    )
]

# Echo the source -> target mapping so the run log shows what will be ingested.
print("Ingest specs:")
for ingest_spec in INGEST_SPECS:
    print("-", ingest_spec["source_csv"], "->", ingest_spec["target_table"])

In [0]:
# Databricks notebook source
# ============================================
# Ingest ALL dated CSVs per bank -> Bronze Delta tables
# (date-agnostic; schema-forced to avoid Delta merge errors)
# ============================================

from datetime import datetime, timezone
import re

from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType

# ============================================
# 00. Parameters
# ============================================
CATALOG = "skills_intelligence"
BRONZE_SCHEMA = "01_bronze"

# Volume directory that is listed for snapshot CSVs.
BASE_VOLUME_PATH = "/Volumes/skills_intelligence/00_job_posting_landing_zone/financial_sector/data_or_bi_analyst"

# Source systems handled by this cell, in processing order.
BANKS = ["abnamro", "ing", "devolksbank"]

# Each bank writes to its own bronze table; only the trailing bank name differs.
TARGET_TABLES = {
    bank_name: f"{CATALOG}.{BRONZE_SCHEMA}.financial_sector_data_or_bi_analyst_{bank_name}"
    for bank_name in BANKS
}

# Snapshot file names look like ing20260106.csv (matched case-insensitively):
#   group 1 = bank name, group 2 = eight-digit date stamp
DATE_RE = re.compile(r"^(abnamro|ing|devolksbank)(\d{8})\.csv$", flags=re.IGNORECASE)

# Columns expected in every scraper CSV. All of them are read as STRING so the
# column types cannot drift from one snapshot to the next.
RAW_COLS = [
    "vacancy_id",
    "title",
    "location_guess",
    "url",
    "sections_text",
    "description_text",
    "date_taken_utc",
]

# Reader schema: one nullable string field per raw column, in RAW_COLS order.
CSV_SCHEMA = StructType(
    [StructField(col_name, StringType(), True) for col_name in RAW_COLS]
)

# Lineage columns appended by the ingest step.
META_COLS = ["source_system", "snapshot_date", "source_file", "ingested_at_utc"]

# Column order of the bronze tables: raw columns first, then the metadata.
FINAL_COLS = [*RAW_COLS, *META_COLS]

print("Base volume path:", BASE_VOLUME_PATH)

# ============================================
# 01. Ensure catalog/schema exist
# ============================================
# Catalog first, then the bronze schema inside it; IF NOT EXISTS makes a re-run harmless.
for ddl_statement in (
    f"CREATE CATALOG IF NOT EXISTS {CATALOG}",
    f"CREATE SCHEMA IF NOT EXISTS {CATALOG}.{BRONZE_SCHEMA}",
):
    spark.sql(ddl_statement)

# ============================================
# 02. Discover all matching CSVs
# ============================================
# List the landing directory once; every entry exposes .name and .path.
files = dbutils.fs.ls(BASE_VOLUME_PATH)

# bank -> [(yyyymmdd, path), ...] for every file whose name matches DATE_RE.
# Entries that do not match (other files, sub-directories) are ignored.
bank_to_files = {bank_name: [] for bank_name in BANKS}
for entry in files:
    matched = DATE_RE.match(entry.name)
    if matched is None:
        continue
    bank_key = matched.group(1).lower()  # DATE_RE is case-insensitive; the dict keys are lowercase
    date_stamp = matched.group(2)
    bank_to_files[bank_key].append((date_stamp, entry.path))

# Preview: at most 20 snapshots per bank, oldest date stamp first.
print("\nDiscovered files:")
for bank_name in BANKS:
    snapshots = sorted(bank_to_files[bank_name])
    print(f"- {bank_name}: {len(snapshots)} file(s)")
    for date_stamp, file_path in snapshots[:20]:
        print(f"    {date_stamp} -> {file_path}")
    if len(snapshots) > 20:
        print("    ...")

# One ingest spec per discovered snapshot, grouped by bank and sorted by date stamp.
INGEST_SPECS = [
    {
        "source_csv": file_path,
        "target_table": TARGET_TABLES[bank_name],
        "source_system": bank_name,
        "snapshot_date": date_stamp,
    }
    for bank_name in BANKS
    for date_stamp, file_path in sorted(bank_to_files[bank_name])
]

print("\nTotal ingest specs:", len(INGEST_SPECS))
for ingest_spec in INGEST_SPECS[:20]:
    print("-", ingest_spec["source_csv"], "->", ingest_spec["target_table"])
if len(INGEST_SPECS) > 20:
    print("...")

# ============================================
# 03. Optional: reset tables (DROP) if you want a clean rebuild
#     Set RESET_TABLES=True to drop & rebuild from CSVs (prevents legacy type conflicts)
# ============================================
# Keep False for normal runs. Set to True to drop every bank's bronze table before the
# ingest, e.g. when a table was created earlier with the wrong column types.
RESET_TABLES = False

if RESET_TABLES:
    for table_name in (TARGET_TABLES[bank_name] for bank_name in BANKS):
        print(f"Dropping table (if exists): {table_name}")
        spark.sql(f"DROP TABLE IF EXISTS {table_name}")

# ============================================
# 04. Ingest loop (append) with forced schema
# ============================================
# One timestamp for the whole run: every row appended by this execution carries the
# same ingested_at_utc value.
ingested_at_utc = datetime.now(timezone.utc).isoformat()


def _ingested_source_files(table_name):
    """Return the distinct source_file values currently stored in ``table_name``.

    An empty set is returned when the table cannot be queried (for example it has not
    been created yet, or it has no source_file column); the caller then appends as usual.
    """
    try:
        rows = spark.table(table_name).select("source_file").distinct().collect()
    except Exception:
        return set()
    return {row["source_file"] for row in rows}


# target table -> source files it already contains (looked up lazily, once per table)
ingested_files_by_table = {}
written_count = 0
skipped_count = 0

for spec in INGEST_SPECS:
    src = spec["source_csv"]
    tgt = spec["target_table"]
    bank = spec["source_system"]
    snap = spec["snapshot_date"]

    if tgt not in ingested_files_by_table:
        ingested_files_by_table[tgt] = _ingested_source_files(tgt)

    # The write below is an append, so re-running this cell would store every snapshot
    # a second time. Skip files that are already in the target table; use RESET_TABLES
    # to force a full rebuild instead.
    if src in ingested_files_by_table[tgt]:
        print(f"\nSkipping {bank} snapshot {snap}: already ingested into {tgt}")
        print(f"  source: {src}")
        skipped_count += 1
        continue

    print(f"\nIngesting {bank} snapshot {snap} -> {tgt}")
    print(f"  source: {src}")

    # Read with the fixed all-STRING schema to prevent Delta type conflicts.
    # With an explicit schema (and Spark's default enforceSchema=true) the schema is
    # applied by position and the header line is only skipped, so the result always has
    # exactly RAW_COLS, in order, as strings. No "add missing column" step or extra
    # cast is needed.
    # NOTE(review): this also means a CSV whose columns are in a different order would
    # be loaded into the wrong columns without an error. Confirm the scrapers always
    # emit RAW_COLS in this order.
    df_raw = (
        spark.read.format("csv")
        .schema(CSV_SCHEMA)
        .option("header", "true")
        .option("multiLine", "true")   # text fields may contain line breaks
        .option("escape", "\"")        # embedded quotes are doubled ("")
        .option("quote", "\"")
        .option("mode", "PERMISSIVE")  # malformed rows are kept rather than failing the read
        .load(src)
    )

    # Add lineage metadata and pin the final column order.
    df = (
        df_raw
        .withColumn("source_system", F.lit(bank))
        .withColumn("snapshot_date", F.lit(snap))
        .withColumn("source_file", F.lit(src))
        .withColumn("ingested_at_utc", F.lit(ingested_at_utc))
        .select(*FINAL_COLS)
    )

    # Append into the bank's Delta table (created on first write).
    df.write.format("delta").mode("append").saveAsTable(tgt)

    # Remember the file so a duplicate spec within this run is not written twice either.
    ingested_files_by_table[tgt].add(src)
    written_count += 1

print("\n✅ Done ingesting ALL snapshots.")
print(f"   written: {written_count}, skipped (already ingested): {skipped_count}")

# ============================================
# 05. Quick sanity checks
# ============================================
# Report the row count of each bank's bronze table; a table that cannot be read
# (e.g. nothing was ingested for that bank) is reported instead of failing the cell.
for table_name in (TARGET_TABLES[bank_name] for bank_name in BANKS):
    try:
        row_total = spark.table(table_name).count()
    except Exception as read_error:
        print(f"- {table_name} not readable yet -> {read_error}")
    else:
        print(f"- {table_name} rowcount = {row_total}")

In [0]:
# Databricks notebook source
# ============================================
# 01. Set catalog & schema, create schema if needed
# ============================================

# Switch to the catalog, make sure the bronze schema exists in it, then make that
# schema the session default. The order matters: the unqualified schema name is
# resolved against the catalog selected by the first statement.
for statement in (
    f"USE CATALOG {CATALOG}",
    f"CREATE SCHEMA IF NOT EXISTS {BRONZE_SCHEMA}",
    f"USE SCHEMA {BRONZE_SCHEMA}",
):
    spark.sql(statement)

# Show which catalog/schema the session now points at.
spark.sql("SELECT current_catalog(), current_schema()").show(truncate=False)

In [0]:
# Databricks notebook source
# ============================================
# 02. Helpers: file exists + read csv + add bronze metadata
# ============================================

from pyspark.sql import functions as F

def file_exists(path: str) -> bool:
    """Report whether ``path`` appears in the listing of its parent directory.

    The path is split at its last "/" into a directory and an entry name, the
    directory is listed with ``dbutils.fs.ls``, and the entry name is compared against
    each listed name (ignoring a trailing "/" on the listed name).

    Returns False for any failure as well: a path without "/", a parent that cannot
    be listed, or any other exception raised along the way.
    """
    try:
        directory, wanted_name = path.rsplit("/", 1)
        for entry in dbutils.fs.ls(directory):
            if entry.name.rstrip("/") == wanted_name:
                return True
        return False
    except Exception:
        return False

def read_csv_with_metadata(source_csv: str, source_system: str):
    """Load one CSV and tag every row with bronze lineage columns.

    The file is read with a header row, inferred column types, multi-line field
    support, and '"' as both quote and escape character.

    Args:
        source_csv: Path of the CSV file to load.
        source_system: Label stored in the ``_source_system`` column.

    Returns:
        The CSV contents as a DataFrame with four extra columns:
        ``_ingest_ts`` (current timestamp), ``_source_file`` (``source_csv``),
        ``_source_filename`` (last "/"-separated part of ``source_csv``) and
        ``_source_system``.
    """
    reader_options = {
        "header": "true",
        "inferSchema": "true",
        "multiLine": "true",
        "escape": "\"",
        "quote": "\"",
    }
    csv_rows = spark.read.format("csv").options(**reader_options).load(source_csv)

    # File name = last element of the path split on "/".
    path_parts = F.split(F.lit(source_csv), "/")

    with_ingest_ts = csv_rows.withColumn("_ingest_ts", F.current_timestamp())
    with_file = with_ingest_ts.withColumn("_source_file", F.lit(source_csv))
    with_filename = with_file.withColumn("_source_filename", F.element_at(path_parts, -1))
    return with_filename.withColumn("_source_system", F.lit(source_system))

In [0]:
# Databricks notebook source
# ============================================
# 03. Ingest loop: write each csv into its target Delta table
# ============================================

# NOTE(review): INGEST_SPECS is assigned by two earlier cells (the fixed three-file list
# and the list of all discovered snapshots); whichever ran last drives this loop. Both
# point at the same target tables that the schema-forced ingest cell appends to, so
# running both ingest paths stores each CSV twice. Confirm which path is meant to be used.

# One (source_csv, target_table, status, rows_written) tuple per spec, shown at the end.
results = []

for spec in INGEST_SPECS:
    src = spec["source_csv"]
    tgt = spec["target_table"]
    # Named source_system rather than `sys` so the stdlib module name is not clobbered
    # in the notebook's shared namespace.
    source_system = spec["source_system"]

    # A missing file is recorded and skipped instead of failing the whole run.
    if not file_exists(src):
        results.append((src, tgt, "SKIPPED - file not found", 0))
        print(f"⚠️ SKIP (not found): {src}")
        continue

    df_bronze = read_csv_with_metadata(src, source_system)
    # count() is a separate Spark action, so the CSV is read once here and once more by the write.
    row_count = df_bronze.count()

    (df_bronze.write
        .format("delta")
        .mode("append")                # change to "overwrite" if you want replace each run
        .option("mergeSchema", "true")  # schema evolution
        .saveAsTable(tgt)
    )

    results.append((src, tgt, "OK", row_count))
    print(f"✅ OK: {src} -> {tgt} (rows: {row_count})")

# An explicit schema keeps this working when `results` is empty: Spark cannot infer
# column types from an empty list, so passing only column names would raise.
RESULTS_SCHEMA = "source_csv string, target_table string, status string, rows_written long"
display(spark.createDataFrame(results, RESULTS_SCHEMA))

In [0]:
# Databricks notebook source
# ============================================
# 04. Quick validation: counts & latest ingests
# ============================================

# Several specs can share one target table (one spec per snapshot), so validate each
# table once. dict.fromkeys de-duplicates while keeping first-seen order.
tables_to_validate = list(dict.fromkeys(spec["target_table"] for spec in INGEST_SPECS))

for tgt in tables_to_validate:
    try:
        # Total row count of the table.
        spark.sql(f"SELECT '{tgt}' AS table_name, COUNT(*) AS cnt FROM {tgt}").show(truncate=False)
        # Five most recent rows by ingest timestamp. This relies on the underscore-prefixed
        # metadata columns written by read_csv_with_metadata; a table without them
        # ends up in the except branch below.
        spark.sql(f"""
            SELECT _source_filename, _source_system, _ingest_ts
            FROM {tgt}
            ORDER BY _ingest_ts DESC
            LIMIT 5
        """).show(truncate=False)
    except Exception as e:
        # Report and continue so one unreadable table does not hide the others.
        print(f"⚠️ Could not query {tgt}: {e}")