In [19]:
import numpy as np
from collections import defaultdict


class CRF:
    def __init__(self, tags, feature_templates):
        """
        :param feature_templates: 特征模板列表
        :param tags: 标签列表
        """
        self.tags = tags
        self.feature_templates = feature_templates
        self.tag_to_idx = {tag: i for i, tag in enumerate(tags)}
        self.idx_to_tag = {i: tag for tag, i in self.tag_to_idx.items()}

        # 模型参数
        self.weights = defaultdict(float)  # 特征权重
        self.transition = np.random.randn(len(tags), len(tags)) * 0.01  # 转移矩阵

    def legal_transition(self, i, j):
        i_tag = self.idx_to_tag[i]
        j_tag = self.idx_to_tag[j]
        if i_tag.startswith("B-") and not (j_tag.startswith("M-") or j_tag.startswith("E-")):
            return False
        if i_tag.startswith("M-") and not (j_tag.startswith("M-") or j_tag.startswith("E-")):
            return False
        if i_tag.startswith("E-") and (j_tag.startswith("M-") or j_tag.startswith("E-")):
            return False
        return True

    @staticmethod
    def extract_features_helper(sentence, position, template):
        """提取特征模板中的偏移量"""
        context = []  # U08:%x[0,0]/%x[1,0]
        parts = template.split(":")[1].split("/")
        for part in parts:
            offset = int(part[3:-1].split(",")[0])
            idx = position + offset
            if idx < 0:
                context.append("<BEG>")
            elif idx >= len(sentence):
                context.append("<END>")
            else:
                context.append(sentence[idx])
        return context

    def extract_features(self, sentence, position, prev_tag, cur_tag):
        """根据特征模板提取特征"""
        features = []
        for template in self.feature_templates:
            # 处理Unigram特征
            if template.startswith("U"):
                context = self.extract_features_helper(sentence, position, template)
                features.append(f"{cur_tag}::{template}:{'/'.join(context)}")

            # 处理Bigram特征
            elif template.startswith("B"):
                if prev_tag is not None:
                    context = self.extract_features_helper(sentence, position, template)
                    features.append(f"{prev_tag}→{cur_tag}::{template}:{'/'.join(context)}")
        return features

    def forward_backward(self, sentence):
        """前向后向算法"""
        T = len(sentence)
        N = len(self.tags)

        # 初始化
        alpha = np.zeros((T, N))
        beta = np.zeros((T, N))

        # 前向算法
        # alpha_t(j)=\sum_{i=1}^N alpha_{t-1}(i) * T(i,j) * E_t(j)
        for t in range(T):
            for j in range(N):
                if t == 0:
                    features = self.extract_features(sentence, t, None, self.idx_to_tag[j])
                    alpha[t][j] = sum(self.weights[f] for f in features)
                else:
                    log_probs = []
                    for i in range(N):
                        trans = self.transition[i][j]  # T(i,j)
                        features = self.extract_features(sentence, t, self.idx_to_tag[i], self.idx_to_tag[j])
                        emit = sum(self.weights[f] for f in features)  # E_t(j)
                        log_probs.append(alpha[t - 1][i] + trans + emit)
                    alpha[t][j] = np.logaddexp.reduce(log_probs) if log_probs else -np.inf

        # 后向算法
        # beta_t(i)=\sum_{j=1}^N T(i,j) * E_{t+1}(j) * beta_{t+1}(j)
        for t in reversed(range(T)):
            for i in range(N):
                if t == T - 1:
                    beta[t][i] = 0
                else:
                    log_probs = []
                    for j in range(N):
                        trans = self.transition[i][j]  # T(i,j)
                        features = self.extract_features(sentence, t + 1, self.idx_to_tag[i], self.idx_to_tag[j])
                        emit = sum(self.weights[f] for f in features)  # E_{t+1}(j)
                        log_probs.append(trans + emit + beta[t + 1][j])
                    beta[t][i] = np.logaddexp.reduce(log_probs) if log_probs else -np.inf

        # 配分函数
        # Z=\sum_{j=1}^N alpha_T(j)
        log_Z = np.log(sum(np.exp(alpha[-1]))) if any(np.isfinite(alpha[-1])) else -np.inf
        return alpha, beta, log_Z

    def compute_gradient(self, sentence, true_tags):
        """计算梯度"""
        # 提取真实路径特征
        true_features = set()
        for t in range(len(sentence)):
            prev_tag = true_tags[t - 1] if t > 0 else None
            features = self.extract_features(sentence, t, prev_tag, true_tags[t])
            true_features.update(features)

        # 前向后向算法
        alpha, beta, log_Z = self.forward_backward(sentence)
        expected_features = defaultdict(float)

        # 计算特征期望
        # P(y_0=j|x)            = alpha_0(j) * E_0(j) / Z
        # P(y_{t-1}=i, y_t=j|x) = alpha_{t-1}(i) * T(i,j) * E_t(j) * beta_t(j) / Z
        for t in range(len(sentence)):
            for i in range(len(self.tags)):
                for j in range(len(self.tags)):
                    # 提取特征
                    cur_tag = self.idx_to_tag[j]
                    prev_tag = self.idx_to_tag[i] if t > 0 else None
                    features = self.extract_features(sentence, t, prev_tag, cur_tag)

                    # 计算概率
                    if t == 0:
                        prob = np.exp(alpha[t][j] + beta[t][j] - log_Z) if log_Z != -np.inf else 0
                    else:
                        trans_score = self.transition[i][j]
                        emit_score = sum(self.weights[f] for f in features)
                        prob = np.exp(alpha[t - 1][i] + trans_score + emit_score + beta[t][j] - log_Z) if log_Z != -np.inf else 0

                    # 累加特征期望
                    for f in features:
                        expected_features[f] += prob

        # 计算权重梯度
        # weight_grad[f] = true - expected
        weight_grad = defaultdict(float)
        for f in true_features:
            weight_grad[f] += 1
        for f in expected_features:
            weight_grad[f] -= expected_features[f]

        # 计算转移矩阵梯度
        # transition_grad[f] = true - expected
        transition_grad = np.zeros_like(self.transition)
        for t in range(1, len(sentence)):
            i = self.tag_to_idx[true_tags[t - 1]]
            j = self.tag_to_idx[true_tags[t]]
            transition_grad[i][j] += 1

            for i_model in range(len(self.tags)):
                for j_model in range(len(self.tags)):
                    features = self.extract_features(sentence, t, self.idx_to_tag[i_model], self.idx_to_tag[j_model])
                    prob = np.exp(alpha[t - 1][i_model] + self.transition[i_model][j_model] + sum(self.weights[f] for f in features) + beta[t][j_model] - log_Z) if log_Z != -np.inf else 0
                    transition_grad[i_model][j_model] -= prob

        return weight_grad, transition_grad, log_Z

    def train(self, sentences, true_tags_seq, batch_size=32, max_iter=10, learning_rate=0.1):
        for iteration in range(max_iter):
            total_loss = 0
            batch_indices = range(0, len(sentences), batch_size)

            for start_idx in batch_indices:
                end_idx = start_idx + batch_size
                batch_sentences = sentences[start_idx:end_idx]
                batch_tags = true_tags_seq[start_idx:end_idx]

                # 初始化累积变量
                batch_weights_grad = defaultdict(float)
                batch_transition_grad = np.zeros_like(self.transition)
                batch_loss = 0.0

                # 计算批次内所有样本的梯度
                for sentence, tags in zip(batch_sentences, batch_tags):
                    # 计算单个样本的梯度
                    weights_grad, transition_grad, log_Z = self.compute_gradient(sentence, tags)

                    # 累积权重梯度
                    for f in weights_grad:
                        batch_weights_grad[f] += weights_grad[f]

                    # 累积转移矩阵梯度
                    batch_transition_grad += transition_grad

                    # 计算单个样本的损失
                    true_score = self._compute_single_score(sentence, tags)
                    batch_loss += log_Z - true_score

                # 计算批次平均梯度
                batch_size_actual = len(batch_sentences)
                for f in batch_weights_grad:
                    batch_weights_grad[f] /= batch_size_actual
                batch_transition_grad /= batch_size_actual
                batch_loss /= batch_size_actual

                # 使用平均梯度更新参数
                for f in batch_weights_grad:
                    self.weights[f] += learning_rate * batch_weights_grad[f]

                for i in range(len(self.tags)):
                    for j in range(len(self.tags)):
                        if self.legal_transition(i, j):
                            self.transition[i][j] += learning_rate * batch_transition_grad[i][j]

                total_loss += batch_loss * batch_size_actual

            print(f"迭代次数 {iteration}, Loss={total_loss/len(sentences):.2f}")

    def _compute_single_score(self, sentence, tags):
        """计算单个样本的真实路径得分"""
        score = 0
        for t in range(len(sentence)):
            prev_tag = tags[t - 1] if t > 0 else None
            features = self.extract_features(sentence, t, prev_tag, tags[t])
            score += sum(self.weights[f] for f in features)
            if t > 0:
                i = self.tag_to_idx[tags[t - 1]]
                j = self.tag_to_idx[tags[t]]
                score += self.transition[i][j]
        return score

    def viterbi_decode(self, sentence):
        T, N = len(sentence), len(self.tags)
        viterbi = np.full((T, N), -np.inf)  # 初始为负无穷
        backpointers = np.zeros((T, N), dtype=int)

        # 初始步：开头仅允许B-,S-,O
        for j in range(N):
            tag = self.idx_to_tag[j]
            if tag.startswith(("B-", "S-")) or tag == "O":
                features = self.extract_features(sentence, 0, None, tag)
                viterbi[0][j] = sum(self.weights[f] for f in features)

        # 递推步：仅允许合法转移
        for t in range(1, T):
            for j in range(N):
                max_score = -np.inf
                best_i = -1
                for i in range(N):
                    if not self.legal_transition(i, j):  # 跳过非法转移
                        continue
                    score = viterbi[t - 1][i] + self.transition[i][j]
                    features = self.extract_features(sentence, t, self.idx_to_tag[i], self.idx_to_tag[j])
                    score += sum(self.weights[f] for f in features)
                    if score > max_score:
                        max_score = score
                        best_i = i
                if best_i != -1:  # 确保存在合法前驱
                    viterbi[t][j] = max_score
                    backpointers[t][j] = best_i

        # 回溯
        best_path = [np.argmax(viterbi[-1])]
        for t in reversed(range(1, T)):
            best_path.append(backpointers[t][best_path[-1]])
        best_path.reverse()

        return [self.idx_to_tag[i] for i in best_path]

In [None]:
# 读取训练集
def train_dataset(train_file):
    train_sentences, train_tags = [], []
    with open(train_file, "r", encoding="utf-8") as f:
        cur_sentence = []
        for line in f:
            line = line.strip()
            if not line:
                if cur_sentence:
                    train_sentences.append([word for word, _ in cur_sentence])
                    train_tags.append([tag for _, tag in cur_sentence])
                    cur_sentence = []
            else:
                parts = line.split()
                cur_sentence.append((parts[0], parts[1]))

    return train_sentences, train_tags

In [None]:
tags = ["O", "B-NAME", "M-NAME", "E-NAME", "S-NAME", "B-CONT", "M-CONT", "E-CONT", "S-CONT", "B-EDU", "M-EDU", "E-EDU", "S-EDU", "B-TITLE", "M-TITLE", "E-TITLE", "S-TITLE", "B-ORG", "M-ORG", "E-ORG", "S-ORG", "B-RACE", "M-RACE", "E-RACE", "S-RACE", "B-PRO", "M-PRO", "E-PRO", "S-PRO", "B-LOC", "M-LOC", "E-LOC", "S-LOC"]
feature_templates = [
    "U01:%x[-1,0]",  # 前一个词
    "U02:%x[0,0]",  # 当前词
    "U03:%x[1,0]",  # 后一个词
    "U06:%x[-1,0]/%x[0,0]",  # 前词+当前词
    "U08:%x[0,0]/%x[1,0]",  # 当前词+后词
    "B06:%x[0,0]",
]
crf = CRF(tags, feature_templates)

# 读取训练集
train_sentences, train_tags = train_dataset("train_cut.txt")

In [None]:
crf.train(
    train_sentences,
    train_tags,
    batch_size=8,
    max_iter=10,
    learning_rate=0.1,
)

迭代次数 0, Loss=21.10
迭代次数 1, Loss=18.86
迭代次数 2, Loss=16.98
迭代次数 3, Loss=15.29
迭代次数 4, Loss=14.28
迭代次数 5, Loss=23.15
迭代次数 6, Loss=73.81
迭代次数 7, Loss=41.27
迭代次数 8, Loss=30.94
迭代次数 9, Loss=42.33


In [33]:
test_sentence_str = "张建民是上海银行的总裁和工程师"
test_sentence = list(test_sentence_str)
predicted_tags = crf.viterbi_decode(test_sentence)
print("预测结果:", list(zip(test_sentence, predicted_tags)))

预测结果: [('张', 'B-NAME'), ('建', 'M-NAME'), ('民', 'E-NAME'), ('是', 'O'), ('上', 'B-ORG'), ('海', 'M-ORG'), ('银', 'M-ORG'), ('行', 'E-ORG'), ('的', 'B-TITLE'), ('总', 'M-TITLE'), ('裁', 'E-TITLE'), ('和', 'O'), ('工', 'B-TITLE'), ('程', 'M-TITLE'), ('师', 'E-TITLE')]
