# standard_autoencoder.py
import numpy as np
from multiphenotype_utils import (get_continuous_features_as_matrix, add_id, remove_id_and_get_mat,
                                  partition_dataframe_into_binary_and_continuous, divide_idxs_into_batches)
import pandas as pd
import tensorflow as tf
from dimreducer import DimReducer
from copy import deepcopy
from general_autoencoder import GeneralAutoencoder

class StandardAutoencoder(GeneralAutoencoder):
    """
    Implements a standard, deterministic feed-forward autoencoder.
    """
    def __init__(self,
                 encoder_layer_sizes,
                 decoder_layer_sizes,
                 learn_continuous_variance=True,
                 **kwargs):
        super(StandardAutoencoder, self).__init__(**kwargs)
        # encoder_layer_sizes does not include input_dim, but does include the last hidden layer.
        self.encoder_layer_sizes = deepcopy(encoder_layer_sizes)  # deepcopy so we don't accidentally modify the original data.
        self.k = self.encoder_layer_sizes[-1]
        self.decoder_layer_sizes = deepcopy(decoder_layer_sizes)
        self.learn_continuous_variance = learn_continuous_variance
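
    # Usage sketch (illustrative, not from this file): the layer-size lists describe hidden
    # layers only, so the call below would build an encoder with one hidden layer of 64 units
    # and a 10-dimensional embedding, plus a decoder hidden layer of 64 units that
    # init_network() extends with a final layer mapping back to the full feature dimension.
    # Remaining constructor options are defined by GeneralAutoencoder and are not shown here.
    #
    #     model = StandardAutoencoder(encoder_layer_sizes=[64, 10],
    #                                 decoder_layer_sizes=[64],
    #                                 learn_continuous_variance=True,
    #                                 ...)  # additional GeneralAutoencoder kwargs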

    def init_network(self):
        if self.learn_continuous_variance:
            # We store the log of the variance and exponentiate it later, since the variance
            # has to be non-negative.
            self.log_continuous_variance = tf.Variable(self.initialization_function([1]))

        self.weights = {}
        self.biases = {}

        # Encoder layers.
        for encoder_layer_idx, encoder_layer_size in enumerate(self.encoder_layer_sizes):
            if encoder_layer_idx == 0:
                # If we include age in the encoder input, we need one extra input feature.
                input_dim = len(self.feature_names) + self.include_age_in_encoder_input
            else:
                input_dim = self.encoder_layer_sizes[encoder_layer_idx - 1]
            output_dim = self.encoder_layer_sizes[encoder_layer_idx]
            print("Added encoder layer with input dimension %i and output dimension %i" % (input_dim, output_dim))
            self.weights['encoder_h%i' % encoder_layer_idx] = tf.Variable(
                self.initialization_function([input_dim, output_dim]))
            self.biases['encoder_b%i' % encoder_layer_idx] = tf.Variable(
                self.initialization_function([output_dim]))

        # Decoder layers. The final decoder layer maps back to the full feature dimension.
        self.decoder_layer_sizes.append(len(self.feature_names))
        for decoder_layer_idx, decoder_layer_size in enumerate(self.decoder_layer_sizes):
            if decoder_layer_idx == 0:
                input_dim = self.k
            else:
                input_dim = self.decoder_layer_sizes[decoder_layer_idx - 1]
            output_dim = self.decoder_layer_sizes[decoder_layer_idx]
            print("Added decoder layer with input dimension %i and output dimension %i" % (input_dim, output_dim))
            self.weights['decoder_h%i' % decoder_layer_idx] = tf.Variable(
                self.initialization_function([input_dim, output_dim]))
            self.biases['decoder_b%i' % decoder_layer_idx] = tf.Variable(
                self.initialization_function([output_dim]))
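
    # For example (illustrative numbers, not from this file): with 30 features, age excluded
    # from the encoder input, encoder_layer_sizes=[64, 10] and decoder_layer_sizes=[64],
    # init_network() would create parameters with shapes
    #     encoder_h0: [30, 64], encoder_h1: [64, 10]
    #     decoder_h0: [10, 64], decoder_h1: [64, 30]
    # since the decoder list is extended with len(self.feature_names) as its final output size.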

    def get_setter_ops(self):
        self.weights_placeholders = {}
        self.weights_setters = {}
        for key in self.weights:
            self.weights_placeholders[key] = tf.placeholder(
                tf.float32,
                shape=self.weights[key].shape)
            self.weights_setters[key] = tf.assign(
                self.weights[key],
                self.weights_placeholders[key],
                validate_shape=True)

        self.biases_placeholders = {}
        self.biases_setters = {}
        for key in self.biases:
            self.biases_placeholders[key] = tf.placeholder(
                tf.float32,
                shape=self.biases[key].shape)
            self.biases_setters[key] = tf.assign(
                self.biases[key],
                self.biases_placeholders[key],
                validate_shape=True)
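
    # Design note (added commentary): building one tf.assign op per variable up front, fed
    # through a placeholder, lets assign_weights_and_biases() overwrite parameters repeatedly
    # without adding new ops to the TensorFlow 1.x graph on every call.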

    def assign_weights_and_biases(self, weights, biases):
        """
        weights and biases are dicts that can be partially defined.
        """
        for key in weights:
            assert key in self.weights
            self.sess.run(
                self.weights_setters[key],
                feed_dict={self.weights_placeholders[key]: weights[key]})
        for key in biases:
            assert key in self.biases
            self.sess.run(
                self.biases_setters[key],
                feed_dict={self.biases_placeholders[key]: biases[key]})
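
    # Usage sketch (illustrative): the dicts may cover only a subset of parameters, e.g.
    #
    #     model.assign_weights_and_biases(
    #         weights={'encoder_h0': pretrained_W0},  # pretrained_W0: hypothetical numpy array
    #         biases={})                              # leave all biases unchanged
    #
    # Keys must match entries created in init_network(); anything omitted keeps its current value.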

    def encode(self, X):
        num_layers = len(self.encoder_layer_sizes)
        Z = X
        for idx in range(num_layers):
            Z = (tf.matmul(Z, self.weights['encoder_h%i' % idx])
                 + self.biases['encoder_b%i' % idx])
            # No non-linearity on the last layer.
            if idx != num_layers - 1:
                Z = self.non_linearity(Z)
        return Z

    def decode(self, Z):
        num_layers = len(self.decoder_layer_sizes)
        X_with_logits = Z
        for idx in range(num_layers):
            X_with_logits = (tf.matmul(X_with_logits, self.weights['decoder_h%i' % idx])
                             + self.biases['decoder_b%i' % idx])
            # No non-linearity on the last layer.
            if idx != num_layers - 1:
                X_with_logits = self.non_linearity(X_with_logits)
        return X_with_logits

    def get_continuous_loss(self, X_continuous, Xr_continuous):
        # Given the true values X_continuous and the reconstructed values Xr_continuous,
        # returns the continuous loss.
        if len(self.continuous_feature_idxs) == 0:
            continuous_loss = tf.zeros(1)
        else:
            if self.learn_continuous_variance:
                # If we do not assume the variance is one, the continuous loss is the
                # negative Gaussian log-likelihood with all constant terms.
                continuous_variance = tf.exp(self.log_continuous_variance)
                continuous_loss = (
                    .5 * (
                        tf.reduce_mean(
                            tf.reduce_sum(
                                tf.square(X_continuous - Xr_continuous) / continuous_variance,
                                axis=1),
                            axis=0))
                    + .5 * (
                        self.log_continuous_variance +
                        tf.log(2 * np.pi))
                    * len(self.continuous_feature_idxs))
            else:
                # Otherwise, it is just a squared-error loss.
                continuous_loss = .5 * (
                    tf.reduce_mean(
                        tf.reduce_sum(
                            tf.square(X_continuous - Xr_continuous),
                            axis=1),
                        axis=0))
        return continuous_loss
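
    # The learned-variance branch above is the per-example negative log-likelihood of a
    # Gaussian with shared variance sigma^2 = exp(log_continuous_variance), averaged over
    # the batch:
    #     -log N(x | xr, sigma^2) = 0.5 * (x - xr)^2 / sigma^2 + 0.5 * log(sigma^2) + 0.5 * log(2*pi)
    # summed over the continuous features, which is why the constant term is multiplied by
    # len(self.continuous_feature_idxs). With sigma^2 fixed at 1, this reduces (up to an
    # additive constant) to the squared-error branch.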

    def get_binary_loss(self, X_binary, Xr_logits):
        # Given the true values X_binary and the reconstructed logits Xr_logits,
        # returns the binary loss.
        if len(self.binary_feature_idxs) == 0:
            binary_loss = tf.zeros(1)
        else:
            binary_loss = tf.reduce_mean(
                tf.reduce_sum(
                    tf.nn.sigmoid_cross_entropy_with_logits(
                        logits=Xr_logits,
                        labels=X_binary),
                    axis=1),
                axis=0)
            # Upweight the binary loss by the binary loss weighting.
            binary_loss = self.binary_loss_weighting * binary_loss
        return binary_loss
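
    # Note (added commentary): sigmoid_cross_entropy_with_logits is the Bernoulli negative
    # log-likelihood evaluated on logits; for a label y and logit z it computes
    #     -[y * log(sigmoid(z)) + (1 - y) * log(1 - sigmoid(z))].
    # Summing over binary features and averaging over the batch mirrors the continuous loss,
    # with binary_loss_weighting controlling the relative scale of the two terms.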

    def get_binary_and_continuous_loss(self, X, Xr):
        X_binary, X_continuous = self.split_into_binary_and_continuous(X)
        Xr_logits, Xr_continuous = self.split_into_binary_and_continuous(Xr)
        binary_loss = self.get_binary_loss(X_binary, Xr_logits)
        continuous_loss = self.get_continuous_loss(X_continuous, Xr_continuous)
        return binary_loss, continuous_loss