In [None]:
from pynq.overlays.base import BaseOverlay

base = BaseOverlay("base.bit")

In [None]:
from time import time
import numpy as np

In [None]:
import gzip
import os
from urllib import request
from struct import unpack
from random import randint

In [None]:
def read_images(file_path):
    with open(file_path, 'rb') as f:
        magic, num_images, rows, cols = unpack('>IIII', f.read(16))
        images = np.fromfile(f, dtype=np.uint8).reshape(num_images, rows, cols, 1)
    return images

def read_labels(file_path):
    with open(file_path, 'rb') as f:
        magic, num_labels = unpack('>II', f.read(8))
        labels = np.fromfile(f, dtype=np.uint8)
    return labels

In [None]:
def load_mnist():
    data_dir = "MNIST_data"
    files = ['train-images.idx3-ubyte', 'train-labels.idx1-ubyte',
             't10k-images.idx3-ubyte', 't10k-labels.idx1-ubyte']
    datasets = []
    for file in files:
        path = os.path.join(file)
        datasets.append(path)
    X_train = read_images(datasets[0])
    y_train = read_labels(datasets[1])
    X_test = read_images(datasets[2])
    y_test = read_labels(datasets[3])
    return X_train, y_train, X_test, y_test

In [None]:
def conv2d(X, kernel, bias, stride=1, padding=0):
    if len(X.shape) == 3:
        input_height, input_width, input_channels = X.shape
    elif len(X.shape) == 2:
        # If the input is a single-channel image, add a singleton dimension for channels
        input_height, input_width = X.shape
        input_channels = 1
        X = X.reshape(input_height, input_width, 1)
    else:
        raise ValueError("Input data must be 2D or 3D array")

    kernel_height, kernel_width, _, num_filters = kernel.shape
    output_height = (input_height + 2 * padding - kernel_height) // stride + 1
    output_width = (input_width + 2 * padding - kernel_width) // stride + 1
    output = np.zeros((output_height, output_width, num_filters))

    X_padded = np.pad(X, ((padding, padding), (padding, padding), (0, 0)), mode='constant')

    for f in range(num_filters):
        for i in range(0, output_height):
            for j in range(0, output_width):
                output[i, j, f] = np.sum(X_padded[i * stride:i * stride + kernel_height, j * stride:j * stride + kernel_width, :] * kernel[:, :, :, f]) + bias[:, :, :, f]
    return output


In [None]:
def max_pooling(X, pool_size=(2, 2), stride=2):
    input_height, input_width, input_channels = X.shape  # Update to handle 3D array
    pool_height, pool_width = pool_size
    output_height = (input_height - pool_height) // stride + 1
    output_width = (input_width - pool_width) // stride + 1
    output = np.zeros((output_height, output_width, input_channels))  # Update output shape
    for i in range(output_height):
        for j in range(output_width):
            for k in range(input_channels):  # Loop over channels
                output[i, j, k] = np.max(X[i * stride:i * stride + pool_height, j * stride:j * stride + pool_width, k])
    return output


In [None]:
# Function to perform ReLU activation
def relu(X):
    return np.maximum(X, 0)

# Function to perform softmax activation
def softmax(X):
    exp_vals = np.exp(X - np.max(X))
    return exp_vals / np.sum(exp_vals)

def initialize_parameters():
    np.random.seed(1)
    parameters = {}
    parameters['W1'] = np.random.randn(3, 3, 1, 8) * 0.1
    parameters['b1'] = np.zeros((1, 1, 1, 8))
    parameters['W2'] = np.random.randn(3, 3, 8, 16) * 0.1
    parameters['b2'] = np.zeros((1, 1, 1, 16))

    # Compute the size of the flattened output from the second convolutional layer
    flattened_size = 7 * 7 * 16  # Assuming pooling with pool size (2, 2) and stride 2

    parameters['W3'] = np.random.randn(1, 400) * 0.1  # Update W3 dimensions
    parameters['b3'] = np.zeros((10, 1))
    return parameters



In [None]:
# Function to forward propagate through the network
def forward_propagation(X, parameters):
    W1, b1, W2, b2, W3, b3 = parameters['W1'], parameters['b1'], parameters['W2'], parameters['b2'], parameters['W3'], parameters['b3']
    Z1 = conv2d(X, W1, b1)
    A1 = relu(Z1)
    P1 = max_pooling(A1)
    Z2 = conv2d(P1, W2, b2)
    A2 = relu(Z2)
    P2 = max_pooling(A2)
    P2_flatten = P2.reshape(P2.shape[0] * P2.shape[1] * P2.shape[2], 1)
    Z3 = np.dot(W3, P2_flatten) + b3
    return softmax(Z3)


In [None]:
def backward_propagation(Y_pred, Y, caches, parameters):
    m = Y.shape[1]
    gradients = {}
    dZ3 = Y_pred - Y
    dW3 = 1 / m * np.dot(dZ3, caches['P2'].T)
    db3 = 1 / m * np.sum(dZ3, axis=1, keepdims=True)
    dZ2 = np.dot(parameters['W3'].T, dZ3) * relu_backward(caches['Z2'])
    dW2 = 1 / m * np.dot(dZ2, caches['P1'].T)
    db2 = 1 / m * np.sum(dZ2, axis=1, keepdims=True)
    dZ1 = np.dot(parameters['W2'].T, dZ2) * relu_backward(caches['Z1'])
    dW1 = 1 / m * np.dot(dZ1, caches['X'].T)
    db1 = 1 / m * np.sum(dZ1, axis=1, keepdims=True)

    gradients['dW3'] = dW3
    gradients['db3'] = db3
    gradients['dW2'] = dW2
    gradients['db2'] = db2
    gradients['dW1'] = dW1
    gradients['db1'] = db1

    return gradients

In [None]:
def update_parameters(parameters, gradients, learning_rate):
    parameters['W3'] -= learning_rate * gradients['dW3']
    parameters['b3'] -= learning_rate * gradients['db3']
    parameters['W2'] -= learning_rate * gradients['dW2']
    parameters['b2'] -= learning_rate * gradients['db2']
    parameters['W1'] -= learning_rate * gradients['dW1']
    parameters['b1'] -= learning_rate * gradients['db1']

    return parameters


In [None]:
# Function to compute cross-entropy loss
def compute_loss(Y_pred, Y):
    m = Y.shape[0]
    logprobs = np.multiply(np.log(Y_pred), Y)
    loss = -np.sum(logprobs) / m
    return loss

In [None]:
def train_model(X_train, y_train, learning_rate=0.01, num_epochs=1):
    parameters = initialize_parameters()
    m = len(X_train)
    epoch_ = 0
    for epoch in range(num_epochs):
        epoch_loss = 0
        for i in range(m):
            index = randint(0, m - 1)
            X_batch, y_batch = X_train[index], y_train[index]
            X_batch = X_batch.squeeze()  # Remove the singleton dimension
            Y_pred = forward_propagation(X_batch, parameters)
            Y = np.zeros((10, 1))
            Y[y_batch] = 1
            loss = compute_loss(Y_pred, Y)
            epoch_loss += loss
            if(i%50==0):
                print("Epoch: ", epoch_)
                epoch_+=1
            # Backpropagation and parameter updates are not implemented for simplicity

    return parameters


In [None]:
# Function to predict labels for test data
def predict(X_test, parameters):
    m = len(X_test)
    predictions = np.zeros(m)
    for i in range(m):
        Y_pred = forward_propagation(X_test[i], parameters)
        predictions[i] = np.argmax(Y_pred)
    return predictions

In [None]:
# Load the MNIST dataset
X_train, y_train, X_test, y_test = load_mnist()
X_train, y_train, X_test, y_test = X_train[:1000], y_train[:1000], X_test[:100], y_test[:100]


print("X_train: ", X_train.shape[0])
print("X_test: ", X_test.shape[0])
print("y_train: ", y_train.shape[0])
print("y_test: ", y_test.shape[0])


X_train:  1000
X_test:  100
y_train:  1000
y_test:  100


In [None]:
# Train the model
start = time()

trained_parameters = train_model(X_train, y_train)

stop = time()
execution_time = stop-start
total = X_train.shape

  output[i, j, f] = np.sum(X_padded[i * stride:i * stride + kernel_height, j * stride:j * stride + kernel_width, :] * kernel[:, :, :, f]) + bias[:, :, :, f]


Epoch:  0
Epoch:  1
Epoch:  2
Epoch:  3
Epoch:  4
Epoch:  5
Epoch:  6
Epoch:  7
Epoch:  8
Epoch:  9
Epoch:  10
Epoch:  11
Epoch:  12
Epoch:  13
Epoch:  14
Epoch:  15
Epoch:  16
Epoch:  17
Epoch:  18
Epoch:  19


In [None]:
total = X_train.shape[0]
print("  Execution time: ", execution_time)
print("      Throughput: ", (total/execution_time))

  Execution time:  97.79597163200378
      Throughput:  10.225370056784113


In [None]:
start = time()
predictions = predict(X_test, trained_parameters)
stop = time()
execution_time = stop-start

  output[i, j, f] = np.sum(X_padded[i * stride:i * stride + kernel_height, j * stride:j * stride + kernel_width, :] * kernel[:, :, :, f]) + bias[:, :, :, f]


In [None]:
total = X_test.shape[0]
print("  Execution time: {:.4f}s".format(execution_time))
print("      Throughput: {:.4f}FPS".format(total/execution_time))

  Execution time: 10.7606s
      Throughput: 9.2932FPS


In [None]:
# Calculate accuracy
accuracy = np.mean(predictions == y_test)
print("Accuracy:", accuracy)

Accuracy: 0.08
