<a href="https://colab.research.google.com/github/Benteaux/gomera-project/blob/main/awsr_silbogomero.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Dependencies and Such**

In [None]:
!pip install tensorflow_text
!pip install tensorflow-io

In [None]:
import tensorflow as tf
import tensorflow_text as tf_text
import tensorflow_io as tfio
from tensorflow import keras
import numpy as np
import matplotlib.pyplot as plt
import os
from google.colab import drive

In [None]:
# Mount Google Drive; the dataset is expected under "My Drive/silboData" (adjust MAIN if yours differs).
drive.mount("/content/drive")
MAIN = os.path.join("/content", "drive", "My Drive", "silboData")

# note that, to be used with tensorflow, the data must be converted to the PCM 16 subtype. This can be done with the soundfile library.
clip_glob = MAIN + "/words/clips/*-16.wav"
transcript_glob = MAIN + "/words/transcriptions/*.txt"
wordAudio = tf.data.Dataset.list_files(clip_glob)
wordText = tf.data.Dataset.list_files(transcript_glob)

**Data Sorting and Preprocessing**

In [None]:
# Materialise every matched clip path (as bytes) into a plain Python list.
waFiles = list(wordAudio.as_numpy_iterator())

In [None]:
# De-duplicate the clip paths and sort them by plain ordering of the path values.
# NOTE(review): the transcription paths in this notebook are ordered by the first number in the path instead;
# if the clip names are not zero-padded the two orderings can disagree and mis-pair audio with text - confirm.
audiofiles = sorted(set(waFiles))

In [None]:
import re

# Collect the transcription paths and pull the first run of digits out of each path as an integer.
wordFiles = list(wordText.as_numpy_iterator())
first_number = re.compile(r'\d+')
numbers1 = [int(first_number.search(path.decode()).group()) for path in wordFiles]


In [None]:
# Order the transcription paths by their extracted number (ties fall back to comparing the paths).
numbered_paths = sorted(zip(numbers1, wordFiles))
textfiles = [path for _number, path in numbered_paths]

In [None]:
# Sanity check: evaluates to True when there are as many transcription paths as audio paths.
len(textfiles) == len(audiofiles)

**audio preprocessing functions**

In [None]:
# assuming usage of both sentence data and single word data from the dataset, 2651682 was determined to be the largest shape of any audio file.
# 44384 is the median, 87053 is the mean, and 8939 is the min
def load_mono_pad(filepath, size = 2651682):
    """Decode a WAV file as mono audio and force it to exactly `size` samples.

    Args:
      filepath: path of the WAV file to read (passed straight to tf.io.read_file).
      size: longest audio length being accepted, in samples. Longer clips are
        cut off and shorter clips are zero-padded at the end. Defaults to the
        largest clip length noted above, so nothing is cut off by default.
        Note that if you choose to cut stuff off, you may need to preprocess
        the text differently.

    Returns:
      A 1-D tf.float32 tensor holding `size` samples.
    """
    audio = tf.io.read_file(filepath)
    # The sample rate returned by decode_wav is not needed here.
    audio, _ = tf.audio.decode_wav(audio, desired_channels = 1)
    # decode_wav returns [samples, channels]; drop the single channel axis.
    audio = tf.squeeze(audio, axis = -1)
    audio = audio[:size]
    # After the slice the clip is at most `size` long, so the pad length is never negative.
    padding = tf.zeros([size] - tf.shape(audio), dtype = tf.float32)
    audio = tf.concat([audio, padding], axis = 0)
    return audio



In [None]:
# Log-mel front end: spectrogram -> 80-band mel scale -> decibel scale.
sampling = 44100 # can also return sampling rate from load_mono_pad if desired
def log_mel_features(audio):
  """Convert a waveform tensor into dB-scaled mel features."""
  # 512 nffts was taken from TF documentation.
  # NOTE(review): tfio documents `window` and `stride` as sizes in samples, so 25 and 10
  # here are sample counts rather than 25 ms / 10 ms - confirm this is intended.
  spectrogram = tfio.audio.spectrogram(audio, nfft = 512, window = 25, stride = 10)

  # 80 dimensional mel features as specified by referenced paper
  mel_bands = tfio.audio.melscale(spectrogram, rate = sampling, mels = 80, fmin = 0, fmax = 16000)

  # scale to dB. 80 taken from TF documentation
  return tfio.audio.dbscale(mel_bands, top_db = 80)

**text preprocessing functions**

In [None]:
def load_text(filepath):
  """Read the whole file at `filepath` and return its contents as a tf.string tensor."""
  def read_bytes(path):
    # Runs eagerly inside py_function, so .numpy() is available.
    return tf.io.read_file(path).numpy()

  return tf.py_function(read_bytes, [filepath], tf.string)

In [None]:
# note that spanish characters like á, ñ, etc, are converted to English equivalents (á -> a, ñ -> n)
def text_process(text):
    """Standardise transcriptions into space-separated characters wrapped in [START] / [END]."""
    cleaned = tf_text.normalize_utf8(text, 'NFKD')
    cleaned = tf.strings.lower(cleaned)
    # Keep only spaces and a-z; everything else is removed.
    cleaned = tf.strings.regex_replace(cleaned, '[^ a-z]', '')
    # character-level tokenization: put a space after every remaining character
    cleaned = tf.strings.regex_replace(cleaned, '(.)', r'\1 ')
    cleaned = tf.strings.strip(cleaned)
    return tf.strings.join(['[START]', cleaned, "[END]"], separator = ' ')


In [None]:
def preprocess(audio, text):
  """Build one training example: ((mel features, decoder input tokens), decoder target tokens)."""
  features = log_mel_features(audio)
  tokens = vectorizer(text)
  # Teacher forcing: the input drops the last token, the target drops the first.
  decoder_in, decoder_target = tokens[:, :-1], tokens[:, 1:]
  return (features, decoder_in), decoder_target

In [None]:
# Pair every padded waveform with a transcription line and group the pairs into batches of 16.
audioData = tf.data.Dataset.from_tensor_slices(audiofiles).map(load_mono_pad)
textData = tf.data.TextLineDataset(textfiles)
data = tf.data.Dataset.zip(audioData, textData).batch(16, drop_remainder = True)

In [None]:
# Character-level vectorizer: text_process does the standardisation, sequences are fixed to 30 tokens.
vocabulary = 30
vectorizer = tf.keras.layers.TextVectorization(
    max_tokens = vocabulary,
    standardize = text_process,
    output_sequence_length = 30,
)

In [None]:
# Fit the vocabulary on the two marker tokens followed by the letters a-z.
lowercase_letters = [chr(code) for code in range(ord('a'), ord('z') + 1)]
vectorizer.adapt(['[START]', '[END]'] + lowercase_letters)

In [None]:
# Turn the raw (audio, text) batches into model inputs, then shuffle the batches once.
buffer = len(textfiles)
data = data.map(preprocess)
# reshuffle_each_iteration = False keeps one fixed order across iterations. With the default
# (reshuffle every iteration) the take()/skip() split applied to `data` afterwards would select
# different batches on every pass, leaking test batches into training. A second shuffle adds
# nothing, so only one is kept. Re-shuffle the training split itself if a per-epoch order is wanted.
data = data.shuffle(buffer, reshuffle_each_iteration = False)

In [None]:
# Number of batches in the dataset. `data` was batched in groups of 16 with drop_remainder = True,
# so this is the number of complete groups of 16 pairs.
# NOTE(review): assumes each transcription file contributes exactly one line - confirm for your data,
# and set `size` by hand if you only use part of the silbo gomero data.
size = min(len(audiofiles), len(textfiles)) // 16
# First 80% of the batches are used for training, the remainder for testing.
eighty = int(size * 0.8)
train_ds = data.take(eighty)
test_ds = data.skip(eighty).take(size - eighty)

**Model Architecture**

In [None]:
# modelled after the referenced paper's shared encoder
from tensorflow.keras.layers import GRU, Dense, MaxPooling1D
from tensorflow import keras
class Encoder(tf.keras.layers.Layer):
  """Audio encoder: max-pool downsampling followed by three GRU + dense projection stages."""

  @classmethod
  def add_method(cls, fun):
    # Attach `fun` to the class under its own name so methods can be added from later cells.
    setattr(cls, fun.__name__, fun)
    return fun

  def __init__(self, lstm_units = 512, proj_units = 160, embed = 32):
    super().__init__()
    # Layers are created in the order they are applied in call().
    self.downsampling = MaxPooling1D(pool_size = 3)
    self.rnn1 = GRU(lstm_units, return_sequences = True)
    self.proj1 = Dense(proj_units, activation = 'relu')
    self.rnn2 = GRU(lstm_units, return_sequences = True)
    self.reduction = MaxPooling1D(pool_size = 2)
    self.proj2 = Dense(proj_units, activation = 'relu')
    self.rnn3 = GRU(lstm_units, return_sequences = True)
    self.proj3 = Dense(proj_units, activation = 'relu')

  def call(self, audio):
    # Feed the features through every stage in sequence.
    stages = (
        self.downsampling,
        self.rnn1,
        self.proj1,
        self.rnn2,
        self.reduction,
        self.proj2,
        self.rnn3,
        self.proj3,
    )
    features = audio
    for stage in stages:
      features = stage(features)

    return features

In [None]:
@Encoder.add_method
def load_mono_pad(self, filepath, size = 2651682):
    """Decode a WAV file as mono audio, fixed to `size` samples, and return it with its sample rate.

    Note: add_method returns the function unchanged, so this definition also rebinds the
    module-level name `load_mono_pad` to this method-style version.

    Args:
      filepath: path of the WAV file to read.
      size: number of samples to keep; longer clips are cut off and shorter ones are
        zero-padded at the end. Use the same size as for the training data. The default
        is the largest clip length noted for the dataset in this notebook.

    Returns:
      (audio, sampling): the 1-D tf.float32 waveform of length `size` and the sample
      rate reported by tf.audio.decode_wav.
    """
    audio = tf.io.read_file(filepath)
    audio, sampling = tf.audio.decode_wav(audio, desired_channels = 1)
    # Drop the single channel axis.
    audio = tf.squeeze(audio, axis = -1)
    audio = audio[:size]
    # After the slice the clip is at most `size` long, so the pad length is never negative.
    padding = tf.zeros([size] - tf.shape(audio), dtype = tf.float32)
    audio = tf.concat([audio, padding], axis = 0)

    return audio, sampling

In [None]:
@Encoder.add_method
def log_mel_features(self, audio, sampling):
    """Convert a waveform into the dB-scaled mel features fed to the encoder at inference time.

    The settings (nfft 512, window 25, stride 10, 80 mels, fmin 0, fmax 16000, top_db 80)
    match the feature extraction applied to the training data in this notebook; fmax was
    previously 8000 here, which made inference features differ from the training features.

    Note: add_method returns the function unchanged, so this definition also rebinds the
    module-level name `log_mel_features` to this method-style version.

    Args:
      audio: waveform tensor.
      sampling: sample rate of `audio`, passed to the mel-scale conversion.

    Returns:
      The dB-scaled mel spectrogram tensor.
    """
    spect = tfio.audio.spectrogram(audio, 512, 25, 10)
    mel = tfio.audio.melscale(spect, sampling, mels = 80, fmin = 0, fmax = 16000)
    mel = tfio.audio.dbscale(mel, top_db = 80)

    return mel

In [None]:
@Encoder.add_method
def convert_inputs(self, filepath):
    """Load one audio file, extract its log-mel features and run them through the encoder."""
    waveform, rate = self.load_mono_pad(filepath)
    features = self.log_mel_features(waveform, rate)
    # Add a leading batch axis of size 1 before calling the layer.
    batched = tf.expand_dims(features, axis = 0)
    return self(batched)

In [None]:
# modelled after the referenced paper
class CrossAttention(tf.keras.layers.Layer):
  """Multi-head attention from `x` over `context`, with a residual add and layer normalisation."""

  def __init__(self, num_heads = 2):
    super().__init__()
    self.mha = tf.keras.layers.MultiHeadAttention(num_heads = num_heads, key_dim = 64)
    self.add = tf.keras.layers.Add()
    self.layernorm = tf.keras.layers.LayerNormalization()

  def call(self, x, context):
    attended = self.mha(query = x, value = context)
    # Residual connection followed by normalisation.
    return self.layernorm(self.add([x, attended]))

In [None]:
# modelled after the referenced paper
class Decoder(tf.keras.layers.Layer):
  """Token decoder: embedding, cross-attention over the encoder output, two GRUs and a projection head."""

  @classmethod
  def add_method(cls, fun):
    # Attach `fun` to the class under its own name so methods can be added from later cells.
    setattr(cls, fun.__name__, fun)
    return fun

  def __init__(self, tokenizer, lstm_units = 512, proj_units = 160, embed = 24, **kwargs):
    super().__init__(**kwargs)
    self.tokenizer = tokenizer
    self.vocab_size = tokenizer.vocabulary_size()

    # Lookups in both directions, built from the tokenizer's vocabulary.
    vocab = tokenizer.get_vocabulary()
    self.id_to_text = tf.keras.layers.StringLookup(vocabulary = vocab, mask_token = '', oov_token = '[UNK]', invert = True)
    self.text_to_id = tf.keras.layers.StringLookup(vocabulary = vocab, mask_token = '', oov_token = '[UNK]')
    self.start_token = self.text_to_id('[START]')
    self.end_token = self.text_to_id('[END]')

    self.embedding = tf.keras.layers.Embedding(self.vocab_size, embed)
    self.cross = CrossAttention()
    self.rnn1 = GRU(units = lstm_units, return_sequences = True, return_state = True)
    self.rnn2 = GRU(units = lstm_units, return_sequences = True, return_state = True)
    self.proj = Dense(units = proj_units, activation = 'relu')
    self.out = Dense(units = self.vocab_size)

  def call(self, context, x, state1 = None, state2 = None, return_states = False):
    embedded = self.embedding(x)
    attended = self.cross(embedded, context)
    hidden, state1 = self.rnn1(attended, initial_state = state1)
    hidden, state2 = self.rnn2(hidden, initial_state = state2)
    logits = self.out(self.proj(hidden))
    if not return_states:
      return logits
    return logits, state1, state2

In [None]:
@Decoder.add_method
def get_initial_state(self, context):
  """Return the starting tokens, the all-False done flags and the initial states of both GRUs."""
  batch = tf.shape(context)[0]
  first_tokens = tf.fill([batch, 1], self.start_token)
  finished = tf.zeros([batch, 1], dtype = tf.bool)
  embedded = self.embedding(first_tokens)
  state1 = self.rnn1.get_initial_state(embedded)[0]
  state2 = self.rnn2.get_initial_state(embedded)[0]
  return first_tokens, finished, state1, state2

In [None]:
@Decoder.add_method
def tokens_to_text(self, x):
  """Convert token ids back to text.

  Ids are mapped to their string tokens, joined along the last axis with single
  spaces, and a leading [START] / trailing [END] marker (with surrounding spaces)
  is removed.

  Args:
    x: tensor of token ids.

  Returns:
    A string tensor with one joined string per sequence.
  """
  text = self.id_to_text(x)
  text = tf.strings.reduce_join(text, axis = -1, separator = ' ')
  # Raw strings: "\[" is not a valid Python string escape, so a plain literal triggers a warning.
  text = tf.strings.regex_replace(text, r'^ *\[START\] *', '')
  text = tf.strings.regex_replace(text, r' *\[END\] *$', '')
  return text

In [None]:
@Decoder.add_method
def get_next_token(self, context, next_token, done, state1, state2, temperature = 0.0):
  """Run one decoding step and return (next_token, done, state1, state2)."""
  logits, state1, state2 = self(context, next_token, state1, state2, return_states = True)

  if temperature == 0.0:
    # Greedy choice.
    sampled = tf.argmax(logits, axis = -1)
  else:
    # Sample from the temperature-scaled logits of the last position.
    scaled = logits[:, -1, :] / temperature
    sampled = tf.random.categorical(scaled, num_samples = 1)

  # A sequence is done once it has produced [END]; done sequences emit id 0 from then on.
  done = done | (sampled == self.end_token)
  zero_id = tf.constant(0, dtype = tf.int64)
  return tf.where(done, zero_id, sampled), done, state1, state2

In [None]:
class Whistler(tf.keras.Model):
  """Encoder-decoder model mapping audio features plus decoder input tokens to token logits."""

  @classmethod
  def add_method(cls, fun):
    # Attach `fun` to the class under its own name so methods can be added from later cells.
    setattr(cls, fun.__name__, fun)
    return fun

  def __init__(self, tokenizer, **kwargs):
    super().__init__(**kwargs)
    self.tokenizer = tokenizer
    self.encoder = Encoder()
    self.decoder = Decoder(tokenizer = self.tokenizer)

  def call(self, input):
    # `input` is the pair (audio features, decoder input tokens).
    features, tokens = input
    encoded = self.encoder(features)
    return self.decoder(encoded, tokens)

In [None]:
@Whistler.add_method
def translate(self, filepath, *, max_length = 50, temperature = 0.0):
  """Decode the audio file at `filepath` into text, generating at most `max_length` tokens."""
  context = self.encoder.convert_inputs(filepath)
  next_token, done, state1, state2 = self.decoder.get_initial_state(context)

  generated = []
  for _ in range(max_length):
    step = self.decoder.get_next_token(context, next_token, done, state1, state2, temperature)
    next_token, done, state1, state2 = step
    generated.append(next_token)

    # Stop early only in eager mode, once every sequence has finished.
    if tf.executing_eagerly() and tf.reduce_all(done):
      break

  return self.decoder.tokens_to_text(tf.concat(generated, axis = -1))

In [None]:
def masked_loss(y_true, y_pred):
  """Sparse categorical cross-entropy averaged over non-padding positions only.

  Args:
    y_true: integer target token ids; id 0 is treated as padding.
    y_pred: logits over the vocabulary for every position.

  Returns:
    Scalar loss: the summed per-token loss of the non-padding positions divided
    by the number of non-padding positions.
  """
  loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits = True, reduction = 'none'
  )
  loss = loss_fn(y_true, y_pred)
  mask = tf.cast(y_true != 0, loss.dtype)
  # Zero the loss at padding positions; without this they still add to the numerator
  # while being excluded from the denominator.
  loss *= mask

  return tf.reduce_sum(loss) / tf.reduce_sum(mask)

In [None]:
def masked_accuracy(y_true, y_pred):
  """Fraction of non-padding positions (target id != 0) where the argmax prediction equals the target."""
  predicted = tf.cast(tf.argmax(y_pred, axis = -1), y_true.dtype)
  valid = tf.cast(y_true != 0, tf.float32)
  hits = tf.cast(y_true == predicted, tf.float32) * valid

  return tf.reduce_sum(hits) / tf.reduce_sum(valid)

In [None]:
# Number of epochs passed to model.fit.
epochs = 20
# NOTE(review): UNITS is not referenced anywhere else in the visible notebook - confirm it is still needed.
UNITS = 256

In [None]:
# Build the model around the adapted vectorizer and compile it with Adam (learning rate 1e-4),
# the masked loss, and the masked loss / masked accuracy as reported metrics.
model = Whistler(vectorizer)
opt = tf.keras.optimizers.Adam(1e-4)
model.compile(optimizer = opt, loss = masked_loss, metrics = [masked_loss, masked_accuracy])

**Training & Testing**

In [None]:
# Translate one clip with the freshly constructed model, before any call to fit.
model.translate(audiofiles[1025])

In [None]:
# Baseline metrics on 20 test batches before training; repeat() keeps the dataset from running out.
model.evaluate(test_ds.repeat(), steps = 20, return_dict = True)

In [None]:
# Train for `epochs` epochs of 100 steps each; the training set is repeated so it never runs out mid-epoch.
history = model.fit(
    train_ds.repeat(),
    epochs = epochs,
    steps_per_epoch = 100
)

In [None]:
# Metrics on 20 held-out batches after training. The held-out split in this notebook is named
# test_ds; no val_ds is ever defined, so referring to it raised a NameError.
model.evaluate(test_ds.repeat(), steps = 20, return_dict = True)

**Inference Test**

In [None]:
# Pick the clip used for the inference check and display its path.
fp = audiofiles[41]
fp

In [None]:
# Translate the chosen clip with the trained model.
output = model.translate(fp)

In [None]:
# Display the contents of the transcription file at the same index, for comparison.
load_text(textfiles[41])

In [None]:
# Decode the first element of the translation to a Python string.
# should output "farmacia"
output.numpy()[0].decode()

In [None]:
# Print the model's prediction next to the stored transcription for clips 10 through 19.
for i in range(10, 20):
  prediction = model.translate(audiofiles[i]).numpy()[0].decode()
  answer = load_text(textfiles[i])
  print(f'Prediction: {prediction}')
  print(f'Answer: {answer}')
  print()

**Model Saving**

In [None]:
class Export(tf.Module):
  """Wrapper exposing the model's translate method as a tf.function taking one scalar string."""

  def __init__(self, model):
    self.model = model

  @tf.function(input_signature = [tf.TensorSpec(shape = [], dtype = tf.string)])
  def translate(self, inputs):
    translation = self.model.translate(inputs)
    return translation

In [None]:
# Where the SavedModel is written. Defaults to a folder next to the data; set your own save path here if desired.
SAVE_PATH = os.path.join(MAIN, "whistler_export")
export = Export(model)
# Call the wrapped function once before saving.
export.translate(audiofiles[1024])
tf.saved_model.save(export, SAVE_PATH,
                    signatures={'serving_default': export.translate})