In [1]:
import torch
import torch.nn as nn
from transformers import AutoModelForCausalLM, AutoTokenizer

In [2]:
# Hugging Face Hub id of the base model that is loaded below.
MODEL_NAME = "Qwen/Qwen2.5-Coder-1.5B-Instruct"

# Use the GPU whenever PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    DEVICE = "cuda"
else:
    DEVICE = "cpu"

In [3]:
# Use bfloat16 when running on the GPU and float32 on the CPU.
DTYPE = torch.bfloat16 if DEVICE == "cuda" else torch.float32

N_PROMPT_TOKENS = 20  # number of virtual prompt tokens
LR = 1e-4             # learning rate passed to the optimizer
EPOCHS = 50           # number of passes over the training pairs
MAX_NEW_TOKENS = 64   # generation budget used at inference time

# Seed PyTorch's generators up front so that the randomly initialised prompt
# parameters (and therefore the whole training run) are repeatable after
# "Restart Kernel -> Run All".
SEED = 42
torch.manual_seed(SEED)

In [4]:
# (natural-language instruction, ROS 2 CLI command) pairs.
train_data = [
    (
        "Move forward 2 meters",
        'ros2 topic pub /cmd_vel geometry_msgs/msg/Twist "{linear: {x: 2.0}}"',
    ),
    (
        "Turn left 90 degrees",
        'ros2 service call /rotate_robot robot_msgs/srv/Rotate "{angle: 1.57}"',
    ),
    (
        "Navigate to waypoint A",
        "ros2 action send_goal /navigate_to_pose nav2_msgs/action/NavigateToPose"
        " \"{pose: {header: {frame_id: 'map'}, pose: {position: {x: 5.0, y: 2.0}}}}\"",
    ),
]

In [5]:
# Load the tokenizer and the base causal LM for MODEL_NAME.
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)

model = AutoModelForCausalLM.from_pretrained(
    MODEL_NAME,
    # The installed transformers release warns that `torch_dtype` is
    # deprecated and asks for `dtype` instead.
    dtype=DTYPE,
    device_map="auto",
)

# Freeze every base-model weight so that gradients are only ever accumulated
# for parameters created outside of `model`.
model.requires_grad_(False)

# Switch to evaluation mode. Kept as the last expression of the cell so the
# notebook still displays the module tree.
model.eval()

tokenizer_config.json: 0.00B [00:00, ?B/s]

vocab.json: 0.00B [00:00, ?B/s]

merges.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

config.json:   0%|          | 0.00/660 [00:00<?, ?B/s]

`torch_dtype` is deprecated! Use `dtype` instead!
2025-12-24 04:16:35.414639: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:467] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1766549795.808826      55 cuda_dnn.cc:8579] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1766549795.918292      55 cuda_blas.cc:1407] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
W0000 00:00:1766549796.863798      55 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.
W0000 00:00:1766549796.863841      55 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.
W0000 00:00:1766549796.863844      55

model.safetensors:   0%|          | 0.00/3.09G [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/242 [00:00<?, ?B/s]

Qwen2ForCausalLM(
  (model): Qwen2Model(
    (embed_tokens): Embedding(151936, 1536)
    (layers): ModuleList(
      (0-27): 28 x Qwen2DecoderLayer(
        (self_attn): Qwen2Attention(
          (q_proj): Linear(in_features=1536, out_features=1536, bias=True)
          (k_proj): Linear(in_features=1536, out_features=256, bias=True)
          (v_proj): Linear(in_features=1536, out_features=256, bias=True)
          (o_proj): Linear(in_features=1536, out_features=1536, bias=False)
        )
        (mlp): Qwen2MLP(
          (gate_proj): Linear(in_features=1536, out_features=8960, bias=False)
          (up_proj): Linear(in_features=1536, out_features=8960, bias=False)
          (down_proj): Linear(in_features=8960, out_features=1536, bias=False)
          (act_fn): SiLUActivation()
        )
        (input_layernorm): Qwen2RMSNorm((1536,), eps=1e-06)
        (post_attention_layernorm): Qwen2RMSNorm((1536,), eps=1e-06)
      )
    )
    (norm): Qwen2RMSNorm((1536,), eps=1e-06)
    (rotar

In [6]:
class PTuningV2Prompt(nn.Module):
    """Trainable soft prompt: an embedding table re-parameterised by an MLP.

    Each of the ``n_tokens`` virtual tokens owns a learnable embedding of
    width ``hidden_size``; the embeddings are passed through
    ``Linear -> Tanh -> Linear`` (all of width ``hidden_size``) to produce the
    prompt vectors.

    Args:
        n_tokens: Number of virtual prompt tokens.
        hidden_size: Width of every prompt vector.
        dtype: dtype used for the embedding and MLP parameters.
    """

    def __init__(self, n_tokens, hidden_size, dtype):
        super().__init__()

        # Registered as a NON-persistent buffer: the indices now follow
        # ``module.to(...)`` together with the parameters, yet stay out of
        # ``state_dict()`` so saved checkpoints keep exactly the same keys.
        self.register_buffer(
            "virtual_tokens", torch.arange(n_tokens), persistent=False
        )
        print("Virtual tokens { P-Tuning v2 } :", self.virtual_tokens)

        self.embedding = nn.Embedding(n_tokens, hidden_size, dtype=dtype)
        print("Embedding dtype { P-Tuning v2 }:", self.embedding.weight.dtype)

        self.mlp = nn.Sequential(
            nn.Linear(hidden_size, hidden_size, dtype=dtype),
            nn.Tanh(),
            nn.Linear(hidden_size, hidden_size, dtype=dtype),
        )
        print("MLP layers { P-Tuning v2 }:", list(self.mlp))

        # Re-initialise the embedding table with a small-variance normal.
        # Done after the MLP is built so the order of random draws is the
        # same as before.
        nn.init.normal_(self.embedding.weight, mean=0.0, std=0.02)

    def forward(self, batch_size, device):
        """Return the prompt vectors, shape ``(batch_size, n_tokens, hidden_size)``.

        Args:
            batch_size: Size of the leading batch dimension of the result.
            device: Device the token indices are placed on before the lookup;
                it must be the device that holds this module's parameters.

        Returns:
            The MLP output broadcast over the batch dimension with
            ``expand`` (a view, not a per-row copy).
        """
        # No-op when the module (and thus the buffer) already lives on `device`.
        tokens = self.virtual_tokens.to(device)
        prompt = self.mlp(self.embedding(tokens))
        return prompt.unsqueeze(0).expand(batch_size, -1, -1)

# Build the prompt encoder at the base model's hidden width, then move it to
# the device that `model` reports.
prompt_encoder = PTuningV2Prompt(
    n_tokens=N_PROMPT_TOKENS,
    hidden_size=model.config.hidden_size,
    dtype=DTYPE,
)
prompt_encoder = prompt_encoder.to(model.device)

Virtual tokens { P-Tuning v2 } : tensor([ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14, 15, 16, 17,
        18, 19])
Embedding dtype { P-Tuning v2 }: torch.float32
MLP layers { P-Tuning v2 }: [Linear(in_features=1536, out_features=1536, bias=True), Tanh(), Linear(in_features=1536, out_features=1536, bias=True)]


In [7]:
def compute_loss(input_text, target_text):
    """Causal-LM loss on ``target_text`` given the soft prompt and ``input_text``.

    The sequence fed to the model is
    ``[virtual prompt | input tokens | target tokens | EOS]``. Only the target
    tokens and the trailing EOS contribute to the loss; every other position
    is labelled ``-100``.

    Args:
        input_text: Natural-language instruction.
        target_text: Command the model should produce for that instruction.

    Returns:
        The scalar loss tensor reported by the model (``outputs.loss``).
    """
    device = model.device

    input_ids = tokenizer(input_text, return_tensors="pt").input_ids.to(device)
    target_ids = tokenizer(target_text, return_tensors="pt").input_ids.to(device)

    # Supervise an end-of-sequence token after the command. Without it the
    # model is never trained to stop, so greedy decoding keeps emitting tokens
    # until the max_new_tokens budget is exhausted.
    eos_id = tokenizer.eos_token_id
    if eos_id is not None:
        already_terminated = target_ids.size(1) > 0 and bool(
            (target_ids[:, -1] == eos_id).all()
        )
        if not already_terminated:
            eos_column = torch.full(
                (target_ids.size(0), 1),
                eos_id,
                dtype=target_ids.dtype,
                device=device,
            )
            target_ids = torch.cat([target_ids, eos_column], dim=1)

    batch_size = input_ids.size(0)

    full_ids = torch.cat([input_ids, target_ids], dim=1)

    token_embeds = model.get_input_embeddings()(full_ids).to(DTYPE)
    prompt_embeds = prompt_encoder(batch_size, device)

    full_embeds = torch.cat([prompt_embeds, token_embeds], dim=1)

    # Every position is real (no padding), so the mask is all ones.
    attention_mask = torch.ones(
        full_embeds.shape[:-1],
        device=device,
        dtype=torch.long,
    )

    # Mask the virtual prompt and the instruction out of the loss. The width
    # is taken from the actual prompt tensor so it cannot drift from the
    # encoder's configuration.
    ignored_width = prompt_embeds.size(1) + input_ids.size(1)
    ignored = torch.full(
        (batch_size, ignored_width),
        -100,
        device=device,
        dtype=torch.long,
    )
    labels = torch.cat([ignored, target_ids], dim=1)

    outputs = model(
        inputs_embeds=full_embeds,
        attention_mask=attention_mask,
        labels=labels,
    )
    return outputs.loss

In [None]:
# Only the prompt encoder's parameters are handed to the optimizer.
optimizer = torch.optim.AdamW(prompt_encoder.parameters(), lr=LR)

banner = "=" * 40
print("\n" + banner)
print("ðŸš€ Starting P Tuning Training Session")
print(banner + "\n")

for epoch in range(EPOCHS):
    # Sum of the per-example losses seen during this epoch.
    total_loss = 0.0

    for instruction, command in train_data:
        # One optimizer step per training pair.
        optimizer.zero_grad()
        loss = compute_loss(instruction, command)
        loss.backward()
        # Cap the gradient norm at 1.0 before stepping.
        nn.utils.clip_grad_norm_(prompt_encoder.parameters(), max_norm=1.0)
        optimizer.step()

        total_loss = total_loss + loss.item()

    print(f"Epoch {epoch+1:03d} | Loss: {total_loss:.4f}")

print("âœ¨ Training Complete!\n")


ðŸš€ Starting P Tuning Training Session



In [None]:
# Persist the prompt encoder's state dict only.
prompt_state = prompt_encoder.state_dict()
torch.save(prompt_state, "p_tuning_ros2.pt")
print("\nâœ… Saved: p_tuning_ros2.pt")

In [None]:
def infer_ros2_command(human_input):
    """Generate text for ``human_input`` with the soft prompt prepended.

    The instruction is tokenised and embedded, the prompt encoder's vectors
    are concatenated in front of it, and the model decodes greedily for at
    most ``MAX_NEW_TOKENS`` new tokens.

    Args:
        human_input: Natural-language instruction.

    Returns:
        The decoded text of the first returned sequence, with special tokens
        skipped.
    """
    device = model.device

    # The whole path runs under no_grad: the prompt encoder is trainable, so
    # calling it outside this context would record an autograd graph that
    # inference never uses.
    with torch.no_grad():
        input_ids = tokenizer(human_input, return_tensors="pt").input_ids.to(device)
        input_embeds = model.get_input_embeddings()(input_ids).to(DTYPE)

        prompt_embeds = prompt_encoder(1, device)
        print("Prompt embeds shape { Inference ros2 command } :", prompt_embeds.shape)

        full_embeds = torch.cat([prompt_embeds, input_embeds], dim=1)
        print("Full embeds shape { Inference ros2 command } :", full_embeds.shape)

        # Explicit all-ones mask over prompt + instruction positions, the same
        # way the training loss builds it, instead of relying on generate()
        # to infer one for embedding inputs.
        attention_mask = torch.ones(
            full_embeds.shape[:-1],
            device=device,
            dtype=torch.long,
        )

        output_ids = model.generate(
            inputs_embeds=full_embeds,
            attention_mask=attention_mask,
            max_new_tokens=MAX_NEW_TOKENS,
            do_sample=False,
            pad_token_id=tokenizer.eos_token_id,
        )

    return tokenizer.decode(output_ids[0], skip_special_tokens=True)

In [None]:
print("\nðŸ§ª Testing\n")

# Instructions to run through the tuned prompt.
tests = [
    "Move forward 2 meters",
    "Turn left 90 degrees",
    "Navigate to waypoint A",
]

for t in tests:
    print(f"Input : {t}")
    # The command is generated before the "Output:" line is printed.
    generated = infer_ros2_command(t)
    print(f"Output: {generated}")
    print("-" * 80)
