<a href="https://colab.research.google.com/github/Saivam/Snapshot/blob/main/Railway_Snapshot.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
import torchvision.datasets as datasets
from torch.optim.lr_scheduler import CyclicLR
from tqdm import tqdm

In [None]:
class Net(nn.Module):
    """Two-convolution CNN producing log-probabilities for square RGB images.

    Args:
        input_size: Height and width, in pixels, of the square input images.
            The default of 32 matches the 32x32 crops produced by the
            training transform in this file.
        num_classes: Number of output classes.

    Raises:
        ValueError: If ``input_size`` is too small to survive the two
            unpadded convolutions and the pooling layer.
    """

    def __init__(self, input_size=32, num_classes=10):
        super().__init__()
        # Each unpadded 3x3 convolution trims 2 pixels per spatial dimension
        # and the 2x2 max-pool halves what is left.
        pooled_size = (input_size - 4) // 2
        if pooled_size < 1:
            raise ValueError(
                f"input_size must be at least 6, got {input_size}")

        self.conv1 = nn.Conv2d(3, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.dropout1 = nn.Dropout2d(0.25)
        # The second dropout runs on flattened (N, features) activations, so
        # it must be element-wise; Dropout2d is meant for feature maps.
        self.dropout2 = nn.Dropout(0.5)
        # Size the classifier from the real feature-map size instead of the
        # hard-coded 9216, which is only correct for 28x28 inputs.
        self.fc1 = nn.Linear(64 * pooled_size * pooled_size, 128)
        self.fc2 = nn.Linear(128, num_classes)

    def forward(self, x):
        """Return per-class log-probabilities of shape (N, num_classes)."""
        x = self.conv1(x)
        x = nn.functional.relu(x)
        x = self.conv2(x)
        x = nn.functional.relu(x)
        x = nn.functional.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = nn.functional.relu(x)
        x = self.dropout2(x)
        x = self.fc2(x)
        # log_softmax pairs with the nll_loss used by the training loop.
        output = nn.functional.log_softmax(x, dim=1)
        return output

def train(model, device, train_loader, optimizer, scheduler, epoch):
    """Run one pass over ``train_loader``, updating ``model`` in place.

    For every batch the NLL loss is back-propagated, the optimizer takes a
    step, and the scheduler is advanced once (i.e. per batch, not per epoch).
    ``epoch`` is accepted for the caller's convenience and is not used here.
    """
    model.train()
    for inputs, labels in train_loader:
        inputs = inputs.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        batch_loss = nn.functional.nll_loss(model(inputs), labels)
        batch_loss.backward()

        optimizer.step()
        scheduler.step()

def test(model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += nn.functional.nll_loss(output, target, reduction='sum').item()
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)

    print('Test set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)'.format(
        test_loss, correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))


In [None]:
if __name__ == '__main__':
    use_cuda = torch.cuda.is_available()
    device = torch.device("cuda" if use_cuda else "cpu")

    # Define the dataset and data loader.
    # Resize first so every image reaching the network is 32x32, the size
    # RandomCrop(32) emits for training; without it the evaluation images
    # keep their on-disk size. Assumes the folder may hold images that are
    # not already 32x32 -- TODO confirm.
    train_transform = transforms.Compose([
        transforms.Resize((32, 32)),
        transforms.RandomCrop(32, padding=4),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
    ])

    test_transform = transforms.Compose([
        transforms.Resize((32, 32)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
    ])

    # NOTE(review): both datasets read the same directory, so the "test"
    # numbers below are measured on the training images. Point one of them
    # at a held-out split if one exists.
    train_dataset = datasets.ImageFolder(root='/content/drive/MyDrive/Dataset', transform=train_transform)
    test_dataset = datasets.ImageFolder(root='/content/drive/MyDrive/Dataset', transform=test_transform)
    # The loaders previously referenced undefined names (trainset/testset).
    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=128,
                                               shuffle=True, num_workers=2)
    test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=100,
                                              shuffle=False, num_workers=2)

    # Define the model, optimizer, and cyclic learning rate scheduler
    model = Net().to(device)
    optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)
    scheduler = CyclicLR(optimizer, base_lr=0.001, max_lr=0.1, mode='triangular2')

    # Define the snapshot ensemble parameters
    n_cycles = 5
    n_epochs = 24
    n_models = n_cycles * n_epochs
    n_batch = len(train_loader)
    # NOTE(review): alpha_zero/alpha describe a decaying rate
    # alpha_zero * (1 - k/n) but are not wired into the optimizer or the
    # scheduler; the learning rate is driven by CyclicLR alone.
    alpha_zero = 0.1
    alpha = lambda epoch: alpha_zero * (1 - (epoch % n_epochs) / n_epochs)
    snapshots = []

    # Train for n_cycles * n_epochs epochs, taking a snapshot after each one
    for cycle in range(n_cycles):
        for epoch in range(n_epochs):
            print(f'Cycle {cycle+1}, Epoch {epoch+1}')
            train(model, device, train_loader, optimizer, scheduler, epoch)
            test(model, device, test_loader)

            # state_dict() hands back references to the live parameters, so
            # the weights must be copied; otherwise every snapshot would
            # alias the same (final) tensors. Copies are kept on the CPU so
            # they do not accumulate in GPU memory.
            snapshot = {
                'model': {
                    name: tensor.detach().to('cpu', copy=True)
                    for name, tensor in model.state_dict().items()
                },
                # NOTE(review): these two entries are stored as returned and
                # are not used by the averaging below.
                'optimizer': optimizer.state_dict(),
                'scheduler': scheduler.state_dict(),
                'epoch': epoch
            }
            snapshots.append(snapshot)

            # Reset the cyclic learning rate scheduler at the end of each cycle
            if epoch == n_epochs - 1:
                scheduler = CyclicLR(optimizer, base_lr=0.001, max_lr=0.1, mode='triangular2')

    # Create the ensemble by averaging the saved model parameters. The sums
    # go into fresh float tensors so no snapshot is modified in place.
    # NOTE(review): this averages weights; snapshot ensembles are usually
    # described as averaging predictions -- confirm which is intended.
    ensemble_params = {
        name: torch.zeros_like(tensor, dtype=torch.float32)
        for name, tensor in snapshots[0]['model'].items()
    }
    for snapshot in snapshots:
        for name, tensor in snapshot['model'].items():
            ensemble_params[name] += tensor.float()

    for name in ensemble_params:
        ensemble_params[name] /= len(snapshots)

    # Create the ensemble model and test its performance on the test set
    ensemble_model = Net().to(device)
    ensemble_model.load_state_dict(ensemble_params)
    test(ensemble_model, device, test_loader)
    
    

FileNotFoundError: ignored

In [None]:
# Define the model architecture
class Net(nn.Module):
    """Three-block conv/batch-norm CNN returning raw class logits.

    Args:
        input_size: Height and width, in pixels, of the square input images.
            The default of 32 matches the 32x32 crops produced by the
            training transform in this file.
        num_classes: Number of output classes.

    Raises:
        ValueError: If ``input_size`` is smaller than 8, which would leave
            nothing after the three pooling layers.
    """

    def __init__(self, input_size=32, num_classes=10):
        super().__init__()
        # The convolutions are padded and keep the spatial size; each of the
        # three 2x2 max-pools halves it (rounding down).
        pooled_size = input_size // 8
        if pooled_size < 1:
            raise ValueError(
                f"input_size must be at least 8, got {input_size}")

        self.conv1 = nn.Conv2d(3, 16, 3, padding=1)
        self.bn1 = nn.BatchNorm2d(16)
        self.conv2 = nn.Conv2d(16, 32, 3, padding=1)
        self.bn2 = nn.BatchNorm2d(32)
        self.conv3 = nn.Conv2d(32, 64, 3, padding=1)
        self.bn3 = nn.BatchNorm2d(64)
        # Sized from the real feature map; the previous 64 * 8 * 8 is only
        # correct for 64x64 inputs.
        self.fc1 = nn.Linear(64 * pooled_size * pooled_size, 256)
        self.fc2 = nn.Linear(256, num_classes)

    def forward(self, x):
        """Return logits of shape (N, num_classes); no softmax is applied."""
        x = nn.functional.relu(self.bn1(self.conv1(x)))
        x = nn.functional.max_pool2d(x, 2)
        x = nn.functional.relu(self.bn2(self.conv2(x)))
        x = nn.functional.max_pool2d(x, 2)
        x = nn.functional.relu(self.bn3(self.conv3(x)))
        x = nn.functional.max_pool2d(x, 2)
        # Flatten per sample. A view(-1, K) with the wrong K can silently
        # fold several samples into one row and change the batch size.
        x = torch.flatten(x, 1)
        x = nn.functional.relu(self.fc1(x))
        x = self.fc2(x)
        return x

# Define the training and testing functions
def train(model, device, train_loader, optimizer, scheduler, epoch):
    """Train ``model`` for one pass over ``train_loader`` and print a summary.

    For every batch the cross-entropy loss is back-propagated, the optimizer
    takes a step, and the scheduler is advanced once (per batch). After the
    pass, one line with the mean batch loss and running accuracy is printed.
    ``epoch`` is accepted for the caller's convenience and is not used here.
    Nothing is returned; an empty loader prints nothing.
    """
    model.train()
    train_loss = 0.0
    correct = 0
    total = 0
    num_batches = 0
    for data, target in train_loader:
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = nn.functional.cross_entropy(output, target)
        loss.backward()
        optimizer.step()
        scheduler.step()
        train_loss += loss.item()
        predicted = output.argmax(dim=1)
        total += target.size(0)
        correct += predicted.eq(target).sum().item()
        num_batches += 1

    # Nothing was processed, so there is no average to report.
    if num_batches == 0:
        return

    # The progress_bar helper used here before is not defined or imported in
    # this file, so report once per epoch with a plain print instead.
    print('Loss: %.3f | Acc: %.3f%% (%d/%d)'
          % (train_loss / num_batches, 100. * correct / total, correct, total))

def test(model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    total = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            loss = nn.functional.cross_entropy(output, target)
            test_loss += loss.item()
            _, predicted = output.max(1)
            total += target.size(0)
            correct += predicted.eq(target).sum().item()

        progress_bar(len(test_loader), len(test_loader), 'Loss: %.3f | Acc: %.3f%% (%d/%d)'
            % (test_loss/(len(test_loader)), 100.*correct/total, correct, total))

    return 100.*correct/total

# Define the dataset and data loaders
# Resize first so every image reaching the network is 32x32, the size
# RandomCrop(32) emits for training; without it the evaluation images keep
# their on-disk size. Assumes the folder may hold images that are not
# already 32x32 -- TODO confirm.
train_transform = transforms.Compose([
    transforms.Resize((32, 32)),
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])

test_transform = transforms.Compose([
    transforms.Resize((32, 32)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])

# NOTE(review): both datasets read the same directory, so the reported test
# accuracy is measured on the training images. Point one of them at a
# held-out split if one exists.
train_dataset = datasets.ImageFolder(root='/content/drive/MyDrive/Dataset', transform=train_transform)
test_dataset = datasets.ImageFolder(root='/content/drive/MyDrive/Dataset', transform=test_transform)

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=128, shuffle=True, num_workers=2)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=128, shuffle=False, num_workers=2)

# Define the device and hyperparameters
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

learning_rate = 0.1
momentum = 0.9
weight_decay = 5e-4
max_lr = 0.5
# Half-cycle length of the cyclic schedule, counted in batches (the
# scheduler is stepped once per batch by train()).
# NOTE(review): this is independent of `epochs`, so a snapshot is not
# guaranteed to be taken at the low point of a learning-rate cycle.
step_size = 800
epochs = 200
num_snapshots = 5

# Initialize the model, optimizer, and cyclic learning rate scheduler
model = Net().to(device)
optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=momentum, weight_decay=weight_decay)
scheduler = CyclicLR(optimizer, base_lr=learning_rate, max_lr=max_lr, step_size_up=step_size)

# Train the model and save the snapshots: `epochs` epochs per snapshot, with
# the same model, optimizer and scheduler carried across snapshots.
for snapshot_idx in range(num_snapshots):
    for epoch in range(epochs):
        print(f'Snapshot {snapshot_idx + 1} - Epoch {epoch + 1}')
        train(model, device, train_loader, optimizer, scheduler, epoch)
        acc = test(model, device, test_loader)

    # Save the model snapshot
    torch.save(model.state_dict(), f'snapshot_ensemble_snapshot{snapshot_idx + 1}.pt')


FileNotFoundError: ignored