
### Mid-term for HDS5210

Your supervisor is concerned about 4-year survival risks for COPD. She has asked you to run an analysis using a new metric, BODE. BODE improves on a previous metric and promises to provide insight into survival risks.

BODE is defined here: https://www.mdcalc.com/calc/3916/bode-index-copd-survival#evidence

Your assignment is to implement a BODE calculation and use it to compute BODE scores and BODE survival rates for a group of patients. Then evaluate the average BODE score and average BODE survival rate for each area hospital.

Your patient input file will have the following columns:
NAME,SSN,LANGUAGE,JOB,HEIGHT_M,WEIGHT_KG,fev_pct,dyspnea_description,distance_in_meters,hospital

BODE calculations require a BMI value, so you will have to create a function for it.

Your output should be in the form of two CSV files, patient_output.csv and hospital_output.csv.

Patient_output will have the following columns:
NAME,BODE_SCORE,BODE_RISK,HOSPITAL

Hospital output will have the following columns:
HOSPITAL_NAME, COPD_COUNT, PCT_OF_COPD_CASES_OVER_BEDS, AVG_SCORE, AVG_RISK

Each function you create should have documentation and a suitable number of test cases. If the input data could be wrong, make sure to raise a ValueError.

For this assignment, use the doctest, json, and csv libraries. Pandas is not allowed for this assignment.
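
As a minimal sketch of the documentation-plus-tests requirement, the cell below shows one way to run a single function's doctests inside a notebook. The function `double` is a hypothetical placeholder and is not part of the assignment.

```python
import doctest


def double(x):
    """
    Return twice the input value (hypothetical example function).

    >>> double(2)
    4
    >>> double("a")
    Traceback (most recent call last):
    ...
    ValueError: x must be a number.
    """
    if isinstance(x, bool) or not isinstance(x, (int, float)):
        raise ValueError("x must be a number.")
    return x * 2


# Run only the examples in double's docstring. Failures are printed;
# nothing is printed when every example passes.
doctest.run_docstring_examples(double, globals(), name="double")
```

Calling `doctest.testmod()` instead would run the doctests of every function defined in the notebook so far.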

In [None]:
import doctest
import json
import csv

### Step 1: Calculate BMI

In [None]:
def calculate_bmi(weight_kg, height_m):
    """
    Calculate BMI (Body Mass Index) using weight in kilograms and height in meters.

    >>> calculate_bmi(70, 1.75)
    22.86
    >>> calculate_bmi(50, 1.6)
    19.53
    >>> calculate_bmi(0, 1.75)
    Traceback (most recent call last):
    ...
    ValueError: Weight and height must be positive numbers.
    >>> calculate_bmi(70, -1.75)
    Traceback (most recent call last):
    ...
    ValueError: Weight and height must be positive numbers.

    :param weight_kg: Weight in kilograms
    :param height_m: Height in meters
    :return: Calculated BMI value (float)
    :raises ValueError: If weight or height are non-positive
    """
    if weight_kg <= 0 or height_m <= 0:
        raise ValueError("Weight and height must be positive numbers.")

    bmi = weight_kg / (height_m ** 2)
    return round(bmi, 2)

# Test Cases:
print(calculate_bmi(70, 1.75))
print(calculate_bmi(50, 1.6))

# Test invalid cases
try:
    print(calculate_bmi(70, -1.75))
except ValueError as e:
    print(e)

try:
    print(calculate_bmi(0, 1.75))
except ValueError as e:
    print(e)

22.86
19.53
Weight and height must be positive numbers.
Weight and height must be positive numbers.


### Step 2: Calculate BODE Score

In [None]:
def bode_score(bmi, fev_pct, dyspnea_description, distance_in_meters):
    """
    Calculate BODE score for a patient based on BMI, FEV1%, dyspnea, and 6-minute walk distance.

    >>> bode_score(22, 70, "Only breathless with strenuous exercise", 400)
    0
    >>> bode_score(20, 45, "Walks slower, stops for breath", 200)
    6
    >>> bode_score(18, 30, "Stops for breath after 100 yards or a few minutes on level ground", 100)
    9
    >>> bode_score(21, 36, "Too breathless to leave house or while dressing", 140)
    9
    >>> bode_score(25, 50, "Breathless when hurrying or walking uphill", 320)
    2
    >>> bode_score(21, 50, "Invalid description", 320)
    Traceback (most recent call last):
    ...
    ValueError: Invalid dyspnea description provided.
    >>> bode_score(21, -5, "Walks slower, stops for breath", 320)
    Traceback (most recent call last):
    ...
    ValueError: Invalid FEV1 percentage.

    :param bmi: Body Mass Index (float)
    :param fev_pct: FEV1 percentage (float)
    :param dyspnea_description: A description of dyspnea (breathlessness level, string)
    :param distance_in_meters: Distance walked in meters during a 6-minute walk test
    :return: BODE score (int), between 0 and 10 inclusive
    :raises ValueError: If input data is invalid (e.g., incorrect dyspnea description or negative values)
    """

    # BMI score: 0 points above 21, otherwise 1
    if bmi > 21:
        bmi_score = 0
    else:
        bmi_score = 1

    # FEV1 score
    if not fev_pct >= 0:  # rejects negative values and NaN
        raise ValueError("Invalid FEV1 percentage.")
    if fev_pct >= 65:
        fev_score = 0
    elif fev_pct >= 50:
        fev_score = 1
    elif fev_pct >= 36:
        fev_score = 2
    else:
        fev_score = 3

    # Dyspnea score based on description. The five descriptions correspond to
    # mMRC grades 0-4, which the BODE index scores as 0, 0, 1, 2, and 3 points.
    dyspnea_mapping = {
        "Only breathless with strenuous exercise": 0,
        "Breathless when hurrying or walking uphill": 0,
        "Walks slower, stops for breath": 1,
        "Stops for breath after 100 yards or a few minutes on level ground": 2,
        "Too breathless to leave house or while dressing": 3
    }

    if dyspnea_description not in dyspnea_mapping:
        raise ValueError("Invalid dyspnea description provided.")

    dyspnea_score = dyspnea_mapping[dyspnea_description]

    # 6-minute walk distance score
    if not distance_in_meters >= 0:  # rejects negative values and NaN
        raise ValueError("Invalid 6-minute walk distance.")
    if distance_in_meters >= 350:
        distance_score = 0
    elif distance_in_meters >= 250:
        distance_score = 1
    elif distance_in_meters >= 150:
        distance_score = 2
    else:
        distance_score = 3

    # Total BODE score
    return bmi_score + fev_score + dyspnea_score + distance_score

# Test Cases
print(bode_score(22, 70, "Only breathless with strenuous exercise", 400))
print(bode_score(20, 45, "Walks slower, stops for breath", 200))
print(bode_score(18, 30, "Stops for breath after 100 yards or a few minutes on level ground", 100))
print(bode_score(21, 36, "Too breathless to leave house or while dressing", 140))
print(bode_score(25, 50, "Breathless when hurrying or walking uphill", 320))

0
6
9
9
2

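The threshold bands for FEV1% and walk distance can also be expressed as data rather than as if/elif chains. This is an alternative sketch, not the notebook's implementation: `band_points` and the two cutoff lists are hypothetical names, and the cutoffs simply restate the ones used in `bode_score` above.

```python
from bisect import bisect_right

# Ascending cutoffs copied from the if/elif chains in bode_score.
FEV_CUTOFFS = [36, 50, 65]          # <36 -> 3, 36-49 -> 2, 50-64 -> 1, >=65 -> 0
DISTANCE_CUTOFFS = [150, 250, 350]  # <150 -> 3, 150-249 -> 2, 250-349 -> 1, >=350 -> 0


def band_points(value, cutoffs):
    """Return 3, 2, 1, or 0 points depending on how many cutoffs value reaches."""
    if not value >= 0:  # rejects negative values and NaN
        raise ValueError("Value must be a non-negative number.")
    # bisect_right counts the cutoffs that are <= value.
    return len(cutoffs) - bisect_right(cutoffs, value)


print(band_points(45, FEV_CUTOFFS))        # 2
print(band_points(320, DISTANCE_CUTOFFS))  # 1
```

Keeping the cutoffs in one list makes the boundary values easy to test directly, for example 36, 50, and 65 for FEV1%.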

### Step 3: Calculate BODE Risk

In [None]:
def bode_risk(score):
    """
    Map a BODE score to a 4-year survival risk percentage based on predefined ranges.

    >>> bode_risk(0)
    80
    >>> bode_risk(4)
    67
    >>> bode_risk(6)
    57
    >>> bode_risk(9)
    18
    >>> bode_risk(11)
    Traceback (most recent call last):
    ...
    ValueError: BODE score must be between 0 and 10.

    :param score: BODE score (int), must be between 0 and 10 inclusive
    :return: 4-year survival risk percentage (int)
    :raises ValueError: If the score is outside the valid range (0–10)
    """
    if not (0 <= score <= 10):
        raise ValueError("BODE score must be between 0 and 10.")

    # Map score to risk percentage
    if 0 <= score <= 2:
        return 80
    elif 3 <= score <= 4:
        return 67
    elif 5 <= score <= 6:
        return 57
    elif 7 <= score <= 10:
        return 18

# Test Cases
print(bode_risk(0))  # Expected: 80
print(bode_risk(4))  # Expected: 67
print(bode_risk(6))  # Expected: 57
print(bode_risk(9))  # Expected: 18

# Test invalid case
try:
    print(bode_risk(11))
except ValueError as e:
    print(e)

80
67
57
18
BODE score must be between 0 and 10.


### Step 4: Load Hospital Data

In [None]:
import csv
import json

# Load patient data
def load_patient_data(file_path):
    """
    Load patient data from a CSV file.

    :param file_path: Path to the CSV file
    :return: List of dictionaries containing patient data
    :raises ValueError: If required columns are missing or there are errors in the data
    :raises FileNotFoundError: If the file cannot be found

    Example (skipped by doctest because it depends on an external file):
    >>> load_patient_data('patients.csv')  # doctest: +SKIP
    [{'NAME': 'John Doe', 'HEIGHT_M': 1.75, 'WEIGHT_KG': 70.0, ...}]
    """
    required_columns = {'NAME', 'HEIGHT_M', 'WEIGHT_KG', 'fev_pct', 'dyspnea_description', 'distance_in_meters', 'hospital'}
    patient_data = []

    try:
        with open(file_path, mode='r', newline='') as file:
            reader = csv.DictReader(file)

            # Check for missing columns (fieldnames is None for an empty file)
            missing_cols = required_columns - set(reader.fieldnames or [])
            if missing_cols:
                raise ValueError(f"Missing columns: {missing_cols}")

            for row in reader:
                try:
                    patient_data.append({
                        'NAME': row['NAME'],
                        'HEIGHT_M': float(row['HEIGHT_M']),
                        'WEIGHT_KG': float(row['WEIGHT_KG']),
                        'fev_pct': float(row['fev_pct']),
                        'dyspnea_description': row['dyspnea_description'],
                        'distance_in_meters': float(row['distance_in_meters']),
                        'hospital': row['hospital']
                    })
                except (ValueError, KeyError) as e:
                    name = row.get('NAME', 'Unknown')
                    print(f"Skipping {name} due to error: {e}")
    except FileNotFoundError:
        raise FileNotFoundError(f"The file at {file_path} could not be found.")
    except IOError as e:
        raise IOError(f"Error reading file {file_path}: {e}")

    return patient_data

# Load patient data
try:
    patients = load_patient_data('/content/patient.csv')
    print(f"Loaded {len(patients)} patients from '/content/patient.csv'.")
except ValueError as e:
    print(f"Error loading patient data: {e}")
except FileNotFoundError as e:
    print(f"File error: {e}")


# Load hospital data
def load_hospital_info(json_path):
    """
    Load hospital information from a JSON file.

    :param json_path: Path to the JSON file
    :return: Dictionary containing hospital information
    :raises ValueError: If the JSON is malformed
    :raises FileNotFoundError: If the file cannot be found

    Example (skipped by doctest because it depends on an external file):
    >>> load_hospital_info('hospitals.json')  # doctest: +SKIP
    {'Hospital A': {'beds': 100, 'location': 'City A'}, ...}
    """
    try:
        with open(json_path, 'r') as json_file:
            return json.load(json_file)
    except FileNotFoundError:
        raise FileNotFoundError(f"The file at {json_path} could not be found.")
    except json.JSONDecodeError as e:
        raise ValueError(f"Error decoding JSON: {e}")
    except IOError as e:
        raise IOError(f"Error reading file {json_path}: {e}")

# Load hospital data
try:
    hospitals = load_hospital_info('/content/hospitals (2).json')
    print(f"Loaded {len(hospitals)} hospitals from '/content/hospitals (2).json'.")
except ValueError as e:
    print(f"Error loading hospital data: {e}")
except FileNotFoundError as e:
    print(f"File error: {e}")

File error: The file at /content/patient.csv could not be found.
File error: The file at /content/hospitals (2).json could not be found.
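
Because the loaders depend on files under `/content/`, they fail whenever those uploads are missing, as in the output above. One way to exercise the CSV parsing without the real data is to write a small temporary file first. The sketch below is self-contained: `read_patients` is a hypothetical, trimmed-down stand-in for `load_patient_data`, and the sample row is invented for illustration.

```python
import csv
import os
import tempfile

REQUIRED = {'NAME', 'HEIGHT_M', 'WEIGHT_KG', 'hospital'}  # subset, for brevity


def read_patients(path):
    """Read patients from a CSV file, converting numeric columns to float."""
    with open(path, newline='') as f:
        reader = csv.DictReader(f)
        missing = REQUIRED - set(reader.fieldnames or [])
        if missing:
            raise ValueError(f"Missing columns: {sorted(missing)}")
        return [
            {
                'NAME': row['NAME'],
                'HEIGHT_M': float(row['HEIGHT_M']),
                'WEIGHT_KG': float(row['WEIGHT_KG']),
                'hospital': row['hospital'],
            }
            for row in reader
        ]


# Write an invented sample row to a temporary file, load it, then clean up.
with tempfile.NamedTemporaryFile('w', suffix='.csv', newline='', delete=False) as tmp:
    tmp.write("NAME,HEIGHT_M,WEIGHT_KG,hospital\nJane Roe,1.60,50,Hospital A\n")
    tmp_path = tmp.name

try:
    sample_patients = read_patients(tmp_path)
    print(sample_patients)
finally:
    os.remove(tmp_path)
```

The same pattern works for the JSON loader by writing a small JSON document to the temporary file instead.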


### Step 5: Main business logic

Call BODE Score, BODE Risk functions for each patient.

For each hospital, calculate Avg BODE score and Avg BODE risk and count the number of cases for each hospital.
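
The per-hospital aggregation described above can be sketched as follows. This is an illustration under stated assumptions, not the notebook's implementation: the sample rows are invented, `beds_by_hospital` is a hypothetical mapping from hospital name to bed count (the real layout of the hospital JSON is not shown here), and `PCT_OF_COPD_CASES_OVER_BEDS` is assumed to mean COPD cases divided by beds, times 100.

```python
from collections import defaultdict

# Invented (hospital, BODE score, BODE risk) rows for illustration.
rows = [
    ("Hospital A", 2, 80),
    ("Hospital A", 6, 57),
    ("Hospital B", 9, 18),
]
# Hypothetical bed counts; the real values would come from the hospital JSON.
beds_by_hospital = {"Hospital A": 100, "Hospital B": 50}

# Running totals per hospital.
totals = defaultdict(lambda: {"count": 0, "score": 0, "risk": 0})
for hospital, score, risk in rows:
    totals[hospital]["count"] += 1
    totals[hospital]["score"] += score
    totals[hospital]["risk"] += risk

# Columns: HOSPITAL_NAME, COPD_COUNT, PCT_OF_COPD_CASES_OVER_BEDS, AVG_SCORE, AVG_RISK
hospital_rows = []
for hospital, t in sorted(totals.items()):
    beds = beds_by_hospital[hospital]
    hospital_rows.append([
        hospital,
        t["count"],
        round(100 * t["count"] / beds, 2),
        round(t["score"] / t["count"], 2),
        round(t["risk"] / t["count"], 2),
    ])

for row in hospital_rows:
    print(row)
```

Accumulating totals and dividing once at the end avoids recomputing averages on every patient and keeps the count available for the percentage column.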

In [None]:
import csv
import json

# Step 1: Define file paths
patient_csv = "/content/patient.csv"
hospital_json = "/content/hospitals (2).json"

# Step 2: Define functions to calculate BMI, the BODE score, and the BODE risk
def calculate_bmi(weight_kg, height_m):
    """
    Calculate BMI (Body Mass Index) using weight in kilograms and height in meters.

    :param weight_kg: Weight in kilograms
    :param height_m: Height in meters
    :return: BMI (float)
    :raises ValueError: If weight or height are non-positive
    """
    if weight_kg <= 0 or height_m <= 0:
        raise ValueError("Weight and height must be positive numbers.")
    return weight_kg / (height_m ** 2)


def calculate_bode_score(patient):
    """
    Calculate the BODE score and risk for a patient based on BMI, FEV1%, dyspnea, and distance.

    :param patient: A dictionary containing the patient's data.
    :return: BODE score (int), BODE risk (int)
    """
    bmi = calculate_bmi(patient['WEIGHT_KG'], patient['HEIGHT_M'])

    # BMI score
    bmi_score = 0 if bmi > 21 else 1

    # FEV1% score
    fev_pct = patient['fev_pct']
    if fev_pct >= 65:
        fev_score = 0
    elif fev_pct >= 50:
        fev_score = 1
    elif fev_pct >= 36:
        fev_score = 2
    else:
        fev_score = 3

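    # Dyspnea score. The five descriptions correspond to mMRC grades 0-4,
    # which the BODE index scores as 0, 0, 1, 2, and 3 points.
    dyspnea_mapping = {
        "Only breathless with strenuous exercise": 0,
        "Breathless when hurrying or walking uphill": 0,
        "Walks slower, stops for breath": 1,
        "Stops for breath after 100 yards or a few minutes on level ground": 2,
        "Too breathless to leave house or while dressing": 3
    }
    # Reject unknown descriptions instead of silently scoring them as 0
    if patient['dyspnea_description'] not in dyspnea_mapping:
        raise ValueError("Invalid dyspnea description provided.")
    dyspnea_score = dyspnea_mapping[patient['dyspnea_description']]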

    # Walk distance score
    distance = patient['distance_in_meters']
    if distance >= 350:
        distance_score = 0
    elif distance >= 250:
        distance_score = 1
    elif distance >= 150:
        distance_score = 2
    else:
        distance_score = 3

    # Calculate total BODE score
    bode_score = bmi_score + fev_score + dyspnea_score + distance_score

    # Calculate risk based on BODE score
    if 0 <= bode_score <= 2:
        bode_risk = 80
    elif 3 <= bode_score <= 4:
        bode_risk = 67
    elif 5 <= bode_score <= 6:
        bode_risk = 57
    else:
        bode_risk = 18

    return bode_score, bode_risk

# Step 3: Load the patient data from CSV
def load_patient_data(filename):
    """
    Load patient data from a CSV file.

    :param filename: The file path to the CSV file
    :return: A list of dictionaries containing patient data
    :raises ValueError: If the CSV file contains missing or invalid data
    """
    patient_data = []
    with open(filename, mode='r') as file:
        csv_reader = csv.DictReader(file)

        # Print the headers (column names)
        headers = csv_reader.fieldnames
        print("CSV Headers:", headers)

        required_columns = ['NAME', 'HEIGHT_M', 'WEIGHT_KG', 'fev_pct', 'dyspnea_description', 'distance_in_meters', 'hospital']
        for column in required_columns:
            if column not in headers:
                raise ValueError(f"Missing required column: {column}")

        for row in csv_reader:
            row['HEIGHT_M'] = float(row['HEIGHT_M'])
            row['WEIGHT_KG'] = float(row['WEIGHT_KG'])
            row['fev_pct'] = float(row['fev_pct'])
            row['distance_in_meters'] = float(row['distance_in_meters'])
            patient_data.append(row)

    return patient_data

# Step 4: Load the hospital data from JSON
def load_hospital_data(filename):
    """
    Load hospital information from a JSON file.

    :param filename: Path to the JSON file
    :return: Dictionary of hospital data
    """
    with open(filename, mode='r') as file:
        hospital_data = json.load(file)
    return hospital_data

# Step 5: Process the patients and calculate BODE Scores and Risks
def process_patients_and_hospitals(patient_data, hospital_data):
    """
    Process patient data to calculate BODE scores and group results by hospital.

    :param patient_data: List of patients loaded from the CSV
    :param hospital_data: Hospital information from JSON
    :return: Patient results, Hospital averages
    """
    patient_results = []
    hospital_aggregates = {}

    # Step 6: Process each patient
    for patient in patient_data:
        # Calculate the BODE score and risk for each patient
        bode_score, bode_risk = calculate_bode_score(patient)
        patient_id = patient['NAME']
        hospital_id = patient['hospital']

        # Store patient result
        patient_results.append([patient_id, hospital_id, bode_score, bode_risk])

        # Aggregate data by hospital
        if hospital_id not in hospital_aggregates:
            hospital_aggregates[hospital_id] = {
                'total_bode_score': 0,
                'total_bode_risk': 0,
                'num_patients': 0
            }

        hospital_aggregates[hospital_id]['total_bode_score'] += bode_score
        hospital_aggregates[hospital_id]['total_bode_risk'] += bode_risk
        hospital_aggregates[hospital_id]['num_patients'] += 1

    # Step 7: Calculate the averages for each hospital
    hospital_output_list = []
    for hospital_id, aggregates in hospital_aggregates.items():
        avg_bode_score = aggregates['total_bode_score'] / aggregates['num_patients']
        avg_bode_risk = aggregates['total_bode_risk'] / aggregates['num_patients']
        hospital_output_list.append([hospital_id, avg_bode_score, avg_bode_risk, aggregates['num_patients']])

    return patient_results, hospital_output_list

# Step 8: Write the results to CSV files
def write_csv(filename, data, headers=None):
    """
    Write data to a CSV file.

    :param filename: Path to the output CSV file
    :param data: Data to be written (list of rows)
    :param headers: Optional list of headers for the CSV
    """
    with open(filename, 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        if headers:
            writer.writerow(headers)
        writer.writerows(data)

# Step 9: Load data, process it, and save the results
patient_data = load_patient_data(patient_csv)
hospital_data = load_hospital_data(hospital_json)

# Process the data and get the results
patient_results, hospital_output_list = process_patients_and_hospitals(patient_data, hospital_data)

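# Write the patient and hospital results to their respective CSV files,
# reordering the columns to match the layout given in the assignment.
patient_rows = [[name, score, risk, hospital] for name, hospital, score, risk in patient_results]
write_csv("patient_output.csv", patient_rows, headers=["NAME", "BODE_SCORE", "BODE_RISK", "HOSPITAL"])

# PCT_OF_COPD_CASES_OVER_BEDS is not written here because it needs each hospital's bed count.
hospital_rows = [[name, count, avg_score, avg_risk] for name, avg_score, avg_risk, count in hospital_output_list]
write_csv("hospital_output.csv", hospital_rows, headers=["HOSPITAL_NAME", "COPD_COUNT", "AVG_SCORE", "AVG_RISK"])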

# Output for verification
print("Patient Results:")
for row in patient_results[:5]:
    print(row)

print("\nHospital Results:")
for row in hospital_output_list[:5]:
    print(row)

FileNotFoundError: [Errno 2] No such file or directory: '/content/patient.csv'