In [1]:
!pip install -q kaggle

In [2]:
from google.colab import files
files.upload()

Saving kaggle.json to kaggle.json


{'kaggle.json': b'{"username":"debashish0sarker","key":"889ec46e790212e1771d258ca88d6db4"}'}

In [3]:
!mkdir ~/.kaggle
!mv kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

In [4]:
!kaggle datasets download -d debashish0sarker/captcha-shortened

Dataset URL: https://www.kaggle.com/datasets/debashish0sarker/captcha-shortened
License(s): unknown
Downloading captcha-shortened.zip to /content
 99% 3.32G/3.35G [00:28<00:00, 200MB/s]
100% 3.35G/3.35G [00:28<00:00, 128MB/s]


In [5]:
!unzip captcha-shortened.zip -d captcha_shortened

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
  inflating: captcha_shortened/SPHINX_shortened/ZOUA_378480.png  
  inflating: captcha_shortened/SPHINX_shortened/ZOUB_423764.png  
  inflating: captcha_shortened/SPHINX_shortened/ZOUC_118519.png  
  inflating: captcha_shortened/SPHINX_shortened/ZOUC_601525.png  
  inflating: captcha_shortened/SPHINX_shortened/ZOUF_386752.png  
  inflating: captcha_shortened/SPHINX_shortened/ZOUG_855976.png  
  inflating: captcha_shortened/SPHINX_shortened/ZOUI_232716.png  
  inflating: captcha_shortened/SPHINX_shortened/ZOUI_709128.png  
  inflating: captcha_shortened/SPHINX_shortened/ZOUJ_524944.png  
  inflating: captcha_shortened/SPHINX_shortened/ZOUJ_814891.png  
  inflating: captcha_shortened/SPHINX_shortened/ZOUO_195100.png  
  inflating: captcha_shortened/SPHINX_shortened/ZOUO_317513.png  
  inflating: captcha_shortened/SPHINX_shortened/ZOUP_563916.png  
  inflating: captcha_shortened/SPHINX_shortened/ZOUQ_404157.png  
  inflating

In [6]:
!pip install -q torchsummary

In [7]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset, random_split
from torchvision import transforms
import matplotlib.pyplot as plt
import numpy as np
import os
from PIL import Image
import string


In [13]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

SEED = 42
torch.manual_seed(SEED)
np.random.seed(SEED)

# CAPTCHA setup
ALPHABET = string.ascii_uppercase
NUM_CLASSES = len(ALPHABET) + 1   # +1 for CTC blank
IMG_HEIGHT, IMG_WIDTH = 40, 150
BATCH_SIZE = 64
EPOCHS = 10


Using device: cuda


In [14]:
class CAPTCHADataset(Dataset):
    def __init__(self, data_dir, transform=None, alphabet=ALPHABET, max_len=4):
        self.data_dir = data_dir
        self.transform = transform
        self.alphabet = alphabet
        self.max_len = max_len

        self.char_to_idx = {c: i+1 for i,c in enumerate(alphabet)}  # 1..26
        self.idx_to_char = {i+1: c for i,c in enumerate(alphabet)}
        self.blank_idx = 0  # reserved for CTC blank

        self.image_paths, self.labels = [], []
        for fname in sorted(os.listdir(data_dir)):
            if fname.lower().endswith((".png",".jpg",".jpeg")):
                label = fname.split("_")[0][:max_len].upper()
                if all(c in alphabet for c in label):
                    self.image_paths.append(os.path.join(data_dir, fname))
                    self.labels.append(label)

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img = Image.open(self.image_paths[idx]).convert("L")
        if self.transform:
            img = self.transform(img)

        label = self.labels[idx]
        label_idx = [self.char_to_idx[c] for c in label]

        return img, torch.tensor(label_idx, dtype=torch.long), len(label_idx)


In [16]:
transform = transforms.Compose([
    transforms.Resize((IMG_HEIGHT, IMG_WIDTH)),
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))
])

def get_loaders(data_dir, batch_size=64, val_split=0.2):
    dataset = CAPTCHADataset(data_dir, transform=transform)
    val_size = int(len(dataset) * val_split)
    train_size = len(dataset) - val_size
    train_ds, val_ds = random_split(dataset, [train_size, val_size])

    train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
    val_loader   = DataLoader(val_ds, batch_size=batch_size, shuffle=False)
    return dataset, train_loader, val_loader

DATA_DIR = "/content/captcha_shortened/SPHINX_shortened"  # change if needed
dataset, train_loader, val_loader = get_loaders(DATA_DIR, BATCH_SIZE)


In [36]:
class CRNN(nn.Module):
    def __init__(self, img_h=40, num_classes=NUM_CLASSES, hidden_size=256):
        super(CRNN, self).__init__()

        self.cnn = nn.Sequential(
            nn.Conv2d(1, 64, 3, 1, 1), nn.ReLU(), nn.MaxPool2d(2,2),   # 20x75
            nn.Conv2d(64,128,3,1,1), nn.ReLU(), nn.MaxPool2d(2,2),     # 10x37
            nn.Conv2d(128,256,3,1,1), nn.BatchNorm2d(256), nn.ReLU(),
            nn.Conv2d(256,256,3,1,1), nn.ReLU(), nn.MaxPool2d((2,2),(2,1),(0,1)), # 5x37
            nn.Conv2d(256,512,3,1,1), nn.BatchNorm2d(512), nn.ReLU(),
            nn.Conv2d(512,512,3,1,1), nn.ReLU(), nn.MaxPool2d((2,2),(2,1),(0,1)), # 2x37
            nn.Conv2d(512,512,2,1,0), nn.ReLU()  # 1x36
        )

        self.lstm = nn.LSTM(512, hidden_size, bidirectional=True, batch_first=True)
        self.fc = nn.Linear(hidden_size*2, num_classes)

    def forward(self, x):
        conv = self.cnn(x)     # (B, C, H, W)
        b,c,h,w = conv.size()
        conv = conv.squeeze(2).permute(0,2,1)  # (B, W, C)
        rnn_out,_ = self.lstm(conv)
        out = self.fc(rnn_out)  # (B, W, num_classes)
        return out.permute(1,0,2) # (T,B,C) for CTC


In [41]:
# Create the model
model = CRNN(num_classes=NUM_CLASSES, hidden_size=256).to(device)

print("Model created:")
summary(model, (1, 1, 40, 150))  # include channel dimension: (batch, channels, H, W)


Model created:


Layer (type:depth-idx)                   Output Shape              Param #
CRNN                                     [38, 1, 63]               --
├─Sequential: 1-1                        [1, 512, 1, 38]           --
│    └─Conv2d: 2-1                       [1, 64, 40, 150]          640
│    └─ReLU: 2-2                         [1, 64, 40, 150]          --
│    └─MaxPool2d: 2-3                    [1, 64, 20, 75]           --
│    └─Conv2d: 2-4                       [1, 128, 20, 75]          73,856
│    └─ReLU: 2-5                         [1, 128, 20, 75]          --
│    └─MaxPool2d: 2-6                    [1, 128, 10, 37]          --
│    └─Conv2d: 2-7                       [1, 256, 10, 37]          295,168
│    └─BatchNorm2d: 2-8                  [1, 256, 10, 37]          512
│    └─ReLU: 2-9                         [1, 256, 10, 37]          --
│    └─Conv2d: 2-10                      [1, 256, 10, 37]          590,080
│    └─ReLU: 2-11                        [1, 256, 10, 37]          --

In [18]:
!pip install torchinfo


Collecting torchinfo
  Downloading torchinfo-1.8.0-py3-none-any.whl.metadata (21 kB)
Downloading torchinfo-1.8.0-py3-none-any.whl (23 kB)
Installing collected packages: torchinfo
Successfully installed torchinfo-1.8.0


In [38]:
total_params = sum(p.numel() for p in model.parameters())
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
model_size_mb = (total_params * 4) / (1024*1024)  # float32 = 4 bytes

print(f"📊 Total parameters: {total_params:,}")
print(f"✅ Trainable parameters: {trainable_params:,}")
print(f"💾 Model size (float32): {model_size_mb:.2f} MB")


📊 Total parameters: 7,141,147
✅ Trainable parameters: 7,141,147
💾 Model size (float32): 27.24 MB


In [42]:
from torch.optim.lr_scheduler import ReduceLROnPlateau
criterion = nn.CTCLoss(blank=0, zero_infinity=True)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-5)
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=3)

In [44]:
import os

CKPT_DIR   = "/kaggle/working/checkpoints"
LATEST_CKPT = os.path.join(CKPT_DIR, "latest_checkpoint.pth")
BEST_CKPT   = os.path.join(CKPT_DIR, "best_model.pth")

os.makedirs(CKPT_DIR, exist_ok=True)

def save_checkpoint(state, path):
    torch.save(state, path)

def load_latest_if_available(model, optimizer, scheduler, history):
    start_epoch = 0
    best_val_seq_acc = 0.0
    best_epoch = 0

    if os.path.exists(LATEST_CKPT):
        print("🔄 Loading latest checkpoint...")
        ckpt = torch.load(LATEST_CKPT, map_location=device)
        model.load_state_dict(ckpt["model_state_dict"])
        optimizer.load_state_dict(ckpt["optimizer_state_dict"])
        scheduler.load_state_dict(ckpt["scheduler_state_dict"])
        start_epoch = ckpt["epoch"] + 1
        history.update(ckpt["history"])
        best_val_seq_acc = ckpt.get("best_val_seq_acc", 0.0)
        best_epoch = ckpt.get("best_epoch", 0)
        print(f"✅ Resumed from epoch {start_epoch}")
    return start_epoch, best_val_seq_acc, best_epoch


In [45]:
def train_crnn_with_ckpt(model, train_loader, val_loader, criterion, optimizer, scheduler, epochs):
    history = {
        "train_loss": [], "val_loss": [],
        "train_char_acc": [], "val_char_acc": [],
        "train_seq_acc": [],  "val_seq_acc": []
    }
    start_epoch, best_val_seq_acc, best_epoch = load_latest_if_available(model, optimizer, scheduler, history)

    for epoch in range(start_epoch, epochs):
        # ---- Train ----
        model.train()
        running_loss = 0.0
        running_char, running_seq, batches = 0, 0, 0

        pbar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs} [Train]")
        for imgs, labels, lengths in pbar:
            imgs = imgs.to(device)
            flat_labels = torch.cat([l for l in labels]).to(device)
            label_lengths = torch.tensor(lengths, dtype=torch.long)

            optimizer.zero_grad()
            outputs = model(imgs)  # (T,B,C)
            T,B,C = outputs.size()
            input_lengths = torch.full(size=(B,), fill_value=T, dtype=torch.long)

            loss = criterion(outputs, flat_labels, input_lengths, label_lengths)
            loss.backward(); optimizer.step()

            running_loss += loss.item()

            # Accuracy
            c_acc, seq_acc = calculate_accuracy(outputs, labels, lengths)
            running_char += c_acc; running_seq += seq_acc; batches += 1
            pbar.set_postfix(loss=f"{loss.item():.4f}")

        avg_train_loss = running_loss / len(train_loader)
        avg_train_char_acc = running_char / batches
        avg_train_seq_acc = running_seq / batches

        # ---- Validate ----
        model.eval()
        val_loss, val_char, val_seq, v_batches = 0,0,0,0
        with torch.no_grad():
            for imgs, labels, lengths in tqdm(val_loader, desc=f"Epoch {epoch+1}/{epochs} [Val]"):
                imgs = imgs.to(device)
                flat_labels = torch.cat([l for l in labels]).to(device)
                label_lengths = torch.tensor(lengths, dtype=torch.long)

                outputs = model(imgs)
                T,B,C = outputs.size()
                input_lengths = torch.full(size=(B,), fill_value=T, dtype=torch.long)

                loss = criterion(outputs, flat_labels, input_lengths, label_lengths)
                val_loss += loss.item()

                c_acc, seq_acc = calculate_accuracy(outputs, labels, lengths)
                val_char += c_acc; val_seq += seq_acc; v_batches += 1

        avg_val_loss = val_loss / len(val_loader)
        avg_val_char_acc = val_char / v_batches
        avg_val_seq_acc = val_seq / v_batches

        scheduler.step(avg_val_loss)

        # ---- Save history ----
        history["train_loss"].append(avg_train_loss)
        history["val_loss"].append(avg_val_loss)
        history["train_char_acc"].append(avg_train_char_acc)
        history["val_char_acc"].append(avg_val_char_acc)
        history["train_seq_acc"].append(avg_train_seq_acc)
        history["val_seq_acc"].append(avg_val_seq_acc)

        print(f"\nEpoch {epoch+1}/{epochs}")
        print(f"Train Loss: {avg_train_loss:.4f} | Train Char Acc: {avg_train_char_acc*100:.2f}% | Train Seq Acc: {avg_train_seq_acc*100:.2f}%")
        print(f"Val   Loss: {avg_val_loss:.4f} | Val Char Acc: {avg_val_char_acc*100:.2f}% | Val Seq Acc: {avg_val_seq_acc*100:.2f}%")
        print(f"LR: {optimizer.param_groups[0]['lr']:.6f}")

        # ---- Save latest ----
        latest_state = {
            "epoch": epoch,
            "model_state_dict": model.state_dict(),
            "optimizer_state_dict": optimizer.state_dict(),
            "scheduler_state_dict": scheduler.state_dict(),
            "history": history,
            "best_val_seq_acc": best_val_seq_acc,
            "best_epoch": best_epoch,
        }
        save_checkpoint(latest_state, LATEST_CKPT)

        # ---- Save epoch checkpoint ----
        epoch_ckpt_path = os.path.join(CKPT_DIR, f"epoch_{epoch+1:03d}.pth")
        save_checkpoint(latest_state, epoch_ckpt_path)
        print(f"📁 Checkpoint saved: {epoch_ckpt_path}")

        # ---- Save best (by full-sequence acc) ----
        if avg_val_seq_acc > best_val_seq_acc:
            best_val_seq_acc = avg_val_seq_acc
            best_epoch = epoch + 1
            best_state = {
                "epoch": best_epoch,
                "model_state_dict": model.state_dict(),
                "val_char_acc": avg_val_char_acc,
                "val_seq_acc": avg_val_seq_acc,
                "config": {"num_classes": NUM_CLASSES}
            }
            save_checkpoint(best_state, BEST_CKPT)
            print(f"🏆 New BEST model saved (Seq Acc: {avg_val_seq_acc*100:.2f}% at epoch {best_epoch})")

        print("-"*60)

    # ---- Save final model ----
    final_path = os.path.join(CKPT_DIR, "final_model.pth")
    torch.save({
        "epoch": epochs,
        "model_state_dict": model.state_dict(),
        "final_history": history
    }, final_path)
    print(f"🎉 Training completed.")
    print(f"🏆 Best full-sequence val accuracy: {best_val_seq_acc*100:.2f}% at epoch {best_epoch}")

    return history


In [46]:
history = train_crnn_with_ckpt(
    model, train_loader, val_loader,
    criterion, optimizer, scheduler, EPOCHS
)

  label_lengths = torch.tensor(lengths, dtype=torch.long)
Epoch 1/10 [Train]:  18%|█▊        | 852/4627 [02:07<09:24,  6.68it/s, loss=3.6216]


KeyboardInterrupt: 