model.py
import torch
import torch.nn as nn
import torch.nn.functional as F


class Encoder(nn.Module):
    def __init__(self, input_size=37, hidden_size=16, num_layers=2):
        super(Encoder, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True,
                            dropout=0.1, bidirectional=False)

    def forward(self, x):
        # output: (batch_size, seq_length, hidden_size)
        # hidden, cell: (num_layers, batch_size, hidden_size)
        output, (hidden, cell) = self.lstm(x)
        return output, (hidden, cell)

class Decoder(nn.Module):
    def __init__(self, input_size=37, hidden_size=16, output_size=37, num_layers=2):
        super(Decoder, self).__init__()
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True,
                            dropout=0.1, bidirectional=False)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x, hidden):
        # output: (batch_size, seq_length, hidden_size)
        output, (hidden, cell) = self.lstm(x, hidden)
        # Map the hidden representation back to the output feature space.
        prediction = self.fc(output)
        return prediction, (hidden, cell)
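

# A minimal shape check, assuming the default sizes above: encode a random
# batch, then run a single autoregressive Decoder step. The helper name
# _demo_decoder_step is illustrative, not part of the model definition.
def _demo_decoder_step() -> None:
    enc, dec = Encoder(), Decoder()
    x = torch.randn(8, 20, 37)      # (batch, seq_length, features)
    _, state = enc(x)               # hidden/cell: (num_layers=2, 8, hidden=16)
    step = torch.zeros(8, 1, 37)    # zero "start" input for the decoder
    pred, state = dec(step, state)  # pred: (8, 1, 37) -- one reconstructed step
    assert pred.shape == (8, 1, 37)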


## LSTM Auto-Encoder
class LSTMAutoEncoder(nn.Module):
    def __init__(self,
                 input_dim: int,
                 hidden_dim: int,
                 attention: bool,
                 seq_length: int,
                 **kwargs) -> None:
        """
        :param input_dim: number of input variables (tags)
        :param hidden_dim: size of the final compressed (latent) dimension
        :param attention: if True, apply self-attention over the encoder outputs
        :param seq_length: length of the input sequences
        :param kwargs: optional extras, e.g. num_layers (default 1; note that
            nn.LSTM's dropout only takes effect with more than one layer)
        """
        super(LSTMAutoEncoder, self).__init__()
        self.hidden_dim = hidden_dim
        self.input_dim = input_dim
        self.attention = attention
        self.seq_length = seq_length
        num_layers = kwargs.pop("num_layers", 1)
        self.encoder = Encoder(
            input_size=input_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
        )
        self.reconstruct_decoder = Decoder(
            input_size=input_dim,
            output_size=input_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
        )
        if self.attention:
            # Query/key/value projections for self-attention over encoder outputs.
            self.Qw = nn.Linear(self.hidden_dim, self.hidden_dim, bias=False)
            self.Kw = nn.Linear(self.hidden_dim, self.hidden_dim, bias=False)
            self.Vw = nn.Linear(self.hidden_dim, self.hidden_dim, bias=False)
            # Projects the time axis (seq_length) down to num_layers so the
            # attended summary can be added to the encoder's final hidden state.
            self.project = nn.Linear(self.seq_length, num_layers, bias=False)
    def forward(self, x):
        batch_size, sequence_length, var_length = x.size()

        # Encode the full sequence.
        output, encoder_hidden = self.encoder(x)

        if self.attention:
            hidden, cell = encoder_hidden
            # Dot-product self-attention over the encoder outputs
            # (no 1/sqrt(d) scaling in this variant).
            query = self.Qw(output)
            key = self.Kw(output)
            value = self.Vw(output)
            attention_map = torch.matmul(query, key.transpose(2, 1))   # (B, T, T)
            attention_map = F.softmax(attention_map, dim=-1)
            attention_map = torch.matmul(attention_map, value)         # (B, T, H)
            # Collapse the time axis to num_layers and add the result to the
            # encoder's final hidden state.
            project_mat = self.project(attention_map.transpose(2, 1))  # (B, H, num_layers)
            new_hidden = project_mat.permute(2, 0, 1) + hidden         # (num_layers, B, H)
            new_hidden = new_hidden.contiguous()
            hidden = (new_hidden, cell)
        else:
            hidden = encoder_hidden

        # Reconstruct the sequence autoregressively, starting from a zero input;
        # each step feeds the previous prediction back in.
        temp_input = torch.zeros((batch_size, 1, var_length), dtype=torch.float).to(x.device)
        reconstruct_output = []
        for t in range(sequence_length):
            temp_input, hidden = self.reconstruct_decoder(temp_input, hidden)
            reconstruct_output.append(temp_input)
        reconstruct_output = torch.cat(reconstruct_output, dim=1)  # (B, T, input_dim)
        return reconstruct_output
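

if __name__ == "__main__":
    # Smoke test with illustrative sizes: reconstruct a random batch and report
    # the MSE. seq_length must match the time dimension of x because of the
    # attention projection layer.
    model = LSTMAutoEncoder(input_dim=37, hidden_dim=16, attention=True,
                            seq_length=20, num_layers=2)
    x = torch.randn(4, 20, 37)  # (batch, seq_length, features)
    recon = model(x)            # (4, 20, 37)
    print(recon.shape, F.mse_loss(recon, x).item())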