In [1]:
import time
import numpy as np
import matplotlib.pyplot as plt
from pynq import Overlay, allocate

# Load Bitstream

ol = Overlay("design_8_wrapper.bit")
cnn = ol.CNN_MNIST_TOP_0

# Loading Weights and Biases

W1 = np.load("conv1_weights.npy")  # (3,3,1,8)
B1 = np.load("conv1_bias.npy")

W2 = np.load("conv2_weights.npy")  # (3,3,8,16)
B2 = np.load("conv2_bias.npy")

W3 = np.load("dense_weights.npy")  # (400,10)
B3 = np.load("dense_bias.npy")

# Allocating buffers

input_buf   = allocate(shape=(784,),  dtype=np.int16)
w1_buf      = allocate(shape=(72,),   dtype=np.int16)
b1_buf      = allocate(shape=(8,),    dtype=np.int16)
w2_buf      = allocate(shape=(1152,), dtype=np.int16)
b2_buf      = allocate(shape=(16,),   dtype=np.int16)
w3_buf      = allocate(shape=(4000,), dtype=np.int16)
b3_buf      = allocate(shape=(10,),   dtype=np.int16)
output_buf  = allocate(shape=(10,),   dtype=np.int16)

# Reorder for HLS layout

# Conv1: (K,K,IN,OUT) to (OUT,K,K)
w1 = np.transpose(W1, (3,0,1,2))
w1 = w1.reshape(8,3,3)

# Conv2: (K,K,IN,OUT) to (OUT,IN,K,K)
w2 = np.transpose(W2, (3,2,0,1))

# Dense: (IN,OUT) â†’ (OUT,IN)
w3 = W3.T

# Converting to float to fixed decimal points
FRAC_BITS = 8

def to_fixed(x):
    return np.round(x * (2**FRAC_BITS)).astype(np.int16)

w1_fixed = to_fixed(w1)
w2_fixed = to_fixed(w2)
w3_fixed = to_fixed(w3)
b1_fixed = to_fixed(B1)
b2_fixed = to_fixed(B2)
b3_fixed = to_fixed(B3)

# Loading weights and biases into DDR (They are constantly in DDR)
w1_buf[:] = w1_fixed.flatten()
b1_buf[:] = b1_fixed
w2_buf[:] = w2_fixed.flatten()
b2_buf[:] = b2_fixed
w3_buf[:] = w3_fixed.flatten()
b3_buf[:] = b3_fixed
output_buf[:] = 0

# Flush
w1_buf.flush()
b1_buf.flush()
w2_buf.flush()
b2_buf.flush()
w3_buf.flush()
b3_buf.flush()
output_buf.flush()

# Writing Physical Addresses of all buffers to FPGA
def write_addr(name, buf):
    addr = buf.physical_address
    setattr(cnn.register_map, name + "_1", addr & 0xFFFFFFFF)
    setattr(cnn.register_map, name + "_2", (addr >> 32) & 0xFFFFFFFF)

write_addr("input_r", input_buf)
write_addr("weight1", w1_buf)
write_addr("bias1", b1_buf)
write_addr("weight2", w2_buf)
write_addr("bias2", b2_buf)
write_addr("weight3", w3_buf)
write_addr("bias3", b3_buf)
write_addr("output_r", output_buf)


In [2]:

# Loading test images and labels (all 10000 images of MNIST)
x_test = np.load("mnist_test_images.npy")
y_test = np.load("mnist_test_labels.npy")

# Accuracy and Time testing Loop
correct = 0
N = len(x_test)
total_time = 0.0
for i in range(N):
    img = x_test[i].astype(np.float32)/255.0
    img = to_fixed(img)
    img_flat = img.flatten()
    input_buf[:] = img_flat
    
#     # Warm up
#     for j in range(5):
#         cnn.register_map.CTRL.AP_START = 1
#         while (cnn.register_map.CTRL.AP_DONE & 1) == 0:
#             pass
    
    # Accelerator
    t0 = time.perf_counter()

    cnn.register_map.CTRL.AP_START = 1
    while (cnn.register_map.CTRL.AP_DONE & 1) == 0:
        pass

    t1 = time.perf_counter()

    pred = np.argmax(output_buf)
    if pred == y_test[i]:
        correct += 1
    
    total_time += (t1-t0)*1e6

print("Accuracy:", correct/N)
print("Average time taken per image: ", total_time/N, "us")

Accuracy: 0.98
Average time taken per image:  383.2800971220418 us


In [3]:
W1 = np.transpose(W1, (3,2,0,1))
W2 = np.transpose(W2, (3,2,0,1))
W3 = W3.T

# print(W1.shape)
# print(W2.shape)
# print(W3.shape)

In [4]:
def relu(x):
    return np.maximum(0, x)

def conv2d(x, W, b):
    # x shape: (C_in, H, W)
    C_out, C_in, K, _ = W.shape
    C_x, H, W_in = x.shape

    H_out = H - K + 1
    W_out = W_in - K + 1

    out = np.zeros((C_out, H_out, W_out), dtype=np.float32)

    for oc in range(C_out):
        for ic in range(C_in):
            for i in range(H_out):
                for j in range(W_out):
                    out[oc, i, j] += np.sum(
                        x[ic, i:i+K, j:j+K] * W[oc, ic]
                    )
        out[oc] += b[oc]

    return relu(out)

def maxpool2x2(x):
    C, H, W = x.shape

    H_out = H // 2
    W_out = W // 2

    out = np.zeros((C, H_out, W_out), dtype=np.float32)

    for c in range(C):
        for i in range(H_out):
            for j in range(W_out):
                out[c, i, j] = np.max(
                    x[c, i*2:i*2+2, j*2:j*2+2]
                )

    return out

def dense(x, W, b):
    x_flat = x.reshape(-1)  # 400
    return np.dot(W, x_flat) + b

def forward(image):

    # Add channel dimension
    x = image[np.newaxis, :, :]  # (1,28,28)

    x = conv2d(x, W1, B1)   # (8,26,26)
    x = maxpool2x2(x)       # (8,13,13)
    x = conv2d(x, W2, B2)   # (16,11,11)
    x = maxpool2x2(x)       # (16,5,5)
    x = dense(x, W3, B3)    # (10,)

    return x


num_images = 10
latencies = []
correct = 0

for i in range(num_images):

    start = time.perf_counter()

    out = forward(x_test[i])

    end = time.perf_counter()

    latencies.append(end - start)

    pred = np.argmax(out)

    if pred == y_test[i]:
        correct += 1

avg_time = np.mean(latencies)

print("Average time per image (ms):", avg_time * 1000)

Average time per image (ms): 3457.5612898992404
