In [5]:
import os
import sys
import socket
hostname = socket.gethostname()

import pandas as pd
import numpy as np

import dask
import dask.dataframe as dd
import dask.array as da
import dask.bag as db

from dask_sql import Context

from dask_jobqueue import SGECluster
from dask.distributed import Client
import tqdm

In [6]:
# Start an SGECluster whose scheduler listens on the first usable port in
# [BASE_PORT, BASE_PORT + MAX_PORT_ATTEMPTS), then connect a client to it.
BASE_PORT = 40000
MAX_PORT_ATTEMPTS = 100

scheduler_ip = socket.gethostbyname(hostname)
work_dir = os.getcwd()

cluster = None
last_error = None
for port_offset in range(MAX_PORT_ATTEMPTS):
    port = BASE_PORT + port_offset
    try:
        cluster = SGECluster(
            queue='short.q',
            cores=2,
            memory='48GiB',
            walltime='00:30:00',
            local_directory=f'{work_dir}/dask_temp',
            log_directory=f'{work_dir}/dask_temp/dask_log',
            python=sys.executable,
            resource_spec='x86-64-v=3',
            scheduler_options={'host': f'{scheduler_ip}:{port}'},
        )
    except Exception as exc:
        # Presumably the port is already taken; remember the error and try the
        # next one. `Exception` (not a bare `except`) lets KeyboardInterrupt
        # and SystemExit propagate.
        last_error = exc
    else:
        print(f'Using Port {port}...')
        break
else:
    # Every attempt failed: surface the last underlying error instead of
    # falling through to a NameError on `cluster` below.
    raise RuntimeError(
        f'Could not start SGECluster on any port in '
        f'{BASE_PORT}-{BASE_PORT + MAX_PORT_ATTEMPTS - 1}'
    ) from last_error

cluster.scale(50)
client = Client(cluster)

client.dashboard_link

Perhaps you already have a cluster running?
Hosting the HTTP server on port 45641 instead
Perhaps you already have a cluster running?
Hosting the HTTP server on port 38577 instead
Perhaps you already have a cluster running?
Hosting the HTTP server on port 34105 instead
Perhaps you already have a cluster running?
Hosting the HTTP server on port 37221 instead


Using Port 40003...


'https://ic-app.wynton.ucsf.edu/user/adtserapio/proxy/37221/status'

In [4]:
# Choose the data asset and the tables to expose to dask-sql.
data_asset = 'DEID_CDW' # Either DEID_CDW, DEID_OMOP, or PEDB
tables = ['note_text', 'note_metadata'] # Enter tables you are planning to query here

c = Context()

# One dask DataFrame per table, read from that table's parquet directory.
dask_dfs = {
    name: dd.read_parquet(f'/wynton/protected/project/ic/data/parquet/{data_asset}/{name}/')
    for name in tables
}

# Register every frame under its table name so SQL queries can reference it.
for name, frame in dask_dfs.items():
    c.create_table(name, frame)

## Radiology Report Inclusion-Exclusion

In [5]:
# Count distinct patients and distinct note keys over all notes typed 'Imaging'.
imaging_counts_sql = """
    SELECT 
        COUNT(DISTINCT note_metadata.patientdurablekey) AS unique_patientdurablekey_count,
        COUNT(DISTINCT note_metadata.deid_note_key) AS unique_deid_note_key_count
    FROM note_metadata
    WHERE note_type IS NOT NULL AND note_type IN ('Imaging')
"""
query = c.sql(imaging_counts_sql)

# Run the lazy query and bring the summary back as a local frame.
rad_report_counts = query.compute()



In [6]:
# Display the imaging-note summary counts computed by the previous cell.
rad_report_counts

Unnamed: 0,unique_patientdurablekey_count,unique_deid_note_key_count
0,1745595,34361924


In [7]:
# Pull the two scalar counts out of the first row of the summary frame.
n_patients_total = rad_report_counts["unique_patientdurablekey_count"].iat[0]
n_rad_total = rad_report_counts["unique_deid_note_key_count"].iat[0]

print(n_patients_total, n_rad_total)

1745595 34361924


#### Number of Patients: 1,745,595
#### Radiology Reports: 34,361,924

In [8]:
# Join note text mentioning 'ADDITIONAL HISTORY' with the metadata of notes
# typed 'Imaging', keeping every column from both sides.
query = c.sql("""
    SELECT *
    FROM (
        SELECT *
        FROM note_text
        WHERE note_text LIKE '%ADDITIONAL HISTORY%'
    ) nt
    INNER JOIN (
        SELECT *
        FROM note_metadata
        WHERE note_type IN ('Imaging')
    ) nm
    ON nt.deid_note_key = nm.deid_note_key;
"""
)

# Materialise the join as a local frame. The `additional_history` column is
# derived in the next cell; an earlier extraction that used to run here was
# overwritten there, so it only cost an extra regex pass over every note.
rad_report_additional_history = query.compute()



In [9]:
# Capture the text that follows "ADDITIONAL HISTORY:". The lazy group stops
# just before either four consecutive whitespace characters or an upper-case
# run (letters/whitespace) that ends in ':' or at end of string.
additional_history_pattern = r'ADDITIONAL HISTORY:\s*(.*?)(?=\s{4}|[A-Z][A-Z\s]*(?::|$))'

extracted_history = rad_report_additional_history["note_text"].str.extract(additional_history_pattern)

# Notes where the pattern does not match get an empty string instead of NaN.
rad_report_additional_history["additional_history"] = extracted_history.fillna("")

In [10]:
# Placeholder strings (compared after strip + lower-case) that count as
# "no additional history".
excluded_values = {
    "none", "none.", "none available", "none available.", "none provided", "none provided.",
    "n/", "n/a", "history of", "research study", "research study.", "none provided.",
    "*****", "as above", ".", "see above", "no", "non-", "per", "", "no available relevant prior",
}

history_text = rad_report_additional_history["additional_history"]
is_non_empty = history_text.apply(len) > 0
is_placeholder = history_text.str.strip().str.lower().isin(excluded_values)

# Keep reports whose extracted history is non-empty and not a placeholder.
rad_report_filter_none = rad_report_additional_history[is_non_empty & ~is_placeholder]

In [11]:
# Distinct report keys left after dropping empty/placeholder histories.
# "nt.deid_note_key" is presumably the note_text-side key from the SELECT * join.
n_rad_filter_none = rad_report_filter_none["nt.deid_note_key"].nunique()

In [12]:
# Reports before this filter, reports kept, and reports excluded by it.
print(n_rad_total, n_rad_filter_none, n_rad_total - n_rad_filter_none)

34361924 247132 34114792


#### Total Radiology Reports: 34,361,924 
#### Radiology Reports that have Additional History: 247,132
#### Radiology Reports Excluded: 34,114,792

In [13]:
# Banner text that marks a preliminary ("wet read") report.
WET_READ_BANNER = "---  RADIOLOGY PRELIMINARY INTERPRETATION. FULL REPORT TO FOLLOW.  ---"

# Drop reports containing the banner. regex=False makes this a plain substring
# test; as a regex, each '.' in the banner would match any character.
rad_report_not_wet_read = rad_report_filter_none[
    ~rad_report_filter_none["note_text"].str.contains(WET_READ_BANNER, regex=False)
]

In [14]:
# Distinct report keys left after removing preliminary (wet-read) reports.
n_rad_filter_wet = rad_report_not_wet_read["nt.deid_note_key"].nunique()

In [15]:
# Reports before this filter, reports kept, and reports excluded by it.
print(n_rad_filter_none, n_rad_filter_wet, n_rad_filter_none - n_rad_filter_wet)

247132 202664 44468


#### Total Radiology Reports (After Filtering None): 247,132
#### Radiology Reports that are not wet reads: 202,664
#### Radiology Reports Excluded: 44,468

In [16]:
# Derive an exam type for each report, then drop reports whose exam type is
# empty or contains a '*' (the redaction marker used in these notes).
rad_report_not_redacted_exam = rad_report_not_wet_read.copy()

# Exam type = the text before the first double space, with colons removed.
rad_report_not_redacted_exam["exam_type"] = rad_report_not_redacted_exam["note_text"].apply(
    lambda s: s.split("  ")[0].replace(":", "")
)

# regex=False with a literal "*" replaces the pattern "\*", which is an invalid
# escape sequence in a normal (non-raw) Python string.
rad_report_not_redacted_exam = rad_report_not_redacted_exam[
    (rad_report_not_redacted_exam["exam_type"] != "") &
    (~rad_report_not_redacted_exam["exam_type"].str.contains("*", regex=False))
]

In [17]:
# Distinct report keys left after removing empty or redacted exam types.
n_rad_exam_redact = rad_report_not_redacted_exam["nt.deid_note_key"].nunique()

In [18]:
# Reports before this filter, reports kept, and reports excluded by it.
print(n_rad_filter_wet, n_rad_exam_redact, n_rad_filter_wet - n_rad_exam_redact)

202664 155874 46790


#### Total Radiology Reports (After Filtering Wet): 202,664
#### Radiology Reports that do not have exam type redactions: 155,874
#### Radiology Reports Excluded: 46,790

In [19]:
# Extract the originally provided history/indication text from each report.
rad_report_not_redacted_histories = rad_report_not_redacted_exam.copy()

# The non-capturing group lists the accepted section headers; the capture and
# its stopping lookahead match the ones used for `additional_history`.
# expand=False returns a Series for the single capture group. The previous
# `.squeeze()` on the one-column frame collapsed to a scalar whenever the frame
# had exactly one row, which made the following `.fillna` fail.
rad_report_not_redacted_histories["original_history"] = rad_report_not_redacted_histories["note_text"].str.extract(
    r'(?:CLINICAL HISTORY:|INDICATION:|INDICATION FOR EXAM:|CLINICAL HISTORY provided by referring provider:|INDICATION \(as provided by referring clinician\):)\s*(.*?)(?=\s{4}|[A-Z][A-Z\s]*(?::|$))',
    expand=False,
).fillna("")

INFO:distributed.utils_perf:full garbage collection released 79.37 MiB from 19 reference cycles (threshold: 9.54 MiB)


In [20]:
# Five asterisks mark redacted text in these notes.
REDACTION_MARKER = "*****"

# Keep reports that have an original history and whose two history fields
# contain no redaction marker. regex=False matches the marker literally and
# replaces "\*\*\*\*\*", which is an invalid escape sequence in a non-raw string.
rad_report_not_redacted_histories = rad_report_not_redacted_histories[
    (rad_report_not_redacted_histories["original_history"] != "") &
    (~rad_report_not_redacted_histories["additional_history"].str.contains(REDACTION_MARKER, regex=False)) &
    (~rad_report_not_redacted_histories["original_history"].str.contains(REDACTION_MARKER, regex=False))
]

In [21]:
# Distinct report keys left after removing reports with missing or redacted histories.
n_rad_histories_redact = rad_report_not_redacted_histories["nt.deid_note_key"].nunique()

In [22]:
# Reports before this filter, reports kept, and reports excluded by it.
print(n_rad_exam_redact, n_rad_histories_redact, n_rad_exam_redact - n_rad_histories_redact)

155874 79313 76561


#### Total Radiology Reports (After Filtering Exam Type Redactions): 155,874
#### Radiology Reports that do not have history redactions: 79,313
#### Radiology Reports Excluded: 76,561

In [23]:
# Expose the surviving reports to dask-sql as the table "radiology_indication".
radiology_indication = dd.from_pandas(rad_report_not_redacted_histories.copy(), npartitions=1)
c.create_table("radiology_indication", radiology_indication)

# Patient key of every non-'Imaging' note whose patient also appears in the
# report table (one output row per matching note/report pair).
non_imaging_patients_sql = """
SELECT 
    nm.patientdurablekey
FROM note_metadata nm
INNER JOIN 
    radiology_indication ri
ON 
    nm.patientdurablekey = ri.patientdurablekey
WHERE 
    nm.note_type NOT IN ('Imaging') 
"""
query = c.sql(non_imaging_patients_sql)

has_clinical_notes = query.compute()

INFO:distributed.utils_perf:full garbage collection released 34.49 MiB from 995 reference cycles (threshold: 9.54 MiB)


In [24]:
# Keep only reports whose patient has at least one non-imaging note.
patients_with_notes = has_clinical_notes["patientdurablekey"].unique()
patient_has_notes = rad_report_not_redacted_histories["patientdurablekey"].isin(patients_with_notes)

rad_report_has_clinical_notes = rad_report_not_redacted_histories[patient_has_notes]

In [25]:
# Distinct report keys whose patient has at least one non-imaging note.
n_rad_with_notes = rad_report_has_clinical_notes["nt.deid_note_key"].nunique()

In [26]:
# Reports before this filter, reports kept, and reports excluded by it.
print(n_rad_histories_redact, n_rad_with_notes, n_rad_histories_redact - n_rad_with_notes)

79313 79091 222


#### Total Radiology Reports (After Filtering History Redactions): 79,313
#### Radiology Reports whose patient has at least one non-imaging clinical note: 79,091
#### Radiology Reports Excluded: 222

In [27]:
# Final radiology cohort size: distinct patients and distinct report keys.
print("n_patients", rad_report_has_clinical_notes["patientdurablekey"].nunique())
print("n_notes", rad_report_has_clinical_notes["nt.deid_note_key"].nunique())

n_patients 28940
n_notes 79091


## Clinical Notes

In [28]:
# Re-register "radiology_indication", now limited to reports whose patient has
# clinical notes.
radiology_indication = dd.from_pandas(rad_report_has_clinical_notes.copy(), npartitions=1)
c.create_table("radiology_indication", radiology_indication)

# Distinct patients and distinct non-'Imaging' notes linked to those reports
# by patient key.
clinical_note_counts_sql = """
SELECT 
    COUNT(DISTINCT nm.patientdurablekey) AS unique_patientdurablekey_count,
    COUNT(DISTINCT nm.deid_note_key) AS unique_deid_note_key_count
FROM note_metadata nm
INNER JOIN 
    radiology_indication ri
ON 
    nm.patientdurablekey = ri.patientdurablekey
WHERE 
    nm.note_type NOT IN ('Imaging') 
"""
query = c.sql(clinical_note_counts_sql)

clinical_note_counts = query.compute()



In [29]:
# Display the clinical-note summary counts computed by the previous cell.
clinical_note_counts

Unnamed: 0,unique_patientdurablekey_count,unique_deid_note_key_count
0,28940,11751002


In [118]:
# Export every non-imaging clinical note (metadata + text) for the cohort's
# patients, one parquet file per batch of INTERVAL patients.
#
# The batch bounds are defined here because the loop reads them; leaving them
# commented out made the cell depend on values left over from other cells.
patient_ids = rad_report_has_clinical_notes["patientdurablekey"].unique()

START_IDX = 0  # raise to a multiple of INTERVAL to resume an interrupted export
END_IDX = len(patient_ids)
INTERVAL = 1000

# to_parquet does not create missing parent directories.
os.makedirs("clinical_notes", exist_ok=True)

for CURR_IDX in tqdm.tqdm(range(START_IDX, END_IDX, INTERVAL)):
    # Reports belonging to this batch of patients, registered as a SQL table.
    curr_ids = patient_ids[CURR_IDX:CURR_IDX+INTERVAL]
    radiology_indication = rad_report_not_redacted_histories[
        rad_report_not_redacted_histories["patientdurablekey"].isin(curr_ids)
    ].copy()
    radiology_indication = dd.from_pandas(radiology_indication, npartitions=1)
    c.create_table("radiology_indication", radiology_indication)

    # Step 1: metadata of every non-'Imaging' note for the batch's patients.
    query = c.sql("""
    SELECT DISTINCT
        nm.patientdurablekey,
        nm.deid_note_key,
        nm.note_type,
        nm.enc_dept_name,
        nm.auth_prov_type,
        nm.deid_service_date
    FROM 
        note_metadata nm
    INNER JOIN 
        radiology_indication ri
    ON 
        nm.patientdurablekey = ri.patientdurablekey
    WHERE 
        nm.note_type NOT IN ('Imaging') 
    ORDER BY 
        ri.patientdurablekey DESC, nm.deid_service_date DESC;
    """
    )
    patient_notes = query.compute()

    # Keep one metadata row per note key before joining to the note text.
    filtered_patient_notes = patient_notes.drop_duplicates(subset=["deid_note_key"])
    filtered_patient_notes = dd.from_pandas(filtered_patient_notes, npartitions=1)
    c.create_table("filtered_patient_notes", filtered_patient_notes)

    # Step 2: attach the note text to each selected note.
    query = c.sql("""
    SELECT DISTINCT
        fpn.patientdurablekey,
        fpn.deid_note_key,
        fpn.deid_service_date,
        fpn.note_type,
        fpn.enc_dept_name,
        fpn.auth_prov_type,
        nt.note_text
    FROM 
        filtered_patient_notes fpn
    INNER JOIN 
        note_text nt 
    ON 
        fpn.deid_note_key = nt.deid_note_key
    ORDER BY 
        fpn.patientdurablekey DESC, fpn.deid_service_date DESC;
    """
    )
    filtered_patient_note_texts = query.compute()

    # The file name carries the nominal batch bounds, which the loading cell
    # relies on.
    filtered_patient_note_texts.to_parquet(f"clinical_notes/patient_notes_{CURR_IDX}_{CURR_IDX+INTERVAL}.parquet")

In [30]:
# Load every per-batch clinical-note file (batches of 1000 patients, 0..29000)
# and stack them into a single frame.
parquet_files = [
    pd.read_parquet(f"clinical_notes/patient_notes_{curr_idx}_{curr_idx+1000}.parquet")
    for curr_idx in tqdm.tqdm(range(0, 29000, 1000))
]

all_clinical_notes = pd.concat(parquet_files)

100%|██████████| 29/29 [03:03<00:00,  6.33s/it]


In [31]:
# Number of distinct patients across the loaded clinical-note files.
all_clinical_notes["patientdurablekey"].nunique()

INFO:distributed.core:Event loop was unresponsive in Scheduler for 3.72s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.


28940

In [32]:
# Drop notes that are telephone encounters or patient instructions, have no
# note type, or contain 20 or fewer whitespace-separated tokens.
note_type = all_clinical_notes["note_type"]
is_excluded_type = note_type.isin(["Telephone Encounter", "Patient Instructions"])
has_no_type = note_type.isna()
word_count = all_clinical_notes["note_text"].apply(lambda t: len(t.split()))

clinical_notes_indication = all_clinical_notes[
    ~(is_excluded_type | has_no_type) & (word_count > 20)
]

INFO:distributed.core:Event loop was unresponsive in Scheduler for 7.14s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.


In [33]:
# Distinct patients that still have at least one clinical note after filtering.
clinical_notes_indication["patientdurablekey"].nunique()

28908

In [34]:
# Distinct clinical notes remaining after filtering.
clinical_notes_indication["deid_note_key"].nunique()

7328684

## Dataset Curation

In [39]:
# Final radiology table: reports whose patient still has usable clinical
# notes, reduced to the columns needed downstream and saved to parquet.
eligible_patients = clinical_notes_indication["patientdurablekey"].unique()
output_columns = [
    "patientdurablekey",
    "deid_note_key",
    "note_text",
    "deid_service_date",
    "exam_type",
    "additional_history",
    "original_history",
]

radiology_indication = rad_report_has_clinical_notes.copy()
radiology_indication = radiology_indication[
    radiology_indication["patientdurablekey"].isin(eligible_patients)
]
# Expose the join-prefixed key column under its plain name.
radiology_indication = radiology_indication.rename(columns={"nt.deid_note_key": "deid_note_key"})
radiology_indication = radiology_indication[output_columns].reset_index(drop=True)

radiology_indication.to_parquet("radiology_indication.parquet")

In [43]:
# Build the indication dataset: for each radiology report, attach up to
# NUM_NOTES of the same patient's clinical notes dated strictly before the
# report, most recent first. Results are written one parquet file per chunk.
NUM_NOTES = 10

INDICATION_COLUMNS = [
    "patientdurablekey",
    "radiology_deid_note_key",
    "exam_type",
    "radiology_text",
    "radiology_deid_service_date",
    "original_history",
    "additional_history",
    "enc_dept_names",
    "note_types",
    "auth_prov_types",
    "deid_service_dates",
    "note_texts",
]

# NOTE(review): 70000 looks like a resume point from an interrupted run; the
# loading cell expects chunk files starting at 0, so set this to 0 for a full
# rebuild.
START_IDX = 70000
END_IDX = len(radiology_indication)
INTERVAL = 10000

# to_parquet does not create missing parent directories.
os.makedirs("indication_dataset", exist_ok=True)

all_records = []
for CURR_IDX in range(START_IDX, END_IDX, INTERVAL):
    # Clamp the last chunk to the table length: iterating to CURR_IDX + INTERVAL
    # made `.iloc` raise IndexError before the final file was written.
    chunk_end = min(CURR_IDX + INTERVAL, END_IDX)

    # Collect plain dicts and build the frame once per chunk instead of
    # growing a DataFrame one `.loc` row at a time.
    chunk_records = []
    for i in tqdm.tqdm(range(CURR_IDX, chunk_end)):
        radiology_report = radiology_indication.iloc[i]
        patient_mrn = radiology_report["patientdurablekey"]
        patient_notes = clinical_notes_indication[
            clinical_notes_indication["patientdurablekey"] == patient_mrn
        ]
        filtered_patient_notes = patient_notes[
            (patient_notes["deid_service_date"] < radiology_report["deid_service_date"]) &
            (patient_notes["note_type"] != "Imaging")
        ].sort_values(by=["deid_service_date"], ascending=False).head(NUM_NOTES)

        # NOTE(review): reports with exactly one prior note are skipped, as in
        # the original code, where `.squeeze().tolist()` could not handle a
        # single row. Confirm whether `> 0` is the intended threshold.
        if len(filtered_patient_notes) > 1:
            chunk_records.append({
                "patientdurablekey": radiology_report["patientdurablekey"],
                "exam_type": radiology_report["exam_type"],
                "radiology_deid_note_key": radiology_report["deid_note_key"],
                "radiology_text": radiology_report["note_text"],
                "radiology_deid_service_date": radiology_report["deid_service_date"],
                "original_history": radiology_report["original_history"],
                "additional_history": radiology_report["additional_history"],
                "enc_dept_names": filtered_patient_notes["enc_dept_name"].tolist(),
                "note_types": filtered_patient_notes["note_type"].tolist(),
                "auth_prov_types": filtered_patient_notes["auth_prov_type"].tolist(),
                "deid_service_dates": filtered_patient_notes["deid_service_date"].tolist(),
                "note_texts": filtered_patient_notes["note_text"].tolist(),
            })

    # Each file holds only its own chunk and is named with the clamped upper
    # bound, matching the "{start}_{min(start + chunk_size, total)}" names the
    # loading cell reads.
    pd.DataFrame(chunk_records, columns=INDICATION_COLUMNS).to_parquet(
        f"indication_dataset/{CURR_IDX}_{chunk_end}.parquet"
    )
    all_records.extend(chunk_records)

# Every row produced in this run, across all chunks.
clinical_indication_dataset = pd.DataFrame(all_records, columns=INDICATION_COLUMNS)

## Indication Dataset: Development Set Sampling

In [3]:
import pandas as pd

In [4]:
# Load the chunked indication dataset and keep one row per radiology report.
total = 79032
chunk_size = 10000

chunk_frames = []
for start in range(0, total, chunk_size):
    # The last chunk's upper bound is clamped to `total` in the file name.
    stop = min(start + chunk_size, total)
    chunk_frames.append(pd.read_parquet(f"indication_dataset/{start}_{stop}.parquet"))

indication_dataset = pd.concat(chunk_frames).drop_duplicates(subset=["radiology_deid_note_key"])

In [10]:
# Draw the 100-report subset saved as "llm_labels.parquet". A fixed
# random_state makes the draw reproducible; without it every run selected a
# different set of rows.
SAMPLE_SEED = 42
indication_dataset.reset_index(drop=True).sample(n=100, random_state=SAMPLE_SEED).to_parquet("llm_labels.parquet")

In [6]:
# Number of radiology reports in the deduplicated indication dataset.
len(indication_dataset)

77630

In [106]:
# Disconnect the client first, then shut the cluster down, so the client is
# not left connected to a scheduler that has already been closed.
client.close()
cluster.close()

INFO:distributed.core:Event loop was unresponsive in Scheduler for 18.57s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
INFO:distributed.scheduler:Scheduler closing...
INFO:distributed.scheduler:Scheduler closing all comms
INFO:distributed.core:Connection to tcp://172.26.37.17:36436 has been closed.
INFO:distributed.scheduler:Remove worker <WorkerState 'tcp://172.26.37.17:33131', name: SGECluster-7-0, status: running, memory: 9, processing: 2>
INFO:distributed.core:Removing comms to tcp://172.26.37.17:33131
INFO:distributed.scheduler:Lost all workers
