In [1]:
import os
import logging
import sqlite3
from datetime import datetime
import pandas as pd
import numpy as np

# ===========================================================
#                 CONFIGURATION
# ===========================================================
# Root of the project tree. Set the IVF_BASE_PATH environment variable to run
# the notebook on another machine; the default is the original location.
BASE_PATH = os.environ.get("IVF_BASE_PATH", r"E:\work\DEPI\graduation promax")

# os.path.join uses the platform separator, so on Windows these resolve to the
# same strings as before while remaining usable on other platforms.
RAW_DB = os.path.join(BASE_PATH, "data", "raw", "ivf_database_updated.db")
STAR_DB = os.path.join(BASE_PATH, "data", "warehouse_final", "ivf_star_schema.db")
SCHEMA_SQL = os.path.join(BASE_PATH, "src", "ETL", "create_star_schema.sql")
LOG_FILE = os.path.join(BASE_PATH, "src", "ETL", "logs", "etl_log_ivf.txt")

# Make sure the log and warehouse directories exist before anything writes there.
os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True)
os.makedirs(os.path.dirname(STAR_DB), exist_ok=True)

logging.basicConfig(
    filename=LOG_FILE,
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s"
)

# ===========================================================
#                SCHEMA SQL (FULL REFRESH)
# ===========================================================
def run_schema_sql():
    """Run the SQL script at ``SCHEMA_SQL`` against the ``STAR_DB`` database.

    The script is read as UTF-8 and executed with ``executescript``. The
    connection is closed even if the script fails, so a broken script does not
    leave a handle on the warehouse file.

    Raises:
        OSError: if the script file cannot be opened.
        sqlite3.Error: if a statement in the script fails.
    """
    # Read the script first so a missing file fails before a connection is opened.
    with open(SCHEMA_SQL, "r", encoding="utf-8") as f:
        schema_sql = f.read()

    conn = sqlite3.connect(STAR_DB)
    try:
        conn.executescript(schema_sql)
        conn.commit()
    finally:
        conn.close()
    logging.info("Schema (fresh) created successfully.")


# ===========================================================
#                   RAW LOADING
# ===========================================================
def load_raw_df():
    """Load every row of the ``ivf_patients`` table from ``RAW_DB``.

    Returns:
        pandas.DataFrame: the raw table, one row per source record.

    The connection is closed even when the query raises (for example when the
    table does not exist).
    """
    conn = sqlite3.connect(RAW_DB)
    try:
        df = pd.read_sql("SELECT * FROM ivf_patients", conn)
    finally:
        conn.close()
    logging.info(f"Loaded {len(df)} raw rows.")
    return df


# ===========================================================
#                   CLEAN DATA
# ===========================================================
def clean_data(df):
    """Return a copy of ``df`` with normalized column names and no duplicate rows.

    Column names are stripped of surrounding whitespace, lower-cased, and have
    spaces replaced by underscores. Fully identical rows are then dropped.

    Args:
        df: raw DataFrame; it is not modified.

    Returns:
        pandas.DataFrame: the cleaned frame.
    """
    # Work on a copy: assigning to ``df.columns`` would rename the caller's frame too.
    cleaned = df.copy()
    cleaned.columns = cleaned.columns.str.strip().str.lower().str.replace(" ", "_")
    return cleaned.drop_duplicates()


# ===========================================================
#    CHECK REQUIRED COLUMNS (ONLY 3)
# ===========================================================
def check_required(df):
    """Fail fast when any of the three mandatory key columns is absent.

    Args:
        df: DataFrame whose ``columns`` are inspected.

    Raises:
        ValueError: naming the required columns that are not present, in the
            order ``case_id``, ``female_id``, ``male_id``.
    """
    mandatory = ("case_id", "female_id", "male_id")
    present = df.columns
    missing = list(filter(lambda name: name not in present, mandatory))
    if not missing:
        return
    raise ValueError(f"Missing required columns: {missing}")


# ===========================================================
#   SET NON-CRITICAL IDs TO NULL
# ===========================================================
def handle_ids(df):
    """Add the optional id/attribute columns and derive ``transfer_time_id``.

    Columns ``protocol_id``, ``outcome_id``, ``embryo_id``, ``fresh_et_stage``
    and ``grading`` are created with ``None`` when they are absent.
    ``transfer_time_id`` is the ``et_date`` column formatted as ``YYYY-MM-DD``;
    values that cannot be parsed become missing.

    Args:
        df: DataFrame to extend. It is modified in place and also returned.

    Returns:
        pandas.DataFrame: the same ``df`` object.
    """
    optional_cols = ["protocol_id", "outcome_id", "embryo_id", "fresh_et_stage", "grading"]
    for c in optional_cols:
        if c not in df.columns:
            df[c] = None

    if "et_date" in df.columns:
        parsed = pd.to_datetime(df["et_date"], errors="coerce")
        df["transfer_time_id"] = parsed.dt.strftime("%Y-%m-%d")
    else:
        # Without an et_date column there is nothing to parse; the previous
        # ``df.get("et_date", None)`` path failed on the ``.dt`` accessor here.
        df["transfer_time_id"] = None

    return df


# ===========================================================
#     MAP DOCTOR NAME → AUTO INCREMENT doctor_id
# ===========================================================
def map_doctor_ids(df, conn):
    """Resolve each ``doctor_name`` to a ``dim_doctor.doctor_id``.

    Names already present in ``dim_doctor`` reuse their id; unseen names are
    inserted and take the row id SQLite assigns. When ``df`` has no
    ``doctor_name`` column, every row is treated as ``"Unknown"``.

    Args:
        df: DataFrame to annotate. It is modified in place and also returned.
        conn: open sqlite3 connection to a database containing ``dim_doctor``.

    Returns:
        pandas.DataFrame: ``df`` with a ``doctor_id`` column; rows whose
        ``doctor_name`` is missing get a missing ``doctor_id``.
    """
    if "doctor_name" not in df.columns:
        df["doctor_name"] = "Unknown"

    # Names already known to the dimension
    existing = pd.read_sql("SELECT doctor_id, doctor_name FROM dim_doctor;", conn)
    name_to_id = dict(zip(existing["doctor_name"], existing["doctor_id"]))

    unique_names = df["doctor_name"].dropna().unique()
    new_doctors = [name for name in unique_names if name not in name_to_id]

    for name in new_doctors:
        cur = conn.execute(
            "INSERT INTO dim_doctor (doctor_name) VALUES (?)",
            (name,)
        )
        name_to_id[name] = cur.lastrowid

    if new_doctors:
        # Persist the new rows here instead of relying on a later commit made
        # by some other function on the same connection.
        conn.commit()

    df["doctor_id"] = df["doctor_name"].map(name_to_id)

    return df


# ===========================================================
#   SAFE INSERT → NO DUPLICATION
# ===========================================================
def insert_or_ignore(table, df_subset, conn):
    """Insert the rows of ``df_subset`` into ``table`` with ``INSERT OR IGNORE``.

    Rows that violate a uniqueness constraint of the target table are skipped
    by SQLite instead of raising. The transaction is committed before returning.

    Args:
        table: target table name. It is interpolated into the statement as-is,
            so it must come from trusted code, not from data.
        df_subset: DataFrame whose column names match the target columns.
        conn: open sqlite3 connection.
    """
    def _quote(identifier):
        # Double-quote identifiers so column names containing spaces,
        # punctuation or SQL keywords still form a valid statement.
        return '"' + str(identifier).replace('"', '""') + '"'

    cols = df_subset.columns.tolist()
    placeholders = ",".join("?" * len(cols))
    column_list = ",".join(_quote(c) for c in cols)
    sql = f"INSERT OR IGNORE INTO {table} ({column_list}) VALUES ({placeholders})"

    # Convert column by column to Python objects: ``DataFrame.values`` on an
    # all-numeric frame would upcast integer columns to float (3 -> 3.0).
    # Missing markers (NaN/NaT/NA) become None so they are stored as NULL.
    as_objects = df_subset.astype(object)
    rows = as_objects.where(as_objects.notna(), None).values.tolist()

    conn.executemany(sql, rows)
    conn.commit()


# ===========================================================
#       DIMENSIONS LOADING
# ===========================================================
def load_dimensions(df, conn, refresh=True):
    """Write the five attribute dimensions derived from ``df``.

    For each dimension the listed columns are selected, de-duplicated across
    all of them, and written to the table of the same name.

    Args:
        df: cleaned source DataFrame. Columns missing from it are written as
            NULL instead of raising ``KeyError``.
        conn: open sqlite3 connection to the warehouse.
        refresh: when True each table is rewritten with
            ``to_sql(if_exists="replace")``; otherwise rows are added through
            ``insert_or_ignore``.
    """
    dim_tables = {
        "dim_female":  ["female_id","female_age","female_bmi","amh_level","fsh_level","afc"],
        "dim_male":    ["male_id","male_age","male_factor","semen_count_mill_per_ml",
                        "motility_percent","morphology_percent"],
        "dim_protocol":["protocol_id","protocol_type","stimulation_days",
                        "total_fsh_dose","trigger_type","recommended_protocol"],
        "dim_outcome": ["outcome_id","risk_level","response_type",
                        "suggested_waiting_period_days","failure_reason"],
        "dim_embryo":  ["embryo_id","fresh_et_stage","grading","class_a_rate"]
    }

    for table, cols in dim_tables.items():
        # reindex keeps existing columns untouched and adds absent ones as NaN.
        subset = df.reindex(columns=cols).drop_duplicates()

        if refresh:
            # NOTE(review): if_exists="replace" drops the existing table before
            # writing, so keys/constraints declared by the schema script for
            # these tables do not survive a refresh run. Confirm this is intended.
            subset.to_sql(table, conn, if_exists="replace", index=False)
        else:
            insert_or_ignore(table, subset, conn)

        logging.info(f"{table}: {len(subset)} processed.")


# ===========================================================
#       DIM_TIME
# ===========================================================
def build_dim_time(df, conn):
    """Add one ``dim_time`` row per distinct, parseable ``et_date`` value.

    Each row carries the ISO date string plus day, month, month name, quarter,
    year and ISO week number. Rows are written with ``INSERT OR IGNORE``, so
    dates rejected by a uniqueness constraint on the table are skipped.

    Args:
        df: DataFrame that may contain an ``et_date`` column; when the column
            is absent nothing is inserted.
        conn: open sqlite3 connection with an existing ``dim_time`` table.
    """
    rows = []
    if "et_date" in df.columns:
        dates = pd.to_datetime(df["et_date"], errors="coerce").dropna().drop_duplicates()
        # Timestamp attributes are plain Python values, which sqlite3 can bind.
        rows = [
            (
                ts.strftime("%Y-%m-%d"),
                ts.day,
                ts.month,
                ts.month_name(),
                ts.quarter,
                ts.year,
                ts.isocalendar()[1],  # ISO week number
            )
            for ts in dates
        ]

    conn.executemany("""
        INSERT OR IGNORE INTO dim_time
        (full_date, day, month, month_name, quarter, year, week)
        VALUES (?, ?, ?, ?, ?, ?, ?)
    """, rows)
    conn.commit()


# ===========================================================
#              FACT TABLES
# ===========================================================
def load_fact_tables(df, conn):
    """Populate ``fact_ivf_cycle``, ``fact_transfer`` and ``fact_transfer_embryo``.

    Args:
        df: DataFrame that already carries the dimension ids and
            ``transfer_time_id``. It is modified in place: absent measure
            columns are added and ``cycle_start_time_id`` is set.
        conn: open sqlite3 connection to the warehouse.

    All inserts go through ``insert_or_ignore``. One row per ``case_id`` is
    kept for the cycle and transfer facts.
    """
    time_df = pd.read_sql("SELECT time_id, full_date FROM dim_time;", conn)
    date_to_id = dict(zip(time_df["full_date"], time_df["time_id"]))

    # Measures that default to 0 when the source has no such column
    fact_needed = [
        "e2_on_trigger","endometrium_thickness","follicles_18mm",
        "gv_count","injected_m2","fertilized_oocytes",
        "cleavage_d3","blastocyst_d5","good_embryos"
    ]
    for col in fact_needed:
        if col not in df.columns:
            df[col] = 0

    df["cycle_start_time_id"] = df["transfer_time_id"].map(date_to_id)

    per_case = df.drop_duplicates(subset=["case_id"])

    # reindex (rather than [[...]]) so a column missing from the source, e.g.
    # retrieved_oocytes or fertilization_rate, is written as NULL instead of
    # aborting the load with KeyError.
    fact_cycle = per_case.reindex(columns=[
        "case_id","female_id","male_id","protocol_id","doctor_id","outcome_id",
        "cycle_start_time_id","e2_on_trigger","endometrium_thickness",
        "follicles_18mm","retrieved_oocytes","m2_count","gv_count",
        "injected_m2","fertilized_oocytes","fertilization_rate",
        "cleavage_d3","blastocyst_d5","good_embryos"
    ])
    insert_or_ignore("fact_ivf_cycle", fact_cycle, conn)

    # ---------------- FACT TRANSFER ----------------
    if all(col in df.columns for col in [
        "case_id","transfer_time_id","doctor_id","embryos_transferred"
    ]):
        tmp = per_case.copy()
        tmp["transfer_time_fk"] = tmp["transfer_time_id"].map(date_to_id)
        fact_transfer = tmp.reindex(columns=[
            "case_id","transfer_time_fk","doctor_id",
            "embryos_transferred","pregnancy_test_result",
            "clinical_pregnancy","live_birth",
            "outcome_id","success_probability_score"
        ])
        insert_or_ignore("fact_transfer", fact_transfer, conn)

    # ---------------- FACT TRANSFER EMBRYO ----------------
    # Best effort: a failure here is logged and does not abort the load.
    try:
        existing_transfer = pd.read_sql("SELECT transfer_sk, case_id FROM fact_transfer;", conn)
        if not existing_transfer.empty and "embryo_id" in df.columns:
            df_merge = df.merge(existing_transfer, on="case_id", how="inner")
            fact_embryo = df_merge[["transfer_sk","embryo_id"]].drop_duplicates()
            insert_or_ignore("fact_transfer_embryo", fact_embryo, conn)
    except Exception:
        # Record the traceback so the reason for skipping is visible in the log;
        # unlike a bare except, KeyboardInterrupt/SystemExit still propagate.
        logging.warning("fact_transfer_embryo skipped.", exc_info=True)


# ===========================================================
#                    MAIN ETL
# ===========================================================
def run_full_etl(refresh=True):
    """Run the whole pipeline: raw load, cleaning, dimensions, time, facts.

    Args:
        refresh: when True the schema script is executed first and the
            dimension tables are rewritten; when False rows are only added.

    Raises:
        ValueError: from ``check_required`` when a mandatory column is missing.
    """
    logging.info("===== ETL STARTED =====")

    if refresh:
        run_schema_sql()

    df = load_raw_df()
    df = clean_data(df)

    # Abort early if the key columns are not there
    check_required(df)

    # Normalize MII/M2 naming
    rename_map = {"mii_count": "m2_count", "injected_mii": "injected_m2"}
    df = df.rename(columns={k: v for k, v in rename_map.items() if k in df.columns})

    df = handle_ids(df)

    conn = sqlite3.connect(STAR_DB)
    try:
        # Doctor mapping
        df = map_doctor_ids(df, conn)

        # Dimensions, then time, then facts (facts look up dim_time ids)
        load_dimensions(df, conn, refresh=refresh)
        build_dim_time(df, conn)
        load_fact_tables(df, conn)
    finally:
        # Release the warehouse file even when a stage fails.
        conn.close()

    logging.info("ETL COMPLETED SUCCESSFULLY.")
    print("ETL Done ✔")


# Entry point: when this code runs as __main__, execute the pipeline with
# refresh=True (schema script first, dimension tables rewritten).
if __name__ == "__main__":
    run_full_etl(refresh=True)


ETL Done ✔


In [3]:
import sqlite3
import pandas as pd

DB = r"E:\work\DEPI\graduation promax\data\warehouse_final\ivf_star_schema.db"
conn = sqlite3.connect(DB)

try:
    # 1) List every table that actually exists in the database.
    print("\n--- ALL TABLES IN DB ---")
    tables = pd.read_sql("SELECT name FROM sqlite_master WHERE type='table';", conn)
    print(tables)

    # 2) Row count per table. Names are double-quoted so unusual table names
    #    still form a valid statement.
    print("\n--- ROW COUNTS ---")
    for t in tables['name']:
        count = pd.read_sql(f'SELECT COUNT(*) as rows FROM "{t}";', conn)
        print(f"{t:<25} → {count['rows'][0]} rows")

    # 3) First 3 rows of each table (if it has data).
    print("\n--- SAMPLE DATA (LIMIT 3) ---")
    for t in tables['name']:
        try:
            sample = pd.read_sql(f'SELECT * FROM "{t}" LIMIT 3;', conn)
            print(f"\nTABLE: {t}")
            print(sample)
        except Exception as exc:
            print(f"\nTABLE: {t} → Error or no rows")
            # Show why the query failed instead of hiding it.
            print(f"  reason: {exc}")
finally:
    # Always release the database file, even if a query above raised.
    conn.close()



--- ALL TABLES IN DB ---
                    name
0             dim_doctor
1        sqlite_sequence
2               dim_time
3         fact_ivf_cycle
4          fact_transfer
5   fact_transfer_embryo
6             dim_female
7               dim_male
8           dim_protocol
9            dim_outcome
10            dim_embryo

--- ROW COUNTS ---
dim_doctor                → 6 rows
sqlite_sequence           → 4 rows
dim_time                  → 3376 rows
fact_ivf_cycle            → 10000 rows
fact_transfer             → 30000 rows
fact_transfer_embryo      → 60000 rows
dim_female                → 30000 rows
dim_male                  → 30000 rows
dim_protocol              → 29767 rows
dim_outcome               → 21750 rows
dim_embryo                → 20028 rows

--- SAMPLE DATA (LIMIT 3) ---

TABLE: dim_doctor
   doctor_id doctor_name                              doctor_recommendation
0          1     Unknown  Increase gonadotropin dose in next cycle for b...
1          2     Unknown  Procee

In [4]:
import sqlite3, pandas as pd
conn = sqlite3.connect(STAR_DB)

# Print the column definitions of dim_time, then its first five rows.
for statement in ("PRAGMA table_info(dim_time);", "SELECT * FROM dim_time LIMIT 5;"):
    df = pd.read_sql(statement, conn)
    print(df)

conn.close()

   cid        name     type  notnull dflt_value  pk
0    0     time_id  INTEGER        0       None   1
1    1   full_date     TEXT        0       None   0
2    2         day  INTEGER        0       None   0
3    3       month  INTEGER        0       None   0
4    4  month_name     TEXT        0       None   0
5    5     quarter  INTEGER        0       None   0
6    6        year  INTEGER        0       None   0
7    7        week  INTEGER        0       None   0
   time_id   full_date  day  month month_name  quarter  year  week
0        1  2022-03-26   26      3      March        1  2022    12
1        2  2016-03-30   30      3      March        1  2016    13
2        3  2018-09-06    6      9  September        3  2018    36
3        4  2018-03-05    5      3      March        1  2018    10
4        5  2023-03-07    7      3      March        1  2023    10


In [5]:
import sqlite3
import pandas as pd

DB_PATH = r"E:\work\DEPI\graduation promax\data\warehouse_final\ivf_star_schema.db"

# Open the warehouse
conn = sqlite3.connect(DB_PATH)

# Report title -> SQL text; run in the order listed.
queries = {
    "Total Female Patients": "SELECT COUNT(*) FROM dim_female;",
    "Total Male Patients": "SELECT COUNT(*) FROM dim_male;",
    "Protocol Distribution": """
        SELECT protocol_type, COUNT(*) 
        FROM dim_protocol
        GROUP BY protocol_type;
    """,
    "Success Outcome Counts": """
        SELECT outcome_id, COUNT(*) 
        FROM fact_transfer
        GROUP BY outcome_id;
    """,
    "Sample Dates (dim_time)": """
        SELECT * FROM dim_time LIMIT 5;
    """
}

# A failing query is reported and the remaining ones still run.
for title in queries:
    q = queries[title]
    print(f"\n--- {title} ---")
    try:
        df = pd.read_sql(q, conn)
        print(df)
    except Exception as e:
        print("Error:", e)

conn.close()
print("\nConnection closed.")



--- Total Female Patients ---
   COUNT(*)
0     10000

--- Total Male Patients ---
   COUNT(*)
0     10000

--- Protocol Distribution ---
  protocol_type  COUNT(*)
0    Antagonist      5307
1          Long      2519
2          Mild       783
3         Short      1158

--- Success Outcome Counts ---
  outcome_id  COUNT(*)
0       None     10000

--- Sample Dates (dim_time) ---
   time_id   full_date  day  month month_name  quarter  year  week
0        1  2022-03-26   26      3      March        1  2022    12
1        2  2016-03-30   30      3      March        1  2016    13
2        3  2018-09-06    6      9  September        3  2018    36
3        4  2018-03-05    5      3      March        1  2018    10
4        5  2023-03-07    7      3      March        1  2023    10

Connection closed.


In [9]:
# Ad-hoc check of the prepared frame before loading facts.
# NOTE(review): apply_placeholder_and_ids is not defined in any cell visible
# above; on a fresh kernel this cell presumably raises NameError. Confirm where
# that helper lives (or whether handle_ids replaced it).
df = load_raw_df()
df = clean_data(df)
df = apply_placeholder_and_ids(df)
print(df.columns)   # make sure the expected columns are present

print(df[["case_id", "transfer_time_id", "embryos_transferred"]].head())  # look at a few sample rows

print(df["embryos_transferred"].value_counts())  # find out whether the values are all zero or empty



Index(['case_id', 'et_date', 'female_age', 'female_bmi', 'amh_level',
       'fsh_level', 'afc', 'male_age', 'male_factor',
       'semen_count_mill_per_ml', 'motility_percent', 'morphology_percent',
       'protocol_type', 'stimulation_days', 'total_fsh_dose', 'trigger_type',
       'e2_on_trigger', 'endometrium_thickness', 'follicles_18mm',
       'retrieved_oocytes', 'mii_count', 'mi_count', 'gv_count',
       'injected_mii', 'fertilized_oocytes', 'fertilization_rate',
       'cleavage_d3', 'blastocyst_d5', 'good_embryos', 'class_a_rate',
       'fresh_et_stage', 'embryos_transferred', 'grading',
       'pregnancy_test_result', 'clinical_pregnancy', 'live_birth',
       'success_probability_score', 'response_type', 'risk_level',
       'recommended_protocol', 'suggested_waiting_period_days',
       'failure_reason', 'doctor_recommendation', 'female_id', 'male_id',
       'transfer_time_id', 'protocol_id', 'doctor_id', 'outcome_id',
       'embryo_id'],
      dtype='object')
      ca

In [7]:
import sqlite3
import pandas as pd

# Create a new DB
# NOTE(review): this writes ivf_patients_test.db, while RAW_DB in the ETL cell
# points at ivf_database_updated.db; the fixture also has no female_id/male_id
# columns, which check_required demands. Confirm how this file is meant to be
# fed to the ETL.
conn = sqlite3.connect(r"E:\work\DEPI\graduation promax\data\raw\ivf_patients_test.db")

# Full data rows in the same shape the ETL expects
data = [
    # Duplicated row (for testing)
    ["CASE_TEST_001", "2023-03-07", 32, 24.8, 1.2, 5.0, 10, 38, "Normal", 55.0, 20.0, 5.0, 4, 4, 2, 0.5, 1, 1.0, "D5", 2, "B", "Negative", 0, 0, 0.76, "Poor", "High", "Antagonist", 90, "Unknown", "Good response"],
    ["CASE_TEST_001", "2023-03-07", 32, 24.8, 1.2, 5.0, 10, 38, "Normal", 55.0, 20.0, 5.0, 4, 4, 2, 0.5, 1, 1.0, "D5", 2, "B", "Negative", 0, 0, 0.76, "Poor", "High", "Antagonist", 90, "Unknown", "Good response"],

    # New row
    ["CASE_TEST_002", "2022-11-15", 30, 22.5, 0.9, 4.3, 12, 41, "OAT", 40.0, 18.0, 6.0, 5, 5, 3, 0.6, 2, 0.8, "D3", 1, "C", "Positive", 1, 0, 0.88, "Normal", "Medium", "Mild", 45, "Implantation Failure", "Monitor closely"],

    # New row (about half of the values missing -> exercises null handling)
    ["CASE_TEST_003", None, 29, None, None, 3.1, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, 0.00, None, None, None, None, None, None]
]

# Column names, in the same order as the values in each data row
columns = [
    "case_id","et_date","female_age","female_bmi","amh_level","fsh_level","afc",
    "male_age","male_factor","semen_count_mill_per_ml","motility_percent","morphology_percent",
    "retrieved_oocytes","mii_count","num_embryos_generated","fertilization_rate",
    "good_embryos","class_a_rate","fresh_et_stage","embryos_transferred","grading",
    "pregnancy_test_result","clinical_pregnancy","live_birth","success_probability_score",
    "response_type","risk_level","recommended_protocol","suggested_waiting_period_days",
    "failure_reason","doctor_recommendation"
]

# if_exists="replace" rewrites the ivf_patients table on every run of this cell
df = pd.DataFrame(data, columns=columns)
df.to_sql("ivf_patients", conn, if_exists="replace", index=False)

conn.close()
print("✔ Test DB created successfully: ivf_patients_test.db")


✔ Test DB created successfully: ivf_patients_test.db


In [6]:
# Load the database and display full table
import sqlite3
import pandas as pd

db_path = r"E:\work\DEPI\graduation promax\data\raw\ivf_patients_test.db"

# Read the whole ivf_patients table into memory, then release the connection.
conn = sqlite3.connect(db_path)
df = pd.read_sql(sql="SELECT * FROM ivf_patients", con=conn)
conn.close()

# Last expression of the cell: rendered as the cell output.
df


Unnamed: 0,case_id,et_date,female_age,female_bmi,amh_level,fsh_level,afc,male_age,male_factor,semen_count_mill_per_ml,...,pregnancy_test_result,clinical_pregnancy,live_birth,success_probability_score,response_type,risk_level,recommended_protocol,suggested_waiting_period_days,failure_reason,doctor_recommendation
0,CASE_TEST_001,2023-03-07,32,24.8,1.2,5.0,10.0,38.0,Normal,55.0,...,Negative,0.0,0.0,0.76,Poor,High,Antagonist,90.0,Unknown,Good response
1,CASE_TEST_001,2023-03-07,32,24.8,1.2,5.0,10.0,38.0,Normal,55.0,...,Negative,0.0,0.0,0.76,Poor,High,Antagonist,90.0,Unknown,Good response
2,CASE_TEST_002,2022-11-15,30,22.5,0.9,4.3,12.0,41.0,OAT,40.0,...,Positive,1.0,0.0,0.88,Normal,Medium,Mild,45.0,Implantation Failure,Monitor closely
3,CASE_TEST_003,,29,,,3.1,,,,,...,,,,0.0,,,,,,


In [8]:
import sqlite3
import pandas as pd

# ================================
# 1) PATHS
# ================================
db_path = r"E:\work\DEPI\graduation promax\data\raw\ivf_database_updated.db"
excel_output = r"E:\work\DEPI\graduation promax\data\raw\raw_database_dump.xlsx"

# ================================
# 2) CONNECT TO DB
# ================================
conn = sqlite3.connect(db_path)

try:
    # ================================
    # 3) READ TABLE NAMES
    # ================================
    tables = pd.read_sql(
        "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'",
        conn
    )

    table_list = tables["name"].tolist()

    # ================================
    # 4) EXPORT TO EXCEL
    # ================================
    with pd.ExcelWriter(excel_output, engine="openpyxl") as writer:
        for tbl in table_list:
            # Double-quote the name so tables with unusual names can be read.
            quoted_tbl = '"' + tbl.replace('"', '""') + '"'
            df = pd.read_sql(f"SELECT * FROM {quoted_tbl}", conn)
            df.to_excel(writer, sheet_name=tbl[:31], index=False)  # Excel sheet name limit
finally:
    # ================================
    # 5) CLOSE CONNECTION
    # ================================
    # Runs even when a read or the Excel write fails.
    conn.close()

print("Done! Excel saved at:", excel_output)


Done! Excel saved at: E:\work\DEPI\graduation promax\data\raw\raw_database_dump.xlsx


In [None]:
import sqlite3
import pandas as pd

# ================================
# 1) PATHS
# ================================
db_path = r"E:\work\DEPI\graduation promax\data\warehouse_final\ivf_star_schema.db"
excel_output = r"E:\work\DEPI\graduation promax\data\warehouse_final\database_dump.xlsx"

# ================================
# 2) CONNECT TO DB
# ================================
conn = sqlite3.connect(db_path)

try:
    # ================================
    # 3) READ TABLE NAMES
    # ================================
    tables = pd.read_sql(
        "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'",
        conn
    )

    table_list = tables["name"].tolist()

    # ================================
    # 4) EXPORT TO EXCEL
    # ================================
    with pd.ExcelWriter(excel_output, engine="openpyxl") as writer:
        for tbl in table_list:
            # Double-quote the name so tables with unusual names can be read.
            quoted_tbl = '"' + tbl.replace('"', '""') + '"'
            df = pd.read_sql(f"SELECT * FROM {quoted_tbl}", conn)
            df.to_excel(writer, sheet_name=tbl[:31], index=False)  # Excel sheet name limit
finally:
    # ================================
    # 5) CLOSE CONNECTION
    # ================================
    # Runs even when a read or the Excel write fails.
    conn.close()

print("Done! Excel saved at:", excel_output)


In [5]:
# etl_sequential_ids.py
import os
import logging
import sqlite3
import uuid
from datetime import datetime
import pandas as pd
import numpy as np

# ----------------------------
# CONFIG
# ----------------------------
# Root of the project tree. Set the IVF_BASE_PATH environment variable to run
# on another machine; the default is the original location.
BASE_PATH = os.environ.get("IVF_BASE_PATH", r"E:\work\DEPI\graduation promax")

# os.path.join uses the platform separator, so on Windows these resolve to the
# same strings as before.
RAW_DB = os.path.join(BASE_PATH, "data", "raw", "ivf_database_updated.db")
STAR_DB = os.path.join(BASE_PATH, "data", "warehouse_final", "ivf_star_schema.db")
SCHEMA_SQL = os.path.join(BASE_PATH, "src", "ETL", "create_star_schema.sql")  # should contain full create statements
LOG_FILE = os.path.join(BASE_PATH, "src", "ETL", "logs", "etl_sequential_ids.txt")

os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True)
os.makedirs(os.path.dirname(STAR_DB), exist_ok=True)

# logging.basicConfig does nothing when the root logger already has handlers
# (for example after an earlier cell configured logging in this kernel), so
# drop them first; otherwise this cell's messages would keep going to the
# previously configured destination instead of LOG_FILE.
for _old_handler in list(logging.root.handlers):
    logging.root.removeHandler(_old_handler)
    _old_handler.close()

logging.basicConfig(
    filename=LOG_FILE,
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s"
)
logger = logging.getLogger("etl_sequential_ids")

# ----------------------------
# DIMENSION DEFINITIONS (natural keys + id col + prefix)
# ----------------------------
# nat: list of columns that identify a unique row for that dim
# Per-dimension settings, keyed by table name:
#   id_col - column that holds the generated id
#   nat    - columns that together identify one dimension row
#   prefix - text placed before the numeric part of a generated id
# dim_doctor is not listed: it is handled separately (autoincrement integer id,
# natural key [name, recommendation]).
DIM_DEFS = {
    "dim_female": dict(
        id_col="female_id",
        nat=["female_age", "female_bmi", "amh_level", "fsh_level", "afc"],
        prefix="f",
    ),
    "dim_male": dict(
        id_col="male_id",
        nat=["male_age", "male_factor", "semen_count_mill_per_ml", "motility_percent", "morphology_percent"],
        prefix="m",
    ),
    "dim_protocol": dict(
        id_col="protocol_id",
        nat=["protocol_type", "stimulation_days", "total_fsh_dose", "trigger_type", "recommended_protocol"],
        prefix="prot",
    ),
    "dim_outcome": dict(
        id_col="outcome_id",
        nat=["risk_level", "response_type", "suggested_waiting_period_days", "failure_reason"],
        prefix="out",
    ),
    "dim_embryo": dict(
        id_col="embryo_id",
        nat=["fresh_et_stage", "grading", "class_a_rate"],
        prefix="emb",
    ),
}

# ----------------------------
# HELPERS
# ----------------------------
def zero_pad(n, width=5):
    """Return ``n`` as text, left-filled with zeros to at least ``width`` characters."""
    digits = str(n)
    return digits.zfill(width)

def next_seq_id(conn, table, id_col, prefix):
    """Return the next sequential id of the form ``<prefix>_00001``.

    Existing values of ``table.id_col`` that start with exactly ``<prefix>_``
    are inspected; the part after the last underscore is read as a number and
    the result is one above the largest number found. Values whose suffix is
    not numeric are ignored. With no matching value the result is
    ``<prefix>_00001``.

    Args:
        conn: open sqlite3 connection.
        table: table to inspect (interpolated into the query; trusted input).
        id_col: id column name (interpolated into the query; trusted input).
        prefix: id prefix without the trailing underscore.

    Returns:
        str: the next id.
    """
    head = f"{prefix}_"
    cur = conn.cursor()
    try:
        # Compare the leading characters exactly. A LIKE pattern such as 'f_%'
        # treats '_' as a single-character wildcard and would also match ids
        # that belong to a different prefix (for example 'fx_00099').
        rows = cur.execute(
            f"SELECT {id_col} FROM {table} WHERE substr({id_col}, 1, ?) = ?",
            (len(head), head),
        ).fetchall()
    finally:
        cur.close()

    max_n = 0
    for (val,) in rows:
        try:
            # val looks like prefix_00012
            num = int(str(val).rsplit("_", 1)[-1])
        except ValueError:
            continue
        max_n = max(max_n, num)
    return f"{prefix}_{zero_pad(max_n + 1)}"

def build_nat_key_from_row(row, cols):
    """Join the values of ``cols`` taken from ``row`` into one ``|``-separated key.

    ``row`` only needs a ``get(name, default)`` method. A column that is absent
    or holds a missing value contributes an empty string; every other value is
    converted with ``str`` and stripped of surrounding whitespace.
    """
    def _as_text(column):
        value = row.get(column, "")
        return "" if pd.isna(value) else str(value).strip()

    return "|".join(map(_as_text, cols))

def ensure_table_exists(conn, table_sql=None):
    """Run ``table_sql`` as a script on ``conn`` and commit; do nothing when it is empty.

    Intended as a fallback for when the schema script has not been run
    (for example to make sure dim_doctor is present).
    """
    if not table_sql:
        return
    conn.executescript(table_sql)
    conn.commit()

# ----------------------------
# CORE: process a dimension using natural keys and sequential ids
# ----------------------------
def process_dimension(conn, df, dim_name, dim_def, refresh):
    """Assign sequential ids to one dimension and insert rows that are new.

    Rows are identified by their natural key (the ``nat`` columns joined by
    ``build_nat_key_from_row``). A key already stored in ``dim_name`` reuses
    its id; each unseen key receives the next ``<prefix>_NNNNN`` id and is
    inserted.

    Args:
        conn: open sqlite3 connection.
        df: source DataFrame. It is modified in place: absent natural-key
            columns are added as None and ``id_col`` is (re)written.
        dim_name: dimension table name.
        dim_def: dict with keys ``id_col``, ``nat`` and ``prefix``.
        refresh: accepted for signature compatibility; not used here.

    Returns:
        pandas.DataFrame: ``df`` with ``id_col`` set for every row.
    """
    id_col = dim_def["id_col"]
    nat_cols = dim_def["nat"]
    prefix = dim_def["prefix"]

    # ensure natural columns exist in incoming df (fill with empty)
    for c in nat_cols:
        if c not in df.columns:
            df[c] = None

    def _records(frame):
        # Column-wise conversion keeps each value's own type; row-wise access
        # on an all-numeric frame would turn integers into floats and change
        # the key text.
        return frame[nat_cols].to_dict("records")

    def _nat_keys(frame):
        return [build_nat_key_from_row(rec, nat_cols) for rec in _records(frame)]

    # unique incoming combinations (id_col is not needed to build them, so a
    # frame without that column is accepted)
    incoming = df[nat_cols].drop_duplicates().reset_index(drop=True)
    incoming_records = _records(incoming)
    incoming_keys = _nat_keys(incoming)

    # fetch existing rows
    try:
        existing = pd.read_sql(f"SELECT {id_col}, {', '.join(nat_cols)} FROM {dim_name}", conn)
        existing["nat_key"] = _nat_keys(existing)
    except Exception:
        # table may not exist yet (schema not created) -> create minimal table with id_col and nat cols
        cols_sql = ",\n".join([f"{c} TEXT" for c in nat_cols])
        create_sql = f"""
            CREATE TABLE IF NOT EXISTS {dim_name} (
                {id_col} TEXT PRIMARY KEY,
                {cols_sql}
            );
        """
        conn.execute(create_sql)
        conn.commit()
        existing = pd.DataFrame(columns=[id_col] + nat_cols + ["nat_key"])

    # NOTE(review): keys are compared as text, so a value read back from the
    # table with a different numeric type than in df (e.g. 10 vs 10.0) would
    # presumably count as a new key - verify against real data.
    nat_to_id = dict(zip(existing["nat_key"].astype(str), existing[id_col].astype(str)))

    # Unseen natural keys get consecutive ids. The table is consulted once for
    # the starting number and the counter advances locally: new rows are only
    # written after this loop, so asking the table again for every row would
    # hand the same id to all of them.
    to_insert = []
    next_num = None
    for rec, nk in zip(incoming_records, incoming_keys):
        if nk in nat_to_id:
            continue
        if next_num is None:
            first_id = next_seq_id(conn, dim_name, id_col, prefix)
            next_num = int(first_id.rsplit("_", 1)[-1])
        new_id = f"{prefix}_{zero_pad(next_num)}"
        next_num += 1
        # store the natural column values (missing -> NULL) together with the id
        to_insert.append([new_id] + [None if pd.isna(rec[c]) else rec[c] for c in nat_cols])
        nat_to_id[nk] = new_id

    # Bulk insert new rows (if any)
    if to_insert:
        placeholders = ",".join(["?"] * (1 + len(nat_cols)))
        cols = ", ".join([id_col] + nat_cols)
        sql = f"INSERT OR IGNORE INTO {dim_name} ({cols}) VALUES ({placeholders})"
        cur = conn.cursor()
        try:
            cur.executemany(sql, to_insert)
            conn.commit()
            logging.info(f"{dim_name}: inserted {len(to_insert)} new rows.")
        except Exception as e:
            logging.exception(f"{dim_name}: failed to insert new rows: {e}")
            conn.rollback()
            # try row-by-row so one bad row does not block the others
            for r in to_insert:
                try:
                    cur.execute(sql, r)
                except Exception:
                    logging.exception(f"{dim_name}: single row insert failed: {r}")
            conn.commit()
        finally:
            cur.close()
    else:
        logging.info(f"{dim_name}: no new rows to insert.")

    # Map every row of the full df (not only the unique subset) to its id
    df[id_col] = [nat_to_id.get(k) for k in _nat_keys(df)]

    return df

# ----------------------------
# Doctor special handling: natural key = name|recommendation, id = autoinc integer
# ----------------------------
def process_doctors(conn, df, refresh):
    """Map each (doctor_name, doctor_recommendation) pair to a ``doctor_id``.

    ``dim_doctor`` is created when absent. Pairs already stored reuse their id;
    unseen pairs are inserted and take the id SQLite assigns. A missing
    recommendation is stored as NULL and is matched as such.

    Args:
        conn: open sqlite3 connection.
        df: source DataFrame. It is modified in place: absent
            ``doctor_name`` / ``doctor_recommendation`` columns are added and
            ``doctor_id`` is written.
        refresh: accepted for signature compatibility; not used here.

    Returns:
        pandas.DataFrame: ``df`` with a ``doctor_id`` column.
    """
    if "doctor_name" not in df.columns:
        df["doctor_name"] = "Unknown"
    if "doctor_recommendation" not in df.columns:
        df["doctor_recommendation"] = None

    # ensure table exists with UNIQUE(name, recommendation)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS dim_doctor (
            doctor_id INTEGER PRIMARY KEY AUTOINCREMENT,
            doctor_name TEXT,
            doctor_recommendation TEXT,
            UNIQUE (doctor_name, doctor_recommendation)
        );
    """)
    conn.commit()

    def _nat_key(frame):
        # Blank out missing values BEFORE converting to text; converting first
        # turns them into the literal strings "None"/"nan".
        parts = []
        for col in ("doctor_name", "doctor_recommendation"):
            values = frame[col]
            parts.append(values.where(values.notna(), "").astype(str).str.strip())
        return parts[0] + "|" + parts[1]

    # load existing nat keys
    existing = pd.read_sql("SELECT doctor_id, doctor_name, doctor_recommendation FROM dim_doctor", conn)
    nat_to_id = {}
    if not existing.empty:
        nat_to_id = dict(zip(_nat_key(existing), existing["doctor_id"].astype(int)))

    df["nat_key_doctor"] = _nat_key(df)

    # identify new combinations
    new_keys = df.loc[
        ~df["nat_key_doctor"].isin(list(nat_to_id)),
        ["doctor_name", "doctor_recommendation", "nat_key_doctor"],
    ].drop_duplicates("nat_key_doctor")

    cur = conn.cursor()
    try:
        for rec in new_keys.to_dict("records"):
            name = None if pd.isna(rec["doctor_name"]) else rec["doctor_name"]
            recommendation = None if pd.isna(rec["doctor_recommendation"]) else rec["doctor_recommendation"]
            try:
                cur.execute(
                    "INSERT OR IGNORE INTO dim_doctor (doctor_name, doctor_recommendation) VALUES (?, ?)",
                    (name, recommendation),
                )
                # IS is SQLite's NULL-safe comparison: with "= ?" a NULL
                # recommendation never matches and the row would stay unmapped.
                found = cur.execute(
                    "SELECT doctor_id FROM dim_doctor "
                    "WHERE doctor_name IS ? AND doctor_recommendation IS ? "
                    "ORDER BY doctor_id LIMIT 1",
                    (name, recommendation),
                ).fetchone()
                if found is None:
                    raise LookupError("doctor row not found after insert")
                nat_to_id[rec["nat_key_doctor"]] = found[0]
            except Exception:
                logging.exception("process_doctors: insert failed for %s", rec)
        conn.commit()
    finally:
        cur.close()

    # assign ids to df
    df["doctor_id"] = df["nat_key_doctor"].map(nat_to_id)
    df.drop(columns=["nat_key_doctor"], inplace=True)
    return df

# ----------------------------
# dim_time builder (append-only)
# ----------------------------
def build_dim_time(df, conn):
    """Append one ``dim_time`` row per distinct, parseable ``et_date`` (append-only).

    Each row carries the ISO date string plus day, month, month name, quarter,
    year and ISO week number. Rows are written with ``INSERT OR IGNORE``; a row
    that fails to insert is logged and the remaining rows are still processed.

    Args:
        df: DataFrame that may contain an ``et_date`` column. Nothing happens
            when the column is absent or holds no parseable date.
        conn: open sqlite3 connection with an existing ``dim_time`` table.
    """
    if "et_date" not in df.columns:
        # ``df.get("et_date", None)`` used to pass None on to pd.to_datetime,
        # and the following ``.dropna()`` then failed.
        return

    parsed = pd.to_datetime(df["et_date"], errors="coerce").dropna()
    if parsed.empty:
        return

    # One entry per calendar day, first occurrence first.
    days = parsed.dt.normalize().drop_duplicates()

    cur = conn.cursor()
    try:
        for ts in days:
            full_date = ts.strftime("%Y-%m-%d")
            try:
                cur.execute("""INSERT OR IGNORE INTO dim_time
                               (full_date, day, month, month_name, quarter, year, week)
                               VALUES (?, ?, ?, ?, ?, ?, ?)""",
                            (full_date, int(ts.day), int(ts.month), ts.month_name(),
                             int(ts.quarter), int(ts.year), int(ts.isocalendar()[1])))
            except Exception:
                logging.exception("build_dim_time insert failed for %s", full_date)
        conn.commit()
    finally:
        cur.close()

# ----------------------------
# facts loader (insert or ignore; expects dims already assigned)
# ----------------------------
def load_fact_tables(df, conn):
    """Write fact_ivf_cycle, fact_transfer and fact_transfer_embryo from ``df``.

    ``df`` is modified in place: any missing measure column is added as NaN
    and a ``cycle_start_time_id`` column is added by looking up
    ``transfer_time_id`` in dim_time. All writes go through
    ``insert_or_ignore``.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame with ``case_id`` and ``transfer_time_id`` columns; the other
        listed columns are used only when present.
    conn : sqlite3.Connection
        Open connection to the star-schema database.
    """
    measure_cols = [
        "e2_on_trigger", "endometrium_thickness", "follicles_18mm",
        "retrieved_oocytes", "m2_count", "gv_count", "injected_m2", "fertilized_oocytes",
        "fertilization_rate", "cleavage_d3", "blastocyst_d5", "good_embryos",
    ]
    cycle_cols = [
        "case_id", "female_id", "male_id", "protocol_id", "doctor_id", "outcome_id",
        "cycle_start_time_id",
    ] + measure_cols
    transfer_required = ["case_id", "transfer_time_id", "doctor_id", "embryos_transferred"]
    transfer_cols = [
        "case_id", "transfer_time_fk", "doctor_id", "embryos_transferred", "pregnancy_test_result",
        "clinical_pregnancy", "live_birth", "outcome_id", "success_probability_score",
    ]

    # full_date -> time_id lookup; an unreadable dim_time yields an empty lookup.
    try:
        time_lookup = pd.read_sql("SELECT time_id, full_date FROM dim_time", conn)
    except Exception:
        time_lookup = pd.DataFrame(columns=["time_id", "full_date"])
    id_by_date = {day: tid for day, tid in zip(time_lookup["full_date"], time_lookup["time_id"])}

    # Guarantee every measure column exists so the cycle fact has a stable shape.
    absent_measures = [name for name in measure_cols if name not in df.columns]
    for name in absent_measures:
        df[name] = np.nan

    df["cycle_start_time_id"] = df["transfer_time_id"].map(id_by_date)

    # One row per case_id feeds both the cycle fact and the transfer fact.
    per_case = df.drop_duplicates(subset=["case_id"])

    # --- fact_ivf_cycle ---
    cycle_fact = per_case[[name for name in cycle_cols if name in df.columns]]
    insert_or_ignore("fact_ivf_cycle", cycle_fact, conn)

    # --- fact_transfer (only when its required columns are all present) ---
    if set(transfer_required).issubset(df.columns):
        transfer_src = per_case.copy()
        transfer_src["transfer_time_fk"] = transfer_src["transfer_time_id"].map(id_by_date)
        transfer_fact = transfer_src[[name for name in transfer_cols if name in transfer_src.columns]]
        insert_or_ignore("fact_transfer", transfer_fact, conn)

    # --- fact_transfer_embryo (failures are logged, not raised) ---
    try:
        transfers = pd.read_sql("SELECT transfer_sk, case_id FROM fact_transfer", conn)
        if "embryo_id" in df.columns and not transfers.empty:
            joined = df.merge(transfers, on="case_id", how="inner")
            bridge = joined[["transfer_sk", "embryo_id"]].drop_duplicates()
            insert_or_ignore("fact_transfer_embryo", bridge, conn)
    except Exception:
        logging.exception("load_fact_tables -> transfer_embryo failed")

# ----------------------------
# MAIN ETL
# ----------------------------
def run_full_etl(refresh=True):
    """Run the ETL end to end: schema, raw load, cleaning, dimensions, facts.

    Parameters
    ----------
    refresh : bool, default True
        When true, the SCHEMA_SQL script is executed against STAR_DB first
        (skipped with a warning if the file does not exist). The flag is also
        forwarded to ``process_doctors`` and ``process_dimension``.

    Returns
    -------
    bool
        True when every step finished; False when any step raised. The
        exception is logged and printed, never re-raised.
    """
    logging.info("===== ETL STARTED =====")
    try:
        if refresh:
            # run schema SQL if provided
            if os.path.exists(SCHEMA_SQL):
                with open(SCHEMA_SQL, "r", encoding="utf-8") as f:
                    sql_text = f.read()
                schema_conn = sqlite3.connect(STAR_DB)
                try:
                    schema_conn.executescript(sql_text)
                    schema_conn.commit()
                finally:
                    # Close even if the script fails, so the db file is not left locked.
                    schema_conn.close()
                logging.info("Schema created from SCHEMA_SQL.")
            else:
                logging.warning("SCHEMA_SQL not found; continuing without executing SQL file.")

        # load raw
        raw_conn = sqlite3.connect(RAW_DB)
        try:
            raw_df = pd.read_sql("SELECT * FROM ivf_patients", raw_conn)
        finally:
            raw_conn.close()
        logging.info("Loaded raw rows: %d", len(raw_df))

        # clean: normalise column names, drop exact duplicate rows
        raw_df.columns = raw_df.columns.str.strip().str.lower().str.replace(" ", "_")
        raw_df = raw_df.drop_duplicates().reset_index(drop=True)

        # required
        req = ["case_id", "female_id", "male_id"]
        missing = [c for c in req if c not in raw_df.columns]
        if missing:
            raise ValueError(f"Missing required columns: {missing}")

        # transfer_time_id holds the ET date as a YYYY-MM-DD string; it is the
        # key load_fact_tables looks up in dim_time.
        if "et_date" in raw_df.columns:
            raw_df["transfer_time_id"] = pd.to_datetime(raw_df["et_date"], errors="coerce").dt.strftime("%Y-%m-%d")
        else:
            raw_df["transfer_time_id"] = None

        # open star db connection; the finally below closes it on every path
        conn = sqlite3.connect(STAR_DB)
        try:
            # process doctor first (autoinc)
            raw_df = process_doctors(conn, raw_df, refresh)

            # process each dimension sequentially (female, male, protocol, outcome, embryo)
            for dim_name, dim_def in DIM_DEFS.items():
                id_col = dim_def["id_col"]
                if id_col not in raw_df.columns:
                    raw_df[id_col] = None
                raw_df = process_dimension(conn, raw_df, dim_name, dim_def, refresh)

            # time dim
            build_dim_time(raw_df, conn)

            # facts
            load_fact_tables(raw_df, conn)
        finally:
            conn.close()

        logging.info("ETL completed successfully.")
        print("ETL Done ✔")
        return True
    except Exception as e:
        logging.exception("run_full_etl FAILED")
        print(f"ETL FAILED: {e}")
        return False

# ----------------------------
# RUN
# ----------------------------
if __name__ == "__main__":
    # refresh=True makes run_full_etl execute the SCHEMA_SQL script first (when
    # that file exists); pass refresh=False to skip the schema step.
    run_full_etl(refresh=True)


ETL Done ✔


In [6]:
import sqlite3
import pandas as pd

RAW_DB = r"E:\work\DEPI\graduation promax\data\raw\ivf_database_updated.db"

# Sanity check: list the distinct non-null doctor_recommendation values
# present in the raw ivf_patients table.
query = """
SELECT DISTINCT
    doctor_recommendation
FROM ivf_patients
WHERE doctor_recommendation IS NOT NULL;
"""

conn = sqlite3.connect(RAW_DB)
try:
    df = pd.read_sql(query, conn)
finally:
    # Release the database handle even when the query raises.
    conn.close()

print(df)


                               doctor_recommendation
0  Increase gonadotropin dose in next cycle for b...
1  Proceed with embryo freezing for future transfer.
2  Monitor progesterone closely during luteal phase.
3  Consider switching to mild stimulation protoco...
4            Good prognosis, continue same protocol.
5  Optimize sperm selection for ICSI in next atte...
