In [None]:
# rq1 & rq2 query
from openai import OpenAI
import json
import os

# API configuration
# These two values are passed as `api_key` / `base_url` to the OpenAI client
# built in llm_output1 and llm_output2.
# NOTE(review): both look like placeholders - replace them with real
# credentials / endpoint before running the cells below.
basic_api_key = "api_key"
basic_api_base = "api_url"

def llm_output1(system_message,human_message):
    """Send one system/user message pair to the chat model and return its reply.

    A new OpenAI client is built from the module-level `basic_api_key` and
    `basic_api_base` on every call. The request targets the "deepseek-chat"
    model, asks for a JSON-object response format and disables streaming.

    Args:
        system_message: Content of the "system" role message.
        human_message: Content of the "user" role message.

    Returns:
        The `content` of the first choice's message in the response.
    """
    conversation = [
        {"role": "system", "content": system_message},
        {"role": "user", "content": human_message},
    ]
    completion = OpenAI(
        api_key=basic_api_key,
        base_url=basic_api_base,
    ).chat.completions.create(
        model="deepseek-chat", # model
        messages=conversation,
        response_format={'type': 'json_object'},
        stream=False,
    )
    return completion.choices[0].message.content

# Query the "Core Vulnerability Patterns" (code patterns) of a CWE
def query_feature(cwe_content,language,cweid):
    """Ask the model for the "Core Vulnerability Patterns" of one CWE.

    Args:
        cwe_content: Text naming the CWE in the task line (callers in this
            file pass "<CWE id>,<CWE name>").
        language: Programming language the patterns should focus on.
        cweid: CWE identifier inserted into the output specifications
            (callers in this file pass the part after "CWE-").

    Returns:
        The raw reply string returned by llm_output1.
    """
    system1 = '''
### Role Protocol:
You are a software security expert. Strictly follow the instructions below to generate the output.

### Task:
Output only the "Core Vulnerability Patterns" for the specified %s.

### Output Specifications:
1 Focus solely on %s applications.
2 Output only the "Core Vulnerability Patterns" for the specified %s. No examples, mitigation steps, introductory, or concluding sentences.
3 The content must be direct and concise, presented as a bulleted list.
4 Ensure that all characteristics mentioned under the CWE entry for %s are included in the patterns.
5 As much as possible, encompass all characteristics of this vulnerability type as referenced in OWASP resources, such as broken access control.
6 The "Core Vulnerability Patterns" must describe:
  a. Unsafe data sources and sinks.
  b. Dangerous data handling or propagation methods.
  c. Key security controls that are missing or bypassed.
'''%(cwe_content,language,cweid,cweid)
    # llm_output1 requires both a system and a user message; the previous
    # one-argument call raised TypeError before any request was sent.
    # NOTE(review): llm_output1 requests response_format json_object, and the
    # provider's JSON-output guidance asks for the word "JSON" plus an example
    # shape in the prompt, so the user message spells that out - confirm the
    # resulting shape suits downstream consumers of the returned string.
    human1 = '''
<CWE>: %s
<Language>: %s
Return the bulleted list as a JSON object of the form {"Core Vulnerability Patterns": ["<pattern>", ...]}.
'''%(cwe_content,language)
    return llm_output1(system1,human1)


def llm_output2(system_message,human_message):
    """Run a single non-streaming chat completion and return the reply text.

    The client is created per call from the module-level `basic_api_key` and
    `basic_api_base`; the request uses the "deepseek-chat" model with a
    JSON-object response format.

    Args:
        system_message: Text sent with the "system" role.
        human_message: Text sent with the "user" role.

    Returns:
        The message content of the first choice in the response.
    """
    request_kwargs = dict(
        model="deepseek-chat",
        messages=[
            {"role": "system", "content": system_message},
            {"role": "user", "content": human_message},
        ],
        response_format={'type': 'json_object'},
        stream=False,
    )
    api = OpenAI(api_key=basic_api_key, base_url=basic_api_base)
    first_choice = api.chat.completions.create(**request_kwargs).choices[0]
    return first_choice.message.content

# example+pattern localization query
def query_rq3_ep(code,des,cwe_content,cwe_example,cwe_feature):
    """Localize vulnerable code using both CWE examples and CWE core patterns.

    Builds the system and user prompts and forwards them to llm_output2.

    Args:
        code: Full source code to analyze.
        des: Textual description of the vulnerability.
        cwe_content: CWE type text placed under the "<CWE>" label.
        cwe_example: CWE example code placed under "<CWE Example>".
        cwe_feature: CWE core patterns placed under "<CWE Pattern>".

    Returns:
        The raw reply string returned by llm_output2.

    NOTE(review): rule 5 of this prompt discards only candidates whose
    "Probability" is 0, whereas query_rq3_e and query_rq3_p discard those
    below 50 - confirm the difference is intended.
    """
    system2 = '''
### Code Security Analysis Engine Protocol:
You are a high-precision code vulnerability analysis engine, strictly follow the following requirements:

### Task:
You will be provided with: a complete source code, a description of the vulnerability, and the corresponding CWE(Common Weakness Enumeration) type with its core patterns and examples.
Analyze the source code and identify ALL code blocks matching the description, CWE type, or correlating the CWE's core vulnerability patterns and examples. 
The provided CWE examples may be in a different programming language than the target source code, so focus exclusively on the logical and functional commonality.
Output them in JSON format.

### EXAMPLE JSON OUTPUT:
[{"Start_line": <int>,
"End_line": <int>,
"Code": "<exact_lines>",
"Function": "<enclosing_function, or "Not in function">",
"Reason": "<technical_explanation>",
"Probability": <0-100, confidence_score>},
{...}]

### Output Specifications:
1. Output STRICT JSON array ONLY, parsable by Python json.loads()
2. Return [] if NO vulnerabilities found
3. MUST correlate findings with BOTH textual description AND CWE technical specifications
4. Extract code snippets with SURGICAL PRECISION - strictly necessary lines only
5. if "Probability" is 0, discard that one.
'''
    # The template lists "<CWE Pattern>" before "<CWE Example>", so the
    # pattern text (cwe_feature) must precede the examples in the tuple;
    # the previous order put each under the other's label.
    human2='''
<Source code>: %s
<Description>: %s
<CWE>: %s
<CWE Pattern>: %s
<CWE Example>: %s
'''%(code,des,cwe_content,cwe_feature,cwe_example)
    a2 = llm_output2(system2,human2)
    return a2

# only example localization query
def query_rq3_e(code,des,cwe_content,cwe_example):
    """Localize vulnerable code using CWE examples only (no core patterns).

    Args:
        code: Full source code to analyze.
        des: Textual description of the vulnerability.
        cwe_content: CWE type text placed under the "<CWE>" label.
        cwe_example: CWE example code placed under "<CWE Example>".

    Returns:
        The raw reply string returned by llm_output2.
    """
    engine_prompt = '''
### Code Security Analysis Engine Protocol:
You are a high-precision code vulnerability analysis engine, strictly follow the following requirements:

### Task:
You will be provided with: a complete source code, a description of the vulnerability, and the corresponding CWE(Common Weakness Enumeration) type with its examples.
Analyze the source code and identify ALL code blocks matching the description, CWE type, or correlating the patterns of CWE examples. 
The provided CWE examples may be in a different programming language than the target source code, so focus exclusively on the logical and functional commonality.
Output them in JSON format.

### EXAMPLE JSON OUTPUT:
[{"Start_line": <int>,
"End_line": <int>,
"Code": "<exact_lines>",
"Function": "<enclosing_function, or "Not in function">",
"Reason": "<technical_explanation>",
"Probability": <0-100, confidence_score>},
{...}]

### Output Specifications:
1. Output STRICT JSON array ONLY, parsable by Python json.loads()
2. Return [] if NO vulnerabilities found
3. MUST correlate findings with BOTH textual description AND CWE technical specifications
4. Extract code snippets with SURGICAL PRECISION - strictly necessary lines only
5. if "Probability" is lower than 50, discard that one.
'''
    # One "<label>: value" line per field, preceded by a blank first line.
    labels = ("Source code", "Description", "CWE", "CWE Example")
    values = (code, des, cwe_content, cwe_example)
    user_prompt = "\n" + "".join(
        "<%s>: %s\n" % pair for pair in zip(labels, values)
    )
    return llm_output2(engine_prompt, user_prompt)

# only pattern localization query
def query_rq3_p(code,des,cwe_content,cwe_feature):
    """Localize vulnerable code using CWE core patterns only (no examples).

    Args:
        code: Full source code to analyze.
        des: Textual description of the vulnerability.
        cwe_content: CWE type text placed under the "<CWE>" label.
        cwe_feature: CWE core patterns placed under "<CWE Pattern>".

    Returns:
        The raw reply string returned by llm_output2.
    """
    engine_prompt = '''
### Code Security Analysis Engine Protocol:
You are a high-precision code vulnerability analysis engine, strictly follow the following requirements:

### Task:
You will be provided with: a complete source code, a description of the vulnerability, and the corresponding CWE(Common Weakness Enumeration) type with its core patterns.
Analyze the source code and identify ALL code blocks matching the description, CWE type, or correlating the CWE's core vulnerability patterns. 
Output them in JSON format.

### EXAMPLE JSON OUTPUT:
[{"Start_line": <int>,
"End_line": <int>,
"Code": "<exact_lines>",
"Function": "<enclosing_function, or "Not in function">",
"Reason": "<technical_explanation>",
"Probability": <0-100, confidence_score>},
{...}]

### Output Specifications:
1. Output STRICT JSON array ONLY, parsable by Python json.loads()
2. Return [] if NO vulnerabilities found
3. MUST correlate findings with BOTH textual description AND CWE technical specifications
4. Extract code snippets with SURGICAL PRECISION - strictly necessary lines only
5. if "Probability" is lower than 50, discard that one.
'''
    # One "<label>: value" line per field, preceded by a blank first line.
    labels = ("Source code", "Description", "CWE", "CWE Pattern")
    values = (code, des, cwe_content, cwe_feature)
    user_prompt = "\n" + "".join(
        "<%s>: %s\n" % pair for pair in zip(labels, values)
    )
    return llm_output2(engine_prompt, user_prompt)

In [None]:
# rq3 main query: localize with CWE examples + core patterns and save one
# JSON result file per record under ./data/<language>/RQ3_ep/.
import ssl
import urllib.request

# Switch languages by swapping in the commented-out java settings below
language = "python"
# 0-based line indices of the Python records to re-query (the "completely wrong" set)
cve_code_all_wrong = [5, 6, 8, 9, 11, 12, 22, 28, 29, 34, 37, 39, 47, 49, 50, 53, 59, 64, 71, 76, 78, 86, 97, 101, 112, 119, 121, 123, 125, 132, 138, 144, 159, 160, 174, 184, 189, 200, 202, 210, 214, 217, 219]
# language = "java"
# 0-based line indices of the Java records to re-query
# cve_code_all_wrong = [1, 2, 5, 16, 18, 20, 21, 23, 35, 36, 37, 39, 55, 59, 64, 81, 85, 90, 93, 95, 103, 104, 107, 111, 112, 113, 118, 122, 130, 131, 135, 139, 141, 154, 166, 168, 172, 174, 181, 185, 187, 191, 192, 196]


# example+pattern localization
cve_jump = []  # line indices skipped because no usable CWE data was available
concept_type = "cwe_example_feature"
input_file = f"./data/{language}/{language}_vul_2.jsonl"
key_extract = ['Language','ExampleCode']

start_line = 0   # Starting from which line, usually set to 0
end_line = 223   # Ending to which line (exclusive), total: python-223, java-199
retry_set = set(cve_code_all_wrong)
filter_prefix = f'./data/{language}/{language}_filter/'

# The CWE lookup previously went through `requests`, which this notebook never
# imports; the standard library is used instead so the cell runs as-is.
# NOTE(review): certificate verification is disabled to mirror the previous
# `verify=False`; drop the two overrides below if the endpoint's certificate
# validates in your environment.
tls_context = ssl.create_default_context()
tls_context.check_hostname = False
tls_context.verify_mode = ssl.CERT_NONE

with open(input_file, 'rb') as f1:
    # `num` is the 0-based line index of the current record.
    for num, line in enumerate(f1):
        if num < start_line:
            continue
        if num >= end_line:
            break
        if num not in retry_set: # Continue only in the completely wrong set
            continue

        data_info = json.loads(line)
        filenum = data_info["num"]
        filename_new = f"./data/{language}/RQ3_ep/{filenum}.json"
        if os.path.exists(filename_new):
            print(f"{num},{filenum} already query")
            continue

        # CWE content
        cwe_list = data_info["cwe_list"]
        if cwe_list == ["NVD-CWE-Other"] or cwe_list == ["NVD-CWE-noinfo"]:
            print(f"{num},{filenum} cwe is null")
            cve_jump.append(num)
            continue
        cwe_name_lst = data_info["cwe_name_lst"]
        cwe_content = f"{cwe_list[0]},{cwe_name_lst[0]}"

        # get CWE example (up to three attempts, 10 s timeout each)
        cweid = cwe_list[0].split("-")[1]
        cwe_source = f'https://cwe-api.mitre.org/api/v1/cwe/weakness/{cweid}'
        response_dict1 = None
        for _attempt in range(3):
            try:
                with urllib.request.urlopen(cwe_source, timeout=10, context=tls_context) as file2:
                    response_dict1 = json.load(file2)
                break
            except (OSError, ValueError):
                # Network/HTTP errors are OSError subclasses; a body that is
                # not valid JSON raises ValueError. Try again.
                continue
        if not isinstance(response_dict1, dict) or not response_dict1.get('Weaknesses'):
            # Every attempt failed or the payload has no weakness entry; skip
            # the record instead of crashing on a missing response.
            print(f"{num},{filenum} cwe request failed")
            cve_jump.append(num)
            continue

        examples = response_dict1['Weaknesses'][0].get('DemonstrativeExamples', [])
        example_all = [] # Used to save to dataset
        example_onlycode = [] # Used for inputting queries
        for example_single in examples:
            for j in example_single.get('Entries', []):
                # A 'Nature' of 'Bad' indicates that the code is vulnerable
                if j.get('Nature') == 'Bad' and 'ExampleCode' in j:
                    example_all.append({key: j[key] for key in key_extract if key in j})
                    example_onlycode.append(j['ExampleCode'])
        cwe_example = example_onlycode
        if not cwe_example: # If cwe does not have an example, skip it directly
            print(f"{num},{filenum} has no example")
            cve_jump.append(num)
            continue

        des = data_info["des"]
        cve = data_info["cve"]
        filename_list = data_info["filename_list"]

        # main query
        try:
            print(f"Start query {num},{filenum}")
            # get cwe feature
            cwe_feature = query_feature(cwe_content,language,cweid)

            concept_output = []
            for source_path in filename_list:
                with open(source_path, 'r', encoding='utf-8') as f3:
                    code = f3.read()

                answer = query_rq3_ep(code,des,cwe_content,cwe_example,cwe_feature)
                # The reply is a string; an empty/absent reply means no candidates.
                candidate = json.loads(answer) if answer else []

                # Remove the directory prefix and the ".txt" suffix exactly.
                # str.strip() takes a character set, not a prefix/suffix, and
                # would also eat matching characters of the file name itself.
                filename_original = source_path
                if filename_original.startswith(filter_prefix):
                    filename_original = filename_original[len(filter_prefix):]
                if filename_original.endswith('.txt'):
                    filename_original = filename_original[:-len('.txt')]
                filename_original = filename_original.strip()

                concept_output.append({"file_name":filename_original,"candidate":candidate})

            date_concept = {"cve":cve,"cwe":cwe_content, "cwe_feature":cwe_feature, "cwe_example": example_all, "concept":concept_type, "concept_output":concept_output}

            # Serialize before opening the file so a serialization error cannot
            # leave a truncated file that later counts as "already query".
            payload = json.dumps(date_concept)
            with open(filename_new,"w") as f4:
                f4.write(payload)

        except Exception as exc:
            # Best-effort batch run: report the cause and move on to the next record.
            print(f"{num},{filenum} something wrong, not save: {exc!r}")
            continue

print("end 1")

In [None]:
# rq3 only example query: localize with CWE examples only, reusing the examples
# stored in the RQ3_ep result files, and save under ./data/<language>/RQ3_e/.

# Switch languages by swapping in the commented-out java settings below
language = "python"
# 0-based line indices of the Python records to re-query (the "completely wrong" set)
cve_code_all_wrong = [5, 6, 8, 9, 11, 12, 22, 28, 29, 34, 37, 39, 47, 49, 50, 53, 59, 64, 71, 76, 78, 86, 97, 101, 112, 119, 121, 123, 125, 132, 138, 144, 159, 160, 174, 184, 189, 200, 202, 210, 214, 217, 219]
# language = "java"
# 0-based line indices of the Java records to re-query
# cve_code_all_wrong = [1, 2, 5, 16, 18, 20, 21, 23, 35, 36, 37, 39, 55, 59, 64, 81, 85, 90, 93, 95, 103, 104, 107, 111, 112, 113, 118, 122, 130, 131, 135, 139, 141, 154, 166, 168, 172, 174, 181, 185, 187, 191, 192, 196]

# only example localization
cve_jump = []  # line indices skipped because no usable CWE data was available
concept_type = "cwe_example"
input_file = f"./data/{language}/{language}_vul_2.jsonl"
key_extract = ['Language','ExampleCode']

start_line = 0   # Starting from which line, usually set to 0
end_line = 230   # Ending to which line (exclusive), total: python-223, java-199
retry_set = set(cve_code_all_wrong)
filter_prefix = f'./data/{language}/{language}_filter/'

with open(input_file, 'rb') as f1:
    # `num` is the 0-based line index of the current record.
    for num, line in enumerate(f1):
        if num < start_line:
            continue
        if num >= end_line:
            break
        if num not in retry_set: # Continue only in the completely wrong set
            continue

        data_info = json.loads(line)
        filenum = data_info["num"]
        filename_new = f"./data/{language}/RQ3_e/{filenum}.json"
        if os.path.exists(filename_new):
            print(f"{num},{filenum} already query")
            continue

        # CWE content
        cwe_list = data_info["cwe_list"]
        if cwe_list == ["NVD-CWE-Other"] or cwe_list == ["NVD-CWE-noinfo"]:
            print(f"{num},{filenum} cwe is null")
            cve_jump.append(num)
            continue
        cwe_name_lst = data_info["cwe_name_lst"]
        cwe_content = f"{cwe_list[0]},{cwe_name_lst[0]}"

        # The CWE examples were stored by the example+pattern step; load them
        # from that step's result file.
        example_feature_file = f"./data/{language}/RQ3_ep/{filenum}.json"
        if not os.path.exists(example_feature_file): # Skip when that result file is missing
            print(f"{num},{filenum} cwe is empty")
            continue
        with open(example_feature_file, 'r') as f2:
            cwe_all = json.load(f2)
        example_all = cwe_all['cwe_example']
        example_onlycode = [k["ExampleCode"] for k in example_all]
        cwe_example = example_onlycode
        if not cwe_example:
            print(f"{num},{filenum} has no example")
            cve_jump.append(num)
            continue

        des = data_info["des"]
        cve = data_info["cve"]
        filename_list = data_info["filename_list"]

        # example query
        try:
            print(f"Start query {num},{filenum}")
            concept_output = []
            for source_path in filename_list:
                with open(source_path, 'r', encoding='utf-8') as f3:
                    code = f3.read()

                answer = query_rq3_e(code,des,cwe_content,cwe_example)
                # The reply is a string; an empty/absent reply means no candidates.
                candidate = json.loads(answer) if answer else []

                # Remove the directory prefix and the ".txt" suffix exactly.
                # str.strip() takes a character set, not a prefix/suffix, and
                # would also eat matching characters of the file name itself.
                filename_original = source_path
                if filename_original.startswith(filter_prefix):
                    filename_original = filename_original[len(filter_prefix):]
                if filename_original.endswith('.txt'):
                    filename_original = filename_original[:-len('.txt')]
                filename_original = filename_original.strip()

                concept_output.append({"file_name":filename_original,"candidate":candidate})

            date_concept = {"cve":cve,"cwe":cwe_content, "cwe_example": example_all, "concept":concept_type, "concept_output":concept_output}

            # Serialize before opening the file so a serialization error cannot
            # leave a truncated file that later counts as "already query".
            payload = json.dumps(date_concept)
            with open(filename_new,"w") as f4:
                f4.write(payload)

        except Exception as exc:
            # Best-effort batch run: report the cause and move on to the next record.
            print(f"{num},{filenum} something wrong, not save: {exc!r}")
            continue

print("end 2")

In [None]:
# rq3 only pattern query: localize with CWE core patterns only, reusing the
# patterns stored in the RQ3_ep result files, and save under ./data/<language>/RQ3_p/.

# Switch languages by swapping in the commented-out java settings below
language = "python"
# 0-based line indices of the Python records to re-query (the "completely wrong" set)
cve_code_all_wrong = [5, 6, 8, 9, 11, 12, 22, 28, 29, 34, 37, 39, 47, 49, 50, 53, 59, 64, 71, 76, 78, 86, 97, 101, 112, 119, 121, 123, 125, 132, 138, 144, 159, 160, 174, 184, 189, 200, 202, 210, 214, 217, 219]
# language = "java"
# 0-based line indices of the Java records to re-query
# cve_code_all_wrong = [1, 2, 5, 16, 18, 20, 21, 23, 35, 36, 37, 39, 55, 59, 64, 81, 85, 90, 93, 95, 103, 104, 107, 111, 112, 113, 118, 122, 130, 131, 135, 139, 141, 154, 166, 168, 172, 174, 181, 185, 187, 191, 192, 196]

# only pattern localization
cve_jump = []  # line indices skipped because no usable CWE data was available
concept_type = "cwe_feature"
input_file = f"./data/{language}/{language}_vul_2.jsonl"
key_extract = ['Language','ExampleCode']

start_line = 0   # Starting from which line, usually set to 0
end_line = 230   # Ending to which line (exclusive), total: python-223, java-199
retry_set = set(cve_code_all_wrong)
filter_prefix = f'./data/{language}/{language}_filter/'

with open(input_file, 'rb') as f1:
    # `num` is the 0-based line index of the current record.
    for num, line in enumerate(f1):
        if num < start_line:
            continue
        if num >= end_line:
            break
        if num not in retry_set: # Continue only in the completely wrong set
            continue

        data_info = json.loads(line)
        filenum = data_info["num"]
        filename_new = f"./data/{language}/RQ3_p/{filenum}.json"
        if os.path.exists(filename_new):
            print(f"{num},{filenum} already query")
            continue

        # CWE content
        cwe_list = data_info["cwe_list"]
        if cwe_list == ["NVD-CWE-Other"] or cwe_list == ["NVD-CWE-noinfo"]:
            print(f"{num},{filenum} cwe is null")
            cve_jump.append(num)
            continue
        cwe_name_lst = data_info["cwe_name_lst"]
        cwe_content = f"{cwe_list[0]},{cwe_name_lst[0]}"

        # The CWE pattern was stored by the example+pattern step; load it from
        # that step's result file.
        example_feature_file = f"./data/{language}/RQ3_ep/{filenum}.json"
        if not os.path.exists(example_feature_file): # Skip when that result file is missing
            print(f"{num},{filenum} cwe is empty")
            continue
        with open(example_feature_file, 'r') as f2:
            cwe_all = json.load(f2)
        cwe_feature = cwe_all['cwe_feature']

        des = data_info["des"]
        cve = data_info["cve"]
        filename_list = data_info["filename_list"]

        # pattern query
        try:
            print(f"Start query {num},{filenum}")
            concept_output = []
            for source_path in filename_list:
                with open(source_path, 'r', encoding='utf-8') as f3:
                    code = f3.read()

                answer = query_rq3_p(code,des,cwe_content,cwe_feature)
                # The reply is a string; an empty/absent reply means no candidates.
                candidate = json.loads(answer) if answer else []

                # Remove the directory prefix and the ".txt" suffix exactly.
                # str.strip() takes a character set, not a prefix/suffix, and
                # would also eat matching characters of the file name itself.
                filename_original = source_path
                if filename_original.startswith(filter_prefix):
                    filename_original = filename_original[len(filter_prefix):]
                if filename_original.endswith('.txt'):
                    filename_original = filename_original[:-len('.txt')]
                filename_original = filename_original.strip()

                concept_output.append({"file_name":filename_original,"candidate":candidate})

            date_concept = {"cve":cve,"cwe":cwe_content, "cwe_feature":cwe_feature, "concept":concept_type, "concept_output":concept_output}

            # Serialize before opening the file so a serialization error cannot
            # leave a truncated file that later counts as "already query".
            payload = json.dumps(date_concept)
            with open(filename_new,"w") as f4:
                f4.write(payload)

        except Exception as exc:
            # Best-effort batch run: report the cause and move on to the next record.
            print(f"{num},{filenum} something wrong, not save: {exc!r}")
            continue

print("end 3")