# Model

In [None]:
from tensorflow import keras
from tensorflow.keras import layers
from AttentionLayer import *
from keras.layers import BatchNormalization, Dropout



class AMCNN:
    """Builder for a binary text classifier: Embedding -> Bi-LSTM ->
    AttentionLayer -> multi-width Conv2D/max-pool -> highway-style mix -> sigmoid.

    The Keras model is created by :meth:`build` and stored on ``self.model``.
    ``self.scalar_att`` and ``self.vector_att`` are set by :meth:`build` from the
    second and third outputs of ``AttentionLayer``.
    """

    def __init__(self, maxlen, embed_dim, words_count, filter_size, channel, mask_prob=0.7, att_reg=0.0001):
        """
        :param maxlen: Max length of sequence
        :param embed_dim: Embedding size of word embedding layer
        :param words_count:  Word count of Tokenizer
        :param filter_size:  Number of output filters of each Conv2D layer
                             (passed as the ``filters`` argument of Conv2D)
        :param channel: Number of Attention Layer Channels
        :param mask_prob: Masking proportion of Attention Layer (only applied while training)
        :param att_reg: L2 regularizer term of Attention Layer
        """
        self.maxlen = maxlen
        self.words_count = words_count
        self.embed_dim = embed_dim
        self.filter_size = filter_size
        self.channel = channel
        self.att_reg = att_reg
        self.mask_prob = mask_prob
        # Kernel heights 1..(embed_dim // filter_size); one Conv2D branch is built per height.
        num_filter = embed_dim // filter_size
        self.num_filters = list(range(1, num_filter + 1))
        # Populated by build(); None means the model has not been built yet.
        self.model = None

    def build(self, emb_trainable=True, pre_emb=True, emb_weight=None):
        """
        Build the Keras model, store it on ``self.model`` and return it.

        :param emb_trainable: Define trainable of Embedding Layer (only used when pre_emb is True)
        :param pre_emb: Whether to use pre-trained embedding weights
        :param emb_weight: Pre-trained embedding weights (required when pre_emb is True)
        :return: the built ``keras.Model``
        :raises ValueError: if ``pre_emb`` is True but ``emb_weight`` is None, or if
                            ``embed_dim`` is not a multiple of ``filter_size``
        """
        if pre_emb and emb_weight is None:
            raise ValueError("emb_weight must be given when pre_emb=True "
                             "(pass pre_emb=False to train the embedding from scratch).")
        if self.embed_dim % self.filter_size != 0:
            # The concatenated CNN features have width filter_size * len(num_filters); the
            # highway step below multiplies/adds them element-wise with embed_dim-wide
            # tensors, so the two widths have to be equal.
            raise ValueError("embed_dim (%d) must be a multiple of filter_size (%d)."
                             % (self.embed_dim, self.filter_size))

        # Imported here so the class does not depend on a later cell importing `tf`.
        import tensorflow as tf

        inputs = layers.Input(shape=(self.maxlen,))
        # Token id 0 is treated as padding: those positions get a large negative value.
        pad_k = tf.expand_dims(tf.cast((inputs == 0), dtype=tf.float32) * -99999, axis=2)

        if pre_emb:
            emb_layer = layers.Embedding(self.words_count + 1, self.embed_dim, trainable=emb_trainable,
                                         weights=[emb_weight])
        else:
            emb_layer = layers.Embedding(self.words_count + 1, self.embed_dim, trainable=True)
        inputs_emb = emb_layer(inputs)

        # Bi-LSTM cell summary (forward/backward outputs are averaged)
        lstm_layer = layers.LSTM(self.embed_dim, return_sequences=True)
        bi_lstm = layers.Bidirectional(lstm_layer, merge_mode="ave")(inputs_emb)

        # The 4th positional argument is presumably the L2 regularisation term (per the
        # constructor docs); it now honours self.att_reg instead of a hard-coded 0.0001.
        attention = AttentionLayer(self.embed_dim, self.embed_dim, self.channel, self.att_reg,
                                   self.mask_prob)
        C_features, self.scalar_att, self.vector_att = attention(bi_lstm, pad_k)
        # Add the raw embedding as one more channel next to the attention channels.
        inputs_emb2 = tf.expand_dims(inputs_emb, axis=3)
        C_features = tf.concat([inputs_emb2, C_features], axis=3)

        # kim-cnn process: one conv branch per kernel height, max-pooled to a single row
        pools = []
        for kernel_height in self.num_filters:
            conv = layers.Conv2D(self.filter_size, kernel_size=(kernel_height, self.embed_dim), activation="relu")
            cnn_out = conv(C_features)
            cnn_out = layers.BatchNormalization()(cnn_out)  # batch normalization
            max_pools = layers.MaxPool2D(pool_size=(self.maxlen - kernel_height + 1, 1))(cnn_out)
            max_pools = layers.Flatten()(max_pools)
            pools.append(max_pools)
        concated = layers.concatenate(pools)  # width = filter_size x number of kernel heights

        # High-way process
        gap_input_emb = layers.GlobalAvgPool1D()(inputs_emb)  # global average pooling down to the embedding size
        trans_ = layers.Dense(self.embed_dim, activation="relu", use_bias=True)(gap_input_emb)
        carry_ = 1 - trans_
        gap_ = layers.Multiply()([trans_, gap_input_emb])
        concated_ = layers.Multiply()([carry_, concated])
        concated_ = layers.Dropout(0.6)(concated_)  # Dropout
        concated_ = layers.Add()([concated_, gap_])
        outputs = layers.Dense(1, activation="sigmoid")(concated_)

        self.model = keras.Model(inputs=inputs, outputs=outputs)
        return self.model

    def load_weights(self, path):
        """
        Load saved weights from ``path`` into the built model.

        :param path: Path of the weight file, forwarded to ``keras.Model.load_weights``
        :raises RuntimeError: if :meth:`build` has not been called yet
        """
        if self.model is None:
            raise RuntimeError("Model is not built yet; call build() before load_weights().")
        self.model.load_weights(path)
        print("Load Weights Compelete!")

# Test

In [None]:
import tensorflow as tf
from sklearn.metrics import confusion_matrix, plot_confusion_matrix

# Settings for the test run; main() reads them as attributes (args.max_length, ...).
args = easydict.EasyDict(
    {
        "max_length": 100,  # length passed to Token and used as maxlen when padding sequences
        "att_reg": 0.0001,  # forwarded to AMCNN as att_reg
        "channel": 2,  # forwarded to AMCNN as channel
        "weight_save_path": "Weight",  # folder holding keras_tokenizer.pkl and the .h5 weight files
        "val_model_epoch": -1,  # -1: main() picks a weight file from the folder; otherwise model-<epoch>.h5
        "test_data": "데이터/성능확인 test.csv",  # path of the test set
        "document": "text",  # column holding the input text
        "label": "혐오",  # column holding the target label
    }
)


def _read_test_table(path, encoding="CP949"):
    """Load the test set with the pandas reader that matches the file name."""
    file_name = os.path.basename(path).lower()
    if file_name.endswith(".xlsx"):
        # read_excel takes no `encoding` argument; workbooks carry their own encoding.
        return pd.read_excel(path)
    if ".csv" in file_name:
        return pd.read_csv(path, encoding=encoding)
    return pd.read_table(path, encoding=encoding)


def _resolve_weight_file(weight_dir, epoch):
    """Return the weight file name for `epoch`, or the last .h5 file (by name) when epoch is -1."""
    if epoch != -1:
        return "model-%04d.h5" % epoch
    # os.listdir returns entries in arbitrary order, so sort before taking the last one.
    candidates = sorted(name for name in os.listdir(weight_dir) if name.endswith(".h5"))
    if not candidates:
        raise FileNotFoundError("No .h5 weight file found in %s" % weight_dir)
    return candidates[-1]


def main():
    """Evaluate a saved AMCNN model on the test set described by the global ``args``.

    Reads the test file, tokenizes the text column, loads the pickled Keras
    tokenizer and a weight file from ``args.weight_save_path``, predicts, and
    prints F1 / accuracy / recall / precision at a 0.5 threshold.

    The results are published as module-level globals (``x_test``, ``y_test``,
    ``pred_test``, ``pred_test2``, ``model``, ``test_data``) so they can be
    inspected after the call.

    :raises FileNotFoundError: if ``args.val_model_epoch`` is -1 and the weight
                               folder contains no ``.h5`` file
    """
    global x_test
    global y_test
    global pred_test
    global model
    global pred_test2
    global test_data
    # Check Gpu Enable
    physical_devices = tf.config.experimental.list_physical_devices('GPU')
    if len(physical_devices) > 0:
        tf.config.experimental.set_memory_growth(physical_devices[0], True)

    # parsing Arg
    test_data_path = args.test_data
    max_len = args.max_length
    att_reg = args.att_reg
    weight_save_path = args.weight_save_path
    document = args.document
    channel = args.channel
    val_model_epoch = args.val_model_epoch
    label = args.label

    # Read Data
    test_data = _read_test_table(test_data_path)

    # Make Tokenizer Token
    tk = Token("Tokenizer", max_len)
    test_data["Token"] = test_data[document].apply(tk.make_token_ori)

    # Using Keras Tokenizer
    print("Load Keras tokenizer for validate in %s"%(weight_save_path))
    # NOTE: pickle.load can execute arbitrary code; only load tokenizer files from a trusted source.
    with open(os.path.join(weight_save_path,"keras_tokenizer.pkl"), "rb") as f:
        k_tokenizer = pickle.load(f)
    words_count = len(k_tokenizer.word_counts)

    #  K_tokenizer Sequence
    sequences = k_tokenizer.texts_to_sequences(test_data['Token'])
    x_test = keras.preprocessing.sequence.pad_sequences(sequences, maxlen=max_len)
    y_test = test_data[label].values

    # Build simple binary model
    tf.keras.backend.clear_session()
    amcnn = AMCNN(maxlen=max_len,
                  embed_dim=500,
                  words_count=words_count,
                  filter_size=50,
                  channel=channel,
                  mask_prob=0.5,
                  att_reg=att_reg)
    model = amcnn.build(pre_emb=False)

    # Load the last weight file saved in the weight folder (or the requested epoch)
    # and run the test with that model.
    model_weight_path = _resolve_weight_file(weight_save_path, val_model_epoch)
    weight_file = os.path.join(weight_save_path, model_weight_path)
    model.load_weights(weight_file)

    print("Evaluate %s Test data"%(weight_file))
    pred_test = model.predict(x_test,verbose=1)
    # Threshold the sigmoid outputs at 0.5 to get hard 0/1 labels.
    pred_test2 = np.int32(pred_test >= 0.5).reshape(-1)

    # f1-score, acc_score, recall_score, precision_score
    print("==============Evaluate Result============")
    print("f1_score :", f1_score(y_test, pred_test2))
    print("acc_score :", accuracy_score(y_test, pred_test2))
    print("recall_score :", recall_score(y_test, pred_test2))
    print("precision_score :", precision_score(y_test, pred_test2))
    print("==============Judgement Result============")

# Run the evaluation only when this file is executed directly, not when it is imported.
if __name__ == "__main__":
    main()