# APS360 Group Project

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
import numpy as np
from torch.utils.data import SubsetRandomSampler
from torch.utils.data.dataloader import default_collate
import matplotlib.pyplot as plt
import os

In [None]:
# Pick the compute device once; later cells read the module-level `device`.
# (A `global` statement is a no-op at module scope, so none is needed here.)
cuda_available = torch.cuda.is_available()
print("Cuda Available:", cuda_available)
device = "cuda" if cuda_available else "cpu"

In [None]:
FONT_DATASET_PATH = "./fonts_image_dataset"

# Preprocessing applied to every image, in order: convert to a tensor,
# normalize each of the three channels with mean 0.5 / std 0.5, then
# collapse to a single grayscale channel.
transform = transforms.Compose(
    [
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
        transforms.Grayscale(num_output_channels=1),
    ]
)
fonts_dataset = torchvision.datasets.ImageFolder(root=FONT_DATASET_PATH, transform=transform)

num_classes = len(fonts_dataset.classes)

# Shuffle every dataset index with a fixed NumPy seed so the split is
# reproducible, and persist the shuffled order to the file "indices".
dataset_size = len(fonts_dataset)
np.random.seed(0)
indices = list(range(dataset_size))
np.random.shuffle(indices)
np.savetxt("indices", indices)

# 60% training / 20% validation / 20% testing
split1 = int(0.6 * dataset_size)
split2 = int(0.8 * dataset_size)
train_indices = indices[:split1]
val_indices = indices[split1:split2]
test_indices = indices[split2:]

# One random sampler per split; the loaders built later draw from these.
train_sampler, val_sampler, test_sampler = (
    SubsetRandomSampler(subset)
    for subset in (train_indices, val_indices, test_indices)
)


#FONT_DATASET_PATH2 = "./fonts_image_dataset_no_rotations"
# Dataset 2
#fonts_dataset2 = torchvision.datasets.ImageFolder(root = FONT_DATASET_PATH2, transform=transform)


def load_dataset(f_dataset = fonts_dataset, batch_size = 32):
    """Build the train/validation/test loaders for ``f_dataset``.

    Every loader yields ``(images, labels)`` batches in which ``labels`` has
    been one-hot encoded by the collate function.

    Args:
        f_dataset: Dataset exposing ``.classes``; defaults to the module-level
            ``fonts_dataset``.
        batch_size: Number of samples per batch for all three loaders.

    Returns:
        Tuple ``(train_loader, val_loader, test_loader, classes)`` where
        ``classes`` is ``f_dataset.classes``.

    NOTE(review): the loaders use the module-level ``train_sampler``,
    ``val_sampler`` and ``test_sampler``, whose indices were computed from
    ``fonts_dataset``. Passing a dataset of a different size presumably needs
    its own samplers -- confirm before doing so.
    """
    # Size the one-hot vectors from the dataset actually being loaded rather
    # than from the global class count, so the encoding always matches
    # `f_dataset.classes`.
    n_classes = len(f_dataset.classes)

    def custom_collate_fn(batch):
        # Batch the samples with the default collate, then one-hot the labels.
        images, labels = default_collate(batch)
        return images, F.one_hot(labels, num_classes=n_classes)

    def make_loader(sampler):
        # All three loaders differ only in which index subset they sample.
        return torch.utils.data.DataLoader(
            f_dataset,
            batch_size=batch_size,
            sampler=sampler,
            collate_fn=custom_collate_fn,
        )

    train_loader = make_loader(train_sampler)
    val_loader = make_loader(val_sampler)
    test_loader = make_loader(test_sampler)

    print("Done Loading Data")

    return train_loader, val_loader, test_loader, f_dataset.classes


### Functions for Training and some Data Visualization

In [None]:
def total_error(outputs, labels):
    """Count the rows of ``outputs`` whose top-scoring column disagrees with ``labels``.

    Each output row is turned into a one-hot vector marking its maximum entry
    and compared element-wise with the matching ``labels`` row; a row counts
    as an error when any element differs.

    Returns:
        A scalar float tensor holding the number of mismatching rows.
    """
    # Column index of the largest score in every row, kept as an (N, 1) tensor
    top_idx = outputs.max(dim=1, keepdim=True)[1]

    # One-hot encode the predictions: all zeros except a 1 at the top index
    predicted_one_hot = torch.zeros_like(outputs).scatter_(1, top_idx, 1)

    mismatched_rows = (predicted_one_hot != labels).any(dim=1)
    return mismatched_rows.float().sum()

def evaluate(net, loader):
    """Compute the error rate and mean cross-entropy loss of ``net`` over ``loader``.

    The network is switched to eval mode (and left there) and gradients are
    disabled. Batches are moved to the module-level ``device``.

    Args:
        net: Model to evaluate.
        loader: Iterable of ``(inputs, one_hot_labels)`` batches.

    Returns:
        Tuple ``(err, loss)``: ``err`` is misclassified samples divided by
        total samples; ``loss`` is the average of the per-batch losses.

    Raises:
        ValueError: If ``loader`` yields no batches.
    """
    net.eval()
    criterion = nn.CrossEntropyLoss()

    total_loss = 0.0
    total_err = 0.0
    num_samples = 0
    # Counted explicitly instead of reusing the loop index after the loop,
    # which is undefined when the loader is empty.
    num_batches = 0

    with torch.no_grad():
        for inputs, labels in loader:
            inputs, labels = inputs.to(device), labels.to(device)

            outputs = net(inputs)

            total_err += total_error(outputs, labels).item()
            # Labels are one-hot integers; the loss needs float class targets.
            total_loss += criterion(outputs, labels.float()).item()
            num_samples += len(labels)
            num_batches += 1

    if num_batches == 0:
        raise ValueError("evaluate() received an empty loader; nothing to average")

    err = total_err / num_samples
    loss = total_loss / num_batches
    return err, loss
    
def train_net(net, model_name, BATCH_SIZE = 128, learning_rate = 0.01, num_epochs = 30, patience = None):
    """Train ``net`` on ``fonts_dataset`` and reload its best checkpoint.

    Uses AdamW (weight decay 1e-3) with cross-entropy loss. After every epoch
    the train/validation error and loss histories are written to
    ``{model_name}/*.csv`` (one value per completed epoch), and the weights
    are saved to ``{model_name}/best_model`` whenever the validation error is
    at least as good as the best seen so far. When training ends, the best
    weights are loaded back into ``net``.

    Args:
        net: Model to train; expected to already be on the module-level ``device``.
        model_name: Directory for the checkpoint and CSV files (created if missing).
        BATCH_SIZE: Batch size used for all loaders.
        learning_rate: AdamW learning rate.
        num_epochs: Number of epochs. Ignored when ``patience`` is given.
        patience: If not None, enables early stopping: the epoch budget becomes
            60 and training stops once the validation error has failed to
            match or beat the best value for ``patience`` consecutive epochs.

    Raises:
        ValueError: If the training loader yields no batches.
    """
    torch.cuda.empty_cache()

    train_loader, val_loader, _test_loader, _classes = load_dataset(f_dataset = fonts_dataset, batch_size = BATCH_SIZE)

    # exist_ok avoids the check-then-create race of a separate exists() test
    os.makedirs(model_name, exist_ok=True)

    # Set the seed for reproducibility (batch order and dropout)
    torch.manual_seed(0)
    torch.cuda.manual_seed(0)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.AdamW(net.parameters(), lr=learning_rate, weight_decay=1e-3)

    # Early stopping always runs against a fixed 60-epoch budget
    if patience is not None:
        num_epochs = 60

    # Per-epoch histories of the loss/error rate
    train_err = np.zeros(num_epochs)
    train_loss = np.zeros(num_epochs)
    val_err = np.zeros(num_epochs)
    val_loss = np.zeros(num_epochs)

    min_validation_err = float("inf")
    stop_counter = 0
    best_model_path = f"{model_name}/best_model"

    print("Starting Training")

    for epoch in range(num_epochs):

        total_train_loss = 0.0
        total_train_err = 0.0
        num_samples = 0
        num_batches = 0

        # evaluate() leaves the network in eval mode, so switch back each epoch
        net.train()

        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()

            # Forward pass, backward pass, and optimize
            outputs = net(inputs)
            loss = criterion(outputs, labels.float())
            loss.backward()
            optimizer.step()

            # Statistics use the outputs from the forward pass above
            total_train_err += total_error(outputs, labels).item()
            total_train_loss += loss.item()
            num_samples += len(labels)
            num_batches += 1

        if num_batches == 0:
            raise ValueError("train_net() received an empty training loader")

        train_loss[epoch] = total_train_loss / num_batches
        train_err[epoch] = total_train_err / num_samples
        val_err[epoch], val_loss[epoch] = evaluate(net, val_loader)

        print(f"Epoch {epoch + 1}: Train err: {train_err[epoch]}, Train loss: {train_loss[epoch]} | Validation err: {val_err[epoch]}, Validation loss: {val_loss[epoch]}")

        # Write only the epochs that actually ran. Saving the full
        # preallocated arrays would leave trailing zeros after an early stop,
        # which then show up as a fake drop to zero in the plotted curves.
        completed = epoch + 1
        np.savetxt(f"{model_name}/val_err.csv", val_err[:completed])
        np.savetxt(f"{model_name}/train_err.csv", train_err[:completed])
        np.savetxt(f"{model_name}/train_loss.csv", train_loss[:completed])
        np.savetxt(f"{model_name}/val_loss.csv", val_loss[:completed])

        # Save the best model (ties count as an improvement)
        if val_err[epoch] <= min_validation_err:
            min_validation_err = val_err[epoch]
            torch.save(net.state_dict(), best_model_path)
            stop_counter = 0
        else:
            stop_counter += 1

        if patience is not None and stop_counter >= patience:
            break

    print('Finished Training')
    net.load_state_dict(torch.load(best_model_path))

In [None]:
def plot_training_curve(path):
    """Plot the train/validation error and loss curves stored under ``path``.

    Reads ``train_err.csv``, ``val_err.csv``, ``train_loss.csv`` and
    ``val_loss.csv`` from the directory ``path`` and shows two figures:
    error versus epoch, then loss versus epoch.

    Args:
        path: Directory containing the four CSV history files.
    """
    def load_history(name):
        # ndmin=1 keeps a one-epoch history as a length-1 array; without it
        # loadtxt returns a 0-d array and len() below raises TypeError.
        return np.loadtxt("{}/{}.csv".format(path, name), ndmin=1)

    train_err = load_history("train_err")
    val_err = load_history("val_err")
    train_loss = load_history("train_loss")
    val_loss = load_history("val_loss")

    epochs = range(1, len(train_err) + 1)

    def show_curves(title, ylabel, train_values, val_values):
        # One figure comparing the training and validation series
        plt.title(title)
        plt.plot(epochs, train_values, label="Train")
        plt.plot(epochs, val_values, label="Validation")
        plt.xlabel("Epoch")
        plt.ylabel(ylabel)
        plt.legend(loc='best')
        plt.show()

    show_curves("Train vs Validation Error", "Error", train_err, val_err)
    show_curves("Train vs Validation Loss", "Loss", train_loss, val_loss)

def visualize_output(net, num_images = 5, f_dataset = fonts_dataset):
    """Show one test batch with the model's top-3 outputs and the true labels.

    The model is run on the CPU in eval mode without gradients; its previous
    training mode is restored and it is moved back to the module-level
    ``device`` afterwards, even if inference fails.

    Args:
        net: Trained model to visualise.
        num_images: Number of test images to display.
        f_dataset: Dataset to draw the images from.

    NOTE: the displayed "confidence" is the raw network output times 100, so
    it is only a probability for models whose forward pass ends in a softmax
    (``PrimaryModel6c`` does; ``BaselineModel`` returns linear-layer outputs).
    """
    # Load the data
    _train_loader, _val_loader, test_loader, classes = load_dataset(f_dataset = f_dataset, batch_size = num_images)
    images, labels = next(iter(test_loader))

    # A test split smaller than num_images yields a short batch
    shown = min(num_images, images.shape[0])

    # Get model predictions
    was_training = net.training
    net = net.to("cpu")
    net.eval()  # deterministic dropout / batch-norm behaviour for inference
    try:
        with torch.no_grad():
            outputs_np = net(images).numpy()
    finally:
        net.train(was_training)
        net = net.to(device)

    # Get ground truth labels (labels are one-hot rows)
    ground_truth = [classes[labels[j].argmax().item()] for j in range(shown)]

    # Three largest outputs per image, best first
    best3ind = np.argsort(outputs_np,axis=1)[:,-3:][:, ::-1]
    best3prob = np.sort(outputs_np,axis=1)[:,-3:][:, ::-1]
    predicted = [['%s, confidence: %s'%(classes[best3ind[i][j]], round(best3prob[i][j]*100,2))+'%' for j in range(3)] for i in range(shown)]

    # squeeze=False always returns a 2-D axes grid, so one image works too
    fig, axs = plt.subplots(1, shown, figsize=(30, 20), squeeze=False)

    # Print Images
    for i in range(shown):
        ax = axs[0][i]
        npimg = images[i].numpy()

        # Channel-first tensor -> channel-last image for imshow
        ax.imshow(np.transpose(npimg, (1, 2, 0)), cmap='gray')
        ax.set_yticklabels([])
        ax.set_xticklabels([])
        ax.set_xticks([])
        ax.set_yticks([])

        ax.set_title(f"Prediction: {predicted[i][0]} \n {predicted[i][1]} \n {predicted[i][2]}\n Ground Truth: {ground_truth[i]}",fontsize = 24)
    plt.show()
    
def generate_confusion_matrix(net, model_name, f_dataset = fonts_dataset, max_batches = 100):
    """Build, plot and save a confusion matrix over the test split.

    Rows are true classes and columns are predicted classes. The model is run
    on the CPU in eval mode without gradients; its previous training mode is
    restored and it is moved back to the module-level ``device`` afterwards.

    Args:
        net: Trained model to evaluate.
        model_name: Directory that receives ``confusion_matrix.csv``
            (created if missing).
        f_dataset: Dataset whose test split is evaluated.
        max_batches: Maximum number of test batches to process; ``None``
            processes the whole test loader.

    Returns:
        The ``(num_classes, num_classes)`` NumPy array of counts.
    """
    # Load the data
    _train_loader, _val_loader, test_loader, classes = load_dataset(f_dataset = f_dataset)
    confusion_matrix = np.zeros((len(classes), len(classes)))

    was_training = net.training
    net = net.to("cpu")
    net.eval()  # deterministic dropout / batch-norm behaviour for inference
    try:
        with torch.no_grad():
            for i, (inputs, labels) in enumerate(test_loader):
                if max_batches is not None and i >= max_batches:
                    break

                # Forward pass
                outputs = net(inputs)

                true_idx = labels.argmax(dim=1).numpy()
                pred_idx = outputs.argmax(dim=1).numpy()

                # np.add.at accumulates repeated (true, predicted) pairs
                # within a batch; plain fancy-index += would count them once.
                np.add.at(confusion_matrix, (true_idx, pred_idx), 1)
    finally:
        net.train(was_training)
        net = net.to(device)

    plt.figure(figsize=(12,10))
    plt.imshow(confusion_matrix, interpolation='nearest')
    plt.title('Confusion matrix')
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=90)
    plt.yticks(tick_marks, classes)
    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    plt.show()

    # The directory normally exists after training; create it so saving
    # also works for a model name that has not been trained in this session.
    os.makedirs(model_name, exist_ok=True)
    np.savetxt(f"{model_name}/confusion_matrix.csv", confusion_matrix)
    return confusion_matrix

### Defining and Training Models

In [None]:
class BaselineModel(nn.Module):
    """Small CNN baseline: three conv/ReLU/max-pool stages and one linear layer.

    The classifier expects 26*26*20 flattened features, which is what the
    three unpadded 3x3 convolutions and 2x2 poolings produce for
    single-channel 224x224 inputs.

    Args:
        num_classes: Number of output scores per image (default 42).
    """

    def __init__(self, num_classes=42):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 5, 3)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(5, 10, 3)
        self.conv3 = nn.Conv2d(10, 20, 3)
        self.fc = nn.Linear(26*26*20, num_classes)

    def forward(self, x):
        """Return the raw (un-normalised) class scores for a batch of images."""
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = self.pool(F.relu(self.conv3(x)))

        # Flatten per sample. Unlike view(-1, N), this can never silently
        # regroup samples when the input size is wrong; the linear layer
        # raises a shape error instead.
        x = torch.flatten(x, 1)

        return self.fc(x)

In [None]:
class PrimaryModel6c(nn.Module):
    """Primary CNN: four conv stages with batch norm, then two linear layers.

    The first linear layer expects 32*27*27 flattened features, which is what
    three unpadded stride-2 3x3 convolutions followed by a 1x1 convolution
    produce for single-channel 224x224 inputs.

    Args:
        num_classes: Number of output classes (default 42).
    """

    def __init__(self, num_classes=42):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 16, 3, 2)
        self.bn1 = nn.BatchNorm2d(16)
        self.conv2 = nn.Conv2d(16, 32, 3, 2)
        self.bn2 = nn.BatchNorm2d(32)
        self.conv3 = nn.Conv2d(32, 64, 3, 2)
        self.bn3 = nn.BatchNorm2d(64)
        self.conv4 = nn.Conv2d(64, 32, 1)
        self.bn4 = nn.BatchNorm2d(32)
        self.fc1 = nn.Linear(32*27*27, 32*32)
        self.bn5 = nn.BatchNorm1d(32*32)
        self.dropout = nn.Dropout(0.5)
        self.fc2 = nn.Linear(32*32, num_classes)

    def forward(self, x):
        """Return per-class probabilities (each row sums to 1) for a batch of images."""
        # Each stage is conv -> ReLU -> batch norm
        x = self.bn1(F.relu(self.conv1(x)))
        x = self.bn2(F.relu(self.conv2(x)))
        x = self.bn3(F.relu(self.conv3(x)))
        x = self.bn4(F.relu(self.conv4(x)))

        # Flatten per sample. Unlike view(-1, N), this can never silently
        # regroup samples when the input size is wrong.
        x = torch.flatten(x, 1)

        x = self.bn5(self.dropout(F.relu(self.fc1(x))))

        # NOTE(review): this returns softmax probabilities, yet train_net
        # feeds the output to nn.CrossEntropyLoss, which applies log-softmax
        # itself. The softmax is kept because the visualisation code shows
        # these values as confidences and existing checkpoints were trained
        # this way; returning logits here would be a contract change.
        x = F.softmax(self.fc2(x), dim=1)

        return x

In [None]:
# Baseline run: build the model, place it on the selected device, then train
# it for 15 epochs; artifacts are written under "baseline_model_final".
baseline_model_final = BaselineModel()
baseline_model_final = baseline_model_final.to(device)
train_net(
    baseline_model_final,
    "baseline_model_final",
    BATCH_SIZE=32,
    learning_rate=0.005,
    num_epochs=15,
)

In [None]:
# Primary run: passing `patience` turns on early stopping in train_net, which
# then trains against its fixed 60-epoch budget instead of `num_epochs`.
primary_model_final = PrimaryModel6c()
primary_model_final = primary_model_final.to(device)
train_net(
    primary_model_final,
    "primary_model_final",
    BATCH_SIZE=64,
    learning_rate=0.0001,
    patience=10,
)

### Testing and Visualizing Baseline Model and Primary Model

In [None]:
# Loaders shared by the evaluation cells that follow; `classes` is the list of
# class names of `fonts_dataset`, indexed by label.
(train_loader, val_loader, test_loader, classes) = load_dataset(
    f_dataset=fonts_dataset,
    batch_size=32,
)

In [None]:
# Restore the best baseline checkpoint. map_location lets a checkpoint that
# was saved on a GPU load on a CPU-only machine.
baseline_model_final.load_state_dict(torch.load("baseline_model_final/best_model", map_location=device))
baseline_model_final = baseline_model_final.to(device)

# `evaluate` is the metric helper defined in this notebook (there is no
# `evaluate2`); it returns the (error rate, mean loss) pair unpacked here.
test_err, test_loss = evaluate(baseline_model_final, test_loader)
print(f"Test error: {test_err}, Test loss: {test_loss}")
plot_training_curve("baseline_model_final")
visualize_output(baseline_model_final, num_images = 5, f_dataset=fonts_dataset)
generate_confusion_matrix(baseline_model_final, "baseline_model_final", f_dataset=fonts_dataset)

In [None]:
# Restore the best primary checkpoint. map_location lets a checkpoint that
# was saved on a GPU load on a CPU-only machine.
primary_model_final.load_state_dict(torch.load("primary_model_final/best_model", map_location=device))
primary_model_final = primary_model_final.to(device)

# `evaluate` is the metric helper defined in this notebook (there is no
# `evaluate2`); it returns the (error rate, mean loss) pair unpacked here.
test_err, test_loss = evaluate(primary_model_final, test_loader)
print(f"Test error: {test_err}, Test loss: {test_loss}")
plot_training_curve("primary_model_final")
visualize_output(primary_model_final, num_images = 5, f_dataset=fonts_dataset)
generate_confusion_matrix(primary_model_final, "primary_model_final", f_dataset=fonts_dataset)

### Visualizing Primary Model's Performance

In [None]:
# Run the primary model on the handwritten samples and show its top-3 font
# predictions for the first few images.
handwritten_dataset = torchvision.datasets.ImageFolder(root = "handwritten", transform=transform)
loader = torch.utils.data.DataLoader(handwritten_dataset, batch_size=len(handwritten_dataset), shuffle=True)
handwritten_classes = handwritten_dataset.classes
images_per_class = len(handwritten_dataset) // len(handwritten_classes)

# A single shuffled batch holds the whole dataset. It is drawn once only:
# a second draw would reload every image in a different order and leave
# `labels` out of step with `images`.
dataiter = iter(loader)
images, labels = next(dataiter)

# Inference on the CPU, in eval mode and without gradients, so dropout is off
# and batch norm uses its running statistics.
primary_model_final = primary_model_final.to("cpu")
primary_model_final.eval()
with torch.no_grad():
    outputs = primary_model_final(images)
primary_model_final = primary_model_final.to(device)

# Three largest outputs per image, best first. Predictions are font classes,
# so they are named through `classes`, not `handwritten_classes`.
outputs_np = outputs.numpy()
best3ind = np.argsort(outputs_np,axis=1)[:,-3:][:, ::-1]
best3prob = np.sort(outputs_np,axis=1)[:,-3:][:, ::-1]
num_images = min(3, len(images))
predicted = [['%s, confidence: %s'%(classes[best3ind[i][j]], round(best3prob[i][j]*100,2))+'%' for j in range(3)] for i in range(num_images)]

# squeeze=False always returns a 2-D axes grid, so one image works too
fig, axs = plt.subplots(1, num_images, figsize=(30, 20), squeeze=False)
for i in range(num_images):
    ax = axs[0][i]
    npimg = images[i].numpy()

    # Channel-first tensor -> channel-last image for imshow
    ax.imshow(np.transpose(npimg, (1, 2, 0)), cmap='gray')
    ax.set_yticklabels([])
    ax.set_xticklabels([])
    ax.set_xticks([])
    ax.set_yticks([])

    ax.set_title(f"Prediction: {predicted[i][0]} \n {predicted[i][1]} \n {predicted[i][2]}",fontsize = 24)
plt.show()

In [None]:
def generate_tsne(net, dataset, points_per_font, handwritten_dataset_path = None):
    """Project model outputs to 2-D with t-SNE and save the plot as ``tsne.png``.

    Up to ``points_per_font`` test samples are collected per class of
    ``dataset``; the model output for each is used as its embedding. If a
    handwritten dataset is given, all of its images are embedded too and
    drawn on top with a distinct marker and colour per handwritten class.

    The model is run on the CPU in eval mode (it is left in eval mode) and is
    moved back to the module-level ``device`` afterwards.

    Args:
        net: Trained model whose outputs are embedded.
        dataset: Dataset passed to ``load_dataset``; its test split is sampled.
        points_per_font: Maximum number of test samples kept per class.
        handwritten_dataset_path: Optional ``ImageFolder`` root of extra
            images to overlay; ``None`` plots the test samples only.
    """
    from sklearn.manifold import TSNE
    import pandas as pd
    import seaborn as sns

    _train_loader, _val_loader, test_loader, classes = load_dataset(dataset, batch_size = 32)

    handwritten_loader = None
    handwritten_classes = []
    if handwritten_dataset_path is not None:
        handwritten_dataset = torchvision.datasets.ImageFolder(root = handwritten_dataset_path, transform=transform)
        # One unshuffled batch containing every handwritten image
        handwritten_loader = torch.utils.data.DataLoader(handwritten_dataset, batch_size=len(handwritten_dataset), shuffle=False)
        handwritten_classes = handwritten_dataset.classes

    counts = {c:0 for c in classes}
    selected_labels = []
    selected_embeddings = []
    handwritten_class_ids = []

    net = net.to("cpu")
    net.eval()
    try:
        with torch.no_grad():
            for i, (inputs, labels) in enumerate(test_loader, 0):

                # One-hot rows -> class indices
                labels = torch.argmax(labels, dim=1)

                outputs = net(inputs)

                for j in range(inputs.shape[0]):
                    word_label = classes[labels[j].item()]

                    if counts[word_label] < points_per_font:
                        selected_labels.append(word_label)
                        selected_embeddings.append(outputs[j].numpy())
                        counts[word_label] += 1

                # Stop as soon as every class has its quota
                if all(counts[c] == points_per_font for c in classes):
                    break

                print(i, end=",")

            # Everything appended after this index is a handwritten sample
            num_generated_points = len(selected_labels)

            if handwritten_loader is not None:
                inputs, labels = next(iter(handwritten_loader))
                outputs = net(inputs)
                for j in range(inputs.shape[0]):
                    class_id = labels[j].item()
                    handwritten_class_ids.append(class_id)
                    selected_labels.append(handwritten_classes[class_id])
                    selected_embeddings.append(outputs[j].numpy())
    finally:
        net = net.to(device)

    selected_embeddings = np.array(selected_embeddings)
    selected_labels = np.array(selected_labels)

    tsne = TSNE(2)
    tsne_result = tsne.fit_transform(selected_embeddings)

    tsne_result_df = pd.DataFrame({'tsne_1': tsne_result[:num_generated_points,0], 'tsne_2': tsne_result[:num_generated_points,1], 'label': selected_labels[:num_generated_points]})
    fig, ax = plt.subplots(1)
    sns.scatterplot(x='tsne_1', y='tsne_2', hue='label',data=tsne_result_df, ax=ax, s=5)

    # Overlay the handwritten samples, one scatter call per class. Points are
    # grouped by their actual label, so unbalanced classes are handled, and
    # the styles cycle when there are more classes than styles. With no
    # handwritten dataset the loop body never runs.
    marker_types = ['s', '*', '^', 'D']
    colors = ['r', 'g', 'b', 'y']
    handwritten_class_ids = np.array(handwritten_class_ids, dtype=int)
    handwritten_points = tsne_result[num_generated_points:]
    for class_id, class_name in enumerate(handwritten_classes):
        mask = handwritten_class_ids == class_id
        if not mask.any():
            continue
        style = class_id % len(marker_types)
        ax.scatter(handwritten_points[mask, 0], handwritten_points[mask, 1], c=colors[style], edgecolors = 'black', s=20, marker=marker_types[style], label=class_name)

    # Same limits on both axes so the equal-aspect plot stays square
    lim = (tsne_result.min()-5, tsne_result.max()+5)
    ax.set_xlim(lim)
    ax.set_ylim(lim)
    ax.set_aspect('equal')
    ax.set_xlabel('t-SNE 1')
    ax.set_ylabel('t-SNE 2')
    ax.legend(bbox_to_anchor=(1.05, 1), loc=2, borderaxespad=0.0, ncol=3, markerscale=3)
    fig.savefig('tsne.png')

In [None]:
# t-SNE of the primary model's outputs: up to 20 test samples per font class,
# with the images under "handwritten" overlaid.
generate_tsne(
    net=primary_model_final,
    dataset=fonts_dataset,
    points_per_font=20,
    handwritten_dataset_path="handwritten",
)