In [1]:
import os

import pandas as pd
import psycopg2
from psycopg2.extras import execute_values

# PostgreSQL Database Connection
# Every setting defaults to the local development container but can be
# overridden through the standard libpq environment variables (PGDATABASE,
# PGUSER, PGPASSWORD, PGHOST, PGPORT), so real credentials never have to be
# written into the notebook.
DB_PARAMS = {
    "dbname": os.environ.get("PGDATABASE", "mydatabase"),
    "user": os.environ.get("PGUSER", "myuser"),
    "password": os.environ.get("PGPASSWORD", "mypassword"),
    "host": os.environ.get("PGHOST", "localhost"),
    "port": os.environ.get("PGPORT", "5433"),  # Ensure this matches your running PostgreSQL container
}

# Shared connection and cursor used by the table helpers defined below.
conn = psycopg2.connect(**DB_PARAMS)
cur = conn.cursor()

# Install the `vector` extension if it is not present yet, and persist that change.
cur.execute("CREATE EXTENSION IF NOT EXISTS vector;")
conn.commit()

# admissions

In [21]:
def create_admissions_table():
    """Create the `admissions` table when it does not exist yet.

    Executes the DDL on the shared cursor, commits it on the shared
    connection, and prints a confirmation line.
    """
    ddl = """
    CREATE TABLE IF NOT EXISTS admissions (
        subject_id INTEGER NOT NULL,
        hadm_id INTEGER NOT NULL PRIMARY KEY,
        admittime TIMESTAMP NOT NULL,
        dischtime TIMESTAMP,
        deathtime TIMESTAMP,
        admission_type VARCHAR(40) NOT NULL,
        admit_provider_id VARCHAR(10),
        admission_location VARCHAR(60),
        discharge_location VARCHAR(60),
        insurance VARCHAR(255),
        language VARCHAR(10),
        marital_status VARCHAR(30),
        race VARCHAR(80),
        edregtime TIMESTAMP,
        edouttime TIMESTAMP,
        hospital_expire_flag SMALLINT
    );
    """
    # IF NOT EXISTS makes this safe to re-run against an existing table.
    cur.execute(ddl)
    conn.commit()
    print("✅ Table 'admissions' is ready.")
def insert_admissions():
    """Bulk-load admissions.csv into the `admissions` table.

    Reads the CSV with every column as text, parses the five timestamp
    columns (values that cannot be parsed become NaT), replaces every missing
    value with None so it is stored as NULL, and sends all rows with
    `execute_values`. Rows whose `hadm_id` already exists are skipped by the
    ON CONFLICT clause.

    The insert is committed on success. On failure the transaction is rolled
    back before the exception is re-raised, so the shared connection stays
    usable for later cells.

    Note: the printed count is the number of rows sent to the database, not
    the number actually stored (conflicting rows are skipped silently).
    """
    csv_path = "./data/hosp/admissions.csv"  # Make sure this path is correct

    print("📥 Reading CSV file...")

    # Read the CSV; timestamp columns are converted below
    df = pd.read_csv(csv_path, dtype=str)

    # Timestamp columns that need conversion
    datetime_fields = ["admittime", "dischtime", "deathtime", "edregtime", "edouttime"]

    for field in datetime_fields:
        if field in df.columns:
            # Invalid timestamps become NaT instead of raising
            df[field] = pd.to_datetime(df[field], errors="coerce")

    # Replace NaT / NaN with None so PostgreSQL receives NULL
    df = df.astype(object).where(pd.notnull(df), None)

    # SQL batch insert
    insert_sql = """
    INSERT INTO admissions (
        subject_id, hadm_id, admittime, dischtime, deathtime,
        admission_type, admit_provider_id, admission_location, discharge_location,
        insurance, language, marital_status, race, edregtime, edouttime, hospital_expire_flag
    ) VALUES %s
    ON CONFLICT (hadm_id) DO NOTHING;
    """

    # Convert the DataFrame to a list of tuples (relies on the CSV column
    # order matching the column list in the INSERT statement)
    data_tuples = [tuple(x) for x in df.to_numpy()]

    # Batch insert; commit explicitly so the rows do not depend on a later
    # cell's commit, and roll back on error to clear the aborted transaction.
    print("📤 Inserting data into PostgreSQL...")
    try:
        execute_values(cur, insert_sql, data_tuples)
        conn.commit()
    except Exception:
        conn.rollback()
        raise

    print(f"✅ Inserted {len(data_tuples)} rows into 'admissions'.")

def check_data():
    """Print the row count of `admissions` followed by up to five sample rows."""
    cur.execute("SELECT COUNT(*) FROM admissions;")
    total_rows = cur.fetchone()[0]
    print(f"📊 Total rows in 'admissions': {total_rows}")

    # Pull a handful of rows as a quick visual sanity check.
    cur.execute("SELECT * FROM admissions LIMIT 5;")
    sample_rows = cur.fetchall()
    print("🧐 Sample data:")
    for sample in sample_rows:
        print(sample)

In [22]:
# Build the admissions table end to end: create it, load the CSV, then print a count and sample.
create_admissions_table()  # Create the table
insert_admissions()  # Insert the data
check_data()  # Check that the insert succeeded

✅ Table 'admissions' is ready.
📥 Reading CSV file...
📤 Inserting data into PostgreSQL...
✅ Inserted 431231 rows into 'admissions'.
📊 Total rows in 'admissions': 431231
🧐 Sample data:
(10000032, 22595853, datetime.datetime(2180, 5, 6, 22, 23), datetime.datetime(2180, 5, 7, 17, 15), None, 'URGENT', 'P874LG', 'TRANSFER FROM HOSPITAL', 'HOME', 'Other', 'ENGLISH', 'WIDOWED', 'WHITE', datetime.datetime(2180, 5, 6, 19, 17), datetime.datetime(2180, 5, 6, 23, 30), 0)
(10000032, 22841357, datetime.datetime(2180, 6, 26, 18, 27), datetime.datetime(2180, 6, 27, 18, 49), None, 'EW EMER.', 'P09Q6Y', 'EMERGENCY ROOM', 'HOME', 'Medicaid', 'ENGLISH', 'WIDOWED', 'WHITE', datetime.datetime(2180, 6, 26, 15, 54), datetime.datetime(2180, 6, 26, 21, 31), 0)
(10000032, 25742920, datetime.datetime(2180, 8, 5, 23, 44), datetime.datetime(2180, 8, 7, 17, 50), None, 'EW EMER.', 'P60CC5', 'EMERGENCY ROOM', 'HOSPICE', 'Medicaid', 'ENGLISH', 'WIDOWED', 'WHITE', datetime.datetime(2180, 8, 5, 20, 58), datetime.datetime(

# d_hcpcs

In [25]:
def create_d_hcpcs_table():
    """Create the `d_hcpcs` table when it does not exist yet.

    Executes the DDL on the shared cursor, commits it, and prints a
    confirmation line.
    """
    ddl = """
    CREATE TABLE IF NOT EXISTS d_hcpcs (
        code CHAR(5) NOT NULL PRIMARY KEY,
        category SMALLINT,
        long_description TEXT,
        short_description VARCHAR(180)
    );
    """
    # IF NOT EXISTS makes this safe to re-run against an existing table.
    cur.execute(ddl)
    conn.commit()
    print("✅ Table 'd_hcpcs' is ready.")

def insert_d_hcpcs():
    """Bulk-load d_hcpcs.csv into the `d_hcpcs` table.

    Reads the CSV as text, converts `category` to a nullable integer, maps
    every missing value to None (NULL), and sends the rows with
    `execute_values`. Rows whose `code` already exists are skipped by the
    ON CONFLICT clause.

    The insert is committed on success. On failure the transaction is rolled
    back before the exception is re-raised, so the shared connection stays
    usable for later cells.

    Note: the printed count is the number of rows sent to the database, not
    the number actually stored.
    """
    csv_path = "./data/hosp/d_hcpcs.csv"  # Ensure the correct path

    print("📥 Reading CSV file...")
    df = pd.read_csv(csv_path, dtype=str)  # Read everything as string

    # Convert NaN to None for PostgreSQL
    df = df.where(pd.notnull(df), None)

    # Convert category column to integer (SMALLINT in PostgreSQL).
    # Missing or unparseable values end up as pd.NA in the nullable Int64
    # column and are mapped to None when the tuples are built below.
    if "category" in df.columns:
        df["category"] = pd.to_numeric(df["category"], errors="coerce").astype("Int64")

    # SQL Batch Insert Query
    insert_sql = """
    INSERT INTO d_hcpcs (code, category, long_description, short_description)
    VALUES %s
    ON CONFLICT (code) DO NOTHING;
    """

    # Convert DataFrame to list of tuples, turning pd.NA into None on the way.
    # pd.NA is a singleton, so an identity check is enough.
    data_tuples = [
        tuple(None if x is pd.NA else x for x in row)
        for row in df.to_numpy()
    ]

    # Batch insert; commit explicitly and roll back on error so a failure
    # does not leave the shared connection in an aborted transaction.
    print("📤 Inserting data into PostgreSQL...")
    try:
        execute_values(cur, insert_sql, data_tuples)
        conn.commit()
    except Exception:
        conn.rollback()
        raise

    print(f"✅ Inserted {len(data_tuples)} rows into 'd_hcpcs'.")

def check_d_hcpcs():
    """Print the row count of `d_hcpcs` followed by up to five sample rows."""
    cur.execute("SELECT COUNT(*) FROM d_hcpcs;")
    total_rows = cur.fetchone()[0]
    print(f"📊 Total rows in 'd_hcpcs': {total_rows}")

    # Pull a handful of rows as a quick visual sanity check.
    cur.execute("SELECT * FROM d_hcpcs LIMIT 5;")
    sample_rows = cur.fetchall()
    print("🧐 Sample data:")
    for sample in sample_rows:
        print(sample)

In [26]:
# Build the d_hcpcs table end to end: create it, load the CSV, then print a count and sample.
create_d_hcpcs_table()  # Create table
insert_d_hcpcs()  # Insert data
check_d_hcpcs()  # Check if data is inserted

✅ Table 'd_hcpcs' is ready.
📥 Reading CSV file...
📤 Inserting data into PostgreSQL...
✅ Inserted 89200 rows into 'd_hcpcs'.
📊 Total rows in 'd_hcpcs': 89200
🧐 Sample data:
('00000', None, None, 'Invalid Code')
('0001F', 2, None, 'Composite measures')
('0002F', 2, None, 'Composite measures')
('0003F', 2, None, 'Composite measures')
('0004F', 2, None, 'Composite measures')


# d_icd_diagnoses

In [28]:
def create_d_icd_diagnoses_table():
    """Create the `d_icd_diagnoses` table when it does not exist yet.

    Executes the DDL on the shared cursor, commits it, and prints a
    confirmation line.
    """
    ddl = """
    CREATE TABLE IF NOT EXISTS d_icd_diagnoses (
        icd_code CHAR(7) NOT NULL,
        icd_version INTEGER NOT NULL,
        long_title VARCHAR(255),
        PRIMARY KEY (icd_code, icd_version)
    );
    """
    # IF NOT EXISTS makes this safe to re-run against an existing table.
    cur.execute(ddl)
    conn.commit()
    print("✅ Table 'd_icd_diagnoses' is ready.")

def insert_d_icd_diagnoses():
    """Bulk-load d_icd_diagnoses.csv into the `d_icd_diagnoses` table.

    Reads the CSV as text, converts `icd_version` to a nullable integer, maps
    every missing value to None (NULL), and sends the rows with
    `execute_values`. Rows whose (icd_code, icd_version) already exists are
    skipped by the ON CONFLICT clause.

    The insert is committed on success. On failure the transaction is rolled
    back before the exception is re-raised, so the shared connection stays
    usable for later cells.

    Note: the printed count is the number of rows sent to the database, not
    the number actually stored.
    """
    csv_path = "./data/hosp/d_icd_diagnoses.csv"  # Ensure the correct path

    print("📥 Reading CSV file...")
    df = pd.read_csv(csv_path, dtype=str)  # Read all data as string to avoid type mismatches

    # Convert NaN to None for PostgreSQL
    df = df.where(pd.notnull(df), None)

    # Convert icd_version to a nullable integer (unparseable values become pd.NA)
    if "icd_version" in df.columns:
        df["icd_version"] = pd.to_numeric(df["icd_version"], errors="coerce").astype(pd.Int64Dtype())

    # SQL Batch Insert Query
    insert_sql = """
    INSERT INTO d_icd_diagnoses (icd_code, icd_version, long_title)
    VALUES %s
    ON CONFLICT (icd_code, icd_version) DO NOTHING;
    """

    # Convert DataFrame to list of tuples, turning pd.NA into None on the way.
    # pd.NA is a singleton, so an identity check is enough.
    data_tuples = [
        tuple(None if x is pd.NA else x for x in row)
        for row in df.to_numpy()
    ]

    # Batch insert; commit explicitly and roll back on error so a failure
    # does not leave the shared connection in an aborted transaction.
    print("📤 Inserting data into PostgreSQL...")
    try:
        execute_values(cur, insert_sql, data_tuples)
        conn.commit()
    except Exception:
        conn.rollback()
        raise

    print(f"✅ Inserted {len(data_tuples)} rows into 'd_icd_diagnoses'.")

def check_d_icd_diagnoses():
    """Print the row count of `d_icd_diagnoses` followed by up to five sample rows."""
    cur.execute("SELECT COUNT(*) FROM d_icd_diagnoses;")
    total_rows = cur.fetchone()[0]
    print(f"📊 Total rows in 'd_icd_diagnoses': {total_rows}")

    # Pull a handful of rows as a quick visual sanity check.
    cur.execute("SELECT * FROM d_icd_diagnoses LIMIT 5;")
    sample_rows = cur.fetchall()
    print("🧐 Sample data:")
    for sample in sample_rows:
        print(sample)

In [29]:
# Build the d_icd_diagnoses table end to end: create it, load the CSV, then print a count and sample.
create_d_icd_diagnoses_table()  # Create table
insert_d_icd_diagnoses()  # Insert data
check_d_icd_diagnoses()  # Check if data is inserted

✅ Table 'd_icd_diagnoses' is ready.
📥 Reading CSV file...
📤 Inserting data into PostgreSQL...
✅ Inserted 109775 rows into 'd_icd_diagnoses'.
📊 Total rows in 'd_icd_diagnoses': 109775
🧐 Sample data:
('0010   ', 9, 'Cholera due to vibrio cholerae')
('0011   ', 9, 'Cholera due to vibrio cholerae el tor')
('0019   ', 9, 'Cholera, unspecified')
('0020   ', 9, 'Typhoid fever')
('0021   ', 9, 'Paratyphoid fever A')


# d_icd_procedures

In [31]:
def create_d_icd_procedures_table():
    """Create the `d_icd_procedures` table when it does not exist yet.

    Executes the DDL on the shared cursor, commits it, and prints a
    confirmation line.
    """
    ddl = """
    CREATE TABLE IF NOT EXISTS d_icd_procedures (
        icd_code CHAR(7) NOT NULL,
        icd_version INTEGER NOT NULL,
        long_title VARCHAR(255),
        PRIMARY KEY (icd_code, icd_version)
    );
    """
    # IF NOT EXISTS makes this safe to re-run against an existing table.
    cur.execute(ddl)
    conn.commit()
    print("✅ Table 'd_icd_procedures' is ready.")

def insert_d_icd_procedures():
    """Bulk-load d_icd_procedures.csv into the `d_icd_procedures` table.

    Reads the CSV as text, converts `icd_version` to a nullable integer, maps
    every missing value to None (NULL), and sends the rows with
    `execute_values`. Rows whose (icd_code, icd_version) already exists are
    skipped by the ON CONFLICT clause.

    The insert is committed on success. On failure the transaction is rolled
    back before the exception is re-raised, so the shared connection stays
    usable for later cells.

    Note: the printed count is the number of rows sent to the database, not
    the number actually stored.
    """
    csv_path = "./data/hosp/d_icd_procedures.csv"  # Ensure the correct path

    print("📥 Reading CSV file...")
    df = pd.read_csv(csv_path, dtype=str)  # Read all data as string to avoid type mismatches

    # Convert NaN to None for PostgreSQL
    df = df.where(pd.notnull(df), None)

    # Convert icd_version to a nullable integer (unparseable values become pd.NA)
    if "icd_version" in df.columns:
        df["icd_version"] = pd.to_numeric(df["icd_version"], errors="coerce").astype(pd.Int64Dtype())

    # SQL Batch Insert Query
    insert_sql = """
    INSERT INTO d_icd_procedures (icd_code, icd_version, long_title)
    VALUES %s
    ON CONFLICT (icd_code, icd_version) DO NOTHING;
    """

    # Convert DataFrame to list of tuples, turning pd.NA into None on the way.
    # pd.NA is a singleton, so an identity check is enough.
    data_tuples = [
        tuple(None if x is pd.NA else x for x in row)
        for row in df.to_numpy()
    ]

    # Batch insert; commit explicitly and roll back on error so a failure
    # does not leave the shared connection in an aborted transaction.
    print("📤 Inserting data into PostgreSQL...")
    try:
        execute_values(cur, insert_sql, data_tuples)
        conn.commit()
    except Exception:
        conn.rollback()
        raise

    print(f"✅ Inserted {len(data_tuples)} rows into 'd_icd_procedures'.")

def check_d_icd_procedures():
    """Print the row count of `d_icd_procedures` followed by up to five sample rows."""
    cur.execute("SELECT COUNT(*) FROM d_icd_procedures;")
    total_rows = cur.fetchone()[0]
    print(f"📊 Total rows in 'd_icd_procedures': {total_rows}")

    # Pull a handful of rows as a quick visual sanity check.
    cur.execute("SELECT * FROM d_icd_procedures LIMIT 5;")
    sample_rows = cur.fetchall()
    print("🧐 Sample data:")
    for sample in sample_rows:
        print(sample)

In [32]:
# Build the d_icd_procedures table end to end: create it, load the CSV, then print a count and sample.
create_d_icd_procedures_table()  # Create table
insert_d_icd_procedures()  # Insert data
check_d_icd_procedures()  # Check if data is inserted

✅ Table 'd_icd_procedures' is ready.
📥 Reading CSV file...
📤 Inserting data into PostgreSQL...
✅ Inserted 85257 rows into 'd_icd_procedures'.
📊 Total rows in 'd_icd_procedures': 85257
🧐 Sample data:
('0001   ', 9, 'Therapeutic ultrasound of vessels of head and neck')
('0002   ', 9, 'Therapeutic ultrasound of heart')
('0003   ', 9, 'Therapeutic ultrasound of peripheral vascular vessels')
('0009   ', 9, 'Other therapeutic ultrasound')
('001    ', 10, 'Central Nervous System and Cranial Nerves, Bypass')


# d_labitems

In [37]:
def create_d_labitems_table():
    """Create the `d_labitems` table when it does not exist yet.

    Executes the DDL on the shared cursor, commits it, and prints a
    confirmation line.
    """
    ddl = """
    CREATE TABLE IF NOT EXISTS d_labitems (
        itemid INTEGER PRIMARY KEY,
        label VARCHAR(50),
        fluid VARCHAR(50),
        category VARCHAR(50)
    );
    """
    # IF NOT EXISTS makes this safe to re-run against an existing table.
    cur.execute(ddl)
    conn.commit()
    print("✅ Table 'd_labitems' is ready.")

def insert_d_labitems():
    """Bulk-load d_labitems.csv into the `d_labitems` table.

    Reads the CSV as text, converts `itemid` to a nullable integer, maps
    every missing value to None (NULL), and sends the rows with
    `execute_values`. Rows whose `itemid` already exists are skipped by the
    ON CONFLICT clause.

    The insert is committed on success. On failure the transaction is rolled
    back before the exception is re-raised, so the shared connection stays
    usable for later cells.

    Note: the printed count is the number of rows sent to the database, not
    the number actually stored.
    """
    csv_path = "./data/hosp/d_labitems.csv"  # Ensure the correct path

    print("📥 Reading CSV file...")
    df = pd.read_csv(csv_path, dtype=str)  # Read all data as string to avoid type mismatches

    # Convert NaN to None for PostgreSQL
    df = df.where(pd.notnull(df), None)

    # Convert itemid to a nullable integer (unparseable values become pd.NA)
    if "itemid" in df.columns:
        df["itemid"] = pd.to_numeric(df["itemid"], errors="coerce").astype(pd.Int64Dtype())

    # SQL Batch Insert Query
    insert_sql = """
    INSERT INTO d_labitems (itemid, label, fluid, category)
    VALUES %s
    ON CONFLICT (itemid) DO NOTHING;
    """

    # Convert DataFrame to list of tuples, turning pd.NA into None on the way.
    # pd.NA is a singleton, so an identity check is enough.
    data_tuples = [
        tuple(None if x is pd.NA else x for x in row)
        for row in df.to_numpy()
    ]

    # Batch insert; commit explicitly and roll back on error so a failure
    # does not leave the shared connection in an aborted transaction.
    print("📤 Inserting data into PostgreSQL...")
    try:
        execute_values(cur, insert_sql, data_tuples)
        conn.commit()
    except Exception:
        conn.rollback()
        raise

    print(f"✅ Inserted {len(data_tuples)} rows into 'd_labitems'.")

def check_d_labitems():
    """Print the row count of `d_labitems` followed by up to five sample rows."""
    cur.execute("SELECT COUNT(*) FROM d_labitems;")
    total_rows = cur.fetchone()[0]
    print(f"📊 Total rows in 'd_labitems': {total_rows}")

    # Pull a handful of rows as a quick visual sanity check.
    cur.execute("SELECT * FROM d_labitems LIMIT 5;")
    sample_rows = cur.fetchall()
    print("🧐 Sample data:")
    for sample in sample_rows:
        print(sample)

In [38]:
# Build the d_labitems table end to end: create it, load the CSV, then print a count and sample.
create_d_labitems_table()  # Create table
insert_d_labitems()  # Insert data
check_d_labitems()  # Check if data is inserted

✅ Table 'd_labitems' is ready.
📥 Reading CSV file...
📤 Inserting data into PostgreSQL...
✅ Inserted 1622 rows into 'd_labitems'.
📊 Total rows in 'd_labitems': 1622
🧐 Sample data:
(50801, 'Alveolar-arterial Gradient', 'Blood', 'Blood Gas')
(50802, 'Base Excess', 'Blood', 'Blood Gas')
(50803, 'Calculated Bicarbonate, Whole Blood', 'Blood', 'Blood Gas')
(50804, 'Calculated Total CO2', 'Blood', 'Blood Gas')
(50805, 'Carboxyhemoglobin', 'Blood', 'Blood Gas')


# diagnoses_icd

In [41]:
def create_diagnoses_icd_table():
    """Create the `diagnoses_icd` table when it does not exist yet.

    Executes the DDL on the shared cursor, commits it, and prints a
    confirmation line.
    """
    ddl = """
    CREATE TABLE IF NOT EXISTS diagnoses_icd (
        subject_id INTEGER NOT NULL,
        hadm_id INTEGER NOT NULL,
        seq_num INTEGER NOT NULL,
        icd_code VARCHAR(7),
        icd_version INTEGER,
        PRIMARY KEY (subject_id, hadm_id, seq_num)
    );
    """
    # IF NOT EXISTS makes this safe to re-run against an existing table.
    cur.execute(ddl)
    conn.commit()
    print("✅ Table 'diagnoses_icd' is ready.")

def insert_diagnoses_icd():
    """Bulk-load diagnoses_icd.csv into the `diagnoses_icd` table.

    Reads the CSV as text, converts the integer columns to nullable integers,
    maps every missing value to None (NULL), and sends the rows with
    `execute_values`. Rows whose (subject_id, hadm_id, seq_num) already
    exists are skipped by the ON CONFLICT clause.

    The insert is committed on success. On failure the transaction is rolled
    back before the exception is re-raised, so the shared connection stays
    usable for later cells.

    Note: the printed count is the number of rows sent to the database, not
    the number actually stored; rows that hit the conflict clause are
    skipped silently, so the table count can be lower.
    """
    csv_path = "./data/hosp/diagnoses_icd.csv"  # Ensure the correct path

    print("📥 Reading CSV file...")
    df = pd.read_csv(csv_path, dtype=str)  # Read all data as string to avoid type mismatches

    # Convert NaN to None for PostgreSQL
    df = df.where(pd.notnull(df), None)

    # Convert numeric columns to nullable integers (unparseable values become pd.NA)
    for col in ["subject_id", "hadm_id", "seq_num", "icd_version"]:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors="coerce").astype(pd.Int64Dtype())

    # SQL Batch Insert Query
    insert_sql = """
    INSERT INTO diagnoses_icd (subject_id, hadm_id, seq_num, icd_code, icd_version)
    VALUES %s
    ON CONFLICT (subject_id, hadm_id, seq_num) DO NOTHING;
    """

    # Convert DataFrame to list of tuples, turning pd.NA into None on the way.
    # pd.NA is a singleton, so an identity check is enough.
    data_tuples = [
        tuple(None if x is pd.NA else x for x in row)
        for row in df.to_numpy()
    ]

    # Batch insert; commit explicitly and roll back on error so a failure
    # does not leave the shared connection in an aborted transaction.
    print("📤 Inserting data into PostgreSQL...")
    try:
        execute_values(cur, insert_sql, data_tuples)
        conn.commit()
    except Exception:
        conn.rollback()
        raise

    print(f"✅ Inserted {len(data_tuples)} rows into 'diagnoses_icd'.")

def check_diagnoses_icd():
    """Print the row count of `diagnoses_icd` followed by up to five sample rows."""
    cur.execute("SELECT COUNT(*) FROM diagnoses_icd;")
    total_rows = cur.fetchone()[0]
    print(f"📊 Total rows in 'diagnoses_icd': {total_rows}")

    # Pull a handful of rows as a quick visual sanity check.
    cur.execute("SELECT * FROM diagnoses_icd LIMIT 5;")
    sample_rows = cur.fetchall()
    print("🧐 Sample data:")
    for sample in sample_rows:
        print(sample)

In [42]:
# Build the diagnoses_icd table end to end: create it, load the CSV, then print a count and sample.
create_diagnoses_icd_table()  # Create table
insert_diagnoses_icd()  # Insert data
check_diagnoses_icd()  # Check if data is inserted

✅ Table 'diagnoses_icd' is ready.
📥 Reading CSV file...
📤 Inserting data into PostgreSQL...
✅ Inserted 4756326 rows into 'diagnoses_icd'.
📊 Total rows in 'diagnoses_icd': 4756210
🧐 Sample data:
(10000032, 22595853, 1, '5723', 9)
(10000032, 22595853, 2, '78959', 9)
(10000032, 22595853, 3, '5715', 9)
(10000032, 22595853, 4, '07070', 9)
(10000032, 22595853, 5, '496', 9)


# drgcodes

In [43]:
def create_drgcodes_table():
    """Create the `drgcodes` table when it does not exist yet.

    Executes the DDL on the shared cursor, commits it, and prints a
    confirmation line.
    """
    ddl = """
    CREATE TABLE IF NOT EXISTS drgcodes (
        subject_id INTEGER NOT NULL,
        hadm_id INTEGER NOT NULL,
        drg_type VARCHAR(4),
        drg_code VARCHAR(10),
        description VARCHAR(195),
        drg_severity SMALLINT,
        drg_mortality SMALLINT,
        PRIMARY KEY (subject_id, hadm_id, drg_code)
    );
    """
    # IF NOT EXISTS makes this safe to re-run against an existing table.
    cur.execute(ddl)
    conn.commit()
    print("✅ Table 'drgcodes' is ready.")

def insert_drgcodes():
    """Bulk-load drgcodes.csv into the `drgcodes` table.

    Reads the CSV as text, converts the integer columns to nullable integers,
    maps every missing value to None (NULL), and sends the rows with
    `execute_values`. Rows whose (subject_id, hadm_id, drg_code) already
    exists are skipped by the ON CONFLICT clause.

    The insert is committed on success. On failure the transaction is rolled
    back before the exception is re-raised, so the shared connection stays
    usable for later cells.

    Note: the printed count is the number of rows sent to the database, not
    the number actually stored; rows that hit the conflict clause are
    skipped silently, so the table count can be lower.
    """
    csv_path = "./data/hosp/drgcodes.csv"  # Ensure the correct path

    print("📥 Reading CSV file...")
    df = pd.read_csv(csv_path, dtype=str)  # Read all data as string to avoid type mismatches

    # Convert NaN to None for PostgreSQL
    df = df.where(pd.notnull(df), None)

    # Convert numeric columns to nullable integers (unparseable values become pd.NA)
    for col in ["subject_id", "hadm_id", "drg_severity", "drg_mortality"]:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors="coerce").astype(pd.Int64Dtype())

    # SQL Batch Insert Query
    insert_sql = """
    INSERT INTO drgcodes (subject_id, hadm_id, drg_type, drg_code, description, drg_severity, drg_mortality)
    VALUES %s
    ON CONFLICT (subject_id, hadm_id, drg_code) DO NOTHING;
    """

    # Convert DataFrame to list of tuples, turning pd.NA into None on the way.
    # pd.NA is a singleton, so an identity check is enough.
    data_tuples = [
        tuple(None if x is pd.NA else x for x in row)
        for row in df.to_numpy()
    ]

    # Batch insert; commit explicitly and roll back on error so a failure
    # does not leave the shared connection in an aborted transaction.
    print("📤 Inserting data into PostgreSQL...")
    try:
        execute_values(cur, insert_sql, data_tuples)
        conn.commit()
    except Exception:
        conn.rollback()
        raise

    print(f"✅ Inserted {len(data_tuples)} rows into 'drgcodes'.")

def check_drgcodes():
    """Print the row count of `drgcodes` followed by up to five sample rows."""
    cur.execute("SELECT COUNT(*) FROM drgcodes;")
    total_rows = cur.fetchone()[0]
    print(f"📊 Total rows in 'drgcodes': {total_rows}")

    # Pull a handful of rows as a quick visual sanity check.
    cur.execute("SELECT * FROM drgcodes LIMIT 5;")
    sample_rows = cur.fetchall()
    print("🧐 Sample data:")
    for sample in sample_rows:
        print(sample)

In [44]:
# Build the drgcodes table end to end: create it, load the CSV, then print a count and sample.
create_drgcodes_table()  # Create table
insert_drgcodes()  # Insert data
check_drgcodes()  # Check if data is inserted

✅ Table 'drgcodes' is ready.
📥 Reading CSV file...
📤 Inserting data into PostgreSQL...
✅ Inserted 604377 rows into 'drgcodes'.
📊 Total rows in 'drgcodes': 604227
🧐 Sample data:
(10000032, 22595853, 'APR', '283', 'OTHER DISORDERS OF THE LIVER', 2, 2)
(10000032, 22595853, 'HCFA', '442', 'DISORDERS OF LIVER EXCEPT MALIG,CIRR,ALC HEPA W CC', None, None)
(10000032, 22841357, 'APR', '279', 'HEPATIC COMA & OTHER MAJOR ACUTE LIVER DISORDERS', 3, 2)
(10000032, 22841357, 'HCFA', '442', 'DISORDERS OF LIVER EXCEPT MALIG,CIRR,ALC HEPA W CC', None, None)
(10000032, 25742920, 'APR', '283', 'OTHER DISORDERS OF THE LIVER', 3, 2)


# emar

In [2]:
def create_emar_table():
    """Create the `emar` table when it does not exist yet.

    Executes the DDL on the shared cursor, commits it, and prints a
    confirmation line.
    """
    ddl = """
    CREATE TABLE IF NOT EXISTS emar (
        subject_id INTEGER NOT NULL,
        hadm_id INTEGER,
        emar_id VARCHAR(25) NOT NULL,
        emar_seq INTEGER NOT NULL,
        poe_id VARCHAR(25) NOT NULL,
        pharmacy_id INTEGER,
        enter_provider_id VARCHAR(10),
        charttime TIMESTAMP NOT NULL,
        medication TEXT,
        event_txt VARCHAR(100),
        scheduletime TIMESTAMP,
        storetime TIMESTAMP NOT NULL,
        PRIMARY KEY (emar_id, emar_seq)
    );
    """
    # IF NOT EXISTS makes this safe to re-run against an existing table.
    cur.execute(ddl)
    conn.commit()
    print("✅ Table 'emar' is ready.")

def insert_emar():
    """Bulk-load emar.csv into the `emar` table.

    Reads the CSV as text, parses the three timestamp columns (values that
    cannot be parsed become NaT), maps missing values to None (NULL),
    converts the integer columns to nullable integers, and sends the rows
    with `execute_values`. Rows whose (emar_id, emar_seq) already exists are
    skipped by the ON CONFLICT clause.

    The insert is committed on success. On failure the transaction is rolled
    back before the exception is re-raised, so the shared connection stays
    usable for later cells.

    Note: the whole CSV is held in memory at once, and the printed count is
    the number of rows sent to the database, not the number actually stored.
    """
    csv_path = "./data/hosp/emar.csv"  # Ensure the correct path

    print("📥 Reading CSV file...")
    df = pd.read_csv(csv_path, dtype=str)  # Read as string to prevent errors

    # Convert timestamps, replacing invalid values
    datetime_columns = ["charttime", "scheduletime", "storetime"]

    for col in datetime_columns:
        if col in df.columns:
            df[col] = pd.to_datetime(df[col], errors="coerce")  # Convert invalid values to NaT

    # Replace NaT / NaN with None so PostgreSQL receives NULL
    df = df.astype(object).where(pd.notnull(df), None)

    # Convert numeric columns to nullable integers (unparseable values become pd.NA)
    for col in ["subject_id", "hadm_id", "emar_seq", "pharmacy_id"]:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors="coerce").astype(pd.Int64Dtype())

    # SQL Batch Insert Query
    insert_sql = """
    INSERT INTO emar (
        subject_id, hadm_id, emar_id, emar_seq, poe_id, pharmacy_id, enter_provider_id,
        charttime, medication, event_txt, scheduletime, storetime
    ) VALUES %s
    ON CONFLICT (emar_id, emar_seq) DO NOTHING;
    """

    # Convert DataFrame to list of tuples. NaT was already replaced above;
    # this pass turns the pd.NA values left by the Int64 conversion into None.
    # pd.NA is a singleton, so an identity check is enough.
    data_tuples = [
        tuple(None if x is pd.NA else x for x in row)
        for row in df.to_numpy()
    ]

    # Batch insert; commit explicitly and roll back on error so a failure
    # does not leave the shared connection in an aborted transaction.
    print("📤 Inserting data into PostgreSQL...")
    try:
        execute_values(cur, insert_sql, data_tuples)
        conn.commit()
    except Exception:
        conn.rollback()
        raise

    print(f"✅ Inserted {len(data_tuples)} rows into 'emar'.")
def check_emar():
    """Print the row count of `emar` followed by up to five sample rows."""
    cur.execute("SELECT COUNT(*) FROM emar;")
    total_rows = cur.fetchone()[0]
    print(f"📊 Total rows in 'emar': {total_rows}")

    # Pull a handful of rows as a quick visual sanity check.
    cur.execute("SELECT * FROM emar LIMIT 5;")
    sample_rows = cur.fetchall()
    print("🧐 Sample data:")
    for sample in sample_rows:
        print(sample)

In [3]:
# Build the emar table end to end: create it, load the CSV, then print a count and sample.
create_emar_table()  # Create table
insert_emar()  # Insert data
check_emar()  # Check if data is inserted

✅ Table 'emar' is ready.
📥 Reading CSV file...
📤 Inserting data into PostgreSQL...
✅ Inserted 26850359 rows into 'emar'.
📊 Total rows in 'emar': 26850359
🧐 Sample data:
(10000032, 22595853, '10000032-10', 10, '10000032-36', 48770010, None, datetime.datetime(2180, 5, 7, 0, 44), 'Potassium Chloride', 'Administered', datetime.datetime(2180, 5, 7, 0, 44), datetime.datetime(2180, 5, 7, 0, 44))
(10000032, 22595853, '10000032-11', 11, '10000032-22', 14779570, None, datetime.datetime(2180, 5, 7, 0, 44), 'Sodium Chloride 0.9%  Flush', 'Flushed', datetime.datetime(2180, 5, 7, 0, 44), datetime.datetime(2180, 5, 7, 0, 44))
(10000032, 22595853, '10000032-12', 12, '10000032-37', 93463122, None, datetime.datetime(2180, 5, 7, 6, 10), 'Ipratropium Bromide Neb', 'Administered', datetime.datetime(2180, 5, 7, 6, 0), datetime.datetime(2180, 5, 7, 6, 10))
(10000032, 22595853, '10000032-13', 13, '10000032-28', 42497745, None, datetime.datetime(2180, 5, 7, 5, 0), 'Albuterol Inhaler', 'Administered', datetime.

# emar_detail

In [None]:
def drop_duplicate_in_emar_detail_postgres():
    """
    Finds duplicates in emar_detail (based on emar_id + parent_field_ordinal),
    deletes excess duplicates (keeps only one per combination), and verifies the result.

    Uses the notebook's shared `conn` / `cur` rather than opening its own
    connection, and leaves them open so later cells can keep using them.
    The delete is committed on success and rolled back on failure.
    """

    # 1️⃣ Query duplicates: identify (emar_id, parent_field_ordinal) combos with COUNT(*) > 1
    query_duplicates = """
        SELECT emar_id, parent_field_ordinal, COUNT(*) AS duplicate_count
        FROM emar_detail
        GROUP BY emar_id, parent_field_ordinal
        HAVING COUNT(*) > 1;
    """
    duplicates_df = pd.read_sql(query_duplicates, conn)
    print("Duplicate (emar_id, parent_field_ordinal) combinations:")
    print(duplicates_df)

    if duplicates_df.empty:
        print("No duplicates found. Nothing to remove.")
        return

    print("\nRemoving duplicates...")

    # 2️⃣ Delete duplicates, keeping only the first row per combination
    #    We'll use a window function and ctid to identify "extra" rows
    delete_duplicates_query = """
            DELETE FROM emar_detail
            USING (
                SELECT ctid AS delete_ctid,
                       ROW_NUMBER() OVER (
                           PARTITION BY emar_id, parent_field_ordinal
                           ORDER BY ctid
                       ) AS row_num
                FROM emar_detail
            ) AS sub
            WHERE emar_detail.ctid = sub.delete_ctid
            AND sub.row_num > 1;
        """
    try:
        # The shared cursor is named `cur`; the previous code referenced an
        # undefined `cursor` and failed with NameError.
        cur.execute(delete_duplicates_query)
        rows_deleted = cur.rowcount
        conn.commit()
    except Exception:
        # Clear the aborted transaction so the shared connection stays usable.
        conn.rollback()
        raise

    print(f"Excess duplicate rows removed: {rows_deleted}")

    # 3️⃣ Verify no duplicates remain
    remaining_duplicates_df = pd.read_sql(query_duplicates, conn)
    print("Remaining duplicates after cleanup:")
    print(remaining_duplicates_df)

    if remaining_duplicates_df.empty:
        print("All duplicate records have been successfully removed.")
    else:
        print("Some duplicates still exist. Please check the data.")

# Run the duplicate cleanup when this code executes as the top-level module.
# NOTE(review): presumably this is also true inside a notebook cell, so running
# the cell would delete rows — confirm before executing.
if __name__ == "__main__":
    drop_duplicate_in_emar_detail_postgres()

In [24]:
def create_emar_detail_table():
    """Drop any existing `emar_detail` table and create it afresh.

    Both statements run on the shared cursor and are committed together;
    a confirmation line is printed afterwards. Existing rows in
    `emar_detail` are discarded by the DROP.
    """
    drop_ddl = "DROP TABLE IF EXISTS emar_detail CASCADE;"
    create_ddl = """
    CREATE TABLE emar_detail (
        subject_id INTEGER NOT NULL,
        emar_id VARCHAR(25) NOT NULL,
        emar_seq INTEGER NOT NULL,
        parent_field_ordinal VARCHAR(10),  -- Can be NULL
        administration_type VARCHAR(50),
        pharmacy_id INTEGER,
        barcode_type VARCHAR(4),
        reason_for_no_barcode TEXT,
        complete_dose_not_given VARCHAR(5),
        dose_due VARCHAR(100),
        dose_due_unit VARCHAR(50),
        dose_given VARCHAR(255),
        dose_given_unit VARCHAR(50),
        will_remainder_of_dose_be_given VARCHAR(5),
        product_amount_given VARCHAR(30),
        product_unit VARCHAR(30),
        product_code VARCHAR(30),
        product_description VARCHAR(255),
        product_description_other VARCHAR(255),
        prior_infusion_rate VARCHAR(40),
        infusion_rate VARCHAR(40),
        infusion_rate_adjustment VARCHAR(50),
        infusion_rate_adjustment_amount VARCHAR(30),
        infusion_rate_unit VARCHAR(30),
        route VARCHAR(10),
        infusion_complete VARCHAR(1),
        completion_interval VARCHAR(50),
        new_iv_bag_hung VARCHAR(1),
        continued_infusion_in_other_location VARCHAR(1),
        restart_interval TEXT,
        side VARCHAR(10),
        site VARCHAR(255),
        non_formulary_visual_verification VARCHAR(1),
        PRIMARY KEY (subject_id, emar_id, emar_seq)
    );
    """
    # Drop first, then recreate, in a single committed transaction.
    for statement in (drop_ddl, create_ddl):
        cur.execute(statement)
    conn.commit()
    print("✅ Table 'emar_detail' dropped and recreated.")

def insert_emar_detail():
    """Load emar_detail.csv into the `emar_detail` table in chunks.

    The CSV is read as text 50,000 rows at a time. For each chunk, missing
    values are mapped to None (NULL), the integer columns are converted to
    nullable integers, and the rows are sent with `execute_values` and
    committed. If a chunk fails, its transaction is rolled back before the
    exception is re-raised, so the shared connection stays usable; chunks
    committed earlier remain in the table.

    NOTE(review): ON CONFLICT on (subject_id, emar_id, emar_seq) keeps only
    the first row per key. If the CSV holds several rows per key (for
    example with different parent_field_ordinal values), the others are
    skipped silently — confirm this is intended.
    """
    csv_path = "./data/hosp/emar_detail.csv"
    chunk_size = 50000

    # The statement is identical for every chunk, so build it once.
    insert_sql = """
        INSERT INTO emar_detail (
            subject_id, emar_id, emar_seq, parent_field_ordinal, administration_type, 
            pharmacy_id, barcode_type, reason_for_no_barcode, complete_dose_not_given,
            dose_due, dose_due_unit, dose_given, dose_given_unit, will_remainder_of_dose_be_given,
            product_amount_given, product_unit, product_code, product_description,
            product_description_other, prior_infusion_rate, infusion_rate,
            infusion_rate_adjustment, infusion_rate_adjustment_amount, infusion_rate_unit,
            route, infusion_complete, completion_interval, new_iv_bag_hung,
            continued_infusion_in_other_location, restart_interval, side, site,
            non_formulary_visual_verification
        ) VALUES %s
        ON CONFLICT (subject_id, emar_id, emar_seq) DO NOTHING;  -- ✅ No parent_field_ordinal
        """

    for chunk in pd.read_csv(csv_path, dtype=str, chunksize=chunk_size):
        # Convert NaN to None for PostgreSQL
        chunk = chunk.where(pd.notnull(chunk), None)

        # Convert numeric columns to nullable integers (unparseable values become pd.NA)
        for col in ["subject_id", "emar_seq", "pharmacy_id"]:
            if col in chunk.columns:
                chunk[col] = pd.to_numeric(chunk[col], errors="coerce").astype(pd.Int64Dtype())

        # Convert the chunk to tuples, turning pd.NA into None on the way.
        # pd.NA is a singleton, so an identity check is enough.
        data_tuples = [
            tuple(None if x is pd.NA else x for x in r)
            for r in chunk.to_numpy()
        ]

        # Commit per chunk; roll back on error to clear the aborted transaction.
        try:
            execute_values(cur, insert_sql, data_tuples)
            conn.commit()
        except Exception:
            conn.rollback()
            raise

    print("✅ Finished inserting data into 'emar_detail'!")


def check_emar_detail():
    """Print the row count of `emar_detail` followed by up to five sample rows."""
    cur.execute("SELECT COUNT(*) FROM emar_detail;")
    total_rows = cur.fetchone()[0]
    print(f"📊 Total rows in 'emar_detail': {total_rows}")

    # Pull a handful of rows as a quick visual sanity check.
    cur.execute("SELECT * FROM emar_detail LIMIT 5;")
    sample_rows = cur.fetchall()
    print("🧐 Sample data:")
    for sample in sample_rows:
        print(sample)

In [25]:
# Rebuild the emar_detail table end to end: drop and recreate it, load the CSV, then print a count and sample.
create_emar_detail_table()  # Create table
insert_emar_detail()  # Insert data
check_emar_detail()  # Check if data is inserted

✅ Table 'emar_detail' dropped and recreated.
✅ Finished inserting data into 'emar_detail'!
📊 Total rows in 'emar_detail': 26850359
🧐 Sample data:
(10000032, '10000032-10', 10, '1.1', None, 48770010, 'if', None, None, None, None, '10', 'mEq', None, '1', 'TAB', 'MICROK10', 'Potassium Chloride 10mEq ER Tablet', None, None, None, None, None, None, None, None, None, None, None, None, None, None, None)
(10000032, '10000032-100', 100, '1.1', None, None, 'if', None, None, None, None, '30', 'mL', None, '1', 'UDCUP', 'MAAL30L', 'Aluminum-Magnesium Hydrox.-Simethicone 30 mL UDCup', None, None, None, None, None, None, None, None, None, None, None, None, None, None, None)
(10000032, '10000032-101', 101, '1.1', None, None, 'if', None, None, None, None, '5', 'mL', None, '1', 'SYR', 'BELL5L', 'Donnatal (Elixir) 5mL Oral Syringe', None, None, None, None, None, None, None, None, None, None, None, None, None, None, None)
(10000032, '10000032-102', 102, '1.1', None, None, 'if', None, None, None, None, '4'

# hcpcsevents

In [26]:
def create_hcpcsevents_table():
    """Drop any existing `hcpcsevents` table and create it afresh.

    Both statements run on the shared cursor and are committed together;
    a confirmation line is printed afterwards. Existing rows in
    `hcpcsevents` are discarded by the DROP.
    """
    drop_ddl = "DROP TABLE IF EXISTS hcpcsevents CASCADE;"
    create_ddl = """
    CREATE TABLE hcpcsevents (
        subject_id INTEGER NOT NULL,
        hadm_id INTEGER NOT NULL,
        chartdate DATE,
        hcpcs_cd CHAR(5) NOT NULL,
        seq_num INTEGER NOT NULL,
        short_description VARCHAR(180),
        PRIMARY KEY (subject_id, hadm_id, hcpcs_cd, seq_num)
    );
    """
    # Drop first, then recreate, in a single committed transaction.
    for statement in (drop_ddl, create_ddl):
        cur.execute(statement)
    conn.commit()
    print("✅ Table 'hcpcsevents' dropped and recreated.")

def insert_hcpcsevents():
    """Bulk-load ``./data/hosp/hcpcsevents.csv`` into the ``hcpcsevents`` table.

    The CSV is read as text in 50,000-row chunks. For each chunk the integer
    columns are converted to nullable integers, ``chartdate`` is reduced to a
    calendar date, every missing marker (``NaN``, ``pd.NA``, ``NaT``) is
    replaced by ``None`` so it is stored as SQL ``NULL``, and the rows are
    inserted and committed. Rows whose primary key already exists are skipped
    by ``ON CONFLICT ... DO NOTHING``.

    Values are bound by position, so the CSV columns must appear in the same
    order as the column list of the INSERT statement.

    Raises:
        Exception: whatever the insert or commit raises. The open transaction
            is rolled back first so the shared connection stays usable;
            chunks committed earlier remain in the table.
    """
    csv_path = "./data/hosp/hcpcsevents.csv"  # Relative to the notebook's working directory
    chunk_size = 50000  # Rows read, inserted and committed per batch

    # The statement is the same for every chunk, so build it once.
    insert_sql = """
        INSERT INTO hcpcsevents (
            subject_id, hadm_id, chartdate, hcpcs_cd, seq_num, short_description
        ) VALUES %s
        ON CONFLICT (subject_id, hadm_id, hcpcs_cd, seq_num) DO NOTHING;
        """

    print("📥 Reading CSV file in chunks...")
    for chunk in pd.read_csv(csv_path, dtype=str, chunksize=chunk_size):
        # Everything was read as text; blank fields arrive as NaN.
        chunk = chunk.where(pd.notnull(chunk), None)

        # Int64 is pandas' nullable integer dtype, so gaps stay missing
        # instead of forcing the column to float.
        for col in ["subject_id", "hadm_id", "seq_num"]:
            if col in chunk.columns:
                chunk[col] = pd.to_numeric(chunk[col], errors="coerce").astype(pd.Int64Dtype())

        # Unparseable dates become NaT; valid ones are reduced to datetime.date.
        if "chartdate" in chunk.columns:
            chunk["chartdate"] = pd.to_datetime(chunk["chartdate"], errors="coerce").dt.date

        # pd.isna recognises NaN, pd.NA and NaT alike. The NaT case matters
        # here: a missing chartdate would otherwise reach psycopg2 as NaT,
        # which it cannot adapt.
        data_tuples = [
            tuple(None if pd.isna(x) else x for x in row)
            for row in chunk.to_numpy()
        ]

        print(f"📤 Inserting {len(data_tuples)} rows into PostgreSQL...")
        try:
            execute_values(cur, insert_sql, data_tuples)
            conn.commit()  # One transaction per chunk
        except Exception:
            # Leave the shared connection usable for the following cells.
            conn.rollback()
            raise

    print("✅ Finished inserting data into 'hcpcsevents'.")

def check_hcpcsevents():
    """Print the row count of ``hcpcsevents`` followed by up to five sample rows."""
    cur.execute("SELECT COUNT(*) FROM hcpcsevents;")
    total_rows = cur.fetchone()[0]
    print(f"📊 Total rows in 'hcpcsevents': {total_rows}")

    # No ORDER BY: which rows come back is left to PostgreSQL.
    cur.execute("SELECT * FROM hcpcsevents LIMIT 5;")
    sample_rows = cur.fetchall()
    print("🧐 Sample data:")
    for sample in sample_rows:
        print(sample)

In [27]:
create_hcpcsevents_table()  # Drop and recreate the table (existing rows are discarded)
insert_hcpcsevents()  # Load ./data/hosp/hcpcsevents.csv in committed chunks
check_hcpcsevents()  # Print the row count and a few sample rows

✅ Table 'hcpcsevents' dropped and recreated.
📥 Reading CSV file in chunks...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 771 rows into PostgreSQL...
✅ Finished inserting data into 'hcpcsevents'.
📊 Total rows in 'hcpcsevents': 150771
🧐 Sample data:
(10000068, 25022803, datetime.date(2160, 3, 4), '99218', 1, 'Hospital observation services')
(10000084, 29888819, datetime.date(2160, 12, 28), 'G0378', 1, 'Hospital observation per hr')
(10000108, 27250926, datetime.date(2163, 9, 27), '99219', 1, 'Hospital observation services')
(10000117, 22927623, datetime.date(2181, 11, 15), '43239', 1, 'Digestive system')
(10000117, 22927623, datetime.date(2181, 11, 15), 'G0378', 2, 'Hospital observation per hr')


# labevents

In [42]:
def create_labevents_table():
    """Rebuild the ``labevents`` table from scratch.

    Any existing ``labevents`` table is dropped first (``CASCADE`` also
    removes objects that depend on it, and previously loaded rows are lost),
    then the table is created again and the change is committed.
    Uses the notebook-level ``cur`` cursor and ``conn`` connection.
    """
    ddl_statements = (
        # Drop first so the definition below always takes effect.
        "DROP TABLE IF EXISTS labevents CASCADE;",
        """
    CREATE TABLE labevents (
        labevent_id INTEGER NOT NULL PRIMARY KEY,
        subject_id INTEGER NOT NULL,
        hadm_id INTEGER,
        specimen_id INTEGER NOT NULL,
        itemid INTEGER NOT NULL,
        order_provider_id VARCHAR(10),
        charttime TIMESTAMP(0),
        storetime TIMESTAMP(0),
        value VARCHAR(200),
        valuenum DOUBLE PRECISION,
        valueuom VARCHAR(20),
        ref_range_lower DOUBLE PRECISION,
        ref_range_upper DOUBLE PRECISION,
        flag VARCHAR(10),
        priority VARCHAR(7),
        comments TEXT
    );
    """,
    )
    for statement in ddl_statements:
        cur.execute(statement)
    conn.commit()
    print("✅ Table 'labevents' dropped and recreated.")

def insert_labevents():
    """Bulk-load ``./data/hosp/labevents.csv`` into the ``labevents`` table.

    The CSV is read as text in 50,000-row chunks. For each chunk the integer
    columns are converted to nullable integers, the measurement columns to
    floats and the time columns to timestamps (unparseable values are
    coerced to missing). Every missing marker (``NaN``, ``pd.NA``, ``NaT``)
    is then replaced by ``None`` so it is stored as SQL ``NULL``, and the
    rows are inserted and committed. Rows whose ``labevent_id`` already
    exists are skipped by ``ON CONFLICT ... DO NOTHING``.

    Values are bound by position, so the CSV columns must appear in the same
    order as the column list of the INSERT statement.

    Raises:
        Exception: whatever the insert or commit raises. The open transaction
            is rolled back first so the shared connection stays usable;
            chunks committed earlier remain in the table.
    """
    csv_path = "./data/hosp/labevents.csv"  # Relative to the notebook's working directory
    chunk_size = 50000  # Rows read, inserted and committed per batch

    integer_columns = ["labevent_id", "subject_id", "hadm_id", "specimen_id", "itemid"]
    float_columns = ["valuenum", "ref_range_lower", "ref_range_upper"]
    timestamp_columns = ["charttime", "storetime"]

    # The statement is the same for every chunk, so build it once.
    insert_sql = """
        INSERT INTO labevents (
            labevent_id, subject_id, hadm_id, specimen_id, itemid, order_provider_id,
            charttime, storetime, value, valuenum, valueuom, ref_range_lower,
            ref_range_upper, flag, priority, comments
        ) VALUES %s
        ON CONFLICT (labevent_id) DO NOTHING;
        """

    print("📥 Reading CSV file in chunks...")
    for chunk in pd.read_csv(csv_path, dtype=str, chunksize=chunk_size):
        # Everything was read as text; blank fields arrive as NaN.
        chunk = chunk.where(pd.notnull(chunk), None)

        # Int64 is pandas' nullable integer dtype, so gaps stay missing
        # instead of forcing the column to float.
        for col in integer_columns:
            if col in chunk.columns:
                chunk[col] = pd.to_numeric(chunk[col], errors="coerce").astype(pd.Int64Dtype())

        for col in float_columns:
            if col in chunk.columns:
                chunk[col] = pd.to_numeric(chunk[col], errors="coerce")

        # Unparseable timestamps become NaT; they are mapped to None below.
        for col in timestamp_columns:
            if col in chunk.columns:
                chunk[col] = pd.to_datetime(chunk[col], errors="coerce")

        # Single pass: pd.isna recognises NaN, pd.NA and NaT alike.
        data_tuples = [
            tuple(None if pd.isna(x) else x for x in row)
            for row in chunk.to_numpy()
        ]

        print(f"📤 Inserting {len(data_tuples)} rows into PostgreSQL...")
        try:
            execute_values(cur, insert_sql, data_tuples)
            conn.commit()  # One transaction per chunk
        except Exception:
            # Leave the shared connection usable for the following cells.
            conn.rollback()
            raise

    print("✅ Finished inserting data into 'labevents'.")



def check_labevents():
    """Print the row count of ``labevents`` followed by up to five sample rows."""
    cur.execute("SELECT COUNT(*) FROM labevents;")
    total_rows = cur.fetchone()[0]
    print(f"📊 Total rows in 'labevents': {total_rows}")

    # No ORDER BY: which rows come back is left to PostgreSQL.
    cur.execute("SELECT * FROM labevents LIMIT 5;")
    sample_rows = cur.fetchall()
    print("🧐 Sample data:")
    for sample in sample_rows:
        print(sample)


In [43]:
create_labevents_table()  # Drop and recreate the table (existing rows are discarded)
insert_labevents()  # Load ./data/hosp/labevents.csv in committed chunks
check_labevents()  # Print the row count and a few sample rows

✅ Table 'labevents' dropped and recreated.
📥 Reading CSV file in chunks...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤

  chunk[col] = pd.to_datetime(chunk[col], errors="coerce")  # Convert invalid timestamps to NaT


📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into Postgr

# microbiologyevents

In [46]:
def create_microbiologyevents_table():
    """Rebuild the ``microbiologyevents`` table from scratch.

    Any existing ``microbiologyevents`` table is dropped first (``CASCADE``
    also removes objects that depend on it, and previously loaded rows are
    lost), then the table is created again and the change is committed.
    Uses the notebook-level ``cur`` cursor and ``conn`` connection.
    """
    ddl_statements = (
        # Drop first so the definition below always takes effect.
        "DROP TABLE IF EXISTS microbiologyevents CASCADE;",
        """
    CREATE TABLE microbiologyevents (
        microevent_id INTEGER NOT NULL PRIMARY KEY,
        subject_id INTEGER NOT NULL,
        hadm_id INTEGER,
        micro_specimen_id INTEGER NOT NULL,
        order_provider_id VARCHAR(10),
        chartdate TIMESTAMP(0) NOT NULL,
        charttime TIMESTAMP(0),
        spec_itemid INTEGER NOT NULL,
        spec_type_desc VARCHAR(100) NOT NULL,
        test_seq INTEGER NOT NULL,
        storedate TIMESTAMP(0),
        storetime TIMESTAMP(0),
        test_itemid INTEGER,
        test_name VARCHAR(100),
        org_itemid INTEGER,
        org_name VARCHAR(100),
        isolate_num SMALLINT,
        quantity VARCHAR(50),
        ab_itemid INTEGER,
        ab_name VARCHAR(30),
        dilution_text VARCHAR(10),
        dilution_comparison VARCHAR(20),
        dilution_value DOUBLE PRECISION,
        interpretation VARCHAR(5),
        comments TEXT
    );
    """,
    )
    for statement in ddl_statements:
        cur.execute(statement)
    conn.commit()
    print("✅ Table 'microbiologyevents' dropped and recreated.")

def insert_microbiologyevents():
    """Bulk-load ``./data/hosp/microbiologyevents.csv`` into ``microbiologyevents``.

    The CSV is read as text in 50,000-row chunks. For each chunk a missing
    ``spec_type_desc`` is replaced by ``"UNKNOWN"`` (the column is
    ``NOT NULL``), the integer columns are converted to nullable integers,
    ``dilution_value`` to float and the date/time columns to timestamps
    (unparseable values are coerced to missing). Every missing marker
    (``NaN``, ``pd.NA``, ``NaT``) is then replaced by ``None`` so it is
    stored as SQL ``NULL``, and the rows are inserted and committed. Rows
    whose ``microevent_id`` already exists are skipped by
    ``ON CONFLICT ... DO NOTHING``.

    Values are bound by position, so the CSV columns must appear in the same
    order as the column list of the INSERT statement.

    Raises:
        Exception: whatever the insert or commit raises. The open transaction
            is rolled back first so the shared connection stays usable;
            chunks committed earlier remain in the table.
    """
    csv_path = "./data/hosp/microbiologyevents.csv"  # Relative to the notebook's working directory
    chunk_size = 50000  # Rows read, inserted and committed per batch

    integer_columns = ["microevent_id", "subject_id", "hadm_id", "micro_specimen_id",
                       "spec_itemid", "test_seq", "test_itemid", "org_itemid",
                       "ab_itemid", "isolate_num"]
    float_columns = ["dilution_value"]
    timestamp_columns = ["chartdate", "charttime", "storedate", "storetime"]

    # The statement is the same for every chunk, so build it once.
    insert_sql = """
        INSERT INTO microbiologyevents (
            microevent_id, subject_id, hadm_id, micro_specimen_id, order_provider_id,
            chartdate, charttime, spec_itemid, spec_type_desc, test_seq, storedate,
            storetime, test_itemid, test_name, org_itemid, org_name, isolate_num,
            quantity, ab_itemid, ab_name, dilution_text, dilution_comparison,
            dilution_value, interpretation, comments
        ) VALUES %s
        ON CONFLICT (microevent_id) DO NOTHING;
        """

    print("📥 Reading CSV file in chunks...")
    for chunk in pd.read_csv(csv_path, dtype=str, chunksize=chunk_size):
        # Everything was read as text; blank fields arrive as NaN.
        chunk = chunk.where(pd.notnull(chunk), None)

        # spec_type_desc is NOT NULL in the table, so give gaps a placeholder.
        if "spec_type_desc" in chunk.columns:
            chunk["spec_type_desc"] = chunk["spec_type_desc"].fillna("UNKNOWN")

        # Int64 is pandas' nullable integer dtype, so gaps stay missing
        # instead of forcing the column to float.
        for col in integer_columns:
            if col in chunk.columns:
                chunk[col] = pd.to_numeric(chunk[col], errors="coerce").astype(pd.Int64Dtype())

        for col in float_columns:
            if col in chunk.columns:
                chunk[col] = pd.to_numeric(chunk[col], errors="coerce")

        # Unparseable timestamps become NaT; they are mapped to None below.
        for col in timestamp_columns:
            if col in chunk.columns:
                chunk[col] = pd.to_datetime(chunk[col], errors="coerce")

        # Single pass: pd.isna recognises NaN, pd.NA and NaT alike.
        data_tuples = [
            tuple(None if pd.isna(x) else x for x in row)
            for row in chunk.to_numpy()
        ]

        print(f"📤 Inserting {len(data_tuples)} rows into PostgreSQL...")
        try:
            execute_values(cur, insert_sql, data_tuples)
            conn.commit()  # One transaction per chunk
        except Exception:
            # Leave the shared connection usable for the following cells.
            conn.rollback()
            raise

    print("✅ Finished inserting data into 'microbiologyevents'.")

def check_microbiologyevents():
    """Print the row count of ``microbiologyevents`` followed by up to five sample rows."""
    cur.execute("SELECT COUNT(*) FROM microbiologyevents;")
    total_rows = cur.fetchone()[0]
    print(f"📊 Total rows in 'microbiologyevents': {total_rows}")

    # No ORDER BY: which rows come back is left to PostgreSQL.
    cur.execute("SELECT * FROM microbiologyevents LIMIT 5;")
    sample_rows = cur.fetchall()
    print("🧐 Sample data:")
    for sample in sample_rows:
        print(sample)

In [48]:
create_microbiologyevents_table()  # Drop and recreate the table (existing rows are discarded)
insert_microbiologyevents()  # Load ./data/hosp/microbiologyevents.csv in committed chunks
check_microbiologyevents()  # Print the row count and a few sample rows

✅ Table 'microbiologyevents' dropped and recreated.
📥 Reading CSV file in chunks...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into Postgr

# patients

In [None]:
def create_patients_table():
    """Rebuild the ``patients`` table from scratch.

    Any existing ``patients`` table is dropped first (``CASCADE`` also
    removes objects that depend on it, and previously loaded rows are lost),
    then the table is created again and the change is committed.
    Uses the notebook-level ``cur`` cursor and ``conn`` connection.
    """
    ddl_statements = (
        # Drop first so the definition below always takes effect.
        "DROP TABLE IF EXISTS patients CASCADE;",
        """
    CREATE TABLE patients (
        subject_id INTEGER NOT NULL PRIMARY KEY,
        gender VARCHAR(1) NOT NULL,
        anchor_age INTEGER NOT NULL,
        anchor_year INTEGER NOT NULL,
        anchor_year_group VARCHAR(255) NOT NULL,
        dod TIMESTAMP(0)
    );
    """,
    )
    for statement in ddl_statements:
        cur.execute(statement)
    conn.commit()
    print("✅ Table 'patients' dropped and recreated.")

def insert_patients():
    """Bulk-load ``./data/hosp/patients.csv`` into the ``patients`` table.

    The CSV is read as text in 50,000-row chunks. For each chunk the integer
    columns are converted to nullable integers and ``dod`` to a timestamp
    (unparseable values are coerced to missing). Every missing marker
    (``NaN``, ``pd.NA``, ``NaT``) is then replaced by ``None`` so it is
    stored as SQL ``NULL``, and the rows are inserted and committed. Rows
    whose ``subject_id`` already exists are skipped by
    ``ON CONFLICT ... DO NOTHING``.

    Values are bound by position, so the CSV columns must appear in the same
    order as the column list of the INSERT statement.

    Raises:
        Exception: whatever the insert or commit raises. The open transaction
            is rolled back first so the shared connection stays usable;
            chunks committed earlier remain in the table.
    """
    csv_path = "./data/hosp/patients.csv"  # Relative to the notebook's working directory
    chunk_size = 50000  # Rows read, inserted and committed per batch

    integer_columns = ["subject_id", "anchor_age", "anchor_year"]

    # The statement is the same for every chunk, so build it once.
    insert_sql = """
        INSERT INTO patients (
            subject_id, gender, anchor_age, anchor_year, anchor_year_group, dod
        ) VALUES %s
        ON CONFLICT (subject_id) DO NOTHING;
        """

    print("📥 Reading CSV file in chunks...")
    for chunk in pd.read_csv(csv_path, dtype=str, chunksize=chunk_size):
        # Everything was read as text; blank fields arrive as NaN.
        chunk = chunk.where(pd.notnull(chunk), None)

        # Int64 is pandas' nullable integer dtype, so gaps stay missing
        # instead of forcing the column to float.
        for col in integer_columns:
            if col in chunk.columns:
                chunk[col] = pd.to_numeric(chunk[col], errors="coerce").astype(pd.Int64Dtype())

        # Unparseable timestamps become NaT; they are mapped to None below.
        if "dod" in chunk.columns:
            chunk["dod"] = pd.to_datetime(chunk["dod"], errors="coerce")

        # Single pass: pd.isna recognises NaN, pd.NA and NaT alike.
        data_tuples = [
            tuple(None if pd.isna(x) else x for x in row)
            for row in chunk.to_numpy()
        ]

        print(f"📤 Inserting {len(data_tuples)} rows into PostgreSQL...")
        try:
            execute_values(cur, insert_sql, data_tuples)
            conn.commit()  # One transaction per chunk
        except Exception:
            # Leave the shared connection usable for the following cells.
            conn.rollback()
            raise

    print("✅ Finished inserting data into 'patients'.")

def check_patients():
    """Print the row count of ``patients`` followed by up to five sample rows."""
    cur.execute("SELECT COUNT(*) FROM patients;")
    total_rows = cur.fetchone()[0]
    print(f"📊 Total rows in 'patients': {total_rows}")

    # No ORDER BY: which rows come back is left to PostgreSQL.
    cur.execute("SELECT * FROM patients LIMIT 5;")
    sample_rows = cur.fetchall()
    print("🧐 Sample data:")
    for sample in sample_rows:
        print(sample)

✅ Table 'patients' dropped and recreated.
📥 Reading CSV file in chunks...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 49712 rows into PostgreSQL...
✅ Finished inserting data into 'patients'.
📊 Total rows in 'patients': 299712
🧐 Sample data:
(10000032, 'F', 52, 2180, '2014 - 2016', datetime.datetime(2180, 9, 9, 0, 0))
(10000048, 'F', 23, 2126, '2008 - 2010', None)
(10000068, 'F', 19, 2160, '2008 - 2010', None)
(10000084, 'M', 72, 2160, '2017 - 2019', datetime.datetime(2161, 2, 13, 0, 0))
(10000102, 'F', 27, 2136, '2008 - 2010', None)


In [None]:
create_patients_table()  # Drop and recreate the table (existing rows are discarded)
insert_patients()  # Load ./data/hosp/patients.csv in committed chunks
check_patients()  # Print the row count and a few sample rows

# pharmacy

In [50]:
def create_pharmacy_table():
    """Rebuild the ``pharmacy`` table from scratch.

    Any existing ``pharmacy`` table is dropped first (``CASCADE`` also
    removes objects that depend on it, and previously loaded rows are lost),
    then the table is created again and the change is committed.
    Uses the notebook-level ``cur`` cursor and ``conn`` connection.
    """
    ddl_statements = (
        # Drop first so the definition below always takes effect.
        "DROP TABLE IF EXISTS pharmacy CASCADE;",
        """
    CREATE TABLE pharmacy (
        subject_id INTEGER NOT NULL,
        hadm_id INTEGER NOT NULL,
        pharmacy_id INTEGER NOT NULL PRIMARY KEY,
        poe_id VARCHAR(25),
        starttime TIMESTAMP(3),
        stoptime TIMESTAMP(3),
        medication TEXT,
        proc_type VARCHAR(50) NOT NULL,
        status VARCHAR(50),
        entertime TIMESTAMP(3) NOT NULL,
        verifiedtime TIMESTAMP(3),
        route VARCHAR(50),
        frequency VARCHAR(50),
        disp_sched VARCHAR(255),
        infusion_type VARCHAR(15),
        sliding_scale VARCHAR(1),
        lockout_interval VARCHAR(50),
        basal_rate REAL,
        one_hr_max VARCHAR(10),
        doses_per_24_hrs REAL,
        duration REAL,
        duration_interval VARCHAR(50),
        expiration_value INTEGER,
        expiration_unit VARCHAR(50),
        expirationdate TIMESTAMP(3),
        dispensation VARCHAR(50),
        fill_quantity VARCHAR(50)
    );
    """,
    )
    for statement in ddl_statements:
        cur.execute(statement)
    conn.commit()
    print("✅ Table 'pharmacy' dropped and recreated.")

def insert_pharmacy():
    """Bulk-load ``./data/hosp/pharmacy.csv`` into the ``pharmacy`` table.

    The CSV is read as text in 50,000-row chunks. For each chunk the integer
    columns are converted to nullable integers, the rate/duration columns to
    floats and the time columns to timestamps (unparseable values are
    coerced to missing). Every missing marker (``NaN``, ``pd.NA``, ``NaT``)
    is then replaced by ``None`` so it is stored as SQL ``NULL``, and the
    rows are inserted and committed. Rows whose ``pharmacy_id`` already
    exists are skipped by ``ON CONFLICT ... DO NOTHING``.

    Values are bound by position, so the CSV columns must appear in the same
    order as the column list of the INSERT statement.

    Raises:
        Exception: whatever the insert or commit raises. The open transaction
            is rolled back first so the shared connection stays usable;
            chunks committed earlier remain in the table.
    """
    csv_path = "./data/hosp/pharmacy.csv"  # Relative to the notebook's working directory
    chunk_size = 50000  # Rows read, inserted and committed per batch

    integer_columns = ["subject_id", "hadm_id", "pharmacy_id", "expiration_value"]
    float_columns = ["basal_rate", "doses_per_24_hrs", "duration"]
    timestamp_columns = ["starttime", "stoptime", "entertime", "verifiedtime", "expirationdate"]

    # The statement is the same for every chunk, so build it once.
    insert_sql = """
        INSERT INTO pharmacy (
            subject_id, hadm_id, pharmacy_id, poe_id, starttime, stoptime,
            medication, proc_type, status, entertime, verifiedtime, route, frequency,
            disp_sched, infusion_type, sliding_scale, lockout_interval, basal_rate,
            one_hr_max, doses_per_24_hrs, duration, duration_interval, expiration_value,
            expiration_unit, expirationdate, dispensation, fill_quantity
        ) VALUES %s
        ON CONFLICT (pharmacy_id) DO NOTHING;
        """

    print("📥 Reading CSV file in chunks...")
    for chunk in pd.read_csv(csv_path, dtype=str, chunksize=chunk_size):
        # Everything was read as text; blank fields arrive as NaN.
        chunk = chunk.where(pd.notnull(chunk), None)

        # Int64 is pandas' nullable integer dtype, so gaps stay missing
        # instead of forcing the column to float.
        for col in integer_columns:
            if col in chunk.columns:
                chunk[col] = pd.to_numeric(chunk[col], errors="coerce").astype(pd.Int64Dtype())

        for col in float_columns:
            if col in chunk.columns:
                chunk[col] = pd.to_numeric(chunk[col], errors="coerce")

        # Unparseable timestamps become NaT; they are mapped to None below.
        for col in timestamp_columns:
            if col in chunk.columns:
                chunk[col] = pd.to_datetime(chunk[col], errors="coerce")

        # Single pass: pd.isna recognises NaN, pd.NA and NaT alike.
        data_tuples = [
            tuple(None if pd.isna(x) else x for x in row)
            for row in chunk.to_numpy()
        ]

        print(f"📤 Inserting {len(data_tuples)} rows into PostgreSQL...")
        try:
            execute_values(cur, insert_sql, data_tuples)
            conn.commit()  # One transaction per chunk
        except Exception:
            # Leave the shared connection usable for the following cells.
            conn.rollback()
            raise

    print("✅ Finished inserting data into 'pharmacy'.")

def check_pharmacy():
    """Print the row count of ``pharmacy`` followed by up to five sample rows."""
    cur.execute("SELECT COUNT(*) FROM pharmacy;")
    total_rows = cur.fetchone()[0]
    print(f"📊 Total rows in 'pharmacy': {total_rows}")

    # No ORDER BY: which rows come back is left to PostgreSQL.
    cur.execute("SELECT * FROM pharmacy LIMIT 5;")
    sample_rows = cur.fetchall()
    print("🧐 Sample data:")
    for sample in sample_rows:
        print(sample)

In [51]:
create_pharmacy_table()  # Drop and recreate the table (existing rows are discarded)
insert_pharmacy()  # Load ./data/hosp/pharmacy.csv in committed chunks
check_pharmacy()  # Print the row count and a few sample rows

✅ Table 'pharmacy' dropped and recreated.
📥 Reading CSV file in chunks...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 

# poe

In [52]:
def create_poe_table():
    """Rebuild the ``poe`` table from scratch.

    Any existing ``poe`` table is dropped first (``CASCADE`` also removes
    objects that depend on it, and previously loaded rows are lost), then
    the table is created again and the change is committed.
    Uses the notebook-level ``cur`` cursor and ``conn`` connection.
    """
    ddl_statements = (
        # Drop first so the definition below always takes effect.
        "DROP TABLE IF EXISTS poe CASCADE;",
        """
    CREATE TABLE poe (
        poe_id VARCHAR(25) NOT NULL,
        poe_seq INTEGER NOT NULL,
        subject_id INTEGER NOT NULL,
        hadm_id INTEGER,
        ordertime TIMESTAMP(0) NOT NULL,
        order_type VARCHAR(25) NOT NULL,
        order_subtype VARCHAR(50),
        transaction_type VARCHAR(15),
        discontinue_of_poe_id VARCHAR(25),
        discontinued_by_poe_id VARCHAR(25),
        order_provider_id VARCHAR(10),
        order_status VARCHAR(15),
        PRIMARY KEY (poe_id, poe_seq)
    );
    """,
    )
    for statement in ddl_statements:
        cur.execute(statement)
    conn.commit()
    print("✅ Table 'poe' dropped and recreated.")

def insert_poe():
    """Bulk-load ``./data/hosp/poe.csv`` into the ``poe`` table.

    The CSV is read as text in 50,000-row chunks. For each chunk the integer
    columns are converted to nullable integers and ``ordertime`` to a
    timestamp (unparseable values are coerced to missing). Every missing
    marker (``NaN``, ``pd.NA``, ``NaT``) is then replaced by ``None`` so it
    is stored as SQL ``NULL``, and the rows are inserted and committed. Rows
    whose ``(poe_id, poe_seq)`` already exists are skipped by
    ``ON CONFLICT ... DO NOTHING``.

    Values are bound by position, so the CSV columns must appear in the same
    order as the column list of the INSERT statement.

    Raises:
        Exception: whatever the insert or commit raises. The open transaction
            is rolled back first so the shared connection stays usable;
            chunks committed earlier remain in the table.
    """
    csv_path = "./data/hosp/poe.csv"  # Relative to the notebook's working directory
    chunk_size = 50000  # Rows read, inserted and committed per batch

    integer_columns = ["poe_seq", "subject_id", "hadm_id"]

    # The statement is the same for every chunk, so build it once.
    insert_sql = """
        INSERT INTO poe (
            poe_id, poe_seq, subject_id, hadm_id, ordertime, order_type, order_subtype,
            transaction_type, discontinue_of_poe_id, discontinued_by_poe_id, order_provider_id, order_status
        ) VALUES %s
        ON CONFLICT (poe_id, poe_seq) DO NOTHING;
        """

    print("📥 Reading CSV file in chunks...")
    for chunk in pd.read_csv(csv_path, dtype=str, chunksize=chunk_size):
        # Everything was read as text; blank fields arrive as NaN.
        chunk = chunk.where(pd.notnull(chunk), None)

        # Int64 is pandas' nullable integer dtype, so gaps stay missing
        # instead of forcing the column to float.
        for col in integer_columns:
            if col in chunk.columns:
                chunk[col] = pd.to_numeric(chunk[col], errors="coerce").astype(pd.Int64Dtype())

        # Unparseable timestamps become NaT; they are mapped to None below.
        if "ordertime" in chunk.columns:
            chunk["ordertime"] = pd.to_datetime(chunk["ordertime"], errors="coerce")

        # Single pass: pd.isna recognises NaN, pd.NA and NaT alike.
        data_tuples = [
            tuple(None if pd.isna(x) else x for x in row)
            for row in chunk.to_numpy()
        ]

        print(f"📤 Inserting {len(data_tuples)} rows into PostgreSQL...")
        try:
            execute_values(cur, insert_sql, data_tuples)
            conn.commit()  # One transaction per chunk
        except Exception:
            # Leave the shared connection usable for the following cells.
            conn.rollback()
            raise

    print("✅ Finished inserting data into 'poe'.")

def check_poe():
    """Print the row count of ``poe`` followed by up to five sample rows."""
    cur.execute("SELECT COUNT(*) FROM poe;")
    total_rows = cur.fetchone()[0]
    print(f"📊 Total rows in 'poe': {total_rows}")

    # No ORDER BY: which rows come back is left to PostgreSQL.
    cur.execute("SELECT * FROM poe LIMIT 5;")
    sample_rows = cur.fetchall()
    print("🧐 Sample data:")
    for sample in sample_rows:
        print(sample)


In [53]:
create_poe_table()  # Drop and recreate the table (existing rows are discarded)
insert_poe()  # Load ./data/hosp/poe.csv in committed chunks
check_poe()  # Print the row count and a few sample rows

✅ Table 'poe' dropped and recreated.
📥 Reading CSV file in chunks...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inser

# poe_detail

In [9]:
def create_poe_detail_table():
    """Rebuild the ``poe_detail`` table from scratch.

    Any existing ``poe_detail`` table is dropped first (``CASCADE`` also
    removes objects that depend on it, and previously loaded rows are lost),
    then the table is created again and the change is committed.
    Uses the notebook-level ``cur`` cursor and ``conn`` connection.
    """
    ddl_statements = (
        # Drop first so the definition below always takes effect.
        "DROP TABLE IF EXISTS poe_detail CASCADE;",
        """
    CREATE TABLE poe_detail (
        poe_id VARCHAR(25) NOT NULL,
        poe_seq INTEGER NOT NULL,
        subject_id INTEGER NOT NULL,
        field_name VARCHAR(255) NOT NULL,
        field_value TEXT,
        PRIMARY KEY (poe_id, poe_seq, field_name)
    );
    """,
    )
    for statement in ddl_statements:
        cur.execute(statement)
    conn.commit()
    print("✅ Table 'poe_detail' dropped and recreated.")

def insert_poe_detail():
    """Bulk-load ``./data/hosp/poe_detail.csv`` into the ``poe_detail`` table.

    The CSV is read as text in 50,000-row chunks. For each chunk the integer
    columns are converted to nullable integers, every missing marker
    (``NaN``, ``pd.NA``) is replaced by ``None`` so it is stored as SQL
    ``NULL``, and the rows are inserted and committed. Rows whose
    ``(poe_id, poe_seq, field_name)`` already exists are skipped by
    ``ON CONFLICT ... DO NOTHING``.

    Values are bound by position, so the CSV columns must appear in the same
    order as the column list of the INSERT statement.

    Raises:
        Exception: whatever the insert or commit raises. The open transaction
            is rolled back first so the shared connection stays usable;
            chunks committed earlier remain in the table.
    """
    csv_path = "./data/hosp/poe_detail.csv"  # Relative to the notebook's working directory
    chunk_size = 50000  # Rows read, inserted and committed per batch

    integer_columns = ["poe_seq", "subject_id"]

    # The statement is the same for every chunk, so build it once.
    insert_sql = """
        INSERT INTO poe_detail (
            poe_id, poe_seq, subject_id, field_name, field_value
        ) VALUES %s
        ON CONFLICT (poe_id, poe_seq, field_name) DO NOTHING;
        """

    print("📥 Reading CSV file in chunks...")
    for chunk in pd.read_csv(csv_path, dtype=str, chunksize=chunk_size):
        # Everything was read as text; blank fields arrive as NaN.
        chunk = chunk.where(pd.notnull(chunk), None)

        # Int64 is pandas' nullable integer dtype, so gaps stay missing
        # instead of forcing the column to float.
        for col in integer_columns:
            if col in chunk.columns:
                chunk[col] = pd.to_numeric(chunk[col], errors="coerce").astype(pd.Int64Dtype())

        # Single pass: pd.isna recognises NaN and pd.NA alike.
        data_tuples = [
            tuple(None if pd.isna(x) else x for x in row)
            for row in chunk.to_numpy()
        ]

        print(f"📤 Inserting {len(data_tuples)} rows into PostgreSQL...")
        try:
            execute_values(cur, insert_sql, data_tuples)
            conn.commit()  # One transaction per chunk
        except Exception:
            # Leave the shared connection usable for the following cells.
            conn.rollback()
            raise

    print("✅ Finished inserting data into 'poe_detail'.")

def check_poe_detail():
    """Print the row count of ``poe_detail`` followed by up to five sample rows."""
    cur.execute("SELECT COUNT(*) FROM poe_detail;")
    total_rows = cur.fetchone()[0]
    print(f"📊 Total rows in 'poe_detail': {total_rows}")

    # No ORDER BY: which rows come back is left to PostgreSQL.
    cur.execute("SELECT * FROM poe_detail LIMIT 5;")
    sample_rows = cur.fetchall()
    print("🧐 Sample data:")
    for sample in sample_rows:
        print(sample)

In [10]:
create_poe_detail_table()  # Drop and recreate the table (existing rows are discarded)
insert_poe_detail()  # Load ./data/hosp/poe_detail.csv in committed chunks
check_poe_detail()  # Print the row count and a few sample rows

✅ Table 'poe_detail' dropped and recreated.
📥 Reading CSV file in chunks...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...


# prescriptions

In [7]:
def create_prescriptions_table():
    """Drop any existing 'prescriptions' table and create it again, empty.

    The DROP and CREATE run on the module-level cursor ``cur`` and are
    committed together on ``conn``. If either statement fails, the transaction
    is rolled back and the error is re-raised, so the shared connection is not
    left in an aborted transaction that would break every later cell.

    Raises:
        Exception: whatever ``cur.execute`` / ``conn.commit`` raised.
    """
    # CASCADE also drops objects that depend on the table (e.g. views).
    drop_table_sql = "DROP TABLE IF EXISTS prescriptions CASCADE;"  # 🚀 Drop table if it exists
    create_table_sql = """
    CREATE TABLE prescriptions (
        subject_id INTEGER NOT NULL,
        hadm_id INTEGER NOT NULL,
        pharmacy_id INTEGER NOT NULL,
        poe_id VARCHAR(25),
        poe_seq INTEGER,
        order_provider_id VARCHAR(10),
        starttime TIMESTAMP(3),
        stoptime TIMESTAMP(3),
        drug_type VARCHAR(20) NOT NULL,
        drug VARCHAR(255) NOT NULL,
        formulary_drug_cd VARCHAR(50),
        gsn VARCHAR(255),
        ndc VARCHAR(25),
        prod_strength VARCHAR(255),
        form_rx VARCHAR(25),
        dose_val_rx VARCHAR(100),
        dose_unit_rx VARCHAR(50),
        form_val_disp VARCHAR(50),
        form_unit_disp VARCHAR(50),
        doses_per_24_hrs REAL,
        route VARCHAR(50),
        PRIMARY KEY (subject_id, hadm_id, pharmacy_id)
    );
    """
    try:
        cur.execute(drop_table_sql)  # Drop table before recreating
        cur.execute(create_table_sql)
        conn.commit()
    except Exception:
        # Undo the partial DDL and reset the connection before propagating.
        conn.rollback()
        raise
    print("✅ Table 'prescriptions' dropped and recreated.")

def insert_prescriptions():
    """Bulk-load ./data/hosp/prescriptions.csv into the 'prescriptions' table.

    The CSV is read as strings in chunks of 50,000 rows. Each chunk is
    re-ordered to the INSERT column list, typed (integers, float, timestamps),
    has missing ``drug`` values replaced by "UNKNOWN", and is written with
    ``execute_values`` on the module-level ``cur``; ``conn`` is committed after
    every chunk.

    Values that fail numeric/timestamp parsing are coerced to NULL.

    NOTE(review): ON CONFLICT ... DO NOTHING silently skips every row whose
    (subject_id, hadm_id, pharmacy_id) was already inserted. Confirm that this
    key is unique in the CSV, otherwise rows are lost without any message.

    Raises:
        KeyError: if the CSV lacks one of the expected columns.
        Exception: any read/insert error, re-raised after ``conn.rollback()``
            so the shared connection stays usable (chunks committed earlier
            remain in the table).
    """
    csv_path = "./data/hosp/prescriptions.csv"  # Ensure correct path
    chunk_size = 50000  # Process 50,000 rows at a time

    # Single source of truth for column order: used for both the SQL column
    # list and the DataFrame selection, so values can never be misaligned.
    columns = [
        "subject_id", "hadm_id", "pharmacy_id", "poe_id", "poe_seq", "order_provider_id",
        "starttime", "stoptime", "drug_type", "drug", "formulary_drug_cd", "gsn", "ndc",
        "prod_strength", "form_rx", "dose_val_rx", "dose_unit_rx", "form_val_disp",
        "form_unit_disp", "doses_per_24_hrs", "route",
    ]
    column_list = ", ".join(columns)

    # SQL Batch Insert Query (loop-invariant, so it is built once)
    insert_sql = f"""
    INSERT INTO prescriptions ({column_list})
    VALUES %s
    ON CONFLICT (subject_id, hadm_id, pharmacy_id) DO NOTHING;
    """

    print("📥 Reading CSV file in chunks...")
    try:
        # The context manager closes the CSV handle even if a chunk fails.
        with pd.read_csv(csv_path, dtype=str, chunksize=chunk_size) as reader:
            for chunk in reader:
                # Select by name (not position); copy so the assignments below
                # operate on an independent frame.
                chunk = chunk[columns].copy()

                # 'drug' is NOT NULL in the table: substitute a placeholder
                chunk["drug"] = chunk["drug"].fillna("UNKNOWN")

                # Nullable integer columns
                for col in ("subject_id", "hadm_id", "pharmacy_id", "poe_seq"):
                    chunk[col] = pd.to_numeric(chunk[col], errors="coerce").astype(pd.Int64Dtype())

                # Floating-point column
                chunk["doses_per_24_hrs"] = pd.to_numeric(chunk["doses_per_24_hrs"], errors="coerce")

                # Timestamp columns (unparseable values become NaT)
                for col in ("starttime", "stoptime"):
                    chunk[col] = pd.to_datetime(chunk[col], errors="coerce")

                # One pass: every missing marker (NaN, NaT, pd.NA, None) -> None
                data_tuples = [
                    tuple(None if pd.isna(value) else value for value in record)
                    for record in chunk.to_numpy()
                ]

                # Batch Insert Data
                print(f"📤 Inserting {len(data_tuples)} rows into PostgreSQL...")
                execute_values(cur, insert_sql, data_tuples)
                conn.commit()  # Commit after each chunk to reduce memory usage
    except Exception:
        # Leave the connection clean instead of stuck in an aborted transaction.
        conn.rollback()
        raise

    print("✅ Finished inserting data into 'prescriptions'.")

def check_prescriptions():
    """Report how many rows 'prescriptions' holds and show up to five of them.

    Both queries run on the module-level cursor ``cur``; the results are
    written to stdout and nothing is returned.
    """
    # The COUNT(*) query yields a single row whose first field is the total.
    cur.execute("SELECT COUNT(*) FROM prescriptions;")
    total_rows = cur.fetchone()[0]
    print(f"📊 Total rows in 'prescriptions': {total_rows}")

    # Pull the sample before printing its header, then emit one row per line.
    cur.execute("SELECT * FROM prescriptions LIMIT 5;")
    samples = cur.fetchall()
    print("🧐 Sample data:", *samples, sep="\n")


In [8]:
# Full reload of 'prescriptions': drop/recreate the table, bulk-insert the CSV,
# then print the row count and a few sample rows. Re-running this cell discards
# whatever the table currently contains.
create_prescriptions_table()  # Create table
insert_prescriptions()  # Insert data
check_prescriptions()  # Check if data is inserted

✅ Table 'prescriptions' dropped and recreated.
📥 Reading CSV file in chunks...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL.

# procedures_icd

In [12]:
def create_procedures_icd_table():
    """Drop any existing 'procedures_icd' table and create it again, empty.

    The DROP and CREATE run on the module-level cursor ``cur`` and are
    committed together on ``conn``. If either statement fails, the transaction
    is rolled back and the error is re-raised, so the shared connection is not
    left in an aborted transaction that would break every later cell.

    Raises:
        Exception: whatever ``cur.execute`` / ``conn.commit`` raised.
    """
    # CASCADE also drops objects that depend on the table (e.g. views).
    drop_table_sql = "DROP TABLE IF EXISTS procedures_icd CASCADE;"  # 🚀 Drop table if it exists
    create_table_sql = """
    CREATE TABLE procedures_icd (
        subject_id INTEGER NOT NULL,
        hadm_id INTEGER NOT NULL,
        seq_num INTEGER NOT NULL,
        chartdate DATE NOT NULL,
        icd_code VARCHAR(7),
        icd_version INTEGER,
        PRIMARY KEY (subject_id, hadm_id, seq_num)
    );
    """
    try:
        cur.execute(drop_table_sql)  # Drop table before recreating
        cur.execute(create_table_sql)
        conn.commit()
    except Exception:
        # Undo the partial DDL and reset the connection before propagating.
        conn.rollback()
        raise
    print("✅ Table 'procedures_icd' dropped and recreated.")

def insert_procedures_icd():
    """Bulk-load ./data/hosp/procedures_icd.csv into the 'procedures_icd' table.

    The CSV is read as strings in chunks of 50,000 rows. Each chunk is
    re-ordered to the INSERT column list, typed (nullable integers, dates) and
    written with ``execute_values`` on the module-level ``cur``; ``conn`` is
    committed after every chunk. Rows whose (subject_id, hadm_id, seq_num)
    already exists are skipped by ON CONFLICT ... DO NOTHING.

    Values that fail numeric/date parsing are coerced to NULL.

    Raises:
        KeyError: if the CSV lacks one of the expected columns.
        Exception: any read/insert error, re-raised after ``conn.rollback()``
            so the shared connection stays usable (chunks committed earlier
            remain in the table).
    """
    csv_path = "./data/hosp/procedures_icd.csv"  # Ensure correct path
    chunk_size = 50000  # Process 50,000 rows at a time

    # Single source of truth for column order: used for both the SQL column
    # list and the DataFrame selection, so values can never be misaligned.
    columns = ["subject_id", "hadm_id", "seq_num", "chartdate", "icd_code", "icd_version"]
    column_list = ", ".join(columns)

    # SQL Batch Insert Query (loop-invariant, so it is built once)
    insert_sql = f"""
    INSERT INTO procedures_icd ({column_list})
    VALUES %s
    ON CONFLICT (subject_id, hadm_id, seq_num) DO NOTHING;
    """

    print("📥 Reading CSV file in chunks...")
    try:
        # The context manager closes the CSV handle even if a chunk fails.
        with pd.read_csv(csv_path, dtype=str, chunksize=chunk_size) as reader:
            for chunk in reader:
                # Select by name (not position); copy so the assignments below
                # operate on an independent frame.
                chunk = chunk[columns].copy()

                # Nullable integer columns
                for col in ("subject_id", "hadm_id", "seq_num", "icd_version"):
                    chunk[col] = pd.to_numeric(chunk[col], errors="coerce").astype(pd.Int64Dtype())

                # Keep only the calendar date, matching the DATE column type
                chunk["chartdate"] = pd.to_datetime(chunk["chartdate"], errors="coerce").dt.date

                # One pass: every missing marker (NaN, NaT, pd.NA, None) -> None
                data_tuples = [
                    tuple(None if pd.isna(value) else value for value in record)
                    for record in chunk.to_numpy()
                ]

                # Batch Insert Data
                print(f"📤 Inserting {len(data_tuples)} rows into PostgreSQL...")
                execute_values(cur, insert_sql, data_tuples)
                conn.commit()  # Commit after each chunk to reduce memory usage
    except Exception:
        # Leave the connection clean instead of stuck in an aborted transaction.
        conn.rollback()
        raise

    print("✅ Finished inserting data into 'procedures_icd'.")

def check_procedures_icd():
    """Report how many rows 'procedures_icd' holds and show up to five of them.

    Both queries run on the module-level cursor ``cur``; the results are
    written to stdout and nothing is returned.
    """
    # The COUNT(*) query yields a single row whose first field is the total.
    cur.execute("SELECT COUNT(*) FROM procedures_icd;")
    total_rows = cur.fetchone()[0]
    print(f"📊 Total rows in 'procedures_icd': {total_rows}")

    # Pull the sample before printing its header, then emit one row per line.
    cur.execute("SELECT * FROM procedures_icd LIMIT 5;")
    samples = cur.fetchall()
    print("🧐 Sample data:", *samples, sep="\n")

In [13]:
# Full reload of 'procedures_icd': drop/recreate the table, bulk-insert the CSV,
# then print the row count and a few sample rows. Re-running this cell discards
# whatever the table currently contains.
create_procedures_icd_table()  # Create table
insert_procedures_icd()  # Insert data
check_procedures_icd()  # Check if data is inserted

✅ Table 'procedures_icd' dropped and recreated.
📥 Reading CSV file in chunks...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 19186 rows into PostgreSQL...
✅ Finished inserting data into 'procedures_icd'.
📊 Total rows in 'procedures_icd': 669184
🧐 Sample data:
(10000032, 22595853, 1, datetime.date(2180, 5, 7), '5491', 9)
(10000032, 22841357, 1, datetime.date(2180, 6, 27), '5491', 9)
(10000032, 25742920, 1, datetime.date(2180, 8, 6), '5491', 9)
(10000068, 25022803, 1, datetime.date(21

# services

In [14]:
def create_services_table():
    """Drop any existing 'services' table and create it again, empty.

    The DROP and CREATE run on the module-level cursor ``cur`` and are
    committed together on ``conn``. If either statement fails, the transaction
    is rolled back and the error is re-raised, so the shared connection is not
    left in an aborted transaction that would break every later cell.

    Raises:
        Exception: whatever ``cur.execute`` / ``conn.commit`` raised.
    """
    # CASCADE also drops objects that depend on the table (e.g. views).
    drop_table_sql = "DROP TABLE IF EXISTS services CASCADE;"  # 🚀 Drop table if it exists
    create_table_sql = """
    CREATE TABLE services (
        subject_id INT NOT NULL,
        hadm_id INT NOT NULL,
        transfertime TIMESTAMP(0),
        prev_service VARCHAR(20),
        curr_service VARCHAR(20),
        PRIMARY KEY (subject_id, hadm_id, transfertime)
    );
    """
    try:
        cur.execute(drop_table_sql)  # Drop table before recreating
        cur.execute(create_table_sql)
        conn.commit()
    except Exception:
        # Undo the partial DDL and reset the connection before propagating.
        conn.rollback()
        raise
    print("✅ Table 'services' dropped and recreated.")

def insert_services():
    """Bulk-load ./data/hosp/services.csv into the 'services' table.

    The CSV is read as strings in chunks of 50,000 rows. Each chunk is
    re-ordered to the INSERT column list, typed (nullable integers, timestamp)
    and written with ``execute_values`` on the module-level ``cur``; ``conn``
    is committed after every chunk. Rows whose (subject_id, hadm_id,
    transfertime) already exists are skipped by ON CONFLICT ... DO NOTHING.

    Values that fail numeric/timestamp parsing are coerced to NULL.

    Raises:
        KeyError: if the CSV lacks one of the expected columns.
        Exception: any read/insert error, re-raised after ``conn.rollback()``
            so the shared connection stays usable (chunks committed earlier
            remain in the table).
    """
    csv_path = "./data/hosp/services.csv"  # Ensure correct path
    chunk_size = 50000  # Process 50,000 rows at a time

    # Single source of truth for column order: used for both the SQL column
    # list and the DataFrame selection, so values can never be misaligned.
    columns = ["subject_id", "hadm_id", "transfertime", "prev_service", "curr_service"]
    column_list = ", ".join(columns)

    # SQL Batch Insert Query (loop-invariant, so it is built once)
    insert_sql = f"""
    INSERT INTO services ({column_list})
    VALUES %s
    ON CONFLICT (subject_id, hadm_id, transfertime) DO NOTHING;
    """

    print("📥 Reading CSV file in chunks...")
    try:
        # The context manager closes the CSV handle even if a chunk fails.
        with pd.read_csv(csv_path, dtype=str, chunksize=chunk_size) as reader:
            for chunk in reader:
                # Select by name (not position); copy so the assignments below
                # operate on an independent frame.
                chunk = chunk[columns].copy()

                # Nullable integer columns
                for col in ("subject_id", "hadm_id"):
                    chunk[col] = pd.to_numeric(chunk[col], errors="coerce").astype(pd.Int64Dtype())

                # Timestamp column (unparseable values become NaT)
                chunk["transfertime"] = pd.to_datetime(chunk["transfertime"], errors="coerce")

                # One pass: every missing marker (NaN, NaT, pd.NA, None) -> None
                data_tuples = [
                    tuple(None if pd.isna(value) else value for value in record)
                    for record in chunk.to_numpy()
                ]

                # Batch Insert Data
                print(f"📤 Inserting {len(data_tuples)} rows into PostgreSQL...")
                execute_values(cur, insert_sql, data_tuples)
                conn.commit()  # Commit after each chunk to reduce memory usage
    except Exception:
        # Leave the connection clean instead of stuck in an aborted transaction.
        conn.rollback()
        raise

    print("✅ Finished inserting data into 'services'.")

def check_services():
    """Report how many rows 'services' holds and show up to five of them.

    Both queries run on the module-level cursor ``cur``; the results are
    written to stdout and nothing is returned.
    """
    # The COUNT(*) query yields a single row whose first field is the total.
    cur.execute("SELECT COUNT(*) FROM services;")
    total_rows = cur.fetchone()[0]
    print(f"📊 Total rows in 'services': {total_rows}")

    # Pull the sample before printing its header, then emit one row per line.
    cur.execute("SELECT * FROM services LIMIT 5;")
    samples = cur.fetchall()
    print("🧐 Sample data:", *samples, sep="\n")

In [15]:
# Full reload of 'services': drop/recreate the table, bulk-insert the CSV,
# then print the row count and a few sample rows. Re-running this cell discards
# whatever the table currently contains.
create_services_table()  # Create table
insert_services()  # Insert data
check_services()  # Check if data is inserted

✅ Table 'services' dropped and recreated.
📥 Reading CSV file in chunks...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 18029 rows into PostgreSQL...
✅ Finished inserting data into 'services'.
📊 Total rows in 'services': 468028
🧐 Sample data:
(10000032, 22595853, datetime.datetime(2180, 5, 6, 22, 24, 57), None, 'MED')
(10000032, 22841357, datetime.datetime(2180, 6, 26, 18, 28, 8), None, 'MED')
(10000032, 25742920, datetime.datetime(2180, 8, 5, 23, 44, 50), None, 'MED')
(10000032, 29079034, datetime.datetime(2180, 7, 23, 12, 36, 4), None, 'MED')
(10000068, 25022803, datetime.datetime(2160, 3, 3, 23, 17, 17), None, 'MED')


# transfers

In [18]:
def create_transfers_table():
    """Drop any existing 'transfers' table and create it again, empty.

    The DROP and CREATE run on the module-level cursor ``cur`` and are
    committed together on ``conn``. If either statement fails, the transaction
    is rolled back and the error is re-raised, so the shared connection is not
    left in an aborted transaction that would break every later cell.

    Raises:
        Exception: whatever ``cur.execute`` / ``conn.commit`` raised.
    """
    # CASCADE also drops objects that depend on the table (e.g. views).
    drop_table_sql = "DROP TABLE IF EXISTS transfers CASCADE;"  # 🚀 Drop table if it exists
    create_table_sql = """
    CREATE TABLE transfers (
        subject_id INTEGER NOT NULL,
        hadm_id INTEGER,
        transfer_id INTEGER NOT NULL PRIMARY KEY,
        eventtype VARCHAR(10),
        careunit VARCHAR(255),
        intime TIMESTAMP(0),
        outtime TIMESTAMP(0)
    );
    """
    try:
        cur.execute(drop_table_sql)  # Drop table before recreating
        cur.execute(create_table_sql)
        conn.commit()
    except Exception:
        # Undo the partial DDL and reset the connection before propagating.
        conn.rollback()
        raise
    print("✅ Table 'transfers' dropped and recreated.")

def insert_transfers():
    """Bulk-load ./data/hosp/transfers.csv into the 'transfers' table.

    The CSV is read as strings in chunks of 50,000 rows. Each chunk is
    re-ordered to the INSERT column list, typed (nullable integers,
    timestamps) and written with ``execute_values`` on the module-level
    ``cur``; ``conn`` is committed after every chunk. Rows whose transfer_id
    already exists are skipped by ON CONFLICT ... DO NOTHING.

    Values that fail numeric/timestamp parsing are coerced to NULL.

    Raises:
        KeyError: if the CSV lacks one of the expected columns.
        Exception: any read/insert error, re-raised after ``conn.rollback()``
            so the shared connection stays usable (chunks committed earlier
            remain in the table).
    """
    csv_path = "./data/hosp/transfers.csv"  # Ensure correct path
    chunk_size = 50000  # Process 50,000 rows at a time

    # Single source of truth for column order: used for both the SQL column
    # list and the DataFrame selection, so values can never be misaligned.
    columns = ["subject_id", "hadm_id", "transfer_id", "eventtype", "careunit", "intime", "outtime"]
    column_list = ", ".join(columns)

    # SQL Batch Insert Query (loop-invariant, so it is built once)
    insert_sql = f"""
    INSERT INTO transfers ({column_list})
    VALUES %s
    ON CONFLICT (transfer_id) DO NOTHING;
    """

    print("📥 Reading CSV file in chunks...")
    try:
        # The context manager closes the CSV handle even if a chunk fails.
        with pd.read_csv(csv_path, dtype=str, chunksize=chunk_size) as reader:
            for chunk in reader:
                # Select by name (not position); copy so the assignments below
                # operate on an independent frame.
                chunk = chunk[columns].copy()

                # Nullable integer columns (hadm_id may legitimately be empty)
                for col in ("subject_id", "hadm_id", "transfer_id"):
                    chunk[col] = pd.to_numeric(chunk[col], errors="coerce").astype(pd.Int64Dtype())

                # Timestamp columns (unparseable values become NaT)
                for col in ("intime", "outtime"):
                    chunk[col] = pd.to_datetime(chunk[col], errors="coerce")

                # One pass: every missing marker (NaN, NaT, pd.NA, None) -> None
                data_tuples = [
                    tuple(None if pd.isna(value) else value for value in record)
                    for record in chunk.to_numpy()
                ]

                # Batch Insert Data
                print(f"📤 Inserting {len(data_tuples)} rows into PostgreSQL...")
                execute_values(cur, insert_sql, data_tuples)
                conn.commit()  # Commit after each chunk to reduce memory usage
    except Exception:
        # Leave the connection clean instead of stuck in an aborted transaction.
        conn.rollback()
        raise

    print("✅ Finished inserting data into 'transfers'.")

def check_transfers():
    """Report how many rows 'transfers' holds and show up to five of them.

    Both queries run on the module-level cursor ``cur``; the results are
    written to stdout and nothing is returned.
    """
    # The COUNT(*) query yields a single row whose first field is the total.
    cur.execute("SELECT COUNT(*) FROM transfers;")
    total_rows = cur.fetchone()[0]
    print(f"📊 Total rows in 'transfers': {total_rows}")

    # Pull the sample before printing its header, then emit one row per line.
    cur.execute("SELECT * FROM transfers LIMIT 5;")
    samples = cur.fetchall()
    print("🧐 Sample data:", *samples, sep="\n")

In [17]:
# Full reload of 'transfers': drop/recreate the table, bulk-insert the CSV,
# then print the row count and a few sample rows. Re-running this cell discards
# whatever the table currently contains.
create_transfers_table()  # Create table
insert_transfers()  # Insert data
check_transfers()  # Check if data is inserted

✅ Table 'transfers' dropped and recreated.
📥 Reading CSV file in chunks...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤

# omr

In [19]:
def create_omr_table():
    """Drop any existing 'omr' table and create it again, empty.

    The DROP and CREATE run on the module-level cursor ``cur`` and are
    committed together on ``conn``. If either statement fails, the transaction
    is rolled back and the error is re-raised, so the shared connection is not
    left in an aborted transaction that would break every later cell.

    Raises:
        Exception: whatever ``cur.execute`` / ``conn.commit`` raised.
    """
    # CASCADE also drops objects that depend on the table (e.g. views).
    drop_table_sql = "DROP TABLE IF EXISTS omr CASCADE;"  # 🚀 Drop table if it exists
    create_table_sql = """
    CREATE TABLE omr (
        subject_id INTEGER NOT NULL,
        chartdate DATE NOT NULL,
        seq_num INTEGER NOT NULL,
        result_name VARCHAR(100) NOT NULL,
        result_value TEXT NOT NULL,
        PRIMARY KEY (subject_id, chartdate, seq_num, result_name)
    );
    """
    try:
        cur.execute(drop_table_sql)  # Drop table before recreating
        cur.execute(create_table_sql)
        conn.commit()
    except Exception:
        # Undo the partial DDL and reset the connection before propagating.
        conn.rollback()
        raise
    print("✅ Table 'omr' dropped and recreated.")

def insert_omr():
    """Bulk-load ./data/hosp/omr.csv into the 'omr' table.

    The CSV is read as strings in chunks of 50,000 rows. Each chunk is
    re-ordered to the INSERT column list, typed (nullable integers, dates) and
    written with ``execute_values`` on the module-level ``cur``; ``conn`` is
    committed after every chunk. Rows whose (subject_id, chartdate, seq_num,
    result_name) already exists are skipped by ON CONFLICT ... DO NOTHING.

    Values that fail numeric/date parsing are coerced to NULL.

    Raises:
        KeyError: if the CSV lacks one of the expected columns.
        Exception: any read/insert error, re-raised after ``conn.rollback()``
            so the shared connection stays usable (chunks committed earlier
            remain in the table).
    """
    csv_path = "./data/hosp/omr.csv"  # Ensure correct path
    chunk_size = 50000  # Process 50,000 rows at a time

    # Single source of truth for column order: used for both the SQL column
    # list and the DataFrame selection, so values can never be misaligned.
    columns = ["subject_id", "chartdate", "seq_num", "result_name", "result_value"]
    column_list = ", ".join(columns)

    # SQL Batch Insert Query (loop-invariant, so it is built once)
    insert_sql = f"""
    INSERT INTO omr ({column_list})
    VALUES %s
    ON CONFLICT (subject_id, chartdate, seq_num, result_name) DO NOTHING;
    """

    print("📥 Reading CSV file in chunks...")
    try:
        # The context manager closes the CSV handle even if a chunk fails.
        with pd.read_csv(csv_path, dtype=str, chunksize=chunk_size) as reader:
            for chunk in reader:
                # Select by name (not position); copy so the assignments below
                # operate on an independent frame.
                chunk = chunk[columns].copy()

                # Nullable integer columns
                for col in ("subject_id", "seq_num"):
                    chunk[col] = pd.to_numeric(chunk[col], errors="coerce").astype(pd.Int64Dtype())

                # Keep only the calendar date, matching the DATE column type
                chunk["chartdate"] = pd.to_datetime(chunk["chartdate"], errors="coerce").dt.date

                # One pass: every missing marker (NaN, NaT, pd.NA, None) -> None
                data_tuples = [
                    tuple(None if pd.isna(value) else value for value in record)
                    for record in chunk.to_numpy()
                ]

                # Batch Insert Data
                print(f"📤 Inserting {len(data_tuples)} rows into PostgreSQL...")
                execute_values(cur, insert_sql, data_tuples)
                conn.commit()  # Commit after each chunk to reduce memory usage
    except Exception:
        # Leave the connection clean instead of stuck in an aborted transaction.
        conn.rollback()
        raise

    print("✅ Finished inserting data into 'omr'.")

def check_omr():
    """Report how many rows 'omr' holds and show up to five of them.

    Both queries run on the module-level cursor ``cur``; the results are
    written to stdout and nothing is returned.
    """
    # The COUNT(*) query yields a single row whose first field is the total.
    cur.execute("SELECT COUNT(*) FROM omr;")
    total_rows = cur.fetchone()[0]
    print(f"📊 Total rows in 'omr': {total_rows}")

    # Pull the sample before printing its header, then emit one row per line.
    cur.execute("SELECT * FROM omr LIMIT 5;")
    samples = cur.fetchall()
    print("🧐 Sample data:", *samples, sep="\n")

In [20]:
# Full reload of 'omr': drop/recreate the table, bulk-insert the CSV, then
# print the row count and a few sample rows. Re-running this cell discards
# whatever the table currently contains.
create_omr_table()  # Create table
insert_omr()        # Insert data
check_omr()         # Check if data is inserted

✅ Table 'omr' dropped and recreated.
📥 Reading CSV file in chunks...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inserting 50000 rows into PostgreSQL...
📤 Inser

In [21]:
# Final cleanup: release the shared cursor explicitly, then its connection.
# Closing the connection discards any transaction that was never committed.
cur.close()
conn.close()