In [1]:
import os
def _prepend_env(name, value, sep=":"):
    """Put ``value`` at the front of the ``sep``-separated variable ``name``.

    Works when the variable is not set yet, and removes an earlier copy of
    ``value`` first so re-running the cell does not keep growing the variable.
    """
    current = os.environ.get(name, "")
    remaining = [entry for entry in current.split(sep) if entry != value] if current else []
    os.environ[name] = sep.join([value, *remaining])


# Point every CUDA-related build/runtime variable at one toolkit install.
cuda_home = "/usr/local/cuda-12.1.0"
cuda_target_dir = f"{cuda_home}/targets/x86_64-linux"

os.environ["CUDA_HOME"] = cuda_home
_prepend_env("PATH", f"{cuda_home}/bin")

# Prepended one after another, so the final search order is
# CUPTI, then lib, then lib64, then whatever was there before.
for cuda_lib_dir in ("lib64", "lib", "extras/CUPTI/lib64"):
    _prepend_env("LD_LIBRARY_PATH", f"{cuda_home}/{cuda_lib_dir}")

# The same toolkit root under each spelling this notebook exports.
for cuda_root_var in (
    "CUDAToolkit_ROOT_DIR",
    "CUDAToolkit_ROOT",
    "CUDA_TOOLKIT_ROOT_DIR",
    "CUDA_TOOLKIT_ROOT",
    "CUDA_BIN_PATH",
    "CUDA_PATH",
):
    os.environ[cuda_root_var] = cuda_home

os.environ["CUDA_INC_PATH"] = cuda_target_dir
os.environ["CUDAToolkit_TARGET_DIR"] = cuda_target_dir

# CFLAGS is a space-separated list of compiler flags, not a ':'-separated path list.
_prepend_env("CFLAGS", f"-I{cuda_target_dir}/include", sep=" ")

In [2]:
import torch
import torchaudio
import torchaudio.functional as F

# Use the GPU when torch can see one; otherwise run on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Inputs: the book's verse JSON and the chapter recording to align.
json_path = "../data/openbible_swahili/PSA.json"
audio_path = "../downloads/wavs_16/PSA/PSA_119.wav"

# The audio path ends in ".../<book>/<book>_<chapter>.wav"; recover both parts.
audio_path_parts = audio_path.split("/")
book = audio_path_parts[-2]
audio_file_stem = audio_path_parts[-1].replace(".wav", "")
chapter = int(audio_file_stem.split("_")[-1])

# Prefix used to select this chapter's verses by their "verseNumber" field.
verse_number_prefix = "{} {}:".format(book, chapter)

In [3]:
import json
import re
import string
import unicodedata
from unidecode import unidecode
from num2words import num2words

def preprocess_verse(text: str) -> str:
    """Normalise a verse into lowercase, punctuation-free words.

    Steps, in order: transliterate with ``unidecode``, apply NFKC
    normalisation, lowercase, delete every ``string.punctuation`` character,
    spell out each run of digits with ``num2words`` (``lang="sw"``), then
    collapse whitespace runs to a single space and trim both ends.

    Punctuation is removed before numbers are expanded, so grouped digits such
    as "1,000" are read as a single number.

    Args:
        text: Raw verse text.

    Returns:
        The normalised verse as a single space-separated string.
    """
    text = unidecode(text)
    text = unicodedata.normalize("NFKC", text)
    text = text.lower()
    text = text.translate(str.maketrans("", "", string.punctuation))
    text = re.sub(r"\d+", lambda match: num2words(int(match.group(0)), lang="sw"), text)
    # Raw string: "\s" in a plain literal is an invalid escape sequence.
    text = re.sub(r"\s+", " ", text)
    # Removing punctuation can leave stray spaces at either end.
    return text.strip()

def load_transcripts(json_path, verse_number_prefix):
    """Load the verses whose ``verseNumber`` starts with ``verse_number_prefix``.

    Args:
        json_path: Path to a JSON file holding a list of objects with
            ``"verseNumber"`` and ``"verseText"`` keys.
        verse_number_prefix: Prefix selecting one book and chapter,
            e.g. ``"PSA 119:"``.

    Returns:
        ``(verse_ids, transcripts)``: two parallel lists, in file order, with
        the matching verse numbers and their texts. Both are empty when
        nothing matches.
    """
    # Read as UTF-8 explicitly instead of relying on the platform's default encoding.
    with open(json_path, "r", encoding="utf-8") as f:
        data = json.load(f)

    # Single pass so the two lists cannot drift out of step.
    verse_ids, transcripts = [], []
    for entry in data:
        verse_number = entry["verseNumber"]
        if verse_number.startswith(verse_number_prefix):
            verse_ids.append(verse_number)
            transcripts.append(entry["verseText"])
    return verse_ids, transcripts

In [4]:
verse_ids, transcripts = load_transcripts(json_path, verse_number_prefix)
verses = list(map(preprocess_verse, transcripts))

# Interleave a "*" placeholder ahead of each verse; it stands in for the
# chapter intro or verse number.
augmented_verses = [entry for verse in verses for entry in ("*", verse)]

# Per-verse word lists, plus one flat word list that includes the "*" entries.
words = [verse.split() for verse in verses]
augmented_words = [token for entry in augmented_verses for token in entry.split()]

In [5]:
# MMS_FA pipeline bundle: supplies the model plus its label set and
# label -> index dictionary.
bundle = torchaudio.pipelines.MMS_FA
LABELS, DICTIONARY = bundle.get_labels(), bundle.get_dict()

# with_star=True so the model emits a column for the "*" placeholder token.
model = bundle.get_model(with_star=True)
model = model.to(device)

In [6]:
def _split_into_chunks(waveform, chunk_size_frames, min_tail_frames):
    """Cut ``waveform`` into consecutive chunks along dimension 1.

    A trailing remainder shorter than ``min_tail_frames`` is folded into the
    previous chunk instead of becoming a tiny chunk of its own.
    """
    total_frames = waveform.shape[1]
    starts = list(range(0, total_frames, chunk_size_frames))
    if len(starts) > 1 and total_frames - starts[-1] < min_tail_frames:
        starts.pop()
    stops = starts[1:] + [total_frames]
    return [waveform[:, begin:end] for begin, end in zip(starts, stops)]


chunk_size_s = 15
min_tail_s = 1  # a shorter leftover is merged into the preceding chunk

waveform, sr = torchaudio.load(audio_path)

# Dimension 0 is fed to the model as the batch dimension and alignment later
# builds a single-item target batch, so collapse multi-channel audio to one channel.
if waveform.shape[0] > 1:
    waveform = waveform.mean(dim=0, keepdim=True)

# Later cells play back and write the samples at bundle.sample_rate, so make
# the loaded audio actually match that rate.
if sr != bundle.sample_rate:
    waveform = F.resample(waveform, sr, bundle.sample_rate)
    sr = bundle.sample_rate

chunk_size_frames = chunk_size_s * sr
# NOTE(review): a very short trailing chunk may be too short for the model's
# convolutional front end; merging it into the previous chunk avoids that.
chunks = _split_into_chunks(waveform, chunk_size_frames, min_tail_frames=min_tail_s * sr)

In [7]:
# Run the model chunk by chunk (no autograd bookkeeping), keeping only the
# emission tensor from each call.
with torch.inference_mode():
    emissions = [model(chunk.to(device))[0] for chunk in chunks]

# Stitch the per-chunk emissions back together along dimension 1.
emission = torch.cat(emissions, dim=1)

# The last dimension must have one entry per dictionary label.
assert emission.shape[2] == len(DICTIONARY)
num_frames = emission.size(1)

In [8]:
# Sanity check: dim 1 is the number of emission frames (num_frames) and
# dim 2 matches len(DICTIONARY).
emission.shape

torch.Size([1, 70803, 29])

In [None]:
# probs = torch.softmax(emission, dim=2)
# greedy_path = torch.argmax(probs, dim=-1).squeeze().cpu().numpy()
# predicted_tokens = [LABELS[i] == "*" for i in greedy_path]

In [9]:
def align(emission, tokens):
    """Force-align ``tokens`` against ``emission``.

    Args:
        emission: Emission tensor with a leading batch dimension.
        tokens: Flat list of integer token ids for the whole transcript.

    Returns:
        ``(alignments, scores)`` with the batch dimension removed; ``scores``
        has been exponentiated to turn it back into probabilities.
    """
    target_batch = torch.tensor([tokens], dtype=torch.int32, device=device)
    path_batch, log_score_batch = F.forced_align(emission, target_batch, blank=0)
    # Only one item per batch is used here, so return it without the batch axis.
    return path_batch[0], log_score_batch[0].exp()

def unflatten(list_, lengths):
    """Split ``list_`` into consecutive slices whose sizes are given by ``lengths``.

    The sizes must add up to ``len(list_)``; otherwise an ``AssertionError``
    is raised.
    """
    assert len(list_) == sum(lengths)

    # Running totals give each slice's end offset; each start is the previous end.
    stops = []
    running_total = 0
    for size in lengths:
        running_total += size
        stops.append(running_total)
    starts = [0] + stops[:-1]
    return [list_[begin:end] for begin, end in zip(starts, stops)]

def compute_alignments(emission, transcript, dictionary):
    """Align every word of ``transcript`` to ``emission``.

    Args:
        emission: Emission tensor passed through to ``align``.
        transcript: Sequence of words; each word is a sequence of characters.
        dictionary: Mapping from character to token id.

    Returns:
        One list of token spans per word, in transcript order.
    """
    tokens, word_lengths = [], []
    for word in transcript:
        tokens.extend(dictionary[char] for char in word)
        word_lengths.append(len(word))

    frame_alignment, frame_scores = align(emission, tokens)
    merged_spans = F.merge_tokens(frame_alignment, frame_scores)
    # Regroup the flat per-character spans back into per-word groups.
    return unflatten(merged_spans, word_lengths)

In [10]:
# One list of token spans per entry of augmented_words (including the "*" entries).
word_spans = compute_alignments(emission, augmented_words, DICTIONARY)

In [11]:
from IPython.display import Audio

def _score(spans):
    """Return the length-weighted mean of ``span.score`` over ``spans``."""
    weighted_total = sum(len(span) * span.score for span in spans)
    total_length = sum(map(len, spans))
    return weighted_total / total_length

def preview_word(waveform, spans, num_frames, transcript, sample_rate=bundle.sample_rate):
    """Print the timing and score of one word and return it as playable audio.

    Args:
        waveform: Audio tensor; dimension 1 is the sample axis.
        spans: Token spans of the word (objects with ``start``/``end`` frames).
        num_frames: Number of emission frames covering ``waveform``.
        transcript: Text shown in the printed summary.
        sample_rate: Rate used for the printed times and for playback.

    Returns:
        An ``IPython.display.Audio`` of the samples covered by ``spans``.
    """
    # Emission frames -> waveform samples.
    samples_per_frame = waveform.size(1) / num_frames
    first_sample = int(samples_per_frame * spans[0].start)
    last_sample = int(samples_per_frame * spans[-1].end)
    print(f"{transcript} ({_score(spans):.2f}): {first_sample / sample_rate:.3f} - {last_sample / sample_rate:.3f} sec")
    return Audio(waveform[:, first_sample:last_sample].numpy(), rate=sample_rate)

In [12]:
# Keep only the spans of real words; the "*" placeholders are dropped.
word_only_spans = [
    span_group
    for token, span_group in zip(augmented_words, word_spans)
    if token != "*"
]

# `words` holds one word list per verse, so this is the total word count.
expected_word_count = sum(map(len, words))
assert len(word_only_spans) == expected_word_count

In [13]:
# Cut one audio segment per verse, from the start of its first word to the
# end of its last word.
samples_per_frame = waveform.size(1) / num_frames  # emission frame -> sample index

segments, labels = [], []
cursor = 0  # index of the current verse's first word in word_only_spans
for verse_words in words:
    verse_spans = word_only_spans[cursor : cursor + len(verse_words)]
    cursor += len(verse_words)

    first_sample = int(samples_per_frame * verse_spans[0][0].start)
    last_sample = int(samples_per_frame * verse_spans[-1][-1].end)

    segments.append(waveform[:, first_sample:last_sample])
    labels.append(" ".join(verse_words))

In [14]:
# Every selected verse must have produced exactly one audio segment.
assert len(segments) == len(verse_ids)

In [15]:
from IPython.display import Audio

# Spot-check one verse: print its text and play its audio.
idx = 170
preview_label, preview_segment = labels[idx], segments[idx]
print(preview_label)
Audio(preview_segment.numpy(), rate=bundle.sample_rate)

midomo yangu na ibubujike sifa kwa kuwa unanifundisha maagizo yako


In [16]:
from pathlib import Path
from scipy.io.wavfile import write

# Outputs go to ../outputs/openbible_swahili/<book>/<book>_<chapter, 3 digits>/
chapter_folder = f"{book}_{str(chapter).zfill(3)}"
output_dir = Path(f"../outputs/openbible_swahili/{book}/{chapter_folder}/")
output_dir.mkdir(parents=True, exist_ok=True)

# Write one .wav and one .txt per verse, named <chapter_folder>_<verse, 3 digits>.
for verse_id, segment, label in zip(verse_ids, segments, labels):
    verse_number = verse_id.split(":")[-1].zfill(3)
    verse_file_name = f"{chapter_folder}_{verse_number}"
    # Named verse_* so the notebook-level `audio_path` (the chapter recording
    # loaded earlier) is not overwritten by this loop.
    verse_audio_path = (output_dir / verse_file_name).with_suffix(".wav")
    verse_transcript_path = (output_dir / verse_file_name).with_suffix(".txt")
    write(verse_audio_path, bundle.sample_rate, segment.squeeze().numpy())
    with open(verse_transcript_path, "w", encoding="utf-8") as f:
        f.write(label)