## Final Assignment Overview: Working with Patient Records and Encounter Notes

In this final assignment, we’ll focus on patient records related to COVID-19 encounters. Our task is to analyze, process, and transform the data while applying the concepts we’ve covered throughout this course. Here's a detailed breakdown of the assignment:

What Are Encounter Notes?
An encounter note is a record that captures details about a patient’s visit with a doctor. It includes both structured and semi-structured information that is crucial for understanding the context of the visit. Here’s what an encounter note typically looks like:

```
AMBULATORY ENCOUNTER NOTE
Date of Service: March 2, 2020 15:45-16:30

DEMOGRAPHICS:
Name: Jeffrey Greenfelder
DOB: 1/16/2005
Gender: Male
Address: 428 Wiza Glen Unit 91, Springfield, Massachusetts 01104
Insurance: Guardian
MRN: 055ae6fc-7e18-4a39-8058-64082ca6d515

PERTINENT MEDICAL HISTORY:
- Obesity 

Recent Visit: Well child visit (2/23/2020)
Immunizations: Influenza vaccine (2/23/2020)

Recent Baseline (2/23/2020):
Height: 155.0 cm
Weight: 81.2 kg
BMI: 33.8 kg/m² (99.1th percentile)
BP: 123/80 mmHg
HR: 92/min
RR: 13/min

SUBJECTIVE:
Adolescent patient presents with multiple symptoms including:
- Cough
- Sore throat
- Severe fatigue
- Muscle pain
- Joint pain
- Fever
Never smoker. Symptoms began recently.

OBJECTIVE:
Vitals:
Temperature: 39.3°C (102.7°F)
Heart Rate: 131.1/min
Blood Pressure: 120/73 mmHg
Respiratory Rate: 27.6/min
O2 Saturation: 75.8% on room air
Weight: 81.2 kg

Laboratory/Testing:
Comprehensive Respiratory Panel:
- Influenza A RNA: Negative
- Influenza B RNA: Negative
- RSV RNA: Negative
- Parainfluenza virus 1,2,3 RNA: Negative
- Rhinovirus RNA: Negative
- Human metapneumovirus RNA: Negative
- Adenovirus DNA: Negative
- SARS-CoV-2 RNA: Positive

ASSESSMENT:
1. Suspected COVID-19 with severe symptoms
2. Severe hypoxemia requiring immediate intervention
3. Tachycardia (HR 131)
4. High-grade fever
5. Risk factors:
   - Obesity (BMI 33.8)
   - Adolescent age

PLAN:
1. Face mask provided for immediate oxygen support
2. Infectious disease care plan initiated
3. Close monitoring required due to:
   - Severe hypoxemia
   - Tachycardia
   - Age and obesity risk factors
4. Parent/patient education on:
   - Home isolation protocols
   - Warning signs requiring emergency care
   - Return precautions
5. Follow-up plan:
   - Daily monitoring during acute phase
   - Virtual check-ins as needed

Encounter Duration: 45 minutes
Encounter Type: Ambulatory
Provider: ID# e2c226c2-3e1e-3d0b-b997-ce9544c10528
Facility: 5103c940-0c08-392f-95cd-446e0cea042a
```


The enocuter contains

* General encounter information: 

  * When the encounter took place: Date and time of the visit.
  * Demographics: Patient’s age, gender, and unique medical record identifier.
  * Encounter details: The reason for the visit, diagnosis, and any associated costs.


* Semi-Structured Notes:

These notes mirror how doctors organize their thoughts and observations during an encounter. They generally follow a SOAP format:

* Subjective: The patient’s subjective description of their symptoms, feelings, and medical concerns.
* Objective: The doctor’s objective findings, including test results, measurements, or physical examination outcomes.
* Assessment: The doctor’s evaluation or diagnosis based on subjective and objective information.
* Plan: The proposed treatment plan, including medications, follow-ups, or other interventions.

While some encounter notes might include additional details, the majority conform to this semi-structured format, making them ideal for analysis and transformation.

* Goals for the Assignment

1. Transforming Encounter Notes:

Using an LLM to convert semi-structured encounter notes into a JSON format that organizes the information into structured fields. The JSON will include details such as demographics, encounter specifics, and the SOAP components of the note. Subsequently, you will need to transform the JSON data into a Parquet file, which is not only suitable for analysis in Spark but also ideal for storage later.
Here we will use the ML classificaition to assing the objective and assessment semi-structured fields into standardized, structured fields. The medical taxonomy for this task will be the one provided by the CDC, which defines standard codes for diagnoses, symptoms, procedures, and treatments. This step ensures the structured data aligns with domain-wide medical standards, making it interoperable and ready for deeper analysis.

The JSON format should capture the hierachies described in the structure below. 




2. Basic Analytics and Visualizations:
Using Apache Spark, perform comprehensive data analysis on the encounter data and create visualizations that reveal meaningful patterns. Your analysis must include:
- COVID-19 Case Demographics: Case breakdown by age ranges ([0-5], [6-10], [11-17], [18-30], [31-50], [51-70], [71+])
- Cumulative case count of Covid between the earliest case observed in the dataset and last case observed
- Symptoms for all COVID-19 patients versus patients that admitted into the intensive care unit due to COVID.
- Rank medications by frequency of prescription
- Analyze medication patterns across different demographic groups (e.g., top 3 per age group)
- Identify and plot co-morbidity information from the patient records (e.g., hypertension, obesity, prediabetes, etc.) provided in the dataset. 
- An independent group analysis: You need to develop and execute THREE original analyses that provide meaningful insights about COVID-19 patterns in this dataset. For each analysis:
  - Clearly state your analytical question/hypothesis
  - Justify why this analysis is valuable
  - Show your Spark code and methodology
  - Present results with appropriate visualizations


# JSON HIERARCHIES 
```
EncounterType:
    code
    description

Encounter:
    id
    date
    time
    type: EncounterType
    provider_id
    facility_id

Address:
    city
    state

Demographics:
    id
    name
    date_of_birth
    age
    gender
    address: Address
    insurance

Condition:
    code
    description

Medication:
    code
    description

Immunization:
    code
    description
    date: date

VitalMeasurement:
    code
    value: float
    unit

BloodPressure:
    systolic: VitalMeasurement
    diastolic: VitalMeasurement

CurrentVitals:
    temperature: VitalMeasurement
    heart_rate: VitalMeasurement
    blood_pressure: BloodPressure
    respiratory_rate: VitalMeasurement
    oxygen_saturation: VitalMeasurement
    weight: VitalMeasurement

BaselineVitals:
    date: date
    height: VitalMeasurement
    weight: VitalMeasurement
    bmi: VitalMeasurement
    bmi_percentile: VitalMeasurement

Vitals:
    current: CurrentVitals
    baseline: BaselineVitals

RespiratoryTest:
    code
    result

RespiratoryPanel:
    influenza_a: RespiratoryTest
    influenza_b: RespiratoryTest
    rsv: RespiratoryTest
    parainfluenza_1: RespiratoryTest
    parainfluenza_2: RespiratoryTest
    parainfluenza_3: RespiratoryTest
    rhinovirus: RespiratoryTest
    metapneumovirus: RespiratoryTest
    adenovirus: RespiratoryTest

Covid19Test:
    code
    description
    result

Laboratory:
    covid19: Covid19Test
    respiratory_panel: RespiratoryPanel

Procedure:
    code
    description
    date: date
    reasonCode
    reasonDescription

CarePlan:
    id
    code
    description
    start: date
    stop: date
    reasonCode
    reasonDescription

PatientRecord:
    encounter: Encounter
    demographics: Demographics
    conditions: List[Condition]
    medications: List[Medication]
    immunizations: List[Immunization]
    vitals: Vitals
    laboratory: Laboratory
    procedures: List[Procedure]




class Address(BaseModel):
    city: str = Field(description=" the city where the patient lives. Should be under DEMOGRAPHICS header")
    state: str

class Demographics(BaseModel):
    name: str
    date_of_birth: str
    age: int
    gender: str
    address: Address
    insurance: str

class Medication(BaseModel):
    code: str
    description: str

class PatientRecord(BaseModel):
    demographics: Demographics
    medications: List[Medication]

        
llm = ChatOpenAI(model="gpt-4o-mini")
structured_llm = llm.with_structured_output(PatientRecord)

structured_llm.invoke(PATIENT_NOTE)


In [0]:
!-m pip install --upgrade pip
!pip install -U sentence-transformers
!pip install faiss-cpu
!pip install PyMuPDF Pillow numpy
!pip install langchain
!pip install --upgrade pydantic
%restart_python
!pip install langchain_community
!pip install -U langchain-openai


/bin/bash: - : invalid option
Usage:	/bin/bash [GNU long option] [option] ...
	/bin/bash [GNU long option] [option] script-file ...
GNU long options:
	--debug
	--debugger
	--dump-po-strings
	--dump-strings
	--help
	--init-file
	--login
	--noediting
	--noprofile
	--norc
	--posix
	--pretty-print
	--rcfile
	--restricted
	--verbose
	--version
Shell options:
	-ilrsD or -c command or -O shopt_option		(invocation only)
	-abefhkmnptuvxBCHP or -o option
Collecting sentence-transformers
  Obtaining dependency information for sentence-transformers from https://files.pythonhosted.org/packages/8b/c8/990e22a465e4771338da434d799578865d6d7ef1fdb50bd844b7ecdcfa19/sentence_transformers-3.3.1-py3-none-any.whl.metadata
  Downloading sentence_transformers-3.3.1-py3-none-any.whl.metadata (10 kB)
Collecting transformers<5.0.0,>=4.41.0 (from sentence-transformers)
  Obtaining dependency information for transformers<5.0.0,>=4.41.0 from https://files.pythonhosted.org/packages/d0/a7/7eedcf

In [0]:
## Add all imports here
from pyspark.sql import SparkSession

from pyspark.sql.types import IntegerType, StructType, StructField, StringType, TimestampType

from pyspark.sql.functions import lower, col, min, desc, rank, date_trunc, trim, length, split, size, mean, stddev, min, max, countDistinct, datediff, log, month, year, concat_ws

from pyspark.sql.window import Window


```

dbfs:/FileStore/shared_uploads/ktorres2@hawaii.edu/055ae6fc_7e18_4a39_8058_64082ca6d515.txt
dbfs:/FileStore/shared_uploads/ktorres2@hawaii.edu/199c586f_af16_4091_9998_ee4cfc02ee7a.txt
# dbfs:/FileStore/shared_uploads/ktorres2@hawaii.edu/199c586f_af16_4091_9998_ee4cfc02ee7a_txt_
dbfs:/FileStore/shared_uploads/ktorres2@hawaii.edu/353016ea_a0ff_4154_85bb_1cf8b6cedf20.txt
dbfs:/FileStore/shared_uploads/ktorres2@hawaii.edu/ae9efba3_ddc4_43f9_a781_f72019388548.txt
dbfs:/FileStore/shared_uploads/ktorres2@hawaii.edu/28658715_b770_4576_9a81_fbb2282a98ea.txt
dbfs:/FileStore/shared_uploads/ktorres2@hawaii.edu/b9fd2dd8_181b_494b_ab15_e9f286d668d9.txt
dbfs:/FileStore/shared_uploads/ktorres2@hawaii.edu/d22592ac_552f_4ecd_a63d_7663d77ce9ba.txt
dbfs:/FileStore/shared_uploads/ktorres2@hawaii.edu/f0f3bc8d_ef38_49ce_a2bd_dfdda982b271.txt
dbfs:/FileStore/shared_uploads/ktorres2@hawaii.edu/df6b563d_1ff4_4833_9af8_84431e641e9c.txt
dbfs:/FileStore/shared_uploads/ktorres2@hawaii.edu/f73d6f41_0091_4485_8b43_9d38eb98fb36.txt

```

In [0]:
# File location and type
file_location = "/FileStore/tables/medications_assignment_1-1.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
medications_df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(medications_df)

CODE,DESCRIPTION
1000126,1 ML medroxyPROGESTERone acetate 150 MG/ML Injection
1000126,1 ML medroxyprogesterone acetate 150 MG/ML Injection
1014676,cetirizine hydrochloride 5 MG Oral Tablet
1014678,cetirizine hydrochloride 10 MG Oral Tablet
1043400,Acetaminophen 21.7 MG/ML / Dextromethorphan Hydrobromide 1 MG/ML / doxylamine succinate 0.417 MG/ML Oral Solution
1049221,Acetaminophen 325 MG / Oxycodone Hydrochloride 5 MG Oral Tablet
1049221,Acetaminophen 325 MG / oxyCODONE Hydrochloride 5 MG Oral Tablet
1049630,diphenhydrAMINE Hydrochloride 25 MG Oral Tablet
1049635,Acetaminophen 325 MG / oxyCODONE Hydrochloride 2.5 MG Oral Tablet
105078,Penicillin G 375 MG/ML Injectable Solution


In [0]:
spark = SparkSession.builder.appName("medical notes").getOrCreate()

print("df_encounter_types")
encounters_type_schema = StructType([
    StructField("CODE", StringType(), True),
    StructField("DESCRIPTION", StringType(), True)
])
df_encounter_types = spark.read.csv("/FileStore/tables/encounters_types_assignment_1.csv",
                    schema=encounters_type_schema, 
                    header=True)
df_encounter_types.show(2)

print("df_immunizations")
immunizations_schema = StructType([
    StructField("CODE", IntegerType(), True),
    StructField("DESCRIPTION", StringType(), True)
])
df_immunizations = spark.read.csv("/FileStore/tables/immunizations_assignment_1.csv",
                    immunizations_schema, 
                    header=True)
df_immunizations.show(2)

print("df_medications")
medications_schema = StructType([
    StructField("CODE", IntegerType(), True),
    StructField("DESCRIPTION", StringType(), True)
])
df_medications = spark.read.csv("/FileStore/tables/medications_assignment_1.csv",
                    schema=medications_schema, 
                    header=True)
df_medications.show(2)


print("df_observation")
observations_schema = StructType([
    StructField("CODE", StringType(), True),
    StructField("DESCRIPTION", StringType(), True)
])
df_observations = spark.read.csv("/FileStore/tables/observations_assignment_1.csv",
                    schema=observations_schema, 
                    header=False)
df_observations.show(2)


print("df_encounters")
encounters_schema = StructType([
    StructField("PATIENT", StringType(), True),
    StructField("START", TimestampType(), True),
    StructField("CODE", StringType(), True)
])
df_encounters = spark.read.csv("/FileStore/tables/encounters_assignment_1.csv",
                    schema=encounters_schema, 
                    header=True)
df_encounters.show(2)


df_encounter_types
+---------+--------------------+
|     CODE|         DESCRIPTION|
+---------+--------------------+
|  1505002|Hospital admissio...|
|162673000|General examinati...|
+---------+--------------------+
only showing top 2 rows

df_immunizations
+----+--------------------+
|CODE|         DESCRIPTION|
+----+--------------------+
|  10|                 IPV|
| 113|Td (adult) preser...|
+----+--------------------+
only showing top 2 rows

df_medications
+-------+--------------------+
|   CODE|         DESCRIPTION|
+-------+--------------------+
|1000126|1 ML medroxyPROGE...|
|1000126|1 ML medroxyproge...|
+-------+--------------------+
only showing top 2 rows

df_observation
+-------+--------------------+
|   CODE|         DESCRIPTION|
+-------+--------------------+
|10230-1|Left ventricular ...|
|10480-2|Estrogen+Progeste...|
+-------+--------------------+
only showing top 2 rows

df_encounters
+--------------------+-------------------+---------+
|             PATIENT|       

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
import json

spark = SparkSession.builder.appName("medical_notes").getOrCreate()

#convert dataframes into dicts

immunizations_map = {row['DESCRIPTION']: row['CODE'] for row in df_immunizations.collect()}

medications_map = {row['DESCRIPTION'].lower(): row['CODE'] for row in df_medications.collect()}

observations_map = {row['DESCRIPTION'].lower(): row['CODE'] for row in df_observations.collect()}

encounter_types_map = {row['DESCRIPTION'].lower(): row['CODE'] for row in df_encounter_types.collect()}


In [0]:
%env OPENAI_API_KEY=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


env: OPENAI_API_KEY=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


In [0]:
# if all_records:
#     rdd = spark.sparkContext.parallelize([json.dumps(rec) for rec in all_records])
#     df = spark.read.json(rdd)
#     df.show(truncate=False)
#     df.printSchema()

#     # Write to Parquet
#     output_path = "dbfs:/FileStore/shared_uploads/ktorres2@hawaii.edu/patient_records.parquet"
#     df.write.mode("overwrite").parquet(output_path)
#     print(f"Parquet file written to {output_path}")
# else:
#     print("No valid records were produced.")


#Faiss implementation

In [0]:
!pip install faiss-cpu
!pip install sentence_transformers

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m
[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
## Add all imports here
# from pyspark.sql.types import *
# from pyspark.sql import functions as F
# from pyspark.sql.window import Window
# from pyspark.ml.functions import vector_to_array
# from pyspark.ml.feature import VectorAssembler
# from pyspark.ml.feature import MinMaxScaler
import numpy as np
import pandas as pd
import faiss
import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType, DoubleType
from sentence_transformers import SentenceTransformer

In [0]:
model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')

@F.pandas_udf(returnType=ArrayType(DoubleType()))
def encode(x: pd.Series) -> pd.Series:
    return pd.Series(model.encode(x).tolist())

# apply udf and show 
medications_df_embedding = df_medications.withColumn("embedding", encode("DESCRIPTION"))
medications_df_embedding.show()

medications_pd = medications_df_embedding.toPandas()

series_vectors = np.array(medications_pd['embedding'].tolist()).astype('float32')
index = faiss.IndexFlatIP(384)
embeddings = np.array(series_vectors, dtype=np.float32)
faiss.normalize_L2(embeddings)
index.add(embeddings)

+-------+--------------------+--------------------+
|   CODE|         DESCRIPTION|           embedding|
+-------+--------------------+--------------------+
|1000126|1 ML medroxyPROGE...|[0.01742400042712...|
|1000126|1 ML medroxyproge...|[0.01742400042712...|
|1014676|cetirizine hydroc...|[-0.0386663228273...|
|1014678|cetirizine hydroc...|[-0.0455816313624...|
|1043400|Acetaminophen 21....|[0.04618706926703...|
|1049221|Acetaminophen 325...|[-5.4820568766444...|
|1049221|Acetaminophen 325...|[-5.4820568766444...|
|1049630|diphenhydrAMINE H...|[-0.0313987918198...|
|1049635|Acetaminophen 325...|[0.00547732040286...|
| 105078|Penicillin G 375 ...|[-0.0210110694169...|
| 105585|Methotrexate 2.5 ...|[0.03505192324519...|
| 106258|Hydrocortisone 10...|[0.02938731014728...|
| 106892|insulin human  is...|[-0.0179619360715...|
|1091392|Methylphenidate H...|[0.02765876241028...|
|1094107|Phenazopyridine h...|[-0.0226141549646...|
|1100184|Donepezil hydroch...|[-0.0243072416633...|
|1114085|100

In [0]:
def retrieve_faiss_similarity_lists(medication_desc):
  search_vector = model.encode(medication_desc)
  _vector = np.array([search_vector])
  faiss.normalize_L2(_vector)
  k = index.ntotal
  return index.search(_vector, k=k)

# Create FAISS Med mapping

In [0]:
model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')

@F.pandas_udf(returnType=ArrayType(DoubleType()))
def encode(x: pd.Series) -> pd.Series:
    return pd.Series(model.encode(x).tolist())

# apply udf and show 
medications_df_embedding = df_medications.withColumn("embedding", encode("DESCRIPTION"))
medications_df_embedding.show()

+-------+--------------------+--------------------+
|   CODE|         DESCRIPTION|           embedding|
+-------+--------------------+--------------------+
|1000126|1 ML medroxyPROGE...|[0.01742400042712...|
|1000126|1 ML medroxyproge...|[0.01742400042712...|
|1014676|cetirizine hydroc...|[-0.0386663228273...|
|1014678|cetirizine hydroc...|[-0.0455816313624...|
|1043400|Acetaminophen 21....|[0.04618706926703...|
|1049221|Acetaminophen 325...|[-5.4820568766444...|
|1049221|Acetaminophen 325...|[-5.4820568766444...|
|1049630|diphenhydrAMINE H...|[-0.0313987918198...|
|1049635|Acetaminophen 325...|[0.00547732040286...|
| 105078|Penicillin G 375 ...|[-0.0210110694169...|
| 105585|Methotrexate 2.5 ...|[0.03505192324519...|
| 106258|Hydrocortisone 10...|[0.02938731014728...|
| 106892|insulin human  is...|[-0.0179619360715...|
|1091392|Methylphenidate H...|[0.02765876241028...|
|1094107|Phenazopyridine h...|[-0.0226141549646...|
|1100184|Donepezil hydroch...|[-0.0243072416633...|
|1114085|100

In [0]:
medications_pd = medications_df_embedding.toPandas()

series_vectors = np.array(medications_pd['embedding'].tolist()).astype('float32')
index = faiss.IndexFlatIP(384)
embeddings = np.array(series_vectors, dtype=np.float32)
faiss.normalize_L2(embeddings)
index.add(embeddings)

# Combine

In [0]:
import json
import tempfile
from typing import List, Optional
from pydantic import BaseModel, Field
from langchain_openai import ChatOpenAI
from pyspark.sql import SparkSession
from datetime import date

########################################
# Define the PatientRecord Schema
########################################

class EncounterType(BaseModel):
    code: str
    description: str

class Encounter(BaseModel):
    date: str
    time: str
    provider_id: str
    facility_id: str

class Address(BaseModel):
    city: str
    state: str

class Demographics(BaseModel):
    name: str
    date_of_birth: str
    age: int
    gender: str
    address: Address
    insurance: str

class Condition(BaseModel):
    code: str
    description: str

class Medication(BaseModel):
    code: str
    description: str

class Immunization(BaseModel):
    code: str
    description: str
    date: str

class VitalMeasurement(BaseModel):
    code: str
    value: float
    unit: str

class BloodPressure(BaseModel):
    systolic: Optional[VitalMeasurement]
    diastolic: Optional[VitalMeasurement]

class CurrentVitals(BaseModel):
    temperature: Optional[VitalMeasurement] = None
    heart_rate: Optional[VitalMeasurement] = None
    blood_pressure: Optional[BloodPressure] = None
    respiratory_rate: Optional[VitalMeasurement] = None
    oxygen_saturation: Optional[VitalMeasurement] = None
    weight: Optional[VitalMeasurement] = None

class BaselineVitals(BaseModel):
    date: str
    height: Optional[VitalMeasurement] = None
    weight: Optional[VitalMeasurement] = None
    bmi: Optional[VitalMeasurement] = None
    bmi_percentile: Optional[VitalMeasurement] = None

class Vitals(BaseModel):
    current: Optional[CurrentVitals]
    baseline: Optional[BaselineVitals]

class RespiratoryTest(BaseModel):
    code: str
    result: str

class RespiratoryPanel(BaseModel):
    influenza_a: Optional[RespiratoryTest] = None
    influenza_b: Optional[RespiratoryTest] = None
    rsv: Optional[RespiratoryTest] = None
    parainfluenza_1: Optional[RespiratoryTest] = None
    parainfluenza_2: Optional[RespiratoryTest] = None
    parainfluenza_3: Optional[RespiratoryTest] = None
    rhinovirus: Optional[RespiratoryTest] = None
    metapneumovirus: Optional[RespiratoryTest] = None
    adenovirus: Optional[RespiratoryTest] = None

class Covid19Test(BaseModel):
    code: str
    description: str
    result: str

class Laboratory(BaseModel):
    covid19: Optional[Covid19Test] = None
    respiratory_panel: Optional[RespiratoryPanel] = None

class Procedure(BaseModel):
    code: str
    description: str
    date: str
    reasonCode: str
    reasonDescription: str

class CarePlan(BaseModel):
    code: str
    description: str
    start: str
    stop: str
    reasonCode: str
    reasonDescription: str

class PatientRecord(BaseModel):
    demographics: Demographics
    encounter: Optional[Encounter] = None
    conditions: Optional[List[Condition]] = None
    medications: Optional[List[Medication]] = None
    immunizations: Optional[List[Immunization]] = None
    vitals: Optional[Vitals] = None
    laboratory: Optional[Laboratory] = None
    procedures: Optional[List[Procedure]] = None

########################################
# Paths to Encounter Notes
########################################

note_files = [
    "/FileStore/tables/055ae6fc_7e18_4a39_8058_64082ca6d515.txt", 
    "/FileStore/tables/199c586f_af16_4091_9998_ee4cfc02ee7a.txt",
    "/FileStore/tables/353016ea_a0ff_4154_85bb_1cf8b6cedf20.txt",
    "/FileStore/tables/ae9efba3_ddc4_43f9_a781_f72019388548.txt",
    "/FileStore/tables/28658715_b770_4576_9a81_fbb2282a98ea.txt",
    "/FileStore/tables/b9fd2dd8_181b_494b_ab15_e9f286d668d9.txt",
    "/FileStore/tables/d22592ac_552f_4ecd_a63d_7663d77ce9ba.txt",
    "/FileStore/tables/f0f3bc8d_ef38_49ce_a2bd_dfdda982b271.txt",
    "/FileStore/tables/df6b563d_1ff4_4833_9af8_84431e641e9c.txt",
    "/FileStore/tables/f73d6f41_0091_4485_8b43_9d38eb98fb36.txt"
]

########################################
# Use LLM to Extract Structured Data
########################################

spark = SparkSession.builder.appName("patient_record_conversion").getOrCreate()

llm = ChatOpenAI(model="gpt-4o", temperature=0)

all_records = []

print("Starting processing of patient records...")

for i, file_path in enumerate(note_files, start=1):
    print(f"Processing file {i}/{len(note_files)}: {file_path}")

    # Copy file from DBFS to local tmp
    print("Copying file from DBFS to local /tmp/ directory...")
    dbutils.fs.cp(file_path, file_path.replace("/FileStore/tables/", "file:/tmp/"))
    file_location = file_path.replace("/FileStore/tables/", "/tmp/")
    local_path = file_location

    print("Reading the encounter note file...")
    with open(local_path, "r") as f:
        note_text = f.read()

    print("Preparing the prompt for the LLM...")
    prompt = f"""
    You are a helpful medical data assistant. Given the patient note below, extract the relevant data and produce a JSON response that matches this Pydantic schema:

    {json.dumps(PatientRecord.model_json_schema(), indent=2)}

    Fill in as much as possible based on the note. Use 'Not Available' if not found. Please output only valid JSON and nothing else. Do not include any triple backticks or code fences. Return only raw JSON starting and ending with '{' and '}'.


    Patient note:
    {note_text}
    """

    print("Invoking LLM for extraction...")
    response = llm.invoke(prompt)

    print("LLM response received. Printing response content for debugging:")
    print(response.content)

    # Validate response
    print("Validating JSON response against schema...")
    try:
        json_str = response.content.strip()
        # Remove code fences if any remain
        if json_str.startswith("```"):
            json_str = json_str.strip("```").strip()

        # converts to json
        data = json.loads(json_str)
        patient_record = PatientRecord(**data)

        # Apply code mapping
        # Map medications
        if patient_record.medications:
            print('PATIENT MED')
            print(patient_record.medications)
            for med in patient_record.medications:
                distances, ann = retrieve_faiss_similarity_lists(med.description.strip().lower())
                results = pd.DataFrame({'distances': distances[0], 'ann': ann[0]})
                merge = pd.merge(results, medications_pd, left_on='ann', right_index=True)
                print("CODE")
                print(merge.iloc[0].CODE)
                most_similar_code = merge.head(1)
                med.code = str(merge.iloc[0].CODE)

        print(patient_record)
        all_records.append(patient_record.model_dump())
        print("Validation successful. Record appended.")
    except Exception as e:
        print(f"Validation error for {file_path}: {e}")
        continue

# print("All files processed. Preparing to create DataFrame...")

if all_records:
    rdd = spark.sparkContext.parallelize([json.dumps(rec) for rec in all_records])
    df = spark.read.json(rdd)

    # print("Showing DataFrame preview:")
    df.show(truncate=False)
    df.printSchema()

    # Uncomment if you want to write the DataFrame to Parquet
    # output_path = "dbfs:/FileStore/shared_uploads/ktorres2@hawaii.edu/patient_records.parquet"
    # print("Writing to Parquet...")
    # df.write.mode("overwrite").parquet(output_path)
    # print(f"Parquet file written to {output_path}")
else:
    print("No valid records were produced.")

print("Processing complete.")


Starting processing of patient records...
Invoking LLM for extraction...
Validating JSON response against schema...
demographics=Demographics(name='Jeffrey Greenfelder', date_of_birth='2005-01-16', age=15, gender='Male', address=Address(city='Springfield', state='Massachusetts'), insurance='Guardian') encounter=Encounter(date='2020-03-02', time='15:45', provider_id='e2c226c2-3e1e-3d0b-b997-ce9544c10528', facility_id='5103c940-0c08-392f-95cd-446e0cea042a') conditions=[Condition(code='Not Available', description='Obesity'), Condition(code='Not Available', description='Suspected COVID-19 with severe symptoms'), Condition(code='Not Available', description='Severe hypoxemia'), Condition(code='Not Available', description='Tachycardia'), Condition(code='Not Available', description='High-grade fever')] medications=None immunizations=[Immunization(code='Not Available', description='Influenza vaccine', date='2020-02-23')] vitals=Vitals(current=CurrentVitals(temperature=VitalMeasurement(code='Not

In [0]:
if all_records:
    rdd = spark.sparkContext.parallelize([json.dumps(rec) for rec in all_records])
    df = spark.read.json(rdd)

    # print("Showing DataFrame preview:")
    df.show(truncate=False)
    df.printSchema()

    # Uncomment if you want to write the DataFrame to Parquet
    output_path = "dbfs:/FileStore/shared_uploads/bdd@hawaii.edu/patient_records.parquet"
    print("Writing to Parquet...")
    df.write.mode("overwrite").parquet(output_path)
    print(f"Parquet file written to {output_path}")
else:
    print("No valid records were produced.")

print("Processing complete.")

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
files = dbutils.fs.ls("dbfs:/FileStore/shared_uploads/bdd@hawaii.edu/patient_records.parquet/")

# Download all files from the directory
for file in files:
    print(file.path)  # List all file paths to verify contents

dbfs:/FileStore/my-stuff/patient_records.parquet/_SUCCESS
dbfs:/FileStore/my-stuff/patient_records.parquet/_committed_378025825616281375
dbfs:/FileStore/my-stuff/patient_records.parquet/_committed_6191849953683112650
dbfs:/FileStore/my-stuff/patient_records.parquet/_started_244099851904688369
dbfs:/FileStore/my-stuff/patient_records.parquet/_started_378025825616281375
dbfs:/FileStore/my-stuff/patient_records.parquet/_started_4774919951855267738
dbfs:/FileStore/my-stuff/patient_records.parquet/_started_6191849953683112650
dbfs:/FileStore/my-stuff/patient_records.parquet/part-00000-tid-6191849953683112650-98628188-4aa5-4dad-96a0-d8bf38c59c14-245-1-c000.snappy.parquet
dbfs:/FileStore/my-stuff/patient_records.parquet/part-00001-tid-6191849953683112650-98628188-4aa5-4dad-96a0-d8bf38c59c14-246-1-c000.snappy.parquet
dbfs:/FileStore/my-stuff/patient_records.parquet/part-00002-tid-6191849953683112650-98628188-4aa5-4dad-96a0-d8bf38c59c14-247-1-c000.snappy.parquet
dbfs:/FileStore/my-stuff/patient