# Fine-tuning Open Source LLM using the Azure ML Python SDK (MLflow) 

### Overview
Azure ML Workspace is compatible with MLflow and can be used as an MLflow Tracking Server, as described in the following official guide from Microsoft. MLflow provides features such as experiment tracking, model management, and model deployment, allowing you to manage data science and machine learning workflows more efficiently and systematically. Below are the main advantages of using Azure ML and MLflow together.

#### 1. Experiment tracking and management
You can systematically manage the parameters, metrics, and artifacts of all your experiments. Integrating with Azur eML allows you to easily track and manage this information within your Azure ML workspace.

#### 2. Model management
MLflow provides a model registry for model versioning. Integrate with AzureML to systematically manage and deploy all versions of your models. When combined with AzureML's deployment capabilities, models can be easily deployed to a variety of environments (e.g. Azure Kubernetes Service, Azure Container Instances).

#### 3. Reproducibility and collaboration
MLflow records the parameters and environment of every experiment, so you can accurately reproduce the experiment. This is very useful when you need to redo the same experiment across collaborating team members, or when you need to rerun an experiment at a later date.

#### 4. CI/CD integration
MLflow makes it easy to implement continuous integration (CI) and continuous deployment (CD) of machine learning models. Integrate with Azure DevOps or GitHub Actions to automatically run training, validation, and deployment processes as model changes occur.

When training a model with Hugging Face's Trainer API, if you specify `report_to="azure_ml"`, basic indicators will be automatically logged without any additional code. Of course, you can freely log custom indicators using Bring Your Own Script like the conventional method, but Azure ML's basic logging function is also excellent, so try using it as a baseline.

[Note] Please use `Python 3.10 - SDK v2 (azureml_py310_sdkv2)` conda environment.

## Load config file
---

In [None]:
import os
import yaml
from datetime import datetime
snapshot_date = datetime.now().strftime("%Y-%m-%d")

with open('config.yml') as f:
    d = yaml.load(f, Loader=yaml.FullLoader)
    
AZURE_SUBSCRIPTION_ID = d['config']['AZURE_SUBSCRIPTION_ID']
AZURE_RESOURCE_GROUP = d['config']['AZURE_RESOURCE_GROUP']
AZURE_WORKSPACE = d['config']['AZURE_WORKSPACE']
AZURE_DATA_NAME = d['config']['AZURE_DATA_NAME']    
DATA_DIR = d['config']['DATA_DIR']
CLOUD_DIR = d['config']['CLOUD_DIR']
HF_MODEL_NAME_OR_PATH = d['config']['HF_MODEL_NAME_OR_PATH']
IS_DEBUG = d['config']['IS_DEBUG']
USE_LOWPRIORITY_VM = d['config']['USE_LOWPRIORITY_VM']

azure_env_name = d['train']['azure_env_name']  
azure_compute_cluster_name = d['train']['azure_compute_cluster_name']
azure_compute_cluster_size = d['train']['azure_compute_cluster_size']

!rm -rf $DATA_DIR 
os.makedirs(DATA_DIR, exist_ok=True)
os.makedirs(CLOUD_DIR, exist_ok=True)

<br>

## 1. Dataset preparation
---

In [None]:
from datasets import load_dataset
from random import randrange

# Load dataset from the hub
dataset = load_dataset("HuggingFaceH4/ultrachat_200k", split="train_sft[:2%]")

print(f"dataset size: {len(dataset)}")
if IS_DEBUG:
    dataset = dataset.select(range(1000))

dataset = dataset.train_test_split(test_size=0.2)
train_dataset = dataset['train']
train_dataset.to_json(f"{DATA_DIR}/train.jsonl")
test_dataset = dataset['test']
test_dataset.to_json(f"{DATA_DIR}/eval.jsonl")

<br>

## 2. Training preparation
---

### 2.1. Configure workspace details

To connect to a workspace, we need identifying parameters - a subscription, a resource group, and a workspace name. We will use these details in the MLClient from azure.ai.ml to get a handle on the Azure Machine Learning workspace we need. We will use the default Azure authentication for this hands-on.

In [None]:
# import required libraries
import time
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
from azure.ai.ml import MLClient, Input
from azure.ai.ml.dsl import pipeline
from azure.ai.ml import load_component
from azure.ai.ml import command
from azure.ai.ml.entities import Data, Environment, BuildContext
from azure.ai.ml.entities import Model
from azure.ai.ml import Input
from azure.ai.ml import Output
from azure.ai.ml.constants import AssetTypes
from azure.core.exceptions import ResourceNotFoundError, ResourceExistsError

credential = DefaultAzureCredential()
ml_client = None
try:
    ml_client = MLClient.from_config(credential)
except Exception as ex:
    print(ex)
    ml_client = MLClient(credential, AZURE_SUBSCRIPTION_ID, AZURE_RESOURCE_GROUP, AZURE_WORKSPACE)

### 2.2. Create AzureML environment and data

Azure ML defines containers (called environment asset) in which your code will run. We can use the built-in environment or build a custom environment (Docker container, conda). 
This hands-on uses conda yaml. 

Training data can be used as a dataset stored in the local development environment, but can also be registered as AzureML data.

In [None]:
%%writefile {CLOUD_DIR}/conda.yml
name: model-env
channels:
  - conda-forge
dependencies:
  - python=3.10
  - pip=24.0
  - pip:
    - bitsandbytes==0.43.1
    - transformers~=4.41
    - peft~=0.11
    - accelerate~=0.30
    - trl==0.9.4
    - einops==0.8.0
    - datasets==2.20.0
    - wandb==0.17.2
    - mlflow==2.14.1
    - azureml-mlflow==1.56.0
    - azureml-sdk==1.56.0
    - torchvision==0.18.1

In [None]:
def get_or_create_environment_asset(ml_client, env_name, conda_yml="cloud/conda.yml", update=False):
    
    try:
        latest_env_version = max([int(e.version) for e in ml_client.environments.list(name=env_name)])
        if update:
            raise ResourceExistsError('Found Environment asset, but will update the Environment.')
        else:
            env_asset = ml_client.environments.get(name=env_name, version=latest_env_version)
            print(f"Found Environment asset: {env_name}. Will not create again")
    except (ResourceNotFoundError, ResourceExistsError) as e:
        print(f"Exception: {e}")        
        env_docker_image = Environment(
            image="mcr.microsoft.com/azureml/curated/acft-hf-nlp-gpu:latest",
            conda_file=conda_yml,
            name=env_name,
            description="Environment created for llm fine-tuning.",
        )
        env_asset = ml_client.environments.create_or_update(env_docker_image)
        print(f"Created Environment asset: {env_name}")
        
    return env_asset


def get_or_create_data_asset(ml_client, data_name, data_local_dir, update=False):
    
    try:
        latest_data_version = max([int(d.version) for d in ml_client.data.list(name=data_name)])
        if update:
            raise ResourceExistsError('Found Data asset, but will update the Data.')            
        else:
            data_asset = ml_client.data.get(name=data_name, version=latest_data_version)
            print(f"Found Data asset: {data_name}. Will not create again")
    except (ResourceNotFoundError, ResourceExistsError) as e:
        data = Data(
            path=data_local_dir,
            type=AssetTypes.URI_FOLDER,
            description=f"{data_name} for fine tuning",
            tags={"FineTuningType": "Instruction", "Language": "En"},
            name=data_name
        )
        data_asset = ml_client.data.create_or_update(data)
        print(f"Created Data asset: {data_name}")
        
    return data_asset

In [None]:
env = get_or_create_environment_asset(ml_client, azure_env_name, conda_yml=f"{CLOUD_DIR}/conda.yml", update=False)
data = get_or_create_data_asset(ml_client, AZURE_DATA_NAME, data_local_dir=DATA_DIR, update=False)

### 2.3. Training script

In [None]:
%%writefile src_train/train_mlflow.py
from mlflow.models.signature import ModelSignature
from mlflow.types import DataType, Schema, ColSpec

import os
import mlflow
from mlflow.models import infer_signature
import argparse
import sys
import logging

import datasets
from datasets import load_dataset
from peft import LoraConfig
import torch
import transformers
from trl import SFTTrainer
from transformers import AutoModelForCausalLM, AutoTokenizer, TrainingArguments, BitsAndBytesConfig
from datasets import load_dataset
from datetime import datetime

logger = logging.getLogger(__name__)

def load_model(args, model_name_or_path="microsoft/Phi-3-mini-4k-instruct"):

    model_kwargs = dict(
        use_cache=False,
        trust_remote_code=True,
        #attn_implementation="flash_attention_2",  # loading the model with flash-attenstion support
        torch_dtype=torch.bfloat16,
        device_map=None
    )
    model = AutoModelForCausalLM.from_pretrained(model_name_or_path, **model_kwargs)
    tokenizer = AutoTokenizer.from_pretrained(model_name_or_path)
    tokenizer.model_max_length = args.max_seq_length
    tokenizer.pad_token = tokenizer.unk_token  # use unk rather than eos token to prevent endless generation
    tokenizer.pad_token_id = tokenizer.convert_tokens_to_ids(tokenizer.pad_token)
    tokenizer.padding_side = "right"
    return model, tokenizer

def apply_chat_template(
    example,
    tokenizer,
):
    messages = example["messages"]
    # Add an empty system message if there is none
    if messages[0]["role"] != "system":
        messages.insert(0, {"role": "system", "content": ""})
    example["text"] = tokenizer.apply_chat_template(
        messages, tokenize=False, add_generation_prompt=False)
    return example

def main(args):
    
    ###################
    # Hyper-parameters
    ###################
    # Only overwrite environ if wandb param passed
    if len(args.wandb_project) > 0:
        os.environ['WANDB_API_KEY'] = args.wandb_api_key    
        os.environ["WANDB_PROJECT"] = args.wandb_project
    if len(args.wandb_watch) > 0:
        os.environ["WANDB_WATCH"] = args.wandb_watch
    if len(args.wandb_log_model) > 0:
        os.environ["WANDB_LOG_MODEL"] = args.wandb_log_model

    use_wandb = len(args.wandb_project) > 0 or ("WANDB_PROJECT" in os.environ and len(os.environ["WANDB_PROJECT"]) > 0) 

    training_config = {
        "bf16": True,
        "do_eval": False,
        "learning_rate": args.learning_rate,
        "log_level": "info",
        "logging_steps": args.logging_steps,
        "logging_strategy": "steps",
        "lr_scheduler_type": args.lr_scheduler_type,
        "num_train_epochs": args.epochs,
        "max_steps": -1,
        "output_dir": args.output_dir,
        "overwrite_output_dir": True,
        "per_device_train_batch_size": args.train_batch_size,
        "per_device_eval_batch_size": args.eval_batch_size,
        "remove_unused_columns": True,
        "save_steps": args.save_steps,
        "save_total_limit": 1,
        "seed": args.seed,
        "gradient_checkpointing": True,
        "gradient_checkpointing_kwargs": {"use_reentrant": False},
        "gradient_accumulation_steps": args.grad_accum_steps,
        "warmup_ratio": args.warmup_ratio,
    }

    peft_config = {
        "r": args.lora_r,
        "lora_alpha": args.lora_alpha,
        "lora_dropout": args.lora_dropout,
        "bias": "none",
        "task_type": "CAUSAL_LM",
        #"target_modules": "all-linear",
        "target_modules": ["q_proj", "k_proj", "v_proj", "o_proj", "gate_proj", "up_proj", "down_proj"],
        "modules_to_save": None,
    }

    checkpoint_dir = os.path.join(args.output_dir, "checkpoints")

    train_conf = TrainingArguments(
        **training_config,
        report_to="wandb" if use_wandb else "azure_ml",
        run_name=args.wandb_run_name if use_wandb else None,    
    )
    peft_conf = LoraConfig(**peft_config)
    model, tokenizer = load_model(args)

    ###############
    # Setup logging
    ###############
    logging.basicConfig(
        format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
        handlers=[logging.StreamHandler(sys.stdout)],
    )
    log_level = train_conf.get_process_log_level()
    logger.setLevel(log_level)
    datasets.utils.logging.set_verbosity(log_level)
    transformers.utils.logging.set_verbosity(log_level)
    transformers.utils.logging.enable_default_handler()
    transformers.utils.logging.enable_explicit_format()

    # Log on each process a small summary
    logger.warning(
        f"Process rank: {train_conf.local_rank}, device: {train_conf.device}, n_gpu: {train_conf.n_gpu}"
        + f" distributed training: {bool(train_conf.local_rank != -1)}, 16-bits training: {train_conf.fp16}"
    )
    logger.info(f"Training/evaluation parameters {train_conf}")
    logger.info(f"PEFT parameters {peft_conf}")    

    ##################
    # Data Processing
    ##################
    train_dataset = load_dataset('json', data_files=os.path.join(args.train_dir, 'train.jsonl'), split='train')
    eval_dataset = load_dataset('json', data_files=os.path.join(args.train_dir, 'eval.jsonl'), split='train')
    column_names = list(train_dataset.features)

    processed_train_dataset = train_dataset.map(
        apply_chat_template,
        fn_kwargs={"tokenizer": tokenizer},
        num_proc=10,
        remove_columns=column_names,
        desc="Applying chat template to train_sft",
    )

    processed_eval_dataset = eval_dataset.map(
        apply_chat_template,
        fn_kwargs={"tokenizer": tokenizer},
        num_proc=10,
        remove_columns=column_names,
        desc="Applying chat template to test_sft",
    )

    with mlflow.start_run() as run:        
        ###########
        # Training
        ###########
        trainer = SFTTrainer(
            model=model,
            args=train_conf,
            peft_config=peft_conf,
            train_dataset=processed_train_dataset,
            eval_dataset=processed_eval_dataset,
            max_seq_length=args.max_seq_length,
            dataset_text_field="text",
            tokenizer=tokenizer,
            packing=True,
        )

        # Show current memory stats
        gpu_stats = torch.cuda.get_device_properties(0)
        start_gpu_memory = round(torch.cuda.max_memory_reserved() / 1024 / 1024 / 1024, 3)
        max_memory = round(gpu_stats.total_memory / 1024 / 1024 / 1024, 3)
        logger.info(f"GPU = {gpu_stats.name}. Max memory = {max_memory} GB.")
        logger.info(f"{start_gpu_memory} GB of memory reserved.")
        
        last_checkpoint = None
        if os.path.isdir(checkpoint_dir):
            checkpoints = [os.path.join(checkpoint_dir, d) for d in os.listdir(checkpoint_dir)]
            if len(checkpoints) > 0:
                checkpoints.sort(key=os.path.getmtime, reverse=True)
                last_checkpoint = checkpoints[0]        

        trainer_stats = trainer.train(resume_from_checkpoint=last_checkpoint)

        #############
        # Logging
        #############
        metrics = trainer_stats.metrics

        # Show final memory and time stats 
        used_memory = round(torch.cuda.max_memory_reserved() / 1024 / 1024 / 1024, 3)
        used_memory_for_lora = round(used_memory - start_gpu_memory, 3)
        used_percentage = round(used_memory         /max_memory*100, 3)
        lora_percentage = round(used_memory_for_lora/max_memory*100, 3)

        logger.info(f"{metrics['train_runtime']} seconds used for training.")
        logger.info(f"{round(metrics['train_runtime']/60, 2)} minutes used for training.")
        logger.info(f"Peak reserved memory = {used_memory} GB.")
        logger.info(f"Peak reserved memory for training = {used_memory_for_lora} GB.")
        logger.info(f"Peak reserved memory % of max memory = {used_percentage} %.")
        logger.info(f"Peak reserved memory for training % of max memory = {lora_percentage} %.")
                
        trainer.log_metrics("train", metrics)

        model_info = mlflow.transformers.log_model(
            transformers_model={"model": trainer.model, "tokenizer": tokenizer},
            #prompt_template=prompt_template,
            #signature=signature,
            artifact_path=args.model_dir,  # This is a relative path to save model files within MLflow run
        )


def parse_args():
    # setup argparse
    parser = argparse.ArgumentParser()
    # curr_time = datetime.now().strftime("%Y-%m-%d_%H:%M:%S")

    # hyperparameters
    parser.add_argument("--train_dir", default="data", type=str, help="Input directory for training")
    parser.add_argument("--model_dir", default="./model", type=str, help="output directory for model")
    parser.add_argument("--epochs", default=1, type=int, help="number of epochs")
    parser.add_argument("--output_dir", default="./output_dir", type=str, help="directory to temporarily store when training a model")
    parser.add_argument("--train_batch_size", default=8, type=int, help="training - mini batch size for each gpu/process")
    parser.add_argument("--eval_batch_size", default=8, type=int, help="evaluation - mini batch size for each gpu/process")
    parser.add_argument("--learning_rate", default=5e-06, type=float, help="learning rate")
    parser.add_argument("--logging_steps", default=2, type=int, help="logging steps")
    parser.add_argument("--save_steps", default=100, type=int, help="save steps")    
    parser.add_argument("--grad_accum_steps", default=4, type=int, help="gradient accumulation steps")
    parser.add_argument("--lr_scheduler_type", default="linear", type=str)
    parser.add_argument("--seed", default=0, type=int, help="seed")
    parser.add_argument("--warmup_ratio", default=0.2, type=float, help="warmup ratio")
    parser.add_argument("--max_seq_length", default=2048, type=int, help="max seq length")
    parser.add_argument("--save_merged_model", type=bool, default=False)

    # lora hyperparameters
    parser.add_argument("--lora_r", default=16, type=int, help="lora r")
    parser.add_argument("--lora_alpha", default=16, type=int, help="lora alpha")
    parser.add_argument("--lora_dropout", default=0.05, type=float, help="lora dropout")
    
    # wandb params
    parser.add_argument("--wandb_api_key", type=str, default="")
    parser.add_argument("--wandb_project", type=str, default="")
    parser.add_argument("--wandb_run_name", type=str, default="")
    parser.add_argument("--wandb_watch", type=str, default="gradients") # options: false | gradients | all
    parser.add_argument("--wandb_log_model", type=str, default="false") # options: false | true

    # parse args
    args = parser.parse_args()

    # return args
    return args

if __name__ == "__main__":
    #sys.argv = ['']
    args = parse_args()
    main(args)

<br>

## 3. Training
---

### 3.1. Create the compute cluster

In [None]:
from azure.ai.ml.entities import AmlCompute

### Create the compute cluster
try:
    compute = ml_client.compute.get(azure_compute_cluster_name)
    print("The compute cluster already exists! Reusing it for the current run")
except Exception as ex:
    print(
        f"Looks like the compute cluster doesn't exist. Creating a new one with compute size {azure_compute_cluster_size}!"
    )
    try:
        print("Attempt #1 - Trying to create a dedicated compute")
        tier = 'LowPriority' if USE_LOWPRIORITY_VM else 'Dedicated'
        compute = AmlCompute(
            name=azure_compute_cluster_name,
            size=azure_compute_cluster_size,
            tier=tier,
            max_instances=1,  # For multi node training set this to an integer value more than 1
        )
        ml_client.compute.begin_create_or_update(compute).wait()
    except Exception as e:
        print("Error")

### 3.2. Start training job


In [None]:
from azure.ai.ml import command
from azure.ai.ml import Input
from azure.ai.ml.entities import ResourceConfiguration

job = command(
    inputs=dict(
        #train_dir=Input(type="uri_folder", path=DATA_DIR), # Get data from local path
        train_dir=Input(path=f"{AZURE_DATA_NAME}@latest"),  # Get data from Data asset
        epoch=d['train']['epoch'],
        train_batch_size=d['train']['train_batch_size'],
        eval_batch_size=d['train']['eval_batch_size'],  
        model_dir=d['train']['model_dir']
    ),
    code="./src_train",  # local path where the code is stored
    compute=azure_compute_cluster_name,
    command="python train_mlflow.py --train_dir ${{inputs.train_dir}} --epochs ${{inputs.epoch}} --train_batch_size ${{inputs.train_batch_size}} --eval_batch_size ${{inputs.eval_batch_size}} --model_dir ${{inputs.model_dir}}",
    #environment="azureml://registries/azureml/environments/acft-hf-nlp-gpu/versions/57", # Use built-in Environment asset
    environment=f"{azure_env_name}@latest",
    distribution={
        "type": "PyTorch",
        "process_count_per_instance": 1, # For multi-gpu training set this to an integer value more than 1
    },
)
returned_job = ml_client.jobs.create_or_update(job)
ml_client.jobs.stream(returned_job.name)

In [None]:
display(returned_job)

In [None]:
# check if the `trained_model` output is available
job_name = returned_job.name

In [None]:
%store job_name

<br>

## 4. (Optional) Create model asset and get fine-tuned LLM to local folder
---

### 3.1. Create model asset

In [None]:
def get_or_create_model_asset(ml_client, model_name, job_name, model_dir="outputs", model_type="custom_model", update=False):
    
    try:
        latest_model_version = max([int(m.version) for m in ml_client.models.list(name=model_name)])
        if update:
            raise ResourceExistsError('Found Model asset, but will update the Model.')
        else:
            model_asset = ml_client.models.get(name=model_name, version=latest_model_version)
            print(f"Found Model asset: {model_name}. Will not create again")
    except (ResourceNotFoundError, ResourceExistsError) as e:
        print(f"Exception: {e}")        
        model_path = f"azureml://jobs/{job_name}/outputs/artifacts/paths/{model_dir}/"    
        run_model = Model(
            name=model_name,        
            path=model_path,
            description="Model created from run.",
            type=model_type # mlflow_model, custom_model, triton_model
        )
        model_asset = ml_client.models.create_or_update(run_model)
        print(f"Created Model asset: {model_name}")

    return model_asset

In [None]:
azure_model_name = d['serve']['azure_model_name']
model_dir = d['train']['model_dir']
model = get_or_create_model_asset(ml_client, azure_model_name, job_name, model_dir, model_type="custom_model", update=False)

### 3.2. Get fine-tuned LLM to local folder
You can copy it to your local directory to perform inference or serve the model in Azure environment. (e.g., real-time endpoint)

In [None]:
# Download the model (this is optional)
local_model_dir = "./artifact_downloads"
os.makedirs(local_model_dir, exist_ok=True)

ml_client.models.download(name=azure_model_name, download_path=local_model_dir, version=model.version)