## Get subscription

In [None]:
import requests
import json
import pandas as pd
from datetime import datetime
from azure.mgmt.subscription import SubscriptionClient
from azure.identity import DefaultAzureCredential

# Authenticate with whatever credential source is available locally (the
# error message below points at an `az` CLI login) and build a subscription
# client from it.
credential = DefaultAzureCredential()
sub_client = SubscriptionClient(credential)

# Work with the first subscription the client lists; stop when there is none.
subscription = next(sub_client.subscriptions.list(), None)
if not subscription:
    raise Exception("Authenticate using the az cli")
subscriptionId = subscription.subscription_id
print(f"Using subscription {subscriptionId}")

# Bearer token for the management endpoint, sent with the REST calls below.
token = credential.get_token('https://management.azure.com/.default')
headers = {'Authorization': f'Bearer {token.token}'}

## Discover regions and geography groups using the Locations API

### Regions

In [None]:
def get_regions():
    """Return the locations available to the current subscription.

    Calls the subscription Locations endpoint (api-version 2021-04-01) using
    the module-level ``subscriptionId`` and ``headers``.

    Returns:
        The list stored under the ``"value"`` key of the JSON response.

    Raises:
        requests.HTTPError: If the service answers with a 4xx/5xx status.
    """
    locations_request = f"https://management.azure.com/subscriptions/{subscriptionId}/locations?api-version=2021-04-01"
    response = requests.get(locations_request, headers=headers)
    # Fail with the HTTP status instead of a confusing KeyError on "value"
    # when the call is rejected (expired token, missing permissions, ...).
    response.raise_for_status()
    return response.json()["value"]

# Fetch the location records once; region names and geography groups are
# derived from this list.
regions = get_regions()

def get_region_names(regions):
    """Return the distinct ``name`` values of the given region records as a set."""
    return {region['name'] for region in regions}

# The names are held in a set, which has no stable iteration order; sort them
# for display so the printed list does not change from run to run.
regions_list = get_region_names(regions)
print(f"{len(regions_list)} regions:")
print(", ".join(sorted(regions_list)))

### Geography groups

In [None]:
def get_geography_groups(regions):
    """Return the set of geography groups named in the regions' metadata.

    Regions whose ``metadata`` has no ``geographyGroup`` entry are ignored.
    """
    groups = set()
    for region in regions:
        metadata = region['metadata']
        if 'geographyGroup' in metadata:
            groups.add(metadata['geographyGroup'])
    return groups

# Sorted for display: the groups are a set, so the raw join order is arbitrary.
geography_groups = get_geography_groups(regions)
print(", ".join(sorted(geography_groups)))

## Let's focus on US regions

In [None]:
def filter_by_geography_group(regions, *args):
    """Lazily select the regions whose geography group is one of ``args``.

    Regions without a ``geographyGroup`` in their metadata are discarded up
    front; the remaining ones are filtered on demand by the returned iterator.
    """
    grouped = [region for region in regions if 'geographyGroup' in region['metadata']]

    def in_requested_group(region):
        return region['metadata']['geographyGroup'] in args

    return filter(in_requested_group, grouped)
# Narrow the working set to the US geography group. The names are a set, so
# they are sorted for a stable printed list.
regions_list = get_region_names(filter_by_geography_group(regions, 'US'))
print(", ".join(sorted(regions_list)))

## Helper to parallelize calls per region

Most of the API calls we'll make from here on are per region, and we want to aggregate their results across regions. Depending on how many regions you are considering, you can find yourself querying up to 90 of them. The following helper lets us run those per-region queries in parallel.

In [None]:
def parallel_map(fn, *iterables, executor=None, **kwargs):
    """Run ``fn`` on every item of every iterable concurrently.

    Each item is submitted as a single-argument call ``fn(item)``; the items
    of all iterables are flattened into one work list (they are not zipped
    together the way ``executor.map`` would do). Results are yielded in
    completion order, not submission order. A tqdm progress bar is shown when
    tqdm is installed.

    Does not support timeout or chunksize as executor.submit is used internally.

    Args:
        fn: Callable taking one item.
        *iterables: Iterables whose items are processed.
        executor: Optional executor to use instead of a fresh thread pool.
            It is used as a context manager, so it is shut down once the
            results have been consumed.
        **kwargs: Passed to tqdm (ignored when tqdm is not installed).

    Yields:
        The return value of each ``fn(item)`` call as it completes. An
        exception raised by ``fn`` is re-raised when its result is reached.
    """
    from concurrent.futures import ThreadPoolExecutor, as_completed

    try:
        from tqdm import tqdm
    except ImportError:
        # The progress bar is cosmetic; keep working without it.
        tqdm = None

    items = [item for iterable in iterables for item in iterable]
    # Size the default pool from the actual work list (one thread per item)
    # rather than from an unrelated global; ThreadPoolExecutor rejects
    # max_workers=0, hence the floor of 1 for empty input.
    pool = executor if executor else ThreadPoolExecutor(max_workers=max(1, len(items)))
    with pool as ex:
        futures = [ex.submit(fn, item) for item in items]
        completed = as_completed(futures)
        if tqdm is not None:
            completed = tqdm(completed, total=len(futures), **kwargs)
        for future in completed:
            yield future.result()

## Discover regions where Azure AI is supported using the Models API

In [None]:
# Filled by the probing loop below: a region goes to the first list when its
# models request returns HTTP 200 and to the second list otherwise.
regions_successful = []
regions_failed = []

def get_models_response(region):
    """Request the Cognitive Services models list for ``region``.

    Returns a ``(region, response)`` tuple holding the unparsed HTTP response,
    so callers can look at the status code before reading the body.
    """
    endpoint = (
        f"https://management.azure.com/subscriptions/{subscriptionId}"
        f"/providers/Microsoft.CognitiveServices/locations/{region}/models?api-version=2023-05-01"
    )
    response = requests.get(endpoint, headers=headers)
    return region, response

# Probe every candidate region in parallel and sort it into the matching list
# according to the status code of its models request.
for region, result in parallel_map(get_models_response, regions_list):
    (regions_successful if result.status_code == 200 else regions_failed).append(region)

print("Potential Azure OpenAI regions based on control plane response:")
print(", ".join(regions_successful))

In [None]:
# Regions whose models request did not return HTTP 200.
print("Azure OpenAI Not Supported:", ", ".join(regions_failed), sep="\n")

## Discover models and SKUs using the Models API

In [None]:
def get_models(region):
    """Return the parsed ``value`` list of the models response for ``region``."""
    _region, response = get_models_response(region)
    payload = json.loads(response.text)
    return payload['value']

def get_models_regions(regions):
    """Pair every model offered in ``regions`` with the region offering it.

    The per-region lookups go through ``parallel_map``; the per-region lists
    are chained into one lazy iterable of ``{"model": ..., "region": ...}``
    dicts.
    """
    from itertools import chain

    def tag_with_region(region):
        tagged = []
        for model in get_models(region):
            tagged.append({"model": model, "region": region})
        return tagged

    return chain.from_iterable(parallel_map(tag_with_region, regions))

# Materialize the lazy result into a list so it can be indexed and reused,
# then preview the first two entries.
models_regions = list(get_models_regions(regions_successful))
models_regions[:2]

In [None]:
# region name -> list of {"Model Name", "Version", "SKU Name"} rows for the
# models of that region that offer the Standard SKU.
region_model_data = {}

# Model names deliberately left out of the overview
# (presumably retired models -- confirm).
excluded_models = [
    'text-similarity-ada-001', 'text-babbage-001', 'text-curie-001',
    'text-similarity-curie-001', 'text-davinci-002', 'text-davinci-003',
    'text-davinci-fine-tune-002', 'code-davinci-002', 'code-davinci-fine-tune-002',
    'text-ada-001', 'text-search-ada-doc-001', 'text-search-ada-query-001',
    'code-search-ada-code-001', 'code-search-ada-text-001', 'text-similarity-babbage-001',
    'text-search-babbage-doc-001', 'text-search-babbage-query-001',
    'code-search-babbage-code-001', 'code-search-babbage-text-001',
    'text-search-curie-doc-001', 'text-search-curie-query-001', 'text-davinci-001',
    'text-similarity-davinci-001', 'text-search-davinci-doc-001',
    'text-search-davinci-query-001', 'code-cushman-001',
]

for region in regions_successful:
    data_test = []

    for item in get_models(region):
        model = item["model"]
        if model["capabilities"].get("scaleType") == "Manual":  # skip legacy models
            continue
        model_name = model["name"]
        if model_name in excluded_models:  # explicitly excluded above
            continue
        # This example only targets Standard deployments. Check every SKU of
        # the model: keeping just the last SKU name of the list would drop
        # models that list "Standard" before another SKU.
        if any(sku["name"] == "Standard" for sku in model["skus"]):
            data_test.append({"Model Name": model_name, "Version": model["version"], "SKU Name": "Standard"})

    region_model_data[region] = data_test  # store the model data under corresponding region name

# Print result
#for region, model_data in region_model_data.items():
#    print(f'{region}: {model_data}')

In [None]:
# One row per (region, model) entry, tagged with the region it came from.
rows = [
    {**model, 'Region': region}
    for region, models in region_model_data.items()
    for model in models
]

df = pd.DataFrame(rows)[['Region', 'Model Name', 'Version', 'SKU Name']]
pd.set_option('display.max_rows', None)

# Flag every listed combination, then pivot into a grid with one row per
# region and one column per (model, version); missing combinations are False.
df['Exist'] = True
pivot_df = df.pivot_table(
    index='Region',
    columns=['Model Name', 'Version'],
    values='Exist',
    fill_value=False,
    aggfunc='any',
).reset_index()

pivot_df

In [None]:
def infer_type(usage_name_localized_value):
    """Classify a localized usage name by the quota kind it mentions.

    The markers are checked in order and case-sensitively; the first one
    found decides the result. Returns ``None`` when no marker is present.
    """
    markers = (
        ('Tokens Per Minute', 'tokens-per-minute'),
        ('Requests Per Minute', 'requests-per-minute'),
        ('Enqueued tokens', 'enqueued-tokens'),
    )
    for marker, usage_type in markers:
        if marker in usage_name_localized_value:
            return usage_type
    return None

import re

def extract_model_info(usage):
    '''Split a usage name such as OpenAI.Standard.gpt-35-turbo into its parts.

    Returns a dict with the keys vendor, SKU, name and workload, where workload
    is 'finetune' when the usage name ends with -finetune (the suffix is not
    part of name) and 'inference' otherwise. Returns None when the localized
    name mentions neither tokens nor requests, or when the usage name does not
    have the vendor.SKU.name shape.'''
    localized_name = usage['name']['localizedValue'].lower()
    if 'tokens' not in localized_name and 'requests' not in localized_name:
        return None  # not a token/request quota

    raw_name = usage['name']['value']
    match = re.match(r'(?P<vendor>\w+)\.(?P<SKU>\w+)\.(?P<name>[\w-]+?)(?:-finetune)?$', raw_name)
    if match is None:
        return None

    info = match.groupdict()
    info['workload'] = 'finetune' if raw_name.endswith('-finetune') else 'inference'
    return info

def extract_usage_details(region, usage):
    '''Flatten one usage record into a row of region, names, type, model info and numbers.

    When the localized name contains 'thousands', the current value and the
    limit are multiplied by 1000 before the remaining amount is computed.
    The model fields are None when no model info can be extracted.'''
    model_info = extract_model_info(usage)

    def model_field(key):
        return model_info[key] if model_info else None

    usage_name = usage['name']
    localized_name = usage_name['localizedValue']

    current, limit = usage['currentValue'], usage['limit']
    if 'thousands' in localized_name:
        current, limit = current * 1000, limit * 1000

    return {
        'region': region,
        'value': usage_name['value'],
        'localizedValue': localized_name,
        'type': infer_type(localized_name),
        'modelName': model_field('name'),
        'vendor': model_field('vendor'),
        'SKU': model_field('SKU'),
        'workload': model_field('workload'),
        'current': current,
        'remaining': limit - current,
        'limit': limit,
        'unit': usage['unit']
    }

## Discover quotas using the Usages API

In [None]:
def get_usages(region):
    """Return the Cognitive Services usage records of ``region``.

    Calls the Usages endpoint (api-version 2023-05-01) using the module-level
    ``subscriptionId`` and ``headers``.

    Returns:
        The list stored under the ``"value"`` key of the JSON response.

    Raises:
        requests.HTTPError: If the service answers with a 4xx/5xx status.
    """
    url = f"https://management.azure.com/subscriptions/{subscriptionId}/providers/Microsoft.CognitiveServices/locations/{region}/usages?api-version=2023-05-01"
    response = requests.get(url, headers=headers)
    # Report the HTTP failure itself rather than a KeyError on "value" when
    # the request is rejected.
    response.raise_for_status()
    return response.json()['value']

### Example for eastus2

In [None]:
# Fetch the usage records of a single region and preview the first two.
usages = get_usages('eastus2')
usages[:2]

In [None]:
def get_regions_usages(regions):
    """Fetch the usage records of every region in parallel.

    Returns:
        A dict mapping each region to the list returned by ``get_usages``.
    """
    def get_region_usages(region):
        # Return the region together with its usages so the two can be
        # re-associated when the dict is built from the parallel results.
        return (region, get_usages(region))
    return dict(parallel_map(get_region_usages, regions))

# Usage records of every region that answered the models request with HTTP 200.
regional_usages = get_regions_usages(regions_successful)

In [None]:
# One detail row per usage record, across all regions.
token_limit_details = [
    extract_usage_details(region, usage)
    for region, usages in regional_usages.items()
    for usage in usages
]

## Convert the list to a DataFrame
df = pd.DataFrame(token_limit_details)
pd.set_option('display.max_rows', None)
pd.set_option('display.max_colwidth', 100)
# Preview only the rows for which a model name could be extracted.
df[df['modelName'].notnull()].head()
#df[df['modelName'] == 'gpt-4o']

### Index usages by name and region

We will use this index to join usages with the models on the usage name

In [None]:
def index_usages_by_name(usages):
    """Index usage records by their ``name.value``.

    When several records share a name, the last one wins.
    """
    return {usage['name']['value']: usage for usage in usages}

# region -> {usage name -> usage record}
regional_usages_by_name = {
    region: index_usages_by_name(usages)
    for region, usages in regional_usages.items()
}

## Join models with SKUs

SKUs are listed as an array under each model. We want one record per (model, SKU) pair.

In [None]:
def flatten_models_sku(models_region):
    """Yield one record per (model, SKU) pair.

    Each input record is copied with an added ``"sku"`` key. A model whose
    SKU list is empty (or otherwise falsy) yields a single record with
    ``"sku"`` set to ``None``.
    """
    for entry in models_region:
        sku_list = entry['model']['model']['skus'] or [None]
        yield from ({**entry, "sku": sku} for sku in sku_list)
# Materialize the generator into a list, then preview the first two records.
models_flattened = list(flatten_models_sku(models_regions))
models_flattened[:2]

## Join models with usages

In [None]:
def join_models_usages(models_flattened, regional_usages_by_name):
    """Attach the matching usage record to every (model, SKU) record.

    Args:
        models_flattened: Records carrying at least ``"region"`` and ``"sku"``.
        regional_usages_by_name: ``{region: {usage name: usage record}}``.

    Yields:
        A copy of each record with an added ``"usage"`` key. The value is
        ``None`` when the record has no SKU, the SKU has no ``usageName``, or
        no usage record exists for that name in the record's region.
    """
    for model_join in models_flattened:
        sku = model_join['sku']
        usage = None
        if sku:
            # Look up defensively: one SKU without a usage record should give
            # that record a missing usage, not abort the whole join with a
            # KeyError.
            usages_by_name = regional_usages_by_name.get(model_join['region'], {})
            usage = usages_by_name.get(sku.get('usageName'))
        yield model_join | {
            "usage": usage
        }

# Materialize the join into a list (it is filtered several times by the cells
# below), then preview the first two records.
joined_model_sku_usages = list(join_models_usages(models_flattened, regional_usages_by_name))
joined_model_sku_usages[:2]

## Find models matching name, TPM and SKU requirements

In [None]:
def model_matches(model_names, tpm, sku_names):
    """Build a predicate over joined model/SKU/usage records.

    A falsy ``model_names``, ``sku_names`` or ``tpm`` disables the
    corresponding check. The returned predicate is meant to be called with a
    record's fields as keyword arguments; extra fields are ignored.
    """
    def criteria(model, sku, usage, **kwargs):
        # Records without a usage can never satisfy a requirement.
        if not usage:
            return False

        remaining = usage['limit'] - usage['currentValue']
        model_name = model['model']['name']
        sku_name = sku['name']

        return (
            (not model_names or model_name in model_names)
            and (not sku_names or sku_name in sku_names)
            and (not tpm or remaining >= tpm)
        )
    return criteria

def __or__(p1, p2):
    """Combine two predicates into one that holds when either of them does.

    ``p2`` is only called when ``p1`` returns a falsy value.
    """
    return lambda *args, **kwargs: p1(*args, **kwargs) or p2(*args, **kwargs)

def filter_models_sku_usages(criteria, models_sku_usages):
    """Return, as a list, the records for which ``criteria(**record)`` is truthy."""
    return [record for record in models_sku_usages if criteria(**record)]

# One predicate per workload: acceptable model names, the minimum remaining
# quota (tpm) and the acceptable SKU names.
# NOTE(review): tpm is compared against the raw `limit - currentValue` of the
# usage record; the values here presumably assume quotas reported in
# thousands -- confirm the unit.
editor_req = model_matches(model_names = ['gpt-35-turbo', 'gpt-35-turbo-16k'], tpm = 10, sku_names = ['Standard', 'GlobalStandard'])
eval_req = model_matches(model_names = ['gpt-4', 'gpt-4-32k'], tpm = 5, sku_names = ['Standard', 'GlobalStandard'])
writer_req = model_matches(model_names = ['gpt-4o', 'gpt-4o-mini'], tpm = 15, sku_names = ['Standard', 'GlobalStandard'])
embedding_req = model_matches(model_names = ['text-embedding-3-small', 'text-embedding-ada-002'], tpm = 30, sku_names = ['Standard', 'GlobalStandard'])

# All requirements an application has to satisfy.
requirements = [editor_req, eval_req, writer_req, embedding_req]

## Find all models matching requirements

Let's say you want to get an overview of all models matching any of the requirements

In [None]:
# Keep the records that satisfy the editor requirement or the embedding
# requirement.
# NOTE(review): only two of the four requirements are combined here, while
# the surrounding text speaks of "any of the requirements" -- confirm whether
# eval_req and writer_req should be included as well.
req = __or__(editor_req, embedding_req)

models_sku_usages_filtered = filter_models_sku_usages(req, joined_model_sku_usages)
# Preview the first two matching records.
models_sku_usages_filtered[:2]

## Finding regions matching all requirements

Sometimes, you're interested in deploying an AI application in a region where all requirements are met.

In [None]:
def unique_regions(model_sku_usages):
    """Return the set of ``region`` values found in the given records."""
    return {record['region'] for record in model_sku_usages}

In [None]:
def regions_matching_all(requirements, joined_model_sku_usages):
    """Return the regions in which every requirement is met.

    A region qualifies when, for each requirement, at least one of its
    joined model/SKU/usage records satisfies that requirement.

    Args:
        requirements: Predicates as produced by ``model_matches``.
        joined_model_sku_usages: Joined records carrying a ``"region"`` key.

    Returns:
        The set of qualifying regions. With no requirements at all, every
        region present in ``joined_model_sku_usages`` qualifies.
    """
    requirements = list(requirements)
    if not requirements:
        # set.intersection() needs at least one set to start from; with
        # nothing to satisfy, no region is ruled out.
        return unique_regions(joined_model_sku_usages)
    regions_sets = [
        unique_regions(filter_models_sku_usages(req, joined_model_sku_usages))
        for req in requirements
    ]
    return set.intersection(*regions_sets)

# Regions in which each of the requirements defined above is met.
regions_matching_all(requirements, joined_model_sku_usages)

## Useful links

- [Azure REST API Browser](https://learn.microsoft.com/en-us/rest/api/)
- [Azure Locations API MS Learn](https://learn.microsoft.com/en-us/rest/api/resources/subscriptions/list-locations)
- [Azure AI Models API MS Learn](https://learn.microsoft.com/en-us/rest/api/aiservices/accountmanagement/models/list)
- [Azure AI Usages API MS Learn](https://learn.microsoft.com/en-us/rest/api/aiservices/accountmanagement/usages/list) (Quotas)