<br>
<a href="https://www.nvidia.com/en-us/training/">
    <div style="width: 55%; background-color: white; margin-top: 50px;">
    <img src="https://dli-lms.s3.amazonaws.com/assets/general/nvidia-logo.png"
         width="400"
         height="186"
         style="margin: 0px -25px -5px; width: 300px"/>
</a>
<h1 style="line-height: 1.4;"><font color="#76b900"><b>Rapid Application Development<br>using Large Language Models</b></font></h1>
<h2><b>Notebook 6.5: [Advanced]</b> Running A GenAI Server</h2>
<br>

This notebook complements **Notebook 6** by providing a dedicated notebook - and hence a dedicated event loop - to kickstart a series of GenAI microservices. This notebook will enable the local model APIs that will be used through the rest of the course, and features some more advanced tangents for those interested. 

As a prelude to **multi-GPU scaled deployment**, this notebook provides an oversimplified but functionally sufficient example of an **on-device deployment workflow**, which can be used to offer a standardized API for a consumer-grade or on-device deployment strategy. Since this course uses a single A100 instance and has many notebooks, you will run into problems if any one notebook overallocates model resources or fails to clear its cache. As such, this is your chance to deploy some smaller models once and call them as necessary. 

> **NOTE:** This notebook is an advanced tangent. It should be run to enable some later feature sets, and understanding it is optional if you have the time and interest. **Do not feel pressured to understand everything in this notebook and halt your progress because of it!**

> **NOTE:** Before running this notebook, we recommend shutting down all active kernel instances from notebooks 5 and below. This notebook is intended to commandeer the majority of your A100 resources to launch multiple models. 

<hr>
<br>

## **Part 1:** Making A Custom Server For SDXL with FastAPI

To start, we can make a server to wrap the SDXL API you experimented with at the end of Notebook 5. This is intended as an example application derived largely from the HuggingFace documented example ([narrative](https://huggingface.co/docs/diffusers/main/en/using-diffusers/create_a_server) and [code](https://github.com/huggingface/diffusers/blob/main/examples/server/server.py)), and provides a peek behind the curtain of inference server deployments without exploring too much of the complexity. 

**Warning:** FastAPI and server development are outside the scope of this class, and this server is only a simplified example for those interested. Feel free to run the service as-is and assume the API will operate reliably enough to fulfill the exercises, but we do not recommend using this solution for production. If you are interested in productionalizing an image generation server, consider using:
- [Stable Diffusion NIM offerings](https://build.nvidia.com/stabilityai/stable-diffusion-xl) for self-hosted options.
- [Automatic1111-style](https://github.com/NVIDIA/Stable-Diffusion-WebUI-TensorRT/tree/main) and [ComfyUI-style](https://github.com/comfyanonymous/ComfyUI_TensorRT) Stable-Diffusion WebUIs for streamlined accelerated deployments. 
- Hosted APIs like those offered by OpenAI and StabilityAI for non-self-hosted options.
- Inference server projects like [vLLM](https://github.com/vllm-project/vllm) for inspirations on productionalized deployments. 

#### **Note The Following Features:**
- **This is sufficient for a background server that mimics OpenAI's DALL·E API for a single-user use-case.**
- It roughly follows OpenAI's DALL·E API, with only the minimal functionality of generating a novel image.
- Once the server is running, other services can call it via a port interface without having to reload or re-construct the resource allocated in the server. 

#### **Note The Following Critical Limitations:**
- **It is explicitly not threadsafe!** This is outlined in the documentation narrative, and is important for multi-user use-cases. This server is only sufficient for a single user with relatively little allowed concurrency. 
- **There is no meaningful model discovery endpoint,** and it is generally non-trivial for an external user to understand how to use this server without digging into the code. This is omitted for brevity, since you have full access to the code.
- **The server saves the file directly to a temporary file on the host system.** This is simplified for convenience, and is sufficient for meaningful interactions in a local deployment context.
    - [OpenAI](https://platform.openai.com/docs/guides/images) would save such an image to a temporary publicly-accessible address. The caller can download the image and operate on it as usual.
    - [NVIDIA NIM microservices](https://docs.api.nvidia.com/nim/reference/stabilityai-stable-diffusion-xl-infer), and thereby `build.nvidia.com`, return a buffer in the response which can be decoded and interpreted as an image by the caller.
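
To make the buffer-style alternative from the last bullet concrete, here is a minimal sketch of how a server could return image bytes inside the JSON body and how a caller could recover them. This is not how the server in this notebook behaves (it returns a file path), and the helper names are hypothetical. The `b64_json` field name follows OpenAI's Images API; the exact field names used by a given NIM endpoint may differ.

```python
import base64
import json

# First eight bytes of every PNG file; used here only to build a fake image.
PNG_SIGNATURE = b"\x89PNG\r\n\x1a\n"

def encode_image_response(image_bytes: bytes) -> str:
    """Server side (hypothetical): wrap raw image bytes as base64 text in JSON."""
    b64 = base64.b64encode(image_bytes).decode("ascii")
    return json.dumps({"data": [{"b64_json": b64}]})

def decode_image_response(body: str) -> list:
    """Caller side: recover the raw image bytes from every entry in the response."""
    payload = json.loads(body)
    return [base64.b64decode(item["b64_json"]) for item in payload["data"]]

# Round trip with placeholder bytes standing in for a real PNG file.
fake_png = PNG_SIGNATURE + b"placeholder-pixel-data"
body = encode_image_response(fake_png)
images = decode_image_response(body)
print(images[0].startswith(PNG_SIGNATURE))
```

The trade-off is that a buffer response works across machines without a shared filesystem, at the cost of a larger response body.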

In [1]:
%%writefile sdxl_app.py
import asyncio
import logging
import os
import random
import traceback
import uuid
from contextlib import asynccontextmanager
from time import time
from typing import Optional

import aiohttp
import torch
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from diffusers import DiffusionPipeline

#########################################################################################
## Configuration and Logging Setup; global settings and checks
#########################################################################################
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("TextToImageService")
MODEL_PATH = os.getenv("MODEL_PATH", "stabilityai/stable-diffusion-xl-base-1.0")
IMAGE_DIR = os.path.join("/dli/task/generated_images")
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

#########################################################################################
## FastAPI Application Setup
#########################################################################################
app = FastAPI()  # Initialize FastAPI app
globals = {      # Shared global resources
    "pipeline": None,
}

#########################################################################################
## Lifespan Context Manager: Dictates what all happens to construct and tear down server
#########################################################################################

@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Manages application lifespan:
    - Initializes shared resources (e.g., HTTP client, model pipeline).
    - Cleans up resources on shutdown.
    """
    if DEVICE != "cuda":
        raise RuntimeError("CUDA device is required for this application.")
    os.makedirs(IMAGE_DIR, exist_ok=True)  # Ensure image directory exists
    session = aiohttp.ClientSession()
    try:
        logger.info("Loading model on CUDA...")
        globals["pipeline"] = DiffusionPipeline.from_pretrained(
            MODEL_PATH, torch_dtype=torch.float16, use_safetensors=True, variant="fp16",
        ).to(DEVICE)
        globals["pipeline"].enable_xformers_memory_efficient_attention()
        # Perform a warm-up operation to force memory allocation
        dummy_prompt = "A blank test image" 
        generator = torch.Generator(device=DEVICE).manual_seed(0)
        _ = globals["pipeline"](dummy_prompt, num_inference_steps=1, generator=generator)
        ## Announce server is ready, list basic route info
        logger.info("Server Ready. Routes:")
        [logger.info(f" - {r} ({getattr(r, 'description', '')})") for r in app.routes]
        yield  ## Yields scope to allow server to run. On shutdown, will get scope back
    finally:
        await session.close()
        logger.warning("Application shutdown complete.")

app.router.lifespan_context = lifespan

#########################################################################################
## API Endpoints. These functions can be called from any user with access to server port
#########################################################################################

@app.get("/")
@app.post("/")
@app.options("/")
async def base():
    """Root endpoint for a simple health check or welcome message."""
    return {"message": "Welcome to Diffusers! Use this service to generate images from prompts."}

class TextToImageInput(BaseModel):
    """Schema for image generation input."""
    model: str
    prompt: str
    size: Optional[str] = None
    n: Optional[int] = None

@app.post("/v1/images/generations")
async def generate_image(image_input: TextToImageInput):
    """Endpoint to generate an image from a given prompt using the shared diffusion pipeline."""
    try:
        logger.info(f"[Request] {image_input}")
        if globals["pipeline"] is None:
            raise HTTPException(status_code=500, detail="Model pipeline is not initialized")
        generator = torch.Generator(device=DEVICE)
        generator.manual_seed(random.randint(0, 10000000))
        # Run pipeline in a thread pool to avoid blocking the event loop
        loop = asyncio.get_event_loop()
        pipe_kws = {**image_input.__dict__, "generator": generator}
        pipe_kws["num_images_per_prompt"] = pipe_kws.get("n") or 1
        start_time = time()
        output = await loop.run_in_executor(None, lambda: globals["pipeline"](**pipe_kws))
        end_time = time() - start_time
        image_urls = [save_image(img) for img in output.images]
        logger.info(f" - Generated {image_urls} over {end_time:.4f}s")
        return {"data": [{"url": url} for url in image_urls]}
    except Exception as e:
        logger.error(f"Error during image generation: {str(e)}")
        trace = f"{str(e)}\n{traceback.format_exc()}"
        raise HTTPException(status_code=500, detail=f"Image generation failed: {trace}")

def save_image(image) -> str:
    """Saves the generated image to the designated directory and returns its local file path."""
    filename = f"draw_{uuid.uuid4().hex[:8]}.png"
    image_path = os.path.join(IMAGE_DIR, filename)
    logger.info(f"Saving image to {image_path}")
    image.save(image_path)
    return image_path
        
#########################################################################################
## Standalone Execution for Local Development
#########################################################################################

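if __name__ == "__main__":
    import uvicorn  # Only needed when this file is executed directly
    logger.info("Starting application...")
    # reload=True requires the app to be passed as an import string
    uvicorn.run("sdxl_app:app", host="0.0.0.0", port=8000, reload=True)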

Writing sdxl_app.py
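
Once this server is running, a caller only needs to POST a small JSON body to `/v1/images/generations` and read back the `data[*].url` entries. The following is a minimal client sketch under a few assumptions: the server listens on port 9003 (the port used when the servers are launched in Part 3), the `model` value is a placeholder that the schema merely requires, and the helper names are hypothetical.

```python
import json
import urllib.request

def build_generation_request(prompt: str, n: int = 1, model: str = "sdxl") -> dict:
    """Build a request body matching the TextToImageInput schema."""
    return {"model": model, "prompt": prompt, "n": n}

def extract_image_paths(response_body: dict) -> list:
    """Pull the saved image paths out of the server's response body."""
    return [item["url"] for item in response_body.get("data", [])]

def generate_images(prompt: str, base_url: str = "http://0.0.0.0:9003",
                    n: int = 1, timeout: float = 120.0) -> list:
    """POST a prompt to the running server and return the saved image paths."""
    payload = json.dumps(build_generation_request(prompt, n)).encode()
    request = urllib.request.Request(
        f"{base_url}/v1/images/generations",
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return extract_image_paths(json.loads(response.read().decode()))

# Inspect the request body without contacting the server.
print(build_generation_request("A watercolor painting of a lighthouse", n=2))
```

Because the server saves images to the local filesystem, the returned paths are only meaningful to callers that share that filesystem.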


In [2]:
# !pip uninstall -y pynvml
# !pip install --upgrade nvidia-ml-py langchain-nvidia-ai-endpoints

<hr>
<br>

## **Part 2:** Getting Pre-Made Standardized Servers Using vLLM

For a higher layer of abstraction, many people like to use inference services like [vLLM](https://github.com/vllm-project/vllm) to help deploy scalable and standardized LLM APIs. This framework automatically integrates with the Transformers model repository to download model weights and deserialize from config to pipeline. At the same time, it also offers abstractions for memory management, multi-user deployment, and API standards to wrap a resilient server around the loaded model resource. 

Using vLLM, we will deploy two small models that can run from within this notebook on our A100 resource:

#### **1. A small vision language model ([SmolVLM-Instruct](https://huggingface.co/HuggingFaceTB/SmolVLM-Instruct))**.

This model, developed by HuggingFace TB Research Team for small language model use-cases, uses the multimodal fusion principles from Notebook 5 to perform joint reasoning with images and text. At the same time, it adopts the standard OpenAI-style API to largely interoperate with the likes of [**OpenAI's GPT-4o**](https://openai.com/index/hello-gpt-4o/), [**NVIDIA's NVLM**](https://arxiv.org/abs/2409.11402), and the open-sourced [**Llama 3.2 (2024)**](https://ai.meta.com/blog/llama-3-2-connect-2024-vision-edge-mobile-devices/).

To deploy an inference server, the following command will work in our environment:

```sh
vllm serve HuggingFaceTB/SmolVLM-Instruct \
    --max_model_len 8192 \
    --gpu-memory-utilization 0.5 \
    --enforce-eager \
    --max-num-seqs 16 \
    --port 9002
```
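
Since the server exposes an OpenAI-style API, a request that mixes text and an image can be expressed as an ordinary chat-completions body. Below is a minimal sketch of building such a body, assuming the image is embedded as a base64 data URL and the request is sent to `http://0.0.0.0:9002/v1/chat/completions`. The helper names and the `max_tokens` value are illustrative assumptions.

```python
import base64

def image_to_data_url(image_bytes: bytes, mime: str = "image/png") -> str:
    """Embed raw image bytes in a data URL so they can travel inside JSON."""
    b64 = base64.b64encode(image_bytes).decode("ascii")
    return f"data:{mime};base64,{b64}"

def build_vlm_request(question: str, image_bytes: bytes,
                      model: str = "HuggingFaceTB/SmolVLM-Instruct",
                      max_tokens: int = 128) -> dict:
    """Build an OpenAI-style chat body with one text part and one image part."""
    return {
        "model": model,
        "max_tokens": max_tokens,
        "messages": [{
            "role": "user",
            "content": [
                {"type": "text", "text": question},
                {"type": "image_url",
                 "image_url": {"url": image_to_data_url(image_bytes)}},
            ],
        }],
    }

# Placeholder bytes stand in for the contents of a real image file.
request_body = build_vlm_request("What is shown in this image?", b"not-a-real-image")
print(request_body["messages"][0]["content"][0])
```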

#### **2. A small language model with (minimum-viable) tooling support ([Llama-3.2-3B](https://huggingface.co/meta-llama/Llama-3.2-3B-Instruct))**.

This model, developed by the Meta AI research team, is a small language model that has been trained, to some extent, to support function calling. Of note, this model is not usually recommended for function calling out-of-the-box without some much-needed fine-tuning, but it is sufficient for our simpler use-cases. 

To deploy an inference server, the following command will work in our environment:

```sh
vllm serve unsloth/Llama-3.2-3B-Instruct \
    --enable-auto-tool-choice \
    --tool-call-parser llama3_json \
    --chat-template tool_chat_template_llama3.2_json.jinja \
    --max_model_len 8192 \
    --gpu-memory-utilization 0.4 \
    --max-num-seqs 16 \
    --port 9001

## NOTE: we're using unsloth secondarily because of some nice optimizations they use, 
## and primarily just to make it easier for you to load in the model without a HF token.
## Note that you are still bound by the llama usage/general license agreement during use:
## https://huggingface.co/meta-llama/Llama-3.2-3B-Instruct/blob/main/USE_POLICY.md
```
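
With tool choice enabled, a client can attach OpenAI-style tool definitions to a chat request and read any resulting tool calls back out of the response. The sketch below shows only the request and response shapes, assuming the standard OpenAI chat-completions format. The `get_weather` tool and the helper names are hypothetical, and the sample response is hand-written for illustration rather than actual model output.

```python
import json

def build_tool_request(user_message: str,
                       model: str = "unsloth/Llama-3.2-3B-Instruct") -> dict:
    """Build a chat body that offers the model one (hypothetical) tool."""
    get_weather_tool = {
        "type": "function",
        "function": {
            "name": "get_weather",  # hypothetical tool name
            "description": "Look up the current weather for a city.",
            "parameters": {
                "type": "object",
                "properties": {"city": {"type": "string", "description": "City name"}},
                "required": ["city"],
            },
        },
    }
    return {
        "model": model,
        "messages": [{"role": "user", "content": user_message}],
        "tools": [get_weather_tool],
        "tool_choice": "auto",
    }

def extract_tool_calls(response_body: dict) -> list:
    """Return (name, arguments) pairs for every tool call in a chat response."""
    calls = []
    for choice in response_body.get("choices", []):
        for call in choice.get("message", {}).get("tool_calls") or []:
            function = call["function"]
            # Arguments arrive as a JSON-encoded string, not as a dictionary.
            calls.append((function["name"], json.loads(function["arguments"])))
    return calls

# Hand-written response in the OpenAI format, used only to exercise the parser.
sample_response = {"choices": [{"message": {"role": "assistant", "tool_calls": [
    {"id": "call_0", "type": "function",
     "function": {"name": "get_weather", "arguments": "{\"city\": \"Paris\"}"}}]}}]}
print(extract_tool_calls(sample_response))
```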

Note that these parameterizations are not meant to be recommendations, nor are they necessarily always good. Furthermore, there are still opportunities for resource contention and other issues. For productionalization, we highly recommend using NIM, which is a higher layer of abstraction over vLLM and the NVIDIA Triton acceleration framework.



<details> 
    
<summary><b>Argument Details</b></summary>

- **Port (9001 / 9002):** Defines the specific network port on which the model’s server listens. This lets other processes communicate with our server and send requests to it.
- **Max Model Length (8192):** Sets the upper limit for the number of input tokens the model can process in a single operation, affecting the model's ability to handle longer sequences.
    - Impacts resource allocation (since KV cache space needs to be allocated), increases maximum per-call inference time/resource use, and opens up opportunities for derailment. 
- **GPU Memory Utilization (0.4 / 0.5):** Dictates the percentage of the total available GPU memory that each model instance can use, ensuring that some GPU resources are left over for other processes.
    - You may notice that our diffusion model server does not have such a configuration. This is because it is a vLLM-specific parameter which is tightly paired with its engine wrapper. Implementing this for diffusion would need to be done manually.
    - You may also note that 0.4 + 0.5 + diffusion resources (.17 usually) > 1. This tends to be ok in our specific context because the models require excess memory during construction. An actual productionalized solution should enforce stronger bounds if resilience is required.
- **Max Number of Sequences (16):** Controls how many sequences the model can process simultaneously, optimizing both computational efficiency and response time during parallel requests.
- **Enforce Eager:** Forces the model to execute operations immediately and in a sequential order, which simplifies debugging and execution traceability.
- **Chat Template:** Supplies a Jinja template that converts structured chat messages into the prompt format the model expects, facilitating consistent and predictable interactions with the model.
    - In reality, all LLMs that offer a chat completions API have such a template, which helps to convert an intuitive "message" format to the "LLM input" format. It's important here because the version that is passed in, [implemented here and advertised to work well for Llama 3.2](https://github.com/vllm-project/vllm/blob/main/examples/tool_chat_template_llama3.2_json.jinja), defines several non-default edge cases that enable things like function calling. 
- **Auto Tool Choice:** Employs automated logic to select the appropriate tools for function parsing and execution, adapting to the model's capabilities without manual configuration. This will be explored in more detail in **Notebook 7**.

</details>
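
As a quick sanity check on the memory fractions discussed in the argument details, the arithmetic can be sketched as follows. The fractions come from the text above (0.4, 0.5, and roughly 0.17 for diffusion). The 80 GB device size is an assumption, since the text only states that a single A100 is used, and the helper name is hypothetical.

```python
def summarize_budget(fractions: dict, total_gb: float) -> dict:
    """Convert per-service memory fractions into GB and flag oversubscription."""
    per_service_gb = {name: frac * total_gb for name, frac in fractions.items()}
    total_fraction = sum(fractions.values())
    return {
        "per_service_gb": per_service_gb,
        "total_fraction": total_fraction,
        "oversubscribed": total_fraction > 1.0,
    }

ASSUMED_GPU_GB = 80  # Assumption: 80 GB A100; adjust for your device
budget = summarize_budget({"llama": 0.4, "smolvlm": 0.5, "sdxl": 0.17}, ASSUMED_GPU_GB)

for name, gigabytes in budget["per_service_gb"].items():
    print(f"{name:>8}: {gigabytes:5.1f} GB")
print(f"total fraction: {budget['total_fraction']:.2f}, "
      f"oversubscribed: {budget['oversubscribed']}")
```

A nominal total above 1.0 is a signal that the configuration leans on favorable timing rather than on enforced limits, which is the caveat raised above.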

<hr>
<br>

## **Part 2.5:** BONUS: Wrangling These Servers With A Router

Reading more into the OpenAI API standard, you will notice the following server-level requirements: 
- You should be able to discover which models are accessible to you from the server.
- You should be able to communicate with the server in a standard and predictable way.

This is very useful because you can build software around these endpoints that adjusts itself to the available model pool and invokes the endpoints in a predictable fashion. However, we currently have two different endpoints for our two LLM models. This is a bit of a hindrance for calling these endpoints, as we can see in this invocation example:

```python
from langchain_nvidia_ai_endpoints import ChatNVIDIA
import os 

## Saying Hello World to our model, ChatNVIDIA would auto-infer the name from discovery
llm = ChatNVIDIA(model="unsloth/Llama-3.2-3B-Instruct", base_url="http://0.0.0.0:9001/v1")
vlm = ChatNVIDIA(model="HuggingFaceTB/SmolVLM-Instruct", base_url="http://0.0.0.0:9002/v1")
llm = ChatNVIDIA(base_url="http://0.0.0.0:9001/v1")  ## equivalent
vlm = ChatNVIDIA(base_url="http://0.0.0.0:9002/v1")  ## equivalent

llm.invoke("Hello World!")
```

Wouldn't it be nicer if we could have them all aggregate under a single server?

```python
## But wouldn't it be nicer if we could have both models come from the same connector?
os.environ["NVIDIA_BASE_URL"] = "http://0.0.0.0:9004/v1"
llm = ChatNVIDIA(model="unsloth/Llama-3.2-3B-Instruct")  ## equivalent
vlm = ChatNVIDIA(model="HuggingFaceTB/SmolVLM-Instruct")  ## equivalent
```

Below, we define a basic router application which should help with this process. This is merely for convenience, but illustrates how one could aggregate multiple servers under one umbrella. This is, in fact, what `build.nvidia.com` does under the hood, and is a gateway to understand the `llm_client` microservice for those interested.
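
The core idea behind such a router can be sketched in a few lines: collect each backend's `/v1/models` listing, then map every model id to the server that advertised it. The snippet below is a simplified illustration with hand-written listings in the OpenAI list format. The helper name is hypothetical, and the first-server-wins rule for duplicate ids is an assumption of this sketch.

```python
def build_routing_table(listings: dict) -> dict:
    """Map each model id to the first backend URL that advertises it.

    `listings` maps a backend base URL to the JSON body of its /v1/models response.
    """
    table = {}
    for base_url, body in listings.items():
        for model in body.get("data", []):
            table.setdefault(model["id"], base_url)  # first server wins
    return table

# Hand-written listings mimicking the two vLLM servers from Part 2.
listings = {
    "http://0.0.0.0:9001": {"object": "list",
                            "data": [{"id": "unsloth/Llama-3.2-3B-Instruct"}]},
    "http://0.0.0.0:9002": {"object": "list",
                            "data": [{"id": "HuggingFaceTB/SmolVLM-Instruct"}]},
}
routes = build_routing_table(listings)
print(routes["HuggingFaceTB/SmolVLM-Instruct"])
```

A request naming a given model can then be forwarded to `routes[model]` with its path and body left unchanged.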

In [3]:
%%writefile router_app.py
import asyncio
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import Response, StreamingResponse, JSONResponse
from functools import partial
import httpx
import json
import os
import traceback

# Initialize FastAPI application
app = FastAPI()

###################################################################################
## Configuration and Defaults

DEFAULTS = {
    "TIMEOUT": 60,  # Request timeout in seconds
    "URLS": "0.0.0.0:9001,0.0.0.0:9002,llm_client:9000",  # Backend URLs
    "FILTER_KEYWORDS": "meta,mistral,nvidia,llama-3.2,huggingfacetb",  # Filter keywords
}

def get_var(key: str) -> str:
    """Fetch a configuration variable, falling back to defaults."""
    return os.getenv(key, DEFAULTS.get(key, ""))

def get_urls() -> list:
    """Parse and normalize backend URLs."""
    return [url.strip() if "://" in url else f"http://{url.strip()}" for url in get_var("URLS").split(",")]

def get_filter_keywords() -> list:
    """Parse filter keywords for model filtering."""
    return [kw.strip().lower() for kw in get_var("FILTER_KEYWORDS").split(",")]

###################################################################################
## Utilities

async def fetch_models_from_server(server_url: str) -> dict:
    """Fetch available models from a single backend server."""
    try:
        async with httpx.AsyncClient(timeout=int(get_var("TIMEOUT"))) as client:
            response = await client.get(f"{server_url}/v1/models")
            if response.status_code == 200:
                return {"server": server_url, "models": response.json().get("data", [])}
            return {"error": f"Failed to fetch models from {server_url} with status {response.status_code}"}
    except Exception as e:
        traceback.print_exc()
        return {"error": f"Exception during model discovery from {server_url}: {str(e)}"}

async def discover_models() -> tuple:
    """
    Discover models from all backend servers.
    Returns:
        - aggregated_models: List of models with metadata.
        - errors: List of errors encountered during discovery.
    """
    tasks = [fetch_models_from_server(server) for server in get_urls()]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    aggregated_models, errors = [], []
    for server, result in zip(get_urls(), results):
        if isinstance(result, dict) and "models" in result:
            # Tag models with their originating server
            for model in result["models"]:
                model["server"] = server
                aggregated_models.append(model)
        elif isinstance(result, dict) and "error" in result:
            errors.append(result["error"])
        else:
            errors.append(f"Unexpected result from {server}: {result}")
    return aggregated_models, errors

def filter_models(models: list) -> list:
    """Filter models based on predefined keywords."""
    keywords = get_filter_keywords()
    return [model for model in models if any(kw in model.get("id", "").lower() for kw in keywords)]

###################################################################################
## Endpoints

@app.get("/v1/models")
async def list_models():
    """List all available models from all backend servers."""
    try:
        models, errors = await discover_models()
        filtered_models = filter_models(models)
        response = {"object": "list", "data": filtered_models}
        if errors:
            response["warnings"] = errors
        return JSONResponse(content=response)
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Exception during model discovery: {str(e)}")

Client = partial(httpx.AsyncClient, timeout=httpx.Timeout(60))

async def create_completion_base(request: Request, extension: str):
    """Forwards completion requests to the OpenAI-style backend that serves the requested model."""
    try:
        # Parse incoming request content
        content = await request.body()
        content = json.loads(content.decode())
        model = content.get("model")
        stream = content.get("stream", False)

        # Prepare headers for the outgoing request
        headers = {
            key: value for key, value in request.headers.items()
            if key.lower() not in ["host", "content-length"]
        }

        # Discover the target endpoint based on the requested model
        models, _ = await discover_models()
        model_entry = next((m for m in models if m["id"] == model), None)
        if not model_entry:
            raise HTTPException(status_code=404, detail=f"Model '{model}' not found.")
        target_server = model_entry["server"]

        # Prepare the request parameters
        call_kws = {
            "url": f"{target_server}/v1/{extension}",
            "headers": headers,
            "data": json.dumps(content).encode()
        }

        ############################################################
        # Handle Non-Streaming Use Case
        if not stream:
            try: 
                async with Client() as client:
                    response = await client.post(**call_kws)
            except httpx.TimeoutException as e:
                raise HTTPException(status_code=408)

            filtered_headers = {
                key: value for key, value in response.headers.items() 
                if key.lower() not in ["content-length", "content-encoding", "transfer-encoding"]
            }

            return Response(content=response.content, status_code=response.status_code, headers=filtered_headers)

        ############################################################
        ## Simple Use Case: Streaming
        ## Create a generator to keep querying the response endpoint after initial response
        ## NOTE: This is a weird way for keeping stream client open both for a potential
        ##  initial exception raise and also as an argument to the StreamingResponse return.
        async def respond_and_stream():
            try:
                ## Open the client as a context manager so it is closed once streaming ends
                async with Client() as client:
                    async with client.stream("POST", **call_kws) as response:
                        yield response
                        ## This stage only gets invoked if the response is valid + streaming is enabled
                        async for cbytes in response.aiter_bytes():
                            yield cbytes
            except httpx.TimeoutException as e:
                raise HTTPException(status_code=408)
    
        ## Create response generator and process initial response
        agen = respond_and_stream()
        response = await agen.__anext__()
        if response.status_code != 200:
            response_bytes = await response.aread()
            content = response_bytes
            filtered_headers = {
                key: value for key, value in response.headers.items() 
                if key.lower() not in ["content-length", "content-encoding", "transfer-encoding"]
            }
            return Response(content=content, status_code=response.status_code, headers=filtered_headers)
        else: 
            return StreamingResponse(agen, media_type='text/event-stream')

    except json.JSONDecodeError:
        raise HTTPException(status_code=400, detail="Invalid JSON in request body.")
    except HTTPException as e:
        raise e
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(
            status_code=500,
            detail=f"An error occurred while processing the request: {str(e)}"
        )


@app.post("/v1/{path:path}")
async def handle_request(request: Request, path: str):
    """Forwards requests based on the path to the appropriate OpenAI-style backend endpoint."""
    return await create_completion_base(request, extension=path)

###################################################################################
## Health Check

@app.get("/health")
async def health_check():
    """Simple health check endpoint."""
    return {"status": "healthy"}

###################################################################################
## Application Lifecycle

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage application startup and shutdown events."""
    print(f"Configured backend servers: {get_urls()}")
    print(f"Model filter keywords: {get_filter_keywords()}")
    yield
    print("Application shutdown complete.")

app.router.lifespan_context = lifespan

###################################################################################
#


Writing router_app.py
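
When `stream` is enabled, the router relays the backend's `text/event-stream` bytes unchanged, so the caller is responsible for parsing them. Below is a minimal sketch of such a parser, assuming the backend emits OpenAI-style chunks (`data: {...}` lines terminated by `data: [DONE]`). The function name is hypothetical and the sample lines are hand-written.

```python
import json

def iter_stream_text(lines):
    """Yield the text fragments carried by OpenAI-style streaming chunks."""
    for raw in lines:
        line = raw.strip()
        if not line.startswith("data:"):
            continue  # skip blank separator lines and other event fields
        data = line[len("data:"):].strip()
        if data == "[DONE]":
            break  # sentinel that marks the end of the stream
        chunk = json.loads(data)
        for choice in chunk.get("choices", []):
            text = choice.get("delta", {}).get("content")
            if text:
                yield text

# Hand-written sample of what a streamed chat completion might look like.
sample_lines = [
    'data: {"choices": [{"delta": {"role": "assistant"}}]}',
    'data: {"choices": [{"delta": {"content": "Hello"}}]}',
    '',
    'data: {"choices": [{"delta": {"content": " World"}}]}',
    'data: [DONE]',
]
print("".join(iter_stream_text(sample_lines)))
```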


<hr>
<br>

## **Part 3**: Kickstarting The Servers In Our Notebook

We've now explained how these servers are constructed, both behind the scenes and at a surface-level. **This is an advanced topic, so it's ok if this doesn't quite stick.** What's important is that you understand that:
- **It's possible to kickstart many LLM servers that can be used by one or many users.**
- **These servers can prevent resources from being allocated multiple times over.**
- **For good compatibility, these servers usually follow a standard or easy-to-use API.**

We will need these features through the rest of the course, so please run the code cell below to kickstart your environment. This will kickstart each of your services, one at a time: the Llama 3.2 model on :9001, SmolVLM on :9002, the SDXL server on :9003, and finally the router on :9004. Once again, this process is documented here for your own reference, and is not necessary to understand as part of the course learning objectives.

**NOTE:** 
- We recommend shutting down previous kernels to free up all of your resources for this action. 
- If you encounter an issue of freeze-up, please interrupt the kernel (square button) and try again.
- This command will produce a running log. For that reason, there is no text below the following cell. Please return to **Notebook 6** once you're done. 
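
If you want to confirm from another notebook which services have come up, one simple option is to check whether each port accepts TCP connections. The following is a minimal sketch using only the standard library. The port numbers match the launch cell below, while the helper names and service labels are hypothetical. An open port only shows that a process is listening, not that the model behind it has finished loading.

```python
import socket

def port_is_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if something accepts TCP connections on host:port."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

def report_ports(ports: dict, host: str = "127.0.0.1") -> dict:
    """Check every named port and return a name -> is_open mapping."""
    return {name: port_is_open(host, port) for name, port in ports.items()}

# Ports used by the launch cell in this notebook.
SERVICE_PORTS = {"llama": 9001, "smolvlm": 9002, "sdxl": 9003, "router": 9004}
```

Calling `report_ports(SERVICE_PORTS)` from a separate kernel returns a dictionary showing which of the four services are currently listening.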

In [None]:
import asyncio
import uvicorn
import subprocess
import nest_asyncio
from colorama import Fore, Style
import traceback

# Allow nested asyncio event loops
nest_asyncio.apply()

async def run_cmd(cmd, label, color, start_event=None, end_event=None, end_trigger="Uvicorn running on"):
    if start_event:
        await start_event.wait()  # Wait for the previous process to signal it's ready
    print(f"\nSTARTING {color}[{label}]{Style.RESET_ALL}")
    process = await asyncio.create_subprocess_shell(
        " ".join(cmd.strip().split()),
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    async def read_stream(stream, color, label):
        while True:
            line = await stream.readline()
            if not line: break
            decoded_line = line.decode().strip()
            print(f"{color}[{label}]{Style.RESET_ALL} {decoded_line}")
            if end_event and end_trigger and end_trigger in decoded_line:
                end_event.set()  # Signal that this process is ready when port is shown
    stdout_task = asyncio.create_task(read_stream(process.stdout, color, label))
    stderr_task = asyncio.create_task(read_stream(process.stderr, Fore.RED, f"{label}-ERR"))
    await asyncio.wait([stdout_task, stderr_task])
    await process.wait()

async def run_llama(port=9000, **kwargs):
    await run_cmd(f"""
    vllm serve unsloth/Llama-3.2-3B-Instruct
    --enable-auto-tool-choice
    --tool-call-parser llama3_json
    --chat-template tool_chat_template_llama3.2_json.jinja
    --max_model_len 8192
    --gpu-memory-utilization 0.35
    --enforce-eager
    --port {port}
    """, label="Llama3B", color=Fore.GREEN, **kwargs) 

async def run_smolvlm(port=9000, **kwargs):
    await run_cmd(f"""
    vllm serve HuggingFaceTB/SmolVLM-Instruct
    --max_model_len 8192
    --gpu-memory-utilization 0.35
    --enforce-eager
    --port {port}
    --max-num-seqs 16
    """, label="SmolVLM", color=Fore.BLUE, **kwargs) 

async def run_sdxl(port=9000, **kwargs):
    await run_cmd(
        f"PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True uvicorn sdxl_app:app --host 0.0.0.0 --port {port} --log-level info 2>&1",
        label="SDXL1.0", color=Fore.CYAN, **kwargs) 

async def run_router(port=9000, **kwargs):
    await run_cmd(f"uvicorn router_app:app --host 0.0.0.0 --port {port} --log-level info 2>&1",
    label="ROUTER", color=Fore.MAGENTA, **kwargs) 
    
# Create event objects to coordinate the start of each process
events = [None, None]
def get_ev_pair(events=events):
    events[:2] = [events[1], asyncio.Event()]
    return {"start_event": events[0], "end_event": events[1]}

print("Starting Processes")
await asyncio.gather(
    run_llama   (9001, **get_ev_pair()),
    run_smolvlm (9002, **get_ev_pair()),
    run_sdxl    (9003, **get_ev_pair()),
    run_router  (9004, **get_ev_pair()),
)

Starting Processes

STARTING [Llama3B]
[Llama3B] INFO 05-27 14:46:12 api_server.py:651] vLLM API server version 0.6.5
[Llama3B] INFO 05-27 14:46:12 api_server.py:652] args: Namespace(subparser='serve', model_tag='unsloth/Llama-3.2-3B-Instruct', config='', host=None, port=9001, uvicorn_log_level='info', allow_credentials=False, allowed_origins=['*'], allowed_methods=['*'], allowed_headers=['*'], api_key=None, lora_modules=None, prompt_adapters=None, chat_template='tool_chat_template_llama3.2_json.jinja', chat_template_content_format='auto', response_role='assistant', ssl_keyfile=None, ssl_certfile=None, ssl_ca_certs=None, ssl_cert_reqs=0, root_path=None, middleware=[], return_tokens_as_token_ids=False, disable_frontend_multiprocessing=False, enable_auto_tool_choice=True, tool_call_parser='llama3_json', tool_parser_plugin='', model='unsloth/Llama-3.2-3B-Instruct', task='auto', tokenizer=None, skip_tokenizer_init=False, revision=None, code_revision=None, tokeniz
