In [48]:
!pip install seqeval

Collecting seqeval
  Downloading seqeval-1.2.2.tar.gz (43 kB)
  Preparing metadata (setup.py) ... [?25done
Collecting scikit-learn>=0.21.3 (from seqeval)
  Downloading scikit_learn-1.6.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (18 kB)
Collecting scipy>=1.6.0 (from scikit-learn>=0.21.3->seqeval)
  Downloading scipy-1.15.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (61 kB)
Collecting joblib>=1.2.0 (from scikit-learn>=0.21.3->seqeval)
  Downloading joblib-1.4.2-py3-none-any.whl.metadata (5.4 kB)
Collecting threadpoolctl>=3.1.0 (from scikit-learn>=0.21.3->seqeval)
  Downloading threadpoolctl-3.5.0-py3-none-any.whl.metadata (13 kB)
Downloading scikit_learn-1.6.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (13.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.1/13.1 MB[0m [31m24.9 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
Downloading joblib-1.4.2-py3-none-any.whl (301 kB)
Downloading scipy-1.15.

In [None]:
import os
import torch
import transformers
import peft
import datasets
import evaluate
import time
assert torch.cuda.is_available(), "you need cuda for this part"
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')


  from .autonotebook import tqdm as notebook_tqdm


In [None]:
#device = 'cpu'

In [None]:
label_list = ["O","B","I"]
id2label = {
    0: "O",
    1: "B",
    2: "I",
}

label2id = {
    "O": 0,
    "B": 1,
    "I": 2,
}

In [None]:
dataset = datasets.load_dataset("hodza/BlackBox.Shkola.2014")['train']

In [None]:
dataset

In [None]:


lines = [[line, "/Mod/" in fname]  for text, fname in zip(dataset['texts'], dataset['names']) for line in text.split('\n') if len(line) > 10 and not line.isspace()]

is_code = [is_code for l, is_code in lines]
lines = [l for l, is_code in lines]

dataset = datasets.Dataset.from_dict({"texts" : lines, "is_code" : is_code})

In [None]:
def labelize(sample):
    tkns = sample['texts'].split()
    if sample['is_code']:
        labels = [label2id['I'] for x in tkns]
        labels[0] = label2id['B']
    else:
        labels = [label2id['O'] for x in tkns]
    return {"tokens" :tkns, "tags" : labels}


dataset = dataset
dataset = dataset.map(labelize)
dataset = dataset.train_test_split()

In [None]:
print(dataset['train'][140]['texts'], dataset['train'][140]['tags'])

In [None]:
model_name = "distilbert/distilbert-base-cased"
tokenizer = transformers.AutoTokenizer.from_pretrained(model_name)


In [None]:
model = transformers.AutoModelForTokenClassification.from_pretrained(model_name, device_map=device, num_labels=3, id2label=id2label, label2id=label2id)

In [None]:
data_collator = transformers.DataCollatorForTokenClassification(tokenizer=tokenizer)

In [None]:
import evaluate

seqeval = evaluate.load("seqeval")

In [None]:
import numpy as np
#labels = [label_list[i] for i in dataset[f"tags"]]
def compute_metrics(p):
    predictions, labels = p
    predictions = np.argmax(predictions, axis=2)
    true_predictions = [
        [label_list[p] for (p, l) in zip(prediction, label) if l != -100]
        for prediction, label in zip(predictions, labels)
    ]
    true_labels = [
        [label_list[l] for (p, l) in zip(prediction, label) if l != -100]
        for prediction, label in zip(predictions, labels)
    ]
    results = seqeval.compute(predictions=true_predictions, references=true_labels)
    return {
        "precision": results["overall_precision"],
        "recall": results["overall_recall"],
        "f1": results["overall_f1"],
        "accuracy": results["overall_accuracy"],
    }

In [None]:
def tokenize_and_align_labels(examples):
    tokenized_inputs = tokenizer(examples["tokens"], truncation=True, is_split_into_words=True)
    labels = []
    for i, label in enumerate(examples["tags"]):
        word_ids = tokenized_inputs.word_ids(batch_index=i)  # Map tokens to their respective word.
        previous_word_idx = None
        label_ids = []
        for word_idx in word_ids:  # Set the special tokens to -100.
            if word_idx is None:
                label_ids.append(-100)
            elif word_idx != previous_word_idx:  # Only label the first token of a given word.
                label_ids.append(label[word_idx])
            else:
                label_ids.append(-100)
            previous_word_idx = word_idx
        labels.append(label_ids)
    tokenized_inputs["labels"] = labels
    return tokenized_inputs

In [None]:
tokenized_dataset = dataset.map(tokenize_and_align_labels, batched=True)

In [20]:
training_args = transformers.TrainingArguments(
    output_dir="./results/code_model",
    learning_rate=2e-5,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    num_train_epochs=2,
    weight_decay=0.01,
    eval_strategy="epoch",
    save_strategy="epoch",
    gradient_accumulation_steps = 1,
    load_best_model_at_end=True,
    push_to_hub=False,
 
)

trainer = transformers.Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_dataset["train"],
    eval_dataset=tokenized_dataset["test"],
    processing_class=tokenizer,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    
)

trainer.train()

Epoch,Training Loss,Validation Loss,Precision,Recall,F1,Accuracy
1,No log,0.089645,0.895833,0.945055,0.919786,0.976318
2,No log,0.10254,0.878788,0.956044,0.915789,0.976737


TrainOutput(global_step=100, training_loss=0.06323957920074463, metrics={'train_runtime': 48.3513, 'train_samples_per_second': 32.678, 'train_steps_per_second': 2.068, 'total_flos': 206435630592000.0, 'train_loss': 0.06323957920074463, 'epoch': 2.0})

In [47]:
test = """
MODULE  AAA;
IMPORT StdLog;
PROCEDURE AAA;
BEGIN
END AAA;
END AAA.
"""

In [59]:
words = test.split()

inputs = tokenizer(words, return_tensors="pt", truncation=True, is_split_into_words=True).to(device)
word_ids = inputs.word_ids(batch_index=0)

with torch.no_grad():
    logits = model(**inputs).logits
predictions = torch.argmax(logits, dim=2)
predicted_token_class = [model.config.id2label[t.item()] for t in predictions[0]]

tags = []
prev_word_id = None
for wid, tag in zip(word_ids, predicted_token_class):
    if wid != prev_word_id:
        if prev_word_id is not None:
            print(prev_word_id, words[prev_word_id], tags)
        prev_word_id = wid
        tags = []
    else:
        tags.append(tag)
        


0 MODULE ['O', 'O', 'O']
1 AAA; ['O']
2 IMPORT ['O', 'O', 'O']
3 StdLog; ['O', 'O', 'O', 'O']
4 PROCEDURE ['O', 'O', 'O', 'O']
5 AAA; ['O']
6 BEGIN ['O', 'O']
7 END ['O']
8 AAA; ['O']
9 END ['O']
10 AAA. ['O']
