In [None]:
import pandas as pd
from datasets import Dataset, DatasetDict
from transformers import AutoTokenizer, AutoModelForTokenClassification, DataCollatorForTokenClassification, pipeline, \
    TrainingArguments, Trainer
import evaluate
import numpy as np

In [None]:
import yaml
import logging
from datetime import datetime

# YAML config
# A bare relative path resolves against the working directory on every OS;
# the previous r".\config.yaml" spelling is only a valid path on Windows.
# Any failure to read or parse the file propagates unchanged.
with open("config.yaml", "r", encoding="utf-8") as f:
    config = yaml.safe_load(f)

# Logger
# One log file per run, named after the start time. The timestamp is built
# outside the f-string because reusing the enclosing quote character inside an
# f-string expression is a SyntaxError before Python 3.12.
# config["log_dir"] is concatenated as-is, so it has to end with a path
# separator for the file to land inside that directory.
log_timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(name)s - %(funcName)s - %(message)s",
    filename=config["log_dir"] + f"{log_timestamp}.log",
    filemode="w"
)
logger = logging.getLogger(__name__)

logger.info("Config file and logger setup completed.")

In [None]:
import requests

# Download the training split. Each non-blank line is "<tag>\t<token>" and a
# blank line closes the current sentence.
response = requests.get("path", timeout=30)
# Stop here on an HTTP error instead of trying to parse an error page.
response.raise_for_status()

train_tokens = []
train_tags = []

sentence_tokens = []
sentence_tags = []
for raw_line in response.text.splitlines():
    line = raw_line.strip()
    if line:
        tag, token = line.split("\t")
        sentence_tags.append(tag)
        sentence_tokens.append(token)
    elif sentence_tokens:
        # Blank (or whitespace-only) line: close the sentence. Runs of blank
        # lines are skipped so no empty sentences are recorded.
        train_tokens.append(sentence_tokens)
        train_tags.append(sentence_tags)
        sentence_tokens, sentence_tags = [], []

# Keep the final sentence when the file does not end with a blank line.
if sentence_tokens:
    train_tokens.append(sentence_tokens)
    train_tags.append(sentence_tags)

len(train_tokens), len(train_tags)

In [None]:
# Download the test split. Each non-blank line is "<tag>\t<token>" and a
# blank line closes the current sentence.
response = requests.get("path", timeout=30)
# Stop here on an HTTP error instead of trying to parse an error page.
response.raise_for_status()

test_tokens = []
test_tags = []

sentence_tokens = []
sentence_tags = []
for raw_line in response.text.splitlines():
    line = raw_line.strip()
    if line:
        tag, token = line.split("\t")
        sentence_tags.append(tag)
        sentence_tokens.append(token)
    elif sentence_tokens:
        # Blank (or whitespace-only) line: close the sentence. Runs of blank
        # lines are skipped so no empty sentences are recorded.
        test_tokens.append(sentence_tokens)
        test_tags.append(sentence_tags)
        sentence_tokens, sentence_tags = [], []

# Keep the final sentence when the file does not end with a blank line.
if sentence_tokens:
    test_tokens.append(sentence_tokens)
    test_tags.append(sentence_tags)

len(test_tokens), len(test_tags)

In [None]:
# Data Preprocessing
def load_and_preprocess_data(train_tags, train_tokens, test_tags, test_tokens) -> DatasetDict:
    """Wrap already-parsed token/tag sequences in a ``DatasetDict``.

    No file or network access and no tokenization happens here; the inputs
    are the in-memory lists produced by the parsing cells.

    Args:
        train_tags: Tag sequences for the training sentences.
        train_tokens: Token sequences for the training sentences.
        test_tags: Tag sequences for the test sentences.
        test_tokens: Token sequences for the test sentences.

    Returns:
        A DatasetDict with "train", "test" and "validation" splits, each
        holding "tokens" and "ner_tags_str" columns. "validation" is the
        very same Dataset as "test", so validation scores are not an
        independent estimate.

    Raises:
        Exception: Any error raised while building the datasets is logged
            and re-raised, so the caller never receives ``None``.
    """
    try:
        # Create Pandas DataFrames
        df_train = pd.DataFrame(
            {"tokens": train_tokens, "ner_tags_str": train_tags})
        df_test = pd.DataFrame(
            {"tokens": test_tokens, "ner_tags_str": test_tags})

        # Convert to Datasets
        train = Dataset.from_pandas(df_train)
        test = Dataset.from_pandas(df_test)

        # The test split doubles as the validation split.
        return DatasetDict(
            {"train": train, "test": test, "validation": test})

    except Exception as e:
        logger.exception(f"An error occurred: {e}")
        raise

In [None]:
# Label Encoding
def create_label_index(dataset):
    """Build tag-name <-> integer-id lookup tables from the training split.

    Only ``dataset["train"]["ner_tags_str"]`` is inspected. Every tag other
    than "O" has its first two characters removed (this assumes a "B-"/"I-"
    style prefix) to obtain the entity type, and each entity type receives a
    "B-<type>" and an "I-<type>" id even if only one of them was observed.

    Entity types are sorted before ids are assigned so the mapping is the
    same on every run; iterating a set of strings directly can yield a
    different order from one interpreter session to the next.

    Args:
        dataset: Mapping with a "train" entry whose "ner_tags_str" column is
            an iterable of per-sentence tag lists.

    Returns:
        Tuple ``(tag2index, index2tag)``. "O" is always id 0.

    Raises:
        Exception: Any error is logged and re-raised so the caller's tuple
            unpacking never receives ``None``.
    """
    try:
        seen_tags = set()
        for sentence_tags in dataset["train"]["ner_tags_str"]:
            seen_tags.update(sentence_tags)

        entity_types = sorted({tag[2:] for tag in seen_tags if tag != "O"})

        tag2index = {"O": 0}
        for entity_type in entity_types:
            tag2index[f"B-{entity_type}"] = len(tag2index)
            tag2index[f"I-{entity_type}"] = len(tag2index)

        index2tag = {index: tag for tag, index in tag2index.items()}

        return tag2index, index2tag
    except Exception as e:
        logger.exception(f"Label and index creation failed: {e}")
        raise

In [None]:
# Keyword arguments make the tags-before-tokens parameter order explicit.
dataset = load_and_preprocess_data(
    train_tags=train_tags,
    train_tokens=train_tokens,
    test_tags=test_tags,
    test_tokens=test_tokens,
)

In [None]:
# Build both lookup tables (tag name -> id and id -> tag name) from the
# training split of `dataset`.
tag2index, index2tag = create_label_index(dataset)

In [None]:
def _encode_tags(example):
    """Translate one example's string tags into integer ids via tag2index."""
    tag_ids = [tag2index[tag] for tag in example["ner_tags_str"]]
    return {"ner_tags": tag_ids}


# Adds an integer "ner_tags" column next to "ner_tags_str" in every split.
dataset = dataset.map(_encode_tags)

In [None]:
# Tokenization
# Load the tokenizer for the checkpoint named under "model_checkpoint" in the
# YAML config. tokenize_and_align_labels below reads this module-level name.
tokenizer = AutoTokenizer.from_pretrained(config["model_checkpoint"])


def tokenize_and_align_labels(examples):
    """Tokenize a batch of pre-split sentences and align word labels to sub-tokens.

    The module-level ``tokenizer`` is applied to ``examples["tokens"]`` with
    truncation. For each sentence, only the first sub-token of a word keeps
    that word's id from ``examples["ner_tags"]``; positions with no word
    (``word_ids`` yields ``None``) and the remaining sub-tokens of a word
    are set to -100, the same sentinel ``compute_metrics`` filters out.

    Args:
        examples: Batch with "tokens" (one token list per sentence) and
            "ner_tags" (one integer id list per sentence).

    Returns:
        The tokenizer output with an added "labels" entry, one label list
        per sentence.

    Raises:
        Exception: Any error is logged and re-raised. Returning ``None``
            here would hide the failure from the caller.
    """
    try:
        # NOTE(review): word_ids() presumably requires a fast tokenizer —
        # confirm for the configured checkpoint.
        tokenized_inputs = tokenizer(
            examples["tokens"], truncation=True, is_split_into_words=True)

        labels = []
        for i, word_labels in enumerate(examples["ner_tags"]):
            word_ids = tokenized_inputs.word_ids(batch_index=i)
            previous_word_idx = None
            label_ids = []

            for word_idx in word_ids:
                if word_idx is None or word_idx == previous_word_idx:
                    # No source word, or a continuation piece of the same word.
                    label_ids.append(-100)
                else:
                    # First sub-token of a new word carries the word's label.
                    label_ids.append(word_labels[word_idx])

                previous_word_idx = word_idx
            labels.append(label_ids)

        tokenized_inputs["labels"] = labels
        return tokenized_inputs

    except Exception as e:
        logger.exception(f"Tokenizing failed: {e}")
        raise

In [None]:
# Apply tokenize_and_align_labels to every split in batches; it returns the
# tokenizer output plus a "labels" entry for each sentence.
tokenized_dataset = dataset.map(tokenize_and_align_labels, batched=True)

In [None]:
# Data Collator
def create_data_collator(tokenizer):
    """Build the batch collator used for token classification.

    Args:
        tokenizer: Tokenizer handed to the collator.

    Returns:
        A ``DataCollatorForTokenClassification`` constructed with ``tokenizer``.
    """
    collator = DataCollatorForTokenClassification(tokenizer=tokenizer)
    return collator

In [None]:
# Metric Calculation
_seqeval_metric = None  # loaded on first use, then reused for later evaluations


def _get_seqeval_metric():
    """Return the seqeval metric, loading it at most once per run of this cell."""
    global _seqeval_metric
    if _seqeval_metric is None:
        _seqeval_metric = evaluate.load("seqeval")
    return _seqeval_metric


def compute_metrics(eval_preds):
    """Computes evaluation metrics for the token classification model.

    Args:
        eval_preds: Pair ``(logits, labels)``. The arg-max over the last axis
            of ``logits`` is taken as the predicted label id; positions whose
            label is -100 are excluded from both predictions and references.

    Returns:
        Dict with "precision", "recall", "f1" and "accuracy", taken from the
        metric's "overall_*" entries.

    Raises:
        Exception: Any error (including a failure to load the metric) is
            logged and re-raised instead of returning ``None``.
    """
    try:
        metric = _get_seqeval_metric()
        # Position in this list must equal the label id; that holds because
        # create_label_index inserts tags into tag2index in id order.
        label_names = list(tag2index)

        logits, labels = eval_preds
        predictions = np.argmax(logits, axis=-1)

        true_labels = [
            [label_names[label_id] for label_id in sentence if label_id != -100]
            for sentence in labels
        ]
        true_predictions = [
            [label_names[pred_id]
             for pred_id, label_id in zip(pred_sentence, sentence)
             if label_id != -100]
            for pred_sentence, sentence in zip(predictions, labels)
        ]

        all_metrics = metric.compute(
            predictions=true_predictions, references=true_labels)

        return {
            "precision": all_metrics["overall_precision"],
            "recall": all_metrics["overall_recall"],
            "f1": all_metrics["overall_f1"],
            "accuracy": all_metrics["overall_accuracy"],
        }
    except Exception as e:
        logger.exception(f"Evaluation failed: {e}")
        raise

In [None]:
# Load the configured checkpoint as a token-classification model and hand it
# the label maps built by create_label_index (id -> tag and tag -> id).
model = AutoModelForTokenClassification.from_pretrained(
    config["model_checkpoint"], id2label=index2tag, label2id=tag2index)

In [None]:
# Collator instance passed to the Trainer as `data_collator`.
data_collator = create_data_collator(tokenizer=tokenizer)

In [None]:
# Training Arguments
# Hyperparameters are hard-coded here; only the output directory comes from
# the YAML config. Evaluation and checkpointing are both set to "epoch".
training_args = TrainingArguments(
    output_dir=config["output_dir"],
    eval_strategy="epoch",
    save_strategy="epoch",
    learning_rate=2e-5,
    num_train_epochs=1,
    weight_decay=0.01,
    report_to="none"
)

# Trainer
# The "validation" split is the same Dataset as "test" (see
# load_and_preprocess_data), so evaluation here runs on the test data.
# NOTE(review): newer transformers releases appear to rename `tokenizer=` to
# `processing_class=` — confirm against the installed version.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_dataset["train"],
    eval_dataset=tokenized_dataset["validation"],
    data_collator=data_collator,
    # NOTE(review): compute_metrics is disabled below, so the function defined
    # earlier is never called during evaluation — confirm this is intended.
    # compute_metrics=compute_metrics,
    tokenizer=tokenizer
)

# Runs the fine-tuning loop.
trainer.train()

In [None]:
# Save the trained model. NOTE(review): "path" looks like a placeholder; the
# same literal is later passed to create_pipe, so keep the two in sync.
trainer.save_model("path")

In [None]:
def create_pipe(model_path: str):
    """
    Build a token-classification pipeline from a saved model.

    The pipeline is only created here; call the returned object with text to
    get predictions.

    Args:
        model_path: Path for trained model.

    Returns:
        The pipeline object, created with aggregation_strategy="simple".

    Raises:
        Exception: Any error raised while building the pipeline is logged and
            re-raised, so the caller never receives ``None``.
    """
    try:
        return pipeline("token-classification", model=model_path,
                        aggregation_strategy="simple")
    except Exception as e:
        logger.exception(f"Pipeline creation failed: {e}")
        raise

In [None]:
# Build the inference pipeline from the same "path" literal that was passed
# to trainer.save_model.
pipe = create_pipe("path")

In [None]:
# Run the pipeline on a sample string; as the cell's last expression, the
# returned value is displayed.
pipe("query")