# CRF算子 (CRF Operator)

This CRF operator is simpler than the implementation on PyTorch (https://pytorch.org/tutorials/beginner/nlp/advanced_tutorial.html?highlight=ner), because I don't use START_TAG and STOP_TAG. So you can focus more on the actual tags you are trying to predict.

* mainly used for NER problem
* In: emission score matrix
    * size of row: count of words in a sentence decided by the length of sentence (no need to consider);
    * size of column: the target entity type sequence (set through tag_size during initialization)
* Out: tuple (best_score, best_path)
    * *best_score* refers to the score calculated according to CRF algorithm
    * *best_path* refers to the predicted types of entity, which is a sequence
* Loss function for training: *loss_nll_crf(emission, tags)*
    * This self-defined loss function (specifically for CRF model) will be called during training process

If you have any troubleshooting using this operator, DO let me know. Thank you!

In [1]:
import torch
import torch.nn as nn
import numpy as np

# torch.manual_seed(3407) is all you need: On the influence of random seeds in DL architectures for computer vision
# Well, I just set 3407 for fun, nothing serious ...
torch.manual_seed(3407)


class CRF(nn.Module):

    def __init__(self, tag_size):
        super(CRF, self).__init__()

        # real size of tags from input
        self.tag_size = tag_size

        # PARAM: transitional matrix - element (i,j) refers to the score transitioning from i to j
        self.transition = nn.Parameter(
            data=torch.tensor(data=np.random.randn(tag_size, tag_size), dtype=torch.float64)
        )

    def _score_real_path(self, emission, tags):
        """
        A capsuled internal logic for forward() API. DO NOT call this function externally!!!
        This function is used for calculating the score of real path given by dataset
        :param emission: emission matrix, size: (seq_len, tag_size)
        :param tags: the given tags in the dataset, size: (seq_len), where tags[i] represents word_i's real tag
        :return: current score of the given real path
        """
        score = torch.zeros(size=[1]) + emission[0, tags[0]]
        for i, cur_emission in enumerate(emission[1:], start=1):
            score = score + cur_emission[tags[i]] + self.transition[tags[i - 1], tags[i]]
        return score

    def _score_all_paths(self, emission):
        """
        A capsuled internal logic for forward() API. DO NOT call this function externally!!!
        This function is used for calculating the TOTAL score of all possible combinations of tags in a given sequence
        :param emission: emission matrix, size: (seq_len, tag_size)
        :return: TOTAL
        """
        pre_score = emission[0]
        for i, cur_emit in enumerate(emission[1:], start=1):
            cur_score = pre_score.unsqueeze(dim=1) + cur_emit + self.transition
            pre_score = torch.log(torch.sum(torch.exp(cur_score), dim=0))
        # then calculate all the elements in pre_score
        pre_score = torch.log(torch.sum(torch.exp(pre_score)))
        return pre_score

    def forward(self, emission):
        """
        Given the emission score of input sequence, find out the best tag sequence (its score & best path)
        Algorithm: viterbi decoding
        :param emission: emission score matrix, size: (seq_len, tag_size)
        :return: (score, tag_seq) - score of the best path, size: (tag_size); and corresponding tag_seq, size: (tag_size)
        """
        # init: The original score is only the score of word0 in emission score
        best_scores = torch.clone(emission[0, :])
        # init: The previous tag index of current index, where the score of the path to current tag is the best
        previous = torch.zeros(size=emission.shape, dtype=torch.int)
        previous[0] = -1

        for i, cur_emit in enumerate(emission[1:], start=1):
            cur_score = best_scores.unsqueeze(dim=1) + cur_emit + self.transition
            best_scores, previous_idx = torch.max(cur_score, dim=0)
            # best_scores can be updated automatically, but max_previous should be preserved
            previous[i] = previous_idx

        # for the final status of best_scores, grab the max score and its current index
        best_score, best_idx = best_scores.max(dim=0)
        best_score = best_score.tolist()
        best_idx = best_idx.tolist()
        # find out the best path from prevoius
        best_path = [best_idx]
        for i in range(previous.shape[0] - 1, 0, -1):
            best_idx = previous[i, best_idx].item()
            best_path.append(best_idx)
        # as a stack, reverse the best_path
        best_path = best_path[::-1]

        return best_score, best_path

    def loss_nll_crf(self, emission, tags):
        """
        Self-defined loss function specifically for CRF. (Negtive Log Likelihood)
        Normally, the loss function should be integrated with other DL semantic encoding layer.
        :param emission: given emission score from
        :param tags: given real tags of the sentence from dataset
        :return: Loss of the CRF = log(sum(exp(all_the_paths))) - score_of_real_path
        """
        return self._score_all_paths(emission) - self._score_real_path(emission, tags)

Experiment

In [2]:
X = torch.randn(size=[8, 16])
X.shape[1]

16

In [3]:
crf_op = CRF(tag_size=X.shape[1])

In [4]:
with torch.no_grad():
    best_score, best_path = crf_op(X)

In [5]:
best_score, best_path

(20.65096295615288, [14, 14, 0, 2, 1, 12, 8, 7])

In [6]:
import torch
import torch.nn as nn
import torch.optim as optim

# build the model
crf_op = CRF(tag_size=X.shape[1])

# make up some toy data (you should use real data during training process)
training_data = [
    (X, [0, 1, 2, 3, 4, 5, 6, 7]),
    (torch.randn(size=[8, 16]), [8, 9, 10, 11, 12, 13, 14, 15]),
]

# create an optimizer
optimizer = optim.SGD(crf_op.parameters(), lr=0.01, weight_decay=1e-4)

# train the model
# normally you would NOT do 300 epochs, This is only toy data!!!
for epoch in range(300):
    for emission, tags in training_data:
        crf_op.zero_grad()
        loss = crf_op.loss_nll_crf(X, tags)
        loss.backward()
        optimizer.step()

# Check predictions after training
with torch.no_grad():
    print(crf_op(training_data[0][0]))

(22.604530311438175, [11, 12, 1, 2, 3, 5, 6, 7])


In [7]:
with torch.no_grad():
    print(crf_op(training_data[1][0]))

(26.06646110200638, [11, 12, 10, 11, 12, 13, 6, 7])
