In [51]:

from pynq import Overlay
import asyncio
from psutil import cpu_percent
import numpy as np
from pynq import Xlnk
from pynq.lib import dma
from scipy.linalg import dft
import matplotlib.pyplot as plt
import time

ol = Overlay("interrupt.bit")
ol.download()
# IP's addresses
IP_CTRL = 0x00
AP_START = 0x1
GIER = 0x04
IP_IER = 0x08
IP_ISR = 0x0C
INTRPT_AP_DONE = 0x1
INTRPT_AP_READY = 0x2
OUT_REG = 0x10
INP_REG = 0x18

_INTRPT = INTRPT_AP_DONE

fact_ip = ol.fft_0
fact_ip.write(GIER, 0x1)
fact_ip.write(IP_IER, _INTRPT)

NUM_SAMPLES = 256
real_error=np.zeros(NUM_SAMPLES)
imag_error=np.zeros(NUM_SAMPLES)
ind=np.arange(NUM_SAMPLES)
real_rmse=np.zeros(NUM_SAMPLES)
imag_rmse=np.zeros(NUM_SAMPLES)
xlnk = Xlnk()
in_r = xlnk.cma_array(shape=(NUM_SAMPLES,), dtype=np.float32) 
in_i = xlnk.cma_array(shape=(NUM_SAMPLES,), dtype=np.float32)           
out_r = xlnk.cma_array(shape=(NUM_SAMPLES,), dtype=np.float32) 
out_i = xlnk.cma_array(shape=(NUM_SAMPLES,), dtype=np.float32)
a = [i for i in range(NUM_SAMPLES)]
a=np.cos(a)
real=a.real                # Change input real and imaginary value here
img=a.imag
np.copyto(in_r, real)
np.copyto(in_i, img)

# Coroutine that waits for an IP to be done.
async def read_ip(ip):
    while True:
        # Wait for the IP to finish.
        await ip.interrupt.wait()
        # Clear the interrupt and then print output's value.
        if (ip.read(IP_ISR) & _INTRPT):
            ip.write(IP_ISR, _INTRPT)
            golden_op=np.fft.fft(a)
            for i in range(NUM_SAMPLES):
                real_error[i]="{0:.6f}".format(abs(out_r[i]-golden_op.real[i]))
                imag_error[i]="{0:.6f}".format(abs(out_i[i]-golden_op.imag[i]))
            sum_sq_real=0
            sum_sq_imag=0
            for i in range(NUM_SAMPLES):
                sum_sq_real =sum_sq_real+(real_error[i]*real_error[i])
                real_rmse = np.sqrt(sum_sq_real / (i+1))
                sum_sq_imag =sum_sq_imag+(imag_error[i]*imag_error[i])
                imag_rmse = np.sqrt(sum_sq_imag / (i+1))
            print("Real Part RMSE: ", real_rmse, "Imaginary Part RMSE:", imag_rmse)    
            if real_rmse<0.001 and imag_rmse<0.001:
                print("PASS")
            else:
                print("FAIL")

# Task for IP using the coroutine
ip_task = asyncio.ensure_future(read_ip(fact_ip))

# Coroutine for writing input and starting the IP with delay
async def write_wait(interval):
    await asyncio.sleep(interval)
    
    # write to input
    fact_ip.write(0x30, NUM_SAMPLES)
    fact_ip.write(0x10,in_r.physical_address)
    fact_ip.write(0x18,in_i.physical_address)
    fact_ip.write(0x20,out_r.physical_address)
    fact_ip.write(0x28,out_i.physical_address)
    
    fact_ip.write(IP_CTRL, AP_START) # You can comment it out to test the interrupt
    print("IP started")
    await asyncio.sleep(interval)

# Run the event loop until the time interval expires
time_interval = 2  # time in seconds
loop = asyncio.get_event_loop()
write_task = asyncio.ensure_future(write_wait(time_interval))

# Using psutil to record CPU utilization.
cpu_percent(percpu=True)  # Initializing the CPU monitoring.
loop.run_until_complete(write_task)
cpu_used = cpu_percent(percpu=True)

# Printing the CPU utilization
print('CPU Utilization = {cpu_used}'.format(**locals()))

# Removing the IP task from the event loop.
ip_task.cancel()

IP started
Real Part RMSE:  2.39300739134e-05 Imaginary Part RMSE: 7.69816333615e-06
PASS
CPU Utilization = [8.1, 4.3]


True