<a href="https://colab.research.google.com/github/geithelmasri/AAI614_Geith1/blob/main/Implementing_ETL_Using_Python_for_a_Healthcare_Application.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Implementing ETL Using Python for a Healthcare Application**

---



In [4]:
import pandas as pd
# Extract patient data from CSV file
patients_df = pd.read_csv ( 'patients.csv' )
print( "Extracted Patient Data : " )
print( patients_df )


Extracted Patient Data : 
    patient_id             name  age  gender
0         P001      James Smith   45    Male
1         P002     Mary Johnson   32  Female
2         P003  Robert Williams   56    Male
3         P004   Patricia Brown   29  Female
4         P005       John Jones   67    Male
..         ...              ...  ...     ...
195       P196     Emily Brooks   41  Female
196       P197      Jack Fisher   29    Male
197       P198       Judith Lee   50  Female
198       P199       Sean Kelly   38    Male
199       P200  Rebecca Sanders   57  Female

[200 rows x 4 columns]


#  --- Diagnostics data (simulated API): ---

In [6]:
# Simulated  API  response	f o r	d i a g n o s t i c  data
diagnostic_data = [
{" diagnostic_id " :  "D001" ,  "patient_id" :  "P001" ,  "test" :  "Blood Test" , "result" :  "Normal" } ,
{"diagnostic_id" :  "D002" ,  "patient_id" :  "P002" ,  "test" :  "X−Ray",  "result" :  "Fracture"} ,
{"diagnostic_id" :  "D003" ,  "patient_id" :  "P003" ,  "test" :  "MRI" , "result" : "Normal"}
]
print ( " Extracted  Diagnos t ic  Data : " )
print ( diagnostic_data )


 Extracted  Diagnos t ic  Data : 
[{' diagnostic_id ': 'D001', 'patient_id': 'P001', 'test': 'Blood Test', 'result': 'Normal'}, {'diagnostic_id': 'D002', 'patient_id': 'P002', 'test': 'X−Ray', 'result': 'Fracture'}, {'diagnostic_id': 'D003', 'patient_id': 'P003', 'test': 'MRI', 'result': 'Normal'}]


#  --- Transform Data ---

In [8]:
# Clean patient data: Filter out patients younger than 40 years old
cleaned_patients_df = patients_df[patients_df['age'] >= 40].copy()
print("\nCleaned Patient Data (age >= 40):")
cleaned_patients_df


Cleaned Patient Data (age >= 40):


Unnamed: 0,patient_id,name,age,gender
0,P001,James Smith,45,Male
2,P003,Robert Williams,56,Male
4,P005,John Jones,67,Male
5,P006,Linda Garcia,40,Female
7,P008,Barbara Davis,55,Female
...,...,...,...,...
193,P194,Dorothy Patterson,48,Female
194,P195,Benjamin Ward,55,Male
195,P196,Emily Brooks,41,Female
197,P198,Judith Lee,50,Female


In [9]:
# Enrich diagnostic data with patient information: Join the diagnostics data with
# patient details (name, age, gender) to provide context for the test results.

# Transform diagnostic data (simulated API response) into a Pandas DataFrame
diagnostic_df = pd.DataFrame(diagnostic_data)
print ( " Transformed  Diagnostic  Data  into  DataFrame: " )
print ( diagnostic_df )

# Merge diagnostic data with patient data based on 'patient_id'
enriched_diagnostic_df = pd.merge(diagnostic_df, patients_df[['patient_id', 'name', 'age', 'gender']], on='patient_id', how='left')

print("\nEnriched Diagnostic Data:")
enriched_diagnostic_df

 Transformed  Diagnostic  Data  into  DataFrame: 
   diagnostic_id  patient_id        test    result diagnostic_id
0            D001       P001  Blood Test    Normal           NaN
1             NaN       P002       X−Ray  Fracture          D002
2             NaN       P003         MRI    Normal          D003

Enriched Diagnostic Data:


Unnamed: 0,diagnostic_id,patient_id,test,result,diagnostic_id.1,name,age,gender
0,D001,P001,Blood Test,Normal,,James Smith,45,Male
1,,P002,X−Ray,Fracture,D002,Mary Johnson,32,Female
2,,P003,MRI,Normal,D003,Robert Williams,56,Male


#  --- Load Data into MongoDB ---
• Connect to MongoDB

• Load Patient Data into MongoDB

• Load Diagnostic Data into MongoDB


In [10]:
!pip install pymongo

Collecting pymongo
  Downloading pymongo-4.13.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (22 kB)
Collecting dnspython<3.0.0,>=1.16.0 (from pymongo)
  Downloading dnspython-2.7.0-py3-none-any.whl.metadata (5.8 kB)
Downloading pymongo-4.13.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.4/1.4 MB[0m [31m24.6 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading dnspython-2.7.0-py3-none-any.whl (313 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m313.6/313.6 kB[0m [31m20.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: dnspython, pymongo
Successfully installed dnspython-2.7.0 pymongo-4.13.2


In [12]:
from pymongo import MongoClient
client = MongoClient("mongodb+srv://user1:ecommerce1@ecommerce.mvmspnu.mongodb.net/?retryWrites=true&w=majority&appName=ecommerce")
db = client['healthcare']
patients_collection = db['patients']
diagnostics_collection = db['diagnostics']
patients_collection.insert_many(cleaned_patients_df.to_dict('records'))
diagnostics_collection.insert_many(enriched_diagnostic_df.to_dict('records'))
print("\nLoaded Patient and Diagnostic Data into MongoDB.")


Loaded Patient and Diagnostic Data into MongoDB.


#  --- Automate the ETL Process ---


In [13]:
def extract_patient_data(file_path):
  """Extract patient data from a CSV file."""
  patients_df = pd.read_csv(file_path)
  print("Extracted Patient Data:")
  print(patients_df)
  return patients_df

def extract_diagnostic_data():
  """Extract diagnostic data from a simulated API."""
  diagnostic_data = [
      {"diagnostic_id": "D001", "patient_id": "P001", "test": "Blood Test", "result": "Normal"},
      {"diagnostic_id": "D002", "patient_id": "P002", "test": "X−Ray", "result": "Fracture"},
      {"diagnostic_id": "D003", "patient_id": "P003", "test": "MRI", "result": "Normal"}
  ]
  print("Extracted Diagnostic Data:")
  print(diagnostic_data)
  return diagnostic_data

def transform_patient_data(patients_df):
  """Clean patient data by filtering by age."""
  cleaned_patients_df = patients_df[patients_df['age'] >= 40].copy()
  print("\nCleaned Patient Data (age >= 40):")
  print(cleaned_patients_df)
  return cleaned_patients_df

def transform_diagnostic_data(diagnostic_data, patients_df):
  """Transform diagnostic data and enrich it with patient information."""
  diagnostic_df = pd.DataFrame(diagnostic_data)
  print("Transformed Diagnostic Data into DataFrame:")
  print(diagnostic_df)

  enriched_diagnostic_df = pd.merge(diagnostic_df, patients_df[['patient_id', 'name', 'age', 'gender']], on='patient_id', how='left')
  print("\nEnriched Diagnostic Data:")
  print(enriched_diagnostic_df)
  return enriched_diagnostic_df

def load_data_to_mongodb(cleaned_patients_df, enriched_diagnostic_df, mongo_uri, db_name):
  """Load transformed data into MongoDB."""
  client = MongoClient(mongo_uri)
  db = client[db_name]
  patients_collection = db['patients']
  diagnostics_collection = db['diagnostics']

  # Clear existing data for idempotency in this example
  patients_collection.delete_many({})
  diagnostics_collection.delete_many({})

  if not cleaned_patients_df.empty:
    patients_collection.insert_many(cleaned_patients_df.to_dict('records'))
  if not enriched_diagnostic_df.empty:
    diagnostics_collection.insert_many(enriched_diagnostic_df.to_dict('records'))

  print("\nLoaded Patient and Diagnostic Data into MongoDB.")

def run_etl_pipeline(patient_file_path, mongo_uri, db_name):
  """Runs the complete ETL pipeline."""
  print("Starting ETL Pipeline...")

  # Extract
  patients_df = extract_patient_data(patient_file_path)
  diagnostic_data = extract_diagnostic_data()

  # Transform
  cleaned_patients_df = transform_patient_data(patients_df)
  enriched_diagnostic_df = transform_diagnostic_data(diagnostic_data, patients_df)

  # Load
  load_data_to_mongodb(cleaned_patients_df, enriched_diagnostic_df, mongo_uri, db_name)

  print("ETL Pipeline Finished.")


mongo_connection_string = "mongodb+srv://user1:ecommerce1@ecommerce.mvmspnu.mongodb.net/?retryWrites=true&w=majority&appName=ecommerce"
database_name = 'healthcare'
patient_data_file = 'patients.csv'


try:
    with open(patient_data_file, 'x') as f:
        f.write("patient_id,name,age,gender\n")
        f.write("P001,Alice,35,Female\n")
        f.write("P002,Bob,45,Male\n")
        f.write("P003,Charlie,55,Male\n")
except FileExistsError:
    pass # File already exists

run_etl_pipeline(patient_data_file, mongo_connection_string, database_name)


Starting ETL Pipeline...
Extracted Patient Data:
    patient_id             name  age  gender
0         P001      James Smith   45    Male
1         P002     Mary Johnson   32  Female
2         P003  Robert Williams   56    Male
3         P004   Patricia Brown   29  Female
4         P005       John Jones   67    Male
..         ...              ...  ...     ...
195       P196     Emily Brooks   41  Female
196       P197      Jack Fisher   29    Male
197       P198       Judith Lee   50  Female
198       P199       Sean Kelly   38    Male
199       P200  Rebecca Sanders   57  Female

[200 rows x 4 columns]
Extracted Diagnostic Data:
[{'diagnostic_id': 'D001', 'patient_id': 'P001', 'test': 'Blood Test', 'result': 'Normal'}, {'diagnostic_id': 'D002', 'patient_id': 'P002', 'test': 'X−Ray', 'result': 'Fracture'}, {'diagnostic_id': 'D003', 'patient_id': 'P003', 'test': 'MRI', 'result': 'Normal'}]

Cleaned Patient Data (age >= 40):
    patient_id               name  age  gender
0         P001