filters.py
"""
This file contains the DAN and function to construct the neural networks
"""
import torch
from torch import nn
from torch.distributions.multivariate_normal import MultivariateNormal as Mvn
import numpy as np
class DAN(nn.Module):
"""
A Data Assimilation Network class
"""
def __init__(self, a_kwargs, b_kwargs, c_kwargs):
nn.Module.__init__(self)
self.a = Constructor(**a_kwargs)
self.b = Constructor(**b_kwargs)
self.c = Constructor(**c_kwargs)
self.scores = {
"RMSE_b": [],
"RMSE_a": [],
"LOGPDF_b": [],
"LOGPDF_a": [],
"LOSS": []}
    def forward(self, ha, x, y):  # the basic assimilation routine
"""
forward pass in the DAN
"""
        # propagate the past memory into the prior memory
        background_t = self.b.forward(ha)
        # translate the prior memory into the prior pdf
        pdf_b = self.c.forward(background_t)  # (mean, covariance) pair for b
        # analyze the prior memory given the observation y
        analysis_t = self.a.forward(torch.cat((background_t, y), dim=1))  # h_(t+1)^a
        # translate the posterior memory into the posterior pdf
        pdf_a = self.c.forward(analysis_t)  # (mean, covariance) pair for a
        ### LOSS ###
        # Monte-Carlo estimate of the loss at time t
        # (the expected negative log-likelihood is an integral)
        logpdf_b = -torch.mean(pdf_b.log_prob(x), dim=0)
        logpdf_a = -torch.mean(pdf_a.log_prob(x), dim=0)
        loss = logpdf_a + logpdf_b
        # Compute scores (monitoring only, gradients are not needed)
        with torch.no_grad():
            self.scores["RMSE_b"].append(torch.mean(torch.norm(
                pdf_b.mean - x, dim=1)*x.size(1)**-.5).item())
            self.scores["RMSE_a"].append(torch.mean(torch.norm(
                pdf_a.mean - x, dim=1)*x.size(1)**-.5).item())
            self.scores["LOGPDF_b"].append(logpdf_b.item())
            self.scores["LOGPDF_a"].append(logpdf_a.item())
            self.scores["LOSS"].append(loss.item())
return loss, analysis_t
def clear_scores(self):
""" clear the score lists
"""
for v in self.scores.values():
v.clear()
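# Minimal usage sketch (the kwargs and shapes below are hypothetical, they
# depend on the experiment configuration): one assimilation cycle maps the
# previous analysis memory ha, the truth x and the observation y to a loss
# and the next memory:
#
#   dan = DAN(a_kwargs, b_kwargs, c_kwargs)
#   loss, ha = dan(ha, x, y)  # ha: (mb, h_dim), x: (mb, x_dim), y: (mb, y_dim)
#   loss.backward()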
class Id(nn.Module):
    """ A simple identity function
    """
def __init__(self):
nn.Module.__init__(self)
def forward(self, x):
""" trivial
"""
return x
class Cst(nn.Module):
    """ A constant scale_vec, broadcast over the mini-batch
    """
    def __init__(self, init, dim=None):
        nn.Module.__init__(self)
        if isinstance(init, torch.Tensor):
            self.c = init.unsqueeze(0)  # shape (1, dim)
        else:
            raise TypeError("Cst init must be a torch.Tensor")
    def forward(self, x):
        # expand the constant to the mini-batch size of x
        return self.c.expand(x.size(0), self.c.size(1))
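# Minimal sketch: a constant vector (e.g. a fixed log-std) broadcast over
# whatever mini-batch it is given:
#
#   cst = Cst(init=torch.zeros(3))
#   cst(torch.randn(7, 5)).shape  # -> torch.Size([7, 3])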
class Lin2d(nn.Module):
    """ A linear toy model: N successive small 2-d rotations
    """
    def __init__(self, x_dim, N, dt, init, window=None):
        nn.Module.__init__(self)
        # rotation matrix M of angle pi/100
        # (dt, init and window are unused, kept for interface compatibility)
        deg = torch.tensor(np.pi/100)
        self.M = torch.tensor([[torch.cos(deg), torch.sin(deg)],
                               [-torch.sin(deg), torch.cos(deg)]])
        self.x_dim = x_dim
        self.N = N
    def forward(self, x):
        # apply the rotation N times: x <- x M^T
        for _ in range(self.N):
            x = torch.matmul(x, torch.transpose(self.M, 0, 1))
        return x
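# Minimal sketch (assumed shapes): rotating a mini-batch of 2-d states,
#
#   lin = Lin2d(x_dim=2, N=10, dt=None, init=None)
#   x_next = lin(torch.randn(5, 2))  # each row rotated N times by pi/100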
class EDO(nn.Module):
    """ Integrates an ODE with the fourth-order Runge-Kutta (RK4) scheme
    """
def __init__(self, x_dim, N, dt, init,
window=None):
nn.Module.__init__(self)
self.x_dim = x_dim
self.N = N
self.dt = dt
        if init == "95":
            """ Lorenz95 (96) initialization:
            dx_k/dt = (x_{k+1} - x_{k-2})*x_{k-1} - x_k + 8
            """
            self.window = (-2, -1, 0, 1)
            self.diameter = 4
            # quadratic term: v^T A v = (x_{k+1} - x_{k-2})*x_{k-1}
            self.A = torch.tensor([[[0., 0., 0., 0.],
                                    [-1., 0., 0., 0.],
                                    [0., 0., 0., 0.],
                                    [0., 1., 0., 0.]]])
            # affine term: b.v + c = -x_k + 8
            self.b = torch.tensor([[0., 0., -1., 0.]])
            self.c = torch.tensor([8.])
        else:
            raise ValueError("EDO init not available")
    def edo(self, x):
        """ Lorenz-96 tendency
        input x: (mb, x_dim)
        output dx/dt: (mb, x_dim)
        """
        # stack the shifted states: each site k sees
        # v = (x_{k-2}, x_{k-1}, x_k, x_{k+1})
        v = torch.cat([torch.roll(x.unsqueeze(1), -i, 2) for i in self.window], 1)
        v = torch.transpose(v, 1, 2)
        # merge batch and space dims: one bilinear/linear input per site
        v_reshaped = v.reshape(-1, self.diameter)
        # tendency = quadratic term v^T A v + affine term b.v + c
        dx = torch.nn.functional.bilinear(v_reshaped, v_reshaped, self.A)\
            + torch.nn.functional.linear(v_reshaped, self.b, self.c)
        return dx.view(x.size(0), x.size(1))
    def forward(self, x):
        # N steps of the classical RK4 integration scheme
        for _ in range(self.N):
k1 = self.edo(x)
k2 = self.edo(x + 0.5*self.dt*k1)
k3 = self.edo(x + 0.5*self.dt*k2)
k4 = self.edo(x + self.dt*k3)
x = x + (self.dt/6.0)*(k1 + 2.0*k2 + 2.0*k3 + k4)
return x
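# Minimal usage sketch (assumed state dimension and step size): advancing
# a mini-batch of Lorenz-96 states by N RK4 steps of length dt,
#
#   model = EDO(x_dim=40, N=5, dt=0.01, init="95")
#   x0 = 8. + torch.randn(16, 40)  # mini-batch near the forcing value
#   x1 = model(x0)                 # states advanced by N*dt time units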
class FullyConnected(nn.Module):
""" Fully connected NN ending with a linear layer
"""
def __init__(self, layers, activation_classname):
nn.Module.__init__(self)
        n = len(layers)
        # n-1 linear layers but only n-2 activations:
        # the output layer stays linear
        self.lins = nn.ModuleList(
            [nn.Linear(d0, d1) for
             d0, d1 in zip(layers[:-1], layers[1:])])
        self.acts = nn.ModuleList(
            [eval(activation_classname)() for _ in range(n-2)])
def forward(self, h):
for lin, act in zip(self.lins[:-1], self.acts):
h = act(lin(h))
return self.lins[-1](h)
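# Minimal sketch (assumed layer sizes): a two-layer MLP with linear output,
#
#   mlp = FullyConnected([8, 16, 4], "nn.ReLU")
#   out = mlp(torch.randn(2, 8))  # shape (2, 4)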
class FcZero(nn.Module):
    """
    Fully connected neural network with the ReZero trick
    """
    def __init__(self, dim, deep, activation_classname):
        """
        dim: dimension of every layer
        deep: number of residual blocks
        """
        nn.Module.__init__(self)
        layers = (deep+1)*[dim]
        self.lins = nn.ModuleList(
            [nn.Linear(d0, d1) for
             d0, d1 in zip(layers[:-1], layers[1:])])
        self.acts = nn.ModuleList(
            [eval(activation_classname)() for _ in range(deep)])
        # the alphas must be registered as parameters (in a ParameterList)
        # so that the loss gradient is computed with respect to them
        self.alphas = nn.ParameterList(
            [nn.Parameter(torch.Tensor([0.])) for _ in range(deep)])
    def forward(self, h):
        # ReZero residual blocks: h <- h + alpha*act(lin(h));
        # with alpha initialized at 0, the network starts as the identity
        for lin, act, alpha in zip(self.lins, self.acts, self.alphas):
            h = h + alpha*act(lin(h))
        return h
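# Minimal sketch: since the alphas start at zero, an FcZero network is
# exactly the identity at initialization,
#
#   net = FcZero(dim=4, deep=3, activation_classname="nn.Tanh")
#   x = torch.randn(2, 4)
#   assert torch.allclose(net(x), x)  # holds at initialization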
class FcZeroLin(nn.Module):
    """
    FcZero network ending with a linear layer
    """
    def __init__(self, in_dim, out_dim, deep, activation_classname):
        """
        in_dim: input (and hidden) dimension
        out_dim: output dimension, at most in_dim
        deep: number of ReZero blocks
        """
        nn.Module.__init__(self)
        self.fcZero = FcZero(in_dim, deep, activation_classname)
        self.out_dim = out_dim
        assert out_dim <= in_dim
        self.lin = FullyConnected([in_dim, out_dim], activation_classname)
def forward(self, h):
h = self.fcZero(h)
h = self.lin(h)
return h
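# Minimal sketch (assumed dimensions): projecting a ReZero trunk down to
# a smaller output,
#
#   net = FcZeroLin(in_dim=6, out_dim=2, deep=2, activation_classname="nn.Tanh")
#   net(torch.randn(3, 6)).shape  # -> torch.Size([3, 2])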
class Gaussian(Mvn):
    """
    Return a pytorch Gaussian pdf from args
    args is either a (loc, scale_tril) or a (x_dim, vec)
    """
    def __init__(self, *args):
        # clamping bounds for the log standard deviations (stability)
        self.stab_a = torch.Tensor([-8.0])
        self.stab_b = torch.Tensor([8.0])
        if isinstance(args[0], int):
            """args is a (x_dim, vec)
            loc is the first x_dim coeff of vec
            if the rest is one coeff c then
                scale_tril = e^c*I
            else
                scale_tril is filled diagonal by diagonal
                starting by the main one
                (which is exponentiated to ensure strict positivity)
            """
            x_dim, vec = args
            vec_dim = vec.size(-1)
            if vec_dim == x_dim + 1:
                # Mvn with scale_tril = e^c*I
                loc = vec[:, :x_dim]
                scale_tril = torch.eye(x_dim)\
                    .unsqueeze(0)\
                    .expand(vec.size(0), -1, -1)
                scale_tril = torch.exp(vec[:, x_dim])\
                    .view(vec.size(0), 1, 1)*scale_tril
            else:
                # Mvn from a mean and a full lower-triangular scale_tril
                # vec.size(0) is the mini-batch size
                mb = vec.size(0)
                loc = vec[:, :x_dim]
                # indices telling which coeffs of vec go into the
                # lower-triangular factor of the covariance matrix
                inds = self.vec_to_inds(x_dim, vec_dim)
                scale_tril = torch.eye(x_dim).unsqueeze(0).expand(mb, -1, -1)
                # exponentiate the (clamped) main diagonal
                # to ensure strict positivity
                scale_tril = torch.exp(
                    torch.max(self.stab_a,
                              torch.min(vec[:, x_dim:2*x_dim], self.stab_b))
                ).view(mb, 1, x_dim).mul(scale_tril)
                # fill the strictly lower part, diagonal by diagonal
                scale_tril[:, inds[0][x_dim:], inds[1][x_dim:]] = vec[:, 2*x_dim:]
            Mvn.__init__(self, loc=loc, scale_tril=scale_tril)
        else:
            """args is a loc, scale_tril
            """
            print('Init Mvn by full arg')
            Mvn.__init__(self, loc=args[0], scale_tril=args[1])
    def vec_to_inds(self, x_dim, vec_dim):
        """Computes the indices of scale_tril coeffs,
        scale_tril is filled main diagonal first.
        scale_tril is the Cholesky factor of the covariance matrix;
        exponentiating its diagonal makes the covariance positive definite.
        x_dim: dimension of the random variable
        vec_dim: dimension of the vector containing
            the coeffs of loc and scale_tril
        """
ldiag, d, c = x_dim, 0, 0 # diag length, diag index, column index
inds = [[], []] # list of line and column indexes
for i in range(vec_dim - x_dim): # loop over the non-mean coeff
inds[0].append(c+d) # line index
inds[1].append(c) # column index
if c == ldiag-1: # the current diag end is reached
ldiag += -1 # the diag length is decremented
c = 0 # the column index is reinitialized
d += 1 # the diag index is incremented
            else:  # otherwise, only the column index is incremented
c += 1
return inds
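# Worked example: for x_dim = 3 with a full lower-triangular scale_tril,
# vec_dim = 3 + 6 = 9 and vec_to_inds(3, 9) returns
#   [[0, 1, 2, 1, 2, 2], [0, 1, 2, 0, 1, 0]]
# i.e. the main diagonal (0,0), (1,1), (2,2) first, then the first
# sub-diagonal (1,0), (2,1), and finally the corner (2,0).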
class Constructor(nn.Module):
    """Construct functions and conditional Gaussians from strings and kwargs
    (e.g. a Gaussian centered on the propagated or observed state for the
    propagation and observation steps, or a plain neural network)
    - scale_vec_classname is not None: return a Gaussian made from a vector,
      this vector is made of the concatenation of loc and scale_vec
    - scale_vec_classname is None:
        if gauss_dim is not None: return a Gaussian made from a vector
        else: return a vector
    """
def __init__(self, loc_classname, loc_kwargs,
gauss_dim=None,
scale_vec_classname=None, scale_vec_kwargs=None):
nn.Module.__init__(self)
self.gauss_dim = gauss_dim
self.loc = eval(loc_classname)(**loc_kwargs)
if scale_vec_classname is not None:
self.scale_vec =\
eval(scale_vec_classname)(**scale_vec_kwargs)
else:
self.scale_vec = None
    def forward(self, *args):
        lc = self.loc(*args)
        if self.gauss_dim is not None:
            if self.scale_vec is not None:
                sc = self.scale_vec(*args)
                return Gaussian(self.gauss_dim, torch.cat((lc, sc), dim=1))
            else:
                return Gaussian(self.gauss_dim, lc)
        else:
            return lc
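# Minimal usage sketch (hypothetical kwargs): a conditional Gaussian with
# identity mean and a constant isotropic spread, N(x, e^{2c} I),
#
#   c = Constructor(
#       loc_classname="Id", loc_kwargs={},
#       gauss_dim=2,
#       scale_vec_classname="Cst",
#       scale_vec_kwargs={"init": torch.zeros(1)})
#   pdf = c(torch.randn(5, 2))  # Mvn with scale_tril = e^0 * I
#   pdf.log_prob(torch.randn(5, 2))  # log-density per batch element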