In [2]:
DATA_DIR = "C:/Users/adars/github-classroom/DCC-UAB/xnap-project-ed_group_01/CRNN_implementation/dataset/img2/train"
ANNOTATIONS_FILE = "C:/Users/adars/github-classroom/DCC-UAB/xnap-project-ed_group_01/CRNN_implementation/dataset/annot2/labels_finals_train.txt"
BATCH_SIZE = 64
IMAGE_WIDTH = 128
IMAGE_HEIGHT = 32
NUM_WORKERS = 4
EPOCHS = 20
DEVICE = "cpu"

In [3]:
import albumentations
import torch

import numpy as np

from PIL import Image
from PIL import ImageFile

ImageFile.LOAD_TRUNCATED_IMAGES = True


class ClassificationDataset:
    def __init__(self, image_paths, targets, max_len, resize=None):
        # resize = (height, width)
        self.image_paths = image_paths
        self.targets = targets
        self.resize = resize

        self.max_len = max_len

        mean = (0.485, 0.456, 0.406)
        std = (0.229, 0.224, 0.225)
        self.aug = albumentations.Compose(
            [
                albumentations.Normalize(
                    mean, std, max_pixel_value=255.0, always_apply=True
                )
            ]
        )

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, item):
        print(self.image_paths[item])
        image = Image.open(self.image_paths[item]).convert("RGB")
        targets = self.targets[item]
        target = np.zeros(self.max_len)
        target[:len(targets)] = targets

        if self.resize is not None:
            image = image.resize(
                (self.resize[1], self.resize[0]), resample=Image.BILINEAR
            )

        image = np.array(image)
        augmented = self.aug(image=image)
        image = augmented["image"]
        image = np.transpose(image, (2, 0, 1)).astype(np.float32)

        return {
            "images": torch.tensor(image, dtype=torch.float),
            "targets": torch.tensor(target, dtype=torch.long),
        }

In [7]:
from tqdm import tqdm


def train_fn(model, data_loader, optimizer, epoch):
    model.train()
    fin_loss = 0
    example_ct = 0
    tk0 = tqdm(data_loader, total=len(data_loader))
    for i, data in enumerate(tk0):
        for key, value in data.items():
            data[key] = value.to(DEVICE)
        optimizer.zero_grad()
        _, loss = model(**data)
        loss.backward()
        optimizer.step()
        fin_loss += loss.item()
        example_ct += len(data[key])
        print(f"Loss after {str(epoch*len(data_loader.dataset) + example_ct).zfill(5)} examples: {loss.item():.3f}")
    return fin_loss / len(data_loader)


def eval_fn(model, data_loader):
    model.eval()
    fin_loss = 0
    fin_preds = []
    tk0 = tqdm(data_loader, total=len(data_loader))
    for data in tk0:
        for key, value in data.items():
            data[key] = value.to(DEVICE)
        batch_preds, loss = model(**data)
        fin_loss += loss.item()
        fin_preds.append(batch_preds)
    return fin_preds, fin_loss / len(data_loader.dataset)


In [5]:
import torch
from torch import nn
from torch.nn import functional as F


class CaptchaModel(nn.Module):
    def __init__(self, num_chars):
        super(CaptchaModel, self).__init__()
        self.conv_1 = nn.Conv2d(3, 128, kernel_size=(3, 6), padding=(1, 1))
        self.pool_1 = nn.MaxPool2d(kernel_size=(2, 2))
        self.conv_2 = nn.Conv2d(128, 64, kernel_size=(3, 6), padding=(1, 1))
        self.pool_2 = nn.MaxPool2d(kernel_size=(2, 2))
        self.linear_1 = nn.Linear(1152, 64)
        self.drop_1 = nn.Dropout(0.2)
        self.lstm = nn.GRU(64, 32, bidirectional=True, num_layers=2, dropout=0.25, batch_first=True)
        self.output = nn.Linear(64, num_chars + 1)

    def forward(self, images, targets=None):
        bs, _, _, _ = images.size()
        x = F.relu(self.conv_1(images))
        x = self.pool_1(x)
        x = F.relu(self.conv_2(x))
        x = self.pool_2(x)
        x = x.permute(0, 3, 1, 2)
        x = x.view(bs, x.size(1), -1)
        x = F.relu(self.linear_1(x))
        x = self.drop_1(x)
        x, _ = self.lstm(x)
        x = self.output(x)
        x = x.permute(1, 0, 2)

        if targets is not None:
            log_probs = F.log_softmax(x, 2)
            input_lengths = torch.full(
                size=(bs,), fill_value=log_probs.size(0), dtype=torch.int32
            )
            target_lengths = torch.zeros(bs, dtype=torch.int32)
            for i, t in enumerate(targets):
                target_lengths[i] = (sum([1 for t2 in t if t2 == 0]))
            loss = nn.CTCLoss(blank=0)(
                log_probs, targets, input_lengths, target_lengths
            )
            return x, loss

        return x, None

In [6]:
import os
import torch
import numpy as np

from sklearn import preprocessing
from sklearn import model_selection
from sklearn import metrics



def remove_duplicates(x):
    if len(x) < 2:
        return x
    fin = ""
    for j in x:
        if fin == "":
            fin = j
        else:
            if j == fin[-1]:
                continue
            else:
                fin = fin + j
    return fin


def decode_predictions(preds, encoder):
    preds = preds.permute(1, 0, 2)
    preds = torch.softmax(preds, 2)
    preds = torch.argmax(preds, 2)
    preds = preds.detach().cpu().numpy()
    cap_preds = []
    for j in range(preds.shape[0]):
        temp = []
        for k in preds[j, :]:
            k = k - 1
            if k == -1:
                temp.append("§")
            else:
                p = encoder.inverse_transform([k])[0]
                temp.append(p)
        tp = "".join(temp).replace("§", "")
        cap_preds.append(remove_duplicates(tp))
    return cap_preds


def run_training():
    with open(ANNOTATIONS_FILE, "r") as file:
        image_files = file.readlines()
    image_files = [os.path.join(DATA_DIR, path.split("/")[-1]) for path in image_files][:5000]
    image_files = [path.split(" ")[0] for path in image_files]
    print(image_files)
    print([x.split("/")[-1].split(".")[0] for x in image_files])
    targets_orig = [x.split("/")[-1].split(".")[0] for x in image_files]
    targets = [[c for c in x] for x in targets_orig]
    targets_flat = [c for clist in targets for c in clist]

    lbl_enc = preprocessing.LabelEncoder()
    lbl_enc.fit(targets_flat)
    targets_enc = [list(lbl_enc.transform(x)) for x in targets]
    targets_enc = [[c+1 for c in x] for x in targets_enc]
    targets_enc = np.array(targets_enc, dtype=object)
    max_len = max(len(x) for x in targets_enc)

    (
        train_imgs,
        test_imgs,
        train_targets,
        test_targets,
        _,
        test_targets_orig,
    ) = model_selection.train_test_split(
        image_files, targets_enc, targets_orig, test_size=0.1, random_state=42
    )

    train_dataset = ClassificationDataset(
        image_paths=train_imgs,
        targets=train_targets,
        max_len=max_len,
        resize=(IMAGE_HEIGHT, IMAGE_WIDTH),
    )
    train_loader = torch.utils.data.DataLoader(
        train_dataset,
        batch_size=BATCH_SIZE,
        shuffle=True,
    )
    test_dataset = ClassificationDataset(
        image_paths=test_imgs,
        targets=test_targets,
        max_len=max_len,
        resize=(IMAGE_HEIGHT, IMAGE_WIDTH),
    )
    test_loader = torch.utils.data.DataLoader(
        test_dataset,
        batch_size=BATCH_SIZE,
        shuffle=False,
    )

    model = CaptchaModel(num_chars=len(lbl_enc.classes_))
    model.to(DEVICE)

    optimizer = torch.optim.Adam(model.parameters(), lr=3e-4)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, factor=0.8, patience=5, verbose=True
    )
    total_example_ct = 0
    for epoch in range(EPOCHS):
        train_loss = train_fn(model, train_loader, optimizer, epoch)
        valid_preds, test_loss = eval_fn(model, test_loader)
        valid_captcha_preds = []
        for vp in valid_preds:
            current_preds = decode_predictions(vp, lbl_enc)
            valid_captcha_preds.extend(current_preds)
        combined = list(zip(test_targets_orig, valid_captcha_preds))
        print(combined[:10])
        test_dup_rem = [remove_duplicates(c) for c in test_targets_orig]
        accuracy = metrics.accuracy_score(test_dup_rem, valid_captcha_preds)
        print(
            f"Epoch={epoch}, Train Loss={train_loss}, Test Loss={test_loss} Accuracy={accuracy}"
        )
        total_example_ct += len(train_dataset)
        
        scheduler.step(test_loss)


if __name__ == "__main__":
    run_training()


['C:/Users/adars/github-classroom/DCC-UAB/xnap-project-ed_group_01/CRNN_implementation/dataset/img2/train\\w9dwpg.jpg\n', 'C:/Users/adars/github-classroom/DCC-UAB/xnap-project-ed_group_01/CRNN_implementation/dataset/img2/train\\ynlh2fo.jpg\n', 'C:/Users/adars/github-classroom/DCC-UAB/xnap-project-ed_group_01/CRNN_implementation/dataset/img2/train\\hqc.jpg\n', 'C:/Users/adars/github-classroom/DCC-UAB/xnap-project-ed_group_01/CRNN_implementation/dataset/img2/train\\3so14.jpg\n', 'C:/Users/adars/github-classroom/DCC-UAB/xnap-project-ed_group_01/CRNN_implementation/dataset/img2/train\\qqny66nj.jpg\n', 'C:/Users/adars/github-classroom/DCC-UAB/xnap-project-ed_group_01/CRNN_implementation/dataset/img2/train\\ntu46a.jpg\n', 'C:/Users/adars/github-classroom/DCC-UAB/xnap-project-ed_group_01/CRNN_implementation/dataset/img2/train\\o7k.jpg\n', 'C:/Users/adars/github-classroom/DCC-UAB/xnap-project-ed_group_01/CRNN_implementation/dataset/img2/train\\gwm5fcc.jpg\n', 'C:/Users/adars/github-classroom/D

  0%|          | 0/8 [00:00<?, ?it/s]


C:/Users/adars/github-classroom/DCC-UAB/xnap-project-ed_group_01/CRNN_implementation/dataset/img2/train\rbq1gjmbhe.jpg



OSError: [Errno 22] Invalid argument: 'C:/Users/adars/github-classroom/DCC-UAB/xnap-project-ed_group_01/CRNN_implementation/dataset/img2/train\\rbq1gjmbhe.jpg\n'