In [1]:
import os
from multiprocessing import cpu_count  #统计本机cpu数量
import numpy as np
import paddle
import paddle.fluid as fluid

# 数据预处理

In [2]:
#　定义一组公共变量
data_root = "data/"  # directory that holds the dataset and every generated file
data_file = "news_classify_data.txt"  # raw dataset
train_file = "train.txt"  # training-set file
test_file = "test.txt"  # test-set file
dict_file = "dict_txt.txt"  # dictionary file (character -> code mapping)

# Full paths: data_root already ends with "/", so plain concatenation is enough.
data_file_path, train_file_path, test_file_path, dict_file_path = (
    data_root + file_name
    for file_name in (data_file, train_file, test_file, dict_file)
)


# Collect every character used in the titles, give each one a code and save the mapping.
def create_dict():
    """Build the character dictionary from the raw dataset and write it to disk.

    Every line of ``data_file_path`` is split on ``"_!_"`` and the last field
    is treated as the title. Each distinct character of the titles receives an
    integer code starting at 1; the reserved key ``"<unk>"`` (used for
    characters that are not in the dictionary) receives the next free code.
    The mapping is written to ``dict_file_path`` as the ``str()`` of a dict.

    Codes are assigned in sorted character order, so the same dataset always
    produces the same dictionary. Iterating the set directly would number the
    characters differently on every interpreter run, which would make a
    regenerated dictionary incompatible with a previously trained model.
    """
    dict_set = set()  # distinct characters seen in the titles
    with open(data_file_path, "r", encoding="utf-8") as f:
        for line in f:  # stream the file instead of loading every line at once
            line = line.replace("\n", "")  # drop the line break
            # NOTE(review): the title is taken as the LAST field here, while
            # create_train_test_file reads field index 3; the two only agree
            # when every line has exactly four fields - confirm the data format.
            title = line.split("_!_")[-1]
            dict_set.update(title)  # adds each character of the title

    # Number the characters from 1 in a reproducible (sorted) order.
    dict_txt = {word: code
                for code, word in enumerate(sorted(dict_set), start=1)}

    # Code for unknown characters: one past the last assigned code.
    dict_txt["<unk>"] = len(dict_txt) + 1

    # Persist the mapping as a dict literal.
    with open(dict_file_path, "w", encoding="utf-8") as f:
        f.write(str(dict_txt))

    print("生成字典结束.")


# Encode one title: swap every character for its code and attach the label.
def line_encoding(title, dict_txt, label):
    """Return one encoded sample line of the form ``"c1,c2,...\\tlabel\\n"``.

    :param title: text to encode, processed character by character
    :param dict_txt: mapping from character to code; its ``"<unk>"`` entry is
        used for any character that is not a key of the mapping
    :param label: label string appended after a tab
    :return: comma-separated codes, a tab, the label and a line break
    """
    codes = (
        # "<unk>" is only looked up when an unknown character is actually met
        str(dict_txt[ch] if ch in dict_txt else dict_txt["<unk>"])
        for ch in title
    )
    return ",".join(codes) + "\t" + label + "\n"


# Read the raw samples, encode each title and split them into test/training files.
def create_train_test_file():
    """Encode every sample and distribute the results over two files.

    The dictionary is loaded from the first line of ``dict_file_path``. Each
    line of ``data_file_path`` is split on ``"_!_"``; field 3 is the title and
    field 1 is the label. The encoded line produced by ``line_encoding`` goes
    to ``test_file_path`` when the 0-based line index is a multiple of 10 and
    to ``train_file_path`` otherwise.

    Both output files are opened once in ``"w"`` mode, which empties them and
    avoids re-opening a file for every single sample.

    Every input line is expected to have at least four fields; a shorter line
    raises ``IndexError``.
    """
    import ast  # local import: only needed to parse the dictionary file

    # The dictionary file holds a dict literal; literal_eval parses it without
    # executing arbitrary code the way eval() would.
    with open(dict_file_path, "r", encoding="utf-8") as f_dict:
        dict_txt = ast.literal_eval(f_dict.readline())

    with open(data_file_path, "r", encoding="utf-8") as f_data, \
            open(test_file_path, "w", encoding="utf-8") as f_test, \
            open(train_file_path, "w", encoding="utf-8") as f_train:
        for i, line in enumerate(f_data):
            tmp_list = line.replace("\n", "").split("_!_")  # split the fields
            title = tmp_list[3]  # title
            label = tmp_list[1]  # category
            new_line = line_encoding(title, dict_txt, label)  # encode the title

            # Every tenth sample (index 0, 10, 20, ...) goes to the test set.
            if i % 10 == 0:
                f_test.write(new_line)
            else:
                f_train.write(new_line)
    print("生成训练集/测试集结束.")


create_dict()  # build the character dictionary from the raw samples
create_train_test_file()  # encode the titles and write the training/test files

生成字典结束.
生成训练集/测试集结束.


# 模型定义与训练

In [None]:
# Read the dictionary file and return the size needed for the embedding table.
def get_dict_len(dict_path):
    """Return a dictionary size that covers every code stored in the file.

    The first line of ``dict_path`` must hold a dict literal mapping
    characters to integer codes. The result is the larger of the number of
    entries and ``highest code + 1``.

    The plain entry count is not enough when codes start at 1: with N
    characters numbered 1..N and ``"<unk>"`` numbered N+1 the file has N+1
    entries, so an embedding table with N+1 rows (valid ids 0..N) would not
    contain the ``"<unk>"`` id. For 0-based codes the result equals the entry
    count.

    :param dict_path: path of the dictionary file
    :return: dictionary size as an int
    """
    import ast  # local import: only needed to parse the dictionary file

    with open(dict_path, "r", encoding="utf-8") as f:
        # literal_eval only accepts Python literals, unlike eval()
        dict_txt = ast.literal_eval(f.readline())

    highest_code = max((int(code) for code in dict_txt.values()), default=-1)
    return max(len(dict_txt), highest_code + 1)


def data_mapper(sample):
    """Convert one ``(codes, label)`` pair of strings into numeric form.

    :param sample: two-item sequence; the first item is a comma-separated
        string of codes, the second is the label as a string
    :return: tuple ``(list of int codes, int label)``
    """
    encoded, category = sample
    # Values read from the file are strings, so convert each one to int.
    return list(map(int, encoded.split(","))), int(category)


def train_reader(train_file_path):
    """Create the reader for the training set.

    The inner generator loads all lines of ``train_file_path``, shuffles them
    with ``np.random.shuffle`` and yields the two tab-separated parts of each
    line as strings. The generator is wrapped with
    ``paddle.reader.xmap_readers`` together with ``data_mapper``,
    ``cpu_count()`` and ``1024`` (presumably the worker count and the buffer
    size - confirm against the Paddle documentation).

    :param train_file_path: path of the encoded training file
    :return: the reader returned by ``paddle.reader.xmap_readers``
    """

    def reader():
        with open(train_file_path, "r") as sample_file:
            samples = sample_file.readlines()
            np.random.shuffle(samples)  # randomise the sample order in place
            for sample in samples:
                # each line must contain exactly one tab
                encoded, category = sample.split("\t")
                yield encoded, category

    worker_count = cpu_count()
    return paddle.reader.xmap_readers(data_mapper, reader, worker_count, 1024)


def test_reader(test_file_path):
    """Create the reader for the test set.

    The inner generator loads all lines of ``test_file_path`` and yields the
    two tab-separated parts of each line as strings, in file order (no
    shuffling). The generator is wrapped with ``paddle.reader.xmap_readers``
    together with ``data_mapper``, ``cpu_count()`` and ``1024`` (presumably
    the worker count and the buffer size - confirm against the Paddle
    documentation).

    :param test_file_path: path of the encoded test file
    :return: the reader returned by ``paddle.reader.xmap_readers``
    """

    def reader():
        with open(test_file_path, "r") as sample_file:
            samples = sample_file.readlines()

            for sample in samples:
                # each line must contain exactly one tab
                encoded, category = sample.split("\t")
                yield encoded, category

    worker_count = cpu_count()
    return paddle.reader.xmap_readers(data_mapper, reader, worker_count, 1024)


# Network definition
def Text_CNN(data, dict_dim, class_dim=10, emb_dim=128,
             hid_dim=128, hid_dim2=128):
    """
    Build the TextCNN model.
    :param data: input variable fed to the embedding layer
    :param dict_dim: dictionary size (first dimension of the embedding table)
    :param class_dim: number of classes (size of the output layer)
    :param emb_dim: embedding width (second dimension of the embedding table)
    :param hid_dim: number of filters of the first convolution branch
    :param hid_dim2: number of filters of the second convolution branch
    :return: output of the final softmax fully connected layer
    """
    # Embedding layer
    embedded = fluid.layers.embedding(input=data, size=[dict_dim, emb_dim])

    # Two parallel convolution/pooling branches on the same embedding output;
    # they differ only in filter count and filter size (3 and 4).
    branch_specs = ((hid_dim, 3), (hid_dim2, 4))
    branches = [
        fluid.nets.sequence_conv_pool(input=embedded,
                                      num_filters=filter_count,
                                      filter_size=filter_width,
                                      act="tanh",
                                      pool_type="sqrt")
        for filter_count, filter_width in branch_specs
    ]

    # Fully connected layer over both branches with softmax activation
    return fluid.layers.fc(input=branches,
                           size=class_dim,
                           act="softmax")


# Placeholder tensors
words = fluid.layers.data(name="words",
                          shape=[1],
                          dtype="int64",
                          lod_level=1)  # LoD tensor, used for variable-length data
label = fluid.layers.data(name="label",
                          shape=[1],
                          dtype="int64")
dict_dim = get_dict_len(dict_file_path)  # dictionary size for the embedding layer
# Build the model
model = Text_CNN(words, dict_dim)
# Loss
cost = fluid.layers.cross_entropy(input=model, label=label)
avg_cost = fluid.layers.mean(cost)
# Accuracy
accuracy = fluid.layers.accuracy(input=model, label=label)

# Evaluation program: cloned BEFORE the optimizer is attached, so it holds the
# forward pass, loss and accuracy but no parameter-update ops. Evaluating with
# the main program would run the Adam update on every test batch, i.e. train
# on the test set.
test_program = fluid.default_main_program().clone(for_test=True)

# Optimizer
optimizer = fluid.optimizer.Adam(learning_rate=0.0001)
optimizer.minimize(avg_cost)

# Executor
# NOTE(review): CUDAPlace(0) presumably needs a GPU build of Paddle and a
# visible GPU - confirm, or switch to fluid.CPUPlace() on CPU-only machines.
place = fluid.CUDAPlace(0)
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())

# Readers
## training-set reader
tr_reader = train_reader(train_file_path)
batch_train_reader = paddle.batch(tr_reader, batch_size=128)
## test-set reader
ts_reader = test_reader(test_file_path)
batch_test_reader = paddle.batch(ts_reader, batch_size=128)

# Feeder
feeder = fluid.DataFeeder(place=place, feed_list=[words, label])

# Training
for epoch in range(80):  # outer loop: epochs
    for batch_id, data in enumerate(batch_train_reader()):  # inner loop: batches
        train_cost, train_acc = exe.run(fluid.default_main_program(),  # program
                                        feed=feeder.feed(data),  # fed values
                                        fetch_list=[avg_cost, accuracy])  # fetched values
        if batch_id % 100 == 0:
            print("epoch:%d, batch:%d, cost:%f, acc:%f" %
                  (epoch, batch_id, train_cost[0], train_acc[0]))

    # Evaluate on the test set after every epoch
    test_costs_list = []  # loss of each test batch
    test_accs_list = []  # accuracy of each test batch

    for batch_id, data in enumerate(batch_test_reader()):
        test_cost, test_acc = exe.run(test_program,  # no optimizer ops
                                      feed=feeder.feed(data),
                                      fetch_list=[avg_cost, accuracy])
        test_costs_list.append(test_cost[0])
        test_accs_list.append(test_acc[0])

    if test_costs_list:
        # Mean loss/accuracy over all test batches
        avg_test_cost = sum(test_costs_list) / len(test_costs_list)
        avg_test_acc = sum(test_accs_list) / len(test_accs_list)
        print("epoch:%d, test_cost:%f, test_acc:%f" %
              (epoch, avg_test_cost, avg_test_acc))
    else:
        # An empty test set would otherwise end in a ZeroDivisionError
        print("epoch:%d, test set is empty, evaluation skipped" % epoch)

# Training finished: save the inference model
model_save_dir = "model/"
os.makedirs(model_save_dir, exist_ok=True)
fluid.io.save_inference_model(model_save_dir,  # target directory
                              feeded_var_names=[words.name],  # inputs needed at inference time
                              target_vars=[model],  # prediction output
                              executor=exe)  # executor
print("模型保存成功.")

# 推理预测

In [None]:
# Directory the inference model is loaded from (same value the training cell saves to).
model_save_dir = "model/"


def get_data(sentence):
    """Encode a sentence with the codes stored in the dictionary file.

    The dictionary is read from the first line of ``dict_file_path`` on every
    call. Each character of ``sentence`` is replaced by its code; a character
    that is not in the dictionary gets the code of ``"<unk>"``.

    :param sentence: text to encode, processed character by character
    :return: list of int codes, one per character
    """
    with open(dict_file_path, "r", encoding="utf-8") as dict_fh:
        # NOTE: eval() executes whatever the file contains - only use it with
        # a dictionary file produced by this notebook.
        vocab = eval(dict_fh.readlines()[0])

    return [int(vocab[ch if ch in vocab else "<unk>"]) for ch in sentence]


# Executor
place = fluid.CPUPlace()
exe = fluid.Executor(place)
# NOTE(review): load_inference_model below presumably restores the saved
# parameters, which would make this startup run redundant - confirm.
exe.run(fluid.default_startup_program())

infer_program, feed_names, target_var = fluid.io.load_inference_model(model_save_dir, exe)

# Sentences to classify
sentences = [
    "在获得诺贝尔文学奖7年之后，莫言15日晚间在山西汾阳贾家庄如是说",
    "综合'今日美国'、《世界日报》等当地媒体报道，芝加哥河滨警察局表示",
    "中国队2022年冬奥会表现优秀",
    "中国人民银行今日发布通知，降低准备金率，预计释放4000亿流动性",
    "10月20日,第六届世界互联网大会正式开幕",
    "同一户型，为什么高层比低层要贵那么多？",
    "揭秘A股周涨5%资金动向：追捧2类股，抛售600亿香饽饽",
    "宋慧乔陷入感染危机，前夫宋仲基不戴口罩露面，身处国外神态轻松",
    "此盆栽花很好养，花美似牡丹，三季开花，南北都能养，很值得栽培",  # belongs to none of the categories
]

# Encode every sentence; one list of codes per sentence
texts = [get_data(sentence) for sentence in sentences]

base_shape = [[len(c) for c in texts]]  # length of each encoded sentence
tensor_words = fluid.create_lod_tensor(texts, base_shape, place)
result = exe.run(infer_program,
                 feed={feed_names[0]: tensor_words},
                 fetch_list=target_var)
names = ["文化", "娱乐", "体育", "财经", "房产", "汽车", "教育", "科技", "国际", "证券"]
for r in result[0]:
    idx = np.argmax(r)  # index of the highest score
    print("预测结果:", names[idx], " 概率:", r[idx])