<a href="https://colab.research.google.com/github/TigaJi/InverseRLviaMentorNet/blob/main/ResNet_test.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
import torch
import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader

In [3]:
import numpy as np
import torchvision.datasets as datasets

# Load the CIFAR-10 training dataset
cifar10_train = datasets.CIFAR10(root='./data', train=True, download=True)

# Stack all the images into a single large numpy array
images = np.stack([np.array(image) for image, _ in cifar10_train])

# Calculate the mean and standard deviation for each channel
mean = images.mean(axis=(0, 1, 2)) / 255
std = images.std(axis=(0, 1, 2)) / 255

print("Mean:", mean)
print("Standard Deviation:", std)

Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to ./data/cifar-10-python.tar.gz


100%|██████████| 170498071/170498071 [00:02<00:00, 77946350.49it/s]


Extracting ./data/cifar-10-python.tar.gz to ./data


KeyboardInterrupt: ignored

In [4]:
from torchvision import transforms

transform = transforms.Compose([
    # you can add other transformations in this list
    transforms.ToTensor()
])


trainset = CIFAR10(root='./data', train=True, download=True,  transform=transform)
testset = CIFAR10(root='./data', train=False, download=True, transform=transform)

Files already downloaded and verified
Files already downloaded and verified


In [5]:

train_loader = DataLoader(trainset, batch_size=128, shuffle=True)
test_loader = DataLoader(testset, batch_size=128, shuffle=False)

In [6]:
import torch.nn as nn
from torch.optim import SGD
from torch.optim.lr_scheduler import CosineAnnealingLR
from torchsummary import summary

class ModifiedResidualBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1):
        super(ModifiedResidualBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels)
            )

    def forward(self, x):
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.shortcut(x)
        out = self.relu(out)
        return out


class ModifiedResNet(nn.Module): ## need some initialization control
    def __init__(self, block, num_blocks, num_classes=10):
        super(ModifiedResNet, self).__init__()
        self.in_channels = 32 ##change
        self.conv1 = nn.Conv2d(3, 32, kernel_size=5, stride=1, padding=2, bias=False)##change
        self.bn1 = nn.BatchNorm2d(32)##change
        self.relu = nn.ReLU(inplace=True)

        ##change
        self.layer1 = self._make_layer(block, 32, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 64, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 128, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, 256, num_blocks[3], stride=2)
        #self.layer5 = self._make_layer(block, 256, num_blocks[4], stride=2)
        self.gap = nn.AdaptiveAvgPool2d((1,1))
        self.fc = nn.Linear(256, num_classes) #a fully connected layer

        #ensure that the neural network is initialized properly before training
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
            elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

    def _make_layer(self, block, out_channels, num_blocks, stride):
        strides = [stride] + [1] * (num_blocks - 1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_channels, out_channels, stride))
            self.in_channels = out_channels
        return nn.Sequential(*layers)

    def forward(self, x):
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        #out = self.layer5(out)
        out = self.gap(out)
        out = out.view(out.size(0), -1)
        out = self.fc(out)
        return out

In [36]:
def ModifiedResNet18():
    #return ModifiedResNet(ModifiedResidualBlock, [2, 2, 2, 2, 3])
    return ModifiedResNet(ModifiedResidualBlock, [2, 2, 2, 2]) 
    ## this could be changed

In [None]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
model = ModifiedResNet18().to(device)

# Get the summary
input_shape = (3, 32, 32)  # (channels, height, width)
summary(model, input_shape, device=device)

##change
#optimizer = SGD(model.parameters(), lr=0.001, momentum=0.9, weight_decay=1e-4)
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
##could change the scheduler
scheduler = CosineAnnealingLR(optimizer, T_max=200, eta_min=0)
#scheduler=torch.optim.lr_scheduler.MultiStepLR(optimizer,milestones=[15,25,30],gamma=0.1)

In [32]:
def add_noise(x,n,p):
    mask = torch.rand_like(x.to(torch.float),device=x.device) < p
    return torch.where(mask, torch.randint(low=0, high=n, size=x.shape, dtype=torch.long,device=x.device), x)
  

In [41]:
num_epochs = 20
best_accuracy = 0.0  # Initialize the best accuracy variable
save_path = 'best_model.pth'  # Specify the path where the best model will be saved

train_losses = []
test_accuracies = []
learning_rates = []


for epoch in range(num_epochs):
    model.train()
    for i, (images, labels) in enumerate(train_loader):
      images, labels = images.to(device), labels.to(device)

      #add noise
      labels = add_noise(labels,10,0.8)
      optimizer.zero_grad()
      outputs = model(images)
      loss = nn.CrossEntropyLoss()(outputs, labels)
      loss.backward()
      optimizer.step()
      scheduler.step()

    # Test accuracy evaluation
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    accuracy = correct / total
    print(f'Epoch [{epoch+1}/{num_epochs}], Accuracy: {accuracy:.4f}')
    # Save the best model
    if accuracy > best_accuracy:
        best_accuracy = accuracy
        torch.save(model.state_dict(), save_path)
    # Append training loss after each batch
    train_losses.append(loss.item())

    # Append test accuracy after each epoch
    test_accuracies.append(accuracy)

    # Append learning rate after each epoch
    learning_rates.append(scheduler.get_last_lr()[0])

Epoch [1/20], Accuracy: 0.1544
Epoch [2/20], Accuracy: 0.1956
Epoch [3/20], Accuracy: 0.1978
Epoch [4/20], Accuracy: 0.2527
Epoch [5/20], Accuracy: 0.2007
Epoch [6/20], Accuracy: 0.2931
Epoch [7/20], Accuracy: 0.2969
Epoch [8/20], Accuracy: 0.2865
Epoch [9/20], Accuracy: 0.3160
Epoch [10/20], Accuracy: 0.3702
Epoch [11/20], Accuracy: 0.3070
Epoch [12/20], Accuracy: 0.3584
Epoch [13/20], Accuracy: 0.4175
Epoch [14/20], Accuracy: 0.4146
Epoch [15/20], Accuracy: 0.4165
Epoch [16/20], Accuracy: 0.4298
Epoch [17/20], Accuracy: 0.4347
Epoch [18/20], Accuracy: 0.4407
Epoch [19/20], Accuracy: 0.4408
Epoch [20/20], Accuracy: 0.4381
