In [21]:
import torch
import pickle
from sklearn.preprocessing import LabelEncoder
from sctokenizer import CTokenizer

from models import StackLSTM

In [44]:
from collections import defaultdict

In [7]:
# Location of the pickled vocabulary that is loaded and used to fit the label encoder.
vocab_path = "vocab.pkl"

In [37]:
# NOTE(security): pickle.load can execute arbitrary code while unpickling;
# only point vocab_path at a file you created or otherwise trust.
with open(vocab_path, "rb") as input_file:
    vocab = pickle.load(input_file)

# Map every vocabulary entry to an integer id.
# LabelEncoder.fit() relearns the classes from scratch on each call, so the
# encoder is fitted exactly once, on the full vocabulary. Special tokens such
# as "<SOC>"/"<EOC>" are only encodable if they are themselves part of vocab.
le = LabelEncoder()
le.fit(list(vocab))


In [38]:
def process_and_encode(text):
    """Tokenize C source text and encode the tokens that are in the vocabulary.

    Tokens whose ``token_value`` is not in the global ``vocab`` are dropped.

    Args:
        text: source code handed to ``CTokenizer().tokenize``.

    Returns:
        A ``(code, lines)`` pair. ``code`` is a tensor built from
        ``le.transform`` applied to the kept token values; ``lines`` is a list
        with the ``line`` attribute of each kept token, in the same order.
    """
    values = []
    lines = []
    # Single pass: collect value and line number of every in-vocabulary token.
    for token in CTokenizer().tokenize(text):
        if token.token_value in vocab:
            values.append(token.token_value)
            lines.append(token.line)
    return torch.tensor(le.transform(values)), lines

In [33]:
# Small hand-written snippet used as a smoke-test input for process_and_encode and the model.
C_SAMPLE = """void test(void)
{
        char buf[MAXSIZE];
        cin>>buf;
        cout<<buf<<endl;
}"""

In [12]:
# Hyper-parameters handed to the StackLSTM constructor.
# NOTE(review): presumably these have to equal the values used when the
# "model_7" checkpoint was trained, otherwise load_state_dict would reject
# the weights — confirm against the training script.
HIDDEN_SIZE_CONTROLLER = 8  # passed as hidden_size_controller
EMBED_DIM = 164  # passed as embedding_dim
HIDDEN_SIZE_STACK = 8  # passed as hidden_size_stack

In [13]:
# Build the network and restore the trained weights from the "model_7" checkpoint.
model = StackLSTM(embedding_size=len(vocab),
                  embedding_dim=EMBED_DIM,
                  hidden_size_controller=HIDDEN_SIZE_CONTROLLER,
                  hidden_size_stack=HIDDEN_SIZE_STACK,
                  batch_size=1,
                  label_encoder=le)
# map_location="cpu" lets a checkpoint that was saved from GPU tensors load on
# a CPU-only machine; load_state_dict then copies the values into the model's
# own parameters, so the model's device is unaffected.
model.load_state_dict(torch.load("model_7", map_location="cpu"))
# Switch to inference mode. As the last expression of the cell this also
# displays the module summary.
model.eval()

StackLSTM(
  (embedding): Embedding(10001, 164)
  (controller): LSTMCell(172, 8)
  (output_linear): Linear(in_features=8, out_features=10001, bias=True)
  (softmax): Softmax(dim=None)
  (push_fc): Linear(in_features=8, out_features=1, bias=True)
  (pop_fc): Linear(in_features=8, out_features=1, bias=True)
  (values_fc): Linear(in_features=8, out_features=8, bias=True)
  (classifier): Linear(in_features=10001, out_features=1, bias=True)
  (sigmoid): Sigmoid()
)

In [39]:
# Smoke test: encode the sample snippet and run a single forward pass.
code, lines = process_and_encode(C_SAMPLE)

# unsqueeze(1) inserts a size-1 dimension at position 1 before the call
# (presumably the batch dimension, matching batch_size=1 — confirm against StackLSTM).
model(code.unsqueeze(1))

tensor([[0.5789]], grad_fn=<SigmoidBackward0>)

In [43]:
import csv

# Raise csv's per-field size limit before reading.
# NOTE(review): presumably needed because some cells hold whole functions — confirm.
csv.field_size_limit(1000000000)

# Maps the "index" column to the list of flawed source lines of that sample.
# Rows whose "target" column is "0" are left out.
groundtruth = {}
with open("../test.csv", "r", newline="") as csvfile:
    for row in csv.DictReader(csvfile):
        if row["target"] == "0":
            continue
        # DictReader yields "" for an empty cell and None only for a cell that
        # is missing from a short row; skip both, there is nothing to locate.
        if not row["flaw_line"]:
            continue
        # Several flawed lines share one cell, separated by "/~/".
        groundtruth[row["index"]] = row["flaw_line"].split("/~/")

In [41]:
import re

def sort_lines(scores):
    """Return the positions of ``scores`` ordered from highest to lowest score.

    Args:
        scores: anything ``torch.as_tensor`` accepts, e.g. a list of numbers
            or a 1-D tensor.

    Returns:
        list[int]: indices into ``scores``, best-scoring position first.
    """
    # Sorting is pure bookkeeping; make sure no autograd history is recorded.
    with torch.no_grad():
        ranking = torch.as_tensor(scores).sort(descending=True)
    return ranking.indices.tolist()


def get_flaw_indices(lines, flaw_lines):
    """Find the source lines that correspond to the ground-truth flawed lines.

    Both sides are compared with all whitespace removed, so differences in
    indentation or spacing do not prevent a match. A source line matches when
    it contains a flawed line or is contained in one (substring test in either
    direction).

    Args:
        lines: source lines of one file, in file order.
        flaw_lines: ground-truth flawed lines; entries that are empty once
            whitespace is removed are ignored.

    Returns:
        list[int]: 0-based positions in ``lines`` of the matching lines, in
        ascending order. Empty when nothing matches.
    """
    def clean(line):
        # Raw string on purpose: "\s" inside a normal literal is an invalid
        # escape sequence and triggers a warning on current Python versions.
        return re.sub(r"\s", "", line)

    # Clean each flawed line once and drop the ones that end up empty.
    flaw_lines = [cleaned for cleaned in map(clean, flaw_lines) if cleaned]

    indices = []
    for i, line in enumerate(map(clean, lines)):
        if not line:
            # An empty string is a substring of everything; never report it.
            continue
        if any(line in flaw_line or flaw_line in line for flaw_line in flaw_lines):
            indices.append(i)
    return indices


def min_rank_of_indices(sorted_indices, searched_indices):
    """Return the best (smallest) rank reached by any of the searched indices.

    Args:
        sorted_indices: indices in ranked order; the position of an index in
            this sequence is its rank (0 is best). If an index occurs more
            than once, its last position counts.
        searched_indices: indices to look up; those absent from
            ``sorted_indices`` are ignored.

    Returns:
        The smallest rank found, or ``float("inf")`` when none of the searched
        indices is ranked.
    """
    position_of = {}
    for position, ranked_index in enumerate(sorted_indices):
        position_of[ranked_index] = position

    best = float("inf")
    for wanted in searched_indices:
        if wanted in position_of and position_of[wanted] < best:
            best = position_of[wanted]
    return best

def get_c_lines(idx, set="test", label=1):
    """Read one sample's C source file and return its lines.

    The file read is ``../data/test/{set}/{idx}_{label}.c``, opened as UTF-8.

    Args:
        idx: sample identifier used in the file name.
        set: sub-directory of ``../data/test`` to read from.
        label: suffix used in the file name.

    Returns:
        list[str]: the lines of the file, line endings included.
    """
    source_path = f"../data/test/{set}/{idx}_{label}.c"
    with open(source_path, "r", encoding="utf-8") as source_file:
        return source_file.readlines()

In [130]:
from torchray.attribution.common import (
    NullContext,
    gradient_to_saliency,
    get_backward_gradient,
    attach_debug_probes,
    get_module,
    Probe,
    resize_saliency,
    imsmooth,
)

def gradient_to_grad_cam_saliency(x):
    r"""Turn an activation tensor and its gradient into a Grad-CAM style map.

    The gradient ``x.grad`` is averaged over dimensions 0 and 1, which gives
    one weight per index of dimension 2. The activations are multiplied by
    these weights and summed over dimension 2:

    .. math::
        s_{ij1} = \sum_{k} x_{ijk} \cdot
            \operatorname{mean}_{i',j'}\left(dx_{i'j'k}\right)

    No clamping to non-negative values is applied, so entries can be negative.

    Args:
        x (:class:`torch.Tensor`): activation tensor with at least three
            dimensions and a populated ``x.grad``.

    Returns:
        :class:`torch.Tensor`: tensor shaped like ``x`` except that
        dimension 2 has size 1.
    """
    # One weight per dimension-2 index: the gradient averaged over dims 0 and 1.
    pooled_gradient = torch.mean(x.grad, (0, 1), keepdim=True)

    # Weighted sum of the activations over dimension 2.
    return torch.sum(pooled_gradient * x, 2, keepdim=True)

def saliency(model,
             input,
             target,
             saliency_layer='',
             resize=False,
             resize_mode='bilinear',
             smooth=0,
             context_builder=NullContext,
             gradient_to_saliency=gradient_to_saliency,
             get_backward_gradient=get_backward_gradient,
             debug=False):
    """Adapted copy of torchray's ``saliency`` used by this notebook.

    Runs one forward and one backward pass through ``model`` while a probe
    records the output of ``saliency_layer``, then converts the recorded
    tensor into a saliency map with ``gradient_to_saliency``.

    Behaviour specific to this copy:

    * the passes are run with the model in training mode, and the mode the
      model had on entry is restored before returning;
    * ``output.backward()`` is called without an explicit gradient, so
      ``model(input)`` has to produce a single value;
    * parameter gradients are neither disabled nor cleared;
    * the probe always records the layer *output*;
    * no resizing or smoothing is applied to the map.

    Args:
        model: module to explain.
        input: passed unchanged to ``model``.
        target: unused; kept so existing callers keep working.
        saliency_layer: layer to record, resolved with ``get_module``.
        resize: unused; kept for signature compatibility.
        resize_mode: unused; kept for signature compatibility.
        smooth: unused; kept for signature compatibility.
        context_builder: callable returning the context manager that wraps
            the forward and backward pass.
        gradient_to_saliency: callable applied to the recorded tensor to
            produce the saliency map.
        get_backward_gradient: unused; kept for signature compatibility.
        debug: forwarded to ``attach_debug_probes``.

    Returns:
        Whatever ``gradient_to_saliency`` returns for the recorded tensor.

    Raises:
        AssertionError: if ``get_module`` does not find ``saliency_layer``.
    """
    # Record the caller's mode *before* changing it, so it can be restored.
    orig_is_training = model.training
    # NOTE(review): the passes are deliberately run in training mode —
    # presumably the backward pass of this model needs it; confirm.
    model.train()

    probe = None
    try:
        # Keep the return value referenced for the whole call.
        # NOTE(review): the debug probes may detach once they are garbage
        # collected — confirm against torchray.
        debug_probes = attach_debug_probes(model, debug=debug)

        # Attach a probe to the output of the saliency layer.
        saliency_layer = get_module(model, saliency_layer)
        assert saliency_layer is not None, 'We could not find the saliency layer'
        probe = Probe(saliency_layer, target="output")

        # Do a forward and backward pass.
        with context_builder():
            output = model(input)
            output.backward()

        # Get saliency map from the recorded tensor and its gradient.
        saliency_map = gradient_to_saliency(probe.data[0])
    finally:
        # Detach the probe and restore the original mode even if a pass failed.
        if probe is not None:
            probe.remove()
        model.train(orig_is_training)

    return saliency_map

def grad_cam(*args,
             saliency_layer,
             gradient_to_saliency=gradient_to_grad_cam_saliency,
             **kwargs):
    """Call ``saliency`` with the Grad-CAM conversion as the default.

    Args:
        *args: positional arguments forwarded to ``saliency``.
        saliency_layer: layer whose recorded output is explained (keyword-only,
            required).
        gradient_to_saliency: conversion from the recorded tensor to the
            saliency map; defaults to ``gradient_to_grad_cam_saliency``.
        **kwargs: further keyword arguments forwarded to ``saliency``.

    Returns:
        The value returned by ``saliency``.
    """
    # The two named options cannot already be in kwargs (they are captured by
    # the signature), so merging them in is the same as passing them explicitly.
    kwargs.update(
        saliency_layer=saliency_layer,
        gradient_to_saliency=gradient_to_saliency,
    )
    return saliency(*args, **kwargs)

In [48]:
# Sample ids to evaluate, one per line of merged.txt.
# Blank lines are dropped (a trailing newline would otherwise add an empty id)
# and surrounding whitespace, including a stray "\r", is stripped.
with open("../data/merged.txt", "r") as f:
    successful_idxs = {line.strip() for line in f if line.strip()}

In [77]:
from tqdm import tqdm

In [134]:
def eval_linelevel_merged(C_SET):
    """Print Top-1/3/5 line-level localisation accuracy for one data variant.

    For every id in ``successful_idxs`` that has an entry in ``groundtruth``,
    the file ``../data/test/{C_SET}/{idx}_1.c`` is read, each in-vocabulary
    token gets a Grad-CAM score from the ``embedding`` layer, the token scores
    are summed per source line, lines are ranked by that sum, and the best
    rank reached by any ground-truth flawed line is recorded.

    Samples are skipped when none of their flawed lines can be found in the
    file or when the file has no in-vocabulary token. Samples whose flawed
    lines received no rank (rank ``inf``) are excluded from the accuracy.

    Args:
        C_SET: name of the sub-directory of ``../data/test`` to evaluate.

    Returns:
        None. The accuracies (percent, two decimals) are printed as a dict.
    """
    ranks = []
    print(f"Evaluating {C_SET}")
    # Iterate over the ids directly; building a path and parsing the id back
    # out of it would truncate any id that contains "_".
    for idx in tqdm(successful_idxs):
        if idx not in groundtruth:
            continue

        c_lines = get_c_lines(idx, C_SET)
        flaw_indices = get_flaw_indices(c_lines, groundtruth[idx])
        if len(flaw_indices) < 1:
            continue

        data, token_lines = process_and_encode("".join(c_lines))
        if not token_lines:
            # No token of this file is in the vocabulary: nothing to score.
            continue

        # grad_cam requires a positional target; its value does not influence the result here.
        target = torch.tensor([1])
        token_c = grad_cam(model, data.unsqueeze(1), target, saliency_layer="embedding")
        # detach() keeps the per-line sums from carrying autograd history.
        # [:, 0, 0] leaves one score per position of dimension 0, i.e. per token.
        token_mask = token_c.detach().sigmoid()[:, 0, 0]

        # Sum the token scores of each source line.
        linescores = defaultdict(int)
        for line, score in zip(token_lines, token_mask):
            linescores[line] += score

        # Dense per-line score list; token line numbers are used as 1-based,
        # so line n lands at position n - 1. Lines without tokens score 0.
        line_scores = [0] * max(linescores)
        for line, score in linescores.items():
            line_scores[line - 1] = score

        sorted_lines = sort_lines(line_scores)
        ranks.append(min_rank_of_indices(sorted_lines, flaw_indices))

    ranks = torch.as_tensor(ranks)
    # Drop samples whose flawed lines were never ranked.
    ranks = ranks[ranks.isfinite()]

    def topk_acc(k):
        # Share of samples whose best flawed-line rank is among the first k.
        return round((torch.sum(ranks < k) / len(ranks)).item() * 100, 2)

    print({
        "Top1-Acc": topk_acc(1),
        "Top3-Acc": topk_acc(3),
        "Top5-Acc": topk_acc(5),
    })

In [133]:
# Scratch check on a single sample: the scoring steps of eval_linelevel_merged
# run at top level so that the intermediate values can be inspected.
idx = "178360"
c_lines = get_c_lines(idx, "test")
flaw_indices = get_flaw_indices(c_lines, groundtruth[idx])

# target is handed to grad_cam/saliency, where it is currently not used.
target = torch.tensor([1.], requires_grad=True)
data, lines = process_and_encode("".join(c_lines))
token_c = grad_cam(model, data.unsqueeze(1), target.long(), saliency_layer="embedding")
# Squash the scores with a sigmoid and take index 0 along dimensions 1 and 2,
# leaving one score per position of dimension 0 (one per token).
token_mask = token_c.sigmoid()[:, 0, 0]

# Sum the token scores per source line number.
linescores = defaultdict(int)
for token_idx, score in enumerate(token_mask):
    line = lines[token_idx]
    linescores[line] += score

# NOTE: `lines` is rebound here, from per-token line numbers to a dense list of
# per-line scores; line number n is stored at position n - 1.
lines = [0 for _ in range(max(linescores.keys()))]
for line, score in linescores.items():
    lines[line-1] = score

# Best rank (0 = top) reached by any ground-truth flawed line.
sorted_lines = sort_lines(lines)
rank = min_rank_of_indices(sorted_lines, flaw_indices)

In [135]:
# Line-level accuracy on the files under ../data/test/test/.
eval_linelevel_merged("test")

Evaluating test


100%|██████████| 181/181 [04:51<00:00,  1.61s/it]

{'Top1-Acc': 39.23, 'Top3-Acc': 58.56, 'Top5-Acc': 66.3}





In [136]:
# Same evaluation on ../data/test/apply_codestyle_Chromium/
# (presumably the test files reformatted in Chromium style — confirm).
eval_linelevel_merged("apply_codestyle_Chromium")

Evaluating apply_codestyle_Chromium


100%|██████████| 181/181 [06:06<00:00,  2.03s/it]

{'Top1-Acc': 45.86, 'Top3-Acc': 60.77, 'Top5-Acc': 69.61}





In [137]:
# Same evaluation on ../data/test/apply_codestyle_Mozilla/
# (presumably the test files reformatted in Mozilla style — confirm).
eval_linelevel_merged("apply_codestyle_Mozilla")

Evaluating apply_codestyle_Mozilla


100%|██████████| 181/181 [05:55<00:00,  1.96s/it]

{'Top1-Acc': 44.2, 'Top3-Acc': 60.77, 'Top5-Acc': 70.17}





In [138]:
# Same evaluation on ../data/test/apply_codestyle_Google/
# (presumably the test files reformatted in Google style — confirm).
eval_linelevel_merged("apply_codestyle_Google")

Evaluating apply_codestyle_Google


100%|██████████| 181/181 [03:31<00:00,  1.17s/it]

{'Top1-Acc': 45.3, 'Top3-Acc': 61.88, 'Top5-Acc': 69.61}





In [139]:
# Same evaluation on ../data/test/apply_codestyle_LLVM/
# (presumably the test files reformatted in LLVM style — confirm).
eval_linelevel_merged("apply_codestyle_LLVM")

Evaluating apply_codestyle_LLVM


100%|██████████| 181/181 [03:25<00:00,  1.13s/it]

{'Top1-Acc': 44.2, 'Top3-Acc': 60.77, 'Top5-Acc': 69.61}





In [140]:
# Same evaluation on ../data/test/apply_codestyle_GNU/
# (presumably the test files reformatted in GNU style — confirm).
eval_linelevel_merged("apply_codestyle_GNU")

Evaluating apply_codestyle_GNU


100%|██████████| 181/181 [03:03<00:00,  1.01s/it]

{'Top1-Acc': 39.78, 'Top3-Acc': 59.67, 'Top5-Acc': 67.4}



