In [1]:
import torch
import torchvision
import torchvision.transforms as transforms

In [2]:
# Define transformations for data augmentation and normalization
transform_train = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
])

transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
])

In [3]:
# Load CIFAR-10 dataset
train_dataset = torchvision.datasets.CIFAR10(root='./data', train=True, download=False, transform=transform_train)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=256, shuffle=True, num_workers=2, pin_memory=True)

test_dataset = torchvision.datasets.CIFAR10(root='./data', train=False, download=False, transform=transform_test)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=512, shuffle=False, num_workers=2, pin_memory=True)


In [4]:
from torch import nn
import torch.optim as optim

In [5]:
# Set device (GPU if available, else CPU)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
learningRate = 0.01

In [6]:
class BasicBlock(nn.Module):
    def __init__(self,in_features=64,out_features=64,stride=[1,1],down_sample=False):
        # stride : list 
        # the value at corresponding indices are the strides of corresponding layers in a residual block
        
        super(BasicBlock,self).__init__()

        self.conv1 = nn.Conv2d(in_features,out_features,3,stride[0],padding=1,bias=False) #weight layer
        self.bn1 = nn.BatchNorm2d(out_features) #weight layer
        
        self.relu = nn.ReLU(True) #relu
        
        self.conv2 = nn.Conv2d(out_features,out_features,3,stride[1],padding=1,bias=False) #weight layer
        self.bn2 = nn.BatchNorm2d(out_features) #weight layer

        self.down_sample = down_sample
        if down_sample:
            self.downsample = nn.Sequential(
                    nn.Conv2d(in_features,out_features,1,2,bias=False),
                    nn.BatchNorm2d(out_features)
                )
    
    def forward(self,x):
        x0=x.clone()
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.conv2(x)
        x = self.bn2(x)

        if self.down_sample:
            x0 = self.downsample(x0)  
        x = x + x0    # F(x)+x
        x= self.relu(x)
        return x

In [7]:
class ResNet(nn.Module):

    def __init__(self,in_channels=3,num_residual_block=[3,4,6,3],num_class=1000,block_type='normal'):
        super(ResNet,self).__init__()

        self.conv1 = nn.Conv2d(in_channels,64,7,2,3,bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(True)
        self.maxpool = nn.MaxPool2d(3,2,1)

        # if block_type.lower() == 'bottleneck':    
        #     self.resnet,outchannels = self.__bottlenecks(num_residual_block)
        # else:
        self.resnet,outchannels = self.set_layers(num_residual_block)
    
        #extra layer for 19
        self.conv2 = nn.Conv2d(512, 512, 3, 1, 1)
        self.bn2 = nn.BatchNorm2d(512)
        self.relu2 = nn.ReLU(True)

        self.avgpool = nn.AdaptiveAvgPool2d((1,1))
        self.fc = nn.Linear(in_features=outchannels,out_features=num_class,bias=True)

        
    def forward(self,x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)
        x = self.resnet(x)
        #print("Before Last layer: ",x.shape)
        x = self.conv2(x)
        x = self.bn2(x)
        x = self.relu2(x)
        #print("After Last layer: ",x.shape)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)
        return x 
    
    def set_layers(self,num_residual_block):
        layer=[]
        layer += [BasicBlock()]*num_residual_block[0]
        inchannels=64
        for numOFlayers in num_residual_block[1:]:
            stride = [2,1] #updating the stride, the first layer of residual block
            # will have a stride of two and the 2nd layer of the residual block have 
            # a stride of 1
            downsample=True
            outchannels = inchannels*2
            for _ in range(numOFlayers):
                layer.append(BasicBlock(inchannels,outchannels,stride,down_sample=downsample))
                inchannels = outchannels
                downsample = False 
                stride=[1,1]
            
        return nn.Sequential(*layer),outchannels

In [8]:
def  resnet18(**kwargs):
    return ResNet(num_residual_block=[2,2,2,2],**kwargs)

In [9]:
model18 = resnet18()
model18.to(device)

ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (resnet): Sequential(
    (0): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (1): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
  

In [10]:
# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model18.parameters(), lr=learningRate, momentum=0.9)

In [11]:
epoch_Loss = 0

def train(model, train_loader, criterion, optimizer, device):
    model.train()
    running_loss = 0.0
    for i, data in enumerate(train_loader, 0):
        inputs, labels = data
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()

        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        if i % 50 == 49:  # Prints every 50 mini-batches
            print(f'Batch {i + 1}, Loss: {running_loss / 50:.3f}')
            running_loss = 0.0

    epoch_Loss = running_loss / 256
    print(f'TOTAL EPOCH LOSS: {epoch_Loss}')
    return epoch_Loss

In [12]:
test_accuracy = 0

def test(model, test_loader, device):
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for data in test_loader:
            images, labels = data
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    accuracy = correct / total
    test_accuracy = accuracy
    print(f'Accuracy on the test set: {100 * accuracy:.2f}%')
    return test_accuracy

In [13]:
import wandb

wandb.login()

wandb.init(
# set the wandb project where this run will be logged
project="ResNet19-CIFAR10-test",

# track hyperparameters and run metadata
config={
"learning_rate": 0.01,
"architecture": "RESNET18 +1 layer",
"dataset": "CIFAR-10",
"epochs": 30,
}
)

Failed to detect the name of this notebook, you can set it manually with the WANDB_NOTEBOOK_NAME environment variable to enable code saving.
[34m[1mwandb[0m: Currently logged in as: [33mmarcospagnoletti-ms[0m. Use [1m`wandb login --relogin`[0m to force relogin


In [14]:
num_epochs = 30

current_epoch = 0

print("USING DEVICE: ",device)

# initialize Step LR
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.5)

# Training the model
for epoch in range(num_epochs):
    current_epoch += 1
    print("")
    print("Epoch: ",current_epoch)
    #train_loop(tr_loader, model18, criterion, optimizer)
    

    Loss = train(model18, train_loader, criterion, optimizer, device)
    Acc = test(model18, test_loader, device)
    scheduler.step()
    LearnRate = scheduler.get_last_lr()[0]
    print("LR: ",LearnRate)
    wandb.log({"Accuracy":Acc, "Loss": Loss, "LearningRate":LearnRate})
    

print('Finished Training')
wandb.finish()

# Testing the model
test(model18, test_loader, device)

USING DEVICE:  cuda

Epoch:  1
Batch 50, Loss: 2.637
Batch 100, Loss: 1.596
Batch 150, Loss: 1.471
TOTAL EPOCH LOSS: 0.2513175210915506
Accuracy on the test set: 50.14%
LR:  0.01

Epoch:  2
Batch 50, Loss: 1.312
Batch 100, Loss: 1.252
Batch 150, Loss: 1.198
TOTAL EPOCH LOSS: 0.21063700481317937
Accuracy on the test set: 56.03%
LR:  0.01

Epoch:  3
Batch 50, Loss: 1.101
Batch 100, Loss: 1.071
Batch 150, Loss: 1.050
TOTAL EPOCH LOSS: 0.17976200417615473
Accuracy on the test set: 65.29%
LR:  0.01

Epoch:  4
Batch 50, Loss: 0.996
Batch 100, Loss: 0.953
Batch 150, Loss: 0.933
TOTAL EPOCH LOSS: 0.16598937311209738
Accuracy on the test set: 67.95%
LR:  0.01

Epoch:  5
Batch 50, Loss: 0.875
Batch 100, Loss: 0.873
Batch 150, Loss: 0.876
TOTAL EPOCH LOSS: 0.15301166661083698
Accuracy on the test set: 69.26%
LR:  0.01

Epoch:  6
Batch 50, Loss: 0.833
Batch 100, Loss: 0.811
Batch 150, Loss: 0.814
TOTAL EPOCH LOSS: 0.14166647335514426
Accuracy on the test set: 70.24%
LR:  0.01

Epoch:  7
Batch 50, 

0,1
Accuracy,▁▂▄▅▅▅▆▆▆▇▇▇▇▇▇▇█▇▇███████████
LearningRate,█████████▄▄▄▄▄▄▄▄▄▄▂▂▂▂▂▂▂▂▂▂▁
Loss,█▆▅▅▄▄▄▃▃▃▃▂▂▂▂▂▂▂▂▂▁▁▁▁▁▁▁▁▁▁

0,1
Accuracy,0.8158
LearningRate,0.00125
Loss,0.06634


Accuracy on the test set: 81.58%


0.8158