<h2><u>Summary</u></h2>

In this project, we attempt to apply a Transformer architecture, inspired by the paper Attention Is All You Need (Google Brain) and by OpenAI's Whisper architecture, to Automatic Speech Recognition for Indonesian. The data used is the Indonesian subset of Common Voice (source: Hugging Face).

<h3>Import Necessary Libraries</h3>

In [6]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchaudio
import torchaudio.functional as F_audio
import torchaudio.transforms as T
from torchinfo import summary
import seaborn as sns
import math
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os
import string

from PreprocessAudio import PreprocessAudio

<h3>Read and Clean the Metadata</h3>

This metadata is used to build the dataset. First, we need metadata in the format ```audio_file_path, transcription```. In this project, the transcription is expected to correctly identify the words spoken in the audio file, so punctuation and capitalization can be ignored. Ignoring them is also beneficial: it shrinks the output vocabulary to lowercase letters and the space character, which can improve accuracy.
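
To make the vocabulary argument concrete, the sketch below counts the distinct characters in a few Common Voice sentences before and after normalization. `normalize` is a hypothetical helper written only for this illustration; it is not the cleaning function used later in this notebook.

```python
import string

def normalize(sentence):
    # Lowercase, turn hyphens into spaces, and drop all remaining punctuation
    sentence = sentence.lower().replace('-', ' ')
    return ''.join(ch for ch in sentence if ch not in string.punctuation)

sentences = [
    'Kamu harus melakukannya, suka tidak suka.',
    'Minggu lalu terus-menerus hujan.',
    'Tom berkata dia dapat menunggu lama.',
]

# Distinct characters the model would have to predict in each case
raw_vocab = set(''.join(sentences))
clean_vocab = set(''.join(normalize(s) for s in sentences))

print(len(raw_vocab), len(clean_vocab))
print(normalize(sentences[1]))
```

After normalization every remaining character is a lowercase letter or a space, so the model only has to choose among a small, fixed set of symbols.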

In [7]:
df  = pd.read_csv('./common_voice_id/dev.tsv', sep='\t')
df2 = pd.read_csv('./common_voice_id/invalidated.tsv', sep='\t')
df3 = pd.read_csv('./common_voice_id/other.tsv', sep='\t')
df4 = pd.read_csv('./common_voice_id/train.tsv', sep='\t')

df  = df[['path', 'sentence']]
df2 = df2[['path', 'sentence']]
df3 = df3[['path', 'sentence']]
df4 = df4[['path', 'sentence']]

df = pd.concat([df, df2, df3, df4], ignore_index=True)
df = df.sample(frac=1, random_state=42)
df = df.reset_index(drop=True)

In [8]:
df

Unnamed: 0,path,sentence
0,common_voice_id_26747327.mp3,"Kamu harus melakukannya, suka tidak suka."
1,common_voice_id_21699230.mp3,Saya dibonceng di belakang sepeda teman.
2,common_voice_id_25248896.mp3,Tom berkata dia dapat menunggu lama.
3,common_voice_id_25537482.mp3,Minggu lalu terus-menerus hujan.
4,common_voice_id_21195036.mp3,Saat libur musim panas tahun ini saya pergi ke...
...,...,...
40199,common_voice_id_25039385.mp3,Aku menyuruh adikku untuk membeli gula di warung.
40200,common_voice_id_25426706.mp3,perusahaan yang berkembang selalu diikuti deng...
40201,common_voice_id_26229445.mp3,Melepaskan yang melekat membawa ke Nirvana.
40202,common_voice_id_20954419.mp3,dia tidak pernah ke dokter gigi selama hidupnya


In [9]:
df_testing = df[:500].copy()  # explicit copy, so later edits do not write into a slice of df

In [10]:
def remove_strips(text):
    # Drop curly quotes, replace hyphens (-) with spaces, and strip punctuation from both ends of the word
    cleaned_text = text.replace('“', '')
    cleaned_text = cleaned_text.replace('”', '')
    cleaned_text = cleaned_text.replace('-', ' ')
    cleaned_text = cleaned_text.strip(string.punctuation)
    return cleaned_text

def clean_sentence(sentence):
    # Lowercase, strip punctuation around each word, then clean each word
    sentence = ' '.join(word.strip(string.punctuation) for word in sentence.lower().split())
    return ' '.join(remove_strips(word) for word in sentence.split())

# assign() returns a new DataFrame, which avoids the chained assignment
# (df['sentence'][i] = ...) that triggers pandas' SettingWithCopyWarning
df_testing = df_testing.assign(sentence=df_testing['sentence'].apply(clean_sentence))


A directory is prepared to hold the audio files to be processed; these files are later split into training and validation sets.

In [11]:
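import shutil

label_list = []
HOME = os.getcwd()
os.makedirs(os.path.join(HOME, 'audio_folder'), exist_ok=True)
for file_name, sentence in zip(df_testing['path'], df_testing['sentence']):
    # shutil.copy is portable, unlike the Windows-only `copy` shell command
    shutil.copy(os.path.join(HOME, 'common_voice_id', 'clips', file_name),
                os.path.join(HOME, 'audio_folder', file_name))
    label_list.append(sentence)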

In [12]:
print(f'Number of labels: {len(label_list)}')
print('Sample of 10 labels')
label_list[:10]

Number of labels: 500
Sample of 10 labels


['kamu harus melakukannya suka tidak suka',
 'saya dibonceng di belakang sepeda teman',
 'tom berkata dia dapat menunggu lama',
 'minggu lalu terus menerus hujan',
 'saat libur musim panas tahun ini saya pergi ke laut dan mendaki gunung',
 'dia memanggil namanya',
 'saat berada di sana saya belajar bahasa inggris',
 'di mana kamu membeli buku itu',
 'sepuluh tahun adalah waktu yang lama untuk menunggu',
 'di atas meja ada vas bunga']

The dataset used for training is expected to have the following format.

```python
[(tensor_1, transcription_1),
 (tensor_2, transcription_2),
 ...
 (tensor_n, transcription_n)]
```

PyTorch's ```Dataset``` class can be used to build this dataset. It is imported with

```python
from torch.utils.data import Dataset
```

The data is loaded with a batch size of 64.

tensor_i is a tensor holding the MFCC matrix of one audio clip. Recall that an MFCC matrix has shape ```(n_mfcc, timesteps)```, with ```n_mfcc=64``` and timesteps determined by the longest audio clip (timesteps is a frame count, not the clip's duration).
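
The difference between timesteps and duration can be sketched with a little arithmetic. The function below assumes a centered short-time Fourier transform, where the frame count is one plus the number of samples divided by the hop length. The sample rate of 16 kHz and hop length of 200 samples are assumptions chosen for illustration; the values used by `PreprocessAudio` are not shown in this notebook.

```python
def num_frames(duration_s, sample_rate=16_000, hop_length=200):
    # Frame count of a centered STFT: one frame per hop, plus one
    # (sample_rate and hop_length are illustrative assumptions)
    n_samples = int(duration_s * sample_rate)
    return 1 + n_samples // hop_length

for duration in (1.0, 5.0, 10.0):
    print(f'{duration:>4} s -> {num_frames(duration)} frames')
```

Under these assumptions the number of timesteps grows with the duration but is a different quantity, which is why every MFCC matrix is later padded to one common number of frames.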

transcription_i is a tensor holding the transcription, encoded as integers through a user-defined encoder dictionary (commonly called the vocabulary). This tensor has shape ```(1, max_len)```, where ```max_len``` is the length of the longest transcription. Note that the maximum transcription length is capped at 256.

In [13]:
alphabet = 'abcdefghijklmnopqrstuvwxyz '
max_timestamps = 2752
max_len = 256

alphabets = ['', ' '] + [chr(i + 96) for i in range(1, 27)]
char2num_dict, num2char_dict = {}, {}

for index, chars in enumerate(alphabets):
    char2num_dict[chars] = index
    num2char_dict[index] = chars

def conv_char2num(label, maxlen=max_len):
    label = label[:maxlen].lower()
    label_enc = []
    padding_len = maxlen - len(label)
    for i in label:
        label_enc.append(char2num_dict[i])
    return np.array(label_enc + [0] * padding_len)

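def conv_num2char(num):
    txt = ""
    for i in num:
        i = int(i)  # accept NumPy integers and 0-dim tensors as dictionary keys
        if i == 0:
            break  # 0 is the padding index, so the text ends here
        txt += num2char_dict[i]

    return txt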

In [14]:
example_text = 'Saya berangkat ke sekolah di pagi hari'
print(conv_char2num(example_text))

[20  2 26  2  1  3  6 19  2 15  8 12  2 21  1 12  6  1 20  6 12 16 13  2
  9  1  5 10  1 17  2  8 10  1  9  2 19 10  0  0  0  0  0  0  0  0  0  0
  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0
  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0
  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0
  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0
  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0
  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0
  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0
  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0
  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0]
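
The padded encoding above can also be decoded back to text. The following is a minimal, self-contained sketch of that round trip using the same index layout (0 for padding, 1 for the space, 2 to 27 for `a` to `z`). The names `encode` and `decode` and the short `max_len=16` are hypothetical, chosen to keep the output readable.

```python
PAD = 0
vocab = [''] + list(' abcdefghijklmnopqrstuvwxyz')
char2num = {ch: i for i, ch in enumerate(vocab)}
num2char = {i: ch for ch, i in char2num.items()}

def encode(text, max_len=16):
    # Truncate to max_len, map characters to indices, then right-pad with PAD
    ids = [char2num[ch] for ch in text.lower()[:max_len]]
    return ids + [PAD] * (max_len - len(ids))

def decode(ids):
    # Stop at the first padding index; everything before it is text
    chars = []
    for i in ids:
        if i == PAD:
            break
        chars.append(num2char[i])
    return ''.join(chars)

encoded = encode('Saya')
print(encoded)
print(decode(encoded))
```

Because padding is always appended at the end, stopping at the first 0 is enough to recover the original (lowercased, possibly truncated) text.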


In [15]:
def split_data(df, train_size=0.8):
    # Shuffle the rows, then use the first `train_size` fraction for training
    data_size = len(df)
    df = df.sample(frac=1).reset_index(drop=True)
    split = int(train_size * data_size)
    df_train, df_valid = df[:split], df[split:]

    return df_train, df_valid.reset_index(drop=True)

df_train, df_valid = split_data(df_testing)

In [16]:
# Move the training audio files into the 'train' folder
import shutil

label_list_train = []
os.makedirs(os.path.join(HOME, 'audio_folder', 'train'), exist_ok=True)

for file_name, sentence in zip(df_train['path'], df_train['sentence']):
    shutil.move(os.path.join(HOME, 'audio_folder', file_name),
                os.path.join(HOME, 'audio_folder', 'train', file_name))
    label_list_train.append(sentence)

In [17]:
# Move the validation audio files into the 'valid' folder
import shutil

label_list_valid = []
os.makedirs(os.path.join(HOME, 'audio_folder', 'valid'), exist_ok=True)

for file_name, sentence in zip(df_valid['path'], df_valid['sentence']):
    shutil.move(os.path.join(HOME, 'audio_folder', file_name),
                os.path.join(HOME, 'audio_folder', 'valid', file_name))
    label_list_valid.append(sentence)

In [18]:
PipelineTrain = PreprocessAudio('./audio_folder/train/', df_train, 35)
PipelineValid = PreprocessAudio('./audio_folder/valid/', df_valid, 35)

In [19]:
dataset_train, df_train_filtered = PipelineTrain.load_audio()
dataset_valid, df_valid_filtered = PipelineValid.load_audio()

Mounted audio directory at: ./audio_folder/train/


Counter di 2
Error di file ./audio_folder/train/common_voice_id_21192699.mp3
Counter di 108
Error di file ./audio_folder/train/common_voice_id_21194463.mp3
Counter di 162
Error di file ./audio_folder/train/common_voice_id_26242833.mp3
Counter di 174
Error di file ./audio_folder/train/common_voice_id_21699467.mp3
Counter di 179
Error di file ./audio_folder/train/common_voice_id_20847480.mp3
Counter di 188
Error di file ./audio_folder/train/common_voice_id_21194346.mp3
Counter di 317
Error di file ./audio_folder/train/common_voice_id_35338065.mp3
Counter di 329
Error di file ./audio_folder/train/common_voice_id_26237570.mp3
Counter di 343
Error di file ./audio_folder/train/common_voice_id_19783809.mp3
Counter di 350
Error di file ./audio_folder/train/common_voice_id_21587706.mp3
Counter di 389
Error di file ./audio_folder/train/common_voice_id_25469455.mp3
Mounted audio directory at: ./audio_folder/valid/
Counter di 31
Error di file ./audio_folder/valid/common_voice_id_25470004.mp3


In [20]:
def add_padding(mfcc_tensor, index, n_mfcc=64, max_padding=512):
    # Zero-pad one MFCC matrix to a fixed (max_padding, n_mfcc) shape
    mfcc = np.asarray(mfcc_tensor[index][0])
    height, width = mfcc.shape[:2]

    padded_mfcc = np.zeros([max_padding, n_mfcc])
    padded_mfcc[:height, :width] = mfcc
    return padded_mfcc

In [21]:
train_dataset_list = []
for i in range(len(dataset_train)):
    train_dataset_list.append(add_padding(dataset_train, i, max_padding=max_timestamps).tolist())

In [22]:
valid_dataset_list = []
for i in range(len(dataset_valid)):
    valid_dataset_list.append(add_padding(dataset_valid, i, max_padding=max_timestamps).tolist())

In [23]:
train_dataset_list = torch.tensor(train_dataset_list)
valid_dataset_list = torch.tensor(valid_dataset_list)

In [24]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
train_x = train_dataset_list.to(device)
valid_x = valid_dataset_list.to(device)

In [25]:
df_train_filtered.head()

Unnamed: 0,path,sentence
0,common_voice_id_26009087.mp3,tetapi apa yang bisa dia lakukan
1,common_voice_id_25448391.mp3,ini kok lemot ya
2,common_voice_id_35281926.mp3,dari stasiun sampai perusahaan saya naik taksi
3,common_voice_id_35507973.mp3,saya mengambil banyak foto saat jalan jalan
4,common_voice_id_26628186.mp3,saya tahu dia tidak dapat bekerja lagi


In [26]:
train_y, valid_y = [], []

for text in df_train_filtered['sentence']:
    train_y.append(conv_char2num(text))

for text in df_valid_filtered['sentence']:
    valid_y.append(conv_char2num(text))

In [27]:
train_y = torch.tensor(train_y)
valid_y = torch.tensor(valid_y)

In [28]:
from torch.utils.data import Dataset, DataLoader

class myDataset(Dataset):
    def __init__(self, data, transcriptions):
        self.data = data
        self.transcriptions = transcriptions
    
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, index):
        mfcc_matrix = self.data[index]
        transcription = self.transcriptions[index]
        return mfcc_matrix, transcription

In [31]:
my_train_dataset = myDataset(train_x, train_y)
my_valid_dataset = myDataset(valid_x, valid_y)

In [37]:
batch_size = 64
train_set = DataLoader(my_train_dataset, batch_size=batch_size, shuffle=True)
valid_set = DataLoader(my_valid_dataset, batch_size=batch_size, shuffle=True)
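
As a quick sanity check on the loaders, the number of batches per epoch follows from the sample count and the batch size. The sketch below assumes that the 11 training files and 1 validation file reported as errors in the loading log were dropped, leaving 389 and 99 samples from the 400/100 split; `num_batches` and `last_batch_size` are hypothetical helpers.

```python
import math

def num_batches(n_samples, batch_size=64, drop_last=False):
    # A DataLoader yields ceil(n / batch_size) batches unless drop_last=True
    if drop_last:
        return n_samples // batch_size
    return math.ceil(n_samples / batch_size)

def last_batch_size(n_samples, batch_size=64):
    # The final batch holds the remainder (or a full batch if n divides evenly)
    return n_samples % batch_size or batch_size

# Sample counts are assumptions derived from the loading log
for name, n in (('train', 389), ('valid', 99)):
    print(name, num_batches(n), last_batch_size(n))
```

A very small final batch gives a noisier gradient estimate than the others; passing `drop_last=True` to `DataLoader` is one way to skip it.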

<h3>Build Transformer Model</h3>

In [42]:
class GELU(nn.Module):
    def __init__(self):
        super().__init__()

    def forward(self, x):
        error_const = torch.erf(x / math.sqrt(2.0))
        x = x * 0.5 * (1.0 + error_const)
        return x

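class TokenEmbedding(nn.Module):
    # maxlen is an added parameter; its default of 256 matches max_len above
    def __init__(self, num_vocab=30, num_hid=64, maxlen=256):
        super().__init__()
        # Create the embedding tables once so their weights are registered and trained
        self.emb = nn.Embedding(num_vocab, num_hid)
        self.pos_emb = nn.Embedding(maxlen, num_hid)

    def forward(self, x):
        # x: (batch, seq_len) of token indices
        seq_len = x.shape[-1]
        pos = torch.arange(seq_len, device=x.device)
        # Token plus position embeddings; positions broadcast over the batch
        return self.emb(x) + self.pos_emb(pos)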

class SpeechFeatureEmbedding(nn.Module):
    def __init__(self, num_hid=64, maxlen=100):
        super().__init__()
        # in_channels=num_hid assumes the number of MFCC coefficients equals num_hid (64 here)
        self.conv1 = nn.Conv1d(in_channels=num_hid, out_channels=num_hid, kernel_size=11, stride=2, padding=5)
        self.conv2 = nn.Conv1d(in_channels=num_hid, out_channels=num_hid, kernel_size=11, stride=2, padding=5)
        self.conv3 = nn.Conv1d(in_channels=num_hid, out_channels=num_hid, kernel_size=11, stride=2, padding=5)
        self.gelu = GELU()

    def forward(self, x):
        # x: (batch, time, n_mfcc); Conv1d expects (batch, channels, time).
        # Note that .T would reverse all three dimensions, so transpose explicitly.
        x = x.transpose(1, 2)
        x = self.conv1(x)
        x = self.gelu(x)
        x = self.conv2(x)
        x = self.gelu(x)
        x = self.conv3(x)
        x = self.gelu(x)
        # Back to (batch, time, num_hid) for the attention layers
        return x.transpose(1, 2)

class TransformerEncoder(nn.Module):
    def __init__(self, embed_dim, num_heads, feed_forward_dim, rate=0.1):
        super().__init__()
        # batch_first=True keeps tensors as (batch, time, embed_dim)
        self.att = nn.MultiheadAttention(embed_dim, num_heads, batch_first=True)
        self.ffn = nn.Sequential(
            nn.Linear(embed_dim, feed_forward_dim),
            GELU(),
            nn.Linear(feed_forward_dim, embed_dim)
        )
        self.layernorm1 = nn.LayerNorm(embed_dim, eps=1e-6)
        self.layernorm2 = nn.LayerNorm(embed_dim, eps=1e-6)
        self.dropout1 = nn.Dropout(rate)
        self.dropout2 = nn.Dropout(rate)

    def forward(self, inputs):
        # inputs: (batch, time, embed_dim); no transpose is needed between stacked layers
        attn_output = self.att(inputs, inputs, inputs)[0]
        attn_output = self.dropout1(attn_output)
        out1 = self.layernorm1(inputs + attn_output)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output)
        return self.layernorm2(out1 + ffn_output)

class TransformerDecoder(nn.Module):
    def __init__(self, embed_dim, num_heads, feed_forward_dim, dropout_rate=0.1):
        super().__init__()
        self.layernorm1 = nn.LayerNorm(embed_dim, eps=1e-6)
        self.layernorm2 = nn.LayerNorm(embed_dim, eps=1e-6)
        self.layernorm3 = nn.LayerNorm(embed_dim, eps=1e-6)
        # batch_first=True keeps tensors as (batch, seq_len, embed_dim)
        self.self_att = nn.MultiheadAttention(embed_dim, num_heads, batch_first=True)
        self.enc_att = nn.MultiheadAttention(embed_dim, num_heads, batch_first=True)
        self.self_dropout = nn.Dropout(0.5)
        self.enc_dropout = nn.Dropout(0.1)
        self.ffn_dropout = nn.Dropout(0.1)
        self.ffn = nn.Sequential(
            nn.Linear(embed_dim, feed_forward_dim),
            GELU(),
            nn.Linear(feed_forward_dim, embed_dim)
        )

    def causalAttentionMask(self, n_dest, n_src, device):
        # Boolean mask of shape (n_dest, n_src). In nn.MultiheadAttention,
        # True marks positions that must NOT be attended to (future tokens).
        i = torch.arange(n_dest, device=device)[:, None]
        j = torch.arange(n_src, device=device)
        allowed = i >= j - n_src + n_dest
        return ~allowed

    def forward(self, enc_out, target):
        # enc_out: (batch, enc_len, embed_dim); target: (batch, seq_len, embed_dim)
        seq_len = target.shape[1]
        causal_mask = self.causalAttentionMask(seq_len, seq_len, target.device)
        target_att = self.self_att(target, target, target, attn_mask=causal_mask)[0]
        target_norm = self.layernorm1(target + self.self_dropout(target_att))
        # Cross-attention: queries come from the decoder, keys and values from the encoder
        enc_out = self.enc_att(target_norm, enc_out, enc_out)[0]
        enc_out_norm = self.layernorm2(self.enc_dropout(enc_out) + target_norm)
        ffn_out = self.ffn(enc_out_norm)
        ffn_out_norm = self.layernorm3(enc_out_norm + self.ffn_dropout(ffn_out))
        return ffn_out_norm

class Transformer(nn.Module):
    def __init__(
        self,
        num_hid = 64,
        num_head = 2,
        num_feed_forward = 128,
        source_maxlen = 100,
        target_maxlen = 100,
        num_layers_enc = 4,
        num_layers_dec = 1,
        num_classes = 10
    ):
        super().__init__()
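        # Token classification calls for cross-entropy rather than MSE; index 0 is the padding token
        self.loss_metric = nn.CrossEntropyLoss(ignore_index=0)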
        self.num_layers_enc = num_layers_enc
        self.num_layers_dec = num_layers_dec
        self.target_maxlen = target_maxlen
        self.num_classes = num_classes

        self.enc_input = SpeechFeatureEmbedding(num_hid=num_hid, maxlen=source_maxlen)
        self.dec_input = TokenEmbedding(num_vocab=num_classes, num_hid=num_hid)

        self.encoder = nn.Sequential(
            self.enc_input,
            *[TransformerEncoder(num_hid, num_head, num_feed_forward) for _ in range(num_layers_enc)]
        )

        for i in range(num_layers_dec):
            self.add_module(
                f"dec_layer_{i}",
                TransformerDecoder(num_hid, num_head, num_feed_forward),
            )

        self.classifier = nn.Linear(num_hid, num_classes)

    def decode(self, enc_out, target):
        y = self.dec_input(target)
        for i in range(self.num_layers_dec):
            dec_layer = getattr(self, f"dec_layer_{i}")
            y = dec_layer(enc_out, y)
        return y

    def forward(self, inputs):
        source = inputs[0]
        target = inputs[1]
        x = self.encoder(source)
        y = self.decode(x, target)
        return self.classifier(y)
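
The three strided convolutions in `SpeechFeatureEmbedding` shorten the time axis before attention is applied. As a worked example, the sketch below applies the standard `nn.Conv1d` output-length formula (with dilation 1) to the padded input length `max_timestamps = 2752`; the helper `conv1d_out_len` is hypothetical and exists only for this calculation.

```python
def conv1d_out_len(length, kernel_size=11, stride=2, padding=5):
    # Output length of a 1-D convolution with dilation 1:
    # floor((L + 2 * padding - kernel_size) / stride) + 1
    return (length + 2 * padding - kernel_size) // stride + 1

length = 2752  # max_timestamps used for padding in this notebook
for layer in range(3):
    length = conv1d_out_len(length)
    print(f'after conv{layer + 1}: {length}')
```

Each layer halves the sequence, so the encoder attends over 344 positions instead of 2752. Since self-attention cost grows quadratically with sequence length, this reduction matters for memory and speed.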