# Human Activity Recognition - AWS

### Problem Statement
Let’s open the notebook “HAR Model training notebook”. The problem statement for this notebook is: Deploying the Human Activity Recognition problem using the Level 1 MLOps architecture, the aim is to enhance the experience of Blackmi's health app by overcoming the problems faced in the level 0 architecture. Utilising the Human Activity Recognition dataset, we will construct a machine-learning model along with the ML pipelines to categorise user activities for the real-time health alerts using AWS sagemaker studio. Here we will also be monitoring the model performance and deploy the model using different deployment techniques.

### Approach 
In this notebook we will be building the level 1 architecture of MLOps, and our major focus would be on creating ML pipeline, model monitoring and model deployment. The major take away for this lesson is to learn:

1. Feature engineering with the amazon sagemaker processing 



In [1]:
#pip install s3fs

In [2]:
# Importing all the necessary libraries 
# Importing pandas and numpy for data preprocessing. 
import pandas as pd
import numpy as np
# Boto3 is used for launching the EC2 instances and manipulating s3 buckets.
import boto3
# Sagemaker is imported for building, training and deploying machine learning models.
import sagemaker

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


In [3]:
# Initialising new sagemaker session as "sess".
sess = sagemaker.Session()
# Check for necessary permission needed for training and deploying models. 
role = sagemaker.get_execution_role()
# To understand where this session is configured to operate.
region = boto3.Session().region_name
region

'us-east-1'

In [4]:
# Bucket variable is used for storing the location of the bucket
bucket = sess.default_bucket()
# Assigning the prefix variable 
prefix = 'mlops-human-activity-recognition'
###

input_source = f's3://{bucket}/{prefix}/data/data.csv'

## Load Data & Save to S3 Path

In [5]:
input_data_path = './data/train.csv'
df = pd.read_csv(input_data_path)

In [6]:
df.shape

(2240, 563)

In [13]:
df.to_csv(f's3://{bucket}/{prefix}/data/data.csv', index = False)

In [23]:
bucket

'sagemaker-us-east-1-825765393939'

## Feature Engineering with Amazon SageMaker Processing

In [15]:
%%writefile preprocessing.py

import pandas as pd
import numpy as np
import argparse
import os
from joblib import dump, load
from sklearn.preprocessing import LabelEncoder
from sklearn.ensemble import ExtraTreesClassifier
def _parse_args():

    parser = argparse.ArgumentParser()
    
    # Data, model, and output directories
    # model_dir is always passed in from SageMaker. By default this is a S3 path under the default bucket.
    parser.add_argument('--filepath', type=str, default='/opt/ml/processing/input/')
    parser.add_argument('--filename', type=str, default='data.csv')
    parser.add_argument('--outputpath', type=str, default='/opt/ml/processing/output/')

    return parser.parse_known_args()


def get_top_k_features(X, Y, k):
        clf = ExtraTreesClassifier(n_estimators=50)
        clf = clf.fit(X, Y)
        feature_df = pd.DataFrame(
            data=(X.columns, clf.feature_importances_)
        ).T.sort_values(by=1, ascending=False)
        cols = feature_df.head(k)[0].values
        return cols

if __name__=="__main__":
    # Process arguments
    args, _ = _parse_args()
    # Load data
    path = os.path.join(args.filepath,args.filename)
    print(path)
    
    # Reading the dataset and performing label encoding 
    df = pd.read_csv(os.path.join(args.filepath,args.filename))
    df.dropna(inplace = True)
    le = LabelEncoder()
    df['Activity'] = le.fit_transform(df['Activity'])

    # Assignining the indepeneded and depended variable 
    X = df.drop(['Activity'], axis =1)
    Y = df['Activity']
    
    # Extracting top 12 important feature and filtering the dataset
    k =12
    final_cols = get_top_k_features(X, Y, k)
    final_cols = np.append(final_cols,np.array(['Activity']))
    df = df[final_cols]
    
    # Train, test, validation split
    # Randomly sort the data then split out first 70%, second 20%, and last 10%
    train_data, validation_data, test_data = np.split(df.sample(frac=1, random_state=42), [int(0.8 * len(df)), int(0.9 * len(df))])  
    
    # Storing of train, validation and test datasets 
    pd.concat([train_data['Activity'], train_data.drop(['Activity'], axis=1)], axis=1).to_csv(os.path.join(args.outputpath, 'train/train.csv'), index=False, header=False)
    pd.concat([validation_data['Activity'], validation_data.drop(['Activity'], axis=1)], axis=1).to_csv(os.path.join(args.outputpath, 'validation/validation.csv'), index=False, header=False)
    test_data[['Activity']].to_csv(os.path.join(args.outputpath, 'test/test_y.csv'), index=False, header=False)
    test_data.drop(['Activity'], axis=1).to_csv(os.path.join(args.outputpath, 'test/test_x.csv'), index=False, header=False)
    
    ## Save Features columns
    dump(final_cols, os.path.join(args.outputpath, 'feature/feature.joblib'))
    ## Save Encoder
    dump(le, os.path.join(args.outputpath, 'feature/encoder.joblib'))
    
    print("## Processing complete. Exiting.")

Overwriting preprocessing.py


In [16]:
train_path = f"s3://{bucket}/{prefix}/train"
validation_path = f"s3://{bucket}/{prefix}/validation"
test_path = f"s3://{bucket}/{prefix}/test"
feature_path = f"s3://{bucket}/{prefix}/feature"

In [17]:
# Importing necessary library for data processing 
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker import get_execution_role

sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    role=get_execution_role(),
    instance_type="ml.t3.medium",
    instance_count=1, 
    base_job_name='sklearn-ml-train'
)

sklearn_processor.run(
    code='preprocessing.py',
    inputs=[
        ProcessingInput(
            source=input_source, 
            destination="/opt/ml/processing/input",
            s3_input_mode="File",
            s3_data_distribution_type="ShardedByS3Key"
        )
    ],
    outputs=[
        ProcessingOutput(
            output_name="train_data", 
            source="/opt/ml/processing/output/train",
            destination=train_path,
        ),
        ProcessingOutput(output_name="validation_data", source="/opt/ml/processing/output/validation", destination=validation_path),
        ProcessingOutput(output_name="test_data", source="/opt/ml/processing/output/test", destination=test_path),
        ProcessingOutput(output_name="feature_data", source="/opt/ml/processing/output/feature", destination=feature_path)
    ]
)

INFO:sagemaker:Creating processing-job with name sklearn-ml-train-2024-11-20-17-26-03-779


................[34m/opt/ml/processing/input/data.csv[0m
[34m## Processing complete. Exiting.[0m



## Training

In [18]:
s3_input_train = sagemaker.TrainingInput(s3_data=train_path.format(bucket, prefix), 
                                                    content_type='csv')
s3_input_validation = sagemaker.TrainingInput(s3_data=validation_path.format(bucket, prefix),
                                                     content_type='csv')

In [19]:
%%writefile sklearn-train.py

from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from joblib import dump, load
import pandas as pd, numpy as np, os, argparse
from io import StringIO
## predict

# inference function - tells SageMaker how to load the model
def model_fn(model_dir):
    clf = load(os.path.join(model_dir, "model.joblib"))
    return clf

def input_fn(input_data, content_type):
    """Parse input data payload

    We currently only take csv input. Since we need to process both labelled
    and unlabelled data we first determine whether the label column is present
    by looking at how many columns were provided.
    """
    if content_type == 'text/csv':
        # Read the raw input data as CSV.
        df = pd.read_csv(StringIO(input_data), 
                         header=None)

        if len(df.columns) == len(feature_columns_names) + 1:
            # This is a labelled example, includes the ring label
            df.columns = feature_columns_names + [label_column]
        elif len(df.columns) == len(feature_columns_names):
            # This is an unlabelled example.
            df.columns = feature_columns_names

        return df
    else:
        raise ValueError("{} not supported by script!".format(content_type))


def output_fn(prediction, accept):
    """Format prediction output

    The default accept/content-type between containers for serial inference is JSON.
    We also want to set the ContentType or mimetype as the same value as accept so the next
    container can read the response payload correctly.
    """
    if accept == "application/json":
        instances = []
        for row in prediction.tolist():
            instances.append({"features": row})

        json_output = {"instances": instances}

        return worker.Response(json.dumps(json_output), accept, mimetype=accept)
    elif accept == 'text/csv':
        return worker.Response(encoders.encode(prediction, accept), accept, mimetype=accept)
    else:
        raise RuntimeException("{} accept type is not supported by this script.".format(accept))
        
def predict_fn(input_data, model):
    """Preprocess input data

    We implement this because the default predict_fn uses .predict(), but our model is a preprocessor
    so we want to use .transform().

    The output is returned in the following order:

        rest of features either one hot encoded or standardized
    """
    features = model.transform(input_data)

    if label_column in input_data:
        # Return the label (as the first column) and the set of features.
        return np.insert(features, 0, input_data[label_column], axis=1)
    else:
        # Return only the set of features
        return features        
        
# Argument parser
def _parse_args():
    parser = argparse.ArgumentParser()
    # Hyperparameters
    parser.add_argument("--n-estimators", type=int, default=10)
    parser.add_argument("--min-samples-leaf", type=int, default=3)
    # Data, model, and output directories
    parser.add_argument("--model-dir", type=str, default=os.environ.get("SM_MODEL_DIR"))
    parser.add_argument("--train", type=str, default=os.environ.get("SM_CHANNEL_TRAIN"))
    parser.add_argument("--test", type=str, default=os.environ.get("SM_CHANNEL_TEST"))
    parser.add_argument("--train-file", type=str, default="train.csv")
    parser.add_argument("--test-file", type=str, default="test.csv")
    # Parse the arguments
    return parser.parse_known_args()

# Main Training Loop
if __name__=="__main__":
    # Process arguments
    args, _ = _parse_args()
    # Load the dataset
    train_df = pd.read_csv(os.path.join(args.train, args.train_file))
    test_df = pd.read_csv(os.path.join(args.test, args.test_file))
    # Separate X and y
    X_train, y_train = train_df.drop(train_df.columns[0], axis=1), train_df[train_df.columns[0]]
    X_test, y_test = test_df.drop(test_df.columns[0], axis=1), test_df[test_df.columns[0]]
    # Define the model and train it
    model = RandomForestClassifier(
        n_estimators=args.n_estimators, n_jobs=-1
    )
    model.fit(X_train, y_train)
    # Evaluate the model performances
    print(f'Model Accuracy: {accuracy_score(y_test, model.predict(X_test))}')
    dump(model, os.path.join(args.model_dir, 'model.joblib'))
    

Writing sklearn-train.py


In [27]:
# We use the Estimator from the SageMaker Python SDK
from sagemaker import get_execution_role
from sagemaker.sklearn.estimator import SKLearn

FRAMEWORK_VERSION = "0.23-1"

# Define the Estimator from SageMaker (Script Mode)
sklearn_estimator = SKLearn(
    entry_point="sklearn-train.py",
    role=get_execution_role(),
    instance_count=1,
    instance_type="ml.m5.xlarge",
    framework_version=FRAMEWORK_VERSION,
    base_job_name="rf-scikit",
    metric_definitions=[{"Name": "Accuracy", "Regex": "Accuracy: ([0-9.]+).*$"}],
    hyperparameters={
        "n-estimators": 120,
        "min-samples-leaf": 3,
        "test-file": "validation.csv"
    },
)

# Train the model (~5 minutes)
sklearn_estimator.fit({"train": s3_input_train, "test": s3_input_validation})

INFO:sagemaker:Creating training-job with name: rf-scikit-2024-11-20-17-41-28-939


2024-11-20 17:41:34 Starting - Starting the training job...
2024-11-20 17:41:50 Starting - Preparing the instances for training...
2024-11-20 17:42:15 Downloading - Downloading input data...
2024-11-20 17:42:40 Downloading - Downloading the training image..
2024-11-20 17:43:34 Training - Training image download completed. Training in progress.
2024-11-20 17:43:34 Uploading - Uploading generated training model
2024-11-20 17:43:34 Completed - Training job completed
[34m2024-11-20 17:43:17,533 sagemaker-containers INFO     Imported framework sagemaker_sklearn_container.training[0m
[34m2024-11-20 17:43:17,537 sagemaker-training-toolkit INFO     No GPUs detected (normal if no gpus installed)[0m
[34m2024-11-20 17:43:17,583 sagemaker_sklearn_container.training INFO     Invoking user training script.[0m
[34m2024-11-20 17:43:17,738 sagemaker-training-toolkit INFO     No GPUs detected (normal if no gpus installed)[0m
[34m2024-11-20 17:43:17,750 sagemaker-training-toolkit INFO     No GPU

## Hyper Parameter

In [28]:
# we use the Hyperparameter Tuner
from sagemaker.tuner import IntegerParameter

# Define exploration boundaries
hyperparameter_ranges = {
    "n-estimators": IntegerParameter(100, 200),
    "min-samples-leaf": IntegerParameter(2, 6)
}

Optimizer = sagemaker.tuner.HyperparameterTuner(
    estimator=sklearn_estimator,
    hyperparameter_ranges=hyperparameter_ranges,
    base_tuning_job_name="RF-tuner",
    objective_type="Maximize",
    objective_metric_name="Accuracy",
    metric_definitions=[
        {"Name": "Accuracy", "Regex": "Accuracy: ([0-9.]+).*$"}
    ],  # extract tracked metric from logs with regexp
    max_jobs=10,
    max_parallel_jobs=2,
)

In [29]:
Optimizer.fit({"train": s3_input_train, "test": s3_input_validation})


INFO:sagemaker:Creating hyperparameter tuning job with name: RF-tuner-241120-1746


...........................................................!


In [30]:
# get tuner results in a df
results = Optimizer.analytics().dataframe()
while results.empty:
    time.sleep(1)
    results = Optimizer.analytics().dataframe()
results.head()


Unnamed: 0,min-samples-leaf,n-estimators,TrainingJobName,TrainingJobStatus,FinalObjectiveValue,TrainingStartTime,TrainingEndTime,TrainingElapsedTimeSeconds
0,3.0,171.0,RF-tuner-241120-1746-010-779302a8,Completed,0.941704,2024-11-20 17:51:31+00:00,2024-11-20 17:52:00+00:00,29.0
1,3.0,131.0,RF-tuner-241120-1746-009-8d26adb8,Completed,0.946188,2024-11-20 17:51:26+00:00,2024-11-20 17:51:55+00:00,29.0
2,3.0,194.0,RF-tuner-241120-1746-008-9803ff51,Completed,0.946188,2024-11-20 17:50:50+00:00,2024-11-20 17:51:19+00:00,29.0
3,3.0,100.0,RF-tuner-241120-1746-007-a6cd8a50,Completed,0.950673,2024-11-20 17:50:41+00:00,2024-11-20 17:51:09+00:00,28.0
4,6.0,199.0,RF-tuner-241120-1746-006-80a559a3,Completed,0.946188,2024-11-20 17:50:03+00:00,2024-11-20 17:50:32+00:00,29.0


In [11]:
import sagemaker
import boto3
from sagemaker import image_uris
from sagemaker.session import Session
from sagemaker.inputs import TrainingInput

# initialize hyperparameters
hyperparameters = {
        "num_class":6,
        "max_depth":"5",
        "eta":"0.2",
        "gamma":"4",
        "min_child_weight":"6",
        "subsample":"0.7",
        "objective":"multi:softmax",
        "num_round":"50"}

# set an output path where the trained model will be saved
#bucket = sagemaker.Session().default_bucket()
#prefix = 'DEMO-xgboost-as-a-built-in-algo'
output_path = 's3://{}/{}/{}/output'.format(bucket, prefix, 'abalone-xgb-built-in-algo')

# this line automatically looks for the XGBoost image URI and builds an XGBoost container.
# specify the repo_version depending on your preference.
xgboost_container = sagemaker.image_uris.retrieve("xgboost", region, "1.7-1")

# construct a SageMaker estimator that calls the xgboost-container
xgboost_estimator = sagemaker.estimator.Estimator(image_uri=xgboost_container, 
                                          hyperparameters=hyperparameters,
                                          role=sagemaker.get_execution_role(),
                                          instance_count=1, 
                                          instance_type='ml.m5.xlarge', 
                                          volume_size=5, # 5 GB 
                                          output_path=output_path)

# define the data type and paths to the training and validation datasets
content_type = "text/csv"#"libsvm"
train_input = TrainingInput("s3://{}/{}/{}/".format(bucket, prefix, 'train'), content_type=content_type)
validation_input = TrainingInput("s3://{}/{}/{}/".format(bucket, prefix, 'validation'), content_type=content_type)

# execute the XGBoost training job
xgboost_estimator.fit({'train': train_input, 'validation': validation_input})

INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.
INFO:sagemaker:Creating training-job with name: sagemaker-xgboost-2023-09-26-05-47-34-821


2023-09-26 05:47:34 Starting - Starting the training job...
2023-09-26 05:47:51 Starting - Preparing the instances for training......
2023-09-26 05:48:56 Downloading - Downloading input data
2023-09-26 05:48:56 Training - Downloading the training image...
2023-09-26 05:49:31 Training - Training image download completed. Training in progress....[34m[2023-09-26 05:49:50.393 ip-10-0-151-179.ap-south-1.compute.internal:7 INFO utils.py:28] RULE_JOB_STOP_SIGNAL_FILENAME: None[0m
[34m[2023-09-26 05:49:50.416 ip-10-0-151-179.ap-south-1.compute.internal:7 INFO profiler_config_parser.py:111] User has disabled profiler.[0m
[34m[2023-09-26:05:49:50:INFO] Imported framework sagemaker_xgboost_container.training[0m
[34m[2023-09-26:05:49:50:INFO] Failed to parse hyperparameter objective value multi:softmax to Json.[0m
[34mReturning the value itself[0m
[34m[2023-09-26:05:49:50:INFO] No GPUs detected (normal if no gpus installed)[0m
[34m[2023-09-26:05:49:50:INFO] Running XGBoost Sagemaker i