In [1]:
import numpy as np
import pandas
import torch
from datasets import Dataset
from peft import TaskType, LoraConfig, get_peft_model
from sklearn.metrics import precision_score, accuracy_score, recall_score, jaccard_score, hamming_loss
from transformers import AutoModelForSequenceClassification, AutoTokenizer
from transformers import Trainer, TrainingArguments, EvalPrediction

MAX_LINES = 15

In [2]:
# loftq_config = LoftQConfig(loftq_bits=8)
peft_config = LoraConfig(task_type=TaskType.SEQ_CLS,
                         # init_lora_weights="loftq",
                         # loftq_config=loftq_config,
                         # target_modules=["k_proj", "o_proj", "q_proj", "v_proj"],
                         modules_to_save=["score"],
                         r=8)

model = AutoModelForSequenceClassification.from_pretrained("google/gemma-2b-it", num_labels=MAX_LINES+1, torch_dtype=torch.bfloat16, problem_type="multi_label_classification")
model = get_peft_model(model, peft_config)
tokenizer = AutoTokenizer.from_pretrained("google/gemma-2b-it")

model.print_trainable_parameters()

In [3]:
correct_programs = {
    "A" : """color(1..k).
node(N) :- e(_, N).
node(N) :- e(N, _).
1 { assign(N,C) : color(C) } 1 :- node(N).
:- e(N,M), assign(N,C), assign(M,C).""",
    "B" : """s(X) :- e(X,E).
k { sel(X) : s(X) } k.
inter(X, Y) :- e(X, E), e(Y, E), X != Y.
:- inter(X, Y), sel(X), sel(Y).""",
    "C": """v(X) :- e(X, _).
s(X) :- e(_, X).
0 { sel(X) : s(X) } k.
cov(X) :- v(X), e(X, S), sel(S).
:- not cov(X), v(X).""",
    "D": """vx(X) :- e(X,Y).
vx(X) :- e(Y,X).
0 { sel(X) : vx(X) } k.
:- not sel(X), not sel(Y), e(X,Y).""",
    "E" : """1 { set(X, a) ; set (X, b) } 1 :- vertex(X).
:- edge(X, Y), set(X, S), set(Y, S)."""
}

def prompt(instance):
    # return instance["incorrect_program"]
    correct = correct_programs[instance["instance"][11]]
    incorrect = "\n".join(map(lambda x: f"<{x[0]}>{x[1]}", enumerate(instance["incorrect_program"].splitlines())))
    prompt = f"<correct>{correct}\n<incorrect>{incorrect}"
    return prompt

In [4]:
df = pandas.read_feather("dataset_3.feather")
ds = Dataset.from_pandas(df)
ds = ds.train_test_split(test_size=0.1, seed=42)
ds = ds.map(lambda instance: {"text": prompt(instance), "labels": [1 if instance["missing_lines"] else 0] + list(instance["line_scores"]) + [0] * max(0, MAX_LINES - len(instance["line_scores"]))}, batched=False)
ds = ds.map(lambda examples: tokenizer(examples["text"], padding=True, truncation=True, return_tensors="pt"), batched=True)
ds.set_format(type="torch", columns=["input_ids", "attention_mask", "labels"], device="cuda")

In [5]:
def multi_label_metrics(predictions, labels, threshold=0.5):
    # first, apply sigmoid on predictions which are of shape (batch_size, num_labels)
    sigmoid = torch.nn.Sigmoid()
    probs = sigmoid(torch.Tensor(predictions))
    # next, use threshold to turn them into integer predictions
    y_pred = np.zeros(probs.shape)
    y_pred[np.where(probs >= threshold)] = 1
    # finally, compute metrics
    y_true = labels
    accuracy = accuracy_score(y_true, y_pred)
    precision = precision_score(y_true=y_true, y_pred=y_pred, average='micro')
    recall = recall_score(y_true=y_true, y_pred=y_pred, average='micro')
    jaccard = jaccard_score(y_true, y_pred, average='micro')
    hamming = hamming_loss(y_true, y_pred)
    # return as dictionary
    metrics = {'accuracy': accuracy,
               'precision': precision,
               'recall': recall,
               'jaccard': jaccard,
              'hamming': hamming}
    return metrics


def compute_metrics(p: EvalPrediction):
    preds = p.predictions[0] if isinstance(p.predictions, tuple) else p.predictions
    result = multi_label_metrics(predictions=preds, labels=p.label_ids)
    return result


trainer = Trainer(
    model=model,
    args=TrainingArguments(
        output_dir="./",
        remove_unused_columns=False,
        learning_rate=1e-4,
        per_device_train_batch_size=4,
        per_device_eval_batch_size=4,
        num_train_epochs=2,
        evaluation_strategy="steps",
        save_strategy="steps",
        eval_steps=500,
        load_best_model_at_end=True
    ),
    train_dataset=ds["train"],
    eval_dataset=ds["test"],
    tokenizer=tokenizer,
    compute_metrics=compute_metrics
)

In [6]:
#trainer.evaluate()

In [7]:
trainer.train()

In [None]:
def predict(predictions, threshold=0.5):
    sigmoid = torch.nn.Sigmoid()
    probs = sigmoid(torch.Tensor(predictions))
    # next, use threshold to turn them into integer predictions
    y_pred = np.zeros(probs.shape)
    y_pred[np.where(probs >= threshold)] = 1
    return y_pred


text = """color((1..k)).
node(N) :- e(N, _).
 :- e(N, M), assign(N, C), assign(M, C).
 1 { assign(N, C) : color(C) } 1 :- node(N)."""

inputs = tokenizer(text, return_tensors="pt").to("cuda")
output = model(**inputs)
predict(output[0].cpu()[0])

In [None]:
#model.save_pretrained("lora_it_full_datasetv2_promptv1")