In [1]:
from pathlib import Path
import pandas as pd
import sqlite3
from src.utils.os_helper import get_project_root
import os

In [5]:
# Upper bound on bound parameters per SQLite statement. 999 was SQLite's
# compile-time default before 3.32.0 (newer builds default to 32766), so it is
# a conservative choice; MARGIN keeps batches a little below it.
SQLITE_MAX_VARS, MARGIN = 999, 10

# Every Aleppo artifact (raw tables, SQLite DB, interim/processed CSVs) lives
# under this cache directory inside the project root.
repo = get_project_root()
CACHE_DIR = repo / "cache" / "data" / "awesome_cgm" / "aleppo"
data_tables = CACHE_DIR / "raw" / "Data Tables"  # pipe-delimited *.txt source tables
db_path = CACHE_DIR / "awesome_cgm.db"

In [None]:
con = sqlite3.connect(db_path)
try:
    cur = con.cursor()
    # Favour import speed over durability: the DB is rebuilt from the raw text
    # tables, so losing it on a crash only means re-running this cell.
    cur.execute("PRAGMA journal_mode=WAL;")
    cur.execute("PRAGMA synchronous=OFF;")
    cur.execute("PRAGMA temp_store=MEMORY;")

    table_files = sorted(data_tables.glob("*.txt"))
    if not table_files:
        # Fail loudly instead of silently producing an empty DB whose index
        # creation then fails table by table.
        raise FileNotFoundError(f"No *.txt tables found in {data_tables}")

    for f in table_files:
        table = f.stem
        print(f"Importing {f.name} -> {table}")

        # Stream in chunks to bound memory. Every column is read as text
        # (dtype=str), so the resulting SQLite columns hold TEXT values.
        first = True
        for df in pd.read_csv(f, sep="|", dtype=str, low_memory=False, chunksize=50_000):
            ncols = len(df.columns)
            # Batch size such that rows * cols <= SQLITE_MAX_VARS - MARGIN. With
            # method=None pandas uses executemany (one row per statement), so this
            # is only a conservative batch size; the bound variable limit matters
            # for method="multi".
            safe_rows = max(1, (SQLITE_MAX_VARS - MARGIN) // max(1, ncols))

            df.to_sql(
                table,
                con,
                # The first chunk recreates the table so re-running the cell does
                # not duplicate rows; later chunks append.
                if_exists="replace" if first else "append",
                index=False,
                chunksize=safe_rows,
                method=None,
            )
            first = False

    # IF NOT EXISTS keeps re-runs clean for tables that were not re-imported
    # (replaced tables lose their indexes and get them recreated here).
    indexes = [
        # Bolus Indexes
        "CREATE INDEX IF NOT EXISTS idx_hdevicebolus_ptid ON HDeviceBolus(PtID);",
        "CREATE INDEX IF NOT EXISTS idx_hdevicebolus_parentid ON HDeviceBolus(ParentHDeviceUploadsID);",
        "CREATE INDEX IF NOT EXISTS idx_hdevicebolus_ptid_days ON HDeviceBolus(PtID, DeviceDtTmDaysFromEnroll);",
        "CREATE INDEX IF NOT EXISTS idx_hdevicebolus_normal ON HDeviceBolus(Normal) WHERE Normal IS NOT NULL;",
        "CREATE INDEX IF NOT EXISTS idx_hdevicebolus_order ON HDeviceBolus(PtID, DeviceDtTmDaysFromEnroll, DeviceTm);",
        # Uploads Indexes
        "CREATE INDEX IF NOT EXISTS idx_hdeviceuploads_recid ON HDeviceUploads(RecID);",
        "CREATE INDEX IF NOT EXISTS idx_hdeviceuploads_ptid ON HDeviceUploads(PtID);",
        "CREATE INDEX IF NOT EXISTS idx_hdeviceuploads_device ON HDeviceUploads(DeviceModel, DeviceType);",
        # CGM Indexes
        "CREATE INDEX IF NOT EXISTS idx_hdevicecgm_recid ON HDeviceCGM(RecID);",
        "CREATE INDEX IF NOT EXISTS idx_hdevicecgm_recordtype ON HDeviceCGM(RecordType);",
        "CREATE INDEX IF NOT EXISTS idx_hdevicecgm_ptid_days ON HDeviceCGM(PtID, DeviceDtTmDaysFromEnroll);",
        "CREATE INDEX IF NOT EXISTS idx_hdevicecgm_ptid_time ON HDeviceCGM(PtID, DeviceTm);",
        # Wizard Indexes
        "CREATE INDEX IF NOT EXISTS idx_hdevicewizard_recid ON HDeviceWizard(RecID);",
        "CREATE INDEX IF NOT EXISTS idx_hdevicewizard_ptid ON HDeviceWizard(PtID);",
        "CREATE INDEX IF NOT EXISTS idx_hdevicewizard_ptid_days ON HDeviceWizard(PtID, DeviceDtTmDaysFromEnroll);",
        "CREATE INDEX IF NOT EXISTS idx_hdevicewizard_ptid_time ON HDeviceWizard(PtID, DeviceTm);",
    ]

    for i, index_sql in enumerate(indexes, start=1):
        # Table name for logging: "... ON HDeviceBolus(PtID);" -> "HDeviceBolus"
        table_name = index_sql.split(" ON ", 1)[1].split("(", 1)[0]
        try:
            cur.execute(index_sql)
            print(f"Created index {i}/{len(indexes)}: {table_name}")
        except sqlite3.Error as e:
            # Best effort: a missing table/column should not stop the other indexes.
            print(f"Error creating index: {e}")

    con.commit()
finally:
    # Release the DB file even when an import step raises.
    con.close()

In [None]:
def query_db(query: str, params=None, db=None) -> pd.DataFrame:
    """Run a SQL query against the SQLite database and return the result as a DataFrame.

    Args:
        query: SQL text to execute.
        params: Optional values bound to the query's placeholders (passed through
            to ``pd.read_sql_query``). Prefer this over formatting values into
            ``query``.
        db: Optional database path; defaults to the module-level ``db_path``.

    Returns:
        A DataFrame with one column per selected expression.
    """
    con = sqlite3.connect(db_path if db is None else db)
    try:
        return pd.read_sql_query(query, con, params=params)
    finally:
        # Close even when the query fails so the connection is not leaked.
        con.close()

### Convert data to csv

- pid
- date
- tableType
- bolusType
- normalBolus
- expectedNormalBolus
- extendedBolus
- expectedExtendedBolus
- bgInput
- foodG
- iob
- cr
- isf
- bgMgdl
- rate
- suprBasalType
- suprRate

In [None]:
query = """
WITH
  params AS (
    SELECT '2020-01-01' AS base_date
  )
SELECT
    pid, date, tableType,
    bolusType, normalBolus, expectedNormalBolus, extendedBolus, expectedExtendedBolus,
    bgInput, foodG, iob, cr, isf,
    bgMgdl,
    rate, suprBasalType, suprRate
FROM (
    -- Bolus data
    SELECT
        HDeviceBolus.PtID as pid,
        datetime(julianday((SELECT base_date FROM params)) + CAST(HDeviceBolus.DeviceDtTmDaysFromEnroll AS INTEGER) + (julianday(HDeviceBolus.DeviceTm) - julianday('00:00:00'))) AS date,
        'bolus' as tableType,
        HDeviceBolus.BolusType as bolusType,
        Normal as normalBolus,
        ExpectedNormal as expectedNormalBolus,
        Extended as extendedBolus,
        ExpectedExtended as expectedExtendedBolus,
        NULL as bgInput,
        NULL as foodG,
        NULL as iob,
        NULL as cr,
        NULL as isf,
        NULL as bgMgdl,
        NULL as rate,
        NULL as suprBasalType,
        NULL as suprRate
    FROM HDeviceBolus

    UNION ALL

    -- Wizard data
    SELECT
        HDeviceWizard.PtID as pid,
        datetime(julianday((SELECT base_date FROM params)) + CAST(HDeviceWizard.DeviceDtTmDaysFromEnroll AS INTEGER) + (julianday(HDeviceWizard.DeviceTm) - julianday('00:00:00'))) AS date,
        'wizard' as tableType,
        NULL as bolusType,
        NULL as normalBolus,
        NULL as expectedNormalBolus,
        NULL as extendedBolus,
        NULL as expectedExtendedBolus,
        HDeviceWizard.BgInput as bgInput,
        -- CarbInput (mg) is stored as TEXT (tables were imported with dtype=str),
        -- so `CarbInput / 1000` would be INTEGER division in SQLite and truncate
        -- (e.g. 45 mg -> 0 g). Cast to REAL to keep fractional grams.
        CAST(HDeviceWizard.CarbInput AS REAL) / 1000.0 as foodG,
        HDeviceWizard.InsulinOnBoard as iob,
        HDeviceWizard.InsulinCarbRatio as cr,
        HDeviceWizard.InsulinSensitivity as isf,
        NULL as bgMgdl,
        NULL as rate,
        NULL as suprBasalType,
        NULL as suprRate
    FROM HDeviceWizard

    UNION ALL

    -- CGM data
    SELECT
        HDeviceCGM.PtID as pid,
        datetime(julianday((SELECT base_date FROM params)) + CAST(HDeviceCGM.DeviceDtTmDaysFromEnroll AS INTEGER) + (julianday(HDeviceCGM.DeviceTm) - julianday('00:00:00'))) AS date,
        'cgm' as tableType,
        NULL as bolusType,
        NULL as normalBolus,
        NULL as expectedNormalBolus,
        NULL as extendedBolus,
        NULL as expectedExtendedBolus,
        NULL as bgInput,
        NULL as foodG,
        NULL as iob,
        NULL as cr,
        NULL as isf,
        HDeviceCGM.GlucoseValue as bgMgdl,
        NULL as rate,
        NULL as suprBasalType,
        NULL as suprRate
    FROM HDeviceCGM
    WHERE HDeviceCGM.GlucoseValue IS NOT NULL
    AND HDeviceCGM.RecordType = 'CGM'

    UNION ALL

    -- Basal data
    SELECT
        HDeviceBasal.PtID as pid,
        datetime(julianday((SELECT base_date FROM params)) + CAST(HDeviceBasal.DeviceDtTmDaysFromEnroll AS INTEGER) + (julianday(HDeviceBasal.DeviceTm) - julianday('00:00:00'))) AS date,
        'basal' as tableType,
        NULL as bolusType,
        NULL as normalBolus,
        NULL as expectedNormalBolus,
        NULL as extendedBolus,
        NULL as expectedExtendedBolus,
        NULL as bgInput,
        NULL as foodG,
        NULL as iob,
        NULL as cr,
        NULL as isf,
        NULL as bgMgdl,
        HDeviceBasal.Rate as rate,
        HDeviceBasal.SuprBasalType as suprBasalType,
        HDeviceBasal.SuprRate as suprRate
    FROM HDeviceBasal
)
-- pid is TEXT, so patients sort lexicographically ('10' < '101' < '11').
ORDER BY pid, date ASC;
"""
df_all = query_db(query)
# Debug dump of the combined event table (relative to the current working directory).
df_all.to_csv("output.csv")

### Raw -> Interim

In [9]:
def convert_to_csv(df, data_dir=None):
    """Write one interim CSV per patient, named ``p{pid}_full.csv``.

    Args:
        df: Combined event table with a ``pid`` column (e.g. ``df_all``).
            Rows whose ``pid`` is missing are skipped (groupby drops NaN keys).
        data_dir: Output directory. Defaults to
            ``<project root>/cache/data/awesome_cgm/aleppo/interim``.
    """
    if data_dir is None:
        data_dir = get_project_root() / "cache" / "data" / "awesome_cgm" / "aleppo" / "interim"
    data_dir = Path(data_dir)
    os.makedirs(data_dir, exist_ok=True)
    # One groupby pass is O(n); filtering the whole frame once per pid was
    # O(n * n_pids). sort=False keeps pids in order of first appearance, the
    # same order df["pid"].unique() produced.
    for pid, df_pid in df.groupby("pid", sort=False):
        df_pid.to_csv(data_dir / f"p{pid}_full.csv", index=False)
        print(f"Done processing pid {pid}")


# Split the combined export into one interim CSV per patient.
convert_to_csv(df_all)

Done processing pid 10
Done processing pid 101
Done processing pid 102
Done processing pid 103
Done processing pid 105
Done processing pid 106
Done processing pid 108
Done processing pid 109
Done processing pid 11
Done processing pid 110
Done processing pid 111
Done processing pid 112
Done processing pid 113
Done processing pid 115
Done processing pid 116
Done processing pid 118
Done processing pid 119
Done processing pid 121
Done processing pid 123
Done processing pid 124
Done processing pid 127
Done processing pid 128
Done processing pid 129
Done processing pid 130
Done processing pid 131
Done processing pid 132
Done processing pid 134
Done processing pid 135
Done processing pid 136
Done processing pid 137
Done processing pid 138
Done processing pid 139
Done processing pid 14
Done processing pid 140
Done processing pid 141
Done processing pid 143
Done processing pid 145
Done processing pid 146
Done processing pid 147
Done processing pid 148
Done processing pid 149
Done processing pid

In [None]:
from src.data.models import ColumnNames
from src.data.preprocessing.pipeline import preprocessing_pipeline
from src.utils.unit import mg_dl_to_mmol_l


# 	pid	date	tableType	bolusType	normalBolus	expectedNormalBolus	extendedBolus	expectedExtendedBolus	bgInput	foodG	iob	cr	isf	bgMgdl	rate	suprBasalType	suprRate
def data_translation(df_raw: pd.DataFrame) -> pd.DataFrame:
    """
    Rename the combined Aleppo export to the project's ColumnNames and normalise units.

    Column mapping (source column -> target):
    'pid' -> ColumnNames.P_NUM
    'date' -> ColumnNames.DATETIME, parsed and used as the (sorted) index
    'tableType' -> ColumnNames.MSG_TYPE (bolus, wizard, cgm, basal). Wizard rows carry
        information like carb intake.
    'bolusType': bolus type (kept under its source name)
    'normalBolus' -> ColumnNames.DOSE_UNITS: number of units of normal bolus
    'extendedBolus': number of units for extended delivery (kept under its source name)
    'expectedNormalBolus', 'expectedExtendedBolus': dropped
    'bgInput': blood glucose as entered into the wizard (kept under its source name)
    'foodG' -> ColumnNames.FOOD_G: carbohydrates in grams. The wizard's CarbInput (mg)
        is already divided by 1000 in the SQL export, so only the dtype changes here.
    'iob' -> ColumnNames.IOB: units of insulin on board
    'cr': carbs covered by one unit of insulin (kept, not used yet; could hint the
        model about the slope of the glucose curve)
    'isf': glucose covered by one unit of insulin (kept, not used yet)
    'bgMgdl' -> ColumnNames.BG, converted from mg/dL to mmol/L
    'rate', 'suprBasalType', 'suprRate' -> ColumnNames.RATE / SUPR_BASAL_TYPE / SUPR_RATE

    Args:
        df_raw: One patient's rows from the interim CSV; not modified.

    Returns:
        A new DataFrame indexed by datetime in ascending order.
    """
    # rename returns a new frame, so df_raw is left untouched.
    df = df_raw.rename(
        columns={
            "pid": ColumnNames.P_NUM.value,
            "date": ColumnNames.DATETIME.value,
            "tableType": ColumnNames.MSG_TYPE.value,
            "normalBolus": ColumnNames.DOSE_UNITS.value,
            # "expectedNormalBolus": ColumnNames.EXPECTED_NORMAL.value,
            # "extendedBolus": ColumnNames.EXTENDED.value,
            # "expectedExtendedBolus": ColumnNames.EXPECTED_EXTENDED.value,
            # "bgInput": ColumnNames.BG.value, todo: Maybe tag the record type as bgInput
            "foodG": ColumnNames.FOOD_G.value,
            "iob": ColumnNames.IOB.value,
            # "cr": ColumnNames.INSULIN_CARB_RATIO.value,
            # "isf": ColumnNames.ISF.value,
            "recordType": ColumnNames.RECORD_TYPE.value,
            "bgMgdl": ColumnNames.BG.value,
            "rate": ColumnNames.RATE.value,
            "suprBasalType": ColumnNames.SUPR_BASAL_TYPE.value,
            "suprRate": ColumnNames.SUPR_RATE.value,
        }
    )
    df = df.drop(columns=["expectedNormalBolus", "expectedExtendedBolus"])

    # Parse timestamps before sorting so the order is chronological, and sort
    # stably so events sharing a timestamp keep their input order.
    df = df.set_index(ColumnNames.DATETIME.value)
    df.index = pd.to_datetime(df.index)
    df = df.sort_index(kind="stable")

    # Convert blood glucose from mg/dL to mmol/L
    df[ColumnNames.BG.value] = mg_dl_to_mmol_l(df, bgl_col=ColumnNames.BG.value)

    # foodG is already in grams (CarbInput / 1000 in the SQL export); dividing by
    # 1000 again here would turn grams into kilograms. Only coerce the dtype.
    df[ColumnNames.FOOD_G.value] = df[ColumnNames.FOOD_G.value].astype(float)

    return df


def keep_overlapping_data(patient_df: pd.DataFrame) -> pd.DataFrame:
    """
    Args:
        patient_df: A dataframe that has been through data_translation (datetime is the index)
    Keep data that has overlapping time windows for all table types.
    Note that not all patients have data for all table types so we only consider the table types that are present.
    """
    table_types = patient_df[ColumnNames.MSG_TYPE.value].unique()
    start_datetime = None  # This should be the max of all the min datetimes
    end_datetime = None  # This should be the min of all the max datetimes

    for table_type in table_types:
        table_data = patient_df[patient_df[ColumnNames.MSG_TYPE.value] == table_type]
        if table_data.empty:
            continue

        min_datetime = table_data.index.min()
        max_datetime = table_data.index.max()

        if start_datetime is None or min_datetime > start_datetime:
            start_datetime = min_datetime
        if end_datetime is None or max_datetime < end_datetime:
            end_datetime = max_datetime

    if start_datetime is None or end_datetime is None:
        return None

    has_overlap = start_datetime < end_datetime
    if not has_overlap:
        return None

    # Filter using the index (datetime)
    return patient_df[
        (patient_df.index >= start_datetime) & (patient_df.index <= end_datetime)
    ]

In [None]:
def process_one_patient(
    df_raw: pd.DataFrame,
    to_csv: bool = False,
) -> pd.DataFrame:
    """
    Process the raw data for one patient:
    1. Translate the data (columns and units)
    2. Keep overlapping data (for bolus, wizard, cgm and basal)
    3. TODO: Convert basal rate to 5-min interval
    4. preprocessing_pipeline (This include resampling and deriving cob and iob)
    """
    df = df_raw.copy()

    # Translate the data
    df = data_translation(df)
    pid = df[ColumnNames.P_NUM.value].iloc[0]

    # Keep overlapping data
    df = keep_overlapping_data(df)
    if df is None:
        return None

    # Print data meta: span of datetime index and number of rows
    start_dt = df.index.min()
    end_dt = df.index.max()
    print(
        f"Patient {pid} processed data spans from {start_dt} to {end_dt} ({(end_dt - start_dt)})"
    )

    food_g = df[df[ColumnNames.FOOD_G.value].notna()]
    print(f"Number of rows with food intake: {len(food_g)}")

    bolus = df[df[ColumnNames.DOSE_UNITS.value].notna()]
    print(f"Number of rows with bolus: {len(bolus)}")

    # Let the pipeline handle the rest
    # This derives iob and cob which we is information we already have
    df = preprocessing_pipeline(pid, df)
    # Debug only
    if to_csv:
        debug_dir = repo / "cache" / "data" / "awesome_cgm" / "aleppo" / "debug"
        os.makedirs(debug_dir, exist_ok=True)
        df.to_csv(debug_dir / f"p{pid}.csv", index=True)
    return df


def process_all_patients(
    interim_path: Path,
    processed_path: Path,
) -> None:
    """
    Clean all patients' data in the interim path and save the processed data to the processed path.

    Every ``*.csv`` file in ``interim_path`` is run through process_one_patient and
    the result is written under the same filename in ``processed_path`` (created
    if missing). Non-CSV files are ignored, and patients for which
    process_one_patient returns None are reported and skipped.
    """
    interim_path = Path(interim_path)
    processed_path = Path(processed_path)
    processed_path.mkdir(parents=True, exist_ok=True)
    # Plain variable instead of a nested-quote f-string, which needs Python 3.12+.
    separator = "-" * 10

    for csv_path in sorted(interim_path.glob("*.csv")):
        df = process_one_patient(pd.read_csv(csv_path))
        if df is None:
            print(f"{separator}Skipped pid {csv_path.name}: nothing to process {separator}")
            continue
        df.to_csv(processed_path / csv_path.name, index=True)
        print(f"{separator}Done processing pid {csv_path.name} {separator}")

### Interim -> Processed for one patient

In [26]:
# Sanity-check the pipeline on a single patient before running it on everyone.
pid_check = 7
root = get_project_root()
patient_csv = Path(root) / "cache" / "data" / "awesome_cgm" / "aleppo" / "interim" / f"p{pid_check}_full.csv"
patient_df = pd.read_csv(patient_csv)

raw_dates = patient_df["date"]
start_time, end_time = raw_dates.min(), raw_dates.max()
print(f"Patient {pid_check} original data spans from {start_time} to {end_time}")
processed_df = process_one_patient(patient_df)

Patient 7 original data spans from 2019-08-17 14:32:00 to 2020-09-30 10:14:30
Patient 7 processed data spans from 2019-11-02 09:19:27 to 2020-09-30 08:58:17 (332 days 23:38:50)
Number of rows with food intake: 1651
Number of rows with bolus: 1648


### Process all patients

In [None]:
interim_path = CACHE_DIR / "interim"
processed_path = CACHE_DIR / "processed"

# The output directory may not exist on a fresh checkout; writing the
# processed CSVs would fail without it.
os.makedirs(processed_path, exist_ok=True)

# TODO: Double check this. This is too fast to be true.
process_all_patients(interim_path, processed_path)

### TODO

- [x] Figure out why we are missing some patients like p04 - There is no reason
- [x] Verify db is created correctly in the right directory
- [x] Figure out the columns of the tables and make sure the unit is actually correct. Is CarbInput in g and "normal" in units at all?
- [ ] Plot a histogram to see whether there are significantly fewer data points before the enrollment day.
- [ ] Figure out how to handle missing data (some patients have missing CGM data). Calibration records need to be filtered out; doing so seems to remove some of the large gaps.
- [ ] Union with the Basal table too. The rate should be rolled forward over the following rows. For example, for a patient sampled every 5 minutes with a basal rate of 0.1 units/hour, each of the 12 rows in that hour should get 0.1/12 units. This must be added to any bolus already in the row, so a row that already has 8 units of insulin becomes 8 + 0.1/12 units. Also keep the rate as 0 when it is null.
- [ ] Take a look at some patients' raw data and see if it makes sense at all first


## Loader
- [ ] Add a test split for the loader (5% of the entire dataset is enough)
- [ ] Verify the `clean_all_patients` is working
- [ ] Verify that `data_loader` works at all
- [ ] For the loader class, cross check with Kaggle to do a similar data check
- [ ] Update `README.md`
