In [None]:
# Install speechbrain
%%capture
# Local installation
!git clone https://github.com/speechbrain/speechbrain/
%cd /content/drive/MyDrive/CS5647_Project/speechbrain/
!pip install -r requirements.txt
!pip install -e .
!pip install textgrid transformers librosa


In [None]:
import sys

# Make the local speechbrain checkout importable by appending it to sys.path.
# This only affects the running interpreter; the PYTHONPATH environment variable is not modified.
sys.path.append('/content/drive/MyDrive/CS5647_Project/speechbrain/')

In [None]:
# Install the speechbrain package with pip.
# NOTE(review): the first cell already installs speechbrain from a local clone
# (`pip install -e .`) — confirm that both installs are really needed.
!pip install speechbrain



In [None]:
#import necessary libraries
from google.colab import drive, files
import torch
import os
import csv
from glob import glob
from textgrid import TextGrid, IntervalTier
from speechbrain.dataio.dataio import read_audio
import re
import copy
from collections import defaultdict

In [None]:
# Mount Google Drive at /content/drive; every project path below lives under this mount point.
# force_remount=True requests a fresh mount even when Drive is already mounted.
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [None]:
# Use the GPU when torch reports that CUDA is available, otherwise fall back to the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

In [None]:
# Switch the working directory to the project folder on Drive.
folder_path = '/content/drive/MyDrive/CS5647_Project'
os.chdir(folder_path)
# Read the working directory back and print it to confirm the change took effect.
current_directory = os.getcwd()
print("Current Working Directory after change:", current_directory)

Current Working Directory after change: /content/drive/MyDrive/CS5647_Project


In [None]:
# Root folder of the dataset; create_csv reads <dataset_path>/<speaker>/{wav,annotation,transcript}.
dataset_path = '/content/drive/MyDrive/CS5647_Project/dataset'
# Speaker sub-folders that create_csv iterates over.
speaker_ids = ['ASI', 'RRBI','SVBI','TNI', 'BWC', 'LXC', 'NCC', 'TXHC']
# Destination of the CSV produced by the data preparation step.
csv_data_path = '/content/drive/MyDrive/CS5647_Project/data/processed_data.csv'

In [None]:
# Sample rate (Hz) used to convert a sample count into a duration in seconds.
# NOTE(review): assumes every wav file is sampled at 44.1 kHz — confirm against the dataset.
AUDIO_SAMPLE_RATE = 44100
# Path of the text file listing the phoneme inventory; the first token of each line is the phoneme.
phn_set="/content/drive/MyDrive/CS5647_Project/arpa_phonemes"
def process_arpa_phoneme(path):
    """Read a phoneme inventory file and return its phoneme symbols.

    Each non-blank line contributes its first whitespace-separated token;
    any further tokens on the line are ignored.

    Args:
        path: Path of the phoneme inventory text file.
    Returns:
        List of phoneme symbols in file order.
    """
    arpa_phonemes = []
    with open(path, 'r') as f:
        for line in f:
            items = line.split()
            # Blank or whitespace-only lines (e.g. a trailing empty line) produce
            # no tokens; skip them instead of failing on items[0].
            if items:
                arpa_phonemes.append(items[0])
    return arpa_phonemes



In [None]:
# Load the phoneme inventory once; normalize_tier_mark checks normalized phones against this list.
ARPA_PHONEMES = process_arpa_phoneme(phn_set)


In [None]:
def is_sil(s: str) -> bool:
    """Tell whether a phoneme label denotes silence.

    The comparison is case-insensitive; the empty string counts as silence.

    Args:
        s: A phoneme label.
    Returns:
        True when the label is one of the silence markers, False otherwise.
    """
    silence_labels = {"sil", "sp", "spn", "pau", ""}
    return s.lower() in silence_labels

In [None]:
def normalize_phone(s: str, is_rm_annotation=True, is_phoneme_canonical=True,keep_artificial_sil=False) -> "str | None":
    """Normalize a phoneme label to lower case, stress-free form.

    Also handles L2-ARCTIC style annotations of the form
    ``canonical,perceived,tag`` where the tag is ``s``, ``a`` or ``d``.

    Args:
        s: A phoneme annotation.
        is_rm_annotation: [optional] If true, reduce an annotated mark to a single
            phoneme; otherwise return the cleaned annotation string as a whole.
        is_phoneme_canonical: [optional] If true, return the canonical phoneme;
            otherwise return the perceived phoneme.
        keep_artificial_sil: If true, keep the artificial sil produced by the way
            L2-ARCTIC was annotated; if false, drop it.
            e.g. when false, 'ah, sil, d' canonical: ah, perceived: None
                 when true,  'ah, sil, d' canonical: ah, perceived: sil
    Returns:
        The normalized phoneme (or cleaned annotation), or None when the phone
        has to be dropped: canonical side of an ``a`` mark, perceived side of a
        ``d`` mark, or any mark whose tag is not one of ``s``/``a``/``d``.
    Raises:
        ValueError: If an annotated mark has fewer than three fields while
            ``keep_artificial_sil`` is false, so no tag is available.
    """
    # Lower-case, then strip everything except letters and commas
    # (this removes stress digits, spaces and any other symbols).
    parse_tag = re.sub(r"[^a-z,]", "", s.lower())
    # is_sil also matches the empty string, so a mark that is empty after the
    # cleanup is reported as silence rather than treated as an error.
    if is_sil(parse_tag):
        return "sil"

    fields = parse_tag.split(",")
    if len(fields) == 1:
        # Plain (unannotated) phoneme; 'ax' is folded into 'ah'.
        return "ah" if fields[0] == "ax" else fields[0]

    if not is_rm_annotation:
        return parse_tag

    if keep_artificial_sil:
        # Keep both sides position-aligned, including the placeholder sil.
        return fields[0] if is_phoneme_canonical else fields[1]

    if len(fields) < 3:
        # Previously surfaced as a bare IndexError with no hint about the mark.
        raise ValueError(
            f"Input {s!r} is invalid: expected 'canonical,perceived,tag'."
        )
    tag = fields[2]
    if is_phoneme_canonical:
        # Canonical phone exists for substitutions and deletions only.
        return fields[0] if tag in ("s", "d") else None
    # Perceived phone exists for substitutions and additions only.
    return fields[1] if tag in ("s", "a") else None






In [None]:
def normalize_tier_mark(tier: IntervalTier,
                        mode="NormalizePhoneCanonical", keep_artificial_sil=False) -> IntervalTier:
    """Normalize the marks of an IntervalTier.

    Supported modes:
        "NormalizePhoneCanonical": keep only the canonical pronunciation.
        "NormalizePhonePerceived": keep only the perceived pronunciation.
        "NormalizePhoneAnnotation": keep the (cleaned) annotation string.
        "NormalizeWord": normalize word marks with normalize_word.

    Args:
        tier: An IntervalTier object; it is deep-copied, not modified.
        mode: The filter applied to each mark in the tier.
        keep_artificial_sil: Forwarded to normalize_phone in the canonical and
            perceived modes.
    Returns:
        A new IntervalTier holding the intervals whose normalized mark is not
        None, each carrying the normalized mark.
    Raises:
        ValueError: If `mode` is not supported, or if a canonical/perceived
            phone is neither in ARPA_PHONEMES nor "err".
    """
    phone_modes = {"NormalizePhoneCanonical", "NormalizePhonePerceived"}
    if mode not in phone_modes | {"NormalizePhoneAnnotation", "NormalizeWord"}:
        raise ValueError(f"Mode {mode} is not valid.")

    # Work on a copy so the caller's tier keeps its original marks.
    tier = copy.deepcopy(tier)
    tier_out = IntervalTier()
    # Built once instead of once per interval.
    valid_phones = set(ARPA_PHONEMES) | {"err"}

    for each_interval in tier.intervals:
        original_mark = each_interval.mark
        if mode == "NormalizePhoneCanonical":
            # Only keep the canonical pronunciation.
            p = normalize_phone(original_mark, True, True, keep_artificial_sil)
        elif mode == "NormalizePhonePerceived":
            # Only keep the perceived pronunciation.
            p = normalize_phone(original_mark, True, False, keep_artificial_sil)
        elif mode == "NormalizePhoneAnnotation":
            # Keep the annotations.
            p = normalize_phone(original_mark, False)
        else:
            # NOTE(review): normalize_word is not defined in the visible part of
            # this notebook — confirm it exists before using "NormalizeWord".
            p = normalize_word(original_mark)

        if p is None:
            # The phone does not exist on this side of the annotation; drop it.
            continue

        if mode in phone_modes:
            if p == 'ax':
                p = 'ah'
            # Only single-phoneme outputs can be checked against the inventory;
            # annotation strings and words are not phoneme symbols. An explicit
            # error replaces the former `assert ..., pdb.set_trace()`, which
            # relied on a module that is never imported.
            if p not in valid_phones:
                raise ValueError(
                    f"Phoneme {p!r} (from mark {original_mark!r}) is not in ARPA_PHONEMES."
                )

        each_interval.mark = p
        tier_out.addInterval(each_interval)
    return tier_out


In [None]:
def tier_to_list(tier):
    """Collect the `mark` of every interval in `tier`, in iteration order."""
    marks = []
    for interval in tier:
        marks.append(interval.mark)
    return marks

In [None]:
def remove_repetitive_sil(phone_list):
    """Collapse every run of consecutive "sil" entries into a single "sil".

    A "sil" is dropped exactly when the entry right after it is also "sil",
    so the last "sil" of each run is the one that survives, e.g.
        ["a", "sil", "sil", "sil", "b"]  ->  ["a", "sil", "b"]

    Args:
        phone_list: Sequence of phoneme labels.
    Returns:
        A new list without the repeated silences.
    """
    last_index = len(phone_list) - 1
    kept = []
    for idx, phone in enumerate(phone_list):
        followed_by_sil = idx < last_index and phone_list[idx + 1] == "sil"
        if phone == "sil" and followed_by_sil:
            continue
        kept.append(phone)
    return kept

In [None]:
def get_phonemes(tg, keep_artificial_sil=False, rm_repetitive_sil=True):
    """Extract the canonical and perceived phoneme strings from a TextGrid.

    Args:
        tg: Object providing `getFirst("phones")`, which yields the phone tier.
        keep_artificial_sil: Forwarded to normalize_tier_mark; when true the two
            sequences are asserted to have the same length.
        rm_repetitive_sil: When true, both sequences are passed through
            remove_repetitive_sil.
    Returns:
        Tuple `(canonical, perceived)`, each a space-joined phoneme string.
    """
    phones = tg.getFirst("phones")
    perceived_tier = normalize_tier_mark(phones, "NormalizePhonePerceived", keep_artificial_sil)
    canonical_tier = normalize_tier_mark(phones, "NormalizePhoneCanonical", keep_artificial_sil)
    canonical_seq = tier_to_list(canonical_tier)
    perceived_seq = tier_to_list(perceived_tier)

    if keep_artificial_sil:
        # With the artificial sils preserved, canonical and perceived phones
        # should line up one-to-one.
        assert len(canonical_seq) == len(perceived_seq)

    if rm_repetitive_sil:
        canonical_seq = remove_repetitive_sil(canonical_seq)
        perceived_seq = remove_repetitive_sil(perceived_seq)

    canonical_text = " ".join(canonical_seq)
    perceived_text = " ".join(perceived_seq)
    return canonical_text, perceived_text


In [None]:
def process_annotation_data(tg, wav_file, text_file, spkr):
    """Build the CSV row describing one annotated utterance.

    Args:
        tg: TextGrid-like object passed to get_phonemes.
        wav_file: Path of the utterance audio; also used as the row ID.
        text_file: Path of the transcript; only its first line is used.
        spkr: Speaker identifier stored in the row.
    Returns:
        Dict with the keys "ID", "wav", "duration", "spk_id",
        "canonical_aligned", "perceived_aligned", "perceived_train_target"
        and "wrd".
    """
    # Reading the signal (to retrieve duration in seconds).
    # NOTE(review): the duration is only correct if the file really is sampled at
    # AUDIO_SAMPLE_RATE — confirm for the dataset in use.
    signal = read_audio(wav_file)
    duration = len(signal) / AUDIO_SAMPLE_RATE

    ## To keep original human annotation, set `keep_artificial_sil=True`, `rm_repetitive_sil=False`;
    ## this preserves the original alignment within the annotations.
    cano_phns_align, perc_phns_align = get_phonemes(tg, keep_artificial_sil=True, rm_repetitive_sil=False)
    ## To get training target phones, set `keep_artificial_sil=False`, `rm_repetitive_sil=True`;
    ## this drops the artificial and repetitive sil from the perceived phones.
    _, target_phns = get_phonemes(tg, keep_artificial_sil=False, rm_repetitive_sil=True)

    with open(text_file, "r") as reader:
        # readline() keeps the line terminator; strip it (and surrounding
        # whitespace) so the CSV "wrd" field does not contain an embedded newline.
        text = reader.readline().strip()

    return {
        "ID": wav_file,
        "wav": wav_file,
        "duration": duration,
        "spk_id": spkr,
        "canonical_aligned": cano_phns_align,
        "perceived_aligned": perc_phns_align,
        "perceived_train_target": target_phns,
        "wrd": text,
    }





In [None]:
def create_csv(base_dir, output_csv):
    """Write one CSV row per readable TextGrid annotation of every speaker.

    For each speaker in the module-level `speaker_ids`, the annotations in
    `<base_dir>/<speaker>/annotation/*.TextGrid` are paired with the wav and
    transcript files of the same base name and turned into rows by
    process_annotation_data.

    Args:
        base_dir: Dataset root containing one folder per speaker.
        output_csv: Path of the CSV file to (over)write.
    """
    print(f"Creating {output_csv}")
    # Make sure the destination folder exists so opening the file cannot fail on it.
    out_dir = os.path.dirname(output_csv)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    fieldnames = ["ID", "wav", "duration", "spk_id","canonical_aligned",
                  "perceived_aligned", "perceived_train_target", "wrd"]
    with open(output_csv, mode='w', newline="") as csv_f:
        csv_writer = csv.DictWriter(csv_f, fieldnames=fieldnames, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL)
        csv_writer.writeheader()

        for spkr in speaker_ids:
            wav_dir = os.path.join(base_dir, spkr, 'wav')
            annotation_dir = os.path.join(base_dir, spkr, 'annotation')
            transcript_dir = os.path.join(base_dir, spkr, 'transcript')

            # glob returns files in arbitrary order; sort for a reproducible CSV.
            for tg_file in sorted(glob(os.path.join(annotation_dir, "*.TextGrid"))):
                tg = TextGrid()
                try:
                    tg.read(tg_file)
                except ValueError as err:
                    # Still best-effort, but report the file instead of dropping it silently.
                    print(f"Skipping unreadable TextGrid {tg_file}: {err}")
                    continue
                # splitext removes only the final extension, so base names that
                # contain extra dots are not truncated.
                base_name = os.path.splitext(os.path.basename(tg_file))[0]
                wav_file = os.path.join(wav_dir, base_name + ".wav")
                text_file = os.path.join(transcript_dir, base_name + '.txt')
                row_data = process_annotation_data(tg, wav_file, text_file, spkr)
                csv_writer.writerow(row_data)
            print(f"Succescuffly created for {spkr}!!")







In [None]:
def prepare_l2arctic(base_dir, output_csv, skip_if_exists=False):
    """Prepare the dataset CSV by delegating to create_csv.

    Args:
        base_dir: Dataset root containing one folder per speaker.
        output_csv: Path of the CSV file to create.
        skip_if_exists: [optional] If true and `output_csv` already exists, keep
            the existing file and skip the preparation. Defaults to False, which
            always regenerates the CSV.
    """
    if skip_if_exists and os.path.exists(output_csv):
        print(f"CSV file '{output_csv}' already exists. Skipping data preparation.")
        return
    create_csv(base_dir, output_csv)





In [None]:
# Build the CSV at csv_data_path from the dataset under dataset_path.
prepare_l2arctic(base_dir=dataset_path,output_csv= csv_data_path)

Creating /content/drive/MyDrive/CS5647_Project/data/processed_data.csv
Succescuffly created for ASI!!
Succescuffly created for RRBI!!
Succescuffly created for SVBI!!
Succescuffly created for TNI!!
Succescuffly created for BWC!!
Succescuffly created for LXC!!
Succescuffly created for NCC!!
Succescuffly created for TXHC!!
