In [2]:
import os


ROOT = os.getcwd()

print("Project root:", ROOT)
print("HumanML3D folder:", ROOT)

MAX_SEQ_LEN = 196

Project root: C:\Users\21600\Downloads\assignment\HumanML3D\HumanML3D
HumanML3D folder: C:\Users\21600\Downloads\assignment\HumanML3D\HumanML3D


In [3]:


import os
import random
import numpy as np

import torch
from torch.utils.data import Dataset, DataLoader

print("PyTorch version:", torch.__version__)
print("CUDA available:", torch.cuda.is_available())
if torch.cuda.is_available():
    print("GPU:", torch.cuda.get_device_name(0))

PyTorch version: 2.8.0+cpu
CUDA available: False


In [4]:
# 2. DATASET & COLLATE

class HumanML3DDataset(Dataset):
    """
    HumanML3D 구조:
        Mean.npy, Std.npy
        train.txt / val.txt / test.txt
        new_joint_vecs/<id>.npy (motion: T, 263)
        texts/<id>.txt          (caption lines)
    """
    def __init__(self, data_root, split="train", normalize=True):
        super().__init__()
        self.data_root = data_root
        self.split = split
        self.normalize = normalize

        split_file = os.path.join(data_root, f"{split}.txt")
        with open(split_file, "r") as f:
            ids = [line.strip().replace(".npy", "") for line in f.readlines()]
        self.sample_ids = ids

        self.motion_dir = os.path.join(data_root, "new_joint_vecs")
        self.text_dir = os.path.join(data_root, "texts")

        # 정규화 (Mean, Std)
        if normalize:
            self.mean = np.load(os.path.join(data_root, "Mean.npy"))  # (263,)
            self.std = np.load(os.path.join(data_root, "Std.npy"))    # (263,)
        else:
            self.mean = None
            self.std = None

    def __len__(self):
        return len(self.sample_ids)

    def _load_motion(self, sid):
        path = os.path.join(self.motion_dir, sid + ".npy")
        motion = np.load(path).astype(np.float32)  # (T, 263)

        if self.normalize:
            motion = (motion - self.mean) / (self.std + 1e-8)

        return motion

    def _load_text(self, sid):
        path = os.path.join(self.text_dir, sid + ".txt")
        with open(path, "r", encoding="utf-8") as f:
            lines = [l.strip() for l in f.readlines() if l.strip()]
        raw = random.choice(lines)
        caption = raw.split("#")[0].lower()
        return caption

    def __getitem__(self, idx):
        sid = self.sample_ids[idx]
        motion = self._load_motion(sid)
        caption = self._load_text(sid)

        return {
            "name": sid,
            "motion": motion,   # numpy (T, 263)
            "length": motion.shape[0],
            "text": caption
        }

In [5]:

def humanml_collate_fn(batch):
    batch = sorted(batch, key=lambda x: x["length"], reverse=True)

    lengths = [b["length"] for b in batch]
    max_len = max(lengths)

    motions = []
    texts = []
    names = []

    for b in batch:
        m = b["motion"]   # (T, 263)
        T, D = m.shape

        # padding to max_len
        if T < max_len:
            pad = np.zeros((max_len - T, D), dtype=np.float32)
            m = np.concatenate([m, pad], axis=0)

        motions.append(torch.from_numpy(m))   # (max_len, 263)
        texts.append(b["text"])
        names.append(b["name"])

    motions = torch.stack(motions)   # (B, max_len, 263)
    lengths = torch.tensor(lengths, dtype=torch.long)

    return {
        "motion": motions,
        "length": lengths,
        "text": texts,
        "name": names
    }

In [7]:
import torch
import torch.nn as nn
from transformers import BertTokenizer, BertModel

  from .autonotebook import tqdm as notebook_tqdm


In [8]:
class BERTTextEncoder(nn.Module):
    def __init__(self, model_name="bert-base-uncased", pooling="cls"):
        super().__init__()
        self.pooling = pooling

        # Load tokenizer & model
        self.tokenizer = BertTokenizer.from_pretrained(model_name)
        self.bert = BertModel.from_pretrained(model_name)

        # 출력 차원: 768
        self.embed_dim = self.bert.config.hidden_size

    def forward(self, text_list):

        # Tokenize
        enc = self.tokenizer(
            text_list,
            padding=True,
            truncation=True,
            max_length=64,
            return_tensors="pt"
        ).to(next(self.bert.parameters()).device)

        out = self.bert(
            input_ids=enc["input_ids"],
            attention_mask=enc["attention_mask"]
        )

        if self.pooling == "cls":
            text_emb = out.last_hidden_state[:, 0]  # (B, 768)
        else:
            mask = enc["attention_mask"].unsqueeze(-1).float()  # (B, T, 1)
            sum_hidden = (out.last_hidden_state * mask).sum(dim=1)
            lengths = mask.sum(dim=1)
            text_emb = sum_hidden / lengths

        return text_emb  # (B, 768)

In [9]:
import torch.nn.functional as F

In [10]:
class TransformerBlock(nn.Module):
    def __init__(self, dim, num_heads, ff_dim, dropout=0.1):
        super().__init__()

        self.attn = nn.MultiheadAttention(
            embed_dim=dim,
            num_heads=num_heads,
            dropout=dropout,
            batch_first=True
        )
        self.ff = nn.Sequential(
            nn.Linear(dim, ff_dim),
            nn.GELU(),
            nn.Linear(ff_dim, dim)
        )
        self.norm1 = nn.LayerNorm(dim)
        self.norm2 = nn.LayerNorm(dim)

    def forward(self, x):
        attn_out, _ = self.attn(x, x, x)
        x = self.norm1(x + attn_out)

        ff_out = self.ff(x)
        x = self.norm2(x + ff_out)
        return x


class MotionDiffusionTransformerConcat(nn.Module):
    def __init__(
        self,
        motion_dim=263,       # HumanML3D motion vector dimension
        text_dim=768,         # BERT embedding dim
        proj_dim=256,         # we project both into 256-d
        model_dim=512,        # Transformer hidden dimension
        num_layers=8,
        num_heads=8,
        ff_dim=1024,
        dropout=0.1
    ):
        super().__init__()

        # motion → 256
        self.motion_proj = nn.Linear(motion_dim, proj_dim)

        # text → 256
        self.text_proj = nn.Linear(text_dim, proj_dim)

        # concat → 512
        self.input_dim = proj_dim * 2

        self.layers = nn.ModuleList([
            TransformerBlock(model_dim, num_heads, ff_dim, dropout)
            for _ in range(num_layers)
        ])

        self.match_dim = nn.Linear(self.input_dim, model_dim)

        self.output_proj = nn.Linear(model_dim, motion_dim)

    def forward(self, x, text_emb):
        """
        x: (B, T, motion_dim)
        text_emb: (B, text_dim)
        """
        B, T, _ = x.shape

        # project motion → (B, T, 256)
        motion_feat = self.motion_proj(x)

        # project text → (B, 256)
        text_feat = self.text_proj(text_emb)
        text_feat = text_feat[:, None, :].repeat(1, T, 1)  # (B, T, 256)

        # concat → (B, T, 512)
        h = torch.cat([motion_feat, text_feat], dim=-1)

        h = self.match_dim(h)

        for layer in self.layers:
            h = layer(h)

        out = self.output_proj(h)  # (B, T, 263)

        return out


In [49]:
def make_beta_schedule(num_steps=1000, start=1e-4, end=0.02):
    return torch.linspace(start, end, num_steps)


class DiffusionSchedule:
    def __init__(self, num_steps=1000, device="cpu"):
        self.num_steps = num_steps
        self.device = device
        
        self.betas = make_beta_schedule(num_steps).to(device)
        self.alphas = 1.0 - self.betas
        self.alpha_cumprod = torch.cumprod(self.alphas, dim=0)
        
        self.sqrt_alpha_cumprod = torch.sqrt(self.alpha_cumprod)
        self.sqrt_one_minus_alpha_cumprod = torch.sqrt(1.0 - self.alpha_cumprod)

    def extract(self, arr, t, x_shape):
        """
        arr: (num_steps,)
        t: (B,)
        return shape: (B, 1, 1)
        """
        return arr[t].view(-1, *([1] * (len(x_shape) - 1)))


def q_sample(x0, t, schedule: DiffusionSchedule, noise=None):
    """
    x0: (B, T, D)
    t: (B,)
    """
    if noise is None:
        noise = torch.randn_like(x0)

    sqrt_alpha_cumprod_t = schedule.extract(schedule.sqrt_alpha_cumprod, t, x0.shape)
    sqrt_one_minus_alpha_cumprod_t = schedule.extract(schedule.sqrt_one_minus_alpha_cumprod, t, x0.shape)

    x_t = sqrt_alpha_cumprod_t * x0 + sqrt_one_minus_alpha_cumprod_t * noise
    return x_t, noise


# =========================
# Loss
# =========================

def diffusion_loss(model, schedule, x0, text_emb):
    B = x0.shape[0]
    device = x0.device

    t = torch.randint(0, schedule.num_steps, (B,), device=device)
    x_t, noise = q_sample(x0, t, schedule)

    eps_pred = model(x_t, text_emb)

    return F.mse_loss(eps_pred, noise)

def p_sample(model, schedule, x_t, t, text_emb, eta=0.0):
 
    eps_pred = model(x_t, text_emb)   # (B, T, D)
    alphas = schedule.alphas          # (num_steps,)
    alpha_t = alphas[t].view(-1, 1, 1)       # (B,1,1)

    sqrt_alpha_t = torch.sqrt(alpha_t)
    sqrt_one_minus_alpha_t = torch.sqrt(1.0 - alpha_t)

    x_prev = (x_t - sqrt_one_minus_alpha_t * eps_pred) / sqrt_alpha_t

    return x_prev

@torch.no_grad()
def p_sample_loop(model, schedule, shape, text_emb, device="cpu", eta=0.0):
    B = shape[0]
    x_t = torch.randn(shape, device=device)
    text_emb = text_emb.to(device)

    for step in reversed(range(schedule.num_steps)):
        t = torch.full((B,), step, device=device, dtype=torch.long)
        x_t = p_sample(model, schedule, x_t, t, text_emb, eta=eta)

    return x_t

In [50]:
from tqdm import tqdm

def train(
    data_root,
    save_dir="./checkpoints",
    num_epochs=50,
    batch_size=16,
    lr=1e-4,
    num_diffusion_steps=1000,
    device=None,
    finetune_bert=False,
    num_workers=0,         
    log_interval=50
):
    if device is None:
        device = "cuda" if torch.cuda.is_available() else "cpu"
    print(f"[INFO] Using device: {device}")

    os.makedirs(save_dir, exist_ok=True)

    train_dataset = HumanML3DDataset(
        data_root=data_root,
        split="train",
        normalize=True
    )

    train_loader = DataLoader(
        train_dataset,
        batch_size=batch_size,
        shuffle=True,
        num_workers=num_workers,
        collate_fn=humanml_collate_fn,
        drop_last=True,
    )

    text_encoder = BERTTextEncoder(
        model_name="bert-base-uncased",
        pooling="cls"
    ).to(device)

    motion_model = MotionDiffusionTransformerConcat(
        motion_dim=263,
        text_dim=text_encoder.embed_dim,
        proj_dim=256,
        model_dim=512,
        num_layers=8,
        num_heads=8,
        ff_dim=1024,
        dropout=0.1
    ).to(device)

    if not finetune_bert:
        for p in text_encoder.parameters():
            p.requires_grad = False
        text_encoder.eval()
        print("[INFO] BERT encoder is frozen (no finetuning).")
    else:
        print("[INFO] BERT encoder will be finetuned.")

    schedule = DiffusionSchedule(num_steps=num_diffusion_steps, device=device)

    if finetune_bert:
        params = list(text_encoder.parameters()) + list(motion_model.parameters())
    else:
        params = list(motion_model.parameters())

    optimizer = torch.optim.AdamW(params, lr=lr)
    max_grad_norm = 1.0

    global_step = 0
    for epoch in range(1, num_epochs + 1):
        motion_model.train()
        if finetune_bert:
            text_encoder.train()
        else:
            text_encoder.eval()

        running_loss = 0.0

        pbar = tqdm(
            enumerate(train_loader),
            total=len(train_loader),
            desc=f"Epoch {epoch}/{num_epochs}"
        )

        for batch_idx, batch in pbar:
            motions = batch["motion"].to(device)  # (B, T, 263)
            texts = batch["text"]                 # list[str]

            if finetune_bert:
                text_emb = text_encoder(texts)
            else:
                with torch.no_grad():
                    text_emb = text_encoder(texts)

            loss = diffusion_loss(
                model=motion_model,
                schedule=schedule,
                x0=motions,
                text_emb=text_emb
            )

            optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(params, max_grad_norm)
            optimizer.step()

            loss_val = loss.item()
            running_loss += loss_val
            global_step += 1

            pbar.set_postfix({
                "step_loss": f"{loss_val:.6f}",
                "avg_loss": f"{running_loss / (batch_idx + 1):.6f}"
            })

            if (batch_idx + 1) % log_interval == 0:
                avg_loss = running_loss / (batch_idx + 1)
                print(
                    f"[Epoch {epoch:03d}] "
                    f"Step {batch_idx+1:04d}/{len(train_loader):04d} | "
                    f"Global {global_step:06d} | "
                    f"step_loss: {loss_val:.6f} | "
                    f"avg_loss: {avg_loss:.6f}"
                )

        ckpt_path = os.path.join(save_dir, f"epoch_{epoch:03d}.pt")
        save_dict = {
            "epoch": epoch,
            "motion_model_state": motion_model.state_dict(),
            "text_encoder_state": text_encoder.state_dict(),
            "optimizer_state": optimizer.state_dict(),
            "config": {
                "num_epochs": num_epochs,
                "batch_size": batch_size,
                "lr": lr,
                "num_diffusion_steps": num_diffusion_steps,
                "finetune_bert": finetune_bert,
            }
        }
        torch.save(save_dict, ckpt_path)
        print(f"[INFO] Saved checkpoint: {ckpt_path}")

In [51]:

if __name__ == "__main__":
    data_root = ROOT

    train(
        data_root=data_root,
        save_dir=os.path.join(ROOT, "checkpoints_humanml2"),
        num_epochs=100,
        batch_size=16,
        lr=1e-3,
        num_diffusion_steps=20,
        finetune_bert=False,      
        num_workers=0,            # Windows
        log_interval=50
    )

[INFO] Using device: cpu


Some weights of the model checkpoint at bert-base-uncased were not used when initializing BertModel: ['cls.seq_relationship.bias', 'cls.predictions.transform.dense.bias', 'cls.seq_relationship.weight', 'cls.predictions.transform.LayerNorm.weight', 'cls.predictions.transform.dense.weight', 'cls.predictions.transform.LayerNorm.bias', 'cls.predictions.bias']
- This IS expected if you are initializing BertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


[INFO] BERT encoder is frozen (no finetuning).


Epoch 1/100:   0%|                           | 1/1461 [00:02<1:05:50,  2.71s/it, step_loss=1.338799, avg_loss=1.338799]


KeyboardInterrupt: 

In [None]:
import sys

current_dir = os.getcwd()
parent_dir = os.path.dirname(current_dir)

sys.path.append(parent_dir)
print(parent_dir)

In [46]:

from skeleton import Skeleton               
from paramUtil import t2m_raw_offsets, t2m_kinematic_chain
from common.quaternion import qrot, qinv       

from scipy.spatial.transform import Rotation as R


JOINT_NUM = 22
FACE_JOINT_IDX = [2, 1, 17, 16]

JOINT_NAMES = [
    "Hips",          # 0
    "LeftUpLeg",     # 1
    "RightUpLeg",    # 2
    "Spine",         # 3
    "LeftLeg",       # 4
    "RightLeg",      # 5
    "Spine1",        # 6
    "LeftFoot",      # 7
    "RightFoot",     # 8
    "Spine2",        # 9
    "LeftToeBase",   # 10
    "RightToeBase",  # 11
    "Neck",          # 12
    "RightShoulder", # 13
    "LeftShoulder",  # 14
    "RightArm",      # 15
    "LeftArm",       # 16
    "RightForeArm",  # 17
    "LeftForeArm",   # 18
    "RightHand",     # 19
    "LeftHand",      # 20
    "LeftHandEnd"    # 21 
]


# =========================
#  1. 263 → joint 위치 복원
# =========================

def recover_root_rot_pos(data: torch.Tensor):
    """
    data: (..., T, D=263) torch tensor
    반환:
        r_rot_quat: (..., T, 4)
        r_pos:      (..., T, 3)
    """
    rot_vel = data[..., 0]
    r_rot_ang = torch.zeros_like(rot_vel).to(data.device)
    r_rot_ang[..., 1:] = rot_vel[..., :-1]
    r_rot_ang = torch.cumsum(r_rot_ang, dim=-1)

    r_rot_quat = torch.zeros(data.shape[:-1] + (4,), device=data.device)
    r_rot_quat[..., 0] = torch.cos(r_rot_ang)
    r_rot_quat[..., 2] = torch.sin(r_rot_ang)

    r_pos = torch.zeros(data.shape[:-1] + (3,), device=data.device)
    # x, z 속도 적분
    r_pos[..., 1:, [0, 2]] = data[..., :-1, 1:3]
    r_pos = qrot(qinv(r_rot_quat), r_pos)
    r_pos = torch.cumsum(r_pos, dim=-2)
    # y 높이
    r_pos[..., 1] = data[..., 3]
    return r_rot_quat, r_pos


def recover_from_ric(data: torch.Tensor, joints_num: int):
    """
    HumanML3D 263D (RIC 기반 representation)에서 joint 위치로 복원.
    data: (B, T, D=263)
    반환:
        positions: (B, T, joints_num, 3)
    """
    r_rot_quat, r_pos = recover_root_rot_pos(data)

    # RIC part: joint local positions (root 제거된 것) 
    positions = data[..., 4:(joints_num - 1) * 3 + 4]
    positions = positions.view(positions.shape[:-1] + (-1, 3))  # (..., T, J-1, 3)

    # root 회전 적용해서 월드 좌표로
    positions = qrot(
        qinv(r_rot_quat[..., None, :]).expand(positions.shape[:-1] + (4,)),
        positions
    )

    # root 위치 더해주기
    positions[..., 0] += r_pos[..., 0:1]
    positions[..., 2] += r_pos[..., 2:3]

    # root joint 붙이기
    positions = torch.cat([r_pos.unsqueeze(-2), positions], dim=-2)
    return positions


def humanml3d_263_to_positions(motion_263: np.ndarray) -> np.ndarray:
    """
    motion_263: (T, 263) numpy
    반환: positions (T, 22, 3) numpy
    """
    assert motion_263.ndim == 2 and motion_263.shape[1] == 263
    data = torch.from_numpy(motion_263).float().unsqueeze(0)  # (1, T, 263)

    with torch.no_grad():
        positions = recover_from_ric(data, JOINT_NUM)[0]      # (T, 22, 3)

    return positions.cpu().numpy()


# =========================
#  2. joint 위치 → joint 회전 (quaternion)  → Euler
# =========================

def build_skeleton(device="cpu"):
    offsets = torch.from_numpy(t2m_raw_offsets).float()
    skel = Skeleton(offsets, t2m_kinematic_chain, device)     # :contentReference[oaicite:3]{index=3}
    return skel


def positions_to_quat(positions: np.ndarray, skel: Skeleton) -> np.ndarray:
    """
    positions: (T, J, 3) numpy
    반환: quat_params (T, J, 4) numpy, [w, x, y, z]
    """
    assert positions.ndim == 3
    quat_params = skel.inverse_kinematics_np(
        positions, FACE_JOINT_IDX, smooth_forward=True
    )  # (T, J, 4)
    return quat_params


def quat_to_euler_zyx(q: np.ndarray) -> np.ndarray:
    """
    q: (..., 4) numpy, [w, x, y, z]
    반환: (..., 3) Euler angles [Z, X, Y] in degrees (BVH 채널 순서용)
    SciPy는 [x, y, z, w] 를 쓰므로 순서 변환 필요.
    """
    # reshape to (-1, 4)
    orig_shape = q.shape[:-1]
    q_flat = q.reshape(-1, 4)

    # [w, x, y, z] -> [x, y, z, w]
    q_scipy = np.stack(
        [q_flat[:, 1], q_flat[:, 2], q_flat[:, 3], q_flat[:, 0]],
        axis=-1
    )

    rot = R.from_quat(q_scipy)
    euler = rot.as_euler('ZXY', degrees=True)  # (N, 3)
    return euler.reshape(orig_shape + (3,))


# =========================
#  3. BVH 파일 쓰기
# =========================

def build_parents_and_children(skel: Skeleton):
    parents = skel.parents()               # len J, root 0 의 parent == -1 :contentReference[oaicite:4]{index=4}
    J = len(parents)
    children = [[] for _ in range(J)]
    for j in range(J):
        p = parents[j]
        if p >= 0:
            children[p].append(j)
    return parents, children


def write_joint_recursive(f, joint_idx, parents, children, offsets, level=0):
    """
    재귀적으로 HIERARCHY 블럭 작성.
    offsets: (J, 3)
    """
    indent = "  " * level
    name = JOINT_NAMES[joint_idx] if joint_idx < len(JOINT_NAMES) else f"J{joint_idx}"

    if parents[joint_idx] == -1:
        # ROOT joint
        f.write(f"{indent}ROOT {name}\n")
    else:
        f.write(f"{indent}JOINT {name}\n")

    f.write(f"{indent}{{\n")
    off = offsets[joint_idx]
    f.write(f"{indent}  OFFSET {off[0]:.6f} {off[1]:.6f} {off[2]:.6f}\n")

    if parents[joint_idx] == -1:
        # ROOT: position + rotation
        f.write(f"{indent}  CHANNELS 6 Xposition Yposition Zposition Zrotation Xrotation Yrotation\n")
    else:
        f.write(f"{indent}  CHANNELS 3 Zrotation Xrotation Yrotation\n")

    # children joints
    for c in children[joint_idx]:
        write_joint_recursive(f, c, parents, children, offsets, level + 1)

    # End Site (leaf joint일 때만 적당히 하나 만들어 줌)
    if len(children[joint_idx]) == 0:
        f.write(f"{indent}  End Site\n")
        f.write(f"{indent}  {{\n")
        # 그냥 대략적인 길이로 0.1 넣음
        f.write(f"{indent}    OFFSET 0.000000 0.000000 0.100000\n")
        f.write(f"{indent}  }}\n")

    f.write(f"{indent}}}\n")

def write_bvh_fixed_offsets(path, positions, eulers, offsets, parents, frame_time):

    T, J, _ = positions.shape

    # ----- children list 만들기 -----
    children = [[] for _ in range(J)]
    for j in range(J):
        p = parents[j]
        if p >= 0:
            children[p].append(j)

    # ----- joint names -----
    names = JOINT_NAMES

    # ----- 재귀 출력 함수 -----
    def write_joint(f, j, level=0):
        indent = "  " * level
        name = names[j]

        if parents[j] == -1:
            f.write(f"{indent}ROOT {name}\n")
        else:
            f.write(f"{indent}JOINT {name}\n")

        f.write(f"{indent}{{\n")

        off = offsets[j]
        f.write(f"{indent}  OFFSET {off[0]:.6f} {off[1]:.6f} {off[2]:.6f}\n")

        if parents[j] == -1:
            f.write(f"{indent}  CHANNELS 6 Xposition Yposition Zposition Zrotation Xrotation Yrotation\n")
        else:
            f.write(f"{indent}  CHANNELS 3 Zrotation Xrotation Yrotation\n")

        for c in children[j]:
            write_joint(f, c, level + 1)

        if len(children[j]) == 0:
            f.write(f"{indent}  End Site\n")
            f.write(f"{indent}  {{\n")
            f.write(f"{indent}    OFFSET 0 0 0.1\n")
            f.write(f"{indent}  }}\n")

        f.write(f"{indent}}}\n")

    # ----- 파일 작성 시작 -----
    with open(path, "w") as f:
        f.write("HIERARCHY\n")
        write_joint(f, 0, 0)

        f.write("MOTION\n")
        f.write(f"Frames: {T}\n")
        f.write(f"Frame Time: {frame_time:.8f}\n")

        # preorder
        order = []
        def collect(j):
            order.append(j)
            for c in children[j]:
                collect(c)
        collect(0)

        for t in range(T):
            vals = []
            for j in order:
                pos = positions[t, j]
                rot = eulers[t, j]  # Z X Y
                if parents[j] == -1:
                    vals.extend([pos[0], pos[1], pos[2], rot[0], rot[1], rot[2]])
                else:
                    vals.extend([rot[0], rot[1], rot[2]])

            f.write(" ".join(f"{v:.6f}" for v in vals) + "\n")
# =========================
#  4. 전체 파이프라인 + CLI
# =========================
def convert_263_array_to_bvh(
    motion_263: np.ndarray,
    output_bvh: str,
    frame_time: float = 1/20.0
):
    """
    motion_263: (T,263) numpy (denormalized)
    """

    # -------- 1) 263D → joint positions --------
    positions = humanml3d_263_to_positions(motion_263)   # (T,22,3)
    # -------s- 2) Skeleton 준비 --------
    skel = build_skeleton(device="cpu")
    parents = skel.parents()
    J = len(parents)

    # -------- 3) BVH 오프셋을 직접 계산 --------
    first = positions[0]   # (22,3)
    offsets = np.zeros_like(first)

    for j in range(J):
        p = parents[j]
        if p == -1:
            offsets[j] = np.array([0, 0, 0])   # root offset 항상 0
        else:
            offsets[j] = first[j] - first[p]   # local offset by position diff

    # -------- 4) IK → quaternion --------
    quat = positions_to_quat(positions, skel)     # (T,22,4)

    # -------- 5) quaternion → Euler(Z,X,Y) --------
    eulers = quat_to_euler_zyx(quat)

    # -------- 6) BVH 저장 --------
    write_bvh_fixed_offsets(
        output_bvh,
        positions,
        eulers,
        offsets,
        parents,
        frame_time
    )
    print("[OK] Saved BVH:", output_bvh)

In [75]:
def get_test_loader(data_root, batch_size=4):
    test_dataset = HumanML3DDataset(
        data_root=data_root,
        split="test",
        normalize=True
    )

    return DataLoader(
        test_dataset,
        batch_size=batch_size,
        shuffle=False,
        collate_fn=humanml_collate_fn,
        drop_last=False
    )

def load_inference_model(ckpt_path, device="cpu"):
    ckpt = torch.load(ckpt_path, map_location=device)

    text_encoder = BERTTextEncoder(
        model_name="bert-base-uncased",
        pooling="cls"
    ).to(device)

    motion_model = MotionDiffusionTransformerConcat(
        motion_dim=263,
        text_dim=text_encoder.embed_dim,
        proj_dim=256,
        model_dim=512,
        num_layers=8,
        num_heads=8,
        ff_dim=1024,
        dropout=0.1
    ).to(device)

    motion_model.load_state_dict(ckpt["motion_model_state"])
    text_encoder.load_state_dict(ckpt["text_encoder_state"])

    for p in text_encoder.parameters():
        p.requires_grad = False
    text_encoder.eval()
    motion_model.eval()

    schedule = DiffusionSchedule(num_steps=120, device=device)

    print("Loaded checkpoint:", ckpt_path)
    return text_encoder, motion_model, schedule

@torch.no_grad()
def generate_motion_from_text_batch(
    text_encoder,
    motion_model,
    schedule,
    texts,
    T,
    device="cuda"
):
    text_emb = text_encoder(texts).to(device)

    B = len(texts)
    D = 263
    shape = (B, T, D)

    x0 = p_sample_loop(
        model=motion_model,
        schedule=schedule,
        shape=shape,
        text_emb=text_emb,
        device=device
    )
    return x0  # normalized
def generate_5_bvh(
    data_root,
    ckpt_path,
    out_dir="bvh_samples",
    device="cpu",
    fps=20.0,
):
    os.makedirs(out_dir, exist_ok=True)

    loader = get_test_loader(data_root, batch_size=4)
    text_encoder, motion_model, schedule = load_inference_model(ckpt_path, device)

    mean = np.load(os.path.join(data_root, "Mean.npy"))   # (263,)
    std  = np.load(os.path.join(data_root, "Std.npy"))    # (263,)
    frame_time = 1.0 / fps
    saved = 0

    for batch in loader:
        if saved >= 5:
            break

        names = batch["name"]        
        motions = batch["motion"]    
        texts   = batch["text"]      
        T = motions.shape[1]

        x0_norm = generate_motion_from_text_batch(
            text_encoder,
            motion_model,
            schedule,
            texts,
            T,
            device
        )   # (B,T,263), normalized

        
        for i in range(x0_norm.shape[0]):
            if saved >= 5:
                break

            
            motion_263_norm = x0_norm[i].cpu().numpy()  # (T,263)

            # (2) denormalize: (x * std + mean)
            motion_263 = motion_263_norm * std + mean   # (T,263) denorm

            def show_stats(x, name):
                print(f"=== {name} ===")
                print("shape:", x.shape)
                print("min/max:", x.min(), x.max())
                print("mean/std:", x.mean(), x.std())
    
            show_stats(motion_263, "Gen")

            bvh_path = os.path.join(out_dir, f"{names[i]}_gen.bvh")

            convert_263_array_to_bvh(
                motion_263=motion_263,
                output_bvh=bvh_path,
                frame_time=frame_time,
            )

            print(f"[{saved}] Saved BVH:", bvh_path)
            print(f"    text:", texts[i])
            saved += 1

    print("Done, saved 5 BVH files.")
    return motion_263

In [76]:
import os

data_root = "."   

ckpt_path = os.path.join(data_root, "checkpoints_humanml", "epoch_034.pt")


out_dir = os.path.join(data_root, "bvh_samples")

gen = generate_5_bvh(
    data_root=data_root,
    ckpt_path=ckpt_path,
    out_dir=out_dir,
    device="cpu",       # ✅ CPU로 강제
)




Some weights of the model checkpoint at bert-base-uncased were not used when initializing BertModel: ['cls.seq_relationship.bias', 'cls.predictions.transform.dense.bias', 'cls.seq_relationship.weight', 'cls.predictions.transform.LayerNorm.weight', 'cls.predictions.transform.dense.weight', 'cls.predictions.transform.LayerNorm.bias', 'cls.predictions.bias']
- This IS expected if you are initializing BertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


Loaded checkpoint: .\checkpoints_humanml\epoch_034.pt
=== Gen ===
shape: (199, 263)
min/max: -0.8211727 1.4934978
mean/std: 0.19189551 0.41662708
[OK] Saved BVH: .\bvh_samples\004822_gen.bvh
[0] Saved BVH: .\bvh_samples\004822_gen.bvh
    text: person walking at a average pace forward, swaying arms and torso with a sense of swagger
=== Gen ===
shape: (199, 263)
min/max: -0.9096818 1.4815434
mean/std: 0.18952002 0.41479903
[OK] Saved BVH: .\bvh_samples\008463_gen.bvh
[1] Saved BVH: .\bvh_samples\008463_gen.bvh
    text: man walks along, then bends down and picks something up.
=== Gen ===
shape: (199, 263)
min/max: -0.8293877 1.4810334
mean/std: 0.18874314 0.4140141
[OK] Saved BVH: .\bvh_samples\009613_gen.bvh
[2] Saved BVH: .\bvh_samples\009613_gen.bvh
    text: the person is running backwards quickly.
=== Gen ===
shape: (199, 263)
min/max: -0.8484808 1.4817318
mean/std: 0.191075 0.414631
[OK] Saved BVH: .\bvh_samples\014457_gen.bvh
[3] Saved BVH: .\bvh_samples\014457_gen.bvh
    text: 