# Import Libraries

In [2]:
!pip install pymongo
!pip install pandas
!pip install python-dateutil



In [3]:
import logging
import os
import shutil
from datetime import datetime
from urllib.parse import quote_plus

import pandas as pd
from pymongo import MongoClient

In [42]:
# Directory scanned for new CSV files, and the archive sub-directory that
# successfully ingested files are moved into.
Unprocessed_csv = os.getenv("CSV_INPUT_DIR", "/data")
Processed_csv   = os.path.join(Unprocessed_csv, "archive")

# MongoDB connection settings, overridable through environment variables.
# NOTE(review): the "admin"/"admin" credential fallbacks are development-only
# defaults; presumably the MONGO_INITDB_ROOT_* variables are shared with the
# Mongo container's init configuration — confirm for each deployment.
USER     = os.getenv("MONGO_INITDB_ROOT_USERNAME", "admin")
PASSWORD = os.getenv("MONGO_INITDB_ROOT_PASSWORD", "admin")
HOST     = os.getenv("MONGO_HOST", "mongo")
PORT     = int(os.getenv("MONGO_PORT", "27017"))
AUTH_DB  = os.getenv("MONGO_AUTH_DB", "admin")

# Target database and collection for the ingested records.
DB         = "hospital"
COLLECTION = "hospital_collection"

# Columns expected in every input CSV (raw header names, before renaming).
COLUMNS = [
    "Name", "Age", "Gender", "Blood Type", "Medical Condition",
    "Date of Admission", "Doctor", "Hospital", "Insurance Provider",
    "Billing Amount", "Room Number", "Admission Type", "Discharge Date",
    "Medication", "Test Results"
]


In [None]:

# Root logger: INFO and above, one "timestamp  LEVEL  message" line per record.
# basicConfig is a no-op when the root logger already has handlers.
_LOG_FORMAT = "%(asctime)s  %(levelname)s  %(message)s"
_LOG_DATEFMT = "%Y-%m-%d %H:%M:%S"
logging.basicConfig(format=_LOG_FORMAT, datefmt=_LOG_DATEFMT, level=logging.INFO)


# List the paths of all unprocessed CSV files

In [10]:
def list_csv_files(Unprocessed_csv):
    """Return the sorted full paths of the regular ``.csv`` files in a directory.

    The extension check is case-insensitive. Entries that are not regular
    files (e.g. a sub-directory named ``x.csv``) are skipped, and the scan is
    not recursive.

    Args:
        Unprocessed_csv: directory to scan.

    Returns:
        list[str]: ``os.path.join(Unprocessed_csv, name)`` for each match,
        sorted lexicographically.
    """
    candidate_paths = (
        os.path.join(Unprocessed_csv, entry)
        for entry in os.listdir(Unprocessed_csv)
        if entry.lower().endswith(".csv")
    )
    return sorted(path for path in candidate_paths if os.path.isfile(path))


# Archive a processed file

In [47]:
def process_file(src_path, Processed_csv):
    """Move an ingested CSV file into the archive directory.

    The archive directory is created if it does not exist yet. The file keeps
    its base name. If a file with that name is already archived, the result
    depends on ``shutil.move`` / ``os.rename`` semantics (it may be
    overwritten).

    Args:
        src_path: path of the file to archive.
        Processed_csv: archive directory.

    Returns:
        str: the destination path of the archived file.

    Raises:
        OSError: propagated from ``os.makedirs`` / ``shutil.move`` on failure.
    """
    os.makedirs(Processed_csv, exist_ok=True)
    dst = os.path.join(Processed_csv, os.path.basename(src_path))
    # Move first, then log, so the log only reports files that really moved.
    shutil.move(src_path, dst)
    logging.info(f"  → processed {src_path} → {dst}")
    return dst

# Connect to MongoDB

In [15]:
def connect_mongo():
    """Create a MongoDB client and return the target collection.

    The connection URI is built from the module-level settings USER, PASSWORD,
    HOST, PORT and AUTH_DB. The function returns ``client[DB][COLLECTION]``.

    Returns:
        The pymongo collection ``DB.COLLECTION``.
    """
    # Credentials must be percent-escaped inside a MongoDB URI; characters such
    # as '@', ':' or '/' in a password would otherwise corrupt the URI.
    user = quote_plus(USER)
    password = quote_plus(PASSWORD)
    uri = f"mongodb://{user}:{password}@{HOST}:{PORT}/?authSource={AUTH_DB}"
    # Never write the password to the log: notebook outputs get shared/committed.
    logging.info(f"Connection MongoDB : mongodb://{user}:***@{HOST}:{PORT}/?authSource={AUTH_DB}")
    client = MongoClient(uri)
    return client[DB][COLLECTION]

# Data Cleaning

In [45]:
# Maps CSV header names to the snake_case field names stored in MongoDB.
# Its keys are also the columns every input file must contain.
_COLUMN_RENAMES = {
    "Name": "name",
    "Age": "age",
    "Gender": "gender",
    "Blood Type": "blood_type",
    "Medical Condition": "medical_condition",
    "Date of Admission": "date_of_admission",
    "Doctor": "doctor",
    "Hospital": "hospital",
    "Insurance Provider": "insurance_provider",
    "Billing Amount": "billing_amount",
    "Room Number": "room_number",
    "Admission Type": "admission_type",
    "Discharge Date": "discharge_date",
    "Medication": "medication",
    "Test Results": "test_results",
}


def _read_semicolon_csv(csv_path):
    """Read a ';'-separated CSV, skipping malformed lines, on old and new pandas."""
    try:
        return pd.read_csv(csv_path, sep=";", engine="python", on_bad_lines="skip")
    except TypeError:
        # pandas < 1.3 does not know ``on_bad_lines``; use the older flag.
        return pd.read_csv(csv_path, sep=";", engine="python", error_bad_lines=False)


def _parse_day_first_dates(df, column, csv_path):
    """Convert ``df[column]`` from dd/mm/YYYY strings to datetimes (mutates df)."""
    try:
        df[column] = pd.to_datetime(df[column], format="%d/%m/%Y")
    except Exception as e:
        raise ValueError(f"⛔ Impossible to parse the column {column} in {csv_path} : {e}") from e


def validate_and_clean_csv(csv_path):
    """Load one CSV file, validate its schema and return MongoDB-ready records.

    Steps:
      1. read the ';'-separated file (malformed lines are skipped);
      2. check that every expected column is present;
      3. parse "Date of Admission" and "Discharge Date" (format dd/mm/YYYY);
      4. rename the headers to snake_case field names.

    Args:
        csv_path: path to the CSV file.

    Returns:
        list[dict]: one dict per row, keyed by the snake_case field names.

    Raises:
        ValueError: if an expected column is missing or a date column cannot
            be parsed.
    """
    logging.info(f"  → is loading : {csv_path}")
    df = _read_semicolon_csv(csv_path)

    actual_cols = df.columns.tolist()
    logging.info(f"    • Colonnes readed : {actual_cols}")

    # Validate the schema before touching any column. A missing date column is
    # then reported as missing, not as a confusing date-parsing error.
    missing_cols = [c for c in _COLUMN_RENAMES if c not in df.columns]
    if missing_cols:
        raise ValueError(f"Some columns are missing in {csv_path} : {missing_cols}")

    for date_col in ("Date of Admission", "Discharge Date"):
        _parse_day_first_dates(df, date_col, csv_path)

    df = df.rename(columns=_COLUMN_RENAMES)

    # NOTE(review): empty cells come through as NaN/NaT in these records;
    # confirm how missing values should be stored in MongoDB.
    records = df.to_dict(orient="records")
    logging.info(f"    • Prêt à insérer {len(records)} document(s).")
    return records

 


    

In [51]:
def etl_pipeline_for_all_new_csv():
    """Ingest every new CSV in ``Unprocessed_csv`` into MongoDB, then archive it.

    For each ``*.csv`` file returned by ``list_csv_files`` (in sorted order):
      1. validate and clean it with ``validate_and_clean_csv``;
      2. insert the records into the collection returned by ``connect_mongo``;
      3. move the file to ``Processed_csv`` with ``process_file``.

    A failure on one file is logged and processing continues with the next
    file. The failed file is not moved, so it is picked up again on the next run.
    """
    csv_files = list_csv_files(Unprocessed_csv)
    if not csv_files:
        logging.info("➡️ No files found")
        return

    coll = connect_mongo()
    # Take the database name from the collection itself, so this function
    # depends on no extra global variable.
    logging.info(f" Collection in '{coll.database.name}': {coll.database.list_collection_names()}")

    for csv_path in csv_files:
        fname = os.path.basename(csv_path)
        logging.info(f"--- file being processed: {fname} ---")
        try:
            records = validate_and_clean_csv(csv_path)

            if records:
                logging.info(f"  → Ingestion in MongoDB: {len(records)} documents.")
                result = coll.insert_many(records)
                logging.info(f"    ✔️ {len(result.inserted_ids)} Ingested docs from {fname}.")
            else:
                # insert_many rejects an empty list. Without this branch a
                # header-only file would fail on every run and never be archived.
                logging.info(f"  → No rows in {fname}; nothing to ingest.")

            process_file(csv_path, Processed_csv)

        except Exception as e:
            # Best effort per file: log the failure and move on to the next file.
            logging.error(f"    ❌ Error during the processing of {fname}: {e}")

    logging.info(" All news files have been processed.")


# ─────────────────────────────────────────────────────────────────────────────────────
# PIPELINE EXECUTION: one full ETL run, bracketed by start/end timestamps.
logging.info(f"=== DÉMARRAGE DU PIPELINE ETL À {datetime.now():%Y-%m-%d %H:%M:%S} ===")
etl_pipeline_for_all_new_csv()
logging.info(f"=== FIN DU PIPELINE ETL À {datetime.now():%Y-%m-%d %H:%M:%S} ===")

2025-06-02 15:44:21  INFO  === DÉMARRAGE DU PIPELINE ETL À 2025-06-02 15:44:21 ===
2025-06-02 15:44:21  INFO  Connection MongoDB : mongodb://fares:***@mongo:27017/?authSource=admin
2025-06-02 15:44:21  INFO   Collection in 'hospital': ['hospital_collection']
2025-06-02 15:44:21  INFO  --- file being processed: healthcare_dataset-20250506.csv ---
2025-06-02 15:44:21  INFO    → is loading : /data/healthcare_dataset-20250506.csv
2025-06-02 15:44:21  INFO      • Colonnes readed : ['Name', 'Age', 'Gender', 'Blood Type', 'Medical Condition', 'Date of Admission', 'Doctor', 'Hospital', 'Insurance Provider', 'Billing Amount', 'Room Number', 'Admission Type', 'Discharge Date', 'Medication', 'Test Results']
2025-06-02 15:44:22  INFO      • Prêt à insérer 55500 document(s).
2025-06-02 15:44:22  INFO    → Ingestion in MongoDB: 55500 documents.
2025-06-02 15:44:23  INFO      ✔️ 55500 Ingested docs from healthcare_dataset-20250506.csv.
2025-06-02 15:44:23  INFO    → processed /data/healthcare_da