# APIM ❤️ OpenAI

## Demo Overview

This Jupyter Notebook demonstrates the integration of Azure API Management (APIM) with Azure OpenAI services. The demo covers initial setup, JWT authentication, backend circuit breaking, load balancing, and semantic caching. Each section showcases a specific capability of the integrated services and the performance metrics used to evaluate it.

## Table of Contents
1. [Initial Setup](#3)
2. [JWT Authentication Testing](#jwt-authentication-testing)
3. [Backend Circuit Breaker](#backend-circuit-breaker)
4. [Load Balancing Test](#load-balancing-test)
    - [Weights and Priorities](#🧪-load-balancing-test-weights-and-priorities)
5. [Analyzing Semantic Caching](#sdk)
    - [Set 1 - Exact Matches (Control Group)](#🧪-semantic-caching-set-1-exact-matches-control-group)
    - [Set 2 - Paraphrased/Slightly Related Matches](#🧪-semantic-caching-set-2-paraphrasedslightly-related-matches)
6. [Summary Analysis](#summary-analysis)

### Prerequisites
- [Python 3.8 or later version](https://www.python.org/) installed
- [Pandas Library](https://pandas.pydata.org/) and matplotlib installed
- [VS Code](https://code.visualstudio.com/) installed with the [Jupyter notebook extension](https://marketplace.visualstudio.com/items?itemName=ms-toolsai.jupyter) enabled
- [Azure CLI](https://learn.microsoft.com/en-us/cli/azure/install-azure-cli) installed
- [An Azure Subscription](https://azure.microsoft.com/en-us/free/) with Contributor permissions
- [Access granted to Azure OpenAI](https://aka.ms/oai/access) or just enable the mock service
- [Sign in to Azure with Azure CLI](https://learn.microsoft.com/en-us/cli/azure/authenticate-azure-cli-interactively)
- Azure API Management Resource you have access to


<a id='3'></a>
### 1️⃣ Initial Setup

In this section, we set up the environment variables and test the basic network connections to ensure everything is configured correctly. This includes loading environment variables, setting up API endpoints, and verifying the connection to the Azure OpenAI service.

In [1]:
# Initialize Logging
from helper import initialize_logging
import logging

LOG_LEVEL = logging.DEBUG
logger = initialize_logging(LOG_LEVEL)

In [2]:
# Initialize Environment Variables
import os
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv(dotenv_path='./.env')

# Set the local env vars
apim_base_url = os.getenv('APIM_BASE_URL')
apim_api = os.getenv('APIM_API')
apim_subscription_key = (os.getenv('APIM_SUBSCRIPTION_KEY') or '').strip()  # avoids AttributeError when the variable is unset
openai_api_version = os.getenv('OPENAI_API_VERSION')
openai_model_name = os.getenv('OPENAI_MODEL_NAME')
openai_deployment_name = os.getenv('OPENAI_DEPLOYMENT_NAME')
frontend_client_id = os.getenv('CLIENT_ID')
tenant_id = os.getenv('TENANT_ID')

# Set the OpenAI API URL (https://<apim_base_url>/<api>)
apim_resource_gateway_url = f"{apim_base_url}/{apim_api}"
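
Missing values in `.env` tend to surface later as confusing request errors. A minimal sketch of an up-front check follows. The helper name `find_missing_env_vars` is hypothetical and is not part of the notebook's `helper` module; the variable names are the ones read in the cell above.

```python
import os
from typing import Iterable, List, Mapping, Optional

# Variable names taken from the environment setup cell above.
REQUIRED_ENV_VARS = [
    "APIM_BASE_URL",
    "APIM_API",
    "APIM_SUBSCRIPTION_KEY",
    "OPENAI_API_VERSION",
    "OPENAI_MODEL_NAME",
    "OPENAI_DEPLOYMENT_NAME",
]


def find_missing_env_vars(
    names: Iterable[str], env: Optional[Mapping[str, str]] = None
) -> List[str]:
    """Return the names that are unset or blank in `env` (defaults to os.environ)."""
    env = os.environ if env is None else env
    return [name for name in names if not (env.get(name) or "").strip()]


# Hypothetical usage: report anything missing before sending requests.
missing = find_missing_env_vars(REQUIRED_ENV_VARS)
if missing:
    print(f"Missing environment variables: {', '.join(missing)}")
```

`CLIENT_ID` and `TENANT_ID` are left out of the list because they are only needed for the JWT section.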


Test the endpoint with just the APIM subscription key

In [None]:
from openai import AzureOpenAI

# Test the connection to ensure the environment variables are set correctly
# If you see a constant 404 or other errors:
#   - check to make sure the API URL suffix includes the `/openai` path
#   - check the backend config to have ssl validation enabled
try:
    client = AzureOpenAI(
        azure_endpoint=apim_resource_gateway_url, 
        api_key=apim_subscription_key, 
        api_version=openai_api_version
    )
    
    response = client.chat.completions.create(
        model=openai_model_name, 
        messages=[{"role": "system", "content": "Test connection"}]
    )
    print("Connection successful. Response received.")
    logger.debug(f"Response: {response}")

except Exception as e:
    print(f"Connection failed: {e}")
    logger.error(f"Connection failed: {e}")
    if "401" in str(e):
        print("❌❌❌ APIM requires a valid JWT access token and a valid Subscription Key. Please ensure your token/key is correct and try again. ❌❌❌")
    elif "404" in str(e):
        print("❌❌❌ The requested resource was not found. Please check the URL and endpoint configuration. ❌❌❌")
    elif "timeout" in str(e).lower():
        print("❌❌❌ The connection timed out. Please check your network connection and try again. ❌❌❌")
    

DEBUG    Response: ChatCompletion(id='chatcmpl-Ab58yR1arGd098ys26mxXmWB6je1d', choices=[Choice(finish_reason='stop', index=0, logprobs=None, message=ChatCompletionMessage(content='Connection successful! How can I assist you today?', refusal=None, role='assistant', audio=None, function_call=None, tool_calls=None), content_filter_results={'hate': {'filtered': False, 'severity': 'safe'}, 'protected_material_code': {'filtered': False, 'detected': False}, 'protected_material_text': {'filtered': False, 'detected': False}, 'self_harm': {'filtered': False, 'severity': 'safe'}, 'sexual': {'filtered': False, 'severity': 'safe'}, 'violence': {'filtered': False, 'severity': 'safe'}})], created=1733401272, model='gpt-4o-mini', object='chat.completion', service_tier=None, system_fingerprint='fp_04751d0b65', usage=CompletionUsage(completion_tokens=10, prompt_tokens=9, total_tokens=19, completion_tokens_details=None, prompt_tokens_details=None), prompt_filter_results=[{'prompt_index': 0,

Connection successful. Response received.
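
The connection test matches substrings such as `"401"` in the exception text, which can misfire if those digits appear elsewhere in the message. A small sketch of the same hints keyed on the numeric status code follows. The names `STATUS_HINTS` and `hint_for_status` are hypothetical, and the 429 entry is an addition that does not appear in the cell above.

```python
from typing import Optional

# Hints mirror the messages printed by the connection test; 429 is an added example.
STATUS_HINTS = {
    401: "APIM requires a valid JWT access token and a valid subscription key.",
    404: "The requested resource was not found. Check the URL and endpoint configuration.",
    429: "Too many requests: the backend or an APIM policy is throttling the caller.",
}


def hint_for_status(status_code: Optional[int]) -> str:
    """Return a troubleshooting hint for an HTTP status code (None means no HTTP response)."""
    if status_code is None:
        return "No HTTP response received. Check your network connection and try again."
    return STATUS_HINTS.get(status_code, f"Unexpected HTTP status {status_code}.")


print(hint_for_status(404))
```

With the v1 `openai` SDK, HTTP error responses are raised as `openai.APIStatusError`, which carries a `status_code` attribute, so an `except openai.APIStatusError as e:` branch could call `hint_for_status(e.status_code)`.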


### 🔐 JWT Authentication Testing

In this section, we will test the JWT (JSON Web Token) authentication mechanism integrated with Azure API Management (APIM). JWT is a compact, URL-safe means of representing claims to be transferred between two parties. It is commonly used for authentication and authorization purposes.

[![flow](../images/access-controlling.gif)](access-controlling.ipynb)


#### Overview

The JWT authentication testing will cover the following aspects:

1. __Token Generation__: How to generate a JWT token with the necessary claims.
1. __Token Validation__: How APIM validates the JWT token to ensure it is authentic and has not been tampered with.
1. __Access Control__: How to use JWT tokens to control access to different API endpoints based on the claims within the token.
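
To make the notion of claims concrete, the sketch below decodes the payload segment of a JWT using only the standard library. It does not verify the signature, so it is suitable for inspection only; validation remains APIM's job. The demo token and its claim values are made up for illustration, and the function names are hypothetical.

```python
import base64
import json
from typing import Any, Dict


def _b64url_decode(segment: str) -> bytes:
    """Decode a base64url segment, restoring the padding that JWTs strip."""
    padding = "=" * (-len(segment) % 4)
    return base64.urlsafe_b64decode(segment + padding)


def _b64url_encode(data: Dict[str, Any]) -> str:
    """Encode a dict as an unpadded base64url JSON segment (demo helper)."""
    raw = json.dumps(data).encode("utf-8")
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def decode_jwt_claims(token: str) -> Dict[str, Any]:
    """Return the payload claims of a JWT WITHOUT verifying its signature."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("Expected a JWT with three dot-separated segments")
    return json.loads(_b64url_decode(parts[1]))


# Build an unsigned demo token with made-up claims.
demo_token = ".".join([
    _b64url_encode({"alg": "none", "typ": "JWT"}),
    _b64url_encode({"aud": "api://example-client-id", "scp": "User.Read"}),
    "signature-placeholder",
])

claims = decode_jwt_claims(demo_token)
print(claims["aud"], claims["scp"])
```

Inspecting `aud` and `scp` on a real access token in this way can help explain why a gateway accepts or rejects it.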

In [None]:
# Retrieve the access token using the device flow
# Requires setting the CLIENT_ID and TENANT_ID environment variables in your .env file

import msal
import requests
import json

url = apim_resource_gateway_url + "/openai/deployments/" + openai_deployment_name + "/chat/completions?api-version=" + openai_api_version

app = msal.PublicClientApplication(frontend_client_id, authority=f"https://login.microsoftonline.com/{tenant_id}")

# Initiate the device flow
flow = app.initiate_device_flow(scopes=["User.Read"])
if "user_code" not in flow:
    raise ValueError("Failed to create device flow. Error: %s" % json.dumps(flow, indent=2))

print(flow["message"])

# Acquire token by device flow
result = app.acquire_token_by_device_flow(flow)
access_token = None
if "access_token" in result:
    access_token = result['access_token']
    # Calling graph using the access token
    graph_data = requests.get(
        "https://graph.microsoft.com/v1.0/me",
        headers={'Authorization': 'Bearer ' + access_token},
    ).json()
    print("Graph API call result: %s" % json.dumps(graph_data, indent=2))
else:
    logger.debug(result.get("error"))
    logger.debug(result.get("error_description"))
    logger.debug(result.get("correlation_id"))


In [None]:
import json

messages={"messages":[
    {"role": "system", "content": "You are a sarcastic unhelpful assistant."},
    {"role": "user", "content": "Can you tell me the time, please?"}
]}

response = requests.post(url, headers = {'api-key':apim_subscription_key, 'Authorization': 'Bearer ' + access_token}, json = messages)

logger.info("status code: %s", response.status_code)

if (response.status_code == 200):
    logger.info(response.headers.get("x-ms-region"))
    remaining_tokens = response.headers.get("x-ratelimit-remaining-tokens")
    remaining_requests = response.headers.get("x-ratelimit-remaining-requests")
    
    if remaining_tokens:
        logger.info(f"Remaining Tokens: {remaining_tokens}")
    if remaining_requests:
        logger.info(f"Remaining Requests: {remaining_requests}")
    
    data = json.loads(response.text)
    print("response: ", data.get("choices")[0].get("message").get("content"))
    print("🎉🎉🎉 Successfully authenticated with JWT using Entra ID! 🎉🎉🎉")
else:
    logger.error(response.text)
    print("❌❌❌ Failed to authenticate with JWT using Entra ID! ❌❌❌")

### Initialize APIClient

We will use the helper methods to send requests from now on.

In [None]:
from helper import APIClient

client = APIClient(log_level=LOG_LEVEL if 'LOG_LEVEL' in globals() else logging.INFO) 

## Backend Circuit Breaker
![flow](../images/backend-circuit-breaking.gif)

Playground to try the built-in [backend circuit breaker functionality of APIM](https://learn.microsoft.com/en-us/azure/api-management/backends?tabs=bicep) 

For the sake of the demo, we will be leveraging the apiOps toolkit to add a circuit breaker policy and test its results.
1. Append to the `backendInformation.json` of your target backend the `circuitBreaker` property like so:
    ```json
    "circuitBreaker": {
        "rules": [
            {
                "failureCondition": {
                    "count": 3,
                    "errorReasons": [
                        "Server errors"
                    ],
                    "interval": "PT5M",
                    "statusCodeRanges": [
                        {
                        "min": 429,
                        "max": 429
                        }
                    ]
                },
                "name": "myBreakerRule",
                "tripDuration": "PT1M"
            }
        ]
    }
    ```
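    > You can define multiple circuit breaker rules. Each rule's `failureCondition` sets the number of failures (`count`) allowed within a time `interval` and the status code ranges (`statusCodeRanges`) that count as failures. In the rule above, three `429` responses within five minutes trip the breaker for one minute (`tripDuration`).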

2. Once published to APIM, ensure your API policy is routing to the backend with the circuit breaker
3. Set the tokens-per-minute limit on the Azure OpenAI model deployment to a small value, such as 1K, for testing

If the circuit breaker is working as intended, you should see a delay of about 60 seconds between requests, matching the `tripDuration` of `PT1M`.
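
The rule's behavior can be sketched as a toy simulation. This is an illustration of the rule's arithmetic under stated assumptions, not APIM's implementation: it assumes the breaker trips once `count` failures fall within the `interval`, and it converts `PT5M` and `PT1M` to 300 and 60 seconds by hand. The class name `CircuitBreakerSketch` is hypothetical.

```python
from collections import deque
from typing import Deque


class CircuitBreakerSketch:
    """Toy model of one rule: trip after `count` 429s within `interval` seconds."""

    def __init__(self, count: int = 3, interval: float = 300.0,
                 trip_duration: float = 60.0) -> None:
        self.count = count                  # "count": 3
        self.interval = interval            # "interval": "PT5M" -> 300 s
        self.trip_duration = trip_duration  # "tripDuration": "PT1M" -> 60 s
        self._failures: Deque[float] = deque()
        self._open_until = float("-inf")

    def is_open(self, now: float) -> bool:
        """True while the breaker is tripped and the backend is skipped."""
        return now < self._open_until

    def record(self, now: float, status_code: int) -> None:
        """Record a backend response observed at time `now` (in seconds)."""
        if status_code != 429:  # only the configured status range counts
            return
        self._failures.append(now)
        # Forget failures that fell out of the sliding window.
        while self._failures and now - self._failures[0] > self.interval:
            self._failures.popleft()
        if len(self._failures) >= self.count:
            self._open_until = now + self.trip_duration
            self._failures.clear()


breaker = CircuitBreakerSketch()
for t in (0, 10, 20):
    breaker.record(t, 429)

print(breaker.is_open(30))  # within 60 s of the trip at t=20
print(breaker.is_open(81))  # trip duration has elapsed
```

In the test cell, the tripped period would show up as a gap in response times rather than as a boolean flag.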

In [None]:
# from helper import run_queries, plot_results
runs = 10
api_runs = []  # Response Times for each run

# Repeat the same question to generate enough traffic to exceed the token limit.
# Expected outcome: once the backend returns 429s and the breaker trips,
# response times should show a gap of about 60 seconds.
questions = [
    "What is climate change?",
    "What is climate change?",
    "What is climate change?",
    "What is climate change?",
    "What is climate change?"
]

api_runs = client.run_queries(questions, runs)
response_times = [run[0] for run in api_runs]
client.plot_results(response_times, 'Circuit Breaker Test')

### 🧪 Load Balancing Test: Weights and Priorities

![flow](../images/backend-pool-load-balancing.gif)

In this section, we test the load balancing capabilities of the APIM backend pool in front of the Azure OpenAI service, with a focus on weights and priorities. The test sends multiple requests through APIM and monitors how they are distributed across the backends according to their assigned weights and priorities.

#### Expected Outcome:
- Requests should be distributed according to the weights and priorities assigned to each backend server.
- Higher priority servers should handle more requests.
- Response times should be consistent, indicating effective load balancing.

#### Steps:
1. **Send Requests**: Send multiple requests to the Azure OpenAI API and record the backend server handling each request.
2. **Monitor Distribution**: Analyze the distribution of requests across the backend servers to ensure they align with the assigned weights and priorities.
3. **Plot Results**: Visualize the response times and distribution of requests across different backend servers.

#### Monitoring Load Balancing:
- **Server Distribution**: The `plot_load_balancing_results` function will show the number of requests handled by each backend server.
- **Response Times**: Consistent response times across requests indicate effective load balancing.
- **Logs and Metrics**: Use Azure Monitor and Application Insights to track logs and metrics related to request distribution and performance.

By analyzing the server distribution and response times, you can ensure that the load balancing mechanism is working effectively and that the requests are distributed according to the assigned weights and priorities.

#### Load Balancing Options:
- **Round-robin**: By default, requests are distributed evenly across the backends in the pool.
- **Weighted**: Weights are assigned to the backends in the pool, and requests are distributed across the backends based on the relative weight assigned to each backend. Use this option for scenarios such as conducting a blue-green deployment.
- **Priority-based**: Backends are organized in priority groups, and requests are sent to the backends in order of the priority groups. Within a priority group, requests are distributed either evenly across the backends, or (if assigned) according to the relative weight assigned to each backend.
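
The way priority groups and weights combine can be sketched as follows. The pool below is hypothetical and does not reflect the contents of `backendInformation.json`. The sketch assumes that a lower priority number means a higher priority and that lower-priority groups are used only when every backend in the higher group is unavailable. It illustrates the selection logic only and is not APIM's implementation.

```python
import random
from collections import Counter
from typing import Any, Dict, FrozenSet, List, Optional

# Hypothetical pool for illustration only.
POOL: List[Dict[str, Any]] = [
    {"name": "backend-a", "priority": 1, "weight": 3},
    {"name": "backend-b", "priority": 1, "weight": 1},
    {"name": "backend-c", "priority": 2, "weight": 1},
]


def pick_backend(pool: List[Dict[str, Any]],
                 unavailable: FrozenSet[str] = frozenset(),
                 rng: Any = random) -> Optional[str]:
    """Pick a backend: best available priority group first, then by weight."""
    healthy = [b for b in pool if b["name"] not in unavailable]
    if not healthy:
        return None
    top_priority = min(b["priority"] for b in healthy)
    group = [b for b in healthy if b["priority"] == top_priority]
    weights = [b["weight"] for b in group]
    return rng.choices(group, weights=weights, k=1)[0]["name"]


rng = random.Random(0)  # seeded so the run is repeatable
counts = Counter(pick_backend(POOL, rng=rng) for _ in range(4000))
print(counts)

# With both priority-1 backends down, traffic falls through to priority 2.
print(pick_backend(POOL, unavailable=frozenset({"backend-a", "backend-b"})))
```

With weights 3 and 1, roughly three quarters of the requests should land on `backend-a`, and `backend-c` should receive traffic only after the whole priority 1 group is unavailable.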

For this demo, we will be referencing the backend load balancing pool config outlined in the [openai-backend-pool](../apimartifacts/backends/openai-backend-pool/backendInformation.json)

##### Steps:
1. Apply the json configuration found in [openai-backend-pool](../apimartifacts/backends/openai-backend-pool/backendInformation.json) via apiops
2. Add a retry policy to the `backend` section of the API policy so that requests rejected with a 429 are retried against the pool
```xml
    <backend>
        <retry condition="@(context.Response.StatusCode == 429)" count="2" interval="1" first-fast-retry="true">
            <forward-request />
        </retry>
    </backend>
```
3. Run the Python cell below to observe the behavior

In [None]:
api_runs = []
runs = 20
questions = [
    "What is climate change?",
    "Explain climate change.",
    "Describe the concept of climate change.",
    "Tell me about global warming and its impact on the environment.",
    "What causes climate change?",
    "How does carbon dioxide contribute to global warming?",
    "What are the effects of climate change on sea levels?"
]

api_runs = client.run_queries(questions, runs, randomize=True) 
# Define the color map
color_map = {
    'East US': 'blue',
    'North Central US': 'green',
    'West US': 'red'
    # Add more regions and their corresponding colors as needed
}

client.plot_load_balancing_results(api_runs, color_map, 'Load Balancing results')

<a id='sdk'></a>
### 🧪 Analyzing Semantic Caching

[![flow](../images/semantic-caching.gif)](semantic-caching.ipynb)


#### Instructions
1. Connect an Azure Redis Cache instance to your API Management instance:
    - Navigate to the Azure Portal.
    - Go to Deployment & Infrastructure > External Cache.
    - Follow the prompts to connect your Redis Cache instance.

2. Create a backend for your embeddings model deployment:
    - This involves setting up the backend service that will handle requests for your embeddings model.

3. Add the following policy snippet to your inbound policy in Azure API Management:
    ```xml
    <azure-openai-semantic-cache-lookup score-threshold="0.8" embeddings-backend-id="<embeddings-backend-name>" embeddings-backend-auth="system-assigned" />
    ```
    - `score-threshold`: A value between 0.0 and 1.0 that sets how closely an incoming prompt must match a cached prompt for a cache hit. According to the policy reference, smaller values require greater semantic similarity.
    - `embeddings-backend-id`: The identifier for your embeddings backend service.
    - `embeddings-backend-auth`: Authentication method for the backend service, typically "system-assigned".

4. Run through each test provided in the notebook and analyze the results to ensure the caching mechanism works as expected.

For more detailed information, refer to the official documentation: [Azure API Management - How to Cache External Responses](https://learn.microsoft.com/en-us/azure/api-management/api-management-howto-cache-external)
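
The general idea behind a semantic cache lookup can be sketched with toy vectors: embed the prompt, compare it with cached prompt embeddings, and reuse a stored response only when the match is close enough. The three-dimensional vectors below are made up, and the `min_similarity` cutoff is an assumption of this sketch. It is not the same thing as APIM's `score-threshold`, whose exact scoring is defined by the policy reference.

```python
import math
from typing import List, Optional, Sequence, Tuple

CacheEntry = Tuple[Sequence[float], str]  # (prompt embedding, cached response)


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two vectors (0.0 if either is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def lookup(cache: List[CacheEntry], query: Sequence[float],
           min_similarity: float) -> Optional[str]:
    """Return the closest cached response, or None when nothing is similar enough."""
    best_score, best_response = -1.0, None
    for embedding, response in cache:
        score = cosine_similarity(query, embedding)
        if score > best_score:
            best_score, best_response = score, response
    return best_response if best_score >= min_similarity else None


# Made-up embeddings: a paraphrase lands near the cached prompt, an unrelated one does not.
cache: List[CacheEntry] = [([1.0, 0.0, 0.0], "cached answer about climate change")]
paraphrase = [0.9, 0.1, 0.0]
unrelated = [0.0, 1.0, 0.0]

print(lookup(cache, paraphrase, min_similarity=0.95))
print(lookup(cache, unrelated, min_similarity=0.95))
```

A stricter cutoff reduces the chance of returning an answer to a different question, at the cost of fewer cache hits. That trade-off is what the two test sets below are meant to expose.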


#### 🧪 Semantic Caching: Set 1 - Exact Matches (Control Group)

This subsection of semantic caching is responsible for testing the first set of questions (Set 1: Exact Matches).

The purpose of this test is to evaluate the performance and accuracy of the caching mechanism when dealing with exact match queries. This set serves as the control group, providing a baseline for comparison with other sets of questions that may involve more complex query patterns.

__Expected Outcome__:
- Latency should be extremely low for repeated queries.

The results from this test will help in understanding the efficiency of the semantic caching system in handling straightforward, exact match queries and will serve as a reference point for further optimizations.


In [None]:
# from helper import run_queries, plot_results
runs = 5
api_runs = []  # Response Times for each run

# Set 1: Exact Matches (Control Group)
# Expected outcome: latency should be extremely low for repeated queries.
questions = [
    "What is climate change?",
    "What is climate change?",
    "What is climate change?",
    "What is climate change?",
    "What is climate change?"
]

api_runs = client.run_queries(questions, runs)
response_times = [run[0] for run in api_runs]
client.plot_results(response_times, 'Semantic Caching Performance (Set 1: Exact Matches)')


#### 🧪 Semantic Caching: Set 2: Paraphrased/Slightly Related Matches

This subsection of semantic caching is responsible for testing the second set of questions (Set 2: Paraphrased Matches).

In this test, we evaluate the performance and accuracy of the caching mechanism when dealing with paraphrased queries. This set helps in understanding how well the caching system handles variations in query phrasing while still retrieving relevant cached responses.

__Expected Outcome__:
- Similar questions (1, 2, 3) should hit the cache.
- Slightly reworded but related questions (4) may still hit the cache if the similarity threshold allows.
- Questions with overlapping meanings (5 and 6) might hit the cache if configured for broad similarity.
- Question 7 might generate a new response depending on threshold settings.

The results from this test will provide insights into the robustness of the semantic caching system in handling paraphrased queries and its ability to maintain low latency and high accuracy.
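
One rough way to read the results is to compare each call's latency with that of the first call, which is presumably a cache miss. The sketch below applies that heuristic. The function name `likely_cache_hits`, the `ratio` cutoff, and the sample latencies are all assumptions for illustration; a low latency is only indirect evidence of a cache hit.

```python
from typing import List


def likely_cache_hits(latencies: List[float], ratio: float = 0.5) -> List[bool]:
    """Flag calls whose latency is below `ratio` times the first call's latency.

    The first call is treated as the uncached baseline, so it is never flagged.
    """
    if not latencies:
        return []
    baseline = latencies[0]
    return [False] + [t < ratio * baseline for t in latencies[1:]]


# Made-up latencies in seconds, for illustration only.
sample = [2.10, 0.15, 0.18, 1.95, 0.20]
print(likely_cache_hits(sample))
```

Applied to the `response_times` collected in the cell below, a pattern of flags that follows the expected outcomes listed above would suggest the threshold is behaving as intended.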

In [None]:
api_runs = []  # Response Times for each run
runs = 6
questions = [
    "What is climate change?",
    "Explain climate change.",
    "Describe the concept of climate change.",
    "Tell me about global warming and its impact on the environment.",
    "What causes climate change?",
    "How does carbon dioxide contribute to global warming?",
    "What are the effects of climate change on sea levels?"
]

api_runs = client.run_queries(questions, runs, randomize=False) 
response_times = [run[0] for run in api_runs]
client.plot_results(response_times, 'Semantic Caching Performance (Set 2: Paraphrased Matches)')

In [None]:
# Re-run the load balancing test so that api_runs holds (response time, region)
# pairs for the summary analysis below
api_runs = []
runs = 20
questions = [
    "What is climate change?",
    "Explain climate change.",
    "Describe the concept of climate change.",
    "Tell me about global warming and its impact on the environment.",
    "What causes climate change?",
    "How does carbon dioxide contribute to global warming?",
    "What are the effects of climate change on sea levels?"
]

api_runs = client.run_queries(questions, runs, randomize=True)
# Define the color map
color_map = {
    'East US': 'blue',
    'North Central US': 'green',
    'West US': 'red'
    # Add more regions and their corresponding colors as needed
}

client.plot_load_balancing_results(api_runs, color_map, 'Load Balancing results')

#### Summary Analysis:

In [None]:
import numpy as np
import pandas as pd

# Extract response times and regions from api_runs
response_times = [run[0] for run in api_runs]
regions = [run[1] for run in api_runs]

client.plot_results(response_times, 'Load Balancing Response Times')
# Calculate % distribution of load to the backend regions
region_counts = pd.Series(regions).value_counts(normalize=True) * 100
print("\n📊 Percentage distribution of load to backend regions:")
print(region_counts)

# Calculate average response times per region
region_avg_response_times = pd.DataFrame(api_runs, columns=['Response Time', 'Region']).groupby('Region').mean()
print("\n⏱️ Average response times per region:")
print(region_avg_response_times)

# Identify potential causes for long response times
threshold = np.percentile(response_times, 90)  # Define a threshold for long response times (90th percentile)
long_response_times = [(time, region) for time, region in api_runs if time > threshold]

print("\n🚨 Potential causes for long response times:")
for time, region in long_response_times:
    print(f"⏰ Response Time: {time:.2f} seconds, 🌍 Region: {region}")
    if time > 10:  # Arbitrary threshold for rate limiting/throttling
        print("  - ⚠️ Potential cause: Rate limiting/throttling or policy misconfiguration")


# Look for jumps in response time between consecutive calls, which may indicate
# circuit breaker events (run once, after the loop over long response times)
time_gaps = [j - i for i, j in zip(response_times[:-1], response_times[1:])]
circuit_breaker_events = [(gap, regions[idx + 1]) for idx, gap in enumerate(time_gaps) if gap > 5]
if circuit_breaker_events:
    print("\n🔍 Circuit Breaker Analysis based on time gaps:")
    for gap, region in circuit_breaker_events:
        print(f"⏰ Time Gap: {gap:.2f} seconds, 🌍 Region: {region}")
        print("  - ⚠️ Potential cause: Circuit breaker triggered due to high response time gap")
