In [None]:
import torch
import json
import torch.nn as nn
import os
from transformers import AutoTokenizer
import math
import torch.nn.functional as F
import decimal
!pip install nltk rouge-score
from rouge_score import rouge_scorer
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction

  from .autonotebook import tqdm as notebook_tqdm


In [None]:
# Model for Emotion Encoder-Decoder Transformer
# This model is designed to process keypoints and emotion features, and generate a sequence of tokens.
class EmotionEncoderDecoderTransformer(nn.Module):
    """Encoder-decoder Transformer mapping keypoint + emotion sequences to token logits.

    The encoder consumes per-frame keypoints and emotion vectors (both linearly
    projected to ``hidden_size`` and summed, plus a sinusoidal positional
    encoding).  The decoder consumes token embeddings (plus the same positional
    encoding and a per-sample bias derived from the time-averaged emotion
    vector) and attends to the encoder output.

    Args:
        num_joints: Number of joints per frame.
        kp_input_dim: Number of values per joint; the keypoint projection
            expects ``num_joints * kp_input_dim`` features per frame.
        emo_input_dim: Number of emotion features per frame.
        hidden_size: Model width (``d_model``) shared by encoder and decoder.
        num_layers: Number of layers in both the encoder and the decoder.
        nhead: Number of attention heads.
        ff_dim: Width of the feed-forward sub-layers.
        dropout: Dropout probability inside the Transformer layers.
        max_len: Number of positions in the positional-encoding table.  Both
            the input sequence and the target sequence must be no longer than
            this, otherwise the positional encoding cannot be broadcast onto
            them and the addition in ``forward`` fails.
        vocab_size: Size of the token vocabulary (embedding rows / output logits).
    """

    def __init__(self,
                 num_joints=25,
                 kp_input_dim=3,
                 emo_input_dim=7,
                 hidden_size=256,
                 num_layers=4,
                 nhead=8,
                 ff_dim=512,
                 dropout=0.1,
                 max_len=1024,
                 vocab_size=30522):
        super().__init__()

        # NOTE: attribute names and creation order are kept stable so that
        # previously saved state_dicts still load and seeded initialisation
        # is unchanged.

        # Keypoints encoder
        self.kp_fc = nn.Linear(num_joints * kp_input_dim, hidden_size)

        # Emotions encoder
        self.emotion_fc = nn.Linear(emo_input_dim, hidden_size)

        # Emotion bias modulator for decoder
        self.emo_to_bias = nn.Linear(emo_input_dim, hidden_size)

        # Positional encoding buffer (moves with the module, saved in state_dict)
        pe = self._build_positional_encoding(max_len, hidden_size)
        self.register_buffer("pos_encoder", pe)  # [max_len, hidden_size]

        # Encoder
        encoder_layer = nn.TransformerEncoderLayer(d_model=hidden_size, nhead=nhead,
                                                   dim_feedforward=ff_dim, dropout=dropout, batch_first=True)
        self.encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)

        # Token embedding
        self.token_embedding = nn.Embedding(vocab_size, hidden_size)

        # Decoder
        decoder_layer = nn.TransformerDecoderLayer(d_model=hidden_size, nhead=nhead,
                                                   dim_feedforward=ff_dim, dropout=dropout, batch_first=True)
        self.decoder = nn.TransformerDecoder(decoder_layer, num_layers=num_layers)

        # Output projection
        self.output_fc = nn.Linear(hidden_size, vocab_size)

    def _build_positional_encoding(self, max_len, hidden_size):
        """Build the sinusoidal positional-encoding table.

        Even columns hold ``sin`` and odd columns hold ``cos`` of the same
        position/frequency products.  Works for odd ``hidden_size`` as well:
        in that case there is one more sine column than cosine columns.

        Returns:
            Float tensor of shape ``[max_len, hidden_size]``.
        """
        position = torch.arange(0, max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, hidden_size, 2) * -(torch.log(torch.tensor(10000.0)) / hidden_size))
        angles = position * div_term  # [max_len, ceil(hidden_size / 2)]
        pe = torch.zeros(max_len, hidden_size)
        pe[:, 0::2] = torch.sin(angles)
        # For odd hidden_size there are only hidden_size // 2 odd columns, so the
        # last frequency is used by the sine half only.
        pe[:, 1::2] = torch.cos(angles[:, : hidden_size // 2])
        return pe  # [max_len, hidden_size]

    def forward(self, keypoints, emotions, tgt, tgt_mask=None):
        """Compute next-token logits for every target position.

        Args:
            keypoints: ``(B, T, num_joints * kp_input_dim)`` float tensor.
            emotions: ``(B, T, emo_input_dim)`` float tensor.
            tgt: ``(B, T')`` integer tensor of token ids fed to the decoder.
            tgt_mask: Optional attention mask forwarded to the decoder as
                ``tgt_mask`` (e.g. a causal mask of shape ``(T', T')``).

        Returns:
            ``(B, T', vocab_size)`` tensor of logits.
        """
        device = keypoints.device

        # Project keypoints and emotion
        kp_feat = self.kp_fc(keypoints)           # (B, T, H)
        emo_feat = self.emotion_fc(emotions)      # (B, T, H)

        # Encoder input = keypoints + emotion bias + positional encoding
        encoder_input = kp_feat + emo_feat
        pos_enc = self.pos_encoder[:encoder_input.size(1)].to(device)  # (T, H)
        encoder_input = encoder_input + pos_enc.unsqueeze(0)

        # Encode
        memory = self.encoder(encoder_input)

        # Token embeddings for decoder (already on the module's device)
        tgt_embed = self.token_embedding(tgt)
        tgt_pos_enc = self.pos_encoder[:tgt.size(1)].to(device)
        tgt_input = tgt_embed + tgt_pos_enc.unsqueeze(0)

        # Apply emotion bias to decoder input (mean emotion across T)
        emotion_bias = self.emo_to_bias(emotions.mean(dim=1))  # (B, H)
        emotion_bias = emotion_bias.unsqueeze(1).expand(-1, tgt_input.size(1), -1)  # (B, T', H)
        tgt_input = tgt_input + emotion_bias

        # Decode
        output = self.decoder(tgt_input, memory, tgt_mask=tgt_mask)
        logits = self.output_fc(output)
        return logits


In [None]:
# Location of the saved emotions-model weights; this value is handed straight
# to torch.load below, so it has to point at the checkpoint file itself.
model_path = "<path_to_model>"

# Prefer the GPU when one is available, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Build the network with its default hyper-parameters and place it on the device
model = EmotionEncoderDecoderTransformer()
model.to(device)

# Restore the trained weights, remapping stored tensors onto the chosen device.
# NOTE(review): torch.load unpickles the file - only load checkpoints from a trusted source.
model.load_state_dict(
    torch.load(model_path, map_location=device)
)

# Inference only: switch to evaluation mode
model.eval()

# Tokenizer used to turn generated token ids back into text
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

In [None]:
# Function to convert subtitle times to seconds
def time_to_float(time_str):
    """Convert a subtitle timestamp string to a number of seconds.

    Accepted forms are ``HH:MM:SS``, ``MM:SS`` and ``SS``; missing leading
    fields count as zero.  The seconds field may carry a fractional part
    written with either a dot or a comma (the SRT convention, e.g.
    ``00:01:02,500``).

    Args:
        time_str: Timestamp string.

    Returns:
        Total seconds as a float.

    Raises:
        ValueError: If the string has more than three ``:``-separated fields
            or a field is not a valid number.
    """
    # SRT files use a comma as the decimal separator; float() needs a dot.
    fields = time_str.strip().replace(',', '.').split(':')
    if len(fields) > 3:
        raise ValueError(f"Invalid time string (expected at most HH:MM:SS): {time_str!r}")

    # Left-pad so that "MM:SS" and "SS" are read as 0 hours / 0 minutes.
    fields = ['0'] * (3 - len(fields)) + fields
    hours, minutes, seconds = (float(field) for field in fields)

    # Convert to total seconds
    return hours * 3600 + minutes * 60 + seconds

In [None]:
# Load JSON files
def load_from_json(file_path):
    """Read a JSON file and return the decoded Python object.

    The file is decoded as UTF-8 explicitly, so the result does not depend on
    the platform's default text encoding.

    Args:
        file_path: Path of the JSON file to read.

    Returns:
        Whatever ``json.load`` produces for the file's content.
    """
    with open(file_path, 'r', encoding='utf-8') as f:
        return json.load(f)


In [None]:
# Load the test data. The evaluation cell further down iterates over this
# object and reads the "code" and "text" keys of every entry.
test_data = load_from_json("<path_to_test_data>")

In [None]:
# Load the per-frame emotion vectors from a JSON file
def load_emotion_tensor_from_json(json_path, video_id, emo_dim=7):
    """Load per-frame emotion vectors from a JSON file into one tensor.

    The file is expected to contain a sequence of frames, each frame being a
    sequence of numbers.  A frame that is empty/falsy, or whose values are all
    zero, is replaced by a zero vector of length ``emo_dim``.

    Args:
        json_path: Path of the JSON file to read.
        video_id: Identifier returned unchanged alongside the tensor.
        emo_dim: Length of the zero vector used for empty or all-zero frames
            (defaults to 7, the previously hard-coded value).

    Returns:
        Tuple ``(video_id, tensor)`` where ``tensor`` is a float32 tensor of
        shape ``[T, emo_dim]`` with ``T`` the number of frames in the file.

    Raises:
        RuntimeError: Raised by ``torch.stack`` if the file contains no frames
            or if the frames do not all have the same length.
    """
    with open(json_path, 'r', encoding='utf-8') as f:
        frames = json.load(f)

    rows = []
    for frame in frames:
        if not frame or all(v == 0 for v in frame):
            # Missing or all-zero frame: use an explicit zero vector
            rows.append(torch.zeros(emo_dim, dtype=torch.float32))
        else:
            rows.append(torch.tensor([float(x) for x in frame], dtype=torch.float32))

    # Stack the per-frame vectors into a single tensor
    return video_id, torch.stack(rows)  # [T, emo_dim]

In [None]:
# greedy decoding function
def greedy_decode(model, keypoints_tensor, emotion_tensor, tokenizer, max_len=80, start_token_id=101, end_token_id=102):
    """Generate text for one sample by repeatedly taking the arg-max token.

    Args:
        model: Module called as ``model(keypoints, emotions, tokens, tgt_mask=...)``
            and returning logits whose last dimension indexes the vocabulary.
        keypoints_tensor: Un-batched keypoints; a batch dimension is prepended.
        emotion_tensor: Un-batched emotion features; a batch dimension is prepended.
        tokenizer: Object providing ``decode(ids, skip_special_tokens=True)``.
        max_len: Maximum number of tokens generated after the start token.
        start_token_id: Token id that seeds the sequence
            (presumably [CLS] of the tokenizer in use - confirm).
        end_token_id: Token id that stops generation once produced
            (presumably [SEP] of the tokenizer in use - confirm).

    Returns:
        The value returned by ``tokenizer.decode`` for the generated ids.
    """
    # Inference only; run everything on whichever device holds the weights
    model.eval()
    device = next(model.parameters()).device

    kp_batch = keypoints_tensor.unsqueeze(0).to(device)   # [1, T, ...]
    emo_batch = emotion_tensor.unsqueeze(0).to(device)    # [1, T, ...]

    # The sequence begins with just the start token
    token_ids = torch.tensor([[start_token_id]], dtype=torch.long, device=device)  # [1, 1]

    with torch.no_grad():
        for _ in range(max_len):
            # Causal mask sized to the tokens produced so far
            cur_len = token_ids.size(1)
            causal_mask = torch.nn.Transformer.generate_square_subsequent_mask(cur_len).to(device)

            # Score the whole prefix and keep only the prediction for the last position
            logits = model(kp_batch, emo_batch, token_ids, tgt_mask=causal_mask)
            best = torch.argmax(logits[:, -1, :], dim=-1, keepdim=True)
            token_ids = torch.cat((token_ids, best), dim=1)

            # Stop as soon as the end token has been emitted
            if best.item() == end_token_id:
                break

    # Turn the ids back into text, dropping special tokens
    return tokenizer.decode(token_ids[0], skip_special_tokens=True)

In [None]:
# Run manual tests: build (keypoints, emotions, reference text) samples from the
# test set, decode each one greedily and report BLEU / ROUGE per sample and on average.
print("Running Manual Tests")

# Paths to the emotion data and keypoints
test_keypoints_dir = "<path_to_test_keypoints>"
test_emotions_dir = "<path_to_test_emotions>"
# Test metadata
fps = 25
num_joints = 25
keypoints_dim = num_joints * 3  # flattened feature size per frame

# Create a list to store the processed test samples
processed_test_samples = []

# Iterate through the test data
for data in test_data:
    # Extract the code and subtitle text from the data
    code = data["code"]
    subtitle_text = data["text"]

    # Build the path to the keypoints file and skip the sample if it is missing
    keypoints_path = os.path.join(test_keypoints_dir, code + "_keypoints.pt")
    if not os.path.exists(keypoints_path):
        print(f"[Warning] Keypoints file not found for {code}")
        continue

    # Same treatment for the emotions file: a missing file skips the sample
    # instead of aborting the whole evaluation with FileNotFoundError.
    emotions_path = os.path.join(test_emotions_dir, code + "_edited.json")
    if not os.path.exists(emotions_path):
        print(f"[Warning] Emotions file not found for {code}")
        continue

    # Load the keypoints.
    # NOTE(review): torch.load unpickles the file - only use trusted inputs.
    full_keypoints = torch.load(keypoints_path)

    # Check if the keypoints are empty
    if len(full_keypoints) == 0:
        print(f"[Warning] Skipping {code}: empty keypoints")
        continue

    # Turn every frame into a fixed-size vector of length keypoints_dim
    processed_kps = []
    for frame in full_keypoints:
        # Frames without any detected person stay all-zero
        frame_tensor = torch.zeros(keypoints_dim)
        if len(frame) > 0:
            # Only the first person in the frame is used
            person = frame[0]
            # Flatten person -> part -> joint -> coordinate into one list
            flat_kps = [coord for part in person for joint in part for coord in joint]
            # Truncate or zero-pad to exactly keypoints_dim values
            flat_kps = flat_kps[:keypoints_dim] + [0] * max(0, keypoints_dim - len(flat_kps))
            frame_tensor = torch.tensor(flat_kps, dtype=torch.float32)
        processed_kps.append(frame_tensor)

    # Convert the list of processed keypoints to a tensor
    keypoints_tensor = torch.stack(processed_kps)  # [T, 75]

    # Load the emotion tensor
    _, emotion_tensor = load_emotion_tensor_from_json(emotions_path, code + "_edited.json")

    # Align both sequences on the shorter one (a no-op when the lengths already match)
    min_len = min(emotion_tensor.size(0), keypoints_tensor.size(0))
    emotion_tensor = emotion_tensor[:min_len]
    keypoints_tensor = keypoints_tensor[:min_len]

    # Add the processed sample to the list
    processed_test_samples.append((keypoints_tensor, emotion_tensor, subtitle_text, code))

# Initialize the values for BLEU and ROUGE scores
smooth_fn = SmoothingFunction().method1
scorer = rouge_scorer.RougeScorer(['rouge1', 'rougeL'], use_stemmer=True)
all_bleu_scores = []
all_rouge_scores = []

print("\nRunning Manual Tests")
# Iterate through the processed test samples
for keypoints, emotions, expected_text, code in processed_test_samples:
    print(f"\nTest Sample: {code}")
    print("Expected:", expected_text)

    # Get the predicted text using greedy decoding
    result = greedy_decode(model, keypoints, emotions, tokenizer)
    print("Predicted:", result)

    # BLEU. The predictions come from an uncased tokenizer and are therefore
    # always lower-case; lower-case the reference too so that case alone never
    # counts as a mismatch.
    ref_tokens = [expected_text.lower().split()]
    gen_tokens = result.split()
    bleu = sentence_bleu(ref_tokens, gen_tokens, smoothing_function=smooth_fn)
    all_bleu_scores.append(bleu)
    print("BLEU:", bleu)

    # ROUGE
    rouge = scorer.score(expected_text, result)
    all_rouge_scores.append(rouge)
    print("ROUGE:", rouge)

# Average the scores; guard against an empty run (e.g. every sample was skipped),
# which would otherwise raise ZeroDivisionError.
if all_bleu_scores:
    avg_bleu = sum(all_bleu_scores) / len(all_bleu_scores)
    avg_rouge1 = sum(r['rouge1'].fmeasure for r in all_rouge_scores) / len(all_rouge_scores)
    avg_rougeL = sum(r['rougeL'].fmeasure for r in all_rouge_scores) / len(all_rouge_scores)

    # Print the average scores
    print(f"\nAverage BLEU: {avg_bleu:.4f}")
    print(f"Average ROUGE-1: {avg_rouge1:.4f}")
    print(f"Average ROUGE-L: {avg_rougeL:.4f}")
else:
    print("\n[Warning] No test samples were evaluated; average scores are undefined.")

Running Manual Tests

Running Manual Tests

Test Sample: 001
Expected: Help me.
Predicted: i ' m not a bit of the world, but it ' s not it and it ' s not a new of the world.
BLEU: 0
ROUGE: {'rouge1': Score(precision=0.0, recall=0.0, fmeasure=0.0), 'rougeL': Score(precision=0.0, recall=0.0, fmeasure=0.0)}

Test Sample: 002
Expected: It would start to concern me a little bit, looking at five years, looking at retiring.
Predicted: i ' m not a big, and i ' m not a little of the world.
BLEU: 0.026012784404037925
ROUGE: {'rouge1': Score(precision=0.14285714285714285, recall=0.125, fmeasure=0.13333333333333333), 'rougeL': Score(precision=0.14285714285714285, recall=0.125, fmeasure=0.13333333333333333)}

Test Sample: 003
Expected: Oh, God, I hate these Land Cruisers.
Predicted: i ' ve got to be a little, but it was a little of the world, but it was, but it.
BLEU: 0
ROUGE: {'rouge1': Score(precision=0.05, recall=0.14285714285714285, fmeasure=0.07407407407407408), 'rougeL': Score(precision=0.05,