In [39]:
import sqlite3
import csv
from pathlib import Path
import os
import time
DB_PATH = "patient.db"
FILES = {
        "patients": "PatientCorePopulatedTable.txt",
        "admissions": "AdmissionsCorePopulatedTable.txt",
        "diagnoses": "AdmissionsDiagnosesCorePopulatedTable.txt",
        "labs": "LabsCorePopulatedTable.txt",
}

if os.path.exists(DB_PATH) :
    os.remove (DB_PATH)
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.executescript(STAGING_CREATE_SQL)
conn.close()

In [40]:
STAGING_CREATE_SQL = """

PRAGMA foreign_keys = ON;

DROP TABLE IF EXISTS admission_lab_results;
DROP TABLE IF EXISTS admission_primary_diagnoses;
DROP TABLE IF EXISTS admissions;
DROP TABLE IF EXISTS patients;
DROP TABLE IF EXISTS lab_tests;
DROP TABLE IF EXISTS diagnosis_codes;
DROP TABLE IF EXISTS lab_units;
DROP TABLE IF EXISTS languages;
DROP TABLE IF EXISTS marital_statuses;
DROP TABLE IF EXISTS races;
DROP TABLE IF EXISTS genders;
DROP TABLE IF EXISTS stage_labs;
DROP TABLE IF EXISTS stage_diagnoses;
DROP TABLE IF EXISTS stage_admissions;
DROP TABLE IF EXISTS stage_patients;

-- Staging tables
CREATE TABLE stage_patients (
    PatientID TEXT, 
    PatientGender TEXT, 
    PatientDateOfBirth TEXT, 
    PatientRace TEXT, 
    PatientMaritalStatus TEXT, 
    PatientLanguage TEXT, 
    PatientPopulationPercentageBelowPoverty TEXT
);

CREATE TABLE stage_admissions (
    PatientID TEXT, 
    AdmissionID TEXT, 
    AdmissionStartDate TEXT, 
    AdmissionEndDate TEXT
);

CREATE TABLE stage_diagnoses (
    PatientID TEXT, 
    AdmissionID TEXT, 
    PrimaryDiagnosisCode TEXT, 
    PrimaryDiagnosisDescription TEXT
);

CREATE TABLE stage_labs (
    PatientID TEXT, 
    AdmissionID TEXT, 
    LabName TEXT, 
    LabValue TEXT,
    LabUnits TEXT,
    LabDateTime TEXT
);

-- Lookup tables
CREATE TABLE genders (
    gender_id INTEGER PRIMARY KEY AUTOINCREMENT,
    gender_desc TEXT NOT NULL UNIQUE
);

CREATE TABLE races (
    race_id INTEGER PRIMARY KEY AUTOINCREMENT,
    race_desc TEXT NOT NULL UNIQUE
);

CREATE TABLE marital_statuses (
    marital_status_id INTEGER PRIMARY KEY AUTOINCREMENT,
    marital_status_desc TEXT NOT NULL UNIQUE
);

CREATE TABLE languages (
    language_id INTEGER PRIMARY KEY AUTOINCREMENT,
    language_desc TEXT NOT NULL UNIQUE
);

CREATE TABLE lab_units (
    unit_id INTEGER PRIMARY KEY AUTOINCREMENT,
    unit_string TEXT NOT NULL UNIQUE
);

CREATE TABLE lab_tests (
    lab_test_id INTEGER PRIMARY KEY AUTOINCREMENT,
    lab_name TEXT NOT NULL UNIQUE,
    unit_id INTEGER NOT NULL,
    FOREIGN KEY (unit_id) REFERENCES lab_units(unit_id)
);

CREATE TABLE diagnosis_codes (
    diagnosis_code TEXT PRIMARY KEY,
    diagnosis_description TEXT NOT NULL
);

-- Core tables
CREATE TABLE patients (
    patient_id TEXT PRIMARY KEY,
    patient_gender INTEGER,
    patient_dob TEXT NOT NULL,
    patient_race INTEGER,
    patient_marital_status INTEGER,
    patient_language INTEGER,
    patient_population_pct_below_poverty REAL,
    FOREIGN KEY (patient_gender) REFERENCES genders(gender_id),
    FOREIGN KEY (patient_race) REFERENCES races(race_id),
    FOREIGN KEY (patient_marital_status) REFERENCES marital_statuses(marital_status_id),
    FOREIGN KEY (patient_language) REFERENCES languages(language_id)
);

CREATE TABLE admissions (
    patient_id TEXT NOT NULL,
    admission_id INTEGER NOT NULL,
    admission_start TEXT NOT NULL,
    admission_end TEXT,
    PRIMARY KEY (patient_id, admission_id),
    FOREIGN KEY (patient_id) REFERENCES patients(patient_id)
);

CREATE TABLE admission_primary_diagnoses (
    patient_id TEXT NOT NULL,
    admission_id INTEGER NOT NULL,
    diagnosis_code TEXT NOT NULL,
    PRIMARY KEY (patient_id, admission_id),
    FOREIGN KEY (patient_id, admission_id) REFERENCES admissions(patient_id, admission_id),
    FOREIGN KEY (diagnosis_code) REFERENCES diagnosis_codes(diagnosis_code)
);

CREATE TABLE admission_lab_results (
    patient_id TEXT NOT NULL,
    admission_id INTEGER NOT NULL,
    lab_test_id INTEGER NOT NULL,
    lab_value REAL,
    lab_datetime TEXT NOT NULL,
    FOREIGN KEY (patient_id, admission_id) REFERENCES admissions(patient_id, admission_id),
    FOREIGN KEY (lab_test_id) REFERENCES lab_tests(lab_test_id),
    UNIQUE (patient_id, admission_id, lab_test_id, lab_datetime)
);
"""


In [41]:
# Delete old DB if exists
if os.path.exists(DB_PATH):
    os.remove(DB_PATH)

conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.executescript(STAGING_CREATE_SQL)
conn.commit()
conn.close()




In [48]:
def load_tsv_to_stage(conn, filepath, stage_table, expected_columns, delimiter="\t", batch_size=5000):
    path = Path(filepath)
    if not path.exists():
        raise FileNotFoundError(f"Missing file: {filepath}")

    with path.open("r", encoding="utf-8-sig") as csvfile:
        reader = csv.DictReader(csvfile, delimiter=delimiter)
        missing = sorted(set(expected_columns) - set(reader.fieldnames))
        if missing:
            raise ValueError(f"{filepath} missing expected columns: {missing}")

        # Clear table first
        conn.execute(f"DELETE FROM {stage_table}")

        rows = []
        total = 0
        for row in reader:
            rows.append([row.get(c, None) for c in expected_columns])
            if len(rows) >= batch_size:
                conn.executemany(f"INSERT INTO {stage_table} ({','.join(expected_columns)}) VALUES ({','.join(['?']*len(expected_columns))})", rows)
                conn.commit()
                total += len(rows)
                print(f"Inserted {total} rows into {stage_table}")
                rows = []
        if rows:
            conn.executemany(f"INSERT INTO {stage_table} ({','.join(expected_columns)}) VALUES ({','.join(['?']*len(expected_columns))})", rows)
            conn.commit()
            total += len(rows)
            print(f"Inserted {total} rows into {stage_table}")


In [49]:
conn = sqlite3.connect(DB_PATH)

for table_name, file_path in FILES.items():
    load_tsv_to_stage(conn, file_path, f"stage_{table_name}", EXPECTED_COLUMNS[table_name])

conn.close()
print("All staging tables loaded!")


Inserted 100 rows into stage_patients
Inserted 372 rows into stage_admissions
Inserted 372 rows into stage_diagnoses
Inserted 5000 rows into stage_labs
Inserted 10000 rows into stage_labs
Inserted 15000 rows into stage_labs
Inserted 20000 rows into stage_labs
Inserted 25000 rows into stage_labs
Inserted 30000 rows into stage_labs
Inserted 35000 rows into stage_labs
Inserted 40000 rows into stage_labs
Inserted 45000 rows into stage_labs
Inserted 50000 rows into stage_labs
Inserted 55000 rows into stage_labs
Inserted 60000 rows into stage_labs
Inserted 65000 rows into stage_labs
Inserted 70000 rows into stage_labs
Inserted 75000 rows into stage_labs
Inserted 80000 rows into stage_labs
Inserted 85000 rows into stage_labs
Inserted 90000 rows into stage_labs
Inserted 95000 rows into stage_labs
Inserted 100000 rows into stage_labs
Inserted 105000 rows into stage_labs
Inserted 110000 rows into stage_labs
Inserted 111483 rows into stage_labs
All staging tables loaded!


In [44]:
def build_dimensions(conn):
    cur = conn.cursor()
    # Genders
    cur.execute("INSERT OR IGNORE INTO genders(gender_desc) SELECT DISTINCT PatientGender FROM stage_patients WHERE PatientGender IS NOT NULL AND PatientGender <> ''")
    # Races
    cur.execute("INSERT OR IGNORE INTO races(race_desc) SELECT DISTINCT PatientRace FROM stage_patients WHERE PatientRace IS NOT NULL AND PatientRace <> ''")
    # Marital Statuses
    cur.execute("INSERT OR IGNORE INTO marital_statuses(marital_status_desc) SELECT DISTINCT PatientMaritalStatus FROM stage_patients WHERE PatientMaritalStatus IS NOT NULL AND PatientMaritalStatus <> ''")
    # Languages
    cur.execute("INSERT OR IGNORE INTO languages(language_desc) SELECT DISTINCT PatientLanguage FROM stage_patients WHERE PatientLanguage IS NOT NULL AND PatientLanguage <> ''")
    # Lab Units
    cur.execute("INSERT OR IGNORE INTO lab_units(unit_string) SELECT DISTINCT LabUnits FROM stage_labs WHERE LabUnits IS NOT NULL AND LabUnits <> ''")
    # Lab Tests
    cur.execute("""
        INSERT OR IGNORE INTO lab_tests(lab_name, unit_id)
        SELECT DISTINCT s.LabName, u.unit_id
        FROM stage_labs s
        JOIN lab_units u ON u.unit_string = s.LabUnits
        WHERE s.LabName IS NOT NULL AND s.LabName <> ''
    """)
    # Diagnosis Codes
    cur.execute("""
        INSERT OR IGNORE INTO diagnosis_codes(diagnosis_code, diagnosis_description)
        SELECT DISTINCT PrimaryDiagnosisCode, PrimaryDiagnosisDescription
        FROM stage_diagnoses
        WHERE PrimaryDiagnosisCode IS NOT NULL AND PrimaryDiagnosisCode <> ''
    """)
    conn.commit()
    cur.close()
    print("Dimension tables populated!")

conn = sqlite3.connect(DB_PATH)
build_dimensions(conn)
conn.close()


Dimension tables populated!


In [45]:
def load_entities(conn):
    cur = conn.cursor()
    # Patients
    cur.execute("""
        INSERT OR IGNORE INTO patients(patient_id, patient_gender, patient_dob, patient_race, patient_marital_status, patient_language, patient_population_pct_below_poverty)
        SELECT
            s.PatientID,
            g.gender_id,
            s.PatientDateOfBirth,
            r.race_id,
            m.marital_status_id,
            l.language_id,
            CASE WHEN s.PatientPopulationPercentageBelowPoverty = '' THEN NULL ELSE s.PatientPopulationPercentageBelowPoverty END
        FROM stage_patients s
        LEFT JOIN genders g ON g.gender_desc = s.PatientGender
        LEFT JOIN races r ON r.race_desc = s.PatientRace
        LEFT JOIN marital_statuses m ON m.marital_status_desc = s.PatientMaritalStatus
        LEFT JOIN languages l ON l.language_desc = s.PatientLanguage
    """)
    # Admissions
    cur.execute("""
        INSERT OR IGNORE INTO admissions(patient_id, admission_id, admission_start, admission_end)
        SELECT PatientID, AdmissionID, AdmissionStartDate, AdmissionEndDate
        FROM stage_admissions
    """)
    conn.commit()
    cur.close()
    print("Entity tables populated!")

conn = sqlite3.connect(DB_PATH)
load_entities(conn)
conn.close()


Entity tables populated!


In [46]:
def build_facts(conn):
    cur = conn.cursor()
    # Primary diagnoses
    cur.execute("""
        INSERT OR IGNORE INTO admission_primary_diagnoses(patient_id, admission_id, diagnosis_code)
        SELECT s.PatientID, s.AdmissionID, s.PrimaryDiagnosisCode
        FROM stage_diagnoses s
        JOIN diagnosis_codes d ON d.diagnosis_code = s.PrimaryDiagnosisCode
    """)
    # Lab results
    cur.execute("""
        INSERT OR IGNORE INTO admission_lab_results(patient_id, admission_id, lab_test_id, lab_value, lab_datetime)
        SELECT s.PatientID, s.AdmissionID, lt.lab_test_id,
            CASE WHEN s.LabValue = '' THEN NULL ELSE s.LabValue END,
            s.LabDateTime
        FROM stage_labs s
        JOIN lab_tests lt ON lt.lab_name = s.LabName
    """)
    conn.commit()
    cur.close()
    print("Fact tables populated!")

conn = sqlite3.connect(DB_PATH)
build_facts(conn)
conn.close()


Fact tables populated!


In [37]:
import time
import sqlite3

# Start the execution
print("Creating tables...")
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.executescript(STAGING_CREATE_SQL)
conn.commit()
cursor.close()
conn.close()
print("Tables created successfully\n")

# Load staging data
print("Loading staging data...")
start_time = time.monotonic()
conn = sqlite3.connect(DB_PATH)
for name in FILES:
    load_tsv_to_stage(
        conn,
        FILES[name],
        f"stage_{name}",
        EXPECTED_COLUMNS[name]
    )
conn.close()
end_time = time.monotonic()
elapsed_time = end_time - start_time
print(f"\nStaging data loaded. Elapsed time: {elapsed_time:.2f} seconds\n")

# Build dimension tables
print("Building dimension tables...")
conn = sqlite3.connect(DB_PATH)
build_dimensions(conn)
conn.close()

# Load entity tables
print("Loading entity tables...")
conn = sqlite3.connect(DB_PATH)
load_entities(conn)
conn.close()

# Build fact tables
print("Building fact tables...")
conn = sqlite3.connect(DB_PATH)
build_facts(conn)
conn.close()

print("\n✅ Database migration complete!")


Creating tables...
Tables created successfully

Loading staging data...
Inserted 100 rows into stage_patients
Inserted 372 rows into stage_admissions
Inserted 372 rows into stage_diagnoses
Inserted 5000 rows into stage_labs
Inserted 10000 rows into stage_labs
Inserted 15000 rows into stage_labs
Inserted 20000 rows into stage_labs
Inserted 25000 rows into stage_labs
Inserted 30000 rows into stage_labs
Inserted 35000 rows into stage_labs
Inserted 40000 rows into stage_labs
Inserted 45000 rows into stage_labs
Inserted 50000 rows into stage_labs
Inserted 55000 rows into stage_labs
Inserted 60000 rows into stage_labs
Inserted 65000 rows into stage_labs
Inserted 70000 rows into stage_labs
Inserted 75000 rows into stage_labs
Inserted 80000 rows into stage_labs
Inserted 85000 rows into stage_labs
Inserted 90000 rows into stage_labs
Inserted 95000 rows into stage_labs
Inserted 100000 rows into stage_labs
Inserted 105000 rows into stage_labs
Inserted 110000 rows into stage_labs
Inserted 111483 r

In [47]:
import psycopg2

conn = psycopg2.connect(DATABASE_URL)
cur = conn.cursor()

# Correct SQL query
SQL_query = """
SELECT PatientID, COUNT(LabName) AS LabCount
FROM LabsCorePopulatedTable
GROUP BY PatientID
ORDER BY LabCount DESC
"""

cur.execute(SQL_query)
results = cur.fetchall()

for row in results:
    print(row)

cur.close()
conn.close()


NameError: name 'DATABASE_URL' is not defined

In [47]:
import psycopg2

conn = psycopg2.connect(DATABASE_URL)
cur = conn.cursor()

# Correct SQL query
SQL_query = """
SELECT PatientID, COUNT(LabName) AS LabCount
FROM LabsCorePopulatedTable
GROUP BY PatientID
ORDER BY LabCount DESC
"""

cur.execute(SQL_query)
results = cur.fetchall()

for row in results:
    print(row)

cur.close()
conn.close()


NameError: name 'DATABASE_URL' is not defined