In [6]:
import torch
# Smoke test for the PyTorch install: draw a 5x3 tensor of uniform random
# values and print it.
x = torch.rand((5, 3))
print(x)

tensor([[0.4204, 0.8024, 0.4790],
        [0.4604, 0.5669, 0.0029],
        [0.0281, 0.6576, 0.6562],
        [0.4122, 0.2332, 0.5772],
        [0.6728, 0.8616, 0.1212]])


### 1. Data Loading and Preprocessing

In [7]:
import os
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision.io import read_video
from torchvision import transforms
import pandas as pd

class VideoCaptionDataset(Dataset):
    """Dataset of (video, caption) pairs stored one sample per numbered folder.

    Expected layout::

        root_dir/
            0/video.mp4
            0/caption.txt
            1/video.mp4
            1/caption.txt
            ...

    Captions are read eagerly when the dataset is constructed; videos are only
    decoded when an item is requested.
    """

    def __init__(self, root_dir, transform=None):
        """
        Args:
            root_dir (string): Directory with all the video folders.
            transform (callable, optional): Optional transform. It is called once
                per item with the whole video tensor returned by ``read_video``
                (not frame by frame).
        """
        self.root_dir = root_dir
        self.transform = transform
        self.data_pairs = self._load_data_pairs(root_dir)

    def _load_data_pairs(self, root_dir):
        """Collect ``(video_path, caption)`` tuples, ordered by numeric folder name.

        Entries of ``root_dir`` whose name is not an integer (for example the
        ``.ipynb_checkpoints`` folder Jupyter creates, or ``.DS_Store``) and
        entries that are not directories are skipped instead of aborting the
        whole scan.

        Raises:
            FileNotFoundError: If a sample folder has no ``caption.txt``.
        """
        numbered_folders = []
        for folder_name in os.listdir(root_dir):
            try:
                folder_index = int(folder_name)
            except ValueError:
                continue  # not a sample folder
            if os.path.isdir(os.path.join(root_dir, folder_name)):
                numbered_folders.append((folder_index, folder_name))
        # Sort on the integer only, so '2' comes before '10'.
        numbered_folders.sort(key=lambda entry: entry[0])

        data_pairs = []
        for _, folder_name in numbered_folders:
            video_path = os.path.join(root_dir, folder_name, 'video.mp4')  # Assuming video is named 'video.mp4'
            caption_path = os.path.join(root_dir, folder_name, 'caption.txt')  # Assuming caption is named 'caption.txt'
            with open(caption_path, 'r', encoding='utf-8') as f:
                caption = f.read().strip()
            data_pairs.append((video_path, caption))
        return data_pairs

    def __len__(self):
        """Return the number of (video, caption) pairs found under ``root_dir``."""
        return len(self.data_pairs)

    def __getitem__(self, idx):
        """Decode the video at position ``idx`` and return ``(video, caption)``.

        The video is handed to ``self.transform`` exactly as ``read_video``
        returns it, so the transform is responsible for any layout or dtype
        conversion it needs.
        """
        video_path, caption = self.data_pairs[idx]
        video, _, _ = read_video(video_path)  # Reads video into a (T, H, W, C) tensor
        if self.transform:
            video = self.transform(video)
        return video, caption

### 2. Define Transformations

In [8]:
from torchvision.transforms import Compose, Resize, Normalize

def _video_to_tchw_float(video):
    """Convert a decoded video into the layout the transforms below expect.

    ``VideoCaptionDataset`` passes the tensor from ``read_video`` straight to
    the transform. That tensor is channels-last, (T, H, W, C), with 8-bit pixel
    values, while ``Resize`` and ``Normalize`` operate on float tensors whose
    last three dimensions are (C, H, W).

    Args:
        video (Tensor): Frames shaped (T, H, W, C) with values in [0, 255].

    Returns:
        Tensor: Float frames shaped (T, C, H, W) with values in [0, 1].
    """
    return video.permute(0, 3, 1, 2).float().div(255.0)


transform = Compose([
    _video_to_tchw_float,  # (T, H, W, C) uint8 -> (T, C, H, W) float in [0, 1]
    Resize((224, 224)),  # Resize video frames to 224x224
    Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),  # Normalize with ImageNet mean and std
])

### 3. Initialize Dataset and DataLoader

In [9]:
# Index the sample folders under `root_dir`, then wrap the dataset in a loader
# that shuffles and yields two samples per batch.
root_dir = 'data_powdervibe'
dataset = VideoCaptionDataset(root_dir, transform)
dataloader = DataLoader(dataset, shuffle=True, batch_size=2)

### 4. Encoder-Decoder Model Architecture

In [13]:
import torch.nn as nn
import torchvision.models as models
from torch.nn.utils.rnn import pack_padded_sequence

class EncoderCNN(nn.Module):
    """Image encoder: a fixed pretrained ResNet-152 trunk plus a trainable head.

    The trunk is only ever run under ``torch.no_grad()``, so just ``linear`` and
    ``bn`` receive gradients. The trunk is also kept permanently in eval mode:
    ``no_grad`` does not stop BatchNorm layers that are in training mode from
    updating their running statistics, which would silently drift the
    pretrained weights' normalisation on every training batch.
    """

    def __init__(self, embed_size):
        """Load the pretrained ResNet-152 and replace top fc layer.

        Args:
            embed_size (int): Size of the feature vector produced per image.
        """
        super(EncoderCNN, self).__init__()
        # NOTE: `pretrained=True` is deprecated in recent torchvision releases in
        # favour of the `weights=` argument; kept so older installs still work.
        resnet = models.resnet152(pretrained=True)
        modules = list(resnet.children())[:-1]      # delete the last fc layer.
        self.resnet = nn.Sequential(*modules)
        self.linear = nn.Linear(resnet.fc.in_features, embed_size)
        self.bn = nn.BatchNorm1d(embed_size, momentum=0.01)
        # Modules start in training mode; pin the trunk to eval right away.
        self.resnet.eval()

    def train(self, mode=True):
        """Switch the head between train/eval mode; the trunk always stays in eval.

        Args:
            mode (bool): ``True`` for training mode, ``False`` for evaluation.

        Returns:
            EncoderCNN: ``self``, as ``nn.Module.train`` does.
        """
        super(EncoderCNN, self).train(mode)
        self.resnet.eval()
        return self

    def forward(self, images):
        """Extract feature vectors from input images.

        Args:
            images (Tensor): Batch of images accepted by the ResNet trunk.

        Returns:
            Tensor: One ``embed_size``-dimensional feature vector per image.
        """
        with torch.no_grad():  # We don't need to calculate gradients here, so we disable gradient computation.
            features = self.resnet(images)
        # Flatten everything after the batch dimension before the linear layer.
        features = features.reshape(features.size(0), -1)
        features = self.bn(self.linear(features))
        return features

class DecoderRNN(nn.Module):
    """LSTM caption decoder that is primed with an image feature vector."""

    def __init__(self, embed_size, hidden_size, vocab_size, num_layers, max_seq_length=20):
        """Create the word embedding, the LSTM stack and the output projection.

        Args:
            embed_size (int): Size of word embeddings and of the input feature vector.
            hidden_size (int): Size of the LSTM hidden state.
            vocab_size (int): Number of tokens in the vocabulary.
            num_layers (int): Number of stacked LSTM layers.
            max_seq_length (int): Number of tokens ``sample`` generates.
        """
        super().__init__()
        self.embed = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embed_size)
        self.lstm = nn.LSTM(
            input_size=embed_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
        )
        self.linear = nn.Linear(in_features=hidden_size, out_features=vocab_size)
        # The attribute keeps its historical "seg" spelling.
        self.max_seg_length = max_seq_length

    def forward(self, features, captions, lengths):
        """Score vocabulary tokens for every non-padded step of each caption.

        The feature vector is placed in front of the embedded caption as the
        first step of the sequence, and the result is packed using ``lengths``.

        Returns:
            Tensor: Vocabulary scores, one row per packed time step.
        """
        visual_step = features.unsqueeze(1)
        token_vectors = self.embed(captions)
        sequence = torch.cat((visual_step, token_vectors), dim=1)
        packed_in = pack_padded_sequence(
            sequence, lengths, batch_first=True, enforce_sorted=False
        )
        packed_out, _ = self.lstm(packed_in)
        return self.linear(packed_out.data)

    def sample(self, features, states=None):
        """Greedily generate ``max_seg_length`` token ids for each feature vector.

        Returns:
            Tensor: Token ids shaped (batch_size, max_seq_length).
        """
        step_input = features.unsqueeze(1)              # (batch_size, 1, embed_size)
        chosen_ids = []
        for _ in range(self.max_seg_length):
            lstm_out, states = self.lstm(step_input, states)   # (batch_size, 1, hidden_size)
            scores = self.linear(lstm_out.squeeze(1))          # (batch_size, vocab_size)
            predicted = torch.max(scores, 1)[1]                # (batch_size)
            chosen_ids.append(predicted)
            # Feed the chosen token back in as the next step's input.
            step_input = self.embed(predicted).unsqueeze(1)    # (batch_size, 1, embed_size)
        return torch.stack(chosen_ids, dim=1)

### Preprocessing Video Frames

In [14]:
from torchvision import transforms

# Define the transform to match ResNet's expectations
# Per-image pipeline: resize, centre-crop to 224, convert to a tensor, then
# normalise each channel. The mean/std values are the same ones the `transform`
# cell labels as the ImageNet statistics.
# NOTE(review): no other cell visible here uses `preprocess`; the dataset is
# built with `transform` instead. `ToTensor` is documented for PIL images and
# ndarrays, so this pipeline presumably targets frames exported as images
# rather than the tensors `read_video` returns — confirm before wiring it in.
preprocess = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

### Processing Captions

In [15]:
class Vocabulary:
    """Two-way mapping between words and consecutive integer ids."""

    def __init__(self):
        """Start with an empty vocabulary; the first word added gets id 0."""
        self.word2idx = {}
        self.idx2word = {}
        self.idx = 0

    def add_word(self, word):
        """Register ``word`` under the next free id; known words are left untouched."""
        if word in self.word2idx:
            return
        new_id = self.idx
        self.word2idx[word] = new_id
        self.idx2word[new_id] = word
        self.idx = new_id + 1

    def __call__(self, word):
        """Return the id of ``word``, or the id of ``'<unk>'`` for unknown words.

        Raises:
            KeyError: If ``word`` is unknown and ``'<unk>'`` was never added.
        """
        key = word if word in self.word2idx else '<unk>'
        return self.word2idx[key]

    def __len__(self):
        """Return the number of distinct words registered."""
        return len(self.word2idx)


### Implementing a Training Loop

In [None]:
import torch.optim as optim

# Assuming `encoder` and `decoder` are your model instances and `train_loader` is your DataLoader instance
# NOTE(review): this cell reads `embed_size`, `hidden_size`, `num_layers`, `vocab`,
# `learning_rate`, `num_epochs`, `log_step`, `device` and `train_loader`, none of
# which are defined in the cells visible here — define them in a configuration
# cell above before running.

# Build the models on the same device the batches are moved to below.
encoder = EncoderCNN(embed_size).to(device)
decoder = DecoderRNN(embed_size, hidden_size, len(vocab), num_layers).to(device)

# Loss and optimizer
criterion = nn.CrossEntropyLoss()
# Only the decoder and the encoder's projection head are optimised; the ResNet
# trunk is run without gradients inside EncoderCNN.forward.
params = list(decoder.parameters()) + list(encoder.linear.parameters()) + list(encoder.bn.parameters())
optimizer = optim.Adam(params, lr=learning_rate)

# Train the models
total_step = len(train_loader)
for epoch in range(num_epochs):
    for i, (images, captions, lengths) in enumerate(train_loader):

        # Set mini-batch dataset
        images = images.to(device)
        captions = captions.to(device)
        # Pack the targets exactly like DecoderRNN.forward packs its inputs
        # (enforce_sorted=False), so the rows of `outputs` and `targets` line
        # up and batches whose lengths are not pre-sorted do not raise.
        targets = pack_padded_sequence(captions, lengths, batch_first=True, enforce_sorted=False)[0]

        # Forward, backward and optimize
        features = encoder(images)
        outputs = decoder(features, captions, lengths)
        loss = criterion(outputs, targets)

        # Clears the gradients of every parameter handed to the optimizer.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Print log info
        if i % log_step == 0:
            print(f'Epoch [{epoch}/{num_epochs}], Step [{i}/{total_step}], Loss: {loss.item():.4f}')
