In [1]:
# !pip3 install brevitas -q

In [1]:
import sys, os, shutil, json
import cv2 as cv
import matplotlib.pyplot as plt
import torch
from torch.nn import Module, Conv2d, ReLU6, BatchNorm2d, MaxPool2d
# from brevitas_examples.bnn_pynq.models.common import CommonWeightQuant, CommonActQuant
# import brevitas.nn as qnn 
# import brevitas.quant as quant 
# from brevitas.core.restrict_val import RestrictValueType

sys.path.append('..')
# from .. 
import training,preprocessing, utils, metrics
import numpy as np
# Override settings on the project-local `metrics` / `preprocessing` modules
# (imported from the parent directory, see sys.path.append('..') above).
# NOTE(review): the exact effect of each flag is defined inside those modules
# and is not visible in this notebook - confirm there before changing them.
metrics.CONSTANTS.OLD_TORCH = True
metrics.CONSTANTS.BBOX_BORDER_WIDTH = 1

preprocessing.YoloDataGenerator.NAIVE_RESIZE = True
# Run on the GPU when one is available, otherwise on the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
# Dataset root handed to every YoloDataGenerator created below.
dataset_local_path = '../../DATASETS/Merged_dataset'
# Fold definition loaded from disk; later cells read folds.test_set and index
# the object to obtain the validation split.
# NOTE(review): presumably a pickle file (".pkl") - only load trusted files.
folds = training.load_folds('../folds_state_path_bbox.pkl')
# Batch size used when the validation/test generators are constructed.
batch_size = 1

In [3]:
class Clamp(torch.nn.Module):
    """Clamp activations to the range ``[-1, 1]``.

    The module is created without any parameters, so it contributes no entries
    to the parent's ``state_dict``.  After :meth:`add_conv` has been called the
    clamp is evaluated with three fixed depthwise 1x1 convolutions and ReLUs::

        relu(x + 1) - relu(x - 1) - 1  ==  clamp(x, -1, 1)

    (equal up to floating-point rounding).  Before :meth:`add_conv` is called,
    ``forward`` falls back to ``torch.clamp`` instead of failing on the missing
    convolution attributes.

    :param ch_in: number of channels of the input; used as the in/out channel
                  count and the ``groups`` value of the convolutions.
    """

    def __init__(self,ch_in) -> None:
        super().__init__()
        self.ch_in = ch_in
        torch_type = torch.float32
        # Plain tensors (neither parameters nor buffers): they are not part of
        # the state_dict and are not read anywhere in this class.  Kept only so
        # that existing code accessing these attributes keeps working.
        self.c1 = torch.tensor(1,dtype=torch_type)
        self.nc1 = torch.tensor(-1,dtype=torch_type)
        self.nc2 = torch.tensor(-2,dtype=torch_type)
        self.r = torch.nn.ReLU()
        # Set to True by add_conv(); plain bool, so the state_dict stays empty.
        self._conv_ready = False

    def add_conv(self):
        """Create the three depthwise 1x1 convolutions that implement the clamp.

        Weights and biases are filled with constants:
        ``conv1(x) = x + 1``, ``conv2(x) = x - 1``, ``conv3(v) = -v - 1``.
        The layers are created on the default device; the caller is expected
        to move the module to the target device afterwards.
        """
        ch_in = ch_out = self.ch_in
        self.conv1 = Conv2d(ch_in,ch_out,1,groups=self.ch_in,padding=0,bias=True)
        self.conv2 = Conv2d(ch_in,ch_out,1,groups=self.ch_in,padding=0,bias=True)
        self.conv3 = Conv2d(ch_in,ch_out,1,groups=self.ch_in,padding=0,bias=True)

        with torch.no_grad():
            self.conv1.weight[:] = 1.0
            self.conv1.bias[:] = 1.0
            self.conv2.weight[:] = 1.0
            self.conv2.bias[:] = -1.0
            self.conv3.weight[:] = -1.0
            self.conv3.bias[:] = -1.0

        self._conv_ready = True

    def forward(self,x):
        """Return ``x`` limited to ``[-1, 1]`` element-wise."""
        if not self._conv_ready:
            # add_conv() has not been called yet: use the direct formulation.
            return torch.clamp(x, -1.0, 1.0)

        v1 = self.r(self.conv1(x))   # relu(x + 1)
        v2 = self.r(self.conv2(x))   # relu(x - 1)
        v3 = self.conv3(v2)          # -relu(x - 1) - 1
        x = v1+v3

        return x

# use_brevitas = True
use_brevitas = False
def blk(ch_in,ch_out,clamp=True,mp=True):
    """Build the layers of one convolutional block.

    The block is a 3x3 convolution (padding 1, no bias) followed by batch
    normalisation, optionally followed by a clamp/activation-quantisation
    layer and a 2x2 max pooling.  Which layer classes are used depends on the
    module-level ``use_brevitas`` switch.

    :param ch_in: number of input channels of the convolution
    :param ch_out: number of output channels of the convolution
    :param clamp: append the clamp (or brevitas ``QuantIdentity``) layer
    :param mp: append the 2x2 max-pooling layer
    :return: tuple of modules, ready to be unpacked into ``torch.nn.Sequential``
    """
    if not use_brevitas:
        # Plain PyTorch variant.
        layers = (
            Conv2d(ch_in, ch_out, 3, padding=1, bias=False),
            BatchNorm2d(ch_out),
        )
        if clamp:
            layers += (Clamp(ch_out),)
        if mp:
            layers += (MaxPool2d(2, 2),)
        return layers

    # Brevitas variant; imported lazily so brevitas is only required when used.
    import brevitas.nn as qnn
    from brevitas_examples.bnn_pynq.models.common import CommonActQuant, CommonWeightQuant

    layers = (
        qnn.QuantConv2d(ch_in, ch_out, 3, padding=1, bias=False,
                        weight_quant=CommonWeightQuant, weight_bit_width=8),
        BatchNorm2d(ch_out),
    )
    if clamp:
        layers += (qnn.QuantIdentity(act_quant=CommonActQuant, bit_width=8),)
    if mp:
        layers += (qnn.QuantMaxPool2d(2, 2),)
    return layers

def input_layer():
    """Return the optional input-quantisation layer as a tuple.

    With ``use_brevitas`` disabled there is no input layer and an empty tuple
    is returned.  Otherwise a single brevitas ``QuantIdentity`` configured for
    the Q1.7 input format is returned.

    :return: tuple with zero or one module, ready to be unpacked into
             ``torch.nn.Sequential``
    """
    if not use_brevitas:
        return ()

    # Imported lazily so brevitas is only required when it is actually used.
    import brevitas.nn as qnn
    from brevitas_examples.bnn_pynq.models.common import CommonActQuant, CommonWeightQuant
    from brevitas.inject.enum import RestrictValueType

    # for Q1.7 input format
    quant_input = qnn.QuantIdentity(
        act_quant=CommonActQuant,
        bit_width=8,
        min_val=-1.0,
        max_val=1.0,
        narrow_range=False,
        restrict_scaling_type=RestrictValueType.POWER_OF_TWO,
    )
    return (quant_input,)

# float
# Six conv blocks; the last two keep the spatial size (no max pooling) and the
# final one has no clamp and 30 output channels.
net = torch.nn.Sequential(
    # *input_layer(),
    *blk(3,16),
    *blk(16,32),
    *blk(32,64),
    *blk(64,64),
    *blk(64,64,mp=False),
    *blk(64,30,mp=False,clamp=False),

).to(device)

# NOTE: torch.load unpickles the file - only load checkpoints from a trusted source.
sd = torch.load('best.pt',map_location=device)['model']
v1 = list(sd.values())
k2 = list(net.state_dict().keys()) # net keys

# The checkpoint tensors are matched to the model purely by position, so the key
# names stored in the checkpoint are ignored.  zip() stops at the shorter
# sequence without complaining, therefore report a count mismatch explicitly.
if len(v1) != len(k2):
    import warnings
    warnings.warn(
        "Checkpoint holds {} tensors but the model expects {}; "
        "tensors are assigned by position, surplus entries are ignored."
        .format(len(v1), len(k2)))

net_sd = dict(zip(k2, v1))
net.load_state_dict(net_sd)

# The clamp convolutions are added only after the weights are loaded: before
# add_conv() the Clamp modules own no parameters, which keeps the positional
# mapping above aligned with the checkpoint.
for m in net.modules():
    if isinstance(m,Clamp):
        m.add_conv()
# Move the freshly created clamp convolutions to the target device as well.
net.to(device)
# Inference mode (eval() is the same as train(False)).
net = net.eval()


# for k,v in net.state_dict().items():
#     print(k,v.shape)
#     if 'conv' in k:
#         print(v.flatten())

In [4]:
# Input image shape; the first two entries are used as the spatial size of the
# network input below and the last entry is the channel count.
image_shape = (160, 320, 3)

after_load = preprocessing.numpy_to_torch_iou_params(device)


def to_anchors_single(*x):
    """Forward the arguments to ``to_anchors_for_iou_loss`` with both trailing flags set to False."""
    return preprocessing.to_anchors_for_iou_loss(*x, False, False)
# to_anchors_multi = lambda *x: preprocessing.to_anchors_for_iou_loss(*x,True,False)


# One (two-value) row per anchor.
anchors = np.array([
    [10.762251, 13.063103],
    [25.158768, 42.200066],
    [19.567272, 25.438337],
    [91.87796, 35.945087],
    [38.639523, 69.15513],
]).reshape((-1, 2))

# Push a random batch through the network to discover the output shape.
torch.cuda.empty_cache()
dummy_batch = torch.rand((8, 3) + image_shape[:2]).to(device)
print("Input shape =", dummy_batch.shape)
with torch.no_grad():
    dummy_out = net(dummy_batch)
print("Result shape =", dummy_out.shape)

# YOLO parameters: the two trailing dimensions of the output, in reversed order.
output_sizes = np.array(dummy_out.shape[2:][::-1])
del dummy_batch, dummy_out

print("Anchors: ")
print(anchors)
print("Output sizes: ")
print(output_sizes)

# CREATE GENERATORS
def numpy_to_tensor(X,y,device=device):
    """Convert a batch with the project helper ``utils.data_to_tensor_v3``.

    :param X: first argument passed to the helper
    :param y: second argument passed to the helper
    :param device: target device; defaults to the notebook-wide ``device``
    :return: whatever ``utils.data_to_tensor_v3`` returns
    """
    converted = utils.data_to_tensor_v3(X, y, device)
    return converted

def _make_eval_generator(name):
    """Create a ``YoloDataGenerator`` without augmentation and with an empty sample list.

    All settings come from the notebook-level configuration; only the
    generator name differs between the generators created here.
    """
    return preprocessing.YoloDataGenerator(
        dataset_local_path,
        input_shape=image_shape,
        anchors=anchors,
        images_labes=[],  # fresh list per generator; filled in below
        batch_size=batch_size,
        name=name,
        augmentator=None,
        output_size=output_sizes,
        after_load=after_load,
        bbox_to_anchors=to_anchors_single,
    )


val_generator = _make_eval_generator('ValGenerator')
test_generator = _make_eval_generator('TestGenerator')

# Validation samples come from fold 0, test samples are the first 3000 of the test set.
_, val_set = folds.__getitem__(0, train_folds=4)
val_generator.images_labes = val_set
test_generator.images_labes = folds.test_set[:3000]

# decorator -> reorder channels before metric calculation
metric_iou = metrics.SingleObjectIOUsBasedMetrics(anchors, image_shape, device)
def mean_iou(y_pred:torch.Tensor, y_ref, metric_iou=metric_iou):
    """Reorder the channels of the network output and pass it to ``metric_iou``.

    The 30 output channels are read as 5 groups of 6 channels and regrouped
    into 6 groups of 5 channels.  Of those, the fifth group (channels 20..24)
    is moved to the front, followed by the first four groups (channels 0..19);
    the sixth group (channels 25..29) is dropped, leaving 25 channels.
    NOTE(review): presumably 5 anchors x 6 values per anchor - confirm against
    the training code.

    The spatial size is taken from ``y_pred`` itself instead of being fixed to
    10x20, and ``reshape`` is used so non-contiguous inputs are accepted too.

    :param y_pred: network output of shape (N, 30, H, W)
    :param y_ref: reference data forwarded unchanged to ``metric_iou``
    :param metric_iou: callable ``metric_iou(y_pred, y_ref)`` computing the score
    :return: the value returned by ``metric_iou``
    """
    grid_h, grid_w = y_pred.shape[-2:]
    # possible reorder
    y_pred = (y_pred.reshape(-1,5,6,grid_h,grid_w)
                    .permute(0,2,1,3,4)
                    .reshape(-1,30,grid_h,grid_w))
    # y_pred = y_pred[:,:25,...].contiguous()
    y_pred = torch.cat([y_pred[:,20:25,...],y_pred[:,:20,...]],dim=1)

    return metric_iou(y_pred, y_ref)



Input shape = torch.Size([8, 3, 160, 320])
Result shape = torch.Size([8, 30, 10, 20])
Anchors: 
[[10.762251 13.063103]
 [25.158768 42.200066]
 [19.567272 25.438337]
 [91.87796  35.945087]
 [38.639523 69.15513 ]]
Output sizes: 
[20 10]


In [5]:
import torch
import torch.nn as nn


def evaluate(model,
             dataloader,
             evaluator
             ):
    """
    Evaluate ``model`` on every batch of ``dataloader`` and return the mean score.

    The score is a running mean weighted by the number of samples per batch.
    Progress is printed on a single, continuously overwritten line.

    :param model: callable mapping an input batch to predictions
    :param dataloader: indexable object with ``len()``; ``dataloader[i]``
                       yields ``(X, Y)`` where ``X`` has a ``shape`` attribute
    :param evaluator: fcn/obj like: fcn(y_pred, y_ref) -> float
    :return: sample-weighted mean of the evaluator results
             (``0.0`` for an empty dataloader)
    """
    score = 0.0
    cntr = 0
    n_batches = len(dataloader)
    with torch.no_grad():
        for i in range(n_batches):
            XY = dataloader[i]
            # Rescale the inputs: x -> 2*x - 1.
            # NOTE(review): presumably maps images from [0, 1] to [-1, 1] -
            # confirm the value range produced by the data generator.
            X = XY[0]*2-1
            Y = XY[1]
            n_samples = X.shape[0]
            y_pred = model(X)
            # Update the running mean with this batch's contribution.
            score = score*cntr + n_samples*evaluator(y_pred, Y)
            cntr += n_samples
            score /= cntr
            print("\rEvaluation {}/{}. Score = {}".format(i,n_batches, score),end='')

        print("\rEvaluation {}/{}. Score = {}".format(n_batches,n_batches, score),end='\n')

    return score


def quantize(float_model:torch.nn.Module, 
             input_shape:tuple,
             quant_dir:str, 
             quant_mode:str, 
             device:torch.device,
             dataloader,
             evaluator):
    """
    Quantize ``float_model`` with the Vitis AI ``torch_quantizer``, evaluate the
    quantized model and export the result of the given quantization step.

    The function has to be run twice: first with ``quant_mode='calib'`` (exports
    the quantization config), then with ``quant_mode='test'`` (exports the xmodel).

    :param float_model: float model with loaded weights
    :param input_shape: image shape ordered (H, W, CH); the dummy input given to
                        the quantizer is built as (1, CH, H, W)
    :param quant_dir: path to directory with quantized model components
    :param quant_mode: quant_mode in ['calib', 'test'] 
    :param device: device the model is moved to and the quantizer runs on
    :param dataloader: indexable loader passed to ``evaluate``; ``dataloader[i]``
                       yields ``(X, Y)``.  Original author's note: for 'calib'
                       the batch size must be 1.
    :param evaluator: fcn/obj like: fcn(y_pred, y_ref) -> float 
    :return: None.  If creating the quantizer fails, the exception is printed
             and the function returns early without evaluating or exporting.
    """
    # available in docker or after packaging 
    # vitis-AI-tools/..../pytorch../pytorch_nndct
    # and installing the package
    from pytorch_nndct.apis import torch_quantizer
    # model to device
    model = float_model.to(device)

    # The VAI tutorial additionally defines `optimize = 1` ("force to merge BN
    # with CONV for better quantization accuracy"); the value was never passed
    # to the quantizer here, so the dead assignment has been removed.

    # Dummy input used by the quantizer to trace the model: (1, CH, H, W).
    input_shape = tuple(input_shape)  # also accept a list
    rand_in = torch.randn((1,)+input_shape[-1:]+input_shape[:2])
    print("get qunatizer start")
    try:
        quantizer = torch_quantizer(
            quant_mode, model, rand_in, output_dir=quant_dir, device=device)
    except Exception as e:
        # Best effort: report the problem and give up on this run.
        print("exception:")
        print(e)
        return
    print("get qunatizer end")

    print("get quantized model start")
    quantized_model = quantizer.quant_model
    print("get quantized model end")

    # evaluate
    print("testing st")
    evaluate(quantized_model, dataloader, evaluator)
    print("testing end")

    # export config
    if quant_mode == 'calib':
        print("export config")
        quantizer.export_quant_config()
        print("export config end")
    # export model
    if quant_mode == 'test':
        print("export xmodel")
        quantizer.export_xmodel(deploy_check=False, output_dir=quant_dir)
        print("export xmodel end")

    return

"""
Needed to run of quantize.
first with quant_mode = 'calib'
second with quant_mode = 'test'
"""


"\nNeeded to run of quantize.\nfirst with quant_mode = 'calib'\nsecond with quant_mode = 'test'\n"

In [6]:
# Use only a subset of the validation set.
# NOTE: this cell mutates the generators in place, so re-running it selects a
# (possibly different) subset again.
# start from the whole validation set
val_generator.images_labes = val_set
# shuffle samples
val_generator.on_epoch_end()
# keep only the first 200 samples of the (shuffled) list
subset = val_generator.images_labes[:200]
val_generator.images_labes = subset
# process only one image per forward
val_generator.batch_size = 1
test_generator.batch_size = 1

In [7]:
# Evaluate the float model: first on the test dataset, then on the validation subset.
for eval_generator in (test_generator, val_generator):
    evaluate(net, eval_generator, evaluator=mean_iou)


Evaluation 3000/3000. Score = 0.7209295767936862
Evaluation 200/200. Score = 0.7442172173783184


In [8]:
# Quantize model - calib
# First of the two quantize() runs: in 'calib' mode the quantized model is
# evaluated on the validation subset selected above and the quantization config
# is then exported into quant_dir.
quantize(net, 
         image_shape,
         quant_dir='quant_dir',
         quant_mode='calib',
         device=device,
         dataloader=val_generator,
         evaluator=mean_iou)

No CUDA runtime is found, using CUDA_HOME='/usr/local/cuda'

[0;32m[NNDCT_NOTE]: Loading NNDCT kernels...[0m
get qunatizer start

[0;32m[NNDCT_NOTE]: Quantization calibration process start up...[0m

[0;32m[NNDCT_NOTE]: =>Quant Module is in 'cpu'.[0m

[0;32m[NNDCT_NOTE]: =>Parsing Sequential...[0m

[0;32m[NNDCT_NOTE]: =>Doing weights equalization...[0m

[0;32m[NNDCT_NOTE]: =>Quantizable module is generated.(quant_dir/Sequential.py)[0m
get qunatizer end
get quantized model start

[0;32m[NNDCT_NOTE]: =>Get module with quantization.[0m
get quantized model end
testing st
Evaluation 200/200. Score = 0.7024353872239587
testing end
export config

[0;32m[NNDCT_NOTE]: =>Exporting quant config.(quant_dir/quant_info.json)[0m
export config end


In [13]:

# Quantize model - test
# Second of the two quantize() runs: in 'test' mode the quantized model is
# evaluated on the test generator and the xmodel is exported into quant_dir.
import traceback

try:
    quantize(net, 
             image_shape,
             quant_dir='quant_dir',
             quant_mode='test',
             device=device,
#              dataloader=val_generator,
             dataloader=test_generator,
             evaluator=mean_iou)
except Exception as e:
    # Keep the notebook running, but show where the failure came from.
    # KeyboardInterrupt / SystemExit are deliberately not caught here, so the
    # cell can still be interrupted.
    print(e)
    traceback.print_exc()

get qunatizer start

[0;32m[NNDCT_NOTE]: Quantization test process start up...[0m

[0;32m[NNDCT_NOTE]: =>Quant Module is in 'cpu'.[0m

[0;32m[NNDCT_NOTE]: =>Parsing Sequential...[0m

[0;32m[NNDCT_NOTE]: =>Doing weights equalization...[0m

[0;32m[NNDCT_NOTE]: =>Quantizable module is generated.(quant_dir/Sequential.py)[0m
get qunatizer end
get quantized model start

[0;32m[NNDCT_NOTE]: =>Get module with quantization.[0m
get quantized model end
testing st
Evaluation 3000/3000. Score = 0.6688202833534429
testing end
export xmodel

[0;32m[NNDCT_NOTE]: =>Converting to xmodel ...[0m

[0;32m[NNDCT_NOTE]: =>Successfully convert 'Sequential' to xmodel.(quant_dir/Sequential_int.xmodel)[0m
export xmodel end


In [14]:
# Compile the quantized xmodel exported by quantize(..., quant_mode='test') with
# the Vitis AI compiler for the target described in arch.json; the compiled
# model is written to build_test/ under the name FINN_VAI.
!vai_c_xir --xmodel quant_dir/Sequential_int.xmodel --arch arch.json --net_name FINN_VAI --output_dir  build_test

**************************************************
* VITIS_AI Compilation - Xilinx Inc.
**************************************************
[UNILOG][INFO] The compiler log will be dumped at "/tmp/vitis-ai-user/log/xcompiler-20220223-090245-285893"
[UNILOG][INFO] Compile mode: dpu
[UNILOG][INFO] Debug mode: function
[UNILOG][INFO] Target architecture: DPUCZDX8G_CUSTOMIZED
[UNILOG][INFO] Graph name: Sequential, with op num: 156
[UNILOG][INFO] Begin to compile...
[UNILOG][INFO] Total device subgraph number 3, DPU subgraph number 1
[UNILOG][INFO] Compile done.
[UNILOG][INFO] The meta json is saved to "/workspace/TRAIN/Vitis_AI_FINN/build_test/meta.json"
[UNILOG][INFO] The compiled xmodel is saved to "/workspace/TRAIN/Vitis_AI_FINN/build_test/FINN_VAI.xmodel"
[UNILOG][INFO] The compiled xmodel's md5sum is 33510f34a2ed2650c4bf52b080448e01, and been saved to "/workspace/TRAIN/Vitis_AI_FINN/build_test/md5sum.txt"


In [106]:
# CREATE TEST EVAL SET
# Dump the first 3000 test samples as PNG images plus a JSON file with the
# ground-truth bounding boxes.
subset = folds.test_set[:3000]
d = {}
generator = preprocessing.YoloDataGenerator(
                            dataset_local_path,
                            input_shape=image_shape,
                            anchors=anchors,
                            images_labes=subset, 
                            batch_size=1,
                            name='Generator', 
                            augmentator=None,
                            output_size=output_sizes,
                            after_load=after_load,
                            bbox_to_anchors=to_anchors_single,
                            )

eval_root = 'eval_images_finn'
# cv.imwrite does not create missing directories (it just reports failure),
# so make sure the target folder exists.
os.makedirs(os.path.join(eval_root, 'images'), exist_ok=True)

n_images = len(generator)
for i in range(n_images):
    print("\rImages",i,'/',n_images,end='')
    img,Y = generator[i]
    # Channel-first tensor -> (H, W, C) numpy array; .cpu() makes the
    # conversion work when the generator returns CUDA tensors.
    img = img.cpu().reshape(3,-1).numpy().T.reshape(image_shape)
    img = (img*255).astype(np.uint8)
    bbox = np.round(Y[0].cpu().reshape(-1).numpy()).astype(int).flatten().tolist()

    new_path = 'images/img_'+(str(i).zfill(4))+'.png'
    dst = os.path.join(eval_root, new_path)
    # Only the first four values are kept (zip stops at the shorter input).
    bbox = dict(zip('ltrb', bbox))
    d[str(i)] = {'path':new_path,'bbox':bbox}
    # NOTE(review): cv.imwrite treats 3-channel arrays as BGR - confirm the
    # channel order produced by the generator.
    if not cv.imwrite(dst, img):
        raise IOError("Could not write image: " + dst)

print("\rImages",n_images,'/',n_images)
# print(d)
gt_path = os.path.join(eval_root, 'gt.json')
with open(gt_path,'w') as f:
    json.dump(d, f, indent=4)
    print('Groundtruth is saved under:','eval_images_finn/gt.json')

Images 3000 / 3000
Groundtruth is saved under: eval_images_finn/gt.json
