<img src="images/bert-2phase.jpg">
<caption><center>BERT model for transfer learning: use a pre-trained BERT model and fine tune for your downstream NLP tasks</center></caption>

BERT pre-trained model on PyTorch Hub:

bert-base-chinese、bert-base-uncased、bert-base-cased、bert-base-german-cased、bert-base-multilingual-uncased、bert-base-multilingual-cased、bert-large-cased、bert-large-uncased、bert-large-uncased-whole-word-masking、bert-large-cased-whole-word-masking

In [None]:
import torch
from transformers import BertTokenizer
from IPython.display import clear_output

# Name of the pre-trained checkpoint whose tokenizer is loaded below.
PRETRAINED_MODEL_NAME = "bert-base-chinese"
# Alternative checkpoints (uncomment one to switch):
#PRETRAINED_MODEL_NAME = 'bert-base-cased'
#PRETRAINED_MODEL_NAME = 'bert-base-uncased'

# Load the tokenizer that belongs to PRETRAINED_MODEL_NAME
# (presumably downloaded and cached on first use — TODO confirm).
tokenizer = BertTokenizer.from_pretrained(PRETRAINED_MODEL_NAME)

print("PyTorch version: ", torch.__version__)

In [None]:
import transformers as t
t.__version__

In [None]:
vocab = tokenizer.vocab
print("Size of the dictionary: ", len(vocab))

type(vocab)
#list(vocab)

Some of the tokens in the Chinese BERT dictionary

In [None]:
import random

# Draw 10 random tokens from the vocabulary and look up their ids.
# The loop variables are deliberately called `token` / `token_id`: a
# cell-level `for t, id in ...` would rebind `t` (the alias created by
# `import transformers as t`) and shadow the builtin `id`.
random_tokens = random.sample(list(vocab), 10)
random_ids = [vocab[token] for token in random_tokens]

# Header and rows share the same total width (token left-aligned, index
# right-aligned up to column 25).
print("{0:20}{1:15}".format("token", "index"))
print("-" * 25)
for token, token_id in zip(random_tokens, random_ids):
    print("{0:15}{1:10}".format(token, token_id))

BERT uses WordPiece tokenization, which was developed for Google NMT. WordPiece tokens carry the prefix "##".

Taiwan's phonetic symbols (bopomofo) are also included in the Chinese BERT dictionary.

In [None]:
# Print every (token, id) pair whose id lies in the range 647..665.
indices = list(range(647, 666))
some_pairs = []
for token, token_id in vocab.items():
    if token_id in indices:
        some_pairs.append((token, token_id))

for entry in some_pairs:
    print(entry)

Tokenize with BERT tokenizer:

In [None]:
tokenizer.tokenize('九到十二個月')

In [None]:
text = "神愛世人，甚至將祂的獨生子賜給他們，叫一切信祂的，不至滅亡，反得永生。（約翰福音3:16）"
#text = "起初，神創造天地。地是空虛混沌，淵面黑暗。神的靈運行在水面上，神說：「要有光」，就有了光。（創世紀1:1~3）"
#text = "愛是恆久忍耐，又有恩慈；愛是不嫉妒；愛是不自誇，不張狂，不做害羞的事，不求自己的益處，不輕易發怒，不計算人的惡，不喜歡不義，只喜歡真理；凡事包容，凡事相信，凡事盼望，凡事忍耐。愛是永不止息。（哥前13:4~8a）"
# Split the sentence into tokens, then map every token to its vocabulary id.
tokens = tokenizer.tokenize(text)
ids = tokenizer.convert_tokens_to_ids(tokens)

# Show the raw text next to its tokens and their ids.
print(text)
print(tokens)
print(ids)

There are five special tokens in BERT:

[CLS]：representation for the classification of the input sequence.

[SEP]：boundary of the two input sequences.

[UNK]：token for wordpieces not in the BERT dictionary.

[PAD]：zero padding in one batch.

[MASK]：mask token used in the Masked Language Model task.

BERT is a very powerful language representation model that could be used in many downstream NLP tasks.

1. Prepare text dataset

In [None]:
data_source = 'data/training_data.txt'
# Read the whole file and split it into one string per line. The context
# manager closes the handle; a bare open(...).read() leaves it open.
with open(data_source, 'r', encoding='UTF-8') as f:
    lines = f.read().strip().split('\n')

In [None]:
# Every line is split on the ' +++$+++ ' marker: the first field is the
# label, the second field is the sentence.
label = []
text = []

for line in lines:
    fields = line.split(' +++$+++ ')
    label.append(fields[0])
    text.append(fields[1])

In [None]:
import pandas as pd

# Assemble the training examples into a two-column frame (text, label).
df_train = pd.DataFrame({"text": text, "label": label})

# Save the training dataset to tsv file for PyTorch
df_train.to_csv("data/train.tsv", sep="\t", index=False)

In [None]:
test_data_source = 'data/testing_data.csv'
# Read the whole file and split it into one string per line. The context
# manager closes the handle; a bare open(...).read() leaves it open.
with open(test_data_source, 'r', encoding='UTF-8') as f:
    lines = f.read().strip().split('\n')

In [None]:
# Skip the first line (presumably the CSV header — TODO confirm). In every
# remaining line the label is the first character and the sentence starts at
# offset 2, i.e. the character in between is skipped.
data_rows = lines[1:]
test_label = [row[0] for row in data_rows]
test_text = [row[2:] for row in data_rows]

In [None]:
# Assemble the test examples into a two-column frame (text, label).
df_test = pd.DataFrame({"text": test_text, "label": test_label})

# Save the test dataset to tsv file for PyTorch
df_test.to_csv("data/test.tsv", sep="\t", index=False)

In [None]:
# Keep only the rows that actually have text (neither null nor empty).
has_text = df_train['text'].notnull() & (df_train['text'] != '')
df_train = df_train[has_text]

# Drop sentences longer than MAX_LENGTH. len() is applied to the raw string,
# so the limit counts characters, not BERT tokens.
MAX_LENGTH = 60
df_train = df_train[df_train['text'].map(len) <= MAX_LENGTH]

# Use 20% of the training dataset (fixed seed for reproducibility)
SAMPLE_FRAC = 0.2
df_train = df_train.sample(frac=SAMPLE_FRAC, random_state=9527)

# Renumber the rows from 0 and keep only the two columns that are used.
df_train = df_train.reset_index(drop=True)[['text', 'label']]

# NOTE(review): "data/train.tsv" was written before this filtering and this
# cell does not write it again — confirm whether the filtered/sampled frame
# was meant to be saved for the dataset class to read.
print("Number of examples of the training dataset: ", len(df_train))
df_train.head()

In [None]:
df_train.label.value_counts() / len(df_train)

Test dataset

In [None]:
print("Number of examples of the test dataset: ", len(df_test))
df_test.head()

In [None]:
# Compare the sizes of the two splits. The message now includes both counts;
# the original template had empty slots where they belong.
ratio = len(df_test) / len(df_train)
print(f"Size of test set: {len(df_test)} / Size of training set: {len(df_train)} = {ratio:.1f}")

2. Convert the text data into BERT compatible format

Three kinds of tensors:

tokens_tensor：index value in the BERT dictionary of every input token

segments_tensor：used to distinguish the two input sequences. The first sequence is 0, the second is 1.

masks_tensor：masks of the self-attention mechanism. 1 for tokens to be attended and 0 for padding.

In [None]:
# Switch to the 'bert-base-uncased' checkpoint. This rebinds `tokenizer`,
# replacing the one loaded at the top of the notebook.
PRETRAINED_MODEL_NAME = 'bert-base-uncased'
tokenizer = BertTokenizer.from_pretrained(PRETRAINED_MODEL_NAME)

In [None]:
"""
Read the dataset and convert the sentences to BERT compatible format.
It returns 2 tensors：
- tokens_tensor：an index sequence of the sentence include [CLS].
- label_tensor：classification label, none for test set
- 0：Negative sentiment
- 1：Positive sentiment
"""
from torch.utils.data import Dataset
import pysnooper

class SentimentDataset(Dataset):
    """Sentiment examples read from ``data/<mode>.tsv`` and encoded for BERT.

    Indexing returns a ``(tokens_tensor, label_tensor)`` pair: the vocabulary
    ids of ``[CLS]`` followed by the sentence's tokens, and the integer label
    as a 0-dim tensor.
    """

    def __init__(self, mode, tokenizer, max_length=512):
        """Load the tsv file for ``mode``.

        Args:
            mode: "train" or "test"; selects which tsv file is read.
            tokenizer: object providing ``tokenize`` and
                ``convert_tokens_to_ids``.
            max_length: upper bound on the number of ids per example,
                ``[CLS]`` included; longer sentences are truncated. The
                default of 512 matches BERT's usual position-embedding
                limit (NOTE(review): confirm against the model config).
                ``None`` disables truncation.
        """
        assert mode in ["train", "test"]
        self.mode = mode
        # Missing cells become empty strings so the tokenizer always
        # receives a str.
        self.df = pd.read_csv("data/" + mode + ".tsv", sep="\t").fillna("")
        self.len = len(self.df)
        self.label_map = {'0': 'Negtive', '1': 'Positive'}
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __getitem__(self, idx):
        """Return ``(tokens_tensor, label_tensor)`` for row ``idx``."""
        text, label = self.df.iloc[idx, :].values
        label_tensor = torch.tensor(int(label))

        # Every sequence starts with the [CLS] marker.
        word_pieces = ["[CLS]"] + self.tokenizer.tokenize(text)
        if self.max_length is not None:
            # Cap the length so an over-long sentence cannot exceed what the
            # model accepts.
            word_pieces = word_pieces[:self.max_length]

        ids = self.tokenizer.convert_tokens_to_ids(word_pieces)
        tokens_tensor = torch.tensor(ids)

        return (tokens_tensor, label_tensor)

    def __len__(self):
        """Return the number of rows loaded from the tsv file."""
        return self.len
    
    
# Initialize a training dataset and use BERT to tokenize.
trainset = SentimentDataset("train", tokenizer=tokenizer)

In [None]:
# Pick one row to inspect.
sample_idx = 41

# The raw (text, label) values as stored in the dataset's DataFrame
text, label = trainset.df.iloc[sample_idx].values

# The same row as returned by the dataset's __getitem__
tokens_tensor, label_tensor = trainset[sample_idx]

# Map the ids back to tokens to check the encoding round-trips
tokens = tokenizer.convert_ids_to_tokens(tokens_tensor.tolist())
combined_text = " ".join(tokens)

# Side-by-side report: raw row, returned tensors, decoded tokens
print(f"""[Original text]
Sentence: {text}
Classification: {trainset.label_map[str(label)]}
--------------------
[Returned tensors from dataset]
tokens_tensor  ：{tokens_tensor}
label_tensor   ：{label_tensor}
--------------------
[Converted from tokens_tensors]
{combined_text}
""")

Mini-batches returned from a DataLoader

In [None]:
"""
Reads the SentimentDataset and returns 3 tensors required by BERT
- tokens_tensors  : (batch_size, max_seq_len_in_batch)
- masks_tensors   : (batch_size, max_seq_len_in_batch)
- label_ids       : (batch_size)
"""

from torch.utils.data import DataLoader
from torch.nn.utils.rnn import pad_sequence

def create_mini_batch(samples):
    """Collate ``(tokens_tensor, label_tensor)`` samples into one padded batch.

    Returns ``(tokens_tensors, masks_tensors, label_ids)``:
    - the id sequences zero-padded to the longest sequence in the batch,
    - a long tensor of the same shape that is 1 wherever the padded ids are
      non-zero and 0 elsewhere,
    - the stacked labels, or ``None`` when the first sample has no label.
    """
    # Whether labels exist is decided from the first sample only.
    if samples[0][1] is None:
        label_ids = None
    else:
        label_ids = torch.stack([sample[1] for sample in samples])

    # Right-pad every sequence with zeros up to the batch maximum.
    padded_ids = pad_sequence([sample[0] for sample in samples],
                              batch_first=True)

    # Attention mask: non-zero ids are real tokens, zeros are padding.
    attention = (padded_ids != 0).to(torch.long)

    return padded_ids, attention, label_ids

# Batches of 16 examples, collated (padded + masked) by create_mini_batch.
# NOTE(review): no `shuffle` argument is passed, so the batches are presumably
# served in the same file order every epoch — confirm this is intended.
BATCH_SIZE = 16
trainloader = DataLoader(trainset, batch_size=BATCH_SIZE, 
                         collate_fn=create_mini_batch)

Take one batch

In [None]:
data = next(iter(trainloader))

tokens_tensors, masks_tensors, label_ids = data

print(f"""
tokens_tensors.shape   = {tokens_tensors.shape} 
{tokens_tensors}
------------------------
masks_tensors.shape    = {masks_tensors.shape}
{masks_tensors}
------------------------
label_ids.shape        = {label_ids.shape}
{label_ids}
""")

3. Downstream NLP tasks based on pre-trained BERT model

<img src="images/bert_fine_tuning_tasks.jpg">
<caption><center>4 NLP tasks based on a fine-tuned BERT model</center></caption>

In [None]:
from transformers import BertForSequenceClassification

PRETRAINED_MODEL_NAME = 'bert-base-uncased'
# Two output classes: 0 and 1 (see label_map in the dataset class).
NUM_LABELS = 2

# Pre-trained BERT with a sequence-classification head on top.
model = BertForSequenceClassification.from_pretrained(
    PRETRAINED_MODEL_NAME, num_labels=NUM_LABELS)

# List the model's top-level modules; for the "bert" sub-module only the names
# of its children are shown, the other modules are printed in full.
print("""
name            module
----------------------""")
for name, module in model.named_children():
    if name == "bert":
        for n, _ in module.named_children():
            print(f"{name}:{n}")
    else:
        print("{:15} {}".format(name, module))

Default parameters of the classifier

In [None]:
model.config

Change the parameter value of the classifier

In [None]:
model.config.hidden_size = 768

In [None]:
# NOTE(review): this only overwrites an attribute on the config object; `model`
# was built above with BertForSequenceClassification.from_pretrained and is
# not re-instantiated by this assignment — confirm this value is intended.
model.config.architectures = 'bertForMaskedLM'

For now, there are 8 models and 1 tokenizer on PyTorch Hub.

Basic:
bertModel、
bertTokenizer

Pretrained model:
bertForMaskedLM、
bertForNextSentencePrediction、
bertForPreTraining

Fine-tuning for downstream NLP tasks based on BERT:
bertForSequenceClassification、
bertForTokenClassification、
bertForQuestionAnswering、
bertForMultipleChoice

In [None]:
"""
Please refer to https://leemeng.tw/attack_on_bert_transfer_learning_in_nlp.html
"""

def get_predictions(model, dataloader, compute_acc=False):
    """Run ``model`` over ``dataloader`` and return the predicted class ids.

    The model is switched to evaluation mode for the duration of the call
    (so dropout does not perturb the predictions) and put back into whatever
    mode it was in afterwards, which makes the function safe to call from
    inside a training loop.

    Args:
        model: module called as ``model(input_ids=..., attention_mask=...)``
            whose first output holds the logits, one row per example.
        dataloader: iterable of ``(tokens_tensors, masks_tensors, label_ids)``
            batches; ``label_ids`` may be ``None`` unless ``compute_acc`` is
            set.
        compute_acc: when true, also compare the predictions with the labels.

    Returns:
        A 1-D tensor with the arg-max class of every example (``None`` if the
        dataloader yields no batch), or ``(predictions, accuracy)`` when
        ``compute_acc`` is true.

    Raises:
        ZeroDivisionError: if ``compute_acc`` is true and no example was seen.
    """
    # Follow the model to whichever device it lives on instead of assuming
    # "cuda:0".
    device = next(model.parameters()).device
    was_training = model.training
    model.eval()

    batch_predictions = []
    correct = 0
    total = 0

    try:
        with torch.no_grad():
            for data in dataloader:
                # Drop a missing label tensor, move the rest to the model.
                data = [t.to(device) for t in data if t is not None]

                # First 2 tensors are token tensor and mask tensor
                tokens_tensors, masks_tensors = data[:2]
                outputs = model(input_ids=tokens_tensors,
                                attention_mask=masks_tensors)

                logits = outputs[0]
                _, pred = torch.max(logits, 1)

                # Compute accuracy
                if compute_acc:
                    labels = data[2]
                    total += labels.size(0)
                    correct += (pred == labels).sum().item()

                # Collect per-batch results; concatenated once at the end.
                batch_predictions.append(pred)
    finally:
        # Restore the caller's train/eval mode even if a batch failed.
        model.train(was_training)

    predictions = torch.cat(batch_predictions) if batch_predictions else None

    if compute_acc:
        acc = correct / total
        return predictions, acc
    return predictions
    
# Running the model on GPU if available
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print("device:", device)
# Move the model to the selected device.
model = model.to(device)
# Optional: accuracy of the model on the training set before any fine-tuning.
#_, acc = get_predictions(model, trainloader, compute_acc=True)
#print("classification acc:", acc)

Get total learnable parameters

In [None]:
def get_learnable_params(module):
    """Return the parameters of ``module`` that have ``requires_grad`` set."""
    trainable = []
    for param in module.parameters():
        if param.requires_grad:
            trainable.append(param)
    return trainable
     
# Trainable parameters of the full model and of its classifier head alone.
model_params = get_learnable_params(model)
clf_params = get_learnable_params(model.classifier)

# numel() is the element count of one parameter tensor; the sums give the
# total number of trainable scalars.
print(f"""
Total parameters of the whole model: {sum(p.numel() for p in model_params)}
Total parameters of the linear classifier: {sum(p.numel() for p in clf_params)}
""")

4. Training and fine tuning for your downstream NLP tasks

In [None]:
%%time

# Training mode
model.train()

# Use Adam Optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=1e-5)


EPOCHS = 30
for epoch in range(EPOCHS):

    # Sum of the batch losses over this epoch (not an average).
    running_loss = 0.0
    for data in trainloader:

        tokens_tensors, masks_tensors, labels = [t.to(device) for t in data]

        # Initialize the gradient
        optimizer.zero_grad()

        # forward pass; labels are passed in and the first output is used
        # as the loss
        outputs = model(input_ids=tokens_tensors, attention_mask=masks_tensors, labels=labels)

        loss = outputs[0]
        # backward
        loss.backward()
        optimizer.step()

        # Record batch loss
        running_loss += loss.item()

    # Calculate accuracy in evaluation mode so dropout does not perturb the
    # predictions, then switch back to training mode for the next epoch.
    model.eval()
    _, acc = get_predictions(model, trainloader, compute_acc=True)
    model.train()

    print('[epoch %d] loss: %.3f, acc: %.3f' %
          (epoch + 1, running_loss, acc))

Save the trained model

In [None]:
# Serialize the whole model object (not only its weights) to disk.
torch.save(model, 'BERT_sentiment_analysis.pkl')

Load the trained model

In [None]:
# NOTE: torch.load unpickles arbitrary objects — only load files you created
# yourself or otherwise trust.
# NOTE(review): recent PyTorch releases default to weights_only=True, which
# rejects fully pickled modules; pass weights_only=False if this call fails.
# `device` is already a torch.device, so it is handed to map_location as is.
model = torch.load('BERT_sentiment_analysis.pkl', map_location=device)
# A pickled module keeps its train/eval flag; switch to evaluation mode so
# dropout is disabled for inference.
model = model.eval()

5. Inference for new data

In [None]:
testset = SentimentDataset("test", tokenizer=tokenizer)
sample_idx = 10

# Original data in test dataset
text, label = testset.df.iloc[sample_idx].values
# The same row as returned by the dataset (the label tensor is not needed here)
test_tokens_tensor, _ = testset[sample_idx]
tokens = tokenizer.convert_ids_to_tokens(test_tokens_tensor.tolist())
combined_text = " ".join(tokens)
# Report the tensor of THIS test sample; the earlier `tokens_tensor` variable
# belongs to the training sample inspected in a previous cell.
print(f"""[Original data]
Sentence: {text}
Label: {testset.label_map[str(label)]}
--------------------
[Returned tensors by dataset]
tokens_tensor  ：{test_tokens_tensor}
--------------------
[Converted tokens_tensors]
{combined_text}
""")

In [None]:
#%%time
# Test dataset and data loader
testset = SentimentDataset("test", tokenizer=tokenizer)
testloader = DataLoader(testset, batch_size=16, 
                        collate_fn=create_mini_batch)

# Prediction over the whole test set (disabled)
#predictions, acc = get_predictions(model, testloader, compute_acc = True)

# Same mapping as label_map but keyed by int, so a predicted class id can be
# looked up directly.
index_map = {int(k): v for k, v in testset.label_map.items()}

# Disabled: build a label-vs-prediction table and write it to csv.
#df = pd.DataFrame({"Category": predictions.tolist()})
#df['Category'] = df.Category.apply(lambda x: index_map[x])
#df_pred = pd.concat([testset.df.loc[:, ["label"]], 
#                          df.loc[:, 'Category']], axis=1)

#print(f'Accuracy: {acc:.3f}')

#df_pred.to_csv('data/bert_sentiment_predict.csv', index=False)

In [None]:
#df_pred.head(10)

Sentiment prediction per sentence

In [None]:
class SentimentTestData(Dataset):
    """Unlabelled sentences encoded for BERT inference.

    Indexing returns ``(tokens_tensor, None)``: the vocabulary ids of
    ``[CLS]`` followed by the sentence's tokens, and ``None`` in place of a
    label.
    """

    def __init__(self, text_in, tokenizer, max_length=512):
        """Wrap the given sentences.

        Args:
            text_in: sequence of sentences (str) to classify.
            tokenizer: object providing ``tokenize`` and
                ``convert_tokens_to_ids``.
            max_length: upper bound on the number of ids per example,
                ``[CLS]`` included; longer sentences are truncated. The
                default of 512 matches BERT's usual position-embedding
                limit (NOTE(review): confirm against the model config).
                ``None`` disables truncation.
        """
        self.label_map = {'0': 'Negtive', '1': 'Positive'}
        self.tokenizer = tokenizer
        self.df = pd.DataFrame({"text": text_in})
        self.len = len(self.df)
        self.max_length = max_length

    def __getitem__(self, idx):
        """Return ``(tokens_tensor, None)`` for sentence ``idx``."""
        text = self.df.iloc[idx, 0]

        # Every sequence starts with the [CLS] marker.
        word_pieces = ["[CLS]"] + self.tokenizer.tokenize(text)
        if self.max_length is not None:
            # Free-form input can be arbitrarily long; cap it so it cannot
            # exceed what the model accepts.
            word_pieces = word_pieces[:self.max_length]

        ids = self.tokenizer.convert_tokens_to_ids(word_pieces)
        tokens_tensor = torch.tensor(ids)

        # There is no ground-truth label at inference time.
        return (tokens_tensor, None)

    def __len__(self):
        """Return the number of wrapped sentences."""
        return self.len

Input a sentence

In [None]:
text = input()

In [None]:
# Wrap the single typed sentence in a one-element dataset / loader so it goes
# through the same collate and prediction path as the test set.
text_in = [text]
test_data = SentimentTestData(text_in, tokenizer=tokenizer)
test_data_loader = DataLoader(test_data, batch_size = 1, collate_fn=create_mini_batch)
predictions = get_predictions(model, test_data_loader)
# The first (only) predicted class id is translated to its label name.
print(f"The predicted sentiment is {index_map[predictions.tolist()[0]]}.")