/
linear.py
102 lines (80 loc) · 3.71 KB
/
linear.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
from __future__ import absolute_import, division, print_function
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
# Resolve the default tensor constructors once at import time: CUDA-backed
# types when a GPU is usable, plain CPU types otherwise.
use_cuda = torch.cuda.is_available()
if use_cuda:
    FloatTensor = torch.cuda.FloatTensor
    LongTensor = torch.cuda.LongTensor
    ByteTensor = torch.cuda.ByteTensor
else:
    FloatTensor = torch.FloatTensor
    LongTensor = torch.LongTensor
    ByteTensor = torch.ByteTensor
# Generic alias: the default tensor type is the float one.
Tensor = FloatTensor
class EnvelopeLinearCQN(torch.nn.Module):
    '''
    Linear Controllable Q-Network, Envelope Version.

    A stack of five fully connected layers. The input is a state vector
    concatenated with a preference vector; the output is reshaped to one
    ``reward_size``-dimensional Q-vector per action. ``forward`` also
    returns the envelope-selected Q-vector computed by ``H``.
    '''

    def __init__(self, state_size, action_size, reward_size):
        '''
        Build the layers.

        Args:
            state_size: length of a (flattened) state vector.
            action_size: number of discrete actions.
            reward_size: length of the reward / preference vector.
        '''
        super(EnvelopeLinearCQN, self).__init__()

        self.state_size = state_size
        self.action_size = action_size
        self.reward_size = reward_size

        # S x A -> (W -> R^n)  is realised as  S x W -> (A -> R^n):
        # the preference is an input, and every action gets a vector output.
        in_size = state_size + reward_size
        self.affine1 = nn.Linear(in_size, in_size * 16)
        self.affine2 = nn.Linear(in_size * 16, in_size * 32)
        self.affine3 = nn.Linear(in_size * 32, in_size * 64)
        self.affine4 = nn.Linear(in_size * 64, in_size * 32)
        self.affine5 = nn.Linear(in_size * 32, action_size * reward_size)

    def _envelope_select(self, candidates, w_ext, n_cand):
        '''
        Pick the best candidate out of every group of ``n_cand`` rows.

        Args:
            candidates: (groups * n_cand, reward_size) candidate Q-vectors.
            w_ext: (groups * n_cand, reward_size) preference paired with
                each candidate row.
            n_cand: number of consecutive rows forming one group.

        Returns:
            (groups, reward_size) tensor holding, for each group, the
            candidate with the largest inner product with its preference.
        '''
        # Row-wise inner products <candidate, preference>.
        prod = torch.bmm(candidates.unsqueeze(1), w_ext.unsqueeze(2))
        prod = prod.view(-1, n_cand)
        inds = prod.max(1)[1]
        # Index the winner directly instead of building a byte mask for
        # masked_select; this stays on the input's device and avoids the
        # deprecated uint8-mask / Variable path.
        rows = torch.arange(inds.size(0), device=inds.device)
        return candidates.view(-1, n_cand, self.reward_size)[rows, inds]

    def H(self, Q, w, s_num, w_num):
        '''
        Envelope operator: maximise over actions and over preferences.

        Rows ``i, i + s_num, i + 2 * s_num, ...`` of the batch are treated
        as belonging to the same state. For every batch row, the candidate
        set is the Q-vectors of all actions from all ``w_num`` rows of that
        state, and the candidate with the largest inner product with the
        row's own preference is returned.

        Args:
            Q: tensor viewable as
                (s_num * w_num, action_size * reward_size).
            w: preferences, shape (s_num * w_num, reward_size).
            s_num: number of row groups (states) in the batch.
            w_num: number of rows (preferences) per state.

        Returns:
            (s_num * w_num, reward_size) tensor, in the row order of ``w``.
        '''
        n_cand = self.action_size * w_num

        # Reorder the batch so the w_num rows of each state are adjacent.
        order = torch.cat(
            [torch.arange(i, s_num * w_num + i, s_num, device=Q.device)
             for i in range(s_num)])
        reQ = Q.view(-1, self.action_size * self.reward_size)[order]
        reQ = reQ.view(-1, self.reward_size)

        # Extend the Q batch and the preference batch so that every row of
        # ``w`` is paired with all n_cand candidates of its state.
        reQ_ext = reQ.repeat(w_num, 1)
        w_ext = w.unsqueeze(1).repeat(1, n_cand, 1)
        w_ext = w_ext.view(-1, self.reward_size)

        return self._envelope_select(reQ_ext, w_ext, n_cand)

    def H_(self, Q, w, s_num, w_num):
        '''
        Per-row operator: maximise over actions only.

        For every batch row, returns the action's Q-vector with the largest
        inner product with that row's preference.

        Args:
            Q: tensor viewable as (batch * action_size, reward_size).
            w: preferences, shape (batch, reward_size).
            s_num: unused; kept so the signature parallels ``H``.
            w_num: unused; kept so the signature parallels ``H``.

        Returns:
            (batch, reward_size) tensor.
        '''
        reQ = Q.view(-1, self.reward_size)

        # Extend the preference batch: one copy per action. The row width
        # is reward_size (it used to be hard-coded to 2).
        w_ext = w.unsqueeze(1).repeat(1, self.action_size, 1)
        w_ext = w_ext.view(-1, self.reward_size)

        return self._envelope_select(reQ, w_ext, self.action_size)

    def forward(self, state, preference, w_num=1):
        '''
        Compute Q-vectors and their envelope selection.

        Args:
            state: (batch, state_size) tensor.
            preference: (batch, reward_size) tensor.
            w_num: number of preference rows per state in the batch; the
                batch size must be a multiple of it.

        Returns:
            Tuple ``(hq, q)``: ``q`` has shape
            (batch, action_size, reward_size); ``hq`` has shape
            (batch, reward_size) and is computed by ``H`` from a detached
            copy of ``q``, so no gradient flows through it.

        Raises:
            ValueError: if the batch size is not a multiple of ``w_num``.
        '''
        batch = preference.size(0)
        if batch % w_num != 0:
            raise ValueError(
                'batch size {} is not a multiple of w_num {}'.format(
                    batch, w_num))
        s_num = batch // w_num

        x = torch.cat((state, preference), dim=1)
        x = x.view(x.size(0), -1)
        x = F.relu(self.affine1(x))
        x = F.relu(self.affine2(x))
        x = F.relu(self.affine3(x))
        x = F.relu(self.affine4(x))
        q = self.affine5(x)

        q = q.view(q.size(0), self.action_size, self.reward_size)

        hq = self.H(q.detach().view(-1, self.reward_size),
                    preference, s_num, w_num)

        return hq, q