In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load in 
from types import SimpleNamespace
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
from scipy.io import wavfile
from scipy import signal
import matplotlib.pyplot as plt
# Input data files are available in the "../input/" directory.
# For example, running this (by clicking run or pressing Shift+Enter) will list the files in the input directory

import os
print(os.listdir("../input/test"))

# Any results you write to the current directory are saved as output.

['M3.wav', 'M1.wav', 'F1.wav', 'F5.wav', 'F4.wav', 'M2.wav', 'M5.wav', 'F2.wav', 'F3.wav', 'M4.wav']


In [2]:
def autocorr_hs_method(frame, sfreq, threshold=0.49, fmin=50, fmax=400):
    """Estimate pitch using autocorrelation and harmonic summation in frequency domain
    """

    # Calculate autocorrelation using scipy correlate
    frame = frame.astype(np.float)
    frame -= frame.mean()
    amax = np.abs(frame).max()
    if amax > 0:
        frame /= amax
    else:
        return 0
 
    #center clipping on the single frame
    cl = 0.1  #set center clipping threshold to 10% of max
    for i in range(len(frame)):
        if frame[i] >= cl:
            frame[i] = frame[i] - cl
        elif frame[i] <= -cl:
            frame[i] = frame[i] + cl
        else: frame[i] = 0

    c = signal.correlate(frame, frame)
    c = c[len(c)//2:]
    
    # Find the first minimum
    dc = np.diff(c)
    rmin = np.where(dc > 0)[0]
    if len(rmin) > 0:
        rmin1 = rmin[0]
    else:
        return 0

    # Find the next peak
    peak = np.argmax(c[rmin1:]) + rmin1
    rmax = c[peak]/c[0]
    f0 = sfreq / peak
    
    dft_points = sfreq*10  #0.1 precision in freq
    dft = np.fft.fft(frame, dft_points)
    
    #tuning the returned frequency with the harmonic summation
    cand = int(round(f0/(sfreq/dft_points)))
    for i in range(-10, 11):    #-1Hz/+1Hz
        harm_sum = 0
        for j in range(1,4):  #sum 3 harmonics
            harm_sum += dft[((cand+i)*j)%dft_points]
    
    f0 = (cand+(np.argmax(harm_sum)-15))*(sfreq/dft_points)   #final freq is the one with the highest harmonic sum

    if rmax > threshold and f0 >= fmin and f0 <= fmax:
        return f0
    else:
        return 0

In [3]:
class Counters:
    def __init__(self, gross_threshold=0.2):
        self.num_voiced = 0
        self.num_unvoiced = 0
        self.num_voiced_unvoiced = 0
        self.num_unvoiced_voiced = 0
        self.num_voiced_voiced = 0
        self.num_gross_errors = 0
        self.fine_error = 0
        self.e2 = 0
        self.gross_threshold = gross_threshold
        self.nfiles = 0

    def add(self, other):
        if other is not None:
            self.num_voiced += other.num_voiced
            self.num_unvoiced += other.num_unvoiced
            self.num_voiced_unvoiced += other.num_voiced_unvoiced
            self.num_unvoiced_voiced += other.num_unvoiced_voiced
            self.num_voiced_voiced += other.num_voiced_voiced
            self.num_gross_errors += other.num_gross_errors
            self.fine_error += other.fine_error
            self.e2 += other.e2
            self.nfiles += 1

    def __repr__(self):
        nframes = self.num_voiced + self.num_unvoiced
        if self.nfiles > 0:
            self.fine_error /= self.nfiles
        str = [
            f"Num. frames:\t{self.num_unvoiced + self.num_voiced} = {self.num_unvoiced} unvoiced + {self.num_voiced} voiced",
            f"Unvoiced frames as voiced:\t{self.num_unvoiced_voiced}/{self.num_unvoiced} ({100*self.num_unvoiced_voiced/self.num_unvoiced:.2f}%)",
            f"Voiced frames as unvoiced:\t{self.num_voiced_unvoiced}/{self.num_voiced} ({100*self.num_voiced_unvoiced/self.num_voiced:.2f}%)",
            f"Gross voiced errors (>{100*self.gross_threshold}%):\t{self.num_gross_errors}/{self.num_voiced_voiced} ({100*self.num_gross_errors/self.num_voiced_voiced:.2f}%)",
            f"MSE of fine errors:\t{100*self.fine_error:.2f}%",
            f"RMSE:\t{np.sqrt(self.e2/nframes):.2f}"
        ]
        return  '\n'.join(str)

In [4]:
def compare(fref, pitch):
    vref = np.loadtxt(fref)
    vtest = np.array(pitch)

    diff_frames = len(vref) - len(vtest)
    if abs(diff_frames) > 5:
        print(f"Error: number of frames in ref ({len(vref)}) != number of frames in test ({len(vtest)})")
        return None
    elif diff_frames > 0:
        vref = np.resize(vref, vtest.shape)
    elif diff_frames < 0:
        vtest = np.resize(vtest, vref.shape)

    counters = Counters()
    counters.num_voiced = np.count_nonzero(vref)
    counters.num_unvoiced = len(vref) - counters.num_voiced
    counters.num_unvoiced_voiced = np.count_nonzero(np.logical_and(vref == 0, vtest != 0))
    counters.num_voiced_unvoiced = np.count_nonzero(np.logical_and(vref != 0, vtest == 0))

    voiced_voiced = np.logical_and(vref != 0, vtest != 0)
    counters.num_voiced_voiced = np.count_nonzero(voiced_voiced)

    f = np.absolute(vref[voiced_voiced] - vtest[voiced_voiced])/vref[voiced_voiced]
    gross_errors = f > counters.gross_threshold
    counters.num_gross_errors = np.count_nonzero(gross_errors)
    fine_errors = np.logical_not(gross_errors)
    counters.fine_error = np.sqrt(np.square(f[fine_errors]).mean())
    counters.e2 = np.square(vref - vtest).sum()

    return counters

In [5]:
def wav2f0(options, gui):
    fs = open(options.submission, 'w') if options.submission is not None else None
    totalCounters = Counters()
    with open(gui) as f:
        if fs is not None:
            print('id,frequency', file=fs)
        for line in f:
            line = line.strip()
            if len(line) == 0:
                continue
            filename = os.path.join(options.datadir, line + ".wav")
            f0ref_filename = os.path.join(options.datadir, line + ".f0ref")
            print("Processing:", filename)
            sfreq, data = wavfile.read(filename)
            nsamples = len(data)

            
            #PREPROCESSING
            #lowpass filter
            fh = 1000 #cut off freq
            b, a = signal.ellip(5, 3, 40, fh / (sfreq / 2), 'low')
            data = signal.filtfilt(b, a, data)
            
            
            #center clipping on the entire data
            tmp = data.copy()
            for i in range(int(len(data)*0.03)):    #delete 3% of the max samples for setting the clipping threshold
                clip = np.max(tmp)                  #to avoid a too large number due to unexpected peaks
                n = np.argmax(tmp)
                tmp[n] = 0
                
            
            cl = 0.05*clip  #set center clipping threshold to 5% of previously found value
            for i in range(len(data)):
                if data[i] >= cl:
                    data[i] = data[i] - cl
                elif data[i] <= -cl:
                    data[i] = data[i] + cl
                else: data[i] = 0
    
            # From miliseconds to samples
            ns_windowlength = int(round((options.windowlength * sfreq) / 1000))
            ns_frameshift = int(round((options.frameshift * sfreq) / 1000))
            ns_left_padding = int(round((options.left_padding * sfreq) / 1000))
            ns_right_padding = int(round((options.right_padding * sfreq) / 1000))
            pitch = []
            
            for id, ini in enumerate(range(-ns_left_padding, nsamples - ns_windowlength + ns_right_padding + 1, ns_frameshift)):
                first_sample = max(0, ini)
                last_sample = min(nsamples, ini + ns_windowlength)
                frame = data[first_sample:last_sample]
                f0 = autocorr_hs_method(frame, sfreq)
                pitch.append(f0)
               
            #POST PROCESSING
            pitch = signal.medfilt(pitch, 3)  #median filter
            
            #delete outliers (greater than 3.5 times the lowest estimated pitch)
            pit = [f for f in pitch if f!=0]
            for i in range(len(pitch)):
                if (pitch[i] > 3.5*np.min(pit)):
                    pitch[i] = 0
            
            for i in range(len(pitch)):
                if fs is not None:
                    print(line + '_' + str(i) + ',', pitch[i], file=fs)
                          
            if os.path.isfile(f0ref_filename):
                counters = compare(f0ref_filename, pitch)
                totalCounters.add(counters)

    if totalCounters.num_voiced + totalCounters.num_unvoiced > 0:
        print("### Summary")
        print(totalCounters)
        print("-------------------------------\n")

In [6]:
fda_ue_options = SimpleNamespace(
    windowlength=32, frameshift=15, left_padding=16, right_padding=16, datadir='../input', submission=None)
wav2f0(fda_ue_options, '../input/fda_ue.gui')

Processing: ../input/fda_ue/rl001.wav


  b = a[a_slice]
  return x[reverse].conj()


Processing: ../input/fda_ue/rl002.wav
Processing: ../input/fda_ue/rl003.wav
Processing: ../input/fda_ue/rl004.wav
Processing: ../input/fda_ue/rl005.wav
Processing: ../input/fda_ue/rl006.wav
Processing: ../input/fda_ue/rl007.wav
Processing: ../input/fda_ue/rl008.wav
Processing: ../input/fda_ue/rl009.wav
Processing: ../input/fda_ue/rl010.wav
Processing: ../input/fda_ue/rl011.wav
Processing: ../input/fda_ue/rl012.wav
Processing: ../input/fda_ue/rl013.wav
Processing: ../input/fda_ue/rl014.wav
Processing: ../input/fda_ue/rl015.wav
Processing: ../input/fda_ue/rl016.wav
Processing: ../input/fda_ue/rl017.wav
Processing: ../input/fda_ue/rl018.wav
Processing: ../input/fda_ue/rl019.wav
Processing: ../input/fda_ue/rl020.wav
Processing: ../input/fda_ue/rl021.wav
Processing: ../input/fda_ue/rl022.wav
Processing: ../input/fda_ue/rl023.wav
Processing: ../input/fda_ue/rl024.wav
Processing: ../input/fda_ue/rl025.wav
Processing: ../input/fda_ue/rl026.wav
Processing: ../input/fda_ue/rl027.wav
Processing: 

In [7]:
test_options = SimpleNamespace(
    windowlength=26.5, frameshift=10, left_padding=13.25, right_padding=7, datadir='../input/test', submission='submission.csv')
wav2f0(test_options, '../input/test.gui')

Processing: ../input/test/F1.wav


  b = a[a_slice]
  return x[reverse].conj()


Processing: ../input/test/F2.wav
Processing: ../input/test/F3.wav
Processing: ../input/test/F4.wav
Processing: ../input/test/F5.wav
Processing: ../input/test/M1.wav
Processing: ../input/test/M2.wav
Processing: ../input/test/M3.wav
Processing: ../input/test/M4.wav
Processing: ../input/test/M5.wav
