# Coverage tracker

The purpose of this script is to retrieve and format coverage data produced by `afl-cov`.

In [None]:
import re
import os
import glob
import pandas as pd

%run utils.ipynb

### Format the coverage data by regex

In [3]:
def get_df_da(m):
    """Compute the coverage data list of one lcov record.

    Looks for ``DA:<line number>,<execution count>[,<checksum>]`` entries
    through ``m``. Per the original note, the execution count is computed
    in a cumulative fashion.

    Args:
        m: object exposing ``match(regexp)`` and ``group()`` the way the
            ``Matcher`` class of this file does.

    Returns:
        A DataFrame with the integer columns ``line`` and ``exec``; entries
        whose execution count is 0 are left out. ``None`` (after printing a
        notice) when the record holds no ``DA`` entry at all.
    """
    if not m.match(r'^DA:(\d+),(\d+)'):
        print("!DA information missing...")
        return None

    executed = []
    for line_no, hits in m.group():
        hits = int(hits)
        # ignore lines that were never executed
        if hits != 0:
            executed.append([int(line_no), hits])
    return pd.DataFrame(executed, columns=["line", "exec"])

def get_df_fn(m):
    """Compute the function list of one lcov record.

    Looks for ``FN:<line number of function start>,<function name>``
    entries through ``m``.

    Args:
        m: object exposing ``match(regexp)`` and ``group()`` the way the
            ``Matcher`` class of this file does.

    Returns:
        A DataFrame with the columns ``line`` (int, function start line) and
        ``fn`` (function name). The frame is empty, but keeps both columns,
        when the record declares no function.
    """
    # A record without any FN entry is valid; fall back to an empty list
    # instead of reading a variable that was never assigned.
    entries = m.group() if m.match(r'^FN:(\d+),(\w+)') else []
    return pd.DataFrame(
        [[int(start_line), name] for start_line, name in entries],
        columns=["line", "fn"],
    )


def get_fn_dict(m):
    """Compute the function dict of one lcov record.

    Looks for ``FN:<line number of function start>,<function name>``
    entries through ``m``.

    Args:
        m: object exposing ``match(regexp)`` and ``group()`` the way the
            ``Matcher`` class of this file does.

    Returns:
        A dict mapping each function name to its starting line (int). The
        dict is empty when the record declares no function. If a name
        occurs more than once, the last entry wins.
    """
    # A record without any FN entry is valid; return an empty mapping
    # instead of reading a variable that was never assigned.
    if not m.match(r'^FN:(\d+),(\w+)'):
        return {}
    return {name: int(start_line) for start_line, name in m.group()}

def get_lh(m):
    """Find LH, ``LH:<number of lines with a non-zero execution count>``.

    Args:
        m: object exposing ``match(regexp)`` and ``group()`` the way the
            ``Matcher`` class of this file does.

    Returns:
        The value of the first ``LH`` entry as an int, or ``False`` (after
        printing a notice) when the record has no ``LH`` entry. Note that
        ``False == 0`` holds in Python, so a caller comparing the result
        with 0 treats a missing entry like a zero count.
    """
    if m.match(r'^LH:(\d+)'):
        first_entry = m.group()[0]
        return int(first_entry)

    print("!LH information missing...")
    return False

def get_sf(m):
    """Find SF, ``SF:<absolute path to the source file>``.

    Args:
        m: object exposing ``match(regexp)`` and ``group()`` the way the
            ``Matcher`` class of this file does.

    Returns:
        The text following the first ``SF:`` marker, or ``False`` (after
        printing a notice) when the record has no ``SF`` entry.
    """
    if m.match(r'^SF:(.*)'):
        source_path = m.group()[0]
        return source_path

    print("!SF information missing...")
    return False


'''
Future todos: branch coverage information
'''
# branch coverage information is stored with one line per branch
# BRDA:<line number>,<block number>,<branch number>,<taken>

'''
Future todos: function execution counts
'''
# list of execution counts for each instrumented function, which is stored in variable `fn`
# FNDA:<execution count>,<function name>

'\nFuture todos: function execution counts\n'

### Process and compute the coverage data

In [2]:
class Matcher:
    """Simple regex matcher bound to one string.

    ``match`` runs a multi-line ``re.findall`` over the string and remembers
    the result; ``group`` hands back the result of the latest ``match``.
    """

    def __init__(self, string):
        self.string = string
        # Defined up front so group() returns an empty list, rather than
        # raising AttributeError, when it is called before any match().
        self.rematch = []

    def match(self, regexp):
        """Search the string for ``regexp`` and report whether anything matched.

        The pattern is applied with ``re.MULTILINE``, so ``^`` anchors at
        the start of every line. All matches are stored for ``group``.
        """
        self.rematch = re.findall(regexp, self.string, re.MULTILINE)
        return bool(self.rematch)

    def group(self):
        """Return the ``re.findall`` result of the latest ``match`` call."""
        return self.rematch

def add_da_to_cov(trace_id, m, info_cov):
    """Replace ``cov[trace_id]`` by its difference to a baseline coverage.

    ``cov`` is a module-level mapping that is not defined in this part of
    the file; the code below uses it as ``trace_id -> {key: count}``.

    For ``trace_id`` 0 the baseline is ``info_cov``; for any other id it is
    ``cov[trace_id - 1]``. Only keys present in the baseline end up in the
    result; a key missing from ``cov[trace_id]`` counts as 0. The result is
    printed and stored back into ``cov[trace_id]``.

    Args:
        trace_id: id of the trace whose coverage is converted.
        m: unused.
        info_cov: baseline mapping used when ``trace_id`` is 0.

    Returns:
        None. When the preceding trace id is missing from ``cov`` an error
        is printed and ``cov`` is left untouched.
    """
    previous_id = trace_id - 1
    if trace_id != 0 and previous_id not in cov:
        print("Error: trace_id", previous_id, "missing")
        return

    # trace 0 is compared with the info coverage, later traces with the
    # preceding trace
    baseline = info_cov if trace_id == 0 else cov[previous_id]

    delta = {}
    for key in baseline:
        delta[key] = cov[trace_id].get(key, 0) - baseline.get(key, 0)

    print(trace_id, delta)
    cov[trace_id] = delta

def get_trace_id(test_file):
    """Retrieve the trace id from a test file path.

    The id is the text between the first and the second underscore of the
    file name (directories are ignored), converted to ``int``.

    Args:
        test_file: path of a coverage file.

    Returns:
        The trace id as an int.

    Raises:
        ValueError: if the file name has fewer than two underscores or the
            text between them is not an integer.
    """
    file_name = os.path.basename(test_file)
    # unpacking fails with ValueError when an underscore is missing
    _prefix, id_text, _remainder = file_name.split('_', 2)
    return int(id_text)

### Core driver functions

#### Coverage collection

In [None]:
def get_fn(row, fn_dict):
    """Determine which function a given line belongs to.

    The owner is the function with the greatest starting line that is not
    after the line itself. A line equal to a function's starting line
    belongs to that function.

    Args:
        row: mapping-like object (e.g. a DataFrame row) with a ``line`` entry.
        fn_dict: mapping of function name to starting line.

    Returns:
        The name of the owning function, or ``None`` when the line lies
        before every function start (or ``fn_dict`` is empty).
    """
    line = row['line']
    owner, owner_start = None, None
    # Single pass instead of sorting the dict on every call. `>=` keeps the
    # later entry when two functions share a starting line.
    for fn, start_line in fn_dict.items():
        if start_line <= line and (owner_start is None or start_line >= owner_start):
            owner, owner_start = fn, start_line
    return owner

def process_per_sf_cov(trace_id, test_file):
    """Compute the code coverage for each source file, store in df_exec_c.

    The file content is split on ``end_of_record``. Every record yields one
    DataFrame with the columns ``line``, ``exec``, ``trace_id``, ``file``
    and ``fn``, which is appended to the module-level list ``df_exec_c``.

    Records are skipped when their ``LH`` value is 0 or missing, or when
    they contain no executed ``DA`` line.

    Relative paths are computed against the module-level ``proj_loc``.

    Args:
        trace_id: id stored in the ``trace_id`` column of every row.
        test_file: path of the lcov trace file to read; it is loaded with
            ``read_file_as_str`` (presumably provided by ``utils.ipynb`` --
            it is not defined in this part of the file).
    """
    global df_exec_c

    trace_text = read_file_as_str(test_file)

    # One record per source file; the piece after the last `end_of_record`
    # is dropped.
    for record in trace_text.split("end_of_record")[:-1]:
        m = Matcher(record)
        # skip if the number of executed lines is 0; get_lh returns False
        # (which equals 0) when the LH entry is missing
        if get_lh(m) == 0:
            continue

        # compute coverage
        df_da = get_df_da(m)
        # get_df_da returns None when the record has no DA entry at all
        if df_da is None:
            continue
        # explicit copy so the column assignments below never write to a
        # slice of another frame
        df_da = df_da[df_da['exec'] != 0].copy()
        if df_da.empty:
            continue

        df_da['trace_id'] = trace_id
        # find the source file and use the relative path
        df_da['file'] = os.path.relpath(get_sf(m), proj_loc)
        # find the corresponding function
        # !this slows down the pre-processing but just in case that some test cases might have different functions
        fn_dict = get_fn_dict(m)
        df_da['fn'] = df_da.apply(lambda row: get_fn(row, fn_dict), axis=1)

        df_exec_c.append(df_da)

def collect_cov(proj_loc):
    """Get code coverage from test cases/inputs generated by AFL.

    Reads ``trace.lcov_info_final`` (stored under trace id -1) and every
    ``*_trace.lcov_info_final`` file found in
    ``<proj_loc>/seeds_out/cov/lcov``, then concatenates the per-file
    frames into the module-level DataFrame ``df_exec_c``.

    Note that ``process_per_sf_cov`` resolves relative source paths against
    the module-level ``proj_loc``, not against this argument, so the two
    are expected to agree.

    Args:
        proj_loc: root directory of the fuzzed project.
    """
    global df_exec_c

    # Start from an empty list on every call: df_exec_c is rebound to a
    # DataFrame at the end, so a second call could not append to it.
    df_exec_c = []

    lcov_loc = os.path.join(proj_loc, 'seeds_out', 'cov', 'lcov')
    # get list of all coverage files
    test_files = glob.glob(os.path.join(lcov_loc, '*_trace.lcov_info_final'))
    id_dict = {get_trace_id(test_file): test_file for test_file in test_files}

    # collect coverage for info file
    info_trace_file = os.path.join(lcov_loc, 'trace.lcov_info_final')
    # add source file coverage to df_exec_c
    process_per_sf_cov(-1, info_trace_file)

    # collect coverage for all other trace_id
    # [:-1] skips the highest trace id, which might contain no
    # instrumentation when the fuzzing run is manually interrupted
    for trace_id in sorted(id_dict)[:-1]:
        # add source file coverage to df_exec_c
        process_per_sf_cov(trace_id, id_dict[trace_id])

    # combine all df into one single df
    df_exec_c = pd.concat(df_exec_c)
    

#### Spectra computation

In [None]:
def compute_cov_spectra(df_exec_c):
    """Compute the coverage spectra.

    Args:
        df_exec_c: DataFrame holding at least the columns ``line``, ``fn``
            and ``file``.

    Returns:
        A DataFrame with one row per distinct (``line``, ``fn``, ``file``)
        combination, sorted by ``file`` then ``line``, with the columns
        ``cid``, ``line``, ``fn`` and ``file``. ``cid`` is the 0-based
        position of the row in that order.
    """
    key_columns = ['line', 'fn', 'file']
    return (
        df_exec_c[key_columns]
        .drop_duplicates(subset=key_columns)
        # sort
        .sort_values(by=['file', 'line'])
        .reset_index(drop=True)
        # the fresh 0..n-1 index becomes the `cid` column
        .rename_axis("cid")
        .reset_index()
    )

def preprocess_cov(spectra_csv, exec_c_csv):
    """Preprocess code coverage, reshape the data to optimize the processing time.

    Builds the spectra table from the module-level ``df_exec_c`` with
    ``compute_cov_spectra``, then rebinds ``df_exec_c`` to a frame with the
    columns ``trace_id``, ``cid`` and ``exec``, sorted by ``trace_id`` then
    ``cid``. Both tables are written as CSV.

    Args:
        spectra_csv: destination path of the spectra table.
        exec_c_csv: destination path of the execution-count table.

    Returns:
        The tuple ``(df_spectra, df_exec_c)``.
    """
    global df_exec_c

    shape = df_exec_c.shape
    # format the code coverage
    df_spectra = compute_cov_spectra(df_exec_c)
    # format the execution count; the spectra already carry `cid` as a
    # column, so they are merged as they are
    df_exec_c = pd.merge(df_exec_c, df_spectra, how='outer', on=['line', 'file', 'fn'])
    df_exec_c = df_exec_c[['trace_id', 'cid', 'exec']].sort_values(by=['trace_id', 'cid'])
    print(f"\tprocessed coverage shape {shape} -> exec_c {df_exec_c.shape} & spectra {df_spectra.shape}")

    df_spectra.to_csv(spectra_csv, encoding='utf-8', index=False)
    df_exec_c.to_csv(exec_c_csv, encoding='utf-8', index=False)
    return df_spectra, df_exec_c

#### Individual coverage processing

In [None]:
def diff_cov(row, df_exec_c):
    """Turn the execution count of one row into a diff against earlier traces.

    Among the rows of ``df_exec_c`` with the same ``cid`` and a smaller
    ``trace_id``, the largest ``exec`` is subtracted from ``row['exec']``.
    The row is returned unchanged when no such row exists. The info trace
    takes the trace_id of -1.

    When the module-level ``verbose`` flag is set, the module-level
    ``counter`` is incremented and printed every 1000 calls.

    Args:
        row: mapping-like object with ``exec``, ``trace_id`` and ``cid``
            entries; it is modified in place.
        df_exec_c: DataFrame with ``trace_id``, ``cid`` and ``exec`` columns.

    Returns:
        The same ``row`` object.
    """
    global counter, verbose

    current_exec, this_trace, this_cid = row['exec'], row['trace_id'], row['cid']

    if verbose:
        counter += 1
        if counter % 1000 == 0:
            print("counter at", counter)

    # rows of the same coverage item recorded by an earlier trace
    from_earlier_trace = df_exec_c["trace_id"] < this_trace
    same_item = df_exec_c['cid'] == this_cid
    earlier_rows = df_exec_c[from_earlier_trace & same_item]

    if not earlier_rows.empty:
        # always subtract by the highest exec, which should be the next instrumented test
        row['exec'] = current_exec - earlier_rows['exec'].max()
    return row