# 生成与迭代反馈

In [4]:
import argparse
import json
import os
import re
import subprocess
from functools import partial
from pathlib import Path
from pprint import pprint
from typing import Dict
from typing import Optional
from typing import Tuple

import numpy as np
from langchain_openai import ChatOpenAI
from scipy.fftpack import fft
from scipy.signal import lfilter

# --------------------------------------- API configuration ----------------------------------------
# The key and base URL are placed in the process environment before the chat client is created
# (both values are redacted in this copy).
# NOTE(review): presumably ChatOpenAI picks these up from the environment - confirm; real secrets
# should not be committed in source.
os.environ["OPENAI_API_KEY"] = "***********************************"
os.environ["OPENAI_API_BASE"] = "***********************************"
model = ChatOpenAI(model="gpt-4o")

# Stream the model's reply to stdout while collecting it
def stream_and_model(question, _model) -> str:
    """Stream *question* through *_model* and return the full reply text.

    Args:
        question: Prompt passed unchanged to ``_model.stream``.
        _model: Object exposing ``stream(question)`` that yields chunks with
            a ``content`` attribute.

    Returns:
        The concatenation of every chunk's ``content``, in arrival order.
    """
    pieces = []
    for chunk in _model.stream(question):
        fragment = chunk.content
        # Echo immediately so progress is visible while the reply is still arriving.
        print(fragment, end="", flush=True)
        pieces.append(fragment)
    return "".join(pieces)
# Convenience wrapper: `stream(question)` calls stream_and_model with the module-level `model`.
stream = partial(stream_and_model, _model=model)
# --------------------------------------- Basic configuration ----------------------------------------
# Task selected for this run; the trailing comment lists the other task names noted by the author.
task_name = "csr" #cpualu idu imm RegisterFile csr cpu_top
# Root directory for generated files (run_iverilog writes into workspace / task_name).
workspace = Path("Result/SPEC_test")  # /SPEC_test
# Per-task input directory: the prompt, testbench and reference files are read from here.
questions_path = Path("G:/Desktop/LLM_ICDesign/课题/SPEC_test") / task_name   # /SPEC_test
# Text file whose contents are sent to the model as the problem statement.
problem_prompt_path = questions_path / f"{task_name}_prompt.txt"

# ----------------------------------- First LLM generation pass -----------------------------------
# Seed the transcript: the system role, the task prompt read from disk, and the two user
# instructions asking for an analysis followed by the code.
history = [
    "System: You are a Verilog expert, I need your help to write verilog code for me",
    f"User: {problem_prompt_path.read_text()}",
    "User: above is all the information for you to solve the problem. Remember",
    "User: Now you analyze the problem and at last write the code for me.",
]

# The transcript is sent as the string form of the list; the streamed reply is recorded back.
ans = stream(str(history))
history.append("AI: " + ans)

def remove_markdown(json_text) -> str:
    """Return *json_text* with Markdown code-fence markers removed.

    Every "```json" marker is deleted first, then every remaining "```";
    all other characters (including surrounding newlines) are kept.
    """
    for fence in ("```json", "```"):
        json_text = json_text.replace(fence, "")
    return json_text

def abstract_code_from_answer(ans: str) -> Dict:
    """Ask the model to split a free-form answer into its analysis and its code.

    The answer text is embedded in an extraction prompt, the prompt is sent
    through ``stream``, and the reply is parsed as JSON.

    Args:
        ans: Free-form model answer that contains an analysis and Verilog code.

    Returns:
        The parsed JSON object. The prompt requests the keys ``"analysis"``
        and ``"code"``.

    Raises:
        json.JSONDecodeError: If no parseable JSON object is found in the reply.
    """
    # The example object uses quoted keys so that it is itself valid JSON;
    # an example with bare keys invites replies that json.loads rejects.
    prompt = f"""User: You have analyzed the verilog code writing problem and write the code for me.
I need you to extract the analysis and code from the above text.
You should output in Json format as below:
{{
    "analysis": "Your analysis here",
    "code": "Your code here"
}}
-------------------------
Here is the text:
{ans}
"""
    reply = remove_markdown(stream(prompt))

    # Tolerate prose before or after the object: parse only the outermost {...} span.
    start, end = reply.find("{"), reply.rfind("}")
    if 0 <= start < end:
        reply = reply[start:end + 1]

    # strict=False accepts raw newlines/tabs inside string values, which is
    # common when the model pastes multi-line code into the "code" field.
    return json.loads(reply, strict=False)

# Pull just the Verilog source out of the first answer, display it, and log it in the transcript.
code = abstract_code_from_answer(ans)["code"]
print(
    "\n-----------------------------code generate by LLM -------------------------------------------",
    code,
    sep="\n",
)
history.extend(["AI: The first generated code:", code])

# ----------------------------------- Test the result generated by the LLM -----------------------------------
# ----------------------------------- Syntax and functional checking tools -----------------------------------
def solve_syntax_error(code: str, syntax_err: str) -> Tuple[str, str]:
    """Ask the model to repair a syntax error.

    Args:
        code: Verilog source that failed to compile.
        syntax_err: Compiler error text to show to the model.

    Returns:
        ``(analysis, corrected_code)`` taken from the extracted answer, with
        Markdown fence markers removed from the code.
    """
    prompt = f"""Please solve the syntax error according to the error message below:
======================Code======================
{code}
======================Error======================
{syntax_err}
=================================================
Please analysis the error, and at last give me the corrected code.
"""
    # Directions for improving syntax-error handling:
    extracted = abstract_code_from_answer(stream(prompt))

    analysis = extracted["analysis"]
    corrected_code = extracted["code"]

    # Drop any fence markers that survived extraction.
    for fence in ("```verilog", "```"):
        corrected_code = corrected_code.replace(fence, "")
    return analysis, corrected_code
    
def solve_functional_error(code: str, functional_error: str) -> Tuple[str, str]:
    """Ask the model to repair a functional (simulation) error.

    Args:
        code: Verilog source whose simulation reported a problem.
        functional_error: Simulation output to show to the model.

    Returns:
        ``(analysis, corrected_code)`` taken from the extracted answer, with
        Markdown fence markers removed from the code.
    """
    prompt = f"""Please solve the functional error according to the error message below:
======================Code======================
{code}
======================Error======================
{functional_error}
=================================================
Please analysis the error, and at last give me the corrected code.
"""
    # Improving functional-error repair is harder: different circuits call for different
    # debugging methods, so pick one kind of circuit and design a method for it.
    # For a truth-table circuit, for example, add a chain-of-thought step that extracts every
    # formula from the code and compares it against the truth table, or have the model translate
    # the code to Python and evaluate it against the truth table, to obtain more feedback for
    # scoring the model's current result.
    raw_answer = stream(prompt)
    parsed = abstract_code_from_answer(raw_answer)

    analysis, fixed_code = parsed["analysis"], parsed["code"]
    fixed_code = fixed_code.replace("```verilog", "")
    fixed_code = fixed_code.replace("```", "")
    return analysis, fixed_code

def is_their_any_functional_error(vvp_output: str) -> bool:
    """Ask the model whether simulation output indicates a functional error.

    Two model calls are made: the first asks for a free-form analysis of
    *vvp_output*, the second asks for that analysis to be condensed into a
    JSON object with a boolean ``functional_ok`` field.

    Args:
        vvp_output: Text printed by the simulation run.

    Returns:
        The ``functional_ok`` verdict: True when the model reports no
        functional error, False otherwise.

    Raises:
        json.JSONDecodeError: If the second reply is not parseable and no
            ``functional_ok`` true/false pair can be located in it.
        KeyError: If the reply parses but has no ``functional_ok`` field.
    """
    prompt = f"""Here is the output runned by IVerilog.
    Please check if it means there is any functional error in the code.
    =========================
    {vvp_output}
    =========================
    If there is any functional error, the text above will tell you there is mismatch in some Output.
    If all the output has no mismatch, then there is no functional error.
    Give me your analysis and at the end firmly tell me if there is any functional error or not.
    """
    ans = stream(prompt)

    # The example uses quoted keys and lowercase true/false so that a reply
    # that copies it literally is valid JSON (Python-style True/False is not).
    prompt = f"""Please tell me if there is any functional error in the code. You will output in Json format as below:
    if there is no functional_error, functional_ok should be true, otherwise false
    {{
        "analysis": "Your analysis here",
        "functional_ok": true
    }}
    =============================
    Here is the text: {ans}
    """
    reply = remove_markdown(stream(prompt))

    # Tolerate prose around the object: parse only the outermost {...} span.
    start, end = reply.find("{"), reply.rfind("}")
    if 0 <= start < end:
        reply = reply[start:end + 1]

    try:
        verdict = json.loads(reply, strict=False)["functional_ok"]
    except json.JSONDecodeError:
        # Fall back to locating the flag textually, e.g. `functional_ok: True`.
        found = re.search(r"functional_ok\W*?(true|false)", reply, re.IGNORECASE)
        if found is None:
            raise
        verdict = found.group(1)

    # A quoted "true"/"false" must not be judged by string truthiness.
    if isinstance(verdict, str):
        return verdict.strip().lower() == "true"
    return bool(verdict)

def _strip_verilog_fence(text: str) -> str:
    """Remove Markdown "```verilog" and "```" fence markers from *text*."""
    return text.replace("```verilog", "").replace("```", "")


def run_iverilog(task_name: str, rtl_code: str, tb_code: str, ref_code: str, subref_code: str = "", sv_flag: bool = True) -> Tuple[Optional[str], Optional[str], Optional[str]]:
    """Write the sources to disk, compile them with iverilog and run them with vvp.

    Files are written under ``workspace / task_name``: the RTL as
    ``<task>_solve.v``, the testbench as ``<task>_test.sv`` or
    ``<task>_test.v``, the reference as ``<task>_ref.v`` (only when
    ``task_name == "idu"``) and ``subref.v`` (only when *subref_code* is
    non-empty).

    Args:
        task_name: Task identifier; selects the working directory and file names.
        rtl_code: Design source; Markdown fence markers are stripped.
        tb_code: Testbench source; Markdown fence markers are stripped.
        ref_code: Reference source; only written and compiled for the "idu" task.
        subref_code: Optional extra source, written verbatim and added to the
            compile when non-empty.
        sv_flag: When true, the testbench gets a ``.sv`` suffix and
            ``-g2012`` is passed to iverilog. Defaults to True, which keeps
            the effective behaviour of the earlier default (the ``bool`` type
            object, which is truthy).

    Returns:
        ``(syntax_err, functional_output, vcd_path)``.
        On a failed compile: ``(compiler message, None, None)``.
        Otherwise: ``(None, vvp stdout, str(workdir / "Result.vcd"))``.
    """
    rtl_code = _strip_verilog_fence(rtl_code)
    tb_code = _strip_verilog_fence(tb_code)

    workdir = workspace / task_name
    workdir.mkdir(parents=True, exist_ok=True)  # create the directory and its parents if missing

    rtl_path = workdir / f"{task_name}_solve.v"
    rtl_path.write_text(rtl_code)

    tb_suffix = "sv" if sv_flag else "v"
    tb_path = workdir / f"{task_name}_test.{tb_suffix}"
    tb_path.write_text(tb_code)

    # Source files are passed to the compiler in this order.
    sources = [rtl_path, tb_path]

    if task_name == "idu":
        ref_path = workdir / f"{task_name}_ref.v"
        ref_path.write_text(_strip_verilog_fence(ref_code))
        sources.append(ref_path)

    if subref_code:
        subref_path = workdir / "subref.v"
        subref_path.write_text(subref_code)
        sources.append(subref_path)

    output_file_path = workdir / "result.out"  # compiled simulation image

    compile_cmd = ["iverilog"]
    if sv_flag:
        compile_cmd.append("-g2012")
    compile_cmd.extend(["-o", str(output_file_path)])
    compile_cmd.extend(str(path) for path in sources)

    print("compile_cmd:", compile_cmd)
    print("start compile")

    result = subprocess.run(compile_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # errors="replace": tool output is not guaranteed to be valid UTF-8.
    syntax_err = result.stderr.decode("utf-8", errors="replace")
    if result.returncode != 0 and not syntax_err:
        # A failed compile must never be reported as success just because stderr was empty.
        syntax_err = (
            result.stdout.decode("utf-8", errors="replace")
            or f"iverilog exited with status {result.returncode}"
        )
    # Any stderr text (warnings included) is still reported as a syntax error, as before.
    if syntax_err:
        return syntax_err, None, None
    print("compile success")
    print("start run")

    # NOTE(review): this path is only constructed and returned; nothing here tells the
    # simulator to write it - presumably the testbench dumps the VCD itself, confirm.
    vcd_file_path = workdir / "Result.vcd"
    result = subprocess.run(["vvp", str(output_file_path)], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    functional_error = result.stdout.decode("utf-8", errors="replace")
    print("run success")
    return None, functional_error, str(vcd_file_path)
# ------- Check for syntax or functional errors; when there are none, check that the results are correct -------
# Load the original testbench, reference and sub-reference sources up front so later steps can reuse them.
if task_name not in ("csr", "cpu_top"):
    # Testbench file names carry a `_tb` suffix.
    tb_task_name = f"{task_name}_tb"
    # Any .sv file in the task directory selects the SystemVerilog testbench.
    sv_files = list(questions_path.glob("*.sv"))
    sv_flag = bool(sv_files)
    tb_filename = f"{tb_task_name}.sv" if sv_flag else f"{tb_task_name}.v"
    tb_code = (questions_path / tb_filename).read_text()
else:
    # These two tasks are run without a separate testbench file.
    tb_code = ""
    sv_flag = False
# Read the reference implementation (<task>_ref.v).
ref_code = (questions_path / f"{task_name}_ref.v").read_text()

# subref.v is optional; an empty string means "no sub-reference".
subref_file = questions_path / "subref.v"
subref_code = subref_file.read_text() if subref_file.exists() else ""

# Iteration bounds for the repair loop: stop when there are no syntax/functional errors
# or when the maximum number of rounds is reached.
max_iter = 5
current_round = 1

# # ----------------- Test -----------------
rtl_code = code
syntax_err, functional_error, vcd_file = run_iverilog(task_name, rtl_code, tb_code, ref_code, subref_code, sv_flag)
print("syntax error:", syntax_err)
if syntax_err is not None:
    print("over")
else:
    print("functional_error", functional_error)

    # functional_ok = is_their_any_functional_error(functional_error)
    # if not functional_ok :
    #         print("Functional error:", functional_error)
    # else:
    #     print("No Functional error")


# while True:
#     print(f"====================================================DEBUG ROUND {current_round}, current history length: {len(str(history))}=========================================================")
#     # 使用iverilog来检查是否有语法错误和功能错误
#     # 获取上一次生成的code的代码
#     rtl_code = code
#     syntax_err, functional_error, vcd_file = run_iverilog(task_name, rtl_code, tb_code,subref_code)
#     # 判断是否有语法错误
#     if syntax_err:
#         print("Syntax error:", syntax_err)
#         history.append("Tool: Syntax error occured! Tool output is below:")
#         history.append(syntax_err)
#         analysis, code = solve_syntax_error(code, syntax_err)
#     else:
#         print("No Syntax error")
#         functional_ok = is_their_any_functional_error(functional_error)
#         # 判断是否有功能错误
#         if not functional_ok :
#             print("Functional error:", functional_error)
#             history.append("Tool: Functional error occured! Tool output is below")
#             history.append(functional_error)
#             analysis, code = solve_functional_error(code, functional_error)
#         else:
#             print("No Functional error")
#         # 与正确结果对比
#         # if not syntax_err and not functional_ok:
#             mse_ok = is_mse_error_ok(task_name)
#             if not mse_ok:
#                 print("MSE is large")
#                 history.append("Tool: Functional error occured! Tool output is below")
#                 history.append("The MSE of the result is larger than the threadhold, which means the calculation steps have promblems.Please check the code again!")
#                 history.append("You should pay close attention to the code in the middle of the generate statement in the source code. Other code does not need to be modified, but you should also pay attention to the range of m k should match the size dimensions of en_comnect, xm_deal, and xm_imag arrays")
#                 analysis, code = solve_functional_error(code, functional_error)
#             else:
#                 print("Congratulations! All is well!")
#                 # 将代码写入文件
#                 llm_gen_path = workspace / task_name / f"{task_name}_llm_gen.v"
#                 with llm_gen_path.open("w") as f:
#                     f.write(code)
#                     print(f"Code has been written to {llm_gen_path}")
#                 break
            
#     if current_round > max_iter:
#         print("Max Iteration reached")
#         break
#     else :
#         current_round += 1
        
#     history.append(f"AI: Analysis for the error: {analysis}")
#     history.append(f"AI: modified code:")
#     history.append(code)

To implement the Control and Status Register (CSR) module as described, we need to adhere to the specified requirements for register initialization, exception handling, CSR writes, and reads. Here's a step-by-step breakdown followed by the Verilog code:

### Step-by-Step Analysis
1. **Definitions and Registers**:
   - We define CSR addresses using parameters.
   - Internal registers include `mtvec_BASE`, `mtvec_MODE`, `mstatus`, `mepc`, `mcause_INTR`, and `mcause_ECODE`.

2. **Wire Signals**:
   - `mtvecout` and `mcauseout` are formed by concatenating relevant register bits.
   - `ecode` is a wire that holds the exception code.

3. **Input Ports**:
   - Include clock `clk`, `reset`, `csr_add`, `csr_wdata`, `csr_write`, `csr_read`, `ecall`, `mret`, `epc`, and `csr_mask`.

4. **Output Ports**:
   - `entry` is a wire that provides the complete interrupt vector base address.
   - `csr_rdata` is a register that stores the output data from CSR reads.

5. **Register Initialization and Reset**