Mounting Google Drive

In [1]:
import os, sys
from pathlib import Path

# Detect Colab
IN_COLAB = 'google.colab' in sys.modules

if IN_COLAB:
    from google.colab import drive
    drive.mount('/content/drive', force_remount=True)
    BASE_DIR = Path("/content/drive/Othercomputers/My Laptop/UTN/Computer Vision/Captcha-Cracker")
else:
    BASE_DIR = Path.cwd() / "Captcha-Cracker"
    BASE_DIR.mkdir(parents=True, exist_ok=True)

print("Project base dir:", BASE_DIR)


Mounted at /content/drive
Project base dir: /content/drive/Othercomputers/My Laptop/UTN/Computer Vision/Captcha-Cracker


In [2]:
!cp "/content/drive/Othercomputers/My Laptop/UTN/Computer Vision/Captcha-Cracker/Dataset/UTN-CV25-Captcha-Dataset.zip" /content/

In [3]:
import zipfile
from tqdm import tqdm

zip_path = "/content/UTN-CV25-Captcha-Dataset.zip"
dst_dir = "/content/data/"

with zipfile.ZipFile(zip_path, 'r') as zf:
    files = zf.namelist()
    for file in tqdm(files, desc="Unzipping", unit="files"):
        zf.extract(file, dst_dir)


Unzipping: 100%|██████████| 300030/300030 [00:38<00:00, 7695.70files/s]


In [4]:
DATASET_ROOT = Path("/content/data/part2")
OUTPUT_DIR = Path("/content/drive/Othercomputers/My Laptop/UTN/Computer Vision/Captcha-Cracker/outputs")


Imports & Utils

In [5]:
import numpy as np, random, json
import torch, torch.nn as nn, torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as T, torchvision.models as models
from PIL import Image
import matplotlib.pyplot as plt

# Reproducibility
SEED = 1337
random.seed(SEED); np.random.seed(SEED)
torch.manual_seed(SEED); torch.cuda.manual_seed_all(SEED)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device:", device)

# Charset
CHARS = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"
NUM_CLASSES = len(CHARS) + 1  # +1 for CTC blank
BLANK = 0
char2idx = {c:i+1 for i,c in enumerate(CHARS)}
idx2char = {i+1:c for i,c in enumerate(CHARS)}

def text_to_indices(s): return [char2idx[c] for c in s.upper() if c in char2idx]
def indices_to_text(seq): return ''.join(idx2char.get(i,'') for i in seq if i!=BLANK)

def levenshtein(a,b):
    m,n=len(a),len(b); dp=list(range(n+1))
    for i,ca in enumerate(a,1):
        prev,dp[0]=dp[0],i
        for j,cb in enumerate(b,1):
            cost=0 if ca==cb else 1
            prev,dp[j]=dp[j],min(dp[j]+1,dp[j-1]+1,prev+cost)
    return dp[n]

def ler(preds,gts):
    return np.mean([levenshtein(g,p)/max(1,len(g)) for p,g in zip(preds,gts)])


Device: cuda


Load Dataset

In [6]:
PART2_DIR = DATASET_ROOT # Update this line with the correct path to your "part2" directory
TRAIN_JSON = PART2_DIR / "train" / "labels.json"
VAL_JSON   = PART2_DIR / "val" / "labels.json"
TEST_DIR   = PART2_DIR / "test" / "images"

def load_json(path): return json.load(open(path,'r',encoding='utf-8'))

train_ann = load_json(TRAIN_JSON)
val_ann   = load_json(VAL_JSON)
print("Train size:",len(train_ann),"Val size:",len(val_ann))

class CaptchaDataset(Dataset):
    def __init__(self, ann, root, has_labels=True):
        self.ann=ann; self.root=root; self.has_labels=has_labels
        self.tf = T.Compose([
            # T.Grayscale(), # Removed to match ResNet's expected 3 input channels
            T.Resize((160,640)),
            T.ToTensor(), T.Normalize((0.5,),(0.5,))
        ])
    def __len__(self): return len(self.ann)
    def __getitem__(self,idx):
        r=self.ann[idx]
        img=Image.open(self.root/"images"/(r["image_id"]+".png")).convert("RGB") # Ensure image is RGB (3 channels)
        x=self.tf(img)
        y=torch.tensor(text_to_indices(r["captcha_string"]),dtype=torch.long) if self.has_labels else torch.tensor([])
        return x,y,r["image_id"]

def collate_fn(batch):
    xs,ys,ids=zip(*batch)
    flat=torch.cat([y for y in ys]) if ys[0].numel() else torch.tensor([],dtype=torch.long)
    y_lens=torch.tensor([len(y) for y in ys],dtype=torch.long)
    return torch.stack(xs), flat, y_lens, list(ids)

train_ds=CaptchaDataset(train_ann,PART2_DIR/"train",has_labels=True)
val_ds=CaptchaDataset(val_ann,PART2_DIR/"val",has_labels=True)

Train size: 60000 Val size: 20000


Model (ResNet50 + LSTM + CTC)

In [7]:
class ResNetSeq(nn.Module):
    def __init__(self,num_classes):
        super().__init__()
        base=models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
        layers=list(base.children())[:-2]   # drop FC & avgpool
        self.backbone=nn.Sequential(*layers)
        self.rnn=nn.LSTM(2048,256,2,bidirectional=True)
        self.fc=nn.Linear(512,num_classes)
    def forward(self,x):
        f=self.backbone(x)         # [B,2048,H,W]
        f=f.mean(2)                # pool H → [B,2048,W]
        f=f.permute(2,0,1)         # [W,B,2048]
        seq,_=self.rnn(f)          # [W,B,512]
        out=self.fc(seq)           # [W,B,C]
        return F.log_softmax(out,dim=-1), torch.full((x.size(0),),seq.size(0),dtype=torch.long)

def ctc_decode(logp):
    _,max_idx=logp.max(-1)
    max_idx=max_idx.transpose(0,1).cpu().tolist()
    out=[]
    for seq in max_idx:
        prev=None; arr=[]
        for t in seq:
            if t!=BLANK and t!=prev: arr.append(t)
            prev=t
        out.append(arr)
    return out

def idxseqs_to_texts(seqs): return [''.join(idx2char.get(i,'') for i in s) for s in seqs]


Training & Evaluation

In [14]:
BATCH_SIZE=64; EPOCHS=20; LR=1e-3

train_loader=DataLoader(train_ds,batch_size=BATCH_SIZE,shuffle=True,num_workers=30,pin_memory=True,prefetch_factor=2,collate_fn=collate_fn)
val_loader=DataLoader(val_ds,batch_size=BATCH_SIZE,shuffle=False,num_workers=30,pin_memory=True,collate_fn=collate_fn)

model=ResNetSeq(NUM_CLASSES).to(device)
criterion=nn.CTCLoss(blank=BLANK,zero_infinity=True)
opt=torch.optim.Adam(model.parameters(),lr=LR)
sch=torch.optim.lr_scheduler.CosineAnnealingLR(opt,T_max=EPOCHS,eta_min=1e-5)

def evaluate(loader):
    model.eval(); preds_all=[]; gts_all=[]; losses=[]
    with torch.no_grad():
        for X,flat,y_lens,ids in loader:
            X=X.to(device); flat=flat.to(device)
            logp,out_lens=model(X)
            loss=criterion(logp,flat,out_lens,y_lens); losses.append(loss.item())
            seqs=ctc_decode(logp); preds=idxseqs_to_texts(seqs)
            # unpack GTs
            off=0; gts=[]
            for L in y_lens.tolist():
                gts.append(''.join(idx2char.get(i,'') for i in flat[off:off+L].cpu().tolist()))
                off+=L
            preds_all.extend(preds); gts_all.extend(gts)
    return np.mean(losses), ler(preds_all,gts_all)




In [9]:
CHECKPOINT_DIR = OUTPUT_DIR / "checkpoints"
CHECKPOINT_DIR.mkdir(parents=True, exist_ok=True)

LAST_CKPT = CHECKPOINT_DIR / "last.pt"
BEST_CKPT = CHECKPOINT_DIR / "best.pt"


In [10]:
def save_checkpoint(epoch, model, optimizer, scheduler, best=False):
    state = {
        "epoch": epoch,
        "model": model.state_dict(),
        "optimizer": optimizer.state_dict(),
        "scheduler": scheduler.state_dict(),
    }
    path = BEST_CKPT if best else LAST_CKPT
    torch.save(state, path)
    print(f"Saved checkpoint: {path.name} (epoch {epoch+1})")


In [11]:
def load_checkpoint(model, optimizer, scheduler):
    if LAST_CKPT.exists():
        ckpt = torch.load(LAST_CKPT, map_location=device)
        model.load_state_dict(ckpt["model"])
        optimizer.load_state_dict(ckpt["optimizer"])
        scheduler.load_state_dict(ckpt["scheduler"])
        start_epoch = ckpt["epoch"] + 1
        print(f"Resumed from {LAST_CKPT.name} (epoch {start_epoch})")
        return start_epoch
    print("No checkpoint found, starting from scratch.")
    return 0


Training

In [None]:
start_epoch = load_checkpoint(model, opt, sch)
best_ler = 1e9

for epoch in range(start_epoch, EPOCHS):
    # ----- Train -----
    model.train(); losses=[]
    for X, flat, y_lens, ids in train_loader:
        X, flat = X.to(device), flat.to(device)
        opt.zero_grad()
        logp, out_lens = model(X)
        loss = criterion(logp, flat, out_lens, y_lens)
        loss.backward(); opt.step()
        losses.append(loss.item())
    tr_loss = np.mean(losses)

    # ----- Validate -----
    val_loss, val_ler = evaluate(val_loader)
    sch.step()

    print(f"Epoch {epoch+1}/{EPOCHS} tr_loss={tr_loss:.4f} "
          f"val_loss={val_loss:.4f} val_LER={val_ler:.4f}")

    # Save last checkpoint
    save_checkpoint(epoch, model, opt, sch, best=False)

    # Save best model
    if val_ler < best_ler:
        best_ler = val_ler
        save_checkpoint(epoch, model, opt, sch, best=True)
        print(f"New best LER: {best_ler:.4f}")


Resumed from last.pt (epoch 10)
Epoch 11/20 tr_loss=0.0072 val_loss=0.7775 val_LER=0.1386
Saved checkpoint: last.pt (epoch 11)
Saved checkpoint: best.pt (epoch 11)
New best LER: 0.1386
Epoch 12/20 tr_loss=0.0072 val_loss=0.8196 val_LER=0.1413
Saved checkpoint: last.pt (epoch 12)
Epoch 13/20 tr_loss=0.0093 val_loss=0.8234 val_LER=0.1425
Saved checkpoint: last.pt (epoch 13)
Epoch 14/20 tr_loss=0.0196 val_loss=0.7747 val_LER=0.1370
Saved checkpoint: last.pt (epoch 14)
Saved checkpoint: best.pt (epoch 14)
New best LER: 0.1370


Inference Test set

In [None]:
test_img_dir=TEST_DIR
if test_img_dir.exists():
    model.load_state_dict(torch.load(BASE_DIR/"best_resnet50.pt",map_location=device))
    model.eval()
    test_files=sorted(list(test_img_dir.glob("*.png")))
    preds=[]
    with torch.no_grad():
        for f in test_files:
            img=Image.open(f).convert("RGB")
            x=T.Compose([T.Grayscale(),T.Resize((160,640)),T.ToTensor(),T.Normalize((0.5,),(0.5,))])(img).unsqueeze(0).to(device)
            logp,_=model(x)
            seqs=ctc_decode(logp); txt=idxseqs_to_texts(seqs)[0]
            preds.append({"height":160,"width":640,"image_id":f.stem,"captcha_string":txt,"annotations":[]})
    out_path=BASE_DIR/"predictions.json"
    json.dump(preds,open(out_path,"w"),indent=2)
    print("Wrote predictions to",out_path)
else:
    print("No test images found.")


In [19]:
from google.colab import files
files.download("/content/best_resnet50.pt")


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [20]:
import os

# Number of logical CPU cores available
print("CPU cores:", os.cpu_count())


CPU cores: 2


In [15]:
!nproc


12
