# End-to-End Auto ML Workflow with Your Own StyleGAN2 Image

## Prerequisites

Running this notebook requires permissions beyond the standard AmazonSageMakerFullAccess policy, because it creates a Lambda function and new repositories in Amazon ECR. For simplicity of illustration, you can add the following managed policies to the notebook instance role: <em><strong>AmazonEC2ContainerRegistryFullAccess</strong></em> and <em><strong>AmazonSageMakerPipelinesIntegrations</strong></em>.

The Lambda function also needs its own IAM role, and the role ARN must be provided when the function for the LambdaStep is created. In this notebook the function publishes a notification to an Amazon SNS topic, so the role must be allowed to publish to that topic. For simplicity of illustration, you can add the managed policy <em><strong>AmazonSageMakerFullAccess</strong></em> to the Lambda function role.

Please remember you should always use IAM policies with least privileges as per AWS IAM best practices.
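
As a rough sketch of what a narrower policy could look like: the Lambda function defined later in this notebook only publishes a message to an SNS topic, so its role may need little more than `sns:Publish` on that single topic (plus basic logging permissions, which are not shown). The topic ARN below is a made-up placeholder and the helper name `sns_publish_policy` is hypothetical.

```python
import json

# Hypothetical topic ARN, used only for illustration; replace it with your own.
TOPIC_ARN = "arn:aws:sns:us-west-2:123456789012:stylegan2-notifications"


def sns_publish_policy(topic_arn):
    """Return an IAM policy document that allows publishing to one SNS topic only."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": "sns:Publish",
                "Resource": topic_arn,
            }
        ],
    }


policy = sns_publish_policy(TOPIC_ARN)
print(json.dumps(policy, indent=2))
```

The printed JSON can be pasted into an inline policy on the Lambda function role instead of attaching a broad managed policy.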

## Setup

In [406]:
import json
import boto3
import sagemaker

sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
account_id = sagemaker_session.account_id()
role = sagemaker.get_execution_role()
default_bucket = sagemaker_session.default_bucket()

## Data Download and Upload

Download the original dataset from Kaggle [here](https://www.kaggle.com/datasets/splcher/animefacedataset?resource=download), then upload the image files (.jpg) to default_bucket under the prefix "My-StyleGAN2-Pipeline/animeface".
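
The upload can be scripted. The following is a minimal sketch, assuming the images were extracted to a local folder; the helper name `upload_images` and the `RecordingClient` stand-in are hypothetical and exist only so the example runs without AWS access. The only AWS call used is the S3 client's `upload_file`.

```python
import os
import tempfile


def upload_images(s3_client, local_dir, bucket, prefix):
    """Upload every .jpg under local_dir to s3://bucket/prefix/ and return the keys."""
    uploaded = []
    for root, _dirs, files in os.walk(local_dir):
        for name in sorted(files):
            if not name.lower().endswith(".jpg"):
                continue
            path = os.path.join(root, name)
            # Keep the path relative to local_dir so sub-folders are preserved.
            rel = os.path.relpath(path, local_dir).replace(os.sep, "/")
            key = f"{prefix.rstrip('/')}/{rel}"
            s3_client.upload_file(path, bucket, key)
            uploaded.append(key)
    return uploaded


class RecordingClient:
    """Stand-in for boto3.client("s3") that only records the requested uploads."""

    def __init__(self):
        self.calls = []

    def upload_file(self, Filename, Bucket, Key):
        self.calls.append((Filename, Bucket, Key))


# Dry run against a temporary folder with two empty "images" and one other file.
with tempfile.TemporaryDirectory() as tmp:
    for name in ("1.jpg", "2.jpg", "notes.txt"):
        open(os.path.join(tmp, name), "wb").close()
    client = RecordingClient()
    keys = upload_images(client, tmp, "example-bucket", "My-StyleGAN2-Pipeline/animeface")

print(keys)
```

In the notebook you would pass `boto3.client("s3")` and `default_bucket` in place of the stand-ins.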

## Prepare the Training Image

Clone the original StyleGAN2 Git repository to the local machine.

In [None]:
!git clone https://github.com/NVlabs/stylegan2-ada-pytorch 

Let's print training_loop.py. The original file needs to be modified so that whenever it saves a checkpoint, it also updates the model in the directory that is saved to S3 as the final result. You can either replace the original <em><strong>stylegan2-ada-pytorch/training/training_loop.py</strong></em> with the provided <em><strong>training_loop_modified.py</strong></em>, or make the changes yourself.

The main change is the following:

<strong>original:</strong>

if rank == 0:

    with open(snapshot_pkl, 'wb') as f:
        pickle.dump(snapshot_data, f)

<strong>new:</strong>

if rank == 0:

    with open(snapshot_pkl, 'wb') as f:
        pickle.dump(snapshot_data, f)

    with open(model_pkl, 'wb') as f:
        pickle.dump(snapshot_data, f)

In [None]:
!cat stylegan2-ada-pytorch/training/training_loop.py
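
The change described above can be tried in isolation. The snippet does not show where `model_pkl` is defined, so the sketch below assumes it is a fixed file path inside the model directory (on a SageMaker training instance that would be `/opt/ml/model`, whose contents are uploaded to the output location when training ends). The file names and the helper `save_snapshot` are hypothetical, and temporary folders stand in for the real directories.

```python
import os
import pickle
import tempfile


def save_snapshot(snapshot_data, snapshot_pkl, model_pkl):
    """Write the same snapshot to the checkpoint file and to the model file."""
    for path in (snapshot_pkl, model_pkl):
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "wb") as f:
            pickle.dump(snapshot_data, f)


# Temporary folders stand in for /opt/ml/checkpoints and /opt/ml/model.
with tempfile.TemporaryDirectory() as tmp:
    snapshot_pkl = os.path.join(tmp, "checkpoints", "network-snapshot-000010.pkl")
    # Hypothetical fixed name: each new snapshot overwrites the previous model file.
    model_pkl = os.path.join(tmp, "model", "network-snapshot.pkl")
    save_snapshot({"kimg": 10}, snapshot_pkl, model_pkl)
    with open(model_pkl, "rb") as f:
        restored = pickle.load(f)

print(restored)
```

Because the model file is overwritten on every snapshot, the model directory always holds the most recent network when the job finishes.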

Next, let's build the image locally and push it to ECR.

In [None]:
%%sh

# The name of our algorithm
algorithm_name=mystylegan2

account=$(aws sts get-caller-identity --query Account --output text)

# Get the region defined in the current configuration (default to us-west-2 if none defined)
region=$(aws configure get region)
region=${region:-us-west-2}

fullname="${account}.dkr.ecr.${region}.amazonaws.com/${algorithm_name}:latest"

# If the repository doesn't exist in ECR, create it.

aws ecr describe-repositories --repository-names "${algorithm_name}" > /dev/null 2>&1

if [ $? -ne 0 ]
then
    aws ecr create-repository --repository-name "${algorithm_name}" > /dev/null
fi

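# Log in to our own ECR registry (aws ecr get-login is not available in AWS CLI v2)
aws ecr get-login-password --region ${region} | docker login --username AWS --password-stdin "${account}.dkr.ecr.${region}.amazonaws.com"

# Log in to the registry that hosts the SageMaker PyTorch image in order to pull it down
aws ecr get-login-password --region ${region} | docker login --username AWS --password-stdin "763104351884.dkr.ecr.${region}.amazonaws.com"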

# Build the docker image locally with the image name and then push it to ECR
# with the full name.

docker build  -t ${algorithm_name} . --build-arg REGION=${region}
docker tag ${algorithm_name} ${fullname}
docker push ${fullname}

Let's print the training image name.

In [407]:
algorithm_name='mystylegan2'
stylegan2_image="{}.dkr.ecr.{}.amazonaws.com/{}:latest".format(account_id, region, algorithm_name)
stylegan2_image

'357224784104.dkr.ecr.us-west-2.amazonaws.com/mystylegan2:latest'

## Define Parameters and Placeholders

In [408]:
from sagemaker.workflow.parameters import ParameterInteger, ParameterString, ParameterFloat

#### Process Placeholders

In [409]:
process_input_data_uri = 's3://{}/My-StyleGAN2-Pipeline/animeface/'.format(default_bucket)
process_input_data_path = ParameterString(name="ProcessInput",default_value=process_input_data_uri,)
process_instance_count = ParameterInteger(name="ProcessInstanceCount",default_value=1)
process_instance_type = ParameterString(name="ProcessInstanceType",default_value='ml.m5.xlarge',)

#### Train Placeholders

In [410]:
image_uri = stylegan2_image
output_uri = 's3://{}/My-StyleGAN2-Pipeline/Model'.format(default_bucket)
checkpoint_uri = 's3://{}/My-StyleGAN2-Pipeline/checkpoints'.format(default_bucket)

train_instance_count = ParameterInteger(name="TrainInstanceCount",default_value=1)
train_instance_type = ParameterString(name="TrainInstanceType",default_value='ml.p3.2xlarge',)
train_checkpoint_path = ParameterString(name="TrainCheckpointPath",default_value=checkpoint_uri)
train_output_path = ParameterString(name="TrainOutputPath",default_value=output_uri)  # S3 location where the final model artifact is written
train_image = ParameterString(name="TrainImage",default_value=image_uri,)

#### Inference Placeholders

In [411]:
#Upload the inference code to S3.  
source_code_uri = sagemaker_session.upload_data('stylegan2-ada-pytorch', key_prefix='My-StyleGAN2-Pipeline/Inference')
#Upload a test image
inference_img_uri = sagemaker_session.upload_data('test.png', key_prefix='My-StyleGAN2-Pipeline/InferenceImg')

inference_code_path = ParameterString(name="InferenceCodePath",default_value=source_code_uri)
inference_image_path = ParameterString(name="InferenceImgPath",default_value=inference_img_uri)
inference_instance_count = ParameterInteger(name="InferenceInstanceCount",default_value=1)
inference_instance_type = ParameterString(name="InferenceInstanceType",default_value='ml.g4dn.2xlarge',)

## Define Pipeline Steps

In [412]:
from sagemaker.workflow.pipeline_context import PipelineSession
pipeline_session = PipelineSession()

#### Define Processing Step

Preprocess the animeface dataset into the format that StyleGAN2 expects.

In [413]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.sklearn.processing import SKLearnProcessor
framework_version = "0.23-1"

# Configure Processor
sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type=process_instance_type,
    instance_count=process_instance_count,
    role=role,
)

# Configure ProcessingStep
step_process = ProcessingStep(
    name="stylegan2Process",
    processor=sklearn_processor,
    inputs=[
      ProcessingInput(source=process_input_data_path, destination="/opt/ml/processing/input"),
    ],
    outputs=[
      ProcessingOutput(output_name="train", source="/opt/ml/processing/train")
    ],
    ## Processing Arguments
    job_arguments=['--source', '/opt/ml/processing/input/',
                   '--dest','/opt/ml/processing/train/animeface.zip',
                   '--width', '256',
                   '--height','256',],
    code="code_pipeline/dataset_tool.py",
)

The input argument instance_type of function (sagemaker.image_uris.retrieve) is a pipeline variable (<class 'sagemaker.workflow.parameters.ParameterString'>), which is not allowed. The default_value of this Parameter object will be used to override it.


#### Define Training Step

Define the training step. Here we use the training image we built earlier. All checkpoints are saved to /opt/ml/checkpoints on the training instance and are synced to the checkpoint_s3_uri we specify.
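
The training cell that follows passes its settings through a small `json_encode_hyperparameters` helper. As an illustration of what that encoding produces, the sketch below runs the same helper on three of the values. String values gain an extra pair of quotes, so the container entry point has to decode them again; how the training image does that is not shown in this notebook, and the `json.loads` step here is only one possible approach.

```python
import json


def json_encode_hyperparameters(hyperparameters):
    # Same helper as in the training cell: every value becomes a JSON string.
    return {str(k): json.dumps(v) for (k, v) in hyperparameters.items()}


original = {"gpus": 1, "cfg": "paper256", "outdir": "/opt/ml/checkpoints"}
encoded = json_encode_hyperparameters(original)
print(encoded)

# One possible way for an entry point to recover the original values.
decoded = {k: json.loads(v) for k, v in encoded.items()}
print(decoded)
```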

In [414]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
from sagemaker.estimator import Estimator

# Configure training parameters
def json_encode_hyperparameters(hyperparameters):
    return {str(k): json.dumps(v) for (k, v) in hyperparameters.items()}
params = {"data": "/opt/ml/input/data/train/animeface.zip",
                   "gpus": 1,
                   "augpipe": "bg",
                   "gamma": 10,
                   "cfg": "paper256", 
                   "mirror": 1, 
                   "snap": 10,
                   "metrics": "none",
                   "kimg": 1,
                   "outdir": "/opt/ml/checkpoints"}
hyperparameters = json_encode_hyperparameters(params)

# Configure the estimator
estimator_stylegan2 = Estimator(
    role=role,
    image_uri=train_image,
    instance_count=train_instance_count,
    instance_type=train_instance_type,
    hyperparameters=hyperparameters,
    disable_profiler=True,
    checkpoint_s3_uri=train_checkpoint_path,
    checkpoint_local_path='/opt/ml/checkpoints',
    output_path=train_output_path,
)

# Configure Training Step
step_train = TrainingStep(
    name="stylegan2train",
    estimator=estimator_stylegan2,
    inputs={
        "train": TrainingInput(s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
                               content_type='application/x-image'),
    },
    depends_on=[step_process],
)


#### Define Inference Step

Define a processing step that runs batch inference with the trained model.

In [415]:
from sagemaker.pytorch.processing import PyTorchProcessor

# Initialize the PyTorchProcessor
pytorch_processor = PyTorchProcessor(
    framework_version='1.10.2',
    role=role,
    instance_type=inference_instance_type,
    instance_count=inference_instance_count,
    base_job_name='stylegan2_batch_inference',
    py_version='py38'
)

# Configure ProcessingStep
step_process_inference = ProcessingStep(
    name="stylegan2Inference",
    processor=pytorch_processor,
    inputs=[
      # input 1: source code 
      ProcessingInput(source=inference_code_path,destination="/opt/ml/processing/input"),
      # input 2: trained model
      ProcessingInput(source=step_train.properties.ModelArtifacts.S3ModelArtifacts,destination="/opt/ml/processing/model"),
      # input 3: test image
      ProcessingInput(source=inference_image_path,destination="/opt/ml/processing/data")
    ],
    outputs=[
      ProcessingOutput(output_name="result", source="/opt/ml/processing/output/test")
    ],
    code="code_pipeline/inference.sh",
    depends_on=[step_train]
)

The input argument instance_type of function (sagemaker.image_uris.retrieve) is a pipeline variable (<class 'sagemaker.workflow.parameters.ParameterString'>), which is not allowed. The default_value of this Parameter object will be used to override it.


#### Define Notification Lambda Step

In [416]:
%%writefile lambda_deployer.py

"""
This Lambda function publishes a message to an SNS topic once the inference is complete, notifying the topic's subscribers of the output directory on S3.
"""

import json
import boto3

def lambda_handler(event, context):
    """Publish the S3 output location received from the pipeline to the SNS topic."""
    sns_client = boto3.client("sns")

    output_s3_dir = event["output_s3_dir"]
    msg = 'The Inference is done!  The output has been stored at: '+str(output_s3_dir)
    response = sns_client.publish(
        TopicArn='<Your Topic Arn Here>',
        Message=msg,
        Subject='StyleGAN2 Inference',
    )
    return {
        "statusCode": 200,
        "body": json.dumps("Message Sent!"),
    }

Overwriting lambda_deployer.py
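
Before deploying, the notification logic can be exercised locally. The sketch below is a variant of the handler, not the file written above: it takes the SNS client as an argument so that a stand-in can be used instead of `boto3.client("sns")`. The names `build_message`, `notify` and `FakeSns`, the event contents and the topic ARN are all hypothetical.

```python
def build_message(output_s3_dir):
    """Compose the notification text used by the handler."""
    return "The Inference is done!  The output has been stored at: " + str(output_s3_dir)


def notify(sns_client, topic_arn, event):
    """Publish the notification; sns_client is anything with a boto3-style publish()."""
    return sns_client.publish(
        TopicArn=topic_arn,
        Message=build_message(event["output_s3_dir"]),
        Subject="StyleGAN2 Inference",
    )


class FakeSns:
    """Stand-in for boto3.client("sns") that records what would be sent."""

    def __init__(self):
        self.sent = []

    def publish(self, **kwargs):
        self.sent.append(kwargs)
        return {"MessageId": "local-test"}


fake = FakeSns()
# Hypothetical event and topic ARN, for illustration only.
event = {"output_s3_dir": "s3://example-bucket/inference-output"}
notify(fake, "arn:aws:sns:us-west-2:123456789012:example-topic", event)
print(fake.sent[0]["Message"])
```

The `event` dictionary mirrors the single `output_s3_dir` key that the handler reads.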


In [418]:
# Define Lambda Step
from sagemaker.lambda_helper import Lambda
from sagemaker.workflow.lambda_step import (
    LambdaStep,
    LambdaOutput,
    LambdaOutputTypeEnum,
)

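function_name = "StyleGAN2_pipeline_callback"
# ARN of the IAM role for the Lambda function (see Prerequisites); replace the placeholder with your own
lambda_role = "<Your Lambda Role Arn Here>"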

# Lambda helper class can be used to create the Lambda function
func = Lambda(
    function_name=function_name,
    execution_role_arn=lambda_role,
    script="lambda_deployer.py",
    handler="lambda_deployer.lambda_handler",
    timeout=600,
    memory_size=10240,
)

# Lambda Step Input
output_s3_dir = step_process_inference.properties.ProcessingOutputConfig.Outputs["result"].S3Output.S3Uri
# Lambda Step Output
output_param_1 = LambdaOutput(output_name="statusCode", output_type=LambdaOutputTypeEnum.String)
output_param_2 = LambdaOutput(output_name="body", output_type=LambdaOutputTypeEnum.String)

step_lambda = LambdaStep(
    name="stylegan2Notification",
    lambda_func=func,
    inputs={
        "output_s3_dir": output_s3_dir,
    },
    outputs=[output_param_1, output_param_2],
    depends_on=[step_process_inference]
)

#### Execute Pipeline

In [419]:
from sagemaker.workflow.pipeline import Pipeline

pipeline_name = "stylegan2pipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[process_input_data_path, 
                process_instance_count, 
                process_instance_type, 
                train_instance_count, 
                train_instance_type, 
                train_checkpoint_path, 
                train_output_path,
                train_image,
                inference_image_path,
                inference_code_path,
                inference_instance_count,
                inference_instance_type
               ],
    steps=[step_process, step_train, step_process_inference, step_lambda],
)
pipeline.upsert(role_arn=role)
execution = pipeline.start()
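
Every parameter registered in the pipeline can be overridden when an execution is started, because `pipeline.start` accepts a `parameters` dictionary keyed by parameter name. The sketch below shows the call shape; `start_with_overrides` and `FakePipeline` are hypothetical names, the stand-in exists only so the example runs without AWS access, and the override values are examples.

```python
def start_with_overrides(pipeline, overrides):
    """Start an execution, overriding the default value of the named parameters."""
    return pipeline.start(parameters=overrides)


class FakePipeline:
    """Stand-in for the Pipeline object so the sketch runs without AWS access."""

    def __init__(self):
        self.started_with = None

    def start(self, parameters=None):
        self.started_with = parameters
        return "execution-placeholder"


# Keys are the `name` values of the pipeline parameters; the values are examples only.
overrides = {
    "TrainInstanceCount": 1,
    "InferenceInstanceCount": 1,
    "ProcessInput": "s3://example-bucket/another-dataset/",
}
fake_pipeline = FakePipeline()
result = start_with_overrides(fake_pipeline, overrides)
print(fake_pipeline.started_with)
```

With the real `pipeline` object, the same call would start a run on a different dataset without redefining any step.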

List all the steps that have been executed.

In [None]:
execution.list_steps()
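
The list returned by `list_steps()` contains one dictionary per step, with keys such as `StepName` and `StepStatus`, and a `FailureReason` when a step fails. A small helper can condense it into a quick overview. This is a sketch; `summarize_steps` is a hypothetical name and the sample data is hand-written for illustration, not real output of this pipeline.

```python
def summarize_steps(steps):
    """Map each step name to its status, appending the failure reason when present."""
    summary = {}
    for step in steps:
        status = step.get("StepStatus", "Unknown")
        reason = step.get("FailureReason")
        summary[step["StepName"]] = f"{status} ({reason})" if reason else status
    return summary


# Hand-written sample in the shape returned by list_steps(); not real output.
sample = [
    {"StepName": "stylegan2train", "StepStatus": "Executing"},
    {"StepName": "stylegan2Process", "StepStatus": "Succeeded"},
]
overview = summarize_steps(sample)
for name, status in overview.items():
    print(f"{name}: {status}")
```

In the notebook, `summarize_steps(execution.list_steps())` gives the same overview for the running execution, and `execution.wait()` can be called first to block until the run finishes.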

## Clean Up

In [None]:
import boto3

sm_client = boto3.client("sagemaker")

def delete_sagemaker_pipeline(sm_client, pipeline_name):
    try:
        sm_client.delete_pipeline(
            PipelineName=pipeline_name,
        )
        print("{} pipeline deleted".format(pipeline_name))
    except Exception as e:
        print("{} \n".format(e))
        return
    
delete_sagemaker_pipeline(sm_client, pipeline_name)
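
Deleting the pipeline leaves the Lambda function and the ECR repository created earlier in place. A minimal sketch for removing them as well follows, assuming the names used in this notebook (`StyleGAN2_pipeline_callback` and `mystylegan2`). The helper `delete_lambda_and_repo` and the stub class are hypothetical; the stub only lets the example run without AWS access.

```python
def delete_lambda_and_repo(lambda_client, ecr_client, function_name, repository_name):
    """Delete the notification Lambda and the ECR repository, reporting any errors."""
    results = {}
    try:
        lambda_client.delete_function(FunctionName=function_name)
        results["lambda"] = "deleted"
    except Exception as e:
        results["lambda"] = f"failed: {e}"
    try:
        # force=True also removes the images stored in the repository.
        ecr_client.delete_repository(repositoryName=repository_name, force=True)
        results["ecr"] = "deleted"
    except Exception as e:
        results["ecr"] = f"failed: {e}"
    return results


class StubClient:
    """Stand-in for the boto3 "lambda" and "ecr" clients; records calls only."""

    def __init__(self):
        self.calls = []

    def delete_function(self, **kwargs):
        self.calls.append(("delete_function", kwargs))

    def delete_repository(self, **kwargs):
        self.calls.append(("delete_repository", kwargs))


stub = StubClient()
cleanup = delete_lambda_and_repo(stub, stub, "StyleGAN2_pipeline_callback", "mystylegan2")
print(cleanup)
```

With real clients, pass `boto3.client("lambda")` and `boto3.client("ecr")`. Note that deleting the repository with `force=True` removes the training image permanently.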