# Pipeline of Digits

This is a starting notebook for solving the "Pipeline of Digits" assignment.


This notebook was created by [Santiago L. Valdarrama](https://twitter.com/svpino) as part of the [Machine Learning School](https://www.ml.school) program.

In [2]:
%load_ext autoreload
%autoreload 2

In [3]:
import boto3
import sagemaker
import pandas as pd

from pathlib import Path

role = sagemaker.get_execution_role()
region = boto3.Session().region_name
sagemaker_session = sagemaker.session.Session()

## Creating the S3 Bucket

Let's create an S3 bucket where you will upload all the information generated by the pipeline. Make sure you set `BUCKET` to the name of the bucket you want to use. S3 bucket names are global, so the name must not already be taken by any other AWS account.

The `create-bucket` command in the code cell further down (commented out) creates the bucket in `us-east-1`. If you want to create a bucket in any other region, use this command instead:

```
!aws s3api create-bucket --bucket $BUCKET --create-bucket-configuration LocationConstraint=$region
```

The `LocationConstraint` argument should specify the region where you want to create the bucket.
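The same region rule applies when creating the bucket from Python. The sketch below only builds the keyword arguments for boto3's S3 `create_bucket` call; the helper name `bucket_creation_kwargs` and the example bucket name are hypothetical, and the actual API call is left commented out because it needs AWS credentials.

```python
def bucket_creation_kwargs(bucket, region):
    """Build keyword arguments for boto3's S3 `create_bucket` call.

    Hypothetical helper for illustration: `us-east-1` is the default
    region and is created without a location constraint, while every
    other region needs a `CreateBucketConfiguration`.
    """
    kwargs = {"Bucket": bucket}
    if region and region != "us-east-1":
        kwargs["CreateBucketConfiguration"] = {"LocationConstraint": region}
    return kwargs


# Usage (requires AWS credentials, so it is left commented out):
# import boto3
# boto3.client("s3", region_name=region).create_bucket(
#     **bucket_creation_kwargs(BUCKET, region)
# )
print(bucket_creation_kwargs("my-example-bucket", "eu-west-1"))
```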

In [5]:
BUCKET = "pochingto-mlschool"

# !aws s3api create-bucket --bucket $BUCKET

## Loading the dataset

We have two CSV files containing the MNIST dataset. These files come from the [MNIST in CSV](https://www.kaggle.com/datasets/oddrationale/mnist-in-csv) Kaggle dataset.

The `mnist_train.csv` file contains 60,000 training examples and labels. The `mnist_test.csv` file contains 10,000 test examples and labels. Each row consists of 785 values: the first value is the label (a number from 0 to 9) and the remaining 784 values are the pixel values (each a number from 0 to 255).
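To make the row layout concrete, here is a minimal plain-Python sketch that splits one row into its label and a 28×28 image. It assumes the pixels are stored row by row, as the `1x1` … `28x28` column names suggest; `split_row` is a hypothetical helper and the example row is synthetic, not taken from the dataset.

```python
def split_row(row):
    """Split one 785-value MNIST CSV row into (label, 28x28 image)."""
    values = [int(v) for v in row.split(",")]
    if len(values) != 785:
        raise ValueError(f"expected 785 values, got {len(values)}")
    label, pixels = values[0], values[1:]
    # Rebuild the 28 rows of 28 pixels each (assumes row-major order).
    image = [pixels[r * 28:(r + 1) * 28] for r in range(28)]
    return label, image


# Synthetic row for illustration: label 5 followed by 784 zero pixels.
example_row = ",".join(["5"] + ["0"] * 784)
label, image = split_row(example_row)
print(label, len(image), len(image[0]))
```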

Let's extract the `dataset.tar.gz` file.

In [9]:
MNIST_FOLDER = "mnist"
DATASET_FOLDER = Path("dataset")

# !tar -xvzf dataset.tar.gz --no-same-owner

Let's load the first 10 rows of the training set.

In [10]:
df = pd.read_csv(DATASET_FOLDER / "mnist_train.csv")
df.head(10)

Unnamed: 0,label,1x1,1x2,1x3,1x4,1x5,1x6,1x7,1x8,1x9,...,28x19,28x20,28x21,28x22,28x23,28x24,28x25,28x26,28x27,28x28
0,5,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
1,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
2,4,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
3,1,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
4,9,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
5,2,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
6,1,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
7,3,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
8,1,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
9,4,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


## Uploading dataset to S3

In [11]:
S3_FILEPATH = f"s3://{BUCKET}/{MNIST_FOLDER}"


TRAIN_SET_S3_URI = sagemaker.s3.S3Uploader.upload(
    local_path=str(DATASET_FOLDER / "mnist_train.csv"), 
    desired_s3_uri=S3_FILEPATH,
)

TEST_SET_S3_URI = sagemaker.s3.S3Uploader.upload(
    local_path=str(DATASET_FOLDER / "mnist_test.csv"), 
    desired_s3_uri=S3_FILEPATH,
)

print(f"Train set S3 location: {TRAIN_SET_S3_URI}")
print(f"Test set S3 location: {TEST_SET_S3_URI}")

Train set S3 location: s3://pochingto-mlschool/mnist/mnist_train.csv
Test set S3 location: s3://pochingto-mlschool/mnist/mnist_test.csv


In [12]:
df['label']

0        5
1        0
2        4
3        1
4        9
        ..
59995    8
59996    3
59997    5
59998    6
59999    8
Name: label, Length: 60000, dtype: int64

In [13]:
len(df.drop("label", axis=1).select_dtypes(include=['float64', "int64"]).columns.tolist())

784

## Pipeline Configuration

In [14]:
import os
import json
import tempfile

import numpy as np

from constants import *
from sagemaker.inputs import FileSystemInput
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.parameters import ParameterString
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import CacheConfig
from sagemaker.workflow.pipeline_definition_config import PipelineDefinitionConfig

In [15]:
S3_LOCATION

's3://pochingto-mlschool/mnist'

In [16]:
train_data_location = ParameterString(
    name="train_data_location",
    default_value=f"{S3_LOCATION}/mnist_train.csv",
)

test_data_location = ParameterString(
    name="test_data_location",
    default_value=f"{S3_LOCATION}/mnist_test.csv",
)

pipeline_definition_config = PipelineDefinitionConfig(use_custom_job_prefix=True)

cache_config = CacheConfig(
    enable_caching=True, 
    expire_after="15d"
)

## Processing Step

In [17]:
import sys
from pathlib import Path

CODE_FOLDER = Path("code")

# Create the folder for the pipeline scripts and make it importable.
CODE_FOLDER.mkdir(parents=True, exist_ok=True)
sys.path.append(f"./{CODE_FOLDER}")


In [22]:
%%writefile {CODE_FOLDER}/preprocessor.py

import os
import numpy as np
import pandas as pd

from pathlib import Path
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from pickle import dump


# This is the location where the SageMaker Processing job
# will save the input dataset.
BASE_DIRECTORY = "/opt/ml/processing"
TRAIN_DATA_FILEPATH = Path(BASE_DIRECTORY) / "input" / "train" / "mnist_train.csv"
TEST_DATA_FILEPATH = Path(BASE_DIRECTORY) / "input" / "test" / "mnist_test.csv"


def _save_splits(base_directory, train, validation, test):
    """
    One of the goals of this script is to output the three
    dataset splits. This function will save each of these
    splits to disk.
    """

    train_path = Path(base_directory) / "train"
    validation_path = Path(base_directory) / "validation"
    test_path = Path(base_directory) / "test"

    train_path.mkdir(parents=True, exist_ok=True)
    validation_path.mkdir(parents=True, exist_ok=True)
    test_path.mkdir(parents=True, exist_ok=True)

    pd.DataFrame(train).to_csv(train_path / "train.csv", header=False, index=False)
    pd.DataFrame(validation).to_csv(
        validation_path / "validation.csv", header=False, index=False
    )
    pd.DataFrame(test).to_csv(test_path / "test.csv", header=False, index=False)


def _save_pipeline(base_directory, pipeline):
    """
    Saves the Scikit-Learn pipeline that we used to
    preprocess the data.
    """
    pipeline_path = Path(base_directory) / "pipeline"
    pipeline_path.mkdir(parents=True, exist_ok=True)
    # Use a context manager so the file handle is closed after writing.
    with open(pipeline_path / "pipeline.pkl", "wb") as f:
        dump(pipeline, f)


def _save_baseline(base_directory, df_train, df_test):
    """
    During the data and quality monitoring steps, we will need a baseline
    to compute constraints and statistics. This function will save that
    baseline to the disk.
    """

    for split, data in [("train", df_train), ("test", df_test)]:
        baseline_path = Path(base_directory) / f"{split}-baseline"
        baseline_path.mkdir(parents=True, exist_ok=True)

        df = data.copy().dropna()
        df.to_json(
            baseline_path / f"{split}-baseline.json", orient="records", lines=True
        )


def preprocess(base_directory, train_data_filepath, test_data_filepath):
    """
    Preprocesses the supplied raw dataset and splits it into a train,
    validation, and a test set.
    """
    
    print("started, reading csv...")
    df_train = pd.read_csv(train_data_filepath)
    df_test = pd.read_csv(test_data_filepath)
    
    print("saving baseline...")
    _save_baseline(base_directory, df_train, df_test)
    
    print("sampling dataframe...")
    df_train = df_train.sample(frac=1, random_state=42)
    # Fix the seed so the train/validation split is reproducible across runs.
    df_train, df_validation = train_test_split(df_train, test_size=0.3, random_state=42)
    
    print("defining numeric features and transform...")
    numeric_features = df_train.drop("label", axis=1).select_dtypes(include=['float64', 'int64']).columns.tolist()
    numeric_transformer = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="mean")),
            ("scaler", MinMaxScaler()),
        ]
    )

    preprocessor = ColumnTransformer(
        transformers=[
            ("numeric", numeric_transformer, numeric_features),
        ]
    )

    pipeline = Pipeline(
        steps=[
            ("preprocessing", preprocessor)
        ]
    )
    
    print("defining y_ ...")
    y_train = df_train.label
    y_validation = df_validation.label
    y_test = df_test.label
    
    print("defining X_ ...")
    df_train = df_train.drop(["label"], axis=1)
    df_validation = df_validation.drop(["label"], axis=1)
    df_test = df_test.drop(["label"], axis=1)

    X_train = pipeline.fit_transform(df_train)
    X_validation = pipeline.transform(df_validation)
    X_test = pipeline.transform(df_test)
    
    print("concatenating train/valid/test csv...")
    train = np.concatenate((X_train, np.expand_dims(y_train, axis=1)), axis=1)
    validation = np.concatenate((X_validation, np.expand_dims(y_validation, axis=1)), axis=1)
    test = np.concatenate((X_test, np.expand_dims(y_test, axis=1)), axis=1)
    
    print("saving splits...")
    _save_splits(base_directory, train, validation, test)
    print("saving pipeline...")
    _save_pipeline(base_directory, pipeline=pipeline)
    print("finished")
    
if __name__ == "__main__":
    preprocess(BASE_DIRECTORY, TRAIN_DATA_FILEPATH, TEST_DATA_FILEPATH)

Overwriting code/preprocessor.py
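
The `MinMaxScaler` step in the script rescales every pixel column to the [0, 1] range using the minimum and maximum observed while fitting on the training split; the validation and test splits reuse those same statistics. A plain-Python sketch of that formula for a single column follows. It is not scikit-learn's implementation, and `fit_min_max` and `min_max_scale` are hypothetical names used only for illustration.

```python
def fit_min_max(column):
    """Return the (min, max) statistics learned from the training column."""
    return min(column), max(column)


def min_max_scale(column, lo, hi):
    """Rescale values with previously fitted statistics: (v - lo) / (hi - lo)."""
    # A constant column has zero range; fall back to 1 to avoid dividing by zero.
    span = (hi - lo) or 1
    return [(v - lo) / span for v in column]


train_pixels = [0, 128, 255]
lo, hi = fit_min_max(train_pixels)

# The training column maps onto [0, 1].
print(min_max_scale(train_pixels, lo, hi))

# Validation and test values are scaled with the statistics from training.
print(min_max_scale([51], lo, hi))
```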


In [21]:
# validate the code
from preprocessor import preprocess

TRAIN_DATA_FILEPATH = "./dataset/mnist_train.csv"
TEST_DATA_FILEPATH = "./dataset/mnist_test.csv"

with tempfile.TemporaryDirectory() as directory:
    preprocess(
        base_directory=directory, 
        train_data_filepath=TRAIN_DATA_FILEPATH,
        test_data_filepath=TEST_DATA_FILEPATH
    )
    
    print(f"Folders: {os.listdir(directory)}")

started, reading csv...
saving baseline...
sampling dataframe...
defining numeric features and transform...
defining y_ ...
defining X_ ...
concatenating train/valid/test csv...
saving splits...
saving pipeline...
finished
Folders: ['train-baseline', 'test-baseline', 'train', 'validation', 'test', 'pipeline']


In [23]:
sklearn_processor = SKLearnProcessor(
    base_job_name="mnist-preprocessing",
    framework_version="0.23-1",
    instance_type="ml.t3.xlarge",
    instance_count=1,
    role=role,
    sagemaker_session=pipeline_session
)

preprocess_data_step = ProcessingStep(
    name="preprocess-data",
    step_args=sklearn_processor.run(
        code=f"{CODE_FOLDER}/preprocessor.py",
        inputs=[
            ProcessingInput(source=train_data_location, destination="/opt/ml/processing/input/train"),
            ProcessingInput(source=test_data_location, destination="/opt/ml/processing/input/test")
        ],
        outputs=[
            ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
            ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
            ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
            ProcessingOutput(output_name="pipeline", source="/opt/ml/processing/pipeline"),
            ProcessingOutput(output_name="classes", source="/opt/ml/processing/classes"),
            ProcessingOutput(output_name="train-baseline", source="/opt/ml/processing/train-baseline"),
            ProcessingOutput(output_name="test-baseline", source="/opt/ml/processing/test-baseline"),
        ]
    ),
    cache_config=cache_config
)



In [24]:
session1_pipeline = Pipeline(
    name="mnist-session1-pipeline",
    parameters=[
        train_data_location,
        test_data_location
    ],
    steps=[
        preprocess_data_step, 
    ],
    pipeline_definition_config=pipeline_definition_config,
    sagemaker_session=pipeline_session
)

session1_pipeline.upsert(role_arn=role)

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:681340771742:pipeline/mnist-session1-pipeline',
 'ResponseMetadata': {'RequestId': 'e46964bd-7056-4a59-be69-41e72392a345',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'e46964bd-7056-4a59-be69-41e72392a345',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '91',
   'date': 'Mon, 31 Jul 2023 15:23:55 GMT'},
  'RetryAttempts': 0}}

## Training

In [31]:
preprocess(
    base_directory="./tmp",
    train_data_filepath=TRAIN_DATA_FILEPATH,
    test_data_filepath=TEST_DATA_FILEPATH,
)

started, reading csv...
saving baseline...
sampling dataframe...
defining numeric features and transform...
defining y_ ...
defining X_ ...
concatenating train/valid/test csv...
saving splits...
saving pipeline...
finished


In [6]:
from sagemaker.inputs import TrainingInput
from sagemaker.parameter import IntegerParameter
from sagemaker.tensorflow import TensorFlow
from sagemaker.tuner import HyperparameterTuner
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.steps import TrainingStep, TuningStep

In [7]:
from __future__ import print_function
import argparse
import numpy as np

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, TensorDataset, DataLoader
from torchvision import datasets, transforms
from torch.optim.lr_scheduler import StepLR


class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 32, 3, 1)
        self.conv3 = nn.Conv2d(32, 32, 3, 1)
        self.conv4 = nn.Conv2d(32, 32, 3, 1)
        self.conv5 = nn.Conv2d(32, 32, 3, 1)
        
        self.dropout1 = nn.Dropout(0.25)
        self.fc2 = nn.Linear(1152, 10)

    def forward(self, x):
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = F.max_pool2d(x, kernel_size=2)
        x = self.dropout1(x)
        x = self.conv3(x)
        x = F.relu(x)
        # print(x.shape)
        x = self.conv4(x)
        x = F.relu(x)
        # print(x.shape)
        x = self.conv5(x)
        x = F.relu(x)
        # print(x.shape)
        # print(x.shape)
        x = torch.flatten(x, 1)
        # print(x.shape)
        x = self.fc2(x)
        output = F.softmax(x, dim=1)
        return output



In [351]:
model = Net()

In [352]:
x = torch.rand((1, 1, 28, 28))
model(x).shape

torch.Size([1, 10])
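
The 1152 input features of `fc2` follow from the layer arithmetic: each 3×3 convolution with stride 1 and no padding shrinks the side length by 2, and the 2×2 max pool halves it. The sketch below traces a 28×28 input through the stack. `conv_out` and `flattened_features` are hypothetical helper names, not part of PyTorch.

```python
def conv_out(size, kernel=3, stride=1, padding=0):
    """Output side length of a square convolution."""
    return (size + 2 * padding - kernel) // stride + 1


def flattened_features(channels, size=28):
    """Number of features reaching the final linear layer of `Net`."""
    size = conv_out(size)  # conv1: 28 -> 26
    size = conv_out(size)  # conv2: 26 -> 24
    size = size // 2       # max_pool2d(kernel_size=2): 24 -> 12
    size = conv_out(size)  # conv3: 12 -> 10
    size = conv_out(size)  # conv4: 10 -> 8
    size = conv_out(size)  # conv5: 8 -> 6
    return channels * size * size


print(flattened_features(32))  # 32 channels * 6 * 6 = 1152
print(flattened_features(64))  # 64 channels * 6 * 6 = 2304
```

The second value, 2304, is the input size of `fc2` in the 64-channel version of the network written to `train.py`.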

In [353]:
train = pd.read_csv("./tmp/train/train.csv", header=None)
X_train = train.iloc[:, :-1].to_numpy()
y_train = train.iloc[:, -1].to_numpy()
validation = pd.read_csv("./tmp/validation/validation.csv", header=None)
X_validation = validation.iloc[:, :-1].to_numpy()
y_validation = validation.iloc[:, -1].to_numpy()

In [354]:
train_dataset = TensorDataset(torch.from_numpy(X_train).reshape((-1, 1, 28, 28)).float(), torch.from_numpy(y_train.astype("int")))
valid_dataset = TensorDataset(torch.from_numpy(X_validation).reshape((-1, 1, 28, 28)).float(), torch.from_numpy(y_validation.astype("int")))

train_loader = DataLoader(train_dataset, batch_size=128)
valid_loader = DataLoader(valid_dataset, batch_size=128)

In [355]:
def train_one_epoch(epoch, model, device, optimizer, train_dataloader, valid_dataloader, loss_func):
    model.train()
    total_loss = 0.0
    for batch_idx, (X, y) in enumerate(train_dataloader):
        X, y = X.to(device), y.to(device)
        y = F.one_hot(y, 10).to(device).float()
        
        optimizer.zero_grad()
        output = model(X)
        loss = loss_func(y, output)
        loss.backward()
        optimizer.step()
        
        total_loss += loss.detach().item()

    print(f"Train Epoch: {epoch} \t Avg. loss: {total_loss / len(train_dataloader.dataset):.6f}")
          
    valid_loss = 0
    correct = 0
    with torch.no_grad():
        for X, y in valid_dataloader:
            X, y = X.to(device), y.to(device)
            y = F.one_hot(y, 10).to(device).float()
            output = model(X)
            loss = loss_func(y, output)
            pred = output.argmax(dim=1, keepdim=True)  # index of the highest predicted probability
            target = y.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()
            valid_loss += loss.detach().item()

    valid_loss /= len(valid_dataloader.dataset)
    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        valid_loss, correct, len(valid_dataloader.dataset),
        100. * correct / len(valid_dataloader.dataset)))

In [356]:
%%time
model = model.float()

CPU times: user 393 µs, sys: 3 µs, total: 396 µs
Wall time: 350 µs


In [357]:
loss_func = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters())
device = torch.device("cpu")

In [358]:
%%time
for i in range(10):
    train_one_epoch(i + 1, model, device, optimizer, train_loader, valid_loader, loss_func)

Train Epoch: 1 	 Avg. loss: 0.014387

Test set: Average loss: 0.0139, Accuracy: 12312/18000 (68%)

Train Epoch: 2 	 Avg. loss: 0.013882

Test set: Average loss: 0.0133, Accuracy: 13763/18000 (76%)

Train Epoch: 3 	 Avg. loss: 0.013204

Test set: Average loss: 0.0131, Accuracy: 14139/18000 (79%)

Train Epoch: 4 	 Avg. loss: 0.013104

Test set: Average loss: 0.0131, Accuracy: 14205/18000 (79%)

Train Epoch: 5 	 Avg. loss: 0.013102

Test set: Average loss: 0.0131, Accuracy: 14222/18000 (79%)

Train Epoch: 6 	 Avg. loss: 0.013073

Test set: Average loss: 0.0131, Accuracy: 14263/18000 (79%)



KeyboardInterrupt: 

In [25]:
%%writefile {CODE_FOLDER}/train.py

import os
import argparse

import pandas as pd

from pathlib import Path

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader


class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.conv3 = nn.Conv2d(64, 64, 3, 1)
        self.conv4 = nn.Conv2d(64, 64, 3, 1)
        self.conv5 = nn.Conv2d(64, 64, 3, 1)
        
        self.dropout1 = nn.Dropout(0.25)
        self.fc2 = nn.Linear(2304, 10)

    def forward(self, x):
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = F.max_pool2d(x, kernel_size=2)
        x = self.dropout1(x)
        x = self.conv3(x)
        x = F.relu(x)
        x = self.conv4(x)
        x = F.relu(x)
        x = self.conv5(x)
        x = F.relu(x)
        x = torch.flatten(x, 1)
        # Return raw logits: nn.CrossEntropyLoss applies log-softmax internally.
        return self.fc2(x)


def load_data(train_path, validation_path, batch_size):
    train = pd.read_csv(Path(train_path) / "train.csv", header=None)
    X_train = train.iloc[:, :-1].to_numpy()
    y_train = train.iloc[:, -1].to_numpy()
    validation = pd.read_csv(Path(validation_path) / "validation.csv", header=None)
    X_validation = validation.iloc[:, :-1].to_numpy()
    y_validation = validation.iloc[:, -1].to_numpy()

    train_dataset = TensorDataset(torch.from_numpy(X_train).reshape((-1, 1, 28, 28)).float(), torch.from_numpy(y_train.astype("int")))
    valid_dataset = TensorDataset(torch.from_numpy(X_validation).reshape((-1, 1, 28, 28)).float(), torch.from_numpy(y_validation.astype("int")))

    train_loader = DataLoader(train_dataset, batch_size=batch_size)
    valid_loader = DataLoader(valid_dataset, batch_size=batch_size)

    return train_loader, valid_loader


def train_one_epoch(epoch, model, device, optimizer, train_dataloader, valid_dataloader, loss_func):
    model.train()
    total_loss = 0.0
    for X, y in train_dataloader:
        X, y = X.to(device), y.to(device).long()

        optimizer.zero_grad()
        output = model(X)
        # nn.CrossEntropyLoss takes (logits, class indices), in that order.
        loss = loss_func(output, y)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    # Each batch loss is already a mean, so average over the number of batches.
    print(f"Train Epoch: {epoch} \t Avg. loss: {total_loss / len(train_dataloader):.6f}")

    # Switch to evaluation mode so dropout is disabled during validation.
    model.eval()
    valid_loss = 0.0
    correct = 0
    with torch.no_grad():
        for X, y in valid_dataloader:
            X, y = X.to(device), y.to(device).long()
            output = model(X)
            valid_loss += loss_func(output, y).item()
            pred = output.argmax(dim=1)  # index of the largest logit
            correct += (pred == y).sum().item()

    valid_loss /= len(valid_dataloader)
    print('\nValidation set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        valid_loss, correct, len(valid_dataloader.dataset),
        100. * correct / len(valid_dataloader.dataset)))
    
def train(base_directory, train_path, validation_path, epochs, batch_size):
    device = torch.device("cpu")
    if torch.cuda.is_available():
        device = torch.device("cuda")
    model = Net().to(device).float()
    loss_func = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters())
    
    print(f"Train path: {train_path}")
    print(f"Validation path: {validation_path}")
    train_dataloader, valid_dataloader = load_data(train_path, validation_path, batch_size)
    
    print("Start training...")
    for epoch in range(1, epochs + 1):
        train_one_epoch(epoch, model, device, optimizer, train_dataloader, valid_dataloader, loss_func)
    
    
if __name__ == "__main__":
    # Any hyperparameters provided by the training job are passed to the entry point
    # as script arguments. SageMaker will also provide a list of special parameters
    # that you can capture here. Here is the full list: 
    # https://github.com/aws/sagemaker-training-toolkit/blob/master/src/sagemaker_training/params.py
    parser = argparse.ArgumentParser()
    parser.add_argument("--base_directory", type=str, default="/opt/ml/")
    parser.add_argument("--train_path", type=str, default=os.environ.get("SM_CHANNEL_TRAIN", None))
    parser.add_argument("--validation_path", type=str, default=os.environ.get("SM_CHANNEL_VALIDATION", None))
    parser.add_argument("--epochs", type=int, default=10)
    parser.add_argument("--batch_size", type=int, default=128)
    args, _ = parser.parse_known_args()
    
    base_directory, epochs, train_path, validation_path, batch_size = args.base_directory, args.epochs, args.train_path, args.validation_path, args.batch_size
    train(base_directory, train_path, validation_path, epochs, batch_size)
    

Overwriting code/train.py
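
PyTorch's `nn.CrossEntropyLoss` expects raw, unnormalized scores (logits) as its first argument and the targets as its second, and it applies log-softmax internally. The plain-Python sketch below computes the same quantity for a single example with a class-index target. `log_softmax` and `cross_entropy_from_logits` are hypothetical helpers written for illustration, and the logits are made-up numbers.

```python
import math


def log_softmax(logits):
    """Numerically stable log-softmax over a list of raw scores."""
    m = max(logits)
    log_sum = m + math.log(sum(math.exp(v - m) for v in logits))
    return [v - log_sum for v in logits]


def cross_entropy_from_logits(logits, target):
    """Negative log-probability assigned to the target class."""
    return -log_softmax(logits)[target]


logits = [2.0, 0.5, -1.0]

# The loss is small when the target has the largest score...
print(cross_entropy_from_logits(logits, 0))

# ...and large when the target has the smallest score.
print(cross_entropy_from_logits(logits, 2))
```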


In [None]:
from preprocessor import preprocess
from train import train

TRAIN_DATA_FILEPATH = "./dataset/mnist_train.csv"
TEST_DATA_FILEPATH = "./dataset/mnist_test.csv"

with tempfile.TemporaryDirectory() as directory:
    preprocess(
        base_directory=directory, 
        train_data_filepath=TRAIN_DATA_FILEPATH,
        test_data_filepath=TEST_DATA_FILEPATH
    )
    
    print(f"Folders: {os.listdir(directory)}")

    # Then, we train a model using the train and 
    # validation splits.
    train(
        base_directory=directory, 
        train_path=Path(directory) / "train", 
        validation_path=Path(directory) / "validation",
        epochs=3,
        batch_size=128
    )

In [26]:
from sagemaker.pytorch.estimator import PyTorch

estimator = PyTorch(
    entry_point=f"{CODE_FOLDER}/train.py",
    
    hyperparameters={
        "epochs": 10,
        "batch_size": 128,
    },
    
    framework_version="1.12",
    instance_type="ml.m5.xlarge",
    py_version="py38",
    instance_count=1,
    script_mode=True,
    
    # The default profiler rule includes a timestamp which will change each time
    # the pipeline is upserted, causing cache misses. Since we don't need
    # profiling, we can disable it to take advantage of caching.
    disable_profiler=True,

    role=role,
    sagemaker_session=pipeline_session,
)
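
SageMaker passes each entry in `hyperparameters` to the entry point as a command-line argument, which is why `train.py` reads them with `argparse`. The sketch below parses a simulated argument list the same way. `parse_hyperparameters` is a hypothetical helper, and `--extra_option` stands in for any argument the script does not declare.

```python
import argparse


def parse_hyperparameters(argv):
    """Parse the hyperparameters that train.py declares."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--epochs", type=int, default=10)
    parser.add_argument("--batch_size", type=int, default=128)
    # parse_known_args returns undeclared arguments instead of failing on them.
    args, unknown = parser.parse_known_args(argv)
    return args, unknown


# Simulated command line; "--extra_option" is a made-up, undeclared argument.
args, unknown = parse_hyperparameters(
    ["--epochs", "3", "--batch_size", "64", "--extra_option", "value"]
)
print(args.epochs, args.batch_size, unknown)
```

Using `parse_known_args` rather than `parse_args` keeps the script from exiting when it receives arguments it does not recognize.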

In [27]:
train_model_step = TrainingStep(
    name="train-model",
    step_args=estimator.fit(
        inputs={
            "train": TrainingInput(
                s3_data=preprocess_data_step.properties.ProcessingOutputConfig.Outputs[
                    "train"
                ].S3Output.S3Uri,
                content_type="text/csv"
            ),
            "validation": TrainingInput(
                s3_data=preprocess_data_step.properties.ProcessingOutputConfig.Outputs[
                    "validation"
                ].S3Output.S3Uri,
                content_type="text/csv"
            )
        }
    ),
    cache_config=cache_config
)

In [28]:
session2_pipeline = Pipeline(
    name="mnist-session2-pipeline",
    parameters=[
        train_data_location,
        test_data_location
    ],
    steps=[
        preprocess_data_step, 
        train_model_step
    ],
    pipeline_definition_config=pipeline_definition_config,
    sagemaker_session=pipeline_session
)

session2_pipeline.upsert(role_arn=role)

INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.
INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.


Using provided s3_resource


INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.


Using provided s3_resource


{'PipelineArn': 'arn:aws:sagemaker:us-east-1:681340771742:pipeline/mnist-session2-pipeline',
 'ResponseMetadata': {'RequestId': 'b91d5edd-7123-46a4-a621-74f9730bfe65',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'b91d5edd-7123-46a4-a621-74f9730bfe65',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '91',
   'date': 'Mon, 31 Jul 2023 15:26:33 GMT'},
  'RetryAttempts': 0}}