## Vectorizing the sparse representation and testing with multi layer perceptron

In [7]:
%matplotlib inline
import json
import numpy as np
import os
import random
from matplotlib import pyplot as plt
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
from torchvision import datasets
from torchvision.transforms import v2
from PIL import Image
import pywt
from wavelet_preprocessing import *

dataset_path = "../data"
batch_size = 32
toTensor = v2.Compose([v2.ToImage(), v2.ToDtype(torch.float32, scale=True)])
train_data = datasets.FashionMNIST(dataset_path, train=True, transform=toTensor, download=True)
test_data = datasets.FashionMNIST(dataset_path, train=False, transform=toTensor, download=True)
sample_data = datasets.FashionMNIST(dataset_path, train=True, transform=None, download=True)
label_names = test_data.classes

train_dataloader = DataLoader(train_data, batch_size, shuffle=True)
test_dataloader = DataLoader(test_data, batch_size, shuffle=True)

PERCENTILE_THRESH = 0

In [16]:
def get_sample_dwt(dataset, id):
    img, label = dataset[id]
    img = np.array(img) # grayscale images
    details = dec_single_level_gray_combined(img, PERCENTILE_THRESH) # zeros out anything lower than 99th percentile
    details /= np.max(details)
    nz_row, nz_col = np.nonzero(details)
    print(nz_row.shape[0])
    vec = np.zeros((nz_row.shape[0], 3))
    count = 0
    for idx, i in enumerate(nz_row):
        j = nz_col[idx]
        vec[count, 0] = details[i,j]
        vec[count, 1] = i / details.shape[0] # normalize row and col coords between [0, 1]
        vec[count, 2] = j / details.shape[1]
        count += 1
    # sort by highest value
    vec = vec[abs(vec[:, 0]).argsort()[::-1]]
    vec = vec.ravel()
    if vec.shape[0] < VEC_LEN:
        diff = VEC_LEN - vec.shape[0]
        zeros = np.zeros((diff,))
        vec = np.concatenate([vec, zeros]) # pad zeros to ensure
    else:
        vec = vec[:VEC_LEN]
    vec = torch.Tensor(vec)
    return vec, label

def preprocess_dwt(x, vec_len, thresh):
    x_ = torch.zeros((x.shape[0], vec_len))
    arr = x.squeeze().cpu().data.numpy()
    for batch, img in enumerate(arr):
        details = dec_single_level_gray_combined(img, thresh) # zeros out anything lower than percentile
        details /= np.max(details)
        nz_row, nz_col = np.nonzero(details)
        vec = np.zeros((nz_row.shape[0], 3))
        count = 0
        for idx, i in enumerate(nz_row):
            j = nz_col[idx]
            vec[count, 0] = details[i,j]
            vec[count, 1] = i / details.shape[0] # normalize row and col coords between [0, 1]
            vec[count, 2] = j / details.shape[1]
            count += 1
        # sort by highest value
        vec = vec[abs(vec[:, 0]).argsort()[::-1]]
        vec = vec.ravel()
        if vec.shape[0] < vec_len:
            diff = vec_len - vec.shape[0]
            zeros = np.zeros((diff,))
            vec = np.concatenate([vec, zeros]) # pad zeros to ensure
        else:
            vec = vec[:vec_len]
        x_[batch, :] = torch.Tensor(vec) # add coefficients to output tensor
    return x_

def preprocess_dwt_coords(x, vec_len, thresh):
    x_ = torch.zeros((x.shape[0], vec_len))
    arr = x.squeeze().cpu().data.numpy()
    for batch, img in enumerate(arr):
        details = dec_single_level_gray_combined(img, thresh) # zeros out anything lower than percentile
        details /= np.max(details)
        nz_row, nz_col = np.nonzero(details)
        vec = np.zeros((nz_row.shape[0], 3))
        count = 0
        for idx, i in enumerate(nz_row):
            j = nz_col[idx]
            vec[count, 0] = details[i,j]
            vec[count, 1] = i / details.shape[0] # normalize row and col coords between [0, 1]
            vec[count, 2] = j / details.shape[1]
            count += 1
        # sort by highest value
        vec = vec[abs(vec[:, 0]).argsort()[::-1]]
        vec = vec[:, 1:]
        vec = vec.ravel()
        if vec.shape[0] < vec_len:
            diff = vec_len - vec.shape[0]
            zeros = np.zeros((diff,))
            vec = np.concatenate([vec, zeros]) # pad zeros to ensure
        else:
            vec = vec[:vec_len]
        x_[batch, :] = torch.Tensor(vec) # add coefficients to output tensor
    return x_

def get_thumbnail(x):
    x_ = torch.zeros((x.shape[0], x.shape[1], x.shape[2]//2, x.shape[3]//2))
    arr = x.squeeze().cpu().data.numpy()
    for batch, img in enumerate(arr):
        details = dec_gray_thumbnail(img) # zeros out anything lower than percentile
        x_[batch, 0, :, :] = torch.Tensor(details) # add coefficients to output tensor
    return x_

def get_sample_normal(dataset, id):
    img, label = dataset[id]
    img = np.array(img) # grayscale images
    img = img.ravel()
    vec = torch.Tensor(img)
    return vec, label

def accuracy_fn(y_true, y_pred):
    correct = torch.eq(y_true, y_pred).sum().item()
    acc = (correct / len(y_pred)) * 100
    return acc

In [10]:
class MLP(nn.Module):
    def __init__(self, input_shape: int, hidden_units: int, output_shape: int):
        super().__init__()
        self.layer_stack = nn.Sequential(
            nn.Flatten(),
            nn.Linear(in_features=input_shape, out_features=hidden_units), # in_features = number of features in a data sample (784 pixels)
            nn.ReLU(),
            nn.Linear(in_features=hidden_units, out_features=output_shape),
            nn.Softmax(dim=0)
        )
    
    def forward(self, x):
        return self.layer_stack(x)
    

normal_loss_fn = nn.CrossEntropyLoss()
dwt_loss_fn = nn.CrossEntropyLoss()

In [20]:
def train_step(model: torch.nn.Module, 
               dataloader: torch.utils.data.DataLoader, 
               loss_fn: torch.nn.Module, 
               optimizer: torch.optim.Optimizer,
               dwt = False,
               dwt_len = 50,
               thresh = 90):
    # Put model in train mode
    model.train()
    
    # Setup train loss and train accuracy values
    train_loss, train_acc = 0, 0
    
    # Loop through data loader data batches
    for batch, (X, y) in enumerate(dataloader):
        if dwt:
            X = preprocess_dwt(X, dwt_len, thresh)
        # else:
        #     X = get_thumbnail(X)
        # 1. Forward pass
        y_pred = model(X)

        # 2. Calculate  and accumulate loss
        loss = loss_fn(y_pred, y)
        train_loss += loss.item() 
        train_acc += accuracy_fn(y, y_pred.argmax(dim=1))

        # 3. Optimizer zero grad
        optimizer.zero_grad()

        # 4. Loss backward
        loss.backward()

        # 5. Optimizer step
        optimizer.step()

    # Adjust metrics to get average loss and accuracy per batch 
    train_loss /= len(dataloader)
    train_acc /= len(dataloader)
    return train_loss, train_acc

def test_step(model: torch.nn.Module, 
              dataloader: torch.utils.data.DataLoader, 
              loss_fn: torch.nn.Module,
              dwt = False,
              dwt_len = 50,
              thresh = 90):
    # Put model in eval mode
    model.eval() 
    
    # Setup test loss and test accuracy values
    test_loss, test_acc = 0, 0
    
    # Turn on inference context manager
    with torch.inference_mode():
        # Loop through DataLoader batches
        for batch, (X, y) in enumerate(dataloader):
            if dwt:
                X = preprocess_dwt(X, dwt_len, thresh)
            # else:
            #     X = get_thumbnail(X)
            # 1. Forward pass
            test_pred = model(X)

            # 2. Calculate and accumulate loss
            loss = loss_fn(test_pred, y)
            test_loss += loss.item()
            test_acc += accuracy_fn(y, test_pred.argmax(dim=1))
            
    # Adjust metrics to get average loss and accuracy per batch 
    test_loss /= len(dataloader)
    test_acc /= len(dataloader)
    return test_loss, test_acc

# 1. Take in various parameters required for training and test steps
def train(model: torch.nn.Module,
          train_dataloader: torch.utils.data.DataLoader, 
          test_dataloader: torch.utils.data.DataLoader, 
          optimizer: torch.optim.Optimizer,
          loss_fn: torch.nn.Module = nn.CrossEntropyLoss(),
          epochs: int = 5,
          dwt = False,
          dwt_len = 100,
          thresh = 90,
          save: bool = False,
          save_freq: int = 100):
    
    # 2. Create empty results dictionary
    results = {"train_loss": [],
                "test_loss": [],
                "train_acc": [],
                 "test_acc": []}
    
    # 3. Loop through training and testing steps for a number of epochs
    for epoch in range(epochs):
        train_loss, train_acc = train_step(model=model,
                                dataloader=train_dataloader,
                                loss_fn=loss_fn,
                                optimizer=optimizer,
                                dwt=dwt,
                                dwt_len=dwt_len,
                                thresh=thresh)
        test_loss, test_acc = test_step(model=model,
                              dataloader=test_dataloader,
                              loss_fn=loss_fn,
                              dwt=dwt,
                              dwt_len=dwt_len,
                              thresh=thresh)
        
        # 4. Print out what's happening
        print(f"Epoch: {epoch+1}\t| train_loss:\t{train_loss:.5f} | train_accuracy:\t{train_acc:.3f} | test_loss:\t{test_loss:.5f} | test_accuracy:\t{test_acc:.3f}")

        # 5. Update results dictionary
        results["train_loss"].append(train_loss)
        results["test_loss"].append(test_loss)
        results["train_acc"].append(train_acc)
        results["test_acc"].append(test_acc)
        if (epoch % save_freq == 0 and epoch != 0 and save) or (epoch == epochs-1 and save):
            torch.save(model.state_dict(), f"../data/CNN_{epoch}_epochs.pkl")

    return results

In [15]:
def train_step_coords(model: torch.nn.Module, 
               dataloader: torch.utils.data.DataLoader, 
               loss_fn: torch.nn.Module, 
               optimizer: torch.optim.Optimizer,
               dwt = False,
               dwt_len = 50,
               thresh = 90):
    # Put model in train mode
    model.train()
    
    # Setup train loss and train accuracy values
    train_loss, train_acc = 0, 0
    
    # Loop through data loader data batches
    for batch, (X, y) in enumerate(dataloader):
        if dwt:
            X = preprocess_dwt_coords(X, dwt_len, thresh)
        # else:
        #     X = get_thumbnail(X)
        # 1. Forward pass
        optimizer.zero_grad()
        y_pred = model(X)

        # 2. Calculate  and accumulate loss
        loss = loss_fn(y_pred, y)
        train_loss += loss.item() 
        train_acc += accuracy_fn(y, y_pred.argmax(dim=1))

        # 3. Optimizer zero grad
        optimizer.zero_grad()

        # 4. Loss backward
        loss.backward()

        # 5. Optimizer step
        optimizer.step()

    # Adjust metrics to get average loss and accuracy per batch 
    train_loss /= len(dataloader)
    train_acc /= len(dataloader)
    return train_loss, train_acc

def test_step_coords(model: torch.nn.Module, 
              dataloader: torch.utils.data.DataLoader, 
              loss_fn: torch.nn.Module,
              dwt = False,
              dwt_len = 50,
              thresh = 90):
    # Put model in eval mode
    model.eval() 
    
    # Setup test loss and test accuracy values
    test_loss, test_acc = 0, 0
    
    # Turn on inference context manager
    with torch.inference_mode():
        # Loop through DataLoader batches
        for batch, (X, y) in enumerate(dataloader):
            if dwt:
                X = preprocess_dwt_coords(X, dwt_len, thresh)
            # else:
            #     X = get_thumbnail(X)
            # 1. Forward pass
            test_pred = model(X)

            # 2. Calculate and accumulate loss
            loss = loss_fn(test_pred, y)
            test_loss += loss.item()
            test_acc += accuracy_fn(y, test_pred.argmax(dim=1))
            
    # Adjust metrics to get average loss and accuracy per batch 
    test_loss /= len(dataloader)
    test_acc /= len(dataloader)
    return test_loss, test_acc

# 1. Take in various parameters required for training and test steps
def train_coords(model: torch.nn.Module,
          train_dataloader: torch.utils.data.DataLoader, 
          test_dataloader: torch.utils.data.DataLoader, 
          optimizer: torch.optim.Optimizer,
          loss_fn: torch.nn.Module = nn.CrossEntropyLoss(),
          epochs: int = 5,
          dwt = False,
          dwt_len = 100,
          thresh = 90,
          save: bool = False,
          save_freq: int = 100):
    
    # 2. Create empty results dictionary
    results = {"train_loss": [],
                "test_loss": [],
                "train_acc": [],
                 "test_acc": []}
    
    # 3. Loop through training and testing steps for a number of epochs
    for epoch in range(epochs):
        train_loss, train_acc = train_step_coords(model=model,
                                dataloader=train_dataloader,
                                loss_fn=loss_fn,
                                optimizer=optimizer,
                                dwt=dwt,
                                dwt_len=dwt_len,
                                thresh=thresh)
        test_loss, test_acc = test_step_coords(model=model,
                              dataloader=test_dataloader,
                              loss_fn=loss_fn,
                              dwt=dwt,
                              dwt_len=dwt_len,
                              thresh=thresh)
        
        # 4. Print out what's happening
        print(f"Epoch: {epoch+1}\t| train_loss:\t{train_loss:.5f} | train_accuracy:\t{train_acc:.3f} | test_loss:\t{test_loss:.5f} | test_accuracy:\t{test_acc:.3f}")

        # 5. Update results dictionary
        results["train_loss"].append(train_loss)
        results["test_loss"].append(test_loss)
        results["train_acc"].append(train_acc)
        results["test_acc"].append(test_acc)
        if (epoch % save_freq == 0 and epoch != 0 and save) or (epoch == epochs-1 and save):
            torch.save(model.state_dict(), f"../data/CNN_{epoch}_epochs.pkl")

    return results

In [13]:
# wavelet sparse model
epochs = 10
vec_len = 90
thresh = 90
dwt_model = MLP(input_shape=vec_len, hidden_units=256, output_shape=len(label_names))
dwt_optimizer = torch.optim.SGD(params=dwt_model.parameters(), lr=0.1)
dwt_results = train(dwt_model, train_dataloader, test_dataloader, dwt_optimizer, dwt_loss_fn, epochs, dwt=True, dwt_len=vec_len, thresh=thresh)

Epoch: 1	| train_loss:	2.30118 | train_accuracy:	16.940 | test_loss:	2.29904 | test_accuracy:	22.504
Epoch: 2	| train_loss:	2.29229 | train_accuracy:	25.202 | test_loss:	2.28137 | test_accuracy:	26.717
Epoch: 3	| train_loss:	2.26940 | train_accuracy:	28.150 | test_loss:	2.26075 | test_accuracy:	29.113
Epoch: 4	| train_loss:	2.25286 | train_accuracy:	30.443 | test_loss:	2.24860 | test_accuracy:	30.591
Epoch: 5	| train_loss:	2.24228 | train_accuracy:	31.802 | test_loss:	2.24056 | test_accuracy:	31.639
Epoch: 6	| train_loss:	2.23655 | train_accuracy:	31.930 | test_loss:	2.23558 | test_accuracy:	31.979
Epoch: 7	| train_loss:	2.23115 | train_accuracy:	32.380 | test_loss:	2.23135 | test_accuracy:	32.109
Epoch: 8	| train_loss:	2.22864 | train_accuracy:	32.393 | test_loss:	2.23091 | test_accuracy:	31.530
Epoch: 9	| train_loss:	2.22487 | train_accuracy:	32.468 | test_loss:	2.22859 | test_accuracy:	31.480
Epoch: 10	| train_loss:	2.22393 | train_accuracy:	32.275 | test_loss:	2.22700 | test_accura

In [21]:
# wavelet sparse model
epochs = 5
vec_len = 196 * 3
thresh = 0
dwt_model100 = MLP(input_shape=vec_len, hidden_units=256, output_shape=len(label_names))
dwt_optimizer100 = torch.optim.SGD(params=dwt_model100.parameters(), lr=0.1)
dwt_results = train(dwt_model100, train_dataloader, test_dataloader, dwt_optimizer100, dwt_loss_fn, epochs, dwt=True, dwt_len=vec_len, thresh=thresh)

KeyboardInterrupt: 

In [18]:
# wavelet sparse model only coordinates
epochs = 5
vec_len = 90
thresh = 85
dwt_model2 = MLP(input_shape=vec_len, hidden_units=256, output_shape=len(label_names))
dwt_optimizer2 = torch.optim.SGD(params=dwt_model2.parameters(), lr=0.1)
dwt_results = train_coords(dwt_model2, train_dataloader, test_dataloader, dwt_optimizer2, dwt_loss_fn, epochs, dwt=True, dwt_len=vec_len, thresh=thresh)

Epoch: 1	| train_loss:	2.30107 | train_accuracy:	21.248 | test_loss:	2.29869 | test_accuracy:	27.985
Epoch: 2	| train_loss:	2.29005 | train_accuracy:	30.527 | test_loss:	2.27664 | test_accuracy:	33.317
Epoch: 3	| train_loss:	2.26449 | train_accuracy:	34.483 | test_loss:	2.25450 | test_accuracy:	33.956
Epoch: 4	| train_loss:	2.24789 | train_accuracy:	34.902 | test_loss:	2.24261 | test_accuracy:	34.655
Epoch: 5	| train_loss:	2.23840 | train_accuracy:	34.997 | test_loss:	2.23598 | test_accuracy:	34.435


In [14]:
# test normal model
epochs = 10
normal_model = MLP(input_shape=28*28, hidden_units=512, output_shape=len(label_names))
normal_optimizer = torch.optim.SGD(params=normal_model.parameters(), lr=0.1)
normal_results = train(normal_model, train_dataloader, test_dataloader, normal_optimizer, normal_loss_fn, epochs, dwt=False)

Epoch: 1	| train_loss:	2.15524 | train_accuracy:	63.372 | test_loss:	2.10357 | test_accuracy:	66.094
Epoch: 2	| train_loss:	2.09134 | train_accuracy:	68.732 | test_loss:	2.08258 | test_accuracy:	69.669
Epoch: 3	| train_loss:	2.07890 | train_accuracy:	71.587 | test_loss:	2.07620 | test_accuracy:	71.615
Epoch: 4	| train_loss:	2.07479 | train_accuracy:	72.682 | test_loss:	2.07514 | test_accuracy:	72.045
Epoch: 5	| train_loss:	2.07279 | train_accuracy:	73.333 | test_loss:	2.07372 | test_accuracy:	73.343
Epoch: 6	| train_loss:	2.07074 | train_accuracy:	74.017 | test_loss:	2.07364 | test_accuracy:	73.223
Epoch: 7	| train_loss:	2.06991 | train_accuracy:	74.603 | test_loss:	2.07308 | test_accuracy:	74.311
Epoch: 8	| train_loss:	2.06887 | train_accuracy:	75.073 | test_loss:	2.07181 | test_accuracy:	74.012
Epoch: 9	| train_loss:	2.06854 | train_accuracy:	75.487 | test_loss:	2.07203 | test_accuracy:	74.341
Epoch: 10	| train_loss:	2.06771 | train_accuracy:	75.697 | test_loss:	2.07153 | test_accura

In [6]:
# test normal model on thumbnail
epochs = 10
model_thumbnail = MLP(input_shape=14*14, hidden_units=256, output_shape=len(label_names))
thumbnail_optimizer = torch.optim.SGD(params=model_thumbnail.parameters(), lr=0.1)
thumbnail_results = train(model_thumbnail, train_dataloader, test_dataloader, thumbnail_optimizer, normal_loss_fn, epochs, dwt=False)

Epoch: 1	| train_loss:	2.14957 | train_accuracy:	62.608 | test_loss:	2.10397 | test_accuracy:	65.994
Epoch: 2	| train_loss:	2.09430 | train_accuracy:	67.585 | test_loss:	2.08690 | test_accuracy:	68.970
Epoch: 3	| train_loss:	2.08166 | train_accuracy:	70.547 | test_loss:	2.08167 | test_accuracy:	70.377
Epoch: 4	| train_loss:	2.07724 | train_accuracy:	72.383 | test_loss:	2.08101 | test_accuracy:	70.767
Epoch: 5	| train_loss:	2.07387 | train_accuracy:	73.200 | test_loss:	2.07623 | test_accuracy:	71.975
Epoch: 6	| train_loss:	2.07332 | train_accuracy:	73.583 | test_loss:	2.07281 | test_accuracy:	73.343
Epoch: 7	| train_loss:	2.07279 | train_accuracy:	74.087 | test_loss:	2.07557 | test_accuracy:	73.333
Epoch: 8	| train_loss:	2.07173 | train_accuracy:	74.397 | test_loss:	2.07392 | test_accuracy:	73.492
Epoch: 9	| train_loss:	2.07059 | train_accuracy:	74.862 | test_loss:	2.07394 | test_accuracy:	74.121
Epoch: 10	| train_loss:	2.07034 | train_accuracy:	75.130 | test_loss:	2.07346 | test_accura

## Testing with the Describable Textures Dataset (DTD)

In [None]:
import numpy as np
import random
from matplotlib import pyplot as plt
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
from torchvision import datasets
from torchvision.transforms import v2
from PIL import Image
import pywt
from wavelet_preprocessing import *

dataset_path = "../data"
batch_size = 16
train_transform = v2.Compose([v2.ToImage(), v2.ToDtype(torch.float32, scale=True), v2.Resize((240,240))])
train_data = datasets.DTD(dataset_path, train=True, transform=train_transform, download=True)
test_data = datasets.DTD(dataset_path, train=False, transform=train_transform, download=True)
sample_data = datasets.DTD(dataset_path, train=True, transform=None, download=True)
label_names = test_data.classes

train_dataloader = DataLoader(train_data, batch_size, shuffle=True)
test_dataloader = DataLoader(test_data, batch_size, shuffle=True)