## Deploy JumpStart and Non-JumpStart Models Asynchronously
---------------------
*This notebook works best with the conda_python3 kernel on an ml.t3.medium machine*.

**This step of our solution design covers setting up the environment, downloading the requirements needed to run it, and deploying the model endpoints listed in the config.yml file asynchronously.**

1. Prerequisite: open 0_setup.ipynb and run its cell to install the dependencies listed in requirements.txt.

2. Now you can run this notebook to deploy the models asynchronously in different threads. The key components of this notebook are:

- Loading the globals.py and config.yml file.

- Defining a blocking function, deploy_model, that deploys a single model endpoint.

- Defining a series of async functions that schedule deployment of the models from the config.yml file concurrently in different threads.

- Once the endpoints are deployed, their model configurations are stored within the endpoints.json file.
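
As a rough illustration of what one record in endpoints.json might look like, the sketch below serializes a sample list the same way this notebook does (`json.dumps` with `indent=2` and `default=str`). The key names mirror the records built later in this notebook; every value is a made-up placeholder, and `sample_endpoint_info` is a hypothetical name.

```python
import json

# Placeholder values only; the key names follow the records this notebook builds.
sample_endpoint_info = [
    {
        "experiment_name": "example-experiment",
        "endpoint": {"EndpointName": "example-endpoint"},
        "instance_type": "ml.g5.24xlarge",
        "instance_count": 1,
        "deployed": True,
        "model_config": None,
    }
]

# Serialize the list to a JSON string, as done before the upload step
endpoints_json = json.dumps(sample_endpoint_info, indent=2, default=str)
print(endpoints_json)
```

Note that `None` is written as JSON `null`, so `model_config` reads back as `None` when the file is parsed again.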


#### Import all of the necessary libraries below to run this notebook

In [None]:
# if interactive mode is set to no -> pick up fmbench from the Python installation path
# if interactive mode is set to yes -> pick up fmbench from the current path (one level above this notebook)
# if interactive mode is not defined -> pick up fmbench from the current path (one level above this notebook)
# the premise is that a non-interactive run can only happen through main.py, which sets interactive mode to no
import os
import sys
if os.environ.get("INTERACTIVE_MODE_SET", "yes") == "yes":
    sys.path.append(os.path.dirname(os.getcwd()))

In [ ]:
import sys
import time
import json
import boto3
import asyncio
import logging
import importlib.util
import fmbench.scripts
from pathlib import Path
from fmbench.utils import *
from fmbench.globals import *
from fmbench.scripts import constants
from typing import Dict, List, Optional
import importlib.resources as pkg_resources
from botocore.exceptions import ClientError
from botocore.exceptions import NoCredentialsError

#### Pygmentize globals.py to view and use any of the globally initialized variables 

#### Set up a logger to log all messages while the code runs

In [None]:
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Remove existing handlers
logger.handlers.clear()

# Add a simple handler
handler = logging.StreamHandler()
formatter = logging.Formatter('[%(asctime)s] p%(process)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)

### Load the config.yml file
------

The config.yml file contains information that is used across this benchmarking environment, such as the AWS account, prompts, payloads to be used for invocations, and model configurations such as the model version, the endpoint name, and the model_id to be deployed. Each model configuration also gives the instance type to use (for example, "ml.g5.24xlarge"), the image URI, whether or not to deploy the model, and an inference script ("jumpstart.py" for JumpStart models) that this notebook uses to deploy the model.

This notebook loads the config.yml file, logs its contents, and uses it throughout to deploy the model endpoints asynchronously.
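
To make the shape of the configuration concrete, here is a minimal sketch of the loaded config as a Python dictionary. It includes only the keys that this notebook's code reads (`aws.region`, `aws.bucket`, and the per-experiment fields); the real config.yml contains more settings, all values below are invented placeholders, and `experiments_to_deploy` is a hypothetical helper rather than part of fmbench.

```python
from typing import Any, Dict, List

# Placeholder values; only the key names are taken from this notebook's code.
sample_config: Dict[str, Any] = {
    "aws": {"region": "us-east-1", "bucket": "example-bucket"},
    "experiments": [
        {
            "name": "example-experiment-1",
            "model_id": "example-model-id-1",
            "ep_name": "example-endpoint-1",
            "instance_type": "ml.g5.24xlarge",
            "instance_count": 1,
            "deploy": True,
        },
        {
            "name": "example-experiment-2",
            "model_id": "example-model-id-2",
            "ep_name": "example-endpoint-2",
            "instance_type": "ml.g5.24xlarge",
            "instance_count": 1,
            "deploy": False,
        },
    ],
}


def experiments_to_deploy(config: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Hypothetical helper: keep only experiments whose deploy flag is True."""
    return [e for e in config["experiments"] if e.get("deploy", False) is True]


for experiment in experiments_to_deploy(sample_config):
    print(experiment["name"], experiment["instance_type"])
```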

In [ ]:
## Load the config.yml file from the CONFIG_FILE path set in globals.py
config = load_main_config(CONFIG_FILE)

## configure the aws region
aws_region = config['aws']['region']

logger.info(f"aws_region={aws_region}")
logger.info(f"config={json.dumps(config, indent=2)}")

#### Deploy a single model: blocking function used for asynchronous deployment

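This function handles a single large language model endpoint. It takes three parameters: experiment_config (a dictionary containing configuration details for the model deployment from the config.yml file), aws_region (the AWS region where the model will be deployed), and role_arn (the AWS role's Amazon Resource Name used for the deployment). If the experiment's deploy flag is False or missing, the function skips deployment and returns a record with deployed=False. Otherwise, as written here for Bedrock models, which are already hosted, it registers the endpoint and returns a record with deployed=True. It returns None if registration fails.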

In [None]:
# Initialize a module-level flag that records whether any of the endpoints are deployed on SageMaker
# the flag is False by default and is meant to be changed to True if a model is deployed on SageMaker
any_ep_on_sagemaker: bool = False

In [ ]:
def deploy_model(experiment_config: Dict, aws_region: str, role_arn: str) -> Optional[Dict]:
    # Log the deployment details
    logger.info(f"going to deploy {experiment_config}, in {aws_region}")
    model_deployment_result: Optional[Dict] = None

    # For Bedrock models, we don't actually deploy anything - Bedrock models are already deployed
    # We simply register them in our endpoint list
    
    # Check if deployment is enabled in the config; skip if not
    deploy = experiment_config.get('deploy', False)
    if deploy is False:
        logger.info(f"skipping deployment of {experiment_config['model_id']} because deploy={deploy}")
        model_deployment_result = dict(endpoint_name=experiment_config['ep_name'], 
                                      experiment_name=experiment_config['name'], 
                                      instance_type=experiment_config['instance_type'], 
                                      instance_count=experiment_config['instance_count'],
                                      deployed=False)
        return model_deployment_result

    # For Bedrock models, create a simple record with the endpoint info
    # This simulates "deploying" the model by registering it in our system
    try:
        # Create a deployment result with the model info
        model_deployment_result = dict(
            endpoint_name=experiment_config['ep_name'],
            experiment_name=experiment_config['name'],
            instance_type=experiment_config['instance_type'],
            instance_count=experiment_config['instance_count'],
            deployed=True
        )
        
        logger.info(f"Registered Bedrock model {experiment_config['model_id']} as {experiment_config['ep_name']}")
        return model_deployment_result
    except Exception as error:
        logger.error(f"An error occurred during Bedrock model registration: {error}")
        return None

### Asynchronous Model Deployment
----

#### async_deploy_model: 

- This is an asynchronous wrapper around the deploy_model function. It uses asyncio.to_thread to run the synchronous deploy_model function in a separate thread. This allows the function to be awaited in an asynchronous context, enabling concurrent model deployments without blocking the main thread.

#### async_deploy_all_models Function: 

- This function deploys multiple models concurrently. It splits the experiments into batches of at most four (to avoid throttling exceptions), deploys each batch concurrently using asyncio.gather, and waits for a batch to finish before starting the next one.
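
The batching pattern described above can be tried in isolation with the following minimal sketch. It is not the notebook's deployment code: `blocking_deploy`, `deploy_one`, `deploy_in_batches`, and the sample data are hypothetical stand-ins, and the blocking work is simulated with a short sleep. `asyncio.to_thread` requires Python 3.9 or later.

```python
import asyncio
import time
from typing import Dict, List


def blocking_deploy(experiment: Dict) -> Dict:
    """Hypothetical stand-in for a blocking deployment call."""
    time.sleep(0.05)  # simulate slow, blocking work
    return {"experiment_name": experiment["name"], "deployed": True}


async def deploy_one(experiment: Dict) -> Dict:
    # Run the blocking function in a worker thread so the event loop stays free
    return await asyncio.to_thread(blocking_deploy, experiment)


async def deploy_in_batches(experiments: List[Dict], batch_size: int = 4) -> List[Dict]:
    results: List[Dict] = []
    for start in range(0, len(experiments), batch_size):
        batch = experiments[start:start + batch_size]
        # gather runs the whole batch concurrently and returns results in input order
        results.extend(await asyncio.gather(*(deploy_one(e) for e in batch)))
    return results


sample_experiments = [{"name": f"experiment-{i}"} for i in range(6)]
# In a standalone script we start the event loop with asyncio.run
batched_results = asyncio.run(deploy_in_batches(sample_experiments))
print(batched_results)
```

Inside a notebook cell an event loop is already running, so there one would typically write `await deploy_in_batches(sample_experiments)` instead of calling `asyncio.run`. Batches run one after another, so at most `batch_size` calls are in flight at any time.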

In [ ]:
## Asynchronous wrapper that lets the blocking deploy_model function run concurrently
async def async_deploy_model(experiment_config: Dict, aws_region: str) -> Optional[Dict]:
    # Run the deploy_model function in a separate thread to deploy the models asynchronously
    # (role_arn is passed as None)
    return await asyncio.to_thread(deploy_model, experiment_config, aws_region, None)

## Final asynchronous function to deploy all of the models concurrently
async def async_deploy_all_models(config: Dict) -> List[Optional[Dict]]:
    
    ## Extract experiments from the config.yml file (contains information on model configurations)
    experiments: List[Dict] = config['experiments']
    n: int = 4 # max concurrency so as to not get a throttling exception
    
    ## Split experiments into smaller batches for concurrent deployment
    experiments_splitted = [experiments[i * n:(i + 1) * n] for i in range((len(experiments) + n - 1) // n )]
    results = []
    for exp_list in experiments_splitted:
        
        ## deploy the current batch concurrently and wait for it to finish
        result = await asyncio.gather(*[async_deploy_model(m, config['aws']['region']) for m in exp_list])
        ## append the results from this batch to the overall list
        results.extend(result)
    return results

In [ ]:
# async version
s = time.perf_counter()

## Call all of the models for deployment using the config.yml file model configurations
endpoint_names = await async_deploy_all_models(config)

## Measure how long the deployment took
elapsed_async = time.perf_counter() - s
print(f"endpoint_names -> {endpoint_names}, deployed in {elapsed_async:0.2f} seconds")

In [ ]:
## Function to build the info record for one endpoint; the records are written to a json file later
def get_all_info_for_endpoint(ep: Dict) -> Optional[Dict]:
    ## read the endpoint name before the try block so it is always defined for the error log
    ep_name = ep.get('endpoint_name')
    try:
        ## extract the experiment name from the config.yml file
        experiment_name = ep['experiment_name']
        
        if ep_name is None:
            return None
            
        # For Bedrock models, construct endpoint info directly
        logger.info(f"ep_name={ep_name} is a Bedrock model endpoint")
        info = dict(
            experiment_name=experiment_name,
            endpoint={'EndpointName': ep_name},
            instance_type=ep['instance_type'],
            instance_count=ep['instance_count'],
            deployed=ep['deployed'],
            model_config=None
        )
        return info
    except Exception as e:
        logger.error(f"Error processing endpoint {ep_name}: {str(e)}")
        return None

## drop failed (None) entries before and after building the info records
all_info = [info for info in map(get_all_info_for_endpoint, filter(None, endpoint_names)) if info]

## list with one info dictionary per model endpoint
all_info

In [None]:
# Convert data to JSON
json_data = json.dumps(all_info, indent=2, default=str)

# Specify the file name
file_name = "endpoints.json"

# Write to S3
endpoint_s3_path = write_to_s3(json_data, config['aws']['bucket'], MODELS_DIR, "", file_name)

logger.info(f"deployed endpoint info is written to this file --> {endpoint_s3_path}")

In [None]:
# check that every experiment marked for deployment was actually deployed
# if the counts differ then raise an exception because we cannot run inference
# so there is no point in continuing further
# (the default of False matches deploy_model, which skips experiments without a deploy flag)
expected_deploy_count: int = len([e for e in config['experiments'] if e.get('deploy', False) is True])
actual_deploy_count: int = len([ep for ep in all_info if ep.get('deployed') is True])
assert_text: str = f"expected_deploy_count={expected_deploy_count} but actual_deploy_count={actual_deploy_count}, cannot continue"
assert expected_deploy_count == actual_deploy_count, assert_text