In [1]:
import os
import torch
import torch.nn as nn
import torch.utils.data as data
import torch.optim as optim
import torch.nn.functional as F
import torchaudio
import numpy as np
from glob import glob
from torch.utils.data import Dataset, DataLoader ,TensorDataset
import time

In [2]:
torch.cuda.is_available()

True

In [3]:
torch.cuda.current_device()

0

In [4]:
torch.cuda.device(0)

<torch.cuda.device at 0x26416e1f850>

In [5]:
torch.cuda.device_count()

1

In [6]:
torch.cuda.get_device_name(0)

'NVIDIA GeForce RTX 3050 Laptop GPU'

In [7]:
all_train_loss = []
all_test_loss = []
all_cer = []
train = "./train"
test = "./test"

In [8]:
class Speech_Dataset(Dataset):
    def __init__(self, spectrograms, labels, input_lengths, label_lengths):
        
        super(Dataset, self).__init__()
        self.spectrograms = spectrograms
        self.labels = labels
        self.input_lengths = input_lengths
        self.label_lengths = label_lengths
              
    def __len__(self):
        return len(self.spectrograms)
    
    def __getitem__(self, index):        
        return (self.spectrograms[index] , self.labels[index] , self.input_lengths[index] , self.label_lengths[index]);

In [9]:
def _levenshtein_distance(ref, hyp):
    """Levenshtein distance is a string metric for measuring the difference
    between two sequences. Informally, the levenshtein disctance is defined as
    the minimum number of single-character edits (substitutions, insertions or
    deletions) required to change one word into the other. We can naturally
    extend the edits to word level when calculate levenshtein disctance for
    two sentences.
    """
    m = len(ref)
    n = len(hyp)

    # special case
    if ref == hyp:
        return 0
    if m == 0:
        return n
    if n == 0:
        return m

    if m < n:
        ref, hyp = hyp, ref
        m, n = n, m

    # use O(min(m, n)) space
    distance = np.zeros((2, n + 1), dtype=np.int32)

    # initialize distance matrix
    for j in range(0,n + 1):
        distance[0][j] = j

    # calculate levenshtein distance
    for i in range(1, m + 1):
        prev_row_idx = (i - 1) % 2
        cur_row_idx = i % 2
        distance[cur_row_idx][0] = i
        for j in range(1, n + 1):
            if ref[i - 1] == hyp[j - 1]:
                distance[cur_row_idx][j] = distance[prev_row_idx][j - 1]
            else:
                s_num = distance[prev_row_idx][j - 1] + 1
                i_num = distance[cur_row_idx][j - 1] + 1
                d_num = distance[prev_row_idx][j] + 1
                distance[cur_row_idx][j] = min(s_num, i_num, d_num)

    return distance[m % 2][n]

In [10]:
def char_errors(reference, hypothesis, ignore_case=False, remove_space=False):
    """Compute the levenshtein distance between reference sequence and
    hypothesis sequence in char-level.
    :param reference: The reference sentence.
    :type reference: basestring
    :param hypothesis: The hypothesis sentence.
    :type hypothesis: basestring
    :param ignore_case: Whether case-sensitive or not.
    :type ignore_case: bool
    :param remove_space: Whether remove internal space characters
    :type remove_space: bool
    :return: Levenshtein distance and length of reference sentence.
    :rtype: list
    """
    if ignore_case == True:
        reference = reference.lower()
        hypothesis = hypothesis.lower()

    join_char = ' '
    if remove_space == True:
        join_char = ''

    reference = join_char.join(filter(None, reference.split(' ')))
    hypothesis = join_char.join(filter(None, hypothesis.split(' ')))

    edit_distance = _levenshtein_distance(reference, hypothesis)
    return float(edit_distance), len(reference)

In [11]:
def cer(reference, hypothesis, ignore_case=False, remove_space=False):
    """Calculate charactor error rate (CER). CER compares reference text and
    hypothesis text in char-level. CER is defined as:
    .. math::
        CER = (Sc + Dc + Ic) / Nc
    where
    .. code-block:: text
        Sc is the number of characters substituted,
        Dc is the number of characters deleted,
        Ic is the number of characters inserted
        Nc is the number of characters in the reference
    We can use levenshtein distance to calculate CER. Chinese input should be
    encoded to unicode. Please draw an attention that the leading and tailing
    space characters will be truncated and multiple consecutive space
    characters in a sentence will be replaced by one space character.
    :param reference: The reference sentence.
    :type reference: basestring
    :param hypothesis: The hypothesis sentence.
    :type hypothesis: basestring
    :param ignore_case: Whether case-sensitive or not.
    :type ignore_case: bool
    :param remove_space: Whether remove internal space characters
    :type remove_space: bool
    :return: Character error rate.
    :rtype: float
    :raises ValueError: If the reference length is zero.
    """
    edit_distance, ref_len = char_errors(reference, hypothesis, ignore_case,
                                         remove_space)

    if ref_len == 0:
        raise ValueError("Length of reference should be greater than 0.")

    cer = float(edit_distance) / ref_len
    return cer

In [12]:
class TextTransform:
    """Maps characters to integers and vice versa"""
    def __init__(self):
        char_map_str = """
        ' 0
        <SPACE> 1
        a 2
        b 3
        c 4
        ç 5
        d 6
        e 7
        f 8
        g 9
        ğ 10
        h 11
        ı 12
        i 13
        j 14
        k 15
        l 16
        m 17
        n 18
        o 19
        ö 20
        p 21
        q 22
        r 23
        s 24
        ş 25
        t 26
        u 27
        ü 28
        v 29
        w 30
        x 31
        y 32
        z 33
        """
        self.char_map = {}
        self.index_map = {}
        for line in char_map_str.strip().split('\n'):
            ch, index = line.split()
            self.char_map[ch] = int(index)
            self.index_map[int(index)] = ch
        self.index_map[1] = ' '

    def text_to_int(self, text):
        """ Use a character map and convert text to an integer sequence """
        int_sequence = []
        for c in text:
            if c == ' ':
                ch = self.char_map['<SPACE>']
            elif c not in self.char_map:
                ch = self.char_map['<SPACE>']
            else:
                ch = self.char_map[c]
            int_sequence.append(ch)
        return int_sequence

    def int_to_text(self, labels):
        """ Use a character map and convert integer labels to an text sequence """
        string = []
        for i in labels:
            string.append(self.index_map[i])
        return ''.join(string).replace('<SPACE>', ' ')

In [13]:
train_audio_transforms = nn.Sequential(
    torchaudio.transforms.MelSpectrogram(sample_rate=44100 , n_fft = 882)
)

text_transform = TextTransform()



In [14]:
def data_processing(wavs , ids):
    spectrograms = []
    labels = []
    input_lengths = []
    label_lengths = []
    for wav in wavs:
        waveform , _ = torchaudio.load(wav)
        spec = train_audio_transforms(waveform).squeeze(0).transpose(0, 1)
        spectrograms.append(spec)
        text = ids[wav.strip().split("\\")[1].split(".")[0]]
        label = torch.Tensor(text_transform.text_to_int(text.lower()))
        labels.append(label)
        input_lengths.append(spec.shape[0]//2)
        label_lengths.append(len(label))

    spectrograms = nn.utils.rnn.pad_sequence(spectrograms, batch_first=True).unsqueeze(1).transpose(2, 3)
    labels = nn.utils.rnn.pad_sequence(labels, batch_first=True)
    return spectrograms, labels, input_lengths, label_lengths

In [15]:
def GreedyDecoder(output, labels, label_lengths, blank_label=34, collapse_repeated=True):
    arg_maxes = torch.argmax(output, dim=2)
    decodes = []
    targets = []
    for i, args in enumerate(arg_maxes):
        decode = []
        targets.append(text_transform.int_to_text(labels[i][:label_lengths[i]].tolist()))
        for j, index in enumerate(args):
            if index != blank_label:
                if collapse_repeated and j != 0 and index == args[j -1]:
                    continue
                decode.append(index.item())
        decodes.append(text_transform.int_to_text(decode))
    return decodes, targets

## The Model
Base of of Deep Speech 2 with some personal improvements

In [16]:
class CNNLayerNorm(nn.Module):
    """Layer normalization built for cnns input"""
    def __init__(self, n_feats):
        super(CNNLayerNorm, self).__init__()
        self.layer_norm = nn.LayerNorm(n_feats)

    def forward(self, x):
        # x (batch, channel, feature, time)
        x = x.transpose(2, 3).contiguous() # (batch, channel, time, feature)
        x = self.layer_norm(x)
        x = x.transpose(2, 3).contiguous()
        return x # (batch, channel, feature, time) 

In [17]:
class BidirectionalLSTM(nn.Module):

    def __init__(self, rnn_dim, hidden_size, dropout, batch_first):
        super(BidirectionalLSTM, self).__init__()

        self.BiLSTM = nn.LSTM(
            input_size=rnn_dim, hidden_size=hidden_size,
            num_layers=1, batch_first=batch_first, bidirectional=True)
        self.layer_norm = nn.LayerNorm(rnn_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        x = self.layer_norm(x)
        x = F.gelu(x)
        x, _ = self.BiLSTM(x)
        x = self.dropout(x)
        return x

In [18]:
class SpeechRecognitionModel(nn.Module):
    
    def __init__(self, n_cnn_layers, n_rnn_layers, rnn_dim, n_class, n_feats, stride=2, dropout=0.1):
        super(SpeechRecognitionModel, self).__init__()
        n_feats = n_feats//2
        self.cnn = nn.Conv2d(1, 32, 3, stride=stride, padding=3//2)  # cnn for extracting heirachal features

        self.birnn_layers = nn.Sequential(*[
            BidirectionalLSTM(rnn_dim=2048 if i==0 else rnn_dim*2,
                             hidden_size=rnn_dim, dropout=dropout, batch_first=i==0)
            for i in range(n_rnn_layers)
        ])
        self.classifier = nn.Sequential(
            nn.Linear(rnn_dim*2, rnn_dim),  # birnn returns rnn_dim*2
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(rnn_dim, n_class)
        )

    def forward(self, x):
        x = self.cnn(x)
        sizes = x.size()
        x = x.view(sizes[0], sizes[1] * sizes[2], sizes[3])  # (batch, feature, time)
        x = x.transpose(1, 2) # (batch, time, feature)
        x = self.birnn_layers(x)
        x = self.classifier(x)
        return x

## The Training and Evaluating Script

In [19]:
class IterMeter(object):
    """keeps track of total iterations"""
    def __init__(self):
        self.val = 0

    def step(self):
        self.val += 1

    def get(self):
        return self.val

In [20]:
def trainer(learning_rate=5e-4, batch_size=20, epochs=10 , train = train , test = test):

    hparams = {
        "n_cnn_layers": 0,
        "n_rnn_layers": 8,
        "rnn_dim": 256,
        "n_class": 35,
        "n_feats": 128,
        "stride":2,
        "dropout": 0.1,
        "learning_rate": learning_rate,
        "batch_size": batch_size,
        "epochs": epochs
    }

    use_cuda = torch.cuda.is_available()
    torch.manual_seed(7)
    device = torch.device("cuda" if use_cuda else "cpu") 

    id_to_txttrain = {}  
    wavtrain = glob("{}/**/*.wav".format(train), recursive=True)    
    txttrain = glob("{}/**/*.txt".format(train), recursive=True)

    
    for i in range(len(txttrain)):
        txttrain[i] = txttrain[i].strip().split("\\")[1].split(".")[0]
        s = txttrain[i] + ".txt"
        with open(os.path.join(train , s), encoding="utf-8") as f:
            for line in f:              
                id_to_txttrain[txttrain[i]] = line


    spectrograms, labels, input_lengths, label_lengths = data_processing(wavtrain , id_to_txttrain)
    train = Speech_Dataset(spectrograms, labels, input_lengths, label_lengths)
    train_loader = DataLoader(dataset=train,batch_size=hparams['batch_size'],shuffle=False)
    
    
    model = SpeechRecognitionModel(
        hparams['n_cnn_layers'], hparams['n_rnn_layers'], hparams['rnn_dim'],
        hparams['n_class'], hparams['n_feats'], hparams['stride'], hparams['dropout']
        ).to(device)
    
    
    print(model)
    print('Num Model Parameters', sum([param.nelement() for param in model.parameters()]))

    optimizer = optim.AdamW(model.parameters(), hparams['learning_rate'])
    criterion = nn.CTCLoss(blank=34).to(device)
    scheduler = optim.lr_scheduler.OneCycleLR(optimizer, max_lr=hparams['learning_rate'], 
                                            steps_per_epoch=int(len(train_loader)),
                                            epochs=hparams['epochs'],
                                            anneal_strategy='linear')
    
    iter_meter = IterMeter()
    
    
    for epoch in range(1, epochs + 1):
        print("train")    
        model.train()        
        data_len = len(train_loader.dataset)
        for batch_idx, _data in enumerate(train_loader):
            spectrograms, labels, input_lengths, label_lengths = _data 
            spectrograms, labels = spectrograms.to(device), labels.to(device)
            optimizer.zero_grad()

            output = model(spectrograms)  # (batch, time, n_class)
            output = F.log_softmax(output, dim=2)
            output = output.transpose(0, 1) # (time, batch, n_class)
                
            input_lengths = input_lengths.tolist()
            label_lengths = label_lengths.tolist()

            (label_lengths)
                
            loss = criterion(output, labels, input_lengths, label_lengths)
            loss.backward()

            optimizer.step()
            scheduler.step()
            iter_meter.step()
        print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                        epoch, (batch_idx + 1) * batch_size, data_len,
                        100. * (batch_idx + 1) / len(train_loader), loss.item()))
        
        all_train_loss.append(float(loss.item()))
        

        torch.save(model.state_dict(), "mdl.pt")    

In [21]:
!nvidia-smi

Tue Jan  2 15:10:29 2024       
+---------------------------------------------------------------------------------------+
| NVIDIA-SMI 546.33                 Driver Version: 546.33       CUDA Version: 12.3     |
|-----------------------------------------+----------------------+----------------------+
| GPU  Name                     TCC/WDDM  | Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |         Memory-Usage | GPU-Util  Compute M. |
|                                         |                      |               MIG M. |
|   0  NVIDIA GeForce RTX 3050 ...  WDDM  | 00000000:01:00.0 Off |                  N/A |
| N/A   41C    P0              10W /  60W |      0MiB /  4096MiB |      0%      Default |
|                                         |                      |                  N/A |
+-----------------------------------------+----------------------+----------------------+
                                                                    

start = time.time()

In [22]:
learning_rate = 5e-4
batch_size = 4
epochs = 300



trainer(learning_rate, batch_size, epochs, train, test)

end = time.time()
print('{:.0f} Dakika, {:.0f} Saniye'.format( int((end - start)/60) , int((end - start)%60)   ) )

In [23]:
def tester(learning_rate=5e-4, batch_size=20, epochs=10 , train = train , test = test):

    hparams = {
        "n_cnn_layers": 0,
        "n_rnn_layers": 8,
        "rnn_dim": 256,
        "n_class": 35,
        "n_feats": 128,
        "stride":2,
        "dropout": 0.1,
        "learning_rate": learning_rate,
        "batch_size": batch_size,
        "epochs": epochs
    }
    use_cuda = torch.cuda.is_available()
    torch.manual_seed(7)
    device = torch.device("cuda" if use_cuda else "cpu") 
    
    model =  SpeechRecognitionModel(
        hparams['n_cnn_layers'], hparams['n_rnn_layers'], hparams['rnn_dim'],
        hparams['n_class'], hparams['n_feats'], hparams['stride'], hparams['dropout']
        ).to(device)
    
    
    model.load_state_dict( torch.load("mdl.pt") ) 
    
#    print('\nevaluating...')
    
    model.eval()

    id_to_txttest = {}
    wavtest = glob("{}/**/*.wav".format(test), recursive=True)
    txttest = glob("{}/**/*.txt".format(test), recursive=True)
    
    iter_meter = IterMeter()
    
    for i in range(len(txttest)):
        txttest[i] = txttest[i].strip().split("\\")[1].split(".")[0]
        s = txttest[i] + ".txt"
        with open(os.path.join(test , s), encoding="utf-8") as f:
            for line in f:
                id_to_txttest[txttest[i]] = line
    spectrograms, labels, input_lengths, label_lengths = data_processing(wavtest , id_to_txttest)
    test = Speech_Dataset(spectrograms, labels, input_lengths, label_lengths)
    test_loader = DataLoader(dataset=test,batch_size=batch_size,shuffle=False)
    
 
#    test_loss = 0
#    test_cer, test_wer = [], []
    with torch.no_grad():
        for i, _data in enumerate(test_loader):
            spectrograms, labels, input_lengths, label_lengths = _data 
            spectrograms, labels = spectrograms.to(device), labels.to(device)
            output = model(spectrograms)  # (batch, time, n_class)
            output = F.log_softmax(output, dim=2)
            output = output.transpose(0, 1) # (time, batch, n_class)
#            loss = criterion(output, labels, input_lengths, label_lengths)
#            test_loss += loss.item() / len(test_loader)
#            all_test_loss.append(float(loss.item()))

            decoded_preds, decoded_targets = GreedyDecoder(output.transpose(0, 1), labels, label_lengths)
            
            print(decoded_preds)
            
            
    return (''.join(str(i) for i in decoded_preds))
            
            # for j in range(len(decoded_preds)):
            #    print("Hedef")
            #    print(decoded_targets[j])
            #    print("Tahmin")
            #    print(decoded_preds[j])
                # test_cer.append(cer(decoded_targets[j], decoded_preds[j]))
                # test_wer.append(wer(decoded_targets[j], decoded_preds[j]))


#        avg_cer = sum(test_cer)/len(test_cer)
#        # avg_wer = sum(test_wer)/len(test_wer)
#        all_cer.append(avg_cer)
#        print('Test set: Average loss: {:.4f}, Average CER: {:4f}\n'.format(test_loss, avg_cer))

In [24]:
message = tester(learning_rate, batch_size, epochs, train, test)

[' er hamo']


In [25]:
print(message)

 er hamo
