In [None]:
!pip install transformers
!pip install datasets
!pip install seqeval

In [None]:
import os
import itertools
import pandas as pd
import numpy as np
from transformers import AutoTokenizer
from transformers import AutoModelForTokenClassification, TrainingArguments, Trainer
from transformers import DataCollatorForTokenClassification
import torch
from datasets import Dataset
from datasets import load_metric
import warnings
import spacy
from spacy.training import offsets_to_biluo_tags

In [None]:
!pip install spacy

# Suppress all Python warnings emitted from here on.
warnings.filterwarnings('ignore')

# Read the annotations: one JSON object per line. The code below uses the
# 'id', 'text' and 'label' columns of the resulting frame.
data = pd.read_json(path_or_buf='admin.jsonl', lines=True)

# Instantiate spaCy's English language class directly; it is used here only to
# split each text into tokens.
cls = spacy.util.get_lang_class('en')
nlp = cls()

tokens_list, tags_list = [], []
for row in range(len(data)):
    doc = nlp(data['text'][row])
    # String form of every token in the document.
    tokens_list.append([str(token) for token in doc])
    # One BILUO tag per token, derived from the record's 'label' entries
    # (presumably (start, end, label) character offsets — TODO confirm).
    tags_list.append(offsets_to_biluo_tags(doc, data['label'][row]))

data['tokens'] = tokens_list
data['ner_tags'] = tags_list

# Keep only the token/tag columns added above.
data = data.drop(columns=['id', 'label', 'text'])

In [None]:
# Positional split without shuffling: the first `train_percent` of the rows form
# the training set, the remaining rows form the test set.
train_percent = 0.8
train_size = int(len(data) * train_percent)
train_df, test_df = data.iloc[:train_size], data.iloc[train_size:]

# Report the size of each split.
print("FULL Dataset: {}".format(len(data)))
print("TRAIN Dataset: {}".format(len(train_df)))
print("TEST Dataset: {}".format(len(test_df)))

In [None]:
# Tag inventory: B/I/L/U variants for each entity type, then 'O' and '-'
# ('-' is presumably the tag spaCy emits for tokens that do not align with an
# annotated span — TODO confirm).
label_list = [
    'B-code', 'I-code', 'L-code', 'U-code',
    'B-address', 'I-address', 'L-address', 'U-address',
    'B-event', 'I-event', 'L-event', 'U-event',
    'B-name', 'I-name', 'L-name', 'U-name',
    'O', '-',
]

# Each tag's integer id is its position in label_list.
label_encoding_dict = {tag: idx for idx, tag in enumerate(label_list)}

task = "ner"  # prefix of the "<task>_tags" column read during label alignment
model_checkpoint = "xlnet-base-cased"
batch_size = 16
    
# Load the tokenizer that belongs to `model_checkpoint`. Later cells call it on
# inputs that are already split into words (is_split_into_words=True).
# NOTE(review): add_prefix_space is presumably only meaningful for byte-level BPE
# tokenizers — confirm it has any effect for this checkpoint.
tokenizer = AutoTokenizer.from_pretrained(model_checkpoint, add_prefix_space=True)

def get_all_tokens_and_ner_tags(directory):
    """Load every annotation file in ``directory`` into a single DataFrame.

    Each regular file is parsed with ``get_tokens_and_ner_tags`` and the per-file
    frames are concatenated under a fresh 0..n-1 index.

    Args:
        directory: Path of the folder holding the annotation files.

    Returns:
        pandas.DataFrame with the columns produced by ``get_tokens_and_ner_tags``
        (``tokens`` and ``ner_tags``).

    Raises:
        ValueError: raised by ``pd.concat`` when the directory contains no files.
    """
    # os.listdir returns entries in arbitrary order; sort them so the row order
    # (and therefore any positional train/test split) is reproducible.
    paths = [os.path.join(directory, filename) for filename in sorted(os.listdir(directory))]
    # Skip sub-directories, which cannot be opened as annotation files.
    frames = [get_tokens_and_ner_tags(path) for path in paths if os.path.isfile(path)]
    return pd.concat(frames, ignore_index=True)
    
def get_tokens_and_ner_tags(filename):
    """Parse one tab-separated annotation file into sentences of tokens and tags.

    The file holds one ``token<TAB>tag`` pair per line; a line consisting only of a
    newline separates sentences.

    Args:
        filename: Path of the UTF-8 encoded annotation file.

    Returns:
        pandas.DataFrame with one row per sentence and two columns:
        ``tokens`` (list of token strings) and ``ner_tags`` (list of tag strings).

    Raises:
        IndexError: if a non-blank line contains no tab character.
    """
    with open(filename, 'r', encoding="utf8") as f:
        lines = f.readlines()

    # Group consecutive non-blank lines into sentences, dropping the separators.
    sentences = [
        list(group)
        for is_separator, group in itertools.groupby(lines, lambda line: line == '\n')
        if not is_separator
    ]
    tokens = [[line.split('\t')[0] for line in sentence] for sentence in sentences]
    # Strip only the trailing newline. Unconditionally slicing off the last character
    # would cut a real character from the tag when the final line of the file has no
    # newline, or when the line carries more than two columns.
    entities = [[line.split('\t')[1].rstrip('\n') for line in sentence] for sentence in sentences]
    return pd.DataFrame({'tokens': tokens, 'ner_tags': entities})
  
# Wrap both pandas splits as `Dataset` objects (train first, then test).
train_dataset, test_dataset = (Dataset.from_pandas(frame) for frame in (train_df, test_df))

In [None]:
def tokenize_and_align_labels(examples):
    """Tokenize pre-split words and give every sub-token an integer label id.

    Args:
        examples: Batch with a ``"tokens"`` column (one list of words per example)
            and a ``f"{task}_tags"`` column (one list of tag strings per example).

    Returns:
        The tokenizer output with an added ``"labels"`` entry: one list of ids per
        example. Positions without a source word get -100, the value that
        ``compute_metrics`` filters out.

    Raises:
        KeyError: if a tag is not a key of ``label_encoding_dict``.
    """
    # When True, every sub-token of a word carries the word's label; when False,
    # only the first sub-token does and the rest are set to -100.
    label_all_tokens = True
    tokenized_inputs = tokenizer(list(examples["tokens"]), truncation=True, is_split_into_words=True)

    labels = []
    for i, label in enumerate(examples[f"{task}_tags"]):
        word_ids = tokenized_inputs.word_ids(batch_index=i)
        previous_word_idx = None
        label_ids = []
        for word_idx in word_ids:
            if word_idx is None:
                # No source word for this position.
                label_ids.append(-100)
            else:
                tag = label[word_idx]
                if tag == '0':
                    # A digit-zero tag is presumably a mistyped outside tag 'O'.
                    # It used to be encoded as id 0, which is 'B-code' in this
                    # label set; encode it as 'O' instead.
                    tag = 'O'
                if word_idx != previous_word_idx or label_all_tokens:
                    label_ids.append(label_encoding_dict[tag])
                else:
                    label_ids.append(-100)
            previous_word_idx = word_idx
        labels.append(label_ids)

    tokenized_inputs["labels"] = labels
    return tokenized_inputs

# Tokenize each split in batches, then drop the raw 'tokens' and 'ner_tags' columns.
train_tokenized_datasets = (
    train_dataset
    .map(tokenize_and_align_labels, batched=True)
    .remove_columns(['tokens', 'ner_tags'])
)
test_tokenized_datasets = (
    test_dataset
    .map(tokenize_and_align_labels, batched=True)
    .remove_columns(['tokens', 'ner_tags'])
)

Fine-tune the base model on the annotated data, without the intermediate CoNLL-2003 pre-training step

In [None]:
# Token-classification model loaded from the base checkpoint, with one output
# class per entry of label_list.
model = AutoModelForTokenClassification.from_pretrained(
    model_checkpoint,
    num_labels=len(label_list),
)

# Evaluate and log once per epoch; outputs go to the "test-<task>" directory.
args = TrainingArguments(
    output_dir=f"test-{task}",
    evaluation_strategy="epoch",
    logging_strategy="epoch",
    learning_rate=1e-4,
    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size=batch_size,
    num_train_epochs=8,
    weight_decay=1e-5,
)

data_collator = DataCollatorForTokenClassification(tokenizer=tokenizer)
# seqeval metric object used by compute_metrics.
metric = load_metric("seqeval")

def compute_metrics(p):
    """Turn raw model outputs into overall and per-entity seqeval scores.

    Args:
        p: Pair ``(predictions, labels)``. ``predictions`` is reduced with
            ``argmax`` over axis 2; ``labels`` marks positions to skip with -100.

    Returns:
        dict with the overall ``precision``/``recall``/``f1``/``accuracy`` plus
        ``<entity>-precision``, ``<entity>-recall``, ``<entity>-f1`` and
        ``<entity>-number`` for address, name, event and code. An entity type that
        is missing from the metric results is reported as zeros instead of
        raising ``KeyError``.
    """
    predictions, labels = p
    predictions = np.argmax(predictions, axis=2)

    # Drop the -100 positions and map the remaining ids back to tag strings.
    true_predictions = []
    true_labels = []
    for pred_row, label_row in zip(predictions, labels):
        kept = [(pred_id, label_id) for pred_id, label_id in zip(pred_row, label_row) if label_id != -100]
        true_predictions.append([label_list[pred_id] for pred_id, _ in kept])
        true_labels.append([label_list[label_id] for _, label_id in kept])

    results = metric.compute(predictions=true_predictions, references=true_labels)

    scores = {
        "precision": results["overall_precision"],
        "recall": results["overall_recall"],
        "f1": results["overall_f1"],
        "accuracy": results["overall_accuracy"],
    }
    # The results only hold a per-entity entry for types that occur in the
    # predictions or references, so a type absent from both (e.g. early in training
    # on a small evaluation set) needs a fallback.
    absent = {"precision": 0.0, "recall": 0.0, "f1": 0.0, "number": 0}
    for entity in ("address", "name", "event", "code"):
        entity_scores = results.get(entity, absent)
        for field in ("precision", "recall", "f1", "number"):
            scores[f"{entity}-{field}"] = entity_scores[field]
    return scores

# Train on the tokenized training split and evaluate on the tokenized test split,
# scoring each evaluation with compute_metrics.
trainer = Trainer(
    model=model,
    args=args,
    train_dataset=train_tokenized_datasets,
    eval_dataset=test_tokenized_datasets,
    data_collator=data_collator,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
)

trainer.train()
# Last expression of the cell: the final evaluation metrics are displayed.
trainer.evaluate()

In [None]:
from datasets import load_dataset

# Load the CoNLL-2003 dataset. The cells below use its "train" and "validation"
# splits and its "tokens" / "ner_tags" columns.
dataset = load_dataset("conll2003")

def tokenize_and_align_labels_pretraining(examples):
    """Tokenize pre-split words and label only the first sub-token of each word.

    Args:
        examples: Batch with a ``"tokens"`` column (one list of words per example)
            and a ``"ner_tags"`` column (one list of per-word labels per example).

    Returns:
        The tokenizer output with an added ``"labels"`` entry. Positions without a
        source word, and every sub-token after the first of a word, are set to -100.
    """
    tokenized_inputs = tokenizer(examples["tokens"], truncation=True, is_split_into_words=True)

    all_label_ids = []
    for batch_idx, word_labels in enumerate(examples["ner_tags"]):
        aligned = []
        last_word = None
        # word_ids maps each token position to the index of its source word (or None).
        for word in tokenized_inputs.word_ids(batch_index=batch_idx):
            if word is not None and word != last_word:
                # First sub-token of a new word: copy the word's label.
                aligned.append(word_labels[word])
            else:
                # Position without a word, or a continuation sub-token.
                aligned.append(-100)
            last_word = word
        all_label_ids.append(aligned)

    tokenized_inputs["labels"] = all_label_ids
    return tokenized_inputs

# Tokenize the first training example on its own; the result is kept in `inputs`
# but is not read by any later cell in this notebook.
inputs = tokenizer(dataset["train"][0]["tokens"], is_split_into_words=True)

# Apply the alignment function to every split in batches and drop the columns the
# training split originally had.
tokenized_dataset = dataset.map(
    function=tokenize_and_align_labels_pretraining,
    batched=True,
    remove_columns=dataset["train"].column_names,
)

In [None]:
from transformers import DataCollatorForTokenClassification

# Batch collator built around the shared tokenizer; it is passed to the Trainer
# in the pre-training cell below.
data_collator = DataCollatorForTokenClassification(tokenizer=tokenizer)

In [None]:
from transformers import AutoModelForTokenClassification, TrainingArguments, Trainer

# Tag names of the CoNLL-2003 "ner_tags" feature, in class-id order.
label_names = dataset["train"].features["ner_tags"].feature.names
# Use integer ids on both sides of the mapping: with string ids, label2id would map
# every tag to a string such as "3" instead of the class index 3.
id2label = {i: label for i, label in enumerate(label_names)}
label2id = {label: i for i, label in id2label.items()}

# Fresh model from the base checkpoint, configured with the CoNLL-2003 tag mapping.
model = AutoModelForTokenClassification.from_pretrained(model_checkpoint, id2label=id2label, label2id=label2id)


Pre-train the base model on CoNLL-2003 and save the resulting checkpoint for reuse

In [None]:
# Hyper-parameters for the CoNLL-2003 pre-training run: three epochs, evaluated
# once per epoch, with outputs written under ./results.
training_args = TrainingArguments(
    output_dir="./results",
    evaluation_strategy="epoch",
    learning_rate=2e-5,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    num_train_epochs=3,
    weight_decay=0.01,
)

# Train on the tokenized "train" split and evaluate on the "validation" split.
# No compute_metrics is passed to this Trainer.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_dataset["train"],
    eval_dataset=tokenized_dataset["validation"],
    tokenizer=tokenizer,
    data_collator=data_collator,
)

trainer.train()
# Persist the trained model; the "with pre-training" cell loads it again from
# 'xlnet_ner_pt'.
trainer.save_model('./xlnet_ner_pt') #For reuse

Fine-tune the CoNLL-2003 pre-trained checkpoint on the annotated data

In [None]:
# Start from the checkpoint saved by the pre-training cell. Its classification head
# was trained for a different label set, hence ignore_mismatched_sizes=True together
# with the new num_labels.
model = AutoModelForTokenClassification.from_pretrained(
    'xlnet_ner_pt',
    num_labels=len(label_list),
    ignore_mismatched_sizes=True,
)

# NOTE(review): this run writes to the same "test-<task>" directory as the run
# without pre-training — confirm that sharing the output directory is intended.
args = TrainingArguments(
    output_dir=f"test-{task}",
    evaluation_strategy="epoch",
    logging_strategy="epoch",
    learning_rate=1e-4,
    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size=batch_size,
    num_train_epochs=5,
    weight_decay=1e-5,
)

data_collator = DataCollatorForTokenClassification(tokenizer=tokenizer)
# seqeval metric object used by compute_metrics.
metric = load_metric("seqeval")

def compute_metrics(p):
    """Turn raw model outputs into overall and per-entity seqeval scores.

    Args:
        p: Pair ``(predictions, labels)``. ``predictions`` is reduced with
            ``argmax`` over axis 2; ``labels`` marks positions to skip with -100.

    Returns:
        dict with the overall ``precision``/``recall``/``f1``/``accuracy`` plus
        ``<entity>-precision``, ``<entity>-recall``, ``<entity>-f1`` and
        ``<entity>-number`` for address, name, event and code. An entity type that
        is missing from the metric results is reported as zeros instead of
        raising ``KeyError``.
    """
    predictions, labels = p
    predictions = np.argmax(predictions, axis=2)

    # Drop the -100 positions and map the remaining ids back to tag strings.
    true_predictions = []
    true_labels = []
    for pred_row, label_row in zip(predictions, labels):
        kept = [(pred_id, label_id) for pred_id, label_id in zip(pred_row, label_row) if label_id != -100]
        true_predictions.append([label_list[pred_id] for pred_id, _ in kept])
        true_labels.append([label_list[label_id] for _, label_id in kept])

    results = metric.compute(predictions=true_predictions, references=true_labels)

    scores = {
        "precision": results["overall_precision"],
        "recall": results["overall_recall"],
        "f1": results["overall_f1"],
        "accuracy": results["overall_accuracy"],
    }
    # The results only hold a per-entity entry for types that occur in the
    # predictions or references, so a type absent from both needs a fallback.
    absent = {"precision": 0.0, "recall": 0.0, "f1": 0.0, "number": 0}
    for entity in ("address", "name", "event", "code"):
        entity_scores = results.get(entity, absent)
        for field in ("precision", "recall", "f1", "number"):
            scores[f"{entity}-{field}"] = entity_scores[field]
    return scores
# Fine-tune the pre-trained checkpoint on the tokenized training split and evaluate
# on the tokenized test split, scoring each evaluation with compute_metrics.
trainer = Trainer(
    model=model,
    args=args,
    train_dataset=train_tokenized_datasets,
    eval_dataset=test_tokenized_datasets,
    data_collator=data_collator,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
)

trainer.train()
# Last expression of the cell: the final evaluation metrics are displayed.
trainer.evaluate()