<a href="https://colab.research.google.com/github/ZMIPTV/Tivimate/blob/main/Fr_Easy_1_11.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#@title Install to Google Drive (for Resuming Training & Automatic Saving)
%cd /content
from google.colab import drive
drive.mount('/content/drive')
from IPython.display import clear_output
from ipywidgets import Button
import os
if not os.path.exists('/content/drive'):
    print("Your drive is not mounted. Creating Fake Drive.")
    os.makedirs('/content/drive/MyDrive')
source = "Rejekts"
!wget https://huggingface.co/{source}/project/resolve/main/project-main.zip -O '/content/project-main.zip' && unzip -n 'project-main.zip' -d /content/drive/MyDrive
!cd '/content/drive/MyDrive/project-main' && python download_files.py && pip install -r 'requirements-safe.txt'
!pip install pyngrok tensorflow==2.12.0
!rm /content/project-main.zip
!rm -r /content/sample_data
!mkdir -p /content/dataset
clear_output()
Button(description="\u2714 Success", button_style="success")

In [None]:
#@markdown ##Click this to load a DATASET instead.
DATASET = "NameZip.zip"  #@param {type:"string"}
import os
import shutil
from glob import glob
import concurrent.futures
import subprocess

def sanitize_directory(directory):
    for filename in os.listdir(directory):
        file_path = os.path.join(directory, filename)
        if os.path.isfile(file_path):
            if filename == ".DS_Store" or filename.startswith("._") or not filename.endswith(('.wav', '.flac', '.mp3', '.ogg', '.m4a')):
                os.remove(file_path)
        elif os.path.isdir(file_path):
            sanitize_directory(file_path)

def convert_file(source_file, output_filename_converted):
    # Check if the input file is 16-bit
    probe_cmd = f'ffprobe -v error -select_streams a:0 -show_entries stream=sample_fmt -of default=noprint_wrappers=1:nokey=1 "{source_file}"'
    sample_format = subprocess.run(probe_cmd, shell=True, text=True, capture_output=True).stdout.strip()

    # Use appropriate ffmpeg command based on sample format
    if sample_format == 's16':
        # Export as 16-bit WAV
        cmd = f'ffmpeg -i "{source_file}" -c:a pcm_s16le "{output_filename_converted}"'
    else:
        # Export as 32-bit float WAV (default behavior)
        cmd = f'ffmpeg -i "{source_file}" -c:a pcm_f32le "{output_filename_converted}"'

    process = subprocess.run(cmd, shell=True)
    if process.returncode != 0:
        print(f'Failed to convert {source_file}. The file may be corrupt.')
    else:
        os.remove(source_file)

def convert_audio_files(source_dir, dest_dir):
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for root, _, files in os.walk(source_dir):
            for filename in files:
                file_ext = os.path.splitext(filename)[1].lower()
                if file_ext in ['.wav', '.flac', '.mp3', '.ogg', '.m4a']:
                    source_file = os.path.join(root, filename)
                    output_filename = os.path.join(dest_dir, filename)
                    output_filename_converted = os.path.splitext(output_filename)[0] + '_converted.wav'
                    executor.submit(convert_file, source_file, output_filename_converted)

dataset_path = '/content/drive/MyDrive/dataset/' + DATASET
final_directory = '/content/dataset'
temp_directory = '/content/temp_dataset'

try:
    if os.path.exists(final_directory):
        shutil.rmtree(final_directory)
        print("Dataset folder already found. Wiping clean for import operation...")
except Exception as e:
    print(f"Error in removing the final directory: {e}")

try:
    if not os.path.exists(dataset_path):
        raise Exception(f'There is no {DATASET} in {os.path.dirname(dataset_path)}')
except Exception as e:
    print(f"Error in verifying dataset: {e}")

try:
    os.makedirs(final_directory, exist_ok=True)
    os.makedirs(temp_directory, exist_ok=True)
except Exception as e:
    print(f"Error in creating directories: {e}")

try:
    # Unzip data into a temporary directory
    !unzip -d {temp_directory} -B {dataset_path}
except Exception as e:
    print(f"Error in unzipping data: {e}")

try:
    # Sanitize temporary directory
    sanitize_directory(temp_directory)
except Exception as e:
    print(f"Error in sanitizing directory: {e}")

try:
    # Convert audio files and move them to the final directory
    convert_audio_files(temp_directory, final_directory)
except Exception as e:
    print(f"Error in converting audio files: {e}")

try:
    # Clean up temp directory
    shutil.rmtree(temp_directory)
except Exception as e:
    print(f"Error in removing temp directory: {e}")

try:
    # Rename files if needed
    !rename 's/(\w+)\.(\w+)~(\d*)/$1_$3.$2/' {final_directory}/*.*~*
except Exception as e:
    print(f"Error in renaming files: {e}")

print('Dataset imported. You can now copy the path of the dataset folder to the training path.')

In [None]:
#@title 1.Preprocess Data
%cd /content/drive/MyDrive/project-main
model_name = 'My-Voice' #@param {type:"string"}
#@markdown <small> Enter the path to your dataset folder (a folder with audios of the vocals you will train on), or if you want just upload the audios using the File Manager into the 'dataset' folder.
dataset_folder = '/content/dataset' #@param {type:"string"}
while len(os.listdir(dataset_folder)) < 1:
    input("Your dataset folder is empty.")
!mkdir -p ./logs/{model_name}
with open(f'./logs/{model_name}/preprocess.log','w') as f:
    print("Starting...")
!python infer/modules/train/preprocess.py {dataset_folder} 40000 2 ./logs/{model_name} False 3.0 > /dev/null 2>&1
with open(f'./logs/{model_name}/preprocess.log','r') as f:
    if 'end preprocess' in f.read():
        clear_output()
        display(Button(description="\u2714 Success", button_style="success"))
    else:
        print("Error preprocessing data... Make sure your dataset folder is correct.")

In [None]:
#@title 2.Extract Features
f0method = "rmvpe_gpu" # @param ["pm", "harvest", "rmvpe", "rmvpe_gpu"]
%cd /content/drive/MyDrive/project-main
with open(f'./logs/{model_name}/extract_f0_feature.log','w') as f:
    print("Starting...")
if f0method != "rmvpe_gpu":
    !python infer/modules/train/extract/extract_f0_print.py ./logs/{model_name} 2 {f0method}
else:
    !python infer/modules/train/extract/extract_f0_rmvpe.py 1 0 0 ./logs/{model_name} True
!python infer/modules/train/extract_feature_print.py cuda:0 1 0 0 ./logs/{model_name} v2
with open(f'./logs/{model_name}/extract_f0_feature.log','r') as f:
    if 'all-feature-done' in f.read():
        clear_output()
        display(Button(description="\u2714 Success", button_style="success"))
    else:
        print("Error preprocessing data... Make sure your data was preprocessed.")

In [None]:
#@title 3.Train Index
import numpy as np
import faiss
%cd /content/drive/MyDrive/project-main
def train_index(exp_dir1, version19):
    exp_dir = "logs/%s" % (exp_dir1)
    os.makedirs(exp_dir, exist_ok=True)
    feature_dir = (
        "%s/3_feature256" % (exp_dir)
        if version19 == "v1"
        else "%s/3_feature768" % (exp_dir)
    )
    if not os.path.exists(feature_dir):
        return "请先进行特征提取!"
    listdir_res = list(os.listdir(feature_dir))
    if len(listdir_res) == 0:
        return "请先进行特征提取！"
    infos = []
    npys = []
    for name in sorted(listdir_res):
        phone = np.load("%s/%s" % (feature_dir, name))
        npys.append(phone)
    big_npy = np.concatenate(npys, 0)
    big_npy_idx = np.arange(big_npy.shape[0])
    np.random.shuffle(big_npy_idx)
    big_npy = big_npy[big_npy_idx]
    if big_npy.shape[0] > 2e5:
        infos.append("Trying doing kmeans %s shape to 10k centers." % big_npy.shape[0])
        yield "\n".join(infos)
        try:
            big_npy = (
                MiniBatchKMeans(
                    n_clusters=10000,
                    verbose=True,
                    batch_size=256 * config.n_cpu,
                    compute_labels=False,
                    init="random",
                )
                .fit(big_npy)
                .cluster_centers_
            )
        except:
            info = traceback.format_exc()
            logger.info(info)
            infos.append(info)
            yield "\n".join(infos)

    np.save("%s/total_fea.npy" % exp_dir, big_npy)
    n_ivf = min(int(16 * np.sqrt(big_npy.shape[0])), big_npy.shape[0] // 39)
    infos.append("%s,%s" % (big_npy.shape, n_ivf))
    yield "\n".join(infos)
    index = faiss.index_factory(256 if version19 == "v1" else 768, "IVF%s,Flat" % n_ivf)
    infos.append("training")
    yield "\n".join(infos)
    index_ivf = faiss.extract_index_ivf(index)  #
    index_ivf.nprobe = 1
    index.train(big_npy)
    faiss.write_index(
        index,
        "%s/trained_IVF%s_Flat_nprobe_%s_%s_%s.index"
        % (exp_dir, n_ivf, index_ivf.nprobe, exp_dir1, version19),
    )

    infos.append("adding")
    yield "\n".join(infos)
    batch_size_add = 8192
    for i in range(0, big_npy.shape[0], batch_size_add):
        index.add(big_npy[i : i + batch_size_add])
    faiss.write_index(
        index,
        "%s/added_IVF%s_Flat_nprobe_%s_%s_%s.index"
        % (exp_dir, n_ivf, index_ivf.nprobe, exp_dir1, version19),
    )
    infos.append(
        "成功构建索引，added_IVF%s_Flat_nprobe_%s_%s_%s.index"
        % (n_ivf, index_ivf.nprobe, exp_dir1, version19)
    )

training_log = train_index(model_name, 'v2')

for line in training_log:
    print(line)
    if 'adding' in line:
        clear_output()
        display(Button(description="\u2714 Success", button_style="success"))

In [None]:
#@title 4.Train Model
#@markdown <small> Enter your ngrok authtoken to open tensorboard. Get one here: https://dashboard.ngrok.com/get-started/your-authtoken
ngrok_authtoken = ""#@param {type:"string"}
!ngrok config add-authtoken {ngrok_authtoken}
#%cd /content/drive/MyDrive/project-main
from random import shuffle
import json
import os
import pathlib
from subprocess import Popen, PIPE, STDOUT
from pyngrok import ngrok
now_dir=os.getcwd()
#@markdown <small> Enter the name of your model again. It must be the same you chose before.
model_name = 'My-Voice'#@param {type:"string"}
#@markdown <small> Choose how often to save the model and how much training you want it to have.
save_frequency = 20 # @param {type:"slider", min:5, max:50, step:5}
epochs = 420 # @param {type:"slider", min:10, max:1000, step:10}
#@markdown <small> ONLY cache datasets under 10 minutes long. Otherwise leave this unchecked.
cache = False #@param {type:"boolean"}
# Remove the logging setup

def click_train(
    exp_dir1,
    sr2,
    if_f0_3,
    spk_id5,
    save_epoch10,
    total_epoch11,
    batch_size12,
    if_save_latest13,
    pretrained_G14,
    pretrained_D15,
    gpus16,
    if_cache_gpu17,
    if_save_every_weights18,
    version19,
):
    # 生成filelist
    exp_dir = "%s/logs/%s" % (now_dir, exp_dir1)
    os.makedirs(exp_dir, exist_ok=True)
    gt_wavs_dir = "%s/0_gt_wavs" % (exp_dir)
    feature_dir = (
        "%s/3_feature256" % (exp_dir)
        if version19 == "v1"
        else "%s/3_feature768" % (exp_dir)
    )
    if if_f0_3:
        f0_dir = "%s/2a_f0" % (exp_dir)
        f0nsf_dir = "%s/2b-f0nsf" % (exp_dir)
        names = (
            set([name.split(".")[0] for name in os.listdir(gt_wavs_dir)])
            & set([name.split(".")[0] for name in os.listdir(feature_dir)])
            & set([name.split(".")[0] for name in os.listdir(f0_dir)])
            & set([name.split(".")[0] for name in os.listdir(f0nsf_dir)])
        )
    else:
        names = set([name.split(".")[0] for name in os.listdir(gt_wavs_dir)]) & set(
            [name.split(".")[0] for name in os.listdir(feature_dir)]
        )
    opt = []
    for name in names:
        if if_f0_3:
            opt.append(
                "%s/%s.wav|%s/%s.npy|%s/%s.wav.npy|%s/%s.wav.npy|%s"
                % (
                    gt_wavs_dir.replace("\\", "\\\\"),
                    name,
                    feature_dir.replace("\\", "\\\\"),
                    name,
                    f0_dir.replace("\\", "\\\\"),
                    name,
                    f0nsf_dir.replace("\\", "\\\\"),
                    name,
                    spk_id5,
                )
            )
        else:
            opt.append(
                "%s/%s.wav|%s/%s.npy|%s"
                % (
                    gt_wavs_dir.replace("\\", "\\\\"),
                    name,
                    feature_dir.replace("\\", "\\\\"),
                    name,
                    spk_id5,
                )
            )
    fea_dim = 256 if version19 == "v1" else 768
    if if_f0_3:
        for _ in range(2):
            opt.append(
                "%s/logs/mute/0_gt_wavs/mute%s.wav|%s/logs/mute/3_feature%s/mute.npy|%s/logs/mute/2a_f0/mute.wav.npy|%s/logs/mute/2b-f0nsf/mute.wav.npy|%s"
                % (now_dir, sr2, now_dir, fea_dim, now_dir, now_dir, spk_id5)
            )
    else:
        for _ in range(2):
            opt.append(
                "%s/logs/mute/0_gt_wavs/mute%s.wav|%s/logs/mute/3_feature%s/mute.npy|%s"
                % (now_dir, sr2, now_dir, fea_dim, spk_id5)
            )
    shuffle(opt)
    with open("%s/filelist.txt" % exp_dir, "w") as f:
        f.write("\n".join(opt))

    # Replace logger.debug, logger.info with print statements
    print("Write filelist done")
    print("Use gpus:", str(gpus16))
    if pretrained_G14 == "":
        print("No pretrained Generator")
    if pretrained_D15 == "":
        print("No pretrained Discriminator")
    if version19 == "v1" or sr2 == "40k":
        config_path = "configs/v1/%s.json" % sr2
    else:
        config_path = "configs/v2/%s.json" % sr2
    config_save_path = os.path.join(exp_dir, "config.json")
    if not pathlib.Path(config_save_path).exists():
        with open(config_save_path, "w", encoding="utf-8") as f:
            with open(config_path, "r") as config_file:
                config_data = json.load(config_file)
                json.dump(
                    config_data,
                    f,
                    ensure_ascii=False,
                    indent=4,
                    sort_keys=True,
                )
            f.write("\n")

    cmd = (
        'python infer/modules/train/train.py -e "%s" -sr %s -f0 %s -bs %s -g %s -te %s -se %s %s %s -l %s -c %s -sw %s -v %s'
        % (
            exp_dir1,
            sr2,
            1 if if_f0_3 else 0,
            batch_size12,
            gpus16,
            total_epoch11,
            save_epoch10,
            "-pg %s" % pretrained_G14 if pretrained_G14 != "" else "",
            "-pd %s" % pretrained_D15 if pretrained_D15 != "" else "",
            1 if if_save_latest13 == True else 0,
            1 if if_cache_gpu17 == True else 0,
            1 if if_save_every_weights18 == True else 0,
            version19,
        )
    )
    # Use PIPE to capture the output and error streams
    p = Popen(cmd, shell=True, cwd=now_dir, stdout=PIPE, stderr=STDOUT, bufsize=1, universal_newlines=True)

    # Print the command's output as it runs
    for line in p.stdout:
        print(line.strip())

    # Wait for the process to finish
    p.wait()
    return "训练结束, 您可查看控制台训练日志或实验文件夹下的train.log"
%load_ext tensorboard
%tensorboard --logdir ./logs --port=8888
print("Tensorboard NGROK URL:",end="")
ngrok_url = ngrok.connect(8888)
print(ngrok_url)
try:
    training_log = click_train(
        model_name,
        '40k',
        True,
        0,
        save_frequency,
        epochs,
        12,
        True,
        'assets/pretrained_v2/f0G40k.pth',
        'assets/pretrained_v2/f0D40k.pth',
        0,
        cache,
        True,
        'v2',
    )
    print(training_log)
except:
    ngrok.kill()