### text preprocessing: 
* lowercase all words 
* remove special characters 
* remove line breaks 
* expand all numbers and other characters into words, e.g. 18% -> muoi tam phan tram; 20/10/2024 -> ngay hai muoi thang muoi nam hai ngan khong tram hai muoi bon

In [14]:
import re
import os
from vietnam_number import n2w

import unicodedata

# Specify the path to the input text file
input_file_path = "./original_transcript/recording1.txt"

# Extract the directory and base filename without extension
directory, filename = os.path.split(input_file_path)
base_filename, _ = os.path.splitext(filename)

# Specify the path for the new processed text file (same base name, .txt)
output_directory = "./original_transcript/processed_transcript"
output_file_path = os.path.join(output_directory, f"{base_filename}.txt")

# Read the contents of the text file
with open(input_file_path, 'r', encoding='utf-8') as file:
    input_text = file.read()

# Compose every "base letter + combining tone mark" sequence into a single
# precomposed code point (NFC). The whitelist in Task 2 only lists precomposed
# Vietnamese letters, so decomposed (NFD) input would otherwise have its tone
# marks replaced by spaces, splitting words apart. NFC input is left unchanged.
normalized_text = unicodedata.normalize('NFC', input_text)

# Task 1: Lowercase all words
lowercased_text = normalized_text.lower()

# Task 2: Replace every run of characters that is not a letter, a digit,
# '%' or '/' with a single space ('%' and '/' are kept for the percentage and
# date expansion performed later). Vietnamese letters with tone marks are
# preserved.
non_letter_pattern = r'[^a-zA-Z0-9%\/àáảãạâầấẩẫậăằắẳẵặèéẻẽẹêềếểễệđìíỉĩịòóỏõọôồốổỗộơờớởỡợùúủũụưừứửữựỳýỷỹỵ]+'
letters_and_numbers_text = re.sub(non_letter_pattern, ' ', lowercased_text)

# Task 3: Remove all line breaks. Task 2 already turns line breaks into
# spaces (they are not in the whitelist); this is kept as an explicit safeguard.
cleaned_text = re.sub(r'\n', ' ', letters_and_numbers_text)

# Spoken month names, indexed by (month number - 1)
MONTH_NAMES = [
    'tháng một', 'tháng hai', 'tháng ba', 'tháng tư', 'tháng năm', 'tháng sáu',
    'tháng bảy', 'tháng tám', 'tháng chín', 'tháng mười', 'tháng mười một', 'tháng mười hai'
]


def replace_numbers(match):
    """Return the written-out form of the matched run of digits (via n2w)."""
    number = match.group(0)
    return n2w(number)


def replace_date(match):
    """Return the spoken form of a matched ``day/month/year`` date.

    Args:
        match: regex match whose full text has the shape ``d/m/yyyy``.

    Returns:
        ``'ngày <day> <month name> năm <year>'``. When the month is not in
        1..12 the matched text is returned unchanged, so the caller's later
        number pass still expands its digits one by one.
    """
    date = match.group(0)
    day, month, year = date.split('/')

    # Guard the table lookup: without it month 0 would silently index the last
    # entry (December) and a month above 12 would raise IndexError.
    month_number = int(month)
    if not 1 <= month_number <= len(MONTH_NAMES):
        return date

    # NOTE(review): day and year are passed to n2w exactly as matched,
    # including any leading zero (e.g. "05") - confirm n2w reads those as intended.
    day_text = n2w(day)
    month_text = MONTH_NAMES[month_number - 1]
    year_text = 'năm ' + n2w(year)

    return f'ngày {day_text} {month_text} {year_text}'


# Task 4: Convert dates to their spoken form.
# This must run BEFORE the generic number expansion: once the digits have been
# spelled out, date_pattern (which looks for digits) can no longer match.
date_pattern = r'\b\d{1,2}/\d{1,2}/\d{4}\b'
expanded_text = re.sub(date_pattern, replace_date, cleaned_text)

# Task 5: Replace the remaining standalone numbers with their full form
expanded_text = re.sub(r'\b\d+\b', replace_numbers, expanded_text)

# Task 6: Spell out "%" as " phần trăm" (the leading space separates it from
# the number in front of it)
expanded_text = re.sub(r'%', r' phần trăm', expanded_text)

# Save the processed text into a new file, creating the target folder first
# so that the write does not fail on a fresh checkout
os.makedirs(os.path.dirname(output_file_path) or '.', exist_ok=True)
with open(output_file_path, 'w', encoding='utf-8') as file:
    file.write(expanded_text)


### part 1: generate a TextGrid whose speech segments carry only a placeholder transcription (the main purpose is to split the original wav file into smaller speech segments; aligning audio and transcription is not important at this step)
* decreasing the energy threshold makes the resulting speech segments longer

In [16]:
from praatio import tgio
import soundfile as sf
from auditok import split
import os

# Recording to segment; the TextGrid file name is derived from it
wav_path = "./speech_corpus/recording1.wav"
recording_name = os.path.splitext(os.path.basename(wav_path))[0]

# wav: wave signal (a table with all the frames extracted from the signal), sr: sample rate
wav, sr = sf.read(wav_path)

# Split the recording into speech segments. The default energy_threshold is 50;
# DECREASING it makes the resulting speech segments longer.
energy_threshold = 10
region = split(wav_path, energy_threshold=energy_threshold)

# Duration in seconds
duration = len(wav)/sr

# Folder that receives the segmentation TextGrid. Part 2 loads it from
# ./split_textgrid, so it is written there rather than next to the wav file.
output_directory = "./split_textgrid"
os.makedirs(output_directory, exist_ok=True)

# Initialize blank Textgrid object
tg = tgio.Textgrid()

# The region yields one element per detected speech segment; each becomes an
# interval labelled with the placeholder "*" (the real text is filled in later)
entryList = [tgio.Interval(elt.meta.start, elt.meta.end, "*") for elt in region]

# "str": name of the IntervalTier; 0 - duration: covers the entire audio
tier = tgio.IntervalTier("str", entryList, 0, duration)
tg.addTier(tier)
output_textgrid = os.path.join(output_directory, f"{recording_name}.TextGrid")
tg.save(output_textgrid)


### part 2: the Montreal Forced Aligner (MFA) is run on the original, long .wav file together with its original, long transcription file; using MFA's word-level TextGrid, we populate the transcription of each speech segment in the TextGrid created in part 1

In [17]:
from praatio import tgio
import os

# Define the path to the input TextGrid (word-level alignment produced by MFA)
input_textgrid_path = "./mfa_aligned_textgrid/recording1.TextGrid"

# Load the TextGrid with individual words
word_tg = tgio.openTextgrid(input_textgrid_path)

# Path to the segmented speech TextGrid created in part 1
speech_textgrid_path = "./split_textgrid/recording1.TextGrid"

# Load the TextGrid with segmented speech regions
speech_tg = tgio.openTextgrid(speech_textgrid_path)

# Extract the base filename (without extension) from the input TextGrid path
base_filename = os.path.splitext(os.path.basename(input_textgrid_path))[0]

# Output folder for the combined TextGrid; create it if it doesn't exist
output_folder = "./combined_textgrid/"
os.makedirs(output_folder, exist_ok=True)

# The combined TextGrid keeps the same filename as the input TextGrid
output_textgrid_path = os.path.join(output_folder, f"{base_filename}.TextGrid")

# Create a new TextGrid to store the combined information
combined_tg = tgio.Textgrid()

# Copy the minimum and maximum timestamps from the segmented speech TextGrid
combined_tg.minTimestamp = speech_tg.minTimestamp
combined_tg.maxTimestamp = speech_tg.maxTimestamp

# Convert the word timestamps once instead of once per speech segment, and
# drop blank labels so they cannot add stray spaces to a transcription
word_entries = [
    (float(word_interval[0]), float(word_interval[1]), word_interval[2])
    for word_interval in word_tg.tierDict["words"].entryList
    if word_interval[2].strip()
]

# Initialize a list to store the entries for the new TextGrid
new_entries = []

# Iterate through intervals in the segmented speech TextGrid
for speech_interval in speech_tg.tierDict["str"].entryList:
    speech_start = speech_interval[0]
    speech_end = speech_interval[1]

    # Convert speech_start and speech_end to float for the comparisons
    speech_start_float = float(speech_start)
    speech_end_float = float(speech_end)

    # A word belongs to the segment when the two intervals share some audio.
    # The comparison is strict: a word that merely touches the segment
    # boundary (zero-length overlap) is not part of the segment.
    words_in_segment = [
        label
        for word_start, word_end, label in word_entries
        if word_start < speech_end_float and word_end > speech_start_float
    ]

    # Combine words into a single string
    combined_words = " ".join(words_in_segment)

    # Create a new entry for the combined TextGrid
    new_entries.append(tgio.Interval(speech_start, speech_end, combined_words))

# Create a new tier in the combined TextGrid
combined_tier = tgio.IntervalTier("speech_segments", new_entries, combined_tg.minTimestamp, combined_tg.maxTimestamp)

# Add the new tier to the combined TextGrid
combined_tg.addTier(combined_tier)

# Save the combined TextGrid with the same filename as the input TextGrid
combined_tg.save(output_textgrid_path)


### part 3: split into .wav files and their corresponding .txt files

In [18]:
from praatio import tgio
import soundfile as sf
import os

# Locations of the combined TextGrid and of the recording it annotates
textgrid_path = "./combined_textgrid/recording1.TextGrid"
wav_path = "./speech_corpus/recording1.wav"

# The recording's name without extension prefixes every output file
base_filename = os.path.splitext(os.path.basename(wav_path))[0]

# All segments of one recording are written to ./asr_corpus/<recording name>/
output_folder = os.path.join("./asr_corpus", base_filename)
os.makedirs(output_folder, exist_ok=True)

# Read the annotations, then the audio samples and their sample rate
textgrid = tgio.openTextgrid(textgrid_path)
wav_data, sample_rate = sf.read(wav_path)

# Running index for the output files; advanced only when a segment is written
segment_counter = 1

# Walk every interval of every tier in the TextGrid
for tier_name in textgrid.tierNameList:
    tier = textgrid.tierDict[tier_name]

    for interval in tier.entryList:
        start_time, end_time, transcription = interval[0], interval[1], interval[2]

        # Skip intervals without any transcription
        if not transcription.strip():
            continue
        # Skip intervals shorter than 2 seconds
        if not (end_time - start_time >= 2.0):
            continue

        # Cut the matching samples out of the recording
        first_sample = int(start_time * sample_rate)
        last_sample = int(end_time * sample_rate)
        segment = wav_data[first_sample:last_sample]

        # Audio and text of a segment share the same numbered file name
        wav_filename = os.path.join(output_folder, f"{base_filename}_segment{segment_counter}.wav")
        txt_filename = os.path.join(output_folder, f"{base_filename}_segment{segment_counter}.txt")

        # Write the audio first, then its transcription
        sf.write(wav_filename, segment, sample_rate)
        with open(txt_filename, "w", encoding="utf-8") as txt_file:
            txt_file.write(transcription)

        segment_counter += 1
