# IMPORTS

In [11]:
import torch
import librosa
import mirdata
import soundfile
import numpy as np
from torch import nn
from training import Trainer
from model import PaperModel
from data import SalienceDataset
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter

import json
import pickle
import warnings
from pathlib import Path
from datetime import datetime

# CONSTANTS

In [2]:
# Root of the local medleydb_pitch copy. Resolved from the current user's home
# directory instead of a hard-coded "/Users/<name>/..." path so the notebook
# runs unchanged on other machines.
DATA_DIR = Path.home() / "mir_datasets" / "medleydb_pitch"

In [None]:
# Experiment identity: a fixed run name plus a day-month-year_hour-minute-second
# stamp taken when this cell executes.
EXP_NAME = "first_attempt"
TIMESTAMP = f"{datetime.now():%d%m%Y_%H%M%S}"

# Every artefact of the run is written below "<name>_<timestamp>/".
EXP_FOLDER = Path(f"{EXP_NAME}_{TIMESTAMP}")
EXP_FOLDER.mkdir(parents=True, exist_ok=True)

# TensorBoard events go to "<name>_<timestamp>/<name>".
SUMMARY_WRITER = SummaryWriter(str(EXP_FOLDER / EXP_NAME))

# HYPER PARAMETERS

In [None]:
# Training hyper-parameters. These module-level names are the values the
# following cells actually read (optimizer, data loaders, trainer).
# NOTE(review): the previously logged dict recorded LR as 1e-3 while the
# optimizer used LR = 1e-2 — confirm which rate is intended.
LR = 1e-2
WEIGHT_DECAY = 1e-4
BATCH_SIZE = 32
INPUT_DIM = 5
DEVICE = "cpu"
N_EPOCHS = 100

# Build the logged copy from the live constants so the JSON written next to the
# run can never drift from what was really used.
HP = {
    "LR": LR,
    "WEIGHT_DECAY": WEIGHT_DECAY,
    "BATCH_SIZE": BATCH_SIZE,
    "INPUT_DIM": INPUT_DIM,
    "DEVICE": DEVICE,
    "N_EPOCHS": N_EPOCHS,
}
with open(EXP_FOLDER / "hyper_parameters.json", "w") as f:
    json.dump(HP, f)

In [None]:
# Network, objective and optimiser. The loss takes raw logits
# (BCEWithLogitsLoss applies the sigmoid itself).
model = PaperModel()
loss = nn.BCEWithLogitsLoss()
optim = torch.optim.Adam(
    model.parameters(),
    lr=LR,
    weight_decay=WEIGHT_DECAY,
)

# Training split.
# NOTE(review): `ratio` presumably sub-samples the split — confirm in data.SalienceDataset.
train_data = SalienceDataset(DATA_DIR / "train", ratio=0.1)
train_loader = DataLoader(
    train_data,
    batch_size=BATCH_SIZE,
    shuffle=True,
)

# Validation split (also shuffled, as in the training loader).
val_data = SalienceDataset(DATA_DIR / "validation", ratio=0.01)
val_loader = DataLoader(
    val_data,
    batch_size=BATCH_SIZE,
    shuffle=True,
)

In [None]:
# Pickle the whole model object into the experiment folder so it can be
# restored later without re-instantiating the class by hand.
with (EXP_FOLDER / "model.p").open("wb") as model_file:
    pickle.dump(model, model_file)

In [None]:
# Wire everything into the project's Trainer. All arguments are passed by
# keyword, so their order is irrelevant.
# NOTE(review): `loss_cls` receives a loss *instance*, not a class — confirm
# that is what training.Trainer expects.
trainer = Trainer(
    model=model,
    optimizer=optim,
    loss_cls=loss,
    train_data=train_loader,
    val_data=val_loader,
    device=DEVICE,
    ckp_path=EXP_FOLDER,
    summary_writer=SUMMARY_WRITER,
)

In [None]:
# Silence warnings only while training runs. The filter is restored when the
# block exits, so later cells still surface deprecation and runtime warnings
# instead of having them hidden for the rest of the kernel session.
with warnings.catch_warnings():
    warnings.simplefilter("ignore")
    trainer.train(N_EPOCHS)

In [4]:
# Restore the pickled model object written by an earlier run.
# NOTE(review): pickle.load executes arbitrary code from the file — only use it
# on files produced by this project.
with Path("./first_attempt_19082022_143140/model.p").open("rb") as model_file:
    model = pickle.load(model_file)

In [6]:
# Load the trained weights from the checkpoint of the same run.
# map_location="cpu" makes the load succeed even if the checkpoint was saved
# from another device; load_state_dict then copies the values into the model's
# existing parameters.
model.load_state_dict(
    torch.load("./first_attempt_19082022_143140/ckp.pt", map_location="cpu")
)

<All keys matched successfully>

In [18]:
# Switch the model to evaluation mode; as the last expression of the cell the
# returned module is also displayed, which prints the layer summary.
model.eval()

PaperModel(
  (conv1): Conv2d(5, 128, kernel_size=(5, 5), stride=(1, 1), padding=same)
  (bn1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv2): Conv2d(128, 64, kernel_size=(5, 5), stride=(1, 1), padding=same)
  (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv3): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=same)
  (bn3): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv4): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=same)
  (bn4): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv5): Conv2d(64, 8, kernel_size=(3, 70), stride=(1, 1), padding=same)
  (bn5): BatchNorm2d(8, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv6): Conv2d(8, 1, kernel_size=(1, 1), stride=(1, 1), padding=same)
  (relu): ReLU()
  (sigmoid): Sigmoid()
)

In [41]:
# Feature-extraction configuration shared by the audio helpers below.
TARGET_SR = 22050  # sample rate every file is resampled to
BINS_PER_SEMITONE = 3
N_OCTAVES = 6
FMIN = 32.7  # frequency (Hz) of the lowest CQT bin
BINS_PER_OCTAVE = 12 * BINS_PER_SEMITONE
N_BINS = N_OCTAVES * BINS_PER_OCTAVE
HOP_LENGTH = 512  # 23 ms hop
N_TIME_FRAMES = 50  # 1.16 seconds
N_AUDIO_SAMPLES = HOP_LENGTH * N_TIME_FRAMES
N_EXAMPLES_PER_TRACK = 100

# Centre frequency of each CQT bin. Arguments are passed by keyword because
# recent librosa releases make `fmin` and `bins_per_octave` keyword-only.
CQT_FREQUENCIES = librosa.cqt_frequencies(
    n_bins=N_BINS, fmin=FMIN, bins_per_octave=BINS_PER_OCTAVE
)


def load_audio(audio_path):
    """Load an audio file as a mono signal at ``TARGET_SR``.

    Args:
        audio_path (str): path of the audio file to read

    Returns:
        the audio signal returned by ``librosa.load``; the sample rate that
        ``librosa.load`` also returns is dropped
    """
    return librosa.load(audio_path, sr=TARGET_SR, mono=True)[0]


def compute_hcqt(audio):
    """Compute the harmonic CQT (HCQT) of an audio signal.

    The CQT magnitude is converted to dB and rescaled with ``dB / 80 + 1``,
    then resampled at the harmonics 0.5, 1, 2, 3 and 4 of every bin frequency.

    Args:
        audio (np.ndarray): mono audio signal sampled at ``TARGET_SR``

    Returns:
        np.ndarray: HCQT with the five harmonics on axis 0 and the last two
        axes of the librosa result swapped
        (assumes librosa returns (harmonic, frequency, time), giving
        (harmonic, time, frequency) here — TODO confirm for the installed version)
    """
    cqt = librosa.cqt(
        audio,
        sr=TARGET_SR,
        hop_length=HOP_LENGTH,
        fmin=FMIN,
        n_bins=N_BINS,
        bins_per_octave=BINS_PER_OCTAVE,
    )
    # dB magnitude rescaled by 1/80 and shifted by 1.
    cqt = (1.0 / 80.0) * librosa.amplitude_to_db(np.abs(cqt), ref=1.0) + 1.0

    # `freqs` and `harmonics` are passed by keyword: recent librosa releases
    # no longer accept them positionally.
    hcqt = librosa.interp_harmonics(
        cqt,
        freqs=CQT_FREQUENCIES,
        harmonics=[0.5, 1, 2, 3, 4],  # five harmonic ranges, stacked on axis 0
    )
    return hcqt.transpose([0, 2, 1])


def get_cqt_times(n_bins):
    """Return the time stamp, in seconds, of frames ``0 .. n_bins - 1``.

    Args:
        n_bins (int): number of frames to generate time stamps for
            (despite its name, this is used as a frame count)

    Returns:
        np.ndarray: one time stamp per frame, computed with ``TARGET_SR`` and
        ``HOP_LENGTH``
    """
    frame_indices = np.arange(n_bins)
    return librosa.frames_to_time(
        frame_indices,
        sr=TARGET_SR,
        hop_length=HOP_LENGTH,
    )

# NOTE(review): the object returned by this call is not bound to any name, and
# the cells visible here only use "medleydb_pitch" — confirm whether this line
# is still needed.
mirdata.initialize("vocadito")

def sonify_outputs(save_path, times, freqs, voicing, fs=8000):
    """Sonify the model outputs

    Synthesises the pitch contour with ``mir_eval.sonify.pitch_contour`` and
    writes the result to ``save_path``.

    Args:
        save_path (str): path to save the audio file
        times (np.ndarray): time stamps (in seconds)
        freqs (np.ndarray): f0 values (in Hz)
        voicing (np.ndarray): voicing (between 0 and 1), used as amplitudes
        fs (int): sample rate (in Hz) used for both the synthesis and the
            written file; defaults to 8000
    """
    # The same rate must be used for synthesis and for the file header,
    # otherwise the audio plays back at the wrong speed and pitch.
    y = mir_eval.sonify.pitch_contour(times, freqs, fs, amplitudes=voicing)
    soundfile.write(save_path, y, fs)

In [16]:
# Load the example stem as a quick check of load_audio.
# NOTE(review): `audio` is not used by the later cells visible here
# (run_inference reloads the file from its path) — confirm it is still needed.
audio = load_audio("./A Classic Education - NightOwl.stem.mp4")



In [37]:
def run_inference(model, audio_path):
    """Run model inference on a full-length audio file

    The file is loaded, converted to an HCQT and fed to the model in slices of
    200 frames. The per-slice outputs are passed through a sigmoid, joined
    along axis 1 and decoded into one pitch per frame with Viterbi decoding.

    Args:
        model (nn.Module): pytorch model
            (assumes its output is laid out as (batch, time, frequency, 1),
            which is what the indexing after concatenation relies on)
        audio_path (str): path to audio file to run inference on

    Returns:
        mirdata.annotations.F0Data: f0 data object, containing predicted f0 data

    Raises:
        ValueError: if the computed features contain no time frames
    """
    model.eval()  # put the model in evaluation mode

    # run on whatever device the model lives on (CPU when it has no parameters)
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    # load audio (at the fixed sample rate)
    y = load_audio(audio_path)

    # compute input features
    hcqt = compute_hcqt(y)
    n_times = hcqt.shape[1]
    if n_times == 0:
        raise ValueError(f"no time frames to run inference on for {audio_path!r}")

    # the number of frames to run inference on at a time
    slice_size = 200
    outputs = []

    with torch.no_grad():
        # loop over the full time range
        for start in range(0, n_times, slice_size):
            chunk = hcqt[np.newaxis, :, start : start + slice_size, :]
            hcqt_tensor = torch.tensor(chunk, dtype=torch.float32, device=device)
            # the model output is treated as logits; bring results back to the
            # CPU so they can be turned into a numpy array below
            outputs.append(torch.sigmoid(model(hcqt_tensor)).cpu())

    # concatenate the outputs
    # NOTE: this is not the best approach! This will have boundary effects
    # every slice_size frames. To improve this, use e.g. overlap add
    stacked = torch.cat(outputs, dim=1).numpy()
    unwrapped_prediction = stacked[0, :n_times, :, 0].astype(float)

    # decode the output predictions into a single time series using viterbi decoding
    transition_matrix = librosa.sequence.transition_local(len(CQT_FREQUENCIES), 5)
    predicted_pitch_idx = librosa.sequence.viterbi(unwrapped_prediction.T, transition_matrix)

    # compute f0 and amplitudes using predicted indexes
    frame_idx = np.arange(len(predicted_pitch_idx))
    predicted_pitch = np.asarray(CQT_FREQUENCIES)[predicted_pitch_idx]
    predicted_salience = unwrapped_prediction[frame_idx, predicted_pitch_idx]

    times = get_cqt_times(n_times)
    return mirdata.annotations.F0Data(
        times, "s", predicted_pitch, "hz", predicted_salience, "likelihood"
    )

In [38]:
# Run the restored model on the example stem; `pred` is the F0Data object
# returned by run_inference.
pred = run_inference(model, "./A Classic Education - NightOwl.stem.mp4")



In [40]:
# Unpack the prediction into the three sequences that sonify_outputs is later
# called with (times, frequencies, voicing).
est_times, est_freqs, est_voicing = pred.to_mir_eval()

In [44]:
import mir_eval

In [45]:
import os

# Render the estimated pitch track to "test_f0est.wav" in the working directory.
sonify_outputs(os.path.join("./", "test_f0est.wav"), est_times, est_freqs, est_voicing)

In [58]:
def run_inference(model, audio_path):
    """Run model inference on a full-length audio file

    The file is loaded, converted to an HCQT and fed to the model in slices of
    200 frames. The per-slice outputs are passed through a sigmoid, joined
    along axis 1 and decoded into one pitch per frame with Viterbi decoding.

    Args:
        model (nn.Module): pytorch model
            (assumes its output is laid out as (batch, time, frequency, 1),
            which is what the indexing after concatenation relies on)
        audio_path (str): path to audio file to run inference on

    Returns:
        mirdata.annotations.F0Data: f0 data object, containing predicted f0 data

    Raises:
        ValueError: if the computed features contain no time frames
    """
    model.eval()  # put the model in evaluation mode

    # run on whatever device the model lives on (CPU when it has no parameters)
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    # load audio (at the fixed sample rate)
    y = load_audio(audio_path)

    # compute input features
    hcqt = compute_hcqt(y)
    n_times = hcqt.shape[1]
    if n_times == 0:
        raise ValueError(f"no time frames to run inference on for {audio_path!r}")

    # the number of frames to run inference on at a time
    slice_size = 200
    outputs = []

    with torch.no_grad():
        # loop over the full time range
        for start in range(0, n_times, slice_size):
            chunk = hcqt[np.newaxis, :, start : start + slice_size, :]
            hcqt_tensor = torch.tensor(chunk, dtype=torch.float32, device=device)
            # the model output is treated as logits; bring results back to the
            # CPU so they can be turned into a numpy array below
            outputs.append(torch.sigmoid(model(hcqt_tensor)).cpu())

    # concatenate the outputs
    # NOTE: this is not the best approach! This will have boundary effects
    # every slice_size frames. To improve this, use e.g. overlap add
    stacked = torch.cat(outputs, dim=1).numpy()
    unwrapped_prediction = stacked[0, :n_times, :, 0].astype(float)

    # decode the output predictions into a single time series using viterbi decoding
    transition_matrix = librosa.sequence.transition_local(len(CQT_FREQUENCIES), 5)
    predicted_pitch_idx = librosa.sequence.viterbi(unwrapped_prediction.T, transition_matrix)

    # compute f0 and amplitudes using predicted indexes
    frame_idx = np.arange(len(predicted_pitch_idx))
    predicted_pitch = np.asarray(CQT_FREQUENCIES)[predicted_pitch_idx]
    predicted_salience = unwrapped_prediction[frame_idx, predicted_pitch_idx]

    times = get_cqt_times(n_times)
    return mirdata.annotations.F0Data(
        times, "s", predicted_pitch, "hz", predicted_salience, "likelihood"
    )


def sonify_outputs(save_path, times, freqs, voicing, fs=8000):
    """Sonify the model outputs

    Synthesises the pitch contour with ``mir_eval.sonify.pitch_contour`` and
    writes the result to ``save_path``.

    Args:
        save_path (str): path to save the audio file
        times (np.ndarray): time stamps (in seconds)
        freqs (np.ndarray): f0 values (in Hz)
        voicing (np.ndarray): voicing (between 0 and 1), used as amplitudes
        fs (int): sample rate (in Hz) used for both the synthesis and the
            written file; defaults to 8000
    """
    # The same rate must be used for synthesis and for the file header,
    # otherwise the audio plays back at the wrong speed and pitch.
    y = mir_eval.sonify.pitch_contour(times, freqs, fs, amplitudes=voicing)
    soundfile.write(save_path, y, fs)


def evaluate(model, track_path, sonification_dir):
    """Run inference on one track and save a sonification of the estimate

    No scores are computed: the function only writes
    ``<sonification_dir>/test_f0est.wav``, overwriting any previous file of
    that name.

    Args:
        model (nn.Module): pytorch model
        track_path (str): path to the audio file to run inference on
        sonification_dir (str): path to save sonifications; created if it does
            not exist yet
    """
    estimated_f0 = run_inference(model, track_path)

    # get the estimated f0 in mir_eval format
    est_times, est_freqs, est_voicing = estimated_f0.to_mir_eval()

    # make sure the target folder exists before writing into it
    # (an empty string means the current directory and needs no creation)
    if sonification_dir:
        os.makedirs(sonification_dir, exist_ok=True)

    # sonify the estimates
    sonify_outputs(
        os.path.join(sonification_dir, "test_f0est.wav"),
        est_times,
        est_freqs,
        est_voicing,
    )

In [59]:
# Dataset handle used below to look up track ids and their audio paths.
medley_db = mirdata.initialize("medleydb_pitch")

In [64]:
# Take the first track of the dataset as a quick test input. Fail loudly if the
# index is empty instead of silently leaving `track_path` undefined.
track_id = next(iter(medley_db.track_ids), None)
if track_id is None:
    raise ValueError("medleydb_pitch contains no tracks")
track_path = medley_db.track(track_id).audio_path