## Import Required Libraries
We import all the libraries needed for data cleaning and preprocessing.

In [22]:
# The primary library for data manipulation and analysis, used for creating and working with DataFrames.
import pandas as pd

# The fundamental library for numerical computing in Python, often used for array operations and mathematical functions.
import numpy as np

# A scikit-learn class used for handling missing values (e.g., filling with mean, median, or constant).
from sklearn.impute import SimpleImputer

## Cleaning Diagnoses Dataset
Steps performed:
1. Load dataset
2. Remove duplicates
3. Fill missing values for diagnosis code & description
4. Remove invalid codes (e.g., "XXX")
5. Standardize text formatting
6. Enforce one-to-one code consistency per diagnosis
7. Convert data types
8. Export cleaned file

In [23]:
# Imports
# Primary library for data manipulation and analysis (DataFrames).
import pandas as pd
# Used for numerical operations, particularly for 'np.nan' (Not a Number).
import numpy as np
# A scikit-learn tool to handle missing data.
from sklearn.impute import SimpleImputer

# Load Data
# Load the raw dataset from a CSV file into a pandas DataFrame.
df = pd.read_csv("diagnoses.csv")

# Initial Cleaning
# Remove any rows that are exact duplicates to ensure data integrity.
df = df.drop_duplicates()

# Impute Missing Values
# We assume that missing codes/descriptions are rare and can be filled
# with the most common (mode) value from their respective columns.

# Impute missing 'diagnosis_code'.
# Note: fit_transform expects a 2D array, so we pass [['diagnosis_code']].
# .ravel() is then used to "flatten" the 2D output back into a 1D series
# for assignment to the DataFrame column.
df["diagnosis_code"] = SimpleImputer(strategy="most_frequent") \
    .fit_transform(df[["diagnosis_code"]]).ravel()

# Impute missing 'description' using the same "most_frequent" strategy.
df["description"] = SimpleImputer(strategy="most_frequent") \
    .fit_transform(df[["description"]]).ravel()

# Filter Invalid Data
# Remove rows where 'diagnosis_code' is "XXX", which is treated as a
# placeholder or invalid entry according to business rules.
df = df[df["diagnosis_code"] != "XXX"]

# Standardize Text Descriptions
# Clean and standardize the 'description' text to ensure consistency.
# .str.strip(): Remove leading/trailing whitespace.
# .str.title(): Capitalize the first letter of each word (e.g., "heart attack" -> "Heart Attack").
df["description"] = df["description"].str.strip().str.title()
# Correct known abbreviations for uniformity (e.g., "Unk" -> "Unknown").
df["description"] = df["description"].str.replace("Unk", "Unknown")

# Create a Consistent Description-to-Code Mapping ---
# GOAL: Ensure that every unique description maps to one, and only one,
# unique diagnosis code. This enforces a one-to-one relationship.

# Find the "preferred" code (mode) for each description.
# Group by the cleaned description and find the most frequent (mode)
# diagnosis code associated with it.
mode_mapping = df.groupby("description")["diagnosis_code"].agg(
    # Use a lambda to safely get the mode. If the mode is empty (e.g., no data),
    # return np.nan to avoid an error.
    lambda x: x.mode()[0] if not x.mode().empty else np.nan
).to_dict()

# Define the pool of descriptions and codes to work with.
# We'll handle 'Unknown' separately at the end.
primary_descriptions = [desc for desc in mode_mapping if desc != 'Unknown']

# Get a set of all unique codes present in the *original* data.
# This is our pool of available codes.
all_codes = set(df['diagnosis_code'].unique())
# Keep track of codes we've already assigned in our new map.
used_codes = set()
# This dictionary will hold our final, clean, 1:1 mapping.
consistent_map_final = {}

# Iterate and resolve conflicts to build the final map.
for desc in primary_descriptions:
    suggested_code = mode_mapping[desc]

    # HAPPY PATH
    # If the preferred code (mode) hasn't been assigned yet, assign it.
    if suggested_code not in used_codes:
        consistent_map_final[desc] = suggested_code
        used_codes.add(suggested_code)
    
    # ONFLICT PATH
    # The preferred code is already assigned to another description.
    # We must find a new, unique code for this description.
    else:
        # Find all codes from the original dataset that are not yet used.
        # Sort them (key=str) to ensure deterministic assignment (e.g., "C100" before "C99").
        available_codes = sorted(all_codes - used_codes, key=str)

        if available_codes:
            # If there's an unused code from the original pool, assign the first one.
            new_code = available_codes[0]
        else:
            # FALLBACK
            # No unused original codes are left. We must generate a new, synthetic code.
            # Start from "C999" and increment until we find an unused code.
            new_code = "C999"
            while new_code in used_codes:
                # Increment the numeric part (e.g., "C999" -> "C1000")
                new_code = f"C{int(new_code[1:]) + 1}"

        # Assign the new code (either from the pool or synthetic) to the map.
        consistent_map_final[desc] = new_code
        used_codes.add(new_code)

# Handle the 'Unknown' description.
# Add 'Unknown' back into the map with its original preferred code.
if 'Unknown' in mode_mapping:
    consistent_map_final['Unknown'] = mode_mapping['Unknown']
    # Note: We add this last and don't check for conflicts, assuming
    # its preferred code is the one we want it to have.

# Apply the Consistent Mapping
# Overwrite the 'diagnosis_code' column by mapping each 'description'
# to its new, consistent code from our final map.
df["diagnosis_code"] = df["description"].map(consistent_map_final)

# Enforce Final Data Types
# Convert columns to their proper data types for memory efficiency
# and to prevent errors in downstream systems (e.g., databases).
df = df.astype({
    "diagnosis_id": "int",
    "diagnosis_code": "str",
    "description": "str"
})

# Output Results
# Print the head of the cleaned DataFrame to the console for verification.
print("Diagnoses Cleaned Dataset:\n", df.head())

# Save the fully cleaned and standardized DataFrame to a new CSV file.
# index=False prevents pandas from writing the DataFrame index as a column.
df.to_csv("diagnoses_cleaned.csv", index=False)

Diagnoses Cleaned Dataset:
    diagnosis_id diagnosis_code   description
0             1           D004      Covid-19
1             2           D003  Hypertension
2             3           D005           Flu
3             4           D004      Covid-19
4             5           D004      Covid-19


## Cleaning Patients Dataset
Steps performed:
1. Load dataset
2. Fill missing names
3. Fix age errors and impute missing values
4. Normalize gender values
5. Remove duplicate patient records
6. Export cleaned file

In [24]:
# Load Data
# Load the raw patient dataset from its CSV source.
df = pd.read_csv("patients.csv")

# Clean Text Fields (Names)
# Handle missing first names: Fill with a placeholder 'Unknown' for consistency.
# Downstream systems may require a non-null value.
df['first_name'] = df['first_name'].fillna("Unknown")

# Clean last names:
# .str.rstrip("#"): Remove any trailing '#' characters, likely from a data entry artifact.
# .str.title(): Standardize to 'Title Case' (e.g., "smith" -> "Smith").
df['last_name'] = df['last_name'].str.rstrip("#").str.title()

# --- 3. Validate and Impute Numeric Fields (Age) ---
# Step 3a: Nullify invalid age entries.
# Apply business rule: Ages must be realistic. Set any age
# less than 0 or greater than 120 to 'None' (which becomes np.nan)
# so they can be imputed in the next step.
df.loc[(df['age'] < 0) | (df['age'] > 120), 'age'] = None

# Impute missing ages.
# Use 'mean' imputation strategy. This fills all NaN/None values in the 'age'
# column with the average age from the entire dataset.
# Note: [['age']] is used to pass a 2D array (DataFrame) to fit_transform.
imputer = SimpleImputer(strategy='mean')
df[['age']] = imputer.fit_transform(df[['age']])

# Convert age to integer.
# After imputation (which yields floats), cast the column to 'int'
# for a cleaner, more appropriate data type.
df['age'] = df['age'].astype(int)

# Normalize Categorical Fields (Gender)
# Standardize the 'gender' column to have consistent values.
# .str.strip(): Remove leading/trailing whitespace.
# .str.upper(): Convert all to uppercase to handle 'm', 'f', 'M', 'F', etc.
# .replace(): Map the standardized abbreviations to the full, desired terms.
df['gender'] = df['gender'].str.strip().str.upper().replace({
    'M': 'Male',
    'F': 'Female'
})

# Deduplicate Records
# Ensure each patient is represented only once by dropping duplicate rows
# based on the 'patient_id' column, which is our unique business key.
df = df.drop_duplicates(subset=['patient_id'])

# Output Results
# Print the head of the cleaned DataFrame to the console for verification.
print("Patients Cleaned Dataset:\n", df.head())

# Save the fully cleaned DataFrame to a new CSV file.
# index=False prevents pandas from writing the DataFrame row index as a new column.
df.to_csv("patients_cleaned.csv", index=False)

Patients Cleaned Dataset:
    patient_id first_name last_name  gender  age
0           1      Alice    Wilson  Female   52
1           2    Charlie     Smith    Male   93
2           3       Jane   Johnson  Female   15
3           4      Ethan     Smith  Female   72
4           5       Jane    Miller  Female   61


## Cleaning Treatments Dataset
Steps performed:
1. Load dataset and remove duplicates
2. Fix missing doctor names and costs
3. Replace invalid entries
4. Clean text formatting
5. Convert dates properly
6. Handle negative/extreme treatment costs
7. Convert data types
8. Export cleaned file

In [25]:
# Imports
# Import pandas for DataFrame manipulation.
import pandas as pd
# Import numpy for numerical operations, specifically np.nan.
import numpy as np
# Import SimpleImputer for handling missing data.
from sklearn.impute import SimpleImputer

# Load Data
# Load the raw treatments dataset from a CSV file.
df = pd.read_csv("treatments.csv")

# Deduplication
# Remove any rows that are exact duplicates across all columns.
# .copy() is used to avoid a SettingWithCopyWarning in later operations.
df = df.drop_duplicates().copy()

# Initial Imputation (Missing Values)
# Fill missing 'doctor_name' with the most frequent (mode) name in the dataset.
df["doctor_name"] = SimpleImputer(strategy="most_frequent") \
    .fit_transform(df[["doctor_name"]]).ravel()

# Fill missing 'treatment_cost' with the median cost. Median is chosen
# as it is robust to outliers, which are common in cost data.
df["treatment_cost"] = SimpleImputer(strategy="median") \
    .fit_transform(df[["treatment_cost"]]).ravel()

# Fix and Standardize Invalid Text Entries
# Replace placeholder '???' entries in 'doctor_name' with a standard 'Unknown' value.
# (na=False ensures we don't try to run .str.contains on NaN values).
df.loc[df["doctor_name"].str.contains("\?\?\?", na=False), "doctor_name"] = "Unknown Doctor"

# Standardize department names: correct known abbreviations.
df.loc[df["department"] == "UnknownDept", "department"] = "Unknown Department"

# --- 5. Standardize Text Formatting ---
# Apply consistent text formatting to string columns.
# .str.strip(): Remove leading/trailing whitespace.
# .str.title(): Convert to Title Case (e.g., "john doe" -> "John Doe").
df["doctor_name"] = df["doctor_name"].str.strip().str.title()
df["department"] = df["department"].str.strip().str.title()

# Convert Date Values
# Convert the 'admission_date' column to datetime objects.
# errors='coerce': If any date cannot be parsed, it will be set to NaT (Not a Time).
df["admission_date"] = pd.to_datetime(df["admission_date"], errors="coerce")

# Validate and Clean Numeric Fields (Cost)
# Apply business rules to the 'treatment_cost' column.

# Nullify invalid negative costs.
# Set any costs less than 0 to NaN, as negative costs are invalid.
df.loc[df["treatment_cost"] < 0, "treatment_cost"] = np.nan

# Re-impute.
# Fill the NaNs we just created (from negative values) using the median strategy.
# This ensures we don't have invalid negative numbers skewing our data.
df["treatment_cost"] = SimpleImputer(strategy="median").fit_transform(df[["treatment_cost"]])

# Cap outliers (Top-coding).
# Set any costs greater than 200,000 (a business-defined ceiling)
# to 200,000. This prevents extreme outliers from skewing analysis.
df.loc[df["treatment_cost"] > 200000, "treatment_cost"] = 200000

# Enforce Final Data Types
# Convert columns to their final, appropriate data types for memory
# efficiency and compatibility with downstream systems.
# Note: 'treatment_cost' remains float, 'admission_date' remains datetime.
df = df.astype({
    "admission_id": "int",
    "patient_id": "int",
    "diagnosis_id": "int",
    "department": "str",
    "doctor_name": "str"
})

# Output Results
# Print the head of the cleaned DataFrame to the console for verification.
print("Treatments Cleaned Dataset:\n", df.head())

# Save the cleaned data to a new CSV file without the pandas index.
df.to_csv("treatments_cleaned.csv", index=False)

Treatments Cleaned Dataset:
    admission_id  patient_id doctor_name          department  diagnosis_id  \
0             1          50   Dr. Evans            Oncology             1   
1             2          61   Dr. Evans           Neurology            78   
2             3          51   Dr. Adams           Neurology           115   
3             4          19   Dr. Adams  Unknown Department            47   
4             5          21   Dr. Evans          Pediatrics           114   

  admission_date  treatment_cost  
0     2023-05-08         49854.0  
1     2023-12-22         21607.0  
2     2023-10-06         38599.0  
3     2023-02-24         29529.0  
4     2023-10-20         36954.0  


## SQL Script Execution for ETL

This step runs the `script.sql` file which contains all SQL statements for:
1. Creating OLTP tables  
2.  Loading CSV datasets  
3.  Creating Data Warehouse dimension & fact tables  
4.  Transforming and inserting data into the DW

Execution is done using a database transaction:
- If all commands succeed → Commit to MySQL
- If any command fails → Rollback all changes

A log file named `etl_sql_execution.log` is generated to store execution details.

In [26]:
# The specific Python driver used to interact with MySQL databases.
import mysql.connector
# Standard library for writing logs, essential for tracking ETL processes.
import logging
# Standard library for interacting with the OS, used here to check file existence.
import os

# Logger Setup
# Configure a file-based logger. In data engineering, stdout (print) is temporary,
# but a log file provides a persistent record for debugging failed runs.
logging.basicConfig(
    filename="etl_sql_execution.log",  # Log file name.
    level=logging.INFO,  # Set the logging level to INFO (captures info, warnings, errors).
    format="%(asctime)s - %(levelname)s - %(message)s"  # Define a clear log format.
)

# Configuration & Pre-flight Check ---
# Define the SQL script to be executed.
SQL_FILE = "script.sql"

# Pre-check: Fail fast if the SQL script is missing.
# This avoids opening a database connection only to fail, saving resources.
if not os.path.exists(SQL_FILE):
    logging.error(f"Required {SQL_FILE} not found! Halting execution.")
    raise FileNotFoundError(f"{SQL_FILE} not found!")

# Database Connection & Transaction
# Initialize connection and cursor variables outside the try block.
# This ensures they are accessible in the 'finally' block for cleanup,
# even if the connection fails.
connection = None
cursor = None

try:
    # Connect to MySQL ---
    logging.info("Attempting to connect to MySQL database 'healthcare_dw'...")
    connection = mysql.connector.connect(
        host="localhost",  # DB server address
        user="root",  # Username (should be a service account in production)
        password="",  # Password (should be from env variables/secrets manager)
        database="healthcare_dw",  # The specific data warehouse to use
        allow_local_infile=True  # A flag often needed for 'LOAD DATA' commands
    )
    logging.info("Database connection successful.")

    # Create a cursor object, which is the interface for executing queries.
    cursor = connection.cursor()

    # Load SQL Script
    # Read the entire content of the .sql file into a string variable.
    with open(SQL_FILE, "r") as file:
        sql_script = file.read()
    logging.info(f"SQL script '{SQL_FILE}' loaded into memory.")

    # Execute Script Transactionally
    # Split the script into individual statements using the semicolon as a delimiter.
    # This allows for basic multi-statement execution.
    statements = sql_script.split(';')
    executed_count = 0

    logging.info("Starting execution of SQL statements...")
    for statement in statements:
        # .strip() removes any leading/trailing whitespace (like newlines).
        # This prevents errors from executing "empty" statements.
        stmt = statement.strip()
        if stmt:  # Only execute if the statement is not empty.
            cursor.execute(stmt)
            executed_count += 1
            # Log the first 50 chars for traceability without flooding the log file.
            logging.info(f"Executed statement: {stmt[:50]}...")

    # Commit Transaction
    # If all statements executed successfully, commit the changes.
    # This makes the changes (INSERTs, UPDATEs, etc.) permanent in the database.
    # This is the "all" part of an "all-or-nothing" transaction.
    connection.commit()
    logging.info(f"Script executed successfully. Committed {executed_count} statements.")
    print("SQL script executed successfully!")

except mysql.connector.Error as err:
    # Error Handling & Rollback
    # This block catches any database-specific errors (e.g., syntax, connection issues).
    logging.error(f"A MySQL error occurred: {err}")
    if connection:
        # If an error occurred, roll back *all* changes made during this 'try' block.
        # This is the "nothing" part of an "all-or-nothing" transaction,
        # ensuring the database is left in a consistent state.
        connection.rollback()
        logging.warning("Transaction rolled back due to error. No changes were committed.")
    print(f"Execution failed. Rolled back changes: {err}")

finally:
    # Cleanup
    # This block *always* executes, whether the 'try' succeeded or failed.
    # Its purpose is to close connections and free up database resources.
    logging.info("Closing database connection and cursor...")
    if cursor:
        cursor.close()
    if connection:
        connection.close()
    logging.info("Database connection closed.")

SQL script executed successfully!
