# Phase 1: Training Orchestration

This notebook orchestrates all training activities without performing local computation.

## Overview

- **Step 1**: Load Centralized Configs
- **Step 2**: Data Ingestion & Versioning (Asset Layer)
- **Step 3**: Environment Definition
- **Step 4**: The Dry Run
- **Step 5**: The Sweep (HPO)
- **Step 6**: Best Configuration Selection (Automated)
- **Step 7**: Final Training (Post-HPO, Single Run)

## Important

- This notebook **only submits and monitors Azure ML jobs**
- **No training logic** is executed locally
- All computation happens remotely on Azure ML compute
- The notebook must be **re-runnable end-to-end**


In [None]:
import os
from pathlib import Path
import yaml
import hashlib
import json
from typing import Dict, Any
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential
from dotenv import load_dotenv

# Number of leading hex characters kept when SHA256 digests are truncated
# by compute_config_hash() and compute_environment_hash().
CONFIG_HASH_LENGTH = 16
# Workspace name returned by get_workspace_name() when
# ../config/infrastructure.yaml does not exist.
DEFAULT_WORKSPACE_NAME = "resume-ner-ws"

# Load environment variables from the optional local env file, if present.
env_path = Path("../config.env")
if env_path.exists():
    load_dotenv(env_path)


## Step 1: Load Centralized Configs

Load and validate all configuration files. Configs are immutable and will be logged with each job for reproducibility.


In [None]:
# Root of the YAML configuration tree, relative to this notebook.
CONFIG_DIR = Path("..", "config")

# Logical config name -> YAML file loaded for it.
# NOTE(review): "hpo" points at the smoke config, while the production sweep's
# docstring mentions prod.yaml — confirm which file the production sweep
# is meant to use.
CONFIG_PATHS = {
    "data": CONFIG_DIR.joinpath("data", "resume_v1.yaml"),
    "model": CONFIG_DIR.joinpath("model", "distilbert.yaml"),
    "train": CONFIG_DIR.joinpath("train.yaml"),
    "hpo": CONFIG_DIR.joinpath("hpo", "smoke.yaml"),
    "env": CONFIG_DIR.joinpath("env", "azure.yaml"),
}


def load_config_file(path: Path) -> Dict[str, Any]:
    """
    Load a YAML config file.

    The file is decoded as UTF-8 regardless of the platform's default
    encoding, and its top-level node must be a mapping.

    Args:
        path: Path to the YAML config file

    Returns:
        dict: Parsed configuration dictionary

    Raises:
        FileNotFoundError: If config file does not exist
        ValueError: If the file is empty or its top level is not a mapping
    """
    if not path.exists():
        raise FileNotFoundError(f"Config file not found: {path}")
    with open(path, "r", encoding="utf-8") as f:
        config = yaml.safe_load(f)
    # An empty document parses to None and a list/scalar document parses to a
    # non-dict; both would only fail later with an unrelated-looking error.
    if not isinstance(config, dict):
        raise ValueError(f"Config file must contain a YAML mapping: {path}")
    return config


def compute_config_hash(config: Dict[str, Any]) -> str:
    """
    Fingerprint a config dictionary with a truncated SHA256 digest.

    The dictionary is serialized to JSON with sorted keys before hashing.

    Args:
        config: Configuration dictionary

    Returns:
        str: First CONFIG_HASH_LENGTH hexadecimal characters of the digest
    """
    hasher = hashlib.sha256()
    hasher.update(json.dumps(config, sort_keys=True).encode())
    return hasher.hexdigest()[:CONFIG_HASH_LENGTH]


# Parsed config per logical name, and the truncated SHA256 fingerprint of each.
configs = {}
config_hashes = {}

# Load every file registered in CONFIG_PATHS and hash the parsed result.
for name, path in CONFIG_PATHS.items():
    configs[name] = load_config_file(path)
    config_hashes[name] = compute_config_hash(configs[name])


In [None]:
# Snapshot each config as sorted-key JSON so later mutations can be detected.
original_configs = {k: json.dumps(v, sort_keys=True) for k, v in configs.items()}


def validate_config_immutability():
    """
    Check that no loaded config differs from its JSON snapshot.

    Each entry of the module-level ``configs`` is re-serialized with sorted
    keys and compared against the matching entry in ``original_configs``.

    Raises:
        ValueError: If any config was mutated
    """
    for name, config in configs.items():
        if json.dumps(config, sort_keys=True) != original_configs[name]:
            raise ValueError(f"Config '{name}' was mutated at runtime!")


# Compare the current configs against the original_configs snapshot.
validate_config_immutability()


In [None]:
def get_workspace_name() -> str:
    """
    Get workspace name from infrastructure config or use default.

    The default is used only when ../config/infrastructure.yaml does not
    exist; when the file exists, the name is read from workspace.name.

    Returns:
        str: Workspace name
    """
    infrastructure_config_path = Path("../config/infrastructure.yaml")
    if infrastructure_config_path.exists():
        # Decode explicitly as UTF-8 so the result does not depend on the
        # platform's default text encoding.
        with open(infrastructure_config_path, "r", encoding="utf-8") as f:
            infrastructure_config = yaml.safe_load(f)
        return infrastructure_config["workspace"]["name"]
    return DEFAULT_WORKSPACE_NAME


# Azure scope comes from the process environment.
subscription_id = os.getenv("AZURE_SUBSCRIPTION_ID")
resource_group = os.getenv("AZURE_RESOURCE_GROUP")

# Fail early when either value is missing or empty.
if not subscription_id or not resource_group:
    raise ValueError("AZURE_SUBSCRIPTION_ID and AZURE_RESOURCE_GROUP must be set")

# Build the workspace client used by the rest of the notebook.
workspace_name = get_workspace_name()
credential = DefaultAzureCredential()
ml_client = MLClient(
    credential=credential,
    subscription_id=subscription_id,
    resource_group_name=resource_group,
    workspace_name=workspace_name,
)


All configs and their hashes will be attached to each Azure ML job for full reproducibility.


In [None]:
def create_config_metadata(configs: Dict[str, Any], config_hashes: Dict[str, str]) -> Dict[str, str]:
    """
    Assemble the tag dictionary attached to jobs.

    Contains one ``<name>_config_hash`` entry per config, followed by the
    data version and the model backbone.

    Args:
        configs: Dictionary of loaded configs
        config_hashes: Dictionary of config hashes

    Returns:
        dict: Metadata dictionary for Azure ML job tags
    """
    metadata = {
        f"{section}_config_hash": config_hashes[section]
        for section in ("data", "model", "train", "hpo", "env")
    }
    metadata["data_version"] = configs["data"]["version"]
    metadata["model_backbone"] = configs["model"]["backbone"]
    return metadata


# Tag dictionary merged into the tags of the sweep jobs defined below.
config_metadata = create_config_metadata(configs, config_hashes)


## Step 2: Data Ingestion & Versioning (Asset Layer)

Upload dataset to Blob Storage and register as an Azure ML Data Asset for versioned, immutable data access.


In [None]:
from azure.ai.ml.entities import Data
from azure.ai.ml.constants import AssetTypes
from azure.storage.blob import BlobServiceClient
import json

# Local directory whose files are uploaded and validated.
DATASET_LOCAL_PATH = Path("../dataset")
# Data asset identity, taken from the data config.
DATA_ASSET_NAME = configs["data"]["name"]
DATA_ASSET_VERSION = configs["data"]["version"]


In [None]:
def get_storage_account_name() -> str:
    """
    Get storage account name from infrastructure config.

    Reads storage.account_name from ../config/infrastructure.yaml.

    Returns:
        str: Storage account name

    Raises:
        ValueError: If the infrastructure config does not exist or does not
            define storage.account_name
    """
    infrastructure_config_path = Path("../config/infrastructure.yaml")
    if not infrastructure_config_path.exists():
        raise ValueError("Storage account name not found in infrastructure config")

    with open(infrastructure_config_path, "r", encoding="utf-8") as f:
        infrastructure_config = yaml.safe_load(f)

    try:
        return infrastructure_config["storage"]["account_name"]
    except (KeyError, TypeError) as err:
        # KeyError: section/key absent. TypeError: empty file (None) or a
        # non-mapping node. Both surface as the documented ValueError.
        raise ValueError("Storage account name not found in infrastructure config") from err


def build_connection_string(account_name: str, account_key: str) -> str:
    """
    Compose a storage account connection string from its name and key.

    Args:
        account_name: Storage account name
        account_key: Storage account key

    Returns:
        str: Connection string for blob service client
    """
    segments = (
        "DefaultEndpointsProtocol=https",
        f"AccountName={account_name}",
        f"AccountKey={account_key}",
        "EndpointSuffix=core.windows.net",
    )
    return ";".join(segments)


def upload_dataset_to_blob(dataset_path: Path, container_name: str) -> str:
    """
    Upload dataset to blob storage (idempotent).

    Every file under dataset_path is uploaded below
    ``<DATA_ASSET_NAME>/v<DATA_ASSET_VERSION>/`` in the given container.
    Blobs that already exist are left untouched.

    Args:
        dataset_path: Local path to dataset directory
        container_name: Blob storage container name

    Returns:
        str: Azure ML datastore path
    """
    storage_account_name = get_storage_account_name()
    subscription_id = os.getenv("AZURE_SUBSCRIPTION_ID")
    resource_group = os.getenv("AZURE_RESOURCE_GROUP")

    from azure.mgmt.storage import StorageManagementClient
    from azure.identity import DefaultAzureCredential

    # Fetch an account key through the management API and connect with it.
    credential = DefaultAzureCredential()
    storage_mgmt = StorageManagementClient(credential, subscription_id)
    keys = storage_mgmt.storage_accounts.list_keys(resource_group, storage_account_name)

    conn_str = build_connection_string(storage_account_name, keys.keys[0].value)
    blob_service_client = BlobServiceClient.from_connection_string(conn_str)

    container_client = blob_service_client.get_container_client(container_name)
    if not container_client.exists():
        container_client.create_container()

    blob_path = f"{DATA_ASSET_NAME}/v{DATA_ASSET_VERSION}"

    for file_path in dataset_path.rglob("*"):
        if not file_path.is_file():
            continue
        # Blob names always use forward slashes; as_posix() keeps nested
        # files correctly named when the notebook runs on Windows.
        relative_path = file_path.relative_to(dataset_path).as_posix()
        blob_name = f"{blob_path}/{relative_path}"
        blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)

        # Skip blobs that are already present so re-runs do not re-upload.
        if not blob_client.exists():
            with open(file_path, "rb") as data:
                blob_client.upload_blob(data, overwrite=False)

    # NOTE(review): the returned URI names the "workspaceblobstore" datastore,
    # while the files are uploaded to `container_name` in the account from the
    # infrastructure config — confirm that datastore maps to this container.
    return f"azureml://datastores/workspaceblobstore/paths/{blob_path}"


# Upload the local dataset into the "datasets" container and keep the datastore URI.
blob_uri = upload_dataset_to_blob(DATASET_LOCAL_PATH, "datasets")


In [None]:
def register_data_asset(name: str, version: str, uri: str, description: str) -> Data:
    """
    Register or resolve Azure ML Data Asset (uri_folder type).

    An existing asset with the same name and version is returned as-is.
    A new asset is created only when the lookup reports that it does not
    exist; any other failure (authentication, network, ...) propagates
    instead of being mistaken for a missing asset.

    Args:
        name: Data asset name
        version: Data asset version
        uri: Azure ML datastore path
        description: Asset description

    Returns:
        Data: Registered data asset
    """
    # Imported locally because the file-level import of this name appears
    # further down than this function.
    from azure.core.exceptions import ResourceNotFoundError

    try:
        return ml_client.data.get(name=name, version=version)
    except ResourceNotFoundError:
        data_asset = Data(
            name=name,
            version=version,
            description=description,
            path=uri,
            type=AssetTypes.URI_FOLDER,
        )
        return ml_client.data.create_or_update(data_asset)


# Resolve (or create) the data asset for the uploaded dataset folder.
data_asset = register_data_asset(
    name=DATA_ASSET_NAME,
    version=DATA_ASSET_VERSION,
    uri=blob_uri,
    description=configs["data"]["description"],
)


In [None]:
def validate_data_asset(asset: Data, local_path: Path) -> int:
    """
    Validate the local dataset copy: file presence, readability, span bounds.

    Every sample in train.json is checked, so the returned count is the
    number of samples that actually passed validation.

    Args:
        asset: Registered data asset (not inspected; checks read local_path)
        local_path: Local dataset path for validation

    Returns:
        int: Number of validated samples

    Raises:
        ValueError: If validation fails
    """
    train_file = local_path / "train.json"
    if not train_file.exists():
        raise ValueError(f"train.json not found in {local_path}")

    with open(train_file, "r", encoding="utf-8") as f:
        data = json.load(f)

    if not isinstance(data, list) or len(data) == 0:
        raise ValueError("train.json must contain a non-empty list")

    for index, sample in enumerate(data):
        _validate_sample(sample, index)

    return len(data)


def _validate_sample(sample: Any, index: int) -> None:
    """Raise ValueError if a single train.json record is malformed."""
    where = f"(sample index {index})"

    if not isinstance(sample, dict):
        raise ValueError(f"Sample must be an object with 'text' and 'annotations' {where}")

    for key in ("text", "annotations"):
        if key not in sample:
            raise ValueError(f"Sample missing required key: {key} {where}")

    text = sample["text"]
    annotations = sample["annotations"]

    if not isinstance(text, str) or len(text) == 0:
        raise ValueError(f"Sample text must be a non-empty string {where}")

    if not isinstance(annotations, list):
        raise ValueError(f"Annotations must be a list {where}")

    for ann in annotations:
        if not isinstance(ann, list) or len(ann) != 3:
            raise ValueError(f"Each annotation must be [start, end, entity_type] {where}")
        start, end, _entity_type = ann
        # Offsets must be real integers; bool is excluded explicitly because
        # it is an int subclass. Without this check a string offset would
        # raise TypeError from the comparison below instead of ValueError.
        for offset in (start, end):
            if isinstance(offset, bool) or not isinstance(offset, int):
                raise ValueError(f"Annotation offsets must be integers, got {offset!r} {where}")
        if not (0 <= start < end <= len(text)):
            raise ValueError(
                f"Invalid annotation span: [{start}, {end}] for text length {len(text)} {where}"
            )


# Validate the local dataset copy; the returned sample count is not kept.
validate_data_asset(data_asset, DATASET_LOCAL_PATH)


## Step 3: Environment Definition

Define a stable execution environment (Docker image + Conda dependencies) for consistent behavior across all training jobs.


In [None]:
from azure.ai.ml.entities import Environment
from azure.core.exceptions import ResourceNotFoundError

# Name under which the training environment is looked up / registered.
ENVIRONMENT_NAME = "resume-ner-training"
# Conda specification that is loaded and hashed into the environment version.
CONDA_ENV_PATH = Path("../config/environment/conda.yaml")
# NOTE(review): the image is referenced by the mutable ":latest" tag, and the
# environment hash covers only this string — confirm whether a pinned tag is
# wanted so that one hash always maps to the same image contents.
DEFAULT_DOCKER_IMAGE = "mcr.microsoft.com/azureml/curated/pytorch-2.0-ubuntu20.04-py310-cuda11.7.1:latest"


In [None]:
def load_conda_environment(path: Path) -> Dict[str, Any]:
    """
    Load conda environment definition from YAML file.

    The file is decoded as UTF-8 regardless of the platform's default
    encoding, and its top-level node must be a mapping.

    Args:
        path: Path to conda environment YAML file

    Returns:
        dict: Parsed conda environment dictionary

    Raises:
        FileNotFoundError: If conda environment file does not exist
        ValueError: If the file is empty or its top level is not a mapping
    """
    if not path.exists():
        raise FileNotFoundError(f"Conda environment file not found: {path}")
    with open(path, "r", encoding="utf-8") as f:
        conda_env = yaml.safe_load(f)
    # An empty file parses to None; reject it here rather than hashing and
    # registering an environment without dependencies.
    if not isinstance(conda_env, dict):
        raise ValueError(f"Conda environment file must contain a YAML mapping: {path}")
    return conda_env


def compute_environment_hash(conda_deps: Dict[str, Any], docker_image: str) -> str:
    """
    Fingerprint an environment definition with a truncated SHA256 digest.

    The conda dependencies and the image name are combined into one
    dictionary, serialized to JSON with sorted keys, and hashed.

    Args:
        conda_deps: Conda environment dependencies dictionary
        docker_image: Docker base image name

    Returns:
        str: First CONFIG_HASH_LENGTH hexadecimal characters of the digest
    """
    serialized = json.dumps(
        {"conda_dependencies": conda_deps, "docker_image": docker_image},
        sort_keys=True,
    )
    digest = hashlib.sha256(serialized.encode()).hexdigest()
    return digest[:CONFIG_HASH_LENGTH]


# Derive the environment version from a hash of the conda spec and image name,
# so a changed definition yields a different version string.
conda_env = load_conda_environment(CONDA_ENV_PATH)
docker_image = DEFAULT_DOCKER_IMAGE
environment_hash = compute_environment_hash(conda_env, docker_image)
environment_version = f"v{environment_hash}"


In [None]:
def get_or_create_environment(
    ml_client: MLClient,
    name: str,
    version: str,
    conda_dependencies: Dict[str, Any],
    docker_image: str,
) -> Environment:
    """
    Get existing environment or create new one based on hash-based versioning.

    Args:
        ml_client: MLClient instance
        name: Environment name
        version: Environment version (hash-based)
        conda_dependencies: Conda environment dependencies
        docker_image: Docker base image

    Returns:
        Environment: Azure ML Environment instance
    """
    try:
        return ml_client.environments.get(name=name, version=version)
    except ResourceNotFoundError:
        # The SDK v2 Environment entity takes the base image as `image` and
        # the conda specification as `conda_file`; the previous keyword names
        # (`docker_image`, `conda_dependencies`) are not Environment parameters.
        environment = Environment(
            name=name,
            version=version,
            conda_file=conda_dependencies,
            image=docker_image,
            description=f"Training environment for Resume NER (hash: {version})",
        )
        return ml_client.environments.create_or_update(environment)


# Resolve (or register) the environment version derived from the hash.
training_environment = get_or_create_environment(
    ml_client=ml_client,
    name=ENVIRONMENT_NAME,
    version=environment_version,
    conda_dependencies=conda_env,
    docker_image=docker_image,
)


## Step 4: The Dry Run

Submit a minimal sweep job using `smoke.yaml` to validate the sweep mechanism and pipeline integrity before launching the production HPO sweep.


In [None]:
from azure.ai.ml import command, sweep
from azure.ai.ml.entities import Job
from azure.ai.ml.constants import AssetTypes

# Training entry point; only its file name is used in the job command line.
TRAINING_SCRIPT_PATH = Path("../src/train.py")
# Display-name prefix for the dry run sweep jobs.
DRY_RUN_JOB_NAME = "dry-run-sweep"
# HPO config read by load_smoke_hpo_config() for the dry run.
SMOKE_HPO_CONFIG_PATH = Path("../config/hpo/smoke.yaml")


In [None]:
def load_smoke_hpo_config() -> Dict[str, Any]:
    """
    Load smoke HPO config for dry run sweep.

    The file is decoded as UTF-8 regardless of the platform's default
    encoding, and its top-level node must be a mapping.

    Returns:
        dict: Smoke HPO configuration dictionary

    Raises:
        FileNotFoundError: If smoke HPO config file does not exist
        ValueError: If the file is empty or its top level is not a mapping
    """
    if not SMOKE_HPO_CONFIG_PATH.exists():
        raise FileNotFoundError(f"Smoke HPO config not found: {SMOKE_HPO_CONFIG_PATH}")
    with open(SMOKE_HPO_CONFIG_PATH, "r", encoding="utf-8") as f:
        hpo_config = yaml.safe_load(f)
    # An empty file parses to None; fail here with a clear message instead of
    # a TypeError at the first key lookup.
    if not isinstance(hpo_config, dict):
        raise ValueError(f"Smoke HPO config must contain a YAML mapping: {SMOKE_HPO_CONFIG_PATH}")
    return hpo_config


def create_search_space(hpo_config: Dict[str, Any]) -> Dict[str, Any]:
    """
    Convert HPO config to Azure ML search space format.

    ``choice`` parameters map to their list of values; ``uniform`` and
    ``loguniform`` parameters map to a dictionary with ``type``,
    ``min_value`` and ``max_value``.

    Args:
        hpo_config: HPO configuration dictionary

    Returns:
        dict: Azure ML search space dictionary

    Raises:
        ValueError: If a parameter declares an unsupported type
    """
    search_space = {}
    for param_name, param_config in hpo_config["search_space"].items():
        param_type = param_config["type"]
        if param_type == "choice":
            search_space[param_name] = param_config["values"]
        elif param_type in ("uniform", "loguniform"):
            search_space[param_name] = {
                "type": param_type,
                "min_value": param_config["min"],
                "max_value": param_config["max"],
            }
        else:
            # Dropping the parameter silently would leave the sweep without a
            # value for it; report the config error instead.
            raise ValueError(
                f"Unsupported search space type '{param_type}' for parameter '{param_name}'"
            )
    return search_space


def create_dry_run_sweep_job_for_backbone(
    ml_client: MLClient,
    script_path: Path,
    data_asset: Data,
    environment: Environment,
    compute_cluster: str,
    backbone: str,
    configs: Dict[str, Any],
    config_metadata: Dict[str, str],
) -> sweep:
    """
    Build the dry run sweep definition for one backbone from the smoke HPO config.

    The backbone is fixed on the command line and removed from the search
    space. The trial budget is half of the smoke config's max_trials, but
    never fewer than two.

    NOTE(review): elsewhere in this file ``BanditPolicy`` is imported from
    ``azure.ai.ml.sweep``, which suggests the ``sweep`` name imported from
    ``azure.ai.ml`` is a package rather than a callable — confirm that
    ``sweep(...)`` works against the installed SDK.

    Args:
        ml_client: MLClient instance (not used by this function)
        script_path: Path to training script
        data_asset: Registered data asset
        environment: Training environment
        compute_cluster: Compute cluster name
        backbone: Backbone model name (e.g., "distilbert", "deberta")
        configs: Configuration dictionaries
        config_metadata: Configuration metadata for tagging

    Returns:
        sweep: Azure ML Sweep Job definition

    Raises:
        FileNotFoundError: If training script or smoke HPO config does not exist
    """
    if not script_path.exists():
        raise FileNotFoundError(f"Training script not found: {script_path}")

    smoke_config = load_smoke_hpo_config()

    # The backbone is passed explicitly per job, so it must not be sampled.
    tunable_params = dict(smoke_config["search_space"])
    tunable_params.pop("backbone", None)
    tunable_space = create_search_space({"search_space": tunable_params})

    trial_budget = max(2, smoke_config["sampling"]["max_trials"] // 2)

    # "${{...}}" placeholders are passed through literally in the command.
    trial_command = " ".join(
        [
            f"python {script_path.name}",
            "--data-asset ${{inputs.data}}",
            "--config-dir ../config",
            f"--backbone {backbone}",
            "--learning-rate ${{search_space.learning_rate}}",
            "--batch-size ${{search_space.batch_size}}",
        ]
    )

    trial_job = command(
        code="../src",
        command=trial_command,
        inputs={"data": data_asset},
        environment=environment,
        compute=compute_cluster,
    )

    return sweep(
        trial=trial_job,
        search_space=tunable_space,
        sampling_algorithm=smoke_config["sampling"]["algorithm"],
        objective=smoke_config["objective"],
        limits={
            "max_trials": trial_budget,
            "timeout_minutes": smoke_config["sampling"]["timeout_minutes"],
        },
        experiment_name=f"{configs['env']['logging']['experiment_name']}-{backbone}",
        tags={**config_metadata, "job_type": "dry_run_sweep", "backbone": backbone},
        display_name=f"{DRY_RUN_JOB_NAME}-{backbone}",
        description=f"Dry run sweep to validate sweep mechanism for {backbone} before production HPO",
    )


def submit_and_wait_for_job(ml_client: MLClient, job: Job) -> Job:
    """
    Submit job and wait for completion.

    The ``job`` annotation is ``Job`` rather than ``command | sweep``:
    annotations are evaluated when the function is defined, and the ``|``
    operator is not supported between those two imported objects, so the
    previous annotation raised TypeError before the function could be used.

    Args:
        ml_client: MLClient instance
        job: Job definition (command or sweep)

    Returns:
        Job: Completed job instance

    Raises:
        RuntimeError: If job fails
    """
    submitted_job = ml_client.jobs.create_or_update(job)
    # Follow the job's output; the status is re-read afterwards.
    ml_client.jobs.stream(submitted_job.name)

    completed_job = ml_client.jobs.get(submitted_job.name)

    # Any terminal status other than "Completed" is reported as a failure.
    if completed_job.status != "Completed":
        raise RuntimeError(f"Job {completed_job.name} failed with status: {completed_job.status}")

    return completed_job


# Compute target shared by the sweep jobs built in this notebook.
compute_cluster_name = configs["env"]["compute"]["training_cluster"]
smoke_hpo_config = load_smoke_hpo_config()
# One dry run sweep is defined per backbone listed in the smoke search space.
backbone_values = smoke_hpo_config["search_space"]["backbone"]["values"]
dry_run_sweep_jobs = {}

for backbone in backbone_values:
    dry_run_sweep_jobs[backbone] = create_dry_run_sweep_job_for_backbone(
        ml_client=ml_client,
        script_path=TRAINING_SCRIPT_PATH,
        data_asset=data_asset,
        environment=training_environment,
        compute_cluster=compute_cluster_name,
        backbone=backbone,
        configs=configs,
        config_metadata=config_metadata,
    )


In [None]:
def validate_dry_run_sweep_job(job: Job, backbone: str) -> None:
    """
    Check that a dry run sweep finished and reports at least one trial.

    NOTE(review): the trial count is read from ``job.trial_count``; a job
    object without that attribute is treated as having no trials — confirm
    the SDK's job objects expose it.

    Args:
        job: Completed sweep job instance
        backbone: Backbone model name for error messages

    Raises:
        ValueError: If validation fails
    """
    status = job.status
    if status != "Completed":
        raise ValueError(f"Dry run sweep job for {backbone} failed with status: {status}")

    # A missing attribute is handled exactly like an explicit zero.
    trial_count = getattr(job, "trial_count", 0)
    if trial_count == 0:
        raise ValueError(f"Dry run sweep job for {backbone} produced no trials")

    min_expected_trials = 1
    if trial_count < min_expected_trials:
        message = (
            f"Dry run sweep job for {backbone} only produced {trial_count} trial(s), "
            f"expected at least {min_expected_trials}"
        )
        raise ValueError(message)


# Completed dry run sweep per backbone.
dry_run_completed_jobs = {}

# Submit the dry runs one at a time; each call blocks until its job ends,
# and the first failure aborts the loop with an exception.
for backbone, sweep_job in dry_run_sweep_jobs.items():
    completed_job = submit_and_wait_for_job(ml_client, sweep_job)
    validate_dry_run_sweep_job(completed_job, backbone)
    dry_run_completed_jobs[backbone] = completed_job


## Step 5: The Sweep (HPO)

Submit a production hyperparameter optimization sweep to systematically search for the best model configuration.


In [None]:
# Display-name prefix for the production HPO sweep jobs.
HPO_SWEEP_JOB_NAME = "hpo-sweep"


In [None]:
def create_hpo_sweep_job_for_backbone(
    ml_client: MLClient,
    script_path: Path,
    data_asset: Data,
    environment: Environment,
    compute_cluster: str,
    hpo_config: Dict[str, Any],
    backbone: str,
    configs: Dict[str, Any],
    config_metadata: Dict[str, str],
) -> sweep:
    """
    Build the production HPO sweep definition for one backbone.

    The backbone is fixed on the command line and removed from the search
    space. A bandit early-termination policy is attached when the HPO config
    requests one; any other policy value leaves early termination unset.

    NOTE(review): this function imports ``BanditPolicy`` from
    ``azure.ai.ml.sweep``, which suggests the ``sweep`` name imported from
    ``azure.ai.ml`` is a package rather than a callable — confirm that
    ``sweep(...)`` works against the installed SDK.

    Args:
        ml_client: MLClient instance (not used by this function)
        script_path: Path to training script
        data_asset: Registered data asset
        environment: Training environment
        compute_cluster: Compute cluster name
        hpo_config: HPO configuration dictionary
        backbone: Backbone model name (e.g., "distilbert", "deberta")
        configs: Configuration dictionaries
        config_metadata: Configuration metadata for tagging

    Returns:
        sweep: Azure ML Sweep Job definition

    Raises:
        FileNotFoundError: If training script does not exist
    """
    if not script_path.exists():
        raise FileNotFoundError(f"Training script not found: {script_path}")

    # The backbone is passed explicitly per job, so it must not be sampled.
    tunable_params = dict(hpo_config["search_space"])
    tunable_params.pop("backbone", None)
    tunable_space = create_search_space({"search_space": tunable_params})

    # "${{...}}" placeholders are passed through literally in the command.
    trial_command = " ".join(
        [
            f"python {script_path.name}",
            "--data-asset ${{inputs.data}}",
            "--config-dir ../config",
            f"--backbone {backbone}",
            "--learning-rate ${{search_space.learning_rate}}",
            "--batch-size ${{search_space.batch_size}}",
            "--dropout ${{search_space.dropout}}",
            "--weight-decay ${{search_space.weight_decay}}",
        ]
    )

    trial_job = command(
        code="../src",
        command=trial_command,
        inputs={"data": data_asset},
        environment=environment,
        compute=compute_cluster,
    )

    early_termination = None
    if "early_termination" in hpo_config and hpo_config["early_termination"]["policy"] == "bandit":
        from azure.ai.ml.sweep import BanditPolicy

        bandit_settings = hpo_config["early_termination"]
        early_termination = BanditPolicy(
            evaluation_interval=bandit_settings["evaluation_interval"],
            slack_factor=bandit_settings["slack_factor"],
            delay_evaluation=bandit_settings["delay_evaluation"],
        )

    return sweep(
        trial=trial_job,
        search_space=tunable_space,
        sampling_algorithm=hpo_config["sampling"]["algorithm"],
        objective=hpo_config["objective"],
        limits={
            "max_trials": hpo_config["sampling"]["max_trials"],
            "timeout_minutes": hpo_config["sampling"]["timeout_minutes"],
        },
        early_termination_policy=early_termination,
        experiment_name=f"{configs['env']['logging']['experiment_name']}-{backbone}",
        tags={**config_metadata, "job_type": "hpo_sweep", "backbone": backbone},
        display_name=f"{HPO_SWEEP_JOB_NAME}-{backbone}",
        description=f"Production hyperparameter optimization sweep for {backbone}",
    )


# NOTE(review): configs["hpo"] is loaded from the "hpo" entry of CONFIG_PATHS,
# which points at smoke.yaml, while create_hpo_sweep_job_for_backbone documents
# its hpo_config as coming from prod.yaml — confirm which config is intended.
backbone_values = configs["hpo"]["search_space"]["backbone"]["values"]
hpo_sweep_jobs = {}

# One production sweep is defined per backbone in the search space.
for backbone in backbone_values:
    hpo_sweep_jobs[backbone] = create_hpo_sweep_job_for_backbone(
        ml_client=ml_client,
        script_path=TRAINING_SCRIPT_PATH,
        data_asset=data_asset,
        environment=training_environment,
        compute_cluster=compute_cluster_name,
        hpo_config=configs["hpo"],
        backbone=backbone,
        configs=configs,
        config_metadata=config_metadata,
    )


In [None]:
def validate_hpo_sweep_job(job: Job, backbone: str) -> None:
    """
    Check that an HPO sweep finished and reports at least five trials.

    NOTE(review): the trial count is read from ``job.trial_count``; a job
    object without that attribute is treated as having no trials — confirm
    the SDK's job objects expose it.

    Args:
        job: Completed sweep job instance
        backbone: Backbone model name for error messages

    Raises:
        ValueError: If validation fails
    """
    status = job.status
    if status != "Completed":
        raise ValueError(f"HPO sweep job for {backbone} failed with status: {status}")

    # A missing attribute is handled exactly like an explicit zero.
    trial_count = getattr(job, "trial_count", 0)
    if trial_count == 0:
        raise ValueError(f"HPO sweep job for {backbone} produced no trials")

    min_expected_trials = 5
    if trial_count < min_expected_trials:
        message = (
            f"HPO sweep job for {backbone} only produced {trial_count} trial(s), "
            f"expected at least {min_expected_trials}"
        )
        raise ValueError(message)


# Completed production sweep per backbone.
hpo_completed_jobs = {}

# Submit the sweeps one at a time; each call blocks until its job ends,
# and the first failure aborts the loop with an exception.
for backbone, sweep_job in hpo_sweep_jobs.items():
    completed_job = submit_and_wait_for_job(ml_client, sweep_job)
    validate_hpo_sweep_job(completed_job, backbone)
    hpo_completed_jobs[backbone] = completed_job


## Step 6: Best Configuration Selection (Automated)

Programmatically select the best configuration from all HPO sweep runs across all backbone models.


In [None]:
from typing import Optional

# NOTE(review): not referenced in the visible part of this notebook — confirm it is still needed.
BEST_CONFIG_KEY = "best_config"


In [None]:
def get_best_trial_from_sweep(ml_client: MLClient, sweep_job: Job, objective_metric: str, goal: str) -> Optional[Job]:
    """
    Get the best trial run from a completed sweep job.

    Only trials whose status is "Completed" and whose ``metrics`` contain
    the objective metric are considered. The goal is matched
    case-insensitively, so "Maximize" and "maximize" behave the same.

    NOTE(review): this relies on trial jobs exposing a ``metrics`` mapping;
    trials without one are skipped — confirm the SDK's job objects carry it.

    Args:
        ml_client: MLClient instance
        sweep_job: Completed sweep job
        objective_metric: Metric name to optimize (e.g., "macro-f1")
        goal: Optimization goal ("maximize" or "minimize")

    Returns:
        Job: Best trial run, or None if no trials found (or if the trials
            could not be listed or compared)

    Raises:
        ValueError: If goal is neither "maximize" nor "minimize"
    """
    # Validate up front: an unrecognized goal previously matched neither
    # comparison branch, so the first eligible trial was returned as "best".
    normalized_goal = str(goal).lower()
    if normalized_goal not in ("maximize", "minimize"):
        raise ValueError(f"Unsupported optimization goal: {goal!r}")
    maximize = normalized_goal == "maximize"

    try:
        best_trial = None
        best_value = None

        for trial in ml_client.jobs.list(parent_job_name=sweep_job.name):
            if trial.status != "Completed":
                continue

            metrics = getattr(trial, "metrics", None)
            if not metrics or objective_metric not in metrics:
                continue

            metric_value = metrics[objective_metric]
            if best_value is None:
                is_better = True
            elif maximize:
                is_better = metric_value > best_value
            else:
                is_better = metric_value < best_value

            if is_better:
                best_value = metric_value
                best_trial = trial

        return best_trial
    except Exception as err:
        # Still best-effort (the caller skips this sweep), but leave a trace
        # so a service or authentication error is not mistaken for "no trials".
        print(f"Warning: could not evaluate trials for sweep job "
              f"{getattr(sweep_job, 'name', sweep_job)!r}: {err!r}")
        return None


def extract_trial_configuration(trial: Job) -> Dict[str, Any]:
    """
    Collect identity, tags, hyperparameters and metrics of a trial run.

    The backbone and dataset version come from the trial's tags, falling
    back to "unknown" and the loaded data config's version respectively.
    Hyperparameters and metrics are copied only when the trial has a
    non-empty attribute of that name; otherwise they stay empty.

    Args:
        trial: Trial job instance

    Returns:
        dict: Extracted configuration including hyperparameters and metadata
    """
    extracted = {
        "trial_name": trial.name,
        "trial_id": trial.id,
        "backbone": trial.tags.get("backbone", "unknown"),
        "hyperparameters": {},
        "metrics": {},
        "dataset_version": trial.tags.get("data_version", configs["data"]["version"]),
    }

    for field in ("hyperparameters", "metrics"):
        values = getattr(trial, field, None)
        if values:
            extracted[field] = dict(values)

    return extracted


def select_best_configuration(
    ml_client: MLClient,
    hpo_completed_jobs: Dict[str, Job],
    hpo_config: Dict[str, Any],
) -> Dict[str, Any]:
    """
    Select the best configuration across all backbone sweep jobs.

    The best trial of each sweep is compared on the objective metric. The
    goal is matched case-insensitively, so "Maximize" and "maximize" behave
    the same.

    Args:
        ml_client: MLClient instance
        hpo_completed_jobs: Dictionary of completed sweep jobs by backbone
        hpo_config: HPO configuration dictionary

    Returns:
        dict: Best configuration with all extracted information

    Raises:
        ValueError: If the goal is unsupported, no valid trials are found,
            or selection fails
    """
    objective_metric = hpo_config["objective"]["metric"]
    goal = hpo_config["objective"]["goal"]

    # Validate up front: an unrecognized goal previously matched neither
    # comparison branch, so the first backbone's trial always won.
    normalized_goal = str(goal).lower()
    if normalized_goal not in ("maximize", "minimize"):
        raise ValueError(f"Unsupported optimization goal: {goal!r}")
    maximize = normalized_goal == "maximize"

    best_trial = None
    best_value = None
    best_backbone = None

    for backbone, sweep_job in hpo_completed_jobs.items():
        # The lower-cased goal is passed down so the per-sweep selection
        # sees the spelling its comparisons expect.
        trial = get_best_trial_from_sweep(ml_client, sweep_job, objective_metric, normalized_goal)
        if trial is None:
            continue

        # Treat a missing or empty metrics attribute as "no usable result".
        metrics = getattr(trial, "metrics", None)
        if not metrics or objective_metric not in metrics:
            continue

        metric_value = metrics[objective_metric]
        if best_value is None:
            is_better = True
        elif maximize:
            is_better = metric_value > best_value
        else:
            is_better = metric_value < best_value

        if is_better:
            best_value = metric_value
            best_trial = trial
            best_backbone = backbone

    if best_trial is None:
        raise ValueError("No valid trials found in any sweep job")

    best_config = extract_trial_configuration(best_trial)
    best_config["selection_criteria"] = {
        "metric": objective_metric,
        "goal": goal,
        "best_value": best_value,
        "backbone": best_backbone,
    }

    return best_config


# Pick the winning trial across all completed sweeps using the objective in configs["hpo"].
best_configuration = select_best_configuration(
    ml_client=ml_client,
    hpo_completed_jobs=hpo_completed_jobs,
    hpo_config=configs["hpo"],
)


In [None]:
def log_best_configuration(best_config: Dict[str, Any]) -> None:
    """
    Print a short summary of the selected configuration.

    The summary lists the backbone, the objective metric with its value
    formatted to four decimals, the hyperparameters, the dataset version
    and the trial name.

    Args:
        best_config: Best configuration dictionary
    """
    criteria = best_config["selection_criteria"]
    report = [
        "Best Configuration Selected:",
        f"  Backbone: {criteria['backbone']}",
        f"  Metric: {criteria['metric']} = {criteria['best_value']:.4f}",
        f"  Hyperparameters: {best_config['hyperparameters']}",
        f"  Dataset Version: {best_config['dataset_version']}",
        f"  Trial: {best_config['trial_name']}",
    ]
    print("\n".join(report))


# Print the selection summary into the notebook output.
log_best_configuration(best_configuration)


## Step 7: Final Training (Post-HPO, Single Run)

Train the final production model using the best configuration from HPO with stable, controlled conditions.


In [None]:
# Name used for the final training job.
FINAL_TRAINING_JOB_NAME = "final-training"
# Seed written into the final training configuration as "random_seed".
RANDOM_SEED = 42


In [None]:
def build_final_training_config(
    best_config: Dict[str, Any],
    train_config: Dict[str, Any],
) -> Dict[str, Any]:
    """
    Build final training configuration by merging best HPO config with train.yaml defaults.

    A hyperparameter tuned by HPO always wins. The train.yaml value is only
    looked up when HPO did not provide one, so train.yaml does not have to
    define a default for every parameter that was swept.

    Args:
        best_config: Best configuration from HPO selection. Must contain
            "backbone"; "hyperparameters" is optional (missing or null means
            nothing was tuned).
        train_config: Training defaults from train.yaml (the "training"
            section is read).

    Returns:
        dict: Final training configuration

    Raises:
        KeyError: If "backbone" or the "training" section is missing, or a
            required value is found neither in the HPO hyperparameters nor
            in train.yaml
    """
    backbone = best_config["backbone"]
    # `or {}` also covers an explicit `hyperparameters: null` entry.
    hyperparameters = best_config.get("hyperparameters") or {}
    training_defaults = train_config["training"]

    def tuned_or_default(name: str) -> Any:
        # Resolve the train.yaml default lazily: dict.get(key, default) would
        # evaluate the default eagerly and raise KeyError for a key missing
        # from train.yaml even when HPO supplied the value.
        if name in hyperparameters:
            return hyperparameters[name]
        return training_defaults[name]

    if "dropout" in hyperparameters:
        dropout = hyperparameters["dropout"]
    else:
        # Dropout is the only parameter with a hard-coded last-resort default.
        dropout = training_defaults.get("dropout", 0.1)

    return {
        "backbone": backbone,
        "learning_rate": tuned_or_default("learning_rate"),
        "batch_size": tuned_or_default("batch_size"),
        "dropout": dropout,
        "weight_decay": tuned_or_default("weight_decay"),
        # Epoch count is never taken from HPO; it always comes from train.yaml.
        "epochs": training_defaults["epochs"],
        "random_seed": RANDOM_SEED,
        # Fixed settings for the single final run.
        "early_stopping_enabled": False,
        "use_combined_data": True,
    }


# Merge the selected HPO configuration with the train.yaml defaults.
final_training_config = build_final_training_config(best_configuration, configs["train"])


In [None]:
def create_final_training_job(
    ml_client: MLClient,
    script_path: Path,
    data_asset: Data,
    environment: Environment,
    compute_cluster: str,
    final_config: Dict[str, Any],
    configs: Dict[str, Any],
    config_metadata: Dict[str, str],
) -> command:
    """
    Define the Azure ML command job for the final training run.

    The job runs the training script from ``../src`` with every value of
    ``final_config`` passed as a command-line option. Besides its arguments,
    the function reads the module-level ``best_configuration`` (for the
    ``best_trial`` and ``best_metric_value`` tags) and
    ``FINAL_TRAINING_JOB_NAME`` (for the display name).

    Args:
        ml_client: MLClient instance (not used while building the definition)
        script_path: Path to training script; only its file name goes into the command
        data_asset: Registered data asset, bound to the ``data`` input
        environment: Training environment
        compute_cluster: Compute cluster name
        final_config: Final training configuration
        configs: Configuration dictionaries (``env.logging.experiment_name`` is read)
        config_metadata: Configuration metadata for tagging

    Returns:
        command: Azure ML Command Job definition

    Raises:
        FileNotFoundError: If training script does not exist
    """
    if not script_path.exists():
        raise FileNotFoundError(f"Training script not found: {script_path}")

    # (flag, value) pairs in the order they appear on the command line.
    # "${{inputs.data}}" is left as a literal placeholder in the command string.
    cli_options = [
        ("--data-asset", "${{inputs.data}}"),
        ("--config-dir", "../config"),
        ("--backbone", final_config["backbone"]),
        ("--learning-rate", final_config["learning_rate"]),
        ("--batch-size", final_config["batch_size"]),
        ("--dropout", final_config["dropout"]),
        ("--weight-decay", final_config["weight_decay"]),
        ("--epochs", final_config["epochs"]),
        ("--random-seed", final_config["random_seed"]),
        # Booleans are passed as lowercase "true"/"false".
        ("--early-stopping-enabled", str(final_config["early_stopping_enabled"]).lower()),
        ("--use-combined-data", str(final_config["use_combined_data"]).lower()),
    ]
    command_args = " ".join(f"{flag} {value}" for flag, value in cli_options)
    run_command = f"python {script_path.name} {command_args}"

    experiment = configs["env"]["logging"]["experiment_name"]

    # Config metadata first, so the job-specific tags below win on a key clash.
    job_tags = dict(config_metadata)
    job_tags.update(
        {
            "job_type": "final_training",
            "backbone": final_config["backbone"],
            "best_trial": best_configuration["trial_name"],
            "best_metric_value": str(best_configuration["selection_criteria"]["best_value"]),
        }
    )

    return command(
        code="../src",
        command=run_command,
        inputs={"data": data_asset},
        environment=environment,
        compute=compute_cluster,
        experiment_name=experiment,
        tags=job_tags,
        display_name=FINAL_TRAINING_JOB_NAME,
        description="Final production training with best HPO configuration",
    )


# Build the final training job definition; it is submitted in a later cell.
final_training_job = create_final_training_job(
    ml_client=ml_client,
    script_path=TRAINING_SCRIPT_PATH,
    data_asset=data_asset,
    environment=training_environment,
    compute_cluster=compute_cluster_name,
    final_config=final_training_config,
    configs=configs,
    config_metadata=config_metadata,
)


In [None]:
def validate_final_training_job(job: Job) -> None:
    """
    Check that the final training job finished and exposes a checkpoint.

    Args:
        job: Completed job instance

    Raises:
        ValueError: If the status is not "Completed", the job has no
            outputs, or the "checkpoint" output is missing
    """
    if job.status != "Completed":
        raise ValueError(f"Final training job failed with status: {job.status}")

    # A missing attribute and an empty/None value are both "no outputs".
    outputs = getattr(job, "outputs", None)
    if not outputs:
        raise ValueError("Final training job produced no outputs")

    missing = [name for name in ("checkpoint",) if name not in outputs]
    if missing:
        # Report the first missing output.
        raise ValueError(f"Final training job missing required output: {missing[0]}")


# Run the final training job through submit_and_wait_for_job, then raise if the
# returned job is not "Completed" or lacks the required checkpoint output.
final_training_completed_job = submit_and_wait_for_job(ml_client, final_training_job)
validate_final_training_job(final_training_completed_job)


## Step P1-4: Model Conversion & Optimization

Convert the final training checkpoint to an optimized ONNX model (int8 quantized) for production inference.


In [None]:
# Conversion script location, relative to the notebook.
CONVERSION_SCRIPT_PATH = Path("../src/convert_to_onnx.py")
# Display name given to the conversion job.
CONVERSION_JOB_NAME = "model-conversion"


In [None]:
def get_checkpoint_output_from_training_job(training_job: Job):
    """
    Return the "checkpoint" output of a completed training job.

    Args:
        training_job: Completed training job

    Returns:
        Whatever object the job stores under its "checkpoint" output

    Raises:
        ValueError: If the job has no outputs or no "checkpoint" output
    """
    output_name = "checkpoint"

    # A missing attribute and an empty/None value are both "no outputs".
    outputs = getattr(training_job, "outputs", None)
    if not outputs:
        raise ValueError("Training job produced no outputs")

    if output_name in outputs:
        return outputs[output_name]

    raise ValueError("Training job missing 'checkpoint' output")


# Checkpoint output of the final training run; becomes the conversion job's input.
checkpoint_output = get_checkpoint_output_from_training_job(final_training_completed_job)


In [None]:
def create_conversion_job(
    ml_client: MLClient,
    script_path: Path,
    checkpoint_output,
    environment: Environment,
    compute_cluster: str,
    configs: Dict[str, Any],
    config_metadata: Dict[str, str],
    best_config: Dict[str, Any],
) -> command:
    """
    Define the Azure ML command job that converts the checkpoint to ONNX.

    The conversion script is run from ``../src`` with the ``--quantize-int8``
    and ``--run-smoke-test`` flags. Besides its arguments, the function reads
    the module-level ``final_training_completed_job`` (for the
    ``source_training_job`` tag) and ``CONVERSION_JOB_NAME`` (for the display
    name).

    Args:
        ml_client: MLClient instance (not used while building the definition)
        script_path: Path to conversion script; only its file name goes into the command
        checkpoint_output: Checkpoint output from training job, bound to the ``checkpoint`` input
        environment: Training environment (reused for conversion)
        compute_cluster: CPU compute cluster name
        configs: Configuration dictionaries (``env.logging.experiment_name`` is read)
        config_metadata: Configuration metadata for tagging
        best_config: Best configuration from HPO selection (``backbone`` is read)

    Returns:
        command: Azure ML Command Job definition

    Raises:
        FileNotFoundError: If conversion script does not exist
    """
    if not script_path.exists():
        raise FileNotFoundError(f"Conversion script not found: {script_path}")

    # "${{...}}" tokens are left as literal placeholders in the command string.
    cli_tokens = [
        "--checkpoint-path", "${{inputs.checkpoint}}",
        "--config-dir", "../config",
        "--backbone", f"{best_config['backbone']}",
        "--output-dir", "${{outputs.onnx_model}}",
        "--quantize-int8",
        "--run-smoke-test",
    ]
    command_args = " ".join(cli_tokens)
    run_command = f"python {script_path.name} {command_args}"

    experiment = configs["env"]["logging"]["experiment_name"]

    # Config metadata first, so the job-specific tags below win on a key clash.
    job_tags = dict(config_metadata)
    job_tags.update(
        {
            "job_type": "model_conversion",
            "backbone": best_config["backbone"],
            "source_training_job": final_training_completed_job.name,
            "quantization": "int8",
        }
    )

    return command(
        code="../src",
        command=run_command,
        inputs={"checkpoint": checkpoint_output},
        # The output is declared by name only; no explicit configuration is given.
        outputs={"onnx_model": None},
        environment=environment,
        compute=compute_cluster,
        experiment_name=experiment,
        tags=job_tags,
        display_name=CONVERSION_JOB_NAME,
        description="Convert PyTorch checkpoint to optimized ONNX model (int8 quantized)",
    )


# Conversion runs on the cluster named under compute.conversion_cluster in the
# env config, and reuses the training environment.
conversion_cluster_name = configs["env"]["compute"]["conversion_cluster"]
conversion_job = create_conversion_job(
    ml_client=ml_client,
    script_path=CONVERSION_SCRIPT_PATH,
    checkpoint_output=checkpoint_output,
    environment=training_environment,
    compute_cluster=conversion_cluster_name,
    configs=configs,
    config_metadata=config_metadata,
    best_config=best_configuration,
)


In [None]:
def validate_conversion_job(job: Job) -> None:
    """
    Check that the conversion job finished and reports an ONNX model path.

    The "onnx_model" output may be either an object with a ``path`` attribute
    or a plain string; in both cases the resulting path must be non-empty and
    end with ".onnx".

    Args:
        job: Completed job instance

    Raises:
        ValueError: If the status is not "Completed", the "onnx_model" output
            is absent or of an unexpected type, or its path is invalid
    """
    if job.status != "Completed":
        raise ValueError(f"Conversion job failed with status: {job.status}")

    # A missing attribute and an empty/None value are both "no outputs".
    outputs = getattr(job, "outputs", None)
    if not outputs:
        raise ValueError("Conversion job produced no outputs")

    if "onnx_model" not in outputs:
        raise ValueError("Conversion job missing required output: onnx_model")

    onnx_output = outputs["onnx_model"]

    # Prefer the object's `path`; fall back to the value itself when it is a string.
    unset = object()
    onnx_path = getattr(onnx_output, "path", unset)
    if onnx_path is unset:
        if not isinstance(onnx_output, str):
            raise ValueError(f"Unexpected ONNX output type: {type(onnx_output)}")
        onnx_path = onnx_output

    # NOTE(review): create_conversion_job hands this output to the script as
    # --output-dir, so the reported path may be a folder rather than a file
    # ending in ".onnx" - confirm this suffix check against a real job.
    if not (onnx_path and onnx_path.endswith(".onnx")):
        raise ValueError(f"Invalid ONNX model path: {onnx_path}")


# Run the conversion job through submit_and_wait_for_job, then raise if the
# returned job is not "Completed" or does not report a valid ONNX model path.
conversion_completed_job = submit_and_wait_for_job(ml_client, conversion_job)
validate_conversion_job(conversion_completed_job)


## Step P1-5: Model Registration (The Handover)

Register the optimized ONNX model in Azure ML Model Registry with full metadata for production deployment.


In [None]:
from azure.ai.ml.entities import Model
from azure.core.exceptions import ResourceNotFoundError

# Name under which the ONNX model is registered.
MODEL_NAME = "resume-ner-onnx"
# Value of the "stage" tag applied to, and required on, the registered model.
PROD_STAGE = "prod"


In [None]:
def get_onnx_model_path(conversion_job: Job) -> str:
    """
    Return the path recorded for the conversion job's "onnx_model" output.

    The output may be an object exposing ``path`` (its value is returned
    as-is) or a plain string (returned unchanged).

    Args:
        conversion_job: Completed conversion job

    Returns:
        str: ONNX model path (Azure ML datastore URI)

    Raises:
        ValueError: If the job has no outputs, no "onnx_model" output, or the
            output is neither a string nor an object with ``path``
    """
    # A missing attribute and an empty/None value are both "no outputs".
    outputs = getattr(conversion_job, "outputs", None)
    if not outputs:
        raise ValueError("Conversion job produced no outputs")

    if "onnx_model" not in outputs:
        raise ValueError("Conversion job missing 'onnx_model' output")

    onnx_output = outputs["onnx_model"]

    # Sentinel distinguishes "no path attribute" from a path that is None.
    unset = object()
    path_value = getattr(onnx_output, "path", unset)
    if path_value is not unset:
        return path_value
    if isinstance(onnx_output, str):
        return onnx_output
    raise ValueError(f"Unexpected ONNX output type: {type(onnx_output)}")


# Path of the converted model, as reported by the completed conversion job.
onnx_model_path = get_onnx_model_path(conversion_completed_job)


In [None]:
def compute_model_version(best_config: Dict[str, Any], config_hashes: Dict[str, str]) -> str:
    """
    Derive a deterministic model version from config hashes and backbone.

    The data, model and train config hashes plus the backbone name are joined
    with "_" and hashed with SHA-256; the version is "v" followed by the first
    CONFIG_HASH_LENGTH hex characters of that digest.

    Args:
        best_config: Best configuration from HPO selection ("backbone" is read)
        config_hashes: Configuration hashes dictionary ("data", "model" and
            "train" are read)

    Returns:
        str: Model version string
    """
    hash_inputs = [config_hashes[section] for section in ("data", "model", "train")]
    hash_inputs.append(best_config["backbone"])

    digest = hashlib.sha256("_".join(hash_inputs).encode()).hexdigest()
    return "v" + digest[:CONFIG_HASH_LENGTH]


# Version string derived from the config hashes and the selected backbone.
model_version = compute_model_version(best_configuration, config_hashes)


In [None]:
def register_production_model(
    ml_client: MLClient,
    model_name: str,
    model_version: str,
    model_path: str,
    best_config: Dict[str, Any],
    configs: Dict[str, Any],
    config_metadata: Dict[str, str],
) -> Model:
    """
    Register the ONNX model in the Azure ML Model Registry, or reuse it.

    If a model with the same name and version is already registered, that
    existing model is returned unchanged; otherwise a new one is created.
    Besides its arguments, the function reads the module-level
    ``final_training_completed_job`` and ``conversion_completed_job`` (for the
    source-job tags) and ``PROD_STAGE`` (for the "stage" tag).

    Args:
        ml_client: MLClient instance
        model_name: Model name in registry
        model_version: Model version
        model_path: Path to ONNX model (Azure ML datastore URI)
        best_config: Best configuration from HPO selection
        configs: Configuration dictionaries (not used in the body)
        config_metadata: Configuration metadata for tagging

    Returns:
        Model: Registered model instance

    Raises:
        ValueError: If model path is empty or does not end with ".onnx"
    """
    path_is_onnx = bool(model_path) and model_path.endswith(".onnx")
    if not path_is_onnx:
        raise ValueError(f"Invalid ONNX model path: {model_path}")

    criteria = best_config["selection_criteria"]

    description_parts = [
        f"Backbone: {criteria['backbone']}",
        f"Metric: {criteria['metric']}={criteria['best_value']:.4f}",
    ]
    model_description = "Production ONNX model for Resume NER. " + ", ".join(description_parts)

    # Config metadata first, so the model-specific tags below win on a key clash.
    model_tags = dict(config_metadata)
    model_tags.update(
        {
            "stage": PROD_STAGE,
            "backbone": criteria["backbone"],
            "metric": criteria["metric"],
            "metric_value": str(criteria["best_value"]),
            "dataset_version": best_config["dataset_version"],
            "model_format": "onnx",
            "quantization": "int8",
            "source_training_job": final_training_completed_job.name,
            "source_conversion_job": conversion_completed_job.name,
        }
    )

    candidate = Model(
        name=model_name,
        version=model_version,
        description=model_description,
        path=model_path,
        tags=model_tags,
    )

    # Re-running the notebook must not fail on an already-registered version:
    # look it up first and only create the model when the lookup finds nothing.
    try:
        already_registered = ml_client.models.get(name=model_name, version=model_version)
    except ResourceNotFoundError:
        return ml_client.models.create_or_update(candidate)
    return already_registered


# Register the converted model, or fetch it if this name/version already exists.
registered_model = register_production_model(
    ml_client=ml_client,
    model_name=MODEL_NAME,
    model_version=model_version,
    model_path=onnx_model_path,
    best_config=best_configuration,
    configs=configs,
    config_metadata=config_metadata,
)


In [None]:
def validate_registered_model(model: Model) -> None:
    """
    Check that a registered model carries the expected tags and path.

    Args:
        model: Registered model instance

    Raises:
        ValueError: If a required tag is missing, the "stage" tag is not
            PROD_STAGE, or the path is empty or does not end with ".onnx"
    """
    required_tags = ("stage", "backbone", "metric", "dataset_version")

    # Report the first required tag that is absent, in declaration order.
    first_missing = next((tag for tag in required_tags if tag not in model.tags), None)
    if first_missing is not None:
        raise ValueError(f"Registered model missing required tag: {first_missing}")

    if model.tags.get("stage") != PROD_STAGE:
        raise ValueError(f"Model stage must be '{PROD_STAGE}', got: {model.tags.get('stage')}")

    if not (model.path and model.path.endswith(".onnx")):
        raise ValueError(f"Invalid model path: {model.path}")


# Raise if the registered model lacks required tags, stage, or a valid path.
validate_registered_model(registered_model)


In [None]:
# Summarize the registered model from the attributes and tags of the returned
# Model object. Tags are read with .get(), so a missing tag prints as None.
print(f"Model Registered Successfully:")
print(f"  Name: {registered_model.name}")
print(f"  Version: {registered_model.version}")
print(f"  Path: {registered_model.path}")
print(f"  Stage: {registered_model.tags.get('stage')}")
print(f"  Backbone: {registered_model.tags.get('backbone')}")
print(f"  Metric: {registered_model.tags.get('metric')} = {registered_model.tags.get('metric_value')}")