In [169]:
# Imports
import numpy as np
from typing import List, Dict, Tuple, Union
from numpy.typing import ArrayLike

class NeuralNetwork:
    """
    This is a class that generates a fully-connected neural network.

    Parameters:
        nn_arch: List[Dict[str, float]]
            A list of dictionaries describing the layers of the neural network.
            e.g. [{'input_dim': 64, 'output_dim': 32, 'activation': 'relu'}, {'input_dim': 32, 'output_dim': 8, 'activation:': 'sigmoid'}]
            will generate a two-layer deep network with an input dimension of 64, a 32 dimension hidden layer, and an 8 dimensional output.
        lr: float
            Learning rate (alpha).
        seed: int
            Random seed to ensure reproducibility.
        batch_size: int
            Size of mini-batches used for training.
        epochs: int
            Max number of epochs for training.
        loss_function: str
            Name of loss function.

    Attributes:
        arch: list of dicts
            (see nn_arch above)
    """

    def __init__(
        self,
        nn_arch: List[Dict[str, Union[int, str]]],
        lr: float,
        seed: int,
        batch_size: int,
        epochs: int,
        loss_function: str
    ):

        # Save architecture
        self.arch = nn_arch

        # Save hyperparameters
        self._lr = lr
        self._seed = seed
        self._epochs = epochs
        self._loss_func = loss_function
        self._batch_size = batch_size

        # Initialize the parameter dictionary for use in training
        self._param_dict = self._init_params()

    def _init_params(self) -> Dict[str, ArrayLike]:
        """
        DO NOT MODIFY THIS METHOD! IT IS ALREADY COMPLETE!

        This method generates the parameter matrices for all layers of
        the neural network. This function returns the param_dict after
        initialization.

        Returns:
            param_dict: Dict[str, ArrayLike]
                Dictionary of parameters in neural network.
        """

        # Seed NumPy
        np.random.seed(self._seed)

        # Define parameter dictionary
        param_dict = {}

        # Initialize each layer's weight matrices (W) and bias matrices (b)
        for idx, layer in enumerate(self.arch):
            layer_idx = idx + 1
            input_dim = layer['input_dim']
            output_dim = layer['output_dim']
            param_dict['W' + str(layer_idx)] = np.random.randn(output_dim, input_dim) * 0.1
            param_dict['b' + str(layer_idx)] = np.random.randn(output_dim, 1) * 0.1

        return param_dict

    def _single_forward(
        self,
        W_curr: ArrayLike,
        b_curr: ArrayLike,
        A_prev: ArrayLike,
        activation: str
    ) -> Tuple[ArrayLike, ArrayLike]:
            """
            This method is used for a single forward pass on a single layer.

            Args:
                W: ArrayLike
                    Current layer weight matrix.
                b: ArrayLike
                    Current layer bias matrix.
                A_prev: ArrayLike
                    Previous layer activation matrix.
                activation: str
                    Name of activation function for current layer.

            Returns:
                A_curr: ArrayLike
                    Current layer activation matrix.
                Z_curr: ArrayLike
                    Current layer linear transformed matrix.
            """
            # print(f"W_curr: {W_curr.shape}")
            # print(f"b_curr: {b_curr.shape}")
            # print(f"A_prev: {A_prev.shape}")
            Z_curr = np.dot(W_curr, A_prev) + b_curr
            if activation == 'relu':
                A_curr = self._relu(Z_curr)
            elif activation == 'sigmoid':
                A_curr = self._sigmoid(Z_curr)
            else:
                raise ValueError(f"{activation} is not a valid activation function.")
            # print(f"A_curr: {A_curr.shape}")
            # print(f"Z_curr: {Z_curr.shape}")
            return A_curr, Z_curr


    def forward(self, X: ArrayLike) -> Tuple[ArrayLike, Dict[str, ArrayLike]]:
        """
        This method is responsible for one forward pass of the entire neural network.

        Args:
            X: ArrayLike
                Input matrix with shape [batch_size, features].

        Returns:
            output: ArrayLike
                Output of forward pass.
            cache: Dict[str, ArrayLike]:
                Dictionary storing Z and A matrices from `_single_forward` for use in backprop.
        """
        # Initialize cache dictionary
        cache = {}

        # Compute the activation matrix of the input layer
        A_curr = X.T

        # Loop through each layer of the neural network
        for idx, layer in enumerate(self.arch):
            # Increment layer index
            layer_idx = idx + 1

            # Get the weight matrix, bias matrix and activation function for the current layer
            W_curr = self._param_dict[f'W{layer_idx}']
            b_curr = self._param_dict[f'b{layer_idx}']
            activation = layer['activation']

            # Compute activation and linear transformation matrices for current layer
            A_curr, Z_curr = self._single_forward(W_curr, b_curr, A_curr, activation)

            # Store A and Z matrices in cache dictionary
            cache[f"A{layer_idx}"] = A_curr
            cache[f"Z{layer_idx}"] = Z_curr

        # Return final output and cache dictionary
        output = A_curr.T
        return output, cache

    def _single_backprop(
        self,
        W_curr: ArrayLike,
        b_curr: ArrayLike,
        Z_curr: ArrayLike,
        A_prev: ArrayLike,
        dA_curr: ArrayLike,
        activation: str
    ) -> Tuple[ArrayLike, ArrayLike, ArrayLike]:
        """
        This function performs the backprop for a single layer in the network.

        Args:
            W_curr: ArrayLike
                Weight matrix for the current layer.
            b_curr: ArrayLike
                Bias matrix for the current layer.
            Z_curr: ArrayLike
                Output matrix of the linear transform for the current layer.
            A_prev: ArrayLike
                Activation matrix from the previous layer.
            dA_curr: ArrayLike
                Partial derivative of current layer activations.
            activation: str
                String indicating the activation function to use.

        Returns:
            dA_prev: ArrayLike
                Partial derivative of the previous layer's activations.
            dW_curr: ArrayLike
                Partial derivative of the current layer's weight matrix.
            db_curr: ArrayLike
                Partial derivative of the current layer's bias matrix.
        """
        m = A_prev.shape[1]

        if activation == 'sigmoid':
            dZ_curr = dA_curr * self._sigmoid_backprop(dA_curr, Z_curr)
        elif activation == 'relu':
            dZ_curr = dA_curr * self._relu_backprop(dA_curr, Z_curr)

        dW_curr = 1/m * np.dot(dZ_curr, A_prev.T).T  # Transposed here
        db_curr = 1/m * np.sum(dZ_curr, axis=1, keepdims=True)
        dA_prev = np.dot(W_curr.T, dZ_curr)

        return dA_prev, dW_curr, db_curr


    def backprop(self, y: ArrayLike, y_hat: ArrayLike, cache: Dict[str, ArrayLike]):
        """
        This method is responsible for the backprop of the whole fully connected neural network.

        Args:
            y (array-like):
                Ground truth labels.
            y_hat: ArrayLike
                Predicted output values.
            cache: Dict[str, ArrayLike]
                Dictionary containing the information about the
                most recent forward pass, specifically A and Z matrices.

        Returns:
            grad_dict: Dict[str, ArrayLike]
                Dictionary containing the gradient information from this pass of backprop.
        """
        # Initialize dictionary to store gradients
        grad_dict = {}

        # Compute derivative of loss with respect to output layer activations
        if self._loss_func == 'binary_cross_entropy':
            dA_curr = self._binary_cross_entropy_backprop(y, y_hat)
        elif self._loss_func == 'mse':
            dA_curr = self._mean_squared_error_backprop(y, y_hat)
        else:
            raise ValueError("Unsupported loss function.")

        # Loop through layers backwards
        for layer_idx, layer in reversed(list(enumerate(self.arch))):
            layer_idx = layer_idx + 1

            # Get relevant parameter values
            A_prev = cache['A' + str(layer_idx - 1)]
            W_curr = self._param_dict['W' + str(layer_idx)]
            b_curr = self._param_dict['b' + str(layer_idx)]
            Z_curr = cache['Z' + str(layer_idx)]

            # Compute gradients for current layer
            dA_prev, dW_curr, db_curr = self._single_backprop(W_curr, b_curr, Z_curr, A_prev, dA_curr, layer['activation'])

            # Save gradients for current layer
            grad_dict['dW' + str(layer_idx)] = dW_curr
            grad_dict['db' + str(layer_idx)] = db_curr

            # Set dA_curr for next iteration to be dA_prev
            dA_curr = dA_prev

        return grad_dict

    def _update_params(self, grad_dict: Dict[str, ArrayLike]):
        """
        This function updates the parameters in the neural network after backprop. This function
        only modifies internal attributes and does not return anything

        Args:
            grad_dict: Dict[str, ArrayLike]
                Dictionary containing the gradient information from most recent round of backprop.
        """
        for key in self._param_dict:
            if 'W' in key:
                self._param_dict[key] -= self._lr * grad_dict['d' + key]
            elif 'b' in key:
                self._param_dict[key] -= self._lr * grad_dict['d' + key]



    def fit(
        self,
        X_train: ArrayLike,
        y_train: ArrayLike,
        X_val: ArrayLike,
        y_val: ArrayLike
    ) -> Tuple[List[float], List[float]]:
        """
        This function trains the neural network by backpropagation for the number of epochs defined at
        the initialization of this class instance.

        Args:
            X_train: ArrayLike
                Input features of training set.
            y_train: ArrayLike
                Labels for training set.
            X_val: ArrayLike
                Input features of validation set.
            y_val: ArrayLike
                Labels for validation set.

        Returns:
            per_epoch_loss_train: List[float]
                List of per epoch loss for training set.
            per_epoch_loss_val: List[float]
                List of per epoch loss for validation set.
        """
        num_train_samples = X_train.shape[0]
        num_val_samples = X_val.shape[0]

        per_epoch_loss_train = []
        per_epoch_loss_val = []

        for epoch in range(self._epochs):
            # Shuffle the training data
            permutation = np.random.permutation(num_train_samples)
            X_train = X_train[permutation]
            y_train = y_train[permutation]

            # Initialize loss for epoch
            epoch_loss_train = 0

            # Train on mini-batches
            for i in range(0, num_train_samples, self._batch_size):
                X_batch = X_train[i:i + self._batch_size]
                y_batch = y_train[i:i + self._batch_size]

                # Forward pass
                y_hat_batch, cache = self.forward(X_batch)

                # Compute loss
                loss_batch = self._binary_cross_entropy(y_batch, y_hat_batch)
                epoch_loss_train += loss_batch

                # Backward pass
                grad_dict = self.backprop(y_batch, y_hat_batch, cache)

                # Update parameters
                self._update_params(grad_dict)

            # Compute per-epoch loss and add to list
            epoch_loss_train /= num_train_samples
            per_epoch_loss_train.append(epoch_loss_train)

            # Compute validation loss
            y_hat_val, _ = self.forward(X_val)
            epoch_loss_val = self._binary_cross_entropy(y_val, y_hat_val)
            per_epoch_loss_val.append(epoch_loss_val)

            # Print progress
            print(f"Epoch {epoch + 1}: train_loss={epoch_loss_train:.4f}, val_loss={epoch_loss_val:.4f}")

        return per_epoch_loss_train, per_epoch_loss_val

    

    def predict(self, X: ArrayLike) -> ArrayLike:
        """
        This function returns the prediction of the neural network.

        Args:
            X: ArrayLike
                Input data for prediction.

        Returns:
            y_hat: ArrayLike
                Prediction from the model.
        """
        # Do a forward pass
        output, _ = self.forward(X)

        # Get predictions (assuming binary classification)
        y_hat = np.round(output)

        return y_hat


    def _sigmoid(self, Z: ArrayLike) -> ArrayLike:
        """
        Sigmoid activation function.

        Args:
            Z: ArrayLike
                Output of layer linear transform.

        Returns:
            nl_transform: ArrayLike
                Activation function output.
        """
        nl_transform = 1 / (1 + np.exp(-Z))
        return nl_transform
    

    def _sigmoid_backprop(self, dA: ArrayLike, Z: ArrayLike):
        """
        Sigmoid derivative for backprop.

        Args:
            dA: ArrayLike
                Partial derivative of previous layer activation matrix.
            Z: ArrayLike
                Output of layer linear transform.

        Returns:
            dZ: ArrayLike
                Partial derivative of current layer Z matrix.
        """
        sigmoid = self._sigmoid(Z)

        # print(f"dA shape: {dA.shape}")
        # print(f"sigmoid shape: {sigmoid.shape}")
        # print(f"(1 - sigmoid) shape: {(1 - sigmoid).shape}")
        
        dZ = dA * sigmoid * (1 - sigmoid)
        return dZ


    def _relu(self, Z: ArrayLike) -> ArrayLike:
        """
        ReLU activation function.

        Args:
            Z: ArrayLike
                Output of layer linear transform.

        Returns:
            nl_transform: ArrayLike
                Activation function output.
        """
        return np.maximum(0, Z)

    def _relu_backprop(self, dA: ArrayLike, Z: ArrayLike) -> ArrayLike:
        """
        ReLU derivative for backprop.

        Args:
            dA: ArrayLike
                Partial derivative of previous layer activation matrix.
            Z: ArrayLike
                Output of layer linear transform.

        Returns:
            dZ: ArrayLike
                Partial derivative of current layer Z matrix.
        """
        dZ = np.array(dA, copy=True)
        dZ[Z <= 0] = 0
        return dZ
    
    def _binary_cross_entropy(self, y: ArrayLike, y_hat: ArrayLike) -> float:
        """
        Binary cross entropy loss function.

        Args:
            y_hat: ArrayLike
                Predicted output.
            y: ArrayLike
                Ground truth output.

        Returns:
            loss: float
                Average loss over mini-batch.
        """
        m = y.shape[0]
        loss = -1/m * np.sum(y * np.log(y_hat) + (1-y) * np.log(1-y_hat))
        return loss
    

    def _binary_cross_entropy_backprop(self, y: ArrayLike, y_hat: ArrayLike) -> ArrayLike:
        """
        Binary cross entropy loss function derivative for backprop.

        Args:
            y_hat: ArrayLike
                Predicted output.
            y: ArrayLike
                Ground truth output.

        Returns:
            dA: ArrayLike
                partial derivative of loss with respect to A matrix.
        """
        return -(np.divide(y, y_hat) - np.divide(1 - y, 1 - y_hat))

    def _mean_squared_error(self, y: ArrayLike, y_hat: ArrayLike) -> float:
        """
        Mean squared error loss.

        Args:
            y: ArrayLike
                Ground truth output.
            y_hat: ArrayLike
                Predicted output.

        Returns:
            loss: float
                Average loss of mini-batch.
        """
        m = y.shape[0]
        loss = np.sum((y - y_hat) ** 2) / m
        return loss

    def _mean_squared_error_backprop(self, y: ArrayLike, y_hat: ArrayLike) -> ArrayLike:
        """
        Mean square error loss derivative for backprop.

        Args:
            y_hat: ArrayLike
                Predicted output.
            y: ArrayLike
                Ground truth output.

        Returns:
            dA: ArrayLike
                partial derivative of loss with respect to A matrix.
        """
        m = y.shape[0]
        dA = -2 * (y - y_hat) / m
        return dA
    

In [170]:
# Imports
import numpy as np
from typing import List, Tuple
from numpy.typing import ArrayLike

def sample_seqs(seqs: List[str], labels: List[bool]) -> Tuple[List[str], List[bool]]:
    """
    This function should sample the given sequences to account for class imbalance. 
    Consider this a sampling scheme with replacement.
    
    Args:
        seqs: List[str]
            List of all sequences.
        labels: List[bool]
            List of positive/negative labels

    Returns:
        sampled_seqs: List[str]
            List of sampled sequences which reflect a balanced class size
        sampled_labels: List[bool]
            List of labels for the sampled sequences
    """
    pos_seqs = [seq for seq, label in zip(seqs, labels) if label]
    neg_seqs = [seq for seq, label in zip(seqs, labels) if not label]
    
    # Calculate number of sequences to sample from each class
    n_pos = len(pos_seqs)
    n_neg = len(neg_seqs)
    n_samples = min(n_pos, n_neg)
    
    # Sample sequences with replacement
    pos_samples = np.random.choice(pos_seqs, n_samples, replace=True)
    neg_samples = np.random.choice(neg_seqs, n_samples, replace=True)
    
    # Combine the sampled sequences and labels
    sampled_seqs = list(pos_samples) + list(neg_samples)
    sampled_labels = [True] * n_samples + [False] * n_samples
    
    # Shuffle the sequences and labels
    shuffle_idx = np.random.permutation(len(sampled_seqs))
    sampled_seqs = [sampled_seqs[i] for i in shuffle_idx]
    sampled_labels = [sampled_labels[i] for i in shuffle_idx]
    
    return sampled_seqs, sampled_labels

def one_hot_encode_seqs(seq_arr: List[str]) -> ArrayLike:
    """
    This function generates a flattened one-hot encoding of a list of DNA sequences
    for use as input into a neural network.

    Args:
        seq_arr: List[str]
            List of sequences to encode.

    Returns:
        encodings: ArrayLike
            Array of encoded sequences, with each encoding 4x as long as the input sequence.
            For example, if we encode:
                A -> [1, 0, 0, 0]
                T -> [0, 1, 0, 0]
                C -> [0, 0, 1, 0]
                G -> [0, 0, 0, 1]
            Then, AGA -> [1, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0].
    """
    # Define the one-hot encoding dictionary
    encoding_dict = {'A': [1, 0, 0, 0], 'T': [0, 1, 0, 0], 'C': [0, 0, 1, 0], 'G': [0, 0, 0, 1]}

    # Initialize an empty list to store the encodings
    encodings = []

    # Iterate over each sequence in seq_arr
    for seq in seq_arr:
        # Initialize an empty list to store the one-hot encoding of this sequence
        encoding = []
        # Iterate over each nucleotide in the sequence
        for nt in seq:
            # Append the one-hot encoding of this nucleotide to the encoding list
            encoding += encoding_dict[nt]
        # Append the encoding to the list of encodings
        encodings.append(encoding)

    # Convert the list of encodings to a NumPy array and return it
    return np.array(encodings)

In [171]:
from sklearn.datasets import load_digits

# Load the digits dataset
digits = load_digits()

# Get the data and labels
X = digits.data
y = digits.target

# Print the shape of the data and labels
print(f"Data shape: {X.shape}")
print(f"Labels shape: {y.shape}")

# Split the data into training and validation sets
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

# Define architecture
autoencoder_arch = [
    {'input_dim': 64, 'output_dim': 16, 'activation': 'relu'},
    {'input_dim': 16, 'output_dim': 64, 'activation': 'sigmoid'}
]


# Initialize and train the neural network
nn = NeuralNetwork(nn_arch=[{'input_dim': 64, 'output_dim': 16, 'activation': 'relu'}, {'input_dim': 16, 'output_dim': 64, 'activation': 'sigmoid'}],
                   lr=0.1, seed=42, batch_size=32, epochs=100, loss_function='mse')
nn.fit(X_train, X_train, X_val, X_val)

Data shape: (1797, 64)
Labels shape: (1797,)


ValueError: operands could not be broadcast together with shapes (32,64) (64,32) 