<a href="https://colab.research.google.com/github/magenta/ddsp/blob/main/ddsp/colab/tutorials/4_core_functions.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>


##### Copyright 2021 Google LLC.

Licensed under the Apache License, Version 2.0 (the "License");





In [None]:
# Copyright 2021 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

# DDSP Core Functions

This notebook provides some simple demonstrations of using DDSP primitives for synthesis, filtering, and interpolation. Keep in mind that all of these components are fully differentiable and can be integrated with neural networks and end-2-end learning. 

While the preferred API is to use the Synthesizer and Effect Processors that are built around these central components, it is of course possible to call the core functions directly as well.

In [None]:
#@title Install DDSP
!rm -rf /content/miniconda
!curl -L https://repo.anaconda.com/miniconda/Miniconda3-py39_23.11.0-2-Linux-x86_64.sh -o miniconda.sh
!chmod +x miniconda.sh
!sh miniconda.sh -b -p /content/miniconda
!/content/miniconda/bin/pip install tensorflow==2.11 tensorflow-probability==0.19.0 tensorflow-datasets==4.9.0 ddsp==3.7.0
print('\nDone installing DDSP in conda environment!')

In [None]:
#@title Import display helpers
import warnings
warnings.filterwarnings('ignore')
import base64, io, os
import numpy as np
import matplotlib.pyplot as plt
from IPython import display
from scipy.io import wavfile
from scipy import signal as scipy_signal

sample_rate = 16000

def play(array_of_floats, sample_rate=sample_rate):
  if isinstance(array_of_floats, list): array_of_floats = np.array(array_of_floats)
  if len(array_of_floats.shape) == 2: array_of_floats = array_of_floats[0]
  normalizer = float(np.iinfo(np.int16).max)
  array_of_ints = np.array(np.asarray(array_of_floats) * normalizer, dtype=np.int16)
  memfile = io.BytesIO()
  wavfile.write(memfile, sample_rate, array_of_ints)
  html = '<audio controls><source src="data:audio/wav;base64,{}" type="audio/wav"/></audio>'
  html = html.format(base64.b64encode(memfile.getvalue()).decode('ascii'))
  memfile.close()
  display.display(display.HTML(html))

def specplot(audio, vmin=-5, vmax=1, rotate=True, size=512+256):
  if isinstance(audio, list): audio = np.array(audio)
  if len(audio.shape) == 2: audio = audio[0]
  f, t, Sxx = scipy_signal.stft(audio, fs=sample_rate, nperseg=size, noverlap=size*3//4)
  logmag = np.log10(np.abs(Sxx) + 1e-7)
  if rotate: logmag = np.flipud(logmag)
  plt.matshow(logmag, vmin=vmin, vmax=vmax, cmap=plt.cm.magma, aspect='auto')
  plt.xticks([]); plt.yticks([])
  plt.xlabel('Time'); plt.ylabel('Frequency')

def transfer_function(ir, sample_rate=16000):
  if len(ir.shape) == 1: ir = ir[np.newaxis, np.newaxis, :]
  elif len(ir.shape) == 2: ir = ir[np.newaxis, :, :]
  n = ir.shape[-1]
  n_fft = int(2**np.ceil(np.log2(max(n, 2))))
  magnitudes = np.abs(np.fft.rfft(ir, n=n_fft))
  frequencies = np.fft.rfftfreq(n_fft, d=1.0/sample_rate)
  return frequencies, magnitudes

def plot_impulse_responses(impulse_response, desired_magnitudes, sample_rate=16000):
  if isinstance(impulse_response, np.ndarray): ir = impulse_response
  else: ir = np.array(impulse_response)
  if len(ir.shape) == 3: ir_plot = ir[0, 0, :]
  elif len(ir.shape) == 2: ir_plot = ir[0, :]
  else: ir_plot = ir
  frequencies, magnitudes = transfer_function(ir, sample_rate)
  if len(magnitudes.shape) == 3: mag_plot = magnitudes[0, 0, :]
  elif len(magnitudes.shape) == 2: mag_plot = magnitudes[0, :]
  else: mag_plot = magnitudes
  if hasattr(desired_magnitudes, 'numpy'): desired_magnitudes = desired_magnitudes.numpy()
  desired_magnitudes = np.array(desired_magnitudes)
  plt.figure(figsize=(14, 4))
  plt.subplot(131); plt.plot(ir_plot); plt.title('Impulse Response')
  plt.subplot(132); plt.semilogy(frequencies, mag_plot); plt.title('Transfer Function')
  plt.subplot(133)
  freq_d = np.linspace(0, sample_rate/2, len(desired_magnitudes))
  plt.semilogy(freq_d, desired_magnitudes, label='Desired')
  plt.semilogy(frequencies, mag_plot, label='Actual')
  plt.title('Comparison'); plt.legend()

print('Helpers imported!')

# Generation

## `oscillator_bank()`



Synthesize audio with an array of sinusoidal oscillators. Frequencies and amplitudes must be provided at audio rate.


In [None]:
#@title Run generation demos (oscillator_bank, linear_lookup, variable_length_delay)

SCRIPT = r'''

import os, warnings
warnings.filterwarnings("ignore")
import numpy as np
import ddsp
import ddsp.training
import tensorflow as tf
import tensorflow_datasets as tfds

out = '/content/core_outputs'
os.makedirs(out, exist_ok=True)
sample_rate = 16000

# === oscillator_bank: simple harmonic ===
print('--- oscillator_bank: simple harmonic ---')
n_samples = int(sample_rate * 4.0)
n_components = 3
amps = np.linspace(0.3, 0.0, n_samples)
amps = np.tile(amps[np.newaxis, :, np.newaxis], [1, 1, n_components])
frequencies = np.ones([1, n_samples, 1]) * np.array([[[220, 440, 660]]])
audio = ddsp.core.oscillator_bank(frequencies, amps, sample_rate)
audio_np = audio.numpy() if hasattr(audio,'numpy') else np.array(audio)
np.save(f'{out}/osc_simple.npy', audio_np)

# === oscillator_bank: random frequencies ===
print('--- oscillator_bank: random ---')
n_components = 6; n_frames = 100
amps2 = np.linspace(0.3, 0.0, n_samples)
amps2 = np.tile(amps2[np.newaxis, :, np.newaxis], [1, 1, n_components])
freqs = []
for _ in range(n_components):
    f_start = np.random.uniform(20, 4000)
    f_end = np.random.uniform(20, 4000)
    freqs.append(np.linspace(f_start, f_end, n_frames))
freqs = np.stack(freqs).T[np.newaxis, ...]
freqs = ddsp.core.resample(freqs, n_samples)
audio2 = ddsp.core.oscillator_bank(freqs, amps2, sample_rate)
audio2 = audio2 / (np.abs(audio2).max() + 1e-8)
audio2_np = audio2.numpy() if hasattr(audio2,'numpy') else np.array(audio2)
np.save(f'{out}/osc_random.npy', audio2_np)

# === oscillator_bank: swarm ===
print('--- oscillator_bank: swarm ---')
def smooth(x, window_size=2000):
    window = np.ones(window_size) / window_size
    return np.convolve(window, x, mode='same')

n_samples_s = int(sample_rate * 6.0); n_components_s = 100
n_start = int(sample_rate * 1.5); n_stop = int(sample_rate * 4.0)
n_ramp = n_stop - n_start; n_level = n_samples_s - n_stop
amps_s = np.ones([1, n_samples_s, 1])
amps_s = amps_s * np.logspace(0, -2, n_components_s)[np.newaxis, np.newaxis, :]
amps_s[:, :n_start, :] *= np.logspace(-2, 0, n_start)[np.newaxis, :, np.newaxis]
amps_s[:, -2000:, :] *= np.logspace(0, -2, 2000)[np.newaxis, :, np.newaxis]
freq_initial = np.random.uniform(low=240.0, high=280.0, size=10)
harmonics = np.arange(1, 11)
f0 = np.array([0.5, 1, 1, 2.5, 3, 3.5, 4, 4.5, 5, 5.25])
freq_final = 150 * f0
frequencies_s = []
for i, f in zip(freq_initial, freq_final):
    freq = np.concatenate([i * np.ones(n_start), np.linspace(i, f, n_ramp), f * np.ones(n_level)])
    d_freq = smooth(np.concatenate([np.random.uniform(low=0.1, high=1.9, size=n_start),
                                     np.random.uniform(low=0.5, high=1.5, size=n_ramp + n_level)]))
    freq *= d_freq
    frequencies_s.append([freq * h for h in harmonics])
frequencies_s = np.transpose(np.stack(frequencies_s), (2, 1, 0))
frequencies_s = np.reshape(frequencies_s, [1, n_samples_s, -1])
audio_s = ddsp.core.oscillator_bank(frequencies_s, amps_s, sample_rate)
audio_s = audio_s / (np.abs(audio_s).max() + 1e-8)
audio_s_np = audio_s.numpy() if hasattr(audio_s,'numpy') else np.array(audio_s)
np.save(f'{out}/osc_swarm.npy', audio_s_np)

# === linear_lookup: sinusoidal ===
print('--- linear_lookup ---')
n_samples_l = int(sample_rate * 1.0); n_wavetable = 2048; n_cycles = 440
wavetable = tf.sin(tf.linspace(0.0, 2.0 * np.pi, n_wavetable))
wavetable = wavetable[tf.newaxis, tf.newaxis, :]
phase = tf.linspace(0.0, n_cycles, n_samples_l) % 1.0
phase = phase[tf.newaxis, :, tf.newaxis]
output = ddsp.core.linear_lookup(phase, wavetable)
target = np.sin(np.linspace(0.0, 2.0 * np.pi * n_cycles, n_samples_l))
np.save(f'{out}/ll_output.npy', output[0].numpy())
np.save(f'{out}/ll_phase.npy', phase[0, :, 0].numpy())
np.save(f'{out}/ll_target.npy', target)
np.save(f'{out}/ll_wavetable.npy', wavetable[0, 0, :].numpy())

# modulated lookup
modulation = tf.linspace(0.0, 0.5, n_samples_l)
modulation = modulation[tf.newaxis, :, tf.newaxis]
phase2 = (tf.sin(np.pi * phase) + modulation)**2.0 % 1.0
output2 = ddsp.core.linear_lookup(phase2, wavetable)
np.save(f'{out}/ll_output2.npy', output2[0].numpy())
np.save(f'{out}/ll_phase2.npy', phase2[0, :, 0].numpy())

# === wavetable synthesis ===
print('--- wavetable synthesis ---')
n_secs = 3; n_samples_w = int(sample_rate * n_secs)
n_wavetable_w = 2048; n_cycles_w = 110 * n_secs
phase_w = tf.linspace(0.0, n_cycles_w, n_samples_w) % 1.0
phase_w = phase_w[tf.newaxis, :, tf.newaxis]
wt_sin = tf.sin(tf.linspace(0.0, 2.0 * np.pi, n_wavetable_w))
wt_sin = wt_sin[tf.newaxis, tf.newaxis, :]
wt_sq = tf.cast(wt_sin > 0.0, tf.float32) * 2.0 - 1.0
wts = tf.concat([wt_sin, wt_sq], axis=1)
wts = ddsp.core.resample(wts, n_samples_w)
wts_scaled = wts * 0.5
out_mw = ddsp.core.linear_lookup(phase_w, wts_scaled)
np.save(f'{out}/wt_wavetables.npy', wts[0].numpy())
np.save(f'{out}/wt_output.npy', out_mw[0].numpy())
np.save(f'{out}/wt_phase.npy', phase_w[0, :, 0].numpy())

# wavetable_synthesis convenience
n_frames_wt = 100
freqs_wt = 110 * tf.linspace(1.5, 1, n_frames_wt)[tf.newaxis, :, tf.newaxis]
amps_wt = 0.5 * tf.linspace(0.7, 0.001, n_frames_wt)[tf.newaxis, :, tf.newaxis]
wts2 = tf.concat([wt_sin, wt_sq, wt_sin], axis=1)
wts2 = ddsp.core.resample(wts2, n_samples_w)
out_wts = ddsp.core.wavetable_synthesis(freqs_wt, amps_wt, wts2, n_samples=n_samples_w, sample_rate=sample_rate)
np.save(f'{out}/wts_output.npy', out_wts[0].numpy())
np.save(f'{out}/wts_wavetables.npy', wts2[0].numpy())

# === variable_length_delay ===
print('--- variable_length_delay ---')
data_provider = ddsp.training.data.NSynthTfds(split='train')
batch = data_provider.get_batch(batch_size=1, shuffle=False).skip(1)
audio_nsynth = next(iter(tfds.as_numpy(batch)))['audio']
np.save(f'{out}/delay_input.npy', audio_nsynth)

n_samples_d = audio_nsynth.shape[1]
n_seconds_d = n_samples_d / sample_rate

def sin_phase_d(mod_rate):
    phase = tf.sin(tf.linspace(0.0, mod_rate * n_seconds_d * 2.0 * np.pi, n_samples_d))
    phase = (phase[tf.newaxis, :, tf.newaxis] + 1.0) / 2.0
    return phase

for name, mod_rate, mod_ms, center_ms, wet_only in [
    ('flanger', 0.25, 1.5, 0.0, False),
    ('chorus', 2.0, 1.0, 25.0, False),
    ('vibrato', 1.0, 20.0, 0.0, True)]:
    delay_ms = mod_ms + center_ms
    max_length = int(sample_rate / 1000.0 * delay_ms)
    phase_d = sin_phase_d(mod_rate) * (mod_ms / delay_ms) + (center_ms / delay_ms)
    audio_wet = ddsp.core.variable_length_delay(phase_d, audio_nsynth, max_length=max_length)
    if wet_only:
        audio_d = audio_wet
    else:
        audio_d = 0.5 * (audio_nsynth + audio_wet)
    audio_d_np = audio_d.numpy() if hasattr(audio_d,'numpy') else np.array(audio_d)
    np.save(f'{out}/delay_{name}.npy', audio_d_np)
    print(f'  {name} saved')

print('\nGeneration section done!')
'''

with open('/content/core_gen.py', 'w') as f:
  f.write(SCRIPT)

!unset PYTHONPATH PYTHONHOME && /content/miniconda/bin/python /content/core_gen.py

### Ex: Simple harmonic sound

In [None]:
audio = np.load('/content/core_outputs/osc_simple.npy')
play(audio)
specplot(audio)

### Ex: Random frequencies

In [None]:
audio = np.load('/content/core_outputs/osc_random.npy')
play(audio)
specplot(audio)

### Ex: Swarm of sinusoids

Just for fun...

In [None]:
audio = np.load('/content/core_outputs/osc_swarm.npy')
play(audio)
specplot(audio)

## `linear_lookup()`


### Ex: Sinusoidal lookup

As a simple example, lookup from a sin-wave wavetable produces a sin wave at the lookup frequency

In [None]:
output = np.load('/content/core_outputs/ll_output.npy')
phase = np.load('/content/core_outputs/ll_phase.npy')
target = np.load('/content/core_outputs/ll_target.npy')
wavetable = np.load('/content/core_outputs/ll_wavetable.npy')

plt.figure(figsize=(12, 6))
plt.subplot(121); plt.plot(wavetable); plt.title('Wavetable')
plt.subplot(122)
plt.plot(output[:200], label='Output')
plt.plot(phase[:200], label='Oscillator Phase')
plt.plot(target[:200], label='Target')
plt.title('Wavetable lookup'); plt.legend(loc='lower left')
print('Target'); play(target)
print('Output'); play(output)

There are small artifacts due to the linear interpolation and implicit resampling of the signal

In [None]:
output = np.load('/content/core_outputs/ll_output.npy')
target = np.load('/content/core_outputs/ll_target.npy')
plt.plot(target[:200] - output[:200])

In [None]:
output2 = np.load('/content/core_outputs/ll_output2.npy')
phase2 = np.load('/content/core_outputs/ll_phase2.npy')
plt.figure(figsize=(6, 6))
plt.plot(output2[:200], label='Output')
plt.plot(phase2[:200], label='Oscillator Phase')
plt.title('Wavetable lookup'); plt.ylim(-1.5, 1.5)
plt.legend(loc='lower left')
print('Output'); play(output2)

### Ex: Wavetable Synthesis

In [None]:
wavetables = np.load('/content/core_outputs/wt_wavetables.npy')
output_mw = np.load('/content/core_outputs/wt_output.npy')
phase = np.load('/content/core_outputs/wt_phase.npy')
plt.figure(figsize=(12, 6))
plt.subplot(121)
plt.plot(wavetables[0, :]); plt.plot(wavetables[16000, :]); plt.plot(wavetables[32000, :])
plt.title('Wavetable')
plt.subplot(122)
plt.plot(output_mw[:200], label='Output')
plt.plot(phase[:200], label='Oscillator Phase')
plt.title('Wavetable lookup'); plt.legend(loc='lower left')
print('Output'); play(output_mw)

## `wavetable_synthesis()`

In [None]:
wavetables2 = np.load('/content/core_outputs/wts_wavetables.npy')
output_wts = np.load('/content/core_outputs/wts_output.npy')
plt.figure(figsize=(12, 6))
plt.subplot(121)
plt.plot(wavetables2[0, :]); plt.plot(wavetables2[16000, :]); plt.plot(wavetables2[32000, :])
plt.title('Wavetable')
print('Output'); play(output_wts)

## `variable_length_delay()`

In [None]:
audio = np.load('/content/core_outputs/delay_input.npy')
specplot(audio)
play(audio)

### Ex. Flanger

In [None]:
audio = np.load('/content/core_outputs/delay_flanger.npy')
play(audio)
specplot(audio)

### Ex. Chorus

In [None]:
audio = np.load('/content/core_outputs/delay_chorus.npy')
play(audio)
specplot(audio)

### Ex. Vibrato

In [None]:
audio = np.load('/content/core_outputs/delay_vibrato.npy')
play(audio)
specplot(audio)

# Filtering


Time-varying differentiable linear filters (parameterized in frequency space). Impulse responses are designed by `sinc_impulse_response()` and `frequency_impulse_response()` and then applied by `fft_convolve()`. 

`sinc_filter()` and `frequency_filter()` are thin wrappers around filter design and `fft_convolve()`.

In [None]:
#@title Run filtering demos

SCRIPT = r'''

import os, warnings
warnings.filterwarnings("ignore")
import numpy as np
import ddsp
import tensorflow as tf

out = '/content/core_outputs'
os.makedirs(out, exist_ok=True)
sample_rate = 16000

# === fft_convolve: low-pass sweep ===
print('--- fft_convolve ---')
noise = np.random.uniform(-0.5, 0.5, [1, sample_rate * 4])
f_cutoff = np.linspace(0., 1.0, 200)[np.newaxis, :, np.newaxis]
ir = ddsp.core.sinc_impulse_response(f_cutoff, 2048)
filtered = ddsp.core.fft_convolve(noise, ir)
np.save(f'{out}/fft_noise.npy', noise)
filt_np = filtered.numpy() if hasattr(filtered,'numpy') else np.array(filtered)
np.save(f'{out}/fft_filtered.npy', filt_np)

# === sinc_impulse_response: brick-wall ===
print('--- sinc_impulse_response ---')
for tag, fc, ws, sr_arg in [('sinc1', 4000, 2000, sample_rate), ('sinc2', 0.5, 2000, None), ('sinc3', 0.5, 250, None)]:
    if sr_arg:
        ir_s = ddsp.core.sinc_impulse_response(fc, ws, sr_arg)
    else:
        ir_s = ddsp.core.sinc_impulse_response(fc, ws)
    ir_np = ir_s.numpy() if hasattr(ir_s,'numpy') else np.array(ir_s)
    np.save(f'{out}/ir_{tag}.npy', ir_np)
    print(f'  {tag} saved')

# desired magnitudes for brick-wall
half_nyquist = int(2000 / 2)
desired_mags = np.concatenate([np.ones([half_nyquist]), np.zeros([half_nyquist]) + 1e-6])
np.save(f'{out}/desired_mags_sinc.npy', desired_mags)

# === sinc_filter: bandlimited upsampling ===
print('--- sinc_filter ---')
original_sample_rate = 10000
n_samples_s = sample_rate + 1
time_s = tf.linspace(0.0, 1.0, n_samples_s)
signal_s = (tf.linspace(0.0, 100.0, n_samples_s) % 1.0) - 0.5
np.save(f'{out}/sinc_signal.npy', signal_s.numpy())
np.save(f'{out}/sinc_time.npy', time_s.numpy())

# FFT of original
n_fft_o = int(2**np.ceil(np.log2(max(n_samples_s, 2))))
mags_o = np.abs(np.fft.rfft(signal_s.numpy(), n=n_fft_o))
freqs_o = np.fft.rfftfreq(n_fft_o, d=1.0/original_sample_rate)
np.save(f'{out}/sinc_freqs_orig.npy', freqs_o)
np.save(f'{out}/sinc_mags_orig.npy', mags_o)

upsample = 2
upsample_rate = int(original_sample_rate * upsample)
n_upsample = int(n_samples_s * upsample)

# Box upsampling
signal_up = tf.compat.v1.image.resize_nearest_neighbor(
    signal_s[tf.newaxis, :, tf.newaxis, tf.newaxis], [n_upsample, 1])[0, :, 0, 0]
n_fft_u = int(2**np.ceil(np.log2(max(n_upsample, 2))))
mags_up = np.abs(np.fft.rfft(signal_up.numpy(), n=n_fft_u))
freqs_up = np.fft.rfftfreq(n_fft_u, d=1.0/upsample_rate)
np.save(f'{out}/sinc_freqs_up.npy', freqs_up)
np.save(f'{out}/sinc_mags_up.npy', mags_up)
np.save(f'{out}/sinc_signal_up.npy', signal_up.numpy())

# Bilinear upsampling
signal_up_bl = ddsp.core.resample(signal_s[tf.newaxis, :, tf.newaxis], n_upsample)[0, :, 0]
mags_up_bl = np.abs(np.fft.rfft(signal_up_bl.numpy(), n=n_fft_u))
np.save(f'{out}/sinc_mags_up_bl.npy', mags_up_bl)
np.save(f'{out}/sinc_signal_up_bl.npy', signal_up_bl.numpy())

# Anti-aliased
cutoff_freq = tf.ones([1, 1, 1]) * 0.5
signal_filt = ddsp.core.sinc_filter(signal_up[tf.newaxis, :], cutoff_freq, window_size=1024)[0]
mags_filt = np.abs(np.fft.rfft(signal_filt.numpy(), n=n_fft_u))
np.save(f'{out}/sinc_mags_filt.npy', mags_filt)
np.save(f'{out}/sinc_signal_filt.npy', signal_filt.numpy())

# === frequency_impulse_response ===
print('--- frequency_impulse_response ---')
n_freq = 512
mags_fi = (tf.linspace(1.0, 0.001, n_freq) + 0.1 * tf.sin(tf.linspace(0.0, 2.0*np.pi*8, n_freq)))
mags_fi = mags_fi[tf.newaxis, tf.newaxis, :]
ir_fi = ddsp.core.frequency_impulse_response(mags_fi, window_size=0)
np.save(f'{out}/freq_ir1.npy', ir_fi.numpy() if hasattr(ir_fi,'numpy') else np.array(ir_fi))
np.save(f'{out}/freq_desired_mags.npy', mags_fi[0,0,:].numpy())

ir_fi2 = ddsp.core.frequency_impulse_response(mags_fi, window_size=80)
np.save(f'{out}/freq_ir2.npy', ir_fi2.numpy() if hasattr(ir_fi2,'numpy') else np.array(ir_fi2))

# === frequency_filter ===
print('--- frequency_filter ---')
n_samples_ff = int(sample_rate * 4.0)
audio_in = tf.random.uniform([1, n_samples_ff], -0.5, 0.5)
np.save(f'{out}/ff_audio_in.npy', audio_in.numpy())

for tag, n_freq_ff, n_frames_ff, ws in [('ff1', 1000, 1, 0), ('ff2', 32, 1, 0), ('ff3', 1000, 1, 32)]:
    mags_ff = tf.sin(tf.linspace(0.0, 10.0, n_freq_ff))**4.0
    mags_ff = mags_ff[tf.newaxis, tf.newaxis, :]
    ao = ddsp.core.frequency_filter(audio_in, mags_ff, window_size=ws)
    np.save(f'{out}/{tag}.npy', ao.numpy() if hasattr(ao,'numpy') else np.array(ao))

# Time-varying filters
for tag, n_frames_tv in [('ff_tv250', 250), ('ff_tv15', 15)]:
    n_freq_tv = 1000
    mags_tv = [tf.sin(tf.linspace(0.0, w, n_freq_tv))**4.0 for w in np.linspace(4.0, 40.0, n_frames_tv)]
    mags_tv = tf.stack(mags_tv)[tf.newaxis, :, :]
    ao_tv = ddsp.core.frequency_filter(audio_in, mags_tv)
    np.save(f'{out}/{tag}.npy', ao_tv.numpy() if hasattr(ao_tv,'numpy') else np.array(ao_tv))

print('\nFiltering section done!')
'''

with open('/content/core_filter.py', 'w') as f:
  f.write(SCRIPT)

!unset PYTHONPATH PYTHONHOME && /content/miniconda/bin/python /content/core_filter.py

## `fft_convolve()`


In [None]:
noise = np.load('/content/core_outputs/fft_noise.npy')
filtered = np.load('/content/core_outputs/fft_filtered.npy')
specplot(noise); specplot(filtered)
play(noise); play(filtered)

## `sinc_impulse_response()`


### Ex: Brick-wall filter


In [None]:
desired_mags = np.load('/content/core_outputs/desired_mags_sinc.npy')
for tag in ['sinc1', 'sinc2', 'sinc3']:
    ir = np.load(f'/content/core_outputs/ir_{tag}.npy')
    print(f'--- {tag} ---')
    plot_impulse_responses(ir, desired_mags)

## `sinc_filter()`


### Ex: Bandlimited Upsampling

Let's start with a triangle wave at 100 Hz, sampled at 10kHz.

In [None]:
signal = np.load('/content/core_outputs/sinc_signal.npy')
time_s = np.load('/content/core_outputs/sinc_time.npy')
freqs_o = np.load('/content/core_outputs/sinc_freqs_orig.npy')
mags_o = np.load('/content/core_outputs/sinc_mags_orig.npy')
original_sample_rate = 10000

plt.figure(figsize=(12, 6))
plt.subplot(121); plt.plot(time_s[:200], signal[:200]); plt.title('Amplitude (time)')
plt.subplot(122); plt.semilogy(freqs_o, mags_o); plt.title('Magnitude (frequency)')
print('Original'); play(signal, sample_rate=original_sample_rate)

In [None]:
freqs_up = np.load('/content/core_outputs/sinc_freqs_up.npy')
mags_up = np.load('/content/core_outputs/sinc_mags_up.npy')
mags_up_bl = np.load('/content/core_outputs/sinc_mags_up_bl.npy')
signal_up = np.load('/content/core_outputs/sinc_signal_up.npy')
signal_up_bl = np.load('/content/core_outputs/sinc_signal_up_bl.npy')
mags_filt = np.load('/content/core_outputs/sinc_mags_filt.npy')
signal_filt = np.load('/content/core_outputs/sinc_signal_filt.npy')
upsample_rate = 20000

plt.figure(figsize=(12, 6))
plt.subplot(121)
plt.semilogy(freqs_up, mags_up, label='box upsample')
plt.semilogy(freqs_up, mags_up_bl, label='bilinear upsample')
plt.semilogy(freqs_o, mags_o, label='original')
plt.ylim(1e-3, 1e4); plt.title('Magnitude (frequency)'); plt.legend()
print('Box upsample'); play(signal_up, sample_rate=upsample_rate)
print('Bilinear upsample'); play(signal_up_bl, sample_rate=upsample_rate)

plt.figure(figsize=(12, 6))
plt.subplot(121)
plt.semilogy(freqs_up, mags_up, label='box upsample')
plt.semilogy(freqs_up, mags_filt, label='anti-aliased')
plt.ylim(1e-3, 1e4); plt.title('Magnitude (frequency)'); plt.legend()
print('Anti-aliased'); play(signal_filt, sample_rate=upsample_rate)

## `frequency_impulse_response()`


In [None]:
desired_mags = np.load('/content/core_outputs/freq_desired_mags.npy')
for tag in ['freq_ir1', 'freq_ir2']:
    ir = np.load(f'/content/core_outputs/{tag}.npy')
    print(f'--- {tag} ---')
    plot_impulse_responses(ir, desired_mags)

## `frequency_filter()`


In [None]:
audio_in = np.load('/content/core_outputs/ff_audio_in.npy')
for tag, title in [('ff1','1000 freq bands'), ('ff2','32 freq bands'), ('ff3','1000 bands, window=32')]:
    audio_out = np.load(f'/content/core_outputs/{tag}.npy')
    print(title); play(audio_out); specplot(audio_out)
for tag, title in [('ff_tv250','Time-varying, 250 frames'), ('ff_tv15','Time-varying, 15 frames')]:
    audio_out = np.load(f'/content/core_outputs/{tag}.npy')
    print(title); play(audio_out); specplot(audio_out)

# Resampling

## `resample()`


In [None]:
#@title Run resampling demos

SCRIPT = r'''

import os, warnings
warnings.filterwarnings("ignore")
import numpy as np
import ddsp

out = '/content/core_outputs'
os.makedirs(out, exist_ok=True)

n_coarse = 9; n_fine = 16000
coarse = 1.0 - np.sin(np.linspace(0, np.pi, n_coarse))[np.newaxis, :, np.newaxis]

# resample add_endpoint=False
fine = ddsp.core.resample(coarse, n_fine, add_endpoint=False)
fine_np = fine.numpy() if hasattr(fine,'numpy') else np.array(fine)
np.save(f'{out}/resample_noep.npy', fine_np)

# resample add_endpoint=True (default)
fine2 = ddsp.core.resample(coarse, n_fine)
fine2_np = fine2.numpy() if hasattr(fine2,'numpy') else np.array(fine2)
np.save(f'{out}/resample_ep.npy', fine2_np)

# cubic
fine3 = ddsp.core.resample(coarse, n_fine, method='cubic', add_endpoint=False)
fine3_np = fine3.numpy() if hasattr(fine3,'numpy') else np.array(fine3)
np.save(f'{out}/resample_cubic.npy', fine3_np)

# downsampling
fine_down = 1.0 - np.sin(np.linspace(0, np.pi, n_fine))[np.newaxis, :, np.newaxis]
coarse_d = ddsp.core.resample(fine_down, n_coarse, add_endpoint=False)
coarse_d_np = coarse_d.numpy() if hasattr(coarse_d,'numpy') else np.array(coarse_d)
np.save(f'{out}/resample_down_noep.npy', coarse_d_np)
np.save(f'{out}/resample_down_fine.npy', fine_down)

coarse_d2 = ddsp.core.resample(fine_down, n_coarse)
coarse_d2_np = coarse_d2.numpy() if hasattr(coarse_d2,'numpy') else np.array(coarse_d2)
np.save(f'{out}/resample_down_ep.npy', coarse_d2_np)

# upsample_with_windows
n_coarse_w = 5
coarse_w = 1.0 - np.sin(np.linspace(0, np.pi, n_coarse_w))[np.newaxis, :, np.newaxis]
fine_w = ddsp.core.upsample_with_windows(coarse_w, n_fine, add_endpoint=False)
fine_w_np = fine_w.numpy() if hasattr(fine_w,'numpy') else np.array(fine_w)
np.save(f'{out}/upwin_noep.npy', fine_w_np)
np.save(f'{out}/upwin_coarse.npy', coarse_w)

fine_w2 = ddsp.core.upsample_with_windows(coarse_w, n_fine)
np.save(f'{out}/upwin_ep.npy', fine_w2.numpy() if hasattr(fine_w2,'numpy') else np.array(fine_w2))

fine_w3 = ddsp.core.resample(coarse_w, n_fine, method='window')
np.save(f'{out}/upwin_method.npy', fine_w3.numpy() if hasattr(fine_w3,'numpy') else np.array(fine_w3))

fine_w4 = ddsp.core.resample(coarse_w, n_fine, method='cubic')
np.save(f'{out}/upwin_cubic.npy', fine_w4.numpy() if hasattr(fine_w4,'numpy') else np.array(fine_w4))

np.save(f'{out}/resample_coarse.npy', coarse)

print('Resampling section done!')
'''

with open('/content/core_resample.py', 'w') as f:
  f.write(SCRIPT)

!unset PYTHONPATH PYTHONHOME && /content/miniconda/bin/python /content/core_resample.py

In [None]:
n_coarse = 9; n_fine = 16000
coarse = np.load('/content/core_outputs/resample_coarse.npy')
fine = np.load('/content/core_outputs/resample_noep.npy')
plt.plot(np.linspace(0, n_fine, n_coarse), coarse[0, :, 0], 'o', label='coarse')
plt.plot(np.linspace(0, n_fine, n_fine), fine[0, :, 0], label='fine')
plt.title('Bilinear upsampling ({} points, {} intervals)'.format(n_coarse, n_coarse - 1))
plt.legend(loc='lower right'); _ = plt.ylim(-0.1, 1.1)

In [None]:
fine = np.load('/content/core_outputs/resample_ep.npy')
n_adjusted = int(n_fine / n_coarse * (n_coarse - 1))
plt.plot(np.linspace(0, n_adjusted, n_coarse), coarse[0, :, 0], 'o', label='coarse')
plt.plot(np.linspace(0, n_fine, n_fine), fine[0, :, 0], label='fine')
plt.title('Bilinear upsampling ({} points, {} intervals)'.format(n_coarse, n_coarse))
plt.legend(loc='lower right'); _ = plt.ylim(-0.1, 1.1)

In [None]:
fine = np.load('/content/core_outputs/resample_cubic.npy')
plt.plot(np.linspace(0, n_fine, n_coarse), coarse[0, :, 0], 'o', label='coarse')
plt.plot(np.linspace(0, n_fine, n_fine), fine[0, :, 0], label='fine')
plt.title('Bicubic upsampling ({} points, {} intervals)'.format(n_coarse, n_coarse - 1))
plt.legend(loc='lower right'); _ = plt.ylim(-0.1, 1.1)

In [None]:
fine_down = np.load('/content/core_outputs/resample_down_fine.npy')
coarse_d = np.load('/content/core_outputs/resample_down_noep.npy')
plt.plot(np.linspace(0, n_coarse, n_fine), fine_down[0, :, 0], label='fine')
plt.plot(np.linspace(0, n_coarse, n_coarse), coarse_d[0, :, 0], 'o', label='coarse')
plt.title('Bilinear downsampling'); plt.legend(loc='lower right')
plt.xlim(-0.5, 10.5); _ = plt.ylim(-0.1, 1.1)

In [None]:
coarse_d2 = np.load('/content/core_outputs/resample_down_ep.npy')
plt.plot(np.linspace(0, n_coarse, n_fine), fine_down[0, :, 0], label='fine')
plt.plot(np.linspace(0, n_coarse - 1, n_coarse), coarse_d2[0, :, 0], 'o', label='coarse')
plt.title('Bilinear downsampling (add_endpoint)'); plt.legend(loc='lower right')
plt.xlim(-0.5, 10.5); _ = plt.ylim(-0.1, 1.1)

## `upsample_with_windows()`

In [None]:
n_coarse_w = 5; n_fine = 16000
coarse_w = np.load('/content/core_outputs/upwin_coarse.npy')
for tag, title in [('upwin_noep','no endpoint'),('upwin_ep','with endpoint'),('upwin_method','method=window'),('upwin_cubic','bicubic comparison')]:
    fine_w = np.load(f'/content/core_outputs/{tag}.npy')
    plt.figure(figsize=(8, 4))
    plt.plot(np.linspace(0, n_fine, n_coarse_w), coarse_w[0, :, 0], 'o', label='coarse')
    plt.plot(np.linspace(0, n_fine, n_fine), fine_w[0, :, 0], label='fine')
    plt.title(f'Upsample with windows ({title})'); plt.legend(loc='lower right')
    _ = plt.ylim(-0.1, 1.1)