<a href="https://colab.research.google.com/github/TReV-89/TReV-89/blob/main/Create_Acholi_TTS_dataset.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Acholi TTS Dataset preparation
The purpose of this notebook is to create a text-to-speech (TTS) dataset by aligning the audio files with their corresponding sentences.

This notebook is specifically for the Acholi audio files.

If necessary, this will be generalized to handle multiple languages.

In [None]:
!pip install webrtcvad
!pip install boto3

Collecting webrtcvad
  Downloading webrtcvad-2.0.10.tar.gz (66 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m66.2/66.2 kB[0m [31m961.2 kB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: webrtcvad
  Building wheel for webrtcvad (setup.py) ... [?25l[?25hdone
  Created wheel for webrtcvad: filename=webrtcvad-2.0.10-cp310-cp310-linux_x86_64.whl size=73471 sha256=28a736b1a115586161baefd8e3af1185dfd7d2ad15a4f0d5f798a1e0842615a8
  Stored in directory: /root/.cache/pip/wheels/2a/2b/84/ac7bacfe8c68a87c1ee3dd3c66818a54c71599abf308e8eb35
Successfully built webrtcvad
Installing collected packages: webrtcvad
Successfully installed webrtcvad-2.0.10
Collecting boto3
  Downloading boto3-1.28.9-py3-none-any.whl (135 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m135.7/135.7 kB[0m [31m2.7 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting botocore<1.32.0,>=1.31.9 (from boto3)
  Downlo

In [None]:
import librosa
import soundfile
import numpy as np
import struct
import webrtcvad
import os
import glob
import warnings
from tqdm.notebook import tqdm
from IPython.display import Audio
from google.colab import drive
# Mount Google Drive at /content/drive; the data paths used in this notebook live under it.
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Root data folder on the shared drive; the two folders below are derived from it.
BASE_DIR = '/content/drive/Shareddrives/Sunbird AI/Projects/African Language Technology/Data'
# Input folder holding the raw Acholi recordings.
RAW_DATA_DIR = BASE_DIR + '/Acholi Voice Over/Acholi Voiced Over Phrases-Polin Owiny'
# Output folder for the trimmed and resampled recordings.
PROCESSED_DATA_DIR = BASE_DIR + '/acholi_processed'

In [None]:
# Get .wav files from google drive folder
def get_wav_files(data_dir=None):
  """Return the sorted basenames of the ``.wav`` files directly inside a folder.

  Args:
    data_dir: Folder to list. When omitted, the module-level ``RAW_DATA_DIR``
      is used (looked up at call time).

  Returns:
    A list of file names without their directory part, sorted
    lexicographically so the order is identical on every run (``glob`` itself
    makes no ordering guarantee).
  """
  if data_dir is None:
    data_dir = RAW_DATA_DIR
  # Escape the folder so characters such as "[" in its name are matched
  # literally; only the trailing "*.wav" acts as a pattern.
  pattern = os.path.join(glob.escape(data_dir), '*.wav')
  return sorted(os.path.basename(path) for path in glob.glob(pattern))

# List the raw recordings; wav_files holds file names only (no directory part).
wav_files = get_wav_files()
num_audio_files = len(wav_files)
print(f"Number of audio files: {num_audio_files}")

Number of audio files: 5000


In [None]:
# Preview the first five file names.
wav_files[:5]

['Acholi 4006.wav',
 'Acholi 4007.wav',
 'Acholi 4008.wav',
 'Acholi 4009.wav',
 'Acholi 4010.wav']

## Fix filename errors

In [None]:
# Check filenames
def has_expected_filename(filename: str):
  """Validate that a file name is exactly two space-separated tokens starting with 'Acholi'.

  Returns:
    A ``(is_good, reason, code)`` tuple. ``code`` is 0 when the name is
    accepted, 1 when the stripped name does not split into exactly two
    tokens on single spaces, 2 when the first token is not ``'Acholi'``, and
    3 when the name differs from the two tokens re-joined by one space
    (which happens when it has leading or trailing whitespace).
  """
  parts = filename.strip().split(" ")
  n_parts = len(parts)
  if n_parts != 2:
    return False, f"Split returned {n_parts} values instead of 2", 1

  prefix, number = parts
  if prefix != 'Acholi':
    return False, f"Wrong language {prefix}", 2

  expected = f"{prefix} {number}"
  if filename == expected:
    return True, "All good", 0
  return False, f"Filename is not expected. Got {filename}, expected '{expected}'", 3


def get_bad_files(filenames=None):
  """Collect the file names rejected by ``has_expected_filename``.

  Args:
    filenames: Iterable of file names to check. When omitted, the
      module-level ``wav_files`` list is used (looked up at call time), which
      keeps the original no-argument call working.

  Returns:
    A ``(bad_files, bad_files_by_err)`` tuple. ``bad_files`` is a list of
    ``(filename, reason, code)`` tuples. ``bad_files_by_err`` is a list of
    four lists indexed by error code; index 0 stays empty because code 0
    means the name was accepted.
  """
  if filenames is None:
    filenames = wav_files
  bad_files = []
  # One bucket per code returned by has_expected_filename (0 through 3).
  bad_files_by_err = [[] for _ in range(4)]
  for filename in filenames:
    is_good, reason, code = has_expected_filename(filename)
    if not is_good:
      bad_files.append((filename, reason, code))
      bad_files_by_err[code].append(filename)
  return bad_files, bad_files_by_err

# Run the filename check over the current listing and report how many names were rejected.
bad_files, bad_files_by_err = get_bad_files()
print(len(bad_files))

2


In [None]:
# Show each rejected file as a (filename, reason, code) tuple.
bad_files

[('Acholi 941 2.wav', 'Split returned 3 values instead of 2', 1),
 ('Acholi 1996 .wav', 'Split returned 3 values instead of 2', 1)]

In [None]:
# rename bad files
# Guarded so the cell can be re-run: once the file has been renamed the source
# no longer exists and an unconditional os.rename would raise FileNotFoundError.
src = f"{RAW_DATA_DIR}/Acholi 941 2.wav"
dst = f"{RAW_DATA_DIR}/Acholi 9412.wav"
if os.path.exists(src):
  os.rename(src, dst)

In [None]:
# Remove the stray space before the extension.
# Guarded so the cell can be re-run: once the file has been renamed the source
# no longer exists and an unconditional os.rename would raise FileNotFoundError.
src = f"{RAW_DATA_DIR}/Acholi 1996 .wav"
dst = f"{RAW_DATA_DIR}/Acholi 1996.wav"
if os.path.exists(src):
  os.rename(src, dst)

In [None]:
# Re-list the raw folder so wav_files reflects the renamed files.
wav_files = get_wav_files()

NameError: ignored

In [None]:
# No more filename errors
# Re-run the filename check on the refreshed listing; the printed count is expected to be 0.
bad_files, bad_files_by_err = get_bad_files()
print(len(bad_files))

NameError: ignored

## Voice Activity Detection

In [None]:
class Frame(object):
  """A fixed-length slice of PCM audio together with its position in the stream."""
  def __init__(self, bytes, timestamp, duration):
    # Raw PCM bytes of this frame.
    self.bytes = bytes
    # Start of the frame, in seconds from the beginning of the audio.
    self.timestamp = timestamp
    # Length of the frame in seconds.
    self.duration = duration

def frame_generator(frame_duration_ms, audio, sample_rate):
  """Splits PCM audio into frames.

  Args:
    frame_duration_ms: Length of each frame in milliseconds.
    audio: PCM byte sequence with 2 bytes per sample (the frame size and
      duration below are computed on that basis).
    sample_rate: Sample rate of ``audio`` in Hz.

  Yields:
    ``Frame`` objects for every complete frame, in order. A trailing partial
    frame is discarded.

  Raises:
    ValueError: If the frame size works out to less than one byte, which
      would otherwise make the loop spin forever.
  """
  # Frame size in bytes: samples per frame times 2 bytes per sample.
  n = int(sample_rate * (frame_duration_ms / 1000.0) * 2)
  if n <= 0:
    raise ValueError(
        f"Frame size must be positive; got {n} bytes for "
        f"frame_duration_ms={frame_duration_ms}, sample_rate={sample_rate}")
  offset = 0
  timestamp = 0.0
  duration = (float(n) / sample_rate) / 2.0
  # "<=" keeps a final frame that ends exactly at the end of the buffer.
  while offset + n <= len(audio):
    yield Frame(audio[offset:offset + n], timestamp, duration)
    timestamp += duration
    offset += n

def waveform_to_pcm_bytes(wave):
  """Convert waveform from numpy array to PCM byte sequence.

  Args:
    wave: Numpy array of float samples; a value of 1.0 maps to 32767.

  Returns:
    ``bytes`` holding one little-endian signed 16-bit integer per sample.
    Scaled values are truncated toward zero and clipped to the int16 range.
  """
  x = np.int32(wave * 0x7fff)
  x = np.clip(x, -32768, 32767)
  # Serialise in one vectorised step instead of unpacking every sample into
  # struct.pack; '<i2' is the same little-endian int16 layout as '<h'.
  return x.astype('<i2').tobytes()

def trim_speech(input_wav_path, output_wav_path, output_sample_rate = 22050):
  """Remove non-speech from the beginning and end of wav audio.

  The file is loaded at 32 kHz, cut into 30 ms frames and each frame is
  classified with ``webrtcvad.Vad(3)``. The samples from the start of the
  first speech frame to the end of the last speech frame are resampled to
  ``output_sample_rate`` and written to ``output_wav_path``.

  Args:
    input_wav_path: Path of the recording to read.
    output_wav_path: Path the trimmed recording is written to.
    output_sample_rate: Sample rate of the written file, in Hz.

  Raises:
    ValueError: If no frame is classified as speech, so there is nothing to
      keep.
  """
  frame_duration_ms = 30

  # Read in the original file at 32kHz sample rate
  wave, rate = librosa.load(input_wav_path, sr=32000)

  # Split waveform into 30ms frames, and find which contain speech
  vad = webrtcvad.Vad(3)
  pcm = waveform_to_pcm_bytes(wave)
  frames = frame_generator(frame_duration_ms, pcm, rate)
  is_speech = [vad.is_speech(frame.bytes, rate) for frame in frames]

  # Find the first and last frames with speech activity
  speech_frames = np.where(is_speech)[0]
  if speech_frames.size == 0:
    # Without this check np.min fails with an unrelated-looking reduction error.
    raise ValueError(f'No speech detected in {input_wav_path}; nothing to trim.')
  start_frame = int(np.min(speech_frames))
  end_frame = int(np.max(speech_frames))

  # Trim the waveform correspondingly. Integer samples-per-frame keeps the cut
  # points on frame boundaries, and "end_frame + 1" keeps the last speech
  # frame itself: a slice that stops at the frame's first sample would drop it.
  samples_per_frame = int(rate * frame_duration_ms / 1000)
  start_index = start_frame * samples_per_frame
  end_index = min((end_frame + 1) * samples_per_frame, len(wave))
  trimmed_wave = wave[start_index:end_index]

  if len(trimmed_wave) < 0.5 * rate:
    warnings.warn(f'The output audio being saved to {output_wav_path} is less than 0.5 seconds. There may be an alignment or recording error.')

  # Resample to desired output rate.
  trimmed_wave = librosa.resample(trimmed_wave, orig_sr=rate, target_sr=output_sample_rate)
  soundfile.write(output_wav_path, trimmed_wave, output_sample_rate)


In [None]:
# Carry out the trimming and resampling on all files, with the option to skip any which were already processed.

skip_existing_output_files = True

# Writing fails if the output folder is missing, so create it up front.
os.makedirs(PROCESSED_DATA_DIR, exist_ok=True)

for f in tqdm(wav_files):
  input_path = os.path.join(RAW_DATA_DIR, f)
  output_path = os.path.join(PROCESSED_DATA_DIR, f)
  # Leave files that already have an output alone unless reprocessing was requested.
  if skip_existing_output_files and os.path.exists(output_path):
    continue
  trim_speech(input_path, output_path)

# Check we converted everything
processed_wav_files = glob.glob(PROCESSED_DATA_DIR + "/*.wav")
if len(processed_wav_files) != len(wav_files):
  raise ValueError(
      f"The number of processed files ({len(processed_wav_files)}) is not the same as the number of input files ({len(wav_files)})."
  )


  0%|          | 0/5000 [00:00<?, ?it/s]

## Match sentences to audio files