In [1]:
%run "config.py"

declared 4 variables


In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import transforms

from PIL import Image

In [3]:
F.relu

<function torch.nn.functional.relu(input: torch.Tensor, inplace: bool = False) -> torch.Tensor>

In [4]:
import sys
import pathlib
PYTHON_DIR = pathlib.Path(sys.executable).parent.parent.resolve()
torch.ops.load_library(str(PYTHON_DIR.joinpath("lib", "libpt_ocl.so")))

In [5]:
class Net(nn.Module):
    def __init__(self, num_channels) -> None:
        super(Net, self).__init__()

        self.num_channels = num_channels
        self.conv1 = nn.Conv2d(3, self.num_channels, 3, stride=1, padding=1)
        self.bn1 = nn.BatchNorm2d(self.num_channels)
        self.conv2 = nn.Conv2d(self.num_channels, self.num_channels * 2, 3, stride=1, padding=1)
        self.bn2 = nn.BatchNorm2d(self.num_channels*2)
        self.conv3 = nn.Conv2d(self.num_channels * 2, self.num_channels * 4, 3, stride=1, padding=1)
        self.bn3 = nn.BatchNorm2d(self.num_channels*4)

        self.fc1 = nn.Linear(self.num_channels*4*8*8, self.num_channels*4)
        self.fcbn1 = nn.BatchNorm1d(self.fc1.out_features)
        self.fc2 = nn.Linear(self.num_channels*4, 6)

    def forward(self, x): # (3, 64, 64)
        x = self.conv1(x) # (n, 64, 64)
        x = self.bn1(x)
        x = F.relu(F.max_pool2d(x, 2)) # (n, 32, 32)
        x = self.conv2(x) # (2*n, 32, 32)
        x = self.bn2(x)
        x = F.relu(F.max_pool2d(x, 2)) # (2*n, 16, 16)
        x = self.conv3(x) # (2*n, 16, 16)
        x = self.bn3(x)
        x = F.relu(F.max_pool2d(x, 2)) # (4*n, 8, 8)

        # flatten
        x = x.view(-1, self.num_channels*4*8*8)

        # fc
        x = self.fc1(x)
        x = self.fcbn1(x)
        x = F.relu(x)
        x = F.dropout(x, p = 0.8, training=True)
        x = self.fc2(x)
        
        # softmax
        x = F.log_softmax(x, dim=1)
        
        return x

In [6]:
class SIGNSDataset():
    def __init__(self, base_dir: str | pathlib.Path, split: str = "train", transform = None) -> None:
        if isinstance(base_dir, str): base_dir = pathlib.Path(base_dir)
        path = base_dir.joinpath(split)
        files = path.iterdir()

        self.filenames = list(filter(lambda x: str(x).endswith(".jpg"), files))
        self.labels = [int(f.name[0]) for f in self.filenames]
        self.transform = transform
    
    def __len__(self):
        return len(self.filenames)
    def __getitem__(self, idx):
        image = Image.open(self.filenames[idx])
        if self.transform:
            image = self.transform(image)
        return image, self.labels[idx]

In [7]:
transform = transforms.Compose(
    [
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize((0.5,0.5,0.5), (0.5,0.5,0.5))
    ]
)

In [8]:
DIR = SIGNS_DIR.joinpath("64x64_SIGNS")
train_dataset = SIGNSDataset(DIR, "train_signs", transform=transforms.ToTensor())
test_dataset = SIGNSDataset(DIR, "test_signs", transform=transforms.ToTensor())
val_dataset = SIGNSDataset(DIR, "val_signs", transform=transforms.ToTensor())

In [9]:
train_dataset[0][0].shape

torch.Size([3, 64, 64])

In [10]:
train_loader = DataLoader(train_dataset, batch_size=16)
test_loader = DataLoader(test_dataset, batch_size=16)
val_loader = DataLoader(val_dataset, batch_size=16)

In [11]:
device = torch.device('cpu')
net = Net(32).to(device)

In [14]:
class RunningMetric():
    def __init__(self) -> None:
        self.S = 0
        self.N = 0
    
    def update(self, val, size):
        self.S += val
        self.N += size

    def __call__(self):
        try:
            return self.S / float(self.N)
        except ZeroDivisionError as e:
            return 0

In [15]:
loss_fn = nn.NLLLoss()
optimizer = optim.SGD(net.parameters(), lr=1e-3, momentum=0.9)

num_epochs = 25
for epoch in range(1, num_epochs + 1):
    print("Epoch {} / {}".format(epoch, num_epochs))
    print("-"*10)

    running_loss = RunningMetric()
    running_ac  = RunningMetric()
    running_loss_v = RunningMetric()
    running_ac_v  = RunningMetric()

    for phase in ('train', 'val'):
        if phase == "train":
            net.train()
        else:
            net.eval()

        for inputs, targets in train_loader:
            inputs, targets = inputs.to(device), targets.to(device)
            optimizer.zero_grad()

            outputs = net(inputs)
            outputs1 = outputs.to('cpu')
            _, preds = torch.max(outputs1, 1)
            # preds = np.max(outputs1)
            loss = loss_fn(outputs, targets)
            if phase == 'train':
                loss.backward()
                optimizer.step()

            batch_size = inputs.size()[0]
            running_loss.update(batch_size*loss.item(),
                                batch_size)
            running_ac.update(torch.sum(preds == targets.to('cpu')),
                                batch_size)
            print("\r {} Loss: {:4f} - Accuracy {:4f}".format(phase, running_loss(), running_ac()), end="")
        print()


Epoch 1 / 25
----------
 train Loss: 1.458429 - Accuracy 0.418981
 val Loss: 1.329740 - Accuracy 0.493056
Epoch 2 / 25
----------
 train Loss: 1.077997 - Accuracy 0.609375

KeyboardInterrupt: 

In [18]:
def train_eval(model: nn.Module, optimizer: optim.Optimizer, loss_fn , dataloaders: list, epochs: int = 10, lr: float = 0.01):
    for g in optimizer.param_groups:
        g['lr'] = lr
    
    train_loader, val_loader = dataloaders

    for epoch in range(1, epochs + 1):
        print("Epoch {} / {}".format(epoch, epochs))
        print("-"*10)

        running_loss = RunningMetric()
        running_ac  = RunningMetric()
        running_loss_v = RunningMetric()
        running_ac_v  = RunningMetric()

        for phase in ('train', 'val'):
            if phase == "train":
                model.train()
            else:
                model.eval()

            for inputs, targets in train_loader:
                inputs, targets = inputs.to(device), targets.to(device)
                optimizer.zero_grad()
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = net(inputs)
                    outputs1 = outputs.to('cpu')
                    _, preds = torch.max(outputs1, 1)
                    # preds = np.max(outputs1)
                    loss = loss_fn(outputs, targets)
                if phase == 'train':
                    loss.backward()
                    optimizer.step()

                batch_size = inputs.size()[0]
                running_loss.update(batch_size*loss.item(),
                                    batch_size)
                running_ac.update(torch.sum(preds == targets.to('cpu')),
                                    batch_size)
                print("\r {} Loss: {:4f} - Accuracy {:4f}".format(phase, running_loss(), running_ac()), end="")
            print()

In [20]:
import numpy as np

In [50]:
lrs = np.arange(-6, -2, 1)
lrs = 10. ** lrs

In [52]:
for lr in lrs:
    train_eval(model=net, optimizer=optimizer, dataloaders=[train_loader, val_loader], loss_fn=loss_fn, epochs=10, lr=lr)

Epoch 1 / 10
----------
 train Loss: 1.073740 - Accuracy 0.625000

KeyboardInterrupt: 