# ETL Pipeline (Extract → Transform → Load)

This notebook walks through a full ETL workflow for loading Flowcase-style data into PostgreSQL:

1. Generate synthetic CV reports  
2. Extract raw CSV files  
3. Transform data into a clean relational schema  
4. Load into a PostgreSQL database  
5. Verify the results  

Each step is designed to be clear, testable, and reproducible.


## Environment Setup

To get started:

1. Create a Python virtual environment  
2. Select the `.venv` kernel in Jupyter  
3. Install dependencies:  
    ```
    pip install -r requirements.txt
    ```
4. Make sure PostgreSQL is running (e.g., via pgAdmin)
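
Before running the load step, it can help to confirm how the notebook will reach PostgreSQL. The sketch below assembles a SQLAlchemy connection URL from the standard libpq environment variables (`PGHOST`, `PGPORT`, `PGUSER`, `PGPASSWORD`, `PGDATABASE`). The helper name `build_db_url` and the fallback values are assumptions for illustration, not something this notebook defines.

```python
import os
from urllib.parse import quote_plus


def build_db_url(env=None):
    """Build a SQLAlchemy URL from libpq-style environment variables.

    `env` defaults to os.environ; the fallback values are illustrative only.
    """
    env = os.environ if env is None else env
    user = env.get("PGUSER", "postgres")
    password = env.get("PGPASSWORD", "")
    host = env.get("PGHOST", "localhost")
    port = env.get("PGPORT", "5432")
    database = env.get("PGDATABASE", "postgres")
    # Percent-encode the password so characters such as "@" do not break the URL.
    auth = f"{user}:{quote_plus(password)}" if password else user
    return f"postgresql+psycopg2://{auth}@{host}:{port}/{database}"


print(build_db_url({"PGUSER": "etl", "PGDATABASE": "flowcase"}))
# postgresql+psycopg2://etl@localhost:5432/flowcase
```

The resulting string can be passed to `create_engine(...)`, which the notebook imports from SQLAlchemy later on.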



## Step 0 — Generate Synthetic Data

Generate synthetic Flowcase-style CV Partner reports.  
They are written to the `cv_reports/` folder, in a subfolder named after the reporting period (`Q42025` in the run below).  
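
The run below writes to a folder called `Q42025`, which reads as a quarter followed by a year. Assuming that convention, a label for any date could be derived as in this sketch. `quarter_label` is a hypothetical helper and may not match how `make_fake_flowcase_reports` actually names its output.

```python
from datetime import date


def quarter_label(d: date) -> str:
    """Return a label such as 'Q42025' for a date in the fourth quarter of 2025."""
    quarter = (d.month - 1) // 3 + 1  # months 1-3 -> 1, 4-6 -> 2, and so on
    return f"Q{quarter}{d.year}"


print(quarter_label(date(2025, 11, 3)))  # Q42025
```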


In [1]:
import make_fake_flowcase_reports

make_fake_flowcase_reports.main()

✔ user_report.csv: 500 rows
✔ usage_report.csv: 500 rows
✔ project_experiences.csv: 1501 rows
✔ certifications.csv: 1006 rows
✔ courses.csv: 1446 rows
✔ languages.csv: 996 rows
✔ technologies.csv: 2220 rows
✔ key_qualifications.csv: 485 rows
✔ educations.csv: 757 rows
✔ work_experiences.csv: 1484 rows
✔ positions.csv: 1264 rows
✔ blogs.csv: 758 rows
✔ cv_roles.csv: 1001 rows
✔ sc_clearance.csv: 500 rows
✔ availability_report.csv: 30000 rows

All files written under: /Users/alanabarrett-frew/Desktop/Module  4/Assignment/ETL Pipeline/ETL_pipeline/early-experimentation/cv_reports/Q42025


## Run All Necessary Imports

In [2]:
from pathlib import Path
import pandas as pd
import re
from dataclasses import dataclass
import json
from datetime import date
import psycopg2
from sqlalchemy import create_engine, text
import os


In [3]:
def print_step(title):
    print(f"\n{'='*20} {title} {'='*20}")

# Step 1 — Extract


In [4]:
def find_latest_report_folder(base_folder="cv_reports"):
    print_step("Finding the latest report folder")

    report_folders = [f for f in Path(base_folder).iterdir() if f.is_dir()]
    if not report_folders:
        raise FileNotFoundError(f"No report folders found in {base_folder}")

    latest = sorted(report_folders, key=lambda f: f.name)[-1]

    print(f"Found {len(report_folders)} folders.")
    print(f"Latest folder: {latest.name}")

    return latest


def find_latest_quarterly_report_folder(base_folder="cv_reports"):
    print_step("Finding the latest quarterly report folder")

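    # Folder names look like "Q42025": a quarter digit followed by a four-digit year.
    pattern = re.compile(r"Q([1-4])(\d{4})")
    report_folders = [
        f for f in Path(base_folder).iterdir()
        if f.is_dir() and pattern.fullmatch(f.name)
    ]
    if not report_folders:
        raise FileNotFoundError(f"No quarterly report folders found in {base_folder}.")

    def quarter_key(folder):
        # Sort by (year, quarter); a plain name sort would rank Q42025 after Q12026.
        quarter, year = pattern.fullmatch(folder.name).groups()
        return int(year), int(quarter)

    report_folders.sort(key=quarter_key)
    print("Quarterly folders found:", [f.name for f in report_folders])

    latest_folder = report_folders[-1]
    print("Using latest quarterly folder:", latest_folder.name)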

    return latest_folder


def load_csv_files_from_folder(report_folder):
    print_step(f"Loading CSV files from {report_folder}")

    csv_files = list(Path(report_folder).glob("*.csv"))
    print(f"Found {len(csv_files)} CSV files.")
    dataframes = {}

    for csv_file in csv_files:
        try:
            df = pd.read_csv(csv_file)
            dataframes[csv_file.name] = df
            print(f"  Loaded {csv_file.name} -> {df.shape}")
        except Exception as e:
            print(f"  ⚠️ Failed to read {csv_file.name}: {e}")

    return dataframes


def extract(settings):
    """
    Finds the latest quarterly report folder and loads all CSVs as DataFrames.
    Returns an object with .data_dir and dict-like data (keyed by filename).
    """
    data_source = settings.get("data_source", "fake")

    if data_source == "real":
        print("[extract] Real data mode selected, but not implemented.")
        return type("ExtractResult", (), {"data_dir": None})()

    # Fake data path
    base_folder = settings.get("base_folder", "cv_reports")
    data_dir = find_latest_quarterly_report_folder(base_folder)
    data = load_csv_files_from_folder(data_dir)

    class ExtractResult(dict):
        pass

    result = ExtractResult(data)
    result.data_dir = data_dir
    return result
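
`extract()` returns a `dict` subclass so that callers can keep using `.get("user_report.csv")` while also reading `.data_dir`. A plain `dict` instance cannot take new attributes, which is why the empty subclass is needed. The sketch below shows the pattern on its own, with placeholder strings standing in for the DataFrames.

```python
from pathlib import Path


class ExtractResult(dict):
    """A dict of tables that can also carry extra attributes such as data_dir."""


# Placeholder strings stand in for the DataFrames the notebook would load.
result = ExtractResult({"user_report.csv": "users table", "courses.csv": "courses table"})
result.data_dir = Path("cv_reports") / "Q42025"

print(sorted(result))                          # ['courses.csv', 'user_report.csv']
print(result.data_dir.name)                    # Q42025
print(result.get("missing.csv", "fallback"))   # fallback
```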




## Step 1.1 — Locate the latest report folder

Identify the most recent synthetic export under `cv_reports/`. 

In [5]:
print_step("Step 1.1: Locate latest report folder")

latest_folder = find_latest_report_folder("cv_reports")
print(f"✅ Using folder: {latest_folder}")

# Tiny test: does it actually exist?
assert latest_folder.exists(), "Latest folder path does not exist on disk!"





Found 1 folders.
Latest folder: Q42025
✅ Using folder: cv_reports/Q42025
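
With a single folder the choice is trivial, but names such as `Q42025` do not sort chronologically as plain strings: `Q42025` sorts after `Q12026` even though it is the earlier period. Here is a minimal sketch of a year-then-quarter comparison, assuming every export folder follows the `Q<quarter><year>` pattern (`latest_quarter` is a hypothetical helper):

```python
import re

QUARTER_RE = re.compile(r"Q([1-4])(\d{4})")


def latest_quarter(names):
    """Return the most recent 'Q<quarter><year>' name, ignoring other names."""
    keyed = []
    for name in names:
        match = QUARTER_RE.fullmatch(name)
        if match:
            quarter, year = match.groups()
            keyed.append(((int(year), int(quarter)), name))
    if not keyed:
        raise ValueError("no quarter-style names found")
    # Tuples compare element by element, so the year decides before the quarter.
    return max(keyed)[1]


names = ["Q42025", "Q12026", "Q32025"]
print(sorted(names)[-1])      # Q42025 (string order)
print(latest_quarter(names))  # Q12026 (chronological order)
```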


## Step 1.2 — Load all CSV files from the latest report

Load all CSVs inside the selected folder into pandas DataFrames and perform a sanity check to ensure core files are present.


In [6]:
print_step("Step 1.2: Load CSV files from latest folder")

raw_frames = load_csv_files_from_folder(latest_folder)

print("\nSummary of loaded files:")
for name, df in raw_frames.items():
    print(f" - {name:30} {df.shape}")

# Check: do we have some expected core CSVs?
expected_files = [
    "user_report.csv",
    "project_experiences.csv",
    "work_experiences.csv",
]
missing = [f for f in expected_files if f not in raw_frames]

print("\nExpected core files:", expected_files)
print("Missing:", missing)

assert not missing, "One or more expected CSVs are missing!"




Found 15 CSV files.
  Loaded certifications.csv -> (1006, 22)
  Loaded project_experiences.csv -> (1501, 45)
  Loaded blogs.csv -> (758, 21)
  Loaded availability_report.csv -> (30000, 7)
  Loaded cv_roles.csv -> (1001, 19)
  Loaded work_experiences.csv -> (1484, 26)
  Loaded educations.csv -> (757, 27)
  Loaded user_report.csv -> (500, 26)
  Loaded courses.csv -> (1446, 26)
  Loaded key_qualifications.csv -> (485, 21)
  Loaded positions.csv -> (1264, 23)
  Loaded technologies.csv -> (2220, 20)
  Loaded sc_clearance.csv -> (500, 9)
  Loaded languages.csv -> (996, 22)
  Loaded usage_report.csv -> (500, 51)

Summary of loaded files:
 - certifications.csv             (1006, 22)
 - project_experiences.csv        (1501, 45)
 - blogs.csv                      (758, 21)
 - availability_report.csv        (30000, 7)
 - cv_roles.csv                   (1001, 19)
 - work_experiences.csv           (1484, 26)
 - educations.csv                 (757, 27)
 - user_report.csv                (500, 26)
 - ... (remaining output truncated)

# Step 2 — Transform


## Step 2.1 — Transform helpers

These helpers normalise multilang fields and dates. The cell also defines the `TransformResult`
dataclass, which returns the cleaned tables as a single object.
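
A multilang field packs one value per language into a single pipe-separated string, with each entry written as `language:text`. The snippet below shows the shape of the conversion on a made-up value (the Norwegian title is invented for illustration). It skips the whitespace trimming and empty-value handling that `parse_multilang` performs.

```python
raw = "int:Principal C# Developer|no:Ledende C#-utvikler"

# Split on "|" into entries, then on the first ":" into (language, text).
parsed = dict(part.split(":", 1) for part in raw.split("|") if ":" in part)

print(parsed)
# {'int': 'Principal C# Developer', 'no': 'Ledende C#-utvikler'}
```

Splitting on the first colon only means that text containing a colon of its own stays intact.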


In [7]:
@dataclass
class TransformResult:
    users_df: pd.DataFrame | None = None
    cvs_df: pd.DataFrame | None = None
    technologies_df: pd.DataFrame | None = None
    languages_df: pd.DataFrame | None = None
    project_experiences_df: pd.DataFrame | None = None
    work_experiences_df: pd.DataFrame | None = None
    certifications_df: pd.DataFrame | None = None
    courses_df: pd.DataFrame | None = None
    educations_df: pd.DataFrame | None = None
    positions_df: pd.DataFrame | None = None
    blogs_df: pd.DataFrame | None = None
    cv_roles_df: pd.DataFrame | None = None
    key_qualifications_df: pd.DataFrame | None = None
    sc_clearance_df: pd.DataFrame | None = None
    availability_df: pd.DataFrame | None = None

def parse_multilang(pipe: object) -> dict:
    """
    Convert a single pipe string like 'int:Text|no:Tekst' into a dict.
    Anything non-string or blank -> {}.
    """
    if not isinstance(pipe, str) or not pipe.strip():
        return {}
    out = {}
    for part in pipe.split("|"):
        if ":" in part:
            k, v = part.split(":", 1)
            k, v = k.strip(), v.strip()
            if k and v:
                out[k] = v
    return out

def to_iso_date(s: object) -> str | None:
    if s is None or (isinstance(s, float) and pd.isna(s)):
        return None
    s = str(s).strip()
    if not s:
        return None
    # If it looks like ISO (yyyy-mm-dd), parse straight without dayfirst
    if "-" in s and len(s.split("-")[0]) == 4:
        dt = pd.to_datetime(s, errors="coerce")
    else:
        dt = pd.to_datetime(s, dayfirst=True, errors="coerce")
    return None if pd.isna(dt) else dt.date().isoformat()
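
The day-first rule matters for ambiguous inputs: `01/07/2024` is read as 1 July, not 7 January. The sketch below reproduces that behaviour with the standard library only, for two explicit formats. It is a simplified stand-in (the name `to_iso_date_stdlib` is hypothetical) and accepts far fewer formats than the pandas-based `to_iso_date` above.

```python
from datetime import datetime


def to_iso_date_stdlib(value):
    """Return 'YYYY-MM-DD' for ISO or day-first slash dates, else None."""
    if value is None:
        return None
    s = str(value).strip()
    if not s:
        return None
    # Try the unambiguous ISO layout first, then fall back to day-first.
    for fmt in ("%Y-%m-%d", "%d/%m/%Y"):
        try:
            return datetime.strptime(s, fmt).date().isoformat()
        except ValueError:
            continue
    return None


print(to_iso_date_stdlib("2024-07-01"))  # 2024-07-01
print(to_iso_date_stdlib("01/07/2024"))  # 2024-07-01
print(to_iso_date_stdlib("not a date"))  # None
```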


## Step 2.2 — Core transform logic

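The `transform()` function builds cleaned `users` and `cvs` tables, normalises dates in the
SC clearance and availability reports, and passes the remaining CV Partner sections
(skills, experiences, and so on) through unchanged.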


In [8]:
def transform(data) -> TransformResult:
    # Core extracts
    users = data.get("user_report.csv", pd.DataFrame()).copy()
    usage = data.get("usage_report.csv", pd.DataFrame()).copy()

    # Parse user name as dict
    if not users.empty and "Name (multilang)" in users.columns:
        users["Name (multilang)"] = users["Name (multilang)"].map(parse_multilang)
    else:
        users["Name (multilang)"] = [{}] * len(users)

    # Nationality comes from usage_report: "Nationality (#{lang})" is a single pipe string
    if not usage.empty and "Nationality (#{lang})" in usage.columns:
        nat_map = {
            str(r["CV Partner User ID"]): parse_multilang(r["Nationality (#{lang})"])
            for _, r in usage.iterrows()
            if "CV Partner User ID" in r and pd.notna(r["CV Partner User ID"])
        }
        users["nationality_multilang"] = users["CV Partner User ID"].map(
            lambda uid: nat_map.get(str(uid), {})
        )
    else:
        users["nationality_multilang"] = [{}] * len(users)

    # Build CV rows: user_report already has one row per CV
    cvs = users.copy()
    if "Title (#{lang})" in users.columns:
        cvs["title_multilang"] = users["Title (#{lang})"].map(parse_multilang)
    else:
        cvs["title_multilang"] = [{}] * len(cvs)

    # Carry seniority columns from user_report -> cvs_df
    def _num(x):
        try:
            return int(x)
        except Exception:
            return None

    cvs["sfia_level"] = users.get("SFIA Level", pd.Series([None]*len(users))).map(_num)
    cvs["cpd_level"]  = users.get("CPD Level",  pd.Series([None]*len(users))).map(_num)
    cvs["cpd_band"]   = users.get("CPD Band",   pd.Series([None]*len(users))).astype("string").where(lambda s: s.notna(), None)
    cvs["cpd_label"]  = users.get("CPD Label",  pd.Series([None]*len(users))).astype("string").where(lambda s: s.notna(), None)


    # Pass through + light cleanup
    sc_clearance = data.get("sc_clearance.csv", pd.DataFrame()).copy()
    if not sc_clearance.empty:
        for col in ("Valid From", "Valid To"):
            if col in sc_clearance.columns:
                sc_clearance[col] = sc_clearance[col].map(to_iso_date)

    availability = data.get("availability_report.csv", pd.DataFrame()).copy()
    if not availability.empty and "Date" in availability.columns:
        availability["Date"] = availability["Date"].map(to_iso_date)

    return TransformResult(
        users_df=users if not users.empty else pd.DataFrame(),
        cvs_df=cvs if not cvs.empty else pd.DataFrame(),
        technologies_df=data.get("technologies.csv"),
        languages_df=data.get("languages.csv"),
        project_experiences_df=data.get("project_experiences.csv"),
        work_experiences_df=data.get("work_experiences.csv"),
        certifications_df=data.get("certifications.csv"),
        courses_df=data.get("courses.csv"),
        educations_df=data.get("educations.csv"),
        positions_df=data.get("positions.csv"),
        blogs_df=data.get("blogs.csv"),
        cv_roles_df=data.get("cv_roles.csv"),
        key_qualifications_df=data.get("key_qualifications.csv"),
        sc_clearance_df=sc_clearance if not sc_clearance.empty else pd.DataFrame(),
        availability_df=availability if not availability.empty else pd.DataFrame(),
    )

## Step 2.3 — Run transform and perform tests

Execute `transform(raw_data)` and validate:

- Row counts  
- Key identifiers  
- Data integrity (e.g., CV count aligns with user count)  


In [9]:
print_step("Step 2.3: Reload CSVs for transform demo")

raw_data = load_csv_files_from_folder(latest_folder)

print_step("Step 2.3a: Run transform() on extracted data")

# Use the raw data dict from load_csv_files_from_folder(...)
tr = transform(raw_data)

print("\nTransformed tables (row counts):")
for name in [
    "users_df", "cvs_df", "technologies_df", "languages_df",
    "project_experiences_df", "work_experiences_df",
]:
    df = getattr(tr, name)
    if df is not None:
        print(f" - {name:25} {len(df):5d} rows")

from IPython.display import display

print("\nSample: users_df")
display(tr.users_df.head())

print("\nSample: cvs_df")
display(tr.cvs_df.head())

print_step("Step 2.3b: Basic data quality checks on transform output")

users_df = tr.users_df
cvs_df = tr.cvs_df

# 1) Ensure we have users
assert not users_df.empty, "users_df is unexpectedly empty after transform!"

# 2) Key identifier should exist & not be all null
assert "CV Partner User ID" in users_df.columns, "Missing CV Partner User ID column in users_df"
assert users_df["CV Partner User ID"].notna().any(), "All user IDs are null!"

# 3) Same number of rows in users_df and cvs_df
assert len(users_df) == len(cvs_df), "users_df and cvs_df row counts differ!"

print("✅ Transform checks passed for users_df and cvs_df.")





Found 15 CSV files.
  Loaded certifications.csv -> (1006, 22)
  Loaded project_experiences.csv -> (1501, 45)
  Loaded blogs.csv -> (758, 21)
  Loaded availability_report.csv -> (30000, 7)
  Loaded cv_roles.csv -> (1001, 19)
  Loaded work_experiences.csv -> (1484, 26)
  Loaded educations.csv -> (757, 27)
  Loaded user_report.csv -> (500, 26)
  Loaded courses.csv -> (1446, 26)
  Loaded key_qualifications.csv -> (485, 21)
  Loaded positions.csv -> (1264, 23)
  Loaded technologies.csv -> (2220, 20)
  Loaded sc_clearance.csv -> (500, 9)
  Loaded languages.csv -> (996, 22)
  Loaded usage_report.csv -> (500, 51)


Transformed tables (row counts):
 - users_df                    500 rows
 - cvs_df                      500 rows
 - technologies_df            2220 rows
 - languages_df                996 rows
 - project_experiences_df     1501 rows
 - work_experiences_df        1484 rows

Sample: users_df


Unnamed: 0,Name,Name (multilang),Title (#{lang}),Email,UPN,External User ID,CV Partner User ID,CV Partner CV ID,Phone Number,Landline,...,Years since first work experience,Access roles,Has profile image,Owns a reference project,Read and understood privacy notice,SFIA Level,CPD Level,CPD Band,CPD Label,nationality_multilang
0,Danielle Johnson,{'int': 'Danielle Johnson'},int:Principal C# Developer,danielle.johnson@mail.test,daniellejohnson,ext_5b69cd14,5b69cd14,cv_5b69cd14,958-350-6431,+1-539-500-5329x31839,...,5,User,True,False,True,5,3,L,CPD3L,{'int': 'Norwegian'}
1,Joshua Walker,{'int': 'Joshua Walker'},int:Principal Data Engineer,joshua.walker@mail.test,joshuawalker,ext_439b63ae,439b63ae,cv_439b63ae,729-504-2284x21020,001-350-324-0268,...,12,User,False,False,False,5,3,L,CPD3L,{'int': 'Swedish'}
2,Jill Rhodes,{'int': 'Jill Rhodes'},int:Senior C# Developer,jill.rhodes@example.org,jillrhodes,ext_bc24ae58,bc24ae58,cv_bc24ae58,975.289.1783x9084,7764617711,...,9,User,True,False,True,4,3,E,CPD3E,{'int': 'British'}
3,Patricia Miller,{'int': 'Patricia Miller'},int:Principal Analytics Engineer,patricia.miller@mail.test,patriciamiller,ext_711200c6,711200c6,cv_711200c6,624-999-8569,896.311.8367x36576,...,18,User,False,False,True,5,3,L,CPD3L,{'int': 'Polish'}
4,Robert Johnson,{'int': 'Robert Johnson'},int:Associate ML Engineer,robert.johnson@mail.test,robertjohnson,ext_a7986c12,a7986c12,cv_a7986c12,465-245-2711x11615,001-688-651-6560,...,10,Country Manager,True,True,False,2,1,E,CPD1E,{'int': 'Danish'}



Sample: cvs_df


Unnamed: 0,Name,Name (multilang),Title (#{lang}),Email,UPN,External User ID,CV Partner User ID,CV Partner CV ID,Phone Number,Landline,...,SFIA Level,CPD Level,CPD Band,CPD Label,nationality_multilang,title_multilang,sfia_level,cpd_level,cpd_band,cpd_label
0,Danielle Johnson,{'int': 'Danielle Johnson'},int:Principal C# Developer,danielle.johnson@mail.test,daniellejohnson,ext_5b69cd14,5b69cd14,cv_5b69cd14,958-350-6431,+1-539-500-5329x31839,...,5,3,L,CPD3L,{'int': 'Norwegian'},{'int': 'Principal C# Developer'},5,3,L,CPD3L
1,Joshua Walker,{'int': 'Joshua Walker'},int:Principal Data Engineer,joshua.walker@mail.test,joshuawalker,ext_439b63ae,439b63ae,cv_439b63ae,729-504-2284x21020,001-350-324-0268,...,5,3,L,CPD3L,{'int': 'Swedish'},{'int': 'Principal Data Engineer'},5,3,L,CPD3L
2,Jill Rhodes,{'int': 'Jill Rhodes'},int:Senior C# Developer,jill.rhodes@example.org,jillrhodes,ext_bc24ae58,bc24ae58,cv_bc24ae58,975.289.1783x9084,7764617711,...,4,3,E,CPD3E,{'int': 'British'},{'int': 'Senior C# Developer'},4,3,E,CPD3E
3,Patricia Miller,{'int': 'Patricia Miller'},int:Principal Analytics Engineer,patricia.miller@mail.test,patriciamiller,ext_711200c6,711200c6,cv_711200c6,624-999-8569,896.311.8367x36576,...,5,3,L,CPD3L,{'int': 'Polish'},{'int': 'Principal Analytics Engineer'},5,3,L,CPD3L
4,Robert Johnson,{'int': 'Robert Johnson'},int:Associate ML Engineer,robert.johnson@mail.test,robertjohnson,ext_a7986c12,a7986c12,cv_a7986c12,465-245-2711x11615,001-688-651-6560,...,2,1,E,CPD1E,{'int': 'Danish'},{'int': 'Associate ML Engineer'},2,1,E,CPD1E



✅ Transform checks passed for users_df and cvs_df.
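
The checks above confirm that the ID column exists and is not entirely null, but not that IDs are unique. Because the load step upserts on `cv_partner_user_id`, a repeated ID would silently overwrite an earlier row. A uniqueness check could look like the sketch below. `find_duplicates` is a hypothetical helper shown on a plain list; with the notebook's data, the input would be something like `users_df["CV Partner User ID"].dropna().tolist()`.

```python
from collections import Counter


def find_duplicates(values):
    """Return the values that occur more than once, ignoring None."""
    counts = Counter(v for v in values if v is not None)
    return sorted(v for v, n in counts.items() if n > 1)


ids = ["5b69cd14", "439b63ae", "bc24ae58", "439b63ae", None]
print(find_duplicates(ids))  # ['439b63ae']
```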


# Step 3 — Load


## Step 3.1 — Load helpers

These utility functions handle boolean parsing, date conversion, 
foreign key lookups, and normalisation.


In [10]:
def _to_bool(v):
    if v is None or (isinstance(v, float) and pd.isna(v)):
        return None
    if isinstance(v, bool):
        return v
    s = str(v).strip().lower()
    return s in ("true", "1", "t", "yes", "y")

def _clean_str(v, default=""):
    # Safely turn any value (including NaN/float) into a stripped string (or default)
    if v is None or (isinstance(v, float) and pd.isna(v)):
        return default
    s = str(v).strip()
    return s if s else default

def _resolve_user_id(conn, email=None, upn=None, external_id=None):
    if email:
        uid = conn.execute(text("SELECT user_id FROM users WHERE lower(email)=lower(:e)"), {"e": email}).scalar()
        if uid: return uid
    if upn:
        uid = conn.execute(text("SELECT user_id FROM users WHERE lower(upn)=lower(:u)"), {"u": upn}).scalar()
        if uid: return uid
    if external_id:
        uid = conn.execute(text("SELECT user_id FROM users WHERE external_user_id=:x"), {"x": external_id}).scalar()
        if uid: return uid
    return None

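def _to_date(v, default=None):
    # Accept strings like "2024-07-01", "01/07/2024", or excel-ish values
    if v is None or (isinstance(v, float) and pd.isna(v)) or str(v).strip() == "":
        return default
    s = str(v).strip()
    # ISO-looking values (yyyy-mm-dd) are parsed as-is; anything else is treated as
    # day-first, matching to_iso_date() in the transform step
    looks_iso = "-" in s and len(s.split("-")[0]) == 4
    dt = pd.to_datetime(s, dayfirst=not looks_iso, errors="coerce")
    return None if pd.isna(dt) else dt.date()
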
def _cv_id(conn, cv_partner_cv_id: str):
    return conn.execute(
        text("SELECT cv_id FROM cvs WHERE cv_partner_cv_id=:cid"),
        {"cid": str(cv_partner_cv_id)}
    ).scalar()

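def _ensure_dim(conn, table: str, name: str, key: str = "name", id_col: str = None):
    # Treat None, NaN, and blank strings as "no value" (NaN is truthy, so `not name` alone misses it)
    name = _clean_str(name, default=None)
    if name is None:
        return None
    if id_col is None:
        id_col = (table[4:] + "_id") if table.startswith("dim_") else (table.rstrip("s") + "_id")
    # table, key, and id_col are interpolated into the SQL text, so pass trusted constants only
    conn.execute(text(f"INSERT INTO {table} ({key}) VALUES (:n) ON CONFLICT ({key}) DO NOTHING"), {"n": name})
    return conn.execute(text(f"SELECT {id_col} FROM {table} WHERE {key}=:n"), {"n": name}).scalar()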
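
When `id_col` is omitted, `_ensure_dim` derives the ID column from the table name. The sketch below isolates that rule so its results are easy to inspect (`default_id_col` is a hypothetical name, and `address` is a made-up table). Note that `str.rstrip("s")` removes every trailing `s`, so a table name ending in a double `s` needs an explicit `id_col`.

```python
def default_id_col(table: str) -> str:
    """Mirror the naming rule applied when id_col is not given."""
    if table.startswith("dim_"):
        return table[4:] + "_id"
    return table.rstrip("s") + "_id"


for table in ("dim_technology", "dim_project_type", "users", "address"):
    print(f"{table:18} -> {default_id_col(table)}")
# dim_technology     -> technology_id
# dim_project_type   -> project_type_id
# users              -> user_id
# address            -> addre_id
```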


## Step 3.2 — Core entity upserts (Users and CVs)

These upsert functions populate the `users` and `cvs` tables.


In [11]:

def upsert_users(conn, df):
    print(f"Upserting {len(df)} users...")
    sql = text("""
        INSERT INTO users
          (cv_partner_user_id, name_multilang, email, upn, external_user_id,
           phone_number, landline, birth_year, department, country,
           user_created_at, nationality_multilang)
        VALUES
          (:cv_partner_user_id, CAST(:name_multilang AS JSONB), :email, :upn, :external_user_id,
           :phone_number, :landline, :birth_year, :department, :country,
           :user_created_at, CAST(:nationality_multilang AS JSONB))
        ON CONFLICT (cv_partner_user_id) DO UPDATE
        SET name_multilang = EXCLUDED.name_multilang,
            email = EXCLUDED.email,
            upn = EXCLUDED.upn,
            external_user_id = EXCLUDED.external_user_id,
            phone_number = EXCLUDED.phone_number,
            landline = EXCLUDED.landline,
            birth_year = EXCLUDED.birth_year,
            department = EXCLUDED.department,
            country = EXCLUDED.country,
            user_created_at = EXCLUDED.user_created_at,
            nationality_multilang = EXCLUDED.nationality_multilang
    """)
    for _, r in df.iterrows():
        conn.execute(sql, {
            "cv_partner_user_id": str(r["CV Partner User ID"]),
            "name_multilang": json.dumps(r["Name (multilang)"]),  # dict -> JSON
            "email": r.get("Email"),
            "upn": r.get("UPN"),
            "external_user_id": r.get("External User ID"),
            "phone_number": r.get("Phone Number"),
            "landline": r.get("Landline"),
            "birth_year": int(r["Birth Year"]) if pd.notna(r.get("Birth Year")) else None,
            "department": r.get("Department"),
            "country": r.get("Country"),
            "user_created_at": r.get("User created at"),
            "nationality_multilang": json.dumps(r.get("nationality_multilang", {})),
        })

def upsert_cvs(conn, df):
    print(f"Upserting {len(df)} CVs...")
    sql = text("""
        INSERT INTO cvs
          (cv_partner_cv_id, user_id, title_multilang, years_of_education,
           years_since_first_work_experience, has_profile_image,
           owns_reference_project, read_privacy_notice,
           cv_last_updated_by_owner, cv_last_updated,
           sfia_level, cpd_level, cpd_band, cpd_label)
        VALUES
          (:cv_partner_cv_id, :user_id, CAST(:title_multilang AS JSONB), :yoe, :ysfwe,
           :has_img, :owns_ref, :read_priv, :lu_owner, :lu,
           :sfia_level, :cpd_level, :cpd_band, :cpd_label)
        ON CONFLICT (cv_partner_cv_id) DO UPDATE
        SET title_multilang = EXCLUDED.title_multilang,
            years_of_education = EXCLUDED.years_of_education,
            years_since_first_work_experience = EXCLUDED.years_since_first_work_experience,
            has_profile_image = EXCLUDED.has_profile_image,
            owns_reference_project = EXCLUDED.owns_reference_project,
            read_privacy_notice = EXCLUDED.read_privacy_notice,
            cv_last_updated_by_owner = EXCLUDED.cv_last_updated_by_owner,
            cv_last_updated = EXCLUDED.cv_last_updated,
            sfia_level = EXCLUDED.sfia_level,
            cpd_level  = EXCLUDED.cpd_level,
            cpd_band   = EXCLUDED.cpd_band,
            cpd_label  = EXCLUDED.cpd_label
    """)
    for _, r in df.iterrows():
        uid = conn.execute(
            text("SELECT user_id FROM users WHERE cv_partner_user_id=:uid"),
            {"uid": str(r["CV Partner User ID"])}
        ).scalar()
        if uid is None:
            print(f"  ⚠️ Skipping CV {r['CV Partner CV ID']} (unknown user {r['CV Partner User ID']})")
            continue

        conn.execute(sql, {
            "cv_partner_cv_id": str(r["CV Partner CV ID"]),
            "user_id": uid,
            "title_multilang": json.dumps(r["title_multilang"]),
            "yoe": int(r["Years of education"]) if pd.notna(r["Years of education"]) else None,
            "ysfwe": int(r["Years since first work experience"]) if pd.notna(r["Years since first work experience"]) else None,
            "has_img": _to_bool(r["Has profile image"]),
            "owns_ref": _to_bool(r["Owns a reference project"]),
            "read_priv": _to_bool(r["Read and understood privacy notice"]),
            "lu_owner": r["CV Last updated by owner"],
            "lu": r["CV Last updated"],
            "sfia_level": r.get("sfia_level"),
            "cpd_level":  r.get("cpd_level"),
            "cpd_band":   (None if pd.isna(r.get("cpd_band"))  else str(r.get("cpd_band"))),
            "cpd_label":  (None if pd.isna(r.get("cpd_label")) else str(r.get("cpd_label"))),
        })
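
The `ON CONFLICT ... DO UPDATE` clause is what makes these inserts safe to re-run. It can be tried without a PostgreSQL server, because SQLite (3.24 or later) accepts the same clause. The sketch below uses an in-memory SQLite database and a cut-down, illustrative `users` table, not the notebook's real schema.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE users (
        user_id INTEGER PRIMARY KEY,
        cv_partner_user_id TEXT UNIQUE NOT NULL,
        email TEXT
    )
""")

upsert_sql = """
    INSERT INTO users (cv_partner_user_id, email)
    VALUES (:uid, :email)
    ON CONFLICT (cv_partner_user_id) DO UPDATE
    SET email = excluded.email
"""

# Running the statement twice with the same key updates the row instead of duplicating it.
conn.execute(upsert_sql, {"uid": "5b69cd14", "email": "old@mail.test"})
conn.execute(upsert_sql, {"uid": "5b69cd14", "email": "new@mail.test"})

rows = conn.execute("SELECT cv_partner_user_id, email FROM users").fetchall()
print(rows)  # [('5b69cd14', 'new@mail.test')]
```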

## Step 3.3 — Skills and languages

Upserts for technology skills, languages, and related dimension tables.


In [12]:
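def upsert_technologies(conn, df):
    # Guard against a missing or empty table, as the other upsert functions do
    if df is None or df.empty:
        return
    print(f"Upserting {len(df)} technologies...")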
    for _, r in df.iterrows():
        tech_name = r["Skill name"]
        conn.execute(text("""
            INSERT INTO dim_technology (name)
            VALUES (:name)
            ON CONFLICT (name) DO NOTHING
        """), {"name": tech_name})

        tech_id = conn.execute(
            text("SELECT technology_id FROM dim_technology WHERE name=:n"),
            {"n": tech_name}
        ).scalar()
        if tech_id is None:
            print(f"  ⚠️ Skipping tech link; cannot resolve technology '{tech_name}'")
            continue

        cv_id = conn.execute(
            text("SELECT cv_id FROM cvs WHERE cv_partner_cv_id=:cid"),
            {"cid": str(r["CV Partner CV ID"])}
        ).scalar()
        if cv_id is None:
            print(f"  ⚠️ Skipping tech link; unknown CV {r['CV Partner CV ID']}")
            continue

        conn.execute(text("""
            INSERT INTO cv_technology (cv_id, technology_id, years_experience, proficiency, is_official_masterdata)
            VALUES (:cv, :tech, :yexp, :prof, CAST(:is_md AS JSONB))
            ON CONFLICT (cv_id, technology_id) DO UPDATE
            SET years_experience = EXCLUDED.years_experience,
                proficiency = EXCLUDED.proficiency,
                is_official_masterdata = EXCLUDED.is_official_masterdata
        """), {
            "cv": cv_id,
            "tech": tech_id,
            "yexp": int(r["Year experience"]) if pd.notna(r["Year experience"]) else None,
            "prof": int(r["Proficiency (0-5)"]) if pd.notna(r["Proficiency (0-5)"]) else None,
            # technologies.csv is passed through transform() unparsed, so this is the raw CSV value
            "is_md": json.dumps(r["Is official masterdata (in #{lang})"]),
        })

def upsert_languages(conn, df):
    if df is None or df.empty:
        return
    print(f"Upserting {len(df)} languages...")
    sql = text("""
        INSERT INTO cv_language
          (cv_id, language_id, level, highlighted, is_official_masterdata, updated, updated_by_owner)
        VALUES
          (:cv_id, :lang_id, :level, :highlighted, CAST(:is_md AS JSONB), :updated, :updated_by_owner)
        ON CONFLICT (cv_id, language_id) DO UPDATE
        SET level = EXCLUDED.level,
            highlighted = EXCLUDED.highlighted,
            is_official_masterdata = EXCLUDED.is_official_masterdata,
            updated = EXCLUDED.updated,
            updated_by_owner = EXCLUDED.updated_by_owner
    """)
    for _, r in df.iterrows():
        cv_id = _cv_id(conn, r["CV Partner CV ID"])
        if not cv_id:
            continue
        lang_id = _ensure_dim(conn, "dim_language", r.get("Language"))
        conn.execute(sql, {
            "cv_id": cv_id,
            "lang_id": lang_id,
            "level": r.get("Level"),
            "highlighted": _to_bool(r.get("Highlighted")),
            "is_md": json.dumps(r.get("Is official masterdata (in #{lang})", {})),
            "updated": r.get("Updated"),
            "updated_by_owner": r.get("Updated by owner"),
        })
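
Both functions rely on the same get-or-create step for dimension rows: insert the name with `ON CONFLICT DO NOTHING`, then select its ID. A minimal sketch of that step against an in-memory SQLite database follows. The helper name `get_or_create` and the two-column table are assumptions for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE dim_technology (
        technology_id INTEGER PRIMARY KEY,
        name TEXT UNIQUE NOT NULL
    )
""")


def get_or_create(conn, name):
    """Insert the name if it is new, then return its technology_id."""
    conn.execute(
        "INSERT INTO dim_technology (name) VALUES (:n) ON CONFLICT (name) DO NOTHING",
        {"n": name},
    )
    row = conn.execute(
        "SELECT technology_id FROM dim_technology WHERE name = :n", {"n": name}
    ).fetchone()
    return row[0]


ids = [get_or_create(conn, n) for n in ("Python", "SQL", "Python")]
total = conn.execute("SELECT COUNT(*) FROM dim_technology").fetchone()[0]
print(ids, total)  # the first and third IDs match; only two rows exist
```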

## Step 3.4 — Project experience, work experience, certifications, courses, education, and positions


In [13]:

def upsert_project_experiences(conn, df):
    if df is None or df.empty:
        return
    print(f"Upserting {len(df)} project experiences...")
    sql = text("""
      INSERT INTO project_experience
        (cv_id, cv_partner_section_id, external_unique_id,
         month_from, year_from, month_to, year_to,
         customer_int, customer_multilang,
         customer_anon_int, customer_anon_multilang,
         description_int, description_multilang,
         long_description_int, long_description_multilang,
         industry_id, project_type_id,
         percent_allocated, extent_individual_hours, extent_hours, extent_total_hours,
         extent_unit, extent_currency, extent_total, extent_total_currency,
         project_area, project_area_unit,
         highlighted, updated, updated_by_owner)
      VALUES
        (:cv_id, :sid, :ext_id,
         :m_from, :y_from, :m_to, :y_to,
         :cust_int, CAST(:cust_ml AS JSONB),
         :cust_anon_int, CAST(:cust_anon_ml AS JSONB),
         :desc_int, CAST(:desc_ml AS JSONB),
         :ldesc_int, CAST(:ldesc_ml AS JSONB),
         :industry_id, :project_type_id,
         :pct_alloc, :indiv_hours, :hours, :total_hours,
         :extent_unit, :extent_curr, :extent_total, :extent_total_curr,
         :proj_area, :proj_area_unit,
         :highlighted, :updated, :updated_by_owner)
      ON CONFLICT (cv_id, cv_partner_section_id) DO UPDATE
      SET external_unique_id = EXCLUDED.external_unique_id,
          month_from = EXCLUDED.month_from, year_from = EXCLUDED.year_from,
          month_to = EXCLUDED.month_to, year_to = EXCLUDED.year_to,
          customer_int = EXCLUDED.customer_int, customer_multilang = EXCLUDED.customer_multilang,
          customer_anon_int = EXCLUDED.customer_anon_int, customer_anon_multilang = EXCLUDED.customer_anon_multilang,
          description_int = EXCLUDED.description_int, description_multilang = EXCLUDED.description_multilang,
          long_description_int = EXCLUDED.long_description_int, long_description_multilang = EXCLUDED.long_description_multilang,
          industry_id = EXCLUDED.industry_id, project_type_id = EXCLUDED.project_type_id,
          percent_allocated = EXCLUDED.percent_allocated,
          extent_individual_hours = EXCLUDED.extent_individual_hours,
          extent_hours = EXCLUDED.extent_hours,
          extent_total_hours = EXCLUDED.extent_total_hours,
          extent_unit = EXCLUDED.extent_unit,
          extent_currency = EXCLUDED.extent_currency,
          extent_total = EXCLUDED.extent_total,
          extent_total_currency = EXCLUDED.extent_total_currency,
          project_area = EXCLUDED.project_area, project_area_unit = EXCLUDED.project_area_unit,
          highlighted = EXCLUDED.highlighted,
          updated = EXCLUDED.updated, updated_by_owner = EXCLUDED.updated_by_owner
    """)
    for _, r in df.iterrows():
        cv_id = _cv_id(conn, r["CV Partner CV ID"])
        if not cv_id:
            continue
        industry_id = _ensure_dim(conn, "dim_industry", r.get("Industry (int)"))
        projtype_id = _ensure_dim(conn, "dim_project_type", r.get("Project type (int)"))
        conn.execute(sql, {
            "cv_id": cv_id,
            "sid": r.get("CV Partner section ID"),
            "ext_id": r.get("External unique ID"),
            "m_from": r.get("Month from"),
            "y_from": r.get("Year from"),
            "m_to": r.get("Month to"),
            "y_to": r.get("Year to"),
            "cust_int": r.get("Customer (int)"),
            "cust_ml": json.dumps(r.get("Customer (#{lang})", {})),
            "cust_anon_int": r.get("Customer Anonymized (int)"),
            "cust_anon_ml": json.dumps(r.get("Customer Anonymized (#{lang})", {})),
            "desc_int": r.get("Description (int)"),
            "desc_ml": json.dumps(r.get("Description (#{lang})", {})),
            "ldesc_int": r.get("Long description (int)"),
            "ldesc_ml": json.dumps(r.get("Long description (#{lang})", {})),
            "industry_id": industry_id,
            "project_type_id": projtype_id,
            "pct_alloc": r.get("Percent allocated"),
            "indiv_hours": r.get("Project extent (individual hours)"),
            "hours": r.get("Project extent (hours)"),
            "total_hours": r.get("Project extent total (hours)"),
            "extent_unit": r.get("Project extent"),
            "extent_curr": r.get("Project extent (currency)"),
            "extent_total": r.get("Project extent total"),
            "extent_total_curr": r.get("Project extent total (currency)"),
            "proj_area": r.get("Project area"),
            "proj_area_unit": r.get("Project area (unit)"),
            "highlighted": _to_bool(r.get("Highlighted")),
            "updated": r.get("Updated"),
            "updated_by_owner": r.get("Updated by owner"),
        })

def upsert_work_experiences(conn, df):
    if df is None or df.empty:
        return
    print(f"Upserting {len(df)} work experiences...")
    sql = text("""
      INSERT INTO work_experience
        (cv_id, cv_partner_section_id, external_unique_id,
         month_from, year_from, month_to, year_to,
         highlighted, employer, description, long_description,
         updated, updated_by_owner)
      VALUES
        (:cv_id, :sid, :ext_id,
         :m_from, :y_from, :m_to, :y_to,
         :highlighted, :employer, :desc, :ldesc,
         :updated, :updated_by_owner)
      ON CONFLICT (cv_id, cv_partner_section_id) DO UPDATE
      SET external_unique_id = EXCLUDED.external_unique_id,
          month_from = EXCLUDED.month_from, year_from = EXCLUDED.year_from,
          month_to = EXCLUDED.month_to, year_to = EXCLUDED.year_to,
          highlighted = EXCLUDED.highlighted,
          employer = EXCLUDED.employer,
          description = EXCLUDED.description,
          long_description = EXCLUDED.long_description,
          updated = EXCLUDED.updated, updated_by_owner = EXCLUDED.updated_by_owner
    """)
    for _, r in df.iterrows():
        cv_id = _cv_id(conn, r["CV Partner CV ID"])
        if not cv_id:
            continue
        conn.execute(sql, {
            "cv_id": cv_id,
            "sid": r.get("CV Partner section ID"),
            "ext_id": r.get("External unique ID"),
            "m_from": r.get("Month from"),
            "y_from": r.get("Year from"),
            "m_to": r.get("Month to"),
            "y_to": r.get("Year to"),
            "highlighted": _to_bool(r.get("Highlighted")),
            "employer": r.get("Employer"),
            "desc": r.get("Description"),
            "ldesc": r.get("Long Description"),
            "updated": r.get("Updated"),
            "updated_by_owner": r.get("Updated by owner"),
        })

def upsert_certifications(conn, df):
    if df is None or df.empty:
        return
    print(f"Upserting {len(df)} certifications...")
    sql = text("""
      INSERT INTO certification
        (cv_id, cv_partner_section_id, external_unique_id,
         month, year, month_expire, year_expire,
         updated, updated_by_owner)
      VALUES
        (:cv_id, :sid, :ext_id, :m, :y, :mexp, :yexp, :updated, :updated_by_owner)
      ON CONFLICT (cv_id, cv_partner_section_id) DO UPDATE
      SET external_unique_id = EXCLUDED.external_unique_id,
          month = EXCLUDED.month, year = EXCLUDED.year,
          month_expire = EXCLUDED.month_expire, year_expire = EXCLUDED.year_expire,
          updated = EXCLUDED.updated, updated_by_owner = EXCLUDED.updated_by_owner
    """)
    for _, r in df.iterrows():
        cv_id = _cv_id(conn, r["CV Partner CV ID"])
        if not cv_id:
            continue
        conn.execute(sql, {
            "cv_id": cv_id,
            "sid": r.get("CV Partner section ID"),
            "ext_id": r.get("External unique ID"),
            "m": r.get("Month"),
            "y": r.get("Year"),
            "mexp": r.get("Month expire"),
            "yexp": r.get("Year expire"),
            "updated": r.get("Updated"),
            "updated_by_owner": r.get("Updated by owner"),
        })

def upsert_courses(conn, df):
    if df is None or df.empty:
        return
    print(f"Upserting {len(df)} courses...")
    sql = text("""
      INSERT INTO course
        (cv_id, cv_partner_section_id, external_unique_id,
         month, year, name, organiser, long_description, highlighted,
         is_official_masterdata, attachments, updated, updated_by_owner)
      VALUES
        (:cv_id, :sid, :ext_id, :m, :y, :name, :org, :ldesc, :hl,
         CAST(:is_md AS JSONB), :att, :updated, :updated_by_owner)
      ON CONFLICT (cv_id, cv_partner_section_id) DO UPDATE
      SET external_unique_id = EXCLUDED.external_unique_id,
          month = EXCLUDED.month, year = EXCLUDED.year,
          name = EXCLUDED.name, organiser = EXCLUDED.organiser,
          long_description = EXCLUDED.long_description,
          highlighted = EXCLUDED.highlighted,
          is_official_masterdata = EXCLUDED.is_official_masterdata,
          attachments = EXCLUDED.attachments,
          updated = EXCLUDED.updated, updated_by_owner = EXCLUDED.updated_by_owner
    """)
    for _, r in df.iterrows():
        cv_id = _cv_id(conn, r["CV Partner CV ID"])
        if not cv_id:
            continue
        conn.execute(sql, {
            "cv_id": cv_id,
            "sid": r.get("CV Partner section ID"),
            "ext_id": r.get("External unique ID"),
            "m": r.get("Month"),
            "y": r.get("Year"),
            "name": r.get("Name"),
            "org": r.get("Organiser"),
            "ldesc": r.get("Long description"),
            "hl": _to_bool(r.get("Highlighted")),
            "is_md": json.dumps(r.get("Is official masterdata (in #{lang})", {})),
            "att": r.get("Attachments"),
            "updated": r.get("Updated"),
            "updated_by_owner": r.get("Updated by owner"),
        })

def upsert_educations(conn, df):
    if df is None or df.empty:
        return
    print(f"Upserting {len(df)} educations...")
    sql = text("""
      INSERT INTO education
        (cv_id, cv_partner_section_id, external_unique_id,
         month_from, year_from, month_to, year_to,
         highlighted, attachments, place_of_study, degree, description,
         updated, updated_by_owner)
      VALUES
        (:cv_id, :sid, :ext_id,
         :m_from, :y_from, :m_to, :y_to,
         :hl, :att, :place, :deg, :desc,
         :updated, :updated_by_owner)
      ON CONFLICT (cv_id, cv_partner_section_id) DO UPDATE
      SET external_unique_id = EXCLUDED.external_unique_id,
          month_from = EXCLUDED.month_from, year_from = EXCLUDED.year_from,
          month_to = EXCLUDED.month_to, year_to = EXCLUDED.year_to,
          highlighted = EXCLUDED.highlighted,
          attachments = EXCLUDED.attachments,
          place_of_study = EXCLUDED.place_of_study,
          degree = EXCLUDED.degree,
          description = EXCLUDED.description,
          updated = EXCLUDED.updated, updated_by_owner = EXCLUDED.updated_by_owner
    """)
    for _, r in df.iterrows():
        cv_id = _cv_id(conn, r["CV Partner CV ID"])
        if not cv_id:
            continue
        conn.execute(sql, {
            "cv_id": cv_id,
            "sid": r.get("CV Partner section ID"),
            "ext_id": r.get("External unique ID"),
            "m_from": r.get("Month from"),
            "y_from": r.get("Year from"),
            "m_to": r.get("Month to"),
            "y_to": r.get("Year to"),
            "hl": _to_bool(r.get("Highlighted")),
            "att": r.get("Attachments"),
            "place": r.get("Place of study"),
            "deg": r.get("Degree"),
            "desc": r.get("Description"),
            "updated": r.get("Updated"),
            "updated_by_owner": r.get("Updated by owner"),
        })

def upsert_positions(conn, df):
    if df is None or df.empty:
        return
    print(f"Upserting {len(df)} positions...")
    sql = text("""
      INSERT INTO position
        (cv_id, cv_partner_section_id, external_unique_id,
         year_from, year_to, highlighted, name, description,
         updated, updated_by_owner)
      VALUES
        (:cv_id, :sid, :ext_id, :y_from, :y_to, :hl, :name, :desc, :updated, :updated_by_owner)
      ON CONFLICT (cv_id, cv_partner_section_id) DO UPDATE
      SET external_unique_id = EXCLUDED.external_unique_id,
          year_from = EXCLUDED.year_from, year_to = EXCLUDED.year_to,
          highlighted = EXCLUDED.highlighted,
          name = EXCLUDED.name, description = EXCLUDED.description,
          updated = EXCLUDED.updated, updated_by_owner = EXCLUDED.updated_by_owner
    """)
    for _, r in df.iterrows():
        cv_id = _cv_id(conn, r["CV Partner CV ID"])
        if not cv_id:
            continue
        conn.execute(sql, {
            "cv_id": cv_id,
            "sid": r.get("CV Partner section ID"),
            "ext_id": r.get("External unique ID"),
            "y_from": r.get("Year from"),
            "y_to": r.get("Year to"),
            "hl": _to_bool(r.get("Highlighted")),
            "name": r.get("Name"),
            "desc": r.get("Description"),
            "updated": r.get("Updated"),
            "updated_by_owner": r.get("Updated by owner"),
        })

def upsert_blogs(conn, df):
    if df is None or df.empty:
        return
    print(f"Upserting {len(df)} blogs/publications...")
    sql = text("""
      INSERT INTO blog_publication
        (cv_id, cv_partner_section_id, external_unique_id,
         name, description, highlighted, updated, updated_by_owner)
      VALUES
        (:cv_id, :sid, :ext_id, :name, :desc, :hl, :updated, :updated_by_owner)
      ON CONFLICT (cv_id, cv_partner_section_id) DO UPDATE
      SET external_unique_id = EXCLUDED.external_unique_id,
          name = EXCLUDED.name, description = EXCLUDED.description,
          highlighted = EXCLUDED.highlighted,
          updated = EXCLUDED.updated, updated_by_owner = EXCLUDED.updated_by_owner
    """)
    for _, r in df.iterrows():
        cv_id = _cv_id(conn, r["CV Partner CV ID"])
        if not cv_id:
            continue
        conn.execute(sql, {
            "cv_id": cv_id,
            "sid": r.get("CV Partner section ID"),
            "ext_id": r.get("External unique ID"),
            "name": r.get("Name"),
            "desc": r.get("Description"),
            "hl": _to_bool(r.get("Highlighted")),
            "updated": r.get("Updated"),
            "updated_by_owner": r.get("Updated by owner"),
        })

def upsert_cv_roles(conn, df):
    if df is None or df.empty:
        return
    print(f"Upserting {len(df)} cv roles...")
    sql = text("""
      INSERT INTO cv_role
        (cv_id, name, description, highlighted, updated, updated_by_owner)
      VALUES
        (:cv_id, :name, :desc, :hl, :updated, :updated_by_owner)
      ON CONFLICT (cv_id, name) DO UPDATE
      SET description = EXCLUDED.description,
          highlighted = EXCLUDED.highlighted,
          updated = EXCLUDED.updated, updated_by_owner = EXCLUDED.updated_by_owner
    """)
    for _, r in df.iterrows():
        cv_id = _cv_id(conn, r["CV Partner CV ID"])
        if not cv_id:
            continue
        conn.execute(sql, {
            "cv_id": cv_id,
            "name": r.get("Name"),
            "desc": r.get("Description"),
            "hl": _to_bool(r.get("Highlighted")),
            "updated": r.get("Updated"),
            "updated_by_owner": r.get("Updated by owner"),
        })

def upsert_key_qualifications(conn, df):
    if df is None or df.empty:
        return
    print(f"Upserting {len(df)} key qualifications...")
    sql = text("""
      INSERT INTO key_qualification
        (cv_id, cv_partner_section_id, external_unique_id,
         label, summary, short_description, updated, updated_by_owner)
      VALUES
        (:cv_id, :sid, :ext_id, :label, :summary, :short_desc, :updated, :updated_by_owner)
      ON CONFLICT (cv_id, cv_partner_section_id) DO UPDATE
      SET external_unique_id = EXCLUDED.external_unique_id,
          label = EXCLUDED.label, summary = EXCLUDED.summary,
          short_description = EXCLUDED.short_description,
          updated = EXCLUDED.updated, updated_by_owner = EXCLUDED.updated_by_owner
    """)
    for _, r in df.iterrows():
        cv_id = _cv_id(conn, r["CV Partner CV ID"])
        if not cv_id:
            continue
        conn.execute(sql, {
            "cv_id": cv_id,
            "sid": r.get("CV Partner section ID"),
            "ext_id": r.get("External unique ID"),
            "label": r.get("Label"),
            "summary": r.get("Summary of Qualifications"),
            "short_desc": r.get("Short description"),
            "updated": r.get("Updated"),
            "updated_by_owner": r.get("Updated by owner"),
        })
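
The upsert functions above all share one shape: an `INSERT` with named parameters, an `ON CONFLICT` target, and a list of `col = EXCLUDED.col` assignments. As a rough sketch only, the statement text could be generated from column lists. The helper name `build_upsert_sql` is hypothetical and not part of this notebook. It assumes parameter names equal column names (the functions above use shorter names such as `:sid`) and that table and column names come from trusted code.

```python
def build_upsert_sql(table, columns, conflict_columns):
    """Build an INSERT ... ON CONFLICT ... statement as a string.

    Hypothetical helper: identifiers are interpolated directly, so they
    must come from trusted code, never from user input.
    """
    # Columns in the conflict target identify the row, so they are not updated.
    update_columns = [c for c in columns if c not in conflict_columns]
    column_list = ", ".join(columns)
    value_list = ", ".join(f":{c}" for c in columns)
    conflict_list = ", ".join(conflict_columns)
    if update_columns:
        assignments = ", ".join(f"{c} = EXCLUDED.{c}" for c in update_columns)
        action = f"DO UPDATE SET {assignments}"
    else:
        action = "DO NOTHING"
    return (
        f"INSERT INTO {table} ({column_list}) "
        f"VALUES ({value_list}) "
        f"ON CONFLICT ({conflict_list}) {action}"
    )


cv_role_sql = build_upsert_sql(
    "cv_role",
    ["cv_id", "name", "description", "highlighted", "updated", "updated_by_owner"],
    ["cv_id", "name"],
)
print(cv_role_sql)
```

The resulting string could be wrapped in `text(...)` and executed with a dict keyed by column name. Whether that is clearer than the explicit statements above is a judgement call.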

## Step 3.5 — Security clearance and availability


In [14]:
def upsert_sc_clearance(conn, df: pd.DataFrame):
    if df is None or df.empty:
        return
    for _, r in df.iterrows():
        uid = _resolve_user_id(conn, r.get("Email"), r.get("UPN"), r.get("External User ID"))
        if not uid:
            continue

        clr = _clean_str(r.get("Clearance"), "None") or "None"
        conn.execute(text("INSERT INTO dim_clearance(name) VALUES (:n) ON CONFLICT(name) DO NOTHING"),
                     {"n": clr})
        clr_id = conn.execute(text("SELECT clearance_id FROM dim_clearance WHERE name=:n"),
                              {"n": clr}).scalar()

        # Default valid_from if missing so we never violate NOT NULL
        vf = _to_date(r.get("Valid From"), default=date(1900, 1, 1))
        vt = _to_date(r.get("Valid To"))
        vb = _clean_str(r.get("Verified By"), None) or None
        no = _clean_str(r.get("Notes"), None) or None

        # If both present and vt < vf (bad data), drop vt
        if vt and vf and vt < vf:
            vt = None

        conn.execute(text("""
            INSERT INTO user_clearance(user_id, clearance_id, valid_from, valid_to, verified_by, notes)
            VALUES (:u, :c, :vf, :vt, :vb, :no)
            ON CONFLICT (user_id, clearance_id, valid_from) DO UPDATE
            SET valid_to    = EXCLUDED.valid_to,
                verified_by = EXCLUDED.verified_by,
                notes       = EXCLUDED.notes
        """), {"u": uid, "c": clr_id, "vf": vf, "vt": vt, "vb": vb, "no": no})



def upsert_availability(conn, df: pd.DataFrame):
    if df is None or df.empty:
        return
    sql = text("""
        INSERT INTO user_availability(user_id, date, percent_available, source)
        VALUES (:u, :d, :p, :s)
        ON CONFLICT (user_id, date) DO UPDATE
        SET percent_available = EXCLUDED.percent_available,
            source            = EXCLUDED.source,
            updated_at        = NOW()
    """)
    for _, r in df.iterrows():
        uid = _resolve_user_id(conn, r.get("Email"), r.get("UPN"), r.get("External User ID"))
        if not uid:
            continue
        # percent can come as float/NaN — clamp to [0,100]
        raw = r.get("Percent Available")
        p = 0 if (raw is None or (isinstance(raw, float) and pd.isna(raw))) else int(float(raw))
        p = max(0, min(100, p))
        conn.execute(sql, {
            "u": uid,
            "d": _clean_str(r.get("Date"), None) or None,
            "p": p,
            "s": _clean_str(r.get("Source"), "Fake generator"),
        })
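
The percent handling in `upsert_availability` covers `None` and `NaN`, but a non-numeric string would raise. A slightly more defensive version might look like the sketch below. The helper name `clamp_percent` and its `default` argument are assumptions for illustration, not part of the notebook.

```python
import math


def clamp_percent(raw, default=0):
    """Convert a raw cell value to an int in [0, 100], or return `default`."""
    if raw is None:
        return default
    try:
        value = float(raw)
    except (TypeError, ValueError):
        # Not a number at all (e.g. "n/a").
        return default
    if math.isnan(value):
        return default
    # Clamp before truncating so infinities cannot overflow int().
    return int(max(0.0, min(100.0, value)))


print(clamp_percent("73.9"), clamp_percent(150), clamp_percent("n/a"))
```

Like the original expression, this truncates rather than rounds, so `73.9` becomes `73`.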

## Step 3.6 — Load orchestrator

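The `load()` function runs all upserts in dependency order (users first, then CVs, then the sections that look up a `cv_id` or `user_id`) inside a single database transaction, so an error in any step rolls back the whole load.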
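
To illustrate what a single transaction buys, here is a small stand-alone sketch. It uses Python's built-in `sqlite3` module and a made-up `users` table purely so that it runs without a PostgreSQL server. The notebook itself relies on SQLAlchemy's `engine.begin()`, which likewise commits on success and rolls back if an exception escapes the block.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (user_id INTEGER PRIMARY KEY, name TEXT NOT NULL)")
conn.commit()


def load_batch(conn, rows):
    """Insert all rows or none.

    The connection context manager commits when the block succeeds and
    rolls back when any statement raises.
    """
    with conn:
        for user_id, name in rows:
            conn.execute(
                "INSERT INTO users (user_id, name) VALUES (?, ?)",
                (user_id, name),
            )


load_batch(conn, [(1, "Ada"), (2, "Grace")])

try:
    # The second row violates NOT NULL, so user 3 is rolled back as well.
    load_batch(conn, [(3, "Linus"), (4, None)])
except sqlite3.IntegrityError:
    pass

row_count = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
print(row_count)
```

Only the first batch survives, which is the same all-or-nothing behaviour the load step aims for.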


In [15]:
def load(clean_data, engine):
    """
    Loads each cleaned DataFrame into the database using upsert logic.
    """
    with engine.begin() as conn:
        if getattr(clean_data, 'users_df', None) is not None:
            upsert_users(conn, clean_data.users_df)
        if getattr(clean_data, 'cvs_df', None) is not None:
            upsert_cvs(conn, clean_data.cvs_df)
        if getattr(clean_data, 'technologies_df', None) is not None:
            upsert_technologies(conn, clean_data.technologies_df)
        if getattr(clean_data, 'languages_df', None) is not None:
            upsert_languages(conn, clean_data.languages_df)
        if getattr(clean_data, 'project_experiences_df', None) is not None:
            upsert_project_experiences(conn, clean_data.project_experiences_df)
        if getattr(clean_data, 'work_experiences_df', None) is not None:
            upsert_work_experiences(conn, clean_data.work_experiences_df)
        if getattr(clean_data, 'certifications_df', None) is not None:
            upsert_certifications(conn, clean_data.certifications_df)
        if getattr(clean_data, 'courses_df', None) is not None:
            upsert_courses(conn, clean_data.courses_df)
        if getattr(clean_data, 'educations_df', None) is not None:
            upsert_educations(conn, clean_data.educations_df)
        if getattr(clean_data, 'positions_df', None) is not None:
            upsert_positions(conn, clean_data.positions_df)
        if getattr(clean_data, 'blogs_df', None) is not None:
            upsert_blogs(conn, clean_data.blogs_df)
        if getattr(clean_data, 'cv_roles_df', None) is not None:
            upsert_cv_roles(conn, clean_data.cv_roles_df)
        if getattr(clean_data, 'key_qualifications_df', None) is not None:
            upsert_key_qualifications(conn, clean_data.key_qualifications_df)
        if getattr(clean_data, 'sc_clearance_df', None) is not None:
            upsert_sc_clearance(conn, clean_data.sc_clearance_df)
        if getattr(clean_data, 'availability_df', None) is not None:
            upsert_availability(conn, clean_data.availability_df)
    print("✅ Load complete.")
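
The repeated `if getattr(...)` blocks could also be expressed as an ordered list of steps. The following is a minimal sketch under that assumption. The name `run_steps` and the stand-in upsert functions are hypothetical and only show the control flow.

```python
from types import SimpleNamespace


def run_steps(conn, clean_data, steps):
    """Call each upsert function whose DataFrame attribute is present.

    `steps` is an ordered list of (attribute_name, upsert_function) pairs.
    Order matters because later tables reference earlier ones.
    Returns the attribute names that were actually processed.
    """
    processed = []
    for attr, upsert in steps:
        frame = getattr(clean_data, attr, None)
        if frame is None:
            continue
        upsert(conn, frame)
        processed.append(attr)
    return processed


# Stand-in upsert functions and data, just to exercise the loop.
calls = []
steps = [
    ("users_df", lambda conn, df: calls.append(("users", len(df)))),
    ("cvs_df", lambda conn, df: calls.append(("cvs", len(df)))),
    ("courses_df", lambda conn, df: calls.append(("courses", len(df)))),
]
clean_data = SimpleNamespace(users_df=[1, 2, 3], cvs_df=[1, 2])  # no courses_df

processed = run_steps(conn=None, clean_data=clean_data, steps=steps)
print(processed)
```

In the notebook, the list would pair names such as `"users_df"` with `upsert_users`, and the call would sit inside `with engine.begin() as conn:`.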



# Step 4 — Database Setup

These steps configure PostgreSQL connection settings, create the database if 
missing, and apply schema files from `sql/*.sql`.


In [16]:
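def read_db_config_txt(path="db_config.txt"):
    """Read key=value pairs from an optional config file; return {} if it is absent."""
    db = {}
    try:
        with open(path, "r", encoding="utf-8") as f:
            for line in f:
                if "=" in line:
                    k, v = line.strip().split("=", 1)
                    db[k.strip()] = v.strip()
    except FileNotFoundError:
        # the file is optional; other errors (permissions, encoding) should surface
        pass
    return db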

def compose_settings():
    # precedence: ENV > db_config.txt > defaults
    defaults = dict(host="localhost", port=5432, database="flowcase_demo",
                    user="postgres", password="postgres")
    file_cfg = read_db_config_txt()

    env_cfg = dict(
        host=os.getenv("PGHOST"),
        port=os.getenv("PGPORT"),
        database=os.getenv("PGDATABASE"),
        user=os.getenv("PGUSER"),
        password=os.getenv("PGPASSWORD"),
    )
    # drop Nones
    env_cfg = {k:v for k,v in env_cfg.items() if v is not None}
    # coerce port to int; ignore a non-numeric PGPORT
    if "port" in env_cfg:
        try:
            env_cfg["port"] = int(env_cfg["port"])
        except ValueError:
            env_cfg.pop("port", None)

    db = {**defaults, **file_cfg, **env_cfg}

    settings = {
        "data_source": "fake",
        "base_folder": "cv_reports",
        "db": db,
        # let utils apply ALL sql/*.sql automatically
        "schema": {"apply_all_sql_in_sql_folder": True, "folder": "sql"}
    }
    return settings

def ensure_database_exists(db):
    from psycopg2 import sql  # local import: used to quote the database name safely

    try:
        conn = psycopg2.connect(dbname="postgres",
                                user=db["user"], password=db["password"],
                                host=db["host"], port=db["port"])
        conn.autocommit = True  # CREATE DATABASE cannot run inside a transaction
        try:
            with conn.cursor() as cur:
                cur.execute("SELECT 1 FROM pg_database WHERE datname = %s", (db["database"],))
                if not cur.fetchone():
                    print(f"Database '{db['database']}' does not exist. Creating...")
                    # quote the name as an identifier instead of interpolating it raw
                    cur.execute(sql.SQL("CREATE DATABASE {}").format(sql.Identifier(db["database"])))
        finally:
            conn.close()
    except Exception as e:
        print(f"Warning: Could not check/create database: {e}")
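
The precedence rule in `compose_settings()` (ENV > `db_config.txt` > defaults) comes down to the order of dictionary unpacking. The sketch below isolates that idea. The function name `merge_db_settings` and the host `db.internal` are made up for illustration. Unlike the notebook code, it coerces the port from any source and falls back directly to the default port when the override is not numeric.

```python
def merge_db_settings(defaults, file_cfg, env):
    """Merge settings with precedence env > file > defaults.

    `env` is any mapping shaped like os.environ; only PG* variables are read.
    """
    env_keys = {
        "host": "PGHOST",
        "port": "PGPORT",
        "database": "PGDATABASE",
        "user": "PGUSER",
        "password": "PGPASSWORD",
    }
    env_cfg = {k: env[v] for k, v in env_keys.items() if env.get(v) is not None}
    # Later dictionaries win, so env overrides the file, which overrides defaults.
    merged = {**defaults, **file_cfg, **env_cfg}
    try:
        merged["port"] = int(merged["port"])
    except (TypeError, ValueError):
        merged["port"] = defaults["port"]
    return merged


defaults = {"host": "localhost", "port": 5432, "database": "flowcase_demo",
            "user": "postgres", "password": "postgres"}
merged = merge_db_settings(
    defaults,
    file_cfg={"host": "db.internal", "port": "5433"},
    env={"PGHOST": "127.0.0.1"},
)
print(merged["host"], merged["port"], merged["database"])
```

Here the host comes from the environment, the port from the file, and the database name from the defaults.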

In [17]:
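def create_database_engine(db_settings: dict):
    from sqlalchemy.engine import URL  # available in SQLAlchemy 1.4+

    # URL.create escapes special characters (e.g. "@" or "/") in the password
    url = URL.create(
        "postgresql+psycopg2",
        username=db_settings["user"],
        password=db_settings["password"],
        host=db_settings["host"],
        port=int(db_settings["port"]),
        database=db_settings["database"],
    )
    return create_engine(url)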


In [18]:


def setup_database_schema_if_needed(engine, settings: dict):
    schema_cfg = settings.get("schema", {})
    if not schema_cfg.get("apply_all_sql_in_sql_folder"):
        return

    sql_folder = Path(schema_cfg.get("folder", "sql"))
    if not sql_folder.exists():
        print(f"Schema folder {sql_folder} does not exist. Skipping schema setup.")
        return

    with engine.begin() as conn:
        for path in sorted(sql_folder.glob("*.sql")):
            print(f"Applying schema from {path.name}...")
            conn.execute(text(path.read_text()))
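
The loop applies files in `sorted()` order, which is lexicographic. Zero-padded prefixes such as `01_schema.sql` and `02_cv_search_profile_mv.sql` sort as intended, but unpadded prefixes would not once there are ten or more files. A small sketch with hypothetical file names shows the difference and one possible sort key:

```python
import re


def numeric_prefix_key(filename):
    """Sort key: leading integer first (files without one go last), then name."""
    match = re.match(r"(\d+)", filename)
    prefix = int(match.group(1)) if match else float("inf")
    return (prefix, filename)


# Hypothetical, unpadded file names.
names = ["10_indexes.sql", "2_views.sql", "1_schema.sql"]

lexicographic = sorted(names)
by_number = sorted(names, key=numeric_prefix_key)

print(lexicographic)
print(by_number)
```

Keeping the zero-padded naming convention is the simpler option. The custom key is only worth it if file names cannot be controlled.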


# Step 5 — Run Full ETL Pipeline

This cell creates the database and schema if needed, then runs:

1. Extract  
2. Transform  
3. Load  

The final log line confirms that the run completed end-to-end.


In [19]:
settings = compose_settings()
db_settings = settings["db"]

ensure_database_exists(db_settings)
engine = create_database_engine(db_settings)

setup_database_schema_if_needed(engine, settings)

ex = extract(settings)
print(f"Using data folder: {getattr(ex, 'data_dir', 'unknown')}")
tr = transform(ex)

load(tr, engine)
print("✅ Flowcase ETL (manual fake) complete.")



Applying schema from 01_schema.sql...
Applying schema from 02_cv_search_profile_mv.sql...

Quarterly folders found: ['Q42025']
Using latest quarterly folder: Q42025

Found 15 CSV files.
  Loaded certifications.csv -> (1006, 22)
  Loaded project_experiences.csv -> (1501, 45)
  Loaded blogs.csv -> (758, 21)
  Loaded availability_report.csv -> (30000, 7)
  Loaded cv_roles.csv -> (1001, 19)
  Loaded work_experiences.csv -> (1484, 26)
  Loaded educations.csv -> (757, 27)
  Loaded user_report.csv -> (500, 26)
  Loaded courses.csv -> (1446, 26)
  Loaded key_qualifications.csv -> (485, 21)
  Loaded positions.csv -> (1264, 23)
  Loaded technologies.csv -> (2220, 20)
  Loaded sc_clearance.csv -> (500, 9)
  Loaded languages.csv -> (996, 22)
  Loaded usage_report.csv -> (500, 51)
Using data folder: cv_reports/Q42025
Upserting 500 users.
Upserting 500 CVs...
Upserting 2220 technologies...
Upserting 996 languages...
Upserting 1501 project experiences...
Upserting 1484 work experiences...
Upserting 1

  dt = pd.to_datetime(str(v).strip(), dayfirst=True, errors="coerce")


✅ Load complete.
✅ Flowcase ETL (manual fake) complete.


## Step 5.1 — Basic database verification

After running the full ETL, we check that key tables contain data as expected.

In [20]:

with engine.connect() as conn:
    users_count = conn.execute(text("SELECT COUNT(*) FROM users")).scalar()
    cvs_count = conn.execute(text("SELECT COUNT(*) FROM cvs")).scalar()
    tech_links = conn.execute(text("SELECT COUNT(*) FROM cv_technology")).scalar()

print(f"Users in DB: {users_count}")
print(f"CVs in DB: {cvs_count}")
print(f"CV–technology links in DB: {tech_links}")

assert users_count > 0, "No users loaded!"
assert cvs_count > 0, "No CVs loaded!"
print("✅ Basic load checks passed.")


Users in DB: 500
CVs in DB: 500
CV–technology links in DB: 2220
✅ Basic load checks passed.
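
The assertions above only check that the tables are non-empty. A stricter check could compare the loaded counts with the row counts reported at extract time. The sketch below assumes one database row per CSV row, which would not hold for any table where rows are skipped or merged by the upsert. The helper name `find_count_mismatches` is hypothetical.

```python
def find_count_mismatches(expected, actual):
    """Return {table: (expected, actual)} for every table whose counts differ."""
    return {
        table: (want, actual.get(table))
        for table, want in expected.items()
        if actual.get(table) != want
    }


# Row counts taken from the extract log and the verification output above.
expected = {"users": 500, "cvs": 500, "cv_technology": 2220}
loaded = {"users": 500, "cvs": 500, "cv_technology": 2220}

print(find_count_mismatches(expected, loaded))

# A made-up failing case, to show what a mismatch looks like.
print(find_count_mismatches(expected, {**loaded, "cv_technology": 2219}))
```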


# Step 6 — Refresh Materialised Search View

In [21]:
print_step("Step 6 — Refresh materialised search view")

from sqlalchemy import text

with engine.begin() as conn:
    conn.execute(text("REFRESH MATERIALIZED VIEW cv_search_profile_mv;"))

with engine.connect() as conn:
    mv_sample = pd.read_sql(
        "SELECT * FROM cv_search_profile_mv ORDER BY user_id LIMIT 10;",
        conn,
    )

print("\nSample rows from cv_search_profile_mv:")
display(mv_sample)




Sample rows from cv_search_profile_mv:


Unnamed: 0,user_id,cv_partner_user_id,user_name,cv_id,cv_title,sfia_level,cpd_label,technologies,max_years_experience,clearance,latest_availability_date,latest_percent_available
0,1,5b69cd14,Danielle Johnson,1,Principal C# Developer,5,CPD3L,".NET, Azure, Kubernetes, Python, dbt",15,,2026-01-26,23
1,2,439b63ae,Joshua Walker,2,Principal Data Engineer,5,CPD3L,"C#, Kafka, Node.js, Snowflake, dbt",15,,2026-01-26,74
2,3,bc24ae58,Jill Rhodes,3,Senior C# Developer,4,CPD3E,".NET, Node.js, React, Snowflake, TypeScript, dbt",9,,2026-01-26,64
3,4,711200c6,Patricia Miller,4,Principal Analytics Engineer,5,CPD3L,"Airflow, C#, JavaScript, Kafka, Spark, Terraform",15,SC,2026-01-26,66
4,5,a7986c12,Robert Johnson,5,Associate ML Engineer,2,CPD1E,"Airflow, Node.js, Python, Spark",15,SC,2026-01-26,91
5,6,aea2bd5b,Jeffery Wagner,6,Consultant Data Platform Engineer,3,CPD2L,"Airflow, JavaScript, Power BI, dbt",15,SC,2026-01-26,0
6,7,f3e19f78,Anthony Gonzalez,7,Principal Azure Engineer,5,CPD3L,"C#, Databricks, SQL, Snowflake, Terraform",15,,2026-01-26,37
7,8,2ced75ad,Debra Gardner,8,Senior AI Engineer,4,CPD3E,"Airflow, JavaScript, Oracle, Power BI, SQL, Te...",12,,2026-01-26,20
8,9,4ba9d1b6,Jeffrey Lawrence,9,Associate Backend Engineer,2,CPD1E,"AWS, Azure, C#, Oracle, Power BI, TypeScript",15,,2026-01-26,21
9,10,6e32ee35,Lisa Smith,10,Associate Data Scientist,2,CPD1E,".NET, Airflow, Azure, React, Snowflake",15,,2026-01-26,57
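
A plain `REFRESH MATERIALIZED VIEW` blocks reads on the view while it runs. PostgreSQL also offers `REFRESH MATERIALIZED VIEW CONCURRENTLY`, which avoids that but requires a unique index on the view. Whether `cv_search_profile_mv` has such an index is not shown here, so treat the following as a sketch under that assumption. The helper `refresh_statement` is hypothetical and only builds the statement text.

```python
import re

_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def refresh_statement(view_name, concurrently=False):
    """Build a REFRESH MATERIALIZED VIEW statement for a simple view name."""
    # Reject anything that is not a plain identifier, since the name is
    # interpolated into the SQL text.
    if not _IDENTIFIER.fullmatch(view_name):
        raise ValueError(f"Unexpected view name: {view_name!r}")
    keyword = "CONCURRENTLY " if concurrently else ""
    return f"REFRESH MATERIALIZED VIEW {keyword}{view_name};"


blocking_sql = refresh_statement("cv_search_profile_mv")
concurrent_sql = refresh_statement("cv_search_profile_mv", concurrently=True)
print(blocking_sql)
print(concurrent_sql)
```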


## Step 6.1 — Example queries against `cv_search_profile_mv`

In [22]:
print_step("Example queries against cv_search_profile_mv")

with engine.connect() as conn:
    sc_available = pd.read_sql(
        """
        SELECT *
        FROM cv_search_profile_mv
        WHERE
            clearance = 'SC' AND  -- exclude rows with no clearance (NULL)
            sfia_level >= 4 AND
            latest_percent_available >= 50
        ORDER BY latest_percent_available DESC, sfia_level DESC
        LIMIT 20;
        """,
        conn,
    )

print("\nAvailable SC (or above) candidates:")
display(sc_available)




Available SC (or above) candidates:


Unnamed: 0,user_id,cv_partner_user_id,user_name,cv_id,cv_title,sfia_level,cpd_label,technologies,max_years_experience,clearance,latest_availability_date,latest_percent_available
0,126,df5d3f82,Leslie Walton,126,Senior Data Engineer,4,CPD3E,"C#, JavaScript, React",11,SC,2026-01-26,93
1,293,f8f4ef54,Nicole Sanchez,293,Senior DevOps Engineer,4,CPD3E,"Docker, Node.js, Oracle, React, Terraform",12,SC,2026-01-26,92
2,462,79b08fa6,Courtney Berger,462,Senior Python Developer,4,CPD3E,"Azure, Kafka, Power BI, Terraform, dbt",15,SC,2026-01-26,91
3,62,930ca7b5,Kimberly Moreno,62,Senior Data Architect,4,CPD3E,"JavaScript, Kafka, Kubernetes, Oracle, Spark, ...",14,SC,2026-01-26,89
4,294,3f9be908,Steven Bishop,294,Principal Data Engineer,5,CPD3L,"Databricks, Kafka, dbt",9,SC,2026-01-26,84
5,173,718baf55,Mark Lin,173,Principal AWS Engineer,5,CPD3L,".NET, Airflow, Python, SQL",14,SC,2026-01-26,83
6,487,5712f033,Madison Blankenship,487,Senior Azure Engineer,4,CPD3E,"Kubernetes, Node.js, Snowflake, Terraform",11,SC,2026-01-26,83
7,499,3e9c87fe,Mitchell Ramos,499,Lead Cloud Architect,6,CPD4E,"Airflow, Docker, SQL, Terraform, TypeScript",14,SC,2026-01-26,81
8,14,80cdaa13,Christopher Davis,14,Principal Solution Architect,5,CPD3L,"Databricks, Kafka, Python, Snowflake, dbt",15,SC,2026-01-26,81
9,198,d93e4efd,Aaron Mitchell,198,Lead Data Platform Engineer,6,CPD4E,"C#, Power BI, Python, TypeScript",12,SC,2026-01-26,80
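
The filter values in the query above are hard-coded. For repeated searches, the conditions could be assembled with named bind parameters instead. This is a minimal sketch: the function `build_candidate_query` is hypothetical, and the column names are taken from the view output shown above.

```python
def build_candidate_query(clearance=None, min_sfia_level=None,
                          min_percent_available=None, limit=20):
    """Return (sql, params) for cv_search_profile_mv using named bind parameters."""
    conditions = []
    params = {"limit": int(limit)}
    if clearance is not None:
        conditions.append("clearance = :clearance")
        params["clearance"] = clearance
    if min_sfia_level is not None:
        conditions.append("sfia_level >= :min_sfia_level")
        params["min_sfia_level"] = int(min_sfia_level)
    if min_percent_available is not None:
        conditions.append("latest_percent_available >= :min_percent_available")
        params["min_percent_available"] = int(min_percent_available)
    where = f"WHERE {' AND '.join(conditions)} " if conditions else ""
    sql = (
        "SELECT * FROM cv_search_profile_mv "
        f"{where}"
        "ORDER BY latest_percent_available DESC, sfia_level DESC "
        "LIMIT :limit"
    )
    return sql, params


sql, params = build_candidate_query("SC", min_sfia_level=4, min_percent_available=50)
print(sql)
print(params)
```

The pair could then be passed to something like `pd.read_sql(text(sql), conn, params=params)`, so that values are bound by the driver rather than pasted into the SQL string.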


# Step 7 — Explore Validations

## Step 7.1 — Pick a random user + CV

In [23]:
print_step("Step 7.1 — Pick one random user + CV")

import numpy as np
from IPython.display import display

with engine.connect() as conn:
    person_df = pd.read_sql(
        """
        SELECT 
            u.user_id,
            u.cv_partner_user_id,
            (u.name_multilang->>'int') AS name,
            u.email,
            u.department,
            u.country,
            c.cv_id,
            c.cv_partner_cv_id,
            (c.title_multilang->>'int') AS cv_title,
            c.sfia_level,
            c.cpd_level,
            c.cpd_band,
            c.cpd_label
        FROM users u
        JOIN cvs c ON c.user_id = u.user_id
        ORDER BY random()
        LIMIT 1;
        """,
        conn,
    )

display(person_df)

user_id = int(person_df.loc[0, "user_id"])
cv_id = int(person_df.loc[0, "cv_id"])

print(f"\nChosen user_id={user_id}, cv_id={cv_id}")





Unnamed: 0,user_id,cv_partner_user_id,name,email,department,country,cv_id,cv_partner_cv_id,cv_title,sfia_level,cpd_level,cpd_band,cpd_label
0,300,86d64648,Alejandro Vaughan,alejandro.vaughan@mail.test,Cloud Engineering,Ireland,300,cv_86d64648,Senior Azure Engineer,4,3,E,CPD3E



Chosen user_id=300, cv_id=300
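
`ORDER BY random()` returns a different row on every run, which is fine for spot checks but makes a finding hard to revisit. One alternative is to fetch the candidate ids (for example with `SELECT cv_id FROM cvs`) and pick one in Python with a fixed seed. The helper `pick_reproducibly` and the seed value below are assumptions for illustration.

```python
import random


def pick_reproducibly(ids, seed):
    """Pick one id deterministically: same ids and seed give the same result."""
    ordered = sorted(ids)  # sort so the input order does not matter
    return random.Random(seed).choice(ordered)


cv_ids = list(range(1, 501))  # stand-in for ids fetched from the database

first = pick_reproducibly(cv_ids, seed=42)
second = pick_reproducibly(list(reversed(cv_ids)), seed=42)
print(first == second)
```

Using a private `random.Random(seed)` instance keeps the choice independent of any other code that touches the global random state.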


## Step 7.2 — Pull all related sections for that CV

In [24]:
print_step("Step 7.2 — Load all sections for this CV")

with engine.connect() as conn:
    techs = pd.read_sql(
        """
        SELECT 
          dt.name AS technology,
          ct.years_experience,
          ct.proficiency
        FROM cv_technology ct
        JOIN dim_technology dt ON dt.technology_id = ct.technology_id
        WHERE ct.cv_id = %(cv_id)s
        ORDER BY dt.name;
        """,
        conn,
        params={"cv_id": cv_id},
    )

    langs = pd.read_sql(
        """
        SELECT 
          dl.name AS language,
          cl.level,
          cl.highlighted
        FROM cv_language cl
        JOIN dim_language dl ON dl.language_id = cl.language_id
        WHERE cl.cv_id = %(cv_id)s
        ORDER BY cl.highlighted DESC, dl.name;
        """,
        conn,
        params={"cv_id": cv_id},
    )

    projects = pd.read_sql(
        """
        SELECT 
          pe.year_from, pe.year_to,
          (pe.description_multilang->>'int') AS description,
          di.name AS industry,
          dpt.name AS project_type,
          pe.percent_allocated
        FROM project_experience pe
        LEFT JOIN dim_industry di ON di.industry_id = pe.industry_id
        LEFT JOIN dim_project_type dpt ON dpt.project_type_id = pe.project_type_id
        WHERE pe.cv_id = %(cv_id)s
        ORDER BY pe.year_from DESC, pe.month_from DESC;
        """,
        conn,
        params={"cv_id": cv_id},
    )

    work = pd.read_sql(
        """
        SELECT 
          year_from, year_to,
          employer,
          description
        FROM work_experience
        WHERE cv_id = %(cv_id)s
        ORDER BY year_from DESC, month_from DESC;
        """,
        conn,
        params={"cv_id": cv_id},
    )

    edu = pd.read_sql(
        """
        SELECT 
          year_from, year_to,
          place_of_study,
          degree,
          description
        FROM education
        WHERE cv_id = %(cv_id)s
        ORDER BY year_from DESC;
        """,
        conn,
        params={"cv_id": cv_id},
    )

    courses = pd.read_sql(
        """
        SELECT 
          year,
          name,
          organiser,
          highlighted
        FROM course
        WHERE cv_id = %(cv_id)s
        ORDER BY year DESC;
        """,
        conn,
        params={"cv_id": cv_id},
    )

    certs = pd.read_sql(
        """
        SELECT 
          year,
          month,
          month_expire,
          year_expire
        FROM certification
        WHERE cv_id = %(cv_id)s
        ORDER BY year DESC, month DESC;
        """,
        conn,
        params={"cv_id": cv_id},
    )

    positions = pd.read_sql(
        """
        SELECT 
          year_from, year_to,
          name,
          description,
          highlighted
        FROM position
        WHERE cv_id = %(cv_id)s
        ORDER BY year_from DESC;
        """,
        conn,
        params={"cv_id": cv_id},
    )

    blogs = pd.read_sql(
        """
        SELECT 
          name,
          description,
          highlighted
        FROM blog_publication
        WHERE cv_id = %(cv_id)s
        ORDER BY highlighted DESC, updated DESC NULLS LAST;
        """,
        conn,
        params={"cv_id": cv_id},
    )

    roles = pd.read_sql(
        """
        SELECT 
          name,
          description,
          highlighted
        FROM cv_role
        WHERE cv_id = %(cv_id)s
        ORDER BY highlighted DESC, name;
        """,
        conn,
        params={"cv_id": cv_id},
    )

    key_quals = pd.read_sql(
        """
        SELECT 
          label,
          summary,
          short_description
        FROM key_qualification
        WHERE cv_id = %(cv_id)s
        ORDER BY label;
        """,
        conn,
        params={"cv_id": cv_id},
    )

print("\nTechnologies:")
display(techs.head())

print("\nLanguages:")
display(langs.head())

print("\nProject experience:")
display(projects.head())

print("\nWork experience:")
display(work.head())

print("\nEducation:")
display(edu.head())

print("\nCourses:")
display(courses.head())

print("\nCertifications:")
display(certs.head())

print("\nPositions:")
display(positions.head())

print("\nBlogs / publications:")
display(blogs.head())

print("\nCV roles:")
display(roles.head())

print("\nKey qualifications:")
display(key_quals.head())




Technologies:


Unnamed: 0,technology,years_experience,proficiency
0,Databricks,3,4
1,JavaScript,13,3
2,Terraform,14,2
3,dbt,2,5



Languages:


Unnamed: 0,language,level,highlighted
0,English,Fluent,True
1,Danish,Fluent,False
2,Polish,Fluent,False



Project experience:


Unnamed: 0,year_from,year_to,description,industry,project_type,percent_allocated
0,2022,2023,,Finance,Implementation,32
1,2021,2021,,Finance,Advisory,78



Work experience:


Unnamed: 0,year_from,year_to,employer,description
0,2019,2023,RetailCo,"Worked on data platforms, software delivery an..."
1,2016,2018,RetailCo,"Worked on data platforms, software delivery an..."
2,2014,2015,ConsultCo,"Worked on data platforms, software delivery an..."
3,2013,2015,ConsultCo,"Worked on data platforms, software delivery an..."



Education:


Unnamed: 0,year_from,year_to,place_of_study,degree,description



Courses:


Unnamed: 0,year,name,organiser,highlighted
0,2024,Azure Fundamentals,Databricks,True
1,2022,Power BI,Udemy,True
2,2021,SQL Advanced,Coursera,True
3,2018,Azure Fundamentals,AWS,True



Certifications:


Unnamed: 0,year,month,month_expire,year_expire
0,2020,7,10,2023
1,2020,5,11,2022



Positions:


Unnamed: 0,year_from,year_to,name,description,highlighted
0,2023,2024,Senior Azure Engineer,Progression based on delivery impact.,False
1,2022,2023,Consultant Azure Engineer,Progression based on delivery impact.,True
2,2021,2022,Associate Azure Engineer,Progression based on delivery impact.,True



Blogs / publications:


Unnamed: 0,name,description,highlighted
0,Azure Fabric Basics,Conference talk / blog summary.,True
1,Optimising PySpark,Conference talk / blog summary.,True



CV roles:


Unnamed: 0,name,description,highlighted
0,Developer,High-level role on multiple projects.,True



Key qualifications:


Unnamed: 0,label,summary,short_description
0,Key Strengths,"Experienced in cloud, data engineering and ana...","Focus on Python, Azure/AWS, Databricks."
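
The per-CV queries above all share one shape: select a few columns from one table, filter on `cv_id`, and order the result. A minimal sketch of how that repetition could be driven from a lookup table is shown below. It uses an in-memory SQLite database as a stand-in for PostgreSQL so it runs on its own. `SECTION_QUERIES` and `fetch_sections` are hypothetical names, the demo rows are made up, and the `:cv_id` placeholder is SQLite's named style (the notebook's psycopg2 queries use `%(cv_id)s`).

```python
import sqlite3

# Hypothetical registry: display label -> query. Table and column names follow
# the notebook's schema; only two sections are listed to keep the sketch short.
SECTION_QUERIES = {
    "CV roles": (
        "SELECT name, description, highlighted FROM cv_role "
        "WHERE cv_id = :cv_id ORDER BY highlighted DESC, name"
    ),
    "Key qualifications": (
        "SELECT label, summary, short_description FROM key_qualification "
        "WHERE cv_id = :cv_id ORDER BY label"
    ),
}


def fetch_sections(conn, cv_id, queries=SECTION_QUERIES):
    """Run every registered query for one CV and return {label: [row dicts]}."""
    results = {}
    for label, sql in queries.items():
        cursor = conn.execute(sql, {"cv_id": cv_id})
        columns = [col[0] for col in cursor.description]
        results[label] = [dict(zip(columns, row)) for row in cursor.fetchall()]
    return results


# Stand-in database with made-up rows, for illustration only.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE cv_role (cv_id INTEGER, name TEXT, description TEXT, highlighted INTEGER);
    CREATE TABLE key_qualification (cv_id INTEGER, label TEXT, summary TEXT, short_description TEXT);
    INSERT INTO cv_role VALUES (1, 'Developer', 'High-level role on multiple projects.', 1);
    INSERT INTO cv_role VALUES (2, 'Architect', 'Belongs to another CV.', 0);
    INSERT INTO key_qualification VALUES (1, 'Key Strengths', 'Experienced in cloud.', 'Focus on Python.');
    """
)

sections = fetch_sections(conn, cv_id=1)
for label, rows in sections.items():
    print(f"\n{label}:")
    for row in rows:
        print(row)
```

Adding a section then means adding one dictionary entry rather than another `pd.read_sql` block. With the notebook's engine, the same loop could call `pd.read_sql(sql, conn, params={"cv_id": cv_id})` instead of `conn.execute`.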


## 7.3 Clearance + availability for that user

In [25]:
print_step("Step 7.3 — Clearance and availability for this user")

with engine.connect() as conn:
    clearance = pd.read_sql(
        """
        SELECT 
          dc.name AS clearance,
          uc.valid_from,
          uc.valid_to,
          uc.verified_by
        FROM user_clearance uc
        JOIN dim_clearance dc ON dc.clearance_id = uc.clearance_id
        WHERE uc.user_id = %(user_id)s
        ORDER BY uc.valid_from DESC;
        """,
        conn,
        params={"user_id": user_id},
    )

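    # Earliest 30 availability rows for this user. The query does not filter on
    # today's date, so the rows are "upcoming" only if the table starts at or after today.
    availability = pd.read_sql(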
        """
        SELECT 
          date,
          percent_available,
          source
        FROM user_availability
        WHERE user_id = %(user_id)s
        ORDER BY date
        LIMIT 30;
        """,
        conn,
        params={"user_id": user_id},
    )

print("\nSecurity clearance:")
display(clearance)

print("\nUpcoming availability (sample):")
display(availability.head(10))




Security clearance:


Unnamed: 0,clearance,valid_from,valid_to,verified_by
0,,1900-01-01,,HR



Upcoming availability (sample):


Unnamed: 0,date,percent_available,source
0,2025-11-25,27,Fake generator
1,2025-11-26,47,Fake generator
2,2025-11-27,18,Fake generator
3,2025-11-28,57,Fake generator
4,2025-11-29,100,Fake generator
5,2025-11-30,100,Fake generator
6,2025-12-01,79,Fake generator
7,2025-12-02,74,Fake generator
8,2025-12-03,73,Fake generator
9,2025-12-04,47,Fake generator
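
In the sample above, the two 100% rows fall on 29 and 30 November 2025, which are a Saturday and a Sunday. A minimal sketch of averaging availability over working days only is shown below, using the ten rows displayed above. `weekday_mean` is a hypothetical helper, and whether the generator always reports weekends as fully available is an assumption that the output alone does not confirm.

```python
from datetime import date
from statistics import mean

# Rows copied from the sample output above: (ISO date, percent_available).
availability_rows = [
    ("2025-11-25", 27),
    ("2025-11-26", 47),
    ("2025-11-27", 18),
    ("2025-11-28", 57),
    ("2025-11-29", 100),
    ("2025-11-30", 100),
    ("2025-12-01", 79),
    ("2025-12-02", 74),
    ("2025-12-03", 73),
    ("2025-12-04", 47),
]


def weekday_mean(rows):
    """Average percent_available over Monday-to-Friday rows only."""
    weekday_values = [
        pct
        for iso_day, pct in rows
        if date.fromisoformat(iso_day).weekday() < 5  # 0 = Monday ... 4 = Friday
    ]
    return mean(weekday_values) if weekday_values else None


overall = mean(pct for _, pct in availability_rows)
weekdays_only = weekday_mean(availability_rows)
print(f"All days: {overall:.2f}%  Weekdays only: {weekdays_only:.2f}%")
```

If weekend rows do inflate the figures, the weekday average is the more useful number for staffing decisions.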


In [26]:
print_step("Example table use")

with engine.connect() as conn:
    basic = pd.read_sql("""
        SELECT
            u.user_id,
            u.cv_partner_user_id,
            (u.name_multilang->>'int') AS name,
            c.cv_id,
            (c.title_multilang->>'int') AS cv_title,
            c.sfia_level,
            c.cpd_label,
            dt.name AS technology,
            ct.years_experience,
            cr.name AS clearance,
            ua.percent_available, 
            ua.date AS availability_date
        FROM users u
        JOIN cvs c
            ON c.user_id = u.user_id
        LEFT JOIN cv_technology ct  ON ct.cv_id = c.cv_id
        LEFT JOIN dim_technology dt ON dt.technology_id = ct.technology_id
        LEFT JOIN user_clearance uc ON uc.user_id = u.user_id
        LEFT JOIN dim_clearance cr  ON cr.clearance_id = uc.clearance_id
        LEFT JOIN user_availability ua ON ua.user_id = u.user_id

    """, conn)

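# The LEFT JOINs above fan out: each user gets one row per combination of
# technology, clearance and availability date. Sort so that rows with a clearance
# come before rows without one, then keep a single row per
# (user, CV, technology, availability date) key.
basic_clean = (
    basic.sort_values(
        by=["user_id", "cv_id", "technology", "availability_date", "clearance"],
        ascending=[True, True, True, True, False]
    )
    .drop_duplicates(
        subset=["user_id", "cv_id", "technology", "availability_date"],
        keep="first"
    )
)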

profile = (
    basic_clean
    .groupby(
        [
            "user_id",
            "cv_partner_user_id",
            "name",
            "cv_id",
            "cv_title",
            "sfia_level",
            "cpd_label",
        ],
        dropna=False,
    )
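    .agg(
        # Unique technology names for the CV, sorted and comma-separated.
        technologies=("technology", lambda s: ", ".join(sorted(set(s.dropna())))),
        max_years_experience=("years_experience", "max"),
        # First non-null clearance in the group, or None if there is none.
        clearance=("clearance", lambda s: s.dropna().iloc[0] if s.dropna().any() else None),
        latest_availability_date=("availability_date", "max"),
        # NOTE: "max" returns the highest percentage across all dates, which is
        # not necessarily the value recorded on latest_availability_date.
        latest_percent_available=("percent_available", "max"),
    )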
    .reset_index()
)

print("\nOne-row-per-person profile table:")
display(profile.head(10))






One-row-per-person profile table:


Unnamed: 0,user_id,cv_partner_user_id,name,cv_id,cv_title,sfia_level,cpd_label,technologies,max_years_experience,clearance,latest_availability_date,latest_percent_available
0,1,5b69cd14,Danielle Johnson,1,Principal C# Developer,5,CPD3L,".NET, Azure, Kubernetes, Python, dbt",15,,2026-01-26,100
1,2,439b63ae,Joshua Walker,2,Principal Data Engineer,5,CPD3L,"C#, Kafka, Node.js, Snowflake, dbt",15,,2026-01-26,100
2,3,bc24ae58,Jill Rhodes,3,Senior C# Developer,4,CPD3E,".NET, Node.js, React, Snowflake, TypeScript, dbt",9,,2026-01-26,100
3,4,711200c6,Patricia Miller,4,Principal Analytics Engineer,5,CPD3L,"Airflow, C#, JavaScript, Kafka, Spark, Terraform",15,SC,2026-01-26,100
4,5,a7986c12,Robert Johnson,5,Associate ML Engineer,2,CPD1E,"Airflow, Node.js, Python, Spark",15,SC,2026-01-26,100
5,6,aea2bd5b,Jeffery Wagner,6,Consultant Data Platform Engineer,3,CPD2L,"Airflow, JavaScript, Power BI, dbt",15,SC,2026-01-26,100
6,7,f3e19f78,Anthony Gonzalez,7,Principal Azure Engineer,5,CPD3L,"C#, Databricks, SQL, Snowflake, Terraform",15,,2026-01-26,100
7,8,2ced75ad,Debra Gardner,8,Senior AI Engineer,4,CPD3E,"Airflow, JavaScript, Oracle, Power BI, SQL, Te...",12,,2026-01-26,100
8,9,4ba9d1b6,Jeffrey Lawrence,9,Associate Backend Engineer,2,CPD1E,"AWS, Azure, C#, Oracle, Power BI, TypeScript",15,,2026-01-26,100
9,10,6e32ee35,Lisa Smith,10,Associate Data Scientist,2,CPD1E,".NET, Airflow, Azure, React, Snowflake",15,,2026-01-26,100
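
Every row in the table above shows `latest_percent_available` as 100. That is what taking the maximum percentage over all dates produces once any day is fully available. A minimal sketch of an alternative is shown below: aggregate technologies in a subquery so the joins do not multiply rows, and join availability only on each user's most recent date. It runs against an in-memory SQLite database with made-up rows and a simplified version of the notebook's tables. `GROUP_CONCAT` is SQLite's aggregate; on PostgreSQL the equivalent would be `STRING_AGG(dt.name, ', ' ORDER BY dt.name)`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    -- Simplified stand-ins for the notebook's tables; rows are made up.
    CREATE TABLE cvs (cv_id INTEGER, user_id INTEGER);
    CREATE TABLE dim_technology (technology_id INTEGER, name TEXT);
    CREATE TABLE cv_technology (cv_id INTEGER, technology_id INTEGER, years_experience INTEGER);
    CREATE TABLE user_availability (user_id INTEGER, date TEXT, percent_available INTEGER);

    INSERT INTO cvs VALUES (1, 1), (2, 2);
    INSERT INTO dim_technology VALUES (1, 'Python'), (2, 'dbt');
    INSERT INTO cv_technology VALUES (1, 1, 5), (1, 2, 2);
    INSERT INTO user_availability VALUES
        (1, '2026-01-25', 100),
        (1, '2026-01-26', 40),
        (2, '2026-01-26', 80);
    """
)

PROFILE_SQL = """
SELECT
    c.user_id,
    c.cv_id,
    t.technologies,
    t.max_years_experience,
    a.date              AS latest_availability_date,
    a.percent_available AS latest_percent_available
FROM cvs c
LEFT JOIN (
    -- One row per CV, so joining it cannot multiply rows.
    SELECT ct.cv_id,
           GROUP_CONCAT(dt.name, ', ') AS technologies,
           MAX(ct.years_experience)    AS max_years_experience
    FROM cv_technology ct
    JOIN dim_technology dt ON dt.technology_id = ct.technology_id
    GROUP BY ct.cv_id
) t ON t.cv_id = c.cv_id
LEFT JOIN user_availability a
    ON a.user_id = c.user_id
   -- Keep only the row on this user's most recent date.
   AND a.date = (SELECT MAX(date) FROM user_availability WHERE user_id = c.user_id)
ORDER BY c.user_id
"""

rows = conn.execute(PROFILE_SQL).fetchall()
for row in rows:
    print(row)
```

In this toy data user 1 is 100% available on the earlier date and 40% on the later one, so a plain maximum would report 100 while the date-matched join reports the value for the latest date. The query also returns one row per CV directly, so the sort and `drop_duplicates` step is no longer needed.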
