# UK Health Security Agency (UKHSA)
Description: Your task is to write a reproducible ETL pipeline to process a CSV data file of your choice using Python code (this could include activities such as, but not limited to, data manipulation, data validation and error handling) that could be deployed on either an on-premise SQL server or within a cloud environment such as Azure/AWS.
Your objective is to create an ETL pipeline, considering aspects such as security, scalability and maintainability. We are interested in seeing how you apply your code writing ability and service knowledge to meet these goals effectively.

### Note: As the question did not specify how the data will be used, I will assume that it will be loaded into a SQL server for analytical and machine learning purposes.


## Dataset downloaded from https://www.kaggle.com/datasets/prasad22/healthcare-dataset/data

# Deployment

For deployment, Managed Airflow in Azure Data Factory will be used.
This notebook explains the steps I took, but the ETL pipeline itself will be deployed as an Airflow DAG in Azure Data Factory.
Why:
1. UKHSA already uses Azure.
2. Security: Only the pipeline owner and users granted access through the relevant Azure policy can access the data.
3. Scalability: Azure Managed Airflow automatically scales Apache Airflow nodes when required, within a specified range (min, max).
4. Maintainability: Airflow DAGs are easy to maintain, and the web UI can be used to manage tasks and to drill down into logs for any error. Keeping the DAGs in GitHub also provides reliable version control.

Deployment Procedure:
1. Have a DAGs repo in GitHub.
2. Push the newly created DAG, then code review and merge.
3. In Azure Data Factory, create a new Airflow instance, link it to this repo, and install dependencies as required.

Test the data pipeline: depending on the pipeline, several kinds of tests can be used, such as pytest or a run in a local environment. For this pipeline, most errors are likely to occur if the format of the CSV changes.
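
Since a change in the CSV format is the most likely failure, a pytest-style check on the column layout is one option. The following is a minimal sketch: the expected column list is taken from the `df.info()` output in this notebook, while the helper name `missing_columns` and the test function are hypothetical and not part of the deployed pipeline.

```python
import io

import pandas as pd

# Column names as shown by df.info() for the raw CSV in this notebook.
EXPECTED_COLUMNS = [
    "Name", "Age", "Gender", "Blood Type", "Medical Condition",
    "Date of Admission", "Doctor", "Hospital", "Insurance Provider",
    "Billing Amount", "Room Number", "Admission Type", "Discharge Date",
    "Medication", "Test Results",
]


def missing_columns(frame, expected=EXPECTED_COLUMNS):
    """Return the expected columns that are absent from the frame (hypothetical helper)."""
    return [col for col in expected if col not in frame.columns]


def test_csv_has_expected_columns():
    # A header-only CSV stands in for the real file in this sketch.
    sample = io.StringIO(",".join(EXPECTED_COLUMNS) + "\n")
    frame = pd.read_csv(sample)
    assert missing_columns(frame) == []
```

In a real test the `io.StringIO` sample would be replaced by a small fixture file that mirrors the production CSV.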

In [1]:
import pandas as pd
import numpy as np
df=pd.read_csv("healthcare_dataset_.csv", index_col=False) # in a cloud deployment this would be a path to cloud storage (e.g. Azure Blob Storage or S3)

In [2]:
df.head()

Unnamed: 0,Name,Age,Gender,Blood Type,Medical Condition,Date of Admission,Doctor,Hospital,Insurance Provider,Billing Amount,Room Number,Admission Type,Discharge Date,Medication,Test Results
0,Tiffany Ramirez,81,Female,O-,Diabetes,17/11/22,Patrick Parker,Wallace-Hamilton,Medicare 37490.98336,,146,Elective,01-12-22,Aspirin,Inconclusive
1,Ruben Burns,35,Male,O+,Asthma,01.06.23,Diane Jackson,"Burke, Griffin and Cooper",UnitedHealthcare 47304.06485,,404,Emergency,15-06-23,Lipitor,Normal
2,Chad Byrd,61,Male,B-,Obesity,09-01-19,Paul Baker,Walton LLC,Medicare 36874.897,,292,Emergency,08-02-19,Lipitor,Normal
3,Antonio Frederick,49,Male,B-,Asthma,02-05-20,Brian Chandler,Garcia Ltd,Medicare 23303.32209,,480,Urgent,03-05-20,Penicillin,Abnormal
4,Mrs. Brandy Flowers,51,Male,O-,Arthritis,09-07-21,Dustin Griffin,"Jones, Brown and Murray",UnitedHealthcare 18086.34418,,477,Urgent,02-08-21,Paracetamol,Normal


In [3]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10006 entries, 0 to 10005
Data columns (total 15 columns):
 #   Column              Non-Null Count  Dtype  
---  ------              --------------  -----  
 0   Name                10006 non-null  object 
 1   Age                 10006 non-null  int64  
 2   Gender              10006 non-null  object 
 3   Blood Type          10006 non-null  object 
 4   Medical Condition   10006 non-null  object 
 5   Date of Admission   10006 non-null  object 
 6   Doctor              10006 non-null  object 
 7   Hospital            10006 non-null  object 
 8   Insurance Provider  10006 non-null  object 
 9   Billing Amount      0 non-null      float64
 10  Room Number         10006 non-null  int64  
 11  Admission Type      10006 non-null  object 
 12  Discharge Date      10006 non-null  object 
 13  Medication          10006 non-null  object 
 14  Test Results        10006 non-null  object 
dtypes: float64(1), int64(2), object(12)
memory usage: 1.1

In [4]:
print("Duplicate",df.duplicated().sum())

empty_values = df[df == ''].count()
print("Empty values:",empty_values.sum())

nan_values = df.isnull().sum()
print("NaN values:",nan_values.sum())

df.drop_duplicates(inplace=True)

Duplicate 5
Empty values: 0
NaN values: 10006


## Identify wrong formats in the data and other discrepancies
- The "Date of Admission" column has dates in three formats, using `/`, `.` and `-` as separators.
- The "Insurance Provider" column has the "Billing Amount" data appended to it, and the "Billing Amount" column is empty.
- There are a few duplicates.
- We will also drop "Name", as it is irrelevant for analytical purposes and dropping it helps maintain user privacy. If there were a column indicating that a user has opted out of any use of their data, we would drop the entire row for that user.

In [5]:
df['Date of Admission'] = pd.to_datetime(df['Date of Admission'], format='%d/%m/%y', errors='coerce').fillna(
                    pd.to_datetime(df['Date of Admission'], format='%d.%m.%y', errors='coerce')).fillna(
                    pd.to_datetime(df['Date of Admission'], format='%d-%m-%y', errors='coerce'))

df['Discharge Date'] = pd.to_datetime(df['Discharge Date'], format='%d/%m/%y', errors='coerce').fillna(
                    pd.to_datetime(df['Discharge Date'], format='%d.%m.%y', errors='coerce')).fillna(
                    pd.to_datetime(df['Discharge Date'], format='%d-%m-%y', errors='coerce'))

df.drop(columns=['Name'],inplace=True)
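
The chained `fillna` calls above could also be wrapped in a small reusable helper, which would make it easier to add a fourth format later. This is only a sketch: `parse_mixed_dates` is a hypothetical name, and the default formats are the three used in the cell above.

```python
import pandas as pd


def parse_mixed_dates(series, formats=("%d/%m/%y", "%d.%m.%y", "%d-%m-%y")):
    """Try each format in turn; values matching none of them stay NaT."""
    parsed = pd.to_datetime(series, format=formats[0], errors="coerce")
    for fmt in formats[1:]:
        # Only fill the positions that earlier formats failed to parse.
        parsed = parsed.fillna(pd.to_datetime(series, format=fmt, errors="coerce"))
    return parsed


sample = pd.Series(["17/11/22", "01.06.23", "09-01-19", "not a date"])
result = parse_mixed_dates(sample)
```

Counting `result.isna().sum()` afterwards gives the number of values that matched none of the formats, which is a useful signal that the source format has changed.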


In [6]:
for index, row in df.iterrows():
    # Extract the last whitespace-separated token from Insurance Provider
    numerical_value = row['Insurance Provider'].split()[-1]

    # Check that the extracted token is a plain decimal number (at most one ".")
    if numerical_value.replace(".", "", 1).isnumeric():
        df.at[index, 'Billing Amount'] = float(numerical_value)
        # Remove the trailing amount and the space before it from the provider name
        df.at[index, 'Insurance Provider'] = row['Insurance Provider'][:-len(numerical_value)].rstrip()
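
For larger files, the row-by-row loop above could be replaced by a vectorized version. The sketch below uses `Series.str.extract` with a regular expression. The function name `split_provider_and_amount` is hypothetical, and the pattern assumes the amount is always the last whitespace-separated token, as in the sample rows shown earlier.

```python
import pandas as pd


def split_provider_and_amount(frame):
    """Move a trailing numeric token from 'Insurance Provider' into 'Billing Amount'."""
    out = frame.copy()
    # provider: everything before the last token; amount: digits with an optional decimal part
    parts = out["Insurance Provider"].str.extract(
        r"^(?P<provider>.*?)\s+(?P<amount>\d+(?:\.\d+)?)$"
    )
    has_amount = parts["amount"].notna()
    out.loc[has_amount, "Billing Amount"] = parts.loc[has_amount, "amount"].astype(float)
    out.loc[has_amount, "Insurance Provider"] = parts.loc[has_amount, "provider"]
    return out


sample = pd.DataFrame({
    "Insurance Provider": ["Medicare 37490.98336", "Blue Cross 12.5", "Aetna"],
    "Billing Amount": [float("nan"), float("nan"), 10.0],
})
cleaned = split_provider_and_amount(sample)
```

Rows without a trailing number are left untouched, which matches the behaviour of the `isnumeric()` guard in the loop.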

# Feature Engineering
As this data will be used for analytics and ML, I will create a new column, Stay Length.

In [7]:
df['Stay Length days'] = (df['Discharge Date'] - df['Date of Admission']).dt.days

#Here we can drop the 'Discharge Date' as we can calculate Discharge Date by using Date of Admission + Stay Length
df.drop(columns=['Discharge Date'],inplace=True)

In [8]:
# Rename columns to lowercase and replace spaces with underscores
df.columns = [col.lower().replace(' ', '_') for col in df.columns]
#df['entry id'] = np.arange(0, len(df))

In [9]:
df.info()
empty_values = df[df == ''].count()
print("Empty values:",empty_values.sum())

nan_values = df.isnull().sum()
print("NaN values:",nan_values.sum())


<class 'pandas.core.frame.DataFrame'>
Index: 10001 entries, 0 to 10000
Data columns (total 14 columns):
 #   Column              Non-Null Count  Dtype         
---  ------              --------------  -----         
 0   age                 10001 non-null  int64         
 1   gender              10001 non-null  object        
 2   blood_type          10001 non-null  object        
 3   medical_condition   10001 non-null  object        
 4   date_of_admission   10001 non-null  datetime64[ns]
 5   doctor              10001 non-null  object        
 6   hospital            10001 non-null  object        
 7   insurance_provider  10001 non-null  object        
 8   billing_amount      10001 non-null  float64       
 9   room_number         10001 non-null  int64         
 10  admission_type      10001 non-null  object        
 11  medication          10001 non-null  object        
 12  test_results        10001 non-null  object        
 13  stay_length_days    10001 non-null  int64         


In [10]:
nan_values = df[["age","gender","date_of_admission"  ,"doctor" ,"hospital" ,"billing_amount","admission_type"  ] ].isnull().sum()    
if nan_values.sum()>0:
    print("Key Data missing")

# This condition will be added to the DAGs to check if any key data is missing
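
Inside a DAG, a printed message would be easy to miss, so the check would more likely raise an exception and fail the task. A minimal sketch of that idea follows. `check_key_data` is a hypothetical helper name, and the column list is the one used in the cell above.

```python
import pandas as pd

KEY_COLUMNS = ["age", "gender", "date_of_admission", "doctor",
               "hospital", "billing_amount", "admission_type"]


def check_key_data(frame, key_columns=KEY_COLUMNS):
    """Raise ValueError if any key column contains missing values."""
    missing = frame[key_columns].isnull().sum()
    missing = missing[missing > 0]
    if not missing.empty:
        # Report which columns are affected and how many values are missing.
        raise ValueError(f"Key data missing: {missing.to_dict()}")
```

Calling this at the start of the load step would stop incomplete records from reaching the database, since an uncaught exception marks an Airflow task as failed.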

In [1]:
import os
import psycopg2
import psycopg2.extras as extras

def get_postgres_connection():
    # Read connection parameters from the environment so that credentials are
    # not hard-coded in the notebook or committed to the DAGs repo.
    # PG_DATABASE, PG_USER and PG_PASSWORD are assumed variable names.
    host = os.environ.get('PG_HOST', '192.168.11.3')
    database = os.environ.get('PG_DATABASE', 'huruhuru')
    user = os.environ.get('PG_USER', 'huruhuru')
    password = os.environ['PG_PASSWORD']

    try:
        # Connect to the PostgreSQL database
        connection = psycopg2.connect(
            host=host,
            database=database,
            user=user,
            password=password
        )
        print("Postgres connected")
        return connection

    except Exception as error:
        print("Error while connecting to PostgreSQL", error, "type", type(error))
        raise


from psycopg2 import sql  # used to quote table and column names safely

def postgres_insert(conn, df, table):
    """
    Insert the dataframe using psycopg2.extras.execute_values().
    Returns 0 on success and 1 on failure.
    """
    # Create a list of tuples from the dataframe values
    tuples = [tuple(x) for x in df.to_numpy()]
    # Build the SQL query, quoting the table and column names as identifiers
    # instead of formatting them directly into the string
    query = sql.SQL("INSERT INTO {}({}) VALUES %s").format(
        sql.Identifier(table),
        sql.SQL(',').join(sql.Identifier(col) for col in df.columns)
    ).as_string(conn)
    cursor = conn.cursor()
    try:
        # execute_values is used as it performs better than the other methods benchmarked in
        # https://naysan.ca/2020/05/09/pandas-to-postgresql-using-psycopg2-bulk-insert-performance-benchmark/
        extras.execute_values(cursor, query, tuples)
        conn.commit()
    except (Exception, psycopg2.DatabaseError) as error:
        print("Error: %s" % error)
        conn.rollback()
        return 1
    finally:
        # Close the cursor whether or not the insert succeeded
        cursor.close()
    print("execute_values() done")
    return 0
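
One thing to watch when building the tuples for `postgres_insert` is missing values: `NaN` and `NaT` are pandas/NumPy markers, whereas SQL `NULL` corresponds to Python `None`. The helper below is a sketch of one way to convert them before inserting. `to_db_rows` is a hypothetical name and is not used by the function above.

```python
import pandas as pd


def to_db_rows(frame):
    """Convert a dataframe to a list of tuples, mapping NaN/NaT to None."""
    as_objects = frame.astype(object)  # keep each value as an individual Python object
    return [
        tuple(None if pd.isna(value) else value for value in record)
        for record in as_objects.itertuples(index=False, name=None)
    ]


sample = pd.DataFrame({
    "age": [81, 35],
    "billing_amount": [37490.98, float("nan")],
})
rows = to_db_rows(sample)
```

The resulting list could be passed to `execute_values` in place of the tuples built from `df.to_numpy()`, assuming the target columns allow `NULL`.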

In [2]:
pg_conn=get_postgres_connection()

Postgres connected


In [None]:
postgres_insert(pg_conn,df,"patient_records")

# SQL Schema for loading the data
``` sql
CREATE TABLE patient_records (
    age INTEGER NOT NULL,
    gender VARCHAR(10) NOT NULL,
    date_of_admission TIMESTAMP NOT NULL,
    blood_type VARCHAR(5),
    medical_condition VARCHAR(255),
    doctor VARCHAR(255) NOT NULL,
    hospital VARCHAR(255) NOT NULL,
    insurance_provider VARCHAR(255),
    billing_amount NUMERIC(12, 2) NOT NULL,
    room_number INTEGER,
    admission_type VARCHAR(50) NOT NULL,
    medication VARCHAR(255),
    test_results TEXT,
    stay_length_days INTEGER,
    entry_id SERIAL PRIMARY KEY
);

```