In [107]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Flatten, Dense, Conv2D, MaxPooling2D, Dropout
import matplotlib.pyplot as plt
import numpy as np
from math import log, exp
from numpy.random import randn

## CNN Class


In [108]:
class CNNClass():
    def __init__(self, shape=(128, 128, 3), epochs=40, batch_size=32, learning_rate=0.001):
        self.model = Sequential([
            Conv2D(32, (3, 3), activation='relu', input_shape=shape),
            MaxPooling2D((2, 2)),
            Conv2D(64, (3, 3), activation='relu'),
            MaxPooling2D((2, 2)),
            Conv2D(64, (3, 3), activation='relu'),
            Flatten(),
            Dense(128, activation='relu'),
            Dense(1, activation='sigmoid')
        ])
        self.model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
        self.epochs = epochs
        self.batch_size = batch_size
        self.learning_rate = learning_rate

    def train_classifier(self, train_inputs, train_outputs,epochs=40, batch_size=32, learning_rate=0.001):
        self.epochs = epochs
        self.batch_size = batch_size
        self.learning_rate = learning_rate
        
        return self.model.fit(train_inputs, train_outputs, epochs=self.epochs, batch_size=self.batch_size)

    def get_predicted_data(self, test_inputs):
        return np.round(self.model.predict(test_inputs))

In [109]:
class MyCNN:
    def __init__(self) -> None:
        self.layers = []

    def addConvolutionLayer(self, numberOfFilters, filterSize):
        height, width = filterSize
        filters = [[randn() for i in range(0, width)] for j in range(0, height)]
        self.layers.append(
            {"type": "convolution", "numberOfFilters": numberOfFilters, "filter": filters, "filterSize": filterSize})

    def addMaxPoolingLayer(self, filterSize):
        self.layers.append({"type": "maxPooling", "filterSize": filterSize})

    def addFullyConnectedLayer(self, inputSize, outputSize, activationFunction=lambda x: x):
        weights = [[randn() for j in range(0, inputSize)] for i in range(0, outputSize)]
        self.layers.append({"type": "fullyConnected", "weights": weights, "activationFunction": activationFunction})

    def _applyLayer(self, input, layer):
        type = layer["type"]
        if type == "convolution":
            return self.applyConvolution(layer, input)
        elif type == "maxPooling":
            return self.applyMaxPooling(layer, input)
        else:
            return self.applyFullyConnected(layer, input)

    def applyConvolution(self, layer, input):
        height, width, depth = len(input), len(input[0]), len(input[0][0])
        filter = layer["filter"]
        filterSize = layer["filterSize"]
        numberOfFilters = layer["numberOfFilters"]

        lineStart = filterSize[0] // 2
        lineFinish = height - filterSize[0] // 2

        columnStart = filterSize[1] // 2
        columnFinish = width - filterSize[1] // 2

        result = []
        for i in range(lineStart, lineFinish):
            result.append([])
            for j in range(columnStart, columnFinish):
                result[-1].append([])
                for k in range(0, numberOfFilters):
                    for d in range(0, depth):
                        suma = 0
                        for h in range(0, filterSize[0]):
                            for w in range(0, filterSize[1]):
                                suma = suma + filter[h][w] * input[i + h - lineStart][j + w - columnStart][d]
                        result[-1][-1].append(suma)
        return result

    def applyMaxPooling(self, layer, input):
        filterSize = layer["filterSize"]
        height, width = filterSize[0], filterSize[1]
        inputHeight, inputWidth, inputDepth = len(input), len(input[0]), len(input[0][0])

        result = []
        for i in range(0, inputHeight - height, height):
            result.append([])
            for j in range(0, inputWidth - width, width):
                result[-1].append([])
                for d in range(0, inputDepth):
                    filterValues = []
                    for row in range(i, i + height):
                        for col in range(j, j + width):
                            filterValues.append(input[row][col][d])
                    maxValue = max(filterValues)
                    result[-1][-1].append(maxValue)
        return result

    def inputLinearization(self, input):
        if all(isinstance(x, (int, float)) for x in input):
            return input
        result = []
        inputHeight, inputWidth, inputDepth = len(input), len(input[0]), len(input[0][0])
        for h in range(0, inputHeight):
            for w in range(0, inputWidth):
                for d in range(0, inputDepth):
                    result.append(input[h][w][d])
        return result

    def applyFullyConnected(self, layer, input):
        linearInput = self.inputLinearization(input)
        weights = layer["weights"]
        activationFunction = layer["activationFunction"]

        expectedInputSize = len(weights[0])
        if len(linearInput) != expectedInputSize:
            linearInput = linearInput[:expectedInputSize]

        result = []
        for w in range(0, len(weights)):
            value = sum(weights[w][i] * linearInput[i] for i in range(0, len(linearInput)))
            computedValue = activationFunction(value)
            result.append(computedValue)
        return result


    def applyAllLayers(self, input):
        result = input
        for layer in self.layers:
            result = self._applyLayer(result, layer)
        return result

    def balanceWeightsConvolution(self, errors, layer):
        learning_rate = 0.0000001
        for error in errors:
            filter = layer["filter"]
            filterSize = layer["filterSize"]
            for h in range(0, filterSize[0]):
                for w in range(0, filterSize[1]):
                    layer["filter"][h][w] = layer["filter"][h][w] - error * learning_rate

    def balanceWeigthsFullyConnected(self, errors, layer, inputLine):
        learning_rate = 0.0000001
        for e in range(0, len(errors)):
            result = inputLine
            for layer1 in self.layers[:-1]:
                result = self._applyLayer(result, layer1)
            linearResult = self.inputLinearization(result)

            for i in range(0, len(linearResult)):
                layer["weights"][e][i] = layer["weights"][e][i] - errors[e] * learning_rate * linearResult[i]

    def backpropagation(self, errors, inputLine):
        for i in range(len(self.layers) - 1, -1, -1):
            layer = self.layers[i]
            type = layer["type"]
            if type == "convolution":
                self.balanceWeightsConvolution(errors, layer)
            elif type == "fullyConnected":
                self.balanceWeigthsFullyConnected(errors, layer, inputLine)

    def train(self, input, output, numberOfEpochs=10):
        for e in range(0, numberOfEpochs):
            loss = 0
            for i in range(0, len(input)):
                inputLine = input[i]
                outputLine = output[i]
                result = self.applyAllLayers(inputLine)

                if outputLine == "YES":
                    errors = [1 - result[0], 0 - result[1]]
                    determined = 1
                else:
                    errors = [0 - result[0], 1 - result[1]]
                    determined = 0

                loss = loss + determined * log(result[0]) + (1 - determined) * log(1 - result[0])
                self.backpropagation(errors, inputLine)

            loss = -1 * loss / len(input)
            print("Epoch ", e, " loss = ", loss, "\n")

    def predict(self, input):
        output = []
        for inputItem in input:
            probabilities = self.applyAllLayers(inputItem)
            if probabilities[0] > probabilities[1]:
                output.append("YES")
            else:
                output.append("NO")
        return output

In [2]:
def reshape_data(train_inputs, train_outputs, test_inputs, test_outputs):
    
    train_inputs=np.array(train_inputs)
    test_inputs=np.array(test_inputs)
    test_outputs=np.array(test_outputs)
    train_outputs=np.array(train_outputs)
    train_inputs = np.array(train_inputs).reshape(-1, 128, 128, 3)  
    test_inputs = np.array(test_inputs).reshape(-1, 128, 128, 3)
    train_outputs = np.array(train_outputs).flatten()
    test_outputs = np.array(test_outputs).flatten()
    print(train_inputs.shape,train_outputs.shape,test_inputs.shape,test_outputs.shape)
    return train_inputs, train_outputs, test_inputs, test_outputs


In [111]:
def tang_func(x):
    from numpy import tanh
    return tanh(x)
def with_MyCNN(train_inputs, train_outputs, test_inputs, test_outputs):
    train_inputs, train_outputs, test_inputs, test_outputs = reshape_data(train_inputs, train_outputs, test_inputs, test_outputs)
    print(train_inputs.shape, train_outputs.shape, test_inputs.shape, test_outputs.shape)
    neuralNetwork = MyCNN()
    neuralNetwork.addConvolutionLayer(3, (3, 3))
    neuralNetwork.addMaxPoolingLayer((4, 4))
    neuralNetwork.addFullyConnectedLayer(15 * 15 * 9, 10, tang_func)
    neuralNetwork.addFullyConnectedLayer(10, 2, lambda x: 1 / (1 + exp(-x).real))
    neuralNetwork.train(train_inputs, train_outputs)
    predicted_outputs = neuralNetwork.predict(test_inputs)
    predicted_outputs=[1 if x=='YES' else 0 for x in predicted_outputs]
    return predicted_outputs


## Getting computed outputs

In [43]:
def with_CNN(train_inputs,train_outputs,test_inputs,test_outputs,output_names=['no_sepia','sepia']):
    train_inputs = np.array(train_inputs)
    train_outputs = np.array(train_outputs)
    test_inputs = np.array(test_inputs)
    test_outputs = np.array(test_outputs)
    cnnTool = CNNClass()
    cnnTool.train_classifier(train_inputs, train_outputs)
    
    computed_outputs = cnnTool.get_predicted_data(test_inputs)
    return computed_outputs
    

## Running experiments

In [44]:
def run_cnn_experiments(train_inputs, train_outputs, test_inputs, test_outputs):
    hyperparameters = [
        {'epochs': 20, 'batch_size': 16, 'learning_rate': 0.001},
        {'epochs': 30, 'batch_size': 32, 'learning_rate': 0.001},
        {'epochs': 40, 'batch_size': 64, 'learning_rate': 0.0005},
        {'epochs': 50, 'batch_size': 128, 'learning_rate': 0.0005}
    ]
    train_inputs = np.array(train_inputs)
    train_outputs = np.array(train_outputs)
    test_inputs = np.array(test_inputs)
    test_outputs = np.array(test_outputs)
    results = []
    for config in hyperparameters:
        cnn = CNNClass()
        print(f"Training with config: {config}")
        cnn.train_classifier(train_inputs, train_outputs, epochs=config['epochs'], batch_size=config['batch_size'], learning_rate=config['learning_rate'])
        
        predicted_outputs = cnn.get_predicted_data(test_inputs)
        accuracy = np.mean(predicted_outputs.flatten() == test_outputs.flatten())
        print(f"Test Accuracy: {accuracy}")
        results.append((config, accuracy))

    return results


In [55]:
class SimpleCNN:
    def __init__(self, input_dim, num_filters, filter_size, pool_size, output_dim):
        self.z_pool_flat = None
        self.num_filters = num_filters
        self.filter_size = filter_size
        self.pool_size = pool_size
        self.output_dim = output_dim

        
        conv_output_size = (input_dim[0] - filter_size + 1) // pool_size  # Assuming square filters and square pooling
        self.fc_input_dim = conv_output_size * conv_output_size * num_filters
        self.filters = np.random.randn(num_filters, filter_size, filter_size, input_dim[2]) * 0.1
        self.fc_weights = np.random.randn(self.fc_input_dim, output_dim) * 0.1
        self.fc_bias = np.zeros((1, output_dim))

    def convolve(self, image, filt):
        filter_height, filter_width, num_channels = filt.shape
        image_height, image_width, _ = image.shape
        out_height = image_height - filter_height + 1
        out_width = image_width - filter_width + 1
        output = np.zeros((out_height, out_width))
        for i in range(out_height):
            for j in range(out_width):
                output[i, j] = np.sum(image[i:i+filter_height, j:j+filter_width] * filt)
        return output

    def relu(self, x):
        return np.maximum(0, x)

    def max_pool(self, image, size):
        pool_height, pool_width = size, size
        downsampled_height = image.shape[0] // pool_height
        downsampled_width = image.shape[1] // pool_width
        output = np.zeros((downsampled_height, downsampled_width))
        for i in range(downsampled_height):
            for j in range(downsampled_width):
                h_start = i * pool_height
                w_start = j * pool_width
                h_end = h_start + pool_height
                w_end = w_start + pool_width
                output[i, j] = np.max(image[h_start:h_end, w_start:w_end])
        return output

    def forward(self, x):
        batch_size = x.shape[0]
        z_conv = np.array([[self.convolve(x[n], self.filters[f]) for f in range(self.num_filters)] for n in range(batch_size)])
        a_conv = self.relu(z_conv)
        z_pool = np.array([[self.max_pool(a_conv[n][f], self.pool_size) for f in range(self.num_filters)] for n in range(batch_size)])
        self.z_pool_flat = z_pool.reshape(batch_size, -1)
        z_fc = np.dot(self.z_pool_flat, self.fc_weights) + self.fc_bias
        a_fc = self.sigmoid(z_fc)
        return a_fc


    def sigmoid(self, x):
        x_clipped = np.clip(x, -20, 20)
        return 1 / (1 + np.exp(-x_clipped))

    def compute_loss(self, y_true, y_pred):
        epsilon = 1e-10  
        y_pred = np.clip(y_pred, epsilon, 1 - epsilon)  
        return -np.mean(y_true * np.log(y_pred) + (1 - y_true) * np.log(1 - y_pred))

    def train(self, x, y, epochs, learning_rate):
        for epoch in range(epochs):
            # Forward pass
            y_pred = self.forward(x)
            loss = self.compute_loss(y, y_pred)
    
            # Ensure y is reshaped to match y_pred if necessary
            if y.ndim == 1 or y.shape[1] != y_pred.shape[1]:
                y = y.reshape(-1, 1)
    
            # Gradient of loss w.r.t. final layer output
            dL_dz = y_pred - y
    
            # Gradients of loss w.r.t weights and biases (fully connected layer)
            dL_dw = np.dot(self.z_pool_flat.T, dL_dz)  # Check the shapes here
            dL_db = np.sum(dL_dz, axis=0, keepdims=True)
    
            # Update parameters
            self.fc_weights -= learning_rate * dL_dw
            self.fc_bias -= learning_rate * dL_db
    
            print(f'Epoch {epoch+1}, Loss: {loss}')
            print("Gradient shapes:", dL_dw.shape, self.fc_weights.shape, dL_db.shape, self.fc_bias.shape)




    def predict(self, x):
        y_pred = self.forward(np.array(x))
        predictions = (y_pred > 0.5).astype(np.int32)
        return predictions

In [53]:
def tang_func(x):
    from numpy import tanh
    return tanh(x)
def with_MyCNNd(train_inputs, train_outputs, test_inputs, test_outputs):
    train_inputs, train_outputs, test_inputs, test_outputs = reshape_data(train_inputs, train_outputs, test_inputs, test_outputs)
    input_dim = (128, 128, 3)  # Height, Width, Channels
    num_filters = 8
    filter_size = 3
    pool_size = 2
    output_dim = 1

    model = SimpleCNN(input_dim, num_filters, filter_size, pool_size, output_dim)
    model.train(train_inputs,train_outputs,10,0.1)
    return model


In [2]:
def predict(x,model):
    x=np.array(x)
    x = np.array(x).reshape(-1, 128, 128, 3)
    pred=model.predict(x)
    return pred