In [1]:
import os

import huggingface_hub
import pandas as pd
import transformers
from toolz import curry
from tqdm.notebook import tqdm

# Authenticate with the Hugging Face Hub; in a notebook this renders an
# interactive login widget instead of reading a hard-coded token.
huggingface_hub.login()

VBox(children=(HTML(value='<center> <img\nsrc=https://huggingface.co/front/assets/huggingface_logo-noborder.sv…

In [2]:
def get_prompt(type):
    """Load the instruction text for the given prompt variant.

    Args:
        type: Name of the prompt variant. The text is read from
            ``./Prompts/prompt-{type}.txt`` relative to the working directory.

    Returns:
        The full contents of the prompt file as a string.

    Raises:
        OSError: If the prompt file is missing or cannot be read.
    """
    # Pin the encoding so the prompt text is identical on every platform
    # instead of depending on the locale's default codec.
    with open(f"./Prompts/prompt-{type}.txt", "r", encoding="utf-8") as f:
        return f.read()

In [3]:
# Load the ETHICS commonsense test split and keep only the short scenarios
# (rows flagged `is_short`), retaining just the label and input text columns.
cm_dataset = (
    pd.read_csv("./ethics/commonsense/cm_test.csv")
    .loc[lambda frame: frame["is_short"] == True, ["label", "input"]]
)

In [12]:
# Build the text-generation pipeline (and its tokenizer) for the given model
def get_pipeline(model_id, tokenizer_id=None):
    """Create a text-generation pipeline together with its tokenizer.

    Args:
        model_id: Model identifier passed to ``transformers.pipeline``.
        tokenizer_id: Optional identifier of a different tokenizer; when it is
            falsy the tokenizer is loaded from ``model_id`` instead.

    Returns:
        A ``(pipe, tokenizer)`` tuple.
    """
    # Fall back to the model's own tokenizer when none is named explicitly.
    tokenizer = transformers.AutoTokenizer.from_pretrained(tokenizer_id or model_id)

    pipeline_options = dict(
        model=model_id,
        # NOTE(review): this lets the model repository run its own Python code;
        # only use it with repositories you trust.
        trust_remote_code=True,
        tokenizer=tokenizer,
        torch_dtype="auto",
        # The pipeline is always placed on the GPU.
        device_map="cuda",
    )
    pipe = transformers.pipeline("text-generation", **pipeline_options)

    return pipe, tokenizer

In [5]:
@curry
def prompt_formater_chat_sys(system_instructions, tokenizer, input):
    """Render a chat prompt with the instructions in the system role.

    Curried, so it can be called with the first two arguments to obtain a
    one-argument formatter for individual inputs.

    Args:
        system_instructions: Text placed in the system message.
        tokenizer: Object providing ``apply_chat_template``.
        input: Scenario text; it is sent as ``SITUATION: {input}``.

    Returns:
        Whatever ``tokenizer.apply_chat_template`` returns when called with
        ``tokenize=False`` and ``add_generation_prompt=True``.
    """
    system_turn = {"role": "system", "content": system_instructions}
    user_turn = {"role": "user", "content": f"SITUATION: {input}"}

    return tokenizer.apply_chat_template(
        [system_turn, user_turn],
        tokenize=False,
        add_generation_prompt=True,
    )


@curry
def prompt_formater_chat_user(system_instructions, tokenizer, input):
    """Render a chat prompt with the instructions sent as a user message.

    Curried, so it can be called with the first two arguments to obtain a
    one-argument formatter for individual inputs.

    Args:
        system_instructions: Text placed in the first user message.
        tokenizer: Object providing ``apply_chat_template``.
        input: Scenario text; it is sent as ``SITUATION: {input}`` in a
            second user message.

    Returns:
        Whatever ``tokenizer.apply_chat_template`` returns when called with
        ``tokenize=False`` and ``add_generation_prompt=True``.
    """
    # NOTE(review): this produces two consecutive "user" turns. Some chat
    # templates require strictly alternating roles -- confirm the target
    # model's template renders this the way the experiment intends.
    instruction_turn = {"role": "user", "content": system_instructions}
    situation_turn = {"role": "user", "content": f"SITUATION: {input}"}

    return tokenizer.apply_chat_template(
        [instruction_turn, situation_turn],
        tokenize=False,
        add_generation_prompt=True,
    )


@curry
def prompt_formater_basic(system_instructions, input):
    """Render a plain-text prompt using ad-hoc pseudo-XML tags.

    Curried, so it can be called with the instructions alone to obtain a
    one-argument formatter for individual inputs.

    Args:
        system_instructions: Text wrapped in ``<System>...</System>``.
        input: Scenario text; it is inserted as ``SITUATION: {input}``.

    Returns:
        The prompt string, ending with an opening ``<assistant>`` tag.
    """
    # NOTE(review): the tag casing is inconsistent (<User> is closed by
    # </user>, and <assistant> is lower-case). It is kept verbatim so the
    # prompts stay comparable with earlier runs.
    system_block = f"<System>{system_instructions}</System>"
    user_block = f"<User>SITUATION: {input}</user>"

    return "\n\n".join((system_block, user_block, "<assistant>"))

def get_prompt_formater(type, system_instructions, tokenizer):
    """Select a prompt formatter and bind the instructions to it.

    Args:
        type: One of ``"chat-sys"``, ``"chat-user"`` or ``"basic"``.
        system_instructions: Instruction text bound into the formatter.
        tokenizer: Bound into the two chat formatters; not used by ``"basic"``.

    Returns:
        The selected formatter called with the instructions (and, for the chat
        variants, the tokenizer), leaving the input text to be supplied.

    Raises:
        Exception: If ``type`` is not one of the known formatter names.
    """
    if type == "chat-sys":
        return prompt_formater_chat_sys(system_instructions, tokenizer)

    if type == "chat-user":
        return prompt_formater_chat_user(system_instructions, tokenizer)

    if type == "basic":
        return prompt_formater_basic(system_instructions)

    raise Exception(f"Error retrieving prompt formater {type}.")
    

In [6]:
# Lazily turn each dataset row into a ready-to-use prompt
def dataloader(dataset, formater):
    """Yield ``(index, label, prompt)`` for every row of ``dataset``.

    Args:
        dataset: DataFrame whose rows unpack into exactly two values, taken
            in order as the label and the input text.
        formater: Callable that turns the input text into a prompt.

    Yields:
        Tuples of the row's index label, its label value and the formatted
        prompt.
    """
    for row_index, row in dataset.iterrows():
        # Positional unpacking: the row must hold exactly (label, input).
        label, text = row

        yield row_index, label, formater(text)

In [7]:
def parser_basic(output: str) -> int:
    """Classify a model output by searching the whole text for a verdict.

    The match is case-insensitive. ``"uneth"`` is checked first because every
    "unethical" also contains "eth".

    Args:
        output: Generated text. A non-string value is treated as malformed.

    Returns:
        0 if the text contains "uneth", 1 if it otherwise contains "eth",
        and -1 (after printing a parse-error message) when neither is found.
    """
    # Explicit checks replace the former `raise(1)` + bare `except:` pattern,
    # which also swallowed KeyboardInterrupt and SystemExit.
    text = output.lower() if isinstance(output, str) else ""

    if "uneth" in text:
        return 0
    if "eth" in text:
        return 1

    print(f"Parse error: Output ({output}) was malformed. Returning -1.")
    return -1


def parser_answer(output: str) -> int:
    """Classify a model output from the text following an "swer:" marker.

    The text is lower-cased and split on ``"swer:"`` (the tail of "Answer:").
    Only the segment right after the first marker, up to the next marker if
    there is one, is inspected.

    Args:
        output: Generated text. A non-string value is treated as malformed.

    Returns:
        0 if the answer segment contains "uneth", 1 if it otherwise contains
        "eth", and -1 (after printing a parse-error message) when the marker
        is missing or no verdict is found.
    """
    # Explicit checks replace the former `raise(1)` + bare `except:` pattern,
    # which also swallowed KeyboardInterrupt and SystemExit.
    answer = ""
    if isinstance(output, str):
        # Rebinding keeps the error message showing the lower-cased text.
        output = output.lower()
        segments = output.split("swer:")
        if len(segments) > 1:
            answer = segments[1]

    if "uneth" in answer:
        return 0
    if "eth" in answer:
        return 1

    print(f"Parse error: Output ({output}) was malformed. Returning -1.")
    return -1


def get_parser(type):
    """Return the output-parsing function registered under ``type``.

    Args:
        type: ``"basic"`` for ``parser_basic`` or ``"answer"`` for
            ``parser_answer``.

    Returns:
        The selected parser function.

    Raises:
        ValueError: If ``type`` is not a known parser name.
    """
    if type == "basic":
        return parser_basic
    elif type == "answer":
        return parser_answer
    else:
        # Raising a bare string is itself a TypeError in Python 3, which hid
        # the intended message; raise a real exception instead.
        raise ValueError(f"Error retrieving parser {type}.")

In [8]:
def evaluate(pipe, parser, formater, dataset, outfile_name, max_new_tokens=200, verbose=False):
    """Run the model over ``dataset``, parse each answer and save the results.

    Args:
        pipe: Callable generation pipeline; called with the prompt and
            generation options, it must return a list whose first item has a
            ``"generated_text"`` entry.
        parser: Callable mapping the generated text to a predicted label.
        formater: Callable mapping an input text to a prompt.
        dataset: DataFrame with ``label`` and ``input`` columns, in that order.
        outfile_name: Path stem of the results file; they are written to
            ``./Data/{outfile_name}.csv`` and it may contain sub-directories.
        max_new_tokens: Generation length limit forwarded to ``pipe``.
        verbose: When true, print input, output, prediction and label per row.

    Returns:
        DataFrame with the columns ``input``, ``true_label``,
        ``predited_label`` and ``output``.
    """
    # "predited_label" (sic) is kept as-is: it is the column name written to
    # the CSV, so existing result files would stop matching if it changed.
    generation_dataset = {"input": [], "true_label": [], "predited_label": [], "output": []}

    # Use the `dataset` argument throughout (not the notebook-global
    # `cm_dataset`) so that subsets and other datasets are evaluated correctly.
    for i, label, prompt in tqdm(dataloader(dataset, formater), total=len(dataset)):
        # Generate the output; sampling is enabled, so runs are not deterministic.
        output: str = pipe(
            prompt,
            max_new_tokens=max_new_tokens,
            do_sample=True,
            return_full_text=False,
        )[0]["generated_text"]

        # Parse output
        answer = parser(output)

        source_text = dataset.loc[i, "input"]
        generation_dataset["input"].append(source_text)
        generation_dataset["true_label"].append(label)
        generation_dataset["predited_label"].append(answer)
        generation_dataset["output"].append(output)

        if verbose:
            print(source_text, output, answer, label)

    generation_dataset = pd.DataFrame.from_dict(generation_dataset)

    # Create the target directory first so that a long generation run is not
    # lost to a missing folder at the very end.
    outfile_path = f"./Data/{outfile_name}.csv"
    os.makedirs(os.path.dirname(outfile_path), exist_ok=True)
    generation_dataset.to_csv(outfile_path)

    return generation_dataset

In [14]:
# Experiment configuration: the model under evaluation and the prompt variant.
# The variant name is reused as the output sub-directory by the evaluation call.
model_id = "microsoft/Phi-3-mini-4k-instruct"
prompt_type = "step_by_step"
# Instruction text, read from ./Prompts/prompt-step_by_step.txt
prompt = get_prompt(prompt_type)
# The "answer" parser only inspects the text that follows an "swer:" marker
parser = get_parser("answer")
# No tokenizer_id is given, so the tokenizer is loaded from model_id as well
pipe, tokenizer = get_pipeline(model_id)
# "chat-user" sends the instructions as a user-role message, not a system-role one
formater = get_prompt_formater("chat-user", prompt, tokenizer)

Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.
`flash-attention` package not found, consider installing for better performance: No module named 'flash_attn'.
Current `flash-attenton` does not support `window_size`. Either upgrade or use `attn_implementation='eager'`.


Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

In [15]:
# Run the full evaluation; results are saved to ./Data/{prompt_type}/Phi-3.csv
generation_dataset = evaluate(
    pipe=pipe,
    parser=parser,
    formater=formater,
    dataset=cm_dataset,
    outfile_name=f"{prompt_type}/Phi-3",
    max_new_tokens=500,
    verbose=False,
)

  0%|          | 0/2109 [00:00<?, ?it/s]

You are not running the flash-attention implementation, expect numerical differences.
