In [1]:
import sys
import os
import yaml
from datetime import datetime
# Date stamp (local time, YYYY-MM-DD) captured once when this cell runs.
snapshot_date = datetime.now().strftime("%Y-%m-%d")

# Make the parent directory importable. Guarded so that re-running the cell
# does not keep appending the same entry to sys.path.
parent_dir = os.path.abspath(os.path.join('..'))
if parent_dir not in sys.path:
    sys.path.append(parent_dir)
# sys.path.append(os.path.dirname(os.path.abspath(__file__)))

# The config file only needs plain YAML (mappings, strings, booleans), so use
# safe_load: it refuses to construct arbitrary Python objects from the file.
with open('./llama-fc_config.yaml', encoding='utf-8') as f:
    d = yaml.safe_load(f)

# --- `config` section: workspace coordinates, data locations, run flags ---
AZURE_SUBSCRIPTION_ID = d['config']['AZURE_SUBSCRIPTION_ID']
AZURE_RESOURCE_GROUP = d['config']['AZURE_RESOURCE_GROUP']
AZURE_WORKSPACE = d['config']['AZURE_WORKSPACE']
AZURE_DATA_NAME = d['config']['AZURE_SFT_DATA_NAME']
DATA_DIR = d['config']['SFT_DATA_DIR']
CLOUD_DIR = d['config']['CLOUD_DIR']
HF_MODEL_NAME_OR_PATH = d['config']['HF_MODEL_NAME_OR_PATH']
IS_DEBUG = d['config']['IS_DEBUG']
USE_LOWPRIORITY_VM = d['config']['USE_LOWPRIORITY_VM']

# --- `train` section: environment and compute used by the training job ---
azure_env_name = d['train']['azure_env_name']
azure_compute_cluster_name = d['train']['azure_compute_cluster_name']
azure_compute_cluster_size = d['train']['azure_compute_cluster_size']

# Create the working directories up front. The `train` sub-folder is needed by
# the later %%writefile cells and by the job's `code=` path.
os.makedirs(DATA_DIR, exist_ok=True)
os.makedirs(CLOUD_DIR, exist_ok=True)
os.makedirs(os.path.join(CLOUD_DIR, 'train'), exist_ok=True)

In [2]:
import logging

# Notebook-wide logger; DEBUG is the lowest level captured.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# logging.getLogger returns the same object on every call, so re-running this
# cell would otherwise stack a second pair of handlers and print every message
# twice. Remove (and close) whatever an earlier run attached.
for stale_handler in list(logger.handlers):
    logger.removeHandler(stale_handler)
    stale_handler.close()

# Console handler: echoes records into the cell output.
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG)

# File handler: writes the same records to debug.log in the working directory.
file_handler = logging.FileHandler("debug.log")
file_handler.setLevel(logging.DEBUG)

# One shared format for both destinations.
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
console_handler.setFormatter(formatter)
file_handler.setFormatter(formatter)

# Attach the handlers to the logger.
logger.addHandler(console_handler)
logger.addHandler(file_handler)

In [3]:
# Echo every resolved setting so the run is traceable from the log output.
logger.info("===== 0. Azure ML Training Info =====")
for setting_name, setting_value in (
    ("AZURE_SUBSCRIPTION_ID", AZURE_SUBSCRIPTION_ID),
    ("AZURE_RESOURCE_GROUP", AZURE_RESOURCE_GROUP),
    ("AZURE_WORKSPACE", AZURE_WORKSPACE),
    ("AZURE_DATA_NAME", AZURE_DATA_NAME),
    ("DATA_DIR", DATA_DIR),
    ("CLOUD_DIR", CLOUD_DIR),
    ("HF_MODEL_NAME_OR_PATH", HF_MODEL_NAME_OR_PATH),
    ("IS_DEBUG", IS_DEBUG),
    ("USE_LOWPRIORITY_VM", USE_LOWPRIORITY_VM),
    ("azure_env_name", azure_env_name),
    ("azure_compute_cluster_name", azure_compute_cluster_name),
    ("azure_compute_cluster_size", azure_compute_cluster_size),
):
    logger.info(f"{setting_name}={setting_value}")

2024-12-30 11:12:33,174 - __main__ - INFO - ===== 0. Azure ML Training Info =====
2024-12-30 11:12:33,232 - __main__ - INFO - AZURE_SUBSCRIPTION_ID=8cebb108-a4d5-402b-a0c4-f7556126277f
2024-12-30 11:12:33,238 - __main__ - INFO - AZURE_RESOURCE_GROUP=azure-ml-priya-demo
2024-12-30 11:12:33,244 - __main__ - INFO - AZURE_WORKSPACE=azure-ml-priya-westus3
2024-12-30 11:12:33,249 - __main__ - INFO - AZURE_DATA_NAME=sft-demo-data-function-call
2024-12-30 11:12:33,254 - __main__ - INFO - DATA_DIR=./dataset
2024-12-30 11:12:33,259 - __main__ - INFO - CLOUD_DIR=./cloud
2024-12-30 11:12:33,264 - __main__ - INFO - HF_MODEL_NAME_OR_PATH=unsloth/Llama-3.2-3B-Instruct
2024-12-30 11:12:33,269 - __main__ - INFO - IS_DEBUG=True
2024-12-30 11:12:33,274 - __main__ - INFO - USE_LOWPRIORITY_VM=False
2024-12-30 11:12:33,283 - __main__ - INFO - azure_env_name=slm-llama-acft-custom-env
2024-12-30 11:12:33,288 - __main__ - INFO - azure_compute_cluster_name=gpu-a100-demo-vm
2024-12-30 11:12:33,293 - __main__ - I

### 2. Training Preparation
#### 2.1 Configure Workspace Details
To connect to a workspace, we need identifying parameters - a subscription, a resource group, and a workspace name. We will use these details in the MLClient from azure.ai.ml to get a handle on the Azure Machine Learning workspace we need. We will use the default Azure authentication for this hands-on.

In [4]:
# import required libraries
import time
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
from azure.ai.ml import MLClient, Input
from azure.ai.ml.dsl import pipeline
from azure.ai.ml import load_component
from azure.ai.ml import command
from azure.ai.ml.entities import Data, Environment, BuildContext
from azure.ai.ml.entities import Model
from azure.ai.ml import Input
from azure.ai.ml import Output
from azure.ai.ml.constants import AssetTypes
from azure.core.exceptions import ResourceNotFoundError, ResourceExistsError

# Authenticate with the default Azure credential chain.
credential = DefaultAzureCredential()
ml_client = None
try:
    # First choice: workspace coordinates discovered from a config.json file.
    ml_client = MLClient.from_config(credential=credential)
except Exception as config_error:
    # Fallback: build the client from the values read out of the YAML config.
    print(config_error)
    ml_client = MLClient(
        credential=credential,
        subscription_id=AZURE_SUBSCRIPTION_ID,
        resource_group_name=AZURE_RESOURCE_GROUP,
        workspace_name=AZURE_WORKSPACE,
    )

Found the config file in: /config.json


#### 2.2. Load and prepare the dataset
#####
Training data can be read directly from the local development environment, but it can also be registered as an Azure ML data asset. For this hands-on session, we will register the data as an Azure ML Data asset and use the registered dataset for training and inference.

In [10]:
import re
from datasets import load_dataset, load_from_disk

def get_or_create_data_asset(ml_client, data_name, data_local_dir, update=False):
    """Return the latest registered Data asset, or register one from a local folder.

    NOTE: this notebook defines a function with the same name again in a later
    cell; whichever definition was executed last is the one in effect.

    Args:
        ml_client: Client exposing ``data.list``, ``data.get`` and
            ``data.create_or_update`` (an ``MLClient`` in this notebook).
        data_name: Name of the Data asset.
        data_local_dir: Local folder used as the path of a newly registered asset.
        update: When True, register a new version even if one already exists.

    Returns:
        Whatever ``ml_client.data.get`` or ``ml_client.data.create_or_update`` returns.
    """
    try:
        # Versions are compared numerically so that "10" ranks above "9".
        versions = [int(asset.version) for asset in ml_client.data.list(name=data_name)]
        # An empty listing means "not registered yet". Guarding on `versions`
        # avoids calling max() on an empty list, which raises ValueError.
        if versions and not update:
            data_asset = ml_client.data.get(name=data_name, version=str(max(versions)))
            print(f"Found Data asset: {data_name}. Will not create again")
            return data_asset
    except ResourceNotFoundError:
        # The service reported that the asset does not exist; register it below.
        pass

    data = Data(
        path=data_local_dir,
        type=AssetTypes.URI_FOLDER,
        description=f"{data_name} for fine tuning",
        tags={"FineTuningType": "Instruction", "Language": "En"},
        name=data_name
    )
    data_asset = ml_client.data.create_or_update(data)
    print(f"Created Data asset: {data_name}")

    return data_asset

In [6]:
from datasets import load_dataset

# Pull the `train` split of the function-calling dataset from the Hugging Face Hub.
dataset = load_dataset("glaiveai/glaive-function-calling-v2", split="train")
num_samples = len(dataset)

# Carve three disjoint, contiguous index ranges out of the downloaded split:
# rows 0-1999 for training, 2000-2199 for testing, 2200-2699 for validation.
train_dataset = dataset.select(range(0, 2000))
test_dataset = dataset.select(range(2000, 2200))
val_dataset = dataset.select(range(2200, 2700))

# Persist each subset under DATA_DIR in its own folder (train, val, test order).
for split_name, split_dataset in (
    ("train", train_dataset),
    ("val", val_dataset),
    ("test", test_dataset),
):
    split_dataset.save_to_disk(f"{DATA_DIR}/{split_name}")

  from .autonotebook import tqdm as notebook_tqdm
Saving the dataset (1/1 shards): 100%|██████████| 2000/2000 [00:00<00:00, 21931.86 examples/s]
Saving the dataset (1/1 shards): 100%|██████████| 500/500 [00:00<00:00, 7985.59 examples/s]
Saving the dataset (1/1 shards): 100%|██████████| 200/200 [00:00<00:00, 5330.30 examples/s]


In [9]:
def get_or_create_data_asset(ml_client, data_name, data_local_dir, update=False):
    """Return the latest registered Data asset, or register one from a local folder.

    Args:
        ml_client: Client exposing ``data.list``, ``data.get`` and
            ``data.create_or_update`` (an ``MLClient`` in this notebook).
        data_name: Name of the Data asset.
        data_local_dir: Local folder used as the path of a newly registered asset.
        update: When True, register a new version even if one already exists.

    Returns:
        Whatever ``ml_client.data.get`` or ``ml_client.data.create_or_update`` returns.
    """
    try:
        # Versions are compared numerically so that "10" ranks above "9".
        versions = [int(asset.version) for asset in ml_client.data.list(name=data_name)]
        # An empty listing means "not registered yet". Guarding on `versions`
        # avoids calling max() on an empty list, which raises ValueError.
        if versions and not update:
            data_asset = ml_client.data.get(name=data_name, version=str(max(versions)))
            logger.info(f"Found Data asset: {data_name}. Will not create again")
            return data_asset
    except ResourceNotFoundError:
        # The service reported that the asset does not exist; register it below.
        pass

    data = Data(
        path=data_local_dir,
        type=AssetTypes.URI_FOLDER,
        description=f"{data_name} for fine tuning",
        tags={"FineTuningType": "Instruction", "Language": "En"},
        name=data_name
    )
    data_asset = ml_client.data.create_or_update(data)
    logger.info(f"Created/Updated Data asset: {data_name}")

    return data_asset

In [10]:
# Register a fresh version of each split (update=True), in train -> val -> test
# order; the generator is consumed left to right by the tuple unpacking.
train_data, val_data, test_data = (
    get_or_create_data_asset(
        ml_client,
        f"{AZURE_DATA_NAME}_{split_name}",
        data_local_dir=f"{DATA_DIR}/{split_name}",
        update=True,
    )
    for split_name in ("train", "val", "test")
)

[32mUploading train (4.04 MBs): 100%|██████████| 4039057/4039057 [00:00<00:00, 29147467.79it/s]
[39m

2024-12-30 11:15:52,663 - __main__ - INFO - Created/Updated Data asset: sft-demo-data-function-call_train
[32mUploading val (1.0 MBs): 100%|██████████| 1000766/1000766 [00:00<00:00, 14375008.43it/s]
[39m

2024-12-30 11:15:55,016 - __main__ - INFO - Created/Updated Data asset: sft-demo-data-function-call_val
[32mUploading test (0.44 MBs): 100%|██████████| 444942/444942 [00:00<00:00, 9224383.93it/s]
[39m

2024-12-30 11:15:56,641 - __main__ - INFO - Created/Updated Data asset: sft-demo-data-function-call_test


#### 2.3 Create AzureML environment
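Azure ML defines containers (called environment assets) in which your code will run. We can use a built-in environment or build a custom one (from a Docker build context or a conda specification). This hands-on writes both a conda YAML file and a Docker build context; the environment created below is built from the Docker context.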

##### Conda Environment

In [15]:
%%writefile {CLOUD_DIR}/train/conda.yml
# Conda environment specification written by this notebook cell.
name: model-env
channels:
  - conda-forge
dependencies:
  # Interpreter and pip come from conda-forge.
  - python=3.10
  - pip=24.0
  # Everything below is installed with pip inside the conda environment.
  # `==` pins an exact version; `~=` accepts compatible releases.
  - pip:
    - bitsandbytes==0.43.3
    - transformers==4.44.2
    - peft~=0.12
    - accelerate~=0.33
    - trl==0.10.1
    - einops==0.8.0
    - datasets==2.21.0
    - wandb==0.17.8
    - mlflow==2.16.0
    - azureml-mlflow==1.57.0
    - azureml-sdk==1.57.0
    - torchvision==0.19.0
    - torch==2.4.0

Writing ./cloud/train/conda.yml


##### Docker Environment

In [16]:
%%writefile {CLOUD_DIR}/train/Dockerfile

# Base image pulled from the Microsoft container registry.
FROM mcr.microsoft.com/aifx/acpt/stable-ubuntu2004-cu124-py310-torch241:biweekly.202410.2

USER root

# support Deepspeed launcher requirement of passwordless ssh login
# The index refresh, upgrade and install run in ONE layer: a cached
# "apt-get update" layer followed by a separate "apt-get install" layer can
# install from a stale package index. The apt lists are removed afterwards to
# keep the layer small.
RUN apt-get update \
    && apt-get -y upgrade \
    && apt-get install -y openssh-server openssh-client \
    && rm -rf /var/lib/apt/lists/*

RUN pip install --upgrade pip

# Install pip dependencies
# NOTE(review): requirements.txt must be present in the Docker build context
# directory; this notebook does not appear to write it - confirm it exists.
COPY requirements.txt .
RUN pip install -r requirements.txt --no-cache-dir

# flash-attn is installed separately; MAX_JOBS is set to 4 for this install.
RUN MAX_JOBS=4 pip install flash-attn==2.6.3 --no-build-isolation

Writing ./cloud/train/Dockerfile


In [12]:
from azure.ai.ml.entities import Environment, BuildContext

def get_or_create_environment_asset(ml_client, env_name, conda_yml="cloud/conda.yml", update=False):
    """Return the latest registered Environment, or create one from a conda file.

    The new environment layers ``conda_yml`` on top of the
    ``mcr.microsoft.com/azureml/curated/acft-hf-nlp-gpu:latest`` image.

    Args:
        ml_client: Client exposing ``environments.list``, ``environments.get``
            and ``environments.create_or_update`` (an ``MLClient`` in this notebook).
        env_name: Name of the Environment asset.
        conda_yml: Path to the conda specification file.
        update: When True, register a new version even if one already exists.

    Returns:
        Whatever ``environments.get`` or ``environments.create_or_update`` returns.
    """
    try:
        # Versions are compared numerically so that "10" ranks above "9".
        versions = [int(env.version) for env in ml_client.environments.list(name=env_name)]
        # An empty listing means "not registered yet". Guarding on `versions`
        # avoids calling max() on an empty list, which raises ValueError.
        if versions and not update:
            env_asset = ml_client.environments.get(name=env_name, version=str(max(versions)))
            print(f"Found Environment asset: {env_name}. Will not create again")
            return env_asset
        if versions:
            print("Found Environment asset, but will update the Environment.")
    except ResourceNotFoundError as e:
        # The service reported that the environment does not exist; create it below.
        print(f"Exception: {e}")

    env_docker_image = Environment(
        image="mcr.microsoft.com/azureml/curated/acft-hf-nlp-gpu:latest",
        conda_file=conda_yml,
        name=env_name,
        description="Environment created for llm fine-tuning.",
    )
    env_asset = ml_client.environments.create_or_update(env_docker_image)
    print(f"Created Environment asset: {env_name}")

    return env_asset


def get_or_create_docker_environment_asset(ml_client, env_name, docker_dir, update=False):
    """Return the latest registered Environment, or build one from a Docker context.

    Args:
        ml_client: Client exposing ``environments.list``, ``environments.get``
            and ``environments.create_or_update`` (an ``MLClient`` in this notebook).
        env_name: Name of the Environment asset.
        docker_dir: Directory passed to ``BuildContext`` as the build context path.
        update: When True, register a new version even if one already exists.

    Returns:
        Whatever ``environments.get`` or ``environments.create_or_update`` returns.
    """
    try:
        # Versions are compared numerically so that "10" ranks above "9".
        versions = [int(env.version) for env in ml_client.environments.list(name=env_name)]
        # An empty listing means "not registered yet". Guarding on `versions`
        # avoids calling max() on an empty list, which raises ValueError.
        if versions and not update:
            env_asset = ml_client.environments.get(name=env_name, version=str(max(versions)))
            print(f"Found Environment asset: {env_name}. Will not create again")
            return env_asset
        if versions:
            print("Found Environment asset, but will update the Environment.")
    except ResourceNotFoundError as e:
        # The service reported that the environment does not exist; create it below.
        print(f"Exception: {e}")

    env_docker_image = Environment(
        build=BuildContext(path=docker_dir),
        name=env_name,
        description="Environment created from a Docker context.",
    )
    env_asset = ml_client.environments.create_or_update(env_docker_image)
    print(f"Created Environment asset: {env_name}")

    return env_asset

In [13]:
# With update=False the latest registered version of the environment is reused
# when one is found; the Docker build context is the `train` folder under CLOUD_DIR.
env = get_or_create_docker_environment_asset(ml_client, azure_env_name, docker_dir=f"{CLOUD_DIR}/train", update=False)

Found Environment asset: slm-llama-acft-custom-env. Will not create again


### 3. Training
#### 3.1 Create the compute cluster

In [14]:
from azure.ai.ml.entities import AmlCompute

### Create the compute cluster
try:
    compute = ml_client.compute.get(azure_compute_cluster_name)
    print("The compute cluster already exists! Reusing it for the current run")
except ResourceNotFoundError:
    # Only a "not found" answer means the cluster has to be created; any other
    # failure (authentication, network, ...) is left to surface as-is instead of
    # being mistaken for a missing cluster.
    print(
        f"Looks like the compute cluster doesn't exist. Creating a new one with compute size {azure_compute_cluster_size}!"
    )
    tier = 'LowPriority' if USE_LOWPRIORITY_VM else 'Dedicated'
    print(f"Attempt #1 - Trying to create a {tier} compute")
    compute = AmlCompute(
        name=azure_compute_cluster_name,
        size=azure_compute_cluster_size,
        tier=tier,
        max_instances=1,  # For multi node training set this to an integer value more than 1
    )
    try:
        # result() blocks until provisioning finishes and returns the created
        # cluster, so `compute` refers to the service-side resource afterwards.
        compute = ml_client.compute.begin_create_or_update(compute).result()
    except Exception as e:
        # Report the actual cause and stop: the training job submitted later
        # cannot run without this cluster.
        print(f"Error: could not create compute cluster '{azure_compute_cluster_name}': {e}")
        raise
The compute cluster already exists! Reusing it for the current run


#### 3.2 Start the training job

The `command` allows user to configure the following key aspects.  
  
- **inputs** - This is the dictionary of inputs using name value pairs to the command.  
  - **type** - The type of input. This can be a `uri_file` or `uri_folder`. The default is `uri_folder`.  
  - **path** - The path to the file or folder. These can be local or remote files or folders. For remote files - http/https, wasb are supported.  
    - Azure ML `data`/`dataset` or `datastore` are of type `uri_folder`. To use `data`/`dataset` as input, you can use registered dataset in the workspace using the format `'<data_name>:<version>'`. For example, `Input(type='uri_folder', path='my_dataset:1')`  
  - **mode** - Mode of how the data should be delivered to the compute target. Allowed values are `ro_mount`, `rw_mount`, and `download`. Default is `ro_mount`.  
  
- **code** - This is the path where the code to run the command is located.  
  
- **compute** - The compute on which the command will run. You can run it on the local machine by using `local` for the compute.  
  
- **command** - This is the command that needs to be run. Inputs are referenced in it with the `${{inputs.<input_name>}}` expression. To pass files or folders as inputs, use the `Input` class, which supports the three parameters described under **inputs** above (`type`, `path`, and `mode`).  
  
- **environment** - This is the environment needed for the command to run. Curated (built-in) or custom environments from the workspace can be used.  
  
- **instance_count** - Number of nodes. Default is 1.  
  
- **distribution** - Distribution configuration for distributed training scenarios. Azure Machine Learning supports PyTorch, TensorFlow, and MPI-based distributed training.  


In [15]:
from azure.ai.ml import command
from azure.ai.ml import Input
from azure.ai.ml.entities import ResourceConfiguration

# Hyperparameters come from the `train` section of the YAML config.
train_settings = d['train']

# Named inputs made available to the command through ${{inputs.<name>}}.
# NOTE(review): `epoch` is declared here, but the command line below never
# references ${{inputs.epoch}} - confirm whether train_v3.py should receive it.
job_inputs = {
    # Alternative: read the data from a local path instead of a Data asset
    # "train_dir": Input(type="uri_folder", path=DATA_DIR),
    "train_dir": Input(path=f"{AZURE_DATA_NAME}_train@latest"),  # latest version of the Data asset
    "val_dir": Input(path=f"{AZURE_DATA_NAME}_val@latest"),
    "epoch": train_settings['epoch'],
    "train_batch_size": train_settings['train_batch_size'],
    "eval_batch_size": train_settings['eval_batch_size'],
}

# Adjacent string literals are concatenated into one command line.
train_command = (
    "python train_v3.py"
    " --train_dir ${{inputs.train_dir}}"
    " --val_dir ${{inputs.val_dir}}"
    " --train_batch_size ${{inputs.train_batch_size}}"
    " --eval_batch_size ${{inputs.eval_batch_size}}"
)

job = command(
    inputs=job_inputs,
    code=f"{CLOUD_DIR}/train",  # local folder holding the training script
    compute=azure_compute_cluster_name,
    command=train_command,
    # Alternative: use a built-in Environment asset
    # environment="azureml://registries/azureml/environments/acft-hf-nlp-gpu/versions/77",
    environment=f"{azure_env_name}@latest",
    distribution={
        "type": "PyTorch",
        "process_count_per_instance": 1,  # For multi-gpu training set this to an integer value more than 1
    },
)

# Submit the job, then stream its logs until it finishes.
returned_job = ml_client.jobs.create_or_update(job)
ml_client.jobs.stream(returned_job.name)

Class AutoDeleteSettingSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class AutoDeleteConditionSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class BaseAutoDeleteSettingSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class IntellectualPropertySchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class ProtectionLevelSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class BaseIntellectualPropertySchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
[32mUploading train (0.03 MBs): 100%|

RunId: sleepy_coat_8sbpg21mp3
Web View: https://ml.azure.com/runs/sleepy_coat_8sbpg21mp3?wsid=/subscriptions/8cebb108-a4d5-402b-a0c4-f7556126277f/resourcegroups/azure-ml-priya-demo/workspaces/azure-ml-priya-westus3

Execution Summary
RunId: sleepy_coat_8sbpg21mp3
Web View: https://ml.azure.com/runs/sleepy_coat_8sbpg21mp3?wsid=/subscriptions/8cebb108-a4d5-402b-a0c4-f7556126277f/resourcegroups/azure-ml-priya-demo/workspaces/azure-ml-priya-westus3

AzureMLCompute job failed
ExecutionFailed: [REDACTED]
	exit_codes: 1
	Appinsights Reachable: Some(true)



JobException: Exception : 
 {
    "error": {
        "code": "UserError",
        "message": "Execution failed. User process 'Rank 0' exited with status code 1. Please check log file 'user_logs/std_log_process_0.txt' for error details. Error: [rank0]:   File \"/mnt/azureml/cr/j/431a216f07394b8c84cce56199b6f827/exe/wd/train_v3.py\", line 294, in <module>\n[rank0]:     main(args)\n[rank0]:   File \"/mnt/azureml/cr/j/431a216f07394b8c84cce56199b6f827/exe/wd/train_v3.py\", line 196, in main\n[rank0]:     train_dataset, eval_dataset = prepare_dataset(tokenizer, args)\n[rank0]:   File \"/mnt/azureml/cr/j/431a216f07394b8c84cce56199b6f827/exe/wd/train_v3.py\", line 106, in prepare_dataset\n[rank0]:     processed_train_dataset = train_dataset.map(apply_chat_template, cache_file_name = f\"{cache_dir}/cache.arrow\", batched = True, remove_columns=column_names)\n[rank0]:   File \"/opt/conda/envs/ptca/lib/python3.10/site-packages/datasets/arrow_dataset.py\", line 560, in wrapper\n[rank0]:     out: Union[\"Dataset\", \"DatasetDict\"] = func(self, *args, **kwargs)\n[rank0]:   File \"/opt/conda/envs/ptca/lib/python3.10/site-packages/datasets/arrow_dataset.py\", line 3055, in map\n[rank0]:     for rank, done, content in Dataset._map_single(**dataset_kwargs):\n[rank0]:   File \"/opt/conda/envs/ptca/lib/python3.10/site-packages/datasets/arrow_dataset.py\", line 3458, in _map_single\n[rank0]:     batch = apply_function_on_filtered_inputs(\n[rank0]:   File \"/opt/conda/envs/ptca/lib/python3.10/site-packages/datasets/arrow_dataset.py\", line 3320, in apply_function_on_filtered_inputs\n[rank0]:     processed_inputs = function(*fn_args, *additional_args, **fn_kwargs)\n[rank0]:   File \"/mnt/azureml/cr/j/431a216f07394b8c84cce56199b6f827/exe/wd/train_v3.py\", line 93, in apply_chat_template\n[rank0]:     for system, chat in zip(examples[\"system\"], examples[\"chat\"]):\n[rank0]:   File \"/opt/conda/envs/ptca/lib/python3.10/site-packages/datasets/formatting/formatting.py\", line 277, in __getitem__\n[rank0]:     value = self.data[key]\n[rank0]: KeyError: 
'system'\n[rank0]:[W1230 11:30:57.243089535 ProcessGroupNCCL.cpp:1250] Warning: WARNING: process group has NOT been destroyed before we destruct ProcessGroupNCCL. On normal program exit, the application should call destroy_process_group to ensure that any pending NCCL operations have finished in this process. In rare cases this process can exit before this point and block the progress of another member of the process group. This constraint has always been present,  but this warning has only been added since PyTorch 2.4 (function operator())\n",
        "message_parameters": {},
        "details": []
    },
    "time": "0001-01-01T00:00:00.000Z",
    "component_name": "CommonRuntime"
} 

### 4. Register the model for future deployment and inference

In [14]:
from azureml.core import Workspace, Run 
import os  
  
# Connect to your workspace  
# NOTE(review): this cell uses the azureml.core (SDK v1) Workspace/Run classes,
# while the rest of the notebook talks to the service through `ml_client`.
ws = Workspace.from_config()  
  
# NOTE(review): the experiment name and run id are hard-coded; the run id is
# not the one printed by the job submitted above (sleepy_coat_8sbpg21mp3) -
# confirm it points at the run whose outputs should be registered.
experiment_name =  'fine-tune-llama-32b-fc'
run_id = 'maroon_frame_xmhq41lcjl'

# Look the run up inside the named experiment.
run = Run(ws.experiments[experiment_name], run_id)  

# Register the model  
# The registered name is read from the `serve` section of the YAML config.
model = run.register_model(  
    model_name=d["serve"]["azure_model_name"],  # this is the name the model will be registered under  
    model_path="outputs"  # this is the path to the model file in the run's outputs  
)  
# Create a local directory to save the outputs  
local_folder = './model'  
os.makedirs(local_folder, exist_ok=True)  
  
# Download the entire outputs folder  
run.download_files(prefix='outputs', output_directory=local_folder)  