In [None]:
# pip install --upgrade azureml-core
# pip install --upgrade azureml-sdk[notebooks,contrib]
# pip install azure-ai-ml
# pip install azure-identity
from azureml.core import Workspace, Dataset
from azureml.core import ScriptRunConfig, Environment, Experiment, Workspace, Dataset, Model
from azureml.core.runconfig import PyTorchConfiguration
from azureml.core.conda_dependencies import CondaDependencies
from azureml.data import OutputFileDatasetConfig
from azureml.core.container_registry import ContainerRegistry
import json
import os
import shutil
from tqdm import tqdm
from multiprocessing import Pool
import re
import pandas as pd
from azure.ai.ml import command, PyTorchDistribution
from azure.ai.ml.entities import JupyterLabJobService, SshJobService, TensorBoardJobService, VsCodeJobService
from azure.ai.ml.entities._assets.environment import Environment
from azure.identity import DefaultAzureCredential, ManagedIdentityCredential, AzureCliCredential
from azure.ai.ml import MLClient, Input, Output
from azure.ai.ml.constants import AssetTypes, InputOutputModes
from textwrap import dedent

In [None]:
with open('/home/t-ziyanwang/.bashrc', 'r') as f:
    bashrc = f.read() 
# Get WANDB_TOKEN and HF_TOKEN from bashrc
wandb_token = re.search(r'export WANDB_TOKEN=(.*)', bashrc)
hf_token = re.search(r'export HF_TOKEN=(.*)', bashrc)
if wandb_token:
    wandb_token = wandb_token.group(1).strip().strip('"')
if hf_token:
    hf_token = hf_token.group(1).strip().strip('"')
# print(f'WANDB_TOKEN: {wandb_token}')
# print(f'HF_TOKEN: {hf_token}')

In [None]:
def _mask_secret(value, visible=4):
    """Return a redacted form of ``value`` that is safe to leave in notebook output.

    Shows only the last ``visible`` characters (enough to tell tokens apart),
    hides short values entirely, and reports unset values explicitly.
    """
    if not value:
        return '<not set>'
    if len(value) <= 2 * visible:
        return '*' * 8
    return '*' * 8 + value[-visible:]


# Never print raw credentials: cell output is saved with the notebook.
print("WANDB_TOKEN:", _mask_secret(wandb_token))
print("HF_TOKEN:", _mask_secret(hf_token))

In [None]:
import os
import posixpath
os.chdir(os.path.expanduser('~/intern/ma_dagger'))
print('CWD set to', os.getcwd())

online_env_name = "vllm-openai-0-9-1-custom"

# Each entry: (virtual cluster, instance type, SLA tier, priority, gpu_num).
# gpu_num is presumably GPUs per node; it is not used below because the job
# detects the GPU count at runtime with nvidia-smi.
candidate_vcs = [
    # ("msrresrchvc", "Singularity.NC96ad_A100_v4", "Premium", "High", 4),
    # ("msrresrchvc", "Singularity.ND48_v4", "Premium", "High", 4),
    ("msrresrchvc", "Singularity.ND96amrs_A100_v4", "Basic", "High", 8),
    ("msrresrchbasicvc", "Singularity.NC96ad_A100_v4", "Standard", "High", 8),
    ("msrresrchbasicvc", "Singularity.ND96amrs_A100_v4", "Basic", "High", 8),
    ("msrresrchbasicvc", "Singularity.ND96_v4", "Basic", "High", 8),
    ("msrresrchbasicvc", "Singularity.ND96_H100_v5", "Basic", "High", 8),
    ("msrresrchvc", "Singularity.ND48_H100_v5", "Premium", "High", 8),
]

# (short name used in run ids / paths, Hugging Face model id)
candidate_models = [
    ("qwen3-4b", "Qwen/Qwen3-4B"),
]

# (short name passed as --dataset_name, HF dataset id, value passed as --batch_tasks)
candidate_datasets = [
    ("aimo", "AI-MO/NuminaMath-CoT", 8),
]

output_base = 'azureml://subscriptions/d4fe558f-6660-4fe7-99ec-ae4716b5e03f/resourcegroups/aifrontiers/workspaces/aifrontiers_ws/datastores/ziyanwang_data/paths/'
num_shards = len(candidate_vcs)
experiment_name = 'ma_dagger'
node_count = 1

# AzureML binding expressions. They are kept in plain (non-f) strings on
# purpose: inside an f-string `${{...}}` collapses to `${...}`, which AzureML
# does not substitute and bash rejects as a "bad substitution".
OUTPUT_DIR_REF = "${{outputs.output_dir}}"
HF_TOKEN_REF = "${{inputs.hf_token}}"
WANDB_TOKEN_REF = "${{inputs.wandb_token}}"

# Both tokens are passed to the job as inputs; fail before submitting anything
# rather than sending None.
if not wandb_token or not hf_token:
    raise ValueError("WANDB_TOKEN and HF_TOKEN must be set (e.g. exported in ~/.bashrc) before submitting jobs.")

# (virtual clusters, subscription id, resource group) for each Singularity scope.
_VC_GROUPS = [
    (
        ["dell1",
         *(f"kings{i:02d}" for i in range(1, 13)),
         *(f"mckinley{i:02d}" for i in range(1, 9)),
         *(f"barlow{i:02d}" for i in range(1, 10)),
         "msrresrchlab"],
        "156138e5-a3b1-48af-9e2e-883f4df3f457", "gcr-singularity-lab",
    ),
    (
        [*(f"baltic{i:02d}" for i in range(1, 13)),
         *(f"huashanvc{i}" for i in range(1, 5)),
         "msrresrchbasicvc"],
        "22da88f6-1210-4de2-a5a3-da4c7c2a1213", "gcr-singularity",
    ),
    (["msrresrchvc"], "22da88f6-1210-4de2-a5a3-da4c7c2a1213", "gcr-singularity-resrch"),
    (
        ["msroctovc", "whitney00", "whitney08", "palisades04", "whitney14", "msroctobasicvc"],
        "d4404794-ab5b-48de-b7c7-ec1fefb0a04e", "gcr-singularity-octo",
    ),
]
_VC_SCOPES = {vc_name: (sub, rg) for names, sub, rg in _VC_GROUPS for vc_name in names}


def _vc_compute_config(vc):
    """Return the ARM resource id of Singularity virtual cluster ``vc``.

    Raises:
        ValueError: if ``vc`` is not in the known-cluster table.
    """
    try:
        sub, rg = _VC_SCOPES[vc]
    except KeyError:
        raise ValueError(f"Unknown virtual cluster {vc}, please check the list of available VCs in the documentation.") from None
    return ("/subscriptions/" + sub + "/resourceGroups/" + rg
            + "/providers/Microsoft.MachineLearningServices/virtualclusters/" + vc)


def _build_job_command(model_full_name, dataset_short_name, n, runid):
    """Build the shell command run inside the AzureML job.

    Secrets are referenced through AzureML input bindings and never inlined.
    GPU split and tensor parallelism are chosen at runtime from nvidia-smi.
    """
    parts = [
        f'export WANDB_API_KEY={WANDB_TOKEN_REF} && export WANDB_TOKEN={WANDB_TOKEN_REF} && export WANDB_ENTITY=kcl_coopai && export WANDB_PROJECT=madagger &&',
        f'huggingface-cli login --token {HF_TOKEN_REF} &&',
        'wandb login ${WANDB_API_KEY} --host https://api.wandb.ai &&',
        'free -h &&',
        'pip3 install -r requirements.txt && pip3 install -e ./rllm && pip3 install -e ./collect_traces/openai-python &&',
        'export GPU_COUNT=$(nvidia-smi -L | wc -l) && '
        'if [ "$GPU_COUNT" -ge 8 ]; then GEN_CUDA="0,1,2,3"; VER_CUDA="4,5,6,7"; TP_PER_STUDENT=4; '
        'elif [ "$GPU_COUNT" -ge 4 ]; then GEN_CUDA="0,1"; VER_CUDA="2,3"; TP_PER_STUDENT=2; '
        'else GEN_CUDA="0"; VER_CUDA="1"; TP_PER_STUDENT=1; fi &&',
        'echo "gen gpus=$GEN_CUDA ver gpus=$VER_CUDA tp=$TP_PER_STUDENT" &&',
        # WANDB_* are already exported above in the same shell.
        f"mkdir -p {OUTPUT_DIR_REF} && VLLM_WORKER_MULTIPROC_METHOD=spawn "
        f"python3 gen_ver_dagger_fullft_vllm.py iterate "
        f"--dataset_name {dataset_short_name} --split train --batch_tasks {n} "
        f"--teacher_backend triapi --teacher_triapi_instance gcr/shared "
        f"--teacher_triapi_deployment gpt-5.1-chat_2025-11-13 "
        f"--teacher_triapi_scope api://trapi/.default --teacher_triapi_api_version 2024-12-01-preview "
        f"--teacher_triapi_max_parallel 64 --teacher_triapi_timeout 300 "
        f"--gen_base {model_full_name} --ver_base {model_full_name} "
        f"--gen_tokenizer {model_full_name} --ver_tokenizer {model_full_name} "
        f"--tp_s $TP_PER_STUDENT --tp_t 1 --gen_cuda $GEN_CUDA --ver_cuda $VER_CUDA "
        f"--parallel 64 --out_dir {OUTPUT_DIR_REF} --project_name madagger --experiment_name {runid}",
    ]
    return ' '.join(parts)


# Resolve every VC up front so an unknown cluster fails before any job is submitted.
compute_configs = {vc: _vc_compute_config(vc) for vc, *_ in candidate_vcs}

# The workspace client and environment are the same for every job; create them once.
subscription_id = os.getenv("SUBSCRIPTION_ID", default="d4fe558f-6660-4fe7-99ec-ae4716b5e03f")
resource_group = os.getenv("RESOURCEGROUP_NAME", default="aifrontiers")
workspace_name = os.getenv("WORKSPACE_NAME", default="aifrontiers_ws")
ml_client = MLClient(AzureCliCredential(), subscription_id, resource_group, workspace_name)
print(ml_client)
env = ml_client.environments.get(name=online_env_name, version="1")

# NOTE(review): a WandB API key used to be hard-coded in this command (and echoed
# by print(job.command)); treat it as leaked and rotate it.
# NOTE(review): shard_id/num_shards only appear in tags/paths; nothing here
# passes them to gen_ver_dagger_fullft_vllm.py, so unless the script shards by
# itself every VC job may process the same tasks -- confirm.
for shard_id, (vc, instance_type, sla_tier, priority, gpu_num) in enumerate(candidate_vcs):
    for model_short_name, model_full_name in candidate_models:
        for dataset_short_name, dataset_full_name, n in candidate_datasets:
            run_base = f"collect_data_{model_short_name}_{dataset_short_name}"
            vc_tag = f"{vc}-{instance_type}".replace('.', '-')
            shard_tag = f"shard{shard_id:02d}"
            runid = f"p1-madagger-{dataset_short_name}-n{n}-{model_short_name}-{vc_tag}-{shard_tag}"
            remote_subdir = posixpath.join(run_base, f"{vc_tag}_{shard_tag}")

            # Literal inputs, consumed by the ${{inputs.*}} bindings in the command.
            # They are recorded in the job definition, so anyone who can view the
            # job can read them.
            inputs = {
                'wandb_token': wandb_token,
                'hf_token': hf_token,
            }
            output_dir_path = f"{output_base.rstrip('/')}/{remote_subdir}"
            outputs = {
                'output_dir': Output(type=AssetTypes.URI_FOLDER, path=output_dir_path, mode=InputOutputModes.RW_MOUNT)
            }
            print(vc)
            print(outputs)

            vc_config = {
                "instance_type": instance_type,
                "instance_count": node_count,
                "properties": {
                    "AISuperComputer": {
                        "interactive": False,
                        "slaTier": sla_tier,
                        "priority": priority,
                        "tensorboardLogDirectory": "/scratch/tensorboard_logs",
                    }
                }
            }

            job = command(
                code='.',
                command=_build_job_command(model_full_name, dataset_short_name, n, runid),
                inputs=inputs,
                outputs=outputs,
                environment=env,
                environment_variables={
                    'JOB_EXECUTION_MODE': "basic",
                    'AZUREML_COMPUTE_USE_COMMON_RUNTIME': 'true',
                    '_AZUREML_SINGULARITY_JOB_UAI': '/subscriptions/d4fe558f-6660-4fe7-99ec-ae4716b5e03f/resourcegroups/aifrontiers/providers/Microsoft.ManagedIdentity/userAssignedIdentities/aifrontiers',
                },
                compute=compute_configs[vc],
                resources=vc_config,
                instance_count=node_count,
                display_name=runid,
                experiment_name=experiment_name,
                distribution={
                    "type": "PyTorch",
                },
            )

            print(job.command)

            returned_job = ml_client.jobs.create_or_update(job)
            print(f"Job URL: {returned_job.studio_url}")


In [None]:
# Merge shard outputs into a single dataset after jobs finish.
# Assumes the job outputs have already been downloaded locally under
# collect_traces/collect_traces/runs/ (paths are relative to the current CWD).
import subprocess
import sys
from pathlib import Path

# Must match the model/dataset short names used when the jobs were submitted:
# the submission cell writes to collect_data_<model_short_name>_<dataset_short_name>,
# and its candidate_models entry is "qwen3-4b".
model_short_name = "qwen3-4b"
dataset_short_name = "aimo"
run_base_dir = Path("collect_traces/collect_traces/runs") / f"collect_data_{model_short_name}_{dataset_short_name}"

if not run_base_dir.exists():
    raise FileNotFoundError(f"{run_base_dir} not found. Download job outputs locally before merging.")

merged_output_root = Path("collect_traces/datasets")
merged_output_root.mkdir(parents=True, exist_ok=True)

# Use the kernel's own interpreter rather than whatever `python` is first on PATH.
merge_cmd = [sys.executable, "collect_traces/build_dataset.py",
             "--gen_data_dir", str(run_base_dir),
             "--output_path", str(merged_output_root),
             "--keep", "firstfull",
             "--recursive"]
print("Running:", ' '.join(merge_cmd))
subprocess.run(merge_cmd, check=True)
