In [0]:
import tensorflow as tf
import numpy as np
# !pip install tensorflow==2.0.0-beta1

In [0]:
# --- Synthetic dataset size ------------------------------------------------
train_amount = 500   # number of training documents
test_amount = 100    # number of held-out documents

# --- Document layout -------------------------------------------------------
max_seg = 20         # segments per document (axis 1 of x_train / x_test)
max_word = 30        # word ids per segment (axis 2 of x_train / x_test)

# --- Embedding / labels ----------------------------------------------------
w2v_len = 30         # width of one word vector (columns of fake_w2v)
level_class_cnt = 5  # output units of the per-segment softmax classifier

# --- Network hyper-parameters ----------------------------------------------
kernel_heights = [3, 4, 5]   # one convolution branch is built per height
hidden_feature_dim = 70      # Conv2D filters per kernel height
gru_feature_dim = 150        # GRU units per direction
dropout_rate = 0.5           # rate used by every Dropout layer below

In [3]:
# Synthetic stand-in data: random word ids, random labels and a random
# embedding table. It only exists to exercise the model's shapes end to end.

vocab_size = 50                  # number of distinct word ids / rows of fake_w2v
rng = np.random.RandomState(0)   # fixed seed so a fresh "Run All" reproduces the data

# Documents are integer word ids shaped (documents, max_seg, max_word).
x_train = rng.randint(0, vocab_size, (train_amount, max_seg, max_word))
x_test = rng.randint(0, vocab_size, (test_amount, max_seg, max_word))

# One sentiment label per document, drawn from the level_class_cnt classes.
y_train = rng.randint(0, level_class_cnt, train_amount)
y_test = rng.randint(0, level_class_cnt, test_amount)

# Random "word2vec" table, one row of w2v_len floats per word id. The inputs
# stay as integer ids; the vector lookup happens inside the model through an
# Embedding layer initialised from this table.
fake_w2v = rng.rand(vocab_size, w2v_len)

print(x_train.shape, y_train.shape)
print(x_test.shape, y_test.shape)

(500, 20, 30) (500,)
(100, 20, 30) (100,)


In [0]:
def __get_filter_layer(total_dim, target_dim, index):
    '''Build a layer that keeps one entry along one axis and drops that axis.

    The returned layer slices position `index` out of axis `target_dim` of an
    input tensor that has `total_dim` axes (the batch axis included), then
    squeezes the now size-1 axis away.

    Args:
        total_dim (int): The total number of axes of the input tensor.
        target_dim (int): The axis to slice along.
        index (int): The position along `target_dim` to keep.

    Returns:
        (Layer): A keras Lambda layer that implements the operation.

    Raises:
        ValueError: If `target_dim` is not a valid axis for `total_dim` axes.
    '''
    if not 0 <= target_dim < total_dim:
        raise ValueError(
            'target_dim must be in [0, %d), got %d' % (total_dim, target_dim))

    # The slice specification does not depend on the tensor, so build it once:
    # start at `index` on the target axis and at 0 elsewhere; take one element
    # on the target axis and everything (-1) on the others.
    begin = [index if axis == target_dim else 0 for axis in range(total_dim)]
    size = [1 if axis == target_dim else -1 for axis in range(total_dim)]

    def tensor_filter(tensor_in):
        return tf.squeeze(tf.slice(tensor_in, begin, size), axis=target_dim)

    return tf.keras.layers.Lambda(tensor_filter)


def __get_branch_model(input_shape, branch_index, output_shape, submodel, args=None):
    '''Apply one `submodel` instance per slice of the input and merge the results.

    One submodel is built for every position along axis `branch_index` of the
    input. Each submodel receives the whole input tensor together with its own
    `index` keyword argument and is expected to pick out its slice itself. The
    branch outputs are concatenated and reshaped to `output_shape`.

    Args:
        input_shape tuple(int): The shape of the input tensor, without the
            sample (batch) axis.
        branch_index (int): The axis to branch over, counted with the sample
            axis as 0, so valid values are 1..len(input_shape).
        output_shape tuple(int): The shape of the output tensor, without the
            sample axis.
        submodel (callable): A factory returning the model to apply to one
            slice; it must accept an `index` keyword argument.
        args (dict): Extra keyword arguments for `submodel`, excluding
            `index`. Defaults to no extra arguments.

    Returns:
        (Model): A keras model mapping the input tensor to the merged output.

    Raises:
        ValueError: If `branch_index` does not address an axis of `input_shape`.
    '''
    if not 1 <= branch_index <= len(input_shape):
        # Index 0 is the sample axis; without this check it would silently
        # wrap around to the last axis through input_shape[-1].
        raise ValueError(
            'branch_index must be in [1, %d], got %d'
            % (len(input_shape), branch_index))

    # Copy so the caller's dictionary is never shared or modified.
    shared_args = {} if args is None else dict(args)

    model_input = tf.keras.Input(input_shape)
    branch_outputs = [
        submodel(**dict(shared_args, index=i))(model_input)
        for i in range(input_shape[branch_index - 1])
    ]

    if len(branch_outputs) == 1:
        # Nothing to merge; some Keras versions reject a one-element list here.
        merged = branch_outputs[0]
    else:
        merged = tf.keras.layers.Concatenate()(branch_outputs)

    model_output = tf.keras.layers.Reshape(output_shape)(merged)
    return tf.keras.Model(model_input, model_output)


def __get_sentence_encode_unit(input_shape, kernel_height, index):
    '''Build a CNN unit that encodes one segment with a single kernel height.

    The unit selects segment `index` from the document, looks up the word
    vectors in an Embedding layer initialised from `fake_w2v`, convolves the
    resulting matrix with kernels spanning the full vector width, and applies
    max-over-time pooling so one value per filter remains.

    Args:
        input_shape ((int)): The shape of one segment. (word_max)
        kernel_height (int): The height of the convolution kernel, in words.
        index (int): The index of the segment in its belonging document.

    Returns:
        (Model): The CNN model to encode the segment matrix.

    Raises:
        ValueError: If `kernel_height` exceeds the number of words per segment.
    '''
    word_cnt = input_shape[0]
    # Height of the feature map after a "valid" convolution over the words.
    cnned_height = word_cnt - kernel_height + 1
    if cnned_height < 1:
        raise ValueError(
            'kernel_height %d does not fit a segment of %d words'
            % (kernel_height, word_cnt))

    # Take the vocabulary size from the embedding table itself so the layer
    # always matches the weights it is initialised with.
    vocab_size = fake_w2v.shape[0]

    return tf.keras.models.Sequential([
        # Document tensor has 3 axes (batch, segment, word); keep one segment.
        __get_filter_layer(3, 1, index),
        tf.keras.layers.Embedding(vocab_size, w2v_len, weights=[fake_w2v],
                                  input_length=word_cnt),
        # Add a trailing channel axis for Conv2D.
        tf.keras.layers.Reshape((*input_shape, w2v_len, 1)),
        # The kernel covers the whole vector width, so the width collapses to 1.
        tf.keras.layers.Conv2D(hidden_feature_dim, (kernel_height, w2v_len)),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.ReLU(),
        tf.keras.layers.Reshape((cnned_height, hidden_feature_dim, 1)),
        # Max over time: pool across every remaining word position.
        tf.keras.layers.MaxPool2D((cnned_height, 1))
    ])


def __get_multi_kernel_encode_unit(input_shape, kernel_heights, index):
    '''Build a CNN unit that encodes one segment with several kernel heights.

    One `__get_sentence_encode_unit` is built per kernel height, all reading
    the same document input; their outputs are concatenated and flattened
    into a 1-dimensional feature vector per sample.

    Args:
        input_shape ((int, int)): The shape of the document. (seg_max, word_max)
        kernel_heights ([int]): The list of the kernel heights.
        index (int): The index of the segment in its belonging document.

    Returns:
        (Model): The CNN model to encode the segment matrix.

    Raises:
        ValueError: If `kernel_heights` is empty.
    '''
    if not kernel_heights:
        raise ValueError('kernel_heights must contain at least one height')

    model_input = tf.keras.Input(input_shape)
    segment_shape = (input_shape[1],)
    cnn_layers = [
        __get_sentence_encode_unit(segment_shape, height, index)(model_input)
        for height in kernel_heights
    ]

    if len(cnn_layers) == 1:
        # Nothing to merge; some Keras versions reject a one-element list here.
        merged = cnn_layers[0]
    else:
        merged = tf.keras.layers.concatenate(cnn_layers)

    model_output = tf.keras.layers.Flatten()(merged)
    return tf.keras.Model(model_input, model_output)


def __get_seg_classifier_unit(index):
    '''Build the softmax linear classifier for one segment's sentiment.

    The unit selects segment `index` from a 3-axis input (batch, segment,
    feature), applies dropout, and projects onto `level_class_cnt` classes.

    Args:
        index (int): The index of the segment in its belonging document.

    Returns:
        (Model): The softmax linear classifier to predict segment sentiment.
    '''
    classifier = tf.keras.models.Sequential()
    classifier.add(__get_filter_layer(3, 1, index))
    classifier.add(tf.keras.layers.Dropout(dropout_rate))
    classifier.add(
        tf.keras.layers.Dense(level_class_cnt, activation='softmax'))
    return classifier


def __get_attention_unit(index):
    '''Build the unit that scores one segment's attention weight.

    The unit selects segment `index` from a 3-axis input (batch, segment,
    feature), applies dropout and a tanh projection, and reduces the result
    to a single weight in (0, 1).

    Args:
        index (int): The index of the segment in its belonging document.

    Returns:
        (Model): The model for predicting attention weight for a segment.
    '''
    return tf.keras.models.Sequential([
        __get_filter_layer(3, 1, index),
        tf.keras.layers.Dropout(dropout_rate),
        tf.keras.layers.Dense(2 * gru_feature_dim, activation='tanh'),
        # Sigmoid, not softmax: a softmax over a single unit is identically
        # 1.0, which makes the weight constant and passes no gradient back to
        # the layers that feed it.
        tf.keras.layers.Dense(1, use_bias=False, activation='sigmoid')
    ])


def __get_bidirectional_gru_unit():
    '''Build the bidirectional-GRU unit that extracts the hidden vectors.

    The GRU returns its full output sequence, so one hidden vector is produced
    per input step. The hidden vectors are used to predict the attention
    weights of the model.

    Returns:
        (Model): The bidirectional-GRU unit to predict the hidden vectors.
    '''
    recurrent_layer = tf.keras.layers.GRU(gru_feature_dim,
                                          return_sequences=True)
    gru_unit = tf.keras.models.Sequential()
    gru_unit.add(tf.keras.layers.Bidirectional(recurrent_layer))
    return gru_unit

In [7]:
def _attention_weighted_sum(tensors):
    '''Combine per-segment predictions into one document-level prediction.

    Args:
        tensors ([Tensor, Tensor]): The segment predictions, shaped
            (batch, max_seg, level_class_cnt), followed by the segment
            weights, shaped (batch, max_seg, 1).

    Returns:
        (Tensor): The weighted sum over segments, shaped
            (batch, level_class_cnt, 1).
    '''
    seg_predictions, seg_weights = tensors
    # Rescale the weights to sum to 1 over the segment axis so the result is a
    # convex combination of the per-segment softmax outputs, i.e. a probability
    # distribution. Assumes strictly positive weights (softmax/sigmoid output).
    seg_weights = seg_weights / tf.reduce_sum(seg_weights, axis=1, keepdims=True)
    return tf.matmul(seg_predictions, seg_weights, transpose_a=True)


model_input = tf.keras.Input((max_seg, max_word))

# 1. Encode every segment with the multi-kernel CNN:
#    (batch, max_seg, max_word) -> (batch, max_seg, kernels * hidden_feature_dim)
encoding_model = __get_branch_model(
    input_shape=(max_seg, max_word),
    branch_index=1,
    output_shape=(max_seg, len(kernel_heights) * hidden_feature_dim),
    submodel=__get_multi_kernel_encode_unit,
    args={'kernel_heights': kernel_heights, 'input_shape': (max_seg, max_word)}
)(model_input)

# 2. Contextualise the segment encodings with a bidirectional GRU:
#    -> (batch, max_seg, 2 * gru_feature_dim)
biglu_model = __get_bidirectional_gru_unit()(encoding_model)

# 3. One attention weight per segment, from the GRU hidden vectors:
#    -> (batch, max_seg, 1)
attention_model = __get_branch_model(
    input_shape=(max_seg, 2 * gru_feature_dim),
    branch_index=1,
    output_shape=(max_seg, 1),
    submodel=__get_attention_unit
)(biglu_model)

# 4. One class distribution per segment, from the CNN encodings:
#    -> (batch, max_seg, level_class_cnt)
classification_model = __get_branch_model(
    input_shape=(max_seg, len(kernel_heights) * hidden_feature_dim),
    branch_index=1,
    output_shape=(max_seg, level_class_cnt),
    submodel=__get_seg_classifier_unit
)(encoding_model)

# 5. Attention-weighted sum of the segment predictions. Both tensors are
#    passed as call inputs (not through Lambda's `arguments`) because Keras
#    only records call inputs as graph edges; otherwise the GRU/attention
#    branch is not registered as part of the model.
weighted_layer = tf.keras.layers.Lambda(_attention_weighted_sum)(
    [classification_model, attention_model])

# Drop the trailing size-1 axis: (batch, level_class_cnt, 1) -> (batch, level_class_cnt)
squeeze_layer = tf.keras.layers.Lambda(tf.squeeze, arguments={'axis': -1})(weighted_layer)

model = tf.keras.Model(model_input, squeeze_layer)

model.compile(optimizer=tf.keras.optimizers.Adam(),
              loss=tf.keras.losses.SparseCategoricalCrossentropy(),
              metrics=['accuracy'])

print('Model Compiled.')

model.fit(x_train, y_train, epochs=1)

print(model.predict(x_test).shape)

Model Compiled.


W0617 09:34:30.292229 140499544840064 deprecation.py:323] From /usr/local/lib/python3.6/dist-packages/tensorflow/python/ops/math_grad.py:1250: add_dispatch_support.<locals>.wrapper (from tensorflow.python.ops.array_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use tf.where in 2.0, which has the same broadcast rule as np.where


Train on 500 samples
(100, 5)


In [10]:
# Scratch check: confirm that Reshape maps (batch, 30, 20) inputs to
# (batch, 30, 20, 1), i.e. it only appends a trailing channel axis.
# A separate name is used so the trained `model` above is not overwritten,
# and no compile() call is made because only predict() is needed here.
reshape_probe = tf.keras.models.Sequential([
    tf.keras.layers.Reshape(target_shape=(30, 20, 1))
])
reshape_probe.predict(np.random.rand(10, 30, 20)).shape

(10, 30, 20, 1)