![Author](https://img.shields.io/badge/Author-Soufiane%20AAZIZI-brightgreen)
[![Medium](https://img.shields.io/badge/Medium-Follow%20Me-blue)](https://medium.com/@aazizi.soufiane)
[![GitHub](https://img.shields.io/badge/GitHub-Follow%20Me-lightgrey)](https://github.com/aazizisoufiane)
[![LinkedIn](https://img.shields.io/badge/LinkedIn-Connect%20with%20Me-informational)](https://www.linkedin.com/in/soufiane-aazizi-phd-a502829/)

---
# Streamlining Machine Learning Model Deployment with CI/CD and MLOps

Knowing how to manage and deploy machine learning models is no longer optional for data scientists. Building an accurate model is only part of the job; the model also has to be integrated into real-world applications. This is where Continuous Integration and Continuous Deployment (CI/CD) and Machine Learning Operations (MLOps) come together.

This Jupyter Notebook introduces CI/CD and MLOps through a hands-on project. It uses Amazon SageMaker for data preprocessing, model training, and deployment, and AWS Step Functions to orchestrate those stages as a single automated workflow. The project is a practical starting point for data scientists who want to build the CI/CD and MLOps skills that employers increasingly expect.

The sections below walk through each stage, from installing the required packages to sending a prediction request to the deployed endpoint.


## Table of Contents
- [Installing Required Packages](#Installing)
- [Imports](#imports)
- [Initialization](#initialization)
- [Functions](#functions)
- [Data Preprocessing](#data-preprocessing)
- [Model Training](#model-training)
- [Create Workflow](#create-workflow)
- [Execute Workflow and Deploy](#execute-workflow)
- [Prediction](#prediction)

## Installing Required Packages <a class="anchor" id="Installing"></a>
Before we begin, let's make sure all the necessary Python packages are installed. Run the following commands to install any missing packages:


In [None]:
!pip install --upgrade pip

In [None]:
!pip install stepfunctions omegaconf nb-black python-dotenv

In [None]:
!pip install -U sagemaker

### Auto-formatting and Auto-Reloading Configuration

In [1]:
%load_ext lab_black
%load_ext autoreload
%autoreload 2

## Imports <a class="anchor" id="imports"></a>
- All imports used in the notebook are grouped in this cell.

In [None]:
import os
import time

import boto3
import sagemaker
import stepfunctions
from botocore.config import Config
from dotenv import find_dotenv, load_dotenv
from sagemaker import get_execution_role
from sagemaker.huggingface import HuggingFace
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor
from stepfunctions import steps
from stepfunctions.inputs import ExecutionInput
from stepfunctions.steps import (
    Chain,
    ChoiceRule,
    EndpointConfigStep,
    EndpointStep,
    ModelStep,
    Parallel,
    ProcessingStep,
    TrainingStep,
    TransformStep,
)
from stepfunctions.workflow import Workflow

# Local project configuration (S3 bucket, prefix, input paths)
from config import config

## Initialization <a class="anchor" id="initialization"></a>
- Load environment variables, declare the workflow's execution inputs, define the failure-handling states, and configure the SageMaker session.

In [2]:
# Load environment variables from a .env file
_ = load_dotenv(find_dotenv())

# Retrieve the SageMaker workflow execution role from environment variables
workflow_execution_role = os.getenv("SAGEMAKER_WORKFLOW_ROLE")



In [261]:
execution_input = ExecutionInput(
    schema={
        "JobName": str,
        "PreprocessingJobName": str,
        "TrainingJobName": str,
        "ModelName": str,
        "EndpointName": str,
    }
)
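`ExecutionInput` only declares placeholders; Step Functions resolves them from the input passed to `execute()`. As a quick sanity check, the stdlib-only sketch below compares an input dictionary with the schema above. The helper `check_execution_inputs` is hypothetical and not part of the Step Functions Data Science SDK.

```python
# Hypothetical helper (not part of the stepfunctions SDK): report schema keys
# that an execution input dict does not supply, and keys with the wrong type.
SCHEMA = {
    "JobName": str,
    "PreprocessingJobName": str,
    "TrainingJobName": str,
    "ModelName": str,
    "EndpointName": str,
}


def check_execution_inputs(inputs, schema=SCHEMA):
    """Return (missing_keys, wrong_type_keys), both sorted alphabetically."""
    missing = sorted(key for key in schema if key not in inputs)
    wrong_type = sorted(
        key
        for key, expected in schema.items()
        if key in inputs and not isinstance(inputs[key], expected)
    )
    return missing, wrong_type


# Example: only two of the five declared keys are provided.
missing, wrong_type = check_execution_inputs(
    {"PreprocessingJobName": "prep-job", "TrainingJobName": "train-job"}
)
print("missing:", missing)
print("wrong type:", wrong_type)
```

A missing key only causes a run-time error if a step actually references it; in this notebook, only the processing step reads a placeholder (`PreprocessingJobName`).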

In [262]:
# Define a Fail state for handling SageMaker processing job failures
failed_state_sagemaker_processing_failure = stepfunctions.steps.states.Fail(
    "ML Workflow failed",
    cause="SageMakerProcessingJobFailed"
)

# Define a Catch state for capturing specific errors and transitioning to the Fail state
catch_state_processing = stepfunctions.steps.states.Catch(
    error_equals=["States.TaskFailed"],
    next_step=failed_state_sagemaker_processing_failure
)


In [263]:
failed_state_sagemaker_training_failure = stepfunctions.steps.states.Fail(
    "ML Training failed", cause="SageMakerTrainingJobFailed"
)

catch_state_training = stepfunctions.steps.states.Catch(
    error_equals=["States.TaskFailed"],
    next_step=failed_state_sagemaker_training_failure,
)

In [263]:
failed_state_sagemaker_save_model_failure = stepfunctions.steps.states.Fail(
    "ML Save Model failed", cause="SageMakerSaveModelJobFailed"
)

catch_state_save_model = stepfunctions.steps.states.Catch(
    error_equals=["States.TaskFailed"],
    next_step=failed_state_sagemaker_save_model_failure,
)

In [264]:
failed_state_sagemaker_inference_failure = stepfunctions.steps.states.Fail(
    "ML Inference failed", cause="SageMakerInferenceFailed"
)

catch_state_inference = stepfunctions.steps.states.Catch(
    error_equals=["States.TaskFailed"],
    next_step=failed_state_sagemaker_inference_failure,
)

#### SageMaker Configuration

In [255]:
# Create a SageMaker Boto3 client with custom configuration
sm_boto = boto3.client(
    "sagemaker",
    config=Config(connect_timeout=5, read_timeout=60, retries={"max_attempts": 60}),
)

# Initialize a SageMaker session using the custom Boto3 client
sagemaker_session = sagemaker.Session(sagemaker_client=sm_boto)

# Retrieve the AWS region from the SageMaker session
region = sagemaker_session.boto_session.region_name

# Get the execution role required for SageMaker operations
role = get_execution_role()

# Retrieve configuration values for the S3 bucket and prefix
bucket = config.s3.bucket
prefix = config.s3.prefix
s3_bucket_base_uri = config.s3.s3_bucket_base_uri




## Functions <a class="anchor" id="functions"></a>

In [None]:
def job_name(jobname):
    """
    Generate a unique job name for an Amazon SageMaker job based on a given 'jobname' and current timestamp.

    Args:
        jobname (str): A descriptive name for the job.

    Returns:
        str: A unique job name incorporating the 'jobname' and timestamp.
    """
    return f"MultiLabelClassification-{jobname}--{time.strftime('%Y%m%d%H%M%S', time.gmtime())}"


def upload_code(bucket_name, prefix_name, script_location):
    """
    Upload code or script to an Amazon S3 bucket for use in SageMaker.

    Args:
        bucket_name (str): The name of the S3 bucket where the code will be uploaded.
        prefix_name (str): The prefix or directory within the S3 bucket where the code will be stored.
        script_location (str): The local path to the code or script file.

    Returns:
        str: The S3 URI of the uploaded code.
    """
    return sagemaker_session.upload_data(
        script_location,
        bucket=bucket_name,
        key_prefix=f"{prefix_name}/{script_location}",
    )
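SageMaker limits training and processing job names to 1 to 63 characters made of letters, digits, and hyphens (pattern `^[a-zA-Z0-9](-*[a-zA-Z0-9]){0,62}` in the SageMaker API reference). The `MultiLabelClassification-` prefix, the `--` separator, and the 14-digit timestamp in `job_name()` take 41 characters, leaving 22 for the `jobname` argument. The sketch below checks generated names against that rule; `make_job_name` is a hypothetical variant of `job_name()` that accepts a fixed time so the result is reproducible.

```python
import re
import time

# SageMaker job-name rule: alphanumerics, optionally separated by hyphens, max 63 chars.
SAGEMAKER_JOB_NAME = re.compile(r"^[a-zA-Z0-9](-*[a-zA-Z0-9]){0,62}$")


def make_job_name(jobname, now=None):
    """Hypothetical job_name() variant; `now` is seconds since the epoch (None = current time)."""
    timestamp = time.strftime("%Y%m%d%H%M%S", time.gmtime(now))
    return f"MultiLabelClassification-{jobname}--{timestamp}"


def is_valid_sagemaker_job_name(name):
    # The explicit length check matters: hyphen runs can push a regex match past 63 characters.
    return len(name) <= 63 and SAGEMAKER_JOB_NAME.match(name) is not None


name = make_job_name("PreprocessingJobName", now=0)
print(name, len(name), is_valid_sagemaker_job_name(name))
```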

## Data Preprocessing <a class="anchor" id="data-preprocessing"></a>
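- The preprocessing step runs `preprocess/code/run.py` in a SageMaker scikit-learn processing container. It reads the raw input data from S3, splits it using the ratio passed as `--train-test-split-ratio` (0.2 here), and writes the train, test, and label files back to S3.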

In [265]:
# Define the location of the preprocessing script within the project
PREPROCESSING_SCRIPT_LOCATION = "preprocess/code"

# Create the output path for preprocessing results using the S3 bucket base URI and prefix
output_preprocess = "{}/{}".format(s3_bucket_base_uri, config.s3.prefix)


In [266]:
# Upload the preprocessing script to an S3 location and get the S3 URI
input_code_preprocess = sagemaker_session.upload_data(
    PREPROCESSING_SCRIPT_LOCATION,
    bucket=bucket,
    key_prefix=f"{prefix}/{PREPROCESSING_SCRIPT_LOCATION}",
)

In [266]:
# Define a list of ProcessingInput objects for the preprocessing job
inputs_preprocess = [
    ProcessingInput(
        source=f"{config.s3.s3_bucket_base_uri}/{config.s3.input}",
        destination="/opt/ml/processing/input",
        input_name="input-data",
    ),
    ProcessingInput(
        source=input_code_preprocess,
        destination="/opt/ml/processing/input/code",
        input_name="code",
    ),
    ProcessingInput(
        source=f"s3://{bucket}/{prefix}/{PREPROCESSING_SCRIPT_LOCATION}/config",
        destination="/opt/ml/processing/input/config",
        input_name="code-config",
    ),
]

In [266]:
# Define a list of ProcessingOutput objects for the preprocessing job
outputs_preprocess = [
    ProcessingOutput(
        source="/opt/ml/processing/train",
        destination=output_preprocess,
        output_name="train_data",
    ),
    ProcessingOutput(
        source="/opt/ml/processing/test",
        destination=output_preprocess,
        output_name="test_data",
    ),
    ProcessingOutput(
        source="/opt/ml/processing/labels",
        destination=output_preprocess,
        output_name="labels_data",
    ),
]

In [267]:
# Create a descriptive name for the preprocessing step, including a timestamp.
preprocessing_step_name = f"Multilabel Classification - Preprocessing Step {time.strftime('%Y%m%d%H%M%S', time.gmtime())}"


# Define an Amazon SageMaker SKLearnProcessor with custom settings.
def sklearn_processor(instance_type="ml.m5.xlarge"):
    """
    Create an SKLearnProcessor instance for Amazon SageMaker processing jobs.

    Args:
        instance_type (str): The Amazon SageMaker instance type for processing jobs.

    Returns:
        sagemaker.processing.SKLearnProcessor: An instance of the SKLearnProcessor.
    """
    return SKLearnProcessor(
        framework_version="1.2-1",
        role=role,  # Ensure the 'role' variable is defined and appropriate.
        instance_type=instance_type,
        instance_count=1,
        # max_runtime_in_seconds=1200,  # Uncomment and customize if needed.
    )


# Define the SageMaker processing step for data preprocessing.
processing_step = ProcessingStep(
    preprocessing_step_name,
    processor=sklearn_processor(),
    job_name=execution_input["PreprocessingJobName"],
    inputs=inputs_preprocess,  # Raw data, preprocessing code, and its config files
    outputs=outputs_preprocess,  # Train, test, and labels outputs written to S3
    container_entrypoint=["python3", "/opt/ml/processing/input/code/run.py"],
    container_arguments=[
        "--train-test-split-ratio",
        "0.2",
    ],  # Passed to run.py as command-line arguments
)
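The processing container runs the entrypoint with `container_arguments` passed to it, so `run.py` receives `--train-test-split-ratio 0.2` as ordinary command-line arguments. `run.py` itself is not shown in this notebook; the stdlib-only sketch below is a hypothetical illustration of how such a script could parse the flag and hold out a fraction of rows for testing. A real script would read from `/opt/ml/processing/input` and write its files under the output directories declared in `outputs_preprocess`.

```python
import argparse
import random


def parse_args(argv=None):
    # Hypothetical parser mirroring the container_arguments of the ProcessingStep.
    parser = argparse.ArgumentParser()
    parser.add_argument("--train-test-split-ratio", type=float, default=0.2)
    return parser.parse_args(argv)


def split_rows(rows, test_ratio, seed=42):
    """Shuffle a copy of `rows` and hold out about `test_ratio` of them for testing."""
    shuffled = list(rows)
    random.Random(seed).shuffle(shuffled)  # fixed seed for a reproducible split
    n_test = int(round(len(shuffled) * test_ratio))
    return shuffled[n_test:], shuffled[:n_test]


args = parse_args(["--train-test-split-ratio", "0.2"])
train, test = split_rows(range(100), args.train_test_split_ratio)
print(args.train_test_split_ratio, len(train), len(test))
```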

## Model Training <a class="anchor" id="model-training"></a>
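- The model fine-tunes `distilbert-base-uncased` for multi-label text classification with the SageMaker Hugging Face estimator. Training runs `train/code/train.py` on a single GPU instance, reads the preprocessed train, test, and label files from S3, and reports evaluation metrics that SageMaker extracts from the training logs with regular expressions.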

In [271]:
# Define metric definitions: SageMaker applies each regex to the training logs
# to extract the metric value. Raw strings avoid the invalid "\-" escape warning
# that the same pattern triggers in a regular string literal.
metric_names = [
    "eval_loss",
    "eval_accuracy",
    "eval_f1",
    "eval_roc",
    "eval_roc_auc",
    "eval_precision",
    "eval_recall",
    "eval_runtime",
    "eval_samples_per_second",
    "epoch",
]
metric_definitions = [
    {"Name": name, "Regex": rf"'{name}': ([0-9]+(.|e\-)[0-9]+),?"}
    for name in metric_names
]

# Define instance volume configurations based on instance type
instance_volume = {
    "ml.g4dn.16xlarge": 900,
    "ml.g4dn.8xlarge": 500,
    "ml.g4dn.4xlarge": 225,
    "ml.g4dn.2xlarge": 225,
    "ml.g4dn.xlarge": 125,
}

# Define training parameters such as epochs, batch size, evaluation steps, and instance type.
# Note: generate_estimator() takes the volume size from instance_volume, so "volume_size" is not used.
params = {
    "epochs": 1,
    "train-batch-size": 8,
    "eval_steps": 1,
    "instance_type": "ml.g4dn.4xlarge",
    "volume_size": 125,
}
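Each `Regex` in `metric_definitions` is matched against the training log. The sketch below applies the `eval_loss` pattern with Python's `re` module to made-up log lines in the dictionary format that the Hugging Face `Trainer` prints. It shows that the pattern truncates values that combine a decimal point and an exponent (`1.5e-05` is captured as `1.5`), and compares it with one possible alternative, `ROBUST`, which is an assumption of this sketch and not taken from the original project.

```python
import re

# The eval_loss pattern exactly as used in metric_definitions.
EVAL_LOSS = re.compile(r"'eval_loss': ([0-9]+(.|e\-)[0-9]+),?")

# One possible alternative (an assumption, not from the original project):
# digits and dots, then an optional exponent such as e-05.
ROBUST = re.compile(r"'eval_loss': ([0-9.]+(?:e-?[0-9]+)?)")

# Made-up log lines in the dict format printed by the Hugging Face Trainer.
log_lines = [
    "{'eval_loss': 0.3456, 'eval_f1': 0.81, 'epoch': 1.0}",
    "{'eval_loss': 5e-05, 'epoch': 1.0}",
    "{'eval_loss': 1.5e-05, 'epoch': 1.0}",
]

for line in log_lines:
    original = EVAL_LOSS.search(line).group(1)
    alternative = ROBUST.search(line).group(1)
    print(f"{original:>8}  {alternative:>8}")
```

If metric values can become very small, a pattern along the lines of `ROBUST` avoids the truncation. Test any change against real log output, since SageMaker's regex engine may not behave identically to Python's `re`.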


In [269]:
# Extract the ProcessingOutputConfig from the processing step's parameters
output_config = processing_step.fields["parameters"]["ProcessingOutputConfig"]

# Initialize variables to store the S3 URIs for preprocessed data
preprocessed_training_data = None
preprocessed_test_data = None
preprocessed_labels_data = None

# Iterate through the list of outputs in the ProcessingOutputConfig
for output in output_config["Outputs"]:
    if output["OutputName"] == "train_data":
        # Set the S3 URI for preprocessed training data
        preprocessed_training_data = os.path.join(
            output["S3Output"]["S3Uri"], "train.csv"
        )
    if output["OutputName"] == "test_data":
        # Set the S3 URI for preprocessed test data
        preprocessed_test_data = os.path.join(output["S3Output"]["S3Uri"], "test.csv")
    if output["OutputName"] == "labels_data":
        # Set the S3 URI for preprocessed labels data
        preprocessed_labels_data = os.path.join(
            output["S3Output"]["S3Uri"], "labels.csv"
        )
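All three processing outputs share the same S3 destination, so the files must have distinct names for the URIs above to point at different objects. The loop can also be written as a dictionary lookup; the sketch below is a hypothetical, stdlib-only version that works on a plain dictionary shaped like `ProcessingOutputConfig`. It joins with `/` explicitly because S3 keys always use forward slashes, whereas `os.path.join` follows the local operating system. The helper name and the bucket and prefix values are placeholders.

```python
def preprocessed_uris(output_config, filenames):
    """Map each ProcessingOutput name in `filenames` to '<S3Uri>/<filename>'."""
    uris = {}
    for output in output_config["Outputs"]:
        name = output["OutputName"]
        if name in filenames:
            base = output["S3Output"]["S3Uri"].rstrip("/")  # tolerate a trailing slash
            uris[name] = f"{base}/{filenames[name]}"
    return uris


# Made-up config shaped like ProcessingOutputConfig; bucket and prefix are placeholders.
example_config = {
    "Outputs": [
        {"OutputName": "train_data", "S3Output": {"S3Uri": "s3://my-bucket/my-prefix"}},
        {"OutputName": "test_data", "S3Output": {"S3Uri": "s3://my-bucket/my-prefix/"}},
        {"OutputName": "labels_data", "S3Output": {"S3Uri": "s3://my-bucket/my-prefix"}},
    ]
}
uris = preprocessed_uris(
    example_config,
    {"train_data": "train.csv", "test_data": "test.csv", "labels_data": "labels.csv"},
)
print(uris)
```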




In [272]:
def generate_estimator():
    # Define the S3 URIs for checkpoints and training jobs
    checkpoint_s3_uri = f"s3://{config.s3.bucket}/{config.s3.prefix}/checkpoints"
    output_path = f"s3://{config.s3.bucket}/{config.s3.prefix}/training_jobs"
    
    # Define hyperparameters for the HuggingFace estimator
    hyperparameters = {
        "model_name": "distilbert-base-uncased",
        "epochs": params["epochs"],
        "train-batch-size": params["train-batch-size"],
        "output_dir": checkpoint_s3_uri,  # Use the checkpoint S3 URI for output
        "eval_steps": params["eval_steps"],
    }

    # Create and configure the HuggingFace estimator for SageMaker
    return HuggingFace(
        entry_point="train.py",  # Entry point script for training
        source_dir="train/code",  # Source directory containing training code
        output_path=f"{output_path}/",  # Output path for storing model artifacts
        code_location=output_path,
        role=role,  # SageMaker execution role
        base_job_name="multi-label-classification",  # Base job name for SageMaker job
        checkpoint_s3_uri=checkpoint_s3_uri,  # Specify the checkpoint input path
        instance_type=params["instance_type"],  # SageMaker instance type for training
        instance_count=1,  # Number of training instances
        transformers_version="4.6",
        pytorch_version="1.7",
        py_version="py36",
        hyperparameters=hyperparameters,  # Hyperparameters for training
        metric_definitions=metric_definitions,  # Metric definitions for evaluation
        volume_size=instance_volume[params["instance_type"]],  # Instance volume size
        sagemaker_session=sagemaker_session,  # SageMaker session
    )
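In script mode, SageMaker passes the estimator's `hyperparameters` to the entry point as command-line arguments (for example `--epochs 1 --train-batch-size 8`). `train.py` is not shown here, so the parser below is a hypothetical sketch of how it could read those values; the argument names mirror the `hyperparameters` dictionary above, and `parse_known_args` keeps unexpected extra arguments from causing an error.

```python
import argparse


def parse_hyperparameters(argv):
    """Hypothetical train.py parser; names mirror the hyperparameters dict above."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--model_name", type=str, default="distilbert-base-uncased")
    parser.add_argument("--epochs", type=int, default=1)
    parser.add_argument("--train-batch-size", type=int, default=8)  # -> args.train_batch_size
    parser.add_argument("--eval_steps", type=int, default=1)
    parser.add_argument("--output_dir", type=str, default=None)
    # Ignore any arguments this parser does not know about
    args, _unknown = parser.parse_known_args(argv)
    return args


# Example command line of the kind built from the hyperparameters dict.
args = parse_hyperparameters(
    ["--epochs", "1", "--train-batch-size", "8", "--eval_steps", "1",
     "--model_name", "distilbert-base-uncased"]
)
print(args.epochs, args.train_batch_size, args.eval_steps, args.model_name)
```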


In [274]:
def generate_data():
    # S3 URIs of the preprocessed training, test, and labels files
    train_path = preprocessed_training_data
    test_path = preprocessed_test_data
    labels_path = preprocessed_labels_data

    # Map each training channel name to its S3 data source.
    # The preprocessing step writes CSV files, so declare them as text/csv.
    data = {
        "train": sagemaker.TrainingInput(train_path, content_type="text/csv"),
        "test": sagemaker.TrainingInput(test_path, content_type="text/csv"),
        "labels": sagemaker.TrainingInput(labels_path, content_type="text/csv"),
    }

    return data


def generate_training_step(instance="ml.g4dn.2xlarge"):
    # Generate a unique job name based on the current timestamp
    jobname = f"multi-label-classification--{time.strftime('%Y%m%d%H%M%S', time.gmtime())}"

    # Create a TrainingStep in the Step Functions workflow.
    # Note: `instance` only labels the step; the estimator's instance type
    # comes from params["instance_type"] inside generate_estimator().
    training_step = steps.TrainingStep(
        f"Training -- instance {instance}",  # Step name
        estimator=generate_estimator(),  # Use the HuggingFace estimator
        data=generate_data(),  # Specify training data sources
        job_name=jobname,  # Job name for SageMaker training job
        wait_for_completion=True,  # Wait for training job to complete
    )

    return training_step, jobname
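Each key in the dictionary returned by `generate_data()` becomes a training channel. Inside the training container, SageMaker downloads a channel's data to `/opt/ml/input/data/<channel>` and exposes that path in the `SM_CHANNEL_<CHANNEL>` environment variable. The hypothetical helper below shows how a training script could locate the `train`, `test`, and `labels` files; the file names assume the preprocessing step wrote `train.csv`, `test.csv`, and `labels.csv`, as the S3 URIs derived earlier suggest.

```python
import os


def channel_dir(channel, environ=None):
    """Return the local directory for a SageMaker training channel.

    Reads SM_CHANNEL_<NAME> and falls back to /opt/ml/input/data/<name>.
    """
    environ = os.environ if environ is None else environ
    default = f"/opt/ml/input/data/{channel}"
    return environ.get(f"SM_CHANNEL_{channel.upper()}", default)


# Simulated environment; inside a training job SageMaker sets these variables.
fake_env = {
    "SM_CHANNEL_TRAIN": "/opt/ml/input/data/train",
    "SM_CHANNEL_TEST": "/opt/ml/input/data/test",
    "SM_CHANNEL_LABELS": "/opt/ml/input/data/labels",
}
for channel in ("train", "test", "labels"):
    print(channel, f"{channel_dir(channel, fake_env)}/{channel}.csv")
```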


In [277]:
# Generate a SageMaker TrainingStep using the 'generate_training_step' function
training_step, training_job_name = generate_training_step(params["instance_type"])

# Create a ModelStep to save the trained model
model_step = steps.ModelStep(
    "Save model",  # Step name
    model=training_step.get_expected_model(),  # Use the model from the training step
    model_name=training_job_name,  # Specify the model name
    instance_type=params["instance_type"],  # SageMaker instance type for model deployment
)




## Create Workflow <a class="anchor" id="create-workflow"></a>

In [285]:
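# Define the endpoint configuration and endpoint steps used in the workflow graph.
# Assumption: both are named after the training job, matching the endpoint
# name format used in the Prediction section.
from stepfunctions.steps import EndpointConfigStep, EndpointStep

endpoint_config_step = EndpointConfigStep(
    "Create endpoint config",
    endpoint_config_name=training_job_name,
    model_name=training_job_name,
    initial_instance_count=1,
    instance_type=params["instance_type"],
)

endpoint_step = EndpointStep(
    "Create endpoint",
    endpoint_name=training_job_name,
    endpoint_config_name=training_job_name,
)

# Route endpoint failures to the inference Fail state defined earlier
endpoint_config_step.add_catch(catch_state_inference)
endpoint_step.add_catch(catch_state_inference)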

In [294]:
# Add catch states to handle failures for processing, training, and model steps
processing_step.add_catch(catch_state_processing)
training_step.add_catch(catch_state_training)
model_step.add_catch(catch_state_save_model)

In [294]:
# Create a workflow graph by chaining the processing, training, model, endpoint config, and endpoint steps
workflow_graph = Chain([processing_step, training_step, model_step, endpoint_config_step, endpoint_step])
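`Chain` links each step to the next with `Next` and marks the last step with `End`, while `add_catch` adds a `Catch` entry that routes `States.TaskFailed` to a Fail state. The stdlib-only sketch below builds a stripped-down Amazon States Language (ASL) definition with the same linear shape, to show roughly what the workflow submits to Step Functions. It is a simplified illustration, not the SDK's output: state names are shortened and each state's `Parameters` block is omitted.

```python
import json


def linear_state_machine(tasks, catch_targets):
    """Build a minimal ASL definition for a linear chain of Task states.

    tasks: list of (state_name, resource_arn) pairs in execution order.
    catch_targets: {state_name: (fail_state_name, cause)} for Task states whose
    States.TaskFailed errors should be routed to a Fail state.
    """
    states = {}
    for i, (name, resource) in enumerate(tasks):
        state = {"Type": "Task", "Resource": resource}
        if i + 1 < len(tasks):
            state["Next"] = tasks[i + 1][0]  # continue with the next step
        else:
            state["End"] = True  # the last step ends the execution
        if name in catch_targets:
            fail_name, cause = catch_targets[name]
            state["Catch"] = [{"ErrorEquals": ["States.TaskFailed"], "Next": fail_name}]
            states[fail_name] = {"Type": "Fail", "Cause": cause}
        states[name] = state
    return {"StartAt": tasks[0][0], "States": states}


definition = linear_state_machine(
    tasks=[
        ("Preprocessing", "arn:aws:states:::sagemaker:createProcessingJob.sync"),
        ("Training", "arn:aws:states:::sagemaker:createTrainingJob.sync"),
        ("Save model", "arn:aws:states:::sagemaker:createModel"),
        ("Create endpoint config", "arn:aws:states:::sagemaker:createEndpointConfig"),
        ("Create endpoint", "arn:aws:states:::sagemaker:createEndpoint"),
    ],
    catch_targets={
        "Preprocessing": ("ML Workflow failed", "SageMakerProcessingJobFailed"),
        "Training": ("ML Training failed", "SageMakerTrainingJobFailed"),
        "Save model": ("ML Save Model failed", "SageMakerSaveModelJobFailed"),
    },
)
print(json.dumps(definition, indent=2))
```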

In [294]:
# Create a Step Functions workflow from the graph and execution role;
# job_name() makes the name unique (MultiLabelClassification-MlOpsWorkflow--<timestamp>)
branching_workflow = Workflow(
    name=job_name("MlOpsWorkflow"),  # Specify the workflow name
    definition=workflow_graph,  # Use the workflow graph as the definition
    role=workflow_execution_role,  # Specify the execution role
)

In [294]:
# Create the Step Functions workflow
branching_workflow.create()

## Execute Workflow and Deploy <a class="anchor" id="execute-workflow"></a>

In [294]:
# Execute the Step Functions workflow with input parameters
execution = branching_workflow.execute(
    inputs={
        "PreprocessingJobName": job_name("PreprocessingJobName"),
        "TrainingJobName": job_name("TrainingJobName"),
        # Only PreprocessingJobName is referenced by a workflow step;
        # the training step uses its own timestamped job name
    }
)

In [294]:
# Get the output of the workflow execution (non-blocking, does not wait for completion)
execution.get_output(wait=False)
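Because `get_output(wait=False)` returns immediately, it is often useful to poll the execution until it reaches a terminal status. The sketch below is a generic, hypothetical polling loop; the status function is injected, so it runs here against a simulated sequence. Against the real execution you could pass `lambda: execution.describe()["status"]`, or simply call `execution.get_output(wait=True)` to block until the run finishes.

```python
import time

# Terminal statuses of a Step Functions execution
TERMINAL_STATUSES = {"SUCCEEDED", "FAILED", "TIMED_OUT", "ABORTED"}


def wait_for_status(get_status, poll_seconds=30, timeout_seconds=3600, sleep=time.sleep):
    """Poll get_status() until it returns a terminal status; raise TimeoutError otherwise."""
    waited = 0
    while True:
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        if waited >= timeout_seconds:
            raise TimeoutError(f"Execution still {status} after {waited} seconds")
        sleep(poll_seconds)
        waited += poll_seconds


# Simulated status sequence instead of a real execution; sleeping is skipped.
statuses = iter(["RUNNING", "RUNNING", "SUCCEEDED"])
print(wait_for_status(lambda: next(statuses), sleep=lambda s: None))
```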

## Prediction <a class="anchor" id="prediction"></a>

In [None]:
from sagemaker.huggingface import HuggingFacePredictor

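# Create a predictor for the deployed endpoint. The name below comes from an
# earlier run; replace it with the endpoint created by your workflow execution.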
predictor = HuggingFacePredictor(
    endpoint_name="multi-label-classification--20230922090633",
    sagemaker_session=sagemaker_session,
)

# Define the input data as a list of strings
inputs = ["Streamlining Machine Learning Model Deployment with CI/CD and MLOps"]

# Create the payload as a JSON object
payload = {
    "inputs": inputs,
    "parameters": {"return_all_scores": True},
}

# Make predictions
predictions = predictor.predict(payload)

# Print the predictions
print(predictions)
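For multi-label classification, the per-label scores usually need to be turned into a set of labels. The sketch below assumes the response holds, for each input, a list of `{"label": ..., "score": ...}` dictionaries (the usual shape when `return_all_scores` is enabled) and keeps every label whose score reaches a threshold. The helper, the example response, and the 0.5 threshold are all illustrative assumptions. Depending on the `transformers` version and model configuration, the pipeline may normalize scores with a softmax across labels; in that case the scores sum to 1 and a fixed 0.5 threshold will rarely select more than one label, so check how your endpoint computes scores before choosing a threshold.

```python
def labels_above_threshold(predictions, threshold=0.5):
    """Turn per-input lists of {'label', 'score'} dicts into sorted label lists."""
    results = []
    for scores in predictions:
        chosen = [item["label"] for item in scores if item["score"] >= threshold]
        results.append(sorted(chosen))
    return results


# Made-up response for one input; real label names depend on the model config.
example_predictions = [
    [
        {"label": "LABEL_0", "score": 0.91},
        {"label": "LABEL_1", "score": 0.12},
        {"label": "LABEL_2", "score": 0.64},
    ]
]
print(labels_above_threshold(example_predictions))
```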
