Data Setup

In [29]:
import shutil
import os
from pathlib import Path
import zipfile as zipfile


# Destination directories for storing the downloaded data
supervised_dest_dir = 'supervised'
unsupervised_synthetic_dest_dir = 'unsupervised_synthetic'
unsupervised_dest_dir = 'unsupervised'

# Create destination directories if they don't exist
os.makedirs(supervised_dest_dir, exist_ok=True)
os.makedirs(unsupervised_synthetic_dest_dir, exist_ok=True)
os.makedirs(unsupervised_dest_dir, exist_ok=True)

# Function to download a folder from Google Drive
def download_folder(zip_name,destination):

    with zipfile.ZipFile(zip_name, 'r') as zip_ref:
        zip_ref.extractall(destination)

    # Move the contents of the extracted folder to the destination
    extracted_folder = os.path.join(destination, Path(zip_name).stem)
    for item in os.listdir(extracted_folder):
        s = os.path.join(extracted_folder, item)
        d = os.path.join(destination, item)
        if os.path.isdir(s):
            shutil.move(s, d)
        else:
            shutil.copy2(s, d)

    # Clean up temporary files
    #os.remove(zip_name)
    shutil.rmtree(extracted_folder)

# Download and organize the supervised dataset
#download_folder('supervised_data.zip', supervised_dest_dir)

# Download and organize the unsupervised dataset
download_folder("unsupervised_data.zip", unsupervised_dest_dir)

#download_folder("unsupervised_synthetic_data.zip", unsupervised_synthetic_dest_dir)


In [None]:
import os
import hashlib
import shutil

def get_file_checksum(file_path):
    """Calculate the checksum of a file."""
    sha256_hash = hashlib.sha256()
    with open(file_path, "rb") as f:
        # Read and update hash string value in blocks of 4K
        for byte_block in iter(lambda: f.read(4096), b""):
            sha256_hash.update(byte_block)
    return sha256_hash.hexdigest()

def remove_duplicate_images(folder_path):
    """Remove duplicate images in a folder."""
    # Dictionary to store checksums and corresponding file paths
    checksums = {}

    # List all files in the folder
    files = os.listdir(folder_path)

    for file_name in files:
        file_path = os.path.join(folder_path, file_name)

        # Check if the file is a regular file and not a directory
        if os.path.isfile(file_path):
            # Calculate the checksum of the file
            checksum = get_file_checksum(file_path)

            # Check if the checksum is already in the dictionary
            if checksum in checksums:
                # If a duplicate is found, remove the file
                print(f"Removing duplicate: {file_path}")
                os.remove(file_path)
            else:
                # Add the checksum to the dictionary
                checksums[checksum] = file_path

def rename_files(folder_path):
    """Rename files with '(1)' in their names."""
    # List all files in the folder
    files = os.listdir(folder_path)

    for file_name in files:
        file_path = os.path.join(folder_path, file_name)

        # Check if the file is a regular file and not a directory
        if os.path.isfile(file_path):
            # Check if the file name contains '(1)'
            if '(1)' in file_name:
                # Rename the file by removing '(1)'
                new_file_name = file_name.replace('(1)', '')
                new_file_path = os.path.join(folder_path, new_file_name)
                os.rename(file_path, new_file_path)
                print(f"Renamed: {file_path} to {new_file_path}")

if __name__ == "__main__":
    folder_path = r"unsupervised_synthetic\times_alphabet_images_rotated"
    remove_duplicate_images(folder_path)
    #rename_files(folder_path)


In [1]:
import torch
from torch import nn

Dataset builder- Supervised

In [2]:
import torch

def word_to_tensor(word):
    letter_count = [0] * 52
    for char in word:
        if 'a' <= char <= 'z':
            letter_count[ord(char) - ord('a')] += 1
        elif 'A' <= char <= 'Z':
            letter_count[ord(char) - ord('A') + 26] += 1
    return torch.tensor(letter_count, dtype=torch.float32)


In [3]:
import os
import csv
from torch.utils.data import Dataset, DataLoader
from PIL import Image
from torchvision import transforms


class SupervisedDataset(Dataset):
    def __init__(self, root_dir, labels_path):
        self.root_dir = root_dir
        self.labels_path = labels_path
        self.data = []
        with open(labels_path, newline="") as labels_file:
            labels_reader = csv.reader(labels_file)
            for row in labels_reader:
                self.data.append(row)  # a list of [filename, [chars in image]]

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        img_path = os.path.join(self.root_dir, self.data[idx][0])

        image = Image.open(img_path).convert("RGB")

        label_text = self.data[idx][1]

        # Convert label text to array of letter counts
        label_tensor = word_to_tensor(label_text)

        return image, label_tensor


# Set your root directory
root_dir = "supervised"
# subfolder = 'supervised_data'

# Create datasets for each model
models = [
    "arial",
    "bradhitc",
    "century_schoolbook",
    "comic",
    "cour",
    "papyrus",
    "times",
]
train_datasets, val_datasets = [], []

for model in models:
    model_dir = os.path.join(root_dir, f"{model}_images")
    labels_path = os.path.join(root_dir, f"{model}.csv")
    all_data = SupervisedDataset(model_dir, labels_path)

    # Split data into training and validation sets
    train_size = 1000
    val_size = 100
    train_data, val_data = torch.utils.data.random_split(
        all_data, [train_size, val_size]
    )

    train_datasets.append(train_data)
    val_datasets.append(val_data)

# Create DataLoaders
train_loaders = [
    DataLoader(dataset, batch_size=32, shuffle=True) for dataset in train_datasets
]
val_loaders = [
    DataLoader(dataset, batch_size=32, shuffle=False) for dataset in val_datasets
]

Dataset builder - Unsupervised

In [4]:
from torchvision import transforms
from torch.utils.data import ConcatDataset, Dataset, DataLoader
from PIL import Image
import os

class CustomFontDataset(Dataset):
    def __init__(self, root, transform=None):
        self.root = root
        
        self.transform = transform
        self.unsupervised_data = self.load_unsupervised_data()
        self.synthetic_data = self.load_synthetic_data()

    def load_unsupervised_data(self):
        unsupervised_path = os.path.join(self.root, 'unsupervised')
        unsupervised_images = [os.path.join(unsupervised_path, img) for img in os.listdir(unsupervised_path)]
        return unsupervised_images

    def load_synthetic_data(self):
        synthetic_path = os.path.join(self.root, f'unsupervised_synthetic')
        synthetic_images = [os.path.join(synthetic_path, img) for img in os.listdir(synthetic_path)]
        return synthetic_images

    def __len__(self):
        return len(self.unsupervised_data) + len(self.synthetic_data)

    def __getitem__(self, index):
        if index < len(self.unsupervised_data):
            img_path = self.unsupervised_data[index]
            label = 0  # You can set the label for unsupervised data to 0 or any other value
        else:
            adjusted_index = index - len(self.unsupervised_data)
            img_path = self.synthetic_data[adjusted_index]
            label = 1  # You can set the label for synthetic data to 1 or any other value

        image = Image.open(img_path).convert('RGB')
        
        if self.transform:
            image = self.transform(image)

        return image, label

# Define the transform (you can customize this based on your needs)
transform = transforms.Compose([transforms.ToTensor()])
transform = transforms.Compose([
            transforms.Resize((50, 50)),
            transforms.ToTensor(),
        ])
# Set root to the current working directory
current_working_directory = os.getcwd()

# Create datasets for each font
dataset = CustomFontDataset(root=current_working_directory, transform=transform)

# Concatenate all datasets into one


# Create a dataloader for the combined dataset
batch_size = 32
unsupervised_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# Access the combined dataloader
print(f"Number of samples in the combined dataloader: {len(unsupervised_loader.dataset)}")

Number of samples in the combined dataloader: 16120


In [23]:
# Stacked Convolutional Auto-Encoder (the unsupervised sub-network)
class SCAE(nn.Module):
  def __init__(self, num_channels):
    super().__init__()

    self.conv1 = nn.Conv2d(
        in_channels=num_channels,
        out_channels=64,
        kernel_size=11,
        padding=1,
        stride=2
    )
    self.conv2 = nn.Conv2d(
        in_channels=64,
        out_channels=128,
        kernel_size=5,
        padding=2
    )
    self.deconv1 = nn.ConvTranspose2d(
        in_channels = 128,
        out_channels = 64,
        kernel_size = 5,
        padding = 2
    )
    self.deconv2 = nn.ConvTranspose2d(
        in_channels=64,
        out_channels=1,
        kernel_size=11,
        padding=1,
        # using stride in the conv1 layer means that multiple input sizes are mapped to the same size
        # output_padding of 1 ensures that the output is the same size as the input
        # in the specific case that the model is producing an output 1 smaller than the input in both dimensions
        # change the output padding value if you change the input image size
        output_padding=1,
        stride=2
    )
    self.maxpool = nn.MaxPool2d(2, return_indices=True)
    self.unpool = nn.MaxUnpool2d(2)
    self.relu = nn.ReLU()

  def forward(self, x):

    x1 = self.conv1(x)
    x2 = self.relu(x1)
    x3, indices = self.maxpool(x2)

    x4 = self.conv2(x3)
    x5 = self.relu(x4)

    x6 = self.deconv1(x5)
    x7 = self.unpool(x6, indices, output_size=x2.size())
    x8 = self.relu(x7)

    x9 = self.deconv2(x8)
    x10 = self.relu(x9)

    return x10

In [6]:
class DeepFont(nn.Module):
  def __init__(self, num_channels, num_classes):
    super().__init__()
    
    self.conv1 = nn.Conv2d(
        in_channels=num_channels,
        out_channels=64,
        kernel_size=11,
        padding=1,
        stride=2
    )
    self.conv2 = nn.Conv2d(
        in_channels=64,
        out_channels=128,
        kernel_size=5,
        padding=2
    )
    self.conv3 = nn.Conv2d(
        in_channels=128,
        out_channels=256,
        kernel_size=3,
        padding=1
    )
    self.conv4 = nn.Conv2d(
        in_channels=256,
        out_channels=256,
        kernel_size=3,
        padding=1
    )
    self.conv5 = nn.Conv2d(
        in_channels=256,
        out_channels=256,
        kernel_size=3,
        padding=1
    )
    self.fc6 = nn.Linear(in_features=12*12*256, out_features=4096) # assuming input image size of 28x28. change in_feats for different sample size
    self.fc7 = nn.Linear(in_features=4096, out_features=4096)
    self.fc8 = nn.Linear(in_features=4096, out_features=num_classes)
    self.norm1 = nn.BatchNorm2d(num_features=64)
    self.norm2 = nn.BatchNorm2d(num_features=128)
    self.dropout = nn.Dropout(0.5)
    self.maxpool = nn.MaxPool2d(2)
    self.relu = nn.ReLU()
    self.flatten = nn.Flatten()

  def forward(self, x):
    x =self.conv1(x)
    x = self.norm1(x)
    x = self.maxpool(x)
    x = self.relu(x)

    x = self.conv2(x)
    x = self.norm2(x)
    x = self.maxpool(x)
    x = self.relu(x)

    x = self.conv3(x)
    x = self.relu(x)

    x = self.conv4(x)
    x = self.relu(x)

    x = self.conv5(x)
    x = self.relu(x)

    x = self.flatten(x)

    x = self.dropout(self.fc6(x))
    x = self.relu(x)

    x = self.dropout(self.fc7(x))
    x = self.relu(x)

    x = self.fc8(x)

    return x

In [7]:
import torch

def sensitivity(preds, labels, num_classes=52):
    scores = []
    
    for value in range(num_classes):
        mask = labels == value
        tp = (preds[mask] == value).sum().item()
        fn = (preds[mask] != value).sum().item()
        
        if tp + fn > 0:
            scores.append(tp / (tp + fn))
    
    return sum(scores) / len(scores) if len(scores) > 0 else 0

In [8]:
import torch

def specificity(preds, labels, num_classes=52):
    scores = []
    
    for value in range(num_classes):
        mask = labels != value
        tn = (preds[mask] != value).sum().item()
        fp = (preds[mask] == value).sum().item()
        
        if tn + fp > 0:
            scores.append(tn / (tn + fp))
    
    return sum(scores) / len(scores) if len(scores) > 0 else 0

In [9]:
import torch

def precision(preds, labels, num_classes=52):
    scores = []
    
    for value in range(num_classes):
        mask = preds == value
        tp = (labels[mask] == value).sum().item()
        fp = (labels[mask] != value).sum().item()
        
        if tp + fp > 0:
            scores.append(tp / (tp + fp))
    
    return sum(scores) / len(scores) if len(scores) > 0 else 0

In [10]:
def training_unsupervised(model, dataloader, criterion, optimizer, device, epochs, model_path):
    model = model.to(device)
    model.train()
    best_loss = torch.inf
    for _ in range(epochs):
        total_loss = 0
        for batch_index, (images, _) in enumerate(dataloader):
            optimizer.zero_grad()
            images = images.to(device)
            outputs = model(images)
            loss = criterion(outputs, images)
            total_loss += loss.item() * images.size(0)
            loss.backward()
            optimizer.step()
        avg_loss = total_loss / (batch_index+1)
        if avg_loss < best_loss:
            best_loss = avg_loss
            torch.save(model.state_dict(), model_path)

In [11]:
def evaluation(model, dataloader, criterion, device, phase='Validation'):
    model.eval()
    predictions = []
    ground_truth = []
    with torch.no_grad():
        total_loss = 0
        total = 0
        correct = 0
        for _, (images, labels) in enumerate(dataloader):
            images = images.to(device)
            labels = labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)
            total_loss += loss.item() * images.size(0)
            total += images.size(0)
            _, preds = torch.max(outputs, 1)
            predictions.extend(preds.cpu().numpy())
            ground_truth.extend(labels.cpu().numpy())
            correct += (preds == labels).sum().item()

        accuracy = correct / total
        loss = total_loss / total
        sensitivity_score = sensitivity(torch.tensor(predictions), torch.tensor(ground_truth))
        specificity_score = specificity(torch.tensor(predictions), torch.tensor(ground_truth))
        precision_score = precision(torch.tensor(predictions), torch.tensor(ground_truth))

        print(f'\t{phase}\tAccuracy={accuracy:<10.4f}' +
              f'\t\tLoss= {loss:<10.4f}' +
              f'\t\tSensitivity: {sensitivity_score:<10.4f}' +
              f'\t\tSpecificity: {specificity_score:<10.4f}' +
              f'\t\tPrecision: {precision_score:<10.4f}')

        return {'loss': loss,
                'accuracy': accuracy,
                'sensitivity': sensitivity_score,
                'specificity': specificity_score,
                'precision': precision_score,
                'ground_truth': ground_truth,
                'predictions': predictions}

In [12]:
# Supervised model training function adapted from code provided by Dr. Farhad Maleki in MNISTFasion_CNN.ipynb
def training_supervised(model, train_loader, val_loader, criterion, optimizer, device, epochs, best_model_path):
    model = model.to(device)
    model.train()
    best_loss = torch.inf
    best_results = None

    for epoch in range(epochs):
        total_loss = 0
        total = 0
        correct = 0

        for _, (images, labels) in enumerate(train_loader):
            optimizer.zero_grad()
            images = images.to(device)
            labels = labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            total_loss += loss.item() * images.size(0)
            total += images.size(0)
            _, preds = torch.max(outputs, 1)
            correct += (preds == labels).sum().item()

        accuracy = correct / total
        loss = total_loss / total

        # Adapt the metrics functions to handle tensors
        sensitivity_score = sensitivity(preds, labels)
        specificity_score = specificity(preds, labels)
        precision_score = precision(preds, labels)

        print(f'{epoch:<4}\tTrain\tAccuracy={accuracy:<10.4f}' +
              f'\t\tLoss= {loss:<10.4f}' +
              f'\t\tSensitivity: {sensitivity_score:<10.4f}' +
              f'\t\tSpecificity: {specificity_score:<10.4f}' +
              f'\t\tPrecision: {precision_score:<10.4f}')

        results = evaluation(model, val_loader, criterion, device)
        
        if results['loss'] < best_loss:
            torch.save(model, best_model_path)
            best_loss = results['loss']
            best_results = results

        print()

    return best_results

In [47]:
import torch.nn as nn
import torch.optim as optim

models_dir = 'models'
device = torch.device("cpu")

# Train the unsupervised sub-network
# Hyperparameters
learning_rate = 0.01
momentum = 0.9
weight_decay = 5e-4
epochs = 10

scae_model = SCAE(num_channels=3)
criterion = nn.MSELoss()
optimizer = optim.SGD(scae_model.parameters(), lr=learning_rate, momentum=momentum, weight_decay=weight_decay)

model_path = os.path.join(models_dir, f"SCAE.pt")
training_unsupervised(scae_model, unsupervised_loader, criterion, optimizer, device, epochs, model_path)

RuntimeError: Found an invalid max index: 418 (output volumes are of size 20x20

In [None]:
# Train the supervised sub-network
supervised_model = DeepFont(num_channels=3, num_classes=52) # one class per letter (case-sensitive)

# Import the convolutional layers of the SCAE as conv1 and conv2
scae_path = os.path.join(models_dir, f"SCAE.pt")
supervised_model.load_state_dict(torch.load(scae_path), strict=False)

# Freeze the convolutional layers from SCAE
for param in supervised_model.conv1.parameters():
    param.requires_grad = False
for param in supervised_model.conv2.parameters():
    param.requires_grad = False

for i in range(len(models)):
    font_name = models[i]
    print(font_name)
    
    train_loader = train_loaders[i]
    val_loader = val_loaders[i]
    best_model_path = os.path.join(models_dir, f"{font_name}_model.pt")
    best_results = training_supervised(supervised_model, train_loader, val_loader, criterion, optimizer, device, epochs, best_model_path)

In [None]:
# Testing our models

models_dir = 'models'
criterion = nn.MSELoss()
device = torch.device("cpu")

for i in range(len(models)):
    font_name = models[i]
    print(font_name)
    model_path = os.path.join(models_dir, f"{font_name}_model.pt")
    model = torch.load(model_path)
    results = evaluation(model, test_loader, criterion, device, 'Test')