### 1. 安装依赖库

In [1]:
# ! pip install --upgrade pip
# ! pip install jieba
# ! pip install pandas
# ! pip install numpy
# ! pip install tensorflow==2.1.0
#! pip install scikit-learn
# ! pip uninstall tensorflow
# ! pip install tensorflow-gpu==2.1.0

### 2. 导入依赖库

In [2]:
import jieba
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.sequence import pad_sequences
import matplotlib.pyplot as plt
from sklearn.metrics import classification_report, precision_recall_curve, auc, roc_curve, confusion_matrix
import seaborn as sns

### 3. 数据加载

In [3]:
# Read the three tab-separated splits: training, validation (dev) and test.
train_data, valid_data, test_data = [
    pd.read_csv(tsv_path, sep='\t')
    for tsv_path in ('data/train.tsv', 'data/dev.tsv', 'data/test.tsv')
]

# Split every frame into its raw texts (`text_a` column) and labels (`label` column).
x_train, y_train = train_data['text_a'].values, train_data['label'].values  # training set
x_valid, y_valid = valid_data['text_a'].values, valid_data['label'].values  # validation set
x_test, y_test = test_data['text_a'].values, test_data['label'].values  # test set

### 4. 构建词汇表

In [4]:
# Build the vocabulary from the training split only: tokenize every training
# text with jieba and keep each distinct, non-blank token.
vocab = set()
cut_docs = train_data.text_a.apply(lambda x: jieba.cut(x)).values
for doc in cut_docs:  # one tokenized document at a time
    for word in doc:  # every token of that document
        token = word.strip()
        if token:  # ignore whitespace-only tokens
            vocab.add(token)

# Write the vocabulary to data/vocab.txt, one token per line.
# The tokens are written in sorted order on purpose: a token's id is derived
# from its line position when the file is read back, and plain set iteration
# order changes between interpreter runs (string hash randomization), which
# would silently invalidate previously saved model weights.
# NOTE(review): the file is written with the platform default encoding, so
# whatever reads it back must use the same default.
with open('data/vocab.txt', 'w') as file:
    file.writelines(f'{word}\n' for word in sorted(vocab))

Building prefix dict from the default dictionary ...
Loading model from cache C:\Users\SMARTM~1\AppData\Local\Temp\jieba.cache
Loading model cost 0.653 seconds.
Prefix dict has been built successfully.


### 5. 定义配置参数

In [5]:
class Config:
    """Settings shared by the preprocessing step and the model."""

    vocab_file = 'data/vocab.txt'  # path of the vocabulary file
    max_seq_len = 200  # maximum number of tokens kept per text
    embedding_dim = 300  # dimension of the word embeddings


config = Config()

### 6. 定义预处理类

In [6]:
class Preprocessor():
    """Turn raw texts into fixed-length sequences of vocabulary ids.

    Ids 0 and 1 are reserved for ``[PAD]`` (padding) and ``[UNK]`` (tokens
    missing from the vocabulary). Every distinct, non-blank line of the
    vocabulary file receives the next free id, so ids are always contiguous
    and ``len(token2idx)`` is a valid embedding input size.
    """

    def __init__(self, config):
        """Load the token -> id mapping from ``config.vocab_file``.

        Args:
            config: object exposing ``vocab_file`` (read here) and
                ``max_seq_len`` (read by ``transform``).
        """
        self.config = config
        token2idx = {"[PAD]": 0, "[UNK]": 1}  # {token: id}
        # NOTE(review): the file is opened with the platform default encoding;
        # it must match the encoding used when the vocabulary was written.
        with open(config.vocab_file, 'r') as reader:
            for line in reader:
                token = line.strip()
                # Skip blank lines and tokens that already have an id
                # (including the two reserved ones). Assigning ids by line
                # number instead would leave gaps, letting the largest id
                # reach or exceed len(token2idx).
                if token and token not in token2idx:
                    token2idx[token] = len(token2idx)
        self.token2idx = token2idx

    def transform(self, text_list):
        """Tokenize, map to ids and pad a batch of texts.

        Args:
            text_list: iterable of raw text strings.

        Returns:
            The result of ``pad_sequences``: one row of ``max_seq_len`` ids per
            text, padded with 0 at the end.
        """
        unk_id = self.token2idx['[UNK]']
        # Tokens absent from the vocabulary (whitespace-only tokens strip to ''
        # and are never stored in it) fall back to the [UNK] id.
        idx_list = [
            [self.token2idx.get(word.strip(), unk_id) for word in jieba.cut(text)]
            for text in text_list
        ]
        # padding='post' appends zeros. Truncation is left at the Keras default
        # (truncating='pre'), which drops tokens from the START of texts longer
        # than max_seq_len.
        idx_padding = pad_sequences(idx_list, self.config.max_seq_len, padding='post')
        return idx_padding

### 7. 定义模型类

In [7]:
class TextCNN(object):
    """TextCNN binary sentiment classifier with a selectable pooling strategy.

    Bundles the text ``Preprocessor`` with a Keras model made of parallel
    Conv1D branches (one per kernel size) whose pooled outputs are
    concatenated and fed to a dense softmax head with two classes.
    """

    # Pooling strategies accepted by ``build_model``.
    _POOLING_TYPES = ("global_max_pooling", "max_pooling", "k-max-pooling")

    def __init__(self, config,kernel_sizes=[2,3,5],pooling_type="global_max_pooling",k=3):
        """Store the hyper-parameters and load the vocabulary.

        Args:
            config: object exposing ``vocab_file``, ``max_seq_len`` and
                ``embedding_dim``.
            kernel_sizes: Conv1D kernel size of each parallel branch.
            pooling_type: one of "global_max_pooling", "max_pooling" or
                "k-max-pooling"; validated when the model is built.
            k: number of top activations kept per filter for k-max-pooling.
        """
        self.config = config
        self.preprocessor = Preprocessor(config)
        self.class_name = {0: '负面', 1: '正面'}
        # Copy so the shared default list is never aliased by an instance.
        self.kernel_sizes = list(kernel_sizes)
        self.pooling_type = pooling_type
        self.k = k

    def _pool(self, conv_output):
        """Apply the configured pooling strategy to one Conv1D branch output."""
        if self.pooling_type == "global_max_pooling":
            return tf.keras.layers.GlobalMaxPooling1D()(conv_output)
        if self.pooling_type == "max_pooling":
            # Regular max pooling keeps a time axis, so flatten for the dense head.
            pooled = tf.keras.layers.MaxPooling1D(pool_size=2)(conv_output)
            return tf.keras.layers.Flatten()(pooled)
        # k-max-pooling: move the time axis last, keep the k largest
        # activations of every filter, then flatten the 3-D result.
        k = self.k
        top_k = tf.keras.layers.Lambda(
            lambda x: tf.nn.top_k(tf.transpose(x, [0, 2, 1]), k=k, sorted=True)[0]
        )(conv_output)
        return tf.keras.layers.Flatten()(top_k)

    def build_model(self):
        """Build and compile the network.

        Sets ``self.model`` (class probabilities) and ``self.feature_model``
        (class probabilities plus the concatenated CNN features; it shares
        every layer, and therefore every weight, with ``self.model``).

        Raises:
            ValueError: if ``self.pooling_type`` is not a supported strategy.
        """
        # Validate before creating any layer.
        if self.pooling_type not in self._POOLING_TYPES:
            raise ValueError(
                f"pooling type error! got {self.pooling_type!r}, "
                f"expected one of {self._POOLING_TYPES}"
            )

        idx_input = tf.keras.layers.Input((self.config.max_seq_len,))
        # Embedding: (batch_size, input_length) ids ->
        # (batch_size, input_length, embedding_dim) vectors. The vocabulary
        # size is the number of entries in the preprocessor's token2idx.
        # mask_zero=True flags id 0 ([PAD]) as padding.
        # NOTE(review): confirm the downstream Conv1D layers actually honor the mask.
        input_embedding = tf.keras.layers.Embedding(
            len(self.preprocessor.token2idx),
            self.config.embedding_dim,
            input_length=self.config.max_seq_len,
            mask_zero=True)(idx_input)

        # One convolution + pooling branch per kernel size.
        convs = []
        for kernel_size in self.kernel_sizes:
            c = tf.keras.layers.Conv1D(128, kernel_size, activation='relu')(input_embedding)
            convs.append(self._pool(c))

        # A single branch needs no merge, so use it directly instead of Concatenate.
        fea_cnn = convs[0] if len(convs) == 1 else tf.keras.layers.Concatenate()(convs)

        fea_cnn_dropout = tf.keras.layers.Dropout(rate=0.4)(fea_cnn)

        fea_dense = tf.keras.layers.Dense(128, activation='relu')(fea_cnn_dropout)
        output = tf.keras.layers.Dense(2, activation='softmax')(fea_dense)

        model = tf.keras.Model(inputs=idx_input, outputs=[output])
        model.compile(loss='sparse_categorical_crossentropy',
              optimizer='adam',
              metrics=['accuracy'])

        model.summary()
        self.model = model
        # Inference-only view used by test_predict.
        self.feature_model = tf.keras.Model(inputs=idx_input, outputs=[output, fea_cnn])

    def fit(self, x_train, y_train, x_valid=None, y_valid=None, epochs=5, batch_size=128, callbacks=None, id=None,**kwargs):
        """Build a fresh model and train it.

        Args:
            x_train: raw training texts.
            y_train: training labels.
            x_valid: optional raw validation texts.
            y_valid: optional validation labels; validation is used only when
                both ``x_valid`` and ``y_valid`` are given.
            epochs: number of training epochs.
            batch_size: training batch size.
            callbacks: optional list of Keras callbacks.
            id: suffix for the history figure file name; only referenced by the
                commented-out plotting code below.
            **kwargs: forwarded unchanged to ``Model.fit``.

        Returns:
            The ``History`` object returned by ``Model.fit``.
        """
        self.build_model()
        x_train = self.preprocessor.transform(x_train)
        valid_data = None  # (inputs, labels) pair used for validation, if any
        if x_valid is not None and y_valid is not None:
            x_valid = self.preprocessor.transform(x_valid)
            valid_data = (x_valid, y_valid)

        if callbacks is None:
            callbacks = []
        history = self.model.fit(
            x=x_train,
            y=y_train,
            validation_data=valid_data,
            batch_size=batch_size,
            epochs=epochs,
            callbacks=callbacks,
            **kwargs
        )

        # # Plot training and validation accuracy curves
        # plt.figure(figsize=(12, 5))
        # plt.subplot(1, 2, 1)
        # plt.plot(history.history['accuracy'], label='Train Acc')
        # if valid_data is not None:
        #     plt.plot(history.history['val_accuracy'], label='Validation Acc')
        # plt.title('Model Accuracy')
        # plt.ylabel('Accuracy')
        # plt.xlabel('Epoch')
        # plt.legend()

        # # Plot training and validation loss curves
        # plt.subplot(1, 2, 2)
        # plt.plot(history.history['loss'], label='Train Loss')
        # if valid_data is not None:
        #     plt.plot(history.history['val_loss'], label='Validation Loss')
        # plt.title('Model Loss')
        # plt.ylabel('Loss')
        # plt.xlabel('Epoch')
        # plt.legend()
        # plt.savefig(f'train_history{id}.png')
        return history

    def evaluate(self, x_test, y_test):
        """Print and return a classification report for a labelled set.

        Args:
            x_test: raw texts.
            y_test: true label ids.

        Returns:
            The text report produced by ``classification_report``.
        """
        x_test = self.preprocessor.transform(x_test)
        y_pred_probs = self.model.predict(x_test)
        y_pred = np.argmax(y_pred_probs, axis=-1)
        # Pass the label ids explicitly so the report keeps both classes (and
        # stays aligned with target_names) even when one of them is absent
        # from y_test and y_pred.
        label_ids = sorted(self.class_name)
        result = classification_report(
            y_test,
            y_pred,
            labels=label_ids,
            target_names=[self.class_name[label_id] for label_id in label_ids],
        )
        print(result)

        # # Plot the confusion matrix
        # cm = confusion_matrix(y_test, y_pred)
        # plt.figure(figsize=(8, 6))
        # sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=['Negative', 'Positive'], yticklabels=['Negative', 'Positive'])
        # plt.xlabel('Predicted Label')
        # plt.ylabel('True Label')
        # plt.title('Confusion Matrix')
        # plt.savefig('confusion_matrix.png')

        # # Compute precision and recall
        # precision, recall, _ = precision_recall_curve(y_test, y_pred_probs[:, 1])
        # # Area under the PR curve
        # pr_auc = auc(recall, precision)

        # # Plot the PR curve
        # plt.figure()
        # plt.plot(recall, precision, label=f'PR Curve (area = {pr_auc:.2f})')
        # plt.xlabel('Recall')
        # plt.ylabel('Precision')
        # plt.title('Precision-Recall Curve')
        # plt.legend(loc="upper right")
        # plt.savefig("P-R.png")

        # # Compute the ROC curve and AUC
        # fpr, tpr, _ = roc_curve(y_test, y_pred_probs[:, 1])
        # roc_auc = auc(fpr, tpr)

        # # Plot the ROC curve
        # plt.figure()
        # plt.plot(fpr, tpr, label=f'ROC Curve (area = {roc_auc:.2f})')
        # plt.plot([0, 1], [0, 1], 'r--')  # chance-level reference line
        # plt.xlabel('False Positive Rate')
        # plt.ylabel('True Positive Rate')
        # plt.title('Receiver Operating Characteristic (ROC) Curve')
        # plt.legend(loc="lower right")
        # plt.savefig("ROC.png")
        return result

    def single_predict(self, text):
        """Classify one text.

        Args:
            text: raw text string.

        Returns:
            Tuple ``(label_name, probability)`` for the most probable class.
        """
        input_idx = self.preprocessor.transform([text])
        predict_prob = self.model.predict(input_idx)[0]  # probabilities of the single sample
        predict_label_id = np.argmax(predict_prob)

        predict_label_name = self.class_name[predict_label_id]
        predict_label_prob = predict_prob[predict_label_id]

        return predict_label_name, predict_label_prob

    def test_predict(self,text):
        """Return the concatenated CNN features computed for one text.

        Uses ``self.feature_model`` (created by ``build_model``), which outputs
        the class probabilities together with the concatenated branch features.

        Args:
            text: raw text string.

        Returns:
            The feature array for the one-sample batch; its shape is printed.
        """
        input_idx = self.preprocessor.transform([text])
        _, feacnn = self.feature_model.predict(input_idx)
        print(f"feacnn.shape:{feacnn.shape}")
        return feacnn

    def load_model(self, ckpt_file):
        """Rebuild the network and load its weights from ``ckpt_file``."""
        self.build_model()
        self.model.load_weights(ckpt_file)

### 8. 分析k-max-pooling

In [8]:

# Early-stopping callback: monitors `val_loss` with a patience of `patience` epochs.
patience = 6
early_stop = tf.keras.callbacks.EarlyStopping(
    patience=patience,
    monitor='val_loss',
)


In [None]:
# Train and evaluate one TextCNN per value of k to compare k-max-pooling settings.
# Further strategies (e.g. "max_pooling") can be added to `pooling_types`.
pooling_types = ["k-max-pooling"]
model = []  # every TextCNN wrapper created below, in training order
for pooling_type in pooling_types:
    print("pooling type:", pooling_type)
    config = Config()
    for k in range(2, 6):  # k = 2, 3, 4, 5
        textcnn = TextCNN(config, pooling_type=pooling_type, k=k)
        model.append(textcnn)
        # Train with early stopping on the validation split.
        textcnn.fit(
            x_train,
            y_train,
            x_valid=x_valid,
            y_valid=y_valid,
            epochs=50,
            callbacks=[early_stop],
        )
        # Report metrics on the held-out test split.
        textcnn.evaluate(x_test, y_test)
# After the loops `textcnn` refers to the last model trained (largest k).

### 9. 错误样本提取与分析

In [37]:
# Walk over the test set and collect every misclassified sample.
# `textcnn` is whichever model the training cell left bound to that name.

# Predict the whole test set in one batched call instead of one call per sample.
test_probs = textcnn.model.predict(textcnn.preprocessor.transform(x_test))
test_pred_ids = np.argmax(test_probs, axis=-1)

count = 0
# One flat record (plain scalar values) per misclassified sample, so that each
# record becomes exactly one CSV row with a filled-in probability column.
data = []
for text, true_id, pred_id, sample_probs in zip(x_test, y_test, test_pred_ids, test_probs):
    label = textcnn.class_name[true_id]
    predict_label = textcnn.class_name[pred_id]
    if label != predict_label:
        predict_prob = sample_probs[pred_id]  # probability of the predicted class
        count += 1
        print(f'文本: {text}')
        data.append({
            'text': text,
            'true_label': label,
            'predict_label': predict_label,
            'predict_prob': float(predict_prob),
        })
        print(f'真实标签: {label}, 预测标签: {predict_label}, 预测概率: {predict_prob}')
        print('-----------------')
print(f'错误样本数: {count}')

# Explicit columns keep the CSV header even when nothing was misclassified.
error_df = pd.DataFrame(data, columns=['text', 'true_label', 'predict_label', 'predict_prob'])
error_df.to_csv('error_samples.csv', index=False)

文本: 这个宾馆比较陈旧了，特价的房间也很一般。总体来说一般
真实标签: 正面, 预测标签: 负面, 预测概率: 0.6363525390625
-----------------
文本: 书一到，即被朋友借走，我是早已看过了的，买它，是因为张爱玲。看过张子静的《我的姊姊张爱玲》，可以看出，张对自己的弟弟也是不甚热情，这是她的性格。但是有一次张子静去找她，那天张爱玲却是很开心的样子。后来张子静推算，彼时张爱玲正与胡兰成相恋。很多人不耻于胡兰成的用情不专，我当然也很反感，但我也想说，胡兰成是给过张爱的欢喜的。张是怎样聪明的女子，若不是爱，她也不会那么痴。属于他们的爱情，让他们自己品尝与承担。胡的文字，确实是不错的，有一种儒雅在里面。不知道是不是那份淡淡的气氛，曾经让张心动——“岁月静好，现世安稳”。
真实标签: 负面, 预测标签: 正面, 预测概率: 0.6333171725273132
-----------------
文本: 没有许多网友评价热度高的问题，第一次要手动安装winxp，主板的blis-硬盘重新设置
真实标签: 正面, 预测标签: 负面, 预测概率: 0.7051104307174683
-----------------
文本: 请问：有些字打错了，我怎么样才可以回去编辑一下啊？
真实标签: 正面, 预测标签: 负面, 预测概率: 0.7619051933288574
-----------------
文本: 风扇确实够响的，尤其是到晚上周围安静下来。风扇频频开启，发热量有些惊人
真实标签: 负面, 预测标签: 正面, 预测概率: 0.6541339755058289
-----------------
文本: 3999的时候抢购的 运气真好 这个价格还有什么号说的 品牌和价格都很不错
真实标签: 负面, 预测标签: 正面, 预测概率: 0.9968169331550598
-----------------
文本: 硬盘到手就发现一个坏块，因为是完美屏，没回京东换新，花了两天在本地换新硬盘，发票都不需要；电池衔接很松，可有1mm间隙；出厂时A、B面贴的保护膜太敷衍，太多气泡，虽然反正要撕掉，但说明厂家态度不严谨。
真实标签: 负面, 预测标签: 正面, 预测概率: 0.5109139084815979
-----------------
文本: