Before starting the deployment, make sure the AzureML kernel is selected.

In [None]:
# Check for package
!pip show azure-ai-ml

In [None]:
# Install the package if not available
!pip install azure-ai-ml

# User Identity verification

In [None]:
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
from azure.ai.ml import MLClient
# Get the registered model
from azure.ai.ml.entities import (
    ManagedOnlineEndpoint,
    ManagedOnlineDeployment,
    Environment,
    CodeConfiguration,
    Model
)
try:
    credential = DefaultAzureCredential()
    # Request a token up front so a broken credential chain fails here
    # instead of on the first workspace call.
    credential.get_token("https://management.azure.com/.default")
except Exception as ex:
    # DefaultAzureCredential could not authenticate: report why, then fall back
    # to an interactive browser sign-in.
    print(f"DefaultAzureCredential failed ({ex}); falling back to InteractiveBrowserCredential")
    credential = InteractiveBrowserCredential()

## Initialize ML Client

In [None]:

# Get a handle to the workspace.
# from_config builds the client from a workspace config file, authenticating
# with the credential created in the identity-verification cell.
ml_client = MLClient.from_config(credential=credential)

Found the config file in: /config.json


## Initialize an endpoint

In [None]:
# Endpoint name; reused by the deployment and traffic cells further down.
endpoint_name = "gte-finance-endpoint"
# Define the online endpoint locally with key-based authentication.
# Nothing is created in the workspace until begin_create_or_update is called.
endpoint = ManagedOnlineEndpoint(
    name=endpoint_name,
    description="Endpoint for GTE Finance model",
    auth_mode="key"
)


## Register the embedding model with ML Workspace

In [None]:
# Describe the local model folder as a custom model asset.
model = Model(
    name="gte-finance-model",
    version="1",
    description="FinanceRAG embedding model",
    path="./model/gte-finance-model/",
    type="custom_model"
)
# Register the asset and keep the object returned by the workspace: only the
# registered model carries the `id` that the deployment cell reads via `model.id`
# (a locally constructed Model has no id yet).
model = ml_client.models.create_or_update(model)
# Display the registered model, as the original cell did.
model

# Create environment from base image

In [None]:
%%writefile model/enviornment.yml
# Conda environment for the embedding inference container.
# (The file name keeps its existing spelling because the Environment cell loads it by this path.)
name: embedding_inference_env
channels:
  - conda-forge
  - pytorch
  - defaults
dependencies:
  - python=3.10
  - pip=23.1.2
  - pip:
    - sentence-transformers>=2.2.2
    - torch>=2.0.0
    - transformers>=4.30.0
    - peft>=0.4.0
    - numpy>=1.24.0
    - tqdm>=4.65.0
    - scikit-learn>=1.2.2
    - joblib
    - azureml-inference-server-http
    - inference-schema


In [None]:
# Create environment
# Defines the inference environment from the conda spec in model/enviornment.yml
# and the base image named below.
# NOTE(review): the image is referenced by its `:latest` tag, so a rebuild may pick up
# a different base image than the one this notebook was last run with.
environment = Environment(
    name="gte-finance-env",
    description="Environment for GTE Finance model",
    conda_file="model/enviornment.yml",
    image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04:latest"
)

Retrieved model 'gte-finance-model' version '1'


# Create Endpoint

In [None]:
# Create or update the endpoint in the workspace.
# begin_create_or_update starts the operation; .result() waits for it to finish
# before the confirmation message is printed.
ml_client.online_endpoints.begin_create_or_update(endpoint).result()
print(f"Endpoint '{endpoint_name}' created or updated successfully")

Endpoint 'gte-finance-endpoint' created or updated successfully


# Create score.py file for model inference

In [None]:
%%writefile model/score.py
import os
import json
import logging
import numpy as np
from sentence_transformers import SentenceTransformer

# Declare globals at the module level.
# Both are assigned by init() and read by run():
#   model  - the loaded SentenceTransformer instance (None until init() succeeds)
#   config - encoding settings passed to generate_embeddings()
model = None
config = {}

def _max_seq_length_from_env(default=2000):
    """Return MAX_SEQUENCE_LENGTH from the environment as a positive int, else `default`."""
    raw_value = os.getenv("MAX_SEQUENCE_LENGTH", "")
    if not raw_value:
        return default
    try:
        value = int(raw_value)
    except ValueError:
        logging.warning(f"Ignoring non-integer MAX_SEQUENCE_LENGTH={raw_value!r}; using {default}")
        return default
    if value <= 0:
        logging.warning(f"Ignoring non-positive MAX_SEQUENCE_LENGTH={raw_value!r}; using {default}")
        return default
    return value


def _find_model_dirs(directory, max_depth=5, current_depth=0):
    """
    Collect directories at or below `directory` that look like a model folder.

    A directory qualifies when it contains config.json,
    config_sentence_transformers.json or modules.json. The search descends at
    most `max_depth` levels; a directory is listed before its subdirectories.
    Unreadable directories are logged and skipped.
    """
    if current_depth > max_depth or not os.path.exists(directory):
        return []

    marker_files = ("config.json", "config_sentence_transformers.json", "modules.json")
    potential_dirs = []

    # Check if this directory could be a model directory
    if any(os.path.exists(os.path.join(directory, name)) for name in marker_files):
        potential_dirs.append(directory)

    # Recursively check subdirectories (best effort: skip what cannot be listed)
    try:
        entries = os.listdir(directory)
    except OSError as e:
        logging.warning(f"Could not list directory {directory}: {e}")
        return potential_dirs

    for item in entries:
        item_path = os.path.join(directory, item)
        if os.path.isdir(item_path):
            potential_dirs.extend(_find_model_dirs(item_path, max_depth, current_depth + 1))

    return potential_dirs


def init():
    """
    Initialize the model when the container starts.
    This function is called once when the service is deployed.

    Populates the module-level `config` and `model`. The model is loaded from the
    first source that works, in this order:
      1. the Hugging Face hub repository "Yaksh170802/gte-finance-model",
      2. any model-looking directory found under AZUREML_MODEL_DIR
         (or the hard-coded fallback path when that variable is unset),
      3. the base model directory itself, with trust_remote_code enabled.

    The maximum sequence length is read from the MAX_SEQUENCE_LENGTH environment
    variable and defaults to 2000 when it is unset or invalid.

    Raises:
        Exception: whatever the final load attempt raises, after logging it.
    """
    global model, config

    logging.info("Initializing GTE Finance model")

    # Centralized configuration
    config = {
        "embedding_batch_size": 32,
        "show_progress_bar": False,
        "normalize_embeddings": True,
        "max_seq_length": _max_seq_length_from_env(2000)
    }

    try:
        # Get base path from environment variable
        base_model_dir = os.getenv("AZUREML_MODEL_DIR", "")

        # If the environment variable is not set, use fallback path
        if not base_model_dir:
            base_model_dir = "/var/azureml-app/azureml-models/gte-finance-model/1"

        # Find all potential model directories
        potential_model_dirs = _find_model_dirs(base_model_dir)

        model = None

        # The Hugging Face hub is attempted first; local copies are only used if it fails.
        # NOTE(review): this means the registered model files are not the primary
        # source - confirm that this order is intended.
        try:
            model = SentenceTransformer("Yaksh170802/gte-finance-model",trust_remote_code=True)
        except Exception as e:
            logging.warning(f"Could not load model from Hugging Face hub: {e}")

        # If Hugging Face loading failed, try local directories
        if model is None:
            for model_dir in potential_model_dirs:
                try:
                    model = SentenceTransformer(model_dir)
                    break
                except Exception as e:
                    logging.warning(f"Could not load model from {model_dir}: {e}")

        # Final attempt - try loading directly from base directory with trust_remote_code.
        # A failure here propagates to the outer handler and aborts initialization.
        if model is None:
            model = SentenceTransformer(base_model_dir, trust_remote_code=True)

        # Set max_seq_length after initialization
        if config.get("max_seq_length"):
            model.max_seq_length = config.get("max_seq_length")

        logging.info(f"Model max sequence length: {model.max_seq_length}")
        logging.info("Model loaded successfully ✅")

    except Exception as e:
        logging.error(f"Error in model initialization: {str(e)}")
        raise

def generate_embeddings(texts, model_instance, model_config):
    """
    Generate embeddings for a list of input texts.

    Args:
        texts (list of str): The texts to embed.
        model_instance (SentenceTransformer): The loaded model instance.
        model_config (dict): Encoding settings. Recognized keys and their defaults:
            "embedding_batch_size" (32), "show_progress_bar" (False),
            "normalize_embeddings" (True).

    Returns:
        numpy.ndarray: The generated embeddings, as returned by `encode`.

    Raises:
        RuntimeError: If `model_instance` is None (init() has not run).
    """
    # Compare against None explicitly: a truthiness test would consult the
    # object's __len__/__bool__ and could reject a perfectly valid model.
    if model_instance is None:
        raise RuntimeError("Model has not been initialized. Call init() first.")

    # Generate embeddings
    embeddings = model_instance.encode(
        texts,
        batch_size=model_config.get("embedding_batch_size", 32),
        show_progress_bar=model_config.get("show_progress_bar", False),
        normalize_embeddings=model_config.get("normalize_embeddings", True)
    )

    return embeddings

def run(raw_data):
    """
    Run a prediction on the input data.
    This function is called for each scoring request.

    Args:
        raw_data: JSON text of the form {"texts": ["...", ...]}.

    Returns:
        str: A JSON string. On success it holds "embeddings" (list of vectors),
        "dimensions" (vector length) and "count" (number of input texts).
        On failure it holds a single "error" message; no exception is raised.
    """
    invalid_input_response = json.dumps({
        "error": "Input must be a JSON object with a 'texts' key containing a non-empty list of strings."
    })

    try:
        logging.info("Received input data for scoring")
        data = json.loads(raw_data)

        # Valid JSON that is not an object (e.g. a bare list) has no "texts" key.
        if not isinstance(data, dict):
            return invalid_input_response

        texts = data.get("texts", [])
        if not isinstance(texts, list) or not texts:
            return invalid_input_response

        # Enforce the documented contract: every item must be a string.
        if not all(isinstance(text, str) for text in texts):
            return invalid_input_response

        # Generate embeddings using the loaded model and config
        embeddings = generate_embeddings(texts, model, config)

        # Prepare the successful response
        response = {
            "embeddings": embeddings.tolist(),
            "dimensions": embeddings.shape[1],
            "count": len(texts)
        }

        return json.dumps(response)

    except json.JSONDecodeError:
        logging.error("Failed to decode JSON from input data.")
        return json.dumps({"error": "Invalid JSON format received."})

    except Exception as e:
        logging.error(f"Error during prediction: {str(e)}", exc_info=True)
        return json.dumps({"error": str(e)})

# Initialize Managed Endpoint Deployment

In [None]:
# Name of the deployment and the VM size it runs on.
deployment_name =  "gte-finance-deployment"
instance_type = "Standard_E2s_v3"
# Define the deployment locally; nothing is sent to the workspace in this cell.
# NOTE(review): `model.id` must come from the registered model asset - confirm that
# `model` holds the object returned by the workspace at this point.
deployment = ManagedOnlineDeployment(
    name=deployment_name,
    endpoint_name=endpoint_name,
    model=model.id,
    environment=environment,
    code_configuration=CodeConfiguration(
        code="./model",
        scoring_script="score.py"  # Must match the script file name inside the `code` folder (./model)
    ),
    instance_type=instance_type,
    instance_count=1,
    # Environment variables set for the deployment
    environment_variables={
        "MAX_SEQUENCE_LENGTH": "2000",
        "SENTENCE_TRANSFORMERS_HOME": "/var/azureml-app/sentence_transformers_cache"
    }
)

# Create Deployment

In [None]:
# Create or update the deployment and wait for the operation to finish
ml_client.online_deployments.begin_create_or_update(deployment).result()
print(f"Deployment '{deployment_name}' created or updated successfully")

# Fetch the endpoint as it currently exists in the workspace, so the traffic
# update below keeps its settings (description, auth mode) instead of replacing
# them with those of a bare, freshly constructed endpoint object.
endpoint = ml_client.online_endpoints.get(name=endpoint_name)

# Route all traffic to the new deployment
endpoint.traffic = {deployment_name: 100}
ml_client.online_endpoints.begin_create_or_update(endpoint).result()

print(f"Traffic allocated to deployment '{deployment_name}'")
print(f"Endpoint URL: {endpoint.scoring_uri}")

Check: endpoint gte-finance-endpoint exists
[32mUploading depoly (28.93 MBs): 100%|██████████| 28931797/28931797 [00:00<00:00, 73226274.77it/s]
[39m



.........................................................................................................................Deployment 'gte-finance-deployment' created or updated successfully
Traffic allocated to deployment 'gte-finance-deployment'
Endpoint URL: https://gte-finance-endpoint.eastus.inference.ml.azure.com/score


# Test Deployment

In [None]:
import urllib.request
import json


# Request payload: a batch of 32 identical sentences to embed
data = {"texts": ["Hi I am Yaksh Shah"] * 32}

body = str.encode(json.dumps(data))

# Replace this with the scoring URL of the endpoint
url = 'MODEL_ENDPOINT_URL'
# Replace this with the primary/secondary key, AMLToken, or Microsoft Entra ID token for the endpoint
api_key = 'YOUR_API_KEY'
if not api_key:
    raise Exception("A key should be provided to invoke the endpoint")


headers = {'Content-Type':'application/json', 'Accept': 'application/json', 'Authorization':('Bearer '+ api_key)}

req = urllib.request.Request(url, body, headers)

try:
    # The context manager closes the HTTP response even if reading it fails
    with urllib.request.urlopen(req) as response:
        result = response.read()
    print(result)
except urllib.error.HTTPError as error:
    print("The request failed with status code: " + str(error.code))

    # Print the headers - they include the request ID and the timestamp, which are useful for debugging the failure
    print(error.info())
    print(error.read().decode("utf8", 'ignore'))


b'"{\\"embeddings\\": [[-0.009951372630894184, 0.0310331080108881, 0.003037626389414072, -0.026683658361434937, 0.012104762718081474, -0.04607776179909706, 0.044618986546993256, 0.0223553329706192, 0.07123686373233795, 0.028216103091835976, -0.06299486011266708, -0.0028544606175273657, -0.056256189942359924, 0.033583685755729675, -0.004442782606929541, 0.07584337145090103, 0.08083660900592804, 0.0771072506904602, 0.05174950882792473, 0.013348898850381374, 0.06094573810696602, 0.07138437777757645, -0.04005582258105278, 0.1143268570303917, -0.042824599891901016, 0.025043420493602753, 0.07684802263975143, -0.051938559859991074, -0.12196481972932816, 0.017458882182836533, -0.10475583374500275, 0.019344551488757133, 0.014739216305315495, -0.007026264909654856, 0.034627821296453476, -0.044741395860910416, -0.016833104193210602, 0.06564822047948837, -0.061839792877435684, 0.016921423375606537, -0.0674680694937706, -0.007358831353485584, 0.012081935070455074, -0.013716674409806728, -0.01169577