In [31]:
import logging
import os
import traceback
from pathlib import Path
from pprint import pp

import io
from git import Repo
from tree_sitter import Language, Parser

# Directory that holds the tree-sitter grammar checkouts and the built library.
TREE_SITTER_LIB_PREFIX = "./lib"

# Grammars to compile; the first entry is the one the parser is configured for.
languages = ["java"]
language_dirs = []

for lang in languages:
    clone_dir = os.path.join(TREE_SITTER_LIB_PREFIX, f"tree-sitter-{lang}")
    language_dirs.append(clone_dir)

lib_file = os.path.join(TREE_SITTER_LIB_PREFIX, "build/languages.so")
Language.build_library(
    # Store the library in the `build` directory
    lib_file,
    # Include every grammar directory collected above. (Previously only the
    # directory of the *last* loop value of `lang` was passed, so adding a
    # second language to `languages` silently left the others out.)
    language_dirs,
)

# Module-level parser shared by the helpers below.
LANGUAGE = Language(lib_file, languages[0])
parser = Parser()
parser.set_language(LANGUAGE)


def parse_file(filename):
    """Parse the file at ``filename`` with the module-level ``parser``.

    The file is opened in binary mode and its complete contents are handed to
    ``parser.parse``; whatever that call returns is returned unchanged.
    """
    with open(filename, "rb") as source_file:
        return parser.parse(source_file.read())

def print_node(node):
    """Print ``node`` followed by a one-line preview of its source text.

    ``node.text`` is decoded; when the decoded text contains a newline, only
    its first line is shown, with ``...`` appended.
    """
    preview = node.text.decode()
    if "\n" in preview:
        first_line, *_remaining = preview.splitlines()
        preview = first_line + "..."
    print(node, preview)

def print_tree(root):
    """Print every node under ``root`` in pre-order, one node per line.

    Each line is indented by two spaces per depth level and rendered with
    ``print_node``. The traversal uses an explicit stack rather than
    recursion; children are pushed in reverse so they come off the stack in
    their original left-to-right order.
    """
    stack = [(root, 0)]
    while stack:
        current, depth = stack.pop()
        print("  " * depth, end="")
        print_node(current)
        for child in reversed(current.children):
            stack.append((child, depth + 1))

# Sanity check: parse a small Java class with one method and dump its syntax tree.
tree = parser.parse("""public class Foo {
    public void main() {
        System.out.println("Hello, world!");
    }
}""".encode())
print_tree(tree.root_node)

<Node type=program, start_point=(0, 0), end_point=(4, 1)> public class Foo {...
  <Node type=class_declaration, start_point=(0, 0), end_point=(4, 1)> public class Foo {...
    <Node type=modifiers, start_point=(0, 0), end_point=(0, 6)> public
      <Node type="public", start_point=(0, 0), end_point=(0, 6)> public
    <Node type="class", start_point=(0, 7), end_point=(0, 12)> class
    <Node type=identifier, start_point=(0, 13), end_point=(0, 16)> Foo
    <Node type=class_body, start_point=(0, 17), end_point=(4, 1)> {...
      <Node type="{", start_point=(0, 17), end_point=(0, 18)> {
      <Node type=method_declaration, start_point=(1, 4), end_point=(3, 5)> public void main() {...
        <Node type=modifiers, start_point=(1, 4), end_point=(1, 10)> public
          <Node type="public", start_point=(1, 4), end_point=(1, 10)> public
        <Node type=void_type, start_point=(1, 11), end_point=(1, 15)> void
        <Node type=identifier, start_point=(1, 16), end_point=(1, 20)> main
       

In [32]:
# Parse a class whose only member is a `native` method declared without a body
# (it ends in `;`), and dump the resulting syntax tree.
tree = parser.parse("""public class FooWithNative {
    private static native boolean parse(String bytes);
}""".encode())
print_tree(tree.root_node)

<Node type=program, start_point=(0, 0), end_point=(2, 1)> public class FooWithNative {...
  <Node type=class_declaration, start_point=(0, 0), end_point=(2, 1)> public class FooWithNative {...
    <Node type=modifiers, start_point=(0, 0), end_point=(0, 6)> public
      <Node type="public", start_point=(0, 0), end_point=(0, 6)> public
    <Node type="class", start_point=(0, 7), end_point=(0, 12)> class
    <Node type=identifier, start_point=(0, 13), end_point=(0, 26)> FooWithNative
    <Node type=class_body, start_point=(0, 27), end_point=(2, 1)> {...
      <Node type="{", start_point=(0, 27), end_point=(0, 28)> {
      <Node type=method_declaration, start_point=(1, 4), end_point=(1, 54)> private static native boolean parse(String bytes);
        <Node type=modifiers, start_point=(1, 4), end_point=(1, 25)> private static native
          <Node type="private", start_point=(1, 4), end_point=(1, 11)> private
          <Node type="static", start_point=(1, 12), end_point=(1, 18)> static
     

In [33]:
import jsonlines
import tqdm
import itertools
import numpy as np
import json
from dataclasses import dataclass, field
from typing import List, Set, Tuple, Union

@dataclass
class Stats:
    """Accumulator for the traced executions recorded for one method.

    Attributes:
        coverage: one set of covered line numbers per example passed to
            ``add_coverage``.
        code: source text of the method, taken from the first example passed
            to ``add_code``.
        key: identifier of the method; only read when the debug report is
            printed.
        inputs: every value passed to ``add_input``.
    """

    coverage: List[Set[int]] = field(default_factory=list)
    code: str = None
    key: Tuple[Union[str, int]] = None
    inputs: List = field(default_factory=list)

    @property
    def unique_functions(self):
        """Number of distinct recorded inputs, compared by their ``str()`` form."""
        return len({str(recorded) for recorded in self.inputs})

    def add_input(self, inp):
        """Record one more input."""
        self.inputs.append(inp)

    def add_code(self, example):
        """Remember ``example["code"]`` unless code has already been stored."""
        if self.code is not None:
            return
        self.code = example["code"]

    def add_coverage(self, example):
        """Record the set of ``relative_lineno`` values in ``example["lines_covered"]``."""
        self.coverage.append({entry["relative_lineno"] for entry in example["lines_covered"]})

    def calculate_line_coverage(self):
        """Return the fraction of executable spans hit by at least one traced line.

        The spans come from ``get_all_lines_in_code(self.code)``; a span
        ``(a, b)`` counts as covered when any traced line ``l`` satisfies
        ``a <= l <= b``. When there are no spans the result is ``1.0``.

        Side effects:
            * ``self.coverage`` is replaced by filtered copies of its sets:
              lines consisting only of ``}``, lines that start with ``catch``
              and end with ``{``, and line numbers outside
              ``0..len(code lines)`` are dropped.
            * If the module-level ``debug`` flag is truthy and ``select_key``
              is ``None`` or equal to ``self.key``, a report is printed.
        """
        should_be_covered = get_all_lines_in_code(self.code)
        lines_code = self.code.splitlines()

        # Lines that carry no statement of their own and are ignored in traces.
        exclude_lines = set()
        for lineno, raw in enumerate(lines_code, start=1):
            stripped = raw.strip()
            if stripped == "}" or (stripped.startswith("catch") and stripped.endswith("{")):
                exclude_lines.add(lineno)

        last_line = len(lines_code)
        self.coverage = [
            {x for x in traced if x not in exclude_lines and x >= 0 and x <= last_line}
            for traced in self.coverage
        ]
        all_covered_lines = set().union(*self.coverage)

        # Traced lines that fall inside some executable span, and those that do not.
        intersection = {
            l for l in all_covered_lines
            if any(a <= l <= b for a, b in should_be_covered)
        }
        difference = sorted(all_covered_lines.difference(intersection))

        covered = set()
        not_covered = set()
        for a, b in should_be_covered:
            if any(a <= l <= b for l in all_covered_lines):
                covered.add((a, b))
            else:
                not_covered.add((a, b))
        # Single-line spans are reduced to their line number; multi-line spans
        # stay tuples (and therefore never match a line number below).
        not_covered_start = [x[0] if x[0] == x[1] else x for x in sorted(not_covered)]

        if len(should_be_covered) > 0:
            line_coverage = len(covered) / len(should_be_covered)
        else:
            line_coverage = 1.0

        if debug and (select_key is None or self.key == select_key):
            self._print_debug_report(
                should_be_covered, covered, not_covered, not_covered_start, difference, line_coverage
            )
        return line_coverage

    def _print_debug_report(self, should_be_covered, covered, not_covered, not_covered_start, difference, line_coverage):
        """Print the spans, the coverage ratio, an annotated listing and the syntax tree."""
        print("KEY:", self.key)
        print("EXPORT STATS")
        print("EXECUTABLE:", list(sorted(should_be_covered)))
        print("INTERSECTION:", list(sorted(covered)))
        print("LINE COVERAGE:", line_coverage*100 , "%")
        if len(not_covered) > 0:
            print("NOT COVERED:", not_covered)
        if len(difference) > 0:
            print("TRACED NOT IN EXECUTABLE:", difference)
        print("CODE:")
        listing = []
        for lineno, text in enumerate(self.code.splitlines(), start=1):
            annotated = str(lineno).rjust(3, " ") + " " + text
            if lineno in not_covered_start:
                annotated += " // NOT COVERED"
            if lineno in difference:
                annotated += " // NOT PARSED"
            listing.append(annotated)
        print("\n".join(listing))
        print("TREE:")
        print_tree(parser.parse(self.code.encode()).root_node)

def has_ancestor(n, fn):
    """Return True if ``fn`` is truthy for ``n`` or for any node beneath it.

    Note: despite the name, this walks *down* the tree (``n`` itself, then its
    descendants in pre-order via ``.children``), not up through parents. The
    search stops at the first node for which ``fn`` returns a truthy value.
    """
    pending = [n]
    while pending:
        candidate = pending.pop()
        if fn(candidate):
            return True
        # Reverse so children are visited left-to-right when popped.
        pending.extend(reversed(candidate.children))
    return False

def get_all_lines_in_code(code):
    """Return the 1-based ``(start_line, end_line)`` spans of traceable nodes in ``code``.

    ``code`` is wrapped as ``public class Foo{<code>}`` (the prefix sits on
    the first line, so line numbers are not shifted) and parsed with the
    module-level ``parser``. A node is collected when it is:

    * the ``block`` of a ``method_declaration`` that has no named children,
    * any node whose type ends in ``_statement``,
    * a ``switch_expression``, or
    * a ``local_variable_declaration`` containing an ``=`` node.

    A ``method_declaration`` without a ``block`` child is reported on stdout
    and its subtree is skipped.
    """
    wrapped_source = "public class Foo{" + code + "}"
    syntax_tree = parser.parse(wrapped_source.encode())

    def _declares_with_initializer(node):
        # True for local variable declarations that contain an `=` token.
        return node.type == "local_variable_declaration" and has_ancestor(
            node, lambda inner: inner.type == "="
        )

    traceable = []
    stack = [syntax_tree.root_node]
    while stack:
        node = stack.pop()
        if node.type == "method_declaration":
            body = None
            for child in node.children:
                if child.type == "block":
                    body = child
                    break
            if body is None:
                print("block is missing:")
                print(code)
                continue
            if all(not child.is_named for child in body.children):
                # Body holds only anonymous tokens: count the block itself.
                traceable.append(body)
        if (
            node.type.endswith("_statement")
            or node.type == "switch_expression"
            or _declares_with_initializer(node)
        ):
            traceable.append(node)
        stack.extend(reversed(node.children))

    return {(node.start_point[0] + 1, node.end_point[0] + 1) for node in traceable}

# Debug switches read by Stats.calculate_line_coverage: when `debug` is true a
# report is printed for every method, or only for the method whose key equals
# `select_key` when that is not None.
# debug = True
debug = False
select_key = None
# select_key = ('apache-commons', 'org.apache.commons.compress.archivers.zip.NioZipEncoding', 'newDecoder', 176)

function_idx = 0
current_key = None
from collections import defaultdict
# One Stats accumulator per (project, class, method, 1-based start line).
stats_by_key = defaultdict(Stats)
all_line_coverage = {}
fname = "postprocessed_runall_checkmethod/postprocessed_dedup_sort_filter.jsonl"
# fname = "postprocessed_test_init_dedup_sort_filter.jsonl"
# fname = "postprocessed_myfilter.jsonl"
num_lines = 0
failed_examples = 0
projects = set()
keys = set()

# First pass: count the lines so the second progress bar has a total.
with open(fname) as f:
    for line in tqdm.tqdm(f, desc="count lines"):
        num_lines += 1

# Second pass: group every example under its method key.
with jsonlines.open(fname) as reader, jsonlines.open("failed_assert.jsonl", "w") as writer:
    pbar = tqdm.tqdm(reader, total=num_lines, desc="calculate stats")
    for example in pbar:
        key = (example["project"], example["class"], example["method"], example["start_point"][0]+1)
        projects.add(example["project"])
        keys.add(key)
        try:
            current_stats = stats_by_key[key]
            # defaultdict builds Stats() without a key; record it so the debug
            # report can print it and `select_key` filtering can match.
            current_stats.key = key
            current_stats.add_coverage(example)
            current_stats.add_code(example)
            current_stats.add_input(example["entry_variables"])
        except AssertionError:
            # Examples that trip an assertion are set aside rather than aborting the run.
            writer.write(example)
            failed_examples += 1
        # refresh=False: update the text without forcing a redraw on every
        # example; tqdm picks it up on its next regular refresh.
        pbar.set_description(
            f"{failed_examples} failed, {len(projects)} projects seen, {len(keys)} keys seen",
            refresh=False,
        )

count lines: 490111it [00:03, 134737.98it/s]
0 failed, 77 projects seen, 2649 keys seen: 100%|██████████| 490111/490111 [07:50<00:00, 1042.07it/s]


In [34]:
from matplotlib import pyplot as plt
import seaborn as sns
import pandas as pd

In [35]:
# One row per traced method: its identifying key fields, the line coverage
# computed from the accumulated traces, and the number of distinct inputs.
df = pd.DataFrame([
    {
        "project": project,
        "class": class_name,
        "method": method_name,
        "start_line": start_line,
        "line_coverage": method_stats.calculate_line_coverage(),
        "num_unique_inputs": method_stats.unique_functions,
    }
    for (project, class_name, method_name, start_line), method_stats in stats_by_key.items()
])
df

block is missing:
private static native boolean parse(String bytes);


Unnamed: 0,project,class,method,start_line,line_coverage,num_unique_inputs
0,angus-mail,ASCIIUtilityFuzzer,fuzzerTestOneInput,24,1.000000,2
1,angus-mail,BASE64EncoderStreamFuzzer,fuzzerTestOneInput,25,0.800000,2
2,angus-mail,com.sun.mail.util.ASCIIUtility,parseInt,43,0.939394,55
3,angus-mail,com.sun.mail.util.ASCIIUtility,parseLong,133,0.787879,35
4,angus-mail,com.sun.mail.util.ASCIIUtility,toString,219,1.000000,55
...,...,...,...,...,...,...
2644,zip4j,net.lingala.zip4j.crypto.StandardDecrypter,init,50,0.153846,12
2645,zip4j,net.lingala.zip4j.model.ExtraDataRecord,setData,45,1.000000,42
2646,zip4j,net.lingala.zip4j.model.AbstractFileHeader,setZip64ExtendedInfo,142,1.000000,12
2647,zip4j,net.lingala.zip4j.model.Zip64ExtendedInfo,setCompressedSize,46,1.000000,6


In [36]:
# Display the per-method coverage table.
df

Unnamed: 0,project,class,method,start_line,line_coverage,num_unique_inputs
0,angus-mail,ASCIIUtilityFuzzer,fuzzerTestOneInput,24,1.000000,2
1,angus-mail,BASE64EncoderStreamFuzzer,fuzzerTestOneInput,25,0.800000,2
2,angus-mail,com.sun.mail.util.ASCIIUtility,parseInt,43,0.939394,55
3,angus-mail,com.sun.mail.util.ASCIIUtility,parseLong,133,0.787879,35
4,angus-mail,com.sun.mail.util.ASCIIUtility,toString,219,1.000000,55
...,...,...,...,...,...,...
2644,zip4j,net.lingala.zip4j.crypto.StandardDecrypter,init,50,0.153846,12
2645,zip4j,net.lingala.zip4j.model.ExtraDataRecord,setData,45,1.000000,42
2646,zip4j,net.lingala.zip4j.model.AbstractFileHeader,setZip64ExtendedInfo,142,1.000000,12
2647,zip4j,net.lingala.zip4j.model.Zip64ExtendedInfo,setCompressedSize,46,1.000000,6
