In [None]:
import numpy as np
from IPython.display import Audio, display
from scipy.io import wavfile
import matplotlib.pyplot as plt
import numpy as np

In [None]:
audio_path = "../../resources/audio1.wav"

# wavfile.read returns the file's sample rate and an array shaped (samples,)
# for mono or (samples, channels) for multi-channel audio.
sr_loaded, y = wavfile.read(audio_path)
sr = sr_loaded  # sample rate: taken from the file instead of assuming 44100

# Convert to float32 *before* measuring the peak (np.abs wraps around on the
# most negative integer sample) and shape to (channels, samples).
# np.atleast_2d turns a mono (samples,) array into (1, samples).
y = np.atleast_2d(y.T).astype(np.float32)
peak = np.max(np.abs(y)) if y.size else 0.0
if peak > 0:
    y = y / peak  # normalize to [-1, 1]; an all-zero file is left untouched
waveform = y

# Channel count and length come from the data, not from hard-coded guesses.
channels, og_len = waveform.shape

# Convert stereo → mono for SOLA (mean of channels)
audio_signal = np.mean(waveform, axis=0)

display(Audio(waveform, rate=sr))

# OLA

Overlap and Add
Direct pitch shift up 5 semitones

In [None]:
win_len = 5000  # samples; about 113 ms at 44.1 kHz
win_f = np.hanning(win_len)

anls_hop_len = 2000  # samples; about 45 ms at 44.1 kHz
ola_semitones = 5  # matches the section text (the exponent used to be 7 / 12)
scaling = 2 ** (ola_semitones / 12)
synth_hop_len = int(anls_hop_len * scaling)

# One output row per input channel instead of a hard-coded channel count.
n_channels = waveform.shape[0]

# Frames are written at multiples of synth_hop_len, so this length always
# holds the last frame: (n_frames - 1) * synth_hop_len + win_len fits.
new_waveform_len = int(np.ceil(og_len / anls_hop_len) * synth_hop_len) + win_len
new_waveform = np.zeros((n_channels, new_waveform_len))
new_scales = np.zeros(new_waveform_len)

# Loop through the analysis windows
for frame_idx, i in enumerate(range(0, og_len, anls_hop_len)):
    # the final windows are cut short at the end of the input
    clipped_len = min(win_len, og_len - i)

    # synthesis position: same frame index, stretched hop
    idx = frame_idx * synth_hop_len

    # add window to new waveform
    new_waveform[:, idx:idx + clipped_len] += waveform[:, i:i + clipped_len] * win_f[None, :clipped_len]

    # add up windowing weights for normalization
    new_scales[idx:idx + clipped_len] += win_f[:clipped_len]

# Divide by the summed window weights; samples no window touched stay zero.
new_waveform = new_waveform / np.where(new_scales == 0, 1, new_scales)

# Playing the stretched signal at a proportionally higher rate restores the
# original duration and raises the pitch by `scaling`.
display(Audio(new_waveform, rate=int(round(sr * scaling))))

# SOLA

Synchronous Overlap and Add

In [None]:
def compute_periods_per_sequence(signal, seq_len, min_period, max_period):
    """Estimate one period (in samples) per analysis window via FFT autocorrelation.

    The signal is cut into consecutive, non-overlapping windows of ``seq_len``
    samples (the last one is zero-padded). For each window the circular
    autocorrelation is computed from the power spectrum, and the lag with the
    largest value inside ``[min_period, max_period)`` is taken as the period.

    Args:
        signal: 1-D array of samples.
        seq_len: Window length in samples.
        min_period: Smallest lag considered (inclusive).
        max_period: Largest lag considered (exclusive); lags beyond
            ``seq_len`` do not exist and are ignored.

    Returns:
        Integer array with one estimated period per window (empty for an
        empty signal).

    Raises:
        ValueError: If the lag search range is empty.
    """
    # The autocorrelation only has seq_len lags, so cap the search there.
    search_stop = min(max_period, seq_len)
    if not 0 <= min_period < search_stop:
        raise ValueError(
            "empty period search range: need 0 <= min_period < min(max_period, seq_len)"
        )

    periods = []
    for offset in range(0, len(signal), seq_len):
        segment = signal[offset:offset + seq_len]
        if len(segment) < seq_len:
            segment = np.pad(segment, (0, seq_len - len(segment)))
        # np.fft is used explicitly: the notebook never imports bare fft/ifft.
        spectrum = np.fft.fft(segment)
        spectrum[0] = 0  # remove DC
        # Inverse transform of the power spectrum = circular autocorrelation.
        autoc = np.fft.ifft(spectrum * np.conj(spectrum)).real
        # constrain search region
        periods.append(min_period + int(np.argmax(autoc[min_period:search_stop])))
    return np.array(periods, dtype=int)

def find_peaks(signal, fs, max_hz=1000, min_hz=40, analysis_win_ms=40):
    """Locate pitch marks (one sample index per period) in a mono signal.

    A first pass estimates a period per analysis window over the whole
    ``[fs / max_hz, fs / min_hz]`` lag range; a second pass repeats the
    estimate within +/-10 % of the mean period. Starting from the largest
    sample near the beginning, each following mark is the largest sample in a
    window centred one local period after the previous mark.

    Args:
        signal: 1-D array of samples.
        fs: Sample rate in Hz.
        max_hz: Highest fundamental frequency considered.
        min_hz: Lowest fundamental frequency considered.
        analysis_win_ms: Length of the period-analysis window in milliseconds.

    Returns:
        Integer array of increasing sample indices (empty for an empty signal).

    Raises:
        ValueError: If the frequency limits give a period below one sample.
    """
    N = len(signal)
    if N == 0:
        # Nothing to analyse; callers treat fewer than two marks as "no pitch".
        return np.array([], dtype=int)

    min_period = int(fs / max_hz)
    max_period = int(fs / min_hz)
    seq_len = int(analysis_win_ms / 1000 * fs)

    # Pass 1: coarse estimate over the full lag range.
    periods = compute_periods_per_sequence(signal, seq_len, min_period, max_period)
    mean_period = int(np.clip(int(np.mean(periods)), min_period, max_period))

    # Pass 2: re-estimate within +/-10 % of the mean period.
    min_period = int(mean_period * 0.9)
    max_period = int(mean_period * 1.1)
    if min_period < 1:
        # Would otherwise fail below with a ZeroDivisionError.
        raise ValueError("estimated period is below one sample; lower max_hz or raise fs")
    periods = compute_periods_per_sequence(signal, seq_len, min_period, max_period)

    # First mark: largest sample within slightly more than one period.
    peaks = [int(np.argmax(signal[: int(periods[0] * 1.1)]))]
    max_iters = N // min_period  # there cannot be more marks than this

    for _ in range(max_iters):
        prev = peaks[-1]
        # Use the period of the analysis window that contains the previous mark.
        idx = min(prev // seq_len, len(periods) - 1)
        step = int(np.clip(periods[idx], min_period, max_period))
        next_est = prev + step
        if next_est + min_period >= N:
            break
        # step >= min_period, so the search window never starts before `prev`.
        window = signal[next_est - min_period: next_est + min_period]
        if window.size == 0:
            break
        local_max = int(np.argmax(window)) - min_period
        next_peak = next_est + local_max
        if next_peak <= prev:
            # No forward progress; stop rather than emit a duplicate mark.
            break
        peaks.append(next_peak)

    return np.array(peaks, dtype=int)

def shift_pitch(signal, sr, semitones=4, hop_ms=20, win_ms=80):
    """
    SOLA (Synchronised Overlap-Add) pitch shifter.

    The signal is first time-stretched by ``2 ** (semitones / 12)`` using
    overlap-add, then resampled back to its original length, which scales
    every frequency by the same factor.

    For each frame, the read position is moved by up to half a mean pitch
    period so that the new frame lines up with the audio already written to
    the output where the two overlap. (Earlier versions compared the input
    frame with shifted copies of itself, which never looked at the output.)

    Args:
        signal: Input audio signal (mono, 1-D).
        sr: Sample rate in Hz.
        semitones: Shift in semitones (positive = higher, negative = lower).
        hop_ms: Analysis hop in milliseconds.
        win_ms: Window length in milliseconds.

    Returns:
        Array with the same number of samples as ``signal``. The input is
        returned unchanged when it is shorter than one window or when fewer
        than two pitch marks are found.

    Raises:
        ValueError: If the parameters give a hop below one sample or a window
            below two samples.
    """
    from scipy.signal import resample

    # Time-stretch factor
    stretch_factor = 2 ** (semitones / 12)

    # Parameters
    hop_len = int(sr * hop_ms / 1000)
    win_len = int(sr * win_ms / 1000)
    synth_hop = int(hop_len * stretch_factor)  # synthesis hop (stretched)
    if hop_len < 1 or synth_hop < 1 or win_len < 2:
        # A zero hop would loop forever or leave nothing to resample.
        raise ValueError("hop_ms / win_ms / semitones give a hop < 1 sample or a window < 2 samples")

    n_samples = len(signal)
    if n_samples < win_len:
        # Not even one frame fits; there is nothing to overlap-add.
        return signal

    # Find pitch periods; they only set the width of the alignment search.
    peaks = find_peaks(signal, sr, max_hz=400, min_hz=60, analysis_win_ms=int(win_ms))
    if len(peaks) < 2:
        return signal
    mean_period = int(np.mean(np.diff(peaks)))
    search_range = int(mean_period * 0.5)

    samples = np.asarray(signal, dtype=float)
    eps = 1e-8

    # Output buffer and summed window weights for normalization
    max_len = int(n_samples * stretch_factor) + win_len * 2
    output = np.zeros(max_len)
    output_scale = np.zeros(max_len)
    window = np.hanning(win_len)

    # Part of a new frame that lands on audio written by earlier frames.
    overlap_len = max(win_len - synth_hop, 0)
    last_start = n_samples - win_len

    # SOLA main loop
    analysis_pos = 0
    synth_pos = 0
    while analysis_pos <= last_start:
        frame_start = analysis_pos

        if synth_pos > 0 and overlap_len > 0 and synth_pos + overlap_len <= max_len:
            # Waveform already synthesised where the new frame will start.
            weights = output_scale[synth_pos:synth_pos + overlap_len]
            template = np.divide(
                output[synth_pos:synth_pos + overlap_len],
                weights,
                out=np.zeros(overlap_len),
                where=weights > eps,
            )

            # Candidate read positions around the nominal one.
            lo = max(analysis_pos - search_range, 0)
            hi = min(analysis_pos + search_range, last_start)
            region = samples[lo:hi + overlap_len]

            # One cross-correlation value per candidate start in [lo, hi].
            raw = np.correlate(region, template, mode="valid")
            # Energy of each candidate segment via a running sum, so loud
            # candidates are not preferred just for being loud.
            running = np.concatenate(([0.0], np.cumsum(region * region)))
            energy = running[overlap_len:] - running[:-overlap_len]
            scores = raw / np.sqrt(np.maximum(energy, eps))

            best = int(np.argmax(scores))
            if scores[best] > 0:
                frame_start = lo + best
            # otherwise (silence / no positive match) keep the nominal position

        if synth_pos + win_len <= max_len:
            frame = samples[frame_start:frame_start + win_len]
            output[synth_pos:synth_pos + win_len] += frame * window
            output_scale[synth_pos:synth_pos + win_len] += window

        analysis_pos += hop_len
        synth_pos += synth_hop

    # Normalize by the summed window weights
    output_scale[output_scale == 0] = 1
    output = output / output_scale

    # Trim to actual stretched length
    output = output[:synth_pos]

    # Resample the stretched audio back to the original number of samples:
    # duration is restored and the pitch moves by the stretch factor.
    return resample(output, n_samples)

# Run the SOLA shifter on the mono mix with semitones=4 and play the result
# at the same sample rate as the input.
shifted_audio = shift_pitch(audio_signal, sr, semitones=4)
display(Audio(shifted_audio, rate=sr))

# WSOLA

Waveform Similarity Overlap and Add

In [None]:
def shift_pitch_wsola(signal, sr, semitones=4, hop_ms=20, win_ms=40):
    """
    WSOLA (Waveform Similarity Overlap-Add) pitch shifter.

    The signal is time-stretched by ``2 ** (semitones / 12)`` and then
    resampled back to its original length. For every frame the read position
    may move by up to half a window so that the audio just before it in the
    input resembles the audio just before the write position in the output.

    The output is divided by the summed window weights, because Hann windows
    spaced at the stretched hop do not add up to a constant.

    Args:
        signal: Input audio signal (mono, 1-D).
        sr: Sample rate in Hz.
        semitones: Shift in semitones (positive = higher, negative = lower).
        hop_ms: Analysis hop in milliseconds.
        win_ms: Window length in milliseconds.

    Returns:
        Array with the same number of samples as ``signal``. A signal shorter
        than one window is returned as an unmodified float copy.

    Raises:
        ValueError: If the parameters give a hop below one sample or a window
            below two samples.
    """
    from scipy.signal import resample

    # Time-stretch factor
    stretch_factor = 2 ** (semitones / 12)

    # Parameters
    hop_analysis = int(sr * hop_ms / 1000)
    hop_synthesis = int(hop_analysis * stretch_factor)
    win_len = int(sr * win_ms / 1000)
    if hop_analysis < 1 or hop_synthesis < 1 or win_len < 2:
        # A zero hop would loop forever or leave nothing to resample.
        raise ValueError("hop_ms / win_ms / semitones give a hop < 1 sample or a window < 2 samples")

    samples = np.asarray(signal, dtype=float)
    n_samples = len(samples)
    if n_samples < win_len:
        # Not even one frame fits; hand back the input untouched.
        return samples.copy()

    tolerance = win_len // 2    # search reach on either side of the target
    overlap_len = win_len // 2  # length of the segment used for matching
    eps = 1e-8

    # Output buffer and summed window weights for normalization
    max_len = int(n_samples * stretch_factor) + win_len * 2
    output = np.zeros(max_len)
    output_scale = np.zeros(max_len)
    window = np.hanning(win_len)

    # The first frame is written at the origin without any search.
    output[:win_len] += samples[:win_len] * window
    output_scale[:win_len] += window
    analysis_pos = hop_analysis
    synth_pos = hop_synthesis
    last_start = n_samples - win_len

    # WSOLA main loop
    while analysis_pos <= last_start:
        best_pos = analysis_pos

        # A candidate needs overlap_len samples in front of it.
        lo = max(analysis_pos - tolerance, overlap_len)
        hi = min(analysis_pos + tolerance, last_start)

        if synth_pos >= overlap_len and lo <= hi:
            # Waveform already synthesised just before the write position.
            weights = output_scale[synth_pos - overlap_len:synth_pos]
            template = np.divide(
                output[synth_pos - overlap_len:synth_pos],
                weights,
                out=np.zeros(overlap_len),
                where=weights > eps,
            )

            # Input samples in front of every candidate position in [lo, hi].
            region = samples[lo - overlap_len:hi]

            # One cross-correlation value per candidate, computed in one call
            # instead of a Python loop over positions.
            raw = np.correlate(region, template, mode="valid")
            # Divide by each candidate's energy so the match is normalized.
            running = np.concatenate(([0.0], np.cumsum(region * region)))
            energy = running[overlap_len:] - running[:-overlap_len]
            scores = raw / np.sqrt(np.maximum(energy, eps))

            best = int(np.argmax(scores))
            if scores[best] > 0:
                best_pos = lo + best
            # otherwise (silence / no positive match) keep the target position

        # Overlap-add the chosen frame and record its window weights.
        if synth_pos + win_len <= max_len:
            output[synth_pos:synth_pos + win_len] += samples[best_pos:best_pos + win_len] * window
            output_scale[synth_pos:synth_pos + win_len] += window

        # Advance positions
        analysis_pos += hop_analysis
        synth_pos += hop_synthesis

    # Trim to the stretched length and undo the window weighting.
    output = output[:synth_pos]
    scale = output_scale[:synth_pos]
    output = output / np.where(scale > eps, scale, 1.0)

    # Resample to compress time back to original duration
    return resample(output, n_samples)

# Usage: run the WSOLA shifter on the mono mix with semitones=4 and play the
# result at the same sample rate as the input.
shifted_audio = shift_pitch_wsola(audio_signal, sr, semitones=4)
display(Audio(shifted_audio, rate=sr))

# TD-PSOLA

Time-Domain Pitch Synchronous Overlap and Add

In [None]:
def shift_pitch_psola(signal, sr, semitones=4):
    """
    TD-PSOLA (Time-Domain Pitch Synchronous Overlap-Add) pitch shifter.

    Two-period Hann-windowed grains are cut around each analysis pitch mark.
    New synthesis marks are laid out over the same time span, spaced by the
    local period divided by the pitch factor, and each one receives the grain
    of the nearest analysis mark. Grains are therefore repeated (shift up) or
    skipped (shift down), so the duration is kept without resampling.

    Earlier versions moved every grain once to a compressed position and then
    resampled the shortened result back to the original length, which
    cancelled the change in grain spacing.

    Args:
        signal: Input audio signal (mono)
        sr: Sample rate
        semitones: Number of semitones to shift (positive = higher, negative = lower)

    Returns:
        Array with the same number of samples as ``signal``. The input is
        returned unchanged when it is empty or fewer than two pitch marks are
        found. Samples that no grain reaches (before the first and after the
        last pitch mark) are zero.
    """
    # Pitch shift factor
    pitch_factor = 2 ** (semitones / 12)

    if len(signal) == 0:
        return signal

    # Step 1: Find pitch marks (peaks)
    peaks = find_peaks(signal, sr, max_hz=400, min_hz=60, analysis_win_ms=40)
    if len(peaks) < 2:
        return signal

    samples = np.asarray(signal, dtype=float)
    n_samples = len(samples)
    peaks = np.asarray(peaks, dtype=int)
    # Local period at each mark; the last mark reuses the previous interval.
    periods = np.maximum(np.diff(peaks), 1)

    output = np.zeros(n_samples)
    output_scale = np.zeros(n_samples)  # Track window sum for normalization
    windows = {}  # Hann windows cached by period

    # Step 2: Walk the synthesis marks across the span of the analysis marks.
    mark = float(peaks[0])
    nearest = 0
    last_mark = peaks[-1]
    while mark <= last_mark:
        # Marks only move forward, so the nearest analysis mark does too.
        while (nearest + 1 < len(peaks)
               and abs(peaks[nearest + 1] - mark) <= abs(peaks[nearest] - mark)):
            nearest += 1

        period = int(periods[min(nearest, len(periods) - 1)])
        if period not in windows:
            windows[period] = np.hanning(2 * period)
        window = windows[period]

        # Step 3a: Cut the grain centred on the analysis mark, zero-padded
        # where it would reach outside the signal.
        src_start = int(peaks[nearest]) - period
        src_lo = max(src_start, 0)
        src_hi = min(src_start + 2 * period, n_samples)
        grain = np.zeros(2 * period)
        grain[src_lo - src_start:src_hi - src_start] = samples[src_lo:src_hi]
        grain *= window

        # Step 3b: Overlap-add it centred on the synthesis mark, clipped to
        # the output while keeping grain and window aligned.
        dst_start = int(round(mark)) - period
        dst_lo = max(dst_start, 0)
        dst_hi = min(dst_start + 2 * period, n_samples)
        if dst_hi > dst_lo:
            g_lo = dst_lo - dst_start
            g_hi = dst_hi - dst_start
            output[dst_lo:dst_hi] += grain[g_lo:g_hi]
            output_scale[dst_lo:dst_hi] += window[g_lo:g_hi]

        # Next synthesis mark: local period scaled by the pitch factor.
        mark += period / pitch_factor

    # Normalize by window sum to maintain amplitude
    output_scale[output_scale < 0.1] = 1  # Avoid division by zero
    output = output / output_scale

    return output

# Usage: run the TD-PSOLA shifter on the mono mix with semitones=4 and play
# the result at the same sample rate as the input.
shifted_audio = shift_pitch_psola(audio_signal, sr, semitones=4)
display(Audio(shifted_audio, rate=sr))