In [4]:

# Requires: pip install openai python-dotenv
import os, time, re, json
from typing import Optional
from dotenv import load_dotenv

# Load variables from a local .env file (if present) so the os.environ lookups
# below can see them.
load_dotenv()

# The DeepSeek endpoint is reached through the OpenAI SDK client; fail early with
# an install hint when the SDK cannot be imported.
try:
    from openai import OpenAI
except Exception as e:
    raise RuntimeError("Install the OpenAI SDK first: pip install openai") from e

# Connection settings. Each one is read from the environment variable of the same
# name; base URL and model fall back to the defaults given here.
DEEPSEEK_API_KEY = os.environ.get("DEEPSEEK_API_KEY", "")
DEEPSEEK_BASE_URL = os.environ.get("DEEPSEEK_BASE_URL", "https://api.deepseek.com")
DEEPSEEK_MODEL = os.environ.get("DEEPSEEK_MODEL", "deepseek-chat")  # default model id; override via DEEPSEEK_MODEL

# An empty/missing key is treated as a configuration error before any request is made.
if not DEEPSEEK_API_KEY:
    raise RuntimeError("Please set DEEPSEEK_API_KEY in your environment (or .env).")

# Shared client used by _chat_once for every completion request.
client = OpenAI(api_key=DEEPSEEK_API_KEY, base_url=DEEPSEEK_BASE_URL)

def _chat_once(prefix: str, suffix: str, temperature=0.8, top_p=0.95, max_tokens=128) -> str:
    """
    Issue one chat completion that asks for the bare predicate of a masked require.

    The text around the `require(<FILL_ME>);` placeholder is embedded in a single
    user message under PREFIX and SUFFIX headings (a fill-in-the-middle style
    instruction), together with examples of the wanted and unwanted output forms.

    Args:
        prefix: Text placed under the PREFIX heading.
        suffix: Text placed under the SUFFIX heading.
        temperature: Sampling temperature forwarded to the API.
        top_p: Nucleus-sampling value forwarded to the API.
        max_tokens: Completion length cap forwarded to the API.

    Returns:
        The first choice's message content with surrounding whitespace removed,
        or "" when the content is empty or None.
    """
    task_description = (
        "You are filling a single-line Solidity predicate to go inside require(...).\n"
        "Given a source split into PREFIX and SUFFIX around a placeholder `require(<FILL_ME>);`, "
        "produce ONLY the predicate expression for the require (no semicolon, no backticks, no commentary).\n"
        "Examples:\n"
        "  - Output like: msg.sender == owner\n"
        "  - NOT like: require(msg.sender == owner);\n"
        "  - NOT like: ```solidity ... ```\n"
        "\n"
    )
    masked_source = (
        "PREFIX:\n"
        "-----\n"
        f"{prefix}\n"
        "-----\n"
        "SUFFIX:\n"
        "-----\n"
        f"{suffix}\n"
        "-----\n"
    )
    conversation = [
        {"role": "system", "content": "Return only the Solidity predicate expression."},
        {"role": "user", "content": task_description + masked_source},
    ]

    completion = client.chat.completions.create(
        model=DEEPSEEK_MODEL,
        messages=conversation,
        temperature=temperature,
        top_p=top_p,
        max_tokens=max_tokens,
        stop=None,
    )

    answer = completion.choices[0].message.content
    if not answer:
        return ""
    return answer.strip()

def _extract_predicate(text: str) -> str:
    """
    Normalize model output to a single-line predicate:
    - Strip code fences/backticks/semicolons
    - If it accidentally returned require(...), extract the (...)
    """
    t = text.strip()
    t = re.sub(r"^```[a-zA-Z]*\s*", "", t).strip()
    t = re.sub(r"```$", "", t).strip()
    # require(...) wrapper?
    m = re.search(r"require\s*\(\s*(.+?)\s*\)\s*;?\s*$", t, flags=re.S)
    if m:
        return m.group(1).strip()
    # strip trailing semicolon if present
    t = re.sub(r";+$", "", t).strip()
    # collapse lines
    t = " ".join(t.splitlines()).strip()
    return t

def deepseek_generate_fim(prefix: str, suffix: str) -> str:
    """
    Request a predicate for the masked require, retrying on failure.

    Up to four attempts are made, waiting 0, 1, 2 and 4 seconds before each one
    respectively. Any exception raised while querying the model or normalizing
    its output triggers the next attempt; the exception from the final attempt
    is re-raised to the caller.

    Args:
        prefix: Source text up to and including the mask line.
        suffix: Source text after the mask.

    Returns:
        The normalized predicate string from the first successful attempt.
    """
    backoff_seconds = (0, 1, 2, 4)
    final_attempt = len(backoff_seconds) - 1

    for attempt, pause in enumerate(backoff_seconds):
        if pause:
            time.sleep(pause)
        try:
            return _extract_predicate(_chat_once(prefix, suffix))
        except Exception:
            # Out of retries: surface the last error unchanged.
            if attempt == final_attempt:
                raise


In [5]:
# %% Injection utilities: build masked prefix/suffix at a location (single-turn)
import validation_library as veri

def _insert_mask_at_line(src: str, line_no_1based: int, indent_fallback="    "):
    """
    Insert `require(<FILL_ME>);` BEFORE 1-based line_no_1based.
    Returns (prefix, suffix, indent_str).
    """
    lines = src.splitlines()
    idx = max(0, min(len(lines), line_no_1based - 1))
    # Determine indentation: use target line's indentation if exists
    if 0 <= idx < len(lines):
        m = re.match(r"^(\s*)", lines[idx])
        indent = m.group(1) if m else indent_fallback
    else:
        indent = indent_fallback
    masked = f"{indent}require(<FILL_ME>);"
    prefix = "\n".join(lines[:idx] + [masked])
    suffix = "\n".join(lines[idx:])
    return prefix, suffix, indent

def _insert_mask_at_function_entry(src: str, vl_line_1based: int, indent_fallback="    "):
    """
    Place the mask at the start bound of the function containing vl_line_1based.

    The function is resolved with veri.find_function_bounds; the first value it
    returns is treated as a 0-based line index, converted to 1-based, and handed
    to _insert_mask_at_line, which inserts the mask BEFORE that line.

    NOTE(review): if the start bound is the opening-brace line itself, the mask
    ends up before the brace rather than as the first statement of the body.
    Confirm what find_function_bounds returns.

    Returns:
        (prefix, suffix, indent_str) as produced by _insert_mask_at_line.
    """
    start_0based, _end_0based = veri.find_function_bounds(src, vl_line_1based)
    target_1based = start_0based + 1
    return _insert_mask_at_line(src, target_1based, indent_fallback=indent_fallback)

def _insert_mask_at_function_exit(src: str, vl_line_1based: int, indent_fallback="    "):
    """
    Place the mask at the end bound of the function containing vl_line_1based.

    The second value returned by veri.find_function_bounds is treated as a
    0-based line index and converted to 1-based; _insert_mask_at_line then
    inserts the mask BEFORE that line (presumably the closing brace; verify
    against find_function_bounds).

    Returns:
        (prefix, suffix, indent_str) as produced by _insert_mask_at_line.
    """
    _start_0based, end_0based = veri.find_function_bounds(src, vl_line_1based)
    target_1based = end_0based + 1
    return _insert_mask_at_line(src, target_1based, indent_fallback=indent_fallback)


In [6]:
# %% Core: single-turn synthesizer for {V, alpha, omega}
def synthesize_require_statement(
    contract_text: str,
    line_no: int,               # VL reference line (1-based)
    injection: str = "V",       # "V" (target line), "alpha" (function entry), "omega" (function exit)
    indent_fallback: str = "    ",
    extra_hint: Optional[str] = None,
) -> str:
    """
    Generate the predicate for one masked `require` in the given contract.

    Args:
        contract_text: Full contract source to mask.
        line_no: 1-based reference line; used directly for "V" and to locate the
            enclosing function for "alpha" / "omega".
        injection: Where the mask goes: "V", "alpha" or "omega".
        indent_fallback: Indentation passed through to the mask-insertion helper.
        extra_hint: Optional text prepended to the prefix as a `// Hint:` comment.

    Returns:
        ONLY the predicate (no semicolon); callers wrap it as require({pred});

    Raises:
        ValueError: if `injection` is not one of the three supported modes.
    """
    if injection not in ("V", "alpha", "omega"):
        raise ValueError("injection must be one of: 'V', 'alpha', 'omega'")

    mask_placers = {
        "V": _insert_mask_at_line,
        "alpha": _insert_mask_at_function_entry,
        "omega": _insert_mask_at_function_exit,
    }
    prefix, suffix, _indent = mask_placers[injection](contract_text, line_no, indent_fallback)

    if extra_hint:
        prefix = f"// Hint: {extra_hint}\n{prefix}"

    return deepseek_generate_fim(prefix, suffix)


In [10]:
# %% Seed loader (auto from your aggregated VL artifact)
# This replaces the previous seed_contracts-based cell.

import os, json
from collections import defaultdict

# Candidate seed files, tried in order. The first entry is the SEED_VL_JSON
# environment variable (empty string when unset); an empty or non-existent path
# is skipped, so a bad SEED_VL_JSON silently falls through to the defaults.
CANDIDATES = [
    os.environ.get("SEED_VL_JSON", "").strip(),
    "reports/aggregated/DeepSeek/contract_VL.json",
    "reports/aggregated/DeepSeek/contracts_VL.json",
    "reports/aggregated/contract_VL.json",
]

# First candidate that is non-empty and exists on disk, or None.
seed_path = next((p for p in CANDIDATES if p and os.path.exists(p)), None)
if not seed_path:
    raise RuntimeError(
        "Could not find a VL seed JSON. "
        "Set SEED_VL_JSON to your file or generate reports/aggregated/DeepSeek/contract_VL.json first."
    )

with open(seed_path, "r", encoding="utf-8") as f:
    raw = json.load(f)

# raw is expected to be: { name: [ [contract_text, { 'VL':[line, guard], 'pre':[line, ''], 'post':[line, ''] } ], ... ] }
# (Tuples become lists in JSON; guards may be empty strings.)
# Normalized entries are collected here, keyed by contract name; later cells
# read and update this mapping.
contracts_with_results = defaultdict(list)

def _read_line_pair(x):
    # Accept [line, guard] or just line
    if isinstance(x, list) and x:
        return int(x[0]), (x[1] if len(x) > 1 else "")
    if isinstance(x, int):
        return int(x), ""
    return None, ""

# Normalize every seed entry into (contract_text, {"VL": ..., "pre": ..., "post": ...})
# and collect it under its contract name. Entries that cannot be interpreted are
# skipped silently; `loaded` counts the ones that were kept.
loaded = 0
for name, entries in raw.items():
    for entry in entries:
        # entry may be [contract_text, annotations] or a dict with same fields
        if isinstance(entry, list) and len(entry) == 2:
            contract_text, ann = entry
        elif isinstance(entry, dict):
            contract_text = entry.get("contract") or entry.get("contract_text") or entry.get("0")
            ann = entry.get("annotations") or entry.get("1") or {k: entry.get(k) for k in ("VL","pre","post")}
        else:
            continue

        # A non-string or blank contract body cannot be masked; skip it.
        if not isinstance(contract_text, str) or not contract_text.strip():
            continue

        # NOTE(review): assumes `ann` is a dict (or falsy); any other type would
        # fail on .get here.
        vl_line, vl_guard = _read_line_pair((ann or {}).get("VL"))
        pre_line, pre_guard = _read_line_pair((ann or {}).get("pre"))
        post_line, post_guard = _read_line_pair((ann or {}).get("post"))

        # The VL line is mandatory: without a positive line number the entry is dropped.
        if not isinstance(vl_line, int) or vl_line <= 0:
            continue
        # Fallback: recompute pre/post lines if missing using the validation helper.
        # The helper is called for every entry, and an entry is dropped whenever
        # it raises, even if both pre and post lines were already present.
        try:
            pre0, post0 = veri.find_function_bounds(contract_text, vl_line)   # 0-based
            if not isinstance(pre_line, int) or pre_line <= 0:
                pre_line = pre0 + 1        # store function-start (brace) line (0-based+1)
                # We actually insert at first statement; later logic uses VL+injection to compute exact spot.
            if not isinstance(post_line, int) or post_line <= 0:
                post_line = post0 + 1      # closing brace line (0-based+1)
        except Exception:
            continue

        # Guards are normalized to "" when missing/None.
        contracts_with_results[name].append((
            contract_text,
            {
                "VL":  (vl_line, vl_guard or ""),
                "pre": (pre_line, pre_guard or ""),
                "post":(post_line, post_guard or ""),
            }
        ))
        loaded += 1

print(f"[seed] Loaded {loaded} contract entries from {seed_path}")
# Cell output: number of distinct contract names (can be smaller than `loaded`
# because one name may hold several entries).
len(contracts_with_results)


[seed] Loaded 170 contract entries from reports/aggregated/DeepSeek/contract_VL.json


130

In [11]:
# %% Output directory for SINGLE-TURN results
# Create the report folder up front (no error if it already exists); the
# single-turn cells pass paths inside it to veri.print_json_report.
os.makedirs("reports/isolated/DeepSeek", exist_ok=True)


In [12]:
# %% SINGLE-TURN: Vulnerable-line (V) inference (original contract only)
from tqdm import tqdm

# Parallel lists filled in lockstep, one element per (contract, annotations) entry:
#   all_contracts - contract source
#   mapping       - (contract_name, contract, vl_line)
#   results_V     - generated predicate for the vulnerable line
all_contracts = []
mapping = []
results_V = []

# One model call per entry; a failure that survives the retries inside
# deepseek_generate_fim propagates and stops the loop.
for contract_name, entries in tqdm(contracts_with_results.items(), desc="VL single-turn"):
    for contract, annotations in entries:
        vl_line, _ = annotations["VL"]
        # SINGLE-TURN: use ORIGINAL contract (no prior injections)
        pred = synthesize_require_statement(contract, line_no=vl_line, injection="V")
        results_V.append(pred)
        all_contracts.append(contract)
        mapping.append((contract_name, contract, vl_line))

# write back to annotations["VL"]
# Pads results to the length of all_contracts; both lists grow together above,
# so this adds nothing when the loop ran to completion.
results_V += [''] * (len(all_contracts) - len(results_V))
# Flattened in the same iteration order as the loop above, so position i here
# corresponds to position i in results_V / mapping.
flat_entries = [(cn, entry) for cn, es in contracts_with_results.items() for entry in es]

# `annotations` is the dict stored inside contracts_with_results, so this
# assignment updates the shared structure in place (visible to later cells).
for (generated, (contract_name, contract, vl_line)), (_, (existing_contract, annotations)) in \
        zip(zip(results_V, mapping), flat_entries):
    annotations["VL"] = (vl_line, f"require({generated});")

# Minimal VL-only drop (single-turn)
# At this point only the "VL" guards have been replaced; "pre"/"post" still hold
# whatever the seed loader stored.
import validation_library as veri
veri.print_json_report("reports/isolated/DeepSeek/contract_VL_single.json", contracts_with_results)


VL single-turn:   0%|          | 0/130 [00:00<?, ?it/s]

VL single-turn: 100%|██████████| 130/130 [10:31<00:00,  4.86s/it]


In [13]:
# %% SINGLE-TURN: PRE (alpha) inference (original contract only)
from tqdm import tqdm

# Parallel lists filled in lockstep, one element per (contract, annotations) entry:
#   all_contracts - contract source
#   mapping       - (contract_name, contract, pre_line)
#   results_alpha - generated predicate for the function-entry position
all_contracts = []
mapping = []
results_alpha = []

for contract_name, entries in tqdm(contracts_with_results.items(), desc="PRE single-turn"):
    for contract, annotations in entries:
        vl_line, _ = annotations["VL"]
        pre_line, _ = annotations["pre"]
        # The mask position is derived from the VL line (the enclosing function is
        # looked up from it); the stored pre_line is only recorded in `mapping`.
        # The prompt is built from the unmodified contract text.
        pred = synthesize_require_statement(contract, line_no=vl_line, injection="alpha")
        results_alpha.append(pred)
        all_contracts.append(contract)
        mapping.append((contract_name, contract, pre_line))

# write back to annotations["pre"]
# Pads results to the length of all_contracts; both lists grow together above,
# so this adds nothing when the loop ran to completion.
results_alpha += [''] * (len(all_contracts) - len(results_alpha))
# Flattened in the same iteration order as the loop above, so positions line up.
flat_entries = [(cn, entry) for cn, es in contracts_with_results.items() for entry in es]

# Updates the dicts stored in contracts_with_results in place.
for (generated, (contract_name, contract, line)), (_, (existing_contract, annotations)) in \
        zip(zip(results_alpha, mapping), flat_entries):
    # Re-read the stored line; it rebinds `line` to the same value carried in `mapping`.
    line, _ = annotations["pre"]
    annotations["pre"] = (line, f"require({generated});")

# The report reflects the current state of contracts_with_results, including any
# "VL" guards written by a previously executed cell.
veri.print_json_report("reports/isolated/DeepSeek/contract_PV_single.json", contracts_with_results)


PRE single-turn: 100%|██████████| 130/130 [10:24<00:00,  4.81s/it]


In [14]:
# %% SINGLE-TURN: POST (omega) inference (original contract only)
from tqdm import tqdm

# Parallel lists filled in lockstep, one element per (contract, annotations) entry:
#   all_contracts - contract source
#   mapping       - (contract_name, contract, post_line)
#   results_omega - generated predicate for the function-exit position
all_contracts = []
mapping = []
results_omega = []

for contract_name, entries in tqdm(contracts_with_results.items(), desc="POST single-turn"):
    for contract, annotations in entries:
        vl_line, _ = annotations["VL"]
        post_line, _ = annotations["post"]
        # The mask position is derived from the VL line (the enclosing function is
        # looked up from it); the stored post_line is only recorded in `mapping`.
        # The prompt is built from the unmodified contract text.
        pred = synthesize_require_statement(contract, line_no=vl_line, injection="omega")
        results_omega.append(pred)
        all_contracts.append(contract)
        mapping.append((contract_name, contract, post_line))

# write back to annotations["post"]
# Pads results to the length of all_contracts; both lists grow together above,
# so this adds nothing when the loop ran to completion.
results_omega += [''] * (len(all_contracts) - len(results_omega))
# Flattened in the same iteration order as the loop above, so positions line up.
flat_entries = [(cn, entry) for cn, es in contracts_with_results.items() for entry in es]

# Updates the dicts stored in contracts_with_results in place; `line` is the
# post_line captured in `mapping`.
for (generated, (contract_name, contract, line)), (_, (existing_contract, annotations)) in \
        zip(zip(results_omega, mapping), flat_entries):
    annotations["post"] = (line, f"require({generated});")

# The report reflects the current state of contracts_with_results, including any
# "VL"/"pre" guards written by previously executed cells.
veri.print_json_report("reports/isolated/DeepSeek/contracts_PVP_single.json", contracts_with_results)


POST single-turn: 100%|██████████| 130/130 [10:23<00:00,  4.79s/it]


In [None]:
# Sanity check: display the annotation dict ("VL", "pre", "post") of the first
# entry under the first contract name, as currently stored in
# contracts_with_results. What it shows depends on which of the cells above
# have been run.
one_key = next(iter(contracts_with_results.keys()))
first_contract, ann = contracts_with_results[one_key][0]
ann

{'VL': (11, 'require(max > 0);'),
 'pre': (11, 'require(salt == 0);'),
 'post': (26, 'require(msg.sender == address(this));')}