# Distributed training with the torch.distributed.launch module on Azure Machine Learning


This example shows how to train a language model with the Hugging Face transformers library, distributed across multiple nodes on Azure Machine Learning using the PyTorch estimator.

In [47]:
%load_ext autoreload
%autoreload 2


import wget
import os

from azureml.core import (Workspace, Experiment, 
                          VERSION, Datastore)
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.environment import Environment,CondaDependencies
from azureml.train.dnn import PyTorch,Nccl
from azureml.data.data_reference import DataReference
from azureml.core.compute_target import ComputeTargetException
from azureml.widgets import RunDetails

SUBSCRIPTION_ID = ""
RESOURCE_GROUP = ""
WORKSPACE_NAME = ""

EXP_NAME = 'Azureml-LM_huggingface_example'
CLUSTER_NAME = "hf-cluster"

RUN_DIR = os.getcwd()
DATA_DIR = 'data'

print("SDK version:", VERSION)

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload
SDK version: 1.1.5


In [48]:
# Connect to the Azure ML workspace and get a handle to the experiment that runs are submitted under.
ws = Workspace(
    subscription_id=SUBSCRIPTION_ID,
    resource_group=RESOURCE_GROUP,
    workspace_name=WORKSPACE_NAME,
)
exp = Experiment(workspace=ws, name=EXP_NAME)

In [49]:
# Fetch the raw WikiText-2 archive into DATA_DIR (skipped when it is already there).
WIKITEXT_URL = "https://s3.amazonaws.com/research.metamind.io/wikitext/wikitext-2-raw-v1.zip"

os.makedirs(DATA_DIR, exist_ok=True)
zip_path = os.path.join(DATA_DIR, os.path.basename(WIKITEXT_URL))
# The `wget` package does not overwrite an existing file; it saves a renamed "(1)" copy.
# The upload cell ships everything in DATA_DIR, so only download when the archive is missing.
if not os.path.exists(zip_path):
    zip_path = wget.download(WIKITEXT_URL, out=DATA_DIR)
zip_path

'data/wikitext-2-raw-v1.zip'

In [50]:
# Upload the contents of DATA_DIR to "<default datastore>/wikitext". The returned reference is
# later mounted on every node and passed to train.py as --dataset-path.
datastore = ws.get_default_datastore()
ds_reference = datastore.upload(src_dir=DATA_DIR,
                                target_path='wikitext',
                                overwrite=True,
                                show_progress=True)


Uploading an estimated of 1 files
Uploading data/wikitext-2-raw-v1.zip
Uploaded data/wikitext-2-raw-v1.zip, 1 files out of an estimated total of 1
Uploaded 1 files


In [51]:
# Reuse the GPU cluster when it already exists in the workspace; otherwise provision it.
# (AmlCompute and ComputeTarget are imported in the first cell.)
CLUSTER_VM_SIZE = 'Standard_NC12'
CLUSTER_MAX_NODES = 8  # the estimator's node_count must fit within this

existing_target = ws.compute_targets.get(CLUSTER_NAME)
if existing_target is not None and existing_target.type == 'AmlCompute':
    print('Found existing compute target.')
    compute_target = existing_target
else:
    print('Creating a new compute target...')
    provisioning_config = AmlCompute.provisioning_configuration(vm_size=CLUSTER_VM_SIZE,
                                                                max_nodes=CLUSTER_MAX_NODES)
    compute_target = ComputeTarget.create(ws, CLUSTER_NAME, provisioning_config)

print('Checking cluster status...')
compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)

Found existing compute target.
Checking cluster status...
Succeeded
AmlCompute wait for completion finished

Minimum number of nodes requested have been provisioned


In [52]:
%%writefile $RUN_DIR/train.py

import os
import shutil 
import argparse
import subprocess
from git.repo.base import Repo
from zipfile import ZipFile


WORK_DIR = 'examples'
SRC_DIR = '/transformers'
OUTPUT_DIR = os.path.join(os.getcwd(),'outputs')
DATA_DIR = os.path.join(os.getcwd(),'wikitext-2-raw')

REPO_URL="https://github.com/datashinobi/transformers.git"
BRANCH='yassine/aml_distributed'

LOCAL_RANK = '0'

parser = argparse.ArgumentParser()
parser.add_argument('--dataset-path', dest='ds_path')
parser.add_argument('--rank', type=str,help='rank within nodes')
parser.add_argument('--node_count', type=str,help='number of nodes')
parser.add_argument('--process_per_node', type=str,help='number of process per node')
parser.add_argument('--batch_size', type=str,help='training & eval batch size')

args = parser.parse_args()

#============Clone forked repo==========
if os.path.exists(SRC_DIR):
    print("huggingface repo exists, skip cloning")
else:
    print('clone huggingface repo..........')
    Repo.clone_from(REPO_URL,SRC_DIR, branch=BRANCH)

#===============Unzip dataset=============
data_file = os.path.join(args.ds_path,"wikitext-2-raw-v1.zip")
with ZipFile(data_file,"r") as zip_file:
    zip_file.extractall(os.getcwd())
print(os.listdir(DATA_DIR))

#===========start training=================
master_node_params = os.environ['AZ_BATCH_MASTER_NODE'].split(':')
print("MASTER node", master_node_params)
master_ip = master_node_params[0]
master_port = master_node_params[1]

process = subprocess.Popen(['python', '-m', 'torch.distributed.launch',\
                            '--nnodes',args.node_count,\
                            '--nproc_per_node',args.process_per_node,\
                            '--node_rank', args.rank,\
                            '--master_addr',master_ip,\
                            '--master_port',master_port,\
                            os.path.join(SRC_DIR, WORK_DIR, 'run_language_modeling.py'),\
                            '--output_dir', OUTPUT_DIR,\
                            '--model_type', 'roberta', \
                            '--model_name_or_path', 'roberta-base', \
                            '--do_train', \
                            '--train_data_file', os.path.join(DATA_DIR, 'wiki.train.raw'),\
                            '--do_eval', \
                            '--eval_data_file', os.path.join(DATA_DIR, 'wiki.test.raw'),\
                            '--mlm',\
                            '--local_rank', LOCAL_RANK,\
                            '--per_gpu_train_batch_size', args.batch_size,\
                            '--per_gpu_eval_batch_size', args.batch_size
                             ],
                           stdout=subprocess.PIPE,
                           stderr=subprocess.STDOUT
                        )

lines_iterator = iter(process.stdout.readline, b"")
while process.poll() is None:
    for line in lines_iterator:
        print(line, end = "\r\n",flush =True)   

Overwriting /extdrive1/home/sasuke/dev/amlsamples/language_model_distributed/train.py


In [53]:
node_count = 8        # nodes to run on; keep within the cluster's max_nodes
process_per_node = 1  # workers torch.distributed.launch starts on each node

# Command-line arguments passed to train.py on every node.
script_params = {
    # The uploaded "wikitext" datastore folder, mounted on each node.
    '--dataset-path': ds_reference.as_mount(),
    # Per-node index; presumably expanded by AzureML on each node (as in the AzureML
    # distributed PyTorch samples) — TODO confirm.
    '--rank': '$AZ_BATCHAI_TASK_INDEX',
    '--node_count': node_count,
    '--process_per_node': process_per_node,
    '--batch_size': '4'
}



from azureml.train.estimator import Estimator
est = PyTorch(source_directory=RUN_DIR,
              # NOTE(review): 'transformers' is installed unpinned from PyPI, while train.py runs
              # an example script from a forked checkout; consider pinning a matching version.
              pip_packages=['gitpython', 'scikit-learn', 'seqeval', 'tensorboardX',
                            'tqdm', 'transformers'],
              script_params=script_params,
              use_gpu=True,
              compute_target=compute_target,
              # entry_script is a path relative to source_directory, not a local absolute path.
              entry_script='train.py',
              framework_version='1.4',
              node_count=node_count,
              distributed_training=Nccl()
              )

In [54]:
# Queue the estimator run on the cluster and attach the live run-monitoring widget.
run = exp.submit(config=est)
run_widget = RunDetails(run)
run_widget.show()

_UserRunWidget(widget_settings={'childWidgetDisplay': 'popup', 'send_telemetry': False, 'log_level': 'INFO', '…

In [55]:
# Display the run summary (experiment, run id, status and studio links) as the cell output.
run

Experiment,Id,Type,Status,Details Page,Docs Page
Azureml-LM_huggingface_example,Azureml-LM_huggingface_example_1584361670_a864a6cc,azureml.scriptrun,Starting,Link to Azure Machine Learning studio,Link to Documentation
