In [1]:
import os
import json
import datetime
from tqdm.notebook import tqdm
from collections import defaultdict, deque

from cpgqls_client import CPGQLSClient

import mistralai

from mistralai import Mistral
from mistralai.models import UserMessage, AssistantMessage, SystemMessage

import nest_asyncio
nest_asyncio.apply()

In [None]:
# Run configuration consumed by the cells below.
repo_path = "..."  # interpolated into the Joern importCode(inputPath=...) query
repo_name = "..."  # interpolated into the Joern importCode(projectName=...) query
api_key = "..."  # passed to Mistral(api_key=...); NOTE(review): consider loading it from the environment instead of storing it in the notebook
context = True  # True: prompt with bfs_call_graph() output; False: prompt with the function's own code only

In [3]:
def joern_output_to_dict(out):
    """Decode the JSON document embedded in a Joern server response.

    The payload is expected to sit between the first and the last
    triple-quote delimiter found in ``out['stdout']``.

    Args:
        out: Mapping with a ``'stdout'`` string entry.

    Returns:
        The decoded JSON value.

    Raises:
        ValueError: If no non-empty triple-quoted payload is present, or if
            the payload is not valid JSON (``json.JSONDecodeError`` is a
            ``ValueError`` subclass).
    """
    text = out['stdout']
    tq = '"""'
    first, last = text.find(tq), text.rfind(tq)
    # `first < 0` means no delimiter at all. `last <= first + len(tq)` covers a
    # single (or overlapping) delimiter and an empty payload between two.
    if first < 0 or last <= first + len(tq):
        # The original raised a bare string, which itself fails with TypeError.
        raise ValueError("no triple-quoted JSON payload found in Joern output")
    return json.loads(text[first + len(tq):last])

In [None]:
import json

# Connection settings for the Joern query server used by every Joern cell below.
server_endpoint = "localhost:8080"
# NOTE(review): credentials are hardcoded here — confirm these are only local defaults.
basic_auth_credentials = ("username", "password")
joern_client = CPGQLSClient(server_endpoint, auth_credentials=basic_auth_credentials)

# Sends the 'close' command before the import further down; presumably this
# closes a previously opened project — TODO confirm. The response is not inspected.
result = joern_client.execute('close')


def parse_joern_output(out):
    """Return the JSON value found between triple quotes in ``out['stdout']``.

    The payload starts right after the first triple-quote delimiter and ends
    right before the last one.

    Raises:
        Exception: with the message "JSON extraction failed" when the
            delimiters are missing or enclose nothing.
    """
    text = out['stdout']
    delimiter = '"""'
    payload_start = text.find(delimiter) + len(delimiter)
    payload_end = text.rfind(delimiter)

    # rfind() is negative exactly when no delimiter exists; otherwise the
    # closing delimiter has to sit strictly after the start of the payload.
    payload_found = payload_end >= 0 and payload_end > payload_start
    if not payload_found:
        raise Exception("JSON extraction failed")

    return json.loads(text[payload_start:payload_end])

# Import the repository into the Joern server first; the response is kept in
# `result` but is not inspected here.
query = f'importCode(inputPath="{repo_path}", projectName="{repo_name}")'
result = joern_client.execute(query)

# One tuple per internal method whose name does not start with '<':
# (name, code, callee names, parameter names, parameter types,
#  return statement code, return type), serialised as pretty JSON.
query_func = """
    cpg.method
        .isExternal(false)
        .whereNot(_.name("<.*"))
        .map(m => (m.name,
                   m.code,
                   m.call.whereNot(_.name("<.*")).name.l,
                   m.parameter.name.l,
                   m.parameter.typeFullName.l,
                   m.ast.isReturn.code.l,
                   m.methodReturn.typeFullName,
                  )
            )
        .toJsonPretty
"""

funcs_result = joern_client.execute(query_func)
all_data = parse_joern_output(funcs_result)

# Tuple slot -> field name; slot '_1' (the method name) becomes the key of `data`.
field_by_slot = {
    '_2': 'code',
    '_3': 'calls',
    '_4': 'arg_names',
    '_5': 'arg_types',
    '_6': 'return_expressions',
    '_7': 'return_type',
}

data = {}
for record in all_data:
    # A later record with the same method name replaces the earlier one.
    data[record['_1']] = {
        field: record[slot] for slot, field in field_by_slot.items()
    }

In [6]:
def _return_operand(stmt):
    """Return the expression of a ``return ...;`` statement.

    Strings that do not start with the ``return`` keyword are returned
    unchanged, so applying this twice gives the same result as applying it once.
    """
    keyword = 'return'
    rest = stmt[len(keyword):]
    # Reject identifiers that merely start with "return" (e.g. "return_value").
    if not stmt.startswith(keyword) or rest[:1].isalnum() or rest[:1] == '_':
        return stmt
    rest = rest.strip()
    if rest.endswith(';'):
        rest = rest[:-1]
    return rest.strip()


# Normalise every function record in place. The cell is safe to re-run: the
# original sliced `s[7:-1]` blindly, which corrupted already-stripped values.
for x in data.values():
    # Keep only callees that are themselves known functions.
    x['calls'] = {callee for callee in x['calls'] if callee in data}
    x['return_expressions'] = {_return_operand(s) for s in x['return_expressions']}

    # A single empty parameter name is treated as "no parameters".
    if len(x['arg_names']) == 1 and x['arg_names'][0] == '':
        x['arg_names'] = []
        x['arg_types'] = []

In [91]:
# Mistral chat client shared by the LLM cells below; `api_key` is set in the configuration cell.
mistral_client = Mistral(api_key=api_key)
model = "codestral-latest"  # model name passed to mistral_client.chat.complete(...)

In [7]:
def bfs_call_graph(root, depth=3):
    """Concatenate the code of ``root`` and of the functions it reaches.

    Walks the call graph stored in the module-level ``data`` mapping
    breadth-first, starting at ``root`` (level 0).

    Args:
        root: Name of the starting function; must be a key of ``data``.
        depth: Maximum call distance from ``root`` to include. The original
            implementation accepted this parameter but ignored it, so the
            traversal was unbounded.

    Returns:
        The ``'code'`` entries of the visited functions in visiting order,
        joined by blank lines.

    Raises:
        KeyError: If ``root`` is not a key of ``data``.
    """
    queue = deque([(root, 0)])
    visited = set()
    code = []

    while queue:
        func_name, level = queue.popleft()
        if func_name in visited or level > depth:
            continue
        visited.add(func_name)

        entry = data[func_name]
        code.append(entry['code'])

        # 'calls' may be a set; sorting makes the output order reproducible,
        # which matters because callers truncate the joined text.
        for callee in sorted(entry['calls']):
            if callee in data:
                queue.append((callee, level + 1))

    return '\n\n'.join(code)

In [None]:
import json
import time

output_file = "llm_annotations.json"

# Resume support: functions already present in the saved file are skipped.
try:
    with open(output_file, 'r') as f:
        results = json.load(f)
except FileNotFoundError:
    results = {}

# Question sent for every function; filled in with str.format() below.
prompt_template = """You are C developer. Your task is to answer questions on a code.
    Which variables contain pointers to the memory allocated in function {func_name}, put the answer in "allocated_variables" field.
    Which variables contain pointers to the memory deallocated in function {func_name}, put the answer in "deallocated_variables" field. 
    Return the ultimate answer in short JSON object.
        
    # code
    {code}"""


def _as_name_list(value):
    """Coerce a model-provided field into a list of variable-name strings.

    A bare string becomes a one-element list so that later ``name in list``
    checks compare whole names instead of doing substring matching.
    """
    if isinstance(value, str):
        return [value]
    if isinstance(value, (list, tuple)):
        return [item for item in value if isinstance(item, str)]
    return []


processed = 0

try:
    for func_name, content in data.items():
        if func_name in results:
            continue

        # Fixed pause before each request (rate limiting).
        time.sleep(5.0)

        if context:
            code = bfs_call_graph(func_name)[:1000]
        else:
            code = content['code']

        prompt = prompt_template.format(func_name=func_name, code=code)

        messages = [{"role": "user", "content": prompt}]
        chat_response = mistral_client.chat.complete(
            model=model,
            messages=messages,
            response_format={"type": "json_object"}
        )

        raw_output = chat_response.choices[0].message.content

        # Best effort: answers that are not a JSON object are skipped, so the
        # function is retried on the next run of this cell.
        try:
            json_output = json.loads(raw_output)
        except (json.JSONDecodeError, TypeError):
            continue
        if not isinstance(json_output, dict):
            continue

        results[func_name] = {
            "allocated_variables": _as_name_list(json_output.get("allocated_variables", [])),
            "deallocated_variables": _as_name_list(json_output.get("deallocated_variables", []))
        }

        # Periodic checkpoint (after the 1st, 11th, 21st, ... stored answer).
        if processed % 10 == 0:
            with open(output_file, 'w') as f:
                json.dump(results, f, indent=2)

        processed += 1
finally:
    # Always persist what was collected, including when a request raises or
    # the cell is interrupted.
    with open(output_file, 'w') as f:
        json.dump(results, f, indent=2)

In [93]:
import json

# Merge the saved LLM answers into the per-function records in `data`.
with open("llm_annotations.json", 'r') as f:
    llm_results = json.load(f)

for func_name in data:
    if func_name not in llm_results:
        continue
    llm_entry = llm_results[func_name]
    data[func_name]["allocated_variables"] = llm_entry["allocated_variables"]
    data[func_name]["deallocated_variables"] = llm_entry["deallocated_variables"]

annotations = {}
ann_alloc = "AllocSource::1"
ann_free = "FreeSink::1"
log_entries = []

for func_name, info in data.items():
    # Only functions that received an LLM answer are annotated.
    if "deallocated_variables" not in info:
        continue

    freed = info.get("deallocated_variables", [])
    allocated = info.get("allocated_variables", [])

    key = f"{func_name}({func_name})"
    # Slot 0 describes the return value, slot i + 1 describes parameter i.
    slots = [[] for _ in range(len(info["arg_names"]) + 1)]
    annotations[key] = slots

    for position, arg in enumerate(info["arg_names"], start=1):
        if arg in freed:
            slots[position] = [ann_free]
        # Checked second, so an allocation mark replaces a free mark.
        if arg in allocated:
            slots[position] = [ann_alloc]

    for expr in info.get("return_expressions", []):
        if expr in freed:
            slots[0] = [ann_free]
        if expr in allocated:
            slots[0] = [ann_alloc]

    log_entries.append({
        "function": func_name,
        "key": key,
        "annotations": slots,
        "arg_names": info["arg_names"],
        "allocated": allocated,
        "deallocated": freed
    })

with open("annotations.json", 'w') as f:
    json.dump(annotations, f, indent=2)

with open("annotation_log.json", 'w') as f:
    json.dump(log_entries, f, indent=2)

In [None]:
# Use Joern to find data flow between allocator/deallocator pairs.

import json

with open("annotations.json", 'r') as f:
    annotations = json.load(f)

with open("annotation_log.json", 'r') as f:
    annotation_log = json.load(f)

# Functions for which the LLM reported at least one allocated or deallocated variable.
functions_with_return = [
    entry["function"]
    for entry in annotation_log
    if entry.get("allocated") or entry.get("deallocated")
]


def _returns_allocation(ann):
    """True when slot 0 (the return value) carries an AllocSource mark."""
    return bool(ann) and bool(ann[0]) and any("AllocSource" in a for a in ann[0])


def _frees_parameter(ann):
    """True when any parameter slot (index >= 1) carries a FreeSink mark."""
    if not ann or len(ann) <= 1:
        return False
    return any(slot and any("FreeSink" in a for a in slot) for slot in ann[1:])


# Classify each function once instead of re-deriving it for every pair.
sources = [
    name for name in functions_with_return
    if _returns_allocation(annotations.get(f"{name}({name})", []))
]
sinks = [
    name for name in functions_with_return
    if _frees_parameter(annotations.get(f"{name}({name})", []))
]

flow_results = []

# Same (allocator, deallocator) pairs and order as a full nested scan over
# `functions_with_return`, restricted to the pairs that are actually queried.
for func_a in sources:
    for func_b in sinks:
        query = f'''
def source = cpg.method.name("{func_a}").methodReturn
def sink = cpg.method.name("{func_b}").parameter

sink.reachableByFlows(source).p
'''

        result = joern_client.execute(query)

        if result.get('success') and result.get('stdout', '').strip():
            stdout = result['stdout'].strip()
            # A box-drawing corner in the output is taken as "a flow table
            # was printed", i.e. at least one flow exists.
            if stdout and '┌──' in stdout:
                flow_results.append({
                    "allocator": func_a,
                    "deallocator": func_b,
                    "query": query,
                    "has_flow": True
                })

with open("alloc_free_flows.json", 'w') as f:
    json.dump(flow_results, f, indent=2)

In [159]:
import json
from collections import deque

# Reload the annotation artifacts from disk so this cell does not depend on
# variables left over from the cell that wrote them.
with open("annotations.json", 'r') as f:
    annotations = json.load(f)

# Each entry holds the keys 'function' and 'annotations' read further down.
with open("annotation_log.json", 'r') as f:
    annotation_log = json.load(f)

def get_subgraph_functions(root, max_depth=3):
    """Collect the names reachable from ``root`` within ``max_depth`` calls.

    The call graph is read from the module-level ``data`` mapping. ``root``
    itself (level 0) is always part of the result when ``max_depth >= 0``,
    even if it is not a key of ``data``; callees missing from ``data`` are
    ignored.

    Returns:
        A set of function names.
    """
    reached = set()
    pending = deque()
    pending.append((root, 0))

    while pending:
        name, level = pending.popleft()
        if level > max_depth or name in reached:
            continue
        reached.add(name)

        # Nothing to expand for unknown functions or records without calls.
        if name not in data or 'calls' not in data[name]:
            continue
        pending.extend(
            (callee, level + 1)
            for callee in data[name]['calls']
            if callee in data
        )

    return reached

# Split the logged functions into allocators (return slot marked AllocSource)
# and deallocators (some parameter slot marked FreeSink).
alloc_functions = []
free_functions = []

for entry in annotation_log:
    func_name = entry["function"]
    anns = entry["annotations"]

    return_slot = anns[0] if anns else None
    if return_slot and any("AllocSource" in a for a in return_slot):
        alloc_functions.append(func_name)

    if any(slot and any("FreeSink" in a for a in slot) for slot in anns[1:]):
        free_functions.append(func_name)

target_functions = alloc_functions + free_functions

functions_with_flows = []

for func_name in data:
    subgraph = get_subgraph_functions(func_name, max_depth=4)

    # Only allocator/deallocator pairs that are both reachable from this
    # function are worth a data-flow query; either list being empty means
    # there is nothing to query for this function.
    reachable_allocs = [name for name in alloc_functions if name in subgraph]
    reachable_frees = [name for name in free_functions if name in subgraph]

    for alloc in reachable_allocs:
        for free in reachable_frees:
            query = f'''
def source = cpg.method("{func_name}").call("{alloc}")
def sink = cpg.method("{func_name}").call("{free}").argument
sink.reachableByFlows(source).l
'''

            result = joern_client.execute(query)

            if not (result.get('success') and result.get('stdout', '').strip()):
                continue
            stdout = result['stdout'].strip()
            # 'Path(' in the printed list is taken as "at least one flow found".
            if 'Path(' in stdout:
                functions_with_flows.append({
                    "function": func_name,
                    "allocator": alloc,
                    "deallocator": free,
                    "has_flow": True
                })

# Human-readable listing of the findings; built here but not written anywhere
# in this cell.
output = []
for item in functions_with_flows:
    output.extend([
        f"Function: {item['function']}",
        f"  Allocator: {item['allocator']}",
        f"  Deallocator: {item['deallocator']}",
        "",
    ])

with open("functions_with_flows.json", 'w') as f:
    json.dump(functions_with_flows, f, indent=2)

In [122]:
import json

with open("annotation_log.json", 'r') as f:
    annotation_log = json.load(f)

alloc_functions = []
free_functions = []

for entry in annotation_log:
    func_name = entry["function"]
    anns = entry["annotations"]

    # Slot 0 is the return value: an AllocSource mark makes this an allocator.
    return_slot = anns[0] if anns else None
    if return_slot and any("AllocSource" in a for a in return_slot):
        alloc_functions.append(func_name)

    # Remaining slots are parameters: one FreeSink mark makes this a deallocator.
    if any(slot and any("FreeSink" in a for a in slot) for slot in anns[1:]):
        free_functions.append(func_name)

# One function name per line, sorted alphabetically.
with open("allocators.txt", 'w') as f:
    f.writelines(func + "\n" for func in sorted(alloc_functions))

with open("deallocators.txt", 'w') as f:
    f.writelines(func + "\n" for func in sorted(free_functions))

In [None]:
# Ask the model whether it considers the discovered alloc/dealloc pair to be a genuine one.

import json
import time
from collections import deque

def bfs_call_graph(root, max_depth=3):
    """Render the code of ``root`` and of its callees up to ``max_depth`` calls away.

    The call graph comes from the module-level ``data`` mapping and is walked
    breadth-first. Every visited function that is a key of ``data``
    contributes a ``// Function: <name>`` header followed by at most the first
    1000 characters of its code; sections are separated by blank lines.

    Note: an earlier cell of this notebook defines a function with the same
    name (taking a ``depth`` parameter); running this cell replaces it.
    """
    seen = set()
    ordered = []
    frontier = deque([(root, 0)])

    while frontier:
        name, level = frontier.popleft()
        if name in seen or level > max_depth:
            continue
        seen.add(name)
        ordered.append(name)

        # Unknown functions and records without 'calls' are not expanded.
        if name not in data or 'calls' not in data[name]:
            continue
        frontier.extend(
            (callee, level + 1)
            for callee in data[name]['calls']
            if callee in data
        )

    return '\n\n'.join(
        f"// Function: {name}\n{data[name]['code'][:1000]}"
        for name in ordered
        if name in data
    )

with open("functions_with_flows.json", 'r') as f:
    found_pairs = json.load(f)

output_file = "llm_dataflow_analysis.json"

# Verdicts from a previous run of this cell, keyed by "allocator|deallocator".
try:
    with open(output_file, 'r') as f:
        validated_results = json.load(f)
except FileNotFoundError:
    validated_results = {}


def _confidence_value(value):
    """Return ``value`` as a float, or 0.0 when it is not numeric."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return 0.0


processed = 0
# Start from the pairs that already have a saved verdict so a re-run resumes
# instead of querying Joern and the model again for every pair.
seen_pairs = set(validated_results)

try:
    for pair in found_pairs:
        func_name = pair["function"]
        allocator = pair["allocator"]
        deallocator = pair["deallocator"]

        # Each allocator/deallocator pair is judged once, using the first
        # function in which it was found as the example.
        pair_key = f"{allocator}|{deallocator}"
        if pair_key in seen_pairs:
            continue

        seen_pairs.add(pair_key)
        query = f'''
def source = cpg.method("{func_name}").call("{allocator}")
def sink = cpg.method("{func_name}").call("{deallocator}").argument
sink.reachableByFlows(source).p
'''

        result = joern_client.execute(query)
        stdout = (result.get('stdout') or '').strip()

        # No printed flow table means there is nothing to show the model.
        if not stdout or '┌──' not in stdout:
            continue

        flow_path = stdout
        if '"""' in stdout:
            flow_path = stdout.split('"""')[1].strip()

        context_code = bfs_call_graph(func_name, max_depth=3)

        prompt = f"""Analyze this DATA FLOW path in C code.

CONTEXT (call graph, depth <= 3):
{context_code[:4000]}

DATA FLOW PATH found by static analysis:
{flow_path[:2000]}

Question: Does this data flow represent an IDEOLOGICAL allocation-deallocation pair?
- {allocator} allocates memory
- Memory flows through variables: {flow_path.split('│')[1:3] if '│' in flow_path else 'N/A'}
- {deallocator} receives that memory as argument

Is this a meaningful pair (like malloc/free) or just coincidental?
Return JSON: {{"is_ideological_pair": true/false, "confidence": 0-100, "dataflow_analysis": "analyze the actual flow path"}}
"""

        messages = [{"role": "user", "content": prompt}]
        chat_response = mistral_client.chat.complete(
            model=model,
            messages=messages,
            response_format={"type": "json_object"}
        )

        raw_output = chat_response.choices[0].message.content

        # Pause after every request, including ones whose answer is discarded
        # below, so the request rate stays bounded.
        time.sleep(6.0)

        # Best effort: answers that are not a JSON object are skipped.
        try:
            json_output = json.loads(raw_output)
        except (json.JSONDecodeError, TypeError):
            continue
        if not isinstance(json_output, dict):
            continue

        validated_results[pair_key] = {
            "allocator": allocator,
            "deallocator": deallocator,
            "is_ideological_pair": json_output.get("is_ideological_pair", False),
            "confidence": json_output.get("confidence", 0),
            "dataflow_analysis": json_output.get("dataflow_analysis", ""),
            "example_function": func_name,
            "flow_path_preview": flow_path[:500]
        }

        # Periodic checkpoint (after the 1st, 4th, 7th, ... stored verdict).
        if processed % 3 == 0:
            with open(output_file, 'w') as f:
                json.dump(validated_results, f, indent=2)

        processed += 1
finally:
    # Always persist collected verdicts, including when a request raises or
    # the cell is interrupted.
    with open(output_file, 'w') as f:
        json.dump(validated_results, f, indent=2)

# Pairs the model accepted with a confidence above 75; a non-numeric
# confidence counts as 0.
ideological_pairs = []
for result in validated_results.values():
    if result.get("is_ideological_pair", False) and _confidence_value(result.get("confidence", 0)) > 75:
        ideological_pairs.append(result)

```

Expected result

cJSON_New_Item -> cJSON_Delete
cJSON_CreateString -> cJSON_Delete
cJSON_CreateArray -> cJSON_Delete
cJSON_CreateObject -> cJSON_Delete
cJSON_CreateRaw -> cJSON_Delete
cJSON_strdup -> cJSON_free
cJSON_malloc -> cJSON_free
cJSON_ParseWithLengthOpts -> cJSON_Delete
cJSON_Duplicate -> cJSON_Delete

```