In [1]:
from __future__ import print_function
import os
import argparse
import torch, gc
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import time
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, Subset
# from autoattack import AutoAttack
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from torch.autograd import Variable

In [2]:
%pip install import-ipynb

Collecting import-ipynb
  Downloading import_ipynb-0.2-py3-none-any.whl.metadata (2.3 kB)
Collecting jedi>=0.16 (from IPython->import-ipynb)
  Downloading jedi-0.19.2-py2.py3-none-any.whl.metadata (22 kB)
Downloading import_ipynb-0.2-py3-none-any.whl (4.0 kB)
Downloading jedi-0.19.2-py2.py3-none-any.whl (1.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.6/1.6 MB[0m [31m29.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: jedi, import-ipynb
Successfully installed import-ipynb-0.2 jedi-0.19.2


In [3]:
import import_ipynb

from vgg import *
from d_trades import *

[NbConvertApp] Converting notebook vgg.ipynb to script
[NbConvertApp] Writing 2345 bytes to vgg.py
This application is used to convert notebook files (*.ipynb)
        to various other formats.


Options
The options below are convenience aliases to configurable class-options,
as listed in the "Equivalent to" description-line of the aliases.
To see all configurable class-options for some <cmd>, use:
    <cmd> --help-all

--debug
    set log level to logging.DEBUG (maximize logging output)
    Equivalent to: [--Application.log_level=10]
--show-config
    Show the application's configuration (human-readable format)
    Equivalent to: [--Application.show_config=True]
--show-config-json
    Show the application's configuration (json format)
    Equivalent to: [--Application.show_config_json=True]
--generate-config
    generate default config file
    Equivalent to: [--JupyterApp.generate_config=True]
-y
    Answer yes to any questions instead of prompting.
    Equivalent to: [--JupyterApp.a

In [4]:
parser = argparse.ArgumentParser(description='PyTorch MNIST TRADES Adversarial Training')
parser.add_argument('--batch-size', type=int, default=128, metavar='N',
                    help='input batch size for training (default: 128)')
parser.add_argument('--test-batch-size', type=int, default=128, metavar='N',
                    help='input batch size for testing (default: 128)')
parser.add_argument('--epochs', type=int, default=50, metavar='N',
                    help='number of epochs to train')
parser.add_argument('--lr', type=float, default=0.01, metavar='LR',
                    help='learning rate')
parser.add_argument('--momentum', type=float, default=0.9, metavar='M',
                    help='SGD momentum')
parser.add_argument('--no-cuda', action='store_true', default=False,
                    help='disables CUDA training')
parser.add_argument('--epsilon', default=0.3,
                    help='perturbation')
parser.add_argument('--num-steps', default=20,
                    help='perturb number of steps')
parser.add_argument('--step-size', default=0.01,
                    help='perturb step size')
parser.add_argument('--alpha', default=1.0,
                    help='regularization, i.e., 1/lambda in TRADES')
parser.add_argument('--beta', default=1.0,
                    help='regularization, i.e., 1/lambda in TRADES')
parser.add_argument('--seed', type=int, default=1, metavar='S',
                    help='random seed (default: 1)')
parser.add_argument('--log-interval', type=int, default=100, metavar='N',
                    help='how many batches to wait before logging training status')
parser.add_argument('--model-dir', default='./model-trades-mnist-resnet18',
                    help='directory of model for saving checkpoint')
parser.add_argument('--save-freq', '-s', default=3, type=int, metavar='N',
                    help='save frequency')
args, unknown = parser.parse_known_args()


In [5]:
model_dir = args.model_dir
if not os.path.exists(model_dir):
    os.makedirs(model_dir)
use_cuda = not args.no_cuda and torch.cuda.is_available()
torch.manual_seed(args.seed)
device = torch.device("cuda" if use_cuda else "cpu")
kwargs = {'num_workers': 1, 'pin_memory': True} if use_cuda else {}

In [6]:
# setup data loader
train_loader = torch.utils.data.DataLoader(
    datasets.MNIST('../data', train=True, download=True,
                   transform=transforms.ToTensor()),
    batch_size=args.batch_size, shuffle=True, **kwargs)

test_loader = torch.utils.data.DataLoader(
    datasets.MNIST('../data', train=False,
                   transform=transforms.ToTensor()),
                   batch_size=args.test_batch_size, shuffle=False, **kwargs)

100%|██████████| 9.91M/9.91M [00:00<00:00, 20.2MB/s]
100%|██████████| 28.9k/28.9k [00:00<00:00, 476kB/s]
100%|██████████| 1.65M/1.65M [00:00<00:00, 4.46MB/s]
100%|██████████| 4.54k/4.54k [00:00<00:00, 9.09MB/s]


In [None]:
#transform = transforms.ToTensor()
#train_dataset = datasets.MNIST('../data', train=True, download=True, transform=transform)

In [None]:
#k_folds = 5
#kfold = KFold(n_splits=k_folds, shuffle=True)

In [7]:
lambda_min = []
lambda_max = []
lambda_mean = []
current_epoch = []
def train(args, model, device, train_loader, optimizer, epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)

        optimizer.zero_grad()

        # calculate robust loss
        # MODIFIED: d_trades_loss now returns both loss and lambda_value_calculated
        loss, lambda_value = d_trades_loss(model=model,
                                                 x_natural=data,
                                                 y=target,
                                                 optimizer=optimizer,
                                                 step_size=args.step_size,
                                                 epsilon=args.epsilon,
                                                 perturb_steps=args.num_steps,
                                                 alpha=args.alpha,
                                                 beta=args.beta)

        loss.backward()
        optimizer.step()

        # print progress
        if batch_idx % args.log_interval == 0:
            # MODIFIED: Use lambda for logging
            lambda_min.append(lambda_value.min().item())
            lambda_max.append(lambda_value.max().item())
            lambda_mean.append(lambda_value.mean().item())
            current_epoch.append(epoch + batch_idx)
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item()))


In [8]:
def _pgd_whitebox(model,
                  X,
                  y,
                  epsilon=args.epsilon,
                  num_steps=20,
                  step_size=0.003):
    out = model(X)
    err = (out.data.max(1)[1] != y.data).float().sum()
    X_pgd = Variable(X.data, requires_grad=True)

    random_noise = torch.FloatTensor(*X_pgd.shape).uniform_(-epsilon, epsilon).to(device)
    X_pgd = Variable(X_pgd.data + random_noise, requires_grad=True)

    for _ in range(num_steps):
        opt = optim.SGD([X_pgd], lr=1e-3)
        opt.zero_grad()

        with torch.enable_grad():
            loss = nn.CrossEntropyLoss()(model(X_pgd), y)
        loss.backward()
        eta = step_size * X_pgd.grad.data.sign()
        X_pgd = Variable(X_pgd.data + eta, requires_grad=True)
        eta = torch.clamp(X_pgd.data - X.data, -epsilon, epsilon)
        X_pgd = Variable(X.data + eta, requires_grad=True)
        X_pgd = Variable(torch.clamp(X_pgd, 0, 1.0), requires_grad=True)
    err_pgd = (model(X_pgd).data.max(1)[1] != y.data).float().sum()
    return err, err_pgd

In [9]:
def eval_adv_test_whitebox(model, device, test_loader):

    model.eval()
    robust_err_total = 0
    natural_err_total = 0

    for data, target in test_loader:
        data, target = data.to(device), target.to(device)
        # pgd attack
        X, y = Variable(data, requires_grad=True), Variable(target)
        err_natural, err_robust = _pgd_whitebox(model, X, y)
        robust_err_total += err_robust
        natural_err_total += err_natural

    natural_acc = 1 - natural_err_total / len(test_loader.dataset)
    robust_acc = 1- robust_err_total / len(test_loader.dataset)
    robust_drop = natural_acc - robust_acc
    attack_success_rate = 1 - robust_acc


    #print('natural_acc: ', 1 - natural_err_total / len(test_loader.dataset))
    #print('robust_acc: ', 1- robust_err_total / len(test_loader.dataset))
    print(f'PGD natural_acc: {natural_acc:.4f}, robust_acc: {robust_acc:.4f}, robust_drop: {robust_drop:4f}, attack_success_rate: {attack_success_rate:4f}')
    return natural_acc, robust_acc, robust_drop, attack_success_rate

In [10]:
def adjust_learning_rate(optimizer, epoch):
    """decrease the learning rate"""
    lr = args.lr
    if epoch >= 55:
        lr = args.lr * 0.1
    if epoch >= 75:
        lr = args.lr * 0.01
    if epoch >= 90:
        lr = args.lr * 0.001
    for param_group in optimizer.param_groups:
        param_group['lr'] = lr

In [11]:
gc.collect()
torch.cuda.empty_cache()

In [12]:
def main():
    #init model, Net() can be also used here for training
    # model = ResNet18()
    # model.conv1 = nn.Conv2d(1, 64, kernel_size=3, stride=1, padding=1, bias=False)
    model = vgg16(in_channels=1)
    model = model.to(device)

    optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum)

    history = {'natural_acc': [], 'robust_acc': [], 'robust_drop': [], 'attack_succes_rate': []}
    results = {}

    for epoch in range(1, args.epochs + 1):
        # adjust learning rate for SGD
        adjust_learning_rate(optimizer, epoch)

        start_time = time.time()

        # adversarial training
        train(args, model, device, train_loader, optimizer, epoch)


        print('================================================================')
        #evaluation
        natural_acc, robust_acc, robust_drop, attack_succes_rate = eval_adv_test_whitebox(model, device, test_loader)
        print('using time:', time.time()-start_time)
        print('================================================================')

        history['natural_acc'].append(natural_acc)
        history['robust_acc'].append(robust_acc)
        history['robust_drop'].append(robust_drop)
        history['attack_succes_rate'].append(attack_succes_rate)

        results[f'history{epoch}'] = history

        # save checkpoint
        if epoch % args.save_freq == 0:
            torch.save(model.state_dict(),
                       os.path.join(model_dir, 'model-nn-epoch{}.pt'.format(epoch)))
            torch.save(optimizer.state_dict(),
                       os.path.join(model_dir, 'opt-nn-checkpoint_epoch{}.tar'.format(epoch)))

    print('================================================================')
    print("\nResumen de Resultados por historial:")
    for epochs, history in results.items():
        print(f"{epochs}: Max Robust Acc: {max(history['robust_acc']):.4f}")

if __name__ == '__main__':
    main()

PGD natural_acc: 0.9875, robust_acc: 0.9597, robust_drop: 0.027800, attack_success_rate: 0.040300
using time: 1115.489494562149
PGD natural_acc: 0.9901, robust_acc: 0.9689, robust_drop: 0.021200, attack_success_rate: 0.031100
using time: 1116.6608264446259
PGD natural_acc: 0.9930, robust_acc: 0.9690, robust_drop: 0.024000, attack_success_rate: 0.031000
using time: 1119.6472692489624
PGD natural_acc: 0.9935, robust_acc: 0.9685, robust_drop: 0.025000, attack_success_rate: 0.031500
using time: 1118.159054517746
PGD natural_acc: 0.9926, robust_acc: 0.9730, robust_drop: 0.019600, attack_success_rate: 0.027000
using time: 1116.8133642673492
PGD natural_acc: 0.9931, robust_acc: 0.9734, robust_drop: 0.019700, attack_success_rate: 0.026600
using time: 1115.3598275184631


KeyboardInterrupt: 

In [13]:
# 1. Instancia el modelo con la arquitectura EXACTA que usaste (ej. ResNet18 o VGG)
# Si usaste VGG16 con 1 canal, asegúrate de que esté correctamente definida.
model = vgg16(in_channels=1).to(device)

# 2. Define la ruta al archivo
# Debes reemplazar 'epoch_num' con el número de época que quieres cargar (ej., 1 o 50)
MODEL_PATH = os.path.join(model_dir, 'model-nn-epoch5.pt'.format(1))

# 3. Carga los pesos en el modelo
# 'map_location' es útil si entrenaste en GPU y ahora cargas en CPU, o viceversa.
model.load_state_dict(torch.load(MODEL_PATH, map_location=device))

# 4. Pon el modelo en modo de evaluación si vas a probar ataques
model.eval()

print(f"Modelo cargado y listo para evaluación desde: {MODEL_PATH}")

Modelo cargado y listo para evaluación desde: ./model-trades-mnist-resnet18/model-nn-epoch5.pt


In [14]:
# 1. Asegúrate de que el modelo ya esté cargado (ver paso 1)
# y que el optimizador esté inicializado con los parámetros del modelo cargado.
optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum)

# 2. Define la ruta al archivo
# Reemplaza 'epoch_num' con el número de época
OPTIMIZER_PATH = os.path.join(model_dir, 'opt-nn-checkpoint_epoch5.tar'.format(1))

# 3. Carga el estado en el optimizador
optimizer.load_state_dict(torch.load(OPTIMIZER_PATH, map_location=device))

print(f"Estado del optimizador cargado desde: {OPTIMIZER_PATH}")

Estado del optimizador cargado desde: ./model-trades-mnist-resnet18/opt-nn-checkpoint_epoch5.tar


In [None]:
model = vgg16(in_channels=1)
model = model.to(device)

checkpoint_path = "model-trades-mnist-vgg16/model-nn-epoch1.pt"
model.load_state_dict(torch.load(checkpoint_path, map_location=device))

optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum)
optimizer.load_state_dict(torch.load("model-trades-mnist-vgg16/opt-nn-checkpoint_epoch1.tar"))

print("Modelo y optimizador cargados correctamente ✅")

In [None]:
def main():
    #init model, Net() can be also used here for training
    #model = ResNet18()
    #model.conv1 = nn.Conv2d(1, 64, kernel_size=3, stride=1, padding=1, bias=False)

    model = vgg16(in_channels=1)
    model = model.to(device)

    optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum)

    history = {'natural_acc': [], 'robust_acc': [], 'robust_drop': [], 'attack_succes_rate': []}
    results = {}

    for epoch in range(5, args.epochs + 1):
        # adjust learning rate for SGD
        adjust_learning_rate(optimizer, epoch)

        start_time = time.time()

        # adversarial training
        train(args, model, device, train_loader, optimizer, epoch)


        print('================================================================')
        #evaluation
        natural_acc, robust_acc, robust_drop, attack_succes_rate = eval_adv_test_whitebox(model, device, test_loader)
        print('using time:', time.time()-start_time)
        print('================================================================')

        history['natural_acc'].append(natural_acc)
        history['robust_acc'].append(robust_acc)
        history['robust_drop'].append(robust_drop)
        history['attack_succes_rate'].append(attack_succes_rate)

        results[f'history{epoch}'] = history

        # save checkpoint
        if epoch % args.save_freq == 0:
            torch.save(model.state_dict(),
                       os.path.join(model_dir, 'model-nn-epoch{}.pt'.format(epoch)))
            torch.save(optimizer.state_dict(),
                       os.path.join(model_dir, 'opt-nn-checkpoint_epoch{}.tar'.format(epoch)))

    print('================================================================')
    print("\nResumen de Resultados por historial:")
    for epochs, history in results.items():
        print(f"{epochs}: Max Robust Acc: {max(history['robust_acc']):.4f}")

if __name__ == '__main__':
    main()

PGD natural_acc: 0.9902, robust_acc: 0.9652, robust_drop: 0.025000, attack_success_rate: 0.034800
using time: 1113.5546910762787
PGD natural_acc: 0.9909, robust_acc: 0.9691, robust_drop: 0.021800, attack_success_rate: 0.030900
using time: 1118.1482045650482
PGD natural_acc: 0.9917, robust_acc: 0.9642, robust_drop: 0.027500, attack_success_rate: 0.035800
using time: 1119.8488037586212
PGD natural_acc: 0.9923, robust_acc: 0.9456, robust_drop: 0.046700, attack_success_rate: 0.054400
using time: 1118.4280734062195
PGD natural_acc: 0.9922, robust_acc: 0.9642, robust_drop: 0.028000, attack_success_rate: 0.035800
using time: 1116.6252443790436
PGD natural_acc: 0.9933, robust_acc: 0.9730, robust_drop: 0.020300, attack_success_rate: 0.027000
using time: 1117.8241181373596


In [None]:
def eval_autoattack_direct(model, device, test_loader, eps=8/255, bs=128, version='standard'):
    """
    Evalúa modelo con AutoAttack (modelo debe aceptar imágenes en [0,1] y devolver logits).
    Devuelve: natural_acc (float), robust_acc (float)
    """
    model.to(device)
    model.eval()

    # 1) Recolectar todo el test set en numpy (AutoAttack espera arrays numpy)
    xs = []
    ys = []
    for xb, yb in test_loader:
        xs.append(xb)        # ¡NO .cpu().numpy()!
        ys.append(yb)
    x_all_tensor = torch.cat(xs, dim=0)    # tensor en CPU
    y_all_tensor = torch.cat(ys, dim=0)

    # 2) natural accuracy (compute with torch in device, batched to avoid overflow)
    correct = 0
    total = 0
    with torch.no_grad():
        for xb, yb in test_loader:
            xb = xb.to(device)
            yb = yb.to(device)
            logits = model(xb)
            pred = logits.argmax(dim=1)
            correct += (pred == yb).sum().item()
            total += yb.size(0)
    natural_acc = correct / total

    # 3) AutoAttack (white-box)
    # AutoAttack expects the forward to receive inputs in [0,1] and return logits (numpy)
    adversary = AutoAttack(model, norm='Linf', eps=eps, version=version, verbose=False)

    # run_standard_evaluation imprimirá por pantalla y retorna el robust accuracy (dependiendo de versión puede ser float o tuple)
    aa_out = adversary.run_standard_evaluation(x_all_tensor, y_all_tensor, bs=bs)

    # aa_out suele devolver robust accuracy (float). Manejo defensivo:
    if isinstance(aa_out, tuple):
        robust_acc = aa_out[0]
    else:
        robust_acc = aa_out

    print(f'AutoAttack (eps={eps}) -> natural_acc: {natural_acc:.4f}, robust_acc: {robust_acc:.4f}')
    return float(natural_acc), float(robust_acc)

In [None]:
max_epoch = int(np.ceil(epoch_np.max()))

plt.figure(figsize=(12, 6))
plt.plot(current_epoch, lambda_mean, label='Lambda Mean', color='royalblue', marker='o')
plt.plot(current_epoch, lambda_min,  label='Lambda Min',  color='seagreen', linestyle='--', marker='s')
plt.plot(current_epoch, lambda_max,  label='Lambda Max',  color='crimson', linestyle='--', marker='^')

# Rango del eje X y ticks coherentes con tus datos
plt.xlim(1, max_epoch)
plt.xticks(np.arange(1, max_epoch + 1, 1))

# Formato del gráfico
plt.title("Evolución del parámetro dinámico λ(x) durante el entrenamiento")
plt.xlabel("Época (progreso del entrenamiento)")
plt.ylabel("Valor de λ(x)")
plt.grid(True, linestyle='--', alpha=0.4)
plt.legend()
plt.tight_layout()

# Mostrar gráfico
plt.show()

In [None]:
"""
def main():
    results = {}
    best_val_acc = 0
    best_model_state = None

    for fold, (train_idx, val_idx) in enumerate(kfold.split(train_dataset)):
        current_fold = fold + 1
        print(f'Fold {fold+1}')

        train_subset = Subset(train_dataset, train_idx)
        val_subset = Subset(train_dataset, val_idx)

        train_loader = DataLoader(train_subset, batch_size=args.batch_size, shuffle=True)
        val_loader = DataLoader(val_subset, batch_size=args.batch_size, shuffle=False)

        #model = ResNet18()
        #model.conv1 = nn.Conv2d(1, 64, kernel_size=3, stride=1, padding=1, bias=False)
        model = vgg16(in_channels=1)

        model = model.to(device)
        optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum)
        fold_history = {'natural_acc': [], 'robust_acc': [], 'robust_drop': [], 'attack_succes_rate': []}

        for epoch in range(1, args.epochs + 1):
            # adjust learning rate for SGD
            adjust_learning_rate(optimizer, epoch)

            start_time = time.time()

            # adversarial training
            train(args, model, device, train_loader, optimizer, epoch)

            print('================================================================')
            #evaluation
            nat_acc, robust_acc, robust_drop, attack_succes_rate = eval_adv_test_whitebox(model, device, val_loader)
            print('using time:', time.time()-start_time)
            print('================================================================')

            #Guardado de fold
            fold_history['natural_acc'].append(nat_acc)
            fold_history['robust_acc'].append(robust_acc)
            fold_history['robust_drop'].append(nat_acc)
            fold_history['attack_succes_rate'].append(robust_acc)

            if robust_acc > best_val_acc:
                best_val_acc = robust_acc
                best_model_state = model.state_dict()
                print(f"Mejor modelo guardado en Fold {current_fold}, Epoch {epoch} con Robust Acc: {best_val_acc:.4f}")

        results[f'fold_{current_fold}'] = fold_history

        torch.save(best_model_state, os.path.join(model_dir, 'model-nn-epoch{}.pt'.format(epoch)))
        torch.save(optimizer.state_dict(), os.path.join(model_dir, 'opt-res-checkpoint_epoch{}.tar'.format(epoch)))
        print(f"Pesos del mejor modelo robusto guardados. Mejor Acc: {best_val_acc:.4f}")


    #if best_model_state:
        #final_best_model = ResNet18()
        #final_best_model.conv1 = nn.Conv2d(1, 64, kernel_size=3, stride=1, padding=1, bias=False)

    #    final_best_model = vgg16(in_channels = 1)

    #    final_best_model.load_state_dict(best_model_state)
    #    torch.save(final_best_model.state_dict(), os.path.join(model_dir, 'model-nn-epoch{}.pt'.format(epoch)))
    #    print(f"Pesos del mejor modelo robusto guardados. Mejor Acc: {best_val_acc:.4f}")

    print("\nResumen de Resultados por Fold:")
    for fold_name, history in results.items():
        print(f"{fold_name}: Max Robust Acc: {max(history['robust_acc']):.4f}")

        # save checkpoint
        #if epoch % args.save_freq == 0:
        #    torch.save(model.state_dict(),
        #               os.path.join(model_dir, 'model-nn-epoch{}.pt'.format(epoch)))
        #    torch.save(optimizer.state_dict(),
        #               os.path.join(model_dir, 'opt-nn-checkpoint_epoch{}.tar'.format(epoch)))

if __name__ == '__main__':
    main()
"""