## Final Assignment Overview: Working with Patient Records and Encounter Notes

In this final assignment, we’ll focus on patient records related to COVID-19 encounters. Our task is to analyze, process, and transform the data while applying the concepts we’ve covered throughout this course. Here's a detailed breakdown of the assignment:

What Are Encounter Notes?
An encounter note is a record that captures details about a patient’s visit with a doctor. It includes both structured and semi-structured information that is crucial for understanding the context of the visit. Here’s what an encounter note typically looks like:

```
AMBULATORY ENCOUNTER NOTE
Date of Service: March 2, 2020 15:45-16:30

DEMOGRAPHICS:
Name: Jeffrey Greenfelder
DOB: 1/16/2005
Gender: Male
Address: 428 Wiza Glen Unit 91, Springfield, Massachusetts 01104
Insurance: Guardian
MRN: 055ae6fc-7e18-4a39-8058-64082ca6d515

PERTINENT MEDICAL HISTORY:
- Obesity 

Recent Visit: Well child visit (2/23/2020)
Immunizations: Influenza vaccine (2/23/2020)

Recent Baseline (2/23/2020):
Height: 155.0 cm
Weight: 81.2 kg
BMI: 33.8 kg/m² (99.1th percentile)
BP: 123/80 mmHg
HR: 92/min
RR: 13/min

SUBJECTIVE:
Adolescent patient presents with multiple symptoms including:
- Cough
- Sore throat
- Severe fatigue
- Muscle pain
- Joint pain
- Fever
Never smoker. Symptoms began recently.

OBJECTIVE:
Vitals:
Temperature: 39.3°C (102.7°F)
Heart Rate: 131.1/min
Blood Pressure: 120/73 mmHg
Respiratory Rate: 27.6/min
O2 Saturation: 75.8% on room air
Weight: 81.2 kg

Laboratory/Testing:
Comprehensive Respiratory Panel:
- Influenza A RNA: Negative
- Influenza B RNA: Negative
- RSV RNA: Negative
- Parainfluenza virus 1,2,3 RNA: Negative
- Rhinovirus RNA: Negative
- Human metapneumovirus RNA: Negative
- Adenovirus DNA: Negative
- SARS-CoV-2 RNA: Positive

ASSESSMENT:
1. Suspected COVID-19 with severe symptoms
2. Severe hypoxemia requiring immediate intervention
3. Tachycardia (HR 131)
4. High-grade fever
5. Risk factors:
   - Obesity (BMI 33.8)
   - Adolescent age

PLAN:
1. Face mask provided for immediate oxygen support
2. Infectious disease care plan initiated
3. Close monitoring required due to:
   - Severe hypoxemia
   - Tachycardia
   - Age and obesity risk factors
4. Parent/patient education on:
   - Home isolation protocols
   - Warning signs requiring emergency care
   - Return precautions
5. Follow-up plan:
   - Daily monitoring during acute phase
   - Virtual check-ins as needed

Encounter Duration: 45 minutes
Encounter Type: Ambulatory
Provider: ID# e2c226c2-3e1e-3d0b-b997-ce9544c10528
Facility: 5103c940-0c08-392f-95cd-446e0cea042a
```


The enocuter contains

* General encounter information: 

  * When the encounter took place: Date and time of the visit.
  * Demographics: Patient’s age, gender, and unique medical record identifier.
  * Encounter details: The reason for the visit, diagnosis, and any associated costs.


* Semi-Structured Notes:

These notes mirror how doctors organize their thoughts and observations during an encounter. They generally follow a SOAP format:

* Subjective: The patient’s subjective description of their symptoms, feelings, and medical concerns.
* Objective: The doctor’s objective findings, including test results, measurements, or physical examination outcomes.
* Assessment: The doctor’s evaluation or diagnosis based on subjective and objective information.
* Plan: The proposed treatment plan, including medications, follow-ups, or other interventions.

While some encounter notes might include additional details, the majority conform to this semi-structured format, making them ideal for analysis and transformation.

* Goals for the Assignment

1. Transforming Encounter Notes:

Using an LLM to convert semi-structured encounter notes into a JSON format that organizes the information into structured fields. The JSON will include details such as demographics, encounter specifics, and the SOAP components of the note. Subsequently, you will need to transform the JSON data into a Parquet file, which is not only suitable for analysis in Spark but also ideal for storage later.
Here we will use the ML classificaition to assing the objective and assessment semi-structured fields into standardized, structured fields. The medical taxonomy for this task will be the one provided by the CDC, which defines standard codes for diagnoses, symptoms, procedures, and treatments. This step ensures the structured data aligns with domain-wide medical standards, making it interoperable and ready for deeper analysis.

The JSON format should capture the hierachies described in the structure below. 




2. Basic Analytics and Visualizations:
Using Apache Spark, perform comprehensive data analysis on the encounter data and create visualizations that reveal meaningful patterns. Your analysis must include:
- COVID-19 Case Demographics: Case breakdown by age ranges ([0-5], [6-10], [11-17], [18-30], [31-50], [51-70], [71+])
- Cumulative case count of Covid between the earliest case observed in the dataset and last case observed
- Symptoms for all COVID-19 patients versus patients that admitted into the intensive care unit due to COVID.
- Rank medications by frequency of prescription
- Analyze medication patterns across different demographic groups (e.g., top 3 per age group)
- Identify and plot co-morbidity information from the patient records (e.g., hypertension, obesity, prediabetes, etc.) provided in the dataset. 
- An independent group analysis: You need to develop and execute THREE original analyses that provide meaningful insights about COVID-19 patterns in this dataset. For each analysis:
  - Clearly state your analytical question/hypothesis
  - Justify why this analysis is valuable
  - Show your Spark code and methodology
  - Present results with appropriate visualizations


In [0]:
EncounterType:
    code
    description

Encounter:
    id
    date
    time
    type: EncounterType
    provider_id
    facility_id

Address:
    city
    state

Demographics:
    id
    name
    date_of_birth
    age
    gender
    address: Address
    insurance

Condition:
    code
    description

Medication:
    code
    description

Immunization:
    code
    description
    date: date

VitalMeasurement:
    code
    value: float
    unit

BloodPressure:
    systolic: VitalMeasurement
    diastolic: VitalMeasurement

CurrentVitals:
    temperature: VitalMeasurement
    heart_rate: VitalMeasurement
    blood_pressure: BloodPressure
    respiratory_rate: VitalMeasurement
    oxygen_saturation: VitalMeasurement
    weight: VitalMeasurement

BaselineVitals:
    date: date
    height: VitalMeasurement
    weight: VitalMeasurement
    bmi: VitalMeasurement
    bmi_percentile: VitalMeasurement

Vitals:
    current: CurrentVitals
    baseline: BaselineVitals

RespiratoryTest:
    code
    result

RespiratoryPanel:
    influenza_a: RespiratoryTest
    influenza_b: RespiratoryTest
    rsv: RespiratoryTest
    parainfluenza_1: RespiratoryTest
    parainfluenza_2: RespiratoryTest
    parainfluenza_3: RespiratoryTest
    rhinovirus: RespiratoryTest
    metapneumovirus: RespiratoryTest
    adenovirus: RespiratoryTest

Covid19Test:
    code
    description
    result

Laboratory:
    covid19: Covid19Test
    respiratory_panel: RespiratoryPanel

Procedure:
    code
    description
    date: date
    reasonCode
    reasonDescription

CarePlan:
    id
    code
    description
    start: date
    stop: date
    reasonCode
    reasonDescription

PatientRecord:
    encounter: Encounter
    demographics: Demographics
    conditions: List[Condition]
    medications: List[Medication]
    immunizations: List[Immunization]
    vitals: Vitals
    laboratory: Laboratory
    procedures: List[Procedure]


In [0]:
#!pip install -U sentence-transformers
#!pip install faiss-cpu
#!pip install PyMuPDF Pillow numpy
#!pip install --upgrade typing_extensions
#!pip install sentence-transformers
#!pip install langchain
!pip install langchain_OpenAI
!pip install OpanAI
dbutils.library.restartPython()

Collecting langchain_OpenAI
  Downloading langchain_openai-0.2.12-py3-none-any.whl (50 kB)
[?25l[K     |██████▌                         | 10 kB 21.4 MB/s eta 0:00:01[K     |█████████████                   | 20 kB 20.8 MB/s eta 0:00:01[K     |███████████████████▍            | 30 kB 12.1 MB/s eta 0:00:01[K     |█████████████████████████▉      | 40 kB 5.5 MB/s eta 0:00:01[K     |████████████████████████████████| 50 kB 3.9 MB/s 
Collecting openai<2.0.0,>=1.55.3
  Downloading openai-1.58.1-py3-none-any.whl (454 kB)
[?25l[K     |▊                               | 10 kB 35.7 MB/s eta 0:00:01[K     |█▍                              | 20 kB 44.5 MB/s eta 0:00:01[K     |██▏                             | 30 kB 54.9 MB/s eta 0:00:01[K     |██▉                             | 40 kB 63.6 MB/s eta 0:00:01[K     |███▋                            | 51 kB 8.6 MB/s eta 0:00:01[K     |████▎                           | 61 kB 10.2 MB/s eta 0:00:01[K     |█████                      

1. Transforming Encounter Notes:

Using an LLM to convert semi-structured encounter notes into a JSON format that organizes the information into structured fields. The JSON will include details such as demographics, encounter specifics, and the SOAP components of the note. Subsequently, you will need to transform the JSON data into a Parquet file, which is not only suitable for analysis in Spark but also ideal for storage later. Here we will use the ML classificaition to assing the objective and assessment semi-structured fields into standardized, structured fields. The medical taxonomy for this task will be the one provided by the CDC, which defines standard codes for diagnoses, symptoms, procedures, and treatments. This step ensures the structured data aligns with domain-wide medical standards, making it interoperable and ready for deeper analysis.

In [0]:
import re
import json
import pandas as pd
from pyspark.sql import SparkSession
from sentence_transformers import SentenceTransformer
import numpy as np
import faiss
from langchain.output_parsers import PydanticOutputParser
from pydantic import BaseModel, Field
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
from typing import List, Optional, Dict, Any
from langchain.prompts import PromptTemplate
from langchain.output_parsers import PydanticOutputParser

spark = SparkSession.builder.appName("EncounterProcessing").getOrCreate()

file_paths = [
    "/FileStore/tables/055ae6fc_7e18_4a39_8058_64082ca6d515.txt",
    "/FileStore/tables/199c586f_af16_4091_9998_ee4cfc02ee7a.txt",
    "/FileStore/tables/199c586f_af16_4091_9998_ee4cfc02ee7a_txt_",
    "/FileStore/tables/353016ea_a0ff_4154_85bb_1cf8b6cedf20.txt",
    "/FileStore/tables/28658715_b770_4576_9a81_fbb2282a98ea.txt",
    "/FileStore/tables/ae9efba3_ddc4_43f9_a781_f72019388548.txt",
    "/FileStore/tables/b9fd2dd8_181b_494b_ab15_e9f286d668d9.txt",
    "/FileStore/tables/df6b563d_1ff4_4833_9af8_84431e641e9c.txt",
    "/FileStore/tables/d22592ac_552f_4ecd_a63d_7663d77ce9ba.txt",
    "/FileStore/tables/f0f3bc8d_ef38_49ce_a2bd_dfdda982b271.txt",
    "/FileStore/tables/f73d6f41_0091_4485_8b43_9d38eb98fb36.txt"
]

class EncounterTypeModel(BaseModel):
    code: Optional[str] = None
    description: Optional[str] = None

class EncounterModel(BaseModel):
    id: Optional[str] = None
    date: Optional[str] = None
    time: Optional[str] = None
    type: EncounterTypeModel = EncounterTypeModel()
    provider_id: Optional[str] = None
    facility_id: Optional[str] = None

class AddressModel(BaseModel):
    city: Optional[str] = None
    state: Optional[str] = None

class DemographicsModel(BaseModel):
    id: Optional[str] = None
    name: Optional[str] = None
    date_of_birth: Optional[str] = None
    age: Optional[int] = None
    gender: Optional[str] = None
    address: AddressModel = AddressModel()
    insurance: Optional[str] = None

class ConditionModel(BaseModel):
    code: Optional[str] = None
    description: Optional[str] = None

class MedicationModel(BaseModel):
    code: Optional[str] = None
    description: Optional[str] = None

class ImmunizationModel(BaseModel):
    code: Optional[str] = None
    description: Optional[str] = None
    date: Optional[str] = None

class VitalMeasurementModel(BaseModel):
    code: Optional[str] = None
    value: Optional[float] = None
    unit: Optional[str] = None

class BloodPressureModel(BaseModel):
    systolic: VitalMeasurementModel = VitalMeasurementModel()
    dystolic: VitalMeasurementModel = VitalMeasurementModel()

class CurrentVitalsModel(BaseModel):
    temperature: VitalMeasurementModel = VitalMeasurementModel()
    heart_rate: VitalMeasurementModel = VitalMeasurementModel()
    blood_pressure: BloodPressureModel = BloodPressureModel()
    respiratory_rate: VitalMeasurementModel = VitalMeasurementModel()
    oxygen_saturation: VitalMeasurementModel = VitalMeasurementModel()
    weight: VitalMeasurementModel = VitalMeasurementModel()

class BaselineVitalsModel(BaseModel):
    date: Optional[str] = None
    height: VitalMeasurementModel = VitalMeasurementModel()
    weight: VitalMeasurementModel = VitalMeasurementModel()
    bmi: VitalMeasurementModel = VitalMeasurementModel()
    bmi_percentile: VitalMeasurementModel = VitalMeasurementModel()

class VitalsModel(BaseModel):
    current: CurrentVitalsModel = CurrentVitalsModel()
    baseline: BaselineVitalsModel = BaselineVitalsModel()

class RespiratoryPanelResult(BaseModel):
    code: Optional[str] = None
    result: Optional[bool] = None

class Covid19TestModel(BaseModel):
    code: Optional[str] = None
    description: Optional[str] = None
    result: Optional[bool] = None

class RespiratoryPanelModel(BaseModel):
    influenza_a: RespiratoryPanelResult = RespiratoryPanelResult()
    influenza_b: RespiratoryPanelResult = RespiratoryPanelResult()
    rsv: RespiratoryPanelResult = RespiratoryPanelResult()
    parainfluenza_1: RespiratoryPanelResult = RespiratoryPanelResult()
    parainfluenza_2: RespiratoryPanelResult = RespiratoryPanelResult()
    parainfluenza_3: RespiratoryPanelResult = RespiratoryPanelResult()
    rhinovirus: RespiratoryPanelResult = RespiratoryPanelResult()
    metapneumovirus: RespiratoryPanelResult = RespiratoryPanelResult()
    adenovirus: RespiratoryPanelResult = RespiratoryPanelResult()

class LaboratoryModel(BaseModel):
    covid19: Covid19TestModel = Covid19TestModel()
    respiratory_panel: RespiratoryPanelModel = RespiratoryPanelModel()

class ProcedureModel(BaseModel):
    code: Optional[str] = None
    description: Optional[str] = None
    date: Optional[str] = None
    reasonCode: Optional[str] = None
    reasonDescription: Optional[str] = None

class PatientRecordModel(BaseModel):
    encounter: EncounterModel = EncounterModel()
    demographics: DemographicsModel = DemographicsModel()
    conditions: List[ConditionModel] = []
    medications: List[MedicationModel] = []
    immunizations: List[ImmunizationModel] = []
    vitals: VitalsModel = VitalsModel()
    laboratory: LaboratoryModel = LaboratoryModel()
    procedures: List[ProcedureModel] = []

# Main parsing function
def parse_encounters(note, noteName):
    patient_record_parser = PydanticOutputParser(pydantic_object=PatientRecordModel)
    #print(person_idetail_parser.get_format_instructions())

    prompt_prefix = """
    You are a medical information extractor. Given the medical note below, parse the following details and return them in the specified JSON format.
    Extract if available:
    - Encounter: id, date, time, type code/description, provider_id, facility_id
    - Demographics: id, name, date_of_birth, age, gender, address (city/state), insurance
    - Conditions: a list of conditions with code and description
    - Medications: a list with name, dosage, frequency
    - Immunizations: a list with name and date
    - Vitals: current vitals (temperature, heart_rate, blood_pressure with systolic and dystolic, respiratory_rate, oxygen_saturation, weight) 
    and baseline vitals (date, height, weight, bmi, bmi_percentile)
    - Laboratory: covid19 result, and a respiratory panel with results for influenza_a, influenza_b, rsv, parainfluenza_1/2/3, rhinovirus, metapneumovirus, adenovirus
    - Procedures: a list with code and description

    If a field is not mentioned, leave it as null or empty. Return only in the requested JSON structure.

    {what_person_said}

    {format_instructions}
    """

    model = ChatOpenAI(api_key="") # Add key here

    prompt = PromptTemplate(
        template=prompt_prefix,
        input_variables=["what_person_said"],
        partial_variables={"format_instructions": patient_record_parser.get_format_instructions()}
    )
    # print(prompt.format(what_person_said= "Hello there"))


    chain = prompt | model
    response = chain.invoke(note)
    #print(response.content)

    parsed_record = patient_record_parser.parse(response.content)

    PatientRecord = parsed_record.model_dump()
    #print((PatientRecord))

    PatientRecord["encounter"]["id"] = noteName[19: 54]

    return PatientRecord


parsed_data = []
for file_path in file_paths:
    file_df = spark.read.text(file_path)
    note = "\n".join(row["value"] for row in file_df.collect())
    parsed_data.append(parse_encounters(note, file_path))

with open("parsed_encounters.json", "w") as json_file:
    json.dump(parsed_data, json_file, indent=4)

parsed_df_orig = pd.json_normalize(parsed_data)
parsed_df_orig.to_parquet("encounter_notes.parquet", engine="pyarrow", index=False)

display(parsed_df_orig)


conditions,medications,immunizations,procedures,encounter.id,encounter.date,encounter.time,encounter.type.code,encounter.type.description,encounter.provider_id,encounter.facility_id,demographics.id,demographics.name,demographics.date_of_birth,demographics.age,demographics.gender,demographics.address.city,demographics.address.state,demographics.insurance,vitals.current.temperature.code,vitals.current.temperature.value,vitals.current.temperature.unit,vitals.current.heart_rate.code,vitals.current.heart_rate.value,vitals.current.heart_rate.unit,vitals.current.blood_pressure.systolic.code,vitals.current.blood_pressure.systolic.value,vitals.current.blood_pressure.systolic.unit,vitals.current.blood_pressure.dystolic.code,vitals.current.blood_pressure.dystolic.value,vitals.current.blood_pressure.dystolic.unit,vitals.current.respiratory_rate.code,vitals.current.respiratory_rate.value,vitals.current.respiratory_rate.unit,vitals.current.oxygen_saturation.code,vitals.current.oxygen_saturation.value,vitals.current.oxygen_saturation.unit,vitals.current.weight.code,vitals.current.weight.value,vitals.current.weight.unit,vitals.baseline.date,vitals.baseline.height.code,vitals.baseline.height.value,vitals.baseline.height.unit,vitals.baseline.weight.code,vitals.baseline.weight.value,vitals.baseline.weight.unit,vitals.baseline.bmi.code,vitals.baseline.bmi.value,vitals.baseline.bmi.unit,vitals.baseline.bmi_percentile.code,vitals.baseline.bmi_percentile.value,vitals.baseline.bmi_percentile.unit,laboratory.covid19.code,laboratory.covid19.description,laboratory.covid19.result,laboratory.respiratory_panel.influenza_a.code,laboratory.respiratory_panel.influenza_a.result,laboratory.respiratory_panel.influenza_b.code,laboratory.respiratory_panel.influenza_b.result,laboratory.respiratory_panel.rsv.code,laboratory.respiratory_panel.rsv.result,laboratory.respiratory_panel.parainfluenza_1.code,laboratory.respiratory_panel.parainfluenza_1.result,laboratory.respiratory_panel.parainfluenza_2.code,laboratory.respiratory_panel.parainfluenza_2.result,laboratory.respiratory_panel.parainfluenza_3.code,laboratory.respiratory_panel.parainfluenza_3.result,laboratory.respiratory_panel.rhinovirus.code,laboratory.respiratory_panel.rhinovirus.result,laboratory.respiratory_panel.metapneumovirus.code,laboratory.respiratory_panel.metapneumovirus.result,laboratory.respiratory_panel.adenovirus.code,laboratory.respiratory_panel.adenovirus.result
"List(List(null, Obesity))",List(),"List(List(null, 2/23/2020, Influenza vaccine))",List(),55ae6fc_7e18_4a39_8058_64082ca6d515,"March 2, 2020",15:45-16:30,,Well child visit,e2c226c2-3e1e-3d0b-b997-ce9544c10528,5103c940-0c08-392f-95cd-446e0cea042a,055ae6fc-7e18-4a39-8058-64082ca6d515,Jeffrey Greenfelder,1/16/2005,15,Male,Springfield,Massachusetts,Guardian,,39.3,°C,,131.1,/min,,120.0,mmHg,,73.0,mmHg,,27.6,/min,,75.8,%,,81.2,kg,2/23/2020,,155.0,cm,,81.2,kg,,33.8,kg/m²,,99.1,th percentile,,,False,,False,,False,,False,,False,,False,,False,,False,,False,,False
List(),"List(List(null, Jolivette (oral contraceptive)))","List(List(null, 2/21/2020, Influenza vaccine), List(null, 2/21/2020, Meningococcal vaccine))",List(),99c586f_af16_4091_9998_ee4cfc02ee7a,"March 2, 2020",04:15-05:15,Ambulatory/Urgent Care,Urgent Care,8be741d6-44d0-3412-88dd-4987e1559f0b,b1ddf812-1fdd-3adf-b1d5-32cc8bd07ebb,199c586f-af16-4091-9998-ee4cfc02ee7a,Jimmie Harris,1/9/2004,16,Female,Pembroke,MA,Medicare/Medicaid,,40.7,°C,,98.0,/min,,120.0,mmHg,,89.0,mmHg,,22.0,/min,,78.2,%,,45.3,kg,2/21/2020,,149.3,cm,,45.3,kg,,20.3,kg/m2,,47.6,%,,,True,,False,,False,,False,,False,,False,,False,,False,,False,,False
List(),"List(List(null, Jolivette (oral contraceptive)))","List(List(null, 2/21/2020, Influenza vaccine), List(null, 2/21/2020, Meningococcal vaccine))",List(),99c586f_af16_4091_9998_ee4cfc02ee7a,"March 2, 2020",04:15-05:15,Ambulatory/Urgent Care,Ambulatory/Urgent Care,8be741d6-44d0-3412-88dd-4987e1559f0b,b1ddf812-1fdd-3adf-b1d5-32cc8bd07ebb,199c586f-af16-4091-9998-ee4cfc02ee7a,Jimmie Harris,1/9/2004,16,Female,Pembroke,MA,Medicare/Medicaid,,40.7,C,,98.0,/min,,120.0,mmHg,,89.0,mmHg,,22.0,/min,,78.2,%,,45.3,kg,2/21/2020,,149.3,cm,,45.3,kg,,20.3,kg/m2,,47.6,th percentile,,,True,,False,,False,,False,,False,,False,,False,,False,,False,,False
"List(List(null, Confirmed COVID-19 infection with severe symptoms), List(null, Severe hypoxemia requiring immediate intervention), List(null, Significant bradycardia requiring monitoring), List(null, Upper respiratory symptoms))",List(),List(),List(),53016ea_a0ff_4154_85bb_1cf8b6cedf20,"March 3, 2020",00:58-02:19,Ambulatory/Urgent Care,,6ba172e8-ae6d-3c6c-9f49-38096f9d1e1e,d78e84ec-30aa-3bba-a33a-f29a3a454662,353016ea-a0ff-4154-85bb-1cf8b6cedf20,Gregorio Auer,11/15/1996,23,Male,Boston,MA,Medicare/Medicaid,,38.8,°C,,51.0,/min,,116.0,mmHg,,80.0,mmHg,,19.0,/min,,77.6,%,,91.2,kg,,,,,,,,,,,,,,,,True,,False,,False,,,,,,,,,,,,,,
"List(List(TBI, Brain damage - traumatic), List(MISCARRIAGE, History of miscarriage in first trimester), List(PREGNANT, Currently pregnant))",List(),"List(List(INFLUENZA, 10/9/2019, Influenza vaccine))",List(),8658715_b770_4576_9a81_fbb2282a98ea,"February 29, 2020",04:48-05:44,Ambulatory,Ambulatory Encounter,bc4a66b7-a2ba-3ad3-af08-2975489d8495,3bd5eda0-16da-3ba5-8500-4dfd6ae118b8,28658715-b770-4576-9a81-fbb2282a98ea,Mrs. Karyn217 Metz686,7/31/1991,28,Female,Medfield,Massachusetts,Guardian,,41.5,°C,,84.4,/min,,108.0,mmHg,,84.0,mmHg,,27.3,/min,,86.7,%,,74.0,kg,10/9/2019,,167.3,cm,,74.0,kg,,26.4,kg/m²,,,,,,True,,False,,False,,,,,,,,,,,,,,
"List(List(null, Hypertension (since 2010) - on combination therapy), List(null, Recent viral sinusitis (11/2019)))","List(List(null, amLODIPine/Hydrochlorothiazide/Olmesartan (5/12.5/20 mg) daily))","List(List(null, 10/15/2019, Influenza vaccine))",List(),e9efba3_ddc4_43f9_a781_f72019388548,"March 11, 2020",10:51-11:42,Ambulatory/Urgent Care,Ambulatory/Urgent Care,f59948a6-b07a-3248-9661-60f5924b644b,5d4b9df1-93ae-3bc9-b680-03249990e558,ae9efba3-ddc4-43f9-a781-f72019388548,Jayson Fadel,6/30/1992,27,Male,Chicopee,MA,Guardian,,41.1,C,,187.0,/min,,187.0,mmHg,,96.0,mmHg,,33.0,/min,,88.7,%,,68.2,kg,10/15/2019,,165.7,cm,,68.2,kg,,24.8,kg/m2,,,,,COVID-19 PCR,True,,False,,False,,False,,False,,False,,False,,False,,False,,False
"List(List(null, Confirmed COVID-19 infection), List(null, Acute hypoxemia), List(null, Multiple associated symptoms including fever, myalgia, arthralgia, and ageusia))",List(),List(),List(),9fd2dd8_181b_494b_ab15_e9f286d668d9,"February 25, 2020",15:39-16:31,ambulatory,Ambulatory,046c4fac-2680-391a-ba36-eeaec6eed0c5,e002090d-4e92-300e-b41e-7d1f21dee4c6,b9fd2dd8-181b-494b-ab15-e9f286d668d9,Mr. Milo271 Feil794,12/12/1983,36,Male,Somerville,Massachusetts,Guardian,,39.9,°C,,82.1,/min,,122.0,mmHg,,80.0,mmHg,,31.5,/min,,89.0,%,,112.7,kg,,,,,,,,,,,,,,,,True,,False,,False,,,,,,,,,,,,,,
"List(List(null, Pulmonary emphysema), List(null, Hypertension))","List(List(null, Hydrochlorothiazide 12.5 MG daily), List(null, Fluticasone/Salmeterol 250/50 mcg inhaler BID))","List(List(null, 3/11/2020, Influenza vaccine))",List(),f6b563d_1ff4_4833_9af8_84431e641e9c,"March 13, 2020",16:12-17:11,,Ambulatory/Urgent Care,d76028b8-d23f-346e-abd0-65995aec66c8,,df6b563d-1ff4-4833-9af8-84431e641e9c,Ms. Brown,9/29/1982,37,Female,Boston,MA,Medicare/Medicaid,,40.6,°C,,179.0,/min,,106.0,mmHg,,78.0,mmHg,,24.0,/min,,83.6,%,,59.9,kg,,,,,,,,,,,,,,,,True,,False,,False,,False,,False,,False,,False,,False,,False,,False
"List(List(null, Obesity), List(null, Prediabetes))","List(List(null, Meperidine Hydrochloride 50 MG), List(null, Acetaminophen 325 MG))",List(),"List(List(null, 9/13/2019, Clavicle X-ray, null, null), List(null, 9/13/2019, Admission to orthopedic department, null, null))",22592ac_552f_4ecd_a63d_7663d77ce9ba,"March 7, 2020",12:56-14:13,Ambulatory,Ambulatory Encounter,f59948a6-b07a-3248-9661-60f5924b644b,5d4b9df1-93ae-3bc9-b680-03249990e558,d22592ac-552f-4ecd-a63d-7663d77ce9ba,Mr. José Eduardo181 Gómez206,6/22/1989,30,Male,Chicopee,Massachusetts,Guardian,,42.0,°C,,87.9,/min,,113.0,mmHg,,83.0,mmHg,,20.9,/min,,83.5,%,,94.7,kg,,,,,,,,,,,,,,,,True,,False,,False,,,,,,,,,,,,,,
List(),List(),"List(List(null, 8/1/2019, Influenza vaccine), List(null, 1/30/2020, Hepatitis A vaccine))",List(),0f3bc8d_ef38_49ce_a2bd_dfdda982b271,"March 2, 2020",01:02-01:58,Ambulatory/Urgent Care,Ambulatory/Urgent Care,9c875a09-93e0-39aa-9260-ad264bbdd3fe,fd328395-ab1d-35c6-a2d0-d05a9a79cf11,f0f3bc8d-ef38-49ce-a2bd-dfdda982b271,Jacinto Kris,8/24/2017,2,Male,Springfield,MA,Self-Pay,,39.7,°C,,164.0,/min,,130.0,mmHg,,75.0,mmHg,,33.0,/min,,86.2,%,,13.5,kg,1/30/2020,,86.3,cm,,13.4,kg,,18.0,kg/m2,,88.0,percentile,,,True,,False,,False,,False,,False,,False,,False,,False,,False,,False


2. Basic Analytics and Visualizations:
Using Apache Spark, perform comprehensive data analysis on the encounter data and create visualizations that reveal meaningful patterns. Your analysis must include:
- COVID-19 Case Demographics: Case breakdown by age ranges ([0-5], [6-10], [11-17], [18-30], [31-50], [51-70], [71+])
- Cumulative case count of Covid between the earliest case observed in the dataset and last case observed
- ~Symptoms for all COVID-19 patients versus patients that admitted into the intensive care unit due to COVID.~
- Rank medications by frequency of prescription
- Analyze medication patterns across different demographic groups (e.g., top 3 per age group)
- Identify and plot co-morbidity information from the patient records (e.g., hypertension, obesity, prediabetes, etc.) provided in the dataset. 
- An independent group analysis: You need to develop and execute THREE original analyses that provide meaningful insights about COVID-19 patterns in this dataset. For each analysis:
  - Clearly state your analytical question/hypothesis
  - Justify why this analysis is valuable
  - Show your Spark code and methodology
  - Present results with appropriate visualizations

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

age_groups = [
    ('0-5', (0, 5)),
    ('6-10', (6, 10)),
    ('11-17', (11, 17)),
    ('18-30', (18, 30)),
    ('31-50', (31, 50)),
    ('51-70', (51, 70)),
    ('71+', (71, float('inf')))
]

def categorize_age(age):
    for label, (min_age, max_age) in age_groups:
        if min_age <= age <= max_age:
            return label
    return 'Unknown'

categorize_age_udf = udf(categorize_age, StringType())
parsed_df = spark.createDataFrame(parsed_df_orig)
parsed_df_with_age_group = parsed_df.withColumn('age_group', categorize_age_udf(parsed_df['demographics.age']))
age_group_counts = parsed_df_with_age_group.groupBy("age_group").count().orderBy("count", ascending=False)

age_group_counts.show()


In [0]:
from pyspark.sql.functions import to_date, col

parsed_df = spark.createDataFrame(parsed_df_orig)
parsed_df_with_date = parsed_df.withColumn('date', to_date(parsed_df['encounter.date'], 'yyyy-MM-dd'))
covid_cases_df = parsed_df_with_date.filter(parsed_df_with_date['laboratory.covid19.result'] == True)
covid_cases_df = covid_cases_df.orderBy('date')
covid_cases_df = covid_cases_df.withColumn('cumulative_cases', sum(covid_cases_df['count']).over(Window.orderBy('date')))

covid_cases_df.select('date', 'cumulative_cases').show()


In [0]:
from pyspark.sql.functions import explode

medications_df = parsed_df.select(explode(parsed_df['medications']).alias('medication'))
medication_counts = medications_df.groupBy("medication.description").count().orderBy('count', ascending=False)

medication_counts.show()

+--------------------+-----+
|         description|count|
+--------------------+-----+
|Jolivette (oral c...|    2|
|amLODIPine/Hydroc...|    1|
|Fluticasone/Salme...|    1|
|Hydrochlorothiazi...|    1|
|Meperidine Hydroc...|    1|
|Acetaminophen 325 MG|    1|
+--------------------+-----+



In [0]:
parsed_df_with_age_group = parsed_df.withColumn('age_group', categorize_age_udf(parsed_df['demographics.age']))
medications_by_age_df = parsed_df_with_age_group.select("age_group", explode(parsed_df['medications']).alias('medication'))
medication_by_age_counts = medications_by_age_df.groupBy("age_group", "medication.description").count()

medication_by_age_counts.show()

In [0]:
conditions_df = parsed_df.select(explode(parsed_df['conditions']).alias('condition'))
co_morbidities = ['hypertension', 'obesity', 'prediabetes']  # example conditions
co_morbidity_df = conditions_df.filter(conditions_df['condition.description'].isin(co_morbidities))
co_morbidity_counts = co_morbidity_df.groupBy('condition.description').count()

co_morbidity_counts.show()

+-----------+-----+
|description|count|
+-----------+-----+
+-----------+-----+

