In [1]:
# Enable IPython's autoreload extension in mode 2, which re-imports all
# modules before each cell executes, so edits to imported project modules
# (e.g. the local `lib` package used below) take effect without a kernel restart.
%load_ext autoreload
%autoreload 2

In [2]:
%%html
<style>
/* Render the ipywidget output background as transparent. */
.cell-output-ipywidget-background {
    background-color: transparent !important;
}
/* Point the Jupyter widget color and font-size variables at the
   corresponding VS Code editor theme variables. */
:root {
    --jp-widgets-color: var(--vscode-editor-foreground);
    --jp-widgets-font-size: var(--vscode-editor-font-size);
}  
</style>

In [3]:
import asyncio
from itertools import cycle, islice
from lib import models
from lib.grpo import GRPO
from lib.nyt_connections import get_connections_games, get_connections_tasks
from lib.pack import packed_tensors_from_tokenized_results, plot_packed_tensors
from lib.recipe import ComponentConfig, TuneRecipeConfig
from lib.tasks import ChatCompletionParams, get_task_results
from lib.tokenize import TaskResultTokenizer
from lib.tune import clear_iteration_dirs, get_iteration, last_tune_log, tune, Verbosity
from lib.vllm import start_vllm, kill_vllm_workers
import polars as pl
import random
import torch
from transformers import AutoTokenizer
import wandb

# Start (or resume) the Weights & Biases run; the run name doubles as the run id.
run_name = "017"
wandb_init_kwargs = {
    "project": "grpo-tests",
    "name": run_name,
    "id": run_name,
    "resume": "allow",
    "config": {"task": "nyt-connections"},
}
run = wandb.init(**wandb_init_kwargs)

# Materialize twice as many tasks as there are games.
games = get_connections_games()
task_stream = get_connections_tasks(games, parse_answers_liberally=False)
num_games = len(games)
tasks = list(islice(task_stream, 2 * num_games))

# Split boundaries: the first `num_games` tasks are carved into
# distill / val / test, and train is taken from the tasks that follow them.
distill_end = 436
val_end = 508
train_count = 436
distill_tasks = tasks[:distill_end]
val_tasks = tasks[distill_end:val_end]
test_tasks = tasks[val_end:num_games]
train_tasks = tasks[num_games : num_games + train_count]

# Fixed seed so the shuffled training order is reproducible.
random.seed(42)
random.shuffle(train_tasks)

# Cell output: the size of each split.
tuple(map(len, (distill_tasks, val_tasks, test_tasks, train_tasks)))

[34m[1mwandb[0m: Currently logged in as: [33mbradhilton[0m to [32mhttps://api.wandb.ai[0m. Use [1m`wandb login --relogin`[0m to force relogin
[34m[1mwandb[0m: Using wandb-core as the SDK backend.  Please refer to https://wandb.me/wandb-core for more information.


(436, 72, 92, 436)

In [4]:
# GRPO loss hyperparameters (each one is also recorded in the W&B run config)
clip_epsilon = 0.2
wandb.config["clip_epsilon"] = clip_epsilon
entropy_coef = 0.0
wandb.config["entropy_coef"] = entropy_coef
kl_coef = 0.0
wandb.config["kl_coef"] = kl_coef

# Expected completion tokens per task sample
expected_tokens = 4000

# Optimizer learning rate
lr = 5e-6
wandb.config["lr"] = lr

# Model definition; `model_name` starts out as the base model
model = models.llama_70b()
model_name = model.base_model
wandb.config["model"] = model_name

# Run length and checkpoint location
num_iterations = 10
output_dir = f"./models/{run_name}"

# Sampling / packing settings
samples_per_task = 64
wandb.config["samples_per_task"] = samples_per_task
seq_len = 16384
wandb.config["seq_len"] = seq_len
# `stride` scales the per-iteration offset into the training task list
stride = 8
wandb.config["stride"] = stride
# Number of training tasks sampled in one iteration
tasks_per_iter = 8
wandb.config["tasks_per_iter"] = tasks_per_iter

# Tokenizer of the base model, plus the logging level used by the helpers
tokenizer = AutoTokenizer.from_pretrained(model.base_model)
verbosity: Verbosity = 2

tokenizer_config.json:   0%|          | 0.00/3.07k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/9.08M [00:00<?, ?B/s]

In [5]:
vllm = await start_vllm(
    model_name,
    max_concurrent_requests=512,
    named_arguments=dict(
        block_size=32,
        disable_log_requests=True,
        enable_prefix_caching=True,
        enforce_eager=True,
        gpu_memory_utilization=0.95,
        max_model_len=16384,
        max_num_seqs=512,
        max_num_batched_tokens=16384,
        num_scheduler_steps=8,
        preemption_mode="swap",
        return_tokens_as_token_ids=True,
        swap_space=80,
        tensor_parallel_size=torch.cuda.device_count(),
    ),
    timeout=180 + 15 * torch.cuda.device_count(),
    verbosity=verbosity,
)

$ vllm serve deepseek-ai/DeepSeek-R1-Distill-Llama-70B --block-size=32 --disable-log-requests --enable-prefix-caching --enforce-eager --gpu-memory-utilization=0.95 --max-model-len=16384 --max-num-seqs=512 --max-num-batched-tokens=16384 --num-scheduler-steps=8 --preemption-mode=swap --return-tokens-as-token-ids --swap-space=80 --tensor-parallel-size=8 --port=8000 --api-key=default
INFO 02-19 19:06:27 __init__.py:190] Automatically detected platform cuda.
INFO 02-19 19:06:29 api_server.py:840] vLLM API server version 0.7.2
INFO 02-19 19:06:29 api_server.py:841] args: Namespace(subparser='serve', model_tag='deepseek-ai/DeepSeek-R1-Distill-Llama-70B', config='', host=None, port=8000, uvicorn_log_level='info', allow_credentials=False, allowed_origins=['*'], allowed_methods=['*'], allowed_headers=['*'], api_key='default', lora_modules=None, prompt_adapters=None, chat_template=None, chat_template_content_format='auto', response_role='assistant', ssl_keyfile=None, ssl_certfile=None, ssl_ca_cer

Traceback (most recent call last):
  File "/home/ubuntu/sky_workdir/.venv/bin/vllm", line 10, in <module>
    sys.exit(main())
             ^^^^^^
  File "/home/ubuntu/sky_workdir/.venv/lib/python3.12/site-packages/vllm/scripts.py", line 204, in main
    args.dispatch_function(args)
  File "/home/ubuntu/sky_workdir/.venv/lib/python3.12/site-packages/vllm/scripts.py", line 44, in serve
    uvloop.run(run_server(args))
  File "/home/ubuntu/sky_workdir/.venv/lib/python3.12/site-packages/uvloop/__init__.py", line 109, in run
    return __asyncio.run(
           ^^^^^^^^^^^^^^
  File "/home/ubuntu/.local/share/uv/python/cpython-3.12.9-linux-x86_64-gnu/lib/python3.12/asyncio/runners.py", line 195, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/home/ubuntu/.local/share/uv/python/cpython-3.12.9-linux-x86_64-gnu/lib/python3.12/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "uvloo

CancelledError: 

In [None]:
i = 1
semaphore = asyncio.Semaphore(int(1.2 * vllm.max_concurrent_tokens / expected_tokens))
offset = (i - 1) * stride
(_, val_stats), (train_results, train_stats) = await asyncio.gather(
    get_task_results(
        tasks=val_tasks,
        client=vllm.client,
        model=vllm.model,
        cache=False,
        log_results=8,
        params=ChatCompletionParams(
            stream_options={
                "include_usage": True,
            },
        ),
        pbar_desc="val",
        semaphore=semaphore,
    ),
    get_task_results(
        tasks=list(islice(cycle(train_tasks), offset, offset + tasks_per_iter)),
        client=vllm.client,
        model=vllm.model,
        cache=False,
        log_results=False,
        n=samples_per_task,
        params=ChatCompletionParams(
            stream_options={
                "include_usage": True,
            },
        ),
        pbar_desc="train",
        semaphore=semaphore,
        transform=TaskResultTokenizer(tokenizer),
    ),
)

In [15]:
for i in range(get_iteration(output_dir), num_iterations + 1):

    
    assert val_stats.grades > 0
    assert val_stats.usages > 0
    wandb_data = {
        "iteration": i,
        "exceptions": val_stats.exceptions + train_stats.exceptions,
        "reward": val_stats.total_reward / val_stats.grades,
        "tokens": val_stats.completion_tokens / val_stats.usages,
    }
    try:
        wandb_data.update(
            pl.DataFrame(last_tune_log(output_dir)).drop("step").mean().to_dicts()[0]
        )
    except Exception:
        pass
    wandb.log(wandb_data)
    expected_tokens = round(wandb_data["tokens"])
    vllm.process.terminate()
    kill_vllm_workers()
    try:
        best_iteration = (
            wandb.Api()
            .run(f"{run.entity}/{run.project}/{run.id}")
            .history()
            .sort_values(by="reward")["iteration"]
            .iloc[-1]
        )
        clear_iteration_dirs(output_dir, [best_iteration, get_iteration(output_dir) - 1])
    except Exception:
        pass
    packed_tensors = packed_tensors_from_tokenized_results(
        [
            result
            for results in train_results
            for result in results
            if result.advantage != 0
        ],
        seq_len=seq_len,
        pad_token_id=tokenizer.pad_token_id,  # type: ignore
    )
    if verbosity == 2:
        plot_packed_tensors(packed_tensors)
    else:
        print(f"Packed tensors into {packed_tensors["tokens"].size()} shape")
    optimizer_config = ComponentConfig(model.tune_optimizer, lr=lr)
    if model.tune_optimizer == "torch.optim.AdamW":
        optimizer_config.fused = True
    model_name = await tune(
        base_model=model.base_model,
        output_dir=output_dir,
        packed_tensors=packed_tensors,
        model=model.tune_model,
        model_type=model.tune_model_type,
        config=TuneRecipeConfig(
            optimizer=optimizer_config,
            loss=ComponentConfig(
                GRPO,
                clip_epsilon=clip_epsilon,
                entropy_coef=entropy_coef,
                kl_coef=kl_coef,
            ),
            shuffle=True,
            batch_size=model.tune_max_batch_tokens // seq_len,
            fsdp_cpu_offload=model.tune_fsdp_cpu_offload,
            enable_activation_checkpointing=True,
            enable_activation_offloading=True,
            custom_sharded_layers=["tok_embeddings", "output"],
            num_output_chunks=8,
        ),
        verbosity=verbosity,
    )
wandb.finish()

$ tune run --nproc-per-node=4 lib.recipe.TuneRecipe --config ./models/016/config.yaml
Running with torchrun...


W0219 16:51:15.158000 27072 torch/distributed/run.py:793] 
W0219 16:51:15.158000 27072 torch/distributed/run.py:793] *****************************************
W0219 16:51:15.158000 27072 torch/distributed/run.py:793] Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed. 
W0219 16:51:15.158000 27072 torch/distributed/run.py:793] *****************************************
INFO:torchtune.utils._logging:Set intra op parallelism no. of threads to 26
INFO:torchtune.utils._logging:Set intra op parallelism no. of threads to 26
INFO:torchtune.utils._logging:Set intra op parallelism no. of threads to 26
INFO:torchtune.utils._logging:Set intra op parallelism no. of threads to 26
INFO:torchtune.utils._logging:Running FullFinetuneRecipe with resolved config:

batch_size: 3
checkpointer:
  _component_: torchtune.training.checkpointing._checkpointer

Writing logs to models/016/logs/log_1739983880.txt


INFO:torchtune.utils._logging:FSDP is enabled. Instantiating model and loading checkpoint on Rank 0 ...
INFO:torchtune.utils._logging:Instantiating model and loading checkpoint took 29.26 secs
INFO:torchtune.utils._logging:Memory stats after model init:
	GPU peak memory allocation: 7.81 GiB
	GPU peak memory reserved: 8.04 GiB
	GPU peak memory active: 7.81 GiB
INFO:torchtune.utils._logging:Optimizer is initialized.
INFO:torchtune.utils._logging:Loss is initialized.
INFO:torchtune.utils._logging:Dataset and Sampler are initialized.
INFO:torchtune.utils._logging: Profiler config after instantiation: {'enabled': False}
  File "<frozen runpy>", line 198, in _run_module_as_main
  File "<frozen runpy>", line 88, in _run_code
  File "/home/gcpuser/sky_workdir/experiments/lib/recipe.py", line 1322, in <module>
    sys.exit(config.parse(recipe_main)())  # type: ignore
  File "/home/gcpuser/sky_workdir/.venv/lib/python3.12/site-packages/torchtune/config/_parse.py", line 99, in wrapper
    sys.exi

AssertionError: No model checkpoint files found to save in output directory ./models/016