# Obtaining Control Patients and All Note Encounters

In [None]:
import duckdb
import pandas as pd
import numpy as np
import json

import ipywidgets as widgets
from IPython.display import display, Markdown

# timeoutput
import datetime

# regex
import re

# plots
import matplotlib.pyplot as plt

In [None]:
#! change the base_path to the IC data location in Wynton


# Functions for easy pulling of CDW data

def file_path_parquet(filename, datatype):
    """Build the glob pattern that matches every parquet file of one table.

    Parameters
    ----------
    filename : str
        Name of the table directory (e.g. 'person').
    datatype : str
        Name of the dataset directory under the base path (e.g. 'DEID_OMOP').

    Returns
    -------
    str
        '<base_path><filename>/*.parquet', where base_path already ends in '/'.
    """
    # base_path is the location to edit when pointing at the real IC data
    base_path = f"path/to/ic/data/{datatype}/"
    return f"{base_path}{filename}/*.parquet"

def rtime():
    """Display a bold "Ran: <timestamp>" line, coloured by the current weekday.

    Takes no arguments and returns nothing; the line is rendered as Markdown
    in the notebook output via IPython's display().
    """
    # Keys follow datetime.weekday(): 0 = Monday ... 6 = Sunday
    colors_by_weekday = dict(enumerate(
        ('red', 'orange', 'green', 'blue', 'purple', 'brown', 'gray')
    ))

    now = datetime.datetime.now()
    # Fall back to black if the weekday is somehow missing from the mapping
    color = colors_by_weekday.get(now.weekday(), 'black')
    stamp = now.strftime("%Y-%m-%d %H:%M:%S")

    display(Markdown(f"\n<b><span style='color:{color}'>Ran: {stamp}</span></b>\n"))
    
# Stamp this cell's output with the time it was run
rtime()

In [None]:
#! change the path to scratch and the username


# Replace 'name' with your actual Wynton username
username = 'name'

# Spill data that doesn't fit into memory into Wynton Scratch storage (BeeGFS)
# Increase up to 12 threads and 150 GB of memory to not overwhelm the system
# Recommendation: ~12 GB of memory for each thread
# reduce if there are other system limitations in place
config_query = f"""
    SET temp_directory = 'path/to/scratch/{username}/duckdb_dir';
    SET preserve_insertion_order = false;
    SET memory_limit = '150GB';
    SET threads TO 12;
"""

# Create a connection with configurations
# (no database file is passed to connect(), so tables created below are not
# written to a database file by this call)
con = duckdb.connect()
con_info = con.execute(config_query)  # Apply configuration settings

# Show whatever execute() returned, as a visual confirmation the settings ran
display(con_info)
rtime()

# Data

In [None]:
# get the concepts for MS
# (these ids are compared against condition_concept_id in the cohort queries below)
ms_list = [374919, 4178929, 4145049, 4137855, 37110514]

# person_source_value entries to exclude from the cohort
# (presumably placeholder / unspecified patient IDs — confirm)
bad_pats = ['-1', '*Unspecified']

# MS cohort file; its person_id column is used below to keep MS patients out of the controls
ms_pats = pd.read_csv("ms_cohort.csv")

rtime()

### OMOP Data

In [None]:
# Each variable below points at one OMOP table's parquet files.
# Several of these Python variable names are referenced directly inside the
# SQL strings further down (e.g. `FROM condition_occurrence_ucsf`), so they
# must not be renamed without updating those queries.

# condition_occurrence
condition_occurrence_ucsf = con.read_parquet(file_path_parquet('condition_occurrence', 'DEID_OMOP'))

# person demographics
person_ucsf = con.read_parquet(file_path_parquet('person', 'DEID_OMOP'))

# person linkage OMOP - CDW
person_extension_ucsf = con.read_parquet(file_path_parquet('person_extension', 'DEID_OMOP'))

# visit_occurrence
visit_occurrence_ucsf = con.read_parquet(file_path_parquet('visit_occurrence', 'DEID_OMOP'))

# condition occurrence to link to CDW
condition_occurrence_extension_ucsf = con.read_parquet(file_path_parquet('condition_occurrence_extension', 'DEID_OMOP'))


rtime()

### CDW Data

In [None]:
# Each variable below points at one CDW table's parquet files; the variable
# names are referenced directly inside the SQL strings further down.

# deid_note_key and negation terms
note_concepts = con.read_parquet(file_path_parquet('note_concepts', 'DEID_CDW'))

# linker to patientdurablekey, encounterkey, and deid_note_key
note_metadata = con.read_parquet(file_path_parquet('note_metadata', 'DEID_CDW'))

# note text - only deid_note_key and note_text
note_text = con.read_parquet(file_path_parquet('note_text', 'DEID_CDW'))

# diagnosis event fact
diag_fact = con.read_parquet(file_path_parquet('diagnosiseventfact', 'DEID_CDW'))

# patdurabledim
patdurabledim = con.read_parquet(file_path_parquet('patdurabledim', 'DEID_CDW'))


rtime()

In [None]:
# Query for a specific patient
# 'KEY' is a placeholder: replace it with the person_id to look up.
# Returns that person's person_id and source_key_value from person_extension.
specific_query = """
SELECT person_id,
    source_key_value
FROM person_extension_ucsf
WHERE person_id = 'KEY'
"""
con.query(specific_query)

# Cohort

### OMOP <-> CDW: patient table

In [None]:
# Convert pandas Series to list
ms_pats_list = ms_pats['person_id'].tolist()

# drop the existing table if exists
con.query("DROP TABLE IF EXISTS inv_patient_table")

# create the table
# One row per (person_id, patientepicid, patientdurablekey) for control
# patients: people not in the MS cohort file who have non-MS condition rows
# spanning at least 5 distinct visit_occurrence_id values.
# NOTE(review): the condition_concept_id filter removes MS-coded *rows*, not
# patients; a patient with MS-coded rows is excluded only when their person_id
# is in ms_pats_list — confirm this is the intended control definition.
# NOTE(review): the IN lists are rendered with tuple(); an empty or
# one-element list would render as `()` / `(x,)` — confirm these lists always
# hold two or more values.
query_create_table = f"""
CREATE TABLE inv_patient_table AS
    SELECT
        co.person_id,
        p.person_source_value AS patientepicid,
        pd.patientdurablekey
    FROM condition_occurrence_ucsf co
    JOIN person_ucsf p
        ON co.person_id = p.person_id
    JOIN patdurabledim pd
        ON p.person_source_value = pd.patientepicid
    WHERE p.person_source_value NOT IN {tuple(bad_pats)}
        AND co.condition_concept_id NOT IN {tuple(ms_list)}
        AND co.person_id NOT IN {tuple(ms_pats_list)}
    GROUP BY co.person_id, p.person_source_value, pd.patientdurablekey
    HAVING COUNT(DISTINCT co.visit_occurrence_id) >= 5
"""

con.query(query_create_table)

# index the new table
# (one index per identifier column used as a join key in later queries)
con.query("CREATE INDEX inv_person_id ON inv_patient_table(person_id)")
con.query("CREATE INDEX inv_patientepicid ON inv_patient_table(patientepicid)")
con.query("CREATE INDEX inv_patientdurablekey ON inv_patient_table(patientdurablekey)")

rtime()

In [None]:
# count the number of patients
# (distinct person_id values in the control patient table)
con.query("SELECT COUNT(DISTINCT(person_id)) FROM inv_patient_table")

### OMOP <-> CDW: condition and encounter linker

Each record in an OMOP domain has a unique identifier, which is used to join the domain table to its extension table. For example, condition_occurrence.condition_occurrence_id joins to condition_occurrence_extension.condition_occurrence_id.

In [None]:
# drop the existing table if exists
con.query("DROP TABLE IF EXISTS conlink_table")

# create the table
# For every non-MS condition row of a control patient, attach the CDW
# diagnosiseventkey. The key is taken from condition_occurrence_extension
# rows whose source_table_name is 'DiagnosisEventFact', matched on
# condition_occurrence_id.
query_create_table = f"""
CREATE TABLE conlink_table AS
    SELECT
        pt.person_id,
        pt.patientepicid,
        co.condition_occurrence_id,
        co.condition_concept_id,
        exco.diagnosiseventkey
    FROM condition_occurrence_ucsf co
    JOIN inv_patient_table pt
        ON co.person_id = pt.person_id
    JOIN (
        SELECT condition_occurrence_id,
            source_key_value AS diagnosiseventkey
        FROM condition_occurrence_extension_ucsf
        WHERE source_table_name = 'DiagnosisEventFact'
    ) exco
        ON exco.condition_occurrence_id = co.condition_occurrence_id
    WHERE co.condition_concept_id NOT IN {tuple(ms_list)}
"""
con.query(query_create_table)

# index the new table
# (diagnosiseventkey is the join key used when building inv_enc_table)
con.query("CREATE INDEX idx_diagnosiseventkey ON conlink_table(diagnosiseventkey)")

rtime()

In [None]:
# look at 10 rows of the linker table (no ORDER BY, so which rows appear is not fixed)
con.query("SELECT * FROM conlink_table LIMIT 10")

In [None]:
# count of patients
# (distinct person_id values that have at least one linked diagnosis event)
con.query("SELECT COUNT(DISTINCT(person_id)) FROM conlink_table")

### CDW: patient encounters

In [None]:
# drop the existing table if exists
con.query("DROP TABLE IF EXISTS inv_enc_table")

# create the table
# One row per linked diagnosis event: the inner query restricts
# diagnosiseventfact to control patients (matched on patientdurablekey), and
# the outer join attaches the OMOP identifiers via diagnosiseventkey.
# The f-prefix is not needed here (the string has no placeholders).
query_enc_table = f"""
CREATE TABLE inv_enc_table AS
    SELECT 
        co.person_id,
        co.patientepicid,
        enc.patientdurablekey,
        co.condition_occurrence_id,
        enc.encounterkey,
        enc.startdatekeyvalue,
        enc.diagnosisname
    FROM conlink_table co
    JOIN (
        SELECT
            pt.patientdurablekey,
            df.diagnosiseventkey,
            df.diagnosisname,
            df.encounterkey,
            df.startdatekeyvalue
        FROM diag_fact df
        JOIN inv_patient_table pt
            ON pt.patientdurablekey = df.patientdurablekey
    ) AS enc
        ON co.diagnosiseventkey = enc.diagnosiseventkey
"""
con.query(query_enc_table)

# index the new table
# (index names carry a _2 suffix to stay distinct from the inv_patient_table indexes)
con.query("CREATE INDEX inv_person_id_2 ON inv_enc_table(person_id)")
con.query("CREATE INDEX inv_patientepicid_2 ON inv_enc_table(patientepicid)")
con.query("CREATE INDEX inv_patientdurablekey_2 ON inv_enc_table(patientdurablekey)")
con.query("CREATE INDEX inv_encounterkey_2 ON inv_enc_table(encounterkey)")

rtime()

### Merge with all notes to get counts and statistics for these

In [None]:

# Per-patient note statistics joined onto every encounter row.
# The note_dates CTE computes, for each control patient (matched on
# patientepicid): first and last note service date, the span between them in
# years (days / 365.25), and the number of distinct notes. Only notes with a
# service date from 1930-01-01 through 2027-01-01 are considered.
# NOTE(review): note_years reuses the note_fdate / note_ldate aliases defined
# in the same SELECT list, and the final SELECT list ends with a trailing
# comma — both rely on DuckDB accepting these forms.
query_con_note_dates = f"""
WITH note_dates AS (
    SELECT pt.patientepicid,
        MIN(note.deid_service_date) AS note_fdate,
        MAX(note.deid_service_date) AS note_ldate,
        CAST(DATEDIFF('day', note_fdate, note_ldate) / 365.25 AS FLOAT) AS note_years,
        COUNT(DISTINCT(note.deid_note_key)) AS note_count
    FROM note_metadata note
    JOIN inv_patient_table pt ON pt.patientepicid = note.patientepicid
    WHERE note.deid_service_date >= DATE '1930-01-01'
        AND note.deid_service_date <= DATE '2027-01-01'
    GROUP BY pt.patientepicid
)
SELECT enc.*,
    note.note_fdate,
    note.note_ldate,
    note.note_years,
    note.note_count,
FROM note_dates note
JOIN inv_enc_table enc
ON note.patientepicid = enc.patientepicid
"""
# Kept as a relation; it is referenced by name in the COPY query of the next cell
note_all_result = con.query(query_con_note_dates)

In [None]:
# takes a long time when actually called
# `note_all_result` in the SQL refers to the Python variable of that name
parquet_query = """
SELECT *
FROM note_all_result
"""

# Write the full result to control_cohort.parquet in the working directory
con.execute(f"COPY ({parquet_query}) TO 'control_cohort.parquet' (FORMAT PARQUET)")

### Duplicates

Some patients appear to be duplicated across OMOP and CDW. These patients need to be removed.

In [None]:
# reload the cohort written above; the next cell reduces it to just the patients
con_pats = con.read_parquet("control_cohort.parquet")

In [None]:
# Collapse the encounter-level rows to one row per distinct combination of
# patient identifiers and note statistics, and load it into pandas.
query_just_pats = f"""
SELECT DISTINCT
    person_id,
    patientepicid,
    patientdurablekey,
    note_fdate,
    note_ldate,
    note_years,
    note_count
FROM con_pats
"""

con_pats_df = con.query(query_just_pats).df()

In [None]:
# Materialise the whole encounter-level cohort as a pandas DataFrame.
# NOTE(review): `df` is not used by any later cell visible here, and loading
# every row may be expensive — confirm it is still needed.
df = con.query("SELECT * FROM con_pats").df()

In [None]:
# Show duplicate values and their counts
# (count each identifier once, then keep only the ones seen more than once)
person_id_counts = con_pats_df['person_id'].value_counts()
print("Duplicates in column1:")
print(person_id_counts[person_id_counts > 1])

durable_key_counts = con_pats_df['patientdurablekey'].value_counts()
print("\nDuplicates in column2:")
print(durable_key_counts[durable_key_counts > 1])

In [None]:
def remove_dupes(df, columns):
    """Drop every row whose value is repeated in any of the given columns.

    Columns are processed in order. For each column, repeats are detected only
    among the rows that survived the earlier columns, and *all* rows carrying
    a repeated value are dropped (none is kept as the "original").

    Parameters
    ----------
    df : pandas.DataFrame
    columns : iterable of column names to check, in order

    Returns
    -------
    (DataFrame, dict)
        The surviving rows of ``df``, and a dict mapping each column that had
        repeats to the array of its repeated values. Columns without repeats
        have no entry in the dict.
    """
    keep = pd.Series(True, index=df.index)
    dupes_info = {}

    for col in columns:
        # Only look at rows that have not already been ruled out
        remaining = df[keep][col]
        repeated = remaining[remaining.duplicated(keep=False)].unique()
        if len(repeated) == 0:
            continue
        dupes_info[col] = repeated
        # Rule out every row of the full frame that carries a repeated value
        keep = keep & ~df[col].isin(repeated)

    return df[keep], dupes_info


# De-duplicate on the three patient identifiers, in this order.
# `dupes` only has an entry for a column if that column actually had repeats.
con_pats_df_ndup, dupes = remove_dupes(con_pats_df, ['person_id', 'patientepicid', 'patientdurablekey'])

In [None]:
# check the lengths are the same now
# After de-duplication each patient should have exactly one value in each of
# the three identifier columns, so the distinct counts must all be equal.
# (Uses con_pats_df_ndup, the frame returned by remove_dupes; the previous
# name `df_pat_ndup` was never defined and raised a NameError.)
len(set(con_pats_df_ndup['person_id'])) == len(set(con_pats_df_ndup['patientepicid'])) == len(set(con_pats_df_ndup['patientdurablekey']))

**Fix the encounter table to reflect these patients**

In [None]:
def _sql_literal(value):
    """Render a Python scalar as a SQL literal (strings quoted, inner quotes doubled)."""
    if isinstance(value, str):
        return "'" + value.replace("'", "''") + "'"
    return str(value)


# Build one NOT IN clause per identifier column that actually had duplicates.
# `dupes` has no entry for a column without repeats, so iterate over what is
# there instead of indexing fixed keys (which raised KeyError). Iterating also
# covers patientepicid duplicates, which remove_dupes dropped from the patient
# frame but the previous filter left in the encounter table.
dupe_filters = []
for dupe_col, dupe_values in dupes.items():
    literals = ", ".join(_sql_literal(v) for v in dupe_values.tolist())
    if literals:
        # Explicit join avoids tuple() rendering `(x,)` for a single value
        dupe_filters.append(f"{dupe_col} NOT IN ({literals})")

# With no duplicates at all, keep every row
where_clause = "\n    AND ".join(dupe_filters) if dupe_filters else "TRUE"

query_out_dupes = f"""
SELECT *
FROM con_pats
WHERE {where_clause}
"""

con_pats_ndup = con.query(query_out_dupes)

In [None]:
# runs the filtering script
# `con_pats_ndup` in the SQL refers to the Python variable holding the
# duplicate-free relation; the result is written to control_cohort_new.parquet
parquet_query = """
SELECT *
FROM con_pats_ndup
"""

con.execute(f"COPY ({parquet_query}) TO 'control_cohort_new.parquet' (FORMAT PARQUET)")

# Analyze Control Population - Optional

In [None]:
# Rebind con_pats to the duplicate-free cohort file; the cells below query it by name
con_pats = con.read_parquet("control_cohort_new.parquet")

In [None]:
# number of patients
# (distinct person_id values, counted in Python after pulling the column into pandas)
len(set(con_pats.select('person_id').df()['person_id']))

In [None]:
# number of rows in the cohort table
# (each row is a linked diagnosis event carrying per-patient note statistics,
# so this is not a count of notes; the note totals are in note_count)
con.query("SELECT COUNT(*) FROM con_pats")

In [None]:
# view the first 20 rows of the cohort data, ordered by patient and start date
con.query("SELECT * FROM con_pats ORDER BY person_id, startdatekeyvalue LIMIT 20").df()

In [None]:
# obtain a simplified dataframe for further analysis
# (one row per distinct combination of patient identifiers and note statistics)
query_just_pats = f"""
SELECT DISTINCT
    person_id,
    patientepicid,
    patientdurablekey,
    note_fdate,
    note_ldate,
    note_years,
    note_count
FROM con_pats
"""

con_pats_simp = con.query(query_just_pats).df()
# (rows, columns) of the simplified frame
con_pats_simp.shape

Further analysis is optional now that the data is loaded.