In [None]:
import torch
from datasets import load_dataset
import pandas as pd
from datasets import load_dataset
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
from sklearn.metrics import classification_report
from transformers import set_seed
# Fix the random seed once, before any sampling or generation happens, so reruns are comparable.
set_seed(42)

In [None]:
# Use the GPU when torch reports one as available, otherwise fall back to the CPU.
# NOTE(review): no other line in the visible cells reads `device` — confirm whether it is still needed.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


In [None]:
# Specify how to quantize the model: load the weights in 4-bit using the "nf4"
# quantization type, with bfloat16 as the compute dtype.
quantization_config = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch.bfloat16
)

# Checkpoint under evaluation. The commented-out names are alternative checkpoints;
# enable exactly one `model_name` assignment at a time.
# model_name = "mistralai/Mistral-7B-v0.1"
model_name= "mistralai/Mistral-7B-Instruct-v0.2"
# model_name = "meta-llama/Llama-2-7b-chat-hf"
# model_name = "meta-llama/Llama-2-7b-hf"
# model_name = "microsoft/phi-2"
# model_name = "intfloat/e5-mistral-7b-instruct"
# Short name used to build the results file name (see the commented-out to_excel call
# at the end of the notebook); keep it in sync with `model_name` by hand.
save_name = "Mistral-7B-Instruct-v0.2"

# Load the quantized causal LM and its tokenizer. The tokenizer is configured with
# padding_side='left'.
# NOTE(review): device_map="auto" presumably lets the library choose device placement — confirm.
model = AutoModelForCausalLM.from_pretrained(model_name, quantization_config=quantization_config, device_map="auto")
tokenizer = AutoTokenizer.from_pretrained(model_name, padding_side = 'left')
# tokenizer.padding_side = 'left'

In [None]:
def make_few_shot_promt(training_set, testing: str, n_per_class: int = 3, class_column_name: str = 'Label'):
    """Build a few-shot classification prompt for one test sentence.

    Up to ``n_per_class`` rows are drawn at random from every class of the
    training data and rendered, together with ``testing``, by
    ``promt_generation3``.

    Args:
        training_set: Training examples, either a pandas DataFrame or anything
            ``pd.DataFrame(...)`` accepts. It must provide the
            ``class_column_name`` column and the columns read by the prompt
            builder ('UserStory' and 'Label').
        testing: The sentence to classify.
        n_per_class: Maximum number of examples drawn per class (default 3).
            Classes with fewer rows contribute all of their rows.
        class_column_name: Column holding the class label (default 'Label').

    Returns:
        The complete prompt string.
    """
    # Avoid re-wrapping when the caller already passes a DataFrame.
    df = training_set if isinstance(training_set, pd.DataFrame) else pd.DataFrame(training_set)

    # Sample group by group instead of through ``groupby(...).apply``: pandas >= 2.2
    # deprecates apply operating on the grouping column, and without that column the
    # prompt builder could no longer read the label. Iterating the groups always
    # yields full rows, in the same sorted-by-class order.
    per_class = [
        group.sample(n=min(n_per_class, len(group)))
        for _, group in df.groupby(class_column_name)
    ]
    # pd.concat rejects an empty list, so fall back to an empty frame with the same columns.
    random_samples = pd.concat(per_class) if per_class else df.iloc[0:0]

    return promt_generation3(random_samples, testing)
    # return promt_mistral(random_samples, testing)
   

def promt_generation3(sample, testing):
    """Render an instruction-style few-shot prompt that uses the abbreviated labels.

    Args:
        sample: DataFrame of examples; each row supplies 'UserStory' and 'Label'.
        testing: The sentence to classify; it is appended last with an empty
            "### Output" slot for the model to fill.

    Returns:
        Instruction header, then one Input/Output pair per example row, then
        the open-ended query, concatenated into a single string.
    """
    instruction = "You are a classification model. Based on the input, you need to predict the most relevant category label from {HG, SG, Task, Cap}. If the text doesn't fit into any of the above categories, classify it as HG. Each input has only one label. \n"
    header = f"### Instruction\n{instruction}"

    # One "Input / Output" pair per sampled row, in row order.
    shots = "".join(
        f"### Input\n{row['UserStory']} \n" + f"### Output\n {row['Label']} \n"
        for _, row in sample.iterrows()
    )

    query = f"### Input\n{testing} \n" + "### Output\n"
    return header + shots + query

def promt_mistral(sample, testing):
    """Build a few-shot prompt that names the categories in full and wraps the query in <<< >>>.

    Args:
        sample: DataFrame of examples; each row supplies 'UserStory' and 'Label'.
        testing: The sentence to classify; it is inserted after "Inquiry:" between
            the <<< and >>> markers.

    Returns:
        The prompt text as a single string.

    NOTE(review): the only call to this function in the visible cells is commented
    out (in make_few_shot_promt) — confirm whether it is still in use.
    """
    
    prompt_training_data = ""
    # Render every example row as an "Inquiry:" / "Category:" pair and append it to the examples text.
    for _, i in sample.iterrows():
        context = f"Inquiry:\n{i['UserStory']} \n" 
        response = f"Category:\n {i['Label']} \n"
        prompt_training_data = prompt_training_data + context + response
    
    # The template lines are indented inside the triple-quoted literal, so that leading
    # whitespace is part of the returned prompt text; the example pairs built above are
    # inserted without that indentation.
    user_message = (
        f"""
        You are a agile bot in software development. Your task is to assess developer to categorize inquiry after <<<>>> into one of the following predefined categories:
        
        Hard-goal
        Soft-goal
        Task
        Capability
        
        If the text doesn't fit into any of the above categories, classify it as:
        Hard-goal
        
        You will only respond with the predefined category. Do not include the word "Category". Do not provide explanations or notes. 
        
        ####
        Here are some examples:
        
        {prompt_training_data}
        ###
    
        <<<
        Inquiry: {testing}
        Category:
        >>>
        """
    )
    
    return user_message

def decoder(prompt_mss, max_new_tokens: int = 1):
    """Run the language model on a prompt and return the decoded sequence.

    Args:
        prompt_mss: The full prompt text.
        max_new_tokens: Upper bound on the number of generated tokens. The
            default of 1 keeps the original behaviour.

    Returns:
        The first entry of ``tokenizer.batch_decode`` applied to the generated ids.
        NOTE(review): this presumably contains the prompt followed by the
        completion — callers split on the prompt's output marker to extract
        the label; confirm.
    """
    # Put the inputs on the device the model was actually loaded on rather than a
    # hardcoded "cuda"; with device_map="auto" that need not be the default GPU.
    model_inputs = tokenizer(prompt_mss, return_tensors="pt").to(model.device)
    generated_ids = model.generate(
        **model_inputs,
        max_new_tokens=max_new_tokens,
        temperature=0.1,
        do_sample=True,
        pad_token_id=tokenizer.eos_token_id,
    )
    return tokenizer.batch_decode(generated_ids)[0]

def getLabelFromPrompt(result):
    """Return whatever follows the last "### Output" marker in ``result``.

    If the marker does not occur at all, the whole input is returned unchanged.
    No whitespace is stripped from the returned text.
    """
    marker = "### Output\n"
    # marker = "Category:" # for Mistral Prompt Template
    _, _, tail = result.rpartition(marker)
    return tail

def correct_label(label):
    """Expand a short label to its full category name.

    Surrounding whitespace is removed first. 'Cap', 'Hard' and 'Soft' map to
    'Capability', 'Hard-goal' and 'Soft-goal'; any other value is returned
    stripped but otherwise unchanged.
    """
    cleaned = label.strip()
    expansions = {
        'Cap': "Capability",
        'Hard': "Hard-goal",
        "Soft": "Soft-goal",
    }
    return expansions.get(cleaned, cleaned)
    
# target_names = ['Capability', 'Hard-goal', 'Soft-goal', 'Task']
target_names = ['Cap', 'HG', 'SG', 'Task']
def verify_inconsistency_label(training_set, testing, max_retries: int = 20, fallback_label='HG'):
    """Classify ``testing``, re-prompting until the model returns a known label.

    Every attempt builds a new few-shot prompt (with freshly sampled examples),
    runs the model and extracts the text after the last output marker. The
    extracted text is stripped before it is compared with ``target_names``,
    because the few-shot examples show labels surrounded by whitespace.

    Args:
        training_set: Training examples passed through to ``make_few_shot_promt``.
        testing: The sentence to classify.
        max_retries: Number of additional attempts after the first one. The loop
            is bounded so a model that never emits a valid label cannot recurse
            until ``RecursionError``.
        fallback_label: Label returned when every attempt fails. The default
            'HG' is the catch-all category named in the prompt instruction.
            Pass ``None`` to raise instead.

    Returns:
        A label from ``target_names``, or ``fallback_label`` after exhausting
        all attempts.

    Raises:
        RuntimeError: If no valid label was produced and ``fallback_label`` is None.
    """
    attempts = max(1, max_retries + 1)
    for _ in range(attempts):
        prompt_mss = make_few_shot_promt(training_set, testing)
        result_mss = decoder(prompt_mss)
        # label_tmp = correct_label(getLabelFromPrompt(result_mss))
        label_tmp = getLabelFromPrompt(result_mss).strip()
        if label_tmp in target_names:
            return label_tmp
        print('Label Not found')

    if fallback_label is None:
        raise RuntimeError(
            f"No valid label produced after {attempts} attempts for input: {testing!r}"
        )
    print(f"Falling back to {fallback_label!r} after {attempts} failed attempts")
    return fallback_label
    
def evaluation(labels, preds, target_names):
    """Flatten a classification report into a single-level dict of metrics.

    Args:
        labels: Gold labels.
        preds: Predicted labels.
        target_names: Class names handed to ``classification_report``; the first
            four entries are read by position and reported under the key
            prefixes 'Cap', 'HG', 'SG' and 'T'.

    Returns:
        A dict with 'Accuracy' followed by precision ('P'), recall ('R') and
        F1 ('F1') for each of the four classes, e.g. 'CapP', 'HGR', 'TF1'.
    """
    report = classification_report(
        labels, preds, target_names=target_names, zero_division=0, output_dict=True
    )

    summary = {'Accuracy': report['accuracy']}
    key_prefixes = ('Cap', 'HG', 'SG', 'T')
    metric_suffixes = (('P', 'precision'), ('R', 'recall'), ('F1', 'f1-score'))
    for position, prefix in enumerate(key_prefixes):
        class_name = target_names[position]
        for suffix, metric in metric_suffixes:
            summary[prefix + suffix] = report[class_name][metric]
    return summary
    
def promt_qa_generation(sample, testing):
    """Render an instruction-style few-shot prompt that uses the full category names.

    Args:
        sample: DataFrame of examples; each row supplies 'UserStory' and 'Label'.
        testing: The sentence to classify; it is placed last, followed by an
            empty "### Output" slot.

    Returns:
        The prompt as one string: instruction, example pairs, then the query.
    """
    instruction = "You are a classification model. Based on the input, you need to predict the most relevant category label from {Hard-goal, Soft-goal, Task, Capability}. If the text doesn't fit into any of the above categories, classify it as Hard-goal. Each input has only one label. \n"

    pieces = [f"### Instruction\n{instruction}"]
    for _, row in sample.iterrows():
        pieces.append(f"### Input\n{row['UserStory']} \n")
        pieces.append(f"### Output\n {row['Label']} \n")
    pieces.append(f"### Input\n{testing} \n")
    pieces.append("### Output\n")
    return "".join(pieces)
    
def changeLabel(label):
    """Map a row's full category name to its abbreviation.

    Intended as a per-row function for ``dataset.map``.

    Args:
        label: A row mapping with a 'Label' entry.

    Returns:
        ``{'Label': <abbreviation>}``. Values that are not one of the four full
        category names (for example a label that is already abbreviated) are
        passed through unchanged instead of being turned into ``None``, so
        applying the function twice is harmless. The input row is not modified.
    """
    replace_dict = {
        "Capability": "Cap",
        "Hard-goal": "HG",
        "Soft-goal": "SG",
        "Task": "Task",
    }
    original = label['Label']
    return {'Label': replace_dict.get(original, original)}
# print(make_few_shot_promt(dataset['train'], "i want to see correct status labels on the submission dashboard"))
# prompt_mss = make_few_shot_promt(dataset['train'], "i want to see correct status labels on the submission dashboard")
# print(getLabelFromPrompt(decoder(prompt_mss)).strip())

In [None]:

# 5-fold evaluation: for every fold, load the train/test CSVs, abbreviate the labels,
# predict a label for each test sentence and print the per-fold report.
data_dir = "../data/us/newDataset/separate_5_folds_2/"
results = []
for iteratorDataset in range(1, 6):
    dataset = load_dataset(
        'csv',
        data_files={
            'train': f"{data_dir}train_{iteratorDataset}.csv",
            'test': f"{data_dir}test_{iteratorDataset}.csv",
        },
        encoding="utf-8",
    )
    dataset = dataset.remove_columns(['Unnamed: 0'])
    dataset = dataset.map(changeLabel)

    # Convert the training split to a DataFrame once per fold. Passing the Dataset
    # itself makes make_few_shot_promt rebuild the DataFrame for every test sentence,
    # which is loop-invariant work.
    train_df = dataset['train'].to_pandas()

    y_pred = []
    for i in dataset['test']:
        label_tmp = verify_inconsistency_label(train_df, i['UserStory'])
        y_pred.append(label_tmp)
    print(classification_report(dataset['test']['Label'], y_pred))
    # results.append(evaluation(dataset['test']['Label'], y_pred, target_names=target_names))
    print(y_pred)


# pd.DataFrame(results).mean().to_excel('../results/'+ save_name +'.xlsx')
    