# Parameters

In [None]:
# Directory that holds the input model and every intermediate model written
# by the cells below
build_dir = '/workspace/build'
# Base file name of the models inside build_dir: the input model is expected
# at <build_dir>/<model_name>.onnx, intermediate results are stored as
# <build_dir>/<model_name>_<suffix>.onnx
model_name = 'CNV'

# Whether to download an example model created by the FINN developers (True)
# or to use a custom model that was placed at <build_dir>/<model_name>.onnx
# (False)
download_example_model = False

# Whether to show each partial result in Netron
# NOTE(review): no cell visible in this notebook reads this flag - confirm
# that show_in_netron actually honors it
enable_netron = True

# Choose the memory mode for the MVTU units, decoupled or const
# (passed to the StreamingFCLayer conversion transformations)
mem_mode = 'decoupled'

# Folding factors for the layers
# Each tuple is (PE, SIMD, in_fifo_depth) for a layer
# PE = Parallelization over outputs
# SIMD = Parallelization over inputs
# Higher PE/SIMD equals a faster execution but also more FPGA resource usage
# The tuples are applied in order to the StreamingFCLayer_Batch nodes; the
# SIMD value of the i-th tuple is also used for the i-th
# ConvolutionInputGenerator node, so there must be at least as many tuples as
# there are ConvolutionInputGenerator nodes
folding = [
    (16, 3, 128),
    (32, 32, 128),
    (16, 32, 128),
    (16, 32, 128),
    (4, 32, 81),
    (1, 32, 2),
    (1, 4, 2),
    (1, 8, 128),
    (3, 1, 3),
]

# Board settings
# pynq_type is also the target platform of the hardware build (ZynqBuild);
# the remaining pynq_* parameters are only required if deploying on a real
# board (pynq_port is the SSH port)
pynq_type = 'Pynq-Z1'
pynq_port = 22
pynq_username = 'xilinx'
pynq_password = 'xilinx'
pynq_target_dir = '/home/xilinx/finn_network'

# Target clock period in nanoseconds (e.g. 10 ns corresponds to 100 MHz);
# used by the hardware build
target_clk_ns = 10

# Setup Workspace and Helper Functions

Before running make sure to execute **ONE** of the following steps:
* Put the model exported from Brevitas into */workspace/build/* and rename it to *{model_name}.onnx*, i.e. *CNV.onnx* with the default parameters (the parameter *download_example_model* must be *False*)
* Set the parameter *download_example_model* to *True*

(*/workspace/build/* is the root of the project repository on your host system)

In [None]:
import onnx
import brevitas.onnx as bo
from os import environ, makedirs, path
from finn.core.modelwrapper import ModelWrapper
from finn.util.test import get_test_model_trained
from finn.util.visualization import showSrc
from finn.util.basic import make_build_dir
from utils.notebook_helpers import eprint, show_in_netron

def getModelPath(suffix: str = None):
    """Return the path of an ONNX model file inside ``build_dir``.

    Without a suffix (``None`` or an empty string) the result is
    ``<build_dir>/<model_name>.onnx``; otherwise it is
    ``<build_dir>/<model_name>_<suffix>.onnx``.
    """
    file_stem = f'{model_name}_{suffix}' if suffix else model_name
    return str(path.join(build_dir, f'{file_stem}.onnx'))

# Location of the unmodified input model that all later cells start from.
input_model_path = getModelPath()

# Optionally fetch the example network and export it as the input model.
if download_example_model:
    cnv = get_test_model_trained("CNV", 1, 1)
    bo.export_finn_onnx(cnv, (1, 3, 32, 32), input_model_path)

# A missing input model is only reported here; execution continues.
if not path.isfile(input_model_path):
    eprint(f'The file "{input_model_path}" is missing!')

# The board address comes from the environment; without it only the
# deployment-related cells are affected, so this is a warning as well.
pynq_ip = environ.get('PYNQ_IP')
if not pynq_ip:
    for warning in (
        'The environment variable PYNQ_IP is not set!',
        'Deployment on the physical board will NOT work!',
    ):
        eprint(warning)

show_in_netron(input_model_path)

# Tidy-Up input model

In [None]:
from finn.transformation.infer_shapes import InferShapes
from finn.transformation.fold_constants import FoldConstants
from finn.transformation.general import GiveReadableTensorNames, GiveUniqueNodeNames, RemoveStaticGraphInputs

# Load the input model and apply the clean-up transformations one after
# another, in exactly the order listed.
model = ModelWrapper(getModelPath())
for tidy_step in (
    InferShapes,
    FoldConstants,
    GiveUniqueNodeNames,
    GiveReadableTensorNames,
    RemoveStaticGraphInputs,
):
    model = model.transform(tidy_step())

# Store the cleaned-up model under the 'tidy' suffix and display it.
tidy_model_path = getModelPath(suffix='tidy')
model.save(tidy_model_path)

show_in_netron(tidy_model_path)

## Adding Pre- and Postprocessing

In [None]:
import brevitas.onnx as bo
from finn.util.pytorch import ToTensor
#from finn.transformation.insert_topk import InsertTopK
from finn.transformation.infer_datatypes import InferDataTypes
from finn.transformation.merge_onnx_models import MergeONNXModels
from finn.core.datatype import DataType

# The preprocessing graph must accept the same input shape as the tidy model.
model = ModelWrapper(getModelPath(suffix='tidy'))
ishape = model.get_tensor_shape(model.graph.input[0].name)

# preprocessing: torchvision's ToTensor divides uint8 inputs by 255;
# export it as its own ONNX model so it can be merged in front of the network
totensor_pyt = ToTensor()
chkpt_preproc_name = getModelPath(suffix='prepoc')
bo.export_finn_onnx(totensor_pyt, ishape, chkpt_preproc_name)

# join preprocessing and core model
pre_model = ModelWrapper(chkpt_preproc_name)
model = model.transform(MergeONNXModels(pre_model))

# the merged model has a new global input; annotate it with the input
# quantization: UINT8 for all BNN-PYNQ models
global_inp_name = model.graph.input[0].name
model.set_tensor_datatype(global_inp_name, DataType.UINT8)

# postprocessing: inserting a Top-1 node at the end is currently disabled
#model = model.transform(InsertTopK(k=1))

# tidy-up again, in exactly this order
for cleanup_step in (
    InferShapes,
    FoldConstants,
    GiveUniqueNodeNames,
    GiveReadableTensorNames,
    InferDataTypes,
    RemoveStaticGraphInputs,
):
    model = model.transform(cleanup_step())

pre_post_model_path = getModelPath(suffix='pre_post')
model.save(pre_post_model_path)

show_in_netron(pre_post_model_path)

# Lowering and Streamlining

In [None]:
from finn.transformation.streamline import Streamline
from finn.transformation.lower_convs_to_matmul import LowerConvsToMatMul
from finn.transformation.bipolar_to_xnor import ConvertBipolarMatMulToXnorPopcount
import finn.transformation.streamline.absorb as absorb
from finn.transformation.streamline.reorder import MakeMaxPoolNHWC, MoveScalarLinearPastInvariants
from finn.transformation.infer_data_layouts import InferDataLayouts
from finn.transformation.general import RemoveUnusedTensors

# Start from the model with preprocessing attached and apply the streamlining
# and lowering transformations in the listed order (Streamline runs twice:
# once before lowering the convolutions and once after the XNOR conversion).
model = ModelWrapper(getModelPath(suffix='pre_post'))
for streamline_step in (
    MoveScalarLinearPastInvariants,
    Streamline,
    LowerConvsToMatMul,
    MakeMaxPoolNHWC,
    absorb.AbsorbTransposeIntoMultiThreshold,
    ConvertBipolarMatMulToXnorPopcount,
    Streamline,
    # absorbing the final add-mul nodes into TopK is currently disabled:
    # absorb.AbsorbScalarMulAddIntoTopK,
    InferDataLayouts,
    RemoveUnusedTensors,
):
    model = model.transform(streamline_step())

streamlined_model_path = getModelPath(suffix='streamlined')
model.save(streamlined_model_path)

show_in_netron(streamlined_model_path)

## Partitioning, Conversion to HLS Layers and Folding

In [None]:
import finn.transformation.fpgadataflow.convert_to_hls_layers as to_hls
from finn.transformation.fpgadataflow.create_dataflow_partition import CreateDataflowPartition
from finn.transformation.move_reshape import RemoveCNVtoFCFlatten
from finn.custom_op.registry import getCustomOp
from finn.transformation.infer_data_layouts import InferDataLayouts

model = ModelWrapper(getModelPath(suffix='streamlined'))

# The two StreamingFCLayer conversions are the only steps that take the
# configured memory mode.
for fc_conversion in (
    to_hls.InferBinaryStreamingFCLayer,
    to_hls.InferQuantizedStreamingFCLayer,
):
    model = model.transform(fc_conversion(mem_mode))

# Remaining conversions and clean-ups, applied in the listed order.
for hls_step in (
    # TopK to LabelSelect is currently disabled:
    # to_hls.InferLabelSelectLayer,
    # input quantization (if any) to standalone thresholding
    to_hls.InferThresholdingLayer,
    to_hls.InferConvInpGen,
    to_hls.InferStreamingMaxPool,
    # get rid of Reshape(-1, 1) operation between hlslib nodes
    RemoveCNVtoFCFlatten,
    # get rid of Transpose -> Transpose identity sequences
    absorb.AbsorbConsecutiveTransposes,
    # infer tensor data layouts
    InferDataLayouts,
):
    model = model.transform(hls_step())

# Split off the dataflow partition; the parent model keeps a
# StreamingDataflowPartition node that refers to the partition's model file.
parent_model = model.transform(CreateDataflowPartition())
parent_model.save(getModelPath(suffix='dataflow_parent'))

sdp_node = getCustomOp(
    parent_model.get_nodes_by_op_type("StreamingDataflowPartition")[0]
)
dataflow_model_filename = sdp_node.get_nodeattr("model")

# save the dataflow partition with a different name for easier access
model = ModelWrapper(dataflow_model_filename)
model.save(getModelPath(suffix='dataflow_model'))

# Apply the folding configuration to the StreamingFCLayer_Batch nodes; layers
# and folding entries are paired up in order.
fc_layers = model.get_nodes_by_op_type("StreamingFCLayer_Batch")

for fc_node, fc_folding in zip(fc_layers, folding):
    pe, simd, ififodepth = fc_folding
    fc_op = getCustomOp(fc_node)
    for attr_name, attr_value in (
        ("PE", pe),
        ("SIMD", simd),
        ("inFIFODepth", ififodepth),
    ):
        fc_op.set_nodeattr(attr_name, attr_value)

# use same SIMD values for the sliding window operators: the i-th
# ConvolutionInputGenerator gets the SIMD value of the i-th folding entry
swg_layers = model.get_nodes_by_op_type("ConvolutionInputGenerator")
for layer_idx, swg_node in enumerate(swg_layers):
    swg_op = getCustomOp(swg_node)
    swg_op.set_nodeattr("SIMD", folding[layer_idx][1])

model = model.transform(GiveUniqueNodeNames())

folded_model_path = getModelPath(suffix='folded')
model.save(folded_model_path)

show_in_netron(folded_model_path)

# Hardware Generation

**This will take a long time! Expect it to take more than 30 minutes to finish!**

In [None]:
from finn.transformation.fpgadataflow.make_zynq_proj import ZynqBuild

# Run the Zynq build on the folded model for the configured board type and
# target clock period, then store the result under the 'synth' suffix.
folded_model = ModelWrapper(getModelPath(suffix='folded'))
zynq_build = ZynqBuild(platform=pynq_type, period_ns=target_clk_ns)
model = folded_model.transform(zynq_build)

synth_model_path = getModelPath(suffix='synth')
model.save(synth_model_path)

show_in_netron(synth_model_path)

# Deployment

**This will only work if the physical target board is present under the specified IP!**

In [None]:
from finn.transformation.fpgadataflow.make_deployment import DeployToPYNQ

# Warn up front when no board address is configured; the cell continues
# regardless.
if not pynq_ip:
    eprint('Pynq IP not configured. This will fail!')

# FINN will use ssh to deploy and run the generated accelerator
synth_model = ModelWrapper(getModelPath(suffix='synth'))
deploy_step = DeployToPYNQ(
    pynq_ip,
    pynq_port,
    pynq_username,
    pynq_password,
    pynq_target_dir,
)
model = synth_model.transform(deploy_step)
model.save(getModelPath(suffix='pynq_deploy'))

# The directory on the board is pynq_target_dir plus the last path component
# of the deployment directory recorded in the model metadata.
deployment_dir_name = model.get_metadata_prop("pynq_deployment_dir").rsplit("/", 1)[-1]
target_dir = f"{pynq_target_dir}/{deployment_dir_name}"

# Report the deployment directory on the board and list its contents via SSH.
print(f'Target directory is "{target_dir}".')
print('It contains the following files:')
# IPython shell escape: the {...} placeholders are filled in from the notebook
# variables before the command runs.
# NOTE(review): the board password is passed on the command line via sshpass.
! sshpass -p {pynq_password} ssh {pynq_username}@{pynq_ip} -p {pynq_port} 'ls -l {target_dir}'

# Remote Execution

**This will only work if the physical target board is present under the specified IP!**

In [None]:
import matplotlib.pyplot as plt
import numpy as np
from finn.core.onnx_exec import execute_onnx

# Warn up front when no board address is configured; the cell continues
# regardless.
if not pynq_ip:
    eprint('Pynq IP not configured. This will fail!')

# Load the test image. For .npz files np.load returns an archive object that
# keeps the file open, so it is used as a context manager to make sure the
# file handle is closed again once the array has been read.
fn = '/workspace/finn/src/finn/qnn-data/cifar10/cifar10-test-data-class3.npz'
with np.load(fn) as npz_archive:
    x = npz_archive["arr_0"]

# Reshape to (3, 32, 32) and move the channel axis last, giving the
# (32, 32, 3) layout that plt.imshow displays as a colour image.
x = x.reshape(3, 32, 32).transpose(1, 2, 0)
plt.imshow(x)

# Feed the image, converted to float32 and reshaped to the model's input
# shape, to the deployed model.
model = ModelWrapper(getModelPath(suffix='pynq_deploy'))
iname = model.graph.input[0].name
oname = model.graph.output[0].name
ishape = model.get_tensor_shape(iname)
input_dict = {iname: x.astype(np.float32).reshape(ishape)}
# NOTE(review): the third positional argument is presumably
# return_full_exec_context - confirm against the execute_onnx signature.
ret = execute_onnx(model, input_dict, True)

print(f'Result: {ret[oname]}')

# Accuracy Validation


**This will only work if the physical target board is present under the specified IP!**

Install dataset_loading (only required to be executed once)

In [None]:
# Warn up front when no board address is configured; the cell continues
# regardless.
if not pynq_ip:
    eprint('Pynq IP not configured. This will fail!')

# IPython shell escape: connects to the board via SSH and installs
# dataset_loading (tag 0.0.4) there with pip3 under sudo; the board password
# is piped into `sudo -S`. The {...} placeholders are filled in from the
# notebook variables.
! sshpass -p {pynq_password} ssh -t {pynq_username}@{pynq_ip} -p {pynq_port} 'echo {pynq_password} | sudo -S pip3 install git+https://github.com/fbcotter/dataset_loading.git@0.0.4#egg=dataset_loading'

Execute generated validation script

In [None]:
# Warn up front when no board address is configured; the cell continues
# regardless.
if not pynq_ip:
    eprint('Pynq IP not configured. This will fail!')

# IPython shell escape: connects to the board via SSH, changes into
# target_dir (set by the Deployment cell, which therefore has to be run
# first) and runs validate.py with python3.6 under sudo on the cifar10
# dataset with a batch size of 1000.
! sshpass -p {pynq_password} ssh -t {pynq_username}@{pynq_ip} -p {pynq_port} 'cd {target_dir}; echo {pynq_password} | sudo -S python3.6 validate.py --dataset cifar10 --batchsize 1000'