In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Walk the Kaggle input directory and print the full path of every file found.
for root, _subdirs, files in os.walk('/kaggle/input'):
    for name in files:
        print(os.path.join(root, name))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [3]:
# link to the Dataset -> https://www.kaggle.com/code/egregiouslytalented/crackcaptchanetonjasondataset?scriptVersionId=239239503&cellId=5

In [4]:
!pip install --upgrade torch-summary

Collecting torch-summary
  Downloading torch_summary-1.4.5-py3-none-any.whl.metadata (18 kB)
Downloading torch_summary-1.4.5-py3-none-any.whl (16 kB)
Installing collected packages: torch-summary
Successfully installed torch-summary-1.4.5


In [5]:
import torch
from torch import nn
import torch.nn.functional as F
from torchsummary import summary
from tqdm import tqdm

In [29]:
DATA_DIR='/kaggle/input/comprasnet-captchas/comprasnet_imagensacerto'

# Model hyperparameters
BATCH_SIZE=128
VAL_SPLIT=0.05  # fraction of the dataset held out for validation

# CRNN
CRNN_KERNEL=5
CRNN_POOL_KERNEL=2
CRNN_DROPOUT=0.3
CRNN_LATENT=128
LSTM_HIDDEN_DIM=32
VOCAB_SIZE=26*2+10  # lowercase + uppercase ASCII letters plus the ten digits
OUTPUT_LENGTH=6     # number of characters predicted per captcha

# AFFN
AFFN_KERNEL=5
AFFN_STRIDE=1
AFFN_DEPTH=4

# Training schedule
SAVE_EPOCH=10  # write a checkpoint every SAVE_EPOCH epochs (a falsy value disables it)
VAL_EPOCH=1    # run validation every VAL_EPOCH epochs (a falsy value disables it)
EPOCHS=40

In [13]:
# Run on the GPU when one is visible to PyTorch, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [14]:
import os
import torch
import torch
from torch.nn.utils.rnn import pad_sequence
from torchvision import transforms
from torch.utils.data import DataLoader, random_split, Dataset
from PIL import Image
import string



def get_dataloaders(data_dir, batch_size, val_split, shuffle=True, num_workers=2,
                    label_length=6, seed=None):
    """
    Creates PyTorch dataloaders for training and validation from a folder of images.

    The label of each image is its file name without the extension. Labels are
    returned as 1-D ``torch.long`` tensors of character indices (NOT one-hot
    vectors), which is the target format expected by ``F.cross_entropy``.

    Parameters:
        data_dir (str): Path to the folder containing images.
        batch_size (int): Batch size for dataloaders.
        val_split (float): Fraction of data to use for validation.
        shuffle (bool): Whether to shuffle the training data.
        num_workers (int): Number of workers for dataloaders.
        label_length (int): Required number of characters in a label; files whose
            name has a different length are skipped.
        seed (int | None): Optional seed for the train/validation split. When
            None the split uses PyTorch's global random state.

    Returns:
        train_loader, val_loader: DataLoaders for training and validation.
    """
    # Character set (vocabulary): lowercase letters, uppercase letters, digits.
    characters = string.ascii_letters + string.digits
    char_to_idx = {char: idx for idx, char in enumerate(characters)}
    image_extensions = ('.png', '.jpg', '.jpeg')

    class CustomDataset(Dataset):
        """Image files of ``root_dir`` paired with labels taken from their file names."""

        def __init__(self, root_dir, transform=None):
            self.root_dir = root_dir
            self.transform = transform
            self.image_paths = []
            self.labels = []

            for f in os.listdir(root_dir):
                label, ext = os.path.splitext(f)
                # Only image files may contribute a sample; previously a non-image
                # file was appended with the label of the preceding image.
                if ext.lower() not in image_extensions:
                    continue
                if len(label) != label_length:
                    continue
                # Skip labels containing characters outside the vocabulary: dropping
                # those characters later would yield label tensors of differing
                # lengths that cannot be stacked into a batch.
                if any(c not in char_to_idx for c in label):
                    continue
                self.image_paths.append(os.path.join(root_dir, f))
                self.labels.append(label)

        def __len__(self):
            return len(self.image_paths)

        def __getitem__(self, idx):
            # The context manager closes the underlying file once the pixel data
            # has been converted.
            with Image.open(self.image_paths[idx]) as img:
                image = img.convert('RGB')

            # Map every character of the label to its vocabulary index.
            label_tensor = torch.tensor(
                [char_to_idx[c] for c in self.labels[idx]], dtype=torch.long
            )

            if self.transform:
                image = self.transform(image)

            return image, label_tensor

    transform = transforms.Compose([
        transforms.Resize((40, 150)),  # Resize to a fixed size
        transforms.ToTensor(),
        transforms.Grayscale(),
    ])
    dataset = CustomDataset(root_dir=data_dir, transform=transform)

    # Compute train-validation split
    total_size = len(dataset)
    val_size = int(total_size * val_split)
    train_size = total_size - val_size
    split_kwargs = {}
    if seed is not None:
        # A dedicated generator makes the split reproducible without touching
        # the global random state.
        split_kwargs['generator'] = torch.Generator().manual_seed(seed)
    train_dataset, val_dataset = random_split(dataset, [train_size, val_size], **split_kwargs)

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=shuffle, num_workers=num_workers)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers)

    return train_loader, val_loader

In [15]:
# Build the train/validation loaders from the values set in the configuration cell.
train_loader, val_loader = get_dataloaders(
    data_dir=DATA_DIR,
    batch_size=BATCH_SIZE,
    val_split=VAL_SPLIT,
    shuffle=True,
)

In [16]:
# Sanity check: show the image and label tensor shapes of the first training batch.
for batch_images, batch_labels in train_loader:
    print(batch_images.shape, batch_labels.shape, sep="\n")
    break

torch.Size([128, 1, 40, 150])
torch.Size([128, 6])


In [17]:
class Encoder(nn.Sequential):
    """Conv2d -> BatchNorm2d -> ReLU stage mapping 4**(n-1) channels to 4**n channels."""

    def __init__(self, n, kernel_size, stride):
        channels_in = 4 ** (n - 1)
        channels_out = 4 ** n
        stages = (
            nn.Conv2d(
                in_channels=channels_in,
                out_channels=channels_out,
                kernel_size=kernel_size,
                stride=stride,
            ),
            nn.BatchNorm2d(num_features=channels_out),
            nn.ReLU(inplace=False),
        )
        super().__init__(*stages)

In [18]:
class Decoder(nn.Sequential):
    """ConvTranspose2d -> BatchNorm2d -> ReLU stage mapping 4**n channels to 4**(n-1) channels."""

    def __init__(self, n, kernel_size, stride):
        channels_in = 4 ** n
        channels_out = 4 ** (n - 1)
        stages = (
            nn.ConvTranspose2d(
                in_channels=channels_in,
                out_channels=channels_out,
                kernel_size=kernel_size,
                stride=stride,
            ),
            nn.BatchNorm2d(num_features=channels_out),
            nn.ReLU(inplace=False),
        )
        super().__init__(*stages)

In [20]:
class AFFN(nn.Module):
    """
    Stack of ``n`` Encoder stages followed by ``n`` Decoder stages with learnable,
    alpha-weighted skip connections between matching encoder/decoder levels.

    The stages are stored in ``nn.ModuleList`` containers so that their parameters
    are registered on the module. With plain Python lists they were invisible to
    ``parameters()`` (never optimised), to ``state_dict()`` (never saved) and to
    ``train()`` / ``eval()`` (BatchNorm mode never switched).

    Parameters:
        n (int): Number of encoder stages (and of decoder stages).
    """

    def __init__(self, n):
        super().__init__()
        self.n = n
        # One mixing weight per skip connection; the last encoder has no skip.
        self.alpha = nn.Parameter(torch.randn(n - 1).to(device))
        # Encoders widen the channels 1 -> 4 -> ... -> 4**n.
        self.encoders = nn.ModuleList(
            [Encoder(i, AFFN_KERNEL, AFFN_STRIDE).to(device) for i in range(1, n + 1)]
        )
        # Decoders mirror the encoders: 4**n -> ... -> 4 -> 1.
        self.decoders = nn.ModuleList(
            [Decoder(i, AFFN_KERNEL, AFFN_STRIDE).to(device) for i in range(n, 0, -1)]
        )

    def forward(self, x):
        """Run ``x`` through all encoders, then all decoders, adding the stored skips."""
        residuals = []
        for i, enc in enumerate(self.encoders):
            x = enc(x)
            if i < self.n - 1:
                x = x * (1 - self.alpha[i])
                # NOTE(review): the skip is taken from the already scaled tensor,
                # i.e. enc_out * (1 - alpha) * alpha -- confirm this is intended.
                residuals.append(x * self.alpha[i])

        for i, dec in enumerate(self.decoders):
            x = dec(x)
            if i < self.n - 1:
                # Skips are consumed in reverse order so each decoder output is
                # paired with the encoder output of the same channel count.
                x = x + residuals.pop()

        return x

In [21]:
class CRNN(nn.Module):
    """
    Convolutional feature extractor followed by an LSTM that emits one set of
    class logits per output character.

    Parameters:
        in_channels (int): Channels of the input feature map.
        kernel_size (int): Kernel size of both convolutions (padding is fixed at 2).
        pool_kernel_size (int): Kernel size of both max-pooling layers.
        dropout (float): Dropout probability applied to the flattened features.
        latent_dim (int): Size of the latent vector fed to the LSTM.
        lstm_hidden_dim (int): Hidden size of the LSTM.
        vocab_size (int): Number of classes predicted per character.
        output_length (int): Number of characters (LSTM steps) to predict.

    Returns (forward):
        Tensor of shape (batch_size, output_length, vocab_size) holding logits.
    """

    def __init__(self, in_channels, kernel_size, pool_kernel_size, dropout, latent_dim, lstm_hidden_dim, vocab_size, output_length=5):
        super().__init__()
        self.lstm_hidden_dim = lstm_hidden_dim
        self.output_length = output_length
        self.vocab_size = vocab_size

        self.conv1 = nn.Sequential(
            nn.Conv2d(in_channels=in_channels, out_channels=in_channels*2, kernel_size=kernel_size, padding=2),
            nn.BatchNorm2d(num_features=in_channels*2),
            nn.ReLU(inplace=False),
            nn.MaxPool2d(kernel_size=pool_kernel_size)
        )
        self.conv2 = nn.Sequential(
            nn.Conv2d(in_channels=in_channels*2, out_channels=in_channels*4, kernel_size=kernel_size, padding=2),
            nn.BatchNorm2d(num_features=in_channels*4),
            nn.ReLU(inplace=False),
            nn.MaxPool2d(kernel_size=pool_kernel_size)
        )
        self.flatten = nn.Flatten()
        self.dropout = nn.Dropout(dropout)
        # LazyLinear infers its input size from the first batch it sees.
        self.latent_fc = nn.LazyLinear(latent_dim)
        self.lstm = nn.LSTM(input_size=latent_dim, hidden_size=lstm_hidden_dim, num_layers=1, batch_first=True)
        self.output_fc = nn.Linear(lstm_hidden_dim, vocab_size)

    def forward(self, x):
        features = self.conv2(self.conv1(x))
        latent = self.latent_fc(self.dropout(self.flatten(features)))  # (batch_size, latent_dim)

        # The same latent vector is the input at every time step. Repeating it
        # along the sequence axis lets a single LSTM call (which starts from a
        # zero hidden/cell state when none is given) replace the step-by-step
        # Python loop while carrying the state between steps in the same way.
        lstm_input = latent.unsqueeze(1).repeat(1, self.output_length, 1)  # (batch_size, output_length, latent_dim)
        lstm_out, _ = self.lstm(lstm_input)  # (batch_size, output_length, lstm_hidden_dim)

        # The linear head acts on the last dimension, giving logits per step.
        return self.output_fc(lstm_out)  # (batch_size, output_length, vocab_size)
        
        


In [22]:
class CaptchaCrackNet(nn.Module):
    """
    End-to-end captcha recogniser: the AFFN output passes through three
    conv/ReLU/max-pool stages (with a strided-conv shortcut from the raw input
    added after the first stage) and is then decoded by the CRNN head.
    """

    def __init__(self):
        super().__init__()

        def conv_stage(channels_in, channels_out):
            # 5x5 convolution with padding 2, ReLU, then 2x2 max-pooling.
            return nn.Sequential(
                nn.Conv2d(in_channels=channels_in, out_channels=channels_out, kernel_size=5, padding=2),
                nn.ReLU(inplace=False),
                nn.MaxPool2d(kernel_size=2),
            )

        self.affn = AFFN(AFFN_DEPTH).to(device)

        self.conv1 = conv_stage(1, 32)
        self.conv2 = conv_stage(32, 48)
        self.conv3 = conv_stage(48, 64)

        # Shortcut from the raw input; its output is added to the conv1 output.
        self.res = nn.Conv2d(in_channels=1, out_channels=32, kernel_size=5, stride=2, padding=2)

        self.crnn = CRNN(
            64,
            CRNN_KERNEL,
            CRNN_POOL_KERNEL,
            CRNN_DROPOUT,
            CRNN_LATENT,
            LSTM_HIDDEN_DIM,
            VOCAB_SIZE,
            OUTPUT_LENGTH,
        ).to(device)

    def forward(self, x):
        filtered = self.affn(x)
        shortcut = self.res(x)
        stage1 = self.conv1(filtered)
        stage2 = self.conv2(stage1 + shortcut)
        stage3 = self.conv3(stage2)
        return self.crnn(stage3)

In [23]:
def loss_fn(preds,target):
    """Return the cross-entropy between the logits ``preds`` and the class indices ``target``."""
    return F.cross_entropy(input=preds, target=target)

def asr(preds,target):
    """
    Return the batch mean of the per-sample fraction of positions whose
    arg-max prediction equals the target index.
    """
    predicted_idx = preds.argmax(dim=-1)
    hits_per_sample = predicted_idx.eq(target).sum(dim=-1)
    fraction_correct = hits_per_sample / preds.shape[1]
    return fraction_correct.mean()


def train(model,train_loader,val_loader,optimizer,loss_fn,epochs):
    """
    Train ``model`` for ``epochs`` epochs, validating and checkpointing on the
    schedule given by the module-level ``VAL_EPOCH`` and ``SAVE_EPOCH``.

    Parameters:
        model: Network returning logits of shape (batch, length, classes).
        train_loader: Iterable of (images, label-index) training batches.
        val_loader: Iterable of (images, label-index) validation batches.
        optimizer: Optimizer that updates the model's parameters.
        loss_fn: Callable ``loss_fn(logits_2d, targets_1d)`` returning a scalar loss.
        epochs (int): Number of passes over ``train_loader``.

    Returns:
        train_history, val_history, asr_history: lists with the mean training
        loss per epoch and, for every validated epoch, the mean validation loss
        and the mean ``asr`` value.

    Side effects:
        Writes ``<epoch>.pth`` every ``SAVE_EPOCH`` epochs and ``final.pth`` at
        the end, both in the current working directory.
    """
    train_history=[]
    val_history=[]
    asr_history=[]
    model.to(device)
    for epoch in range(1,epochs+1):
        print(f"Epoch {epoch}:")
        model.train()
        avg_loss=0
        for X,y in tqdm(train_loader,desc="Progress: "):
            X=X.to(device)
            y=y.to(device)
            optimizer.zero_grad()
            preds=model(X)

            # Flatten (batch, length, classes) -> (batch*length, classes). The
            # class count is read from the prediction itself so the loop does
            # not depend on a global matching the model's output width.
            loss=loss_fn(preds.reshape(-1, preds.size(-1)),y.reshape(-1))
            loss.backward()
            optimizer.step()

            avg_loss+=loss.item()
        avg_loss/=len(train_loader)
        train_history.append(avg_loss)
        print(f"Loss: {avg_loss}")

        # Skip validation when the loader is empty (e.g. a validation split that
        # rounds down to zero samples) instead of dividing by zero below.
        if VAL_EPOCH and epoch%VAL_EPOCH==0 and len(val_loader)>0:
            eval_loss=0
            asr_avg=0
            model.eval()
            with torch.no_grad():
                for X,y in tqdm(val_loader,desc="Progress: "):
                    X=X.to(device)
                    y=y.to(device)
                    preds=model(X)
                    loss=loss_fn(preds.reshape(-1, preds.size(-1)),y.reshape(-1))

                    eval_loss+=loss.item()
                    asr_avg+=asr(preds,y).item()

                eval_loss/=len(val_loader)
                asr_avg/=len(val_loader)
                asr_history.append(asr_avg)
                val_history.append(eval_loss)
                print(f"Val Loss: {eval_loss}",end=' ')
                print(f"Val ASR: {asr_avg}")

        if SAVE_EPOCH and epoch%SAVE_EPOCH==0:
            print("Saving model")
            path=str(epoch)+'.pth'
            torch.save(model.state_dict(), path)
    torch.save(model.state_dict(),'final.pth')
    return train_history,val_history,asr_history

In [24]:
# Instantiate the network on the selected device and attach an Adam optimizer
# (library-default hyperparameters) to its registered parameters.
model = CaptchaCrackNet()
model = model.to(device)
optimizer = torch.optim.Adam(params=model.parameters())

In [None]:
# train() returns three histories; unpacking only two raised ValueError after the
# whole training run had finished and left asr_history undefined for later cells.
train_history,val_history,asr_history=train(model,train_loader,val_loader,optimizer,nn.CrossEntropyLoss(),EPOCHS)

In [None]:
import matplotlib.pyplot as plt

# train_history holds one value per epoch, val_history one value per validated
# epoch (every VAL_EPOCH epochs), so each curve is drawn at its real epoch number.
val_every = VAL_EPOCH or 1
train_epochs = range(1, len(train_history) + 1)
val_epochs = [val_every * k for k in range(1, len(val_history) + 1)]

fig, ax = plt.subplots()
ax.plot(train_epochs, train_history, label='train')
ax.plot(val_epochs, val_history, label='val')
ax.set_title("Loss")
ax.set_xlabel("Epoch")
ax.set_ylabel("Mean loss per batch")
ax.legend()
plt.show()

In [None]:
# One ASR value is recorded per validated epoch (every VAL_EPOCH epochs).
asr_every = VAL_EPOCH or 1
asr_epochs = [asr_every * k for k in range(1, len(asr_history) + 1)]

fig, ax = plt.subplots()
ax.plot(asr_epochs, asr_history)
ax.set_title("Validation ASR")
ax.set_xlabel("Epoch")
ax.set_ylabel("Mean fraction of characters predicted correctly")
plt.show()

In [None]:
# Vocabulary in index order: lowercase letters, then uppercase letters, then digits.
characters = string.ascii_letters + string.digits
idx_to_char = dict(enumerate(characters))


def to_text(arr):
    """Decode an iterable of scalar index tensors into the corresponding string."""
    return ''.join(idx_to_char[c.item()] for c in arr)

In [None]:
import matplotlib.pyplot as plt

# Put the model in inference mode explicitly: train() only switches to eval mode
# on validated epochs, so without this the dropout layer may still be active and
# the displayed prediction would vary from run to run.
model.eval()
with torch.no_grad():
    for X,y in train_loader:
        # Keep only the first image of the batch, shape (channels, height, width).
        X=X[0]

        # Input image, rearranged to (height, width, channels) for matplotlib.
        plt.imshow(X.numpy().transpose(1,2,0))
        plt.show()
        output=model(X.unsqueeze(0).to(device))
        # The same image after the AFFN stage alone.
        X1=model.affn(X.unsqueeze(0).to(device))
        plt.imshow(X1[0].cpu().numpy().transpose(1,2,0))
        plt.show()
        print(output.shape)
        # Most likely class per character position, decoded to text.
        print(to_text(output.squeeze(0).argmax(axis=1)))
        break