## Installing Dependencies & Importing the Necessary Libraries

In [2]:
import json
import requests
import pandas as pd
import psycopg2
from sqlalchemy import create_engine
import os
from datetime import datetime
import re
import uuid
from urllib.parse import quote
import urllib.parse
import time
import logging
from concurrent.futures import ThreadPoolExecutor
import sqlite3
import pandas as pd
from sqlalchemy import create_engine
import logging


Task 1 - Loading FHIR data from disk storage/source system

In [3]:
# Directory holding the FHIR bundle JSON files that the ETL ingests
folder_path = "/home/omarattia/Take-Home Assesment/patients_fhir_100"

# Accumulator for the bundle entries gathered from every JSON file
all_entries = list()

Task 2 - Iterate over all JSON files in the folder, then report how many entries were loaded. This cell also sets up the log file.

In [4]:
# Start from an empty list so that re-running this cell does not append the
# same bundles a second time (all_entries is created in an earlier cell).
all_entries.clear()

# os.listdir returns names in arbitrary order; sorting makes the load order
# (and therefore the row order of every downstream CSV) reproducible.
for filename in sorted(os.listdir(folder_path)):
    if not filename.endswith('.json'):
        continue
    file_path = os.path.join(folder_path, filename)
    # Read as UTF-8 explicitly instead of relying on the platform default encoding
    with open(file_path, 'r', encoding='utf-8') as file:
        data = json.load(file)
    # Only documents that are JSON objects carrying a non-empty 'entry' list contribute
    entries = data.get('entry') if isinstance(data, dict) else None
    if entries:
        all_entries.extend(entries)

print(f"Loaded {len(all_entries)} entries.")

log_file_path = "/home/omarattia/Take-Home Assesment/FHIR_DWH_logs/At-Home_Assesment_ETL.log"

# Ensure the directory exists
os.makedirs(os.path.dirname(log_file_path), exist_ok=True)

# Root logger: everything from DEBUG upwards goes to both the log file and the console
logging.basicConfig(
    level=logging.DEBUG,  # Set the logging level to capture all log levels
    format='%(asctime)s - %(levelname)s - %(message)s',  # Format with timestamps and levels
    handlers=[
        logging.FileHandler(log_file_path),  # Write logs to the file
        logging.StreamHandler()  # Also output logs to the console
    ])


logging.info(f"DATA EXTRACTION 1: Loaded {len(all_entries)} entries.")


2024-11-16 20:24:14,606 - INFO - DATA EXTRACTION 1: Loaded 230394 entries.


Loaded 230394 entries.


Task 3 (Patient Information Dimension) - Parsing necessary data from existing JSON files, based on the business requirements and KPIs. Chronologically transforming data into user-friendly DataFrames.

In [5]:
# Helper: strip every digit character from a name
def remove_numbers(name):
    """Return ``name`` with all digit characters removed; everything else is kept in order."""
    kept_chars = (ch for ch in name if not ch.isdigit())
    return ''.join(kept_chars)

def calculate_age(birth_date):
    """Return the age in completed years for a birth date string.

    Parameters:
        birth_date: date text in "YYYY-MM-DD" form; "YYYY-MM" and "YYYY" are
            also accepted (missing parts default to the first month/day).
            ``None``, an empty string or "Unknown" mean the date is not known.

    Returns:
        int age in whole years, or the string "Unknown" when the birth date
        is missing or cannot be parsed.
    """
    if not birth_date or birth_date == "Unknown":
        return "Unknown"

    born = None
    for date_format in ("%Y-%m-%d", "%Y-%m", "%Y"):
        try:
            born = datetime.strptime(birth_date, date_format)
            break
        except (ValueError, TypeError):
            continue
    if born is None:
        return "Unknown"

    today = datetime.now()
    # Subtract one year when this year's birthday has not happened yet;
    # a plain year difference would overstate the age for those patients.
    birthday_passed = (today.month, today.day) >= (born.month, born.day)
    return today.year - born.year - (0 if birthday_passed else 1)

# Extract patient information from all entries
patients = []
logging.info("DATA EXTRACTION 2.1: Starting patient data extraction.")
for entry in all_entries:
    resource = entry.get("resource", {})
    if resource.get("resourceType") != "Patient":
        continue

    # `or [{}]` / `or []` also cover keys that are present but empty or null
    name_data = (resource.get("name") or [{}])[0]
    address_data = (resource.get("address") or [{}])[0]
    contact_data = resource.get("telecom") or []
    identifier_data = resource.get("identifier") or []

    # First given name is the first name; any further given names are middle names.
    # (Joining every given name into FIRST_NAME would duplicate the middle names.)
    given_names = [remove_numbers(part) for part in name_data.get("given", [])]
    first_name = given_names[0] if given_names else "Unknown"
    middle_name = " ".join(given_names[1:])
    last_name = remove_numbers(name_data.get("family", "Unknown"))

    # Index identifiers by their type code once; the first identifier seen for a
    # code wins, matching a first-match scan over the list.
    identifiers_by_code = {}
    for identifier in identifier_data:
        coding = (identifier.get("type", {}).get("coding") or [{}])[0]
        identifiers_by_code.setdefault(coding.get("code"), identifier.get("value"))

    # Process phone number (MSISDN): first telecom entry whose system is "phone"
    msisdn = next((c.get("value") for c in contact_data if c.get("system") == "phone"), "Unknown")

    # Use the same "Unknown" sentinel for the age input as for the stored column,
    # so a missing birthDate never reaches the date parser as None
    birth_date = resource.get("birthDate", "Unknown")

    patients.append({
        "PATIENT_ID": resource.get("id", "Unknown"),
        "FIRST_NAME": first_name,
        "MIDDLE_NAME": middle_name if middle_name else "Unknown",
        "LAST_NAME": last_name,
        "SSN": identifiers_by_code.get("SS", "Unknown"),
        "MR_NUMBER": identifiers_by_code.get("MR", "Unknown"),
        "DL_NUMBER": identifiers_by_code.get("DL", "Unknown"),  # Driver's License Number
        "PASSPORT_NUMBER": identifiers_by_code.get("PPN", "Unknown"),  # Passport Number
        "GENDER": resource.get("gender", "Unknown"),
        "BIRTH_DATE": birth_date,
        "AGE": calculate_age(birth_date),
        "PATIENT_ADDRESS": " ".join(address_data.get("line", [])),
        "CITY": address_data.get("city", "Unknown"),
        "POSTAL_CODE": address_data.get("postalCode", "Unknown"),
        "COUNTRY": address_data.get("country", "Unknown"),
        "MSISDN": msisdn,
        "MARITAL_STATUS": resource.get("maritalStatus", {}).get("text", "Unknown")
    })

logging.info(f"DATA EXTRACTION 2.2: Extracted data for {len(patients)} patients.")

# Create the DataFrame from the patients list
patients_information_df = pd.DataFrame(patients)

# Messages below name the patient dimension (they previously said "Encounter")
logging.info(f"DATA TRANSFORMATION: Patient DataFrame created with {patients_information_df.shape[0]} records.")

# Save the DataFrame to a CSV file (file name kept as-is: later cells read it by this name)
csv_file_path = "patient_infromation_dimension.csv"  # Specify your desired file path
patients_information_df.to_csv(csv_file_path, index=False)

# Show confirmation message
logging.info(f"DATA LOADING: Patient information data has been saved to {csv_file_path}.")
print(f"Patient information data has been saved to {csv_file_path}")

2024-11-16 20:24:14,659 - INFO - DATA EXTRACTION 2.1: Starting patient data extraction.
2024-11-16 20:24:14,840 - INFO - DATA EXTRACTION 2.2: Extracted data for 100 patients.
2024-11-16 20:24:14,844 - INFO - DATA TRANSFORMATION: Encounter DataFrame created with 100 records.
2024-11-16 20:24:14,900 - INFO - DATA LOADING: Encounter event data has been saved to patient_infromation_dimension.csv.


Encounter event data has been saved to patient_infromation_dimension.csv


Task 4 (Encounter Events Dimension) - Parsing necessary data from existing JSON files, based on the business requirements and KPIs. Chronologically transforming data into user-friendly DataFrames.

In [6]:
# Extract encounter information from all entries and transform it into the desired event format
encounter_events = []
logging.info("DATA EXTRACTION 3.1: Starting encounter data extraction.")
for entry in all_entries:
    # .get keeps this loop tolerant of entries without a resource, like the patient cell
    resource = entry.get("resource", {})
    resource_type = resource.get("resourceType")

    # Process only "Encounter" type resources
    if resource_type != "Encounter":
        continue

    # Start and end of the encounter, defaulting to "Unknown" if not found
    period = resource.get("period", {})
    start_date = period.get("start", "Unknown")
    end_date = period.get("end", "Unknown")

    # If no period info, fall back to other date fields when the resource carries them
    if start_date == "Unknown" and "effectiveDateTime" in resource:
        start_date = resource["effectiveDateTime"]
    if end_date == "Unknown" and "issued" in resource:
        end_date = resource["issued"]

    # The subject reference identifies the patient; strip the "urn:uuid:" marker
    # so only the bare id remains ("Unknown" passes through untouched)
    subject_reference = resource.get("subject", {}).get("reference", "Unknown")
    patient_id = subject_reference.replace("urn:uuid:", "")

    # Encounter specific fields; `or [{}]` also covers present-but-empty lists
    encounter_status = resource.get("status", "Unknown").title()
    encounter_type = (resource.get("type") or [{}])[0].get("text", "Unknown").title()
    location = (resource.get("location") or [{}])[0].get("location", {}).get("display", "Unknown").title()

    # Collect event data for Encounter
    encounter_events.append({
        "EVENT_TYPE": resource_type.title(),
        "EVENT_ID": resource.get("id", "Unknown"),
        "START_DATETIME": start_date,
        "END_DATETIME": end_date,
        "PATIENT_ID": patient_id,  # urn:uuid: prefix removed
        "ENCOUNTER_STATUS": encounter_status,
        "ENCOUNTER_TYPE": encounter_type,
        "LOCATION": location
    })

logging.info(f"DATA EXTRACTION 3.2: Extracted {len(encounter_events)} encounter events.")

# Create a DataFrame from the encounter event data
events_encounters_df = pd.DataFrame(encounter_events)

logging.info(f"DATA TRANSFORMATION: Encounter DataFrame created with {events_encounters_df.shape[0]} records.")

# Save the DataFrame to a CSV file
csv_file_path = "events_encounters.csv"  # Specify your desired file path
events_encounters_df.to_csv(csv_file_path, index=False)

# Show confirmation message
logging.info(f"DATA LOADING: Encounter event data has been saved to {csv_file_path}.")
print(f"Encounter event data has been saved to {csv_file_path}")

2024-11-16 20:24:14,916 - INFO - DATA EXTRACTION 3.1: Starting encounter data extraction.
2024-11-16 20:24:15,121 - INFO - DATA EXTRACTION 3.2: Extracted 9868 encounter events.
2024-11-16 20:24:15,141 - INFO - DATA TRANSFORMATION: Encounter DataFrame created with 9868 records.
2024-11-16 20:24:15,217 - INFO - DATA LOADING: Encounter event data has been saved to events_encounters.csv.


Encounter event data has been saved to events_encounters.csv


Task 5 (Diagnostic Report Events Dimension) - Parsing necessary data from existing JSON files, based on the business requirements and KPIs. Chronologically transforming data into user-friendly DataFrames.

In [7]:
# Helper: drop digits from a name, then trim it and title-case it
def remove_numbers(name):
    """Return ``name`` without digit characters, stripped of outer whitespace and title-cased."""
    without_digits = ''.join(ch for ch in name if not ch.isdigit())
    trimmed = without_digits.strip()
    return trimmed.title()

# Extract diagnostic report information from all entries and transform it into the desired format
diagnostic_report_events = []
logging.info("DATA EXTRACTION 4.1: Starting diagnostic report data extraction.")

for entry in all_entries:
    # .get keeps this loop tolerant of entries without a resource, like the patient cell
    resource = entry.get("resource", {})
    resource_type = resource.get("resourceType")

    # Process only "DiagnosticReport" type resources
    if resource_type != "DiagnosticReport":
        continue

    # Start and end date for the event, defaulting to "Unknown" if not found
    start_date = resource.get("effectiveDateTime", "Unknown")
    end_date = resource.get("issued", "Unknown")

    # The subject reference identifies the patient; strip the "urn:uuid:" marker
    subject_reference = resource.get("subject", {}).get("reference", "Unknown")
    patient_id = subject_reference.replace("urn:uuid:", "")

    # DiagnosticReport specific fields
    status = resource.get("status", "Unknown").title()

    # An empty joined string (no categories / no performers) becomes "Unknown"
    # here, so no in-place clean-up of DataFrame columns is needed afterwards.
    category = ", ".join(
        (cat.get("coding") or [{}])[0].get("display", "Unknown").title()
        for cat in resource.get("category", [])
    ) or "Unknown"

    # Performer display names with digits removed
    practitioner_performer = ", ".join(
        remove_numbers(performer.get("display", "Unknown"))
        for performer in resource.get("performer", [])
    ) or "Unknown"

    # Collect event data for DiagnosticReport
    diagnostic_report_events.append({
        "EVENT_TYPE": resource_type.title(),
        "EVENT_ID": resource.get("id", "Unknown"),
        "START_DATETIME": start_date,
        "END_DATETIME": end_date,
        "PATIENT_ID": patient_id,  # urn:uuid: prefix removed
        "STATUS": status,
        "CATEGORY": category,
        "PRACTITIONER_PERFORMER": practitioner_performer
    })

logging.info(f"DATA EXTRACTION 4.2: Extracted {len(diagnostic_report_events)} diagnostic report events.")

# Create a DataFrame from the diagnostic report event data.
# "PRACTITIONER_PERFORMER" and "CATEGORY" are already filled with "Unknown"
# during extraction, which avoids `Series.replace(..., inplace=True)` on a
# column selection (chained in-place mutation) and also works when no
# DiagnosticReport was found and the frame has no columns.
events_diagnostic_report_df = pd.DataFrame(diagnostic_report_events)

logging.info(f"DATA TRANSFORMATION: Diagnostic report DataFrame created with {events_diagnostic_report_df.shape[0]} records.")

# Save the DataFrame to a CSV file
csv_file_path = "events_diagnostic_report.csv"  # Specify your desired file path
events_diagnostic_report_df.to_csv(csv_file_path, index=False)

# Show confirmation message
logging.info(f"DATA LOADING: Diagnostic report event data has been saved to {csv_file_path}.")
print(f"DiagnosticReport event data has been saved to {csv_file_path}")


2024-11-16 20:24:15,235 - INFO - DATA EXTRACTION 4.1: Starting diagnostic report data extraction.
2024-11-16 20:24:15,574 - INFO - DATA EXTRACTION 4.2: Extracted 20149 diagnostic report events.
2024-11-16 20:24:15,606 - INFO - DATA TRANSFORMATION: Diagnostic report DataFrame created with 20149 records.
2024-11-16 20:24:15,796 - INFO - DATA LOADING: Diagnostic report event data has been saved to events_diagnostic_report.csv.


DiagnosticReport event data has been saved to events_diagnostic_report.csv


Task 6 (Condition Events Dimension) - Parsing necessary data from existing JSON files, based on the business requirements and KPIs. Chronologically transforming data into user-friendly DataFrames.

In [8]:
# Helper: drop the "urn:uuid:" marker from a reference string
def remove_prefix(value):
    """Return ``value`` with every "urn:uuid:" occurrence removed; unchanged when absent."""
    if "urn:uuid:" not in value:
        return value
    return value.replace("urn:uuid:", "")

# Translate a clinical status code into the numeric flag stored in the CONDITION column
def map_condition_status(status):
    """Return 0 for "active", 1 for "resolved" (case-insensitive), and None for any other status."""
    flag_by_status = {
        "active": 0,    # Active is 0
        "resolved": 1,  # Resolved is 1
    }
    # Unrecognised statuses fall through to None
    return flag_by_status.get(status.lower())

# Extract condition information from all entries and transform it into the desired format
condition_events = []
logging.info("DATA EXTRACTION 5.1: Starting condition data extraction.")

for entry in all_entries:
    # .get keeps this loop tolerant of entries without a resource, like the patient cell
    resource = entry.get("resource", {})
    resource_type = resource.get("resourceType")

    # Process only "Condition" type resources
    if resource_type != "Condition":
        continue

    # The subject reference identifies the patient; remove the "urn:uuid:" prefix
    subject_reference = resource.get("subject", {}).get("reference", "Unknown")
    patient_id = remove_prefix(subject_reference)

    # Condition specific fields; `or [{}]` also covers a present-but-empty coding list
    status_coding = (resource.get("clinicalStatus", {}).get("coding") or [{}])[0]
    condition_status = status_coding.get("code", "Unknown")
    condition_flag = map_condition_status(condition_status)
    medical_diagnosis = resource.get("code", {}).get("text", "Unknown")
    condition_onset = resource.get("onsetDateTime", "Unknown")
    condition_record_date = resource.get("recordedDate", "Unknown")

    # Collect event data for Condition
    condition_events.append({
        "EVENT_TYPE": resource_type.title(),
        "EVENT_ID": resource.get("id", "Unknown"),
        "PATIENT_ID": patient_id,  # urn:uuid: prefix removed
        "CONDITION": condition_flag,  # value returned by map_condition_status
        "MEDICAL_DIAGNOSIS": medical_diagnosis,
        "CONDITION_ONSET": condition_onset,
        "CONDITION_RECORD_DATE": condition_record_date
    })

logging.info(f"DATA EXTRACTION 5.2: Extracted {len(condition_events)} condition events.")

# Create a DataFrame from the condition event data
events_conditions_df = pd.DataFrame(condition_events)

logging.info(f"DATA TRANSFORMATION: Condition DataFrame created with {events_conditions_df.shape[0]} records.")

# Save the DataFrame to a CSV file
csv_file_path = "events_conditions.csv"  # Specify your desired file path
events_conditions_df.to_csv(csv_file_path, index=False)

# Show confirmation message
logging.info(f"DATA LOADING: Condition event data has been saved to {csv_file_path}.")
print(f"Condition event data has been saved to {csv_file_path}")


2024-11-16 20:24:15,815 - INFO - DATA EXTRACTION 5.1: Starting condition data extraction.
2024-11-16 20:24:15,990 - INFO - DATA EXTRACTION 5.2: Extracted 4892 condition events.
2024-11-16 20:24:16,004 - INFO - DATA TRANSFORMATION: Condition DataFrame created with 4892 records.
2024-11-16 20:24:16,050 - INFO - DATA LOADING: Condition event data has been saved to events_conditions.csv.


Condition event data has been saved to events_conditions.csv


Task 7 (Claims, EOBs, and Medicine Requests Dimensions) - Parsing necessary data from existing JSON files, based on the business requirements and KPIs. Chronologically transforming data into user-friendly DataFrames.

In [9]:
# Helper: strip the "urn:uuid:" marker from a reference string
def remove_prefix(value):
    """Return ``value`` without any "urn:uuid:" occurrence; values lacking it come back unchanged."""
    marker_present = "urn:uuid:" in value
    if marker_present:
        return value.replace("urn:uuid:", "")
    return value

# Helper: keep only ASCII letters and whitespace in a prescriber name, then tidy it
def clean_prescriber_name(name):
    """Return ``name`` with non-letter, non-whitespace characters removed, trimmed and title-cased."""
    letters_and_spaces = re.sub(r'[^a-zA-Z\s]', '', name)
    trimmed = letters_and_spaces.strip()
    return trimmed.title()

# Initialize lists to store dimension data
medication_data = []
claim_data = []
eob_data = []

# Log the start of the extraction process
logging.info("DATA EXTRACTION 6.1: Starting data extraction from all entries.")

# Iterate through all entries in the JSON data
for entry in all_entries:
    # .get keeps this loop tolerant of entries without a resource, like the patient cell
    resource = entry.get("resource", {})
    resource_type = resource.get("resourceType")

    # Process MedicationRequest
    if resource_type == "MedicationRequest":
        # `or [{}]` also covers lists that are present but empty
        dosage_instruction = (resource.get("dosageInstruction") or [{}])[0]
        dose_and_rate = (dosage_instruction.get("doseAndRate") or [{}])[0]
        timing_repeat = dosage_instruction.get("timing", {}).get("repeat", {})

        medication_name = resource.get("medicationCodeableConcept", {}).get("text", "No Medication Prescribed")
        dosage = dose_and_rate.get("doseQuantity", {}).get("value", "Unknown")
        frequency = timing_repeat.get("frequency", "Unknown")
        period_unit = timing_repeat.get("periodUnit", "Unknown")
        patient_id = remove_prefix(resource.get("subject", {}).get("reference", "Unknown"))
        prescriber = clean_prescriber_name(resource.get("requester", {}).get("display", "Unknown"))
        reason = (resource.get("reasonReference") or [{}])[0].get("display", "Unknown")

        medication_data.append({
            "MEDICATION_NAME": medication_name,
            "DOSAGE": f"{dosage} {frequency} per {period_unit}" if dosage != "Unknown" else "Unknown",
            "PATIENT_ID": patient_id,
            "PRESCRIBER": prescriber,
            "REASON": reason
        })

    # Process Claim
    elif resource_type == "Claim":
        billable_period = resource.get("billablePeriod", {})
        total = resource.get("total", {})

        claim_data.append({
            "CLAIM_ID": resource.get("id", "Unknown"),
            "STATUS": resource.get("status", "Unknown").title(),
            "BILLABLE_PERIOD_START": billable_period.get("start", "Unknown"),
            "BILLABLE_PERIOD_END": billable_period.get("end", "Unknown"),
            "COST": total.get("value", "Unknown"),
            "CURRENCY": total.get("currency", "Unknown"),
            "INSURANCE": (resource.get("insurance") or [{}])[0].get("coverage", {}).get("display", "Unknown"),
            "PATIENT_ID": remove_prefix(resource.get("patient", {}).get("reference", "Unknown")),
            "PRESCRIPTION_ID": remove_prefix(resource.get("prescription", {}).get("reference", "Unknown"))
        })

    # Process ExplanationOfBenefit
    elif resource_type == "ExplanationOfBenefit":
        billable_period = resource.get("billablePeriod", {})
        # Here "total" is read as a list whose first element holds the amount
        total_amount = (resource.get("total") or [{}])[0].get("amount", {})

        eob_data.append({
            "EOB_ID": resource.get("id", "Unknown"),
            "STATUS": resource.get("status", "Unknown").title(),
            "BILLABLE_PERIOD_START": billable_period.get("start", "Unknown"),
            "BILLABLE_PERIOD_END": billable_period.get("end", "Unknown"),
            "TOTAL_AMOUNT": total_amount.get("value", "Unknown"),
            "CURRENCY": total_amount.get("currency", "Unknown"),
            "INSURER": resource.get("insurer", {}).get("display", "Unknown"),
            "FACILITY": resource.get("facility", {}).get("display", "Unknown"),
            "PATIENT_ID": remove_prefix(resource.get("patient", {}).get("reference", "Unknown"))
        })

# Log the end of the extraction
logging.info(f"DATA EXTRACTION 6.2: Loaded {len(all_entries)} entries and extracted data.")

# Convert the lists to DataFrames
medication_prescibed_df = pd.DataFrame(medication_data)
claim_df = pd.DataFrame(claim_data)
eob_df = pd.DataFrame(eob_data)

# Log the creation of the DataFrames
logging.info("DATA TRANSFORMATION: DataFrames created for Medication, Claim, and ExplanationOfBenefit.")

# Add an ID column derived from the medication name.
# Built-in hash() is salted per interpreter process for strings (see
# PYTHONHASHSEED), so it would yield different IDs on every run; a name-based
# UUID (uuid5) is deterministic: the same name always maps to the same ID.
medication_prescibed_df['MEDICATION_ID'] = medication_prescibed_df['MEDICATION_NAME'].map(
    lambda name: str(uuid.uuid5(uuid.NAMESPACE_URL, str(name)))
)

# Log the creation of MEDICATION_ID
logging.info("DATA TRANSFORMATION 2: MEDICATION_ID added for each medication.")

# Save the DataFrames to CSV files
medication_prescibed_df.to_csv("medication_prescibed.csv", index=False)
claim_df.to_csv("claim_dimension.csv", index=False)
eob_df.to_csv("eob_dimension.csv", index=False)

# Log the completion of the data loading process
logging.info("DATA LOADING: Data has been saved to CSV files.")

# Confirmation messages
print("Medication dimension saved to 'medication_prescibed.csv'")
print("Claim dimension saved to 'claim_dimension.csv'")
print("ExplanationOfBenefit dimension saved to 'eob_dimension.csv'")

# Keep only real prescriptions, one row per distinct medication name
medication_df = (
    medication_prescibed_df[medication_prescibed_df['MEDICATION_NAME'] != "No Medication Prescribed"]
    .drop_duplicates(subset=["MEDICATION_NAME"])
)

# Log after filtering duplicates
logging.info("DATA CLEANING: Removed rows with 'No Medication Prescribed' and duplicates.")



2024-11-16 20:24:16,138 - INFO - DATA EXTRACTION 6.1: Starting data extraction from all entries.
2024-11-16 20:24:16,631 - INFO - DATA EXTRACTION 6.2: Loaded 230394 entries and extracted data.
2024-11-16 20:24:16,809 - INFO - DATA TRANSFORMATION: DataFrames created for Medication, Claim, and ExplanationOfBenefit.
2024-11-16 20:24:16,817 - INFO - DATA TRANSFORMATION 2: MEDICATION_ID added for each medication.
2024-11-16 20:24:17,194 - INFO - DATA LOADING: Data has been saved to CSV files.
2024-11-16 20:24:17,237 - INFO - DATA CLEANING: Removed rows with 'No Medication Prescribed' and duplicates.


Medication dimension saved to 'medication_prescibed.csv'
Claim dimension saved to 'claim_dimension.csv'
ExplanationOfBenefit dimension saved to 'eob_dimension.csv'


Task 8 (Medicine Descriptive Dimension) - Parsing necessary data from the FDA Drug Label API and transforming the data.

In [10]:
# Helper: remove the "urn:uuid:" marker from a reference string
def remove_prefix(value):
    """Return ``value`` minus any "urn:uuid:" occurrence; a value without it is returned as-is."""
    marker = "urn:uuid:"
    return value if marker not in value else value.replace(marker, "")

# Helper: reduce a prescriber name to ASCII letters and whitespace, trimmed and title-cased
def clean_prescriber_name(name):
    """Strip digits and symbols from ``name``, trim outer whitespace and return it title-cased."""
    unwanted_chars = re.compile(r'[^a-zA-Z\s]')
    return unwanted_chars.sub('', name).strip().title()

# Function to query the FDA Drug Label API using the EXTRACTED_MED_NAME
def query_fda_api(medication_name):
    """Look up one medication in the openFDA drug label endpoint.

    Parameters:
        medication_name: brand name to search for; surrounding whitespace is
            ignored.

    Returns:
        dict of column name -> text. On a match it holds the label fields
        (each passed through ``truncate_value``); when the API has no match it
        is ``{"BRAND_NAME": "Not Available"}``; on any request/parse failure or
        unexpected HTTP status it is ``{"BRAND_NAME": "API Error"}``.
        Errors are logged, never raised.
    """
    base_url = "https://api.fda.gov/drug/label.json"
    # Extracted names can carry stray whitespace (e.g. 'Clopidogrel ')
    search_term = str(medication_name).strip()
    # NOTE(review): multi-word names are sent unquoted; confirm against the
    # openFDA query syntax whether a quoted phrase search is wanted here.
    params = {"search": f"openfda.brand_name:{search_term}", "limit": 1}
    try:
        response = requests.get(base_url, params=params, timeout=10)

        # openFDA answers a search without matches with HTTP 404; that is
        # "no data for this drug", not a failure of the API call.
        if response.status_code == 404:
            return {"BRAND_NAME": "Not Available"}
        if response.status_code != 200:
            logging.error(f"API Error for {medication_name}: HTTP {response.status_code}")
            return {"BRAND_NAME": "API Error"}

        results = response.json().get("results")
        if not results:
            return {"BRAND_NAME": "Not Available"}

        fda_details = results[0]
        openfda = fda_details.get("openfda", {})

        def joined(section, key):
            # Label fields are lists of strings; join them and cap the length
            return truncate_value(", ".join(section.get(key, ["Unknown"])))

        return {
            "BRAND_NAME": joined(openfda, "brand_name"),
            "GENERIC_NAME": joined(openfda, "generic_name"),
            "MANUFACTURER": joined(openfda, "manufacturer_name"),
            "ACTIVE_INGREDIENT": joined(fda_details, "active_ingredient"),
            "INDICATIONS_AND_USAGE": joined(fda_details, "indications_and_usage"),
            "SUBSTANCE_NAME": joined(openfda, "substance_name"),
            "PRODUCT_TYPE": joined(openfda, "product_type"),
            "PURPOSE": joined(fda_details, "purpose"),
            "ROUTE": joined(fda_details, "route"),
            "DOSAGE_AND_ADMINISTRATION": joined(fda_details, "dosage_and_administration"),
        }
    except Exception as e:
        # Best effort: one failed lookup must not abort the whole batch
        logging.error(f"API Error for {medication_name}: {e}")
        return {"BRAND_NAME": "API Error"}

# Helper function to truncate text to a maximum length (200 characters by default)
def truncate_value(value, max_length=200):
    """Return at most the first ``max_length`` characters of ``value``.

    Parameters:
        value: the text (any sliceable sequence) to shorten.
        max_length: maximum number of characters kept; defaults to 200.

    Returns:
        ``value`` unchanged when it is already short enough, otherwise its
        leading ``max_length`` characters.
    """
    return value[:max_length]

# Function to extract the medication name (only the text) from the 'MEDICATION_NAME'
def extract_medication_name(medication_name):
    """Return the leading alphabetic part of a medication description.

    The text before the first digit, bracket or other non-letter character is
    taken as the name and returned without surrounding whitespace, e.g.
    "Clopidogrel 75 MG Oral Tablet" -> "Clopidogrel". When the description
    does not start with a letter (after optional whitespace) it is returned
    unchanged.
    """
    # Require at least one letter so a whitespace-only prefix is not treated as a name
    match = re.match(r"\s*([a-zA-Z][a-zA-Z\s]*)", medication_name)
    # strip() drops the space that precedes the dosage, e.g. 'Clopidogrel '
    return match.group(1).strip() if match else medication_name

# Worker: look up one medication row in the FDA API without touching any DataFrame
def _fetch_fda_details(index_row):
    """Query the FDA API for one ``(index, row)`` pair and return ``(index, details_dict)``."""
    index, row = index_row
    medication_name = row['EXTRACTED_MED_NAME']

    # Log the start of medication processing
    logging.info(f"DATA EXTRACTION 4: Processing medication '{medication_name}' (Index: {index}).")

    # Query the FDA API for the medication details using EXTRACTED_MED_NAME
    fda_details = query_fda_api(medication_name)

    # Log the completion of processing for this medication
    logging.info(f"DATA EXTRACTION 4.1: Completed processing for medication '{medication_name}' (Index: {index}).")
    return index, fda_details


# Helper: write one medication's FDA details into its row of the DataFrame
def _store_fda_details(medication_df, index, fda_details):
    """Set every ``column -> value`` of ``fda_details`` on row ``index`` of ``medication_df``."""
    for column, value in fda_details.items():
        medication_df.at[index, column] = value


# Function to process each medication name and query the FDA API
def process_medication(index_row, medication_df):
    """Fetch FDA details for one ``(index, row)`` pair and store them in ``medication_df``.

    Mutates ``medication_df`` in place and returns None. Call it from a single
    thread only: it adds columns to the frame.
    """
    index, fda_details = _fetch_fda_details(index_row)
    _store_fda_details(medication_df, index, fda_details)

# Assuming medication_prescibed_df is already populated with the data

# Log the start of EXTRACTED_MED_NAME extraction
logging.info("DATA EXTRACTION 3.1: Extracting EXTRACTED_MED_NAME from MEDICATION_NAME.")

# Add EXTRACTED_MED_NAME column by extracting only the text from MEDICATION_NAME
medication_prescibed_df['EXTRACTED_MED_NAME'] = medication_prescibed_df['MEDICATION_NAME'].apply(extract_medication_name)

# Log the completion of EXTRACTED_MED_NAME extraction
logging.info("DATA EXTRACTION 3.2: EXTRACTED_MED_NAME column added to the DataFrame.")

# Keep real prescriptions only, one row per EXTRACTED_MED_NAME. The explicit
# copy makes medication_df an independent frame, so writing the FDA columns
# below does not act on a slice of medication_prescibed_df.
medication_df = (
    medication_prescibed_df[medication_prescibed_df['MEDICATION_NAME'] != "No Medication Prescribed"]
    .drop_duplicates(subset=["EXTRACTED_MED_NAME"])
    .copy()
)

# Log after filtering and removing duplicates
logging.info("DATA CLEANING 1: Filtered out rows with 'No Medication Prescribed' and removed duplicates.")

# Set up parallel processing with ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=5) as executor:
    # Log the start of parallel processing
    logging.info("DATA EXTRACTION 5: Starting parallel processing for medication data.")

    # Only the network lookups run in the worker threads
    futures = [executor.submit(_fetch_fda_details, index_row) for index_row in medication_df.iterrows()]

    # Results are written here, in the main thread: adding columns to one
    # DataFrame from several threads at once is not guaranteed to be safe.
    for future in futures:
        index, fda_details = future.result()
        _store_fda_details(medication_df, index, fda_details)

    # Log the completion of parallel processing
    logging.info("DATA EXTRACTION 6: Completed parallel processing for all medications.")

# Log the saving of the DataFrame to CSV
logging.info("DATA LOADING: Saving the final DataFrame with 'EXTRACTED_MED_NAME' to CSV.")

# Save the final DataFrame with the added 'EXTRACTED_MED_NAME' to CSV
medication_df.to_csv("medication_dimension.csv", index=False)

# Log the successful completion of the ETL process
logging.info("ETL Process Complete: Data processing and saving to CSV completed successfully.")

# Show confirmation message
print("Data processing complete.")


2024-11-16 20:24:17,265 - INFO - DATA EXTRACTION 3.1: Extracting EXTRACTED_MED_NAME from MEDICATION_NAME.


2024-11-16 20:24:17,285 - INFO - DATA EXTRACTION 3.2: EXTRACTED_MED_NAME column added to the DataFrame.
2024-11-16 20:24:17,300 - INFO - DATA CLEANING 1: Filtered out rows with 'No Medication Prescribed' and removed duplicates.
2024-11-16 20:24:17,301 - INFO - DATA EXTRACTION 5: Starting parallel processing for medication data.
2024-11-16 20:24:17,303 - INFO - DATA EXTRACTION 4: Processing medication 'Clopidogrel ' (Index: 0).
2024-11-16 20:24:17,305 - INFO - DATA EXTRACTION 4: Processing medication 'Simvastatin ' (Index: 1).
2024-11-16 20:24:17,307 - INFO - DATA EXTRACTION 4: Processing medication '24 HR metoprolol succinate 100 MG Extended Release Oral Tablet' (Index: 2).
2024-11-16 20:24:17,308 - INFO - DATA EXTRACTION 4: Processing medication 'Nitroglycerin ' (Index: 3).
2024-11-16 20:24:17,310 - INFO - DATA EXTRACTION 4: Processing medication 'insulin isophane' (Index: 4).
2024-11-16 20:24:17,319 - DEBUG - Starting new HTTPS connection (1): api.fda.gov:443
2024-11-16 20:24:17,320 

Data processing complete.


Task 9 - Setting up the PostgreSQL settings/configurations to execute the DDLs

In [None]:
import pandas as pd
import sqlite3
from sqlalchemy import create_engine
import logging

# Read every exported dimension/event CSV into memory.
# `file_paths` maps a short table name to the CSV location and `dfs` maps the
# same name to the DataFrame read from it.
# NOTE(review): the medication CSV is written earlier in the notebook to the
# relative path "medication_dimension.csv", while it is read here from
# /mnt/data/ — confirm the files are copied there before this cell runs.
# NOTE(review): the "eob" frame is loaded but not referenced again in this cell.
logging.info("Loading CSV files...")
file_paths = dict(
    patients="/mnt/data/patient_infromation_dimension.csv",
    claims="/mnt/data/claim_dimension.csv",
    medications="/mnt/data/medication_dimension.csv",
    encounters="/mnt/data/events_encounters.csv",
    diagnostic_reports="/mnt/data/events_diagnostic_report.csv",
    conditions="/mnt/data/events_conditions.csv",
    eob="/mnt/data/eob_dimension.csv",
)
dfs = dict(
    (table_name, pd.read_csv(csv_path))
    for table_name, csv_path in file_paths.items()
)

# Two handles onto the same SQLite database file are opened here:
#   * `conn` / `cursor` (sqlite3) run the raw DDL script further down;
#   * `engine` (SQLAlchemy) is what pandas' to_sql() writes through.
sqlite_db = 'health_datawarehouse.db'

conn = sqlite3.connect(sqlite_db)
cursor = conn.cursor()

engine = create_engine('sqlite:///' + sqlite_db)

# Create new normalized tables and fact table (Adjust the fact table to business KPIs)
#
# The script below declares six dimension/event tables (patients, claims,
# medications, encounters, diagnostic_reports, conditions) plus the
# fact_patient_metrics table. Every statement uses CREATE TABLE IF NOT EXISTS,
# so re-running the cell against an existing database file does not fail and
# does not alter tables that are already there.
#
# NOTE(review): within this cell only fact_patient_metrics receives rows; the
# other tables are created but not populated here.
# NOTE(review): no table is declared for the "eob" CSV that is loaded above.
# NOTE(review): SQLite enforces FOREIGN KEY clauses only when
# `PRAGMA foreign_keys = ON` is set on the connection — confirm whether
# enforcement is intended.
# NOTE(review): POSTAL_CODE is declared INT; postal codes with leading zeros or
# letters would not round-trip — confirm against the source data.
create_tables_sql = """
CREATE TABLE IF NOT EXISTS patients (
    PATIENT_ID VARCHAR(255) PRIMARY KEY,
    FIRST_NAME VARCHAR(255),
    LAST_NAME VARCHAR(255),
    GENDER VARCHAR(50),
    BIRTH_DATE DATE,
    AGE INT,
    CITY VARCHAR(100),
    POSTAL_CODE INT,
    COUNTRY VARCHAR(100),
    MSISDN VARCHAR(50)
);

CREATE TABLE IF NOT EXISTS claims (
    CLAIM_ID VARCHAR(255) PRIMARY KEY,
    PATIENT_ID VARCHAR(255),
    STATUS VARCHAR(50),
    BILLABLE_PERIOD_START TIMESTAMP,
    BILLABLE_PERIOD_END TIMESTAMP,
    COST DECIMAL,
    CURRENCY VARCHAR(50),
    INSURANCE VARCHAR(50),
    PRESCRIPTION_ID VARCHAR(255),
    FOREIGN KEY (PATIENT_ID) REFERENCES patients(PATIENT_ID)
);

CREATE TABLE IF NOT EXISTS medications (
    MEDICATION_ID VARCHAR(255) PRIMARY KEY,
    MEDICATION_NAME VARCHAR(255),
    COST DECIMAL,
    PRESCRIBER VARCHAR(255),
    REASON VARCHAR(255),
    PATIENT_ID VARCHAR(255),
    FOREIGN KEY (PATIENT_ID) REFERENCES patients(PATIENT_ID)
);

CREATE TABLE IF NOT EXISTS encounters (
    EVENT_ID VARCHAR(255) PRIMARY KEY,
    PATIENT_ID VARCHAR(255),
    EVENT_TYPE VARCHAR(255),
    COST DECIMAL,
    LOCATION TEXT,
    START_DATETIME TIMESTAMP,
    END_DATETIME TIMESTAMP,
    FOREIGN KEY (PATIENT_ID) REFERENCES patients(PATIENT_ID)
);

CREATE TABLE IF NOT EXISTS diagnostic_reports (
    REPORT_ID VARCHAR(255) PRIMARY KEY,
    PATIENT_ID VARCHAR(255),
    REPORT_TYPE VARCHAR(255),
    REPORT_DATE TIMESTAMP,
    FOREIGN KEY (PATIENT_ID) REFERENCES patients(PATIENT_ID)
);

CREATE TABLE IF NOT EXISTS conditions (
    CONDITION_ID VARCHAR(255) PRIMARY KEY,
    PATIENT_ID VARCHAR(255),
    CONDITION_NAME VARCHAR(255),
    DIAGNOSIS_DATE TIMESTAMP,
    FOREIGN KEY (PATIENT_ID) REFERENCES patients(PATIENT_ID)
);

CREATE TABLE IF NOT EXISTS fact_patient_metrics (
    METRIC_ID INTEGER PRIMARY KEY AUTOINCREMENT,
    PATIENT_ID VARCHAR(255),
    TOTAL_CLAIM_COST DECIMAL,
    TOTAL_MEDICATION_COST DECIMAL,
    TOTAL_ENCOUNTER_COST DECIMAL,
    TOTAL_CLAIM_COUNT INT,
    TOTAL_ENCOUNTER_COUNT INT,
    LAST_ENCOUNTER_DATE TIMESTAMP,
    AVERAGE_CLAIM_COST DECIMAL,
    AVERAGE_ENCOUNTER_COST DECIMAL,
    FOREIGN KEY (PATIENT_ID) REFERENCES patients(PATIENT_ID)
);
"""
# executescript() runs all the statements in the string in one call; the
# explicit commit afterwards finalizes the DDL on the sqlite3 connection.
cursor.executescript(create_tables_sql)
conn.commit()

# Prepare data for fact table
logging.info("Creating the fact table from joined data...")

# Aggregate every source table down to ONE row per patient *before* joining.
#
# Why: claims, medications and encounters are each one-to-many against
# patients. Left-joining them one after another on PATIENT_ID yields, for each
# patient, the Cartesian product of that patient's rows in every table
# (n_claims x n_medications x n_encounters x ...). Summing or counting over
# that product multiplies each total by the row counts of the *other* tables,
# and leaves many duplicate rows per patient in the frame that is later
# appended to fact_patient_metrics. Aggregating first keeps every total and
# count exact and produces exactly one fact row per patient.
#
# The diagnostic_reports and conditions frames are intentionally not joined:
# none of the fact metrics is derived from them, so they only multiplied rows.
claim_metrics = (
    dfs['claims']
    .groupby('PATIENT_ID')
    .agg(TOTAL_CLAIM_COST=('COST', 'sum'),
         TOTAL_CLAIM_COUNT=('CLAIM_ID', 'count'))
    .reset_index()
)
medication_metrics = (
    dfs['medications']
    .groupby('PATIENT_ID')
    .agg(TOTAL_MEDICATION_COST=('COST', 'sum'))
    .reset_index()
)
encounter_metrics = (
    dfs['encounters']
    .groupby('PATIENT_ID')
    .agg(TOTAL_ENCOUNTER_COST=('COST', 'sum'),
         TOTAL_ENCOUNTER_COUNT=('EVENT_ID', 'count'),
         LAST_ENCOUNTER_DATE=('START_DATETIME', 'max'))
    .reset_index()
)

# Start from the patient dimension (one row per PATIENT_ID, patient columns
# kept) and attach the per-patient aggregates. Left joins keep patients that
# have no claims / medications / encounters at all.
fact_table_data = (
    dfs['patients']
    .drop_duplicates(subset='PATIENT_ID')
    .merge(claim_metrics, how='left', on='PATIENT_ID')
    .merge(medication_metrics, how='left', on='PATIENT_ID')
    .merge(encounter_metrics, how='left', on='PATIENT_ID')
)

# Patients without any matching rows come out of the left joins as NaN;
# report their totals and counts as 0 (LAST_ENCOUNTER_DATE stays missing).
total_columns = ['TOTAL_CLAIM_COST', 'TOTAL_MEDICATION_COST', 'TOTAL_ENCOUNTER_COST']
count_columns = ['TOTAL_CLAIM_COUNT', 'TOTAL_ENCOUNTER_COUNT']
fact_table_data[total_columns] = fact_table_data[total_columns].fillna(0)
fact_table_data[count_columns] = fact_table_data[count_columns].fillna(0).astype(int)

# Averages: a zero count is masked to NaN first so that patients with no
# claims / encounters get a missing average rather than a division artefact.
claim_counts = fact_table_data['TOTAL_CLAIM_COUNT']
encounter_counts = fact_table_data['TOTAL_ENCOUNTER_COUNT']
fact_table_data['AVERAGE_CLAIM_COST'] = (
    fact_table_data['TOTAL_CLAIM_COST'] / claim_counts.where(claim_counts > 0)
)
fact_table_data['AVERAGE_ENCOUNTER_COST'] = (
    fact_table_data['TOTAL_ENCOUNTER_COST'] / encounter_counts.where(encounter_counts > 0)
)

# Write the metric columns into the warehouse fact table.
# METRIC_ID is not part of the frame; the fact table declares it as an
# AUTOINCREMENT primary key. if_exists='append' adds rows to the existing
# table instead of replacing it.
# NOTE(review): because rows are appended, re-running this cell against the
# same database file inserts the same metrics again — confirm that is intended.
logging.info("Inserting data into fact_patient_metrics table...")
fact_table_data_to_insert = fact_table_data.loc[:, [
    'PATIENT_ID',
    'TOTAL_CLAIM_COST',
    'TOTAL_MEDICATION_COST',
    'TOTAL_ENCOUNTER_COST',
    'TOTAL_CLAIM_COUNT',
    'TOTAL_ENCOUNTER_COUNT',
    'LAST_ENCOUNTER_DATE',
    'AVERAGE_CLAIM_COST',
    'AVERAGE_ENCOUNTER_COST',
]]
fact_table_data_to_insert.to_sql(
    name='fact_patient_metrics',
    con=engine,
    if_exists='append',
    index=False,
)

# Release the raw sqlite3 handles: the cursor first, then its connection.
logging.info("Closing database connections...")
cursor.close()
conn.close()
logging.info("Database connections closed successfully.")


INFO:root:DATA LOADING: Starting process to load and create tables.
INFO:root:Dropping existing tables if they exist...


INFO:root:Creating tables...
INFO:root:Loading CSV files...
INFO:root:Inserting data into tables...
INFO:root:Inserting data into patient_hist table...
  patient_hist_data['CHANGE_DATE'] = pd.to_datetime('now')  # Add the current timestamp for change date
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  patient_hist_data['CHANGE_DATE'] = pd.to_datetime('now')  # Add the current timestamp for change date
INFO:root:Populating the fact_patient_metrics table...
