# 1- Installing & Importing Necessary Packages:

In [None]:
!pip install torch torchaudio transformers datasets

In [None]:
import torch
from torch import nn
from transformers import Wav2Vec2ForCTC, Wav2Vec2Processor, Wav2Vec2Config, Wav2Vec2Model
from torch.utils.data import Dataset, DataLoader
from datasets import load_dataset, concatenate_datasets
import soundfile as sf
import torchaudio

In [None]:
from huggingface_hub import notebook_login

notebook_login()

# 2- Downloading & Processing Datasets:

## Downloading & Processing Common Voice 13 Dataset:

In [None]:
common_voice = load_dataset("mozilla-foundation/common_voice_13_0", "ar", split="train+test+validation")

In [None]:
print(len(common_voice))

In [None]:
print(common_voice)

In [None]:
common_voice = common_voice.remove_columns(["client_id", "audio", "up_votes", "down_votes", "age", "gender", "accent", "locale", "segment", "variant"])

print(common_voice)

In [None]:
print(common_voice[0])

## Downloading & Processing Google Fleurs Dataset:

In [None]:
fleurs_train = load_dataset("google/fleurs", "ar_eg", split="train")

In [None]:
fleurs_test = load_dataset("google/fleurs", "ar_eg", split="test")

In [None]:
fleurs_val = load_dataset("google/fleurs", "ar_eg", split="validation")

In [None]:
print(len(fleurs_train))
print(len(fleurs_test))
print(len(fleurs_val))

In [None]:
print(fleurs_train)

In [None]:
def _insert_split_dir(data_item, split_dir):
  """Shared implementation of the three `update_audio_path_*` mappers.

  Inserts `split_dir` as a new directory immediately before the last
  '/'-separated component of `data_item["path"]`, and copies
  `data_item["transcription"]` into a new `"sentence"` key.

  The item is modified in place and also returned, which is the form
  `Dataset.map` expects.
  """
  parts = data_item["path"].split('/')
  # insert(-1, ...) places the directory just before the file name.
  parts.insert(-1, split_dir)
  data_item["path"] = '/'.join(parts)
  data_item["sentence"] = data_item["transcription"]
  return data_item


def update_audio_path_train(data_item):
  """Point the item's path at the "train" sub-directory and add "sentence"."""
  return _insert_split_dir(data_item, "train")


def update_audio_path_test(data_item):
  """Point the item's path at the "test" sub-directory and add "sentence"."""
  return _insert_split_dir(data_item, "test")


def update_audio_path_val(data_item):
  """Point the item's path at the "dev" sub-directory and add "sentence"."""
  return _insert_split_dir(data_item, "dev")

In [None]:
fleurs_train = fleurs_train.map(update_audio_path_train)

In [None]:
fleurs_test = fleurs_test.map(update_audio_path_test)

In [None]:
fleurs_val = fleurs_val.map(update_audio_path_val)

In [None]:
fleurs_train = fleurs_train.remove_columns(["id", "num_samples", "audio", "transcription", "raw_transcription", "gender", "lang_id", "language", "lang_group_id"])

In [None]:
fleurs_test = fleurs_test.remove_columns(["id", "num_samples", "audio", "transcription", "raw_transcription", "gender", "lang_id", "language", "lang_group_id"])

In [None]:
fleurs_val = fleurs_val.remove_columns(["id", "num_samples", "audio", "transcription", "raw_transcription", "gender", "lang_id", "language", "lang_group_id"])

In [None]:
print(fleurs_train[0])
print(fleurs_test[0])
print(fleurs_val[0])

# 3- Combining Datasets & Creating The Vocabulary:

## Creating The Combined Dataset For Training & The Vocabulary:

In [None]:
combined_dataset = concatenate_datasets([common_voice, fleurs_train, fleurs_test])

In [None]:
print(len(common_voice))
print(len(fleurs_train))
print(len(fleurs_test))
print(len(fleurs_val))
print(len(combined_dataset))

In [None]:
print(combined_dataset)

In [None]:
print(combined_dataset[0])

In [None]:
vocabulary = ['ا', 'ب', 'ت', 'ث', 'ج', 'ح', 'خ', 'د', 'ذ', 'ر', 'ز', 'س', 'ش', 'ص', 'ض', 'ط', 'ظ', 'ع', 'غ', 'ف', 'ق', 'ك', 'ل', 'م', 'ن', 'ه', 'و', 'ي', 'ء', 'آ', 'أ', 'إ', 'ؤ', 'ئ', 'ة', 'ى', 'ﻻ', 'ﻷ', 'ﻹ', 'ﻵ',' ', '.']

In [None]:
print(len(vocabulary))

## Processing The Training & Testing Datasets:

In [None]:
def process_transcriptions(data_item):
  """Strip every character that is not in `vocabulary` from the transcription.

  Rewrites `data_item["sentence"]` in place, keeping the surviving characters
  in their original order, and returns the same item.
  """
  kept_chars = filter(lambda symbol: symbol in vocabulary, data_item["sentence"])
  data_item["sentence"] = ''.join(kept_chars)
  return data_item

In [None]:
combined_dataset = combined_dataset.map(process_transcriptions)

In [None]:
print(combined_dataset[0])

In [None]:
fleurs_val = fleurs_val.map(process_transcriptions)

# 4- Creating The Custom Processor:

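The custom processor maps each vocabulary character to its integer index (used to build the label tensors) and maps indices back to characters (used to decode predictions).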

In [None]:
class CustomProcessor:
  """Character-level codec between text and vocabulary indices."""

  def __init__(self, vocab):
    """Store `vocab` and build the reverse lookup character -> position."""
    self.vocab = vocab
    lookup = {}
    # If a character occurs more than once, its last position wins.
    for position, symbol in enumerate(vocab):
      lookup[symbol] = position
    self.char_to_index = lookup

  def text_to_int(self, text):
    """Return the list of vocabulary indices for `text`.

    Raises KeyError for a character that is not in the vocabulary.
    """
    return list(map(self.char_to_index.__getitem__, text))

  def int_to_text(self, indices):
    """Return the string spelled by the vocabulary entries at `indices`."""
    return ''.join(self.vocab[position] for position in indices)

In [None]:
object_voc = CustomProcessor(vocabulary)

In [None]:
text = "مرحبا"
encoding = object_voc.text_to_int(text)
decoding = object_voc.int_to_text(encoding)
print(encoding)
print(decoding)

# 5- Creating The Dataset Class:

In [None]:
import torch
from torch.utils.data import Dataset
import torchaudio

In [None]:
class CustomDataset(Dataset):
  """Torch dataset that turns (audio path, sentence) rows into CTC training items.

  Each row of `dataset` must provide a "path" (audio file readable by
  torchaudio) and a "sentence" made only of characters from `vocab`.
  """

  def __init__(self, dataset, vocab):
    self.dataset = dataset
    self.processor = CustomProcessor(vocab)
    self.spectrogram_transform = torchaudio.transforms.MelSpectrogram(sample_rate=16000, n_mels=128)

  def __len__(self):
    return len(self.dataset)

  def __getitem__(self, idx):
    """Return `(spectrogram, labels, num_frames)` for row `idx`.

    `spectrogram` is the normalised mel spectrogram with a single leading
    channel, `labels` is a 1-D long tensor of vocabulary indices and
    `num_frames` is the size of the spectrogram's last (time) dimension.
    """
    # Fetch the row once; indexing the underlying dataset twice repeats the lookup.
    record = self.dataset[idx]

    audio_input, sampling_rate = torchaudio.load(record["path"])

    # Downmix multi-channel audio so the spectrogram always has exactly one
    # channel, matching the single input channel of the model's first conv
    # layer and the mono conversion done at inference time.
    if audio_input.shape[0] > 1:
      audio_input = audio_input.mean(dim=0, keepdim=True)

    if sampling_rate != 16000:
      resampler = torchaudio.transforms.Resample(orig_freq=sampling_rate, new_freq=16000)
      audio_input = resampler(audio_input)

    spectrogram = self.spectrogram_transform(audio_input)
    # Clamp the std so a silent clip (std == 0) yields zeros instead of NaNs;
    # the result is unchanged whenever std is above the floor.
    spectrogram = (spectrogram - spectrogram.mean()) / spectrogram.std().clamp_min(1e-8)

    # An explicit integer dtype keeps empty transcriptions from becoming a
    # float tensor, which would poison torch.cat in the collate function.
    labels = torch.tensor(self.processor.text_to_int(record["sentence"]), dtype=torch.long)

    return spectrogram, labels, spectrogram.shape[-1]

In [None]:
custom_data_set = CustomDataset(combined_dataset, vocabulary)
custom_test_data_set = CustomDataset(fleurs_val, vocabulary)

# 6- Model Architecture:

In [None]:
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence

In [None]:
import math

class PositionalEncoding(nn.Module):
  """Add fixed sinusoidal position offsets to a sequence-first tensor, then apply dropout.

  The table is stored as the non-trainable buffer `pe` with shape
  (max_len, 1, d_model). `forward` selects rows by the size of the input's
  first dimension, so the input is expected as (sequence, batch, d_model).
  """

  def __init__(self, d_model, dropout=0.1, max_len=5000):
    super().__init__()
    self.dropout = nn.Dropout(p=dropout)

    steps = torch.arange(max_len).unsqueeze(1)
    inv_freq = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
    angles = steps * inv_freq

    # Even feature indices hold the sine terms, odd indices the cosine terms.
    table = torch.zeros(max_len, 1, d_model)
    table[:, 0, 0::2] = torch.sin(angles)
    table[:, 0, 1::2] = torch.cos(angles)
    self.register_buffer('pe', table)

  def forward(self, x):
    """Return dropout(x + pe[:x.size(0)])."""
    seq_len = x.size(0)
    return self.dropout(x + self.pe[:seq_len])

In [None]:
import torch
import torch.nn as nn

class CustomSTTModel2(nn.Module):
  """CNN + bidirectional GRU acoustic model producing per-frame CTC logits.

  Input:  (batch, 1, 128, time) mel spectrograms. The 128 frequency bins are
          implied by the 8192-wide recurrent input: 256 channels times
          128 / 4 bins after the two 2x2 poolings.
  Output: (batch, time // 4, num_classes + 1) unnormalised scores; the extra
          class is the CTC blank.
  """

  def __init__(self, num_classes):
    super().__init__()
    self.conv1 = nn.Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    self.bn1 = nn.BatchNorm2d(32)
    self.conv2 = nn.Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    self.bn2 = nn.BatchNorm2d(64)
    self.conv3 = nn.Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    self.bn3 = nn.BatchNorm2d(128)
    self.conv4 = nn.Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    self.bn4 = nn.BatchNorm2d(256)
    self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

    # 1x1 convolutions lift the residual branch to the new channel count.
    self.skip_conv2 = nn.Conv2d(32, 64, kernel_size=(1, 1), stride=(1, 1))
    self.skip_conv3 = nn.Conv2d(64, 128, kernel_size=(1, 1), stride=(1, 1))
    self.skip_conv4 = nn.Conv2d(128, 256, kernel_size=(1, 1), stride=(1, 1))

    self.pos_encoder = PositionalEncoding(d_model=8192)

    self.gru1 = nn.GRU(input_size=8192, hidden_size=128, num_layers=1, bidirectional=True, batch_first=True)
    self.gru2 = nn.GRU(input_size=256, hidden_size=256, num_layers=1, bidirectional=True, batch_first=True)

    self.dropout = nn.Dropout(0.5)

    self.fc = nn.Linear(512, 256)
    self.out = nn.Linear(256, num_classes + 1)

    # Parameter-free, so it adds nothing to the state dict; created once
    # instead of instantiating nn.ELU() on every forward call.
    self.activation = nn.ELU()

  def forward(self, x):
    # Block 1: the single-channel input broadcasts across the 32 feature maps.
    skip_connection = x
    x = self.activation(self.bn1(self.conv1(x)))
    x = x + skip_connection
    x = self.pool(x)

    # Block 2
    skip_connection = self.skip_conv2(x)
    x = self.activation(self.bn2(self.conv2(x)))
    x = x + skip_connection
    x = self.pool(x)

    # Blocks 3 and 4 keep the resolution: pooling here was removed by the
    # original author because it introduced numerical instability.
    skip_connection = self.skip_conv3(x)
    x = self.activation(self.bn3(self.conv3(x)))
    x = x + skip_connection

    skip_connection = self.skip_conv4(x)
    x = self.activation(self.bn4(self.conv4(x)))
    x = x + skip_connection

    # (batch, channels, freq, time) -> (batch, time, channels, freq)
    x = x.permute(0, 3, 1, 2)
    # Merge channels and frequency bins into one feature axis: (batch, time, 8192)
    x = torch.flatten(x, start_dim=2)

    # PositionalEncoding picks its offsets by the size of dim 0, i.e. it expects
    # (time, batch, features). Feeding it the batch-first tensor directly made
    # the offset depend on the sample's index in the batch instead of on the
    # time step, so go sequence-first for the call and back afterwards.
    # NOTE: checkpoints trained before this fix saw the old encoding and
    # should be retrained.
    x = self.pos_encoder(x.transpose(0, 1)).transpose(0, 1).contiguous()

    x, _ = self.gru1(x)
    x = self.dropout(x)

    x, _ = self.gru2(x)
    x = self.dropout(x)

    x = self.activation(self.fc(x))
    x = self.dropout(x)
    return self.out(x)

In [None]:
custom_model_2 = CustomSTTModel2(len(vocabulary))

# 7- Training Phase:

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

## Creating The Collate Function & Initializing Train & Test Loaders:

In [None]:
from torch.nn.utils.rnn import pad_sequence

def collate_fn(batch):
  """Collate `(spectrogram, labels, num_frames)` items into one CTC batch.

  Args:
    batch: sequence of items whose spectrograms are shaped
      (channels, n_mels, time) with differing `time`.

  Returns:
    spectrograms:  (batch, channels, n_mels, max_time), zero-padded in time.
    labels:        1-D tensor with all label sequences concatenated.
    input_lengths: un-padded frame count of every spectrogram.
    label_lengths: length of every label sequence.

  All returned tensors live on the CPU; the caller moves them to the target
  device. Allocating on a CUDA device here (as was done for `label_lengths`)
  ties the function to a global and breaks DataLoader worker processes.
  """
  spectrograms, labels, input_lengths = zip(*batch)

  # pad_sequence pads along the first dimension, so bring time to the front:
  # (channels, n_mels, time) -> (time, channels, n_mels)
  time_major = [s.permute(2, 0, 1) for s in spectrograms]
  padded = pad_sequence(time_major, batch_first=True, padding_value=0)

  # Restore the original layout: (batch, channels, n_mels, time)
  spectrograms = padded.permute(0, 2, 3, 1)

  label_lengths = torch.tensor([len(label) for label in labels], dtype=torch.long)
  labels = torch.cat(labels)
  input_lengths = torch.tensor(input_lengths)

  return spectrograms, labels, input_lengths, label_lengths

In [None]:
batch_size = 8
train_loader = DataLoader(custom_data_set, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
test_loader = DataLoader(custom_test_data_set, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)

## Creating The Testing Loop:

In [None]:
def test(model, data_loader, criterion, device):
  """Return the mean per-batch loss of `model` over `data_loader`.

  The model is switched to eval mode and gradients are disabled. Every
  sample in a batch is given the full (padded) output length as its CTC
  input length.
  """
  model.eval()
  running_loss = 0
  with torch.no_grad():
    for spectrograms, labels, _input_lengths, label_lengths in data_loader:
      spectrograms = spectrograms.to(device)
      labels = labels.to(device)

      logits = model(spectrograms)  # [batch, output sequence length, classes]
      n_items, n_frames = logits.shape[0], logits.shape[1]
      output_lengths = torch.full((n_items,), n_frames, dtype=torch.int64).to(device)

      # CTC wants log-probabilities shaped (output sequence length, batch, classes).
      log_probs = torch.nn.functional.log_softmax(logits, dim=2).permute(1, 0, 2).to(device)

      batch_loss = criterion(log_probs, labels, output_lengths, label_lengths.to(device))
      running_loss += batch_loss.item()

  return running_loss / len(data_loader)

## Creating The Training Loop:

In [None]:
best_loss = 10000  # lowest average training loss seen so far; updated by train()


def train(model, data_loader, val_loader, criterion, optimizer, epochs, device):
  """Train `model` with CTC loss and checkpoint it along the way.

  Args:
    model: network returning (batch, output sequence length, classes) scores.
    data_loader: training batches as produced by `collate_fn`.
    val_loader: batches used for the per-epoch validation loss.
    criterion: CTC loss module.
    optimizer: optimizer over `model`'s parameters.
    epochs: number of passes over `data_loader`.
    device: device the model and the batches are moved to.

  Side effects:
    * writes `model_state_dict_epoch_{epoch}.pth` on every even (0-based) epoch;
    * writes `best_model_state_dict.pth` whenever the average training loss
      drops below the module-level `best_loss`, which is then updated.
  """
  global best_loss
  model.to(device)
  for epoch in range(epochs):
    model.train()
    total_loss = 0.0
    counted_batches = 0
    # Iterate the loader that was passed in (previously the global
    # `train_loader` was used and the `data_loader` argument was ignored).
    for batch_idx, (spectrograms, labels, input_lengths, label_lengths) in enumerate(data_loader):
      spectrograms, labels = spectrograms.to(device), labels.to(device)
      outputs = model(spectrograms)  # [batch, output sequence length, classes]

      # Every sample is given the full padded output length.
      output_lengths = torch.full((outputs.shape[0],), outputs.shape[1], dtype=torch.int64, device=device)

      # CTC expects log-probabilities shaped (output sequence length, batch, classes).
      log_probs = torch.nn.functional.log_softmax(outputs, dim=2).permute(1, 0, 2)
      label_lengths = label_lengths.to(device)

      loss = criterion(log_probs, labels, output_lengths, label_lengths)
      # Skip numerically broken batches. isfinite also rejects NaN, which
      # isinf let through and which would corrupt the weights on backward.
      if not torch.isfinite(loss):
        continue

      total_loss += loss.item()
      counted_batches += 1

      optimizer.zero_grad()
      loss.backward()
      torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1)
      optimizer.step()

      print(f"Epoch [{epoch+1}/{epochs}], Batch [{batch_idx+1}/{len(data_loader)}], Loss: {loss.item():.4f}")

    # Average only over batches that contributed a loss, so skipped batches do
    # not deflate the figure; an epoch with no usable batch can never be "best".
    avg_loss = total_loss / counted_batches if counted_batches else float("inf")

    avg_val_loss = test(model, val_loader, criterion, device)
    print("validation loss: ", avg_val_loss)

    if epoch % 2 == 0:
      torch.save(model.state_dict(), f"model_state_dict_epoch_{epoch}.pth")

    # NOTE(review): the best checkpoint is chosen by training loss, not by the
    # validation loss computed above - confirm this is intended.
    if avg_loss < best_loss:
      best_loss = avg_loss
      torch.save(model.state_dict(), "best_model_state_dict.pth")

## Training & Testing:

In [None]:
from torch import nn
import torch.optim as optim

criterion = nn.CTCLoss(blank=len(vocabulary)).to(device)

In [None]:
optimizer = optim.Adam(custom_model_2.parameters(), lr=0.001)

In [None]:
epochs = 10
train(custom_model_2, train_loader, test_loader, criterion, optimizer, epochs, device)

# 8- Inference Phase:

## Creating Audio-To-Spectrogram Transformation:

In [None]:
def transform_audio_to_spectrogram(audio_path, transform):
  """Load an audio file and return its normalised spectrogram.

  Args:
    audio_path: path of an audio file readable by `torchaudio.load`.
    transform: callable applied to the mono 16 kHz waveform
      (e.g. a `MelSpectrogram` transform).

  Returns:
    The transform's output, shifted to zero mean and scaled by its standard
    deviation.
  """
  waveform, sample_rate = torchaudio.load(audio_path)

  # Downmix any multi-channel recording (not only stereo) to mono.
  if waveform.shape[0] > 1:
    waveform = torch.mean(waveform, dim=0, keepdim=True)

  if sample_rate != 16000:
    resampler = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=16000)
    waveform = resampler(waveform)

  spectrogram = transform(waveform)
  # Clamp the std so a silent clip (std == 0) yields zeros instead of NaNs;
  # the result is unchanged whenever std is above the floor.
  spectrogram = (spectrogram - spectrogram.mean()) / spectrogram.std().clamp_min(1e-8)
  return spectrogram

In [None]:
spectrogram_transform = torchaudio.transforms.MelSpectrogram(sample_rate=16000, n_mels=128)

## Loading Model Weights:

In [None]:
model_path = "best_model_state_dict.pth"
model = CustomSTTModel2(len(vocabulary))
# map_location remaps the stored tensors onto the current device, so a
# checkpoint saved on a GPU also loads on a CPU-only machine.
model.load_state_dict(torch.load(model_path, map_location=device))
model.to(device)
model.eval()

## Inference:

In [None]:
def infer(audio_path, model, transform, processor):
  """Transcribe one audio file with greedy CTC decoding.

  Args:
    audio_path: path of the audio file to transcribe.
    model: network returning (batch, frames, classes) scores.
    transform: spectrogram transform handed to `transform_audio_to_spectrogram`.
    processor: object exposing `vocab` and `int_to_text`.

  Returns:
    The predicted text.
  """
  spectrogram = transform_audio_to_spectrogram(audio_path, transform)
  spectrogram = spectrogram.unsqueeze(0).to(device)  # add the batch dimension

  with torch.no_grad():
    outputs = model(spectrogram)

  # argmax over raw scores equals argmax over log-softmax, so no normalisation
  # is needed to pick the best class per frame. The batch holds one clip.
  frame_ids = torch.argmax(outputs, dim=2).flatten()

  # Greedy CTC decoding: first merge runs of identical frame predictions, then
  # drop the blank. Without the merge step every character that spans several
  # frames is emitted several times.
  collapsed_ids = torch.unique_consecutive(frame_ids).tolist()

  # The blank is the class right after the last vocabulary entry.
  blank_index = len(processor.vocab)
  pred_text = processor.int_to_text([i for i in collapsed_ids if i < blank_index])

  return pred_text

In [None]:
object_voc = CustomProcessor(vocabulary)
audio_file = "r3.wav"
# audio_file = dataset[0]["path"]
predicted_text = infer(audio_file, model, spectrogram_transform, object_voc)
print(predicted_text)