# **ZenFlow AI: Real-Time Model Updating and Custom Monitoring Dashboard**

## **Project Overview**
ZenFlow AI is a sophisticated MLOps project designed to showcase real-time model updating and custom monitoring capabilities. The project integrates continuous model deployment with real-time monitoring to ensure optimal performance and timely updates.

### **1. Real-Time Model Updating Pipeline**
   * **Objective:** Automatically update and redeploy the model when new data is available, ensuring that the model remains accurate and relevant.
   * **Pipeline Steps:**
     * **Data Ingestion:** Implemented in the `check_new_data` step, which checks whether new data is available and, if so, lets the pipeline proceed to retraining. The current implementation uses placeholder logic that always reports new data.
     * **Model Training:** Implemented in the `train_model` step, where the Hugging Face model is retrained on the new data.
     * **Deployment:** Implemented in the `deploy_model` step, which deploys the model to an AWS SageMaker endpoint from a model archive stored in S3.
     * **Triggering Mechanism:** Currently, the pipeline is triggered manually, but it can be extended with event-based triggers (for example, an AWS Lambda function) for automation.
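As a sketch of the event-based option, assume that new training files are uploaded to an S3 bucket, and that the bucket sends `ObjectCreated` notifications to a Lambda function. A handler could extract the new object keys from the event and decide whether to start a pipeline run. The `training-data/` prefix, the helper name `new_object_keys`, and the handler's return value are hypothetical. How the handler actually launches the ZenML pipeline is not shown.

```python
from urllib.parse import unquote_plus


def new_object_keys(event: dict, prefix: str = "") -> list:
    """Return the keys of newly created objects in an S3 event notification.

    Object keys in S3 notifications are URL-encoded, so they are decoded here.
    Only keys starting with `prefix` are returned.
    """
    keys = []
    for record in event.get("Records", []):
        # Ignore anything that is not an S3 object-creation event
        if record.get("eventSource") != "aws:s3":
            continue
        if not record.get("eventName", "").startswith("ObjectCreated:"):
            continue
        key = unquote_plus(record["s3"]["object"]["key"])
        if key.startswith(prefix):
            keys.append(key)
    return keys


def lambda_handler(event, context):
    """Hypothetical Lambda entry point that decides whether retraining is needed."""
    keys = new_object_keys(event, prefix="training-data/")  # assumed prefix
    if not keys:
        return {"retrain": False, "new_keys": []}
    # Placeholder: trigger the ZenML pipeline run here (mechanism not shown)
    return {"retrain": True, "new_keys": keys}
```

Filtering on the prefix and the event type means that deletions and unrelated uploads do not cause retraining.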

### **2. Custom Monitoring Dashboard**
   * **Objective:** Provide real-time monitoring and alerting for the deployed model using a custom dashboard.
   * **Dashboard Components:**
     * **Monitoring Setup:** Integrated with AWS CloudWatch to gather endpoint metrics such as `ModelLatency` and invocation errors (`Invocation4XXErrors`, `Invocation5XXErrors`).
     * **Dashboard UI:** Built using Streamlit, this dashboard visualizes the collected metrics to give insights into the model's performance.
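     * **Alerting:** CloudWatch alarms can notify you when key performance indicators (KPIs) such as latency or error counts cross expected thresholds. Model accuracy is not among the endpoint's built-in invocation metrics, so it would have to be computed separately and published to CloudWatch, for example as a custom metric.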
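A latency alarm could be described as a set of keyword arguments for the CloudWatch client's `put_metric_alarm` call. The sketch below assumes the endpoint uses the SageMaker Python SDK's default variant name `AllTraffic`. The 5-minute period, the two evaluation periods, and the optional SNS topic are illustrative choices rather than project settings. SageMaker reports `ModelLatency` in microseconds, so the helper converts a millisecond threshold to microseconds.

```python
def latency_alarm_params(endpoint_name: str, threshold_ms: float,
                         variant_name: str = "AllTraffic",
                         sns_topic_arn=None) -> dict:
    """Build keyword arguments for CloudWatch `put_metric_alarm` (a sketch)."""
    params = {
        "AlarmName": f"{endpoint_name}-model-latency-high",
        "Namespace": "AWS/SageMaker",
        "MetricName": "ModelLatency",
        # Endpoint metrics are published per endpoint and production variant
        "Dimensions": [
            {"Name": "EndpointName", "Value": endpoint_name},
            {"Name": "VariantName", "Value": variant_name},
        ],
        "Statistic": "Average",
        "Period": 300,                        # evaluate 5-minute averages
        "EvaluationPeriods": 2,               # alarm after two consecutive breaches
        "Threshold": threshold_ms * 1000.0,   # milliseconds -> microseconds
        "ComparisonOperator": "GreaterThanThreshold",
        "TreatMissingData": "notBreaching",   # no traffic is not an alarm
    }
    if sns_topic_arn:
        # Notify an SNS topic (assumed to exist) when the alarm fires
        params["AlarmActions"] = [sns_topic_arn]
    return params


# Usage (requires AWS credentials and an existing endpoint):
# import boto3
# boto3.client("cloudwatch", region_name="us-west-2").put_metric_alarm(
#     **latency_alarm_params("your-endpoint-name", threshold_ms=500))
```

Keeping the parameters in a plain dictionary makes them easy to review before any alarm is created.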

---

### **Detailed Pipeline Steps**

#### **Data Ingestion Step (`check_new_data`)**
This step checks if new data is available for training. It acts as a trigger for the entire pipeline, ensuring that the model is only retrained when new data is detected.

#### **Model Training Step (`train_model`)**
If new data is detected, this step retrains the Hugging Face model. The training process includes loading the dataset, tokenizing the data, and fine-tuning the model using the new data.

#### **Deployment Step (`deploy_model`)**
Once the model is trained, it is automatically deployed to AWS SageMaker. For the latest model version to be the one in production, the S3 model archive that this step deploys must contain the newly fine-tuned weights.




In [1]:
# Connect Google drive
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [10]:
# Import necessary libraries and packages
import os
import boto3
import datetime
import streamlit as st
import pandas as pd

from zenml import pipeline, step
from sagemaker import Session
from transformers import PreTrainedModel, AutoModelForSequenceClassification, AutoTokenizer, Trainer, TrainingArguments
from datasets import load_dataset

from sagemaker.huggingface import HuggingFaceModel
from typing import Optional

### **Step 1: Set Up AWS Credentials and Initialize S3 Client**

Before interacting with AWS services, you need to configure your credentials. This is done by setting environment variables for your AWS access key, secret access key, and default region. These credentials are required to authenticate your requests to AWS.

Additionally, an S3 client is initialized with `boto3`, the AWS SDK for Python. This client is used to upload and manage files in Amazon S3, AWS's object storage service.

Replace the placeholder values with your actual AWS credentials and region.


In [3]:
# Set up AWS credentials
os.environ['AWS_ACCESS_KEY_ID'] = 'YOUR_AWS_ACCESS_KEY_ID'  # Replace with your AWS access key ID
os.environ['AWS_SECRET_ACCESS_KEY'] = 'YOUR_AWS_SECRET_ACCESS_KEY'  # Replace with your AWS secret access key
os.environ['AWS_DEFAULT_REGION'] = 'us-west-2'  # Replace with your AWS region (the SageMaker and CloudWatch cells below use us-west-2)

# Initialize S3 client for uploading and managing files in S3
s3_client = boto3.client('s3')

INFO:botocore.credentials:Found credentials in environment variables.
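Instead of typing keys directly into the cell, you could prompt for them at runtime. This keeps the secrets out of the notebook source and its saved outputs. The following is a minimal sketch that uses the standard library's `getpass`. The helper name `set_aws_credentials` is hypothetical.

```python
import os
from getpass import getpass


def set_aws_credentials(region: str, prompt=getpass) -> None:
    """Prompt for AWS keys and store them in the environment variables boto3 reads.

    `prompt` defaults to getpass, which does not echo the input.
    """
    os.environ["AWS_ACCESS_KEY_ID"] = prompt("AWS access key ID: ")
    os.environ["AWS_SECRET_ACCESS_KEY"] = prompt("AWS secret access key: ")
    os.environ["AWS_DEFAULT_REGION"] = region


# Usage: call before creating any boto3 clients
# set_aws_credentials("us-west-2")
```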

### **Step 2: Check for New Data Availability**

In this step, the pipeline checks whether new data is available for training. The `check_new_data` step simulates this check by always returning `True`, indicating that new data is available. This is placeholder logic. In a real-world scenario, you would implement an actual check to determine whether new data has been added since the last training run, for example by querying a database, an API, or a storage location.

This check ensures that the model is retrained only when necessary.


In [4]:
# Define a step to check for new data to see if it's available for training
@step
def check_new_data() -> bool:
    """
    Step to check if new data is available.

    Returns:
        bool: True if new data is detected, otherwise False.
    """
    new_data_available= True  # Placeholder logic to simulate new data detection
    return new_data_available
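One way to replace the placeholder assumes two things. First, training data is uploaded under an S3 prefix. Second, the time of the last training run is recorded somewhere, for example alongside the previous model. With these in place, the check can list the prefix and look for objects modified after that time. `filter_new_objects` and `new_data_in_s3` are hypothetical helpers, and the bucket, prefix, and `since` timestamp are assumptions. S3 returns timezone-aware `LastModified` values in UTC, so `since` must also be timezone-aware.

```python
import datetime


def filter_new_objects(objects, since: datetime.datetime) -> list:
    """Return the keys of objects modified after `since` (timezone-aware)."""
    return [obj["Key"] for obj in objects if obj["LastModified"] > since]


def new_data_in_s3(s3_client, bucket: str, prefix: str,
                   since: datetime.datetime) -> bool:
    """Return True if any object under `prefix` changed after `since`.

    Uses the list_objects_v2 paginator so prefixes with more than 1,000
    objects are covered; pages without objects have no 'Contents' key.
    """
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        if filter_new_objects(page.get("Contents", []), since):
            return True
    return False


# Hypothetical use inside check_new_data:
# return new_data_in_s3(boto3.client("s3"), "my-bucket", "training-data/", last_trained_at)
```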

### **Step 3: Model Training**

In this step, the pipeline fine-tunes the Hugging Face `DistilBERT` model. The `train_model` step only trains when new data has been detected. The process includes:

- **Loading a Pre-Trained Model:** The `DistilBERT` model and tokenizer are loaded from Hugging Face's model hub.
- **Data Preparation:** The first 10% of the GLUE MRPC training split is loaded and tokenized as sentence pairs.
- **Training Process:** The model is fine-tuned using the `Trainer` class, which handles the training loop and optimization.
- **Output:** The fine-tuned model is returned, ready for deployment.

If no new data is available, the step still runs but skips training and returns `None`.


In [5]:
# Define a step for model training
@step
def train_model(new_data_available: bool) -> Optional[PreTrainedModel]:
    """
    Step to train the model if new data is available.

    Args:
        new_data_available (bool): Indicates if new data is available for training.

    Returns:
        model: The trained model or None if no training occurred.
    """
    if new_data_available:
        # Load a pre-trained DistilBERT model and tokenizer from Hugging Face
        model_name= 'distilbert-base-uncased'
        model= AutoModelForSequenceClassification.from_pretrained(model_name)
        tokenizer= AutoTokenizer.from_pretrained(model_name)

        # Load and preprocess dataset (using GLUE MRPC as an example)
        dataset= load_dataset('glue', 'mrpc', split= 'train[:10%]')
        def tokenize(batch):
            return tokenizer(batch['sentence1'], batch['sentence2'], padding= True, truncation= True)
        dataset= dataset.map(tokenize, batched= True)

        # Define training arguments
        training_args= TrainingArguments(
            output_dir= './results',
            num_train_epochs= 1,
            per_device_train_batch_size= 8,
            logging_dir= './logs',
        )

        # Initialize Trainer and start fine-tuning the model
        trainer= Trainer(model= model, args= training_args, train_dataset=dataset)
        trainer.train()

        return model  # Return the fine-tuned model
    return None  # Return None if no training occurred

### **Step 4: Model Deployment to AWS SageMaker**

This step handles the deployment of the trained model to AWS SageMaker, making it accessible for real-time predictions. The `deploy_model` function includes:

- **SageMaker Session Initialization:** A SageMaker session is set up in the specified AWS region.
- **IAM Role Definition:** The SageMaker execution role is defined, providing necessary permissions.
- **Model Setup:** The Hugging Face model is configured for deployment. The configuration specifies the S3 location of the model archive and the Transformers, PyTorch, and Python versions of the inference container.
- **Deployment:** The model is deployed to an endpoint, making it ready for inference.

If no model is provided, deployment is skipped.


In [6]:
# Define a step to deploy the model to AWS SageMaker
@step
def deploy_model(model: Optional[PreTrainedModel]):
    """
    Step to deploy the trained model to AWS SageMaker.

    Args:
        model (Optional[PreTrainedModel]): The trained model to be deployed. If None, deployment is skipped.
    """
    if model:

        # Set up SageMaker session
        region= 'us-west-2'  # AWS region
        sagemaker_session= Session(boto_session= boto3.Session(region_name= region))

        # Define SageMaker IAM role with necessary permissions
        role= 'arn:aws:iam::339713129438:role/ZenFlowSageMakerExecutionRole' # Replace with your IAM role ARN

        # Set up the Hugging Face model for deployment in SageMaker
        huggingface_model= HuggingFaceModel(
            model_data='s3://sagemaker-us-west-2-339713129438/model.tar.gz', # S3 path to your packaged model archive; it must contain the fine-tuned weights for them to be deployed
            role= role,
            transformers_version= '4.6',
            pytorch_version= '1.7',
            py_version= 'py36',
            sagemaker_session= sagemaker_session
        )

        # Deploy the model to an endpoint
        predictor= huggingface_model.deploy(
            initial_instance_count= 1,
            instance_type= 'ml.m5.large' # Specify instance type for deployment
        )
        print(f'Model deployed to SageMaker with endpoint name: {predictor.endpoint_name}')
    else:
        print('No model to deploy.') # If no model is passed, skip deployment
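As written, `deploy_model` uses its `model` argument only to decide whether to deploy. The endpoint itself is created from whatever archive is stored at the fixed `model_data` path. The following is a minimal sketch for getting the freshly fine-tuned model into that archive. It assumes the model and tokenizer are first written to a local directory with `save_pretrained`. The files are then packed at the root of a `model.tar.gz`, which is the layout the SageMaker Hugging Face documentation describes for `model_data`. Finally, the archive is uploaded with the S3 client's `upload_file`. The helper names, local paths, bucket, and key are assumptions.

```python
import os
import tarfile


def package_model_dir(model_dir: str, archive_path: str) -> str:
    """Pack every file in `model_dir` at the root of a gzipped tarball.

    `archive_path` should not be inside `model_dir`, or the archive would
    try to include itself.
    """
    with tarfile.open(archive_path, "w:gz") as tar:
        for name in sorted(os.listdir(model_dir)):
            # arcname=name stores files at the archive root, not under model_dir/
            tar.add(os.path.join(model_dir, name), arcname=name)
    return archive_path


def upload_model_archive(s3_client, archive_path: str, bucket: str, key: str) -> str:
    """Upload the archive and return its S3 URI for HuggingFaceModel(model_data=...)."""
    s3_client.upload_file(archive_path, bucket, key)
    return f"s3://{bucket}/{key}"


# Hypothetical usage after training (train_model would also need to save the tokenizer):
# model.save_pretrained("model_dir", safe_serialization=False)  # writes pytorch_model.bin
# tokenizer.save_pretrained("model_dir")
# archive = package_model_dir("model_dir", "model.tar.gz")
# model_data = upload_model_archive(s3_client, archive, "my-bucket", "zenflow/model.tar.gz")
```

Writing `pytorch_model.bin` instead of `model.safetensors` is a cautious choice here. The inference container pinned in `deploy_model` (Transformers 4.6) may predate safetensors support.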

In [7]:
!zenml init

[1;35mNumExpr defaulting to 2 threads.[0m
[?25l[1;35mInitializing the ZenML global configuration version to 0.64.0[0m
[1;35mCreating database tables[0m
[32m⠋[0m Initializing ZenML repository at /content.
[2K[1A[2K[32m⠙[0m Initializing ZenML repository at /content.
[2K[1A[2K[32m⠹[0m Initializing ZenML repository at /content.
[2K[1A[2K[32m⠸[0m Initializing ZenML repository at /content.
[2K[1A[2K[32m⠼[0m Initializing ZenML repository at /content.
[2K[1A[2K[32m⠴[0m Initializing ZenML repository at /content.
[2K[1A[2K[32m⠦[0m Initializing ZenML repository at /content.
[2K[1A[2K[32m⠧[0m Initializing ZenML repository at /content.
[2K[1A[2K[32m⠇[0m Initializing ZenML repository at /content.
[2K[1A[2K[32m⠏[0m Initializing ZenML repository at /content.
[2K[1A[2K[32m⠋[0m Initializing ZenML repository at /content.
[2K[1A[2K[32m⠙[0m Initializing ZenML repository at /content.
[2K[1A[2K[32m⠹[0m Initializing ZenML repository at /cont

### **Step 5: ZenFlow AI Pipeline Integration**

The `zenflow_pipeline` function orchestrates the entire MLOps process by integrating all the key steps:

- **Data Ingestion:** First, it checks if new data is available using the `check_new_data` step.
- **Model Training:** The `train_model` step fine-tunes the model when new data was found and returns `None` otherwise.
- **Model Deployment:** If a model was returned, the `deploy_model` step deploys to AWS SageMaker; otherwise deployment is skipped.

Calling `zenflow_pipeline()` runs the pipeline immediately on the active ZenML stack, which in this notebook is the local `default` stack.


In [8]:
# The Zenflow AI pipeline integrates data ingestion, model training and deployment
@pipeline
def zenflow_pipeline():

  # Define the steps in the pipeline
  new_data_available= check_new_data()
  model= train_model(new_data_available= new_data_available)
  deploy_model(model= model)

# Run the pipeline directly
zenflow_pipeline()

Initiating a new run for the pipeline: zenflow_pipeline.


Dask dataframe query planning is disabled because dask-expr is not installed.

You can install it with `pip install dask[dataframe]` or `conda install dask`.
This will raise in a future version.



[1;35mRegistered new pipeline: [0m[1;36mzenflow_pipeline[1;35m.[0m
[1;35mExecuting a new run.[0m
[1;35mUsing user: [0m[1;36mdefault[1;35m[0m
[1;35mUsing stack: [0m[1;36mdefault[1;35m[0m
[1;35m  artifact_store: [0m[1;36mdefault[1;35m[0m
[1;35m  orchestrator: [0m[1;36mdefault[1;35m[0m
[1;35mYou can visualize your pipeline runs in the [0m[1;36mZenML Dashboard[1;35m. In order to try it locally, please run [0m[1;36mzenml up[1;35m.[0m
[1;35mStep [0m[1;36mcheck_new_data[1;35m has started.[0m
[1;35mStep [0m[1;36mcheck_new_data[1;35m has finished in [0m[1;36m0.176s[1;35m.[0m
[1;35mStep [0m[1;36mcheck_new_data[1;35m completed successfully.[0m
[1;35mStep [0m[1;36mtrain_model[1;35m has started.[0m
The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
Yo

Some weights of DistilBertForSequenceClassification were not initialized from the model checkpoint at distilbert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight', 'pre_classifier.bias', 'pre_classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


No materializer is registered for type <class 'transformers.models.distilbert.modeling_distilbert.DistilBertForSequenceClassification'>, so the default Pickle materializer was used. Pickle is not production ready and should only be used for prototyping as the artifacts cannot be loaded when running with a different Python version. Please consider implementing a custom materializer for type <class 'transformers.models.distilbert.modeling_distilbert.DistilBertForSequenceClassification'> according to the instructions at https://docs.zenml.io/how-to/handle-data-artifacts/handle-custom-data-types
Step train_model has finished in 4m4s.
Step train_model completed successfully.
Step deploy_model has started.
INFO:botocore.credentials:Found credentials in environment variables.
INFO:

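Once `deploy_model` prints an endpoint name, you can send a request through the `sagemaker-runtime` client. This quick check also produces the invocation traffic that CloudWatch endpoint metrics are computed from. The following is a minimal sketch. It assumes the endpoint accepts the Hugging Face inference toolkit's `{"inputs": ...}` JSON format. The endpoint name and sample sentence are placeholders, and the exact response shape depends on the container.

```python
import json


def build_payload(text: str) -> str:
    """JSON request body in the {"inputs": ...} format (assumed container format)."""
    return json.dumps({"inputs": text})


def invoke_endpoint(runtime_client, endpoint_name: str, text: str):
    """Send one request to a SageMaker endpoint and decode the JSON response."""
    response = runtime_client.invoke_endpoint(
        EndpointName=endpoint_name,
        ContentType="application/json",
        Body=build_payload(text),
    )
    # The response body is a stream; read it fully and parse the JSON
    return json.loads(response["Body"].read())


# Usage (requires a deployed endpoint and AWS credentials):
# import boto3
# runtime = boto3.client("sagemaker-runtime", region_name="us-west-2")
# print(invoke_endpoint(runtime, "your-endpoint-name", "The company said profits rose."))
```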

### **Step 6: Integrating CloudWatch Monitoring**

In this step, a `boto3` CloudWatch client is initialized to retrieve the metrics that SageMaker publishes for the deployed endpoint. The `get_cloudwatch_metrics` function fetches the average of a given metric, such as latency, over a time window (the last hour by default). SageMaker publishes endpoint metrics such as `ModelLatency` per `EndpointName` and `VariantName` dimension, so a query generally has to specify these dimensions to return any data.

- **CloudWatch Initialization:** The `cloudwatch` client is set up to interact with the AWS CloudWatch service.
- **Metric Retrieval Function:** The `get_cloudwatch_metrics` function retrieves the datapoints for a given metric and namespace, and the test call prints them. You can adapt it to whichever metrics and namespaces you want to monitor.


In [12]:
# Initialize the boto3 client for CloudWatch
cloudwatch= boto3.client('cloudwatch', region_name= 'us-west-2')

# Function to get CloudWatch metrics
def get_cloudwatch_metrics(metric_name, namespace, dimensions= None, period= 300, start_time= None, end_time= None):
  # Default to the last hour, using timezone-aware UTC timestamps
  if end_time is None:
    end_time= datetime.datetime.now(datetime.timezone.utc)
  if start_time is None:
    start_time= end_time - datetime.timedelta(hours= 1)

  response= cloudwatch.get_metric_statistics(
      Namespace= namespace,
      MetricName= metric_name,
      Dimensions= dimensions or [],  # SageMaker endpoint metrics require EndpointName/VariantName dimensions
      StartTime= start_time,
      EndTime= end_time,
      Period= period,
      Statistics= ['Average']
  )

  return response['Datapoints']

# Test with the endpoint's ModelLatency metric (replace with your endpoint name)
endpoint_dimensions= [
    {'Name': 'EndpointName', 'Value': 'your-endpoint-name'},
    {'Name': 'VariantName', 'Value': 'AllTraffic'},
]
data= get_cloudwatch_metrics('ModelLatency', 'AWS/SageMaker', dimensions= endpoint_dimensions)
print(data)

[]
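The overview describes a Streamlit dashboard built on these metrics. The following is a minimal sketch of its plotting part. It assumes the datapoints list returned by `get_cloudwatch_metrics` (stored in `data` above) and a script launched with `streamlit run`. `sort_datapoints` and `render_dashboard` are hypothetical names, not the project's actual dashboard code. CloudWatch does not guarantee that datapoints come back in chronological order, so they are sorted before plotting.

```python
def sort_datapoints(datapoints, statistic="Average"):
    """Sort CloudWatch datapoints by Timestamp and return (timestamps, values)."""
    ordered = sorted(datapoints, key=lambda dp: dp["Timestamp"])
    return [dp["Timestamp"] for dp in ordered], [dp[statistic] for dp in ordered]


def render_dashboard(datapoints, title="ZenFlow AI: Endpoint Monitoring"):
    """Draw the metric series with Streamlit (run the script via `streamlit run`)."""
    # Imported here so the sorting helper can be used without Streamlit installed
    import pandas as pd
    import streamlit as st

    st.title(title)
    timestamps, values = sort_datapoints(datapoints)
    if not values:
        st.info("No datapoints returned for the selected time window.")
        return
    st.metric("Latest average", f"{values[-1]:.2f}")
    frame = pd.DataFrame({"Average": values}, index=pd.to_datetime(timestamps))
    st.line_chart(frame)


# Usage in a Streamlit script:
# render_dashboard(data)
```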
