In [1]:
import sys
sys.path.append("../..")

import os
import shutil

import torch
import azureml.core
from azureml.train.dnn import PyTorch
from azureml.core.runconfig import MpiConfiguration
from azureml.core import Experiment
from azureml.widgets import RunDetails
from utils_nlp.azureml.azureml_utils import get_or_create_workspace

In [2]:

    # Settings that identify the Azure ML workspace used by this notebook.
    workspace_settings = {
        "subscription_id": "15ae9cb6-95c1-483d-a0e3-b1a1a3b06324",
        "resource_group": "nlprg",
        "workspace_name": "MAIDAIPBERT-eastus",
        "workspace_region": "East US",
    }
    # Bind `ws` to that workspace via the utils_nlp helper
    # (presumably creates the workspace when it is missing -- confirm).
    ws = get_or_create_workspace(**workspace_settings)


In [None]:
# Disabled duplicate of the workspace-connection cell, kept as an inert string literal.
'''
    ws = get_or_create_workspace(
    subscription_id="15ae9cb6-95c1-483d-a0e3-b1a1a3b06324",
    resource_group="nlprg",
    workspace_name="MAIDAIPBERT-eastus",
    workspace_region="East US",
)
'''


In [3]:
# Echo the identifying fields of the connected workspace, one per line.
for label_template, value in (
    ("Workspace name: {}", ws.name),
    ("Resource group: {}", ws.resource_group),
):
    print(label_template.format(value))

Workspace name: MAIDAIPBERT-eastus
Resource group: nlprg


In [4]:
# Name of the Azure ML compute cluster that the training job is submitted to.
cluster_name = "bertncrs24"
# Alternative cluster name, kept for reference:
#cluster_name = "gpu-entail"

In [5]:
from azureml.core.compute import ComputeTarget

In [6]:
# The fallback branch below needs AmlCompute and ComputeTargetException. They were
# never imported, so a missing cluster surfaced as a NameError instead of being created.
from azureml.core.compute import AmlCompute, ComputeTarget
from azureml.core.compute_target import ComputeTargetException

try:
    # Reuse the cluster when it already exists in the workspace.
    compute_target = ComputeTarget(workspace=ws, name=cluster_name)
    print("Found compute target: {}".format(cluster_name))
except ComputeTargetException:
    print("Creating new compute target: {}".format(cluster_name))
    # The job submitted later requests 2 nodes with 4 MPI processes each, and
    # train.py pins every process to the GPU indexed by its local rank. The cluster
    # therefore needs several nodes with 4 GPUs each; these values match the
    # existing cluster reported in this notebook's output.
    compute_config = AmlCompute.provisioning_configuration(
        vm_size="STANDARD_NC24RS_V3", max_nodes=4
    )
    compute_target = ComputeTarget.create(ws, cluster_name, compute_config)
    # Block until provisioning finishes so the status printed below is meaningful.
    compute_target.wait_for_completion(show_output=True)


print(compute_target.get_status().serialize())

Found compute target: bertncrs24
{'currentNodeCount': 0, 'targetNodeCount': 0, 'nodeStateCounts': {'preparingNodeCount': 0, 'runningNodeCount': 0, 'idleNodeCount': 0, 'unusableNodeCount': 0, 'leavingNodeCount': 0, 'preemptedNodeCount': 0}, 'allocationState': 'Steady', 'allocationStateTransitionTime': '2019-07-29T19:42:36.790000+00:00', 'errors': None, 'creationTime': '2019-07-12T19:59:45.933132+00:00', 'modifiedTime': '2019-07-12T20:00:01.793458+00:00', 'provisioningState': 'Succeeded', 'provisioningStateTransitionTime': None, 'scaleSettings': {'minNodeCount': 0, 'maxNodeCount': 4, 'nodeIdleTimeBeforeScaleDown': 'PT120S'}, 'vmPriority': 'Dedicated', 'vmSize': 'STANDARD_NC24RS_V3'}


In [7]:
# Stage a copy of the utils_nlp package next to the training script so that it is
# uploaded with the job's source directory.
DEBUG = True
project_dir = "./entail_utils"
utils_nlp_copy = os.path.join(project_dir, "utils_nlp")

if DEBUG and os.path.exists(project_dir):
    # Debug mode: start from a completely clean project directory.
    shutil.rmtree(project_dir)
elif os.path.exists(utils_nlp_copy):
    # copytree refuses to write into an existing destination, so drop only the stale
    # package copy and keep everything else in the project directory.
    shutil.rmtree(utils_nlp_copy)

# Last expression of the cell: displays the destination path returned by copytree.
shutil.copytree("../../utils_nlp", utils_nlp_copy)

'./entail_utils\\utils_nlp'

In [8]:
%%writefile $project_dir/train.py

import horovod.torch as hvd
import torch
import numpy as np
import time
import argparse
from torch.utils.data import DataLoader, SequentialSampler
from utils_nlp.dataset.xnli_torch_dataset import XnliDataset
from utils_nlp.models.bert.common import Language
from pytorch_pretrained_bert.optimization import BertAdam
from utils_nlp.models.bert.sequence_classification_distributed import BERTSequenceClassifier
from sklearn.metrics import classification_report

from azureml.core.run import Run
# Handle to the current Azure ML run.
run = Run.get_context()

print("Torch version:", torch.__version__)

# Initialise Horovod before hvd.rank() is used below.
hvd.init()

# --- dataset / tokenizer settings ---
LANGUAGE_ENGLISH = "en"
PRETRAINED_BERT_LNG = Language.ENGLISH
TO_LOWERCASE = True
TRAIN_FILE_SPLIT, TEST_FILE_SPLIT = "train", "test"

# --- optimizer settings ---
BATCH_SIZE = 32
LEARNING_RATE = 5e-5
WARMUP_PROPORTION = 0.1

NUM_GPUS = 4

# Every worker caches its data under a directory named after its global rank.
CACHE_DIR = './xnli-data-%d' % (hvd.rank(),)
print("======= cache dir =======================", CACHE_DIR)

parser = argparse.ArgumentParser()
# Training settings
parser.add_argument('--seed', type=int, default=42, metavar='S',help='random seed (default: 42)')
parser.add_argument('--epochs', type=int, default=2, metavar='N', help='number of epochs to train (default: 2)')
parser.add_argument('--num_workers', type=int, default=2, metavar='N', help='number of workers to train (default: 2)')
parser.add_argument('--no-cuda', action='store_true', default=False,help='disables CUDA training')


args = parser.parse_args()
args.cuda = not args.no_cuda and torch.cuda.is_available()
print(args.cuda)

# Why local_rank is used for GPU pinning while rank is used for the data directory:
#
# For example, with 4 nodes and 4 GPUs per node you spawn 16 workers. Every worker
# has a rank in [0, 15] and a local_rank in [0, 3]. local_rank is used for GPU
# pinning because there is typically one GPU available on the node per process; rank
# would not make sense here because rank could be 10 while the node only has 4 GPUs,
# so there is no GPU 10.
#
# In the MNIST example each worker gets a separate copy of the data, named after the
# rank of the worker that uses it. If all the data were copied to local disk,
# local_rank could be used instead, but a shared filesystem is common, and with
# local_rank processes would overwrite each other's data during the download.

# Seed the CPU generator too; previously --seed only reached the CUDA generator, so
# it had no effect on CPU-side randomness (and none at all without CUDA).
torch.manual_seed(args.seed)

if args.cuda:
    # One process per GPU: pin this worker to the GPU matching its node-local rank.
    torch.cuda.set_device(hvd.local_rank())
    torch.cuda.manual_seed(args.seed)

# Extra DataLoader options that only apply when training on GPU.
kwargs = {'num_workers': 4, 'pin_memory': True} if args.cuda else {}

# Training split of the XNLI data, read from this worker's cache directory.
train_dataset = XnliDataset(
    file_split=TRAIN_FILE_SPLIT,
    cache_dir=CACHE_DIR,
    language=LANGUAGE_ENGLISH,
    to_lowercase=TO_LOWERCASE,
    tok_language=PRETRAINED_BERT_LNG,
)

# The sampler is configured with the total worker count and this worker's rank.
train_sampler = torch.utils.data.distributed.DistributedSampler(
    train_dataset,
    num_replicas=hvd.size(),
    rank=hvd.rank(),
)
train_loader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    sampler=train_sampler,
    **kwargs
)

# Keep the label encoder so predictions on the evaluation dataset can be decoded.
label_encoder = train_dataset.label_encoder
num_labels = np.unique(train_dataset.labels).size

classifier = BERTSequenceClassifier(
    language=PRETRAINED_BERT_LNG,
    num_labels=num_labels,
    cache_dir=CACHE_DIR,
)

# optimizer configurations
num_samples = len(train_loader.dataset)
num_batches = num_samples // BATCH_SIZE
# NOTE(review): this counts batches over the whole dataset, while train_loader is fed
# by a DistributedSampler, so each worker presumably runs about 1/hvd.size() of these
# steps. The schedule length may need dividing by hvd.size() -- confirm before changing.
num_train_optimization_steps = num_batches * args.epochs
optimizer_grouped_parameters = classifier.optimizer_params

print("================= num_train_optimization_steps ==============================")
print(num_train_optimization_steps)

# Scale the base learning rate by the number of Horovod workers.
lr = LEARNING_RATE * hvd.size()

# Build the optimizer exactly once. The original also constructed a BertAdam
# unconditionally just before this branch and then discarded it.
if WARMUP_PROPORTION is None:
    print("================== Without Warmup proprtion ===========================")
    bert_optimizer = BertAdam(optimizer_grouped_parameters, lr=lr)
else:
    print("================== With Warmup proportion =============================")
    bert_optimizer = BertAdam(
        optimizer_grouped_parameters,
        lr=lr,
        t_total=num_train_optimization_steps,
        warmup=WARMUP_PROPORTION,
    )


## Wrap the optimizer with Horovod's distributed optimizer
bert_optimizer = hvd.DistributedOptimizer(
    bert_optimizer,
    named_parameters=classifier.model.named_parameters(),
)
# Broadcast the model state from rank 0 to the other workers.
hvd.broadcast_parameters(classifier.model.state_dict(), root_rank=0)

# Temporary debug output (marked for removal by the original author).
if hvd.rank() != 0:
    print("===== not master rank =================================")
else:
    print("===================== rank rank =======================", hvd.rank())


classifier.fit(
    train_loader,
    bert_optimizer,
    args.epochs,
    NUM_GPUS,
    hvd.rank(),
)

# Evaluation: only the rank-0 worker scores the test split.
if hvd.rank() == 0:
    # presumably NUM_GPUS = 0 makes predict() run on CPU -- confirm against the classifier
    NUM_GPUS = 0
    kwargs = {}
    test_dataset = XnliDataset(
        file_split=TEST_FILE_SPLIT,
        cache_dir=CACHE_DIR,
        language=LANGUAGE_ENGLISH,
        to_lowercase=TO_LOWERCASE,
        tok_language=PRETRAINED_BERT_LNG,
    )
    test_loader = DataLoader(test_dataset, **kwargs)

    # Time the prediction pass.
    started = time.time()
    predictions = classifier.predict(test_loader, NUM_GPUS, BATCH_SIZE, probabilities=False)
    elapsed = time.time() - started
    print("================= Time to predict ===========================")
    print(elapsed)
    print('=================== Predictions =====================')
    print(predictions)

    # Collect the gold labels by iterating over the dataset items.
    test_labels = [item['labels'] for item in test_dataset]

    print(len(test_labels))

    # Decode predictions with the training label encoder before reporting.
    predictions = label_encoder.inverse_transform(predictions)
    print(classification_report(test_labels, predictions))

Writing ./entail_utils/train.py


In [9]:
# Job topology: NODE_COUNT nodes, each running 4 MPI processes.
NODE_COUNT = 2
mpiConfig = MpiConfiguration()
mpiConfig.process_count_per_node = 4

# Python dependencies installed into the job's environment.
conda_dependencies = ["scikit-learn=0.20.3", "numpy", "spacy", "nltk"]
pip_dependencies = ["pandas", "seqeval[gpu]", "pytorch-pretrained-bert"]

# Estimator describing how train.py is run on the compute target.
est = PyTorch(
    source_directory=project_dir,
    compute_target=compute_target,
    entry_script="train.py",
    node_count=NODE_COUNT,
    distributed_training=mpiConfig,
    use_gpu=True,
    framework_version="1.0",
    conda_packages=conda_dependencies,
    pip_packages=pip_dependencies,
)

In [10]:
# Record the installed Azure ML SDK version alongside the results.
print("Azure ML SDK Version:", azureml.core.VERSION)

Azure ML SDK Version: 1.0.48


In [11]:
# Submit the estimator under the "Nlp-Entailment-BERT" experiment; `run` tracks the job.
experiment = Experiment(workspace=ws, name="Nlp-Entailment-BERT")
run = experiment.submit(config=est)

In [12]:
# Display the notebook widget for the submitted run.
RunDetails(run).show()

_UserRunWidget(widget_settings={'childWidgetDisplay': 'popup', 'send_telemetry': False, 'log_level': 'INFO', '…

In [None]:
# Cancels the submitted run. NOTE(review): with "Run All" this executes right after
# submission -- run this cell manually, only when the job should be stopped.
run.cancel()

In [None]:
# Call the method: without parentheses the cell only displayed the bound method
# object instead of the run's details.
run.get_details()

In [None]:
# Register the run's "outputs" path as a model named "outputs".
model = run.register_model(
    model_name='outputs',
    model_path='outputs',
)
# Tab-separated summary of the registered model.
registered_fields = (model.name, model.id, model.version)
print(*registered_fields, sep='\t')

The next cell downloads the registered model to the local machine so that you can run predictions locally.

In [None]:
# Download the registered model locally
# (exist_ok=True presumably tolerates an existing local copy -- confirm).
model.download(exist_ok=True)

## Prediction 