# Model Pipeline

## Pipeline Setup

In [1]:
from datetime import datetime, timedelta
import numpy as np
import pandas as pd
import pprint
#import optuna
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm
import time

In [2]:
import sys

import boto3
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession

from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)

from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

# Regular session: used in this notebook for the region name and the default bucket.
sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
# Execution role that is passed to the processor, estimator, model and feature groups.
role = sagemaker.get_execution_role()
# Pipeline session: handed to the processor, estimator and model, whose
# run()/create()/register() results are then passed to pipeline steps as `step_args`.
pipeline_session = PipelineSession()
default_bucket = sagemaker_session.default_bucket()

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


In [3]:
# Local copy of the raw sensor readings that seeds the pipeline.
local_path = "data/sensor_data.csv"

# Upload the file under the "airdata" prefix of the session's default bucket.
# The returned URI becomes the default value of the InputData pipeline parameter.
base_uri = f"s3://{default_bucket}/airdata"
input_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path=local_path,
    desired_s3_uri=base_uri,
)
print(input_data_uri)

s3://sagemaker-us-east-1-728406898807/airdata/sensor_data.csv


In [4]:
# Number of instances for the preprocessing job (consumed by the SKLearnProcessor).
processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)

# NOTE(review): this parameter is registered on the pipeline, but the processor and
# the estimator in this notebook both hard-code "ml.m5.xlarge" instead of reading it.
instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.xlarge"
)

# Approval status given to the model package when it is registered.
model_approval_status = ParameterString(
    name="ModelApprovalStatus",
    default_value="PendingManualApproval"
)

# S3 URI of the raw CSV; defaults to the file uploaded by this notebook.
input_data = ParameterString(
    name="InputData",
    default_value=input_data_uri,
)

# Largest test MSE for which the model is still created and registered.
mse_threshold = ParameterFloat(name="MseThreshold", default_value=0.5)

## Preprocessing Script

In [5]:
%%writefile code/preprocessing.py
import argparse
import os
import requests
import tempfile

import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler


# Fraction of the filtered rows assigned to each positional split.
TRAIN_SPLIT = 0.7
VAL_SPLIT = 0.15
TEST_SPLIT = 0.15
TARGET_PARAM = "pm25"

base_dir = "/opt/ml/processing"

df_location = pd.read_csv(f"{base_dir}/input/sensor_data.csv")

# Keep only the rows that belong to the target parameter
df_param = df_location[df_location['parameter'] == TARGET_PARAM]

# Positional boundaries between the three consecutive splits
n_rows = len(df_param)
train_end = int(n_rows * TRAIN_SPLIT)
val_end = train_end + int(n_rows * VAL_SPLIT)
test_end = val_end + int(n_rows * TEST_SPLIT)

train_data = df_param.iloc[:train_end].reset_index(drop=True)
val_data = df_param.iloc[train_end:val_end].reset_index(drop=True)
test_data = df_param.iloc[val_end:test_end].reset_index(drop=True)

# Fit the scaler on the training split only, then apply the same statistics
# to the validation and test splits
scaler = StandardScaler()
train_data.loc[:, "value"] = scaler.fit_transform(train_data["value"].values.reshape(-1, 1))
for held_out in (val_data, test_data):
    held_out.loc[:, "value"] = scaler.transform(held_out["value"].values.reshape(-1, 1))

for heading, frame in (
    ("Train Data Shape:", train_data),
    ("Validation Data Shape:", val_data),
    ("Test Data Shape:", test_data),
):
    print(heading, frame.shape)

# One CSV per split, each in the directory exposed as a processing output
train_data.to_csv(f"{base_dir}/train/train.csv")
val_data.to_csv(f"{base_dir}/validation/validation.csv")
test_data.to_csv(f"{base_dir}/test/test.csv")

Overwriting code/preprocessing.py


In [6]:
from sagemaker.sklearn.processing import SKLearnProcessor

# Version string passed to SKLearnProcessor to select the scikit-learn container.
framework_version = "1.2-1"

# Processor bound to the pipeline session. The instance count comes from the
# ProcessingInstanceCount pipeline parameter; the instance type is fixed here.
sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name="sklearn-airdata-process",
    role=role,
    sagemaker_session=pipeline_session,
)

In [7]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

# Stage the InputData object under /opt/ml/processing/input and expose the three
# directories that code/preprocessing.py writes to as named outputs.
processor_args = sklearn_processor.run(
    inputs=[
        ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
    ],
    code="code/preprocessing.py",
)

# The training step reads its channels from this step's "train", "validation"
# and "test" outputs.
step_process = ProcessingStep(name="AirDataProcess", step_args=processor_args)



## Feature Store

After processing, the features are uploaded to a Feature Store so they can be reused in later projects or when re-training the model.

In [8]:
# boto3 session pinned to the same region as the SageMaker session.
boto_session = boto3.Session(region_name=region)

# Control-plane client (used later to describe and list feature groups).
sagemaker_client = boto_session.client(service_name="sagemaker",
                                       region_name=region)

# Runtime client (used later to read a single record back with get_record).
featurestore_runtime = boto_session.client(
    service_name="sagemaker-featurestore-runtime", region_name=region
)

# Session that bundles both clients; every FeatureGroup below is created with it.
feature_store_session = sagemaker.Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_client,
    sagemaker_featurestore_runtime_client=featurestore_runtime
)

In [9]:
# Bucket and key prefix that are combined into the s3_uri passed to FeatureGroup.create().
default_s3_bucket_name = feature_store_session.default_bucket()
prefix = "airdata-featurestore"

print(default_s3_bucket_name)

sagemaker-us-east-1-728406898807


#### Process Features

Pre-process the data and perform feature engineering before uploading the features to the Feature Store.

In [10]:
# Run pre-processor once

import argparse
import os
import requests
import tempfile

import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler


# Fraction of the filtered rows assigned to each positional split.
TRAIN_SPLIT = 0.7
VAL_SPLIT = 0.15
TEST_SPLIT = 0.15
TARGET_PARAM = "pm25"

base_dir = "data"

df_location = pd.read_csv(f"{base_dir}/sensor_data.csv")

# Keep only the rows that belong to the target parameter
df_param = df_location[df_location['parameter'] == TARGET_PARAM]

# Positional boundaries between the three consecutive splits
n_rows = len(df_param)
train_end = int(n_rows * TRAIN_SPLIT)
val_end = train_end + int(n_rows * VAL_SPLIT)
test_end = val_end + int(n_rows * TEST_SPLIT)

train_data = df_param.iloc[:train_end].reset_index(drop=True)
val_data = df_param.iloc[train_end:val_end].reset_index(drop=True)
test_data = df_param.iloc[val_end:test_end].reset_index(drop=True)

# Fit the scaler on the training split only, then apply the same statistics
# to the validation and test splits
scaler = StandardScaler()
train_data.loc[:, "value"] = scaler.fit_transform(train_data["value"].values.reshape(-1, 1))
for held_out in (val_data, test_data):
    held_out.loc[:, "value"] = scaler.transform(held_out["value"].values.reshape(-1, 1))

for heading, frame in (
    ("Train Data Shape:", train_data),
    ("Validation Data Shape:", val_data),
    ("Test Data Shape:", test_data),
):
    print(heading, frame.shape)

Train Data Shape: (700, 11)
Validation Data Shape: (150, 11)
Test Data Shape: (150, 11)


In [29]:
# Change id to string for better lookup

for split_frame in (train_data, val_data, test_data):
    # Convert the record identifier to text before ingestion
    split_frame["measurement_id"] = split_frame["measurement_id"].astype(str)

### Define Feature Groups

In [12]:
from time import gmtime, strftime, sleep

# Format the timestamp once so that all three groups share one suffix; three
# separate strftime() calls can straddle a second boundary and produce
# mismatched names.
feature_group_suffix = strftime("%d-%H-%M-%S", gmtime())

train_feature_group_name = "train-feature-group-" + feature_group_suffix
val_feature_group_name = "val-feature-group-" + feature_group_suffix
test_feature_group_name = "test-feature-group-" + feature_group_suffix

In [13]:
from sagemaker.feature_store.feature_group import FeatureGroup

# One FeatureGroup handle per split, all bound to the feature store session.
train_feature_group, val_feature_group, test_feature_group = (
    FeatureGroup(name=group_name, sagemaker_session=feature_store_session)
    for group_name in (
        train_feature_group_name,
        val_feature_group_name,
        test_feature_group_name,
    )
)

In [14]:
import time

current_time_sec = int(round(time.time()))


def cast_object_to_string(data_frame):
    """Convert every object-dtype column of ``data_frame`` to the pandas "string" dtype.

    The frame is modified in place and nothing is returned. Columns with any
    other dtype are left untouched.
    """
    for column_name, column_dtype in data_frame.dtypes.items():
        if column_dtype == "object":
            as_text = data_frame[column_name].astype("str")
            data_frame[column_name] = as_text.astype("string")


# cast object dtype to string. The SageMaker FeatureStore Python SDK will then map the string dtype to String feature type.
cast_object_to_string(train_data)
cast_object_to_string(val_data)
cast_object_to_string(test_data)

# record identifier and event time feature names
record_identifier_feature_name = "measurement_id"
event_time_feature_name = "EventTime"

# append EventTime feature. Each column is built on its own frame's index, so
# its length always matches that frame (the test column used to be sized with
# len(val_data), which only worked because both splits had the same row count).
train_data[event_time_feature_name] = pd.Series(
    float(current_time_sec), index=train_data.index, dtype="float64"
)
val_data[event_time_feature_name] = pd.Series(
    float(current_time_sec), index=val_data.index, dtype="float64"
)
test_data[event_time_feature_name] = pd.Series(
    float(current_time_sec), index=test_data.index, dtype="float64"
)

# load feature definitions to the feature group. SageMaker FeatureStore Python SDK will auto-detect the data schema based on input data.
train_feature_group.load_feature_definitions(data_frame=train_data)
val_feature_group.load_feature_definitions(data_frame=val_data)
# Last expression of the cell: the detected test-split definitions are rendered as the cell output.
test_feature_group.load_feature_definitions(data_frame=test_data)

[FeatureDefinition(feature_name='measurement_id', feature_type=<FeatureTypeEnum.STRING: 'String'>, collection_type=None),
 FeatureDefinition(feature_name='sensor_id', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>, collection_type=None),
 FeatureDefinition(feature_name='location_id', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>, collection_type=None),
 FeatureDefinition(feature_name='location', feature_type=<FeatureTypeEnum.STRING: 'String'>, collection_type=None),
 FeatureDefinition(feature_name='latitude', feature_type=<FeatureTypeEnum.FRACTIONAL: 'Fractional'>, collection_type=None),
 FeatureDefinition(feature_name='longitude', feature_type=<FeatureTypeEnum.FRACTIONAL: 'Fractional'>, collection_type=None),
 FeatureDefinition(feature_name='epoch', feature_type=<FeatureTypeEnum.STRING: 'String'>, collection_type=None),
 FeatureDefinition(feature_name='duration', feature_type=<FeatureTypeEnum.STRING: 'String'>, collection_type=None),
 FeatureDefinition(feature_name='paramet

### Create FeatureGroups in SageMaker FeatureStore

In [15]:
def wait_for_feature_group_creation_complete(feature_group, poll_interval_sec=5, timeout_sec=None):
    """Block until ``feature_group`` is no longer in the "Creating" state.

    Args:
        feature_group: Object exposing ``describe()`` (returning a dict with a
            "FeatureGroupStatus" key) and a ``name`` attribute.
        poll_interval_sec: Seconds to sleep between two ``describe()`` calls.
        timeout_sec: Optional upper bound on the total wait. ``None`` (the
            default) waits indefinitely, as before.

    Raises:
        TimeoutError: If ``timeout_sec`` elapses while the group is still creating.
        RuntimeError: If the final status is anything other than "Created". The
            message includes the status and the reported failure reason.
    """
    deadline = None if timeout_sec is None else time.monotonic() + timeout_sec

    description = feature_group.describe()
    status = description.get("FeatureGroupStatus")
    while status == "Creating":
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f"Timed out after {timeout_sec}s waiting for feature group {feature_group.name}"
            )
        print("Waiting for Feature Group Creation")
        time.sleep(poll_interval_sec)
        description = feature_group.describe()
        status = description.get("FeatureGroupStatus")

    if status != "Created":
        # Surface what the service reported instead of a bare failure message.
        reason = description.get("FailureReason")
        raise RuntimeError(
            f"Failed to create feature group {feature_group.name} "
            f"(status: {status}, reason: {reason})"
        )
    print(f"FeatureGroup {feature_group.name} successfully created.")


all_feature_groups = (train_feature_group, val_feature_group, test_feature_group)

# Request creation of every group first (train, validation, test), each with
# the same storage location, identifier/event-time columns and online store.
for group in all_feature_groups:
    group.create(
        s3_uri=f"s3://{default_s3_bucket_name}/{prefix}",
        record_identifier_name=record_identifier_feature_name,
        event_time_feature_name=event_time_feature_name,
        role_arn=role,
        enable_online_store=True,
    )

# Then wait for each group in the same order.
for group in all_feature_groups:
    wait_for_feature_group_creation_complete(feature_group=group)

Waiting for Feature Group Creation
Waiting for Feature Group Creation
Waiting for Feature Group Creation
Waiting for Feature Group Creation
Waiting for Feature Group Creation
Waiting for Feature Group Creation
Waiting for Feature Group Creation
FeatureGroup train-feature-group-19-22-47-28 successfully created.
FeatureGroup val-feature-group-19-22-47-28 successfully created.
Waiting for Feature Group Creation
Waiting for Feature Group Creation
FeatureGroup test-feature-group-19-22-47-28 successfully created.


In [16]:
# Display the stored description of the training feature group.
train_feature_group.describe()

{'FeatureGroupArn': 'arn:aws:sagemaker:us-east-1:728406898807:feature-group/train-feature-group-19-22-47-28',
 'FeatureGroupName': 'train-feature-group-19-22-47-28',
 'RecordIdentifierFeatureName': 'measurement_id',
 'EventTimeFeatureName': 'EventTime',
 'FeatureDefinitions': [{'FeatureName': 'measurement_id',
   'FeatureType': 'String'},
  {'FeatureName': 'sensor_id', 'FeatureType': 'Integral'},
  {'FeatureName': 'location_id', 'FeatureType': 'Integral'},
  {'FeatureName': 'location', 'FeatureType': 'String'},
  {'FeatureName': 'latitude', 'FeatureType': 'Fractional'},
  {'FeatureName': 'longitude', 'FeatureType': 'Fractional'},
  {'FeatureName': 'epoch', 'FeatureType': 'String'},
  {'FeatureName': 'duration', 'FeatureType': 'String'},
  {'FeatureName': 'parameter', 'FeatureType': 'String'},
  {'FeatureName': 'value', 'FeatureType': 'Fractional'},
  {'FeatureName': 'units', 'FeatureType': 'String'},
  {'FeatureName': 'EventTime', 'FeatureType': 'Fractional'}],
 'CreationTime': datetim

In [17]:
# Display the feature groups returned by the SageMaker client (groups from earlier runs show up too).
sagemaker_client.list_feature_groups()

{'FeatureGroupSummaries': [{'FeatureGroupName': 'val-feature-group-19-22-47-28',
   'FeatureGroupArn': 'arn:aws:sagemaker:us-east-1:728406898807:feature-group/val-feature-group-19-22-47-28',
   'CreationTime': datetime.datetime(2025, 2, 19, 22, 47, 30, 267000, tzinfo=tzlocal()),
   'FeatureGroupStatus': 'Created'},
  {'FeatureGroupName': 'val-feature-group-19-22-38-29',
   'FeatureGroupArn': 'arn:aws:sagemaker:us-east-1:728406898807:feature-group/val-feature-group-19-22-38-29',
   'CreationTime': datetime.datetime(2025, 2, 19, 22, 38, 39, 697000, tzinfo=tzlocal()),
   'FeatureGroupStatus': 'Created',
   'OfflineStoreStatus': {'Status': 'Active'}},
  {'FeatureGroupName': 'val-feature-group-19-00-42-14',
   'FeatureGroupArn': 'arn:aws:sagemaker:us-east-1:728406898807:feature-group/val-feature-group-19-00-42-14',
   'CreationTime': datetime.datetime(2025, 2, 19, 0, 42, 15, 804000, tzinfo=tzlocal()),
   'FeatureGroupStatus': 'Created',
   'OfflineStoreStatus': {'Status': 'Active'}},
  {'Fe

### Put Records Into Feature Groups

In [18]:
# Number of rows that are about to be ingested into the training feature group.
print(len(train_data))

700


In [19]:
# Ingest the training frame with 3 workers; wait=True is passed to the call.
train_feature_group.ingest(data_frame=train_data, max_workers=3, wait=True)


IngestionManagerPandas(feature_group_name='train-feature-group-19-22-47-28', feature_definitions={'measurement_id': {'FeatureName': 'measurement_id', 'FeatureType': 'String'}, 'sensor_id': {'FeatureName': 'sensor_id', 'FeatureType': 'Integral'}, 'location_id': {'FeatureName': 'location_id', 'FeatureType': 'Integral'}, 'location': {'FeatureName': 'location', 'FeatureType': 'String'}, 'latitude': {'FeatureName': 'latitude', 'FeatureType': 'Fractional'}, 'longitude': {'FeatureName': 'longitude', 'FeatureType': 'Fractional'}, 'epoch': {'FeatureName': 'epoch', 'FeatureType': 'String'}, 'duration': {'FeatureName': 'duration', 'FeatureType': 'String'}, 'parameter': {'FeatureName': 'parameter', 'FeatureType': 'String'}, 'value': {'FeatureName': 'value', 'FeatureType': 'Fractional'}, 'units': {'FeatureName': 'units', 'FeatureType': 'String'}, 'EventTime': {'FeatureName': 'EventTime', 'FeatureType': 'Fractional'}}, sagemaker_fs_runtime_client_config=<botocore.config.Config object at 0x7f53346228

In [20]:
# Ingest the validation frame with 3 workers; wait=True is passed to the call.
val_feature_group.ingest(data_frame=val_data, max_workers=3, wait=True)

IngestionManagerPandas(feature_group_name='val-feature-group-19-22-47-28', feature_definitions={'measurement_id': {'FeatureName': 'measurement_id', 'FeatureType': 'String'}, 'sensor_id': {'FeatureName': 'sensor_id', 'FeatureType': 'Integral'}, 'location_id': {'FeatureName': 'location_id', 'FeatureType': 'Integral'}, 'location': {'FeatureName': 'location', 'FeatureType': 'String'}, 'latitude': {'FeatureName': 'latitude', 'FeatureType': 'Fractional'}, 'longitude': {'FeatureName': 'longitude', 'FeatureType': 'Fractional'}, 'epoch': {'FeatureName': 'epoch', 'FeatureType': 'String'}, 'duration': {'FeatureName': 'duration', 'FeatureType': 'String'}, 'parameter': {'FeatureName': 'parameter', 'FeatureType': 'String'}, 'value': {'FeatureName': 'value', 'FeatureType': 'Fractional'}, 'units': {'FeatureName': 'units', 'FeatureType': 'String'}, 'EventTime': {'FeatureName': 'EventTime', 'FeatureType': 'Fractional'}}, sagemaker_fs_runtime_client_config=<botocore.config.Config object at 0x7f5334622850

In [21]:
# Ingest the test frame with 3 workers; wait=True is passed to the call.
test_feature_group.ingest(data_frame=test_data, max_workers=3, wait=True)

IngestionManagerPandas(feature_group_name='test-feature-group-19-22-47-28', feature_definitions={'measurement_id': {'FeatureName': 'measurement_id', 'FeatureType': 'String'}, 'sensor_id': {'FeatureName': 'sensor_id', 'FeatureType': 'Integral'}, 'location_id': {'FeatureName': 'location_id', 'FeatureType': 'Integral'}, 'location': {'FeatureName': 'location', 'FeatureType': 'String'}, 'latitude': {'FeatureName': 'latitude', 'FeatureType': 'Fractional'}, 'longitude': {'FeatureName': 'longitude', 'FeatureType': 'Fractional'}, 'epoch': {'FeatureName': 'epoch', 'FeatureType': 'String'}, 'duration': {'FeatureName': 'duration', 'FeatureType': 'String'}, 'parameter': {'FeatureName': 'parameter', 'FeatureType': 'String'}, 'value': {'FeatureName': 'value', 'FeatureType': 'Fractional'}, 'units': {'FeatureName': 'units', 'FeatureType': 'String'}, 'EventTime': {'FeatureName': 'EventTime', 'FeatureType': 'Fractional'}}, sagemaker_fs_runtime_client_config=<botocore.config.Config object at 0x7f533462285

### Test Sample

In [22]:
import pprint

# Fetch the training feature group's description through the low-level client
# and pretty-print the raw response.
response = sagemaker_client.describe_feature_group(FeatureGroupName=train_feature_group_name)
pprint.pprint(response)

{'CreationTime': datetime.datetime(2025, 2, 19, 22, 47, 28, 590000, tzinfo=tzlocal()),
 'EventTimeFeatureName': 'EventTime',
 'FeatureDefinitions': [{'FeatureName': 'measurement_id',
                         'FeatureType': 'String'},
                        {'FeatureName': 'sensor_id', 'FeatureType': 'Integral'},
                        {'FeatureName': 'location_id',
                         'FeatureType': 'Integral'},
                        {'FeatureName': 'location', 'FeatureType': 'String'},
                        {'FeatureName': 'latitude',
                         'FeatureType': 'Fractional'},
                        {'FeatureName': 'longitude',
                         'FeatureType': 'Fractional'},
                        {'FeatureName': 'epoch', 'FeatureType': 'String'},
                        {'FeatureName': 'duration', 'FeatureType': 'String'},
                        {'FeatureName': 'parameter', 'FeatureType': 'String'},
                        {'FeatureName': 'value', 'Fe

In [27]:
# The record identifier column (measurement_id) was cast to string before
# ingestion, so the lookup key is passed as a string as well.
# NOTE(review): 2302 is a hard-coded sample id; it must exist in the training split.
record_identifier_value = str(2302)

# Read one record back from the training feature group via the runtime client.
record = featurestore_runtime.get_record(
    FeatureGroupName=train_feature_group_name,
    RecordIdentifierValueAsString=record_identifier_value,
)

pprint.pprint(record)

{'Record': [{'FeatureName': 'measurement_id', 'ValueAsString': '2302'},
            {'FeatureName': 'sensor_id', 'ValueAsString': '2000855'},
            {'FeatureName': 'location_id', 'ValueAsString': '947312'},
            {'FeatureName': 'location', 'ValueAsString': 'Canyon ES (2795)'},
            {'FeatureName': 'latitude', 'ValueAsString': '34.03213'},
            {'FeatureName': 'longitude', 'ValueAsString': '-118.51198'},
            {'FeatureName': 'epoch', 'ValueAsString': '2022-02-19 06:36:52'},
            {'FeatureName': 'duration', 'ValueAsString': '0 days 00:03:00'},
            {'FeatureName': 'parameter', 'ValueAsString': 'pm25'},
            {'FeatureName': 'value', 'ValueAsString': '-0.46964278273851745'},
            {'FeatureName': 'units', 'ValueAsString': 'µg/m³'},
            {'FeatureName': 'EventTime', 'ValueAsString': '1740005248.0'}],
 'ResponseMetadata': {'HTTPHeaders': {'content-length': '1013',
                                      'content-type': 'applic

A record can be successfully pulled from the Feature Store, indicating that features are available for future work.

## Model Training

In [23]:
%%writefile code/train.py

import argparse
import os
import pandas as pd
import numpy as np
import torch
from torch import nn, optim
from torch.utils.data import DataLoader, Dataset

# ---------------------------
# Dataset definition
# ---------------------------
class TimeSeriesDataset(Dataset):
    """Sliding-window view over a univariate series.

    Sample ``i`` is the pair
    ``(data[i : i + seq_len], data[i + seq_len : i + seq_len + pred_len])``,
    both returned as float32 tensors.
    """

    def __init__(self, data, seq_len=30, pred_len=1):
        """
        Args:
        - data (pd.DataFrame or np.array): Time series with a column 'value'
        - seq_len (int): Number of timesteps in the input sequence
        - pred_len (int): Number of timesteps in the output sequence
        """
        self.data = np.array(data["value"]) if isinstance(data, pd.DataFrame) else np.array(data)
        self.seq_len = seq_len
        self.pred_len = pred_len

    def __len__(self):
        """Returns total number of sequences available"""
        # The last valid start index is len(data) - seq_len - pred_len, so the
        # number of windows is that value plus one (the previous formula
        # silently dropped the final window).
        return max(0, len(self.data) - self.seq_len - self.pred_len + 1)

    def __getitem__(self, idx):
        """Retrieves input sequence and target sequence

        Negative indices count from the end, as for a list.

        Raises:
            IndexError: If ``idx`` does not address an existing window.
        """
        size = len(self)
        position = idx + size if idx < 0 else idx
        if not 0 <= position < size:
            raise IndexError(f"Index {idx} out of bounds for dataset length {size}")

        window_end = position + self.seq_len
        x = self.data[position:window_end]
        y = self.data[window_end : window_end + self.pred_len]

        # Return tensors for PyTorch
        return torch.tensor(x, dtype=torch.float32), torch.tensor(y, dtype=torch.float32)

# ---------------------------
# Model definition
# ---------------------------
class LSTM(nn.Module):
    """LSTM encoder followed by a linear head that emits one value per sequence.

    Args:
        input_size: Number of features per timestep.
        hidden_size: Width of the LSTM hidden state.
        num_layers: Number of stacked LSTM layers.
    """

    def __init__(self, input_size=1, hidden_size=50, num_layers=1):
        super(LSTM, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, 1)

    def forward(self, x):
        """Map a ``(batch_size, seq_len)`` tensor to a ``(batch_size, 1)`` prediction."""
        # Allocate the zero initial states with the input's device and dtype so the
        # module also works after .to(device) / .double(); plain torch.zeros()
        # always produced CPU float32 tensors.
        state_shape = (self.num_layers, x.size(0), self.hidden_size)
        h0 = x.new_zeros(state_shape)
        c0 = x.new_zeros(state_shape)
        # unsqueeze adds the trailing feature dimension expected by nn.LSTM
        out, _ = self.lstm(x.unsqueeze(-1), (h0, c0))
        # out is (batch_size, seq_len, hidden_size)
        # We want the last timestep
        out = out[:, -1, :]
        out = self.fc(out)  # shape (batch_size, 1)
        return out

# ---------------------------
# Main training function
# ---------------------------
def _mean_batch_loss(model, loader, criterion):
    """Return the average per-batch loss of ``model`` over ``loader``.

    Puts the model in eval mode and disables gradient tracking; the caller is
    responsible for switching back to train mode.
    """
    model.eval()
    running_loss = 0.0
    with torch.no_grad():
        for inputs, targets in loader:
            running_loss += criterion(model(inputs), targets).item()
    return running_loss / len(loader)


def main():
    """Train the LSTM on the train channel and save its weights as ``model.pt``.

    Data locations default to the SM_CHANNEL_TRAIN / SM_CHANNEL_VALIDATION /
    SM_CHANNEL_TEST environment variables and hyperparameters come from the
    command line. After every epoch one line with the train, validation and
    test loss is printed.

    Raises:
        ValueError: If a split is too short to build a single
            ``seq_len + pred_len`` window.
    """
    parser = argparse.ArgumentParser()

    # Channels for data paths (SageMaker will populate these automatically)
    parser.add_argument("--train", type=str, default=os.environ.get("SM_CHANNEL_TRAIN"))
    parser.add_argument("--validation", type=str, default=os.environ.get("SM_CHANNEL_VALIDATION"))
    parser.add_argument("--test", type=str, default=os.environ.get("SM_CHANNEL_TEST"))

    # Hyperparameters
    parser.add_argument("--seq-len", type=int, default=20)
    parser.add_argument("--pred-len", type=int, default=1)
    parser.add_argument("--batch-size", type=int, default=8)
    parser.add_argument("--epochs", type=int, default=50)
    parser.add_argument("--hidden-size", type=int, default=50)
    parser.add_argument("--num-layers", type=int, default=1)
    parser.add_argument("--lr", type=float, default=0.001)

    args = parser.parse_args()

    # ---------------------------
    # Load data
    # ---------------------------
    # Processor wrote "train.csv", "validation.csv", and "test.csv"
    # into the respective directories: /opt/ml/input/data/train, etc.
    train_csv = os.path.join(args.train, "train.csv")
    val_csv   = os.path.join(args.validation, "validation.csv")
    test_csv  = os.path.join(args.test, "test.csv")

    train_data = pd.read_csv(train_csv)
    val_data   = pd.read_csv(val_csv)
    test_data  = pd.read_csv(test_csv)

    # ---------------------------
    # Create PyTorch datasets & loaders
    # ---------------------------
    train_dataset = TimeSeriesDataset(train_data, seq_len=args.seq_len, pred_len=args.pred_len)
    val_dataset   = TimeSeriesDataset(val_data, seq_len=args.seq_len, pred_len=args.pred_len)
    test_dataset  = TimeSeriesDataset(test_data, seq_len=args.seq_len, pred_len=args.pred_len)

    # Fail early with a readable message instead of a ZeroDivisionError (or a
    # sampler error) once the loaders are iterated.
    for split_name, dataset in (
        ("train", train_dataset),
        ("validation", val_dataset),
        ("test", test_dataset),
    ):
        if len(dataset) == 0:
            raise ValueError(
                f"The {split_name} split has too few rows to build a single window "
                f"(seq_len={args.seq_len}, pred_len={args.pred_len})."
            )

    train_loader = DataLoader(train_dataset, batch_size=args.batch_size, shuffle=True)
    val_loader   = DataLoader(val_dataset, batch_size=args.batch_size, shuffle=False)
    test_loader  = DataLoader(test_dataset, batch_size=args.batch_size, shuffle=False)

    # ---------------------------
    # Initialize model, loss, optimizer
    # ---------------------------
    model = LSTM(
        input_size=1,
        hidden_size=args.hidden_size,
        num_layers=args.num_layers
    )

    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=args.lr)

    # ---------------------------
    # Training Loop
    # ---------------------------
    for epoch in range(args.epochs):
        # Training
        model.train()
        train_loss = 0.0
        for inputs, targets in train_loader:
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
        train_loss /= len(train_loader)

        # Validation and test share the same evaluation routine
        val_loss = _mean_batch_loss(model, val_loader, criterion)
        test_loss = _mean_batch_loss(model, test_loader, criterion)

        # Keep this format stable: the notebook's estimator parses the
        # "TestLoss: <value>" part of the line with a regex.
        print(f"Epoch [{epoch+1}/{args.epochs}] "
              f"TrainLoss: {train_loss:.4f} "
              f"ValLoss: {val_loss:.4f} "
              f"TestLoss: {test_loss:.4f}")

    # ---------------------------
    # Save the model
    # ---------------------------
    # Uploads /opt/ml/model to S3 after training
    model_dir = os.environ.get("SM_MODEL_DIR", "/opt/ml/model")
    # Make sure the directory exists so the script also works outside SageMaker
    os.makedirs(model_dir, exist_ok=True)
    model_path = os.path.join(model_dir, "model.pt")
    torch.save(model.state_dict(), model_path)
    print(f"Model saved to {model_path}")

if __name__ == "__main__":
    main()


Overwriting code/train.py


In [24]:
from sagemaker.pytorch import PyTorch
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

# Estimator that runs code/train.py on a single ml.m5.xlarge instance.
# NOTE(review): code/inference.py rebuilds the network with hidden_size=50 and
# num_layers=1, so changing "hidden-size" or "num-layers" here also requires
# updating model_fn there.
pytorch_estimator = PyTorch(
    entry_point="train.py",
    source_dir="code",
    role=role,
    framework_version="2.0",
    py_version="py310",
    instance_count=1,
    instance_type="ml.m5.xlarge",
    # Passed to train.py as command-line flags (--seq-len, --pred-len, ...).
    hyperparameters={
        "seq-len": 20,
        "pred-len": 1,
        "batch-size": 8,
        "epochs": 50,
        "hidden-size": 50,
        "num-layers": 1,
        "lr": 0.001
    },
    sagemaker_session=pipeline_session,
    # Extracts the number following "TestLoss:" from the lines train.py prints
    # after every epoch and publishes it as the "test_mse" metric.
    metric_definitions=[
        {
            "Name": "test_mse",
            "Regex": r"TestLoss:\s+([0-9\.]+)"
        }
    ]
)


In [25]:
# Build the training-job arguments through the pipeline session and pass them as
# `step_args`, matching how the processing and model steps in this notebook are
# defined (the `estimator=` form of TrainingStep is deprecated in the SDK).
# Each channel is wired to the matching named output of the processing step.
train_step_args = pytorch_estimator.fit(
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["validation"].S3Output.S3Uri
        ),
        "test": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri
        ),
    },
)

step_train = TrainingStep(
    name="TrainStep",
    step_args=train_step_args,
)

## Inference Code

In [26]:
%%writefile code/inference.py

import json
import os
import torch
import numpy as np
from torch import nn

class LSTM(nn.Module):
    """LSTM encoder followed by a linear head that emits one value per sequence.

    The layer names (``lstm``, ``fc``) must stay as they are so the state dict
    loaded in ``model_fn`` matches.

    Args:
        input_size: Number of features per timestep.
        hidden_size: Width of the LSTM hidden state.
        num_layers: Number of stacked LSTM layers.
    """

    def __init__(self, input_size=1, hidden_size=50, num_layers=1):
        super(LSTM, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, 1)

    def forward(self, x):
        """Map a ``(batch_size, seq_len)`` tensor to a ``(batch_size, 1)`` prediction."""
        # Allocate the zero initial states with the input's device and dtype so the
        # module also works after .to(device) / .double(); plain torch.zeros()
        # always produced CPU float32 tensors.
        state_shape = (self.num_layers, x.size(0), self.hidden_size)
        h0 = x.new_zeros(state_shape)
        c0 = x.new_zeros(state_shape)
        # unsqueeze adds the trailing feature dimension expected by nn.LSTM
        out, _ = self.lstm(x.unsqueeze(-1), (h0, c0))
        # out is (batch_size, seq_len, hidden_size)
        # We want the last timestep
        out = out[:, -1, :]
        out = self.fc(out)  # shape (batch_size, 1)
        return out

def model_fn(model_dir):
    """
    Build the LSTM and restore its weights from ``<model_dir>/model.pt``.

    The weights are mapped onto the CPU and the network is returned in eval
    mode. The architecture arguments are fixed here and therefore have to
    match the ones used for training.
    """
    weights_file = os.path.join(model_dir, "model.pt")
    state = torch.load(weights_file, map_location=torch.device("cpu"))

    # Same architecture/hyperparams as training
    network = LSTM(input_size=1, hidden_size=50, num_layers=1)
    network.load_state_dict(state)
    network.eval()
    return network

def input_fn(request_body, request_content_type):
    """
    Parse a JSON request into a float32 tensor with a leading batch dimension.

    The body must be a JSON object whose "data" entry holds the series, e.g.
    '{"data": [12.3, 45.6, 78.9]}'. Any content type other than
    "application/json" raises a ValueError.
    """
    if request_content_type != "application/json":
        raise ValueError(f"Unsupported content type: {request_content_type}")

    payload = json.loads(request_body)
    series = payload["data"]
    # Wrapping the series in a list adds the batch dimension
    return torch.tensor([series], dtype=torch.float32)

def predict_fn(input_object, model):
    """
    Run ``model`` on ``input_object`` with gradient tracking disabled and
    return whatever the model returns.
    """
    # With the LSTM defined in this file, a (1, seq_len) input gives a (1, 1) output
    with torch.no_grad():
        return model(input_object)

def output_fn(prediction, response_content_type):
    """
    Serialize a single-element prediction tensor as '{"prediction": <float>}'.

    Any response content type other than "application/json" raises a ValueError.
    """
    if response_content_type != "application/json":
        raise ValueError(f"Unsupported response content type: {response_content_type}")

    # Collapse the tensor to one Python float
    scalar = prediction.squeeze().item()
    return json.dumps({"prediction": scalar})


Overwriting code/inference.py


In [27]:
from sagemaker.pytorch.model import PyTorchModel
from sagemaker.workflow.model_step import ModelStep

# Model that serves the artifact produced by the training step using
# code/inference.py as its entry point.
inference_model = PyTorchModel(
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    role=role,
    sagemaker_session=pipeline_session,
    framework_version="2.0",
    py_version="py310",
    entry_point="inference.py",
    source_dir="code",
)

# Pipeline step that creates the model; it only runs on the success branch of
# the CheckTestLoss condition.
step_create = ModelStep(
    name="AirDataCreateModel",
    step_args=inference_model.create(instance_type="ml.m5.large"),
)



In [28]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics

# No evaluation report is attached to the registered package yet.
model_metrics = None  # TODO: build a ModelMetrics object once an evaluation step produces a report
register_args = inference_model.register(
    content_types=["application/json"],
    response_types=["application/json"],
    inference_instances=["ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name="AirDataModelGroup",
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)

# Single registration step. A second, identical ModelStep named
# "RegisterAirDataModel" used to be built first and was immediately overwritten
# by this assignment, so only this one ever reached the pipeline.
step_register = ModelStep(name="AirDataRegisterModel", step_args=register_args)

## Model Evaluation

In [29]:
from sagemaker.workflow.functions import JsonGet
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join

# Metric from the training step that gates model creation and registration.
# NOTE(review): index 0 assumes the test MSE is the first entry of the training
# job's FinalMetricDataList -- confirm against the estimator's metric definitions.
test_mse = step_train.properties.FinalMetricDataList[0].Value

# Failure branch: stop the execution with a message that includes the threshold.
step_fail = FailStep(
    name="MSEFail",
    error_message=Join(
        on=" ",
        values=["Execution failed due to MSE >", mse_threshold],
    ),
)

# The model is accepted when test_mse <= mse_threshold.
mse_within_threshold = ConditionLessThanOrEqualTo(left=test_mse, right=mse_threshold)

# Create and register the model on success; otherwise run the fail step.
step_check_loss = ConditionStep(
    name="CheckTestLoss",
    conditions=[mse_within_threshold],
    if_steps=[step_create, step_register],
    else_steps=[step_fail],
)


## Create Pipeline

In [30]:
from sagemaker.workflow.pipeline import Pipeline

# Plain string literal: the previous f-string had no placeholders.
pipeline_name = "AirDataPipeline"

# Assemble the pipeline. `step_create`, `step_register` and the fail step are
# not listed directly; they are attached as branches of `step_check_loss`.
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        instance_type,
        model_approval_status,
        input_data,
        mse_threshold,
    ],
    steps=[
        step_process,
        step_train,
        step_check_loss,
    ],
)

In [31]:
import json

# Parse the pipeline's JSON definition into a Python object and display it for inspection.
definition = json.loads(pipeline.definition())
definition

INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.


{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceCount',
   'Type': 'Integer',
   'DefaultValue': 1},
  {'Name': 'TrainingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'ModelApprovalStatus',
   'Type': 'String',
   'DefaultValue': 'PendingManualApproval'},
  {'Name': 'InputData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-us-east-1-728406898807/airdata/sensor_data.csv'},
  {'Name': 'MseThreshold', 'Type': 'Float', 'DefaultValue': 0.5}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'AirDataProcess',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': 'ml.m5.xlarge',
      'InstanceCount': {'Get': 'Parameters.ProcessingInstanceCount'},
      'VolumeSizeInGB': 30}},
    'AppSpecification': {'ImageUri': '683313688378.dkr.ecr.us-east-1.amazona

## Execute Pipeline

In [32]:
# Create or update the pipeline definition under `role`, then start an execution.
pipeline.upsert(role_arn=role)
execution = pipeline.start()
try:
    # Block until the execution reaches a terminal state.
    execution.wait()
except Exception as error:
    # Any error raised while waiting is printed rather than re-raised, so the
    # following cells can still inspect `execution`.
    # NOTE(review): this also hides unrelated errors -- consider narrowing the exception type.
    print(error)

INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.


In [33]:
# Display the execution's metadata, including its PipelineExecutionStatus.
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:728406898807:pipeline/AirDataPipeline',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:728406898807:pipeline/AirDataPipeline/execution/37dgozc3o6h0',
 'PipelineExecutionDisplayName': 'execution-1739925798619',
 'PipelineExecutionStatus': 'Succeeded',
 'PipelineExperimentConfig': {'ExperimentName': 'airdatapipeline',
  'TrialName': '37dgozc3o6h0'},
 'CreationTime': datetime.datetime(2025, 2, 19, 0, 43, 18, 555000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2025, 2, 19, 0, 51, 2, 787000, tzinfo=tzlocal()),
 'CreatedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:728406898807:user-profile/d-0og3bvbn5ajw/kdevoe-studio-1',
  'UserProfileName': 'kdevoe-studio-1',
  'DomainId': 'd-0og3bvbn5ajw',
  'IamIdentity': {'Arn': 'arn:aws:sts::728406898807:assumed-role/LabRole/SageMaker',
   'PrincipalId': 'AROA2TGDPHB33P5GE3SPR:SageMaker'}},
 'LastModifiedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:728406898807:user-profi

In [34]:
# Display each executed step with its status, timings and metadata.
execution.list_steps()

[{'StepName': 'AirDataCreateModel-CreateModel',
  'StartTime': datetime.datetime(2025, 2, 19, 0, 51, 0, 250000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2025, 2, 19, 0, 51, 2, 435000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'Model': {'Arn': 'arn:aws:sagemaker:us-east-1:728406898807:model/pipelines-37dgozc3o6h0-AirDataCreateModel-C-quRFFKqOy1'}},
  'AttemptCount': 1},
 {'StepName': 'AirDataRegisterModel-RegisterModel',
  'StartTime': datetime.datetime(2025, 2, 19, 0, 50, 57, 705000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2025, 2, 19, 0, 50, 59, 963000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'RegisterModel': {'Arn': 'arn:aws:sagemaker:us-east-1:728406898807:model-package/AirDataModelGroup/1'}},
  'AttemptCount': 1},
 {'StepName': 'AirDataCreateModel-RepackModel-0',
  'StartTime': datetime.datetime(2025, 2, 19, 0, 48, 42, 992000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2025, 2, 19, 0, 50, 59, 552000, tzinfo=tzloc

## Call Inference Endpoint