In [None]:
!pip install tensorflow-io==0.17

In [2]:
import os
import pathlib
import re
import shutil
import math
import librosa

import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
import tensorflow_io as tfio

from tensorflow.keras.layers.experimental import preprocessing
from tensorflow.keras import layers
from tensorflow.keras import models
from tensorflow import keras
from string import ascii_lowercase

# Upload weight.zip, audio_initialize.wav, text_initialize.txt

In [40]:
# Opens the Colab upload dialog so the files listed in the heading above can be
# copied into the runtime's working directory.
# NOTE(review): the recorded output shows the archive being saved as
# 'weight (1).zip', while the extraction cell below looks for './weight.zip' —
# confirm the intended archive is the one that gets extracted.
from google.colab import files
files.upload()

Saving weight.zip to weight (1).zip


In [3]:
import zipfile

# Unpack the uploaded weights archive into the working directory when it is
# present; nothing happens if the archive is missing.
weights_archive = pathlib.Path('./weight.zip')
if weights_archive.exists():
  with zipfile.ZipFile(weights_archive, 'r') as archive:
    archive.extractall('./')

### EncodingDecoding Class

In [4]:
class EncodingDecoding:
  """Character vocabulary plus helpers to convert characters <-> integer ids.

  The id of a character is its position in ``self.char``: four special
  symbols ("-", "#", "<", ">"), then the lowercase ASCII letters, then
  space and a few punctuation marks.
  """

  def __init__(self):
    self.char = (["-", "#", "<", ">"]
                 + list(ascii_lowercase)
                 + [" ", ".", ",", "?", "'"])

  def encode_label(self, label):
    """Map a string tensor of single characters to int64 ids.

    Args:
      label: string tensor whose elements are looked up in ``self.char``.

    Returns:
      int64 tensor of the same shape; characters that are not in the
      vocabulary map to -1.
    """
    keys_tensor = tf.constant(self.char)
    # Explicit int64: np.arange's default integer width is platform dependent,
    # while the rest of the pipeline (padding, decode_label) works in int64.
    vals_tensor = tf.constant(np.arange(len(self.char), dtype=np.int64))
    input_tensor = label

    table = tf.lookup.StaticHashTable(
        tf.lookup.KeyValueTensorInitializer(keys_tensor, vals_tensor),
        default_value=-1)

    return table.lookup(input_tensor)

  def decode_label(self, predicted_label):
    """Map integer ids back to characters.

    Args:
      predicted_label: integer tensor of ids (cast to int64 before lookup).

    Returns:
      NumPy array of byte strings, one per id; ids outside the vocabulary
      become the empty string. Calls ``.numpy()``, so it needs eager
      execution.
    """
    # Keys must have the same dtype as the int64 lookup input below.
    keys_tensor = tf.constant(np.arange(len(self.char), dtype=np.int64))
    vals_tensor = tf.constant(self.char)
    input_tensor = tf.cast(predicted_label, dtype=tf.int64)

    table = tf.lookup.StaticHashTable(
        tf.lookup.KeyValueTensorInitializer(keys_tensor, vals_tensor),
        default_value='')

    return table.lookup(input_tensor).numpy()

  def decode_audio(self, audio_file):
    """Read an audio file as int16 samples and scale them to float32.

    Args:
      audio_file: path of the audio file to read.

    Returns:
      float32 tensor of samples divided by 32768. The trailing axis of the
      decoded audio is squeezed away, so that axis must have size 1.
    """
    audio = tfio.IOTensor.graph(tf.int16).from_audio(audio_file)
    audio_tensor = tf.squeeze(audio.to_tensor(), axis=-1)
    waveform = tf.cast(audio_tensor, tf.float32) / 32768.0

    return waveform

### AudioDataProcessing Class

In [5]:
class AudioDataProcessing:
  """Turns a waveform into a normalised, fixed-length dB-scaled spectrogram.

  Args:
    pad_len: number of STFT frames every spectrogram is padded or truncated
      to. Defaults to 2754, the value previously hard-coded here.
  """

  def __init__(self, pad_len=2754):
    self.pad_len = pad_len

  def db_scale(self, S, amin=1e-16, top_db=80.0):
    """Convert a spectrogram to decibel units relative to its maximum.

    Args:
      S: spectrogram tensor.
      amin: lower clamp applied before taking the logarithm.
      top_db: values more than ``top_db`` below the peak are clipped.

    Returns:
      Tensor of the same shape as ``S`` in dB.
    """
    def _tf_log10(x):
        numerator = tf.math.log(x)
        denominator = tf.math.log(tf.constant(10, dtype=numerator.dtype))
        return numerator / denominator

    ref = tf.reduce_max(S)

    log_spec = 10.0 * _tf_log10(tf.maximum(amin, S))
    log_spec -= 10.0 * _tf_log10(tf.maximum(amin, ref))

    log_spec = tf.maximum(log_spec, tf.reduce_max(log_spec) - top_db)

    return log_spec

  def get_spectrogram(self, waveform):
    """Create a normalised dB spectrogram of exactly ``self.pad_len`` frames.

    Args:
      waveform: audio samples; cast to float32 before the STFT.
        Assumes a 1-D waveform, since the padding below is rank 2 —
        TODO confirm against callers.

    Returns:
      Tensor with ``self.pad_len`` rows (frames), zero-padded at the end or
      truncated, and one column per STFT frequency bin.
    """
    waveform = tf.cast(waveform, tf.float32)
    spectrogram = tf.signal.stft(
        waveform,
        frame_length=200,
        frame_step=80,
        fft_length=256)
    # Square root of the STFT magnitude, then converted to dB.
    log_spectrogram = tf.math.pow(tf.abs(spectrogram), 0.5)
    log_spectrogram = self.db_scale(log_spectrogram)

    # Per-frame normalisation across axis 1. divide_no_nan yields 0 for frames
    # whose standard deviation is 0 instead of NaN/Inf; all other frames are
    # unchanged.
    means = tf.math.reduce_mean(log_spectrogram, 1, keepdims=True)
    stddevs = tf.math.reduce_std(log_spectrogram, 1, keepdims=True)
    x = tf.math.divide_no_nan(log_spectrogram - means, stddevs)

    # Append pad_len zero frames, then keep the first pad_len frames, so the
    # result always has exactly pad_len rows.
    paddings = tf.constant([[0, self.pad_len], [0, 0]])
    x = tf.pad(x, paddings, "CONSTANT")[:self.pad_len, :]
    return x

### Label Processing

In [6]:
class LabelProcessing(EncodingDecoding):
  """Reads a transcript file and encodes it as a fixed-length id sequence."""

  def __init__(self):
    super().__init__()
    # Length of every encoded label, including the '<' and '>' markers.
    self.max_length = 200

  def get_label(self, text_file):
    """Encode the contents of ``text_file`` as ``max_length`` int32 ids.

    The text is truncated to ``max_length - 2`` bytes, wrapped in '<' and
    '>', split into single bytes, encoded with ``encode_label`` and
    right-padded with zeros.

    Args:
      text_file: path of the transcript file.

    Returns:
      1-D int32 tensor of length ``self.max_length``.
    """
    # Read the whole file as a scalar string tensor.
    text = tf.io.read_file(text_file)

    # Keep at most max_length - 2 bytes so the two markers still fit. The
    # native string op replaces a tf.py_function slice with identical
    # byte-wise semantics.
    text_slice = tf.strings.substr(text, 0, self.max_length - 2)

    # Add start/end markers and split into one string per byte.
    chars = tf.strings.join([b'<', text_slice, b'>'])
    chars = tf.strings.bytes_split(chars)

    # Encode characters to numeric ids. The cast keeps the dtype aligned with
    # the int64 padding regardless of what encode_label returns.
    label = tf.cast(self.encode_label(chars), tf.int64)

    zero_padding = tf.zeros([self.max_length] - tf.shape(label), dtype=tf.int64)

    # Concatenate the encoded text with padding so that every label has the
    # same length.
    label = tf.concat([label, zero_padding], 0)
    label = tf.cast(label, dtype=tf.int32)

    return label

### Model

In [7]:
class TokenEmbedding(layers.Layer):
    """Sum of a learned token embedding and a learned position embedding."""

    def __init__(self, num_vocab=1000, maxlen=100, num_hid=64):
        super().__init__()
        self.emb = layers.Embedding(input_dim=num_vocab, output_dim=num_hid)
        self.pos_emb = layers.Embedding(input_dim=maxlen, output_dim=num_hid)

    def call(self, x):
        """Embed token ids ``x`` and add the embedding of each position."""
        seq_len = tf.shape(x)[-1]
        token_vectors = self.emb(x)
        # Positions 0..seq_len-1 along the last axis of the input.
        position_vectors = self.pos_emb(tf.range(seq_len))
        return token_vectors + position_vectors


class SpeechFeatureEmbedding(layers.Layer):
    """Three stacked stride-2 1-D convolutions applied to the input features."""

    def __init__(self, num_hid=64, maxlen=100):
        super().__init__()
        # All three convolutions share the same configuration.
        conv_options = dict(
            kernel_size=11, strides=2, padding="same", activation="relu"
        )
        self.conv1 = layers.Conv1D(num_hid, **conv_options)
        self.conv2 = layers.Conv1D(num_hid, **conv_options)
        self.conv3 = layers.Conv1D(num_hid, **conv_options)
        # Created here but not applied in call().
        self.pos_emb = layers.Embedding(input_dim=maxlen, output_dim=num_hid)

    def call(self, x):
        """Pass ``x`` through conv1, conv2 and conv3 in that order."""
        for conv in (self.conv1, self.conv2, self.conv3):
            x = conv(x)
        return x

In [8]:
class TransformerEncoder(layers.Layer):
    """Self-attention block followed by a feed-forward block.

    Each sub-block applies dropout, a residual connection and layer
    normalisation, in that order.
    """

    def __init__(self, embed_dim, num_heads, feed_forward_dim, rate=0.1):
        super().__init__()
        self.att = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        feed_forward_layers = [
            layers.Dense(feed_forward_dim, activation="relu"),
            layers.Dense(embed_dim),
        ]
        self.ffn = keras.Sequential(feed_forward_layers)
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)

    def call(self, inputs, training):
        """Apply the encoder block; ``training`` is forwarded to both dropouts."""
        # Self-attention sub-block: query and value are both `inputs`.
        attended = self.dropout1(self.att(inputs, inputs), training=training)
        attended_norm = self.layernorm1(inputs + attended)
        # Feed-forward sub-block.
        projected = self.dropout2(self.ffn(attended_norm), training=training)
        return self.layernorm2(attended_norm + projected)

In [9]:
class TransformerDecoder(layers.Layer):
    """Decoder block: causal self-attention, encoder attention, feed-forward.

    Args:
        embed_dim: width of the token representations (also used as key_dim).
        num_heads: number of attention heads in both attention layers.
        feed_forward_dim: hidden width of the feed-forward network.
        dropout_rate: dropout rate for the encoder-attention and feed-forward
            outputs. The default of 0.1 matches the rates that used to be
            hard-coded, so existing callers are unaffected.
    """

    def __init__(self, embed_dim, num_heads, feed_forward_dim, dropout_rate=0.1):
        super().__init__()
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm3 = layers.LayerNormalization(epsilon=1e-6)
        self.self_att = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.enc_att = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        # The self-attention dropout keeps its own fixed rate of 0.5.
        self.self_dropout = layers.Dropout(0.5)
        # Previously fixed at 0.1 regardless of `dropout_rate`.
        self.enc_dropout = layers.Dropout(dropout_rate)
        self.ffn_dropout = layers.Dropout(dropout_rate)
        self.ffn = keras.Sequential(
            [
                layers.Dense(feed_forward_dim, activation="relu"),
                layers.Dense(embed_dim),
            ]
        )

    def causal_attention_mask(self, batch_size, n_dest, n_src, dtype):
        """Masks the upper half of the dot product matrix in self attention.

        This prevents flow of information from future tokens to current token.
        1's in the lower triangle, counting from the lower right corner.

        Returns a tensor of shape [batch_size, n_dest, n_src] with the given
        dtype.
        """
        i = tf.range(n_dest)[:, None]
        j = tf.range(n_src)
        m = i >= j - n_src + n_dest
        mask = tf.cast(m, dtype)
        mask = tf.reshape(mask, [1, n_dest, n_src])
        # Repeat the single mask once per batch element.
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)], 0
        )
        return tf.tile(mask, mult)

    def call(self, enc_out, target):
        """Run the decoder block.

        Args:
            enc_out: encoder output attended to by the cross-attention layer.
            target: decoder input representations; axis 0 is read as the batch
                size and axis 1 as the sequence length.

        Returns:
            Tensor produced by the final layer normalisation.
        """
        input_shape = tf.shape(target)
        batch_size = input_shape[0]
        seq_len = input_shape[1]
        causal_mask = self.causal_attention_mask(batch_size, seq_len, seq_len, tf.bool)
        target_att = self.self_att(target, target, attention_mask=causal_mask)
        target_norm = self.layernorm1(target + self.self_dropout(target_att))
        # Cross-attention: queries come from the decoder, values from the encoder.
        cross_att = self.enc_att(target_norm, enc_out)
        cross_att_norm = self.layernorm2(self.enc_dropout(cross_att) + target_norm)
        ffn_out = self.ffn(cross_att_norm)
        ffn_out_norm = self.layernorm3(cross_att_norm + self.ffn_dropout(ffn_out))
        return ffn_out_norm

In [10]:
class Transformer(keras.Model):
    """Encoder-decoder Transformer mapping audio features to token ids.

    Args:
        num_hid: width of all hidden representations.
        num_head: number of attention heads.
        num_feed_forward: hidden width of the feed-forward networks.
        source_maxlen: forwarded to SpeechFeatureEmbedding as ``maxlen``.
        target_maxlen: maximum target length; also bounds ``generate``.
        num_layers_enc: number of TransformerEncoder blocks.
        num_layers_dec: number of TransformerDecoder blocks.
        num_classes: size of the output vocabulary.
    """

    def __init__(
        self,
        num_hid=64,
        num_head=2,
        num_feed_forward=128,
        source_maxlen=100,
        target_maxlen=100,
        num_layers_enc=4,
        num_layers_dec=1,
        num_classes=10,
    ):
        super().__init__()
        self.loss_metric = keras.metrics.Mean(name="loss")
        self.num_layers_enc = num_layers_enc
        self.num_layers_dec = num_layers_dec
        self.target_maxlen = target_maxlen
        self.num_classes = num_classes

        self.enc_input = SpeechFeatureEmbedding(num_hid=num_hid, maxlen=source_maxlen)
        self.dec_input = TokenEmbedding(
            num_vocab=num_classes, maxlen=target_maxlen, num_hid=num_hid
        )

        self.encoder = keras.Sequential(
            [self.enc_input]
            + [
                TransformerEncoder(num_hid, num_head, num_feed_forward)
                for _ in range(num_layers_enc)
            ]
        )

        # Decoder blocks are stored as attributes dec_layer_0, dec_layer_1, ...
        for i in range(num_layers_dec):
            setattr(
                self,
                f"dec_layer_{i}",
                TransformerDecoder(num_hid, num_head, num_feed_forward),
            )

        self.classifier = layers.Dense(num_classes)

    def decode(self, enc_out, target):
        """Embed ``target`` and pass it through every decoder block."""
        y = self.dec_input(target)
        for i in range(self.num_layers_dec):
            y = getattr(self, f"dec_layer_{i}")(enc_out, y)
        return y

    def call(self, inputs):
        """Return per-position class logits for ``inputs = [source, target]``."""
        source = inputs[0]
        target = inputs[1]
        x = self.encoder(source)
        y = self.decode(x, target)
        return self.classifier(y)

    @property
    def metrics(self):
        return [self.loss_metric]

    def _masked_loss(self, batch):
        """Compute the compiled loss for one ``(source, target)`` batch.

        The decoder is fed ``target[:, :-1]`` and scored against
        ``target[:, 1:]``; positions whose target id is 0 get zero weight.
        Shared by train_step and test_step.
        """
        source = batch[0]
        target = batch[1]
        dec_input = target[:, :-1]
        dec_target = target[:, 1:]
        # NOTE(review): no explicit `training` flag is passed to the forward
        # pass here — confirm dropout behaves as intended during fit().
        preds = self([source, dec_input])
        one_hot = tf.one_hot(dec_target, depth=self.num_classes)
        mask = tf.math.logical_not(tf.math.equal(dec_target, 0))
        return self.compiled_loss(one_hot, preds, sample_weight=mask)

    def train_step(self, batch):
        """Processes one batch inside model.fit()."""
        with tf.GradientTape() as tape:
            loss = self._masked_loss(batch)
        trainable_vars = self.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))
        self.loss_metric.update_state(loss)
        return {"loss": self.loss_metric.result()}

    def test_step(self, batch):
        """Processes one batch inside model.evaluate(); no weight update."""
        loss = self._masked_loss(batch)
        self.loss_metric.update_state(loss)
        return {"loss": self.loss_metric.result()}

    def generate(self, source, target_start_token_idx):
        """Performs inference over one batch of inputs using greedy decoding.

        Args:
            source: batch of encoder inputs; axis 0 is the batch size.
            target_start_token_idx: id used as the first token of every
                sequence.

        Returns:
            int32 tensor of shape (batch, target_maxlen): the start token
            followed by ``target_maxlen - 1`` greedily chosen ids. Decoding
            does not stop early at any end token.
        """
        bs = tf.shape(source)[0]
        enc = self.encoder(source)
        dec_input = tf.ones((bs, 1), dtype=tf.int32) * target_start_token_idx
        for _ in range(self.target_maxlen - 1):
            dec_out = self.decode(enc, dec_input)
            logits = self.classifier(dec_out)
            # Only the prediction at the last position extends the sequence.
            next_ids = tf.argmax(logits[:, -1], axis=-1, output_type=tf.int32)
            dec_input = tf.concat([dec_input, next_ids[:, tf.newaxis]], axis=-1)
        return dec_input

### Model Call

In [11]:
class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
  """Warm-up learning-rate schedule.

  lr(step) = rsqrt(d_model) * min(rsqrt(step), step * warmup_steps ** -1.5)

  Args:
    d_model: model width used to scale the learning rate.
    warmup_steps: value used in the ``step * warmup_steps ** -1.5`` term.
  """

  def __init__(self, d_model, warmup_steps=4000):
    super().__init__()
    # Keep the raw value for get_config; the float32 tensor is used in the maths.
    self._d_model_value = d_model
    self.d_model = tf.cast(d_model, tf.float32)
    self.warmup_steps = warmup_steps

  def __call__(self, step):
    """Return the learning rate for optimizer step ``step``."""
    # The step may arrive as an integer tensor; rsqrt needs a float.
    step = tf.cast(step, tf.float32)
    arg1 = tf.math.rsqrt(step)
    arg2 = step * (self.warmup_steps ** -1.5)

    return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)

  def get_config(self):
    """Return the constructor arguments so the schedule can be serialized."""
    return {"d_model": self._d_model_value, "warmup_steps": self.warmup_steps}

In [12]:
def create_model(optimizer):
  """Build the Transformer with this notebook's hyper-parameters and compile it.

  Args:
    optimizer: Keras optimizer passed to ``model.compile``.

  Returns:
    The compiled Transformer, using categorical cross-entropy on logits with
    label smoothing of 0.1.
  """
  hyperparameters = dict(
      num_hid=256,
      num_head=4,
      num_feed_forward=1024,
      target_maxlen=200,
      num_layers_enc=6,
      num_layers_dec=6,
      num_classes=35,
  )
  transformer = Transformer(**hyperparameters)
  transformer.compile(
      optimizer=optimizer,
      loss=tf.keras.losses.CategoricalCrossentropy(
          from_logits=True, label_smoothing=0.1,
      ),
  )

  return transformer

# Learning-rate schedule, Adam optimizer and the compiled model used by the
# rest of the notebook.
learning_rate = CustomSchedule(d_model=256)
optimizer = keras.optimizers.Adam(
    learning_rate,
    beta_1=0.9,
    beta_2=0.98,
    epsilon=1e-9,
)
model = create_model(optimizer)

### Load Model

In [13]:
# Helper objects for audio decoding, spectrogram extraction and label encoding.
decoding = EncodingDecoding()
audio_processing = AudioDataProcessing()
label_processing = LabelProcessing()

# Audio file -> waveform -> spectrogram, with a leading batch axis of size 1.
waveform = decoding.decode_audio('./audio_initialize.wav')
spectrogram = audio_processing.get_spectrogram(waveform)
x = spectrogram[tf.newaxis, ...]


# Transcript file -> encoded label, with a leading batch axis of size 1.
label = label_processing.get_label('./text_initialize.txt')
y = label[tf.newaxis, ...]

In [15]:
# Run one training step on the initialisation sample before restoring the
# checkpoint — presumably so the model's variables exist by the time
# load_weights runs (TODO confirm). Keep this order.
model.train_on_batch(x, y)

# Restore the weights from the checkpoint prefix './weight/my_weights'.
model.load_weights('./weight/my_weights')

<tensorflow.python.training.tracking.util.CheckpointLoadStatus at 0x7f117f58f390>

In [16]:
def inference(model, spectrogram):
  """Greedy-decode one spectrogram into text.

  Args:
    model: object exposing ``generate(source, target_start_token_idx)``.
    spectrogram: model input with a leading batch axis of size 1 (the
      generated ids are squeezed on axis 0).

  Returns:
    The decoded string, starting with the '<' marker and cut right after the
    first '>' marker if one was generated; otherwise the full decoded
    sequence.
  """
  decoding = EncodingDecoding()

  # Look the start-token id up in the vocabulary instead of hard-coding it.
  start_token_idx = decoding.char.index("<")
  token_ids = model.generate(spectrogram, start_token_idx)

  chars = decoding.decode_label(tf.cast(tf.squeeze(token_ids, axis=0),
                                        dtype=tf.int64))

  text = b''.join(chars).decode('utf-8')

  # generate() always emits the maximum number of tokens; anything after the
  # end marker is not part of the transcript.
  end_pos = text.find(">")
  if end_pos != -1:
    text = text[:end_pos + 1]
  return text

In [17]:
# Transcribe the initialisation spectrogram `x` with the model loaded above.
inference(model, x)

'<printing, in the only are it present concernent in the only all the arts and crafts represented in the only sens with which wear it presense with which we are it presented in theartsibifitenct priviv'