In [None]:
!pip install -U yt_dlp;
!huggingface-cli login --token <YOURTOKEN>
!pip install stempeg
!pip install torchsde
!pip install -U demucs;

In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load
import torch
print("CUDA availability: ", torch.cuda.is_available())

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import yt_dlp
import concurrent.futures
import csv
import demucs.separate
import logging
# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory
from diffusers import StableAudioPipeline
import torchaudio
import matplotlib.pyplot as plt
import os
import shutil
import stempeg
import soundfile as sf
from torch.utils.data import Dataset, DataLoader
import scipy.io.wavfile as wavfile
# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
def getSongTitle():
    """Collect search titles from the song-popularity CSV.

    Reads the ``song_name`` column of the dataset file. A name consisting of
    a single whitespace-separated word gets " - Music Video" appended. Any
    error raised while reading is printed, and whatever titles were gathered
    before the error are still returned.

    Returns:
        list: One title per CSV row that was read successfully.
    """
    source_csv = "/kaggle/input/song-popularity-dataset/song_data.csv"
    collected = []

    try:
        with open(source_csv, mode='r', encoding='utf-8') as handle:
            for record in csv.DictReader(handle):
                title = record['song_name']
                is_single_word = len(title.split()) == 1
                collected.append(title + " - Music Video" if is_single_word else title)
    except Exception as err:
        print(f"❌ Error processing CSV file: {err}")

    print(f"Number of songs: {len(collected)}")
    return collected

In [None]:
def download_song(title, ydl_opts):
    """Download whatever yt-dlp finds for a ``ytsearch1:`` query on ``title``.

    Args:
        title: Text to search for.
        ydl_opts: Options dict passed straight to ``yt_dlp.YoutubeDL``.

    Any exception is printed and swallowed, so the function always returns
    ``None`` and the caller can carry on with the next title.
    """
    search_term = "ytsearch1:{}".format(title)
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as downloader:
            downloader.download([search_term])
    except Exception as err:
        print(f"❌ Error downloading {title}: {err}")


def downloadBatch(song_title_batch, output_folder):
    """Download every title in a batch as WAV audio into ``output_folder``.

    Args:
        song_title_batch: Iterable of search titles.
        output_folder: Directory used in the yt-dlp output template; files
            are named ``<title>.<ext>`` inside it.

    One options dict is built and shared by all downloads. Titles are
    processed one after another through ``download_song``.
    """
    # Post-processor that has FFmpeg extract the audio track as WAV.
    extract_wav = {
        'key': 'FFmpegExtractAudio',
        'preferredcodec': 'wav',
        'preferredquality': '192',
    }

    ydl_opts = dict(
        format='bestaudio/best',
        outtmpl=os.path.join(output_folder, '%(title)s.%(ext)s'),
        noplaylist=True,
        quiet=True,
        postprocessors=[extract_wav],
        postprocessor_args=['-ar', '44100'],  # audio sampling rate: 44.1 kHz
    )

    for song_title in song_title_batch:
        download_song(song_title, ydl_opts)

# Separate vocals and accompaniment

In [None]:
def seperate(output_folder, device):
    """Run Demucs two-stem (vocals) separation on each WAV file in a folder.

    Args:
        output_folder: Directory scanned (non-recursively) for files whose
            name ends in ``.wav``, case-insensitively.
        device: Value passed to Demucs' ``--device`` option.

    Demucs is invoked once per file with the ``mdx_extra`` model.
    """
    wav_paths = [
        os.path.join(output_folder, entry)
        for entry in os.listdir(output_folder)
        if entry.lower().endswith('.wav')
    ]

    # Raise the 'demucs' logger threshold so only CRITICAL records pass.
    logging.getLogger('demucs').setLevel(logging.CRITICAL)

    base_args = ["--two-stems", "vocals", "--device", device, "-n", "mdx_extra"]
    for wav_path in wav_paths:
        demucs.separate.main(base_args + [wav_path])

# Load separated vocals and accompaniment into a Torch Dataset
- Apply chunking to regulate the latent audio length
- Calculate the total audio length and compute the chunk index
  

In [None]:
# ===========UTILS==========
def clear_directory(base_dir):
    """
    Empty a directory while leaving the directory itself in place.

    Files and symlinks are unlinked; sub-directories are removed
    recursively. A failure on one entry is printed and does not stop the
    remaining entries from being processed.

    Args:
        base_dir (str): Path to the directory to empty.

    Raises:
        ValueError: If ``base_dir`` is not an existing directory.
    """
    if not os.path.isdir(base_dir):
        raise ValueError(f"{base_dir} is not a valid directory.")

    for name in os.listdir(base_dir):
        target = os.path.join(base_dir, name)
        try:
            if os.path.islink(target) or os.path.isfile(target):
                remover = os.unlink      # plain file or symlink
            elif os.path.isdir(target):
                remover = shutil.rmtree  # whole sub-tree
            else:
                continue                 # neither kind: leave untouched
            remover(target)
        except Exception as err:
            print(f"Failed to delete {target}: {err}")

def get_wav_length(file_path):
    """Return the duration of an audio file in seconds.

    The file is opened with ``soundfile.SoundFile``; the duration is its
    length (``len``) divided by its sample rate.
    """
    with sf.SoundFile(file_path) as wav:
        n_frames = len(wav)
        rate = wav.samplerate
    return n_frames / rate
    
def load_wav_as_array(path):
    """Read an audio file as float32 samples; the sample rate is discarded."""
    samples, _sample_rate = sf.read(path, dtype='float32')
    return samples

def load_vocal_and_accmp(song_dir):
    """Load the two stems of one separated song.

    Args:
        song_dir: Directory holding ``vocals.wav`` and ``no_vocals.wav``.

    Returns:
        tuple: ``(vocal, accompaniment)`` as returned by
        ``load_wav_as_array``, in that order.
    """
    stems = tuple(
        load_wav_as_array(os.path.join(song_dir, stem_file))
        for stem_file in ("vocals.wav", "no_vocals.wav")
    )
    return stems

class PopSepDB(Dataset):
    """Dataset of fixed-length (vocals, accompaniment) chunks of separated songs.

    ``root_dir`` is expected to contain one sub-directory per song, each
    holding ``vocals.wav`` and ``no_vocals.wav``. Entries missing either
    file are skipped. A song of duration ``d`` seconds contributes
    ``int(d / chunk_seconds)`` chunks, and chunk ``k`` covers samples
    ``[k * chunk_samples, (k + 1) * chunk_samples)`` of both stems.

    Args:
        root_dir (str): Directory containing the per-song folders.
        chunk_samples (int): Number of samples in every returned chunk.
        chunk_seconds (float): Seconds of audio budgeted per chunk when
            counting how many chunks a song provides.

    Attributes:
        song_names (list[str]): Sorted names of the usable song folders.
        total_chunks (int): Number of chunks over all songs.
        cumchunk (np.ndarray): Running total of chunks per song.
    """

    # Class-level defaults. 2097152 samples is roughly 47.55 s if the stems
    # are sampled at 44.1 kHz, so a 48 s budget per chunk never asks for
    # more samples than a song has -- NOTE(review): confirm the stem rate.
    chunk_samples = 2097152
    chunk_seconds = 48

    def __init__(self, root_dir, chunk_samples=2097152, chunk_seconds=48):
        self.root_dir = root_dir
        self.chunk_samples = chunk_samples
        self.chunk_seconds = chunk_seconds

        # Sorted so that index -> song mapping does not depend on the
        # arbitrary order of os.listdir; stray files and incomplete song
        # folders are ignored instead of crashing the duration probe below.
        self.song_names = sorted(
            name for name in os.listdir(root_dir)
            if self._has_both_stems(os.path.join(root_dir, name))
        )

        chunk_per_song = []
        for song in self.song_names:
            vocal_path = os.path.join(root_dir, song, "vocals.wav")
            dur = get_wav_length(vocal_path)
            chunk_per_song.append(int(dur / chunk_seconds))

        self.total_chunks = sum(chunk_per_song)
        # Explicit integer dtype: cumsum of an empty list would otherwise be float.
        self.cumchunk = np.cumsum(chunk_per_song, dtype=np.int64)
        print(self.cumchunk)

    @staticmethod
    def _has_both_stems(song_dir):
        """Return True when ``song_dir`` contains both expected stem files."""
        return all(
            os.path.isfile(os.path.join(song_dir, stem))
            for stem in ("vocals.wav", "no_vocals.wav")
        )

    def __len__(self):
        return self.total_chunks

    def __getitem__(self, idx):
        """Return the ``(vocals, accompaniment)`` chunk at global index ``idx``.

        Both arrays are sliced along their second axis after transposing,
        i.e. for stereo stems each has shape ``(2, chunk_samples)``.

        Raises:
            IndexError: If ``idx`` is outside ``[-len(self), len(self))``.
        """
        song, chunk = self.compute_idx(idx)
        song_dir = os.path.join(self.root_dir, self.song_names[song])

        # NOTE: both stems are read in full for every chunk request.
        vocal, accompaniment = load_vocal_and_accmp(song_dir)

        # Swap the two axes so the sample axis comes last
        # (assumes 2-D stems laid out as (samples, channels)).
        vocal = vocal.transpose(1, 0)
        accompaniment = accompaniment.transpose(1, 0)

        start = self.chunk_samples * chunk
        stop = start + self.chunk_samples
        return vocal[:, start:stop], accompaniment[:, start:stop]

    def compute_idx(self, idx):
        """Map a global chunk index to ``(song_index, chunk_index_in_song)``.

        Negative indices count from the end, like list indexing.

        Raises:
            IndexError: If ``idx`` is out of range. Previously this case
                fell through and returned ``None``.
        """
        total = int(self.cumchunk[-1]) if len(self.cumchunk) else 0
        position = idx + total if idx < 0 else idx
        if not 0 <= position < total:
            raise IndexError(
                f"chunk index {idx} out of range for dataset with {total} chunks"
            )

        # First song whose running total exceeds `position`; side="right"
        # steps over songs that contribute zero chunks.
        song = int(np.searchsorted(self.cumchunk, position, side="right"))
        chunks_before = int(self.cumchunk[song - 1]) if song > 0 else 0
        return song, int(position - chunks_before)

# Encode the song with autoencoder

In [None]:
def load_encoder(device):
    """Load the Stable Audio Open pipeline and return its VAE on ``device``.

    The pipeline is fetched from the ``stabilityai/stable-audio-open-1.0``
    repository in float32; only its ``vae`` component is returned.

    Args:
        device: Target passed to the VAE's ``.to()``.

    Returns:
        The pipeline's ``vae`` after being moved to ``device``.
    """
    pipeline = StableAudioPipeline.from_pretrained(
        "stabilityai/stable-audio-open-1.0",
        torch_dtype=torch.float32,
    )
    vae = pipeline.vae.to(device)

    print("Autoencoder loaded")
    return vae

In [None]:
def encode(latent_vocals, latent_accomp, dataloader, autoencoder, device="cpu"):
    """Encode batches of (vocals, accompaniment) audio and collect the latents.

    Args:
        latent_vocals (list): Extended in place with one NumPy array per
            encoded vocal sample.
        latent_accomp (list): Extended in place with one NumPy array per
            encoded accompaniment sample.
        dataloader: Iterable yielding ``(vocals, accompaniment)`` tensor pairs.
        autoencoder: Object whose ``encode(x).latent_dist.mode()`` returns
            the latent tensor for ``x``.
        device: Device the inputs are moved to before encoding.

    A batch is skipped entirely when either latent contains NaN, so the two
    lists always stay the same length and index-aligned. (Previously only
    the vocal latent was checked, letting NaN accompaniment latents through.)
    """
    for vocals, accompaniment in dataloader:
        vocals = vocals.to(device).to(torch.float32)
        accompaniment = accompaniment.to(device).to(torch.float32)

        # Inference only: no gradients are needed for the latents.
        with torch.no_grad():
            encoded_vocals = autoencoder.encode(vocals).latent_dist.mode()
            encoded_accompaniment = autoencoder.encode(accompaniment).latent_dist.mode()

        if torch.isnan(encoded_vocals).any():
            print("===============NaN in vocals==============")
            continue
        if torch.isnan(encoded_accompaniment).any():
            print("===============NaN in accompaniment==============")
            continue

        # extend() iterates over the first (batch) axis, adding one array per sample.
        latent_vocals.extend(encoded_vocals.cpu().numpy())
        latent_accomp.extend(encoded_accompaniment.cpu().numpy())

# Putting it together

In [None]:
# Ensure the download folder exists
output_folder = "downloads"
os.makedirs(output_folder, exist_ok=True)

# is_available must be *called*: the bare function object is always truthy,
# which selected "cuda" even on machines without a GPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
BATCHSIZE = 1   # chunks per DataLoader batch
SONGBATCH = 16  # songs downloaded and processed per outer-loop iteration

song_titles = getSongTitle()
autoencoder = load_encoder(device)

In [None]:
# Accumulators for the encoded latents: two independent lists that
# encode() extends in place.
latent_vocals, latent_accomp = [], []

In [None]:
# Demucs writes stems to <cwd>/separated/<model>/<track>/ by default, and
# `output_folder` is also relative to the working directory. Deriving both
# paths the same way keeps the loop working when the notebook is not run
# from /kaggle/working.
separated_root = "separated"
stems_dir = os.path.join(separated_root, "mdx_extra")

# Full run: for i in range(len(song_titles) // SONGBATCH - 1):
for i in range(2):

    start = i * SONGBATCH
    end = (i + 1) * SONGBATCH

    # Download wav files from YouTube
    downloadBatch(song_titles[start:end], output_folder)

    # Separate with Demucs
    seperate(output_folder, device)

    # If every download or separation in this batch failed there is no
    # stems directory; skip encoding instead of crashing on os.listdir.
    if os.path.isdir(stems_dir):
        dataset = PopSepDB(stems_dir)
        dataloader = DataLoader(dataset, batch_size=BATCHSIZE, num_workers=2, pin_memory=True)

        encode(latent_vocals, latent_accomp, dataloader, autoencoder, device=device)

        # Checkpoint everything encoded so far; np.save converts the lists to arrays.
        np.save('latent_vocals.npy', latent_vocals) #  (N, 64, 1024)
        np.save('latent_accomp.npy', latent_accomp) #  (N, 64, 1024)
    else:
        print(f"⚠️ No separated stems found for batch {i}; nothing to encode")

    # Free disk space before the next batch.
    clear_directory(output_folder)
    if os.path.isdir(separated_root):
        clear_directory(separated_root)

    print(f"Encode Completed - Batch: {i} | Song: {end} | latent vector shape: {len(latent_vocals)} and {len(latent_accomp)}")

In [None]:
"""
kaggle/working/
├── latent_vocals.npy [~1.3MB per song]
├── latent_accomp.npy
│
├── downloads/
│   ├── Bohemian Rhapsody Queen.wav
│   ├── Blinding Lights The Weeknd.wav
│   ├── Imagine John Lennon.wav  
│   │
│
└── separated/
    └── mdx_extra/
        ├── track1/
        │   ├── vocals.wav
        │   └── no_vocals.wav
        ├── track2/
        │   ├── vocals.wav
        │   └── no_vocals.wav
"""