In [None]:
%%capture
import os
!pip install weave==0.50.0

!uv pip install openpipe-art[backend]==0.4.11  --prerelease allow --no-cache-dir

try:
    import numpy

    get_numpy = f"numpy=={numpy.__version__}"
except:
    get_numpy = "numpy"
try:
    import subprocess

    is_t4 = "Tesla T4" in str(subprocess.check_output(["nvidia-smi"]))
except:
    is_t4 = False
get_vllm, get_triton = (
    ("vllm==0.9.2", "triton==3.2.0") if is_t4 else ("vllm", "triton")
)
!uv pip install --upgrade \
    openpipe-art[backend]==0.4.11 litellm[proxy] "protobuf==5.29.5" {get_vllm} {get_numpy} --prerelease allow --no-cache-dir
!uv pip install -qqq {get_triton}

In [None]:
import os

# SECURITY: never paste real API keys into this notebook. A key saved in a
# cell ends up in version control and in every shared copy. Any key that was
# previously hard-coded here must be treated as compromised and rotated.
# Export the variables in the environment before starting the kernel instead.

# Required - Used for generating training inputs and RULER evaluation
OPENROUTER_API_KEY = os.environ.get("OPENROUTER_API_KEY", "")

# Optional - Enables metric logging
WANDB_API_KEY = os.environ.get("WANDB_API_KEY", "")

# Describe your custom task (be specific!)
MY_TASK_DESCRIPTION = """
Read the user's text and rewrite it in a different style while preserving the original meaning.
The style should be changed according to the instructions (e.g., sarcastic, poetic, formal, casual, Gen-Z slang).

For example, if the user's text is "I had a good day." and the style requested is sarcastic,
the output could be:

"Oh yeah, because absolutely nothing screams excitement like eating instant noodles alone all day."

If the style requested is poetic, the output could be:

"The sun caressed my weary soul, and joy bloomed quietly within."
"""

TASK_DESCRIPTION = MY_TASK_DESCRIPTION  # See more tasks in Advanced Settings

# Choose the base model to train
BASE_MODEL = "Qwen/Qwen2.5-1.5B-Instruct"  # Options: "Qwen/Qwen2.5-3B-Instruct", "Qwen/Qwen2.5-7B-Instruct", etc.

In [None]:
# Run identity: where checkpoints and tracked metrics are filed
PROJECT_NAME = "auto-rl"  # Project name for tracking
MODEL_NAME = "rl-model-002"  # Name for your trained model

# Knobs for the RL loop, read by the data-generation and training cells
TRAINING_CONFIG = dict(
    num_training_inputs=25,  # Number of training inputs to generate
    groups_per_step=3,  # Inputs to process per training step
    num_epochs=3,  # Number of times through all data
    rollouts_per_group=4,  # Different responses per input (for RULER comparison)
    learning_rate=5e-6,  # Learning rate
    max_training_steps=1500,  # Maximum training steps (set to None for no limit)
)

# Size of the held-out set generated after training
NUM_TEST_INPUTS = 5

# Hosted models used for judging and for synthetic data
RULER_MODEL = "openrouter/deepseek/deepseek-r1-0528"  # Model for RULER evaluation
INPUT_GENERATION_MODEL = "openrouter/moonshotai/kimi-k2"
SYSTEM_PROMPT_GENERATION_MODEL = "openrouter/moonshotai/kimi-k2"

# GPU configuration (for T4 — keep these as-is unless you have a reason to change them)
GPU_MEMORY_UTILIZATION = 0.7  # GPU memory usage (0.0-1.0)
MAX_SEQ_LENGTH = 4096  # Maximum sequence length

In [None]:
import asyncio
import os
import random
from typing import List

import torch
import weave
from dotenv import load_dotenv
from litellm import acompletion
from pydantic import BaseModel, Field

import art
from art.local import LocalBackend
from art.rewards import ruler_score_group
from art.utils import iterate_dataset
from art.utils.litellm import convert_litellm_choice_to_openai

# Load variables from a local .env file (if any) before resolving the keys,
# so that a key supplied there is actually honoured below.
load_dotenv()

# Required: prefer the notebook-level constant, otherwise fall back to the
# process environment (which now includes whatever load_dotenv() picked up).
_openrouter_key = OPENROUTER_API_KEY or os.getenv("OPENROUTER_API_KEY", "")
if not _openrouter_key:
    raise ValueError(
        "OPENROUTER_API_KEY is required for data generation and RULER evaluation."
    )
os.environ["OPENROUTER_API_KEY"] = _openrouter_key

# Optional: same resolution order. The message is printed only when no key is
# available from either source, so it matches the later WANDB_API_KEY check.
_wandb_key = WANDB_API_KEY or os.getenv("WANDB_API_KEY", "")
if _wandb_key:
    os.environ["WANDB_API_KEY"] = _wandb_key
else:
    print("WANDB_API_KEY is not set. We'll skip logging metrics to Weights & Biases.")


# One generated example. Documented with `#` comments rather than a class
# docstring on purpose: this model is nested in TrainingDataset, which is
# handed to the LLM as `response_format`, and a docstring would change the
# schema that gets sent.
class TrainingInput(BaseModel):
    input: str = Field(description="The input text for the task")


# Structured-output envelope: passed as `response_format` to acompletion() in
# generate_training_inputs() and used to validate the JSON the model returns.
# Kept docstring-free so the emitted schema stays exactly as declared.
class TrainingDataset(BaseModel):
    inputs: List[TrainingInput] = Field(description="List of training inputs")


async def generate_training_inputs(
    task_description: str, num_examples: int = 50, max_attempts: int = 5
) -> List[str]:
    """Generate diverse training inputs for the given task.

    Calls INPUT_GENERATION_MODEL with a structured-output request
    (TrainingDataset) and collects the returned inputs. Results are
    accumulated across attempts, so a response that comes back short still
    contributes, and exact duplicates are dropped.

    Args:
        task_description: Natural-language description of the task.
        num_examples: Number of inputs to return.
        max_attempts: Maximum number of generation requests to issue.

    Returns:
        Exactly ``num_examples`` distinct input strings, in generation order.

    Raises:
        ValueError: If fewer than ``num_examples`` distinct inputs were
            collected after ``max_attempts`` requests.
    """

    system_prompt = f"""You are a helpful assistant that generates diverse, high-quality training inputs.

Task: {task_description}

Generate {num_examples} diverse INPUT examples that someone might provide for this task.
Make sure the inputs:
1. Cover a wide range of cases and edge cases
2. Are realistic and practical
3. Vary in length and complexity
4. Represent real-world scenarios

Only generate the INPUTS, not the outputs. RULER will evaluate the model's attempts automatically.
"""

    messages = [
        {"role": "system", "content": system_prompt},
        {
            "role": "user",
            "content": f"Generate {num_examples} input examples for the task described above. Return them in the form of a list.",
        },
    ]

    print(f"Generating {num_examples} training inputs...")

    collected: List[str] = []
    seen = set()

    for _ in range(max_attempts):
        if len(collected) >= num_examples:
            break

        response = await acompletion(
            model=INPUT_GENERATION_MODEL,
            messages=messages,
            response_format=TrainingDataset,
            temperature=1.0,
        )

        # A malformed or empty payload should cost one attempt, not abort the
        # whole run. pydantic's ValidationError derives from ValueError;
        # TypeError covers a missing (None) message content.
        try:
            dataset = TrainingDataset.model_validate_json(
                response.choices[0].message.content
            )
        except (ValueError, TypeError) as exc:
            print(f"Discarding malformed generation response: {exc}")
            continue

        # Keep what earlier attempts produced and skip exact repeats.
        for example in dataset.inputs:
            if example.input not in seen:
                seen.add(example.input)
                collected.append(example.input)

    if len(collected) < num_examples:
        raise ValueError(f"Failed to generate {num_examples} training inputs.")

    # The model may return more than requested; honour the configured size.
    return collected[:num_examples]


# Generate training inputs
training_inputs = await generate_training_inputs(
    TASK_DESCRIPTION, num_examples=TRAINING_CONFIG["num_training_inputs"]
)
print(f"\nGenerated {len(training_inputs)} training inputs!")
print("\nFirst 5 examples:")
for i, input_text in enumerate(training_inputs[:5]):
    print(f"\nExample {i + 1}: {input_text}")

# =========== Model Creation Code ===========

random.seed(42)

# Declare the model
model = art.TrainableModel(
    name=MODEL_NAME,
    project=PROJECT_NAME,
    base_model=BASE_MODEL,
)

# To run on a T4, we need to override some config defaults.
if torch.cuda.get_device_properties(0).major < 8:
    model._internal_config = art.dev.InternalModelConfig(
        init_args=art.dev.InitArgs(
            max_seq_length=MAX_SEQ_LENGTH,
        ),
        engine_args=art.dev.EngineArgs(
            enforce_eager=True,
            gpu_memory_utilization=GPU_MEMORY_UTILIZATION,
        ),
    )

# Initialize the server
if torch.cuda.get_device_properties(0).major < 8:
    backend = LocalBackend(
        in_process=True,
        path="./.art",
    )
else:
    backend = LocalBackend()

# Register the model with the local Backend
await model.register(backend)

print("Model created!")
print("Base model:", BASE_MODEL)
print("Model name:", MODEL_NAME)
print("Project name:", PROJECT_NAME)

# ============ Rollout Function Code =============


# Weave tracing is switched on only when a W&B key is present in the environment
wandb_key_present = bool(os.getenv("WANDB_API_KEY", ""))
if wandb_key_present:
    weave.init(PROJECT_NAME, settings={"print_call_link": False})


# Generate a system prompt for the task
async def generate_system_prompt(task_description: str) -> str:
    """Ask SYSTEM_PROMPT_GENERATION_MODEL to write a system prompt for the task.

    Args:
        task_description: Natural-language description of the task.

    Returns:
        The model's reply with leading and trailing whitespace removed.
    """

    instruction = {
        "role": "system",
        "content": "Generate a clear, concise system prompt for a model that will perform the following task. The prompt should be direct and instructional.",
    }
    request = {
        "role": "user",
        "content": f"Task: {task_description}\n\nGenerate a system prompt for this task.",
    }

    completion = await acompletion(
        model=SYSTEM_PROMPT_GENERATION_MODEL,
        messages=[instruction, request],
        temperature=0.3,
    )

    generated_text = completion.choices[0].message.content
    return generated_text.strip()


# Generated once when this cell runs; rollout() reads this module-level value
# as the system message of every conversation it builds.
SYSTEM_PROMPT = await generate_system_prompt(TASK_DESCRIPTION)
print(f"Generated system prompt:\n\n{SYSTEM_PROMPT}")


# One unit of work for rollout(): the text to send to the model plus the
# training step it is sampled for (both are copied into trajectory metadata).
class TaskInput(BaseModel):
    step: int  # set to batch.step by the training loop before each rollout
    input_text: str  # sent verbatim as the user message


@weave.op
async def rollout(model: art.Model, task_input: TaskInput) -> art.Trajectory:
    """Run one system+user exchange against `model` and return it as a trajectory.

    The returned trajectory carries a placeholder reward of 0.0, the step and
    input text as metadata, and three entries: the system prompt, the user
    input, and the model's sampled choice.
    """

    # Conversation the model is asked to continue
    conversation = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": task_input.input_text},
    ]

    traj = art.Trajectory(
        reward=0.0,
        messages_and_choices=[],
        metadata={"step": task_input.step, "input": task_input.input_text},
    )
    traj.messages_and_choices = conversation

    # Trainable models are addressed through litellm's hosted_vllm/ prefix
    target_model = f"hosted_vllm/{model.name}" if model.trainable else model.name

    completion = await acompletion(
        model=target_model,
        base_url=model.inference_base_url,
        api_key=model.inference_api_key,
        temperature=0.7,
        messages=traj.messages(),
        caching=False,
    )

    # Record the sampled choice as the final entry of the trajectory
    sampled_choice = convert_litellm_choice_to_openai(completion.choices[0])
    traj.messages_and_choices.append(sampled_choice)

    return traj


print("\nRollout function defined!")


# Test RULER with example outputs for a text formalization task
test_input = "hey can u send me the report asap? thx"

# Shared prefix of every sample conversation
base_messages = [
    {"role": "system", "content": "Convert informal text to formal business language."},
    {"role": "user", "content": test_input},
]


def _sample_trajectory(assistant_reply: str) -> art.Trajectory:
    """Wrap one canned assistant reply in an unscored trajectory (reward=0)."""
    return art.Trajectory(
        messages_and_choices=[
            *base_messages,
            {"role": "assistant", "content": assistant_reply},
        ],
        reward=0,
    )


# Three replies of visibly decreasing quality for RULER to rank
good_trajectory = _sample_trajectory(
    "Could you please send me the report at your earliest convenience? Thank you."
)
mediocre_trajectory = _sample_trajectory("Can you send me the report soon? Thanks.")
bad_trajectory = _sample_trajectory("hey send report quick thx")

sample_group = art.TrajectoryGroup(
    trajectories=[good_trajectory, mediocre_trajectory, bad_trajectory]
)

# RULER will score these based on how well they accomplish the task
# Allow ten retries in case of API rate limiting
for i in range(10):
    try:
        judged_group = await ruler_score_group(sample_group, RULER_MODEL, debug=True)
        break
    except Exception as e:
        print(f"Error scoring group: {e}")
        continue

assert judged_group is not None

# Display rankings
sorted_trajectories = sorted(
    judged_group.trajectories, key=lambda t: t.reward, reverse=True
)
for rank, traj in enumerate(sorted_trajectories, 1):
    messages = traj.messages()
    print(f"\nRank {rank}: Score {traj.reward:.3f}")
    print(f"  Response: {messages[-1]['content']}")


# ============ Training Loop =============

# Convert training inputs to TaskInput objects
training_task_inputs = [TaskInput(step=0, input_text=inp) for inp in training_inputs]

# Create training iterator
training_iterator = iterate_dataset(
    training_task_inputs,
    groups_per_step=TRAINING_CONFIG["groups_per_step"],
    num_epochs=TRAINING_CONFIG["num_epochs"],
    initial_step=await model.get_step(),
)

print(f"Starting training with {len(training_task_inputs)} inputs...")
print(f"Training for {TRAINING_CONFIG['num_epochs']} epoch(s)")
print(
    f"Generating {TRAINING_CONFIG['rollouts_per_group']} responses per input for RULER to compare"
)
print(
    "\nWhy multiple responses? RULER needs to compare different attempts to learn what's good!"
)

for batch in training_iterator:
    print(
        f"\nTraining step {batch.step}, epoch {batch.epoch}, epoch step {batch.epoch_step}"
    )
    print(f"Batch contains {len(batch.items)} inputs")

    # Create trajectory groups for this batch
    groups = []
    for task_input in batch.items:
        # Update step number
        task_input.step = batch.step

        # Generate multiple responses for each input (RULER will compare these)
        groups.append(
            art.TrajectoryGroup(
                (
                    rollout(model, task_input)
                    for _ in range(TRAINING_CONFIG["rollouts_per_group"])
                )
            )
        )

    # Gather all trajectory groups
    finished_groups = await art.gather_trajectory_groups(
        groups,
        pbar_desc="Generating responses",
        max_exceptions=TRAINING_CONFIG["rollouts_per_group"] * len(batch.items),
    )

    # Use RULER to score each group
    judged_groups = []
    for group in finished_groups:
        # Allow ten retries in case of API rate limiting
        judged_group = None
        for i in range(10):
            try:
                judged_group = await ruler_score_group(group, RULER_MODEL, debug=False)
                break
            except Exception as e:
                print(f"Error scoring group: {e}")
                continue
        assert judged_group is not None
        judged_groups.append(judged_group)

    # Train on the scored trajectories
    await model.delete_checkpoints()
    await model.train(
        judged_groups,
        config=art.TrainConfig(learning_rate=TRAINING_CONFIG["learning_rate"]),
        _config={"logprob_calculation_chunk_size": 8},
    )

    print(f"Completed training step {batch.step}")

    # Stop after configured steps (if limit is set)
    if (
        TRAINING_CONFIG["max_training_steps"]
        and batch.step >= TRAINING_CONFIG["max_training_steps"]
    ):
        print(
            f"Reached maximum training steps ({TRAINING_CONFIG['max_training_steps']})"
        )
        break

print("\n✅ Training completed!")

Generating 25 training inputs...

Generated 25 training inputs!

First 5 examples:

Example 1: Could you turn this Slack thread about Friday's server outage into Shakespearean iambic pentameter, please?

Example 2: Make this customer complaint sound like a Victorian-era etiquette manual while keeping the core grievance intact.

Example 3: Please rewrite my LinkedIn update announcing a promotion as if it were a 2007 Facebook wall post from a scene kid named xXShadowXx.

Example 4: Translate this academic abstract on nanotechnology into Gen-Z slang that would make sense to someone who only speaks Twitch streamer.

Example 5: Turn this breakup text into a 1950s radio jingle while preserving the original message of 'it's not you, it's me'.


[34m[1mwandb[0m: Currently logged in as: [33mmehultotala[0m ([33mmehultotala-veermata-jeejabai-technological-institute[0m) to [32mhttps://api.wandb.ai[0m. Use [1m`wandb login --relogin`[0m to force relogin


INFO 09-03 05:15:10 [__init__.py:235] Automatically detected platform cuda.
ERROR 09-03 05:15:11 [fa_utils.py:57] Cannot use FA version 2 is not supported due to FA2 is only supported on devices with compute capability >= 8



Please restructure your imports with 'import unsloth' at the top of your file.
  import unsloth  # type: ignore



🦥 Unsloth: Will patch your computer to enable 2x faster free finetuning.
🦥 Unsloth Zoo will now patch everything to make training faster!
Unsloth: Patching vLLM v1 graph capture
Unsloth: Patching vLLM v0 graph capture
==((====))==  Unsloth 2025.8.6: Fast Qwen2 patching. Transformers: 4.53.2. vLLM: 0.10.0.
   \\   /|    Tesla T4. Num GPUs = 1. Max memory: 14.741 GB. Platform: Linux.
O^O/ \_/ \    Torch: 2.7.1+cu126. CUDA: 7.5. CUDA Toolkit: 12.6. Triton: 3.2.0
\        /    Bfloat16 = FALSE. FA [Xformers = 0.0.31. FA2 = False]
 "-____-"     Free license: http://github.com/unslothai/unsloth
Unsloth: Fast downloading is enabled - ignore downloading bars which are red colored!
Unsloth: vLLM loading unsloth/qwen2.5-1.5b-instruct-unsloth-bnb-4bit with actual GPU utilization = 78.25%
Unsloth: Your GPU has CUDA compute capability 7.5 with VRAM = 14.74 GB.
Unsloth: Using conservativeness = 1.0. Chunked prefill tokens = 4096. Num Sequences = 224.
Unsloth: vLLM's KV Cache can use up to 10.32 GB. 

Loading safetensors checkpoint shards:   0% Completed | 0/1 [00:00<?, ?it/s]


Loading safetensors checkpoint shards:   0% Completed | 0/1 [00:00<?, ?it/s]


INFO 09-03 05:15:48 [punica_selector.py:19] Using PunicaWrapperGPU.
INFO 09-03 05:15:49 [model_runner.py:1115] Model loading took 1.4698 GiB and 4.807380 seconds
INFO 09-03 05:15:54 [worker.py:295] Memory profiling takes 3.34 seconds
INFO 09-03 05:15:54 [worker.py:295] the current vLLM instance can use total_gpu_memory (14.74GiB) x gpu_memory_utilization (0.70) = 10.32GiB
INFO 09-03 05:15:54 [worker.py:295] model weights take 1.47GiB; non_torch_memory takes 0.03GiB; PyTorch activation peak memory takes 1.23GiB; the rest of the memory reserved for KV Cache is 7.60GiB.
INFO 09-03 05:15:54 [executor_base.py:113] # cuda blocks: 17779, # CPU blocks: 0
INFO 09-03 05:15:54 [executor_base.py:118] Maximum concurrency for 4096 tokens per request: 69.45x
INFO 09-03 05:15:54 [llm_engine.py:424] init engine (profile, create kv cache, warmup model) took 4.82 seconds
Unsloth: Just some info: will skip parsing ['pre_feedforward_layernorm', 'q_norm', 'k_norm', 'post_feedforward_layernorm']
Unsloth: Jus

Unsloth 2025.8.6 patched 28 layers with 28 QKV layers, 28 O layers and 28 MLP layers.
Unsloth: Already have LoRA adapters! We shall skip this step.


Model created!
Base model: Qwen/Qwen2.5-1.5B-Instruct
Model name: rl-model-002
Project name: auto-rl
Generated system prompt:

System Prompt:
You are a style-rewriter.  
1. Read the user’s original text.  
2. Read the requested style (e.g., sarcastic, poetic, formal, casual, Gen-Z slang, etc.).  
3. Rewrite the text so the style changes completely while every core idea of the original is still present.  
4. Output only the rewritten text—no labels, explanations, or extra commentary.

Rollout function defined!



Rank 1: Score 0.950
  Response: Could you please send me the report at your earliest convenience? Thank you.

Rank 2: Score 0.700
  Response: Can you send me the report soon? Thanks.

Rank 3: Score 0.100
  Response: hey send report quick thx
Starting training with 25 inputs...
Training for 3 epoch(s)
Generating 4 responses per input for RULER to compare

Why multiple responses? RULER needs to compare different attempts to learn what's good!


Iterating dataset:  44%|####4     | 12/27 [00:00<?, ?batch/s]


Training step 12, epoch 1, epoch step 3
Batch contains 3 inputs


Generating responses:   0%|          | 0/12 [00:00<?, ?it/s]

Error scoring group: 

[1;31mGive Feedback / Get Help: https://github.com/BerriAI/litellm/issues/new[0m
LiteLLM.Info: If you need to debug this error, use `litellm._turn_on_debug()'.

Error scoring group: litellm.APIError: APIError: OpenrouterException - {"error":{"message":"This request requires more credits, or fewer max_tokens. You requested up to 146719 tokens, but can only afford 121085. To increase, visit https://openrouter.ai/settings/keys and create a key with a higher limit","code":402,"metadata":{"provider_name":null}},"user_id":"user_2nvgBsp6WyCaH33KCqbvbe6lwa2"}

[1;31mGive Feedback / Get Help: https://github.com/BerriAI/litellm/issues/new[0m
LiteLLM.Info: If you need to debug this error, use `litellm._turn_on_debug()'.

Error scoring group: litellm.APIError: APIError: OpenrouterException - {"error":{"message":"This request requires more credits, or fewer max_tokens. You requested up to 146719 tokens, but can only afford 138382. To increase, visit https://openrouter.ai/

train:   0%|          | 0/2 [00:00<?, ?it/s]

==((====))==  Unsloth - 2x faster free finetuning | Num GPUs used = 1
   \\   /|    Num examples = 10,000,000 | Num Epochs = 3 | Total steps = 30,000,000
O^O/ \_/ \    Batch size per device = 2 | Gradient accumulation steps = 1
\        /    Data Parallel GPUs = 1 | Total batch size (2 x 1 x 1) = 2
 "-____-"     Trainable parameters = 9,232,384 of 1,552,946,688 (0.59% trained)


Unsloth: Will smartly offload gradients to save VRAM!
Completed training step 12

Training step 13, epoch 1, epoch step 4
Batch contains 3 inputs


Generating responses:   0%|          | 0/12 [00:00<?, ?it/s]

No "val/reward" metric found in history
Deleted checkpoint ./.art/auto-rl/models/rl-model-002/checkpoints/0012
Packed 11 trajectories into 1 sequences of length 2048


train:   0%|          | 0/1 [00:00<?, ?it/s]

Completed training step 13

Training step 14, epoch 1, epoch step 5
Batch contains 3 inputs


Generating responses:   0%|          | 0/12 [00:00<?, ?it/s]

No "val/reward" metric found in history
Deleted checkpoint ./.art/auto-rl/models/rl-model-002/checkpoints/0013
Packed 12 trajectories into 1 sequences of length 2048


train:   0%|          | 0/1 [00:00<?, ?it/s]

Completed training step 14

Training step 15, epoch 1, epoch step 6
Batch contains 3 inputs


Generating responses:   0%|          | 0/12 [00:00<?, ?it/s]


[1;31mGive Feedback / Get Help: https://github.com/BerriAI/litellm/issues/new[0m
LiteLLM.Info: If you need to debug this error, use `litellm._turn_on_debug()'.

Error scoring group: litellm.APIError: APIError: OpenrouterException - {"error":{"message":"This request requires more credits, or fewer max_tokens. You requested up to 145762 tokens, but can only afford 129800. To increase, visit https://openrouter.ai/settings/keys and create a key with a higher limit","code":402,"metadata":{"provider_name":null}},"user_id":"user_2nvgBsp6WyCaH33KCqbvbe6lwa2"}

[1;31mGive Feedback / Get Help: https://github.com/BerriAI/litellm/issues/new[0m
LiteLLM.Info: If you need to debug this error, use `litellm._turn_on_debug()'.

Error scoring group: litellm.APIError: APIError: OpenrouterException - {"error":{"message":"This request requires more credits, or fewer max_tokens. You requested up to 145762 tokens, but can only afford 129800. To increase, visit https://openrouter.ai/settings/keys and crea

AssertionError: 

In [None]:
# Generate test inputs
print("Generating test inputs...")
test_inputs = await generate_training_inputs(
    TASK_DESCRIPTION, num_examples=NUM_TEST_INPUTS
)

print(f"\n🧪 Testing the trained model on {len(test_inputs)} new inputs:\n")
print("=" * 80)

for i, test_input in enumerate(test_inputs):
    print(f"\nTest {i + 1}:")
    print(f"Input: {test_input}")

    # Run the model
    test_task_input = TaskInput(step=999, input_text=test_input)
    result_trajectory = await rollout(model, test_task_input)

    # Extract the model's response
    messages = result_trajectory.messages()
    model_response = messages[-1]["content"] if messages else "No response"

    print(f"Model output: {model_response}")
    print("-" * 80)

print("\n🎉 Testing completed!")
print(f"\nYour model '{MODEL_NAME}' has been trained to: {TASK_DESCRIPTION}")
print("\nTo use this model in production:")
print("1. The model checkpoint is saved in ./.art/")
print("2. You can load it using the vLLM library")
print(
    "3. Or continue training with more examples by adjusting the configuration at the top"
)