[TOC]

# 隐马尔科夫链求解词性标注

本次实验使用了HMM、Transformer、CRF三种模型对中文词性标注进行了实验

# HMM

In [2]:
# -*- coding:utf-8 -*-
# @FileName : hmm.py
# @Time : 2024/3/26 20:16
# @Author : fiv
from collections import defaultdict
from pathlib import Path
import os


# 隐马尔科夫链求解词性标注

# pi[q] = 词性q出现所有句子开头的次数 / 所有句子的数量
# trans[q1][q2] = 词性q1后面跟着词性q2的次数 / 词性q1出现的次数
# emit[q][v] = 词性q发射出词v的次数 / 词性q出现的次数

class HMM:
    def __init__(self, corpus_path):
        # self.vocabs, self.classes = self.get_corpus(corpus_path)
        self.corpus_path = corpus_path
        self.line_cnt = 0
        self.states = ['Ag', 'a', 'ad', 'an', 'Bg', 'b', 'c', 'Dg', 'd', 'e', 'f', 'h', 'i', 'j', 'k', 'l', 'Mg', 'm',
                       'Ng', 'n', 'nr', 'ns', 'nt', 'nx', 'nz', 'o', 'p', 'q', 'Rg', 'r', 's', 'na', 'Tg', 't', 'u',
                       'Vg', 'v', 'vd', 'vn', 'vvn', 'w', 'Yg', 'y', 'z']
        self.pi = {state: 0.0 for state in self.states}  # 初始状态概率
        self.trans = {state: {state: 0.0 for state in self.states} for state in self.states}  # 状态转移概率
        self.emit = {state: {} for state in self.states}  # 发射概率
        self.class_cnt = {state: 0 for state in self.states}

        self.train()

    def train(self):
        with open(self.corpus_path, "r", encoding="utf-8") as f:
            lines = f.readlines()
            lines = [line.strip() for line in lines if line.strip()]
            self.line_cnt = len(lines)
            for line in lines:
                vocabs, classes = [], []
                words = line.split(" ")
                for word in words:
                    word = word.strip()
                    if '/' not in word:
                        continue
                    pos = word.index("/")
                    if '[' in word and ']' in word:
                        vocabs.append(word[1:pos])
                        classes.append(word[pos + 1:-1])
                        break
                    if '[' in word:
                        vocabs.append(word[1:pos])
                        classes.append(word[pos + 1:])
                        break
                    if ']' in word:
                        vocabs.append(word[:pos])
                        classes.append(word[pos + 1:-1])
                        break
                    vocabs.append(word[:pos])
                    classes.append(word[pos + 1:])

                assert len(vocabs) == len(classes)
                self.pi[classes[0]] += 1
                for v, c in zip(vocabs, classes):
                    self.class_cnt[c] += 1
                    if v in self.emit[c]:
                        self.emit[c][v] += 1
                    else:
                        self.emit[c][v] = 1
                for (c1, c2) in zip(classes[:-1], classes[1:]):
                    self.trans[c1][c2] += 1

        self.to_prob()

    def to_prob(self):
        for state in self.states:
            self.pi[state] = self.pi[state] / self.line_cnt
            for e in self.emit[state]:
                self.emit[state][e] = self.emit[state][e] / self.class_cnt[state]
            for t in self.trans[state]:
                self.trans[state][t] = self.trans[state][t] / self.class_cnt[state]

    def viterbi(self, sentence):
        # 初始化
        V = [{}]
        path = {}

        for y in self.states:
            V[0][y] = self.pi[y] * self.emit[y].get(sentence[0], 0)
            path[y] = [y]

        # 递推
        for t in range(1, len(sentence)):
            V.append({})
            newpath = {}

            for y in self.states:
                (prob, state) = max(
                    (V[t - 1][y0] * self.trans[y0].get(y, 0) * self.emit[y].get(sentence[t], 0), y0) for y0 in
                    self.states)
                V[t][y] = prob
                newpath[y] = path[state] + [y]
            path = newpath

        # 终止
        (prob, state) = max((V[len(sentence) - 1][y], y) for y in self.states)
        return prob, path[state]


if __name__ == "__main__":
    hmm = HMM("../../data/corpus.txt")
    test_strs = ["今天 天气 特别 好", "欢迎 大家 的 到来", "请 大家 喝茶", "你 的 名字 是 什么"]
    for s in test_strs:
        ss = s.split(" ")
        p, o = hmm.viterbi(ss)
        print(list(zip(ss, o)))

[('今天', 't'), ('天气', 'n'), ('特别', 'd'), ('好', 'a')]
[('欢迎', 'v'), ('大家', 'r'), ('的', 'u'), ('到来', 'vn')]
[('请', 'v'), ('大家', 'r'), ('喝茶', 'v')]
[('你', 'r'), ('的', 'u'), ('名字', 'n'), ('是', 'v'), ('什么', 'r')]


# Transformer model

In [None]:
# -*- coding:utf-8 -*-
# @FileName : model.py
# @Time : 2024/3/20 17:48
# @Author : fiv
import math
from typing import Callable
from typing import Union

import torch
import torch.nn as nn
from torch import Tensor
from torch.nn import functional as F, TransformerEncoderLayer, TransformerEncoder
from torch.nn.modules.normalization import LayerNorm


class Transformer(nn.Module):
    def __init__(self, vocab_size=512, pos_tag_size=32, max_length=128, d_model=512, nhead: int = 8,
                 num_encoder_layers: int = 6, dim_feedforward: int = 2048, dropout: float = 0.1,
                 activation: Union[str, Callable[[Tensor], Tensor]] = F.relu,
                 layer_norm_eps: float = 1e-5, batch_first: bool = True, norm_first: bool = False,
                 bias: bool = True, device=None, dtype=None):
        super(Transformer, self).__init__()
        factory_kwargs = {'device': device, 'dtype': dtype}
        self.d_model = d_model
        encoder_layer = TransformerEncoderLayer(d_model, nhead, dim_feedforward, dropout,
                                                activation, layer_norm_eps, batch_first, norm_first,
                                                bias, **factory_kwargs)
        encoder_norm = LayerNorm(d_model, eps=layer_norm_eps, bias=bias, **factory_kwargs)
        self.encoder = TransformerEncoder(encoder_layer, num_encoder_layers, encoder_norm)

        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoder = PositionalEncoding(d_model, dropout)
        self.linear = nn.Linear(d_model, pos_tag_size)

        self.init_weights()

    def init_weights(self) -> None:
        initrange = 0.1
        self.embedding.weight.data.uniform_(-initrange, initrange)
        self.linear.bias.data.zero_()
        self.linear.weight.data.uniform_(-initrange, initrange)

    def forward(self, x):
        # print("-----------------")
        # print(x.shape)
        x = self.embedding(x)
        # print(x.shape)
        x = self.pos_encoder(x)
        # print(x.shape)
        x = self.encoder(x)
        # print(x.shape)
        # x = x.view(x.size(0), -1)
        # print(x.shape)
        x = self.linear(x)
        # print(x.shape)
        # print("-----------------")
        return x


class PositionalEncoding(nn.Module):

    def __init__(self, d_model: int, dropout: float = 0.1, max_len: int = 5000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
        pe = torch.zeros(max_len, 1, d_model)
        pe[:, 0, 0::2] = torch.sin(position * div_term)
        pe[:, 0, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe)

    def forward(self, x: Tensor) -> Tensor:
        """
        Arguments:
            x: Tensor, shape ``[seq_len, batch_size, embedding_dim]``
        """
        x = x + self.pe[:x.size(0)]
        return self.dropout(x)

# pos dataset

In [None]:
# -*- coding:utf-8 -*-
# @FileName : dataset.py
# @Time : 2024/3/27 9:16
# @Author : fiv
import torch
from torch.utils.data import DataLoader, Dataset
from transformers import AutoTokenizer


class POSDataset(Dataset):

    def __init__(self, vocabs, labels, max_length):
        self.states = ['NONE', 'Ag', 'a', 'ad', 'an', 'Bg', 'b', 'c', 'Dg', 'd', 'e', 'f', 'h', 'i', 'j', 'k', 'l',
                       'Mg', 'm', 'Ng', 'n', 'nr', 'ns', 'nt', 'nx', 'nz', 'o', 'p', 'q', 'Rg', 'r', 's', 'na', 'Tg',
                       't', 'u', 'Vg', 'v', 'vd', 'vn', 'vvn', 'w', 'Yg', 'y', 'z']
        self.label2idx = {state: idx for idx, state in enumerate(self.states)}
        self.tokenizer = AutoTokenizer.from_pretrained("hfl/chinese-bert-wwm-ext", use_fast=True)
        self.corpus = [self.token_and_align_labels(vocab, label, max_length) for vocab, label in zip(vocabs, labels)]

    def token_and_align_labels(self, tokens, labels, max_length):
        tokens = self.tokenizer(tokens, truncation=True, is_split_into_words=True, padding="max_length",
                                add_special_tokens=False, max_length=max_length)
        word_ids = tokens.word_ids()
        aligned_labels = []
        for wid in word_ids:
            if wid is None:
                aligned_labels.append("NONE")
            else:
                aligned_labels.append(labels[wid])
        # return tokens["input_ids"], aligned_labels
        return torch.tensor(tokens["input_ids"]), torch.tensor([self.label2idx[label] for label in aligned_labels])

    def __len__(self):
        return len(self.corpus)

    def __getitem__(self, idx):
        return self.corpus[idx]

    def tag_size(self):
        return len(self.states)

    def vocab_size(self):
        return self.tokenizer.vocab_size


def get_data(corpus_path):
    vocabs, classes = [], []
    with open(corpus_path, "r", encoding="utf-8") as f:
        lines = f.readlines()
        lines = [line.strip() for line in lines if line.strip()]
        for line in lines:
            vocab, label = [], []
            words = line.split(" ")
            for word in words:
                word = word.strip()
                if '/' not in word:
                    continue
                pos = word.index("/")
                if '[' in word and ']' in word:
                    vocab.append(word[1:pos])
                    label.append(word[pos + 1:-1])
                    break
                if '[' in word:
                    vocab.append(word[1:pos])
                    label.append(word[pos + 1:])
                    break
                if ']' in word:
                    vocab.append(word[:pos])
                    label.append(word[pos + 1:-1])
                    break
                vocab.append(word[:pos])
                label.append(word[pos + 1:])

            assert len(vocab) == len(label)
            vocabs.append(vocab)
            classes.append(label)
    return vocabs, classes


def get_dataloader(corpus_path, max_length=128, batch_size=32):
    from sklearn.model_selection import train_test_split
    vocabs, classes = get_data(corpus_path)

    # print(vocabs[0], classes[0])
    train_vocabs, test_vocabs, train_classes, test_classes = train_test_split(vocabs, classes, test_size=0.2)
    train_dataset = POSDataset(train_vocabs, train_classes, max_length)
    test_dataset = POSDataset(test_vocabs, test_classes, max_length)
    train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)
    return train_dataloader, test_dataloader

# CRF

In [5]:
# -*- coding:utf-8 -*-
# @FileName : crf.py
# @Time : 2024/4/4 13:13
# @Author : fiv


from collections import defaultdict
from pathlib import Path
import os
from sklearn_crfsuite import CRF


# 基于条件随机场的词性标注

class CRF_POS:
    def __init__(self, corpus_path):
        self.corpus_path = corpus_path
        self.line_cnt = 0
        self.states = ['Ag', 'a', 'ad', 'an', 'Bg', 'b', 'c', 'Dg', 'd', 'e', 'f', 'h', 'i', 'j', 'k', 'l', 'Mg', 'm',
                       'Ng', 'n', 'nr', 'ns', 'nt', 'nx', 'nz', 'o', 'p', 'q', 'Rg', 'r', 's', 'na', 'Tg', 't', 'u',
                       'Vg', 'v', 'vd', 'vn', 'vvn', 'w', 'Yg', 'y', 'z']
        self.X = []
        self.y = []
        self.crf = CRF(
            algorithm='lbfgs',
            c1=0.1,
            c2=0.1,
            max_iterations=10,
            all_possible_transitions=True,
            verbose=True
        )
        # * ``'lbfgs'`` - Gradient descent using the L-BFGS method   -->> 梯度下降
        # * ``'l2sgd'`` - Stochastic Gradient Descent with L2 regularization term  -->> 随机梯度下降
        # * ``'ap'`` - Averaged Perceptron  -->> 感知机
        # * ``'pa'`` - Passive Aggressive (PA)  -->> 消极攻击
        # * ``'arow'`` - Adaptive Regularization Of Weight Vector (AROW)  -->> 自适应正则化权重向量
        self.train()

    def train(self):
        with open(self.corpus_path, "r", encoding="utf-8") as f:
            lines = f.readlines()
            lines = [line.strip() for line in lines if line.strip()]
            self.line_cnt = len(lines)
            for line in lines:
                vocabs, classes = [], []
                words = line.split(" ")
                for word in words:
                    word = word.strip()
                    if '/' not in word:
                        continue
                    pos = word.index("/")
                    if '[' in word and ']' in word:
                        vocabs.append(word[1:pos])
                        classes.append(word[pos + 1:-1])
                        break
                    if '[' in word:
                        vocabs.append(word[1:pos])
                        classes.append(word[pos + 1:])
                        break
                    if ']' in word:
                        vocabs.append(word[:pos])
                        classes.append(word[pos + 1:-1])
                        break
                    vocabs.append(word[:pos])
                    classes.append(word[pos + 1:])
                assert len(vocabs) == len(classes)
                self.X.append(vocabs)
                self.y.append(classes)
        self.crf.fit(self.X, self.y)

    def predict(self, sentence):
        vocabs = sentence.split(" ")
        return self.crf.predict([vocabs])

In [6]:
crf = CRF_POS("../../data/corpus.txt")

loading training data to CRFsuite: 100%|██████████| 19484/19484 [00:00<00:00, 20752.21it/s]



Feature generation
type: CRF1d
feature.minfreq: 0.000000
feature.possible_states: 0
feature.possible_transitions: 1
0....1....2....3....4....5....6....7....8....9....10
Number of features: 22432
Seconds required: 0.271

L-BFGS optimization
c1: 0.100000
c2: 0.100000
num_memories: 6
max_iterations: 10
epsilon: 0.000010
stop: 10
delta: 0.000010
linesearch: MoreThuente
linesearch.max_iterations: 20

Iter 1   time=5.10  loss=2956295.20 active=22181 feature_norm=1.00
Iter 2   time=12.92 loss=1971237.48 active=22178 feature_norm=13.06
Iter 3   time=2.59  loss=1702008.52 active=20991 feature_norm=14.64
Iter 4   time=2.61  loss=1532821.93 active=21818 feature_norm=17.88
Iter 5   time=5.27  loss=1341643.23 active=21740 feature_norm=26.51
Iter 6   time=2.62  loss=1232498.37 active=22048 feature_norm=28.99
Iter 7   time=2.58  loss=1185865.87 active=22232 feature_norm=30.61
Iter 8   time=2.42  loss=1073502.87 active=21911 feature_norm=37.36
Iter 9   time=2.39  loss=963623.78 active=22015 feature_n

In [7]:
test_strs = ["今天 天气 特别 好", "欢迎 大家 的 到来", "请 大家 喝茶", "你 的 名字 是 什么"]
for test_str in test_strs:
    print(list(zip(test_str.split(" "), crf.predict(test_str)[0])))

[('今天', 't'), ('天气', 't'), ('特别', 'd'), ('好', 'a')]
[('欢迎', 'd'), ('大家', 'a'), ('的', 'u'), ('到来', 'v')]
[('请', 'd'), ('大家', 'a'), ('喝茶', 'u')]
[('你', 'v'), ('的', 'u'), ('名字', 'n'), ('是', 'v'), ('什么', 'r')]
