<a href="https://colab.research.google.com/github/TapasKumarDutta1/deep_neural_network_from_scratch_pneumonia_detection/blob/main/training.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import seaborn as sns
import os
import cv2
import seaborn as sns
import numpy as np
import gc
import matplotlib.pyplot as plt
import math
from tqdm import tqdm
from tqdm import tqdm  # Import tqdm for progress tracking
from sklearn.metrics import *  # Import necessary metrics from scikit-learn

np.random.seed(777)  # Set random seed for reproducibility

import warnings
warnings.filterwarnings("ignore")  # Ignore warnings to keep the output clean

In [None]:
def get_data(data_dir):
    labels = ["PNEUMONIA", "NORMAL"]
    data = []
    for label in labels:
        path = os.path.join(data_dir, label)
        class_num = labels.index(label)
        for en, img in enumerate(os.listdir(path)):
            if en == 0:
                print(img)
            try:
                img_arr = cv2.imread(os.path.join(path, img), cv2.IMREAD_GRAYSCALE)
                resized_arr = cv2.resize(img_arr, (IMG_WIDTH, IMG_HEIGHT))
                data.append([resized_arr, class_num])
            except Exception as e:
                pass

    return np.array(data)

# Load and preprocess training data
train = get_data("../input/chest-xray-pneumonia/chest_xray/chest_xray/train")

trn = []
tar = []
for i in tqdm(range(5216)):
    img = train[i][0]

    trn.append(img.reshape((1, 240, 240, 1)))
    tar.append(train[i][1])

trn = np.asarray(trn)
tar = np.asarray(tar)

# Normalize training data
trn = trn / 255


In [None]:
# Function to shuffle two arrays in unison
def unison_shuffled_copies(a, b):
    """
    Shuffle two arrays in unison.

    Parameters:
        a (ndarray): First array to be shuffled.
        b (ndarray): Second array to be shuffled.

    Returns:
        ndarray, ndarray: Shuffled versions of input arrays a and b.
    """
    assert len(a) == len(b)
    p = np.random.permutation(len(a))
    return a[p], b[p]

# Shuffle the training data and labels in unison
trn, tar = unison_shuffled_copies(trn, tar)

# Reshape training data and labels for further processing
trn = trn.reshape((5216, 240, 240, 1))
tar = tar.reshape((5216, 1))

# LAYERS

# Sigmoid activation function
def sigmoid(x):
    """
    Sigmoid activation function.

    Parameters:
        x (ndarray): Input array.

    Returns:
        ndarray: Output array after applying sigmoid activation element-wise.
    """
    return 1 / (1 + np.exp(x * -1))

# Dot product (also known as inner product or scalar product) of two arrays
def dot(a, b):
    """
    Compute the dot product of two arrays.

    Parameters:
        a (ndarray): First input array.
        b (ndarray): Second input array.

    Returns:
        ndarray: Result of the dot product.
    """
    return np.sum(a * b, axis=(-1, -2, -3))

# Glorot (Xavier) initialization for weight matrices
def glorot(size1, in1, out1):
    """
    Glorot (Xavier) initialization for weight matrices.

    Parameters:
        size1 (int): Size of the weight matrix.
        in1 (int): Number of input units.
        out1 (int): Number of output units.

    Returns:
        ndarray: Weight matrix with values initialized using the Glorot initialization method.
    """
    dst = np.random.normal(0, 1, size1)
    mx = np.max(dst)
    mn = np.min(dst)

    # Transform values to be within [0, 1]
    dst = (dst - mn) / (mx - mn)

    # Transform values to be within [-1, 1]
    dst = (dst - 0.5) * 2

    # Calculate the scale factor using the Glorot initialization formula
    scl = 2.449489742783178 / (np.sqrt(in1 + out1))

    # Scale the values within the range [-scl, scl]
    dst *= scl
    return dst


# Maxpooling operation
def maxpool(image, kernel):
    """
    Maxpooling operation.

    Parameters:
        image (ndarray): Input image tensor of shape (batch_size, length, width, channels).
        kernel (int): Size of the maxpooling kernel.

    Returns:
        ndarray, int, ndarray, ndarray: Maxpooled image tensor, kernel size, indices of max values, and input image tensor.
    """
    bs, l, w, c = image.shape
    out = np.zeros((bs, int(math.ceil(l / kernel)), int(math.ceil(w / kernel)), c))
    idxs = []
    for i in range(int(math.ceil(l / kernel))):
        for j in range(int(math.ceil(w / kernel))):
            look = image[:, i * kernel : (i + 1) * kernel, j * kernel : (j + 1) * kernel, :]
            out[:, i, j, :] = np.max(look, axis=(1, 2))
            idxs.append(np.where(np.in1d(look, out[:, i, j, :]))[0])
    return out, kernel, np.asarray(idxs), image


# Convolution operation
def convolution(image, kernel, out):
    """
    Convolution operation.

    Parameters:
        image (ndarray): Input image tensor of shape (batch_size, s1, s2, s3).
        kernel (int): Size of the convolution kernel.
        out (int): Number of output channels.

    Returns:
        ndarray, ndarray, ndarray: Convolutional weight matrix, input image tensor, and convolved output tensor.
    """
    bs, s1, s2, s3 = image.shape
    b = np.zeros((bs, (s1 - kernel) + 1, (s2 - kernel) + 1, out))
    in1 = len(image.ravel()) / bs
    out1 = len(b.ravel()) / bs
    a = glorot((kernel, kernel, s3, out), kernel * kernel * s3, out)
    for i in range(out):
        for j in range(b.shape[0]):
            for k in range(b.shape[1]):
                out = dot(a[:, :, :, i], image[:, j : j + kernel, k : k + kernel])
                b[:, j, k, i] = out
    return a, image, b


# Flatten operation
def flatten(image):
    """
    Flatten operation.

    Parameters:
        image (ndarray): Input image tensor.

    Returns:
        ndarray, ndarray: Input image tensor and flattened output tensor.
    """
    bs, _, _, _ = image.shape
    return image, image.reshape(bs, -1)


# Dense layer
def dense(input1, out):
    """
    Dense layer.

    Parameters:
        input1 (ndarray): Input tensor of shape (batch_size, b).
        out (int): Number of output units.

    Returns:
        ndarray, ndarray, ndarray: Dense layer weight matrix, input tensor, and output tensor.
    """
    bs, b = input1.shape
    in1 = b
    out1 = out
    weights = glorot((b, out), in1, out1)
    out = np.dot(input1, weights)
    return weights, input1, sigmoid(out)


# Loss function
def loss(true, pred):
    """
    Cross-entropy loss function.

    Parameters:
        true (ndarray): True labels.
        pred (ndarray): Predicted probabilities.

    Returns:
        ndarray: Loss value.
    """
    return -1 * ((true * np.log(pred)) + ((1 - true) * (np.log(1 - pred))))


# Loss derivative (backpropagation)
def loss_back(true, pred):
    """
    Derivative of the loss function.

    Parameters:
        true (ndarray): True labels.
        pred (ndarray): Predicted probabilities.

    Returns:
        ndarray: Loss derivative.
    """
    return pred - true


# Rotate array by 180 degrees
def rotate_180(array):
    """
    Rotate the array by 180 degrees.

    Parameters:
        array (ndarray): Input array.

    Returns:
        ndarray: Rotated array.
    """
    M, N = array.shape
    out = np.zeros_like(array)
    for i in range(M):
        for j in range(N):
            out[i, N - 1 - j] = array[M - 1 - i, j]
    return out

# Maxpooling derivative
def maxpool_derivative(derivative, kernel, idxs):
    """
    Maxpooling derivative.

    Parameters:
        derivative (ndarray): Derivative tensor of shape (batch_size, length, width, channels).
        kernel (int): Size of the maxpooling kernel.
        idxs (ndarray): Indices of max values.

    Returns:
        ndarray: Maxpooling derivative tensor of shape (batch_size, length * kernel, width * kernel, channels).
    """
    bs, l, w, c = derivative.shape
    out = np.zeros((bs, int(l * kernel), int(w * kernel), c))
    en = 0
    for i in range(l):
        for j in range(w):
            look = np.zeros((bs, kernel, kernel, c))
            place = idxs[en]
            look.ravel()[place] = 1
            look *= derivative[:, i, j, :].reshape(bs, 1, 1, c)
            en += 1
            out[:, i * kernel : (i + 1) * kernel, j * kernel : (j + 1) * kernel, :] = look
    return out


# Calculate new gradients (backpropagation)
def cng(grad, wts, x):
    """
    Calculate new gradients during backpropagation for convolutional layers.

    Parameters:
        grad (ndarray): Gradient tensor of shape (batch_size, s1, s2, grad.shape[-1]).
        wts (ndarray): Weights of the layer.
        x (ndarray): Input tensor.

    Returns:
        ndarray: New gradient tensor of the same shape as input tensor.
    """
    bs, s1, s2, s3 = x.shape
    grad1 = np.zeros((bs, s1 + 1, s2 + 1, grad.shape[-1]))
    grad1[:, 1:s1, 1:s2, :] = grad
    out = np.zeros_like(x)
    p, q, r, s = wts.shape
    for l in range(s):
        for k in range(r):
            for i in range(s2):
                for j in range(s1):
                    gd = dot(rotate_180(wts[:, :, k, l]), grad1[:, i : i + p, j : j + q, l])
                    out[:, i, j, k] += gd
    return out


# Derivative of sigmoid function
def sigmoid_derivative(x):
    """
    Derivative of the sigmoid function.

    Parameters:
        x (ndarray): Input tensor.

    Returns:
        ndarray: Derivative tensor.
    """
    out = 1
    return out


# Convolution operation (only forward pass)
def conv(input, kernel):
    """
    Convolution operation (only forward pass).

    Parameters:
        input (ndarray): Input tensor of shape (batch_size, s1, s2, z).
        kernel (ndarray): Convolution kernel of shape (_, q1, q2, x).

    Returns:
        ndarray: Output tensor after convolution of shape (s1 - q1 + 1, s2 - q2 + 1, z, x).
    """
    bs, s1, s2, z = input.shape
    _, q1, q2, x = kernel.shape
    b = np.zeros(((s1 - q1) + 1, (s2 - q2) + 1, z, x))
    for l in range(z):
        for y in range(x):
            for j in range(b.shape[0]):
                for k in range(b.shape[1]):
                    out = dot(kernel[:, :, :, y], input[:, j : j + q1, k : k + q2, l]) / input.shape[0]
                    b[j, k, l, y] = out
    return b



def calculate_grad(layers, true):
    """
    Calculate gradients for each layer during backpropagation.

    Parameters:
        layers (list): List of layer dictionaries representing the neural network.
        true (numpy.ndarray): The true labels of the training data.

    Returns:
        list: Updated list of layer dictionaries with computed gradients.

    Notes:
        This function performs backpropagation to compute gradients for each layer in the neural network.
        The gradients are used to update the weights during training.

    """
    for layer in layers:
        # Extract relevant variables from the layer dictionary
        try:
            wts, inp, out, layer_type, pos = (
                layer["weight"],
                layer["input"],
                layer["output"],
                layer["type"],
                layer["position"],
            )
        except:
            try:
                out, shp, layer_type, pos = (
                    layer["output"],
                    layer["shape"],
                    layer["type"],
                    layer["position"],
                )
            except:
                out, layer_type, pos, shp, index = (
                    layer["output"],
                    layer["type"],
                    layer["position"],
                    layer["kernel"],
                    layer["index"],
                )

        for i in range(0, pos):
            # Handle different layer types and compute gradients accordingly
            try:
                wts1, inp1, out1, type1, pos1 = (
                    layers[i]["weight"],
                    layers[i]["input"],
                    layers[i]["output"],
                    layers[i]["type"],
                    layers[i]["position"],
                )
            except:
                try:
                    out1, shp1, type1, pos1 = (
                        layers[i]["output"],
                        layers[i]["shape"],
                        layers[i]["type"],
                        layers[i]["position"],
                    )
                except:
                    inp1, out1, type1, pos1, shp1, index1 = (
                        layers[i]["input"],
                        layers[i]["output"],
                        layers[i]["type"],
                        layers[i]["position"],
                        layers[i]["kernel"],
                        layers[i]["index"],
                    )

            if i == 0:
                # Compute the initial gradient for the loss function
                grad = loss_back(true, out1)
                ot = sigmoid_derivative(out1)
                grad *= ot
                # grad.shape: (batch_size, 1)

            if type1 == "out" and layer_type != "out" and i != pos - 1:
                # Compute the gradient for previous hidden layers in fully connected networks
                grad = np.dot(grad, wts1.T)
                # grad.shape: (batch_size, -1)

            if type1 == "max" and i != pos - 1:
                # Compute the gradient for the maxpooling layer
                grad = maxpool_derivative(grad, shp1, index1)

            if type1 == "flatten" and i != pos - 1:
                # Reshape the gradient to match the output shape of the flatten layer
                grad = grad.reshape(shp1)

            if type1 == "conv" and i != pos - 1:
                # Compute the gradient for convolutional layers
                grad = cng(grad, wts1, inp1)

        if layer_type == "out" or layer_type == "dense":
            # Compute the gradient for the fully connected or output layer
            grad1 = np.dot(inp.T, grad) / true.shape[0]
            layer["grad"] = grad1

        if layer_type == "conv":
            # Compute the gradient for convolutional layers
            grad1 = conv(inp, grad)
            layer["grad"] = grad1

    return layers
def propagate(layers, lr, itr):
    """
    Update the weights of each layer based on gradients and momentum.

    Parameters:
        layers (list): List of layer dictionaries representing the neural network.
        lr (float): Learning rate, controls the step size during weight updates.
        itr (int): Current training iteration, used for momentum calculation.

    Returns:
        list: Updated list of layer dictionaries with updated weights.

    Notes:
        This function updates the weights of each layer based on the calculated gradients and momentum.
        The learning rate (lr) and current training iteration (itr) are used to adjust the step size
        during weight updates for faster or more stable convergence.

    """
    for layer in layers:
        if layer["type"] != "flatten" and layer["type"] != "max":
            # Update the momentum for each layer using a moving average with momentum coefficient 0.9
            layer["momentum"] = 0.9 * layer["momentum"] + 0.1 * layer["grad"]

            # Update the weights using the momentum and learning rate
            layer["weight"] -= lr * layer["momentum"]

    return layers
def update(layers):
    """
    Forward pass through the neural network to update the output of each layer.

    Parameters:
        layers (list): List of layer dictionaries representing the neural network.

    Returns:
        list: Updated list of layer dictionaries with updated outputs.

    Notes:
        This function performs a forward pass through the neural network to update the output of each layer.
        The output of each layer is computed based on the input and weights. The function handles both
        convolutional and output layers. The list of layer dictionaries is reversed at the beginning
        to perform the forward pass in the correct order.

    """
    layers = layers[::-1]  # Reverse the list to perform forward pass in the correct order
    for en, layer in enumerate(layers):
        try:
            wts, inp, out, type, pos = (
                layer["weight"],
                layer["input"],
                layer["output"],
                layer["type"],
                layer["position"],
            )
        except:
            try:
                out, shp, type, pos = (
                    layer["output"],
                    layer["shape"],
                    layer["type"],
                    layer["position"],
                )
            except:
                out, type, pos, shp, index = (
                    layer["output"],
                    layer["type"],
                    layer["position"],
                    layer["kernel"],
                    layer["index"],
                )

        # Perform forward pass for convolutional layer
        if type == "conv":
            bs, a, b, c = inp.shape
            bs, x, y, z = out.shape
            kernel = wts.shape[1]
            for i in range(z):
                for j in range(x):
                    for k in range(y):
                        out[:, j, k, i] = dot(
                            wts[:, :, :, i], inp[:, j : j + kernel, k : k + kernel]
                        )

        # Perform forward pass for output layer
        if type == "out":
            out = np.dot(inp, wts)
            out = sigmoid(out)

        # Update the input of the next layer if it exists
        if pos != 1:
            layers[en + 1]["input"] = out

        layer["output"] = out  # Update the output of the current layer

    return layers[::-1]  # Return the list of layer dictionaries in the original order


def save(layers, epoch):
    """
    Save the weights and momentum of each layer at a specific epoch.

    Parameters:
        layers (list): List of layer dictionaries representing the neural network.
        epoch (int): Current epoch number.

    Notes:
        This function saves the weights and momentum of each layer at a specific epoch.
        The saved files are in numpy format with names based on the layer type, index, and epoch number.

    """
    for en, i in enumerate(layers):
        try:
            np.save(i["type"] + str(en) + str(epoch) + "_weight.npy", i["weight"])
            np.save(i["type"] + str(en) + str(epoch) + "_momentum.npy", i["momentum"])
        except:
            continue
def train(lr, trn, target, epoch):
    """
    Train the neural network.

    Parameters:
        lr (float): Learning rate for training.
        trn (numpy.ndarray): Training data.
        target (numpy.ndarray): Target labels for training data.
        epoch (int): Number of epochs for training.

    Returns:
        list: List of loss values for each epoch.
        list: List of updated layer dictionaries representing the neural network.

    Notes:
        This function trains the neural network using stochastic gradient descent (SGD) for the specified number of epochs.
        It calculates the loss for each mini-batch, updates the weights of the layers, and saves the weights at each epoch.
        The function returns a list of loss values for each epoch and the final updated layer dictionaries.

    """
    loss1 = []  # List to store loss values for each epoch
    acc = []   # List to store accuracy values (not used in the provided code)
    prev_loss = 999  # Variable to store previous loss (not used in the provided code)

    for i in range(epoch):
        LOSS1 = []  # List to store loss values for each mini-batch in the current epoch
        for e in tqdm(range(int(math.ceil(trn.shape[0] / 32)))):
            # Extract the mini-batch of data and corresponding labels
            img = []
            tar = []
            for ii in range(e * 32, (e + 1) * 32):
                if ii < trn.shape[0]:
                    img.append(trn[ii].reshape(240, 240, 1))
                    tar.append(target[ii])

            img = np.asarray(img)
            true = np.asarray(tar)

            # Initialize or update the layers at the beginning of training
            if e == 0 and i == 0:
                a = convolution(img.reshape(32, 240, 240, 1), 2, 2)
                b = convolution(a[2], 2, 4)
                g = maxpool(b[2], 2)
                f = flatten(g[0])
                o = dense(f[1], 1)

            layers = [
                {
                    "weight": o[0],
                    "input": o[1],
                    "output": o[2],
                    "type": "out",
                    "position": 1,
                    "momentum": np.zeros_like(o[0]),
                },
                {"shape": f[0].shape, "output": f[1], "type": "flatten", "position": 2},
                {
                    "output": g[0],
                    "kernel": g[1],
                    "index": g[2],
                    "type": "max",
                    "position": 3,
                    "input": g[-1],
                },
                {
                    "weight": b[0],
                    "input": b[1],
                    "output": b[2],
                    "type": "conv",
                    "position": 4,
                    "momentum": np.zeros_like(b[0]),
                },
                {
                    "weight": a[0],
                    "input": img,
                    "output": a[2],
                    "type": "conv",
                    "position": 5,
                    "momentum": np.zeros_like(a[0]),
                },
            ]

            layers = calculate_grad(layers, true)
            # Visualize gradients for debugging (optional)
            for layer in layers:
                try:
                    sns.distplot(layer["grad"].ravel())
                    plt.show()
                except:
                    pass

            layers = propagate(layers, lr, e)
            layers = update(layers)
            pred = layers[0]["output"]
            LOSS = np.mean(loss(true, pred))
            LOSS1.append(LOSS)
            del [img, true, tar]
            gc.collect()
            save(layers, i)

        print(np.mean(LOSS1))
        loss1.append(np.mean(LOSS))
    return loss1, layers


loss1, layers = train(1e-4, trn, tar, 8)


sns.scatterplot(x=range(len(loss1)), y=loss1)

np.save("loss.npy", loss1)
