## Setup and Configuration

First, let's install the necessary libraries and configure our environment.

In [97]:
# The SageMaker Studio environment comes with most of these pre-installed.
# This cell ensures all dependencies are present.
!pip install -q boto3 sagemaker mlflow "scikit-learn>=1.0" "pandas>=1.2"
# !pip install -q boto3 sagemaker mlflow "scikit-learn>=1.7" "pandas>=2.3" "matplotlib>=3.10" "numpy>=2.0" "seaborn>=0.13"
# !pip install --upgrade sagemaker google-api-core grpcio
# !pip install "sagemaker==2.218.0" "protobuf==3.20.*" --force-reinstall

In [1]:
import sys
import subprocess
import time

# Ensure MLflow is installed
try:
    import mlflow
    import sagemaker_mlflow
except ImportError:
    print("Installing MLflow...")
    subprocess.check_call([sys.executable, "-m", "pip", "install",  "boto3==1.37.1", "botocore==1.37.1", "s3transfer", "mlflow==2.22.0", "sagemaker-mlflow==0.1.0"])
    import mlflow
    import sagemaker_mlflow

In [2]:
# import packages
import sagemaker
import boto3
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from sklearn.datasets import make_classification
import os
import io
import seaborn as sns
import numpy as np
import matplotlib.pyplot as plt
from pprint import pprint

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


### Set up Sagemaker session & S3 folder for assignment

In [3]:
# Setup SageMaker session
sagemaker_session = sagemaker.Session()
sagemaker_client = boto3.client("sagemaker")
s3_client = boto3.client("s3")

# --- IMPORTANT: CONFIGURE THESE VARIABLES ---
s3_bucket = sagemaker_session.default_bucket()
# ----------------------
# UPDATE THESE VARIABLES
bucket_name = 'iti113-team12-bucket'  # e.g., 'my-company-sagemaker-bucket'
base_folder = 'project'      # e.g., 'users/my-name'
# ----------------------


### Upload dataset

In [4]:
# # Load dataset
# df = pd.read_csv('volunteer_details.csv')

# Define the base path for our datasets
data_path = f"s3://{bucket_name}/{base_folder}/datasets/c17/working"
print(f"Working dataset at: {data_path}")

# # Define S3 path for dataset
# # os.makedirs(dataset_s3_path, exist_ok=True)
# dataset_s3_path = f"{data_path}/users/volunteer_details.csv"
# data_v1_s3_uri = os.path.dirname(dataset_s3_path)

# # Upload to S3
# print(f"Uploading data to {dataset_s3_path}")
# csv_buffer = io.StringIO()
# df.to_csv(csv_buffer, index=False)
# dataset_s3_key = f"{base_folder}/datasets/adl/users/volunteer_details.csv"
# s3_client.put_object(Bucket=bucket_name, Key=dataset_s3_key, Body=csv_buffer.getvalue())

# print(f"Dataset uploaded to: {dataset_s3_path}")
# # s3://iti113-team12-bucket/project/datasets/adl/users/volunteer_details.csv

Working dataset at: s3://iti113-team12-bucket/project/datasets/c17/working


In [5]:
# Define the base path for our datasets
# Scidirect ADL: s3://iti113-team12-bucket/project/datasets/adl/adl/
# Scidirect fall: s3://iti113-team12-bucket/project/datasets/adl/fall/ 
# Dryad C17: s3://iti113-team12-bucket/project/datasets/c17/
# data_path = f"s3://{bucket_name}/{base_folder}/datasets/adl"
# data_path_adl = "s3://iti113-team12-bucket/project/datasets/adl/adl/"
# data_path_fall = "s3://iti113-team12-bucket/project/datasets/adl/fall/"
# data_path_adlfalls_users = "s3://iti113-team12-bucket/project/datasets/adl/users/"

# print(f"S3 data paths: {data_path_adl}\n {data_path_fall} \n {data_path_adlfalls_users}")


### Check file exists in S3 (section not in use)

In [7]:
import boto3

s3 = boto3.client("s3")
resp = s3.list_objects_v2(
    Bucket="iti113-team12-bucket",
    Prefix="project/datasets/adl/users/volunteer_details.csv"
)
print(resp.get("Contents"))

# s3://iti113-team12-bucket/project/datasets/adl/users/volunteer_details.csv

[{'Key': 'project/datasets/adl/users/volunteer_details.csv', 'LastModified': datetime.datetime(2025, 8, 19, 8, 30, 13, tzinfo=tzlocal()), 'ETag': '"1b9bf950ed61e8618a4a5d37a85130ad"', 'ChecksumAlgorithm': ['CRC32'], 'ChecksumType': 'FULL_OBJECT', 'Size': 2086, 'StorageClass': 'STANDARD'}]


In [8]:
def check_file_exists(bucket, key):
    try:
        s3_client.head_object(Bucket=bucket, Key=key)
        print(f"✅ File found: s3://{bucket}/{key}")
        return True
    except s3_client.exceptions.ClientError as e:
        if e.response['Error']['Code'] == '404':
            print(f"❌ File not found: s3://{bucket}/{key}")
            return False
        else:
            raise  # Something else went wrong

# s3://iti113-team12-bucket/project/datasets/adl/users/volunteer_details.csv
check_file_exists(bucket_name, "project/datasets/adl/users/volunteer_details.csv")

✅ File found: s3://iti113-team12-bucket/project/datasets/adl/users/volunteer_details.csv


True

### Set up tracking server

In [6]:
# Assuming you have your boto3 client and server name
tracking_server_name = "mlflow-elderly"

try:
    response = sagemaker_client.describe_mlflow_tracking_server(
        TrackingServerName=tracking_server_name
    )
    mlflow_tracking_server_arn = response['TrackingServerArn']
    print(f"Found MLflow Tracking Server ARN: {mlflow_tracking_server_arn}")
except Exception as e:
    print(f"Could not find tracking server: {e}")
    mlflow_tracking_server_arn = None

# ARN of your MLflow Tracking Server
# Find this in the SageMaker console or by running `aws sagemaker list-mlflow-tracking-servers`
# mlflow_tracking_server_arn = tracking_server_arn


# IAM role for SageMaker execution
sagemaker_role = sagemaker.get_execution_role()

print(f"SageMaker Role ARN: {sagemaker_role}")
print(f"MLflow Tracking Server ARN: {mlflow_tracking_server_arn}")

Found MLflow Tracking Server ARN: arn:aws:sagemaker:ap-southeast-1:837028399719:mlflow-tracking-server/mlflow-elderly
SageMaker Role ARN: arn:aws:iam::837028399719:role/iti113-team12-sagemaker-iti113-team12-domain-iti113-team12-Role
MLflow Tracking Server ARN: arn:aws:sagemaker:ap-southeast-1:837028399719:mlflow-tracking-server/mlflow-elderly


## Read and combine datasets

In [6]:
import boto3
import pandas as pd
import os

# === S3 info ===
bucket = bucket_name
prefix = "project/datasets/c17/"
output_key = "project/datasets/c17/working/combined_c17.csv"

s3 = boto3.client("s3")

# Optional: remove previous combined file
try:
    s3.delete_object(Bucket=bucket, Key=output_key)
    print("Previous combined file removed from S3.")
except Exception:
    pass

# Define expected columns
dtype_map = {
    "stamp_start": "string",
    "stamp_end": "string",
    "yaw_mean": "float64",
    "pitch_mean": "float64",
    "roll_mean": "float64",
    "rotation_rate_x_mean": "float64",
    "rotation_rate_y_mean": "float64",
    "rotation_rate_z_mean": "float64",
    "user_acceleration_x_mean": "float64",
    "user_acceleration_y_mean": "float64",
    "user_acceleration_z_mean": "float64",
    "yaw_std": "float64",
    "pitch_std": "float64",
    "roll_std": "float64",
    "rotation_rate_x_std": "float64",
    "rotation_rate_y_std": "float64",
    "rotation_rate_z_std": "float64",
    "user_acceleration_x_std": "float64",
    "user_acceleration_y_std": "float64",
    "user_acceleration_z_std": "float64",
    "rotation_magnitude_mean": "float64",
    "acceleration_magnitude_mean": "float64",
    "rotation_magnitude_std": "float64",
    "acceleration_magnitude_std": "float64",
    "speed_mean": "float64",
    "speed_std": "float64",
    "course_mode": "string",
    "course_std": "float64",
    "distance_from_home_mean": "float64",
    "distance_from_home_std": "float64",
    "distance_from_home_latitude_mean": "float64",
    "distance_from_home_latitude_std": "float64",
    "distance_from_home_longitude_mean": "float64",
    "distance_from_home_longitude_std": "float64",
    "bearing_from_home_mode": "string",
    "bearing_from_home_std": "float64",
    "time_of_day_radians": "float64",
    "time_of_day_sin": "float64",
    "time_of_day_cos": "float64",
    "day_of_week": "int8",
    "activity_label": "string"
}

expected_cols = list(dtype_map.keys())

local_tmp = "/tmp/combined_c17.csv"
if os.path.exists(local_tmp):
    os.remove(local_tmp)

bad_rows_report = []

# === List all CSV files ===
response = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
if "Contents" not in response:
    print("No files found in S3 path.")
else:
    for obj in response["Contents"]:
        key = obj["Key"]
        if not key.endswith(".csv") or any(x in key.lower() for x in ["combined", "train", "test"]):
            continue

        filename = os.path.basename(key)
        user_id = filename.split(".")[1].strip("p")  # e.g., "1"
        print(f"Processing {filename} | User={user_id}")

        try:
            s3_obj = s3.get_object(Bucket=bucket, Key=key)
            chunk_size = 100_000

            for chunk in pd.read_csv(s3_obj["Body"], chunksize=chunk_size, low_memory=False, on_bad_lines="warn"):
                # Keep only expected columns
                extra_cols = [c for c in chunk.columns if c not in expected_cols]
                missing_cols = [c for c in expected_cols if c not in chunk.columns]

                if extra_cols or missing_cols:
                    bad_rows_report.append({"file": filename, "extra": extra_cols, "missing": missing_cols})

                # Align columns
                for c in missing_cols:
                    chunk[c] = pd.NA
                chunk = chunk[expected_cols]

                # Filter rows with missing activity_label
                filtered_chunk = chunk[chunk["activity_label"].notna()].copy()
                filtered_chunk["user_id"] = user_id

                # Append to local temp CSV
                filtered_chunk.to_csv(local_tmp, mode="a", header=not os.path.exists(local_tmp), index=False)

        except Exception as e:
            print(f"Error reading {key}: {e}")
            continue

    # Upload final combined CSV to S3
    s3.upload_file(local_tmp, bucket, output_key)
    print("Final combined CSV uploaded to:", f"s3://{bucket}/{output_key}")

    if bad_rows_report:
        print("Warning: Some files had unexpected columns. Review them:")
        for r in bad_rows_report:
            print(r)


Previous combined file removed from S3.
Processing c17.p001.csv | User=001
Processing c17.p002.csv | User=002
Processing c17.p003.csv | User=003
Processing c17.p004.csv | User=004
Processing c17.p005.csv | User=005
Processing c17.p006.csv | User=006
Processing c17.p007.csv | User=007
Processing c17.p008.csv | User=008
Processing c17.p009.csv | User=009
Processing c17.p010.csv | User=010
Processing c17.p011.csv | User=011
Processing c17.p012.csv | User=012
Processing c17.p013.csv | User=013
Processing c17.p014.csv | User=014
Processing c17.p015.csv | User=015
Processing c17.p016.csv | User=016
Processing c17.p017.csv | User=017
Processing c17.p018.csv | User=018
Processing c17.p019.csv | User=019
Processing c17.p020.csv | User=020
Processing c17.p021.csv | User=021
Processing c17.p022.csv | User=022
Processing c17.p023.csv | User=023
Processing c17.p024.csv | User=024
Processing c17.p025.csv | User=025
Processing c17.p026.csv | User=026
Processing c17.p027.csv | User=027
Processing c17.

In [7]:
try:
    full_data_output_path = "s3://iti113-team12-bucket/project/datasets/c17/working/combined_c17.csv"
    df = pd.read_csv(full_data_output_path)#, on_bad_lines="skip")
    print("Successfully loaded combined dataset:")
    print(df.head())
except Exception as e:
    print(f"An error occurred: {e}")
    print("\nError loading combined csv file.")

Successfully loaded combined dataset:
               stamp_start                stamp_end  yaw_mean  pitch_mean  \
0  2023-01-28 10:57:21.821  2023-01-28 10:58:21.784 -0.690362   -0.608920   
1  2023-01-28 10:57:22.821  2023-01-28 10:58:22.783 -0.659091   -0.607907   
2  2023-01-28 10:57:23.820  2023-01-28 10:58:23.783 -0.616051   -0.607774   
3  2023-01-28 10:57:24.819  2023-01-28 10:58:24.782 -0.579319   -0.609040   
4  2023-01-28 10:57:25.819  2023-01-28 10:58:25.781 -0.542710   -0.610304   

   roll_mean  rotation_rate_x_mean  rotation_rate_y_mean  \
0  -0.608920              0.014896             -0.021061   
1  -0.607907              0.003650             -0.020898   
2  -0.607774              0.003679             -0.023021   
3  -0.609040              0.003764             -0.022758   
4  -0.610304              0.003763             -0.022828   

   rotation_rate_z_mean  user_acceleration_x_mean  user_acceleration_y_mean  \
0              0.018753                  0.000663          

In [9]:
print(bad_rows)

## Create scripts to be used in SageMaker Pipeline

#### 0. Feature Engineering

In [8]:

%%writefile health_featureEngr.py

import argparse
import os
import pandas as pd
import numpy as np

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--input-path', type=str, required=True, help="Directory containing main dataset")
    parser.add_argument("--output-path", type=str, required=True, help="Output directory for dataset with engineered features")
    args = parser.parse_args()

    # Use provided paths or fall back to SageMaker defaults
    input_path = args.input_path
    output_path = args.output_path

    os.makedirs(output_path, exist_ok=True)


    # Load data
    input_file = os.path.join(input_path, 'combined_c17.csv')
    print(f"Reading input file from {input_file}...")
    df = pd.read_csv(input_file)

    # ==============================================

    print("Engineering features...")

    # --------------------------------
    # Cleaning speed & course features
    # --------------------------------
    # Change speed, course -1 to NaN
    df[["speed_mean", "speed_std", "course_mode", "course_std"]] = df[["speed_mean", "speed_std", "course_mode", "course_std"]].replace(-1, np.nan)
    # drop rows with empty speed, course
    df2 = df[
        df["course_mode"].notna() &
        df["speed_mean"].notna() &
        df["speed_std"].notna() &
        df["course_std"].notna()
    ].copy().reset_index(drop=True)

    # -----------------------------
    # Timestamp feature engineering
    # -----------------------------
    # Convert 'stamp_start' and 'stamp_end' into 'timeframe'
    df2["start_time"] = pd.to_datetime(df2["stamp_start"]) # Convert to datetime
    df2["end_time"] = pd.to_datetime(df2["stamp_end"])
    df2["timeframe"] = df2["end_time"] - df2["start_time"] # Row-wise timeframe (end - start)
    df2["timeframe_seconds"] = df2["timeframe"].dt.total_seconds()

    # Get total duration of each activity for each user, each instance
    df2["stamp_start"] = pd.to_datetime(df2["stamp_start"], errors="coerce")
    df2["end_time"] = pd.to_datetime(df2["end_time"], errors="coerce")

    df2 = df2.sort_values([     # Sort by user, activity, and start time
        "user_id", 
        "activity_label", 
        "stamp_start"]).reset_index(drop=True)
    df2["activity_instance"] = (
        (df2["stamp_start"] > df2.groupby(["user_id", "activity_label"])["end_time"].shift())
        .groupby([df2["user_id"], df2["activity_label"]])
        .cumsum()
    )   # Define new instance when gap occurs between end time and new start time
    total_time = (
        df2.groupby(["user_id", "activity_label", "activity_instance"])
        .agg(duration=("end_time", lambda x: x.max() - x.min()))
        .reset_index()
    )   # Compute duration per instance
    total_time["duration_minutes"] = total_time["duration"].dt.total_seconds() / 60 # convert to mins
    df2 = df2.merge(total_time, on=["user_id", "activity_label", "activity_instance"], how="left")

    # frequency of activity per user per day & per user per week
    df2["date"] = df2["start_time"].dt.date
    df2["week"] = df2["start_time"].dt.to_period("W").apply(lambda r: r.start_time.date())
    daily_freq = (
        df2.groupby(["user_id", "activity_label", "date"])["activity_instance"]
        .nunique() # count unique activity instances
        .reset_index(name="daily_freq")
    )
    weekly_freq = (
        df2.groupby(["user_id", "activity_label", "week"])["activity_instance"]
        .nunique()
        .reset_index(name="weekly_freq")
    )
    df2 = df2.merge(daily_freq, on=["user_id", "activity_label", "date"], how="left")
    df2 = df2.merge(weekly_freq, on=["user_id", "activity_label", "week"], how="left")

    # drop unused
    df2 = df2.drop(columns=['stamp_start','stamp_end',
                        'start_time','end_time',
                        'date','week',
                        'duration','timeframe'])

    # -------------------------------
    # Cyclic day-of-week features
    # ---------------------------
    df2["day_of_week_sin"] = np.sin(2*np.pi*df2["day_of_week"]/7)
    df2["day_of_week_cos"] = np.cos(2*np.pi*df2["day_of_week"]/7)

    # -------------------------------------
    # Activity duration normalized per user
    # (compares each activity instance against that user’s typical duration across all activities)
    # -------------------------------------
    # calculate duration per activity instance 
    df_instance = (
        df2.groupby(['user_id', 'activity_label', 'activity_instance'])
        .agg(duration_minutes=('duration_minutes', 'first'))  # or recalc if needed
        .reset_index()
    )
    # Compute mean and std per user
    user_stats = (
        df_instance.groupby('user_id')['duration_minutes']
                .agg(user_mean='mean', user_std='std')
                .reset_index()
    )

    # Merge back
    df_instance = df_instance.merge(user_stats, on='user_id', how='left')

    # Compute z-score
    df_instance['duration_zscore_user_all_acts'] = (
        (df_instance['duration_minutes'] - df_instance['user_mean']) / df_instance['user_std']
    )

    # -----------------------------
    # Activity duration normalized per user per activity
    # (compares each instance against that user's pattern for that specific activity)
    # -----------------------------
    # Compute mean and std per user & activity
    user_act_stats = (
        df_instance.groupby(['user_id', 'activity_label'])['duration_minutes']
                .agg(user_act_mean='mean', user_act_std='std')
                .reset_index()
    )

    # Merge back
    df_instance = df_instance.merge(user_act_stats, on=['user_id', 'activity_label'], how='left')

    # Compute z-score
    df_instance['duration_zscore_user_each_act'] = (
        (df_instance['duration_minutes'] - df_instance['user_act_mean']) / df_instance['user_act_std']
    )
    # z-score = 0 means this activity is the user's typical duration
    df_instance['duration_zscore_user_each_act'].fillna(0, inplace=True)
    # Merge z-scores back to df2
    df2 = df2.merge(
            df_instance[['user_id', 'activity_label', 'activity_instance',
                        'duration_zscore_user_all_acts', 'duration_zscore_user_each_act']],
            on=['user_id', 'activity_label', 'activity_instance'],
            how='left'
    )

    # ----------------
    # Combinations of motion
    # -----------------
    df2["total_orientation_variability"] = df2["yaw_std"] + df2["pitch_std"] + df2["roll_std"]
    df2["total_orientation"] = df2["yaw_mean"] + df2["pitch_mean"] + df2["roll_mean"]
    df2["total_motion"] = df2["rotation_magnitude_mean"] + df2["acceleration_magnitude_mean"]
    df2["total_motion_variability"] = df2["rotation_magnitude_std"] + df2["acceleration_magnitude_std"]
    df2["speed_n_orientation_stability"] = df2["speed_mean"] / (df2["rotation_magnitude_std"] + 1e-6) # 1e-6 to avoid division by 0

    # ----------------
    # Combinations of time-related features
    # ----------------
    df2["duration_n_daily_freq"] = df2["duration_minutes"] * df2["daily_freq"]
    df2["duration_n_weekly_freq"] = df2["duration_minutes"] * df2["weekly_freq"]

    # ------------
    # Binning distance from home mean
    # --------------
    # fixed bins (e.g. in meters)
    bins = [0, 50, 500, 2000, 5000, float('inf')]
    labels = ['At home (0-50m)', 'Within walking-distance (50-500m)', '30-mins-walk (500-2km)', 'Far (2km-5km)', 'Very far (>5km)']

    df2['distance_from_home_mean_bin_fixed'] = pd.cut(
        df2['distance_from_home_mean'],
        bins=bins,
        labels=labels,
        include_lowest=True
    )

    df2 = df2.drop(columns='distance_from_home_mean')

    # ========================================
    

    # Save data with engineered features
    data_output = os.path.join(output_path, "combined_c17_engineered.csv")

    print(f"Saving train to {data_output}")
    df2.to_csv(data_output, index=False)

    print("Feature engineering complete.")


Overwriting health_featureEngr.py


#### 1. Data Cleaning

In [9]:
%%writefile health_datacleaning.py

import argparse
import os
import pandas as pd
from sklearn.model_selection import train_test_split

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--input-path', type=str, help="Directory containing main dataset")
    parser.add_argument("--output-train-path", type=str, help="Output directory for train.csv")
    parser.add_argument("--output-test-path", type=str, help="Output directory for test.csv")
    args = parser.parse_args()

    # Use provided paths or fall back to SageMaker defaults
    input_path = args.input_path
    output_train_path = args.output_train_path
    output_test_path = args.output_test_path

    os.makedirs(output_train_path, exist_ok=True)
    os.makedirs(output_test_path, exist_ok=True)

    # Load data
    input_file = os.path.join(input_path, 'combined_c17_engineered.csv')

    if not os.path.exists(input_file):
        raise FileNotFoundError(f"Required input file not found: {input_file}")
    else:
        print(f"Reading input file from {input_file}...")
    df2 = pd.read_csv(input_file)

    # ===========================
    print('Cleaning data...')
    # ---------
    # drop duplicates based on all columns except 'user_id', keep first row
    # ---------
    print('Removing duplicates...')
    df2 = df2.drop_duplicates(subset=df2.columns.difference(['user_id']), keep='first')

    # ------------------------
    # Standardize activities
    # (not necessary for labelling activities!)
    # ------------------------
    # # Convert to lowercase for easier matching
    # df2['activity_label'] = df2['activity_label'].astype(str).str.lower()

    # # 4. Activity categorization by health relevance
    # health_categories = {
    #     'Essential ADLs': ['chores', 'eat', 'hygiene'],
    #     'Health Indicators': ['exercise', 'sleep'],  
    #     'Independence/Social': ['errands', 'travel'],
    #     'Leisure/Behavioral': ['relax', 'entertainment', 'hobby'],
    #     'Cognitive': ['job','school']
    # }

    # # Invert the mapping: activity -> category
    # activity_to_category = {activity: category 
    #                         for category, activities in health_categories.items() 
    #                         for activity in activities}

    # # Map to a new column
    # df2["activity_category"] = df2["activity_label"].map(activity_to_category)

    # # Categorizing into ADLs
    # adl_mapping = {
    #     "ADL": ["eat", "hygiene", "chores", "exercise", "sleep"],
    #     "IADL": ["errands", "travel", "relax", "entertainment", "hobby", "job", "school"]
    # }

    # # Map to new column
    # df2["ADL_category"] = df2["activity_label"].map(
    #     {a: k for k, v in adl_mapping.items() for a in v}
    # )

    # Drop some less relevant activities
    print('Dropping some activities...')
    activities_to_drop = ["job", "school", "travel"]
    df2 = df2[~df2["activity_label"].isin(activities_to_drop)].reset_index(drop=True)

    # ----------------------------------------
    print('Dropping some features...')
    # --------------
    # Drop features
    # --------------

    # Drop correlated features & weak predictors
    features_to_drop = [
        "user_acceleration_y_mean",
        "user_acceleration_y_std",
        "total_motion",
        "total_motion_variability",
        "weekly_freq",
        "rotation_magnitude_std",
        "time_of_day_radians",
        "rotation_rate_x_std",
        "acceleration_magnitude_mean",
        "user_acceleration_z_std",
        "distance_from_home_std",
        "rotation_magnitude_mean",
        "user_acceleration_x_std",
        "user_acceleration_y_std",
        "daily_freq",
        "rotation_rate_y_std",
        "rotation_rate_x_std",
        "rotation_rate_z_std",
        "rotation_rate_y_std",
        "rotation_rate_y_std",
        "user_acceleration_x_std",
        "user_acceleration_y_std",
        "rotation_rate_x_std",
        "rotation_rate_y_std",
        "user_acceleration_x_std",
        "user_acceleration_y_std",
        "rotation_rate_z_std",
        "rotation_rate_z_std",
        "rotation_rate_x_std",
        "yaw_std",
        "acceleration_magnitude_std",
        "user_acceleration_z_std",
        "rotation_rate_y_std",
        "rotation_rate_y_std",
        "rotation_rate_z_std",
        "rotation_rate_z_std",
        "time_of_day_radians",
        "user_acceleration_z_std",
        "total_motion_variability",
        "rotation_magnitude_std",
        "rotation_rate_y_std",
        "rotation_magnitude_std",
        "rotation_rate_y_std",
        "rotation_magnitude_std",
        "rotation_rate_x_std",
        "user_acceleration_x_std",
        "user_acceleration_y_std",
        "rotation_rate_y_std",
        "roll_std",
        "user_acceleration_z_std",
        "rotation_rate_z_std",
        "rotation_rate_x_std",
        "rotation_rate_x_std",
        "rotation_rate_x_std",
        "rotation_rate_x_std",
        "rotation_rate_z_std",
        "acceleration_magnitude_std",
        "acceleration_magnitude_std",
        "user_acceleration_x_std",
        "user_acceleration_y_std",
        "rotation_magnitude_std",
        "rotation_rate_x_std",
        "user_acceleration_z_std",
        "distance_from_home_latitude_std",
        "distance_from_home_longitude_std"
    ]

    # Dropping additional features
    print('Dropping additional features...')
    addn_features_to_drop = [
        'time_of_day_cos',
        'time_of_day_sin',
        'day_of_week',
        'day_of_week_sin',
        'day_of_week_cos',
        'distance_from_home_latitude_mean',
        'distance_from_home_longitude_mean',
        'bearing_from_home_mode',
        'bearing_from_home_std',
        'distance_from_home_mean_bin_fixed',
        'speed_mean',
        'speed_std',
        'course_mode',
        'course_std'
    ]

    df3 = df2.drop(columns=features_to_drop+['user_id']+addn_features_to_drop)

    # -----------------------------------------
    # =================
    # Splitting data
    # =================
    print("Splitting into train/test...")
    # Stratified train-test split (stratify by health_status)
    df3 = df3.dropna(subset=['activity_label'])  # can't stratify on NaNs in target (if any)
    train_df, test_df = train_test_split(
        df3,
        test_size=0.2,
        stratify=df3['activity_label'],
        random_state=42
    )

    # Save splits
    train_output = os.path.join(output_train_path, "train.csv")
    test_output = os.path.join(output_test_path, "test.csv")

    print(f"Saving train to {train_output}")
    train_df.to_csv(train_output, index=False)

    print(f"Saving test to {test_output}")
    test_df.to_csv(test_output, index=False)

    print("Data cleaning complete.")


Overwriting health_datacleaning.py


#### 2. Preprocessor (in progress)

In [10]:
%%writefile health_preprocessor.py

import argparse
import os
import pandas as pd
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import FunctionTransformer
from sklearn.utils import resample

# ---------------------
# Include GroupImputer
# ---------------------
# -----------
# Main script
# ------------
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--input-train-path', type=str, help="Previous output directory for train.csv")
    parser.add_argument('--output-train-path', type=str, help="Output directory for processed_train.csv")
    parser.add_argument('--input-test-path', type=str, help="Previous output directory for test.csv")
    parser.add_argument('--output-test-path', type=str, help="Output directory for processed_test.csv")
    args = parser.parse_args()

    input_train_path = os.path.join(args.input_train_path, "train.csv")
    output_train_path = args.output_train_path
    input_test_path = os.path.join(args.input_test_path, "test.csv")
    output_test_path = args.output_test_path

    # Make output directory
    os.makedirs(output_train_path, exist_ok=True)
    os.makedirs(output_test_path, exist_ok=True)

    # Load train and test data
  
    if not os.path.exists(input_train_path):
        raise FileNotFoundError(f"Train data not found: {input_train_path}")
    print(f"Reading input file from {input_train_path}...")
    train_df = pd.read_csv(input_train_path)

    if not os.path.exists(input_test_path):
        raise FileNotFoundError(f"Test data not found: {input_test_path}")
    print(f"Reading input file from {input_test_path}...")
    test_df = pd.read_csv(input_test_path)

    # ---------------------------
    # Balancing imbalanced data (unused)
    # ---------------------------
    print('Balancing imbalanced data...')
    X_train = train_df.drop(columns=['activity_label'])
    y_train = train_df['activity_label']

    # # Combine training features and target for resampling
    # train_df = pd.concat([X_train, y_train], axis=1)

    # # Separate majority and minority classes in the training set only
    # train_majority = train_df[train_df.health_status == 0]
    # train_minority = train_df[train_df.health_status == 1]

    # len_maj = len(train_majority)
    # len_min = len(train_minority)
    # n_samples_each = round((len_maj+len_min)/2)
    # print(n_samples_each)

    # # Upsample minority class in the training set
    # train_minority_upsampled = resample(train_minority, 
    #                                     replace=True,    # sample with replacement
    #                                     n_samples=n_samples_each,  # number of samples in upsampled minority class (reduced due to resource constraints)
    #                                     random_state=42) # reproducible results

    # # Downsample majority class in the training set
    # train_majority_downsampled = resample(train_majority, 
    #                                     replace=False,    # sample without replacement
    #                                     n_samples=n_samples_each,   # number of samples in downsampled majority class (reduced due to resource constraints)
    #                                     random_state=42)  # reproducible results

    # # Combine upsampled minority class with downsampled majority class
    # train_combined = pd.concat([train_majority_downsampled, train_minority_upsampled])

    # # Shuffle and reset index
    # train_combined = train_combined.sample(frac=1, random_state=42).reset_index(drop=True)

    # # Re-separate the features and target in the combined training set
    # X_train = train_combined.drop(columns='activity_label')
    # y_train = train_combined['activity_label']

    # Only separate the features and target in the combined test set (without resampling)
    X_test = test_df.drop(columns='activity_label')
    y_test = test_df['activity_label']
    
    # ---------------------
    # Define pipeline columns
    # ---------------------
    # nominal_cols_encode = ['distance_from_home_mean_bin_fixed']
    numerical_cols_scale = X_train.select_dtypes(include='number').columns.tolist()
    
    # ---------------------
    # Build pipelines
    # ---------------------
        
    # nominal_onehot_pipe = Pipeline([
    #     ('onehot', OneHotEncoder(handle_unknown='ignore'))
    # ])

    numerical_scale_pipe = Pipeline([
        ('scaler', StandardScaler())
    ])

    preprocessor = ColumnTransformer([
        # ('nominal_encode', nominal_onehot_pipe, nominal_cols_encode),
        ('num_scale', numerical_scale_pipe, numerical_cols_scale)
    ], remainder='passthrough', verbose_feature_names_out=False)

    # ---------------------
    # Fit on train, transform train/test
    # ---------------------
    print("Train columns:", X_train.columns.tolist())
    print("Test columns:", X_test.columns.tolist())
    X_train_trf = preprocessor.fit_transform(X_train)
    X_test_trf = preprocessor.transform(X_test)

    # Convert back to DataFrame with feature names
    feature_names = preprocessor.get_feature_names_out()
    X_train_transformed = pd.DataFrame(X_train_trf, columns=feature_names)
    X_test_transformed = pd.DataFrame(X_test_trf, columns=feature_names)

    # Add back Y
    train_transformed = pd.concat([X_train_transformed, y_train], axis=1)
    test_transformed = pd.concat([X_test_transformed, y_test], axis=1)
    
    # ---------------------
    # Save transformed datasets
    # ---------------------
    train_output_file = os.path.join(output_train_path, "train_processed.csv")
    test_output_file = os.path.join(output_test_path, "test_processed.csv")

    train_transformed.to_csv(train_output_file, index=False)
    test_transformed.to_csv(test_output_file, index=False)

    print(f"Processed train saved to {train_output_file}")
    print(f"Processed test saved to {test_output_file}")

    print("Preprocessing complete.")

Overwriting health_preprocessor.py


#### 3. Training (in progress)

In [11]:
%%writefile requirements.txt
# boto3>=1.34
# botocore>=1.34
mlflow
sagemaker-mlflow
scikit-learn
pandas
joblib
numpy

Overwriting requirements.txt


In [12]:
%%writefile health_train.py

import sys
import subprocess

# Ensure MLflow is installed
try:
    import mlflow
    import sagemaker_mlflow
except ImportError:
    print("Installing MLflow...")
    subprocess.check_call([sys.executable, "-m", "pip", "install", "mlflow", "sagemaker-mlflow"])
    # subprocess.check_call([sys.executable, "-m", "pip", "install",  "boto3==1.28.57", "botocore==1.31.85", "s3transfer", "mlflow", "sagemaker-mlflow"])
    import mlflow
    import sagemaker_mlflow

# import mlflow
# import sagemaker_mlflow
import mlflow.sklearn
import os
import argparse
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import RandomizedSearchCV
import joblib
import glob
from sklearn.metrics import f1_score, accuracy_score, precision_score, recall_score, roc_auc_score

def evaluate_classification(y_target, y_prediction, y_prediction_proba) -> list:
    scores = {}
    scores['accuracy']  = accuracy_score(y_target, y_prediction)
    scores['precision'] = precision_score(y_target, y_prediction, average='macro')
    scores['recall']    = recall_score(y_target, y_prediction, average='macro')
    scores['f1']        = f1_score(y_target, y_prediction, average='macro')
    
    scores['roc_auc'] = None
    if y_prediction_proba is not None:
        try:
            scores['roc_auc'] = roc_auc_score(
                y_target, y_prediction_proba, multi_class='ovr', average='macro'
            )
        except ValueError:
            pass
    
    return scores

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--tracking-server-arn", type=str, required=True)
    parser.add_argument("--experiment-name", type=str, default="Default")
    parser.add_argument("--model-output-path", type=str, default="/opt/ml/model", help="Output directory for model.joblib")
    # Hyperparameters
    parser.add_argument("--n_estimators", type=int, default=100)
    parser.add_argument("--max_depth", type=int, default=10)
    parser.add_argument("--min_samples_leaf", type=int, default=5)
    parser.add_argument("--min_samples_split", type=int, default=10)
    parser.add_argument("--max_features", type=str, default="sqrt")
    parser.add_argument('--bootstrap', action='store_true')
    parser.add_argument('--no-bootstrap', dest='bootstrap', action='store_false')
    parser.set_defaults(bootstrap=True)

    # parser.add_argument("--n-estimators", type=int, default=100)
    # parser.add_argument("--max-depth", type=int, default=10)
    # parser.add_argument("--min-samples-leaf", type=int, default=5)
    # parser.add_argument("--max-features", type=str, default='sqrt')
    # parser.add_argument("--min-samples-split", type=int, default=10)
    # # parser.add_argument("--bootstrap", type=bool, default=True)
    # parser.add_argument('--bootstrap', action='store_true', help="Enable bootstrap")
    # parser.add_argument('--no-bootstrap', dest='bootstrap', action='store_false', help="Disable bootstrap")
    parser.set_defaults(bootstrap=True)


    args, _ = parser.parse_known_args()

    # Make output directory
    model_output_path = args.model_output_path or "/opt/ml/model"
    os.makedirs(model_output_path, exist_ok=True)
    
    # Load training data
    files = glob.glob("/opt/ml/input/data/train/train_processed.csv")
    if not files:
        raise FileNotFoundError(f"Processed train data not found: {input_train}")
    input_train = files[0]

    # input_train = glob.glob("/opt/ml/input/data/train/train_processed.csv")[0] # first train_processed.csv
    # if not os.path.exists(input_train):
    #     raise FileNotFoundError(f"Processed train data not found: {input_train}")
    df = pd.read_csv(input_train)
    X = df.drop(columns='activity_label')
    y = df['activity_label']

    # Set up MLflow
    mlflow.set_tracking_uri(args.tracking_server_arn)
    print("MLflow tracking URI set successfully.")
    mlflow.set_experiment(args.experiment_name)
    print(f"MLflow tracking URI set to: {mlflow.get_tracking_uri()}")
    print(f"MLflow experiment set to: '{args.experiment_name}'")

    with mlflow.start_run() as run:
        # Log parameters

        mlflow.log_param("bootstrap", bool(args.bootstrap)) 
        mlflow.log_param("max_depth", int(args.max_depth))
        mlflow.log_param("max_features", args.max_features)  # str
        mlflow.log_param("min_samples_leaf", int(args.min_samples_leaf))
        mlflow.log_param("min_samples_split", int(args.min_samples_split))
        mlflow.log_param("n_estimators", int(args.n_estimators))

        model = RandomForestClassifier(
            bootstrap = bool(args.bootstrap),
            max_depth = int(args.max_depth),
            max_features = args.max_features,
            min_samples_leaf = int(args.min_samples_leaf),
            min_samples_split = int(args.min_samples_split),
            n_estimators = int(args.n_estimators),
            random_state = 42
        )
        print("Fitting to model...")
        model.fit(X, y)
        print("Predicting on training set...")
        y_pred = model.predict(X)
        y_pred_proba = model.predict_proba(X)

        scores = evaluate_classification(y, y_pred, y_pred_proba)

        for label, value in scores.items():
            mlflow.log_metric(label, value)
    
        mlflow.sklearn.log_model(sk_model=model, artifact_path="model")
    
        joblib.dump(model, os.path.join(args.model_output_path, "model.joblib"))
        with open(os.path.join(args.model_output_path, "run_id.txt"), "w") as f:
            f.write(run.info.run_id)
    
        print(f"Training complete. Scores: \n{scores}")
        print(f"MLflow Run ID: {run.info.run_id}")

Overwriting health_train.py


#### 4. Evaluation (in progress)

In [25]:
%%writefile health_evaluate.py

# Ensure MLflow is installed
import sys
import subprocess
try:
    import mlflow
    import sagemaker_mlflow
except ImportError:
    print("Installing MLflow...")
    subprocess.check_call([sys.executable, "-m", "pip", "install", "mlflow", "sagemaker-mlflow"])
    # subprocess.check_call([sys.executable, "-m", "pip", "install",  "boto3==1.28.57", "botocore==1.31.85", "s3transfer", "mlflow", "sagemaker-mlflow"])
    import mlflow
    import sagemaker_mlflow

import argparse
import pandas as pd
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, roc_auc_score
import joblib
import os
import json
import boto3
import tarfile
import mlflow


def evaluate_classification(y_target, y_prediction, y_prediction_proba) -> list:
    scores = {}
    scores['accuracy']  = accuracy_score(y_target, y_prediction)
    scores['precision'] = precision_score(y_target, y_prediction, average='macro')
    scores['recall']    = recall_score(y_target, y_prediction, average='macro')
    scores['f1']        = f1_score(y_target, y_prediction, average='macro')
    
    scores['roc_auc'] = None
    if y_prediction_proba is not None:
        try:
            scores['roc_auc'] = roc_auc_score(
                y_target, y_prediction_proba, multi_class='ovr', average='macro'
            )
        except ValueError:
            pass
    
    return scores

if __name__ == "__main__":
    # --- Parse Arguments ---
    parser = argparse.ArgumentParser()
    parser.add_argument("--model-path", type=str, required=True, help="Previous output directory for model.joblib.")
    parser.add_argument("--input-test-path", type=str, required=True, help="Previous output directory for test_processed.csv")
    parser.add_argument("--output-path", type=str, required=True, help="Path to save the evaluation.json report.")
    parser.add_argument("--model-package-group-name", type=str, required=True, help="Name of the SageMaker Model Package Group.")
    parser.add_argument("--region", type=str, required=True, help="The AWS region for creating the boto3 client.")
    args = parser.parse_args()

    # --- Extract and Load Model ---
    # SageMaker packages models in a .tar.gz file. We need to extract it first.
    model_archive_path = os.path.join(args.model_path, 'model.tar.gz')
    print(f"Extracting model from archive: {model_archive_path}")
    with tarfile.open(model_archive_path, "r:gz") as tar:
        tar.extractall(path=args.model_path)

    # Load the model using joblib
    model_file_path = os.path.join(args.model_path, "model.joblib")
    if not os.path.exists(model_file_path):
        raise FileNotFoundError(f"Model file 'model.joblib' not found after extraction in: {args.model_path}")
    
    print(f"Loading model from: {model_file_path}")
    model = joblib.load(model_file_path)

    # Load test data
    input_test_path = args.input_test_path
    input_test = os.path.join(input_test_path, "test_processed.csv")
    if not os.path.exists(input_test):
        raise FileNotFoundError(f"Processed test data not found: {input_test}")
    df = pd.read_csv(input_test)
    X_test = df.drop(columns='activity_label')
    y_test = df['activity_label']

    print("Running predictions on the test dataset.")
    y_test_pred = model.predict(X_test)
    y_test_pred_proba = model.predict_proba(X_test)

    scores = evaluate_classification(y_test, y_test_pred, y_test_pred_proba)

    for label, value in scores.items():
        mlflow.log_metric(label, value)

    print(f"Evaluation complete. Scores: \n{scores}")

    # accuracy = accuracy_score(y_test, y_test_pred)
    # f1_macro = f1_score(y_test, y_test_pred, average="macro")
    # report = {"accuracy": accuracy,
    #          "f1": f1_macro} # labelled as 'f1' instead of 'f1_macro' in case of change
    # print(f"Calculated accuracy: {accuracy:.4f}")
    # print(f"Calculated f1_macro: {f1_macro:.4f}")

    # report = {"scores": scores}
    # Convert your scores dict into SageMaker evaluation schema
    # report = {
    #     "binary_classification_metrics": {
    #         "accuracy":  {"value": float(scores["accuracy"])},
    #         "precision": {"value": float(scores["precision"])},
    #         "recall":    {"value": float(scores["recall"])},
    #         "f1":        {"value": float(scores["f1"])},
    #         "roc_auc":   {"value": float(scores["roc_auc"])} if scores["roc_auc"] is not None else None
    #     },
    #      "baseline_exists": False
    # }
    # report = {
    #     "binary_classification_metrics": {
    #         "accuracy":  {"value": float(scores["accuracy"])},
    #         "precision": {"value": float(scores["precision"])},
    #         "recall":    {"value": float(scores["recall"])},
    #         "f1":        {"value": float(scores["f1"])},
    #     }
    # }
    report = {"accuracy": scores["accuracy"],
              "precision": scores["precision"],
              "recall": scores["recall"],
             "f1": scores["f1"],
             "roc_auc": scores["roc_auc"]}


    # --- Check for Existing Baseline Model in SageMaker Model Registry ---
    print(f"Checking for baseline model in region: {args.region}")
    sagemaker_client = boto3.client("sagemaker", region_name=args.region)
    try:
        response = sagemaker_client.list_model_packages(
            ModelPackageGroupName=args.model_package_group_name, 
            ModelApprovalStatus="Approved", # filter for approved models only
            SortBy="CreationTime",
            SortOrder="Descending",
            MaxResults=1,
        )
        # If the list is not empty, an approved model already exists
        report["baseline_exists"] = len(response["ModelPackageSummaryList"]) > 0 # should include all models regardless of approval status
        if report["baseline_exists"]:
            print(f"An approved baseline model was found in '{args.model_package_group_name}'.")
        else:
             print(f"No approved baseline model was found in '{args.model_package_group_name}'.")

    except sagemaker_client.exceptions.ClientError as e:
        # If the ModelPackageGroup doesn't exist, there is no baseline
        if "ResourceNotFound" in str(e):
            report["baseline_exists"] = False
            print(f"Model Package Group '{args.model_package_group_name}' not found. Assuming no baseline exists.")
        else:
            raise

    # --- Write Final Report ---
    os.makedirs(args.output_path, exist_ok=True)
    report_path = os.path.join(args.output_path, "evaluation.json")
    with open(report_path, "w") as f:
        json.dump(report, f, indent=4)
        
    print(f"✅ Evaluation complete. Report written to: {report_path}")
    print("Evaluation Report:")
    print(json.dumps(report, indent=4))

Overwriting health_evaluate.py


## Define the SageMaker Pipeline

#### Pipeline settings & parameters

In [26]:
# Get the role from the current session
sagemaker_role = sagemaker.get_execution_role()

print("Your current SageMaker Execution Role ARN is:")
print(sagemaker_role)

Your current SageMaker Execution Role ARN is:
arn:aws:iam::837028399719:role/iti113-team12-sagemaker-iti113-team12-domain-iti113-team12-Role


In [27]:
# Same as defined above
# data_path = f"s3://{bucket_name}/{base_folder}/datasets/c17/working"
print("Verifying data path:")
print(data_path)

Verifying data path:
s3://iti113-team12-bucket/project/datasets/c17/working


In [28]:
from sagemaker.workflow.parameters import ParameterFloat, ParameterInteger, ParameterString, ParameterBoolean
from sagemaker.processing import ProcessingInput, ProcessingOutput
import sagemaker

# Get SageMaker role and session
sagemaker_role = sagemaker.get_execution_role()
print(f"Your current SageMaker Execution Role ARN is: {sagemaker_role}")
sagemaker_session = sagemaker.Session()

# Parameters
model_package_group_name = "activityLabelModels"
processing_instance_type = "ml.m5.large" #"ml.t3.medium"
training_instance_type = "ml.m5.large"
experiment_name_param = ParameterString(name="ExperimentName", default_value="c-activityLabelClf")
accuracy_threshold_param = ParameterFloat(name="AccuracyThreshold", default_value=0.8)
f1_threshold_param = ParameterFloat(name="F1Threshold", default_value = 0.8) # low for now


# Hyperparameters for random forest
# Integer hyperparameters
n_estimators_param = ParameterInteger(name="n_estimators", default_value=100)
max_depth_param = ParameterInteger(name="max_depth", default_value=10)
min_samples_leaf_param = ParameterInteger(name="min_samples_leaf", default_value=5)
min_samples_split_param = ParameterInteger(name="min_samples_split", default_value=10)
# String hyperparameter
max_features_param = ParameterString(name="max_features", default_value="sqrt")
# Boolean hyperparameter
bootstrap_param = ParameterBoolean(name="bootstrap", default_value=True)


# Define input/output S3 URIs as pipeline parameters
# data_path = f"s3://{bucket_name}/{base_folder}/datasets/c17/working"
# dataset_s3_path = f"{data_path}/users/volunteer_details.csv"
input_data_uri = ParameterString(
    name="InputDataURI", 
    default_value=f"{data_path}/combined_c17.csv",
)
engr_data_uri = ParameterString(
    name="EngrDataURI", 
    default_value=f"{data_path}/combined_c17_engineered.csv",
)
train_data_uri = ParameterString(
    name="TrainDataURI", 
    default_value=f"{data_path}/preprocessed-data/train" # S3 location
)
test_data_uri = ParameterString(
    name="TestDataURI", 
    default_value=f"{data_path}/preprocessed-data/test" 
)
processed_train_data_uri = ParameterString(
    name="ProcessedTrainDataURI", 
    default_value=f"{data_path}/preprocessed-data/train"
)
processed_test_data_uri = ParameterString(
    name="ProcessedTestDataURI", 
    default_value=f"{data_path}/preprocessed-data/test"
)


Your current SageMaker Execution Role ARN is: arn:aws:iam::837028399719:role/iti113-team12-sagemaker-iti113-team12-domain-iti113-team12-Role


#### Define the feature engineering step

In [29]:
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.steps import ProcessingStep

# Define the SKLearnProcessor
featureEngr = SKLearnProcessor(
    framework_version="1.2-1",  
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name="feature-engr",
    role=sagemaker_role,
    sagemaker_session=sagemaker_session
)

# Define the feature engineering step
featureEng_step = ProcessingStep(
    name=f"featureEngrStep-{int(time.time())}",
    processor=featureEngr,
    code="health_featureEngr.py", # Script was created in previous step
    inputs=[
        ProcessingInput(
            input_name="input-path",
            source=input_data_uri,
            destination="/opt/ml/processing/input"
        )
    ],
    outputs=[
        ProcessingOutput(
            output_name="output-path",
            source="/opt/ml/processing/output", # local
            destination=engr_data_uri # S3
        )
    ],
    job_arguments=[
        "--input-path", "/opt/ml/processing/input",
        "--output-path", "/opt/ml/processing/output"
    ]
)

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


#### Define the data cleaning step

In [30]:
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.steps import ProcessingStep

# Define the SKLearnProcessor
datacleaning = SKLearnProcessor(
    framework_version="1.2-1",  
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name="cleaning-data",
    role=sagemaker_role,
    sagemaker_session=sagemaker_session
)

# Define the data cleaning step
datacleaning_step = ProcessingStep(
    name=f"CleaningStep-{int(time.time())}",
    processor=datacleaning,
    code="health_datacleaning.py", # Script was created in previous step
    inputs=[
        ProcessingInput(
            input_name="input-path",
            source=featureEng_step.properties.ProcessingOutputConfig.Outputs["output-path"].S3Output.S3Uri,
            destination="/opt/ml/processing/input"
        )
    ],
    outputs=[
        ProcessingOutput(
            output_name="output-train-path",
            source="/opt/ml/processing/output/train", # local
            destination=train_data_uri # S3
        ),
        ProcessingOutput(
            output_name="output-test-path",
            source="/opt/ml/processing/output/test", # local
            destination=test_data_uri # S3
        )
    ],
    job_arguments=[
        "--input-path", "/opt/ml/processing/input",
        "--output-train-path", "/opt/ml/processing/output/train",
        "--output-test-path", "/opt/ml/processing/output/test"
    ]
)

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


#### Define the preprocessing step (in progress)

In [31]:
# Define the SKLearnProcessor
preprocessing = SKLearnProcessor(
    framework_version="1.2-1",  
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name="preprocessing",
    role=sagemaker_role,
    sagemaker_session=sagemaker_session
)

# Define the processing step
preprocessing_step = ProcessingStep(
    name=f"PreprocessingStep-{int(time.time())}",
    processor=preprocessing,
    code="health_preprocessor.py", # Script was created in previous step
    inputs=[
        ProcessingInput(
            input_name="input-train-path",
            source=datacleaning_step.properties.ProcessingOutputConfig.Outputs["output-train-path"].S3Output.S3Uri,
            destination="/opt/ml/processing/input/train"
        ),
        ProcessingInput(
            input_name="input-test-path",
            source=datacleaning_step.properties.ProcessingOutputConfig.Outputs["output-test-path"].S3Output.S3Uri,
            destination="/opt/ml/processing/input/test"
        )
    ],
    outputs=[
        ProcessingOutput(
            output_name="output-train-path",
            source="/opt/ml/processing/output/train",
            destination=processed_train_data_uri
        ),
        ProcessingOutput(
            output_name="output-test-path",
            source="/opt/ml/processing/output/test",
            destination=processed_test_data_uri
        )
    ],
    job_arguments=[
        "--input-train-path", "/opt/ml/processing/input/train",
        "--input-test-path", "/opt/ml/processing/input/test",
        "--output-train-path", "/opt/ml/processing/output/train",
        "--output-test-path", "/opt/ml/processing/output/test"
    ]
)

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


#### Define the training step (in progress)

In [32]:
from sagemaker.sklearn.estimator import SKLearn
from sagemaker.workflow.steps import TrainingStep, TrainingInput

# Training Step
sklearn_estimator = SKLearn(
    entry_point="health_train.py", 
    # source_dir=".",  # 👈 include the current folder
    framework_version="1.2-1",
    instance_type=training_instance_type,
    role=sagemaker_role,
    hyperparameters={
        "tracking-server-arn": mlflow_tracking_server_arn,
        "experiment-name": experiment_name_param,
        "model-output-path": "/opt/ml/model",
        "n_estimators": n_estimators_param,
        "max_depth": max_depth_param,
        "min_samples_leaf": min_samples_leaf_param,
        "min_samples_split": min_samples_split_param,
        "max_features": max_features_param,
        "bootstrap": bootstrap_param
    },
    py_version="py3",
    # requirements="requirements.txt" 
)

train_step = TrainingStep(
    name=f"TrainModel-{int(time.time())}",
    estimator=sklearn_estimator,
    inputs={
        "train": TrainingInput(
            s3_data=preprocessing_step.properties.ProcessingOutputConfig.Outputs["output-train-path"].S3Output.S3Uri,
            content_type="text/csv",
            input_mode="File",  # ensure it's mounted as a file
        )
    },
)


#### Define the Evalution step (in progress)

In [33]:
from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.model_metrics import ModelMetrics, FileSource

evaluation_processor = ScriptProcessor(
    image_uri=sagemaker.image_uris.retrieve("sklearn", sagemaker_session.boto_region_name, "1.2-1"),
    command=['python3'],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name="evaluate-model",
    role=sagemaker_role,
)

evaluation_report = PropertyFile(
    name="EvaluationReport", output_name="evaluation", path="evaluation.json"
)

evaluation_step = ProcessingStep(
    name=f"EvaluateModel-{int(time.time())}",
    processor=evaluation_processor,
    inputs=[
        ProcessingInput(
            source=train_step.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=preprocessing_step.properties.ProcessingOutputConfig.Outputs["output-test-path"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation")],
    code="health_evaluate.py",  # SageMaker will handle uploading and running this script
    job_arguments=[  # Pass arguments here instead of in command
        "--model-path", "/opt/ml/processing/model",
        "--input-test-path", "/opt/ml/processing/test",
        "--output-path", "/opt/ml/processing/evaluation",
        "--model-package-group-name", model_package_group_name,
        "--region", "ap-southeast-1",
    ],
    property_files=[evaluation_report],
    depends_on=[train_step]
)

model_metrics_report = ModelMetrics(
    model_statistics=FileSource(
        s3_uri=evaluation_step.properties.ProcessingOutputConfig.Outputs["evaluation"].S3Output.S3Uri,
        content_type="application/json"
    )
)

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3
INFO:sagemaker.image_uris:Defaulting to only supported image scope: cpu.


#### Define the Model registration step (in progress)

In [34]:
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.conditions import ConditionNot
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import ConditionEquals
from sagemaker.workflow.functions import JsonGet

# RegisterModel step (always defined, but executed conditionally)
step_register_new = RegisterModel(
    name="RegisterNewModel",
    estimator=sklearn_estimator,
    model_data=train_step.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["application/json"],
    response_types=["application/json"],
    inference_instances=["ml.t2.medium"],
    transform_instances=["ml.m5.large"],
    model_package_group_name=model_package_group_name,
    model_metrics=model_metrics_report,
    approval_status="Approved" # changed from "PendingManualApproval" since passed threshold
)

step_register_better_model = RegisterModel(
    name="RegisterBetterModel",
    estimator=sklearn_estimator,
    model_data=train_step.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["application/json"],
    response_types=["application/json"],
    inference_instances=["ml.t2.medium"],
    transform_instances=["ml.m5.large"],
    model_package_group_name=model_package_group_name,
    model_metrics=model_metrics_report,
    approval_status="Approved" # changed from "PendingManualApproval" since passed threshold
)


# Conditions: check accuracy & f1 > threshold OR no model exists
cond_accuracy = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step_name=evaluation_step.name,
        property_file=evaluation_report,
        json_path="accuracy"
    ),
    right=accuracy_threshold_param
)

cond_f1 = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step_name=evaluation_step.name,
        property_file=evaluation_report,
        json_path="f1"
    ),
    right=f1_threshold_param
)

cond_no_registered = ConditionEquals(
    left=JsonGet(
        step_name=evaluation_step.name,
        property_file=evaluation_report,
        json_path="baseline_exists" # Check the key added to the report
    ),
    right=False # Condition is TRUE if baseline_exists is False
)

# Outer step: Checks for existence of registered model first
step_cond_metrics = ConditionStep(
    name="CheckAccuracyAndF1",
    conditions=[
        cond_accuracy,
        cond_f1,
    ],
    if_steps=[step_register_better_model], # Register model if both metrics pass
    else_steps=[],
)

step_cond_no_registered = ConditionStep(
    name="CheckIfNoModelExists",
    conditions=[cond_no_registered],
    if_steps=[step_register_new], # Register model if no baseline exists
    else_steps=[step_cond_metrics], # If model exists, check accuracy and f1
    depends_on=[evaluation_step]
)





#### Define Pipeline

In [35]:
from sagemaker.workflow.pipeline import Pipeline

# Define the Pipeline
# This assembles our parameters and steps into a single workflow.
pipeline_name = "activityLabelPipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[input_data_uri, 
                engr_data_uri,
                train_data_uri,
                test_data_uri,
                processed_train_data_uri,
                processed_test_data_uri,
                experiment_name_param,
                accuracy_threshold_param,
                f1_threshold_param,
                # RandomForest hyperparameters
                n_estimators_param,
                max_depth_param,
                min_samples_leaf_param,
                min_samples_split_param,
                max_features_param,
                bootstrap_param],
    steps=[featureEng_step,
           datacleaning_step,
           preprocessing_step,
           train_step,
           evaluation_step,
           step_cond_no_registered] # Use the 'no registered model' check as the primary condition step
)

# Create or update the pipeline definition in your AWS account.
pipeline.upsert(role_arn=sagemaker_role)

print(f"Pipeline '{pipeline_name}' is defined and ready to be executed.")



Pipeline 'activityLabelPipeline' is defined and ready to be executed.


## Start Pipeline

In [36]:
from sagemaker.workflow.pipeline import Pipeline

# -------------------------
# Running existing pipeline
# -------------------------
pipeline = Pipeline(name="activityLabelPipeline")
# bucket_name = 'iti113-team12-bucket'  # e.g., 'my-company-sagemaker-bucket'
# base_folder = 'project'
# data_path = f"s3://{bucket_name}/{base_folder}/datasets/c17/working"

# Start the pipeline and pass the S3 path for v1.0 to our parameter
print("\nStarting pipeline execution...")

# Experiment with data
execution = pipeline.start(
    parameters={
        "InputDataURI": f"{data_path}/combined_c17.csv",
        "EngrDataURI": f"{data_path}/combined_c17_engineered.csv",
        "TrainDataURI": f"{data_path}/preprocessed-data/train",
        "TestDataURI": f"{data_path}/preprocessed-data/test",
        "ProcessedTrainDataURI": f"{data_path}/preprocessed-data/train",
        "ProcessedTestDataURI": f"{data_path}/preprocessed-data/test",

        # RandomForest hyperparameters: pass raw values, not ParameterInteger/Boolean
        "n_estimators": 100,
        "max_depth": 10,
        "min_samples_leaf": 5,
        "min_samples_split": 10,
        "max_features": "sqrt",
        "bootstrap": True,

        # MLflow / experiment
        # "experiment-name": "c-activityLabelClf",
    },
    execution_display_name="dataEngr-to-modelReg"
)

print(f"--> Execution started! ARN: {execution.arn}")
print(execution.describe())



Starting pipeline execution...
--> Execution started! ARN: arn:aws:sagemaker:ap-southeast-1:837028399719:pipeline/activityLabelPipeline/execution/srbo9vxkij5c
{'PipelineArn': 'arn:aws:sagemaker:ap-southeast-1:837028399719:pipeline/activityLabelPipeline', 'PipelineExecutionArn': 'arn:aws:sagemaker:ap-southeast-1:837028399719:pipeline/activityLabelPipeline/execution/srbo9vxkij5c', 'PipelineExecutionDisplayName': 'dataEngr-to-modelReg', 'PipelineExecutionStatus': 'Executing', 'CreationTime': datetime.datetime(2025, 8, 25, 3, 42, 43, 715000, tzinfo=tzlocal()), 'LastModifiedTime': datetime.datetime(2025, 8, 25, 3, 42, 43, 715000, tzinfo=tzlocal()), 'CreatedBy': {'IamIdentity': {'Arn': 'arn:aws:sts::837028399719:assumed-role/iti113-team12-sagemaker-iti113-team12-domain-iti113-team12-Role/SageMaker', 'PrincipalId': 'AROA4FYWHZJT3SRRDCPYT:SageMaker'}}, 'LastModifiedBy': {'IamIdentity': {'Arn': 'arn:aws:sts::837028399719:assumed-role/iti113-team12-sagemaker-iti113-team12-domain-iti113-team12-R

In [None]:
sagemaker_client = sagemaker.Session().sagemaker_client

# Get the details of the specific pipeline execution
response = sagemaker_client.describe_pipeline_execution(
    PipelineExecutionArn=execution.arn
)

# Print the current status
status = response['PipelineExecutionStatus']
print(f"Pipeline Status: {status}")

# You can also see the creation time and last modified time
print(f"Creation Time: {response['CreationTime']}")
if 'LastModifiedTime' in response:
    print(f"Last Modified: {response['LastModifiedTime']}")

sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3Bucket
Pipeline Status: Succeeded
Creation Time: 2025-07-30 02:30:38.031000+00:00
Last Modified: 2025-07-30 02:49:11.783000+00:00
Pipeline Status: Succeeded
Creation Time: 2025-07-30 02:49:41.594000+00:00
Last Modified: 2025-07-30 03:07:53.567000+00:00


## Manual Deployment (unused)

#### 1. Identify best training job

In [31]:
import mlflow

mlflow.set_tracking_uri(mlflow_tracking_server_arn)

experiment = mlflow.get_experiment_by_name("healthstatus_classifier")
runs = mlflow.search_runs(experiment.experiment_id)

print("Experiment:", experiment)

# Find the run with the best metric (e.g., highest f1)
runs = mlflow.search_runs(experiment.experiment_id)
print("Runs type:", type(runs))
# print("Runs columns:", runs.columns)
# print("First few rows:\n", runs.head())

# Check if 'metrics.f1_macro' exists:
if 'metrics.f1_macro' not in runs.columns:
    raise ValueError("No 'metrics.f1_macro' column found. Available metrics:", runs.columns)

best_run = runs.loc[runs['metrics.f1_macro'].idxmax()]
print("Best run:", best_run)

# Construct the model URI from the run
best_run_id = best_run.run_id

model_uri = f"runs:/{best_run_id}/model"
print(f"Registering model from URI: {model_uri}")

# Register the model to the MLflow Model Registry
model_name = "healthstatus_classifier_model"
registered_model = mlflow.register_model(
    model_uri=model_uri,
    name=model_name
)

print(f"Model '{model_name}' registered with version: {registered_model.version}")

Experiment: <Experiment: artifact_location='s3://sagemaker-iti112-common/mlflow-1/106', creation_time=1753451874748, experiment_id='106', last_update_time=1753451874748, lifecycle_stage='active', name='8815v-PenguinPrediction', tags={}>
Runs type: <class 'pandas.core.frame.DataFrame'>
Best run: run_id                                            e844a036b919434eb7363bf3204d3b7e
experiment_id                                                                  106
status                                                                    FINISHED
artifact_uri                     s3://sagemaker-iti112-common/mlflow-1/106/e844...
start_time                                        2025-07-30 03:02:17.828000+00:00
end_time                                          2025-07-30 03:02:21.510000+00:00
metrics.accuracy                                                          0.992727
metrics.f1_macro                                                          0.992483
params.solver                           

Registered model 'penguin-model-8815v' already exists. Creating a new version of this model...
2025/07/30 03:12:37 INFO mlflow.store.model_registry.abstract_store: Waiting up to 300 seconds for model version to finish creation. Model name: penguin-model-8815v, version 7


Model 'penguin-model-8815v' registered with version: 7


Created version '7' of model 'penguin-model-8815v'.


#### 2. Retrieve model artifact from MLflow

In [32]:
from mlflow.tracking import MlflowClient
client = MlflowClient()
model_version = client.get_model_version(model_name, registered_model.version)

model_artifact_s3 = model_version.source
print("Model S3 Artifact URI:", model_artifact_s3)

Model S3 Artifact URI: s3://sagemaker-iti112-common/mlflow-1/106/e844a036b919434eb7363bf3204d3b7e/artifacts/model


#### 3. Download, repackage model, upload

In [33]:
import tarfile
import boto3
import os
import shutil

model_folder = "/tmp/model" # temp local dir
code_folder = os.path.join(model_folder, "code") 
os.makedirs(model_folder, exist_ok=True)

# Download mMLflow artifact to local
mlflow.artifacts.download_artifacts(
    artifact_uri=model_artifact_s3,
    dst_path=model_folder
)

# Find the actual model file inside model_folder (model.pkl)
model_file_path = None
for root, dirs, files in os.walk(model_folder):
    for f in files:
        if f.endswith(".pkl"): 
            model_file_path = os.path.join(root, f)
            break

if not model_file_path:
    raise FileNotFoundError("No model.pkl found in MLflow artifacts")

# Copy it to a clean directory so it's at root level
root_dir = "/tmp/model_root"
os.makedirs(root_dir, exist_ok=True)
shutil.copy(model_file_path, os.path.join(root_dir, "model.pkl"))

# Compress model to model.tar.gz with model.pkl at root
model_tar_path = "/tmp/model.tar.gz"
with tarfile.open(model_tar_path, "w:gz") as tar:
    tar.add(root_dir, arcname=".")

# Upload model.tar.gz to S3
s3 = boto3.client("s3")
bucket_name = sagemaker_session.default_bucket()
# parsed = boto3.session.Session().resource("s3").Bucket(sagemaker_session.default_bucket())
s3_key = f"{base_folder}/models/{model_name}-v{registered_model.version}/model.tar.gz"
s3.upload_file(model_tar_path, bucket_name, s3_key)

model_s3_uri = f"s3://{bucket_name}/{s3_key}"
print("✅ Compressed model uploaded to:", model_s3_uri)

Downloading artifacts:   0%|          | 0/5 [00:00<?, ?it/s]

✅ Compressed model uploaded to: s3://sagemaker-iti112-common/8708815v@myaccount.nyp.edu.sg/models/penguin-model-8815v-v7/model.tar.gz


#### 4. Write inference.py file

In [34]:
%%writefile inference.py
# inference.py
import joblib
import os
import pandas as pd

def model_fn(model_dir):
    return joblib.load(os.path.join(model_dir, "model.pkl"))

def input_fn(request_body, content_type):
    if content_type == "application/json":
        return pd.DataFrame.from_dict(eval(request_body))  # simple eval for test input
    raise ValueError(f"Unsupported content type: {content_type}")

def predict_fn(input_data, model):
    return model.predict(input_data)

def output_fn(prediction, accept):
    return str(prediction.tolist())

Overwriting inference.py


#### 5. Check model file structure
Make sure model.tar.gz contains model.pkl at the root

In [35]:
import boto3

s3 = boto3.client("s3")
s3_uri = model_s3_uri #"s3://your-bucket/path/to/model.tar.gz"

bucket = s3_uri.replace("s3://", "").split("/")[0]
key = "/".join(s3_uri.replace("s3://", "").split("/")[1:])

local_path = "model.tar.gz"
s3.download_file(bucket, key, local_path)


In [36]:
import tarfile

# Make sure model.tar.gz contains model.pkl at the root
# OR contains MLflow's default format with `MLmodel` + `model.pkl`

with tarfile.open(local_path, "r:gz") as tar:
    tar.list()   # shows file structure


?rwxr-xr-x sagemaker-user/users          0 2025-07-30 03:12:44 ./ 
?rw-r--r-- sagemaker-user/users       1084 2025-07-30 03:12:44 ./model.pkl 


#### 6. Create SageMaker Model object

In [None]:
from sagemaker.sklearn.model import SKLearnModel

# Create the model object
sklearn_model = SKLearnModel(
    model_data=model_s3_uri,
    role=sagemaker_role,
    entry_point="inference.py", 
    framework_version="1.2-1",
    sagemaker_session=sagemaker_session
)

#### 7. Check and deploy endpoint

In [43]:
from botocore.exceptions import ClientError

# Define the endpoint name
endpoint_name = "8815v-penguin-clf" 
# deleted previous endpoint & endpoint config instead of using {registered_model.version}"


sm_client = boto3.client("sagemaker")
def endpoint_exists(endpoint_name):
    try:
        sm_client.describe_endpoint(EndpointName=endpoint_name)
        return True
    except ClientError as e:
        if "Could not find endpoint" in str(e):
            return False
        else:
            raise

if endpoint_exists(endpoint_name):
    predictor = sklearn_model.deploy(
        instance_type="ml.t2.medium",
        initial_instance_count=1,
        endpoint_name=endpoint_name,
        update_endpoint=True # Update endpoint if it already exists
    )
else:
    # Create endpoint if it doesn't exist
    predictor = sklearn_model.deploy(
        instance_type="ml.t2.medium",
        initial_instance_count=1,
        endpoint_name=endpoint_name
    )

print(f"Model deployed at endpoint: {endpoint_name}")

INFO:sagemaker:Creating model with name: sagemaker-scikit-learn-2025-07-30-03-20-52-727
INFO:sagemaker:Creating endpoint-config with name 8815v-penguin-clf
INFO:sagemaker:Creating endpoint with name 8815v-penguin-clf


-----------!Model deployed at endpoint: 8815v-penguin-clf


## Inference and Cleanup

#### Test endpoint

In [46]:
from sagemaker.predictor import Predictor
from sagemaker.serializers import JSONSerializer
from sagemaker.deserializers import StringDeserializer

# Create the predictor with JSON handling

predictor = Predictor(
    endpoint_name=endpoint_name,
    sagemaker_session=sagemaker_session,
    serializer=JSONSerializer(), # convert to JSON format
    deserializer=StringDeserializer()
)

# Get X_test
processed_test_uri = f"{data_path}/v1.0/processed/test/test_processed.csv"

X_test = pd.read_csv(processed_test_uri)

test_sample = X_test.iloc[:5]
print(test_sample)
# Convert to a dict (records format: {col_name: [v1, v2, ...]})
test_sample = test_sample.drop(columns='species')
payload = test_sample.to_dict(orient="list")

print("Sending payload:\n", payload)

response = predictor.predict(payload)
print("Prediction response:", response)



  species  culmen_length_mm  culmen_depth_mm  flipper_length_mm  body_mass_g  \
0  Adelie         -2.005109        -0.534410          -1.645021    -1.636340   
1  Adelie         -0.816790         1.761560          -0.708965    -0.388429   
2  Adelie         -1.529781        -0.085198          -0.852974    -1.074780   
3  Gentoo          1.084521        -0.634235           1.667176     1.421042   
4  Gentoo          1.066240        -0.534410           0.875129     1.483437   

   sex_MALE  island_Dream  island_Torgersen  
0       0.0           1.0               0.0  
1       0.0           0.0               0.0  
2       0.0           0.0               1.0  
3       1.0           0.0               0.0  
4       1.0           0.0               0.0  
Sending payload:
 {'culmen_length_mm': [-2.0051090117600747, -0.8167896138012701, -1.5297812525765526, 1.0845214229328168, 1.0662395860411429], 'culmen_depth_mm': [-0.5344098100591036, 1.7615595346970407, -0.0851984165198584, -0.63423456417893

#### View endpoint logs

In [45]:
import boto3

# Enter the name of your SageMaker endpoint
endpoint_name = "8815v-penguin-clf"

# The log group is created based on the endpoint name
log_group_name = f"/aws/sagemaker/Endpoints/{endpoint_name}"

# Create a CloudWatch Logs client
logs_client = boto3.client("logs")

print(f"Searching for logs in: {log_group_name}\n")

try:
    # Find all log streams in the log group, ordered by the most recent
    response = logs_client.describe_log_streams(
        logGroupName=log_group_name,
        orderBy='LastEventTime',
        descending=True
    )

    log_streams = response.get("logStreams", [])

    if not log_streams:
        print("No log streams found. The endpoint might not have processed any requests yet.")
    
    # Loop through each stream and print its recent log events
    for stream in log_streams:
        stream_name = stream['logStreamName']
        print(f"--- Logs from stream: {stream_name} ---")

        # Get log events from the stream
        log_events = logs_client.get_log_events(
            logGroupName=log_group_name,
            logStreamName=stream_name,
            startFromHead=False,  # False gets recent logs first
            limit=50  # Get up to 50 recent log events
        )
        
        # Print events in chronological order
        for event in reversed(log_events.get("events", [])):
            print(event['message'].strip())
        
        print("-" * (len(stream_name) + 24), "\n")

except logs_client.exceptions.ResourceNotFoundException:
    print(f"Error: Log group '{log_group_name}' was not found.")
    print("Please check the endpoint name and ensure it has been invoked.")
except Exception as e:
    print(f"An error occurred: {e}")

Searching for logs in: /aws/sagemaker/Endpoints/8815v-penguin-clf

--- Logs from stream: AllTraffic/i-0ade6cfefcc711c88 ---
169.254.178.2 - - [30/Jul/2025:03:34:51 +0000] "GET /ping HTTP/1.1" 200 0 "-" "AHC/2.0"
169.254.178.2 - - [30/Jul/2025:03:34:46 +0000] "GET /ping HTTP/1.1" 200 0 "-" "AHC/2.0"
169.254.178.2 - - [30/Jul/2025:03:34:41 +0000] "GET /ping HTTP/1.1" 200 0 "-" "AHC/2.0"
169.254.178.2 - - [30/Jul/2025:03:34:36 +0000] "GET /ping HTTP/1.1" 200 0 "-" "AHC/2.0"
169.254.178.2 - - [30/Jul/2025:03:34:31 +0000] "GET /ping HTTP/1.1" 200 0 "-" "AHC/2.0"
169.254.178.2 - - [30/Jul/2025:03:34:26 +0000] "GET /ping HTTP/1.1" 200 0 "-" "AHC/2.0"
169.254.178.2 - - [30/Jul/2025:03:34:21 +0000] "GET /ping HTTP/1.1" 200 0 "-" "AHC/2.0"
169.254.178.2 - - [30/Jul/2025:03:34:16 +0000] "GET /ping HTTP/1.1" 200 0 "-" "AHC/2.0"
169.254.178.2 - - [30/Jul/2025:03:34:11 +0000] "GET /ping HTTP/1.1" 200 0 "-" "AHC/2.0"
169.254.178.2 - - [30/Jul/2025:03:34:06 +0000] "GET /ping HTTP/1.1" 200 0 "-" "AHC/2

#### Delete endpoint

In [42]:
# Clean Up Resources
print(f"Deleting SageMaker endpoint: {endpoint_name}...")
predictor.delete_endpoint()
print("Endpoint deleted successfully.")

from botocore.exceptions import ClientError

try:
    sm_client.delete_endpoint_config(EndpointConfigName=endpoint_name)
    print(f"Deleted old endpoint config: {endpoint_name}")
except ClientError as e:
    if "Could not find" not in str(e):
        raise


Deleting SageMaker endpoint: 8815v-penguin-clf...
Deleted old endpoint config: 8815v-penguin-clf


## Automated deployment with second pipeline
(Couldn't make the Eventbridge work despite watching for 'model created with approved status' on top of 'model state change to approved status' -  pipeline created but no executions)

#### Deployment script

In [67]:
%%writefile deploy.py
import subprocess
import sys

# --- Install required packages ---
def install(package):
    subprocess.check_call([sys.executable, "-m", "pip", "install", "boto3==1.28.57", "botocore==1.31.57", "numpy==1.24.1", "sagemaker" ])

# Ensure sagemaker SDK is installed before importing
try:
    import sagemaker
except ImportError:
    print("sagemaker SDK not found. Installing now...")
    install("sagemaker")
    import sagemaker

import argparse
import sagemaker
import boto3
from sagemaker.model import ModelPackage

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    # Accept the registered model's ARN instead of the S3 data path
    parser.add_argument("--model-package-arn", type=str, required=True)
    parser.add_argument("--role", type=str, required=True)
    parser.add_argument("--endpoint-name", type=str, required=True)
    parser.add_argument("--region", type=str, required=True)
    args = parser.parse_args()

    boto_session = boto3.Session(region_name=args.region)
    sagemaker_session = sagemaker.Session(boto_session=boto_session)

    # Create a SageMaker Model object directly from the Model Package ARN
    model = ModelPackage(
        model_package_arn=args.model_package_arn,
        role=args.role,
        sagemaker_session=sagemaker_session,
    )

    # Deploy the model to an endpoint
    print(f"Deploying registered model from ARN to endpoint: {args.endpoint_name}")
    model.deploy(
        initial_instance_count=1,
        instance_type="ml.t2.medium",
        endpoint_name=args.endpoint_name,
        # Update endpoint if it already exists
        update_endpoint=True
    )
    print("Deployment complete.")


Overwriting deploy.py


#### Define deployment pipeline

In [68]:
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.processing import ScriptProcessor
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.parameters import ParameterString
import sagemaker

# Define Parameters for the deployment pipeline
# This will be provided by the EventBridge trigger
model_package_arn_param = ParameterString(name="ModelPackageArn", default_value="")
role_param = ParameterString(name="ExecutionRole", default_value=sagemaker_role)
endpoint_name_param = ParameterString(name="EndpointName", default_value="8815v-penguin-clf-endpoint")

# Create a ScriptProcessor for deployment
# Using a more recent scikit-learn version is generally a good idea
deploy_processor = ScriptProcessor(
    image_uri=sagemaker.image_uris.retrieve("sklearn", sagemaker_session.boto_region_name, version="1.2-1"),
    command=["python3"],
    instance_type="ml.t3.medium",
    instance_count=1,
    role=role_param,
    base_job_name="deploy-registered-model"
)

# Define the deployment step that takes the model ARN as an argument
step_deploy = ProcessingStep(
    name="DeployRegisteredModel",
    processor=deploy_processor,
    code="deploy.py",
    job_arguments=[
        "--model-package-arn", model_package_arn_param,
        "--role", role_param,
        "--endpoint-name", endpoint_name_param,
        "--region", "ap-southeast-1"
    ]
)



# Define the independent deployment pipeline
deploy_pipeline = Pipeline(
    name="8815V-Deploy",
    parameters=[model_package_arn_param, role_param, endpoint_name_param],
    steps=[step_deploy]
)

# Create or update the pipeline definition
# Capture the response which contains the ARN
response = deploy_pipeline.upsert(role_arn=sagemaker_role)

# Extract the ARN from the response dictionary
pipeline_arn = response['PipelineArn']

print(f"Deployment pipeline ARN: {pipeline_arn}")

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3
INFO:sagemaker.image_uris:Defaulting to only supported image scope: cpu.


sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3Bucket
sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3Bucket




Deployment pipeline ARN: arn:aws:sagemaker:ap-southeast-1:287730026636:pipeline/8815V-Deploy


#### Eventbridge to watch for event

In [69]:
import boto3
import json

# Initialize the EventBridge client
events_client = boto3.client("events")

# Define the event pattern to listen for
# This pattern triggers when a model package in your group has its status changed to "Approved"
event_pattern = {
    "source": ["aws.sagemaker"],
    "detail-type": [
        "SageMaker Model Package State Change",
        "SageMaker Model Package Created"],
    "detail": {
        "ModelPackageGroupName": [model_package_group_name], 
        "ModelApprovalStatus": ["Approved"]
    }
}

# Define the target for the rule (our deployment pipeline)
# We need to map the event's detail to the pipeline's parameters
target = {
    "Id": "DeployPenguinPipelineTarget",
    "Arn": pipeline_arn, # The ARN of the pipeline we just created
    "RoleArn": sagemaker_role, # The execution role for the pipeline
    "SageMakerPipelineParameters": {
        "PipelineParameterList": [
            {
                # Map the ARN from the event to the pipeline's "ModelPackageArn" parameter
                "Name": "ModelPackageArn",
                "Value": "$.detail.ModelPackageArn"
            }
        ]
    }
}

# Create or update the EventBridge rule
try:
    username_lower = "8708815v@myaccount.nyp.edu.sg".lower().replace("@", "-")
    rule_name = f"{username_lower}-TriggerModelDeploymentOnApproval"
    print(f"Creating or updating EventBridge rule: {rule_name}")
    response = events_client.put_rule(
        Name=rule_name,
        EventPattern=json.dumps(event_pattern),
        State="ENABLED",
        Description="Triggers the SageMaker pipeline to deploy a model upon approval."
    )
    
    # Add the pipeline as a target for the rule
    events_client.put_targets(Rule=rule_name, Targets=[target])
    print("EventBridge rule created successfully!")
    print("Now, when a model is approved in the Model Registry, the deployment pipeline will trigger automatically.")

except Exception as e:
    print(f"Error creating rule: {e}")

Creating or updating EventBridge rule: 8708815v-myaccount.nyp.edu.sg-TriggerModelDeploymentOnApproval
EventBridge rule created successfully!
Now, when a model is approved in the Model Registry, the deployment pipeline will trigger automatically.
