In [5]:
import torch
import torchvision
import torch.nn as nn
from tqdm import tqdm
import multiprocessing
import torch.optim as optim
import torch.nn.functional as  F
from torchvision import transforms
from torch.utils.data import Dataset
from torch.utils.data import DataLoader


####################################################################
# Set Device
####################################################################

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device: ", device)

####################################################################
# Dataset Class
####################################################################



#Agregaremos Ruido
class AddGaussianNoise:
    def _init_(self, mean=0.0, std=0.05):
        self.mean = mean
        self.std = std

    def _call_(self, tensor):
        return tensor + torch.randn(tensor.size()) * self.std + self.mean

    def _repr_(self):
        return self._class.name_ + f"(mean={self.mean}, std={self.std})"

#Quitamos las tecnicas agregadas y dejamos solo el noise
da_train = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomCrop(32, padding=4),
    transforms.RandomRotation(15),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.ToTensor(),
    transforms.RandomErasing(p=0.5, scale=(0.02, 0.33), ratio=(0.3, 3.3)),
])

da_test = transforms.Compose([
    transforms.ToTensor(),
])

class CIFAR10_dataset1(Dataset):

    def __init__(self, transform, partition = "train"):

        print("\nLoading CIFAR10 ", partition, " Dataset...")
        self.partition = partition
        self.transform = transform
        if self.partition == "train":
            self.data = torchvision.datasets.CIFAR10('.data/',
                                                     train=True,
                                                     download=True)
        else:
            self.data = torchvision.datasets.CIFAR10('.data/',
                                                     train=False,
                                                     download=True)
        print("\tTotal Len.: ", len(self.data), "\n", 50*"-")

    def _len_(self):
        return len(self.data)

    def _getitem_(self, idx):

        # Image
        image = self.data[idx][0]
        image_tensor = self.transform(image)

        # Label
        label = torch.tensor(self.data[idx][1])
        label = F.one_hot(label, num_classes=10).float()

        return {"img": image_tensor, "label": label}

train_dataset = CIFAR10_dataset1(da_train, partition="train")
test_dataset = CIFAR10_dataset1(da_test, partition="test")

####################################################################
# DataLoader Class
####################################################################

batch_size = 512
num_workers = multiprocessing.cpu_count()-1
print("Num workers", num_workers)
train_dataloader = DataLoader(train_dataset, batch_size, shuffle=True, num_workers=num_workers)
test_dataloader = DataLoader(test_dataset, batch_size, shuffle=False, num_workers=num_workers)

####################################################################
# Neural Network Class
####################################################################

# Utilizamos ResnetBlock ahora
class ResNetBlock(nn.Module):
    def _init_(self, in_channels, out_channels, stride=1):
        super(ResNetBlock, self)._init_()

        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1)
        self.bn1 = nn.BatchNorm2d(out_channels)

        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1)
        self.bn2 = nn.BatchNorm2d(out_channels)

        self.conv_shortcut = nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, padding=0)

        self.relu = nn.ReLU()

    def forward(self, x):
        shortcut = self.conv_shortcut(x)

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        out += shortcut
        out = self.relu(out)

        return out

class ResNetCNN(nn.Module):
    def _init_(self, num_classes=10):
        super(ResNetCNN, self)._init_()

        self.resnet_layers = nn.Sequential(
            ResNetBlock(3, 32, stride=1),
            ResNetBlock(32, 32, stride=1),
            ResNetBlock(32, 64, stride=2),
            ResNetBlock(64, 64, stride=1),
            ResNetBlock(64, 128, stride=2),
            ResNetBlock(128, 128, stride=1),
            ResNetBlock(128, 256, stride=2),
            ResNetBlock(256, 256, stride=1),
            ResNetBlock(256, 512, stride=2),
            ResNetBlock(512, 512, stride=1)
        )

        # Calcula dinámicamente el tamaño de la salida
        with torch.no_grad():
            dummy_input = torch.zeros(1, 3, 32, 32)  # Entrada ficticia (1, 3, 32, 32)
            dummy_output = self.resnet_layers(dummy_input)
            flattened_size = dummy_output.view(1, -1).shape[1]

        self.fc1 = nn.Linear(flattened_size, 512)
        self.fc2 = nn.Linear(512, num_classes)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.6)

    def forward(self, x):
        x = self.resnet_layers(x)
        x = torch.flatten(x, start_dim=1)
        x = self.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return x


# Instantiating the network and printing its architecture
num_classes = 10
net = ResNetCNN(
    num_classes
    )
print(net)

def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)
print("Params: ", count_parameters(net))

####################################################################
# Training settings
####################################################################

# Training hyperparameters
epochs = 200
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(net.parameters(), lr=0.01, weight_decay=1e-4)
lr_scheduler = optim.lr_scheduler.OneCycleLR(
    optimizer, max_lr=0.01, steps_per_epoch=len(train_dataloader), epochs=epochs
)



####################################################################
# Training
####################################################################


#Agregamos Early Stopping
class EarlyStopping:
    def _init_(self, patience=10, min_delta=0.0):
        self.patience = patience
        self.min_delta = min_delta
        self.best_score = None
        self.counter = 0
        self.early_stop = False

    def _call_(self, val_loss):
        if self.best_score is None:
            self.best_score = val_loss
        elif val_loss > self.best_score - self.min_delta:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_score = val_loss
            self.counter = 0


# Load model in GPU
net.to(device)

print("\n---- Start Training ----")
early_stopping = EarlyStopping(patience=30, min_delta=0.001)
best_accuracy = -1
best_epoch = 0
for epoch in range(epochs):


    # TRAIN NETWORK
    train_loss, train_correct = 0, 0
    net.train()
    with tqdm(iter(train_dataloader), desc="Epoch " + str(epoch), unit="batch") as tepoch:
        for batch in tepoch:

            # Returned values of Dataset Class
            images = batch["img"].to(device)
            labels = batch["label"].to(device)

            # zero the parameter gradients
            optimizer.zero_grad()

            # Forward
            outputs = net(images)
            loss = criterion(outputs, labels)

            # Calculate gradients
            loss.backward()

            # Update gradients
            optimizer.step()

            # one hot -> labels
            labels = torch.argmax(labels, dim=1)
            pred = torch.argmax(outputs, dim=1)
            train_correct += pred.eq(labels).sum().item()

            # print statistics
            train_loss += loss.item()

    train_loss /= (len(train_dataloader.dataset) / batch_size)

    # TEST NETWORK
    test_loss, test_correct = 0, 0
    net.eval()
    with torch.no_grad():
      with tqdm(iter(test_dataloader), desc="Test " + str(epoch), unit="batch") as tepoch:
          for batch in tepoch:

            images = batch["img"].to(device)
            labels = batch["label"].to(device)

            # Forward
            outputs = net(images)
            test_loss += criterion(outputs, labels)

            # one hot -> labels
            labels = torch.argmax(labels, dim=1)
            pred = torch.argmax(outputs, dim=1)

            test_correct += pred.eq(labels).sum().item()

    lr_scheduler.step(test_loss)

    test_loss /= (len(test_dataloader.dataset) / batch_size)
    test_accuracy = 100. * test_correct / len(test_dataloader.dataset)

    print("[Epoch {}] Train Loss: {:.6f} - Test Loss: {:.6f} - Train Accuracy: {:.2f}% - Test Accuracy: {:.2f}%".format(
        epoch + 1, train_loss, test_loss, 100. * train_correct / len(train_dataloader.dataset), test_accuracy
    ))

    if test_accuracy > best_accuracy:
        best_accuracy = test_accuracy
        best_epoch = epoch
        # Save best weights
        torch.save(net.state_dict(), "best_model.pt")


print(f"\nBEST TEST ACCURACY: {best_accuracy} in epoch  {best_epoch}")

Device:  cuda

Loading CIFAR10  train  Dataset...
Files already downloaded and verified
	Total Len.:  50000 
 --------------------------------------------------

Loading CIFAR10  test  Dataset...
Files already downloaded and verified
	Total Len.:  10000 
 --------------------------------------------------
Num workers 11


TypeError: object of type 'CIFAR10_dataset1' has no len()