<a href="https://colab.research.google.com/github/buganart/descriptor-transformer/blob/main/predict_notebook/Unagan_generate.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# @title Setup
# @markdown 1. Before starting please save the notebook in your drive by clicking on `File -> Save a copy in drive`
# @markdown 2. Check the GPU; a Tesla V100 is recommended if you want generation to run as fast as possible.
# @markdown 3. Mount google drive.
# @markdown 4. Log in to wandb.

!nvidia-smi -L
import os
os.environ["WANDB_MODE"] = "dryrun"

print(f"We have {os.cpu_count()} CPU cores.")
print()

try:
    from google.colab import drive, output

    IN_COLAB = True
except ImportError:
    from IPython.display import clear_output

    IN_COLAB = False

from pathlib import Path

if IN_COLAB:
    drive.mount("/content/drive/")

    if not Path("/content/drive/My Drive/IRCMS_GAN_collaborative_database").exists():
        raise RuntimeError(
            "Shortcut to our shared drive folder doesn't exits.\n\n"
            "\t1. Go to the google drive web UI\n"
            '\t2. Right click shared folder IRCMS_GAN_collaborative_database and click "Add shortcut to Drive"'
        )

clear = output.clear if IN_COLAB else clear_output

def clear_on_success(msg="Ok!"):
    """Replace the cell output with ``msg`` when the last command succeeded.

    Reads the notebook-level ``_exit_code`` variable (presumably the one
    IPython sets after a ``!``/``%pip`` command - confirm) and the
    notebook-level ``clear`` callable. When ``_exit_code`` is non-zero nothing
    happens, so the existing output stays visible.

    Args:
        msg: Text printed after the output has been cleared.
    """
    if _exit_code != 0:
        return
    clear()
    print(msg)

print()
print("Wandb installation and login ...")
%pip install -q wandb

wandb_drive_netrc_path = Path("drive/My Drive/colab/.netrc")
wandb_local_netrc_path = Path("/root/.netrc")
if wandb_drive_netrc_path.exists():
    import shutil

    print("Wandb .netrc file found, will use that to log in.")
    shutil.copy(wandb_drive_netrc_path, wandb_local_netrc_path)
else:
    print(
        f"Wandb config not found at {wandb_drive_netrc_path}.\n"
        f"Using manual login.\n\n"
        f"To use auto login in the future, finish the manual login first and then run:\n\n"
        f"\t!mkdir -p '{wandb_drive_netrc_path.parent}'\n"
        f"\t!cp {wandb_local_netrc_path} '{wandb_drive_netrc_path}'\n\n"
        f"Then that file will be used to login next time.\n"
    )

!wandb login

In [None]:
#@title Clone unagan repo and Install dependencies

os.environ["WANDB_MODE"] = "dryrun"
%pip install torch==1.7.1
if IN_COLAB:
    # unagan base package
    !git clone https://github.com/buganart/unagan
    %cd "/content/unagan/"
    # !git checkout dev
    %pip install -r requirements.txt

    clear_on_success("Repo cloned! Dependencies installed!")

In [None]:
#@title Configuration

# Run ids are entered as strings; the validation code further down in this cell
# splits each value on "," so several ids can be given at once.
# melgan_run_id and hifigan_run_id are mutually exclusive (setting both raises
# a RuntimeError below).

#@markdown Wandb run id for `melgan` run.
melgan_run_id = "a84z6f5s" #@param {type: "string"}
# Example run ids, presumably kept for quick copy/paste:
#a84z6f5s
#3gtjli55

#@markdown Wandb run id for `unagan` run.
unagan_run_id = "2lttzztx" #@param {type: "string"}
#2lttzztx

#@markdown Wandb run id for `hifigan` run.
hifigan_run_id = "" #@param {type: "string"}
#292sxfbq

#@markdown If id is null, choose only one available run or loop through all runs.
# True: one candidate is picked with random_select_id; False: every candidate is used.
random_pick_one_run = False #@param {type: "boolean"}

#@markdown whether to access API for the candidate list or use saved list.
# False: get_run_candidate_list returns the *_run_id_record lists defined below.
check_API = False #@param {type: "boolean"}

#@markdown Duration of generate samples in seconds.
duration = 10 #@param {type: "integer"}

#@markdown Number of samples to generate.
num_samples =  10#@param {type: "integer"}

#@markdown Random seed for sample generation.
seed = 123 #@param {type: "integer"}
# When True, the Generate cell draws a new seed for every run combination.
randomize_seed = False #@param {type: "boolean"}

#@markdown The path of the directory where the generated audio files are put.
# output_dir = "/tmp/"
output_dir = "/content/drive/MyDrive/AUDIO DATABASE/TESTING/unagan_output" #@param {type:"string"}
# Convert to a Path and create the directory (including parents) if missing.
output_dir = Path(output_dir)
output_dir.mkdir(parents=True, exist_ok=True)

#@markdown Whether to merge generated audios into single file with multiple channel. 
#@markdown - If channel_per_merge = k, k generated audios (1 channel) will be merged into 1 file (k channel).
#@markdown - channel_per_merge will be set smaller or equal to num_samples.
#@markdown - audios are picked randomly without replacement in 1 file, with replacement in between files.
random_merge_audio = True #@param {type: "boolean"}
channel_per_merge = 3 #@param {type: "integer"}
# Number of merged files produced by the Merge cell.
num_combine_audio =  3#@param {type: "integer"}

# Clamp: a merged file cannot have more channels than there are samples.
if channel_per_merge > num_samples:
    channel_per_merge = num_samples

# Saved candidate run ids per model; used instead of querying the wandb API
# when check_API is False.
melgan_run_id_record = ['2ai3uoxn', 'lv56hoss', '2dfxmup0', '1sv05hdn', '2gvadb9k', '2uoyw2jq', 'f7vdf3a2', 'h6tospcl', 'a84z6f5s', '10evzcbq']

unagan_run_id_record = ['1ouario5', '3dv0joim', '3b3rwwrs', '1bp1tbai', '1o5e7qs0', '3bayb4xw', '2vqtaos2', '2p3gp4fe', '184nt6ss', '3cs7wwdw', '1bv8sjfl', 's1xg5iup', '13bfkv2p', '2xwq0s2t', 'zbs3z4wc', '10kfp3k1', '20te56qg', '3uy2er7w', '3i9ask6d', '2lttzztx', '2o3gbv1z', '2z700yhr', '1nckwb0l', '2e6l2ncc', 'vnuowoyp', '2t14xcu3', '3cgx95mb', '7ev2hycv', '1yh1havm', '2qtrtkoy', '30yoc8hd', '399pheul', '11qztpuj']

hifigan_run_id_record = ['292sxfbq']
                        
import random
import wandb
import download_weights
def get_run_candidate_list(model):
    """Return the wandb run ids that may be used for ``model``.

    With ``check_API`` disabled, the saved id list for the model is returned
    ("unagan" and "melgan" have their own lists; any other name gets the
    hifigan list). Otherwise the runs of the ``demiurge/<model>`` wandb project
    are scanned and a run qualifies when its summary reports an ``epoch`` of at
    least 10 and every weight file listed in
    ``download_weights.MODEL_PATHS[model]`` is among the run's files.
    Duplicate weight files are not checked.

    Args:
        model: wandb project name of the model.

    Returns:
        List of run id strings.

    Raises:
        Exception: if the API scan finds no qualifying run.
    """
    if not check_API:
        # Saved lists: no network access needed.
        if model == "unagan":
            return unagan_run_id_record
        if model == "melgan":
            return melgan_run_id_record
        return hifigan_run_id_record

    qualified_ids = []
    for candidate in wandb.Api().runs(f"demiurge/{model}"):
        # epoch test: skip runs without an epoch entry or with fewer than 10 epochs
        run_summary = candidate.summary._json_dict
        if "epoch" not in run_summary or run_summary["epoch"] < 10:
            continue
        # TODO: 44.1kHz test
        # weight file test: every expected weight file has to be present
        required_paths = download_weights.MODEL_PATHS[model]
        files_by_name = download_weights.group_filenames(list(candidate.files()))
        if not all(weight_name in files_by_name for weight_name, _ in required_paths):
            continue

        # every test passed
        print(f"candidate id: {str(candidate.id)} found!")
        qualified_ids.append(str(candidate.id))

    print(model + ": random select run id: found "+str(len(qualified_ids))+" qualified runs.")
    print("candidate_list: "+str(qualified_ids))
    if not qualified_ids:
        raise Exception(model + ": No run in wandb is qualified.")
    return qualified_ids

def random_select_id(model):
    """Pick one run id for ``model`` with ``random.choice``.

    The candidates come from ``get_run_candidate_list(model)``; the chosen id
    is printed and returned.

    Args:
        model: model name forwarded to ``get_run_candidate_list``.

    Returns:
        The selected run id.
    """
    picked_run = random.choice(get_run_candidate_list(model))
    print(model + ": picked run_id: "+str(picked_run))
    return picked_run

def check_wandb_id(run_id, model):
    """Validate the format of a wandb run id.

    An empty (falsy) ``run_id`` is accepted without checking. Any other value
    must consist of exactly 8 characters from ``0-9`` and ``a-z``.

    ``re.fullmatch`` with an explicit ``[0-9a-z]`` class is used because
    ``^...$`` with ``re.match`` also accepts a trailing newline, and ``\\d``
    also matches non-ASCII digits.

    Args:
        run_id: run id to validate.
        model: name of the model the id belongs to; only used in the error
            message.

    Raises:
        RuntimeError: if ``run_id`` is non-empty and malformed.
    """
    import re
    if run_id and not re.fullmatch(r"[0-9a-z]{8}", run_id):
        raise RuntimeError(
            "Run ID needs to be 8 characters long and contain only letters a-z and digits.\n"
            f"Got \"{run_id}\" for the {model} run."
        )

# input validation: only one vocoder (melgan or hifigan) may be selected
if melgan_run_id and hifigan_run_id:
    raise RuntimeError("Both melgan_run_id and hifigan_run_id are set. Please only select one.")


def _resolve_run_ids(raw_ids, model, label):
    """Turn a comma separated form value into a validated list of run ids.

    Each entry is stripped and lower-cased; empty entries (for example from a
    trailing comma) are dropped so they do not end up as "" ids. When no id is
    left, either one random candidate (``random_pick_one_run``) or the whole
    candidate list of ``model`` is used instead.

    Args:
        raw_ids: string as entered in the form, possibly empty.
        model: model name passed to the candidate lookup and validation.
        label: prefix of the form field name, used in the log message.

    Returns:
        List of run id strings, each checked with ``check_wandb_id``.
    """
    run_ids = [part.strip() for part in raw_ids.lower().split(",")]
    run_ids = [part for part in run_ids if part]
    if not run_ids:
        if random_pick_one_run:
            run_ids = [random_select_id(model)]
        else:
            print(f"{label}_run_id is empty, find candidate_list!")
            run_ids = get_run_candidate_list(model)
    for run_id in run_ids:
        check_wandb_id(run_id, model)
    return run_ids


# unagan ids
unagan_run_id = _resolve_run_ids(unagan_run_id, "unagan", "unagan")

if not hifigan_run_id:
    # melgan ids (hifigan_run_id stays an empty string)
    melgan_run_id = _resolve_run_ids(melgan_run_id, "melgan", "melgan")
else:
    # hifigan ids (melgan_run_id stays an empty string); the model name used
    # for the lookup is "hifi-gan"
    hifigan_run_id = _resolve_run_ids(hifigan_run_id, "hifi-gan", "hifigan")


# Collect the effective settings in one mapping.
config = {
    "melgan_run_id": melgan_run_id,
    "unagan_run_id": unagan_run_id,
    "hifigan_run_id": hifigan_run_id,
    "duration": duration,
    "num_samples": num_samples,
    "seed": seed,
    "output_dir": output_dir,
}

# Print every setting, with the name padded to 20 characters.
for name, value in config.items():
    print(f"=> {name:20}: {value}")

In [None]:
#@title Generate
import generate

def generate_sample(melgan_run_id, unagan_run_id, hifigan_run_id, seed):
    temp_location = Path(f"/tmp/generated/")
    temp_location.mkdir(parents=True, exist_ok=True)
    old_files = temp_location.rglob("*.*")
    for o in old_files:
        o.unlink()

    sample_list, _ = generate.main(
        num_samples=num_samples, 
        gid=0,
        output_folder=temp_location,
        seed=seed,
        duration=duration,
        melgan_run_id=melgan_run_id,
        unagan_run_id=unagan_run_id,
        hifigan_run_id=hifigan_run_id,
    )
        
    print(f"melgan:{melgan_run_id}, unagan:{unagan_run_id}, hifigan:{hifigan_run_id}, seed:{seed}.")
    print(f"temp_location:{temp_location}, output_dir:{output_dir}")
    %cp -av "$temp_location" "$output_dir"
    print(f"Success! Samples saved to {output_dir}/generated")
    return sample_list

# Only one vocoder is used per session: the unused one is replaced by a list
# holding a single empty id so the nested loops below still run once for it.
if melgan_run_id not in ("", [""]):
    hifigan_run_id = [""]
else:
    melgan_run_id = [""]

# Generate samples for every combination of the selected run ids.
for melgan_id in melgan_run_id:
    for hifigan_id in hifigan_run_id:
        for unagan_id in unagan_run_id:
            # Optionally draw a fresh seed for this combination.
            seed = random.randint(0, 10000) if randomize_seed else seed
            try:
                sample_list = generate_sample(melgan_id, unagan_id, hifigan_id, seed)
            except Exception as err:
                # Best effort: report the failure and go on with the next combination.
                print(f"melgan:{melgan_id}, unagan:{unagan_id}, hifigan:{hifigan_id}, seed:{seed}. Failed")
                print("Error: " + str(err))



In [None]:
#@title Merge
import numpy as np

if random_merge_audio:
    !git clone https://github.com/JiachuanDENG/combine-multiple-channels-of-audio-files
    %cd "./combine-multiple-channels-of-audio-files"
    from combineMultiChannels import combineMultFns

    #extract file header and sample indices
    outfile_name_header = str(sample_list[0].stem).split("_sample")[0]
    sampleIndex_list = []
    for s in sample_list:
        index = (str(s).split("_sample")[1]).split(".")[0]
        sampleIndex_list.append(index)
    
    # combine audio
    chns = [[0]] * channel_per_merge
    for _ in range(num_combine_audio):
        indices = np.random.choice(num_samples, channel_per_merge, replace=False)
        selectedSampleIndex = [sampleIndex_list[i] for i in indices]
        selectedSampleName = [str(sample_list[i]) for i in indices]
        outfile_name = output_dir / ("generated/" + outfile_name_header + "_sample" +("-".join(selectedSampleIndex))+ ".wav")
        combineMultFns(selectedSampleName, chns, str(outfile_name))
        print(f"generate success: {outfile_name}")