# Midterm 2 Assignment 3 Davide Amadei

### Imports

In [None]:
import numpy as np
from matplotlib import pyplot as plt
from typing import Iterator
import time
from sewar.full_ref import uqi

### Utility functions

In [None]:
def show_img(image: np.ndarray, gray: bool = True, title: str = None) -> None:
    """simple utility to show images with axes disabled

    Parameters
    ----------
    image : np.ndarray
        image to show
    gray : bool, optional
        if true show image in grayscale, by default True
    """
    if gray:
        plt.imshow(image, cmap="gray")
    else:
        plt.imshow(image)
    if title is not None:
        plt.title(title)
    plt.axis("off")
    plt.show()

In [None]:
def mse(x: np.ndarray, y: np.ndarray) -> np.ndarray:
    """Mean Squared Error

    Parameters
    ----------
    x : np.ndarray
        first input
    y : np.ndarray
        second input

    Returns
    -------
    np.ndarray
        returns the Mean Squared Error computed on x and y
    """
    return ((x-y)**2).mean()

def sigmoid(x: np.ndarray) -> np.ndarray:
    """sigmoid function applied elementwise to a matrix or vector

    Parameters
    ----------
    x : np.ndarray
        input matrix or vector

    Returns
    -------
    np.ndarray
        matrix or vector of same size of input containing elementwise sigmoid
    """
    return 1/(1+np.exp(-x))

### Function to read MNIST dataset
As the MNIST dataset is provided in idx format it has to be read from the original file to be converted into a usable form.<br>
This function does exactly that, decoding the file based on the format explained on the MNIST site http://yann.lecun.com/exdb/mnist/

In [None]:
def read_MNIST_file(path: str, reshape: bool = False) -> tuple[np._ShapeType, np.ndarray]:
    """simple utility function to read a file containing MNIST datasets

    Parameters
    ----------
    path : str
        path containing the file to load
    reshape : bool, optional
        flag determining wheter the dataset is returned already reshaped as images or flattened.
        If True each element of the dataset is reshaped to a matrix of size 28x28, 
        else each element is a vector of size 784, by default False

    Returns
    -------
    tuple[np._ShapeType, np.ndarray]
        the shape of the output data and the dataset loaded from the file

    Raises
    ------
    ValueError
        if the magic numbers of the file are wrong
    """
    with open(path, "rb") as f:
        magic = int.from_bytes(f.read(4))
        n_items = int.from_bytes(f.read(4))

        # image file
        if magic == 2051:
            n_rows = int.from_bytes(f.read(4))
            n_columns = int.from_bytes(f.read(4))

            # if reshape is true change the output shape
            if reshape:
                shape = (n_items, n_rows, n_columns)
            else:
                shape = (n_items, n_rows * n_columns)
            images = np.ndarray(shape)

            # read each image one by one
            for i in range(n_items):
                img_buffer = f.read(n_rows*n_columns)
                # convert binary data buffer to numpy int array
                img = np.frombuffer(img_buffer, np.uint8)

                # reshape the image if needed
                if reshape:
                    img = np.reshape(img, (n_rows, n_columns))

                images[i] = img
            return (shape, images/255)
        # label file, not implemented as it was not needed for this assignment
        elif magic == 2049:
            pass
        # wrong type of file
        else:
            raise ValueError("wrong file format")

Load the training set and the test set

In [None]:
tr_img_shape, tr_images = read_MNIST_file("./data/train-images-idx3-ubyte")
ts_img_shape, ts_images = read_MNIST_file("./data/t10k-images-idx3-ubyte")

## Restricted Boltzmann Machine implementation
This is the implementation of the Restricted Boltzmann Machine and all the needed methods to train it and to use it for reconstructing data outside of the training set.<br>
All the sampling is done binarizing the data as seen in the code example in class. This means starting from an array of elements $0 \leq x \leq 1$  $\forall x$ and using them as probablities to get a binary array where each $x$ is used as the probabliity of being sampled to 1.

### Quantitative measures for the error

To get a quantitive measure of the error of the RBM during training and on the test data two measures are used. The first one is simply the MSE, compute for each batch and then averaged at the end of the epoch. The other measure is the Universal image Quality Index (UQI). It always returns a value between 0 and 1, with 1 being that the two images are the same. The implementation in the sewar library was used https://github.com/andrewekhalel/sewar. More on this index can be found on the paper it was introduced in https://ieeexplore.ieee.org/document/995823. I decided to use this in addition to MSE to get a possibly easier to interpret score of the similarity between the original image and the one reconstructed by the RBM.<br>
Unfortunately because of the way the UQI works and maybe because of how it is implemented in sewar it needs to be computed on each image separately, which also need to be reshaped for the UQI to work. This takes a lot of time overall and slows down the training tremendously.

In [None]:
class RBM:
    """
    class implementing a Restricted Boltzmann Machine with a method to fit it on given data 
    and a method to reconstruct data given in input
    """
    def __init__(self, v_units: int, h_units:int = 100, seed:int = None):
        """init method

        Parameters
        ----------
        v_units : int
            number of visible units, must be the same as the size of a single element of training data
        h_units : int
            number of hidden units, by default 100
        seed : int, optional
            seed to initialize RNG, by default None
        """
        self._rng = np.random.default_rng(seed)
        self._seed = seed
        self._weights = self._rng.normal(size=(v_units, h_units))
        self._bv = self._rng.normal(size=v_units)
        self._bh = self._rng.normal(size=h_units)
        # self._bv = self._rng.uniform(0, 1, size=v_units)
        # self._bh = self._rng.uniform(0, 1, size=h_units)
    
    @staticmethod
    def get_minibatches(
        x: np.ndarray, batchsize: int
    ) -> Iterator[tuple[np.ndarray, np.ndarray]]:
        """Returns minibatches of given size over (x, y).

        Parameters
        ----------
        x : np.ndarray
            data array
        batchsize : int
            batch size of yielded minibatches

        Yields
        ------
        Iterator[tuple[np.ndarray,np.ndarray]]
            iterator over minibatches
        """
        if batchsize in [None, 0, -1]:
            batchsize = x.shape[0]
        size = x.shape[0]
        batchtotal, remainder = divmod(size, batchsize)
        for i in range(batchtotal):
            mini_x = x[i * batchsize : (i + 1) * batchsize]
            yield mini_x
        if remainder > 0:
            yield (x[batchtotal * batchsize :])

    def sample_data(self, data: np.ndarray, seed: int = None) -> np.ndarray:
        """method to get a sample of binary data from a given array of elements x where 0 <= x <= 1.
        These elements are seen as probabilities and become 0 or 1 indipendently of each other.
        P(x->1) = x, P(x->0) = 1-x 

        Parameters
        ----------
        data : np.ndarray
            data to sample
        seed : int, optional
            seed to use for sampling, if None uses the RNG of the current obect, by default None

        Returns
        -------
        np.ndarray
            an array containing the sampled data. The elements of this array are either 0 or 1
        """
        if seed is None:
            return self._rng.binomial(1, p=data)
        else:
            tmp_rng = np.random.default_rng(seed)
            return tmp_rng.binomial(1, p=data)
        
    def reset(self, seed: int = None):
        """method to reset the RBM object by reinitializing weights and RNG

        Parameters
        ----------
        seed : int, optional
            seed to use for RNG reinitialization. If none uses old seed, by default None
        """
        if seed is not None:
            self._seed = seed
        self._rng = np.random.default_rng(self._seed)
        self._weights = self._rng.normal(size=self._weights.shape)
        self._bv = self._rng.normal(size=self._bv.shape)
        self._bh = self._rng.normal(size=self._bh.shape)
        
    def fit(self, tr_data: np.ndarray, max_epochs=10, lr: float = 1, batchsize=128, img_shape: np._ShapeType = None):
        """function to train the RBM

        Parameters
        ----------
        tr_data : np.ndarray
            data to use for training
        max_epochs : int, optional
            max number of epochs, by default 10
        lr : float, optional
            learning rate, multiplies the updates to the weights, by default 1
        batchsize : int, optional
            size of each batch, by default 128
        img_shape : np._ShapeType, optional
            shape of the images that are being processed during training.
            If not None in addition to the MSE the Universal image Quality Index (UQI) is also computed, by default None
        """
        start = time.time()

        # keep track of the total time spent on UQI computation
        tot_uqi_time = 0

        for i in range(max_epochs):
            shuffled_data = tr_data[self._rng.permutation(tr_data.shape[0])]
            loss_list = []
            if img_shape is not None:
                uqi_loss_list = []
                epoch_uqi_time = 0
            for batch in RBM.get_minibatches(shuffled_data, batchsize):
                
                sampled_data = self.sample_data(batch)
                h_prob0 = sigmoid(np.dot(sampled_data, self._weights) + self._bh)
                sampled_h0 = self.sample_data(h_prob0)

                wake = np.dot(sampled_data.T, sampled_h0)

                recon_data_prob = sigmoid(np.dot(sampled_h0, self._weights.T) + self._bv)
                recon_data = self.sample_data(recon_data_prob)

                h_prob1 = sigmoid(np.dot(recon_data, self._weights) + self._bh)
                # sampled_h1 = self.sample_data(h_prob1)
                
                dream = np.dot(recon_data.T, h_prob1)

                self._weights += lr * (wake - dream)/batch.shape[0]
                self._bv += lr * (np.sum(sampled_data, axis=0) - 
                                  np.sum(recon_data, axis=0))/batch.shape[0]
                self._bh += lr * (np.sum(h_prob0, axis=0) - 
                                  np.sum(h_prob1, axis=0))/batch.shape[0]
                if img_shape != None:
                    uqi_time_start = time.time()
                    uqi_loss_batch = 0
                    reshaped_batch = np.reshape(batch, (batch.shape[0],) + img_shape)
                    reshaped_recon_data = np.reshape(recon_data_prob, (recon_data_prob.shape[0],) + img_shape)

                    for t in range(batch.shape[0]):
                        uqi_loss_batch += uqi(reshaped_batch[t], reshaped_recon_data[t])

                    uqi_loss_list.append(uqi_loss_batch / batch.shape[0])
                    uqi_time_end = time.time()
                    epoch_uqi_time += uqi_time_end - uqi_time_start

                loss_list.append(mse(batch, recon_data_prob))

            loss_avg = np.round(np.mean(loss_list), 4)
            loss_std = np.round(np.std(loss_list), 4)
            
            if img_shape is not None:
                uqi_loss_avg = np.round(np.mean(uqi_loss_list), 4)
                uqi_loss_std = np.round(np.std(uqi_loss_list), 4)
                tot_uqi_time += epoch_uqi_time
                
            end = time.time()
            eta = (end - start) * (max_epochs - i - 1) / (i + 1)
            if img_shape is not None:
                print(f"Time spent on UQI computation: {np.round(epoch_uqi_time, 2)}s")
                print(f"Epoch number {i+1} done. MSE: {loss_avg} \xB1 {loss_std}. "
                    f"UQI: {uqi_loss_avg} \xB1 {uqi_loss_std} "
                    f"Elapsed time: {np.round(end-start, 2)}s. ETA: {np.round(eta, 2)}s")
            else:
                print(f"Epoch number {i+1} done. MSE: {loss_avg} \xB1 {loss_std}. "
                    f"Elapsed time: {np.round(end-start, 2)}s. ETA: {np.round(eta, 2)}s")
        if img_shape is not None:
            print(f"Total time spent on UQI computation: {np.round(tot_uqi_time, 2)}s")
    
    def reconstruct(self, ts_data, seed = None):
        sampled_data = self.sample_data(ts_data, seed=seed)

        h_prob0 = sigmoid(np.dot(sampled_data, self._weights) + self._bh)
        sampled_h0 = self.sample_data(h_prob0, seed=seed)
        
        recon_data_prob = sigmoid(np.dot(sampled_h0, self._weights.T) + self._bv)
        return recon_data_prob


In [None]:
boltzmann_machine = RBM(v_units = tr_img_shape[1], h_units = 100, seed = 123)

In [None]:
boltzmann_machine.reset()
boltzmann_machine.fit(tr_images, max_epochs = 20, lr = 1, batchsize = 50, img_shape = (28,28))

In [None]:
reconstructed_test = boltzmann_machine.reconstruct(ts_images)
reconstructed_test_reshaped = np.reshape(reconstructed_test, (reconstructed_test.shape[0], 28, 28))
ts_images_reshaped = np.reshape(ts_images, (ts_images.shape[0], 28, 28))

In [None]:

loss_avg = np.round(mse(ts_images, reconstructed_test), 4)
loss_std = np.round(((ts_images - reconstructed_test)**2).std(), 4)
uqi_loss_list = []

for i in range(ts_images_reshaped.shape[0]):
    uqi_loss_list.append(uqi(ts_images_reshaped[i], reconstructed_test_reshaped[i]))
uqi_loss_avg = np.round(np.mean(uqi_loss_list), 4)
uqi_loss_std = np.round(np.std(uqi_loss_list), 4)

print(f"MSE: {loss_avg} \xB1 {loss_std}\n"
      f"UQI: {uqi_loss_avg} \xB1 {uqi_loss_std}")

In [None]:
index = 2983
print(uqi(ts_images_reshaped[index], reconstructed_test_reshaped[index]))
show_img(ts_images_reshaped[index])
show_img(reconstructed_test_reshaped[index])

In [None]:
w_test = boltzmann_machine._weights
w_test = np.reshape(w_test, (28, 28, w_test.shape[1]))
show_img(w_test[:, :, 40])

In [None]:
test = np.ones((20, 3))
lul = np.random.random(3)
print(lul)
print(test)
print(test - lul)
v = test - lul
c = test - np.broadcast_to(lul, test.shape)
print(v-c)

In [None]:
test = np.random.random(10)
print(np.random.binomial(1, p=test))