In [1]:
import pandas as pd
import codecs, gc
import numpy as np
import os
os.environ['TF_KERAS'] = '1'
from sklearn.model_selection import KFold
from keras_bert import load_trained_model_from_checkpoint, Tokenizer
from tensorflow.keras.metrics import top_k_categorical_accuracy
from tensorflow.keras.layers import *
from tensorflow.keras.callbacks import *
from tensorflow.keras.models import Model
import tensorflow.keras.backend as K
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import to_categorical

In [2]:
# A TextCNN head placed after BERT needs NonMasking (presumably because the
# convolution layers cannot take the mask that the BERT output carries).
class NonMasking(Layer):
    """Identity layer that accepts a masked input and stops mask propagation.

    The input tensor is returned unchanged by `call`; `compute_mask` returns
    None so that layers after this one receive no mask.
    """

    def __init__(self, **kwargs):
        super(NonMasking, self).__init__(**kwargs)
        # Set after the base initializer so the base class cannot reset it.
        self.supports_masking = True

    def build(self, input_shape):
        # No weights to create; defer to the base class so the layer is
        # properly marked as built.
        super(NonMasking, self).build(input_shape)

    def compute_mask(self, input, input_mask=None):
        # Do not pass the mask to the next layers.
        return None

    def call(self, x, mask=None):
        # Pure pass-through: the data itself is untouched.
        return x

    def compute_output_shape(self, input_shape):
        # Output shape equals input shape (identity layer).
        return input_shape

    def get_output_shape_for(self, input_shape):
        # Legacy hook name, kept so existing callers keep working.
        return self.compute_output_shape(input_shape)


In [4]:
def get_model(config_path, checkpoint_path, num_classes=119, learning_rate=1e-5):
    """Build and compile a frozen-BERT + TextCNN classifier.

    Args:
        config_path: Path to the BERT config JSON file.
        checkpoint_path: Path to the pretrained BERT checkpoint.
        num_classes: Size of the softmax output layer (default 119, the
            value that was previously hard-coded).
        learning_rate: Learning rate passed to Adam (default 1e-5).

    Returns:
        The compiled Keras model taking [token_ids, segment_ids] as input.
    """
    bert_model = load_trained_model_from_checkpoint(
        config_path, checkpoint_path)
    # Freeze every BERT layer; only the convolutional head is trained.
    for layer in bert_model.layers:
        layer.trainable = False

    # Two inputs of length 512: the first and second input of the BERT model.
    T1 = Input(shape=(512,))
    T2 = Input(shape=(512,))
    T = bert_model([T1, T2])
    # Drop the mask before the convolution branches.
    T = NonMasking()(T)

    # TextCNN head: parallel convolutions with different kernel widths, each
    # reduced by global max pooling, then concatenated.
    convs = []
    for kernel_size in [3, 4, 5]:
        c = Conv1D(32, kernel_size, activation='relu', padding='same')(T)
        c = GlobalMaxPooling1D()(c)
        convs.append(c)
    x = Concatenate()(convs)
    x = Dropout(0.2)(x)
    output = Dense(num_classes, activation='softmax')(x)

    model = Model([T1, T2], output)
    model.compile(
        loss='categorical_crossentropy',
        optimizer=Adam(learning_rate),
        metrics=['accuracy']
    )
    # Print the architecture once (it was previously printed twice).
    model.summary()
    return model


In [5]:
# Directory holding the pretrained Chinese BERT files. Set the BERT_DIR
# environment variable to point elsewhere; the default is the original
# local location.
bert_dir = os.environ.get('BERT_DIR', r'E:\chinese_L-12_H-768_A-12\new')
config_path = os.path.join(bert_dir, 'bert_config.json')
checkpoint_path = os.path.join(bert_dir, 'bert_model.ckpt')
dict_path = os.path.join(bert_dir, 'vocab.txt')

# Build and compile the classifier from the pretrained checkpoint.
model_clf = get_model(config_path, checkpoint_path)

Model: "functional_5"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 512)]        0                                            
__________________________________________________________________________________________________
input_2 (InputLayer)            [(None, 512)]        0                                            
__________________________________________________________________________________________________
functional_3 (Functional)       (None, 512, 768)     101677056   input_1[0][0]                    
                                                                 input_2[0][0]                    
__________________________________________________________________________________________________
non_masking (NonMasking)        (None, 512, 768)     0           functional_3[0][0]    

In [6]:
# Build the token -> id vocabulary: each stripped line of the vocab file gets
# the current size of the dictionary as its id.
token_dict = {}
with codecs.open(dict_path, 'r', 'utf8') as vocab_file:
    for vocab_line in vocab_file:
        token_dict[vocab_line.strip()] = len(token_dict)

In [14]:
# Load the training and dev splits from UTF-8 CSV files in the working directory.
data_train, data_dev = (
    pd.read_csv(csv_name, encoding='utf-8')
    for csv_name in ('train.csv', 'dev.csv')
)

In [15]:
# One shared tokenizer: constructing a new Tokenizer for every row (twice per
# row) was redundant work.
tokenizer = Tokenizer(token_dict)


def token_inde(text):
    """Return the token indices produced by encoding `text` with max_len=512."""
    indices, _ = tokenizer.encode(text, max_len=512)
    return indices


def token_segm(text):
    """Return the segment ids produced by encoding `text` with max_len=512."""
    _, segments = tokenizer.encode(text, max_len=512)
    return segments


def _encode_sentences(sentences):
    """Encode each sentence once and return (indices, segments) as two Series.

    Both Series reuse the index of `sentences`, so assigning them to the
    originating DataFrame stays row-aligned.
    """
    encoded = [tokenizer.encode(text, max_len=512) for text in sentences]
    indices = pd.Series([pair[0] for pair in encoded], index=sentences.index)
    segments = pd.Series([pair[1] for pair in encoded], index=sentences.index)
    return indices, segments


# Training set
data_train['indices'], data_train['segments'] = _encode_sentences(data_train['sentence'])
# Dev/test set
data_dev['indices'], data_dev['segments'] = _encode_sentences(data_dev['sentence'])

In [16]:
# Label handling: build the index <-> label-name mappings used for the y
# values, e.g. 'sports': 1 and 1: 'news'.
# sorted() makes the class order deterministic; plain list(set(...)) can
# order the names differently on each run, which would make a saved
# label.json disagree with a model trained in another session.
label = sorted(set(data_train['label_des'].tolist()))
dig_lables = dict(enumerate(label))
lable_dig = {name: idx for idx, name in dig_lables.items()}
print('y值处理成功类别共计***', len(lable_dig))
# Integer class id for every training row, taken from the mapping above.
data_train['label_new'] = data_train['label_des'].map(lable_dig)

y值处理成功类别共计*** 119


In [12]:
# Save the index -> label-name mapping to a local JSON file.
import json
item = json.dumps(
    dig_lables,
    indent=4,
    ensure_ascii=False,  # keep non-ASCII label names readable in the file
)
with open('label.json', mode='w', encoding='utf-8') as label_file:
    label_file.write(item)

In [17]:
# Training set x and y
train_data = [
    np.array(data_train['indices'].tolist()),
    np.array(data_train['segments'].tolist()),
]
# Use label_new (the ids derived from label_des) rather than the raw 'label'
# column, so the trained class indices match dig_lables, which is what the
# predictions are decoded with.
train_lables = to_categorical(data_train['label_new'], num_classes=len(label))

# Dev/test set inputs
test_data = [
    np.array(data_dev['indices'].tolist()),
    np.array(data_dev['segments'].tolist()),
]

In [None]:
# Show the architecture, then train the classifier head for two epochs.
model_clf.summary()
# NOTE(review): batch_size=1024 with 512-token inputs may not fit in memory — confirm.
model_clf.fit(
    x=train_data,
    y=train_lables,
    batch_size=1024,
    epochs=2,
)

Model: "functional_5"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 512)]        0                                            
__________________________________________________________________________________________________
input_2 (InputLayer)            [(None, 512)]        0                                            
__________________________________________________________________________________________________
functional_3 (Functional)       (None, 512, 768)     101677056   input_1[0][0]                    
                                                                 input_2[0][0]                    
__________________________________________________________________________________________________
non_masking (NonMasking)        (None, 512, 768)     0           functional_3[0][0]    

In [None]:
# Evaluation on the dev set
from sklearn import metrics
test_predict = model_clf.predict(test_data)
# Decode each row's highest-scoring class index back to its label name.
# The results go on data_dev: the previously used name `test` was never defined.
data_dev['bert_textcnn预测'] = [dig_lables[idx] for idx in test_predict.argmax(axis=1)]

# Compare against the true label names.
# NOTE(review): assumes dev.csv has a 'label_des' column like train.csv — confirm.
print('result:', metrics.accuracy_score(data_dev['label_des'], data_dev['bert_textcnn预测']))