In [None]:
import torch

# Pick the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
print(torch.cuda.get_arch_list())

print(f"Is CUDA supported by this system?{torch.cuda.is_available()}")
print(f"CUDA version: {torch.version.cuda}")

# The per-device queries below need an initialised CUDA runtime and raise on a
# CPU-only machine, so they are only issued when a CUDA device is present.
if torch.cuda.is_available():
    # Storing ID of current CUDA device
    cuda_id = torch.cuda.current_device()
    print(f"ID of current CUDA device:{cuda_id}")
    print(f"Name of current CUDA device:{torch.cuda.get_device_name(cuda_id)}")
else:
    # Keep the name defined so later cells can test for "no GPU".
    cuda_id = None
    print("No CUDA device available; running on CPU.")

In [None]:
import torch
# Resolve the compute device once: CUDA if available, CPU otherwise.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(device)

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from brevitas.nn import QuantConv2d, QuantReLU, QuantLinear
from brevitas.quant.scaled_int import Int8ActPerTensorFloat
from brevitas.quant.scaled_int import Int8WeightPerTensorFloat
from brevitas.quant.scaled_int import Int8BiasPerTensorFloatInternalScaling


class MobileNetV2(nn.Module):
    """MobileNetV2-style classifier built from Brevitas quantized layers.

    The network is a stem convolution, a stack of ``QuantBottleneck`` blocks
    described by ``inverted_residual_setting``, a 1x1 convolution to 1280
    channels, global average pooling and a 2-output ``QuantLinear`` head.
    Weight-carrying layers are created with ``weight_bit_width=4`` and the
    shared ``QuantReLU`` with ``bit_width=4``.
    """

    def __init__(self):
        super().__init__()
        weight_bits = 4
        activation_bits = 4

        # Each row is (t, c, n, s): expansion factor, output channels,
        # number of blocks in the stage, stride of the stage's first block.
        self.inverted_residual_setting = [
            [1, 16, 1, 1],
            [6, 24, 2, 2],
            [6, 32, 3, 2],
            [6, 64, 4, 2],
            [6, 96, 3, 1],
            [6, 160, 3, 2],
            [6, 320, 1, 1],
        ]

        # Quantization arguments shared by every weight-carrying layer below.
        layer_quant = dict(
            bias=True,
            input_quant=Int8ActPerTensorFloat,
            output_quant=Int8ActPerTensorFloat,
            weight_quant=Int8WeightPerTensorFloat,
            bias_quant=Int8BiasPerTensorFloatInternalScaling,
            return_quant_tensor=True,
            weight_bit_width=weight_bits,
        )

        self.conv1 = QuantConv2d(3, 32, kernel_size=3, stride=2, padding=1, **layer_quant)
        # One QuantReLU instance is used twice in forward().
        self.relu = QuantReLU(
            bit_width=activation_bits,
            max_val=6,
            act_quant=Int8ActPerTensorFloat,
            input_quant=Int8ActPerTensorFloat,
            output_quant=Int8ActPerTensorFloat,
            return_quant_tensor=True,
        )
        self.features = self._make_layers(in_channels=32)
        self.conv_last = QuantConv2d(320, 1280, kernel_size=1, **layer_quant)
        self.avg_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.classifier = QuantLinear(1280, 2, **layer_quant)

    def _make_layers(self, in_channels):
        """Build the bottleneck stack described by ``inverted_residual_setting``.

        Args:
            in_channels: channel count fed into the first bottleneck.

        Returns:
            An ``nn.Sequential`` of ``QuantBottleneck`` blocks. Only the first
            block of each stage uses the stage stride; the rest use stride 1.
        """
        blocks = []
        for expansion, out_channels, repeats, stage_stride in self.inverted_residual_setting:
            for block_idx in range(repeats):
                block_stride = stage_stride if block_idx == 0 else 1
                blocks.append(QuantBottleneck(in_channels, out_channels, block_stride, expansion))
                in_channels = out_channels
        return nn.Sequential(*blocks)

    def forward(self, x):
        """Run the network on ``x`` and return the classifier output."""
        x = self.relu(self.conv1(x))
        x = self.features(x)
        x = self.relu(self.conv_last(x))
        x = self.avg_pool(x)
        # Collapse everything but the leading dimension before the linear head.
        batch = x.size(0)
        x = x.view(batch, -1)
        return self.classifier(x)


class QuantBottleneck(nn.Module):
    """Inverted-residual block: 1x1 expand -> 3x3 depthwise -> 1x1 project.

    Args:
        in_planes: number of input channels.
        out_planes: number of output channels.
        stride: stride of the 3x3 depthwise convolution.
        expansion: multiplier applied to ``in_planes`` to obtain the width of
            the expanded representation.

    The shortcut branch is only added to the output when ``stride == 1``. It is
    an empty ``nn.Sequential`` (identity) when channel counts match and a
    1x1 convolution followed by batch norm otherwise.
    """

    def __init__(self, in_planes, out_planes, stride, expansion):
        super(QuantBottleneck, self).__init__()

        myWeight_bit_width = 4
        act_bit_width = 4

        self.stride = stride
        planes = expansion * in_planes
        self.conv1 = QuantConv2d(in_planes, planes, kernel_size=1, bias=True, input_quant=Int8ActPerTensorFloat, output_quant=Int8ActPerTensorFloat,
                        weight_quant=Int8WeightPerTensorFloat, bias_quant=Int8BiasPerTensorFloatInternalScaling, return_quant_tensor=True, weight_bit_width=myWeight_bit_width)
        self.bn1 = nn.BatchNorm2d(planes)
        self.relu1 = QuantReLU(bit_width=act_bit_width, max_val=6, act_quant=Int8ActPerTensorFloat,
                      input_quant=Int8ActPerTensorFloat, output_quant=Int8ActPerTensorFloat, return_quant_tensor=True)
        # NOTE(review): unlike conv1/conv3 this depthwise layer passes no
        # weight_bit_width, input/output/bias quantizer or return_quant_tensor,
        # and uses an `act_quant` keyword instead. Left untouched so existing
        # checkpoints keep loading -- confirm this configuration is intended.
        self.conv2 = QuantConv2d(planes, planes, kernel_size=3, stride=stride, padding=1, groups=planes, bias=True,
                                  weight_quant=Int8WeightPerTensorFloat,
                                  act_quant=Int8ActPerTensorFloat)
        self.bn2 = nn.BatchNorm2d(planes)
        self.relu2 = QuantReLU(bit_width=act_bit_width, max_val=6, act_quant=Int8ActPerTensorFloat,
                      input_quant=Int8ActPerTensorFloat, output_quant=Int8ActPerTensorFloat, return_quant_tensor=True)
        self.conv3 = QuantConv2d(planes, out_planes, kernel_size=1, bias=True, input_quant=Int8ActPerTensorFloat, output_quant=Int8ActPerTensorFloat,
                        weight_quant=Int8WeightPerTensorFloat, bias_quant=Int8BiasPerTensorFloatInternalScaling, return_quant_tensor=True, weight_bit_width=myWeight_bit_width)
        self.bn3 = nn.BatchNorm2d(out_planes)

        # Identity shortcut by default; a 1x1 projection when the channel
        # count changes in a stride-1 block.
        self.shortcut = nn.Sequential()
        if stride == 1 and in_planes != out_planes:
            self.shortcut = nn.Sequential(
                QuantConv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=True, input_quant=Int8ActPerTensorFloat, output_quant=Int8ActPerTensorFloat,
                        weight_quant=Int8WeightPerTensorFloat, bias_quant=Int8BiasPerTensorFloatInternalScaling, return_quant_tensor=True, weight_bit_width=myWeight_bit_width),
                nn.BatchNorm2d(out_planes),
            )

    def forward(self, x):
        """Apply the block to ``x``; add the shortcut only for stride-1 blocks."""
        out = self.relu1(self.bn1(self.conv1(x)))
        out = self.relu2(self.bn2(self.conv2(out)))
        out = self.bn3(self.conv3(out))
        # The previous one-liner `out += A if cond else out` parsed as
        # `out += (A if cond else out)`, which doubled the activations of every
        # strided block. Checkpoints trained with that behaviour may need to be
        # re-validated or retrained.
        if self.stride == 1:
            out += self.shortcut(x)
        return out

In [None]:
# Path of the checkpoint that is loaded into a freshly built network below.
model_path = "trained_models/mobilenetv2_w4a4_model.pth"
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# The weights are always mapped onto the CPU here, whatever `device` is.
model = MobileNetV2()
cpu_state_dict = torch.load(model_path, map_location=torch.device('cpu'))
model.load_state_dict(cpu_state_dict)


In [None]:
# Print the dtype and storage size of every named parameter of the model.
for name, param in model.named_parameters():
    size_in_bytes = param.nelement() * param.element_size()
    print(f"Layer: {name}, Type: {param.dtype}, Size: {size_in_bytes} bytes")

In [None]:
from finn.util.visualization import showSrc, showInNetron
from finn.util.basic import make_build_dir
import os
    
# Read the build directory from the FINN_ROOT environment variable; this
# raises KeyError if the variable is not set.
build_dir = os.environ["FINN_ROOT"]


In [None]:
import onnx
import torch
from brevitas.export import export_qonnx
from qonnx.util.cleanup import cleanup as qonnx_cleanup

from qonnx.core.modelwrapper import ModelWrapper
from qonnx.core.datatype import DataType
from finn.transformation.qonnx.convert_qonnx_to_finn import ConvertQONNXtoFINN

# Export runs on the CPU copy of the trained network.
model_for_export = model.cpu()

export_onnx_path = 'onnx_models/modbilenetv2_w4a4.onnx'
# Example input used to trace the network during export.
input_t = torch.randn(1, 3, 224, 224)

# `export_qonnx` is the exporter imported at the top of this cell; the
# previous call used `export_onnx`, a name that is never defined.
export_qonnx(model_for_export, export_path=export_onnx_path, input_t=input_t)
# Clean the exported graph in place before viewing it.
qonnx_cleanup(export_onnx_path, out_file=export_onnx_path)

showInNetron(export_onnx_path)

In [None]:
# Wrap the exported ONNX model so FINN transformations can be applied to it.
onnx_model = ModelWrapper(export_onnx_path)

export_finn_onnx_path = 'onnx_models/mobilenetv2_w4a4_finn.onnx'

# Transform the wrapped ONNX graph, not the PyTorch module: before this line
# `model` is still the torch network, which has no `.transform`. From here on
# `model` refers to the FINN ModelWrapper.
model = onnx_model.transform(ConvertQONNXtoFINN())
model.save(export_finn_onnx_path)


In [None]:
# View the model file written to `export_finn_onnx_path`.
showInNetron(export_finn_onnx_path)

In [None]:
#Transformations and Tidy up

from qonnx.transformation.general import GiveReadableTensorNames, GiveUniqueNodeNames, RemoveStaticGraphInputs
from qonnx.transformation.infer_shapes import InferShapes
from qonnx.transformation.infer_datatypes import InferDataTypes
from qonnx.transformation.fold_constants import FoldConstants

# Apply the tidy-up passes one after another, in this exact order.
tidy_up_passes = (
    InferShapes,
    FoldConstants,
    GiveUniqueNodeNames,
    GiveReadableTensorNames,
    InferDataTypes,
    RemoveStaticGraphInputs,
)
for transformation_cls in tidy_up_passes:
    model = model.transform(transformation_cls())

model.save("onnx_models/mobilenetv2_w4a4_tidy.onnx")
           

In [None]:
# View the tidied model file.
showInNetron("onnx_models/mobilenetv2_w4a4_tidy.onnx")

In [None]:
from finn.util.pytorch import ToTensor
from qonnx.transformation.merge_onnx_models import MergeONNXModels
from qonnx.core.datatype import DataType

# Reload the tidied model and look up its first graph input.
model = ModelWrapper("onnx_models/mobilenetv2_w4a4_tidy.onnx")
# Attribute access, not a tuple: the earlier `input[0],name` raised NameError.
global_inp_name = model.graph.input[0].name
ishape = model.get_tensor_shape(global_inp_name)

# preprocessing: torchvision's ToTensor divides uint8 inputs by 255
totensor_pyt = ToTensor()
chkpt_preproc_name = "onnx_models/mobilenetv2_w4a4_preproc.onnx"

# Export the preprocessing module with the same input shape as the core model,
# clean it up and convert it to the FINN dialect.
export_qonnx(totensor_pyt, torch.randn(ishape), chkpt_preproc_name)
qonnx_cleanup(chkpt_preproc_name, out_file=chkpt_preproc_name)
pre_model = ModelWrapper(chkpt_preproc_name)
pre_model = pre_model.transform(ConvertQONNXtoFINN())


# join preprocessing and core model
model = model.transform(MergeONNXModels(pre_model))
# The merged graph has a new first input; annotate it as UINT8.
global_inp_name = model.graph.input[0].name
model.set_tensor_datatype(global_inp_name, DataType["UINT8"])

model.save("onnx_models/mobilenetv2_w4a4_with_preproc.onnx")


In [None]:
# View the model file that includes the merged preprocessing graph.
showInNetron("onnx_models/mobilenetv2_w4a4_with_preproc.onnx")

In [None]:
from qonnx.transformation.insert_topk import InsertTopK

# postprocessing: insert Top-1 node at the end
chkpt_name = "onnx_models/mobilenetv2_w4a4_pre_post.onnx"
model = model.transform(InsertTopK(k=1))

# tidy-up again, same passes and order as after the FINN conversion
post_topk_passes = (
    InferShapes,
    FoldConstants,
    GiveUniqueNodeNames,
    GiveReadableTensorNames,
    InferDataTypes,
    RemoveStaticGraphInputs,
)
for transformation_cls in post_topk_passes:
    model = model.transform(transformation_cls())
model.save(chkpt_name)


In [None]:
# View the model file with pre- and post-processing attached.
showInNetron("onnx_models/mobilenetv2_w4a4_pre_post.onnx")

In [None]:
#Streamline the model

from finn.transformation.streamline import Streamline
from finn.transformation.streamline.reorder import MoveScalarLinearPastInvariants
import finn.transformation.streamline.absorb as absorb
#showSrc(Streamline)

# Start from the checkpoint that has pre- and post-processing attached.
model = ModelWrapper("onnx_models/mobilenetv2_w4a4_pre_post.onnx")
# transform() returns the transformed model, so the result must be kept;
# the previous bare call discarded it and the pass had no effect.
model = model.transform(MoveScalarLinearPastInvariants())

model = model.transform(Streamline())
model.save("onnx_models/mobilenetv2_w4a4_streamlined.onnx")

showInNetron("onnx_models/mobilenetv2_w4a4_streamlined.onnx")

In [None]:
from qonnx.transformation.bipolar_to_xnor import ConvertBipolarMatMulToXnorPopcount
from finn.transformation.streamline.round_thresholds import RoundAndClipThresholds
from qonnx.transformation.infer_data_layouts import InferDataLayouts
from qonnx.transformation.general import RemoveUnusedTensors

# Passes applied after streamlining, in order. The AbsorbScalarMulAddIntoTopK
# step absorbs the final add-mul nodes into TopK; the last two entries are a
# bit of tidy-up.
post_streamline_passes = (
    ConvertBipolarMatMulToXnorPopcount,
    absorb.AbsorbAddIntoMultiThreshold,
    absorb.AbsorbMulIntoMultiThreshold,
    absorb.AbsorbScalarMulAddIntoTopK,
    RoundAndClipThresholds,
    InferDataLayouts,
    RemoveUnusedTensors,
)
for transformation_cls in post_streamline_passes:
    model = model.transform(transformation_cls())

ready_for_hw_path = "onnx_models/mobilenetv2_w4a4_ready_for_hw_conversion.onnx"
model.save(ready_for_hw_path)
showInNetron(ready_for_hw_path)

In [None]:
# Conversion to HW layers

import finn.transformation.fpgadataflow.convert_to_hw_layers as to_hw

model = ModelWrapper("onnx_models/mobilenetv2_w4a4_ready_for_hw_conversion.onnx")

# Inference passes, in order:
#   1. binary matrix-vector activation layers
#   2. TopK to LabelSelect
#   3. input quantization (if any) to standalone thresholding
# NOTE(review): only these three passes are applied; confirm they are enough
# for a 4-bit convolutional network before relying on the result.
hw_inference_passes = (
    to_hw.InferBinaryMatrixVectorActivation,
    to_hw.InferLabelSelectLayer,
    to_hw.InferThresholdingLayer,
)
for transformation_cls in hw_inference_passes:
    model = model.transform(transformation_cls())

hw_layers_path = "onnx_models/mobilenetv2_w4a4_hw_layers.onnx"
model.save(hw_layers_path)
showInNetron(hw_layers_path)

In [None]:
from finn.transformation.fpgadataflow.create_dataflow_partition import CreateDataflowPartition

# Load the HW-layer model from the directory it was saved to
# ("onnx_models/", not "onnx_model/").
model = ModelWrapper("onnx_models/mobilenetv2_w4a4_hw_layers.onnx")
parent_model = model.transform(CreateDataflowPartition())

parent_model.save("onnx_models/mobilenetv2_w4a4_dataflow_parent.onnx")
showInNetron("onnx_models/mobilenetv2_w4a4_dataflow_parent.onnx")

In [None]:
from qonnx.custom_op.registry import getCustomOp

# Take the first StreamingDataflowPartition node of the parent graph, wrap it
# as a custom op and read the filename stored in its "model" attribute.
partition_nodes = parent_model.get_nodes_by_op_type("StreamingDataflowPartition")
sdp_node = getCustomOp(partition_nodes[0])
dataflow_model_filename = sdp_node.get_nodeattr("model")
showInNetron(dataflow_model_filename)

In [None]:
# Rebind `model` to the graph stored at `dataflow_model_filename`.
model = ModelWrapper(dataflow_model_filename)

In [None]:
# Request the "hls" implementation style for the first Thresholding node.
thresholding_nodes = model.get_nodes_by_op_type("Thresholding")
thresh_node = thresholding_nodes[0]
thresh_node_inst = getCustomOp(thresh_node)
thresh_node_inst.set_nodeattr("preferred_impl_style", "hls")

In [None]:
from finn.transformation.fpgadataflow.specialize_layers import SpecializeLayers
# Run SpecializeLayers, then save and view the result.
specialized_path = "onnx_models/mobilenetv2_w4a4_specialize_layers.onnx"
model = model.transform(SpecializeLayers())

model.save(specialized_path)
showInNetron(specialized_path)

In [None]:
# Wrap the second node of the graph as a custom op and list the node
# attribute types it exposes.
fc0 = model.graph.node[1]
fc0w = getCustomOp(fc0)

wrapper_class_name = type(fc0w).__name__
print("CustomOp wrapper is of class " + wrapper_class_name)

fc0w.get_nodeattr_types()

In [None]:
fc_layers = model.get_nodes_by_op_type("MVAU_hls")
# (PE, SIMD, in_fifo_depth, out_fifo_depth, ramstyle) for each layer
# NOTE(review): zip() below stops at the shorter sequence, so only the first
# four MVAU_hls nodes are configured -- confirm these values and the layer
# count match this network.
config = [
    (16, 49, [16], [64], "block"),
    (8, 8, [64], [64], "auto"),
    (8, 8, [64], [64], "auto"),
    (10, 8, [64], [10], "distributed"),
]
folding_attr_names = ("PE", "SIMD", "inFIFODepths", "outFIFODepths", "ram_style")
for layer_node, layer_settings in zip(fc_layers, config):
    layer_inst = getCustomOp(layer_node)
    # Attributes are written in the same order as the tuple fields.
    for attr_name, attr_value in zip(folding_attr_names, layer_settings):
        layer_inst.set_nodeattr(attr_name, attr_value)

# set parallelism for input quantizer to be same as first layer's SIMD
inp_qnt_node = model.get_nodes_by_op_type("Thresholding_hls")[0]
inp_qnt = getCustomOp(inp_qnt_node)
inp_qnt.set_nodeattr("PE", 49)

In [None]:
# Save the model with the folding attributes set, then view the saved file.
folded_path = "onnx_models/mobilenetv2_w4a4_set_folding_factors.onnx"
model.save(folded_path)
showInNetron(folded_path)

In [None]:
#Evaluate the Inference Cost
#First, we have to export the model to Brevitas quantized variant of the ONNX interchange format.
#Radio-ml-challenge-21

from brevitas.export.onnx.generic.manager import BrevitasONNXManager
from finn.util.inference_cost import inference_cost
import json

# Input model, processed model and JSON report used by inference_cost().
# NOTE(review): nothing in this cell writes `export_inference_onnx_path`;
# confirm the file is produced elsewhere before running.
export_inference_onnx_path = "util/inference_cost/model_export.onnx"
final_onnx_path = "util/inference_cost/model_final.onnx"
cost_dict_path = "util/inference_cost/model_cost.json"

inference_cost(
    export_inference_onnx_path,
    output_json=cost_dict_path,
    output_onnx=final_onnx_path,
    preprocess=True,
    discount_sparsity=True,
)

# Read the report back from disk.
with open(cost_dict_path, 'r') as cost_file:
    inference_cost_dict = json.load(cost_file)

bops = int(inference_cost_dict["total_bops"])
w_bits = int(inference_cost_dict["total_mem_w_bits"])

# Reference values the two metrics are normalized against.
# NOTE(review): the origin of these baselines is not shown here -- confirm
# they are the right reference for this model.
bops_baseline = 807699904
w_bits_baseline = 1244936

# Equal-weight average of the two normalized costs.
bops_ratio = bops / bops_baseline
w_bits_ratio = w_bits / w_bits_baseline
score = 0.5 * bops_ratio + 0.5 * w_bits_ratio
print("Normalized inference cost score: %f" % score)