In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
from torchvision.models import resnet18, ResNet18_Weights
import cv2
import numpy as np
import json
import os
from tqdm import tqdm
import warnings
warnings.filterwarnings("ignore")

In [2]:
class HandwritingDataset(Dataset):
    """Image/transcription pairs for character-level handwriting recognition.

    The annotations file is a JSON list of records. Only records whose
    ``status`` is ``'success'`` and whose ``text`` is non-blank are kept.
    A character vocabulary is built from the kept transcriptions, with
    index 0 reserved for the ``'<pad>'`` token.

    Args:
        image_dir: Directory holding ``<stem>_binary_adaptive.png`` images,
            where ``<stem>`` is a record's ``filename`` without its extension.
        annotations_file: Path to the JSON annotations file.
        max_length: Transcriptions are truncated / zero-padded to this many
            characters.
    """

    def __init__(self, image_dir, annotations_file, max_length=128):
        self.image_dir = image_dir
        self.max_length = max_length

        # Explicit UTF-8 so non-ASCII transcriptions decode identically on
        # every platform instead of depending on the locale's default codec.
        with open(annotations_file, 'r', encoding='utf-8') as f:
            self.annotations = json.load(f)

        self.data = [
            item for item in self.annotations
            if item['status'] == 'success' and len(item['text'].strip()) > 0
        ]

        self.create_char_mappings()

        # Resize to the 224x224 input and apply ImageNet (RGB) statistics.
        self.transform = transforms.Compose([
            transforms.ToPILImage(),
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(
                mean=[0.485, 0.456, 0.406],
                std=[0.229, 0.224, 0.225]
            )
        ])

    def create_char_mappings(self):
        """Build ``char_to_idx`` / ``idx_to_char`` from every kept transcription.

        Characters are sorted so the mapping is deterministic for a given set
        of transcriptions; indices start at 1 because 0 is the padding index.
        """
        all_chars = set()
        for item in self.data:
            all_chars.update(item['text'])

        self.char_to_idx = {char: idx + 1 for idx, char in enumerate(sorted(all_chars))}
        self.char_to_idx['<pad>'] = 0
        self.idx_to_char = {idx: char for char, idx in self.char_to_idx.items()}
        self.vocab_size = len(self.char_to_idx)

        print(f"Vocabulary size: {self.vocab_size}")

    def __len__(self):
        """Return the number of usable (filtered) samples."""
        return len(self.data)

    def _encode_text(self, text):
        """Truncate ``text`` to ``max_length`` and map it to padded indices.

        Returns:
            ``(indices, length)`` where ``indices`` is a list of exactly
            ``max_length`` ints (0-padded) and ``length`` is the number of
            real characters kept after truncation.
        """
        text = text[:self.max_length]
        indices = [self.char_to_idx[c] for c in text]
        indices.extend([0] * (self.max_length - len(indices)))
        return indices, len(text)

    def __getitem__(self, idx):
        """Load one sample.

        Returns:
            dict with ``'image'`` (normalised float tensor), ``'text'``
            (``LongTensor`` of length ``max_length``) and ``'length'``
            (number of non-padding characters).

        Raises:
            ValueError: If the image file cannot be read.
        """
        item = self.data[idx]

        image_path = os.path.join(
            self.image_dir,
            f"{os.path.splitext(item['filename'])[0]}_binary_adaptive.png"
        )
        image = cv2.imread(image_path)

        if image is None:
            raise ValueError(f"Could not load image: {image_path}")

        if len(image.shape) == 2:
            image = cv2.cvtColor(image, cv2.COLOR_GRAY2RGB)
        else:
            # OpenCV decodes colour images in BGR order, while the
            # normalisation constants above are per-channel RGB statistics.
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        image = self.transform(image)
        text_indices, length = self._encode_text(item['text'])

        return {
            'image': image,
            'text': torch.tensor(text_indices, dtype=torch.long),
            'length': length
        }

In [3]:
class HandwritingRecognitionModel(nn.Module):
    """ResNet-18 image encoder followed by a bidirectional LSTM character head.

    The image is reduced to a single feature vector, which is repeated for
    every output position and passed through a 2-layer BiLSTM; a linear layer
    then produces per-position scores over the vocabulary.

    Args:
        vocab_size: Number of output classes per position (including padding).
        hidden_size: Width of the projected image feature and of each LSTM
            direction.
        sequence_length: Number of output positions produced per image.
        pretrained: When True (default) the backbone starts from the default
            ImageNet weights. Pass False to skip loading them, e.g. when a
            full state dict is loaded immediately afterwards.
    """

    def __init__(self, vocab_size, hidden_size=256, sequence_length=128, pretrained=True):
        super().__init__()
        self.sequence_length = sequence_length

        backbone_weights = ResNet18_Weights.DEFAULT if pretrained else None
        self.cnn = resnet18(weights=backbone_weights)
        # Read the backbone's feature width before dropping its classifier so
        # the projection below does not rely on a hard-coded constant.
        feature_dim = self.cnn.fc.in_features
        self.cnn.fc = nn.Identity()

        self.feature_processor = nn.Sequential(
            nn.Linear(feature_dim, hidden_size),
            nn.ReLU(),
            nn.Dropout(0.5)
        )

        self.rnn = nn.LSTM(
            input_size=hidden_size,
            hidden_size=hidden_size,
            num_layers=2,
            bidirectional=True,
            batch_first=True
        )

        # Bidirectional LSTM concatenates both directions -> 2 * hidden_size.
        self.fc = nn.Linear(hidden_size * 2, vocab_size)
        # NOTE(review): this layer is not applied in forward(); it is kept
        # only so the module's attributes stay unchanged.
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        """Compute per-position vocabulary logits.

        Args:
            x: Image batch; assumed to be (batch, 3, H, W) as expected by the
                ResNet backbone — TODO confirm against the data pipeline.

        Returns:
            Tensor of shape (batch, sequence_length, vocab_size).
        """
        features = self.cnn(x)
        features = self.feature_processor(features)

        # One image vector per sample is tiled across all output positions.
        features = features.unsqueeze(1).repeat(1, self.sequence_length, 1)

        rnn_out, _ = self.rnn(features)
        logits = self.fc(rnn_out)

        return logits

In [4]:
def train_epoch(model, train_loader, criterion, optimizer, device):
    """Run one optimisation pass over ``train_loader``.

    Each batch is a dict with ``'image'`` and ``'text'`` entries. The model's
    (batch, steps, classes) output is flattened so the criterion sees one row
    per output position.

    Returns:
        Sum of the per-batch losses divided by ``len(train_loader)``.
    """
    model.train()
    running_loss = 0

    with tqdm(train_loader, desc='Training') as progress:
        for sample in progress:
            inputs = sample['image'].to(device)
            targets = sample['text'].to(device)

            optimizer.zero_grad()

            scores = model(inputs)

            # Collapse batch and position axes for the per-position loss.
            n_batch, n_steps, n_classes = scores.shape
            flat_scores = scores.view(n_batch * n_steps, n_classes)
            flat_targets = targets.view(-1)
            batch_loss = criterion(flat_scores, flat_targets)

            batch_loss.backward()
            optimizer.step()

            loss_value = batch_loss.item()
            running_loss += loss_value
            progress.set_postfix({'loss': loss_value})

    return running_loss / len(train_loader)

def validate(model, val_loader, criterion, device):
    """Evaluate ``model`` on ``val_loader`` without updating any weights.

    The model is switched to eval mode and gradients are disabled. Each batch
    is a dict with ``'image'`` and ``'text'`` entries.

    Returns:
        Sum of the per-batch losses divided by ``len(val_loader)``.
    """
    model.eval()
    running_loss = 0

    with torch.no_grad():
        for sample in val_loader:
            inputs = sample['image'].to(device)
            targets = sample['text'].to(device)

            scores = model(inputs)

            # One row per output position, matching the flattened targets.
            n_batch, n_steps, n_classes = scores.shape
            flat_scores = scores.view(n_batch * n_steps, n_classes)
            batch_loss = criterion(flat_scores, targets.view(-1))

            running_loss += batch_loss.item()

    return running_loss / len(val_loader)

def decode_prediction(logits, idx_to_char):
    """Greedy-decode a batch of logits into strings.

    The highest-scoring class is taken at every position; positions whose
    class index is 0 (padding) are dropped and the rest are mapped through
    ``idx_to_char`` and concatenated.

    Returns:
        One decoded string per batch element.
    """
    best_indices = torch.argmax(logits, dim=-1).tolist()
    return [
        ''.join(idx_to_char[class_id] for class_id in row if class_id != 0)
        for row in best_indices
    ]

def main():
    """Train the handwriting model and checkpoint the best epoch.

    Loads the dataset from ``output/``, makes a seeded 80/20 train/validation
    split, trains for a fixed number of epochs with Adam and a
    reduce-on-plateau schedule, and writes ``best_model.pth`` whenever the
    validation loss improves. A few greedy-decoded validation predictions are
    printed after each save.

    Raises:
        ValueError: If the dataset is too small to produce a non-empty
            training split.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Using device: {device}")

    data_dir = "output"
    image_dir = os.path.join(data_dir, "processed_images")
    annotations_file = os.path.join(data_dir, "ocr/detailed_results.json")

    dataset = HandwritingDataset(image_dir, annotations_file)
    print(f"Total samples: {len(dataset)}")

    train_size = int(0.8 * len(dataset))
    val_size = len(dataset) - train_size
    if train_size == 0:
        # Fail early with a clear message rather than deep inside DataLoader.
        raise ValueError(
            f"Need at least 2 usable samples to train, found {len(dataset)}"
        )

    # Fixed seed so the same samples land in train/val on every run.
    split_generator = torch.Generator().manual_seed(42)
    train_dataset, val_dataset = torch.utils.data.random_split(
        dataset, [train_size, val_size], generator=split_generator
    )

    train_loader = DataLoader(
        train_dataset,
        batch_size=16,
        shuffle=True,
        num_workers=0
    )

    val_loader = DataLoader(
        val_dataset,
        batch_size=16,
        shuffle=False,
        num_workers=0
    )

    model = HandwritingRecognitionModel(
        vocab_size=dataset.vocab_size,
        sequence_length=dataset.max_length
    ).to(device)

    # Index 0 is the padding token, so padded positions do not contribute.
    criterion = nn.CrossEntropyLoss(ignore_index=0)
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    # The scheduler's `verbose` flag is deprecated in recent PyTorch, so
    # learning-rate changes are reported explicitly in the loop below.
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, mode='min', factor=0.5, patience=2
    )

    num_epochs = 10
    best_val_loss = float('inf')

    for epoch in range(num_epochs):
        print(f"\nEpoch {epoch+1}/{num_epochs}")

        train_loss = train_epoch(model, train_loader, criterion, optimizer, device)

        val_loss = validate(model, val_loader, criterion, device)

        print(f"Train Loss: {train_loss:.4f}")
        print(f"Val Loss: {val_loss:.4f}")

        lr_before = optimizer.param_groups[0]['lr']
        scheduler.step(val_loss)
        lr_after = optimizer.param_groups[0]['lr']
        if lr_after != lr_before:
            print(f"Reducing learning rate from {lr_before:.4e} to {lr_after:.4e}")

        if val_loss < best_val_loss:
            best_val_loss = val_loss
            # The character maps are stored with the weights so the checkpoint
            # carries the vocabulary it was trained with.
            torch.save({
                'epoch': epoch,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'val_loss': val_loss,
                'char_to_idx': dataset.char_to_idx,
                'idx_to_char': dataset.idx_to_char
            }, 'best_model.pth')
            print("Saved best model")

            with torch.no_grad():
                model.eval()
                batch = next(iter(val_loader))
                images = batch['image'].to(device)
                logits = model(images)
                predictions = decode_prediction(logits, dataset.idx_to_char)
                print("\nSample Predictions:")
                for i, pred in enumerate(predictions[:3]):  # Show first 3 predictions
                    print(f"Predicted: {pred}")

In [None]:
# Entry point: start training when this cell/script runs as the main module.
if __name__ == "__main__":
    main()

Using device: cuda
Vocabulary size: 112
Total samples: 427

Epoch 1/10


Training:  41%|████      | 9/22 [00:13<00:18,  1.46s/it, loss=2.75]


In [None]:
import torch
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm
from sklearn.metrics import confusion_matrix
import Levenshtein
import json
from collections import defaultdict
import os

class ModelAnalyzer:
    """Evaluate a saved handwriting-recognition checkpoint and write a report.

    The analyzer rebuilds the model, loads the checkpoint weights, greedy-
    decodes every batch of ``test_loader`` and compares predictions with the
    ground-truth transcriptions.
    """

    def __init__(self, model_path, test_loader, device, idx_to_char):
        """
        Initialize the analyzer with model and data.

        Args:
            model_path: Path to a checkpoint containing ``'model_state_dict'``.
            test_loader: Iterable of batches with ``'image'`` and ``'text'``.
            device: Device the model and images are moved to.
            idx_to_char: Mapping from class index to character; index 0 is
                treated as padding and skipped when decoding.
        """
        # map_location lets a checkpoint saved on GPU load on a CPU-only host.
        self.checkpoint = torch.load(model_path, map_location=device)
        self.model = HandwritingRecognitionModel(
            vocab_size=len(idx_to_char),
            sequence_length=128
        ).to(device)
        self.model.load_state_dict(self.checkpoint['model_state_dict'])
        self.model.eval()

        self.test_loader = test_loader
        self.device = device
        self.idx_to_char = idx_to_char

    @staticmethod
    def _json_default(value):
        """Convert NumPy scalars to builtins so ``json.dump`` accepts them."""
        if hasattr(value, 'item'):
            return value.item()
        raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")

    def _indices_to_text(self, indices):
        """Map an iterable of int class indices to text, dropping padding (0)."""
        return ''.join(self.idx_to_char[i] for i in indices if i != 0)

    def decode_text(self, tensor):
        """Convert one sample's (positions, classes) logits to text."""
        return self._indices_to_text(tensor.argmax(dim=-1).tolist())

    def calculate_text_metrics(self, pred_text, true_text):
        """Calculate various text similarity metrics"""
        return {
            'levenshtein_distance': Levenshtein.distance(pred_text, true_text),
            'similarity_ratio': Levenshtein.ratio(pred_text, true_text),
            'length_diff': abs(len(pred_text) - len(true_text)),
            'exact_match': pred_text == true_text
        }

    def analyze_model_performance(self):
        """Run the model over ``test_loader`` and collect per-sample metrics.

        Character accuracy is positional: predicted and true strings are
        compared index by index up to the shorter of the two.

        Returns:
            ``(results_df, character_errors, char_accuracy)`` where
            ``results_df`` has one row per sample, ``character_errors`` counts
            ``"true->pred"`` confusions, and ``char_accuracy`` is 0.0 when no
            characters could be compared.
        """
        result_columns = [
            'predicted_text', 'true_text', 'text_length',
            'levenshtein_distance', 'similarity_ratio',
            'length_diff', 'exact_match',
        ]
        results = []
        character_errors = defaultdict(int)
        total_chars = 0
        correct_chars = 0

        with torch.no_grad():
            for batch in tqdm(self.test_loader, desc="Analyzing performance"):
                images = batch['image'].to(self.device)
                true_texts = batch['text']

                logits = self.model(images)

                for i in range(len(images)):
                    pred_text = self.decode_text(logits[i])
                    true_text = self._indices_to_text(true_texts[i].tolist())

                    metrics = self.calculate_text_metrics(pred_text, true_text)

                    for pred_char, true_char in zip(pred_text, true_text):
                        total_chars += 1
                        if pred_char == true_char:
                            correct_chars += 1
                        else:
                            character_errors[f"{true_char}->{pred_char}"] += 1

                    results.append({
                        'predicted_text': pred_text,
                        'true_text': true_text,
                        'text_length': len(true_text),
                        **metrics
                    })

        # Empty predictions (or an empty loader) leave nothing to compare.
        char_accuracy = correct_chars / total_chars if total_chars else 0.0
        results_df = pd.DataFrame(results, columns=result_columns)
        return results_df, character_errors, char_accuracy

    def generate_analysis_report(self, save_dir='analysis_results'):
        """Generate an analysis report with visualizations.

        Writes ``basic_stats.json``, ``analysis_plots.png``,
        ``error_types.csv`` and ``example_cases.json`` into ``save_dir``.

        Returns:
            ``(basic_stats, results_df)``.
        """
        os.makedirs(save_dir, exist_ok=True)

        print("Analyzing model performance...")
        results_df, char_errors, char_accuracy = self.analyze_model_performance()

        basic_stats = {
            'Total Samples': len(results_df),
            'Average Levenshtein Distance': float(results_df['levenshtein_distance'].mean()),
            'Average Similarity Ratio': float(results_df['similarity_ratio'].mean()),
            'Exact Match Rate': float(results_df['exact_match'].mean()),
            'Character Accuracy': float(char_accuracy)
        }

        with open(os.path.join(save_dir, 'basic_stats.json'), 'w') as f:
            json.dump(basic_stats, f, indent=4, default=self._json_default)

        # Explicit figure/axes instead of the implicit pyplot "current axes".
        fig, axes = plt.subplots(2, 2, figsize=(15, 10))

        sns.histplot(data=results_df, x='levenshtein_distance', bins=30, ax=axes[0, 0])
        axes[0, 0].set_title('Distribution of Levenshtein Distances')

        sns.scatterplot(data=results_df, x='text_length', y='similarity_ratio', ax=axes[0, 1])
        axes[0, 1].set_title('Text Length vs Similarity Ratio')

        char_errors_df = pd.DataFrame(
            list(char_errors.items()),
            columns=['Error_Type', 'Count']
        ).sort_values('Count', ascending=False).head(10)

        if not char_errors_df.empty:
            sns.barplot(data=char_errors_df, x='Error_Type', y='Count', ax=axes[1, 0])
            axes[1, 0].tick_params(axis='x', rotation=45)
        axes[1, 0].set_title('Top 10 Character Error Types')

        sns.histplot(data=results_df, x='length_diff', bins=20, ax=axes[1, 1])
        axes[1, 1].set_title('Distribution of Length Differences')

        fig.tight_layout()
        fig.savefig(os.path.join(save_dir, 'analysis_plots.png'))
        plt.close(fig)

        error_analysis = results_df[~results_df['exact_match'].astype(bool)].copy()
        # A plain list avoids DataFrame.apply(axis=1), whose result is not a
        # column-shaped Series when there are no error rows.
        error_analysis['error_type'] = [
            self.categorize_error(pred, true)
            for pred, true in zip(
                error_analysis['predicted_text'], error_analysis['true_text']
            )
        ]

        error_summary = error_analysis['error_type'].value_counts()
        error_summary.to_csv(os.path.join(save_dir, 'error_types.csv'))

        examples = results_df.sort_values('similarity_ratio')
        worst_examples = examples.head(10)
        best_examples = examples.tail(10)

        examples_report = {
            'worst_cases': worst_examples.to_dict('records'),
            'best_cases': best_examples.to_dict('records')
        }

        with open(os.path.join(save_dir, 'example_cases.json'), 'w') as f:
            json.dump(examples_report, f, indent=4, default=self._json_default)

        return basic_stats, results_df

    def categorize_error(self, pred, true):
        """Categorize an error by comparing string lengths only.

        Longer prediction -> 'insertion', shorter -> 'deletion', equal
        length -> 'substitution'.
        """
        if len(pred) > len(true):
            return 'insertion'
        elif len(pred) < len(true):
            return 'deletion'
        else:
            return 'substitution'

def analyze_model_results():
    """Load ``best_model.pth``, run the analyzer and print a summary.

    The dataset is rebuilt from ``output/`` and passed through
    ``ModelAnalyzer.generate_analysis_report``; the resulting statistics are
    printed, with counts shown as integers and ratios with four decimals.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model_path = 'best_model.pth'

    # NOTE(review): this loads every annotated sample, not a held-out split,
    # so the reported metrics may include data the model was trained on —
    # confirm whether that is intended.
    test_dataset = HandwritingDataset(
        image_dir="output/processed_images",
        annotations_file="output/ocr/detailed_results.json"
    )

    test_loader = DataLoader(
        test_dataset,
        batch_size=16,
        shuffle=False,
        num_workers=0
    )

    analyzer = ModelAnalyzer(
        model_path=model_path,
        test_loader=test_loader,
        device=device,
        idx_to_char=test_dataset.idx_to_char
    )

    print("Generating analysis report...")
    stats, results = analyzer.generate_analysis_report()

    print("\nModel Performance Summary:")
    print("-" * 50)
    for key, value in stats.items():
        if isinstance(value, int):
            # Counts such as the sample total read better without decimals.
            print(f"{key}: {value}")
        else:
            print(f"{key}: {value:.4f}")

    print("\nDetailed results saved in 'analysis_results' directory")

# Entry point: run the analysis when this cell/script runs as the main module.
if __name__ == "__main__":
    analyze_model_results()