In [None]:
from azure.ai.ml import MLClient
from azure.identity import (
    DefaultAzureCredential,
    InteractiveBrowserCredential,
)
import time

from azure.ai.ml.dsl import pipeline
from azure.ai.ml import Input

import ast

In [None]:
def fetch_data(model_name):
    """Download the labelled CSV and write stratified train/val/test JSONL files.

    The CSV at ``url`` is fetched over HTTP and split into train, validation
    and test partitions, stratified on the ``label_string`` column with a
    fixed random seed. The partitions are written as JSON Lines to
    ``train.jsonl``, ``val.jsonl`` and ``test.jsonl`` in the current working
    directory, overwriting any existing files.

    Args:
        model_name: Name of the model the data is prepared for.
            NOTE(review): the body does not use this argument - confirm
            whether the download URL is meant to depend on it.

    Raises:
        requests.exceptions.RequestException: If the request cannot be made,
            times out, or the server answers with an HTTP error status.
    """
    import requests
    import pandas as pd
    from io import StringIO
    from sklearn.model_selection import train_test_split

    seed = 613

    # NOTE(review): the URL is blank here, so requests.get() will reject it
    # until a real dataset location is filled in.
    url = f""

    # Without a timeout a stalled connection would block the notebook forever.
    response = requests.get(url, timeout=60)
    response.raise_for_status()

    df = pd.read_csv(StringIO(response.text))

    test_size, val_size = .2, .2

    # First split: keep the training share, hold out val + test together.
    train_df, temp_df = train_test_split(
        df,
        test_size=test_size + val_size,
        random_state=seed,
        stratify=df['label_string'],
    )

    # Second split: divide the hold-out so that validation receives
    # val_size / (val_size + test_size) of it and test receives the remainder.
    val_size_adj = val_size / (val_size + test_size)
    val_df, test_df = train_test_split(
        temp_df,
        test_size=1 - val_size_adj,
        random_state=seed,
        stratify=temp_df['label_string'],
    )

    train_df.to_json("train.jsonl", orient='records', lines=True)
    val_df.to_json("val.jsonl", orient='records', lines=True)
    test_df.to_json("test.jsonl", orient='records', lines=True)


In [None]:
# Authenticate: try the default credential chain first and fall back to an
# interactive browser login if a management-plane token cannot be obtained.
try:
    credential = DefaultAzureCredential()
    # Request a token eagerly so a broken credential fails here, not on first use.
    credential.get_token("https://management.azure.com/.default")
except Exception:
    credential = InteractiveBrowserCredential()

# Workspace coordinates, used only by the explicit MLClient fallback below.
# NOTE(review): these are blank in the notebook and presumably need to be
# filled in for that fallback to reach a workspace.
subscription_id = ''
resource_group_name = ''
workspace_name = ''


# Prefer the workspace that MLClient.from_config can resolve on its own;
# otherwise build the client from the explicit values above.
# `except Exception` (rather than a bare `except`) lets KeyboardInterrupt and
# SystemExit propagate instead of being silently swallowed.
try:
    workspace_ml_client = MLClient.from_config(credential=credential)
except Exception:
    workspace_ml_client = MLClient(
        credential,
        subscription_id=subscription_id,
        resource_group_name=resource_group_name,
        workspace_name=workspace_name,
    )

# the models, fine tuning pipelines and environments are available in the AzureML system registry, "azureml"
registry_ml_client = MLClient(credential, registry_name="azureml")

In [None]:
# Define the text-classification fine-tuning pipeline job.
#
# The function takes no parameters. It reads these module-level names when it
# is called, so they must all be assigned first (they are set in later cells):
#   pipeline_component_func, foundation_model, compute_name, gpus_per_node,
#   training_parameters, optimization_parameters
#
# It returns a dict mapping "trained_model" to the component's
# mlflow_model_folder output.
#
# NOTE(review): this body runs under the @pipeline() DSL decorator, which may
# inspect the function (e.g. local variable names) - confirm before renaming
# locals or restructuring it.
@pipeline()
def create_pipeline():
    text_classification_pipeline = pipeline_component_func(
        # foundation model to fine-tune, referenced by the id of the model object fetched from the registry
        mlflow_model_path=foundation_model.id,
        # huggingface_id = 'bert-base-uncased', # to use a huggingface model, uncomment this line and comment the above line
        # every stage is given the same compute target
        compute_model_import=compute_name,
        compute_preprocess=compute_name,
        compute_finetune=compute_name,
        compute_model_evaluation=compute_name,
        # map the dataset splits to parameters; the three .jsonl files are the
        # ones fetch_data() writes to the current working directory
        train_file_path=Input(
            type="uri_file", path= "./train.jsonl"
        ),
        validation_file_path=Input(
            type="uri_file", path= "val.jsonl"
        ),
        test_file_path=Input(
            type="uri_file", path= "./test.jsonl"
        ),
        # NOTE(review): this config file is not created anywhere in the visible
        # cells - presumably it must already exist next to the notebook
        evaluation_config=Input(
            type="uri_file", path="./text-classification-config.json"
        ),
        # The following parameters map to the dataset fields
        # ("label_string" is the column fetch_data() stratifies on; "text" is
        # assumed to be the input text column - TODO confirm against the data)
        sentence1_key="text",
        label_key="label_string",
        # Training settings
        number_of_gpu_to_use_finetuning=gpus_per_node,  # set to the number of GPUs available in the compute
        # Both dicts are unpacked as keyword arguments; a key present in both
        # (or repeating one of the explicit keywords above) raises TypeError.
        **training_parameters,
        **optimization_parameters
    )
    return {
        # map the output of the fine tuning job to the output of pipeline job so that we can easily register the fine tuned model
        # registering the model is required to deploy the model to an online or batch endpoint
        "trained_model": text_classification_pipeline.outputs.mlflow_model_folder
    }

In [None]:
# Name of the workspace compute target that every pipeline stage runs on.
compute_name = "Standard_NC12s_v3"

# Look up the compute target, then find the SKU entry whose name matches its
# size (case-insensitively) to learn how many GPUs each node has.
compute = workspace_ml_client.compute.get(compute_name)
gpu_count_found = False
workspace_compute_sku_list = workspace_ml_client.compute.list_sizes()
available_sku_sizes = []
for compute_sku in workspace_compute_sku_list:
    # Every SKU name is recorded so the error below can list the alternatives.
    available_sku_sizes.append(compute_sku.name)
    if compute_sku.name.lower() == compute.size.lower():
        gpus_per_node = compute_sku.gpus
        gpu_count_found = True

# Report the GPU count, or fail loudly when no SKU matched the compute's size.
if gpu_count_found:
    print(f"Number of GPU's in compute {compute.size}: {gpus_per_node}")
else:
    # The trailing space after the first sentence keeps the two concatenated
    # f-strings from running together in the rendered message.
    raise ValueError(
        f"Number of GPU's in compute {compute.size} not found. Available skus are: {available_sku_sizes}. "
        f"This should not happen. Please check the selected compute cluster: {compute_name} and try again."
    )

In [None]:
# Models to fine-tune, looked up by name through registry_ml_client.
# One pipeline job is submitted per entry.
models = [
    "t5-small",
    "t5-large",
    "t5-base",
    "microsoft-deberta-xlarge",
    "microsoft-deberta-large-mnli",
    "microsoft-deberta-large",
    "microsoft-deberta-base-mnli",
    "microsoft-deberta-base",
]

# fetch the pipeline component
# The lookup does not depend on the model, so it is done once up front instead
# of once per loop iteration.
pipeline_component_func = registry_ml_client.components.get(
    name="text_classification_pipeline",
    label="latest"
)

# Training parameters - identical for every model, so built once.
training_parameters = dict(
    num_train_epochs=5,
    per_device_train_batch_size=4,
    per_device_eval_batch_size=4,
    learning_rate=2e-5,
    metric_for_best_model="loss",
    resume_from_checkpoint="true"
)
print(f"The following training parameters are enabled - {training_parameters}")

# Handle of every submitted job, keyed by model name, so none is lost when
# `pipeline_job` is reassigned on the next iteration.
submitted_jobs = {}

for model_name in models:

    # NOTE(review): fetch_data() does not currently use model_name, so this
    # downloads and rewrites the same split files on every iteration - confirm
    # whether per-model data is intended.
    fetch_data(model_name)

    experiment_name = "text-classification-" + model_name

    # create_pipeline() reads `foundation_model` and `optimization_parameters`
    # as module-level names, so they must be assigned before it is called.
    foundation_model = registry_ml_client.models.get(model_name, label="latest")

    # Optimization parameters - As these parameters are packaged with the model itself, lets retrieve those parameters
    if "model_specific_defaults" in foundation_model.tags:
        optimization_parameters = ast.literal_eval(
            foundation_model.tags["model_specific_defaults"]
        )  # convert string to python dict
    else:
        optimization_parameters = dict(
            apply_lora="true",
            apply_deepspeed="true",
            apply_ort="true"
        )
    print(f"The following optimizations are enabled - {optimization_parameters}")

    pipeline_object = create_pipeline()

    # don't use cached results from previous jobs
    pipeline_object.settings.force_rerun = True

    # set continue on step failure to False
    pipeline_object.settings.continue_on_step_failure = False

    # submit the pipeline job
    pipeline_job = workspace_ml_client.jobs.create_or_update(
        pipeline_object, experiment_name=experiment_name
    )
    submitted_jobs[model_name] = pipeline_job