<a href="https://colab.research.google.com/github/ajit2704/dancingAI/blob/master/dancingAI.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
import os
from google.colab import drive
drive.mount('/gdrive')
os.symlink('/gdrive/My Drive', '/content/gdrive')
!ls -l /content/gdrive/


In [0]:
import keras
from keras import backend as K
from keras.layers import Dense
from keras.engine.topology import Layer
import numpy as np
from tensorflow.contrib.distributions import Categorical, Mixture, MultivariateNormalDiag
import tensorflow as tf
from keras.layers import Conv2D
from keras.layers import MaxPooling2D
from keras.layers import Flatten
from keras.layers import Reshape
from keras.layers import Input
from keras.layers import UpSampling2D
from keras.layers import Lambda
from keras.models import Model
from keras.losses import binary_crossentropy
from keras.callbacks import ModelCheckpoint
from keras.layers import Dropout
from keras.layers import LSTM
from keras.optimizers import adam
from keras.callbacks import ModelCheckpoint
from sklearn.preprocessing import MinMaxScaler
import cv2
import os


def elu_plus_one_plus_epsilon(x):
    """Return ``elu(x) + 1 + 1e-8``.

    The tiny constant is added on top of the shifted ELU to help prevent
    NaN values in the loss.
    """
    shifted = K.elu(x) + 1
    return shifted + 1e-8


class MDN(Layer):
    """A Mixture Density Network Layer for Keras.

    The layer wraps three Dense sub-layers and concatenates their outputs along
    the last axis in the order ``[mus, sigmas, pi_logits]``:
        - mus:       ``num_mixtures * output_dimension`` values, no activation
        - sigmas:    ``num_mixtures * output_dimension`` values, ELU + 1 + 1e-8
        - pi_logits: ``num_mixtures`` values, no activation

    This layer has a few tricks to avoid NaNs in the loss function when training:
        - Activation for variances is ELU + 1 + 1e-8 (to avoid very small values)
        - Mixture weights (pi) are trained in as logits, not in the softmax space.

    A loss function needs to be constructed with the same output dimension and number of mixtures.
    A sampling function is also provided to sample from distribution parametrised by the MDN outputs.

    Args:
        output_dimension: dimensionality of the modelled target vector.
        num_mixtures: number of mixture components.
        **kwargs: forwarded to the base ``Layer`` constructor.
    """

    def __init__(self, output_dimension, num_mixtures, **kwargs):
        self.output_dim = output_dimension
        self.num_mix = num_mixtures
        with tf.name_scope('MDN'):
            self.mdn_mus = Dense(self.num_mix * self.output_dim, name='mdn_mus')  # mix*output vals, no activation
            self.mdn_sigmas = Dense(self.num_mix * self.output_dim, activation=elu_plus_one_plus_epsilon, name='mdn_sigmas')  # mix*output vals, ELU + 1 + 1e-8 activation
            self.mdn_pi = Dense(self.num_mix, name='mdn_pi')  # mix vals, logits
        super(MDN, self).__init__(**kwargs)

    def build(self, input_shape):
        """Build the three Dense sub-layers and expose their weights as this layer's weights."""
        self.mdn_mus.build(input_shape)
        self.mdn_sigmas.build(input_shape)
        self.mdn_pi.build(input_shape)
        self.trainable_weights = self.mdn_mus.trainable_weights + self.mdn_sigmas.trainable_weights + self.mdn_pi.trainable_weights
        self.non_trainable_weights = self.mdn_mus.non_trainable_weights + self.mdn_sigmas.non_trainable_weights + self.mdn_pi.non_trainable_weights
        super(MDN, self).build(input_shape)

    def call(self, x, mask=None):
        """Return the concatenated ``[mus, sigmas, pi_logits]`` tensor for input ``x``."""
        with tf.name_scope('MDN'):
            mdn_out = keras.layers.concatenate([self.mdn_mus(x),
                                                self.mdn_sigmas(x),
                                                self.mdn_pi(x)],
                                               name='mdn_outputs')
        return mdn_out

    def compute_output_shape(self, input_shape):
        """Return the shape of the tensor produced by ``call``.

        ``call`` concatenates mus and sigmas (``num_mix * output_dim`` each)
        with ``num_mix`` mixture logits, so the last dimension is
        ``2 * output_dim * num_mix + num_mix`` rather than ``output_dim``.
        """
        return (input_shape[0], (2 * self.output_dim * self.num_mix) + self.num_mix)

    def get_config(self):
        """Return the layer config; keys match the ``__init__`` argument names."""
        config = {
            "output_dimension": self.output_dim,
            "num_mixtures": self.num_mix
        }
        base_config = super(MDN, self).get_config()
        return dict(list(base_config.items()) + list(config.items()))


def get_mixture_loss_func(output_dim, num_mixes):
    """Build the negative log-likelihood loss for an MDN layer.

    Args:
        output_dim: dimensionality of each mixture component.
        num_mixes: number of mixture components.

    Returns:
        A ``loss_func(y_true, y_pred)`` closure. ``y_pred`` is split along
        axis 1 into mus, sigmas and mixture logits; the result is the mean
        negative log-probability of ``y_true`` under that mixture.
    """
    def loss_func(y_true, y_pred):
        block = num_mixes * output_dim
        out_mu, out_sigma, out_pi = tf.split(
            y_pred,
            num_or_size_splits=[block, block, num_mixes],
            axis=1,
            name='mdn_coef_split')
        cat = Categorical(logits=out_pi)
        per_component = [output_dim] * num_mixes
        mus = tf.split(out_mu, num_or_size_splits=per_component, axis=1)
        sigs = tf.split(out_sigma, num_or_size_splits=per_component, axis=1)
        components = []
        for loc, scale in zip(mus, sigs):
            components.append(MultivariateNormalDiag(loc=loc, scale_diag=scale))
        mixture = Mixture(cat=cat, components=components)
        log_likelihood = mixture.log_prob(y_true)
        return tf.reduce_mean(tf.negative(log_likelihood))

    # Hand back the closure (returned from inside the 'MDN' name scope).
    with tf.name_scope('MDN'):
        return loss_func


def get_mixture_sampling_fun(output_dim, num_mixes):
    """Build a function that draws one sample from an MDN layer's output.

    Args:
        output_dim: dimensionality of each mixture component.
        num_mixes: number of mixture components.

    Returns:
        A ``sampling_func(y_pred)`` closure. ``y_pred`` is split along axis 1
        into mus, sigmas and mixture logits, and the closure returns
        ``mixture.sample()`` for the mixture they describe.
    """
    def sampling_func(y_pred):
        block = num_mixes * output_dim
        out_mu, out_sigma, out_pi = tf.split(
            y_pred,
            num_or_size_splits=[block, block, num_mixes],
            axis=1,
            name='mdn_coef_split')
        cat = Categorical(logits=out_pi)
        per_component = [output_dim] * num_mixes
        mus = tf.split(out_mu, num_or_size_splits=per_component, axis=1)
        sigs = tf.split(out_sigma, num_or_size_splits=per_component, axis=1)
        components = []
        for loc, scale in zip(mus, sigs):
            components.append(MultivariateNormalDiag(loc=loc, scale_diag=scale))
        mixture = Mixture(cat=cat, components=components)
        # Todo: temperature adjustment for sampling function.
        return mixture.sample()

    # Hand back the closure (returned from inside the 'MDNLayer' name scope).
    with tf.name_scope('MDNLayer'):
        return sampling_func


def get_mixture_mse_accuracy(output_dim, num_mixes):
    """Build an MSE metric for an MDN layer based on a single sample.

    Args:
        output_dim: dimensionality of each mixture component.
        num_mixes: number of mixture components.

    Returns:
        A ``mse_func(y_true, y_pred)`` closure. It draws one sample from the
        mixture described by ``y_pred`` and returns the mean squared
        difference to ``y_true`` over the last axis.
    """
    def mse_func(y_true, y_pred):
        block = num_mixes * output_dim
        out_mu, out_sigma, out_pi = tf.split(
            y_pred,
            num_or_size_splits=[block, block, num_mixes],
            axis=1,
            name='mdn_coef_split')
        cat = Categorical(logits=out_pi)
        per_component = [output_dim] * num_mixes
        mus = tf.split(out_mu, num_or_size_splits=per_component, axis=1)
        sigs = tf.split(out_sigma, num_or_size_splits=per_component, axis=1)
        components = []
        for loc, scale in zip(mus, sigs):
            components.append(MultivariateNormalDiag(loc=loc, scale_diag=scale))
        mixture = Mixture(cat=cat, components=components)
        drawn = mixture.sample()
        # Todo: temperature adjustment for sampling function.
        return tf.reduce_mean(tf.square(drawn - y_true), axis=-1)

    # Hand back the closure (returned from inside the 'MDNLayer' name scope).
    with tf.name_scope('MDNLayer'):
        return mse_func


def split_mixture_params(params, output_dim, num_mixes):
    """Slice a flat MDN parameter sequence into its three parts.

    The first ``num_mixes * output_dim`` entries are the mus, the next
    ``num_mixes * output_dim`` entries are the sigmas, and the last
    ``num_mixes`` entries are the mixture logits.

    Returns:
        A ``(mus, sigs, pi_logits)`` tuple of slices of ``params``.
    """
    block = num_mixes * output_dim
    return params[:block], params[block:2 * block], params[-num_mixes:]


def softmax(w, t=1.0):
    """Temperature-scaled softmax over a list or numpy array of logits.

    Args:
        w: logits.
        t: temperature; the logits are divided by it before exponentiation.

    Returns:
        A numpy array of probabilities summing to 1.
    """
    scaled = np.array(w) / t
    # Shift by the maximum so np.exp never sees large positive values.
    exps = np.exp(scaled - scaled.max())
    return exps / np.sum(exps)


def sample_from_categorical(dist):
    """Draw an index from a categorical distribution given as a numpy array.

    A uniform random number is drawn and the probabilities are accumulated
    until the running total reaches it.

    Returns:
        The sampled index, or -1 (after logging) if the running total never
        reaches the drawn number.
    """
    threshold = np.random.rand(1)  # uniform random number in [0,1]
    running_total = 0
    index = 0
    while index < dist.size:
        running_total += dist[index]
        if running_total >= threshold:
            return index
        index += 1
    tf.logging.info('Error sampling mixture model.')
    return -1


def sample_from_output(params, output_dim, num_mixes, temp=1.0):
    """Sample from an MDN output with temperature adjustment.

    Args:
        params: flat array of MDN outputs laid out as
            ``[mus, sigmas, pi_logits]`` (see ``split_mixture_params``).
        output_dim: dimensionality of each mixture component.
        num_mixes: number of mixture components.
        temp: temperature. It divides the mixture logits inside the softmax
            and linearly scales the covariance of the chosen component.

    Returns:
        The array returned by ``np.random.multivariate_normal(..., 1)``: one
        sample of length ``output_dim`` along the last axis.

    Raises:
        ValueError: if no mixture component could be selected.
    """
    mus, sigs, pi_logits = split_mixture_params(params, output_dim, num_mixes)
    pis = softmax(pi_logits, t=temp)
    m = sample_from_categorical(pis)
    # Alternative way to sample from categorical:
    # m = np.random.choice(range(len(pis)), p=pis)
    if m < 0:
        # sample_from_categorical signals failure with -1; slicing with it
        # would yield empty vectors and an obscure broadcasting error.
        raise ValueError('Could not sample a mixture component from the MDN output.')
    mus_vector = mus[m*output_dim:(m+1)*output_dim]
    sig_vector = sigs[m*output_dim:(m+1)*output_dim]
    # The sigmas are trained as `scale_diag` of MultivariateNormalDiag, i.e.
    # standard deviations, whereas np.random.multivariate_normal expects a
    # covariance matrix. Square them to get variances, then scale by temp.
    cov_matrix = np.identity(output_dim) * (np.square(sig_vector) * temp)
    sample = np.random.multivariate_normal(mus_vector, cov_matrix, 1)
    return sample

Using TensorFlow backend.


In [0]:


latent_dim = 128


def sampling(args):
    """Draw a latent vector from a mean / log-variance pair.

    Args:
        args: a ``(z_mean, z_log_var)`` pair of tensors.

    Returns:
        ``z_mean + exp(0.5 * z_log_var) * epsilon`` where ``epsilon`` is
        standard-normal noise with the same (batch, dim) shape as ``z_mean``.
    """
    mean, log_var = args
    n_rows = K.shape(mean)[0]
    n_cols = K.int_shape(mean)[1]
    # K.random_normal defaults to mean=0 and std=1.0.
    noise = K.random_normal(shape=(n_rows, n_cols))
    return mean + K.exp(0.5 * log_var) * noise


# ---- Encoder -------------------------------------------------------------
# Takes a 120x208 single-channel image and runs it through three
# Conv2D + MaxPooling2D stages (128, 64, 32 filters; each pooling halves the
# spatial size).
input_img = Input(shape=(120,208,1))
x = Conv2D(filters=128,kernel_size=3, activation='relu', padding='same')(input_img)
x = MaxPooling2D(pool_size=2)(x)
x = Conv2D(filters=64,kernel_size=3, activation='relu', padding='same')(x)
x = MaxPooling2D(pool_size=2)(x)
x = Conv2D(filters=32,kernel_size=3, activation='relu', padding='same')(x)
x = MaxPooling2D(pool_size=2)(x)
# Remember the feature-map shape before flattening; the decoder uses it to
# size its first Dense layer and to reshape back to a feature map.
shape = K.int_shape(x)
x = Flatten()(x)
x = Dense(128,kernel_initializer='glorot_uniform')(x)

# Two parallel Dense heads produce the latent mean and log-variance; `z` is
# drawn from them by the `sampling` function wrapped in a Lambda layer.
z_mean = Dense(latent_dim)(x)
z_log_var = Dense(latent_dim)(x)
z = Lambda(sampling, output_shape=(latent_dim,), name="z")([z_mean,z_log_var])

# The encoder exposes three outputs, in this order: mean, log-variance, sample.
encoder = Model(input_img, [z_mean, z_log_var,z], name="encoder")
encoder.summary()


# ---- Decoder -------------------------------------------------------------
# Expands a latent vector back to the saved feature-map shape, then applies
# three Conv2D + UpSampling2D stages (32, 64, 128 filters; each upsampling
# doubles the spatial size) and a final 1-filter sigmoid convolution.
latent_inputs = Input(shape=(latent_dim,), name='z_sampling')
x = Dense(shape[1] * shape[2] * shape[3], kernel_initializer='glorot_uniform',activation='relu')(latent_inputs)
x = Reshape((shape[1],shape[2],shape[3]))(x)
# NOTE(review): this Dense is applied to the reshaped 4-D feature map, not to a
# flat vector. It is kept as-is because changing the layer graph would make
# previously saved weights incompatible - confirm whether it is intentional.
x = Dense(128,kernel_initializer='glorot_uniform')(x)
x = Conv2D(filters=32, kernel_size=3, activation='relu', padding='same')(x)
x = UpSampling2D(size=(2,2))(x)
x = Conv2D(filters=64,kernel_size=3, activation='relu', padding='same')(x)
x = UpSampling2D(size=(2,2))(x)
x = Conv2D(filters=128,kernel_size=3, activation='relu', padding='same')(x)
x = UpSampling2D(size=(2,2))(x)
x = Conv2D(filters=1,kernel_size=3, activation='sigmoid', padding='same')(x)

decoder = Model(latent_inputs,x,name='decoder')

decoder.summary()


# ---- Full autoencoder ----------------------------------------------------
# Feed the encoder's third output (the sampled z) into the decoder.
outputs = decoder(encoder(input_img)[2])
print(outputs.shape)
vae = Model(input_img,outputs,name="vae")

def data_generator(batch_size, limit):
    """Yield endless batches of grayscale frames read from ``imgs/<n>.jpg``.

    Frame numbers start at 1 and wrap back to 1 once the counter reaches
    ``limit``. Every frame is reshaped to (120, 208, 1) and each batch is
    divided by 255 before being yielded.

    NOTE(review): because the counter resets as soon as it reaches ``limit``,
    frame number ``limit`` itself is only read when ``limit == 1``. This
    matches the original behaviour - confirm whether it is intended.

    Args:
        batch_size: number of frames per yielded batch (must be >= 1).
        limit: frame number at which the counter wraps around (must be >= 1).

    Yields:
        ``(batch, None)`` where ``batch`` is a numpy array of shape
        ``(batch_size, 120, 208, 1)``.

    Raises:
        ValueError: if ``batch_size`` or ``limit`` is smaller than 1. Such
            values previously made the generator loop forever without ever
            yielding a batch.
        OSError: if a frame cannot be read from disk.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1, got {}".format(batch_size))
    if limit < 1:
        raise ValueError("limit must be >= 1, got {}".format(limit))

    batch = []
    counter = 1
    while True:
        if counter >= limit:
            counter = 1
        path = "imgs/{}.jpg".format(counter)
        img = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
        if img is None:
            # cv2.imread returns None instead of raising when the file is
            # missing or cannot be decoded.
            raise OSError("could not read image: {}".format(path))
        batch.append(img.reshape(120, 208, 1))
        if len(batch) == batch_size:
            batch_np = np.array(batch) / 255
            batch = []
            yield (batch_np, None)
        counter += 1



__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_2 (InputLayer)            (None, 120, 208, 1)  0                                            
__________________________________________________________________________________________________
conv2d_8 (Conv2D)               (None, 120, 208, 128 1280        input_2[0][0]                    
__________________________________________________________________________________________________
max_pooling2d_4 (MaxPooling2D)  (None, 60, 104, 128) 0           conv2d_8[0][0]                   
__________________________________________________________________________________________________
conv2d_9 (Conv2D)               (None, 60, 104, 64)  73792       max_pooling2d_4[0][0]            
__________________________________________________________________________________________________
max_poolin

# Set paths

In [0]:
# File holding the MDN-LSTM weights: the training cell's ModelCheckpoint writes
# to it and model.load_weights(PATH) reads it back.
PATH = '/content/gdrive/Colab/src/dancenet-master/d.h5'

In [0]:
# Load the saved sequence of 128-dimensional vectors (presumably VAE latent
# encodings of the video frames - confirm against the script that wrote it).
# The recorded run printed shape (20210, 1, 128).
data = np.load('/content/gdrive/Colab/src/dancenet-master/video.npy')
print(data.shape)

(20210, 1, 128)


In [0]:
# Flatten to one 128-value row per step, then rescale every feature to [0, 1].
# The fitted scaler is kept so that samples can be mapped back later with
# scaler.inverse_transform.
data = np.array(data).reshape(-1, 128)
scaler = MinMaxScaler(feature_range=(0, 1)).fit(data)
data = scaler.transform(data)

In [0]:
# MDN hyper-parameters, passed to both the MDN layer and its loss function.
numComponents = 24  # number of mixture components
outputDim = 128  # dimensionality of each predicted vector

In [0]:
# Sequence model: one 128-value vector in, MDN parameters for the next vector out.
# The input is reshaped to a length-1 sequence, passed through three stacked
# 512-unit LSTMs (each followed by 40% dropout), a 1000-unit ReLU Dense layer
# and finally the MDN layer.
inputs = Input(shape=(128,))
x = Reshape((1,128))(inputs)
x = LSTM(512, return_sequences=True,input_shape=(1,128))(x)
x = Dropout(0.40)(x)
x = LSTM(512, return_sequences=True)(x)
x = Dropout(0.40)(x)
x = LSTM(512)(x)
x = Dropout(0.40)(x)
x = Dense(1000,activation='relu')(x)
outputs = MDN(outputDim, numComponents)(x)
model = Model(inputs=inputs,outputs=outputs)
# summary() prints the table itself and returns None, so wrapping it in
# print() only appended a stray "None" line.
model.summary()

_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_3 (InputLayer)         (None, 128)               0         
_________________________________________________________________
reshape_3 (Reshape)          (None, 1, 128)            0         
_________________________________________________________________
lstm_1 (LSTM)                (None, 1, 512)            1312768   
_________________________________________________________________
dropout_1 (Dropout)          (None, 1, 512)            0         
_________________________________________________________________
lstm_2 (LSTM)                (None, 1, 512)            2099200   
_________________________________________________________________
dropout_2 (Dropout)          (None, 1, 512)            0         
_________________________________________________________________
lstm_3 (LSTM)                (None, 512)               2099200   
__________

In [0]:
# Compile with Adam (learning rate 0.0005) and the mixture loss built for the
# same output dimension and number of components as the MDN layer.
opt = adam(lr=0.0005)
model.compile(loss=get_mixture_loss_func(outputDim,numComponents),optimizer=opt)

Instructions for updating:
The TensorFlow Distributions library has moved to TensorFlow Probability (https://github.com/tensorflow/probability). You should update all references to use `tfp.distributions` instead of `tf.contrib.distributions`.
Instructions for updating:
The TensorFlow Distributions library has moved to TensorFlow Probability (https://github.com/tensorflow/probability). You should update all references to use `tfp.distributions` instead of `tf.contrib.distributions`.
Instructions for updating:
The TensorFlow Distributions library has moved to TensorFlow Probability (https://github.com/tensorflow/probability). You should update all references to use `tfp.distributions` instead of `tf.contrib.distributions`.
Instructions for updating:
The TensorFlow Distributions library has moved to TensorFlow Probability (https://github.com/tensorflow/probability). You should update all references to use `tfp.distributions` instead of `tf.contrib.distributions`.
Instructions for updatin

In [0]:
# Change to True to train from scratch instead of loading saved weights.
train = False

if train:
    # Next-step prediction: every row is an input, the following row its target.
    X, Y = data[:-1], data[1:]
    # Save to PATH whenever the monitored training loss improves.
    checkpoint = ModelCheckpoint(
        PATH,
        monitor='loss',
        verbose=1,
        save_best_only=True,
        mode='auto',
    )
    callbacks_list = [checkpoint]
    model.fit(
        X,
        Y,
        batch_size=1024,
        verbose=1,
        shuffle=False,
        validation_split=0.20,
        epochs=10000,
        callbacks=callbacks_list,
    )

# Load weights

In [0]:
# Restore saved weights for both networks: the VAE from its own file and the
# MDN-LSTM from PATH.
vae.load_weights('/content/gdrive/Colab/src/dancenet-master/vae.h5')
model.load_weights(PATH)

# Generate Video

In [0]:
# Generate a 500-frame video: starting from the first data vector, repeatedly
# predict MDN parameters, sample the next vector, decode it with the VAE
# decoder and append the resulting frame to the output file.
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
video = cv2.VideoWriter("/content/gdrive/Colab/src/dancenet-master/result.mp4", fourcc, 30.0, (208, 120))
# Seed vector. The loop previously read an undefined `y` on its first
# iteration (NameError) and never used this seed.
lv_in = data[0]

try:
    for _ in range(500):
        mdn_params = model.predict(np.array(lv_in).reshape(1, 128))
        # Drop the batch axis so sample_from_output gets a flat parameter vector.
        mdn_params = np.array(mdn_params).reshape(-1)
        lv_out = sample_from_output(mdn_params, 128, numComponents, temp=0.01)
        # Undo the MinMax scaling before handing the vector to the decoder.
        lv_out = scaler.inverse_transform(lv_out)
        img = decoder.predict(np.array(lv_out).reshape(1, 128))
        img = np.array(img).reshape(120, 208, 1)
        # Decoder output is sigmoid-activated; scale it to 8-bit pixel values.
        img = (img * 255).astype("uint8")
        img = cv2.cvtColor(img, cv2.COLOR_GRAY2RGB)
        video.write(img)
        # NOTE(review): the un-scaled vector is fed back as the next input (as
        # the original loop did), although the model is fitted on scaled data
        # in the training cell - confirm whether the scaled sample should be
        # fed back instead.
        lv_in = lv_out
finally:
    # Always finalise the file, even if prediction fails part-way through.
    video.release()
