# Proyecto Final de Deep Learning: Desenvolviendo el Sonido en la Universidad del Valle de Guatemala

> Este trabajo se basa en distintos proyectos de separación de audio, como por ejemplo el proyecto [Music Source Separation](https://github.com/andabi/music-source-separation) desarrollado durante el [Jeju Machine Learning Camp 2017](http://mlcampjeju.kakao.com). Sin embargo, dichos proyectos han servido como base y han sido extensamente modificados y mejorados como parte del proyecto final para la Universidad del Valle de Guatemala por Ale Gómez, Michy Solano, Andrea Lam, Chris García, Gabo Vicente y Rodri Barrera.

## Introducción 🎵

La separación de fuentes musicales es una tarea esencial en el procesamiento de señales de audio, que se centra en separar diferentes componentes de una canción, como la voz y los instrumentos. Este proyecto busca mejorar la arquitectura y la eficacia del modelo inicial propuesto en el repositorio base, explorando técnicas avanzadas en redes neuronales y procesamiento de señales.


### Comparativas con Herramientas Existentes:

- Comparación de rendimiento con herramientas existentes como Splitter AI, validando las mejoras implementadas y proporcionando un benchmark sobre el estado del arte.

## Evaluación y Métricas 📊

- Utilización de métricas estándar en la tarea de separación de fuentes como SDR, SIR y SAR, además de otras métricas relevantes como la precisión y el recall en la detección de componentes vocales e instrumentales.
- Documentación meticulosa de los resultados obtenidos, incluyendo visualizaciones de espectrogramas y comparativas cualitativas.




### Base del modelo:

- 3 capas RNN
- 2 capas Dense

----------


Instrucciones de uso:

1. Agregar los paths correctamente en la sección de "Configuración".
2. Correr el notebook.

In [None]:

from __future__ import division

import numpy as np


class Diff(object):
    """Track a scalar and its relative change between consecutive updates.

    ``value`` holds the most recent number passed in; ``diff`` holds
    ``new / previous - 1`` from the last update whose previous value was
    truthy (non-zero).
    """

    def __init__(self, v=0.0):
        self.diff = 0.0
        self.value = v

    def update(self, v):
        """Store ``v`` and refresh ``diff`` relative to the previous value.

        When the previous value is falsy (e.g. the initial ``0.0``) the ratio
        is undefined, so ``diff`` is left untouched.
        """
        previous = self.value
        if previous:
            self.diff = v / previous - 1
        self.value = v


def shape(tensor):
    """Return the static shape of ``tensor`` as a tuple.

    Each entry is the ``.value`` of the corresponding dimension object
    returned by ``tensor.get_shape()``.
    """
    dims = tensor.get_shape()
    return tuple(dims[axis].value for axis in range(len(dims)))


def pretty_list(list):
    """Join the string items of ``list`` into one comma-separated line."""
    separator = ", "
    return separator.join(list)


def pretty_dict(dict):
    """Render a mapping as one ``key : value`` line per entry."""
    rows = []
    for key, val in dict.items():
        rows.append("{} : {}".format(key, val))
    return "\n".join(rows)


def closest_power_of_two(target):
    """Return the power of two nearest to ``target`` as an ``int``.

    Args:
        target: A real number.

    Returns:
        ``1`` when ``target`` is not greater than 1; otherwise the nearest
        power of two. When ``target`` is exactly halfway between two powers
        the smaller one is returned.

    Raises:
        OverflowError: If ``target`` is positive infinity.
    """
    # Written as "not >" so that NaN also falls into this branch.
    if not target > 1:
        return 1
    if target == float("inf"):
        raise OverflowError("cannot convert float infinity to integer")

    # Smallest power of two that is >= target. Starting from 1 also covers
    # non-integer targets in (1, 2), which previously left the upper bound
    # unassigned and raised UnboundLocalError.
    upper = 1
    while upper < target:
        upper *= 2
    lower = upper // 2

    # Strict comparison keeps ties on the lower power.
    if (upper - target) < (target - lower):
        return upper
    return lower


# Write the nd array to txtfile
def nd_array_to_txt(filename, data):
    """Dump ``data`` to ``<filename>.txt`` as text, one block per slice.

    The file starts with a ``# Array shape: ...`` header. Iterating over
    ``data`` yields slices along its first axis; each slice is written with
    ``numpy.savetxt`` and followed by a ``# New slice`` marker line. Lines
    starting with ``#`` are skipped by ``numpy.loadtxt``.
    """
    path = filename + ".txt"
    with open(path, "w") as sink:
        sink.write("# Array shape: {0}\n".format(data.shape))
        for chunk in data:
            # Left-justified columns, 7 characters wide, 2 decimal places.
            np.savetxt(sink, chunk, fmt="%-7.2f")
            sink.write("# New slice\n")


### Configuración

In [None]:
import tensorflow as tf

class ModelConfig:
    """Signal-processing constants shared by the model and the audio helpers."""

    SR = 16000  # Sample rate used when loading and writing audio
    L_FRAME = 1024  # Frame length, used as n_fft for the STFT (default 1024)
    # Hop length: a quarter of the frame length, snapped to the nearest
    # power of two.
    L_HOP = closest_power_of_two(L_FRAME / 4)
    SEQ_LEN = 4  # Number of spectrogram frames per sequence in a batch
    # For mel spectrogram (these two values are not referenced by the
    # code visible in this file)
    N_MELS = 512
    F_MIN = 0.0


# Train
class TrainConfig:
    """Paths, hyper-parameters and TensorFlow session options for training."""

    # Experiment tag derived from the sequence length; it names the
    # checkpoint and graph directories below.
    CASE = str(ModelConfig.SEQ_LEN) + "frames_ikala"
    CKPT_PATH = "checkpoints/" + CASE  # checkpoints are saved to / restored from here
    GRAPH_PATH = "graphs/" + CASE + "/train"  # directory for training summaries
    DATA_PATH = "dataset/train/ikala"  # walked recursively for .wav files
    LR = 0.0001  # learning rate handed to the Adam optimizer
    FINAL_STEP = 100000  # the training loop stops before this step index
    CKPT_STEP = 500  # a checkpoint is saved whenever step % CKPT_STEP == 0
    NUM_WAVFILE = 1  # number of wav files sampled per training step
    # Excerpt length in seconds. Original note: chosen to get a 512x512
    # mel spectrogram.
    SECONDS = 8.192  # To get 512,512 in melspecto
    RE_TRAIN = True  # when True, setup_path() deletes old checkpoints and graphs
    # Session options: one CPU and one GPU device; GPU memory is allocated
    # on demand and limited to a 0.25 fraction for this process.
    session_conf = tf.ConfigProto(
        device_count={"CPU": 1, "GPU": 1},
        gpu_options=tf.GPUOptions(
            allow_growth=True, per_process_gpu_memory_fraction=0.25
        ),
    )



class EvalConfig:
    """Paths, switches and TensorFlow session options for evaluation."""

    # Experiment tag; it must match the tag used for training so that the
    # same checkpoint directory is read. Alternative tags kept for reference:
    # CASE = '1frame'
    # CASE = '4-frames-masking-layer'
    CASE = str(ModelConfig.SEQ_LEN) + "frames_ikala"
    CKPT_PATH = "checkpoints/" + CASE  # checkpoints are restored from here
    GRAPH_PATH = "graphs/" + CASE + "/eval"  # directory for evaluation summaries
    DATA_PATH = "dataset/eval/kpop"  # walked recursively for .wav files
    # Alternative datasets kept for reference:
    # DATA_PATH = 'dataset/mir-1k/Wavfile'
    # DATA_PATH = 'dataset/ikala'
    # When False, waveforms are rebuilt with the mixture phase; when True,
    # the Griffin-Lim path (to_wav_mag_only) is used instead.
    GRIFFIN_LIM = False
    GRIFFIN_LIM_ITER = 1000  # iteration count passed to the Griffin-Lim path
    NUM_EVAL = 9  # number of wav files evaluated in one run
    SECONDS = 60  # excerpt length in seconds
    RE_EVAL = True  # when True, setup_path() deletes old graphs and results
    EVAL_METRIC = False  # when True, BSS metrics are computed and logged
    WRITE_RESULT = True  # when True, separated wav files are written to disk
    RESULT_PATH = "results/" + CASE  # output directory for the wav files
    # Session options: one CPU and one GPU device, GPU memory allocated on
    # demand, device placement logging disabled.
    session_conf = tf.ConfigProto(
        device_count={"CPU": 1, "GPU": 1},
        gpu_options=tf.GPUOptions(allow_growth=True),
        log_device_placement=False,
    )


### Modelo

In [None]:
# -*- coding: utf-8 -*-
# !/usr/bin/env python
"""
By Dabi Ahn. andabi412@gmail.com.
https://www.github.com/andabi

Modificaciones por Grupo 5 - Proyecto Final Deep Learning
UVG - 2023
"""

from __future__ import division
import tensorflow as tf
from tensorflow.contrib.rnn import GRUCell, MultiRNNCell
import os
import numpy as np


class Model:
    """Recurrent two-source separation network (TensorFlow 1.x graph API).

    The network stacks ``n_rnn_layer`` GRU cells, feeds their output to two
    ReLU dense heads (one per source) and turns the head outputs into
    time-frequency masks that are applied to the mixture input.
    """

    def __init__(self, n_rnn_layer=3, hidden_size=256):
        # All three placeholders share the last dimension L_FRAME // 2 + 1;
        # the first two dimensions are left unspecified.
        n_freq_bins = ModelConfig.L_FRAME // 2 + 1
        spec_shape = (None, None, n_freq_bins)

        # Input, Output
        self.x_mixed = tf.placeholder(tf.float32, shape=spec_shape, name="x_mixed")
        self.y_src1 = tf.placeholder(tf.float32, shape=spec_shape, name="y_src1")
        self.y_src2 = tf.placeholder(tf.float32, shape=spec_shape, name="y_src2")

        # Network
        self.hidden_size = hidden_size
        self.n_layer = n_rnn_layer
        # make_template lets repeated calls share the same variables.
        self.net = tf.make_template("net", self._net)
        self()

    def __call__(self):
        """Return the ``(src1, src2)`` prediction tensors of the network."""
        return self.net()

    def _net(self):
        """Build the RNN, the two dense heads and the masking layer."""
        cells = [GRUCell(self.hidden_size) for _ in range(self.n_layer)]
        stacked_rnn = MultiRNNCell(cells)
        output_rnn, _ = tf.nn.dynamic_rnn(
            stacked_rnn, self.x_mixed, dtype=tf.float32
        )
        n_bins = shape(self.x_mixed)[2]

        def dense_head(name):
            # One ReLU projection back to the input's last dimension.
            return tf.layers.dense(
                inputs=output_rnn,
                units=n_bins,
                activation=tf.nn.relu,
                name=name,
            )

        y_hat_src1 = dense_head("y_hat_src1")
        y_hat_src2 = dense_head("y_hat_src2")

        def masked(target):
            # time-freq masking layer: share of `target` in the sum of both
            # heads, applied to the mixture; eps avoids division by zero.
            return (
                target / (y_hat_src1 + y_hat_src2 + np.finfo(float).eps) * self.x_mixed
            )

        y_tilde_src1 = masked(y_hat_src1)
        y_tilde_src2 = masked(y_hat_src2)
        return y_tilde_src1, y_tilde_src2

    def loss(self):
        """Return the mean of the summed squared errors of both sources."""
        est_src1, est_src2 = self()
        err_src1 = tf.square(self.y_src1 - est_src1)
        err_src2 = tf.square(self.y_src2 - est_src2)
        return tf.reduce_mean(err_src1 + err_src2, name="loss")

    @staticmethod
    def spec_to_batch(src):
        """Cut spectrograms into sequences of ``ModelConfig.SEQ_LEN`` frames.

        ``src`` is 3-D; its last axis is zero-padded up to a multiple of
        ``SEQ_LEN``, the last two axes are swapped and the result is reshaped
        to ``(-1, SEQ_LEN, src.shape[1])``.

        Returns:
            ``(batch, padded_src)`` where ``padded_src`` is the padded input
            in its original axis order.
        """
        _, freq, n_frames = src.shape
        seq_len = ModelConfig.SEQ_LEN

        # Frames missing to reach the next multiple of seq_len (0 if aligned).
        pad_len = (-n_frames) % seq_len
        padded_src = np.pad(
            src,
            pad_width=((0, 0), (0, 0), (0, pad_len)),
            mode="constant",
            constant_values=0,
        )
        assert padded_src.shape[-1] % seq_len == 0

        frames_first = padded_src.transpose(0, 2, 1)
        batch = np.reshape(frames_first, (-1, seq_len, freq))
        return batch, padded_src

    @staticmethod
    def batch_to_spec(src, num_wav):
        """Inverse of the reshaping done by ``spec_to_batch``.

        The 3-D ``src`` is regrouped into ``num_wav`` items and its last two
        axes are swapped, giving ``(num_wav, src.shape[2], -1)``.
        """
        _, _, freq = src.shape
        per_wav = np.reshape(src, (num_wav, -1, freq))
        return per_wav.transpose(0, 2, 1)

    @staticmethod
    def load_state(sess, ckpt_path):
        """Restore the latest checkpoint under ``ckpt_path`` if one exists."""
        ckpt_dir = os.path.dirname(ckpt_path + "/checkpoint")
        ckpt = tf.train.get_checkpoint_state(ckpt_dir)
        if not (ckpt and ckpt.model_checkpoint_path):
            return
        tf.train.Saver().restore(sess, ckpt.model_checkpoint_path)


### Preprocesamiento

In [None]:
import librosa
import numpy as np
import soundfile as sf

# Batch considered
def get_random_wav(filenames, sec, sr=ModelConfig.SR):
    """Load a random excerpt of ``sec`` seconds from each file.

    Every file is loaded at sample rate ``sr`` without down-mixing,
    zero-padded to at least ``sr * sec`` samples and then cropped to a
    random window of that length.

    Returns:
        ``(mixed, src1, src2)``: the mono down-mix of each excerpt, and
        index 0 and index 1 along the second axis of the stacked excerpts
        (presumably the two audio channels; verify against the dataset).
    """
    excerpts = []
    for path in filenames:
        wav = librosa.load(path, sr=sr, mono=False)[0]
        excerpts.append(_sample_range(_pad_wav(wav, sr, sec), sr, sec))
    src1_src2 = np.array(excerpts)

    mixed = np.array([librosa.to_mono(pair) for pair in src1_src2])
    return mixed, src1_src2[:, 0], src1_src2[:, 1]


# Batch considered
def to_spectrogram(wav, len_frame=ModelConfig.L_FRAME, len_hop=ModelConfig.L_HOP):
    """Compute the STFT of every waveform in ``wav`` and stack the results."""
    spectrograms = [
        librosa.stft(signal, n_fft=len_frame, hop_length=len_hop) for signal in wav
    ]
    return np.array(spectrograms)


# Batch considered
def to_wav(mag, phase, len_hop=ModelConfig.L_HOP):
    """Rebuild waveforms from batched magnitudes and phase angles.

    Magnitude and phase are combined into complex STFT matrices, each of
    which is inverted with ``librosa.istft``.
    """
    waveforms = []
    for spec in get_stft_matrix(mag, phase):
        waveforms.append(librosa.istft(spec, hop_length=len_hop))
    return np.array(waveforms)


# Batch considered
def to_wav_from_spec(stft_maxrix, len_hop=ModelConfig.L_HOP):
    """Invert every STFT matrix in the batch with ``librosa.istft``."""
    waveforms = [librosa.istft(spec, hop_length=len_hop) for spec in stft_maxrix]
    return np.array(waveforms)


# Batch considered
def to_wav_mag_only(
    mag,
    init_phase,
    len_frame=ModelConfig.L_FRAME,
    len_hop=ModelConfig.L_HOP,
    num_iters=50,
):
    """Rebuild waveforms from magnitudes alone using ``griffin_lim``.

    Args:
        mag: Batch of magnitude spectrograms.
        init_phase: Batch of initial phase angles, paired item by item
            with ``mag``.
        len_frame: Frame length forwarded to ``griffin_lim``.
        len_hop: Hop length forwarded to ``griffin_lim``.
        num_iters: Number of Griffin-Lim iterations per item.

    Returns:
        ``numpy`` array with one reconstructed waveform per batch item.
    """
    # Pair each magnitude with its own initial phase. The earlier version
    # mapped a curried lambda over only the second (mag, phase) pair, so it
    # returned function objects and failed on single-item batches.
    waveforms = [
        griffin_lim(m, len_frame, len_hop, num_iters=num_iters, phase_angle=p)
        for m, p in zip(mag, init_phase)
    ]
    return np.array(waveforms)


# Batch considered
def get_magnitude(stft_matrixes):
    """Return the element-wise absolute value of the given STFT matrices."""
    magnitudes = np.absolute(stft_matrixes)
    return magnitudes


# Batch considered
def get_phase(stft_maxtrixes):
    """Return the element-wise phase angle (radians) of the STFT matrices."""
    angles = np.angle(stft_maxtrixes)
    return angles


# Batch considered
def get_stft_matrix(magnitudes, phases):
    """Combine magnitudes and phase angles into complex STFT values."""
    unit_phasors = np.exp(1.0j * phases)
    return magnitudes * unit_phasors


# Batch considered
def soft_time_freq_mask(target_src, remaining_src):
    """Return the target's share of the summed absolute values.

    Machine epsilon is added to the denominator so that bins where both
    inputs are zero yield 0 instead of a division by zero.
    """
    target_mag = np.abs(target_src)
    total_mag = target_mag + np.abs(remaining_src) + np.finfo(float).eps
    return target_mag / total_mag


# Batch considered
def hard_time_freq_mask(target_src, remaining_src):
    """Return 1.0 where the target is strictly larger, 0.0 elsewhere."""
    target_dominates = target_src > remaining_src
    return np.where(target_dominates, 1.0, 0.0)


def write_wav(data, path, sr=ModelConfig.SR, format="wav", subtype="PCM_16"):
    """Write ``data`` to ``<path>.wav`` through ``soundfile.write``.

    The ``.wav`` suffix is always appended to ``path``; ``format`` and
    ``subtype`` are forwarded unchanged.
    """
    target = "{}.wav".format(path)
    sf.write(target, data, sr, format=format, subtype=subtype)


def griffin_lim(mag, len_frame, len_hop, num_iters, phase_angle=None, length=None):
    """Estimate a waveform from a magnitude spectrogram by iterated STFT.

    Starting from ``phase_angle`` (random values in ``[0, pi)`` when it is
    ``None``), the signal is inverted once and then, ``num_iters - 1`` times,
    re-analysed to obtain a new phase that is recombined with the fixed
    magnitude and inverted again.

    Returns:
        The waveform produced by the final inverse STFT.
    """
    assert num_iters > 0
    if phase_angle is None:
        phase_angle = np.pi * np.random.rand(*mag.shape)

    def invert(stft_matrix):
        return librosa.istft(
            stft_matrix, win_length=len_frame, hop_length=len_hop, length=length
        )

    wav = invert(get_stft_matrix(mag, phase_angle))
    for _ in range(num_iters - 1):
        reanalysed = librosa.stft(
            wav, n_fft=len_frame, win_length=len_frame, hop_length=len_hop
        )
        # Keep only the phase of the re-analysed signal; the magnitude
        # always comes from the input.
        _, phase = librosa.magphase(reanalysed)
        phase_angle = np.angle(phase)
        wav = invert(get_stft_matrix(mag, phase_angle))
    return wav


def _pad_wav(wav, sr, duration):
    assert wav.ndim <= 2

    n_samples = int(sr * duration)
    pad_len = np.maximum(0, n_samples - wav.shape[-1])
    if wav.ndim == 1:
        pad_width = (0, pad_len)
    else:
        pad_width = ((0, 0), (0, pad_len))
    wav = np.pad(wav, pad_width=pad_width, mode="constant", constant_values=0)

    return wav


def _sample_range(wav, sr, duration):
    assert wav.ndim <= 2

    target_len = int(sr * duration)
    wav_len = wav.shape[-1]
    start = np.random.choice(range(np.maximum(1, wav_len - target_len)), 1)[0]
    end = start + target_len
    if wav.ndim == 1:
        wav = wav[start:end]
    else:
        wav = wav[:, start:end]
    return wav


### Data

In [None]:
import random
from os import walk

class Data:
    """Random sampler of wav excerpts from a directory tree."""

    def __init__(self, path):
        self.path = path

    def next_wavs(self, sec, size=1):
        """Pick ``size`` random ``.wav`` files and load ``sec``-second excerpts.

        Returns:
            ``(mixed, src1, src2, wavfiles)`` where the first three come from
            ``get_random_wav`` and ``wavfiles`` lists the chosen paths.
        """
        candidates = []
        for root, _dirs, files in walk(self.path):
            for fname in files:
                if fname.endswith(".wav"):
                    candidates.append("{}/{}".format(root, fname))

        chosen = random.sample(candidates, size)
        mixed, src1, src2 = get_random_wav(chosen, sec, ModelConfig.SR)
        return mixed, src1, src2, chosen


### Entrenamiento

In [None]:

import tensorflow as tf


import os
import shutil
import matplotlib as plt
import librosa.display


def train():
    """Train the model, logging summaries and saving periodic checkpoints.

    Training resumes from the step stored in the restored ``global_step``
    (0 when no checkpoint exists) and runs up to ``TrainConfig.FINAL_STEP``.
    Each step samples fresh audio, converts it to magnitude spectrogram
    batches and runs one optimizer update.
    """
    # Model
    model = Model()

    # Loss, Optimizer
    global_step = tf.Variable(
        0, dtype=tf.int32, trainable=False, name="global_step")
    loss_fn = model.loss()
    optimizer = tf.train.AdamOptimizer(learning_rate=TrainConfig.LR).minimize(
        loss_fn, global_step=global_step
    )

    # Summaries
    summary_op = summaries(model, loss_fn)

    with tf.Session(config=TrainConfig.session_conf) as sess:

        # Initialized, Load state
        sess.run(tf.global_variables_initializer())
        model.load_state(sess, TrainConfig.CKPT_PATH)

        # Build the Saver once: constructing it inside the loop adds a new
        # set of save/restore ops to the graph at every checkpoint.
        # max_to_keep=None keeps every checkpoint file on disk, as the
        # per-checkpoint savers did.
        saver = tf.train.Saver(max_to_keep=None)

        writer = tf.summary.FileWriter(TrainConfig.GRAPH_PATH, sess.graph)
        try:
            # Input source
            data = Data(TrainConfig.DATA_PATH)

            loss = Diff()
            for step in range(global_step.eval(), TrainConfig.FINAL_STEP):
                mixed_wav, src1_wav, src2_wav, _ = data.next_wavs(
                    TrainConfig.SECONDS, TrainConfig.NUM_WAVFILE
                )

                mixed_spec = to_spectrogram(mixed_wav)
                mixed_mag = get_magnitude(mixed_spec)

                src1_spec, src2_spec = to_spectrogram(
                    src1_wav), to_spectrogram(src2_wav)
                src1_mag, src2_mag = get_magnitude(
                    src1_spec), get_magnitude(src2_spec)

                src1_batch, _ = model.spec_to_batch(src1_mag)
                src2_batch, _ = model.spec_to_batch(src2_mag)
                mixed_batch, _ = model.spec_to_batch(mixed_mag)

                l, _, summary = sess.run(
                    [loss_fn, optimizer, summary_op],
                    feed_dict={
                        model.x_mixed: mixed_batch,
                        model.y_src1: src1_batch,
                        model.y_src2: src2_batch,
                    },
                )

                loss.update(l)
                print(
                    "step-{}\td_loss={:2.2f}\tloss={}".format(
                        step, loss.diff * 100, loss.value
                    )
                )

                writer.add_summary(summary, global_step=step)

                # Save state
                if step % TrainConfig.CKPT_STEP == 0:
                    saver.save(
                        sess, TrainConfig.CKPT_PATH + "/checkpoint", global_step=step
                    )
        finally:
            # Close the event file even when a training step raises.
            writer.close()


def summaries(model, loss):
    """Register training summaries and return the merged summary op.

    For every trainable variable a histogram of its values and of the
    gradient of ``loss`` with respect to it is added, followed by the scalar
    loss and histograms of the three model placeholders.

    Args:
        model: Object exposing the ``x_mixed``, ``y_src1`` and ``y_src2``
            tensors.
        loss: Scalar loss tensor.

    Returns:
        The result of ``tf.summary.merge_all()``.
    """
    for v in tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES):
        tf.summary.histogram(v.name, v)
        tf.summary.histogram("grad/" + v.name, tf.gradients(loss, v))
    tf.summary.scalar("loss", loss)
    tf.summary.histogram("x_mixed", model.x_mixed)
    tf.summary.histogram("y_src1", model.y_src1)
    # This histogram previously logged y_src1 a second time.
    tf.summary.histogram("y_src2", model.y_src2)
    return tf.summary.merge_all()


def setup_path():
    """Prepare the training output directories.

    With ``TrainConfig.RE_TRAIN`` set, existing checkpoint and graph
    directories are removed first. The checkpoint directory is then created
    if it does not exist.
    """
    ckpt_dir = TrainConfig.CKPT_PATH
    if TrainConfig.RE_TRAIN:
        for stale_dir in (ckpt_dir, TrainConfig.GRAPH_PATH):
            if os.path.exists(stale_dir):
                shutil.rmtree(stale_dir)
    if not os.path.exists(ckpt_dir):
        os.makedirs(ckpt_dir)


# Entry point of the training cell: prepare the output directories, then
# run the training loop.
if __name__ == "__main__":
    setup_path()
    train()


### Evaluación

In [None]:
import os
import shutil
import numpy as np
import tensorflow as tf
from mir_eval.separation import bss_eval_sources



def eval():
    """Separate a random set of evaluation files and log/write the results.

    The restored model predicts two magnitude spectrograms per mixture;
    these are turned into soft masks, applied to the mixture magnitude and
    converted back to waveforms. Audio summaries are always logged; BSS
    metrics and wav files are produced depending on ``EvalConfig`` flags.
    """
    # Model
    model = Model()
    global_step = tf.Variable(0, dtype=tf.int32, trainable=False, name="global_step")

    with tf.Session(config=EvalConfig.session_conf) as sess:

        # Initialize variables, then overwrite them from the checkpoint
        sess.run(tf.global_variables_initializer())
        model.load_state(sess, EvalConfig.CKPT_PATH)

        writer = tf.summary.FileWriter(EvalConfig.GRAPH_PATH, sess.graph)

        n_eval = EvalConfig.NUM_EVAL
        source = Data(EvalConfig.DATA_PATH)
        mixed_wav, src1_wav, src2_wav, wavfiles = source.next_wavs(
            EvalConfig.SECONDS, n_eval
        )

        mixed_spec = to_spectrogram(mixed_wav)
        mixed_mag = get_magnitude(mixed_spec)
        mixed_batch, padded_mixed_mag = model.spec_to_batch(mixed_mag)
        mixed_phase = get_phase(mixed_spec)

        # Sanity check: batching followed by un-batching must be lossless.
        round_trip = model.batch_to_spec(mixed_batch, n_eval)
        assert np.all(np.equal(round_trip, padded_mixed_mag))

        pred_src1_mag, pred_src2_mag = sess.run(
            model(), feed_dict={model.x_mixed: mixed_batch}
        )

        # Drop the frames that were added as padding.
        n_frames = mixed_phase.shape[-1]
        pred_src1_mag = model.batch_to_spec(pred_src1_mag, n_eval)[:, :, :n_frames]
        pred_src2_mag = model.batch_to_spec(pred_src2_mag, n_eval)[:, :, :n_frames]

        # Time-frequency masking (soft mask; hard_time_freq_mask is the
        # binary alternative)
        mask_src1 = soft_time_freq_mask(pred_src1_mag, pred_src2_mag)
        mask_src2 = 1.0 - mask_src1
        pred_src1_mag = mixed_mag * mask_src1
        pred_src2_mag = mixed_mag * mask_src2

        # (magnitude, phase) -> spectrogram -> wav
        if EvalConfig.GRIFFIN_LIM:

            def reconstruct(mag):
                return to_wav_mag_only(
                    mag,
                    init_phase=mixed_phase,
                    num_iters=EvalConfig.GRIFFIN_LIM_ITER,
                )

        else:

            def reconstruct(mag):
                return to_wav(mag, mixed_phase)

        pred_src1_wav = reconstruct(pred_src1_mag)
        pred_src2_wav = reconstruct(pred_src2_mag)

        # Audio summaries
        audio_summaries = (
            ("GT_mixed", mixed_wav),
            ("Pred_music", pred_src1_wav),
            ("Pred_vocal", pred_src2_wav),
        )
        for tag, audio in audio_summaries:
            tf.summary.audio(tag, audio, ModelConfig.SR, max_outputs=n_eval)

        if EvalConfig.EVAL_METRIC:
            # Compute BSS metrics
            gnsdr, gsir, gsar = bss_eval_global(
                mixed_wav, src1_wav, src2_wav, pred_src1_wav, pred_src2_wav
            )

            # Write the score of BSS metrics
            metric_summaries = (
                ("GNSDR_music", gnsdr[0]),
                ("GSIR_music", gsir[0]),
                ("GSAR_music", gsar[0]),
                ("GNSDR_vocal", gnsdr[1]),
                ("GSIR_vocal", gsir[1]),
                ("GSAR_vocal", gsar[1]),
            )
            for tag, score in metric_summaries:
                tf.summary.scalar(tag, score)

        if EvalConfig.WRITE_RESULT:
            # One "original", "music" and "voice" file per evaluated wav
            outputs = (
                ("original", mixed_wav),
                ("music", pred_src1_wav),
                ("voice", pred_src2_wav),
            )
            for i, wavfile in enumerate(wavfiles):
                name = wavfile.replace("/", "-").replace(".wav", "")
                for label, wavs in outputs:
                    write_wav(
                        wavs[i],
                        "{}/{}-{}".format(EvalConfig.RESULT_PATH, name, label),
                    )

        merged = sess.run(tf.summary.merge_all())
        writer.add_summary(merged, global_step=global_step.eval())

        writer.close()


def bss_eval_global(mixed_wav, src1_wav, src2_wav, pred_src1_wav, pred_src2_wav):
    """Compute length-weighted global NSDR, SIR and SAR over the eval set.

    All reference signals are cropped to the length of the predictions.
    For each of the ``EvalConfig.NUM_EVAL`` items the BSS metrics of the
    predictions are computed, and NSDR is obtained by subtracting the SDR of
    the unprocessed mixture.

    Args:
        mixed_wav: 2-D array of mixture waveforms, one row per item.
        src1_wav: 2-D array of reference waveforms for source 1.
        src2_wav: 2-D array of reference waveforms for source 2.
        pred_src1_wav: Predicted waveforms for source 1.
        pred_src2_wav: Predicted waveforms for source 2.

    Returns:
        ``(gnsdr, gsir, gsar)``, each an array of two values (source 1,
        source 2).
    """
    len_cropped = pred_src1_wav.shape[-1]
    src1_wav = src1_wav[:, :len_cropped]
    src2_wav = src2_wav[:, :len_cropped]
    mixed_wav = mixed_wav[:, :len_cropped]

    # Three independent accumulators. A chained assignment would bind all
    # three names to one array, and the in-place += below would then add
    # every metric into the same buffer.
    gnsdr = np.zeros(2)
    gsir = np.zeros(2)
    gsar = np.zeros(2)
    total_len = 0
    for i in range(EvalConfig.NUM_EVAL):
        references = np.array([src1_wav[i], src2_wav[i]])
        sdr, sir, sar, _ = bss_eval_sources(
            references,
            np.array([pred_src1_wav[i], pred_src2_wav[i]]),
            False,
        )
        # Baseline: score the raw mixture as if it were each estimate.
        sdr_mixed, _, _, _ = bss_eval_sources(
            references,
            np.array([mixed_wav[i], mixed_wav[i]]),
            False,
        )
        nsdr = sdr - sdr_mixed
        gnsdr += len_cropped * nsdr
        gsir += len_cropped * sir
        gsar += len_cropped * sar
        total_len += len_cropped
    gnsdr = gnsdr / total_len
    gsir = gsir / total_len
    gsar = gsar / total_len
    return gnsdr, gsir, gsar


def setup_path():
    """Prepare the evaluation output directories.

    With ``EvalConfig.RE_EVAL`` set, existing graph and result directories
    are removed first. The result directory is then created if it does not
    exist.
    """
    result_dir = EvalConfig.RESULT_PATH
    if EvalConfig.RE_EVAL:
        for stale_dir in (EvalConfig.GRAPH_PATH, result_dir):
            if os.path.exists(stale_dir):
                shutil.rmtree(stale_dir)

    if not os.path.exists(result_dir):
        os.makedirs(result_dir)


# Entry point of the evaluation cell: prepare the output directories, then
# run the evaluation.
if __name__ == "__main__":
    setup_path()
    eval()
