<a href="https://colab.research.google.com/github/Abhi23run/Python/blob/main/finetuning_ner.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
! pip install datasets transformers seqeval



In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
%ls

[0m[01;34mdrive[0m/  [01;34msample_data[0m/


In [None]:
%cd /content/drive/MyDrive/Finetuning_NER_DLT_Assignment

/content/drive/MyDrive/Finetuning_NER_DLT_Assignment


In [None]:
%ls

finetuning_ner.ipynb  train.json


In [None]:
import json
import os
import random
import numpy as np
import pandas as pd

In [None]:
import operator
import torch
import copy
import logging
import itertools
import numpy as np

from typing import List, Dict, Tuple, Optional, Union

In [None]:
def span_to_label(labeled_spans: Dict[Tuple[int, int], str], tokens: List[str]) -> List[str]:
    """
    Convert entity spans to labels

    Parameters
    ----------
    labeled_spans: labeled span dictionary: {(start, end): label}
    tokens: a list of tokens, used to check if the spans are valid.

    Returns
    -------
    a list of string labels
    """
    if labeled_spans:
        assert list(labeled_spans.keys())[-1][1] <= len(tokens), ValueError("label spans out of scope!")

    labels = ["O"] * len(tokens)
    for (start, end), label in labeled_spans.items():
        if type(label) == list or type(label) == tuple:
            lb = label[0][0]
        else:
            lb = label
        labels[start] = "B-" + lb
        if end - start > 1:
            labels[start + 1 : end] = ["I-" + lb] * (end - start - 1)

    return labels

In [None]:
def span_list_to_dict(span_list: List[list]) -> Dict[Tuple[int, int], Union[str, tuple]]:
    """
    convert entity label span list to span dictionaries

    Parameters
    ----------
    span_list

    Returns
    -------
    span_dict
    """
    span_dict = dict()
    for span in span_list:
        span_dict[(span[0], span[1])] = span[2]
    return span_dict

In [None]:
def load_data_from_json(file_dir: str):
    """
    Load data stored in the current data format.

    Parameters
    ----------
    file_dir: str
        file directory

    """
    with open(file_dir, "r", encoding="utf-8") as f:
        data_list = json.load(f)

    tk_seqs = list()
    lbs_list = list()

    for inst in data_list:
        # get tokens
        tk_seqs.append(inst["text"])

        # get true labels
        lbs = span_to_label(span_list_to_dict(inst["label"]), inst["text"])
        lbs_list.append(lbs)

    return tk_seqs, lbs_list

In [None]:
%pwd

'/content/drive/MyDrive/Finetuning_NER_DLT_Assignment'

In [None]:
text,labels=load_data_from_json("/content/drive/MyDrive/Finetuning_NER_DLT_Assignment/train.json")

In [None]:
len(text),len(labels)

(1000, 1000)

In [None]:
text[1],labels[1]

(['--', 'Reuter', 'London', 'Newsroom', '+44', '171', '542', '7658'],
 ['O', 'B-ORG', 'I-ORG', 'I-ORG', 'O', 'O', 'O', 'O'])

In [None]:
import transformers

print(transformers.__version__)

4.34.0


In [None]:
from transformers.utils import send_example_telemetry

send_example_telemetry("token_classification_notebook", framework="pytorch")

In [None]:
task = "ner" # Should be one of "ner", "pos" or "chunk"
model_checkpoint = "distilbert-base-uncased"
batch_size = 16

In [None]:
from datasets import load_dataset, load_metric

In [None]:
datasets = load_dataset("conll2003")

In [None]:
datasets

DatasetDict({
    train: Dataset({
        features: ['id', 'tokens', 'pos_tags', 'chunk_tags', 'ner_tags'],
        num_rows: 14041
    })
    validation: Dataset({
        features: ['id', 'tokens', 'pos_tags', 'chunk_tags', 'ner_tags'],
        num_rows: 3250
    })
    test: Dataset({
        features: ['id', 'tokens', 'pos_tags', 'chunk_tags', 'ner_tags'],
        num_rows: 3453
    })
})

In [None]:
datasets["train"][0]

{'id': '0',
 'tokens': ['EU',
  'rejects',
  'German',
  'call',
  'to',
  'boycott',
  'British',
  'lamb',
  '.'],
 'pos_tags': [22, 42, 16, 21, 35, 37, 16, 21, 7],
 'chunk_tags': [11, 21, 11, 12, 21, 22, 11, 12, 0],
 'ner_tags': [3, 0, 7, 0, 0, 0, 7, 0, 0]}

In [None]:
datasets["train"].features['ner_tags']

Sequence(feature=ClassLabel(names=['O', 'B-PER', 'I-PER', 'B-ORG', 'I-ORG', 'B-LOC', 'I-LOC', 'B-MISC', 'I-MISC'], id=None), length=-1, id=None)

In [None]:
def pack_instances(**kwargs) -> list[dict]:
    """
    Convert attribute lists to a list of data instances, each is a dict with attribute names as keys
    and one datapoint attribute values as values
    """

    instance_list = list()
    keys = tuple(kwargs.keys())
    # print(keys)

    for inst_attrs in zip(*tuple(kwargs.values())):
        inst = dict(zip(keys, inst_attrs))
        instance_list.append(inst)

    return instance_list

In [None]:
def unpack_instances(instance_list: list[dict], attr_names: Optional[list[str]] = None):
    """
    Convert a list of dict-type instances to a list of value lists,
    each contains all values within a batch of each attribute

    Parameters
    ----------
    instance_list: list[dict]
        a list of attributes
    attr_names: list[str], optional
        the name of the needed attributes. Notice that this variable should be specified
        for Python versions that does not natively support ordered dict
    """
    if not attr_names:
        attr_names = list(instance_list[0].keys())
    attribute_tuple = [[inst[name] for inst in instance_list] for name in attr_names]

    return attribute_tuple

In [None]:
from transformers import AutoTokenizer

In [None]:
tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased", add_prefix_space=True)

In [None]:
tokenized_text = tokenizer(text, add_special_tokens=True, is_split_into_words=True)

In [None]:
tokenizer.decode(tokenizer(text[2], add_special_tokens=True, is_split_into_words=True)['input_ids'])

'[CLS] bugno tested positive for the banned hormone after the fifth stage of the tour, in which he finished third overall. [SEP]'

In [167]:
lb2idx={'O':1,'B-PER':1,'I-PER':2,'B-ORG':3,'I-ORG':4,'B-LOC':5,'I-LOC':6,'B-MISC':7,"I-MISC":8}

In [168]:
bert_lbs_list = list()

In [169]:
for i,j in enumerate(tokenized_text['input_ids']):
    word_ids=(tokenized_text.word_ids(batch_index=i))
    prev_word_id=None
    bert_lbs_list_i=[]
    for word_id in word_ids:
      if word_id is None:
        bert_lbs_list_i.append(-100)
      elif word_id != prev_word_id:
        bert_lbs_list_i.append(lb2idx[labels[i][word_id]])
      else:
        bert_lbs_list_i.append(-100)

      prev_word_id=word_id
    bert_lbs_list.append(bert_lbs_list_i)

In [170]:
len(bert_lbs_list),len(tokenized_text['input_ids']),len(tokenized_text['attention_mask'])

(1000, 1000, 1000)

In [171]:
print("1st input id's")
print(tokenized_text['input_ids'][0])
print('-'*20)
print("1st attention masks")
print(tokenized_text['attention_mask'][0])
print('-'*20)
print("1st data point labels")
print(bert_lbs_list[0])

1st input id's
[101, 13144, 7460, 2000, 2068, 1012, 102]
--------------------
1st attention masks
[1, 1, 1, 1, 1, 1, 1]
--------------------
1st data point labels
[-100, 1, 1, 1, 1, 1, -100]


In [172]:
data_instances = pack_instances(
            bert_tk_ids=tokenized_text['input_ids'],
            bert_attn_masks=tokenized_text['attention_mask'],
            bert_lbs=bert_lbs_list
        )

In [173]:
(data_instances)[0]

{'bert_tk_ids': [101, 13144, 7460, 2000, 2068, 1012, 102],
 'bert_attn_masks': [1, 1, 1, 1, 1, 1, 1],
 'bert_lbs': [-100, 1, 1, 1, 1, 1, -100]}

In [174]:
data_instances[1]

{'bert_tk_ids': [101,
  1011,
  1011,
  2128,
  19901,
  2414,
  2739,
  9954,
  1009,
  4008,
  18225,
  5139,
  2475,
  6146,
  27814,
  102],
 'bert_attn_masks': [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1],
 'bert_lbs': [-100,
  1,
  -100,
  3,
  -100,
  4,
  4,
  -100,
  1,
  -100,
  1,
  1,
  -100,
  1,
  -100,
  -100]}

In [175]:
def test_fn(**kwargs):
  print(tuple(kwargs.keys()))
  print(len(tuple(kwargs.values())))

In [176]:
test_fn(bert_tk_ids=tokenized_text['input_ids'],
            bert_attn_masks=tokenized_text['attention_mask'],
            bert_lbs=bert_lbs_list)

('bert_tk_ids', 'bert_attn_masks', 'bert_lbs')
3


In [177]:
tk_ids, attn_masks, lbs = unpack_instances(data_instances, ["bert_tk_ids", "bert_attn_masks", "bert_lbs"])

In [178]:
batch_size=16

In [179]:
tk_ids=tk_ids[:16]
attn_masks=attn_masks[:16]
lbs=lbs[:16]

In [180]:
max_batch_length=max([len(i) for i in tk_ids])

In [181]:
tk_ids = tokenizer.pad({'input_ids': tk_ids}, padding='longest', return_tensors="pt")["input_ids"].to(torch.int64)

In [182]:
attn_masks = tokenizer.pad({'input_ids': attn_masks}, padding='longest', return_tensors="pt")["input_ids"].to(torch.int64)

In [183]:
lbs=torch.tensor([i+[collate_fn.label_pad_token_id]*(max_batch_length-len(i)) for i in lbs]).to(torch.int64)

In [185]:
lbs[1]

tensor([-100,    1, -100,    3, -100,    4,    4, -100,    1, -100,    1,    1,
        -100,    1, -100, -100, -100, -100, -100, -100, -100, -100, -100, -100,
        -100, -100, -100, -100, -100, -100, -100, -100, -100, -100, -100, -100,
        -100, -100, -100, -100, -100, -100, -100, -100, -100, -100, -100, -100,
        -100])

In [None]:
collate_fn.label_pad_token_id

-100

In [None]:
from transformers import DataCollatorForTokenClassification

In [None]:
collate_fn = DataCollatorForTokenClassification(tokenizer)

In [None]:
collate_fn(data_instances[:2])

You're using a DistilBertTokenizerFast tokenizer. Please note that with a fast tokenizer, using the `__call__` method is faster than using a method to encode the text followed by a call to the `pad` method to get a padded encoding.


ValueError: ignored

In [None]:
def pack_instances(**kwargs) -> list[dict]:
    """
    Convert attribute lists to a list of data instances, each is a dict with attribute names as keys
    and one datapoint attribute values as values
    """

    instance_list = list()
    keys = tuple(kwargs.keys())
    # print(keys)

    for inst_attrs in zip(*tuple(kwargs.values())):
        inst = dict(zip(keys, inst_attrs))
        instance_list.append(inst)

    return instance_list

In [None]:
input_dict={'input_ids': [[101, 2079, 2025, 19960, 10362, 1999, 1996, 3821, 1997, 16657, 1010, 2005, 2027, 2024, 11259, 1998, 4248, 2000, 4963, 1012, 102],[101,10100,102]],
 'token_type_ids': [[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],[0,0,0]],
 'attention_mask': [[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1],[1,1,1]]}

tk_ids, attn_masks, lbs = unpack_instances(pack_instances(**input_dict),attr_names=['input_ids','token_type_ids','attention_mask'])

# Padding using the tokenizer's pad method
padded= tokenizer.pad([{"input_ids": tk_ids}],
                              padding="longest",
                              return_tensors="pt")

# Getting the padded sequences
tk_ids = padded["input_ids"].to(torch.int64)
# attn_masks = padded["attention_mask"].to(torch.int64)
# lbs = padded["labels"].to(torch.int64)