## Define LLM's API

In [53]:
import logging
import os

import requests
from openai import OpenAI

def generate(query, system_prompt='vvYou are a helpful AI assistant', base_url='https://xiaoai.plus/v1', model='gpt-4o-2024-08-06', retries=3):

    if model == 'arktsLLM':
        for attempt in range(retries):
            try:
                data = {
                    "model": model,
                    "prompt": query,
                    "stream": False
                }
                response = requests.post('http://localhost:11434/api/generate', json=data)
                return response.json()['response']

            except Exception as e:
                logging.error(f"Attempt {attempt + 1} failed with error: {e}")
                if attempt + 1 == retries:
                    raise

    else: 
        if base_url == 'https://xiaoai.plus/v1':
            api_key="sk-Y7BDSlT4EKjhVfPpA603Bb0cC549424b9d1734262f6fE6C0"
        else:
            api_key="sk-bee3f43476e04c0ba5a0eee09961f325"
            
        client = OpenAI(
            base_url=base_url,
            api_key=api_key
        )

        for attempt in range(retries):
            try:
                if 'o1-mini' in model:
                    # 如果query是list，则认为是conversation
                    if isinstance(query, list):
                        chat_completion = client.chat.completions.create(
                            model=model,
                            messages=query
                        )
                    else:
                        chat_completion = client.chat.completions.create(
                            model=model,
                            messages=[
                                {"role": "user", "content": query}
                            ],
                    )
                else:
                    # 如果query是list，则认为是conversation
                    if isinstance(query, list):
                        chat_completion = client.chat.completions.create(
                            model=model,
                            messages=query
                        )
                    else:
                        chat_completion = client.chat.completions.create(
                        model=model,
                        messages=[
                            {"role": "system", "content": system_prompt},
                            {"role": "user", "content": query}
                        ],
                    )
                return chat_completion.choices[0].message.content
            except Exception as e:
                logging.error(f"Attempt {attempt + 1} failed with error: {e}")
                if attempt + 1 == retries:
                    raise

In [3]:
import json
import re
import os

def sort_json_lines(data):
    if isinstance(data, list):
        return sorted(data, key=lambda x: x.get('line', 0))
    return data

def handle_vul_type_res(res):
    # 提取 JSON 格式数据块
    json_match = re.search(r'```json\n(.*?)\n```', res, flags=re.DOTALL)
    if json_match:
        cleaned_res = json_match.group(1)
    else:
        print("No JSON block found")
        return None
    
    try:
        data = json.loads(cleaned_res)
        # 假设 sort_json_lines 是有效的排序函数
        data = sort_json_lines(data)
        return data
    except json.JSONDecodeError as e:
        print(f"Error when parsing JSON: {e}")
        return cleaned_res

def read_code():
    dir = "../defect"
    codes = {}
    for filename in sorted(os.listdir(dir)):
        if filename.endswith('.ets'):
            with open(os.path.join(dir, filename), 'r', encoding='utf-8') as file:
                code = file.read()
                codes[filename.split('.')[0]] = code
    return codes

import os

def parse_ets_file(file_path):
    description = []
    code_example = ""
    in_comment = True

    with open(file_path, 'r', encoding='utf-8') as file:
        for line in file:
            stripped_line = line.strip()
            if in_comment and stripped_line.startswith('//'):
                description.append(stripped_line[2:].strip())
            else:
                in_comment = False
                code_example += stripped_line + " "

    return {
        'description': ' '.join(description),
        'defect_code_example': code_example.strip()
    }

# 新增函数用于获取正例数据
def get_positive_example(file_path):
    code_example = ""
    in_comment = False  # 正例不需要提取注释部分
    with open(file_path, 'r', encoding='utf-8') as file:
        for line in file:
            stripped_line = line.strip()
            code_example += stripped_line + " "
    return code_example.strip()

# 修改后的规则构建函数，包含正例数据
def build_rules_dict(negative_dir, positive_dir):
    rules = {}
    for filename in os.listdir(negative_dir):
        if filename.endswith('.ets'):
            rule_name = os.path.splitext(filename)[0]
            rule_key = f"@performance/{rule_name}"
            negative_file_path = os.path.join(negative_dir, filename)
            rule_data = parse_ets_file(negative_file_path)

            # 查找对应的正例文件
            positive_file_path = os.path.join(positive_dir, filename)
            if os.path.exists(positive_file_path):
                positive_example = get_positive_example(positive_file_path)
                rule_data['positive_code_example'] = positive_example
            else:
                rule_data['positive_code_example'] = None  # 没有正例时返回 None

            rules[rule_key] = rule_data
    return rules

### Get rules' description

In [4]:
negative_directory = '../pages/negative'
positive_directory = '../pages/positive'
rules = build_rules_dict(negative_directory, positive_directory)

print(rules)




In [46]:
system_prompt = """
I am an expert at analyzing CodeLinter results and extracting minimal code blocks containing defects. When you provide me with ArkTS code and CodeLinter analysis results, I will:

1. Learn the defect patterns from the rules
2. Take the CodeLinter results as ground truth defect locations
3. For each defect reported by CodeLinter:
   - Find the minimal code block containing the defect and its required context
   - Include any related code that is necessary to understand the defect
   - Ensure the extracted block captures the full scope of the issue
  

Input format:
1. Complete ArkTS source code
2. CodeLinter results in JSON:
[
  {
    "rule": "@performance/rule-name",
    "line": line_number, 
    "message": "Issue description",
    "code": "Code snippet"
  },
  {
    "rule": "@performance/rule-name",
    "line": line_number, 
    "message": "Issue description",
    "code": "Code snippet"
  }
]

Important notes:
- I will treat each CodeLinter result as definitive evidence of a defect
- The defect_block will be the complete code unit containing the issue, for example, the define and use of a variable. 
- You should find all the code related to the defect and return the defect_block. For example, for a constant-property-referencing-check-in-loops defect, you need to extract the constant variable assignment outside of the outermost loop
- Line numbers will be preserved for accurate block replacement
- Output will be valid JSON only, sorted by line number
- Each block must be independently understandable and fixable

I must output valid JSON only, sorted by line number. Make sure the output is correct JSON format which can be parsed by json.loads() and follow the format: 
```json
[
  {
    "rule": "@performance/rule-name",
    "defect_block": {
      "code": ["Code block containing the defect"],
      "start_line": "first_line_number",
      "end_line": "last_line_number",
      "line of interest": "The code where the defect is reported by CodeLinter. Code instead of line number!!"
    },
    "analysis": "The description of the defect"
  },
  ...
]
```

Following are the defect patterns I will look for:
"""

for rule, details in rules.items():
    system_prompt += "Rule: {}\nDescription: {}\nDefect Example:\n{}\nFixed Example:\n{}\n\n".format(
        rule,
        details['description'],
        details['defect_code_example'],
        details['positive_code_example']
    )


In [33]:
### read code from ../EdgeTest.ts
filename = 'VerticalStack.ts'
with open(filename, 'r') as f:
    code = f.read()


In [34]:
import json
import pandas as pd

# 读取Excel文件，指定第二行为表头
df = pd.read_excel('../result.xlsx', header=1)

# 初始化结果列表
results = []

# 将代码按行拆分
code_lines = code.split('\n')

for _, row in df.iterrows():
    source_file = str(row['Source File'])
    if source_file.endswith(filename):
        line_num = int(row['Line'])
        result = {
            "rule": row['RuleName'],
            "line": line_num,
            "message": row['Detail'],
            "code": code_lines[line_num - 1].strip() if line_num <= len(code_lines) else ""
        }
        results.append(result)

# 按line排序结果
results.sort(key=lambda x: x['line'])

unique_results = []
seen = set()

for result in results:
    key = (result['rule'], result['line'])
    if key not in seen:
        unique_results.append(result)
        seen.add(key)

# 转换为JSON字符串
codelinter_res = json.dumps(unique_results, indent=2)
print(codelinter_res)


[
  {
    "rule": "@performance/constant-property-referencing-check-in-loops",
    "line": 61,
    "message": "Suggestion: This property access occurs within a loop and returns a constant result; you are advised to extract it outside the loop body to reduce the number of property access times.",
    "code": "const n = j * VerticalStack.e_rowCount + i;"
  }
]


  warn("Workbook contains no default style, apply openpyxl's default")


In [49]:
# 解析codelinter_res为Python对象
codelinter_results = json.loads(codelinter_res)

vul_type_data = []
defects = []

for i, defect in enumerate(codelinter_results):
  print(f"Processing defect {i+1} of {len(codelinter_results)}")
  prompt = f"""Following is my arkts code which you should check defects: 
{code}

Following is the CodeLinter analysis result:
{json.dumps(defect, indent=2)}
""" + """
Now, please output the defect contained the defect code block in JSON format:
```json
{
  "rule": "@performance/rule-name",
  "defect_block": {
    "code": ["Code block containing the defect"],
    "start_line": "first_line_number",
    "end_line": "last_line_number",
    "line of interest": "The code where the defect is reported by CodeLinter. Code instead of line number!!"
  },
  "analysis": "The description of the defect"
}
```
               """
  # print(prompt)
  res = generate(prompt, system_prompt=system_prompt, base_url='https://xiaoai.plus/v1', model='gpt-4o-2024-08-06')
  vul_type_data = handle_vul_type_res(res) 
  ## 如果是数组的话，取出第一个元素
  if isinstance(vul_type_data, list):
    vul_type_data = vul_type_data[0]

  defects.append(vul_type_data)




Processing defect 1 of 1


In [50]:
print(json.dumps(defects, indent=4))


[
    {
        "rule": "@performance/constant-property-referencing-check-in-loops",
        "defect_block": {
            "code": [
                "for (let j = 0; j < VerticalStack.e_columnCount; ++j) {",
                "    const shape = new box2d.b2PolygonShape();",
                "    shape.SetAsBox(0.5, 0.5);",
                "",
                "    const fd = new box2d.b2FixtureDef();",
                "    fd.shape = shape;",
                "    fd.density = 1.0;",
                "    fd.friction = 0.3;",
                "",
                "    const thisRowCount = VerticalStack.e_rowCount;",
                "",
                "    for (let i = 0; i < thisRowCount; ++i) {",
                "        const bd = new box2d.b2BodyDef();",
                "        bd.type = box2d.b2BodyType.b2_dynamicBody;",
                "",
                "        const n = j * thisRowCount + i;",
                "        // DEBUG: box2d.b2Assert(n < thisRowCount * VerticalStack.e_colum

### Begin to repair

In [10]:
from openai import OpenAI
import re
import json
import logging
from transformers import AutoTokenizer, AutoModel
from pinecone import Pinecone
import pandas as pd
import os
import requests
import time

def load_model_and_index():
    model_name = "bert-base-uncased"
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModel.from_pretrained(model_name)
    # 初始化 Pinecone
    pc = Pinecone(api_key="40075f49-8396-4571-924a-4b6d342cc81d")

    # 创建 Pinecone 索引
    index_name = "arkts-defects"
    dimension = 768  # BERT base 的输出维度是 768
    # 连接到索引
    index = pc.Index(index_name)
    return model, tokenizer, index

def get_embedding(text, model, tokenizer):
    inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True)
    outputs = model(**inputs)
    # 使用最后一个隐藏层的平均池化作为句子嵌入
    embeddings = outputs.last_hidden_state.mean(dim=1).squeeze().detach().numpy()
    return embeddings

  from .autonotebook import tqdm as notebook_tqdm


In [11]:
def get_fix_prompt(repair_example, model, tokenizer, index):
    query_text = repair_example["problem_code"]
    query_vector = get_embedding(query_text, model, tokenizer)
    
    results = index.query(
        namespace="arkts",
        vector=query_vector.tolist(),
        top_k=5,
        include_metadata=True,
        filter={"rule": repair_example["rule"]}
    )
    fix_prompt = "I will show you similar errors below. Please help me fix my code based on these error fixes.\n"
    matches = results.matches
    for j, match in enumerate(matches):
        metadata_text = match['metadata']['text']
        try:
            parsed_text = json.loads(metadata_text)
            fix_prompt += (f"Demo {j+1}: \nRule Type: \n{parsed_text['rule']}\n\nDescription: \n{parsed_text['description']}\n\n"
                       f"Problem Code: \n```arkts\n{parsed_text['problem_code']}\n```\n\nFix Explanation: \n{parsed_text['problem_explain']}\n\n"
                       f"Fixed Code: \n\n```arkts\n{parsed_text['problem_fix']}\n```\n\n")
        except json.JSONDecodeError as e:
            logging.error(f"Error decoding JSON for ID {match['id']}: {e}")
            logging.error(f"Original metadata text: {metadata_text}")
    
    return fix_prompt

In [12]:
### 上面的prompt是修复单个缺陷的，现在我有vul_type_data这个检测出来的多缺陷数据，我需要逐个缺陷进行修复

def repair_multiple_defects(defects, model, tokenizer, index):
    """
    修复多个缺陷
    Args:
        vul_type_data: 包含多个缺陷的数据
        model: BERT模型
        tokenizer: BERT分词器
        index: Pinecone索引
    Returns:
        修复建议列表
    """
    repair_suggestions = []
    
    for defect in defects:
        problem_code = ""
        for code_line in defect["defect_block"]['code']:
            problem_code += code_line + "\n"
            
        repair_example = {
            "rule": defect["rule"],
            "description": defect["analysis"],
            "problem_code": problem_code,
            "line_of_interest": defect["defect_block"]['line of interest']
        }
        
        fix_prompt = get_fix_prompt(repair_example, model, tokenizer, index)
        repair_suggestions.append({
            "defect": defect,
            "fix_prompt": fix_prompt,
            "problem_code": problem_code,
            "line_of_interest": defect["defect_block"]['line of interest'],
            "analysis": defect["analysis"]
        })
        
    return repair_suggestions



In [13]:
model, tokenizer, index = load_model_and_index()



In [52]:
# 测试修复多个缺陷
repair_prompts = repair_multiple_defects(defects, model, tokenizer, index)
repair_results = []
for i, result in enumerate(repair_prompts):
    print(f"Processing defect {i+1} of {len(repair_prompts)}")
    ## 将修复建议和问题代码组合，传给大模型进行修复
    fix_prompt = result['fix_prompt'] + f"\n待修复代码为：\n```arkts\n{code}\n```\n出现错误的代码行：\n```arkts\n{result['line_of_interest']}\n```\n其中的problem_code(错误的代码上下文)为：\n```arkts\n{result['problem_code']}\n```\n修复建议是：\n{result['analysis']}\n请根据上面所给出的缺陷修复建议,对上述待修复代码开始修复(不需要考虑其它问题，仅考虑建议中提出的), 修复的时候需要给出有问题的代码段以及修复的代码段：" + """
请以如下json格式输出：
```json
{
  "problem_code": "需要和传入的problem_code(包含错误的代码上下文)保持一致",
  "problem_fix": "修复后的代码段(包含修复的代码以及上下文),请保留换行和缩进"
}
```
"""
    # print(f"修复建议: {fix_prompt}\n")
    # print('----------------')
    res = generate(fix_prompt, system_prompt="你是arkts代码修复专家。你将获得用户给出的错误代码以及问题类型，出现错误的代码所在的行以及上下文，以及对应问题类型的修复案例。请参考修复案例，根据用户给出的错误代码以及问题类型，帮助用户修复代码。")#, base_url='http://localhost:11434/v1', model='arktsLLM')
    print(res)
    repair_results.append(res)
    # break

Processing defect 1 of 1
 ```json
{
  "problem_code": "for (let i = 0; i < thisRowCount; ++i) {\n        const bd = new box2d.b2BodyDef();\n        bd.type = box2d.b2BodyType.b2_dynamicBody;\n\n        const n = j * thisRowCount + i;\n        // DEBUG: box2d.b2Assert(n < thisRowCount * VerticalStack.e_columnCount);\n        this.m_indices[n] = n;\n        bd.userData = this.m_indices[n];\n\n        const x = 0.0;\n        //const x = box2d.b2RandomRange(-0.02, 0.02);\n        //const x = i % 2 === 0 ? -0.01 : 0.01;\n        bd.position.Set(xs[j] + x, 0.55 + 1.1 * i);\n        const body = this.m_world.CreateBody(bd);\n\n        this.m_bodies[n] = body;\n\n        body.CreateFixture(fd);\n    }",
  "problem_fix": "for (let i = 0; i < thisRowCount; ++i) {\n        const bd = new box2d.b2BodyDef();\n        bd.type = box2d.b2BodyType.b2_dynamicBody;\n\n        const n = j * VerticalStack.e_rowCount + i;\n        // DEBUG: box2d.b2Assert(n < thisRowCount * VerticalStack.e_columnCount);\n  

In [43]:
def handle_result(res):
    # 从字符串中提取JSON部分
    start = res.find('```json')
    if start == -1:  # 如果没有找到```json
        return None
    start += 7
    end = res.find('```', start)
    if end == -1:  # 如果没有找到结尾的```
        # 取到字符串结尾
        json_str = res[start:].strip()
    else:
        json_str = res[start:end].strip()
    
    try:
        # 解析JSON字符串
        result_dict = json.loads(json_str)
        return {
            'problem_code': result_dict['problem_code'],
            'problem_fix': result_dict['problem_fix']
        }
    except json.JSONDecodeError as e:
        print(f"JSON解析错误: {e}")
        return None

for i, result in enumerate(repair_results):
    print(f"Processing defect {i+1} of {len(repair_results)}")
    data_json = handle_result(result)
    print('-'*50)
    print(data_json["problem_code"])
    print('-'*50)
    print(data_json["problem_fix"])
    print('-'*50)

Processing defect 1 of 1
--------------------------------------------------
for (let i = 0; i < VerticalStack.e_rowCount; ++i) {
    const bd = new box2d.b2BodyDef();
    bd.type = box2d.b2BodyType.b2_dynamicBody;

    const n = j * VerticalStack.e_rowCount + i;
    this.m_indices[n] = n;
    bd.userData = this.m_indices[n];

    const x = 0.0;
    bd.position.Set(xs[j] + x, 0.55 + 1.1 * i);
    const body = this.m_world.CreateBody(bd);

    this.m_bodies[n] = body;

    body.CreateFixture(fd);
}
--------------------------------------------------
const rowCount = VerticalStack.e_rowCount;
for (let i = 0; i < rowCount; ++i) {
    const bd = new box2d.b2BodyDef();
    bd.type = box2d.b2BodyType.b2_dynamicBody;

    const n = j * rowCount + i;
    this.m_indices[n] = n;
    bd.userData = this.m_indices[n];

    const x = 0.0;
    bd.position.Set(xs[j] + x, 0.55 + 1.1 * i);
    const body = this.m_world.CreateBody(bd);

    this.m_bodies[n] = body;

    body.CreateFixture(fd);
}
----------

In [44]:
## 将修复后的代码替换到原代码中

res = generate(f"代码如下:\n```arkts\n{code}\n```\n, 修复的patch如下: \n{json.dumps(repair_results, indent=4)}\n请根据上面的patch来对代码进行修复", system_prompt="你是arkts代码修复专家，我将给出你代码中存在的缺陷，以及解决这些缺陷需要进行的patch，请你帮我执行这些patch，得到修复后的代码", base_url='https://xiaoai.plus/v1', model='gpt-4o-2024-08-06')
print(res)

In [45]:
print(res)

根据提供的修复建议，需要对代码进行如下修改：

原代码段：
```arkts
for (let i = 0; i < VerticalStack.e_rowCount; ++i) {
    const bd = new box2d.b2BodyDef();
    bd.type = box2d.b2BodyType.b2_dynamicBody;

    const n = j * VerticalStack.e_rowCount + i;
    this.m_indices[n] = n;
    bd.userData = this.m_indices[n];

    const x = 0.0;
    bd.position.Set(xs[j] + x, 0.55 + 1.1 * i);
    const body = this.m_world.CreateBody(bd);

    this.m_bodies[n] = body;

    body.CreateFixture(fd);
}
```

修复后的代码段：
```arkts
const rowCount = VerticalStack.e_rowCount;
for (let i = 0; i < rowCount; ++i) {
    const bd = new box2d.b2BodyDef();
    bd.type = box2d.b2BodyType.b2_dynamicBody;

    const n = j * rowCount + i;
    this.m_indices[n] = n;
    bd.userData = this.m_indices[n];

    const x = 0.0;
    bd.position.Set(xs[j] + x, 0.55 + 1.1 * i);
    const body = this.m_world.CreateBody(bd);

    this.m_bodies[n] = body;

    body.CreateFixture(fd);
}
```

通过将 `VerticalStack.e_rowCount` 提取为局部常量 `rowCount`，可以在循环中重复使用，该修改易于维护且提