In [None]:
import torch
from torchvision.datasets import VisionDataset
import numpy as np
import urllib
from os import path, makedirs


class MovingMNIST(VisionDataset):
    """Moving MNIST sequences and their annotations, loaded from ``.npy`` files.

    The two arrays named in ``resources`` are read from ``root`` with
    ``np.load`` and exposed as the ``sequences`` and ``annotations`` tensors.
    Item ``i`` is the pair ``(sequences[i], annotations[i])``.

    Args:
        root: Directory that holds (or will receive) the ``.npy`` files.
        download: When true, fetch any file missing from ``root`` before
            loading.

    Raises:
        RuntimeError: If ``download`` is true and a file cannot be fetched
            from any mirror.
    """

    mirrors = [
        "https://github.com/eadali/moving-mnist/releases/download/v0.1/",
    ]

    # (filename, md5) pairs. The md5 fields are empty, so no checksum is
    # verified after download.
    resources = [
        ("train-sequences.npy", ""),
        ("train-annotations.npy", ""),
    ]

    def __init__(self, root, download=False):
        # Let the base class store ``root`` and set up its own attributes
        # instead of bypassing its initialiser.
        super().__init__(root)
        self.seq_path = path.join(self.root, 'train-sequences.npy')
        self.ann_path = path.join(self.root, 'train-annotations.npy')

        if download:
            self.download()

        self.sequences = torch.from_numpy(np.load(self.seq_path))
        self.annotations = torch.from_numpy(np.load(self.ann_path))

    def __len__(self):
        """Return the number of sequences (size of the first axis)."""
        return self.sequences.shape[0]

    def __getitem__(self, idx):
        """Return ``(sequence, annotation)`` for index ``idx``."""
        return self.sequences[idx], self.annotations[idx]

    def download(self):
        """Fetch every file in ``resources`` that is not already in ``root``.

        Each mirror is tried in order. A file is first written to a
        ``.part`` path and only renamed on success, so an interrupted
        transfer never leaves a truncated ``.npy`` file behind.

        Raises:
            RuntimeError: If a file cannot be downloaded from any mirror.
        """
        # Function-scope imports: a bare ``import urllib`` does not guarantee
        # that the ``request`` and ``error`` submodules are loaded.
        import os
        import urllib.error
        import urllib.request

        makedirs(self.root, exist_ok=True)

        for filename, _md5 in self.resources:
            file_path = path.join(self.root, filename)
            if path.isfile(file_path):
                # Already present; nothing to fetch.
                continue

            partial_path = file_path + ".part"
            for mirror in self.mirrors:
                url = f"{mirror}{filename}"
                try:
                    print(f"Downloading {url}")
                    urllib.request.urlretrieve(url, partial_path)
                except urllib.error.URLError as error:
                    print(f"Failed to download (trying next):\n{error}")
                    if path.exists(partial_path):
                        os.remove(partial_path)
                    continue
                finally:
                    print()
                os.replace(partial_path, file_path)
                break
            else:
                raise RuntimeError(f"Error downloading {filename}")

In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
from moving_mnist import MovingMNIST
from torch.utils.data import DataLoader, Dataset, random_split
import torch.nn.functional as F
# Run on the GPU when CUDA is available; otherwise stay on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Settings used to load the MovingMNIST dataset.
root = 'data/processed/MovingMNIST/'
num_videos = 10000
sequence_length = 20
batch_size = 32

# MovingMNIST dataset loader. The batch size is num_videos, so a single
# batch can hold up to num_videos shuffled sequences.
dataset = MovingMNIST(root, download=False)
mnist = DataLoader(dataset=dataset, batch_size=num_videos, shuffle=True)

# Copy the first annotated digit of each frame onto a blank background.
def move_boxes_to_two_sequences(original_sequence, annotation, new_shape=(64, 64)):
    """Paste the first annotated digit of every frame at a random canvas position.

    Args:
        original_sequence: Tensor indexed as (frame, channel, row, column).
        annotation: Per-frame boxes. The first box of a frame is read as
            ``(class_id, xmin, ymin, xmax, ymax)`` and truncated to integers.
        new_shape: ``(height, width)`` of the output frames.

    Returns:
        A ``(new_sequence, labels)`` tuple. ``new_sequence`` is a
        zero-initialised tensor of shape
        ``(num_frames, 1, new_shape[0], new_shape[1])`` holding the relocated
        digits. ``labels`` holds the class id of the first box for every
        frame that has at least two boxes; other frames stay blank and add
        no label.

    Raises:
        ValueError: If a digit crop is larger than ``new_shape``.
    """
    num_frames, _, height, width = original_sequence.shape
    canvas_height, canvas_width = new_shape
    new_sequence = torch.zeros((num_frames, 1, canvas_height, canvas_width))
    labels = []

    for frame_idx in range(num_frames):
        boxes = annotation[frame_idx]  # all bounding boxes of this frame
        if len(boxes) < 2:
            # Only frames annotated with at least two boxes are processed.
            continue

        class_id, xmin, ymin, xmax, ymax = boxes[0].int().tolist()
        labels.append(class_id)

        # Clamp the box to the source frame so that the crop and the
        # destination region always have the same size.
        xmin, ymin = max(xmin, 0), max(ymin, 0)
        xmax, ymax = min(xmax, width), min(ymax, height)
        box_width, box_height = xmax - xmin, ymax - ymin
        if box_width <= 0 or box_height <= 0:
            # Empty box: nothing to paste, the frame stays blank.
            continue
        if box_width > canvas_width or box_height > canvas_height:
            raise ValueError(
                f"digit crop {box_height}x{box_width} does not fit into "
                f"a {canvas_height}x{canvas_width} canvas"
            )

        digit_image = original_sequence[frame_idx, :, ymin:ymax, xmin:xmax]

        # randint's upper bound is exclusive, hence the +1: the digit may
        # touch the far edge, and a canvas-sized digit is placed at 0.
        new_xmin = int(np.random.randint(0, canvas_width - box_width + 1))
        new_ymin = int(np.random.randint(0, canvas_height - box_height + 1))
        new_sequence[
            frame_idx,
            :,
            new_ymin:new_ymin + box_height,
            new_xmin:new_xmin + box_width,
        ] = digit_image

    return new_sequence, labels

# Plotting helper for a frame sequence.
def visualize_sequence(sequence, title="Video Sequence"):
    """Show every frame of ``sequence`` side by side in one matplotlib figure.

    Args:
        sequence: Tensor whose first axis enumerates the frames; index 0 of
            each frame's next axis is drawn in grayscale.
        title: Text used as the figure's super-title.
    """
    frame_array = sequence.cpu().numpy()
    frame_count = frame_array.shape[0]

    plt.figure(figsize=(10, 2))
    # Subplot positions are 1-based, one column per frame.
    for position, frame in enumerate(frame_array, start=1):
        plt.subplot(1, frame_count, position)
        plt.imshow(frame[0], cmap='gray')
        plt.axis('off')
    plt.suptitle(title)
    plt.show()

# Pull one (shuffled) batch out of the loader and process it.
# The original notes give the expected shapes as [10000, 20, 1, 64, 64] for
# sequences and [10000, 20, boxes, 5] for annotations — confirm against the data.
# The batch axis is kept as-is: squeezing axis 0 would drop it whenever the
# batch holds exactly one video.
sequences, annotations = next(iter(mnist))

# Collected relocated sequences and their labels.
new_sequences = []
labels = []

# Process every original sequence.
for sequence_idx, (original_sequence, annotation) in enumerate(zip(sequences, annotations)):
    # Relocate the annotated digit onto a fresh background.
    new_sequence, label = move_boxes_to_two_sequences(original_sequence, annotation)

    # A sequence for which no frame produced a label cannot be used for
    # classification; skip it instead of failing on label[0].
    if not label:
        continue

    # Store the result; the first label stands for the whole sequence.
    new_sequences.append(new_sequence)
    labels.append(label[0])

# Split into training and validation sets (80% / 20%).
train_size = int(0.8 * len(new_sequences))
val_size = len(new_sequences) - train_size
train_sequences, val_sequences = new_sequences[:train_size], new_sequences[train_size:]
train_labels, val_labels = labels[:train_size], labels[train_size:]

# Convert the lists to tensors on the target device.
train_sequences = torch.stack(train_sequences).to(device)  # [train_size, ...frame dims]
val_sequences = torch.stack(val_sequences).to(device)  # [val_size, ...frame dims]
train_labels = torch.tensor(train_labels).to(device)  # [train_size]
val_labels = torch.tensor(val_labels).to(device)  # [val_size]

# Dataset class pairing sequences with labels.
class CustomDataset(Dataset):
    """Index-aligned pairing of sequences and their labels."""

    def __init__(self, sequences, labels):
        # Both containers are stored as given; nothing is copied or checked.
        self.sequences = sequences
        self.labels = labels

    def __len__(self):
        """Return the number of stored sequences."""
        sample_count = len(self.sequences)
        return sample_count

    def __getitem__(self, idx):
        """Return the ``(sequence, label)`` pair stored at ``idx``."""
        sequence = self.sequences[idx]
        label = self.labels[idx]
        return sequence, label

# Build the datasets and their loaders: training batches are shuffled,
# validation batches keep their order.
train_dataset = CustomDataset(sequences=train_sequences, labels=train_labels)
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
val_dataset = CustomDataset(sequences=val_sequences, labels=val_labels)
val_loader = DataLoader(dataset=val_dataset, batch_size=batch_size, shuffle=False)

# Define the 3-D convolutional model.
class Conv3DModel(nn.Module):
    """Two 3-D convolutions followed by two fully connected layers.

    Both convolutions use a 3x3x3 kernel with stride 1 and padding 1, so
    they keep the (frames, height, width) extent of their input. The
    flattened feature size fed to ``fc1`` is therefore
    ``32 * num_frames * height * width``.

    Args:
        in_channels: Channels of the input volume.
        num_frames: Frames per input sequence.
        height: Frame height in pixels.
        width: Frame width in pixels.
        num_classes: Number of output logits.
    """

    def __init__(self, in_channels=1, num_frames=20, height=64, width=64, num_classes=10):
        super().__init__()
        self.conv1 = nn.Conv3d(in_channels, 16, kernel_size=(3, 3, 3), stride=1, padding=1)
        self.conv2 = nn.Conv3d(16, 32, kernel_size=(3, 3, 3), stride=1, padding=1)
        # With the defaults this is 32 * 20 * 64 * 64 input features.
        self.fc1 = nn.Linear(32 * num_frames * height * width, 128)
        self.fc2 = nn.Linear(128, num_classes)

    def forward(self, x):
        """Map ``x`` of shape (batch, in_channels, frames, height, width) to logits."""
        x = F.relu(self.conv1(x))  # [batch, 16, frames, height, width]
        x = F.relu(self.conv2(x))  # [batch, 32, frames, height, width]
        # flatten (unlike view) also accepts non-contiguous activations.
        x = torch.flatten(x, start_dim=1)
        x = F.relu(self.fc1(x))  # [batch, 128]
        return self.fc2(x)  # [batch, num_classes]

# Train the model.
def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=10):
    """Train ``model`` for ``num_epochs`` epochs and print per-epoch metrics.

    After every epoch the model is evaluated with ``validate_model`` and one
    line is printed with the mean batch loss, the training accuracy and the
    validation accuracy.

    Args:
        model: Network to optimise; batches are moved to the device of its
            first parameter (CPU when it has no parameters).
        train_loader: Iterable of ``(sequences, labels)`` batches. Axes 1 and
            2 of ``sequences`` are swapped before the forward pass
            (presumably frames and channels — confirm against the caller).
        val_loader: Iterable of validation batches, passed to ``validate_model``.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimiser stepping the parameters of ``model``.
        num_epochs: Number of passes over ``train_loader``.
    """
    # Follow the model's own device instead of relying on a module global.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")

    for epoch in range(num_epochs):
        model.train()  # make sure training-mode layers are active
        running_loss = 0.0
        correct = 0
        total = 0
        num_batches = 0

        for sequences, labels in train_loader:
            sequences = sequences.permute(0, 2, 1, 3, 4).to(target_device)
            labels = labels.to(target_device)

            optimizer.zero_grad()
            outputs = model(sequences)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            predicted = outputs.detach().argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            num_batches += 1

        # An empty loader yields 0 for both metrics instead of dividing by zero.
        mean_loss = running_loss / num_batches if num_batches else 0.0
        train_accuracy = 100 * correct / total if total else 0.0
        val_accuracy = validate_model(model, val_loader, criterion)

        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {mean_loss:.4f}, '
              f'Train Accuracy: {train_accuracy:.2f}%, Val Accuracy: {val_accuracy:.2f}%')

# Validate the model.
def validate_model(model, val_loader, criterion):
    """Return the accuracy of ``model`` on ``val_loader`` as a percentage.

    Args:
        model: Network to evaluate; it is switched to eval mode and batches
            are moved to the device of its first parameter (CPU when it has
            no parameters).
        val_loader: Iterable of ``(sequences, labels)`` batches. Axes 1 and 2
            of ``sequences`` are swapped before the forward pass.
        criterion: Unused; kept so existing callers keep working.

    Returns:
        ``100 * correct / total`` as a float, or ``0.0`` when the loader
        yields no samples.
    """
    # Follow the model's own device instead of relying on a module global.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")

    model.eval()  # make sure evaluation-mode layers are active
    correct = 0
    total = 0
    with torch.no_grad():
        for sequences, labels in val_loader:
            sequences = sequences.permute(0, 2, 1, 3, 4).to(target_device)
            labels = labels.to(target_device)
            predicted = model(sequences).argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    if total == 0:
        # Nothing to evaluate; avoid dividing by zero.
        return 0.0
    return 100 * correct / total

# Set up the model, the loss function and the optimizer.
model = Conv3DModel().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=1e-3)

# Train the model.
train_model(
    model=model,
    train_loader=train_loader,
    val_loader=val_loader,
    criterion=criterion,
    optimizer=optimizer,
    num_epochs=50,
)

# Visualise one of the new sequences together with its label.
visualize_sequence(new_sequences[0], title=f"New Video Sequence with Label {labels[0]}")
