## Neural Network Hardware Accelerator

An overlay is used to write data to and read data from a neural network accelerator implemented on the FPGA.
The implemented model is a multilayer perceptron (MLP) with 2 hidden fully connected layers that use the ReLU activation function.
An AXI4-Lite MMIO interface handles the reading and writing of data between the CPU and the FPGA.

In [1]:
from pynq import Overlay
import struct
import time
import pynq
import pandas as pd


In [2]:
def init_power_recorder(rail_name='PSINT_FP'):
    """Create a DataRecorder that monitors the power of one PMBus rail.

    Args:
        rail_name: Key of the rail to monitor in the mapping returned by
            ``pynq.get_rails()``. Defaults to ``'PSINT_FP'``, the rail this
            notebook records.

    Returns:
        A ``pynq.DataRecorder`` attached to ``rails[rail_name].power``.

    Raises:
        KeyError: If no rail named ``rail_name`` is available.
    """
    rails = pynq.get_rails()
    if rail_name not in rails:
        # Fail with the list of valid names instead of a bare KeyError
        raise KeyError(
            f"Power rail {rail_name!r} not found; available rails: {sorted(rails)}"
        )
    return pynq.DataRecorder(rails[rail_name].power)

In [3]:
# Axilite MMIO interface does not support read/write of floats
def float_to_integer(f):
    """Return the 32-bit unsigned integer holding the IEEE-754 bits of ``f``.

    The float is packed as a little-endian single-precision value and the
    same four bytes are reinterpreted as a little-endian unsigned integer,
    so the value can be sent through an integer-only register interface.
    """
    raw_bytes = struct.pack('<f', f)
    (bits,) = struct.unpack('<I', raw_bytes)
    return bits
    
def integer_to_float(i):
    """Return the float whose IEEE-754 single-precision bits equal ``i``.

    Inverse of ``float_to_integer``. The explicit little-endian, standard-size
    formats ('<I' / '<f') mirror the ones used by ``float_to_integer`` so both
    directions always operate on exactly four bytes, independent of the
    platform's native integer size.

    Args:
        i: Unsigned integer in the range 0 .. 0xFFFFFFFF.

    Returns:
        The decoded Python float.

    Raises:
        struct.error: If ``i`` does not fit in an unsigned 32-bit integer.
    """
    return struct.unpack('<f', struct.pack('<I', i))[0]

In [12]:
# Initialize pynq Overlay to program FPGA using bitstream
overlay = Overlay('MLP6.bit')
# Handle to the accelerator IP inside the overlay; predict() performs all
# register reads and writes through this object's read()/write() methods
accelerate_mlp = overlay.accelerate_MLP_0

In [5]:
# Read in test inputs as a list of rows (one list of feature values per sample)
x_test = pd.read_csv('X_test.csv', header=None).values.tolist()
test_length = len(x_test)
input_length = len(x_test[0])

# Get y labels from the CSV file as a flat list.
# DataFrame.squeeze('columns') turns the single 'Activity' column into a Series;
# it replaces the read_csv(squeeze=True) keyword, which was deprecated in
# pandas 1.4 and removed in pandas 2.0.
y_test = pd.read_csv('y_test.csv', names=['Activity']).squeeze('columns').tolist()

In [6]:
# Show the number of features in the first sample; predict() sends exactly
# 480 words to the accelerator, so this is expected to be 480
len(x_test[0])

480

In [7]:
def predict(inputs, timeout=5.0):
    """Classify one sample on the FPGA MLP accelerator.

    The sample is written word by word into the IP's 'inputs' memory, the IP
    is started through its control register, and once ap_done is observed the
    output words are read back and the index of the largest one is returned.

    Args:
        inputs: Indexable sequence of float feature values. Only the first
            480 elements are sent to the IP.
        timeout: Maximum number of seconds to wait for the IP to assert
            ap_done. Pass None to wait indefinitely.

    Returns:
        Index of the output word with the highest value, i.e. the predicted
        label.

    Raises:
        TimeoutError: If ap_done is not observed within ``timeout`` seconds.
        IndexError: If ``inputs`` is a list with fewer than 480 elements.
    """

    # Mapping for MMIO obtained from Vivado HLS

    # AXILiteS
    # 0x0000 : Control signals
    #          bit 0  - ap_start (Read/Write/COH)
    #          bit 1  - ap_done (Read/COR)
    #          bit 2  - ap_idle (Read)
    #          bit 3  - ap_ready (Read)
    #          bit 7  - auto_restart (Read/Write)
    #          others - reserved
    # 0x0800 ~
    # 0x0fff : Memory 'inputs' (480 * 32b)
    #          Word n : bit [31:0] - inputs[n]
    # 0x1000 ~
    # 0x100f : Memory 'outputs' (3 * 32b)
    #          Word n : bit [31:0] - outputs[n]

    INPUT_OFFSET = 0x0800
    OUTPUT_OFFSET = 0x1000
    CONTROL_OFFSET = 0x0000
    # Bit masks within the control register (bit 0 and bit 1)
    AP_START = 1
    AP_DONE = 2
    # Each input is 32 bits = 4 bytes
    NUM_BYTES = 4
    INPUT_LENGTH = 480
    OUTPUT_LENGTH = 3

    # Write input values to IP; floats are sent as their raw 32-bit patterns
    # because the MMIO interface only transfers integers
    for i in range(INPUT_LENGTH):
        int_input = float_to_integer(inputs[i])
        accelerate_mlp.write(INPUT_OFFSET + i * NUM_BYTES, int_input)

    # Start executing IP
    accelerate_mlp.write(CONTROL_OFFSET, AP_START)

    # Wait for AP_DONE bit to be asserted when outputs are ready.
    # The wait is bounded so that an IP that never finishes raises an error
    # instead of hanging the kernel in an endless polling loop.
    deadline = None if timeout is None else time.monotonic() + timeout
    while (accelerate_mlp.read(CONTROL_OFFSET) & AP_DONE) == 0:
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f"accelerator did not assert ap_done within {timeout} seconds"
            )

    # Read output values from IP and convert the raw words back to floats
    result = [
        integer_to_float(accelerate_mlp.read(OUTPUT_OFFSET + i * NUM_BYTES))
        for i in range(OUTPUT_LENGTH)
    ]

    # Return label with highest value
    return result.index(max(result))

In [8]:
# Single prediction test
# perf_counter() is a monotonic, high-resolution clock meant for measuring
# short intervals; time.time() follows the wall clock and can jump
start_time = time.perf_counter()
x = predict(x_test[0])
latency = time.perf_counter() - start_time
print(f"latency: {latency}")

latency: 0.01224374771118164


In [9]:
# Sample the power every 0.1 seconds while the whole test set runs on the FPGA
recorder = init_power_recorder()
with recorder.record(0.1):
    # perf_counter() is a monotonic, high-resolution clock, so the elapsed
    # time cannot be distorted by wall-clock adjustments as with time.time()
    start_time = time.perf_counter()
    correct_predictions = 0

    # Test FPGA model: count the predictions that match their label
    for i in range(test_length):
        res = predict(x_test[i])
        if res == y_test[i]:
            correct_predictions += 1

    latency = time.perf_counter() - start_time
    print(f"latency: {latency}\n")

    print(f"accuracy: {correct_predictions/test_length * 100}%")

print(f"Average latency: {latency / test_length}\n")

latency: 6.456728458404541

accuracy: 87.17472118959108%
Average latency: 0.012001354011904351



In [49]:
# Number of leading test samples to evaluate in this quick run; defined once
# so the loop bound and both averages always agree
NUM_SAMPLES = 10

# Sample the power every 0.1 seconds
recorder = init_power_recorder()
with recorder.record(0.1):
    # Model execution
    # perf_counter() is a monotonic, high-resolution clock, so the elapsed
    # time cannot be distorted by wall-clock adjustments as with time.time()
    start_time = time.perf_counter()
    correct_predictions = 0

    # Test FPGA model: count the predictions that match their label
    for i in range(NUM_SAMPLES):
        res = predict(x_test[i])
        if res == y_test[i]:
            correct_predictions += 1

    latency = time.perf_counter() - start_time
    print(f"latency: {latency}\n")

    print(f"accuracy: {correct_predictions/NUM_SAMPLES * 100}%")

print(f"Average latency: {latency / NUM_SAMPLES}\n")

latency: 0.12263107299804688

accuracy: 80.0%
Average latency: 0.012263107299804687



In [37]:
# View the sampled power data collected by the most recent recorder
# (displayed as a table with one row per sample)
recorder.frame

Unnamed: 0,Mark,PSINT_FP_power
2021-10-05 07:41:10.436277,0.0,0.75
