In [1]:
import pandas as pd
import duckdb
import pathlib

data_path = '/data/vgribanov/data/readm'
conn = duckdb.connect(database=':memory:', read_only=False)

To transform files to unicode should be used the following command:
```bash
iconv -f WINDOWS-1252 -t UTF-8 /data/vgribanov/data/readm/Diagnoses_hours.txt > /data/vgribanov/data/readm/Diagnoses_hours_unicode.txt
```

In [2]:
# create raw inpatient table
conn.execute(f"""
DROP TABLE IF EXISTS raw_inpatient;
CREATE TABLE raw_inpatient AS 
SELECT StudyID AS patient_id, PatientEncounterID AS encounter_id, 
    HospitalAdmitDTS AS admitted, HospitalDischargeDTS AS discharged, 
    HospitalServiceCD AS service_id, HospitalServiceDSC AS service_desc, 
    DischargeCategory AS disposition_desc,
    MeansOfArrivalCD AS ma_id, MeansOfArrivalDSC AS ma_desc, 
    IsFemale AS is_female, Age AS age, RaceEthnicity AS race,
    IsMarried AS is_married, SpeaksEnglish AS is_speak_english, HasAtLeastSomeCollege AS is_graduated,
    SDI_score AS sdi, HasSmoked AS is_smoked, CurrentlyDrinks AS is_drunk, 
    DaysFromLastHospitalization AS days_from_last, Within30 AS in_30, 
    AnyICU AS was_in_icu, LastDepartment AS last_department
FROM read_csv('{pathlib.Path(data_path)}/Inpatient_Static_hosp11.txt', types:= {'{'}EmergencyAdmitDTS:'VARCHAR'{'}'}, ignore_errors:=true);
""")

<duckdb.duckdb.DuckDBPyConnection at 0x79ae48db28b0>

In [4]:
# create vocabulary for static data (dictionary values)
conn.execute("""
DROP TABLE IF EXISTS static_data_vocab;
DROP SEQUENCE IF EXISTS static_data_vocab_id_seq;
CREATE SEQUENCE static_data_vocab_id_seq;
""")
conn.execute("""
CREATE TABLE static_data_vocab (
    id INT PRIMARY KEY DEFAULT nextval('static_data_vocab_id_seq'),
    type VARCHAR,
    description VARCHAR,
    code VARCHAR,
    lower_value FLOAT DEFAULT 0,
    upper_value FLOAT DEFAULT 0,
    encounters BIGINT
);
""")

conn.execute("""
INSERT INTO static_data_vocab (type, description, code, encounters)
SELECT 'service', COALESCE(max(service_desc), 'Unknown'), COALESCE(service_id::text,'0') as service_id, count(*) as qty
FROM raw_inpatient
GROUP BY service_id
ORDER BY service_id;

INSERT INTO static_data_vocab (type, description, code, encounters)
SELECT 'disposition', COALESCE(disposition_desc, 'Unknown'), COALESCE(disposition_desc,'Unknown'), count(*) as qty
FROM raw_inpatient
GROUP BY disposition_desc
ORDER BY disposition_desc;

INSERT INTO static_data_vocab (type, description, code, encounters)
SELECT 'means_of_arrival', COALESCE(max(ma_desc), 'Unknown'), COALESCE(ma_id::text,'0'), count(*) as qty
FROM raw_inpatient
GROUP BY ma_id
ORDER BY ma_id;

INSERT INTO static_data_vocab (type, description, code, encounters)
SELECT 'race', COALESCE(max(race), 'Unknown'), COALESCE(race::text,'Unknown'), count(*) as qty
FROM raw_inpatient
GROUP BY race
ORDER BY race;

-- binary features
WITH data AS (
    SELECT feature_name, value, encounter_id
    FROM raw_inpatient
    UNPIVOT (value FOR feature_name IN (is_married, is_speak_english, is_graduated, is_smoked, is_drunk, was_in_icu))
)
INSERT INTO static_data_vocab (type, description, code, encounters)
SELECT feature_name, feature_name, value, count(distinct encounter_id) as qty
FROM data
WHERE value = 1
GROUP BY feature_name, value;

-- numeric features
WITH data AS (
    SELECT feature_name, value, encounter_id
    FROM raw_inpatient
    UNPIVOT (value FOR feature_name IN (sdi, age, days_from_last))
), values AS (
    SELECT feature_name, value, encounter_id
    FROM data
    GROUP BY feature_name, value, encounter_id
), feature_groups AS (
    SELECT feature_name, ntile(CASE WHEN feature_name = 'age' THEN 15 ELSE 10 END) OVER (PARTITION BY feature_name ORDER BY value) as vq, value, encounter_id
    FROM values
)
INSERT INTO static_data_vocab (type, description, code, lower_value, upper_value,  encounters)
SELECT 
    feature_name, 
    printf('%d: %s %.1f-%.1f', vq, feature_name, min(value)::float, max(value)::float), 
    vq, min(value), max(value), count(distinct encounter_id)
FROM feature_groups
GROUP BY feature_name, vq
ORDER BY feature_name, vq;
""")


<duckdb.duckdb.DuckDBPyConnection at 0x79ae48db28b0>

In [5]:
# create deduplicated inpatient table
conn.execute("""
DROP TABLE IF EXISTS inpatient;
CREATE TABLE inpatient AS 
SELECT * EXCLUDE (prev_admitted, prev_discharged)
FROM (
    WITH grouped_inpatient AS (
        SELECT 
            patient_id, encounter_id, 
            min(admitted) AS admitted, 
            max(discharged) AS discharged, 
            LIST(DISTINCT COALESCE(service_id, 0)) AS service_ids,
            LIST(DISTINCT COALESCE(ma_id,0)) AS ma_ids,
            max(is_female) AS is_female,
            max(age) AS age,
            MIN(COALESCE(race, 'Unknown')) AS race,
            max(is_married) AS is_married,
            max(is_speak_english) AS is_speak_english,
            max(is_graduated) AS is_graduated,
            COALESCE(avg(sdi), 0) AS sdi,
            max(is_smoked) AS is_smoked,
            max(is_drunk) AS is_drunk,
            max(COALESCE(days_from_last,0)) AS days_from_last,
            max(in_30) AS in_30,
            max(was_in_icu) AS was_in_icu
        FROM raw_inpatient
       -- WHERE patient_id = '{51A1E0CF-257F-4E76-9D48-2D04CA5BF4A2}'
        GROUP BY patient_id, encounter_id
        ORDER BY admitted
    )
    SELECT *,
        lag(admitted) OVER (ORDER BY admitted) AS prev_admitted,
        lag(discharged) OVER (ORDER BY admitted) AS prev_discharged,
        (admitted BETWEEN prev_admitted AND prev_discharged OR 
        discharged BETWEEN prev_admitted AND prev_discharged) AS intersects_prev,
        COUNT(*) OVER (
            PARTITION BY patient_id 
            ORDER BY admitted 
            RANGE BETWEEN unbounded preceding AND CURRENT ROW
        ) AS encounter_num
    FROM grouped_inpatient
);
""")

<duckdb.duckdb.DuckDBPyConnection at 0x79ae48db28b0>

In [6]:
# static feature array
conn.execute("""
DROP TABLE IF EXISTS static_features;
CREATE TABLE static_features AS
WITH binary_data AS (
    SELECT feature_name, max(value) as value, patient_id, encounter_id
    FROM raw_inpatient
    UNPIVOT (value FOR feature_name IN (is_married, is_speak_english, is_graduated, is_smoked, is_drunk, was_in_icu))
    GROUP BY ALL
), binary_features AS (
    SELECT binary_data.patient_id, binary_data.encounter_id, static_data_vocab.id
    FROM binary_data
        INNER JOIN static_data_vocab ON 
            binary_data.feature_name = static_data_vocab.type AND 
            binary_data.value::TEXT = static_data_vocab.code
), numeric_data AS (
    SELECT feature_name, value, patient_id, encounter_id
    FROM raw_inpatient
    UNPIVOT (value FOR feature_name IN (sdi, age, days_from_last))
), numeric_values AS (
    SELECT feature_name, max(value) AS value, patient_id, encounter_id
    FROM numeric_data
    GROUP BY ALL
), numeric_features AS (
    SELECT numeric_values.patient_id, numeric_values.encounter_id, static_data_vocab.id
    FROM numeric_values
        ASOF JOIN static_data_vocab 
            ON 
                numeric_values.feature_name = static_data_vocab.type AND 
                numeric_values.value >= static_data_vocab.lower_value
), dict_data AS (
    SELECT 
        patient_id, encounter_id, 
        COALESCE(service_id::TEXT, '0') AS service,
        COALESCE(disposition_desc::TEXT, 'Unknown') AS disposition,
        COALESCE(ma_id::TEXT, '0') AS means_of_arrival,
        COALESCE(race, 'Unknown') AS race
    FROM raw_inpatient
    GROUP BY ALL
), dict_values AS (
    SELECT *
    FROM dict_data
    UNPIVOT (value FOR feature_name IN (service, disposition, means_of_arrival))
), dict_features AS (
    SELECT patient_id, encounter_id, static_data_vocab.id
    FROM dict_values
            JOIN static_data_vocab 
                ON 
                    dict_values.feature_name = static_data_vocab.type AND 
                    dict_values.value = static_data_vocab.code
    GROUP BY patient_id, encounter_id, static_data_vocab.id
)
SELECT patient_id, encounter_id, LIST(DISTINCT id ORDER BY id) AS features
FROM (
    FROM binary_features
    UNION ALL
    FROM numeric_features
    UNION ALL
    FROM dict_features
)
GROUP BY patient_id, encounter_id;
""")

<duckdb.duckdb.DuckDBPyConnection at 0x79ae48db28b0>

In [7]:
# dynamic features 
conn.execute("""
DROP TABLE IF EXISTS dynamic_data_vocab;
DROP SEQUENCE IF EXISTS dynamic_data_vocab_id_seq;

CREATE SEQUENCE dynamic_data_vocab_id_seq;
CREATE TABLE dynamic_data_vocab (
    id INT PRIMARY KEY DEFAULT nextval('dynamic_data_vocab_id_seq'),
    type VARCHAR,
    description VARCHAR,
    code VARCHAR,
    value_group INT,
    lower_value FLOAT DEFAULT 0,
    upper_value FLOAT DEFAULT 0,
    encounters BIGINT
);
""")

# create encounters period table
conn.execute("""
DROP TABLE IF EXISTS encounters;
CREATE TABLE encounters AS
SELECT
    row_number() OVER () AS idx,
    patient_id, encounter_id, 
    min(admitted) as admitted, max(discharged) as discharged
FROM raw_inpatient
GROUP BY patient_id, encounter_id
""")

<duckdb.duckdb.DuckDBPyConnection at 0x79ae48db28b0>

In [8]:
# labs 

lab_limit = 0.01 # percent of encounters that has lab code to limit the number of features

# load labs data
conn.execute(f"""
DROP TABLE IF EXISTS labs_raw;
CREATE TABLE labs_raw AS
SELECT 
    StudyID AS patient_id, 
    EncounterID AS encounter_id,
    componentCommonNM AS code,
    NVal AS value,
    HoursSinceAdmit AS hours_since_admit
FROM read_csv('{pathlib.Path(data_path)}/Labs_hours.txt');
""")

# split labs values into groups
conn.execute(f"""
DROP TABLE IF EXISTS labs_values;
CREATE TABLE labs_values AS
WITH data AS (
    SELECT 
        labs_raw.patient_id, labs_raw.encounter_id, labs_raw.code, labs_raw.value, labs_raw.hours_since_admit,
        encounters.admitted, encounters.discharged,
        (encounters.admitted + INTERVAL (labs_raw.hours_since_admit::INTEGER) HOUR) AS lab_time,
        extract(hour from encounters.discharged - lab_time) AS hours_to_discharge,
        COUNT(DISTINCT labs_raw.encounter_id) OVER (PARTITION BY code) AS lab_encounters_num,
        COUNT(DISTINCT labs_raw.encounter_id) OVER () AS total_encounters_num
    FROM labs_raw
        INNER JOIN encounters ON 
            labs_raw.patient_id = encounters.patient_id AND 
            labs_raw.encounter_id = encounters.encounter_id
    WHERE lab_time BETWEEN encounters.admitted AND encounters.discharged
)
SELECT 
    patient_id, 
    encounter_id, code, value, 
    hours_since_admit, hours_to_discharge,
    ntile(10) OVER (PARTITION BY code ORDER BY value) as value_group
FROM data
WHERE 
    lab_encounters_num::FLOAT / total_encounters_num::FLOAT >= {lab_limit}
""")

# create vocabulary for dynamic lab data
conn.execute("""
INSERT INTO dynamic_data_vocab (type, description, code, value_group, lower_value, upper_value, encounters) 
SELECT 'labs', code, code, value_group, min(value), max(value), COUNT(DISTINCT encounter_id) as qty
FROM labs_values
GROUP BY code, value_group
ORDER BY code, value_group
""")

# create dynamic lab features
conn.execute("""
DROP TABLE IF EXISTS dynamic_labs_data;
CREATE TABLE dynamic_labs_data AS
SELECT patient_id, encounter_id, hours_since_admit AS period, min(dynamic_data_vocab.id) AS value
FROM labs_values
        INNER JOIN dynamic_data_vocab ON 
            dynamic_data_vocab.type = 'labs' AND
            labs_values.code = dynamic_data_vocab.code AND 
            labs_values.value_group = dynamic_data_vocab.value_group
GROUP BY patient_id, encounter_id, period, labs_values.code;
""")


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

<duckdb.duckdb.DuckDBPyConnection at 0x79ae48db28b0>

In [9]:
# vitals 
conn.execute("""
DROP TABLE IF EXISTS vitals_raw;
CREATE TABLE vitals_raw (
    patient_id VARCHAR,
    encounter_id BIGINT,
    type VARCHAR,
    code VARCHAR,
    value FLOAT,
    hours_since_admit FLOAT
)
""")

# load vitals data
conn.execute(f"""
INSERT INTO vitals_raw BY NAME
SELECT 
    StudyID AS patient_id, 
    EncounterID AS encounter_id,
    'DBP' AS type,
    'DBP' AS code,
    MeasureTXT AS value,
    HoursSinceAdmit AS hours_since_admit
FROM read_csv('{pathlib.Path(data_path)}/Vitals_DBP_hours.txt');

INSERT INTO vitals_raw BY NAME
SELECT 
    StudyID AS patient_id, 
    EncounterID AS encounter_id,
    'O2Device' AS type,
    mapping.category AS code,
    0 AS value,
    HoursSinceAdmit AS hours_since_admit
FROM read_csv('{pathlib.Path(data_path)}/Vitals_O2Device_hours.txt') AS data
        INNER JOIN read_csv('{pathlib.Path(data_path)}/o2device_mapping.csv') AS mapping
            ON data.MeasureTXT = mapping.description;

INSERT INTO vitals_raw BY NAME
SELECT 
    StudyID AS patient_id, 
    EncounterID AS encounter_id,
    'Pulse' AS type,
    'Pulse' AS code,
    MeasureTXT AS value,
    HoursSinceAdmit AS hours_since_admit
FROM read_csv('{pathlib.Path(data_path)}/Vitals_Pulse_hours.txt');

INSERT INTO vitals_raw BY NAME
SELECT 
    StudyID AS patient_id, 
    EncounterID AS encounter_id,
    'RespRate' AS type,
    'RespRate' AS code,
    MeasureTXT AS value,
    HoursSinceAdmit AS hours_since_admit
FROM read_csv('{pathlib.Path(data_path)}/Vitals_RespRate_hours.txt');

INSERT INTO vitals_raw BY NAME
SELECT 
    StudyID AS patient_id, 
    EncounterID AS encounter_id,
    'SBP' AS type,
    'SBP' AS code,
    MeasureTXT AS value,
    HoursSinceAdmit AS hours_since_admit
FROM read_csv('{pathlib.Path(data_path)}/Vitals_SBP_hours.txt');

INSERT INTO vitals_raw BY NAME
SELECT 
    StudyID AS patient_id, 
    EncounterID AS encounter_id,
    'SpO2' AS type,
    'SpO2' AS code,
    try_cast(MeasureTXT AS Float) AS value,
    HoursSinceAdmit AS hours_since_admit
FROM read_csv('{pathlib.Path(data_path)}/Vitals_SpO2_hours.txt');

INSERT INTO vitals_raw BY NAME
SELECT 
    StudyID AS patient_id, 
    EncounterID AS encounter_id,
    'Temp' AS type,
    'Temp' AS code,
    MeasureTXT AS value,
    HoursSinceAdmit AS hours_since_admit
FROM read_csv('{pathlib.Path(data_path)}/Vitals_Temp_hours.txt');

""")

# split vitals values into groups
conn.execute("""
DROP TABLE IF EXISTS vitals_values;
CREATE TABLE vitals_values AS
SELECT 
    vitals_raw.patient_id, vitals_raw.encounter_id, 
    type, code, 
    ntile(CASE WHEN type = 'O2Device' THEN 1 ELSE 10 END) OVER (PARTITION BY type, code ORDER BY value) as value_group,
    hours_since_admit,
    (encounters.admitted + INTERVAL (vitals_raw.hours_since_admit::INTEGER) HOUR) AS measure_time,
    extract(hour from encounters.discharged - measure_time) AS hours_to_discharge,
    value
FROM vitals_raw
    INNER JOIN encounters ON vitals_raw.patient_id = encounters.patient_id AND vitals_raw.encounter_id = encounters.encounter_id
WHERE measure_time BETWEEN encounters.admitted AND encounters.discharged
""")

# create vocabulary for dynamic vital data
conn.execute("""
INSERT INTO dynamic_data_vocab (type, description, code, value_group, lower_value, upper_value, encounters) 
SELECT type, code, code, value_group, min(value), max(value), COUNT(DISTINCT encounter_id) as qty
FROM vitals_values
GROUP BY type, code, value_group
ORDER BY type, code, value_group
""")

# create dynamic vital features
conn.execute("""
DROP TABLE IF EXISTS dynamic_vitals_data;
CREATE TABLE dynamic_vitals_data AS
SELECT patient_id, encounter_id, hours_since_admit::INTEGER AS period, min(dynamic_data_vocab.id) AS value
FROM vitals_values
        INNER JOIN dynamic_data_vocab ON 
            vitals_values.type = dynamic_data_vocab.type AND
            vitals_values.code = dynamic_data_vocab.code AND 
            vitals_values.value_group = dynamic_data_vocab.value_group
GROUP BY patient_id, encounter_id, period, vitals_values.code;
""")


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

<duckdb.duckdb.DuckDBPyConnection at 0x79ae48db28b0>

In [10]:
# diagnoses

diag_limit = 0.01 # percent of patients that has diagnoses code to limit the number of features

# load diagnoses data
conn.execute(f"""
DROP TABLE IF EXISTS diagnoses_raw;
CREATE TABLE diagnoses_raw AS
SELECT 
    StudyID AS patient_id,
    EncounterID AS encounter_id,
    HoursSinceAdmit AS hours_since_admit,
    ICD10CD AS code
FROM read_csv('{pathlib.Path(data_path)}/Diagnoses_hours.txt');

DROP TABLE IF EXISTS codes;
CREATE TABLE codes AS
FROM '{pathlib.Path(data_path)}/codes.parquet';

DROP TABLE IF EXISTS diagnoses_values;
CREATE TABLE diagnoses_values AS
SELECT 
    diagnoses_raw.patient_id, diagnoses_raw.encounter_id, code, hours_since_admit,
    (encounters.admitted + INTERVAL (diagnoses_raw.hours_since_admit::INTEGER) HOUR) AS diagnoses_time,
    extract(hour from encounters.discharged - diagnoses_time) AS hours_to_discharge
FROM diagnoses_raw
    INNER JOIN encounters ON 
        diagnoses_raw.patient_id = encounters.patient_id AND 
        diagnoses_raw.encounter_id = encounters.encounter_id
WHERE diagnoses_time BETWEEN encounters.admitted AND encounters.discharged;
""")

# create vocabulary for dynamic diagnoses data
conn.execute(f"""
WITH data AS (
    SELECT 
        diagnoses_values.code,COALESCE(codes.description, '') as description, 
        COUNT(DISTINCT encounter_id) as encounters,
        COUNT(DISTINCT patient_id) as patients
    FROM diagnoses_values
        LEFT JOIN codes ON 
            codes.type = 'ICD10' AND 
            diagnoses_values.code = codes.code
    GROUP BY diagnoses_values.code, description
), total_encounters AS (
    SELECT COUNT(DISTINCT patient_id) as qty
    FROM encounters
)
INSERT INTO dynamic_data_vocab (type, description, code, value_group, lower_value, upper_value, encounters) 
SELECT 'diagnoses', description, code, 1, 0, 0, encounters
FROM data
WHERE patients::FLOAT / (SELECT qty FROM total_encounters LIMIT 1) >= {diag_limit};
""")

# create dynamic diagnoses features
conn.execute("""
DROP TABLE IF EXISTS dynamic_diagnoses_data;
CREATE TABLE dynamic_diagnoses_data AS
SELECT patient_id, encounter_id, hours_since_admit AS period, dynamic_data_vocab.id AS value
FROM diagnoses_values
        INNER JOIN dynamic_data_vocab ON
            dynamic_data_vocab.type = 'diagnoses' AND
            diagnoses_values.code = dynamic_data_vocab.code
GROUP BY ALL;
""")

<duckdb.duckdb.DuckDBPyConnection at 0x79ae48db28b0>

In [11]:
# procedures
procs_limit = 0.01 # percent of patients that has procedures code to limit the number of features
proc_hours_field = 'HoursSinceAdmit_order'
proc_code_field = 'ProcedureCD'
proc_desc_field = 'ProcedureDSC'

conn.execute(f"""
DROP TABLE IF EXISTS procedures_raw;
CREATE TABLE procedures_raw AS
SELECT 
    EncounterID AS encounter_id,
    {proc_hours_field} AS hours_since_admit,
    {proc_code_field} AS code,
    ANY_VALUE({proc_desc_field}) AS description
FROM read_csv('{pathlib.Path(data_path)}/Procedures_hours_unicode.txt')
GROUP BY EncounterID, HoursSinceAdmit_order, ProcedureCD;

DROP TABLE IF EXISTS procedures_values;
CREATE TABLE procedures_values AS
SELECT 
    encounters.patient_id, procedures_raw.encounter_id, code, description, hours_since_admit,
    (encounters.admitted + INTERVAL (procedures_raw.hours_since_admit::INTEGER) HOUR) AS procedure_time,
    extract(hour from encounters.discharged - procedure_time) AS hours_to_discharge
FROM procedures_raw
    INNER JOIN encounters ON 
        procedures_raw.encounter_id = encounters.encounter_id
WHERE procedure_time BETWEEN encounters.admitted AND encounters.discharged;
""")

# create vocabulary for dynamic procedures data
conn.execute(f"""
WITH data AS (
    SELECT 
        procedures_values.code,
        COALESCE(procedures_values.description, '') as description, 
        COUNT(DISTINCT encounter_id) as encounters,
        COUNT(DISTINCT patient_id) as patients
    FROM procedures_values
    GROUP BY code, description
), total_encounters AS (
    SELECT COUNT(DISTINCT patient_id) as qty
    FROM encounters
)
INSERT INTO dynamic_data_vocab (type, description, code, value_group, lower_value, upper_value, encounters) 
SELECT 'procedures', description, code, 1, 0, 0, encounters
FROM data
WHERE patients::FLOAT / (SELECT qty FROM total_encounters LIMIT 1) >= {procs_limit};
""")

# create dynamic procedures features
conn.execute("""
DROP TABLE IF EXISTS dynamic_procedures_data;
CREATE TABLE dynamic_procedures_data AS
SELECT patient_id, encounter_id, hours_since_admit AS period, dynamic_data_vocab.id AS value
FROM procedures_values
        INNER JOIN dynamic_data_vocab ON
            dynamic_data_vocab.type = 'procedures' AND
            procedures_values.code = dynamic_data_vocab.code
GROUP BY ALL;
""")

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

<duckdb.duckdb.DuckDBPyConnection at 0x79ae48db28b0>

In [12]:
# medications

meds_limit = 0.01 # percent of patients that has medications code to limit the number of features
meds_code_field = 'PharmaceuticalSubclassCD'
meds_desc_field = 'PharmaceuticalSubclassDSC'

conn.execute(f"""
DROP TABLE IF EXISTS medications_raw;
CREATE TABLE medications_raw AS
SELECT
    StudyID AS patient_id,
    EncounterID AS encounter_id,
    HoursSinceAdmit AS hours_since_admit,
    {meds_code_field} AS code,
    ANY_VALUE({meds_desc_field}) AS description
FROM read_csv('{pathlib.Path(data_path)}/Medications_hours_unicode.txt')
GROUP BY StudyID, EncounterID, HoursSinceAdmit, PharmaceuticalSubclassCD;

DROP TABLE IF EXISTS medications_values;
CREATE TABLE medications_values AS
SELECT 
    encounters.patient_id, medications_raw.encounter_id, code, description, hours_since_admit,
    (encounters.admitted + INTERVAL (medications_raw.hours_since_admit::INTEGER) HOUR) AS procedure_time,
    extract(hour from encounters.discharged - procedure_time) AS hours_to_discharge
FROM medications_raw
    INNER JOIN encounters ON
        medications_raw.patient_id = encounters.patient_id AND
        medications_raw.encounter_id = encounters.encounter_id
WHERE procedure_time BETWEEN encounters.admitted AND encounters.discharged;
""")

# create vocabulary for dynamic medication data
conn.execute(f"""
WITH data AS (
    SELECT 
        medications_values.code,
        COALESCE(medications_values.description, '') as description, 
        COUNT(DISTINCT encounter_id) as encounters,
        COUNT(DISTINCT patient_id) as patients
    FROM medications_values
    GROUP BY code, description
), total_encounters AS (
    SELECT COUNT(DISTINCT patient_id) as qty
    FROM encounters
)
INSERT INTO dynamic_data_vocab (type, description, code, value_group, lower_value, upper_value, encounters) 
SELECT 'medications', description, code, 1, 0, 0, encounters
FROM data
WHERE patients::FLOAT / (SELECT qty FROM total_encounters LIMIT 1) >= {meds_limit};
""")

# create dynamic medications features
conn.execute("""
DROP TABLE IF EXISTS dynamic_medications_data;
CREATE TABLE dynamic_medications_data AS
SELECT patient_id, encounter_id, hours_since_admit AS period, dynamic_data_vocab.id AS value
FROM medications_values
        INNER JOIN dynamic_data_vocab ON
            dynamic_data_vocab.type = 'medications' AND
            medications_values.code = dynamic_data_vocab.code
GROUP BY ALL;
""")


<duckdb.duckdb.DuckDBPyConnection at 0x79ae48db28b0>

In [13]:
# create dynamic features arrays
conn.execute("""
DROP TABLE IF EXISTS dynamic_features;
CREATE TABLE dynamic_features AS
WITH data AS (
    SELECT patient_id, encounter_id, period, value 
    FROM dynamic_labs_data
    UNION ALL
    SELECT patient_id, encounter_id, period, value
    FROM dynamic_vitals_data
    UNION ALL
    SELECT patient_id, encounter_id, period, value
    FROM dynamic_diagnoses_data
    UNION ALL
    SELECT patient_id, encounter_id, period, value
    FROM dynamic_procedures_data
    UNION ALL
    SELECT patient_id, encounter_id, period, value
    FROM dynamic_medications_data
), hourly_data AS (
    SELECT patient_id, encounter_id, period, LIST(DISTINCT value ORDER BY value) AS features
    FROM data
    GROUP BY patient_id, encounter_id, period
)
SELECT patient_id, encounter_id, LIST(features ORDER BY period) AS features, LIST(DISTINCT period ORDER BY period) AS periods
FROM hourly_data
GROUP BY patient_id, encounter_id
""")

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

<duckdb.duckdb.DuckDBPyConnection at 0x79ae48db28b0>

In [14]:
# export data
import pathlib
out_file_path = '/data/vgribanov/data/readm/prepared_data'
pathlib.Path(out_file_path).mkdir(parents=True, exist_ok=True)

# vocab
conn.execute(f"""
COPY static_data_vocab TO '{pathlib.Path(out_file_path) / 'static_data_vocab.parquet'}' (FORMAT parquet);
COPY dynamic_data_vocab TO '{pathlib.Path(out_file_path) / 'dynamic_data_vocab.parquet'}' (FORMAT parquet);
""")

# data
conn.execute(f"""
COPY (
    SELECT
        encounters.idx,
        static_features.features AS static_features,
        dynamic_features.features AS dynamic_features,
        dynamic_features.periods AS periods
    FROM encounters
            INNER JOIN static_features ON 
                encounters.patient_id = static_features.patient_id AND 
                encounters.encounter_id = static_features.encounter_id
            INNER JOIN dynamic_features ON
                encounters.patient_id = dynamic_features.patient_id AND 
                encounters.encounter_id = dynamic_features.encounter_id
) TO '{pathlib.Path(out_file_path) / 'data.parquet'}' (FORMAT parquet);
""")

# target
conn.execute(f"""
COPY (
    SELECT 
        encounters.idx,
        inpatient.in_30 AS target
    FROM inpatient
        INNER JOIN encounters ON 
            inpatient.patient_id = encounters.patient_id AND 
            inpatient.encounter_id = encounters.encounter_id
) TO '{pathlib.Path(out_file_path) / 'target.parquet'}' (FORMAT parquet);
""")


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

<duckdb.duckdb.DuckDBPyConnection at 0x79ae48db28b0>

In [17]:
conn.query("""
SELECT type, COUNT(*) AS qty FROM static_data_vocab GROUP BY type
""").df()

Unnamed: 0,type,qty
0,means_of_arrival,11
1,was_in_icu,1
2,is_speak_english,1
3,age,15
4,disposition,4
5,is_graduated,1
6,race,5
7,is_smoked,1
8,days_from_last,10
9,sdi,10


In [23]:
conn.query("""
with data AS (
    SELECT
        encounters.idx,
        encounters.patient_id,
        static_features.features AS static_features,
        dynamic_features.features AS dynamic_features,
        dynamic_features.periods AS periods
    FROM encounters
            INNER JOIN static_features ON 
                encounters.patient_id = static_features.patient_id AND 
                encounters.encounter_id = static_features.encounter_id
            INNER JOIN dynamic_features ON
                encounters.patient_id = dynamic_features.patient_id AND 
                encounters.encounter_id = dynamic_features.encounter_id
)
SELECT COUNT(*), count(distinct patient_id)
FROM data
""").df()

Unnamed: 0,count_star(),count(DISTINCT patient_id)
0,102878,32807


In [19]:
duckdb.query(f"""
    SELECT COUNT(*) FROM '{pathlib.Path(out_file_path)}/data.parquet';
""").df()

Unnamed: 0,count_star()
0,849827


In [None]:
duckdb.query("""
SUMMARIZE FROM read_csv('/data/vgribanov/data/readm/Procedures_hours_unicode.txt');
""").df()

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Unnamed: 0,column_name,column_type,min,max,approx_unique,avg,std,q25,q50,q75,count,null_percentage
0,OrderProcedureID,BIGINT,773132,1401949361,30523783,720544527.8897992,354526331.5724946,416502983.0,700258545.0,1018062360.0,35635261,0.0
1,EncounterID,BIGINT,3008282174,3612269871,1287281,3328089527.422318,144197569.18995392,3203539771.0,3315594600.0,3448899253.0,35635261,0.0
2,HoursSinceAdmit_order,BIGINT,-46,105879,6316,105.99607024065293,472.2888961659517,-9.0,9.0,90.0,35635261,0.0
3,HoursSinceAdmit_specimen,BIGINT,-1084591,724238,7317,145.5976126058818,828.5355990868014,-7.0,36.0,143.0,35635261,78.82
4,OrderTypeCD,BIGINT,1,1020,62,62.36827991802839,197.37700778659615,10.0,26.0,26.0,35635261,0.0
5,OrderTypeDSC,VARCHAR,ADT Discharge,Wound Ostomy,68,,,,,,35635261,0.0
6,ProcedureCD,VARCHAR,104112,XS.XS.IMGBRIEF,5710,,,,,,35635261,0.0
7,ProcedureNM,VARCHAR,1-3-BETA D GLUCAN,ZYGOSITY,8015,,,,,,35635261,0.0
8,ProcedureDSC,VARCHAR,"GENITAL CULTURE/SMEAR, FEMALE",unsuccessful LEFT bedside picc insertion,8706,,,,,,35635261,0.0
9,OrderClassCD,BIGINT,1,100,47,27.495769418786,22.32439617215245,22.0,22.0,23.0,35635261,0.49


In [None]:
duckdb.query("""
SUMMARIZE FROM '/data/vgribanov/data/readm/codes.parquet';
""").df()

Unnamed: 0,column_name,column_type,min,max,approx_unique,avg,std,q25,q50,q75,count,null_percentage
0,code,VARCHAR,,Z99.9,389021,,,,,,397739,0.0
1,description,VARCHAR,BONE GRAFT TO FACE OR SKULL AUTOGENOUS ADMI...,wallstent Esoph--double,279504,,,,,,397739,0.04
2,type,VARCHAR,CPT,ICD9,3,,,,,,397739,0.0
3,source,VARCHAR,KMDictionary,SURGERYBWH,4,,,,,,397739,0.0


In [None]:
duckdb.query("""
select * from dynamic_data_vocab where type != 'labs' and code != 'DBP' and code != 'Pulse' and code != 'RespRate' and code != 'SBP' and code != 'SpO2' and code != 'Temp'
""").df()

Unnamed: 0,id,type,description,code,value_group,lower_value,upper_value,encounters
0,3627,O2Device,Aerosol mask,Aerosol mask,1,0.0,0.0,5070
1,3628,O2Device,Bag-valve Mask,Bag-valve Mask,1,0.0,0.0,1085
2,3629,O2Device,Bag-valve mask,Bag-valve mask,1,0.0,0.0,14
3,3630,O2Device,Bi-PAP,Bi-PAP,1,0.0,0.0,5878
4,3631,O2Device,Blow-by,Blow-by,1,0.0,0.0,2078
5,3602,O2Device,CPAP,CPAP,1,0.0,0.0,11193
6,3603,O2Device,Face tent,Face tent,1,0.0,0.0,2346
7,3604,O2Device,High flow face mask,High flow face mask,1,0.0,0.0,1440
8,3605,O2Device,High flow nasal cannula,High flow nasal cannula,1,0.0,0.0,2928
9,3606,O2Device,Nasal cannula,Nasal cannula,1,0.0,0.0,134981


<duckdb.duckdb.DuckDBPyConnection at 0x7e6d095b8370>

In [None]:
# create dynamic lab features
duckdb.execute("""
DELETE FROM dynamic_data_vocab WHERE type != 'labs';
""").df()

Unnamed: 0,Count
0,1240


In [None]:
duckdb.query("""
SELECT patient_id, encounter_id, hours_since_admit AS period, min(dynamic_data_vocab.id) AS value
FROM labs_values
        INNER JOIN dynamic_data_vocab ON labs_values.code = dynamic_data_vocab.code AND labs_values.value_group = dynamic_data_vocab.value_group
GROUP BY patient_id, encounter_id, period, labs_values.code
""").df()

Unnamed: 0,patient_id,encounter_id,period,value
0,{E7DAEFA6-7311-477A-B31F-B7CD52E37A85},3130537130,57,1179
1,{02FCBAB4-C937-49C2-8B45-FF34CD9561FF},3380167987,58,1179
2,{F9374EC4-3952-455E-82B6-982FEED5028D},3297540900,18,1179
3,{06917848-1455-4B84-93F7-1887F385A754},3340114661,322,1179
4,{A7755527-2230-4BF7-BA1F-AE94EC20CB40},3434357153,12,1179
...,...,...,...,...
31846673,{7EB5B2D7-AE58-47C6-956D-D7B4DEE4BF4A},3126254047,0,1638
31846674,{BA22D114-4E76-4B53-B108-929EF2650360},3253807966,26,1638
31846675,{9702986E-6A3D-4A01-A28F-BA146CEF8D10},3197484530,229,1638
31846676,{491063B2-844C-418C-9E58-CAED91F0A3F2},3198337451,16,3201


In [None]:
duckdb.query("""
    SELECT count(*)
    FROM labs_raw
        INNER JOIN encounters ON 
            labs_raw.patient_id = encounters.patient_id AND 
            labs_raw.encounter_id = encounters.encounter_id
    WHERE (encounters.admitted + INTERVAL (labs_raw.hours_since_admit::INTEGER) HOUR) BETWEEN encounters.admitted AND encounters.discharged
""").df()

Unnamed: 0,count_star()
0,41601630


In [None]:
duckdb.query("""
FROM dynamic_data_vocab
""").df()

Unnamed: 0,id,type,description,code,value_group,lower_value,upper_value,encounters
0,122,labs,25 (OH) VITAMIN D TOTAL,25 (OH) VITAMIN D TOTAL,1,4.0,11.0,1263
1,123,labs,25 (OH) VITAMIN D TOTAL,25 (OH) VITAMIN D TOTAL,2,11.0,15.0,1261
2,124,labs,25 (OH) VITAMIN D TOTAL,25 (OH) VITAMIN D TOTAL,3,15.0,19.0,1262
3,125,labs,25 (OH) VITAMIN D TOTAL,25 (OH) VITAMIN D TOTAL,4,19.0,22.0,1278
4,126,labs,25 (OH) VITAMIN D TOTAL,25 (OH) VITAMIN D TOTAL,5,22.0,26.0,1264
...,...,...,...,...,...,...,...,...
3265,1980,labs,YEAST,YEAST,6,1.0,1.0,261
3266,1981,labs,YEAST,YEAST,7,1.0,1.0,261
3267,1982,labs,YEAST,YEAST,8,1.0,2.0,254
3268,1983,labs,YEAST,YEAST,9,2.0,4.0,250


In [9]:
conn.query(f"""
summarize
FROM read_csv('{pathlib.Path(data_path)}/Inpatient_Static_hosp11.txt', types:= {'{'}EmergencyAdmitDTS:'VARCHAR'{'}'}, ignore_errors:=true)

""").df()

Unnamed: 0,column_name,column_type,min,max,approx_unique,avg,std,q25,q50,q75,count,null_percentage
0,StudyID,VARCHAR,-B2C2-99661D36054D},{FFFC4875-3814-4E31-9313-CAE4CB110D79},37626,,,,,,128648,0.0
1,PatientEncounterID,BIGINT,3042456800,3574789771,133036,3302620297.762538,137553612.2889296,3184857561,3285691861,3416234846,128648,0.0
2,HospitalAdmitDTS,TIMESTAMP,2015-07-01 01:43:00,2023-12-30 22:33:00,100812,,,2018-01-13 16:54:00.499326,2019-12-26 11:45:52.953234,2021-12-28 14:36:42.658096,128648,0.0
3,HospitalDischargeDTS,TIMESTAMP,2015-07-02 12:04:00,2024-03-08 19:02:00,112677,,,2018-01-17 15:37:27.429174,2019-12-31 23:10:15.623285,2022-01-04 15:50:34.631332,128648,0.0
4,HospitalServiceCD,BIGINT,3,1413,52,137.0094910142404,163.98492087915525,112,112,119,128648,0.0
5,HospitalServiceDSC,VARCHAR,Acupuncture,Vascular Surgery,49,,,,,,128648,0.0
6,DischargeCategory,VARCHAR,Hospice,home with services,4,,,,,,128648,0.0
7,MeansOfArrivalCD,BIGINT,1,10,11,6.595856063649204,1.6626324267393782,5,8,8,128648,40.01
8,MeansOfArrivalDSC,VARCHAR,Ambulance,Wheelchair,10,,,,,,128648,40.01
9,IsFemale,BIGINT,0,2,3,0.4791368695976618,0.4996131593160194,0,0,1,128648,0.0
