In [1]:
# Use the %pip magic so the packages are installed into the environment of the
# running kernel; installing both in one call lets pip resolve compatible versions.
%pip install torch torchaudio

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/



### Reference 

- Digital Signal Processing Lecture
https://github.com/spatialaudio/digital-signal-processing-lecture 

- Python for Signal Processing (unpingco)
https://github.com/unpingco/Python-for-Signal-Processing 

- Audio for Deep Learning (남기현님)
https://tykimos.github.io/2019/07/04/ISS_2nd_Deep_Learning_Conference_All_Together/ 

- 오디오 전처리 작업을 위한 연습 (박수철님)
https://github.com/scpark20/audio-preprocessing-practice 

- Musical Applications of Machine Learning
https://mac.kaist.ac.kr/~juhan/gct634/ 

- Awesome audio study materials for Korean (최근우님)
https://github.com/keunwoochoi/awesome-audio-study-materials-for-korean

In [2]:
import os
import torch
import torch.nn as nn
import torch.utils.data as data
import torch.optim as optim
import torch.nn.functional as F
import torchaudio
import numpy as np

In [3]:
# Download (if not already present under ./) and open the LibriSpeech splits.
# NOTE: the archives are large; see the progress bars in the cell output.
train_dataset = torchaudio.datasets.LIBRISPEECH(root="./", url="train-clean-100", download=True)
test_dataset = torchaudio.datasets.LIBRISPEECH(root="./", url="test-clean", download=True)

  0%|          | 0.00/5.95G [00:00<?, ?B/s]

  0%|          | 0.00/331M [00:00<?, ?B/s]

In [4]:
test_dataset[1]
# Each item is a tuple: the waveform tensor and the sample rate (16000 in the output) come first, followed by the transcript and integer identifiers.

(tensor([[ 0.0010,  0.0011,  0.0009,  ..., -0.0007, -0.0010, -0.0011]]),
 16000,
 'STUFF IT INTO YOU HIS BELLY COUNSELLED HIM',
 1089,
 134686,
 1)

In [5]:
def avg_wer(wer_scores, combined_ref_len):
    """Return the summed error scores divided by the combined reference length.

    :param wer_scores: iterable of per-utterance error counts.
    :param combined_ref_len: total reference length across all utterances.
    :return: float ratio ``sum(wer_scores) / combined_ref_len``.
    """
    total_errors = float(sum(wer_scores))
    total_length = float(combined_ref_len)
    return total_errors / total_length


def _levenshtein_distance(ref, hyp):
    """"Levenshtein distance"는 두 시퀀스 간의 차이를 측정하기위한 문자열 메트릭입니다. 
    "Levenshtein distanc"는 한 단어를 다른 단어로 변경하는 데 필요한 최소 한 문자 편집 (대체, 삽입 또는 삭제) 수로 정의됩니다. 
    """
    m = len(ref)
    n = len(hyp)

    # special case
    if ref == hyp:
        return 0
    if m == 0:
        return n
    if n == 0:
        return m

    if m < n:
        ref, hyp = hyp, ref
        m, n = n, m

    # use O(min(m, n)) space
    distance = np.zeros((2, n + 1), dtype=np.int32)

    # initialize distance matrix
    for j in range(0,n + 1):
        distance[0][j] = j

    # calculate levenshtein distance
    for i in range(1, m + 1):
        prev_row_idx = (i - 1) % 2
        cur_row_idx = i % 2
        distance[cur_row_idx][0] = i
        for j in range(1, n + 1):
            if ref[i - 1] == hyp[j - 1]:
                distance[cur_row_idx][j] = distance[prev_row_idx][j - 1]
            else:
                s_num = distance[prev_row_idx][j - 1] + 1
                i_num = distance[cur_row_idx][j - 1] + 1
                d_num = distance[prev_row_idx][j] + 1
                distance[cur_row_idx][j] = min(s_num, i_num, d_num)

    return distance[m % 2][n]


In [6]:

def word_errors(reference, hypothesis, ignore_case=False, delimiter=' '):
    """Compute the word-level edit distance between reference and hypothesis.

    Empty tokens produced by repeated, leading or trailing delimiters are
    discarded, so stray whitespace is not counted as a word and an empty
    reference yields a reference length of 0.

    :param reference: the reference sentence.
    :param hypothesis: the hypothesis sentence.
    :param ignore_case: if true, compare case-insensitively.
    :param delimiter: delimiter separating the words of the input sentences.
    :return: tuple ``(edit_distance as float, number of reference words)``.
    """
    if ignore_case:
        reference = reference.lower()
        hypothesis = hypothesis.lower()

    # Drop empty tokens, mirroring the filter(None, ...) used in char_errors.
    ref_words = [word for word in reference.split(delimiter) if word]
    hyp_words = [word for word in hypothesis.split(delimiter) if word]

    edit_distance = _levenshtein_distance(ref_words, hyp_words)
    return float(edit_distance), len(ref_words)

In [7]:
def char_errors(reference, hypothesis, ignore_case=False, remove_space=False):
    """Compute the character-level edit distance between reference and hypothesis.

    Runs of spaces are collapsed first: both sentences are split on ' ',
    empty pieces are dropped, and the pieces are re-joined with a single
    space (or with nothing when ``remove_space`` is set).

    :param reference: the reference sentence.
    :param hypothesis: the hypothesis sentence.
    :param ignore_case: if true, compare case-insensitively.
    :param remove_space: if true, spaces are removed before comparing.
    :return: tuple ``(edit_distance as float, number of reference characters)``.
    """
    if ignore_case:
        reference = reference.lower()
        hypothesis = hypothesis.lower()

    join_char = '' if remove_space else ' '

    reference = join_char.join(filter(None, reference.split(' ')))
    hypothesis = join_char.join(filter(None, hypothesis.split(' ')))

    edit_distance = _levenshtein_distance(reference, hypothesis)
    return float(edit_distance), len(reference)

In [8]:
def wer(reference, hypothesis, ignore_case=False, delimiter=' '):
    """Compute the word error rate (WER) of a hypothesis against a reference.

    WER = (Sw + Dw + Iw) / Nw, where
        Sw is the number of substituted words,
        Dw is the number of deleted words,
        Iw is the number of inserted words,
        Nw is the number of words in the reference.

    :raises ValueError: if the reference contains no words.
    """
    num_errors, num_ref_words = word_errors(
        reference, hypothesis, ignore_case, delimiter)

    if num_ref_words == 0:
        raise ValueError("Reference's word number should be greater than 0.")

    return float(num_errors) / num_ref_words

In [9]:
def cer(reference, hypothesis, ignore_case=False, remove_space=False):
    """Compute the character error rate (CER) of a hypothesis against a reference.

    CER = (Sc + Dc + Ic) / Nc, where
        Sc is the number of substituted characters,
        Dc is the number of deleted characters,
        Ic is the number of inserted characters,
        Nc is the number of characters in the reference.

    :raises ValueError: if the reference contains no characters.
    """
    num_errors, num_ref_chars = char_errors(
        reference, hypothesis, ignore_case, remove_space)

    if num_ref_chars == 0:
        raise ValueError("Length of reference should be greater than 0.")

    return float(num_errors) / num_ref_chars

In [10]:
class TextTransform:
    """Bidirectional mapping between characters and integer labels."""

    def __init__(self):
        # One "<symbol> <index>" pair per line; the space character is
        # written as the placeholder token <SPACE>.
        char_map_str = """
        ' 0
        <SPACE> 1
        a 2
        b 3
        c 4
        d 5
        e 6
        f 7
        g 8
        h 9
        i 10
        j 11
        k 12
        l 13
        m 14
        n 15
        o 16
        p 17
        q 18
        r 19
        s 20
        t 21
        u 22
        v 23
        w 24
        x 25
        y 26
        z 27
        """
        pairs = [row.split() for row in char_map_str.strip().split('\n')]
        self.char_map = {symbol: int(idx) for symbol, idx in pairs}
        self.index_map = {int(idx): symbol for symbol, idx in pairs}
        # Decode index 1 straight to a real space instead of the placeholder.
        self.index_map[1] = ' '

    def text_to_int(self, text):
        """Encode ``text`` as a list of integer labels using the character map."""
        return [
            self.char_map['<SPACE>'] if c == ' ' else self.char_map[c]
            for c in text
        ]

    def int_to_text(self, labels):
        """Decode a sequence of integer labels back into a text string."""
        decoded = ''.join(self.index_map[i] for i in labels)
        return decoded.replace('<SPACE>', ' ')

In [11]:
# NOTE(review): these three assignments are repeated verbatim at the top of the
# following cell; one of the two copies could be removed.

# Training pipeline: mel spectrogram followed by frequency and time masking.
train_audio_transforms = nn.Sequential(
    torchaudio.transforms.MelSpectrogram(sample_rate=16000, n_mels=128),
    torchaudio.transforms.FrequencyMasking(freq_mask_param=30),
    torchaudio.transforms.TimeMasking(time_mask_param=100)
)

# Validation pipeline: mel spectrogram only, built with the library defaults.
valid_audio_transforms = torchaudio.transforms.MelSpectrogram()

# Character <-> integer label codec used for the transcripts.
text_transform = TextTransform()



In [12]:
# Training features: mel spectrogram, then frequency masking and time masking
# applied as augmentation.
train_audio_transforms = nn.Sequential(
    torchaudio.transforms.MelSpectrogram(n_mels=128, sample_rate=16000),
    torchaudio.transforms.FrequencyMasking(freq_mask_param=30),
    torchaudio.transforms.TimeMasking(time_mask_param=100),
)

# Validation features: the same mel spectrogram without augmentation. The
# arguments are spelled out; they equal the torchaudio defaults.
valid_audio_transforms = torchaudio.transforms.MelSpectrogram(n_mels=128, sample_rate=16000)

# Character <-> integer label codec used for the transcripts.
text_transform = TextTransform()

def data_processing(data, data_type="train"):
    """Collate a batch of dataset items into padded tensors.

    Each item is a 6-tuple whose first element is the waveform and whose
    third element is the transcript. The waveform goes through the module-level
    train or valid audio transform, and the lower-cased transcript is
    encoded with ``text_transform``.

    :param data: iterable of 6-tuples as yielded by the dataset.
    :param data_type: 'train' or 'valid'; selects the audio transform.
    :return: ``(spectrograms, labels, input_lengths, label_lengths)`` where
        spectrograms and labels are zero-padded along the time axis and the
        two length lists hold per-item values (input length is the number of
        spectrogram frames halved).
    :raises Exception: if ``data_type`` is neither 'train' nor 'valid'.
    """
    spectrograms, labels = [], []
    input_lengths, label_lengths = [], []

    for (waveform, _, utterance, _, _, _) in data:
        if data_type != 'train' and data_type != 'valid':
            raise Exception('data_type should be train or valid')
        transform = train_audio_transforms if data_type == 'train' else valid_audio_transforms

        # assumes a single-channel waveform, giving (time, n_mels) here — TODO confirm
        spec = transform(waveform).squeeze(0).transpose(0, 1)
        label = torch.Tensor(text_transform.text_to_int(utterance.lower()))

        spectrograms.append(spec)
        labels.append(label)
        input_lengths.append(spec.shape[0] // 2)
        label_lengths.append(len(label))

    # pad over time, add a channel axis, then swap the last two axes
    padded_specs = nn.utils.rnn.pad_sequence(spectrograms, batch_first=True)
    spectrograms = padded_specs.unsqueeze(1).transpose(2, 3)
    labels = nn.utils.rnn.pad_sequence(labels, batch_first=True)

    return spectrograms, labels, input_lengths, label_lengths


In [13]:
class EncoderRNN(nn.Module):
    """Speech encoder: a two-layer 2-D convolutional front-end followed by a GRU/LSTM.

    :param input_size: number of frequency bins (D) of the input spectrogram.
    :param hidden_size: hidden size of the recurrent layer(s).
    :param n_layers: number of stacked recurrent layers.
    :param input_dropout_p: stored for API compatibility; not applied here.
    :param dropout_p: dropout between stacked recurrent layers.
    :param bidirectional: whether the recurrent layers are bidirectional.
    :param rnn_cell: 'gru' or 'lstm' (case-insensitive).
    :param variable_lengths: stored as an attribute; sequences are always packed.
    :raises ValueError: if ``rnn_cell`` is not supported.
    """

    def __init__(self, input_size, hidden_size, n_layers=1,
                 input_dropout_p=0, dropout_p=0,
                 bidirectional=False, rnn_cell='gru', variable_lengths=False):
        # nn.Module must be initialised before any sub-module can be assigned.
        super(EncoderRNN, self).__init__()

        self.hidden_size = hidden_size
        self.bidirectional = bidirectional
        self.n_layers = n_layers
        self.input_dropout_p = input_dropout_p
        self.dropout_p = dropout_p
        self.variable_lengths = variable_lengths

        if rnn_cell.lower() == 'lstm':
            self.rnn_cell = nn.LSTM
        elif rnn_cell.lower() == 'gru':
            self.rnn_cell = nn.GRU
        else:
            raise ValueError("Unsupported RNN Cell: {0}".format(rnn_cell))

        # Convolutional front-end, copied from
        # https://github.com/SeanNaren/deepspeech.pytorch/blob/master/model.py
        # Copyright (c) 2017 Sean Naren, MIT License
        outputs_channel = 32
        self.conv = nn.Sequential(
            nn.Conv2d(1, outputs_channel, kernel_size=(41, 11), stride=(2, 2), padding=(20, 5)),
            nn.BatchNorm2d(outputs_channel),
            nn.Hardtanh(0, 20, inplace=True),
            nn.Conv2d(outputs_channel, outputs_channel, kernel_size=(21, 11), stride=(2, 1), padding=(10, 5)),
            nn.BatchNorm2d(outputs_channel),
            nn.Hardtanh(0, 20, inplace=True)
        )

        # Frequency size after the conv stack, derived from the layers
        # themselves with integer arithmetic (no dependency on `math`).
        feat_dim = int(input_size)
        for layer in self.conv:
            if isinstance(layer, nn.Conv2d):
                feat_dim = (feat_dim + 2 * layer.padding[0]
                            - layer.dilation[0] * (layer.kernel_size[0] - 1) - 1) // layer.stride[0] + 1
        rnn_input_dims = feat_dim * outputs_channel

        self.rnn = self.rnn_cell(rnn_input_dims, self.hidden_size, self.n_layers,
                                 dropout=self.dropout_p, bidirectional=self.bidirectional)

    def forward(self, input_var, input_lengths=None):
        """Encode a batch of spectrograms.

        :param input_var: encoder inputs (spectrograms), shape (B, 1, D, T).
        :param input_lengths: per-item number of valid frames (list or 1-D
            tensor); ``None`` means every item spans the full T frames.
        :return: ``(outputs, h_state)``; outputs has shape
            (B, T', hidden_size * num_directions), where T' is the longest
            valid length after the conv stack; h_state is the final RNN state.
        """
        if input_lengths is None:
            input_lengths = [input_var.size(3)] * input_var.size(0)

        # nn.Sequential takes a single input and returns a single tensor.
        x = self.conv(input_var)  # (B, C, D', T')
        batch_size, channels, feat_dim, frames = x.size()

        # Lengths after the conv stack, kept within [1, T'] so packing is valid.
        output_lengths = self.get_seq_lens(input_lengths).clamp(min=1, max=frames)

        x = x.reshape(batch_size, channels * feat_dim, frames)  # (B, C * D', T')
        x = x.permute(2, 0, 1).contiguous()                     # (T', B, C * D')

        # pack_padded_sequence needs int64 lengths on CPU; enforce_sorted=False
        # lifts the requirement that the batch be sorted by decreasing length.
        packed = nn.utils.rnn.pack_padded_sequence(
            x, output_lengths.to(device='cpu', dtype=torch.long), enforce_sorted=False)
        packed, h_state = self.rnn(packed)
        x, _ = nn.utils.rnn.pad_packed_sequence(packed)
        x = x.transpose(0, 1)  # (B, T', H)
        return x, h_state

    def get_seq_lens(self, input_length):
        """Map input frame counts to frame counts after the conv stack.

        :param input_length: list or 1-D tensor of per-item input lengths.
        :return: int32 tensor with the corresponding output lengths.
        """
        seq_len = torch.as_tensor(input_length, dtype=torch.long)
        for m in self.conv.modules():
            if isinstance(m, nn.Conv2d):
                numerator = seq_len + 2 * m.padding[1] - m.dilation[1] * (m.kernel_size[1] - 1) - 1
                # Floor division matches the output-size formula of Conv2d.
                seq_len = torch.div(numerator, m.stride[1], rounding_mode='floor') + 1

        return seq_len.int()

In [14]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class Attention(nn.Module):
    """Additive attention that also conditions on the previous attention weights.

    The score of each encoder step combines a projection of the decoder
    state, a projection of the encoder output, a 1-D convolution over the
    previous attention weights, and a learned bias.
    """

    def __init__(self, dec_dim, enc_dim, conv_dim, attn_dim):
        super().__init__()
        self.dec_dim = dec_dim
        self.enc_dim = enc_dim
        self.conv_dim = conv_dim
        self.attn_dim = attn_dim

        # features computed from the previous attention weights
        self.conv = nn.Conv1d(in_channels=1, out_channels=self.attn_dim, kernel_size=3, padding=1)

        # projections of decoder queries and encoder values into attention space
        self.W = nn.Linear(self.dec_dim, self.attn_dim, bias=False)
        self.V = nn.Linear(self.enc_dim, self.attn_dim, bias=False)

        # scalar score per encoder step, plus a learned additive bias
        self.fc = nn.Linear(attn_dim, 1, bias=True)
        self.b = nn.Parameter(torch.rand(attn_dim))

        self.tanh = nn.Tanh()
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, queries, values, last_attn):
        """Compute the context vector and the attention weights.

        :param queries: decoder hidden states, shape (B, 1, dec_D).
        :param values: encoder outputs, shape (B, enc_T, enc_D).
        :param last_attn: attention weights of the previous step, shape (B, enc_T).
        :return: ``(context, attn_weight)`` with shapes (B, 1, enc_D) and (B, enc_T).
        """
        # (B, enc_T) -> (B, 1, enc_T) -> conv -> (B, attn_D, enc_T) -> (B, enc_T, attn_D)
        location_feat = self.conv(last_attn.unsqueeze(dim=1)).transpose(1, 2)

        # the (B, 1, attn_D) query term broadcasts over the enc_T axis
        energy = self.W(queries) + self.V(values) + location_feat + self.b
        score = self.fc(self.tanh(energy)).squeeze(dim=-1)  # (B, enc_T)

        attn_weight = self.softmax(score)
        # (B, 1, enc_T) x (B, enc_T, enc_D) -> (B, 1, enc_D)
        context = attn_weight.unsqueeze(dim=1).bmm(values)
        return context, attn_weight

In [15]:
class DecoderRNN(nn.Module):
    """Attention-based autoregressive decoder (greedy decoding).

    :param vocab_size: number of output symbols.
    :param max_len: number of decoding steps performed by ``forward``.
    :param hidden_size: embedding size and hidden size of the decoder RNN.
    :param encoder_size: hidden size of the encoder (per direction).
    :param sos_id: index of the start-of-sequence symbol.
    :param eos_id: index of the end-of-sequence symbol.
    :param n_layers: number of stacked recurrent layers.
    :param rnn_cell: 'gru' or 'lstm' (case-insensitive).
    :param bidirectional_encoder: whether the encoder output is bidirectional.
    :param bidirectional_decoder: whether the decoder RNN is bidirectional.
    :param dropout_p: dropout probability (RNN layers and input dropout).
    :param use_attention: stored as an attribute; attention is always applied.
    :raises ValueError: if ``rnn_cell`` is not supported.
    """

    def __init__(self, vocab_size, max_len, hidden_size, encoder_size,
                 sos_id, eos_id,
                 n_layers=1, rnn_cell='gru',
                 bidirectional_encoder=False, bidirectional_decoder=False,
                 dropout_p=0, use_attention=True):
        # nn.Module must be initialised before any sub-module can be assigned.
        super(DecoderRNN, self).__init__()

        self.output_size = vocab_size
        self.vocab_size = vocab_size
        self.hidden_size = hidden_size
        self.bidirectional_encoder = bidirectional_encoder
        self.bidirectional_decoder = bidirectional_decoder
        self.encoder_output_size = encoder_size * 2 if self.bidirectional_encoder else encoder_size
        # A bidirectional decoder RNN emits 2 * hidden_size features per step.
        self.decoder_output_size = hidden_size * 2 if self.bidirectional_decoder else hidden_size
        self.n_layers = n_layers
        self.dropout_p = dropout_p
        self.max_length = max_len
        self.use_attention = use_attention
        self.eos_id = eos_id
        self.sos_id = sos_id

        if rnn_cell.lower() == 'lstm':
            self.rnn_cell = nn.LSTM
        elif rnn_cell.lower() == 'gru':
            self.rnn_cell = nn.GRU
        else:
            raise ValueError("Unsupported RNN Cell: {0}".format(rnn_cell))

        self.init_input = None
        # forward_step applies this to embeddings, context and RNN output;
        # it was previously referenced without ever being created.
        self.input_dropout = nn.Dropout(p=dropout_p)
        self.rnn = self.rnn_cell(self.hidden_size + self.encoder_output_size, self.hidden_size, self.n_layers,
                                 batch_first=True, dropout=dropout_p, bidirectional=self.bidirectional_decoder)

        self.embedding = nn.Embedding(self.vocab_size, self.hidden_size)
        self.attention = Attention(dec_dim=self.decoder_output_size, enc_dim=self.encoder_output_size,
                                   conv_dim=1, attn_dim=self.hidden_size)
        self.fc = nn.Linear(self.decoder_output_size + self.encoder_output_size, self.output_size)

    def forward_step(self, input_var, hidden, encoder_outputs, context, attn_w, function):
        """Run the decoder over the given input symbols, one step at a time.

        :param input_var: input symbol indices, shape (B, dec_T).
        :param hidden: previous RNN state (``None`` for the first step).
        :param encoder_outputs: encoder outputs, shape (B, enc_T, enc_D).
        :param context: previous context vector, shape (B, enc_D).
        :param attn_w: previous attention weights, shape (B, enc_T).
        :param function: normalisation applied to the logits (called with ``dim=-1``).
        :return: ``(predictions, hidden, context, attn_w)`` where predictions
            is a tensor of shape (B, dec_T, vocab) and context / attn_w are
            those of the last processed step.
        """
        if self.training:
            self.rnn.flatten_parameters()

        embedded = self.embedding(input_var)  # (B, dec_T) -> (B, dec_T, dec_D)
        embedded = self.input_dropout(embedded)

        step_predictions = []
        for i in range(embedded.size(1)):
            embedded_inputs = embedded[:, i, :]  # (B, dec_D)

            rnn_input = torch.cat([embedded_inputs, context], dim=1)  # (B, dec_D + enc_D)
            rnn_input = rnn_input.unsqueeze(1)
            output, hidden = self.rnn(rnn_input, hidden)  # (B, 1, dec_out_D)

            # (B, 1, enc_D), (B, enc_T)
            context, attn_w = self.attention(output, encoder_outputs, attn_w)

            context = self.input_dropout(context.squeeze(1))
            output = self.input_dropout(output.squeeze(1))  # (B, dec_out_D)
            output = torch.cat((output, context), dim=1)    # (B, dec_out_D + enc_D)

            step_predictions.append(function(self.fc(output), dim=-1))

        # Return tensors rather than Python lists: the caller squeezes the
        # predictions and feeds attn_w back in as the previous attention.
        return torch.stack(step_predictions, dim=1), hidden, context, attn_w

    def forward(self, inputs=None, encoder_hidden=None, encoder_outputs=None,
                    function=F.log_softmax):
        """Greedily decode ``max_len`` steps from the encoder outputs.

        :param inputs: accepted for interface compatibility; decoding always
            starts from ``sos_id`` and feeds back its own predictions.
        :param encoder_hidden: accepted for interface compatibility; the
            decoder state starts from ``None``.
        :param encoder_outputs: encoder outputs, shape (B, enc_T, enc_D).
        :param function: normalisation applied to the output logits.
        :return: list of ``max_len`` tensors, each of shape (B, vocab).
        """
        batch_size = encoder_outputs.size(0)
        max_length = self.max_length

        decoder_hidden = None
        context = encoder_outputs.new_zeros(batch_size, encoder_outputs.size(2))  # (B, D)
        attn_w = encoder_outputs.new_zeros(batch_size, encoder_outputs.size(1))   # (B, T)

        decoder_outputs = []
        sequence_symbols = []
        lengths = np.array([max_length] * batch_size)

        def decode(step, step_output):
            """Record one step's output and return the arg-max symbols, shape (B, 1)."""
            decoder_outputs.append(step_output)
            symbols = decoder_outputs[-1].topk(1)[1]
            sequence_symbols.append(symbols)
            eos_batches = symbols.data.eq(self.eos_id)
            if eos_batches.dim() > 0:
                eos_batches = eos_batches.cpu().view(-1).numpy()
                # first time an item emits EOS: remember its sequence length
                update_idx = ((lengths > step) & eos_batches) != 0
                lengths[update_idx] = len(sequence_symbols)
            return symbols

        # Start-of-sequence tokens, created on the same device as the encoder
        # outputs so the embedding lookup does not hit a device mismatch.
        decoder_input = torch.full((batch_size, 1), self.sos_id,
                                   dtype=torch.long, device=encoder_outputs.device)
        for di in range(max_length):
            decoder_output, decoder_hidden, context, attn_w = self.forward_step(decoder_input,
                                                                                decoder_hidden,
                                                                                encoder_outputs,
                                                                                context,
                                                                                attn_w,
                                                                                function=function)
            step_output = decoder_output.squeeze(1)  # (B, 1, vocab) -> (B, vocab)
            symbols = decode(di, step_output)
            decoder_input = symbols

        return decoder_outputs