In [4]:
import os
import json
import pandas as pd
import torch
from transformers import AutoTokenizer, AutoModelForTokenClassification, Trainer, TrainingArguments, AdamW
from datasets import Dataset, ClassLabel
from seqeval.metrics import f1_score, precision_score, recall_score
from seqeval.scheme import IOB2
from torch.nn import CrossEntropyLoss
from sklearn.metrics import accuracy_score, precision_recall_fscore_support


# Function to generate BIO tags with label validation
def generate_bio_tags(text, labels, tokenizer):
    tokenized_text = tokenizer(text)
    tokens = tokenized_text["input_ids"]
    bio_tags = ["O"] * len(tokens)

    token_offsets = tokenizer.encode_plus(
        text,
        return_offsets_mapping=True
    )["offset_mapping"]

    # Loop over entity labels to generate BIO tags
    for entity in labels:
        start = entity["start"]
        end = entity["end"]
        entity_label = entity["labels"][0]

        # Apply BIO tagging
        start_idx = None
        for i, offset in enumerate(token_offsets):
            if offset[0] >= start and start_idx is None:
                bio_tags[i] = "B-" + entity_label  # Apply "B-" prefix
                start_idx = i
            elif start_idx is not None and offset[0] < end:
                bio_tags[i] = "I-" + entity_label  # Apply "I-" prefix
            elif start_idx is not None and offset[0] >= end:
                break

    return tokens, bio_tags


def chunk_tokens_and_labels(tokenized_text, bio_tags, chunk_size, tokenizer):
    # Lists to store chunks of text and labels
    text_chunks = []
    label_chunks = []

    padding_token = tokenizer.pad_token_id  # The token used for padding
    padding_label = "O"  # Default label for padding

    # Split the tokenized text and bio_tags into chunks
    start_idx = 0

    while start_idx < len(tokenized_text):
        end_idx = min(start_idx + chunk_size, len(tokenized_text))

        # Adjust end_idx to avoid splitting entities
        if end_idx < len(tokenized_text):
            while end_idx > start_idx and bio_tags[end_idx].startswith("I-"):
                end_idx -= 1

        # Get the chunked text and labels
        chunk_text = tokenized_text[start_idx:end_idx]
        chunk_labels = bio_tags[start_idx:end_idx]

        # Apply padding if the chunk is shorter than 128 tokens
        if len(chunk_text) < chunk_size:
            padding_length = chunk_size - len(chunk_text)
            chunk_text += [padding_token] * padding_length
            chunk_labels += [padding_label] * padding_length

        text_chunks.append(chunk_text)
        label_chunks.append(chunk_labels)

        # Move the start index for the next chunk
        start_idx = end_idx

    return text_chunks, label_chunks


def prepare_sentences(json_data, tokenizer):
    sentences = []

    for item in json_data:
        file_path = item["text"]  # The file path to the text file
        if "label" in item:
            labels = item["label"]
        else:
            print(f"Missing 'label' key ")
            continue

        with open(f'balance_context/{file_path}', 'r') as txt_file:
            text = txt_file.read()

        # Ensure labels are correctly formatted
        tokens, bio_tags = generate_bio_tags(text, labels, tokenizer)
        text_rows, label_rows = chunk_tokens_and_labels(tokens, bio_tags, 128, tokenizer)

        for text_row, label_row in zip(text_rows, label_rows):
            sentence = {'text': tokenizer.decode(text_row), 'labels': label_row}
            sentences.append(sentence)

    return sentences


def encode_labels(labels, num_classes, device):
    encoded_labels = torch.zeros(labels.size(0), labels.size(1), num_classes, device=device)
    for i in range(labels.size(0)):
        for j in range(labels.size(1)):
            encoded_labels[i, j, int(labels[i, j])] = 1
    return encoded_labels


def compute_metrics(pred):
    labels = pred.label_ids
    preds = pred.predictions.argmax(-1)

    # Convert predicted labels to multi-class format if they are one-hot encoded
    if len(preds.shape) > 1:
        preds = preds.argmax(axis=1)

    # Ensure labels are in a multi-class format
    if len(labels.shape) > 1:
        labels = labels.argmax(axis=1)

    precision, recall, f1, _ = precision_recall_fscore_support(labels, preds, average="weighted", zero_division=1)

    acc = accuracy_score(labels, preds)
    return {
        "accuracy": acc,
        "f1": f1,
        "precision": precision,
        "recall": recall
    }
classmap = ClassLabel(
    num_classes=11,
    names=[
        'B-ASSET', 'I-ASSET', 'B-DEBT', 'I-DEBT', 'B-EQUITY', 'I-EQUITY',
        'B-DEPOSIT', 'I-DEPOSIT', 'B-MISC', 'I-MISC', 'O'
    ]
)
id2label = {i: classmap.int2str(i) for i in range(classmap.num_classes)}
label2id = {classmap.str2int(c): c for c in classmap.names}


def model_init():
    torch.cuda.empty_cache()
    return AutoModelForTokenClassification.from_pretrained(
        "KB/bert-base-swedish-cased-ner",
        id2label=id2label,
        label2id=label2id,
        finetuning_task="ner",
        ignore_mismatched_sizes=True,
        return_dict=True
    )


# Load the tokenizer
tokenizer = AutoTokenizer.from_pretrained("KB/bert-base-swedish-cased-ner")


# Modify the CustomTrainer class to use one-hot encoded labels



# Define training arguments
training_args = TrainingArguments(
    output_dir="models/ner_model_output1",
    evaluation_strategy="epoch",
    save_strategy="epoch",
    save_steps=300,
    eval_steps=300,
    learning_rate=2e-5,  # Consider experimenting with this value
    num_train_epochs=5,  # Adjust based on performance
    per_device_train_batch_size=32,  # Test smaller or larger sizes
    warmup_steps=100,  # Adjust based on total number of steps
    weight_decay=0.01,  # Increase if overfitting is observed
    logging_steps=50,  # More frequent logs can help monitor training
    load_best_model_at_end=True  # Load the best model at the end based on validation loss
    # lr_scheduler_type='linear'  # Apply a learning rate scheduler
)
# Define the class mapping with accurate labels

# Create id2label and label2id mappings based on classmap


# Load the model with correct classifier initialization

device = torch.device("cuda")
# model.to(device)

# Load the JSON data
with open('context_balance.json', 'r') as f:
    json_data = json.load(f)

# Prepare sentences
sentences = prepare_sentences(json_data, tokenizer)

# Create a dataset from the collected sentences
ds = Dataset.from_pandas(pd.DataFrame(data=sentences))
ds.set_format("torch")
datasetDict = ds.train_test_split(test_size=0.4)

# Tokenize the text with proper truncation and padding
ds_train = datasetDict.map(lambda x: tokenizer(x["text"], truncation=True, max_length=128, padding='max_length'))
# Use the classmap to convert label names to class indices
ds_train = ds_train.map(lambda y: {"labels": classmap.str2int(y["labels"])})

from transformers import DataCollatorForTokenClassification

data_collator = DataCollatorForTokenClassification(tokenizer=tokenizer)
import optuna

def optuna_hp_space(trial):
    return {
        "learning_rate": trial.suggest_float("learning_rate", 1e-6, 1e-4, log=True),
        "per_device_train_batch_size": trial.suggest_categorical("per_device_train_batch_size", [8, 16, 32, 64, 128]),
        "weight_decay": trial.suggest_float("weight_decay", 0.0, 0.3),
        "warmup_steps": trial.suggest_int("warmup_steps", 0, 1000),
        "num_train_epochs": trial.suggest_int("num_train_epochs", 1, 10),
        "lr_scheduler_type": trial.suggest_categorical("lr_scheduler_type", ["linear", "cosine", "polynomial"]),
        # Add more parameters as needed
    }

# Set up the Trainer
trainer = Trainer(
    model_init=model_init,
    args=training_args,
    train_dataset=ds_train['train'],
    eval_dataset=ds_train['test'],
    tokenizer=tokenizer,
    compute_metrics = compute_metrics
)


import warnings

warnings.filterwarnings("ignore")
#best_trial = trainer.hyperparameter_search(
#    direction="maximize",
#    backend="optuna",
#    hp_space=optuna_hp_space,
#    n_trials=20
#)

# Train the model
#trainer.train()




Missing 'label' key 
Missing 'label' key 


Map:   0%|          | 0/442 [00:00<?, ? examples/s]

Map:   0%|          | 0/295 [00:00<?, ? examples/s]

Map:   0%|          | 0/442 [00:00<?, ? examples/s]

Map:   0%|          | 0/295 [00:00<?, ? examples/s]

dataloader_config = DataLoaderConfiguration(dispatch_batches=None, split_batches=False, even_batches=True, use_seedable_sampler=True)
Some weights of the model checkpoint at KB/bert-base-swedish-cased-ner were not used when initializing BertForTokenClassification: ['bert.pooler.dense.bias', 'bert.pooler.dense.weight']
- This IS expected if you are initializing BertForTokenClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForTokenClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of BertForTokenClassification were not initialized from the model checkpoint at KB/bert-base-swedish-cased-ner and are newly initialized because the shapes did not match:
- classif

In [3]:
print(best_trial.hyperparameters)

torch.cuda.empty_cache()

NameError: name 'best_trial' is not defined

In [8]:

# Define training arguments
training_args = TrainingArguments(
    output_dir="models/ner_model_output",
    learning_rate=5.799352882984339e-05,
    per_device_train_batch_size=8,
    weight_decay= 0.16,
    warmup_steps= 100,
    num_train_epochs=3,
    evaluation_strategy="epoch",
    save_strategy="epoch",
      # Increase if overfitting is observed
    logging_steps=50,  # More frequent logs can help monitor training
    load_best_model_at_end=True,
    #**best_trial.hyperparameters
)


# Set up the Trainer
trainer = Trainer(
    model_init=model_init,
    args=training_args,
    train_dataset=ds_train['train'],
    eval_dataset=ds_train['test'],
    tokenizer=tokenizer,
    compute_metrics = compute_metrics
)


import warnings

warnings.filterwarnings("ignore")

trainer.train()




huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
Some weights of the model checkpoint at KB/bert-base-swedish-cased-ner were not used when initializing BertForTokenClassification: ['bert.pooler.dense.bias', 'bert.pooler.dense.weight']
- This IS expected if you are initializing BertForTokenClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForTokenClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of BertForTokenClassification w

Epoch,Training Loss,Validation Loss,Accuracy,F1,Precision,Recall
1,No log,0.481517,0.938983,0.909435,0.942706,0.938983
2,1.070700,0.175707,0.938983,0.909435,0.942706,0.938983
3,1.070700,0.069709,0.915254,0.927453,0.958914,0.915254


TrainOutput(global_step=84, training_loss=0.6977024248668126, metrics={'train_runtime': 46.1007, 'train_samples_per_second': 28.763, 'train_steps_per_second': 1.822, 'total_flos': 86626922982912.0, 'train_loss': 0.6977024248668126, 'epoch': 3.0})

In [9]:
trainer.save_model("models/random")


In [None]:
# testing on a test set. 

In [10]:
import torch
from transformers import AutoTokenizer, AutoModelForTokenClassification
from transformers import pipeline

# Load the saved model and tokenizer
model_path = "models/random"
tokenizer = AutoTokenizer.from_pretrained(model_path)
model = AutoModelForTokenClassification.from_pretrained(model_path)
NER = pipeline('ner', model= model, tokenizer = tokenizer)

# Function to make predictions on new text
def predict(text):
    # Tokenize the text
    inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=128, padding="max_length")
    # Forward pass through the model
    with torch.no_grad():
        outputs = model(**inputs)
    # Get the predicted labels
    predicted_labels = torch.argmax(outputs.logits, dim=2)
    # Decode the labels
    decoded_labels = [model.config.id2label[label.item()] for label in predicted_labels[0]]
    # Return the tokenized text and predicted labels
    return tokenizer.decode(inputs.input_ids[0]), decoded_labels

# Example text for prediction
text_to_predict = """Vintergatan Film TV AB 2018-06-30
Org nr 556267-2658
BALANSRÄKNING
Not 2018-06-302017-06-30
EGET KAPITAL OCH SKULDER
Eget kapital 2
Bundet eget kapital
Aktiekapital (1000 st å 50kr) 105000 105000
Reservfond 21000 21000
Summa bundet eget kapital 126000 126000
Fritt eget kapital
Balanserat resultat 322680 145808
Årets resultat -71881 176872
Summa fritt eget kapital 250799 322680
summa eget kapital 376799 448680
Periodiseringsfond 73000 73000 |
Kortfristiga skulder
Leverantörsskulder 12507 22556
Skatteskulder 0 22093
Övriga kortfristiga skulder 113193 102553
Upplupna kostnader och förutbetalda intäkter 35000 134398
Summa kortfristiga skulder 160700 281600
SUMMA EGET KAPITAL OCH SKULDER 610499 803280
STÄLLDA SÄKERHETER OCH ANSVARSFÖRBINDELSER
ställda säkerheter Inga
Ansvarsförbindelser Inga
Na |


"""

# Make prediction
#tokenized_text, predicted_labels = predict(text_to_predict)
#print("Tokenized text:", tokenized_text)
#print("Predicted labels:", predicted_labels)
l = []
for token in NER(text_to_predict):
    if token['word'].startswith('##'):
        l[-1]['word'] += token['word'][2:]
    else:
        l += [ token ]




def print_ner_results(text, predictions):
    last_end = 0
    result_text = ""
    # Sort predictions by the start position
    predictions.sort(key=lambda x: x['start'])
    
    for prediction in predictions:
        start, end = prediction['start'], prediction['end']
        entity = prediction['entity']
        word = text[start:end]
        
        # Append text from last entity to current entity start
        result_text += text[last_end:start]
        
        # Apply color coding to entity
        if 'ASSET' in entity:
            # Green for ASSETS
            result_text += f"\033[92m{word}\033[0m"
        elif 'EQUITY' in entity:
            # Blue for EQUITY
            result_text += f"\033[94m{word}\033[0m"
        else:
            # Yellow for others
            result_text += f"\033[93m{word}\033[0m"
        
        last_end = end
    
    # Append any remaining text after the last entity
    result_text += text[last_end:]
    
    print(result_text)

# Example usage
print(NER(text_to_predict))
print_ner_results(text_to_predict, NER(text_to_predict))



Asking to truncate to max_length but no maximum length is provided and the model has no predefined maximum length. Default to no truncation.


[{'entity': 'B-EQUITY', 'score': 0.9469383, 'index': 53, 'word': 'kapital', 'start': 144, 'end': 151}, {'entity': 'I-EQUITY', 'score': 0.93538254, 'index': 54, 'word': 'Aktie', 'start': 152, 'end': 157}, {'entity': 'B-EQUITY', 'score': 0.85348624, 'index': 66, 'word': '##000', 'start': 192, 'end': 195}, {'entity': 'I-EQUITY', 'score': 0.72364104, 'index': 67, 'word': 'Reserv', 'start': 196, 'end': 202}, {'entity': 'B-EQUITY', 'score': 0.9704223, 'index': 85, 'word': 'kapital', 'start': 270, 'end': 277}, {'entity': 'I-EQUITY', 'score': 0.9526658, 'index': 86, 'word': 'Bal', 'start': 278, 'end': 281}, {'entity': 'I-EQUITY', 'score': 0.97080964, 'index': 87, 'word': '##anser', 'start': 281, 'end': 286}, {'entity': 'I-EQUITY', 'score': 0.9634478, 'index': 88, 'word': '##at', 'start': 286, 'end': 288}, {'entity': 'B-EQUITY', 'score': 0.89252377, 'index': 95, 'word': '##8', 'start': 310, 'end': 311}, {'entity': 'I-EQUITY', 'score': 0.9647261, 'index': 96, 'word': 'Årets', 'start': 312, 'end'