<a href="https://colab.research.google.com/github/AlexandreBourrieau/ML/blob/main/Wavenet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import tensorflow as tf
from tensorflow.keras.layers import Dense
from tensorflow.python.keras import initializers, activations, constraints, regularizers

# Classes Dense3D, TemporalConv

In [43]:
class Dense3D(tf.keras.layers.Layer):
    def __init__(self, units,
                 activation=None,
                 kernel_initializer='glorot_uniform',
                 kernel_regularizer=None,
                 kernel_constraint=None,
                 use_bias=True,
                 bias_initializer="zeros",
                 trainable=True,
                 name=None):
        super(Dense3D, self).__init__(trainable=trainable, name=name)
        self.units = units
        self.activation = activations.get(activation)
        self.kernel_initializer = initializers.get(kernel_initializer)
        self.kernel_regularizer = regularizers.get(kernel_regularizer)
        self.kernel_constraint = constraints.get(kernel_constraint)
        self.use_bias = use_bias
        self.bias_initializer = bias_initializer

    def build(self, input_shape):
        inputs_units = int(input_shape[-1])  # input.get_shape().as_list()[-1]
        self.kernel = self.add_weight('kernel',
                                      shape=[inputs_units, self.units],
                                      initializer=self.kernel_initializer,
                                      regularizer=self.kernel_regularizer,
                                      constraint=self.kernel_constraint,
                                      dtype=tf.float32,
                                      trainable=True)
        if self.use_bias:
            self.bias = self.add_weight("bias",
                                        shape=[self.units],
                                        initializer=self.bias_initializer,
                                        dtype=self.dtype,
                                        trainable=True)
        super(Dense3D, self).build(input_shape)

    def call(self, inputs):
        output = tf.einsum('ijk,kl->ijl', inputs, self.kernel)

        if self.use_bias:
            output += self.bias

        if self.activation is not None:
            output = self.activation(output)
        return output

In [38]:
class TemporalConv(tf.keras.layers.Layer):
    """ Temporal convolutional layer

    """
    def __init__(self, filters,
                 kernel_size,
                 strides=1,
                 dilation_rate=1,
                 activation=None,
                 causal=True,
                 kernel_initializer='glorot_uniform',
                 name=None):
        super(TemporalConv, self).__init__(name=name)
        self.filters = filters
        self.kernel_size = kernel_size
        self.strides = strides
        self.dilation_rate = dilation_rate
        self.activation = activations.get(activation)
        self.causal = causal
        self.kernel_initializer = initializers.get(kernel_initializer)

    def build(self, input_shape):  # Create the weights
        self.conv = tf.keras.layers.Conv1D(kernel_size=self.kernel_size,
                                           kernel_initializer=self.kernel_initializer,
                                           filters=self.filters,
                                           padding='valid',
                                           dilation_rate=self.dilation_rate,
                                           activation=self.activation)
        super(TemporalConv, self).build(input_shape)

    def call(self, input):
        if self.causal:
            padding_size = (self.kernel_size - 1) * self.dilation_rate
            # padding: 1st dim is batch, so [0,0]; 2nd dim is time, so [padding_size, 0]; 3rd dim is feature [0,0]
            input = tf.pad(input, [[0, 0], [padding_size, 0], [0, 0]])

        output = self.conv(input)
        return output

# Classe Wavenet

**Encodeur**

In [39]:
class Encoder(object):
    def __init__(self, params):
        self.params = params
        self.conv_times = []
        for i, (kernel_size, dilation) in enumerate(zip(self.params['kernel_sizes'], self.params['dilation_rates'])):
            self.conv_times.append(TemporalConv(filters=2 * self.params['filters'],
                                                kernel_size=kernel_size,
                                                causal=True,
                                                dilation_rate=dilation))
        self.dense_time1 = Dense3D(units=self.params['filters'], activation='tanh', name='encoder_dense_time1')
        self.dense_time2 = Dense3D(units=self.params['filters'] + self.params['filters'], name='encoder_dense_time2')
        self.dense_time3 = Dense3D(units=self.params['dense_hidden_size'], activation='relu', name='encoder_dense_time3')
        self.dense_time4 = Dense3D(units=1, name='encoder_dense_time_4')

    def forward(self, x):
        """
        :param x:
        :return: conv_inputs [batch_size, time_sequence_length, filters] * time_sequence_length
        """
        inputs = self.dense_time1(inputs=x)  # batch_size * time_sequence_length * filters

        skip_outputs = []
        conv_inputs = [inputs]
        for conv_time in self.conv_times:
            dilated_conv = conv_time(inputs)
            conv_filter, conv_gate = tf.split(dilated_conv, 2, axis=2)
            dilated_conv = tf.nn.tanh(conv_filter) * tf.nn.sigmoid(conv_gate)
            outputs = self.dense_time2(inputs=dilated_conv)
            skips, residuals = tf.split(outputs, [self.params['filters'], self.params['filters']], axis=2)
            inputs += residuals
            conv_inputs.append(inputs)  # batch_size * time_sequence_length * filters
            skip_outputs.append(skips)

        skip_outputs = tf.nn.relu(tf.concat(skip_outputs, axis=2))
        h = self.dense_time3(skip_outputs)
        y_hat = self.dense_time4(h)
        return y_hat, conv_inputs[:-1]

    def __call__(self, x):
        return self.forward(x)

**Décodeur**

In [58]:
class Decoder(object):
    """ Decoder need avoid future data leaks

    """
    def __init__(self, params):
        self.params = params
        self.dense_1 = Dense(self.params['filters'], activation='tanh', name='decoder_dense_1')
        self.dense_2 = Dense(2 * self.params['filters'], name='decoder_dense_2')
        self.dense_3 = Dense(2 * self.params['filters'], use_bias=False, name='decoder_dense_3')
        self.dense_4 = Dense(2 * self.params['filters'], name='decoder_dense_4')
        self.dense_5 = Dense(self.params['dense_hidden_size'], activation='relu', name='decoder_dense_5')
        self.dense_6 = Dense(1, name='decoder_dense_6')

    def foward(self, x, decoder_feature, encoder_states, predict_seq_length, teacher):
        decoder_init_value = x[:, -1, :]

        def cond_fn(time, prev_output, decoder_output_ta):
            return time < predict_seq_length

        def body(time, prev_output, decoder_output_ta):
            if time == 0 or teacher is None:
                current_input = prev_output
            else:
                current_input = teacher[:, time - 1, :]

            if decoder_feature is not None:
                current_feature = decoder_feature[:, time, :]
                current_input = tf.concat([current_input, current_feature], axis=1)

            inputs = self.dense_1(current_input)

            skip_outputs = []
            for i, dilation in enumerate(self.params['dilation_rates']):
                state = encoder_states[i][:, -dilation, :]

                dilated_conv = self.dense_2(state) + self.dense_3(inputs)
                conv_filter, conv_gate = tf.split(dilated_conv, 2, axis=1)
                dilated_conv = tf.nn.tanh(conv_filter) * tf.nn.sigmoid(conv_gate)
                outputs = self.dense_4(dilated_conv)
                skips, residuals = tf.split(outputs, [self.params['filters'], self.params['filters']], axis=1)
                inputs += residuals
                encoder_states[i] = tf.concat([encoder_states[i], tf.expand_dims(inputs, 1)], axis=1)
                skip_outputs.append(skips)

            skip_outputs = tf.nn.relu(tf.concat(skip_outputs, axis=1))
            h = self.dense_5(skip_outputs)
            y_hat = self.dense_6(h)
            decoder_output_ta = decoder_output_ta.write(time, y_hat)
            return time + 1, y_hat, decoder_output_ta

#        loop_init = [tf.constant(0, dtype=tf.int32),decoder_init_value,tf.TensorArray(dtype=tf.float32, size=predict_seq_length)]

        for time in range(0,predict_seq_length):
          _, _, decoder_outputs_ta = body(time,decoder_init_value,tf.TensorArray(dtype=tf.float32, size=predict_seq_length))

#        _, _, decoder_outputs_ta = tf.while_loop(cond=cond_fn, body=body, loop_vars=loop_init)
        decoder_outputs = decoder_outputs_ta.stack()
        decoder_outputs = tf.transpose(decoder_outputs, [1, 0, 2])
        return decoder_outputs

    def __call__(self, x, decoder_feature, encoder_states, predict_seq_length, teacher=None):
        return self.foward(x,decoder_feature,encoder_states,predict_seq_length=predict_seq_length,teacher=teacher)

**Wavenet**

In [41]:
params = {
    'dilation_rates': [2 ** i for i in range(4)],
    'kernel_sizes': [2 for i in range(4)],
    'filters': 128,
    'dense_hidden_size': 64
}

class WaveNet(object):
    """WaveNet network

    """
    def __init__(self, custom_model_params={}, dynamic_decoding=True):
        params.update(custom_model_params)
        self.encoder = Encoder(params)
        self.decoder = Decoder(params)

    def __call__(self, inputs, training, predict_seq_length, teacher=None):
        if isinstance(inputs, tuple):
            x, encoder_feature, decoder_feature = inputs
            encoder_feature = tf.concat([x, encoder_feature], axis=-1)
        else:  # for single variable prediction
            encoder_feature = x = inputs
            decoder_feature = None

        encoder_output, encoder_states = self.encoder(encoder_feature)
        decoder_output = self.decoder(x,decoder_feature,encoder_states=encoder_states,predict_seq_length=predict_seq_length,teacher=teacher)
        return decoder_output

# Utilisation

In [50]:
%tensorflow_version 1.x

TensorFlow is already loaded. Please restart the runtime to change versions.


In [14]:
!pip install --upgrade keras

Requirement already up-to-date: keras in /usr/local/lib/python3.7/dist-packages (2.4.3)


In [59]:
input_seq_length = 100
output_seq_length = 1

Model = WaveNet()
inputs = tf.keras.layers.Input([input_seq_length, 1])
outputs = Model(inputs, training=True, predict_seq_length=output_seq_length)

TypeError: ignored