In [None]:
import os
import sys
import time
import json
import regex as re
from collections import Counter

import logging
logger = logging.getLogger()
logger.setLevel(logging.CRITICAL)

import pandas as pd
from nltk import ngrams
from nltk.corpus import stopwords


# Digital health terms

## ClinicalTrials.gov: June 29, 2023

In [None]:
from DHTermSearch import query_ctgov_api

### Retrieve DTx clinical trials that contain "digital health"
# Query ClinicalTrials.gov once per search phrase and collect the requested metadata fields.
queries = ["digital health"]

fields_list = {"Metadata": ["NCTId", "OverallStatus", "StartDate",
                            "BriefSummary", "InterventionName", "InterventionDescription", "BriefTitle",
                            "Keyword", "DetailedDescription", "OfficialTitle", "EligibilityCriteria"]}

# get clinical trials that match specific search fields
search_fields = fields_list["Metadata"]

full_query_df = pd.DataFrame(columns=["NCTId"])
for q in queries:
    print(q)
    clinical_df = pd.DataFrame(columns=["NCTId"])
    for key in fields_list.keys():
        curr_df = query_ctgov_api(q, fields_list[key], search_field=search_fields)
        clinical_df = clinical_df.merge(curr_df, how="outer", on="NCTId")
    clinical_df["query"] = q
    print(len(clinical_df))
    full_query_df = pd.concat([clinical_df, full_query_df])

# A trial returned by several queries is kept once (the row from the latest query comes first and wins).
full_query_df = full_query_df.drop_duplicates(subset=["NCTId"])
print(len(full_query_df))

# Remove stopwords using NLTK English stopwords plus study-boilerplate words, as well as
# special characters and numbers. Only words with >3 characters are kept.
all_stopwords = stopwords.words('english')
all_stopwords = all_stopwords + ["patient", "subject", "participant", "studies", "study", "years", "months",
                                 "individual", "eg", "weeks", "diagnosis", "participation", "participate",
                                 "patients", "participants"]
# Set lookup keeps the per-token filter O(1) instead of scanning the stopword list each time.
stopword_set = set(all_stopwords)

# Join only the trial fields. The bookkeeping "query" column is excluded: joining it would
# inject the search phrase once per trial and inflate that phrase's n-gram counts.
text_columns = [c for c in full_query_df.columns if c != "query"]
# Fields missing for a trial may be NaN after the outer merge; blank them so str.join does
# not fail on float values.
text_fields = full_query_df[text_columns].fillna("").astype(str)
text = " ".join(" ".join(row) for row in text_fields.itertuples(index=False))
text = text.lower()
text = re.sub('inclusion criteria', ' ', text)
text = re.sub('exclusion criteria', ' ', text)
text = re.sub(r'[\n\t\-,]', ' ', text)
text = re.sub('[^A-Za-z ]+', ' ', text)
text = re.sub('[ ]+', ' ', text)
text = [t for t in text.split(" ") if len(t) > 3 and t not in stopword_set]

# Keep 1-, 2-, and 3-grams that occur at least MIN_NGRAM_COUNT times across trial titles,
# descriptions, keywords, and eligibility criteria.
# NOTE(review): an earlier comment described "Top 1000" n-grams and restricting 1-grams to
# specialty names; this cell does neither - any such curation happens outside this code.
MIN_NGRAM_COUNT = 50
ngram_frames = []
for n in range(1, 4):
    ngram_counts = Counter(ngrams(text, n))
    ngram_df = pd.DataFrame([(" ".join(gram), count) for gram, count in ngram_counts.items()],
                            columns=["term", "count"])
    ngram_df = ngram_df.sort_values("count", ascending=False)
    ngram_frames.append(ngram_df[ngram_df["count"] >= MIN_NGRAM_COUNT])
terms_df = pd.concat(ngram_frames)

# Last run June 29, 2023
terms_df.to_csv("./output/dhterms/ctgov_digitalhealthterms.csv")


## Pubmed: June 29, 2023

In [None]:
from DHTermSearch import get_pmc_ids, fetch_full_pmc_text

# Get all PMC ids with "digital health" in the key terms, abstract, or title of articles
# dated between 2012/01/01 and 2022/12/31.
search_params = {"db": "pmc",
                 "retmode": "json",
                 "mindate": "2012/01/01",
                 "maxdate": "2022/12/31",
                 }
all_pmc_ids = []
for field in ["Body - Key Terms", "Abstract", "Title"]:
    # Fresh dict per field so the shared base parameters are never mutated.
    field_params = {**search_params, "field": field}
    pmc_ids = get_pmc_ids(query='"digital health"', search_params=field_params)
    pmc_ids = pmc_ids["esearchresult"]["idlist"]
    all_pmc_ids.extend(pmc_ids)
    print("Articles with 'digital health' in %s:" % field, len(pmc_ids))

print("Total values:", len(all_pmc_ids))
# sorted() makes batch composition reproducible across runs (set iteration order is arbitrary).
all_pmc_ids = sorted(set(all_pmc_ids))
print("Total values - deduplicated:", len(all_pmc_ids))

# Get all full text and abstracts for each digital health PMC paper, in batches of
# FETCH_BATCH_SIZE ids; collect the frames and concatenate once at the end.
FETCH_BATCH_SIZE = 200
pmc_frames = []
for start in range(0, len(all_pmc_ids), FETCH_BATCH_SIZE):
    stop = min(len(all_pmc_ids), start + FETCH_BATCH_SIZE)
    print(start, stop)
    pmc_texts = fetch_full_pmc_text(all_pmc_ids[start:stop])
    pmc_frames.append(pd.DataFrame(pmc_texts))

# With no ids, fall back to an empty frame that still has the pmc_id key used below.
all_pmc_texts = pd.concat(pmc_frames) if pmc_frames else pd.DataFrame(columns=["pmc_id"])

# Clean up values
all_pmc_texts = all_pmc_texts.drop_duplicates(["pmc_id"])
all_pmc_texts = all_pmc_texts.set_index("pmc_id")
all_pmc_texts.to_parquet("./output/dhterms/pmc_digitalhealthtexts.parquet")


In [None]:
pmc_texts_df = pd.read_parquet("./output/dhterms/pmc_digitalhealthtexts.parquet")

# Remove stopwords, numbers, extra spaces, and special characters
all_stopwords = stopwords.words('english')
all_stopwords = all_stopwords + ["patient", "subject", "participant", "studies", "study", "years", "months",
                                 "individual", "weeks", "eg", "diagnosis", "participation", "participate",
                                 "patients", "participants"]
# Set lookup keeps the per-token filter O(1) instead of scanning the stopword list each time.
stopword_set = set(all_stopwords)

# Vectorised cleanup with the same patterns as before; missing article text becomes "".
pmc_texts_df["text_clean"] = (
    pmc_texts_df["text"]
    .fillna("")
    .str.replace(r'[\n\t\-,]', ' ', regex=True)
    .str.replace(r'[^A-Za-z ]+', ' ', regex=True)
    .str.replace(r'[ ]+', ' ', regex=True)
    .str.lower()
)

text = " ".join(pmc_texts_df["text_clean"])
text = [t for t in text.split(" ") if len(t) > 3 and t not in stopword_set]

# Keep 1-, 2-, and 3-grams that occur at least MIN_NGRAM_COUNT (500) times in article text
MIN_NGRAM_COUNT = 500
ngram_frames = []
for gram in range(1, 4):
    ngram_counts = Counter(ngrams(text, gram))
    ngram_df = pd.DataFrame([(" ".join(g), count) for g, count in ngram_counts.items()],
                            columns=["term", "count"])
    ngram_df = ngram_df.sort_values("count", ascending=False)

    print(len(ngram_df))  # 267323, 7244453, 18324292
    ngram_frames.append(ngram_df[ngram_df["count"] >= MIN_NGRAM_COUNT])
terms_df = pd.concat(ngram_frames)

print(len(terms_df))  # 6677
terms_df.to_csv("./output/dhterms/pubmed_digitalhealthterms.csv")


# Get digital health notes: July 3, 2023

In [None]:
import dask
import dask.dataframe as dd

from dask_jobqueue import SGECluster
from dask.distributed import Client

# Start an SGE-backed Dask cluster. Each attempt varies the scheduler `host` option
# (DEID<BASE_PORT + i>, which presumably selects the scheduler port - the log line calls it
# a port) until SGECluster constructs successfully.
BASE_PORT = 40000
MAX_PORT_ATTEMPTS = 10000  # give up with an error instead of retrying forever
cluster = None
last_error = None
for i in range(MAX_PORT_ATTEMPTS):
    try:
        cluster = SGECluster(
            queue='DEID',
            cores=4,
            memory='48GiB',
            walltime='04:00:00',
            death_timeout=60,
            local_directory=f'{os.getcwd()}/dask_temp',
            log_directory=f'{os.getcwd()}/dask_temp/dask_log',
            python=sys.executable,
            resource_spec='DEID',
            scheduler_options={
                'host': f'DEID{BASE_PORT + i}'
            }
        )
    except Exception as exc:  # a bare `except:` would also swallow KeyboardInterrupt
        last_error = exc
        if (i + 1) % 1000 == 0:
            print(i + 1)
        continue
    print(f'Using Port {BASE_PORT + i}...')
    break

if cluster is None:
    raise RuntimeError(f"Could not start SGECluster after {MAX_PORT_ATTEMPTS} attempts") from last_error

cluster.scale(250)
client = Client(cluster)
print(client.dashboard_link)

def load_register_table(data_asset, table, **kwargs):
    """Open a parquet table from the /DEID data register as a Dask DataFrame.

    Args:
        data_asset: Name of the data asset directory under /DEID (e.g. "DEID_CDW").
        table: Name of the table directory inside the data asset.
        **kwargs: Forwarded unchanged to ``dd.read_parquet``.

    Returns:
        Whatever ``dd.read_parquet`` returns for ``/DEID/<data_asset>/<table>/``.
    """
    table_path = f'/DEID/{data_asset}/{table}/'
    return dd.read_parquet(table_path, **kwargs)



In [None]:
# Load search terms and get notes
# Some table and column names here are changed to generic values due to University policy on data use
note_data = load_register_table("DEID_CDW", "note_table")

search_list = pd.read_csv("./searchterms.csv", encoding='mac-roman')
search_list = search_list[search_list["EMERSE_check"] == "Yes"]
search_list = search_list[search_list["Group"] != "Telehealth"]  # Telehealth removed

# Remove irrelevant note types once. Every group below filters this shared base, rather than
# the previous group's result, so groups are searched independently.
eligible_notes = note_data[~note_data["encounter_type"].str.contains(
    "Letter|Prepare |Documentation|STOR|Prep ", na=False)]

# For each type of search term, select matching notes and write one "partition" file per group
for group in search_list["Group"].unique():
    search_terms = "|".join(search_list.loc[search_list["Group"] == group, "Term"].to_list())

    # Terms are used as a regex alternation against lower-cased note text; notes with no
    # text never match.
    digital_rdd = eligible_notes[
        eligible_notes["note_text"].str.lower().str.contains(search_terms, na=False)]
    digital_df = digital_rdd.compute()
    digital_df.to_parquet(f"./output/dhnotes/raw/{group}_notes.parquet.gzip", compression="gzip")

    
    

## Add encounter information

In [None]:
import glob

### Load values and clean up + add encounter/patient metadata information
# Combine the per-group note files with pandas, then convert to a Dask DataFrame so the
# result can be joined against the encounter table.
note_files = sorted(glob.glob("./output/dhnotes/raw/*"))
if not note_files:
    raise FileNotFoundError("No note files found in ./output/dhnotes/raw/ - run the note search cell first")

notes_df = pd.concat([pd.read_parquet(file) for file in note_files])
# A note matching terms from several groups is written to several group files. Collapse those
# identical copies here; otherwise the keep=False text de-duplication below would treat them as
# templated text and drop the note entirely.
notes_df = notes_df.drop_duplicates()

notes_df = dd.from_pandas(notes_df, npartitions=200)

# Add encounter information
encounter_rdd = load_register_table("DEID_CDW", "encounter_table")
merged_df = notes_df.merge(encounter_rdd, on="encounter_id", how="inner")
merged_df = merged_df.compute()

# Add year information and lower case text column
merged_df["year"] = [None if n is None else n.year for n in merged_df['date']]
merged_df["note_text_clean"] = [t.lower() for t in merged_df["note_text"]]

print(f"Initial note count: {merged_df.shape}")
print(f"Initial patient count: {merged_df['patientid'].nunique()}")

# drop duplicates (remove all templated instructions)
# keep=False: every note whose text appears more than once is removed, no copy is kept
clean_df = merged_df.drop_duplicates(subset=["note_text_clean"], keep=False)
print(f"Dedup note count: {clean_df.shape}")
print(f"Dedup patient count: {clean_df['patientid'].nunique()}")

# Keep notes from 2012 through 2022
clean_df = clean_df[(clean_df["year"] > 2011) & (clean_df["year"] < 2023)]
print(f"Time window note count: {clean_df.shape}")
print(f"Time window patient count: {clean_df['patientid'].nunique()}")

# drop zoom links and patient portal instructions
# NOTE(review): these phrases were manually curated; record where they came from.
remove_zoom = ["zoom.us", "mychart app", "on your smartphone with the app",
               "download the free mychart", "download the mychart", "on-line and via mobile app",
               "please follow the link from your smartphone"]
# Escape each phrase so it matches literally (e.g. the "." in "zoom.us" is not a wildcard).
zoom_pattern = "|".join(re.escape(phrase) for phrase in remove_zoom)
clean_df = clean_df[~clean_df["note_text_clean"].str.contains(zoom_pattern)]

print(f"Without zoom link / patient portal notes count: {clean_df.shape}")
print(f"Without zoom link / patient portal patient count: {clean_df['patientid'].nunique()}")

# Add term information: one boolean column per search term
search_list = pd.read_csv("./searchterms.csv", encoding='mac-roman')
search_list = search_list[search_list["EMERSE_check"] == "Yes"]
search_list = search_list[search_list["Group"] != "Telehealth"]  # Telehealth removed

# Explicit copy so the new term columns are not written into a view of merged_df
clean_df = clean_df.copy()
for term, term_clean in zip(search_list["Term"], search_list["Term_clean"]):
    clean_df[term_clean] = clean_df["note_text_clean"].str.contains(term)

# Save final values
clean_df.to_parquet("./output/dhnotes/DH_annotated_notes.parquet.gzip", compression="gzip")



## Save demographics

In [None]:
# Load the annotated DH notes; their patient ids decide which patient records are exported
notes_df = pd.read_parquet("./output/dhnotes/DH_annotated_notes.parquet.gzip")
dh_patient_ids = list(notes_df["patientid"].unique())

# Current patient records (iscurrent == 1) restricted to patients with at least one DH note
patient_rdd = load_register_table("DEID_CDW", "patient_table")
patient_rdd = patient_rdd[patient_rdd["iscurrent"] == 1]
patient_df = patient_rdd[patient_rdd["patientid"].isin(dh_patient_ids)].compute()
print(patient_df.shape)

patient_df.to_parquet("./output/dhnotes/demographics.parquet.gzip", compression="gzip")



In [None]:
from utils.figures import mask_small_group

# Get overall demographics to compare: patients with notes dated 2012-2022 who have NO digital health note.
# Patients WITH DH notes, saved by the demographics export cell
demographics_df = pd.read_parquet("./output/dhnotes/demographics.parquet.gzip")

# Reload current patient records here instead of relying on `patient_rdd` left in the kernel
# by an earlier cell.
patient_rdd = load_register_table("DEID_CDW", "patient_table")
patient_rdd = patient_rdd[patient_rdd["iscurrent"] == 1]

notes_meta_rdd = load_register_table("DEID_CDW", "note_table")
print("Total patients with notes:", notes_meta_rdd["patientid"].nunique().compute())

# Keep notes dated 2012-01-01 through 2022-12-31. ">=" keeps notes stamped exactly
# 2012-01-01 00:00, which a strict ">" drops.
notes_meta_rdd = notes_meta_rdd[(notes_meta_rdd["deid_service_date"] >= "2012")
                                & (notes_meta_rdd["deid_service_date"] < "2023")]

# Get total notes per year for these patients
notes_meta_rdd['year'] = notes_meta_rdd["deid_service_date"].dt.year
print(notes_meta_rdd["year"].value_counts().compute())

# add patient birthdays to note metadata
notes_meta_rdd = notes_meta_rdd[["patientid", "deid_service_date"]]
notes_demo_rdd = patient_rdd.merge(notes_meta_rdd, how='inner', on='patientid')

# add age info (a year is approximated as 365 days)
notes_demo_rdd["birthdate"] = notes_demo_rdd["birthdate"].astype('datetime64[ns]')
notes_demo_rdd["age_at_note"] = (notes_demo_rdd["deid_service_date"] - notes_demo_rdd["birthdate"]).dt.days / 365

# remove patients with digital health notes
# Recorded earlier - total unique patient epic ids with notes without DH terms: 2797871
dh_patient_ids = list(demographics_df["patientid"].unique())
notes_demo_rdd = notes_demo_rdd[~notes_demo_rdd["patientid"].isin(dh_patient_ids)]

# Age at note over all of these patients' notes
print(notes_demo_rdd["age_at_note"].describe().compute().loc[["mean", "std", "50%", "25%", "75%"]])

# One row per patient for the categorical demographic counts
notes_demo_rdd = notes_demo_rdd.drop_duplicates('patientid')
notes_demo_df = notes_demo_rdd.persist()
print("Total patients between 2012-2022 without DH notes:", notes_demo_df["patientid"].nunique().compute())
# Recorded output: Total patients between 2012-2022 without DH notes: 2170780
# NOTE(review): recorded with the old strict "> 2012" bound; may shift slightly on re-run.

# get categorical demographics
demo_col = ['sex', 'preferredlanguage', 'mychartstatus', 'ucsfderivedraceethnicity_x']
demo_frames = []
for demo in demo_col:
    demo_df = notes_demo_df[demo].value_counts().compute().reset_index()
    demo_df.columns = ["value", "count"]
    demo_df["category"] = demo

    # mask values with less than 10 counts and get masked proportions
    demo_frames.append(mask_small_group(demo_df, min_n=10))
all_demo_df = pd.concat(demo_frames)

all_demo_df.to_csv("./output/no_dhnotes_demographics.csv")


In [None]:
# Read in values
notes_df = pd.read_parquet("./output/dhnotes/DH_annotated_notes.parquet.gzip")
demographics_df = pd.read_parquet("./output/dhnotes/demographics.parquet.gzip")

# Categorical demographic columns - must match the list used for the no-DH-notes comparison.
# Defined here so this cell does not depend on kernel state from another cell.
demo_col = ['sex', 'preferredlanguage', 'mychartstatus', 'ucsfderivedraceethnicity_x']

# get age at each DH note (a year is approximated as 365 days)
notes_demo_df = notes_df.merge(demographics_df, how="left", on="patientid")
notes_demo_df["birthdate"] = notes_demo_df["birthdate"].astype('datetime64[ns]')
notes_demo_df["date"] = notes_demo_df["date"].astype('datetime64[ns]')

notes_demo_df["age_at_note"] = (notes_demo_df["date"] - notes_demo_df["birthdate"]).dt.days / 365
# Summarise the DH-note frame built above (a pandas frame, so no .compute()).
print(notes_demo_df["age_at_note"].describe().loc[["mean", "std", "50%", "25%", "75%"]])

# NOTE(review): the output below was recorded when this line printed `notes_demo_rdd` (the
# no-DH-notes Dask frame from another cell) by mistake; re-run to get DH-note ages.
# mean    37.348978
# std     24.883279
# 50%     36.032877
# 25%     15.054795
# 75%     58.268493

# Get values for DH note patients
demo_frames = []
for demo in demo_col:
    demo_df = demographics_df[demo].value_counts().reset_index()
    demo_df.columns = ["value", "count"]
    demo_df["category"] = demo

    # mask values with less than 10 counts and get masked proportions
    demo_frames.append(mask_small_group(demo_df, min_n=10))
all_demo_df = pd.concat(demo_frames)

all_demo_df.to_csv('./output/dhnotes_demographics.csv')



In [None]:
# Checking when the "unknown" values occur for patients without DH notes
# And in which departments
from utils.figures import mask_small_group

# Current patient records
patient_rdd = load_register_table("DEID_CDW", "patient_table")
patient_rdd = patient_rdd[patient_rdd["iscurrent"] == 1]

# Keep notes dated 2012-01-01 through 2022-12-31; ">=" keeps notes stamped exactly
# 2012-01-01 00:00, which a strict ">" drops.
# NOTE(review): this cell filters on `note_date`, while the no-DH demographics cell uses
# `deid_service_date` on the same note table - confirm which column is intended.
notes_meta_rdd = load_register_table("DEID_CDW", "note_table")
notes_meta_rdd = notes_meta_rdd[(notes_meta_rdd["note_date"] >= "2012")
                                & (notes_meta_rdd["note_date"] < "2023")]

# Attach patient records (including birthdate) to every note; all note columns are kept
notes_demo_rdd = patient_rdd.merge(notes_meta_rdd, how='inner', on='patientid')

# Note year, for per-year note counts
notes_demo_rdd['year'] = notes_demo_rdd["note_date"].dt.year

# add age info (a year is approximated as 365 days)
notes_demo_rdd["birthdate"] = notes_demo_rdd["birthdate"].astype('datetime64[ns]')
notes_demo_rdd["age_at_note"] = (notes_demo_rdd["note_date"] - notes_demo_rdd["birthdate"]).dt.days / 365

# Exclude patients who have any digital health note
demographics_df = pd.read_parquet("./output/dhnotes/demographics.parquet.gzip")
notes_demo_rdd = notes_demo_rdd[~notes_demo_rdd["patientid"].isin(list(demographics_df["patientid"].unique()))]

# Only keep patients whose mychart status contains "unspecified" (case-insensitive);
# na=False also drops rows with a missing status.
notes_demo_rdd = notes_demo_rdd[notes_demo_rdd["mychartstatus"].str.contains("unspecified", case=False, na=False)]

# Characterize notes per year for patients with "unspecified" mychart status (run 10/4)
notes_demo_rdd = notes_demo_rdd.compute()


## Get digital health sentences

In [None]:
from nltk.tokenize import sent_tokenize
from utils.figures import retrieve_dh_sentences

### Load notes and terms
notes_df = pd.read_parquet("./output/dhnotes/DH_annotated_notes.parquet.gzip")

terms = pd.read_csv("./searchterms.csv", header=0, encoding='mac-roman')
terms = terms[terms["EMERSE_check"] == "Yes"]
# Filter on "Group" exactly as the annotation step did when it created one boolean column per
# term, so every term selected here has a matching column in notes_df (previously "File_group").
terms = terms[terms["Group"] != "Telehealth"]  # Telehealth removed

# clean up: strip "*****" runs, then split each note into sentences
notes_df["note_text_sent"] = notes_df["note_text_clean"].str.replace("*****", "", regex=False)
notes_df["note_text_sent"] = notes_df["note_text_sent"].apply(sent_tokenize)

# extract sentences that mention digital health, one output file per term
for term, term_clean in zip(terms["Term"], terms["Term_clean"]):
    print(term_clean)
    # .copy() so the column assignments below act on an independent frame, not a view of notes_df
    curr_df = notes_df[notes_df[term_clean]].copy()
    if curr_df.empty:
        continue

    sentences = list(curr_df["note_text_sent"])
    curr_df["note_text_dh_sent_extended"] = retrieve_dh_sentences(sentences, term, extend=True)
    curr_df["note_text_dh_sent"] = retrieve_dh_sentences(sentences, term, extend=False)

    curr_df = curr_df[["note_id", "encounterkey", "note_text", "note_text_dh_sent", term_clean,
                       "provider_specialty", "diagnosis", "encounter_department_specialty",
                       "note_text_dh_sent_extended"]]
    curr_df.to_parquet(f"./output/dhnotes/sentences/{term_clean}.parquet.gzip", compression="gzip")
        