In [None]:
%matplotlib inline
import os
os.environ["KMP_DUPLICATE_LIB_OK"]="TRUE"
import random
import numpy as np
import torch
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset
import matplotlib.pyplot as plt
import time
import re
from PIL import Image, ImageEnhance

In [None]:
# Constants

# Vocabulary bookkeeping. These start empty/zero and are filled in later:
# preprocessing updates `vocab_set` and `max_label_length`, and the dataset
# cell recomputes `vocab_size` / `output_size` once the vocabulary is known.
vocab_set = set()
max_label_length = 0
vocab_size = 0

# Image geometry (pixels). Preprocessing also uses these as the minimum size
# an image must have before the crop augmentation is applied.
height, width = 32, 128

# Network dimensions
input_size = 512            # feature size fed to the LSTM
hidden_size = 512           # LSTM hidden units per direction
output_size = vocab_size    # placeholder; overwritten after the vocab is built
dropout_rate = 0.3          # dropout between the stacked LSTM layers

# Optimisation settings
batch_size = 64
epochs = 150
learning_rate = 8e-4

In [None]:
# Fetch the IAM handwriting word dataset through kagglehub. `path` holds whatever
# dataset_download returns (presumably the local directory of the downloaded
# files -- TODO confirm; the following cells read from ~/.cache/kagglehub/...).
import kagglehub
path = kagglehub.dataset_download("nibinv23/iam-handwriting-word-database")

In [None]:
# Location of the cached dataset, derived from the current user's home directory
# so the same cell works on Colab ("/root/.cache/...") and on a local machine
# ("/Users/<name>/.cache/...") without editing a hard-coded absolute path.
path = os.path.join(
    os.path.expanduser("~"),
    ".cache", "kagglehub", "datasets",
    "nibinv23", "iam-handwriting-word-database", "versions", "2",
    "",  # trailing empty component keeps the trailing separator of the old literal
)

# Sanity check of the directory layout. The dataset contains a very large number
# of files, so only a count and the first few names are shown per directory
# instead of dumping every filename into the cell output.
for root, dirs, files in os.walk(path):
    print(f"Root: {root}, Files: {len(files)} (first: {files[:5]})")

In [None]:
def sanitize_filename(filename):
    """Return `filename` with every character that is unsafe in a file name replaced by '_'.

    The characters replaced are < > : " / \\ | ? * and the control characters
    0x00-0x1F; all other characters are kept as they are.
    """
    unsafe_chars = re.compile(r'[<>:"/\\|?*\x00-\x1F]')
    return unsafe_chars.sub('_', filename)

def gray_scale_check(inputPath, outputPath):
  """Save a single-channel (mode "L") copy of an image and return it.

  Args:
    inputPath: path of the image to read.
    outputPath: path the (possibly converted) image is written to.

  Returns:
    The PIL image that was saved, or None when the file could not be read or
    written; callers skip such images.

  Notes:
    Images that are already mode "L" are saved unchanged. Every other mode
    (RGB, RGBA, palette, ...) is converted with Pillow's built-in
    `Image.convert("L")`, which applies the same 0.299/0.587/0.114 luma
    weights as the previous hand-written pixel loop but runs in native code.
    Pillow rounds where the loop truncated, so individual pixel values may
    differ by one grey level.
  """
  try:
    image = Image.open(inputPath)
    if image.mode != "L":
        # The network expects exactly one channel, so normalise every
        # non-grayscale mode, not only RGB.
        image = image.convert("L")
    image.save(outputPath)
    return image
  except (OSError, Image.UnidentifiedImageError) as e:
      # Report why the image was rejected instead of hiding the cause.
      print(f"Image processing error, return none: {e}")
      return None #these images will be excluded from the dataset

def label_augment_preprocess(datasetRoot=None,
                             outputDir="/Users/hufen/Coding/ECS 174/Project/labeledImages",
                             preprocessedFolder="/Users/hufen/Coding/ECS 174/Project/preprocessed"):
    """Label the IAM word images, then write grayscale and augmented copies.

    The function works in three stages:
      1. Parse `words_new.txt` into a {file id: transcription} mapping,
         skipping comments, blank lines, malformed lines and entries whose
         second column is "err".
      2. Copy every labelled image below `iam_words/words` into
         `outputDir/<sanitized label>/`, updating the module-level
         `vocab_set` and `max_label_length` from the sanitized labels.
      3. For every image in `outputDir`, write to `preprocessedFolder` (same
         relative layout) a grayscale copy, a brightness/contrast-jittered
         copy (`*_bc`) and, for images at least `width` x `height` pixels,
         a random 95% crop (`*_crop`).

    Args:
        datasetRoot: directory of the downloaded dataset. Defaults to
            `~/.cache/kagglehub/datasets/nibinv23/iam-handwriting-word-database/versions/2`.
        outputDir: destination of the label-sorted originals
            (Colab equivalent: "/content/labeledImages").
        preprocessedFolder: destination of the grayscale/augmented images
            (Colab equivalent: "/content/preprocessed").

    Side effects:
        Creates directories and image files; mutates the globals `vocab_set`
        and `max_label_length`. Augmentation uses the global `random` state.

    NOTE(review): the label is stored as a directory name. A label that
    sanitizes to "." or ".." therefore resolves to `outputDir` itself or its
    parent, and on case-insensitive filesystems labels differing only in case
    share one directory. The dataset class reads labels back from directory
    names, so such samples end up mislabelled -- worth revisiting.
    """
    global max_label_length
    global vocab_set

    if datasetRoot is None:
        datasetRoot = os.path.join(
            os.path.expanduser("~"), ".cache", "kagglehub", "datasets",
            "nibinv23", "iam-handwriting-word-database", "versions", "2")
    wordsNewPath = os.path.join(datasetRoot, "words_new.txt")
    iamWordsDir = os.path.join(datasetRoot, "iam_words/words")
    inputFolder = outputDir  # stage 3 reads what stage 2 wrote
    imageExtensions = ('.png', '.jpg', '.jpeg')

    # ---- Stage 1: file id -> transcription ---------------------------------
    Labels = {}
    with open(wordsNewPath, "r") as f:
        for line in f:
            if line.startswith("#") or len(line.strip()) == 0:
                continue
            cols = line.split()
            # The transcription starts at column 9; shorter lines have no
            # label and previously raised IndexError or produced "".
            if len(cols) < 9:
                continue
            if cols[1] == "err":
                continue #just skip if theres an error
            Labels[cols[0]] = " ".join(cols[8:])
    os.makedirs(preprocessedFolder, exist_ok=True)

    # ---- Stage 2: copy originals into one folder per label ------------------
    for root, _, files in os.walk(iamWordsDir):
        for filename in files:
            if not filename.endswith(imageExtensions): #valid extension
                continue
            fileId = filename.rsplit('.', 1)[0]
            if fileId not in Labels:
                continue

            # Sanitize the label so it can be used as a directory name
            label = sanitize_filename(Labels[fileId])
            max_label_length = max(max_label_length, len(label))
            vocab_set.update(label)  # every character of the label joins the vocab

            labelDir = os.path.join(outputDir, label)
            os.makedirs(labelDir, exist_ok=True)
            inputPath = os.path.join(root, filename)
            outputPath = os.path.join(labelDir, filename)

            try:
                with open(inputPath, 'rb') as srcFile, open(outputPath, 'wb') as destFile:
                    destFile.write(srcFile.read())
            except OSError as e:
                print(f"Image processing error: {e}")

    # ---- Stage 3: grayscale + augmentation -----------------------------------
    for root, _, files in os.walk(inputFolder):
        for filename in files:
            if not filename.endswith(imageExtensions):
                continue
            inputPath = os.path.join(root, filename)
            relativePath = os.path.relpath(inputPath, inputFolder)
            outputPath = os.path.join(preprocessedFolder, relativePath)

            os.makedirs(os.path.dirname(outputPath), exist_ok=True)
            grayscaleImage = gray_scale_check(inputPath, outputPath)
            if grayscaleImage is None:
                continue #exclude image from handling

            # Build augmented names from the real extension. The previous
            # chained str.replace only knew '.png' and '.jpg', so '.jpeg'
            # outputs silently overwrote the grayscale base image.
            stem, extension = os.path.splitext(outputPath)

            imgWidth, imgHeight = grayscaleImage.size
            #crop to 95% input height and width at most
            cropWidth, cropHeight = int(imgWidth * 0.95), int(imgHeight * 0.95)
            croppedImage = None
            if imgWidth >= width and imgHeight >= height:
                #only crops if it is large enough to be cropped
                top = random.randint(0, imgHeight - cropHeight)
                left = random.randint(0, imgWidth - cropWidth)
                croppedImage = grayscaleImage.crop(
                    (left, top, left + cropWidth, top + cropHeight))

            bFactor = random.uniform(0.8, 1.2)
            cFactor = random.uniform(0.8, 1.2)
            brightnessAdjusted = ImageEnhance.Brightness(grayscaleImage).enhance(bFactor)
            contrastAdjusted = ImageEnhance.Contrast(brightnessAdjusted).enhance(cFactor)

            #finally save augmented images
            if croppedImage is not None:
                croppedImage.save(f"{stem}_crop{extension}")
            contrastAdjusted.save(f"{stem}_bc{extension}")

# Run the preprocessing pipeline when this cell executes. Besides writing image
# files, it fills the module-level `vocab_set` and `max_label_length`, which the
# dataset cell below reads.
label_augment_preprocess()

In [None]:
# Instantiate a dataset class
class HandwritingDataset(Dataset):
    """Word images stored as `<root>/<label>/<image file>`.

    Every .png/.jpg/.jpeg file found below `preprocessedFolder` becomes one
    sample; its label is the name of the directory that contains it.

    Args:
        preprocessedFolder: root directory that is scanned recursively.
        vocab: mapping from character to integer class index.
        transform: optional callable applied to the PIL image.

    Each item is `(image, label)`, where `label` is a LongTensor of class
    indices right-padded with 0 up to the module-level `max_label_length`.
    The training code treats 0 as padding / CTC blank, so `vocab` should not
    map a real character to 0.
    """
    def __init__(self, preprocessedFolder, vocab, transform=None):
        self.data_dir = preprocessedFolder
        self.transform = transform
        self.vocab = vocab
        self.image_paths = []
        self.labels = []

        for root, dirs, files in os.walk(self.data_dir):
          # os.walk order depends on the filesystem; sorting in place makes the
          # traversal (and therefore sample indices) reproducible across runs.
          dirs.sort()
          for file in sorted(files):
            if file.endswith(('.png', '.jpg', '.jpeg')):
              self.image_paths.append(os.path.join(root, file))
              self.labels.append(os.path.basename(root))

    def __len__(self):
        """Number of image files discovered under the root directory."""
        return len(self.image_paths)

    def __getitem__(self, idx):
      """Load sample `idx` and return `(image, padded label tensor)`."""
      image_path = self.image_paths[idx]
      label = self.labels[idx]

      # Force a single channel: the CNN takes 1 input channel and Normalize is
      # configured with one mean/std, so RGB/RGBA/palette files would otherwise
      # break the forward pass. convert() returns a fully loaded copy, which
      # lets the context manager close the underlying file right away.
      with Image.open(image_path) as raw_image:
            image = raw_image.convert("L")

      # transform (resizing and conversion to tensor)
      if self.transform:
            image = self.transform(image)

      # convert label into a tensor and pad it to the max length
      label = torch.tensor([self.vocab[char] for char in label], dtype = torch.long)

      padding_length = max_label_length - label.size(0)
      if padding_length > 0:
            label = torch.cat([label, torch.zeros(padding_length, dtype=torch.long)])

      return image, label

# Initialize folders and the datasets
# (Colab equivalents: "/content/preprocessed" and "/content/labeledImages")
preprocessedFolder = "/Users/hufen/Coding/ECS 174/Project/preprocessed"
nonprocessedFolder = "/Users/hufen/Coding/ECS 174/Project/labeledImages"

# Manually add in characters that were sanitized from preprocessing
vocab_set.update(['_', '*', '<', '>', ':', '/', '\\', '?', '"'])

# Make a dictionary for vocab.
# Index 0 is reserved: CTCLoss is created with blank=0 and labels are padded
# with 0, so real characters must start at 1. Starting at 0 made the first
# character indistinguishable from blank/padding and dropped it from every
# target. With indices 1..len(vocab), `len(vocab) + 1` is the class count.
vocab = {char: idx for idx, char in enumerate(sorted(vocab_set), start=1)}
vocab_size = len(vocab) + 1
output_size = vocab_size

# Resize to the configured geometry (height x width from the constants cell),
# convert to a tensor and scale pixel values to [-1, 1].
transform = transforms.Compose([transforms.Resize((height, width)),
                                transforms.ToTensor(),
                                transforms.Normalize((0.5,), (0.5,))])

# Pinned host memory only helps when batches are copied to a CUDA device;
# requesting it elsewhere just produces warnings.
use_pinned_memory = torch.cuda.is_available()

trainset = HandwritingDataset(preprocessedFolder, vocab, transform = transform)
trainloader = torch.utils.data.DataLoader(trainset, batch_size = batch_size, shuffle=True, num_workers = 0, pin_memory=use_pinned_memory)

testset = HandwritingDataset(nonprocessedFolder, vocab, transform = transform)
testloader = torch.utils.data.DataLoader(testset, batch_size = batch_size, shuffle=False, num_workers = 0, pin_memory=use_pinned_memory)

print(f"Vocab size: {len(vocab)}")
print(vocab)
print(f"Number of training samples: {len(trainloader.dataset)}")
print(f"Number of test samples: {len(testloader.dataset)}")

In [None]:
# cnn
class testCNN(nn.Module):
  """Convolutional feature extractor for the handwriting recogniser.

  Six 3x3 convolution + batch-norm + ReLU stages widen the channels
  (in_channels -> out_channels -> 64 -> 128 -> 256 -> 512 -> 512). The first
  five stages are each followed by max pooling, and dropout is applied after
  stages 2-6.

  Pooling geometry matters for CTC: the image width becomes the sequence
  length of the recurrent layers, and CTC cannot emit a label longer than
  that sequence. Pooling 2x2 at all five stages turns a 128-pixel-wide image
  into only 4 time steps, so every word with more than 4 characters was
  untrainable. By default only the first `full_pool_stages` stages now halve
  both axes; the remaining pooled stages halve the height only. A 32x128
  input therefore yields a (batch, 512, 1, 32) feature map.

  Args:
    in_channels: channels of the input image.
    out_channels: channels produced by the first convolution.
    full_pool_stages: number of leading stages that pool height and width
      together. Pass 5 to reproduce the previous all-2x2 behaviour. Weights
      trained with one setting should be retrained after changing it.
  """
  def __init__(self, in_channels: int, out_channels: int, full_pool_stages: int = 2):
    super(testCNN, self).__init__()
    self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=1, padding=1)
    self.bn1 = nn.BatchNorm2d(out_channels)

    self.conv2 = nn.Conv2d(out_channels, 64, kernel_size=3, stride=1, padding=1)
    self.bn2 = nn.BatchNorm2d(64)

    self.conv3 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1)
    self.bn3 = nn.BatchNorm2d(128)

    self.conv4 = nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1)
    self.bn4 = nn.BatchNorm2d(256)

    self.conv5 = nn.Conv2d(256, 512, kernel_size=3, stride=1, padding=1)
    self.bn5 = nn.BatchNorm2d(512)

    self.conv6 = nn.Conv2d(512, 512, kernel_size=3, stride=1, padding=1)
    self.bn6 = nn.BatchNorm2d(512)

    # Halves height and width.
    self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
    # Halves the height only, keeping the horizontal (time) resolution.
    self.pool_height = nn.MaxPool2d(kernel_size=(2, 1), stride=(2, 1))
    self.full_pool_stages = full_pool_stages

    # could change droupout
    self.dropout = nn.Dropout(0.3)

  def _downsample(self, x, stage: int):
    """Pool `x` with the layer that belongs to the 1-based `stage`."""
    if stage <= self.full_pool_stages:
      return self.pool(x)
    return self.pool_height(x)

  def forward(self, x):
    """Map a (batch, in_channels, H, W) image batch to a 512-channel feature map."""
    x = self._downsample(F.relu(self.bn1(self.conv1(x))), 1)
    x = self._downsample(F.relu(self.bn2(self.conv2(x))), 2)
    x = self.dropout(x)
    x = self._downsample(F.relu(self.bn3(self.conv3(x))), 3)
    x = self.dropout(x)
    x = self._downsample(F.relu(self.bn4(self.conv4(x))), 4)
    x = self.dropout(x)
    x = self._downsample(F.relu(self.bn5(self.conv5(x))), 5)
    x = self.dropout(x)
    # Final stage: no pooling.
    x = F.relu(self.bn6(self.conv6(x)))
    x = self.dropout(x)

    return x

In [None]:
# 2 LSTM layers
class BidirectionalLSTM(nn.Module):
    """CNN feature extractor followed by a 2-layer bidirectional LSTM and a linear classifier.

    `forward` returns log-probabilities shaped (sequence, batch, output_size),
    where the sequence axis is the width of the CNN feature map.
    """

    def __init__(self, input_size, hidden_size, output_size, dropout_rate):
        super().__init__()
        # Convolutional front end: 1 input channel, 32 channels after the first conv.
        self.cnnLayer = testCNN(1, 32)
        # Recurrent part: two stacked bidirectional layers, sequence-first layout.
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=2,
            bias=True,
            batch_first=False,
            dropout=dropout_rate,
            bidirectional=True,
        )
        # Forward and backward hidden states are concatenated, hence 2 * hidden_size.
        self.fc = nn.Linear(2 * hidden_size, output_size)

    def forward(self, x):
        features = self.cnnLayer(x)

        # (batch, channels, height, width) -> (width, batch, height * channels):
        # each feature-map column becomes one time step for the LSTM.
        n_batch, _, _, n_steps = features.size()
        sequence = features.permute(3, 0, 2, 1).reshape(n_steps, n_batch, -1)

        recurrent_out, _ = self.lstm(sequence)
        logits = self.fc(recurrent_out)
        return F.log_softmax(logits, dim=2)

In [None]:
# Loss and optimizers
# Network sized from the constants cell and the vocabulary built by the dataset cell.
model = BidirectionalLSTM(
    input_size=input_size,
    hidden_size=hidden_size,
    output_size=output_size,
    dropout_rate=dropout_rate,
)
# CTC loss with class 0 as the blank symbol; infinite losses are zeroed
# rather than propagated.
criterion = nn.CTCLoss(blank=0, zero_infinity=True)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

In [None]:
# Lists to store metrics for plotting (one entry appended per epoch by the training cell)
loss_metric, training_accuracy, test_accuracy = [], [], []

In [None]:
# Training
t0 = time.time()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)


def _greedy_ctc_decode(log_probs, blank=0):
    """Greedy (best-path) CTC decoding.

    Args:
        log_probs: tensor shaped (sequence, batch, classes).
        blank: index of the CTC blank class.

    Returns:
        One list of class indices per batch element: the arg-max class of
        every time step with consecutive repeats collapsed and blanks removed.
    """
    decoded = []
    for frame_ids in log_probs.argmax(dim=2).transpose(0, 1).tolist():
        sequence = []
        previous = None
        for class_id in frame_ids:
            # Keep a class only when it starts a new run and is not the blank.
            # The earlier loops collapsed repeats but kept blanks, so a
            # prediction containing any blank could never equal its label.
            if class_id != previous and class_id != blank:
                sequence.append(class_id)
            previous = class_id
        decoded.append(sequence)
    return decoded


for epoch in range(epochs):

    # Dropout and batch-norm must be in training mode here; the evaluation
    # phase at the end of every epoch switches the model to eval mode.
    model.train()

    # Statistics
    correct = 0
    total = 0
    running_loss = 0.0     # loss since the last progress print
    epoch_loss_sum = 0.0   # loss over the whole epoch (never reset mid-epoch)
    skipped = 0
    for i, data in enumerate(trainloader, 0):
        # get the inputs; data is a list of [inputs, labels]
        inputs, labels = data
        inputs, labels = inputs.to(device), labels.to(device)

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = model(inputs)

        # introduce input and target lengths for ctc loss:
        # every sample uses the full output sequence, and 0 marks label padding.
        input_lengths = torch.full((outputs.size(1),), outputs.size(0), dtype=torch.long, device=device)
        target_lengths = (labels > 0).sum(dim=1)
        flattened_labels = labels[labels > 0]

        loss = criterion(outputs, flattened_labels, input_lengths, target_lengths)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()

        # print statistics and calculate accuracy
        batch_loss = loss.item()
        running_loss += batch_loss
        epoch_loss_sum += batch_loss

        # Decode predictions (repeats collapsed, blanks removed)
        decoded_preds = _greedy_ctc_decode(outputs.detach())

        for pred, label in zip(decoded_preds, labels):
            if pred == label[label > 0].tolist():
                correct += 1
        total += labels.size(0)

        if i % 400 == 399:    # print every 400 mini-batches
            print(f'[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 400:.3f}')
            print(f'training accuracy: {correct / total * 100:.2f}%')
            running_loss = 0.0

    # Record the training accuracy and loss.
    # The epoch loss uses its own accumulator; the previous version divided the
    # remainder of the print window by the number of batches.
    epoch_loss = epoch_loss_sum / max(len(trainloader), 1)
    epoch_accuracy = correct / total if total else 0.0
    loss_metric.append(epoch_loss)
    training_accuracy.append(epoch_accuracy)

    # Record the testing accuracy
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        try:
            for data in testloader:
                images, labels = data
                images, labels = images.to(device), labels.to(device)
                outputs = model(images)

                decoded_preds = _greedy_ctc_decode(outputs)

                for pred, label in zip(decoded_preds, labels):
                    if pred == label[label > 0].tolist():
                        correct += 1
                total += labels.size(0)
        except(OSError, Image.UnidentifiedImageError) as e:
            # An unreadable image aborts the rest of this evaluation pass;
            # say so instead of silently reporting a partial accuracy.
            skipped += 1
            print(f'Test evaluation stopped early: {e}')


    acc = correct / total if total else 0.0
    test_accuracy.append(acc)
    print(f'Testing Accuracy: {acc * 100:.2f}%')
    print(f'Skipped Images: {skipped}')

    t1 = time.time()
    total_time = t1-t0
    print(f"Training and Testing took {total_time:.2f} seconds until now")

print('Finished Training')

In [None]:
# Plot loss, training accuracy and test accuracy side by side.
# The training cell stores accuracies as fractions in [0, 1]; the axes are
# labelled in percent, so the values are scaled by 100 before plotting.
fig, axes = plt.subplots(1, 3, figsize=(15, 4))

metric_panels = [
    (loss_metric, 'Loss', 'Training Loss'),
    ([value * 100 for value in training_accuracy], 'Accuracy (%)', 'Training Accuracy'),
    ([value * 100 for value in test_accuracy], 'Accuracy (%)', 'Test Accuracy'),
]

for ax, (series, y_label, panel_title) in zip(axes, metric_panels):
    ax.plot(series)
    ax.set_xlabel('Epoch')
    ax.set_ylabel(y_label)
    ax.set_title(panel_title)
    ax.grid(True)

fig.tight_layout()
plt.show()

In [None]:
# Testing
correct = 0
total = 0

# Evaluate on whatever device the model currently lives on; feeding CPU
# batches to a model that was moved to the GPU raises a device mismatch.
eval_device = next(model.parameters()).device
# Disable dropout and use the batch-norm running statistics.
model.eval()

# since we're not training, we don't need to calculate the gradients for our outputs
with torch.no_grad():
    try:
        for data in testloader:
            images, labels = data
            outputs = model(images.to(eval_device))

            # Greedy CTC decoding: best class per time step, collapse repeats,
            # then drop the blank (class 0). Without the blank removal a
            # prediction containing any blank could never match its label.
            best_frames = outputs.argmax(dim=2).transpose(0, 1).tolist()
            decoded_preds = []
            for frame_ids in best_frames:
                decoded_seq = []
                prev_char = None
                for char in frame_ids:
                    if char != prev_char and char != 0:
                        decoded_seq.append(char)
                    prev_char = char
                decoded_preds.append(decoded_seq)

            for pred, label in zip(decoded_preds, labels):
                if pred == label[label > 0].tolist():
                    correct += 1
            total += labels.size(0)

    except(OSError, Image.UnidentifiedImageError) as e:
        print(f"Skipping image due to unidentifies image: {e}")

if total:
    print(f'Accuracy of the network on the test images: {100 * correct // total} %')
else:
    # Avoid ZeroDivisionError when the loader is empty or failed on its first batch.
    print('Accuracy of the network on the test images: n/a (no test samples evaluated)')

In [None]:
# Saving the model
model_path = '/Users/hufen/Coding/ECS 174/Project/model/model.pth'
# torch.save does not create missing parent directories; make sure the target
# folder exists so a long training run is not lost at the very last step.
os.makedirs(os.path.dirname(model_path), exist_ok=True)
# The whole module object is pickled (not just a state_dict) because the
# inference cell loads it back with a plain torch.load.
torch.save(model, model_path)

In [None]:
# Using the model
def gray_scale_testing(inputPath):
  """Return a single-channel (mode "L") version of an image for inference.

  Args:
    inputPath: path of the image file, or an already opened PIL image
      (the inference code in this notebook passes the opened image).

  Returns:
    A mode "L" PIL image (the input itself when it is already mode "L"), or
    None when the file cannot be read.

  Notes:
    Conversion uses Pillow's `Image.convert("L")`, which applies the same
    0.299/0.587/0.114 luma weights as the previous per-pixel loop but in
    native code; values may differ by one grey level because Pillow rounds
    where the loop truncated. Every non-"L" mode is converted, not only RGB,
    since the network expects exactly one channel.
  """
  try:
    if isinstance(inputPath, Image.Image):
        image = inputPath
    else:
        image = Image.open(inputPath)
    if image.mode != "L":
        return image.convert("L")
    return image
  except (OSError, Image.UnidentifiedImageError) as e:
      # Include the reason so a bad input can be diagnosed.
      print(f"Image processing error, return none: {e}")
      return None

# Load the pickled model onto whichever device is available.
inference_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# NOTE: the file holds a fully pickled nn.Module, so unpickling runs arbitrary
# code -- only load checkpoints you created yourself. weights_only=False is
# stated explicitly because recent PyTorch releases default to weights-only
# loading, which rejects pickled modules.
model = torch.load('/Users/hufen/Coding/ECS 174/Project/model/model.pth',
                   map_location=inference_device, weights_only=False)
model.to(inference_device)
model.eval()  # disable dropout / use batch-norm running statistics

image_path = '/path/to/image.jpg'
image = Image.open(image_path)

# The helper is given the file path so it can open and convert the image itself.
grayImage = gray_scale_testing(image_path)
if grayImage is None:
    raise ValueError(f"Could not read image: {image_path}")

processed_image = transform(grayImage).unsqueeze(0).to(inference_device)

with torch.no_grad():
  output = model(processed_image)

# Use this cell's own `output`; the previous code read `outputs`, a stale
# variable left over from the evaluation loop.
_, predicted = output.max(2)
predicted = predicted.transpose(0, 1)

# Greedy CTC decoding of the single sample: collapse repeated classes, drop
# the blank (class 0) and map the remaining indices back to characters.
index_to_char = {idx: char for char, idx in vocab.items()}
decoded_chars = []
previous_index = None
for class_index in predicted[0].tolist():
    if class_index != previous_index and class_index != 0:
        decoded_chars.append(index_to_char.get(class_index, '?'))
    previous_index = class_index
predicted_text = ''.join(decoded_chars)

plt.figure(figsize=(30, 30))
plt.imshow(image)  # Show the original (unprocessed) image
plt.title(f"Predicted: {predicted_text}", fontsize=16)
plt.axis("off")
plt.show()