## API Project: Source Separation of Piano Concertos

In [None]:
import os
import numpy as np
import librosa
import soundfile as sf
import pandas as pd
from tqdm import tqdm
import mir_eval
from spleeter.__main__ import separate
from glob import glob
from typing import Dict, Tuple
from spleeter.audio import Codec

# Sampling rate (Hz) used when loading audio with librosa and when writing clips with soundfile.
SAMPLE_RATE = 22050
MUS_DIR = "metadata" # where you put the music metadata
OUTPUT_DIR = "data" # where you want to put the output dataset
CONFIG_DIR = "configs" # where you put the configuration files
# Number of training / validation mixtures that mix_audio generates per call.
N_TRAIN = 1200
N_VAL = 10

### 1. Processing the dataset for pretraining.

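The metadata directory (`MUS_DIR`) must be organized as `{MUS_DIR}/{Composer}/{Piano|Orchestra}/{audio files}`: each composer folder contains a `Piano` and an `Orchestra` subfolder holding the corresponding audio files.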

In [None]:
def trim_audio(wave_path, duration_sec=20, sample_rate=None):
    """
    Split a recording into consecutive, non-overlapping clips of equal length.

    wave_path: path to an audio file, or an already-loaded 1-D waveform
        (array-like) that is taken to be sampled at `sample_rate`
    duration_sec: length of each clip in seconds (may be fractional)
    sample_rate: sampling rate in Hz; defaults to the module-level SAMPLE_RATE

    Returns a list of numpy arrays, each `duration_sec * sample_rate` samples
    long. Samples left over after the last full clip are dropped.
    Raises ValueError if the clip length works out to less than one sample.
    """
    if sample_rate is None:
        sample_rate = SAMPLE_RATE

    # Callers pass either a file on disk or a waveform that is already in
    # memory (e.g. the output of trim_and_concatenate); only paths are loaded.
    if isinstance(wave_path, (str, os.PathLike)):
        waveform, _ = librosa.load(wave_path, sr=sample_rate)
    else:
        waveform = np.asarray(wave_path)

    # Slice bounds must be integers even when duration_sec is fractional.
    clip_len = int(round(duration_sec * sample_rate))
    if clip_len <= 0:
        raise ValueError("duration_sec * sample_rate must be at least one sample")

    num_clips = waveform.shape[0] // clip_len
    return [waveform[i * clip_len:(i + 1) * clip_len] for i in range(num_clips)]

def mix_audio(piano_waves, orch_waves, composer=None, train=None, validation=None,
              dur=20.0, n_train=None, n_val=None, piece_name=None):
    """
    Randomly pair piano and orchestra clips, write each pair and its mixture
    to disk, and record the written paths in the train / validation tables.

    piano_waves: list of piano clips (numpy arrays)
    orch_waves: list of orchestra clips, each the same shape as the piano clips
    composer: label used as the prefix of every sample folder, type: string
    train: a pandas dataframe recording the training set (created if None)
    validation: a pandas dataframe recording the validation set (created if None)
    dur: duration of each clip in seconds, stored in the "duration" column
    n_train: number of training mixtures; defaults to N_TRAIN
    n_val: number of validation mixtures; defaults to N_VAL
    piece_name: alternative folder prefix; takes precedence over `composer`

    Returns the (train, validation) dataframes, which are also updated in place.
    Raises ValueError if no label is given or if either clip list is empty.
    """
    label = composer if piece_name is None else piece_name
    if label is None:
        raise ValueError("either composer or piece_name must be provided")

    n_train = N_TRAIN if n_train is None else n_train
    n_val = N_VAL if n_val is None else n_val
    total = n_train + n_val

    if total > 0 and (len(piano_waves) == 0 or len(orch_waves) == 0):
        raise ValueError("piano_waves and orch_waves must each contain at least one clip")

    columns = ["mix_path", "piano_path", "orchestra_path", "duration"]
    if train is None:
        train = pd.DataFrame(columns=columns)
    if validation is None:
        validation = pd.DataFrame(columns=columns)

    for i in tqdm(range(total)):
        piano_sample = piano_waves[np.random.randint(low=0, high=len(piano_waves))]
        orch_sample = orch_waves[np.random.randint(low=0, high=len(orch_waves))]
        # The mixture is the average of the two stems, while the stems are
        # saved unscaled. NOTE(review): this means mix = (piano + orchestra) / 2
        # rather than their sum -- confirm that is what training expects.
        mix_sample = (piano_sample + orch_sample) / 2

        # The first n_train samples go to the training split, the rest to
        # validation; the running index i is kept in the folder name.
        split, table = ("train", train) if i < n_train else ("val", validation)
        save_path = os.path.join(OUTPUT_DIR, split, label + "_" + str(i))
        table.loc[len(table)] = {
            "mix_path": os.path.join(save_path, "mix.wav"),
            "piano_path": os.path.join(save_path, "piano.wav"),
            "orchestra_path": os.path.join(save_path, "orchestra.wav"),
            "duration": dur,
        }

        os.makedirs(save_path, exist_ok=True)
        sf.write(os.path.join(save_path, "piano.wav"), piano_sample, SAMPLE_RATE)
        sf.write(os.path.join(save_path, "orchestra.wav"), orch_sample, SAMPLE_RATE)
        sf.write(os.path.join(save_path, "mix.wav"), mix_sample, SAMPLE_RATE)

    return train, validation

In [None]:
def make_dataset(train_name, val_name):
    """
    Build the pretraining dataset and its CSV index files.

    For every composer folder under MUS_DIR, all recordings in its "Piano"
    and "Orchestra" subfolders are cut into clips, the clips are randomly
    mixed by mix_audio, and the resulting tables are written to CONFIG_DIR.

    train_name: file name of the training CSV written into CONFIG_DIR
    val_name: file name of the validation CSV written into CONFIG_DIR
    """
    columns = ["mix_path", "piano_path", "orchestra_path", "duration"]
    train = pd.DataFrame(columns=columns)
    validation = pd.DataFrame(columns=columns)

    def _clips_in(folder):
        # Gather the clips of every non-hidden recording in `folder`.
        collected = []
        for recording in os.listdir(folder):
            if recording[0] == ".":
                continue
            collected.extend(trim_audio(os.path.join(folder, recording)))
        return collected

    for composer in os.listdir(MUS_DIR):
        # Entries starting with a dot (hidden files) are not composers.
        if composer[0] == ".":
            continue
        piano_waves = _clips_in(os.path.join(MUS_DIR, composer, "Piano"))
        orch_waves = _clips_in(os.path.join(MUS_DIR, composer, "Orchestra"))
        train, validation = mix_audio(
            piano_waves,
            orch_waves,
            composer=composer,
            train=train,
            validation=validation,
        )

    os.makedirs(CONFIG_DIR, exist_ok=True)
    for table, file_name in ((train, train_name), (validation, val_name)):
        table.to_csv(os.path.join(CONFIG_DIR, file_name), index=False)


make_dataset("train.csv", "val.csv")

### 2. Creating test-time adaptation (TTA) data for each test recording.

In [None]:
def trim_and_concatenate(wave_time):
    """
    Cut the listed passages out of each recording and join them into one waveform.

    wave_time: {filename: [[start_1, end_1], [start_2, end_2], ...]}, with
        start / end given in seconds (fractional values are allowed)

    Returns a 1-D numpy array holding the passages in the order given, file
    after file. An empty array is returned when no passage is listed.
    """
    segments = []
    for wave_path, time_list in wave_time.items():
        waveform, _ = librosa.load(wave_path, sr=SAMPLE_RATE)

        for begin, end in time_list:
            # Convert seconds to integer sample indices; float indices are
            # not valid slice bounds.
            begin_sample = int(round(begin * SAMPLE_RATE))
            end_sample = int(round(end * SAMPLE_RATE))
            segments.append(waveform[begin_sample:end_sample])

    # All segments are collected in one flat list so that each passage
    # appears exactly once, regardless of how many files are listed.
    if not segments:
        return np.zeros(0, dtype=np.float32)
    return np.concatenate(segments, axis=0)

# Piano-only passages of the test recording as [start, end] pairs; the values
# are multiplied by SAMPLE_RATE in trim_and_concatenate, i.e. they are seconds.
piano_wave_time = {
   "BeethovenOp73.mp3":[[0, 100], [100, 200]]
}

# Orchestra-only passages of the same recording, same format.
orchestra_wave_time = {
   "BeethovenOp73.mp3":[[200, 300]]
}


# NOTE(review): trim_audio receives an in-memory waveform here rather than a
# file path -- confirm that trim_audio accepts arrays.
piano_only = trim_audio(trim_and_concatenate(piano_wave_time))
orchestra_only = trim_audio(trim_and_concatenate(orchestra_wave_time))

# Empty tables with the same columns that make_dataset uses.
train = pd.DataFrame(columns = ["mix_path", "piano_path", "orchestra_path", "duration"])
validation = pd.DataFrame(columns = ["mix_path", "piano_path", "orchestra_path", "duration"])

# NOTE(review): this call relies on mix_audio accepting the n_train / n_val /
# piece_name keywords and not requiring `composer` -- confirm the signature.
# NOTE(review): the returned tables are not written to CSV in this cell.
train, validation = mix_audio(piano_only, orchestra_only, n_train = 100, n_val = 5, piece_name = "BeethovenOp73", train = train, validation = validation)

In [5]:
# Subfolder of `mus_dir` whose pieces are separated and scored by evaluate().
EVALUATION_SPLIT: str = "val"
# Not referenced by the code visible in this notebook.
EVALUATION_METRICS_DIRECTORY: str = "metrics"
# Stems for which evaluate() computes an SDR; each maps to "<instrument>.wav".
EVALUATION_INSTRUMENTS: Tuple[str, ...] = ("piano", "orchestra")
# File name of the mixture inside each piece folder.
EVALUATION_MIXTURE: str = "mix.wav"
# Not referenced by the code visible in this notebook.
EVALUATION_AUDIO_DIRECTORY: str = "audio"


def calculate_sdr(reference_audio_path, estimation_audio_path):
    """
    Compute the SDR of one estimated source against its reference using mir_eval.

    reference_audio_path: path to the ground-truth stem
    estimation_audio_path: path to the separated stem

    Returns the SDR value for the single source pair.
    """
    # The reference keeps its native sampling rate; the estimate is loaded at
    # that same rate so the two waveforms are comparable sample by sample.
    reference, sample_rate = librosa.load(reference_audio_path, sr=None)
    estimation, _ = librosa.load(estimation_audio_path, sr=sample_rate)

    # bss_eval_sources requires identically shaped inputs, so compare only
    # the part both signals cover.
    n_samples = min(reference.shape[0], estimation.shape[0])
    sdr, _, _, _ = mir_eval.separation.bss_eval_sources(
        reference[None, :n_samples],
        estimation[None, :n_samples],
        compute_permutation=False,
    )
    return sdr[0]

def evaluate(
    output_path: str,
    params_filename: str,
    mus_dir: str,
    verbose: bool = False,
) -> Dict:
    """
    Separate every mixture of the evaluation split and report the median SDR
    of each instrument.

    output_path: directory under which the separated stems are written
        (inside a subfolder named after EVALUATION_SPLIT)
    params_filename: path to the Spleeter configuration file to use
    mus_dir: dataset root containing the EVALUATION_SPLIT folder, which holds
        one folder per piece with the mixture and the reference stems
    verbose: forwarded to Spleeter's `separate`

    Prints one "<instrument>_sdr" line per instrument and returns the same
    values as a dict {"<instrument>_sdr": median SDR}. The median is NaN when
    the split contains no piece folders.
    """
    split_directory = os.path.join(mus_dir, EVALUATION_SPLIT)
    # The trailing "/" restricts the match to directories; "*" also skips
    # hidden entries.
    songs = sorted(glob(os.path.join(split_directory, "*/")))
    mixtures = [os.path.join(song, EVALUATION_MIXTURE) for song in songs]
    audio_output_directory = os.path.join(output_path, EVALUATION_SPLIT)
    separate(
        deprecated_files=None,
        files=mixtures,
        adapter="spleeter.audio.ffmpeg.FFMPEGProcessAudioAdapter",
        bitrate="128k",
        codec=Codec.WAV,
        duration=600.0,
        offset=0,
        output_path=audio_output_directory,
        filename_format="{foldername}/{instrument}.{codec}",
        params_filename=params_filename,
        mwf=False,
        verbose=verbose,
    )

    # Score exactly the pieces that were separated, so stray files in the
    # split folder cannot produce lookups for stems that were never written.
    pieces = [os.path.basename(os.path.normpath(song)) for song in songs]

    results = {}
    for instrument in EVALUATION_INSTRUMENTS:
        sdr_values = [
            calculate_sdr(
                os.path.join(split_directory, piece, f"{instrument}.wav"),
                os.path.join(audio_output_directory, piece, f"{instrument}.wav"),
            )
            for piece in pieces
        ]
        median_sdr = np.median(sdr_values) if sdr_values else float("nan")
        results[f"{instrument}_sdr"] = median_sdr
        print(f"{instrument}_sdr: {median_sdr}")

    return results


### 3. Training and Evaluation

#### 3.1 Pretraining the model

In [None]:
# Pretrain with the base configuration; `-d data` points Spleeter at the
# dataset folder (the same value as OUTPUT_DIR above).
!spleeter train -p configs/base_config.json -d data --verbose

#### 3.2 Test-time adaptation

For each test piece, set `train_csv`, `validation_csv` and `model_dir` in `base_config_{piece}.json` and `base_config_{piece}_test.json`, and place both files in the `configs` directory. Then copy the folder of pretrained model parameters under a new name and start training from that copy.

In [None]:
# Work on a copy so the pretrained parameters in `pretrained` stay untouched.
!cp -r pretrained B73
# NOTE(review): the data root is an absolute path here, unlike the relative
# `data` used for pretraining -- confirm it matches where the TTA clips live.
!spleeter train -p configs/base_config_B73.json -d /home/featurize/data --verbose

#### 3.3 Evaluation

In [None]:
# Score the test piece with the base configuration
# (presumably the pretrained model before adaptation -- verify model_dir).
evaluate(output_path = "audio", params_filename = "configs/base_config.json", mus_dir = "data/BeethovenOp73")

In [None]:
# Score the same piece with the piece-specific test configuration
# (presumably the adapted model -- verify model_dir). The output folder is the
# same as in the previous cell, so stems written there are overwritten.
evaluate(output_path = "audio", params_filename = "configs/base_config_B73_test.json", mus_dir = "data/BeethovenOp73")