In [None]:
# Seed both TensorFlow's and NumPy's random number generators with the same
# fixed value (1) so that repeated runs of the notebook are comparable.
from tensorflow import set_random_seed
from numpy.random import seed

set_random_seed(1)
seed(1)

# Examine Inputs

Let's load some local data and examine its contents. We'll play a small animation that contains the first few temporal frames of the input data below.

In [None]:
import numpy as np

# Load the input array from a local .npy file.
# X.shape = n_body_parts, n_time_intervals, n_dimensions (3)
X = np.load('data/npy/dance.npy')

# labels[i] is the name of the i-th body part (one entry per body part)
labels = [
  'AnnaC7.position',
  'AnnaRFSH.position',
  'AnnaRSHN.position',
  'AnnaLIWR.position',
  'AnnaRBWT.position',
  'AnnaLOHAND.position',
  'AnnaRFRM.position',
  'AnnaT10.position',
  'AnnaLMT5.position',
  'AnnaLKNI.position',
  'AnnaRIEL.position',
  'AnnaRBHD.position',
  'AnnaSolvingHips.position',
  'AnnaLBHD.position',
  'AnnaRIHAND.position',
  'AnnaLBWT.position',
  'AnnaRUPA.position',
  'AnnaLFRM.position',
  'AnnaRFHD.position',
  'AnnaRIWR.position',
  'AnnaLUPA.position',
  'AnnaLFHD.position',
  'AnnaRKNE.position',
  'AnnaLFWT.position',
  'AnnaLSHN.position',
  'AnnaRTOE.position',
  'AnnaLHEL.position',
  'AnnaRKNI.position',
  'AnnaCLAV.position',
  'AnnaRHEL.position',
  'AnnaMBWT.position',
  'AnnaRMT5.position',
  'AnnaARIEL.position',
  'AnnaRBSH.position',
  'AnnaLIHAND.position',
  'AnnaLMT1.position',
  'AnnaLTOE.position',
  'AnnaRFWT.position',
  'AnnaLabelingHips.position',
  'AnnaMFWT.position',
  'AnnaRANK.position',
  'AnnaLOWR.position',
  'AnnaLIEL.position',
  'AnnaROHAND.position',
  'AnnaRMT1.position',
  'AnnaRTHI.position',
  'AnnaLBSH.position',
  'AnnaRELB.position',
  'AnnaROWR.position',
  'AnnaLANK.position',
  'AnnaSTRN.position',
  'AnnaLELB.position',
  'AnnaLTHI.position',
  'AnnaLFSH.position',
  'AnnaLKNE.position',
]

In [None]:
import mpl_toolkits.mplot3d.axes3d as p3
from mpl_toolkits.mplot3d.art3d import juggle_axes
import matplotlib.pyplot as plt
from IPython.display import HTML
from matplotlib import animation
import matplotlib

# effectively remove matplotlib's cap on the size of animations embedded in the notebook.
# NOTE: `animation.embed_limit` is a size limit (in MB) on the embedded animation data,
# not a number of frames.
matplotlib.rcParams['animation.embed_limit'] = 2**128

def update_points(time, points, X):
  '''Animation callback: move the scatter `points` to the positions stored at frame `time`.

  `X` is indexed as X[:, time, k] with k = 0, 1, 2 supplying the three coordinates
  that are handed to `juggle_axes` (with 'z' as the z-direction).
  '''
  xs = X[:, time, 0]
  ys = X[:, time, 1]
  zs = X[:, time, 2]
  points._offsets3d = juggle_axes(xs, ys, zs, 'z')

def get_plot(X, lim=2, frames=200, duration=45):
  '''Build an animated 3D scatter plot of `X` and return it as a JavaScript/HTML string.

  Args:
    X: array indexed as X[:, t, k]; frame 0 provides the initial point positions
      and `update_points` reads X[:, t, 0..2] for every later frame.
    lim: each of the x, y and z axes spans [-lim, lim].
    frames: forwarded to `FuncAnimation` as its `frames` argument.
    duration: forwarded to `FuncAnimation` as `interval` (delay between frames).

  Returns:
    The string produced by `FuncAnimation.to_jshtml()`; wrap it in
    `IPython.display.HTML` to show it in a notebook.
  '''
  fig = plt.figure()
  try:
    ax = p3.Axes3D(fig)
    ax.set_xlim(-lim, lim)
    ax.set_ylim(-lim, lim)
    ax.set_zlim(-lim, lim)
    points = ax.scatter(X[:,0,0], X[:,0,1], X[:,0,2], depthshade=False) # x,y,z vals
    anim = animation.FuncAnimation(fig,
      update_points,
      frames,
      interval=duration,
      fargs=(points, X),
      blit=False
    )
    # to_jshtml() renders every frame into the returned string right away
    return anim.to_jshtml()
  finally:
    # The frames are already embedded in the HTML (or rendering failed), so the figure
    # is no longer needed. Closing it stops pyplot from keeping it alive across calls
    # and from showing it as an extra static plot next to the animation.
    plt.close(fig)

# Preview animation of the raw input, switched off by the constant-False guard.
# NOTE(review): the HTML object is created inside the `if` body and is not passed to
# display(), so it would presumably not be rendered even with the guard enabled — confirm.
if False:
  HTML(get_plot(X, frames=int(X.shape[1])))

In [None]:
# rescale each of the 3 coordinate dimensions to the range [0, 1] (min-max scaling,
# not centering): subtract the per-dimension minimum taken over axes 0 and 1, then
# divide by the per-dimension maximum of the shifted values. Both steps modify X in place.
X -= np.amin(X, axis=(0, 1))
X /= np.amax(X, axis=(0, 1))

In [None]:
# via https://github.com/cpmpercussion/keras-mdn-layer

'''
A Mixture Density Layer for Keras
cpmpercussion: Charles Martin (University of Oslo) 2018
https://github.com/cpmpercussion/keras-mdn-layer

Hat tip to [Omimo's Keras MDN layer](https://github.com/omimo/Keras-MDN) for a starting point for this code.
'''
import keras
from keras import backend as K
from keras.layers import Dense
from keras.engine.topology import Layer
import numpy as np
import tensorflow as tf
import tensorflow_probability as tfp
tfd = tfp.distributions

def elu_activation(x, plus_one=False):
  '''ELU activation shifted by a tiny constant (1e-8) to help prevent NaN in the loss.

  With `plus_one=True` the result is additionally shifted up by 1.
  '''
  activated = K.elu(x)
  if not plus_one:
    return (activated + 1e-8)
  # keep the additions in this order: the +1 is applied before the small epsilon
  return (activated + 1 + 1e-8)


class MDN(Layer):
  '''A Mixture Density Network Layer for Keras.
  This layer has a few tricks to avoid NaNs in the loss function when training:
    - Activation for variances is ELU + 1 + 1e-8 (to avoid very small values)
    - Mixture weights (pi) are trained in as logits, not in the softmax space.

  A loss function needs to be constructed with the same output dimension and number of mixtures.
  A sampling function is also provided to sample from distribution parametrised by the MDN outputs.

  The layer output is the concatenation, in this order, of:
    - num_mixtures * output_dimension means
    - num_mixtures * output_dimension sigmas
    - num_mixtures mixture-weight logits
  '''

  @staticmethod
  def _sigma_activation(x):
    '''Activation for the sigma outputs: ELU + 1 + 1e-8.

    Plain ELU can return values down to -1, so without the +1 shift the layer could
    emit zero or negative sigmas, which are not valid scale parameters for the
    loss or for sampling.
    '''
    return elu_activation(x, plus_one=True)

  def __init__(self, output_dimension, num_mixtures, **kwargs):
    '''Create the three Dense sub-layers (mus, sigmas, pi logits).

    Args:
      output_dimension: size of the vector modelled by each mixture component.
      num_mixtures: number of mixture components.
      **kwargs: forwarded to the Keras `Layer` constructor.
    '''
    self.output_dim = output_dimension
    self.num_mix = num_mixtures
    with tf.name_scope('MDN'):
      self.mdn_mus = Dense(self.num_mix * self.output_dim, name='mdn_mus') # mix*output vals, no activation
      # mix*output vals, ELU + 1 + 1e-8 activation so that every sigma is strictly positive
      self.mdn_sigmas = Dense(self.num_mix * self.output_dim, activation=MDN._sigma_activation, name='mdn_sigmas')
      self.mdn_pi = Dense(self.num_mix, name='mdn_pi') # mix vals, logits
    super(MDN, self).__init__(**kwargs)

  def build(self, input_shape):
    '''Build the three sub-layers and expose their weights as this layer's weights.'''
    self.mdn_mus.build(input_shape)
    self.mdn_sigmas.build(input_shape)
    self.mdn_pi.build(input_shape)
    self.trainable_weights = self.mdn_mus.trainable_weights + self.mdn_sigmas.trainable_weights + self.mdn_pi.trainable_weights
    self.non_trainable_weights = self.mdn_mus.non_trainable_weights + self.mdn_sigmas.non_trainable_weights + self.mdn_pi.non_trainable_weights
    super(MDN, self).build(input_shape)

  def call(self, x, mask=None):
    '''Apply the three sub-layers to `x` and concatenate their outputs (mus, sigmas, pi logits).'''
    with tf.name_scope('MDN'):
      mdn_out = keras.layers.concatenate([
        self.mdn_mus(x),
        self.mdn_sigmas(x),
        self.mdn_pi(x)
      ], name='mdn_outputs')
    return mdn_out

  def compute_output_shape(self, input_shape):
    '''Returns output shape, showing the number of mixture parameters.'''
    return (input_shape[0], (2 * self.output_dim * self.num_mix) + self.num_mix)

  def get_config(self):
    '''Return the base layer config extended with the two constructor arguments.'''
    config = {
      'output_dimension': self.output_dim,
      'num_mixtures': self.num_mix
    }
    base_config = super(MDN, self).get_config()
    return dict(list(base_config.items()) + list(config.items()))


def get_mixture_loss_func(output_dim, num_mixes):
  '''Build the negative log-likelihood loss for an MDN output.

  The returned function expects predictions laid out as
  [num_mixes * output_dim mus | num_mixes * output_dim sigmas | num_mixes pi logits]
  and targets with `output_dim` values per row.
  '''
  def loss_func(y_true, y_pred):
    # Flatten any leading dimensions (e.g. when wrapped in a TimeDistributed layer)
    n_params = (2 * num_mixes * output_dim) + num_mixes
    y_pred = tf.reshape(y_pred, [-1, n_params], name='reshape_ypreds')
    y_true = tf.reshape(y_true, [-1, output_dim], name='reshape_ytrue')
    # Separate the prediction into its three parameter groups
    group_sizes = [num_mixes * output_dim, num_mixes * output_dim, num_mixes]
    out_mu, out_sigma, out_pi = tf.split(
      y_pred, num_or_size_splits=group_sizes, axis=-1, name='mdn_coef_split')
    # One chunk of `output_dim` values per mixture component
    per_component = [output_dim for _ in range(num_mixes)]
    mus = tf.split(out_mu, num_or_size_splits=per_component, axis=1)
    sigs = tf.split(out_sigma, num_or_size_splits=per_component, axis=1)
    cat = tfd.Categorical(logits=out_pi)
    # One diagonal multivariate normal per component, built from its own mu and sigma
    components = []
    for mu, sig in zip(mus, sigs):
      components.append(tfd.MultivariateNormalDiag(loc=mu, scale_diag=sig))
    mixture = tfd.Mixture(cat=cat, components=components)
    # Mean negative log-likelihood of the targets under the mixture
    return tf.reduce_mean(tf.negative(mixture.log_prob(y_true)))

  # Hand back the closure
  with tf.name_scope('MDN'):
    return loss_func


def split_mixture_params(params, output_dim, num_mixes):
  '''Slice a flat sequence of mixture parameters into (mus, sigmas, pi logits).

  The first num_mixes*output_dim entries are the mus, the next
  num_mixes*output_dim entries are the sigmas, and the last num_mixes
  entries are the pi logits.
  '''
  mu_end = num_mixes * output_dim
  sigma_end = 2 * num_mixes * output_dim
  return params[:mu_end], params[mu_end:sigma_end], params[-num_mixes:]


def softmax(w, t=1.0):
  '''Temperature-adjusted softmax over a list or numpy array of logits.

  The logits are divided by `t`, shifted by their maximum (so the exponentials
  cannot overflow), exponentiated and normalised to sum to 1.
  '''
  scaled = np.array(w) / t
  shifted = scaled - scaled.max()
  weights = np.exp(shifted)
  return weights / np.sum(weights)


def sample_from_output(params, output_dim, num_mixes, temp=1.0):
  '''Sample from an MDN output with temperature adjustment.

  Args:
    params: 1-D numpy array laid out as
      [num_mixes*output_dim mus | num_mixes*output_dim sigmas | num_mixes pi logits].
    output_dim: number of values in each mixture component.
    num_mixes: number of mixture components.
    temp: temperature; it divides the pi logits before the softmax and multiplies
      the sigmas of the chosen component.

  Returns:
    Array of shape (1, output_dim) drawn from the selected Gaussian component.

  Raises:
    ValueError: if no mixture component could be selected (e.g. the pi logits
      contain NaN so the categorical sampler reports failure).
  '''
  mus, sigs, pi_logits = split_mixture_params(params, output_dim, num_mixes)
  pis = softmax(pi_logits, t=temp)
  m = sample_from_categorical(pis)
  # Alternative way to sample from categorical:
  # m = np.random.choice(range(len(pis)), p=pis)
  if m < 0:
    # sample_from_categorical signals failure with -1; using that as an index would
    # silently produce empty slices and an obscure shape error further down
    raise ValueError('Could not sample a mixture component from pis: %r' % (pis,))
  mus_vector = mus[m*output_dim:(m+1)*output_dim]
  sig_vector = sigs[m*output_dim:(m+1)*output_dim] * temp  # adjust for temperature
  # The sigmas are scale (standard-deviation-like) parameters: the loss feeds them to
  # MultivariateNormalDiag as `scale_diag`. np.random.multivariate_normal expects a
  # covariance matrix, i.e. the squared scale on the diagonal.
  cov_matrix = np.diag(np.square(sig_vector))
  sample = np.random.multivariate_normal(mus_vector, cov_matrix, 1)
  return sample


def sample_from_categorical(dist):
  '''Draw an index from a categorical distribution given as an array of probabilities.

  Walks the running sum of `dist` and returns the first index at which it
  reaches a uniform random threshold. Logs a message and returns -1 if the
  running sum never reaches the threshold.
  '''
  threshold = np.random.rand(1)  # uniform random number in [0,1]
  running_total = 0
  index = 0
  while index < dist.size:
    running_total += dist[index]
    if running_total >= threshold:
      return index
    index += 1
  tf.logging.info('Error sampling mixture model.')
  return -1

In [None]:
from keras.models import Sequential, Model
from keras.layers import Dense, LSTM, Dropout, Activation, CuDNNLSTM
from keras.layers.advanced_activations import LeakyReLU
from keras.losses import mean_squared_error
from keras.optimizers import Adam
from keras import backend as K
import keras, os

# config
look_back = 32 # number of previous time slices to use to predict the time positions at time `i`
lstm_cells = 64 # number of cells in each lstm layer
n_features = int(X.shape[0]) * int(X.shape[2]) # number of coordinate values to be predicted by each of `m` models
input_shape = (look_back, n_features) # shape of each input feature
use_mdn = True # whether to use the MDN final layer or not
m = 2 # number of gaussian models to build if use_mdn == True

# use tensorflow backend
# NOTE(review): keras has already been imported by this point; the backend is normally
# chosen when keras is first imported, so this assignment likely has no effect here — confirm.
os.environ['KERAS_BACKEND'] = 'tensorflow'

# determine the LSTM cells to use (hinges on whether GPU is available to keras)
gpus = K.tensorflow_backend._get_available_gpus()
LSTM_UNIT = CuDNNLSTM if len(gpus) > 0 else LSTM
print('GPUs found:', gpus)

# build the model: three stacked recurrent layers followed by a dense layer.
# Only the first layer of a Sequential model needs an input shape; later layers infer
# theirs from the layer before. (The previous `batch_input_shape=(s)` arguments referenced
# an undefined name `s` and raised a NameError on a fresh kernel.)
model = Sequential()
model.add(LSTM_UNIT(lstm_cells, return_sequences=True, input_shape=input_shape))
model.add(LSTM_UNIT(lstm_cells, return_sequences=True))
model.add(LSTM_UNIT(lstm_cells))
model.add(Dense(lstm_cells))

if use_mdn:
  # mixture density head: trained with the matching negative log-likelihood loss
  model.add(MDN(n_features, m))
  model.compile(loss=get_mixture_loss_func(n_features, m), optimizer=Adam(lr=0.0005))
else:
  # plain regression head predicting the n_features coordinates directly
  model.add(Dense(n_features, activation='tanh'))
  model.compile(loss=mean_squared_error, optimizer='sgd')

model.summary()

In [None]:
# Build the supervised training set with a sliding window over the time axis of X.
# train_x has shape: n_samples, look_back, n_vertices*3
# train_y has shape: n_samples, n_vertices*3
n_obs, n_time, n_attrs = (int(size) for size in X.shape)

# every target frame index that has `look_back` frames before it (stops before n_time-1)
target_frames = range(look_back, n_time - 1)

# NOTE(review): the window X[:, t-look_back:t, :] has shape (n_obs, look_back, n_attrs) and
# is reshaped to (look_back, n_obs * n_attrs) without moving the time axis first, so a row of
# the result is not a single time slice. The generation cell builds its windows the same way,
# so if this layout is unintended both places must be changed together.
train_x = np.array([
  X[:, t - look_back:t, :].reshape(look_back, n_obs * n_attrs)
  for t in target_frames
])
train_y = np.array([
  X[:, t, :].reshape(n_obs * n_attrs)
  for t in target_frames
])

In [None]:
# evaluate the untrained model on the first `samples` training examples as a baseline.
# No metrics are passed to model.compile(), so the value reported here is the loss
# rather than an accuracy.
samples = 1
model.evaluate(train_x[:samples], train_y[:samples])

In [None]:
from livelossplot import PlotLossesKeras

# fit the model for 20 epochs, one sample per batch, keeping the samples in their
# original (temporal) order; PlotLossesKeras is attached as a training callback
model.fit(train_x, train_y, epochs=20, batch_size=1, callbacks=[PlotLossesKeras()], shuffle=False)

In [None]:
# where the full model and the weights-only checkpoint are written to / read from
model_path = 'models/dance.model'
weights_path = 'models/dance.weights'

In [None]:
import os

# make sure the output directory exists before writing into it
if not os.path.exists('models'): os.makedirs('models')

# persist both the full model and, separately, just its weights
model.save(model_path)
model.save_weights(weights_path)

In [None]:
# Reload the model saved at `model_path`. When the MDN head is in use, Keras has to be
# told how to resolve the custom layer class and the custom loss function by name.
custom_objects = None
if use_mdn:
  custom_objects = {
    'MDN': MDN,
    'loss_func': get_mixture_loss_func(n_features, m),
  }
model = keras.models.load_model(model_path, custom_objects=custom_objects)

In [None]:
# generate `n_frames` of new output time slices
n_frames = 1000

# data will copy the first few frames of X then add new frame predictions
data = X[:, 0:look_back, :]

d0, d2 = int(data.shape[0]), int(data.shape[2])

# Model inputs indexed by frame number `i`: the existing training windows first, followed by
# the windows assembled from predicted frames. A Python list is used so each new window is
# appended in O(1) and `train_x` itself is left untouched. (Previously the whole training
# array was re-copied with np.vstack on every iteration and permanently grew by one row per
# generated frame, so re-running this cell or re-fitting afterwards saw generated windows
# mixed in with the real training data.)
# NOTE(review): for i < len(train_x) the model is fed a window of real data rather than one
# built from its own predictions; generated windows are only consumed once `i` runs past the
# training windows. Confirm this is the intended generation scheme.
windows = list(train_x)

for i in range(look_back, n_frames, 1):
  # get the model's prediction for the position of points at time `i`. result.shape = (1, d0 * d2)
  # np.newaxis turns the single (look_back, features) window into a batch of one
  result = model.predict(windows[i][np.newaxis])
  # if using the mixed density network, pull out vals that describe vertex positions
  if use_mdn:
    result = np.apply_along_axis(sample_from_output, 1, result, n_features, m, temp=1.0)
  # reshape the result into the form of a single time slice in `X` (or `data`)
  result = result.reshape(d0, 1, d2)
  # use the result to generate a new window that includes `look_back` observations:
  # the latest look_back-1 frames of `data` plus the new frame
  stacked = np.concatenate( (data[:, i-look_back+1:i, ], result),  axis=1)
  # transform the stacked observation into a single window with the training windows' shape
  # (same reshape, and therefore the same value layout, as used when train_x was built)
  windows.append(stacked.reshape(look_back, d0 * d2))
  # add the result's new time slice to the stack of time slices in `data`
  data = np.concatenate((data, result), axis=1)

# remove the first look_back time frames as they were used to seed subsequent observations
data = data[:, look_back:, :]

In [None]:
# animate the generated frames; the first look_back seed frames were already dropped
# from `data` above, and one fewer frame than were generated is requested here
HTML(get_plot(data, frames=n_frames - look_back - 1))