In [1]:
import torch
import torch.nn as nn
from torchvision import models
from torch.utils.data import Dataset, DataLoader
from torch.optim import Adam
from torchvision import transforms
import torchmetrics
from facenet_pytorch import MTCNN

import pytorch_lightning as pl
from pytorch_lightning import Trainer
from pytorch_lightning.callbacks import ModelCheckpoint

import os
import re
import glob
from PIL import Image
import numpy as np
import matplotlib.pyplot as plt

In [2]:
DATA_ROOT = "./data/vc_clothes"
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
BATCH_SIZE = 32
LR = 0.00035
EPOCHS = 3

print(f"Device: {DEVICE}")

Device: cpu


# Ekstaktor twarzy (mój Pyramidbox)


In [3]:
class FaceDetector:
    def __init__(self, device="cpu", confidence_threshold=0.8):
        self.device = device
        self.confidence_threshold = confidence_threshold

        self.mtcnn = MTCNN(keep_all=False, device=device)

        # Parametry z artykułu
        self.expansion_pixels = 15
        self.target_size = (50, 50)

        self.transform = transforms.Compose(
            [
                transforms.Resize(self.target_size),
                transforms.ToTensor(),
                transforms.Normalize(
                    mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]
                ),
            ]
        )

    def get_face_tensor(self, image: Image):
        boxes, probs = self.mtcnn.detect(image)

        if boxes is None:
            return None

        # Filtrowanie po progu pewności
        valid_indices = [
            i for i, p in enumerate(probs) if p > self.confidence_threshold
        ]

        if not valid_indices:
            return None

        # Wybór najlepszej twarzy
        best_idx = valid_indices[np.argmax(probs[valid_indices])]
        box = boxes[best_idx]

        # Wycinanie twarzy
        x1, y1, x2, y2 = [int(b) for b in box]
        w_img, h_img = image.size

        x1 = max(0, x1 - self.expansion_pixels)
        y1 = max(0, y1 - self.expansion_pixels)
        x2 = min(w_img, x2 + self.expansion_pixels)
        y2 = min(h_img, y2 + self.expansion_pixels)

        face_crop = image.crop((x1, y1, x2, y2))
        tensor = self.transform(face_crop)

        return tensor.to(self.device)

# Dataset dla zbioru danych VC-Clothes


In [4]:
def build_transforms(normalize=False, height=256, width=128):
    normalize = transforms.Normalize(
        mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]
    )

    if normalize:
        transform = transforms.Compose(
            [
                transforms.Resize((height, width)),
                transforms.ToTensor(),
                normalize,
            ]
        )
    else:
        transform = transforms.Compose(
            [
                transforms.Resize((height, width)),
                transforms.ToTensor(),
            ]
        )

    return transform

In [5]:
class VCClothesDataset(Dataset):
    def __init__(self, root_dir, mode="train", transform=None, verbose=True):

        self.detector = FaceDetector(device="cpu", confidence_threshold=0.5)
        self.root_dir = root_dir
        self.mode = mode
        self.transform_img = (
            build_transforms(normalize=True) if transform is None else transform
        )

        if mode == "train":
            self.data_dir = root_dir + "/train"
        elif mode == "query":
            self.data_dir = root_dir + "/query"
        elif mode == "gallery":
            self.data_dir = root_dir + "/gallery"
        else:
            raise ValueError("Mode musi być jednym z: 'train', 'query', 'gallery'")

        if not os.path.exists(self.data_dir):
            raise RuntimeError(f"Katalog {self.data_dir} nie istnieje.")

        self.dataset = self._process_dir(self.data_dir)

        self.img_paths = [x[0] for x in self.dataset]
        self.pids = [x[1] for x in self.dataset]
        self.camids = [x[2] for x in self.dataset]
        self.clothes_ids = [x[3] for x in self.dataset]

        # Zmiana mapowania PID dla treningu (muszą być ciągłe od 0 do N-1)
        unique_pids = sorted(list(set(self.pids)))
        self.pid_map = {pid: i for i, pid in enumerate(unique_pids)}
        self.pids = [self.pid_map[pid] for pid in self.pids]

        if verbose:
            print(f"Załadowano zbiór VC-Clothes ({mode}):")
            print(f"  Liczba obrazów: {len(self.dataset)}")
            print(f"  Liczba unikalnych ID: {len(set(self.pids))}")
            print(f"  Liczba kamer: {len(set(self.camids))}")
            print(f"  Liczba unikalnych ubrań: {len(set(self.clothes_ids))}")

    def _process_dir(self, dir_path):
        img_paths = glob.glob(os.path.join(dir_path, "*.jpg")) + glob.glob(
            os.path.join(dir_path, "*.png")
        )

        pattern = re.compile(
            r"([-\d]+)-(\d+)-(\d+)-(\d+)"
        )  # Format: PID-CAM-CLOTH_xxx.jpg

        dataset = []
        for img_path in img_paths:
            filename = os.path.basename(img_path)

            match = pattern.search(filename)
            if not match:
                continue

            pid, camid, clothes_id, _ = map(int, match.groups())

            if pid == -1:
                continue  # Pomijanie obrazów "junk"

            dataset.append((img_path, pid, camid, clothes_id))

        return dataset

    def __len__(self):
        return len(self.img_paths)

    def __getitem__(self, index):
        path = self.img_paths[index]
        pid = self.pids[index]
        camid = self.camids[index]
        clothes_id = self.clothes_ids[index]

        img = Image.open(path).convert("RGB")

        face = self.detector.get_face_tensor(img)

        if self.transform_img is not None:
            img = self.transform_img(img)

        if isinstance(face, torch.Tensor):
            face = face.float()
        else:
            face = torch.zeros((3, 50, 50), dtype=torch.float)

        pid = torch.tensor(pid, dtype=torch.long)
        camid = torch.tensor(camid, dtype=torch.long)
        clothes_id = torch.tensor(clothes_id, dtype=torch.long)

        return img, pid, camid, clothes_id, face

# Moduł danych dla zbioru twarzy


In [6]:
class VCClothesDataModule(pl.LightningDataModule):
    def __init__(self, root_dir, batch_size=32, num_workers=0, val_split: float = 0.1):
        super().__init__()
        self.root_dir = root_dir
        self.batch_size = batch_size
        self.num_workers = num_workers
        self.val_split = val_split

        self.transform = build_transforms(normalize=True)

    def setup(self, stage=None):
        gallery_dataset = VCClothesDataset(
            root_dir=self.root_dir, mode="gallery", transform=self.transform
        )

        total_size = len(gallery_dataset)
        val_size = int(total_size * self.val_split)
        train_size = total_size - val_size

        self.train_ds, self.val_ds = torch.utils.data.random_split(
            gallery_dataset, [train_size, val_size]
        )

        self.test_ds = VCClothesDataset(
            root_dir=self.root_dir, mode="query", transform=self.transform
        )

    def train_dataloader(self):
        return DataLoader(
            self.train_ds,
            batch_size=self.batch_size,
            shuffle=True,
            num_workers=self.num_workers,
        )

    def val_dataloader(self):
        return DataLoader(
            self.val_ds,
            batch_size=self.batch_size,
            shuffle=False,
            num_workers=self.num_workers,
        )

    def test_dataloader(self):
        return DataLoader(
            self.test_ds,
            batch_size=self.batch_size,
            shuffle=False,
            num_workers=self.num_workers,
        )

# Face feature extractor


In [7]:
class FaceFeatureExtractor(nn.Module):
    def __init__(self, num_classes=256, embedding_dim=512):
        super(FaceFeatureExtractor, self).__init__()

        self.backbone = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
        for param in self.backbone.parameters():
            param.requires_grad = False

        self.in_features = self.backbone.fc.in_features
        self.backbone.fc = nn.Identity()

        self.custom_head = nn.Sequential(
            nn.Linear(self.in_features, embedding_dim),
            nn.BatchNorm1d(embedding_dim),
            nn.ReLU(),
        )

        self.classifier = nn.Linear(embedding_dim, num_classes)

    def forward(self, x):
        self.backbone.eval()

        features = self.backbone(x)

        embedding = self.custom_head(features)

        embedding_norm = torch.nn.functional.normalize(embedding, p=2, dim=1)

        return self.classifier(embedding_norm)

# Moduł pytorch_lightning dla modelu


In [8]:
class FaceReIDModule(pl.LightningModule):
    def __init__(self, num_classes=256, learning_rate=0.00035, embedding_dim=512):
        super().__init__()
        self.lr = learning_rate

        self.model = FaceFeatureExtractor(
            num_classes=num_classes, embedding_dim=embedding_dim
        )

        self.criterion = nn.CrossEntropyLoss()
        # self.accuracy = torchmetrics.Accuracy(
        #     task="multiclass", num_classes=num_classes
        # )

    def forward(self, x):
        return self.model(x)

    def training_step(self, batch, batch_idx):
        _, pids, _, _, faces = batch

        # Filtorwanie
        # valid_mask = faces.sum(dim=(1, 2, 3)) > 0

        # if valid_mask.sum() == 0:
        #     return None  # Pomijamy krok, jeśli w całym batchu nie ma twarzy

        # valid_faces = faces[valid_mask]
        # valid_pids = pids[valid_mask]

        valid_faces = faces
        valid_pids = pids

        logits = self.model(valid_faces)
        loss = self.criterion(logits, valid_pids)

        preds = torch.argmax(logits, dim=1)
        acc = (preds == valid_pids).float().mean()
        # acc = self.accuracy(logits, valid_pids)

        self.log("train_loss", loss, on_step=True, on_epoch=True, prog_bar=True)
        self.log("train_acc", acc, on_step=False, on_epoch=True, prog_bar=True)

        return loss

    def validation_step(self, batch, batch_idx):
        _, pids, _, _, faces = batch

        # valid_mask = faces.sum(dim=(1, 2, 3)) > 0
        # if valid_mask.sum() == 0:
        #     return

        # valid_faces = faces[valid_mask]
        # valid_pids = pids[valid_mask]

        valid_faces = faces
        valid_pids = pids

        logits = self.model(valid_faces)
        loss = self.criterion(logits, valid_pids)

        preds = torch.argmax(logits, dim=1)
        acc = (preds == valid_pids).float().mean()

        # acc = self.accuracy(logits, valid_pids)

        self.log("val_loss", loss, prog_bar=True)
        self.log("val_acc", acc, prog_bar=True)
        return loss

    def test_step(self, batch, batch_idx):
        _, pids, _, _, faces = batch

        # valid_mask = faces.sum(dim=(1, 2, 3)) > 0
        # if valid_mask.sum() == 0:
        #     return

        # valid_faces = faces[valid_mask]
        # valid_pids = pids[valid_mask]

        valid_faces = faces
        valid_pids = pids

        logits = self.model(valid_faces)

        preds = torch.argmax(logits, dim=1)
        acc = (preds == valid_pids).float().mean()

        # acc = self.accuracy(logits, valid_pids)

        self.log("test_acc", acc)

    def configure_optimizers(self):
        optimizer = Adam(
            filter(lambda p: p.requires_grad, self.model.parameters()),
            lr=self.lr,
        )
        return optimizer

# Trening


In [9]:
dm = VCClothesDataModule(root_dir=DATA_ROOT, batch_size=64, num_workers=0)

num_classes = 256

model = FaceReIDModule(num_classes=num_classes, learning_rate=LR, embedding_dim=512)

checkpoint_callback = ModelCheckpoint(
    monitor="val_acc", mode="max", filename="face-reid-{epoch:02d}-{val_acc:.2f}"
)

trainer = Trainer(
    max_epochs=20,
    accelerator="auto",  # Automatycznie wykryje GPU
    devices=1,
    callbacks=[checkpoint_callback],
    # log_every_n_steps=10
)

GPU available: False, used: False
TPU available: False, using: 0 TPU cores
c:\Programing\Python\Python312\Lib\site-packages\pytorch_lightning\trainer\connectors\logger_connector\logger_connector.py:76: Starting from v1.9.0, `tensorboardX` has been removed as a dependency of the `pytorch_lightning` package, due to potential conflicts with other packages in the ML ecosystem. For this reason, `logger=True` will use `CSVLogger` as the default logger, unless the `tensorboard` or `tensorboardX` packages are found. Please `pip install lightning[extra]` or one of them to enable TensorBoard support by default


In [10]:
trainer.fit(model, datamodule=dm)

  state_dict = torch.load(state_dict_path)
  state_dict = torch.load(state_dict_path)
  state_dict = torch.load(state_dict_path)

  | Name      | Type                 | Params | Mode  | FLOPs
-------------------------------------------------------------------
0 | model     | FaceFeatureExtractor | 24.7 M | train | 0    
1 | criterion | CrossEntropyLoss     | 0      | train | 0    
-------------------------------------------------------------------
1.2 M     Trainable params
23.5 M    Non-trainable params
24.7 M    Total params
98.758    Total estimated model params size (MB)
158       Modules in train mode
0         Modules in eval mode
0         Total Flops


Załadowano zbiór VC-Clothes (gallery):
  Liczba obrazów: 8591
  Liczba unikalnych ID: 256
  Liczba kamer: 4
  Liczba unikalnych ubrań: 3
Załadowano zbiór VC-Clothes (query):
  Liczba obrazów: 1020
  Liczba unikalnych ID: 256
  Liczba kamer: 4
  Liczba unikalnych ubrań: 3


Sanity Checking: |          | 0/? [00:00<?, ?it/s]

c:\Programing\Python\Python312\Lib\site-packages\pytorch_lightning\trainer\connectors\data_connector.py:434: The 'val_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=11` in the `DataLoader` to improve performance.
c:\Programing\Python\Python312\Lib\site-packages\pytorch_lightning\trainer\connectors\data_connector.py:434: The 'train_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=11` in the `DataLoader` to improve performance.


Training: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

`Trainer.fit` stopped: `max_epochs=20` reached.


# Ewaluacja


In [11]:
trainer.test(model, datamodule=dm)

Załadowano zbiór VC-Clothes (gallery):
  Liczba obrazów: 8591
  Liczba unikalnych ID: 256
  Liczba kamer: 4
  Liczba unikalnych ubrań: 3
Załadowano zbiór VC-Clothes (query):
  Liczba obrazów: 1020
  Liczba unikalnych ID: 256
  Liczba kamer: 4
  Liczba unikalnych ubrań: 3


c:\Programing\Python\Python312\Lib\site-packages\pytorch_lightning\trainer\connectors\data_connector.py:434: The 'test_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=11` in the `DataLoader` to improve performance.


Testing: |          | 0/? [00:00<?, ?it/s]

────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
       Test metric             DataLoader 0
────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
        test_acc             0.522549033164978
────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────


[{'test_acc': 0.522549033164978}]

In [12]:
def evaluate_classification_metrics(model, dataloader, device):
    model.eval()
    model.to(device)

    correct_1 = 0
    correct_5 = 0
    correct_10 = 0
    total = 0

    with torch.no_grad():
        for batch in dataloader:
            _, pids, _, _, faces = batch

            faces = faces.to(device)
            pids = pids.to(device)

            logits = model(faces)

            _, pred_indices = logits.topk(k=10, dim=1)

            pids_reshaped = pids.view(-1, 1)

            matches = pred_indices == pids_reshaped

            correct_1 += matches[:, 0].sum().item()
            correct_5 += matches[:, :5].sum().item()
            correct_10 += matches[:, :10].sum().item()

            total += pids.size(0)

    r1 = correct_1 / total
    r5 = correct_5 / total
    r10 = correct_10 / total

    return r1, r5, r10

In [13]:
r1, r5, r10 = evaluate_classification_metrics(model, dm.test_dataloader(), DEVICE)

print("Final evaluation on test set:")
print(f"Rank-1 (test_acc): {r1:.2%}")
print(f"Rank-5  : {r5:.2%}")
print(f"Rank-10 : {r10:.2%}")

Final evaluation on test set:
Rank-1 (test_acc): 52.25%
Rank-5  : 70.29%
Rank-10 : 77.25%
