In [1]:
# Lauren Montion & Ella Wiser
# DS4420 - Section 3
# Implementation of CNN from scratch 

In [2]:
# import necessary libraries
import numpy as np
import pandas as pd
from PIL import Image
import matplotlib.pyplot as plt
import os
from pathlib import Path
from sklearn.model_selection import train_test_split
from tqdm import tqdm

In [3]:
# connect to nested folder structure:
# Dataset_BUSI_with_GT -> train -> benign
#                                  malignant
#                                  normal
#                         test  -> benign
#                                  malignant
#                                  normal

# connect to path where the data is
downloads_path = os.path.expanduser("~/Downloads")
dataset_path = os.path.join(downloads_path, "Dataset_BUSI_with_GT")

# set image size
# original sizes all over the place, using 112x112 for efficiency without getting rid of too much information
img_size = (112, 112)

# class labels, asking "does this MRI contain a benign tumor or no tumor?" another way of determining if someone has cancer or not
class_labels = {'benign': 1, 'normal': 0}

# load in training data
train_labels = []
train_images = []

train_path = os.path.join(dataset_path, 'train')

for class_name in ['benign', 'normal']:
    class_path = os.path.join(train_path, class_name)
    
    for img_file in os.listdir(class_path):
        if img_file.lower().endswith(('.png')):
            img_path = os.path.join(class_path, img_file)
            
            img = Image.open(img_path).convert('L') 
            img = img.resize(img_size)
            img_array = np.array(img)
            img_array = np.expand_dims(img_array, axis=-1)
            
            train_images.append(img_array)
            train_labels.append(class_labels[class_name])

# load in test data
test_labels = []
test_images = []

# connects to the test folder within the Dataset_BUSI_with_GT folder
test_path = os.path.join(dataset_path, 'test')

# grab images in benign and normal folders
for class_name in ['benign', 'normal']:
    class_path = os.path.join(test_path, class_name)
    
    for img_file in os.listdir(class_path):
        if img_file.lower().endswith(('.png')):
            img_path = os.path.join(class_path, img_file)

            # convert to grayscale
            img = Image.open(img_path).convert('L') 

            # resize images to 224x224
            img = img.resize(img_size)
            img_array = np.array(img)
            
            # add channel dimension
            img_array = np.expand_dims(img_array, axis=-1) 
            
            test_images.append(img_array)
            test_labels.append(class_labels[class_name])

# convert to numpy arrays and standardize
x_train = np.array(train_images, dtype='float32') / 255.0
y_train = np.array(train_labels, dtype='float32')

x_test = np.array(test_images, dtype='float32') / 255.0
y_test = np.array(test_labels, dtype='float32')

# print shapes for future use if there are bugs in CNN
print(f"x_train shape: {x_train.shape}")
print(f"y_train shape: {y_train.shape}")
print(f"x_test shape: {x_test.shape}")
print(f"y_test shape: {y_test.shape}")
print(f"\nBenign (1): {np.sum(y_train == 1)} train, {np.sum(y_test == 1)} test")
print(f"Normal (0): {np.sum(y_train == 0)} train, {np.sum(y_test == 0)} test")

x_train shape: (462, 112, 112, 1)
y_train shape: (462,)
x_test shape: (108, 112, 112, 1)
y_test shape: (108,)

Benign (1): 353 train, 84 test
Normal (0): 109 train, 24 test


In [4]:
# class to apply convolutional layers during training - decided to use 3x3 for kernel size
class ConvLayers:
    def __init__(self, num_kernels, input_channels=1):
        self.num_kernels = num_kernels
        self.input_channels = input_channels
        # kernels shape: (num_kernels, 3, 3, input_channels)
        self.kernels = np.random.randn(num_kernels, 3, 3, input_channels) / 9

    # using a 3x3 kernel- produces all 3x3 regions the kernel will pass over, NOT using padding
    def iterate_regions(self, image):
        h, w, c = image.shape
        for i in range(h - 2):
            for j in range(w - 2):
                im_region = image[i:(i + 3), j:(j + 3), :]
                yield im_region, i, j

    # forward pass of conv layer
    def forward(self, input):
        self.last_input = input
        batch_size, h, w, c = input.shape
        output = np.zeros((batch_size, h - 2, w - 2, self.num_kernels))
        
        for b in range(batch_size):
            for im_region, i, j in self.iterate_regions(input[b]):
                for f in range(self.num_kernels):
                    output[b, i, j, f] = np.sum(im_region * self.kernels[f])
        
        return output

    # backward pass of conv layer using backpropagation
    def backprop(self, d_L_d_out, learn_rate):
        '''
        Performs a backward pass of the conv layer.
        - d_L_d_out is the loss gradient for this layer's outputs (batch, h, w, num_kernels)
        - learn_rate is a float.
        '''
        batch_size = d_L_d_out.shape[0]
        d_L_d_kernels = np.zeros(self.kernels.shape)
        d_L_d_input = np.zeros(self.last_input.shape)
        
        for b in range(batch_size):
            for im_region, i, j in self.iterate_regions(self.last_input[b]):
                for f in range(self.num_kernels):
                    d_L_d_kernels[f] += d_L_d_out[b, i, j, f] * im_region
                    d_L_d_input[b, i:(i+3), j:(j+3), :] += d_L_d_out[b, i, j, f] * self.kernels[f]
        
        # update kernels
        self.kernels -= learn_rate * d_L_d_kernels / batch_size
        
        return d_L_d_input

In [5]:
# class so that we can use max pooling to reduce the size of the images in the conv layers, keeps important features
class MaxPool:
    
    # using a 2x2 max pooling layer size- generate 2x2 regions in each image
    def iterate_regions(self, image):
        h, w, _ = image.shape
        new_h = h // 2
        new_w = w // 2
        for i in range(new_h):
            for j in range(new_w):
                im_region = image[(i * 2):(i * 2 + 2), (j * 2):(j * 2 + 2)]
                yield im_region, i, j

    # forward pass of max pooling layer
    def forward(self, input):
        self.last_input = input
        batch_size, h, w, num_kernels = input.shape
        output = np.zeros((batch_size, h // 2, w // 2, num_kernels))
        
        for b in range(batch_size):
            for im_region, i, j in self.iterate_regions(input[b]):
                output[b, i, j] = np.amax(im_region, axis=(0, 1))
        
        return output

    # backward pass of max pooling layer
    def backprop(self, d_L_d_out):
        '''
        Performs a backward pass of the maxpool layer.
        Returns the loss gradient for this layer's inputs.
        - d_L_d_out is the loss gradient for this layer's outputs (batch, h/2, w/2, num_kernels)
        '''
        batch_size = d_L_d_out.shape[0]
        d_L_d_input = np.zeros(self.last_input.shape)
        
        for b in range(batch_size):
            for im_region, i, j in self.iterate_regions(self.last_input[b]):
                h, w, f = im_region.shape
                amax = np.amax(im_region, axis=(0, 1))
                for i2 in range(h):
                    for j2 in range(w):
                        for f2 in range(f):
                            # If this pixel was the max value, copy the gradient to it.
                            if im_region[i2, j2, f2] == amax[f2]:
                                d_L_d_input[b, i * 2 + i2, j * 2 + j2, f2] = d_L_d_out[b, i, j, f2]
        
        return d_L_d_input

In [6]:
# class to define a fully connected layer, so that every input maps to every output
# will be used once convolution is over and we pass through the NN as nodes
class FullyConnect:
    def __init__(self, input_size, output_size):
        self.weights = np.random.randn(input_size, output_size) / input_size
        self.biases = np.zeros(output_size)
    
    def forward(self, input):
        self.last_input_shape = input.shape
        self.last_input = input
        return input @ self.weights + self.biases
    
    def backprop(self, d_L_d_out, learn_rate):
        d_L_d_w = self.last_input.T @ d_L_d_out
        d_L_d_b = np.sum(d_L_d_out, axis=0)
        d_L_d_input = d_L_d_out @ self.weights.T
        
        self.weights -= learn_rate * d_L_d_w / self.last_input.shape[0]
        self.biases -= learn_rate * d_L_d_b / self.last_input.shape[0]
        
        return d_L_d_input

In [7]:
# necessary activation functions. ReLU to be used in convolution layers and hidden layers of NN
# Sigmoid only used in final layer of NN before final output of 1 (benign) or 0 (normal)

# adds non-linearity to recognize complex patterns
class ReLU:
    def forward(self, x):
        self.mask = (x > 0)
        return x * self.mask
    
    def backward(self, d_out):
        return d_out * self.mask

# puts values between 0 and 1, useful since this is binary classification task
class Sigmoid:
    def forward(self, x):
        self.out = 1 / (1 + np.exp(-np.clip(x, -500, 500)))
        return self.out
    
    def backward(self, d_out):
        return d_out * self.out * (1 - self.out)

In [8]:
# binary cross entropy loss, since this is a binary problem. tracked during training
def binary_cross_entropy(y_pred, y_true):
    eps = 1e-7
    y_pred = np.clip(y_pred, eps, 1 - eps)
    return -np.mean(y_true * np.log(y_pred) + (1 - y_true) * np.log(1 - y_pred))

# need derivative to determine in which direction to shift weights during training
def binary_cross_entropy_derivative(y_pred, y_true):
    return y_pred - y_true

In [9]:
# initialize model, define components of structure
# dimensions included as sanity check

# 112x112x1
conv1 = ConvLayers(num_kernels=8, input_channels=1)  
relu1 = ReLU()

# 110x110x8
conv2 = ConvLayers(num_kernels=16, input_channels=8) 
relu2 = ReLU()

# 108x108x16
conv3 = ConvLayers(num_kernels=32, input_channels=16) 
relu3 = ReLU()

# 106x106x32
pool = MaxPool() 

# dimension of output before flattening: 53x53x32

fully_connected_layer1 = FullyConnect(53*53*32, 64) 
relu4 = ReLU()
# now have 89888 nodes

fully_connected_layer2 = FullyConnect(64, 1) 
sigmoid = Sigmoid()
# output is 1 node

In [11]:
# begin CNN
print('CNN is initialized')

# initialize parameters
learning_rate = 0.005
epochs = 20
batch_size = 16

# train model
for epoch in range(epochs):
    print(f'--- Epoch {epoch + 1} ---')
    
    # shuffle training data so model doesn't rely on data order (would make accuracy untrustworthy)
    indices = np.random.permutation(len(x_train))
    x_train_shuffled = x_train[indices]
    y_train_shuffled = y_train[indices]
    
    # batch training so computer doesn't get overwhelmed
    loss = 0
    num_correct = 0
    num_batches = 0
    
    for i in range(0, len(x_train_shuffled), batch_size):
        batch_x = x_train_shuffled[i:i+batch_size]
        batch_y = y_train_shuffled[i:i+batch_size].reshape(-1, 1)
        
        # forward pass
        out = conv1.forward(batch_x)
        out = relu1.forward(out)
        
        out = conv2.forward(out)
        out = relu2.forward(out)
        
        out = conv3.forward(out)
        out = relu3.forward(out)
        out = pool.forward(out)

        # convolution is done, now we flatten and it's a "normal" NN
        out = out.reshape(out.shape[0], -1)
        out = fully_connected_layer1.forward(out)
        out = relu4.forward(out)
        out = fully_connected_layer2.forward(out)
        # sigmoid on output since this is a binary problem
        out = sigmoid.forward(out)
        
        # calculate loss and accuracy 
        training_loss = binary_cross_entropy(out, batch_y)
        acc = np.sum((out > 0.5) == batch_y)
        
        loss += training_loss
        num_correct += acc
        num_batches += 1
        
        # backward pass, backpropagation through all 3 layers
        gradient = binary_cross_entropy_derivative(out, batch_y)
        gradient = sigmoid.backward(gradient)
        
        gradient = fully_connected_layer2.backprop(gradient, learning_rate)
        gradient = relu4.backward(gradient)
        
        gradient = fully_connected_layer1.backprop(gradient, learning_rate)

        # now we "unflatten" to go through conv layers
        gradient = gradient.reshape(batch_x.shape[0], 53, 53, 32)
        gradient = pool.backprop(gradient)
        gradient = relu3.backward(gradient)
        gradient = conv3.backprop(gradient, learning_rate)
        
        gradient = relu2.backward(gradient)
        gradient = conv2.backprop(gradient, learning_rate)
        gradient = relu1.backward(gradient)
        
        gradient = conv1.backprop(gradient, learning_rate)
        
        # printing progress every 10 batches for sanity. help
        if (num_batches) % 10 == 0:
            print(
                '[Batch %d] Last 10 batches: Average Loss %.3f | Accuracy: %d%%' %
                (num_batches, loss / 10, (num_correct / (10 * batch_size)) * 100)
            )
            loss = 0
            num_correct = 0

CNN is initialized
--- Epoch 1 ---
[Batch 10] Last 10 batches: Average Loss 0.674 | Accuracy: 78%
[Batch 20] Last 10 batches: Average Loss 0.672 | Accuracy: 78%
--- Epoch 2 ---
[Batch 10] Last 10 batches: Average Loss 0.671 | Accuracy: 77%
[Batch 20] Last 10 batches: Average Loss 0.673 | Accuracy: 73%
--- Epoch 3 ---
[Batch 10] Last 10 batches: Average Loss 0.667 | Accuracy: 78%
[Batch 20] Last 10 batches: Average Loss 0.668 | Accuracy: 75%
--- Epoch 4 ---
[Batch 10] Last 10 batches: Average Loss 0.665 | Accuracy: 76%
[Batch 20] Last 10 batches: Average Loss 0.663 | Accuracy: 76%
--- Epoch 5 ---
[Batch 10] Last 10 batches: Average Loss 0.655 | Accuracy: 81%
[Batch 20] Last 10 batches: Average Loss 0.664 | Accuracy: 73%
--- Epoch 6 ---
[Batch 10] Last 10 batches: Average Loss 0.656 | Accuracy: 76%
[Batch 20] Last 10 batches: Average Loss 0.655 | Accuracy: 76%
--- Epoch 7 ---
[Batch 10] Last 10 batches: Average Loss 0.655 | Accuracy: 75%
[Batch 20] Last 10 batches: Average Loss 0.652 | A

In [13]:
# test cnn
print('\n--- Testing the CNN ---')
loss = 0
num_correct = 0

for i in range(0, len(x_test), batch_size):
    batch_x = x_test[i:i+batch_size]
    batch_y = y_test[i:i+batch_size].reshape(-1, 1)
    
    # forward pass using same structure as in training
    out = conv1.forward(batch_x)
    out = relu1.forward(out)
    
    out = conv2.forward(out)
    out = relu2.forward(out)
    
    out = conv3.forward(out)
    out = relu3.forward(out)
    out = pool.forward(out)
    
    out = out.reshape(out.shape[0], -1)
    out = fully_connected_layer1.forward(out)
    out = relu4.forward(out)
    
    out = fully_connected_layer2.forward(out)
    out = sigmoid.forward(out)
    
    test_loss = binary_cross_entropy(out, batch_y)
    acc = np.sum((out > 0.5) == batch_y)
    
    loss += test_loss
    num_correct += acc

num_tests = len(x_test)

# print loss and accuracy
print('Test Loss:', loss / (num_tests // batch_size))
print('Test Accuracy:', (num_correct / num_tests))


--- Testing the CNN ---
Test Loss: 0.6659720516269472
Test Accuracy: 0.7777777777777778
