# 04a · Load – fact_lva_stats

**Zweck**  
Lädt die bereinigten LVA-Daten in die Data-Warehouse-Faktentabelle.

**Wichtigste Schritte**  
1. `tmp/lva_clean.parquet` einlesen  
2. FK-Lookups gegen alle `dim_…`-Tabellen  
3. Spalte `aggregation_level` sicherstellen  
4. `TRUNCATE fact_lva_stats` → anschließend Bulk-Insert  
5. FK-Verletzungen werden abgefangen (UNKNOWN = 0)

**Ergebnis**  
Tablle `fact_lva_stats` frisch befüllt.


In [2]:
# 04a_load_facts_lva.py
# ------------------------------------------------------------
# • liest die aufbereiteten LVA-Daten   (tmp/lva_clean.parquet)
# • sorgt dafür, dass die Spalte aggregation_level existiert
# • mappt alle Text-Spalten auf FK-IDs der Dim-Tabellen
# • leert fact_lva_stats und lädt die frischen Daten neu
# ------------------------------------------------------------

# ───────── Imports & Konstanten ──────────────────────────────
import pandas as pd
from pathlib import Path
from sqlalchemy import create_engine, text
from pymysql.err import OperationalError, ProgrammingError

TMP_FILE = Path("../../tmp/lva_clean.parquet")             # Pfad ggf. anpassen
ENG      = create_engine(
    "mysql+pymysql://root:voc_root@localhost:3306/vocdata",
    future=True, echo=False
)

# ─────────────────────────────────────────────────────────────
# 0) Schema-Pflege  – aggregation_level einmalig sicherstellen
# ─────────────────────────────────────────────────────────────
with ENG.begin() as con:
    col_exists = con.scalar(text("""
        SELECT COUNT(*) FROM information_schema.COLUMNS
        WHERE table_schema = DATABASE()
          AND table_name   = 'fact_lva_stats'
          AND column_name  = 'aggregation_level'
    """))
    if col_exists == 0:
        con.exec_driver_sql("""
            ALTER TABLE fact_lva_stats
            ADD COLUMN aggregation_level VARCHAR(12) NOT NULL DEFAULT ''
        """)
        print("Spalte aggregation_level angelegt.")
    else:
        print("Spalte aggregation_level war bereits vorhanden.")

# ─────────────────────────────────────────────────────────────
# 1) Parquet laden
# ─────────────────────────────────────────────────────────────
df = pd.read_parquet(TMP_FILE)
print("Geladene Zeilen aus Parquet:", len(df))
print("Spalten:", sorted(df.columns.tolist()))

# ─────────────────────────────────────────────────────────────
# 2) FK-Lookups aus den Dimensionstabellen in den RAM holen
# ─────────────────────────────────────────────────────────────
DIM_TABLES = [
    "abschlussniveau", "lernform", "geschlecht", "mig_status",
    "lva_anschlussart", "qv_status", "lva_zeitpunkt",
    "wiedereinst_dauer", "isced", "beruf"
]

lookups = {}
with ENG.begin() as con:
    for dim in DIM_TABLES:
        df_dim   = pd.read_sql(f"SELECT * FROM dim_{dim}", con)
        code_col = f"{dim}_code" if f"{dim}_code" in df_dim.columns else f"{dim}_bez"
        lookups[dim] = df_dim.set_index(code_col)[f"{dim}_id"].to_dict()

def fk(dim: str, val) -> int:
    """Liefert die Fremdschlüssel-ID oder 0 (=UNKNOWN)."""
    if val is None or str(val).strip() == "":
        return 0
    return lookups[dim].get(str(val).strip().upper(), 0)

# ─────────────────────────────────────────────────────────────
# 3) Mapping der DataFrame-Zeilen → FK-IDs + Kennzahlen
# ─────────────────────────────────────────────────────────────
mapped_rows = []
for _, r in df.iterrows():
    mapped_rows.append({
        "aggregation_level":        r["aggregation_level"],
        "kohorte_id":               r["kohorte_id"],              # kommt aus 02a
        "abschlussniveau_id":       fk("abschlussniveau",   r.get("abschlussniveau")),
        "lernform_id":              fk("lernform",          r.get("lernform")),
        "geschlecht_id":            fk("geschlecht",        r.get("geschlecht")),
        "mig_status_id":            fk("mig_status",        r.get("mig_status")),
        "anschlussart_id":          fk("lva_anschlussart",  r.get("lva_anschlussart")),
        "qv_status_id":             fk("qv_status",         r.get("qv_status")),
        "lva_zeitpunkt_id":         fk("lva_zeitpunkt",     r.get("lva_zeitpunkt")),
        "wiedereinst_dauer_id":     fk("wiedereinst_dauer", r.get("wiedereinst_dauer")),
        "isced_id":                 fk("isced",             r.get("ausbildungsfeld_isced_code")),
        "beruf_id":                 fk("beruf",             r.get("beruf_bez")),
        # Kennzahlen & Flags 1 :1 übernehmen
        "anzahl_lernende_wiedereinstieg": r.get("anzahl_lernende_wiedereinstieg"),
        "anzahl_lernende":                r.get("anzahl_lernende"),
        "anzahl_lehrvertraege_lva":       r.get("anzahl_lehrvertraege_lva"),
        "anzahl_lernende_lva":            r.get("anzahl_lernende_lva"),
        "is_lva":                         r.get("is_lva"),
        "is_wiedereinstieg":              r.get("is_wiedereinstieg"),
        "datenstatus":                    r.get("datenstatus")
    })

fact_df = pd.DataFrame(mapped_rows)
print("Nach Mapping:", len(fact_df), "Zeilen")

# ─────────────────────────────────────────────────────────────
# 4) Tabelle leeren & neu befüllen
# ─────────────────────────────────────────────────────────────
with ENG.begin() as con:
    con.exec_driver_sql("TRUNCATE TABLE fact_lva_stats;")
    fact_df.to_sql("fact_lva_stats", con,
                   if_exists="append", index=False, method="multi")
print("✔ fact_lva_stats neu geladen:", len(fact_df), "Zeilen")

# ─────────────────────────────────────────────────────────────
# 5) (Optional) Kurzer Qualitäts-Check
# ─────────────────────────────────────────────────────────────
with ENG.begin() as con:
    total = con.scalar(text("SELECT COUNT(*) FROM fact_lva_stats"))
    fk0   = con.scalar(text("""
        SELECT SUM(abschlussniveau_id = 0 OR lernform_id = 0 OR geschlecht_id = 0
                          OR mig_status_id = 0 OR anschlussart_id = 0
                          OR qv_status_id = 0 OR lva_zeitpunkt_id = 0
                          OR wiedereinst_dauer_id = 0 OR isced_id = 0 OR beruf_id = 0)
        FROM fact_lva_stats
    """))
print(f"Stichprobe: {total:,} Zeilen geladen – davon {fk0:,} Zeilen mit ≥1 UNKNOWN-FK.")


Spalte aggregation_level war bereits vorhanden.
Geladene Zeilen aus Parquet: 1081
Spalten: ['abschlussniveau', 'aggregation_level', 'anzahl_lehrvertraege_lva', 'anzahl_lernende', 'anzahl_lernende_lva', 'anzahl_lernende_wiedereinstieg', 'ausbildungsfeld_isced_bez', 'ausbildungsfeld_isced_code', 'beruf_bez', 'datenstatus', 'geschlecht', 'is_lva', 'is_wiedereinstieg', 'kohorte_id', 'lernform', 'lva_anschlussart', 'lva_zeitpunkt', 'mig_status', 'qv_status', 'wiedereinst_dauer']
Nach Mapping: 1081 Zeilen
✔ fact_lva_stats neu geladen: 1081 Zeilen
Stichprobe: 1,081 Zeilen geladen – davon 1,081 Zeilen mit ≥1 UNKNOWN-FK.


Hinweis: bei Fehlermeldung, notebook fk_updates vorher laufen lassen