In [None]:
import torch
from torch import nn
from torch.utils.data import Dataset
import os
from PIL import Image
from torchvision import transforms

In [None]:
class CustomLoss(nn.Module):
    def __init__(self):
        super(CustomLoss, self).__init__()
        self.mse_loss = nn.MSELoss(reduction='sum')  # Для координат
        self.bce_loss = nn.BCEWithLogitsLoss(reduction='sum')  # Для уверенности

    def forward(self, predictions, targets):
        # Предсказанные уверенности и координаты
        pred_confidences = predictions[:, 0]
        pred_boxes = predictions[:, 1:]

        # Целевые уверенности (все 1, так как все целевые боксы содержат объекты)
        target_confidences = torch.ones_like(pred_confidences)

        # Целевые координаты
        target_boxes = targets

        # Расчет потерь
        confidence_loss = self.bce_loss(pred_confidences, target_confidences)
        coordinate_loss = self.mse_loss(pred_boxes, target_boxes)

        return confidence_loss + coordinate_loss

In [None]:
# Определение Symmetrical ReLU (SymReLU)
class SymReLU(nn.Module):
    def __init__(self, alpha=1.0):
        super(SymReLU, self).__init__()
        self.alpha = alpha
    
    def forward(self, x):
        return torch.max(x, torch.zeros_like(x)) - self.alpha * torch.min(x, torch.zeros_like(x))

# Определение слоя свертки с SymReLU
class ConvSymRelu(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size, stride, padding, groups=1):
        super(ConvSymRelu, self).__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding, groups=groups, bias=False)
        self.symrelu = SymReLU(alpha=1.0)
    
    def forward(self, x):
        return self.symrelu(self.conv(x))
    
class VideoYOLOv3(nn.Module):
    def __init__(self, num_classes, lstm_hidden_size=512, lstm_layers=1):
        super(VideoYOLOv3, self).__init__()
        self.conv_layers = nn.Sequential(
            ConvSymRelu(3, 12, (5, 5), (1, 1), (2, 2)),
            nn.MaxPool2d((2, 2), stride=(2, 2)),
            ConvSymRelu(12, 16, (5, 5), (2, 2), (1, 1)),
            ConvSymRelu(16, 16, (3, 3), (2, 2), (1, 1)),
            ConvSymRelu(16, 16, (3, 3), (1, 1), (1, 1)),
            nn.MaxPool2d((2, 2), stride=(2, 2)),
            ConvSymRelu(16, 24, (3, 3), (1, 1), (1, 1)),
            ConvSymRelu(24, 48, (3, 3), (2, 2), (1, 1)),
            ConvSymRelu(48, 48, (3, 3), (1, 1), (1, 1)),
            ConvSymRelu(48, 48, (3, 3), (1, 1), (1, 1)),
            ConvSymRelu(48, 48, (3, 3), (1, 1), (1, 1)),
            ConvSymRelu(48, 64, (3, 3), (1, 1), (1, 1)),
            ConvSymRelu(64, 64, (7, 7), (7, 7), (0, 0)),  # Уменьшаем до 1x1
            ConvSymRelu(64, 4, (1, 1), (1, 1), (0, 0)),  # Меняем количество каналов, сохраняем 1x1
            ConvSymRelu(4, 4, (3, 3), (1, 1), (1, 1)),
            ConvSymRelu(4, 4, (3, 3), (1, 1), (1, 1)),
            nn.Upsample(scale_factor=(1, 1), mode='nearest'),
            ConvSymRelu(4, 96, (3, 3), (1, 1), (1, 1)),
            ConvSymRelu(96, 256, (1, 1), (1, 1), (0, 0)),
            nn.Conv2d(256, 512, (1, 1))
        )
        
        # LSTM слой
        self.lstm = nn.LSTM(input_size=512,  # Размер входа должен соответствовать выходу последнего сверточного слоя
                            hidden_size=lstm_hidden_size,
                            num_layers=lstm_layers,
                            batch_first=True)
        
        self.fc = nn.Linear(lstm_hidden_size, 4)

    def forward(self, x):
        batch_size, timesteps, C, H, W = x.size()
        c_in = x.view(batch_size * timesteps, C, H, W)
        c_out = self.conv_layers(c_in)
        
        # Переводим выход сверточных слоев в формат (batch, timesteps, features)
        c_out = c_out.view(batch_size, timesteps, -1)
        
        # LSTM слой теперь обрабатывает каждый кадр
        r_out, _ = self.lstm(c_out)
        
        # Применяем полносвязный слой к каждому временному шагу
        out = self.fc(r_out)
        
        return out


In [None]:
def annotations_to_tensor(ann_file):
    with open(ann_file, 'r') as file:
        annotations = file.readlines()
    
    tensor_annotations = []
    for ann_string in annotations:
        ann_data = ann_string.strip().split()
        coords = list(map(float, ann_data[1:]))  # Преобразуем координаты во float (пропускаем класс)
        tensor_annotations.append(coords)
    
    return torch.tensor(tensor_annotations, dtype=torch.float32)  # Преобразование списка в тензор


In [None]:
class VideoFramesDataset(Dataset):
    def __init__(self, video_dir, transform=None):
        """
        Конструктор класса VideoFramesDataset.

        Параметры:
        video_dir (str): Путь к каталогу с видео, каждое видео в отдельной папке.
        transform (callable, optional): Преобразования, применяемые к каждому кадру.
        """
        self.video_dir = video_dir
        self.transform = transform

        # Список всех видео (папок)
        self.videos = [os.path.join(video_dir, v) for v in os.listdir(video_dir) if os.path.isdir(os.path.join(video_dir, v))]
        self.videos.sort()

    def __len__(self):
        return len(self.videos)

    def __getitem__(self, idx):
        video_path = self.videos[idx]
        images_path = os.path.join(video_path, 'images')
        annotations_path = os.path.join(video_path, 'annotations')

        frames = [os.path.join(images_path, frame) for frame in sorted(os.listdir(images_path)) if frame.endswith('.jpg')]
        annotations_files = [os.path.join(annotations_path, ann) for ann in sorted(os.listdir(annotations_path)) if ann.endswith('.txt')]

        images = [Image.open(frame).convert('RGB') for frame in frames]
        anns = [annotations_to_tensor(ann_file) for ann_file in annotations_files]

        if self.transform:
            images = [self.transform(image) for image in images]

        images = torch.stack(images)
        annotations = torch.cat(anns, dim=0)  # Объединяем все аннотации в один тензор

        return images, annotations


    def load_annotations(self, ann_file):
        """
        Загрузка аннотаций из файла. Здесь необходимо адаптировать под формат ваших данных.
        """
        with open(ann_file, 'r') as file:
            annotations = file.readlines()  # Простой пример, может потребоваться изменение
        return annotations

In [None]:
# Пример использования
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor()
])

test_dataset = VideoFramesDataset("/kaggle/input/dataset0505/aaa/test", transform=transform)
train_dataset = VideoFramesDataset("/kaggle/input/dataset0505/aaa/train", transform=transform)
print("Количество видео в датасете:", len(test_dataset))

# Получаем кадры и аннотации первого видео
video_frames, video_annotations = test_dataset[0]
print("Форма тензора с кадрами первого видео:", video_frames.shape)
print("Аннотации первого видео:", video_annotations)

In [None]:
class VideoBBoxLoss(nn.Module):
    def __init__(self):
        super(VideoBBoxLoss, self).__init__()
        self.mse_loss = nn.MSELoss(reduction='mean')  # Среднее по всем элементам

    def forward(self, predictions, targets):
        # Предсказания и цели имеют размерность [batch_size, seq_len, 4]
        # где 4 соответствует [x_center, y_center, width, height]
        return self.mse_loss(predictions, targets)

# Создание экземпляра функции потерь
criterion = VideoBBoxLoss()


In [None]:
from torch.utils.data import DataLoader

# Создаем DataLoader для тестового датасета
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)
train_loader = DataLoader(train_dataset, batch_size=1, shuffle=False)
test_loader

In [None]:
# Предполагается, что модель и DataLoader уже определены
model = VideoYOLOv3(num_classes=1)  # Подставьте корректное количество классов
if torch.cuda.is_available():
    model = model.cuda()
optimizer = torch.optim.Adam(model.parameters(), lr=0.002)


In [None]:
import torch
import matplotlib.pyplot as plt
from torchvision.ops import box_iou

In [None]:
def batch_iou(bboxes_pred, bboxes_gt):
    ious = []
    for pred, gt in zip(bboxes_pred, bboxes_gt):
        iou = box_iou(pred.unsqueeze(0), gt.unsqueeze(0))
        ious.append(iou)
    return torch.tensor(ious)  # Возвращаем тензор IoU для каждого кадра

In [None]:
def convert_yolo_to_corners(bboxes, img_width, img_height):
    # Предполагаем, что bboxes имеет размерность [batch_size, seq_len, 4]
    batch_size, seq_len, _ = bboxes.shape
    converted_bboxes = []
    
    for i in range(batch_size):
        video_bboxes = []
        for j in range(seq_len):
            x_center, y_center, width, height = bboxes[i, j]
            x_min = (x_center - width / 2) * img_width
            y_min = (y_center - height / 2) * img_height
            x_max = (x_center + width / 2) * img_width
            y_max = (y_center + height / 2) * img_height
            video_bboxes.append([x_min, y_min, x_max, y_max])
        converted_bboxes.append(video_bboxes)
    
    # Конвертация в тензор с той же размерностью, что и исходный bboxes тензор
    return torch.tensor(converted_bboxes, dtype=torch.float32)


In [None]:
import time
from torchvision.ops import box_iou

train_ious, val_ious = [], []

num_epochs = 12
image_width, image_height = 224, 224  # Примерные размеры изображения

for epoch in range(num_epochs):
    model.train()
    train_loss_accum, train_iou_accum = 0, 0
    print(f'Epoch: {epoch}')

    for batch_idx, (images, targets) in enumerate(train_loader):
        if torch.cuda.is_available():
            images, targets = images.cuda(), targets.cuda()
        
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()
        train_loss_accum += loss.item()
        
        outputs = outputs.detach().cpu()
        targets = targets.detach().cpu()
        
        # Расчет IoU для каждого батча

        predicted_corners = convert_yolo_to_corners(outputs, image_width, image_height)
        target_corners = convert_yolo_to_corners(targets, image_width, image_height)

        for i in range(predicted_corners.size(0)):
            iou_scores = box_iou(predicted_corners[i], target_corners[i])
            train_iou_accum += iou_scores.diag().mean().item()

    average_train_iou = train_iou_accum / len(train_loader)
    train_ious.append(average_train_iou)

    model.eval()
    val_mse, val_iou_accum = 0, 0
    with torch.no_grad():
        for images, targets in test_loader:
            if torch.cuda.is_available():
                images, targets = images.cuda(), targets.cuda()

            outputs = model(images)
            outputs = outputs.detach().cpu()
            targets = targets.detach().cpu()


            # Расчет IoU
            predicted_corners = convert_yolo_to_corners(outputs, image_width, image_height)
            target_corners = convert_yolo_to_corners(targets, image_width, image_height)
            for i in range(predicted_corners.size(0)):
                iou_scores = box_iou(predicted_corners[i], target_corners[i])
                val_iou_accum += iou_scores.diag().mean().item()

    average_val_iou = val_iou_accum / len(test_loader)
    val_ious.append(average_val_iou)

    print(f"Epoch {epoch + 1}, Train IoU: {average_train_iou}, Val IoU: {average_val_iou}")
