In [3]:
import numpy as np
import sounddevice as sd
from scipy.io import wavfile

In [4]:
def ms2smp(ms, fs):
    """
    Convert a duration in milliseconds into a whole number of samples.

    Parameters
    ----------
    ms: float
        Time in milliseconds
    fs: float
        Sampling rate in Hz.

    Returns
    -------
    int
        fs * ms / 1000, with the fractional part dropped by int().
    """
    # (ms / 1000) seconds at fs samples per second
    sample_count = fs * ms / 1000.
    return int(sample_count)

In [5]:
def win_taper(N, a, data_type=np.int16):
    """
    Build a trapezoidal tapering window scaled to an integer data type.

    Parameters
    ----------
    N: the length of the grain (in samples)
    a: a double between 0 and 1 representing the fraction of the N samples that will be attenuated
        (a/2 of the samples are attenuated on each side)
    data_type: the integer data type of the output (it is passed to np.iinfo)

    Returns
    -------
    An array of N values of type "data_type": a linear ramp up from 0, a flat
    section at the maximum value of "data_type", and a mirrored ramp back down
    to 0. It is meant to attenuate the beginning and the end of each grain so
    that grains can overlap without problem.
    """
    # Length of the attenuated section on each side
    taper_len = int(N * a / 2)

    # Rising edge: 0, 1/taper_len, ..., (taper_len-1)/taper_len
    rising = np.arange(0, taper_len) / float(taper_len)
    # Untouched middle section
    plateau = np.ones(N - 2 * taper_len)
    # Falling edge is the rising edge mirrored
    falling = rising[::-1]

    profile = np.hstack((rising, plateau, falling))

    # Stretch the [0, 1] profile over the positive range of the target type
    full_scale = np.iinfo(data_type).max
    scaled = profile * full_scale
    return scaled.astype(data_type)

In [6]:
def compute_stride(N, a):
    """
    Return the stride computed from a grain length and an overlap fraction.

    Parameters
    ----------
    N: the length of the grain (in samples)
    a: the fraction of the grain that is attenuated (split over both sides)

    Returns
    -------
    int
        N minus the per-side taper length int(N * a / 2), minus one.
    """
    taper_len = int(N * a / 2)
    stride = N - taper_len - 1
    return stride

In [7]:
def build_linear_interp_table(n_samples, down_fact, data_type=np.int16):
    """
    Build lookup tables for linear-interpolation resampling.

    For every output index n the interpolation time is t = n * down_fact.
    The table stores the index of the input sample at or just before t and
    the weight that sample should receive.

    Parameters
    ----------
    n_samples: int
        Number of table entries to generate.
    down_fact: float
        Step, in input samples, between two consecutive interpolation times.
    data_type: integer data type
        Type of the returned amplitudes; weights in [0, 1] are scaled to
        [0, np.iinfo(data_type).max].

    Returns
    -------
    which_samples: list of int
        floor(t) for each entry, as plain Python ints so that they can be
        used directly as array indices.
    amplitudes: numpy array of "data_type"
        Weight 1 - (t - floor(t)) of the sample at floor(t), scaled to the
        positive range of "data_type".
    """
    which_samples = []
    amplitudes = []
    for n in range(n_samples):

        # The interpolation time
        t = n*down_fact

        # The largest integer not greater than t. np.floor returns a float,
        # which cannot be used as an index, so convert it to int.
        N = int(np.floor(t))

        # The amplitude that this sample should have
        # (if t = 1.01 then the amplitude of the sample at time N=1 should be 0.99)
        a = 1-(t-N)

        which_samples.append(N)
        amplitudes.append(a)

    MAX_VAL = np.iinfo(data_type).max

    # Set the amplitudes in the range defined by the data_type
    amplitudes = (np.array(amplitudes)*MAX_VAL).astype(data_type)

    return which_samples, amplitudes

In [13]:
# state variables and constants
def init():
    """
    Create the module-level lookup tables and state arrays used by process().

    Reads the globals N, a, shift_factor and data_type, which must be
    defined before this function is called, and (re)binds:

    WIN                        tapering window returned by win_taper
    SAMP_VALS, AMP_VALS        tables returned by build_linear_interp_table
    LAST_VALUES                length-N zero array carried from one buffer to the next
    ARRAY_BUFF_1, ARRAY_BUFF_2 length-N zero arrays for intermediate values
    """
    global WIN, SAMP_VALS, AMP_VALS
    global LAST_VALUES, ARRAY_BUFF_1, ARRAY_BUFF_2

    # Lookup tables: tapering window and linear interpolation
    WIN = win_taper(N, a, data_type)
    SAMP_VALS, AMP_VALS = build_linear_interp_table(N, shift_factor, data_type)

    # State passed between consecutive buffers
    LAST_VALUES = np.zeros(N)

    # Scratch space for intermediate values
    ARRAY_BUFF_1 = np.zeros(N)
    ARRAY_BUFF_2 = np.zeros(N)



In [30]:
# the process function!
def process(input_buffer, output_buffer, buffer_len):
    """
    Process one block of input samples and write the result to output_buffer.

    Parameters
    ----------
    input_buffer: indexable sequence of samples for the current block (read only)
    output_buffer: array written in place at indices 0 .. STRIDE-1
    buffer_len: not used by this function

    Relies on the globals N, STRIDE, OVERLAP_LEN and WIN, and on the state
    arrays LAST_VALUES, ARRAY_BUFF_1 and ARRAY_BUFF_2 created by init().
    Returns nothing.
    """

    # need to specify those global variables changing in this function (state variables and intermediate values)
    global ARRAY_BUFF_1
    global ARRAY_BUFF_2
    global LAST_VALUES


    # append samples from previous buffer
    # (only entries with n > N - OVERLAP_LEN are refreshed)
    # NOTE(review): LAST_VALUES is written below from n = STRIDE onwards; if
    # STRIDE == N - OVERLAP_LEN the entry at n == STRIDE is never read back
    # here - possible off-by-one, confirm the intended boundary.
    for n in range(N):
        if n > N - OVERLAP_LEN:
            ARRAY_BUFF_1[n] = LAST_VALUES[n]

    # resample
    # NOTE(review): this step currently copies the first OVERLAP_LEN input
    # samples unchanged; the interpolation tables SAMP_VALS / AMP_VALS are
    # not consulted - confirm whether the resampling is still to be written.
    for n in range(N):
        if n < OVERLAP_LEN:
            ARRAY_BUFF_2[n] = input_buffer[n]

    # apply window
    # This is a whole-array operation, so it is evaluated once instead of
    # once per sample inside a loop (the result is the same).
    # NOTE(review): if WIN holds values scaled to the integer range of the
    # sample type, this product is not scaled back before it is written to
    # output_buffer - confirm that no normalisation is missing.
    windowed_signal = WIN * (ARRAY_BUFF_1 + ARRAY_BUFF_2)

    # write to output
    for n in range(N):
        # overlapping part: sum of the windowed value and its mirror image
        if n < OVERLAP_LEN:
            output_buffer[n] = windowed_signal[n] + windowed_signal[N-1-n]
        # non-overlapping part: input is passed through unchanged
        elif n < STRIDE:
            output_buffer[n] = input_buffer[n]
        # update state variables: keep input samples for the next call
        else:
            LAST_VALUES[n] = input_buffer[n - OVERLAP_LEN]

### Main cell for file processing

In [31]:

"""
Pitch shifting with granular synthesis for shift factors <=1.0
"""

""" User selected parameters """


input_wav = "speech.wav"
grain_len = 20      # in milliseconds
a = 0.3    # grain overlap (0,1)
shift_factor = 0.7  # <= 1.0

# open WAV file
samp_freq, signal = wavfile.read(input_wav)
# wavfile.read returns a 1-D array for mono files and an array of shape
# (n_samples, n_channels) otherwise, so only index the channel axis if it exists
if signal.ndim > 1:
    signal = signal[:, 0]  # get first channel
data_type = signal.dtype
MAX_VAL = np.iinfo(data_type).max

# derived parameters
N = ms2smp(grain_len, samp_freq)
STRIDE = compute_stride(N, a)
OVERLAP_LEN = N-STRIDE

# allocate input and output buffers
input_buffer = np.zeros(STRIDE, dtype=data_type)
output_buffer = np.zeros(STRIDE, dtype=data_type)

# Let's say that N = 100 and STRIDE = 80
# We want to output the first 80 samples
# input buffer contains the first 80 RAW samples

# For each input buffer, we need to remember the last OVERLAP_LEN input
# samples (process() stores them in LAST_VALUES for the next call)

"""
Nothing to touch after this!
"""

init()

# Init simulation of block based processing
# (samples left over after the last full block of STRIDE are dropped)
n_buffers = len(signal)//STRIDE
result = np.zeros(n_buffers*STRIDE, dtype=data_type)


for k in range(n_buffers):

    # sample indices
    start_idx = k*STRIDE
    end_idx = (k+1)*STRIDE


    # input_buffer is a buffer of length stride
    input_buffer = signal[start_idx:end_idx]

    # Process
    process(input_buffer, output_buffer, STRIDE)
    result[start_idx:end_idx] = output_buffer

# write to WAV, and only report success once the file has been written
file_name = "output_gran_synth.wav"
wavfile.write(file_name, samp_freq, result)
print("Result written to: %s" % file_name)



Result written to: output_gran_synth.wav
