In [1]:
import os
import time
import boto3
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
import sagemaker
from sagemaker import get_execution_role
from sagemaker.workflow.pipeline_context import PipelineSession
import tensorflow as tf
from tensorflow.keras import datasets, layers, models
from tensorflow.keras.utils import to_categorical

In [2]:
# --- Names used by the rest of the notebook --------------------------------
model_package_group_name = "TF2-California-Housing"  # Model name in model registry
prefix = "tf2-california-housing-pipelines"  # S3 key prefix (used for the raw-data upload)
pipeline_name = "testing-please-work2"  # SageMaker Pipeline name
current_time = time.strftime("%m-%d-%H-%M-%S", time.localtime())  # local timestamp taken when this cell runs

# --- AWS handles -----------------------------------------------------------
# One boto3 session backs both the low-level SageMaker client and the SDK session.
sess = boto3.Session()
sm = sess.client("sagemaker")
sagemaker_session = sagemaker.Session(boto_session=sess)

role = get_execution_role()
bucket = sagemaker_session.default_bucket()
region = sess.region_name

In [3]:
# Local staging folders under the current working directory: data/ and data/raw.
# exist_ok=True lets the cell be re-run without failing on existing folders.
data_dir, raw_dir = (os.path.join(os.getcwd(), rel) for rel in ("data", "data/raw"))
for staging_dir in (data_dir, raw_dir):
    os.makedirs(staging_dir, exist_ok=True)

In [4]:
# Fetch Fashion-MNIST through the Keras dataset loader; load_data() returns
# a (train, test) pair, each of which is an (images, labels) pair.
fashion_mnist = tf.keras.datasets.fashion_mnist
train_split, test_split = fashion_mnist.load_data()
train_images, train_labels = train_split
test_images, test_labels = test_split

In [5]:
# Persist every array under the file name that matches its contents.
# (Previously x_test.npy received the training labels and y_train.npy the
# test images, so the uploaded files were mislabeled.)
raw_arrays = {
    "x_train.npy": train_images,
    "y_train.npy": train_labels,
    "x_test.npy": test_images,
    "y_test.npy": test_labels,
}
for file_name, array in raw_arrays.items():
    np.save(os.path.join(raw_dir, file_name), array)

# Upload the raw folder to S3 under "<prefix>/data/raw". raw_dir is used
# instead of a hard-coded relative path so the upload always targets the
# folder the arrays were just written to.
rawdata_s3_prefix = "{}/data/raw".format(prefix)
raw_s3 = sagemaker_session.upload_data(path=raw_dir, key_prefix=rawdata_s3_prefix)
print(raw_s3)

s3://sagemaker-us-east-1-690806730396/tf2-california-housing-pipelines/data/raw


In [6]:
from sagemaker.workflow.parameters import ParameterInteger, ParameterString, ParameterFloat

# Pipeline parameters and their default values.

# S3 URI of the raw arrays uploaded earlier in the notebook.
input_data = ParameterString(
    name="InputData",
    default_value=raw_s3,
)

# Number of training epochs.
# NOTE(review): declared as a string parameter ("100"); confirm whether an
# integer parameter is intended before wiring it into a step.
training_epochs = ParameterString(
    name="TrainingEpochs",
    default_value="100",
)

# Threshold intended for a model-performance check.
accuracy_mse_threshold = ParameterFloat(
    name="AccuracyMseThreshold",
    default_value=0.75,
)

# Instance type intended for the inference endpoint.
endpoint_instance_type = ParameterString(
    name="EndpointInstanceType",
    default_value="ml.m5.large",
)

In [7]:
%%writefile preprocess.py

#! /usr/bin/env/python

import numpy as np
import os
from tensorflow.keras.utils import to_categorical
import tensorflow as tf
from tensorflow.keras import datasets
import numpy as np

if __name__ == "__main__":

    fashion_mnist = tf.keras.datasets.fashion_mnist
    (train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data() 
    
    train_label_ohe = to_categorical(train_labels)
    test_label_ohe = to_categorical(test_labels)
    
    trainX = train_images.reshape((train_images.shape[0], 28, 28, 1))
    train_images_scaled = trainX / 255.0
    testX = test_images.reshape((test_images.shape[0], 28, 28, 1))
    test_images_scaled = testX / 255.0
    
    np.save(os.path.join("/opt/ml/processing/x_train", "x_train.npy"), train_images_scaled)
    #np.save(os.path.join("/opt/ml/processing/output", "x_test.npy"), test_images_scaled)
    np.save(os.path.join("/opt/ml/processing/y_train", "y_train.npy"), train_label_ohe)
    #np.save(os.path.join(raw_dir, "y_test.npy"), test_label_ohe)

Writing preprocess.py


In [8]:
from sagemaker.tensorflow import TensorFlowProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.pipeline_context import PipelineSession

framework_version = "2.4.1"
python_version = "py37"
BASE_DIR = os.path.dirname(os.path.realpath("Untitled.ipynb"))

# TensorFlow framework processor for the preprocessing job.
# It carries the instance type/count, the IAM role and the framework and
# Python versions; it is bound to a PipelineSession of its own.
preprocess_session = PipelineSession()
tensorflow_processor = TensorFlowProcessor(
    role=role,
    framework_version=framework_version,
    py_version=python_version,
    instance_count=1,
    instance_type="ml.m5.large",
    base_job_name="tf2-california-housing-processing-job",
    sagemaker_session=preprocess_session,
)

# One output per array written by preprocess.py; later steps look them up by
# output_name.
preprocess_outputs = [
    ProcessingOutput(output_name="x_train", source="/opt/ml/processing/x_train"),
    ProcessingOutput(output_name="y_train", source="/opt/ml/processing/y_train"),
]

# The value returned by run() is handed to the ProcessingStep as step_args.
step_args = tensorflow_processor.run(
    code="preprocess.py",
    outputs=preprocess_outputs,
)

# Wrap the processor arguments in a SageMaker Pipelines ProcessingStep.
step_preprocess_data = ProcessingStep(
    name="Preprocess-Fashion-MNIST-Data",
    step_args=step_args,
)


Job Name:  tf2-california-housing-processing-job-2022-08-10-05-54-05-941
Inputs:  [{'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-690806730396/tf2-california-housing-processing-job-2022-08-10-05-54-05-941/source/sourcedir.tar.gz', 'LocalPath': '/opt/ml/processing/input/code/', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'entrypoint', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-690806730396/tf2-california-housing-processing-job-2022-08-10-05-54-05-941/source/runproc.sh', 'LocalPath': '/opt/ml/processing/input/entrypoint', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'x_train', 'AppManaged': False, 'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-690806730396/tf2-california-housing-processing-job-2022-08-10-05-54-05-941/outp



In [9]:
%%writefile evaluate.py

import os
import json
import subprocess
import sys
import numpy as np
import pathlib
import tarfile


def install(package):
    subprocess.check_call([sys.executable, "-m", "pip", "install", package])


if __name__ == "__main__":

    install("tensorflow==2.4.1")
    import tensorflow as tf
    from tensorflow.keras import layers, models
    
    model = models.Sequential()
    model.add(layers.Conv2D(32, (3, 3), activation='relu', kernel_initializer='he_uniform', input_shape=(28, 28, 1)))
    model.add(layers.MaxPooling2D((2, 2)))
    model.add(layers.Flatten())
    model.add(layers.Dense(100, activation='relu', kernel_initializer='he_uniform'))
    model.add(layers.Dense(10, activation='softmax'))

    test_path1 = "/opt/ml/processing/x_train/"
    test_path2 = "/opt/ml/processing/y_train/"
    x_train = np.load(os.path.join(test_path1, "x_train.npy"))
    y_train = np.load(os.path.join(test_path2, "y_train.npy"))
    
    model.compile(optimizer='adam',
              loss='categorical_crossentropy',
              metrics=['accuracy'])
    
    model.fit(x_train, y_train, validation_split=0.15, epochs=5)

Writing evaluate.py


In [10]:
from sagemaker.workflow.properties import PropertyFile
from sagemaker.sklearn.processing import SKLearnProcessor

framework_version2 = "0.20.0"

# Scikit-learn processor that runs evaluate.py: it fixes the framework
# version, instance type/count, job name prefix and IAM role.
evaluate_model_processor = SKLearnProcessor(
    role=role,
    framework_version=framework_version2,
    instance_count=1,
    instance_type="ml.m5.large",
    base_job_name="tf2-california-housing-evaluate",
)

# NOTE: no PropertyFile is created in this cell. One would be needed to
# reference values from this step's output, for instance in a condition step; see
# https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-propertyfile.html

# Feed both arrays produced by the preprocessing step into this step by
# referencing that step's outputs by name.
preprocess_outputs_cfg = step_preprocess_data.properties.ProcessingOutputConfig.Outputs
x_train_input = ProcessingInput(
    source=preprocess_outputs_cfg["x_train"].S3Output.S3Uri,
    destination="/opt/ml/processing/x_train",
)
y_train_input = ProcessingInput(
    source=preprocess_outputs_cfg["y_train"].S3Output.S3Uri,
    destination="/opt/ml/processing/y_train",
)
evaluation_output = ProcessingOutput(
    output_name="evaluation",
    source="/opt/ml/processing/evaluation",
)

# Use the evaluate_model_processor in a SageMaker Pipelines ProcessingStep.
step_evaluate_model = ProcessingStep(
    name="Train-Fashion-MNIST-Model",
    processor=evaluate_model_processor,
    inputs=[x_train_input, y_train_input],
    outputs=[evaluation_output],
    code="evaluate.py",
)

In [11]:
from sagemaker.workflow.pipeline import Pipeline

# Assemble the SageMaker Pipeline.
# Every pipeline parameter has to be listed explicitly at creation time, and
# every step defined above is passed in. Execution order comes from the
# dependencies between steps, not from their position in the list.
boto_session = boto3.Session(region_name=region)
sagemaker_client = boto_session.client("sagemaker")

pipeline_session = PipelineSession(
    boto_session=boto_session,
    sagemaker_client=sagemaker_client,
    default_bucket=None,
)
pipeline_parameters = [
    input_data,
    training_epochs,
    accuracy_mse_threshold,
    endpoint_instance_type,
]
pipeline_steps = [step_preprocess_data, step_evaluate_model]

pipeline = Pipeline(
    name=pipeline_name,
    parameters=pipeline_parameters,
    steps=pipeline_steps,
    sagemaker_session=pipeline_session,
)



In [12]:
import json

# Render the pipeline definition as JSON text, then parse it so the cell
# displays it as a Python dict.
definition_json = pipeline.definition()
definition = json.loads(definition_json)
definition

{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'InputData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-us-east-1-690806730396/tf2-california-housing-pipelines/data/raw'},
  {'Name': 'TrainingEpochs', 'Type': 'String', 'DefaultValue': '100'},
  {'Name': 'AccuracyMseThreshold', 'Type': 'Float', 'DefaultValue': 0.75},
  {'Name': 'EndpointInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.large'}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'Preprocess-Fashion-MNIST-Data',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': 'ml.m5.large',
      'InstanceCount': 1,
      'VolumeSizeInGB': 30}},
    'AppSpecification': {'ImageUri': '763104351884.dkr.ecr.us-east-1.amazonaws.com/tensorflow-training:2.4.1-cpu-py37',
     'ContainerEntrypoint': ['/bin/bash',
      '/opt/ml/processing/inpu

In [13]:
# Create the pipeline, or update it if it already exists, using the notebook's
# execution role. The service response is displayed as the cell output.
pipeline.upsert(role_arn=role)

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:690806730396:pipeline/testing-please-work2',
 'ResponseMetadata': {'RequestId': 'bd768d99-20f9-4b50-88b8-abd805ade1f6',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'bd768d99-20f9-4b50-88b8-abd805ade1f6',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '88',
   'date': 'Wed, 10 Aug 2022 05:54:06 GMT'},
  'RetryAttempts': 0}}

In [14]:
# Start an execution of the pipeline; no parameter overrides are passed, so
# the defaults declared on the pipeline parameters apply. The returned handle
# is kept in `execution`.
execution = pipeline.start()