In [3]:
import os
import json
import torch
from PIL import Image
from tqdm import tqdm
from torchvision.transforms import functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

# === 1. Загрузка аннотаций ===
with open("eccv_18_annotation_files/train_annotations.json") as f:
    data = json.load(f)

# === 2. Построение сопоставления category_id → class_index ===
# 🔹 Выбираем нужные классы
selected_class_names = ["deer", "raccoon", "coyote", "dog"]

# 🔹 Сопоставление названий классов с их исходными id
all_categories = data["categories"]
name_to_id = {c["name"]: c["id"] for c in all_categories}
selected_class_ids = [name_to_id[name] for name in selected_class_names]

# 🔹 Новый компактный маппинг: old_id → new_index
id_to_idx = {cid: idx for idx, cid in enumerate(selected_class_ids)}

# 🔹 Фильтрация аннотаций по нужным классам
filtered_annotations = [ann for ann in data["annotations"] if ann["category_id"] in selected_class_ids]

# 🔹 Отбор изображений, где есть эти аннотации
used_image_ids = set(ann["image_id"] for ann in filtered_annotations)
filtered_images = [img for img in data["images"] if img["id"] in used_image_ids]

# 🔹 Оставляем только 100 изображений
filtered_images = filtered_images[:100]

# 🔹 Перефильтруем аннотации по этим изображениям
selected_image_ids = set(img["id"] for img in filtered_images)
filtered_annotations = [ann for ann in filtered_annotations if ann["image_id"] in selected_image_ids]


In [4]:
# === 3. Класс Dataset ===
class AnimalDataset(Dataset):
    def __init__(self, images, annotations, image_dir, category_id_mapping, transforms=None):
        self.images = images
        self.transforms = transforms
        self.image_dir = image_dir
        self.category_id_mapping = category_id_mapping

        self.image_id_to_annotations = {}
        for ann in annotations:
            image_id = ann.get("image_id")
            if image_id is not None:
                self.image_id_to_annotations.setdefault(image_id, []).append(ann)

    def __getitem__(self, idx):
        image_info = self.images[idx]
        image_id = image_info["id"]
        file_name = image_info["file_name"]
        image_path = os.path.join(self.image_dir, file_name)

        try:
            image = Image.open(image_path).convert("RGB")
        except Exception as e:
            raise RuntimeError(f"Не удалось загрузить изображение: {image_path}") from e

        boxes = []
        labels = []
        for ann in self.image_id_to_annotations.get(image_id, []):
            if "bbox" not in ann or "category_id" not in ann:
                continue
            x, y, w, h = ann["bbox"]
            if w <= 0 or h <= 0:
                continue
            class_id = self.category_id_mapping.get(ann["category_id"])
            if class_id is None:
                continue
            boxes.append([x, y, x + w, y + h])
            labels.append(class_id)

        if len(boxes) == 0:
            boxes = [[0, 0, 1, 1]]
            labels = [0]

        boxes = torch.tensor(boxes, dtype=torch.float32)
        labels = torch.tensor(labels, dtype=torch.int64)

        target = {
            "boxes": boxes,
            "labels": labels,
            "image_id": torch.tensor([idx])
        }

        if self.transforms:
            image = self.transforms(image)
        else:
            image = F.to_tensor(image)

        return image, target

    def __len__(self):
        return len(self.images)



In [7]:
# === 4. Подготовка данных и модели ===
image_dir = "eccv_18_all_images_sm/eccv_18_all_images_sm"  # путь к папке с изображениями
dataset = AnimalDataset(
    filtered_images,
    filtered_annotations,
    image_dir=image_dir,
    category_id_mapping=id_to_idx
)
data_loader = DataLoader(dataset, batch_size=2, shuffle=True, collate_fn=lambda x: tuple(zip(*x)))

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = fasterrcnn_resnet50_fpn(pretrained=True)
in_features = model.roi_heads.box_predictor.cls_score.in_features
num_classes = len(id_to_idx) + 1  # +1 для background
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)
model.to(device)

optimizer = torch.optim.SGD(model.parameters(), lr=0.0, momentum=0.9)

# === 5. Обучение с tqdm ===
model.train()
num_epochs = 5

for epoch in range(num_epochs):
    epoch_loss = 0.0
    print(f"\n🧠 Epoch {epoch + 1}/{num_epochs}")

    for images, targets in tqdm(data_loader, desc=f"Epoch {epoch + 1}"):
        images = list(img.to(device) for img in images)
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        loss_dict = model(images, targets)
        losses = sum(loss for loss in loss_dict.values())
        epoch_loss += losses.item()

        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

    avg_loss = epoch_loss / len(data_loader)
    print(f"✅ Epoch {epoch + 1} завершена. Средний Loss: {avg_loss:.4f}")


🧠 Epoch 1/5


Epoch 1: 100%|██████████| 50/50 [01:44<00:00,  2.08s/it]


✅ Epoch 1 завершена. Средний Loss: 16.8335

🧠 Epoch 2/5


Epoch 2: 100%|██████████| 50/50 [01:43<00:00,  2.06s/it]


✅ Epoch 2 завершена. Средний Loss: 18.1189

🧠 Epoch 3/5


Epoch 3: 100%|██████████| 50/50 [01:43<00:00,  2.08s/it]


✅ Epoch 3 завершена. Средний Loss: 16.8346

🧠 Epoch 4/5


Epoch 4: 100%|██████████| 50/50 [01:45<00:00,  2.12s/it]


✅ Epoch 4 завершена. Средний Loss: 17.0745

🧠 Epoch 5/5


Epoch 5: 100%|██████████| 50/50 [01:45<00:00,  2.12s/it]

✅ Epoch 5 завершена. Средний Loss: 17.2626





In [None]:
import cv2
from torchvision.transforms import functional as F
from PIL import Image

# === idx_to_name: словарь индексов в имена классов (создан ранее) ===
# Пример: idx_to_name = {0: "deer", 1: "raccoon", 2: "coyote", 3: "dog"}

def detect_on_video(video_path, model, device, idx_to_name, output_path="output.avi", conf_thresh=0.5):
    model.eval()
    cap = cv2.VideoCapture(video_path)
    
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)

    fourcc = cv2.VideoWriter_fourcc(*'XVID')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

    with torch.no_grad():
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Преобразуем кадр в PIL → Tensor
            image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            image_tensor = F.to_tensor(image).unsqueeze(0).to(device)

            predictions = model(image_tensor)[0]

            for box, label, score in zip(predictions['boxes'], predictions['labels'], predictions['scores']):
                if score < conf_thresh:
                    continue
                x1, y1, x2, y2 = map(int, box.tolist())
                class_name = idx_to_name.get(label.item(), "unknown")
                color = (0, 255, 0)
                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
                text = f"{class_name}: {score:.2f}"
                cv2.putText(frame, text, (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1)

            out.write(frame)

    cap.release()
    out.release()
    print(f"🎬 Обработка завершена. Сохранено в: {output_path}")


In [None]:
detect_on_video(
    video_path="your_input_video.mp4",
    model=model,
    device=device,
    idx_to_name=idx_to_name,
    output_path="detection_output.avi",
    conf_thresh=0.5
)
