#### Simple CNN for image classification

In [None]:
import torch
from torch import nn
from transformers import ViTFeatureExtractor, AutoModelForImageClassification
# Load the `google/vit-base-patch16-224-in21k` checkpoint as an image classifier,
# requesting num_labels=10 output labels for the classification head.
vit_model = AutoModelForImageClassification.from_pretrained("google/vit-base-patch16-224-in21k", num_labels=10);

In [None]:
from torch.optim import AdamW
from transformers import get_scheduler
from eurosat_dataset import load_eurosat_for_vit

# Optimizer, data loaders and learning-rate schedule used to fine-tune `vit_model`.
optimizer = AdamW(vit_model.parameters(), lr=5e-5)
loaders = load_eurosat_for_vit(batch_size=32)

num_epochs = 5
# The training loop steps the scheduler once per batch, so the schedule has to
# span every batch of every epoch.
num_training_steps = num_epochs * len(loaders['train'])
lr_scheduler = get_scheduler(
    name="linear", optimizer=optimizer, num_warmup_steps=0, num_training_steps=num_training_steps
)

# Fall back to the CPU when no CUDA device is present so the later `.to(device)`
# calls do not raise on machines without a GPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


In [None]:
from tqdm.auto import tqdm

# Fine-tune `vit_model`: one progress-bar tick per optimizer step across all epochs.
progress_bar = tqdm(range(num_training_steps))

vit_model.to(device)
vit_model.train()
for epoch in range(num_epochs):
    for x_batch, y_batch in loaders['train']:
        x_batch = x_batch.to(device)
        y_batch = y_batch.to(device)
        # Labels are passed along with the pixels; the loss is then read from `outputs.loss`.
        outputs = vit_model(pixel_values=x_batch, labels=y_batch)
        loss = outputs.loss
        loss.backward()

        optimizer.step()
        # The scheduler is advanced once per batch, matching `num_training_steps`.
        lr_scheduler.step()
        # Clear gradients so they do not accumulate into the next batch.
        optimizer.zero_grad()
        progress_bar.update(1)

In [None]:
def evaluate_model_vit(model, data_loader, device):
    """Compute and print the classification accuracy of `model` over `data_loader`.

    Args:
        model: Module whose call returns a mapping with a ``'logits'`` entry;
            the arg-max over dim 1 of the logits is taken as the prediction.
        data_loader: Iterable yielding ``(inputs, labels)`` batches.
        device: Device the input batches are moved to before the forward pass.

    Returns:
        A 0-dim float tensor holding the fraction of samples whose prediction
        equals the label.
    """
    model.eval()
    y_list = []
    pred_list = []
    # Inference only: disabling autograd avoids building a graph for every batch.
    with torch.no_grad():
        for x_batch, y_batch in data_loader:
            x_batch = x_batch.to(device)
            output = model(x_batch)
            pred_list.append(torch.argmax(output['logits'], dim=1).cpu())
            # Predictions are gathered on the CPU, so keep the labels there too.
            y_list.append(y_batch.cpu())
    preds = torch.cat(pred_list, dim=0)
    y_true = torch.cat(y_list, dim=0)
    # Calculate the accuracy
    is_correct = (preds == y_true).float()
    accuracy = is_correct.sum() / is_correct.numel()
    print(f'Accuracy: {accuracy:4f}')
    return accuracy

# Report the accuracy of `vit_model` on the 'test' loader.
evaluate_model_vit(vit_model, loaders['test'], device)

In [None]:
from mimetypes import init


def create_model(img_channels=3):
    """Assemble the CNN classifier as a named ``nn.Sequential``.

    Five convolutional stages are registered as ``conv{i}`` (5x5 kernel,
    'same' padding), ``norm{i}`` (batch norm), ``relu{i}`` and ``pool{i}``
    (2x2 max-pool), with output widths 16, 32, 64, 128 and 256. They are
    followed by a head of flatten, dropout, a 1024->512 linear layer with
    SELU, dropout again, and a final 512->10 linear layer.

    Args:
        img_channels: Number of channels of the input images.

    Returns:
        The assembled ``nn.Sequential`` model.
    """
    net = nn.Sequential()

    # Convolutional feature extractor: each stage consumes the previous stage's width.
    fan_in = img_channels
    for idx, width in enumerate((16, 32, 64, 128, 256)):
        stage = (
            ('conv', nn.Conv2d(in_channels=fan_in, out_channels=width, kernel_size=5, padding='same')),
            ('norm', nn.BatchNorm2d(width)),
            ('relu', nn.ReLU()),
            ('pool', nn.MaxPool2d(kernel_size=2)),
        )
        for prefix, layer in stage:
            net.add_module(f'{prefix}{idx}', layer)
        fan_in = width

    # Classification head.
    head = (
        ('flatten', nn.Flatten()),
        ('dropout0', nn.Dropout(p=0.5)),
        ('ln0', nn.Linear(1024, out_features=512)),
        ('selu4', nn.SELU()),
        ('dropout1', nn.Dropout(p=0.5)),
        ('ln1', nn.Linear(512, out_features=10)),
    )
    for layer_name, layer in head:
        net.add_module(layer_name, layer)
    return net


In [None]:
# Smoke test: push one random 3x64x64 input through a freshly built model.
in_tensor = torch.randn(1, 3, 64, 64)
create_model()(in_tensor)

### Load and prepare an image dataset

In [None]:
from torchvision import datasets, transforms
#Import image augmentation classes from torchvision
from torchvision.transforms import ToTensor, Compose, Normalize, RandomHorizontalFlip, RandomRotation, Resize
from torch.utils.data import DataLoader
image_path = './data/eurosat'
# NOTE(review): the random flip/rotation is attached to the single shared dataset,
# so it is also applied to the validation and test subsets created below.
transform = Compose([
    RandomHorizontalFlip(),
    RandomRotation(degrees=10),
    ToTensor(), Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
eurosat_dataset = datasets.EuroSAT(root=image_path, transform=transform, download=True)

#Randomize the dataset
torch.manual_seed(1)
# 85% of the samples go to train+validation, of which 20% is validation;
# whatever remains is the test set.
train_len = int(0.85 * len(eurosat_dataset)) - int(0.85 * len(eurosat_dataset) * 0.2)
valid_len = int(0.85 * len(eurosat_dataset) * 0.2)
test_len = len(eurosat_dataset) - train_len - valid_len
# The unpacking order must match the order of the lengths list; otherwise the
# validation and test subsets (and their sizes) are silently swapped.
train_dataset, valid_dataset, test_dataset = torch.utils.data.random_split(eurosat_dataset, [train_len, valid_len, test_len])

#Create DataLoaders
batch_size = 128
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=20)
valid_loader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=False, num_workers=20)
# Evaluation does not need a shuffled order.
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=20)


#### Create the training loop function

In [None]:
def train(model, epochs, train_dl, valid_dl, gpu=False):
    """Train `model` with Adam and cross-entropy loss, validating after every epoch.

    Relies on `tqdm` already being imported in the notebook for the progress bar.

    Args:
        model: The module to optimise; moved to the GPU or CPU according to `gpu`.
        epochs: Number of passes over `train_dl`.
        train_dl: Iterable of ``(inputs, labels)`` training batches; must support ``len()``.
        valid_dl: Iterable of ``(inputs, labels)`` validation batches.
        gpu: When true, the model and every batch are moved to CUDA.

    Returns:
        ``(loss_hist_train, accuracy_hist_train, loss_hist_valid, accuracy_hist_valid)``:
        four lists with one per-sample average per epoch.
    """
    if gpu:
        model.cuda()
    else:
        model.cpu()

    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    loss_hist_train = [0] * epochs
    accuracy_hist_train = [0] * epochs

    loss_hist_valid = [0] * epochs
    accuracy_hist_valid = [0] * epochs
    progress_bar = tqdm(range(epochs*len(train_dl)))
    for epoch in range(epochs):
        model.train()
        train_seen = 0
        for x_batch, y_batch in train_dl:
            if gpu:
                x_batch = x_batch.cuda()
                y_batch = y_batch.cuda()
            pred = model(x_batch)
            loss = loss_fn(pred, y_batch)

            loss.backward()  # Compute gradients by backpropagation
            optimizer.step()  # Apply the optimizer's weight update
            optimizer.zero_grad()  # Reset the gradients for the next iteration

            # Accumulate metrics. The loss is a batch mean, so it is re-weighted
            # by the batch size before summing.
            batch_len = y_batch.size(0)
            train_seen += batch_len
            loss_hist_train[epoch] += loss.item() * batch_len
            is_correct = (torch.argmax(pred, dim=1) == y_batch).cpu().float()
            accuracy_hist_train[epoch] += is_correct.sum()
            progress_bar.update(1)

        # Normalise by the samples actually iterated rather than by module-level
        # globals, so the averages are right for whatever loader was passed in.
        # max(..., 1) keeps an empty loader from dividing by zero.
        loss_hist_train[epoch] /= max(train_seen, 1)
        accuracy_hist_train[epoch] /= max(train_seen, 1)

        model.eval()  # Set the model to evaluation mode
        valid_seen = 0
        with torch.no_grad():  # Turn off gradients
            for x_batch, y_batch in valid_dl:
                if gpu:
                    x_batch = x_batch.cuda()
                    y_batch = y_batch.cuda()
                pred = model(x_batch)
                loss = loss_fn(pred, y_batch)
                batch_len = y_batch.size(0)
                valid_seen += batch_len
                loss_hist_valid[epoch] += (loss.item() * batch_len)
                is_correct = (torch.argmax(pred, dim=1) == y_batch).cpu().float()
                accuracy_hist_valid[epoch] += is_correct.sum()
            loss_hist_valid[epoch] /= max(valid_seen, 1)
            accuracy_hist_valid[epoch] /= max(valid_seen, 1)
        print(f'Epoch {epoch+1}/{epochs} accuracy: {accuracy_hist_train[epoch]:4f} valid_accuracy: {accuracy_hist_valid[epoch]:4f}')
    return loss_hist_train, accuracy_hist_train, loss_hist_valid, accuracy_hist_valid

In [None]:
def evaluate_model(model, data_loader, gpu=False):
    """Compute and print the classification accuracy of `model` over `data_loader`.

    Args:
        model: Module returning per-class scores; the arg-max over dim 1 is
            taken as the prediction.
        data_loader: Iterable yielding ``(inputs, labels)`` batches.
        gpu: When true, the model and the input batches are moved to CUDA;
            otherwise the model is moved to the CPU.

    Returns:
        A 0-dim float tensor holding the fraction of samples whose prediction
        equals the label.
    """
    if gpu:
        model.cuda()
    else:
        model.cpu()
    model.eval()
    y_list = []
    pred_list = []
    # Inference only: disabling autograd avoids building a graph for every batch.
    with torch.no_grad():
        for x_batch, y_batch in data_loader:
            if gpu:
                x_batch = x_batch.cuda()
            pred = model(x_batch)
            pred_list.append(torch.argmax(pred, dim=1).cpu())
            # Predictions are gathered on the CPU, so keep the labels there too.
            y_list.append(y_batch.cpu())
    preds = torch.cat(pred_list, dim=0)
    y_true = torch.cat(y_list, dim=0)
    # Calculate the accuracy
    is_correct = (preds == y_true).float()
    accuracy = is_correct.sum() / is_correct.numel()
    print(f'Accuracy: {accuracy:4f}')
    return accuracy

In [None]:
# Allow PyTorch to use TF32 for CUDA matrix multiplications
# (trades some float32 precision for speed on GPUs that support it).
torch.backends.cuda.matmul.allow_tf32 = True

In [None]:
# Train a fresh CNN on the GPU for 40 epochs and report the wall-clock duration.
import time

gpu_model = create_model()

start = time.time()
(
    loss_hist_train,
    accuracy_hist_train,
    loss_hist_valid,
    accuracy_hist_valid,
) = train(
    gpu_model,
    epochs=40,
    train_dl=train_loader,
    valid_dl=valid_loader,
    gpu=True,
)
end = time.time()
print(f'Time to train: {end-start}')

In [None]:
# Train a fresh CNN on the CPU for 15 epochs and report the wall-clock duration.
import time

cpu_model = create_model()

start = time.time()
(
    loss_hist_train,
    accuracy_hist_train,
    loss_hist_valid,
    accuracy_hist_valid,
) = train(
    cpu_model,
    epochs=15,
    train_dl=train_loader,
    valid_dl=valid_loader,
    gpu=False,
)
end = time.time()
print(f'Time to train: {end-start}')

In [None]:
import matplotlib.pyplot as plt
# Plot the per-epoch training and validation loss curves.
# These lists hold the history of whichever training cell ran last, because
# both the GPU and the CPU cell assign to the same names.
plt.plot(loss_hist_train, label='Training Loss')
plt.plot(loss_hist_valid, label='Validation Loss')
plt.legend()
plt.show()


In [None]:
# Accuracy of the GPU-trained CNN on the test loader.
evaluate_model(gpu_model, test_loader, gpu=True)

In [None]:
# Accuracy of the CPU-trained CNN on the test loader.
evaluate_model(cpu_model, test_loader, gpu=False)

In [None]:
import pytorch_lightning as pl
# Shared cross-entropy criterion used by ClsModel.step.
loss_fn = nn.CrossEntropyLoss()


class ClsModel(pl.LightningModule):
    """CNN classifier packaged as a LightningModule.

    The network has three conv stages (``conv{i}``: 5x5 kernel, 'same' padding;
    ``relu{i}``; ``pool{i}``: 2x2 max-pool) with widths 16, 32 and 64. They are
    followed by flatten, a 4096->1024 linear layer, ReLU and a 1024->10 linear layer.
    """

    def __init__(self, img_channels=3) -> None:
        super().__init__()
        net = nn.Sequential()

        # Each stage consumes the previous stage's output width.
        fan_in = img_channels
        for idx, width in enumerate((16, 32, 64)):
            net.add_module(
                f'conv{idx}',
                nn.Conv2d(in_channels=fan_in, out_channels=width, kernel_size=5, padding='same'),
            )
            net.add_module(f'relu{idx}', nn.ReLU())
            net.add_module(f'pool{idx}', nn.MaxPool2d(kernel_size=2))
            fan_in = width

        # Classification head.
        net.add_module('flatten', nn.Flatten())
        net.add_module('ln0', nn.Linear(4096, out_features=1024))
        net.add_module('relu4', nn.ReLU())
        net.add_module('ln1', nn.Linear(1024, out_features=10))
        self.model = net

    def forward(self, x):
        """Run `x` through the wrapped sequential network."""
        return self.model(x)

    def configure_optimizers(self):
        """Return an Adam optimizer over all parameters (lr=0.001)."""
        return torch.optim.Adam(self.parameters(), lr=0.001)

    def step(self, batch):
        """Compute the cross-entropy loss for one ``(inputs, labels)`` batch."""
        inputs, targets = batch
        scores = self.forward(inputs)
        return loss_fn(scores, targets)

    def training_step(self, train_batch, batch_idx):
        """Compute, log (as 'train_loss') and return the training loss."""
        batch_loss = self.step(train_batch)
        self.log('train_loss', batch_loss)
        return batch_loss

    def validation_step(self, valid_batch, batch_idx):
        """Compute, log (as 'valid_loss') and return the validation loss."""
        batch_loss = self.step(valid_batch)
        self.log('valid_loss', batch_loss)
        return batch_loss


In [None]:
# Fit the Lightning model on a single GPU (max 20 epochs) and report the wall-clock duration.
lmodel = ClsModel()
trainer = pl.Trainer(
    max_epochs=20,
    accelerator='gpu',
    devices=1,
)

start = time.time()
trainer.fit(
    lmodel,
    train_loader,
    valid_loader,
)
end = time.time()
print(f'Time to train: {end-start}')



In [None]:
# Accuracy of the Lightning-trained model on the test loader.
evaluate_model(lmodel, test_loader, gpu=True)

In [None]:
# Display the value of `torch.version.cuda` for this PyTorch install.
torch.version.cuda