In [1]:
import logging
import math
import os

logging.basicConfig(level=logging.INFO)
logging.info("library loading")
logging.info("DEBUG")

import torch
import librosa

torch.set_grad_enabled(True)

import cached_conv as cc
import gin
import nn_tilde
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
from typing import Union, Optional
from absl import flags, app

import sys, os
try:
    import rave
except:
    import sys, os 
    sys.path.append(os.path.abspath('.'))
    import rave

import rave.core
import rave.dataset
from rave.transforms import get_augmentations, add_augmentation
import rave.blocks
import rave.resampler
import IPython.display as ipd
import pickle

from rave.cached_a_weight import Cached_A_Weight as cached_a_weight
from rave.pitch_enc import PitchEncoderV2

INFO:root:library loading
INFO:root:DEBUG


  [INFO]: device is not None, use cuda:0
  [INFO]    > call by:torchfcpe.tools.spawn_infer_cf_naive_mel_pe_from_pt
  [WARN] args.model.use_harmonic_emb is None; use default False
  [WARN]    > call by:torchfcpe.tools.spawn_cf_naive_mel_pe


In [2]:
cc.use_cached_conv(True)

run = "pretrained/causal"
ema_weights = False
prior_flag = False
channel_flag = None
sr_flag = 44100
fidelity_flag =.95

logging.info("building rave")

gin.parse_config_file(os.path.join(run, "config.gin"))
checkpoint = rave.core.search_for_run(run)
print("loading checkpoint:", checkpoint)

pretrained = rave.RAVE()
if run is not None:
    logging.info('model found : %s'%run)
    checkpoint = torch.load(checkpoint, map_location='cpu')
    if ema_weights and "EMA" in checkpoint["callbacks"]:
        pretrained.load_state_dict(
            checkpoint["callbacks"]["EMA"],
            strict=False,
        )
    else:
        pretrained.load_state_dict(
            checkpoint["state_dict"],
            strict=False,
        )
else:
    logging.error("No checkpoint found")
    exit()
    
pretrained.eval()

pitch_enc = PitchEncoderV2(data_size=6,
                           capacity=16,
                           ratios=[4,4,4,2],
                           latent_size=1440,
                           n_out=1,
                           kernel_size=3,
                           dilations=[[1, 3, 9], [1, 3, 9], [1, 3, 9], [1, 3]])

pitch_enc.load_state_dict(torch.load(f"rave/utils/caus2048_mb6.pth", weights_only=True))
pretrained.pitch_encoder = pitch_enc

INFO:root:building rave


loading checkpoint: pretrained/causal/latest.ckpt


INFO:root:model found : pretrained/causal


In [3]:
print(pretrained.decoder.net[2].cumulative_delay)

1


In [4]:
for m in pretrained.modules():
    if hasattr(m, "weight_g"):
        nn.utils.remove_weight_norm(m)

t = torch.rand(1, 1, 131072)
out, _, _, _ = pretrained(t, ['Violin'])
print(out.shape)

  return _VF.stft(input, n_fft, hop_length, win_length, window,  # type: ignore[attr-defined]


torch.Size([1, 1, 131072])


In [5]:
with open('rave/utils/loudness_stats.pkl', 'rb') as file:
    loudness_dict = pickle.load(file)

print(loudness_dict)

# Extract the mean and std values
means = [v['mean'] for v in loudness_dict.values()]
stds = [v['std'] for v in loudness_dict.values()]

# Compute the averages
global_mean = (sum(means) / len(means))
global_std = sum(stds) / len(stds)

print("Mean of means:", global_mean)
print("Mean of stds:", global_std)

{'Cello': {'mean': -0.6540045, 'std': 0.86948127, 'mean_f0': 152.18431, 'n_files': 60}, 'Violin': {'mean': -0.8638949, 'std': 0.93849087, 'mean_f0': 475.77075, 'n_files': 135}, 'Flute': {'mean': -0.9614245, 'std': 1.02139, 'mean_f0': 624.2901, 'n_files': 70}, 'Trumpet': {'mean': -1.1028684, 'std': 1.3292578, 'mean_f0': 443.05734, 'n_files': 76}, 'Trombone': {'mean': -0.9961345, 'std': 1.1266551, 'mean_f0': 175.1723, 'n_files': 35}, 'Oboe': {'mean': -0.638035, 'std': 0.877518, 'mean_f0': 730.6992, 'n_files': 8}, 'Bassoon': {'mean': -0.3955518, 'std': 0.76708513, 'mean_f0': 195.48087, 'n_files': 9}, 'Saxophone': {'mean': -1.1703084, 'std': 1.3255111, 'mean_f0': 339.8774, 'n_files': 37}, 'Horn': {'mean': -1.0007778, 'std': 1.355151, 'mean_f0': 274.30194, 'n_files': 18}, 'Clarinet': {'mean': -1.1429967, 'std': 1.1013856, 'mean_f0': 397.2989, 'n_files': 28}}
Mean of means: -0.8925996541976928
Mean of stds: 1.071192592382431


In [12]:
class ScriptedRAVE(nn_tilde.Module):

    def __init__(self,
                 pretrained: rave.RAVE,
                 channels: Optional[int] = None,
                 fidelity: float = .95,
                 target_sr: bool = None) -> None:

        super().__init__()
        self.pqmf = pretrained.pqmf
        self.sr = pretrained.sr
        self.spectrogram = pretrained.spectrogram
        self.resampler = None
        self.input_mode = pretrained.input_mode
        self.output_mode = pretrained.output_mode
        self.n_channels = pretrained.n_channels
        self.target_channels = channels or self.n_channels
        self.stereo_mode = False

        if target_sr is not None:
            if target_sr != self.sr:
                assert not target_sr % self.sr, "Incompatible target sampling rate"
                self.resampler = rave.resampler.Resampler(target_sr, self.sr)
                self.sr = target_sr

        self.full_latent_size = pretrained.latent_size

        self.register_attribute("learn_target", False)
        self.register_attribute("reset_target", False)
        self.register_attribute("learn_source", False)
        self.register_attribute("reset_source", False)

        self.register_buffer("latent_pca", pretrained.latent_pca)
        self.register_buffer("latent_mean", pretrained.latent_mean)
        self.register_buffer("fidelity", pretrained.fidelity)

        self.register_buffer("global_mean", torch.tensor(global_mean))
        self.register_buffer("global_std", torch.tensor(global_std))

        self.latent_size = 2

        # have to init cached conv before graphing
        self.decoder = pretrained.decoder
        self.amp_block = pretrained.amp_block
        self.pitch_encoder = pretrained.pitch_encoder
        
        x_len = 2**14
        x = torch.zeros(1, self.n_channels, x_len)
        
        # configure encoder
        if (pretrained.input_mode == "pqmf") or (pretrained.output_mode == "pqmf"):
            # scripting fails if cached conv is not initialized
            self.pqmf(torch.zeros(1, 1, x_len))

        encode_shape = (pretrained.n_channels, 2**14) 

        self.register_method(
            "forward",
            in_channels=1,
            in_ratio=1,
            out_channels=self.target_channels,
            out_ratio=1,
            input_labels=['(signal) Channel %d'%d for d in range(1, 1 + 1)],
            output_labels=['(signal) Channel %d'%d for d in range(1, self.target_channels+1)],
            test_method=False
        )

    def post_process_latent(self, z):
        raise NotImplementedError

    def pre_process_latent(self, z):
        raise NotImplementedError

    def update_adain(self):
        for m in self.modules():
            if isinstance(m, rave.blocks.AdaptiveInstanceNormalization):
                m.learn_x.zero_()
                m.learn_y.zero_()

                if self.learn_target[0]:
                    m.learn_y.add_(1)
                if self.learn_source[0]:
                    m.learn_x.add_(1)

                if self.reset_target[0]:
                    m.reset_y()
                if self.reset_source[0]:
                    m.reset_x()

        self.reset_source = False,
        self.reset_target = False,

    @torch.jit.export
    def set_stereo_mode(self, stereo):
        self.stereo_mode = bool(stereo);


    @torch.jit.export
    def forward(self, x, emb_x, emb_y, p_mult, n_mult, loudness, loudness_linear):
        batch_size = x.shape[:-2]

        x_m = self.pqmf(x)
        x_m = x_m.reshape(batch_size + (-1, x_m.shape[-1]))
        
        pitch_logits = self.pitch_encoder(x_m[:, :6, :])
        pitch = torch.argmax(pitch_logits, dim=1)
        pitch = rave.core.bins_to_frequency(pitch).unsqueeze(-1)
        periodicity = rave.core.entropy(pitch_logits)

        pitch = pitch * p_mult

        emb = torch.zeros(1, 2)
        emb[:, 0] = emb_x
        emb[:, 1] = emb_y

        emb = emb.unsqueeze(-1)
        amplitudes = self.amp_block(emb.transpose(2, 1))
        emb = emb.repeat(1, 1, periodicity.shape[-1])

        y, _, _, = self.decoder(emb,
                                pitch, 
                                amplitudes,
                                loudness.transpose(2,1),
                                loudness_linear.unsqueeze(-1),
                                periodicity.unsqueeze(-1))

        batch_size = emb.shape[:-2]
        if self.pqmf is not None:
            y = y.reshape(y.shape[0] * self.n_channels, -1, y.shape[-1])
            y = self.pqmf.inverse(y)
            y = y.reshape(batch_size+(self.n_channels, -1))

        if self.resampler is not None:
            y = self.resampler.from_model_sampling_rate(y)
                
        return y

    @torch.jit.export
    def get_learn_target(self) -> bool:
        return self.learn_target[0]

    @torch.jit.export
    def set_learn_target(self, learn_target: bool) -> int:
        self.learn_target = (learn_target, )
        return 0

    @torch.jit.export
    def get_learn_source(self) -> bool:
        return self.learn_source[0]

    @torch.jit.export
    def set_learn_source(self, learn_source: bool) -> int:
        self.learn_source = (learn_source, )
        return 0

    @torch.jit.export
    def get_reset_target(self) -> bool:
        return self.reset_target[0]

    @torch.jit.export
    def set_reset_target(self, reset_target: bool) -> int:
        self.reset_target = (reset_target, )
        return 0

    @torch.jit.export
    def get_reset_source(self) -> bool:
        return self.reset_source[0]

    @torch.jit.export
    def set_reset_source(self, reset_source: bool) -> int:
        self.reset_source = (reset_source, )
        return 0

In [13]:
script_class = ScriptedRAVE

In [14]:
prior_scripted=None

logging.info("script model")
scripted_rave = script_class(
    pretrained=pretrained,
    channels = channel_flag,
    fidelity=fidelity_flag,
    target_sr=sr_flag)

INFO:root:script model
INFO:root:Registering method "forward"


In [15]:
#scripted_rave.normalizer.reset()
x = torch.zeros(1, pretrained.n_channels, 2**14)
x = torch.zeros(1, 1, 2048)

emb_x = torch.zeros(1)
emb_y = torch.zeros(1)
mult = torch.ones(1)
l = torch.ones(1, 1, 1)
ln = torch.ones(1, 1)

y = scripted_rave.forward(x, emb_x, emb_y, mult, mult, l, ln)
print(y.shape)

logging.info("save model")
output = os.path.dirname(run)
model_name = run.split(os.sep)[-1]

model_name += "_streaming.ts"

output = os.path.abspath(output)
if not os.path.isdir(output):
    os.makedirs(output)
scripted_rave.export_to_ts(os.path.join(output, model_name))
try:
    if pretrained.n_channels <= 2:
        # test stereo mode for VST export
        scripted_rave.set_stereo_mode(True)
        z_vst_input = torch.zeros(2, scripted_rave.full_latent_size, z.shape[-1])
        out = scripted_rave.decode(z_vst_input)
        assert out.shape[1] == 2, "model output is not stereo"
        logging.info(f"this model seems compatible with the RAVE vst.")
except Exception as e:
    logging.warning(f"this model will not work with the RAVE VST. \n Caught error : %s"%e)

logging.info(f"all good ! model exported to {os.path.join(output, model_name)}")

INFO:root:save model


torch.Size([1, 1, 2048])


 Caught error : name 'z' is not defined
INFO:root:all good ! model exported to /home/jupyter-arbu/unified-timbre-transfer/pretrained/causal_streaming.ts
