In [1]:
%load_ext autoreload
%autoreload 2

In [10]:
import glob
import os
import warnings
from collections import defaultdict
from utils.misc import *
warnings.filterwarnings('ignore')

In [19]:
import tensorflow as tf
from tensorflow.keras import Input
from tensorflow.keras.layers import Conv1D, Dense, Flatten

In [3]:
# Root directory of the data set, relative to the notebook's working directory.
DATA_PATH = '../../data/mimii-anomaly-detection'

# True  -> match two levels below DATA_PATH  (<db>/<machine type>)
# False -> match three levels below DATA_PATH (<db>/<machine type>/<machine id>)
MERGE_MACHINE_ID = True

# The pattern is built with literal '/' separators on purpose: a later cell
# recovers the path components with str.split('/').
file_paths = glob.glob(DATA_PATH + ('/*/*' if MERGE_MACHINE_ID else '/*/*/*'))
file_paths.sort()

In [4]:
# Work on the first matched directory only.
file_path = file_paths[0]
file_path_split = file_path.split('/')

# Suffix shared by the dataset files: '_<last component>_<second-to-last component>'.
suffix = '_{}_{}'.format(file_path_split[-1], file_path_split[-2])

if not MERGE_MACHINE_ID:
    # Three-level layout: the path ends with <db>/<machine type>/<machine id>,
    # and the db component is appended to the suffix as well.
    print('db: {}, machine type: {}, machine id: {}'.format(file_path_split[-3], file_path_split[-2], file_path_split[-1]))
    suffix = '_'.join([suffix, file_path_split[-3]])

else:
    # Two-level layout: the path ends with <db>/<machine type>.
    print('db: {}, machine type: {}'.format(file_path_split[-2], file_path_split[-1]))

db: 6dB, machine type: fan


In [5]:
# Maps each split name ('train_files', 'test_files', 'train_labels',
# 'test_labels') to the list of lines read from
# <DATA_PATH>/dataset/<split name><suffix>.txt.
dataset = defaultdict(list)

for key in ['train_files', 'test_files', 'train_labels', 'test_labels']:
    file_name = os.path.join(DATA_PATH, 'dataset', key + suffix + '.txt')
    with open(file_name, 'r') as f:
        # Strip only the line terminator. Slicing off the last character
        # unconditionally would truncate the final entry whenever the file
        # does not end with a newline. Using extend() also registers the key
        # (with an empty list) when the file itself is empty.
        dataset[key].extend(line.rstrip('\n') for line in f)

In [6]:
# Pickle file that sits next to the dataset text files and shares their suffix.
train_data_path = os.path.join(
    DATA_PATH,
    'dataset',
    ''.join(['train_data', suffix, '.pkl']),
)

# Load only when the file is present; otherwise `train_data` is not assigned
# by this cell.
# NOTE(review): `load_pickle` comes from the wildcard import of utils.misc and
# presumably unpickles the file - only use it on trusted data.
if os.path.exists(train_data_path):
    train_data = load_pickle(train_data_path)

In [20]:
# Hyper-parameters of the encoder sketch.
# NOTE(review): `n_mels` / `frames` presumably describe the mel-spectrogram
# feature layout (bands x stacked frames) - confirm against the feature code.
n_mels = 64
frames = 5
latent_dim = 2

# Each input sample is a sequence of n_mels * frames values with one channel.
input_dims = n_mels * frames
input_layer = Input(shape=(input_dims, 1))

# Two stride-2 1-D convolutions, flattened, followed by a linear layer that
# emits 2 * latent_dim values per sample.
h = Conv1D(32, 3, strides=2, activation='relu')(input_layer)
h = Conv1D(64, 3, strides=2, activation='relu')(h)
h = Flatten()(h)
h = Dense(units=2 * latent_dim)(h)

* https://www.tensorflow.org/tutorials/generative/cvae?hl=ko
* https://github.com/tensorflow/docs/blob/master/site/en/tutorials/generative/cvae.ipynb

In [8]:
class CVAE(tf.keras.Model):
  """Convolutional variational autoencoder.

  The encoder maps a (28, 28, 1) input to a vector of size 2 * latent_dim,
  which `encode` splits into a mean and a log-variance. The decoder maps a
  latent vector of size `latent_dim` back to a (28, 28, 1) tensor of logits.

  Args:
    latent_dim: size of the latent vector.
  """

  def __init__(self, latent_dim):
    super().__init__()
    self.latent_dim = latent_dim
    self.encoder = tf.keras.Sequential(
        [
            tf.keras.layers.InputLayer(input_shape=(28, 28, 1)),
            tf.keras.layers.Conv2D(
                filters=32, kernel_size=3, strides=(2, 2), activation='relu'),
            tf.keras.layers.Conv2D(
                filters=64, kernel_size=3, strides=(2, 2), activation='relu'),
            tf.keras.layers.Flatten(),
            # No activation: the output holds the mean and log-variance halves.
            tf.keras.layers.Dense(latent_dim + latent_dim),
        ]
    )

    self.decoder = tf.keras.Sequential(
        [
            tf.keras.layers.InputLayer(input_shape=(latent_dim,)),
            tf.keras.layers.Dense(units=7*7*32, activation=tf.nn.relu),
            tf.keras.layers.Reshape(target_shape=(7, 7, 32)),
            # Two stride-2 'same' transposed convolutions: 7 -> 14 -> 28.
            tf.keras.layers.Conv2DTranspose(
                filters=64, kernel_size=3, strides=2, padding='same',
                activation='relu'),
            tf.keras.layers.Conv2DTranspose(
                filters=32, kernel_size=3, strides=2, padding='same',
                activation='relu'),
            # No activation: the decoder emits logits (see `decode`).
            tf.keras.layers.Conv2DTranspose(
                filters=1, kernel_size=3, strides=1, padding='same'),
        ]
    )

  @tf.function
  def sample(self, eps=None):
    """Decodes latent vectors into sigmoid probabilities.

    Args:
      eps: latent vectors to decode; when None, 100 vectors are drawn from a
        standard normal distribution.

    Returns:
      The decoder output for `eps` with the sigmoid applied.
    """
    if eps is None:
      eps = tf.random.normal(shape=(100, self.latent_dim))
    return self.decode(eps, apply_sigmoid=True)

  def encode(self, x):
    """Runs the encoder and splits its output into (mean, logvar) along axis 1."""
    mean, logvar = tf.split(self.encoder(x), num_or_size_splits=2, axis=1)
    return mean, logvar

  def reparameterize(self, mean, logvar):
    """Draws z = mean + eps * exp(logvar / 2) with eps ~ N(0, I).

    The noise shape is taken from the runtime shape of `mean` so this also
    works inside a `tf.function` trace where the batch dimension is unknown
    (the static `mean.shape` would contain None there).
    """
    eps = tf.random.normal(shape=tf.shape(mean), dtype=mean.dtype)
    return eps * tf.exp(logvar * .5) + mean

  def decode(self, z, apply_sigmoid=False):
    """Runs the decoder on `z`.

    Args:
      z: latent vectors.
      apply_sigmoid: when True, return sigmoid probabilities instead of logits.

    Returns:
      Decoder logits, or their sigmoid when `apply_sigmoid` is True.
    """
    logits = self.decoder(z)
    if apply_sigmoid:
      probs = tf.sigmoid(logits)
      return probs
    return logits

In [9]:
# Adam optimizer with a learning rate of 1e-4.
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)


def log_normal_pdf(sample, mean, logvar, raxis=1):
  """Log-density of `sample` under a diagonal Gaussian, summed over `raxis`.

  Args:
    sample: values at which the density is evaluated.
    mean: mean of the Gaussian (broadcast against `sample`).
    logvar: log-variance of the Gaussian (broadcast against `sample`).
    raxis: axis (or axes) over which the per-element log-densities are summed.

  Returns:
    The sum over `raxis` of
    -0.5 * ((sample - mean)^2 * exp(-logvar) + logvar + log(2 * pi)).
  """
  # Local import: the original relied on a name `np` that this notebook never
  # imports explicitly. math.pi is the same double-precision constant.
  import math

  log2pi = tf.math.log(2. * math.pi)
  return tf.reduce_sum(
      -.5 * ((sample - mean) ** 2. * tf.exp(-logvar) + logvar + log2pi),
      axis=raxis)


def compute_loss(model, x):
  """Single-sample estimate of the negative ELBO for the batch `x`.

  Args:
    model: object providing `encode`, `reparameterize` and `decode`.
    x: input batch; also used as the labels of the sigmoid cross-entropy, with
      the batch dimension first.

  Returns:
    Scalar tensor: -mean(log p(x|z) + log p(z) - log q(z|x)) over the batch.
  """
  mean, logvar = model.encode(x)
  z = model.reparameterize(mean, logvar)
  x_logit = model.decode(z)
  cross_ent = tf.nn.sigmoid_cross_entropy_with_logits(logits=x_logit, labels=x)

  # Sum the reconstruction term over every non-batch axis. For rank-4 image
  # batches this is exactly [1, 2, 3]; inputs of any other rank work too.
  rank = cross_ent.shape.rank
  if rank is not None:
    reduce_axes = list(range(1, rank))
  else:
    # Rank unknown at trace time: compute the axes dynamically.
    reduce_axes = tf.range(1, tf.rank(cross_ent))
  logpx_z = -tf.reduce_sum(cross_ent, axis=reduce_axes)

  # Prior is a standard normal: mean 0 and log-variance 0.
  logpz = log_normal_pdf(z, 0., 0.)
  logqz_x = log_normal_pdf(z, mean, logvar)
  return -tf.reduce_mean(logpx_z + logpz - logqz_x)


@tf.function
def train_step(model, x, optimizer):
  """Executes one training step and returns the loss.

  This function computes the loss and gradients, and uses the latter to
  update the model's parameters.

  Args:
    model: model passed to `compute_loss`; its `trainable_variables` are
      updated in place.
    x: input batch passed to `compute_loss`.
    optimizer: optimizer whose `apply_gradients` performs the update.

  Returns:
    The loss computed for this batch, evaluated before the update is applied.
  """
  with tf.GradientTape() as tape:
    loss = compute_loss(model, x)
  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))
  # The docstring promises the loss; previously nothing was returned.
  return loss