# 🏰 Citadel Governance Hub - Testing Center

## Test your governance hub managed models, enabled through Azure API Management!

Use this Jupyter notebook with Python code snippets to verify that your governance hub managed models work properly when accessed through the AI Gateway features of Azure API Management (APIM), which is part of the AI Citadel Governance Hub.

> **Note:** This notebook assumes you have already set up your Citadel Governance Hub and have models deployed and managed through it. If you haven't done so, please refer to the [Citadel Governance Hub Deployment Guide](../guides/full-deployment-guide.md) or [Citadel Governance Hub Quick Deployment Guide](../guides/quick-deployment-guide.md) before proceeding.

<a id='0'></a>
### ⚙️ Initialize client tool for your APIM service

👉 An existing Azure AI Foundry API is expected to already be configured in APIM.

In [None]:
import sys, json, requests
sys.path.insert(1, '../shared')  # add the shared directory to the Python path
import utils
from apimtools import APIMClientTool

# Initialise the APIM client tool and derive the chat-completions URL used by the test cells.
# Everything runs inside one try block so that any failure is reported through
# utils.print_error instead of an unhandled traceback.

# API version sent as the `api-version` query parameter of the chat-completions URL.
inference_api_version = "2024-05-01-preview"

targetInferenceApi = "models" # use 'models' for universal LLM API, or 'openai' for Azure OpenAI

try:
    apimClientTool = APIMClientTool(
        "rg-ai-hub-citadel-dev-01" ## specify the resource group name where the API Management resource is located, or optionally add another parameter with the apim_resource_name
    )
    apimClientTool.initialize()
    apimClientTool.discover_api(targetInferenceApi) # use 'models' for inference API 'openai' for Azure OpenAI

    apim_resource_gateway_url = str(apimClientTool.apim_resource_gateway_url)
    azure_endpoint = str(apimClientTool.azure_endpoint)

    # Uses the subscription at index 1 of the discovered subscriptions.
    api_key = apimClientTool.apim_subscriptions[1].get("key") # Ensure that you have created a subscription in APIM

    # Get supported models from the policy fragment
    supported_models = apimClientTool.get_policy_fragment_supported_models("set-backend-pools")
    utils.print_info(f"Supported models in APIM policy fragment 'set-backend-pools': {supported_models}")

    if targetInferenceApi == "openai":
        # The Azure OpenAI route carries the deployment name in the URL path, so a
        # model has to be chosen here; previously model_name was never assigned in
        # this cell and this branch failed with a NameError on a fresh kernel.
        model_name = supported_models[2]  # pick the third model from the supported models in the policy fragment
        chat_completions_url = f"{azure_endpoint}openai/deployments/{model_name}/chat/completions?api-version={inference_api_version}"
    else:  # models
        # The universal 'models' route takes the model name in the request body instead.
        chat_completions_url = f"{azure_endpoint}models/chat/completions?api-version={inference_api_version}"
    utils.print_info(f"Chat Completion Endpoint: {chat_completions_url}")

    utils.print_ok("Testing tool initialized successfully!")
except Exception as e:
    # Notebook boundary: report the failure and let the user fix the configuration and re-run.
    utils.print_error(f"Error initializing APIM Client Tool: {e}")



<a id='requests'></a>
### 🧪 Test the API using a direct HTTP call


In [None]:
# Send a single chat-completions request straight to the APIM endpoint with `requests`
# and print the status, the response headers and the assistant reply.
model_name = supported_models[2] # pick the model from the supported models in the policy fragment
utils.print_info(f"Using model: {model_name}")

api_key = apimClientTool.apim_subscriptions[6].get("key") # Ensure that you have created a subscription in APIM

# Request body: the model is selected through the "model" field of the payload.
messages = {"model": model_name, "messages": [
    {"role": "system", "content": "You are a sarcastic, unhelpful assistant."},
    {"role": "user", "content": "Can you tell me the time, please?"}
]}


response = requests.post(chat_completions_url, headers={'api-key': api_key}, json=messages)
utils.print_response_code(response)
utils.print_info(f"headers {response.headers}")
# Single quotes inside the replacement field: reusing the enclosing double quote is
# only accepted from Python 3.12 onwards and is a SyntaxError on older kernels.
utils.print_info(f"x-ms-region: {response.headers.get('x-ms-region')}") # this header is useful to determine the region of the backend that served the request
if response.status_code == 200:
    data = json.loads(response.text)
    print("üí¨ ", data.get("choices")[0].get("message").get("content"))
else:
    utils.print_error(response.text)

In [None]:
import sys, json, requests
sys.path.insert(1, '../shared')  # add the shared directory to the Python path
import utils
from apimtools import APIMClientTool

# Re-initialise the APIM client tool for the rate-limit test: resolve the gateway
# endpoint, a subscription key, the supported models and the chat-completions URL.
# Any failure is reported through utils.print_error.

inference_api_version = "2024-05-01-preview"  # value of the `api-version` query parameter
targetInferenceApi = "models"  # use 'models' for universal LLM API, or 'openai' for Azure OpenAI

try:
    # The single positional argument is the resource group that hosts the API Management
    # resource (optionally add another parameter with the apim_resource_name).
    apimClientTool = APIMClientTool("rg-ai-hub-citadel-dev-01")
    apimClientTool.initialize()
    apimClientTool.discover_api(targetInferenceApi)  # 'models' for inference API, 'openai' for Azure OpenAI

    apim_resource_gateway_url = str(apimClientTool.apim_resource_gateway_url)
    azure_endpoint = str(apimClientTool.azure_endpoint)

    # Subscription at index 1; make sure a subscription has been created in APIM.
    api_key = apimClientTool.apim_subscriptions[1].get("key")

    # Models listed by the 'set-backend-pools' policy fragment; the third one is used.
    supported_models = apimClientTool.get_policy_fragment_supported_models("set-backend-pools")
    utils.print_info(f"Supported models in APIM policy fragment 'set-backend-pools': {supported_models}")
    model_name = supported_models[2]

    # Azure OpenAI puts the deployment name in the path; the universal 'models'
    # route uses a fixed path.
    chat_completions_url = (
        f"{azure_endpoint}openai/deployments/{model_name}/chat/completions?api-version={inference_api_version}"
        if targetInferenceApi == "openai"
        else f"{azure_endpoint}models/chat/completions?api-version={inference_api_version}"
    )
    utils.print_info(f"Chat Completion Endpoint: {chat_completions_url}")

    utils.print_ok("Testing tool initialized successfully!")
except Exception as init_error:
    utils.print_error(f"Error initializing APIM Client Tool: {init_error}")



<a id='rate-limit'></a>
### 🧪 Send multiple requests within one minute to surpass the established token rate limit


In [None]:
import requests, json, time

# Call the chat-completions endpoint repeatedly for 60 seconds and record, for every
# call, when it started, how many tokens it consumed and which HTTP status came back.
# The collected `api_runs` list is the input of the plotting cell.

# Run for 1 minute (60 seconds)
api_runs = []  # one (call start timestamp, total tokens, HTTP status code) tuple per request
start_time = time.time()
run_count = 0
messages = {"messages": [
    {"role": "system", "content": "You are a sarcastic, unhelpful assistant."},
    {"role": "user", "content": "Can you tell me the time, please?"}
]}

model_name = supported_models[0] # pick the model from the supported models in the policy fragment

api_key = apimClientTool.apim_subscriptions[6].get("key") # Ensure that you have created a subscription in APIM
# The initialisation cells append paths directly to azure_endpoint (no separator), so a
# trailing slash is stripped here before joining to avoid a '//' in the request path.
chat_completions_url = f"{azure_endpoint.rstrip('/')}/openai/deployments/{model_name}/chat/completions?api-version={inference_api_version}"

print(f"üïê Starting API calls for 1 minute...")
print(f"Start time: {time.strftime('%H:%M:%S', time.localtime(start_time))}")

while (time.time() - start_time) < 60:  # Run for 60 seconds
    run_count += 1
    call_start_time = time.time()
    response = requests.post(chat_completions_url, headers={'api-key': api_key}, json=messages)
    elapsed_time = time.time() - start_time

    if response.status_code == 200:
        print(f"‚ñ∂Ô∏è Run: {run_count} | {elapsed_time:.1f}s | status: {response.status_code} ‚úÖ")
        data = json.loads(response.text)
        # A 200 body without a `usage` object must not abort the whole run:
        # count such a call as 0 tokens instead of raising AttributeError.
        total_tokens = (data.get("usage") or {}).get("total_tokens") or 0
        print(f"    consumed tokens: {response.headers.get('consumed-tokens')}, remaining tokens: {response.headers.get('remaining-tokens')}")
    else:
        print(f"‚ñ∂Ô∏è Run: {run_count} | {elapsed_time:.1f}s | status: {response.status_code} ‚õî")
        print(f"    error: {response.text}")
        total_tokens = 0  # failed calls are recorded as consuming no tokens

    api_runs.append((call_start_time, total_tokens, response.status_code))
    time.sleep(0.1) # Small delay to prevent overwhelming the API

end_time = time.time()
total_duration = end_time - start_time
print(f"\nüèÅ Completed {run_count} API calls in {total_duration:.1f} seconds")
print(f"Average rate: {run_count / total_duration:.2f} calls/second")


<a id='plot'></a>
### 🔍 Analyze Token Rate limiting results


In [None]:
import matplotlib.pyplot as plt
from matplotlib.patches import Patch
from matplotlib.lines import Line2D

# Visualise the results of the 60-second test: tokens used per call as bars, a
# simulated token-bucket level as a line, and a marker on every throttled (429) call.
if 'api_runs' not in locals() or not api_runs:
    print('Run the 60-second API test first to capture api_runs data.')
else:
    # Re-base every call on the start of the first call; a missing token count counts as 0.
    calls = [(started - api_runs[0][0], consumed or 0, http_status) for started, consumed, http_status in api_runs]

    # Bucket model used for the simulation: `capacity` tokens, refilled linearly over 60 seconds.
    # NOTE(review): 1000 is presumably meant to mirror the limit configured in APIM - confirm.
    capacity = 1000
    refill = capacity / 60
    bucket = capacity
    last_time = 0.0

    times = [offset for offset, _, _ in calls]
    usage = [consumed for _, consumed, _ in calls]
    status_codes = [http_status for _, _, http_status in calls]
    levels = []

    for call_time, tokens, status in calls:
        # Refill for the time elapsed since the previous call, capped at capacity ...
        bucket = min(capacity, bucket + (call_time - last_time) * refill)
        # ... record the level seen by this call, then drain what the call consumed.
        levels.append(bucket)
        bucket = max(0, bucket - tokens)
        last_time = call_time

    # green = success, red = throttled, orange = any other status
    colour_by_status = {200: 'tab:green', 429: 'tab:red'}
    colors = [colour_by_status.get(code, 'tab:orange') for code in status_codes]

    fig, ax1 = plt.subplots(figsize=(14, 6))
    ax2 = ax1.twinx()

    ax1.bar(times, usage, color=colors, width=0.35, alpha=0.7)
    ax2.plot(times, levels, color='purple', linewidth=2)
    ax2.axhline(capacity, color='purple', linestyle='--', alpha=0.6)

    throttled_times = [t for t, code in zip(times, status_codes) if code == 429]
    throttled_usage = [u for u, code in zip(usage, status_codes) if code == 429]
    if throttled_times:
        max_usage = max(usage, default=0)
        # Lift the markers slightly above the bar tops so they stay visible.
        throttled_marker_heights = [u + max_usage * 0.01 for u in throttled_usage]
        ax1.scatter(
            throttled_times,
            throttled_marker_heights,
            marker='o',
            s=20,
            color='darkred',
            edgecolors='white',
            linewidth=0.4,
            zorder=6,
        )

    ax1.set_xlabel('Seconds')
    ax1.set_ylabel('Tokens per call')
    ax2.set_ylabel('Tokens in bucket')
    ax1.set_title('Token bucket behaviour over 60 seconds')

    legend_items = [
        Patch(facecolor='tab:green', alpha=0.7, label='Success (200)'),
        Line2D([0], [0], color='purple', linewidth=2, label='Bucket level'),
        Line2D([0], [0], color='purple', linestyle='--', label='Capacity'),
        Line2D(
            [0], [0],
            marker='o',
            color='darkred',
            markersize=8,
            linestyle='None',
            markerfacecolor='darkred',
            markeredgecolor='white',
            label='Throttled (429)',
        ),
    ]
    ax1.legend(handles=legend_items, loc='upper right', bbox_to_anchor=(0.98, 0.85), framealpha=0.9)

    success = status_codes.count(200)
    throttled = status_codes.count(429)
    print(f"Calls: {len(status_codes)} | Success: {success} | 429s: {throttled}")

<a id='sdk'></a>
### 🧪 Test with streaming using the Azure OpenAI Python SDK
With a streaming API call, the response is sent back incrementally in chunks via an [event stream](https://developer.mozilla.org/docs/Web/API/Server-sent_events/Using_server-sent_events#event_stream_format). In Python, you can iterate over these events with a for loop.

In [None]:
import time
from openai import AzureOpenAI
# Request a streamed chat completion through the Azure OpenAI SDK, print the response
# headers, then print every chunk with its arrival delay and finally the assembled reply.
messages = [
    {'role': 'user', 'content': 'Count to 100, with a comma between each number and no newlines. E.g., 1, 2, 3, ...'}
]

start_time = time.time()
# NOTE(review): the SDK's azure_endpoint normally takes the resource base URL, while
# this passes the full chat-completions URL - confirm this is intentional for the APIM route.
client = AzureOpenAI(
    azure_endpoint=chat_completions_url,
    api_key=api_key,
    api_version=inference_api_version
)

model_name = supported_models[2] # pick the model from the supported models in the policy fragment
utils.print_info(f"Using model: {model_name}")

# with_raw_response exposes the HTTP headers before the stream is consumed.
response = client.chat.completions.with_raw_response.create(model=model_name, messages=messages, stream=True)

print("headers ", response.headers)
print("x-ms-region: ", response.headers.get("x-ms-region")) # this header is useful to determine the region of the backend that served the request
print("x-ms-stream: ", response.headers.get("x-ms-stream")) # this header is useful to determine if the response is streamed

completion = response.parse()

# create variables to collect the stream of chunks
collected_chunks = []
collected_messages = []
# Bound before the loop so the summary line below cannot hit a NameError when the
# stream yields no chunks at all; it is overwritten by every received chunk.
chunk_time = time.time() - start_time
# iterate through the stream of events
for chunk in completion:
    chunk_time = time.time() - start_time  # calculate the time delay of the chunk
    collected_chunks.append(chunk)  # save the event response
    if chunk.choices:
        chunk_message = chunk.choices[0].delta.content  # extract the message
        collected_messages.append(chunk_message)  # save the message
        print(f"Message received {chunk_time:.2f} seconds after request: {chunk_message}")  # print the delay and text
# print the time delay and text received
print(f"Full response received {chunk_time:.2f} seconds after request")
# clean None in collected_messages
collected_messages = [m for m in collected_messages if m is not None]
full_reply_content = ''.join(collected_messages)
print(f"Full conversation received: {full_reply_content}")



<a id='sdk-multi'></a>
### 🧪 Execute multiple runs for each subscription using the Azure OpenAI Python SDK

We will send requests for each subscription. Adjust the `sleep_time_ms` and the number of `runs` to your test scenario.


In [None]:
import time
from openai import AzureOpenAI

# Send `runs` rounds of chat-completion requests, one request per client in each
# round, pausing `sleep_time_ms` milliseconds between rounds.
runs = 10
sleep_time_ms = 100

model_name = supported_models[2] # pick the model from the supported models in the policy fragment
utils.print_info(f"Using model: {model_name}")

# Three independent SDK clients.
# NOTE(review): every client is built with the key of subscription index 6, although
# the output labels them "Subscription 1..3" - confirm whether distinct
# subscriptions were intended.
clients = [
    AzureOpenAI(
        azure_endpoint=chat_completions_url,
        api_key=apimClientTool.apim_subscriptions[6].get("key"),
        api_version=inference_api_version,
    )
    for _ in range(3)
]

for i in range(runs):
    print(f"‚ñ∂Ô∏è Run {i+1}/{runs}:")

    for j, client in enumerate(clients):
        response = client.chat.completions.create(
            model=model_name,
            messages=[
                {"role": "system", "content": "You are a sarcastic, unhelpful assistant."},
                {"role": "user", "content": "Can you tell me the time, please?"},
            ],
            # Extra request header sent with every call.
            extra_headers={"x-user-id": "alex"},
        )
        print(f"üí¨ Subscription {j+1}: {response.choices[0].message.content}")

    print()

    time.sleep(sleep_time_ms/1000)
