## Installing packages

In [None]:
# Install or upgrade transformers, accelerate and bitsandbytes in the kernel's environment.
# NOTE(review): versions are not pinned, so a later re-run may pick up different releases —
# consider pinning the versions that were used for the reported results.
%pip install --upgrade transformers accelerate bitsandbytes;

In [None]:
# Log in to the Hugging Face Hub through the interactive notebook prompt;
# needed for Mistral-7B downloading
from huggingface_hub import notebook_login
notebook_login()

VBox(children=(HTML(value='<center> <img\nsrc=https://huggingface.co/front/assets/huggingface_logo-noborder.sv…

In [None]:
# Mount Google Drive at /content/drive (Colab only);
# assuming all datasets are here
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## LLM manager

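The following cell contains the corrected code for model initialization and quantization: `LlmManager` defines the common interface, and `HuggingFaceLlmManager` loads a Hugging Face model (optionally quantized) and runs constrained text generation.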

In [None]:
from abc import ABC, abstractmethod

import torch
import transformers
from transformers import AutoTokenizer, BitsAndBytesConfig
from transformers.generation import GenerationConfig

from openai import OpenAI
import os
import tiktoken


class LlmManager(ABC):
    """
    Abstract base class acting as the common "interface" of all LLM managers.

    It cannot be instantiated directly; concrete managers must override
    ``chat_completion``.
    """

    @abstractmethod
    def chat_completion(
        self,
        prompt,
        print_result=False,
        seed=42,
        max_new_tokens=128,
        do_sample=True,
        temperature=0.7,
        top_p=0.95,
        repetition_penalty=1.0,
    ):
        """
        Produce a completion for ``prompt``.

        The base implementation does nothing and returns ``None``; how each
        argument is used is decided by the concrete subclass.

        Args:
            prompt: Input handed to the model.
            print_result: Defaults to ``False``.
            seed: Defaults to ``42``.
            max_new_tokens: Defaults to ``128``.
            do_sample: Defaults to ``True``.
            temperature: Defaults to ``0.7``.
            top_p: Defaults to ``0.95``.
            repetition_penalty: Defaults to ``1.0``.
        """
        ...


class HuggingFaceLlmManager(LlmManager):
    """
    LLM manager backed by a Hugging Face ``text-generation`` pipeline.

    The tokenizer and model are loaded once in ``__init__`` (optionally with a
    bitsandbytes quantization config). ``chat_completion`` runs generation and
    can constrain decoding to a forced prefix and/or a fixed set of option
    strings via ``construct_constraint_fun``.
    """

    def __init__(
        self,
        model_name,
        cache_dir="/vol/bitbucket/clarg/argumentative-llm/cache",
        model_args=None,
        input_device="cuda:0",
        quantization="4bit",
    ):
        """
        Load the tokenizer and build the generation pipeline.

        Args:
            model_name: Model identifier passed to ``AutoTokenizer.from_pretrained``
                and ``transformers.pipeline``.
            cache_dir: Directory used as download cache for tokenizer and model.
            model_args: Accepted for interface compatibility; not used here.
            input_device: Stored on the instance as ``self.input_device``; not
                otherwise used by this class (the pipeline uses ``device_map="auto"``).
            quantization: One of ``"4bit"``, ``"8bit"`` or ``"none"``.

        Raises:
            ValueError: If ``quantization`` is not one of the accepted values.
        """
        super().__init__()

        if quantization == "4bit":
            quantization_config = BitsAndBytesConfig(
                load_in_4bit=True,
                bnb_4bit_compute_dtype=torch.bfloat16,
            )
        elif quantization == "8bit":
            quantization_config = BitsAndBytesConfig(
                load_in_8bit=True,
            )
        elif quantization == "none":
            quantization_config = None
        else:
            raise ValueError(f"Invalid quantization value {quantization}")

        self.tokenizer = AutoTokenizer.from_pretrained(model_name, cache_dir=cache_dir)
        self.pipeline = transformers.pipeline(
            "text-generation",
            model=model_name,
            # Reuse the tokenizer loaded above so it is fetched only once, honours
            # `cache_dir`, and is the same object used to build the constraints.
            tokenizer=self.tokenizer,
            device_map="auto",
            model_kwargs={
                "torch_dtype": torch.float16,
                "quantization_config": quantization_config,
                "cache_dir": cache_dir,
            },
        )
        self.input_device = input_device

    def chat_completion(
        self,
        message,
        print_result=False,
        seed=42,
        max_new_tokens=128,
        do_sample=True,
        temperature=0.7,
        top_p=0.95,
        repetition_penalty=1.0,
        constraint_prefix=None,
        constraint_options=None,
        constraint_end_after_options=False,
        trim_response=True,
        apply_template=True,
    ):
        """
        Generate a completion for ``message``.

        Args:
            message: User text. Wrapped as a single user turn and rendered with
                the tokenizer's chat template when ``apply_template`` is true,
                otherwise used verbatim as the prompt.
            print_result: Print the untrimmed pipeline output.
            seed: Passed to ``transformers.set_seed`` before generating.
            max_new_tokens, do_sample, temperature, top_p, repetition_penalty:
                Forwarded unchanged to the pipeline call.
            constraint_prefix: Text the model is forced to generate first.
            constraint_options: Strings the model must choose from after the prefix.
            constraint_end_after_options: Forwarded as ``end_after_options`` to
                ``construct_constraint_fun``.
            trim_response: Remove the echoed prompt from the result and strip
                surrounding whitespace.
            apply_template: Whether to apply the tokenizer's chat template.

        Returns:
            The ``generated_text`` of the first pipeline result, trimmed if requested.
        """
        transformers.set_seed(seed)

        if apply_template:
            messages = [{"role": "user", "content": message}]
            prompt = self.pipeline.tokenizer.apply_chat_template(
                messages, tokenize=False, add_generation_prompt=True
            )
        else:
            prompt = message

        if constraint_prefix is not None or constraint_options is not None:
            prefix_allowed_tokens_fn = self.construct_constraint_fun(
                prompt,
                force_prefix=constraint_prefix,
                force_options=constraint_options,
                end_after_options=constraint_end_after_options,
            )
        else:
            prefix_allowed_tokens_fn = None

        response = self.pipeline(
            prompt,
            max_new_tokens=max_new_tokens,
            do_sample=do_sample,
            temperature=temperature,
            top_p=top_p,
            repetition_penalty=repetition_penalty,
            prefix_allowed_tokens_fn=prefix_allowed_tokens_fn,
            pad_token_id=self.tokenizer.eos_token_id
        )[0]["generated_text"]

        if print_result:
            print(response, flush=True)

        if trim_response:
            if response.startswith(prompt):
                # Drop only the echoed prompt; a later repetition of the prompt
                # text inside the generated part must not be deleted.
                response = response[len(prompt):]
            else:
                # Fallback to the previous behaviour if the output does not
                # start with the prompt.
                response = response.replace(prompt, "")
            response = response.strip()

        return response

    def construct_constraint_fun(self, prompt, force_prefix=None, force_options=None, end_after_options=False):
        """
        Build a ``prefix_allowed_tokens_fn`` callback for constrained generation.

        Args:
            prompt: The exact prompt text that is passed to the pipeline.
            force_prefix: Optional text whose tokens are forced, one per step,
                before anything else is generated.
            force_options: Optional list of strings; after the prefix only token
                sequences spelling one of these options are allowed.
            end_after_options: If true, only EOS is allowed once no option can be
                continued; otherwise generation continues unconstrained.

        Returns:
            A function ``(batch_id, input_ids) -> list[int]`` returning the token
            ids allowed at the next step.
        """
        # NOTE(review): the prefix keeps the first token id returned by the tokenizer
        # (presumably BOS), while the options and the prompt length below drop it.
        # The two offsets appear to compensate for `input_ids` containing that
        # leading token during generation — confirm before changing either slice.
        if force_prefix is not None:
            force_prefix = self.tokenizer(force_prefix).input_ids
        if force_options is not None:
            force_options = [self.tokenizer(op).input_ids[1:] for op in force_options]

        # Loop-invariant values: computed once here instead of re-tokenizing the
        # (potentially long) prompt on every decoding step.
        prompt_len = len(self.tokenizer(prompt).input_ids[1:])
        prefix_len = 0 if force_prefix is None else len(force_prefix)

        def constraint_fun(batch_id, input_ids):
            # `batch_id` is unused; `input_ids` holds the token ids of the sequence so far.
            generated_tokens = input_ids[prompt_len:].tolist()
            num_generated = len(generated_tokens)

            if force_prefix is not None and num_generated < prefix_len:
                # Force prefix to be generated first if provided
                return [force_prefix[num_generated]]
            elif num_generated >= prefix_len and force_options is not None:
                # Determine what option tokens have been generated
                op_tokens = generated_tokens[prefix_len:]
                num_op = len(op_tokens)

                # Calculate valid option continuations
                possible_continuations = [
                    c[num_op] for c in force_options if num_op < len(c) and c[:num_op] == op_tokens
                ]

                if not possible_continuations and end_after_options:
                    # No further continuations — terminate generation as requested
                    return [self.tokenizer.eos_token_id]
                elif not possible_continuations:
                    # No further continuations, but can continue free generation
                    return list(range(self.tokenizer.vocab_size))
                else:
                    # Allow generation to terminate if a complete option is also
                    # a proper prefix of another option
                    if op_tokens in force_options:
                        possible_continuations.append(self.tokenizer.eos_token_id)
                    # Force generation according to options
                    return possible_continuations
            else:
                # Prefix done and no options requested: unconstrained generation
                return list(range(self.tokenizer.vocab_size))

        return constraint_fun



In [None]:
# Build the manager for the Mistral-7B-v0.1 checkpoint with 4-bit quantization.
llm_manager = HuggingFaceLlmManager(model_name="mistralai/Mistral-7B-v0.1", quantization="4bit")

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/996 [00:00<?, ?B/s]

tokenizer.model:   0%|          | 0.00/493k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.80M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/414 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/571 [00:00<?, ?B/s]

model.safetensors.index.json:   0%|          | 0.00/25.1k [00:00<?, ?B/s]

Fetching 2 files:   0%|          | 0/2 [00:00<?, ?it/s]

model-00001-of-00002.safetensors:   0%|          | 0.00/9.94G [00:00<?, ?B/s]

model-00002-of-00002.safetensors:   0%|          | 0.00/4.54G [00:00<?, ?B/s]

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

generation_config.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

Device set to use cuda:0


## Loading datasets

In [None]:
import os

# Root folder on the mounted Drive that holds one sub-directory per dataset.
path = "/content/drive/llms_argumentation master replication-Datasets"

# Keep only sub-directories; plain files in the root folder are ignored.
datasets_names = list(
    filter(lambda name: os.path.isdir(os.path.join(path, name)), os.listdir(path))
)
# "few_shot" holds the few-shot primers rather than a dataset, so drop it.
datasets_names.remove("few_shot")

In [None]:
from datasets import load_from_disk

# Position of the dataset to evaluate inside `datasets_names`.
# NOTE(review): `datasets_names` comes from os.listdir, whose order is not guaranteed,
# so the same index may select a different dataset on another machine — check the name shown below.
DATASET_NUMBER = 11
DATASET_NAME = datasets_names[DATASET_NUMBER]
ds = load_from_disk(f"{path}/{DATASET_NAME}")
DATASET_NAME

'WebDataset'

## Inference

In [None]:
import csv
def generate_prompt(arg1, arg2, nr=False, support_label="Support", attack_label="Attack", no_label="No", n_shot="0", primer=None, instruction=True, **_):
    """
    Build the relation-classification prompt for a pair of arguments.

    Args:
        arg1: Text inserted after ``Arg1:``.
        arg2: Text inserted after ``Arg2:``.
        nr: If true, ``no_label`` is offered as a third class in both the
            instructions and the generation constraints.
        support_label: Label text for the support relation.
        attack_label: Label text for the attack relation.
        no_label: Label text for the no-relation class.
        n_shot: ``"0"`` disables few-shot examples; any other value names the
            sub-directory of ``<path>/few_shot`` to read the primer file from
            (``path`` is a module-level variable).
        primer: File name of the few-shot examples: a ``#``-delimited file with
            ``|`` as quote character whose rows are ``arg1, arg2, label``
            (label ``'1'`` -> support, ``'2'`` -> no relation, anything else -> attack).
        instruction: If falsy, the task instructions are left out of the prompt.
        **_: Extra keyword arguments are accepted and ignored.

    Returns:
        A ``(prompt, constraints, formatter)`` tuple: the prompt text, a dict of
        ``constraint_*`` keyword arguments, and a function turning a model
        response into a class id.
    """
    def formatter(relation):
        """
        Map a model response to a class id.

        ``relation`` is either the response text or a ``(text, logits)`` tuple,
        where ``logits`` is an iterable of objects exposing ``.token`` and
        ``.logprob``. Returns 1 for support, 0 for attack, 2 for no relation and
        -1 when the response cannot be resolved.
        """
        logits = None
        if isinstance(relation, tuple):
            relation, logits = relation[0], relation[1]

        rel = relation.replace("Relation:", "").strip()
        if rel == no_label:
            return 2
        if rel == support_label:
            return 1
        if rel == attack_label:
            return 0

        # The text is not one of the labels: fall back to the label with the
        # strictly highest log-probability, if log-probabilities were supplied.
        if logits:
            logprobs = {token.token: token.logprob for token in logits}
            missing = -float('inf')
            support = logprobs.get(support_label, missing)
            attack = logprobs.get(attack_label, missing)
            # Previously this value was written to `support`, so a "No"
            # prediction was reported as support.
            no = logprobs.get(no_label, missing)

            if support > attack and support > no:
                return 1
            elif attack > support and attack > no:
                return 0
            elif no > attack and no > support:
                return 2
        return -1

    constraints = {
        "constraint_prefix": "Relation:",
        "constraint_options": [support_label, attack_label] + ([no_label] if nr else []),
        "constraint_end_after_options": True,
    }

    if not instruction:
        instructions = ""
    else:
        if not nr:
            classes_sentence = f"the relation between them as either “{support_label}”, or “{attack_label}” based on the "
            closing = "\n"
        else:
            classes_sentence = f"the relation between them as either “{support_label}”, “{attack_label}”, or “{no_label}” based on the "
            closing = (f"'{no_label}': It is an argument that has no relation "
                       f"to the parent argument.\n")
        # The wording is kept verbatim: it is part of the prompt sent to the model.
        instructions = (
            f"In this task, you will be given two arguments and your goal is to classify "
            + classes_sentence
            + f"definitions below.\n'{support_label}': It is an argument that is in favour of to the parent "
              f"argument.\n'{attack_label}': It is an argument that contradicts or opposes the parent "
              f"argument.\n"
            + closing
        )

    if n_shot != "0":
        # Context manager so the primer file is closed once it has been read.
        with open(path + f"/few_shot/{n_shot}/{primer}") as file:
            reader = csv.reader(file, delimiter='#', quotechar='|', quoting=csv.QUOTE_MINIMAL)
            primer = ""
            for row in reader:
                primer += (f"Arg1: {row[0]}\nArg2: {row[1]}\nRelation: "
                           f"{(support_label if row[2] == '1' else attack_label) if row[2] != '2' else no_label}\n\n")
    else:
        primer = ""

    prompt = instructions + primer + f"Arg1: {arg1}\nArg2: {arg2}"

    return prompt, constraints, formatter

In [None]:
# Few-shot parameters from the paper.
# SEED selects the primer file and is also part of the results file name.
SEED = 2
# Name of the few-shot configuration (sub-directory of the few_shot folder);
# presumably 2 attack + 2 support examples — confirm against the data.
N_SHOT = "2A2S"
PRIMER = "seed_{}.csv".format(SEED)

In [None]:
# Generation parameters from the paper, forwarded as keyword arguments to chat_completion.
# NOTE(review): with do_sample=False, temperature and top_p presumably have no
# effect on the output — confirm against the transformers generation docs.
generation_args = dict(
    temperature=0.7,
    max_new_tokens=10,
    top_p=1,
    do_sample=False,
)

In [None]:
# main loop: computes the predicted class for every example of the selected dataset
# and stores one "prediction,ground_truth" row per example
from tqdm import tqdm

filename = f"{DATASET_NAME}_{N_SHOT}_{SEED}_constrained.txt"

# The file is opened once in write mode: re-running this cell replaces earlier
# results instead of appending duplicate rows, which would distort the scores.
with open(filename, "w") as f:
    for data in tqdm(ds):
        generated_prompt, constraints, format_fun = generate_prompt(
            data["arg1"], data["arg2"], n_shot=N_SHOT, instruction=False, nr=False,
            primer=PRIMER
        )
        output = llm_manager.chat_completion(
            generated_prompt,
            print_result=False,
            trim_response=True,
            apply_template=False,
            **constraints,
            **generation_args,
        )
        prediction = format_fun(output)
        ground_truth = data['support']

        f.write(f"{prediction},{ground_truth}\n")
        # Flush after every row so partial results survive an interrupted run.
        f.flush()

In [None]:
# optional: downloading all predictions
# (sends the results file named by `filename` to the browser; Colab only)
from google.colab import files
files.download(filename)

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [None]:
# scores computation
from sklearn.metrics import f1_score

ground_truth_labels = []
predicted_labels = []

filename = f"{DATASET_NAME}_{N_SHOT}_{SEED}_constrained.txt"

# Each row of the results file is "prediction,ground_truth".
with open(filename, 'r') as f:
    for line in f:
        line = line.strip()
        if not line:
            # Skip blank lines (e.g. a trailing newline) instead of failing on int("").
            continue
        pred, gt = map(int, line.split(','))
        ground_truth_labels.append(gt)
        predicted_labels.append(pred)

# Class ids written by the formatter; -1 marks a response that could not be resolved.
label_names = {-1: "Invalid", 0: "Attack", 1: "Support", 2: "No"}

# Labels that actually occur, in the sorted order f1_score uses by default.
# Passing them explicitly keeps the per-class array and the printed legend aligned
# even when an invalid (-1) prediction is present or a class is missing.
present_labels = sorted(set(ground_truth_labels + predicted_labels))

f1_per_class = f1_score(ground_truth_labels, predicted_labels, labels=present_labels, average=None)

# NOTE(review): if -1 predictions occur they count as a class of their own in the
# macro average — confirm this is the intended treatment of invalid outputs.
f1_macro = f1_score(ground_truth_labels, predicted_labels, average='macro')
f1_weighted = f1_score(ground_truth_labels, predicted_labels, average='weighted')

legend = ", ".join(f"{label}: {label_names.get(label, 'Unknown')}" for label in present_labels)
print(f"F1 Score per class ({legend}): {f1_per_class}")
print(f"Macro F1 Score: {f1_macro}")
print(f"Weighted F1 Score: {f1_weighted}")


F1 Score per class (0: Attack, 1: Support): [0.7012987  0.56152513]
Macro F1 Score: 0.6314119156406852
Weighted F1 Score: 0.6257188909662962
