In [1]:
from accelerate import Accelerator

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
import os
import torch
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    AutoConfig,
    BitsAndBytesConfig,
)
import time
import json
import matplotlib.pyplot as plt
import seaborn as sns
import wandb

In [3]:
os.environ["CUDA_VISIBLE_DEVICES"]="0"

In [4]:
def get_prompt(sample_data_point, speaker=False, video=False, audio=False):
	if speaker and video and audio:
		instruction = """Identify the emotion label of utterance {} out of the 7 emotions: joy, anger, disgust, fear, sadness, surprise, neutral. The video_captions for each utterance describe the video corresponding to the text and audio captions describe the audio.\nDon't give an explanation.\nOutput only one line in the format:\nutterance_id :: emotion_label""".format(sample_data_point["utterance_id"])
	elif speaker and video:
		instruction = """Identify the emotion label of utterance {} out of the 7 emotions: joy, anger, disgust, fear, sadness, surprise, neutral. The video_captions for each utterance describe the video corresponding to the text.\nDon't give an explanation.\nOutput only one line in the format:\nutterance_id :: emotion_label""".format(sample_data_point["utterance_id"])
	else:
		instruction = """Identify the emotion label of utterance {} out of the 7 emotions: joy, anger, disgust, fear, sadness, surprise, neutral.\nDon't give an explanation.\nOutput only one line in the format:\nutterance_id :: emotion_label""".format(sample_data_point["utterance_id"])

	return f"""<s>[INST]\n{sample_data_point['conversation']}{instruction}\n[/INST]\n""", f"""utterance_{sample_data_point["utterance_id"]} :: {sample_data_point['emotion_label']}\n</s>"""

In [5]:
def get_formatted_conversation(data_sample, speaker=False, video=False, audio=False):
    conversation_str = '"conversation": [\n'
    for utt in data_sample["conversation"]:
        # conversation_str
        if speaker and video and audio:
            conversation_str += "{{\n\"utterance_ID\": {}\n\"text\": {}\n\"speaker\": {}\n\"video_caption\": {}\n\"audio_caption\": {}".format(utt["utterance_ID"], utt["text"], utt["speaker"], utt["video_caption"], utt["audio_caption"])
            conversation_str += "\n}\n"
        elif speaker and video:
            conversation_str += "{{\n\"utterance_ID\": {}\n\"text\": {}\n\"speaker\": {}\n\"video_caption\": {}".format(utt["utterance_ID"], utt["text"], utt["speaker"], utt["video_caption"])
            conversation_str += "\n}\n"
        elif speaker:
            conversation_str += "{{\n\"utterance_ID\": {}\n\"text\": {}\n\"speaker\": {}\n}}\n".format(utt["utterance_ID"], utt["text"], utt["speaker"])
        else:
            conversation_str += "{{\n\"utterance_ID\": {}\n\"text\": {}\n}}\n".format(utt["utterance_ID"], utt["text"])
    return conversation_str

In [6]:
def form_input(data_sample, utt_id, speaker=False, video=False, audio=False):
    conversation_str = get_formatted_conversation(data_sample, speaker, video, audio)
    emotions_str = data_sample["conversation"][utt_id]["emotion"]
    sample_data_point = {} # has utterance_id, conversation, emotion_label, conv id

    sample_data_point["utterance_id"] = utt_id+1
    sample_data_point["conversation"] = conversation_str
    sample_data_point["emotion_label"] = emotions_str
    sample_data_point["conversation_id"] = data_sample["conversation_ID"]
    return sample_data_point

In [7]:
def generate_inst_for_each_utt_of_conv(data_point, speaker=False, video=False, audio=False):
    num_utt = len(data_point["conversation"])
    list_output_objects = []
    for i in range(num_utt):
        sample_data_point = form_input(data_point, i, speaker, video, audio)
        inst, out = get_prompt(sample_data_point, speaker, video, audio)
        obj = {
            "instruction": inst,
            "out": out,
            "text": inst + out,
            "emotion_label": sample_data_point["emotion_label"],
            "utterance_id": sample_data_point["utterance_id"],
            "conversation_id": sample_data_point["conversation_id"],
        }
        list_output_objects.append(obj)
    return list_output_objects

In [8]:
def process_dataset(data, speaker=False, video=False, audio=False):
    flattened_list = [element for sublist in map(lambda x: generate_inst_for_each_utt_of_conv(x, speaker, video, audio), data) for element in sublist]
    ds = {}
    ds["instruction"] = [sample["instruction"] for sample in flattened_list]
    ds["out"] = [sample["out"] for sample in flattened_list]
    ds["text"] = [sample["text"] for sample in flattened_list]
    ds["emotion_label"] = [sample["emotion_label"] for sample in flattened_list]
    ds["utterance_id"] = [sample["utterance_id"] for sample in flattened_list]
    ds["conversation_id"] = [sample["conversation_id"] for sample in flattened_list]
    return ds

In [9]:
from generate_input import *

In [10]:
# Params
class Config:
    def __init__(self) -> None:
        self.seed = 42
        self.speaker = True
        self.video = False
        self.audio = False

In [11]:
config = Config()
test_file, train_file, val_file = generate_input(config)

with open(train_file, 'r') as f:
    train_data = json.load(f)
with open(val_file, 'r') as f:
    val_data = json.load(f)
with open(test_file, 'r') as f:
    test_data = json.load(f)

Input already present at input\test\speaker

Input already present at input\train\speaker

Input already present at input\val\speaker



In [12]:
# Form datasets from data
train_dataset = process_dataset(train_data, speaker=config.speaker, video=config.video)
val_dataset = process_dataset(val_data, speaker=config.speaker, video=config.video)
test_dataset = process_dataset(test_data, speaker=config.speaker, video=config.video)

In [13]:
from datasets import Dataset
train_dataset = Dataset.from_dict(train_dataset)
val_dataset = Dataset.from_dict(val_dataset)
test_dataset = Dataset.from_dict(test_dataset)

In [14]:
print("Train len {}".format(len(train_dataset)))
print("Test len {}".format(len(test_dataset)))
print("Val len {}".format(len(val_dataset)))

Train len 10914
Test len 1406
Val len 1299


In [17]:
print(train_dataset[0])

{'instruction': '<s>[INST]\n"conversation": [\n{\n"utterance_ID": 1\n"text": Really ? You would ... you would do that for me ? !\n"speaker": Joey\n}\n{\n"utterance_ID": 2\n"text": Yeah !\n"speaker": Ross\n}\n{\n"utterance_ID": 3\n"text": Thanks !\n"speaker": Joey\n}\n{\n"utterance_ID": 4\n"text": All right , we will start off slow . The only thing you have to do tonight is come up with the name of your main character .\n"speaker": Ross\n}\n{\n"utterance_ID": 5\n"text": Done !\n"speaker": Joey\n}\n{\n"utterance_ID": 6\n"text": And it can not be Joey .\n"speaker": Ross\n}\n{\n"utterance_ID": 7\n"text": It is not .\n"speaker": Joey\n}\n{\n"utterance_ID": 8\n"text": Or Joseph .\n"speaker": Ross\n}\n{\n"utterance_ID": 9\n"text": Oh .\n"speaker": Joey\n}\nIdentify the emotion label of utterance 1 out of the 7 emotions: joy, anger, disgust, fear, sadness, surprise, neutral.\nDon\'t give an explanation.\nOutput only one line in the format:\nutterance_id :: emotion_label\n[/INST]\n', 'out': 'ut

In [23]:
print(train_dataset["instruction"][0])

<s>[INST]
"conversation": [
{
"utterance_ID": 1
"text": Really ? You would ... you would do that for me ? !
"speaker": Joey
}
{
"utterance_ID": 2
"text": Yeah !
"speaker": Ross
}
{
"utterance_ID": 3
"text": Thanks !
"speaker": Joey
}
{
"utterance_ID": 4
"text": All right , we will start off slow . The only thing you have to do tonight is come up with the name of your main character .
"speaker": Ross
}
{
"utterance_ID": 5
"text": Done !
"speaker": Joey
}
{
"utterance_ID": 6
"text": And it can not be Joey .
"speaker": Ross
}
{
"utterance_ID": 7
"text": It is not .
"speaker": Joey
}
{
"utterance_ID": 8
"text": Or Joseph .
"speaker": Ross
}
{
"utterance_ID": 9
"text": Oh .
"speaker": Joey
}
Identify the emotion label of utterance 1 out of the 7 emotions: joy, anger, disgust, fear, sadness, surprise, neutral.
Don't give an explanation.
Output only one line in the format:
utterance_id :: emotion_label
[/INST]



In [25]:
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch

model_id = "models-NousResearch-Llama-2-7b-chat-hf"

tokenizer = AutoTokenizer.from_pretrained(model_id)
model = AutoModelForCausalLM.from_pretrained(
    model_id,
    torch_dtype=torch.float16,
    device_map="auto",
)

messages = [
    {"role": "system", "content": "You are an artificial intelligence assistant for an expert in sentiment analysis!"},
    {"role": "user", "content": train_dataset["instruction"][0]},
]

input_ids = tokenizer.apply_chat_template(
    messages,
    add_generation_prompt=True,
    return_tensors="pt"
).to(model.device)

terminators = [
    tokenizer.eos_token_id,
    tokenizer.convert_tokens_to_ids("<|eot_id|>")
]

outputs = model.generate(
    input_ids,
    max_new_tokens=256,
    eos_token_id=terminators,
    do_sample=True,
    temperature=0.6,
    top_p=0.9,
)
response = outputs[0][input_ids.shape[-1]:]
print(tokenizer.decode(response, skip_special_tokens=True))

Loading checkpoint shards: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 2/2 [00:09<00:00,  4.50s/it]


 Sure, I can help you with that! Here are the emotion labels for each utterance in the conversation:

utterance_id :: emotion_label

1 :: joy


In [26]:
model.config.use_cache = True
model.eval()

LlamaForCausalLM(
  (model): LlamaModel(
    (embed_tokens): Embedding(32000, 4096, padding_idx=0)
    (layers): ModuleList(
      (0-31): 32 x LlamaDecoderLayer(
        (self_attn): LlamaAttention(
          (q_proj): Linear(in_features=4096, out_features=4096, bias=False)
          (k_proj): Linear(in_features=4096, out_features=4096, bias=False)
          (v_proj): Linear(in_features=4096, out_features=4096, bias=False)
          (o_proj): Linear(in_features=4096, out_features=4096, bias=False)
          (rotary_emb): LlamaRotaryEmbedding()
        )
        (mlp): LlamaMLP(
          (gate_proj): Linear(in_features=4096, out_features=11008, bias=False)
          (up_proj): Linear(in_features=4096, out_features=11008, bias=False)
          (down_proj): Linear(in_features=11008, out_features=4096, bias=False)
          (act_fn): SiLU()
        )
        (input_layernorm): LlamaRMSNorm()
        (post_attention_layernorm): LlamaRMSNorm()
      )
    )
    (norm): LlamaRMSNorm()
  )
 

In [28]:
print(test_dataset[0]["instruction"])

<s>[INST]
"conversation": [
{
"utterance_ID": 1
"text": Hey !
"speaker": Joey
}
{
"utterance_ID": 2
"text": So , what are you guys in the market for ? We have got uh , scarves , tulip post cards ...
"speaker": The Vendor
}
{
"utterance_ID": 3
"text": Check this out ? Huh ? Yeah . That is the stuff . What do you think ?
"speaker": Joey
}
{
"utterance_ID": 4
"text": Well , I do not have to buy that , " I am with stupid " T ... shirt anymore .
"speaker": Chandler
}
{
"utterance_ID": 5
"text": Well , I like it . Here you go .
"speaker": Joey
}
{
"utterance_ID": 6
"text": All right , look , you are not really gonna buy that are you ? Do not you think you have embarrassed me enough for one day ?
"speaker": Chandler
}
{
"utterance_ID": 7
"text": Oh , I embarrass you ?
"speaker": Joey
}
{
"utterance_ID": 8
"text": How can I answer that when I am pretending I do not know you ?
"speaker": Chandler
}
{
"utterance_ID": 9
"text": Oh really ? Then how come no one here is wearing them ?
"speaker": Ch

In [29]:
import string

uppercase_set = set(string.ascii_uppercase)
lowercase_set = set(string.ascii_lowercase)
chars = uppercase_set.union(lowercase_set)

In [35]:
def extract_emotion_label(pred_label):
    """pred_label is raw output string from the model
    Like ["utterance_id :: anger ...", "2 :: fear ...", ]
    It could be inconsistent
    """
    # print("pred_label:    ",pred_label)
    label = pred_label.split("::")[1]
    label = label.strip()
    out_label = ""
    for c in label:
        if c not in chars:
            break
        out_label += c
    return out_label.strip()

In [36]:
def get_full_output(prompt):
    input_ids = tokenizer(prompt, return_tensors='pt',truncation=True).input_ids.cuda()
    outputs = model.generate(input_ids=input_ids, max_new_tokens=256)
    output = tokenizer.batch_decode(outputs.detach().cpu().numpy(), skip_special_tokens=True)[0]
    return output

In [37]:
def infer_emotion(dataset):
    pred_labels_list = []
    true_labels_list = []
    for i in range(len(dataset)):
        # predict
        prompt = dataset["instruction"][i]
        # print("prompt:    ",prompt)
        output = get_full_output(prompt)
        print("output:********",output)
        pred_labels_list.append(output)
        # baseline
        baseline_output = dataset["out"][i]
        baseline_output = extract_emotion_label(baseline_output.strip())
        baseline_output = baseline_output.strip().lower()
        print("True:********", baseline_output)
        print("---------------------")
        true_labels_list.append(baseline_output)
    return pred_labels_list, true_labels_list

In [38]:
test_dataset[0]

{'instruction': '<s>[INST]\n"conversation": [\n{\n"utterance_ID": 1\n"text": Hey !\n"speaker": Joey\n}\n{\n"utterance_ID": 2\n"text": So , what are you guys in the market for ? We have got uh , scarves , tulip post cards ...\n"speaker": The Vendor\n}\n{\n"utterance_ID": 3\n"text": Check this out ? Huh ? Yeah . That is the stuff . What do you think ?\n"speaker": Joey\n}\n{\n"utterance_ID": 4\n"text": Well , I do not have to buy that , " I am with stupid " T ... shirt anymore .\n"speaker": Chandler\n}\n{\n"utterance_ID": 5\n"text": Well , I like it . Here you go .\n"speaker": Joey\n}\n{\n"utterance_ID": 6\n"text": All right , look , you are not really gonna buy that are you ? Do not you think you have embarrassed me enough for one day ?\n"speaker": Chandler\n}\n{\n"utterance_ID": 7\n"text": Oh , I embarrass you ?\n"speaker": Joey\n}\n{\n"utterance_ID": 8\n"text": How can I answer that when I am pretending I do not know you ?\n"speaker": Chandler\n}\n{\n"utterance_ID": 9\n"text": Oh reall

In [78]:
print(len(test_dataset)-1000)

406


In [39]:
pred, true = infer_emotion(test_dataset[:len(test_dataset)-1000])

output:******** [INST]
"conversation": [
{
"utterance_ID": 1
"text": Hey !
"speaker": Joey
}
{
"utterance_ID": 2
"text": So , what are you guys in the market for ? We have got uh , scarves , tulip post cards ...
"speaker": The Vendor
}
{
"utterance_ID": 3
"text": Check this out ? Huh ? Yeah . That is the stuff . What do you think ?
"speaker": Joey
}
{
"utterance_ID": 4
"text": Well , I do not have to buy that , " I am with stupid " T ... shirt anymore .
"speaker": Chandler
}
{
"utterance_ID": 5
"text": Well , I like it . Here you go .
"speaker": Joey
}
{
"utterance_ID": 6
"text": All right , look , you are not really gonna buy that are you ? Do not you think you have embarrassed me enough for one day ?
"speaker": Chandler
}
{
"utterance_ID": 7
"text": Oh , I embarrass you ?
"speaker": Joey
}
{
"utterance_ID": 8
"text": How can I answer that when I am pretending I do not know you ?
"speaker": Chandler
}
{
"utterance_ID": 9
"text": Oh really ? Then how come no one here is wearing them ?


In [63]:
import re

def extract_first_emotion(pred_str):
    target_strings = ["anger", "fear", "joy", "disgust", "sadness", "surprise", "neutral"]
    pattern = re.compile('|'.join(map(re.escape, target_strings)))
    # Find the first match
    match = pattern.search(pred_str)
    # Print the result
    if match:
        print(f"Found emotion: {match.group()}")
        print(f"Index: {match.start()}")
        return match.group()
    else:
        print("No emotion found.")
        return ""

In [66]:
def parse_formatted_output(pred):
    """Extracts the emotion from well-formatted output of the form
    <text> utterance_id :: emotion_label <text> :: emotion_label"""
    pred_labels = []
    for i in range(len(pred)):
        output = pred[i]
        if len(output.split("utterance_id :: emotion_label\n[/INST]")) == 2:
            output = extract_first_emotion(output.split("utterance_id :: emotion_label\n[/INST]")[1])
            pred_labels.append(output)
            continue
        output = output.split("utterance_id :: emotion_label\n[/INST]")[1]
        print("output:*****",output)
        output = extract_emotion_label(output.strip())
        output = output.strip().lower()
        print("Predicted Output:*****", output)
        pred_labels.append(output)
    return pred_labels

In [67]:
pred_labels = parse_formatted_output(pred)

Found emotion: joy
Index: 38
Found emotion: joy
Index: 38
Found emotion: joy
Index: 7
Found emotion: disgust
Index: 7
Found emotion: joy
Index: 7
Found emotion: disgust
Index: 38


In [68]:
print(pred_labels)

['joy', 'joy', 'joy', 'disgust', 'joy', 'disgust']


In [72]:
print(true)

['joy', 'neutral', 'joy', 'neutral', 'neutral', 'anger']


In [70]:
def evaluate_emotion_labels(pred_labels_list, true_labels_list):
    total_correct = 0
    for i in range(len(pred_labels_list)):
        if pred_labels_list[i] == true_labels_list[i]:
            total_correct += 1
    return total_correct / len(pred_labels_list)

In [71]:
print("Accuracy:")
print(evaluate_emotion_labels(pred_labels, true))

Accuracy:
0.3333333333333333


# Evaluate in one go

In [73]:
pred_labels = parse_formatted_output(pred)

Found emotion: joy
Index: 38
Found emotion: joy
Index: 38
Found emotion: joy
Index: 7
Found emotion: disgust
Index: 7
Found emotion: joy
Index: 7
Found emotion: disgust
Index: 38


In [74]:
print(len(test_dataset["instruction"]))

1406


In [75]:
pred_labels_list = []
true_labels_list = []
correct=0
for i in range(len(test_dataset)-1000):
    print(f"Iter {i}")
    print("------------------------------------")
    # predict
    prompt = test_dataset["instruction"][i]
    output = get_full_output(prompt)
    pred_labels_list.append(output)
    # extract
    if len(output.split("utterance_id :: emotion_label\n[/INST]")) == 2:
        output = extract_first_emotion(output.split("utterance_id :: emotion_label\n[/INST]")[1])
        print("Predicted: ", output)
    else:
        output = output.split("utterance_id :: emotion_label\n[/INST]")[1]
        output = extract_emotion_label(output.strip())
        print("Predicted: ", output.strip().lower())
    # baseline
    baseline_output = test_dataset["out"][i]
    baseline_output = extract_emotion_label(baseline_output.strip())
    baseline_output = baseline_output.strip().lower()
    print("True : ", baseline_output)
    true_labels_list.append(baseline_output)
    # Accuracy
    if output == baseline_output:
        print("Correct")
        correct += 1
    else:
        print("Incorrect")
    print("------------------------------------")

Iter 0
------------------------------------
Found emotion: joy
Index: 38
Predicted:  joy
True :  joy
Correct
------------------------------------
Iter 1
------------------------------------
Found emotion: joy
Index: 38
Predicted:  joy
True :  neutral
Incorrect
------------------------------------
Iter 2
------------------------------------
Found emotion: joy
Index: 7
Predicted:  joy
True :  joy
Correct
------------------------------------
Iter 3
------------------------------------
Found emotion: disgust
Index: 7
Predicted:  disgust
True :  neutral
Incorrect
------------------------------------
Iter 4
------------------------------------
Found emotion: joy
Index: 7
Predicted:  joy
True :  neutral
Incorrect
------------------------------------
Iter 5
------------------------------------
Found emotion: disgust
Index: 38
Predicted:  disgust
True :  anger
Incorrect
------------------------------------
Iter 6
------------------------------------
Found emotion: surprise
Index: 38
Predicted: 

In [76]:
print(correct)

105


In [77]:
print("Accuracy: {}".format(correct / 406))

Accuracy: 0.25862068965517243
