In [1]:
!pip install torch torchvision scikit-learn Pillow

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset, Subset
from torchvision import transforms
from sklearn.model_selection import train_test_split
from PIL import Image
import os
import string
import numpy as np
from tqdm import tqdm

# Dataset Class for Task 2
class CaptchaSequenceDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.image_paths = []
        self.labels = []
        
        # Define vocabulary (case-sensitive letters)
        self.chars = string.ascii_letters  # 52 characters
        self.char_to_idx = {char: idx+1 for idx, char in enumerate(self.chars)}  # blank=0
        self.idx_to_char = {idx+1: char for idx, char in enumerate(self.chars)}
        self.num_classes = len(self.chars) + 1  # 53
        
        # Load images and labels
        for img_name in os.listdir(root_dir):
            if img_name.endswith('.png'):
                parts = img_name.split('_')
                if len(parts) >=3 and parts[0] == 'captcha':
                    label = parts[1]
                    # Validate characters
                    if all(c in self.char_to_idx for c in label):
                        self.image_paths.append(os.path.join(root_dir, img_name))
                        self.labels.append(label)
                    else:
                        print(f"Skipped {img_name} due to invalid characters")
    
    def __len__(self):
        return len(self.image_paths)
    
    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        image = Image.open(img_path).convert('RGB')
        label = self.labels[idx]
        
        if self.transform:
            image = self.transform(image)
        
        # Convert label to indices
        target = [self.char_to_idx[c] for c in label]
        target_length = torch.tensor(len(target), dtype=torch.long)
        target = torch.tensor(target, dtype=torch.long)
        
        return image, target, target_length

# Collate function for DataLoader
def collate_fn(batch):
    images, targets, target_lengths = [], [], []
    for item in batch:
        images.append(item[0])
        targets.append(item[1])
        target_lengths.append(item[2])
    images = torch.stack(images)
    targets = torch.cat(targets)
    target_lengths = torch.stack(target_lengths)
    # Assume fixed sequence length (determined by model)
    input_lengths = torch.full((len(batch),), 64, dtype=torch.long)  # Update based on model
    return images, targets, input_lengths, target_lengths

# Model Architecture (CRNN with CTC)
class CRNN(nn.Module):
    def __init__(self, num_classes):
        super(CRNN, self).__init__()
        self.num_classes = num_classes
        # CNN layers
        self.cnn = nn.Sequential(
            nn.Conv2d(3, 64, 3, padding=1), nn.BatchNorm2d(64), nn.ReLU(), nn.MaxPool2d((2,2)),
            nn.Conv2d(64, 128, 3, padding=1), nn.BatchNorm2d(128), nn.ReLU(), nn.MaxPool2d((2,2)),
            nn.Conv2d(128, 256, 3, padding=1), nn.BatchNorm2d(256), nn.ReLU(),
            nn.Conv2d(256, 256, 3, padding=1), nn.BatchNorm2d(256), nn.ReLU(), nn.MaxPool2d((2,1)),
            nn.Conv2d(256, 512, 3, padding=1), nn.BatchNorm2d(512), nn.ReLU(), nn.MaxPool2d((2,1)),
            nn.Conv2d(512, 512, 3, padding=1), nn.BatchNorm2d(512), nn.ReLU(), nn.MaxPool2d((4,1))
        )
        # RNN layers
        self.rnn = nn.LSTM(512, 256, bidirectional=True, num_layers=2, dropout=0.3, batch_first=True)
        self.fc = nn.Linear(512, num_classes)
        # Vocabulary mapping
        self.idx_to_char = {idx+1: char for idx, char in enumerate(string.ascii_letters)}
        self.idx_to_char[0] = '-'  # Blank

    def forward(self, x):
        x = self.cnn(x)  # (batch, channels, height, width)
        x = x.squeeze(2)  # Remove height dim
        x = x.permute(0, 2, 1)  # (batch, seq_len, features)
        x, _ = self.rnn(x)
        x = self.fc(x)
        x = nn.functional.log_softmax(x, dim=2)
        return x

# Transforms
train_transform = transforms.Compose([
    transforms.Resize((64, 256)),
    transforms.RandomRotation(5),
    transforms.RandomAffine(degrees=0, translate=(0.1, 0.1), scale=(0.9, 1.1)),
    transforms.ColorJitter(brightness=0.2, contrast=0.2),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

val_test_transform = transforms.Compose([
    transforms.Resize((64, 256)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])





In [4]:
# Initialize Dataset and DataLoaders
full_dataset = CaptchaSequenceDataset(root_dir='/kaggle/input/hard-captcha-data-set-50k/hard_captcha_dataset', transform=None)
train_val_idx, test_idx = train_test_split(
    range(len(full_dataset)), test_size=0.2, random_state=42, 
   
)
train_idx, val_idx = train_test_split(
    train_val_idx, test_size=0.25, random_state=42,  # 25% of train+val for validation
    
)

class TransformedSubset(Dataset):
    def __init__(self, subset, transform=None):
        self.subset = subset
        self.transform = transform
    def __len__(self):
        return len(self.subset)
    def __getitem__(self, idx):
        x, y, y_len = self.subset[idx]
        if self.transform:
            x = self.transform(x)
        return x, y, y_len

train_subset = TransformedSubset(Subset(full_dataset, train_idx), train_transform)
val_subset = TransformedSubset(Subset(full_dataset, val_idx), val_test_transform)
test_subset = TransformedSubset(Subset(full_dataset, test_idx), val_test_transform)

batch_size = 64
train_loader = DataLoader(train_subset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn, num_workers=4, pin_memory=True)
val_loader = DataLoader(val_subset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn, num_workers=4, pin_memory=True)
test_loader = DataLoader(test_subset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn, num_workers=4, pin_memory=True)

# Training Setup
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = CRNN(num_classes=53).to(device)
criterion = nn.CTCLoss(blank=0)
optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-5)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', patience=3, factor=0.5)




# Decoding function

In [5]:
def decode_predictions(outputs, model):
    _, max_indices = torch.max(outputs, 2)
    batch_size = outputs.size(0)
    decoded = []
    for i in range(batch_size):
        indices = max_indices[i].cpu().numpy()
        chars = []
        previous = None
        for idx in indices:
            if idx != 0:
                if idx != previous:
                    chars.append(model.idx_to_char.get(idx, ''))
            previous = idx
        decoded.append(''.join(chars))
    return decoded





# Training Loop

In [6]:
num_epochs = 30
best_val_word_acc = 0.0

for epoch in range(num_epochs):
    model.train()
    train_loss = 0.0
    total_train_chars = 0
    correct_train_chars = 0
    correct_train_words = 0
    
    for batch in tqdm(train_loader, desc=f'Epoch {epoch+1}/{num_epochs} - Training'):
        images, targets, input_lengths, target_lengths = batch
        images, targets = images.to(device), targets.to(device)
        input_lengths, target_lengths = input_lengths.to(device), target_lengths.to(device)
        
        optimizer.zero_grad()
        outputs = model(images)
        log_probs = outputs.permute(1, 0, 2)  # CTC requires (seq_len, batch, num_classes)
        loss = criterion(log_probs, targets, input_lengths, target_lengths)
        loss.backward()
        optimizer.step()
        
        train_loss += loss.item() * images.size(0)
        # Decode and calculate accuracy
        decoded = decode_predictions(outputs, model)
        for i in range(len(decoded)):
            true_label = full_dataset.labels[train_idx[batch[0].tolist().index(i)]]
            pred_label = decoded[i]
            correct_train_chars += sum(c1 == c2 for c1, c2 in zip(true_label, pred_label))
            total_train_chars += len(true_label)
            if pred_label == true_label:
                correct_train_words += 1
    
    # Validation
    model.eval()
    val_loss = 0.0
    correct_val_chars = 0
    total_val_chars = 0
    correct_val_words = 0
    
    with torch.no_grad():
        for batch in tqdm(val_loader, desc=f'Epoch {epoch+1}/{num_epochs} - Validation'):
            images, targets, input_lengths, target_lengths = batch
            images, targets = images.to(device), targets.to(device)
            input_lengths, target_lengths = input_lengths.to(device), target_lengths.to(device)
            
            outputs = model(images)
            log_probs = outputs.permute(1, 0, 2)
            loss = criterion(log_probs, targets, input_lengths, target_lengths)
            val_loss += loss.item() * images.size(0)
            
            decoded = decode_predictions(outputs, model)
            for i in range(len(decoded)):
                true_label = full_dataset.labels[val_idx[batch[0].tolist().index(i)]]
                pred_label = decoded[i]
                correct_val_chars += sum(c1 == c2 for c1, c2 in zip(true_label, pred_label))
                total_val_chars += len(true_label)
                if pred_label == true_label:
                    correct_val_words += 1
    
    # Calculate metrics
    train_loss = train_loss / len(train_loader.dataset)
    train_char_acc = correct_train_chars / total_train_chars if total_train_chars >0 else 0
    train_word_acc = correct_train_words / len(train_loader.dataset)
    
    val_loss = val_loss / len(val_loader.dataset)
    val_char_acc = correct_val_chars / total_val_chars if total_val_chars >0 else 0
    val_word_acc = correct_val_words / len(val_loader.dataset)
    
    print(f'Epoch {epoch+1}/{num_epochs}:')
    print(f'Train Loss: {train_loss:.4f} | Char Acc: {train_char_acc:.4f} | Word Acc: {train_word_acc:.4f}')
    print(f'Val Loss: {val_loss:.4f} | Char Acc: {val_char_acc:.4f} | Word Acc: {val_word_acc:.4f}')
    
    scheduler.step(val_loss)
    
    if val_word_acc > best_val_word_acc:
        best_val_word_acc = val_word_acc
        torch.save(model.state_dict(), 'best_model_task2.pth')
        print('Saved best model!')


Epoch 1/30 - Training:   0%|          | 0/469 [00:03<?, ?it/s]


ValueError: 0 is not in list

In [None]:
# Testing
model.load_state_dict(torch.load('best_model_task2.pth'))
model.eval()
test_correct_chars = 0
test_total_chars = 0
test_correct_words = 0

with torch.no_grad():
    for batch in tqdm(test_loader, desc='Testing'):
        images, targets, input_lengths, target_lengths = batch
        images = images.to(device)
        outputs = model(images)
        decoded = decode_predictions(outputs, model)
        for i in range(len(decoded)):
            true_label = full_dataset.labels[test_idx[batch[0].tolist().index(i)]]
            pred_label = decoded[i]
            test_correct_chars += sum(c1 == c2 for c1, c2 in zip(true_label, pred_label))
            test_total_chars += len(true_label)
            if pred_label == true_label:
                test_correct_words += 1

test_char_acc = test_correct_chars / test_total_chars if test_total_chars >0 else 0
test_word_acc = test_correct_words / len(test_loader.dataset)
print(f'Test Results: Char Acc: {test_char_acc:.4f} | Word Acc: {test_word_acc:.4f}')