In [1]:
from modelscope.utils.hf_util import AutoModelForCausalLM, AutoTokenizer,snapshot_download
import torch
import time
import os
import re
import pandas as pd
import numpy as np
# Load flag: 0 until the model and tokenizer have been loaded, 1 afterwards.
start = 0

# We strongly recommend using an independent Python environment for each LLM.
# Raw strings keep the Windows backslashes literal; sequences such as "\J" or "\m"
# in a normal string are invalid escapes that newer Python versions warn about.
model_dir_list = [r"D:\Jupyter_Code\model\Yi-34B-Chat-4bits",
                  r"D:\Jupyter_Code\model\Yi-6B",
                  r"D:\Jupyter_Code\model\Phi-3-mini-4k-instruct",
                  r"D:\Jupyter_Code\model\LLM-Research\Llama-3.2-3B",
                  r"D:\Jupyter_Code\model\Qwen-14B-Chat-Int8"]

2024-12-11 21:43:35,108 - modelscope - INFO - PyTorch version 2.1.1+cu121 Found.
2024-12-11 21:43:35,109 - modelscope - INFO - Loading ast index from C:\Users\Administrator\.cache\modelscope\ast_indexer
2024-12-11 21:43:35,143 - modelscope - INFO - Loading done! Current index file version is 1.10.0, with md5 256e9fa31dca7b4f62f9ab98dddffdae and a total number of 946 components indexed


In [2]:
# Position in model_dir_list of the model to run in this session.
model_index = 4

# Input folders (all relative to the notebook's working directory).
# Prompt text files:
prompt_path = "dataset\\RQ2_dataset\\prompt\\"
# Candidate choice lists:
choice_path = "dataset\\RQ2_dataset\\choice\\"
# Test targets together with their ground truth:
task_path = "dataset\\RQ2_dataset\\task\\"

# Output folder: raw model text and the selected indices are written here.
result_path = "result\\"

In [3]:
def Yi_text_generation(input, max_new_tokens = 256, temperature = 1, top_k = 3, do_sample = True, print = True):
    '''
    Generate text for `input` with the model selected by the global `model_index`.

    Despite its name, this function serves every entry of `model_dir_list`.
    The model and tokenizer are loaded on the first call and kept in the module-level
    globals `model` and `tokenizer`; `start` is set to 1 only after both have loaded,
    so a failed load is retried on the next call.

    Parameters:
        input: prompt string; the model generates the text that follows it.
        max_new_tokens: upper bound on the number of generated tokens.
        temperature, top_k, do_sample: sampling settings forwarded to `model.generate`.
            The best values depend on the task, so expect to tune them.
        print: when true, the decoded response is also printed.

    Returns:
        The decoded first output sequence, with special tokens skipped.

    Raises:
        ValueError: if `model_index` has no loading configuration.
    '''
    # The `print` parameter shadows the builtin inside this function, so the real
    # print function has to be reached through the builtins module.
    import builtins

    global start, model_dir, model, tokenizer, model_index

    if start == 0:
        # Extra from_pretrained keyword arguments for each entry of model_dir_list.
        load_options = {
            0: {"device_map": "cuda", "torch_dtype": "auto", "offload_folder": "offload_folder"},
            1: {"device_map": "cuda"},
            2: {"device_map": "auto"},
            3: {"device_map": "auto"},
            4: {"device_map": "auto"},
        }
        if model_index not in load_options:
            raise ValueError("Unsupported model_index: " + str(model_index))
        model_dir = model_dir_list[model_index]
        model = AutoModelForCausalLM.from_pretrained(model_dir, trust_remote_code=True,
                                                     **load_options[model_index])
        # NOTE(review): download_path is not defined in the cells visible here;
        # confirm that it is set elsewhere before the first call.
        tokenizer = AutoTokenizer.from_pretrained(model_dir, trust_remote_code=True, download_path = download_path)
        # Mark as loaded only now, so an exception above does not leave the flag set.
        start = 1

    model_inputs = tokenizer(input, return_tensors="pt")
    input_ids = model_inputs.input_ids.cuda()
    # A single unpadded prompt: every position is attended to.
    attention_mask = torch.ones_like(input_ids)
    outputs = model.generate(input_ids,
                             temperature = temperature,
                             top_k = top_k,
                             do_sample = do_sample,
                             max_new_tokens = max_new_tokens,
                             attention_mask = attention_mask,
                             pad_token_id = tokenizer.eos_token_id
                             )
    response = tokenizer.decode(outputs[0], skip_special_tokens=True)
    if print:
        builtins.print(response)
    return response

In [5]:
def compare(str_return, choice_list):
    """
    Map the LLM response to the 1-based index of the choice it selected.

    Each entry of `choice_list` is expected to look like "<index>: <widget name>".
    Matching runs in two passes:
      1. Widget names: the choice whose name occurs earliest in `str_return` wins.
      2. Only if no name matched, indices: the earliest stand-alone number between
         1 and len(choice_list) wins. A number only counts when it is not part of a
         longer digit run, so "1" is not found inside "12".

    Parameters:
        str_return: text returned by the LLM.
        choice_list: list of choice lines.

    Returns:
        The 1-based index of the selected choice, or 0 if nothing matched.
    """
    # find_id is the return value; 0 means no widget was selected.
    find_id = 0
    # find_pos is the earliest match position found so far in str_return.
    find_pos = len(str_return) + 1

    # Pass 1: first occurrence of a valid widget name.
    for position_id, choice in enumerate(choice_list, start=1):
        colon = choice.find(":")
        # Drop the "<index>: " prefix; a line without a colon is used as-is.
        name = choice[colon + 2:] if colon >= 0 else choice
        if not name:
            # An empty name would match at position 0 of any text.
            continue
        now_pos = str_return.find(name)
        if 0 <= now_pos < find_pos:
            find_id = position_id
            find_pos = now_pos
    if find_id != 0:
        return find_id

    # Pass 2: fall back to the first stand-alone choice index in the text.
    for position_id in range(1, len(choice_list) + 1):
        match = re.search(r"(?<![0-9])" + str(position_id) + r"(?![0-9])", str_return)
        if match is not None and match.start() < find_pos:
            find_id = position_id
            find_pos = match.start()
    # Whether or not a match was found, return find_id.
    return find_id

In [7]:
def group_text_generation(file_str, std_str, num_res, print_step = True, cost_time = 0):
    '''
    Run one prompt through the LLM over a grid of sampling settings and save the outcome.

    The grid is top_k in 2..4 crossed with temperature in 0.7..1.0 (12 configurations),
    each repeated 5 times. Every response is appended to "result<std_str>.txt" and
    mapped to a choice index with `compare`.

    Three CSV files are written to `result_path`, each suffixed "<std_str>_<num_res>.csv":
      - result: selected choice index per (repeat, configuration);
      - score:  fraction of repeats per configuration that matched the ground truth;
      - time:   seconds spent on each generation.

    Parameters:
        file_str: name of the prompt file inside `prompt_path`.
        std_str: name stem used to locate "choice<std_str>.txt" and "task<std_str>.csv".
        num_res: row of the task table holding the ground truth (read from column 1);
            it is also part of the output file names.
        print_step: when true, print start/end progress with the elapsed time.
        cost_time: unused; kept so existing callers keep working.
    '''
    repeat_count = 5
    topk_values = range(2, 5)
    temp_steps = range(7, 11)  # temperature = step / 10
    config_count = len(topk_values) * len(temp_steps)

    # NOTE(review): the choice file is read with the platform default encoding, while
    # the prompt and result files use UTF-8 -- confirm which one is intended.
    choice_name = choice_path + "choice" + std_str + ".txt"
    with open(choice_name, 'r') as file:
        choice_list = [line.replace("\n", "") for line in file]

    # Test targets and ground truth are saved in task_path.
    np_task = np.array(pd.read_csv(task_path + "task" + std_str + ".csv"))
    ground_truth = int(np_task[num_res][1])

    np_result = np.zeros((repeat_count, config_count))
    np_time = np.zeros((repeat_count, config_count))
    np_score = np.zeros(config_count)
    start_time = time.time()
    record_time = time.time()

    if print_step:
        print("start", end = "...")
    with open(prompt_path + file_str, 'r', encoding = "UTF-8") as f:
        message = f.read()

    text_log_name = result_path + "result" + std_str + ".txt"
    for topk in topk_values:
        for temp in temp_steps:
            # Column of this (top_k, temperature) pair in the result arrays.
            np_id = (topk - topk_values.start) * len(temp_steps) + (temp - temp_steps.start)
            for repeat in range(repeat_count):
                str_return = Yi_text_generation(message, max_new_tokens = 128, temperature = temp / 10, top_k = topk, print = False)
                # Keep the text from the first "###" marker on; if the marker is
                # missing, keep the whole response instead of raising.
                marker_pos = str_return.find("###")
                if marker_pos >= 0:
                    str_return = str_return[marker_pos:]
                with open(text_log_name, "a+", encoding = "UTF-8") as f:
                    f.write("###Answer top_k = " + str(topk) + ", temperature = " + str(temp / 10) + ", repeat time = " + str(repeat + 1) + "\n")
                    f.write(str_return)
                    f.write("\n###Answer end\n")
                comp_res = compare(str_return, choice_list)
                np_result[repeat][np_id] = comp_res
                mid_time = time.time()
                np_time[repeat][np_id] = mid_time - record_time
                record_time = mid_time
                # Each correct repeat contributes an equal share, so a configuration
                # that is always right scores 1.0.
                if comp_res == ground_truth:
                    np_score[np_id] = np_score[np_id] + 1 / repeat_count
    if print_step:
        now_time = time.time()
        print("end, time cost: ", str(now_time - start_time), " seconds", end = "\n")

    csv_suffix = std_str + "_" + str(num_res) + ".csv"
    pd.DataFrame(np_result).to_csv(result_path + "result" + csv_suffix)
    pd.DataFrame(np_score).to_csv(result_path + "score" + csv_suffix)
    pd.DataFrame(np_time).to_csv(result_path + "time" + csv_suffix)

In [None]:
# The experiment is driven by the prompt files: group_text_generation opens its first
# argument inside prompt_path, and the "LLM_prompt" prefix stripped below only occurs
# in prompt file names.
prompt_files = sorted(os.listdir(prompt_path))
# Given the lengthy experimental procedure, the existing result files allow resuming
# the experiment from the last interruption point.
os.makedirs(result_path, exist_ok=True)
result_files = set(os.listdir(result_path))
num_pat = re.compile(r"[0-9]+")
for prompt_file in prompt_files:
    numbers = num_pat.findall(prompt_file)
    if not numbers:
        # No task number in the file name, so this is not a task prompt.
        continue
    num_res = int(numbers[-1])
    std_file_str = re.sub(r"[0-9]+\.txt$", "", prompt_file)
    std_file_str = std_file_str.replace("LLM_prompt", "")

    # A task counts as finished only when all three of its CSV files exist. Exact
    # names are compared so that "_1" is not mistaken for part of "_12".
    csv_suffix = std_file_str + "_" + str(num_res) + ".csv"
    if all(prefix + csv_suffix in result_files for prefix in ("result", "score", "time")):
        continue

    print(prompt_file, ",", end = "")
    group_text_generation(prompt_file, std_file_str, num_res)