# Imports

In [84]:
import os
import numpy as np
from scipy.signal import convolve2d
from PIL import Image

# Constants

In [85]:
# Dataset locations. They are resolved against the current working directory,
# so the notebook has to be started from the folder that contains "data/".
DATA_DIR = os.path.join(os.getcwd(), "data")
TRAIN_DIR = os.path.join(DATA_DIR, "train")  # training split, one sub-directory per expression
TEST_DIR = os.path.join(DATA_DIR, "test")  # test split, same layout as the training split

# Convolutional Neural Network

## Reading Data
The data are 48x48 grayscale images. Images of a different size are resized to 48x48, and every image is converted to grayscale when it is loaded.  
Grayscale means there is only 1 channel (an RGB image has 3 channels).  
So conceptually each image has size (48, 48, 1) - (image_width, image_height, n_channels); it is loaded as a 2D (48, 48) array.  

This section extracts the images and labels.

1. Read the images and remember the label name of each image (e.g. happy, sad, ...). This yields the images and label_names lists.
2. Map the label names to integers, so that each label name has its own identifier (e.g. happy = 0, sad = 1, ...). This yields the labels list.
3. Convert the labels list into a 2D array of one-hot encoded vectors.

In [86]:
def get_data(data_dir: str, image_size=(48, 48)) -> (np.array, np.array):

    """
    Read every image below data_dir together with the name of its class.
    Each image is converted to grayscale (.convert('L')), resized to image_size,
    turned into a numpy array and normalized (every value is divided by 255).

    Expected directory tree from which the images are processed:
    - data_dir
        - facial_expression_dir1
            - image1
            - image2
            - ...
        - facial_expression_dir2
        - facial_expression_dir3
        - ...
        - facial_expression_dirN

    Directories and the files inside them are visited in sorted order, so the
    returned arrays have the same order on every run and on every platform.
    Files that cannot be processed are reported on stdout and skipped.

    :param data_dir: directory from which the images are processed
    :param image_size: size every image is resized to, default is 48x48
    :return: two numpy arrays, the first one holds all the images (values between 0 and 1),
             the second one holds the label name (directory name) of every image
    """

    images = []
    label_names = []

    for expression_dirname in sorted(os.listdir(data_dir)):
        # every sub-directory holds the images of one facial expression
        expression_dir = os.path.join(data_dir, expression_dirname)

        if not os.path.isdir(expression_dir):  # process only directories, skip plain files
            continue

        # os.listdir returns entries in arbitrary order, sort them for reproducibility
        for expression_image in sorted(os.listdir(expression_dir)):
            image_path = os.path.join(expression_dir, expression_image)

            try:
                # the context manager closes the underlying file as soon as the pixels
                # have been copied by convert(), instead of leaking one handle per image
                with Image.open(image_path) as raw_image:
                    image = raw_image.convert('L')  # L mode, because images are grayscaled
                image = image.resize(image_size)  # resize the image to the expected size

                # convert image into a 2D array and normalize the values to 0 - 1
                image_array = np.array(image) / 255.0
            except Exception as e:
                # best effort: report the broken file and carry on with the next one
                print(f"Failed to process image: {image_path}")
                print(e)
                continue

            images.append(image_array)
            label_names.append(expression_dirname)  # directory name is the label name of the facial expression

    # convert images and labels list into numpy arrays and return them
    return np.array(images), np.array(label_names)

In [87]:
# Load both splits from disk; get_data returns (images, label_names) for a directory.
train_images, train_label_names = get_data(TRAIN_DIR)
test_images, test_label_names = get_data(TEST_DIR)

# Sanity check: show the shape and the (numpy-abbreviated) content of every array.
print("Train Images:")
print(train_images.shape)
print(train_images, '\n')

print("Test Images:")
print(test_images.shape)
print(test_images, '\n')

# The label arrays still hold the directory names here, not integer ids.
print("Train Labels:")
print(train_label_names.shape)
print(train_label_names, '\n')

print("Test Labels:")
print(test_label_names.shape)
print(test_label_names, '\n')

Train Images:
(28709, 48, 48)
[[[0.70980392 0.70196078 0.69411765 ... 0.71372549 0.71372549 0.71372549]
  [0.70196078 0.69803922 0.69019608 ... 0.70196078 0.69411765 0.68627451]
  [0.70196078 0.69803922 0.69019608 ... 0.67843137 0.70196078 0.7254902 ]
  ...
  [0.76862745 0.70980392 0.74901961 ... 0.90196078 0.89411765 0.80392157]
  [0.76078431 0.72941176 0.78431373 ... 0.89019608 0.87058824 0.91372549]
  [0.77647059 0.77254902 0.83137255 ... 0.88627451 0.85882353 0.95294118]]

 [[0.08235294 0.07058824 0.10588235 ... 0.32941176 0.20392157 0.24705882]
  [0.08235294 0.08235294 0.10980392 ... 0.34509804 0.28235294 0.36078431]
  [0.09019608 0.10980392 0.12941176 ... 0.4        0.41176471 0.45882353]
  ...
  [0.99607843 0.99215686 1.         ... 0.63921569 0.61960784 0.61176471]
  [1.         1.         1.         ... 0.59607843 0.64705882 0.59607843]
  [0.99607843 1.         0.99215686 ... 0.61568627 0.57647059 0.56078431]]

 [[0.16078431 0.24705882 0.33333333 ... 0.13333333 0.1372549  0.12

In [88]:
# Inspect a single sample: the first training image, a 2D array of normalized pixel values.
print("Example of one Image:")
print(train_images[0].shape)
print(train_images[0])

Example of one Image:
(48, 48)
[[0.70980392 0.70196078 0.69411765 ... 0.71372549 0.71372549 0.71372549]
 [0.70196078 0.69803922 0.69019608 ... 0.70196078 0.69411765 0.68627451]
 [0.70196078 0.69803922 0.69019608 ... 0.67843137 0.70196078 0.7254902 ]
 ...
 [0.76862745 0.70980392 0.74901961 ... 0.90196078 0.89411765 0.80392157]
 [0.76078431 0.72941176 0.78431373 ... 0.89019608 0.87058824 0.91372549]
 [0.77647059 0.77254902 0.83137255 ... 0.88627451 0.85882353 0.95294118]]


### Map the label names into actual labels
Each label name should have its own integer identifier.  
From the label names list we build the labels list, in which every label name is replaced by its identifier.

In [89]:
def map_label_names(label_names: np.array) -> np.array:

    """
    Translate label names into integer identifiers.
    The distinct names are numbered 0, 1, 2, ... in the order returned by np.unique
    (sorted order), and every entry of label_names is swapped for its number.

    :param label_names: list of label names
    :return: an array of labels, with every label name replaced by its unique identifier.
    """

    # lookup table: label name -> identifier
    distinct_names = np.unique(label_names)
    name_to_id = {name: identifier for identifier, name in enumerate(distinct_names)}

    # translate the names one by one, keeping the original order
    return np.array([name_to_id[name] for name in label_names])

In [90]:
# Replace the label names of both splits by integer identifiers.
# NOTE(review): every split is mapped on its own, so the identifiers only agree
# between train and test when both splits contain the same set of label names.
train_labels = map_label_names(train_label_names)
test_labels = map_label_names(test_label_names)

print("Train labels:")
print(train_labels.shape)
print(train_labels, '\n')

print("Test labels:")
print(test_labels.shape)
print(test_labels)

Train labels:
(28709,)
[0 0 0 ... 6 6 6] 

Test labels:
(7178,)
[0 0 0 ... 6 6 6]


### Encode the labels into one hot vectors


In [91]:
def one_hot_encode(labels: np.array, num_classes: int) -> np.array:

    """
    Encode the labels 1D vector into one-hot vectors encoding.
    One hot encoded vector has all zeros, but only one 1.

    :param labels: 1D vector (numpy array or plain sequence) of integer labels
        F.e.:
        happy: 0
        sad: 1
        angry: 2
        labels: [0, 0, 1, 1, 2]

    :param num_classes: number of unique classes - of unique labels (f.e. 3 - happy, sad, and angry)
    :return: a 2D one hot encoded array (of floats).

            F.e.:
            labels = [0, 0, 1, 1, 2]
            one_hot =
                [
                    [1 0 0]
                    [1 0 0]
                    [0 1 0]
                    [0 1 0]
                    [0 0 1]
                ]
            - shape of one_hot: (n_labels, unique_labels)
    :raises IndexError: if a label does not fit into num_classes columns
    """

    labels = np.asarray(labels)

    # an array full of zeros of shape (num_labels, num_classes)
    one_hot = np.zeros((len(labels), num_classes))

    # set the 1 of every row in a single vectorized assignment: row k gets its 1 in
    # column labels[k]; an empty input is skipped because it carries no integer dtype
    if labels.size:
        one_hot[np.arange(len(labels)), labels] = 1

    return one_hot

In [92]:
# Size the one-hot vectors from BOTH splits. Counting the classes of the test split
# alone breaks the encoding of the training labels (IndexError) as soon as the test
# split misses one of the classes.
# NOTE(review): map_label_names numbers every split on its own, so the identifiers of
# the two splits only line up when both splits contain the same label names.
num_classes = len(np.unique(np.concatenate((train_labels, test_labels))))

train_labels_one_hot = one_hot_encode(train_labels, num_classes)
test_labels_one_hot = one_hot_encode(test_labels, num_classes)

print("Train labels one-hot:")
print(train_labels_one_hot.shape)
print(train_labels_one_hot, '\n')

print("Test labels one-hot:")
print(test_labels_one_hot.shape)
print(test_labels_one_hot, '\n')

Train labels one-hot:
(28709, 7)
[[1. 0. 0. ... 0. 0. 0.]
 [1. 0. 0. ... 0. 0. 0.]
 [1. 0. 0. ... 0. 0. 0.]
 ...
 [0. 0. 0. ... 0. 0. 1.]
 [0. 0. 0. ... 0. 0. 1.]
 [0. 0. 0. ... 0. 0. 1.]] 

Test labels one-hot:
(7178, 7)
[[1. 0. 0. ... 0. 0. 0.]
 [1. 0. 0. ... 0. 0. 0.]
 [1. 0. 0. ... 0. 0. 0.]
 ...
 [0. 0. 0. ... 0. 0. 1.]
 [0. 0. 0. ... 0. 0. 1.]
 [0. 0. 0. ... 0. 0. 1.]] 



### Final Representation of Data


Training and testing sets:  
- image arrays: train_images and test_images
- one-hot encoded vector arrays: train_labels_one_hot, test_labels_one_hot

In [93]:
# Summary of the data the network works with: image arrays and one-hot label arrays.
# Only the first five images of every split are printed; shape is that of the full array.
print("Train Images (first 5):")
print(train_images.shape)
print(train_images[:5], '\n')

print("Test Images (first 5):")
print(test_images.shape)
print(test_images[:5], '\n')

# The one-hot arrays are printed as a whole (numpy abbreviates them with "...").
print("Train Labels one-hot:")
print(train_labels_one_hot.shape)
print(train_labels_one_hot, '\n')

print("Test Labels one-hot:")
print(test_labels_one_hot.shape)
print(test_labels_one_hot, '\n')

Train Images (first 5):
(28709, 48, 48)
[[[0.70980392 0.70196078 0.69411765 ... 0.71372549 0.71372549 0.71372549]
  [0.70196078 0.69803922 0.69019608 ... 0.70196078 0.69411765 0.68627451]
  [0.70196078 0.69803922 0.69019608 ... 0.67843137 0.70196078 0.7254902 ]
  ...
  [0.76862745 0.70980392 0.74901961 ... 0.90196078 0.89411765 0.80392157]
  [0.76078431 0.72941176 0.78431373 ... 0.89019608 0.87058824 0.91372549]
  [0.77647059 0.77254902 0.83137255 ... 0.88627451 0.85882353 0.95294118]]

 [[0.08235294 0.07058824 0.10588235 ... 0.32941176 0.20392157 0.24705882]
  [0.08235294 0.08235294 0.10980392 ... 0.34509804 0.28235294 0.36078431]
  [0.09019608 0.10980392 0.12941176 ... 0.4        0.41176471 0.45882353]
  ...
  [0.99607843 0.99215686 1.         ... 0.63921569 0.61960784 0.61176471]
  [1.         1.         1.         ... 0.59607843 0.64705882 0.59607843]
  [0.99607843 1.         0.99215686 ... 0.61568627 0.57647059 0.56078431]]

 [[0.16078431 0.24705882 0.33333333 ... 0.13333333 0.137

## Helpful Functions
Functions that have nothing to do with CNN.  
Helpful functions to make the program run smoothly.

## Convolutional Layers

### Custom Errors
Made custom errors, which can be raised in the convolutional layer.

In [95]:
class InvalidNumberOfFilters(Exception):
    """
    Error for an invalid number of filters.
    The text is kept on the "message" attribute and handed to Exception as well,
    so it also shows up in str(error).
    """

    def __init__(self, message="Invalid number of filters specified by the user"):
        super().__init__(message)
        self.message = message


class InvalidFilterSize(Exception):
    """
    Error for an invalid filter size.
    The text is kept on the "message" attribute and handed to Exception as well,
    so it also shows up in str(error).
    """

    def __init__(self, message="Invalid filter size specified by the user"):
        super().__init__(message)
        self.message = message
        
        
class InvalidNumberOfChannels(Exception):
    """
    Error for an invalid number of channels.
    The text is kept on the "message" attribute and handed to Exception as well,
    so it also shows up in str(error).
    """

    def __init__(self, message="Invalid number of channels specified by the user"):
        super().__init__(message)
        self.message = message
        
        
class InvalidPadding(Exception):
    """
    Error for an invalid padding.
    The text is kept on the "message" attribute and handed to Exception as well,
    so it also shows up in str(error).
    """

    def __init__(self, message="Invalid padding specified by the user"):
        super().__init__(message)
        self.message = message
        

class InvalidStride(Exception):
    """
    Error for an invalid stride.
    The text is kept on the "message" attribute and handed to Exception as well,
    so it also shows up in str(error).
    """

    def __init__(self, message="Invalid stride specified by the user"):
        super().__init__(message)
        self.message = message

In [138]:
class ConvLayer:

    """
    A single 2D convolutional layer, forward pass only.

    The layer owns num_filters randomly initialized filters - kernels, each of size
    (num_channels x filter_size x filter_size), and applies them to an image of size
    (n_channels x height x width).

    The arithmetic is done by scipy.signal.convolve2d, which computes a true convolution:
    the kernel is flipped along both spatial axes before it is multiplied with the image.
    """

    def __init__(self, num_filters: int, filter_size: int, num_channels: int):
        """
        Function constructor, initialize the filter array.

        :param num_filters: number of filters in the convolutional layer
        :param filter_size: size of the filter, num_channels x filter_size x filter_size
        :param num_channels: depth of the filter - kernel
        :raises ValueError: if one of the arguments is not an integer
        :raises InvalidNumberOfFilters: if num_filters is smaller than 1
        :raises InvalidFilterSize: if filter_size is smaller than 2
        :raises InvalidNumberOfChannels: if num_channels is smaller than 1
        """

        if not isinstance(num_filters, int):
            raise ValueError("Number of filters \"num_filters\" must be an integer!")
        elif num_filters < 1:
            raise InvalidNumberOfFilters(f"Number of filters \"num_filters\" must be at least 1, \"num_filters={num_filters}\" specified instead!")

        if not isinstance(filter_size, int):
            raise ValueError("Filter - kernel size \"filter_size\" must be an integer!")
        elif filter_size < 2:
            raise InvalidFilterSize(f"Filter - kernel size \"filter_size\" must be at least 2, \"filter_size={filter_size}\" specified instead!")

        if not isinstance(num_channels, int):
            raise ValueError("Number of channels \"num_channels\" must be an integer!")
        elif num_channels < 1:
            raise InvalidNumberOfChannels(f"Number of channels \"num_channels\" must be at least 1, \"num_channels={num_channels}\" was specified instead!")

        self.num_filters = num_filters
        self.filter_size = filter_size
        self.num_channels = num_channels

        # creates a 4D numpy array
        # filter is of size (c x f x f), where f is the filter_size, and c is the num_channels
        # (c x f x f) because image size is expected to be (c x height x width)
        # the random values are scaled down by the number of weights per channel (f * f)
        self.filters = np.random.randn(num_filters, num_channels, filter_size, filter_size) / filter_size ** 2


    def shift_filter_window(self, image: np.array, stride: int) -> (np.array, int, int):

        """
        Shift the filter window to get all the regions in the image.
        These regions are for the convolutional operations with the kernel - filter.
        This is a generator, it has to be iterated to obtain the regions one by one.

        :param image: image of size (n_channels x height x width) to take the regions from
        :param stride: size of the stride, shift size - how big a step/shift
        :return: yields (region, i, j), where region is the part of the image covered by the
                 filter window (n_channels x filter_size x filter_size) and (i, j) is the
                 position of the top-left corner of the region in the image
        """

        _, height, width = image.shape

        # shift the filter window, to process all regions in the input image
        for i in range(0, height - self.filter_size + 1, stride):
            for j in range(0, width - self.filter_size + 1, stride):
                # extract the region of the image where the filter is applied, over all channels
                region = image[:, i: (i + self.filter_size), j: (j + self.filter_size)]
                yield region, i, j


    def forward(self, image: np.array, stride: int = 1, padding: int = 0):

        """
        Perform forward pass through the convolutional layer.
        Every filter is convolved with the (zero-padded) image.

        Each channel plane is convolved as a whole and then subsampled by the stride.
        This gives the same values as convolving every filter window on its own,
        but needs one SciPy call per (filter, channel) pair instead of one per output pixel.

        :param image: image to perform convolutional operation on, either (height x width)
                      or (n_channels x height x width), where n_channels has to be the
                      num_channels the layer was created with
        :param stride: stride applied to the convolutional operation
        :param padding: zero-padding applied to every side of the image
        :return: array of size (num_filters x output_height x output_width), where
                 output_size = (size + 2 * padding - filter_size) // stride + 1
        :raises ValueError: if stride or padding is not an integer
        :raises InvalidStride: if stride is smaller than 1
        :raises InvalidPadding: if padding is smaller than 0
        :raises InvalidNumberOfChannels: if the image does not have num_channels channels
        """

        if not isinstance(stride, int):
            raise ValueError("Stride must be an integer!")
        elif stride < 1:
            raise InvalidStride(f"Stride must be at least 1, \"stride={stride}\" specified instead!")

        if not isinstance(padding, int):
            raise ValueError("Padding must be an integer!")
        elif padding < 0:
            raise InvalidPadding(f"Padding must be at least 0, \"padding={padding}\" specified instead!")

        # if image is 2D (single channel, number of channels is 1), expand from (height x width) into (1 x height x width) so it has a depth=1
        if image.ndim == 2:
            image = np.expand_dims(image, axis=0)

        image_channels, height, width = image.shape  # image is of size (n_channels x height x width)

        # a mismatch would either ignore some kernel channels or fail with a bare IndexError
        if image_channels != self.num_channels:
            raise InvalidNumberOfChannels(f"Image has {image_channels} channel(s), but the layer expects \"num_channels={self.num_channels}\"!")

        padded_image = self.pad_image(image, padding)  # apply padding to the image

        # expected output height and width
        output_height = (height + 2 * padding - self.filter_size) // stride + 1
        output_width = (width + 2 * padding - self.filter_size) // stride + 1

        output = np.zeros((self.num_filters, output_height, output_width))

        # the filter does not fit into the padded image, there is no region to convolve
        if output.size == 0:
            return output

        for n_filter in range(self.num_filters):
            for channel in range(self.num_channels):
                # "valid" result holds the response of every window position (stride 1)
                response = convolve2d(padded_image[channel], self.filters[n_filter, channel], mode="valid")
                # keep only the positions reachable with the stride and sum over the channels
                output[n_filter] += response[::stride, ::stride]

        return output


    @staticmethod
    def conv(image_region: np.array, kernel: np.array) -> float:

        """
        Perform convolutional operation on one image region using the filter.
        The region and the kernel are expected to have the same size, so every channel
        produces exactly one value; the values of all channels are summed.

        scipy.signal.convolve2d flips the kernel, so the result per channel is
        sum(image_region[channel] * kernel[channel][::-1, ::-1]).

        :param image_region: region to perform convolutional operation on (n_channels x f x f)
        :param kernel: filter - kernel to perform convolutional operation with (n_channels x f x f)
        :return: convolution result, a single number
        """

        result = 0

        # perform convolution for each channel separately and sum the result
        for channel in range(image_region.shape[0]):  # loop over the channels
            conv_result = convolve2d(image_region[channel], kernel[channel], mode="valid")
            result += conv_result.item()  # the "valid" result is a 1 x 1 array, extract its scalar value

        return result


    @staticmethod
    def pad_image(image: np.array, padding: int) -> np.array:

        """
        Apply zero-padding to the input image with multiple channels.

        :param image: image to apply padding on, of size (n_channels x height x width)
        :param padding: number of zeros added to every side of the height and width dimensions
        :return: padded image, the input image itself if padding is not greater than 0
        """

        if padding > 0:
            # apply padding to the height and width dimensions (1 and 2), keeping channels dimension intact
            padded_image = np.pad(image, ((0, 0), (padding, padding), (padding, padding)), mode="constant")
        else:
            padded_image = image

        return padded_image
        

In [150]:
# Example of using one convolutional layer, using only forward pass.
# Every training image is pushed through the layer; the images are handled in
# batches only to report the progress.

conv_layer1 = ConvLayer(num_filters=3, filter_size=4, num_channels=1)

batch_size = 300
convolved_images = []

for i in range(0, train_images.shape[0], batch_size):  # i is the index of the first image of every batch
    
    print(f"{(i // batch_size) + 1}. batch {i}/{train_images.shape[0]}")
    
    convolved_batch = []
    batch_images = train_images[i: i + batch_size]  # slice out the next batch of images (the last batch may be shorter)
    
    for image in batch_images:
        # forward returns one feature map per filter for this image
        convolved_image = conv_layer1.forward(image, padding=0, stride=1)
        convolved_batch.append(convolved_image)
    
    convolved_images.extend(np.array(convolved_batch))

# stack the per-image results into one array
convolved_images = np.array(convolved_images)

1. batch 0/28709
2. batch 300/28709
3. batch 600/28709
4. batch 900/28709
5. batch 1200/28709
6. batch 1500/28709
7. batch 1800/28709
8. batch 2100/28709
9. batch 2400/28709
10. batch 2700/28709
11. batch 3000/28709
12. batch 3300/28709
13. batch 3600/28709
14. batch 3900/28709
15. batch 4200/28709
16. batch 4500/28709
17. batch 4800/28709
18. batch 5100/28709
19. batch 5400/28709
20. batch 5700/28709
21. batch 6000/28709
22. batch 6300/28709
23. batch 6600/28709
24. batch 6900/28709
25. batch 7200/28709
26. batch 7500/28709
27. batch 7800/28709
28. batch 8100/28709
29. batch 8400/28709
30. batch 8700/28709
31. batch 9000/28709
32. batch 9300/28709
33. batch 9600/28709
34. batch 9900/28709
35. batch 10200/28709
36. batch 10500/28709
37. batch 10800/28709
38. batch 11100/28709
39. batch 11400/28709
40. batch 11700/28709
41. batch 12000/28709
42. batch 12300/28709
43. batch 12600/28709
44. batch 12900/28709
45. batch 13200/28709
46. batch 13500/28709
47. batch 13800/28709
48. batch 14100

In [151]:
# (n_images, num_filters, output_height, output_width)
convolved_images.shape

(28709, 3, 45, 45)