## Restricted Botlzman Machines (RBM)

In [None]:
#FIXME: Review the generation process (theoretically) and fix the implementation 

In [None]:
import os
from typing import List, Dict, Tuple, Literal, Optional, Union, Iterable

import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import scipy.io
from tqdm import tqdm
from numpy._typing import ArrayLike

ArrayLike = Union[List, Tuple, np.ndarray]

In [None]:
DATA_FOLDER = "../data/"
ALPHA_DIGIT_PATH = os.path.join(DATA_FOLDER, "binaryalphadigs.mat")
MNIST_PATH = os.path.join(DATA_FOLDER, "mnist_all.mat")

if not os.path.exists(ALPHA_DIGIT_PATH):
    raise FileNotFoundError(f"The file {ALPHA_DIGIT_PATH} does not exist.")

### 3.1 Implementing a RBM and testing on Binary AlphaDigits

In [None]:
def _load_data(file_path: str) -> Dict[str, np.ndarray]:
    """
    Load Binary AlphaDigits data from a .mat file.

    Parameters:
    - file_path (str): Path to the .mat file containing the data.

    Returns:
    - data (dict): Loaded data dictionary.
    """
    if file_path is None:
        raise ValueError("File path must be provided.")

    return scipy.io.loadmat(file_path)


data = _load_data(ALPHA_DIGIT_PATH)
class_labels = data["classlabels"].flatten() 
class_count = data["classcounts"].flatten()
df = pd.DataFrame(
    {
        "Class Labels": class_labels,
        "Class Count": class_count
    }
)
df["Class Labels"] = df["Class Labels"].apply(lambda x: x[0])
df["Class Count"] = df["Class Count"].apply(lambda x: x[0][0])
df

In [None]:
def _load_data(file_path: str, which: Literal["alphadigit", "mnist"]="alphadigit") -> Dict[str, np.ndarray]:
    """
    Load Binary AlphaDigits data from a .mat file.

    Parameters:
    - file_path (str): Path to the .mat file containing the data.
    - which (Literal["alphadigit", "mnist"], optional): Specifies 
        which data to load. The default value is "alphadigit".

    Returns:
    - data (dict): A dictionary containing the loaded data.

    Raises:
    - ValueError: If the file_path parameter is None.
    - ValueError: If the which parameter is not "alphadigit".

    Example Usage:
    ```python
    data = _load_data("data.mat", "alphadigit")
    ```
    """
    if file_path is None:
        raise ValueError("File path must be provided.")
    
    if which == "alphadigit":
        return scipy.io.loadmat(file_path)["dat"]
    
    raise ValueError("MNIST NOT YET AVAILABLE.")

alphadigit_data = _load_data(ALPHA_DIGIT_PATH) 
print(alphadigit_data.shape)
print(alphadigit_data[0][0].shape)

In [None]:
def _map_characters_to_indices(characters: Union[str, int, List[Union[str, int]]]) -> List[int]:
    """
    Map alphanumeric character to its corresponding index.

    Parameters:
    - character (str, int, list of str or int): Alphanumeric character or its index.

    Returns:
    - char_index (int): Corresponding index for the character.
    """
    if isinstance(characters, list):
        return [_map_characters_to_indices(char) for char in characters]
    if isinstance(characters, int) and 0 <= characters <= 35:
        return [characters]
    if (isinstance(characters, str) and characters.isdigit()
          and 0 <= int(characters) <= 9):
        return [int(characters)]
    if (isinstance(characters, str) and characters.isalpha()
          and 'A' <= characters.upper() <= 'Z'):
        return [ord(characters.upper()) - ord('A') + 10]
    
    raise ValueError(
        "Invalid character input. It should be an alphanumeric" 
        "character '[0-9|A-Z]' or its index representing '[0-35]'."
    )

for char in [0, 10, "A", [1, "C"], 36]:
    try:
        map = _map_characters_to_indices(char)
        print(f"{char} > map to > {map}")
    except:
        print(f"{char} > no mapping available, out of range")

In [None]:
def read_alpha_digit(characters: Optional[Union[str, int, List[Union[str, int]]]] = None,
                     file_path: Optional[str] = ALPHA_DIGIT_PATH,
                     data: Optional[Dict[str, np.ndarray]] = None,
                     use_data: bool = False,
                     ) -> np.ndarray:
    """
    Reads binary AlphaDigits data from a .mat file or uses already loaded data. 
    It extracts the data for a specified alphanumeric character or its index, and 
    flattens the images into one-dimensional vectors.

    Parameters:
    - characters (Union[str, int, List[Union[str, int]]], optional): Alphanumeric character 
        or its index whose data needs to be extracted. It can be a single character or 
        a list of characters. Default is None.
    - file_path (str, optional): Path to the .mat file containing the data. 
        Default is None.
    - data (dict, optional): Already loaded data dictionary. 
        Default is None.
    - use_data (bool): Flag to indicate whether to use already loaded data.
        Default is False.

    Returns:
    - flattened_images (numpy.ndarray): Flattened images for the specified character(s).
    """
    if not use_data:
        data = _load_data(file_path, which="alphadigit")

    char_indices = _map_characters_to_indices(characters)

    # Select the rows corresponding to the characters indices.
    char_data: np.ndarray = data[char_indices]
    
    # Flatten each image into a one-dimensional vector.
    flattened_images = np.array([image.flatten() for image in char_data.flatten()])
    return flattened_images

def plot_characters(chars, data):
    num_chars = len(chars)
    num_images_per_char = data.shape[0] // num_chars
    fig, ax = plt.subplots(1, num_chars, figsize=(num_chars * 2, 2))

    for i, char in enumerate(chars):
        # Find the index of the first image corresponding to the current char
        start_index = i * num_images_per_char
        image = data[start_index].reshape(20, 16)
        ax[i].imshow(image, cmap='gray')
        ax[i].set_title(f'Char: {char}')
        ax[i].axis('off')

    plt.tight_layout()
    plt.show()

# Example
chars = [0, "K", 7, "Z"]
data = read_alpha_digit(chars, data=alphadigit_data, use_data=True)
plot_characters(chars, data)

In [None]:
print("data shape:", data.shape)

In [None]:
class RBM:
    def __init__(self, n_visible: int, n_hidden: int=100, random_state=None) -> None:
        """
        Initialize the Restricted Boltzmann Machine.

        Parameters:
        - n_visible (int): Number of visible units.
        - n_hidden (int): Number of hidden units. Default 100.
        - random_state: Random seed for reproducibility.
        """
        self.n_visible = n_visible
        self.n_hidden = n_hidden
        
        self.a = np.zeros((1, n_visible)) # visible_bias
        self.b = np.zeros((1, n_hidden)) # hidden_bias
        self.rng = np.random.default_rng(random_state)
        self.W = 1e-4 * self.rng.standard_normal(size=(n_visible, n_hidden)) # weights

    def __repr__(self) -> str:
        return f"RBM(n_visible={self.n_visible}, n_hidden={self.n_hidden})"

    def _sigmoid(self, x: np.ndarray) -> np.ndarray:
        """
        Sigmoid activation function.

        Parameters:
        - x (numpy.ndarray): Input array.

        Returns:
        - numpy.ndarray: Result of applying the sigmoid function to the input.
        """
        return 1 / (1 + np.exp(-x))
    
    def _reconstruction_error(self, input: np.ndarray, image: np.ndarray) -> float:
        """
        Compute reconstruction error.

        Parameters:
        - input (numpy.ndarray): Original input data.
        - image (numpy.ndarray): Reconstructed image.

        Returns:
        - float: Reconstruction error.
        """
        return np.round(np.power(image - input, 2).mean(), 5)

    def input_output(self, data: np.ndarray) -> np.ndarray:
        """
        Compute hidden units given visible units.

        Parameters:
        - data (numpy.ndarray): Input data, shape (n_samples, n_visible).

        Returns:
        - numpy.ndarray: Hidden unit activations, shape (n_samples, n_hidden).
        """
        return self._sigmoid(data @ self.W + self.b)

    def output_input(self, data_h: np.ndarray) -> np.ndarray:
        """
        Compute visible units given hidden units.

        Parameters:
        - data_h (numpy.ndarray): Hidden unit activations, shape (n_samples, n_hidden).

        Returns:
        - numpy.ndarray: Reconstructed visible units, shape (n_samples, n_visible).
        """
        return self._sigmoid(data_h @ self.W.T + self.a)
    
    def calcul_softmax(self, data: np.ndarray) -> np.ndarray:
        """
        Calculate softmax probabilities for the output units.

        Parameters:
        - input_data (numpy.ndarray): Input data, shape (n_samples, n_visible).

        Returns:
        - numpy.ndarray: Softmax probabilities, shape (n_samples, n_hidden).
        """
        # Compute activations for the hidden layer
        hidden_activations = self.input_output(data)
        
        # Compute softmax probabilities for the output layer
        exp_hidden_activations = np.exp(hidden_activations)
        softmax_probs = exp_hidden_activations / np.sum(exp_hidden_activations, axis=1, keepdims=True)
        
        return softmax_probs

    def update(
            self, 
            batch: np.ndarray,
            learning_rate: float=0.1,
            batch_size: Optional[int]=None,
            return_output: bool=False
        ):
        """_summary_

        Args:
            batch (np.ndarray): _description_
            learning_rate (float, optional): _description_. Defaults to 0.1.
            batch_size (Optional[int], optional): _description_. Defaults to None.
            return_output (bool, optional): _description_. Defaults to False.
        """
        if not batch_size:
            batch_size = batch.shape[0]
        pos_h_probs = self.input_output(batch)
        pos_v_probs = self.output_input(pos_h_probs)
        neg_h_probs = self.input_output(pos_v_probs)
        
        # Update weights and biases
        self.W += learning_rate * (batch.T @ pos_h_probs - pos_v_probs.T @ neg_h_probs) / batch_size
        self.b += learning_rate * (pos_h_probs - neg_h_probs).mean(axis=0)
        self.a += learning_rate * (batch - pos_v_probs).mean(axis=0)

        if return_output:
            return self, pos_v_probs
        
        return self 

    def train(self, 
              data: np.ndarray,
              learning_rate: float=0.1,
              n_epochs: int=10,
              batch_size: int=10,
              print_each=10
        ) -> 'RBM':
        """
        Train the RBM using Contrastive Divergence.

        Parameters:
        - data (numpy.ndarray): Input data, shape (n_samples, n_visible).
        - learning_rate (float): Learning rate for gradient descent. Default is 0.1.
        - n_epochs (int): Number of training epochs. Default is 10.
        - batch_size (int): Size of mini-batches. Default is 10.

        Returns:
        - RBM: Trained RBM instance.
        """
        n_samples = data.shape[0]
        for epoch in range(n_epochs):
            self.rng.shuffle(data)
            for i in tqdm(range(0, n_samples, batch_size), desc=f"Epoch {epoch}"):
                batch = data[i:i+batch_size]
                _, pos_v_probs = self.update(
                    batch=batch,
                    learning_rate=learning_rate,
                    batch_size=batch_size,
                    return_output=True
                )
                
            if epoch % print_each == 0:
                tqdm.write(
                    f"Reconstruction error: {self._reconstruction_error(batch, pos_v_probs)}.")

        return self

    def generate_image(self, n_samples: int=1, n_gibbs_steps: int=1) -> np.ndarray:
        """
        Generate samples from the RBM using Gibbs sampling.

        Parameters:
        - n_samples (int): Number of samples to generate. Default is 10.
        - n_gibbs_steps (int): Number of Gibbs sampling steps. Default is 1.

        Returns:
        - numpy.ndarray: Generated samples, shape (n_samples, n_visible).
        """
        samples = np.zeros((n_samples, self.n_visible))
        
        # Matrix of initlization value of Gibbs samples for each sample. 
        V = self.rng.binomial(1, self.rng.random(), size=n_samples*self.n_visible).reshape((n_samples, self.n_visible))
        for i in range(n_samples):
            for _ in range(n_gibbs_steps):
                h_probs = self._sigmoid(V[i] @ self.W + self.b) # vector
                h = self.rng.binomial(1, h_probs)
                v_probs = self._sigmoid(h @ self.W.T + self.a)
                v = self.rng.binomial(1, v_probs)
            samples[i] = v
        return samples

In [None]:
# Load the alpha_digit data
data = read_alpha_digit(file_path=ALPHA_DIGIT_PATH, characters=['Z'])

In [None]:
# Parameters
n_visible = data.shape[1]  # Number of visible units (size of each image)
n_hidden = 200  # Number of hidden units (hyperparameter)

# Initialize RBM
rbm = RBM(n_visible=n_visible, n_hidden=n_hidden, random_state=42)
print(rbm)

# Train RBM
rbm.train(data, learning_rate=0.1, n_epochs=500, batch_size=10)

In [None]:
np.testing.assert_allclose(rbm.calcul_softmax(data).sum(1), 1)

In [None]:
# Generate samples
generated_samples = rbm.generate_image(n_samples=10, n_gibbs_steps=1)

# Plot original and generated samples
plt.figure(figsize=(12, 6))
for i in range(10):
    plt.subplot(2, 10, i + 1)
    plt.imshow(data[i].reshape(20, 16), cmap='gray')
    plt.title('Original')
    plt.axis('off')
    
    plt.subplot(2, 10, i + 11)
    plt.imshow(generated_samples[i].reshape(20, 16), cmap='gray')
    plt.title('Generated')
    plt.axis('off')

plt.show()

In [None]:
print(rbm)

### 3.2 Implementing a Deep Belief Network (DBN) and test on Binary AlphaDigits

In [None]:
class DBN:
    def __init__(self, n_visible: int, hidden_layer_sizes: list[int], random_state=None):
        """
        Initialize the Deep Belief Network.

        Parameters:
        - n_visible (int): Number of visible units.
        - hidden_layer_sizes (list[int]): List of sizes for each hidden layer.
        - random_state: Random seed for reproducibility.
        """
        self.n_visible = n_visible
        self.hidden_layer_sizes = hidden_layer_sizes
        self.rbms: List[RBM] = []
        self.rng = np.random.default_rng(random_state)

        # Initialize the first RBM
        first_rbm = RBM(
            n_visible=n_visible,
            n_hidden=hidden_layer_sizes[0],
            random_state=random_state,
        )
        self.rbms.append(first_rbm)

        # Initialize RBMs for subsequent hidden layers
        for i, size in enumerate(hidden_layer_sizes[1:], start=1):
            rbm = RBM(
                n_visible=hidden_layer_sizes[i - 1],
                n_hidden=size,
                random_state=random_state,
            )
            self.rbms.append(rbm)


    def __getitem__(self, key):
        return self.rbms[key]
    

    def __repr__(self):
        """
        Return a string representation of the DBN object.
        """
        rbm_reprs = [f"{'':4}{repr(rbm)}" for rbm in self.rbms]
        join_rbm_reprs = ',\n'.join(rbm_reprs)
        return f"DBN([\n{join_rbm_reprs}\n])"


    def train(self,
        data: np.ndarray,
        learning_rate: float=0.1,
        n_epochs: int=10,
        batch_size: int=10,
        print_each: int=10,
    ) -> "DBN":
        """
        Train the DBN using Greedy layer-wise procedure.

        Parameters:
        - data (numpy.ndarray): Input data, shape (n_samples, n_visible).
        - learning_rate (float): Learning rate for gradient descent. Default is 0.1.
        - n_epochs (int): Number of training epochs. Default is 10.
        - batch_size (int): Size of mini-batches. Default is 10.
        - print_each: Print reconstruction error each `print_each` epochs.
        - verbose

        Returns:
        - DBN: Trained DBN instance.
        """
        input_data = data
        for rbm in self.rbms:
            rbm.train(
                input_data,
                learning_rate=learning_rate,
                n_epochs=n_epochs,
                batch_size=batch_size,
                print_each=print_each,
            )
            # Update input data for the next RBM
            input_data = rbm.input_output(input_data)

        return self

    def generate_image(self, n_samples: int=1, n_gibbs_steps: int=1) -> np.ndarray:
        """
        Generate samples from the DBN using Gibbs sampling.

        Parameters:
        - n_samples (int): Number of samples to generate. Default is 1.
        - n_gibbs_steps (int): Number of Gibbs sampling steps. Default is 100.

        Returns:
        - numpy.ndarray: Generated samples, shape (n_samples, n_visible).
        """
        # samples = np.zeros((n_samples, self.n_visible))

        # Generate samples using the first RBM in the DBN
        samples = self.rbms[-1].generate_image(n_samples, n_gibbs_steps)
        for rbm in reversed(self.rbms[:-1]):
            # Sample from the conditional probability of layer l-1 given layer l: p(h_{s-1}|h_{s}).
            h_probs = rbm.output_input(samples)
            h = self.rng.binomial(1, p=h_probs) 
            samples = h
        return samples

In [None]:
# from principal_dbn_alpha import DBN

In [None]:
n_visible=data.shape[1]
hidden_layer_sizes = [100, 50, 25]

dbn = DBN(n_visible=n_visible, hidden_layer_sizes=hidden_layer_sizes, random_state=42)
dbn.train(data, learning_rate=0.1, n_epochs=10, batch_size=10)

In [None]:
# Check if the RBM are accessibles via a slicing 
print(dbn[1:3])

In [None]:
# # Generate images
generated_images = dbn.generate_image(n_samples=5, n_gibbs_steps=1)

# Display generated images
fig, axes = plt.subplots(nrows=1, ncols=5, figsize=(12, 4))
for i in range(5):
    axes[i].imshow(generated_images[i].reshape(20, 16), cmap='gray')
    axes[i].set_title(f"Image {i+1}")
    axes[i].axis('off')
plt.tight_layout()
plt.show()

In [None]:
class DNN(DBN):
    def __init__(
        self,
        input_dim: int,
        output_dim: int,
        hidden_layer_sizes: List[int],
        random_state=None
    ):
        """
        Initialize the Deep Neural Network (DNN).

        Parameters:
        - input_dim (int): Dimension of the input.
        - output_dim (int): Dimension of the output.
        - hidden_layer_sizes (List[int]): List of sizes for each hidden layer.
        - random_state: Random seed for reproducibility.
        """
        super().__init__(
            n_visible=input_dim,
            hidden_layer_sizes=hidden_layer_sizes,
            random_state=random_state
        )
        #--> self.rbms contains only the pre-trainable RBMs 
        self.clf = RBM(self.rbms[-1].n_hidden, output_dim)
        self.network = self.rbms + [self.clf]  # DNN = [DBN + Classifier] ~ [RBM_0,...,RBM_N, RBM_Clf]

    def __getitem__(self, key):
        return self.network[key]
    
    def __repr__(self):
        join_repr = "\n".join([f"{'':4}{repr(rbm)}," for rbm in self.network])
        return f"DNN([\n{join_repr} <HEAD CLF>\n])"
    
    
    def pretrain(self, n_epochs: int, learning_rate: float, batch_size: int, data: np.ndarray) -> "DNN":
        """
        Pretrain the hidden layers of the DNN using the DBN training method.

        Parameters:
        - n_epochs (int): Number of training epochs.
        - learning_rate (float): Learning rate for gradient descent.
        - batch_size (int): Size of mini-batches.
        - data (numpy.ndarray): Input data, shape (n_samples, n_visible).

        Returns:
        - DNN: Pretrained DNN instance.
        """
        # NOTE: Use the inherited `train` method to perform pre-training since `self.rbms`
        # only contains the pre-trainable RBMs.
        return self.train(data, n_epochs=n_epochs, learning_rate=learning_rate, batch_size=batch_size)
    
    def input_output(self, input_data: np.ndarray) -> Tuple[List[np.ndarray], np.ndarray]:
        """
        Get the outputs on each layer of the DNN and the softmax probabilities on the output layer.

        Parameters:
        - input_data (numpy.ndarray): Input data, shape (n_samples, n_visible).

        Returns:
        - Tuple[List[np.ndarray], np.ndarray]: Outputs on each layer & softmax probabilities.
        """
        layer_outputs = []
        
        # Input layer output
        layer_outputs.append(input_data)
        
        # Hidden layers output
        for rbm in self.rbms:
            layer_outputs.append(rbm.input_output(layer_outputs[-1]))
        
        # Softmax probabilities on the output layer
        output_probs = self.network[-1].calcul_softmax(layer_outputs[-1])
        
        return layer_outputs, output_probs
    

    def _cross_entropy(batch_labels: np.ndarray, output_probs: np.ndarray, eps: float = 1e-15) -> float:
        """
        Calculate the cross entropy between the batch labels and output probabilities.

        Parameters:
        - batch_labels (numpy.ndarray): True labels for the batch, shape (batch_size, n_classes).
        - output_probs (numpy.ndarray): Predicted probabilities for the batch, shape (batch_size, n_classes).
        - eps (float): Small value to avoid numerical instability in logarithm calculation. Default is 1e-15.

        Returns:
        - float: Cross entropy value.
        """
        return -np.mean(np.sum(batch_labels * np.log(output_probs + eps), axis=1))


    def backpropagation(
            self,
            input_data: np.ndarray,
            labels: np.ndarray,
            n_epochs: int,
            learning_rate: float,
            batch_size: int,
            eps: float = 1e-15
        ) -> "DNN":
        """
        Estimate the weights/biases of the network using backpropagation algorithm.

        Parameters:
        - input_data (numpy.ndarray): Input data, shape (n_samples, n_visible).
        - labels (numpy.ndarray): Labels for the input data, shape (n_samples, n_classes).
        - n_epochs (int): Number of training epochs.
        - learning_rate (float): Learning rate for gradient descent.
        - batch_size (int): Size of mini-batches.
        - eps (float): Small value to avoid numerical instability in logarithm calculation. Default is 1e-15.

        Returns:
        - DNN: Updated DNN instance.
        """
        n_samples = input_data.shape[0]
        
        for epoch in tqdm(range(n_epochs), desc="Training", unit="epoch"):
            for batch_start in range(0, n_samples, batch_size):
                batch_end = min(batch_start + batch_size, n_samples)
                batch_input = input_data[batch_start:batch_end]
                batch_labels = labels[batch_start:batch_end]

                # Forward pass
                layer_outputs, output_probs = self.input_output(batch_input)

                # Backward pass (update weights and biases)
                self.network[-1].update(batch_labels, layer_outputs[-1], learning_rate)
                for i in range(len(self.network) - 2, -1, -1):
                    self.network[i].update(layer_outputs[i], layer_outputs[i + 1], self.network[i + 1].weights, learning_rate)

            # Calculate cross entropy after each epoch
            loss = self._cross_entropy(batch_labels, output_probs, eps)
            tqdm.write(f"Epoch {epoch + 1}/{n_epochs}, Cross Entropy: {loss}")

        return self


In [None]:
n_visible=data.shape[1]
hidden_layer_sizes = [100, 50, 25]
output_dim = 20

dnn = DNN(input_dim=n_visible, hidden_layer_sizes=hidden_layer_sizes, output_dim=output_dim, random_state=42)
# keep last RBM's weights for further test.
weights = dnn[-1].W 

dnn.train(data, learning_rate=0.1, n_epochs=10, batch_size=10)

# Check that the last RBM has not been trained.
np.testing.assert_equal (dnn[-1].a, 0) # visible bias
np.testing.assert_equal (dnn[-1].b, 0) # hidden bias
np.testing.assert_equal (dnn[-1].W, weights) # weights

In [None]:
layer_outputs, softmax_output = dnn.input_output(data)
n_layers_net = len(layer_outputs) + 1
print("No. network output =", n_layers_net)

print(f"Input data (0): {layer_outputs[0].shape}")
for idx, layer_output in enumerate(layer_outputs[1:]):
    print(f"Hidden layer input ({idx+1}): {layer_output.shape}")

print(f"Softmax output ({n_layers_net - 1}):", softmax_output.shape)