# Dataset Cleaning/Testing

In [9]:
# NOTE(review): this cell only echoes two fixed strings; presumably a scratch
# cell reproducing an error message seen elsewhere — confirm or delete.
print("Exception: operation does not have an identity.")
print("Text: ['---'] Failed to process.")

Exception: operation does not have an identity.
Text: ['---'] Failed to process.


In [2]:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from pydub import AudioSegment
from tqdm import tqdm
import os
from random import shuffle
from pathlib import Path
import librosa
import numpy as np
import subprocess
from glob import glob
import shutil
import concurrent.futures
from multiprocessing import Pool
from unidecode import unidecode

# Functions
def force_move_dir(root_src_dir, root_dst_dir):
    """Move every file under ``root_src_dir`` to the mirrored spot under ``root_dst_dir``.

    Missing destination directories are created. A file that already exists at
    the destination is deleted first, unless it is the very same file as the
    source, in which case it is left alone.
    """
    for current_src, _subdirs, names in tqdm(os.walk(root_src_dir), smoothing=0):
        # Map the walked directory onto the destination tree (first occurrence only).
        current_dst = current_src.replace(root_src_dir, root_dst_dir, 1)
        if not os.path.exists(current_dst):
            os.makedirs(current_dst)
        for name in names:
            source = os.path.join(current_src, name)
            target = os.path.join(current_dst, name)
            if os.path.exists(target):
                if os.path.samefile(source, target):
                    # Source and destination are one file: nothing to move.
                    continue
                os.remove(target)
            shutil.move(source, current_dst)


def reset_directory_structure(inputpath, outputpath, verbose=True):
    """Recreate ``outputpath`` as a file-less copy of ``inputpath``'s folder tree.

    Any existing ``outputpath`` is removed first; directories are then copied
    from ``inputpath`` while every regular file is skipped.
    """
    assert inputpath != outputpath
    import shutil

    def _files_only(folder, entries):
        # copytree skips whatever this returns: every entry that is a regular file.
        return [entry for entry in entries if os.path.isfile(os.path.join(folder, entry))]

    if os.path.exists(outputpath):
        if verbose:
            print("Resetting ",outputpath)
        shutil.rmtree(outputpath)
    shutil.copytree(inputpath, outputpath, ignore=_files_only)


def even_split(a, n):
    """Split sequence ``a`` into at most ``n`` contiguous, near-equal chunks.

    The first ``len(a) % n`` chunks receive one extra element. When ``a`` has
    fewer elements than ``n``, one chunk per element is produced. An empty
    ``a`` yields no chunks (previously this raised ZeroDivisionError).

    Returns a generator of slices of ``a``.
    """
    if len(a) == 0:
        # Nothing to distribute; avoid divmod(0, 0) below.
        return (chunk for chunk in ())
    n = min(n, len(a))  # never ask for more chunks than there are elements
    k, m = divmod(len(a), n)
    return (a[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n))


def chunks(lst, n):
    """Yield consecutive slices of ``lst`` that are ``n`` items long (the last may be shorter)."""
    # Example: list(chunks([0,1,2,3,4,5,6,7,8,9],2)) -> [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]
    yield from (lst[start:start + n] for start in range(0, len(lst), n))


def get_arpadict(dictionary_path):
    """Load a pronunciation dictionary file into a ``{word: pronunciation}`` dict.

    Each line is split at its first space: the left part is the key and the
    stripped right part is the value; both are passed through ``unidecode``.
    Lines are processed in reverse, so when a word appears more than once the
    entry nearest the top of the file wins. Lines without a space (e.g. blank
    lines) are skipped instead of raising IndexError.

    Args:
        dictionary_path: path of the dictionary text file.

    Returns:
        dict mapping word to pronunciation string.
    """
    print("Running, Please wait...")
    with open(dictionary_path, "r") as dict_file:
        lines = dict_file.read().splitlines()
    thisdict = {}
    for line in reversed(lines):
        word, separator, pronunciation = line.partition(" ")
        if not separator:
            # No "word pronunciation" pair on this line.
            continue
        thisdict[unidecode(word)] = unidecode(pronunciation.strip())
    print("Dictionary Ready.")
    return thisdict


def concat_text(filenames, outpath):
    """Concatenate the text files in ``filenames`` into ``outpath``.

    A single newline is written between consecutive files (not before the
    first one and not after the last one).
    """
    with open(outpath, 'w') as merged:
        for position, fname in enumerate(filenames):
            if position:
                merged.write("\n")  # separator between files
            with open(fname) as part:
                merged.writelines(part)


def arpabet(input_path, output_path, start_tokens=True, encoding="utf-8"):
    """Write a copy of a ``path|text|speaker_id`` filelist with words replaced by ARPAbet.

    For every word of the text field, leading/trailing punctuation is peeled
    off, the upper-cased remainder is looked up in the module-level
    ``thisdict`` and, when found, the word is replaced by ``{PRONUNCIATION}``
    with the punctuation re-attached. Words that are not found are kept
    unchanged. Each output line is ``path|converted text␤|speaker_id``.

    Args:
        input_path: filelist to read; every line needs at least three
            ``|``-separated fields.
        output_path: where the converted filelist is written.
        start_tokens: unused; kept for backward compatibility.
        encoding: text encoding used for both reading and writing (the input
            was previously read with the platform default encoding).
    """
    sym = frozenset(r"""!?,.;:␤"'#-_☺☻♥♦♣♠•◘○◙♂♀♪♫☼►◄↕‼¶§▬↨↑↓→←∟↔▲""") # ␤ = new line
    with open(input_path, "r", encoding=encoding) as text_file:
        lines = text_file.read().splitlines()

    output_lines = []
    for line in lines:
        fields = line.split("|")
        out = ''
        for word in fields[1].split(" "):
            end_chars = ''
            start_chars = ''
            # Peel punctuation off both ends, but always keep at least one character.
            while len(word) > 1 and any(char in sym for char in word):
                if word[-1] in sym:
                    end_chars = word[-1] + end_chars
                    word = word[:-1]
                elif word[0] in sym:
                    start_chars = start_chars + word[0]
                    word = word[1:]
                else:
                    break  # symbol is in the middle of the word; leave it alone
            try:
                word_arpa = thisdict[word.upper()]
            except (KeyError, NameError):
                # KeyError: word not in the dictionary.
                # NameError: dictionary not loaded; keep plain text, as before.
                word_arpa = ''
            if word_arpa:
                word = "{" + str(word_arpa) + "}"
            # strip() per word also collapses runs of spaces, as the original did.
            out = (out + " " + start_chars + word + end_chars).strip()
        output_lines.append(fields[0] + "|" + out + "␤|" + fields[2] + "\n")

    with open(output_path, "w", encoding=encoding) as text_file:
        text_file.write("".join(output_lines))


def nancy_build_metadata(directory, prompts, SAMPLE_RATE=48000, BIT_DEPTH=2, min_audio_duration=0.8, max_audio_duration=12.0):
    """Add the Nancy corpus clips found under ``directory`` to the global ``metadata`` dict.

    ``prompts`` is parsed one line at a time: the first and last two characters
    are dropped, the rest is split at the first space into a clip name and a
    quote, and the quote loses its first and last character before being
    stripped (presumably lines look like ``( name "text" )`` — confirm against
    the prompts file).

    Every ``.wav`` under ``directory`` (recursive) is filtered by file size,
    using ``BIT_DEPTH * SAMPLE_RATE * duration`` bytes as the bound, and then
    appended to ``metadata`` under a fixed voice name when its basename has a
    quote. Clips without a quote are now counted as invalid so the printed
    summary adds up.
    """
    with open(prompts, "r") as prompts_file:
        prompt_lines = prompts_file.read().splitlines()
    Nancy_lookup = {filename: quote[1:-1].strip() for filename, quote in [line[2:-2].split(" ",1) for line in prompt_lines]}

    min_bytes = int(BIT_DEPTH*SAMPLE_RATE*min_audio_duration)
    max_bytes = int(BIT_DEPTH*SAMPLE_RATE*max_audio_duration)
    voice = ("(Audiobook) Blizzard2011_"+"Nancy").title()

    short_clip_count = long_clip_count = total_clip_count = invalid_clip_count = 0
    for audio_file in glob(directory+"/**/*.wav", recursive=True):
        total_clip_count += 1
        file_size = os.stat(audio_file).st_size
        if file_size < min_bytes:  # shorter than min_audio_duration seconds
            short_clip_count += 1
            continue
        if file_size > max_bytes:  # longer than max_audio_duration seconds
            long_clip_count += 1
            continue

        clip_name = audio_file.split("/")[-1].replace(".wav","")
        if clip_name not in Nancy_lookup:
            print(f"{audio_file} Has no Quote.")
            invalid_clip_count += 1  # was never counted, skewing the summary below
            continue

        metadata.setdefault(voice, []).append({
            "file_path": audio_file,
            "timestamp": "00_00_00",
            "emotions": [],
            "noise_level": "",
            "quote": unidecode(Nancy_lookup[clip_name]),
        })
    print(str(total_clip_count)+" Total Clips")
    print(str(short_clip_count)+" clips are too short")
    print(str(long_clip_count)+" clips are too long")
    print(str(invalid_clip_count)+" clips are invalid (bad filename or missing quote)")
    print(str(total_clip_count-(short_clip_count+long_clip_count+invalid_clip_count))+" clips written into metadata dict")


# metadata["celestia"] =  [{file_path: "", timestamp: "00_00_05", emotions: ["neutral"], noise_level: "", quote = "Once upon a time."}, .... , ....]
def build_metadata(directory, ignore_dirs=["Noise samples",], SAMPLE_RATE=48000, BIT_DEPTH=2, min_audio_duration=0.8, max_audio_duration=12.0): # recursively searches "directory"; for every .wav file it finds the accompanying .txt label and adds the clip to the global metadata dict.
    """Scan ``directory`` for .wav clips and append them to the global ``metadata`` dict.

    For every .wav file (paths containing any ``ignore_dirs`` entry are skipped):
      * the file size is compared against ``BIT_DEPTH * SAMPLE_RATE * duration``
        bytes to drop clips that are too short or too long
        (NOTE(review): presumably assumes mono PCM and ignores the header — confirm);
      * voice/timestamp/emotions/noise level come either from a hard-coded
        special case, from the ``_``-separated filename, or — when filename
        parsing raises — from the folder name if it starts with "p" (VCTK);
      * the quote is read from the sibling ``.txt`` file (utf-8, then latin-1).

    Nothing is returned; entries are appended to ``metadata[voice.title()]`` and
    a summary of the counts is printed.
    """
    short_clip_count = 0
    long_clip_count = 0
    total_clip_count = 0
    invalid_clip_count = 0
    for dir_ in [x[0] for x in os.walk(directory)]: # recursive directory search
        if len(os.listdir(dir_)) < 1: continue
        for filename in os.listdir(dir_):
            file_path = os.path.join(dir_,filename)
            # substring match against the whole path, not just the folder name
            if any([ignoredir in file_path for ignoredir in ignore_dirs]): continue
            if filename.endswith(".wav"):
                total_clip_count+=1
                if os.stat(file_path).st_size < int(BIT_DEPTH*SAMPLE_RATE*min_audio_duration): short_clip_count+=1; continue # if file length less than min_audio_duration seconds, skip
                if os.stat(file_path).st_size > int(BIT_DEPTH*SAMPLE_RATE*max_audio_duration): long_clip_count+=1; continue #  if file length greater than max_audio_duration seconds, skip
                # ---- Unique DATASETs ----
                if "Anons/PostalDude2" in file_path:
                    timestamp = "00_00_00"
                    voice = "postal dude"
                    emotions = []
                    noise_level = ""
                    filename_quote = None # missing question marks
                elif "Anons/Rise Kujikawa" in file_path:
                    timestamp = "00_00_00"
                    voice = "Rise Kujikawa"
                    emotions = []
                    noise_level = ""
                    filename_quote = None # missing question marks
                # ---- Unique DATASETs ----
                # ---- VCTK and Clipper Sliced DATASET ----
                else:
                    splitted = filename.split("_")
                    try: # if Clippers MLP dataset
                        timestamp = "_".join(splitted[0:3])          # 00_00_05
                        voice = "(Show) My Little Pony_"+splitted[3].title()# celestia

                        # path-based overrides of the default "My Little Pony" source prefix
                        if "Other/Star Trek (John de Lancie, Discord)" in file_path:
                            voice = "(Show) Star Trek_"+"Q"
                        elif "Other/Eli, Elite Dangerous (John de Lancie, Discord)" in file_path:
                            voice = "(Game) Elite Dangerous_"+"Eli"
                        elif "Other/A Little Bit Wicked (Kristin Chenoworth, Skystar)" in file_path:
                            voice = "(Audiobook) A Little Bit Wicked_"+splitted[3].title()
                        elif "Other/Sum - Tales From the Afterlives (Emily Blunt, Tempest)" in file_path:
                            voice = "(Audiobook) Sum - Tales From the Afterlives_"+splitted[3].title()
                        elif "Other/Dr. Who" in file_path:
                            voice = "(Audiobook) Dr. Who_"+splitted[3].title()
                        elif "Other/Dan vs" in file_path:
                            voice = "(Show) Dan vs_"+splitted[3].title()
                        elif "Other/TFH" in file_path:
                            voice = "(Game) Them's Fightin' Herds_"+splitted[3].title()
                        elif "/FoE s1e01 Radioplay" in file_path:
                            voice = "(Audiodrama) Fallout Equestria_"+splitted[3].title()
                        elif "/FoE s1e02 Radio Play" in file_path:
                            voice = "(Audiodrama) Fallout Equestria_"+splitted[3].title()
                        elif "/Songs" in file_path:
                            voice = "(Music) My Little Pony_"+splitted[3].title()
                        elif "Anons/Blaze" in file_path:
                            voice = "(Game) Sonic_"+splitted[3].title()

                        emotions = splitted[4].lower().split(" ")   # neutral
                        noise_level = splitted[5].lower()           # "" = clean, "noisy" = Noisy, "very noisy" = Very Noisy
                        filename_quote = unidecode(splitted[6]) # missing question marks
                    except:
                        # any failure above (e.g. too few "_" fields) lands here
                        if (os.path.basename(dir_).startswith("p")): # if VCTK then use this
                            emotions = []; noise_level = ""; timestamp = "00_00_00"; voice = "VCTK (News Extracts)_"+os.path.basename(dir_)
                        else:
                            print("'"+file_path+"' is not a valid filename")
                            invalid_clip_count+=1; continue
                # ---- VCTK and Clipper Sliced DATASET ----
                # read the transcript from the sibling .txt: try utf-8 first, fall back to latin-1
                try:
                    try:
                        with open(os.path.join(dir_,filename.replace(".wav",".txt")), 'r', encoding="utf-8") as file:
                            txt_quote = unidecode(file.read().replace('\n', '')) # Once upon a time.
                    except:
                        with open(os.path.join(dir_,filename.replace(".wav",".txt")), 'r', encoding="latin-1") as file:
                            txt_quote = unidecode(file.read().replace('\n', '')) # Once upon a time.
                except Exception as ex:
                    print(ex)
                    invalid_clip_count+=1; continue
                # group clips by title-cased voice name in the global metadata dict
                if voice.title() in list(metadata.keys()):
                    metadata[str(voice).title()].append({"file_path": file_path, "timestamp": timestamp, "emotions": emotions, "noise_level": noise_level, "quote": txt_quote})
                else:
                    metadata[str(voice).title()] = [{"file_path": file_path, "timestamp": timestamp, "emotions": emotions, "noise_level": noise_level, "quote": txt_quote}]
            else:
                continue
    print(str(total_clip_count)+" Total Clips")
    print(str(short_clip_count)+" clips are too short")
    print(str(long_clip_count)+" clips are too long")
    print(str(invalid_clip_count)+" clips are invalid (bad filename or missing TXT)")
    print(str(total_clip_count-(short_clip_count+long_clip_count+invalid_clip_count))+" clips written into metadata dict")


def write_datasets(speaker_id = 0, permitted_noise_levels = [""], minimum_clips=3, start_token="", end_token="", percentage_training_data=0.96):
    """Turn the global ``metadata`` dict into train/validation filelists and write them.

    Voices with fewer than ``minimum_clips`` clips are skipped. Every remaining
    voice gets the next speaker id (starting at ``speaker_id``); only clips
    whose noise level is in ``permitted_noise_levels`` are emitted as
    ``file_path|start_token+quote+end_token|speaker_id`` lines.

    The lines are shuffled and split at ``percentage_training_data`` into
    training and validation sets. The unshuffled list handed to ``write_files``
    is the original speaker-ordered list. Previously the master list was
    shuffled in place and then re-filtered by ids ``0..len(metadata)-1``, which
    dropped speakers whenever ``speaker_id`` started above 0.

    Output goes to ``DIRECTORY_GLOBAL`` via ``write_files``.
    """
    print("Filtering Metadata for desired files")
    multi_speaker_lines = []  # built voice by voice, so already ordered by speaker id
    speaker_ids = []
    for voice, meta in metadata.items():
        # meta == [{file_path: "", timestamp: "00_00_05", emotions: ["neutral"], noise_level: "", quote: "Once upon a time."}, ...]
        if len(meta) < minimum_clips:
            continue  # too few clips; this voice does not consume a speaker id
        voice_listed = False
        for clip in meta:
            if clip["noise_level"] not in permitted_noise_levels:
                continue
            multi_speaker_lines.append(clip["file_path"]+"|"+start_token+clip["quote"]+end_token+"|"+str(speaker_id))
            if not voice_listed:
                # list the speaker only once, and only if it contributed a clip
                speaker_ids.append(f"|{voice}|{speaker_id}")
                voice_listed = True
        speaker_id += 1  # next speaker_id for next voice

    # Shuffle a copy so the speaker-ordered list stays intact.
    shuffled_multi_speaker_lines = list(multi_speaker_lines)
    shuffle(shuffled_multi_speaker_lines)
    train_end = int(len(shuffled_multi_speaker_lines) * percentage_training_data)
    train_arr = shuffled_multi_speaker_lines[:train_end]
    validation_arr = shuffled_multi_speaker_lines[train_end:]

    write_files(multi_speaker_lines, speaker_ids, train_arr, validation_arr, output_directory_=DIRECTORY_GLOBAL)


def write_files(multi_speaker_lines, speaker_ids, train_arr, val_arr, output_directory_):
    """Write all filelists into ``<output_directory_>/filelists``.

    Files produced (all UTF-8):
      * ``speaker_ids.txt`` and ``unshuffled_taca2.txt``;
      * for both the ``mel_`` variant (``.wav|`` rewritten to ``.npy|``) and the
        plain variant: ``<prefix>train_taca2.txt`` and
        ``<prefix>validation_taca2.txt``, plus an ``_arpa`` version of each
        produced by ``arpabet`` and a ``_merged`` version that concatenates
        the text and arpa files via ``concat_text``.

    Files are written through context managers so handles are closed even when
    a write fails.
    """
    output_directory = os.path.join(output_directory_,"filelists")
    print(f"Writing Metadata files to {output_directory}.\nPlease Wait.")
    os.makedirs(output_directory, exist_ok=True)

    def _path(name):
        return os.path.join(output_directory, name)

    def _write_text(name, text):
        with open(_path(name), "w", encoding="utf-8") as text_file:
            text_file.write(text)

    def _write_split_family(prefix, as_npy):
        # text + arpabet filelists for train and validation, then the merged files
        for split, lines in (("train", train_arr), ("validation", val_arr)):
            stem = f"{prefix}{split}_taca2"
            text = "\n".join(lines)
            if as_npy:
                text = text.replace(".wav|",".npy|")
            _write_text(stem+".txt", text)
            arpabet(_path(stem+".txt"), _path(stem+"_arpa.txt"))
        for split in ("train", "validation"):
            stem = f"{prefix}{split}_taca2"
            concat_text([_path(stem+".txt"), _path(stem+"_arpa.txt")], _path(stem+"_merged.txt"))

    # speaker id table
    _write_text("speaker_ids.txt", "\n".join(speaker_ids))

    # mel (.npy) filelists
    _write_split_family("mel_", as_npy=True)
    print("mel filepaths ready")

    # speaker-ordered list of every clip
    _write_text("unshuffled_taca2.txt", "\n".join(multi_speaker_lines))

    # audio (.wav) filelists
    _write_split_family("", as_npy=False)

    print("Finished!")


def convert_dir_to_wav(directory, SAMPLE_RATE=48000, BIT_DEPTH=2, ignore_dirs=["Noise samples"], continue_from=0, skip_existing=False):
    """Decode every .flac under ``directory`` (recursive) to .wav with the ``flac`` CLI.

    Args:
        directory: root folder to search.
        SAMPLE_RATE, BIT_DEPTH: unused; kept for backward compatibility.
        ignore_dirs: files whose path contains any of these substrings are skipped.
        continue_from: skip the first ``continue_from`` files of the listing.
        skip_existing: skip files that already have a sibling ``.wav``.

    ``*_mic1.flac`` files are renamed to ``*.flac`` before decoding and
    ``*_mic2.flac`` files are skipped.
    """
    tqdm.write("Converting flacs to wavs")
    flac_paths = glob(directory+"/**/*.flac", recursive=True)  # list once; was globbed twice
    for index, file_path in tqdm(enumerate(flac_paths), total=len(flac_paths), smoothing=0):
        if index < continue_from:
            continue
        if skip_existing and os.path.exists(file_path.replace(".flac",".wav")):
            continue
        if any(filter_dir in file_path for filter_dir in ignore_dirs):
            tqdm.write("Skipping "+file_path)
            continue
        decode_target = file_path.replace("_mic1.flac",".flac")
        if file_path.endswith("_mic1.flac"):
            os.rename(file_path, decode_target)
        if file_path.endswith("_mic2.flac"):
            tqdm.write("Skipping "+file_path)
            continue
        if file_path.endswith(".flac"):
            # -f overwrites an existing output, -s silences the CLI
            os.system('flac --decode "'+decode_target+'" -f -s')


def convert_dir_to_wav_multiprocess(file_paths_arr, SAMPLE_RATE=48000, BIT_DEPTH=2, ignore_dirs=["Noise samples"], continue_from=0, skip_existing=False):
    """Decode the given .flac files to .wav with the ``flac`` CLI (worker for a pool).

    Paths containing an ``ignore_dirs`` entry and ``*_mic2.flac`` files are
    skipped; ``*_mic1.flac`` files are renamed to ``*.flac`` before decoding.
    ``SAMPLE_RATE``, ``BIT_DEPTH`` and ``continue_from`` are not used.
    """
    for flac_path in tqdm(file_paths_arr, smoothing=0.01):
        if skip_existing and os.path.exists(flac_path.replace(".flac",".wav")):
            continue
        if any(ignored in flac_path for ignored in ignore_dirs):
            tqdm.write("Skipping "+flac_path)
            continue
        decode_target = flac_path.replace("_mic1.flac",".flac")
        if flac_path.endswith("_mic1.flac"):
            os.rename(flac_path, decode_target)
        if flac_path.endswith("_mic2.flac"):
            tqdm.write("Skipping "+flac_path)
            continue
        if flac_path.endswith(".flac"):
            os.system('flac --decode "'+decode_target+'" -f -s')


def set_wavs_to_mono(directory):
    """Rewrite in place, as single-channel, every .wav under ``directory`` that has more than one channel."""
    tqdm.write("Setting wavs to mono")
    wav_paths = glob(directory+"/**/*.wav", recursive=True)
    for wav_path in tqdm(wav_paths, smoothing=0):
        clip = AudioSegment.from_wav(wav_path)
        if clip.channels <= 1:
            continue  # already mono
        clip.set_channels(1).export(wav_path, format="wav")


def set_wavs_to_mono_multiprocess(file_paths_arr):
    """Pool worker: rewrite in place, as single-channel, each listed .wav that has more than one channel."""
    for wav_path in file_paths_arr:
        clip = AudioSegment.from_wav(wav_path)
        if clip.channels <= 1:
            continue  # already mono
        tqdm.write(wav_path)
        clip.set_channels(1).export(wav_path, format="wav")


def normalize_wav_volumes_mixmode(directory, amplitude=0.08):
    """Run ``normalize-audio -a <amplitude> -b`` over the .wav files of ``directory`` and each subfolder, one folder per call."""
    # Collect the folder list up front, then normalise folder by folder.
    folders = [dirpath for dirpath, _dirnames, _filenames in os.walk(directory)]
    for folder in folders:
        os.system(f"normalize-audio -a {amplitude} -b '{folder}/'*.wav")

def pad_wavs():
    """Run the external padding script, then move the padded files over the dataset.

    All paths are hard-coded to the original author's drives.
    """
    padded_root = "/media/cookie/Samsung 860 QVO/ClipperDatasetV2_Padded"
    dataset_root = "/media/cookie/Samsung 860 QVO/ClipperDatasetV2"
    tqdm.write("Padding Wavs")
    os.system("powershell '/media/cookie/Samsung PM961/Pad_Dataset.ps1'")
    tqdm.write("Replacing wavs with Padded wavs")
    force_move_dir(padded_root, dataset_root)


def fix_wavs(directory, SAMPLE_RATE=48000, BIT_DEPTH=2):
    """Resample every .wav under ``directory`` to ``SAMPLE_RATE`` Hz, mono, ``BIT_DEPTH*8`` bit using sox.

    Converted files are first written to a sibling tree obtained by replacing
    ``/ClipperDatasetV2`` with ``/ClipperDatasetV2_Fixed`` in the path, and are
    then moved back over the originals. ``directory`` must therefore contain
    ``/ClipperDatasetV2``.

    The final move now uses the directories derived from ``directory`` instead
    of hard-coded absolute paths, so it works wherever the dataset is mounted.
    """
    tqdm.write("Setting Wavs to 48Khz 16 bit Mono")
    fixed_directory = directory.replace("/ClipperDatasetV2","/ClipperDatasetV2_Fixed")
    reset_directory_structure(directory, fixed_directory)
    for file_path in tqdm(glob(directory+"/**/*.wav", recursive=True), smoothing=0):
        file_pathB = file_path.replace("/ClipperDatasetV2","/ClipperDatasetV2_Fixed")
        os.system('sox "'+file_path+'" -r '+str(SAMPLE_RATE)+' -c 1 -b '+str(BIT_DEPTH*8)+' "'+file_pathB+'"')
    tqdm.write("Replacing wavs with 48Khz 16 bit Mono wavs")
    force_move_dir(fixed_directory, directory)


def clean_VCTK(directory, noiseprof_path, denoise_strength=0.40):
    """Denoise every .wav under ``directory`` with ``sox ... noisered``.

    Output goes to the path obtained by replacing ``/wav`` with ``/wavCleaned``;
    that tree is reset to an empty copy of ``directory``'s structure first.
    The originals are not replaced here, despite the final log message.
    """
    tqdm.write("Cleaning VCTK Wavs")
    cleaned_root = directory.replace("/wav","/wavCleaned")
    reset_directory_structure(directory, cleaned_root)
    wav_paths = glob(directory+"/**/*.wav", recursive=True)
    for wav_path in tqdm(wav_paths, smoothing=0):
        cleaned_path = wav_path.replace("/wav","/wavCleaned")
        os.system(f'sox "{wav_path}" "{cleaned_path}" noisered "{noiseprof_path}" {denoise_strength}')
    tqdm.write("Replacing wavs with Cleaned wavs")


def clean_VCTK_multiprocess(file_paths_arr, noiseprof_path, denoise_strength=0.40):
    """Pool worker: denoise each listed .wav with ``sox ... noisered``.

    Output goes to the path obtained by replacing ``/wav`` with ``/wavCleaned``;
    the output folders are not created here.
    """
    tqdm.write("Cleaning VCTK Wavs")
    for wav_path in tqdm(file_paths_arr, smoothing=0.0):
        cleaned_path = wav_path.replace("/wav","/wavCleaned")
        os.system(f'sox "{wav_path}" "{cleaned_path}" noisered "{noiseprof_path}" {denoise_strength}')
    tqdm.write("Replacing wavs with Cleaned wavs")


def AIO_wavs(directory, PADDING_BEFORE_CLIP=0.025, PADDING_AFTER_CLIP=0.025, SAMPLE_RATE=48000, BIT_DEPTH=2):
    """Resample and pad every .wav under ``directory`` in place using sox.

    Each file is first converted to ``SAMPLE_RATE`` Hz, mono, ``BIT_DEPTH*8``
    bit into the mirrored ``/ClipperDatasetV2_Padded`` path (the folders must
    already exist), then written back over the original with
    ``PADDING_BEFORE_CLIP`` seconds of silence prepended and
    ``PADDING_AFTER_CLIP`` seconds appended.

    Previously ``PADDING_AFTER_CLIP`` was ignored and the "before" value was
    used on both ends.
    """
    tqdm.write("Normalizing, Padding and Fixing Dataset")
    for file_path in tqdm(glob(directory+"/**/*.wav", recursive=True), smoothing=0):
        file_pathB = file_path.replace("/ClipperDatasetV2","/ClipperDatasetV2_Padded")
        os.system(
            f'sox "{file_path}" -r {SAMPLE_RATE} -c 1 -b {BIT_DEPTH*8} "{file_pathB}"; '
            f'sox "{file_pathB}" "{file_path}" pad {PADDING_BEFORE_CLIP} {PADDING_AFTER_CLIP}'
        )


def AIO_wavs_multiprocess(file_paths_arr, PADDING_BEFORE_CLIP=0.0, PADDING_AFTER_CLIP=0.0125, SAMPLE_RATE=48000, BIT_DEPTH=2):
    """Pool worker: resample and pad each listed .wav in place using sox.

    Each file is converted to ``SAMPLE_RATE`` Hz, mono, ``BIT_DEPTH*8`` bit into
    the mirrored ``/ClipperDatasetV2_Padded`` path (the folders must already
    exist), then written back over the original with ``PADDING_BEFORE_CLIP``
    seconds of silence prepended and ``PADDING_AFTER_CLIP`` seconds appended.

    Previously ``PADDING_AFTER_CLIP`` was ignored and the "before" value was
    used on both ends, so with the defaults no padding was added at all.
    """
    for file_path in tqdm(file_paths_arr, smoothing=0.0):
        file_pathB = file_path.replace("/ClipperDatasetV2","/ClipperDatasetV2_Padded")
        os.system(
            f'sox "{file_path}" -r {SAMPLE_RATE} -c 1 -b {BIT_DEPTH*8} "{file_pathB}"; '
            f'sox "{file_pathB}" "{file_path}" pad {PADDING_BEFORE_CLIP} {PADDING_AFTER_CLIP}'
        )


def trim_wavs(directory, SAMPLE_RATE=48000, top_db=30, window_length=8192, hop_length=128, ref=np.max):
    """Trim leading/trailing silence from every .wav under ``directory``, in place.

    Each file is loaded at ``SAMPLE_RATE`` and passed to
    ``librosa.effects.trim`` with ``top_db``, ``frame_length=window_length``,
    ``hop_length`` and ``ref``; the result overwrites the file.

    A file that cannot be loaded or trimmed is reported as corrupt, deleted and
    skipped. Previously execution fell through to the write step and wrote the
    previous file's audio (or raised NameError) at the deleted file's path.
    """
    tqdm.write("Trimming Wavs")
    for file_path in tqdm(glob(directory+"/**/*.wav", recursive=True), smoothing=0):
        try:
            sound, _ = librosa.core.load(file_path, sr=SAMPLE_RATE)
            # window/hop are in samples, so the effective duration depends on SAMPLE_RATE
            trimmed_sound, index = librosa.effects.trim(sound, top_db=top_db, frame_length=window_length, hop_length=hop_length, ref=ref)
        except Exception as ex:
            tqdm.write("\n\n"+file_path+" is corrupt"+str(ex)); os.system('rm "'+file_path+'"')
            continue  # nothing valid to write for this file
        librosa.output.write_wav(file_path, trimmed_sound, SAMPLE_RATE)


def trim_wavs_multiprocess(file_paths_arr, SAMPLE_RATE=48000, margin_left=0, margin_right=0, top_db=30, window_length=8192, hop_length=128, ref=np.max, preemphasis_strength=0.50):
    """Pool worker: trim leading/trailing silence from each listed .wav, in place.

    The trim boundaries are detected on a pre-emphasised copy of the audio
    (``coef=preemphasis_strength``) and then applied to the unfiltered audio,
    widened by ``margin_left`` / ``margin_right`` samples. Files whose length
    would not change are left untouched.

    Only a failure to load or analyse a file marks it as corrupt and deletes
    it, matching ``trim_wavs``. A failure while writing is reported but no
    longer deletes the source file. Previously any exception in the write step
    (for instance a missing ``librosa.output`` module) removed the file.
    """
    for file_path in tqdm(file_paths_arr, smoothing=0.0):
        try:
            sound, _ = librosa.core.load(file_path, sr=SAMPLE_RATE)
            sound_filt = librosa.effects.preemphasis(sound, coef=preemphasis_strength)
            # window/hop are in samples, so the effective duration depends on SAMPLE_RATE
            _, index = librosa.effects.trim(sound_filt, top_db=top_db, frame_length=window_length, hop_length=hop_length, ref=ref)
        except Exception as ex:
            tqdm.write("\n\n"+file_path+" is corrupt"+str(ex)); os.system('rm "'+file_path+'"')
            continue

        trimmed_sound = sound[max(index[0]-margin_left,0):index[1]+margin_right]
        print(index)
        if len(sound) == len(trimmed_sound):
            print(file_path, "does not need trimming")
            continue
        try:
            librosa.output.write_wav(file_path, trimmed_sound, SAMPLE_RATE)
        except Exception as ex:
            # Keep the source file: the failure is in writing, not in the audio.
            tqdm.write("\n\n"+file_path+" could not be written: "+str(ex))


def high_pass_filter_wavs_multiprocess(file_paths_arr, cutoff_freq, strength):
    """Pool worker: apply a Butterworth high-pass filter to each listed audio file, in place.

    Args:
        file_paths_arr: paths of the audio files to filter.
        cutoff_freq: cutoff frequency handed to ``scipy.signal.butter``
            together with the file's sample rate (``fs``).
        strength: filter order handed to ``scipy.signal.butter``.

    The filter is applied along axis 0 (time). The previous default
    (``axis=-1``) would filter across channels for multi-channel files, which
    ``soundfile`` returns as ``(frames, channels)``. Mono files are unaffected.
    """
    import soundfile as sf
    from scipy import signal

    prev_sr = 0
    for file_path in tqdm(file_paths_arr, smoothing=0.0):
        audio, sample_rate = sf.read(file_path)  # load audio to RAM
        if sample_rate != prev_sr:
            # filter coefficients depend on the sample rate; recompute only when it changes
            sos = signal.butter(strength, cutoff_freq, 'hp', fs=sample_rate, output='sos')
            prev_sr = sample_rate
        filtered_audio = signal.sosfilt(sos, audio, axis=0)  # filter along time
        sf.write(file_path, filtered_audio, sample_rate)  # write back to disk


def reset_padding(directory, SAMPLE_RATE=48000):
    """Re-trim every .wav under ``directory`` via ``trim_wavs`` using ``top_db=60`` and a 256-sample window."""
    trim_wavs(directory, SAMPLE_RATE=SAMPLE_RATE, top_db=60, window_length=256)


def multithread_directory_wavs(function, directory, threads=16):
    """Run ``function`` over all .wav files under ``directory`` using a process pool.

    The file list is shuffled, split into up to ``threads`` near-equal chunks
    and each chunk is passed to ``function`` in a worker of a
    ``multiprocessing.Pool`` (processes, despite the name). The list of return
    values is printed.

    The pool is now shut down once the work is done, and an empty directory
    prints ``[]`` instead of failing while splitting the file list.
    """
    from random import shuffle
    file_paths = glob(directory+"/**/*.wav", recursive=True)
    if not file_paths:
        print([])
        return
    shuffle(file_paths)
    split_file_paths = list(even_split(file_paths,threads))
    with Pool(threads) as p:
        print(p.map(function, split_file_paths))


def multiprocess_directory_wavs(function, directory, threads=16):
    """Run ``function`` over all .wav files under ``directory`` using a process pool.

    The file list is shuffled, split into up to ``threads`` near-equal chunks
    and each chunk is passed to ``function`` in a ``multiprocessing.Pool``
    worker. The list of return values is printed.

    The pool is now shut down once the work is done, and an empty directory
    prints ``[]`` instead of failing while splitting the file list.
    """
    from random import shuffle
    file_paths = glob(directory+"/**/*.wav", recursive=True)
    if not file_paths:
        print([])
        return
    shuffle(file_paths)
    split_file_paths = list(even_split(file_paths,threads))
    with Pool(threads) as p:
        print(p.map(function, split_file_paths))


def multiprocess_directory_flacs(function, directory, threads=16):
    """Run ``function`` over all .flac files under ``directory`` using a process pool.

    The file list is shuffled, split into up to ``threads`` near-equal chunks
    and each chunk is passed to ``function`` in a ``multiprocessing.Pool``
    worker. The list of return values is printed.

    The pool is now shut down once the work is done, and an empty directory
    prints ``[]`` instead of failing while splitting the file list.
    """
    from random import shuffle
    file_paths = glob(directory+"/**/*.flac", recursive=True)
    if not file_paths:
        print([])
        return
    shuffle(file_paths)
    split_file_paths = list(even_split(file_paths,threads))
    with Pool(threads) as p:
        print(p.map(function, split_file_paths))


def multiprocess_filearray(function, file_paths, threads=16):
    """Run ``function`` over ``file_paths`` using a process pool.

    ``file_paths`` is split into up to ``threads`` near-equal chunks and each
    chunk is passed to ``function`` in a ``multiprocessing.Pool`` worker. The
    list of return values is printed.

    The pool is now shut down once the work is done, and an empty
    ``file_paths`` prints ``[]`` instead of failing while splitting.
    """
    if len(file_paths) == 0:
        print([])
        return
    split_file_paths = list(even_split(file_paths,threads))
    with Pool(threads) as p:
        print(p.map(function, split_file_paths))


def convertWavsToFlac(array, SAMPLE_RATE=48000):
    """Export a ``.flac`` sibling for every path in ``array`` that ends with ``.wav``.

    ``SAMPLE_RATE`` is passed to pydub's ``export`` as the ``bitrate`` string.
    """
    from pydub import AudioSegment
    for wav_path in (candidate for candidate in array if candidate.endswith('.wav')):
        clip = AudioSegment.from_wav(wav_path)
        flac_path = wav_path.replace(".wav",".flac")
        clip.export(flac_path, format = "flac", bitrate=str(SAMPLE_RATE))


def NancySplitRawIntoClips(directory, label_folder, Nancy_CorpusToArchive, SAMPLE_RATE=96000):
    """
    Take original 96Khz studio files and match them with the filenames used for the quote files.

    Args:
        directory: folder holding the studio ``.wav`` files; clips are written
            to ``<directory>/Sliced``.
        label_folder: folder holding one tab-separated label file per studio
            file (same basename, ``.txt``), each row being
            ``beep_start, beep_stop, ID``.
        Nancy_CorpusToArchive: tab-separated map file with three columns per
            row: output name, studio ID, exception flag. Rows with a non-empty
            exception flag are skipped when slicing.
        SAMPLE_RATE: rate the studio files are loaded and written at.

    The last two ``_``-separated parts of a studio filename are read as the
    1-based first and last ID in that file; anything before them is kept as an
    ID prefix. Each clip runs from the end of one beep to the start of the next
    (or to the end of the file).

    The map file is now read once and all files are closed after reading.

    Raises:
        AssertionError: when the number of labelled beeps does not match the
            ID range encoded in the filename.
    """
    with open(Nancy_CorpusToArchive, "r") as archive_map:
        archive_rows = [line.split("\t") for line in archive_map.read().splitlines()]
    Nancy_ignore = {studio: output for output, studio, exception in archive_rows if exception}
    print(Nancy_ignore)
    Nancy_lookup = {studio: output for output, studio, exception in archive_rows}

    os.makedirs(os.path.join(directory,'Sliced'), exist_ok=True)
    available_labels = {x.split("/")[-1] for x in glob(label_folder+"/*.txt")}
    for audio_file in glob(directory+"/*.wav"):
        print(audio_file)
        audio_filename = audio_file.split("/")[-1]
        audio_basename = audio_filename.replace(".wav","")
        audio_basename = audio_basename.replace("341_763","343_763") # exception, easier than rewriting entire label file
        name_parts = audio_basename.split("_")
        ID_offset = int(name_parts[-2]) - 1
        ID_end = int(name_parts[-1]) - 1
        Prepend_ID = "_".join(name_parts[:-2]) # empty unless ARCTIC or LTI files
        if Prepend_ID: Prepend_ID += "_"

        label_name = audio_filename.replace(".wav",".txt")
        if label_name not in available_labels:
            print(audio_file, "doesn't have an available label")
            continue

        with open(os.path.join(label_folder, label_name), "r") as label_file:
            beeps = [line.split("\t") for line in label_file.read().splitlines()] # [beep_start, beep_stop, ID]
        print("beep count", len(beeps))
        print("ID_offset", ID_offset)
        print("ID_end", ID_end)
        print("end - offset", ID_end-ID_offset)
        assert (len(beeps)-1) == (ID_end-ID_offset), "Ensure each beep is labelled and matches the ArchiveMap"

        sound, _ = librosa.core.load(audio_file, sr=SAMPLE_RATE)
        for i, beep in enumerate(beeps):
            clip_start = int(float(beep[1])*SAMPLE_RATE) # end of this beep
            clip_end = int(float(beeps[i+1][0])*SAMPLE_RATE) if i+1 < len(beeps) else len(sound) # start of next beep or end of file if no beeps left
            ID = Prepend_ID + str(ID_offset + int(beep[2]))
            if ID in Nancy_ignore: continue
            print(ID,"-->",Nancy_lookup[ID])
            clip_outpath = os.path.join(directory, 'Sliced', Nancy_lookup[ID]+".wav")
            librosa.output.write_wav(clip_outpath, sound[clip_start:clip_end], SAMPLE_RATE)


def uniqueElems(mylist):
    """Return the items of ``mylist`` in first-seen order with later duplicates dropped."""
    seen = set()
    ordered = []
    for item in mylist:
        if item in seen:
            continue
        seen.add(item)
        ordered.append(item)
    return ordered


def ClipperSplitRawIntoClips(working_dir, source_dir, label_dir, verbose=False, explicit_merge=False, MAX_FILENAME_LENGTH=120):
    """
    Take original MLP Episode files and write out merged clips of neighbouring labels.
    Each label is compared with the label before it. When the two are less than 0.8s apart
    and both have an empty noise field, the audio from the start of the previous label to the
    end of the current label is written as one wav under
    working_dir/Sliced/"Merged Same Speaker" or "Merged Multi Speaker"/<season>/<episode>/.
    (The branch that would export every label on its own is currently disabled with `if False`.)
    
    S1-S8 Episode Audio: https://files.catbox.moe/2q4p2z.torrent
    Clippers MEGA Folder: https://mega.nz/#F!L952DI4Q!nibaVrvxbwgCgXMlPHVnVw!fwYlhK7B
    
    INPUTS:
        working_dir - will contain the outputs and any temp files
        source_dir - should contain original unclipped episode files downloaded from https://files.catbox.moe/2q4p2z.torrent
        label_dir - should contain episode labels from clippers MEGA folder
        verbose - print every {label file: audio file} pair that was linked
        explicit_merge - join the two quotes with '#' instead of adjusting punctuation from the pause length
        MAX_FILENAME_LENGTH - merged quotes longer than this are shortened one word at a time
    RETURNS:
        null
    RAISES:
        Exception - a label file matches no audio file, or more than one
        AssertionError - a merged clip contains zero samples
    NOTE:
        Reads the module-level SAMPLE_RATE at call time.
        Uses librosa.output.write_wav, which librosa 0.8 and newer no longer ship.
    """
    os.makedirs(os.path.join(working_dir,'Sliced'), exist_ok=True)
    label_paths = glob(label_dir+"/*.txt")
    unclipped_paths = glob(source_dir+"/**/*.wav", recursive=True)
    
    def _trim_index(segment, ref_power):
        """Return librosa's [start, end] sample indices of the part of `segment` louder than `ref_power`."""
        return librosa.effects.trim(
            segment,
            top_db=0,
            frame_length=int(SAMPLE_RATE*0.100),
            hop_length=int(SAMPLE_RATE*0.0125),
            ref=lambda _: ref_power, # fixed reference power, independent of the segment's own level
        )[1] # gonna be a little messed up for different sampling rates
    
    # ---------- LINK THE LABELS TO FILES ----------
    linked_paths = [] # list of single-entry dicts {label path: unclipped wav path}
    
    def _link(label_path, episode_tag):
        """Pair `label_path` with the one remaining audio path containing `episode_tag`."""
        matching_audio = [x for x in unclipped_paths if (episode_tag in x)] # get all audio files that potentially match this label file.
        if len(matching_audio) > 1:
            print("Too many potential matches:\nmaching_audio = ", matching_audio)
            raise Exception("Multiple episodes match a label file.")
        if not len(matching_audio):
            print("label_path_ = ", label_path)
            raise Exception("Label has no matching audio file.")
        linked_paths.append({label_path: matching_audio[0]})
        unclipped_paths.remove(matching_audio[0]) # each audio file may only be claimed by one label file
    
    for label_path in label_paths:
        if any(x in label_path for x in ["_izo.txt","_original.txt","_unmix.txt"]): continue # ignore any izo, orignal or unmix labels.
        if "fim_s01" in label_path:
            episode = label_path.split("fim_s01")[1][:3]
            _link(label_path, "mlp.s01"+episode)
        elif "_outtakes.txt" in label_path or "_special source.txt" in label_path or "fim_s09" in label_path:
            continue # for later
        elif "fim_s" in label_path: # all seasons other than s01
            episode = label_path.split("fim_s")[1][2:5]
            season = "S"+label_path.split("fim_s")[1][:2]
            _link(label_path, (season + episode).upper())
    
    if verbose:
        for l in linked_paths:
            print(l, "")
    
    for i, linked_path in enumerate(linked_paths):
        label_path, audio_path = list(linked_path.items())[0]
        base_audio_dirs = []
        with open(label_path, "r") as label_file: # close the handle instead of leaking one per episode
            # [label_start, label_stop, time_voice_emotion_noise_quote]; blank lines carry no label and would break the 3-way unpack below
            labels = [line.split("\t") for line in label_file.read().splitlines() if line.strip()]
        
        audio, _ = librosa.core.load(audio_path, sr=SAMPLE_RATE)
        
        # relative dir to store audio data
        if "fim_s" in label_path:
            episode = label_path.split("fim_s")[1][2:5]
            season = "S"+label_path.split("fim_s")[1][:2]
            base_audio_dirs.append(season)
            base_audio_dirs.append(episode)
        
        prev_label = [-10, -10, "00_00_00_Null_Null_Noisy_"] # dummy "Noisy" label so the first real label is never merged with it
        for label in tqdm(labels, leave=False, desc=f"{i}/{len(linked_paths)}"):
            audio_dirs = list(base_audio_dirs) # fresh copy of the relative output path, e.g. [season, episode]
            label_start, label_stop, info = label
            *time_stamp, voice, str_emotion, noise_level, quote = info.split("_") # 00_00_05_Celestia_Neutral__Once upon a time. -> [["00","00","05"],"Celestia","Neutral","","Once upon a time."]
            emotions = str_emotion.split(" ") # "Happy Shouting" -> ["Happy","Shouting"]
            
            prev_label_start, prev_label_stop, prev_info = prev_label
            *prev_time_stamp, prev_voice, prev_str_emotion, prev_noise_level, prev_quote = prev_info.split("_")
            prev_emotions = prev_str_emotion.split(" ")
            
            time_between_clips = float(label_start) - float(prev_label_stop) # time between start of this clip and end of the last clip
            # Fallback for when the previous clip is too short to analyse below; without it the
            # pause check further down would hit an unbound (or stale) value.
            silent_time_between_clips = time_between_clips
            prev_label_start_sample = int(float(prev_label_start)*SAMPLE_RATE)
            prev_label_stop_sample = int(float(prev_label_stop)*SAMPLE_RATE)
            label_start_sample = int(float(label_start)*SAMPLE_RATE)
            label_stop_sample = int(float(label_stop)*SAMPLE_RATE)
            
            # get more accurate silent time between clips
            if (float(prev_label_stop)-float(prev_label_start)) > 0.025: # only if the previous clip has duration longer than 25ms
                prev_clip = audio[prev_label_start_sample:prev_label_stop_sample]
                this_clip = audio[label_start_sample:label_stop_sample]
                
                # 0.0000250 trims silence and also quiet breathing (rough trim)
                prev_label_stop_speech_sample = prev_label_start_sample+_trim_index(prev_clip, 0.0000250)[1] # when previous loud speech stops
                label_start_speech_sample = label_start_sample+_trim_index(this_clip, 0.0000250)[0] # when loud speech starts
                time_between_clips = (label_start_speech_sample - prev_label_stop_speech_sample)/SAMPLE_RATE # time where there is only silence or very quiet breathing.
                
                # 0.0000020 trims silence but leaves breathing in, giving the truly silent time between clips
                silent_start_sample = label_start_sample+_trim_index(this_clip, 0.0000020)[0]
                silent_stop_sample = prev_label_start_sample+_trim_index(prev_clip, 0.0000020)[1]
                silent_time_between_clips = (silent_start_sample - silent_stop_sample)/SAMPLE_RATE
            
            # --- DO STUFF ---
            
            if time_between_clips < 0.8 and noise_level == "" and prev_noise_level == "":
                clipped_audio = audio[prev_label_start_sample:label_stop_sample]
                assert len(clipped_audio), f"LABEL: {label}\nFILE: {label_path}\ndid not process correctly. Output audio has 0 length."
                
                if prev_voice.lower() == voice.lower():
                    audio_dirs = ["Merged Same Speaker"] + audio_dirs
                else:
                    audio_dirs = ["Merged Multi Speaker"] + audio_dirs
                
                # cleanup the break point between clips
                if explicit_merge:
                    merged_quote = prev_quote+'#'+quote
                else:
                    # endswith() rather than [-1] so an empty previous quote cannot raise IndexError
                    if time_between_clips < 0.08:
                        if prev_quote.endswith("."):
                            prev_quote = prev_quote[:-1]+"," # almost no pause: "Sentence. Yes?" -> "Sentence, Yes?"
                    elif time_between_clips >= 0.3 and silent_time_between_clips > 0.4:
                        if prev_quote.endswith("."):
                            prev_quote = prev_quote+".." # "Sentence. Yes?" -> "Sentence... Yes?" if pause has no breathing and last clip ends with "."
                    merged_quote = prev_quote+' '+quote
                
                # ensure merged quote isn't too long for saving.
                while len(merged_quote) > MAX_FILENAME_LENGTH:
                    shortened = " ".join(merged_quote.split(" ")[:-1])+merged_quote[-1] # cut off the last word but keep the final character
                    if shortened == merged_quote:
                        break # nothing left to drop (single character); stop instead of looping forever
                    merged_quote = shortened
                
                filename = f"{'_'.join(prev_time_stamp)}_{' '.join(uniqueElems([prev_voice, voice]))}_{' '.join(uniqueElems(prev_emotions+emotions))}_{noise_level}_{merged_quote}.wav"
                output_folder = os.path.join(working_dir, 'Sliced', *audio_dirs)
                os.makedirs(output_folder, exist_ok=True)
                output_path = os.path.join(output_folder, filename)
                librosa.output.write_wav(output_path, clipped_audio, SAMPLE_RATE)
            
            if False: # normal sliced dialogue (deliberately disabled)
                clipped_audio = audio[label_start_sample:label_stop_sample]
                assert len(clipped_audio), f"LABEL: {label}\nFILE: {label_path}\ndid not process correctly. Output audio has 0 length."
                
                audio_dirs = ["Sliced"] + audio_dirs
                
                filename = f"{'_'.join(prev_time_stamp)}_{' '.join(uniqueElems([prev_voice, voice]))}_{' '.join(uniqueElems(prev_emotions+emotions))}_{noise_level}_{quote}.wav"
                output_folder = os.path.join(working_dir, 'Sliced', *audio_dirs)
                os.makedirs(output_folder, exist_ok=True)
                output_path = os.path.join(output_folder, filename)
                librosa.output.write_wav(output_path, clipped_audio, SAMPLE_RATE)
            
            # --- DO STUFF ---
            prev_label = label


def test_audio_files(directory):
    """
    Report how librosa's trim behaves on each wav directly inside `directory`.
    
    For every file one line is printed with: the file name, seconds trimmed from the start,
    seconds trimmed from the end, the remaining duration, the original duration and the
    fraction of the file that remains.
    """
    def constant_ref(_power):
        # 0.0000020 for trimming silence but leaving breathing in.
        # 0.0000250 for trimming silence and also quiet breathing.
        return 0.0000250
    
    for audio_path in glob(directory+"/*.wav"):
        audio, _ = librosa.core.load(audio_path, sr=SAMPLE_RATE)
        bounds = librosa.effects.trim(
            audio,
            top_db=0,
            frame_length=int(SAMPLE_RATE*0.100),
            hop_length=int(SAMPLE_RATE*0.0125),
            ref=constant_ref,
        )[1]
        trim_start, trim_end = bounds[0], bounds[1]
        total_samples = len(audio)
        kept_samples = trim_end-trim_start
        print(
            "file:", os.path.basename(audio_path),
            "\t", round(trim_start/SAMPLE_RATE,2),
            "\t", round((total_samples-trim_end)/SAMPLE_RATE,2),
            "\tduration:", round(kept_samples/SAMPLE_RATE,2),
            "\toriginal_duration:\t", round(total_samples/SAMPLE_RATE,2),
            "\tPercent:", round(kept_samples/total_samples,3),
        )

In [3]:
from distutils.dir_util import copy_tree, remove_tree

In [4]:
THREADS = 6 # NOTE(review): presumably a worker count for the *_multiprocess helpers - the visible calls all pass threads=1 instead; confirm where this is read
AUDIO_DEPTH = 2 # target Audio, eg. 16 bit audio = 2 (bytes)
SAMPLE_RATE = 48000 # passed as `sr` to librosa.core.load and used to convert label times (seconds) into sample indices

DIRECTORY_GLOBAL = r"D:\Sound Sample\Testing" # working copy: the main block deletes it, refills it from "D:\Sound Sample\Original" and filters the wavs in place

In [5]:
def multiprocess_directory_wavs(func, directory, threads=1):
    """
    Collect every .wav under `directory` (recursively), hand the whole list to `func` and print what it returns.
    
    INPUTS:
        func - callable taking a list of wav paths
        directory - root folder to search
        threads - accepted but not used; `func` is called once, in this process
    RETURNS:
        null
    """
    wav_pattern = directory+r"/**/*.wav"
    wav_paths = glob(wav_pattern, recursive=True)
    outcome = func(wav_paths)
    print(outcome)

In [37]:
def low_pass_filter_wavs_multiprocess(file_paths_arr, cutoff_freq, strength):
    """
    Apply a Butterworth low-pass filter to every audio file in `file_paths_arr`, overwriting each file in place.
    
    INPUTS:
        file_paths_arr - iterable of audio file paths that soundfile can read and write
        cutoff_freq - cutoff frequency in Hz
        strength - order of the Butterworth filter
    RETURNS:
        null
    NOTE:
        Files whose sample rate is not more than twice `cutoff_freq` are left untouched.
    """
    import soundfile as sf
    from scipy import signal
    
    sos = None
    prev_sr = 0
    for file_path in tqdm(file_paths_arr, smoothing=0.0):
        audio, sample_rate = sf.read(file_path) # load audio to RAM
        if cutoff_freq*2 >= sample_rate:
            continue # cutoff is not below Nyquist for this file, nothing to filter
        if sample_rate != prev_sr:
            # the filter design only depends on the sample rate, so reuse it until the rate changes
            sos = signal.butter(strength, cutoff_freq, 'lp', fs=sample_rate, output='sos')
            prev_sr = sample_rate
        # soundfile returns (frames,) for mono and (frames, channels) otherwise, so time is axis 0;
        # sosfilt's default axis=-1 would filter across the channels of a multi-channel file.
        filtered_audio = signal.sosfilt(sos, audio, axis=0) # apply filter
        sf.write(file_path, filtered_audio, sample_rate) # write back to disk

In [42]:
# ---------Main block--------------
if __name__ == '__main__':
    metadata = {}
    
    # start from a fresh copy of the original samples; every filter below overwrites files in place
    remove_tree(DIRECTORY_GLOBAL)
    copy_tree(r"D:\Sound Sample\Original", DIRECTORY_GLOBAL)
    
    # (cutoff in Hz, filter order, True for highpass / False for lowpass), applied in this order
    filter_passes = (
        (150, 4, True),
        (40, 60, True),
        (16000, 10, False),
        (20000, 60, False),
    )
    for freq, strength, use_highpass in filter_passes:
        if use_highpass:
            print(f"Running Global {freq}Hz highpass filter to remove microphone noise/quiet electical humming.")
        else:
            print(f"Running Global {freq}Hz lowpass filter to remove microphone noise/quiet electical humming.")
        
        def func(array, freq=freq, strength=strength, use_highpass=use_highpass):
            # adapter: multiprocess_directory_wavs only passes the list of paths
            if use_highpass:
                high_pass_filter_wavs_multiprocess(array, freq, strength)
            else:
                low_pass_filter_wavs_multiprocess(array, freq, strength)
        
        multiprocess_directory_wavs(func, DIRECTORY_GLOBAL, threads=1)

 29%|███████████████████████████▎                                                                   | 19/66 [00:00<00:00, 182.67it/s]

Running Global 150Hz highpass filter to remove microphone noise/quiet electical humming.


100%|███████████████████████████████████████████████████████████████████████████████████████████████| 66/66 [00:00<00:00, 184.33it/s]
  6%|█████▉                                                                                           | 4/66 [00:00<00:02, 30.88it/s]

None
Running Global 40Hz highpass filter to remove microphone noise/quiet electical humming.


100%|████████████████████████████████████████████████████████████████████████████████████████████████| 66/66 [00:02<00:00, 24.67it/s]
 21%|████████████████████▏                                                                          | 14/66 [00:00<00:00, 135.25it/s]

None
Running Global 16000Hz lowpass filter to remove microphone noise/quiet electical humming.


100%|███████████████████████████████████████████████████████████████████████████████████████████████| 66/66 [00:00<00:00, 110.82it/s]
  6%|█████▉                                                                                           | 4/66 [00:00<00:01, 32.52it/s]

None
Running Global 20000Hz lowpass filter to remove microphone noise/quiet electical humming.


100%|████████████████████████████████████████████████████████████████████████████████████████████████| 66/66 [00:02<00:00, 28.40it/s]

None





In [83]:
import random

# Randomly split the lines of training_lines_here.txt between train.txt and val.txt.
training_percent = 0.5 # probability that a line is written to train.txt

# Read the input first and close it; the outputs are only created/truncated once the input is known to be readable.
with open("training_lines_here.txt", "r") as source:
    # iterating the file yields one entry per line, so a trailing newline does not produce an extra empty sample
    lines = [line.rstrip("\n") for line in source]

with open("train.txt", "w") as train, open("val.txt", "w") as val:
    for x in lines:
        target = train if random.random() < training_percent else val
        target.write(f"{x}\n")

In [100]:
# Same train/val split packed into one exec() string.
# The file mode is written as "r" (double quotes): a single-quoted 'r' would end the outer string early and turn the
# remainder into an adjacent raw string, so the executed code would split on a literal backslash-n instead of a newline.
exec('import random; training_percent = 0.5\nwith open("train.txt","w") as train, open("val.txt","w") as val:\n\t[train.write(f"{x}\\n") if random.random() < training_percent else val.write(f"{x}\\n") for x in open("training_lines_here.txt","r").read().split("\\n")]')

In [101]:
thisdict = {} # empty dict; not referenced by any of the other visible cells

In [1]:
%%timeit
# Benchmark: build one large string by repeated `string = string + ...` over 120000 iterations.
string = ""
for i in range(120000):
    string = string + f"EXTRA STUFF EXTRA STUFF EXTRA STUFF EXTRA STUFF. {i} + {i*100}\n"

3.07 s ± 44.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [2]:
%%timeit
# Benchmark: collect the 120000 lines in a list (created with []) and join them once at the end.
arr = []
for i in range(120000):
    arr.append(f"EXTRA STUFF EXTRA STUFF EXTRA STUFF EXTRA STUFF. {i} + {i*100}")
arr = "\n".join(arr)

69.3 ms ± 1.87 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [7]:
%%timeit
# Benchmark: same append-then-join approach, but the list is created with list() instead of [].
arr = list()
for i in range(120000):
    arr.append(f"EXTRA STUFF EXTRA STUFF EXTRA STUFF EXTRA STUFF. {i} + {i*100}")
arr = "\n".join(arr)

67 ms ± 1.72 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [13]:
%%timeit
# Benchmark: repeat run of the list() + append + join approach (list created with list(), joined with "\n").
arr = list()
for i in range(120000):
    arr.append(f"EXTRA STUFF EXTRA STUFF EXTRA STUFF EXTRA STUFF. {i} + {i*100}")
arr = "\n".join(arr)

68.8 ms ± 2.36 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [24]:
%%timeit
# Benchmark: small case, 12 pieces appended with `string = string + ...` (no separator between pieces).
string = ""
end_chars = "$"
for i in range(12):
    string=string + ("WORD " + end_chars).strip()

3.39 µs ± 251 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)


In [23]:
%%timeit
# Benchmark: same 12-piece loop written with the augmented `+=` form.
string = ""
end_chars = "$"
for i in range(12):
    string+=("WORD " + end_chars).strip()

3.54 µs ± 231 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)


In [21]:
%%timeit
# Benchmark: same 12 pieces collected in a list and joined with " " (so, unlike plain concatenation, the result has spaces between pieces).
arr = []
end_chars = "$"
for i in range(12):
    arr.append(("WORD " +end_chars).strip())
arr = " ".join(arr)

3.81 µs ± 188 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)


In [26]:
%%timeit
# Benchmark: list comprehension + join in one expression; each piece is "WORD <index>" for index 0..11.
" ".join([f"WORD {x}" for x in range(12)])

3.2 µs ± 224 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)


In [36]:
import torch
# Five random integers drawn from [0, 65535); torch.randint's upper bound is exclusive.
torch.randint(0,65535,(5,))

tensor([15470, 48836, 17056, 54425, 46086])

In [62]:
# 1-D convolution with one input channel, one output channel and kernel size 3, using 'reflect' padding of 1 on each side.
conv = torch.nn.Conv1d(1,1,3, padding=1, padding_mode='reflect')
conv.weight.data # last expression of the cell: displays the layer's kernel weights

In [71]:
# Float ramp 0..9 with two leading singleton axes added by the None indices, giving shape (1, 1, 10).
# NOTE: the name `input` shadows Python's builtin input() for the rest of the session.
input = torch.arange(10)[None, None, :].float()
print(input)

tensor([[[0., 1., 2., 3., 4., 5., 6., 7., 8., 9.]]])


In [72]:
# Run the convolution without autograd tracking and print the output followed by its shape.
with torch.no_grad():
    conv_output = conv(input)
    conv_output_shape = conv(input).shape
    print(conv_output, conv_output_shape, sep='\n')

tensor([[[ 1.5703,  2.6898,  5.3838,  8.0778, 10.7718, 13.4658, 16.1598,
          18.8539, 21.5479, 21.7562]]])
torch.Size([1, 1, 10])


In [80]:
# Print a left-aligned table with columns: i | (2**8)*3*i | that value divided by 600, for i in 0..59.
for i in range(60):
    scaled = (2**8)*3*i
    print(f"{i:<5}|{scaled:<10}|{scaled/600:<10}")

0    |0         |0.0       
1    |768       |1.28      
2    |1536      |2.56      
3    |2304      |3.84      
4    |3072      |5.12      
5    |3840      |6.4       
6    |4608      |7.68      
7    |5376      |8.96      
8    |6144      |10.24     
9    |6912      |11.52     
10   |7680      |12.8      
11   |8448      |14.08     
12   |9216      |15.36     
13   |9984      |16.64     
14   |10752     |17.92     
15   |11520     |19.2      
16   |12288     |20.48     
17   |13056     |21.76     
18   |13824     |23.04     
19   |14592     |24.32     
20   |15360     |25.6      
21   |16128     |26.88     
22   |16896     |28.16     
23   |17664     |29.44     
24   |18432     |30.72     
25   |19200     |32.0      
26   |19968     |33.28     
27   |20736     |34.56     
28   |21504     |35.84     
29   |22272     |37.12     
30   |23040     |38.4      
31   |23808     |39.68     
32   |24576     |40.96     
33   |25344     |42.24     
34   |26112     |43.52     
35   |26880     |44.

In [53]:
sr = 48000
hop_length = 600
# List every even n_group from 2 to hop_length that divides hop_length exactly.
# Columns: duration of n_group samples in ms at `sr` | n_group | hop_length/n_group | whole_number
print("time|n_group|length|\n~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
for x in range(2, hop_length+2, 2):
    divides_evenly = hop_length//x == hop_length/x
    if not divides_evenly:
        continue # only exact divisors are listed
    print(f"{round(1000*x/sr,2):<4}|{x:7}|{hop_length/x:<6}|{'Whole Number' if divides_evenly else ''}")

time|n_group|length|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
6.25|      2|300.0 |Whole Number
3.12|      4|150.0 |Whole Number
2.08|      6|100.0 |Whole Number
1.56|      8|75.0  |Whole Number
1.25|     10|60.0  |Whole Number
1.04|     12|50.0  |Whole Number
0.62|     20|30.0  |Whole Number
0.52|     24|25.0  |Whole Number
0.42|     30|20.0  |Whole Number
0.31|     40|15.0  |Whole Number
0.25|     50|12.0  |Whole Number
0.21|     60|10.0  |Whole Number
0.12|    100|6.0   |Whole Number
0.1 |    120|5.0   |Whole Number
0.08|    150|4.0   |Whole Number
0.06|    200|3.0   |Whole Number
0.04|    300|2.0   |Whole Number
0.02|    600|1.0   |Whole Number
