# Project: Performance of phenotype algorithms for the identification of opioid-exposed infants, Andrew D. Wiese et al. Hospital Pediatrics 2024
# Title: Identify birthing parents with evidence of opioid drug exposure in medication lists 
# Summary: 
## Identify birthing parents with evidence of opioid drug exposure in medication lists from estimated time of conception through 30-days after delivery



##### Algorithm steps:

```
1.Get opioid search terms from database table

2.Get all notes for moms from database table

3.Filter notes containing opioid search terms

4.Further filter notes by date relative to baby birth date

5.Iterate through notes and find matched search terms using regex

6.Update notes dataframe with matched terms

7.Convert dataframe to Spark

8.Filter out notes without matched terms 

9.Filter notes containing 'medication list'

10.Output results:
    - Count of unique moms
    - Count of total notes
```

##### Data Dictionaries:

**mom_drug_search_term_list** - Database table containing list of opioid search terms

**note_table** - Database table containing clinical notes

**person_table** - Database table containing person IDs and MRNs

**mom_baby_step1** - Temporary table containing mom and baby ID pairs 

**mom_notes** - Temporary table containing all notes for moms

**opioid_notes** - Temporary table containing notes with opioid search terms

**opioid_df** - Temporary table containing filtered notes by date 

**search_opioid_terms** - Temporary table containing notes with matched terms

**search_opioid_terms_cleaned** - Temporary table containing only notes with matched terms

**search_opioid_terms_cleaned_medicationlist** - Final table containing notes with 'medication list'

##### Usage Notes:
```
- This pipeline looks for mentions of opioid medications in clinical notes around time of delivery to identify moms exposed to opioids

- It keeps notes dated from 30 days before the baby's birth up to (but not including) 2 days before the birth; notes from the last 2 days before delivery onward are excluded to avoid false matches on medications given during delivery

- Matching is case-insensitive: multi-word search terms are located with a regex search anywhere in the note text, while single-word search terms must equal a whole alphabetic word in the note

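- The final output reports the number of distinct moms (by person ID) and the total number of matching note rows; note rows are not otherwise deduplicated, so a note can be counted more than once if it appears in several rows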

- Final table filters for notes containing 'medication list' to increase likelihood of true opioid exposure

```

In [0]:
%run "./project_modules"

In [0]:
# Load the opioid search terms (column `generic`) from the search-term table.
sql=f"select generic from {mom_drug_search_term_list};"
search_terms= spark.sql(sql)

# Build a single SQL predicate of the form
#   note_text like "%term1%" or note_text like "%term2%" or ...
# The terms are lower-cased because the notes they are compared against are
# selected as lower(note_text) and Spark's LIKE is case-sensitive; a term
# stored with capital letters would otherwise never match any note.
search_terms_str=search_terms.agg(F.concat_ws(" or ",F.collect_list(F.concat(F.lit('note_text like "%'),F.lower(F.col('generic')),F.lit('%"'))))).first()[0]

# Plain Python list of the terms (original casing), used by update_notes for
# the row-by-row, case-insensitive matching.
all_generics = [row['generic'] for row in search_terms.select('generic').collect()]

##### Find matched notes functions

In [0]:
def is_in(full_str, sub_str):
    """Return True if the pattern ``sub_str`` matches anywhere in ``full_str``.

    The search is case-insensitive. ``sub_str`` is interpreted as a regular
    expression, not as a literal string, so regex metacharacters in it keep
    their special meaning.

    Args:
        full_str: Text to search.
        sub_str: Regular-expression pattern to look for.

    Returns:
        bool: True when at least one match exists, otherwise False.
    """
    # Only existence matters, so stop at the first hit instead of collecting
    # every match with re.findall.
    return re.search(sub_str, full_str, re.I) is not None

In [0]:
def update_notes(note_searched_output_df):
    """Tag each note with the search terms found in its text.

    For every row, ``note_text`` is checked against each term in the
    notebook-level list ``all_generics``:

    * a term containing more than one alphabetic word is matched with a
      case-insensitive regex search (``is_in``) anywhere in the note;
    * any other term must equal, case-insensitively, a whole alphabetic word
      of the note.

    The matched terms are joined with ``' ||||| '`` in the order they appear
    in ``all_generics``; each term is listed at most once per note. Rows with
    no matching term get ``None``.

    Args:
        note_searched_output_df: pandas DataFrame with a ``note_text`` column.
            A ``matched_words`` column is added to it in place.

    Returns:
        A Spark DataFrame created from the updated pandas DataFrame.
    """
    spark = SparkSession.builder.appName("pandas to spark").getOrCreate()
    word_pattern = re.compile('[a-zA-Z]+')

    # Classify every search term once, up front, instead of once per note.
    # Repeated terms are dropped so a term cannot be reported twice.
    terms = []
    seen_terms = set()
    for term in all_generics:
        if term in seen_terms:
            continue
        seen_terms.add(term)
        is_phrase = len(word_pattern.findall(term)) > 1
        terms.append((term, is_phrase, term.lower()))

    matched_column = []
    for count, raw_text in enumerate(note_searched_output_df['note_text'], start=1):
        note_text = str(raw_text)
        if count % 10000 == 0:
            print("Current record count:", count)

        # Tokenise the note once; set membership replaces a scan over all
        # words for every single-word term.
        note_words = {word.lower() for word in word_pattern.findall(note_text)}

        matches = [
            term
            for term, is_phrase, lowered in terms
            if (is_in(note_text, term) if is_phrase else lowered in note_words)
        ]

        # Join only real matches: no leading "None" placeholder, and None
        # (NULL in Spark) when nothing matched.
        matched_column.append(' ||||| '.join(matches) if matches else None)

    note_searched_output_df["matched_words"] = matched_column
    return spark.createDataFrame(note_searched_output_df)

##### MOM exposed to OPIOID DRUG in Note


In [0]:

# Pull every note belonging to a person listed as fact_id_1 in
# global_temp.mom_baby_step1 (fact_id_1 is used as the mom's person_id).
# note_text is lower-cased here, so later text comparisons run against
# lower-case text.
sql=f"select person_id,note_date,note_id,lower(note_text) as note_text from {note_table} where person_id in (select fact_id_1 from global_temp.mom_baby_step1)"

mom_notes= spark.sql(sql)
mom_notes.name="mom_notes"
# register_parquet_global_view comes from project_modules; the next query
# reads global_temp.mom_notes, so it presumably publishes the DataFrame under
# its .name in global_temp -- confirm against the helper.
register_parquet_global_view(mom_notes)

# Pre-filter in Spark: keep only notes whose text satisfies the LIKE predicate
# built from the search terms (search_terms_str).
sql=f"select * from global_temp.mom_notes where {search_terms_str};"
notes_df= spark.sql(sql)
notes_df.name="opioid_notes"
register_parquet_global_view(notes_df)

In [0]:
# Join the pre-filtered opioid notes to the mom/baby pairs (note person_id =
# fact_id_1) and keep notes dated on or after 30 days before the baby's birth
# and strictly before 2 days before the birth. start_date and end_date expose
# the two window bounds as columns.
# NOTE(review): a person with several rows in mom_baby_step1 yields one output
# row per matching pair for the same note.
sql="""
     select a.*, b.fact_id_1,b.fact_id_2,b.birth_datetime as baby_birth_datetime, 
     date_sub(birth_datetime, 30) as start_date, date_sub(birth_datetime, 2) as end_date from 
     
     (select * from global_temp.opioid_notes) a 
     inner join 
     global_temp.mom_baby_step1 b
     on a.person_id = b.fact_id_1
     where note_date >= date_sub(birth_datetime, 30)  and note_date <  date_sub(birth_datetime, 2);
    """

opioid_df=spark.sql(sql)
# Session-scoped temp view (not global_temp) named "opioid_df".
opioid_df.createOrReplaceTempView("opioid_df")

In [0]:
# Bring the date-filtered notes into pandas so update_notes can scan each
# note's text in Python; toPandas() collects the whole result locally.
pandasDF =opioid_df.toPandas()
# update_notes adds a matched_words column and returns a Spark DataFrame.
search_opioid_terms=update_notes(pandasDF)
search_opioid_terms.name="search_opioid_terms"
# Later cells read this result as global_temp.search_opioid_terms.
register_parquet_global_view(search_opioid_terms)

##### Search_opioid_terms_cleaned

In [0]:
# Keep only the notes for which update_notes recorded at least one matched
# term. Rows without a match carry a NULL matched_words (None coming from
# update_notes), and NULL != '' is not true in SQL, so they are dropped as well.
# A direct filter is used instead of a self-join on (person_id, note_id): the
# join selected the same notes but multiplied every row whose
# (person_id, note_id) pair occurs more than once, inflating the row counts.
sql="""
       select a.* from global_temp.search_opioid_terms a
       where a.matched_words != '';
    """

search_opioid_terms_cleaned= spark.sql(sql)
# Session-scoped temp view; the next cell reads it without the global_temp prefix.
search_opioid_terms_cleaned.createOrReplaceTempView("search_opioid_terms_cleaned")

##### Just need the note with terms: 'medication list'

In [0]:
# Restrict the matched notes to those whose text contains the phrase
# 'medication list' (compared in lower case), then attach each person's
# person_source_value from the person table as the column mrn.
# The inner join drops notes whose person_id has no row in the person table.
sql=f"""
      select a.*,v.person_source_value as mrn from  
      (select * from search_opioid_terms_cleaned where lower(note_text) like '%medication list%') a 
      inner join 
      {person_table} v
      on a.person_id = v.person_id;
    """
search_opioid_terms_cleaned_medicationlist= spark.sql(sql)
search_opioid_terms_cleaned_medicationlist.name = "search_opioid_terms_cleaned_medicationlist"
# The final cell reads this result as
# global_temp.search_opioid_terms_cleaned_medicationlist.
register_parquet_global_view(search_opioid_terms_cleaned_medicationlist)

In [0]:
# Summary of the final table: unique_mom is the number of distinct person_id
# values, total is the number of rows (count(*)), which is not a distinct
# count of note_id.
sql="""
     select count(distinct person_id) as unique_mom, count(*) as total from global_temp.search_opioid_terms_cleaned_medicationlist;
    """
inspect_df= spark.sql(sql)
# Render the one-row result in the notebook.
inspect_df.display()