In [1]:
import cv2
import numpy as np
import os
import time
from IPython.display import clear_output
import matplotlib.pyplot as plt
from random import randrange
import pynq
from pynq import Overlay
from pynq import allocate
import struct

In [24]:
class nn:
    weights = list()
    #dnet/dw -> dnetL1/dw5 x.outputs[0][0]
    outputs = list()
    #dout/dnet -> doutL1/dnetL1 outputs_transfer_derivative[1][0]
    outputs_transfer_derivative = list()
    inputs = list()
    def __init__(self, weights):
        self.weights = np.array(weights, dtype=np.float32)
        layers_count, neurons_count, weights_count = self.weights.shape
        self.outputs = np.zeros((layers_count, neurons_count), dtype=np.float32)
        self.outputs_transfer_derivative = self.outputs.copy()
        
    def transfer_derivative(self, output):
        if output > 0:
            return 1
        else:
            return 0
        #return output*(1-output)
    
    def activation_function(self, output):
        if output > 0:
            return output
        else:
            return 0.000001
        #return 1/(1+np.exp(-output))
    
    def calculate_error(self, target):
        target = np.array(target, dtype=np.float32)
        target_size = target.shape[0]
        buf = 0
        for i in range(target_size):
            buf = buf + ((self.outputs[self.outputs.shape[0]-1][i] - target[i])**2)/2
        return buf
    
    def output_error(self, target):
        target = np.array(target, dtype=np.float32)
        target_size = target.shape[0]
        buf = np.zeros(target.shape)
        for i in range(target_size):
            buf[i] = (self.outputs[self.outputs.shape[0]-1][i] - target[i])
        return buf
    
    def calculate_delta_outputs(self, target, neuron_nb):
        buf = self.output_error(target)
        s = 0 
        for w in range(self.weights.shape[0]):
            wyn = buf[w]*self.outputs_transfer_derivative[self.weights.shape[0]-1][w]*self.weights[self.weights.shape[0]-1][w][neuron_nb]
            s = s + wyn
            buf[w] = wyn
        
        return buf 
    
    #{l}{n}{w}
    def forward_propagate(self, inputs):
        output_buf = inputs
        layers_count, neurons_count, weights_count = self.weights.shape
        self.inputs = inputs
        #krnl
        for l in range(layers_count):
            for_prop = output_buf.copy()
            for n in range(neurons_count):
                activation_value = 0
                for w in range(weights_count-1):
                    activation_value = activation_value + self.weights[l][n][w] * output_buf[w]
                activation_value = activation_value + self.weights[l][n][w+1]
                for_prop[n] = self.activation_function(activation_value)
                self.outputs_transfer_derivative[l][n] = self.transfer_derivative(for_prop[n])
            output_buf = for_prop.copy()
            self.outputs[l][:] = output_buf.copy()
        #krnl
     
    def backpropagationG(self, target, lr):
        layers_count, neurons_count, weights_count = self.weights.shape
        wnew = self.weights.copy()
        self.outputs = np.insert(self.outputs, 0, values=self.inputs, axis=0)
        #krnl
        l = layers_count
        en = self.output_error(target)
        #krnl

        for n in range(neurons_count): 
            for w in range(weights_count-1):              
                wnew[l-1][n][w] = self.weights[l-1][n][w] - lr*en[n] * self.outputs_transfer_derivative[l-1][n] * self.outputs[l-1][w] #self.outputs[l-1][w]
                               
        l = layers_count-1
        for n in range(neurons_count):
            en = self.calculate_delta_outputs(target, n)
            for w in range(weights_count-1):
                wnew[l-1][n][w] = self.weights[l-1][n][w] - lr* sum(en) * self.outputs_transfer_derivative[l-1][n] * self.outputs[l-1][w] #self.outputs[l-1][w]
                
        #krnl
        self.outputs = np.delete(self.outputs, 0, axis=0)
        self.weights = wnew.copy()
        
    def nn_software_trainer(self, iterations, lr):

        img_white_vector = np.ones((self.inputs.shape), np.float32)
        img_black_vector = np.zeros((self.inputs.shape), np.float32)

        inputsw = img_white_vector
        goldenw = np.array([3 for i in range(img_white_vector.shape[0])])
        inputsb = img_black_vector
        goldenb = np.array([0 for i in range(img_black_vector.shape[0])])
        errordataw = {}
        errordatab = {}

        for i in range(iterations):
            clear_output(wait=True)

            self.forward_propagate(inputsw)
            self.backpropagationG(goldenw, lr) 
            errordataw[i]=(self.calculate_error(goldenw))
            print(f"ERRORZw: {i} {self.calculate_error(goldenw)}")
            print(f"Max of outputs: {np.average(self.outputs[1])}")

            self.forward_propagate(inputsb)
            self.backpropagationG(goldenb, lr) 
            errordatab[i]=(self.calculate_error(goldenb))
            print(f"ERRORZb: {i} {self.calculate_error(goldenb)}")
            print(f"Max of outputs: {np.average(self.outputs[1])}")

        lists = sorted(errordataw.items()) # sorted by key, return a list of tuples
        x, y = zip(*lists) # unpack a list of pairs into two tuples
        plt.plot(x, y)
        plt.xlabel('iteration')
        plt.ylabel('loss')
        plt.show()

        lists = sorted(errordatab.items()) # sorted by key, return a list of tuples
        x, y = zip(*lists) # unpack a list of pairs into two tuples
        plt.plot(x, y)
        plt.xlabel('iteration')
        plt.ylabel('loss')
        plt.show()

class nn_hw_accel:
#31 speedup (192 image vector)
    def __init__(self, weights):
        self.nn_overlay = Overlay("/home/xilinx/jupyter_notebooks/design_neural.bit")
        
        print("Turning off leds...")
        self.led_0_control(0)
        self.led_1_control(0)
        
        print("Translating weight list to numpy array...")
        self.w_state = 0
        w = np.array(weights, dtype=np.float32)
        layers_count, neurons_count, weights_count = w.shape
        self.layers_count = layers_count
        self.neurons_count = neurons_count
        self.weights_count = weights_count
        
        print("Allocating dma...")
        self.target = allocate(shape=(768), dtype=np.float32)
        self.calc_error = allocate(shape=(768), dtype=np.float32)
        
        print("Allocating weights...")
        self.weights_new = allocate((2, 768, 769), dtype=np.float32)
        self.weights = allocate((2, 768, 769), dtype=np.float32) 
        for x in range(w.shape[0]):
            for y in range(w.shape[1]):
                for z in range(w.shape[2]):
                    self.weights[x][y][z]=weights[x][y][z]
                    self.weights_new[x][y][z]=weights[x][y][z]
           
        print("Allocating neural network outputs...")    
        self.outputs = allocate(shape=(3, 768), dtype=np.float32)
        print("Setting physical adresses for IP's")
        
        print("Forward propagation IP")
        lk = self.float_to_uint(0.0)
        self.nn_overlay.forward_propagate_L2_0.register_map.leak = lk
        self.nn_overlay.forward_propagate_L2_0.register_map.weight = self.weights.physical_address
        self.nn_overlay.forward_propagate_L2_0.register_map.output_r = self.outputs.physical_address
        self.nn_overlay.forward_propagate_L2_0.register_map.m_lay = layers_count
        self.nn_overlay.forward_propagate_L2_0.register_map.m_neu = neurons_count
        
        print("Back propagation IP")
        self.nn_overlay.back_propagate_L2_New_0.register_map.m_lay = layers_count
        self.nn_overlay.back_propagate_L2_New_0.register_map.m_neu = neurons_count
        self.nn_overlay.back_propagate_L2_New_0.register_map.new_weight = self.weights_new.physical_address
        self.nn_overlay.back_propagate_L2_New_0.register_map.weight = self.weights.physical_address
        self.nn_overlay.back_propagate_L2_New_0.register_map.output_offset = self.outputs.physical_address
        self.nn_overlay.back_propagate_L2_New_0.register_map.en_offset = self.calc_error.physical_address
        print("Finished")
        
    def calculate_error(self, target):
        target = np.array(target, dtype=np.float32)
        target_size = target.shape[0]
        buf = 0
        for i in range(target_size):
            buf = buf + ((self.outputs[2][i] - target[i])**2)/2
        return buf
    
    def output_error(self):
        self.nn_overlay.axi_dma_target.sendchannel.transfer(self.target)
        self.nn_overlay.axi_dma_nn_out.sendchannel.transfer(self.outputs[2])
        self.nn_overlay.axi_dma_out_r.recvchannel.transfer(self.calc_error)
        self.nn_overlay.axi_dma_nn_out.sendchannel.wait()
        self.nn_overlay.axi_dma_target.sendchannel.wait()
        self.nn_overlay.axi_dma_out_r.recvchannel.wait()
        #free running

    def float_to_uint(self, f):
        return int(struct.unpack('<I', struct.pack('<f', f))[0])
    
    def uint_to_float(self, f):
        return float(struct.unpack('<f', struct.pack('<I', f))[0])

    def backpropagationG(self, targ, lr):
        for i in range(targ.shape[0]):
            self.target[i] = targ[i]
            
        #krnl_dma
        self.output_error()
        #krnl_dma
        
        #krnl
        le_lr = self.float_to_uint(lr)
        self.nn_overlay.back_propagate_L2_New_0.register_map.learning_rate = le_lr
        self.nn_overlay.back_propagate_L2_New_0.register_map.CTRL.AP_START = 1
        #krnl
        while True:
            done_idle = self.nn_overlay.back_propagate_L2_New_0.register_map.CTRL.AP_IDLE
            done_start = self.nn_overlay.back_propagate_L2_New_0.register_map.CTRL.AP_START
            if (done_idle == 1 and done_start == 0):
                break
            print(f"Waiting...")        
        
    def forward_propagate(self, inputs):
        inputs = np.array(inputs, dtype=np.float32)
        for x in range(inputs.shape[0]):
            self.outputs[0][x] = inputs[x]
    
        #krnl
        self.nn_overlay.forward_propagate_L2_0.register_map.CTRL.AP_START = 1
        #krnl
    
    def led_1_control(self, to_leds):
        #Use this to manipulate leds
        actual_led_vals = self.nn_overlay.axi_gpio_0.channel2.read()
        led_val = (actual_led_vals & 0b000111) | to_leds << 3
        self.nn_overlay.axi_gpio_0.channel2.write(led_val, 0xffffff)
        
    def led_0_control(self, to_leds):
        #Use this to manipulate leds
        actual_led_vals = self.nn_overlay.axi_gpio_0.channel2.read()
        led_val = (actual_led_vals & 0b111000) | to_leds
        self.nn_overlay.axi_gpio_0.channel2.write(led_val, 0xffffff)
    
    def button_0_control(self):
        button_status = self.nn_overlay.axi_gpio_0.channel1.read()
        return button_status
    
def setup_camera():
    for id in range(8):     
        camera_module = cv2.VideoCapture(id)
        if (camera_module.isOpened() == True):
            return camera_module
        else:
            print(f"No camera found on {id}...")
            continue
    raise Exception("No camera has been found or camera is already connected")
    
def get_camera_frame(camera_module, width = 32, height = 32):
    dsize = (width, height)
    failed = 0
    while True:
        ret, frame = camera_module.read()  
        if (ret == False): 
            print("Failed to get frame!")
            failed += 1
            clear_output(wait=True)
            time.sleep(0.5)
            if (failed > 8):
                break
        else: 
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            resized_frame = cv2.resize(frame, dsize)
            return resized_frame
        
def normalise_image(image):
    #Image normalisation
    image = image/255
    return image

def show_image(image):
    plt.imshow(image)
    plt.show()
    #print(f"frame.shape: {frame.shape}")
    
def generate_weights_based_on_image(img_vector):
    w = np.empty(shape=(2, img_vector.shape[0], img_vector.shape[0]+1), dtype=np.float32)
    for l in range(w.shape[0]):
            for n in range(w.shape[1]):
                w[l][n] = np.random.uniform(low=0.01, high=0.15, size=(w.shape[2],))
    return w        

In [None]:
#This is for camera learning :)
#cam_module = setup_camera()
#Gaussian blur
kernel = np.ones((5,5),np.float32)/25
#Flatten image and normalise
image = get_camera_frame(cam_module, width = 12, height = 12)
img_vector = np.reshape(image.astype(np.float32), -1)
img_vector = normalise_image(img_vector)
print(f"Image shape: {img_vector.shape[0]}")
#Generate weights
weights = generate_weights_based_on_image(img_vector)

xn = nn_hw_accel(weights)
goldenw = np.array([3 for i in range(img_vector.shape[0])], dtype=np.float32)
goldenb = np.array([0 for i in range(img_vector.shape[0])], dtype=np.float32)
while True:
    #time.sleep(0.1)
    clear_output(wait=True)
    image = get_camera_frame(cam_module, width = 16, height = 1)
    image = cv2.filter2D(image,-1,kernel)
    img_vector = np.reshape(image.astype(np.float32), -1)
    img_vector = normalise_image(img_vector)
    xn.forward_propagate(img_vector)
    #If button one is pressed learn new image pattern to light up leds
    if (xn.button_0_control() == 1):
        #blue
        print("White")
        xn.led_1_control(to_leds=1)
        xn.backpropagationG(goldenw, 0.01)            
    elif (xn.button_0_control() == 2):
        #green
        print("Black")
        xn.led_1_control(to_leds=2)
        xn.backpropagationG(goldenb, 0.01)
    elif (xn.button_0_control() == 3):
        xn.led_1_control(to_leds=0b000)
        break
    else: 
        xn.led_1_control(to_leds=0b000)
    to_L = np.int(np.round(np.average(xn.outputs[2][0:img_vector.shape[0]])))
    if (to_L > 3):
        to_L = 3
    elif (to_L < 0):
        to_L = 0
        
    xn.led_0_control(to_leds=to_L)
    print(f"To leds: {to_L}")
    print(f"NN output: {np.average(xn.outputs[2][0:img_vector.shape[0]])}")

To leds: 2
NN output: 2.336979866027832


In [14]:
while True:
    #time.sleep(0.1)
    clear_output(wait=True)
    image = get_camera_frame(cam_module, width = 16, height = 1)
    image = cv2.filter2D(image,-1,kernel)
    img_vector = np.reshape(image.astype(np.float32), -1)
    img_vector = normalise_image(img_vector)
    xn.forward_propagate(img_vector)
    #If button one is pressed learn new image pattern to light up leds
    if (xn.button_0_control() == 1):
        #blue
        print("White")
        xn.led_1_control(to_leds=1)
        xn.backpropagationG(goldenw, 0.04)            
    elif (xn.button_0_control() == 2):
        #green
        print("Black")
        xn.led_1_control(to_leds=2)
        xn.backpropagationG(goldenb, 0.04)
    elif (xn.button_0_control() == 3):
        xn.led_1_control(to_leds=0b000)
        break
    else: 
        xn.led_1_control(to_leds=0b000)
    to_L = np.int(np.round(np.average(xn.outputs[2][0:img_vector.shape[0]])))
    if (to_L > 3):
        to_L = 3
    elif (to_L < 0):
        to_L = 0
        
    xn.led_0_control(to_leds=to_L)
    print(f"To leds: {to_L}")
    print(f"NN output: {np.average(xn.outputs[2][0:img_vector.shape[0]])}")

To leds: 0
NN output: 0.0


xn.weights

In [None]:
#Led blinker
while True:
    for i in range(8): 
        time.sleep(1)
        xn.led_0_control(to_leds=0)
        xn.led_1_control(to_leds=i-1)
        time.sleep(1)
        xn.led_1_control(to_leds=0)
        xn.led_0_control(to_leds=i)
