In [42]:
from jiwer import wer
from nltk.translate import bleu_score
from rouge import Rouge
from rouge_score import rouge_scorer
import torch
import math
import sentencepiece as spm

import torch.nn as nn
import torch.optim as optim
import numpy as np
import torch
import re
from torchviz import make_dot
import matplotlib.pyplot as plt
import os
from datetime import datetime
import torch.nn.functional as F

In [23]:
vocab_size = 8000
top_k = 3
max_sentence_length = 20
sp = spm.SentencePieceProcessor()
sp.load("work1.model")

True

In [49]:
mask = torch.tensor([1, 0, 0, 0, 0], dtype=torch.float32)
#取top_k个最大值
def top_k_elements(tensor, k):
    # 对tensor进行排序，返回排序后的值和索引
    sorted_tensor, sorted_indices = torch.sort(tensor, descending=True)

    # 选择前k大的元素及其在原始tensor中的索引
    top_k_values = sorted_tensor[:k]
    top_k_indices = sorted_indices[:k]

    return top_k_values, top_k_indices


#按概率选择位置
def choose_index_based_on_probability(probabilities):
    chosen_index = torch.multinomial(probabilities, 1)[0]
    return chosen_index

#填充向量
def list_to_fixed_length_tensor(input_list, max_length):
    # 将输入列表转换为张量
    input_tensor = torch.tensor(input_list, dtype=torch.float32)
    
    # 如果输入张量的长度小于 max_length，用 0 填充
    if input_tensor.size(0) < max_length:
        padding = torch.zeros(max_length - input_tensor.size(0), dtype=torch.float32)
        fixed_length_tensor = torch.cat((input_tensor, padding))
    # 如果输入张量的长度大于 max_length，进行截断
    else:
        fixed_length_tensor = input_tensor[:max_length]
    
    return fixed_length_tensor

# 多项式分类器模型定义
class MultinomialClassifier(nn.Module):
    
    def __init__(self, input_dim, output_dim):
        super(MultinomialClassifier, self).__init__()
        
        self.linear_probs = nn.Linear(input_dim, output_dim)
       

    def forward(self, x):
       
        
        predicted_probs = torch.softmax(self.linear_probs(x), dim=0)

        result_values, result_indices = top_k_elements(predicted_probs, top_k)
        #print(f"这是原本的概率:{predicted_probs}")
        #print(f"这是tok的位置:{result_indices}")
        #print(f"这是tok的概率:{result_values}"+'\n')
        return result_indices, result_values

In [50]:
def word_error_rate(predictions, references):
    wer = 0
    num_tokens = 0

    for pred, ref in zip(predictions, references):
        pred_words = [str(token) for token in pred]
        ref_words = [str(token) for token in ref]

        errors = 0
        d_matrix = [[0] * (len(ref_words) + 1) for _ in range(len(pred_words) + 1)]

        for i in range(1, len(pred_words) + 1):
            d_matrix[i][0] = i

        for j in range(1, len(ref_words) + 1):
            d_matrix[0][j] = j

        for i in range(1, len(pred_words) + 1):
            for j in range(1, len(ref_words) + 1):
                if pred_words[i - 1] == ref_words[j - 1]:
                    cost = 0
                else:
                    cost = 1

                d_matrix[i][j] = min(d_matrix[i-1][j]+1,
                                     d_matrix[i][j-1]+1,
                                     d_matrix[i-1][j-1]+cost)

        errors = d_matrix[len(pred_words)][len(ref_words)]
        wer += errors
        num_tokens += len(ref_words)

    return wer / num_tokens

def bleu(predictions, references):
    predictions_sentences = [[' '.join([str(token) for token in pred])] for pred in predictions]
    references_sentences = [[[' '.join([str(token) for token in ref])]] for ref in references]

    return bleu_score.corpus_bleu(references_sentences, predictions_sentences)

def rouge(predictions, references):
    scorer = rouge_scorer.RougeScorer(['rouge1', 'rouge2', 'rougeL'], use_stemmer=True)
    
    total_scores = {'rouge1': 0, 'rouge2': 0, 'rougeL': 0}
    
    for pred, ref in zip(predictions, references):
        pred_sentence = ' '.join([str(token) for token in pred])
        ref_sentence = ' '.join([str(token) for token in ref])

        scores = scorer.score(ref_sentence, pred_sentence)
        
        total_scores['rouge1'] += scores['rouge1'].fmeasure
        total_scores['rouge2'] += scores['rouge2'].fmeasure
        total_scores['rougeL'] += scores['rougeL'].fmeasure

    num_sentences = len(predictions)
    
    return {key: value / num_sentences for key, value in total_scores.items()}

def accuracy(predictions, references):
    correct = 0
    total = 0
    
    for pred, ref in zip(predictions, references):
        total += len(ref)
        correct += sum(p == r for p, r in zip(pred, ref))
    
    return correct / total

def perplexity(predictions, references, num_classes):
    total_loss = 0
    total_num_tokens = 0
    
    for pred, ref in zip(predictions, references):
        num_tokens = len(ref)
        total_num_tokens += num_tokens

        # 将预测值和参考值转换为 PyTorch 张量
        pred_tensor = torch.tensor(pred).view(num_tokens, -1)  # 调整预测张量的维度以匹配目标张量
        ref_tensor = torch.tensor(ref)

        # 计算交叉熵损失
        loss = F.cross_entropy(pred_tensor, ref_tensor, reduction='sum')
        
        total_loss += loss.item()
    
    return torch.exp(total_loss / total_num_tokens).item()

In [26]:
def test(model, data_path,  num_epochs = 1, input_dim = 5):
    
    
    
    all_line = 0
    now_line = 0
    label_list = []
    result_list = []
    with open(data_path,'r',encoding='utf-8') as f:
        lines = f.readlines()
        f.close()
    all_line = len(lines)
    for line in lines:
        #print(line)
        now_line += 1
        if not line:
            continue
        
        # 将句子处理为token id
        token_ids = sp.encode_as_ids(line)
        

        #将token id列表转换为张量
        token_ids_tensor = list_to_fixed_length_tensor(token_ids,max_sentence_length)
        for i in range(max_sentence_length - input_dim):
            data = token_ids_tensor[i:i+input_dim]
            labels = data.clone()

            label_list.append(token_ids[i:i+input_dim].copy())
            result = []
            # 训练模型,每一句话训练num_epochs次
            # num_epochs = 100
            for epoch in range(num_epochs):

                result = []
                result.append(data[0].item())
                mask = torch.tensor([1, 0, 0, 0, 0], dtype=torch.float32)
                for i in range(0, input_dim):
                    mask[i] = 1
                    updated_data = torch.mul(data,mask)
                    
                    x = updated_data
                    
                    predicted_values, predicted_probs = model(x)
                    
                
                    choose_index = choose_index_based_on_probability(predicted_probs)
                    choose_values = predicted_values[choose_index]
                    
                    if i == input_dim-1:
                        break
                    result.append(choose_values.item())
                    
                
                #result = torch.tensor(result, dtype=torch.float32, requires_grad=True)
                result_list.append(result.copy())
                # 反向传播和优化
                # optimizer.zero_grad()
                # loss.backward()
                # optimizer.step()

                #if (epoch+1) % 100 == 0:
                if now_line % 20 == 0:
                    print(f'Line [{now_line}/{all_line}]')
                #print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')
    return result_list, label_list

In [27]:
# 要加载的模型文件路径
# 模型、损失函数和优化器初始化
input_dim = 5

output_dim = vocab_size  # 输出vocab_size个预测值,最后选择top_k个较大的值
model = MultinomialClassifier(input_dim, output_dim)

model_file_path = "model/model_27_18.pth"

# 加载模型状态字典
model_state_dict = torch.load(model_file_path)

# 将状态字典加载到模型中
model.load_state_dict(model_state_dict)

<All keys matched successfully>

In [28]:
result_list, label_list = test(model=model, data_path = 'data/数据test.txt',num_epochs = 1, input_dim = 5)

Line [20/6164]
Line [20/6164]
Line [20/6164]
Line [20/6164]
Line [20/6164]
Line [20/6164]
Line [20/6164]
Line [20/6164]
Line [20/6164]
Line [20/6164]
Line [20/6164]
Line [20/6164]
Line [20/6164]
Line [20/6164]
Line [20/6164]
Line [40/6164]
Line [40/6164]
Line [40/6164]
Line [40/6164]
Line [40/6164]
Line [40/6164]
Line [40/6164]
Line [40/6164]
Line [40/6164]
Line [40/6164]
Line [40/6164]
Line [40/6164]
Line [40/6164]
Line [40/6164]
Line [40/6164]
Line [60/6164]
Line [60/6164]
Line [60/6164]
Line [60/6164]
Line [60/6164]
Line [60/6164]
Line [60/6164]
Line [60/6164]
Line [60/6164]
Line [60/6164]
Line [60/6164]
Line [60/6164]
Line [60/6164]
Line [60/6164]
Line [60/6164]
Line [80/6164]
Line [80/6164]
Line [80/6164]
Line [80/6164]
Line [80/6164]
Line [80/6164]
Line [80/6164]
Line [80/6164]
Line [80/6164]
Line [80/6164]
Line [80/6164]
Line [80/6164]
Line [80/6164]
Line [80/6164]
Line [80/6164]
Line [100/6164]
Line [100/6164]
Line [100/6164]
Line [100/6164]
Line [100/6164]
Line [100/6164]
Line

In [35]:
wer = word_error_rate(result_list,label_list)
blu = bleu(result_list,label_list)
rg = rouge(result_list,label_list)

In [51]:
accuracy(result_list,label_list)

0.24803724145802591

In [30]:
print(wer)

1.8841340627267464


In [36]:
print(blu)

0


In [37]:
print(rg)

{'rouge1': 0.13493427991478085, 'rouge2': 0.00019296203592115312, 'rougeL': 0.13493427991478085}
