In [None]:
import requests
import torch
from torch.utils.data import Dataset, DataLoader, random_split
from PIL import Image
import numpy as np
import torchvision.transforms as transforms
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import classification_report, accuracy_score
from torchvision import models
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import accuracy_score, classification_report
from torchvision import models
import numpy as np


# Definiujemy transformacje – najpierw resize, potem konwersja do tensora i normalizacja
my_transform = transforms.Compose([
    # transforms.Resize((32, 32)),  # standardowy rozmiar dla ResNet
    # transforms.ToTensor(),
    # transforms.Normalize(mean=[0.485, 0.456, 0.406],
    #                      std=[0.229, 0.224, 0.225])
    transforms.Resize((32, 32)),
    transforms.Lambda(lambda x: x.convert("RGB")),
    transforms.ToTensor(),
])

class TaskDataset(Dataset):
    def __init__(self, data, transform=None):
        # data.imgs -> lista/tablica obrazów (mogą być PIL.Image lub numpy.array)
        # data.labels -> lista/tablica etykiet
        self.imgs = data.imgs
        self.labels = data.labels
        self.transform = transform if transform is not None else transforms.ToTensor()

    def __len__(self):
        return len(self.imgs)

    def __getitem__(self, idx):
        x = self.imgs[idx]
        # Jeśli obraz jest tablicą numpy, konwertujemy do PIL
        if isinstance(x, np.ndarray):
            x = Image.fromarray(x)
        # Jeśli obraz nie jest w trybie RGB, konwertujemy go
        if x.mode != "RGB":
            x = x.convert("RGB")
        # Zastosowanie transformacji
        if self.transform:
            x = self.transform(x)
        y = self.labels[idx]
        return x, y

# Wczytanie danych z pliku .pt
data = torch.load('/content/drive/MyDrive/Train.pt',
                  weights_only=False,
                  map_location=torch.device('cpu'))

# Tworzymy dataset z transformacjami
dataset = TaskDataset(data, transform=my_transform)

# Podział na zbiór treningowy (80%) i testowy (20%)
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

batch_size = 64

# DataLoader dla zbioru treningowego
train_loader = DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=2,
    pin_memory=True,
    persistent_workers=False  # Jeśli dalej występują problemy, można ustawić False
)

# DataLoader dla zbioru testowego
test_loader = DataLoader(
    test_dataset,
    batch_size=batch_size,
    shuffle=False,
    num_workers=2,
    pin_memory=True,
    persistent_workers=False
)

print("Train set size:", len(train_dataset))
print("Test set size:", len(test_dataset))



def train_and_evaluate_free(
    model, train_loader, test_loader, device,
    num_epochs=4, m=8, eps=2, alpha=2, lr=0.001
):
    """
    Free-m Adversarial Training:
      - num_epochs: liczba epok (outer loop) – tutaj 8 epok
      - m: liczba powtórzeń mini-batcha (wewnętrzna pętla)
      - eps: budżet perturbacji (norma L_inf)
      - alpha: krok aktualizacji perturbacji
      - lr: learning rate
    """
    optimizer = optim.Adam(model.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()
    model.to(device)

    #delta = torch.zeros_like(batch_x, requires_grad=True).to(device)
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        num_samples = 0

        for batch_x, batch_y in train_loader:
            batch_x = batch_x.to(device)
            batch_y = batch_y.to(device)

            # Inicjalizacja perturbacji dla tego mini-batcha jako tensor liściowy
            delta = torch.zeros_like(batch_x, requires_grad=True).to(device)

            for i in range(m):
                adv_x = batch_x + delta
                # adv_x = torch.clamp(adv_x, 0, 1)  # ograniczenie zakresu pikseli

                # Forward pass na zaburzonym wejściu
                outputs = model(adv_x)
                loss = criterion(outputs, batch_y)

                optimizer.zero_grad()
                # Obliczamy gradienty; retain_graph=True bo będziemy wielokrotnie aktualizować delta
                loss.backward(retain_graph=True)

                optimizer.step()

                # Aktualizacja perturbacji delta – pobieramy gradient z delta.grad
                with torch.no_grad():
                    # Upewniamy się, że gradient nie jest None
                    if delta.grad is None:
                        raise RuntimeError("Brak gradientu dla delta!")
                    grad_sign = delta.grad.sign()
                    delta = delta + alpha * grad_sign
                    delta = torch.clamp(delta, -eps, eps)
                # Odłączamy delta i ustawiamy requires_grad ponownie
                delta = delta.detach()
                delta.requires_grad_()

                running_loss += loss.item() * batch_x.size(0)
                num_samples += batch_x.size(0)

            # Koniec iteracji mini-batcha (po m powtórzeniach)
        train_loss = running_loss / float(num_samples)

        # Ewaluacja na zbiorze testowym (naturalnych obrazach)
        model.eval()
        all_preds = []
        all_labels = []
        with torch.no_grad():
            for batch_x, batch_y in test_loader:
                batch_x = batch_x.to(device)
                batch_y = batch_y.to(device)
                outputs = model(batch_x)
                _, preds = torch.max(outputs, 1)
                all_preds.extend(preds.cpu().numpy())
                all_labels.extend(batch_y.cpu().numpy())
        test_acc = accuracy_score(all_labels, all_preds)
        print(f"[Epoka {epoch+1}/{num_epochs}] Loss: {train_loss:.4f} | Test Acc: {test_acc*100:.2f}%")

    print("Trening zakończony (Free Adversarial Training).")
    print("\nRaport klasyfikacji:")
    print(classification_report(all_labels, all_preds))

# Wybór urządzenia
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print("Urządzenie:", device)

# Inicjalizacja modelu ResNet34 (bez pretrained, z 10-klasowym wyjściem)
model_resnet34 = models.resnet34(pretrained=False)
model_resnet34.fc = nn.Linear(512, 10)

# Uruchomienie Free Adversarial Training na modelu ResNet34 przez 8 epok
train_and_evaluate_free(model_resnet34, train_loader, test_loader, device,
                          num_epochs=8, m=4, eps=2/255, alpha=2/255, lr=0.001)

