# MNIST z twistem: Multi-task Learning

Klasyczny MNIST to "Hello World" uczenia maszynowego. Dzisiaj trochę go **popsujemy**.

### Zadanie
Stworzyliśmy zmodyfikowany zbiór danych, w którym:
1. Ok. **20%** obrazków zostało odbitych lustrzanie w poziomie (Horizontal Flip)
2. Ok. **20%** obrazków zostało odbitych w pionie (Vertical Flip)

Twoim celem jest zbudowanie sieci neuronowej, która **jednocześnie** przewidzi:
1. Jaka cyfra jest na obrazku (0-9)
2. Czy obrazek jest odbity w poziomie (tak/nie)
3. Czy obrazek jest odbity w pionie (tak/nie)

To zadanie typu **Multi-Task Learning** - jedna sieć z trzema "głowami".

## Kryterium oceny

Jakość rozwiązania mierzymy za pomocą **accuracy** (dokładności) na zbiorze testowym.

Accuracy obliczamy jako procent przykładów, dla których **wszystkie trzy predykcje** są poprawne:
- cyfra (0-9)
- odbicie poziome (H-flip)
- odbicie pionowe (V-flip)

### Punktacja

Punktacja zależy od accuracy na zbiorze testowym:

* jeśli accuracy ≥ 96% – dostajesz **1 punkt**,
* jeśli accuracy ≤ 90% – dostajesz **0 punktów**,
* w przedziale 90-96% – punkty są skalowane **liniowo**.

$$
\text{punkty}(\text{acc}) =
\begin{cases}
1, & \text{gdy } \text{acc} \ge 0.96,\\
0, & \text{gdy } \text{acc} \le 0.90,\\
\dfrac{\text{acc} - 0.90}{0.06}, & \text{w przeciwnym razie.}
\end{cases}
$$

## Format zgłoszenia rozwiązania

Rozwiązaniem jest **jeden plik** w formacie `npz`, zapisany tak:

```python
np.savez(
    "solution.npz",
    digit=pred_digit,    # shape: (N,), dtype: int, wartości 0-9
    h_flip=pred_h_flip,  # shape: (N,), dtype: int, wartości 0 lub 1
    v_flip=pred_v_flip,  # shape: (N,), dtype: int, wartości 0 lub 1
)
```

gdzie:
* `N` to liczba przykładów w zbiorze testowym,
* `pred_digit` – przewidywane cyfry (0-9),
* `pred_h_flip` – przewidywane odbicia poziome (0 lub 1),
* `pred_v_flip` – przewidywane odbicia pionowe (0 lub 1).

In [None]:
import random

import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch import Tensor
from torch.utils.data import DataLoader, Dataset

# Ziarno losowości dla powtarzalności
torch.manual_seed(42)
np.random.seed(42)
random.seed(42)

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Urządzenie: {DEVICE}")

## 1. Wczytywanie Danych

Wczytujemy przygotowane zbiory danych z plików.

In [None]:
# Wczytywanie danych z jednego pliku
data = np.load("data.npz")

# Dane treningowe
train_images = torch.from_numpy(data["train_images"]).float()
train_digits = torch.from_numpy(data["train_digit"]).long()
train_h = torch.from_numpy(data["train_h_flip"]).float()
train_v = torch.from_numpy(data["train_v_flip"]).float()

# Dane walidacyjne
val_images = torch.from_numpy(data["val_images"]).float()
val_digits = torch.from_numpy(data["val_digit"]).long()
val_h = torch.from_numpy(data["val_h_flip"]).float()
val_v = torch.from_numpy(data["val_v_flip"]).float()

# Dane testowe (tylko obrazki)
test_images = torch.from_numpy(data["test_images"]).float()

print(f"Zbiór treningowy: {len(train_images)} obrazków")
print(f"Zbiór walidacyjny: {len(val_images)} obrazków")
print(f"Zbiór testowy: {len(test_images)} obrazków")

In [None]:
# Tworzenie DataLoaderów
class TwistedMNISTDataset(Dataset):
    def __init__(self, images, digits, h_flips, v_flips):
        self.images = images
        self.digits = digits
        self.h_flips = h_flips
        self.v_flips = v_flips

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        return self.images[idx], self.digits[idx], self.h_flips[idx], self.v_flips[idx]


train_ds = TwistedMNISTDataset(train_images, train_digits, train_h, train_v)
val_ds = TwistedMNISTDataset(val_images, val_digits, val_h, val_v)

train_loader = DataLoader(train_ds, batch_size=64, shuffle=True)
val_loader = DataLoader(val_ds, batch_size=1000, shuffle=False)
test_loader = DataLoader(test_images, batch_size=1000, shuffle=False)

## 2. Wizualizacja

Zobaczmy przykłady z każdą kombinacją transformacji.

In [None]:
def show_examples(dataset: TwistedMNISTDataset, n: int = 8) -> None:
    """Pokaż przykładowe obrazki z datasetu."""
    fig, axes = plt.subplots(2, n // 2, figsize=(12, 6))
    axes = axes.flatten()

    for i in range(n):
        img, digit, h, v = dataset[i]
        axes[i].imshow(img.squeeze(), cmap="gray")

        title = f"Cyfra: {digit}"
        if h:
            title += " [H]"
        if v:
            title += " [V]"
        axes[i].set_title(title)
        axes[i].axis("off")

    plt.tight_layout()
    plt.show()


show_examples(train_ds)

## 3. Model Multi-Head CNN

Model składa się z:
1. **Backbone**: Wspólne warstwy konwolucyjne
2. **Heads**: Trzy osobne "głowy" - po jednej dla każdego zadania

### TODO: Uzupełnij definicję modelu

Stwórz sieć z:
- Wspólną częścią konwolucyjną (backbone)
- Trzema osobnymi "głowami":
  - `digit_head` - klasyfikacja cyfr (10 klas)
  - `h_flip_head` - wykrywanie odbicia poziomego (binarna)
  - `v_flip_head` - wykrywanie odbicia pionowego (binarna)

In [None]:
class MultiHeadMNIST(nn.Module):
    """Sieć z trzema głowami: cyfra, h-flip, v-flip."""

    def __init__(self) -> None:
        super().__init__()

        # TODO: Zdefiniuj warstwy modelu
        pass

    def forward(self, x: Tensor) -> tuple[Tensor, Tensor, Tensor]:
        # TODO: Zaimplementuj forward pass
        # Zwróć tuple: (digit_out, h_flip_out, v_flip_out) - wszystkie 3 wartości PRZED softmax/sigmoid
        pass

## 4. Trening

### TODO: Uzupełnij pętlę treningową

Użyj:
- `CrossEntropyLoss` dla klasyfikacji cyfr
- `BCEWithLogitsLoss` dla wykrywania odbić (binarna klasyfikacja)

In [None]:
def evaluate(model: nn.Module, loader: DataLoader) -> dict:
    """Ewaluacja modelu na danym zbiorze."""
    model.eval()
    correct_digit = 0
    correct_h = 0
    correct_v = 0
    correct_all = 0
    total = 0

    with torch.no_grad():
        for images, labels_digit, labels_h, labels_v in loader:
            images = images.to(DEVICE)
            out_digit, out_h, out_v = model(images)

            # Predykcje
            pred_digit = out_digit.argmax(dim=1)
            pred_h = (torch.sigmoid(out_h) > 0.5).squeeze()
            pred_v = (torch.sigmoid(out_v) > 0.5).squeeze()

            # Zliczanie poprawnych
            digit_correct = pred_digit == labels_digit.to(DEVICE)
            h_correct = pred_h == labels_h.to(DEVICE)
            v_correct = pred_v == labels_v.to(DEVICE)

            correct_digit += digit_correct.sum().item()
            correct_h += h_correct.sum().item()
            correct_v += v_correct.sum().item()
            correct_all += (digit_correct & h_correct & v_correct).sum().item()
            total += labels_digit.size(0)

    return {
        "all": 100 * correct_all / total,
        "digit": 100 * correct_digit / total,
        "h_flip": 100 * correct_h / total,
        "v_flip": 100 * correct_v / total,
    }


def train_model(
    model: nn.Module,
    train_loader: DataLoader,
    val_loader: DataLoader,
    epochs: int = 10,
) -> None:
    """Trenuj model przez zadaną liczbę epok."""

    # TODO: Zdefiniuj optymalizator i funkcje straty
    # optimizer = ...
    # criterion_digit = nn.CrossEntropyLoss()
    # criterion_flip = nn.BCEWithLogitsLoss()

    for epoch in range(1, epochs + 1):
        model.train()
        total_loss = 0.0

        for images, labels_digit, labels_h, labels_v in train_loader:
            images = images.to(DEVICE)
            labels_digit = labels_digit.to(DEVICE)
            labels_h = labels_h.to(DEVICE).unsqueeze(1)
            labels_v = labels_v.to(DEVICE).unsqueeze(1)

            # TODO: Uzupełnij pętlę treningową
            # 1. Wyzeruj gradienty
            # 2. Przepuść obrazy przez model
            # 3. Oblicz 3 straty i zsumuj je
            # 4. Backpropagacja
            # 5. Krok optymalizatora
            pass

        # Ewaluacja na zbiorze treningowym i walidacyjnym
        train_metrics = evaluate(model, train_loader)
        val_metrics = evaluate(model, val_loader)

        print(
            f"Epoka {epoch:2d} | "
            f"Train: {train_metrics['all']:.1f}% | "
            f"Val: {val_metrics['all']:.1f}% | "
            f"(Cyfry: {val_metrics['digit']:.1f}%, "
            f"H: {val_metrics['h_flip']:.1f}%, "
            f"V: {val_metrics['v_flip']:.1f}%)"
        )

In [None]:
model = MultiHeadMNIST().to(DEVICE)
print(f"Parametry modelu: {sum(p.numel() for p in model.parameters()):,}")

train_model(model, train_loader, val_loader, epochs=10)

## 5. Analiza Wyników

Zobaczmy macierz pomyłek (confusion matrix) dla cyfr.

In [None]:
def plot_confusion_matrix(model: nn.Module, loader: DataLoader) -> None:
    """Narysuj macierz pomyłek dla predykcji cyfr."""
    from sklearn.metrics import confusion_matrix
    import seaborn as sns

    model.eval()
    all_true = []
    all_pred = []

    with torch.no_grad():
        for images, labels_digit, _, _ in loader:
            images = images.to(DEVICE)
            out_digit, _, _ = model(images)
            pred = out_digit.argmax(dim=1).cpu().numpy()

            all_true.extend(labels_digit.numpy())
            all_pred.extend(pred)

    cm = confusion_matrix(all_true, all_pred)

    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt="d", cmap="Blues")
    plt.xlabel("Predykcja")
    plt.ylabel("Prawda")
    plt.title("Macierz pomyłek - rozpoznawanie cyfr")
    plt.show()


plot_confusion_matrix(model, val_loader)

In [None]:
def plot_flip_accuracy_by_digit(model: nn.Module, loader: DataLoader) -> None:
    """Pokaż accuracy dla H-flip i V-flip w podziale na cyfry."""
    model.eval()
    
    # Słowniki: cyfra -> (correct_h, correct_v, total)
    stats = {d: {"correct_h": 0, "correct_v": 0, "total": 0} for d in range(10)}
    
    with torch.no_grad():
        for images, labels_digit, labels_h, labels_v in loader:
            images = images.to(DEVICE)
            _, out_h, out_v = model(images)
            
            pred_h = (torch.sigmoid(out_h) > 0.5).squeeze().cpu()
            pred_v = (torch.sigmoid(out_v) > 0.5).squeeze().cpu()
            
            for i, digit in enumerate(labels_digit.numpy()):
                stats[digit]["total"] += 1
                if pred_h[i] == labels_h[i]:
                    stats[digit]["correct_h"] += 1
                if pred_v[i] == labels_v[i]:
                    stats[digit]["correct_v"] += 1
    
    # Oblicz accuracy
    digits = list(range(10))
    acc_h = [100 * stats[d]["correct_h"] / stats[d]["total"] for d in digits]
    acc_v = [100 * stats[d]["correct_v"] / stats[d]["total"] for d in digits]
    
    # Wykres
    x = np.arange(10)
    width = 0.35
    
    fig, ax = plt.subplots(figsize=(12, 5))
    bars_h = ax.bar(x - width/2, acc_h, width, label="H-Flip", color="steelblue")
    bars_v = ax.bar(x + width/2, acc_v, width, label="V-Flip", color="coral")
    
    ax.set_xlabel("Cyfra")
    ax.set_ylabel("Accuracy (%)")
    ax.set_title("Accuracy wykrywania odbić w podziale na cyfry")
    ax.set_xticks(x)
    ax.set_xticklabels(digits)
    ax.legend()
    ax.set_ylim(0, 105)
    
    # Dodaj wartości na słupkach
    for bar in bars_h:
        ax.annotate(f'{bar.get_height():.1f}',
                    xy=(bar.get_x() + bar.get_width() / 2, bar.get_height()),
                    ha='center', va='bottom', fontsize=8)
    for bar in bars_v:
        ax.annotate(f'{bar.get_height():.1f}',
                    xy=(bar.get_x() + bar.get_width() / 2, bar.get_height()),
                    ha='center', va='bottom', fontsize=8)
    
    # Zaznacz cyfry symetryczne
    ax.axvspan(-0.5, 0.5, alpha=0.1, color='green', label='Symetryczne')
    ax.axvspan(0.5, 1.5, alpha=0.1, color='green')
    ax.axvspan(7.5, 8.5, alpha=0.1, color='green')
    
    plt.tight_layout()
    plt.show()
    
    # Podsumowanie
    symmetric = [0, 1, 8]
    asymmetric = [2, 3, 4, 5, 6, 7, 9]
    
    avg_h_sym = np.mean([acc_h[d] for d in symmetric])
    avg_h_asym = np.mean([acc_h[d] for d in asymmetric])
    avg_v_sym = np.mean([acc_v[d] for d in symmetric])
    avg_v_asym = np.mean([acc_v[d] for d in asymmetric])
    
    print(f"Średnia accuracy H-Flip: symetryczne (0,1,8): {avg_h_sym:.1f}% | asymetryczne: {avg_h_asym:.1f}%")
    print(f"Średnia accuracy V-Flip: symetryczne (0,1,8): {avg_v_sym:.1f}% | asymetryczne: {avg_v_asym:.1f}%")


plot_flip_accuracy_by_digit(model, val_loader)

## 6. Przykłady Błędów

In [None]:
def show_errors(model: nn.Module, dataset: TwistedMNISTDataset, n: int = 8) -> None:
    """Pokaż przykłady błędnych predykcji (błąd w dowolnym z trzech zadań)."""
    model.eval()
    errors = []

    def format_label(digit: int, h: bool, v: bool) -> str:
        """Formatuj etykietę jako 'cyfra [H?] [V?]'."""
        s = str(digit)
        s += " [H]" if h else " [ ]"
        s += " [V]" if v else " [ ]"
        return s

    with torch.no_grad():
        for i in range(len(dataset)):
            img, true_digit, true_h, true_v = dataset[i]
            out_digit, out_h, out_v = model(img.unsqueeze(0).to(DEVICE))
            
            pred_digit = out_digit.argmax().item()
            pred_h = (torch.sigmoid(out_h) > 0.5).item()
            pred_v = (torch.sigmoid(out_v) > 0.5).item()

            # Błąd jeśli dowolny z elementów nie pasuje
            digit_wrong = pred_digit != true_digit
            h_wrong = pred_h != true_h
            v_wrong = pred_v != true_v

            if digit_wrong or h_wrong or v_wrong:
                errors.append({
                    "idx": i,
                    "true_digit": true_digit.item(), "true_h": bool(true_h), "true_v": bool(true_v),
                    "pred_digit": pred_digit, "pred_h": pred_h, "pred_v": pred_v,
                    "digit_wrong": digit_wrong, "h_wrong": h_wrong, "v_wrong": v_wrong,
                })

            if len(errors) >= n:
                break

    if not errors:
        print("Brak błędów!")
        return

    fig, axes = plt.subplots(2, n // 2, figsize=(14, 7))
    axes = axes.flatten()

    for i, err in enumerate(errors):
        img, _, _, _ = dataset[err["idx"]]
        axes[i].imshow(img.squeeze(), cmap="gray")

        true_str = format_label(err["true_digit"], err["true_h"], err["true_v"])
        pred_str = format_label(err["pred_digit"], err["pred_h"], err["pred_v"])
        
        # Kolorowanie tytułu w zależności od typu błędu
        error_types = []
        if err["digit_wrong"]:
            error_types.append("D")
        if err["h_wrong"]:
            error_types.append("H")
        if err["v_wrong"]:
            error_types.append("V")
        
        title = f"True: {true_str}\nPred: {pred_str}\nBłąd: {','.join(error_types)}"
        axes[i].set_title(title, color="red", fontsize=9)
        axes[i].axis("off")

    plt.suptitle("Błędne predykcje (D=cyfra, H=h-flip, V=v-flip)", fontsize=12)
    plt.tight_layout()
    plt.show()


show_errors(model, val_ds)

## 7. Predykcja na zbiorze testowym i zapis rozwiązania

In [None]:
def predict_test(model: nn.Module, test_loader: DataLoader) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Generuj predykcje dla zbioru testowego."""
    model.eval()
    all_digits = []
    all_h = []
    all_v = []

    with torch.no_grad():
        for images in test_loader:
            images = images.to(DEVICE)
            out_digit, out_h, out_v = model(images)

            pred_digit = out_digit.argmax(dim=1).cpu().numpy()
            pred_h = (torch.sigmoid(out_h) > 0.5).squeeze().cpu().numpy().astype(int)
            pred_v = (torch.sigmoid(out_v) > 0.5).squeeze().cpu().numpy().astype(int)

            all_digits.extend(pred_digit)
            all_h.extend(pred_h)
            all_v.extend(pred_v)

    return np.array(all_digits), np.array(all_h), np.array(all_v)


# Generowanie predykcji
pred_digit, pred_h, pred_v = predict_test(model, test_loader)

print(f"Liczba predykcji: {len(pred_digit)}")
print(f"Przykładowe predykcje cyfr: {pred_digit[:10]}")
print(f"Przykładowe predykcje H-flip: {pred_h[:10]}")
print(f"Przykładowe predykcje V-flip: {pred_v[:10]}")

In [None]:
# Zapisywanie rozwiązania
assert pred_digit.shape == (len(test_images),), f"Nieprawidłowe wymiary digit: {pred_digit.shape}"
assert pred_h.shape == (len(test_images),), f"Nieprawidłowe wymiary h_flip: {pred_h.shape}"
assert pred_v.shape == (len(test_images),), f"Nieprawidłowe wymiary v_flip: {pred_v.shape}"

np.savez(
    "solution.npz",
    digit=pred_digit,
    h_flip=pred_h,
    v_flip=pred_v,
)
print("Zapisano plik 'solution.npz' z wynikami dla zbioru testowego.")

## Wnioski

1. **Symetria to problem** - Cyfry symetryczne (0, 1, 8) są trudne do wykrycia czy są odbite
2. **Multi-task learning działa** - Jedna sieć uczy się trzech zadań jednocześnie
3. **Eksperyment**: Spróbuj zmienić wagi strat i zobacz jak wpływa to na wyniki