In [1]:
import npu
import numpy as np
from npu.build.appbuilder import AppBuilder
from npu.runtime import AppRunner
import pylab as plt
import scipy

In [2]:
# Parameters shared by the rest of the notebook.
M = 3                 # number of taps
P = 1024              # number of 'branches'; also the FFT length
W = 1024              # number of windows of length M*P in the input time stream
n_int = 10            # number of time integrations on the output data
n_points = M * P      # number of samples in one window of length M*P
dt = np.float32       # element type used for the host arrays and NPU buffers

In [3]:
def createKernel(kernel_name):
    """Read a kernel's C source from disk and run it through the ``%%kernel`` cell magic.

    Must be called inside an IPython/Jupyter session, because it relies on
    ``get_ipython()``.

    Args:
        kernel_name: Base name of the kernel. The source is read from
            ``./kernels/<kernel_name>.c``, relative to the current working
            directory.

    Raises:
        OSError: If the source file cannot be opened.
        UnicodeDecodeError: If the source file is not valid UTF-8.
    """
    kernel_path = f"./kernels/{kernel_name}.c"
    # Decode explicitly as UTF-8 instead of relying on the platform default
    # encoding, so the same source file is read identically on every host.
    with open(kernel_path, 'r', encoding="utf-8") as file:
        data = file.read()
    # Same effect as executing a notebook cell that starts with `%%kernel` and
    # contains the C source. NOTE(review): presumably this defines a kernel
    # object in the notebook namespace (generateKernels uses `passthrough`
    # right after this call) - confirm against the npu documentation.
    get_ipython().run_cell_magic('kernel', '', data)

def generateKernels():
    """Create the passthrough kernel and attach placeholder arrays to its first two ports.

    Returns:
        A one-element list holding the ``passthrough`` kernel object.
    """
    # Passthrough kernel. The global name `passthrough` used below is not
    # assigned anywhere in Python code here; presumably the %%kernel magic run
    # by createKernel defines it - TODO confirm.
    createKernel("passthrough")

    port_dtype = np.float32
    # Ports 0 and 1 each receive their own uninitialised (1, n_points) float32
    # array. NOTE(review): the contents are never written, so presumably only
    # shape and dtype matter to the builder - confirm.
    for port_index in (0, 1):
        passthrough.ports[port_index].array = np.ndarray([1, n_points], dtype=port_dtype)

    return [passthrough]

In [4]:
# Create the passthrough kernel and set the placeholder arrays on its ports.
kernels = generateKernels()
# Bare expression: shows the kernel object's repr as the cell output.
passthrough

<npu.build.kernel.Kernel at 0x733d8d5552b0>

In [5]:
class SimpleApplication(AppBuilder):
    """Application that runs a single kernel over every row of its input."""

    def __init__(self, kernel):
        """Remember the kernel to apply, then initialise the AppBuilder base class."""
        self.kernel = kernel
        super().__init__()

    def callgraph(self, x_in:np.ndarray, x_out:np.ndarray):
        """Call the kernel once per row of ``x_in`` and store each result in ``x_out``.

        Args:
            x_in: 2-D input; indexed by row, with ``shape[1]`` passed to the kernel.
            x_out: Output indexed by the same row numbers as ``x_in``.
        """
        # NOTE(review): the second kernel argument is x_in.shape[1], i.e. the
        # number of elements in a row. The original code called this value
        # `bytes_per_row`; for dtypes wider than one byte the element count and
        # the byte count differ - confirm which one the kernel expects.
        n_rows, row_length = x_in.shape[0], x_in.shape[1]
        for row in range(n_rows):
            x_out[row] = self.kernel(x_in[row], row_length)

In [6]:
# Instantiate the application around the passthrough kernel.
app_builder = SimpleApplication(kernel=passthrough)

In [7]:
# Zero-filled (1, n_points) arrays of dtype `dt` handed to build(); presumably
# they are forwarded to callgraph(x_in, x_out) to describe the buffers - confirm.
# (`intput_img` is spelled this way throughout; the name is kept unchanged.)
intput_img = np.zeros(shape=(1, n_points), dtype=dt)
output_img = np.zeros(shape=(1, n_points), dtype=dt)

# NOTE(review): the saved output of this cell shows the xclbin build failing
# (final.xclbin was never produced, so the copy to ./SimpleApplication.xclbin
# raised CalledProcessError). All following cells are unexecuted (In [None]).
app_builder.build(intput_img, output_img)

Using cached passthrough kernel object file...
Building the xclbin...
ERROR! WSL failed 

cp: cannot stat '/tmp/pnx5heyabme/build_template/final.xclbin': No such file or directory



CalledProcessError: Command '['cp', '/tmp/pnx5heyabme/build_template/final.xclbin', './SimpleApplication.xclbin']' returned non-zero exit status 1.

In [None]:
# Bare expression: shows the builder's `metadata` attribute as the cell output.
app_builder.metadata

In [None]:
# Test signal: Gaussian noise plus a sine wave, n_points (= M*P) samples long.
# A seeded generator makes Restart & Run All reproduce the same samples.
rng = np.random.default_rng(0)
noise = rng.normal(loc=0.5, scale=0.1, size=n_points).astype(dt)
samples = np.arange(n_points)
freq    = 1   # multiplies the sample index inside sin()
amp     = 1
# np.sin of the integer sample index is float64. Without the cast, `noise +
# cw_signal` would be promoted to float64, while every NPU buffer is `dt`
# (float32), so the later equality checks against the NPU output would report
# rounding differences even for a perfect passthrough.
cw_signal = (amp * np.sin(samples * freq)).astype(dt)
data = noise + cw_signal
plt.plot(data)
plt.show()
plt.clf()

In [None]:
# Inspect the generated signal: first its Python type, then its values.
print(type(data))
print(data)

In [None]:
# Create the runner from the xclbin file that the build step copies to
# ./SimpleApplication.xclbin (see the build log above).
app = AppRunner('SimpleApplication.xclbin')

In [None]:
# Allocate app input and output buffers to exchange data with NPU.
# Both are (1, n_points) with dtype `dt`, the same shape and dtype as the
# arrays passed to app_builder.build().
input_image = app.allocate(shape=(1, n_points), dtype=dt)
output_image = app.allocate(shape=(1, n_points), dtype=dt)

In [None]:
# Copy the generated signal into the input buffer. `data` is 1-D (n_points,)
# and is assigned into the single row of the (1, n_points) buffer.
# NOTE(review): presumably the values are converted to the buffer's dtype on
# assignment - confirm.
input_image[:] = data

In [None]:
# Pass the contents of the input_image buffer to the NPU before running the app.
input_image.sync_to_npu()

In [None]:
# Run app on NPU. Arguments are given as (input, output), the same order as
# callgraph(x_in, x_out).
app.call(input_image, output_image)

In [None]:
# Get results from NPU via output_image buffer, before reading it on the host.
output_image.sync_from_npu()

In [None]:
# Compare the arrays for equality.
# `data` is 1-D (n_points,) while output_image is (1, n_points), and
# np.array_equal returns False for any shape mismatch, so both sides are
# flattened first. `data` is also cast to `dt`, the dtype of the values that
# were actually written into the input buffer.
sent_signal = np.asarray(data, dtype=dt).reshape(-1)
npu_output = np.asarray(output_image).reshape(-1)
print(f"Are the arrays are equal?: {np.array_equal(sent_signal, npu_output)}")
# Plot the flattened output: plotting a (1, N) array draws N separate
# one-point lines (one per column), which shows nothing.
plt.plot(npu_output)
plt.show()
plt.clf()

In [None]:
# Inspect the output buffer: first its Python type, then its values.
print(type(output_image))
print(output_image)

In [None]:
# Copy the NPU output into an ordinary host array, then keep only its single
# row so that `gg` is 1-D with n_points elements.
gg = np.zeros(shape=(1, n_points), dtype=dt)
gg[...] = output_image
gg = gg[0]
# Report type, shape, dtype and values, one per line.
print(type(gg), gg.shape, gg.dtype, gg, sep="\n")

In [None]:
# Plot the 1-D host copy of the NPU output, then clear the current figure.
plt.plot(gg)
plt.show()
plt.clf()

In [None]:
# Count the samples where the NPU output differs from the signal that was sent.
# `data` is cast to `dt` (the buffer dtype) first; otherwise a float64 `data`
# compared against the float32 output counts pure rounding as mismatches.
# The comparison is vectorised instead of looping over every index in Python.
s = int(np.count_nonzero(np.asarray(data, dtype=dt) != gg))
print(s)

In [None]:
# Element-wise difference between the signal that was sent and the NPU output.
# `data` is cast to `dt` (the buffer dtype) so the plot shows real passthrough
# errors rather than float64-vs-float32 rounding noise.
diff = np.subtract(np.asarray(data, dtype=dt), gg)
print(type(diff))
print(diff.shape)
print(diff.dtype)
plt.plot(diff)
plt.show()
plt.clf()

In [None]:
# Drop the notebook's reference to the AppRunner.
# NOTE(review): presumably this releases the NPU for other applications - confirm.
del app