In [42]:
import soundfile as sf
import audiomentations as A
from pathlib import Path
import math

def read_words(path, convert_to_seconds=True, sample_rate=16000):
    words = []
    with open(path) as f:
        for line in f:
            if line == '':
                break
            words.append({
                'word': line.split()[-1].strip(),
                'start': line.split()[0],
                'end': line.split()[1]
            })
    if convert_to_seconds:
        for word in words:
            word['start'] = float(word['start']) / sample_rate
            word['end'] = float(word['end']) / sample_rate
    return words

def middle_words_blankout_augment(audio, sample_rate, words, n_words=1):
    audio = audio.copy()
    middle = len(words) // 2
    start_word = words[middle - math.floor(n_words / 2)]
    end_word = words[middle + math.ceil(n_words / 2) - 1]
    print(f"Blanking out words `{start_word['word']}` to `{end_word['word']}`")
    start = int(start_word['start'] * sample_rate)
    end = int(end_word['end'] * sample_rate)
    audio[start:end] = 0
    return audio

def distort_audio(input_path):
    words = read_words(input_path.replace('.WAV', '.WRD'))
    
    # Load audio file
    audio, sr = sf.read(input_path)
    
    # Apply augmentations
    augmented_audio = middle_words_blankout_augment(audio, sr, words, n_words=3)
    
    # Save augmented audio
    output_path = Path(input_path).with_name(f"{Path(input_path).stem}-aug.wav")
    sf.write(output_path, augmented_audio, samplerate=sr)
    
    return output_path

# Example usage
input_path = "../TIMIT/TRAIN/DR1/FCJF0/SA1.WAV"
output_path = distort_audio(input_path)
print(f"Augmented audio saved to {output_path}")


Blanking out words `suit` to `greasy`
Augmented audio saved to ../TIMIT/TRAIN/DR1/FCJF0/SA1-aug.wav


In [47]:
from pathlib import Path

folder = Path("../TIMIT/TRAIN/DR1/FCJF0/")
for file in folder.glob("*.WAV"):
    distort_audio(str(file))

Blanking out words `one` to `forward`
Blanking out words `from` to `and`
Blanking out words `meeting` to `now`
Blanking out words `to` to `movies`
Blanking out words `suit` to `greasy`
Blanking out words `carry` to `oily`
Blanking out words `permanent` to `their`
Blanking out words `equipment` to `proper`
Blanking out words `her` to `one`
Blanking out words `had` to `mean`


In [82]:
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from matplotlib.backends.backend_pdf import PdfPages
import os
import whisper
import abc
import numpy as np
import audiomentations as A
from copy import deepcopy

def distort_audio(audio):
    # Define augmentations
    augmentations = A.Compose([
        A.AddGaussianNoise(max_amplitude=0.01, p=1),
        A.ClippingDistortion(p=1),
        A.Gain(p=1),
        A.PeakingFilter(p=1)
    ])
    # Apply augmentations
    augmented_audio = augmentations(samples=audio, sample_rate=16000)
    return augmented_audio

class Augmentation(abc.ABC):

    def __call__(self, audio, sample_rate, transcript):
        return self.call(audio, sample_rate, transcript)

    @abc.abstractmethod
    def call(self, audio, sample_rate, transcript):
        pass

class MiddleWordsBlankoutAugmentation(Augmentation):

    def __init__(self, n_words=1):
        self.n_words = n_words

    def call(self, audio, sample_rate, transcript):
        audio = audio.copy()
        middle = len(transcript) // 2
        start_word = transcript[middle - math.floor(self.n_words / 2)]
        end_word = transcript[middle + math.ceil(self.n_words / 2)]
        start = int(start_word['start'] * sample_rate)
        end = int(end_word['end'] * sample_rate)
        audio[start:end] = 0
        augmented_transcript = transcript[:middle - math.floor(self.n_words / 2)] + \
                                 transcript[middle + math.ceil(self.n_words / 2):]
        return audio, augmented_transcript

class MiddleWordsDistortAugmentation(Augmentation):

    def __init__(self, n_words=1):
        self.n_words = n_words

    def call(self, audio, sample_rate, transcript):
        audio = audio.copy()
        transcript = deepcopy(transcript)
        middle = len(transcript) // 2
        start_word = transcript[middle - math.floor(self.n_words / 2)]
        end_word = transcript[middle + math.ceil(self.n_words / 2)]
        start = int(start_word['start'] * sample_rate)
        end = int(end_word['end'] * sample_rate)
        for i in range(middle-math.floor(self.n_words / 2), middle+math.ceil(self.n_words / 2)):
            transcript[i]['word'] = '#' + transcript[i]['word']
        audio[start:end] = distort_audio(audio[start:end])
        return audio, transcript


def augment_and_transcribe(file_path, model, augmentation=None, gt_transcript=None):
    audio, sr = sf.read(file_path)
    if augmentation:
        audio, gt_transcript = augmentation(audio, sr, gt_transcript)
    result = model.transcribe(np.array(audio, dtype=np.float32), word_timestamps=True)
    return result['segments'][0]['words'], gt_transcript

def plot_transcript(transcript, ax, y_pos, label):
    for word_info in transcript:
        start = word_info['start']
        end = word_info['end']
        word = word_info['word']
        duration = end - start
        rect = patches.Rectangle((start, y_pos), duration, 0.25, edgecolor='black', facecolor='skyblue')
        ax.add_patch(rect)
        ax.text(start + duration / 2, y_pos + 0.2, word, ha='center', va='center', fontsize=8)
    ax.text(0, y_pos + 0.2, label, ha='right', va='center', fontsize=10, fontweight='bold')

def visualize_transcripts(transcripts, labels):
    fig, ax = plt.subplots(figsize=(15, len(transcripts)))
    for i, transcript, label in zip(range(len(transcripts)), transcripts, labels):
        plot_transcript(transcript, ax, i / len(transcripts), label)
    max_duration = max([t[-1]['end'] for t in transcripts])
    ax.set_xlim([0, max_duration])
    ax.set_yticks([])
    ax.set_xlabel('Time (s)')
    ax.set_title('Transcripts Comparison')
    return fig

def generate_visualizations(directory_path, augmentation=None, model_name='tiny.en'):
    model = whisper.load_model(model_name)
    transcripts = []
    labels = []
    file_names = [n for n in os.listdir(directory_path) if n.endswith('.WAV')]
    for i, file_name in enumerate(file_names):
        print(f'[{i+1}/{len(file_names)}] Processing {file_name}')
        file_path = os.path.join(directory_path, file_name)
        wrd_file_path = os.path.join(directory_path, file_name.replace('.WAV', '.WRD'))
        words = read_words(wrd_file_path)
        result, _ = augment_and_transcribe(file_path, model)
        result_aug, transcript_aug = augment_and_transcribe(file_path, model, augmentation, words)
        transcripts.append(result)
        transcripts.append(words)
        transcripts.append(result_aug)
        transcripts.append(transcript_aug)
        labels.append(file_name + ' (whisper)')
        labels.append(file_name.replace('.WAV', '.WRD') + ' (gt)')
        labels.append(file_name + ' (whisper augmented)')
        labels.append(file_name.replace('.WAV', '.WRD') + ' (gt aug)')
    print('Generating visualizations...')
    figs = []
    for i in range(0, len(transcripts), 4):
        fig = visualize_transcripts(transcripts[i:i+4], labels[i:i+4])
        figs.append(fig)
    with PdfPages(f'transcripts_comparison-{model_name}.pdf') as pdf:
        for fig in figs:
            pdf.savefig(fig)
    plt.close('all')

# AUG = MiddleWordsBlankoutAugmentation(n_words=3)
AUG = MiddleWordsDistortAugmentation(n_words=3)

generate_visualizations('../TIMIT/TRAIN/DR1/FCJF0/', AUG)

[1/10] Processing SI1027.WAV
[2/10] Processing SI1657.WAV
[3/10] Processing SX307.WAV




[4/10] Processing SX397.WAV
[5/10] Processing SA1.WAV
[6/10] Processing SA2.WAV
[7/10] Processing SX217.WAV
[8/10] Processing SX37.WAV
[9/10] Processing SI648.WAV
[10/10] Processing SX127.WAV
Generating visualizations...


In [81]:
dir = Path("../TIMIT/TRAIN/DR1/FCJF0/")

for file in dir.glob("*.WAV"):
    audio, sr = sf.read(file)
    words = read_words(str(file).replace('.WAV', '.WRD'))
    augmented_audio, augmented_words = AUG(audio, sr, words)
    output_path = str(file).replace('.WAV', '-aug.wav')
    sf.write(output_path, augmented_audio, samplerate=sr)

