In [1]:
# Mount Google Drive at /content/drive; the JSON exports read below and the
# saved model directory both live under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
def _closest_occurrence(text, needle, hint):
    """Return the start of the occurrence of `needle` in `text` nearest to `hint`.

    Falls back to the first occurrence when `hint` is not a number, and
    returns -1 when `needle` does not occur at all. Ties go to the earlier
    occurrence.
    """
    first = text.find(needle)
    if first == -1 or not isinstance(hint, (int, float)):
        return first

    best = first
    position = text.find(needle, first + 1)
    while position != -1:
        if abs(position - hint) < abs(best - hint):
            best = position
        position = text.find(needle, position + 1)
    return best


def adjust_indices(data_list):
    """Recompute the character offsets of every annotated span, in place.

    For each entry, the annotated text of every result is stripped and
    located inside the stripped entry text; `start`/`end` are rewritten to
    that location and `annotation['result']` is replaced by the rebuilt list.

    When the annotated text occurs several times, the occurrence closest to
    the exported `start` offset is chosen (previously every repeat collapsed
    onto the first occurrence). Results whose text is empty after stripping,
    or cannot be found, are dropped with a printed warning.

    Args:
        data_list: iterable of dicts shaped like
            {'data': {'text': str}, 'annotations': [{'result': [...]}]},
            where each result carries 'value' (with 'text' and 'labels'),
            'id', 'from_name', 'to_name', 'type' and 'origin'.

    Returns:
        None. `data_list` is modified in place. Only the keys listed above
        survive in the rebuilt results.
    """
    for data in data_list:
        # Offsets are expressed relative to the stripped text.
        text = data['data']['text'].strip()

        for annotation in data['annotations']:
            adjusted_results = []

            for result in annotation['result']:
                value = result['value']
                original_text = value['text'].strip()

                # str.find('') returns 0, which would yield a bogus
                # zero-length span at the start of the text.
                if not original_text:
                    print("Warning: empty annotation text skipped.")
                    continue

                # Use the exported offset as a hint to disambiguate repeats.
                start_index = _closest_occurrence(text, original_text, value.get('start'))
                if start_index == -1:
                    print(f"Warning: '{original_text}' not found in text.")
                    continue

                end_index = start_index + len(original_text)

                adjusted_results.append({
                    'value': {
                        'start': start_index,
                        'end': end_index,
                        'text': original_text,
                        'labels': value['labels']
                    },
                    'id': result['id'],
                    'from_name': result['from_name'],
                    'to_name': result['to_name'],
                    'type': result['type'],
                    'origin': result['origin']
                })

            # Replace the original results with the adjusted ones
            annotation['result'] = adjusted_results



In [3]:
# Module-level accumulators filled by bio_tagging(): one token list and one
# BIO-tag list per entry. They are never cleared by the loading cells, so
# re-run this cell before re-running those cells to avoid duplicated entries.
tokens_array = []
bio_tags_array = []

In [4]:
def bio_tagging(data):
    """Tokenize every entry on whitespace and derive one BIO tag per token.

    Character offsets of each annotated span are interpreted relative to the
    stripped entry text (position 0 is the first character of the first
    token), which is what adjust_indices() produces. Every token that
    overlaps the span is tagged: the first one `B-<label>` (only if it is
    still "O"), the following ones `I-<label>`.

    Token positions are taken from the real text, so runs of spaces, tabs or
    newlines between tokens no longer shift tags onto neighbouring tokens.

    Args:
        data: list of dicts shaped like
            {'data': {'text': str}, 'annotations': [{'result': [...]}]};
            each result needs value['labels'], value['start'], value['end'].

    Returns:
        None. The token list and tag list of every entry are appended to the
        module-level `tokens_array` and `bio_tags_array`.

    Raises:
        ValueError: if `data` is not a list or an entry lacks data/text.
    """
    if not isinstance(data, list):
        raise ValueError("Expected a list of data entries.")

    for entry in data:
        if not isinstance(entry, dict) or "data" not in entry or "text" not in entry["data"]:
            raise ValueError("Entry does not have the correct structure.")

        raw_text = entry["data"]["text"]
        sentence = raw_text.strip()
        tokens = sentence.split()  # Tokenize by whitespace for simplicity

        # Real (start, end) character span of every token. Searching from the
        # end of the previous token always lands on the next token because
        # tokens never contain whitespace.
        token_spans = []
        cursor = 0
        for token in tokens:
            begin = sentence.index(token, cursor)
            cursor = begin + len(token)
            token_spans.append((begin, cursor))

        bio_labels = ["O"] * len(tokens)

        for annotation in entry.get("annotations", []):
            for result in annotation.get("result", []):
                value = result["value"]
                label = value["labels"][0]  # Assume one label for simplicity
                start = value["start"]
                end = value["end"]

                # Skip out-of-range spans and empty/inverted ones (a
                # zero-length span must not tag any token).
                if start < 0 or end > len(raw_text) or start >= end:
                    continue

                # Tokens whose character span overlaps [start, end).
                covered = [
                    index
                    for index, (tok_start, tok_end) in enumerate(token_spans)
                    if tok_start < end and tok_end > start
                ]
                if not covered:
                    continue

                # Only mark B if the token hasn't been tagged yet
                if bio_labels[covered[0]] == "O":
                    bio_labels[covered[0]] = f"B-{label}"
                for index in covered[1:]:
                    bio_labels[index] = f"I-{label}"

        tokens_array.append(tokens)
        bio_tags_array.append(bio_labels)





In [5]:
import json

# Annotation export read from the mounted Drive folder. The encoding is given
# explicitly so the result does not depend on the platform default.
json_path = '/content/drive/MyDrive/Exported_v_c_i_y_Data.json'
with open(json_path, 'r', encoding='utf-8') as file:
    data = json.load(file)

# Re-align span offsets, then append this file's tokens/tags to the global arrays.
adjust_indices(data)
bio_tagging(data)

# Running totals over every file loaded so far (the arrays are not reset here).
print("Tokens Array:", len(tokens_array))
print("BIO Tags Array:", len(bio_tags_array))

Tokens Array: 40000
BIO Tags Array: 40000


In [6]:
import json

# Annotation export read from the mounted Drive folder. The encoding is given
# explicitly so the result does not depend on the platform default.
json_path = '/content/drive/MyDrive/date_json.json'
with open(json_path, 'r', encoding='utf-8') as file:
    data = json.load(file)

# Re-align span offsets, then append this file's tokens/tags to the global arrays.
adjust_indices(data)
bio_tagging(data)

# Running totals over every file loaded so far (the arrays are not reset here).
print("Tokens Array:", len(tokens_array))
print("BIO Tags Array:", len(bio_tags_array))

Tokens Array: 40050
BIO Tags Array: 40050


In [7]:
import json

# Annotation export read from the mounted Drive folder. The encoding is given
# explicitly so the result does not depend on the platform default.
json_path = '/content/drive/MyDrive/Exported_Vendor_Invoice_Data.json'
with open(json_path, 'r', encoding='utf-8') as file:
    data = json.load(file)

# Re-align span offsets, then append this file's tokens/tags to the global arrays.
adjust_indices(data)
bio_tagging(data)

# Running totals over every file loaded so far (the arrays are not reset here).
print("Tokens Array:", len(tokens_array))
print("BIO Tags Array:", len(bio_tags_array))

Tokens Array: 44050
BIO Tags Array: 44050


In [8]:
import json

# Annotation export read from the mounted Drive folder. The encoding is given
# explicitly so the result does not depend on the platform default.
json_path = '/content/drive/MyDrive/Exported_vendors_component_year_Data.json'
with open(json_path, 'r', encoding='utf-8') as file:
    data = json.load(file)

# Re-align span offsets, then append this file's tokens/tags to the global arrays.
adjust_indices(data)
bio_tagging(data)

# Running totals over every file loaded so far (the arrays are not reset here).
print("Tokens Array:", len(tokens_array))
print("BIO Tags Array:", len(bio_tags_array))

Tokens Array: 48050
BIO Tags Array: 48050


In [9]:
import json

# Annotation export read from the mounted Drive folder. The encoding is given
# explicitly so the result does not depend on the platform default.
json_path = '/content/drive/MyDrive/Exported_vendors_invoice_year_Data.json'
with open(json_path, 'r', encoding='utf-8') as file:
    data = json.load(file)

# Re-align span offsets, then append this file's tokens/tags to the global arrays.
adjust_indices(data)
bio_tagging(data)

# Running totals over every file loaded so far (the arrays are not reset here).
print("Tokens Array:", len(tokens_array))
print("BIO Tags Array:", len(bio_tags_array))

Tokens Array: 52050
BIO Tags Array: 52050


In [10]:
import json

# Annotation export read from the mounted Drive folder. The encoding is given
# explicitly so the result does not depend on the platform default.
json_path = '/content/drive/MyDrive/ExportedInvoiceData.json'
with open(json_path, 'r', encoding='utf-8') as file:
    data = json.load(file)

# Re-align span offsets, then append this file's tokens/tags to the global arrays.
adjust_indices(data)
bio_tagging(data)

# Running totals over every file loaded so far (the arrays are not reset here).
print("Tokens Array:", len(tokens_array))
print("BIO Tags Array:", len(bio_tags_array))

Tokens Array: 52450
BIO Tags Array: 52450


In [11]:
import json

# Annotation export read from the mounted Drive folder. The encoding is given
# explicitly so the result does not depend on the platform default.
json_path = '/content/drive/MyDrive/ExportedVendorData.json'
with open(json_path, 'r', encoding='utf-8') as file:
    data = json.load(file)

# Re-align span offsets, then append this file's tokens/tags to the global arrays.
adjust_indices(data)
bio_tagging(data)

# Running totals over every file loaded so far (the arrays are not reset here).
print("Tokens Array:", len(tokens_array))
print("BIO Tags Array:", len(bio_tags_array))

Tokens Array: 52850
BIO Tags Array: 52850


In [12]:
# Spot-check one entry: bio_tagging() creates exactly one tag per token, so
# the two printed lists should have the same length.
idx=45100
print(tokens_array[idx])
print(bio_tags_array[idx])

['M/S', 'Ecel', 'pest', 'Control', 'shipped', 'critical', 'Valrack', '12u', 'in', '2023.']
['B-vendors', 'I-vendors', 'I-vendors', 'I-vendors', 'O', 'O', 'B-component', 'I-component', 'O', 'B-year']


In [13]:
# Step 1: lookup table from BIO tag to integer class id
label_mapping = {
    'O': 0,
    'B-component': 1,
    'I-component': 2,
    'B-year': 3,
    'I-year': 4,
    'B-vendors': 5,
    'I-vendors': 6,
    'B-invoice': 7,
    'I-invoice': 8
}

# Step 2: encode every tag sequence; unknown tags are reported and stored as None
encoded_bio_tags_array = []
for tag_sequence in bio_tags_array:
    encoded_sequence = []
    for tag in tag_sequence:
        if tag in label_mapping:
            encoded_sequence.append(label_mapping[tag])
        else:
            print(f"Warning: '{tag}' not found in label_mapping.")
            encoded_sequence.append(None)
    encoded_bio_tags_array.append(encoded_sequence)


In [14]:
encoded_bio_tags_array[:5]

[[0, 3, 5, 6, 6, 6, 6, 6, 6, 0, 0, 0, 7, 0, 0, 1, 2, 0],
 [0, 3, 5, 6, 6, 6, 6, 6, 6, 0, 0, 0, 7, 0, 0, 1, 2, 0],
 [0, 3, 5, 6, 6, 0, 0, 0, 7, 0, 0, 1, 2, 0],
 [0, 3, 5, 6, 6, 6, 6, 0, 0, 0, 7, 0, 0, 1, 2, 0],
 [0, 3, 5, 6, 6, 6, 6, 6, 0, 0, 0, 7, 0, 0, 1, 2, 0]]

In [15]:
from sklearn.model_selection import train_test_split

# Half of the entries go to each split (test_size=0.5); random_state fixes the shuffle.
x_train,x_test,y_train,y_test=train_test_split(tokens_array,encoded_bio_tags_array,test_size=0.5,random_state=50)

In [16]:
print("Training Samples",len(x_train))
print("Testing Samples",len(x_test))

Training Samples 26425
Testing Samples 26425


In [17]:
# Install the libraries imported by the following cells (versions are not pinned).
!pip install transformers datasets evaluate seqeval



In [18]:
import pandas as pd
from datasets import Dataset

# One DataFrame per split: a token-list column and the matching encoded-tag column.
train_df = pd.DataFrame({'tokens': x_train, 'ner_tags': y_train})
test_df = pd.DataFrame({'tokens': x_test, 'ner_tags': y_test})

# Convert both frames into Dataset objects for the tokenization step.
train_dataset = Dataset.from_pandas(train_df)
test_dataset = Dataset.from_pandas(test_df)

In [19]:
from transformers import AutoTokenizer

# roberta-base tokenizer. add_prefix_space=True makes each pre-split word
# (later cells pass is_split_into_words=True) tokenize as if preceded by a space.
tokenizer = AutoTokenizer.from_pretrained("roberta-base",add_prefix_space=True)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


In [20]:
# Show how the first entry's whitespace tokens are split into sub-word pieces.
# NOTE: this rebinds the notebook-level name `tokens`.
example=tokens_array[0]
tokenized_input = tokenizer(example, is_split_into_words=True)
tokens = tokenizer.convert_ids_to_tokens(tokenized_input["input_ids"])
tokens

['<s>',
 'ĠIn',
 'Ġ20',
 '23',
 ',',
 'ĠM',
 '/',
 'S',
 'ĠNov',
 'ot',
 'ure',
 'ĠEle',
 'cr',
 'ical',
 'Ġ&',
 'digital',
 'Ġsystems',
 'Ġ(',
 'p',
 ')',
 'Ġl',
 'td',
 'Ġprovided',
 'Ġan',
 'Ġinvoice',
 'ĠINV',
 '-',
 '978',
 '76',
 '332',
 'Ġfor',
 'Ġthe',
 'ĠVal',
 'rack',
 'Ġ12',
 'u',
 'Ġorder',
 '.',
 '</s>']

In [21]:
def tokenize_and_align_labels(examples):
    """Tokenize a batch of pre-split sentences and align word tags to sub-tokens.

    Args:
        examples: batch mapping with "tokens" (list of word lists) and
            "ner_tags" (list of per-word label ids).

    Returns:
        The tokenizer output with an added "labels" entry. In every row the
        first sub-token of a word carries that word's label; special tokens
        and the remaining sub-tokens of a word are set to -100 (positions
        that compute_metrics filters out).
    """
    encoded = tokenizer(examples["tokens"], truncation=True, is_split_into_words=True)

    aligned = []
    for row, word_labels in enumerate(examples["ner_tags"]):
        row_labels = []
        last_word = None
        for word in encoded.word_ids(batch_index=row):
            # Special tokens (None) and continuation pieces are masked out.
            if word is None or word == last_word:
                row_labels.append(-100)
            else:
                row_labels.append(word_labels[word])
            last_word = word
        aligned.append(row_labels)

    encoded["labels"] = aligned
    return encoded

In [22]:
# Tokenize both splits in batches; tokenize_and_align_labels adds the `labels` entry.
tokenized_train = train_dataset.map(tokenize_and_align_labels, batched=True)
tokenized_test = test_dataset.map(tokenize_and_align_labels, batched=True)

Map:   0%|          | 0/26425 [00:00<?, ? examples/s]

Map:   0%|          | 0/26425 [00:00<?, ? examples/s]

In [23]:
from transformers import DataCollatorForTokenClassification

# Batch collator for token classification, configured to return TensorFlow tensors.
data_collator = DataCollatorForTokenClassification(tokenizer=tokenizer, return_tensors="tf")

In [24]:
import evaluate

# Metric object whose compute() is called by compute_metrics.
seqeval = evaluate.load("seqeval")

In [25]:
import numpy as np

# Integer class id -> BIO tag, used by compute_metrics to turn ids back into tags.
label_map = {
    0:'O',
    1:'B-component',
    2:'I-component',
    3:'B-year',
    4:'I-year',
    5:'B-vendors',
    6:'I-vendors',
    7:'B-invoice',
    8:'I-invoice'
}

# `labels` here is a view of the integer ids (dict keys), not the tag names.
labels = label_map.keys()
print(labels)


def compute_metrics(p):
    """Score predictions with seqeval, ignoring positions labelled -100.

    Args:
        p: pair (predictions, labels). `predictions` is reduced with argmax
            over axis 2; `labels` holds the reference ids with -100 marking
            positions to skip.

    Returns:
        Dict with "precision", "recall", "f1" and "accuracy", taken from the
        overall_* entries of the seqeval result.
    """
    scores, references = p
    predicted_ids = np.argmax(scores, axis=2)

    true_predictions = []
    true_labels = []
    for predicted_row, reference_row in zip(predicted_ids, references):
        # Keep only the positions that carry a real reference label.
        kept = [(pred, ref) for pred, ref in zip(predicted_row, reference_row) if ref != -100]
        true_predictions.append([label_map[pred] for pred, _ in kept])
        true_labels.append([label_map[ref] for _, ref in kept])

    results = seqeval.compute(predictions=true_predictions, references=true_labels)
    summary = {}
    summary["precision"] = results["overall_precision"]
    summary["recall"] = results["overall_recall"]
    summary["f1"] = results["overall_f1"]
    summary["accuracy"] = results["overall_accuracy"]
    return summary

dict_keys([0, 1, 2, 3, 4, 5, 6, 7, 8])


In [26]:
# Class id -> BIO tag; the position in the list is the id.
id2label = dict(enumerate([
    'O',
    'B-component',
    'I-component',
    'B-year',
    'I-year',
    'B-vendors',
    'I-vendors',
    'B-invoice',
    'I-invoice',
]))
# BIO tag -> class id: the exact inverse of id2label.
label2id = {tag: class_id for class_id, tag in id2label.items()}

In [27]:
from transformers import create_optimizer

# Hyper-parameters from which the schedule length is derived.
batch_size = 16
num_train_epochs = 3
# Whole batches per epoch times the number of epochs (integer division drops
# a trailing partial batch).
num_train_steps = (len(tokenized_train) // batch_size) * num_train_epochs
# Optimizer plus learning-rate schedule: initial LR 2e-5, weight decay 0.01, no warm-up steps.
optimizer, lr_schedule = create_optimizer(
    init_lr=2e-5,
    num_train_steps=num_train_steps,
    weight_decay_rate=0.01,
    num_warmup_steps=0,
)

In [28]:
from transformers import TFAutoModelForTokenClassification

# roberta-base with a token-classification head. The number of classes is
# taken from id2label so the head size cannot drift from the label tables.
model = TFAutoModelForTokenClassification.from_pretrained(
    "roberta-base", num_labels=len(id2label), id2label=id2label, label2id=label2id
)

Some weights of the PyTorch model were not used when initializing the TF 2.0 model TFRobertaForTokenClassification: ['roberta.embeddings.position_ids']
- This IS expected if you are initializing TFRobertaForTokenClassification from a PyTorch model trained on another task or with another architecture (e.g. initializing a TFBertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing TFRobertaForTokenClassification from a PyTorch model that you expect to be exactly identical (e.g. initializing a TFBertForSequenceClassification model from a BertForSequenceClassification model).
Some weights or buffers of the TF 2.0 model TFRobertaForTokenClassification were not initialized from the PyTorch model and are newly initialized: ['classifier.weight', 'classifier.bias']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [29]:
# Both datasets use `batch_size`, the same value num_train_steps was computed
# from, so the learning-rate schedule length matches the real number of batches.
tf_train_set = model.prepare_tf_dataset(
    tokenized_train,
    shuffle=True,
    batch_size=batch_size,
    collate_fn=data_collator,
)

tf_validation_set = model.prepare_tf_dataset(
    tokenized_test,
    shuffle=False,
    batch_size=batch_size,
    collate_fn=data_collator,
)

In [30]:
import tensorflow as tf

# Only the optimizer is supplied; presumably the model falls back to its own
# built-in loss when none is given — confirm against the transformers docs.
model.compile(optimizer=optimizer)  # No loss argument!

In [31]:
from transformers.keras_callbacks import KerasMetricCallback

# Callback that receives compute_metrics as its metric function and
# tf_validation_set as its evaluation dataset.
metric_callback = KerasMetricCallback(metric_fn=compute_metrics, eval_dataset=tf_validation_set)

In [32]:
callbacks = [metric_callback]

In [33]:
from time import time


In [34]:
start_time=time()
# Train for num_train_epochs, the same value the learning-rate schedule length
# (num_train_steps) was derived from, instead of a separate literal.
model.fit(x=tf_train_set, validation_data=tf_validation_set, epochs=num_train_epochs, callbacks=callbacks)
training_time=time()-start_time
print("Training Time: ",training_time)

Epoch 1/3
Epoch 2/3
Epoch 3/3
Training Time:  1726.325121641159


In [35]:
start_time=time()
# Keep the value returned by evaluate() instead of discarding it.
eval_result=model.evaluate(tf_validation_set)
testing_time=time()-start_time
print("Evaluation Result: ",eval_result)
print("Testing Time: ",testing_time)

Testing Time:  141.9361548423767


In [48]:
# Persist the fine-tuned model and its tokenizer side by side in one Drive
# folder so they can be reloaded together.
model.save_pretrained("/content/drive/MyDrive/ChatBotExampleEntityModel")
tokenizer.save_pretrained("/content/drive/MyDrive/ChatBotExampleEntityModel")

('/content/drive/MyDrive/ChatBotExampleEntityModel/tokenizer_config.json',
 '/content/drive/MyDrive/ChatBotExampleEntityModel/special_tokens_map.json',
 '/content/drive/MyDrive/ChatBotExampleEntityModel/vocab.json',
 '/content/drive/MyDrive/ChatBotExampleEntityModel/merges.txt',
 '/content/drive/MyDrive/ChatBotExampleEntityModel/added_tokens.json',
 '/content/drive/MyDrive/ChatBotExampleEntityModel/tokenizer.json')

In [49]:
# Reload the saved model and tokenizer from Drive; this rebinds `model` and
# `tokenizer`, so the inference cells below use the reloaded objects.
from transformers import AutoTokenizer, TFAutoModelForTokenClassification
model=TFAutoModelForTokenClassification.from_pretrained("/content/drive/MyDrive/ChatBotExampleEntityModel")
tokenizer=AutoTokenizer.from_pretrained("/content/drive/MyDrive/ChatBotExampleEntityModel")

Some layers from the model checkpoint at /content/drive/MyDrive/ChatBotExampleEntityModel were not used when initializing TFRobertaForTokenClassification: ['dropout_37']
- This IS expected if you are initializing TFRobertaForTokenClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing TFRobertaForTokenClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
All the layers of TFRobertaForTokenClassification were initialized from the model checkpoint at /content/drive/MyDrive/ChatBotExampleEntityModel.
If your task is similar to the task the model of the checkpoint was trained on, you can already use TFRobertaForTokenClassification for predictions without further training.


In [36]:
text = "tell me details of invoice INV-123456 of ms nisha computers of valrack12 purchased in 2023 and 2024"

In [37]:
tokenizer.tokenize(text)

['Ġtell',
 'Ġme',
 'Ġdetails',
 'Ġof',
 'Ġinvoice',
 'ĠINV',
 '-',
 '123',
 '456',
 'Ġof',
 'Ġms',
 'Ġn',
 'isha',
 'Ġcomputers',
 'Ġof',
 'Ġval',
 'rack',
 '12',
 'Ġpurchased',
 'Ġin',
 'Ġ20',
 '23',
 'Ġand',
 'Ġ2024']

In [38]:
from transformers import pipeline

# NER pipeline on device 0. No aggregation option is passed, and the
# predictions shown below are per sub-word piece (see the 'word' fields), so
# the following cells reassemble words themselves.
classifier = pipeline("ner", model=model,tokenizer=tokenizer,device=0)
response=classifier(text)
response

[{'entity': 'B-invoice',
  'score': 0.99991596,
  'index': 6,
  'word': 'ĠINV',
  'start': 27,
  'end': 30},
 {'entity': 'B-invoice',
  'score': 0.99982005,
  'index': 7,
  'word': '-',
  'start': 30,
  'end': 31},
 {'entity': 'B-invoice',
  'score': 0.96974975,
  'index': 8,
  'word': '123',
  'start': 31,
  'end': 34},
 {'entity': 'B-vendors',
  'score': 0.9998807,
  'index': 11,
  'word': 'Ġms',
  'start': 41,
  'end': 43},
 {'entity': 'I-vendors',
  'score': 0.99992144,
  'index': 12,
  'word': 'Ġn',
  'start': 44,
  'end': 45},
 {'entity': 'I-vendors',
  'score': 0.99993265,
  'index': 13,
  'word': 'isha',
  'start': 45,
  'end': 49},
 {'entity': 'I-vendors',
  'score': 0.9999386,
  'index': 14,
  'word': 'Ġcomputers',
  'start': 50,
  'end': 59},
 {'entity': 'B-component',
  'score': 0.99875057,
  'index': 16,
  'word': 'Ġval',
  'start': 63,
  'end': 66},
 {'entity': 'I-component',
  'score': 0.9999367,
  'index': 17,
  'word': 'rack',
  'start': 66,
  'end': 70},
 {'entity': '

In [41]:
# Collect the predicted sub-word pieces under their entity tag, in prediction order.
d={}
for prediction in response:
  d.setdefault(prediction['entity'],[]).append(prediction['word'])
d

{'B-invoice': ['ĠINV', '-', '123'],
 'B-vendors': ['Ġms'],
 'I-vendors': ['Ġn', 'isha', 'Ġcomputers'],
 'B-component': ['Ġval'],
 'I-component': ['rack', '12'],
 'B-year': ['Ġ20', '23', 'Ġ2024']}

In [42]:
# Output buckets, one string per category; the following cells append the
# predicted pieces to these strings and then split them into words.
entities = {
    'components': '',
    'years': '',
    'vendors': '',
    'invoices': ''
    }
# BIO tag -> output bucket. NOTE: this rebinds the notebook-level name
# `labels`, which earlier held label_map.keys().
labels = {
    'B-component': 'components',
    'I-component': 'components',
    'B-year': 'years',
    'I-year': 'years',
    'B-vendors': 'vendors',
    'I-vendors': 'vendors',
    'B-invoice': 'invoices',
    'I-invoice': 'invoices'
}

In [43]:
# Append the concatenated pieces of every tag to its category string.
for tag, pieces in d.items():
  entities[labels[tag]]+=''.join(pieces)

In [44]:
# 'Ġ' prefixes the pieces that start a new word in the tokenizer output shown
# earlier, so splitting on it turns each category string into a list of words.
for i in entities:
  entities[i]=entities[i].split('Ġ')

In [45]:
# Drop the empty strings left over from splitting on the word-start marker.
for category in entities:
  entities[category]=[word for word in entities[category] if word!='']

In [46]:
# Components and vendors are collapsed into one space-separated string; the
# other categories stay as lists.
for category in entities:
  if category in ('components','vendors'):
    entities[category]=' '.join(entities[category]).strip()

In [47]:
entities

{'components': 'valrack12',
 'years': ['2023', '2024'],
 'vendors': 'ms nisha computers',
 'invoices': ['INV-123']}

In [None]:
text = "tell me details of invoice INV-123456 of ms nisha computers, central stores and appario of valrack12 and curtaincloth purchased in 2023 and 2024"

In [None]:
def get_entities(text, ner_pipeline=None):
    """Run NER on `text` and return the recognised words per category.

    Sub-word pieces are concatenated per category in prediction order and
    then split on the 'Ġ' word-start marker. Keeping prediction order (rather
    than grouping all B- pieces before all I- pieces) keeps the pieces of
    each word together when a category occurs more than once in the text.

    Args:
        text: sentence to analyse.
        ner_pipeline: callable returning a list of dicts with 'entity' and
            'word' keys. Defaults to the notebook-level `classifier`
            pipeline (the previously referenced `entity_classifier` is not
            defined anywhere in this notebook).

    Returns:
        Dict with keys 'components', 'years', 'vendors' and 'invoices', each
        mapping to a list of words (empty when nothing was found).

    Raises:
        KeyError: if a prediction carries an entity tag outside the B-/I-
            component, year, vendors and invoice tags.
    """
    if ner_pipeline is None:
        ner_pipeline = classifier

    category_for_tag = {
        'B-component': 'components',
        'I-component': 'components',
        'B-year': 'years',
        'I-year': 'years',
        'B-vendors': 'vendors',
        'I-vendors': 'vendors',
        'B-invoice': 'invoices',
        'I-invoice': 'invoices'
    }
    pieces = {
        'components': [],
        'years': [],
        'vendors': [],
        'invoices': []
    }

    # Keep every piece in the order the pipeline produced it.
    for prediction in ner_pipeline(text):
        pieces[category_for_tag[prediction['entity']]].append(prediction['word'])

    # 'Ġ' marks the first piece of a word; splitting on it rebuilds the words.
    return {
        category: [word for word in ''.join(parts).split('Ġ') if word != '']
        for category, parts in pieces.items()
    }

get_entities(text)

In [None]:
# Run entity extraction on every non-empty line of the sample file.
file_path = '/content/SampleTest.txt'
with open(file_path, 'r', encoding='utf-8') as file:
  for line in file:
    # Drop the trailing newline and skip blank lines so the pipeline never
    # receives an empty string.
    sentence = line.strip()
    if not sentence:
      continue
    print('\n\n')
    print(sentence)
    print(get_entities(sentence),'end')
