In [88]:
from functools import reduce
import xml.etree.ElementTree as ET
from xml.etree.ElementTree import Element
import json

def get_xhtml_text(item: Element) -> str:
    if len(item.text.strip()) > 0:
        return item.text.strip()
    texts = [text.strip() for text in item.itertext() if len(text.strip()) > 0]
    return reduce(lambda x, y: x+"\n"+y, texts)

def get_cwe_info(cwe_id: str) -> str:
    """get cwe json string formatted information from xml with the given cwe id
    
    Args:
        cwe_id (str): cwe id
    
    Returns:
        str: cwe json string formatted information
    """
    root = ET.parse("./data/cwec_v4.16.xml")
    for item in root.getroot():
        if item.tag == "{http://cwe.mitre.org/cwe-7}Weaknesses":
            weaknesses = item

    for weakness in weaknesses:
        id = weakness.attrib["ID"]
        if id != cwe_id:
            continue
        name = weakness.attrib["Name"]
        abstraction = weakness.attrib["Abstraction"]
        description = ""
        ext_description = ""
        background_details = ""
        likelihood_of_exploit = ""
        consequences = []
        detection_methods = []
        potential_mitigations = []
        for item in weakness:
            match (item.tag):
                case "{http://cwe.mitre.org/cwe-7}Description":
                    description = get_xhtml_text(item)
                case "{http://cwe.mitre.org/cwe-7}Extended_Description":
                    ext_description = get_xhtml_text(item)
                case "{http://cwe.mitre.org/cwe-7}Background_Details":
                    background_details = get_xhtml_text(item)
                case "{http://cwe.mitre.org/cwe-7}Likelihood_Of_Exploit":
                    likelihood_of_exploit = get_xhtml_text(item)
                case "{http://cwe.mitre.org/cwe-7}Common_Consequences":
                    for consequence in item:
                        scope = ""
                        impact = ""
                        note = ""
                        for subconsequence in consequence:
                            match (subconsequence.tag):
                                case "{http://cwe.mitre.org/cwe-7}Scope":
                                    scope = get_xhtml_text(subconsequence)
                                case "{http://cwe.mitre.org/cwe-7}Impact":
                                    impact = get_xhtml_text(subconsequence)
                                case "{http://cwe.mitre.org/cwe-7}Note":
                                    note = get_xhtml_text(subconsequence)
                        conseq = {
                            "scope": scope,
                            "impact": impact,
                            "note": note,
                        }
                        consequences.append(conseq)
                case "{http://cwe.mitre.org/cwe-7}Detection_Methods":
                    for method in item:
                        detection_method_id = method.attrib["Detection_Method_ID"] if "Detection_Method_ID" in method.keys() else ""
                        method_name = ""
                        description = ""
                        effectiveness = ""
                        for method_detail in method:
                            match (method_detail.tag):
                                case "{http://cwe.mitre.org/cwe-7}Method":
                                    method_name = get_xhtml_text(method_detail)
                                case "{http://cwe.mitre.org/cwe-7}Description":
                                    description = get_xhtml_text(method_detail)
                                case "{http://cwe.mitre.org/cwe-7}Effectiveness":
                                    effectiveness = get_xhtml_text(method_detail)
                        detection_method = {
                            "detection_method_id": detection_method_id,
                            "method": method_name,
                            "description": description,
                            "effectiveness": effectiveness,
                        }
                        detection_methods.append(detection_method)
                case "{http://cwe.mitre.org/cwe-7}Potential_Mitigations":
                    for mitigation in item:
                        phase = ""
                        description = ""
                        effectiveness = ""
                        effectiveness_notes = ""
                        for mitigation_detail in mitigation:
                            match (mitigation_detail.tag):
                                case "{http://cwe.mitre.org/cwe-7}Phase":
                                    phase = get_xhtml_text(mitigation_detail)
                                case "{http://cwe.mitre.org/cwe-7}Description":
                                    description = get_xhtml_text(mitigation_detail)
                                case "{http://cwe.mitre.org/cwe-7}Effectiveness":
                                    effectiveness = get_xhtml_text(mitigation_detail)
                                case "{http://cwe.mitre.org/cwe-7}Effectiveness_Notes":
                                    effectiveness_notes = get_xhtml_text(
                                        mitigation_detail
                                    )
                        potential_mitigation = {
                            "phase": phase,
                            "description": description,
                            "effectiveness": effectiveness,
                            "effectiveness_notes": effectiveness_notes,
                        }
                        potential_mitigations.append(potential_mitigation)
        # construct cwe information parsed from weakness element
        cwe = {
            "id": id,
            "name": name,
            "abstraction": abstraction,
            "description": description,
            "extended_description": ext_description,
            "background_details": background_details,
            "likelihood_of_exploit": likelihood_of_exploit,
            "common_consequences": consequences,
            "detection_methods": detection_methods,
            "potential_mitigations": potential_mitigations,
        }
        return json.dumps(cwe)

cwe_78 = get_cwe_info("78")
print(cwe_78)



## SSDLC 安全检查结果分析器
### 步骤一，初始化AI模型

In [87]:
from dotenv import load_dotenv, find_dotenv

_ = load_dotenv(find_dotenv("./env/.env"))

import dashscope
from http import HTTPStatus
from pprint import pprint
import json

from langchain.prompts import ChatPromptTemplate
from langchain_community.chat_models import ChatTongyi

llm_model = "qwen-max"

llm = ChatTongyi(temperature=1.0, model=llm_model)

### 步骤二，分析统计结果

In [105]:
json_prompt = """你精通python代码的静态检查相关的工具和原理，假设我拿到了python工具bandit扫描的结果，结果是json格式的，我需要分析这个结果从而确认是否误报，是否需要高优先级修复。
首先，请根据给出的json数据，帮我找出总共有多少高危（Severity = HIGH）、中危（Severity = MEDIUM）、低危（Severity = LOW）的漏洞。

json数据：
```json
{json_data}
```
"""

json_analysis_prompt = ChatPromptTemplate.from_template(json_prompt)

with open("./data/sysom-3.3.0/result.json", "r") as f:
    json_data = json.load(f)

metrics = json_data["metrics"]["_totals"]
high_results = json_data["results"]

response = llm.invoke(json_analysis_prompt.format_messages(json_data=metrics))

print(response.content)

根据你提供的JSON数据，我们可以直接读取与严重性（SEVERITY）相关的字段来确定不同级别的漏洞数量。这些字段分别是`SEVERITY.HIGH`、`SEVERITY.MEDIUM`和`SEVERITY.LOW`。下面是具体的数值：

- 高危 (Severity = HIGH) 的漏洞数量: 53
- 中危 (Severity = MEDIUM) 的漏洞数量: 61
- 低危 (Severity = LOW) 的漏洞数量: 236

从这个信息中可以看出：
- 总共有53个高危漏洞。
- 总共有61个中危漏洞。
- 总共有236个低危漏洞。

这样的统计可以帮助你快速了解扫描结果的整体风险分布情况，从而优先处理那些被标记为高危或中危的问题。对于每一个具体的报告项，如果你怀疑某些可能是误报，则需要进一步查看该问题的具体描述以及代码上下文来进行判断。如果确实认为是误报，可以通过适当的方式（如添加注释等）在代码中标记这一点，以便后续的自动化检查能够识别并忽略这些已知的“安全”实例。同时，确保所有真正存在的安全问题得到及时修复，特别是那些被评为高危级别的问题。


### 步骤三，提取高危扫描结果


In [117]:
from time import sleep
high_risk_prompt = """你精通python代码的静态检查相关的工具和原理，假设我拿到了python工具bandit扫描的结果，结果是json格式的，我需要分析这个结果从而确认是否误报，是否需要高优先级修复。扫描结果格式如下所示：
```json
{{
  "metrics": {{
    "_totals": {{
      "CONFIDENCE.HIGH": <所有文件中扫描出的问题信心指数高的个数>,
      "CONFIDENCE.LOW": <所有文件中扫描出的问题信心指数低的个数>,
      "CONFIDENCE.MEDIUM":<所有文件中扫描出的问题信心指数中的个数>,
      "CONFIDENCE.UNDEFINED": <所有文件中扫描出的问题信心指数不确定的个数>,
      "SEVERITY.HIGH": <所有文件中扫描出的问题严重性高的个数>,
      "SEVERITY.LOW": <所有文件中扫描出的问题严重性低的个数>,
      "SEVERITY.MEDIUM": <所有文件中扫描出的问题严重性的中的个数>,
      "SEVERITY.UNDEFINED": <所有文件中扫描出的问题严重性不确定的个数>,
      "loc": <所有文件代码行数>,
      "nosec": <所有文件nosec打标数量>,
      "skipped_tests": <所有文件跳过的测试数量>
    }},
    "results":[<检查工具扫描出的问题列表>
    <example>
    {{
      "code": "3 import json\n4 import requests\n5 import subprocess\n6 import logging\n7 \n",
      "col_offset": 0,
      "end_col_offset": 17,
      "filename": "./bench/common/system.py",
      "issue_confidence": "HIGH",
      "issue_cwe": {{
        "id": 78,
        "link": "https://cwe.mitre.org/data/definitions/78.html"
      }},
      "issue_severity": "LOW",
      "issue_text": "Consider possible security implications associated with the subprocess module.",
      "line_number": 5,
      "line_range": [
        5
      ],
      "more_info": "https://bandit.readthedocs.io/en/1.8.0/blacklists/blacklist_imports.html#b404-import-subprocess",
      "test_id": "B404",
      "test_name": "blacklist"
    }}
    </example>
    ]
  }}
}}
```

你需要做的事情是：
1.请根据给出的json数据，帮我找出总共有多少高危（Severity = HIGH）、中危（Severity = MEDIUM）、低危（Severity = LOW）的漏洞。
2.输出扫描结果中高危的问题列表。
3.根据扫描结果中metrics记录，帮我找出扫描结果中，存在高危漏洞的文件列表。

要求：
1.根据我的要求帮我完成上述任务，不要做其他事情。
2.不需要输出任何解释，只需要按照我的要求做完事就好。
3.输出必须是json格式，包括扫描结果的总数量和问题列表。
4.输出结果不需要包括```json, ```这样符号。
5.输出格式如下：
```json
{{
  "total_vulnerabilities":{{
    "high_severity": <高危问题数量>,
    "medium_severity": <中危问题数量>,
    "low_severity": <低危问题数量>,
  }},
  "high_severity_issues": [
    {{
      "code": <问题代码>,
      "col_offset": <问题的描述>,
      "end_col_offset": <问题的严重程度>,
      "filename": <问题的修复建议>,
      "issue_confidence": <参考资料链接>,
      "issue_cwe": {{
        "id": <CWE编号>,
        "name": <CWE名称>
      }},
      "issue_severity": <问题严重等级>,
      "issue_text": <问题描述>,
      "line_range": <问题所在行>,
      "more_info": <问题修复建议>,
      "test_id": <问题类型>,
      "test_name": <问题类型名称>
    }},
    ...
  ],
  "files_with_high_severity_issues": [
    <文件路径>,
    ...
  ],
}}
```

json数据：
```json
{json_data}
```
"""

json_data_simplified = {}
for item in json_data["metrics"]:
    json_data_simplified["metrics"] = {}
    if item == "_totals":
        json_data_simplified["metrics"][item] = json_data["metrics"][item]

chunks = [
    json_data["results"][x : x + 5] for x in range(0, len(json_data["results"]), 5)
]
count = 0
for chunk in chunks:
    count += len(chunk)

print(count)

high_risk_analysis_prompt = ChatPromptTemplate.from_template(high_risk_prompt)
high_risks = []
for idx, chunk in enumerate(chunks):
    print(f"chunk {idx}")
    json_data_simplified["results"] = chunk
    print(len(json_data_simplified["results"]))
    response = llm.invoke(
        high_risk_analysis_prompt.format_messages(json_data=json_data_simplified)
    )

    print("ok")
    high_risk = response.content
    high_risks.append(high_risk)
    sleep(60)


53
chunk 0
5
chunk 1
5
chunk 2
5
chunk 3
5
chunk 4
5
chunk 5
5
chunk 6
5
chunk 7
5
chunk 8
5
chunk 9
5
chunk 10
3


#### cache high risk result


In [118]:
cache_high_risks = json.dumps(high_risks)
with open("cache_high_risks.json", "w") as f:
    f.write(cache_high_risks)

#### merge high risk result

In [169]:
objs = []
for idx, high_risk_chunk in enumerate(high_risks):
    try:
        # 尝试直接解析 JSON
        high_risk_json_obj = json.loads(high_risk_chunk)
        objs.append(high_risk_json_obj)
    except json.JSONDecodeError as e:
        # print(e)
        # 预处理字符串，移除非法转义字符
        cleaned_chunk = high_risk_chunk.replace("\\'", "'")
        try:
            # 再次尝试解析清理后的 JSON
            high_risk_json_obj = json.loads(cleaned_chunk)
            objs.append(high_risk_json_obj)
        except json.JSONDecodeError as e2:
            print(high_risk_chunk[2000:2130])
            print(f"Failed to parse even after cleaning: {e2}")

merged_obj = objs[0]
for obj in objs[1:]:
    merged_obj["high_severity_issues"].extend(obj["high_severity_issues"])
    merged_obj["files_with_high_severity_issues"].extend(obj["files_with_high_severity_issues"])

print(len(merged_obj["high_severity_issues"]))
print(len(merged_obj["files_with_high_severity_issues"]))

53
18


In [170]:
pprint(merged_obj["high_severity_issues"][0])

{'code': '460     return True\n'
         '461 def do_cmd(cmd):\n'
         '462     output = os.popen(cmd)\n'
         '463     ret = output.read().strip()\n'
         '464     output.close()\n',
 'col_offset': 13,
 'end_col_offset': 26,
 'filename': './script/server/sysom_vmcore/parse_panic.py',
 'issue_confidence': 'HIGH',
 'issue_cwe': {'id': 78,
               'name': 'CWE-78: Improper Neutralization of Special Elements '
                       "used in an OS Command ('OS Command Injection')"},
 'issue_severity': 'HIGH',
 'issue_text': 'Starting a process with a shell, possible injection detected, '
               'security issue.',
 'line_range': [462],
 'more_info': 'https://bandit.readthedocs.io/en/1.8.0/plugins/b605_start_process_with_a_shell.html',
 'test_id': 'B605',
 'test_name': 'start_process_with_a_shell'}


### 步骤四，提取高危问题源码和CWE信息


In [188]:
import os
from IPython.display import display, Markdown

risk = merged_obj["high_severity_issues"][0]
issue_code = risk["code"]
issue_source_code_path = risk["filename"]
issue_source_code = open(os.path.join("./data", "sysom-3.3.0", issue_source_code_path), "r").read()
calling_source_code_path = [risk["filename"]]
cwe_id = risk["issue_cwe"]["id"]
cwe_info = get_cwe_info(str(cwe_id))

risk_attack_vector = {
    "issue_text": risk["issue_text"],
    "code": issue_code,
    "issue_source_code_path": issue_source_code_path,
    "issue_source_code": issue_source_code,
    "calling_source_code_path": calling_source_code_path,
    "cwe": cwe_id,
    "cwe_info": cwe_info,
}
print(risk_attack_vector)

display(Markdown(f"""```python
{issue_code}
```
---
```python
{issue_source_code[:500]}
```
---
{cwe_78[:500]}
"""
    )
)

cache_risks = json.dumps(merged_obj["high_severity_issues"][:25])
with open("cache_risks_0_25.json", "w") as f:
    f.write(cache_risks)
cache_risks_2 = json.dumps(merged_obj["high_severity_issues"][25:])
with open("cache_risks_25_53.json", "w") as f:
    f.write(cache_risks_2)



```python
460     return True
461 def do_cmd(cmd):
462     output = os.popen(cmd)
463     ret = output.read().strip()
464     output.close()

```
---
```python
# -*- coding: utf-8 -*-
# @Author: lichen/zhilan

import os
import sys
import time
import subprocess
import re 
import sqlite3
import json
import traceback
import importlib
import argparse
import requests
import vmcore_const
import time
from datetime import datetime
import threading
import queue
queue = queue.Queue()

if sys.version[0] == '2':
    reload(sys)
    sys.setdefaultencoding('utf8')

# crashkey_type={
# 0:func_name
# 1:calltrace
# 2:crashkey
# 3:bugon_file
#}
nfs_root = '/usr/vmcore-n
```
---
{"id": "78", "name": "Improper Neutralization of Special Elements used in an OS Command ('OS Command Injection')", "abstraction": "Base", "description": "When using PHP, configure the application so that it does not use register_globals. During implementation, develop the application so that it does not rely on this feature, but be wary of implementing a register_globals emulation that is subject to weaknesses such as CWE-95, CWE-621, and similar issues.", "extended_description": "This weakness 
