In [None]:
%load_ext autoreload
%autoreload 2
%matplotlib inline

In [None]:
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
import scipy.fftpack

In [None]:
n_dims = list(range(1, 17))
files = ["./data/control_net/control_seq_n_dim_{}_trace.npy".format(i) for i in n_dims]
traces = []
for i, file in enumerate(files):
    traces.append(np.load(file))

In [None]:
cmap = matplotlib.cm.get_cmap('viridis')
colours = cmap(np.linspace(0, 1, len(n_dims)))
fig, ax = plt.subplots(figsize=(4.3, 3.25))
RMS = 0.64
for i in range(16):
    trace = traces[i]
    ax.semilogy(trace[0] * 1e-3, np.sqrt(np.mean(trace[1:], axis=0)) / RMS, color=colours[i], label="$n = {}$".format(n_dims[i]))
ax.set_xlabel("Number of batches ($\\times 10^3$)")
ax.set_ylabel("Mean training error (Relative RMSE)")
ax.set_ylim(2.5e-2, 2)
ax.legend(loc='best', ncol=4, columnspacing=0.75, labelspacing=0.1)

#fig.savefig('../doc/media/control_network_training.pdf', bbox_inches='tight')

In [None]:
import gym_speech_resynthesis.envs.control_network as control_network
import gym_speech_resynthesis.envs.audio as audio
import gym_speech_resynthesis.envs.mfcc as mfcc
import gnuspeech_trm

In [None]:
X = control_network.read_control_sequence('data/control_sequences/000/0049_776f3a41.prm.gz')
X2 = control_network.read_control_sequence('data/control_sequences/000/0002_07a1b183.prm.gz')

Xmin, Xmax = np.min(X, axis=0), np.max(X, axis=0)
Xoffs = -0.5 * (Xmin + Xmax)
Xscale = 2.0 / np.maximum(.1, (Xmax - Xmin))

In [None]:
%%script false
fig, ax = plt.subplots(figsize=(10, 2))
plt.plot(CE_sweep);

In [None]:
%%script false

def measure_trm_latency(fs=16000):
    trm = gnuspeech_trm.TRM()
    trm.output_rate = fs
    trm.filter_period = 20e-3
    trm.glot_vol = 0.0
    smpls1 = trm.synthesize(1024)
    trm.glot_vol = 60.0
    smpls2 = trm.synthesize(1024)
    trm.glot_vol = 0.0
    smpls3 = trm.synthesize(1024)

    smpls = np.concatenate((smpls1, smpls2, smpls3))

    mfcc_analysis = mfcc.MFCCFeatureAnalysis(sample_rate=fs, sample_rate_in=fs)
    mfccs, spectrum, ts = mfcc_analysis(smpls)

    fig, ax = plt.subplots()
    ax.plot(np.linspace(0, len(smpls) / fs, len(smpls)), smpls)
    for i, t in enumerate(ts):
        ax.plot([t - mfcc_analysis.fft_size * 0.5 / fs,
                 t - mfcc_analysis.fft_size * 0.5 / fs,
                 t + mfcc_analysis.fft_size * 0.5 / fs,
                 t + mfcc_analysis.fft_size * 0.5 / fs],
                [0,
                 np.sum(spectrum[i] + 1) * 0.1,
                 np.sum(spectrum[i] + 1) * 0.1,
                 0], 'k-')
    ax.plot(np.array([0, 1023, 1024, 2048, 2049, 3096]) / fs, [0, 0, 1, 1, 0, 0])
    
measure_trm_latency()

In [None]:
def synthesise_trajectory(X, render=False, twnd=3.75e-3, fs=16000, config_dict=gnuspeech_trm.TRM.voice_male):
    class Dummy:
        def __enter__(self): return self
        def __exit__(self, *_): return self
    
    mfcc_analysis = mfcc.MFCCFeatureAnalysis(sample_rate_in=fs, sample_rate=fs)

    trm = gnuspeech_trm.TRM(config_dict=config_dict)
    trm.volume = 60.0
    trm.output_rate = fs
    trm.filter_period = 20e-3

    n_samples = int(twnd * fs)

    spectrum = []
    mfccs = []
    ts = []
    l = 0
    with (audio.Player(channels=1, sample_rate=fs) if render else Dummy()) as player:
        for x in X:
            # Synthesise
            trm.set_parameters(x)
            samples = trm.synthesize(n_samples_max=n_samples)
            l += len(samples)

            # Record spectrum and MFCCs
            ms, ss, t = mfcc_analysis(samples)
            mfccs += ms.tolist()
            spectrum += ss.tolist()      
            ts += t.tolist()

            # Render if requested
            if render:
                player.write(samples.reshape(-1, 1))             

    return mfccs, spectrum, ts

In [None]:
cnet = control_network.ControlNetwork(
    weight_file='data/control_net/control_seq_n_dim_1_weights_014.h5',
    autoencoder=True)

cnet_forward = control_network.ControlNetwork(
    weight_file='data/control_net/control_seq_n_dim_1_weights_014.h5',
    autoencoder=False)

def apply_cnet(X, cnet):
    encoded = list(map(cnet.eval, X))
    XE = np.array(list(map(lambda x: x[0], encoded)))
    CE = np.array(list(map(lambda x: x[1], encoded)))
    return XE,CE

XE, CE = apply_cnet(X, cnet)

CE2 = np.random.uniform(-1, 1, (CE.shape[0], 2))
XE2, CE2 = apply_cnet(CE2, cnet_forward)

ts = np.linspace(0, (X.shape[0] - 1) * 3.75e-3, X.shape[0])

In [None]:
%%script false
fig, axs = plt.subplots(8, 2, figsize=(8.6, 3.5), sharex=True)
for i in range(16):
    ax = axs[i % 8, i // 8]
    ax.plot(ts, X[:, i], 'k', linewidth=1);
    ax.plot(ts, XE[:, i], linestyle=(0, (2, 1)), linewidth=0.75);
    ax.plot(ts, XE[:, i], color='white', linestyle=(1, (1, 2)), linewidth=0.75);
    ax.set_xlim(np.min(ts), np.max(ts))

#fig.savefig('../doc/media/trm_params_.pdf', bbox_inches='tight')

fig, ax = plt.subplots(figsize=(8.6, 0.3))
ax.plot(ts, CE)
ax.set_xlim(np.min(ts), np.max(ts))
#fig.savefig('../doc/media/control_net_traj_.pdf', bbox_inches='tight')

In [None]:
mfccs_x, spectrum_x, ts_x = synthesise_trajectory(X, render=False, config_dict=gnuspeech_trm.TRM.voice_female)
mfccs_xe, spectrum_xe, ts_xe = synthesise_trajectory(XE, render=False, config_dict=gnuspeech_trm.TRM.voice_female)

#mfccs_x = mfccs_x[:10]

mfccs_x = mfccs_x[:min(len(mfccs_x), len(mfccs_xe))]
mfccs_xe = mfccs_xe[:min(len(mfccs_x), len(mfccs_xe))]
spectrum_x = spectrum_x[:min(len(mfccs_x), len(mfccs_xe))]
spectrum_xe = spectrum_xe[:min(len(mfccs_x), len(mfccs_xe))]
ts_x = ts_x[:min(len(mfccs_x), len(mfccs_xe))]
ts_xe = ts_xe[:min(len(mfccs_x), len(mfccs_xe))]
mfccs_x, spectrum_x = np.array(mfccs_x), np.array(spectrum_x)
mfccs_xe, spectrum_xe = np.array(mfccs_xe), np.array(spectrum_xe)

In [None]:
fig, axs = plt.subplots(3, 2, figsize=(8.6, 1.5), sharex=True, gridspec_kw = {'height_ratios':[3, 3, 2]})

def norm_img(x):
    return x / np.maximum(1e-2, np.max(np.abs(x), axis=0))

extent_mfcc = (min(ts_x), max(ts_x), 0, 12)
extent_spectrum = (min(ts_x), max(ts_x), 0, 40)

axs[0, 0].imshow(norm_img(spectrum_x.T), origin='lower', vmin=-1, vmax=1, extent=extent_spectrum)
axs[0, 0].set_aspect('auto')
axs[1, 0].imshow(norm_img(spectrum_xe.T), origin='lower', vmin=-1, vmax=1, extent=extent_spectrum)
axs[1, 0].set_aspect('auto')
err = -np.sqrt(np.mean((spectrum_x.T - spectrum_xe.T)**2, axis=0))
axs[2, 0].plot(ts_x, err)
axs[2, 0].plot([min(ts_x), max(ts_x)], [np.mean(err), np.mean(err)], 'k--')
axs[2, 0].set_xlim(min(ts_x), max(ts_x))
axs[2, 0].set_ylim(-2, 0.2)
axs[2, 0].set_xlabel('Time (s)')
axs[2, 0].grid()

axs[0, 1].imshow(norm_img(mfccs_x.T), origin='lower', vmin=-1, vmax=1, extent=extent_mfcc)
axs[0, 1].set_aspect('auto')
axs[1, 1].imshow(norm_img(mfccs_xe.T), origin='lower', vmin=-1, vmax=1, extent=extent_mfcc)
axs[1, 1].set_aspect('auto')
err = -np.sqrt(np.mean((mfccs_x.T - mfccs_xe.T)**2, axis=0))
print(np.mean(err))
axs[2, 1].plot(ts_x, err)
axs[2, 1].plot([min(ts_x), max(ts_x)], [np.mean(err), np.mean(err)], 'k--')
axs[2, 1].set_xlim(min(ts_x), max(ts_x))
axs[2, 1].set_ylim(-5, 0.2)
axs[2, 1].set_xlabel('Time (s)')
axs[2, 1].grid()

fig.savefig('../doc/media/control_network_spectrogram_.pdf', bbox_inches='tight')


0 --> -0.9765524809873503
1 --> -1.1808174964200375
2 --> -1.1965627846355837
3 --> -1.0581228203476254
4 --> -0.9999949165223575
5 --> -1.0286517547132525
6 --> -1.001001173461883
7 --> -1.1711381796270222
8 --> -0.9853842660259826
9 --> -1.160913628648073
10 --> -1.01060159941192
11 --> -1.0404142789618576
12 --> -0.9536047139706527
13 --> -1.0543453249269936
14 --> -0.9034262340837026
15 --> -0.9956026783130717

In [None]:
## PCA

In [None]:
Xs = control_network.read_control_sequences('data/control_sequences/000')

In [None]:
X = np.concatenate(Xs)
Xmin, Xmax = np.min(X, axis=0), np.max(X, axis=0)
Xoffs = -0.5 * (Xmin + Xmax)
Xscale = 2.0 / np.maximum(.1, (Xmax - Xmin))
X = (X + Xoffs) * Xscale

In [None]:
X = np.concatenate(Xs)[:, 1:]

Xmin, Xmax = np.min(X, axis=0), np.max(X, axis=0)
Xoffs = -0.5 * (Xmin + Xmax)
Xscale = 2.0 / np.maximum(.1, (Xmax - Xmin))

X = (X + Xoffs) * Xscale

# Compute the PCA
X = X - np.mean(X, axis=0)
E, V = np.linalg.eigh(X.T @ X)
print(E.shape, V.shape)

In [None]:
fig, ax = plt.subplots()
ax.plot(E / np.max(E))

In [None]:
rmses = np.zeros(15)
for n_dims in range(1, 16):
    XP = V.T[-n_dims:] @ X.T
    XE = XP.T @ V.T[-n_dims:]
    rmses[n_dims - 1] = np.sqrt(np.mean((X - XE)**2))

In [None]:
fig, ax = plt.subplots(figsize=(4.3, 3.25))
ax.semilogy(list(range(1, 14)), rmses[:13] / 0.64, '--+', color='k')
ax.set_ylim(2.5e-2, 2)
ax.set_ylabel('Relative RMSE')
ax.set_xlabel('Number of principal components')

fig.savefig('../doc/media/control_network_pca.pdf', bbox_inches='tight')

In [None]:
# Load some sequence and project it onto the first two principal components
X = control_network.read_control_sequence('data/control_sequences/000/0049_776f3a41.prm.gz')
X = X[:, 1:]
n_dims = 8
X = (X + Xoffs) * Xscale
X = X - np.mean(X, axis=0)
XP = V.T[-n_dims:] @ X.T

# Project back and plot
XE = XP.T @ V.T[-n_dims:]

In [None]:
fig, axs = plt.subplots(15, 1, figsize=(10, 16))
for i in range(15):
    axs[i].plot(X[:, i] / Xscale[i] - Xoffs[i]);
    axs[i].plot(XE[:, i] / Xscale[i] - Xoffs[i]);

# Hilbert-Curve based sampling

In [None]:
class HilbertCurve(object):
    """Hilbert curve function.
    Pre-calculates the Hilbert space filling curve with a given number
    of iterations. The curve will lie in the square delimited by the
    points (0, 0) and (1, 1).
    Arguments
    ---------
    n : int
        Iterations.
    """
    # Implementation based on
    # https://en.wikipedia.org/w/index.php?title=Hilbert_curve&oldid=633637210

    def __init__(self, n):
        self.n = n
        self.n_corners = (2 ** n) ** 2
        self.corners = np.zeros((self.n_corners, 2))
        self.steps = np.arange(self.n_corners)

        steps = np.arange(self.n_corners)
        for s in 2 ** np.arange(n):
            r = np.empty_like(self.corners, dtype='int')
            r[:, 0] = 1 & (steps // 2)
            r[:, 1] = 1 & (steps ^ r[:, 0])
            self._rot(s, r)
            self.corners += s * r
            steps //= 4

        self.corners /= (2 ** n) - 1

    def _rot(self, s, r):
        swap = r[:, 1] == 0
        flip = np.all(r == np.array([1, 0]), axis=1)

        self.corners[flip] = (s - 1 - self.corners[flip])
        self.corners[swap] = self.corners[swap, ::-1]

    def __call__(self, u):
        """Evaluate pre-calculated Hilbert curve.
        Arguments
        ---------
        u : ndarray (M,)
            Positions to evaluate on the curve in the range [0, 1].
        Returns
        -------
        ndarray (M, 2)
            Two-dimensional curve coordinates.
        """
        step = np.asarray(u * len(self.steps))
        return np.vstack((
            np.interp(step, self.steps, self.corners[:, 0]),
            np.interp(step, self.steps, self.corners[:, 1]))).T

In [None]:
seqs = control_network.read_control_sequences('data/control_sequences/000')

In [None]:
curve = HilbertCurve(4)
C_sweep = (2 * curve(np.linspace(0, 1, (1 << 10) - 1))) - 1
X_sweep, _ = apply_cnet(C_sweep, cnet_forward)

In [None]:
fig, ax = plt.subplots(figsize=(2.5, 2.5))
ax.plot(C_sweep[:, 0], C_sweep[:, 1])
ax.set_xlabel('Pitch')
ax.set_ylabel('Phoneme')

fig.savefig('../doc/media/hilbert_.pdf', bbox_inches='tight')

In [None]:
mfccs_ref, _, _ = synthesise_trajectory(X_sweep, render=False, config_dict=gnuspeech_trm.TRM.voice_female)

In [None]:
mfccs_ref = np.array(mfccs_ref)
fig, ax = plt.subplots(figsize=(6.25, 0.5))
ax.imshow(norm_img(mfccs_ref.T), origin='lower', extent=(0, 256, 0, 12))
ax.set_aspect('auto')
ax.set_xlabel('Hilbert curve point')
ax.set_ylabel('MFCC coeff.')

fig.savefig('../doc/media/hilbert_mfcc_.pdf', bbox_inches='tight')

In [None]:
def reconstruct_control_seq(mfccs_x, mfccs_ref, cs_ref):
    n, m = mfccs_ref.shape[0], cs_ref.shape[0]
    i = n // 2
    C_reconstructed = []
    for mfcc_x in mfccs_x:
        dist = np.sum((mfccs_ref - mfcc_x[None, :])**2, axis=1)
        reg = np.abs(0.0 * (np.arange(n) - i))
        i = np.argmin(dist + reg)
        t = int((i / n) * m)
        C_reconstructed.append(cs_ref[t])
    return np.array(C_reconstructed)

In [None]:
import sys
N = 0
err = 0
for i, seq in enumerate(seqs[0:1]):
    mfccs_x, _, _ = synthesise_trajectory(seq, render=False, config_dict=gnuspeech_trm.TRM.voice_female)
    mfccs_x = np.array(mfccs_x)
    C_reconstructed = reconstruct_control_seq(mfccs_x, mfccs_ref, C_sweep)
    X_reconstructed, _ = apply_cnet(C_reconstructed, cnet_forward)
    
    twnd_reconstructed = 8.1875e-3
    mfccs_r, spectrum_r, ts_r = synthesise_trajectory(X_reconstructed, render=True, twnd=twnd_reconstructed, config_dict=gnuspeech_trm.TRM.voice_female)
    mfccs_r = np.array(mfccs_r)

    mfccs_x = mfccs_x[:min(mfccs_x.shape[0], mfccs_r.shape[0])]
    mfccs_r = mfccs_r[:min(mfccs_x.shape[0], mfccs_r.shape[0])]
    ts_r = ts_r[:min(mfccs_x.shape[0], mfccs_r.shape[0])]

    N += mfccs_x.shape[0]
    e = -np.sum(np.sqrt(np.mean((mfccs_x.T - mfccs_r.T)**2, axis=0)))
    print(e / mfccs_x.shape[0])
    err += e

    sys.stderr.write('\r{:6.2f}% done; err {:4.2f}'.format(100 * (i + 1) / len(seqs), err / N))
sys.stderr.write('\n')

print(err / N)

In [None]:

mfccs_r = np.array(mfccs_r)
fig, axs = plt.subplots(3, 1, figsize=(6.25, 1.25), sharex=True, gridspec_kw = {'height_ratios':[3, 3, 2]})
axs[0].imshow(norm_img(mfccs_r.T), origin='lower', extent=(np.min(ts_r), np.max(ts_r), 0, 12))
axs[0].set_aspect('auto')

axs[1].imshow(norm_img(mfccs_x.T), origin='lower', extent=(np.min(ts_r), np.max(ts_r), 0, 12))
axs[1].set_aspect('auto')

err = -np.sqrt(np.mean((mfccs_x.T - mfccs_r.T)**2, axis=0))
print(np.mean(err))
axs[2].plot(ts_r, err)
axs[2].plot([min(ts_r), max(ts_r)], [np.mean(err), np.mean(err)], 'k--')
axs[2].set_xlim(min(ts_r), max(ts_r))

fig.savefig('../doc/media/hilbert_resynth_.pdf', bbox_inches='tight')