In [17]:
import json,re
from pathlib import Path

In [36]:
import json
import re

input_name = "vul_alpaca_train_data"
input_path = f"{input_name}.json"
output_path = f"cleaned_functioned_{input_name}.json"
skipped_output_path = f"skipped_info_{input_name}.json"


def extract_functions_with_lines(code_lines):
    """Extract Solidity functions, constructors, and modifiers with accurate line ranges."""
    functions = []
    current_function = []
    inside_block = False
    brace_count = 0
    start_line = None
    seen_open_brace = False
    contract_name = None

    # Pre-scan to find contract name for constructor detection
    for line in code_lines:
        if ':' not in line:
            continue
        _, content = line.split(':', 1)
        match = re.search(r'contract\s+(\w+)', content)
        if match:
            contract_name = match.group(1)
            break

    for line in code_lines:
        if ':' not in line:
            continue

        line_num, content = line.split(':', 1)
        line_num = int(line_num.strip())
        content = content.rstrip()

        is_function = re.search(r'\bfunction\b', content)
        is_constructor = contract_name and re.search(rf'\b{contract_name}\s*\(', content)
        is_modifier = re.search(r'\bmodifier\b', content)

        if (is_function or is_constructor or is_modifier) and not inside_block:
            inside_block = True
            current_function = [(line_num, content)]
            start_line = line_num
            brace_count = content.count('{') - content.count('}')
            seen_open_brace = content.count('{') > 0
            continue

        if inside_block:
            current_function.append((line_num, content))
            brace_count += content.count('{') - content.count('}')
            if content.count('{') > 0:
                seen_open_brace = True
            if seen_open_brace and brace_count == 0:
                functions.append((start_line, line_num, current_function))
                inside_block = False
                seen_open_brace = False

    return functions


def map_vulnerability_to_function(entry, skipped_info, vuln_counter):
    instruction = entry["instruction"]
    input_lines = entry["input"].splitlines()
    base_prompt = input_lines[0]
    code_lines = input_lines[1:]

    try:
        output_data = json.loads(entry["output"]) if isinstance(entry["output"], str) else entry["output"]
    except Exception as e:
        skipped_info.append({
            "reason": f"Failed to parse output JSON: {str(e)}",
            "entry": entry
        })
        return []

    vulnerabilities = output_data.get("vulnerabilities", []) if isinstance(output_data, dict) else output_data
    if not vulnerabilities:
        skipped_info.append({
            "reason": "No vulnerabilities listed in output",
            "entry": entry
        })
        return []

    functions = extract_functions_with_lines(code_lines)
    result_entries = []

    for vuln in vulnerabilities:
        vuln_line_range = vuln.get("vulnerableLines", "")
        try:
            vuln_from, vuln_to = map(int, vuln_line_range.split("-"))
        except:
            skipped_info.append({
                "reason": "Malformed vulnerableLines",
                "vulnerableLines": vuln_line_range,
                "entry": entry
            })
            vuln_counter[0] += 1
            continue

        matched = False
        for f_start, f_end, f_lines in functions:
            if any(f_start <= i <= f_end for i in range(vuln_from, vuln_to + 1)):
                matched = True
                break

        if not matched:
            for f_start, f_end, f_lines in functions:
                if f_start > vuln_to:
                    matched = True
                    break

        if not matched:
            skipped_info.append({
                "reason": "No matching or adjacent function found for vulnerable line",
                "vulnerableLines": vuln.get("vulnerableLines", ""),
                "vulnerableCode": vuln.get("vulnerableCode", []),
                "entry": entry
            })
            vuln_counter[0] += 1
            continue

        # Re-number lines and adjust vulnerableLines within function
        numbered_func = [f"{i+1}: {line}" for i, (_, line) in enumerate(f_lines)]
        original_lines = [line.strip() for line in vuln["vulnerableCode"]]
        match_start = None

        for idx in range(len(f_lines)):
            slice_lines = [line.strip() for _, line in f_lines[idx:idx + len(original_lines)]]
            if slice_lines == original_lines:
                match_start = idx + 1
                break

        if match_start:
            vuln_lines = f"{match_start}-{match_start + len(original_lines) - 1}"
        else:
            vuln_lines = vuln["vulnerableLines"]

        result_entries.append({
            "instruction": instruction,
            "input": base_prompt + "\n" + "\n".join(numbered_func),
            "output": {
                "vulnerabilities": [
                    {
                        "vulnerableLines": vuln_lines,
                        "vulnerableCode": vuln["vulnerableCode"],
                        "vulnerabilityReason": vuln["vulnerabilityReason"],
                        "potentialSecurityRisk": vuln["potentialSecurityRisk"],
                        "fixedCode": vuln["fixedCode"]
                    }
                ]
            }
        })

    return result_entries


def main():
    with open(input_path, "r", encoding="utf-8") as f:
        dataset = json.load(f)

    processed_data = []
    skipped_info = []
    skipped_contracts = 0
    skipped_vulnerabilities = [0]  # Mutable container

    for entry in dataset:
        results = map_vulnerability_to_function(entry, skipped_info, skipped_vulnerabilities)
        processed_data.extend(results)
        if not results:
            skipped_contracts += 1

    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(processed_data, f, indent=2)

    with open(skipped_output_path, "w", encoding="utf-8") as f:
        json.dump(skipped_info, f, indent=2)

    print(f" Total entries processed         : {len(dataset)}")
    print(f" Total entries written          : {len(processed_data)}")
    print(f" Total contracts skipped        : {skipped_contracts}")
    print(f"  Total vulnerabilities skipped : {skipped_vulnerabilities[0]} (see {skipped_output_path})")


if __name__ == "__main__":
    main()


 Total entries processed         : 949
 Total entries written          : 4817
 Total contracts skipped        : 3
  Total vulnerabilities skipped : 52 (see skipped_info_vul_alpaca_train_data.json)


In [51]:
json_file = "alpaca_val_smart.json"
with open(json_file, "r", encoding="utf-8") as f:
    dataset = json.load(f)

functions = extract_functions_with_lines(dataset[0].get("input").split("\n"))
# print(dataset[0].get("input"))

In [53]:
functions[0]

(39,
 46,
 [(39, '     function release() public {'),
  (40, '         uint256 unreleased = releasableAmount();'),
  (41, '         require(unreleased > 0);'),
  (42, ''),
  (43, '         released = released.add(unreleased);'),
  (44, ''),
  (45, '         token.safeTransfer(pgoWallet, unreleased);'),
  (46, '     }')])

In [42]:
# import json

# cleaned_file_path = "cleaned_functioned_vul_alpaca_valid_data.json" 
# search_keywords = ["function ownerRefundPlayer(bytes32 originalPlayerBetId, address sendTo,"] 

# def search_contracts(file_path, keywords):
#     with open(file_path, "r", encoding="utf-8") as f:
#         data = json.load(f)

#     matching_entries = []

#     for i, entry in enumerate(data):
#         code = entry.get("input", "")
#         if any(keyword in code for keyword in keywords):
#             matching_entries.append((i, entry["input"].splitlines()[1:10]))  # preview only

#     print(f"\n Total matching entries: {len(matching_entries)}\n")
#     for idx, preview in matching_entries[:3]:  # Show only first 3
#         print(f" Match at index {idx}:\n" + "\n".join(preview) + "\n---")

#     return matching_entries

# matches = search_contracts(cleaned_file_path, search_keywords)






In [1]:
# import json
# import re
# from pathlib import Path

# input_dir = "vul_alpaca_test_data.json"
# file_path = Path(input_dir)
# output_path = file_path.with_name("vul_alpaca_test_data_split_by_function.json")

# def extract_function_by_code(code_lines, vulnerable_code_snippet):
#     """
#     Extracts the function block that contains the vulnerable code.
#     `code_lines` should be a list of "line_number: code" strings.
#     `vulnerable_code_snippet` is a list of lines from "vulnerableCode".
#     """
#     functions = []
#     current_function = []
#     inside_function = False
#     brace_count = 0

#     for line in code_lines:
#         if ':' not in line:
#             continue
#         line_num_str, content = line.split(':', 1)

#         if re.search(r'\bfunction\b', content) and not inside_function:
#             inside_function = True
#             brace_count = content.count('{') - content.count('}')
#             current_function = [line]
#         elif inside_function:
#             brace_count += content.count('{') - content.count('}')
#             current_function.append(line)
#             if brace_count == 0:
#                 functions.append(current_function)
#                 inside_function = False

#     # Try to find which function contains any line of the vulnerable code
#     for func in functions:
#         func_text = "\n".join(func)
#         if any(vuln_line.strip() in func_text for vuln_line in vulnerable_code_snippet):
#             return func_text
#     return None

# # === Load JSON Data ===
# with open(file_path, "r", encoding="utf-8") as f:
#     dataset = json.load(f)

# new_records = []

# # === Process Each Record ===
# for record in dataset:
#     instruction = record.get("instruction", "")
#     input_code_lines = record.get("input", "").splitlines()
#     output_data = json.loads(record["output"]) if isinstance(record["output"], str) else record["output"]
#     vulnerabilities = output_data if isinstance(output_data, list) else output_data.get("vulnerabilities", [])

#     for vuln in vulnerabilities:
#         vuln_code_lines = vuln.get("vulnerableCode", [])
#         extracted_func = extract_function_by_code(input_code_lines, vuln_code_lines)
#         if extracted_func:
#             new_record = {
#                 "instruction": instruction,
#                 "input": extracted_func,
#                 "output": json.dumps({"vulnerabilities": [vuln]}, indent=2)
#             }
#             new_records.append(new_record)

# # === Save New Records to File ===
# with open(output_path, "w", encoding="utf-8") as f:
#     json.dump(new_records, f, indent=2)

# print(f"✅ Done! Saved split data to: {output_path}")


✅ Done! Saved split data to: vul_alpaca_test_data_split_by_function.json


In [19]:
pwd

'C:\\Users\\erfan\\Downloads\\dataset_llm\\Smart_LLaMA\\datasets\\Project_2_Dataset\\src\\fine-tuning\\hugging-face\\refineData\\corrected_v1'

In [None]:
import json
import re

input_name = "vul_alpaca_train_data"
input_path = f"{input_name}.json"
output_path = f"cleaned_functioned_{input_name}.json"
skipped_output_path = f"skipped_info_{input_name}.json"


def extract_functions_with_lines(code_lines):
    """Extract Solidity functions, constructors, and modifiers with accurate line ranges."""
    functions = []
    current_function = []
    inside_block = False
    brace_count = 0
    start_line = None
    seen_open_brace = False
    contract_name = None

    # Pre-scan to find contract name for constructor detection
    for line in code_lines:
        if ':' not in line:
            continue
        _, content = line.split(':', 1)
        match = re.search(r'contract\s+(\w+)', content)
        if match:
            contract_name = match.group(1)
            break

    for line in code_lines:
        if ':' not in line:
            continue

        line_num, content = line.split(':', 1)
        line_num = int(line_num.strip())
        content = content.rstrip()

        is_function = re.search(r'\bfunction\b', content)
        is_constructor = contract_name and re.search(rf'\b{contract_name}\s*\(', content)
        is_modifier = re.search(r'\bmodifier\b', content)

        if (is_function or is_constructor or is_modifier) and not inside_block:
            inside_block = True
            current_function = [(line_num, content)]
            start_line = line_num
            brace_count = content.count('{') - content.count('}')
            seen_open_brace = content.count('{') > 0
            continue

        if inside_block:
            current_function.append((line_num, content))
            brace_count += content.count('{') - content.count('}')
            if content.count('{') > 0:
                seen_open_brace = True
            if seen_open_brace and brace_count == 0:
                functions.append((start_line, line_num, current_function))
                inside_block = False
                seen_open_brace = False

    return functions


def map_vulnerability_to_function(entry, skipped_info, vuln_counter):
    instruction = entry["instruction"]
    input_lines = entry["input"].splitlines()
    base_prompt = input_lines[0]
    code_lines = input_lines[1:]

    try:
        output_data = json.loads(entry["output"]) if isinstance(entry["output"], str) else entry["output"]
    except Exception as e:
        skipped_info.append({
            "reason": f"Failed to parse output JSON: {str(e)}",
            "entry": entry
        })
        return []

    vulnerabilities = output_data.get("vulnerabilities", []) if isinstance(output_data, dict) else output_data
    if not vulnerabilities:
        skipped_info.append({
            "reason": "No vulnerabilities listed in output",
            "entry": entry
        })
        return []

    functions = extract_functions_with_lines(code_lines)
    result_entries = []

    for vuln in vulnerabilities:
        vuln_line_range = vuln.get("vulnerableLines", "")
        try:
            vuln_from, vuln_to = map(int, vuln_line_range.split("-"))
        except:
            skipped_info.append({
                "reason": "Malformed vulnerableLines",
                "vulnerableLines": vuln_line_range,
                "entry": entry
            })
            vuln_counter[0] += 1
            continue

        matched = False
        for f_start, f_end, f_lines in functions:
            if any(f_start <= i <= f_end for i in range(vuln_from, vuln_to + 1)):
                matched = True
                break

        if not matched:
            for f_start, f_end, f_lines in functions:
                if f_start > vuln_to:
                    matched = True
                    break

        if not matched:
            skipped_info.append({
                "reason": "No matching or adjacent function found for vulnerable line",
                "vulnerableLines": vuln.get("vulnerableLines", ""),
                "vulnerableCode": vuln.get("vulnerableCode", []),
                "entry": entry
            })
            vuln_counter[0] += 1
            continue

        # Re-number lines and adjust vulnerableLines within function
        numbered_func = [f"{i+1}: {line}" for i, (_, line) in enumerate(f_lines)]
        original_lines = [line.strip() for line in vuln["vulnerableCode"]]
        match_start = None

        for idx in range(len(f_lines)):
            slice_lines = [line.strip() for _, line in f_lines[idx:idx + len(original_lines)]]
            if slice_lines == original_lines:
                match_start = idx + 1
                break

        if match_start:
            vuln_lines = f"{match_start}-{match_start + len(original_lines) - 1}"
        else:
            vuln_lines = vuln["vulnerableLines"]

        result_entries.append({
            "instruction": instruction,
            "input": base_prompt + "\n" + "\n".join(numbered_func),
            "output": {
                "vulnerabilities": [
                    {
                        "vulnerableLines": vuln_lines,
                        "vulnerableCode": vuln["vulnerableCode"],
                        "vulnerabilityReason": vuln["vulnerabilityReason"],
                        "potentialSecurityRisk": vuln["potentialSecurityRisk"],
                        "fixedCode": vuln["fixedCode"]
                    }
                ]
            }
        })

    return result_entries


def main():
    with open(input_path, "r", encoding="utf-8") as f:
        dataset = json.load(f)

    processed_data = []
    skipped_info = []
    skipped_contracts = 0
    skipped_vulnerabilities = [0]  # Mutable container

    for entry in dataset:
        results = map_vulnerability_to_function(entry, skipped_info, skipped_vulnerabilities)
        processed_data.extend(results)
        if not results:
            skipped_contracts += 1

    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(processed_data, f, indent=2)

    with open(skipped_output_path, "w", encoding="utf-8") as f:
        json.dump(skipped_info, f, indent=2)

    print(f" Total entries processed         : {len(dataset)}")
    print(f" Total entries written          : {len(processed_data)}")
    print(f" Total contracts skipped        : {skipped_contracts}")
    print(f"  Total vulnerabilities skipped : {skipped_vulnerabilities[0]} (see {skipped_output_path})")


if __name__ == "__main__":
    main()
