In [1]:
import tensorflow as tf
import numpy as np
import tensorflow_addons as tfa

2023-01-08 18:18:55.856887: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
from typing import NamedTuple

class DataPreprocessor(object):
    
    def __init__(self, record_unknow=False) -> None:
        if record_unknow: self.unknow_words = dict()
    
    @staticmethod
    def text_cleansing(text):
        regex_except_token = r'\B(?!<\w+>\B)[^\w\s]'
        regex_expect_words = r'[^\w<>]+'
        output = re.sub(regex_except_token, '', text)
        output = re.sub(regex_expect_words, ' ', output)
        return output
    
    def load_word2vec_format(self, 
                             file_path, 
                             unknow_token=None, 
                             unknow_repr=None):
        with open(file_path, "r") as f:
            vec = dict()
            for l in f.readlines():
                data = l.split()
                vec[data[0]] = np.array(data[1:], dtype=np.float32)
            
            f.close()
                
        embedding_size = vec.pop(list(vec.keys())[0])
        if unknow_token is not None:
            try:
                unknow_vec = vec[unknow_token]
            except AttributeError:
                print(f"there's not '{unknow_token}' in dictionary.")
                if unknow_repr:
                    assert len(unknow_repr) == embedding_size, \
                        f"unknown represent vector must same shape with embedding size (expected {embedding_size}, got {len(unknow_repr)})"
                else:
                    unknow_repr = [ 0 for _ in range(embedding_size) ]
                vec[unknow_token] = unknow_repr
            self.unknow_token = unknow_token
        else: self.unknow_token = None
        
        vocab_size = len(vec.keys())    
        self.word_vectors = np.array(list(vec.values()))
        self.words_indices = { word: i for i, word in enumerate(vec.keys()) }
        self.embedding_shape = NamedTuple(vocab_size=vocab_size, embedding_size=embedding_size)
    
    def indice_encode(self, line):
        record_unknow = hasattr(self, "unknow_words")
        words = line.split()
        encoded = list()
        for word in words:
            try:
                i = self.words_indices[word]
            except AttributeError:
                if record_unknow:
                    if word in self.unknow_words.keys(): self.unknow_words[word] += 1
                    else: self.unknow_words[word] = 0
                
                if self.unknow_token is None: continue
                else: i = self.words_indices[self.unknow_token]
            encoded.append(i)
            
        return encoded
    
    def indice_padding(self, batch, padding_token, padding_size=0):
        padding_idx = self.words_indices[padding_token]
        seq_length = np.array([ len(inst) for inst in batch ])
        max_length = seq_length.max()
        padding_size = max(padding_size, max_length)
        
        padding_batch = np.array([ np.pad(inst, (0, padding_size - len(inst)), constant_values=padding_idx) for inst in batch ])
        return padding_batch

In [4]:
class TextCNNConfig():
    
    def __init__(self,
                num_classes,
                vocab_size,
                embedding_size, 
                filter_sizes, 
                num_filters,
                sequence_length=None,
                dropout_rate=None,
                l2_reg_lambda=0.0,
                seed=42,
                pretrain_embedding_matrix=None
                ) -> None:
        self.num_classes = num_classes
        self.vocab_size = vocab_size
        self.embedding_size = embedding_size
        self.filter_sizes = filter_sizes
        self.num_filters = num_filters
        self.sequence_length = sequence_length
        self.dropout_rate = dropout_rate
        self.l2_reg_lambda = l2_reg_lambda
        self.seed = seed
        self.pretrain_embedding_matrix = pretrain_embedding_matrix

class TextCNN(object):
    
    def __init__(self,
        num_classes,
        vocab_size,
        embedding_size, 
        filter_sizes, 
        num_filters,
        sequence_length=None,
        dropout_rate=None,
        l2_reg_lambda=0.0,
        seed=42,
        pretrain_embedding_matrix=None
        ) -> None:
        
        self.config = TextCNNConfig(
            num_classes,
            vocab_size,
            embedding_size, 
            filter_sizes, 
            num_filters,
            sequence_length,
            dropout_rate,
            l2_reg_lambda,
            seed,
            pretrain_embedding_matrix
        )
        
        if seed is not None: tf.random.set_seed(seed)
        input_word_idx = tf.keras.layers.Input(
            shape=(None, sequence_length),
            dtype=tf.dtypes.int32,
            name="input-word-idx-layer"
        )
        
        embed_trainable = pretrain_embedding_matrix is None
        if pretrain_embedding_matrix is None:
            embed_initializers = tf.keras.initializers.RandomUniform(minval=-1, maxval=1)
        else:
            pretrain_embedding_matrix = np.array(pretrain_embedding_matrix)
            assert (vocab_size, embedding_size) == pretrain_embedding_matrix.shape, \
                f"shape of embedding_matrix must match to vocab_size and embedding_size (expect {(vocab_size, embedding_size)}, got {pretrain_embedding_matrix.shape})."
            embed_initializers = tf.keras.initializers.Constant(pretrain_embedding_matrix)
        
        with tf.name_scope("embedding"), tf.device("cpu:0"):
            embed = tf.keras.layers.Embedding(
                input_dim=vocab_size,
                output_dim=embedding_size,
                embeddings_initializer=embed_initializers,
                input_length=sequence_length,
                trainable=embed_trainable,
                name="embedding-layer"
            )(input_word_idx)
            
            expand_dim_embed = tf.keras.layers.Reshape(
                target_shape=(vocab_size, embedding_size, 1),
                name="reshape-expand-dim-layer"
                )(embed)
        
        features = list()
        with tf.name_scope("convolution"):
            for i in range(num_filters):
                for size in filter_sizes:
                    conv = tf.keras.layers.Conv2D(
                        filters=num_filters,
                        kernel_size=(size, embedding_size),
                        strides=(1, 1),
                        padding="valid",
                        activation="relu",
                        use_bias=True,
                        kernel_initializer=tf.keras.initializers.TruncatedNormal(stddev=0.1),
                        bias_initializer=tf.keras.initializers.Constant(0.1),
                        name=f"conv-{size}_{embedding_size}-{i}th-layer"
                    )(expand_dim_embed)
                    reshape = tf.keras.layers.Reshape(target_shape=(1, -1), name=f"reshape-2d-{size}_{embedding_size}-{i}th-layer")(conv)
                    pooling = tf.keras.layers.GlobalMaxPool1D(name=f"global-max-pooling-{size}_{embedding_size}-{i}-layer")(reshape)
                    features.append(pooling)
            
        concat = tf.keras.layers.Concatenate(axis=1, name="concatenat-layer")(features)
        
        if dropout_rate is not None:
            with tf.name_scope("dropout"):
                dropout = tf.keras.layers.Dropout(rate=dropout_rate)(concat)
                fc_input = dropout
        else: fc_input = concat
        
        with tf.name_scope("fully-connected"):
            output = tf.keras.layers.Dense(
                units=num_classes,
                activation="softmax",
                use_bias=True,
                kernel_initializer=tf.keras.initializers.GlorotUniform(),
                bias_initializer=tf.keras.initializers.Constant(0.1),
                kernel_regularizer=tf.keras.regularizers.L2(l2=l2_reg_lambda),
                bias_regularizer=tf.keras.regularizers.L2(l2=l2_reg_lambda),
                name="output-layer"
                )(fc_input)
            
        self.model = tf.keras.Model(inputs=input_word_idx, outputs=output)
        

In [11]:
textcnn = TextCNN(num_classes=2,
                vocab_size=200,
                embedding_size=150,
                filter_sizes=[2, 3, 4],
                num_filters=2,
                sequence_length=10,
                dropout_rate=0.5,
                l2_reg_lambda=0.01,
                seed=42,
                pretrain_embedding_matrix=None)

textcnn.model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
    loss=tf.keras.losses.CategoricalCrossentropy(from_logits=True),
    metrics=[
        tf.keras.metrics.Precision(),
        tf.keras.metrics.Recall(),
        tfa.metrics.F1Score(num_classes=textcnn.config.num_classes, average="macro")
    ]
)

In [13]:
# Note: If you don't want to visualize model achitecture 
# ***** you don't need to run this cell just skip it.
# You must install pydot (`pip install pydot`) and install graphviz 
# (see instructions at https://graphviz.gitlab.io/download/) for plot_model to work.
# (e.g. for mac) $ brew install graphviz
# (e.g. for win) $ winget install graphviz
tf.keras.utils.plot_model(textcnn.model, "textcnn_model.png", show_shapes=True)

You must install pydot (`pip install pydot`) and install graphviz (see instructions at https://graphviz.gitlab.io/download/) for plot_model to work.


In [14]:
textcnn.model.summary()

Model: "model_3"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input-word-idx-layer (InputLay  [(None, None, 10)]  0           []                               
 er)                                                                                              
                                                                                                  
 embedding-layer (Embedding)    (None, None, 10, 15  30000       ['input-word-idx-layer[0][0]']   
                                0)                                                                
                                                                                                  
 reshape-expand-dim-layer (Resh  (None, 200, 150, 1)  0          ['embedding-layer[0][0]']        
 ape)                                                                                       