crf1d.py
from chainer.functions.array import broadcast
from chainer.functions.array import concat
from chainer.functions.array import reshape
from chainer.functions.array import select_item
from chainer.functions.array import split_axis
from chainer.functions.connection import embed_id
from chainer.functions.math import logsumexp
from chainer.functions.math import minmax
from chainer.functions.math import sum as _sum


def crf1d(cost, xs, ys, reduce='mean'):
    """Calculates the negative log-likelihood of a linear-chain CRF.

    It takes a transition cost matrix, a sequence of costs, and a sequence of
    labels. Let :math:`c_{st}` be a transition cost from a label :math:`s` to
    a label :math:`t`, :math:`x_{it}` be a cost of a label :math:`t` at
    position :math:`i`, and :math:`y_i` be an expected label at position
    :math:`i`. The negative log-likelihood of a linear-chain CRF is defined
    as

    .. math::
        L = -\\left( \\sum_{i=1}^l x_{i y_i}
            + \\sum_{i=1}^{l-1} c_{y_i y_{i+1}} - \\log(Z) \\right),

    where :math:`l` is the length of the input sequence and :math:`Z` is the
    normalizing constant called the partition function.

    .. note::

        When you want to calculate the negative log-likelihood of sequences
        which have different lengths, sort the sequences in descending order
        of their lengths and transpose them.

        For example, suppose you have three input sequences:

        >>> a1 = a2 = a3 = a4 = np.random.uniform(-1, 1, 3).astype('f')
        >>> b1 = b2 = b3 = np.random.uniform(-1, 1, 3).astype('f')
        >>> c1 = c2 = np.random.uniform(-1, 1, 3).astype('f')

        >>> a = [a1, a2, a3, a4]
        >>> b = [b1, b2, b3]
        >>> c = [c1, c2]

        where ``a1`` and all the other variables are arrays of shape
        ``(K,)``. Make a transpose of the sequences:

        >>> x1 = np.stack([a1, b1, c1])
        >>> x2 = np.stack([a2, b2, c2])
        >>> x3 = np.stack([a3, b3])
        >>> x4 = np.stack([a4])

        and make a list of the arrays:

        >>> xs = [x1, x2, x3, x4]

        You need to make the label sequences in the same fashion, and then
        call the function:

        >>> cost = chainer.Variable(
        ...     np.random.uniform(-1, 1, (3, 3)).astype('f'))
        >>> ys = [np.zeros(x.shape[0:1], dtype='i') for x in xs]
        >>> loss = F.crf1d(cost, xs, ys)

        It calculates the mean of the negative log-likelihoods of the three
        sequences.

    The output is a variable whose value depends on the value of the option
    ``reduce``. If it is ``'no'``, it holds the elementwise loss values. If
    it is ``'mean'``, it holds the mean of the loss values.

    Args:
        cost (Variable): A :math:`K \\times K` matrix which holds the
            transition cost between two labels, where :math:`K` is the
            number of labels.
        xs (list of Variable): Input vector for each label.
            ``len(xs)`` denotes the length of the sequence, and each
            :class:`~chainer.Variable` holds a :math:`B \\times K` matrix,
            where :math:`B` is the mini-batch size and :math:`K` is the
            number of labels.
            Note that the :math:`B`\\ s of the variables are not necessarily
            the same, i.e., the function accepts input sequences with
            different lengths.
        ys (list of Variable): Expected output labels. It needs to have the
            same length as ``xs``. Each :class:`~chainer.Variable` holds a
            :math:`B` integer vector. When an ``x`` in ``xs`` has a
            different :math:`B`, the corresponding ``y`` must have the same
            :math:`B`. In other words, ``ys`` must satisfy
            ``ys[i].shape == xs[i].shape[0:1]`` for all ``i``.
        reduce (str): Reduction option. Its value must be either ``'mean'``
            or ``'no'``. Otherwise, :class:`ValueError` is raised.

    Returns:
        ~chainer.Variable: The negative log-likelihood of the input
        sequences: a scalar variable holding the mean of the losses when
        ``reduce`` is ``'mean'``, or a vector of elementwise losses when it
        is ``'no'``.

    .. note::

        See the original paper for details: `Conditional Random Fields:
        Probabilistic Models for Segmenting and Labeling Sequence Data
        <http://repository.upenn.edu/cis_papers/159/>`_.

    """
    if reduce not in ('mean', 'no'):
        raise ValueError(
            "only 'mean' and 'no' are valid for 'reduce', but '%s' is "
            'given' % reduce)

    assert xs[0].shape[1] == cost.shape[0]
    n_label = cost.shape[0]
    n_batch = xs[0].shape[0]

    # Forward algorithm: alpha[b, t] is the log-sum-exp of the scores of all
    # label paths of the b-th sequence that end with label t. The sequences
    # are sorted in descending order of length, so when the mini-batch
    # shrinks, the alphas of the finished sequences are set aside.
    alpha = xs[0]
    alphas = []
    for x in xs[1:]:
        batch = x.shape[0]
        if alpha.shape[0] > batch:
            alpha, alpha_rest = split_axis.split_axis(alpha, [batch], axis=0)
            alphas.append(alpha_rest)
        b_alpha, b_cost = broadcast.broadcast(alpha[..., None], cost)
        alpha = logsumexp.logsumexp(b_alpha + b_cost, axis=1) + x

    if len(alphas) > 0:
        alphas.append(alpha)
        alpha = concat.concat(alphas[::-1], axis=0)

    # Log of the partition function Z, one value per sequence.
    logz = logsumexp.logsumexp(alpha, axis=1)
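
    # The recursion above is, for a single sequence of length T in plain
    # numpy (an illustrative sketch using scipy.special.logsumexp; it is
    # not part of the computation here):
    #
    #     alpha = x[0]                                       # shape (K,)
    #     for i in range(1, T):
    #         alpha = logsumexp(alpha[:, None] + cost, axis=0) + x[i]
    #     logz = logsumexp(alpha)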

    # Score of the gold label paths: the unary costs selected by ys plus the
    # transition costs, looked up with embed_id on the flattened cost matrix
    # (row index y_prev * n_label + y picks cost[y_prev, y]).
    cost = reshape.reshape(cost, (cost.size, 1))
    score = select_item.select_item(xs[0], ys[0])
    scores = []
    for x, y, y_prev in zip(xs[1:], ys[1:], ys[:-1]):
        batch = x.shape[0]
        if score.shape[0] > batch:
            y_prev, _ = split_axis.split_axis(y_prev, [batch], axis=0)
            score, score_rest = split_axis.split_axis(score, [batch], axis=0)
            scores.append(score_rest)
        score += (select_item.select_item(x, y) + reshape.reshape(
            embed_id.embed_id(y_prev * n_label + y, cost), (batch,)))

    if len(scores) > 0:
        scores.append(score)
        score = concat.concat(scores[::-1], axis=0)

    # Negative log-likelihood: log Z minus the gold path score.
    loss = logz - score
    if reduce == 'mean':
        return _sum.sum(loss) / n_batch
    else:
        return loss
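

# A minimal usage sketch for ``crf1d`` (illustrative only; the shapes follow
# the batched, length-sorted layout described in the docstring above):
#
#     import numpy as np
#     import chainer
#     from chainer import functions as F
#
#     K, B, T = 4, 3, 5  # number of labels, batch size, sequence length
#     xs = [np.random.uniform(-1, 1, (B, K)).astype('f') for _ in range(T)]
#     ys = [np.random.randint(0, K, size=B).astype('i') for _ in range(T)]
#     cost = chainer.Variable(np.random.uniform(-1, 1, (K, K)).astype('f'))
#     loss = F.crf1d(cost, xs, ys)  # scalar Variable: mean over the batch

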
def argmax_crf1d(cost, xs):
    """Computes a state that maximizes the joint probability of a given CRF.

    Args:
        cost (Variable): A :math:`K \\times K` matrix which holds the
            transition cost between two labels, where :math:`K` is the
            number of labels.
        xs (list of Variable): Input vector for each label.
            ``len(xs)`` denotes the length of the sequence, and each
            :class:`~chainer.Variable` holds a :math:`B \\times K` matrix,
            where :math:`B` is the mini-batch size and :math:`K` is the
            number of labels.
            Note that the :math:`B`\\ s of the variables are not necessarily
            the same, i.e., the function accepts input sequences with
            different lengths.

    Returns:
        tuple: A tuple of a :class:`~chainer.Variable` object ``s`` and a
        :class:`list` ``ps``.
        The shape of ``s`` is ``(B,)``, where ``B`` is the mini-batch size.
        The i-th element of ``s``, ``s[i]``, represents the score of the
        best label path for the i-th data.
        ``ps`` is a list of :class:`numpy.ndarray` or :class:`cupy.ndarray`,
        and denotes the label sequence that maximizes the joint probability.
        ``len(ps)`` is equal to ``len(xs)``, and the shape of each ``ps[i]``
        is the mini-batch size of the corresponding ``xs[i]``. That is,
        ``ps[i].shape == xs[i].shape[0:1]``.
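
    For example, a minimal decoding sketch (it assumes ``np``, ``chainer``
    and ``F`` are available as in the example of :func:`crf1d` above):

    >>> cost = chainer.Variable(
    ...     np.random.uniform(-1, 1, (3, 3)).astype('f'))
    >>> xs = [np.random.uniform(-1, 1, (2, 3)).astype('f')
    ...       for _ in range(4)]
    >>> score, path = F.argmax_crf1d(cost, xs)
    >>> score.shape
    (2,)
    >>> [p.shape for p in path] == [x.shape[0:1] for x in xs]
    True
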
"""
    # Viterbi forward pass: alpha[b, t] holds the score of the best label
    # path of the b-th sequence that ends with label t, and max_inds keeps
    # the argmax back-pointers for the backward pass. As in crf1d, the
    # alphas of sequences that end early are set aside (None marks steps
    # where no sequence ends).
    alpha = xs[0]
    alphas = []
    max_inds = []
    for x in xs[1:]:
        batch = x.shape[0]
        if alpha.shape[0] > batch:
            alpha, alpha_rest = split_axis.split_axis(alpha, [batch], axis=0)
            alphas.append(alpha_rest)
        else:
            alphas.append(None)
        b_alpha, b_cost = broadcast.broadcast(alpha[..., None], cost)
        scores = b_alpha + b_cost
        max_ind = minmax.argmax(scores, axis=1)
        max_inds.append(max_ind)
        alpha = minmax.max(scores, axis=1) + x

    # Backward pass: start from the best final labels and follow the
    # back-pointers, re-attaching the sequences that ended early.
    inds = minmax.argmax(alpha, axis=1)
    path = [inds.data]
    for m, a in zip(max_inds[::-1], alphas[::-1]):
        inds = select_item.select_item(m, inds)
        if a is not None:
            inds = concat.concat([inds, minmax.argmax(a, axis=1)], axis=0)
        path.append(inds.data)
    path.reverse()

    # Best-path score per sequence, again including early-ended sequences.
    score = minmax.max(alpha, axis=1)
    for a in alphas[::-1]:
        if a is None:
            continue
        score = concat.concat([score, minmax.max(a, axis=1)], axis=0)

    return score, path
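

# Sanity-check sketch (illustrative only): on a tiny problem, the score
# returned by ``argmax_crf1d`` must match a brute-force maximum over all
# K**T label paths of ``sum_i x_i[y_i] + sum_i cost[y_i, y_{i+1}]``:
#
#     import itertools
#     import numpy as np
#     from chainer import functions as F
#
#     K, T = 3, 4
#     xs = [np.random.uniform(-1, 1, (1, K)).astype('f') for _ in range(T)]
#     cost = np.random.uniform(-1, 1, (K, K)).astype('f')
#     score, path = F.argmax_crf1d(cost, xs)
#
#     def path_score(p):
#         return (sum(float(xs[i][0, p[i]]) for i in range(T))
#                 + sum(float(cost[p[i], p[i + 1]]) for i in range(T - 1)))
#
#     best = max(path_score(p)
#                for p in itertools.product(range(K), repeat=T))
#     assert np.isclose(float(score.data[0]), best)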