In [1]:
from t2m import Text2Motion
from utils.get_opt import get_opt
from utils.fixseed import fixseed

import torch
import numpy as np
from os.path import join as pjoin

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
denoiser_name = "t2m_denoiser_vpred_vaegelu"
dataset_name = "t2m"
generator = Text2Motion(denoiser_name, dataset_name)

opt = generator.opt
wrapper_opt = get_opt(opt.dataset_opt_path, torch.device("cuda"))
mean = np.load(pjoin(wrapper_opt.meta_dir, "mean.npy"))
std = np.load(pjoin(wrapper_opt.meta_dir, "std.npy"))

Reading checkpoints/t2m/t2m_denoiser_vpred_vaegelu/opt.txt
Reading checkpoints/t2m/t2m_vae_gelu/opt.txt
Loading VAE Model t2m_vae_gelu
Loading Denoiser Model t2m_denoiser_vpred_vaegelu


0it [00:00, ?it/s]


Loaded CLIP text encoder version ViT-B/32
Reading ./checkpoints/t2m/Comp_v6_KLD005/opt.txt


In [3]:
# denoiser_name = "kit_denoiser_vpred_vaegelu_bsz16" # Point to your specific KIT checkpoint
# dataset_name = "kit"                                   # Tell the system to use KIT-ML logic

# generator = Text2Motion(denoiser_name, dataset_name)

# opt = generator.opt
# wrapper_opt = get_opt(opt.dataset_opt_path, torch.device("cuda"))
# mean = np.load(pjoin(wrapper_opt.meta_dir, "mean.npy"))
# std = np.load(pjoin(wrapper_opt.meta_dir, "std.npy"))

### Original

In [4]:
fixseed(42)
src_text = "a man is dancing"
m_lens = 64
cfg_scale = 7.5
num_inference_timesteps = 50

init_noise, src_motion, (sa, ta, ca) = generator.generate(src_text,
                                                          m_lens,
                                                          cfg_scale,
                                                          num_inference_timesteps)

### Edit - 4 Different Cases

In [5]:
# edit_text = "slowly"
edit_text = "a really desperate man walking around"
src_proportion = 0.2

# # case 1: mirror
# edit_motion = generator.edit(init_noise,
#                              src_text=src_text,
#                              edit_text=edit_text,
#                              edit_mode="mirror",
#                              mirror_mode="lower",
#                              cfg_scale=cfg_scale,
#                              num_inference_timesteps=num_inference_timesteps,
#                              src_sa=sa,
#                              src_ta=ta,
#                              src_ca=ca,
#                              src_proportion=src_proportion)

# # case 2: reweight
# edit_motion = generator.edit(init_noise,
#                              src_text=src_text,
#                              edit_text=src_text,
#                              edit_mode="reweight",
#                              tgt_word="high",
#                              reweight_scale=-1.0,
#                              cfg_scale=cfg_scale,
#                              num_inference_timesteps=num_inference_timesteps,
#                              src_sa=sa,
#                              src_ta=ta,
#                              src_ca=ca,
#                              src_proportion=src_proportion)

# case 3: refine
edit_motion = generator.edit(init_noise,
                             src_text=src_text,
                             edit_text=edit_text,
                             edit_mode="refine",
                             cfg_scale=cfg_scale,
                             num_inference_timesteps=num_inference_timesteps,
                             src_sa=sa,
                             src_ta=ta,
                             src_ca=ca,
                             src_proportion=src_proportion)

# # case 4: word swap
# edit_motion = generator.edit(init_noise,
#                              src_text=src_text,
#                              edit_text=edit_text,
#                              edit_mode="word_swap",
#                              cfg_scale=cfg_scale,
#                              num_inference_timesteps=num_inference_timesteps,
#                              src_sa=sa,
#                              src_ta=None,
#                              src_ca=ca,
#                              src_proportion=src_proportion,
#                              swap_src_proportion=0.2)


### Visualize

In [6]:
import os
from os.path import join as pjoin
import torch
import numpy as np
from utils.motion_process import recover_from_ric
from utils.plot_script import plot_3d_motion
from utils.get_opt import get_opt

def plot_t2m(data, text, filename):
    os.makedirs("edit_result", exist_ok=True)
    #data = data[:m_lens[0].item()]
    data = data[:m_lens]
    joint = recover_from_ric(torch.from_numpy(data).float(), opt.joints_num).numpy()
    save_path = pjoin("edit_result", f"{filename}.mp4")
    plot_3d_motion(save_path, opt.kinematic_chain, joint, title=text, fps=20)

    np.save(pjoin("edit_result", f"{filename}_pos.npy"), joint)
    np.save(pjoin("edit_result", f"{filename}_feats.npy"), data)
    
# mean and std for de-normalization
wrapper_opt = get_opt(opt.dataset_opt_path, torch.device('cuda'))
mean = np.load(pjoin(wrapper_opt.meta_dir, 'mean.npy'))
std = np.load(pjoin(wrapper_opt.meta_dir, 'std.npy'))

Reading ./checkpoints/t2m/Comp_v6_KLD005/opt.txt


### Plot Motions

In [7]:
src_motion = src_motion.detach().cpu().numpy() * std + mean
plot_t2m(src_motion[0], src_text, "src")

edit_motion = edit_motion.detach().cpu().numpy()* std + mean
plot_t2m(edit_motion[0], edit_text, "edit")

### Video Visualization

In [8]:
import subprocess
import os
from IPython.display import Video

# Define paths
src_path = os.path.join("edit_result", "src.mp4")
edit_path = os.path.join("edit_result", "edit.mp4")
out_path = os.path.join("edit_result", "final.mp4")

# Build the ffmpeg command
# -y: overwrite output
# -filter_complex hstack: stack video streams horizontally
cmd = [
    "ffmpeg",
    "-y",
    "-i", src_path,
    "-i", edit_path,
    "-filter_complex", "hstack",
    out_path
]

# Run the command
print("Running FFmpeg...")
subprocess.run(cmd, check=True)

# Display
Video(out_path, width=800, height=400)

Running FFmpeg...


## UNLEARNING


In [13]:
import torch
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm
import numpy as np

# ==========================================
# 1. Configuration
# ==========================================
forget_prompts = [
    "a man kicking", "kicking someone", "violent kick",
    "punching", "a man punching", "fighting",
    "attacking", "violence", "hitting", "combat",
    "martial arts", "strike", "beating"
]
neutral_prompt = "standing" # Empty string = generic/unconditional motion

# Training params
learning_rate = 1e-5
num_steps = 150        # ESD usually works quickly (100-300 steps)
batch_size = 4         # Keep small for VRAM
motion_length = 64     # Length in frames

# ==========================================
# 2. Setup Optimizer & Freeze CLIP
# ==========================================
# The denoiser is where the weights we want to change are located
denoiser = generator.denoiser
device = generator.device

# Freeze the CLIP text encoder to prevent destroying language understanding
# In t2m.py, CLIP is stored at generator.denoiser.clip_model
if hasattr(denoiser, 'clip_model'):
    print("Freezing CLIP model layers...")
    denoiser.clip_model.eval()
    for param in denoiser.clip_model.parameters():
        param.requires_grad = False
else:
    print("Warning: clip_model not found on denoiser. Check architecture.")

# Ensure denoiser is in training mode
denoiser.train()

# We only optimize the denoiser parameters (excluding the frozen CLIP)
optimizer = optim.AdamW(filter(lambda p: p.requires_grad, denoiser.parameters()), lr=learning_rate)
mse_loss = nn.MSELoss()

print(f"Starting ESD Unlearning on {len(forget_prompts)} concepts...")

# ==========================================
# 3. Training Loop
# ==========================================
pbar = tqdm(range(num_steps))

for step in pbar:
    optimizer.zero_grad()
    
    # --- A. Prepare Data ---
    # Sample prompts
    current_prompts = np.random.choice(forget_prompts, batch_size).tolist()
    neutral_prompts = [neutral_prompt] * batch_size
    
    # --- B. Create Latent Noise ---
    # Based on t2m.py generate(): z shape is (Batch, m_lens//4, 7, latent_dim)
    # The VAE compresses motion by factor of 4 roughly? Or architecture specific.
    # We use m_lens // 4 as seen in the 'generate' method.
    dim_latent = generator.vae_opt.latent_dim
    # Shape: [Batch, Frames, Joints, Dim]
    # Note: Text2Motion.generate uses (1, m_lens//4, 7, dim). We match that pattern.
    z = torch.randn(batch_size, motion_length // 4, 7, dim_latent).to(device)
    
    # --- C. Sample Timesteps ---
    # We need random timesteps for diffusion training
    # generator.opt.num_train_timesteps usually holds the max steps (e.g. 1000)
    max_timesteps = generator.opt.num_train_timesteps
    t = torch.randint(0, max_timesteps, (batch_size,), device=device).long()
    
    # --- D. Forward Pass (ESD Logic) ---
    # t2m.py denoiser.forward takes: (z, timestep, text_list, need_attn)
    # It returns (prediction, attention_tuple). We only need prediction [0].
    
    # 1. Prediction for "Violence" (The behavior we want to change)
    pred_forget, _ = denoiser(z, t, current_prompts, need_attn=False)
    
    # 2. Prediction for "Neutral" (The target behavior)
    # We detach this because we don't want to learn the neutral concept, 
    # we just want the violent concept to POINT to this output.
    with torch.no_grad():
        pred_neutral, _ = denoiser(z, t, neutral_prompts, need_attn=False)

    # --- E. Loss & Backprop ---
    # Minimize difference between "Kick" output and "Neutral" output
    loss = mse_loss(pred_forget, pred_neutral)
    
    loss.backward()
    optimizer.step()
    
    pbar.set_description(f"Loss: {loss.item():.4f}")

print("Unlearning complete.")
denoiser.eval() # Switch back to eval for inference

Freezing CLIP model layers...
Starting ESD Unlearning on 13 concepts...


Loss: 0.0060: 100%|██████████| 150/150 [00:13<00:00, 11.48it/s]

Unlearning complete.





Denoiser(
  (input_process): InputProcess(
    (layers): Sequential(
      (0): Linear(in_features=32, out_features=256, bias=True)
      (1): ReLU()
      (2): Linear(in_features=256, out_features=256, bias=True)
    )
  )
  (output_process): OutputProcess(
    (layers): Sequential(
      (0): LayerNorm((256,), eps=1e-05, elementwise_affine=True)
      (1): Linear(in_features=256, out_features=256, bias=True)
      (2): ReLU()
      (3): Linear(in_features=256, out_features=32, bias=True)
    )
  )
  (timestep_emb): TimestepEmbedding(
    (mlp): Sequential(
      (0): Linear(in_features=256, out_features=256, bias=True)
      (1): SiLU()
      (2): Linear(in_features=256, out_features=256, bias=True)
    )
  )
  (clip_model): FrozenCLIPTextEncoder(
    (model): CLIPModel(
      (text_model): CLIPTextTransformer(
        (embeddings): CLIPTextEmbeddings(
          (token_embedding): Embedding(49408, 512)
          (position_embedding): Embedding(77, 512)
        )
        (encoder): CL

In [14]:
# Force a specific seed to compare cleanly
from utils.fixseed import fixseed
fixseed(42)

# Use a prompt from the forget set
test_text = "a man kicking" 

# Generate
# We assume 'generator' is still the main object instance
# and 'denoiser' (which we just trained) is linked to it.
init_noise, src_motion, (sa, ta, ca) = generator.generate(test_text, 
                                                          m_lens=64, 
                                                          cfg_scale=7.5, 
                                                          num_inference_timesteps=50)

# De-normalize and plot
src_motion = src_motion.detach().cpu().numpy() * std + mean
plot_t2m(src_motion[0], test_text, "after_unlearning")

# Show video
from IPython.display import Video
import os
Video(os.path.join("edit_result", "after_unlearning.mp4"))

In [34]:
import torch
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm
import numpy as np
import subprocess
import os
from IPython.display import Video
from utils.fixseed import fixseed
from os.path import join as pjoin

# --- Configuration ---
test_prompt = "a man kicking violently"
comparison_filename = "comparison_unlearning.mp4"

# ==========================================
# Step 1: Generate "Before" (Original Model)
# ==========================================
# We must reload the model to ensure it is clean
print("Reloading original model for 'Before' generation...")
generator = Text2Motion(denoiser_name, dataset_name) # Reloads weights from disk

fixseed(42)
_, before_motion, _ = generator.generate(test_prompt, m_lens=64, cfg_scale=7.5)

# Save "Before" visualization
before_motion = before_motion.detach().cpu().numpy() * std + mean
plot_t2m(before_motion[0], f"Before: {test_prompt}", "before_unlearning")

# ==========================================
# Step 2: Perform ESD Unlearning
# ==========================================
print("\nStarting Unlearning Process...")

# Unlearning Configuration
forget_prompts = ["kick", "punch", "figh", "strike", "violence"]
neutral_prompt = "standing" 
learning_rate = 1e-5 # Slightly higher for quicker results in demo
num_steps = 300
batch_size = 4

# Setup Optimization
denoiser = generator.denoiser
denoiser.train()
# Freeze CLIP
if hasattr(denoiser, 'clip_model'):
    denoiser.clip_model.eval()
    for param in denoiser.clip_model.parameters():
        param.requires_grad = False

optimizer = optim.AdamW(filter(lambda p: p.requires_grad, denoiser.parameters()), lr=learning_rate)
mse_loss = nn.MSELoss()
device = generator.device

# Training Loop
for step in tqdm(range(num_steps), desc="Unlearning"):
    optimizer.zero_grad()
    
    # 1. Inputs
    current_prompts = np.random.choice(forget_prompts, batch_size).tolist()
    neutral_prompts = [neutral_prompt] * batch_size
    
    # 2. Noise (Match shape: Batch, Frames//4, Joints, Dim)
    z = torch.randn(batch_size, 64 // 4, 7, generator.vae_opt.latent_dim).to(device)
    t = torch.randint(0, generator.opt.num_train_timesteps, (batch_size,), device=device).long()
    
    # 3. ESD Loss
    pred_forget, _ = denoiser(z, t, current_prompts, need_attn=False)
    with torch.no_grad():
        pred_neutral, _ = denoiser(z, t, neutral_prompts, need_attn=False)

    loss = mse_loss(pred_forget, pred_neutral)
    loss.backward()
    optimizer.step()

denoiser.eval() # Switch back to eval

# ==========================================
# Step 3: Generate "After" (Unlearned Model)
# ==========================================
print("\nGenerating 'After' motion...")
fixseed(42) # Use SAME seed to see exact impact
_, after_motion, _ = generator.generate(test_prompt, m_lens=64, cfg_scale=7.5)

# Save "After" visualization
after_motion = after_motion.detach().cpu().numpy() * std + mean
plot_t2m(after_motion[0], f"After: {test_prompt}", "after_unlearning")

# ==========================================
# Step 4: Create Side-by-Side Video
# ==========================================
print("\nStitching videos...")
path_before = pjoin("edit_result", "before_unlearning.mp4")
path_after = pjoin("edit_result", "after_unlearning.mp4")
path_out = pjoin("edit_result", comparison_filename)

cmd = [
    "ffmpeg", "-y",
    "-i", path_before,
    "-i", path_after,
    "-filter_complex", "[0:v][1:v]hstack=inputs=2[v]", # Stack horizontally
    "-map", "[v]",
    path_out
]
subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

print("Displaying comparison:")
Video(path_out, width=800, height=400)

Reloading original model for 'Before' generation...
Reading checkpoints/t2m/t2m_denoiser_vpred_vaegelu/opt.txt
Reading checkpoints/t2m/t2m_vae_gelu/opt.txt
Loading VAE Model t2m_vae_gelu
Loading Denoiser Model t2m_denoiser_vpred_vaegelu


0it [00:00, ?it/s]


Loaded CLIP text encoder version ViT-B/32

Starting Unlearning Process...


Unlearning: 100%|██████████| 300/300 [00:23<00:00, 12.51it/s]



Generating 'After' motion...

Stitching videos...
Displaying comparison:
