# This notebook looks up vulnerable functions in the function call graph

In [None]:
import os
import json
import multiprocessing
from multiprocessing.managers import BaseManager, DictProxy
import subprocess
import shlex
from collections import defaultdict
import importlib
import sys
import itertools

In [None]:
class MyManager(BaseManager):
    """Manager subclass on which a shared ``defaultdict`` type is registered below."""


# Expose ``defaultdict`` through the manager under the type id 'defaultdict',
# using the standard dict proxy for access from other processes.
MyManager.register('defaultdict', defaultdict, DictProxy)

In [None]:
from arango import ArangoClient
from arango.http import DefaultHTTPClient

In [None]:
# HTTP client subclass that overrides the REQUEST_TIMEOUT class attribute.
# NOTE(review): nothing visible in this notebook instantiates MyCustomHTTPClient;
# the client below passes request_timeout directly to DefaultHTTPClient instead.
class MyCustomHTTPClient(DefaultHTTPClient):
    REQUEST_TIMEOUT = 10000 # Request timeout, in seconds
# Notebook-level ArangoDB client with a long request timeout (10000).
client_g = ArangoClient(hosts='http://localhost:8529', http_client=DefaultHTTPClient(request_timeout=10000))
# Handle to the 'sysfilter' database for use in the notebook process itself.
# The lookup helpers in this notebook read a global named `db`, not `db_g`.
# NOTE(review): credentials are hard-coded here.
db_g = client_g.db('sysfilter', username='root', password='root')

In [None]:
def func_lookup_exact(func_name):
    """Return all documents in the ``functions`` collection named exactly *func_name*.

    The query runs against the global ``db`` handle, which must already have
    been set (see ``get_arango_connection``). The name is supplied as an AQL
    bind variable after being formatted as a string.

    Returns a list of the documents yielded by the cursor (possibly empty).
    """
    query = 'FOR doc in functions FILTER doc.name == @value RETURN doc'
    bind_vars = {'value': f'{func_name}'}
    cursor = db.aql.execute(query, bind_vars=bind_vars)
    # Iterate explicitly instead of calling list(cursor): list() probes len(),
    # which the cursor may reject when count was not requested (NOTE(review): confirm).
    matches = []
    for doc in cursor:
        matches.append(doc)
    return matches

In [None]:
def func_key_elf_name_lookup_exact(func_name, elf_name):
    """Return ``functions`` documents matching both a function name and a library name.

    A document matches when ``doc.name == func_name`` and ``doc.lib == elf_name``.
    Both values are passed as AQL bind variables (formatted as strings) and the
    query runs against the global ``db`` handle.

    Returns a list of the documents yielded by the cursor (possibly empty).
    """
    query = 'FOR doc in functions FILTER doc.name == @func_name && doc.lib == @lib_name RETURN doc'
    bind_vars = {
        'func_name': f'{func_name}',
        'lib_name': f'{elf_name}',
    }
    cursor = db.aql.execute(query, bind_vars=bind_vars)
    # Iterate explicitly instead of calling list(cursor): list() probes len(),
    # which the cursor may reject when count was not requested (NOTE(review): confirm).
    matches = []
    for doc in cursor:
        matches.append(doc)
    return matches

In [None]:
def get_funcs_from_elf(elf_name):
    """Return every ``functions`` document whose ``lib`` field equals *elf_name*.

    The library name is passed as an AQL bind variable (formatted as a string)
    and the query runs against the global ``db`` handle.

    Returns a list of the documents yielded by the cursor (possibly empty).
    """
    query = 'FOR doc in functions FILTER doc.lib == @value RETURN doc'
    bind_vars = {'value': f'{elf_name}'}
    cursor = db.aql.execute(query, bind_vars=bind_vars)
    # Iterate explicitly instead of calling list(cursor): list() probes len(),
    # which the cursor may reject when count was not requested (NOTE(review): confirm).
    matches = []
    for doc in cursor:
        matches.append(doc)
    return matches

In [None]:
def exact_elf_lookup(elf_name, deb_name):
    """Fetch the single ``elf_bins`` document for an ELF file inside a deb package.

    The document key is built as ``"<elf_name>@<sanitized deb_name>"``, where
    the deb name is passed through ``sanitize_name``. The query runs against
    the global ``db`` handle with result counting enabled.

    Returns the matching document when the cursor reports exactly one result,
    otherwise ``None``.
    """
    elf_key = f'{elf_name}@{sanitize_name(deb_name)}'
    cursor = db.aql.execute(
        'FOR doc in elf_bins FILTER doc._key == @value RETURN doc',
        bind_vars={'value': elf_key},
        count=True,
    )
    # Anything other than exactly one hit is treated as "not found".
    if cursor.count() != 1:
        return None
    return next(iter(cursor), None)

In [None]:
# Load the lookup table used by get_debs_from_git_src(): keys are matched against
# the "/"-separated tokens of a source path (presumably apt source names mapped to
# lists of deb package names -- verify against the JSON file).
# The context manager closes the file handle once parsing is done.
with open("./data/apt_src_deb_maps.json", "r") as f:
    apt_src_deb_map = json.load(f)

In [None]:
# Read the CVE records, one comma-separated line each. The second field is the
# function name; records whose function is in the exclude list are dropped.
func_exclude_list = ["main"]
cve_funcs_git_src_tuples = []
with open("data/cve_funcs_git_src_tuples.csv", "r") as f:
    for line in f:
        line = line.strip()
        func_name = line.split(",")[1]
        if func_name not in func_exclude_list:
            cve_funcs_git_src_tuples.append(line)
# De-duplicate the kept lines (the resulting order is not the file order).
cve_funcs_git_src_tuples = list(set(cve_funcs_git_src_tuples))

In [None]:
# Notebook display: number of entries in cve_funcs_git_src_tuples.
len(cve_funcs_git_src_tuples)

In [None]:
# Distinct CVE identifiers: the first comma-separated field of every record.
# A set comprehension de-duplicates directly and avoids rebinding the builtin
# name `tuple` at notebook scope (the previous loop variable did).
total_cves = list({record.split(",")[0] for record in cve_funcs_git_src_tuples})

In [None]:
# Notebook display: number of distinct CVE identifiers in total_cves.
len(total_cves)

In [None]:
# NOTE(review): the counts 1042 and 1031 are hard-coded in this message rather
# than computed from the data, so they can go stale if the input CSV changes.
print("We fall from 1042 to 1031 CVEs because we exclude all CVEs that change the 'main' function")

In [None]:
# Notebook display: number of distinct CVE identifiers in total_cves.
len(total_cves)

In [None]:
def get_arango_connection():
    """Connect to the local ArangoDB server and store the handle in the global ``db``.

    The lookup helpers in this notebook read that global. This function is
    passed as the ``initializer`` of the multiprocessing pool, so it runs once
    in each worker process.

    NOTE(review): unlike ``client_g``, this client is created without a custom
    request timeout.
    """
    global db
    client = ArangoClient(hosts='http://localhost:8529')
    db = client.db('sysfilter', username='root', password='root')

In [None]:
def get_debs_from_git_src(git_src):
    """Return the ``apt_src_deb_map`` entry for the first matching path token.

    *git_src* is split on ``"/"`` and the tokens are checked in order; the
    value stored for the first token present in ``apt_src_deb_map`` is
    returned. If no token is a key of the map, an empty list is returned.
    """
    for part in git_src.split("/"):
        if part in apt_src_deb_map:
            return apt_src_deb_map[part]
    return []

In [None]:
def sanitize_name(name):
    """Return *name* with ``~`` and ``+`` replaced by ``%7E`` and ``%2B``.

    All other characters are left untouched.
    """
    substitutions = (('~', '%7E'), ('+', '%2B'))
    for raw_char, encoded in substitutions:
        if raw_char in name:
            name = name.replace(raw_char, encoded)
    return name

In [None]:
# De-duplicate the record list (order is not preserved).
# NOTE(review): the cell that loads the CSV already ends with the same
# list(set(...)) step, so this looks redundant unless the list is changed in between.
cve_funcs_git_src_tuples = list(set(cve_funcs_git_src_tuples))

In [None]:
# Create a manager and a dict proxy that the pool workers write their results into.
# NOTE(review): this uses the stock multiprocessing.Manager(), not the MyManager
# class registered earlier in this notebook.
cve_exact_manager = multiprocessing.Manager()
cve_lookup_details_shared = cve_exact_manager.dict()

In [None]:
def find_record_in_DB(tuple, cve_lookup_details_shared):
    """Classify one CVE record against the function database and store the result.

    Parameters
    ----------
    tuple : str
        Comma-separated record. Field 0 is the CVE id, field 1 the function
        name, field 2 the git source (lower-cased before use) and the last
        field the apt source. The parameter name shadows the builtin; it is
        kept so existing callers are unaffected.
    cve_lookup_details_shared : mapping
        Shared result mapping (a manager dict proxy in this notebook). The
        outcome is stored as
        ``cve_lookup_details_shared[cve][tuple] = {"STATE": ..., "NODES": [...]}``.

    States written
    --------------
    GIT_SRC_ISSUE   no deb packages found for the git source or the apt source
    FUNC_NOT_FOUND  no function node by that name, or none in a candidate deb
    LIB             matching nodes found only in ELF files of type 'SHARED'
    EXE             matching nodes found only in ELF files of any other type
    AMBI            matching nodes found in both kinds of ELF files
    NO_ELF          written when a function node has no ELF document (see note)

    Returns
    -------
    None
    """
    record = tuple  # local alias so the builtin name is not used in the body

    def update_shared_dict(key, subkey, subval1, subval2):
        # Store {"STATE": subval1, "NODES": subval2} under shared[key][subkey].
        # The previous version tested `key` but wrote under the closed-over
        # `cve`; both now use `key`.
        entry = {subkey: {"STATE": subval1, "NODES": subval2}}
        if key not in cve_lookup_details_shared:
            cve_lookup_details_shared[key] = entry
        else:
            # With a manager proxy, in-place changes to a nested dict are not
            # propagated, so the whole value is fetched, updated and written back.
            # NOTE(review): this read-modify-write is not atomic across worker
            # processes; two workers handling records of the same CVE may
            # overwrite each other's entries.
            merged = cve_lookup_details_shared[key]
            merged.update(entry)
            cve_lookup_details_shared[key] = merged

    tokens = record.strip().split(",")
    cve = tokens[0]
    func_name = tokens[1]
    git_src = tokens[2].lower() # All apt sources are in lower case
    apt_src = tokens[-1]

    potential_debs = [sanitize_name(deb) for deb in get_debs_from_git_src(git_src)]
    if not potential_debs:
        # If git source not found, fall back to apt source
        potential_debs = [sanitize_name(deb) for deb in get_debs_from_git_src(apt_src)]

    if not potential_debs:
        update_shared_dict(cve, record, "GIT_SRC_ISSUE", [])
        return

    # Look up the function nodes by function name (not key); the deb filter is applied below
    func_nodes = func_lookup_exact(func_name)
    if not func_nodes:
        update_shared_dict(cve, record, "FUNC_NOT_FOUND", [])
        return

    fin_nodes = []      # matches located in 'SHARED' ELF files
    exe_fin_nodes = []  # matches located in ELF files of any other type
    for func_node in func_nodes:
        elf_node = exact_elf_lookup(func_node['lib'], func_node['deb'])
        if elf_node is None:
            # NOTE(review): this NO_ELF entry is always replaced by the final
            # classification below, which writes the same (cve, record) slot.
            update_shared_dict(cve, record, "NO_ELF", [])
            continue

        # Only keep nodes whose deb name (the part before the first "_") is a candidate package.
        if sanitize_name(func_node['deb'].split("_")[0]) in potential_debs:
            if elf_node['type'] == 'SHARED':
                fin_nodes.append(func_node)
            else:
                exe_fin_nodes.append(func_node)

    # Exactly one of the four outcomes applies.
    if fin_nodes and exe_fin_nodes:
        update_shared_dict(cve, record, "AMBI", fin_nodes + exe_fin_nodes)
    elif fin_nodes:
        update_shared_dict(cve, record, "LIB", fin_nodes)
    elif exe_fin_nodes:
        update_shared_dict(cve, record, "EXE", exe_fin_nodes)
    else:
        update_shared_dict(cve, record, "FUNC_NOT_FOUND", [])

In [None]:
# Run the lookup for every record across 48 worker processes. Each worker calls
# get_arango_connection() once on start-up, and all workers write into the same
# shared result dict. starmap() blocks until every record has been processed;
# the context manager then shuts the workers down instead of leaving them (and
# their database connections) alive for the rest of the session.
with multiprocessing.Pool(initializer=get_arango_connection, processes=48) as pool:
    pool.starmap(find_record_in_DB, zip(cve_funcs_git_src_tuples, itertools.repeat(cve_lookup_details_shared)))

In [None]:
# Pull a copy of the shared dict's contents out of the manager proxy so the
# results can be used as a local object from here on.
cve_lookup_details = cve_lookup_details_shared._getvalue()

In [None]:
# Persist the per-CVE lookup results as indented JSON.
with open("data/cve_lookup_details.json", "w") as f:
    json.dump(cve_lookup_details, f, indent=4)

In [None]:
# For every record classified as 'LIB', emit one CSV row per matched function
# node: "<cve>,<deb>,<lib>,<_key>,FIXED". Rows and CVE ids are collected in sets
# so duplicates are dropped as they are produced, and the loop variables no
# longer rebind the builtin name `tuple` at notebook scope.
cve_func_node_found_tuples = set()
cves_found = set()
for cve, records in cve_lookup_details.items():
    for details in records.values():
        if details['STATE'] != 'LIB':
            continue
        if 'NODES' not in details:
            # Unexpected shape: show the entry and move on.
            print(details)
            continue
        for func_node in details['NODES']:
            cve_func_node_found_tuples.add(f"{cve},{func_node['deb']},{func_node['lib']},{func_node['_key']},FIXED")
            cves_found.add(cve)
cve_func_node_found_tuples = list(cve_func_node_found_tuples)
cves_found = list(cves_found)

In [None]:
# Notebook display: number of distinct rows in cve_func_node_found_tuples.
len(cve_func_node_found_tuples)

In [None]:
# Notebook display: number of distinct CVE ids in cves_found.
len(cves_found)

In [None]:
# Notebook display: the collected rows themselves, for inspection.
cve_func_node_found_tuples

In [None]:
# Write the collected rows to CSV, one row per line. writelines() batches the
# output, and the generator variable does not rebind the builtin name `tuple`.
with open("data/cve_funcs_debs_elfs_found.csv", "w") as f:
    f.writelines(f'{row}\n' for row in cve_func_node_found_tuples)

In [None]:
# Notebook display: number of CVE ids that have an entry in cve_lookup_details.
len(cve_lookup_details)

In [None]:
# Bucket CVE ids by the STATE recorded for each of their records, and keep the
# record strings for the 'LIB' and 'FUNC_NOT_FOUND' states. Sets replace the
# earlier list + remove() bookkeeping, and the loop variables no longer rebind
# the builtin name `tuple` at notebook scope.
cve_func_found_tuples = []
cve_func_not_found_tuple = []
cves_by_state = {
    state: set()
    for state in ("LIB", "AMBI", "GIT_SRC_ISSUE", "EXE", "NO_ELF", "FUNC_NOT_FOUND")
}
for cve, dets in cve_lookup_details.items():
    for record, details in dets.items():
        state = details["STATE"]
        if state in cves_by_state:
            cves_by_state[state].add(cve)
        if state == "LIB":
            cve_func_found_tuples.append(record)
        elif state == "FUNC_NOT_FOUND":
            cve_func_not_found_tuple.append(record)

cve_func_found = list(cves_by_state["LIB"])
cve_func_ambi = list(cves_by_state["AMBI"])
cve_func_no_git_src = list(cves_by_state["GIT_SRC_ISSUE"])
cve_func_exe = list(cves_by_state["EXE"])
cve_elf_not_found = list(cves_by_state["NO_ELF"])

# A CVE stays "not found" only if none of its records ended up as LIB, AMBI,
# EXE or GIT_SRC_ISSUE. NO_ELF is deliberately not part of this exclusion,
# matching the original bookkeeping.
resolved_cves = (
    cves_by_state["LIB"]
    | cves_by_state["AMBI"]
    | cves_by_state["EXE"]
    | cves_by_state["GIT_SRC_ISSUE"]
)
unresolved_cves = cves_by_state["FUNC_NOT_FOUND"] - resolved_cves
cve_func_not_found = list(unresolved_cves)

# Keep only the not-found records whose CVE (first field) is still unresolved.
cve_func_not_found_tuples_filtered = list({
    record
    for record in cve_func_not_found_tuple
    if record.split(",")[0] in unresolved_cves
})

In [None]:
# Notebook display: number of distinct CVE ids with at least one 'LIB' record.
len(cve_func_found)

____________________________________________________________________________