# Premise of the algorithm

For a stationary time signal $x(t)$, the autocorrelation $r_x(\tau)$ is defined as: $r_x(\tau) = \int x(t)x(t + \tau)dt$.
Considering that this signal has a global maximum at $\tau = 0$ and periodic, we need to find at what lag ($\tau$) does $r_x(\tau)$ have another global maximum. The total lag between these two values will give us the period ($T$) which we can then use to find fundamental frequency as this $ F_0 = \dfrac{1}{T}$

In [1]:
import librosa
import scipy.signal
import scipy.fft
import scipy.optimize
import matplotlib.pyplot as plt
import numpy as np
import math
import os

data_dir = ""
pitch_dir = ""

minimum_pitch = 60
maximum_pitch = 500
window_size = None
global_absolute_peak = None


voice_threshold = 0.4
silence_threshold = 0.07
candidates = 4
octaveCost = 0.06

def load_file (name):
    y, sr = librosa.load(data_dir + name, sr=None)
    return (y, sr)

## Step 1: Find Local Maxima in each frame

Since this algorithm requires a constant time signal, we need to analyze a very short frame which will contain the same pitch throughout. We analyze successive frames to come up with our final fundamental frequency value. This way is very robust because analyzing multiple frames helps to minimize any errors which may arise in one or two of many frames. 

### Find the autocorrelation of a short-term frame
1. Multiply the segment with the window function. Boersma proved that Hanning Window, $w(t) = \frac{1}{2} - \frac{1}{2} \cos \left(\dfrac{2\pi t}{T} \right)$ , showed the best results.
2. Find the autocorrelation of $a(t) = x(t) * w(t) $ by first taking discrete fast fourier transform, squaring all the values and then taking the inverse discrete fast fourier transform. 
3. Finally divide $r_a(\tau)$ with the autocorrelation of window function $r_w(\tau) = \left(1-\dfrac{\mid \tau \mid}{T} \right) \left(\dfrac{2}{3} + \dfrac{1}{3} \cos \dfrac{2 \pi \tau}{T} \right) + \dfrac{1}{2 \pi} \sin \dfrac{2 \pi \mid \tau \mid}{T} $ to get $ r_x(\tau) $.

### Next find the local maxima

To find local maxima, we need to do a linear search over $r_x(m\Delta\tau)$ and find all such m values where $ r_x((m+1)\Delta\tau) < r_x(m\Delta\tau)$ and $r_x((m-1)\Delta\tau) < r_x(m\Delta\tau)$. According to this method our m value should be our local maximum but this is not precise enough that is why we use cubic spline interpolation to get even more accurate local maxima. Instead of searching for local maxima in a linear fasion with a very small increment like 0.00001, I used gradient descent. 

In [2]:
def preprocessing (y, sr):
    global global_absolute_peak
    nyquist_frequency = sr / 2
    fft = scipy.fft.fft(y)
    for i in range(np.int(0.95 * nyquist_frequency), np.int(nyquist_frequency)):
        fft[i] = 0
    ifft = np.real(scipy.fft.ifft(fft))
    global_absolute_peak = np.max(np.abs(ifft))
    return ifft

def initialize_min_and_max (song):
    global data_dir
    
    if song[-6] == '1' or song[-6] == '2' or song[-6] == '3':
        try:
            pitch_data = open(data_dir + 'minmax').readlines()
        except FileNotFoundError:
            print('Please create a minmax file. Using default values for now')
            maximum_pitch = 500
            minimum_pitch = 60
            return
        for each in pitch_data:
            if each[0] == song[-6]:
                minimum_pitch = int(each.strip().split(' ')[1])
                maximum_pitch = (each.strip().split(' ')[2])
    else:
        maximum_pitch = 500
        minimum_pitch = 60
        

def closest_power_2 (x):
    a = 1
    while (a < x):
        a *= 2
    return a

c_signal = None

def get_sinc_interpolation (x):
    global c_signal 
    nl = np.int(x)
    nr = nl + 1
    phil = x - nl
    phir = 1 - phil
    N = min(500, window_size//2)
    
    # numpy vectorized solution for faster results
    
    one = c_signal[nl+1:nl+N+1]
    two = np.pi * (np.arange(1, one.size+1) + phir - 1)
    two = np.multiply(np.divide(np.sin(two), two), (0.5 + 0.5 * np.cos(two/(phir + N))))
    two = np.nan_to_num(two, nan=1.0)
    
    if (nr-N-1 < 0):
        difference = np.abs(nr-N-1)
        three = np.concatenate((c_signal[max(nr-1, 0):max(nr-N-1, 0):-1], c_signal[1:difference]))
    else:
        three = c_signal[max(nr-1, 0):max(nr-N-1, 0):-1]
    four = np.pi * (np.arange(1, three.size+1) + phil - 1)
    four = np.multiply(np.divide(np.sin(four), four), (0.5 + 0.5 * np.cos(four/(phil + N))))
    four = np.nan_to_num(four, nan=1.0)
    
    ans = np.sum(np.multiply(one, two)) + np.sum(np.multiply(three, four))    
        
    return ans


tck = None

def cubic_interpolation_init (signal):
    global tck
    x_points = np.arange(0, signal.size)
    tck = scipy.interpolate.interp1d(x_points, -signal, kind="cubic")
    
def get_hanning_auto_correlate (tau, period):
    return (1 - abs(tau)/period) * ((2/3) + (1/3)*np.cos((2 * np.pi * tau)/period)) + (1/(2*np.pi)) * np.sin((2*np.pi*abs(tau))/period)
    
def get_cubic_interpolation (x):
    global tck
    try:
        return tck(x)
    except:
        return 0
    
def get_gaussian (x, period):
        return np.e**(-12 * (x/period - 0.5)**2) / (1-np.e**(-12))

def find_max_cubic_interpolation (l, r):
    global tck
    output = scipy.optimize.brent(get_cubic_interpolation, brack=(l, r), full_output=True, maxiter=500)
    return [output[0], -output[1]]

def find_max_sinc_interpolation (signal, l, r):
    global c_signal
    c_signal = -signal
    output = scipy.optimize.brent(get_sinc_interpolation, brack=(l, r), full_output=True, maxiter=500)
    return [output[0], -output[1]]

def small_segment (ifft, start_time, sr):
    global window_size
    window_size = 3/minimum_pitch * sr
    duration = window_size/sr
    segment = np.real(ifft[np.int(start_time * sr):np.int(start_time * sr + window_size)])
        
    local_absolute_peak = np.max(np.abs(segment))
    rms = np.real(np.sqrt(np.sum(segment**2)/segment.shape[0]))
    segment = np.subtract(segment, rms)
    
    gaussian = get_gaussian(np.arange(segment.shape[0]), segment.shape[0])
    prod = np.multiply(segment, gaussian)
    
    pad = np.pad(prod, (0, prod.shape[0]//2), 'constant')
    pad = np.pad(pad, (0, closest_power_2(pad.shape[0]) - pad.shape[0]), 'constant')
    
    c_fft = np.real(scipy.fft.ifft(np.square(np.abs(scipy.fft.fft(pad)))))
    c_fft = c_fft/c_fft[0]
        
    gaussian_auto_correlate = np.correlate(gaussian, gaussian, mode='full')
    gaussian_auto_correlate = gaussian_auto_correlate[gaussian_auto_correlate.size//2:]
    gaussian_auto_correlate = gaussian_auto_correlate/gaussian_auto_correlate[0]
        
    r_x = np.divide(c_fft[:gaussian.size], gaussian_auto_correlate)
    r_x = r_x[:r_x.size//2]
    cubic_interpolation_init(r_x)

    local_maximi = []
    
    for i in range(2, min(r_x.size, np.int(np.ceil(1/60*22050)))):
        if (r_x[i-2] < r_x[i-1] and r_x[i] < r_x[i-1]):
            point, val = find_max_sinc_interpolation(r_x, i-2, i)   #find_max_sinc_interpolation(r_x, i-2, i)
            local_maximi.append([point, val])   #find_max_cubic_interpolation(i-2, i) 

    return (np.array(local_maximi), local_absolute_peak) 

## Step 2: Determine possible frequency candidates

To determine R we also need to know the global absolute peak (highest peak in the entire signal) and local absolute peak (highest peak in each frame). 

For each frame we consider 4 candidates. One of them is the silence candidate. The rest of 3 are voiced candidates.
All these candidates are represented in a frequency strength pairs (F, R). The unvoiced candidate is assigned frequency 0 and given $ R = \text{VoicingThreshold} + max \left(0, 2 - \dfrac{(\text{localAbsolutePeak}) / (\text{globalAbsolutePeak})}{\text{SilenceThreshold}/(1 + \text{VoicingThreshold})} \right) $ 

Out all the local maxima found for each frame, 3 voiced candidates with the R are considered with $ R = r(\tau_{max}) - \text{OctaveCost} \cdot \log_2(\text{MinimumPitch} \cdot \tau_{max}) $

Default value for voice threshold is 0.4 times global absolute peak and 0.02 times local absolute peak for silence threshold.

In [3]:
 def frame_analysis (local_maximi, local_absolute_peak, sr):
    global global_absolute_peak, voice_threshold, silence_threshold, candidates
    frequency_strength_pairs = [[0.0, (voice_threshold * global_absolute_peak) + max(0, 2 - (local_absolute_peak/global_absolute_peak)/((silence_threshold * global_absolute_peak)/(1 + (voice_threshold * global_absolute_peak))))]]
    c_candidates = candidates
    possible_frequency_strength_pairs = []

    for i in range(0, local_maximi.shape[0]):
        lag = local_maximi[i][0]
        F = sr/lag
        if (lag > 0 and F < maximum_pitch):
            R = min(local_maximi[i][1], 1.0) - octaveCost * math.log2(minimum_pitch * lag * (1/sr))
            possible_frequency_strength_pairs.append([F, R])
            
    possible_frequency_strength_pairs.sort(reverse=True, key=lambda x: x[1])
    frequency_strength_pairs += possible_frequency_strength_pairs[:candidates-1]
    frequency_strength_pairs.sort(reverse=True, key=lambda x: x[1])
    
    return np.array(frequency_strength_pairs)

def analyze_frame (y, frames, sr): 
    all_freq_stren_pairs = []
    # each frame is the starting sample point, the windows are 24 ms
    for i, frame in enumerate(frames): 
        a, b = small_segment(y, frame, sr)
        all_freq_stren_pairs.append(frame_analysis(a, b, sr))
        if ((i+1) % 100 == 0):
            print(f"frame {i+1} out of {frames.size}")
    
    return np.array(all_freq_stren_pairs)

### Using dynamic programming to efficiently find the lowest cost path

Now we have 4 candidates from each frame. Next, we find the least cost path using dynamic programming. The cost is defined as follow: $ cost(\{p_n\}) = \sum\limits_{n=2}^{\text{numberOfFrames}} \text{transitionCost}(F_{n-1, p_{n-1}}, F_{np_n}) - \sum\limits_{n=1}^{\text{numberOfFrames}} R_{np_n} $ 

Where transition cost is defined to be 

$ \text{transitionCost}(F_1, F_2) = $
$$
\begin{align}
\begin{cases}
0 & \text{if $F_1 = 0$ and $F_2 = 0$ } \\
\text{VoicedUnvoicedCost} & \text{if $F_1 = 0$ xor $F_2 = 0$} \\
\text{OctaveJumpCost} \cdot \mid \log_2 \dfrac{F_1}{F_2} \mid & \text{if $F_1 \neq 0 $ and $F_2 \neq 0 $} \\
\end{cases}
\end{align}
$$

where all the candidates are defined as $\{(F_{np_n}, R_{np_n})\} $

And here we define defautl values of VoicedUnvoicedCost and OctaveJumpCost to be 0.2

In [None]:
dp = None

def transition_cost (f1, f2):
    voiced_unvoiced_cost = 0.2
    octave_jump_cost = 0.2
        
    if (f1 == 0 and f2 == 0):
        return 0
    elif ((f1 == 0.0 and f2 != 0.0) or (f1 != 0.0 and f2 == 0.0)):
        return voiced_unvoiced_cost
    else:
        return octave_jump_cost * abs(math.log2(f1/f2))
    
    
def viterbi (all_pairs):
    global dp
    
    for f in range(all_pairs.shape[0]):
        for c in range(all_pairs[f].shape[0]):
            if f == 0:
                dp[f][c] = (-all_pairs[f][c][1], None)
            else:
                cost, history = (float('inf'), float('inf'))
                for p in range(all_pairs[f-1].shape[0]):
                    n_cost = dp[f-1][p][0] + transition_cost(all_pairs[f-1][p][0], all_pairs[f][c][0]) - all_pairs[f][c][1]
                    if n_cost < cost:
                        cost = n_cost
                        history = p
                dp[f][c] = (cost, history)

                
def back_track (all_pairs):
    global dp
    ans = []
    c, h = (float('inf'), 0)
    for i, each in enumerate(dp[len(dp)-1]):
        if (each != np.inf and each[0] < c):
            c = each[0]
            h = i 
    for i in range(len(dp)-1, 0, -1):
        c = all_pairs[i][h][0]
        ans.append(c)
        h = dp[i][h][1]
    return ans[::-1]

## Generating pitch listings

Pitches are generated for every 0.01 seconds and saved inside /Akamai/voice/data/pitches/boersma/Scherbaum Mshavanadze/song_name/recording_name.txt.

In [None]:
def create_and_store (song_names):
    global y, sr, dp
    done = 0
    for song_name in song_names:
        initialize_min_and_max(song_name)
        y, sr = load_file(song_name)
        y = preprocessing(y, sr)
        
        duration = y.size/sr
        frames = np.arange(0, duration, 0.01)[:-2]
        all_pairs = analyze_frame(y, frames, sr)        
        dp = np.full((all_pairs.shape[0], candidates), np.inf).tolist()
        
        viterbi(all_pairs)
        f = back_track(all_pairs)
        for i in range(len(f)):
            f[i] = str(frames[i]) + ' '  + str(f[i])
                                    
        song_dir = pitch_dir + song_name
        song_dir = song_dir[:-4] + ".txt"
        
        fout = open(song_dir, "w+")
        fout.write("\n".join(str(x) for x in f))
        fout.close()
        
        done += 1
        print(f"{done} done")

In [None]:
# Scherbaum Mshavandaze
# parent_data_dir = '/Akamai/voice/data/Scherbaum Mshavanadze/'
# parent_pitch_dir = '/Akamai/voice/data/pitches-raw/boersma/Scherbaum Mshavanadze/'

# for collection in os.listdir(parent_data_dir):
#     if os.path.isdir(f"{parent_data_dir}{collection}"):
#         parts = []
#         for part in os.listdir(f"{parent_data_dir}{collection}"):
#             if part[-3:] == 'wav':
#                 parts.append(part)
                
#         data_dir = parent_data_dir + collection + '/'
#         pitch_dir = parent_pitch_dir + collection + '/'
#         create_and_store(parts)
        
# Teach Yourself Megrelian Songs
# parent_data_dir = '/Akamai/voice/data/Teach Yourself Megrelian Songs/'
# parent_pitch_dir = '/Akamai/voice/data/pitches-raw/boersma/Teach Yourself Megrelian Songs/'

# for collection in os.listdir(parent_data_dir):
#     if os.path.isdir(f"{parent_data_dir}{collection}"):
#         if collection != 'mp3':
#             parts = []
#             for part in os.listdir(f"{parent_data_dir}{collection}"):
#                 if part[-3:] == 'wav':
#                     parts.append(part)

#             data_dir = parent_data_dir + collection + '/'
#             pitch_dir = parent_pitch_dir + collection + '/'
#             create_and_store(parts)
            
# Teach Yourself Gurian Songs
# parent_data_dir = '/Akamai/voice/data/Teach Yourself Gurian Songs/'
# parent_pitch_dir = '/Akamai/voice/data/pitches-raw/boersma/Teach Yourself Gurian Songs/'

# for collection in os.listdir(parent_data_dir):
#     if os.path.isdir(f"{parent_data_dir}{collection}"):
#         if collection != 'mp3':
#             parts = []
#             for part in os.listdir(f"{parent_data_dir}{collection}"):
#                 if part[-3:] == 'wav':
#                     parts.append(part)

#             data_dir = parent_data_dir + collection + '/'
#             pitch_dir = parent_pitch_dir + collection + '/'
#             create_and_store(parts)


### Testing and Debugging