In [6]:
from pydub import AudioSegment
import os
from tqdm import tqdm
import glob
from collections import defaultdict

In [None]:
# Notebook configuration: edit these three locations before running the cells below.
# Root folder of the STM transcripts; the cells below expect the sub-folders
# comp-p/nl, comp-p/vl, comp-q/nl and comp-q/vl underneath it.
stm_path = 'path/to/your/data/main test/JA/Data/data/annot/text/stm'  # <-- set to your local copy
# Folder that receives everything this notebook writes.
out_path = './output_dir'
# Root folder of the WAV recordings (same comp-p / comp-q layout as the transcripts).
audio_path = 'path/to/your/data/main test/JA/Data/data/audio/wav'  # <-- set to your local copy

In [8]:
# Audio folders, one per component/language pair. The comp-p recordings are
# read from their "silenced" sub-folder; the comp-q recordings are read as-is.
audio_pnl, audio_pvl, audio_qnl, audio_qvl = (
    os.path.join(audio_path, subfolder)
    for subfolder in (
        'comp-p/nl/silenced',
        'comp-p/vl/silenced',
        'comp-q/nl',
        'comp-q/vl',
    )
)

audio_root_folders = [audio_pnl, audio_pvl, audio_qnl, audio_qvl]

# Transcript folders, in the same order as the audio folders above.
stm_pnl, stm_pvl, stm_qnl, stm_qvl = (
    os.path.join(stm_path, subfolder)
    for subfolder in (
        'comp-p/nl',
        'comp-p/vl',
        'comp-q/nl',
        'comp-q/vl',
    )
)

stm_root_folders = [stm_pnl, stm_pvl, stm_qnl, stm_qvl]

In [None]:
# Transcript sub-folder -> short code that is spliced into every utterance ID.
stm_folders_with_codes = {
    'comp-p/nl': 'pnl',
    'comp-p/vl': 'pvl',
    'comp-q/nl': 'qnl',
    'comp-q/vl': 'qvl',
}

for subfolder, code in stm_folders_with_codes.items():
    full_path = os.path.join(stm_path, subfolder)
    stm_files = glob.glob(os.path.join(full_path, '*.stm'))

    for stm_file in stm_files:
        with open(stm_file, 'r', encoding='utf-8') as f:
            original_lines = f.readlines()

        updated_lines = []
        for line in original_lines:
            # Comment lines (leading ';') and blank lines are copied through untouched.
            keep_verbatim = line.startswith(';') or not line.strip()
            parts = [] if keep_verbatim else line.strip().split(maxsplit=6)

            if parts and parts[0].startswith('JA_'):
                # 'JA_xyz' becomes 'JA<code>_xyz'; the rebuilt line is joined
                # with single spaces and terminated with a newline.
                parts[0] = f'JA{code}{parts[0][2:]}'
                updated_lines.append(' '.join(parts) + '\n')
            else:
                # Anything whose ID does not start with 'JA_' is kept as it was.
                updated_lines.append(line)

        # Replace the file content in place with the rewritten lines.
        with open(stm_file, 'w', encoding='utf-8') as f:
            f.writelines(updated_lines)

print("STM files successfully updated.")

In [None]:
def _export_wav(audio, audiofile, out_path):
    """Write ``audio`` to ``<out_path>/<audiofile>.wav``; do nothing if there is no audio."""
    if audiofile != '' and audio is not None:
        audio.export(os.path.join(out_path, f"{audiofile}.wav"), format='wav')


def _blank_out(audio, start_sec, end_sec):
    """Return ``audio`` with the span [start_sec, end_sec) replaced by silence of equal length.

    The span is rounded to whole milliseconds and clamped to the recording, so
    the result always has the same duration as the input. An empty or inverted
    span leaves the audio untouched.
    """
    total_ms = len(audio)
    start_ms = min(max(int(round(start_sec * 1000)), 0), total_ms)
    end_ms = min(int(round(end_sec * 1000)), total_ms)
    if end_ms <= start_ms:
        # Splicing audio[:start] + audio[end:] here would duplicate samples.
        return audio
    # Generate the silence at the recording's own frame rate so that
    # concatenation does not have to resample anything.
    silence = AudioSegment.silent(duration=end_ms - start_ms, frame_rate=audio.frame_rate)
    return audio[:start_ms] + silence + audio[end_ms:]


def silence_machine_speaker(stm_audio_pairs, out_path):
    """Silence every 'inter_segment_gap' span of the recordings referenced by STM files.

    For each ``(stm_folder, audio_folder)`` pair, every ``*.stm`` file in
    ``stm_folder`` is read. Lines starting with ';', blank lines and lines with
    fewer than five whitespace-separated fields are ignored. On the remaining
    lines, field 0 is the utterance ID (the part before the first ``'_u'``
    names the recording ``<name>.wav`` in ``audio_folder``), field 2 is the
    speaker and fields 3 and 4 are start and end times in seconds.

    Only the first channel of each recording is kept. Spans whose speaker field
    equals ``'inter_segment_gap'`` are replaced by silence, and the result is
    written to ``<out_path>/<name>.wav``.

    Args:
        stm_audio_pairs: iterable of ``(stm_folder, audio_folder)`` tuples.
        out_path: output directory; created if it does not exist.
    """
    os.makedirs(out_path, exist_ok=True)

    for stm_folder, audio_folder in stm_audio_pairs:
        stm_files = glob.glob(os.path.join(stm_folder, '*.stm'))
        print(f"Found {len(stm_files)} STM files in {stm_folder}")

        for stm_file in tqdm(stm_files, desc="Processing STM files"):
            with open(stm_file, 'r', encoding='utf-8') as file:
                lines = file.readlines()

            audiofile = ''
            audio = None

            for line in lines:
                if line.startswith(';') or not line.strip():
                    continue

                segments = line.strip().split()
                if len(segments) < 5:
                    continue

                current_audiofile = segments[0].split('_u')[0]

                if current_audiofile != audiofile:
                    # A new recording starts: flush the previous one first.
                    _export_wav(audio, audiofile, out_path)

                    audiofile = current_audiofile
                    source_wav = os.path.join(audio_folder, f"{audiofile}.wav")
                    if not os.path.exists(source_wav):
                        print(f"Warning: audio file not found, skipping: {source_wav}")
                        audio = None
                        continue

                    # Keep only the first channel of the recording.
                    audio = AudioSegment.from_wav(source_wav).split_to_mono()[0]

                if segments[2] == 'inter_segment_gap' and audio is not None:
                    audio = _blank_out(audio, float(segments[3]), float(segments[4]))

            # Flush the last recording referenced by this STM file.
            _export_wav(audio, audiofile, out_path)

# Only the comp-p transcript/audio folders are silenced here.
# NOTE(review): audio_pnl / audio_pvl point at the '.../silenced' sub-folders,
# so this step reads its input from there and writes to out_path. Confirm that
# is the intended source on a fresh run.
stm_audio_pairs = list(zip((stm_pnl, stm_pvl), (audio_pnl, audio_pvl)))

silence_machine_speaker(stm_audio_pairs, out_path)

Found 300 STM files in C:/Users/Topicus/Documents/Datasets/main test/JA/Data/data/annot/text/stm\comp-p/nl


Processing STM files: 100%|██████████| 300/300 [06:45<00:00,  1.35s/it]


Found 193 STM files in C:/Users/Topicus/Documents/Datasets/main test/JA/Data/data/annot/text/stm\comp-p/vl


Processing STM files: 100%|██████████| 193/193 [03:18<00:00,  1.03s/it]


In [None]:
def rename_stm_files(stm_root_folders):
    """Rename every ``*.stm`` file to ``JA<p|q><folder>_<id>.stm`` in place.

    Each folder is expected to look like ``.../comp-p/nl`` or ``.../comp-q/vl``.
    The component letter ('p' or 'q') comes from the ``comp-*`` path component
    and the folder code from the last path component (dashes removed,
    lower-cased). ``<id>`` is the second '_'-separated token of the original
    file name, e.g. ``JA_000123.stm`` in ``comp-p/nl`` becomes
    ``JApnl_000123.stm``.

    Folders that are not below a ``comp-p``/``comp-q`` component are skipped.
    Files without an underscore in their name are skipped with a message, as
    are files whose target name already exists. A file that already carries its
    target name is left alone, so the function can be re-run safely.

    Args:
        stm_root_folders: iterable of folder paths containing STM files.
    """
    for folder in stm_root_folders:
        # Normalise separators first: os.path.join(root, 'comp-p/nl') mixes '\\'
        # and '/' on Windows, and splitting on os.sep alone gives different
        # components on each platform.
        normalized = os.path.normpath(folder)
        leaf_name = os.path.basename(normalized)
        parent_name = os.path.basename(os.path.dirname(normalized))

        prefix = None
        for component in (parent_name, leaf_name):
            if component.startswith('comp-p'):
                prefix = 'p'
                break
            if component.startswith('comp-q'):
                prefix = 'q'
                break
        if prefix is None:
            continue

        folder_code = leaf_name.replace('-', '').lower()  # e.g. 'nl', 'vl'

        for stm_file in glob.glob(os.path.join(folder, '*.stm')):
            stem = os.path.splitext(os.path.basename(stm_file))[0]
            stem_parts = stem.split('_')
            if len(stem_parts) < 2:
                print(f"Skipping {stm_file}: file name has no '_' separated id")
                continue

            new_filename = f"JA{prefix}{folder_code}_{stem_parts[1]}.stm"
            new_filepath = os.path.join(folder, new_filename)

            if new_filepath == stm_file:
                # Already renamed by an earlier run.
                continue
            if os.path.exists(new_filepath):
                # os.rename would silently replace the existing file on POSIX.
                print(f"Skipping {stm_file}: {new_filepath} already exists")
                continue

            os.rename(stm_file, new_filepath)
            print(f"Renamed {stm_file} to {new_filepath}")

# Rename the STM files of all transcript folders on disk (files are moved in place).
rename_stm_files(stm_root_folders)

In [None]:
def update_stm_filenames(root_folders):
    """Rewrite the utterance IDs inside every ``*.stm`` file of ``root_folders``.

    Each segment line (at least six whitespace-separated fields: id, channel,
    speaker, start, end, label, optionally followed by the text) gets the new
    ID ``JA_<file name without extension>_u<index>``, where ``<index>`` counts
    the segment lines of that file from 0. Lines whose speaker field is
    ``'inter_segment_gap'`` additionally get the suffix ``_SIL``.

    Comment lines (starting with ';'), blank lines and lines with fewer than
    six fields are not segments: they are left out of the rewritten file and do
    not consume an index. The files are overwritten in place.

    Args:
        root_folders: iterable of folder paths containing STM files.
    """
    for folder in root_folders:
        stm_files = glob.glob(os.path.join(folder, '*.stm'))

        for stm_file in stm_files:
            with open(stm_file, 'r', encoding='utf-8') as f:
                lines = f.readlines()

            new_lines = []
            base_filename = os.path.splitext(os.path.basename(stm_file))[0]

            for line in lines:
                stripped = line.strip()
                # A long comment would otherwise be parsed as a segment and
                # turned into a bogus utterance that shifts every later index.
                if stripped.startswith(';'):
                    continue

                parts = stripped.split(maxsplit=6)
                if len(parts) < 6:
                    continue

                _, channel, speaker, start, end, label = parts[:6]
                text = parts[6] if len(parts) == 7 else ""

                # The index is the number of segments written so far.
                new_id = f"JA_{base_filename}_u{len(new_lines)}"
                if speaker == 'inter_segment_gap':
                    new_id += '_SIL'

                # rstrip removes the trailing space left by an empty text field.
                new_lines.append(
                    f"{new_id} {channel} {speaker} {start} {end} {label} {text}".rstrip()
                )

            with open(stm_file, 'w', encoding='utf-8') as f:
                for line in new_lines:
                    f.write(line + '\n')

            idx = len(new_lines)
            print(f"Updated {stm_file} with {idx} segments.")
            
# Rewrite the utterance IDs inside the STM files of all transcript folders (files are overwritten).
update_stm_filenames(stm_root_folders)

In [9]:
def get_utterances_stm(stm_path):
    """Parse one STM file into a list of utterance dictionaries.

    A segment line has at least six whitespace-separated fields (id, channel,
    speaker, start, end, label); anything after the sixth field is the text.
    Blank lines, comment lines (starting with ';') and lines with fewer than
    six fields are ignored.

    Args:
        stm_path: path of the STM file to read (UTF-8).

    Returns:
        A list of dicts with the keys ``'id'``, ``'text'``, ``'from'``,
        ``'to'`` (start and end as floats) and ``'speaker'``, in file order.

    Raises:
        ValueError: if the start or end field of a segment line is not a number.
    """
    utterances = []
    with open(stm_path, 'r', encoding='utf-8') as f:
        for line in f:
            stripped = line.strip()
            # Skip comments explicitly: a comment with six or more words would
            # otherwise reach float() below and abort the whole file.
            if not stripped or stripped.startswith(';'):
                continue
            parts = stripped.split(maxsplit=6)
            if len(parts) < 6:
                continue
            file_id, _, speaker, start, end, _ = parts[:6]
            text = parts[6] if len(parts) > 6 else ''
            utterances.append({
                'id': file_id,
                'text': text,
                'from': float(start),
                'to': float(end),
                'speaker': speaker
            })
    return utterances

# Most recently decoded recording. Consecutive utterances cut from the same
# file reuse it instead of re-reading and re-sampling the whole recording.
_decoded_audio_cache = {'key': None, 'audio': None}


def get_partial_audio(audio_file_path, from_sec, to_sec):
    """Return the span [from_sec, to_sec) of a WAV file as mono, 16 kHz audio.

    The recording is decoded and converted once and then kept until a
    different file (or a modified version of the same file) is requested.

    Args:
        audio_file_path: path of the WAV file.
        from_sec: start of the span in seconds.
        to_sec: end of the span in seconds.

    Returns:
        The requested slice of the converted recording.
    """
    try:
        # Include the modification time so that a file rewritten between two
        # calls (e.g. by re-running an earlier cell) is decoded again.
        cache_key = (audio_file_path, os.path.getmtime(audio_file_path))
    except (TypeError, OSError):
        # Not a stat-able path: decode without caching and let the loader
        # report any problem itself.
        cache_key = None

    if cache_key is None or _decoded_audio_cache['key'] != cache_key:
        audio = AudioSegment.from_wav(audio_file_path)
        _decoded_audio_cache['audio'] = audio.set_channels(1).set_frame_rate(16000)
        _decoded_audio_cache['key'] = cache_key

    audio = _decoded_audio_cache['audio']
    # Slice positions are expressed in milliseconds.
    segment = audio[from_sec * 1000 : (to_sec * 1000)]
    return segment

def write_audio(file_path, audio_segment):
    """Save ``audio_segment`` to ``file_path`` by calling its ``export`` method with the WAV format."""
    export_format = "wav"
    audio_segment.export(file_path, format=export_format)

def collect_audio_and_transcripts(audio_root_folders, stm_root_folders):
    """List the ``*.wav`` and ``*.stm`` files found directly inside the given folders.

    Args:
        audio_root_folders: folders searched (non-recursively) for ``*.wav`` files.
        stm_root_folders: folders searched (non-recursively) for ``*.stm`` files.

    Returns:
        A tuple ``(audio_files, transcript_files)`` of two path lists, each
        following the order of the folders it was collected from.
    """
    def _matches(roots, pattern):
        # Concatenate the glob results of every root, keeping the root order.
        found = []
        for root in roots:
            found += glob.glob(os.path.join(root, pattern))
        return found

    wav_paths = _matches(audio_root_folders, '*.wav')
    stm_paths = _matches(stm_root_folders, '*.stm')
    return wav_paths, stm_paths

In [None]:
# Gather every recording and transcript from the configured folders.
audio_files, transcript_files = collect_audio_and_transcripts(audio_root_folders, stm_root_folders)

# Transcript path keyed by file name without directory and extension.
stm_lookup = {
    os.path.splitext(os.path.basename(p))[0]: p
    for p in transcript_files
}

# Utterance ID ('<recording>_u<index>') -> everything needed to cut and label it.
all_utterances = {}

for af in audio_files:
    c_id = os.path.splitext(os.path.basename(af))[0]
    single_transcript_path = stm_lookup.get(c_id)

    if single_transcript_path is not None:
        utterances = get_utterances_stm(single_transcript_path)

        for idx, u in enumerate(utterances):
            utt_id = f"{c_id}_u{idx}"
            all_utterances[utt_id] = dict(
                file_id=u['id'],
                text=u['text'],
                audio_file=af,
                from_sec=u['from'],
                to_sec=u['to'],
                speaker=u['speaker'],
            )
    else:
        # A recording without a transcript of the same name is reported and skipped.
        print(f"Warning: No STM file found for {c_id}")

print('Writing individual utterance audio files...')
utterance_audio_folder = os.path.join(out_path, 'audio_utterances')
os.makedirs(utterance_audio_folder, exist_ok=True)

for utt_id, utt in tqdm(all_utterances.items()):
    utt_audio = get_partial_audio(utt['audio_file'], utt['from_sec'], utt['to_sec'])
    # Gap segments are marked with a '_SIL' suffix in their file name.
    final_utt_id = f'{utt_id}_SIL' if utt['speaker'] == 'inter_segment_gap' else utt_id
    write_audio(os.path.join(utterance_audio_folder, f'{final_utt_id}.wav'), utt_audio)

print('Writing reference transcript file...')
reference_transcript_path = os.path.join(out_path, 'JA_reference.stm')
with open(reference_transcript_path, 'w', encoding='utf-8') as f:
    for utt_id, utt in all_utterances.items():
        if len(utt['text']) == 0:
            # Utterances without text are written in the parenthesised gap form.
            # NOTE(review): this form always names 'inter_segment_gap', whatever
            # the utterance's own speaker field says. Confirm that is intended.
            ref_line = f"({utt['file_id']} 1 'inter_segment_gap' {utt['from_sec']} {utt['to_sec']} )\n"
        else:
            ref_line = f"{utt['file_id']} 1 {utt['speaker']} {utt['from_sec']} {utt['to_sec']} <o,f0,unknown> {utt['text']}\n"
        f.write(ref_line)
print('Done!')

Writing reference transcript file...
Done!
