Libraries required

azure-ai-ml

azure-identity

datasets

mlflow

azureml-mlflow

py7zr

ast (part of the Python standard library; no installation needed)

Whether you're running in an Azure notebook or a local one, we first need to connect to an Azure instance.

In [None]:
import ast
import time

from azure.ai.ml import MLClient
from azure.ai.ml.entities import AmlCompute
from azure.identity import (
    DefaultAzureCredential,
    InteractiveBrowserCredential,
)

# Try the default credential first. Requesting a management token up front
# makes an unusable credential fail here instead of on the first real API call.
try:
    credential = DefaultAzureCredential()
    credential.get_token("https://management.azure.com/.default")
except Exception:
    # The default credential could not produce a token: fall back to an
    # interactive browser sign-in.
    credential = InteractiveBrowserCredential()

# Build the workspace client from a local workspace configuration when one is
# available; otherwise use the explicit workspace coordinates below.
try:
    workspace_ml_client = MLClient.from_config(credential=credential)
except Exception:
    # `except Exception` rather than a bare `except:` so that KeyboardInterrupt
    # and SystemExit are not swallowed by the fallback.
    workspace_ml_client = MLClient(
        credential,
        subscription_id="<SUBSCRIPTION_ID>",
        resource_group_name="<RESOURCE_GROUP>",
        workspace_name="<WORKSPACE_NAME>",
    )

# the models, fine tuning pipelines and environments are available in the AzureML system registry, "azureml"

Everything that we will be working with is available in the AzureML registries.
Check the different registries to see which models are available. For example, if we want to work with or fine-tune a Llama model, we can find it in the Meta registry, azureml-meta.

The pipelines and environments are also available in the registry.

(insert image of ui maybe)



1. registry_name: This is the name of the model registry where the model is stored. In this case, it’s set to “azureml-meta”, which is a public registry that contains Llama 2 models.

2. model_name: This is the name of the model to be deployed. Here, it’s set to “Llama-2-70b”. There are other available models for text generation.

3. endpoint_name: This is the name of the endpoint where the model will be deployed. It’s set to the model name followed by “-test-ep”. You should replace this with your own endpoint name.

4. deployment_name: This is the name of the deployment. By default it’s set to “llama”, but you should replace this with your own deployment name. Note that it should be in lowercase only.

5. sku_name: This is the name of the SKU (stock keeping unit), which represents the instance type for the deployment. It’s set to “Standard_NC24s_v3”, but you should check the model list to find the most optimal SKU for your model.

6. content_severity_threshold: This is the severity level that will trigger the response to be blocked. It’s set to “2”. For more details, you can refer to the Azure AI content documentation.

7. uai_name: This is the name of the User-Assigned Identity (UAI) to be used for endpoint authentication. It’s currently empty, but defaults to “aacs-uai” in the prepare UAI notebook.

In [None]:
# One client per registry: the "azureml" system registry and "azureml-meta".
registry_ml_client, registry_ml_client_meta = (
    MLClient(credential, registry_name=registry)
    for registry in ("azureml", "azureml-meta")
)
experiment_name = "text-generation"

# Whole seconds since the epoch, as a string; handy as a suffix for names and
# versions that have to be unique.
timestamp = f"{int(time.time())}"

In [None]:

model_name = "Llama-2-70b"

# Fetch the version labelled "latest" of the chosen model through the
# "azureml-meta" registry client.
foundation_model = registry_ml_client_meta.models.get(model_name, label="latest")

# Report exactly which model/version the rest of the notebook will fine tune.
print(
    f"\n\nUsing model name: {foundation_model.name}, "
    f"version: {foundation_model.version}, "
    f"id: {foundation_model.id} for fine tuning"
)

In [None]:
# GPU cluster

# A100 sizes offered in addition to whatever the model's own tags list.
extra_compute_skus = ("Standard_NC48ads_A100_v4", "Standard_NC96ads_A100_v4")

if "computes_allow_list" in foundation_model.tags:
    # The tag value is a string holding a Python list literal; parse it safely.
    computes_allow_list = ast.literal_eval(
        foundation_model.tags["computes_allow_list"]
    )
    # Add the extra sizes only when missing so the printed list has no duplicates.
    for extra_sku in extra_compute_skus:
        if extra_sku not in computes_allow_list:
            computes_allow_list.append(extra_sku)
    print(f"Please create a compute from the above list - {computes_allow_list}")
else:
    computes_allow_list = None
    print("Computes allow list is not part of model tags")

In [None]:
# VM size used when a new cluster has to be created. If you have a specific
# compute size to work with, change it here (pick one from the allow list
# printed above).
compute_cluster_size = "Standard_NC96ads_A100_v4"

# Name of the GPU cluster to use. If a cluster with this name already exists in
# the workspace it is reused; otherwise a new one is created under this name.
compute_cluster = "gpu-cluster-large2"

try:
    compute = workspace_ml_client.compute.get(compute_cluster)
    print("The compute cluster already exists! Reusing it for the current run")
except Exception:
    print(
        f"Looks like the compute cluster doesn't exist. Creating a new one with compute size {compute_cluster_size}!"
    )
    # Tiers are tried in order; the first one that can be provisioned wins.
    creation_attempts = (
        ("Dedicated", "Attempt #1 - Trying to create a dedicated compute"),
        (
            "LowPriority",
            "Attempt #2 - Trying to create a low priority compute. Since this is a low priority compute, the job could get pre-empted before completion.",
        ),
    )
    creation_error = None
    for tier, attempt_message in creation_attempts:
        print(attempt_message)
        try:
            compute = AmlCompute(
                name=compute_cluster,
                size=compute_cluster_size,
                tier=tier,
                max_instances=2,  # For multi node training set this to an integer value more than 1
            )
            # Block until the create/update operation has finished.
            workspace_ml_client.compute.begin_create_or_update(compute).wait()
        except Exception as err:
            # Remember the failure and move on to the next tier, if any.
            creation_error = err
        else:
            break
    else:
        # Every tier failed: show the last error and stop the notebook here.
        print(creation_error)
        raise ValueError(
            f"WARNING! Compute size {compute_cluster_size} not available in workspace"
        ) from creation_error


# Wait for the compute to be created


# Determine the number of GPUs in a single node of the selected compute size.
# Setting this to less than the number of GPUs will result in underutilized GPUs, taking longer to train.
# Setting this to more than the number of GPUs will result in an error.
gpu_count_found = False
workspace_compute_sku_list = workspace_ml_client.compute.list_sizes()
selected_size = compute.size.lower()  # loop-invariant, so computed once
available_sku_sizes = []
for compute_sku in workspace_compute_sku_list:
    # Collect every size name so the error below can list the alternatives.
    available_sku_sizes.append(compute_sku.name)
    if compute_sku.name.lower() == selected_size:
        gpus_per_node = compute_sku.gpus
        gpu_count_found = True

# Without a match there is no GPU count to hand to the fine-tuning step.
if not gpu_count_found:
    raise ValueError(
        f"Number of GPU's in compute {compute.size} not found. Available skus are: {available_sku_sizes}. "
        f"This should not happen. Please check the selected compute cluster: {compute_cluster} and try again."
    )
print(f"Number of GPU's in compute {compute.size}: {gpus_per_node}")

Depending on the model being used it can either accept text or a tokenized dataset

In [None]:
# Pull the finetune data from SA

In [None]:
def get_preprocessed(df):
    """Build the prompt/target frame used for summarization fine tuning.

    Each value of the "dialogue" column is wrapped in a summarization prompt
    and stored in a "text" column; the "summary" column is carried over as is.

    Args:
        df: DataFrame with at least the columns "dialogue" and "summary".
            Any other columns (for example "id") are ignored.

    Returns:
        A new DataFrame with exactly the columns ["text", "summary"], sharing
        the index of ``df``. The input frame is left unmodified.

    Raises:
        KeyError: if "dialogue" or "summary" is missing from ``df``.
    """
    prompt = "Summarize this dialog:\n{}\n---\nSummary:\n"

    # `assign` works on a copy, so the caller's frame does not gain a "text" column.
    with_prompt = df.assign(text=df["dialogue"].map(prompt.format))
    return with_prompt[["text", "summary"]]

Depending on the GPU cluster and the model used, please make sure to change per_device_train_batch_size and per_device_eval_batch_size.

For example a 7b model with 4 A100 gpu's could have a maximum batch size of 8

In [None]:
# Training parameters passed through to the fine-tuning pipeline.
training_parameters = {
    "num_train_epochs": 3,
    "per_device_train_batch_size": 1,
    "per_device_eval_batch_size": 1,
    "learning_rate": 2e-5,
}
print(f"The following training parameters are enabled - {training_parameters}")

# Optimization parameters: when the model ships its own defaults in its tags
# (a string holding a Python dict literal), use those; otherwise use the
# fallback settings below.
optimization_parameters = (
    ast.literal_eval(foundation_model.tags["model_specific_defaults"])
    if "model_specific_defaults" in foundation_model.tags
    else {"apply_lora": "true", "apply_deepspeed": "true", "apply_ort": "true"}
)
print(f"The following optimizations are enabled - {optimization_parameters}")

In [None]:
from azure.ai.ml.dsl import pipeline
from azure.ai.ml.entities import CommandComponent, PipelineComponent, Job, Component
from azure.ai.ml import PyTorchDistribution, Input

# Fetch the version labelled "latest" of the "text_generation_pipeline"
# component through the "azureml" registry client.
pipeline_component_func = registry_ml_client.components.get(
    name="text_generation_pipeline", label="latest"
)


# Define the pipeline job: a single step that calls the registry component and
# exposes its MLflow model folder as the pipeline output "trained_model".
@pipeline()
def create_pipeline():
    text_generation_pipeline = pipeline_component_func(


        ####### Model source: use either a model from the registry or a model from Hugging Face.
        # NOTE(review): both options below are commented out, so as written no
        # model source is passed to the component - enable exactly one of them.

        # mlflow_model_path=foundation_model.id, # Uncomment this line if you want to use a model from the registry and comment the line below
        # huggingface_id = 'meta-llama/Llama-2-7b', # if you want to use a huggingface model, uncomment this line and comment the above line

        ########
        # Run every stage of the component on the same compute cluster.
        compute_model_import=compute_cluster,
        compute_preprocess=compute_cluster,
        compute_finetune=compute_cluster,
        compute_model_evaluation=compute_cluster,
        # Map the dataset splits to parameters (replace the placeholder paths).
        train_file_path=Input(
            type="uri_file", path="<path to train>"
        ),
        validation_file_path=Input(
            type="uri_file", path="<path to validation>"
        ),
        test_file_path=Input(type="uri_file", path="<path to test>"),
        evaluation_config=Input(type="uri_file", path="<path to evaluation_config>"),
        # Map the preprocessed data to parameters.
        # The following parameters map to the dataset fields.
        text_key="text",
        ground_truth_key="summary",
        # NOTE(review): presumably this should not exceed the cluster's
        # max_instances (2 in the compute cell) - confirm.
        num_nodes_finetune=2,
        # Training settings
        number_of_gpu_to_use_finetuning=gpus_per_node,  # set to the number of GPUs available in the compute
        # Both dicts are expanded into keyword arguments, so they must not share keys.
        **training_parameters,
        **optimization_parameters
    )
    return {
        # Map the output of the fine tuning job to the output of the pipeline job so that we can easily register the fine tuned model.
        # Registering the model is required to deploy the model to an online or batch endpoint.
        "trained_model": text_generation_pipeline.outputs.mlflow_model_folder
    }


pipeline_object = create_pipeline()

# Don't use cached results from previous jobs.
pipeline_object.settings.force_rerun = True

# Set continue on step failure to False.
pipeline_object.settings.continue_on_step_failure = False

# Set the mode of the PyTorch and MLflow model folder outputs to "mount".

pipeline_object.jobs["text_generation_pipeline"]["outputs"]["pytorch_model_folder"].mode = "mount"

pipeline_object.jobs["text_generation_pipeline"]["outputs"]["mlflow_model_folder"].mode = "mount"

In [None]:
# Submit the pipeline under the configured experiment, then stream the job,
# waiting for the pipeline job to complete.
job_operations = workspace_ml_client.jobs
pipeline_job = job_operations.create_or_update(
    pipeline_object,
    experiment_name=experiment_name,
)
job_operations.stream(pipeline_job.name)

Deploy the model (some question here)


In [None]:
 ## this part was copied from the documentation,not sure if it will work

# from azure.ai.ml.entities import (
#     OnlineRequestSettings,
#     CodeConfiguration,
#     ManagedOnlineDeployment,
#     ProbeSettings,
# )

# deployment = ManagedOnlineDeployment(
#     name=deployment_name,
#     endpoint_name=endpoint_name,
#     model=llama_model.id,
#     instance_type=sku_name,
#     instance_count=1,
#     environment = "llama2-environment:1",
#     environment_variables=deployment_env_vars,
#     request_settings=OnlineRequestSettings(request_timeout_ms=REQUEST_TIMEOUT_MS),
#     liveness_probe=ProbeSettings(
#         failure_threshold=30,
#         success_threshold=1,
#         period=100,
#         initial_delay=500,
#     ),
#     readiness_probe=ProbeSettings(
#         failure_threshold=30,
#         success_threshold=1,
#         period=100,
#         initial_delay=500,
#     ),
# )
# # Trigger the deployment creation
# try:
#     ml_client.begin_create_or_update(deployment).wait()
#     print("\n---Deployment created successfully---\n")
# except Exception as err:
#     raise RuntimeError(
#         f"Deployment creation failed. Detailed Response:\n{err}"
#     ) from err

In [None]:
# import os

# test_src_dir = "./llama-test"
# os.makedirs(test_src_dir, exist_ok=True)
# print(f"test script directory: {test_src_dir}")
# sample_data = os.path.join(test_src_dir, "sample-request.json")

# ## For text-generation models (without -chat suffix)
# ## Successful response

# import json

# with open(sample_data, "w") as f:
#     json.dump(
#         {
#             "input_data": {
#                 "input_string": [
#                     "Hello",
#                     "My name is John and I have a dog.",
#                 ],
#                 "parameters": {
#                     "temperature": 0.6,
#                     "top_p": 0.6,
#                     "max_new_tokens": 256,
#                     "do_sample": True,
#                 },
#             }
#         },
#         f,
#     )

# ml_client.online_endpoints.invoke(
#     endpoint_name=endpoint_name,
#     deployment_name=deployment_name,
#     request_file=sample_data,
# )