# Preprocess Data


In [None]:
# Mount Google Drive (Colab only) so the dataset folders under
# '/content/drive/My Drive/...' used by the cells below are reachable.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install pydub

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pydub
  Downloading pydub-0.25.1-py2.py3-none-any.whl (32 kB)
Installing collected packages: pydub
Successfully installed pydub-0.25.1


## Imports 

In [None]:
from pydub import AudioSegment
from pydub.silence import split_on_silence

In [None]:
import os
import shutil
import numpy as np
import random
import librosa
import soundfile as sf
from concurrent.futures import ThreadPoolExecutor

# Duplicate Audio Data 


In [None]:
def duplicate_files_and_folders(source_root, destination_root):
    """Copy the directory tree rooted at ``source_root`` into ``destination_root``.

    Every directory found under ``source_root`` is re-created under
    ``destination_root`` (existing directories are reused) and every file is
    copied with ``shutil.copy2``.

    Args:
        source_root: Directory whose contents are duplicated.
        destination_root: Directory that receives the copy; created if missing.

    Note:
        ``os.walk`` already descends into every subdirectory, so no explicit
        recursive call is needed. The previous version recursed *and* walked,
        which copied each nested file again for every ancestor directory.
    """
    for root, dirs, files in os.walk(source_root):
        # Path of the current directory relative to the tree being copied
        # ('.' for the root itself, which normpath collapses away).
        relative_path = os.path.relpath(root, source_root)
        destination_dir = os.path.normpath(os.path.join(destination_root, relative_path))

        # Create the mirror directory even when it holds no files.
        os.makedirs(destination_dir, exist_ok=True)

        for file in files:
            shutil.copy2(os.path.join(root, file), os.path.join(destination_dir, file))

In [None]:
# source_root = '/content/drive/My Drive/10_languages_media'
# destination_root = '/content/drive/My Drive/10_languages_media_reduce_noise'

# duplicate_files_and_folders(source_root, destination_root)

In [None]:
# source_root = '/content/drive/My Drive/10_languages_media'
# destination_root = '/content/drive/My Drive/10_languages_media_reduce_noise_and_transformations'

# duplicate_files_and_folders(source_root, destination_root)

## Audio Data Transformations

1. Pitch shift
2. Speed change (time stretch)
3. Gaussian noise
4. Noise reduction (silence removal)





In [None]:
def transform_audio(audio_data, sr, n_steps=None, noise_mean=0., noise_std=None, speed_rate=None):
    """Apply optional pitch-shift, additive Gaussian noise and speed change.

    The steps run in this fixed order: pitch shift, noise, time stretch.
    A step is skipped when its parameter is ``None``.

    Args:
        audio_data: NumPy array of audio samples.
        sr: Sampling rate, forwarded to ``librosa.effects.pitch_shift``.
        n_steps: Pitch shift amount passed to librosa. ``None`` or ``0``
            leaves the pitch untouched.
        noise_mean: Mean of the Gaussian noise (only used with ``noise_std``).
        noise_std: Standard deviation of the Gaussian noise. When given, the
            noisy signal is clipped to ``[-1, 1]``.
        speed_rate: Rate passed to ``librosa.effects.time_stretch``. ``None``
            or ``1`` leaves the speed untouched.

    Returns:
        The transformed samples (the input object itself if nothing applied).
    """
    # A 0-step shift / rate-1 stretch is an identity request; skip the librosa
    # call so the signal is not needlessly re-synthesised (slow, and the
    # round trip is not guaranteed to be bit-exact).
    if n_steps is not None and n_steps != 0:
        audio_data = librosa.effects.pitch_shift(audio_data, sr=sr, n_steps=n_steps)
    if noise_std is not None:
        noise = np.random.normal(noise_mean, noise_std, audio_data.shape)
        # Keep samples inside the [-1, 1] range after adding noise.
        audio_data = np.clip(audio_data + noise, -1., 1.)
    if speed_rate is not None and speed_rate != 1:
        audio_data = librosa.effects.time_stretch(audio_data, rate=speed_rate)
    return audio_data


def convert_mp3_to_wav(file_path):
    """Convert one MP3 file to WAV next to it and delete the MP3.

    Args:
        file_path: Path of the MP3 file to convert.

    Returns:
        Path of the written WAV file (same name, ``.wav`` extension).

    Note:
        The source file is removed only after the export succeeded; if
        decoding or exporting raises, the MP3 is left in place.
    """
    sound = AudioSegment.from_mp3(file_path)
    fname_without_ext = os.path.splitext(file_path)[0]
    wav_path = f"{fname_without_ext}.wav"
    # pydub's export() returns the open output file handle; close it
    # explicitly instead of relying on garbage collection, since this runs
    # for many files from a thread pool.
    sound.export(wav_path, format="wav").close()
    os.remove(file_path)
    return wav_path


def reduce_noise(audio_file_path, min_silence_len=1000, silence_thresh=-20):
    """Rewrite a WAV file keeping only the chunks pydub finds between silences.

    The file is split with ``pydub.silence.split_on_silence``, the non-empty
    chunks are concatenated in order, and the result overwrites the original
    file. If no chunk is returned the file is left untouched.

    Args:
        audio_file_path: Path of the WAV file to process in place.
        min_silence_len: Forwarded to ``split_on_silence`` (milliseconds per
            the pydub documentation). Defaults to the previously hard-coded
            1000.
        silence_thresh: Forwarded to ``split_on_silence`` (dBFS per the pydub
            documentation). Defaults to the previously hard-coded -20.

    Returns:
        None.
    """
    audio_file = AudioSegment.from_wav(audio_file_path)
    chunks = split_on_silence(audio_file, min_silence_len=min_silence_len, silence_thresh=silence_thresh)
    selected_chunks = [chunk for chunk in chunks if len(chunk) > 0]

    if not selected_chunks:
        # Nothing but silence was detected: keep the original file as is.
        return

    new_audio = selected_chunks[0]
    for chunk in selected_chunks[1:]:
        new_audio += chunk
    # export() returns the open output file handle; close it explicitly.
    new_audio.export(audio_file_path, format='wav').close()


def apply_transformations(wav_path, apply_transformations):
    """Write three augmented copies (noise, speed, pitch) of one audio file.

    The copies are written next to ``wav_path`` as
    ``<name>_noise_<std>.wav``, ``<name>_speed_<rate>.wav`` and
    ``<name>_pitch_<steps>.wav``, each using a randomly drawn parameter.

    Args:
        wav_path: Path of the audio file to augment.
        apply_transformations: Flag; when falsy nothing is read or written.
            (The parameter shadows this function's own name inside the body.)

    Returns:
        1 if the augmented files were written, 0 if the flag was falsy.

    Note:
        Each variant now passes ``None`` for the transformations it does not
        want. Previously ``n_steps=0``, ``speed_rate=1`` and ``noise_std=0.00``
        were passed, which still ran the pitch-shift, noise and time-stretch
        stages on every variant even though only one was intended.
    """
    if not apply_transformations:
        return 0

    # sr=None: load at the file's own sampling rate and write copies with it.
    audio_data, sr = librosa.load(wav_path, sr=None)
    fname_without_ext = os.path.splitext(wav_path)[0]

    # Variant 1: additive Gaussian noise only.
    random_float = round(random.uniform(0.0001, 0.005), 4)
    audio_data_transformed_noise = transform_audio(audio_data, sr, noise_mean=0., noise_std=random_float)
    output_filename = f"{fname_without_ext}_noise_{random_float}.wav"
    sf.write(output_filename, audio_data_transformed_noise, sr)

    # Variant 2: speed change only.
    random_float = round(random.uniform(1.0, 1.6), 2)
    audio_data_transformed_speed = transform_audio(audio_data, sr, speed_rate=random_float)
    output_filename = f"{fname_without_ext}_speed_{random_float}.wav"
    sf.write(output_filename, audio_data_transformed_speed, sr)

    # Variant 3: pitch shift only.
    random_float = round(random.uniform(1.0, 3.0), 2)
    audio_data_transformed_pitch = transform_audio(audio_data, sr, n_steps=random_float)
    output_filename = f"{fname_without_ext}_pitch_{random_float}.wav"
    sf.write(output_filename, audio_data_transformed_pitch, sr)

    return 1


def load(rootdir, convert_to_wav=True, reduce_noise_bool=True, apply_transformations_bool=True):
    """Run the preprocessing pipeline over every audio file under ``rootdir``.

    Stages, each optional and executed in this order on a thread pool:

    1. Convert every ``.mp3`` to ``.wav`` (``convert_mp3_to_wav``).
    2. Run ``reduce_noise`` on every ``.wav`` file.
    3. Run ``apply_transformations`` on every file whose directory path
       contains ``"train"`` (case-insensitive).

    Each stage finishes before the next one starts, and an exception raised
    by any worker propagates to the caller.

    Args:
        rootdir: Root directory that is walked recursively.
        convert_to_wav: Enable stage 1.
        reduce_noise_bool: Enable stage 2.
        apply_transformations_bool: Enable stage 3.

    Returns:
        None. Progress and the number of augmented files are printed.
    """
    counter = 0
    audio_files = []  # list of (path, is_train) tuples
    for dirName, subdirList, fileList in os.walk(rootdir):
        print(f'Found directory: {dirName}')
        is_train = 'train' in dirName.lower()
        for fname in fileList:
            if fname.endswith('.mp3') or fname.endswith('.wav'):
                audio_files.append((os.path.join(dirName, fname), is_train))

    with ThreadPoolExecutor() as executor:
        if convert_to_wav:
            mp3_files = [(file, is_train) for file, is_train in audio_files if file.endswith('.mp3')]
            converted_paths = list(executor.map(convert_mp3_to_wav, [file for file, _ in mp3_files]))
            # Replace each (now deleted) mp3 entry by its wav, keeping the
            # train flag, and keep the wav files that were already there.
            converted = [(wav, is_train) for wav, (_, is_train) in zip(converted_paths, mp3_files)]
            untouched = [entry for entry in audio_files if not entry[0].endswith('.mp3')]
            # dict.fromkeys drops duplicates (an mp3 and wav sharing a name)
            # so no file is processed twice concurrently.
            audio_files = list(dict.fromkeys(untouched + converted))

        if reduce_noise_bool:
            # reduce_noise reads WAV only; skip any remaining mp3 files.
            wav_paths = [file for file, _ in audio_files if file.endswith('.wav')]
            # Consume the iterator: this waits for all files to be rewritten
            # before the next stage reads them and surfaces worker errors.
            list(executor.map(reduce_noise, wav_paths))

        if apply_transformations_bool:
            files_in_train_dir = [file for file, is_train in audio_files if is_train]
            results = executor.map(lambda file: apply_transformations(file, True), files_in_train_dir)
            counter = sum(results)

    print('=============================')
    print(f'Total Data files processed: {counter}')
    print('=============================')

In [None]:
# print('/content/drive/My Drive/10_languages_media/')
# rootDir = '/content/drive/My Drive/10_languages_media/'
# load(rootDir, True, False, False)

# print('/content/drive/My Drive/10_languages_media_reduce_noise')
# rootDir = '/content/drive/My Drive/10_languages_media_reduce_noise'
# load(rootDir, True, True, False)

# Convert the re-uploaded dataset to WAV only: silence removal and
# augmentation are switched off for this run.
rootDir = '/content/drive/My Drive/10_lang_reupload'
print(rootDir)
load(rootDir, convert_to_wav=True, reduce_noise_bool=False, apply_transformations_bool=False)

/content/drive/My Drive/10_lang_reupload
Found directory: /content/drive/My Drive/10_lang_reupload
Found directory: /content/drive/My Drive/10_lang_reupload/clips_fa
Found directory: /content/drive/My Drive/10_lang_reupload/clips_fa/validate
Found directory: /content/drive/My Drive/10_lang_reupload/clips_fa/test
Found directory: /content/drive/My Drive/10_lang_reupload/clips_fa/train
Found directory: /content/drive/My Drive/10_lang_reupload/clips_th
Found directory: /content/drive/My Drive/10_lang_reupload/clips_th/validate
Found directory: /content/drive/My Drive/10_lang_reupload/clips_th/test
Found directory: /content/drive/My Drive/10_lang_reupload/clips_th/train
Found directory: /content/drive/My Drive/10_lang_reupload/clips_ru
Found directory: /content/drive/My Drive/10_lang_reupload/clips_ru/train
Found directory: /content/drive/My Drive/10_lang_reupload/clips_ru/test
Found directory: /content/drive/My Drive/10_lang_reupload/clips_ru/validate
Found directory: /content/drive/My Dr

In [None]:
import os

def delete_wav_files(root_folder):
    """Delete the ``.wav`` files located directly inside ``root_folder``.

    Subdirectories are left untouched; use ``delete_wav_files_recursive`` to
    clean a whole tree. (The previous body walked the full tree, making it
    indistinguishable from the recursive variant.)

    Args:
        root_folder: Directory whose top-level ``.wav`` files are removed.
            A missing directory is silently ignored.
    """
    # The first item yielded by os.walk describes root_folder itself; the
    # fallback keeps the "missing folder is a no-op" behaviour.
    _, _, files = next(os.walk(root_folder), (root_folder, [], []))
    for file in files:
        if file.endswith('.wav'):
            os.remove(os.path.join(root_folder, file))


def delete_wav_files_recursive(root_folder):
    """Delete every ``.wav`` file in ``root_folder`` and all its subdirectories.

    Args:
        root_folder: Root of the directory tree to clean.

    Note:
        ``os.walk`` visits every subdirectory by itself, so a single walk is
        enough. The previous version additionally recursed into each
        subdirectory, re-walking the same folders many times.
    """
    for root, dirs, files in os.walk(root_folder):
        for file in files:
            if file.endswith('.wav'):
                os.remove(os.path.join(root, file))


# WARNING: this cell permanently deletes files from Google Drive.
# Double-check root_folder before running it.
root_folder = '/content/drive/My Drive/10_languages_media/'

# # Delete only WAV files in the root folder
# delete_wav_files(root_folder)

# Delete WAV files recursively in the root folder and all subdirectories
delete_wav_files_recursive(root_folder)


KeyboardInterrupt: ignored

## Create train / test / validation path-list files (.txt)

In [None]:
def create_path_files(root_folder, output_folder):
    """Write ``<path> <label>`` list files for the train/test/validate splits.

    Expected layout: ``root_folder/<language>/{train,test,validate}/...``.
    Every file found (recursively) inside a split directory produces one line
    ``"<file path> <label>"`` in ``train_10_lang.txt``, ``test_10_lang.txt``
    or ``validation_10_lang.txt`` inside ``output_folder``.

    Labels are consecutive integers starting at 0, assigned to the language
    directories in sorted name order so the mapping is reproducible between
    runs. Non-directory entries in ``root_folder`` are ignored and no longer
    consume a label (previously they left gaps in the numbering).

    Args:
        root_folder: Directory containing one sub-directory per language.
        output_folder: Directory receiving the three text files; created if
            it does not exist.

    Returns:
        None. A summary with the label mapping is printed.

    Note:
        Path and label are separated by a single space, and paths may contain
        spaces themselves; presumably consumers split on the *last* space —
        verify against the reader of these files.
    """
    # Create output folder if it doesn't exist
    os.makedirs(output_folder, exist_ok=True)

    # split directory name -> (output file name, collected lines)
    split_outputs = {
        'train': ('train_10_lang.txt', []),
        'test': ('test_10_lang.txt', []),
        'validate': ('validation_10_lang.txt', []),
    }
    counter_label = 0
    lang_label_dict = {}

    # os.listdir order is arbitrary; sort so labels are deterministic.
    for folder_lang in sorted(os.listdir(root_folder)):
        sub_folder = os.path.join(root_folder, folder_lang)
        if not os.path.isdir(sub_folder):
            # Stray files next to the language folders get no label.
            continue

        for split_name, (_, entries) in split_outputs.items():
            split_dir = os.path.join(sub_folder, split_name)
            if not os.path.isdir(split_dir):
                continue
            for root, dirs, files in os.walk(split_dir):
                dirs.sort()  # deterministic traversal order
                for file in sorted(files):
                    file_path = os.path.join(root, file)
                    entries.append(file_path + ' ' + str(counter_label))

        lang_label_dict[folder_lang] = counter_label
        counter_label += 1

    for file_name, entries in split_outputs.values():
        with open(os.path.join(output_folder, file_name), 'w') as file:
            file.write('\n'.join(entries))

    print("======================================")
    print("Txt files updated successfully!")
    print(f'Total labels: {counter_label}')
    print(f'Lang dict: {lang_label_dict}')
    print("======================================")

In [None]:
# Build the train/test/validation list files for the silence-stripped,
# augmented copy of the dataset.
root_folder_path = '/content/drive/My Drive/10_languages_media_reduce_noise_and_transformations/'
output_folder_path = '/content/drive/My Drive/10_languages_reduce_noise_meta_and_transformations/'
create_path_files(root_folder=root_folder_path, output_folder=output_folder_path)

Txt files updated successfully!
Total labels: 10
Lang dict: {'clips_th': 0, 'clips_ru': 1, 'clips_ja': 2, 'clips_ha': 3, 'clips_fa': 4, 'clips_ar': 5, 'clips_es': 6, 'clips_zhCN': 7, 'clips_fr': 8, 'clips_it': 9}
