# Step 0: Set Up the Environment

In [None]:

import boto3
import os
import numpy as np
import pandas as pd
import sagemaker
import sys
import time
import sagemaker.session
from sagemaker.workflow.pipeline_context import PipelineSession


# Resolve the AWS region and account of the current credentials; both are
# embedded in the bucket names below.
region = boto3.Session().region_name
account_id = boto3.client("sts").get_caller_identity()["Account"]

# Bucket for pipeline artifacts (code, intermediate data, models, reports).
mlops_bucket_ = f"mlops-{region}-{account_id}"
# Bucket holding the raw input data.
data_bucket_ = f"data-{region}-{account_id}"

# Both sessions default to the mlops bucket; pipeline_session is the one
# handed to the processors/estimators that are embedded in pipeline steps.
sagemaker_session = sagemaker.session.Session(default_bucket=mlops_bucket_)
pipeline_session = PipelineSession(default_bucket=mlops_bucket_)
role = sagemaker.get_execution_role()

# Model Registry group that receives the registered model packages.
model_package_group_name = "AbalonePackageGroup"

print("AWS Region:", region)
print("AWS Account ID:", account_id)
print("mlops_bucket", mlops_bucket_)
print("data_bucket", data_bucket_)
print("Role", role)
print("model_package_group_name", model_package_group_name)


# Step 1: Upload the Dataset to S3 Bucket

In [None]:
# Download the raw Abalone dataset used in this lab into ./data/abalone.csv.
# The same dataset is also available at "s3://sagemaker-sample-files/datasets/tabular/uci_abalone/abalone.csv".
#!wget https://archive.ics.uci.edu/ml/machine-learning-databases/abalone/abalone.data -O /root/mlops-workshop/data/abalone.csv

# Create directory for storing data file (-p: no error if it already exists)
!mkdir -p data
# The UCI .data file is a headerless CSV; it is saved under a .csv name
# (preprocessing.py reads it with header=None and explicit column names).
!wget https://archive.ics.uci.edu/ml/machine-learning-databases/abalone/abalone.data -O data/abalone.csv

# Alternative: copy the sample file from S3 instead of downloading it from UCI.
# !aws s3 cp s3://sagemaker-sample-files/datasets/tabular/uci_abalone/abalone.csv /root/mlops-workshop/data/abalone.csv

# List the data directory to confirm the download.
!ls -ltr data/

In [None]:
# Upload the local raw CSV under s3://<data bucket>/input/raw/ and print the
# S3 URI returned by the uploader.
data_prefix = "input/raw"
local_path = "data/abalone.csv"

base_uri = "s3://{}/{}".format(data_bucket_, data_prefix)
input_data_uri = sagemaker.s3.S3Uploader.upload(
    desired_s3_uri=base_uri,
    local_path=local_path,
)
print(input_data_uri)

# Step 2: Define Pipeline Parameters

In [None]:
from sagemaker.workflow.parameters import (ParameterInteger,ParameterString,ParameterFloat)
from sagemaker.workflow.execution_variables import ExecutionVariables

# Processing steps: the instance type is fixed; the instance count is a pipeline parameter.
processing_instance_type = "ml.m5.xlarge"
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)

# Training step: the instance type is fixed; the instance count is a pipeline parameter.
training_instance_type = "ml.m5.xlarge"
training_instance_count = ParameterInteger(name="TrainingInstanceCount", default_value=1)

# Custom image (ECR repository "abalone" in this account/region) used for
# training and for the evaluation processing step.
image_uri = f"{account_id}.dkr.ecr.{region}.amazonaws.com/abalone:latest"

# Approval status given to registered model packages. "Approved" approves
# automatically; use "PendingManualApproval" to require a manual review.
model_approval_status = ParameterString(
    name="ModelApprovalStatus",
    default_value="Approved",
)

# Raw-data location uploaded in Step 1. This must be a pipeline Parameter
# (not a plain str) because it is listed in Pipeline(parameters=...).
# NOTE(review): the ETL step builds its input URI from data_bucket plus a
# fixed key, not from this parameter — confirm which one is intended.
input_data = ParameterString(name="InputData", default_value=input_data_uri)

# Buckets as pipeline parameters, defaulting to the ones resolved in Step 0.
mlops_bucket = ParameterString(name="mlops_bucket", default_value=mlops_bucket_)
data_bucket = ParameterString(name="data_bucket", default_value=data_bucket_)

# Maximum acceptable test MSE for the evaluation condition / fail message.
mse_threshold = ParameterFloat(name="MseThreshold", default_value=6.0)

# Step 3: Define a Processing Step for Feature Engineering

In [None]:
#Verify if the directory for the ETL processing is created
#!ls -ltr mlops-workshop/etl

In [None]:
%%writefile /root/mlops-workshop/sgPipeline/abalone/preprocessing.py
import argparse
import os
import requests
import tempfile
import numpy as np
import pandas as pd


from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder


# The input CSV has no header row, so column names are declared here in file
# order; the label column comes last.
feature_columns_names = [
    "sex", "length", "diameter", "height",
    "whole_weight", "shucked_weight", "viscera_weight", "shell_weight",
]
label_column = "rings"

# "sex" is read as a string; every other feature is read as float64.
feature_columns_dtype = {"sex": str}
feature_columns_dtype.update(
    {name: np.float64 for name in feature_columns_names if name != "sex"}
)
label_column_dtype = {label_column: np.float64}


def merge_two_dicts(x, y):
    """Return a new dict holding the entries of ``x`` overridden by those of ``y``.

    Neither input is modified.
    """
    return {**x, **y}


if __name__ == "__main__":
    base_dir = "/opt/ml/processing"
    # Fixed seed for the row shuffle so that re-running the pipeline reproduces
    # the same train/validation/test split (the unseeded global RNG did not).
    split_seed = 42

    # Read the headerless CSV with explicit column names and dtypes.
    df = pd.read_csv(
        f"{base_dir}/input/abalone.csv",
        header=None,
        names=feature_columns_names + [label_column],
        dtype=merge_two_dicts(feature_columns_dtype, label_column_dtype),
    )

    # Numeric features: median imputation, then standardization.
    numeric_features = [name for name in feature_columns_names if name != "sex"]
    numeric_transformer = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="median")),
            ("scaler", StandardScaler()),
        ]
    )

    # Categorical "sex": fill gaps with "missing", then one-hot encode
    # (categories not seen during fit are ignored at transform time).
    categorical_features = ["sex"]
    categorical_transformer = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="constant", fill_value="missing")),
            ("onehot", OneHotEncoder(handle_unknown="ignore")),
        ]
    )

    # Numeric block first, then the one-hot block; evaluation.py relies on this order.
    preprocess = ColumnTransformer(
        transformers=[
            ("num", numeric_transformer, numeric_features),
            ("cat", categorical_transformer, categorical_features),
        ]
    )

    # Separate the label so it is left untransformed.
    y = df.pop(label_column)
    # NOTE(review): the transformers are fit on the full dataset before the
    # split, so test-set statistics leak into the scaling; consider fitting on
    # the training rows only.
    X_pre = preprocess.fit_transform(df)
    y_pre = y.to_numpy().reshape(-1, 1)

    # Output row layout: label in column 0, then the transformed features.
    X = np.concatenate((y_pre, X_pre), axis=1)

    # Shuffle rows, then split 70% train / 15% validation / 15% test.
    np.random.RandomState(split_seed).shuffle(X)
    train, validation, test = np.split(X, [int(.7 * len(X)), int(.85 * len(X))])

    # NOTE(review): validate.csv is written into the "train" output directory,
    # so the training channel receives both files — confirm the training
    # container expects that.
    pd.DataFrame(train).to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    pd.DataFrame(validation).to_csv(f"{base_dir}/train/validate.csv", header=False, index=False)
    pd.DataFrame(test).to_csv(f"{base_dir}/test/test.csv", header=False, index=False)

In [None]:
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import FrameworkProcessor

# Scikit-learn processing container that runs preprocessing.py in the ETL step,
# bound to the PipelineSession created in Step 0.
framework_version = "0.23-1"

sklearn_processor = SKLearnProcessor(
    role=role,
    framework_version=framework_version,
    instance_count=processing_instance_count,
    instance_type=processing_instance_type,
    sagemaker_session=pipeline_session,
)


In [None]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.functions import Join
    

def _execution_s3_uri(suffix):
    """Build s3://<mlops_bucket>/<pipeline name>/<execution id>/<suffix>, resolved at run time."""
    # Joining "s3:/" with "/" yields the "s3://" scheme prefix.
    return Join(
        on="/",
        values=[
            "s3:/", mlops_bucket,
            ExecutionVariables.PIPELINE_NAME,
            ExecutionVariables.PIPELINE_EXECUTION_ID,
            suffix,
        ],
    )


# Raw CSV at s3://<data_bucket>/input/raw/abalone.csv.
raw_data_source = Join(on="/", values=["s3:/", data_bucket, "input/raw/abalone.csv"])

# ETL: run preprocessing.py on the raw CSV and publish the train and test
# splits under this execution's prefix in the mlops bucket.
step_process = ProcessingStep(
    name="ETL",
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(source=raw_data_source, destination="/opt/ml/processing/input"),
    ],
    outputs=[
        ProcessingOutput(
            output_name="train",
            source="/opt/ml/processing/train",
            destination=_execution_s3_uri("input/train"),
        ),
        ProcessingOutput(
            output_name="test",
            source="/opt/ml/processing/test",
            destination=_execution_s3_uri("input/test"),
        ),
    ],
    code="/root/mlops-workshop/sgPipeline/abalone/preprocessing.py",
)

print("step_process:", step_process)

# Step 4: Define a Training step

In [None]:
# Training: a generic Estimator running the custom "abalone" image.
# NOTE(review): the framework inside the image is not visible here —
# evaluation.py loads a Keras .h5 model, so presumably TensorFlow/Keras.
from sagemaker.estimator import Estimator
from sagemaker.workflow.execution_variables import ExecutionVariables

# Hyperparameters passed to the training container, all given as strings.
training_parameters = {
    "epochs": "2000",
    "layers": "2",
    "dense_layer": "64",
    "batch_size": "8",
}

# Model artifacts go to s3://<mlops_bucket>/<pipeline name>/<execution id>/output.
model_output_uri = Join(
    on="/",
    values=[
        "s3:/", mlops_bucket,
        ExecutionVariables.PIPELINE_NAME,
        ExecutionVariables.PIPELINE_EXECUTION_ID,
        "output",
    ],
)

estimator = Estimator(
    image_uri=image_uri,
    role=role,
    instance_count=training_instance_count,
    instance_type=training_instance_type,
    output_path=model_output_uri,
    sagemaker_session=pipeline_session,
    hyperparameters=training_parameters,
)


In [None]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

# "training" channel: the ETL step's "train" output (the directory that
# preprocessing.py fills with train.csv and validate.csv), as CSV.
train_output_uri = step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri
train_channel = TrainingInput(s3_data=train_output_uri, content_type="text/csv")

step_train = TrainingStep(
    name="Train",
    estimator=estimator,
    inputs={"training": train_channel},
)

print("step_train", step_train)

# Step 5: Define a Processing Step for Model Evaluation 

In [None]:
%%writefile /root/mlops-workshop/sgPipeline/abalone/evaluation.py
import json
import logging
import sys
import os
import tarfile
import pandas as pd
import numpy as np
import traceback
import tensorflow as tf
from tensorflow import keras
from sklearn.metrics import mean_squared_error
from sklearn import preprocessing

# Silence TensorFlow's own logging below ERROR.
tf.get_logger().setLevel('ERROR')

# Root of the processing container's local paths.
prefix = '/opt/ml/processing/'
# Inputs copied in from S3 (model artifact under model/, test split under testing/).
input_path = os.path.join(prefix, 'input')
# Failure file is written here; the evaluation report goes to the subdirectory below.
output_path = os.path.join(prefix, 'output')
evaluation_path = os.path.join(prefix, 'output', 'evaluation')

# Root logger at INFO, emitting through a stream handler (stderr by default).
logger = logging.getLogger()
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.INFO)

def load_model():
    """Unpack the training artifact and return the compiled Keras model.

    Extracts ``<input_path>/model/model.tar.gz`` into the current working
    directory, loads ``model.h5`` from there and compiles it with the Adam
    optimizer and MSE loss.
    """
    logger.info("Load Pre-Trained Model")
    with tarfile.open(os.path.join(input_path, "model/model.tar.gz")) as archive:
        # NOTE(review): extractall without a member filter trusts the archive
        # contents; acceptable only because the artifact comes from our own training step.
        archive.extractall(".")
    restored = tf.keras.models.load_model("model.h5")
    restored.compile(loss="mse", optimizer="adam")
    return restored

def compute_regression_metrics(y_true, y_pred):
    """Return MSE, RMSE and the residual standard deviation as plain floats.

    Both inputs are flattened to 1-D before use. ``model.predict`` on a
    single-output regressor typically returns an ``(n, 1)`` column; subtracting
    that from an ``(n,)`` label vector would broadcast to an ``(n, n)`` matrix
    and produce a meaningless standard deviation.

    Args:
        y_true: Ground-truth targets, any array-like with n elements.
        y_pred: Predictions, any array-like with n elements.

    Returns:
        dict with float values under "mse", "rmse" and "std" (plain floats so
        the report is JSON-serializable).
    """
    y_true = np.asarray(y_true, dtype=np.float64).reshape(-1)
    y_pred = np.asarray(y_pred, dtype=np.float64).reshape(-1)
    residuals = y_true - y_pred
    mse = float(np.mean(residuals ** 2))
    return {
        "mse": mse,
        # sqrt(MSE) instead of mean_squared_error(squared=False), whose
        # `squared` argument is gone from newer scikit-learn releases.
        "rmse": float(np.sqrt(mse)),
        "std": float(np.std(residuals)),
    }


if __name__ == "__main__":
    logger.info("Evaluation mode ...")

    try:
        test_path = os.path.join(input_path, "testing")

        # Load 'h5' keras model
        model = load_model()

        # Column order matches the CSVs written by preprocessing.py: label first,
        # then the 7 standardized numeric features, then the one-hot "sex" columns.
        column_names = ["rings", "length", "diameter", "height", "whole weight",
            "shucked weight", "viscera weight", "shell weight", "sex_F", "sex_I", "sex_M"]

        logger.info("Reading test data.")
        test_data = pd.read_csv(os.path.join(test_path, 'test.csv'), sep=',', names=column_names)
        y_test = test_data['rings'].to_numpy()
        x_test = test_data.drop(['rings'], axis=1).to_numpy()
        # NOTE(review): row-wise L2 normalization on top of the ETL
        # standardization; presumably mirrors the training container — confirm.
        x_test = preprocessing.normalize(x_test)

        # Run predictions
        predictions_ = model.predict(x_test)

        # Calculate the metrics
        metrics = compute_regression_metrics(y_test, predictions_)
        mse = metrics["mse"]
        rmse = metrics["rmse"]
        std = metrics["std"]
        logger.info("Root Mean Square Error: {}".format(rmse))
        logger.info("Mean Square Error: {}".format(mse))
        logger.info("Standard Deviation: {}".format(std))

        # Report layout read by the pipeline's PropertyFile/JsonGet
        # ("regression_metrics.mse.value") and by the model metrics.
        report_dict = {
            "regression_metrics": {
                'rmse': {
                    'value': rmse
                },
                'mse': {
                    'value': mse,
                },
                'standard_deviation': {
                    'value': std,
                },
            },
        }
        logger.info("Writing out evaluation report with mse: %f and std: %f", mse, std)
        os.makedirs(evaluation_path, exist_ok=True)
        evaluation_file = f"{evaluation_path}/evaluation.json"
        with open(evaluation_file, "w") as f:
            f.write(json.dumps(report_dict))
        logger.info("Model evaluation completed.")

    except Exception as e:
        # Write the error and traceback to <output_path>/failure.
        # NOTE(review): only the "evaluation" subdirectory is declared as a
        # ProcessingOutput, so this file is not uploaded — confirm where the
        # failure reason should surface for a processing job.
        trc = traceback.format_exc()
        message = 'Exception during evaluation: ' + str(e) + '\n' + trc
        os.makedirs(output_path, exist_ok=True)
        with open(os.path.join(output_path, 'failure'), 'w') as s:
            s.write(message)
        # Also print to stderr so the error appears in the job logs.
        print(message, file=sys.stderr)
        # A non-zero exit code marks the processing job as failed.
        sys.exit(255)

In [None]:
from sagemaker.processing import ScriptProcessor


# Processor that runs evaluation.py with python3 inside the custom training image.
# It is bound to pipeline_session, as the ETL processor and the estimator are.
# Without it the processor falls back to a default Session, whose default bucket
# is not the mlops bucket configured in Step 0.
script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    role=role,
    sagemaker_session=pipeline_session,
)

In [None]:
from sagemaker.workflow.properties import PropertyFile


# Exposes evaluation.json from the "evaluation" output so later steps can read
# values from it (the condition step uses JsonGet on this property file).
evaluation_report = PropertyFile(
    name="EvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)

# Inputs: the trained model artifact and the held-out test split from ETL.
eval_inputs = [
    ProcessingInput(
        source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
        destination="/opt/ml/processing/input/model",
    ),
    ProcessingInput(
        source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
        destination="/opt/ml/processing/input/testing",
    ),
]

# Output: the evaluation report, under this execution's prefix in the mlops bucket.
eval_outputs = [
    ProcessingOutput(
        output_name="evaluation",
        source="/opt/ml/processing/output/evaluation",
        destination=Join(
            on="/",
            values=[
                "s3:/", mlops_bucket,
                ExecutionVariables.PIPELINE_NAME,
                ExecutionVariables.PIPELINE_EXECUTION_ID,
                "evaluation",
            ],
        ),
    )
]

step_eval = ProcessingStep(
    name="Evaluate",
    processor=script_eval,
    inputs=eval_inputs,
    outputs=eval_outputs,
    code="/root/mlops-workshop/sgPipeline/abalone/evaluation.py",
    property_files=[evaluation_report],
)

print("step_eval", step_eval)

# Step 6: Define a RegisterModel Step to Create a Model Package 

In [None]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics 
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.functions import Join

# Location of evaluation.json as written by the Evaluate step (same prefix as
# that step's "evaluation" output destination).
evaluation_json_uri = Join(
    on="/",
    values=[
        "s3:/", mlops_bucket,
        ExecutionVariables.PIPELINE_NAME,
        ExecutionVariables.PIPELINE_EXECUTION_ID,
        "evaluation/evaluation.json",
    ],
)
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=evaluation_json_uri,
        content_type="application/json",
    )
)

# Register the trained artifact as a model package in model_package_group_name,
# with the approval status parameter and the evaluation metrics attached.
step_register = RegisterModel(
    name="Register",
    estimator=estimator,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge", "ml.c5.large"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)

print("step_register", step_register)

# Step 7: Define a Fail Step to Terminate the SageMaker Pipeline Execution If the Model Error (MSE) Exceeds the Required Threshold

In [None]:

from sagemaker.workflow.fail_step import FailStep

# Terminal step for the failing branch; the message includes the MseThreshold
# parameter value, joined with a space.
fail_message = Join(
    on=" ",
    values=["Execution failed due to MSE greater than", mse_threshold],
)
step_fail = FailStep(name="Fail", error_message=fail_message)

print("step_fail", step_fail)

# Step 8: Define a Condition Step to Verify Model Accuracy 

In [None]:
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import (
    ConditionStep,
)
from sagemaker.workflow.functions import JsonGet

# Passes when the test MSE reported in evaluation.json is <= the MseThreshold
# pipeline parameter (default 6.0).
# Using the parameter instead of a literal keeps this check in sync with the
# Fail step's message and lets each execution override the threshold
# (e.g. 0.0 to force the bad-model branch).
cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step=step_eval,
        property_file=evaluation_report,
        json_path="regression_metrics.mse.value"
    ),
    right=mse_threshold,
)


In [None]:
# Branch on the MSE check: register the model when it holds, otherwise fail.
step_cond = ConditionStep(
    name="Condition",
    conditions=[cond_lte],
    else_steps=[step_fail],
    if_steps=[step_register],
)

print("step_cond", step_cond)

# Step 9: Create the Pipeline

In [None]:
from sagemaker.workflow.pipeline import Pipeline


pipeline_name = "MLOps-SageMaker-Pipeline"

# Parameters that can be overridden per execution.
# NOTE(review): every entry must be a pipeline Parameter object (e.g. ParameterString);
# check that input_data is one.
pipeline_parameters = [
    processing_instance_count,
    training_instance_count,
    model_approval_status,
    input_data,
    mse_threshold,
    mlops_bucket,
    data_bucket,
]

# Top-level steps; step_register and step_fail are reached through step_cond's branches.
pipeline = Pipeline(
    name=pipeline_name,
    parameters=pipeline_parameters,
    steps=[step_process, step_train, step_eval, step_cond],
)


# View Pipeline Definition Json

In [None]:
# Show the pipeline definition (parsed from its JSON text) as the cell output.
import json

definition_dict = json.loads(pipeline.definition())
definition_dict

# Store Pipeline Definition JSON into File

In [None]:
# Save the pipeline definition as pretty-printed JSON (4-space indent) so the
# next cell can parameterize it.
import json

mlops_pipeline = json.loads(pipeline.definition())

definition_file = "/root/mlops-workshop/sgPipeline/abalone/mlops_sm_pipeline_defn.json"
with open(definition_file, "w") as outfile:
    json.dump(mlops_pipeline, outfile, indent=4)

# Parameterize the JSON file

In [None]:
!sed -i 's/-[0-9]\{12,\}/-<AccountId>/g' /root/mlops-workshop/sgPipeline/abalone/mlops_sm_pipeline_defn.json
!sed -i 's/:[0-9]\{12,\}/:<AccountId>/g' /root/mlops-workshop/sgPipeline/abalone/mlops_sm_pipeline_defn.json
!sed -i 's/'"$AWS_ACCOUNT_ID"'/<AccountId>/g' /root/mlops-workshop/sgPipeline/abalone/mlops_sm_pipeline_defn.json 
!sed -i 's/us-east-1/<Region>/g' /root/mlops-workshop/sgPipeline/abalone/mlops_sm_pipeline_defn.json 
!sed -i 's/"\RoleArn\"\: \".*/"\RoleArn\"\: \" \"\,/' /root/mlops-workshop/sgPipeline/abalone/mlops_sm_pipeline_defn.json

In [None]:
# Create or Update the Pipeline definition in SageMaker, passing the notebook's execution role.

pipeline.upsert(role_arn=role)

In [None]:
# Start an execution of the pipeline; no parameter overrides are passed, so defaults apply.

execution = pipeline.start()

In [None]:
# Describe the pipeline execution started above (status and metadata).

execution.describe()

In [None]:
# Wait for the pipeline execution to finish.
execution.wait()

In [None]:
# List the steps of this execution.
execution.list_steps()