In [1]:
import os
import re
import math
import random
from typing import List
import pandas as pd
import numpy as np
import argparse
import torch
import datasets
from tqdm import tqdm
from transformers.trainer_utils import set_seed
from transformers import AutoModelForCausalLM, AutoTokenizer
from transformers import BitsAndBytesConfig
from transformers.generation import GenerationConfig
from datasets import load_dataset, load_from_disk
import jsonlines
import textwrap


class args:
    """Plain namespace of evaluation settings, read elsewhere as ``args.<name>``.

    The class is never instantiated; its attributes act as module-wide constants.
    """

    # --- filesystem locations -------------------------------------------
    # Checkpoint directory handed to the `from_pretrained` loaders.
    checkpoint_path = "/gemini/code/lamma3_eval/lamma3_model/8B_instruct"
    # Directory the dataset is read from with `load_from_disk`.
    eval_data_path = "/gemini/code/lamma3_eval/eval_data/humaneval"
    # Directory in which `main` writes `result.jsonl`.
    save_result_dir = "/gemini/code/lamma3_eval/eval_result/humaneval_chat"

    # --- run options ------------------------------------------------------
    # Number of problems sent to the model per generation call.
    batch_size = 16
    # When False, `main` skips the run if the result file already exists.
    overwrite = False
    # Not read anywhere in the visible code.
    debug = False
    # choices = ["A", "B", "C", "D"]

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# One-time setup, left commented out: fetch the "openai_humaneval" dataset and
# save a copy under `args.eval_data_path` so later runs can read it from disk.
# dataset = datasets.load_dataset("openai_humaneval")
# dataset.save_to_disk(args.eval_data_path)

# Load the previously saved copy; the cells below read its "test" split.
dataset = load_from_disk(args.eval_data_path)

In [3]:
# Sanity check: show the `task_id` column of the "test" split that was just loaded.
print(dataset["test"]["task_id"])

['HumanEval/0', 'HumanEval/1', 'HumanEval/2', 'HumanEval/3', 'HumanEval/4', 'HumanEval/5', 'HumanEval/6', 'HumanEval/7', 'HumanEval/8', 'HumanEval/9', 'HumanEval/10', 'HumanEval/11', 'HumanEval/12', 'HumanEval/13', 'HumanEval/14', 'HumanEval/15', 'HumanEval/16', 'HumanEval/17', 'HumanEval/18', 'HumanEval/19', 'HumanEval/20', 'HumanEval/21', 'HumanEval/22', 'HumanEval/23', 'HumanEval/24', 'HumanEval/25', 'HumanEval/26', 'HumanEval/27', 'HumanEval/28', 'HumanEval/29', 'HumanEval/30', 'HumanEval/31', 'HumanEval/32', 'HumanEval/33', 'HumanEval/34', 'HumanEval/35', 'HumanEval/36', 'HumanEval/37', 'HumanEval/38', 'HumanEval/39', 'HumanEval/40', 'HumanEval/41', 'HumanEval/42', 'HumanEval/43', 'HumanEval/44', 'HumanEval/45', 'HumanEval/46', 'HumanEval/47', 'HumanEval/48', 'HumanEval/49', 'HumanEval/50', 'HumanEval/51', 'HumanEval/52', 'HumanEval/53', 'HumanEval/54', 'HumanEval/55', 'HumanEval/56', 'HumanEval/57', 'HumanEval/58', 'HumanEval/59', 'HumanEval/60', 'HumanEval/61', 'HumanEval/62', '

In [4]:
def load_models_tokenizer(load_in_4bit=False):
    """Load the causal LM and its tokenizer from ``args.checkpoint_path``.

    Args:
        load_in_4bit: When True, a 4-bit NF4 ``BitsAndBytesConfig`` (bfloat16
            compute, double quantization) is passed to ``from_pretrained``.
            The default (False) loads the model without a quantization config,
            which is what this function did before the option existed.

    Returns:
        tuple: ``(model, tokenizer)``. The model is put in eval mode and its
        ``generation_config`` is replaced by the one stored in the checkpoint.
        The tokenizer pads on the left, so in a padded batch every prompt ends
        at the last column and generated tokens directly follow it.
    """
    tokenizer = AutoTokenizer.from_pretrained(
        args.checkpoint_path,
        padding_side='left'
    )

    model_kwargs = {"device_map": "auto"}
    if load_in_4bit:
        # Only build the quantization config when it is actually used.
        model_kwargs["quantization_config"] = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_compute_dtype=torch.bfloat16,
            bnb_4bit_use_double_quant=True,
            bnb_4bit_quant_type="nf4")

    model = AutoModelForCausalLM.from_pretrained(
        args.checkpoint_path,
        **model_kwargs
    ).eval()
    model.generation_config = GenerationConfig.from_pretrained(
        args.checkpoint_path
    )
    # model.generation_config.do_sample = False  # use greedy decoding
    # model.generation_config.repetition_penalty = 1.0  # disable repetition penalty
    return model, tokenizer

In [5]:
# Instantiate the model/tokenizer pair; later cells use both as module-level globals.
model, tokenizer = load_models_tokenizer()
# Reuse the EOS token for padding: `generate_answer` tokenizes with padding=True and
# passes `tokenizer.pad_token_id` to `generate`.
# NOTE(review): presumably the checkpoint's tokenizer defines no pad token of its own — confirm.
tokenizer.pad_token_id = tokenizer.eos_token_id

Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.
Loading checkpoint shards: 100%|██████████| 4/4 [00:36<00:00,  9.22s/it]


In [6]:
def batch_process(func, *args):
    """Apply ``func`` element-wise across one or more parallel batches.

    Args:
        func: Callable invoked once per position with one element from each batch.
        *args: One or more batches (iterables). Inside this function the name
            ``args`` refers to this tuple, not to the module-level config class.

    Returns:
        list: ``[func(a0, b0, ...), func(a1, b1, ...), ...]``.

    Raises:
        AssertionError: If several batches are given and their lengths differ.
    """
    # The length check only matters (and only runs) when there is more than one batch.
    if len(args) > 1:
        expected = len(args[0])
        assert all(len(column) == expected for column in args), "各参数长度不同"

    return [func(*row) for row in zip(*args)]

In [7]:
def clear_output_item(text, raw_txt_len):
    """Strip padding, the prompt and stop tokens from one decoded generation.

    Possible future improvement (from the original author's note): truncate by
    the raw prompt length and then keep only the first answer.

    Args:
        text: Decoded model output (padding + prompt + answer + stop tokens),
            decoded with special tokens kept.
        raw_txt_len: Number of leading characters (after the left padding) that
            belong to the prompt and must be dropped. Pass 0 when ``text``
            already contains only the answer.

    Returns:
        str: The answer text with pad/stop tokens removed and whitespace trimmed.

    Raises:
        AssertionError: If nothing is left after cleaning.

    Note:
        The character offset is only correct if the decoded prompt is
        character-for-character identical to the raw prompt string.
    """
    pad = tokenizer.pad_token  # module-level tokenizer
    output = text

    # Remove only the *leading* padding before slicing. A global replace would
    # also delete occurrences of the pad token inside the prompt (e.g. when the
    # pad token is "<|eot_id|>"), shortening the prompt and making the slice
    # below cut into the answer.
    if pad:
        while output.startswith(pad):
            output = output[len(pad):]

    # Drop the prompt.
    output = output[raw_txt_len:]

    # Finally remove stop tokens and any trailing padding from the answer.
    stop_words = ["<|end_of_text|>", "<|eot_id|>"]
    if pad and pad not in stop_words:
        stop_words.append(pad)
    for sw in stop_words:
        output = output.replace(sw, "").strip()

    assert output != "", f"输出为空\n{text}"
    return output

def clear_output(text, raw_txt_len):
    """Batched form of ``clear_output_item``.

    Args:
        text: Batch of decoded outputs.
        raw_txt_len: Batch of prompt lengths, one per entry of ``text``.

    Returns:
        list: One cleaned answer per input pair.
    """
    cleaned = batch_process(clear_output_item, text, raw_txt_len)
    return cleaned

In [8]:
def generate_answer(model, tokenizer, input_txt):
    """Generate one chat reply per prompt in ``input_txt``.

    Each prompt is wrapped as a single user turn, rendered with the tokenizer's
    chat template, tokenized as one padded batch and passed to ``model.generate``.

    Args:
        model: Object exposing ``device`` and ``generate(...)``.
        tokenizer: Tokenizer used for templating, encoding and decoding.
        input_txt: Sequence of user prompts (strings).

    Returns:
        list: One cleaned answer string per prompt (see ``clear_output``).
    """
    if len(input_txt) == 0:
        return []

    chat_template = [[{'content': t, 'role': 'user'}] for t in input_txt]
    prompts = tokenizer.apply_chat_template(chat_template, tokenize=False, add_generation_prompt=True)
    # The chat template already inserted the special tokens, so none are added here.
    input_ids = tokenizer(prompts, padding=True, return_tensors="pt", add_special_tokens=False).to(model.device)

    # 128009 is presumably the id of "<|eot_id|>" in this checkpoint's vocabulary — TODO confirm.
    outputs_id = model.generate(**input_ids, max_new_tokens = 256, eos_token_id = 128009, pad_token_id = tokenizer.pad_token_id)

    # The returned sequences start with the (padded) prompt tokens. Cut them off
    # at the token level instead of by character count of the decoded text:
    # decoding is not guaranteed to reproduce the prompt string exactly, and any
    # length difference would shift a character-based slice into the answer.
    prompt_len = input_ids["input_ids"].shape[1]
    new_tokens = outputs_id[:, prompt_len:]

    outputs = tokenizer.batch_decode(new_tokens, skip_special_tokens=False)
    # Offset 0: the prompt is already gone, only pad/stop tokens remain to be removed.
    answer = clear_output(outputs, [0] * len(outputs))

    return answer

In [9]:
def extract_code_item(text, entry_point):
    """Extract the body of the generated function from a model reply.

    The patterns are tried in order and the first match wins:
      1. a fenced code block containing ``def <entry_point>``;
      2. an unfenced ``def <entry_point>`` whose body ends at the first line
         that is not indented;
      3. any ``def`` at all, with the same end-of-body rule.
    If nothing matches, the reply is treated as a bare function body and is
    indented by four spaces.

    Args:
        text: Raw model reply.
        entry_point: Name of the function to look for.

    Returns:
        str: The text following the ``def ...:`` line (the signature itself is
        not included), or the indented reply when no ``def`` is found.
    """
    # Escape the name so it is matched literally, and require that it is not
    # followed by another word character: "foo" must not match "def foo_helper".
    name = re.escape(entry_point) + r"(?!\w)"
    # A body ends at a newline that is not followed (after optional blank lines)
    # by indentation, or at the end of the text.
    body_end = r"(?:\n(?!\n*(?:  |\t))|$)"

    patterns = (
        rf"```(?:[Pp]ython\n)?.*?def\s+{name}.*?:\n(.*?)\n```",
        rf"def\s+{name}.*?:\n(.*?){body_end}",
        rf"def.*?:\n(.*?){body_end}",
    )
    for pattern in patterns:
        code_block = re.search(pattern, text, re.DOTALL)
        if code_block is not None:
            return code_block.group(1)

    # if no code block is found, assume the LM is simply filling the code
    return textwrap.indent(text, " " * 4)


def extract_code(text, entry_point):
    """Batched form of ``extract_code_item``.

    Args:
        text: Batch of model replies.
        entry_point: Batch of function names, one per reply.

    Returns:
        list: One extracted code string per reply.
    """
    extracted = batch_process(extract_code_item, text, entry_point)
    return extracted

In [10]:
def generate_sample(model, tokenizer, question, entry_point):
    """Generate replies for a batch of prompts and extract the code from each.

    Args:
        model: Passed through to ``generate_answer``.
        tokenizer: Passed through to ``generate_answer``.
        question: Batch of prompts.
        entry_point: Batch of function names, one per prompt.

    Returns:
        tuple: ``(answer, response)`` — the extracted code per prompt and the
        raw reply per prompt.
    """
    raw_replies = generate_answer(model, tokenizer, question)
    code_snippets = extract_code(raw_replies, entry_point)
    return code_snippets, raw_replies

In [11]:
def preprocess_item(question):
    """Build the chat prompt for one problem.

    Args:
        question: Mapping with at least ``"prompt"`` (source text containing the
            function signature and docstring) and ``"entry_point"`` (name of the
            function to implement).

    Returns:
        str: An instruction line naming the signature, then the docstring text
        with each line stripped, then the original prompt.

    Raises:
        ValueError: If no single-line ``def <entry_point>...:`` is found in the prompt.
    """
    source = question["prompt"]
    # Match the name literally and as a whole identifier.
    name = re.escape(question["entry_point"])
    signature_match = re.search(rf"def\s+({name}(?!\w).*?):\s*\n", source)
    if signature_match is None:
        raise ValueError(
            f"could not find a definition of {question['entry_point']!r} in the prompt"
        )
    signature = signature_match.group(1)

    # The opening and closing delimiters must be the same kind of triple quote.
    docstring_pattern = re.compile(r"(\"\"\"|''')(.*?)\1", re.DOTALL)
    # Prefer the docstring that follows the entry point's own signature: a prompt
    # may define helper functions first, and their docstrings describe a
    # different problem. Fall back to the first docstring anywhere in the prompt.
    doc_match = docstring_pattern.search(source, signature_match.end())
    if doc_match is None:
        doc_match = docstring_pattern.search(source)
    raw_description = doc_match.group(2) if doc_match is not None else ""
    description = "\n".join(line.strip() for line in raw_description.split("\n"))

    prompt = (
        f"Write a Python function `{signature}` to solve the following problem, just give the code:\n"
        f"{description}\n"
        f"{question['prompt']}"
    )
    return prompt


def preprocess(question):
    """Batched form of ``preprocess_item``.

    Args:
        question: Iterable of problem records.

    Returns:
        list: One prompt string per record.
    """
    prompts = batch_process(preprocess_item, question)
    return prompts

In [12]:
def main():
    """Run the evaluation over the "test" split and write ``result.jsonl``.

    Does nothing except print a notice when the result file already exists and
    ``args.overwrite`` is False. Otherwise the split is processed in batches of
    ``args.batch_size`` and one JSON object per problem is written, with the
    keys ``task_id``, ``completion`` (extracted code) and ``response`` (raw reply).
    """
    result_path = os.path.join(args.save_result_dir, "result.jsonl")
    if not args.overwrite and os.path.exists(result_path):
        print(f"{result_path} existed, skip!")
        return

    test = dataset["test"]
    os.makedirs(args.save_result_dir, exist_ok=True)

    # Open the file ourselves in a `with` block: a jsonlines.Writer built from
    # an existing file object does not close that file, so relying on the writer
    # alone would leave the handle open.
    with open(result_path, "w", encoding="utf-8") as result_file, \
            jsonlines.Writer(result_file) as output:
        for start in tqdm(range(0, len(test), args.batch_size)):
            stop = min(start + args.batch_size, len(test))
            batch = test.select(range(start, stop))
            prompt = preprocess(batch)
            task_id = batch["task_id"]

            answer, response = generate_sample(
                model, tokenizer, prompt, batch["entry_point"]
            )
            for tid, completion, raw in zip(task_id, answer, response):
                output.write({"task_id": tid, "completion": completion, "response": raw})
            # Push finished batches to disk so an interrupted run keeps them.
            result_file.flush()
        

In [13]:
# Entry point: run the evaluation when this code is executed directly
# (the guard keeps `main()` from running if the code is imported as a module).
if __name__ == "__main__":
    main()

  0%|          | 0/11 [00:00<?, ?it/s]

100%|██████████| 11/11 [08:46<00:00, 47.85s/it]
