In [1]:
# Required Libraries
import os
import numpy as np
import pandas as pd
from PIL import Image
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
import matplotlib.pyplot as plt

In [2]:
# Helper Functions
def softmax(x):
    exp_x = np.exp(x - np.max(x, axis=1, keepdims=True))
    return exp_x / np.sum(exp_x, axis=1, keepdims=True)

def cross_entropy_loss(y_true, y_pred):
    epsilon = 1e-15
    y_pred = np.clip(y_pred, epsilon, 1 - epsilon)
    loss = -np.sum(y_true * np.log(y_pred)) / y_true.shape[0]
    return loss

def relu(x):
    return np.maximum(0, x)

def relu_derivative(x):
    return x > 0

def flatten(x):
    return x.reshape(x.shape[0], -1)

In [4]:
# Load Data
image_folder = r"C:\Users\barto\Desktop\Repos\MachineLearning\Project4\trainset1"
excel_file = r"Labels.csv"

# Load the Excel file containing labels
df = pd.read_csv(excel_file)

# Prepare data and labels
image_data = []
labels = []

# Function to preprocess images
def preprocess_image(image_path):
    image = Image.open(image_path).convert("RGB")  # Ensure RGB format
    return np.array(image)  # Convert to NumPy array

# Iterate through the Excel rows
for index, row in df.iterrows():
    image_id = row['ImageId']
    classification = row['ClassName']
    image_path = os.path.join(image_folder, image_id)
    
    # Check if the image exists and preprocess it
    if os.path.exists(image_path):
        try:
            image_array = preprocess_image(image_path)
            image_data.append(image_array)
            labels.append(classification)
        except Exception as e:
            print(f"Error processing {image_path}: {e}")

# Convert lists to numpy arrays
image_data = np.array(image_data).astype(np.float32)
image_data = image_data / 255.0
labels = np.array(labels)

In [5]:
# Encode labels to numerical values and one-hot encode
label_encoder = LabelEncoder()
labels_numeric = label_encoder.fit_transform(labels)
y_train_one_hot = np.zeros((labels_numeric.size, len(label_encoder.classes_)))
y_train_one_hot[np.arange(labels_numeric.size), labels_numeric] = 1

# Train-test split
X_train, X_test, y_train, y_test = train_test_split(image_data, y_train_one_hot, test_size=0.2, random_state=42)
y_test = y_test.astype(int)
y_train = y_train.astype(int)

In [68]:
class ConvolutionalNeuralNetwork:
    def __init__(self, input_shape, num_classes):
        """
        Initialize the CNN.
        """
        self.input_shape = input_shape
        self.num_classes = num_classes

        # Filters for two convolutional layers
        self.conv_filter1 = np.random.randn(3, 3, input_shape[2], 8) * 0.1  # 8 filters for layer 1
        self.conv_filter2 = np.random.randn(3, 3, 8, 16) * 0.1  # 16 filters for layer 2

        # Calculate flattened size after two max-pooling layers
        pool_size = 2
        stride = 2
        # After two poolings, the height and width will be reduced by half twice
        output_height = ((input_shape[0] - 2) // pool_size) // pool_size  # After 2 poolings
        output_width = ((input_shape[1] - 2) // pool_size) // pool_size  # After 2 poolings
        flattened_size = 16 * output_height * output_width  # Number of channels * reduced height * reduced width

        # Fully connected and output layers
        self.fc_weights = np.random.randn(33856, 128) * 0.1
        self.output_weights = np.random.randn(128, num_classes) * 0.1

        self.fc_bias = np.zeros((1, 128))
        self.output_bias = np.zeros((1, num_classes))

    def convolve(self, x, filters, stride=1, padding=0):
        """
        Perform convolution operation.
        """
        batch_size, height, width, channels = x.shape
        filter_height, filter_width, _, num_filters = filters.shape
        
        # Compute output dimensions
        output_height = (height - filter_height + 2 * padding) // stride + 1
        output_width = (width - filter_width + 2 * padding) // stride + 1
        output = np.zeros((batch_size, output_height, output_width, num_filters))
        
        for b in range(batch_size):
            for f in range(num_filters):
                for i in range(0, height - filter_height + 1, stride):
                    for j in range(0, width - filter_width + 1, stride):
                        region = x[b, i:i+filter_height, j:j+filter_width, :]
                        output[b, i // stride, j // stride, f] = np.sum(region * filters[:, :, :, f])
        return output

    def max_pool(self, x, pool_size=2, stride=2):
        """
        Perform max pooling operation.
        """
        batch_size, height, width, channels = x.shape
        
        # Compute output dimensions
        output_height = (height - pool_size) // stride + 1
        output_width = (width - pool_size) // stride + 1
        output = np.zeros((batch_size, output_height, output_width, channels))
        
        for b in range(batch_size):
            for c in range(channels):
                for i in range(0, height - pool_size + 1, stride):
                    for j in range(0, width - pool_size + 1, stride):
                        region = x[b, i:i+pool_size, j:j+pool_size, c]
                        output[b, i // stride, j // stride, c] = np.max(region)
        return output


    def forward(self, x):
        """
        Forward pass through the CNN.
        """
        # Convolutional layer 1
        self.conv_output1 = self.convolve(x, self.conv_filter1)
        self.conv_output1 = relu(self.conv_output1)

        # Max pooling layer 1
        self.pooled_output1 = self.max_pool(self.conv_output1)

        # Convolutional layer 2
        self.conv_output2 = self.convolve(self.pooled_output1, self.conv_filter2)
        self.conv_output2 = relu(self.conv_output2)

        # Max pooling layer 2
        self.pooled_output2 = self.max_pool(self.conv_output2)

        # Flatten
        self.flattened = flatten(self.pooled_output2)

        # Fully connected layer
        self.fc_output = relu(np.dot(self.flattened, self.fc_weights) + self.fc_bias)

        # Output layer
        logits = np.dot(self.fc_output, self.output_weights) + self.output_bias
        predictions = softmax(logits)

        return predictions



    def backward(self, x, y_true, y_pred, learning_rate=0.01):
        """
        Optimized backward pass with reduced memory usage.
        """
        # Gradients for output layer
        output_error = y_pred - y_true
        d_output_weights = np.dot(self.fc_output.T, output_error)
        d_output_bias = np.sum(output_error, axis=0, keepdims=True)

        # Gradients for fully connected layer
        fc_error = np.dot(output_error, self.output_weights.T) * relu_derivative(self.fc_output)
        d_fc_weights = np.dot(self.flattened.T, fc_error)
        d_fc_bias = np.sum(fc_error, axis=0, keepdims=True)

        # Gradients for convolutional layer
        
        d_conv_filter1 = np.zeros_like(self.conv_filter1)
        d_conv_filter2 = np.zeros_like(self.conv_filter2)

        batch_size = x.shape[0]
        filter_height, filter_width, _, num_filters1 = self.conv_filter1.shape
        filter_height2, filter_width2, _, num_filters2 = self.conv_filter2.shape

        fc_error_reshaped = np.dot(fc_error, self.fc_weights.T).reshape(self.pooled_output2.shape)
        print(f"conv_output1 shape: {self.conv_output1.shape}")
        print(f"pooled_output1 shape: {self.pooled_output1.shape}")
        print(f"conv_output2 shape: {self.conv_output2.shape}")
        print(f"pooled_output2 shape: {self.pooled_output2.shape}")


        # Compute gradients for the second convolutional layer
        for b in range(batch_size):
            for f in range(num_filters2):
                for i in range(self.pooled_output1.shape[1] - filter_height2 + 1):  # Ensures region doesn't exceed bounds
                    for j in range(self.pooled_output1.shape[2] - filter_width2 + 1):
                        region = self.pooled_output1[b, i:i+filter_height2, j:j+filter_width2, :]
                        d_conv_filter2[:, :, :, f] += region * fc_error_reshaped[b, i, j, f]


        # Compute gradients for the first convolutional layer
        fc_error_reshaped1 = np.dot(fc_error_reshaped, self.conv_filter2.reshape(-1, num_filters2).T).reshape(self.pooled_output1.shape)
        for b in range(batch_size):
            for f in range(num_filters1):
                for i in range(self.conv_output1.shape[1]):
                    for j in range(self.conv_output1.shape[1]):
                        region = x[b, i:i+filter_height, j:j+filter_width, :]
                        d_conv_filter1[:, :, :, f] += region * fc_error_reshaped1[b, i, j, f]

        # Update weights and biases
        self.output_weights -= learning_rate * d_output_weights
        self.output_bias -= learning_rate * d_output_bias
        self.fc_weights -= learning_rate * d_fc_weights
        self.fc_bias -= learning_rate * d_fc_bias
        self.conv_filter1 -= learning_rate * d_conv_filter1
        self.conv_filter2 -= learning_rate * d_conv_filter2

    def train(self, x_train, y_train, num_epochs=10, learning_rate=0.01):
        """
        Train the CNN using SGD.
        """
        for epoch in range(num_epochs):
            y_pred = self.forward(x_train)
            print('We forwarded yippee!')
            loss = cross_entropy_loss(y_train, y_pred)
            self.backward(x_train, y_train, y_pred, learning_rate)
            print(f"Epoch {epoch+1}/{num_epochs}, Loss: {loss:.4f}")

    def calculate_accuracy(self, x_test, y_test):
        """
        Calculate the accuracy of the CNN.
        """
        y_pred = self.forward(x_test)
        predicted_classes = np.argmax(y_pred, axis=1)
        true_classes = np.argmax(y_test, axis=1)
        accuracy = np.mean(predicted_classes == true_classes) * 100
        return accuracy

In [69]:
# Initialize and Train the CNN
input_shape = (190, 190, 3)
num_classes = len(label_encoder.classes_)
cnn = ConvolutionalNeuralNetwork(input_shape=input_shape, num_classes=num_classes)

In [None]:
X_train[190]

In [None]:
cnn.forward(X_train)

In [70]:
batch_size = 32
num_samples = len(X_train)
for start in range(0, num_samples, batch_size):
    end = start + batch_size
    x_batch = X_train[start:end]
    y_batch = y_train[start:end]
    cnn.train(x_batch, y_batch, num_epochs=2, learning_rate=0.001)

# Evaluate
accuracy = cnn.calculate_accuracy(X_test, y_test)
print(f"Test Accuracy: {accuracy:.2f}%")

We forwarded yippee!
conv_output1 shape: (32, 188, 188, 8)
pooled_output1 shape: (32, 94, 94, 8)
conv_output2 shape: (32, 92, 92, 16)
pooled_output2 shape: (32, 46, 46, 16)


IndexError: index 46 is out of bounds for axis 2 with size 46