# Activity Recognition on Amazon SageMaker

To reproduce this example, run the cells below. Note that running some of the cells will call SageMaker APIs, such as the ones for training and deployment, which may incur charges. Be sure to cleanup all resources at the end. 

## Prepare libraries

We will be using the SageMaker SDK to interact with the SageMaker service. We will use version 2.181.0 of the SDK. We will also use the numpy library for data manipulation.

In [None]:
!pip install -r requirements.txt

In [None]:
from sagemaker import Session
from sagemaker import get_execution_role
from sagemaker.pytorch import PyTorch
from sagemaker.s3 import S3Uploader
from numpy.random import rand
from numpy import argmax, float32

We setup SageMaker constructs including a SageMaker session, execution role, and default bucket for data and model artifacts.

In [None]:
session = Session()
role = get_execution_role()
bucket = session.default_bucket()

## Prepare and save data

### Download data:

We first download the data from the UCI machine learning repository and unzip the dataset into the data/ folder

In [None]:
!wget -q -O uci_har_dataset.zip https://archive.ics.uci.edu/ml/machine-learning-databases/00240/UCI%20HAR%20Dataset.zip
!unzip -q uci_har_dataset.zip && mv 'UCI HAR Dataset' data
!rm uci_har_dataset.zip

### Upload data:

We'll upload the data to the default SageMaker bucket in our Region. Feel free to change the bucket to a custom bucket in your account.

In [None]:
s3_data_location = f's3://{bucket}/activity-recognition/data'

In [None]:
S3Uploader.upload(
    local_path = 'data/', 
    desired_s3_uri = s3_data_location, 
    sagemaker_session=session
)

## Create training script

With SageMaker script mode, we provide a training script and use a prebuilt framework container to run the script. In this case, we will write a PyTorch script for PyTorch version 1.13.1. We will split our model definition and our training script into two different files and put them in the same directory (sagemaker_scripts).

In [None]:
!mkdir sagemaker_scripts

In [None]:
%%writefile sagemaker_scripts/model.py

""" Model definition """
from torch.nn import (
    Dropout,
    Linear,
    LSTM,
    Module,
    ReLU,
    Softmax
)

class LSTMClassifier(Module):
    """
    A PyTorch LSTM implementation.

    Methods:
        __init__(self, input_dim, hidden_dim, output_dim):
            Initializes the neural network.

        forward(self, x):
            Defines the forward pass of the neural network.
    """
    def __init__(self, input_dim, hidden_dim, output_dim):
        """
        Initialize the neural network module.

        Args:
            input_dim (int): Number of input features.
            hidden_dim (int): Number of hidden nodes in the neural network.
            output_dim (int): Number of output nodes in the neural network.
        """
        super(LSTMClassifier, self).__init__()
        self.lstm = LSTM(input_dim, hidden_dim, batch_first=True)
        self.relu = ReLU()
        self.dropout = Dropout(0.2)
        self.fc = Linear(hidden_dim, output_dim)
        self.softmax = Softmax(dim=1)

    def forward(self, x):
        """
        Forward pass of the PyTorch module.

        Args:
            x (torch.Tensor): Input tensor to the module.

        Returns:
            torch.Tensor: Output tensor after applying the forward computation.
        """
        lstm_output, _ = self.lstm(x)
        x = lstm_output[:, -1, :] # only take the last timestamp's state (since LSTM is recursive)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.fc(x)
        x = self.softmax(x)
        return x

The below script is a fairly standard PyTorch training script. 

To load the data into memory, we use several functions specific to the file structure of this dataset. These functions were derived from [this](https://machinelearningmastery.com/how-to-develop-rnn-models-for-human-activity-recognition-time-series-classification/) tutorial. 

For the model, we use an LSTM plus a linear output layer equal to the number of activities we want to predict (in this case, 6). We use softmax as our final activation to get the probabilities of each class from the network. We use categorical cross entropy for the loss function, the Adam optimizer, and a learning rate scheduler that halves the learning rate every 10 epochs. 

To make the script play nicely with SageMaker we parse arguments such as batch size, learning rate, and data location that get passed to our script during invocation. We also make sure to save our model in the ```/opt/ml/model``` directory. This is the directory expected by SageMaker and allows SageMaker to copy our model artifacts to S3 upon completion of the training job.

In [2]:
%%writefile sagemaker_scripts/torch_train.py

""" SageMaker Training Script """

# Sagemaker imports
import argparse
import os

# other imports
import logging
from numpy import (
    arange,
    argmax,
    count_nonzero,
    dstack,
    zeros
)
from pandas import read_csv
from torch import (
    cuda as torch_cuda,
    device as torch_device,
    no_grad as torch_no_grad,
    Tensor
)
from torch.optim import Adam
from torch.optim.lr_scheduler import StepLR
from torch.nn import CrossEntropyLoss
from torch.jit import (
    save as torch_jit_save,
    script as torch_jit_script
)
from torch.utils.data import DataLoader, TensorDataset
from model import LSTMClassifier

# setup logging
logging.basicConfig(level=logging.INFO)

def load_file(filepath):
    """
    Loads a single file as a numpy array

    Args:
        filepath (string): Local path to file

    Returns:
        numpy array: Array containing data from file
    """
    print('Reading from file: ', filepath)
    df = read_csv(filepath, header=None, delim_whitespace=True)
    return df.values

def load_file_group(filenames, prefix=''):
    """
    Loads a list of files and return as a 3d numpy array

    Args:
        filenames (list of strings): List of filenames containing input data
        prefix (string): Optional prefix for local filepath

    Returns:
        numpy array: 3d numpy array containing data from files
    """
    data_group = []
    for filename in filenames:
        data = load_file(prefix + filename)
        data_group.append(data)

    return dstack(data_group)

def load_dataset_type(dataset_type, prefix=''):
    """
    Loads a dataset type, such as train or test

    Args:
        dataset_type (string): Group name such as 'train' or 'test'
        prefix (string): Optional prefix for local filepath

    Returns:
        numpy array: Features
        numpy array: Labels
    """
    data_folder_path = prefix + dataset_type + '/Inertial Signals/'
    filenames = []
    for signal_type in ['total_acc', 'body_acc', 'body_gyro']:
        for axis in ['x', 'y', 'z']:
            filenames.append(f'{signal_type}_{axis}_{dataset_type}.txt') 

    X = load_file_group(filenames = filenames, prefix = data_folder_path)
    y = load_file(prefix + dataset_type + '/y_'+dataset_type+'.txt')
    return X, y

def one_hot_encode(inputs):
    """
    One hot encodes a Numpy array

    Args:
        inputs (numpy array): 1d numpy array containing class indices

    Returns:
        numpy array: 2d numpy array with one hot encoded class indices
    """
    outputs = zeros((inputs.size, inputs.max() + 1))
    outputs[arange(inputs.size), inputs] = 1
    return outputs

def load_dataset(prefix=''):
    """
    Loads the dataset

    Args:
        prefix (string): Optional prefix for local filepath

    Returns:
        numpy array: Features for train dataset
        numpy array: Labels for train dataset
        numpy array: Features for test dataset
        numpy array: Labels for test dataset
    """
    # load train dataset
    train_X, train_y = load_dataset_type(dataset_type = 'train', prefix = prefix + '/')
    # load test dataset
    test_X, test_y = load_dataset_type(dataset_type = 'test', prefix = prefix + '/')

    # offset class values to start at zero
    train_y = train_y - 1
    test_y = test_y - 1

    # one-hot encode labels
    train_y = train_y.reshape(len(train_y)) # array of single-label arrays -> array of labels
    test_y = test_y.reshape(len(test_y)) 
    train_y = one_hot_encode(train_y) # one hot encode train labels for neural network
    return train_X, train_y, test_X, test_y

def train(model, train_X, train_y, test_X, test_y, num_epochs = 20, batch_size = 64, learning_rate = 0.01):
    """
    Trains the model on the given dataset and evaluates model after each epoch

    Args:
        model (PyTorch NN module): Model to be trained
        train_X (numpy array): Features for train dataset
        train_y (numpy array): Labels for train dataset
        test_X (numpy array): Features for test dataset
        test_y (numpy array): Labels for test dataset
        num_epochs (int): Optional, specifies number of epochs to train for, default=20
        batch_size (int): Optional, specifies batch size for training, default=64
        learning_rate (float): Optional, specifies initial learning rate for training, default=0.01

    Returns:
        PyTorch NN module: Trained model
    """
    device = torch_device('cuda' if torch_cuda.is_available() else 'cpu')
    model.to(device) # pass model to GPU if present

    train_X_tensor = Tensor(train_X)
    train_y_tensor = Tensor(train_y)
    train_dataset = TensorDataset(train_X_tensor, train_y_tensor) 
    train_dataloader = DataLoader(train_dataset, batch_size = batch_size)

    cat_loss = CrossEntropyLoss()
    optimizer = Adam(model.parameters(), lr=learning_rate)
    scheduler = StepLR(optimizer, step_size=10, gamma=0.5)

    for i in range(num_epochs):
        model.train()
        for batch, labels in train_dataloader:
            batch, labels = batch.to(device), labels.to(device) # pass data to GPU if present
            optimizer.zero_grad()
            outputs = model(batch)
            loss = cat_loss(outputs, labels)
            loss.backward()
            optimizer.step()
  
        curr_lr = optimizer.param_groups[0]['lr']
        scheduler.step() # update the learning rate

        # evaluate after each epoch
        model.eval()
        with torch_no_grad():
            test_X_tensor = Tensor(test_X)
            test_X_tensor = test_X_tensor.to(device)
            test_outputs = model(test_X_tensor)
            predictions = argmax(test_outputs.cpu().detach().numpy(), axis=-1)
            accuracy = count_nonzero(predictions == test_y) / len(test_y)

        logging.info(f'Epoch {i + 1} of {num_epochs}, Loss: {loss.item()}, Test Accuracy: {accuracy}, LR: {curr_lr}')

    return model

if __name__ == "__main__":

    parser = argparse.ArgumentParser()
    parser.add_argument('--data', type=str, default=os.environ.get('SM_CHANNEL_DATA')) # usually this is split into train and test
    parser.add_argument('--model_dir', type=str, default=os.environ.get('SM_MODEL_DIR'))
    parser.add_argument('--epochs', type=int, default=20)
    parser.add_argument('--learning-rate', type=float, default=0.01)
    parser.add_argument('--batch-size', type=int, default=64)
    args = parser.parse_args()

    # load data
    train_X, train_y, test_X, test_y = load_dataset(prefix=args.data)
    n_timesteps, n_features, n_outputs = train_X.shape[1], train_X.shape[2], train_y.shape[1]

    # fit model
    model = LSTMClassifier(n_features, 100, n_outputs)
    trained_model = train(model, train_X, train_y, test_X, test_y, args.epochs, args.batch_size, args.learning_rate)

    # pickle and save the model
    model_path = os.path.join('/opt/ml/model', 'model.pt')
    trained_model = torch_jit_script(trained_model)
    torch_jit_save(trained_model, model_path)


Overwriting sagemaker_scripts/torch_train.py


## Create SageMaker Estimator

Next we will define our hyperparameters and SageMaker Estimator. As mentioned earlier, we will use the PyTorch framework container with PyTorch version 1.13.1 and Python3.9. We provide our main training script as the ```entry_point``` parameter and the directory containing all of our files (including a requirements.txt if we needed to provide one) in the ```source_dir``` parameter. 

We will use a single ml.g5.xlarge instance for this training job. The ml.g5.xlarge instance has 4 vCPU, 16 GB memory, 1 NVIDIA A10G GPU, and 24 GB GPU memory. We will also specify ```keep_alive_period_in_seconds``` to make use of SageMaker Warm Pools, so that subsequent training runs start faster. Delete this parameter if you are only running a single training job.

In [None]:
hyperparameters = {
    'epochs': 50, 
    'batch-size': 64, 
    'learning-rate': 0.02
}

In [None]:
torch_estimator = PyTorch(
    entry_point='torch_train.py',
    source_dir='sagemaker_scripts',
    role=role,
    instance_count=1,
    instance_type='ml.g5.xlarge',
    py_version='py39',
    framework_version='1.13.1',
    output_path=f's3://{bucket}/activity-recognition/outputs/',
    hyperparameters=hyperparameters,
    keep_alive_period_in_seconds=900 # warm pool configuration for repeated training (15 mins)
)

Now we will call fit() on our Estimator and pass in our input data to start the training job. This process will take a few minutes to complete. We should see from the logs that the model reaches ~88% accuracy. If you do not see that performance, you may need to change the learning rate or number of epochs in the hyperparameter dictionary above.

Note: It is a common pattern to first prepare that data (preprocess and split into train/val/test for example) before passing it to the training job and provide the fit() method with multiple data channels (i.e. train and test). However, in this example, the data is already processed and split by folders. Therefore, we will just pass the data in a single channel called 'data'.

In [None]:
torch_estimator.fit({"data": s3_data_location})

## Deploy model

To deploy our model, we will simply call the deploy() function on our Estimator. For deployment, we will use a CPU instance, the ml.m5.4xlarge with 16 vCPU and 64 GB memory. Feel free to change this to a different instance type, depending on your performance and cost requirements (make sure to pick an instance size that has enough memory of GPU memory to fit the model). This process will take a few minutes to complete. 

Note: If we have run the training job in the past but loss the ```torch_estimator``` variable (this can sometimes happen if the kernel is restarted), we could create a SageMaker Model object using the model artifact location in S3 and calling deploy() on the Model object.

In [None]:
predictor = torch_estimator.deploy(
    initial_instance_count=1,
    instance_type='ml.m5.4xlarge'
)

In [None]:
dummy_inputs = rand(1, 128, 9).astype(float32)

result = predictor.predict(dummy_inputs)

In [17]:
result

array([[2.02133990e-08, 1.16042474e-06, 9.99998808e-01, 5.32531297e-10,
        9.05011201e-14, 1.52982560e-09]])

We have 6 predictions, one for each class. In order to get the class label, we will take the index with the highest predicted probability and map it to the label.

In [18]:
# get the activity with the highest predicted probability
activity_index = argmax(result)

In [19]:
# get class labels
activities = []
with open('data/activity_labels.txt', 'r') as f:
    lines = f.readlines()
    for line in lines:
        activities.append(line.rstrip('\n')[2:])

In [20]:
print('Predicted activity: ', activities[activity_index])

Predicted activity:  WALKING_DOWNSTAIRS


## Clean-Up

Make sure to delete the resources created by this notebook to prevent unnecessary charges.

In [None]:
predictor.delete_endpoint()