In [1]:
import torch 
import cv2
import os
from torch import nn 
from torchvision import models
from torchvision.transforms import Compose, Resize, ToTensor, Normalize, Lambda, transforms
from torch import cuda
from PIL import Image
from torchvision import datasets
from tqdm import tqdm
import copy

In [2]:
# def preprocess_digit(img_path):
#     img = cv2.imread(img_path)
#     gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
#     blur = cv2.GaussianBlur(gray, (5,5), 0)
#     _, binary = cv2.threshold(
#         blur, 0, 255,
#         cv2.THRESH_BINARY + cv2.THRESH_OTSU
#     )
#     contours, _ = cv2.findContours(
#         binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
#     )

#     if len(contours) > 0:
#         c = max(contours, key=cv2.contourArea)
#         x,y,w,h = cv2.boundingRect(c)
#         digit = binary[y:y+h, x:x+w]
#     else:
#         digit = binary
#     digit = cv2.resize(digit, (180,180))
#     digit = cv2.copyMakeBorder(
#         digit, 22,22,22,22,
#         cv2.BORDER_CONSTANT, value=0
#     )
#     digit = cv2.cvtColor(digit, cv2.COLOR_GRAY2RGB)

#     return digit

In [3]:
# input_dir = "/run/media/eugene/D/university_project/ai_project/khmer-number-recognition/database/train"
# output_dir = "/run/media/eugene/D/university_project/ai_project/khmer-number-recognition/database/processed/train"

# os.makedirs(output_dir, exist_ok=True)

# for label in os.listdir(input_dir):
#     in_dir  = os.path.join(input_dir, label)
#     out_dir = os.path.join(output_dir, label)
#     os.makedirs(out_dir, exist_ok=True)

#     for img_name in os.listdir(in_dir):
#         img_path = os.path.join(in_dir, img_name)
#         processed = preprocess_digit(img_path)

#         cv2.imwrite(
#             os.path.join(out_dir, img_name),
#             processed
#         )

In [4]:
img = cv2.imread("/run/media/eugene/D/university_project/ai_project/khmer-number-recognition/database/processed/train/1/0_001.png")
img.shape

(224, 224, 3)

In [5]:
train_transform = transforms.Compose([
    transforms.RandomAffine(
        degrees=15,
        translate=(0.1,0.1),
        scale=(0.9,1.1)
    ),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485,0.456,0.406],
        std=[0.229,0.224,0.225]
    )
])

In [6]:
def conv3x3(in_planes, out_planes, stride=1):
    return nn.Conv2d(
        in_planes, out_planes,
        kernel_size=3, stride=stride,
        padding=1, bias=False
    )


def conv1x1(in_planes, out_planes, stride=1):
    return nn.Conv2d(
        in_planes, out_planes,
        kernel_size=1, stride=stride,
        bias=False
    )

class Bottleneck(nn.Module):
    expansion = 4

    def __init__(self, in_planes, planes, stride=1):
        super().__init__()

        self.conv1 = conv1x1(in_planes, planes)
        self.bn1 = nn.BatchNorm2d(planes)

        self.conv2 = conv3x3(planes, planes, stride)
        self.bn2 = nn.BatchNorm2d(planes)

        self.conv3 = conv1x1(planes, planes * self.expansion)
        self.bn3 = nn.BatchNorm2d(planes * self.expansion)

        self.relu = nn.ReLU(inplace=True)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_planes != planes * self.expansion:
            self.shortcut = nn.Sequential(
                conv1x1(in_planes, planes * self.expansion, stride),
                nn.BatchNorm2d(planes * self.expansion)
            )

    def forward(self, x):
        identity = x

        out = self.relu(self.bn1(self.conv1(x)))
        out = self.relu(self.bn2(self.conv2(out)))
        out = self.bn3(self.conv3(out))

        out += self.shortcut(identity)
        out = self.relu(out)

        return out


class ResNet(nn.Module):
    def __init__(self, block, layers, num_classes=10):
        super().__init__()
        self.in_planes = 64

        self.conv1 = nn.Conv2d(
            3, 64, kernel_size=3,
            stride=1, padding=1, bias=False
        )
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)

        self.maxpool = nn.Identity()

        self.layer1 = self._make_layer(block, 64, layers[0])
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2)
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2)
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2)

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * block.expansion, num_classes)

        self._initialize_weights()

    def _make_layer(self, block, planes, blocks, stride=1):
        layers = [block(self.in_planes, planes, stride)]
        self.in_planes = planes * block.expansion

        for _ in range(1, blocks):
            layers.append(block(self.in_planes, planes))

        return nn.Sequential(*layers)

    def _initialize_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(
                    m.weight, mode='fan_out', nonlinearity='relu'
                )
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

    def forward(self, x):
        x = self.relu(self.bn1(self.conv1(x)))
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)

        return x


# -------------------------
# ResNet-50 Factory
# -------------------------
def resnet50(num_classes=10):
    return ResNet(Bottleneck, [3, 4, 6, 3], num_classes)


In [7]:
def train_model(
    model, train_loader, val_loader,
    optimizer, criterion, scheduler,
    device, epochs=30
):
    best_acc = 0.0
    best_model_wts = copy.deepcopy(model.state_dict())

    for epoch in range(epochs):
        print(f"\nEpoch [{epoch+1}/{epochs}]")
        print("-" * 40)

        model.train()
        train_loss, train_correct, total = 0.0, 0, 0

        loop = tqdm(train_loader, desc="Train", leave=False)
        for inputs, labels in loop:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            preds = outputs.argmax(1)
            train_loss += loss.item() * inputs.size(0)
            train_correct += (preds == labels).sum().item()
            total += inputs.size(0)

            loop.set_postfix(loss=loss.item())

        scheduler.step()

        train_loss /= total
        train_acc = train_correct / total

        model.eval()
        val_loss, val_correct, val_total = 0.0, 0, 0

        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                preds = outputs.argmax(1)

                val_loss += loss.item() * inputs.size(0)
                val_correct += (preds == labels).sum().item()
                val_total += inputs.size(0)

        val_loss /= val_total
        val_acc = val_correct / val_total

        if val_acc > best_acc:
            best_acc = val_acc
            best_model_wts = copy.deepcopy(model.state_dict())

        lr = optimizer.param_groups[0]["lr"]

        print(
            f"Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.3f}\n"
            f"Val   Loss: {val_loss:.4f} | Val   Acc: {val_acc:.3f}\n"
            f"LR: {lr:.6f} | Best Acc: {best_acc:.3f}"
        )

    model.load_state_dict(best_model_wts)
    print("\n Training complete. Best Val Acc:", best_acc)
    return model


In [8]:
train_dataset = datasets.ImageFolder(
    root="/run/media/eugene/D/university_project/ai_project/khmer-number-recognition/database/processed/train",
    transform=train_transform
)

train_size = int(0.8 * len(train_dataset))
val_size = len(train_dataset) - train_size
train_subset, val_subset = torch.utils.data.random_split(
    train_dataset, [train_size, val_size]
)

train_loader = torch.utils.data.DataLoader(
    train_subset, batch_size=32, shuffle=True, num_workers=4
)

val_loader = torch.utils.data.DataLoader(
    val_subset, batch_size=32, shuffle=False, num_workers=4
)

In [9]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = resnet50(num_classes=10).to(device)
criterion = nn.CrossEntropyLoss(label_smoothing=0.1)
optimizer = torch.optim.AdamW(
    model.parameters(),
    lr=3e-4,
    weight_decay=1e-4
)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
    optimizer,
    T_max=30   
)

In [10]:
print(device)

cuda


In [11]:
train_model(
    model=model,
    train_loader=train_loader,
    val_loader=val_loader,
    optimizer=optimizer,
    criterion=criterion,
    scheduler=scheduler,
    device=device,
    epochs=30
)


Epoch [1/30]
----------------------------------------


                                             

OutOfMemoryError: CUDA out of memory. Tried to allocate 1.53 GiB. GPU 0 has a total capacity of 5.64 GiB of which 1.48 GiB is free. Including non-PyTorch memory, this process has 4.14 GiB memory in use. Of the allocated memory 4.02 GiB is allocated by PyTorch, and 16.95 MiB is reserved by PyTorch but unallocated. If reserved but unallocated memory is large try setting PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True to avoid fragmentation.  See documentation for Memory Management  (https://pytorch.org/docs/stable/notes/cuda.html#environment-variables)