In [None]:
%pip install pandas
%pip install boto3
%pip install psycopg2
%pip install sqlalchemy
%pip install dotenv


%pip matplotlib
%pip plotly
%pip seaborn 


# Run this cell to install these packages
# !pip install pandas boto3 sqlalchemy python-dotenv

In [2]:
# Import dependencies
import boto3
import os
import pandas as pd
from sqlalchemy import create_engine, text
import psycopg2
from io import StringIO

import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np

import plotly.express as px
import plotly.graph_objects as go


from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
from sklearn.cluster import KMeans, DBSCAN
from sklearn.metrics import silhouette_score
from sklearn.neighbors import KNeighborsClassifier, NearestNeighbors

#pipeline
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, RobustScaler
from sklearn.impute import SimpleImputer
from sklearn.compose import ColumnTransformer

sns.set(style="whitegrid")

%matplotlib inline

import warnings
warnings.filterwarnings("ignore")

# Load environment variables from .env file
from dotenv import load_dotenv
import logging

In [3]:
# Load environment variables (the DB_* settings used further down) from a local .env file.
# load_dotenv() returns False when it set nothing (no .env file, or an empty one). The
# variables may still come from the real environment, so warn instead of failing here.
env_file_loaded = load_dotenv()
if not env_file_loaded:
    logging.warning("No .env file found; using variables already set in the environment.")
env_file_loaded

True

# 1: Load Data into AWS S3

In [4]:
# Resolve the raw-data folder against the current working directory and echo it,
# so it is obvious which directory the following cells read from.
folder_data = os.path.abspath(os.path.join(".", "medoptix_data", "raw"))
print(folder_data)

c:\Users\ibrah\Documents\Practicals\Amdari\medoptix-ai-internship\medoptix_data\raw


In [5]:
# Check that the raw-data folder exists before anything tries to read or upload from it.
# Stop here with a clear error: the load and upload cells below cannot do anything
# useful without it.
folder_path = "./medoptix_data/raw"
if not os.path.isdir(folder_path):  # isdir, not exists: a plain file with this name is not a data folder
    raise FileNotFoundError(f"Error: Folder '{folder_path}' does not exist.")
print(f"Folder '{folder_path}' found successfully.")

Folder './medoptix_data/raw' found successfully.


In [6]:
# Raw CSV files expected in the local data folder (and uploaded to S3 further down).
files = [
    f"{stem}.csv"
    for stem in ("clinics", "dropout_flags", "feedback", "interventions", "patients", "sessions")
]

In [10]:
# Load and clean every raw CSV from the local data folder.
# Log records are timestamped, e.g. "2025-07-09 07:14:39,969 INFO:Loaded and cleaned ...".
logging.basicConfig(format='%(asctime)s %(levelname)s:%(message)s', level=logging.INFO)

def clean_column_names(df):
    """Return a copy of ``df`` with snake_case column names.

    Each name is converted to ``str``, stripped, lower-cased, and every run of
    whitespace and/or hyphens is collapsed into a single underscore
    (e.g. ``" Signup-Date "`` -> ``"signup_date"``, ``"a - b"`` -> ``"a_b"``).

    Args:
        df: Input DataFrame. It is not modified.

    Returns:
        pd.DataFrame: A new frame with the same data and normalized column names.
    """
    cleaned = df.copy()  # assigning .columns on the input would rename the caller's frame too
    cleaned.columns = (
        cleaned.columns.astype(str)  # .str accessor fails on non-string labels
        .str.strip()
        .str.lower()
        .str.replace(r'[\s\-]+', '_', regex=True)
    )
    return cleaned

def remove_duplicates(df, subset=None):
    """Drop duplicate rows and log how many were removed.

    Args:
        df: Input DataFrame.
        subset: Optional column labels used to identify duplicates; all columns when None.

    Returns:
        pd.DataFrame: The frame returned by ``drop_duplicates`` (first occurrence kept).
    """
    deduped = df.drop_duplicates(subset=subset)
    n_dropped = len(df) - len(deduped)
    if n_dropped:
        logging.info(f"Removed {n_dropped} duplicates.")
    return deduped

def handle_missing_values(df, strategy='drop', fill_value=None, subset=None):
    """Return a new DataFrame with missing values handled according to ``strategy``.

    Args:
        df: Input DataFrame (not modified).
        strategy: One of
            - ``'drop'``: drop rows with a missing value (only in the ``subset``
              columns when given, otherwise in any column);
            - ``'mean'`` / ``'median'``: fill numeric columns with their column
              mean / median (non-numeric columns keep their NaNs);
            - ``'fill'``: fill every missing value with ``fill_value``.
        fill_value: Replacement value for the ``'fill'`` strategy.
        subset: Optional column labels considered by ``'drop'``. With the default (None)
            a row is dropped if ANY column is missing, which may discard rows whose only
            gap is in an optional column.

    Returns:
        pd.DataFrame

    Raises:
        ValueError: If ``strategy`` is not one of the values above, or ``'fill'`` is
            requested without a ``fill_value``.
    """
    if strategy == 'drop':
        return df.dropna(subset=subset)
    if strategy == 'mean':
        return df.fillna(df.mean(numeric_only=True))
    if strategy == 'median':
        return df.fillna(df.median(numeric_only=True))
    if strategy == 'fill':
        if fill_value is None:
            raise ValueError("strategy='fill' requires a fill_value")
        return df.fillna(fill_value)
    # An unknown strategy used to return df untouched, silently skipping the cleaning step.
    raise ValueError(f"Unknown missing-value strategy: {strategy!r}")

def convert_types(df, type_map):
    """Return a copy of ``df`` with columns cast according to ``type_map``.

    Args:
        df: Input DataFrame (not modified).
        type_map: Mapping of column name -> dtype accepted by ``Series.astype``
            (e.g. ``"float"``, ``"int"``). Columns absent from ``df`` are skipped.

    Returns:
        pd.DataFrame: A new frame. If a cast fails, that column is left unchanged and a
        warning is logged; cast failures never raise.
    """
    # Work on a copy so the caller's frame is never modified. Assigning into a frame that
    # is itself a filtered slice (e.g. the result of drop_duplicates/dropna) can also
    # trigger SettingWithCopyWarning.
    converted = df.copy()
    for col, dtype in type_map.items():
        if col not in converted.columns:
            continue
        try:
            converted[col] = converted[col].astype(dtype)
        except Exception as e:  # deliberate best-effort: keep the original column
            logging.warning(f"Could not convert {col} to {dtype}: {e}")
    return converted

def validate_schema(df, required_columns, file_name=None):
    """Check that every name in ``required_columns`` is a column of ``df``.

    Args:
        df: DataFrame to check.
        required_columns: Iterable of column names that must be present.
        file_name: Optional label used as a prefix in the warning message.

    Returns:
        bool: True if all columns are present. Otherwise False, after logging a
        warning that lists the missing names in their original order.
    """
    present = set(df.columns)
    missing = [name for name in required_columns if name not in present]
    if not missing:
        return True
    logging.warning(f"{file_name or ''} missing columns: {missing}")
    return False

def clean_dataframe(df, required_columns=None, type_map=None, na_strategy='drop', file_name=None, fill_value=None):
    """Run the standard cleaning steps on one raw table.

    Column names are always normalized with ``clean_column_names``. If
    ``required_columns`` is given and any of them is missing, a warning is logged and the
    frame is returned with ONLY its column names normalized. This function never returns
    None, so callers that must skip bad files have to check the schema themselves.
    Otherwise duplicate rows are removed, missing values are handled with ``na_strategy``,
    and ``type_map`` conversions are applied.

    Args:
        df: Raw DataFrame.
        required_columns: Column names the table must contain (after normalization).
        type_map: Optional mapping of column -> dtype passed to ``convert_types``.
        na_strategy: Strategy passed to ``handle_missing_values``.
        file_name: Label used in log messages.
        fill_value: Replacement value forwarded to ``handle_missing_values``. Only used
            by the ``'fill'`` strategy, which previously could not be used through this
            function.

    Returns:
        pd.DataFrame
    """
    df = clean_column_names(df)
    if required_columns and not validate_schema(df, required_columns, file_name):
        logging.warning(f"{file_name or ''} failed schema validation; skipping row-level cleaning.")
        return df
    df = remove_duplicates(df)
    df = handle_missing_values(df, strategy=na_strategy, fill_value=fill_value)
    if type_map:
        df = convert_types(df, type_map)
    return df

# Columns each raw table must contain (checked after column-name normalization).
# A table missing any of them fails validate_schema and skips row-level cleaning.
schemas = {
    "patients": [
        "patient_id", "age", "gender", "bmi", "smoker", "chronic_cond", "injury_type",
        "signup_date", "consent", "clinic_id", "referral_source", "insurance_type",
    ],
    "clinics": [
        "clinic_id", "city", "country", "type", "postcode", "capacity", "staff_count",
        "speciality", "avg_rating",
    ],
    "sessions": [
        "session_id", "patient_id", "date", "week", "duration", "pain_level",
        "exercise_type", "home_adherence_pc", "satisfaction", "therapist_id",
    ],
    "feedback": ["feedback_id", "session_id", "sentiment", "comments"],
    "dropout_flags": ["patient_id", "dropout", "dropout_week"],
    "interventions": ["intervention_id", "patient_id", "sent_at", "channel", "message", "responded"],
}

# Target dtypes per table, applied after missing values are handled.
# Tables without an entry (e.g. clinics) keep pandas' inferred dtypes.
type_maps = {
    "patients": {"age": "float", "bmi": "float"},
    "sessions": {
        "duration": "float",
        "week": "int",
        "pain_level": "float",
        "home_adherence_pc": "float",
        "satisfaction": "float",
    },
    "feedback": {"sentiment": "float"},
    "dropout_flags": {},
    "interventions": {},
}

# Read each expected CSV, clean it against its schema and type map, and keep the result
# in `dfs`, keyed by table name (the file name without ".csv").
dfs = {}
for file in files:
    csv_path = os.path.join(folder_data, file)
    if not os.path.exists(csv_path):
        logging.warning(f"{file} not found in {folder_data}")
        continue
    table_name = file.replace('.csv', '')
    df = clean_dataframe(
        pd.read_csv(csv_path),
        required_columns=schemas.get(table_name),
        type_map=type_maps.get(table_name, {}),
        na_strategy='drop',
        file_name=file,
    )
    dfs[table_name] = df
    logging.info(f"Loaded and cleaned {file} with shape {df.shape}")

2025-07-09 07:14:39,969 INFO:Loaded and cleaned clinics.csv with shape (8, 9)
2025-07-09 07:14:40,004 INFO:Loaded and cleaned dropout_flags.csv with shape (81, 3)
2025-07-09 07:14:40,679 INFO:Loaded and cleaned feedback.csv with shape (49165, 4)
2025-07-09 07:14:40,749 INFO:Loaded and cleaned interventions.csv with shape (5016, 6)
2025-07-09 07:14:40,904 INFO:Loaded and cleaned patients.csv with shape (1931, 12)
2025-07-09 07:14:41,418 INFO:Loaded and cleaned sessions.csv with shape (70236, 10)


In [None]:
# Clean every raw table from S3, save a processed copy locally, then load it into Postgres.
# This cell relies on names defined in the S3 -> Postgres section further down
# (table_mapping, load_csv_from_s3, insert_to_postgres, engine). In a top-to-bottom run it
# would otherwise die with a NameError, so fail fast with a clear message instead.
_required_names = (
    "table_mapping", "load_csv_from_s3", "insert_to_postgres", "engine",
    "schemas", "type_maps", "clean_dataframe",
)
_undefined = [name for name in _required_names if name not in globals()]
if _undefined:
    raise RuntimeError(
        f"Run the S3 -> Postgres setup cells before this one; undefined: {_undefined}"
    )

os.makedirs("data/processed", exist_ok=True)  # only needs to happen once, not per file

for csv_file, table in table_mapping.items():
    print(f"\n→ Processing {csv_file} → {table}")
    df = load_csv_from_s3(csv_file)  # or read from local
    df_clean = clean_dataframe(
        df,
        required_columns=schemas[table],
        type_map=type_maps.get(table, {}),  # same dtype conversions as the local load above
        na_strategy="drop",
        file_name=csv_file
    )

    # clean_dataframe never returns None. On a schema mismatch it returns the frame with
    # only its column names normalized, so check the schema here to skip bad files.
    missing_cols = [col for col in schemas[table] if col not in df_clean.columns]
    if missing_cols:
        print(f" Skipping '{csv_file}' due to cleaning/schema issues.")
        continue

    processed_path = f"data/processed/{csv_file}"
    df_clean.to_csv(processed_path, index=False)
    print(f" Saved cleaned '{csv_file}' to {processed_path}")

    # insert_to_postgres(df, table_name) writes through the module-level `engine`.
    insert_to_postgres(df_clean, table)


In [11]:
# define variables (AKA parameters)
bucket_name = "amdari-demo-etl1"
folder_data = "./medoptix_data/raw"  # local folder where the raw CSV files are stored
target_folder = "/medoptix/raw/"     # S3 key prefix; must match the prefix used when reading the files back

# Create a boto3 S3 client. boto3 resolves AWS credentials through its default chain
# (environment variables, ~/.aws/credentials, ...), so configure them before running this cell.
s3 = boto3.client("s3")

failed_uploads = {}
# sorted(): os.listdir order is arbitrary, so sort to make the upload order and log deterministic.
for filename in sorted(os.listdir(folder_data)):
    if not filename.endswith(".csv"):  # only upload CSV files
        continue
    filepath = os.path.join(folder_data, filename)  # e.g. ./medoptix_data/raw/patients.csv
    try:
        s3.upload_file(
            Filename=filepath,
            Bucket=bucket_name,
            Key=target_folder + filename,  # e.g. /medoptix/raw/patients.csv
        )
    except Exception as e:
        # Keep going so one failing file doesn't prevent the others from uploading.
        failed_uploads[filename] = e
        print(f"An error occurred: {e}")
    else:
        print(f"uploaded {filename} successfully")

# Surface failures instead of letting later cells read a partially uploaded bucket.
if failed_uploads:
    raise RuntimeError(
        f"Failed to upload {len(failed_uploads)} file(s) to s3://{bucket_name}: {sorted(failed_uploads)}"
    )

2025-07-09 07:15:21,640 INFO:Found credentials in shared credentials file: ~/.aws/credentials


uploaded clinics.csv successfully
uploaded dropout_flags.csv successfully
uploaded feedback.csv successfully
uploaded interventions.csv successfully
uploaded patients.csv successfully
uploaded sessions.csv successfully


In [12]:
# S3 client and location used to read the raw files back.
s3 = boto3.client("s3")
bucket = "amdari-demo-etl1"  # same bucket name used for the upload above
prefix = "/medoptix/raw/"    # same key prefix used for the upload above

# Raw files to download from S3 in the next cell.
files = [
    'clinics.csv', 'dropout_flags.csv', 'feedback.csv',
    'interventions.csv', 'patients.csv', 'sessions.csv',
]

In [13]:
# Download each expected CSV from S3 into the current working directory,
# saved under its bare file name (e.g. ./patients.csv).
failed_downloads = {}
for file in files:
    key = prefix + file  # e.g. /medoptix/raw/patients.csv
    try:
        s3.download_file(bucket, key, file)
    except Exception as e:
        # Keep going so one missing object doesn't hide the status of the others.
        failed_downloads[file] = e
        print(f"An error occurred: {e}")
    else:
        print(f"Downloaded {file} successfully")

if failed_downloads:
    raise RuntimeError(
        f"Failed to download {len(failed_downloads)} file(s) from s3://{bucket}: {sorted(failed_downloads)}"
    )

Downloaded clinics.csv successfully
Downloaded dropout_flags.csv successfully
Downloaded feedback.csv successfully
Downloaded interventions.csv successfully
Downloaded patients.csv successfully
Downloaded sessions.csv successfully


# 2: Load Data from AWS S3 into the PostgreSQL Database

In [None]:
# Define the database connection parameters
def get_db_connection():
    """Create a SQLAlchemy engine for the PostgreSQL database configured in the environment.

    Reads DB_USER, DB_PASSWORD, DB_HOST, DB_PORT and DB_NAME from ``os.environ``.

    Returns:
        sqlalchemy.engine.Engine

    Raises:
        KeyError: If any of the DB_* variables is unset.
        ValueError: If DB_PORT is not an integer.
    """
    from sqlalchemy.engine import URL

    # URL.create escapes the credentials. Formatting them into an f-string breaks as soon
    # as the password contains '@', ':' or '/'.
    db_url = URL.create(
        "postgresql",
        username=os.environ["DB_USER"],
        password=os.environ["DB_PASSWORD"],
        host=os.environ["DB_HOST"],
        port=int(os.environ["DB_PORT"]),
        database=os.environ["DB_NAME"],
    )
    return create_engine(db_url)

## Upload from S3 Bucket to PostgreSQL

The libraries and environment variables used below are imported and loaded at the top of the notebook.

### Set Up Configuration

In [None]:
# PostgreSQL configuration, read from the environment (populated from .env by load_dotenv()).
DB_HOST = os.getenv("DB_HOST")
DB_PORT = os.getenv("DB_PORT")
DB_NAME = os.getenv("DB_NAME")
DB_USER = os.getenv("DB_USER")
DB_PASSWORD = os.getenv("DB_PASSWORD")

# S3 configuration. No AWS keys are read here; boto3 resolves credentials through its
# default chain (environment variables, ~/.aws/credentials, ...).
S3_BUCKET = "amdari-demo-etl1"
S3_FOLDER = "/medoptix/raw/"  # S3 key prefix the raw CSVs were uploaded under (not a local folder)

# SQLAlchemy engine setup. Reuse get_db_connection() from the cell above instead of
# formatting the URL by hand. It raises a KeyError naming any unset DB_* variable, whereas
# an f-string over os.getenv() values would silently embed "None" in the URL.
engine = get_db_connection()

### Table Mapping & S3 Access Functions

In [None]:
# Raw CSV file name -> destination PostgreSQL table (the table name is the file stem).
table_mapping = {
    f"{table}.csv": table
    for table in ("clinics", "patients", "sessions", "feedback", "dropout_flags", "interventions")
}

# S3 client used by load_csv_from_s3 below (credentials come from boto3's default chain).
s3 = boto3.client(service_name="s3")

# Function to load a CSV file from S3 into a pandas DataFrame
def load_csv_from_s3(filename, bucket=None, prefix=None, **read_csv_kwargs):
    """Download ``filename`` from S3 and parse it into a pandas DataFrame.

    Args:
        filename: Object name under the prefix, e.g. ``"patients.csv"``.
        bucket: Bucket to read from. Defaults to the module-level ``S3_BUCKET``.
        prefix: Key prefix. Defaults to the module-level ``S3_FOLDER``. The object key
            is ``prefix + filename``.
        **read_csv_kwargs: Extra keyword arguments forwarded to ``pd.read_csv``.

    Returns:
        pd.DataFrame with the file's contents.

    Raises:
        Any exception raised by ``s3.get_object`` (e.g. for a missing key or bad
        credentials) propagates unchanged.
    """
    bucket = S3_BUCKET if bucket is None else bucket
    prefix = S3_FOLDER if prefix is None else prefix
    response = s3.get_object(Bucket=bucket, Key=f"{prefix}{filename}")
    body = response['Body']
    try:
        # 'utf-8-sig' decodes UTF-8 and also strips a leading byte-order mark, so the
        # first column name cannot start with '\ufeff'.
        content = body.read().decode('utf-8-sig')
    finally:
        body.close()  # close the underlying HTTP response stream
    return pd.read_csv(StringIO(content), **read_csv_kwargs)

#### Insert the Data into PostgreSQL

In [None]:
# Function to insert a DataFrame into PostgreSQL
def insert_to_postgres(df, table_name, con=None, if_exists='append', chunksize=1000):
    """Write ``df`` to ``table_name`` with ``DataFrame.to_sql`` and print the row count.

    Args:
        df: DataFrame to insert. Its index is not written.
        table_name: Destination table.
        con: SQLAlchemy engine/connection (or sqlite3 connection) to write through.
            Defaults to the module-level ``engine``, so two-argument calls keep working;
            ``insert_to_postgres(df, table, engine)`` works as well.
        if_exists: Forwarded to ``to_sql``. The default ``'append'`` adds rows on every
            call, so re-running a load duplicates data; use ``'replace'`` for a fresh load.
        chunksize: Number of rows per multi-row INSERT statement.
    """
    target = engine if con is None else con
    df.to_sql(table_name, target, chunksize=chunksize, method="multi", index=False, if_exists=if_exists)
    print(f"Inserted into: {table_name} ({len(df)} records)")

#### Execute the Full Load

In [None]:
# Load each raw CSV from S3, preview it, and append it to its Postgres table.
# NOTE(review): frames are inserted exactly as read from S3 (no clean_dataframe pass), and
# insert_to_postgres appends, so re-running this cell duplicates rows in the tables.
for source_csv, target_table in table_mapping.items():
    print(f"\n Processing {source_csv} → {target_table}")
    df = load_csv_from_s3(source_csv)
    display(df.head())  # preview only the first rows in the notebook
    insert_to_postgres(df, target_table)

In [None]:
# Show the columns of the last frame loaded by the loop above.
loaded_columns = df.columns.tolist()
print("Columns in DataFrame:", loaded_columns)

# Release the engine's pooled connections; a new engine is created in the next section.
engine.dispose()

In [None]:
print(str(df.columns))  # column Index of the last frame loaded from S3

# <h2 align="center">Data Input</h2>


### Define a Function to Query & Display Results

In [2]:
# Create a function that builds the database engine used to fetch data below.
# (Re-defined here so this section also works in a fresh kernel on its own.)
def get_db_connection():
    """Create a SQLAlchemy engine for the PostgreSQL database configured in the environment.

    Reads DB_USER, DB_PASSWORD, DB_HOST, DB_PORT and DB_NAME from ``os.environ``.

    Returns:
        sqlalchemy.engine.Engine

    Raises:
        KeyError: If any of the DB_* variables is unset.
        ValueError: If DB_PORT is not an integer.
    """
    from sqlalchemy.engine import URL

    # URL.create escapes the credentials. Formatting them into an f-string breaks as soon
    # as the password contains '@', ':' or '/'.
    db_url = URL.create(
        "postgresql",
        username=os.environ["DB_USER"],
        password=os.environ["DB_PASSWORD"],
        host=os.environ["DB_HOST"],
        port=int(os.environ["DB_PORT"]),
        database=os.environ["DB_NAME"],
    )
    return create_engine(db_url)

# Instantiate the database connection
engine = get_db_connection()

# Load every table into its own DataFrame (<table>_df) over a single connection,
# instead of six copy-pasted query/read_sql pairs.
DATA_INPUT_TABLES = ("clinics", "patients", "sessions", "feedback", "dropout_flags", "interventions")
with engine.connect() as conn:
    table_frames = {
        # Table names come from the fixed tuple above, not user input, so the f-string is safe.
        table: pd.read_sql(f"SELECT * FROM {table};", conn)
        for table in DATA_INPUT_TABLES
    }

clinics_df = table_frames["clinics"]
patients_df = table_frames["patients"]
sessions_df = table_frames["sessions"]
feedback_df = table_frames["feedback"]
dropout_flags_df = table_frames["dropout_flags"]
interventions_df = table_frames["interventions"]

#### Display Data

In [None]:
# Preview each loaded table, starting with the first two clinic rows.
clinics_df.head(n=2)

In [None]:
patients_df.head(n=2)

In [None]:
sessions_df.head(n=2)

In [None]:
feedback_df.head(n=2)

In [None]:
dropout_flags_df.head(n=2)

In [None]:
interventions_df.head(n=5)