In [1]:
import os

import huggingface_hub
import torch
import wandb
from datasets import load_dataset
from torcheval.metrics.functional import multiclass_accuracy, multiclass_f1_score
from tqdm.auto import tqdm
from transformers import (
    AutoModelForCausalLM,
    AutoModelForSequenceClassification,
    AutoTokenizer,
    BitsAndBytesConfig,
    GenerationConfig,
    pipeline
)

In [2]:
# prevent env load failed
%load_ext dotenv
%dotenv

In [3]:
huggingface_hub.login(token=os.environ.get("HF_TOKEN", ""), add_to_git_credential=True)

Token is valid (permission: read).
Your token has been saved in your configured git credential helpers (store).
Your token has been saved to /home/hermeschen/.cache/huggingface/token
Login successful


In [4]:
wandb.login(key=os.environ.get("WANDB_API_KEY", ""), relogin=True)

[34m[1mwandb[0m: Appending key for api.wandb.ai to your netrc file: /home/hermeschen/.netrc


True

# Load and Process Dataset

In [5]:
dataset = load_dataset("daily_dialog", split="test", num_proc=8, trust_remote_code=True)

In [6]:
dataset = dataset.remove_columns("act")

In [7]:
dataset = dataset.rename_column("emotion", "emotion_id")
emotion_labels: list = dataset.features["emotion_id"].feature.names
emotion_labels[0] = "neutral"
dataset = dataset.map(lambda samples: {
    "emotion": [[emotion_labels[emotion_id] for emotion_id in sample] for sample in samples]
}, input_columns="emotion_id", remove_columns="emotion_id", batched=True, num_proc=8)

In [8]:
dataset = dataset.map(lambda samples: {
    "emotion_history": [sample[:-1] for sample in samples],
    "emotion_bot": [sample[-1] for sample in samples]
}, input_columns="emotion", remove_columns="emotion", batched=True, num_proc=16)

In [9]:
dataset = dataset.map(lambda samples: {
    "dialog": [[dialog.strip() for dialog in sample] for sample in samples]
}, input_columns="dialog", batched=True, num_proc=16)
dataset = dataset.map(lambda samples: {
    "dialog": [sample if len(sample) % 2 == 0 else sample[:-1] for sample in samples]
}, input_columns="dialog", batched=True, num_proc=16)

In [10]:
dataset = dataset.map(lambda samples: {
    "dialog_history": [sample[:-1] for sample in samples],
    "dialog_bot": [sample[-1] for sample in samples]
}, input_columns="dialog", remove_columns="dialog", batched=True, num_proc=16)

In [11]:
dataset[0]

{'emotion_history': ['neutral',
  'surprise',
  'neutral',
  'neutral',
  'neutral',
  'neutral',
  'neutral',
  'neutral',
  'neutral',
  'neutral',
  'fear'],
 'emotion_bot': 'neutral',
 'dialog_history': ['Hey man , you wanna buy some weed ?',
  'Some what ?',
  'Weed ! You know ? Pot , Ganja , Mary Jane some chronic !',
  'Oh , umm , no thanks .',
  'I also have blow if you prefer to do a few lines .',
  'No , I am ok , really .',
  'Come on man ! I even got dope and acid ! Try some !',
  'Do you really have all of these drugs ? Where do you get them from ?',
  'I got my connections ! Just tell me what you want and I ’ ll even give you one ounce for free .',
  'Sounds good ! Let ’ s see , I want .',
  'Yeah ?'],
 'dialog_bot': 'I want you to put your hands behind your head ! You are under arrest !'}

In [27]:
bot: str = "assistant"
test_data = dataset.map(lambda samples: {
    "prompt": [[{"role": "user" if i % 2 == 0 else "assistant", "content": dialog}
                for i, dialog in enumerate(sample)] for sample in samples]
}, input_columns="dialog_history", batched=True, num_proc=16)

In [28]:
test_data = test_data.map(lambda sample: {
    "history": "\n".join([f"""{'user' if i % 2 == 0 else 'bot'}({v[0]}): {v[1]}"""
                          for i, v in enumerate(zip(sample["emotion_history"], sample["dialog_history"]))])
}, remove_columns=["emotion_history", "dialog_history"], num_proc=8)

Map (num_proc=8):   0%|          | 0/1000 [00:00<?, ? examples/s]

In [30]:
test_data[0]["history"]

'user(neutral): Hey man , you wanna buy some weed ?\nbot(surprise): Some what ?\nuser(neutral): Weed ! You know ? Pot , Ganja , Mary Jane some chronic !\nbot(neutral): Oh , umm , no thanks .\nuser(neutral): I also have blow if you prefer to do a few lines .\nbot(neutral): No , I am ok , really .\nuser(neutral): Come on man ! I even got dope and acid ! Try some !\nbot(neutral): Do you really have all of these drugs ? Where do you get them from ?\nuser(neutral): I got my connections ! Just tell me what you want and I ’ ll even give you one ounce for free .\nbot(neutral): Sounds good ! Let ’ s see , I want .\nuser(fear): Yeah ?'

# Evaluation

In [19]:
prompt_type: str = "history"

In [11]:
base_model: str = "meta-llama/Llama-2-7b-hf"
fine_tuned_model_name: str = base_model

## Load Model

In [49]:
device_map: str = "auto" if torch.cuda.is_available() else "cpu"

In [22]:
quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16
)

In [23]:
model = AutoModelForCausalLM.from_pretrained(
    fine_tuned_model_name,
    quantization_config=quantization_config if torch.cuda.is_available() else None,
    device_map=device_map,
    low_cpu_mem_usage=True,
    attn_implementation="flash_attention_2"
)

Downloading shards:   0%|          | 0/2 [00:00<?, ?it/s]

KeyboardInterrupt: 

In [None]:
model = torch.compile(model)

## Configuration

In [None]:
generation_config = GenerationConfig(
    max_new_tokens=20,
    min_new_tokens=5,
    repetition_penalty=1.5
)

## Log

In [None]:
wandb.init(
    project=os.environ.get("WANDB_PROJECT", ""),
    config={
        "base_model": base_model,
        "fine_tuned_model": fine_tuned_model_name,
        "quantization_configuration": quantization_config,
        "generation_configuration": generation_config,
        "detail_about_evaluation": "evaluate some possible base model"
    }
)

## Generate Response

In [45]:
tokenizer = AutoTokenizer.from_pretrained("state-spaces/mamba-130m-hf", trust_remote_code=True)

tokenizer_config.json:   0%|          | 0.00/4.79k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/2.11M [00:00<?, ?B/s]

Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


In [42]:
test_data_list = [
    {"role": "user", "content": "user(angry): Fuck you!"},
    {"role": "assistant", "content": "assistant(sad): "}
]

In [57]:
tokenized_chat: list = []
for sample in test_data:
    tokenized_chat.append(
        tokenizer.apply_chat_template(sample, tokenize=False, add_generation_prompt=True, return_tensors="pt"))

In [58]:
tokenized_chat[0]

'<s>[INST] user(neutral): Good morning , sir . Is there a bank near here ? [/INST] user(neutral): </s>'

In [46]:
from pprint import pprint

pprint(tokenizer.default_chat_template)


No chat template is defined for this tokenizer - using the default template for the GPTNeoXTokenizerFast class. If the default is not appropriate for your model, please set `tokenizer.chat_template` to an appropriate template. See https://huggingface.co/docs/transformers/main/chat_templating for more information.



'{% for message in messages %}{{ message.content }}{{ eos_token }}{% endfor %}'


In [None]:
import transformers


class MyPipeline(transformers.Pipeline):
    def _sanitize_parameters(self, **kwargs):
        # preprocess_kwargs = {}
        # forward_kwargs = {}
        # 
        # try: 
        #     forward_kwargs["model"] = kwargs["model"]
        # except ValueError as e:
        #     print(f"model is a required argument")
        # preprocess_kwargs["tokenizer"] = kwargs["tokenizer"] if "tokenizer" in kwargs else kwargs["model"]

        return {}, {}, {}

    def preprocess(self, sample, tokenizer):
        tokenized_prompt = tokenizer.apply_chat_template(sample["prompt"], tokenize=False, add_generation_prompt=True,
                                                         return_tensors="pt")
        return {"model_input": tokenized_prompt}

    def _forward(self, model_inputs):
        # model_inputs == {"model_input": model_input}
        outputs = self.model(**model_inputs)
        # Maybe {"logits": Tensor(...)}
        return outputs

    def postprocess(self, model_outputs):
        best_class = model_outputs["logits"].softmax(-1)
        return best_class

In [28]:
device: str = "cuda" if torch.cuda.is_available() else "cpu"
for sample in tqdm(test_data[:10]):
    tokenized_prompt = tokenizer(sample["prompt"], return_tensors="pt").to(device)
    response_ids = model.generate(**tokenized_prompt, generation_config=generation_config)
    response = tokenizer.decode(response_ids[0], skip_special_tokens=True, clean_up_tokenization_spaces=True)
    sample["response_bot"] = response.replace(response.split("[/INST]")[0], "").removeprefix("[/INST]").strip()

  0%|          | 0/10 [00:00<?, ?it/s]

In [29]:
result = dataset.from_list(test_data)

In [30]:
result[0]

{'prompt': '<s>[INST]user(neutral): Good morning , sir . Is there a bank near here ?\nbot(neutral): [/INST]',
 'emotion_bot': 'neutral',
 'dialog_user': 'Good morning , sir . Is there a bank near here ?',
 'dialog_bot': 'There is one . 5 blocks away from here ?',
 'response_bot': "bank[/INS]. There is one on the corner. It's open until 8"}

## Sentiment Analysis

In [31]:
sentiment_analysis_model = AutoModelForSequenceClassification.from_pretrained(
    "../sentiment _analysis/emotion_text_classifier_on_dd_v1",
    quantization_config=quantization_config if torch.cuda.is_available() else None,
    device_map=device_map,
    low_cpu_mem_usage=True)
# sentiment_analysis_model = torch.compile(sentiment_analysis_model)

In [32]:
sentiment_analysis_tokenizer = AutoTokenizer.from_pretrained(
    "michellejieli/emotion_text_classifier",
    trust_remote_code=True)

In [52]:
analyser = pipeline(
    "sentiment-analysis",
    model=sentiment_analysis_model,
    tokenizer=sentiment_analysis_tokenizer,
    device_map=device_map,
    trust_remote_code=True)

config.json:   0%|          | 0.00/1.09k [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/329M [00:00<?, ?B/s]

KeyboardInterrupt: 

In [34]:
# to prevent "The model 'OptimizedModule' is not supported for sentiment-analysis." problem
sentiment_analysis_model = torch.compile(sentiment_analysis_model)

In [36]:
sentiment_analysis_tokenizer(result[0]["response_bot"], return_tensors="pt")

{'input_ids': tensor([[    0,  5760, 48505, 15102,  8174,   345,    16,    65,    15,     5,
          2797,     4,    85,    18,   490,   454,   290,     2]]), 'attention_mask': tensor([[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]])}

In [35]:
result = result.map(lambda samples: {
    "response_sentiment_bot": analyser(samples)
}, input_columns="response_bot", batched=True)

Map:   0%|          | 0/13809 [00:00<?, ? examples/s]

ValueError: You need to specify either `text` or `text_target`.

In [None]:
result[0]

## Metrics

In [None]:
emotion_label_to_id: dict = {label: index for index, label in enumerate(emotion_labels)}
emotion_label_to_id

In [None]:
result = result.map(lambda samples: {
    "emotion_bot_id": emotion_label_to_id[samples]
}, input_columns="emotion_bot", num_proc=8)

In [None]:
result = result.map(lambda samples: {
    "response_sentiment_bot_id": emotion_label_to_id[samples["label"]]
    if samples["label"] in emotion_label_to_id.keys()
    else 4
}, input_columns="response_sentiment_bot", num_proc=8)

## Result

In [None]:
sentiment_true: torch.tensor = torch.tensor([sample["emotion_bot_id"]
                                             for sample in result.select_columns("emotion_bot_id").to_list()])
sentiment_pred: torch.tensor = torch.tensor([sample["response_sentiment_bot_id"]
                                             for sample in
                                             result.select_columns("response_sentiment_bot_id").to_list()])

In [None]:
result = result.remove_columns(["emotion_bot_id", "response_sentiment_bot_id"])

In [None]:
f1 = multiclass_f1_score(sentiment_true, sentiment_pred, num_classes=len(emotion_labels), average="micro")

In [None]:
accuracy = multiclass_accuracy(sentiment_true, sentiment_pred, num_classes=len(emotion_labels))

In [None]:
wandb.log({"F1-score": f1, "Accuracy": accuracy})

In [None]:
wandb.log({"generated_test_result": wandb.Table(data=result, columns=result.column_names)})

In [None]:
result.to_csv(f"evaluation_result_type_{fine_tuned_model_name}_{prompt_type}.csv", header=result.column_names,
              num_proc=8)