In [1]:
# combined.py
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from preprocess import preprocess_image  # Import from preprocess.py
from model import OCRModel  # Import from separate.py
from dataload import bangla_load, english_load

# Load Bangla and English data loaders
bangla_train_loader, bangla_val_loader, bangla_test_loader, bangla_num_classes = bangla_load
english_train_loader, english_val_loader, english_test_loader, english_num_classes = english_load

# Load pre-trained models with weights_only=True
bangla_model = OCRModel(bangla_num_classes)
english_model = OCRModel(english_num_classes)
bangla_model.load_state_dict(torch.load("saved_models/bangla_model.pth", weights_only=True)['model_state_dict'])
english_model.load_state_dict(torch.load("saved_models/english_model.pth", weights_only=True)['model_state_dict'])


Directory: Dataset/Bangla/Dataset/Train
Checked 12000 images in total.
Removed 0 unreadable images.
Removed 0 non-image files.
Total number of images: 12000
Total number of labels: 50

Directory: Dataset/Bangla/Dataset/Test
Checked 3000 images in total.
Removed 0 unreadable images.
Removed 0 non-image files.
Total number of images: 3000
Total number of labels: 50

Directory: Dataset/English/data/training_data
Checked 20628 images in total.
Removed 0 unreadable images.
Removed 0 non-image files.
Total number of images: 20628
Total number of labels: 36

Directory: Dataset/English/data/testing_data
Checked 1008 images in total.
Removed 0 unreadable images.
Removed 0 non-image files.
Total number of images: 1008
Total number of labels: 36


<All keys matched successfully>

In [2]:
# Freeze layers in both models up to the LSTM layers
for param in list(bangla_model.parameters())[:-4]:
    param.requires_grad = True
for param in list(english_model.parameters())[:-4]:
    param.requires_grad = True

In [3]:
from torch.utils.data import Dataset, DataLoader
import torch

class CustomDataset(Dataset):
    def __init__(self, data):
        # Data should be a list of tuples (features, labels)
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        features, labels = self.data[idx]
        return features, labels


class CombinedDataset(Dataset):
    def __init__(self, bangla_loader, english_loader):
        # Modify labels for both datasets to make them unique
        self.bangla_data = self.modify_label('b', bangla_loader)
        self.english_data = self.modify_label('e', english_loader)

        # Combine the datasets into a single dataset
        self.combined_data = self.bangla_data + self.english_data

        # Calculate unique labels
        self.unique_labels = self.calculate_unique_labels()
        print(f"Total number of unique labels: {len(self.unique_labels)}")

        # Create a label-to-index mapping
        self.label_mapping = self.create_label_mapping()

    def __len__(self):
        # Return the total length of the combined dataset
        return len(self.combined_data)

    def __getitem__(self, idx):
        # Fetch item from the combined dataset
        features, label = self.combined_data[idx]
        
        # Ensure features are tensors
        features = torch.tensor(features) if not isinstance(features, torch.Tensor) else features
        
        # Convert label to integer using the mapping
        label = self.label_mapping[label]
        label = torch.tensor(label)
        
        return features, label

    def modify_label(self, prefix, dataloader):
        # Create a new dataset with modified labels
        new_data = []
        for data, label in dataloader:
            # Add prefix to each label (e.g., 'b0', 'e1')
            modified_label = [f"{prefix}{lbl}" for lbl in label]
            # Append data and unique labels
            new_data.extend(zip(data, modified_label))
        return new_data
    
    def create_label_mapping(self):
        # Extract unique labels
        unique_labels = {item[1] for item in self.combined_data}
        # Create a mapping from label string to integer
        return {label: idx for idx, label in enumerate(unique_labels)}

    def calculate_unique_labels(self):
        # Extract unique labels from the combined data
        labels = [item[1] for item in self.combined_data]
        return set(labels)

    def get_num_classes(self):
        # Return the total number of unique classes
        return len(self.unique_labels)

# Combine datasets
combined_dataset = CombinedDataset(bangla_train_loader, english_train_loader)
combined_dataloader = DataLoader(combined_dataset, batch_size=32, shuffle=True)

# Combine datasets
combined_test_dataset = CombinedDataset(bangla_test_loader, english_test_loader)
combined_test_dataloader = DataLoader(combined_test_dataset, batch_size=32, shuffle=True)

# Get the number of classes
print(f"Number of classes: {combined_dataset.get_num_classes()}")


Total number of unique labels: 86
Total number of unique labels: 86
Number of classes: 86


In [4]:
from torch.utils.data import random_split, DataLoader

def split_dataloader(dataloader, train_ratio=0.8):
    
    # Access the dataset from the DataLoader
    dataset = dataloader.dataset
    batch_size = dataloader.batch_size

    # Calculate sizes for train and validation datasets
    total_size = len(dataset)
    train_size = int(total_size * train_ratio)
    val_size = total_size - train_size

    # Split the dataset
    train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

    # Create DataLoaders for the subsets
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

    return train_loader, val_loader

combined_train_loader, combined_val_loader = split_dataloader(combined_dataloader, train_ratio=0.8)

# Check sizes of datasets
print(f"Training set size: {len(combined_train_loader.dataset)}")
print(f"Validation set size: {len(combined_val_loader.dataset)}")

Training set size: 24796
Validation set size: 6200


In [5]:
# Combined model
class CombinedModel(nn.Module):
    def __init__(self, bangla_model, english_model, combined_classes):
        super(CombinedModel, self).__init__()
        self.bangla_feature_extractor = nn.Sequential(*list(bangla_model.children())[:-2])  # Up to LSTM
        self.bangla_lstm = list(bangla_model.children())[-2]  # LSTM layer
        self.english_feature_extractor = nn.Sequential(*list(english_model.children())[:-2])  # Up to LSTM
        self.english_lstm = list(english_model.children())[-2]  # LSTM layer

        
        # Joint layers
        self.fc1 = nn.Linear(512, 128)
        self.fc2 = nn.Linear(128, combined_classes)

    def forward(self, image_x):
        # Bangla feature extraction
        bangla_features = self.bangla_feature_extractor(image_x)  # 4D output
        bangla_features = bangla_features.permute(0, 2, 3, 1).reshape(bangla_features.size(0), -1, 1024)
        bangla_features, _ = self.bangla_lstm(bangla_features)  # LSTM processing
        
        # English feature extraction
        english_features = self.english_feature_extractor(image_x)  # 4D output
        english_features = english_features.permute(0, 2, 3, 1).reshape(english_features.size(0), -1, 1024)
        english_features, _ = self.english_lstm(english_features)  # LSTM processing

        # Concatenate features
        combined_features = torch.cat((bangla_features[:, -1, :], english_features[:, -1, :]), dim=1)

        # Fully connected layers
        x = torch.relu(self.fc1(combined_features))
        x = self.fc2(x)
        return x

In [6]:
class OCRTrainer:
    def __init__(self, model, train_loader, val_loader, test_loader, lr=0.001):
        
        self.train_loader = train_loader
        self.val_loader = val_loader
        self.test_loader = test_loader
        self.lr = lr

        # Determine device
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Assign model and move it to the device
        self.model = model.to(self.device)

        # Define loss and optimizer
        self.criterion = nn.CrossEntropyLoss()
        self.optimizer = optim.Adam(self.model.parameters(), lr=self.lr)

    def train_and_validate(self, epochs=10):
        
        history = {
            "train_loss": [],
            "train_accuracy": [],
            "val_loss": [],
            "val_accuracy": []
        }

        for epoch in range(epochs):
            # Training
            self.model.train()
            running_loss = 0.0
            correct_train = 0
            total_train = 0
            for images, labels in self.train_loader:
                images, labels = images.to(self.device), labels.to(self.device)
                self.optimizer.zero_grad()
                outputs = self.model(images)
                loss = self.criterion(outputs, labels)
                loss.backward()
                self.optimizer.step()
                running_loss += loss.item()

                # Training accuracy
                _, predicted = torch.max(outputs, 1)
                total_train += labels.size(0)
                correct_train += (predicted == labels).sum().item()

            train_loss = running_loss / len(self.train_loader)
            train_accuracy = 100 * correct_train / total_train

            # Validation
            self.model.eval()
            val_loss = 0.0
            correct_val = 0
            total_val = 0
            with torch.no_grad():
                for images, labels in self.val_loader:
                    images, labels = images.to(self.device), labels.to(self.device)
                    outputs = self.model(images)
                    loss = self.criterion(outputs, labels)
                    val_loss += loss.item()

                    # Validation accuracy
                    _, predicted = torch.max(outputs, 1)
                    total_val += labels.size(0)
                    correct_val += (predicted == labels).sum().item()

            val_loss = val_loss / len(self.val_loader)
            val_accuracy = 100 * correct_val / total_val

            # Append metrics to history
            history["train_loss"].append(train_loss)
            history["train_accuracy"].append(train_accuracy)
            history["val_loss"].append(val_loss)
            history["val_accuracy"].append(val_accuracy)

            print(f"Epoch {epoch+1}: Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.2f}%, "
                  f"Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.2f}%")

        return history

    def test(self):
        
        self.model.eval()
        test_loss = 0.0
        correct_test = 0
        total_test = 0
        with torch.no_grad():
            for images, labels in self.test_loader:
                images, labels = images.to(self.device), labels.to(self.device)
                outputs = self.model(images)
                loss = self.criterion(outputs, labels)
                test_loss += loss.item()

                # Test accuracy
                _, predicted = torch.max(outputs, 1)
                total_test += labels.size(0)
                correct_test += (predicted == labels).sum().item()

        test_loss = test_loss / len(self.test_loader)
        test_accuracy = 100 * correct_test / total_test
        print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.2f}%")
        return test_loss, test_accuracy


In [7]:
# Initialize combined model
combined_classes = combined_dataset.get_num_classes()
combined_model = CombinedModel(bangla_model, english_model, combined_classes)

# Define training settings for the combined model
combined_optimizer = optim.Adam(filter(lambda p: p.requires_grad, combined_model.parameters()), lr=0.001)
criterion = nn.CrossEntropyLoss()

In [8]:
# Common data
lr = 0.0001
epochs = 50

# Initialize the trainer
combined_trainer = OCRTrainer(combined_model, combined_train_loader, combined_val_loader, combined_test_dataloader, lr=lr)

# Train and validate
combined_history = combined_trainer.train_and_validate(epochs=epochs)

# Test
test_loss, test_accuracy = combined_trainer.test()

Epoch 1: Train Loss: 2.7622, Train Accuracy: 28.00%, Val Loss: 1.9742, Val Accuracy: 43.63%
Epoch 2: Train Loss: 1.6232, Train Accuracy: 53.14%, Val Loss: 1.3964, Val Accuracy: 58.26%
Epoch 3: Train Loss: 1.2335, Train Accuracy: 63.18%, Val Loss: 1.3562, Val Accuracy: 58.10%
Epoch 4: Train Loss: 1.0878, Train Accuracy: 67.22%, Val Loss: 1.0598, Val Accuracy: 67.73%
Epoch 5: Train Loss: 0.8792, Train Accuracy: 72.92%, Val Loss: 0.9159, Val Accuracy: 70.82%
Epoch 6: Train Loss: 0.7981, Train Accuracy: 75.16%, Val Loss: 0.8818, Val Accuracy: 72.48%
Epoch 7: Train Loss: 0.7275, Train Accuracy: 77.46%, Val Loss: 0.7860, Val Accuracy: 75.39%
Epoch 8: Train Loss: 0.6570, Train Accuracy: 79.42%, Val Loss: 0.7397, Val Accuracy: 76.44%
Epoch 9: Train Loss: 0.5888, Train Accuracy: 81.57%, Val Loss: 0.7093, Val Accuracy: 77.39%
Epoch 10: Train Loss: 0.5624, Train Accuracy: 82.49%, Val Loss: 0.7159, Val Accuracy: 77.94%
Epoch 11: Train Loss: 0.4976, Train Accuracy: 84.22%, Val Loss: 0.6435, Val Acc

In [9]:
import os
import torch
import matplotlib.pyplot as plt
import random
import seaborn as sns
from sklearn.metrics import confusion_matrix, classification_report, roc_curve, auc
import numpy as np
from sklearn.preprocessing import label_binarize


class Evaluator:
    def __init__(self, model, test_loader, num_classes, output_dir="evaluation_outputs"):
        """
        Initializes the evaluator.

        Args:
            model (torch.nn.Module): Trained model to evaluate.
            test_loader (DataLoader): DataLoader for the test dataset.
            num_classes (int): Number of classes in the dataset.
            output_dir (str): Directory to save evaluation plots and metrics.
        """
        self.model = model
        self.test_loader = test_loader
        self.num_classes = num_classes
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.output_dir = output_dir

        # Create output directory if it doesn't exist
        os.makedirs(output_dir, exist_ok=True)

    def evaluate_and_save(self):
        """
        Evaluates the model on test data and saves visualizations and metrics to disk.
        """
        self.model.eval()
        all_images = []
        all_labels = []
        all_predictions = []
        all_probabilities = []

        with torch.no_grad():
            for images, labels in self.test_loader:
                images, labels = images.to(self.device), labels.to(self.device)
                outputs = self.model(images)
                probabilities = torch.softmax(outputs, dim=1)
                _, predictions = torch.max(outputs, 1)

                # Collect all images, labels, and predictions
                all_images.extend(images.cpu())  # Collect all test images
                all_labels.extend(labels.cpu().numpy())
                all_predictions.extend(predictions.cpu().numpy())
                all_probabilities.extend(probabilities.cpu().numpy())

        # Convert collected data to NumPy arrays
        all_labels = np.array(all_labels)
        all_predictions = np.array(all_predictions)
        all_probabilities = np.array(all_probabilities)

        # Save sample predictions
        self._save_sample_predictions(all_images, all_labels, all_predictions)

        # Save confusion matrix
        self._save_confusion_matrix(all_labels, all_predictions)

        # Save classification report
        self._save_classification_report(all_labels, all_predictions)

        # Save ROC curve
        self._save_roc_curve(all_labels, all_probabilities)

    def _save_sample_predictions(self, images, labels, predictions):
        """
        Randomly selects test images until there are images of 5 unique labels 
        and saves them along with their predictions and true labels to a file.
        """
        max_samples = 5  # Limit the number of unique labels
        unique_labels = {}
        selected_indices = []
        total_samples = len(labels)

        # Randomly shuffle indices
        indices = list(range(total_samples))
        random.shuffle(indices)

        # Select images with unique labels
        for idx in indices:
            label = labels[idx]
            if label not in unique_labels:
                unique_labels[label] = True
                selected_indices.append(idx)
                if len(unique_labels) == max_samples:
                    break

        # Plot and save the selected samples
        plt.figure(figsize=(15, 5))
        for i, idx in enumerate(selected_indices):
            plt.subplot(1, len(selected_indices), i + 1)
            plt.imshow(images[idx].squeeze(), cmap='gray')
            plt.title(f"True: {labels[idx]}\nPred: {predictions[idx]}")
            plt.axis('off')

        plt.tight_layout()
        file_path = os.path.join(self.output_dir, "sample_predictions.png")
        plt.savefig(file_path)
        plt.close()




    def _save_confusion_matrix(self, labels, predictions):
        """
        Saves the confusion matrix to a file.
        """
        cm = confusion_matrix(labels, predictions)
        plt.figure(figsize=(10, 8))
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=True, yticklabels=True)
        plt.title("Confusion Matrix")
        plt.xlabel("Predicted")
        plt.ylabel("Actual")
        file_path = os.path.join(self.output_dir, "confusion_matrix.png")
        plt.savefig(file_path)
        plt.close()

    def _save_classification_report(self, labels, predictions):
        """
        Saves the classification report to a text file.
        """
        report = classification_report(labels, predictions)
        file_path = os.path.join(self.output_dir, "classification_report.txt")
        with open(file_path, "w") as f:
            f.write(report)

    def _save_roc_curve(self, labels, probabilities):
        # Binarize labels for multi-class ROC calculation
        labels_binarized = label_binarize(labels, classes=range(self.num_classes))
        fpr, tpr, _ = roc_curve(labels_binarized.ravel(), probabilities.ravel())
        roc_auc = auc(fpr, tpr)

        # Plot the ROC curve
        plt.figure(figsize=(10, 8))
        plt.plot(fpr, tpr, label=f"Combined Classes (AUC = {roc_auc:.2f})")
        plt.plot([0, 1], [0, 1], 'k--')  # Diagonal line
        plt.title("ROC Curve (All Classes)")
        plt.xlabel("False Positive Rate")
        plt.ylabel("True Positive Rate")
        plt.legend(loc="lower right")
        file_path = os.path.join(self.output_dir, "roc_curve.png")
        plt.savefig(file_path)
        plt.close()

In [10]:
bangla_evaluator = Evaluator(combined_model, combined_test_dataloader, combined_classes, output_dir="evaluation_outputs")
bangla_evaluator.evaluate_and_save()