# DIRECT INFERENCING 

In [None]:
# !pip install tensorflow
# glob ships with the Python standard library, so it needs no pip install


In [1]:
import os
import random
from glob import glob
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

class TokenEmbedding(layers.Layer):
    """Embed target token ids and add a learned positional embedding.

    Args:
        num_vocab: Size of the token vocabulary (rows of the token table).
        maxlen: Number of positions the positional table can represent.
        num_hid: Width of both embedding tables.
    """

    def __init__(self, num_vocab=1000, maxlen=100, num_hid=64):
        super().__init__()
        self.emb = layers.Embedding(input_dim=num_vocab, output_dim=num_hid)
        self.pos_emb = layers.Embedding(input_dim=maxlen, output_dim=num_hid)

    def call(self, x):
        """Return token embeddings of ``x`` plus embeddings of their positions.

        The sequence length is read from the last axis of ``x`` at call
        time, so any length accepted by the positional table works.
        """
        seq_len = tf.shape(x)[-1]
        position_ids = tf.range(seq_len)
        token_vectors = self.emb(x)
        position_vectors = self.pos_emb(position_ids)
        # position_vectors has no batch axis; the addition broadcasts it.
        return token_vectors + position_vectors


class SpeechFeatureEmbedding(layers.Layer):
    """Downsample audio features with three strided 1-D convolutions.

    Args:
        num_hid: Number of filters of every convolution.
        maxlen: Size of the positional embedding table created below.
    """

    def __init__(self, num_hid=64, maxlen=100):
        super().__init__()
        # All three convolutions share the same configuration.
        conv_kwargs = dict(
            kernel_size=11, strides=2, padding="same", activation="relu"
        )
        self.conv1 = layers.Conv1D(num_hid, **conv_kwargs)
        self.conv2 = layers.Conv1D(num_hid, **conv_kwargs)
        self.conv3 = layers.Conv1D(num_hid, **conv_kwargs)
        # NOTE(review): pos_emb is never used by call(); it is kept so the
        # set of attributes tracked on this layer stays unchanged.
        self.pos_emb = layers.Embedding(input_dim=maxlen, output_dim=num_hid)

    def call(self, x):
        """Apply conv1, conv2 and conv3 in sequence and return the result."""
        for conv in (self.conv1, self.conv2, self.conv3):
            x = conv(x)
        return x

class TransformerEncoder(layers.Layer):
    """One encoder block: self-attention and a feed-forward net, each
    followed by dropout, a residual connection and layer normalisation.

    Args:
        embed_dim: Width of the block's input/output; also used as the
            attention ``key_dim``.
        num_heads: Number of attention heads.
        feed_forward_dim: Width of the hidden feed-forward layer.
        rate: Dropout rate applied after attention and after the
            feed-forward net.
    """

    def __init__(self, embed_dim, num_heads, feed_forward_dim, rate=0.1):
        super().__init__()
        self.att = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.ffn = keras.Sequential(
            [
                layers.Dense(feed_forward_dim, activation="relu"),
                layers.Dense(embed_dim),
            ]
        )
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)

    def call(self, inputs, training):
        """Run the block on ``inputs``; ``training`` is forwarded to dropout."""
        # Self-attention sub-layer (query and value are both `inputs`).
        attended = self.dropout1(self.att(inputs, inputs), training=training)
        residual = self.layernorm1(inputs + attended)
        # Position-wise feed-forward sub-layer.
        projected = self.dropout2(self.ffn(residual), training=training)
        return self.layernorm2(residual + projected)
    
    
class TransformerDecoder(layers.Layer):
    """One decoder block: causal self-attention, attention over the encoder
    output, and a feed-forward net, each with dropout, residual and layer norm.

    Args:
        embed_dim: Width of the block's input/output; also used as the
            attention ``key_dim``.
        num_heads: Number of attention heads.
        feed_forward_dim: Width of the hidden feed-forward layer.
        dropout_rate: Accepted but NOT used; the three dropout rates are
            fixed at 0.5 (self-attention), 0.1 (encoder attention) and
            0.1 (feed-forward).
    """

    def __init__(self, embed_dim, num_heads, feed_forward_dim, dropout_rate=0.1):
        super().__init__()
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm3 = layers.LayerNormalization(epsilon=1e-6)
        self.self_att = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.enc_att = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        # NOTE(review): rates are hard-coded; `dropout_rate` is ignored.
        self.self_dropout = layers.Dropout(0.5)
        self.enc_dropout = layers.Dropout(0.1)
        self.ffn_dropout = layers.Dropout(0.1)
        self.ffn = keras.Sequential(
            [
                layers.Dense(feed_forward_dim, activation="relu"),
                layers.Dense(embed_dim),
            ]
        )

    def causal_attention_mask(self, batch_size, n_dest, n_src, dtype):
        """Build a ``(batch_size, n_dest, n_src)`` causal attention mask.

        Entry ``[b, i, j]`` is set when ``i >= j - n_src + n_dest``, i.e. the
        lower triangle aligned to the lower-right corner, so a destination
        position never attends to a later source position.
        """
        dest_idx = tf.range(n_dest)[:, None]
        src_idx = tf.range(n_src)
        allowed = dest_idx >= src_idx - n_src + n_dest
        single = tf.reshape(tf.cast(allowed, dtype), [1, n_dest, n_src])
        # Repeat the single mask once per batch element.
        repeats = tf.concat(
            [tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)], 0
        )
        return tf.tile(single, repeats)

    def call(self, enc_out, target):
        """Decode ``target`` embeddings while attending to ``enc_out``."""
        target_shape = tf.shape(target)
        batch_size, seq_len = target_shape[0], target_shape[1]
        causal_mask = self.causal_attention_mask(batch_size, seq_len, seq_len, tf.bool)

        # Masked self-attention over the target sequence.
        self_attended = self.self_att(target, target, attention_mask=causal_mask)
        target_norm = self.layernorm1(target + self.self_dropout(self_attended))

        # Attention with the target as query and the encoder output as value.
        cross_attended = self.enc_att(target_norm, enc_out)
        cross_norm = self.layernorm2(self.enc_dropout(cross_attended) + target_norm)

        # Position-wise feed-forward sub-layer.
        ffn_out = self.ffn(cross_norm)
        return self.layernorm3(cross_norm + self.ffn_dropout(ffn_out))

class Transformer(keras.Model):
    """Encoder/decoder Transformer mapping audio features to token ids.

    Args:
        num_hid: Model width shared by the embeddings and all blocks.
        num_head: Number of attention heads in every block.
        num_feed_forward: Hidden width of the feed-forward nets.
        source_maxlen: Passed to ``SpeechFeatureEmbedding`` as ``maxlen``.
        target_maxlen: Size of the target positional table; also the length
            of the sequences produced by ``generate``.
        num_layers_enc: Number of ``TransformerEncoder`` blocks.
        num_layers_dec: Number of ``TransformerDecoder`` blocks.
        num_classes: Target vocabulary size (width of the output logits).
    """

    def __init__(
        self,
        num_hid=64,
        num_head=2,
        num_feed_forward=128,
        source_maxlen=100,
        target_maxlen=100,
        num_layers_enc=4,
        num_layers_dec=1,
        num_classes=10,
    ):
        super().__init__()
        self.loss_metric = keras.metrics.Mean(name="loss")
        self.num_layers_enc = num_layers_enc
        self.num_layers_dec = num_layers_dec
        self.target_maxlen = target_maxlen
        self.num_classes = num_classes

        self.enc_input = SpeechFeatureEmbedding(num_hid=num_hid, maxlen=source_maxlen)
        self.dec_input = TokenEmbedding(
            num_vocab=num_classes, maxlen=target_maxlen, num_hid=num_hid
        )

        self.encoder = keras.Sequential(
            [self.enc_input]
            + [
                TransformerEncoder(num_hid, num_head, num_feed_forward)
                for _ in range(num_layers_enc)
            ]
        )

        # Decoder blocks are stored as attributes dec_layer_0, dec_layer_1, ...
        # and looked up by the same names in decode().
        for i in range(num_layers_dec):
            setattr(
                self,
                f"dec_layer_{i}",
                TransformerDecoder(num_hid, num_head, num_feed_forward),
            )

        self.classifier = layers.Dense(num_classes)

    def decode(self, enc_out, target):
        """Embed ``target`` and run it through every decoder block."""
        y = self.dec_input(target)
        for i in range(self.num_layers_dec):
            y = getattr(self, f"dec_layer_{i}")(enc_out, y)
        return y

    def call(self, inputs):
        """Return logits for ``inputs = [source, target]``."""
        source = inputs[0]
        target = inputs[1]
        x = self.encoder(source)
        y = self.decode(x, target)
        return self.classifier(y)

    @property
    def metrics(self):
        """Metrics tracked by this model (only the running mean loss)."""
        return [self.loss_metric]

    def _masked_loss(self, batch):
        """Compute the compiled loss for one batch, ignoring padding.

        The decoder is fed ``target[:, :-1]`` and asked to predict
        ``target[:, 1:]``; positions whose label is 0 (the padding id) get
        zero sample weight.
        """
        source = batch["source"]
        target = batch["target"]
        dec_input = target[:, :-1]
        dec_target = target[:, 1:]
        preds = self([source, dec_input])
        one_hot = tf.one_hot(dec_target, depth=self.num_classes)
        mask = tf.math.logical_not(tf.math.equal(dec_target, 0))
        return self.compiled_loss(one_hot, preds, sample_weight=mask)

    def train_step(self, batch):
        """Processes one batch inside model.fit()."""
        with tf.GradientTape() as tape:
            loss = self._masked_loss(batch)
        trainable_vars = self.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))
        self.loss_metric.update_state(loss)
        return {"loss": self.loss_metric.result()}

    def test_step(self, batch):
        """Processes one batch inside model.evaluate() (no weight update)."""
        loss = self._masked_loss(batch)
        self.loss_metric.update_state(loss)
        return {"loss": self.loss_metric.result()}

    def generate(self, source, target_start_token_idx):
        """Performs inference over one batch of inputs using greedy decoding.

        Args:
            source: Batch of encoder inputs.
            target_start_token_idx: Token id every sequence starts with.

        Returns:
            An int32 tensor of token ids with ``target_maxlen`` columns: the
            start token followed by ``target_maxlen - 1`` greedily chosen
            tokens. Decoding does not stop at an end token; callers cut the
            sequence themselves.
        """
        bs = tf.shape(source)[0]
        enc = self.encoder(source)
        dec_input = tf.ones((bs, 1), dtype=tf.int32) * target_start_token_idx
        for _ in range(self.target_maxlen - 1):
            dec_out = self.decode(enc, dec_input)
            logits = self.classifier(dec_out)
            token_ids = tf.argmax(logits, axis=-1, output_type=tf.int32)
            # Only the prediction for the newest position is kept.
            next_token = tf.expand_dims(token_ids[:, -1], axis=-1)
            dec_input = tf.concat([dec_input, next_token], axis=-1)
        return dec_input

class VectorizeChar:
    """Character-level tokenizer for Devanagari transcripts.

    Vocabulary layout: "-" (id 0, padding), "#" (id 1, unknown character),
    "<" (id 2, start), ">" (id 3, end), code points 2304..2422, then
    space, ".", "," and "?".

    Args:
        max_len: Length of every encoded sequence, start/end markers included.
    """

    def __init__(self, max_len=50):
        devanagari = [chr(code_point) for code_point in range(2304, 2423)]
        self.vocab = ["-", "#", "<", ">"] + devanagari + [" ", ".", ",", "?"]

        self.max_len = max_len
        self.char_to_idx = {}
        for index, symbol in enumerate(self.vocab):
            self.char_to_idx[symbol] = index
            # Echo the id/character table while it is being built.
            print(index, symbol)

    def __call__(self, text):
        """Encode ``text`` as exactly ``max_len`` ids when ``max_len >= 2``.

        The text is lower-cased, cut to ``max_len - 2`` characters, wrapped
        in "<" and ">", mapped to ids (unknown characters become 1) and
        right-padded with 0.
        """
        body = text.lower()[: self.max_len - 2]
        wrapped = f"<{body}>"
        ids = [self.char_to_idx.get(symbol, 1) for symbol in wrapped]
        ids.extend([0] * (self.max_len - len(wrapped)))
        return ids

    def get_vocabulary(self):
        """Return the vocabulary list; a character's index is its id."""
        return self.vocab
        


# Fixed length of every encoded transcript; also passed to the model as
# target_maxlen further down.
max_target_len = 200  # all transcripts in our data are < 200 characters
# Training-time data loading, left disabled in this inference notebook:
# data = get_data(wavs, id_to_text, max_target_len)
# # print(data)
# Module-level tokenizer shared by create_text_ds and the predict functions.
# Constructing it prints the whole id/character table.
vectorizer = VectorizeChar(max_target_len)
# print("vocab size", len(vectorizer.get_vocabulary()))

# import random
# random.shuffle(data)


def create_text_ds(data):
    """Build a dataset of encoded transcripts.

    Args:
        data: Iterable of mappings that each have a "text" entry.

    Returns:
        A ``tf.data.Dataset`` with one id sequence (as produced by the
        module-level ``vectorizer``) per input sample.
    """
    encoded = [vectorizer(sample["text"]) for sample in data]
    return tf.data.Dataset.from_tensor_slices(encoded)


def path_to_audio(path, pad_len=2754):
    """Load a WAV file and turn it into a fixed-length, normalised spectrogram.

    Args:
        path: Path of the WAV file to read.
        pad_len: Number of STFT frames in the result. Shorter clips are
            zero-padded at the end, longer ones are truncated. With the
            80-sample hop used here the default of 2754 frames spans about
            220k samples (roughly 10 s at 22.05 kHz, roughly 13.8 s at
            16 kHz).

    Returns:
        A float tensor with ``pad_len`` rows, one per STFT frame, holding the
        square-rooted magnitudes standardised per frame, with NaNs zeroed.
    """
    # spectrogram using stft
    audio = tf.io.read_file(path)
    audio, _ = tf.audio.decode_wav(audio, 1)  # force a single channel
    audio = tf.squeeze(audio, axis=-1)
    stfts = tf.signal.stft(audio, frame_length=200, frame_step=80, fft_length=256)

    x = tf.math.pow(tf.abs(stfts), 0.5)

    # normalisation: zero mean / unit variance across each frame's bins
    means = tf.math.reduce_mean(x, 1, keepdims=True)
    stddevs = tf.math.reduce_std(x, 1, keepdims=True)
    x = (x - means) / stddevs

    # pad with pad_len zero frames, then keep exactly the first pad_len frames
    paddings = tf.constant([[0, pad_len], [0, 0]])
    x = tf.pad(x, paddings, "CONSTANT")[:pad_len, :]
    # A constant frame has zero std-dev and yields 0/0 above; zero those
    # NaNs so they cannot turn the loss into NaN.
    x = tf.where(tf.math.is_nan(x), 0., x)

    return x


def create_audio_ds(data):
    """Build a dataset of spectrograms.

    Args:
        data: Iterable of mappings that each have an "audio" entry holding
            a WAV file path.

    Returns:
        A ``tf.data.Dataset`` that maps every path through ``path_to_audio``
        with an auto-tuned number of parallel calls.
    """
    wav_paths = [sample["audio"] for sample in data]
    path_ds = tf.data.Dataset.from_tensor_slices(wav_paths)
    return path_ds.map(path_to_audio, num_parallel_calls=tf.data.AUTOTUNE)


def create_tf_dataset(data, bs=4):
    """Build the batched dataset consumed by the model's train/test steps.

    Args:
        data: Iterable of mappings with "audio" and "text" entries.
        bs: Batch size.

    Returns:
        A prefetched ``tf.data.Dataset`` whose elements are dicts with the
        keys "source" (spectrograms) and "target" (encoded transcripts).
    """
    paired = tf.data.Dataset.zip((create_audio_ds(data), create_text_ds(data)))
    keyed = paired.map(lambda x, y: {"source": x, "target": y})
    return keyed.batch(bs).prefetch(tf.data.AUTOTUNE)


class DisplayOutputs(keras.callbacks.Callback):
    """Callback that prints target/prediction pairs for one fixed batch."""

    def __init__(
        self, batch, idx_to_token, target_start_token_idx=27, target_end_token_idx=28
    ):
        """Displays a batch of outputs after every fifth epoch

        Args:
            batch: A test batch containing the keys "source" and "target"
            idx_to_token: A List containing the vocabulary tokens corresponding to their indices
            target_start_token_idx: A start token index in the target vocabulary
            target_end_token_idx: An end token index in the target vocabulary

        NOTE(review): the defaults 27/28 do not match the VectorizeChar
        vocabulary in this file, where "<" is 2 and ">" is 3; pass those
        values explicitly when using that vocabulary.
        """
        # Let the Keras base class set up its own state first.
        super().__init__()
        self.batch = batch
        print(batch)
        self.target_start_token_idx = target_start_token_idx
        self.target_end_token_idx = target_end_token_idx
        self.idx_to_char = idx_to_token

    def on_epoch_end(self, epoch, logs=None):
        """Greedy-decode the stored batch and print it on epochs 0, 5, 10, ..."""
        if epoch % 5 != 0:
            return
        source = self.batch["source"]
        target = self.batch["target"].numpy()
        preds = self.model.generate(source, self.target_start_token_idx)
        preds = preds.numpy()
        for target_row, pred_row in zip(target, preds):
            target_text = "".join(self.idx_to_char[idx] for idx in target_row)
            # Collect characters up to and including the end token.
            pieces = []
            for idx in pred_row:
                pieces.append(self.idx_to_char[idx])
                if idx == self.target_end_token_idx:
                    break
            prediction = "".join(pieces)
            # "-" is the padding character, so it is stripped from the target.
            print(f"target:     {target_text.replace('-','')}")
            print(f"prediction: {prediction}\n")

# Module-level model instance used by the predict functions below.
# num_classes=127 equals the VectorizeChar vocabulary size (4 + 119 + 4).
# NOTE(review): presumably these hyper-parameters must match the ones the
# loaded checkpoints were trained with — confirm before changing any.
model = Transformer(
    num_hid=200,
    num_head=2,
    num_feed_forward=400,
    target_maxlen=max_target_len,
    num_layers_enc=4,
    num_layers_dec=1,
    num_classes=127,
)

0 -
1 #
2 <
3 >
4 ऀ
5 ँ
6 ं
7 ः
8 ऄ
9 अ
10 आ
11 इ
12 ई
13 उ
14 ऊ
15 ऋ
16 ऌ
17 ऍ
18 ऎ
19 ए
20 ऐ
21 ऑ
22 ऒ
23 ओ
24 औ
25 क
26 ख
27 ग
28 घ
29 ङ
30 च
31 छ
32 ज
33 झ
34 ञ
35 ट
36 ठ
37 ड
38 ढ
39 ण
40 त
41 थ
42 द
43 ध
44 न
45 ऩ
46 प
47 फ
48 ब
49 भ
50 म
51 य
52 र
53 ऱ
54 ल
55 ळ
56 ऴ
57 व
58 श
59 ष
60 स
61 ह
62 ऺ
63 ऻ
64 ़
65 ऽ
66 ा
67 ि
68 ी
69 ु
70 ू
71 ृ
72 ॄ
73 ॅ
74 ॆ
75 े
76 ै
77 ॉ
78 ॊ
79 ो
80 ौ
81 ्
82 ॎ
83 ॏ
84 ॐ
85 ॑
86 ॒
87 ॓
88 ॔
89 ॕ
90 ॖ
91 ॗ
92 क़
93 ख़
94 ग़
95 ज़
96 ड़
97 ढ़
98 फ़
99 य़
100 ॠ
101 ॡ
102 ॢ
103 ॣ
104 ।
105 ॥
106 ०
107 १
108 २
109 ३
110 ४
111 ५
112 ६
113 ७
114 ८
115 ९
116 ॰
117 ॱ
118 ॲ
119 ॳ
120 ॴ
121 ॵ
122 ॶ
123  
124 .
125 ,
126 ?


In [53]:
# Load the weights of one of the saved checkpoints into the global `model`.
# The paths are relative to the current working directory.
model_path = r".\models\smallDatasetCheckpoint-48-0.12"
# model_path = r".\models\smallDatasetCheckpoint-56-0.12"
# model_path = r".\models\smallDatasetCheckpoint-56-0.12"
model.load_weights(model_path)


<tensorflow.python.checkpoint.checkpoint.CheckpointLoadStatus at 0x1e1a5afea90>

# ###LIVE AUDIO####

In [39]:
# please uncomment the 2 lines below if the dependencies are not downloaded already.

# !pip install sounddevice
# !pip install scipy

In [40]:
## LIVE RECORDING AND TRANSCRIPTION (FOR DEMO)
# import required libraries
import sounddevice as sd
from scipy.io.wavfile import write
#animation imports
import itertools
import threading
import time
import sys
import sounddevice

def my_rec():
    """Record four seconds of 16 kHz mono audio and save it as a WAV file.

    A console spinner runs in a background thread while the microphone is
    being recorded. The spinner is always stopped and joined, even if the
    recording fails, so it can neither run forever nor interleave its
    output with the messages printed afterwards. The recording is written
    to ``./live_audrec/Audiotest.wav``; the directory is created if missing.
    """
    fs = 16000  # sample rate in Hz
    second = 4  # recording length in seconds
    aud_path = r"./live_audrec/Audiotest.wav"
    finished = threading.Event()

    # here is the animation
    def animate():
        for c in itertools.cycle(['|', '/', '-', '\\']):
            if finished.is_set():
                break
            sys.stdout.write('\rRecording ' + c)
            sys.stdout.flush()
            time.sleep(0.1)
        # Pad so the whole "Recording x" text is overwritten, then end the line.
        sys.stdout.write('\rDone!'.ljust(12) + '\n')
        sys.stdout.flush()

    print("Please Speak in Nepali")
    spinner = threading.Thread(target=animate)
    spinner.start()
    try:
        # long process here
        record_voice = sounddevice.rec(
            int(second * fs), samplerate=fs, channels=1, dtype='int16'
        )
        sounddevice.wait()
    finally:
        # Stop the spinner whether or not the recording succeeded.
        finished.set()
        spinner.join()

    os.makedirs(os.path.dirname(aud_path), exist_ok=True)
    write(aud_path, fs, record_voice)
    print("Saved in REC folder")

#Predictor functions

def single_model_predict():
    """Transcribe the recorded clip with the currently loaded weights.

    Reads ``./live_audrec/Audiotest.wav``, greedy-decodes it with the global
    ``model`` (start token id 2) and prints the characters up to and
    including the first end token (id 3).
    """
    print("Loading the transcription")
    wav_path = r"./live_audrec/Audiotest.wav"

    # Add a leading batch axis: the model works on batches.
    features = tf.expand_dims(path_to_audio(wav_path), axis=0)
    vocab = vectorizer.get_vocabulary()
    token_ids = model.generate(features, 2).numpy()
    for row in range(tf.shape(features)[0]):
        chars = []
        for idx in token_ids[row, :]:
            chars.append(vocab[idx])
            if idx == 3:
                break
        prediction = "".join(chars)

    print("prediction: ", prediction)


def all_model_predict(models_paths):
    """Transcribe the recorded clip once per checkpoint and print each result.

    Args:
        models_paths: Checkpoint paths. Each is loaded into the global
            ``model`` in turn, so after the call the model holds the weights
            of the last path.

    The spectrogram and vocabulary do not depend on the checkpoint, so they
    are computed once before the loop instead of once per model.
    """
    models_paths = list(models_paths)
    if not models_paths:
        # Nothing to do; do not touch the audio file either.
        return

    path = r"./live_audrec/Audiotest.wav"
    # Add a leading batch axis: the model works on batches.
    x = tf.expand_dims(path_to_audio(path), axis=0)
    idx_to_char = vectorizer.get_vocabulary()

    for mod_no, model_path in enumerate(models_paths, start=1):
        model.load_weights(model_path)
        preds = model.generate(x, 2).numpy()  # 2 is the "<" start token
        for pred_row in preds:
            chars = []
            for idx in pred_row:
                chars.append(idx_to_char[idx])
                if idx == 3:  # 3 is the ">" end token
                    break
            prediction = "".join(chars)

        print(f"prediction from model{mod_no}: ", prediction)



In [49]:
# Record a clip from the microphone; my_rec() saves it to
# ./live_audrec/Audiotest.wav.
my_rec()

Recording |Please Speak in Nepali
Recording /Saved in REC folder
Done!

In [54]:
# Transcribe the saved recording with the weights currently loaded in `model`.
single_model_predict()

Loading the transcription
prediction:  <मेरो नाम सिसिर पौडेल हो>


In [52]:
# Paths of the three saved checkpoints to compare on the same recording.
model1_path = r".\models\smallDatasetCheckpoint-48-0.12" # trained for 48 epochs
model2_path = r".\models\smallDatasetCheckpoint-56-0.12" # trained for 56 epochs
model3_path = r".\models\smallDatasetCheckpoint-57-0.12" # trained for 57 epochs

models_paths=[model1_path,model2_path,model3_path]

print("Loading the transcriptions")
# Load each checkpoint in sequence and print its transcription.
# Afterwards `model` holds the weights of the last checkpoint in the list.
all_model_predict(models_paths)

Loading the transcriptions
prediction from model1:  <मेरो नाम सिसिर पौडेल हो>
prediction from model2:  <मेरो नाम सिसिट पौडेल हो>
prediction from model3:  <मेरो नाम सिसिर पौडेल हो>


# New WER calculation