In [1]:
def find_baseline(data, sampling_rate, PRODUCTS):

    '''
    
    To tell the difference of the non-voice and voice part of the signal
    we have to form a baseline based on the power spectrum.
    
    This function assume the first 2000 samples is in the non-voice part of the signal.
    It'll use the Harmonic Product Spectrum function to generate the corresponding baseline.
    
    Input: 
        data : array-like            
                        Full-length signal.
        sampling_rate : int  
                        Sampling rate of the signal.
        PRODUCTS : int 
                        Number of products to evaluate the harmonic product spectrum over.
    
    Output:
        maxv : int 
                        Base energy level that the algorithm based on to filter out the non-voice part's f0.
    
    '''
    

    window = np.hanning(WINDOW_SIZE)
    i=1
    get_data_slice = data[int((0.5*i-0.5)*WINDOW_SIZE) : int((0.5*i+0.5)*WINDOW_SIZE)]

    fourier_transform = np.fft.rfft(get_data_slice*window, 2**13)

    abs_fourier_transform = np.abs(fourier_transform)
    power_spectrum = np.square(abs_fourier_transform)

    down = []
    for i in range(1, PRODUCTS+1):
        down.append(power_spectrum[::i])
    #     down1 = power_spectrum[::1]

    res = []
    for i in range(len(down[-1])):
        mul = 1
        for j in range(0, PRODUCTS):
            mul *= down[j][i]
        res.append(mul)
    
    maxv = max(res)
    
    return maxv

In [2]:
import numpy as np
import matplotlib.pyplot as plt 
from scipy.io import wavfile
from scipy.signal import resample

%matplotlib tk

'''
    Harmonic product spectrum.

    This algorithm is used for fundamental frequency detection. It evaluates
    the magnitude of the FFT (fast Fourier transform) of the input signal, keeping
    only the positive frequencies. It then element-wise-multiplies this spectrum
    by the same spectrum downsampled by 2, then 3, ..., finally ending after
    numProd downsample-multiply steps.

    Parameters
    ----------
    FILE_PATH : String
      Specify where to locate original signal
      
    PRODUCTS : int
      Number of products to evaluate the harmonic product spectrum.
      
    FFT_POINT : int
      The length of the FFT. FFT lengths with low prime factors (i.e., products of 2,
      3, 5, 7) are usually (much) faster than those with high prime factors.
      
    LOW_PASS : int
      Lowest frequency that will be in the output
      
    HIGH_PASS : int
      Highest frequency that will be in the output
      
    WINDOW_SIZE : int
      The size of window of signal
      
    Return
    ----------
    Result : Array-like
      List of fundamental frequency corresponding with each window
'''

def find_f0_using_HPS(WINDOW_SIZE, LOW_PASS, HIGH_PASS, PRODUCTS, FILE_PATH, FFT_POINT):

    # Read data from original file
    sampling_rate, data = wavfile.read(FILE_PATH)
    # Innitiallize hanning window with the same size ofthe window
    window = np.hanning(WINDOW_SIZE)
    # Calculation of basic baseline to filter noise later
    base = find_baseline(data, sampling_rate, PRODUCTS)
    # Result list variable
    result = []

    # Slide through the signal with windows that overlaps 50% of each other
    for i in range(1, int(len(data)/(WINDOW_SIZE))*2-1):

        # EXTRACT DATA FROM SIGNAL AND COMPUTE FFT
        # Slice data with the windowsize that overlaps 50%
        get_data_slice = data[int((0.5*i-0.5)*WINDOW_SIZE) : int((0.5*i+0.5)*WINDOW_SIZE)]
    #     get_data_slice = data[16000 : 16320]
        # Generate the fourier power and its corresponding frequency
        fourier_transform = np.fft.rfft(get_data_slice*window, FFT_POINT)
#         frequency = np.linspace(0, sampling_rate/2, num=len(fourier_transform))
        frequency = (np.fft.rfftfreq(len(fourier_transform)*2-1,1/sampling_rate))


        # BANDPASS FILTERING
        # Create filter the value of too high or too low frequencies in the fourier result
        pas = []
        for i in range(len(frequency)):
            # Cut off the too low and too high frequencies
            if frequency[i]<LOW_PASS or frequency[i]>HIGH_PASS*5:
                pas.append(0)
            else:
                pas.append(1)
        # Apply the filter to the result of the FFT
        for i in range(len(frequency)):
            fourier_transform[i] *= pas[i]
        # Compute the Power Spectrum
        abs_fourier_transform = np.abs(fourier_transform)
        power_spectrum = np.square(abs_fourier_transform)

        # DOWNSAMPLING
        # Downsampling by the numnber of products
        down = [] # This is for saving the downsampled sequence
        for i in range(1, PRODUCTS+1):
            # Here, "downsampling a vector by N" means keeping only every N samples: downsample(v, N) = v[::N].
            down.append(power_spectrum[::i])

        # MULTIPLICATION
        # Element-wise-multiplies by the downsampled spectrum versions 
        res = [] # this is for saving the result after the multiplication
        for i in range(len(down[-1])):
            mul = 1
            for j in range(0, PRODUCTS):
                mul *= down[j][i]
            res.append(mul)

        # FIND THE PEAK AND EXTRACT F0
        # Find the index of the max peak of the multiplied result of the downsample versions
        maxv = max(res)
        indv = res.index(maxv)
        # Compare with the base TO FILTER OUT non_sound parts
        if maxv < base:
            indv = 0
        # Final compare with the condition then extract f0 for this frame    
        if frequency[indv] < LOW_PASS or frequency[indv] > HIGH_PASS:
            result.append(0)
            continue
        else:
            result.append(frequency[indv])
        
    return result


In [3]:
import numpy as np
from scipy.io.wavfile import read
import matplotlib.pyplot as plt

# find min of array x 
# input: x
# output: value min 'm'
def minx(x):
    m = x[0]
    for n in x:
        if(n < m): m = n
    return m


# find max of array x 
# input: x
# output: value max 'm'
def maxx(x):
    m = x[0]
    for n in x:
        if(n > m): m = n
    return m


# find sum of array x 
# input: x
# output: value sum 's'
def sumx(x):
    s = 0
    for n in x:
        s += n
    return s


# find mean of array x 
# input: x
# output: value mean of 's' with s is sum of array x
def meanx(x):
    s = 0
    if(len(x)==0): return 0
    for n in x:
        s +=n
    return s/len(x)


# Find the peak of a 'frame' of signal base on 'baseline'
# input: frame of signal
# output: a array contain low peaks of a 'frame' in signal called  'peakIndices'
def multi_peak_finding(frame, baseline):
    peakIndices = []
    peakIndex = None
    peakValue = None
    
    for index, value in enumerate(frame):
        if value < baseline:
            if peakValue == None or value < peakValue:
                peakIndex = index
                peakValue = value
        elif value > baseline and peakIndex != None:
            peakIndices.append(peakIndex)
            peakIndex = None
            peakValue = None
            
    if peakIndex != None:
        peakIndices.append(peakIndex)
        
    return peakIndices


# find the mean of signal to create an 'baseline' for 'multi_peak_finding' function
# input: signal x
# output: the mean of signal x
def baseline(x):
    return meanx(x)


# find the mean amplitude deviation of each frame in signal
# input: a frame of signal
# output: an array of mean amplitude deviation of each frame in signal - afAmdf - after using ADMF 
def computeAmdf(frame):
    
    k = len(frame)
    
    if k <= 0:
        return 0
    
    # create a matrix with one row and k colum
    afAmdf = np.ones(k)
    
    # find mean amplitude deviation in frames and add that value into array afAmdf
    for i in range(0,k):
        afAmdf[i] = meanx(np.abs(frame[np.arange(0, k - 1 - i)] - frame[np.arange(i + 1, k)]))
    return (afAmdf)



# Pitch Tracking of signal using AMDF
# input: 
#       x: the signal x read from file, 
#       FrameLength: the lenght of an frame in signal  
#       Hoplenght: Hop length of an frame in signal
#       f_s: sample rate of signal read from file and f_s = 16000Hz
# output: 
#       f : an array sampling frequency (f0) in each frame of signal  
#       t: time stamps
def PitchTimeAmdf(x, FrameLength, HopLength, f_s):

    # from 80 Hz to 250 Hz: fequency of male can speak
    # from 120 Hz to 400 Hz: fequency of female can speak
    # from 75 Hz to 350 Hz: the average fequency of adults can speak
    f_min = 120
    f_max = 400
    
    
    # the number of frame in signal
    NumOfFrames = x.size // HopLength
    
    # compute time stamps
    t = (np.arange(0, NumOfFrames) * HopLength + (FrameLength / 2)) / f_s

    # create a matrix 'f' contain sampling frequency (f0) in each frame
    f = np.zeros(NumOfFrames)
    
    # the limited samples of male or female can speak in signal
    sample_max = f_s / f_min
    sample_min = f_s / f_max
    
    
    # find threshold of signal x to determine which frame is voice sound or unvoice sound 
    cout = 0
    thres = 0
    for n in x:
        if(n > 0):
            thres += n
            cout +=1
    thres = thres / cout
    
    # find f0 in each frame
    for n in range(0, NumOfFrames):
        
        # the first position of frame
        i_start = n * HopLength
        # the last position of frame
        i_stop = minx([len(x) - 1, i_start + FrameLength - 1])

        # return true i_start < i_stop and false if  i_start >= i_stop
        if(i_start >= i_stop):
            continue
        else:
            # create a frame
            x_tmp = x[np.arange(i_start, i_stop + 1)]
            
            # if the max pitch in frame < threshold of signal 
            thres_frame = maxx(x_tmp)
            if(thres_frame < thres):
                f[n] = 0
                continue
            
            # compute AMDF (average magitude difference funtion) to create an array of AMDF - 'afAmdf'
            afAmdf = computeAmdf(x_tmp)
            
        # create an array contain position of value of peaks in afAmdf 
        peak_array = multi_peak_finding(afAmdf, baseline(afAmdf))
        
        # create an array contain the value of peaks in afAmdf
        k = [afAmdf[peak] for peak in peak_array]
        
        # return f0 = 0 in frame n if 'multi_peak_finding' function can not define the peak in this fame
        if(len(k)<=2): 
            f[n]=0
            continue
            
        # find f0 in frame n   
        index = 1
        while(index < len(k)):
            # find min value of peak in frame 
            if(k[index] == minx(k[1:len(k)-1])):  
                # if min value of peak in frame > sample_max, 
                # i will plus that min value to 1000 an then it is not a min value.
                if(peak_array[index]>sample_max):
                    k[index] += 1000
                    index=1
                elif(peak_array[index]<sample_min):
                    f[n]=0
                else:
                    f[n] = f_s / peak_array[index]
                    break
            index+=1
            
    return (f, t)


# In[11]:




In [4]:
import numpy as np
import math
import matplotlib.pyplot as plt
from scipy.io.wavfile import read

#divide signal into frames  
def divide_signal_into_frames(y, frame_size, frame_stride, fs):
    frame_len = int(fs*frame_size) # number of samples in a single frame
    frame_step = int(fs*frame_stride) # number of overlapping samples
    total_frames = int(np.ceil(float(np.abs(len(y)-frame_len))/frame_step))  # total frames of signal
    padded_y = np.append(np.array(y), np.zeros(frame_len * total_frames - len(y)))  #push new array to end of matrix array
    framed_y = np.zeros((total_frames, frame_len))  #frame y (shape)
    for i in range(total_frames): #loop through over of total frames
        framed_y[i] = padded_y[i*frame_step : i*frame_step + frame_len]  # Apply ag Xj * Xj+t
    return framed_y


#Compute Autocorr Of each frames
def calculate_difference(frame) :
    half_len_signal = len(frame)//2
    j = 0
    autocorr = np.zeros(half_len_signal) # Init new Empty array to contain autocorrs of frame
    for j in range(half_len_signal):
        for i in range(half_len_signal):
            diff = frame[i] - frame[i+j] # Do lech bien do
            autocorr[j] += diff**2 
    
    return autocorr 

#tau time autocorrrelation unit
#Tien Research
def normalize_with_cumulative_mean(autocorr, halflen):
    new_autocorr = autocorr
    new_autocorr[0] = 1
    running_sum = 0.0
    for tau in range(1,halflen):
        running_sum += autocorr[tau]
        new_autocorr[tau] = autocorr[tau]/((1/tau)*running_sum)
    
    return new_autocorr


def absolute_threshold(new_autocorr, halflen, threshold):
    #create new array with condition          
    temp = np.array(np.where(new_autocorr < threshold))
    if (temp.shape == (1,0)):
        tau = -1
    else : 
        tau = temp[:,0][0]
    return tau


# Tang do chinh xac cua cac dinh
# Noi suy parabol
def parabolic_interpolation(new_autocorr, tau, frame_len):
    if tau > 1 and tau < (frame_len//2-1):
        alpha = new_autocorr[tau-1]
        beta = new_autocorr[tau]
        gamma = new_autocorr[tau+1]
        improv = 0.5*(alpha - gamma)/(alpha - 2*beta + gamma)
    else :
        improv = 0
    
    new_tau = tau + improv
    #return new_tau
    return new_tau
    

def pitch_tracker(y, frame_size, frame_step, sr):
    #call to divide_signal and push args into it
    framed_y = divide_signal_into_frames(y, frame_size, frame_step, sr)
    pitches = []
    for i in range(len(framed_y)):
        #Find autocorrelation with i in rnage len of framed_y
        autocorr = calculate_difference(framed_y[i])
        #Find new autocorrelation with i in rnage len of framed_y
        new_autocorr = normalize_with_cumulative_mean(autocorr, frame_len//2)
        #find threshold
        tau = absolute_threshold(new_autocorr, frame_len//2, 0.16)
        new_tau = parabolic_interpolation(new_autocorr, tau, frame_len)
        if (new_tau == -1):
            pitch = 0
        else :
            pitch = sr/new_tau
        #push pitch into pitches array
        pitches.append(pitch)
    #get timestamps
    list_times = [int(i * frame_len / 2) for i in range(len(pitches))]
    #return a tuple type with pitches and times
    return (pitches, list_times)

In [5]:
import numpy as np
import matplotlib.pyplot as plt 
from scipy.io import wavfile
from scipy.signal import resample

%matplotlib tk

'''
    Harmonic product spectrum.

    This algorithm is used for fundamental frequency detection. It evaluates
    the magnitude of the FFT (fast Fourier transform) of the input signal, keeping
    only the positive frequencies. It then element-wise-multiplies this spectrum
    by the same spectrum downsampled by 2, then 3, ..., finally ending after
    numProd downsample-multiply steps.

    Parameters
    ----------
    FILE_PATH : String
      Specify where to locate original signal
      
    PRODUCTS : int
      Number of products to evaluate the harmonic product spectrum.
      
    FFT_POINT : int
      The length of the FFT. FFT lengths with low prime factors (i.e., products of 2,
      3, 5, 7) are usually (much) faster than those with high prime factors.
      
    LOW_PASS : int
      Lowest frequency that will be in the output
      
    HIGH_PASS : int
      Highest frequency that will be in the output
      
    WINDOW_SIZE : int
      The size of window of signal
      
    Return
    ----------
    Result : Array-like
      List of fundamental frequency corresponding with each window
'''

def find_f0_using_HPS_nocut(WINDOW_SIZE, LOW_PASS, HIGH_PASS, PRODUCTS, FILE_PATH, FFT_POINT):

    # Read data from original file
    sampling_rate, data = wavfile.read(FILE_PATH)
    # Innitiallize hanning window with the same size ofthe window
    window = np.hanning(WINDOW_SIZE)
    # Calculation of basic baseline to filter noise later
    base = find_baseline(data, sampling_rate, PRODUCTS)
    # Result list variable
    result = []

    # Slide through the signal with windows that overlaps 50% of each other
    for i in range(1, int(len(data)/(WINDOW_SIZE))*2-1):

        # EXTRACT DATA FROM SIGNAL AND COMPUTE FFT
        # Slice data with the windowsize that overlaps 50%
        get_data_slice = data[int((0.5*i-0.5)*WINDOW_SIZE) : int((0.5*i+0.5)*WINDOW_SIZE)]
    #     get_data_slice = data[16000 : 16320]
        # Generate the fourier power and its corresponding frequency
        fourier_transform = np.fft.rfft(get_data_slice*window, FFT_POINT)
#         frequency = np.linspace(0, sampling_rate/2, num=len(fourier_transform))
        frequency = (np.fft.rfftfreq(len(fourier_transform)*2-1,1/sampling_rate))


#         # BANDPASS FILTERING
#         # Create filter the value of too high or too low frequencies in the fourier result
#         pas = []
#         for i in range(len(frequency)):
#             # Cut off the too low and too high frequencies
#             if frequency[i]<LOW_PASS or frequency[i]>HIGH_PASS*5:
#                 pas.append(0)
#             else:
#                 pas.append(1)
        # Apply the filter to the result of the FFT
#         for i in range(len(frequency)):
#             fourier_transform[i] *= pas[i]
        # Compute the Power Spectrum
        abs_fourier_transform = np.abs(fourier_transform)
        power_spectrum = np.square(abs_fourier_transform)

        # DOWNSAMPLING
        # Downsampling by the numnber of products
        down = [] # This is for saving the downsampled sequence
        for i in range(1, PRODUCTS+1):
            # Here, "downsampling a vector by N" means keeping only every N samples: downsample(v, N) = v[::N].
            down.append(power_spectrum[::i])

        # MULTIPLICATION
        # Element-wise-multiplies by the downsampled spectrum versions 
        res = [] # this is for saving the result after the multiplication
        for i in range(len(down[-1])):
            mul = 1
            for j in range(0, PRODUCTS):
                mul *= down[j][i]
            res.append(mul)

        # FIND THE PEAK AND EXTRACT F0
        # Find the index of the max peak of the multiplied result of the downsample versions
        maxv = max(res)
        indv = res.index(maxv)
        # Compare with the base TO FILTER OUT non_sound parts
        if maxv < base:
            indv = 0
        # Final compare with the condition then extract f0 for this frame    
#         if frequency[indv] < LOW_PASS or frequency[indv] > HIGH_PASS:
#             result.append(0)
#             continue
#         else:
        result.append(frequency[indv])
        
    return result


In [6]:
import time

WINDOW_SIZE = 740

LOW_PASS = 70
HIGH_PASS = 350

# Declare the 
FFT_POINT = 2**13

PRODUCTS = 5

FILE_PATH = "studio_female.wav"
time_count = []

sr, y = read('lab_female.wav')
frame_size = 0.02
frame_step = 0.01  #hop length
frame_len = int(frame_size * sr) # #block length


start = time.time()
f0_list = find_f0_using_HPS(WINDOW_SIZE, LOW_PASS, HIGH_PASS, PRODUCTS, FILE_PATH, 2**15)
end =time.time()
time_count.append(end-start)
print(f0_list)
time1 = []
for i in range(len(f0_list)):
    time1.append(int(i*WINDOW_SIZE-WINDOW_SIZE/2))

# plt.plot(time1, f0_list, 'o')

start = time.time()
f0_list1 = find_f0_using_HPS_nocut(WINDOW_SIZE, LOW_PASS, HIGH_PASS, PRODUCTS, FILE_PATH, 2**15)
end =time.time()
time_count.append(end-start)
print(f0_list1)
time2 = []
for i in range(len(f0_list1)):
    time2.append(int(i*WINDOW_SIZE-WINDOW_SIZE/2))

# plt.plot(0, 0, 'o')

# start = time.time()
# # f0_list1 = find_f0_using_HPS(WINDOW_SIZE, LOW_PASS, HIGH_PASS, 5, FILE_PATH, 2**13)
# f_s, x = read(FILE_PATH)
# f, t = PitchTimeAmdf(x, 320, 160,  16000)
# end = time.time()
# time_count.append(end-start)

# time1 = []
# for i in range(len(f)):
#     time1.append(int(i*WINDOW_SIZE-WINDOW_SIZE/2))

# plt.plot(time1, f, 'o')
# # plt.plot(0, 0, 'o')

# start = time.time()
# pitches = pitch_tracker(y, frame_size, frame_step, sr)
# end = time.time()
# time_count.append(end-start)

# time1 = []
# for i in range(len(pitches[0])):
#     time1.append(int(i*WINDOW_SIZE-WINDOW_SIZE/2))

# plt.plot(time1, pitches[0], 'o')
# plt.plot(0, 0, 'o')
# print(pitches)
# start = timeit.default_timer()
# f0_list3 = find_f0_using_HPS(WINDOW_SIZE, LOW_PASS, HIGH_PASS, 9, FILE_PATH, 2**13)
# end = timeit.default_timer()
# time_count.append(end-start)

# time = []
# for i in range(len(f0_list)):
#     time.append(int(i*WINDOW_SIZE-WINDOW_SIZE/2))
    
# time1 = []
# for i in range(len(f0_list1)):
#     time1.append(int(i*WINDOW_SIZE-WINDOW_SIZE/2))
    
# time2 = []
# for i in range(len(f0_list2)):
#     time2.append(int(i*WINDOW_SIZE-WINDOW_SIZE/2))
    
# time3 = []
# for i in range(len(f0_list3)):
#     time3.append(int(i*WINDOW_SIZE-WINDOW_SIZE/2))

fig, axs = plt.subplots(2, 1) 
axs[0].title.set_text("Pitch contour of studio_female.wav that cut-off fft result")
axs[0].set_ylabel("F0")
axs[0].set_xlabel("Timestamps (samples)")
axs[0].plot(time1, f0_list, 'o')

axs[1].title.set_text("Pitch contour of studio_female.wav no cut-off")
axs[1].set_ylabel("F0")
axs[1].set_xlabel("Timestamps (samples)")
axs[1].plot(time2, f0_list1, 'o')

# axs[2].title.set_text("Pitch contour of lab_female.wav using HPS using PRODUCTS level of 10")
# axs[2].set_ylabel("Fundamental Frequency")
# axs[2].set_xlabel("Timestamps (samples)")
# axs[2].plot(time2, f0_list2, 'o')

# axs[3].title.set_text("Pitch contour of lab_female.wav using HPS using PRODUCTS level of 15")
# axs[3].set_ylabel("Fundamental Frequency")
# axs[3].set_xlabel("Timestamps (samples)")
# axs[3].plot(time3, f0_list3, 'o')

# plt.plot(range(len(power_spectrum)), power_spectrum)
print(time_count)



[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 282.70621624095946, 277.82355274802404, 275.8704873508499, 274.89395465226283, 0, 0, 0, 0, 0, 279.7766181451982, 278.3118190973176, 277.3352863987305, 274.89395465226283, 270.49955750862097, 264.15209496780494, 261.2224968720437, 259.7576978241631, 258.29289877628247, 259.2694314748695, 262.68729591992434, 271.476090207208, 273.4291556043822, 270.9878238579145, 261.71076322133723, 245.59797369465042, 232.41478226372487, 220.6963898806799, 210.4427965455156, 201.16573590893833, 195.30653971741586, 102.53593335164332, 0, 0, 0, 0, 261.71076322133723, 254.38676798193416, 256.3398333791083, 256.82809972840187, 258.29289877628247, 257.8046324269889, 0, 0, 0, 0, 240.2270438524215, 227.04385242149593, 208.00146479904788, 73.72821874332449, 79.09914858555342, 71.28688699685678, 76.16955048979219, 70.79862064756324, 73.23995239403095, 0, 71.77515334615032, 75.19301779120511, 243.15664194818274, 240.2270438524215, 231.

In [7]:
# print(len(f))
# print(len(f0_list))
print(len(pitches[0]))


NameError: name 'pitches' is not defined

In [None]:
print((pitches[0]))
