/
dynamic_pooling_layer.py
99 lines (82 loc) · 3.42 KB
/
dynamic_pooling_layer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
"""An implementation of Dynamic Pooling Layer."""
import typing
from keras import backend as K
from keras.engine import Layer
class DynamicPoolingLayer(Layer):
    """
    Layer that computes dynamic pooling of one tensor.

    :param psize1: pooling size of dimension 1
    :param psize2: pooling size of dimension 2
    :param kwargs: Standard layer keyword arguments.
    Examples:
        >>> import matchzoo as mz
        >>> layer = mz.layers.DynamicPoolingLayer(3, 2)
        >>> num_batch, left_len, right_len, num_dim = 5, 3, 2, 10
        >>> layer.build([[num_batch, left_len, right_len, num_dim],
        ...             [num_batch, left_len, right_len, 3]])
    """

    def __init__(self,
                 psize1: int,
                 psize2: int,
                 **kwargs):
        """:class:`DynamicPoolingLayer` constructor."""
        super().__init__(**kwargs)
        self._psize1 = psize1
        self._psize2 = psize2

    def build(self, input_shape: typing.List[int]):
        """
        Build the layer.

        :param input_shape: the shapes of the input tensors,
            for DynamicPoolingLayer we need two input tensors.
        """
        super().build(input_shape)
        input_shape_one = input_shape[0]
        # Cache the spatial dimensions of the first input; call() uses
        # them to tile batch indices and to derive the pooling strides.
        self._msize1 = input_shape_one[1]
        self._msize2 = input_shape_one[2]

    def call(self, inputs: list, **kwargs) -> typing.Any:
        """
        The computation logic of DynamicPoolingLayer.

        Re-indexes the first tensor with the (pre-computed) dynamic
        pooling index, then max-pools it down to (psize1 x psize2).

        :param inputs: two input tensors — the feature map and the
            dynamic pooling index (last axis holds row/col indices).
        :raises ValueError: if (msize1, msize2) is not an integer
            multiple of (psize1, psize2), with a suggested valid size.
        """
        x, dpool_index = inputs
        dpool_shape = K.tf.shape(dpool_index)
        # Build a (batch, msize1, msize2, 1) tensor of batch indices so
        # gather_nd can look up entries per sample.
        batch_index_one = K.tf.expand_dims(
            K.tf.expand_dims(
                K.tf.range(dpool_shape[0]), axis=-1),
            axis=-1)
        batch_index = K.tf.expand_dims(
            K.tf.tile(batch_index_one, [1, self._msize1, self._msize2]),
            axis=-1)
        dpool_index_ex = K.tf.concat([batch_index, dpool_index], axis=3)
        x_expand = K.tf.gather_nd(x, dpool_index_ex)
        # BUG FIX: use floor division, not true division.  With `/` the
        # strides are floats (rejected by tf.nn.max_pool), and the check
        # below could never fire: msize / (msize / psize) == psize exactly.
        stride1 = self._msize1 // self._psize1
        stride2 = self._msize2 // self._psize2
        # With integer strides, suggestionN is the output size actually
        # achievable; it differs from psizeN when msizeN % psizeN != 0.
        suggestion1 = self._msize1 // stride1
        suggestion2 = self._msize2 // stride2
        if suggestion1 != self._psize1 or suggestion2 != self._psize2:
            raise ValueError("DynamicPooling Layer can not "
                             "generate ({} x {}) output feature map, "
                             "please use ({} x {}) instead."
                             .format(self._psize1, self._psize2,
                                     suggestion1, suggestion2))
        x_pool = K.tf.nn.max_pool(x_expand,
                                  [1, stride1, stride2, 1],
                                  [1, stride1, stride2, 1],
                                  "VALID")
        return x_pool

    def compute_output_shape(self, input_shape: list) -> tuple:
        """
        Calculate the layer output shape.

        :param input_shape: the shapes of the input tensors,
            for DynamicPoolingLayer we need two input tensors.
        :return: ``(None, psize1, psize2, channels)``.
        """
        input_shape_one = input_shape[0]
        return (None, self._psize1, self._psize2, input_shape_one[3])

    def get_config(self) -> dict:
        """Get the config dict of DynamicPoolingLayer."""
        config = {
            'psize1': self._psize1,
            'psize2': self._psize2
        }
        base_config = super(DynamicPoolingLayer, self).get_config()
        return dict(list(base_config.items()) + list(config.items()))