In [10]:
import torch
import torch.nn as nn

# Zelle 2: Beispiel CNN-Modell
class ConvNet(nn.Module):
    def __init__(self, num_bugs=1): # Add num_bugs as an argument
        super().__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(3, 16, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(16, 32, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(32, 64, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(64, 128, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(128, 256, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
        )
        self.fc = nn.Sequential(
            nn.Flatten(),
            nn.Linear(256*16*16, 128),
            nn.ReLU(),
            nn.Linear(128, num_bugs * 2)  # Output coordinates for multiple bugs
        )
        self.num_bugs = num_bugs # Store num_bugs

    def forward(self, x):
        output = self.fc(self.conv(x))
        return output.reshape(-1, self.num_bugs, 2) # Reshape to (batch_size, num_bugs, 2)


model = ConvNet()

In [11]:
#from google.colab import drive
#drive.mount('/content/drive')

In [22]:
import cv2
import torch
import pandas as pd
import numpy as np
from torch.utils.data import Dataset
from torchvision import transforms
from PIL import Image
import os

class VideoTrackingDataset(Dataset):
    def __init__(self, video_dir, label_dir, limit, resize=(512, 512), transform=None, max_objects=10):
        self.video_dir = video_dir
        self.label_dir = label_dir
        self.resize = resize
        self.transform = transform or transforms.Compose([
            transforms.Resize(resize),
            transforms.ToTensor()
        ])
        self.max_objects = max_objects

        self.samples = self._gather_samples()
        if limit is not None:
            self.samples = self.samples[:limit]

    def _gather_samples(self):
        samples = []
        for file in os.listdir(self.video_dir):
            if file.endswith(".mp4"):
                base = file[:-4]
                video_path = os.path.join(self.video_dir, file)
                label_path = os.path.join(self.label_dir, f"{base}.csv")
                if os.path.exists(label_path):
                    samples.append((video_path, label_path))
        return samples

    def __len__(self):
        return sum([self._count_frames(v) for v, _ in self.samples])

    def _count_frames(self, video_path):
        cap = cv2.VideoCapture(video_path)
        count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        cap.release()
        return count

    def __getitem__(self, index):
        cumulative = 0
        for video_path, label_path in self.samples:
            num_frames = self._count_frames(video_path)
            if index < cumulative + num_frames:
                frame_idx = index - cumulative
                return self._get_sample(video_path, label_path, frame_idx)
            cumulative += num_frames
        raise IndexError("Index außerhalb des Datasets")

    def _get_sample(self, video_path, label_path, frame_idx):
        cap = cv2.VideoCapture(video_path)
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
        ret, frame = cap.read()
        cap.release()
        if not ret:
            raise ValueError(f"Frame {frame_idx} konnte nicht gelesen werden.")

        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        image = Image.fromarray(frame)
        original_width, original_height = image.size

        df = pd.read_csv(label_path)
        frame_data = df[df['t'] == frame_idx]

        coords = frame_data[['x', 'y']].values
        coords = coords / np.array([original_width, original_height])
        coords = coords.astype(np.float32)

        # Padding
        padded = np.full((self.max_objects, 2), -1, dtype=np.float32)
        num_coords = min(len(coords), self.max_objects)
        padded[:num_coords] = coords[:num_coords]

        image = self.transform(image)
        coords_tensor = torch.tensor(padded, dtype=torch.float32)

        return image, coords_tensor


In [24]:
dataset = VideoTrackingDataset("../training", "../training", limit=5, max_objects=10)
loader = torch.utils.data.DataLoader(dataset, batch_size=5, shuffle=False)

In [25]:
from torch import optim

# Zelle 4: Training
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = ConvNet().to(device)

optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.MSELoss()

for epoch in range(1):
    for imgs, labels in loader:
        imgs, labels = imgs.to(device), labels.to(device)
        preds = model(imgs)
        loss = criterion(preds, labels)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    print(f"Epoch {epoch+1}, Loss: {loss.item():.4f}")


  return F.mse_loss(input, target, reduction=self.reduction)
  return F.mse_loss(input, target, reduction=self.reduction)
  return F.mse_loss(input, target, reduction=self.reduction)
  return F.mse_loss(input, target, reduction=self.reduction)
  return F.mse_loss(input, target, reduction=self.reduction)
  return F.mse_loss(input, target, reduction=self.reduction)
  return F.mse_loss(input, target, reduction=self.reduction)
  return F.mse_loss(input, target, reduction=self.reduction)
  return F.mse_loss(input, target, reduction=self.reduction)
  return F.mse_loss(input, target, reduction=self.reduction)
  return F.mse_loss(input, target, reduction=self.reduction)
  return F.mse_loss(input, target, reduction=self.reduction)
  return F.mse_loss(input, target, reduction=self.reduction)
  return F.mse_loss(input, target, reduction=self.reduction)
  return F.mse_loss(input, target, reduction=self.reduction)
  return F.mse_loss(input, target, reduction=self.reduction)
  return F.mse_loss(inpu

KeyboardInterrupt: 

In [None]:
torch.save(model, "conv_net.pth")

In [None]:
import torch
import cv2
import pandas as pd
import numpy as np
from torchvision import transforms
from PIL import Image

# Lade das trainierte Modell
model = torch.load("conv_net.pth", weights_only=False)
model.eval()  # Setze das Modell in den Evaluierungsmodus

# Definiere die Transformationen für die Eingabebilder
transform = transforms.Compose([
    transforms.Resize((512, 512)),
    transforms.ToTensor()
])

# Pfad zum Video
video_path = "../training/training02.mp4"

# Lade das Video
cap = cv2.VideoCapture(video_path)

# Erstelle eine leere Liste, um die Vorhersagen zu speichern
predictions = []

# Verarbeite jeden Frame des Videos
frame_count = 0
row_count = 0
while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    # Konvertiere den Frame in ein PIL-Bild
    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    image = Image.fromarray(frame)

    # Wende die Transformationen auf das Bild an
    image = transform(image)

    # Füge eine Batch-Dimension hinzu
    image = image.unsqueeze(0)

    # Führe die Vorhersage mit dem Modell aus
    with torch.no_grad():
        output = model(image)

    # Extrahiere die x- und y-Koordinaten aus der Vorhersage
    x, y = output[0, 0].tolist()

    # Hole die Originalgröße des Bildes
    original_height, original_width, _ = frame.shape

    # Rechne die x- und y-Koordinaten zurück
    x_original = x * original_width
    y_original = y * original_height

    # Füge die Vorhersage zur Liste hinzu
    predictions.append([row_count, frame_count, 0, x_original, y_original])

    frame_count += 1
    row_count += 1

# Schließe das Video
cap.release()

# Erstelle einen Pandas DataFrame aus den Vorhersagen
df = pd.DataFrame(predictions, columns=["", "t", "hexbug", "x", "y"])

# Speichere den DataFrame in einer CSV-Datei
df.to_csv("predictions.csv", index=False)

print("Vorhersagen wurden in predictions.csv gespeichert.")
