In [None]:
import json
import sys
import time
import uuid

import boto3
import sagemaker

from sagemaker.dataset_definition.inputs import AthenaDatasetDefinition, DatasetDefinition
from sagemaker.inputs import TrainingInput
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.processing import Processor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.pytorch import PyTorch
from sagemaker.tuner import (
    CategoricalParameter,
    HyperparameterTuner,
)
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo, ConditionEquals
from sagemaker.workflow.execution_variables import ExecutionVariables
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join
from sagemaker.workflow.functions import JsonGet
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
    ParameterBoolean,
)
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.steps import TuningStep

In [None]:
# S3 bucket used below for Athena query results, pipeline datasets and model artifacts.
bucket = "cgu-poc-sagemaker"

# SageMaker session for the current credentials, plus the region name it resolved.
sagemaker_session = sagemaker.Session()
region = sagemaker_session.boto_region_name

# IAM role handed to every processing/training job and to `pipeline.upsert`.
role = sagemaker.get_execution_role()

## Pipeline parameters

In [None]:
# Pipeline parameters; all of them are registered on the Pipeline object defined later.

# Data Wrangler processing job sizing.
processing_instance_type = ParameterString(name="ProcessingInstanceType", default_value="ml.m5.4xlarge")
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)

# NOTE(review): declared on the pipeline, but no step in this notebook reads it.
training_start = ParameterBoolean(name="TrainingStart", default_value=False)

# Tuning-job sizing; the instance type is also reused for the registered model.
training_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.p3.8xlarge")
training_instance_count = ParameterInteger(name="TrainingInstanceCount", default_value=1)

# Approval status given to the model package created by the register step.
model_approval_status = ParameterString(
    name="ModelApprovalStatus",
    default_value="PendingManualApproval",
)

# Minimum accuracy threshold, exposed under the pipeline parameter name "Accuracy".
# NOTE(review): no step in this notebook compares the model's accuracy against it.
min_accuracy = ParameterFloat(name="Accuracy", default_value=0.9)

## Data Wrangler flow processing step

In [None]:
# Data Wrangler container image. The registry host embeds a fixed region, so the
# image is only valid for sessions in that region.
# NOTE(review): SageMaker jobs in another region are expected to fail to pull this
# image; the check below surfaces that here instead of at job start. Update the
# account/region pair before relaxing it.
flow_container_region = "us-east-1"
flow_container_uri = (
    f"663277389841.dkr.ecr.{flow_container_region}.amazonaws.com/"
    "sagemaker-data-wrangler-container:1.18.0"
)
if region != flow_container_region:
    raise ValueError(
        f"Data Wrangler image {flow_container_uri} is pinned to {flow_container_region}, "
        f"but the SageMaker session runs in {region}; use the image URI for that region."
    )

flow_volume_size_in_gb = 100

# Athena source read by the flow.
database = "cgu-poc-analytics"
table = "analytics_feedbacks"

# Flow output to export (presumably "<node-id>.default" from the .flow file).
# The container receives a mapping of output name -> per-output settings (none here).
output_name = "64859aa2-7440-486e-abf8-e1f23f9d9314.default"
output_config = {
    output_name: {}
}

# Input - Flow: data-wrangler-feedbacks.flow
# The .flow definition, given as a path relative to the notebook, is made available
# to the container under /opt/ml/processing/flow in File mode.
# NOTE(review): relies on the SDK uploading the local file to S3 when the step is built.
flow_input = ProcessingInput(
    input_name="flow",
    source="./data-wrangler-feedbacks.flow",
    destination="/opt/ml/processing/flow",
    s3_data_type="S3Prefix",
    s3_input_mode="File",
    s3_data_distribution_type="FullyReplicated",
)

# Full-table Athena query; results are written as Parquet under s3://<bucket>/processing.
athena_query = f"SELECT * FROM {table}"

athena_dataset_definition = AthenaDatasetDefinition(
    catalog="AwsDataCatalog",
    database=database,
    query_string=athena_query,
    output_format="PARQUET",
    output_s3_uri=f"s3://{bucket}/processing",
)

# Where the query result is materialized inside the processing container.
dataset_definition = DatasetDefinition(
    athena_dataset_definition=athena_dataset_definition,
    local_path="/opt/ml/processing/output",
)

# NOTE(review): input_name presumably must match the data-source name used in the .flow file.
athena_input = ProcessingInput(
    input_name="americanas-dataset",
    destination="/opt/ml/processing/output",
    dataset_definition=dataset_definition,
)

# Exported flow output: the container writes it under /opt/ml/processing/output/<output_name>,
# and it is uploaded to S3 when the job ends.
# Each pipeline execution gets its own S3 prefix, for two reasons:
#   * runs do not pile their part files into one shared folder;
#   * the downstream training channel, which reads this output's whole S3 prefix,
#     cannot pick up unrelated objects stored elsewhere under s3://<bucket>/datasets/
#     (such as the hard-coded test split).
flow_output = ProcessingOutput(
    output_name=output_name,
    source=f"/opt/ml/processing/output/{output_name}",
    destination=Join(
        on="/",
        values=["s3:/", bucket, "datasets", "pipeline", ExecutionVariables.PIPELINE_EXECUTION_ID],
    ),
    s3_upload_mode="EndOfJob",
)

# Generic Processor running the Data Wrangler image. Instance type/count come from
# pipeline parameters; the volume size is a fixed constant.
data_wrangler_processor = Processor(
    image_uri=flow_container_uri,
    role=role,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    volume_size_in_gb=flow_volume_size_in_gb,
)

# The container receives its output configuration as a single argument string:
# the flag followed by the JSON mapping wrapped in single quotes.
wrangler_job_args = [f"--output-config '{json.dumps(output_config)}'"]

# Pipeline step running the flow against the Athena dataset.
data_wrangler_step = ProcessingStep(
    name="DataWranglerProcessingStep",
    processor=data_wrangler_processor,
    inputs=[flow_input, athena_input],
    outputs=[flow_output],
    job_arguments=wrangler_job_args,
)

## Training step (hyperparameter tuning)

In [None]:
# S3 location, inside `bucket`, where the training jobs write their artifacts.
model_prefix = "models/feedbacks"
output_path = "s3://{}/{}".format(bucket, model_prefix)

In [None]:
# Fixed hyperparameters passed to train.py; "batch-size" also appears in the tuner's
# search space below.
base_hyperparameters = {
    "batch-size": 16,
    "epochs": 1,
    "num_labels": 2,
    "backend": "gloo",
}

# PyTorch 1.10.2 / py38 estimator running ../modeltrain/script/train.py; instance
# sizing comes from pipeline parameters and artifacts go to `output_path`.
estimator = PyTorch(
    entry_point="train.py",
    source_dir="../modeltrain/script",
    framework_version="1.10.2",
    py_version="py38",
    role=role,
    instance_type=training_instance_type,
    instance_count=training_instance_count,
    output_path=output_path,
    hyperparameters=base_hyperparameters,
    disable_profiler=True,
)

In [None]:
# Search space explored by the tuner: categorical grids for learning rate and batch size.
hyperparameter_ranges = {
    "lr": CategoricalParameter([3e-4, 1e-4, 5e-5, 3e-5]),
    "batch-size": CategoricalParameter([4, 8, 16]),
}

# The tuner maximizes the "accuracy" metric scraped from the training logs.
objective_metric_name = "accuracy"
objective_type = "Maximize"

# NOTE(review): train.py presumably prints "=====>\t{'accuracy': <value>...", and the
# tab shows up as "#011" in the captured logs; confirm against train.py.
# The literal "{" is escaped so the pattern does not depend on how the regex engine
# treats a bare brace. It still matches exactly the same text.
metric_definitions = [
    {"Name": "accuracy", "Regex": "=====>#011\\{'accuracy': ([0-9\\.]+)"},
]

In [None]:
# Hyperparameter tuner: up to two training jobs in total, both allowed to run at once,
# ranked by `objective_metric_name` in the direction given by `objective_type`.
tuner = HyperparameterTuner(
    estimator=estimator,
    objective_metric_name=objective_metric_name,
    objective_type=objective_type,
    hyperparameter_ranges=hyperparameter_ranges,
    metric_definitions=metric_definitions,
    max_jobs=2,
    max_parallel_jobs=2,
)

In [None]:
# Test split for train.py's "testing" channel, built from `bucket` rather than
# repeating the bucket name.
# NOTE(review): this file comes from one earlier Data Wrangler export (2022-06-14),
# not from this pipeline run, so the test set does not follow changes in the source table.
test_data_s3_uri = (
    f"s3://{bucket}/datasets/"
    "data-wrangler-feedbacks-2022-06-14T03-02-28/test/feedbacks_test.csv"
)

# Tuning step. Training data is the Data Wrangler step's exported output.
# NOTE(review): the "training" channel is that output's entire S3 prefix; make sure
# the prefix can never contain the test file above (it does when the output
# destination is s3://<bucket>/datasets/).
step_train = TuningStep(
    name="BERTTrain",
    tuner=tuner,
    inputs={
        "training": TrainingInput(
            s3_data=data_wrangler_step.properties.ProcessingOutputConfig.Outputs[
                output_name
            ].S3Output.S3Uri
        ),
        "testing": TrainingInput(s3_data=test_data_s3_uri),
    },
)

# Explicit ordering edge after the processing step.
# NOTE(review): likely redundant with the property reference above; kept for clarity.
step_train.add_depends_on([data_wrangler_step])

## Register Model

In [None]:
# Register the model of tuning job index 0 (top_k=0) in "feedbacks-model-group".
# The artifact URI is built from the same `bucket` and `model_prefix` that define the
# estimator's `output_path`. If the prefix changes, the registered URI follows it
# instead of pointing at a stale hard-coded location.
# NOTE(review): registration is unconditional; `min_accuracy` is not checked anywhere.
# NOTE(review): inference/transform reuse the training instance type (ml.p3.8xlarge by default).
step_register = RegisterModel(
    name="BERTRegisterModel",
    estimator=estimator,
    model_data=step_train.get_top_model_s3_uri(top_k=0, s3_bucket=bucket, prefix=model_prefix),
    content_types=["application/json"],
    response_types=["application/json"],
    inference_instances=[training_instance_type],
    transform_instances=[training_instance_type],
    model_package_group_name="feedbacks-model-group",
    approval_status=model_approval_status,
)

## Pipeline definition

In [None]:
pipeline_name = "feedbacks-pipeline"

# Every parameter declared above, in the order they appear in the definition.
pipeline_parameters = [
    processing_instance_type,
    processing_instance_count,
    training_instance_type,
    training_instance_count,
    model_approval_status,
    min_accuracy,
    training_start,
]

# Processing -> tuning -> model registration.
pipeline_steps = [data_wrangler_step, step_train, step_register]

pipeline = Pipeline(
    name=pipeline_name,
    parameters=pipeline_parameters,
    steps=pipeline_steps,
)

In [None]:
# Create or update the pipeline definition under `role`, then show the API response.
upsert_response = pipeline.upsert(role_arn=role)
upsert_response

## Pipeline execution

In [None]:
# Start a pipeline execution; no parameters are passed, so every parameter uses its default.
execution = pipeline.start()

In [None]:
# Steps recorded for the execution so far (it was just started, so the list may be short).
execution_steps = execution.list_steps()
execution_steps

In [None]:
# Block until the pipeline execution finishes.
# NOTE(review): wait() polls only a bounded number of times (SDK defaults) and raises
# if the run outlasts that; pass larger `delay`/`max_attempts` if tuning runs long.
execution.wait()