In [0]:
!pip install transformers==4.31.0 datasets==2.15.0 trl==0.7.10 peft==0.7.1 bitsandbytes==0.42.0 flash-attn==2.5.0

In [0]:
import os
import time
import torch
import pickle
import datasets
import warnings
import numpy as np
import pandas as pd
from tqdm import tqdm
from trl import SFTTrainer
from datasets import Dataset, load_dataset
from peft import LoraConfig
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    BitsAndBytesConfig,
    TrainingArguments,
)

from peft import (
    PeftConfig,
    PeftModel,
)

from sklearn.metrics import (
    accuracy_score,
    f1_score,
    precision_score,
    recall_score,
)

In [0]:
# --- Experiment configuration -------------------------------------------------
# Base checkpoint that is quantized and fine-tuned with LoRA further down.
pretrained_ckpt = "tiiuae/falcon-7b"

# Legal act that is excluded from the training rows and used as the test rows
# (see get_train_nli_data / get_test_nli_data).
legal_type = "consumer_protection"

# Hyper-parameters; they are also encoded into the output directory name.
experiment_run = 1
lora_r = 64
epochs = 20
dropout = 0.25

# One directory per (legal act, epochs, rank, dropout, run) combination.
results_dir = (
    "experiments/"
    f"falcon_nli-{legal_type}"
    f"_epochs-{epochs}"
    f"_rank-{lora_r}"
    f"_dropout-{dropout}"
    f"_expRun-{experiment_run}"
)

In [0]:
# Both templates share the same prefix; the training template additionally
# carries the gold label, while the inference template stops right after
# "###Label:" so the model has to produce the label itself.
_PROMPT_PREFIX = "###Premise:{premise} ###Hypothesis:{hypothesis} ###Label:"
TRAINING_PROMPT = _PROMPT_PREFIX + "{label}"
INFERENCE_PROMPT = _PROMPT_PREFIX

In [0]:
def get_train_nli_data(legal_type: str) -> pd.DataFrame:
    """Return the training rows: every example NOT belonging to ``legal_type``.

    Loads the "train" split of ``darrow-ai/LegalLensNLI`` and drops the rows
    whose ``legal_act`` equals ``legal_type``.

    Args:
        legal_type: value of the ``legal_act`` column to hold out.

    Returns:
        The remaining rows as a pandas DataFrame.
    """

    def _is_other_act(example):
        return example["legal_act"] != legal_type

    train_split = datasets.load_dataset("darrow-ai/LegalLensNLI")["train"]
    return train_split.filter(_is_other_act).to_pandas()


def get_test_nli_data(legal_type: str) -> pd.DataFrame:
    """Return the held-out rows: only the examples belonging to ``legal_type``.

    Loads the "train" split of ``darrow-ai/LegalLensNLI`` and keeps the rows
    whose ``legal_act`` equals ``legal_type``.

    Args:
        legal_type: value of the ``legal_act`` column to select.

    Returns:
        The selected rows as a pandas DataFrame.
    """

    def _is_held_out_act(example):
        return example["legal_act"] == legal_type

    train_split = datasets.load_dataset("darrow-ai/LegalLensNLI")["train"]
    return train_split.filter(_is_held_out_act).to_pandas()


def prepare_instruction(premise, hypothesis, label, is_train=False):
    """Render one example into a prompt string.

    Args:
        premise: premise text substituted into the template.
        hypothesis: hypothesis text substituted into the template.
        label: gold label; only used when ``is_train`` is true.
        is_train: when true, fill ``TRAINING_PROMPT`` (label included);
            otherwise fill ``INFERENCE_PROMPT`` (label left out).

    Returns:
        The formatted prompt.
    """
    # Inference prompts never see the label.
    if not is_train:
        return INFERENCE_PROMPT.format(premise=premise, hypothesis=hypothesis)

    return TRAINING_PROMPT.format(premise=premise, hypothesis=hypothesis, label=label)


def get_instructions(df, is_train=False):
    """Build one prompt per row of ``df``.

    Args:
        df: DataFrame with ``premise``, ``hypothesis`` and ``label`` columns.
        is_train: forwarded to ``prepare_instruction``; selects the training
            template (with label) or the inference template (without).

    Returns:
        A list of prompt strings in row order.
    """
    return [
        prepare_instruction(
            record["premise"],
            record["hypothesis"],
            record["label"],
            is_train=is_train,
        )
        for _, record in df.iterrows()
    ]


def predict_one_sample(prompt, model, tokenizer, max_new_tokens=250):
    """Generate the model's continuation for a single prompt.

    Args:
        prompt: the full prompt text fed to the model.
        model: a causal LM exposing ``generate`` and ``device``.
        tokenizer: the tokenizer matching ``model``.
        max_new_tokens: upper bound on the number of generated tokens.

    Returns:
        The decoded text of the newly generated tokens only (the prompt is
        not included), with special tokens removed.
    """
    encoded = tokenizer(prompt, return_tensors="pt", truncation=True)
    # Follow the model's device instead of assuming CUDA.
    device = model.device
    input_ids = encoded.input_ids.to(device)
    attention_mask = encoded.attention_mask.to(device)

    with torch.inference_mode():
        generated = model.generate(
            input_ids=input_ids,
            attention_mask=attention_mask,
            max_new_tokens=max_new_tokens,
            # Greedy decoding: deterministic, and avoids dividing the logits by
            # a near-zero temperature, which can overflow in half precision.
            do_sample=False,
            pad_token_id=tokenizer.pad_token_id,
        )
        # Cut the prompt off in token space. Slicing the decoded string by
        # len(prompt) is wrong whenever decode(encode(prompt)) != prompt or the
        # prompt was truncated by the tokenizer.
        new_token_ids = generated[0, input_ids.shape[1]:]
        output = tokenizer.decode(new_token_ids, skip_special_tokens=True)

    return output

In [0]:
# Split the corpus by legal act: everything except `legal_type` is used for
# training, `legal_type` itself is held out.
df_train = get_train_nli_data(legal_type=legal_type)
df_test = get_test_nli_data(legal_type=legal_type)

# Training prompts contain the gold label, test prompts stop after "###Label:".
train_instructions = get_instructions(df_train, is_train=True)
test_instructions = get_instructions(df_test, is_train=False)

# Wrap the prompts in single-column datasets; the column name has to match the
# `dataset_text_field` given to the trainer.
train_dataset = Dataset.from_pandas(pd.DataFrame({"instructions": train_instructions}))
test_dataset = Dataset.from_pandas(pd.DataFrame({"instructions": test_instructions}))

In [0]:
# Build everything the fine-tuning run needs: tokenizer, LoRA config, 4-bit
# quantized base model, training arguments and the SFT trainer.

# NOTE(review): this flag is not read anywhere in the visible cells.
use_flash_attention = False

tokenizer = AutoTokenizer.from_pretrained(pretrained_ckpt)
# Reuse the EOS token for padding (presumably because the checkpoint's
# tokenizer ships without a pad token - confirm).
tokenizer.pad_token = tokenizer.eos_token

print("Getting PEFT method")

# LoRA adapter configuration; rank and dropout come from the config cell.
peft_config = LoraConfig(
    task_type="CAUSAL_LM",
    lora_alpha=32,
    r=lora_r,
    lora_dropout=dropout,
    target_modules=["query_key_value", "dense", "dense_h_to_4h", "dense_4h_to_h"],
)

# 4-bit NF4 quantization with double quantization and fp16 compute.
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.float16,
    bnb_4bit_use_double_quant=True,
)

# NOTE(review): alias of pretrained_ckpt; not used in the visible cells.
model_name_or_path = pretrained_ckpt
# Load the quantized base model; the device map sends the root module ("") to
# device index 0.
model = AutoModelForCausalLM.from_pretrained(
    pretrained_ckpt,
    quantization_config=bnb_config,
    trust_remote_code=True,
    device_map={"": 0},
)
# Turn the generation cache off for the training run.
model.config.use_cache = False

# Training arguments. Per-device batch size 1 with 4 accumulation steps gives
# 4 sequences per optimizer step per device. Logs and checkpoints go under
# results_dir.
# NOTE(review): lr_scheduler_type="constant" combined with warmup_ratio=0.03 -
# the plain constant schedule presumably ignores warmup; confirm whether
# "constant_with_warmup" was intended.
training_args = TrainingArguments(
    logging_steps=100,
    report_to="none",
    per_device_train_batch_size=1,
    gradient_accumulation_steps=4,
    per_device_eval_batch_size=8,
    output_dir=results_dir,
    learning_rate=2e-4,
    num_train_epochs=epochs,
    logging_dir=f"{results_dir}/logs",
    fp16=True,
    optim="paged_adamw_32bit",
    lr_scheduler_type="constant",
    max_grad_norm=0.3,
    warmup_ratio=0.03,
)

print(f"training_args = {training_args}")
# Supervised fine-tuning on the "instructions" text column, with packing
# enabled and a maximum sequence length of 512.
# NOTE(review): eval_dataset holds the inference prompts, which were built
# without labels, and no evaluation strategy is set in training_args above -
# confirm whether an evaluation pass is intended at all.
trainer = SFTTrainer(
    model=model,
    args=training_args,
    peft_config=peft_config,
    tokenizer=tokenizer,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
    max_seq_length=512,
    dataset_text_field="instructions",
    packing=True,
)

# Run the fine-tuning and report the training loss taken from the train output.
trainer_stats = trainer.train()
train_loss = trainer_stats.training_loss
print(f"Training loss:{train_loss}")

# Store the trained adapter and the tokenizer side by side so the inference
# cell can reload both from one directory.
peft_model_id = f"{results_dir}/assets"
trainer.model.save_pretrained(peft_model_id)
tokenizer.save_pretrained(peft_model_id)

# Persist the run's hyper-parameters together with the training loss.
# Order of the list: epochs, LoRA rank, dropout, training loss.
run_result = [epochs, lora_r, dropout, train_loss]
with open(f"{results_dir}/results.pkl", "wb") as handle:
    pickle.dump(run_result, handle)

print("Experiment over")

In [0]:
# Reload the base checkpoint in 4-bit and attach the LoRA adapter that the
# training cell saved under <results_dir>/assets.
peft_model_id = os.path.join(results_dir, "assets")

# The adapter config records which base checkpoint the adapter belongs to.
config = PeftConfig.from_pretrained(peft_model_id)

# Use the same quantization settings as the training cell (including double
# quantization): the adapter weights were fitted on top of that exact
# quantized base, so evaluating on a differently quantized base would shift
# the effective weights.
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.float16,
    bnb_4bit_use_double_quant=True,
)

# Explicit device placement, matching the training cell.
model = AutoModelForCausalLM.from_pretrained(
    config.base_model_name_or_path,
    quantization_config=bnb_config,
    trust_remote_code=True,
    device_map={"": 0},
)
model = PeftModel.from_pretrained(model, peft_model_id)
model.eval()

tokenizer = AutoTokenizer.from_pretrained(config.base_model_name_or_path)
# Same padding convention as during training.
tokenizer.pad_token = tokenizer.eos_token

# Zero-shot inference with the fine-tuned model: every test prompt ends at
# "###Label:" (no exemplars are included) and the model's continuation is
# taken as its prediction.
responses = []
labels = df_test["label"].to_list()
if not labels:
    raise ValueError(f"No test examples found for legal_type={legal_type!r}")

save_dir = os.path.join(results_dir, "inference")
os.makedirs(save_dir, exist_ok=True)

for prompt in tqdm(test_instructions):
    response = predict_one_sample(prompt, model, tokenizer)
    responses.append(response)


# Placeholder class for continuations that do not start with any known label.
UNPARSED_LABEL = "__unparsed__"


def _normalize_response(response, known_labels):
    """Map a raw generated continuation onto one of ``known_labels``.

    The continuation may carry surrounding whitespace or extra text after the
    label, so it is matched case-insensitively by prefix. Longer labels are
    tried first so a label that is a prefix of another one cannot shadow it.

    Returns:
        The matching label, or ``UNPARSED_LABEL`` when nothing matches.
    """
    text = str(response).strip().lower()
    for candidate in sorted(known_labels, key=len, reverse=True):
        if text.startswith(candidate.lower()):
            return candidate
    return UNPARSED_LABEL


# Compare like with like: gold labels as strings, predictions reduced to a
# label. Comparing the raw continuation would count any response with extra
# whitespace or trailing text as wrong and would add spurious classes to the
# macro averages.
y_true = [str(label) for label in labels]
known_labels = sorted(set(y_true))
predictions = [_normalize_response(raw, known_labels) for raw in responses]

unparsed_count = predictions.count(UNPARSED_LABEL)
if unparsed_count:
    print(f"{unparsed_count}/{len(predictions)} responses did not start with a known label")

# Average over the gold label set only; unparsed predictions still count as
# errors for the true class. zero_division=0 scores a class that was never
# predicted as 0 instead of emitting a warning.
score_kwargs = {"labels": known_labels, "zero_division": 0}
metrics = {
    "micro_f1": f1_score(y_true, predictions, average="micro", **score_kwargs),
    "macro_f1": f1_score(y_true, predictions, average="macro", **score_kwargs),
    "micro_precision": precision_score(y_true, predictions, average="micro", **score_kwargs),
    "micro_recall": recall_score(y_true, predictions, average="micro", **score_kwargs),
    "macro_precision": precision_score(y_true, predictions, average="macro", **score_kwargs),
    "macro_recall": recall_score(y_true, predictions, average="macro", **score_kwargs),
    "accuracy": accuracy_score(y_true, predictions),
}
print(metrics)