In [1]:
import os
import numpy as np # to process the data
import random
import math
from random import shuffle
from PIL import Image
from sklearn.model_selection import train_test_split

In [2]:
class FlowerDataset:
    def __init__(self, directory):
        self.directory = directory
        self.images = []
        self.labels = []

    def load_data(self):
        image_files = os.listdir(self.directory)
        for img in image_files[:10]:
            try:
                img_path = os.path.join(self.directory, img)
                with Image.open(img_path) as img_array:
                    gray_array = img_array.convert('L')
                    resized_array = gray_array.resize((150, 150))
                    self.images.append(list(resized_array.getdata()))
                    label = img.split('_')[0]
                    self.labels.append(label)
            except Exception as e:
                print(f"Error loading image {img}: {e}")

    def preprocess_data(self):
        max_pixel_value = 255.0
        self.images = [[pixel / max_pixel_value for pixel in image] for image in self.images]

    def one_hot_encode(self):
        unique_labels = sorted(set(self.labels))
        label_to_int = {label: index for index, label in enumerate(unique_labels)}
        self.label_indices = [label_to_int[label] for label in self.labels]
        self.labels = [[int(i == label_index) for i in range(len(unique_labels))] for label_index in self.label_indices]

def split_data(images, labels, train_ratio, validation_ratio, test_ratio):
    combined = list(zip(images, labels))
    shuffle(combined)
    shuffled_images, shuffled_labels = zip(*combined)

    train_end = int(len(shuffled_images) * train_ratio)
    validation_end = train_end + int(len(shuffled_images) * validation_ratio)

    x_train = shuffled_images[:train_end]
    y_train = shuffled_labels[:train_end]
    x_val = shuffled_images[train_end:validation_end]
    y_val = shuffled_labels[train_end:validation_end]
    x_test = shuffled_images[validation_end:]
    y_test = shuffled_labels[validation_end:]

    return list(x_train), list(x_val), list(x_test), list(y_train), list(y_val), list(y_test)

# Usage
dataset = FlowerDataset('flowers')
dataset.load_data()
dataset.preprocess_data()
dataset.one_hot_encode()

x_train, x_val, x_test, y_train, y_val, y_test = split_data(dataset.images, dataset.labels, 0.75, 0.15, 0.10)

# Print the sizes of the datasets
print(f"Train set size: {len(x_train)}, Validation set size: {len(x_val)}, Test set size: {len(x_test)}")

# Print the unique classes in the training set
unique_classes = sorted(set([label.index(1) for label in y_train]))
print(f"Unique classes in the training set: {unique_classes}")

num_classes = len(set(dataset.label_indices))
print(f"Number of classes for prediction: {num_classes}")


Train set size: 7, Validation set size: 1, Test set size: 2
Unique classes in the training set: [0, 1, 2, 3, 4]
Number of classes for prediction: 7


In [19]:
# Base Layer class
class Layer:
    def forward(self, input_data):
        raise NotImplementedError

    def backward(self, output_error, learning_rate):
        raise NotImplementedError

# Convolutional Layer
class ConvLayer(Layer):
    def __init__(self, num_filters, filter_size, padding=0, stride=1):
        self.num_filters = num_filters
        self.filter_size = filter_size
        self.filters = np.random.randn(num_filters, filter_size, filter_size) / 9  # normalize values
        self.padding = padding
        self.stride = stride
        
    def forward(self, input_data):
        self.last_input = input_data
    
        # Get the width, height and depth of the input
        h, w, d = input_data.shape

        # Print the input dimensions, filter size, padding, and stride
        print(f"Input dimensions: height={h}, width={w}, depth={d}")
        print(f"Filter size: {self.filter_size}")
        print(f"Padding: {self.padding}, Stride: {self.stride}")
        
        # Apply padding to the input
        input_padded = np.pad(input_data, ((self.padding, self.padding), (self.padding, self.padding), (0, 0)), 'constant')
        
        # Update the output dimensions calculation to include padding and stride
        new_h = int((h + 2 * self.padding - self.filter_size) / self.stride) + 1
        new_w = int((w + 2 * self.padding - self.filter_size) / self.stride) + 1

        # Print the calculated output dimensions
        print(f"Calculated output dimensions: new_height={new_h}, new_width={new_w}")
        
        output = np.zeros((new_h, new_w, self.num_filters))
        
        # Perform the convolution operation
        for i in range(h - self.filter_size + 1):
            for j in range(w - self.filter_size + 1):
                for f in range(self.num_filters):
                    # Apply the filters to the input
                    output[i, j, f] = np.sum(input_data[i:i+self.filter_size, j:j+self.filter_size] * self.filters[f])
    
        return output

    def backward(self, output_error, learning_rate):
        d_filters = np.zeros(self.filters.shape)
        d_input = np.zeros(self.last_input.shape)
        
        h, w, d = output_error.shape
        h_i, w_i, d_i = self.last_input.shape
        
        for i in range(h):
            for j in range(w):
                for f in range(self.num_filters):
                    # Update the gradients for the filters
                    d_filters[f] += output_error[i, j, f] * self.last_input[i:i+self.filter_size, j:j+self.filter_size]
                    
                    # Calculate the gradient with respect to the input
                    d_input[i:i+self.filter_size, j:j+self.filter_size] += self.filters[f] * output_error[i, j, f]
        
        # Update the filters
        self.filters -= learning_rate * d_filters
        
        return d_input
        
# MaxPooling Layer
class MaxPoolingLayer(Layer):
    def __init__(self, pool_size):
        self.pool_size = pool_size

    def forward(self, input_data):
        self.last_input = input_data
        h, w, num_filters = input_data.shape
        output = np.zeros((h // self.pool_size, w // self.pool_size, num_filters))
        for i in range(h // self.pool_size):
            for j in range(w // self.pool_size):
                for f in range(num_filters):
                    output[i, j, f] = np.max(input_data[i*self.pool_size:(i+1)*self.pool_size, j*self.pool_size:(j+1)*self.pool_size, f])
        return output

    def backward(self, output_error, learning_rate):
        d_input = np.zeros(self.last_input.shape)
        h, w, num_filters = self.last_input.shape
        for i in range(h // self.pool_size):
            for j in range(w // self.pool_size):
                for f in range(num_filters):
                    h_start, w_start = i*self.pool_size, j*self.pool_size
                    patch = self.last_input[h_start:h_start+self.pool_size, w_start:w_start+self.pool_size, f]
                    h_i, w_i = np.unravel_index(np.argmax(patch), patch.shape)
                    d_input[h_start+h_i, w_start+w_i, f] = output_error[i, j, f]
        return d_input
        
class FlattenLayer(Layer):
    def forward(self, input_data):
        self.last_input_shape = input_data.shape
        return input_data.flatten()
        
    def backward(self, output_error, learning_rate):
        return output_error.reshape(self.last_input_shape)
        
# Dropout Layer
class DropoutLayer(Layer):
    def __init__(self, dropout_rate):
        self.dropout_rate = dropout_rate

    def forward(self, input_data):
        self.mask = (np.random.rand(*input_data.shape) > self.dropout_rate) / (1.0 - self.dropout_rate)
        return input_data * self.mask

    def backward(self, output_error, learning_rate):
        return output_error * self.mask

# Dense Layer
class DenseLayer(Layer):
    def __init__(self, input_size, output_size):
        self.weights = np.random.randn(input_size, output_size) * np.sqrt(2. / input_size)
        self.biases = np.zeros(output_size)
        
    def forward(self, input_data):
        self.last_input = input_data
        return np.dot(input_data, self.weights) + self.biases
    
    def backward(self, output_error, learning_rate):
        d_weights = np.outer(self.last_input, output_error)
        d_biases = output_error
        self.weights -= learning_rate * d_weights
        self.biases -= learning_rate * d_biases
        return np.dot(output_error, self.weights.T)
        
class SoftmaxLayer(Layer):
    def forward(self, input_data):
        exp = np.exp(input_data - np.max(input_data, axis=0))  # subtract max for numerical stability
        return exp / np.sum(exp, axis=0)

    def backward(self, output_error, learning_rate):
        return output_error  # Error is passed straight through to the next layer
        
# ReLU Activation
class ReLULayer(Layer):
    def forward(self, input_data):
        self.last_input = input_data
        return np.maximum(0, input_data)

    def backward(self, output_error, learning_rate):
        # The derivative of ReLU is 1 for positive inputs and 0 for negative inputs
        return output_error * (self.last_input > 0)

# Cross-Entropy Loss
class CrossEntropyLoss:
    def calculate_loss(self, predicted, actual):
        return -np.sum(actual * np.log(predicted + 1e-9))  # add small constant to avoid log(0)
        
    def calculate_gradient(self, predicted, actual):
        return predicted - actual

In [20]:
class CNN:
    def __init__(self, threshold=0.5, early_stopping_patience=5):
        self.layers = [
            ConvLayer(num_filters=32, filter_size=3),
            ReLULayer(),
            MaxPoolingLayer(pool_size=2),
            FlattenLayer(),
            DenseLayer(input_size=32*74*74, output_size=num_classes),  # Assuming the input image size is 150x150
            SoftmaxLayer()
        ]
        self.loss = CrossEntropyLoss()
        self.threshold = threshold
        self.early_stopping_patience = early_stopping_patience
        self.best_loss = np.inf
        self.patience_counter = 0

    def forward(self, X):
        for layer in self.layers:
            # Add a channel dimension to X if it's missing
            if len(X.shape) == 2:
                X = X.reshape(X.shape[0], X.shape[1], 1)
            X = layer.forward(X)
        return X

    def backward(self, X, y, learning_rate):
        output_error = self.loss.calculate_gradient(X, y)
        for layer in reversed(self.layers):
            output_error = layer.backward(output_error, learning_rate)

    def calculate_loss(self, X, y):
        return self.loss.calculate_loss(X, y)

    def calculate_accuracy(self, X, y):
        predictions = self.predict(X)
        return np.mean(predictions == y)

    def train(self, X_train, y_train, X_val, y_val, epochs, learning_rate):
        # Calculate the output size after the convolution and pooling layers
        sample_output = self.forward(np.expand_dims(X_train[0], axis=-1))
        flattened_size = sample_output.size
        
        # Update the DenseLayer with the correct input size
        self.layers[-2] = DenseLayer(input_size=flattened_size, output_size=num_classes)
        
        for epoch in range(epochs):
            train_losses = []
            train_accuracies = []
            
            for j in range(len(X_train)):
                # Ensure the input data has three dimensions: height, width, and depth
                input_data = np.expand_dims(X_train[j], axis=-1)  # Add the depth dimension
                output = self.forward(input_data)
                self.backward(output, y_train[j], learning_rate)
                
                # Calculate loss and accuracy for each training example
                train_loss = self.calculate_loss(output, y_train[j])
                train_losses.append(train_loss)
                train_accuracy = self.calculate_accuracy(output, y_train[j])
                train_accuracies.append(train_accuracy)
            
            # Calculate average training loss and accuracy
            avg_train_loss = np.mean(train_losses)
            avg_train_accuracy = np.mean(train_accuracies)
            
            # Validation loss and accuracy
            val_output = self.forward(X_val)
            val_loss = self.calculate_loss(val_output, y_val)
            val_accuracy = self.calculate_accuracy(val_output, y_val)
            
            print(f"Epoch: {epoch}, Train Loss: {avg_train_loss}, Train Accuracy: {avg_train_accuracy}, Val Loss: {val_loss}, Val Accuracy: {val_accuracy}")

            # Early stopping
            if val_loss < self.best_loss:
                self.best_loss = val_loss
                self.patience_counter = 0
            else:
                self.patience_counter += 1
            if self.patience_counter >= self.early_stopping_patience:
                print("Early stopping due to no improvement in validation loss.")
                break

    def predict(self, X):
        predictions = []
        for i in range(len(X)):
            output = self.forward(X[i])
            predictions.append(np.argmax(output))
        return np.array(predictions)



cnn = CNN()
cnn.train(x_train, y_train, x_val, y_val, epochs=5, learning_rate=1)

Input dimensions: height=22500, width=1, depth=1
Filter size: 3
Padding: 0, Stride: 1
Calculated output dimensions: new_height=22498, new_width=-1


ValueError: negative dimensions are not allowed