This notebook illustrates how to fine-tune BERT for Named Entity Recognition (NER) of tech product names, using training data generated with GPT-4 (accessed via the OpenAI API).

The initial dataset you need is a list of product names. For effective finetuning, you should aim for at least a few thousand data points.

In [None]:
# sample initial dataset: ['MacBook Pro', 'DIZO Star 500', 'Asus ZenBook UX430UN', 'Acer Aspire 3', ...]

We will now generate training data from the initial dataset by using GPT-4 through LangChain and the OpenAI API.

In [None]:
# import Langchain & OpenAI to set up
from langchain.prompts.prompt import PromptTemplate
from langchain.chains import LLMChain
from langchain.chat_models import ChatOpenAI

In [None]:
# set up the LLM. temperature=0.9 is on the high side, which favours more varied sentences;
# lower it if the generated sentences become too erratic.
chat_llm = ChatOpenAI(
    model_name='gpt-4',
    temperature=0.9,
    openai_api_key='YOUR OPENAI API KEY HERE',
)

In [None]:
# prompt templates to be passed to GPT-4. more templates allow generating more diverse training data.
# 3 examples are given here, but you should add more.
# it is very important to state the exact number of sentences to generate. 68 is just a number that works
# for my dataset; change it to match the number of products you send per request.
# each sentence is followed by the separator '#' so that the response can easily be split later.
# {product_names} is the placeholder that is filled in with the product names.
template1 = '''
you are a customer complaining about the product you bought being lost or damaged. write a 1 sentence complaint about each of the following 68 products. you will generate 68 sentences. separate each sentence with #. insert # after the ending punctuation of the sentence.

products:
{product_names}
'''

template2 = '''
you are a tech reviewer. write 1 sentence about each of the following 68 products. you will generate 68 sentences. separate each sentence with #. insert # after the ending punctuation of the sentence.

products:
{product_names}
'''

template3 = '''
you are a customer shopping for laptops and mobile phones. you are talking to a customer support agent in an online chat. write a 1 sentence inquiry about each of the following 68 products. you will generate 68 sentences. separate each sentence with #. insert # after the ending punctuation of the sentence.

products:
{product_names}
'''


In [None]:
# function dividing the initial dataset into smaller chunks to be passed through the OpenAI API.
# The API can't handle too much data at once.
def split_initial_dataset(initial_dataset, num_chunks):
    """Split ``initial_dataset`` into ``num_chunks`` consecutive, near-equal chunks.

    Args:
        initial_dataset: A sliceable collection of product names. Objects that
            expose ``.iloc`` (pandas Series/DataFrame) are sliced positionally
            through it; anything else (e.g. a plain list) is sliced directly.
        num_chunks: Number of chunks to produce; must be positive.

    Returns:
        A list of ``num_chunks`` chunks in the original order. The first
        ``len(initial_dataset) % num_chunks`` chunks hold one extra item, and
        trailing chunks are empty when there are more chunks than items.

    Raises:
        ValueError: If ``num_chunks`` is not positive.
    """
    if num_chunks <= 0:
        raise ValueError("num_chunks must be a positive integer.")

    base_size, remainder = divmod(len(initial_dataset), num_chunks)
    # pandas objects need positional slicing via .iloc; plain sequences slice directly
    rows = getattr(initial_dataset, 'iloc', initial_dataset)

    chunks = []
    start_index = 0
    for i in range(num_chunks):
        end_index = start_index + base_size + (1 if i < remainder else 0)
        chunks.append(rows[start_index:end_index])
        start_index = end_index
    return chunks

In [None]:
# prompt templates that generate_responses chooses from at random
templates_list = [template1, template2, template3]


def generate_responses(product_names_list):
    """Ask GPT-4 to write one sentence for each product name in the list.

    A prompt template is chosen at random from ``templates_list`` on every
    call to increase the variety of the training data.

    Args:
        product_names_list: The chunk of product names substituted into the
            template's ``{product_names}`` placeholder.

    Returns:
        Whatever ``LLMChain.run`` returns for the filled-in prompt; the
        templates ask for sentences that are each followed by '#'.
    """
    # the original cell used `random` without importing it, so import it here
    import random

    prompt = PromptTemplate(template=random.choice(templates_list), input_variables=['product_names'])
    llm_chain = LLMChain(prompt=prompt, llm=chat_llm, verbose=True)
    return llm_chain.run({'product_names': product_names_list})

In [None]:
# generate training data, walking through the chunks in groups of 3. you can change the number, but keep it
# small since the API might malfunction if you pass too many at a time.
# NOTE(review): `chunks` is not assigned in the visible cells — presumably the result of
# split_initial_dataset(...); confirm before running.
llm_response_list = []
for group_start in range(0, len(chunks), 3):
    for product_names_list in chunks[group_start:group_start + 3]:
        llm_response_list.append(generate_responses(product_names_list))

In [None]:
# function for splitting the LLM response into individual sentences
def split_string(input_string):
    """Split ``input_string`` on '#' and return the stripped, non-empty pieces in order."""
    stripped_parts = (part.strip() for part in input_string.split('#'))
    return [part for part in stripped_parts if part]

In [None]:
# flatten every LLM response into a single list of sentences. this is the generated training data.
sentences = [
    sentence
    for llm_response in llm_response_list
    for sentence in split_string(llm_response)
]

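If the training data generation was done correctly, you should now have two lists of equal length: a product name list (same as the initial dataset list) and a sentence list. Check the lengths before continuing, because the tagging step below pairs each sentence with the product name at the same position.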

In [None]:
# set up nltk tokenizer
import nltk
from nltk.tokenize import word_tokenize

In [None]:
# function for finding the indexes of the product name keywords in the sentence. supports passing in a single pair of sentence and product name
def find_keyword_indexes(sentence, product_name):
    """Return the token positions in ``sentence`` covered by ``product_name``.

    Both strings are tokenized with the nltk tokenizer and lower-cased before
    comparison. Every position where the product-name tokens match contributes
    its token indexes, in order; the list is empty when nothing matches.
    """
    sentence_tokens = [token.lower() for token in word_tokenize(sentence)]
    name_tokens = [token.lower() for token in word_tokenize(product_name)]
    span = len(name_tokens)

    keyword_indexes = []
    # slide a window of the product name's length over the sentence tokens
    for start in range(len(sentence_tokens) - span + 1):
        if sentence_tokens[start:start + span] == name_tokens:
            keyword_indexes.extend(range(start, start + span))
    return keyword_indexes

In [None]:
# function for finding the indexes of the product name keywords in the sentence. supports passing in lists of sentences and product names. returns a zipped list
def find_keywords_indexes(sentences_list, product_name_list):
    """Pair each sentence with the token indexes of its product name.

    Sentences and product names are matched by position; the result is a list
    of ``(sentence, indexes)`` tuples, as long as the shorter of the two inputs.
    """
    return [
        (sentence, find_keyword_indexes(sentence, product_name))
        for sentence, product_name in zip(sentences_list, product_name_list)
    ]

In [None]:
# pass the zipped list to this function to generate NER tags for each token
def generate_tags(zipped_list):
    """Build a tag string for every ``(sentence, indexes)`` pair.

    Args:
        zipped_list: Iterable of ``(sentence, indexes)`` pairs, where
            ``indexes`` holds the positions (in the nltk tokenization of the
            sentence) of the tokens that belong to the product name.

    Returns:
        A list with one string per sentence: the per-token tags joined with
        ', '. 'B-pn' marks the first token of each product-name span, 'I-pn'
        the remaining tokens of the span and 'O' every other token. A sentence
        whose ``indexes`` is empty is tagged 'O' throughout.
    """
    word_labels_list = []
    for sentence, indexes in zipped_list:
        keyword_positions = set(indexes)
        labels_list = []
        # tag by position: looking a token up with list.index() would return the
        # first occurrence, so repeated tokens would all get the same tag
        for position in range(len(word_tokenize(sentence))):
            if position not in keyword_positions:
                labels_list.append('O')
            elif position - 1 in keyword_positions:
                labels_list.append('I-pn')
            else:
                # first token of a span (no product-name token right before it)
                labels_list.append('B-pn')
        word_labels_list.append(', '.join(labels_list))
    return word_labels_list

In [None]:
# function for converting lists into a pandas dataframe
import pandas as pd


def lists_to_dataframe(list1, list2, column_names=('Column1', 'Column2')):
    """Build a two-column DataFrame from two equally long lists.

    Args:
        list1: Values for the first column.
        list2: Values for the second column.
        column_names: Sequence whose first two items name the columns. The
            default is a tuple so that no mutable object is shared between calls.

    Returns:
        A DataFrame with ``list1`` under ``column_names[0]`` and ``list2``
        under ``column_names[1]``.

    Raises:
        ValueError: If the two lists differ in length.
    """
    if len(list1) != len(list2):
        raise ValueError("Input lists must have the same length.")
    return pd.DataFrame({column_names[0]: list1, column_names[1]: list2})

In [None]:
# create a dataframe with one column of sentences and one column of the matching NER tag strings
# NOTE(review): `sentences_list` and `word_labels_list` are not assigned in the visible cells — presumably
# the generated `sentences` and the output of generate_tags(find_keywords_indexes(...)); confirm before running.
data = lists_to_dataframe(
    sentences_list,
    word_labels_list,
    column_names=['sentence', 'word_labels'],
)

In [None]:
# remove spaces in the word_labels column: generate_tags joins the tags with ', ', while
# tokenize_and_preserve_labels splits the string on ',' and uses each piece as a label2id key
data['word_labels'] = data['word_labels'].str.replace(' ', '')

In [None]:
# assign NER tags numerical ids: 'O' -> 0, 'B-pn' -> 1, 'I-pn' -> 2
id2label = dict(enumerate(('O', 'B-pn', 'I-pn')))
# inverse mapping, built from the highest id down
label2id = {tag: tag_id for tag_id, tag in reversed(list(id2label.items()))}

In [None]:
# import libraries for training
import numpy as np
from sklearn.metrics import accuracy_score
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer, BertConfig, BertForTokenClassification

In [None]:
# check cuda: use the GPU when torch reports one, otherwise fall back to the CPU
from torch import cuda

if cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
print(device)

In [None]:
# configure training
MAX_LEN = 128  # fixed sequence length: the dataset class truncates/pads every example to this many tokens
TRAIN_BATCH_SIZE = 4  # batch size of the training DataLoader
VALID_BATCH_SIZE = 2  # batch size of the testing DataLoader
EPOCHS = 1  # number of passes the training loop makes over the training set
LEARNING_RATE = 1e-05  # learning rate handed to the Adam optimizer
MAX_GRAD_NORM = 10  # max_norm passed to clip_grad_norm_ during training
# tokenizer for the same 'bert-base-uncased' checkpoint that the model is loaded from
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

In [None]:
# set up BERT tokenizer
def tokenize_and_preserve_labels(sentence, text_labels, tokenizer):
    """
    Tokenize ``sentence`` into word pieces while keeping one label per piece.

    Word piece tokenization makes it difficult to match word labels back up
    with individual word pieces, so each word is tokenized on its own and its
    label is repeated for every piece it is broken into.

    Args:
        sentence: The raw sentence.
        text_labels: Comma-separated labels, one per word of the sentence.
        tokenizer: Object with a ``tokenize(word)`` method returning a list of
            word pieces.

    Returns:
        A ``(tokenized_sentence, labels)`` tuple of equally long lists.
    """
    tokenized_sentence = []
    labels = []

    word_labels = text_labels.split(",")
    words = sentence.strip().split()
    if len(words) != len(word_labels):
        # the labels in this notebook are generated per nltk token, where
        # punctuation is a token of its own; a whitespace split would shift every
        # label after the first attached punctuation mark, so fall back to the
        # tokenization the labels were built from
        from nltk.tokenize import word_tokenize
        words = word_tokenize(sentence)

    for word, label in zip(words, word_labels):
        # tokenize the word; it may be broken into several subwords
        tokenized_word = tokenizer.tokenize(word)
        tokenized_sentence.extend(tokenized_word)
        # repeat the word's label once per subword
        labels.extend([label] * len(tokenized_word))

    return tokenized_sentence, labels

In [None]:
# define dataset class to pre-process dataset for training
class dataset(Dataset):
    """Torch dataset that turns (sentence, word_labels) rows into fixed-length model inputs.

    Args:
        dataframe: DataFrame with a ``sentence`` column and a ``word_labels``
            column of comma-separated tags, indexed 0..len-1.
        tokenizer: Tokenizer used to split words into word pieces and to map
            tokens to ids.
        max_len: Length every example is truncated or padded to.
    """

    def __init__(self, dataframe, tokenizer, max_len):
        self.len = len(dataframe)
        self.data = dataframe
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __getitem__(self, index):
        """Return a dict of ``ids``, ``mask`` and ``targets`` long tensors, each of length ``max_len``."""
        # step 1: tokenize (and adapt corresponding labels)
        sentence = self.data.sentence[index]
        word_labels = self.data.word_labels[index]
        tokenized_sentence, labels = tokenize_and_preserve_labels(sentence, word_labels, self.tokenizer)

        # step 2: add special tokens and an outside label for each of them.
        # the [SEP] label must be appended: insert(-1, ...) would place it before
        # the last real label and shift that label onto [SEP]
        tokenized_sentence = ["[CLS]"] + tokenized_sentence + ["[SEP]"]
        labels = ["O"] + labels + ["O"]

        # step 3: truncating/padding
        maxlen = self.max_len
        if len(tokenized_sentence) > maxlen:
            tokenized_sentence = tokenized_sentence[:maxlen]
            labels = labels[:maxlen]
        else:
            tokenized_sentence = tokenized_sentence + ['[PAD]'] * (maxlen - len(tokenized_sentence))
            labels = labels + ["O"] * (maxlen - len(labels))

        # step 4: obtain the attention mask (0 on padding, 1 everywhere else)
        attn_mask = [1 if tok != '[PAD]' else 0 for tok in tokenized_sentence]

        # step 5: convert tokens and labels to ids
        ids = self.tokenizer.convert_tokens_to_ids(tokenized_sentence)
        label_ids = [label2id[label] for label in labels]

        return {
            'ids': torch.tensor(ids, dtype=torch.long),
            'mask': torch.tensor(attn_mask, dtype=torch.long),
            'targets': torch.tensor(label_ids, dtype=torch.long),
        }

    def __len__(self):
        return self.len

In [None]:
# split dataset into training and testing
train_size = 0.8  # fraction of the rows sampled into the training split
# a fixed random_state keeps the sampled split the same between runs
train_dataset = data.sample(frac=train_size,random_state=200)
# the test split is every row that was not sampled; this drop() has to run before
# train_dataset's index is reset below, because it relies on the original row labels
test_dataset = data.drop(train_dataset.index).reset_index(drop=True)
train_dataset = train_dataset.reset_index(drop=True)

print("FULL Dataset: {}".format(data.shape))
print("TRAIN Dataset: {}".format(train_dataset.shape))
print("TEST Dataset: {}".format(test_dataset.shape))

# wrap both splits in the dataset class so they yield fixed-length tensors
training_set = dataset(train_dataset, tokenizer, MAX_LEN)
testing_set = dataset(test_dataset, tokenizer, MAX_LEN)

In [None]:
# set up params and dataloaders: both loaders shuffle and load in the main process (num_workers=0)
train_params = dict(batch_size=TRAIN_BATCH_SIZE, shuffle=True, num_workers=0)
test_params = dict(batch_size=VALID_BATCH_SIZE, shuffle=True, num_workers=0)

training_loader = DataLoader(training_set, **train_params)
testing_loader = DataLoader(testing_set, **test_params)

In [None]:
# load BERT model
# the token-classification model is created with one output label per entry of id2label,
# and both label mappings are handed to from_pretrained along with it
model = BertForTokenClassification.from_pretrained('bert-base-uncased',
                                                   num_labels=len(id2label),
                                                   id2label=id2label,
                                                   label2id=label2id)
# move the model to the device selected earlier ('cuda' or 'cpu')
model.to(device)

In [None]:
# verify initial loss: run the first training example through the model as a batch of one, before any training
first_example = training_set[0]
ids = first_example["ids"].unsqueeze(0).to(device)
mask = first_example["mask"].unsqueeze(0).to(device)
targets = first_example["targets"].unsqueeze(0).to(device)
outputs = model(input_ids=ids, attention_mask=mask, labels=targets)
initial_loss = outputs[0]
initial_loss

In [None]:
# verify tensor shape
# second element of the model output for the single example above; presumably the logits,
# i.e. (1, MAX_LEN, number of labels) — confirm against the displayed shape
tr_logits = outputs[1]
tr_logits.shape

In [None]:
# set up optimizer: Adam over all model parameters, using the LEARNING_RATE set in the training config
optimizer = torch.optim.Adam(params=model.parameters(), lr=LEARNING_RATE)

In [None]:
# start training
def train(epoch):
    """Run one pass over ``training_loader`` and print the mean loss and accuracy.

    Uses the module-level ``model``, ``optimizer``, ``training_loader`` and
    ``device``. Accuracy is computed per batch over the non-padding tokens and
    averaged over the batches.

    Args:
        epoch: Index of the current epoch; not used inside the function.
    """
    tr_loss, tr_accuracy = 0, 0
    nb_tr_steps = 0
    # put model in training mode
    model.train()

    for idx, batch in enumerate(training_loader):

        ids = batch['ids'].to(device, dtype=torch.long)
        mask = batch['mask'].to(device, dtype=torch.long)
        targets = batch['targets'].to(device, dtype=torch.long)

        outputs = model(input_ids=ids, attention_mask=mask, labels=targets)
        loss, tr_logits = outputs.loss, outputs.logits
        tr_loss += loss.item()
        nb_tr_steps += 1

        if idx % 100 == 0:
            loss_step = tr_loss / nb_tr_steps
            print(f"Training loss per 100 training steps: {loss_step}")

        # compute training accuracy
        flattened_targets = targets.view(-1)  # shape (batch_size * seq_len,)
        active_logits = tr_logits.view(-1, model.num_labels)  # shape (batch_size * seq_len, num_labels)
        flattened_predictions = torch.argmax(active_logits, dim=1)  # shape (batch_size * seq_len,)
        # use the attention mask to compare only non-padding positions (includes [CLS] and [SEP])
        active_accuracy = mask.view(-1) == 1
        active_targets = torch.masked_select(flattened_targets, active_accuracy)
        active_predictions = torch.masked_select(flattened_predictions, active_accuracy)

        tr_accuracy += accuracy_score(active_targets.cpu().numpy(), active_predictions.cpu().numpy())

        # backward pass
        optimizer.zero_grad()
        loss.backward()
        # gradient clipping has to happen after backward() and before step():
        # clipping earlier would act on the previous step's gradients, which
        # zero_grad() then discards
        torch.nn.utils.clip_grad_norm_(
            parameters=model.parameters(), max_norm=MAX_GRAD_NORM
        )
        optimizer.step()

    epoch_loss = tr_loss / nb_tr_steps
    tr_accuracy = tr_accuracy / nb_tr_steps
    print(f"Training loss epoch: {epoch_loss}")
    print(f"Training accuracy epoch: {tr_accuracy}")


for epoch in range(EPOCHS):
    print(f"Training epoch: {epoch + 1}")
    train(epoch)

In [None]:
# evaluate trained model performance
def valid(model, testing_loader):
    """Evaluate ``model`` on ``testing_loader`` and return the tag sequences.

    Prints a running loss every 100 batches, then the mean loss and the mean
    per-batch accuracy. Only positions where the attention mask is 1 are
    compared (this includes the [CLS] and [SEP] positions).

    Returns:
        A ``(labels, predictions)`` tuple: two flat lists of tag strings
        covering every compared position of every batch.
    """
    # put model in evaluation mode
    model.eval()

    running_loss = 0
    running_accuracy = 0
    step_count = 0
    gold_ids, predicted_ids = [], []

    with torch.no_grad():
        for step, batch in enumerate(testing_loader):
            input_ids = batch['ids'].to(device, dtype=torch.long)
            attention = batch['mask'].to(device, dtype=torch.long)
            gold = batch['targets'].to(device, dtype=torch.long)

            result = model(input_ids=input_ids, attention_mask=attention, labels=gold)
            running_loss += result.loss.item()
            step_count += 1

            if step % 100 == 0:
                loss_step = running_loss / step_count
                print(f"Validation loss per 100 evaluation steps: {loss_step}")

            # flatten everything to (batch_size * seq_len,) and keep the unmasked positions
            keep = attention.view(-1) == 1
            best_class = torch.argmax(result.logits.view(-1, model.num_labels), axis=1)
            batch_gold = torch.masked_select(gold.view(-1), keep)
            batch_predicted = torch.masked_select(best_class, keep)

            gold_ids.extend(batch_gold)
            predicted_ids.extend(batch_predicted)

            running_accuracy += accuracy_score(batch_gold.cpu().numpy(), batch_predicted.cpu().numpy())

    # translate the collected ids back into tag strings
    labels = [id2label[tag_id.item()] for tag_id in gold_ids]
    predictions = [id2label[tag_id.item()] for tag_id in predicted_ids]

    eval_loss = running_loss / step_count
    eval_accuracy = running_accuracy / step_count
    print(f"Validation Loss: {eval_loss}")
    print(f"Validation Accuracy: {eval_accuracy}")

    return labels, predictions

labels, predictions = valid(model, testing_loader)

In [None]:
# check precision, recall, and f1-score
from seqeval.metrics import classification_report

# labels and predictions are each wrapped in a list, so the whole test set is passed as one tag sequence
print(classification_report([labels], [predictions]))