In [None]:
import os

import pandas as pd

In [None]:
from google.colab import drive

# Mount at an absolute path: a relative mount point is resolved against the
# current working directory, so re-running this cell after the os.chdir() cell
# would target a different folder. "/content/drive" is the prefix that the
# os.chdir() path in this notebook expects.
drive.mount("/content/drive")

In [None]:
# Work from the shared project folder so that the relative "./Data" and
# "./Models" paths used in this notebook resolve inside it.
# NOTE(review): hard-coded Shared-drive path; only valid where Drive is mounted
# under /content/drive.
os.chdir(
    "/content/drive/Shareddrives/GOV.UK teams/2020-2021/Data labs/govNER_ContentMetadata"
)

In [None]:
# Preview the first rows of the intermediate test CSV (path is relative to the
# current working directory).
pd.read_csv("./Data/IntermediateData/test.csv").head()

In [None]:
! pip install datasets transformers seqeval

In [None]:
import transformers

print(transformers.__version__)

In [None]:
# Experiment configuration shared by the cells below.
task = "ner"  # Should be one of "ner", "pos" or "chunk"
dataset_name = "conll2003"  # passed to load_dataset()
model_checkpoint = "distilbert-base-uncased"  # passed to the tokenizer/model from_pretrained() calls
batch_size = 16  # per-device batch size for both training and evaluation

In [None]:
from datasets import load_dataset, load_metric

In [None]:
# Load the configured dataset; it is indexed by split name ("train",
# "validation") in the cells below.
# NOTE(review): this variable shares its name with the `datasets` package.
# `from datasets import ...` statements are unaffected, but a plain
# `datasets.<attr>` lookup would hit this object instead of the package.
datasets = load_dataset(dataset_name)

In [None]:
datasets

In [None]:
datasets["train"][0]

In [None]:
# Look the column up through `task` so this stays consistent with the other
# f"{task}_tags" lookups in the notebook (the original f-string had no placeholder).
datasets["train"].features[f"{task}_tags"]

In [None]:
# Label names for the task's tag column; a tag id is an index into this list.
label_list = datasets["train"].features[f"{task}_tags"].feature.names
label_list

In [None]:
import random

import pandas as pd
from datasets import ClassLabel, Sequence
from IPython.display import HTML, display


def show_random_elements(dataset, num_examples=10):
    """Render a random sample of rows from ``dataset`` as an HTML table.

    Integer class ids in ``ClassLabel`` columns, and in ``Sequence`` columns
    whose inner feature is a ``ClassLabel``, are replaced by their label names
    before the table is displayed.

    Args:
        dataset: Object supporting ``len()``, indexing with a list of row
            indices, and exposing a ``features`` mapping of column name to
            feature type.
        num_examples: Number of distinct rows to show.

    Raises:
        AssertionError: If ``num_examples`` is larger than ``len(dataset)``.
    """
    assert num_examples <= len(
        dataset
    ), "Can't pick more elements than there are in the dataset."
    # random.sample draws distinct indices in one call; the previous
    # draw-and-retry loop needed more and more retries as num_examples
    # approached len(dataset).
    picks = random.sample(range(len(dataset)), num_examples)

    df = pd.DataFrame(dataset[picks])
    for column, typ in dataset.features.items():
        if isinstance(typ, ClassLabel):
            # Single class id per row -> label name.
            df[column] = df[column].transform(lambda i: typ.names[i])
        elif isinstance(typ, Sequence) and isinstance(typ.feature, ClassLabel):
            # List of class ids per row -> list of label names.
            df[column] = df[column].transform(
                lambda x: [typ.feature.names[i] for i in x]
            )
    display(HTML(df.to_html()))

In [None]:
show_random_elements(datasets["train"])

In [None]:
from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained(model_checkpoint)

In [None]:
import transformers

# Stop here if the loaded tokenizer is not a PreTrainedTokenizerFast.
# NOTE(review): presumably required because the word_ids() alignment used in
# the following cells is only offered by fast tokenizers; confirm.
assert isinstance(tokenizer, transformers.PreTrainedTokenizerFast)

In [None]:
tokenizer("Hello, this is one sentence!")

In [None]:
tokenizer(
    ["Hello", ",", "this", "is", "one", "sentence", "split", "into", "words", "."],
    is_split_into_words=True,
)

In [None]:
example = datasets["train"][4]
print(example["tokens"])

In [None]:
tokenized_input = tokenizer(example["tokens"], is_split_into_words=True)
tokens = tokenizer.convert_ids_to_tokens(tokenized_input["input_ids"])
print(tokens)

In [None]:
len(example[f"{task}_tags"]), len(tokenized_input["input_ids"])

In [None]:
print(tokenized_input.word_ids())

In [None]:
# Expand the word-level tags to token level: positions whose word id is None
# get -100, every other token takes the tag of the word it belongs to.
word_ids = tokenized_input.word_ids()
aligned_labels = [-100 if i is None else example[f"{task}_tags"][i] for i in word_ids]
# Print both lengths side by side to compare them.
print(len(aligned_labels), len(tokenized_input["input_ids"]))

In [None]:
# Read by tokenize_and_align_labels:
#   True  -> every token of a word receives the word's label.
#   False -> only the first token of a word is labelled; the rest get -100.
label_all_tokens = True

In [None]:
def tokenize_and_align_labels(examples):
    """Tokenize a batch of pre-split sentences and attach token-level labels.

    Reads the notebook globals ``tokenizer``, ``task`` and ``label_all_tokens``.

    Args:
        examples: Batch mapping with a ``"tokens"`` entry (one word list per
            example) and a ``f"{task}_tags"`` entry (one tag list per example).

    Returns:
        The tokenizer output with an added ``"labels"`` entry holding one label
        list per example, aligned to the produced tokens.
    """
    encoded = tokenizer(
        examples["tokens"], truncation=True, is_split_into_words=True
    )

    def _align(word_ids, word_labels):
        # Build the label list for a single example.
        aligned = []
        last_seen = None
        for word_id in word_ids:
            if word_id is None:
                # Special tokens carry no word id; -100 makes the loss ignore them.
                aligned.append(-100)
            else:
                starts_new_word = word_id != last_seen
                # The first token of a word is always labelled; continuation
                # tokens are labelled only when label_all_tokens is set.
                if starts_new_word or label_all_tokens:
                    aligned.append(word_labels[word_id])
                else:
                    aligned.append(-100)
            last_seen = word_id
        return aligned

    encoded["labels"] = [
        _align(encoded.word_ids(batch_index=row), word_labels)
        for row, word_labels in enumerate(examples[f"{task}_tags"])
    ]
    return encoded

In [None]:
tokenize_and_align_labels(datasets["train"][:5])

In [None]:
# Run the alignment function over the dataset in batches (batched=True), so it
# receives lists of examples as it expects.
tokenized_datasets = datasets.map(tokenize_and_align_labels, batched=True)

In [None]:
from transformers import AutoModelForTokenClassification, Trainer, TrainingArguments

# Store the label names in the model config so that saved checkpoints, and any
# pipeline built from them, report the names in label_list instead of generic
# "LABEL_n" placeholders.
model = AutoModelForTokenClassification.from_pretrained(
    model_checkpoint,
    num_labels=len(label_list),
    id2label=dict(enumerate(label_list)),
    label2id={label: idx for idx, label in enumerate(label_list)},
)

In [None]:
# Last path component of the base checkpoint name, used to name the output folder.
model_name = model_checkpoint.split("/")[-1]
args = TrainingArguments(
    # The two adjacent string literals are concatenated into
    # "./Models/<model_name>-finetuned-<task>-<dataset_name>".
    output_dir="./Models/" f"{model_name}-finetuned-{task}-{dataset_name}",
    # NOTE(review): newer transformers releases appear to name this argument
    # `eval_strategy`; confirm against the installed version.
    evaluation_strategy="epoch",
    learning_rate=2e-5,
    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size=batch_size,
    num_train_epochs=3,
    weight_decay=0.01,
    push_to_hub=False,
)

In [None]:
from transformers import DataCollatorForTokenClassification

data_collator = DataCollatorForTokenClassification(tokenizer)

In [None]:
metric = load_metric("seqeval")

In [None]:
# Sanity check of the metric: score one example's gold label names against themselves.
labels = [label_list[i] for i in example[f"{task}_tags"]]
metric.compute(predictions=[labels], references=[labels])

In [None]:
import numpy as np


def compute_metrics(p):
    """Turn raw model outputs into overall precision / recall / F1 / accuracy.

    Reads the notebook globals ``label_list`` and ``metric``.

    Args:
        p: A ``(predictions, labels)`` pair. ``predictions`` is arg-maxed over
            axis 2 to obtain one class id per position; ``labels`` holds the
            reference class ids, with -100 marking positions to ignore.

    Returns:
        Dict with the keys ``precision``, ``recall``, ``f1`` and ``accuracy``,
        taken from the metric's ``overall_*`` results.
    """
    scores_per_class, gold_ids = p
    predicted_ids = np.argmax(scores_per_class, axis=2)

    true_predictions = []
    true_labels = []
    for predicted_row, gold_row in zip(predicted_ids, gold_ids):
        # Keep only the positions whose reference label is not the ignore index.
        kept = [
            (pred_id, gold_id)
            for pred_id, gold_id in zip(predicted_row, gold_row)
            if gold_id != -100
        ]
        true_predictions.append([label_list[pred_id] for pred_id, _ in kept])
        true_labels.append([label_list[gold_id] for _, gold_id in kept])

    results = metric.compute(predictions=true_predictions, references=true_labels)
    return dict(
        precision=results["overall_precision"],
        recall=results["overall_recall"],
        f1=results["overall_f1"],
        accuracy=results["overall_accuracy"],
    )

In [None]:
# Bring together the model, the training arguments, the tokenized train and
# validation splits, the data collator and the metric function.
trainer = Trainer(
    model,
    args,
    train_dataset=tokenized_datasets["train"],
    eval_dataset=tokenized_datasets["validation"],
    data_collator=data_collator,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
)

In [None]:
trainer.train()

In [None]:
# Save the tokenizer files into the same folder that was given to
# TrainingArguments as output_dir.
tokenizer.save_pretrained(f"./Models/{model_name}-finetuned-{task}-{dataset_name}")

# Load Model for Inference

Load the fine-tuned model and its tokenizer from a local checkpoint folder.


In [None]:
import torch
from transformers import AutoModelForTokenClassification, AutoTokenizer

In [None]:
model_name = model_checkpoint.split("/")[-1]

In [None]:
# Load the tokenizer and the model from one specific checkpoint folder.
# NOTE(review): "checkpoint-2500" is a hard-coded step number; the folder must
# exist under the output directory, so adjust it to match your own training run.
checkpoint = f"./Models/{model_name}-finetuned-{task}-{dataset_name}/checkpoint-2500"
local_tokenizer = AutoTokenizer.from_pretrained(checkpoint)
local_model = AutoModelForTokenClassification.from_pretrained(checkpoint)

In [None]:
sequences = ["my name is rory"]

In [None]:
# Encode the raw sentences as a batch with padding and truncation enabled,
# returned as PyTorch tensors ("pt") so they can be passed to the model.
processed_tokens = local_tokenizer(
    sequences, padding=True, truncation=True, return_tensors="pt"
)

In [None]:
processed_tokens

In [None]:
# Inference only: disable gradient tracking for the forward pass so no autograd
# graph is built for outputs that are merely inspected.
with torch.no_grad():
    output = local_model(**processed_tokens)

In [None]:
output

In [None]:
print(output.logits)

In [None]:
from transformers import pipeline

# Replace this with your own checkpoint
# The results of this pipeline are consumed below through their
# "entity_group", "start" and "end" keys.
token_classifier = pipeline(
    "token-classification", model=checkpoint, aggregation_strategy="simple"
)

In [None]:
string = "The show is on the Disney Channel. It airs at 8pm. It will be shown in spanish and english."
print(string)
print(len(string))

In [None]:
# Classify the sentence held in `string` rather than repeating the literal, so
# the printed text and the classified text cannot drift apart.
result = token_classifier(string)

In [None]:
result

In [None]:
label_list

## Visualise Entities

In [None]:
import spacy
from spacy import displacy

# Render the entities found by spaCy's "en_core_web_sm" pipeline for a sample sentence.
text = "Hi my name is Rory Hurley. I work for the Cabinet Office. I speak english and a little bit of spanish"

nlp = spacy.load("en_core_web_sm")
doc = nlp(text)
# NOTE(review): "distance" looks like an option of the dependency visualiser;
# it may have no effect with style="ent". Confirm against the displaCy docs.
displacy.render(doc, style="ent", jupyter=True, options={"distance": 90})

In [None]:
# Hand-written example input for display_entities: each entry is a
# (label, start_char, end_char) tuple of character offsets into `text`.
text = "My name is John Smith and I live in Paris"
entities = [
    ("Employee", 11, 21),  # John Smith
    ("Location", 36, 41),  # Paris
]

In [None]:
import spacy


def display_entities(text, entities):
    """Render character-offset entity annotations over ``text`` with displaCy.

    The text is tokenized with a blank English spaCy pipeline and each entity
    is converted to a span through ``Doc.char_span``.

    Args:
        text: The string to annotate.
        entities: Iterable of ``(label, start_char, end_char)`` sequences.

    Entities whose offsets do not line up with token boundaries are skipped:
    ``Doc.char_span`` returns ``None`` for them, and ``None`` is not a valid
    entry for ``doc.ents``.
    """
    nlp = spacy.blank("en")
    doc = nlp(text)
    ents = []
    for ee in entities:
        span = doc.char_span(ee[1], ee[2], ee[0])
        if span is None:
            # Offsets start or end inside a token; there is no span to show.
            continue
        ents.append(span)
    doc.ents = ents
    displacy.render(doc, style="ent", jupyter=True, options={"distance": 90})


def tokenise_and_display(text):
    """Classify ``text`` with the token-classification pipeline and visualise it.

    Prints the raw pipeline output, then the ``(label, start, end)`` tuples
    derived from it, and finally passes those tuples to ``display_entities``.

    Args:
        text: The string to classify and render.
    """
    predictions = token_classifier(text)
    print(predictions)

    spans = []
    for item in predictions:
        spans.append((item["entity_group"], item["start"], item["end"]))
    print(spans)

    display_entities(text, entities=spans)

In [None]:
display_entities(text, entities)

In [None]:
result[:2]

In [None]:
# Convert the pipeline output into the (label, start, end) tuples that
# display_entities expects.
res_ents = [(i["entity_group"], i["start"], i["end"]) for i in result]
res_ents

In [None]:
display_entities(string, res_ents)

In [None]:
tokenise_and_display(
    "Hi my name is Rory Hurley. I work for the Google. I speak english and a little bit of spanish"
)