In [4]:
# download lj speech dataset
import os
import subprocess

def download_ljspeech_dataset(output_dir='./data/raw'):
    # Create the output directory if it doesn't exist
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    # Define the URL for the LJSpeech dataset
    url = "https://data.keithito.com/data/speech/LJSpeech-1.1.tar.bz2"

    # Download the dataset
    subprocess.run(["/opt/homebrew/anaconda3/bin/wget", url, "-O", os.path.join(output_dir, "LJSpeech-1.1.tar.gz")])

    # Extract the dataset
    subprocess.run(["tar", "-xzf", os.path.join(output_dir, "LJSpeech-1.1.tar.gz"), "-C", output_dir])

    print("LJSpeech dataset downloaded and extracted successfully.")


download_ljspeech_dataset()




--2024-10-08 15:06:45--  https://data.keithito.com/data/speech/LJSpeech-1.1.tar.bz2
Resolving data.keithito.com (data.keithito.com)... 24.199.73.137
Connecting to data.keithito.com (data.keithito.com)|24.199.73.137|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2748572632 (2.6G) [text/plain]
Saving to: './data/raw/LJSpeech-1.1.tar.gz'

     0K .......... .......... .......... .......... ..........  0%  625K 71m36s
    50K .......... .......... .......... .......... ..........  0%  557K 75m57s
   100K .......... .......... .......... .......... ..........  0% 51.7M 50m55s
   150K .......... .......... .......... .......... ..........  0% 42.3M 38m27s
   200K .......... .......... .......... .......... ..........  0%  618K 45m14s
   250K .......... .......... .......... .......... ..........  0% 18.7M 38m5s
   300K .......... .......... .......... .......... ..........  0% 16.2M 33m2s
   350K .......... .......... .......... .......... ..........  0% 26.7M 29m6

LJSpeech dataset downloaded and extracted successfully.


In [28]:
# create a pandas dataframe with the following columns:
# - speaker_id
# - text
# - audio_path
# - duration
# - speaker_id is a unique identifier for each speaker
# - text is the text of the audio
# - audio_path is the path to the audio file
# - duration is the duration of the audio file

import pandas as pd
import csv

metadata = {}

# open the metadata.csv file
with open("./data/raw/LJSpeech-1.1/metadata.csv", "r") as file:
    reader = csv.reader(file)
    #print the number of lines in the file
    #print(len(file.readlines()))
    for row in reader:
        # concat all elements in the row
        unprocessed_row = "".join(row)
        split_row = unprocessed_row.split("|")

        # add a new row to the metadata dictionary
        metadata[split_row[0]] = {
            'audio_path': split_row[0],
            'text': split_row[1],
        }

print(metadata['LJ001-0001'])

# create a pandas dataframe with the following columns:
# - audio_path
# - text

df = pd.DataFrame(metadata.values(), columns=['audio_path', 'text'])
df.head()







{'audio_path': 'LJ001-0001', 'text': 'Printing in the only sense with which we are at present concerned differs from most if not from all the arts and crafts represented in the Exhibition'}


Unnamed: 0,audio_path,text
0,LJ001-0001,Printing in the only sense with which we are a...
1,LJ001-0002,in being comparatively modern.
2,LJ001-0003,For although the Chinese took impressions from...
3,LJ001-0004,produced the block books which were the immedi...
4,LJ001-0005,the invention of movable metal letters in the ...


In [18]:
# print number of entries in our df
print('number of entries in our df: ', len(df))

# for each entry in our df, calculate the max length of the text

number of entries in our df:  10043


In [19]:
# for each entry in our df, calculate the max length of the text
df['text_length'] = df['text'].apply(len)
int(df['text_length'].max())


187

In [20]:
# print number of unique words
unique_words = set()
for text in df['text'].tolist():
    unique_words.update(text.split())
print('number of unique words: ', len(unique_words))

number of unique words:  17724


In [21]:
"""
The follow code will:

- prepare the data

- vectorize the text

- process the audio

- create our dataset
"""

import torch
import torchaudio
import numpy as np
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence

# prepare the data

max_target_len = int(df['text_length'].max())

# vectorize the text

class TextVectorizer:
    def __init__(self, data, max_len):
        self.vocab = set(''.join(data['text'].tolist()))
        self.char2idx = {char: i for i, char in enumerate(self.vocab)}
        self.idx2char = {idx: char for char, idx in self.char2idx.items()}
        self.max_len = max_len

    def __call__(self, text):
        vector = [self.char2idx.get(char, len(self.vocab)) for char in text]
        return torch.tensor(vector + [0] * (self.max_len - len(vector)), dtype=torch.long)
    
    def get_vocab(self):
        return self.vocab

text_vectorizer = TextVectorizer(data=df, max_len=max_target_len)
vocab_size = len(text_vectorizer.get_vocab())
print('vocab size: ', len(text_vectorizer.get_vocab()))


# process the audio


vocab size:  88


In [22]:
# process the audio

def path_to_audio(path):  
    path = './data/raw/LJSpeech-1.1/wavs/' + path + '.wav'
    audio, sr = torchaudio.load(path)
    audio = audio.squeeze(0) # what does this do?
    audio = torchaudio.functional.resample(audio, sr, 16000)

    # get the spectrogram
    #spec = torchaudio.transforms.Spectrogram(n_fft=256, hop_length=80, power=2)(waveform)

    # get the log mel spectrogram
    log_mel_spec = torchaudio.transforms.MelSpectrogram(sample_rate=16000, n_fft=256, hop_length=80, power=2)(audio)

    # apply power law compression
    log_mel_spec = torch.pow(log_mel_spec, 0.3)

    # normalize
    log_mel_spec = torchaudio.functional.normalize(log_mel_spec)

    # pad or truncate to fixed length
    target_len = 2754 #why?

    if log_mel_spec.shape[1] < target_len:
        log_mel_spec = torch.nn.functional.pad(log_mel_spec, (0, target_len - log_mel_spec.shape[1]))
    else:
        log_mel_spec = log_mel_spec[:, :target_len]

    # replace NaNs with zeros
    log_mel_spec = torch.nan_to_num(log_mel_spec, nan=0.0)
    return log_mel_spec

# create our dataset

class ASRDataset(Dataset):
    def __init__(self, data, vectorizer):
        self.data = data
        self.vectorizer = vectorizer

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        item = self.data[idx]
        audio = path_to_audio(item['audio_path'])
        text = self.vectorizer(item['text'])
        return {'source': audio, 'target': text}
    
def collate_fn(batch):
    sources = [item['source'] for item in batch]
    targets = [item['target'] for item in batch]

    sources = pad_sequence(sources, batch_first=True)
    targets = pad_sequence(targets, batch_first=True, padding_value=0)
    return {'source': sources, 'target': targets}

# create dataset and dataloaders
split_index = int(len(df) * 0.9)

train_data = df[:split_index]
val_data = df[split_index:]

train_data = ASRDataset(train_data, text_vectorizer)
val_data = ASRDataset(val_data, text_vectorizer)

train_loader = DataLoader(train_data, batch_size=8, collate_fn=collate_fn)
val_loader = DataLoader(val_data, batch_size=8, collate_fn=collate_fn)



In [23]:
# create the transformer model

import torch
import torch.nn as nn
import torch.nn.functional as F

class SpeechFeatureEmbedding(nn.Module):
    def __init__(self, num_hid, maxlen):
        super().__init__()
        self.conv1 = nn.Conv1d(80, num_hid, kernel_size=11, padding=5, bias=False)
        self.conv2 = nn.Conv1d(num_hid, num_hid, kernel_size=11, padding=5, bias=False)
        self.conv3 = nn.Conv1d(num_hid, num_hid, kernel_size=11, padding=5, bias=False)
        self.pos_embedding = nn.Embedding(maxlen, num_hid)

    def forward(self, x):
        x = self.conv1(x.transpose(1, 2))
        x = F.relu(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = self.conv3(x)
        x = x.transpose(1, 2)
        positions = torch.arange(x.size(1), device=x.device).unsqueeze(0)
        x += self.pos_embedding(positions)
        return x
    
class TokenEmbedding(nn.Module):
    def __init__(self, num_vocab, maxlen, num_hid):
        super().__init__()
        self.emb = nn.Embedding(num_vocab, num_hid)
        self.pos_embedding = nn.Embedding(maxlen, num_hid)

    def forward(self, x):
        x = self.emb(x)
        positions = torch.arange(x.size(1), device=x.device).unsqueeze(0)
        x += self.pos_embedding(positions)
        return x
    
class TransformerEncoder(nn.Module):
    def __init__(self, num_hid, num_head, num_feed_forward):
        super().__init__()
        self.self_attention = nn.MultiheadAttention(num_hid, num_head)
        self.feed_forward = nn.Sequential(
            nn.Linear(num_hid, num_feed_forward),
            nn.ReLU(),
            nn.Linear(num_feed_forward, num_hid)
        )
        self.layernorm1 = nn.LayerNorm(num_hid)
        self.layernorm2 = nn.LayerNorm(num_hid)

    def forward(self, x):
        attn_output, _ = self.self_attention(x, x, x)
        x = self.layernorm1(x + attn_output)
        ff_output = self.feed_forward(x)
        x = self.layernorm2(x + ff_output)
        return x

class TransformerDecoder(nn.Module):
    def __init__(self, num_hid, num_head, num_feed_forward):
        super().__init__()
        self.self_attention = nn.MultiheadAttention(num_hid, num_head)
        self.cross_attention = nn.MultiheadAttention(num_hid, num_head)
        self.feed_forward = nn.Sequential(
            nn.Linear(num_hid, num_feed_forward),
            nn.ReLU(),
            nn.Linear(num_feed_forward, num_hid)
        )
        self.layernorm1 = nn.LayerNorm(num_hid)
        self.layernorm2 = nn.LayerNorm(num_hid)
        self.layernorm3 = nn.LayerNorm(num_hid)

        def forward(self, enc_output, target):
            self_attn_output, _ = self.self_attention(target, target, target)
            x = self.layernorm1(target + self_attn_output)
            cross_attn_output, _ = self.cross_attention(x, enc_output, enc_output)
            x = self.layernorm2(x + cross_attn_output)
            ff_output = self.feed_forward(x)
            x = self.layernorm3(x + ff_output)
            return x

class Transformer(nn.Module):
    def __init__(
            self,
            num_hid=64,
            num_head=2,
            num_feed_forward=128,
            source_maxlen=100,
            target_maxlen=100,
            num_layers_enc=4,
            num_layers_dec=1,
            num_classes=10,
    ):
        super().__init__()
        self.target_maxlen = target_maxlen
        self.num_classes = num_classes

        self.enc_input = SpeechFeatureEmbedding(num_hid=num_hid, maxlen=source_maxlen)
        self.dec_input = TokenEmbedding(
            num_vocab=num_classes,
            maxlen=target_maxlen,
            num_hid=num_hid
        )

        self.encoder = nn.Sequential(
            self.enc_input,
            *[TransformerEncoder(num_hid, num_head, num_feed_forward) for _ in range(num_layers_enc)])
        
        self.decoder_layers = nn.ModuleList([
            TransformerDecoder(num_hid, num_head, num_feed_forward)
            for _ in range(num_layers_dec)
        ])

        self.classifier = nn.Linear(num_hid, num_classes)

    def decode(self, enc_out, target):
        y = self.dec_input(target)
        for dec_layer in self.decoder_layers:
            y = dec_layer(enc_out, y)
        return y

    def forward(self, inputs):
        source, target = inputs
        x = self.encoder(source)
        y = self.decode(x, target)
        return self.classifier(y)
    
    def generate(self, source, target_start_token_idx):
        bs = source.size(0)
        enc = self.encoder(source)
        dec_input = torch.ones((bs, 1), dtyype=torch.long, device=source.devices) * target_start_token_idx

        for i in range(Self.target_maxlen - 1):
            dec_out = self.decode(enc, dec_input)
            logits = self.classifier(dec_out)
            last_logit = logits[:, -1:].argmax(dim=-1)
            dec_input = torch.cat([dec_input, last_logit], dim=1)
        return dec_input

In [24]:
import torch
from torch.optim import Optimizer
from torch.optim.lr_scheduler import _LRScheduler

class CustomSchedule(_LRScheduler):
    def __init__(
            self,
            optimizer: Optimizer,
            init_lr=0.00001,
            lr_after_warmup=0.001,
            final_lr=0.00001,
            warmup_epochs=15,
            decay_epochs=85,
            steps_per_epoch=203,
            last_epoch=1
    ):
        self.init_lr = init_lr
        self.lr_after_warmup = lr_after_warmup
        self.final_lr = final_lr
        self.warmup_epochs = warmup_epochs,
        self.decay_epocs = decay_epochs,
        self.steps_per_epoch = steps_per_epoch
        super().__init__(optimizer, last_epoch)

    def calculate_lr(self, epoch):
        if epoch < self.warmup_epochs:
            return self.init_lr + ((self.lr_after_warmup - self.init_lr) / (self.warmup_epochs - 1) ) * epoch
        else:
            return max(
                self.final_lr,
                self.lr_after_warmup
                - (epoch - self.warmup_epochs)
                * (self.lr_after_warmup - self.final_lr)
                / self.decay_epochs,
            )
    
    def get_lr(self):
        epoch = self.last_epoch // self.steps_per_epoch
        return (self.calculate_lr(epoch) for _ in self.base_lrs)
    


In [26]:
# initialize model

model = Transformer(
    num_hid=200,
    num_head=2,
    num_feed_forward=400,
    target_maxlen=max_target_len,
    num_layers_enc=4,
    num_layers_dec=1,
    num_classes=34
)

# loss function

loss_function = torch.nn.CrossEntropyLoss(label_smoothing=0.1)

# optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=0.00001)

scheduler = CustomSchedule(
    optimizer,
    init_lr=0.00001,
    lr_after_warmup=0.001,
    final_lr=0.00001,
    warmup_epochs=15,
    decay_epochs=85,
    steps_per_epoch=len(train_loader),
)



KeyError: "param 'initial_lr' is not specified in param_groups[0] when resuming an optimizer"

In [6]:
# Train!

import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from tqdm import tqdm
import matplotlib.pyplot as plt

def train_one_epoch(model, train_loader, optimizer, scheduelr, loss_fn, device):
    model.train()
    total_loss = 0

    for batch in tqdm(train_loader, desc="Training"):
        optimizer.zero_grad()
        source = batch['source'].to(device)
        target = batch['target'].to(device)

        output = model(source, target[:, :-1]) # teacher forcing

        loss = loss_fn(output.reshape(-1, output.shape[-1]), target[:, 1:].reshape(-1))

        loss.backward()
        optimizer.step()
        scheduler.step()

        total_loss += loss.item()

    return total_loss / len(train_loader)

def validate(model, val_loader, loss_fn, device):
    model.eval()
    total_loss = 0
    with torch.no_grad():
        for batch in tqdm(val_loader, desc="Validating"):
            source = batch['source'].to(device)
            target = batch['target'].to(device)

            output = model(source, target[:, :-1])
            loss = loss_fn(output.reshape(-1, output.shape[-1]), target[:, 1:].reshape(-1))

            total_loss += loss.item()

    return total_loss / len(train_loader)


# training loop

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model.to(device)

num_epochs = 100
train_losses = []
val_losses = []

for epoch in range(num_epochs):
    print(f"Epoch {epoch+1} / {num_epochs}")
    train_loss = train_one_epoch(model, train_loader, optimizer, scheduler, loss_function, device)
    val_loss = validate(model, val_loader, loss_function, device)

    train_losses.append(train_loss)
    val_losses.append(val_loss)

    print(f"Train loss: {train_loss:.4f}, Validation loss: {val_loss:.4f}")

    # save the model

    torch.save({
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'scheduler_state_dict': scheduler.state_dict(),
        'train_loss': train_loss,
        'val_loss': val_loss,
    }, f"/.speechrecognition_epoch_{epoch+1}.pth")


# plot training history
plt.figure(figsize=(10, 5))
plt.plot(train_losses, label="train loss")
plt.plot(val_losses, label="val losses")
plt.title('training history')
plt.xlabel('epoch')
plt.ylabel('loss')
plt.legend()
plt.show()


# save the final model
torch.save(model.state_dict(), "./speechrecognition_final.plt")






NameError: name 'model' is not defined

In [None]:
import torch
import torchaudio
from pathlib import Path
import editdistance


# load the trained model

model_path = Path('path/to/model')

model = Transformer(
    num_hid=200,
    num_head=2,
    num_feed_forward=400,
    target_maxlen=max_target_lenm
    num_layers_enc=4,
    num_layers_dec=1,
    num_classes=34
    )

model.load_state_dict(torch.load(model_path))
model.eval() # set the model to evaluation mode


# set the device
device = torch.device("cuda" if torch.cuda.is_available else "cpu")
model = mode.to(device)

# randomly select an audio file
sample = df.iloc[random.randint(0, len(df) - 1)]
audio_path = sample['audio_path']
actual_text = sample['text']

# process an audio file
x = path_to_audio(audio_path)
x = x.unsqueeze(0)
print("input shape: ", x.shape)


# move input to device
x = x.to(device)

# generate a prediction
with torch.no_grad():
    preds = model.generate(x, target_start_token_idx=2) # assuming 2 is your start index

# convert predictions to text
idx_to_char = text_vectorizer.get_vocab()
predicted_text = ""

for idx in preds[0].cpu().numpy():
    if idx == 3: # assuming 3 is your end token
        break
    predicted_text += idx_to_char[idx]

print("audio file: ", audio_path)
print("actual text", actual_text)
print("predicted text", predicted_text)

def edit
# calculate CER
def calculate_cer(reference, hypothesis):
    return editdistance.eval(reference, hypothesis) / len(reference)

cer = calculate_cer(actual_text, predicted_text)
print(f"CER:  {cer:.4f}")

# calculate WER

def calculate_wer(reference, hypothesis):
    ref_words = reference.split()
    hyp_words = hypothesis.split()

    return editdistance.eval(ref_words, hyp_words) / len(ref_words)

wer = calculate_wer(actual_text, predicted_text)
print(f"WER:  {wer:.4f}")