In [None]:
from parselmouth.praat import run_file, call
from itertools import repeat
import tensorflow as tf
import soundfile as sf
import multiprocessing
from glob import glob
import pandas as pd
import numpy as np
import parselmouth
import pathlib
import librosa
import random
import math
import os
import re


## create augmented files
### Parameters:

- **name_aug**:  
  Name of the augmentation to apply. It must contain one of the supported keywords (e.g. `frequency_masking`).

- **data_path**:  
  The data path of the original audios.

- **data_path_aug**:  
  The data path for the augmented audios.

- **aug_perc**:  
  Fraction of each file to augment, given as a value of at most 1.0 (not as a percentage between 0 and 100).

- **aug_num**:  
  Number of augmented files per original file.

- **aug_len**:  
  Segment length for each augmentation. It must not be longer than **audio_length**.

- **audio_length**:  
  Length, in seconds, of one segment of the whole audio; each audio is processed segment by segment.


In [None]:
def augment(name_aug, data_path, data_path_aug, aug_perc, aug_num, aug_len, audio_length):
    """Create augmented copies of every ``.wav`` file found below ``data_path``.

    The input tree is walked two directory levels deep (dialect folders, then
    speaker folders inside each dialect). The same dialect/speaker folder
    structure is re-created below ``data_path_aug`` and every ``.wav`` file is
    handed to ``augmentation`` in a pool of 5 worker processes.

    Parameters
    ----------
    name_aug :
        Augmentation name(s), forwarded unchanged to ``augmentation``.
    data_path : str
        Root folder of the original audios.
    data_path_aug : str
        Root folder for the augmented audios; created when missing.
    aug_perc : float
        Augmentation fraction per file; must not exceed 1.0.
    aug_num : int
        Number of augmented files to write per original file.
    aug_len :
        Segment length for each augmentation; must not exceed ``audio_length``.
    audio_length :
        Length of one segment of the whole audio.

    Raises
    ------
    Warning
        If ``aug_perc`` is larger than 1.0 or ``aug_len`` is larger than
        ``audio_length``. Both checks run before anything is written to disk.

    Notes
    -----
    Paths are built and split with a literal backslash, so this function (like
    ``augmentation``, which splits the paths the same way) only works with
    Windows-style paths.
    """
    if aug_perc > 1.0:
        raise Warning("Augmentation percentage should be maximal 1.0!")
    if aug_len > audio_length:
        raise Warning("Augmentation length shoud be shorter than audio length!")

    # exist_ok avoids the check-then-create race of os.path.exists + makedirs.
    os.makedirs(data_path_aug, exist_ok=True)

    # First level below data_path: dialects; second level: speakers.
    all_speaker = []
    for dialect in glob(data_path + '\\*'):
        all_speaker.extend(glob(dialect + '\\*'))

    audios = []
    for path in all_speaker:
        audios.extend(tf.io.gfile.glob(path + '\\*.wav'))
        # Mirror <dialect>\<speaker> below the output root.
        path_parts = path.split('\\')
        new_path = data_path_aug + '\\' + path_parts[-2] + '\\' + path_parts[-1]
        os.makedirs(new_path, exist_ok=True)

    if not audios:
        # Nothing to process; do not spawn worker processes for an empty job list.
        return

    # The context manager shuts the workers down once the blocking starmap
    # call has returned, so no processes are leaked between calls.
    with multiprocessing.Pool(5) as pool:
        pool.starmap(
            augmentation,
            zip(audios, repeat(name_aug), repeat(data_path_aug), repeat(aug_perc),
                repeat(aug_num), repeat(aug_len), repeat(audio_length)),
        )
                     

In [None]:
def augmentation(audio, name_aug, data_path_aug, aug_perc, aug_num, aug_len, audio_length):
    """Write ``aug_num`` augmented copies of one audio file.

    The file is loaded as a 16 kHz signal. When ``'frequency_masking'`` is
    contained in ``name_aug``, the signal is processed in consecutive segments
    of ``audio_length`` seconds: for each segment, 1 to 3 non-overlapping
    frequency bands (each 100 to 2500 Hz wide, below the Nyquist frequency)
    are removed with Praat's "Filter (stop Hann band)". A trailing remainder
    shorter than one segment is left untouched. Without that keyword the
    copies are unmodified.

    Each copy is written to
    ``data_path_aug\\<dialect>\\<speaker>\\aug<k>_<original file name>``,
    where dialect and speaker are the last two folder names of ``audio``.

    Parameters
    ----------
    audio : str
        Path of the source ``.wav`` file, using backslash separators.
    name_aug :
        Augmentation name(s); tested with ``'frequency_masking' in name_aug``.
    data_path_aug : str
        Root folder for the augmented audios (sub-folders must already exist).
    aug_perc, aug_len :
        Accepted for interface compatibility; not used by this function.
    aug_num : int
        Number of augmented files to write.
    audio_length :
        Segment length in seconds.
    """
    y, sr = librosa.load(audio, sr=16000, res_type='soxr_vhq')

    apply_freq_mask = 'frequency_masking' in name_aug
    if apply_freq_mask:
        # Build the Praat sound from the already resampled signal instead of
        # re-reading the file: this keeps sample rate and channel count in
        # sync with `y`, and it only has to happen once per file.
        sound = parselmouth.Sound(y.astype(np.float64), sampling_frequency=sr)
        times_total = len(y) // int(audio_length * sr)
        nyquist = sr // 2

    path_parts = audio.split('\\')

    for aug_num_cnt in range(aug_num):
        y_new = y.copy()

        if apply_freq_mask:
            for i in range(times_total):
                seg_start = int(i * audio_length * sr)
                seg_end = int(seg_start + (sr * audio_length))
                bandwidth = random.randint(100, 2500)
                times = random.randint(1, 3)
                band_starts = generate_intervals(bandwidth, times, nyquist)
                # Derive the time window from the sample indices so that the
                # filtered audio is taken from exactly the span it overwrites,
                # also when audio_length is not a whole number of seconds.
                part = call(sound, "Extract part", seg_start / sr, seg_end / sr,
                            'rectangular', 1.0, 'no')
                for band_start in band_starts:
                    part = call(part, "Filter (stop Hann band)", band_start,
                                band_start + bandwidth, 100)
                masked = part.values.flatten()
                # Guard against a one-sample rounding difference in the extract.
                n_copy = min(len(masked), seg_end - seg_start)
                y_new[seg_start:seg_start + n_copy] = masked[:n_copy]

        new_path = (data_path_aug + '\\' + path_parts[-3] + '\\' + path_parts[-2]
                    + '\\' + 'aug' + str(aug_num_cnt) + '_' + path_parts[-1])
        sf.write(new_path, y_new, sr)


In [5]:
def generate_intervals(length, times, total_len):
    """Pick random start points for ``times`` non-overlapping intervals.

    Every interval is ``length`` wide and all of them fit inside
    ``[0, total_len]``. The intervals are placed left to right; each one is
    preceded by a random gap that still leaves enough room for the intervals
    that follow.

    Parameters
    ----------
    length : int
        Width of each interval.
    times : int
        Number of intervals to place.
    total_len : int
        Size of the range the intervals must fit into.

    Returns
    -------
    list
        The start points in ascending order.

    Raises
    ------
    ValueError
        If ``times`` intervals of width ``length`` cannot fit in ``total_len``.
    """
    if times * length > total_len:
        raise ValueError("Not enough space for intervals in the given range.")

    starts = []
    cursor = 0             # end of the most recently placed interval
    remaining = total_len  # free space to the right of `cursor`

    # Count down the number of intervals that still have to be placed.
    for still_to_place in range(times, 0, -1):
        # Largest gap that keeps room for this and all following intervals.
        gap = random.randint(0, remaining - still_to_place * length)
        begin = cursor + gap
        starts.append(begin)
        cursor = begin + length
        remaining -= gap + length

    return starts
