In [63]:
import cv2
import numpy as np

import tvm
from tvm import relay
from tvm.contrib import graph_executor
from tvm.contrib.download import download_testdata

import torch
import torchvision
from tqdm import tqdm

In [64]:
import tvm
from tvm import relay
from tvm.contrib import graph_executor
import numpy as np

class NeuronCoverageTrackerTVM:
    def __init__(self, mod, params, dummy_data, target_ops=["nn.conv2d", "nn.dense"], threshold=0.9, opt_level=1, device="cpu", input_name="input0"):
        self.mod = mod
        self.params = params
        self.target_ops = set(target_ops)
        self.threshold = threshold
        self.opt_level = opt_level
        self.device = device
        self.input_name = input_name

        self.target = "llvm" if self.device == "cpu" else "cuda"
        print("build model...")
        self.model = self._build_model()
        print("build intermediate layers...")
        self.relay_layer_functions, self.layer_mods, self.layer_models = self._get_model_layers()
        print("get the output sizes of each layer...")
        self.output_shape_of_layers = self._get_layer_output_sizes(dummy_data)
        self.cov_tracker = {}

    def _build_model(self):
        with tvm.transform.PassContext(opt_level=self.opt_level):
            lib = relay.build(self.mod, target=self.target, params=self.params)
        return graph_executor.GraphModule(lib["default"](tvm.device(self.target, 0)))

    def _get_model_layers(self):
        set_layers = set({})
        layers = []
        
        def _traverse_expr(expr, set_layers, layers):
            if isinstance(expr, relay.expr.Call):
                if expr.op.name in self.target_ops:
                    if expr not in set_layers:
                        set_layers |= {expr}
                        layers.append(expr)
                for arg in expr.args:
                    _traverse_expr(arg, set_layers, layers)
            elif isinstance(expr, relay.expr.TupleGetItem):
                for arg in expr.tuple_value.args:
                    _traverse_expr(arg, set_layers, layers)
                    
        main_func = self.mod["main"]
        expr = main_func.body
        _traverse_expr(expr, set_layers, layers)
        
        relay_layer_functions =  [relay.Function(main_func.params, layer) for layer in layers]
        layer_mods = [tvm.IRModule.from_expr(rlf) for rlf in relay_layer_functions]
        layer_libs = []
        for lm in tqdm(layer_mods):
            with tvm.transform.PassContext(opt_level=self.opt_level):
                lib = relay.build(lm, target=self.target, params=self.params)
                layer_libs.append(lib)
        layer_models = [graph_executor.GraphModule(lib["default"](tvm.device(self.target, 0))) for lib in layer_libs]
        return relay_layer_functions, layer_mods, layer_models

    def _get_layer_output_sizes(self, dummy_data):
        output_shapes = []
        
        for tmp_model in self.layer_models:
            tmp_model.set_input(self.input_name, tvm.nd.array(input_data))
            tmp_model.run()
            output_shapes.append(tmp_model.get_output(0).asnumpy().shape)

        return output_shapes

    def _get_intermediate_outputs(self, data, layer_idx):
        tmp_model = self.layer_models[layer_idx]
        tmp_model.set_input(self.input_name, tvm.nd.array(input_data))
        tmp_model.run()
        return tmp_model.get_output(0).asnumpy()

    def _init_cov_tracker(self):
        for i, layer_shape in enumerate(self.output_shape_of_layers):
            self.cov_tracker[i] = np.zeros(layer_shape, dtype=bool)

    def _update_cov_tracker(self, x):
        for i in range(len(self.layer_models)):
            layer_output = self._get_intermediate_outputs(x, i)
            layer_output_scaled = self._scale(layer_output)
            mask_index = layer_output_scaled > self.threshold
            self.cov_tracker[i] |= mask_index

    def _scale(self, out, rmax=1, rmin=0):
        out_std = (out - out.min()) / (out.max() - out.min())
        return out_std * (rmax - rmin) + rmin

    def coverage(self, dataloader, pos_of_x=None, initialize=True, update=True):
        if initialize:
            self._init_cov_tracker()
        if not update:
            cov_tracker_prev = self.cov_tracker.copy()

        for data in dataloader:
            x = data if pos_of_x is None else data[pos_of_x]
            x = np.array(x)
            self._update_cov_tracker(x)

        num_covered_neurons = sum([np.sum(is_covered) for is_covered in self.cov_tracker.values()])
        num_total_neurons = sum([len(is_covered.reshape(-1)) for is_covered in self.cov_tracker.values()])

        if not update:
            self.cov_tracker = cov_tracker_prev

        return num_covered_neurons / num_total_neurons

In [65]:
model_name = "resnet18"
model = getattr(torchvision.models, model_name)(pretrained=True)
model = model.eval()

# We grab the TorchScripted model via tracing
input_shape = [1, 3, 224, 224]
dummy_data = torch.randn(input_shape)
scripted_model = torch.jit.trace(model, dummy_data).eval()

input_name = "input0"
shape_list = [(input_name, input_shape)]
mod, params = relay.frontend.from_pytorch(scripted_model, shape_list)

#print(model.maxpool(model.bn1(model.conv1(input_data))).shape)
#print(model.layer1(model.maxpool(model.bn1(model.conv1(input_data)))).shape)
#print(model.layer2(model.layer1(model.maxpool(model.bn1(model.conv1(input_data))))).shape)
#print(model.layer3(model.layer2(model.layer1(model.maxpool(model.bn1(model.conv1(input_data)))))).shape)
#print(model.layer4(model.layer3(model.layer2(model.layer1(model.maxpool(model.bn1(model.conv1(input_data))))))).shape)
#print(model.avgpool(model.layer4(model.layer3(model.layer2(model.layer1(model.maxpool(model.bn1(model.conv1(input_data)))))))).shape)

In [66]:
nct = NeuronCoverageTrackerTVM(mod, params, dummy_data)

build model...
build intermediate layers...


100%|███████████████████████████████████████████████████████████████████████████████████| 21/21 [01:10<00:00,  3.36s/it]


get the output sizes of each layer...


In [67]:
img_url = (
    "https://raw.githubusercontent.com/dmlc/web-data/master/gluoncv/detection/street_small.jpg"
)
img_path = download_testdata(img_url, "test_street_small.jpg", module="data")

img = cv2.imread(img_path).astype("float32")
img = cv2.resize(img, (input_shape[-2], input_shape[-1]))
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
img = np.transpose(img / 255.0, [2, 0, 1])
img = np.expand_dims(img, axis=0)

In [68]:
nct.coverage([img])

0.0008061296439989826

In [57]:
sum([np.sum(is_covered) for is_covered in nct.cov_tracker.values()])

2003

In [62]:
sum([np.sum(is_covered) for is_covered in nct.cov_tracker.values()]) / sum([len(is_covered.reshape(-1)) for is_covered in nct.cov_tracker.values()])

0.0008061296439989826

In [60]:
for is_covered in nct.cov_tracker.values():
    print(is_covered.reshape(-1))

(1, 1000)
(1, 512, 7, 7)
(1, 512, 7, 7)
(1, 512, 7, 7)
(1, 512, 7, 7)
(1, 256, 14, 14)
(1, 256, 14, 14)
(1, 256, 14, 14)
(1, 256, 14, 14)
(1, 128, 28, 28)
(1, 128, 28, 28)
(1, 128, 28, 28)
(1, 128, 28, 28)
(1, 64, 56, 56)
(1, 64, 56, 56)
(1, 64, 56, 56)
(1, 64, 56, 56)
(1, 64, 112, 112)
(1, 128, 28, 28)
(1, 256, 14, 14)
(1, 512, 7, 7)


In [11]:
target = "llvm"
with tvm.transform.PassContext(opt_level=1):
    lib = relay.build(mod, target=target, params=params)
model = graph_executor.GraphModule(lib["default"](tvm.device(target, 0)))

One or more operators have not been tuned. Please tune your model for better performance. Use DEBUG logging level to see more details.


In [12]:
main_func = mod["main"]
expr = main_func.body
set_layers = set({})
layers = []

def traverse_expr(expr):
    global set_layers, layers
    if isinstance(expr, relay.expr.Call):
        if expr.op.name == "nn.conv2d" or expr.op.name == "nn.dense":
            if expr not in set_layers:
                set_layers |= {expr}
                layers.append(expr)
        for arg in expr.args:
            traverse_expr(arg)
            
    elif isinstance(expr, relay.expr.TupleGetItem):
        for arg in expr.tuple_value.args:
            traverse_expr(arg)

traverse_expr(expr)

In [13]:
assert len(layers) == 21

In [14]:
relay_layer_functions = [relay.Function(main_func.params, layer) for layer in layers]
layer_mods = [tvm.IRModule.from_expr(rlf) for rlf in relay_layer_functions]
layer_libs = []

target = "llvm" #if self.device == "cpu" else "cuda"
for lm in layer_mods:
    with tvm.transform.PassContext(opt_level=1):
        lib = relay.build(lm, target=target, params=params)
        layer_libs.append(lib)

In [15]:
layer_models = [graph_executor.GraphModule(lib["default"](tvm.device(target, 0))) for lib in layer_libs]

In [16]:
dummy_output_list = []

for tmp_model in layer_models:
    tmp_model.set_input("input0", tvm.nd.array(input_data))
    tmp_model.run()
    dummy_output_list.append(tmp_model.get_output(0).asnumpy())

In [19]:
output_sizes = []
for do in dummy_output_list:
    output_sizes.append(do.shape[1:])

In [17]:
for do in dummy_output_list:
    print(do.shape)

(1, 1000)
(1, 512, 7, 7)
(1, 512, 7, 7)
(1, 512, 7, 7)
(1, 512, 7, 7)
(1, 256, 14, 14)
(1, 256, 14, 14)
(1, 256, 14, 14)
(1, 256, 14, 14)
(1, 128, 28, 28)
(1, 128, 28, 28)
(1, 128, 28, 28)
(1, 128, 28, 28)
(1, 64, 56, 56)
(1, 64, 56, 56)
(1, 64, 56, 56)
(1, 64, 56, 56)
(1, 64, 112, 112)
(1, 128, 28, 28)
(1, 256, 14, 14)
(1, 512, 7, 7)


In [None]:
output_sizes = {}
for i, (name, _) in enumerate(self.layer_dict.items()):
    output = self._get_intermediate_outputs(dummy_data, i)
    output_sizes[name] = output.shape[1:]
return output_sizes