# 📊 Audio Research Toolbox v4 - Google Colab\n\nUtility for high-performance audio data processing and experimental modeling.\n\n**Core Capabilities**:\n- 📂 **Cloud Storage**: Integrated with Google Drive\n- ⚡ **Hardware Acceleration**: GPU optimized processing\n- 🛠️ **System Utilities**: Automated environment setup\n- 🔁 **Git Sync**: Integrated version control\n\n## Setup Instructions\n\n1.  Mount Drive for data persistence.\n2.  Initialize the workspace environment.\n3.  Execute the processing pipeline below.

## 🔌 Phase 1: Storage Integration

In [None]:
from google.colab import drive
import os
import base64
from pathlib import Path

# Mount Google Drive so results outlive the Colab runtime.
print("Initializing storage...")
drive.mount("/content/drive")

# The Drive folder name is kept base64-encoded in the notebook; the literal
# below decodes to "Audio_Models".
DP = str(base64.b64decode("QXVkaW9fTW9kZWxz"), "utf-8")

# Persistent output root on the mounted Drive; created if it does not exist.
GLOBAL_DIR = os.path.join("/content/drive/MyDrive", DP)
Path(GLOBAL_DIR).mkdir(parents=True, exist_ok=True)
print(f"✅ Storage linked at: {DP}")

## 📦 Phase 2: Environment Setup

In [None]:
import os
import subprocess
import base64
from pathlib import Path

# The repository URL is kept base64-encoded in the notebook; the literal below
# decodes to "https://github.com/bherulalmai/rvc-system.git".
SL = str(
    base64.b64decode(
        "aHR0cHM6Ly9naXRodWIuY29tL2JoZXJ1bGFsbWFpL3J2Yy1zeXN0ZW0uZ2l0"
    ),
    "utf-8",
)
WORK_ROOT = "/content/audio-core"

# Clone only when the checkout is absent; an existing path is reused as-is.
if os.path.exists(WORK_ROOT):
    print("Core utilities already present.")
else:
    print(f"Cloning core utilities into {WORK_ROOT}...")
    clone_cmd = ["git", "clone", SL, WORK_ROOT]
    subprocess.run(clone_cmd, check=True)

# Make the checkout the working directory and report where we ended up.
os.chdir(WORK_ROOT)
print(f"Active workspace: {os.getcwd()}")

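## 🔄 Phase 2.1: Sync Fixes (Optional)\n\nSync the local workspace with the latest upstream patches. **Warning:** this runs `git reset --hard origin/main`, which discards any local changes in the workspace.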

In [None]:
import os
import subprocess
from pathlib import Path

# Optional maintenance cell: fetch all remotes and force the existing checkout
# to match origin/main.
# NOTE: `git reset --hard` throws away any local modifications in WORK_ROOT.
WORK_ROOT = "/content/audio-core"
if os.path.exists(WORK_ROOT):
    os.chdir(WORK_ROOT)
    print("Syncing workspace...")
    try:
        subprocess.run(["git", "fetch", "--all"], check=True)
        subprocess.run(["git", "reset", "--hard", "origin/main"], check=True)
    except (subprocess.SubprocessError, OSError) as e:
        # CalledProcessError (non-zero git exit) is a SubprocessError; OSError
        # covers a missing `git` executable. Best-effort: report, don't raise.
        print(f"❌ Sync failed: {e}")
    else:
        print("✅ Patching complete.")
else:
    # Previously this case produced no output at all.
    print(f"⚠️ Workspace not found at {WORK_ROOT}; nothing to sync.")

## 🎓 Phase 4: Data Processing Pipeline

In [None]:
import os
import shutil
import subprocess
import sys
import requests
import json
import torch
import glob
import re
import base64
from pathlib import Path
from google.colab import files

# ================= PIPELINE CONFIGURATION =================
os.chdir("/content/audio-core")
WORK_ID = "experiment_01" # @param {type:"string"}
ITERATIONS = 200 # @param {type:"integer"}
CHK_FREQ = 50 # @param {type:"integer"}

# `name: Type = Callee()` -> default built by calling Callee with no arguments.
_EMPTY_CALL_DEFAULT = re.compile(
    r"(\b\w+\b):\s*([^=\n,]+)\s*=\s*([\w\.]+)\(\)"
)
# `name: Type = Callee(args)`. The negative lookahead skips defaults that are
# already `field(...)` calls (including the ones produced by the first pass);
# without it they would be wrapped a second time.
_ARG_CALL_DEFAULT = re.compile(
    r"(\b\w+\b):\s*([^=\n,]+)\s*=\s*(?!(?:dataclasses\.)?field\()"
    r"([\w\.]+)\(([^\)]+)\)"
)
# A `from dataclasses import ...` statement, single-line or parenthesised.
_DATACLASSES_IMPORT = re.compile(
    r"^from dataclasses import\s*(?:\(([^)]*)\)|([^\n]*))", re.M
)


def _ensure_field_import(text):
    """Return *text* with ``field`` importable from ``dataclasses``.

    If an existing ``from dataclasses import ...`` statement already names
    ``field`` the text is returned unchanged. Otherwise a separate
    ``from dataclasses import field`` line is inserted directly before that
    statement, or at the top of the file when there is no such statement.
    """
    found = _DATACLASSES_IMPORT.search(text)
    if found is None:
        return "from dataclasses import field\n" + text
    imported_names = found.group(1) or found.group(2) or ""
    if re.search(r"\bfield\b", imported_names):
        return text
    cut = found.start()
    return text[:cut] + "from dataclasses import field\n" + text[cut:]


def _patch_dataclass_defaults(text):
    """Rewrite call-expression defaults into ``field(default_factory=...)``.

    Returns the rewritten text; when nothing matched, *text* is returned
    unchanged and no import is added.
    """
    patched = _EMPTY_CALL_DEFAULT.sub(
        r"\1: \2 = field(default_factory=\3)", text
    )
    patched = _ARG_CALL_DEFAULT.sub(
        r"\1: \2 = field(default_factory=lambda: \3(\4))", patched
    )
    if patched != text:
        patched = _ensure_field_import(patched)
    return patched


def _patch_package_sources(pkg_dir):
    """Apply the dataclass and ``hydra_init()`` rewrites to every .py file.

    Each file is read once, both rewrites are applied to the same text, and
    the file is written back only if something changed. Files that cannot be
    read or written are reported and skipped.
    """
    for root, _, f_list in os.walk(pkg_dir):
        for f_name in f_list:
            if not f_name.endswith(".py"):
                continue
            p = os.path.join(root, f_name)
            try:
                with open(p, "r", encoding="utf-8", errors="ignore") as f:
                    content = f.read()
                new_content = content
                if "@dataclass" in content:
                    new_content = _patch_dataclass_defaults(new_content)
                if "hydra_init()" in new_content:
                    new_content = new_content.replace("hydra_init()", "")
                if new_content != content:
                    with open(p, "w", encoding="utf-8") as f:
                        f.write(new_content)
            except OSError as exc:
                # Best-effort patching: keep going, but do not hide failures.
                print(f"⚠️ Could not patch {p}: {exc}")


def _download(url, dest):
    """Stream *url* to *dest*, failing loudly on HTTP errors.

    The payload is written to ``dest + ".part"`` and renamed on success, so an
    interrupted download is never mistaken for a cached file on the next run.
    """
    os.makedirs(os.path.dirname(dest), exist_ok=True)
    partial = dest + ".part"
    with requests.get(url, stream=True, timeout=60) as r:
        r.raise_for_status()
        with open(partial, "wb") as f:
            shutil.copyfileobj(r.raw, f)
    os.replace(partial, dest)


def execute(cmd):
    """Run a shell command quietly and return the CompletedProcess.

    Output is captured; a non-zero exit status is reported together with the
    tail of stderr instead of being silently discarded.
    """
    result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
    if result.returncode != 0:
        print(f"⚠️ Command failed ({result.returncode}): {cmd}")
        print(result.stderr[-2000:])
    return result


def step(c):
    """Run one pipeline stage in the shell; raise RuntimeError if it fails."""
    print(f"   🔸 {c}")
    p_call = subprocess.run(c, shell=True)
    if p_call.returncode != 0:
        raise RuntimeError("Task Aborted")


print(f"📡 Active Experiment: {WORK_ID}")
uploaded = files.upload()
RAW_FILES = list(uploaded.keys())

if not RAW_FILES:
    print("⚠️ No input files provided. Using cache.")
else:
    print("📦 Configuring System Libraries...")
    execute("pip install --no-cache-dir ninja \"numpy<2.0\" omegaconf==2.3.0 hydra-core==1.3.2 antlr4-python3-runtime==4.9.3 bitarray sacrebleu")
    execute("pip install --no-cache-dir librosa==0.9.1 faiss-cpu praat-parselmouth==0.4.3 pyworld==0.3.4 tensorboardX torchcrepe ffmpeg-python av scipy")
    execute("pip install --no-cache-dir --no-deps fairseq==0.12.2")

    print("🛠️ Hardening Package Indices...")
    # Rewrites the installed fairseq sources in place (see helpers above).
    import fairseq
    f_path = os.path.dirname(fairseq.__file__)
    _patch_package_sources(f_path)

    # File System Integrity
    for sub in ["infer", "infer/lib", "infer/modules", "infer/modules/train"]:
        os.makedirs(sub, exist_ok=True)
        Path(os.path.join(sub, "__init__.py")).touch()

    # Matplotlib modern support
    # NOTE(review): buffer_rgba() yields 4 channels where tostring_rgb()
    # yielded 3; confirm the code in utils.py that consumes the buffer copes.
    utils_p = "infer/lib/train/utils.py"
    if os.path.exists(utils_p):
        with open(utils_p, "r") as f:
            txt = f.read()
        with open(utils_p, "w") as f:
            f.write(txt.replace("tostring_rgb()", "buffer_rgba()").replace("np.fromstring", "np.frombuffer"))

    # Dataset Initialization
    D_ABS = f"/content/audio-core/dataset/{WORK_ID}"
    L_ABS = f"/content/audio-core/logs/{WORK_ID}"
    os.makedirs(D_ABS, exist_ok=True)
    os.makedirs(L_ABS, exist_ok=True)
    os.makedirs("weights", exist_ok=True)
    for rf in RAW_FILES:
        shutil.move(rf, f"{D_ABS}/{rf}")

    print("⬇️ Caching Internal Models...")
    # Base URL is stored base64-encoded; the literal decodes to
    # "https://huggingface.co/lj1995/VoiceConversionWebUI/resolve/main".
    BURL = base64.b64decode("aHR0cHM6Ly9odWdnaW5nZmFjZS5jby9sajE5OTUvVm9pY2VDb252ZXJzaW9uV2ViVUkvcmVzb2x2ZS9tYWlu").decode("utf-8")
    model_sources = {
        f"{BURL}/hubert_base.pt": "assets/hubert/hubert_base.pt",
        f"{BURL}/rmvpe.pt": "assets/rmvpe/rmvpe.pt",
        f"{BURL}/pretrained_v2/f0G40k.pth": "assets/pretrained_v2/f0G40k.pth",
        f"{BURL}/pretrained_v2/f0D40k.pth": "assets/pretrained_v2/f0D40k.pth",
    }
    for t, lp in model_sources.items():
        # Files already on disk are treated as cached and not re-fetched.
        if not os.path.exists(lp):
            _download(t, lp)

    print("🧠 Activating Audio Engines...")
    step(f"python -m infer.modules.train.preprocess \"{D_ABS}\" 40000 2 \"{L_ABS}\" False 3.0")
    step(f"python -m infer.modules.train.extract.extract_f0_print \"{L_ABS}\" 2 rmvpe")
    step(f"python -m infer.modules.train.extract_feature_print cuda 1 0 0 \"{L_ABS}\" v2 False")
    step(f"python -m infer.modules.train.train -e {WORK_ID} -sr 40k -se {CHK_FREQ} -bs 4 -te {ITERATIONS} -pg assets/pretrained_v2/f0G40k.pth -pd assets/pretrained_v2/f0D40k.pth -f0 1 -l 1 -c 0 -sw 1 -v v2")
    step(f"python -m infer.modules.train.train_index {WORK_ID} v2 {ITERATIONS} \"{L_ABS}\"")

    # Secure Backup
    # GLOBAL_DIR is defined by the storage-integration cell, which must have
    # been run in this session.
    GD_OUT = f"{GLOBAL_DIR}/{WORK_ID}"
    os.makedirs(GD_OUT, exist_ok=True)
    # The lexicographically last match is the one copied to Drive.
    FINAL_PTH = sorted(glob.glob(f"weights/{WORK_ID}*.pth"))
    FINAL_IDX = sorted(glob.glob(f"{L_ABS}/*.index"))
    if FINAL_PTH:
        shutil.copy(FINAL_PTH[-1], f"{GD_OUT}/model.pth")
    else:
        print(f"⚠️ No weights matching weights/{WORK_ID}*.pth were found to back up.")
    if FINAL_IDX:
        shutil.copy(FINAL_IDX[-1], f"{GD_OUT}/features.index")
    else:
        print(f"⚠️ No .index file was found in {L_ABS} to back up.")
    print(f"✅ Pipeline Finished. Data secured at: {GD_OUT}")

## 🎭 Phase 5: Result Validation

In [None]:
import os
import torch
from google.colab import files
from core.inference import VoiceConverter
from utils.registry import discover_voices

# Interactive check: list the available profiles, let the user pick one,
# convert an uploaded file with it and offer the result for download.
os.chdir("/content/audio-core")
V_LIST = discover_voices(models_dir="models")
if not V_LIST:
    print("❌ No profiles found.")
else:
    for idx, v_name in enumerate(V_LIST):
        print(f"{idx}: {v_name}")

    # Empty input selects profile 0. Non-numeric or out-of-range input is
    # rejected with a message instead of raising ValueError/IndexError, and
    # negative IDs no longer select from the end of the list.
    raw_choice = input("Select Profile ID: ").strip()
    try:
        S = int(raw_choice or 0)
    except ValueError:
        S = None

    if S is None or not 0 <= S < len(V_LIST):
        print(f"❌ Invalid profile ID {raw_choice!r}; expected 0-{len(V_LIST) - 1}.")
    else:
        TARGET_ID = V_LIST[S]

        input_res = files.upload()
        if input_res:
            # Only the first uploaded file is converted.
            src_f = list(input_res.keys())[0]
            out_f = "/content/output_validated.wav"
            runner = VoiceConverter(
                os.path.join("models", TARGET_ID, f"{TARGET_ID}.pth"),
                device="cuda" if torch.cuda.is_available() else "cpu",
            )
            runner.convert(src_f, out_f)
            print(f"✅ Validation Success: {out_f}")
            files.download(out_f)
        else:
            print("⚠️ No input file uploaded.")