# Q1–Q4

In [15]:
# ============================================================
# Q1–Q4: Hospital Data Processing Pipeline
# ============================================================

import pandas as pd
from sqlalchemy import create_engine
import uuid

# --- Postgres Connection ---
from sqlalchemy import create_engine
PASSWORD = input("Enter your database password: ")
engine = create_engine(f"postgresql+psycopg2://postgres:{PASSWORD}@localhost:5432/progtest")


In [16]:
# ============================================================
# Q1: Read and De-identify
# ============================================================

# 1a) Read CSVs
admissions_surg = pd.read_csv("admissions_surg.csv")
admissions_med = pd.read_csv("admissions_med.csv")
imaging = pd.read_csv("imaging.csv")
display (admissions_surg.head())
display (admissions_med.head())
display (imaging.head())


Unnamed: 0,ID,ADMISSION.DATE,ADMISSION.TIME,DISCHARGE.DATE,DISCHARGE.TIME,DEPARTMENT,GENDER,AGE,MAIN.DIAGNOSIS.ICD10,MAIN.DIAGNOSIS.NAME
0,1068,1998-02-01,07:02,1998-03-31,07:17,General Surgery,M,,E0800,Diabetes mellitus with complications
1,1125,2010-11-30,04:33,2011-03-25,20:59,General Surgery,F,24.0,M0500,Rheumatoid arthritis and related disease
2,1127,2015-03-05,09:40,2015-08-28,04:51,General Surgery,M,92.0,O045,Induced abortion
3,1363,1987-11-07,21:37,1988-05-10,15:32,General Surgery,F,93.0,A6000,Viral infection
4,1388,1997-07-03,03:44,1997-12-27,03:45,General Surgery,F,75.0,K640,Hemorrhoids


Unnamed: 0,ID,admission_date,admission_time,discharge_date,discharge_time,department,gender,age,main_diagnosis_icd10,main_diagnosis_name
0,1064,1985-10-19,04:27,1986-01-01,,General Internal Medicine,M,63.0,N830,Ovarian cyst
1,1066,1990-06-26,21:06,1990-09-03,21:03,General Internal Medicine,F,63.0,N410,Inflammatory conditions of male genital organs
2,1074,1994-06-13,06:36,1994-12-22,07:21,General Internal Medicine,F,51.0,K640,Hemorrhoids
3,1108,2005-03-11,21:54,2005-04-19,,General Internal Medicine,F,,C6200,Cancer of testis
4,1137,1997-06-26,09:49,1997-09-06,21:03,General Internal Medicine,M,81.0,I462,Cardiac arrest and ventricular fibrillation


Unnamed: 0,ID,test_name,ordered_date_time,performed_date,performed_time,technician_name,brief_report
0,1064,US,,1985-12-17,10:27,Trevon Hopson,No significant abnormality
1,1064,US PELVIS,,1985-12-02,11:40,Claire Melko,Indication: normal
2,1066,Abdomen CT,,1990-08-05,12:26,Ladonna Mcallister,Indication: Normal
3,1068,US,,1998-03-24,16:15,claire melko,Indication: Normal
4,1074,CT neck + head,,1994-11-05,01:36,Lorena Burciaga,Normal


In [28]:
# Rename columns
admissions_surg.rename(
    columns=lambda x: x if x == "ID" else x.lower().replace('.', '_'),
    inplace=True
)
print('admissions_surg',admissions_surg.columns.tolist())
print('admissions_med',admissions_med.columns.tolist())
print('imaging',imaging.columns.tolist())

admissions_surg ['admission_date', 'admission_time', 'discharge_date', 'discharge_time', 'department', 'gender', 'age', 'main_diagnosis_icd10', 'main_diagnosis_name', 'de_id']
admissions_med ['admission_date', 'admission_time', 'discharge_date', 'discharge_time', 'department', 'gender', 'age', 'main_diagnosis_icd10', 'main_diagnosis_name', 'DE_ID']
imaging ['test_name', 'ordered_date_time', 'performed_date', 'performed_time', 'technician_name', 'brief_report', 'DE_ID']


In [None]:
# 1b) Compile all IDs from surgical + medical admissions
all_ids = pd.concat([admissions_surg['ID'], admissions_med['ID']], ignore_index=True).drop_duplicates()

# Create DE_ID mapping
q1b = pd.DataFrame({
    "ID": all_ids,
    "DE_ID": [str(uuid.uuid4()) for _ in range(len(all_ids))]
})
print("First 10 rows of q1b:")
display(q1b.head(10))

# 1c) Check uniqueness
print("Is DE_ID unique? ->", q1b['DE_ID'].is_unique)

# 1d) De-identify datasets
admissions_surg = admissions_surg.merge(q1b, on="ID").drop(columns=["ID"])
admissions_med = admissions_med.merge(q1b, on="ID").drop(columns=["ID"])
imaging = imaging.merge(q1b, on="ID").drop(columns=["ID"])

print("\nDe-identified admissions_surg:")
display(admissions_surg.head())
print("\nDe-identified admissions_med:")
display(admissions_med.head())
print("\nDe-identified imaging:")
display(imaging.head())


First 10 rows of q1b:


Unnamed: 0,ID,DE_ID
0,1068,5b55b543-211d-44ad-bac1-6243dfc0576a
1,1125,fc3208ca-8d32-4b97-bea7-ac26ebfee5ef
2,1127,57fb035b-1ff0-4659-8b26-e0527ef71ead
3,1363,6404df51-80ff-4398-9ebe-d3d345f777a1
4,1388,e42dcd4b-29a4-486a-9818-1fe94ebaaadc
5,1403,b0c3e26e-80cb-4a65-8029-7956e4d1d206
6,1419,08c8bae4-64e0-4666-aa5c-05f46a243478
7,1435,cbffbcf3-2710-47ee-85a5-12e211df3f11
8,1631,80ab6340-c0d9-4b65-9646-3532ee7e60c1
9,1654,085a11b3-460e-4690-967c-6b4583e83af0


Is DE_ID unique? -> True

De-identified admissions_surg:


Unnamed: 0,admission_date,admission_time,discharge_date,discharge_time,department,gender,age,main_diagnosis_icd10,main_diagnosis_name,DE_ID
0,1998-02-01,07:02,1998-03-31,07:17,General Surgery,M,,E0800,Diabetes mellitus with complications,5b55b543-211d-44ad-bac1-6243dfc0576a
1,2010-11-30,04:33,2011-03-25,20:59,General Surgery,F,24.0,M0500,Rheumatoid arthritis and related disease,fc3208ca-8d32-4b97-bea7-ac26ebfee5ef
2,2015-03-05,09:40,2015-08-28,04:51,General Surgery,M,92.0,O045,Induced abortion,57fb035b-1ff0-4659-8b26-e0527ef71ead
3,1987-11-07,21:37,1988-05-10,15:32,General Surgery,F,93.0,A6000,Viral infection,6404df51-80ff-4398-9ebe-d3d345f777a1
4,1997-07-03,03:44,1997-12-27,03:45,General Surgery,F,75.0,K640,Hemorrhoids,e42dcd4b-29a4-486a-9818-1fe94ebaaadc



De-identified admissions_med:


Unnamed: 0,admission_date,admission_time,discharge_date,discharge_time,department,gender,age,main_diagnosis_icd10,main_diagnosis_name,DE_ID
0,1985-10-19,04:27,1986-01-01,,General Internal Medicine,M,63.0,N830,Ovarian cyst,7a9abf72-44f8-4ed8-a780-d9e68ddf82e4
1,1990-06-26,21:06,1990-09-03,21:03,General Internal Medicine,F,63.0,N410,Inflammatory conditions of male genital organs,0cc7f158-001a-4621-a0ed-08699e1ea214
2,1994-06-13,06:36,1994-12-22,07:21,General Internal Medicine,F,51.0,K640,Hemorrhoids,98a69c85-44e6-4838-81a5-70432828ecb3
3,2005-03-11,21:54,2005-04-19,,General Internal Medicine,F,,C6200,Cancer of testis,6d32c8ea-61f2-4a09-b675-e9d481e84667
4,1997-06-26,09:49,1997-09-06,21:03,General Internal Medicine,M,81.0,I462,Cardiac arrest and ventricular fibrillation,6b0b99ac-7501-47c3-be14-b191623304bd



De-identified imaging:


Unnamed: 0,test_name,ordered_date_time,performed_date,performed_time,technician_name,brief_report,DE_ID
0,US,,1985-12-17,10:27,Trevon Hopson,No significant abnormality,7a9abf72-44f8-4ed8-a780-d9e68ddf82e4
1,US PELVIS,,1985-12-02,11:40,Claire Melko,Indication: normal,7a9abf72-44f8-4ed8-a780-d9e68ddf82e4
2,Abdomen CT,,1990-08-05,12:26,Ladonna Mcallister,Indication: Normal,0cc7f158-001a-4621-a0ed-08699e1ea214
3,US,,1998-03-24,16:15,claire melko,Indication: Normal,5b55b543-211d-44ad-bac1-6243dfc0576a
4,CT neck + head,,1994-11-05,01:36,Lorena Burciaga,Normal,98a69c85-44e6-4838-81a5-70432828ecb3


In [19]:
# ============================================================
# Q2: Merge into admissions_img
# ============================================================
admissions_combined = pd.concat([admissions_surg, admissions_med], ignore_index=True)
admissions_img = admissions_combined.merge(imaging, on="DE_ID", how="left")

print("\nAdmissions + Imaging merged sample:")
display(admissions_img.head())


Admissions + Imaging merged sample:


Unnamed: 0,admission_date,admission_time,discharge_date,discharge_time,department,gender,age,main_diagnosis_icd10,main_diagnosis_name,DE_ID,test_name,ordered_date_time,performed_date,performed_time,technician_name,brief_report
0,1998-02-01,07:02,1998-03-31,07:17,General Surgery,M,,E0800,Diabetes mellitus with complications,5b55b543-211d-44ad-bac1-6243dfc0576a,US,,1998-03-24,16:15,claire melko,Indication: Normal
1,2010-11-30,04:33,2011-03-25,20:59,General Surgery,F,24.0,M0500,Rheumatoid arthritis and related disease,fc3208ca-8d32-4b97-bea7-ac26ebfee5ef,ct neck and head,,2011-03-12,12:33,zach straughter,Normal
2,2015-03-05,09:40,2015-08-28,04:51,General Surgery,M,92.0,O045,Induced abortion,57fb035b-1ff0-4659-8b26-e0527ef71ead,ct neck,,2015-05-08,03:06,mastoora al-kaber,Cancer
3,2015-03-05,09:40,2015-08-28,04:51,General Surgery,M,92.0,O045,Induced abortion,57fb035b-1ff0-4659-8b26-e0527ef71ead,RT LEG DOPPLER,,2015-05-21,02:44,marco carr,"On visual analysis, normal"
4,1987-11-07,21:37,1988-05-10,15:32,General Surgery,F,93.0,A6000,Viral infection,6404df51-80ff-4398-9ebe-d3d345f777a1,ct neck,,1988-01-26,11:47,marco carr,No significant abnormality


In [20]:
admissions_img.isna().sum()

admission_date             0
admission_time             0
discharge_date             0
discharge_time           623
department                 0
gender                     0
age                      316
main_diagnosis_icd10       0
main_diagnosis_name      433
DE_ID                      0
test_name                406
ordered_date_time       2104
performed_date           406
performed_time           406
technician_name          406
brief_report             406
dtype: int64

In [21]:
# ============================================================
# Q3: Length of Stay
# ============================================================
# Ensure datetime parsing
admissions_img['admission_datetime'] = pd.to_datetime(
    admissions_img['admission_date'] + ' ' + admissions_img['admission_time']
)
admissions_img['discharge_datetime'] = pd.to_datetime(
    admissions_img['discharge_date'] + ' ' + admissions_img['discharge_time']
)

admissions_img['length_of_stay'] = (
    admissions_img['discharge_datetime'] - admissions_img['admission_datetime']
).dt.total_seconds() / (24 * 3600)

los = admissions_img.groupby("department", as_index=False)['length_of_stay'].mean()
print("\nAverage length of stay per department:")
display(los)


Average length of stay per department:


Unnamed: 0,department,length_of_stay
0,Addiction Services,100.623098
1,General Internal Medicine,99.340352
2,General Surgery,100.135424
3,Obstetrics,105.349594
4,Oncology,104.213743
5,Palliative Care,95.497606


In [22]:
# ============================================================
# Q4: First test per test_name, wide format
# ============================================================
imaging['performed_date'] = pd.to_datetime(imaging['performed_date'])
first_tests = imaging.sort_values(by=['DE_ID', 'test_name', 'performed_date']) \
                     .groupby(['DE_ID', 'test_name']).first().reset_index()

# Pivot to wide format
q4_df = first_tests.pivot(index="DE_ID", columns="test_name", values="performed_date").reset_index()

print("\nWide format q4_df sample:")
display(q4_df.head())

# Export q4_df
q4_df.to_csv("q4_df.csv", index=False)
los.to_csv("los.csv", index=False)


Wide format q4_df sample:


test_name,DE_ID,ABDOMEN/PELVIS US,Abdomen CT,CT,CT - ABDOMEN,CT - Femur,CT neck + head,DOP LEG VEIN,DOPPLER US,Doppler Ultrasound,...,LT LEG DOPPLER,NECK AND HEAD CT,RT LEG DOPPLER,THORAX/ABDOMEN CT,US,US ABDOMEN,US PELVIS,Ultrasound,ct neck,ct neck and head
0,001ddef6-5a92-443f-a1fb-41ee7b0994b1,NaT,NaT,NaT,NaT,NaT,NaT,NaT,NaT,NaT,...,NaT,NaT,NaT,1989-10-23,NaT,NaT,NaT,NaT,1989-07-08,NaT
1,0025aeea-f4b0-4fbd-bc14-314073ae553a,NaT,NaT,NaT,NaT,NaT,NaT,NaT,NaT,NaT,...,NaT,NaT,NaT,NaT,1991-04-11,NaT,NaT,NaT,NaT,NaT
2,003e08eb-4c72-4612-a7a9-bfe781398c88,1996-04-13,NaT,NaT,NaT,NaT,NaT,NaT,NaT,NaT,...,NaT,NaT,NaT,NaT,NaT,NaT,NaT,NaT,NaT,NaT
3,005c8327-cfc6-471b-ac2e-daf6ac82219a,NaT,NaT,NaT,NaT,NaT,NaT,NaT,NaT,NaT,...,NaT,NaT,NaT,NaT,NaT,NaT,NaT,NaT,NaT,1996-10-30
4,00a5772c-4deb-4fda-8156-7b7e4375962c,NaT,NaT,NaT,NaT,NaT,1997-04-09,NaT,NaT,NaT,...,NaT,NaT,NaT,NaT,NaT,NaT,NaT,NaT,NaT,NaT


In [26]:
# ============================================================
# Upload to Postgres
# ============================================================
los.to_sql("los", engine, if_exists="replace", index=False)
q4_df.to_sql("q4_df", engine, if_exists="replace", index=False)
print("\nUpload complete: 'los' and 'q4_df' saved to Postgres.")


Upload complete: 'los' and 'q4_df' saved to Postgres.


In [None]:
# from sqlalchemy import text

# with engine.connect() as conn:
#     conn.execute(text("DROP TABLE IF EXISTS los;"))
#     conn.execute(text("DROP TABLE IF EXISTS q4_df;"))
#     conn.commit()



# Q5-6

---

Q5 — Bash script pipeline with considerations
Minimal considerations (and 1 extra)
- Flexibility
    - Make input file paths configurable via arguments or environment variables.
    - Avoid hardcoding passwords — use environment variables or .env files.

- Scheduling
    - Use cron to run daily at 9 AM. Example:
    - 0 9 * * * /path/to/hospital_pipeline.sh >> /path/to/hospital_pipeline.log 2>&1

- Successful upload checks

    - The Python script should:

    - Exit with non-zero status on failure.

    - After upload, query the target tables (los, q4_df) to confirm row counts > 0.

- Tools & Technologies

    - Python → data processing (pandas, SQLAlchemy) — you’ve already implemented Q1–Q4.

    - Postgres → final storage and live dashboard source.

    - Bash → orchestration and scheduling wrapper.

    - cron → scheduling.

- Additional consideration (extra) — ETL Logging

    - a record of every run: execution time, status, row counts, runtime duration.

    - This helps debugging and historical tracking.


### Sample code below

In [None]:
# below is bash script outline: daily_pipeline.sh

#!/bin/bash
set -euo pipefail

# Config
export DB_PASSWORD="your_pg_password"
DATA_DIR="/path/to/daily_extracts"
PY_SCRIPT="/path/to/hospital_pipeline.py"
LOG_FILE="/path/to/etl_run.log"

echo "[$(date)] ETL run started" >> "$LOG_FILE"

python3 "$PY_SCRIPT" \
  --admissions_surg "$DATA_DIR/admissions_surg.csv" \
  --admissions_med "$DATA_DIR/admissions_med.csv" \
  --imaging "$DATA_DIR/imaging.csv" \
  --db_user "postgres" \
  --db_pass "$DB_PASSWORD" \
  --db_host "localhost" \
  --db_port "5432" \
  --db_name "progtest" >> "$LOG_FILE" 2>&1

if [ $? -eq 0 ]; then
  echo "[$(date)] ETL completed successfully" >> "$LOG_FILE"
else
  echo "[$(date)] ETL failed" >> "$LOG_FILE"
  exit 1
fi


In [None]:
# Hospital_pipeline.py
import argparse
import pandas as pd
from sqlalchemy import create_engine
import uuid, time, os, json, traceback

# --- Args ---
parser = argparse.ArgumentParser()
parser.add_argument("--admissions_surg", required=True)
parser.add_argument("--admissions_med", required=True)
parser.add_argument("--imaging", required=True)
parser.add_argument("--db_user", required=True)
parser.add_argument("--db_pass", required=True)
parser.add_argument("--db_host", required=True)
parser.add_argument("--db_port", required=True)
parser.add_argument("--db_name", required=True)
args = parser.parse_args()

# --- DB Connection ---
engine = create_engine(
    f"postgresql+psycopg2://{args.db_user}:{args.db_pass}@{args.db_host}:{args.db_port}/{args.db_name}"
)

def clean_columns(df):
    return df.rename(columns=lambda x: x if x == "ID" else x.lower().replace('.', '_'))

start_time = time.time()
status = "SUCCESS"
error_message = None
row_count_los = 0
row_count_q4 = 0

try:
    # Q1–Q4 logic (unchanged except paths from args)
    admissions_surg = clean_columns(pd.read_csv(args.admissions_surg))
    admissions_med = clean_columns(pd.read_csv(args.admissions_med))
    imaging = clean_columns(pd.read_csv(args.imaging))

    all_ids = pd.concat([admissions_surg['ID'], admissions_med['ID']], ignore_index=True).drop_duplicates()
    q1b = pd.DataFrame({"ID": all_ids, "DE_ID": [str(uuid.uuid4()) for _ in range(len(all_ids))]})

    admissions_surg = admissions_surg.merge(q1b, on="ID").drop(columns=["ID"])
    admissions_med = admissions_med.merge(q1b, on="ID").drop(columns=["ID"])
    imaging = imaging.merge(q1b, on="ID").drop(columns=["ID"])

    admissions_combined = pd.concat([admissions_surg, admissions_med], ignore_index=True)
    admissions_img = admissions_combined.merge(imaging, on="DE_ID", how="left")

    admissions_img['admission_datetime'] = pd.to_datetime(admissions_img['admission_date'] + ' ' + admissions_img['admission_time'])
    admissions_img['discharge_datetime'] = pd.to_datetime(admissions_img['discharge_date'] + ' ' + admissions_img['discharge_time'])
    admissions_img['length_of_stay'] = (
        admissions_img['discharge_datetime'] - admissions_img['admission_datetime']
    ).dt.total_seconds() / (24 * 3600)

    los = admissions_img.groupby("department", as_index=False)['length_of_stay'].mean()

    imaging['performed_date'] = pd.to_datetime(imaging['performed_date'])
    first_tests = imaging.sort_values(by=['DE_ID', 'test_name', 'performed_date']) \
                         .groupby(['DE_ID', 'test_name']).first().reset_index()
    q4_df = first_tests.pivot(index="DE_ID", columns="test_name", values="performed_date").reset_index()

    # Upload to DB
    los.to_sql("los", engine, if_exists="replace", index=False)
    q4_df.to_sql("q4_df", engine, if_exists="replace", index=False)

    # Row counts for ETL log
    row_count_los = len(los)
    row_count_q4 = len(q4_df)

except Exception as e:
    status = "FAILURE"
    error_message = str(e) + "\n" + traceback.format_exc()

duration = time.time() - start_time

# --- ETL log insert ---
source_files_meta = {
    "admissions_surg": {
        "path": args.admissions_surg,
        "size_bytes": os.path.getsize(args.admissions_surg)
    },
    "admissions_med": {
        "path": args.admissions_med,
        "size_bytes": os.path.getsize(args.admissions_med)
    },
    "imaging": {
        "path": args.imaging,
        "size_bytes": os.path.getsize(args.imaging)
    }
}

etl_log_df = pd.DataFrame([{
    "source_files": json.dumps(source_files_meta),
    "row_count_los": row_count_los,
    "row_count_q4": row_count_q4,
    "status": status,
    "duration_seconds": round(duration, 2),
    "error_message": error_message
}])

etl_log_df.to_sql("etl_log", engine, if_exists="append", index=False)

if status != "SUCCESS":
    raise SystemExit(f"ETL failed: {error_message}")


# Q6
To track each pipeline run’s success, failure, execution time, input files, and output data volumes, it’s best practice to keep an ETL log table in the Postgres database.

The ETL log table should include:

- etl_id: A unique, auto-incrementing ID for each ETL run.

- run_timestamp: The date and time when the ETL job started.

- source_files: Information about the input files processed (e.g., names, sizes) stored as JSON.

- row_count_los: Number of rows inserted into the “los” table.

- row_count_q4: Number of rows inserted into the “q4_df” table.

- status: Indicates if the run was “SUCCESS” or “FAILURE.”

- duration_seconds: Total runtime of the ETL in seconds.

- error_message: Error details if the ETL failed; empty if successful.


In [None]:
# Sample SQL for ETL log table
# This ETL log helps monitor pipeline health, troubleshoot errors, and track data lineage over time.
CREATE TABLE etl_log (
    etl_id SERIAL PRIMARY KEY,
    run_timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    source_files JSONB NOT NULL,
    row_count_los INT,
    row_count_q4 INT,
    status TEXT CHECK (status IN ('SUCCESS', 'FAILURE')),
    duration_seconds NUMERIC(10,2),
    error_message TEXT
);


# Q7
Optimizing query speed by
- Indexing: Add indexes on genc_id, lab_test_name, and hospital_id.
- Partitioning: Use hospital_id for list partitioning (only 30 unique values).
- Materialized Views: Precompute frequent aggregations.
- **Avoid SELECT ***: Query only needed columns.
- Use EXPLAIN: Profile and optimize queries


# Q8

SQL for Missing Rate of lab_test_name_mapped_omop

sample sql below:

In [None]:
SELECT 
  l.hospital_id,
  COUNT(*) AS total_rows,
  COUNT(l.lab_test_name_mapped_omop) AS non_missing,
  COUNT(*) - COUNT(l.lab_test_name_mapped_omop) AS missing,
  ROUND(
    (COUNT(*) - COUNT(l.lab_test_name_mapped_omop))::NUMERIC / COUNT(*) * 100, 
    2
  ) AS missing_rate_percent
FROM 
  lab l
JOIN 
  ip_administrative ip ON l.genc_id = ip.genc_id
WHERE 
  ip.discharge_date > '2021-04-01'
GROUP BY 
  l.hospital_id;