# core

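> Utilities for labeling a sample of a dataset with an LLM labeler, training a classifier on those labels, and filtering the full dataset with the trained classifier.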

In [None]:
# | default_exp core

In [None]:
# | export
import evaluate
import random
import time

import numpy as np

from transformers import (
    AutoModelForSequenceClassification,
    AutoTokenizer,
    DataCollatorWithPadding,
    Trainer,
)

  from .autonotebook import tqdm as notebook_tqdm


In [None]:
# | hide
from nbdev.showdoc import *

In [None]:
# | export

def classify(
    x,
    labels,
    llm_labeler,
    max_failures=5,
    default_label=0,
    max_initial_sleep=5,
    retry_sleep=1,
):
    """
    Classifies a single text with a labeler, retrying when the labeler fails.

    Args:
        x (str): Text to classify
        labels (List[str]): List of labels; the result is an index into this list
        llm_labeler (Callable): Called as `llm_labeler(x)`; the first element of its result is looked up in `labels`
        max_failures (int): Number of failed attempts after which `default_label` is returned
        default_label (int): Value returned when no attempt succeeded
        max_initial_sleep (int): Inclusive upper bound, in whole seconds, of the random delay before the first attempt
        retry_sleep (float): Seconds to sleep after every attempt, successful or not

    Returns:
        int: Index of the predicted label in `labels`, or `default_label` if every attempt failed
    """
    # Random delay before the first attempt to avoid rate limiting
    time.sleep(random.randint(0, max_initial_sleep))

    failures = 0
    while failures < max_failures:
        try:
            label = labels.index(llm_labeler(x)[0])
        except Exception as e:
            # Deliberate best effort: any labeler error, or a prediction that is
            # not in `labels`, counts as one failed attempt and is retried.
            failures += 1
            print(e)
            time.sleep(retry_sleep)
        else:
            time.sleep(retry_sleep)
            return label

    # Always fall back to the default, including when `max_failures` <= 0 and
    # no attempt was made at all (previously this path could return None).
    return default_label

In [None]:
# | export
def label_dataset(
    dataset, text_column, labeler_model, labels, sample=0.1, num_workers=4, max_chars=4_096
):
    """
    Labels a random sample of a dataset using a labeler model.

    Args:
        dataset (datasets.Dataset): Dataset to draw the sample from
        text_column (str): Name of the column containing the text to classify
        labeler_model (Any): Model handed to `classify` to label each text
        labels (List[str]): List of labels
        sample (float): The fraction of the dataset to label
        num_workers (int): Number of processes used for labeling
        max_chars (int): Maximum number of characters to truncate the text to before labeling (reduces rate limiting errors)

    Returns:
        The sampled subset with an added "label" column holding the value returned by `classify`
    """

    def _label_row(row):
        # Truncate before labeling so overly long texts are never sent to the labeler
        truncated = row[text_column][:max_chars]
        return {"label": classify(truncated, labels, labeler_model)}

    # Fixed seed keeps the sampled rows reproducible between runs
    shuffled = dataset.shuffle(seed=115)
    sample_size = int(len(dataset) * sample)
    sampled = shuffled.select(range(sample_size))

    # One row at a time, spread over `num_workers` processes
    return sampled.map(_label_row, batched=False, num_proc=num_workers)

In [None]:
from functools import partial
from datasets import load_dataset


def mock_labeler(x, labels):
    """
    Stand-in labeler that ignores the text and draws a random label.

    The first label is drawn with probability 0.25 and the second with 0.75,
    so `labels` must contain exactly two entries. The result is wrapped in a
    one-element list.
    """
    weights = [0.25, 0.75]
    drawn = np.random.choice(labels, p=weights)
    return [drawn]


# Two-class label set; `mock_labeler` hard-codes two probabilities, so it needs exactly two labels.
labels = ["positive", "negative"]
# Bind the label list so the labeler can be called with the text alone, as `classify` does.
labeler = partial(mock_labeler, labels=labels)
# Train split of the `data/python` directory of bigcode/the-stack-smol.
ds = load_dataset("bigcode/the-stack-smol", data_dir="data/python")["train"]

# Label 10% of the dataset with the mock labeler.
subset = label_dataset(ds, "content", labeler, labels, sample=0.1)

# The labeled sample must carry the new "label" column.
assert "label" in subset.column_names

Using custom data configuration bigcode--the-stack-smol-8f8055c3a4e4b4e3
Found cached dataset json (/admin/home-nathan/.cache/huggingface/datasets/bigcode___json/bigcode--the-stack-smol-8f8055c3a4e4b4e3/0.0.0/e6070c77f18f01a5ad4551a8b7edfba20b8438b7cad4d94e6ad9378022ce4aab)
100%|██████████| 1/1 [00:00<00:00,  2.87it/s]
Loading cached shuffled indices for dataset at /admin/home-nathan/.cache/huggingface/datasets/bigcode___json/bigcode--the-stack-smol-8f8055c3a4e4b4e3/0.0.0/e6070c77f18f01a5ad4551a8b7edfba20b8438b7cad4d94e6ad9378022ce4aab/cache-feaf44b92e145e5a.arrow
Loading cached processed dataset at /admin/home-nathan/.cache/huggingface/datasets/bigcode___json/bigcode--the-stack-smol-8f8055c3a4e4b4e3/0.0.0/e6070c77f18f01a5ad4551a8b7edfba20b8438b7cad4d94e6ad9378022ce4aab/cache-17846c759c765b1d.arrow
Loading cached processed dataset at /admin/home-nathan/.cache/huggingface/datasets/bigcode___json/bigcode--the-stack-smol-8f8055c3a4e4b4e3/0.0.0/e6070c77f18f01a5ad4551a8b7edfba20b8438b7cad4d

In [None]:
# | export
def train_labeler(
    dataset,
    text_column,
    base_model_name,
    labels,
    training_args,
    test_set_size=0.05,
    num_workers=4,
    max_length=512,
    push_to_hub=False,
):
    """
    Trains a labeler model on a labeled dataset.

    Args:
        dataset (datasets.Dataset): Dataset to train on
        text_column (str): Name of the text column
        base_model_name (str): Name of the base model to use
        labels (list): List of labels
        training_args (transformers.TrainingArguments): Training arguments
        test_set_size (float): Fraction of the dataset to use for testing
        num_workers (int): Number of processes used to tokenize the dataset
        max_length (int): Maximum length of the input
        push_to_hub (bool): Whether to push the trained model to the hub after training

    Returns:
        tuple: The trained model and the tokenizer used to preprocess its inputs
    """
    # Load the tokenizer
    tokenizer = AutoTokenizer.from_pretrained(base_model_name, max_length=max_length)
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    # Load the model
    model = AutoModelForSequenceClassification.from_pretrained(
        base_model_name, num_labels=len(labels), max_length=max_length
    )
    # Keep both directions of the label mapping in sync with `labels`
    model.config.id2label = dict(enumerate(labels))
    model.config.label2id = {label: i for i, label in enumerate(labels)}
    # When the tokenizer had to fall back to EOS for padding, the model config
    # needs the same pad id so padded batches can be handled by the model.
    if model.config.pad_token_id is None:
        model.config.pad_token_id = tokenizer.pad_token_id

    # Preprocess the dataset
    dataset = dataset.map(
        lambda x: tokenizer(
            x[text_column], padding="max_length", truncation=True, max_length=max_length
        ),
        batched=True,
        num_proc=num_workers,
    )

    # Split the dataset
    dataset = dataset.train_test_split(test_size=test_set_size, seed=115)

    # Get the data collator
    data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

    # Choose the averaging mode from the number of classes. This is computed
    # here, outside `compute_metrics`, so it cannot be confused with the
    # per-example reference labels unpacked from `eval_preds`.
    average = "macro" if len(labels) > 2 else "binary"

    def compute_metrics(eval_preds):
        acc_metric = evaluate.load("accuracy")
        precision_metric = evaluate.load("precision")
        recall_metric = evaluate.load("recall")
        f1_metric = evaluate.load("f1")
        logits, references = eval_preds
        if isinstance(logits, tuple):  # Some models return tuples
            logits = logits[0]

        predictions = np.argmax(logits, axis=-1)
        results = acc_metric.compute(predictions=predictions, references=references)
        for metric in (precision_metric, recall_metric, f1_metric):
            results.update(
                metric.compute(
                    predictions=predictions, references=references, average=average
                )
            )
        return results

    # Get the trainer
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=dataset["train"],
        eval_dataset=dataset["test"],
        data_collator=data_collator,
        compute_metrics=compute_metrics,
    )

    # Train the model
    trainer.train()

    # Push the model to the hub
    if push_to_hub:
        trainer.push_to_hub()

    # Return the model
    return model, tokenizer

In [None]:
# from transformers import TrainingArguments

# base_model_name = "prajjwal1/bert-small"
# batch_size = 4
# training_args = TrainingArguments(
#     output_dir="./data",
#     num_train_epochs=1,
#     per_device_train_batch_size=batch_size,
#     per_device_eval_batch_size=batch_size,
#     logging_dir="./logs",
#     logging_steps=50,
#     evaluation_strategy="epoch",
#     save_strategy="epoch",
#     metric_for_best_model="accuracy",
#     greater_is_better=True,
#     seed=115,
#     push_to_hub=False
# )
# model, tokenizer = train_labeler(
#     ds,
#     "content",
#     base_model_name,
#     labels=labels,
#     training_args=training_args,
# )
# assert type(model) == AutoModelForSequenceClassification

Some weights of the model checkpoint at prajjwal1/bert-small were not used when initializing BertForSequenceClassification: ['cls.predictions.decoder.bias', 'cls.predictions.bias', 'cls.seq_relationship.bias', 'cls.predictions.transform.dense.weight', 'cls.seq_relationship.weight', 'cls.predictions.transform.LayerNorm.bias', 'cls.predictions.decoder.weight', 'cls.predictions.transform.dense.bias', 'cls.predictions.transform.LayerNorm.weight']
- This IS expected if you are initializing BertForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of BertForSequenceClassification were not initi

In [None]:
# | export
def filter_dataset(
    dataset, text_column, labeler_model, labels_to_keep, batch_size=32, num_workers=4
):
    """
    Labels every example with a labeler model and keeps only the wanted labels.

    Args:
        dataset (datasets.Dataset): Dataset to filter
        text_column (str): Name of the text column
        labeler_model (transformers.pipelines.TextClassificationPipeline): Model to use for labeling
        labels_to_keep (list): Labels whose examples are kept
        batch_size (int): Batch size for labeling
        num_workers (int): Number of workers for labeling

    Returns:
        The dataset with added "label" and "score" columns, restricted to
        examples whose "label" is in `labels_to_keep`
    """

    def _annotate(batch):
        # Run the labeler on the whole batch of texts at once
        outputs = labeler_model(
            batch[text_column], padding=True, truncation=True, max_length=512
        )
        predicted_labels = [output["label"] for output in outputs]
        predicted_scores = [output["score"] for output in outputs]
        return {"label": predicted_labels, "score": predicted_scores}

    def _is_kept(example):
        return example["label"] in labels_to_keep

    # TODO: first just label the dataset with scores and everything
    # then just split the dataset into the number of subsets and configs so that people can specify which one they want

    # Label the dataset
    labeled = dataset.map(
        _annotate,
        batched=True,
        batch_size=batch_size,
        num_proc=num_workers,
    )

    # Filter the dataset
    return labeled.filter(_is_kept)

In [None]:
# `pipeline` is not imported anywhere else in this notebook; import it here so the cell runs.
from transformers import pipeline

# NOTE(review): `model` and `tokenizer` are only produced by the `train_labeler`
# cell, which is currently commented out — re-enable it (or load a model) first.
pipe = pipeline(
    "text-classification", model=model, tokenizer=tokenizer, device=model.device
)
# NOTE(review): `filter_dataset` compares each prediction's "label" value with
# `labels_to_keep`; if the pipeline reports label names as strings, `[0]` matches
# nothing and the assertion below passes vacuously — confirm the intended value.
filtered_ds = filter_dataset(ds, "content", pipe, [0])

assert len(filtered_ds) < len(ds)

  0%|          | 0/10 [00:00<?, ?ba/s]

  0%|          | 0/10 [00:00<?, ?ba/s]

In [None]:
# | hide
import nbdev

# Run nbdev's export step; presumably this writes the `# | export` cells to the
# module named by `default_exp` (core) — confirm against the nbdev docs.
nbdev.nbdev_export()