Analysis pinned to Kernel version 5.10

In [1]:
#todo: make the regexes more strict.

In [2]:
import re
import itertools
from itertools import islice
from subprocess import run

from pathlib import Path
import sqlite3

KERNEL = Path(".")
OUTDIR = Path("../function_survey/output/")
all_calls = OUTDIR/"cscope_all_calls.txt"
kernel_tags = OUTDIR/"kernel_tags"
test_targets = OUTDIR/"cscope_test_targets"
all_c_code = OUTDIR/"all_c_code.txt"
blame_files = Path("../blame")
blame_parsed = Path(OUTDIR/"blame_parsed")


connection = sqlite3.connect(OUTDIR/"function_survey.db")
cursor = connection.cursor()

In [3]:
def parse(filename, expression):
    with open(filename) as f:
        return [re.match(expression, line) for line in f]
    
def head(iterable, n=10):
    return list(islice(iterable, n))

## Blame-Parser

In [4]:
# How does cregit handle ifdefs in functions?

In [5]:
def strict_match(pattern, line, flags=0):
    m = re.match(pattern, line, flags=0)
    if m:
        return m
    raise(ValueError, f"{pattern} did not match {line}")

def blame_contents(match):
    return match["contents"].split("|")

def parse_blame_file(filename):
    expression = (r"^(?P<hash>\w{40});"
                  r"(?P<previous_file_name>[^\s;]*);"
                  r"\t(?P<contents>.*)$"
                 )
    with open(filename) as f:
        yield from (blame_contents(strict_match(expression, line)) for line in f)

In [6]:
def parse_whole(filename):
    lines = parse_blame_file(filename)

    for start, *rest in lines:
        if (start in ("begin_unit", "end_unit", "")):
            pass
        elif start.startswith("begin_"):
            item = start[len("begin_"):]
            if item == "function":
                yield ("function", *parse_function(lines_in_item(lines, item)))
            elif item == "include":
                yield ("include", *parse_include(lines_in_item(lines, item)))
            else:
                # function_decl will be skipped
                yield skip(lines, item)

def parse_function(lines):
    lines = iter(lines)
    function_name, specifiers = parse_function_decl(lines)
    callees = parse_function_body(lines)
    return (function_name, specifiers, callees)

def parse_function_decl(lines):
    specifiers = []
    function_name = None
    for start, *rest in lines:
        if start == "specifier":
            assert len(rest) == 1, rest
            specifiers += rest
            # are there other specifiers than static
        elif start == "DECL":
            assert rest[0] == "function", (start, rest)
            function_name = rest[1].split()[0]
        elif start == "name":
            pass # we may be able to weed out the function name, and get the types if that is usefull
        elif start == "parameter_list" and rest == [")"]:
            break  # this marks the end of the function header
        else:
            pass # TODO: check what other declaration parts end up here
    # assert function_name is not None 
    # in blame/drivers/ssb/main.c the function name_show
    # does not have it's declaration detected
    # by the blame file parser
    # This might be correctible by using the name directly before the function parameters as the function name
    # I will be ignoring this edge case for now.
    return function_name, specifiers

def parse_function_body(lines):
    names = []
    callees = []
    
    contents = next(lines)
    # check if this holds for empty blocks
#     assert (contents == ["block", "{"] or contents == ["block", "{}"]), "function body must start with block"
    
    prev_name = None
    for start, *rest in lines:
        if start == "name":
            assert len(rest) == 1
            names += rest
            prev_name = rest[0]
        elif start == "argument_list":
            if rest in [["("], ["()"]]:
                assert prev_name is not None
                callees.append(prev_name)
            prev_name = None
        elif start=="sizeof":
            prev_name = "sizeof"
            # this treats sizeof like a function. Weeding them out completly would probably be better
        else:
            prev_name = None # assume that argument lists always follow function names directly
    return callees
            
def parse_include(lines):
    assert len(lines) == 3, lines
    lines = iter(lines)
    assert next(lines) == ["include", "#"], lines
    assert next(lines) == ["directive", "include"], lines
    start, *rest = next(lines)
    assert start == "file" and len(rest) == 1, lines
    return rest
    
def skip(lines, item):
    """Skips to the end of a begin/end pair"""
    lines_in_item(lines, item)
    return (item, "skipped")
        
def lines_in_item(lines, item):
    """Returns a list of all lines between the begin_? and end_? markers"""
    result = []
    
    start, *rest = contents = next(lines)
    while start != f"end_{item}":
        assert not start.startswith("end_"), "end of different item found"
        assert not start.startswith("begin_"), "start of different item found"
        result.append(contents)
        start, *rest = contents = next(lines)
    return result
            

def output_location(input_path):
    output_path = blame_parsed/input_path.relative_to(blame_files)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    return output_path

def parse_to_file(filename):
    parsed = list(parse_whole(filename))
    output_path = output_location(filename)
    with open(output_path.with_suffix(".all_items"), "w") as all_items,\
         open(output_path.with_suffix(".functions"), "w") as functions,\
         open(output_path.with_suffix(".specifiers"), "w") as specifiers,\
         open(output_path.with_suffix(".calls"), "w") as calls,\
         open(output_path.with_suffix(".includes"), "w") as includes:
        all_items.writelines(str(x)+"\n" for x in parsed)
        for line in parsed:
            if line[0] == "function":
                function_name = line[1]
                functions.write(function_name + "\n")
                for specifier in line[2]:
                    specifiers.write(function_name + "," + specifier + "\n")
                for callee in line[3]:
                    calls.write(function_name + "," + callee + "\n")
            elif line[0] == "include":
                includes.write(line[1]+"\n")
            else:
                assert line[1] == "skipped"
    
def parse_kernel():
    for filename in blame_files.rglob("*.c.blame"):
        yield filename, parse_whole(filename)


In [7]:
parse_to_file(Path('../blame/drivers/uio/uio_mf624.c.blame'))

In [9]:
faillures = []
for filename in blame_files.rglob("*.c.blame"):
    try:
        parse_to_file(filename)
    except Exception as e:
        faillures.append((filename,e))

In [10]:
head(faillures)

[(PosixPath('../blame/drivers/sh/clk/cpg.c.blame'), AssertionError()),
 (PosixPath('../blame/drivers/sh/intc/handle.c.blame'), AssertionError()),
 (PosixPath('../blame/drivers/ssb/host_soc.c.blame'), AssertionError()),
 (PosixPath('../blame/drivers/ssb/pcmcia.c.blame'), AssertionError()),
 (PosixPath('../blame/drivers/ssb/main.c.blame'),
  TypeError("unsupported operand type(s) for +: 'NoneType' and 'str'")),
 (PosixPath('../blame/drivers/remoteproc/ti_k3_r5_remoteproc.c.blame'),
  AssertionError()),
 (PosixPath('../blame/drivers/fpga/dfl-fme-main.c.blame'), AssertionError()),
 (PosixPath('../blame/drivers/fpga/altera-pr-ip-core.c.blame'),
  AssertionError()),
 (PosixPath('../blame/drivers/fpga/dfl-fme-perf.c.blame'), AssertionError()),
 (PosixPath('../blame/drivers/ide/ide-disk.c.blame'), AssertionError())]

In [46]:
kernel_files = dict(parse_kernel())

In [47]:
len(kernel_files)

29473

In [48]:
len([func for file in kernel_files.values() for func in file]) #529075 same number of functions found

Exception from ../blame/drivers/ssb/main.c.blame


AssertionError: <list_iterator object at 0x7ff1518238b0>

In [None]:
def reset_cregit_db():
    # all functions
    cursor.execute("CREATE TABLE IF NOT EXISTS cregit_functions (file, name)")
    # all calls from one function to annother
    cursor.execute("CREATE TABLE IF NOT EXISTS cregit_calls (file, caller, callee)")
    # clear tables
    cursor.execute("DELETE FROM cregit_functions")
    cursor.execute("DELETE FROM cregit_calls")

In [13]:
head(kernel_files.items())

[(PosixPath('../blame/drivers/uio/uio_mf624.c.blame'),
  {'mf624_disable_interrupt': ['iowrite32',
    'ioread32',
    'iowrite32',
    'ioread32',
    'iowrite32',
    'ioread32'],
   'mf624_enable_interrupt': ['iowrite32',
    'ioread32',
    'iowrite32',
    'ioread32',
    'iowrite32',
    'ioread32'],
   'mf624_irq_handler': ['ioread32',
    'ioread32',
    'mf624_disable_interrupt',
    'ioread32',
    'ioread32',
    'mf624_disable_interrupt'],
   'mf624_irqcontrol': ['mf624_disable_interrupt', 'mf624_enable_interrupt'],
   'mf624_setup_mem': ['pci_resource_start',
    'pci_resource_len',
    'pci_ioremap_bar'],
   'mf624_pci_probe': ['kzalloc',
    'pci_enable_device',
    'pci_request_regions',
    'mf624_setup_mem',
    'mf624_setup_mem',
    'mf624_setup_mem',
    'uio_register_device',
    'pci_set_drvdata',
    'iounmap',
    'iounmap',
    'iounmap',
    'pci_release_regions',
    'pci_disable_device',
    'kfree'],
   'mf624_pci_remove': ['pci_get_drvdata',
    'mf624_di

In [None]:
kernel_files = parse_kernel()
reset_cregit_db()
# trim ../blame/ and .blame from ends of path so that paths can be compared between methods
cursor.executemany("INSERT INTO cregit_calls VALUES (?,?,?)", (
    (str(file)[len("../blame/"):-len(".blame")], caller, callee)
    for file, functions in kernel_files.items()
    for caller, callees in functions.items()
    for callee in set(callees))
)
cursor.executemany("INSERT INTO cregit_functions VALUES (?,?)", (
    (str(file)[len("../blame/"):-len(".blame")], function_name)
    for file, file_functions in kernel_files.items()
    for function_name in file_functions
))

In [None]:
# get test functions (based on name, case insensitive)
print(head(cursor.execute("SELECT caller FROM cregit_calls WHERE caller LIKE '%test%'")))
# get tested functions
head(cursor.execute("SELECT COUNT(DISTINCT callee) FROM cregit_calls WHERE caller LIKE '%test%'"))

In [None]:
tested_functions = set(x[0] for x in cursor.execute("SELECT callee FROM cregit_calls WHERE caller LIKE '%test%'"))

In [None]:
# Total number of functions
print(head(cursor.execute("SELECT COUNT(*) FROM cregit_functions")))

In [None]:
print(head(cursor.execute("SELECT COUNT(DISTINCT name)*1.0/COUNT(*) FROM cregit_functions")))

In [None]:
list(cursor.execute(
    "WITH A AS (SELECT name FROM cregit_functions GROUP BY name HAVING COUNT(name)>3) SELECT COUNT(*) FROM A"))

In [None]:
list(cursor.execute(
    "SELECT name, COUNT(name) FROM cregit_functions GROUP BY name ORDER BY COUNT(name) DESC LIMIT 20"))

In [None]:
connection.commit()