In [1]:
# SYSTEM PREP
!pip install -q yt-dlp librosa gcsfs google-cloud-storage torchaudio
!pip install -q demucs==4.0.0 pretty_midi

# Authenticate if running in Colab
from google.colab import auth
auth.authenticate_user()


💬 QC Note: Clear and focused. Consider encapsulating environment setup into a script for reproducibility.

In [2]:
import os
import gcsfs
from google.cloud import storage

# CONFIG – customize this for your project
PROJECT_ID = "rootz-engine"
INPUT_BUCKET = "rootz-engine-input"
OUTPUT_BUCKET = "rootz-engine-output"
TRAINING_BUCKET = "rootz-engine-training"

os.environ["GOOGLE_CLOUD_PROJECT"] = PROJECT_ID

# GCS setup
fs = gcsfs.GCSFileSystem(project=PROJECT_ID)
storage_client = storage.Client()
input_bucket = storage_client.bucket(INPUT_BUCKET)
output_bucket = storage_client.bucket(OUTPUT_BUCKET)
training_bucket = storage_client.bucket(TRAINING_BUCKET)

# Local dirs
TEMP_DIR = "/content/temp"
DOWNLOAD_DIR = "/content/mp3"
os.makedirs(TEMP_DIR, exist_ok=True)
os.makedirs(DOWNLOAD_DIR, exist_ok=True)

print("✅ GCS ready, local directories created.")


✅ GCS ready, local directories created.


💬 QC Note: ✅ Functional but must replace placeholder PROJECT_ID. Suggest moving config to .env or a JSON if this will scale.

In [3]:
def download_mp3s():
    blobs = input_bucket.list_blobs()
    for blob in blobs:
        if blob.name.endswith(".mp3"):
            destination = os.path.join(DOWNLOAD_DIR, os.path.basename(blob.name))
            if not os.path.exists(destination):
                print(f"⬇️ Downloading {blob.name} → {destination}")
                blob.download_to_filename(destination)
            else:
                print(f"⏭️ Already downloaded: {destination}")

download_mp3s()


⏭️ Already downloaded: /content/mp3/007 [2S_wq99sbWM].mp3
⏭️ Already downloaded: /content/mp3/12_ Don Carlos - Mr Sun [quvsj04WexE].mp3
⏭️ Already downloaded: /content/mp3/400 Years (1970) - Bob Marley & The Wailers [gCD6AG2yi5A].mp3
⏭️ Already downloaded: /content/mp3/African Herbsman (Dub Version) [3XVdwSntbPU].mp3
⏭️ Already downloaded: /content/mp3/All In One - Original [SXyf4IaBdLA].mp3
⏭️ Already downloaded: /content/mp3/BOB MARLEY THREE LITTLE BIRDS [zaGUr6wzyT8].mp3
⏭️ Already downloaded: /content/mp3/Bam Bam - Sister Nancy [OcaPu9JPenU].mp3
⏭️ Already downloaded: /content/mp3/Bam Bam [BGM0v44Yszk].mp3
⏭️ Already downloaded: /content/mp3/Barrington Levy - Be Strong [UUx1kXGqcvo].mp3
⏭️ Already downloaded: /content/mp3/Barrington Levy - Black Roses [OdhEAl_sI1A].mp3
⏭️ Already downloaded: /content/mp3/Barrington Levy - Murderer [W9mvTNh-plY].mp3
⏭️ Already downloaded: /content/mp3/Barrington Levy--Oh Jah,Can_t You See [ZfN5tTrRi6E].mp3
⏭️ Already downloaded: /content/mp3/Battle 

In [4]:
# 🔢 Cell 4 – Separate Stems Using Demucs (patched to use full path)
import torchaudio
import torch
import soundfile as sf
from demucs.pretrained import get_model
from demucs.apply import apply_model

print("🎛️ Loading Demucs model...")
model = get_model("htdemucs")
model.cpu().eval()

def separate_stems(mp3_path):
    if not os.path.exists(mp3_path):
        raise FileNotFoundError(f"🎧 MP3 file not found: {mp3_path}")

    filename = os.path.basename(mp3_path)
    song_name = os.path.splitext(filename)[0]
    stem_dir = os.path.join(STEMS_BASE_DIR, song_name)

    if os.path.exists(os.path.join(stem_dir, "bass.wav")):
        print(f"⏭️ Skipping {song_name}, stems already exist.")
        return song_name, stem_dir

    print(f"🎚️ Separating stems for: {song_name}")
    waveform, sr = torchaudio.load(mp3_path)
    waveform = torchaudio.functional.resample(waveform, sr, 44100)

    with torch.no_grad():
        sources = apply_model(model, waveform[None], device="cpu")[0]

    os.makedirs(stem_dir, exist_ok=True)
    for source_name, audio in zip(model.sources, sources):
        out_path = os.path.join(stem_dir, f"{source_name}.wav")
        sf.write(out_path, audio.cpu().numpy().T, 44100)
        print(f"✅ Saved {source_name}.wav → {out_path}")

    return song_name, stem_dir


🎛️ Loading Demucs model...


🔎 Notes:

    Accepts an .mp3 file path

    Outputs all Demucs-separated stems into /content/stems/{SONG_NAME}/

    Skips if already processed (idempotent)

In [5]:
# 🔢 Cell 5 – Generate MIDI + Groove JSON

import librosa
import pretty_midi
import json
import numpy as np

def generate_midi_and_groove(stem_dir, song_name):
    bass_path = os.path.join(stem_dir, "bass.wav")
    if not os.path.exists(bass_path):
        raise FileNotFoundError(f"Bass stem not found: {bass_path}")

    print(f"🎼 Generating MIDI + JSON for: {song_name}")

    # Load audio and extract rhythm
    y, sr = librosa.load(bass_path, sr=44100)
    tempo, _ = librosa.beat.beat_track(y=y, sr=sr)
    onset_env = librosa.onset.onset_strength(y=y, sr=sr)
    onsets = librosa.onset.onset_detect(onset_envelope=onset_env, sr=sr)

    # Create MIDI
    midi_path = os.path.join(OUTPUT_DIR, f"{song_name}.mid")
    pm = pretty_midi.PrettyMIDI()
    instrument = pretty_midi.Instrument(program=34)  # Fingered bass
    for onset in onsets:
        start = onset / sr
        note = pretty_midi.Note(velocity=100, pitch=36, start=start, end=start + 0.2)
        instrument.notes.append(note)
    pm.instruments.append(instrument)
    pm.write(midi_path)

    # Create Groove JSON
    json_path = os.path.join(OUTPUT_DIR, f"{song_name}.json")
    groove_data = {
        "song": song_name,
        "tempo": float(tempo),
        "onsets": onsets.tolist()
    }
    with open(json_path, "w") as f:
        json.dump(groove_data, f, indent=2)

    print(f"✅ MIDI saved: {midi_path}")
    print(f"✅ Groove JSON saved: {json_path}")
    return midi_path, json_path


🔎 Notes:

    Reads the separated bass.wav file

    Extracts:

        Tempo

        Onset locations

    Outputs:

        .mid file to /content/output/

        .json groove map to /content/output/

In [6]:
# 🔢 Cell 6 – Upload MIDI + Groove JSON to GCS (with logging + validation)
import logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

def upload_outputs_to_gcs(midi_path, json_path, song_key, output_bucket):
    if not midi_path or not os.path.exists(midi_path):
        raise FileNotFoundError(f"MIDI file not found: {midi_path}")
    if not json_path or not os.path.exists(json_path):
        raise FileNotFoundError(f"JSON file not found: {json_path}")
    if not output_bucket:
        raise ValueError("Output bucket cannot be None.")

    try:
        midi_blob = output_bucket.blob(f"midi/{song_key}.mid")
        midi_blob.upload_from_filename(midi_path)
        logging.info(f"Uploaded MIDI → GCS: {midi_blob.name}")

        json_blob = output_bucket.blob(f"groove/{song_key}.json")
        json_blob.upload_from_filename(json_path)
        logging.info(f"Uploaded Groove JSON → GCS: {json_blob.name}")

    except Exception as e:
        logging.error(f"Upload failed for {song_key}: {e}")
        raise

🔎 Notes:

    Uploads both files using a clean GCS path structure:

        midi/{SONG_NAME}_bass.mid

        groove/{SONG_NAME}_bass.json

    Assumes output_bucket is already defined (✅ done in Cell 2)

💬 QC Notes:

    ✅ Great structure and use of Demucs.

    💡 Consider setting device="cuda" if GPU available.

    ⚠️ You may want to catch exceptions per file to avoid halting the batch.

UPDATED 10:43 3/29    

In [7]:
# 🔢 Cell 7 – Log Processed & Clean Up (with .npy cleanup)
import csv
import shutil

def is_already_processed(song_name):
    if not os.path.exists(LOG_PATH):
        return False
    with open(LOG_PATH, "r") as f:
        return song_name in f.read()

def mark_as_processed(song_name):
    with open(LOG_PATH, "a", newline='') as f:
        writer = csv.writer(f)
        writer.writerow([song_name])

def cleanup_files(song_name):
    try:
        mp3_path = os.path.join(DOWNLOAD_DIR, f"{song_name}.mp3")
        if os.path.exists(mp3_path):
            os.remove(mp3_path)

        stem_dir = os.path.join(STEMS_BASE_DIR, song_name)
        if os.path.exists(stem_dir):
            shutil.rmtree(stem_dir)

        for instrument in ["bass", "drums"]:
            for ext in ["mid", "json"]:
                file_path = os.path.join(OUTPUT_DIR, f"{song_name}_{instrument}.{ext}")
                if os.path.exists(file_path):
                    os.remove(file_path)

            for npy_file in glob(os.path.join(SPECTRAL_OUTPUT_DIR, f"{song_name}_{instrument}.npy")):
                os.remove(npy_file)

        logging.info(f"🧹 Cleanup complete for: {song_name}")

    except Exception as e:
        logging.error(f"Cleanup failed for {song_name}: {e}")

🔎 Notes:

    ✅ Safe file removal after processing

    🧾 Appends to processed_log.csv to avoid reprocessing

    🔁 Works great with batch runs and re-runs

In [8]:
# 🔢 Cell 8 – Batch Runner (With Logging & Validation)
from glob import glob

mp3_files = glob(os.path.join(DOWNLOAD_DIR, "*.mp3"))
logging.info(f"🎧 Found {len(mp3_files)} MP3(s) to process.")

for mp3_path in mp3_files:
    song_name = os.path.splitext(os.path.basename(mp3_path))[0]

    if is_already_processed(song_name):
        logging.info(f"⏭️  Skipping (already processed): {song_name}")
        continue

    try:
        logging.info(f"🚀 Starting pipeline for: {song_name}")

        _, stem_dir = separate_stems(mp3_path)
        upload_stems_to_training(song_name)

        for instrument in ["bass", "drums"]:
            midi_path, json_path = generate_midi_and_groove(stem_dir, song_name, instrument)
            if midi_path and json_path:
                song_key = f"{song_name}_{instrument}"
                upload_outputs_to_gcs(midi_path, json_path, song_key, output_bucket)

        convert_to_spectral_arrays(song_name)
        upload_spectral_arrays(song_name)

        mark_as_processed(song_name)
        cleanup_files(song_name)

        logging.info(f"✅ Finished: {song_name}")

    except Exception as e:
        logging.error(f"❌ Error processing {song_name}: {e}")

NameError: name 'LOG_PATH' is not defined

🔎 Final Touches:

    Runs all 7 steps in a loop

    Skips already processed files

    Handles exceptions gracefully

    Logs, cleans, uploads, and moves on to the next

In [None]:
# 🔢 Cell 9 – Upload Separated Stems to GCS Training Bucket

def upload_stems_to_training(song_name):
    stem_dir = os.path.join(STEMS_BASE_DIR, song_name)
    if not os.path.exists(stem_dir):
        print(f"⚠️ Stem directory not found for: {song_name}")
        return

    for file in glob(os.path.join(stem_dir, "*.wav")):
        blob_name = f"stems/{song_name}/{os.path.basename(file)}"
        blob = training_bucket.blob(blob_name)
        print(f"⬆️ Uploading {blob_name}")
        blob.upload_from_filename(file)

    print(f"✅ All stems uploaded for: {song_name}")


💬 QC Notes:

    ✅ Works well.

    🛡️ Consider checking if file already exists on GCS before upload.

In [None]:
# 🔢 Cell 11 – Upload Spectrogram Arrays to GCS

def upload_spectral_arrays(song_name):
    for file in glob(os.path.join(SPECTRAL_OUTPUT_DIR, f"{song_name}_*.npy")):
        blob_name = f"spectral/{os.path.basename(file)}"
        blob = training_bucket.blob(blob_name)
        print(f"📤 Uploading {blob_name}")
        blob.upload_from_filename(file)
    print(f"✅ Uploaded spectral arrays for: {song_name}")


In [None]:
# 🔢 Cell 12 – Build Spectral Dataset JSON

import json
import numpy as np
import os

SPECTRAL_DATA_DIR = "spectral_arrays"
SPECTRAL_JSON_DIR = "training_data"
SPECTRAL_JSON_PATH = os.path.join(SPECTRAL_JSON_DIR, "spectral_dataset.json")

os.makedirs(SPECTRAL_JSON_DIR, exist_ok=True)

spectral_data = []

for file in os.listdir(SPECTRAL_DATA_DIR):
    if file.endswith(".npy"):
        data_path = os.path.join(SPECTRAL_DATA_DIR, file)
        spectrogram = np.load(data_path)

        # Flatten spectrogram to 1D
        flattened = spectrogram.flatten().tolist()

        # Extract label from filename (e.g., onedrop_bass.npy → onedrop)
        label = file.split("_")[0]

        spectral_data.append({
            "label": label,
            "filename": file,
            "spectrogram": flattened
        })

with open(SPECTRAL_JSON_PATH, "w") as f:
    json.dump(spectral_data, f)

print(f"✅ Saved spectral dataset JSON → {SPECTRAL_JSON_PATH}")


💬 QC Notes:

    ✅ Good use of CSV log to avoid reprocessing.

    ⚠️ Relies on filename-only logic — could use hash or unique ID for robustness.

    🧹 Cleanup routine assumes specific file naming conventions; may want more flexibility.

In [None]:
# 🔢 Cell 13 – Upload Spectral Dataset JSON to GCS

def upload_spectral_dataset_manifest():
    blob = training_bucket.blob("training_data/spectral_dataset.json")
    blob.upload_from_filename(SPECTRAL_JSON_PATH)
    print(f"☁️ Uploaded spectral_dataset.json to: gs://{TRAINING_BUCKET}/training_data/")

upload_spectral_dataset_manifest()
