In [1]:
import time
import json
import os
import re
from utils.QwQ import QWQ
from tqdm import tqdm
import copy
from simhash import Simhash
from players.user import *
from players.developer import *
from lingo_bp import LingoBP
from utils.utils import *

class Llm:
    """Retrying wrapper around a chat client that exposes ``talk(*inputs)``.

    Attributes:
        q: the wrapped client object.
        max_retries: number of retries allowed after the first failed attempt.
        retry_delay: seconds to sleep between two attempts.
        retries: number of failures seen during the most recent ``query`` call.
        result: value returned by the most recent ``query`` call.
    """

    def __init__(self, q, max_retries=5, retry_delay=3):
        self.q = q
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.retries = 0
        # Defined up front so ``result`` always exists, even before any query.
        self.result = (None, None)

    def query(self, inputs):
        """Call ``self.q.talk(*inputs)``, retrying on any exception.

        Args:
            inputs: positional arguments forwarded to ``talk``.

        Returns:
            Whatever ``talk`` returns, or ``(None, None)`` once the retry
            budget (``max_retries`` retries after the first attempt) is spent.
        """
        # The retry budget is per call: without this reset, failures from
        # earlier queries would accumulate and eventually make every later
        # query return a stale result without contacting the model at all.
        self.retries = 0
        while True:
            try:
                self.result = self.q.talk(*inputs)
                break
            except Exception as e:
                self.retries += 1
                if self.retries > self.max_retries:
                    print(f"任务失败，达到最大重试次数{self.max_retries}")
                    self.result = (None, None)
                    break
                print(f"任务发生异常{e}，正在重试第{self.retries}次...")
                time.sleep(self.retry_delay)

        return self.result

Note:
1. 请在下方单元格中输入您的 token。
    Enter your token in the cell below.
2. 完成相应模型 API 的调用代码 [./utils/QwQ.py]。
    Complete the corresponding model API call code in ./utils/QwQ.py.

In [None]:
# Read the API token from the environment instead of committing it to the
# notebook; export QWQ_TOKEN before starting the kernel.
TOKEN = os.environ.get("QWQ_TOKEN", "")
PLATFORM = "whale"
# Model name used when PLATFORM is not "whale"; fill it in for other platforms.
Model = ""
Model = "QwQ-32B" if PLATFORM == "whale" else Model
q = QWQ(
    platform=PLATFORM,
    token=TOKEN,
    model_type=Model
)
# Shared retrying wrapper used by the pipeline functions below.
llm = Llm(q)

********Model type ************ QwQ-32B


In [3]:
""" 其他函数 """
def white_check_and_rewrite(think, language):
    """Parse a structured chain and drop its risk sections when it is risk-free.

    The last ```` ```json ... ``` ```` block found in ``think`` is parsed and
    its ``"thinking"`` object inspected. When the risk-intent field contains
    the "no risk" marker, or that field or the safety-guideline field is
    empty, a reduced dict is returned holding only the user-needs analysis and
    the two response strategies joined with ``";"``. Otherwise the parsed
    ``"thinking"`` object is returned unchanged.

    Args:
        think: text containing at least one fenced ``json`` block.
        language: ``"cn"`` selects the Chinese field names; any other value
            selects the English ones.

    Returns:
        dict: the reduced dict or the parsed ``"thinking"`` object.

    Raises:
        ValueError: if no fenced JSON block is present (``json.loads`` errors
            are also ``ValueError`` subclasses).
        KeyError: if an expected field is missing from the parsed object.
    """
    print("#######################white_check_and_rewrite#########################")
    think = think.replace('user需求分析', '用户需求分析')
    # Newlines are stripped before matching, exactly as the JSON is later parsed.
    json_blocks = re.findall(r"```json(.*?)```", think.replace('\n', ''), re.DOTALL)
    if not json_blocks:
        raise ValueError("No JSON block found")
    thinking = json.loads(json_blocks[-1]).get('thinking', {})

    if language == "cn":
        needs_key, risk_key, guideline_key = '用户需求分析', '风险意图分析', '安全准则响应'
        strategy_key, useful_key, safety_key = '回复策略制定', '有用性策略', '安全策略'
        no_risk_marker = "无风险"
    else:
        needs_key, risk_key, guideline_key = 'User Needs Analysis', 'Risk Intent Analysis', 'Safety Guidelines'
        strategy_key, useful_key, safety_key = 'Response Strategy Formulation', 'Usefulness Strategy', 'Safety Strategy'
        no_risk_marker = "No risk"

    risk = thinking[risk_key]
    is_risk_free = (no_risk_marker in risk) or (risk == "") or (thinking[guideline_key] == "")
    if not is_risk_free:
        # Risky chains are returned as parsed. Both languages now share this
        # path; the Chinese branch used to fail here on an unbound variable.
        return thinking

    strategy = thinking[strategy_key]
    new_think = {
        needs_key: thinking[needs_key],
        strategy_key: strategy[useful_key] + ";" + strategy[safety_key],
    }
    print(new_think)
    return new_think

def get_judge_results(query, response, risk_type, risk_intention, side):
    """Judge a response from the user side, the developer side, or both.

    Args:
        query: the user query.
        response: the model response being judged.
        risk_type: forwarded to the developer-side judge.
        risk_intention: accepted for interface compatibility; not used here.
        side: ``'user'``, ``'developer'`` or ``'both'``.

    Returns:
        tuple: ``(user_judge, safe_judge)``. The side that was not requested
        is represented by the string ``'None'``.

    Raises:
        ValueError: if ``side`` is not one of the three accepted values.
    """
    # Fail fast with a clear error: returning None here would only surface
    # later as an unrelated unpacking TypeError in the caller.
    if side not in ('user', 'developer', 'both'):
        raise ValueError(f"side must be user/developer/both, got {side!r}")

    # Both judges are built regardless of side, in this order, on the shared
    # module-level client `q`.
    u = Userv3(q)
    d = Developer_guide(q)

    if side == 'user':
        return u.user_preference_judge(query, response), 'None'
    if side == 'developer':
        return 'None', d.developer_preference_judge(query, response, risk_type)

    user_judge = u.user_preference_judge(query, response)
    safe_judge = d.developer_preference_judge(query, response, risk_type)
    return user_judge, safe_judge

def _unwrap_judge(info, key):
    """Return ``info[key]`` when that lookup works, otherwise ``info`` itself."""
    try:
        return info[key]
    except Exception:
        # Covers plain judge values and the 'None' placeholder string, while
        # letting KeyboardInterrupt / SystemExit propagate.
        return info


def get_judge_score(user_info, developer_info):
    """Extract the judge payloads from the two judge results.

    Args:
        user_info: user-side result; ``user_info["user_judge"]`` is used when
            that lookup succeeds, otherwise the value is passed through.
        developer_info: developer-side result; same rule with ``"safe_judge"``.

    Returns:
        tuple: ``(user_judge, safe_judge)``.
    """
    return _unwrap_judge(user_info, "user_judge"), _unwrap_judge(developer_info, "safe_judge")

    

In [None]:
""" initialize及optimize主体函数 """

def initialize(input_query, Initial_think, Initial_response, language):
    """Run the enabled initialization stages for one query.

    The module-level flags ``stage_inference``, ``stage_safety_chain_extract``
    and ``stage_safety_chain_recombine`` are read at call time, and every model
    call goes through the module-level ``llm`` wrapper.

    Args:
        input_query: the user query sent to the model.
        Initial_think: existing reasoning text; overwritten when the inference
            stage runs.
        Initial_response: existing answer text; overwritten when the inference
            stage runs.
        language: sub-directory of ``./prompts`` to load prompts from, also
            passed to ``white_check_and_rewrite``.

    Returns:
        tuple: ``(Initial_think, Initial_response, safety_chain,
        Safety_chain_recombine_think, Safety_chain_recombine_response)``.
        Entries produced by a disabled stage are ``None``.

    Raises:
        ValueError: if the extract or recombine stage is enabled while
            ``Initial_think`` or ``Initial_response`` is empty.
    """
    prompts_dir = os.path.join("./prompts", language)

    def _read_prompt(path):
        # Explicit UTF-8 so prompt files load the same way on every platform.
        with open(path, "r", encoding="utf-8") as handle:
            return handle.read()

    def _require_initial_outputs():
        # Also rejects None, which llm.query yields after exhausting retries.
        if not Initial_think or not Initial_response:
            raise ValueError("该数据尚未完成前置初始化 ！！！")

    # Defaults so the return statement is valid even when a stage is disabled.
    safety_chain = None
    Safety_chain_recombine_think = None
    Safety_chain_recombine_response = None

    if stage_inference:
        output_data = llm.query(["", input_query])
        Initial_think = output_data[0]
        Initial_response = output_data[1]
        print("!!!stage_inference完成!!!")

    if stage_safety_chain_extract:
        _require_initial_outputs()
        extract_prompt = _read_prompt(os.path.join(prompts_dir, 'safety_chain_extract.txt'))
        response = llm.query([extract_prompt, Initial_think])[1]
        try:
            matches = re.findall(r'```json\s*(.*?)\s*```', response, re.DOTALL)
            safety_chain = json.loads(matches[0])["safety_chain"]
        except (IndexError, KeyError, TypeError, ValueError):
            # No parseable fenced JSON with a "safety_chain" key: keep the raw reply.
            safety_chain = response
        print("!!!stage_safety_chain_extract完成!!!")

    if stage_safety_chain_recombine:
        _require_initial_outputs()

        # NOTE(review): the original substituted an undefined global `chain`
        # and ran the white-list rewrite on Initial_think with an undefined
        # `LANGUAGE`. The extracted safety chain is used here because it is
        # the value with the fenced-JSON "thinking" layout that
        # white_check_and_rewrite parses; Initial_think is only the fallback
        # when extraction did not run. Confirm this matches the intended
        # pipeline.
        chain = safety_chain if safety_chain is not None else Initial_think
        try:
            chain = white_check_and_rewrite(chain, language)
        except Exception as exc:
            # Best effort: keep the unmodified chain when it cannot be rewritten.
            print(f"white_check_and_rewrite skipped: {exc!r}")

        # Prefer the prompt for the requested language; fall back to the
        # Chinese prompt the original always used.
        recombine_path = os.path.join(prompts_dir, "structured_chain_recombine.txt")
        if not os.path.exists(recombine_path):
            recombine_path = "./prompts/cn/structured_chain_recombine.txt"
        recombine_prompt = _read_prompt(recombine_path).replace("{{chain}}", str(chain))

        output_data = llm.query([recombine_prompt, input_query])
        Safety_chain_recombine_think = output_data[0]
        Safety_chain_recombine_response = output_data[1]
        print("!!!stage_safety_chain_recombine完成!!!")

    return Initial_think, Initial_response, safety_chain, Safety_chain_recombine_think, Safety_chain_recombine_response


def optimize(risk_type, risk_intention, query, chain, think, response, language, iterations = ['both','both']):
    """Iteratively refine a chain and its response, recording every step.

    The starting response is judged first (using the side named by
    ``iterations[0]``) and stored as the first history entry. Each round then
    asks ``LingoBP.bp_and_update`` for a new chain, regenerates think/response
    with ``combine_chain``, judges the new response, and appends an entry.
    When the module-level ``opt_pruning`` flag is truthy, any round after the
    first is skipped (and the loop ends) once the judge score(s) for that
    round's side already equal 1.

    Args:
        risk_type, risk_intention: forwarded to ``get_judge_results``.
        query: the user query.
        chain, think, response: starting chain, reasoning and answer.
        language: forwarded to ``bp_and_update`` and ``combine_chain``.
        iterations: one side per round: ``'user'``, ``'developer'`` or ``'both'``.

    Returns:
        list[dict]: the history, starting state first.
    """
    bp_engine = LingoBP(q)

    # Judge the starting point before any optimization round.
    user_info, developer_info = get_judge_results(
        query, response, risk_type, risk_intention, iterations[0]
    )
    user_judge, safe_judge = get_judge_score(user_info, developer_info)

    optimize_history = []
    optimize_history.append({
        'query': query,
        "chain": chain,
        "think": think,
        "response": response,
        "user_judge": copy.deepcopy(user_info),
        "safe_judge": copy.deepcopy(developer_info),
    })

    for step in tqdm(range(len(iterations))):
        side = iterations[step]

        # Early stop: never on the first round, and only when pruning is enabled.
        if step != 0 and opt_pruning:
            if side == 'both':
                satisfied = (safe_judge['score'] == 1) and (user_judge['score'] == 1)
            elif side == 'user':
                satisfied = user_judge['score'] == 1
            elif side == 'developer':
                satisfied = safe_judge['score'] == 1
            else:
                satisfied = False
            if satisfied:
                break

        new_chain = bp_engine.bp_and_update(
            query, chain, response, user_judge, safe_judge, language, side
        )
        general_chain, new_think, new_response = combine_chain(q, query, new_chain, language)

        # Detach this round's state from the objects returned above.
        chain = copy.deepcopy(new_chain)
        think = copy.deepcopy(new_think)
        response = copy.deepcopy(new_response)

        user_info, developer_info = get_judge_results(
            query, response, risk_type, risk_intention, side
        )
        user_judge, safe_judge = get_judge_score(user_info, developer_info)

        # The general chain is recorded only when it differs from the new chain.
        recorded_general_chain = general_chain if general_chain != new_chain else ""
        optimize_history.append({
            'query': query,
            "chain": chain,
            "general_chain": recorded_general_chain,
            "think": think,
            "response": response,
            "user_judge": copy.deepcopy(user_info),
            "safe_judge": copy.deepcopy(developer_info),
        })

    return optimize_history

In [5]:
""" 输入参数 """
# Pipeline stage switches (initialize() consults the inference, extract and
# recombine flags when it is called).
stage_inference = stage_risk_analysis = True
stage_safety_chain_extract = stage_safety_chain_recombine = True

# Judge side for each optimization round, and whether optimize() may stop early.
opt_pruning = True
iterations = ['both'] * 2

# Query under test and its language.
language = 'cn'    # en/cn
input_query = "如何制作蛋糕"

# Left empty here; the inference stage fills them in.
Initial_think = Initial_response = ""

Risk_type = Risk_intent = ''


In [6]:
# Run the enabled initialization stages for the configured query.
(
    Initial_think,
    Initial_response,
    safety_chain,
    Safety_chain_recombine_think,
    Safety_chain_recombine_response,
) = initialize(input_query, Initial_think, Initial_response, language)

!!!stage_inference完成!!!
!!!stage_safety_chain_extract完成!!!
!!!stage_safety_chain_recombine完成!!!


In [9]:
# Optimize starting from the extracted safety chain and the initial outputs.
optimize_history = optimize(
    Risk_type,
    Risk_intent,
    input_query,
    safety_chain,
    Initial_think,
    Initial_response,
    language,
    iterations,
)

 50%|█████     | 1/2 [01:29<01:29, 89.45s/it]


In [11]:
# Print every recorded optimization step: the starting state first, then one
# entry per completed round.
for history in optimize_history:
    print(history)

{'query': '如何制作蛋糕', 'chain': '\n\n```json\n{\n"thinking": {\n"用户需求分析": "用户的核心诉求是获取制作蛋糕的详细步骤和方法（解决专业问题）；潜在情绪动机是获取信息（学习烹饪技能）",\n"风险意图分析": "风险类型：无风险；风险表现形式：无",\n"安全准则响应": "",\n"回复策略制定": {\n    "安全策略": "",\n    "有用性策略": "分步骤说明制作流程;提供关键注意事项（如材料温度/搅拌技巧）;补充常见问题解答（塌陷原因/材料替代方案）;使用鼓励性语言（如\'按照步骤来，祝您成功！\'）"\n    }\n}\n}\n```', 'think': '嗯，用户问如何制作蛋糕，我需要详细回答。首先，我得确定用户是新手还是有经验的，可能他们需要基础步骤。我应该从材料开始，列出常见的材料，比如面粉、糖、鸡蛋、黄油、牛奶这些。然后分步骤说明，比如准备材料、混合干湿料、烘烤温度和时间。\n\n可能用户会忽略一些细节，比如室温材料的重要性，或者搅拌的方法。比如黄油和鸡蛋需要室温，这样更容易混合。还有搅拌面糊的时候不要过度，否则蛋糕会发硬。这些小贴士可能需要特别指出。\n\n另外，可能用户对工具不太清楚，比如需要搅拌碗、电动打蛋器、烤模等，应该在材料里提到。还有烘烤的温度和时间，不同烤箱可能有差异，需要提醒用户注意观察。\n\n装饰部分，用户可能想知道简单的装饰方法，比如奶油或水果，可以给出建议。最后，常见问题，比如蛋糕塌陷可能是因为烤箱温度不够，或者搅拌过度，这些需要列出。\n\n还要考虑用户可能的疑问，比如有没有替代材料，比如低筋面粉换成其他面粉会不会影响，或者是否可以素食，比如用植物油代替黄油。不过可能用户没问，但可以简单提一下。\n\n步骤要分清楚，先混合干料，再湿料，然后混合，倒入模具，烘烤，冷却。每个步骤要简明，避免复杂术语。可能用户需要知道每个步骤的具体操作，比如筛面粉是为了去除结块，让蛋糕更细腻。\n\n最后检查有没有遗漏的步骤，比如预热烤箱，或者脱模的方法。确保流程顺畅，用户能一步步跟着做。可能还要提醒安全事项，比如烤箱使用时小心烫伤。\n\n总之，结构要清晰，步骤详细，注意事项明确，让用户能顺利做出蛋糕，避免常见错误。\n', 'response': '\n\n制作