*Copyright (c) Microsoft Corporation. All rights reserved.*

*Licensed under the MIT License.*

# Text Classification of MultiNLI Sentences using BERT with Azure ML Pipelines

In [1]:
import sys
sys.path.append("../../")
import os
import json
import random
import shutil
import pandas as pd

from utils_nlp.bert.common import Language, Tokenizer
from utils_nlp.azureml import azureml_utils
from utils_nlp.dataset.multinli import get_generator

from sklearn.preprocessing import LabelEncoder
import azureml.core
from azureml.core import Datastore, Experiment,  get_run
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import RunConfiguration
from azureml.core.compute import ComputeTarget,  AmlCompute
from azureml.exceptions import ComputeTargetException
from azureml.data.data_reference import DataReference
from azureml.pipeline.steps import PythonScriptStep
from azureml.pipeline.core import Pipeline, PipelineData
from azureml.widgets import RunDetails
from azureml.train.dnn import PyTorch
from azureml.core.runconfig import MpiConfiguration
from azureml.pipeline.steps import EstimatorStep

print("System version: {}".format(sys.version))
print("Azure ML SDK Version:", azureml.core.VERSION)

System version: 3.6.8 |Anaconda, Inc.| (default, Feb 21 2019, 18:30:04) [MSC v.1916 64 bit (AMD64)]
Azure ML SDK Version: 1.0.48


## 0. Introduction

In this notebook, we fine-tune and evaluate a pretrained [BERT](https://arxiv.org/abs/1810.04805) model on a subset of the [MultiNLI](https://www.nyu.edu/projects/bowman/multinli/) dataset using [AzureML](https://azure.microsoft.com/en-us/services/machine-learning-service/) Pipelines.

We use a [distributed sequence classifier](../../utils_nlp/bert/sequence_classification_distributed.py) that wraps [Hugging Face's PyTorch implementation](https://github.com/huggingface/pytorch-pretrained-BERT) of Google's [BERT](https://github.com/google-research/bert).

The notebooks acts as a template to,
1. Process a massive dataset in parallel by dividing the dataset into chunks using [DASK](https://dask.org/) .
2. Perform distributed training on AzureML compute on these processed chunks.

In [2]:
LABEL_COL = "genre"
TEXT_COL = "sentence1"
DATA_FOLDER = "../../data/temp"
TRAIN_FOLDER = "../../data/temp/train"
TEST_FOLDER = "../../data/temp/test"
BERT_CACHE_DIR = "../../data/temp"
LANGUAGE = Language.ENGLISH
TO_LOWER = True
MAX_LEN = 150
BATCH_SIZE = 32
NUM_GPUS = 2
NUM_EPOCHS = 1
TRAIN_SIZE = 0.6
TEXT_COL = "sentence1"
ENCODED_LABEL_COL = "label"
TOKEN_COL = "tokens"
MASK_COL = "mask"
NUM_PARTITIONS = None
LABELS = ['telephone', 'government', 'travel', 'slate', 'fiction']
PROJECT_FOLDER = "../../"

In this example we will use AzureML pipelines to execute training pipelines. Each preprocessing step is included as a step in the pipeline. For a more detailed walkthrough of what pipelines are with a getting started guidelines check this [notebook](https://github.com/Azure/MachineLearningNotebooks/blob/master/how-to-use-azureml/machine-learning-pipelines/intro-to-pipelines/aml-pipelines-getting-started.ipynb). We start by doing some AzureML related setup below.

### 0.1 Create a workspace

First, go through the [Configuration](https://github.com/Azure/MachineLearningNotebooks/blob/master/configuration.ipynb) notebook to install the Azure Machine Learning Python SDK and create an Azure ML `Workspace`. This will create a config.json file containing the values needed below to create a workspace.

**Note**: you do not need to fill in these values if you have a config.json in the same folder as this notebook

In [3]:
ws = azureml_utils.get_or_create_workspace(
    subscription_id="<SUBSCRIPTION_ID>",
    resource_group="<RESOURCE_GROUP>",
    workspace_name="<WORKSPACE_NAME>",
    workspace_region="<WORKSPACE_REGION>",
)

Performing interactive authentication. Please follow the instructions on the terminal.




Interactive authentication successfully completed.


### 0.2 Create a compute target

In [4]:
# choose your cluster
cluster_name = "pipelines-tc-12"

try:
    compute_target = ComputeTarget(workspace=ws, name=cluster_name)
    print("Found existing compute target.")
except ComputeTargetException:
    print("Creating a new compute target...")
    compute_config = AmlCompute.provisioning_configuration(
        vm_size="STANDARD_NC12", max_nodes=8
    )

    # create the cluster
    compute_target = ComputeTarget.create(ws, cluster_name, compute_config)

    compute_target.wait_for_completion(show_output=True)

# use get_status() to get a detailed status for the current AmlCompute.
print(compute_target.get_status().serialize())

Found existing compute target.
{'currentNodeCount': 2, 'targetNodeCount': 2, 'nodeStateCounts': {'preparingNodeCount': 0, 'runningNodeCount': 0, 'idleNodeCount': 2, 'unusableNodeCount': 0, 'leavingNodeCount': 0, 'preemptedNodeCount': 0}, 'allocationState': 'Steady', 'allocationStateTransitionTime': '2019-07-30T23:55:09.394000+00:00', 'errors': None, 'creationTime': '2019-07-25T04:16:20.598768+00:00', 'modifiedTime': '2019-07-25T04:16:36.486727+00:00', 'provisioningState': 'Succeeded', 'provisioningStateTransitionTime': None, 'scaleSettings': {'minNodeCount': 2, 'maxNodeCount': 10, 'nodeIdleTimeBeforeScaleDown': 'PT120S'}, 'vmPriority': 'Dedicated', 'vmSize': 'STANDARD_NC12'}


## 1. Preprocessing

The pipeline is defined by a series of steps, the first being a PythonScriptStep which utilizes [DASK](https://dask.org/) to load dataframes in partitions allowing us to load and preprocess different sets of data in parallel.

### 1.1 Read Dataset

In [5]:
train_batches = get_generator(DATA_FOLDER, "train", num_batches=NUM_PARTITIONS, batch_size=10e6)
test_batches = get_generator(DATA_FOLDER, "dev_matched", num_batches=NUM_PARTITIONS, batch_size=10e6)

### 1.2 Preprocess and Tokenize

In the classification task, we use the first sentence only as the text input, and the corresponding genre as the label. Select the examples corresponding to one of the entailment labels (*neutral* in this case) to avoid duplicate rows, as the sentences are not unique, whereas the sentence pairs are.

Once filtered, we encode the labels. To do this, fit a label encoder with the known labels in a MNLI dataset.

In [6]:
os.makedirs(TRAIN_FOLDER, exist_ok=True)
os.makedirs(TEST_FOLDER, exist_ok=True)

labels = LABELS
label_encoder = LabelEncoder()
label_encoder.fit(labels)

num_train_partitions = 0
for batch in train_batches:
    batch = batch[batch["gold_label"]=="neutral"]
    batch[ENCODED_LABEL_COL] = label_encoder.transform(batch[LABEL_COL])
    batch.to_csv(TRAIN_FOLDER+"/batch{}.csv".format(str(num_train_partitions)))
    num_train_partitions += 1
    
num_test_partitions = 0
for batch in test_batches:
    batch = batch[batch["gold_label"]=="neutral"]
    batch[ENCODED_LABEL_COL] = label_encoder.transform(batch[LABEL_COL])
    batch.to_csv(TEST_FOLDER+"/batch{}.csv".format(str(num_test_partitions)))
    num_test_partitions += 1

Once we have the partitions of data ready they are uploaded to the datastore.

In [None]:
ds = ws.get_default_datastore()
ds.upload(src_dir=TRAIN_FOLDER, target_path="mnli_data/train", overwrite=True, show_progress=False)
ds.upload(src_dir=TEST_FOLDER, target_path="mnli_data/test", overwrite=True, show_progress=False)

In [None]:
shutil.rmtree(TRAIN_FOLDER)
shutil.rmtree(TEST_FOLDER)

We can now parallely operate on each batch to tokenize the data and preprocess the tokens. To do this, we create a PythonScript step below.

In [7]:
%%writefile preprocess.py
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import argparse
import logging
import os

import pandas as pd

from utils_nlp.bert.common import Language, Tokenizer

LABEL_COL = "genre"
TEXT_COL = "sentence1"
LANGUAGE = Language.ENGLISH
TO_LOWER = True
MAX_LEN = 150

logger = logging.getLogger(__name__)


def tokenize(df):
    """Tokenize the text documents and convert them to lists of tokens using the BERT tokenizer.
    Args:
        df(pd.Dataframe): Dataframe with training or test samples

    Returns:

        list: List of lists of tokens for train set.

    """
    tokenizer = Tokenizer(
        LANGUAGE, to_lower=TO_LOWER)
    tokens = tokenizer.tokenize(list(df[TEXT_COL]))

    return tokens


def preprocess(tokens):
    """ Preprocess method that does the following,
            Convert the tokens into token indices corresponding to the BERT tokenizer's vocabulary
            Add the special tokens [CLS] and [SEP] to mark the beginning and end of a sentence
            Pad or truncate the token lists to the specified max length
            Return mask lists that indicate paddings' positions
            Return token type id lists that indicate which sentence the tokens belong to (not needed
            for one-sequence classification)

    Args:
        tokens(pd.Dataframe): Dataframe with tokens for train set.

    Returns:
        list: List of lists of tokens for train or test set with special tokens added.
        list: Input mask.
    """
    tokenizer = Tokenizer(
        LANGUAGE, to_lower=TO_LOWER)
    tokens, mask, _ = tokenizer.preprocess_classification_tokens(
        tokens, MAX_LEN
    )

    return tokens, mask


parser = argparse.ArgumentParser()
parser.add_argument("--input_data", type=str, help="input data")
parser.add_argument("--output_data", type=str, help="Path to the output file.")

args = parser.parse_args()
input_data = args.input_data
output_data = args.output_data
output_dir = os.path.dirname(os.path.abspath(output_data))

if output_dir is not None:
    os.makedirs(output_dir, exist_ok=True)
    logger.info("%s created" % output_dir)

df = pd.read_csv(args.input_data)
tokens_array = tokenize(df)
tokens_array, mask_array = preprocess(tokens_array)

df['tokens'] = tokens_array
df['mask'] = mask_array

# Filter columns
cols = ['tokens', 'mask', 'label']
df = df[cols]
df.to_csv(output_data, header=False, index=False)
logger.info("Completed")

Writing preprocess.py


In [8]:
preprocess_file = os.path.join(PROJECT_FOLDER,'utils_nlp/bert/preprocess.py')
shutil.move('preprocess.py',preprocess_file)

'../../utils_nlp/bert/preprocess.py'

Create a conda environment for the steps below.

In [9]:
conda_dependencies = CondaDependencies.create(
    conda_packages=[
        "numpy",
        "scikit-learn",
        "pandas",
    ],
    pip_packages=["azureml-sdk==1.0.43.*", 
                  "torch==1.1", 
                  "tqdm==4.31.1",
                 "pytorch-pretrained-bert>=0.6"],
    python_version="3.6.8",
)
run_config = RunConfiguration(conda_dependencies=conda_dependencies)
run_config.environment.docker.enabled = True

Then create the list of steps that use the preprocess.py created above. We use the output of these steps as input to training in the next section.

In [10]:
processed_train_files = []
processed_test_files = []
ds = ws.get_default_datastore()

for i in range(num_train_partitions):
        input_data = DataReference(datastore=ds, 
                                   data_reference_name='train_batch_{}'.format(str(i)), 
                                   path_on_datastore='mnli_data/train/batch{}.csv'.format(str(i)),
                                   overwrite=False)

        output_data = PipelineData(name="train{}".format(str(i)), datastore=ds,
                       output_path_on_compute='mnli_data/processed_train/batch{}.csv'.format(str(i)))

        step = PythonScriptStep(
            name='preprocess_step_train_{}'.format(str(i)),
            arguments=["--input_data", input_data, "--output_data", output_data],
            script_name= 'utils_nlp/bert/preprocess.py',
            inputs=[input_data],
            outputs=[output_data],
            source_directory=PROJECT_FOLDER,
            compute_target=compute_target,
            runconfig=run_config,
            allow_reuse=False,
        )
        
        processed_train_files.append(output_data)         
            
for i in range(num_test_partitions):
            input_data = DataReference(datastore=ds, 
                                       data_reference_name='test_batch_{}'.format(str(i)), 
                                       path_on_datastore='mnli_data/test/batch{}.csv'.format(str(i)),
                                       overwrite=False)
        
            output_data = PipelineData(name="test{}".format(str(i)), datastore=ds,
                        output_path_on_compute='mnli_data/processed_test/batch{}.csv'.format(str(i)))
            
            step = PythonScriptStep(
                name='preprocess_step_test_{}'.format(str(i)),
                arguments=["--input_data", input_data, "--output_data", output_data],
                script_name= 'utils_nlp/bert/preprocess.py',
                inputs=[input_data],
                outputs=[output_data],
                source_directory=PROJECT_FOLDER,
                compute_target=compute_target,
                runconfig=run_config,
                allow_reuse=False,
            )
            
            processed_test_files.append(output_data)

## 2. Train and Score

Once the data is processed and available on datastore, we  train the classifier using the training examples. This involves fine-tuning the BERT Transformer and learning a linear classification layer on top of that. After training is complete we score the performance of the model on the test dataset

The training is distributed and is done AzureML's capability to support distributed using MPI with horovod. 


### 2.1 Setup training script

In [11]:
%%writefile train.py
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

import argparse
import json
import logging
import os
import torch
from sklearn.metrics import classification_report

from utils_nlp.bert.common import Language
from utils_nlp.bert.sequence_classification_distributed import (
    BERTSequenceDistClassifier,
)
from utils_nlp.common.timer import Timer

BATCH_SIZE = 32
NUM_GPUS = 2
NUM_EPOCHS = 1
LABELS = ["telephone", "government", "travel", "slate", "fiction"]
OUTPUT_DIR = "./outputs/"

logger = logging.getLogger(__name__)

parser = argparse.ArgumentParser()
parser.add_argument(
    "--train_files",
    nargs="+",
    default=[],
    help="List of file paths to all the files in train dataset.",
)

parser.add_argument(
    "--test_files",
    nargs="+",
    default=[],
    help="List of file paths to all the files in test dataset.",
)

args = parser.parse_args()
train_files = [file.strip() for file in args.train_files]
test_files = [file.strip() for file in args.test_files]

# Handle square brackets from train list
train_files[0] = train_files[0][1:]
train_files[len(train_files) - 1] = train_files[len(train_files) - 1][:-1]

# Handle square brackets from test list
test_files[0] = test_files[0][1:]
test_files[len(test_files) - 1] = test_files[len(test_files) - 1][:-1]

# Train
classifier = BERTSequenceDistClassifier(
    language=Language.ENGLISH, num_labels=len(LABELS)
)
with Timer() as t:
    classifier.fit(
        train_files,
        num_gpus=NUM_GPUS,
        num_epochs=NUM_EPOCHS,
        batch_size=BATCH_SIZE,
        verbose=True,
    )

# Predict
preds, labels_test = classifier.predict(
    test_files, num_gpus=NUM_GPUS, batch_size=BATCH_SIZE
)

results = classification_report(
    labels_test, preds, target_names=LABELS, output_dict=True
)

# Write out results.
result_file = os.path.join(OUTPUT_DIR, "results.json")
with open(result_file, "w+") as fp:
    json.dump(results, fp)

# Save model
model_file = os.path.join(OUTPUT_DIR, "model.pt")
torch.save(classifier.model.state_dict(), model_file)

Writing train.py


In [12]:
train_file = os.path.join(PROJECT_FOLDER,'utils_nlp/bert/train.py')
shutil.move('train.py',train_file)

'../../utils_nlp/bert/train.py'

### 2.2 Create a Pytorch Estimator

We create a Pytorch Estimator using AzureML SDK and additonally define an EstimatorStep to run it on AzureML pipelines.

In [13]:
estimator = PyTorch(source_directory=PROJECT_FOLDER,
                    compute_target=compute_target,
                    entry_script='utils_nlp/bert/train.py',
                    node_count=4,
                    distributed_training=MpiConfiguration(),
                    process_count_per_node=2,
                    use_gpu=True,
                    conda_packages=['scikit-learn=0.20.3', 'numpy>=1.16.0', 'pandas'],
                    pip_packages=["tqdm==4.31.1","pytorch-pretrained-bert>=0.6"]
                   )



In [14]:
inputs = processed_train_files + processed_test_files

est_step = EstimatorStep(name="Estimator-Train", 
                         estimator=estimator, 
                         estimator_entry_script_arguments=[
                             '--train_files',  str(processed_train_files),
                             '--test_files', str(processed_test_files)],
                         inputs = inputs,
                         runconfig_pipeline_params=None, 
                         compute_target=compute_target)

### 2.3 Submit the pipeline

The model is fine tuned on AML Compute and takes ~45 minutes to train.

In [15]:
pipeline = Pipeline(workspace=ws, steps=[est_step])
experiment = Experiment(ws, 'NLP-TC-BERT-distributed')
pipeline_run = experiment.submit(pipeline)

Created step Estimator-Train [473f7d30][ab68a3b4-3d2b-4a49-a4c6-1b0b791b1e88], (This step is eligible to reuse a previous run's output)
Created step preprocess_step_train_0 [9fd37e6c][366a60a5-3853-4dac-a091-403789f8b34d], (This step will run and generate new outputs)
Created step preprocess_step_train_1 [74d43aed][b87722e9-b634-4369-b401-40a478dbc512], (This step will run and generate new outputs)
Created step preprocess_step_train_2 [0e17f205][2cda9c54-0172-4dad-8a11-bf2be1b8f1a6], (This step will run and generate new outputs)
Created step preprocess_step_train_3 [be4b9aeb][f000b8dd-3bb1-464a-bdd7-787939e8a1f7], (This step will run and generate new outputs)
Created step preprocess_step_train_4 [b3ca2f90][8341040b-bebf-4bcf-89a8-63242911e135], (This step will run and generate new outputs)
Created step preprocess_step_train_5 [38afb135][1e1929f9-986f-45e1-93d6-860d75f37858], (This step will run and generate new outputs)
Created step preprocess_step_train_6 [4e9145ef][d3914ba2-d13b-4d0c

Using data reference train_batch_7 for StepId [0d0b8edc][d2cb8ea4-4650-4143-85cc-3a50b2d92fb3], (Consumers of this data are eligible to reuse prior runs.)
Using data reference train_batch_8 for StepId [8ead4f4d][0b8ae803-5f49-4361-9c27-156a4f69486b], (Consumers of this data are eligible to reuse prior runs.)
Using data reference train_batch_9 for StepId [1f979fec][2d50bd14-2fb0-4742-8805-3bd1dc19c3c3], (Consumers of this data are eligible to reuse prior runs.)
Using data reference train_batch_10 for StepId [4e505f04][cba68341-2c4a-432e-a4bc-38af276684a6], (Consumers of this data are eligible to reuse prior runs.)
Using data reference train_batch_11 for StepId [47aafb86][f7250703-3bad-4db6-9c4a-e68c98565f0a], (Consumers of this data are eligible to reuse prior runs.)
Using data reference train_batch_12 for StepId [c61464e0][1e833559-7b27-422d-ad18-c7b8421af99a], (Consumers of this data are eligible to reuse prior runs.)
Using data reference train_batch_13 for StepId [fc80ef2f][c28e0d83-

In [16]:
RunDetails(pipeline_run).show()

_PipelineWidget(widget_settings={'childWidgetDisplay': 'popup', 'send_telemetry': False, 'log_level': 'INFO', …



In [None]:
#If you would like to cancel the job for any reasons uncomment the code below.
#pipeline_run.cancel()

In [None]:
#wait for the run to complete before continuing in the notebook
pipeline_run.wait_for_completion()

### 2.4 Download and analyze results

In [18]:
step_run = pipeline_run.find_step_run("Estimator-Train")[0]
file_names = ['outputs/results.json', 'outputs/model.pt']
azureml_utils.get_output_files(step_run, './outputs', file_names=file_names)

Downloading file outputs/results.json to ./outputs\results.json...
Downloading file outputs/model.pt to ./outputs\model.pt...


In [19]:
with open('outputs/results.json', 'r') as handle:
    parsed = json.load(handle)
    print(pd.DataFrame.from_dict(parsed).transpose())

              f1-score  precision    recall  support
telephone     0.915902   0.882180  0.952305    629.0
government    0.947368   0.948161  0.946578    599.0
travel        0.841503   0.898778  0.791091    651.0
slate         0.991093   0.991896  0.990291    618.0
fiction       0.942097   0.920489  0.964744    624.0
micro avg     0.927587   0.927587  0.927587   3121.0
macro avg     0.927593   0.928301  0.929002   3121.0
weighted avg  0.926549   0.927690  0.927587   3121.0


Finally clean up any intermediate files we created.

In [20]:
os.remove(train_file)
os.remove(preprocess_file)