In [None]:
import tensorflow as tf
import numpy as np
import pandas as pd
from tensorflow import keras
from tensorflow.keras.activations import softmax
from keras import layers
from tensorflow.keras.layers import Dense,LayerNormalization ## alternative for nn.linear
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences

In [None]:
# Load the pre-computed MFCC features and swap the last two axes in one step.
# NOTE(review): presumably the file is (samples, coefficients, frames) and the
# model wants (samples, frames, coefficients) -- confirm against the file.
X_data = np.transpose(np.load("../data/data_mfcc.npy"), axes=(0, 2, 1))
# X_data=X_data[:100]  # optional: subsample for quick experiments
print(X_data.shape)




In [None]:
# Read the LJSpeech transcript table: pipe-separated, no header row, three
# columns (clip id, raw transcript, second transcript column).
data = pd.read_csv(
    "../data/LJSpeech-1.1/metadata.csv",
    sep="|",
    header=None,
    names=["ID", "Text1", "Text2"],
    # 3 == csv.QUOTE_NONE. The file is delimited only by "|"; transcripts may
    # contain literal double quotes, and pandas' default quote handling can
    # then merge several lines into one row, leaving fewer transcripts than
    # feature rows in X_data.
    quoting=3,
)
texts = data["Text1"].to_list()
ID = data["ID"].to_list()

# Word-level vocabulary fitted on the transcripts.
tokenizer = Tokenizer()
tokenizer.fit_on_texts(texts)
num_classes = len(tokenizer.word_index) + 1  # Add 1 for the padding token

# Integer-encode every transcript and force a fixed length of 30 ids:
# shorter sequences are zero-padded at the end, longer ones are truncated
# according to pad_sequences' default truncation rule.
sequences = tokenizer.texts_to_sequences(texts)
Y_data = pad_sequences(sequences, padding="post", maxlen=30)
# Y_data=Y_data[:100]  # optional: subsample (keep in sync with X_data)
print(num_classes)
print(Y_data.shape)

In [None]:
def create_self_attention_mask(sequence_length):
    """Build an additive causal (look-back only) attention mask.

    Args:
        sequence_length: Number of positions; the mask is square.

    Returns:
        A float64 NumPy array of shape (sequence_length, sequence_length)
        holding 0.0 on and below the main diagonal and -inf above it, meant
        to be added to attention scores before the softmax.
    """
    # True wherever a query position (row) may see a key position (column).
    visible = np.tril(np.ones((sequence_length, sequence_length), dtype=bool))
    return np.where(visible, 0.0, -np.inf)

In [None]:
# Sanity check: build a 5x5 mask and display it as the cell output
# (0 on/below the diagonal, -inf above it).
test = create_self_attention_mask(5)
test

In [None]:
def scaled_dot_product(q, k, v, mask):
    """Scaled dot-product attention.

    Args:
        q: Query tensor.
        k: Key tensor; its last dimension sets the scaling factor.
        v: Value tensor.
        mask: Optional additive mask that is added to the scaled scores
            before the softmax, or None for no masking.

    Returns:
        A pair (output, attention_weights): the attention-weighted values and
        the softmax-normalised scores (softmax over the last axis).
    """
    key_width = tf.cast(tf.shape(k)[-1], tf.float32)
    scores = tf.matmul(q, k, transpose_b=True) / tf.math.sqrt(key_width)

    # Additive masking: -inf entries end up with zero weight after softmax.
    if mask is not None:
        scores = scores + mask

    attention_weights = tf.nn.softmax(scores)
    return tf.matmul(attention_weights, v), attention_weights

In [None]:
# q=[[1.0,2.0,1.0],[1.0,1.0,1.0],[1.0,1.0,1.0]]
# k=[[1.0,3.0,1.0],[1.0,1.0,1.0],[1.0,1.0,1.0]]
# v=[[1.0,1.0,1.0],[1.0,1.0,1.0],[1.0,1.0,1.0]]
# mask_1=create_self_attention_mask(3)
# test,weights = scaled_dot_product(q,k,v,mask=mask_1)
# print("output is\n",test)
# print("attention weights is \n",weights)

In [None]:
class PositionalEncoding(tf.keras.layers.Layer):
    """Sinusoidal positional-encoding table.

    Column 2i holds sin(pos / 10000^(2i / d_model)) and column 2i + 1 holds
    the matching cosine, for pos in [0, max_sequence_length).

    Args:
        d_model: Width of the encoding (number of columns).
        max_sequence_length: Number of positions (rows) to generate.
    """

    def __init__(self, d_model, max_sequence_length):
        super(PositionalEncoding, self).__init__()
        self.max_sequence_length = max_sequence_length
        self.d_model = d_model

    def call(self, inputs):
        """Return the encoding table, shape (1, max_sequence_length, d_model).

        `inputs` is not read; the table depends only on the constructor
        arguments.
        """
        even_i = tf.range(0, self.d_model, 2, dtype=tf.float32)
        denominator = tf.pow(10000.0, even_i / self.d_model)
        position = tf.reshape(
            tf.range(self.max_sequence_length, dtype=tf.float32),
            (1, self.max_sequence_length, 1),
        )
        even_PE = tf.sin(position / denominator)
        odd_PE = tf.cos(position / denominator)
        # Stack on a NEW LAST axis so that flattening interleaves the columns
        # as sin, cos, sin, cos, ...  Stacking on axis 2 of these rank-3
        # tensors would instead lay out all sines followed by all cosines.
        stacked = tf.stack([even_PE, odd_PE], axis=-1)
        PE = tf.reshape(stacked, (1, self.max_sequence_length, -1))
        # For an odd d_model the interleaved table has one surplus cosine
        # column; trim so the width always equals d_model.
        PE = PE[:, :, : self.d_model]
        print("postional encoding output shape",PE.shape)
        return PE

In [None]:
class ConvolutionalLayer(tf.keras.layers.Layer):
    """Two Conv1D -> BatchNorm -> ReLU stages followed by global average pooling.

    Args:
        input_shape: Accepted for call-site compatibility; not used by this
            class.
        filters: Number of output channels of both convolutions.
        kernel_size: Kernel width of both convolutions.
        **kwargs: Forwarded to the Keras ``Layer`` constructor.
    """

    def __init__(self, input_shape, filters=32, kernel_size=3, **kwargs):
        super().__init__(**kwargs)
        self.filters = filters
        self.kernel_size = kernel_size

        # Both convolutions share one configuration.
        conv_config = dict(
            filters=self.filters,
            kernel_size=self.kernel_size,
            padding="same",
            trainable=True,
        )

        # Stage 1
        self.conv1 = layers.Conv1D(**conv_config)
        self.batch_norm1 = layers.BatchNormalization()
        self.relu1 = layers.ReLU()

        # Stage 2
        self.conv2 = layers.Conv1D(**conv_config)
        self.batch_norm2 = layers.BatchNormalization()
        self.relu2 = layers.ReLU()

        self.global_avg_pooling = layers.GlobalAveragePooling1D()

    def call(self, inputs, training=None, mask=None):
        """Run both conv stages and pool the result.

        `training` is forwarded to the batch-normalisation layers; `mask` is
        accepted but not used.
        """
        stage1 = self.conv1(inputs)
        stage1 = self.batch_norm1(stage1, training=training)
        stage1 = self.relu1(stage1)

        stage2 = self.conv2(stage1)
        stage2 = self.batch_norm2(stage2, training=training)
        stage2 = self.relu2(stage2)

        pooled = self.global_avg_pooling(stage2)
        print("CNN output shape is  ",pooled.shape)
        return pooled

In [None]:
class MultiHeadAttention(tf.keras.layers.Layer):
    """Multi-head self-attention with a single fused Q/K/V projection.

    Args:
        d_model: Width of the model; also the output width.
        num_heads: Number of attention heads. Must divide `d_model`.

    Raises:
        ValueError: If `d_model` is not a positive multiple of `num_heads`.
    """

    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        # Without this check the per-head reshape can either fail late or
        # silently regroup features across positions.
        if num_heads <= 0 or d_model % num_heads != 0:
            raise ValueError(
                f"d_model ({d_model}) must be a positive multiple of "
                f"num_heads ({num_heads})"
            )
        self.d_model = d_model
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads
        self.qkv_layer = tf.keras.layers.Dense(3 * d_model, use_bias=False)
        # NOTE(review): the output projection applies ReLU; the reference
        # Transformer uses a plain linear projection here -- confirm intent.
        self.linear_layer = tf.keras.layers.Dense(d_model, activation='relu')

    def split_heads(self, x, batch_size):
        """Reshape x to (batch_size, num_heads, positions, head_dim)."""
        if len(x.shape) == 2:
            # Rank-2 input: add two leading axes so the reshape below can
            # regroup it into the 4-D per-head layout.
            x = tf.expand_dims(tf.expand_dims(x, axis=0), axis=1)
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.head_dim))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def call(self, x, mask):
        """Apply self-attention to `x`.

        Args:
            x: Input tensor; the first axis is the batch axis.
            mask: Additive attention mask passed to `scaled_dot_product`,
                or None.

        Returns:
            Tensor with last dimension `d_model`.
        """
        print("MultiHeadAttention input shape",x.shape)
        # Use the dynamic batch size: the static one is None while Keras
        # traces the model for fit()/predict(), which breaks tf.reshape.
        batch_size = tf.shape(x)[0]

        qkv = self.qkv_layer(x)
        q, k, v = tf.split(qkv, 3, axis=-1)
        q = self.split_heads(q, batch_size)
        k = self.split_heads(k, batch_size)
        v = self.split_heads(v, batch_size)

        values, attention = scaled_dot_product(q, k, v, mask)

        # Merge heads back: (batch, heads, positions, head_dim)
        # -> (batch, positions, heads * head_dim).
        values = tf.transpose(values, perm=[0, 2, 1, 3])
        values = tf.reshape(values, (batch_size, -1, self.num_heads * self.head_dim))
        out = self.linear_layer(values)
        print("MultiHeadAttention output shape is ",out.shape)
        return out

In [None]:
class PositionwiseFeedForward(tf.keras.layers.Layer):
    """Two-layer feed-forward block: Dense -> ReLU -> Dropout -> Dense.

    Args:
        d_model: Output width of the second dense layer.
        hidden: Width of the intermediate dense layer.
        drop_prob: Dropout rate applied after the ReLU.
    """

    def __init__(self, d_model, hidden, drop_prob=0.1):
        super(PositionwiseFeedForward, self).__init__()
        self.linear1 = tf.keras.layers.Dense(hidden)
        self.linear2 = tf.keras.layers.Dense(d_model)
        self.relu = tf.keras.layers.ReLU()
        self.dropout = tf.keras.layers.Dropout(rate=drop_prob)

    def call(self, x, training=None):
        """Apply the feed-forward block.

        Args:
            x: Input tensor.
            training: Optional flag forwarded to the dropout layer; None
                lets Keras decide from the surrounding call context.

        Returns:
            Tensor whose last dimension is `d_model`.
        """
        # The shape traces previously said "positional encoding", which made
        # them point at the wrong layer when debugging shapes.
        print("Input shape for position-wise feed-forward",x.shape)
        x = self.linear1(x)
        x = self.relu(x)
        x = self.dropout(x, training=training)
        x = self.linear2(x)
        print("output shape from position-wise feed-forward",x.shape)
        return x

In [None]:
class EncoderLayer(tf.keras.layers.Layer):
    """One post-norm encoder layer: self-attention then feed-forward.

    Each sub-block is followed by dropout, a residual connection and layer
    normalisation.

    Args:
        d_model: Model width handed to the attention and feed-forward blocks.
        ffn_hidden: Hidden width of the feed-forward block.
        num_heads: Number of attention heads.
        drop_prob: Dropout rate used throughout the layer.
    """

    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob):
        super().__init__()
        # Attention sub-block
        self.attention = MultiHeadAttention(d_model=d_model, num_heads=num_heads)
        self.norm1 = LayerNormalization(epsilon=1e-5)
        self.dropout1 = tf.keras.layers.Dropout(rate=drop_prob)
        # Feed-forward sub-block
        self.ffn = PositionwiseFeedForward(
            d_model=d_model, hidden=ffn_hidden, drop_prob=drop_prob
        )
        self.norm2 = LayerNormalization(epsilon=1e-5)
        self.dropout2 = tf.keras.layers.Dropout(rate=drop_prob)

    def call(self, x, self_attention_mask, training=None):
        """Run the layer.

        Args:
            x: Input tensor.
            self_attention_mask: Mask handed to the attention block.
            training: Optional flag forwarded to both dropout layers.

        Returns:
            Tensor produced by the second layer normalisation.
        """
        # Self-attention, then add the untouched input back and normalise.
        attended = self.attention(x, mask=self_attention_mask)
        attended = self.dropout1(attended, training=training)
        normed = self.norm1(attended + x)

        # Feed-forward, then add its own input back and normalise.
        transformed = self.ffn(normed)
        transformed = self.dropout2(transformed, training=training)
        return self.norm2(transformed + normed)

In [None]:
class TransformerEncoderBlock(keras.layers.Layer):
    """A stack of `num_blocks` TransformerEncoderBlockSingle layers.

    Args:
        head_size: Forwarded to every inner block.
        num_heads: Forwarded to every inner block.
        ff_dim: Forwarded to every inner block.
        dropout: Forwarded to every inner block.
        num_blocks: How many inner blocks to chain.
        **kwargs: Forwarded to the Keras ``Layer`` constructor.
    """

    def __init__(self, head_size, num_heads, ff_dim, dropout, num_blocks=1, **kwargs):
        super().__init__(**kwargs)
        self.head_size = head_size
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.dropout = dropout
        self.num_blocks = num_blocks

        # Instantiate the inner blocks up front, one per requested level.
        stack = []
        for _ in range(num_blocks):
            stack.append(self.build_encoder_block())
        self.encoder_blocks = stack

    def build_encoder_block(self):
        """Create one inner block from this layer's stored hyperparameters."""
        return TransformerEncoderBlockSingle(
            head_size=self.head_size,
            num_heads=self.num_heads,
            ff_dim=self.ff_dim,
            dropout=self.dropout,
        )

    def build(self, input_shape):
        """Build this layer, then explicitly build every inner block."""
        super().build(input_shape)
        for block in self.encoder_blocks:
            block.build(input_shape)

    def call(self, inputs):
        """Feed `inputs` through the inner blocks in order."""
        hidden = inputs
        for block in self.encoder_blocks:
            hidden = block(hidden)
        return hidden


class TransformerEncoderBlockSingle(keras.layers.Layer):
    """One encoder block built on Keras' MultiHeadAttention.

    Self-attention and a two-layer feed-forward network, each followed by
    dropout, a residual connection and layer normalisation.

    Args:
        head_size: Per-head key dimension of the attention layer.
        num_heads: Number of attention heads.
        ff_dim: Hidden width of the feed-forward network.
        dropout: Dropout rate for the attention layer and both dropout layers.
        **kwargs: Forwarded to the Keras ``Layer`` constructor.
    """

    def __init__(self, head_size, num_heads, ff_dim, dropout, **kwargs):
        super(TransformerEncoderBlockSingle, self).__init__(**kwargs)

        self.head_size = head_size
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.dropout = dropout

        # Multi-head self-attention layer
        self.self_attention = layers.MultiHeadAttention(
            key_dim=self.head_size,
            num_heads=self.num_heads,
            dropout=self.dropout,
        )

        # Feed Forward Part
        self.ffn_hidden = layers.Dense(self.ff_dim, activation="relu")
        # Default output width; replaced in build() when the input feature
        # width turns out to differ from head_size.
        self.ffn_output = layers.Dense(self.head_size)

        # Layer Normalization
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)

        # Dropout
        self.dropout1 = layers.Dropout(self.dropout)
        self.dropout2 = layers.Dropout(self.dropout)

    def build(self, input_shape):
        """Size the feed-forward output to the input feature width.

        The residual add in `call` needs the feed-forward output to be as
        wide as the block input; with the output fixed at `head_size` the
        block only worked when the two happened to be equal.
        """
        feature_dim = input_shape[-1]
        if feature_dim is not None and int(feature_dim) != self.head_size:
            self.ffn_output = layers.Dense(int(feature_dim))
        super(TransformerEncoderBlockSingle, self).build(input_shape)

    def call(self, inputs, training=None):
        """Run the block.

        Args:
            inputs: Input tensor, used as both query and value for attention.
            training: Optional flag forwarded to the attention and dropout
                layers; None lets Keras decide from the call context.

        Returns:
            Tensor with the same feature width as `inputs`.
        """
        # Attention and Normalization
        self_attn_output = self.self_attention(inputs, inputs, training=training)
        self_attn_output = self.dropout1(self_attn_output, training=training)
        x = self.layernorm1(self_attn_output + inputs)

        # Feed Forward Part (dropout2 was created but never applied before)
        ffn_output = self.ffn_output(self.ffn_hidden(x))
        ffn_output = self.dropout2(ffn_output, training=training)
        x = self.layernorm2(ffn_output + x)

        return x

In [None]:
class SequentialEncoder(tf.keras.layers.Layer):
    """Runs `num_layers` EncoderLayer instances one after another.

    Args:
        d_model: Forwarded to every EncoderLayer.
        ffn_hidden: Forwarded to every EncoderLayer.
        num_heads: Forwarded to every EncoderLayer.
        drop_prob: Forwarded to every EncoderLayer.
        num_layers: Number of EncoderLayer instances to create.
        max_sequence_length: Accepted for call-site compatibility; not used
            by this class.
    """

    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob, num_layers, max_sequence_length):
        super(SequentialEncoder, self).__init__()
        self.layers = [EncoderLayer(d_model, ffn_hidden, num_heads, drop_prob) for _ in range(num_layers)]

    def call(self, x, training=True, mask=None):
        """Apply every encoder layer in order.

        Args:
            x: Input tensor.
            training: Training flag forwarded to each layer.
            mask: Self-attention mask forwarded to each layer, or None.

        Returns:
            Output of the last encoder layer.
        """
        for layer in self.layers:
            # EncoderLayer.call is (x, self_attention_mask, training): the
            # mask goes second and the training flag is passed by keyword so
            # neither can land in the other's slot.
            x = layer(x, mask, training=training)
        return x

In [None]:
class Encoder(tf.keras.layers.Layer):
    """Thin wrapper around a SequentialEncoder stack.

    Args:
        d_model: Forwarded to SequentialEncoder.
        ffn_hidden: Forwarded to SequentialEncoder.
        num_heads: Forwarded to SequentialEncoder.
        drop_prob: Forwarded to SequentialEncoder.
        num_layers: Forwarded to SequentialEncoder.
        max_sequence_length: Forwarded to SequentialEncoder.
    """

    def __init__(self,
                 d_model,
                 ffn_hidden,
                 num_heads,
                 drop_prob,
                 num_layers,
                 max_sequence_length):
        super(Encoder, self).__init__()
        self.layers = SequentialEncoder(d_model, ffn_hidden, num_heads, drop_prob, num_layers, max_sequence_length)

    def call(self, x, self_attention_mask):
        """Encode `x` using the given self-attention mask.

        Args:
            x: Input tensor (the caller passes the convolutional output).
            self_attention_mask: Mask forwarded to the encoder stack.

        Returns:
            Output of the encoder stack.
        """
        # Assuming x is the output from the convolutional layer
        print("Encoder input shape is",x.shape)
        # SequentialEncoder.call is (x, training=True, mask=None): the mask
        # must be passed by keyword, otherwise it is bound to `training`.
        x = self.layers(x, mask=self_attention_mask)
        return x

In [None]:
class Transformer(tf.keras.Model):
    """Convolutional front-end followed by the encoder stack.

    Args:
        d_model: Stored on the model and forwarded to the encoder.
        ffn_hidden: Forwarded to the encoder.
        num_heads: Forwarded to the encoder.
        drop_prob: Forwarded to the encoder.
        num_layers: Forwarded to the encoder.
        max_input_length: Forwarded to the encoder as its maximum sequence
            length.
    """

    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob, num_layers, max_input_length):
        super().__init__()
        self.d_model = d_model
        # Adjust input shape
        self.cnn_layer = ConvolutionalLayer(input_shape=(None, 20, 500))
        self.encoder = Encoder(
            d_model,
            ffn_hidden,
            num_heads,
            drop_prob,
            num_layers,
            max_input_length,
        )

    def call(self, inputs, training=None):
        """Run the CNN, build a causal mask and encode.

        NOTE(review): the CNN output is unpacked as rank 2 here, while
        MultiHeadAttention.call unpacks its input as rank 3, and the mask is
        sized from the CNN's second axis -- confirm the intended shapes.
        """
        features = self.cnn_layer(inputs)

        # Generate self-attention mask from the second axis of the CNN output.
        _, mask_size = features.shape
        causal_mask = create_self_attention_mask(mask_size)

        return self.encoder(features, causal_mask)


In [None]:
# Hyperparameters consumed by the model-construction cell below.
# NOTE(review): d_model is 20 while ConvolutionalLayer defaults to 32 filters;
# the encoder's residual adds need matching widths -- confirm.
d_model = 20             # model width handed to the encoder
ffn_hidden = 1024        # hidden width of the feed-forward block
num_heads = 1            # attention heads per encoder layer
drop_prob = 0.1          # dropout rate
num_layers = 1           # number of stacked encoder layers
max_input_length = 500   # forwarded to the encoder as its maximum sequence length

In [None]:
# Wrap the Transformer in a Sequential container so it can be compiled and
# trained through the standard Keras API.
model = tf.keras.Sequential(
    [
        Transformer(
            d_model=d_model,
            ffn_hidden=ffn_hidden,
            num_heads=num_heads,
            drop_prob=drop_prob,
            num_layers=num_layers,
            max_input_length=max_input_length,
        )
    ]
)

In [None]:
from sklearn.model_selection import train_test_split

# Hold out 20% of the (features, labels) pairs for validation; the fixed
# random_state keeps the split reproducible across runs.
X_train, X_val, Y_train, Y_val = train_test_split(
    X_data,
    Y_data,
    test_size=0.2,
    random_state=42,
)

# One shape per line: training/validation features, then training/validation labels.
print(X_train.shape, X_val.shape, Y_train.shape, Y_val.shape, sep="\n")

In [None]:
# Configure training: Adam optimiser with mean-squared-error loss.
# NOTE(review): Y_data holds integer token ids from the tokenizer; confirm
# that regressing them with MSE is the intended objective.
model.compile(
    loss="mean_squared_error",
    optimizer="adam",
)
# model.build(input_shape=(13100,500,20))  # Replace your_input_shape with the actual input shape
# model.summary()


In [None]:
def loss_function(y_true, y_pred):
    """Mean squared error between targets and predictions.

    Args:
        y_true: Target values (any numeric dtype, e.g. integer token ids).
        y_pred: Predicted values.

    Returns:
        Scalar tensor: the mean of the squared differences over all elements.
    """
    y_pred = tf.convert_to_tensor(y_pred)
    # Compute in floating point even if integer predictions are supplied.
    if not y_pred.dtype.is_floating:
        y_pred = tf.cast(y_pred, tf.float32)
    # TensorFlow does not subtract tensors of different dtypes, so bring the
    # targets to the predictions' dtype first (integer labels vs. float
    # outputs would otherwise raise).
    y_true = tf.cast(y_true, y_pred.dtype)
    return tf.reduce_mean(tf.square(y_true - y_pred))

In [None]:
# Train for four epochs, then predict on the validation split and print the
# first prediction next to its target for a quick visual comparison.
model.fit(x=X_train, y=Y_train, epochs=4)
predictions = model.predict(x=X_val)
print(predictions[0], Y_val[0])
