# Locust Load Testing for Embeddings Models

## Machine requirements

This notebook should be run on a machine with at least one NVIDIA GPU or one AWS AI chip (AWS Neuron device). 

In [None]:
import os
import subprocess
def _count_devices(command):
    """Run shell pipeline *command* and return the integer it prints.

    Each pipeline used below ends in ``wc -l``. Without ``pipefail`` the
    pipeline's exit status is that of ``wc``, so a missing vendor tool yields
    a count of 0 rather than an error. ``check=True`` still raises
    ``CalledProcessError`` if the pipeline as a whole fails.
    """
    result = subprocess.run(command, shell=True, check=True,
                            capture_output=True, encoding='utf-8')
    return int(result.stdout)


device = None
num_device = _count_devices('nvidia-smi --list-gpus | wc -l')
if num_device > 0:
    device = "cuda"
else:
    # No NVIDIA GPUs found; fall back to counting AWS Neuron devices.
    num_device = _count_devices('neuron-ls -j | grep neuron_device | wc -l')
    if num_device > 0:
        device = "neuron"

assert device is not None and num_device >= 1, \
    "Machine must have at least 1 Nvidia CUDA device, or 1 AWS Neuron device"
print(f"Auto detected {num_device} {device} devices")
os.environ['NUM_DEVICE'] = str(num_device)

Next, install required Python packages.

In [None]:
!pip install --upgrade pip
!pip install locust
!pip install datasets
!which locust

### Create Hugging Face User Access Token

Many of the popular Large Language Models (LLMs) in Hugging Face are [gated models](https://huggingface.co/docs/hub/en/models-gated). To access gated models, you need a Hugging Face [user access token](https://huggingface.co/docs/hub/en/security-tokens). Please create a Hugging Face user access token in your Hugging Face account, and set it in the `hf_token` variable below. 

In [None]:
import subprocess
import time
import os
import stat
import yaml
import json

# Hugging Face user access token; paste it between the quotes.
hf_token = ''
if not hf_token:
    # Fall back to a token already exported in the environment Jupyter runs in.
    hf_token = os.environ.get('HF_TOKEN', '')
# Comment out next line if not using a Hugging Face gated model
assert hf_token, "Hugging Face user access token is required for gated models"
os.environ['HF_TOKEN'] = hf_token

### Build Docker Containers

Next, we build the Docker containers used to run the inference endpoint locally on this desktop.


In [None]:
# NOTE(review): `source` is a bash builtin, so this assumes IPython runs `!`
# commands through bash. Each `!` line runs in its own shell process, so any
# variables the script sets do not persist into this kernel.
! source build-containers-embeddings.sh

### Specify Hugging Face Model Id

Next, set the Hugging Face Model Id for the embeddings model you want to test in `hf_model_id` variable, below. 

The variable `MAX_MODEL_LEN` should be set to the *minimum* of the maximum context length allowed by the model, and the maximum context length you want to use for your testing.

In [None]:
# Hugging Face repo id of the embeddings model under test.
hf_model_id = 'BAAI/bge-large-en-v1.5'

# Context length exported as MAX_MODEL_LEN: the smaller of the model's own
# limit and the length you want to test with.
max_model_len = 512
os.environ['MAX_MODEL_LEN'] = f"{max_model_len}"

### Snapshot Huggingface model

Below, we snapshot the Hugging Face model and store it on EFS. The model is downloaded only when some of its files are missing locally, so this normally happens once. To force a refresh of the model from the Hugging Face Hub, delete the local copy of the model from EFS.

To use EFS, we create a symbolic link from `/home/ubuntu/snapshots` to `/home/ubuntu/efs/home/snapshots` directory. Please ensure `/home/ubuntu/efs/home` exists and is owned by user `ubuntu`.

In [None]:
from huggingface_hub import snapshot_download, list_repo_files
from tempfile import TemporaryDirectory
from pathlib import Path
import shutil
import pwd

# Resolve the home directory. Unlike os.getenv('HOME'), expanduser falls back to
# the password database when HOME is unset instead of yielding None.
home_dir = os.path.expanduser("~")
efs_home = os.path.join(home_dir, "efs", "home")

assert os.path.isdir(efs_home), f"{efs_home} directory must exist"

stat_info = os.stat(efs_home)
owner_uid = stat_info.st_uid
owner_username = pwd.getpwuid(owner_uid).pw_name
assert owner_username == "ubuntu", f"{efs_home} must be owned by ubuntu"

# Keep model snapshots on EFS and expose them as ~/snapshots.
efs_snapshots = os.path.join(efs_home, "snapshots")
os.makedirs(efs_snapshots, exist_ok=True)
snapshots_dir = os.path.join(home_dir, "snapshots")
if not os.path.exists(snapshots_dir):
    os.symlink(efs_snapshots, snapshots_dir)  # create a symbolic link to EFS directory

hf_home = os.path.join(snapshots_dir, "huggingface")
os.makedirs(hf_home, exist_ok=True)

model_path = os.path.join(hf_home, hf_model_id)

# Treat an empty token as "no token": None lets huggingface_hub use its default
# token resolution instead of being handed an empty string. The same value is
# used for the file listing and the download so gated repos behave consistently.
hf_api_token = hf_token or None

hub_files = set(list_repo_files(repo_id=hf_model_id, repo_type="model", token=hf_api_token))

# Local files, as paths relative to the model root so they compare directly
# with the repo-relative names returned by list_repo_files.
local_files = {
    os.path.relpath(os.path.join(root, file), model_path)
    for root, _dirs, files in os.walk(model_path)
    for file in files
}

# Download only if some file in the Hub repo is absent locally.
missing_files = hub_files - local_files

if missing_files:
    print(f"Downloading HuggingFace model snapshot: {hf_model_id}")
    os.makedirs(model_path, exist_ok=True)
    with TemporaryDirectory(suffix="model", prefix="hf", dir="/tmp") as cache_dir:
        # snapshot_download returns the local snapshot folder directly, so there
        # is no need to search the cache layout for it.
        model_snapshot_path = snapshot_download(
            repo_id=hf_model_id, cache_dir=cache_dir, token=hf_api_token
        )
        print(f"Model snapshot: {model_snapshot_path} completed")

        print("Copying model snapshot files to EFS...")
        for root, _dirs, files in os.walk(model_snapshot_path):
            for file in files:
                full_path = os.path.join(root, file)
                relative_path = os.path.relpath(full_path, model_snapshot_path)
                dst_path = os.path.join(model_path, relative_path)
                os.makedirs(os.path.dirname(dst_path), exist_ok=True)
                print(f"Copying {os.path.basename(full_path)}")
                # copyfile follows symlinks by default, so file contents (not links
                # into the temporary cache) are written to EFS.
                shutil.copyfile(full_path, dst_path)

# The docker container volume mounts snapshots at /snapshots, so MODEL_ID is the
# model path relative to the home directory, with a leading "/". relpath avoids
# depending on whether home_dir carries a trailing slash.
os.environ['MODEL_ID'] = "/" + os.path.relpath(model_path, home_dir)
print(f"MODEL_ID={os.environ['MODEL_ID']}")

## Configure Dynamic Batch Size

Set `MAX_NUM_SEQS` to the maximum [dynamic batch](https://docs.nvidia.com/deeplearning/triton-inference-server/user-guide/docs/user_guide/batcher.html#delayed-batching) size. This is an advanced setting; its maximum value depends on the available accelerator memory and on the `max_queue_delay_microseconds` setting. The default value of `max_queue_delay_microseconds` is `1000`. If you increase `MAX_NUM_SEQS` above `8`, you may also need to increase `max_queue_delay_microseconds`.

See [triton-embeddings-cuda.sh](../triton-server/embeddings/triton-embeddings-cuda.sh) and [triton-embeddings-neuronx.sh](../triton-server/embeddings/triton-embeddings-neuronx.sh) for setting `max_queue_delay_microseconds`.

In [None]:
# Upper bound on the dynamic batch size, exported as MAX_NUM_SEQS.
max_num_seqs = 4
os.environ['MAX_NUM_SEQS'] = f"{max_num_seqs}"

### Specify Inference Server and Backend

Next, specify `inference_server`, and `backend` variables, below. This notebook supports [Triton Inference Server](https://github.com/triton-inference-server/server) with `embeddings` backend that uses a [custom python](../triton-server/embeddings/triton_embeddings_backend.py) backend for Triton Inference Server.

In [None]:
# Inference server / backend pair. Together with `device`, these pick the
# compose launch script and the config/<server>-<backend>.yaml file used later.
inference_server, backend = 'triton_inference_server', 'embeddings'

print(f"Using '{inference_server}' inference server with '{backend}' backend")

### Launch Inference Server

Next, we use Docker Compose to launch the inference server locally on this desktop.

In [None]:
# Compose launch script for each (inference_server, backend, device) combination.
script_map = {
    "triton_inference_server": {
        "embeddings": {
            "cuda": "../triton-server/embeddings/compose-triton-embeddings-cuda.sh",
            "neuron": "../triton-server/embeddings/compose-triton-embeddings-neuronx.sh"
        }
    }
}

try:
    script_path = script_map[inference_server][backend][device]
except KeyError as err:
    # Name the unsupported combination instead of surfacing a bare KeyError.
    raise ValueError(
        f"No compose script for inference_server={inference_server!r}, "
        f"backend={backend!r}, device={device!r}"
    ) from err
# Run the compose script with `down` first (presumably clearing any stack left
# over from a previous run), then `up` to launch the inference server.
! {script_path} down
! {script_path} up

## Locust Testing

### Load Configuration

Below, we load the appropriate configuration file for the specified inference server and backend.

In [None]:
# Configuration files live under ./config and are named
# "<inference_server>-<backend>.yaml".
path = ["config", f"{inference_server}-{backend}.yaml"]
config_path = os.path.join(*path)

with open(config_path, "r") as config_file:
    config = yaml.safe_load(config_file)

# Echo the loaded configuration into the notebook output.
print(json.dumps(config, indent=2))

### Verify inference server is up

The inference server may take several minutes to start up. Next, we verify that the inference server is up. Do not proceed to the next cell until the inference server is up; re-run this cell until it reports that the server is up.

In [None]:
import requests
# Probe the endpoint with a GET. Status 405 or 424 is treated as "the server is
# answering" (presumably GET is rejected because the endpoint expects POST --
# confirm); any other status, or a connection error, means "not yet up".
try:
    response = requests.get(config['endpoint_url'], timeout=10)
except requests.exceptions.RequestException as err:
    # e.g. connection refused while the server is still starting.
    print(f"Inference server is not yet up: {err}")
else:
    response_code = int(response.status_code)
    if response_code in (405, 424):
        print("Inference server is up!")
    else:
        print(f"Inference server is not yet up: {response_code}")

### Validate Configuration

The configuration file specifies a prompt generator module. The module is dynamically loaded, and is invoked iteratively by the Locust endpoint user (see `endpoint_user.py`) to get next prompt to drive Locust testing.

Let us validate our configuration by making a single request and inspecting the response.

In [None]:
from importlib import import_module
import re
from pprint import pprint
import sys

def get_prompt_generator():
    """Load the configured prompt generator and return what calling it yields.

    Reads ``module_dir``, ``module_name`` and ``prompt_generator`` from the
    notebook-level ``config`` dict. It imports ``module_name`` with
    ``module_dir`` on ``sys.path``, instantiates the ``prompt_generator``
    class, and returns the result of calling that instance.
    ``inference_request`` calls ``next()`` on that result.
    """
    prompt_module_dir = config['module_dir']
    # Add the directory only once. This function runs for every request, and
    # appending unconditionally would keep growing sys.path with duplicates.
    if prompt_module_dir not in sys.path:
        sys.path.append(prompt_module_dir)

    prompt_module = import_module(config['module_name'])
    prompt_generator_class = getattr(prompt_module, config['prompt_generator'])

    return prompt_generator_class()()
      
def _parse_template_key(segment):
    """Return int N for a ``"[N]"`` path segment, otherwise the segment unchanged."""
    match = re.fullmatch(r'\[(\d+)\]', segment)
    return int(match.group(1)) if match else segment


def fill_template(template: dict, template_keys: list, inputs: list) -> dict:
    """Write each input into *template* at the location named by its key path.

    Each entry of *template_keys* is a dot-separated path such as
    ``"inputs.[0].data"``. Plain segments index dicts, and ``[N]`` segments
    index lists. ``inputs[i]`` is stored at ``template_keys[i]``.

    The template is modified in place and also returned.

    Raises:
        AssertionError: if ``template_keys`` and ``inputs`` differ in length.
    """
    assert len(template_keys) == len(inputs), f"template_keys: {template_keys}, prompts: {inputs}"

    for template_key, value in zip(template_keys, inputs):
        keys = [_parse_template_key(segment) for segment in template_key.split(".")]
        target = template
        for key in keys[:-1]:
            target = target[key]
        # The final segment is parsed like the others, so a path ending in a
        # list index (e.g. "texts.[0]") assigns into the list instead of
        # failing on a string key.
        target[keys[-1]] = value

    return template

def inference_request():
    """Send one request built from the next generated prompt and return the response.

    Steps:
    1. Pull the next input(s) from the configured prompt generator.
    2. Fill a copy of ``config['template']`` at ``config['template_keys']``.
       If a ``"model"`` key is present, the ``MODEL_ID`` environment value is
       placed in that key's slot.
    3. POST the JSON body to ``config['endpoint_url']`` and return the
       ``requests`` response.
    """
    import copy

    prompt_generator = get_prompt_generator()
    inputs = next(prompt_generator)
    # Build a fresh list so the object yielded by the generator is never
    # mutated by the model-id insertion below.
    inputs = [inputs] if isinstance(inputs, str) else list(inputs)

    template = config['template']
    assert template is not None
    # Fill a copy so repeated calls leave the loaded config untouched.
    template = copy.deepcopy(template)

    template_keys = config['template_keys']
    assert template_keys is not None

    if "model" in template_keys:
        # Put the model id in the slot matching the "model" key rather than
        # assuming "model" is always the first key.
        inputs.insert(template_keys.index("model"), os.environ['MODEL_ID'])
    data = fill_template(template=template, template_keys=template_keys, inputs=inputs)

    body = json.dumps(data).encode("utf-8")
    pprint(body)
    headers = {"Content-Type": "application/json"}
    return requests.post(config['endpoint_url'], data=body, headers=headers)

# Validate end to end: a single request must succeed before load testing.
response = inference_request()
assert response.status_code == 200, \
    f"Response status code {response.status_code} != 200: {response.text}"
print("Endpoint validation successful")

### Run Locust Load Testing

The Locust load testing below uses 32 users with 32 workers to drive concurrent load and, by default, runs for 120 seconds (`RUN_TIME`). You can adjust these values as needed. Keep `SPAWN_RATE` the same as `USERS` to drive maximum concurrency.

In [None]:
import shlex

path = ["config", f"{inference_server}-{backend}.yaml"]

# Reload the configuration so this cell reflects the YAML file on disk.
config_path = os.path.join(*path)
with open(config_path, "r") as mf:
    config = yaml.safe_load(mf)

# Millisecond timestamp that distinguishes result files from different runs.
ts = round(time.time() * 1000)

# Settings handed to run_locust.sh through the environment.
os.environ["MODEL"] = os.environ['MODEL_ID']
os.environ["PROMPT_MODULE_DIR"] = config['module_dir']
os.environ["PROMPT_MODULE_NAME"] = config['module_name']
os.environ["PROMPT_GENERATOR_NAME"] = config['prompt_generator']
os.environ["TEMPLATE"] = json.dumps(config.get('template', {}))
os.environ["TEMPLATE_KEYS"] = json.dumps(config.get('template_keys', []))
os.environ["CONTENT_TYPE"] = "application/json"
os.environ["ENDPOINT_NAME"] = config['endpoint_url']
os.environ["USERS"] = "32"
os.environ["WORKERS"] = "32"
os.environ["RUN_TIME"] = "120s"
os.environ["SPAWN_RATE"] = "32"
os.environ["SCRIPT"] = "endpoint_user.py"
results_locust_path = os.path.join("output", "locust-testing")
# Make sure the results directory exists before the run writes into it.
os.makedirs(results_locust_path, exist_ok=True)
os.environ["RESULTS_PREFIX"] = f"{results_locust_path}/results-{ts}"

try:
    with open("run_locust.log", "w") as logfile:
        print(f"Start Locust testing; logfile: run_locust.log; results: {results_locust_path}")
        path = os.path.join(os.getcwd(), "run_locust.sh")
        # Add owner read+execute while keeping the file's other permission bits.
        # Assigning S_IRUSR | S_IEXEC outright would drop write access.
        os.chmod(path, os.stat(path).st_mode | stat.S_IRUSR | stat.S_IXUSR)
        # Quote the path: with shell=True the string is parsed by the shell,
        # and the working directory may contain spaces.
        process = subprocess.run(shlex.quote(path), encoding="utf-8", shell=True,
                                 stdout=logfile, stderr=subprocess.STDOUT)
    if process.returncode == 0:
        print("Locust testing completed")
    else:
        print(f"Locust testing failed with exit code {process.returncode}; see run_locust.log")
except Exception as e:
    print(f"exception occurred: {e}")


## Visualize Locust Results

Below, we display the results of the Locust testing in a table. 

In [None]:
import pandas as pd
from IPython.display import display
import numpy as np

# Read the "<RESULTS_PREFIX>_stats.csv" file and blank out missing values.
results_path = f'{os.environ["RESULTS_PREFIX"]}_stats.csv'
stats = pd.read_csv(results_path).replace(np.nan, '')

# Keep only the first `top_n` rows and render them as a styled table.
top_n = 1
caption = "Locust results"
df = stats.truncate(after=top_n - 1, axis=0)
df = (
    df.style
    .format(precision=6)
    .set_properties(**{'text-align': 'left'})
    .set_caption(caption)
)
display(df)

### Shutdown inference server

Finally, we shut down the inference server.

In [None]:
# Stop the inference server launched earlier with the same compose script.
! {script_path} down