## TODO:
* pitch detection rip out non-voice

### Metric candidates:
* absolute error between target and voiced pitch (at every moment)
* Raw Pitch Accuracy (for whole song)
* proportion f1 for voicing/ silence periods (later)

In [1]:
import ipywidgets as widgets
import IPython.display as ipd

import time

import numpy as np

import demucs
import demucs.utils
import demucs.separate
import demucs.pretrained

from resampy import resample
import playsound
import sounddevice as sd

from tensorflow.keras.layers import Input, Reshape, Conv2D, BatchNormalization
from tensorflow.keras.layers import MaxPool2D, Dropout, Permute, Flatten, Dense
from tensorflow.keras.models import Model

import torch


import matplotlib as mpl
import matplotlib.pyplot as plt


playsound is relying on another python subprocess. Please use `pip install pygobject` if you want playsound to run more efficiently.
2021-09-26 10:32:42.437907: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2021-09-26 10:32:42.437954: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.


In [2]:
mpl.use("Qt5Agg")

MODEL_SR = 16000
BLOCK_SIZE = 1024
N_CHANNELS = 2
DEVICE = "cpu"

PITCH_MODEL_SR = 16000

In [3]:
def get_vocals(mix_pth, model, shifts: int = 1, splits: int=1, overlap: float = 0.25):
    loaded_mix = demucs.separate.load_track(mix_pth, DEVICE, N_CHANNELS, model.samplerate)
    ref = loaded_mix.mean(0)
    normalized_mix = (loaded_mix - ref.mean()) / ref.std()
    
    all_sources =  demucs.utils.apply_model(model, normalized_mix, shifts=shifts, split=splits,
                                    overlap=overlap, progress=True)
    return all_sources

In [4]:
sources_model = demucs.pretrained.load_pretrained("demucs_quantized")

sources_srate = sources_model.samplerate

In [5]:
songs = [
    "../data/samples/Arctic Monkeys-MadSounds.mp3",
#     "../data/samples/arctic_monkeys_still_take_you_home.mp3",
#     "../data/samples/arctic-monkeys_mardy-bum.mp3",
    "../data/samples/radioactive.mp3",
]

song_voice_stems = {}

for song_pth in songs:
    sources = get_vocals(song_pth, sources_model, shifts=1)
    vocals_source_idx = sources_model.sources.index("vocals")
    song_voice_stems[song_pth] = sources[vocals_source_idx]
    

100%|████████████████████████████████████████████████████████████████████████| 240.0/240.0 [01:58<00:00,  2.02seconds/s]
100%|████████████████████████████████████████████████████████████████████████| 210.0/210.0 [01:44<00:00,  2.01seconds/s]


In [6]:
del sources_model
import gc
gc.collect()

13

In [16]:
def detect_pitch(signal, pitch_model):
    start_time = time.time()
    
    split_idxs = np.arange(1024, len(signal), 1024)
    frames = np.split(signal, split_idxs)

    last = frames[-1]
        
    if len(last) < 1024:
        need_to_pad = 1024 - len(last)
        right_zeros = need_to_pad // 2
        left_zeros = need_to_pad - right_zeros
        frames[-1] = np.concatenate([np.zeros((left_zeros, 1)), last, np.zeros((right_zeros, 1))])
        
    frames = np.concatenate(frames, axis=1)
    frames = frames.transpose(1, 0) # had shape (1024, n_samples), converted to (n_samples, 1024)

    # normalize each frame -- this is expected by the model
    frames -= np.mean(frames, axis=1)[:, np.newaxis]
    std = np.std(frames, axis=1)[:, np.newaxis]
    std[std == 0] = 1e-10
    frames /= std
    
    
    model_preds = pitch_model(frames, training=False)#, workers=-1, use_multiprocessing=True)
    model_preds = model_preds.numpy()
    
    # initially has out shape (length, 360), reducing
    too_low_too_high_mask = np.array([True] * 80 + [False] * 140 + [True] * 140)
    model_preds[:, too_low_too_high_mask] = 0
    
#     print("time needed", time.time() - start_time)
    batch_pitch = model_preds.argmax(axis=1)
    confidence = model_preds.max(axis=1)
    
    return batch_pitch, confidence

In [None]:
def build_and_load_model(model_capacity, filename):
    """
    Build the CNN model and load the weights
    Parameters
    ----------
    model_capacity : 'tiny', 'small', 'medium', 'large', or 'full'
        String specifying the model capacity, which determines the model's
        capacity multiplier to 4 (tiny), 8 (small), 16 (medium), 24 (large),
        or 32 (full). 'full' uses the model size specified in the paper,
        and the others use a reduced number of filters in each convolutional
        layer, resulting in a smaller model that is faster to evaluate at the
        cost of slightly reduced pitch estimation accuracy.
    Returns
    -------
    model : tensorflow.keras.models.Model
        The pre-trained keras model loaded in memory
    """

    capacity_multiplier = {
        'tiny': 4, 'small': 8, 'medium': 16, 'large': 24, 'full': 32
    }[model_capacity]

    layers = [1, 2, 3, 4, 5, 6]
    filters = [n * capacity_multiplier for n in [32, 4, 4, 4, 8, 16]]
    widths = [512, 64, 64, 64, 64, 64]
    strides = [(4, 1), (1, 1), (1, 1), (1, 1), (1, 1), (1, 1)]

    x = Input(shape=(1024,), name='input', dtype='float32')
    y = Reshape(target_shape=(1024, 1, 1), name='input-reshape')(x)

    for l, f, w, s in zip(layers, filters, widths, strides):
        y = Conv2D(f, (w, 1), strides=s, padding='same',
                   activation='relu', name="conv%d" % l)(y)
        y = BatchNormalization(name="conv%d-BN" % l)(y)
        y = MaxPool2D(pool_size=(2, 1), strides=None, padding='valid',
                      name="conv%d-maxpool" % l)(y)
        y = Dropout(0.25, name="conv%d-dropout" % l)(y)

    y = Permute((2, 1, 3), name="transpose")(y)
    y = Flatten(name="flatten")(y)
    y = Dense(360, activation='sigmoid', name="classifier")(y)

    model = Model(inputs=x, outputs=y)

    model.load_weights(filename)
    model.compile('adam', 'binary_crossentropy')

    return model

In [9]:
# not_realtime_pitch_model = build_and_load_model("full", "../models/model-full.h5")

In [10]:
pitch_model = build_and_load_model("large", "../models/model-large.h5")

2021-09-26 10:36:36.216090: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcuda.so.1'; dlerror: libcuda.so.1: cannot open shared object file: No such file or directory
2021-09-26 10:36:36.216118: W tensorflow/stream_executor/cuda/cuda_driver.cc:326] failed call to cuInit: UNKNOWN ERROR (303)
2021-09-26 10:36:36.216136: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (gldsn-hw): /proc/driver/nvidia/version does not exist
2021-09-26 10:36:36.216417: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [11]:
vad_model, _ = torch.hub.load(repo_or_dir='snakers4/silero-vad',
                              model='silero_vad',
                              force_reload=True)

Downloading: "https://github.com/snakers4/silero-vad/archive/master.zip" to /home/arseny/.cache/torch/hub/master.zip


In [12]:
import torchaudio

import torch.nn.functional as F

def get_voicing_probs(model, wav, num_samples_per_window: int = 4000, num_steps: int = 8, batch_size=200):
    
    num_samples = num_samples_per_window
    assert num_samples % num_steps == 0
    step = int(num_samples / num_steps)  # stride / hop
    
    outs = []
    to_concat = []
    for i in range(0, len(wav), step):
        chunk = wav[i: i+num_samples]
        if len(chunk) < num_samples:
            chunk = F.pad(chunk, (0, num_samples - len(chunk)))
        to_concat.append(chunk.unsqueeze(0))
        if len(to_concat) >= batch_size:
            chunks = torch.Tensor(torch.cat(to_concat, dim=0))
            with torch.no_grad():
                out = model(chunks)
            outs.append(out)
            to_concat = []

    outs = torch.cat(outs, dim=0)
    return outs[:, 1] # 1 dim is 'neg' and 'pos' classes, so take pos probability

def get_voice_activity_mask(wav, model, sr, thresh=0.02):
    
    transform = torchaudio.transforms.Resample(orig_freq=sr,
                                               new_freq=16000)
    src_len = len(wav)
    wav = transform(wav)
    sr = 16000
    
    probs = get_voicing_probs(model, wav)
    wav_prob = np.full(src_len, 0, dtype=np.float32)

    step = src_len / len(probs)
    start = 0

    for prob in probs.flatten().numpy():
        wav_prob[round(start): round(start + step)] = prob
        start += step
    print("quantiles", np.quantile(wav_prob, 0.01), np.quantile(wav_prob, 0.5), np.quantile(wav_prob, 0.95))
    winsize = 16000
    rolling_mean_wav_prob = np.convolve(wav_prob, np.ones(winsize), 'same') / winsize
    return rolling_mean_wav_prob >= thresh

In [26]:
import librosa
import json

songs_target_pitch = {}

for song_pth, vocals in song_voice_stems.items():
    mono_vocals = vocals.mean(axis=0, keepdims=False)
    # activity_mask = get_voice_activity_mask(mono_vocals, vad_model, 44100, thresh=0.04)
    mono_vocals = mono_vocals.numpy()
    
    # pure_tone = librosa.tone(frequency=5, sr=44100, length=len(mono_vocals))
    
    # mono_vocals[~activity_mask] = pure_tone[~activity_mask]
    
    mono_vocals_resampled = resample(mono_vocals, sources_srate, PITCH_MODEL_SR).reshape(-1, 1)
    
    pitches = []
    confidences = []
    batch_size = 16
    step_size = 1024 * batch_size
    for split_idx in range(step_size, len(mono_vocals), step_size):
        batch_pitch, batch_confidence = detect_pitch(mono_vocals_resampled[split_idx: split_idx + step_size], pitch_model)
        pitches += list(batch_pitch)
        confidences += list(batch_confidence)
        
    pitches = np.array(pitches)
    confidences = np.array(confidences)
        
    print("confidence", np.quantile(confidences, 0.01), np.quantile(confidences, 0.5), np.quantile(confidences, 0.95))
    pitches = pitches.astype(np.float32)
    pitches[confidences < 0.65] = None
    
    songs_target_pitch[song_pth] = pitches
    
    detected_pitches_lst = pitches.flatten().tolist()
    pitches_pth = f"./pitches_{song_pth.split('/')[-1].split('.')[0]}.json"
    print("pitches_pth", pitches_pth)
    with open(pitches_pth, "w") as f:
        json.dump(detected_pitches_lst, f)
    

confidence 0.0019071030616760255 0.6141752004623413 0.9319335460662842
pitches_pth ./pitches_Arctic Monkeys-MadSounds.json
confidence 0.00029383599758148193 0.21169093251228333 0.9100936949253082
pitches_pth ./pitches_radioactive.json


In [32]:
with open("./pitches_radioactive.json") as f:
    print(json.load(f))

[nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, 168.0, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, na

In [None]:
print(len(songs_target_pitch))

In [13]:
plt.plot(detected_pitch)

[<matplotlib.lines.Line2D at 0x7f17dc3af880>]

In [14]:
plt.pause(1)

In [15]:
# del not_realtime_pitch_model
# import gc
# gc.collect()


In [16]:
class Plotter:
    def __init__(self, targ_pitch, n_points = 200, user_plotted_points = 60, mae_npoints = 30):
        
        assert user_plotted_points <= n_points
        self.n_points = n_points
        self.mae_npoints = mae_npoints
        self.user_plotted_points = user_plotted_points
        self.n_target_future_points = n_points - user_plotted_points
        
        self.user_pitch_padding = np.full(self.n_target_future_points, np.nan)
        

        points = []
        passed_detected_pitch = []

        self.fig, axes = plt.subplots(ncols=2, figsize=(18, 6))
        self.pitch_ax, self.metrics_ax = axes
        
        self.pitch_ax.set_ylim((60, 300))
        self.metrics_ax.set_ylim((0, 100))
        self.metrics_ax.set_xlim((0, self.n_points))
        print(self.pitch_ax, self.metrics_ax)

        # animated=True tells matplotlib to only draw the artist when we
        # explicitly request it
        self.user_pitch_arr = np.zeros(self.user_plotted_points)
        
        self.target_past = np.zeros(self.user_plotted_points).astype(np.float32)
        self.target_future = targ_pitch[:self.n_target_future_points].astype(np.float32)
#         self.target_pitch_arr = np.concatenate([target_past, target_future])
        
        self.targ_pitch_queue = list(targ_pitch[self.n_target_future_points:])
        
        
        self.mae_arr = np.full(self.n_points, np.nan)
        
        self.user_pitch_plot = self._create_plot(np.concatenate([self.user_pitch_arr, self.user_pitch_padding]), self.pitch_ax)
        # different colors for already passed target and future target
        self.target_past_plot = self._create_plot(self.target_past, self.pitch_ax, c="red")
        self.target_future_plot = self._create_plot(self.target_future, self.pitch_ax, c="orange", x=range(len(self.target_past), self.n_points))
        
        self.mae_plot = self._create_plot(self.mae_arr, self.metrics_ax, c="red")
        
#         (plotted_data,) = self.pitch_ax.plot(range(n_points), [60] * (n_points - 1) + [300], animated=True)
#         (plotted_detected,) = ax.plot(range(n_points), [50] + [310] * (n_points - 1), animated=True, c="orange")
        plt.show(block=False)
        plt.pause(0.1)

        self._bg = self.fig.canvas.copy_from_bbox(self.fig.bbox)
        # draw the animated artist, this uses a cached renderer
        self.axes = {
                    "pitch": {"artists": [], "ax": self.pitch_ax},
                    "metrics": {"artists": [], "ax": self.metrics_ax}
        }
        
        self.axes["pitch"]["artists"] += [self.user_pitch_plot, self.target_past_plot, self.target_future_plot]
        self.axes["pitch"]["artists"] += [self.mae_plot]
        
        self._redraw_plot()
        
    def _create_plot(self, data, ax, c="blue", x=None):
        if x is None:
            x = range(len(data))
        (plot,) = ax.plot(x, data, animated=True, c=c)
        return plot
        
    def update(self, user_pitch):
        self.target_past = np.append(self.target_past[1:], [self.target_future[0]])
        
        if len(self.targ_pitch_queue): # at the end of song have no target pitch in queue
            next_targ_pitch = self.targ_pitch_queue.pop(0)
        else:
            
            next_targ_pitch = np.nan
            
        self.target_future = np.append(self.target_future[1:], [next_targ_pitch])
                                     
                                     
        self.user_pitch_arr = np.append(self.user_pitch_arr[1:], [user_pitch])
        self.mae_arr = np.append(self.mae_arr[1:], [self._calc_next_mae(self.target_past, self.user_pitch_arr)])
        
#         plotted_data.set_data(indexes[not_nan_mask], new_plot_data[not_nan_mask])
        self.user_pitch_plot.set_ydata(np.concatenate([self.user_pitch_arr, self.user_pitch_padding]))
        self.target_past_plot.set_ydata(self.target_past)
        self.target_future_plot.set_ydata(self.target_future)
        self.mae_plot.set_ydata(self.mae_arr)
        
        self._redraw_plot()
        
    def _calc_next_mae(self, targ, pred):
        mae_arr1, mae_arr2 = targ[-1 * self.mae_npoints:], pred[-1 * self.mae_npoints:]
        both_notnull_mask = (~np.isnan(mae_arr1)) & (~np.isnan(mae_arr2))
        
        mae_arr1 = mae_arr1[both_notnull_mask]
        mae_arr2 = mae_arr2[both_notnull_mask]
        if len(mae_arr1) == 0: # silence on intersection
            value = np.nan
        value = np.mean(np.abs(mae_arr1 - mae_arr2))
        return value
                                       
    def _redraw_plot(self):
        self.fig.canvas.restore_region(self._bg)
        
        for ax_name, ax_obj in self.axes.items(): 
            ax = ax_obj["ax"]
            for artist in ax_obj["artists"]:
                ax.draw_artist(artist)
                             
        # show the result to the screen, this pushes the updated RGBA buffer from the
        # renderer to the GUI framework so you can see it
        self.fig.canvas.blit(self.fig.bbox)
        self.fig.canvas.flush_events()

In [17]:
def detect_pitch_realtime(signal, model):
    start_time = time.time()
    
    frames = signal[:1024].reshape(1, -1)
    frames -= np.mean(frames, axis=1)[:, np.newaxis]
    frames /= np.std(frames, axis=1)[:, np.newaxis]
    
    model_preds = model(frames, training=False)#, workers=-1, use_multiprocessing=True)
    model_preds = model_preds.numpy()
    
    print("model preds sum", model_preds.sum())
    
    too_low_too_high_mask = np.concatenate([[True] * 80 + [False] * 140 + [True] * 140])
    model_preds[:, too_low_too_high_mask] = 0
#     print("time needed", time.time() - start_time)
    batch_pitch = model_preds.argmax(axis=1)
    confidence = model_preds.max(axis=1)
    
    return batch_pitch, confidence

In [18]:
def load_song(f, mono=False):
    """MP3 to numpy array"""
    a = AudioSegment.from_mp3(f)
    y = np.array(a.get_array_of_samples())
    if a.channels == 2:
        y = y.reshape((-1, 2))
        if mono:
            y = y.mean(axis=1)
    return y, a.frame_rate


In [19]:
from pydub import AudioSegment
import pygame
from scipy.io.wavfile import write

    

class SongPlayer:
    def __init__(self, song_pth, bounds):
        print("bounds", bounds)
        target_pitch = songs_target_pitch[song_pth]
        
        tg_len = len(target_pitch)
        start, end = bounds
        # initially in range 0-100
        start /= 100
        end /= 100
        
        cropped_targ_pitch = target_pitch[int(start * tg_len): int(end * tg_len)]
        
        song_mono, srate = load_song(song_pth, mono=True)
        print("src shape", song_mono.shape, srate)
        song_ln = len(song_mono)
        cropped_song = song_mono[int(start * song_ln): int(end * song_ln)]
        print("cropped", cropped_song)
        
        self.song_pth = "now_played_song.wav"
        write(self.song_pth, srate, cropped_song.astype(np.int16))
        
        AudioSegment.from_wav(self.song_pth).export('now_played_song.ogg', format='ogg')
        
        pygame.mixer.init()
        pygame.mixer.music.load("now_played_song.ogg")
        
        self.targ_pitch = cropped_targ_pitch
        
        self.stream = None
        self.plotter = None
        
    def start(self):
        self.stream = sd.InputStream(
                        samplerate=MODEL_SR,
                        blocksize = BLOCK_SIZE,
                        channels = 1,
        )
        
        self.plotter = Plotter(self.targ_pitch)
        
        self.stream.start()
        pygame.mixer.music.play()
        
    def play(self):
        for block_idx in range(len(self.targ_pitch)):
            user_pitch = self._get_user_pitch()
            self.plotter.update(user_pitch)
            yield
            
        self.stop()
        
    def _get_user_pitch(self):  
        audio_arr, is_overflowed = self.stream.read(BLOCK_SIZE)
        if is_overflowed:
            raise OverflowError()

        model_preds, confidence = detect_pitch_realtime(audio_arr, pitch_model)
        model_preds = model_preds.astype(np.float32)
        model_preds[confidence < 0.5] = None
        
        assert len(model_preds) == 1
        return model_preds[0]
        
    def stop(self):
        pygame.mixer.music.stop()
        self.stream.stop()


pygame 2.0.1 (SDL 2.0.14, Python 3.8.10)
Hello from the pygame community. https://www.pygame.org/contribute.html


In [20]:
def get_fragment_bounds():
    start = input("start percentile\n")
    if len(start) == 0:
        start = 0
    else:
        start = int(start)
    end = input("end percentile\n")
    if len(end) == 0:
        end = 100
    else:
        end = int(end)
        
    return start, end

In [21]:
def get_next_playing():
    print("If move to next track, pass N")
    print("If repeat this track, pass R")
    print("If previous track, pass P")
    value = input()
    return {"N": "next", "R": "repeat", "P": "previous"}[value]

In [None]:
click_sound_pth = "../data/samples/click.wav"

In [None]:
# import librosa
# click = librosa.clicks(times=[0], click_duration=1, sr=44100)

In [None]:
# ipd.Audio(click, rate=44100)

In [None]:
# import scipy
# scipy.io.wavfile.write(click_sound_pth, 44100, click)

In [None]:
def do_countdown(n: int):
    # TODO: add click sound
    for num in range(n, 0, -1):
        print(f"Prepare: {num}")
        playsound.playsound(click_sound_pth, False)
        time.sleep(1)

## TODO: STOP BUTTON

In [None]:
song_idx = 0

next_play_name_to_idx_change = {
    "repeat": 0,
    "next": 1,
    "previous": -1,
}

try:
    while True:
#         ipd.display(need_stop_widg)
        # input to choose part of song to play
        fragment_bounds = get_fragment_bounds()

        player = SongPlayer(songs[song_idx], fragment_bounds)

        do_countdown(3)
        player.start()

        for step in player.play():
            ...
            
        next_playing = get_next_playing()
        song_idx += next_play_name_to_idx_change[next_playing]
        
except BaseException as e:
    pygame.mixer.music.stop()
    plt.close("all")
    raise e

In [None]:
sd.query_devices()