## Final Assignment Overview: Working with Patient Records and Encounter Notes

In this final assignment, we’ll focus on patient records related to COVID-19 encounters. Our task is to analyze, process, and transform the data while applying the concepts we’ve covered throughout this course. Here's a detailed breakdown of the assignment:

### What Are Encounter Notes?
An encounter note is a record that captures details about a patient’s visit with a doctor. It includes both structured and semi-structured information that is crucial for understanding the context of the visit. Here’s what an encounter note typically looks like:

```
AMBULATORY ENCOUNTER NOTE
Date of Service: March 2, 2020 15:45-16:30

DEMOGRAPHICS:
Name: Jeffrey Greenfelder
DOB: 1/16/2005
Gender: Male
Address: 428 Wiza Glen Unit 91, Springfield, Massachusetts 01104
Insurance: Guardian
MRN: 055ae6fc-7e18-4a39-8058-64082ca6d515

PERTINENT MEDICAL HISTORY:
- Obesity 

Recent Visit: Well child visit (2/23/2020)
Immunizations: Influenza vaccine (2/23/2020)

Recent Baseline (2/23/2020):
Height: 155.0 cm
Weight: 81.2 kg
BMI: 33.8 kg/m² (99.1th percentile)
BP: 123/80 mmHg
HR: 92/min
RR: 13/min

SUBJECTIVE:
Adolescent patient presents with multiple symptoms including:
- Cough
- Sore throat
- Severe fatigue
- Muscle pain
- Joint pain
- Fever
Never smoker. Symptoms began recently.

OBJECTIVE:
Vitals:
Temperature: 39.3°C (102.7°F)
Heart Rate: 131.1/min
Blood Pressure: 120/73 mmHg
Respiratory Rate: 27.6/min
O2 Saturation: 75.8% on room air
Weight: 81.2 kg

Laboratory/Testing:
Comprehensive Respiratory Panel:
- Influenza A RNA: Negative
- Influenza B RNA: Negative
- RSV RNA: Negative
- Parainfluenza virus 1,2,3 RNA: Negative
- Rhinovirus RNA: Negative
- Human metapneumovirus RNA: Negative
- Adenovirus DNA: Negative
- SARS-CoV-2 RNA: Positive

ASSESSMENT:
1. Suspected COVID-19 with severe symptoms
2. Severe hypoxemia requiring immediate intervention
3. Tachycardia (HR 131)
4. High-grade fever
5. Risk factors:
   - Obesity (BMI 33.8)
   - Adolescent age

PLAN:
1. Face mask provided for immediate oxygen support
2. Infectious disease care plan initiated
3. Close monitoring required due to:
   - Severe hypoxemia
   - Tachycardia
   - Age and obesity risk factors
4. Parent/patient education on:
   - Home isolation protocols
   - Warning signs requiring emergency care
   - Return precautions
5. Follow-up plan:
   - Daily monitoring during acute phase
   - Virtual check-ins as needed

Encounter Duration: 45 minutes
Encounter Type: Ambulatory
Provider: ID# e2c226c2-3e1e-3d0b-b997-ce9544c10528
Facility: 5103c940-0c08-392f-95cd-446e0cea042a
```


The encounter note contains:

* General encounter information: 

  * When the encounter took place: Date and time of the visit.
  * Demographics: Patient’s age, gender, and unique medical record identifier.
  * Encounter details: The reason for the visit, diagnosis, and any associated costs.


* Semi-Structured Notes:

These notes mirror how doctors organize their thoughts and observations during an encounter. They generally follow a SOAP format:

* Subjective: The patient’s subjective description of their symptoms, feelings, and medical concerns.
* Objective: The doctor’s objective findings, including test results, measurements, or physical examination outcomes.
* Assessment: The doctor’s evaluation or diagnosis based on subjective and objective information.
* Plan: The proposed treatment plan, including medications, follow-ups, or other interventions.

While some encounter notes might include additional details, the majority conform to this semi-structured format, making them ideal for analysis and transformation.
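Because the section headers are predictable, a note can be split into its parts before any model is involved. The following is a minimal sketch, assuming every header sits on its own line and ends with a colon, as in the sample note; `SECTION_HEADERS`, `split_sections`, and the shortened `sample_note` are illustrative names and data, not part of the assignment code.

```python
import re

# Section headers as they appear in the sample note (assumed to sit on their own line).
SECTION_HEADERS = [
    "DEMOGRAPHICS", "PERTINENT MEDICAL HISTORY",
    "SUBJECTIVE", "OBJECTIVE", "ASSESSMENT", "PLAN",
]

def split_sections(note: str) -> dict:
    """Return {header: body_text} for each known header found in the note."""
    alternatives = "|".join(re.escape(h) for h in SECTION_HEADERS)
    pattern = re.compile(r"^(" + alternatives + r"):[ \t]*$", re.MULTILINE)
    matches = list(pattern.finditer(note))
    sections = {}
    for i, match in enumerate(matches):
        # A section runs until the next header, or to the end of the note.
        end = matches[i + 1].start() if i + 1 < len(matches) else len(note)
        sections[match.group(1)] = note[match.end():end].strip()
    return sections

# Shortened, hypothetical note in the same layout as the example above.
sample_note = """SUBJECTIVE:
- Cough
- Fever

OBJECTIVE:
Heart Rate: 131.1/min

ASSESSMENT:
1. Suspected COVID-19 with severe symptoms

PLAN:
1. Infectious disease care plan initiated
"""

sections = split_sections(sample_note)
print(sections["SUBJECTIVE"])
```

Sub-headers such as `Vitals:` or list items such as `5. Follow-up plan:` are not treated as sections, because only lines that consist of a known header are matched.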

### Goals for the Assignment

1. Transforming Encounter Notes:

Use an LLM to convert the semi-structured encounter notes into JSON that organizes the information into structured fields. The JSON will include demographics, encounter specifics, and the SOAP components of the note. You will then convert the JSON data into a Parquet file, which is suitable both for analysis in Spark and for later storage.
You will also use ML classification to assign the objective and assessment semi-structured fields to standardized, structured fields. The medical taxonomy for this task will be the one provided by the CDC, which defines standard codes for diagnoses, symptoms, procedures, and treatments. This step ensures the structured data aligns with domain-wide medical standards, making it interoperable and ready for deeper analysis.

The JSON format should capture the hierarchies described in the structure below.




2. Basic Analytics and Visualizations:
Using Apache Spark, perform comprehensive data analysis on the encounter data and create visualizations that reveal meaningful patterns. Your analysis must include:
- COVID-19 Case Demographics: Case breakdown by age ranges ([0-5], [6-10], [11-17], [18-30], [31-50], [51-70], [71+])
- Cumulative COVID-19 case count between the earliest and the latest case observed in the dataset
- Symptoms for all COVID-19 patients versus patients who were admitted to the intensive care unit due to COVID-19
- Rank medications by frequency of prescription
- Analyze medication patterns across different demographic groups (e.g., top 3 per age group)
- Identify and plot co-morbidity information from the patient records (e.g., hypertension, obesity, prediabetes, etc.) provided in the dataset. 
- An independent group analysis: You need to develop and execute THREE original analyses that provide meaningful insights about COVID-19 patterns in this dataset. For each analysis:
  - Clearly state your analytical question/hypothesis
  - Justify why this analysis is valuable
  - Show your Spark code and methodology
  - Present results with appropriate visualizations


In [None]:
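from typing import List

from langchain_openai import ChatOpenAI  # assumes the langchain-openai package is installed
from pydantic import BaseModel, Field

class Address(BaseModel):
    city: str = Field(description="The city where the patient lives; it appears under the DEMOGRAPHICS header.")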
    state: str

class Demographics(BaseModel):
    name: str
    date_of_birth: str
    age: int
    gender: str
    address: Address
    insurance: str

class Medication(BaseModel):
    code: str
    description: str

class PatientRecord(BaseModel):
    demographics: Demographics
    medications: List[Medication]

        
llm = ChatOpenAI(model="gpt-4o-mini")
structured_llm = llm.with_structured_output(PatientRecord)


In [None]:
EncounterType:
    code
    description

Encounter:
    date
    time
    provider_id
    facility_id

Address:
    city
    state

Demographics:
    name
    date_of_birth
    age
    gender
    address: Address
    insurance

Condition:
    code
    description

Medication:
    code
    description

Immunization:
    code
    description
    date: date

VitalMeasurement:
    code
    value
    unit

BloodPressure:
    systolic: VitalMeasurement
    diastolic: VitalMeasurement

CurrentVitals:
    temperature: VitalMeasurement
    heart_rate: VitalMeasurement
    blood_pressure: BloodPressure
    respiratory_rate: VitalMeasurement
    oxygen_saturation: VitalMeasurement
    weight: VitalMeasurement

BaselineVitals:
    date: date
    height: VitalMeasurement
    weight: VitalMeasurement
    bmi: VitalMeasurement
    bmi_percentile: VitalMeasurement

Vitals:
    current: CurrentVitals
    baseline: BaselineVitals

RespiratoryTest:
    code
    result

RespiratoryPanel:
    influenza_a: RespiratoryTest
    influenza_b: RespiratoryTest
    rsv: RespiratoryTest
    parainfluenza_1: RespiratoryTest
    parainfluenza_2: RespiratoryTest
    parainfluenza_3: RespiratoryTest
    rhinovirus: RespiratoryTest
    metapneumovirus: RespiratoryTest
    adenovirus: RespiratoryTest

Covid19Test:
    code
    description
    result

Laboratory:
    covid19: Covid19Test
    respiratory_panel: RespiratoryPanel

Procedure:
    code
    description
    date: date
    reasonCode
    reasonDescription

CarePlan:
    code
    description
    start: date
    stop: date
    reasonCode
    reasonDescription

PatientRecord:
    encounter: Encounter
    demographics: Demographics
    conditions: List[Condition]
    medications: List[Medication]
    immunizations: List[Immunization]
    vitals: Vitals
    laboratory: Laboratory
    procedures: List[Procedure]


In [None]:
# 1. Transforming Encounter Notes:

import json
import os
from datetime import datetime

import openai
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from pydantic import ValidationError

# Load CSV files
encounters_df = pd.read_csv("data/encounters_assignment_1.csv")
encounter_types_df = pd.read_csv("data/encounters_types_assignment_1.csv")
immunizations_df = pd.read_csv("data/immunizations_assignment_1.csv")
medications_df = pd.read_csv("data/medications_assignment_1.csv")
observations_df = pd.read_csv("data/observations_assignment_1.csv")

# Inspect the data
print("Encounters Data:")
print(encounters_df.head())
print("\nEncounter Types Data:")
print(encounter_types_df.head())
print("\nImmunizations Data:")
print(immunizations_df.head())
print("\nMedications Data:")
print(medications_df.head())
print("\nObservations Data:")
print(observations_df.head())



# Define the path to the directory containing encounter notes
notes_dir = r"data/encounter_notes"

# List all text files in the directory
text_files = [os.path.join(notes_dir, f) for f in os.listdir(notes_dir) if f.endswith('.txt')]
print("Text Files Found:", text_files)


from pydantic import BaseModel, Field, ValidationError, field_validator
from typing import List, Optional
from datetime import datetime

class Address(BaseModel):
    city: str = Field(description="City of residence.")
    state: str = Field(description="State of residence.")
    postal_code: Optional[str] = Field(None, description="Postal code.")

class Demographics(BaseModel):
    name: str = Field(description="Full name of the patient.")
    dob: str = Field(description="Date of birth in MM/DD/YYYY format.")
    age: int = Field(description="Calculated age of the patient in years.")
    gender: str = Field(description="Gender of the patient.")
    address: Address = Field(description="Address details.")
    insurance: str = Field(description="Insurance information.")
    mrn: str = Field(description="Unique medical record number.")

    @field_validator("dob")
    def validate_dob_format(cls, value):
        try:
            datetime.strptime(value, "%m/%d/%Y")
        except ValueError:
            raise ValueError("Date of Birth must be in MM/DD/YYYY format.")
        return value

class Vitals(BaseModel):
    temperature: Optional[float] = Field(None, description="Body temperature in Celsius.")
    heart_rate: Optional[float] = Field(None, description="Heart rate in beats per minute.")
    blood_pressure: Optional[str] = Field(None, description="Blood pressure (e.g., 120/80).")
    respiratory_rate: Optional[float] = Field(None, description="Respiratory rate.")
    o2_saturation: Optional[float] = Field(None, description="Oxygen saturation.")

class SOAP(BaseModel):
    subjective: List[str] = Field(description="Subjective symptoms described by the patient.")
    objective: Vitals = Field(description="Objective findings.")
    assessment: List[str] = Field(description="Doctor's evaluation.")
    plan: List[str] = Field(description="Proposed treatment plan.")

class Encounter(BaseModel):
    encounter_note: str = Field(description="Full text of the encounter note.")
    date_of_service: datetime = Field(description="Date and time of the encounter.")
    demographics: Demographics = Field(description="Demographic information.")
    soap: SOAP = Field(description="SOAP note details.")
    provider_id: str = Field(description="Provider's unique identifier.")
    facility_id: Optional[str] = Field(None, description="Facility identifier.")
    encounter_duration: Optional[int] = Field(None, description="Duration in minutes.")
    encounter_type: str = Field(description="Type of encounter (e.g., Urgent Care).")



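# Read the API key from the environment instead of hard-coding it in the notebook
openai.api_key = os.environ.get("OPENAI_API_KEY")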

# Define the OpenAI prompt generator
def generate_openai_prompt(note: str) -> str:
    schema_description = """
    Your task is to extract structured information from unstructured medical encounter notes using the schema below. 
    - For the demographics section, calculate the patient's age in years based on the Date of Birth (DOB) and Date of Service (DOS). 
    - Ensure the Date of Birth (dob) is strictly in MM/DD/YYYY format.
    - Ensure `date_of_service` is formatted as `YYYY-MM-DD HH:MM:SS`.
    - Include the calculated age under the demographics section.
    - Ensure the output is a valid JSON object strictly following the schema.

    JSON Schema:
    {
        "encounter_note": "string - Full text of the encounter note.",
        "date_of_service": "datetime - Date and time of the encounter.",
        "demographics": {
            "name": "string - Full name of the patient.",
            "dob": "string - Patient's date of birth in MM/DD/YYYY format.",
            "age": "int - Patient's age in years, calculated from DOB and DOS.",
            "gender": "string - Gender of the patient (e.g., Male, Female).",
            "address": {
                "city": "string - City of residence.",
                "state": "string - State of residence.",
                "postal_code": "string - Postal code (if available)."
            },
            "insurance": "string - Name of the insurance provider.",
            "mrn": "string - Unique medical record number."
        },
        "soap": {
            "subjective": ["string - List of symptoms described by the patient."],
            "objective": {
                "temperature": "float - Body temperature in Celsius.",
                "heart_rate": "float - Heart rate in beats per minute.",
                "blood_pressure": "string - Blood pressure in mmHg (e.g., 120/80).",
                "respiratory_rate": "float - Respiratory rate in breaths per minute.",
                "o2_saturation": "float - Oxygen saturation percentage."
            },
            "assessment": ["string - List of diagnoses or evaluations."],
            "plan": ["string - List of proposed treatments or next steps."]
        },
        "provider_id": "string - Unique identifier of the healthcare provider.",
        "facility_id": "string - Unique identifier of the facility.",
        "encounter_duration": "int - Duration of the encounter in minutes.",
        "encounter_type": "string - Type of encounter (e.g., Urgent Care, Ambulatory)."
    }
    """
    prompt = f"""
    Extract structured information from the following medical encounter note. Ensure the data strictly adheres to the JSON schema described above. 
    - Calculate the patient's age in completed years at the Date of Service (DOS): subtract the birth year from the DOS year, then subtract one more if the birthday has not yet occurred in the DOS year.
    - Ensure all fields are present, even if optional fields (like postal code) are null. 

    Encounter Note:
    {note}
    """
    return schema_description + prompt

# Parse encounter notes using OpenAI
def parse_encounter_notes(note: str) -> dict:
    prompt = generate_openai_prompt(note)
    try:
        # Legacy interface: openai.ChatCompletion is only available in openai<1.0
        response = openai.ChatCompletion.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": "You are a medical assistant skilled at extracting structured data from medical notes."},
                {"role": "user", "content": prompt}
            ]
        )
        # Convert the JSON string output to a Python dictionary
        return json.loads(response['choices'][0]['message']['content'])
    except json.JSONDecodeError as e:
        print(f"JSON Decode Error: {e}")
        return None
    except Exception as e:
        print(f"Error during OpenAI API call: {e}")
        return None

# Preprocess the date_of_service field (for preventing validation errors)
def preprocess_date_of_service(json_data: dict) -> dict:
    raw_date = json_data.get("date_of_service")
    if not isinstance(raw_date, str):
        return json_data
    try:
        # Already ISO-like (e.g. "2020-03-02 15:45:00"): leave it untouched
        datetime.fromisoformat(raw_date)
        return json_data
    except ValueError:
        pass
    try:
        # Note-style value such as "March 2, 2020 15:45-16:30": keep the start time only.
        # A plain "-" test cannot tell the two formats apart, because the time range contains one too.
        start = raw_date.split("-")[0].strip()
        parsed_date = datetime.strptime(start, "%B %d, %Y %H:%M")
        json_data["date_of_service"] = parsed_date.strftime("%Y-%m-%d %H:%M:%S")
    except ValueError as e:
        print(f"Error during date_of_service preprocessing: {e}")
    return json_data

# Directory containing encounter notes
notes_dir = r"data/encounter_notes"
text_files = [os.path.join(notes_dir, f) for f in os.listdir(notes_dir) if f.endswith('.txt')]

# Process and validate encounter notes
validated_encounters = []

for file_path in text_files:
    with open(file_path, 'r') as f:
        raw_note = f.read()
        print(f"Processing: {file_path}")
        parsed_note = parse_encounter_notes(raw_note)
        if parsed_note:
            try:
                # Preprocess the parsed JSON data
                parsed_json = preprocess_date_of_service(parsed_note)
                # Validate the preprocessed data using Pydantic
                validated_data = Encounter.model_validate(parsed_json)
                validated_encounters.append(validated_data.model_dump())
            except ValidationError as e:
                print(f"Validation Error for {file_path}:", e.json(indent=2))

# Save validated data to Parquet
validated_df = pd.DataFrame(validated_encounters)
parquet_path = r"data/merged_encounter_data.parquet"
table = pa.Table.from_pandas(validated_df)
pq.write_table(table, parquet_path)

print("Data successfully saved to Parquet.")

# Text Files Found: ['data/encounter_notes\\055ae6fc-7e18-4a39-8058-64082ca6d515.txt', 'data/encounter_notes\\199c586f-af16-4091-9998-ee4cfc02ee7a.txt', 'data/encounter_notes\\28658715-b770-4576-9a81-fbb2282a98ea.txt', 'data/encounter_notes\\353016ea-a0ff-4154-85bb-1cf8b6cedf20.txt', 'data/encounter_notes\\ae9efba3-ddc4-43f9-a781-f72019388548.txt', 'data/encounter_notes\\b9fd2dd8-181b-494b-ab15-e9f286d668d9.txt', 'data/encounter_notes\\d22592ac-552f-4ecd-a63d-7663d77ce9ba.txt', 'data/encounter_notes\\df6b563d-1ff4-4833-9af8-84431e641e9c.txt', 'data/encounter_notes\\f0f3bc8d-ef38-49ce-a2bd-dfdda982b271.txt', 'data/encounter_notes\\f73d6f41-0091-4485-8b43-9d38eb98fb36.txt']
# Processing: data/encounter_notes\055ae6fc-7e18-4a39-8058-64082ca6d515.txt    
# Processing: data/encounter_notes\199c586f-af16-4091-9998-ee4cfc02ee7a.txt    
# Processing: data/encounter_notes\28658715-b770-4576-9a81-fbb2282a98ea.txt    
# JSON Decode Error: Expecting value: line 1 column 1 (char 0)
# Processing: data/encounter_notes\353016ea-a0ff-4154-85bb-1cf8b6cedf20.txt    
# Processing: data/encounter_notes\ae9efba3-ddc4-43f9-a781-f72019388548.txt
# Processing: data/encounter_notes\b9fd2dd8-181b-494b-ab15-e9f286d668d9.txt
# Processing: data/encounter_notes\d22592ac-552f-4ecd-a63d-7663d77ce9ba.txt
# Processing: data/encounter_notes\df6b563d-1ff4-4833-9af8-84431e641e9c.txt
# Processing: data/encounter_notes\f0f3bc8d-ef38-49ce-a2bd-dfdda982b271.txt
# Processing: data/encounter_notes\f73d6f41-0091-4485-8b43-9d38eb98fb36.txt
# Data successfully saved to Parquet.
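The log above shows one note failing with `JSON Decode Error: Expecting value: line 1 column 1 (char 0)`, which means the reply did not start with JSON. The source does not say why; a common cause (an assumption here) is a reply that is empty or wraps the object in a Markdown code fence or a sentence of prose. A tolerant parser along the following lines could be tried before giving up on a note. `extract_json` and `FENCE` are hypothetical names.

```python
import json
from typing import Optional

FENCE = "`" * 3  # Markdown code-fence marker, built here to avoid a literal fence in this example

def extract_json(reply: str) -> Optional[dict]:
    """Parse a JSON object from a model reply, tolerating code fences and surrounding prose."""
    text = reply.strip()
    if text.startswith(FENCE):
        # Drop the opening fence line (it may carry a language tag) and the closing fence.
        text = text.split("\n", 1)[1] if "\n" in text else ""
        text = text.rsplit(FENCE, 1)[0]
    # Fall back to the outermost braces in case prose surrounds the object.
    start, end = text.find("{"), text.rfind("}")
    if start == -1 or end <= start:
        return None
    try:
        return json.loads(text[start:end + 1])
    except json.JSONDecodeError:
        return None

fenced_reply = FENCE + 'json\n{"provider_id": "abc", "encounter_duration": 45}\n' + FENCE
print(extract_json(fenced_reply))
print(extract_json(""))
```

Returning `None` keeps the behaviour of `parse_encounter_notes`, so the calling loop can skip the note or retry it.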

In [None]:
# Use the provided JSONL file and turn it into a Parquet file

import pandas as pd
import json
import pyarrow as pa
import pyarrow.parquet as pq

jsonl_path = r"Parsed_notes.jsonl"

# Read JSONL into a list of dictionaries
parsed_notes = []
with open(jsonl_path, 'r') as file:
    for line in file:
        parsed_notes.append(json.loads(line))

parsed_df = pd.json_normalize(parsed_notes)
print(parsed_df.head())

parquet_path = r"new_parsed_notes.parquet"

table = pa.Table.from_pandas(parsed_df)
pq.write_table(table, parquet_path)

print(f"Data successfully saved to Parquet at {parquet_path}")
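`pd.json_normalize` flattens nested dictionaries into columns whose names are joined with a dot, while lists stay in a single column. That is presumably why the Spark cells later quote names such as `demographics.age` with backticks but can address `conditions.code` directly on an array column. The sketch below mimics the flattening in plain Python so the resulting column names are easy to see; `flatten` and the sample `record` are illustrative only.

```python
def flatten(record: dict, parent: str = "", sep: str = ".") -> dict:
    """Flatten nested dicts into dotted keys; lists are kept as values."""
    flat = {}
    for key, value in record.items():
        name = f"{parent}{sep}{key}" if parent else key
        if isinstance(value, dict):
            flat.update(flatten(value, name, sep))
        else:
            flat[name] = value
    return flat

# Hypothetical record shaped like one line of the JSONL file.
record = {
    "demographics": {"age": 15, "address": {"city": "Springfield"}},
    "conditions": [{"code": "840539006"}],
}

flat = flatten(record)
print(sorted(flat))
```

A column literally named `demographics.age` has to be written as ``col("`demographics.age`")`` in Spark, because an unquoted dot is read as access to a field inside a struct.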



In [None]:
#------------------------------------------------------------------------------------------------#
#------------------------------------------------------------------------------------------------#
#------------------------------------------------------------------------------------------------#

In [None]:
# 2. Basic Analytics and Visualizations: (Code from databricks)

parquet_path = "dbfs:/FileStore/new_parsed_notes.parquet"

df = spark.read.parquet(parquet_path)
df.printSchema()
df.show(5)

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("COVID-19 Data Analytics Using Parquet") \
    .getOrCreate()

parsed_notes_parquet = "dbfs:/FileStore/new_parsed_notes.parquet"
parsed_notes_df = spark.read.parquet(parsed_notes_parquet)

# parsed_notes_df.printSchema()



In [None]:
# COVID-19 Case Demographics

from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

age_ranges = [
    (0, 5), (6, 10), (11, 17), (18, 30), (31, 50), (51, 70), (71, 120)
]

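def age_range(age):
    # Missing ages arrive as None; comparing None with an int would raise a TypeError
    if age is None:
        return "Unknown"
    for start, end in age_ranges:
        if start <= age <= end:
            return f"{start}-{end}"
    return "Unknown"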

age_range_udf = udf(age_range, StringType())

demographics_by_age = parsed_notes_df \
    .withColumn("age_range", age_range_udf(col("`demographics.age`"))) \
    .groupBy("age_range").count() \
    .orderBy("age_range")

demographics_by_age.show()

# +---------+-----+
# |age_range|count|
# +---------+-----+
# |      0-5|   25|
# |    11-17|  126|
# |    18-30|  320|
# |    31-50|  436|
# |    51-70|  508|
# |     6-10|   95|
# |   71-120|  489|
# +---------+-----+
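In the output above the buckets are sorted as strings, so `6-10` lands after `51-70`, and the open-ended bucket is labelled `71-120` rather than `71+` as in the assignment text. One possible way to handle both, sketched in plain Python with hypothetical helper names (`AGE_BUCKETS`, `age_bucket`, `bucket_order`), is to keep the open upper bound explicit and sort by the numeric lower bound:

```python
from typing import Optional

# Bounds from the assignment; None marks the open-ended last bucket.
AGE_BUCKETS = [(0, 5), (6, 10), (11, 17), (18, 30), (31, 50), (51, 70), (71, None)]

def age_bucket(age: Optional[int]) -> str:
    """Return the bucket label for an age, or "Unknown" for missing or negative values."""
    if age is None or age < 0:
        return "Unknown"
    for low, high in AGE_BUCKETS:
        if age >= low and (high is None or age <= high):
            return f"{low}+" if high is None else f"{low}-{high}"
    return "Unknown"

def bucket_order(label: str) -> int:
    """Sort key: the numeric lower bound of the bucket, with "Unknown" last."""
    if label == "Unknown":
        return 10**6
    return int(label.rstrip("+").split("-")[0])

labels = ["0-5", "11-17", "18-30", "31-50", "51-70", "6-10", "71+"]
print(sorted(labels, key=bucket_order))
```

The same sort key can be applied after `toPandas()`, or expressed in Spark as a helper ordering column.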


In [None]:
# Daily case counts (the input for the cumulative case count)


from pyspark.sql.functions import to_date

cumulative_cases = parsed_notes_df \
    .withColumn("case_date", to_date(col("`encounter.date`"))) \
    .groupBy("case_date").count() \
    .orderBy("case_date")

cumulative_cases.show(10)


# +----------+-----+
# | case_date|count|
# +----------+-----+
# |      null|  398|
# |2020-01-01|    1|
# |2020-01-04|    1|
# |2020-01-11|    1|
# |2020-01-12|    1|
# |2020-01-13|    1|
# |2020-01-15|    1|
# |2020-01-16|    2|
# |2020-01-17|    1|
# |2020-01-20|    2|
# +----------+-----+
# only showing top 10 rows
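The cell above counts cases per day; a cumulative count additionally needs a running sum over the dates in order. The sketch below shows that step in plain Python on the first rows of the output, with the null-date row left out (how to treat those 398 records is a separate decision). In Spark the same idea is usually expressed as `sum("count")` over a `Window.orderBy("case_date")`.

```python
from itertools import accumulate

# Daily counts copied from the first rows of the output above; the null-date row is omitted.
daily_counts = [
    ("2020-01-01", 1), ("2020-01-04", 1), ("2020-01-11", 1), ("2020-01-12", 1),
    ("2020-01-13", 1), ("2020-01-15", 1), ("2020-01-16", 2), ("2020-01-17", 1),
    ("2020-01-20", 2),
]

daily_counts.sort(key=lambda row: row[0])  # ISO dates sort correctly as strings
running_totals = list(accumulate(count for _, count in daily_counts))
cumulative = [(day, total) for (day, _), total in zip(daily_counts, running_totals)]

for day, total in cumulative:
    print(day, total)
```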



In [None]:
# Symptoms Analysis

from pyspark.sql.functions import explode

symptoms_analysis = parsed_notes_df \
    .withColumn("condition_code", explode(col("conditions.code"))) \
    .groupBy("condition_code").count() \
    .orderBy(col("count").desc())

symptoms_analysis.show(10)


# +--------------+-----+
# |condition_code|count|
# +--------------+-----+
# |     840544004| 1486|
# |     840539006| 1435|
# |     386661006| 1322|
# |      49727002| 1028|
# |      36955009|  747|
# |      84229001|  560|
# |     248595008|  476|
# |     233604007|  303|
# |     271825005|  302|
# |     389087006|  302|
# +--------------+-----+
# only showing top 10 rows



In [None]:
# Rank Medications by Frequency

top_medications = parsed_notes_df \
    .withColumn("medication_code", explode(col("medications.code"))) \
    .groupBy("medication_code").count() \
    .orderBy(col("count").desc())

top_medications.show(10)

# +---------------+-----+
# |medication_code|count|
# +---------------+-----+
# |         198440|  302|
# |         854235|  291|
# |         205923|  214|
# |         854252|  174|
# |        2123111|  165|
# |         106892|   88|
# |         314231|   83|
# |         310798|   82|
# |         860975|   69|
# |         999967|   52|
# +---------------+-----+
# only showing top 10 rows



In [None]:
# Analyze Medication Patterns Across Age Groups

medications_by_age = parsed_notes_df \
    .withColumn("age_range", age_range_udf(col("`demographics.age`"))) \
    .withColumn("medication_code", explode(col("medications.code"))) \
    .groupBy("age_range", "medication_code").count() \
    .orderBy("age_range", col("count").desc())

medications_by_age.show(10)

# +---------+---------------+-----+
# |age_range|medication_code|count|
# +---------+---------------+-----+
# |      0-5|         198405|    3|
# |      0-5|         854252|    1|
# |      0-5|        2123111|    1|
# |      0-5|         854235|    1|
# |      0-5|        1043400|    1|
# |      0-5|         308182|    1|
# |      0-5|         198440|    1|
# |    11-17|        2123111|   14|
# |    11-17|         895994|   11|
# |    11-17|         854235|    9|
# +---------+---------------+-----+
# only showing top 10 rows



In [None]:
# Analyze co-morbidities from conditions

co_morbidities = parsed_notes_df \
    .withColumn("condition_code", explode(col("conditions.code"))) \
    .groupBy("condition_code").count() \
    .orderBy(col("count").desc())

co_morbidities.show(10)

# +--------------+-----+
# |condition_code|count|
# +--------------+-----+
# |     840544004| 1486|
# |     840539006| 1435|
# |     386661006| 1322|
# |      49727002| 1028|
# |      36955009|  747|
# |      84229001|  560|
# |     248595008|  476|
# |     233604007|  303|
# |     271825005|  302|
# |     389087006|  302|
# +--------------+-----+
# only showing top 10 rows



# Independent Analysis

## Top 5 symptoms in Covid-19 cases

### Analytical Question / Hypothesis:
What are the five most common symptoms reported in COVID-19 cases in this dataset?
Identifying common symptoms can help healthcare providers prioritize diagnostic criteria and treatment plans.

### Justification:
Knowing which symptoms are reported most often shows how COVID-19 typically presents in this population. This supports public awareness of the disease and helps in planning measures to limit its spread based on those symptoms.

### Spark Code and Methodology:
The dataset was processed to explode the conditions column and group by symptom codes to count their occurrences. The top 5 symptoms were visualized using a pie chart.
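The explode-filter-count pattern described here can be illustrated without Spark. The sketch below uses three hypothetical records and only the two code names that are not in doubt (fever and cough); `records`, `EXCLUDE`, and `NAMES` are illustrative.

```python
from collections import Counter

# Hypothetical records shaped like the parsed notes: each holds a list of condition codes.
records = [
    {"conditions": ["840539006", "386661006", "49727002"]},
    {"conditions": ["840539006", "386661006"]},
    {"conditions": ["840544004", "49727002", "386661006"]},
]

EXCLUDE = {"840544004", "840539006"}  # COVID-19 diagnosis codes, not symptoms
NAMES = {"386661006": "Fever", "49727002": "Cough"}

# "Explode": one item per (record, code) pair, then filter and count.
codes = (code for rec in records for code in rec["conditions"] if code not in EXCLUDE)
counts = Counter(NAMES.get(code, code) for code in codes)

print(counts.most_common(5))
```

Looking names up in a dictionary and falling back to the raw code keeps unmapped codes visible instead of dropping them silently.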

### Result / Visualization:
By count, the most common symptoms in COVID-19 cases were fever (386661006, 1,322 occurrences) and cough (49727002, 1,028 occurrences), followed by code 36955009 (747 occurrences). Fever and cough are also among the COVID-19 symptoms listed by the Centers for Disease Control and Prevention (CDC).


In [None]:
from pyspark.sql.functions import col, explode, lit, when
import matplotlib.pyplot as plt

# Define a mapping of SNOMED CT symptom codes to their descriptions
snomed_ct_mapping = {
    "386661006": "Fever",
    "49727002": "Cough",
    "36955009": "Loss of taste",
    "84229001": "Fatigue",
    "248595008": "Sputum finding"
}

# Codes to exclude: COVID-19 diagnoses (840544004 suspected, 840539006 confirmed) and
# complications rather than symptoms (233604007 pneumonia, 389087006 hypoxemia,
# 271825005 respiratory distress)
exclude_codes = ["840544004", "840539006", "233604007", "389087006", "271825005"]

symptoms_prevalence = (
    parsed_notes_df
    .withColumn("symptom_code", explode(col("conditions.code")))  # Explode the symptom codes
    .filter(~col("symptom_code").isin(*exclude_codes))  # Exclude non-symptom codes
    .groupBy("symptom_code")
    .count()
    .orderBy(col("count").desc())
)

# Replace codes with descriptions for clarity
symptoms_prevalence = symptoms_prevalence.withColumn(
    "symptom_description",
    lit(None).cast("string")
)

# Map symptom codes to descriptions
for code, description in snomed_ct_mapping.items():
    symptoms_prevalence = symptoms_prevalence.withColumn(
        "symptom_description",
        when(col("symptom_code") == code, lit(description)).otherwise(col("symptom_description"))
    )

symptoms_prevalence = symptoms_prevalence.filter(col("symptom_description").isNotNull())
symptoms_prevalence_pd = symptoms_prevalence.toPandas()
top_symptoms = symptoms_prevalence_pd.head(5)

plt.figure(figsize=(8, 8))
plt.pie(
    top_symptoms["count"],
    labels=top_symptoms["symptom_description"],
    autopct="%1.1f%%",
    startangle=140
)
plt.title("Top 5 Symptoms in COVID-19 Cases", fontsize=14)
plt.show()


![image](Debug_and_etc/Top_5_Symptoms.png)

## Top 3 Medications Prescribed in Each Age Group

### Analytical Question/Hypothesis:
What are the most commonly prescribed medications for each age group of COVID-19 patients?
We hypothesize that medication patterns vary across age groups due to differences in treatment protocols and patient needs.

### Justification:
The analysis aims to highlight medication trends and support the understanding of age-specific therapeutic interventions. The visualization could help guide better pharmaceutical management, such as stocking specific medications.

### Spark Code and Methodology:
The medications column was used to list individual prescriptions. Medications were grouped by age range and ordered by frequency, and the top 3 medications for each group were visualized.
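Selecting the top three medications per age group is a "top N per group" problem. The plain-Python sketch below shows the logic on hypothetical prescription pairs; `prescriptions` and `top_n_per_group` are illustrative names. In Spark the usual equivalent is `row_number()` over a window partitioned by `age_range` and ordered by descending count.

```python
from collections import Counter, defaultdict

# Hypothetical (age_range, medication_code) pairs, one per prescription.
prescriptions = (
    [("18-30", "198440")] * 4 + [("18-30", "854235")] * 3
    + [("18-30", "205923")] * 2 + [("18-30", "854252")]
    + [("0-5", "198405")] * 2 + [("0-5", "854252")]
)

def top_n_per_group(pairs, n=3):
    """Return {group: [(item, count), ...]} with at most n items per group."""
    by_group = defaultdict(Counter)
    for group, item in pairs:
        by_group[group][item] += 1
    return {group: counter.most_common(n) for group, counter in by_group.items()}

top3 = top_n_per_group(prescriptions)
print(top3["18-30"])
print(top3["0-5"])
```

Ranking inside each group avoids relying on the row order surviving the conversion to pandas.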

### Result / Visualization:
The result is plausible: for example, the most commonly prescribed medication among patients aged 18-70 was acetaminophen 500 mg oral tablet. Acetaminophen reduces fever, which is consistent with fever being among the most common symptoms in the previous analysis.


In [None]:
from pyspark.sql.functions import col, lit, first
import matplotlib.pyplot as plt
import math

medications_mapping_path = "dbfs:/FileStore/medications_assignment_1.csv"
medications_mapping_df = spark.read.option("header", "true").csv(medications_mapping_path)

medications_mapping_pd = medications_mapping_df.toPandas()
medication_mapping = medications_mapping_pd.groupby("CODE")["DESCRIPTION"].first().to_dict()

# Replace medication codes with descriptions in medications_by_age
medications_by_age_pd = medications_by_age.toPandas()
medications_by_age_pd["medication_description"] = medications_by_age_pd["medication_code"].map(medication_mapping)

age_groups = medications_by_age_pd["age_range"].unique()
n_age_groups = len(age_groups)

n_cols = 1
n_rows = math.ceil(n_age_groups / n_cols)

fig, axes = plt.subplots(n_rows, n_cols, figsize=(18, 10), constrained_layout=True)

# Flatten the axes array for easier indexing
axes = axes.flatten()

for idx, age_group in enumerate(age_groups):
    # Filter data for the current age group and select top 3 medications
    age_group_data = medications_by_age_pd[medications_by_age_pd["age_range"] == age_group].head(3)
    
    axes[idx].barh(age_group_data["medication_description"], age_group_data["count"], color="skyblue")
    axes[idx].set_title(f"Top 3 Medications for Age Group: {age_group}", fontsize=12)
    axes[idx].set_xlabel("Number of Prescriptions", fontsize=10)
    axes[idx].set_ylabel("Medication Name", fontsize=10)
    axes[idx].tick_params(axis='y', labelsize=8) 
    axes[idx].invert_yaxis()

for idx in range(n_age_groups, len(axes)):
    fig.delaxes(axes[idx])

fig.suptitle("Top 3 Medications Prescribed in Each Age Group", fontsize=16)
plt.show()


![image](Debug_and_etc/Top_3_Medications.png)

## Time Spent in Encounters by Age Group

### Analytical Question/Hypothesis:
Do encounter durations vary significantly among different age groups?
Older patients may require longer encounters due to comorbidities or complexity in care.

### Justification:
Understanding time spent in encounters by age group aids resource planning and allocation in healthcare settings, helps hospitals manage clinician time during an outbreak, and supports equitable care delivery.

### Spark Code and Methodology:
Encounter durations were aggregated by age range to compute total and average times. The results were visualized using a simple bar chart.

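### Result / Visualization:
The table does not support the hypothesis as measured: the average falls from 30.0 minutes (ages 0-5) to 14.0 minutes (ages 71-120). Because each encounter is assigned a fixed 30-minute placeholder when its date and time parse and 0 otherwise, these averages reflect the share of records with a parseable timestamp in each age group, not actual visit length.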
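The cell below assigns a fixed 30-minute placeholder instead of a measured duration. The raw notes do carry a time range in the `Date of Service` line, so a real duration could be derived from it. The following is a minimal sketch, assuming the `Month D, YYYY HH:MM-HH:MM` layout of the sample note and a visit that ends on the same day; `duration_minutes` is a hypothetical helper.

```python
from datetime import datetime

def duration_minutes(date_of_service: str) -> int:
    """Minutes between the start and end time in 'Month D, YYYY HH:MM-HH:MM'."""
    day_and_start, end_time = date_of_service.rsplit("-", 1)
    start = datetime.strptime(day_and_start.strip(), "%B %d, %Y %H:%M")
    # The end time has no date of its own; assume it falls on the same day as the start.
    end = datetime.strptime(end_time.strip(), "%H:%M").replace(
        year=start.year, month=start.month, day=start.day
    )
    return int((end - start).total_seconds() // 60)

print(duration_minutes("March 2, 2020 15:45-16:30"))  # 45
```

For the sample note this agrees with its own `Encounter Duration: 45 minutes` line, which is another field the value could be read from.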


In [None]:

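# Spark functions used in this cell; note that round and sum shadow the Python built-ins
from pyspark.sql.functions import avg, col, concat_ws, round, sum, unix_timestamp, when
import matplotlib.pyplot as plt

parsed_notes_df = parsed_notes_df.withColumn(
    "encounter_datetime",
    unix_timestamp(concat_ws(" ", col("`encounter.date`"), col("`encounter.time`")), "yyyy-MM-dd HH:mm:ss")
)

# Placeholder duration: a fixed 30 minutes when the encounter timestamp parsed, otherwise 0.
# Replace the constant with the real duration once it is available in the data.
parsed_notes_df = parsed_notes_df.withColumn(
    "encounter_duration",
    when(col("encounter_datetime").isNotNull(), 30).otherwise(0)
)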

# Define age ranges
parsed_notes_df = parsed_notes_df.withColumn(
    "age_range",
    when((col("`demographics.age`") >= 0) & (col("`demographics.age`") <= 5), "0-5")
    .when((col("`demographics.age`") >= 6) & (col("`demographics.age`") <= 10), "6-10")
    .when((col("`demographics.age`") >= 11) & (col("`demographics.age`") <= 17), "11-17")
    .when((col("`demographics.age`") >= 18) & (col("`demographics.age`") <= 30), "18-30")
    .when((col("`demographics.age`") >= 31) & (col("`demographics.age`") <= 50), "31-50")
    .when((col("`demographics.age`") >= 51) & (col("`demographics.age`") <= 70), "51-70")
    .when(col("`demographics.age`") > 70, "71-120")
    .otherwise("Unknown")
)

# Add a numerical index for sorting
parsed_notes_df = parsed_notes_df.withColumn(
    "age_range_order",
    when(col("age_range") == "0-5", 1)
    .when(col("age_range") == "6-10", 2)
    .when(col("age_range") == "11-17", 3)
    .when(col("age_range") == "18-30", 4)
    .when(col("age_range") == "31-50", 5)
    .when(col("age_range") == "51-70", 6)
    .when(col("age_range") == "71-120", 7)
)

# Aggregate durations by age range
encounter_duration_by_age = parsed_notes_df \
    .groupBy("age_range", "age_range_order") \
    .agg(
        round(sum("encounter_duration"), 1).alias("total_time_spent (min)"),
        round(avg("encounter_duration"), 1).alias("average_time_spent (min)")
    ) \
    .orderBy("age_range_order")

# Drop helper column for display
encounter_duration_by_age = encounter_duration_by_age.drop("age_range_order")

# Show the result
encounter_duration_by_age.show()

# Convert to Pandas for visualization
encounter_duration_by_age_pd = encounter_duration_by_age.toPandas()

# Visualization: Bar chart for average time spent
plt.figure(figsize=(12, 6))
plt.bar(encounter_duration_by_age_pd["age_range"], encounter_duration_by_age_pd["average_time_spent (min)"], color="skyblue")
plt.title("Average Time Spent in Encounters by Age Group (Minutes)", fontsize=14)
plt.xlabel("Age Group", fontsize=12)
plt.ylabel("Average Time Spent (Minutes)", fontsize=12)
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

# +---------+----------------------+------------------------+
# |age_range|total_time_spent (min)|average_time_spent (min)|
# +---------+----------------------+------------------------+
# |      0-5|                   750|                    30.0|
# |     6-10|                  2700|                    28.4|
# |    11-17|                  3480|                    27.6|
# |    18-30|                  8940|                    27.9|
# |    31-50|                 12120|                    27.8|
# |    51-70|                 13170|                    25.9|
# |   71-120|                  6870|                    14.0|
# +---------+----------------------+------------------------+

![image](Debug_and_etc/average_time_spent.png)