In [None]:
!pip install transformers torch pandas tqdm accelerate



In [None]:
import json
import os
import torch
import transformers
import accelerate

In [None]:
from google.colab import drive

# Mount Google Drive under /content/drive so the project files stored there are
# reachable as ordinary paths. force_remount=True asks Colab to mount again even
# when a mount from an earlier run is still present.
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [None]:
# Work from the project's data directory on the mounted Drive so that the
# relative defaults used below ("./dataset", "./few_shot_cot_prompts.json",
# "./completion-data-all") resolve against it.
# Alternative relative location (presumably for running outside Colab -- confirm):
# os.chdir('../data')
os.chdir('/content/drive/MyDrive/reasoning-teacher/data')

In [None]:
def load_data(dataset_path):
    """
    Load a dataset JSON file and return its metadata and samples.

    Args:
        dataset_path: Path to a JSON file whose top-level object has the keys
            "metadata" and "data".

    Returns:
        The tuple ``(metadata, data)`` read from the file's "metadata" and
        "data" entries.

    Raises:
        OSError: If the file cannot be opened or read.
        json.JSONDecodeError: If the file does not contain valid JSON.
        KeyError: If "metadata" or "data" is missing from the file.
    """
    # Errors are deliberately not swallowed: callers unpack the result as a
    # tuple, so returning None on failure would only resurface later as an
    # unrelated "cannot unpack NoneType" TypeError.
    # The encoding is pinned so reading does not depend on the platform default.
    with open(dataset_path, 'r', encoding='utf-8') as file:
        data = json.load(file)

    return data["metadata"], data["data"]

def load_fs_prompts(prompt_file_path, key, max_prompt_lines=10):
    """
    Load the few-shot prompt entry stored under `key` in a JSON prompt file.

    The entry's "prompt" text is cut down to its first `max_prompt_lines`
    newline-separated lines (presumably one "Q:" line and one "A:" line per
    exemplar, i.e. 5 exemplars by default -- confirm against the prompt file).

    Args:
        prompt_file_path: Path to the JSON file holding the prompts.
        key: Top-level key of the entry to return (the dataset name).
        max_prompt_lines: Maximum number of prompt lines to keep.

    Returns:
        The entry ``prompts[key]`` with its "prompt" value truncated as
        described; every other field of the entry is returned untouched.

    Raises:
        OSError: If the file cannot be opened or read.
        json.JSONDecodeError: If the file does not contain valid JSON.
        KeyError: If `key` or its "prompt" field is missing.
    """
    # Failures propagate instead of being printed and turned into a None
    # return, which callers would only trip over later when subscripting it.
    with open(prompt_file_path, 'r', encoding='utf-8') as file:
        prompts = json.load(file)

    entry = prompts[key]
    prompt_lines = entry['prompt'].split('\n')
    if len(prompt_lines) > max_prompt_lines:
        entry['prompt'] = '\n'.join(prompt_lines[:max_prompt_lines])
    return entry

def generate_prompts(dataset_name="addsub",
                     dataset_dir="./dataset",
                     prompt_file_path="./few_shot_cot_prompts.json"):
    """
    Build one few-shot prompt per dataset sample.

    Each prompt is the dataset's few-shot prefix followed by
    "\\nQ: <question>\\nA: ". Samples whose "sample_index" appears in the
    prefix's "sample_indices" are skipped, so the exemplars inside the prefix
    are never asked as questions.

    Args:
        dataset_name: Dataset file stem and key into the prompt file.
        dataset_dir: Directory containing "<dataset_name>.json".
        prompt_file_path: JSON file holding the few-shot prompts.

    Returns:
        Three parallel lists ``(prompts, answers, indices)``.

    Raises:
        FileNotFoundError: If the dataset file does not exist.
        ValueError: If the dataset or the few-shot prompts could not be loaded.
    """
    dataset_path = os.path.join(dataset_dir, dataset_name + ".json")
    if not os.path.exists(dataset_path):
        raise FileNotFoundError("Dataset is not available: {}".format(dataset_path))

    # The loaders may signal failure by returning None; fail here with a clear
    # message instead of an obscure unpack/subscript TypeError further down.
    loaded = load_data(dataset_path)
    if loaded is None:
        raise ValueError("Could not load dataset: {}".format(dataset_path))
    _, data = loaded  # the metadata part is not needed for prompt building

    few_shot_prompts = load_fs_prompts(prompt_file_path, key=dataset_name)
    if few_shot_prompts is None:
        raise ValueError("Could not load few-shot prompts for '{}' from {}".format(
            dataset_name, prompt_file_path))

    prefix = few_shot_prompts["prompt"]
    exemplar_indices = few_shot_prompts["sample_indices"]

    prompts = []
    answers = []
    indices = []
    for qa in data:
        index = qa['sample_index']
        if index in exemplar_indices:
            continue

        prompts.append(prefix + "\nQ: " + qa["question"] + "\nA: ")
        answers.append(qa["answer"])
        indices.append(index)

    return prompts, answers, indices

In [None]:
import re
from typing import List, Tuple, Union, Optional, Dict

import pandas as pd

prediction_prefix = "The answer is"

def _extract_prediction_candidates(prediction: str, dataset_key) -> List[str]:
    """
    Extracts all potential answer predictions which satisfy the dataset's answer format from the
    prediction string
    """
    if dataset_key in ("aqua", "commonsense_qa"):
        prediction = re.findall(r'[ABCDE]', prediction)
    elif dataset_key == "date_understanding":
        prediction = re.findall(r'[ABCDEF]', prediction)
    elif dataset_key in ("tracking_shuffled_objects"):
        prediction = re.findall(r'[ABC]', prediction)
    elif dataset_key in ("gsm8k", "addsub", "multiarith", "svamp", "single_eq"):
        prediction = prediction.replace(",", "")
        prediction = re.findall(r'-?\d+(?:\.\d+)?', prediction)
        if dataset_key in ("addsub", "svamp", "single_eq"):
            prediction = [float(s) for s in prediction]
    elif dataset_key in ("strategy_qa", "coin_flip"):
        prediction = prediction.lower()
        prediction = re.sub("\"|\'|\n|\.|\s|\:|\,", " ", prediction)
        prediction = prediction.split(" ")
        prediction = [i for i in prediction if i in ("yes", "no")]
    elif dataset_key == "last_letter_concatenation":
        prediction = re.sub("\"|\'|\n|\.|\s", "", prediction)
        prediction = [prediction]
    else:
        raise ValueError("Invalid dataset: {}".format(dataset_key))

    return prediction

def _compare_prediction_and_answer(prediction, answer, dataset_key) -> bool:
    if dataset_key in ("addsub", "svamp", "single_eq"):
        return prediction is not None and abs(prediction - answer) <= 1e-6
    else:
        return prediction is not None and prediction == answer

def cleanse_prediction(completion: str, return_all: bool,dataset_key) -> Union[str, Tuple[str, List[str]]]:
    """
    Pick the predicted answer out of a model completion.

    Selection rule, driven by the module-level `prediction_prefix`:
      * prefix is None: first candidate found in the whole completion;
      * prefix absent from the completion: last candidate in the whole completion;
      * prefix present: first candidate in the text after its first occurrence.

    Returns the chosen candidate (None when there is no candidate), or the pair
    ``(chosen, all_candidates)`` when `return_all` is true.
    """
    search_text = completion
    take_first = True

    if prediction_prefix is not None:
        prefix_pos = completion.find(prediction_prefix)
        if prefix_pos == -1:
            # No explicit answer marker: fall back to the last candidate.
            take_first = False
        else:
            # Only look at what follows the answer marker.
            search_text = completion[prefix_pos + len(prediction_prefix):]

    candidates = _extract_prediction_candidates(search_text, dataset_key=dataset_key)

    if not candidates:
        chosen = None
    elif take_first:
        chosen = candidates[0]
    else:
        chosen = candidates[-1]

    if return_all:
        return (chosen, candidates)
    return chosen

def cleanse_answer(answer: str,dataset_key) -> str:
    """
    Normalise a reference answer so it is comparable with an extracted prediction.

    Numeric datasets have commas removed, strategy_qa answers are lower-cased,
    and addsub / svamp / single_eq answers are additionally converted to float.
    Answers of any other dataset are returned unchanged.
    """
    numeric_keys = ("gsm8k", "addsub", "multiarith", "svamp", "single_eq")
    float_keys = ("addsub", "svamp", "single_eq")

    if dataset_key in numeric_keys:
        answer = answer.replace(",", "")
    elif dataset_key == "strategy_qa":
        answer = answer.lower()

    return float(answer) if dataset_key in float_keys else answer

def check_answer(completion_string: str, answer: str, dataset_key) -> bool:
    """
    Decide whether a model completion answers the question correctly.

    The prediction is extracted from `completion_string`, the reference
    `answer` is normalised for `dataset_key`, and the two are compared with
    the dataset-specific comparison rule.
    """
    predicted = cleanse_prediction(completion_string,
                                   dataset_key=dataset_key,
                                   return_all=False)
    expected = cleanse_answer(answer, dataset_key)
    return _compare_prediction_and_answer(predicted, expected, dataset_key)

In [None]:
from transformers import T5ForConditionalGeneration, OPTForCausalLM, AutoTokenizer
import torch
from tqdm.auto import tqdm

# Loaded (tokenizer, model) pairs keyed by (model_name, model_type, device), so
# the weights are read once per kernel instead of once per question.
_MODEL_CACHE = {}


def _load_model_and_tokenizer(model_name, model_type, device):
    """Return the cached (tokenizer, model) pair, loading it onto `device` on first use."""
    cache_key = (model_name, model_type, device)
    if cache_key not in _MODEL_CACHE:
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        if model_type == "seq2seq":
            model = T5ForConditionalGeneration.from_pretrained(model_name)
        else:  # "causal" -- the caller has already validated model_type
            model = OPTForCausalLM.from_pretrained(model_name,
                                                   offload_folder=".")
        # The inputs are moved to `device` below, so the model must live there too.
        _MODEL_CACHE[cache_key] = (tokenizer, model.to(device))
    return _MODEL_CACHE[cache_key]


def get_model_inference(question, model_name, model_type, max_new_tokens=None):
    """
    Sample one completion for `question` from a Hugging Face model.

    " Let's think step by step. " is appended to the question before it is
    tokenized. Sampling uses top_p=0.8 and temperature=0.5, so repeated calls
    can return different text.

    Args:
        question: Prompt text (few-shot prefix plus the actual question).
        model_name: Hugging Face model identifier or local path.
        model_type: "seq2seq" (loaded as T5ForConditionalGeneration) or
            "causal" (loaded as OPTForCausalLM).
        max_new_tokens: Optional cap on the number of generated tokens. When
            None the model's own generation settings apply.
            NOTE(review): those defaults are typically short and may cut the
            reasoning off -- confirm and pass an explicit value if so.

    Returns:
        The generated text with special tokens removed. For causal models the
        echoed prompt is removed as well, so answer extraction cannot match an
        exemplar answer that is part of the few-shot prompt.

    Raises:
        ValueError: If `model_type` is neither "seq2seq" nor "causal".
    """
    if model_type not in ("seq2seq", "causal"):
        raise ValueError("No Such model type available: {}".format(model_type))

    device = "cuda" if torch.cuda.is_available() else "cpu"
    tokenizer, model = _load_model_and_tokenizer(model_name, model_type, device)

    question += " Let's think step by step. "
    prompt_inpt = tokenizer(question, return_tensors="pt")
    input_ids = prompt_inpt.input_ids.to(device)
    attention_mask = prompt_inpt.attention_mask.to(device)

    generate_kwargs = dict(input_ids=input_ids,
                           attention_mask=attention_mask,
                           top_p=0.8,
                           temperature=0.5,
                           do_sample=True)
    if max_new_tokens is not None:
        generate_kwargs["max_new_tokens"] = max_new_tokens

    outputs = model.generate(**generate_kwargs)

    generated_ids = outputs[0]
    if model_type == "causal":
        # Decoder-only models return prompt + continuation; keep the continuation.
        generated_ids = generated_ids[input_ids.shape[1]:]
    return tokenizer.decode(generated_ids, skip_special_tokens=True)

def make_completion_dataset(model_name="google/flan-t5-xl",
                            model_type="seq2seq",
                            dataset_name="addsub",
                            dataset_dir="./dataset",
                            prompt_file_path="./few_shot_cot_prompts.json",
                            max_samples=3):
    """
    Run the model on a dataset's prompts and grade every completion.

    Args:
        model_name: Hugging Face model identifier passed to get_model_inference.
        model_type: "seq2seq" or "causal".
        dataset_name: Dataset to load; also selects the answer format used
            when grading.
        dataset_dir: Directory containing "<dataset_name>.json".
        prompt_file_path: JSON file holding the few-shot prompts.
        max_samples: Non-negative number of prompts to process, counted from
            the start. The default of 3 keeps the existing smoke-test
            behaviour; pass None to process the whole dataset.

    Returns:
        A list with one dict per processed prompt, with the keys "prompt",
        "answer", "model_inference", "correct" and "sample_index".

    Side effects:
        Prints the total number of prompts available for the dataset.
    """
    q, a, sam_ind = generate_prompts(dataset_name=dataset_name,
                                     dataset_dir=dataset_dir,
                                     prompt_file_path=prompt_file_path)
    print(len(q))

    if max_samples is not None:
        q, a, sam_ind = q[:max_samples], a[:max_samples], sam_ind[:max_samples]

    completion_data = []
    # `total` lets the progress bar show a fraction instead of a bare counter.
    for question, answer, sample_index in tqdm(zip(q, a, sam_ind), total=len(q)):
        model_inference = get_model_inference(question, model_name, model_type)
        correct = check_answer(completion_string=model_inference,
                               answer=answer,
                               dataset_key=dataset_name)
        completion_data.append({'prompt': question,
                                'answer': answer,
                                'model_inference': model_inference,
                                'correct': correct,
                                'sample_index': sample_index})
    return completion_data

In [None]:
# Directory that receives the "<dataset>_out.json" files written by the cells below.
save_path = "./completion-data-all"
# exist_ok=True makes this a no-op when the directory already exists, without a
# separate existence check that could race with its creation.
os.makedirs(save_path, exist_ok=True)

In [None]:
# addsub: generate graded completions with google/flan-t5-base and save them as JSON.
completion_data = make_completion_dataset(
    dataset_name="addsub",
    model_type="seq2seq",
    model_name="google/flan-t5-base",
)
with open(os.path.join(save_path, "addsub_out.json"), "w") as f:
    json.dump(completion_data, f, indent=4)

395


0it [00:00, ?it/s]

In [None]:
# date_understanding: generate graded completions with google/flan-t5-base and save them as JSON.
completion_data = make_completion_dataset(
    dataset_name="date_understanding",
    model_type="seq2seq",
    model_name="google/flan-t5-base",
)
with open(os.path.join(save_path, "date_understanding_out.json"), "w") as f:
    json.dump(completion_data, f, indent=4)

In [None]:
# coin_flip: generate graded completions with google/flan-t5-base and save them as JSON.
completion_data = make_completion_dataset(model_name="google/flan-t5-base",
                                          model_type="seq2seq",
                                          dataset_name="coin_flip")
with open(os.path.join(save_path,"coin_flip_out.json"), mode="w") as f:
    # indent=4 keeps this file formatted like the output of every other dataset cell.
    json.dump(completion_data,f,indent=4)

In [None]:
# commonsense_qa: generate graded completions with google/flan-t5-base and save them as JSON.
completion_data = make_completion_dataset(
    dataset_name="commonsense_qa",
    model_type="seq2seq",
    model_name="google/flan-t5-base",
)
with open(os.path.join(save_path, "commonsense_qa_out.json"), "w") as f:
    json.dump(completion_data, f, indent=4)

In [None]:
# last_letter_concatenation: generate graded completions with google/flan-t5-base and save them as JSON.
completion_data = make_completion_dataset(
    dataset_name="last_letter_concatenation",
    model_type="seq2seq",
    model_name="google/flan-t5-base",
)
with open(os.path.join(save_path, "last_letter_concatenation_out.json"), "w") as f:
    json.dump(completion_data, f, indent=4)

In [None]:
# single_eq: generate graded completions with google/flan-t5-base and save them as JSON.
completion_data = make_completion_dataset(
    dataset_name="single_eq",
    model_type="seq2seq",
    model_name="google/flan-t5-base",
)
with open(os.path.join(save_path, "single_eq_out.json"), "w") as f:
    json.dump(completion_data, f, indent=4)

In [None]:
# strategy_qa: generate graded completions with google/flan-t5-base and save them as JSON.
completion_data = make_completion_dataset(
    dataset_name="strategy_qa",
    model_type="seq2seq",
    model_name="google/flan-t5-base",
)
with open(os.path.join(save_path, "strategy_qa_out.json"), "w") as f:
    json.dump(completion_data, f, indent=4)

In [None]:
# multiarith: generate graded completions with google/flan-t5-base and save them as JSON.
completion_data = make_completion_dataset(
    dataset_name="multiarith",
    model_type="seq2seq",
    model_name="google/flan-t5-base",
)
with open(os.path.join(save_path, "multiarith_out.json"), "w") as f:
    json.dump(completion_data, f, indent=4)

In [None]:
# svamp: generate graded completions with google/flan-t5-base and save them as JSON.
completion_data = make_completion_dataset(
    dataset_name="svamp",
    model_type="seq2seq",
    model_name="google/flan-t5-base",
)
with open(os.path.join(save_path, "svamp_out.json"), "w") as f:
    json.dump(completion_data, f, indent=4)

In [None]:
# tracking_shuffled_objects: generate graded completions with google/flan-t5-base
# and save them as JSON. The dataset name must be spelled exactly like the key the
# answer extractor recognises ("tracking_shuffled_objects"); the output file name
# follows the same "<dataset>_out.json" pattern as the other cells.
completion_data = make_completion_dataset(model_name="google/flan-t5-base",
                                          model_type="seq2seq",
                                          dataset_name="tracking_shuffled_objects")
with open(os.path.join(save_path,"tracking_shuffled_objects_out.json"), mode="w") as f:
    json.dump(completion_data,f,indent=4)

In [None]:
completion_data

[{'prompt': 'Q: Do hamsters provide food for any animals?\nA: Hamsters are prey animals. Prey are food for predators. Thus, hamsters provide food for some animals. The answer is yes.\nQ: Could Brooke Shields succeed at University of Pennsylvania?\nA: Brooke Shields went to Princeton University. Princeton University is about as academically rigorous as the University of Pennsylvania. Thus, Brooke Shields could also succeed at the University of Pennsylvania. The answer is yes.\nQ: Yes or no: Hydrogen’s atomic number squared exceeds number of Spice Girls?\nA: Hydrogen has an atomic number of 1. 1 squared is 1. There are 5 Spice Girls. Thus, Hydrogen’s atomic number squared is less than 5. The answer is no.\nQ: Yes or no: Is it common to see frost during some college commencements?\nA: College commencement ceremonies can happen in December, May, and June. December is in the winter, so there can be frost. Thus, there could be frost at some commencements. The answer is yes.\nQ: Yes or no: Co

In [None]:
# completion_data = make_completion_dataset(model_name="google/flan-t5-xl",
#                                           model_type="seq2seq",
#                                           dataset_name="svamp")
# with open("/kaggle/working/svamp_out.json", mode="w") as f:
#     json.dump(completion_data,f,indent=4)

In [None]:
completion_data