# End-to-End Text Classification Pipeline (Sentiment Analysis) with Azure ML v2 and Transformers 

### Using Sweep Jobs (HyperDrive) for Hyperparameter Tuning and a Managed Online Endpoint for Deployment

This notebook walks through training, tuning, registering, and deploying a sentiment analysis model with the Azure ML SDK v2.
It starts by connecting to the Azure ML workspace, then prepares the data by downloading it from a URL, parsing the JSON, and creating a Pandas DataFrame. The data is split into train, validation, and test sets, saved as CSV files, and passed to the jobs as `uri_file` inputs. The notebook then configures the environments and compute targets and builds a pipeline consisting of a sweep job for hyperparameter tuning (the SDK v2 counterpart of HyperDrive) followed by command jobs that register the best model and deploy it to a managed online endpoint.
The dataset used is Amazon product reviews in the Automotive category.

## Importing Libraries and Setting Up Workspace
In this section, we import the required libraries and connect to the Azure Machine Learning workspace by providing the subscription ID, resource group, and workspace name.

In [None]:
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential

# Enter details of your AML workspace
subscription_id = "<subscription_id>"
resource_group = "<resource_group>"
workspace = "<workspace>"

# get a handle to the workspace
ml_client = MLClient(
    DefaultAzureCredential(), subscription_id, resource_group, workspace
)

print(ml_client.subscription_id, ml_client.resource_group_name, ml_client.workspace_name, sep='\n')

## Preparing the Amazon Review Dataset
The code in this section downloads the Amazon review dataset, adds a binary sentiment column, and generates training, validation, and test sets. The resulting DataFrames are saved as CSV files and wrapped as Azure ML `Input` objects.

In [None]:
# to get larger datasets, visit: http://jmcauley.ucsd.edu/data/amazon/

!wget http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/reviews_Automotive_5.json.gz -P data/

In [None]:
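import gzip
import json

import pandas as pd

def parse(path):
  # Each line of the review file is one JSON object.
  # json.loads avoids running eval() on downloaded data.
  with gzip.open(path, 'rb') as g:
    for l in g:
      yield json.loads(l)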

def getDF(path):
  i = 0
  df = {}
  for d in parse(path):
    df[i] = d
    i += 1
  return pd.DataFrame.from_dict(df, orient='index')

pdf_main = getDF('data/reviews_Automotive_5.json.gz')
pdf_main.shape

In [None]:
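# Ratings of 4 and 5 are labeled positive (1); ratings of 1 and 2 are labeled negative (0).
# 3-star reviews keep a NaN label and are removed by dropna() before the split.
pdf_main.loc[pdf_main['overall'] >= 4, 'sentiment'] = 1
pdf_main.loc[pdf_main['overall'] < 3, 'sentiment'] = 0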

pdf_main.describe()
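
The labeling rule in the cell above can be restated in plain Python. This is only a sketch to make the thresholds explicit; `label_sentiment` is a hypothetical helper and is not used elsewhere in the notebook.

```python
from typing import Optional


def label_sentiment(rating: float) -> Optional[int]:
    """Map a star rating to a binary sentiment label; 3-star reviews get no label."""
    if rating >= 4:
        return 1   # positive
    if rating < 3:
        return 0   # negative
    return None    # neutral: left unlabeled, like the NaN rows in the DataFrame


ratings = [5.0, 4.0, 3.0, 2.0, 1.0]
labels = [label_sentiment(r) for r in ratings]
print(labels)  # [1, 1, None, 0, 0]
```

Because neutral reviews carry no label, they drop out of the data when rows with missing values are removed before splitting.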

In [None]:
from sklearn.model_selection import train_test_split

def generate_datasets(pdf_target_training, label = 'sentiment'):
    # 80% train, 20% held out; the split is stratified on the label
    X_train, X_test_val, y_train, y_test_val = train_test_split(pdf_target_training.drop(label, axis=1), pdf_target_training[label],
                                                        stratify=pdf_target_training[label],
                                                        shuffle=True,
                                                        test_size=0.20)

    # split the held-out 20% evenly into validation and test (10% each)
    X_val, X_test, y_val, y_test = train_test_split(X_test_val, y_test_val,
                                                        stratify=y_test_val,
                                                        shuffle=True,
                                                        test_size=0.5)
    # copy before adding the label column so we never write into a slice of the input
    pdf_X_train = X_train.copy()
    pdf_X_val = X_val.copy()
    pdf_X_test = X_test.copy()

    # use the `label` argument instead of a hard-coded column name
    pdf_X_train[label] = y_train
    pdf_X_val[label] = y_val
    pdf_X_test[label] = y_test

    print(f'Total records for: "pdf_X_train": [{pdf_X_train.shape[0]}]')
    print(f'Total records for: "pdf_X_val": [{pdf_X_val.shape[0]}]')
    print(f'Total records for: "pdf_X_test": [{pdf_X_test.shape[0]}]')

    return pdf_X_train, pdf_X_val, pdf_X_test

In [None]:
pdf_train, pdf_val, pdf_test = generate_datasets(pdf_main[['reviewText', 'sentiment']].dropna(), 'sentiment')

pdf_train.to_csv('data/pdf_train.csv')
pdf_val.to_csv('data/pdf_val.csv')
pdf_test.to_csv('data/pdf_test.csv')
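
Splitting off 20% of the data and then halving that portion yields an 80/10/10 split in which each subset keeps the original class ratio. The sketch below illustrates that idea with the standard library only. It is not scikit-learn's implementation (which handles rounding differently), and `stratified_split` is a hypothetical name.

```python
import random
from collections import defaultdict


def stratified_split(labels, fractions=(0.8, 0.1, 0.1), seed=0):
    """Return index lists for train/val/test, splitting each class separately."""
    rng = random.Random(seed)
    by_class = defaultdict(list)
    for idx, label in enumerate(labels):
        by_class[label].append(idx)

    train, val, test = [], [], []
    for indices in by_class.values():
        rng.shuffle(indices)
        n_train = round(len(indices) * fractions[0])
        n_val = round(len(indices) * fractions[1])
        train += indices[:n_train]
        val += indices[n_train:n_train + n_val]
        test += indices[n_train + n_val:]  # remainder goes to the test set
    return train, val, test


# Toy data: 80 positive and 20 negative labels
labels = [1] * 80 + [0] * 20
train, val, test = stratified_split(labels)
print(len(train), len(val), len(test))  # 80 10 10
```

In this toy run, every subset keeps the 4:1 ratio of positive to negative labels.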


In [None]:
from azure.ai.ml.entities import Data
from azure.ai.ml import Input
from azure.ai.ml.constants import AssetTypes

# === Note on path ===
# `path` can be a local path or a cloud path. AzureML supports `https://`, `abfss://`, `wasbs://` and `azureml://` URIs.
# Local paths are automatically uploaded to the default datastore in the cloud.
# More details on supported paths: https://docs.microsoft.com/azure/machine-learning/how-to-read-write-data-v2#supported-paths

def gen_input_data(url):
    return Input(type=AssetTypes.URI_FILE, path=url)


In [None]:
ds_train = gen_input_data('data/pdf_train.csv')
ds_val = gen_input_data('data/pdf_val.csv')
ds_test = gen_input_data('data/pdf_test.csv')
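
The three paths above have no URI scheme, so they are treated as local files and uploaded when the job is submitted. As a rough illustration of the distinction described in the path note, the sketch below classifies a path by its scheme. `describe_path` and `CLOUD_SCHEMES` are hypothetical names, the scheme list is taken from the comment in the cell above, and the example URIs are placeholders.

```python
from urllib.parse import urlparse

# Schemes listed in the notebook's note on supported paths
CLOUD_SCHEMES = {"https", "abfss", "wasbs", "azureml"}


def describe_path(path: str) -> str:
    """Return 'cloud' for a supported URI scheme, otherwise 'local'."""
    scheme = urlparse(path).scheme.lower()
    return "cloud" if scheme in CLOUD_SCHEMES else "local"


print(describe_path("data/pdf_train.csv"))                          # local
print(describe_path("https://example.com/reviews/pdf_train.csv"))   # cloud
print(describe_path("azureml://datastores/mystore/paths/x.csv"))    # cloud
```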


## Setting Up Environment 
This section sets up the environment for training the model by specifying the conda dependencies and creating an environment object.

In [None]:
source_directory = "./src_v2/"
experiment_name = 'transformer_hp_v2'

In [None]:
%%writefile environments/conda_dependencies.yml

channels:
  - pytorch
  - anaconda
  - conda-forge
dependencies:
  - python=3.7
  - pip=21.1.2
  - pip:
      - azure-ai-ml==1.2.0
      - mlflow==1.26.1
      - azureml-mlflow==1.42.0
      - nvitop
      - transformers
      - inference-schema
      - joblib
      - datasets
  - numpy~=1.21.6
  - pandas~=1.1.5
  - shap=0.39.0
  - scikit-learn~=0.22.1
  - pytorch==1.7.1
name: nlp_training_environment

In [None]:
from azure.ai.ml.entities import Environment

env_name = 'nlp-accelerator-sdk-v2'
env_list = list(ml_client.environments.list(name=env_name))
if len(env_list) > 0:
    env = env_list[0]
else:
    env = Environment(
        image="mcr.microsoft.com/azureml/openmpi4.1.0-cuda11.1-cudnn8-ubuntu20.04:latest",
        conda_file='environments/conda_dependencies.yml',
        name=env_name,
        description='This environment is curated to run NLP Transformer based models using AML SDK-v2 and native MLFlow integration'
    )

    ml_client.environments.create_or_update(env)


## Configuring Compute Targets
In this section, we retrieve the existing compute clusters: a GPU cluster for training and a CPU cluster for the registration and deployment steps.

In [None]:
cluster_name = "a100-cluster"
compute_target = ml_client.compute.get(cluster_name)


In [None]:
cpu_compute_target = ml_client.compute.get("cpu-cluster")

## Defining the Training Job
The training job is defined by specifying the inputs, outputs, compute target, environment, and the command to run the training script. This job trains the sentiment analysis model using the preprocessed data and the specified environment. 

In [None]:
from azure.ai.ml import command
from azure.ai.ml import Input, Output

job_train = command(
    inputs=dict(
        training_dataset=ds_train,
        val_dataset=ds_val,
        test_dataset=ds_test,
        target_name='sentiment', 
        text_field_name='reviewText',
        is_test=1,
        is_final=0,
        is_local=0,
        is_jump=0,
        evaluation_strategy='epoch',
        collect_resource_utilization=1,
        resource_utilization_interval=5.0, # seconds
        base_checkpoint='bert-base-cased',
        batch_size=8,
        no_epochs=4,
        learning_rate=5.5e-5,
        warmup_steps=0,
        weight_decay=0.0,
        adam_beta1=0.9,
        adam_beta2=0.999,
        adam_epsilon=1e-8
    ),
    outputs=dict(
        model_output=Output(type="custom_model")
    ),
    compute=compute_target,
    environment=env,
    code=source_directory, # location of source code
    command="""
    python train_transformer.py \
        --collect-resource-utilization ${{inputs.collect_resource_utilization}} \
        --resource-utilization-interval ${{inputs.resource_utilization_interval}} \
        --target-name ${{inputs.target_name}} \
        --training-dataset ${{inputs.training_dataset}} \
        --val-dataset ${{inputs.val_dataset}} \
        --test-dataset ${{inputs.test_dataset}} \
        --model-path ${{outputs.model_output}} \
        --text-field-name ${{inputs.text_field_name}} \
        --is-test ${{inputs.is_test}} \
        --is-final ${{inputs.is_final}} \
        --is-local ${{inputs.is_local}} \
        --is-jump ${{inputs.is_jump}} \
        --evaluation-strategy ${{inputs.evaluation_strategy}} \
        --base-checkpoint ${{inputs.base_checkpoint}} \
        --batch-size ${{inputs.batch_size}} \
        --no-epochs ${{inputs.no_epochs}} \
        --learning-rate ${{inputs.learning_rate}} \
        --warmup-steps ${{inputs.warmup_steps}} \
        --weight-decay ${{inputs.weight_decay}} \
        --adam-beta1 ${{inputs.adam_beta1}} \
        --adam-beta2 ${{inputs.adam_beta2}} \
        --adam-epsilon ${{inputs.adam_epsilon}}
    """,
    display_name="HyperDrive_Step",
)
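
The `${{inputs.<name>}}` and `${{outputs.<name>}}` placeholders in the command string are resolved by Azure ML when the job runs; for data inputs and outputs the substituted value is a path on the compute node. The sketch below imitates that substitution with a regular expression to show what the final command line looks like. It is an illustration only, not the service's implementation; `render_command` and the `/mnt/outputs/model` path are hypothetical.

```python
import re

# Matches ${{inputs.name}} and ${{outputs.name}}
PLACEHOLDER = re.compile(r"\$\{\{\s*(inputs|outputs)\.(\w+)\s*\}\}")


def render_command(template: str, inputs: dict, outputs: dict) -> str:
    """Replace placeholders with concrete values and collapse whitespace."""
    values = {"inputs": inputs, "outputs": outputs}

    def substitute(match):
        group, name = match.group(1), match.group(2)
        return str(values[group][name])

    rendered = PLACEHOLDER.sub(substitute, template)
    return " ".join(rendered.split())


# In a regular (non-raw) Python string, a backslash at the end of a line
# joins it with the next line, so the command reaches the shell as one line.
template = """
python train_transformer.py \
    --batch-size ${{inputs.batch_size}} \
    --model-path ${{outputs.model_output}}
"""

command_line = render_command(
    template,
    inputs={"batch_size": 8},
    outputs={"model_output": "/mnt/outputs/model"},
)
print(command_line)
# python train_transformer.py --batch-size 8 --model-path /mnt/outputs/model
```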


## Hyperparameter Tuning Configuration
In the hyperparameter tuning section, we define the hyperparameter search space and configure the hyperparameter tuning job using the BanditPolicy for early termination. This allows us to find the best model by searching through different combinations of hyperparameters. 
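
The Bandit policy stops trials that fall too far behind the best trial so far. For a metric that is maximized, a trial is terminated when its best value is below `best / (1 + slack_factor)`. The sketch below shows that comparison for the `slack_factor=0.1` used later in this notebook. The metric values are made up for illustration, `should_terminate` is a hypothetical helper, and the check is only applied at each evaluation interval.

```python
def should_terminate(trial_metric: float, best_metric: float, slack_factor: float) -> bool:
    """Bandit rule for a maximized metric: stop trials below best / (1 + slack)."""
    threshold = best_metric / (1 + slack_factor)
    return trial_metric < threshold


best_f1 = 0.90  # best trial so far (illustrative value)
# The threshold is 0.90 / 1.1, roughly 0.818
for f1 in (0.88, 0.83, 0.80):
    print(f1, should_terminate(f1, best_f1, slack_factor=0.1))
# 0.88 False
# 0.83 False
# 0.8 True
```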

In [None]:
from azure.ai.ml.sweep import Choice, BanditPolicy

# We reuse the command job created above, calling it as a function so that we can override its inputs with search spaces.
job_train_for_sweep = job_train(
    # Larger checkpoints need larger GPU VMs such as A100.
    # Other candidates: "bert-large-cased", "microsoft/deberta-v3-small", "distilbert-base-uncased", "bert-base-uncased"
    base_checkpoint=Choice(["bert-base-cased"]),
    batch_size=Choice([8]),
    no_epochs=Choice([4]),
    learning_rate=Choice([5.5e-5, 5e-5, 4.5e-5, 4e-5, 6e-5, 3.5e-5, 6.5e-5]),
    warmup_steps=Choice([0]),
    weight_decay=Choice([0.0]),
    adam_beta1=Choice([0.9]),
    adam_beta2=Choice([0.999]),
    adam_epsilon=Choice([1e-8])
)
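
With random sampling, each trial draws one value per hyperparameter from its `Choice` list, so a single-element list pins that hyperparameter to a fixed value. The sketch below mimics this with the standard library for part of the search space above. It is not the sampler Azure ML uses, and `sample_trial` is a hypothetical name.

```python
import random

# A subset of the search space defined above; single-element lists are fixed values
search_space = {
    "learning_rate": [5.5e-5, 5e-5, 4.5e-5, 4e-5, 6e-5, 3.5e-5, 6.5e-5],
    "batch_size": [8],
    "no_epochs": [4],
}


def sample_trial(space: dict, rng: random.Random) -> dict:
    """Draw one value per hyperparameter, uniformly from its list of options."""
    return {name: rng.choice(options) for name, options in space.items()}


rng = random.Random(0)  # seeded so the sketch is repeatable
trials = [sample_trial(search_space, rng) for _ in range(3)]
for trial in trials:
    print(trial)
```

Only `learning_rate` varies between trials here, because it is the only hyperparameter with more than one option.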

In [None]:
%%writefile environments/conda_dependencies_cpu_v2.yml

channels:
  - pytorch
  - anaconda
  - conda-forge
dependencies:
  - python=3.7
  - pip
  - pip:
      - azure-ai-ml
      - mlflow
      - azureml-mlflow
      - nvitop
      - transformers
      - joblib
      - datasets
  - numpy
  - pandas
  - shap
  - scikit-learn
name: sdk_v2_cpu

## Setting Up the CPU Environment
The registration and deployment steps do not need a GPU, so they run in a separate, lighter environment. The conda file written above lists its dependencies, and the cell below reuses the environment registered under that name or creates it if it cannot be found.

In [None]:
from azure.ai.ml.entities import Environment

env_name_non_gpu = 'sdk-v2-cpu'

try:
    # reuse the environment if it is already registered
    env_list = list(ml_client.environments.list(name=env_name_non_gpu))
    env_v2 = env_list[0]
except Exception:
    # nothing registered under this name (empty list or lookup error): create it
    env_v2 = Environment(
        image="mcr.microsoft.com/azureml/curated/sklearn-0.24-ubuntu18.04-py37-cpu:latest",
        conda_file='environments/conda_dependencies_cpu_v2.yml',
        name=env_name_non_gpu,
        description='This environment is curated to run sdk v2 for cpu base use-cases'
    )

    ml_client.environments.create_or_update(env_v2)


## Model Registration Job Definition
The model registration job registers the best model from the hyperparameter tuning step. This step makes the trained model available for deployment and further use in the Azure Machine Learning workspace.

In [None]:
model_name='sentiment_classifier_sdk_v2'

job_register = command(
    inputs=dict(
        model_path=Input(type="custom_model"),
        model_name=model_name,
        subscription_id=subscription_id,
        resource_group=resource_group,
        workspace=workspace
    ),
    outputs=dict(
        linkage_data=Output(type="custom_model")
    ),
    compute=cpu_compute_target,
    environment=env_v2,
    code=source_directory, # location of source code
    command="""
    python register_model.py \
        --model-path ${{inputs.model_path}} \
        --model-name ${{inputs.model_name}} \
        --subscription-id ${{inputs.subscription_id}} \
        --resource-group ${{inputs.resource_group}} \
        --workspace ${{inputs.workspace}} \
        --model-data ${{outputs.linkage_data}}
    """,
    display_name="Register_Best_Model",
)


## Model Deployment Job Definition
In the model deployment section, we define the model deployment job, which deploys the registered model as a web service. This allows users to access and use the sentiment analysis model through a REST API. 

In [None]:
job_deploy = command(
    inputs=dict(
        endpoint_name='sentiment-endpoint-sdkv2',
        linkage_data=Input(type="custom_model"),
        model_name=model_name,
        environment_name=env_name,
        subscription_id=subscription_id,
        resource_group=resource_group,
        workspace=workspace
    ),
    compute=cpu_compute_target,
    environment=env_v2,
    code=source_directory, # location of source code
    command="""
    python deploy_model.py \
        --endpoint-name ${{inputs.endpoint_name}} \
        --model-name ${{inputs.model_name}} \
        --environment-name ${{inputs.environment_name}} \
        --subscription-id ${{inputs.subscription_id}} \
        --resource-group ${{inputs.resource_group}} \
        --workspace ${{inputs.workspace}} \
        --model-data ${{inputs.linkage_data}}
    """,
    display_name="Deploy_Latest_Model",
)



## Creating and Chaining the Pipeline with the list of job steps
Create the pipeline by chaining the hyperparameter tuning, model registration, and deployment jobs.

In [None]:
from azure.ai.ml.dsl import pipeline

@pipeline()
def pipeline_construction():
    """Sweep over hyperparameters, then register and deploy the best model."""
    hyper_drive = job_train_for_sweep.sweep(
        compute=compute_target,
        sampling_algorithm="random",
        primary_metric="test_f1_weighted",
        goal="Maximize",
        max_total_trials=1,
        max_concurrent_trials=1,
        early_termination_policy=BanditPolicy(
            slack_factor=0.1, evaluation_interval=5
        ),
    )

    reg_step = job_register(model_path=hyper_drive.outputs.model_output)
    dep_step = job_deploy(linkage_data=reg_step.outputs.linkage_data)
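
The sweep ranks trials by the `test_f1_weighted` metric. Assuming this name refers to a support-weighted F1 score (the per-class F1 averaged with weights proportional to the number of true examples of each class, as in scikit-learn's `average='weighted'`), the calculation can be sketched as follows. The labels and predictions are made up, and `f1_weighted` is a hypothetical helper rather than code from `train_transformer.py`.

```python
from collections import Counter


def f1_weighted(y_true, y_pred):
    """Average the per-class F1 scores, weighted by each class's support."""
    support = Counter(y_true)
    total = 0.0
    for cls, n in support.items():
        tp = sum(1 for t, p in zip(y_true, y_pred) if t == cls and p == cls)
        fp = sum(1 for t, p in zip(y_true, y_pred) if t != cls and p == cls)
        fn = n - tp
        denom = 2 * tp + fp + fn
        f1 = 2 * tp / denom if denom else 0.0
        total += f1 * n
    return total / len(y_true)


y_true = [1, 1, 1, 1, 0, 0]
y_pred = [1, 1, 1, 0, 0, 1]
# class 1: F1 = 0.75 (support 4); class 0: F1 = 0.5 (support 2)
score = f1_weighted(y_true, y_pred)
print(round(score, 4))  # 0.6667
```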

In [None]:
pipeline_job = pipeline_construction()


In [None]:
pipeline_job = ml_client.jobs.create_or_update(pipeline_job, experiment_name=experiment_name)