# 1. Overall Architecture Review

![](./DFCI_Demo_App_Architecture.jpeg)

# 2. Test OAuth 2.0 Authorization and API Request

### In this section, we first test whether the OAuth 2.0 server is working. We won't involve the "Databricks Model Serving Endpoint and Gateway" module for now.

Create your secret scope using Databricks secrets, for example:
1. Create a secret scope for a specific workspace profile: `databricks secrets create-scope yyang_secret_scope`
2. Add a secret key and value: `databricks secrets put-secret yyang_secret_scope pat`, where `pat` is the key
    - then enter the value at the prompt, or edit and save it in the editor
3. (Optional) You can also save other key-value pairs, such as `databricks_host` and `workspace_id`: `databricks secrets put-secret yyang_secret_scope db_host`


Now you are done.



Ref: https://learn.microsoft.com/en-us/azure/databricks/security/secrets/
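Once the scope exists, a notebook can read the values back with `dbutils.secrets.get`, which the later cells use for the PAT. Below is a minimal sketch of a loader that prefers the secret scope and falls back to an environment variable for runs outside Databricks. The helper name `load_secret`, its `secrets_getter` parameter, and the upper-cased environment variable fallback are assumptions made for illustration; they are not part of the original notebook.

```python
import os

def load_secret(key, scope="yyang_secret_scope", secrets_getter=None):
    """Return a secret value (hypothetical helper, not a Databricks API).

    secrets_getter is expected to behave like dbutils.secrets.get(scope=..., key=...).
    When it is None, fall back to an environment variable named after the key.
    """
    if secrets_getter is not None:
        return secrets_getter(scope=scope, key=key)
    env_name = key.upper()  # assumption: env var is the upper-cased key name
    if env_name not in os.environ:
        raise KeyError(f"Secret '{key}' not found: set {env_name} or pass secrets_getter")
    return os.environ[env_name]
```

In a Databricks notebook this could be called as `load_secret("pat", secrets_getter=dbutils.secrets.get)`, so the secret never appears in the notebook source.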

In [0]:
# TODO: store the secret below in Databricks secrets instead of hardcoding it

client_secret = 'fill-your-secret-here' # web app "dfci-demo-app/fernet-w-api" (slot) registration

In [0]:
import requests

tenant_id = '9f37a392-f0ae-4280-9796-f1864a10effc'
# client_id = '67ed6fea-e8ec-4d56-880c-f59688aa2c48' # web app "dfci-demo-app"
client_id = '610ad44b-74fc-4f7e-b385-cc8c93a8423b' # web app "dfci-demo-app/fernet-w-api" (slot)
# client_id = '78061f18-2c8b-4c8d-95f7-c9527a75deb3' # APIM
# scope = 'api://67ed6fea-e8ec-4d56-880c-f59688aa2c48/.default' # dfci-demo-app"
scope = 'api://610ad44b-74fc-4f7e-b385-cc8c93a8423b/.default' # web app "dfci-demo-app/fernet-w-api" (slot)
# scope = 'api://78061f18-2c8b-4c8d-95f7-c9527a75deb3/.default' # APIM

url = f'https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token'
headers = {'Content-Type': 'application/x-www-form-urlencoded'}
data = {
    'grant_type': 'client_credentials',
    'client_id': client_id,
    'client_secret': client_secret,
    'scope': scope
}
response = requests.post(url, headers=headers, data=data)
print(response.status_code)
print(response.json())
token = response.json().get('access_token')

In [0]:
print(token)
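Access tokens issued for a custom API scope are typically JWTs, so the claims can be inspected to confirm the audience (`aud`) and expiry (`exp`) before calling APIM. The sketch below decodes the payload segment only; the helper name `decode_jwt_claims` is hypothetical, and the function does not verify the signature, so it is suitable for debugging and not for authorization decisions.

```python
import base64
import json

def decode_jwt_claims(token: str) -> dict:
    """Decode the (unverified) claims of a JWT for debugging purposes."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("Expected a JWT with three dot-separated parts")
    payload_b64 = parts[1]
    # JWT segments are base64url-encoded without padding; restore the padding
    payload_b64 += "=" * (-len(payload_b64) % 4)
    return json.loads(base64.urlsafe_b64decode(payload_b64))
```

For example, `decode_jwt_claims(token).get("aud")` should match the application that the `scope` value refers to.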

In [0]:
# headers = {'Authorization': f'Bearer {token + "xxx"}'} # uncomment to verify that a bad token fails as expected
headers = {'Authorization': f'Bearer {token}'} # status 200 means the request was successful
response = requests.get('https://dfci-api-management.azure-api.net/api2/v1/', headers=headers)
print(response.status_code)
print(response)

Make sure the endpoint '/api/hello' is defined in your app. Here we used the Python Flask framework to define the app; you can also use Node.js, Django, etc.

In [0]:
import requests

url = 'https://dfci-api-management.azure-api.net/api2/v1/api/hello'
headers = {
    'Authorization': f'Bearer {token}',
    'Content-Type': 'application/x-www-form-urlencoded'
}
data = {'req': 'analyze ASML stock'}

response = requests.post(url, headers=headers, data=data)
display(response.status_code)
print(response)

In [0]:
dir(response)

In [0]:
displayHTML(response.content.decode('utf-8'))

# 3. Define a custom pyfunc model, log it to the UC catalog with MLflow, then serve it with a Databricks model serving endpoint

## Now we will work on the "Databricks Model Serving Endpoint and Gateway" module.

The general workflow is:
Databricks notebook call (client side) -> Databricks model serving endpoint with AI gateway -> get a token from the Azure OAuth 2.0 server -> call the API managed by APIM.

We have 3 modules to write:
1. Define a pyfunc custom model that can get an OAuth token and call Azure APIM using GET and POST requests
2. Log the model to the UC catalog
3. Set up the Databricks model serving endpoint with this custom model

In [0]:
import os

## We need to set up the environment variables correctly.
1. Locate your Azure Web App's credentials and API scope
2. Locate your OAuth 2.0 token URL
3. Locate the APIM URL for the specific app API managed by APIM

Fill in the values below.
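After the values are filled in, a quick check can catch a missing variable or a trailing slash before the model is defined. The following is a minimal sketch; the names `REQUIRED_VARS` and `check_config` are hypothetical, and the list of variables simply mirrors the ones this notebook sets.

```python
import os

# Variables this notebook expects to find in os.environ
REQUIRED_VARS = ("TENANT_ID", "OAUTH_TOKEN_URL", "CLIENT_ID",
                 "CLIENT_SECRET", "API_SCOPE", "APIM_URL")

def check_config(env=None):
    """Return the required settings, raising if any is missing or empty."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise KeyError(f"Missing environment variables: {', '.join(missing)}")
    config = {name: env[name] for name in REQUIRED_VARS}
    # Drop any trailing '/' so that appending '/api/hello' never yields '//'
    config["APIM_URL"] = config["APIM_URL"].rstrip("/")
    return config
```

Calling `check_config()` with no arguments reads from `os.environ`; passing a plain dict makes the check easy to try without touching the real environment.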

In [0]:
import requests

# OAuth 2.0 scopes provide a way to limit the amount of access that is granted to an access token. For example, an access token issued to a client app may be granted READ and WRITE access to protected resources, or just READ access. You can implement your APIs to enforce any scope or combination of scopes you wish.

#: make sure all credentials and configs are set up correctly in the os.environ variables.
os.environ["TENANT_ID"] = tenant_id = '9f37a392-f0ae-4280-9796-f1864a10effc'
os.environ["OAUTH_TOKEN_URL"] = oauth_token_url = f'https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token'
os.environ["CLIENT_ID"] = client_id = '610ad44b-74fc-4f7e-b385-cc8c93a8423b' # web app "dfci-demo-app/fernet-w-api" (slot)
os.environ["CLIENT_SECRET"] = client_secret # TODO: read this from Databricks secrets
os.environ["API_SCOPE"] = api_scope = "api://610ad44b-74fc-4f7e-b385-cc8c93a8423b/.default"
# APIM_URL already includes the API name and version; later we only need to append the action/endpoint, such as /hello or /api/hello
os.environ["APIM_URL"] = apim_url = 'https://dfci-api-management.azure-api.net/api2/v1' # no trailing '/', so that appending an endpoint does not produce a double slash in the URL

## Define the pyfunc custom PythonModel

In [0]:
import os
import requests
import mlflow
# import openai
import pandas as pd
from mlflow.pyfunc import PythonModel

# While both methods are used for initialization, __init__ is the standard constructor for Python classes, and load_context is specific to mlflow.pyfunc.PythonModel for loading model-related resources when the model is loaded from storage.

class AzureAPIMModel(PythonModel):
    def __init__(self):
        self.oauth_token_url = os.environ["OAUTH_TOKEN_URL"]
        self.tenant_id = os.environ["TENANT_ID"]
        self.client_id = os.environ["CLIENT_ID"]
        self.client_secret = os.environ["CLIENT_SECRET"]
        self.api_scope = os.environ["API_SCOPE"]
        self.apim_url = os.environ["APIM_URL"]
    
    def load_context(self, context):       
        pass
        #: if needed, uncomment below
        # # Load artifacts or perform initialization tasks
        # self.model = context.artifacts["my_model"]
        # self.api_key = context.artifacts["api_key"]

    def get_oauth_token(self):
        payload = {
            'grant_type': 'client_credentials',
            'client_id': self.client_id,
            'client_secret': self.client_secret,
            'scope': self.api_scope
        }
        response = requests.post(self.oauth_token_url, data=payload)
        response.raise_for_status()
        return response.json()['access_token']

    def _call_apim(self, method: str = 'get', endpoint: str = '/', headers: dict = None, data: dict = None):
        """
        This method calls the Azure API Management (APIM) endpoint with the specified HTTP method, endpoint, headers, and data.

        Args:
            method (str): The HTTP method to use for the request ('get' or 'post').
            endpoint (str): The endpoint path appended to APIM_URL, e.g. '/api/hello'. Defaults to '/'.
            headers (dict): Additional headers to include in the request.
            data (dict): The data payload to send with a POST request. Defaults to {'req': 'example query'}.

        Returns:
            str: The response content from the APIM call.

        Raises:
            ValueError: If an unsupported HTTP method is provided.
            requests.exceptions.HTTPError: If the HTTP request returned an unsuccessful status code.
        """
        method = method.lower()
        if method not in ('get', 'post'):
            raise ValueError("Method value must be one of ('get', 'post')")
        token = self.get_oauth_token()
        headers = dict(headers or {}) # copy the user-specified headers so the caller's dict is not mutated
        headers['Authorization'] = f'Bearer {token}'
        url = f"{self.apim_url}{endpoint}"
        if method == 'get':
            response = requests.get(url, headers=headers)
        else:
            # this depends on how your Flask app handles the request, e.g., req = request.form.get('req'), then you have to use application/x-www-form-urlencoded.
            headers['Content-Type'] = 'application/x-www-form-urlencoded'
            data = data if data is not None else {'req': 'example query'}
            print(url)
            response = requests.post(url, headers=headers, data=data)
            # response = requests.post(url, headers=headers, json=data) # use this for the 'application/json' format

        response.raise_for_status()
        return response.content.decode('utf-8')

    def predict(self, context: dict, model_input: pd.Series[str] = pd.Series(['Please analyze NVIDIA stock'])) -> pd.Series:
    # def predict(self, context, model_input = "Please analyze NVIDIA stock"):

        responses = [self._call_apim(method = 'post', endpoint = '/api/hello', data = {'req': model_input[i]}) for i in range(len(model_input)) ]

        return pd.Series(responses)
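One consequence of the `__init__` versus `load_context` distinction noted in the class above: when a model instance is passed as `python_model`, MLflow by default serializes that instance, and deserialization restores the saved attributes without running `__init__` again. Values read from `os.environ` in `__init__` are therefore the ones present at logging time, and they travel with the model artifact (this includes `CLIENT_SECRET`). The sketch below illustrates the effect with plain Python classes; the class names, the `restore_like_pickle` helper, and the `DEMO_APIM_URL` variable are hypothetical, and the helper only mimics what default unpickling does rather than calling MLflow.

```python
import copy
import os

class InitConfigModel:
    """Reads configuration in __init__, like AzureAPIMModel above."""
    def __init__(self):
        self.apim_url = os.environ.get("DEMO_APIM_URL", "unset")

class LoadContextConfigModel:
    """Reads configuration in load_context, i.e. when the model is loaded."""
    def load_context(self, context=None):
        self.apim_url = os.environ.get("DEMO_APIM_URL", "unset")

def restore_like_pickle(obj):
    """Rebuild an object the way default unpickling does: no __init__ call."""
    clone = obj.__class__.__new__(obj.__class__)
    clone.__dict__.update(copy.deepcopy(obj.__dict__))
    return clone

# "Logging time": both models are created and saved
os.environ["DEMO_APIM_URL"] = "https://logging-time.example"
saved_init = InitConfigModel()
saved_lazy = LoadContextConfigModel()

# "Serving time": the environment differs and the models are restored
os.environ["DEMO_APIM_URL"] = "https://serving-time.example"
loaded_init = restore_like_pickle(saved_init)
loaded_lazy = restore_like_pickle(saved_lazy)
loaded_lazy.load_context()

print(loaded_init.apim_url)  # https://logging-time.example
print(loaded_lazy.apim_url)  # https://serving-time.example
```

If the serving environment should supply its own configuration or secrets, reading them in `load_context` is the safer choice.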

### Dry-test the class: instantiate it and run inference without setting up model serving.

In [0]:
#: assume all os.environ is set correctly.

# Instantiate the AzureAPIMModel class
model = AzureAPIMModel()

# Example input for prediction
model_input = pd.Series(['Please analyze BABA, Could you also analyze MAMA?', 'I would like to learn more about SON'])

# Call the predict method
predictions = model.predict(context = None, model_input = model_input)

# Display the predictions
display(predictions)

In [0]:
displayHTML(predictions.iloc[0])  # displayHTML expects an HTML string, so render the first response

## Now it is time to log your custom model to the UC catalog with MLflow, so that later you can serve it from a model serving endpoint

1. The model name should follow the hierarchy format **"catalog.schema.model_name"**
2. You will need a Databricks PAT or SP credentials for the operations below.
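The three-level naming rule can be checked before logging, so that a malformed name fails early instead of at registration time. A minimal sketch, assuming a hypothetical helper `split_uc_model_name` that only checks the shape of the name and does not contact the workspace:

```python
def split_uc_model_name(full_name: str):
    """Split 'catalog.schema.model_name' into its three parts, or raise."""
    parts = full_name.split(".")
    if len(parts) != 3 or not all(part.strip() for part in parts):
        raise ValueError(f"Expected 'catalog.schema.model_name', got '{full_name}'")
    return tuple(parts)

catalog_name, schema_name, short_name = split_uc_model_name("yyang.dfci_demo.DFCI_AzureAPIMModel")
print(catalog_name, schema_name, short_name)  # yyang dfci_demo DFCI_AzureAPIMModel
```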

In [0]:
os.environ['DATABRICKS_TOKEN'] = dbutils.secrets.get(scope="yyang_secret_scope", key="pat")

In [0]:
from mlflow.models import infer_signature
signature = infer_signature(model_input, predictions)

In [0]:
signature

In [0]:
with mlflow.start_run():
    mlflow.pyfunc.log_model(
        artifact_path="azure_apim_model",
        signature = signature,
        input_example=model_input[0],
        python_model=AzureAPIMModel(),
        registered_model_name="yyang.dfci_demo.DFCI_AzureAPIMModel"
    )

### Test loading the model directly from the UC catalog and run batch inference (without model serving)

In [0]:
model_input = "Please analyze TSM stock"

In [0]:
import mlflow.pyfunc

# Load the model
model_name = "yyang.dfci_demo.DFCI_AzureAPIMModel"
model_version = 9
model = mlflow.pyfunc.load_model(model_uri=f"models:/{model_name}/{model_version}")

In [0]:
# Example input for prediction
example_input = model_input  # Assuming model_input is defined in previous cells

# Make predictions
predictions = model.predict(example_input)

# Display predictions
display(predictions)

In [0]:
displayHTML(predictions)

## Create your Model Serving Endpoint programmatically
Alternatively, you can create it using the UI.

In [0]:
import mlflow.deployments

client = mlflow.deployments.get_deploy_client("databricks")
client.create_endpoint(
    name="azure-apim-serving-endpoint",
    config={
        "served_models": [{
            "name": "DFCI_AzureAPIMModel",
            "model_name": "yyang.dfci_demo.DFCI_AzureAPIMModel",
            "model_version": "1",
            "workload_size": "Small",
            "scale_to_zero_enabled": True
        }]
    }
)

In [0]:
import time
import requests

# Define the endpoint status URL
status_url = "https://adb-984752964297111.11.azuredatabricks.net/api/2.0/serving-endpoints/azure-apim-serving-endpoint"


# Define the headers
headers = {
    "Authorization": f"Bearer {os.environ['DATABRICKS_TOKEN']}",
    "Content-Type": "application/json"
}

# Function to check the endpoint status
def check_status():
    status_response = requests.get(status_url, headers=headers)
    status_response.raise_for_status()
    return status_response.json()

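# Loop to check the status periodically
while True:
    status = check_status()
    state = status.get("state", {})
    ready = state.get("ready", "UNKNOWN")
    config_update = state.get("config_update", "UNKNOWN")

    if ready == "READY":
        print("The endpoint creation has succeeded.")
        break
    if config_update == "UPDATE_FAILED":
        # Stop polling instead of looping forever on a failed deployment
        raise RuntimeError(f"The endpoint creation has failed: {status}")
    print(f"Current status: ready={ready}, config_update={config_update}")

    time.sleep(30)  # Wait for 30 seconds before checking again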

In [0]:
import mlflow.deployments
client = mlflow.deployments.get_deploy_client("databricks")

client.update_endpoint(
    endpoint="azure-apim-serving-endpoint",
    config={
        "served_models": [{
            "name": "DFCI_AzureAPIMModel",
            "model_name": "yyang.dfci_demo.DFCI_AzureAPIMModel",
            "model_version": "9",
            "workload_size": "Small",
            "scale_to_zero_enabled": True
        }]
    }
)

> Make sure the model serving endpoint is ready before moving on to the next steps.
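To wait for readiness without risking an endless loop, the polling can be wrapped in a function with a timeout. This is a minimal sketch under stated assumptions: `wait_until_ready` is a hypothetical helper, the status function is injected so it can be the `check_status` defined earlier, and the state values `"READY"` and `"UPDATE_FAILED"` reflect my understanding of the serving endpoints API response and should be checked against your workspace.

```python
import time

def wait_until_ready(check_status, timeout_s=1800, interval_s=30, sleep=time.sleep):
    """Poll check_status() until the endpoint is ready, fails, or times out."""
    deadline = time.monotonic() + timeout_s
    while True:
        state = check_status().get("state", {})
        if state.get("ready") == "READY":
            return True
        if state.get("config_update") == "UPDATE_FAILED":
            raise RuntimeError(f"Endpoint update failed: {state}")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Endpoint not ready after {timeout_s} seconds: {state}")
        sleep(interval_s)
```

In the notebook this could be called as `wait_until_ready(check_status)`; the `sleep` parameter exists only so the loop can be exercised quickly with a stub.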

# 4. Query your model hosted on the Model Serving Endpoint for Inference

Model Serving offers a unified REST API and MLflow Deployment API for CRUD and querying tasks.

There are multiple ways to query the endpoint, including:
1. Python script
2. curl command
3. SQL `ai_query` function
4. UI

Since this is a Python notebook, we show option 1 (Python script) in two syntaxes. Feel free to use either one.
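Whichever client is used, the request body is JSON in one of the formats the endpoint accepts, such as `inputs` or `dataframe_split`. The sketch below builds both bodies for a list of prompts without sending anything. The helper names are hypothetical, and the column name `"0"` is an assumption; the column must match whatever the logged model signature expects.

```python
import json

def build_inputs_payload(prompts):
    """Body for the 'inputs' format: a plain list of inputs."""
    return {"inputs": list(prompts)}

def build_dataframe_split_payload(prompts, column="0"):
    """Body for the 'dataframe_split' format: one column, one row per prompt."""
    return {"dataframe_split": {"columns": [column], "data": [[p] for p in prompts]}}

prompts = ["Please analyze TSM stock"]
print(json.dumps(build_inputs_payload(prompts)))
print(json.dumps(build_dataframe_split_payload(prompts)))
```

The same JSON strings can be pasted into a curl command or the endpoint's query UI, which makes it easier to compare the four query methods against an identical payload.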

In [0]:
os.environ['DATABRICKS_TOKEN'] = dbutils.secrets.get(scope="yyang_secret_scope", key="pat")

## Syntax A

In [0]:
import os
import requests
import numpy as np
import pandas as pd
import json

def create_tf_serving_json(data):
    return {'inputs': {name: data[name].tolist() for name in data.keys()} if isinstance(data, dict) else data.tolist()}

def score_model(dataset):
    url = 'https://adb-984752964297111.11.azuredatabricks.net/serving-endpoints/azure-apim-serving-endpoint/invocations'
    headers = {'Authorization': f'Bearer {os.environ.get("DATABRICKS_TOKEN")}', 'Content-Type': 'application/json'}
    ds_dict = {'dataframe_split': dataset.to_dict(orient='split')} if isinstance(dataset, pd.DataFrame) else create_tf_serving_json(dataset)
    data_json = json.dumps(ds_dict, allow_nan=True)
    response = requests.request(method='POST', headers=headers, url=url, data=data_json)
    if response.status_code != 200:
        raise Exception(f'Request failed with status {response.status_code}, {response.text}')
    return response.json()

In [0]:
result = score_model( pd.Series(['Please analyze TSM stock']))

display(result)

In [0]:
displayHTML(result.get('predictions')[0]['0'])

## Syntax B

In [0]:
import os
import requests
import json

def score_model(data):
    url = 'https://adb-984752964297111.11.azuredatabricks.net/serving-endpoints/azure-apim-serving-endpoint/invocations'
    headers = {'Authorization': f'Bearer {os.environ.get("DATABRICKS_TOKEN")}', 'Content-Type': 'application/json'}
    data_json = json.dumps({"inputs": [data]})
    response = requests.request(method='POST', headers=headers, url=url, data=data_json)
    if response.status_code != 200:
        raise Exception(f'Request failed with status {response.status_code}, {response.text}')
    return response.json()

In [0]:
result = score_model('Please analyze Netflix stock')
display(result)

In [0]:
displayHTML(result.get('predictions')[0]['0'])