# Title: Music Deconstructor - pipeline from mp3 track files to MIDI files of the component parts
# Author: SkYeJustIs
# Created: 2023-09

***************************************************************

🎹 **Purpose**: A tool that allows one to provide mp3 files as input and get midis as output within your Google Drive 🎶

***************************************************************

*   **Input location**: /gdrive/MyDrive/music_deconstruct/input/
      * Put your mp3 track / song files in the above directory
          * Note: mp3 file names should have underscores _ instead of spaces
*   **Output locations**:
      * /gdrive/MyDrive/music_deconstruct/output_midi/
          * will contain output midi files of song parts
      * /gdrive/MyDrive/music_deconstruct/output/htdemucs/
          * will contain output wav files of song parts

*  References:
    * https://github.com/facebookresearch/demucs
    * https://www.audiolabs-erlangen.de/resources/MIR/NMFtoolbox/
    * https://github.com/inagoy/drumsep
    * https://github.com/facebookresearch/demucs/issues/422





In [None]:
# Install python
# Installs Python 3.8 through apt and repoints /usr/bin/python3 at it, so the
# later `python3 -m pip ...` and `python3 -m demucs.separate` calls in this
# notebook run under that interpreter.
!apt-get install python3.8
!ln -sf /usr/bin/python3.8 /usr/bin/python3
!apt install python3.8-distutils
# Bootstrap pip for the freshly installed interpreter.
!wget https://bootstrap.pypa.io/get-pip.py
!python3.8 get-pip.py
# Print the version that `python3` now resolves to.
!python3 --version

In [None]:
# Install key python packages
# Both are installed from GitHub at a pinned tag (demucs v4.0.0, basic-pitch v0.2.6)
# into the interpreter that `python3` points to.
!python3 -m pip install -U git+https://github.com/facebookresearch/demucs.git@v4.0.0
!python3 -m pip install -U git+https://github.com/spotify/basic-pitch.git@v0.2.6

In [None]:
# This will link to your entire drive.
# The mount point is /gdrive; every path used in this notebook lives below /gdrive/MyDrive/.
from google.colab import drive
drive.mount('/gdrive')

In [None]:
import os
import subprocess

# Root folder of the pipeline on the mounted Google Drive.
_base_dir = '/gdrive/MyDrive/music_deconstruct'
_toolbox_zip = '/gdrive/MyDrive/NMFtoolbox.zip'
_toolbox_dir = os.path.join(_base_dir, 'modules', 'NMFtoolbox')


def _run_setup_command(args):
    """Run one setup command (no shell) and report a failure instead of ignoring it.

    Returns True when the command exited with status 0, False otherwise.
    The cell keeps going after a failure, but the problem is printed so it is
    visible here rather than surfacing later as a confusing import error.
    """
    try:
        status = subprocess.run(args).returncode
    except OSError as err:  # e.g. the executable is not installed
        print(f"Could not run {' '.join(args)}: {err}")
        return False
    if status != 0:
        print(f"Setup command failed (exit status {status}): {' '.join(args)}")
    return status == 0


# Create the folder layout. `exist_ok=True` makes the cell safe to re-run and
# `makedirs` also creates the parent folders (music_deconstruct/, modules/).
for _sub_dir in (
    os.path.join('modules', 'NMFtoolbox'),
    'data',
    'input',
    'output',
    'output_midi',
    'models',
):
    os.makedirs(os.path.join(_base_dir, _sub_dir), exist_ok=True)

# Unpack the toolbox sources and the dictW.mat data file from the zip the user
# placed in their Drive. The wildcard is passed to unzip as-is (no shell), so
# it is unzip, not the shell, that expands it.
_run_setup_command([
    'unzip', '-o', '-j', _toolbox_zip, 'python/NMFtoolbox/*.py', '-d', _toolbox_dir,
])
_run_setup_command([
    'unzip', '-o', '-j', _toolbox_zip, 'data/dictW.mat',
    '-d', os.path.join(_base_dir, 'data'),
])

# Make sure NMFtoolbox has an __init__.py (created empty if missing, left
# untouched otherwise) so it can be imported as a package.
with open(os.path.join(_toolbox_dir, '__init__.py'), 'a'):
    pass

for _package in ('jupyter', 'scipy', 'matplotlib'):
    _run_setup_command(['pip', 'install', _package])

In [None]:
# Work from the folder that holds the unpacked NMFtoolbox package; presumably this
# is what makes the `from NMFtoolbox...` imports below resolve (via the current directory).
os.chdir('/gdrive/MyDrive/music_deconstruct/modules/')

# import required packages
import os
import numpy as np
import scipy.io.wavfile as wav
import IPython.display as ipd
import pathlib

from NMFtoolbox.forwardSTFT import forwardSTFT
from NMFtoolbox.inverseSTFT import inverseSTFT
from NMFtoolbox.initTemplates import initTemplates
from NMFtoolbox.initActivations import initActivations
from NMFtoolbox.NMFD import NMFD
from NMFtoolbox.alphaWienerFilter import alphaWienerFilter
from NMFtoolbox.visualizeComponentsNMF import visualizeComponentsNMF
from NMFtoolbox.utils import make_monaural, pcmInt16ToFloat32Numpy

In [None]:
# Customize the following options!
model = "htdemucs"
extensions = ["mp3", "wav", "ogg", "flac"]  # input file types that will be picked up.
two_stems = None   # set to a stem name to separate only that stem from the rest, for instance
# two_stems = "vocals"

# Options for the output audio.
# Keep `mp3 = False` for this pipeline: the later drum and MIDI cells read the
# separated stems as .wav files (e.g. 'drums.wav').
mp3 = False
mp3_rate = 320   # passed as --mp3-bitrate, only used if 'mp3' is True.
float32 = False  # output as float 32 wavs, unused if 'mp3' is True.
int24 = False    # output as int24 wavs, unused if 'mp3' is True.
# You cannot set both `float32 = True` and `int24 = True` !!

in_path = '/gdrive/MyDrive/music_deconstruct/input/'
out_path = '/gdrive/MyDrive/music_deconstruct/output/'

In [None]:
#@title DEMUCS - Useful functions, don't forget to execute
import io
from pathlib import Path
import select
from shutil import rmtree
import subprocess as sp
import sys
from typing import Dict, Tuple, Optional, IO

from google.colab import files

def find_files(in_path):
    """Return the entries of `in_path` whose extension is listed in `extensions`.

    The comparison is case-insensitive and ignores the leading dot of the
    suffix. Entries are returned as `Path` objects in directory-listing order.
    """
    return [
        entry
        for entry in Path(in_path).iterdir()
        if entry.suffix.lower().lstrip(".") in extensions
    ]

def copy_process_streams(process: sp.Popen):
    """Forward a child's stdout and stderr to this process until both are closed.

    `process` must have been started with `stdout=PIPE` and `stderr=PIPE`.
    Whatever the child writes is decoded as UTF-8 and written (and flushed)
    to `sys.stdout` / `sys.stderr` as soon as it arrives, so progress output
    shows up live. The function returns once both pipes have reached EOF; it
    does not wait for the process itself.

    Raises:
        ValueError: if one of the child's streams is not a pipe.
    """
    import codecs  # local import: only this function needs it

    def raw(stream: Optional[IO[bytes]]) -> IO[bytes]:
        # Use the unbuffered stream so a read returns what is available right
        # now instead of blocking until a whole buffer is filled.
        if stream is None:
            raise ValueError("process must be created with stdout=PIPE and stderr=PIPE")
        if isinstance(stream, io.BufferedIOBase):
            stream = stream.raw
        return stream

    # One incremental decoder per stream: a read can end in the middle of a
    # multi-byte UTF-8 character (progress bars use such characters), and
    # decoding each chunk on its own would raise UnicodeDecodeError. The
    # incremental decoder keeps the dangling bytes until the rest arrives.
    make_decoder = codecs.getincrementaldecoder("utf-8")
    p_stdout, p_stderr = raw(process.stdout), raw(process.stderr)
    stream_by_fd: Dict[int, Tuple[IO[bytes], IO[str], codecs.IncrementalDecoder]] = {
        p_stdout.fileno(): (p_stdout, sys.stdout, make_decoder(errors="replace")),
        p_stderr.fileno(): (p_stderr, sys.stderr, make_decoder(errors="replace")),
    }
    fds = list(stream_by_fd.keys())

    while fds:
        # `select` syscall will wait until one of the file descriptors has content.
        ready, _, _ = select.select(fds, [], [])
        for fd in ready:
            p_stream, std, decoder = stream_by_fd[fd]
            raw_buf = p_stream.read(2 ** 16)
            at_eof = not raw_buf
            if at_eof:
                # An empty read means the child closed this pipe.
                fds.remove(fd)
            # `final=True` at EOF flushes any incomplete trailing bytes.
            text = decoder.decode(raw_buf or b"", final=at_eof)
            if text:
                std.write(text)
                std.flush()

def separate(inp=None, outp=None):
    """Run the Demucs command-line separator on every supported file in a folder.

    Args:
        inp: folder to search for audio files; defaults to the module-level `in_path`.
        outp: folder handed to Demucs via `-o`; defaults to the module-level `out_path`.

    The command is built from the module-level options (`model`, `mp3`,
    `mp3_rate`, `float32`, `int24`, `two_stems`). The child's output is
    streamed to the notebook while it runs. Nothing is returned; a message is
    printed when no input is found or when the command exits with an error.
    """
    inp = inp or in_path
    outp = outp or out_path
    cmd = ["python3", "-m", "demucs.separate", "-o", str(outp), "-n", model]
    if mp3:
        cmd += ["--mp3", f"--mp3-bitrate={mp3_rate}"]
    if float32:
        cmd += ["--float32"]
    if int24:
        cmd += ["--int24"]
    if two_stems is not None:
        cmd += [f"--two-stems={two_stems}"]
    # Named `audio_files` so the imported `google.colab.files` module is not shadowed.
    audio_files = [str(f) for f in find_files(inp)]
    if not audio_files:
        # Report the folder that was actually searched, not the global default.
        print(f"No valid audio files in {inp}")
        return
    print("Going to separate the files:")
    print('\n'.join(audio_files))
    print("With command: ", " ".join(cmd))
    p = sp.Popen(cmd + audio_files, stdout=sp.PIPE, stderr=sp.PIPE)
    copy_process_streams(p)
    p.wait()
    if p.returncode != 0:
        print("Command failed, something went wrong.")


def from_upload():
    """Separate files uploaded through the Colab upload dialog.

    Starts from empty local `tmp_in` and `separated` folders (existing ones
    are deleted), stores every uploaded file in `tmp_in` and then runs
    `separate` from `tmp_in` into `separated`.
    """
    upload_dir = Path('tmp_in')
    result_dir = Path('separated')

    # Wipe and recreate both working folders, input folder first.
    for folder in (upload_dir, result_dir):
        if folder.exists():
            rmtree(folder)
        folder.mkdir()

    for name, content in files.upload().items():
        upload_dir.joinpath(name).write_bytes(content)
    separate(upload_dir, result_dir)


# 🎹 🗣 Separate main music parts (vocals, drums, bass, other, etc.) with DEMUCS

In [None]:
# DEMUCS separate
# Called without arguments, so the module-level `in_path` and `out_path` folders are used.
separate()

In [None]:
# @title NMF - Useful functions, don't forget to execute
def get_nmf_drum_parts(song_file_path, drum_file_name, drum_types=None, drum_onsets=None):
    """Split a drum stem into kick, snare and hi-hat stems with NMFD (NMFtoolbox).

    Reads `drum_file_name` from the folder `song_file_path`, decomposes its
    magnitude spectrogram into three components and writes them next to the
    input as 'drums_kick.wav', 'drums_snare.wav' and 'drums_hihat.wav'.
    A visualization of the decomposition is produced as a side effect.

    Args:
        song_file_path: folder containing the drum stem; also the output folder.
        drum_file_name: file name of the drum stem inside that folder.
        drum_types: optional instrument code per onset (1: kick, 2: snare,
            3: hihat). Must be given together with `drum_onsets`.
        drum_onsets: optional onset times in seconds, one per entry of
            `drum_types`. When both are given the activations are initialised
            from this transcription; otherwise they are initialised randomly.

    Returns:
        The list of paths of the wav files that were written.

    Raises:
        ValueError: if only one of `drum_types` / `drum_onsets` is given, or
            if their lengths differ.
    """
    if (drum_types is None) != (drum_onsets is None):
        raise ValueError("drum_types and drum_onsets must be given together")
    if drum_types is not None and len(drum_types) != len(drum_onsets):
        raise ValueError("drum_types and drum_onsets must have the same length")

    # Honour the file name passed by the caller instead of a hard-coded 'drums.wav'.
    fs, x = wav.read(os.path.join(song_file_path, drum_file_name))
    # make monaural if necessary
    x = make_monaural(x)
    # NOTE(review): this conversion presumably expects 16-bit PCM input, i.e.
    # stems written with the default (non-float32, non-int24) options - confirm.
    x = pcmInt16ToFloat32Numpy(x)

    # spectral parameters
    paramSTFT = {
        'blockSize': 2048,
        'hopSize': 512,
        'reconstMirror': True,
        'appendFrame': True,
        'numSamples': len(x),
    }
    paramSTFT['winFunc'] = np.hanning(paramSTFT['blockSize'])

    # STFT computation (X: spectrogram, A: passed on to NMFD, P: reused as phase below)
    X, A, P = forwardSTFT(x, paramSTFT)

    # get dimensions and time and frequency resolutions
    numBins, numFrames = X.shape
    deltaT = paramSTFT['hopSize'] / fs
    deltaF = fs / paramSTFT['blockSize']

    # set common parameters
    drum_names = ['kick', 'snare', 'hihat']
    numComp = len(drum_names)    # how many drum instruments we want to extract
    numIter = 20
    numTemplateFrames = 8

    # generate initial guess for templates
    paramTemplates = {
        'deltaF': deltaF,
        'numComp': numComp,
        'numBins': numBins,
        'numTemplateFrames': numTemplateFrames,
    }
    initW = initTemplates(paramTemplates, 'drums')

    # generate initial activations
    paramActivations = {'numComp': numComp, 'numFrames': numFrames}
    if drum_onsets is not None:
        # score-informed activations from the supplied transcription
        paramActivations['deltaT'] = deltaT
        paramActivations['drums'] = np.asarray(drum_types, dtype=float)
        paramActivations['onsets'] = np.asarray(drum_onsets, dtype=float)
        paramActivations['decay'] = 0.65
        initH = initActivations(paramActivations, 'drums')
    else:
        # No transcription of this song is available. The previous code fell
        # back to the onset list of the toolbox demo clip, which describes a
        # different recording, so a random initialisation is used instead.
        initH = initActivations(paramActivations, 'random')

    # NMFD parameters
    paramNMFD = {
        'numComp': numComp,
        'numFrames': numFrames,
        'numIter': numIter,
        'numTemplateFrames': numTemplateFrames,
        'initW': initW,
        'initH': initH,
    }

    # NMFD core method
    nmfdW, nmfdH, nmfdV, _, _ = NMFD(A, paramNMFD)

    # alpha-Wiener filtering
    nmfdA, _ = alphaWienerFilter(A, nmfdV, 1.0)

    # visualize the results of the drum separation
    paramVis = {
        'deltaT': deltaT,
        'deltaF': deltaF,
        'endeSec': 6.675,
        'fontSize': 14,
    }
    visualizeComponentsNMF(A, nmfdW, nmfdH, nmfdA, paramVis)

    # resynthesize into individual stems for each drum instrument
    written_paths = []
    for k, drum_name in enumerate(drum_names):
        # plug-in original phase
        Y = nmfdA[k] * np.exp(1j * P)
        y, _ = inverseSTFT(Y, paramSTFT)
        # save result
        out_filepath = os.path.join(song_file_path, 'drums_' + drum_name + '.wav')
        wav.write(filename=out_filepath, rate=fs, data=y)
        written_paths.append(out_filepath)
    return written_paths


In [None]:
root_output = os.path.join('/gdrive/MyDrive/music_deconstruct/output/htdemucs/')

# Collect one folder per separated track: the direct sub-folders of the Demucs
# output root. Looking only one level deep (instead of "anything that is not a
# .wav, at any depth") keeps nested folders created by later steps and stray
# non-wav files out of the list, so the cell stays correct when re-run.
song_file_paths = list()
for track_dir in sorted(pathlib.Path(root_output).glob('*')):
  if not track_dir.is_dir():
    continue
  song_path = str(track_dir.absolute())
  # The drum steps below read '<song folder>/drums.wav'; skip folders without it.
  if not (track_dir / 'drums.wav').is_file():
    print(f"Skipping (no drums.wav found): {song_path}")
    continue
  print(f"Song path to process: {song_path}")
  song_file_paths.append(song_path)

# 🥁 Get drum parts (NMF method)

In [None]:
# Run the NMF drum split on every collected song folder, using the 'drums.wav' stem in each.
for song_file_path in song_file_paths:
  get_nmf_drum_parts(song_file_path, 'drums.wav')

************************************************************************

## 🥁 (OPTIONAL) Get drum parts via inagoy's trained model

###  Manually put the model from here into the folder: /gdrive/MyDrive/music_deconstruct/models
* Download the model from here: https://drive.google.com/file/d/1mbhHbP47fWXC4gnqAJVCezgxmh-h5uVr/view
* Credit: https://github.com/inagoy/drumsep

In [None]:
import subprocess

# Stem file names written by the model -> names used by this notebook.
_inagoy_renames = {
  'bombo.wav': 'inagoy_kick.wav',
  'redoblante.wav': 'inagoy_snare.wav',
  'platillos.wav': 'inagoy_cymbals.wav',
}

for song_file_path in song_file_paths:
  # Arguments are passed as a list (no shell), so folder names containing
  # spaces or other special characters are handled correctly.
  try:
    status = subprocess.run([
      'demucs',
      '--repo', '/gdrive/MyDrive/music_deconstruct/models/',
      '-n', 'modelo_final',
      '-o', song_file_path,
      os.path.join(song_file_path, 'drums.wav'),
    ]).returncode
  except OSError as err:  # e.g. the demucs command is not installed
    print(f"Could not run demucs for {song_file_path}: {err}")
    continue
  if status != 0:
    print(f"demucs failed for {song_file_path} (exit status {status}); nothing renamed")
    continue

  # The stems are looked up in <song folder>/modelo_final/drums/.
  stems_dir = os.path.join(song_file_path, 'modelo_final', 'drums')
  for old_name, new_name in _inagoy_renames.items():
    old_path = os.path.join(stems_dir, old_name)
    if os.path.isfile(old_path):
      os.replace(old_path, os.path.join(stems_dir, new_name))
    else:
      print(f"Expected stem not found: {old_path}")


************************************************************************

# 📁 Get midi files


In [None]:
root_output = os.path.join('/gdrive/MyDrive/music_deconstruct/output/htdemucs/')

print("LIST OF FILES TO CONVERT TO MIDI")
# Every entry below the output root, at any depth, whose absolute path ends in
# '.wav', kept as absolute path strings.
file_paths = [
  str(entry.absolute())
  for entry in pathlib.Path(root_output).glob('**/*')
  if str(entry.absolute()).endswith('.wav')
]
for path in file_paths:
  print(path)


In [None]:
# convert step
import subprocess

midi_root = '/gdrive/MyDrive/music_deconstruct/output_midi'
for fl in file_paths:
  # The track name is the first folder below `root_output`; deriving it from
  # the relative path avoids depending on a fixed position in the split path.
  track_name = pathlib.Path(fl).relative_to(root_output).parts[0]
  midi_dir = os.path.join(midi_root, track_name) + '/'
  # Several wav files share one track folder, so creating it must be repeatable.
  os.makedirs(midi_dir, exist_ok=True)
  # basic-pitch takes the output folder first, then the audio file. Arguments
  # are passed as a list (no shell) so special characters in paths are safe.
  try:
    status = subprocess.run(['basic-pitch', midi_dir, fl]).returncode
  except OSError as err:  # e.g. the basic-pitch command is not installed
    print(f"Could not run basic-pitch for {fl}: {err}")
    continue
  if status != 0:
    print(f"basic-pitch failed for {fl} (exit status {status})")

************************************************************************

************************************************************************

************************************************************************

### NEXT ⏭ ⏭
      * Download folders corresponding to each mp3 track from:
        * /gdrive/MyDrive/music_deconstruct/output_midi/
        * /gdrive/MyDrive/music_deconstruct/output/htdemucs/
      * Delete files before the next run:
        * /gdrive/MyDrive/music_deconstruct/input/
        * /gdrive/MyDrive/music_deconstruct/output_midi/
        * /gdrive/MyDrive/music_deconstruct/output/htdemucs/

## Cleanup 🧹
## ❗ Do not uncomment and run the following code until you have downloaded the files ❗

In [None]:
#os.system(f"rm -R /gdrive/MyDrive/music_deconstruct/output_midi/")
#os.system(f"rm -R /gdrive/MyDrive/music_deconstruct/output/")
#os.system(f"find /gdrive/MyDrive/music_deconstruct/input/ -maxdepth 1 -type f -delete")