forked from LongxingTan/Time-series-prediction
/
nbeats.py
66 lines (56 loc) · 2.45 KB
/
nbeats.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# -*- coding: utf-8 -*-
# @author: Longxing Tan, tanlongxing888@163.com
# @date: 2020-03
# paper: https://arxiv.org/abs/1905.10437
# other implementations: https://github.com/philipperemy/n-beats
# : https://github.com/ElementAI/N-BEATS
from collections import defaultdict
import tensorflow as tf
from deepts.layers.nbeats_layer import *
tf.config.experimental_run_functions_eagerly(True)
params=defaultdict(stack_types=['trend_block','seasonality_block'],
nb_blocks_per_stack=3,
hidden_layer_units=256,
thetas_dims=(4, 8),
share_weights_in_stack=False,
)
class NBeatsNet(object):
def __init__(self, custom_model_params):
params.update(custom_model_params)
self.stack_types=params['stack_types']
self.nb_blocks_per_stack=params['nb_blocks_per_stack']
self.hidden_layer_units=params['hidden_layer_units']
self.theta_dims=params['thetas_dims']
self.share_weights_in_stack=params['share_weights_in_stack']
self.block_type={
'trend_block':TrendBlock,
'seasonality_block':SeasonalityBlock,
'general':GenericBlock
}
self.stacks=[]
for stack_id in range(len(self.stack_types)):
self.stacks.append(self.create_stack(stack_id))
def __call__(self, x, predict_seq_length, training):
self.forecast_length = x.get_shape().as_list()[1]
self.backcast_length = predict_seq_length
forecast = tf.zeros([tf.shape(x)[0], self.forecast_length],dtype=tf.float32)
for stack_id in range(len(self.stacks)):
for block_id in range(len(self.stacks[stack_id])):
b, f = self.stacks[stack_id][block_id](backcast)
backcast = backcast - b
forecast = forecast + f
return forecast
def create_stack(self, stack_id):
stack_type=self.stack_types[stack_id]
blocks=[]
for block_id in range(self.nb_blocks_per_stack):
block_init=self.block_type[stack_type]
if self.share_weights_in_stack and block_id!=0:
block=blocks[-1]
else:
block=block_init(self.hidden_layer_units,
self.theta_dims[stack_id],
self.backcast_length,
self.forecast_length)
blocks.append(block)
return blocks