In [1]:
# Fractional Delay Line

from scipy import signal
from scipy.fftpack import fft, ifft
import numpy as np

from IPython.display import Audio 
import soundfile as sf


import numpy as np

x, Fs = sf.read("snare.wav")

import numpy as np

allpass_params = [
    (210, 0.75),   # APF1
    (158, 0.75),   # APF2
    (561, 0.625),  # APF3
    (410, 0.625)   # APF4
]

predelay = 300        # e.g. 500 samples (~11 ms @ 44.1kHz)
lpf_a = 0.9            # darkish initial LPF


class DelayLine:
    def __init__(self, max_size):
        """
        A simple circular buffer delay line.
        max_size should be >= the largest delay you plan to use.
        """
        self.size = max_size
        self.buffer = np.zeros(max_size, dtype=np.float32)
        self.write_pos = 0

    def write(self, sample):
        """
        Write a single sample into the delay line.
        Advance the write pointer (wrapping with modulo).
        """
        self.buffer[self.write_pos] = sample
        self.write_pos = (self.write_pos + 1) % self.size

    def tap(self, delay_samples):
        """
        Read a sample 'delay_samples' behind the write pointer.
        """
        read_index = (self.write_pos - delay_samples) % self.size
        return self.buffer[read_index]

class OnePoleLPF:
    def __init__(self, a=0.5):
        """
        a in [0,1), typical range ~0.2-0.95
        The closer to 1, the darker the sound.
        """
        self.a = a
        self.y_prev = 0.0  # filter memory

    def process_sample(self, x):
        """
        Process one sample 'x' through the 1-pole low-pass filter.
        """
        y = self.a * self.y_prev + (1.0 - self.a) * x
        self.y_prev = y
        return y

class AllPassFilter:
    def __init__(self, delay_samples, g, max_size):
        """
        All-pass filter using a circular buffer for the delay.
        delay_samples: D
        g: feedback/feedforward coefficient
        max_size: must be >= delay_samples
        """
        self.D = delay_samples
        self.g = g
        self.delay_line = DelayLine(max_size)

    def process_sample(self, x: float) -> float:
        """
        One typical all-pass implementation:
          y = delayed_value - g * x
          new_value = x + g * y
          delay_line.write(new_value)

        Then output y.

        Alternatively, the sign might be reversed:
          y = -g*x + delayed_value
          new_value = x + g*y
        They are mathematically similar, just a matter of sign convention.
        """
        delayed_value = self.delay_line.tap(self.D)
        y = delayed_value - self.g * x
        new_value = x + self.g * y
        self.delay_line.write(new_value)
        return y

def apply_early_reflections(
    x: np.ndarray,
    predelay_samples: int,
    lpf_a: float,
    ap_params: list,
    max_size: int = 20000
):
    """
    :param x:         Input audio (1D numpy array).
    :param predelay_samples: Delay in samples for the predelay.
    :param lpf_a:     The 1-pole LPF coefficient 'a' for the initial filter.
    :param ap_params: List of (D, g) tuples for each allpass filter in series.
    :param max_size:  For each internal DelayLine, must be >= max needed delay.

    Returns a numpy array with enough extra length so that
    the entire "chain" output is captured.
    """

    # 1) Setup our submodules:
    # -- A simple DelayLine for predelay
    predelay_line = DelayLine(max_size=max_size)
    # -- The 1-pole LPF
    lpf = OnePoleLPF(a=lpf_a)
    # -- The series of AllPassFilters
    allpasses = []
    for (D, g) in ap_params:
        allpasses.append(AllPassFilter(D, g, max_size=max_size))

    # 2) Determine how long the output should be
    sum_of_ap_delays = sum(D for (D, _) in ap_params)
    out_len = len(x) + predelay_samples + sum_of_ap_delays

    y = np.zeros(out_len, dtype=x.dtype)

    # 3) Process sample-by-sample
    for n in range(out_len):
        # If within input range, feed x[n], else 0.0
        in_sample = x[n] if n < len(x) else 0.0

        # A) Predelay
        predelay_line.write(in_sample)
        delayed_sample = predelay_line.tap(predelay_samples)

        # B) LPF
        filtered_sample = lpf.process_sample(delayed_sample)

        # C) Series Allpass Filters
        ap_out = filtered_sample
        for apf in allpasses:
            ap_out = apf.process_sample(ap_out)

        # D) Final output is after the last AP
        y[n] = ap_out

    return y


earlyRefProcessed = apply_early_reflections(
    x,
    predelay_samples=predelay,
    lpf_a=lpf_a,
    ap_params=allpass_params,
    max_size=50000  # must be >= largest AP delay (561 or 410)
)

class ModulatedAllPass:
    def __init__(self, max_size: int, g: float, initial_delay: float):
        """
        A modulated Schroeder allpass filter with fractional delay.
        
        Args:
            max_size:       Size of internal buffer (must be >= the max delay you need).
            g:              Allpass feedback/feedforward coefficient.
            initial_delay:  Initial delay in samples (can be float).
        """
        self.size = max_size
        self.buffer = np.zeros(max_size, dtype=np.float32)
        self.write_pos = 0

        self.g = g
        self.delay = initial_delay  # in samples, can be float

    def set_delay(self, new_delay: float):
        """
        Update the internal delay time (in samples). 
        This can be changed each sample if you want continuous modulation.
        """
        self.delay = new_delay

    def process_sample(self, x: float) -> float:
        """
        Process one input sample through the modulated allpass filter.

        Allpass formula (one variant):
          delayed_val = fractionalTap(delay)
          y = delayed_val - g * x
          new_val = x + g * y
          write(new_val)
          output = y
        """
        delayed_val = self._fractional_tap(self.delay)
        y = delayed_val - self.g * x
        new_val = x + self.g * y

        # Write new_val into buffer
        self.buffer[self.write_pos] = new_val
        self.write_pos = (self.write_pos + 1) % self.size

        return y

    def tap(self, tap_delay: float) -> float:
        """
        If you want to 'grab' a signal from inside the delay line 
        (for creating taps or crossfeeds), you can call this.
        """
        return self._fractional_tap(tap_delay)

    def _fractional_tap(self, delay_samples: float) -> float:
        """
        Reads from the circular buffer 'delay_samples' behind write_pos,
        using linear interpolation for fractional positions.
        """
        # Which sample index are we reading from?
        # We'll break delay_samples into integer + fractional parts:
        d_int = int(np.floor(delay_samples))
        frac = delay_samples - d_int

        # 'base_index' is the integer index behind write_pos
        base_index = self.write_pos - d_int

        # We want to read at base_index and base_index-1 to interpolate
        idx_a = base_index % self.size
        idx_b = (base_index - 1) % self.size

        # Sample values
        val_a = self.buffer[idx_a]
        val_b = self.buffer[idx_b]

        # Linear interpolate:  val = val_a + frac*(val_b - val_a)
        val = val_a + frac * (val_b - val_a)
        return val


# Listen in a Jupyter notebook:
Audio(earlyRefProcessed, rate=Fs)


In [2]:
def apply_early_reflections_debug(
    x: np.ndarray,
    predelay_samples: int,
    lpf_a: float,
    allpass_filters,  # list of either AllPassFilter or ModulatedAllPass objects
    out_extra_samples: int = 0
):
    """
    Extended version that:
      1) Returns intermediate signals for debugging/listening.
      2) Lets you supply any mix of fixed allpass or modulated allpass.
      3) Optionally extends output length by 'out_extra_samples' 
         if you want extra tail space.

    :param x:               Input audio (1D).
    :param predelay_samples:Number of samples for predelay.
    :param lpf_a:           Coefficient 'a' for the 1-pole lowpass.
    :param allpass_filters: A list of AP filter objects 
                            (either fixed or modulated).
    :param out_extra_samples:Add extra zero-padding if needed.
    
    :return: A dictionary with:
        {
          "predelay": np.ndarray,
          "lpf_out":  np.ndarray,
          "ap_outs":  [array after AP1, array after AP2, ...],
          "final":    final array,
        }
    """
    max_size = max(fil.size for fil in allpass_filters) if allpass_filters else 20000

    # -- Setup Predelay
    predelay_line = DelayLine(max_size)
    # -- Setup the 1-pole Lowpass
    lpf = OnePoleLPF(a=lpf_a)

    # We guess an output length. If you want to be absolutely sure 
    # you capture all tails, pick something like:
    #   out_len = len(x) + predelay_samples + sum_of_delays + out_extra_samples
    # We'll do a simpler approach here:
    out_len = len(x) + predelay_samples + out_extra_samples

    # Pre-allocate intermediate output arrays
    predelay_out = np.zeros(out_len, dtype=x.dtype)
    lpf_out      = np.zeros(out_len, dtype=x.dtype)
    ap_outs      = [np.zeros(out_len, dtype=x.dtype) for _ in allpass_filters]

    for n in range(out_len):
        # Input sample or 0 if beyond length
        in_sample = x[n] if n < len(x) else 0.0

        # 1) Predelay
        predelay_line.write(in_sample)
        delayed_samp = predelay_line.tap(predelay_samples)
        predelay_out[n] = delayed_samp

        # 2) LPF
        filtered_samp = lpf.process_sample(delayed_samp)
        lpf_out[n] = filtered_samp

        # 3) Series AllPass
        ap_signal = filtered_samp
        for i, apf in enumerate(allpass_filters):
            ap_signal = apf.process_sample(ap_signal)
            ap_outs[i][n] = ap_signal

    return {
        "predelay": predelay_out,
        "lpf_out": lpf_out,
        "ap_outs": ap_outs,        # a list of arrays, one per allpass
        "final": ap_outs[-1] if ap_outs else lpf_out
    }

stages = apply_early_reflections_debug(
    x,
    predelay_samples=500,
    lpf_a=0.7,
    allpass_filters=[ # pass in your AP or MAPF objects
        ModulatedAllPass(max_size=20000, g=0.7, initial_delay=1343.0),
        # more AP filters...
    ],
    out_extra_samples=3000
)

# Listen to the final output
# Audio(stages["final"], rate=Fs)

# Alternatively, listen to the modulated AP output after first AP:
# Audio(stages["ap_outs"][0], rate=Fs)

# 1) Load audio
x, Fs = sf.read("snare.wav")
if x.ndim > 1:
    x = x[:, 0]

# 2) Build your modulated AP + other APs
mapf1 = ModulatedAllPass(max_size=20000, g=0.7, initial_delay=1343.0)
apf2  = AllPassFilter(158,  0.75, 20000)
apf3  = AllPassFilter(561,  0.625, 20000)
apf4  = AllPassFilter(410,  0.625, 20000)

allpass_chain = [mapf1, apf2, apf3, apf4]

# 3) We can do a simple post-processing loop 
#    that sets mapf1 delay with a slow LFO:
lfo_rate = 0.1  # Hz
phase = 0.0
phase_inc = 2*np.pi*lfo_rate / Fs

# 4) We'll do one "manually-coded" pass for the LFO:
out_len = len(x) + 2000  # extra samples for tail
stages_out = {
    "predelay": np.zeros(out_len),
    "lpf_out":  np.zeros(out_len),
    "ap_outs":  [np.zeros(out_len) for _ in allpass_chain],
    "final":    np.zeros(out_len)
}
# Create the DelayLine, LPF as in the function, but let's replicate the logic
predelay = DelayLine(20000)
lpf = OnePoleLPF(a=0.7)
predelay_samples = 500

for n in range(out_len):
    in_samp = x[n] if n < len(x) else 0.0
    # predelay
    predelay.write(in_samp)
    pd = predelay.tap(predelay_samples)
    stages_out["predelay"][n] = pd

    # lpf
    lpf_out = lpf.process_sample(pd)
    stages_out["lpf_out"][n] = lpf_out

    # modulate MAPF1
    new_delay = 1343.0 + 12.0 * np.sin(phase)
    mapf1.set_delay(new_delay)
    phase += phase_inc

    # series allpasses
    ap_signal = lpf_out
    for i, apf_obj in enumerate(allpass_chain):
        ap_signal = apf_obj.process_sample(ap_signal)
        stages_out["ap_outs"][i][n] = ap_signal

stages_out["final"] = stages_out["ap_outs"][-1]

# 5) Listen:
Audio(stages_out["final"], rate=Fs)

In [3]:
class EarlyReflectionsChain:
    """
    Sample-by-sample chain:
      1) Predelay
      2) LPF1
      3) APF1->APF2->APF3->APF4
    """
    def __init__(self, predelay_samples, lpf_a, apf_params, max_size):
        self.predelay = DelayLine(max_size)
        self.predelay_samples = predelay_samples
        self.lpf1 = OnePoleLPF(a=lpf_a)
        self.allpasses = []
        for (D, g) in apf_params:
            self.allpasses.append(AllPassFilter(D, g, max_size))

    def process_sample(self, x: float) -> float:
        # 1) predelay
        self.predelay.write(x)
        delayed_sample = self.predelay.tap(self.predelay_samples)

        # 2) LPF1
        filtered_sample = self.lpf1.process_sample(delayed_sample)

        # 3) 4 allpasses in series
        ap_out = filtered_sample
        for apf in self.allpasses:
            ap_out = apf.process_sample(ap_out)

        return ap_out
class FeedbackDelayChain:
    """
    One "tank" or "feedback chain":
        MAPF (or APF) -> Delay1 -> LPF -> APF -> Delay2
    We'll let you pass in either a ModulatedAllPass or an AllPassFilter 
    for the first stage.
    """
    def __init__(self, 
                 mapf,  # either ModulatedAllPass or AllPassFilter
                 delay1_samples, 
                 lpf_a,
                 apf_delay, apf_g,
                 delay2_samples,
                 max_size):
        self.mapf = mapf  # e.g. MAPF with g=0.7, initial_delay=1343
        self.delay1 = DelayLine(max_size)
        self.delay1_samples = delay1_samples
        self.lpf2 = OnePoleLPF(lpf_a)
        self.apf5 = AllPassFilter(apf_delay, apf_g, max_size)
        self.delay2 = DelayLine(max_size)
        self.delay2_samples = delay2_samples

    def process_sample(self, x: float) -> float:
        # 1) MAPF
        mapf_out = self.mapf.process_sample(x)

        # 2) Delay1
        self.delay1.write(mapf_out)
        d1_out = self.delay1.tap(self.delay1_samples)

        # 3) LPF2
        lpf2_out = self.lpf2.process_sample(d1_out)

        # 4) APF5
        apf5_out = self.apf5.process_sample(lpf2_out)

        # 5) Delay2
        self.delay2.write(apf5_out)
        d2_out = self.delay2.tap(self.delay2_samples)

        return d2_out
    
def process_full_reverb(x, Fs, N_extra=40000):
    """
    Example single-loop reverb with:
      - Early reflections chain
      - Two feedback/delay chains, each mixing the other's output

    x: input signal (1D)
    Fs: sample rate
    N_extra: how many extra samples we process beyond len(x) to let the tail ring out

    Returns:
      c1_out_arr, c2_out_arr (the outputs of chain1 and chain2)
    """
    # 1) Initialize EarlyReflections
    earlyRef = EarlyReflectionsChain(
        predelay_samples=300,  # your big predelay
        lpf_a=0.9,
        apf_params=[(210, 0.75), (158, 0.75), (561, 0.625), (410, 0.625)],
        max_size=100000  # Must be large enough
    )
    
    # 2) Initialize the two feedback/delay chains
    #    We'll do chain1 with a ModulatedAllPass, chain2 with a normal AllPass
    mapf1 = ModulatedAllPass(100000, g=0.7, initial_delay=1343.0)
    chain1 = FeedbackDelayChain(
        mapf=mapf1,
        delay1_samples=6241,
        lpf_a=0.8,
        apf_delay=3931, apf_g=0.5,
        delay2_samples=4681,
        max_size=100000
    )
    
    mapf2 = ModulatedAllPass(100000, g=0.7, initial_delay=995.0)  # or AllPassFilter
    chain2 = FeedbackDelayChain(
        mapf=mapf2,
        delay1_samples=6590,
        lpf_a=0.8,
        apf_delay=2664, apf_g=0.5,
        delay2_samples=5505,
        max_size=100000
    )

    # 3) We'll define how the cross-summing works:
    #    Suppose chain1_in = earlyRefOut + (decay factor)*chain2_out_prev
    #    chain2_in = earlyRefOut + (decay factor)*chain1_out_curr
    # Adjust the logic to your diagram. 
    # If your diagram calls for a 1-sample delay in cross-feed, 
    # you can store chain2_out_prev in a variable from iteration to iteration, etc.

    decay = 0.8  # for example
    length = len(x) + N_extra
    c1_out_arr = np.zeros(length)
    c2_out_arr = np.zeros(length)

    chain2_out_prev = 0.0  # store previous sample from chain2

    # optional LFO for mapf1, mapf2
    import math
    lfo_rate = 0.1
    phase1 = 0.0
    phase2 = math.pi  # offset if you want
    phase_inc = 2 * math.pi * (lfo_rate / Fs)

    for n in range(length):
        in_sample = x[n] if n < len(x) else 0.0

        # (A) Early reflections
        early_out = earlyRef.process_sample(in_sample)

        # (B) Modulate chain1's MAPF if desired
        mapf1.set_delay(1343.0 + 12.0 * math.sin(phase1))
        phase1 += phase_inc

        # Summation node for chain1
        # We'll use chain2_out_prev to feed back crosswise
        c1_in = early_out + decay * chain2_out_prev

        # Process chain1
        c1_out = chain1.process_sample(c1_in)

        # (C) Modulate chain2's MAPF if desired
        mapf2.set_delay(995.0 + 12.0 * math.sin(phase2))
        phase2 += phase_inc

        # Summation node for chain2
        # We'll feed in early_out plus c1_out
        c2_in = early_out + decay * c1_out

        # Process chain2
        c2_out = chain2.process_sample(c2_in)

        # Save
        c1_out_arr[n] = c1_out
        c2_out_arr[n] = c2_out

        # For next iteration, remember c2_out
        chain2_out_prev = c2_out

    return c1_out_arr, c2_out_arr

x, Fs = sf.read("snare.wav")
if x.ndim > 1:
    x = x[:,0]

c1, c2 = process_full_reverb(x, Fs, N_extra=4000)

# Possibly treat chain1 as "Left" and chain2 as "Right" for a stereo output:
stereo_out = np.stack([c1, c2], axis=1)
stereo_out = stereo_out.astype(np.float32)  # convert to float32 to avoid format issues


from IPython.display import Audio
Audio(stereo_out, rate=Fs)