## Transformer

In [1]:
import numpy as np
import tensorflow as tf

In [2]:
def get_angles(pos, i, d_model):
    angle_rates = 1 / np.power(10000, (2 * (i // 2)) / np.float32(d_model))
    return pos * angle_rates


def positional_encoding(position, d_model):
    angle_rads = get_angles(
        np.arange(position)[:, np.newaxis], np.arange(d_model)[np.newaxis, :], d_model
    )

    # apply sin to even indices in the array; 2i
    angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])

    # apply cos to odd indices in the array; 2i+1
    angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])

    pos_encoding = angle_rads[np.newaxis, ...]

    return tf.cast(pos_encoding, dtype=tf.float32)


def scaled_dot_product_attention(q, k, v, mask):
    """Calculate the attention weights.
    q, k, v must have matching leading dimensions.
    k, v must have matching penultimate dimension, i.e.: seq_len_k = seq_len_v.
    The mask has different shapes depending on its type(padding or look ahead)
    but it must be broadcastable for addition.
    Args:
      q: query shape == (..., seq_len_q, depth)
      k: key shape == (..., seq_len_k, depth)
      v: value shape == (..., seq_len_v, depth_v)
      mask: Float tensor with shape broadcastable
            to (..., seq_len_q, seq_len_k). Defaults to None.
    Returns:
      output, attention_weights
    """

    matmul_qk = tf.matmul(q, k, transpose_b=True)  # (..., seq_len_q, seq_len_k)

    # scale matmul_qk
    dk = tf.cast(tf.shape(k)[-1], tf.float32)
    scaled_attention_logits = matmul_qk / tf.math.sqrt(dk)

    # add the mask to the scaled tensor.
    if mask is not None:
        scaled_attention_logits += mask * -1e9

        # softmax is normalized on the last axis (seq_len_k) so that the scores
    # add up to 1.
    attention_weights = tf.nn.softmax(
        scaled_attention_logits, axis=-1
    )  # (..., seq_len_q, seq_len_k)

    output = tf.matmul(attention_weights, v)  # (..., seq_len_q, depth_v)

    return output, attention_weights


class MultiHeadAttention(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        self.num_heads = num_heads
        self.d_model = d_model

        assert d_model % self.num_heads == 0

        self.depth = d_model // self.num_heads

        self.wq = tf.keras.layers.Dense(d_model)
        self.wk = tf.keras.layers.Dense(d_model)
        self.wv = tf.keras.layers.Dense(d_model)

        self.dense = tf.keras.layers.Dense(d_model)

    def split_heads(self, x, batch_size):
        """Split the last dimension into (num_heads, depth).
        Transpose the result such that the shape is (batch_size, num_heads, seq_len, depth)
        """
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def call(self, v, k, q, mask=None):
        batch_size = tf.shape(q)[0]

        q = self.wq(q)  # (batch_size, seq_len, d_model)
        k = self.wk(k)  # (batch_size, seq_len, d_model)
        v = self.wv(v)  # (batch_size, seq_len, d_model)

        q = self.split_heads(q, batch_size)  # (batch_size, num_heads, seq_len_q, depth)
        k = self.split_heads(k, batch_size)  # (batch_size, num_heads, seq_len_k, depth)
        v = self.split_heads(v, batch_size)  # (batch_size, num_heads, seq_len_v, depth)

        # scaled_attention.shape == (batch_size, num_heads, seq_len_q, depth)
        # attention_weights.shape == (batch_size, num_heads, seq_len_q, seq_len_k)
        scaled_attention, attention_weights = scaled_dot_product_attention(
            q, k, v, mask
        )

        scaled_attention = tf.transpose(
            scaled_attention, perm=[0, 2, 1, 3]
        )  # (batch_size, seq_len_q, num_heads, depth)

        concat_attention = tf.reshape(
            scaled_attention, (batch_size, -1, self.d_model)
        )  # (batch_size, seq_len_q, d_model)

        output = self.dense(concat_attention)  # (batch_size, seq_len_q, d_model)

        return output, attention_weights


def point_wise_feed_forward_network(d_model, dff):
    return tf.keras.Sequential(
        [
            tf.keras.layers.Dense(dff, activation="relu"),  # (batch_size, seq_len, dff)
            tf.keras.layers.Dense(d_model),  # (batch_size, seq_len, d_model)
        ]
    )


class EncoderLayer(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads, dff, rate=0.1):
        super(EncoderLayer, self).__init__()

        self.mha = MultiHeadAttention(d_model, num_heads)
        self.ffn = point_wise_feed_forward_network(d_model, dff)

        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

        self.dropout1 = tf.keras.layers.Dropout(rate)
        self.dropout2 = tf.keras.layers.Dropout(rate)

    def call(self, x, training=None, mask=None):
        attn_output, _ = self.mha(x, x, x, mask)  # (batch_size, input_seq_len, d_model)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(x + attn_output)  # (batch_size, input_seq_len, d_model)

        ffn_output = self.ffn(out1)  # (batch_size, input_seq_len, d_model)
        ffn_output = self.dropout2(ffn_output, training=training)
        out2 = self.layernorm2(
            out1 + ffn_output
        )  # (batch_size, input_seq_len, d_model)

        return out2


class Encoder(tf.keras.layers.Layer):
    def __init__(
        self, num_layers, d_model, num_heads, dff, maximum_position_encoding, rate=0.1,
    ):
        super(Encoder, self).__init__()

        self.d_model = d_model
        self.num_layers = num_layers

        self.pos_encoding = positional_encoding(maximum_position_encoding, self.d_model)

        self.enc_layers = [
            EncoderLayer(d_model, num_heads, dff, rate) for _ in range(num_layers)
        ]

        self.dropout = tf.keras.layers.Dropout(rate)

    def call(self, x, training=None, mask=None):
        seq_len = tf.shape(x)[1]

        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x += self.pos_encoding[:, :seq_len, :]

        x = self.dropout(x, training=training)

        for i in range(self.num_layers):
            x = self.enc_layers[i](x, training, mask)

        return x  # (batch_size, input_seq_len, d_model)


class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
    def __init__(self, d_model, warmup_steps=4000, name=None):
        super(CustomSchedule, self).__init__()

        self.d_model = d_model
        self.d_model = tf.cast(self.d_model, tf.float32)

        self.warmup_steps = warmup_steps
        self.name = name  # Modified from the source

    def __call__(self, step):
        arg1 = tf.math.rsqrt(step)
        arg2 = step * (self.warmup_steps ** -1.5)

        return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)

    def get_config(self):  # Modified from the source
        return {
            "d_model": self.d_model,
            "warmup_steps": self.warmup_steps,
            "name": self.name,
        }

## Model

In [3]:
!pip install transformer-encoder
from tensorflow.keras.layers import (
    Input,
    GlobalAvgPool1D,
    Dense,
    Bidirectional,
    GRU,
    Dropout,
)
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.python.framework import ops
from tensorflow.python.keras import backend as K
from tensorflow.python.ops import clip_ops
from tensorflow.python.ops import math_ops
from tensorflow.keras.losses import mae


Collecting transformer-encoder
  Downloading transformer_encoder-0.0.3-py3-none-any.whl (9.0 kB)
Installing collected packages: transformer-encoder
Successfully installed transformer-encoder-0.0.3


In [4]:
def custom_binary_accuracy(y_true, y_pred, threshold=0.5):
    threshold = math_ops.cast(threshold, y_pred.dtype)
    y_pred = math_ops.cast(y_pred > threshold, y_pred.dtype)
    y_true = math_ops.cast(y_true > threshold, y_true.dtype)

    return K.mean(math_ops.equal(y_true, y_pred), axis=-1)


def custom_binary_crossentropy(y_true, y_pred):
    y_pred = ops.convert_to_tensor(y_pred)
    y_true = math_ops.cast(y_true, y_pred.dtype)
    epsilon_ = K._constant_to_tensor(K.epsilon(), y_pred.dtype.base_dtype)
    output = clip_ops.clip_by_value(y_pred, epsilon_, 1.0 - epsilon_)

    # Compute cross entropy from probabilities.
    bce = 4 * y_true * math_ops.log(output + K.epsilon())
    bce += (1 - y_true) * math_ops.log(1 - output + K.epsilon())
    return K.sum(-bce, axis=-1)

In [8]:
def transformer_classifier(
                            num_layers=4,
                            d_model=16822,
                            num_heads=13,
                            dff=256,
                            maximum_position_encoding=2048,
                            n_classes=16,
                            ):
    inp = Input((None, d_model))
    encoder = Encoder(
                        num_layers=num_layers,
                        d_model=d_model,
                        num_heads=num_heads,
                        dff=dff,
                        maximum_position_encoding=maximum_position_encoding,
                        rate=0.3,
                    )
    x = encoder(inp)
    x = Dropout(0.2)(x)
    x = GlobalAvgPool1D()(x)
    x = Dense(4 * n_classes, activation="selu")(x)
    out = Dense(n_classes, activation="sigmoid")(x)
    model = Model(inputs=inp, outputs=out)
    opt = Adam(0.00001)
    model.compile(optimizer=opt, loss=custom_binary_crossentropy, metrics=[custom_binary_accuracy])
    model.summary()
    return model

In [None]:
transformer_classifier()

In [None]:
def transformer_pretrain(
                        num_layers=4, d_model=16822, num_heads=13, dff=256, maximum_position_encoding=2048,
                        ):
    inp = Input((None, d_model))
    encoder = Encoder(
        num_layers=num_layers,
        d_model=d_model,
        num_heads=num_heads,
        dff=dff,
        maximum_position_encoding=maximum_position_encoding,
        rate=0.3,
    )
    x = encoder(inp)
    out = Dense(d_model, activation="linear", name="out_pretraining")(x)
    model = Model(inputs=inp, outputs=out)
    opt = Adam(0.0001)
    model.compile(optimizer=opt, loss=mae)
    model.summary()
    return model

In [None]:
transformer_pretrain()

Model: "model_1"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_4 (InputLayer)         [(None, None, 128)]       0         
_________________________________________________________________
encoder_1 (Encoder)          (None, None, 128)         529920    
_________________________________________________________________
out_pretraining (Dense)      (None, None, 128)         16512     
Total params: 546,432
Trainable params: 546,432
Non-trainable params: 0
_________________________________________________________________


<tensorflow.python.keras.engine.functional.Functional at 0x7f359f421c50>

In [None]:
import numpy as np
import librosa
from scipy import signal
from scipy.io import wavfile
from scipy.signal import butter,filtfilt
from scipy.stats import kurtosis
import scipy.signal as signal
from scipy.integrate import simps
import matplotlib.pyplot as plt
import librosa.display
import sklearn
import pandas as pd
import seaborn as sns
import json

## Loading the Data

In [None]:
def load_data(data_path):
    with open(data_path,"r") as fp:
        data=json.load(fp)
    inputs = np.array(data["mfcc"])
    targets= np.array(data["labels"])
    
    return inputs,targets

In [None]:
inputs,targets=load_data("/content/drive/MyDrive/mfcc_3.json")

  after removing the cwd from sys.path.


In [None]:
shape_diff=[]
for i in range(len(inputs)):
    shape_diff.append(np.array(inputs[i]).shape[0])

INPUTS2=inputs.copy()

INPUTS3=[]
for i in range(INPUTS2.shape[0]):
  A=np.zeros((max(shape_diff),13))#-np.array(INPUTS2[i]).shape[0]
  A[:np.array(INPUTS2[i]).shape[0],:]=INPUTS2[i]
  INPUTS3.append(A)

## Train Test Split

In [None]:
from sklearn.model_selection import train_test_split

In [None]:
def prepare_dataset(test_size,val_size):
  X_train,X_test,y_train,y_test=train_test_split(INPUTS3,targets,test_size=test_size)
  X_train,X_valid,y_train,y_valid=train_test_split(X_train,y_train,test_size=val_size)

  return np.array(X_train),np.array(X_valid),np.array(X_test),y_train,y_valid,y_test

In [None]:
X_train,X_valid,X_test,y_train,y_valid,y_test= prepare_dataset(0.25,0.2)

## Using The Model

In [None]:
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau

In [None]:
h5_name = "transformer_v2.h5"
h5_pretrain = "transformer_pretrain.h5"

In [None]:
input_shape=(X_train.shape[1],X_train.shape[2])
print(type(input_shape))

<class 'tuple'>


In [None]:
model = transformer_classifier(n_classes=2)

Model: "model_4"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_10 (InputLayer)        [(None, None, 128)]       0         
_________________________________________________________________
encoder_6 (Encoder)          (None, None, 128)         529920    
_________________________________________________________________
dropout_66 (Dropout)         (None, None, 128)         0         
_________________________________________________________________
global_average_pooling1d_3 ( (None, 128)               0         
_________________________________________________________________
dense_174 (Dense)            (None, 8)                 1032      
_________________________________________________________________
dense_175 (Dense)            (None, 2)                 18        
Total params: 530,970
Trainable params: 530,970
Non-trainable params: 0
_____________________________________________________

In [None]:
checkpoint = ModelCheckpoint(
    h5_name,
    monitor="val_loss",
    verbose=1,
    save_best_only=True,
    mode="min",
    save_weights_only=True,
)
reduce_o_p = ReduceLROnPlateau(
    monitor="val_loss", patience=20, min_lr=1e-7, mode="min"
)

In [None]:
epochs=1

In [None]:
 model.fit(
        X_train,y_train,
        validation_data=(X_valid,y_valid),
        epochs=epochs,
        callbacks=[checkpoint, reduce_o_p],
        use_multiprocessing=True,
        workers=12,
        verbose=2,
        max_queue_size=64,
    )

ValueError: ignored