# Project 1 : 
# Isolated Digit Recognition in Noisy Env.

## import packages, define analysis parameters and draw parameters, audio file preparation, etc.

In [32]:
# import necessary pacakages
# strange issue: keep the import order to prevent matplotlib error
#  import matplotlib -> librosa -> pyplot -> librosa.display
import sys
import numpy as np
import matplotlib
import librosa
from matplotlib import pyplot as plt
import librosa.display

#from scipy.io import wavfile
from scipy import signal
from scipy.fft import fftshift

# display wav files
import IPython

In [33]:
# add '/' if path is not a null string
def addpath(path, file):
    if len(path) == 0: 
        return file
    else:
        return path + '/' + file

In [34]:
# Ts : Shift length in seconds, default 0.01 sec = 10ms
# Tf : Frame length in seconds, default 0.02 sec = 20ms
# Fs : navtive sampling frequency, set the sampling rate value as 16000
Fs = 16000
Ts = 0.01
Tf = 0.02

# to set the parameter for rawing the spectrum
cmap_plot = plt.cm.bone_r # default colormap for spectrogram, gray, reversed
FIG_SIZE = (8,3)

In [35]:
# draw spectrogram
from utils.gjdrawspectrogram3 import drawspectrogram3

# linear phase FIR filter design from magnitudes of the frequency components
from utils.gjfiroverlappadd import getLPHFIRFFT

# trapezoidal overlap add for FIR filtering
from utils.gjfiroverlappadd import firoverlapadd

# save audio in wav format
import utils.gjwavfile as wav

- - - - - - - 

### load Speech and Noise
오디오 파일이 16kHz, mono인지 확인

In [36]:
# size 함수 대신, len 사용.
def check_audio_file(file, defFs, checkMono):
    signal, Fs = librosa.load(file, sr=None, mono=False)
    if defFs != Fs:
        print('sampling rate mismatch, %d != %d for file %s'%(defFs, Fs, file))
        return False
    elif checkMono == True:
        if signal.ndim != 1:
            print('not mono file %s, shape='%(file), signal.shape)
            return False
        return True
    elif len(signal) <= 0:
        print('wrong audio file %s, shape='%(file), signal.shape)
    else:
        return True

def convert_audio_file(file, forceFs, forceMono):
    signal, Fs = librosa.load(file, sr=None, mono=False)
    changed = False
    if forceFs != Fs:
        print('sampling rate mismatch, %d != %d for file %s'%(forceFs, Fs, file))
        signal, Fs = librosa.load(file, sr=forceFs, mono=False)
        changed = True
    elif forceMono == True:
        if signal.ndim != 1:
            print('not mono file %s, shape='%(file), signal.shape)
            signal, Fs = librosa.load(file, sr=forceFs, mono=True)
            changed = True
    elif len(signal) <= 0:
        print('wrong audio file %s, shape='%(file), signal.shape)
        return False
    if changed == True:
        wav.writewav(file, Fs, signal, maxval=1.0)
        print('updating', file)
    return changed

In [37]:
import os

current_path = os.getcwd()

In [38]:
trainroot = current_path+'/audio/segmented_all/segmented-train/segmented-train'
labels_train = {'11jeonghy', 
                'Dandyst', 
                'InkooJeon',
                'YouYeNa',
                'deokkyukwon',
                'ohjihyeon',
                'son',
               }

# check, 파일 로드가 제대로 되었는지 false를 통해서 확인함.
# 0일 경우, 파일로딩이 제대로 이루어졌음을 알 수 있음.
Fs = 16000
for subname in labels_train:
    num_files = 0
    num_false_files = 0
    for w in range(10):
        for trial in range(10):
            basename = '%d/kdigits%d-%d.wav'%(w,trial,w)
            file = addpath(trainroot, addpath(subname, basename))
            num_files += 1
            if check_audio_file(file, Fs, True) == False:
                num_false_files += 1
    print('%s: false %d / %d\n'%(subname, num_false_files, num_files))

11jeonghy: false 0 / 100

YouYeNa: false 0 / 100

Dandyst: false 0 / 100

deokkyukwon: false 0 / 100

InkooJeon: false 0 / 100

son: false 0 / 100

ohjihyeon: false 0 / 100



In [39]:
valroot =  current_path+'/audio/segmented_all/segmented-val'
valclean = addpath(valroot, 'org')
labels_val = {
                'chlee',
                'do',
                'kyeong',
               }

# check
Fs = 16000
for subname in labels_val:
    num_files = 0
    num_false_files = 0
    for w in range(10):
        for trial in range(10):
            basename = '%d/kdigits%d-%d.wav'%(w,trial,w)
            file = addpath(valclean, addpath(subname, basename))
            num_files += 1
            if check_audio_file(file, Fs, True) == False:
                num_false_files += 1
    print('%s: false %d / %d\n'%(subname, num_false_files, num_files))

chlee: false 0 / 100

do: false 0 / 100

kyeong: false 0 / 100



- - - - 
### HMM training and test 함수 정의

In [40]:
import numpy as np
import matplotlib.pyplot as plt
#from scikits.talkbox.features import mfcc
#librosa.feature.mfcc(*, y=None, sr=22050, S=None, n_mfcc=20, dct_type=2, norm='ortho', lifter=0, **kwargs)[source]
from librosa.feature import mfcc
from scipy.io import wavfile
from hmmlearn import hmm
import numpy as np
import os
import warnings
import scipy.stats as sp
from time import time

warnings.filterwarnings("ignore")

In [41]:
############################################################################################## 
# extract MFCC features
# STFT : 구간마다 나눔.
# 저주파는 촘촘하게, 고주파는 뭉떵그려서 두는 것을 필터 뱅크 에너지라고 함.
# MFCC : 소리를 벡터로 나타냄. 각 vector element간의 correlation을 줄이면서 feature들로 표현함
def extmfcc(file):
    samplerate, d = wavfile.read(file)
    #features.append(mfcc(d, nwin=int(samplerate * 0.03), fs=samplerate, nceps= 6)[0])
    x = np.float32(d)
    hop=samplerate//100
    mc = mfcc(y=x, sr=samplerate, n_mfcc=num_mfcc, hop_length=hop, win_length=hop*2)
    return np.transpose(mc, (1,0))

# Hidden Markove Model(HMM)에서 점프하는 확률을 계산해주는 함수.
# Prior : 학습하기 전의 default 값 <-> Posterior : 데이터 분포를 보고서 학습한 값.
def initByBakis(inumstates, ibakisLevel):
    startprobPrior = np.zeros(inumstates)
    startprobPrior[0: ibakisLevel - 1] = 1/float((ibakisLevel - 1))
    transmatPrior = getTransmatPrior(inumstates, ibakisLevel)
    return startprobPrior, transmatPrior

def getTransmatPrior(inumstates, ibakisLevel):
    transmatPrior = (1 / float(ibakisLevel)) * np.eye(inumstates)

    for i in range(inumstates - (ibakisLevel - 1)):
        for j in range(ibakisLevel - 1):
            transmatPrior[i, i + j + 1] = 1. / ibakisLevel

    for i in range(inumstates - ibakisLevel + 1, inumstates):
        for j in range(inumstates - i - j):
            transmatPrior[i, i + j] = 1. / (inumstates - i)

    return transmatPrior

In [42]:
############################################################################################## 
# hyperparameters - CHANGE THEM TO IMPROVE PERFORMANCE
# 1. number of MFCC (feature dimension)
num_mfcc = 6
#num_mfcc = 10
#num_mfcc = 13
# 2. Parameters needed to train GMMHMM
m_num_of_HMMStates = 3  # number of states
m_num_of_mixtures = 2  # number of mixtures for each hidden state
m_covarianceType = 'diag'  # covariance type
m_n_iter = 10  # number of iterations
m_bakisLevel = 2

m_startprobPrior, m_transmatPrior = initByBakis(m_num_of_HMMStates,m_bakisLevel)
print("StartProbPrior="); print(m_startprobPrior)
print("TransMatPrior="); print(m_transmatPrior)

StartProbPrior=
[1. 0. 0.]
TransMatPrior=
[[0.5 0.5 0. ]
 [0.  0.5 0.5]
 [0.  0.  1. ]]


In [43]:
############################################################################################## 
# acoustic model definition
class SpeechModel:
    def __init__(self,Class,label):
        self.traindata = np.zeros((0,num_mfcc))
        self.Class = Class
        self.label = label
        self.model  = hmm.GMMHMM(n_components = m_num_of_HMMStates, n_mix = m_num_of_mixtures, \
                transmat_prior = m_transmatPrior, startprob_prior = m_startprobPrior, \
                covariance_type = m_covarianceType, n_iter = m_n_iter)


In [44]:
def train_digits(rootpath, speakers, tag, num_trials=10):    
    ############################################################################################## 
    # 1. find files
    #    for user "gjang", digit 2, recording trial 0 (1st)
    #    "segmented/gjang/2/kdigits0-2.wav"
    # 2. extract MFCC features for training and testing
    #    for each digit, indexes 4 and 9 for test, and the rest for training

    #fpaths = []
    #labels = []
    spoken = []
    m_trainingsetfeatures = []
    m_trainingsetlabels = []

    count = 0
    for username in speakers:
        apath2 = addpath(rootpath, username)    # example: segmented/gjang
        for ii in range(10):   #dnum in os.listdir(apath2):
            dnum = str(ii)
            apath3 = addpath(apath2, dnum)     # example: segmented/gjang/2
            if dnum not in spoken:
                spoken.append(dnum)
            for trial in range(num_trials):
                file = addpath(apath3,"{}{}-{}.wav".format(tag,trial,dnum))      # segmented/gjang/2/kdigits0-2.wav
                mc = extmfcc(file)

                # display file names for the first 20 files only
                count += 1
                if count <= 20:
                    print(file, dnum, end=' '); print(mc.shape, end=' ')
                elif count == 21:
                    print('...'); print('')

                m_trainingsetfeatures.append(mc)
                m_trainingsetlabels.append(dnum)

    print('Words spoken:', spoken)
    #print("number of labels and features = %d, %d" % ( len(labels), len(features) ))
    #print("feature shape = ", end='')
    #print(features[0].shape)

    ############################################################################################## 
    ntrain = len(m_trainingsetlabels)

    print("[training] number of labels and features = %d, %d" % 
            ( len(m_trainingsetlabels), len(m_trainingsetfeatures)) )
    print ('Loading data completed')

    ############################################################################################## 
    # model initialization
    gmmhmmindexdict = {}
    index = 0
    for word in spoken:
        gmmhmmindexdict[word] = index
        index = index +1

    ############################################################################################## 
    # training GMMHMM Models 
    start = time()

    speechmodels = [None] * len(spoken)
    for key in gmmhmmindexdict:
        speechmodels[gmmhmmindexdict[key]] = SpeechModel(gmmhmmindexdict[key],key)

    for i in range(0,len(m_trainingsetfeatures)):
         for j in range(0,len(speechmodels)):
             if int(speechmodels[j].Class) == int(gmmhmmindexdict[m_trainingsetlabels[i]]):
                speechmodels[j].traindata = np.concatenate((speechmodels[j].traindata , m_trainingsetfeatures[i]))

    for speechmodel in speechmodels:
        speechmodel.model.fit(speechmodel.traindata)

    print ('Training completed -- {0} GMM-HMM models are built for {0} different types of words'.format(len(spoken)))
    print('time elapsed: %.2f seconds' % ( time() - start ))
    print (" "); print(" ")
    
    return speechmodels, gmmhmmindexdict


In [45]:
speechmodels, gmmhmmindexdict = train_digits(trainroot, labels_train, 'kdigits', num_trials=10)

/home/climate/workspace/ssp2023/practice/audio/segmented_all/segmented-train/segmented-train/11jeonghy/0/kdigits0-0.wav 0 (88, 6) /home/climate/workspace/ssp2023/practice/audio/segmented_all/segmented-train/segmented-train/11jeonghy/0/kdigits1-0.wav 0 (116, 6) /home/climate/workspace/ssp2023/practice/audio/segmented_all/segmented-train/segmented-train/11jeonghy/0/kdigits2-0.wav 0 (79, 6) /home/climate/workspace/ssp2023/practice/audio/segmented_all/segmented-train/segmented-train/11jeonghy/0/kdigits3-0.wav 0 (86, 6) /home/climate/workspace/ssp2023/practice/audio/segmented_all/segmented-train/segmented-train/11jeonghy/0/kdigits4-0.wav 0 (50, 6) /home/climate/workspace/ssp2023/practice/audio/segmented_all/segmented-train/segmented-train/11jeonghy/0/kdigits5-0.wav 0 (74, 6) /home/climate/workspace/ssp2023/practice/audio/segmented_all/segmented-train/segmented-train/11jeonghy/0/kdigits6-0.wav 0 (112, 6) /home/climate/workspace/ssp2023/practice/audio/segmented_all/segmented-train/segmented-t

In [46]:
def validation_digits(speechmodels, gmmhmmindexdict, rootpath, speakers, tag, num_trials=10):    

    ############################################################################################## 
    # 1. find files
    #    for user "gjang", digit 2, recording trial 0 (1st)
    #    "segmented/gjang/2/kdigits0-2.wav"
    # 2. extract MFCC features for training and testing
    #    for each digit, indexes 4 and 9 for test, and the rest for training

    #fpaths = []
    #labels = []
    spoken = []
    m_features = []
    m_labels = []

    count = 0
    for username in speakers:
        apath2 = addpath(rootpath, username)    # example: segmented/gjang
        for ii in range(10):   #dnum in os.listdir(apath2):
            dnum = str(ii)
            apath3 = addpath(apath2, dnum)     # example: segmented/gjang/2
            if dnum not in spoken:
                spoken.append(dnum)
            for trial in range(num_trials):
                file = addpath(apath3,"{}{}-{}.wav".format(tag,trial,dnum))      # segmented/gjang/2/kdigits0-2.wav
                mc = extmfcc(file)

                # display file names for the first 20 files only
                count += 1
                if count <= 20:
                    print(file, dnum, end=' '); print(mc.shape, end=' ')
                elif count == 21:
                    print('...'); print('')

                m_features.append(mc)
                m_labels.append(dnum)

    print('Words spoken:', spoken)
    #print("number of labels and features = %d, %d" % ( len(labels), len(features) ))
    #print("feature shape = ", end='')
    #print(features[0].shape)

    ############################################################################################## 
    print("[validation] number of labels and features = %d, %d" % ( len(m_labels), len(m_features)) )
    print ('Loading data completed')

    ############################################################################################## 
    # testing
    print("Prediction started")
    m_PredictionlabelList = []

    for i in range(0,len(m_features)):
        scores = []
        for speechmodel in speechmodels:
             scores.append(speechmodel.model.score(m_features[i]))
        id  = scores.index(max(scores))
        m_PredictionlabelList.append(speechmodels[id].Class)
        #print(str(np.round(scores, 3)) + " " + str(max(np.round(scores, 3))) +" "+":"+ speechmodels[id].label)

    accuracy = 0.0
    count = 0
    print("")
    print("Prediction for Testing DataSet:")

    for i in range(0,len(m_labels)):
        #print( "Label"+str(i+1)+":"+m_labels[i])
        if gmmhmmindexdict[m_labels[i]] == m_PredictionlabelList[i]:
           count = count+1

    accuracy = 100.0*count/float(len(m_labels))

    print("")
    print("accuracy ="+str(accuracy))
    print("")

    ############################################################################################## 
    # end of testing
    ############################################################################################## 

In [47]:
validation_digits(speechmodels, gmmhmmindexdict, trainroot, labels_train, 'kdigits', num_trials=10)
validation_digits(speechmodels, gmmhmmindexdict, valclean, labels_val, 'kdigits', num_trials=10)

/home/climate/workspace/ssp2023/practice/audio/segmented_all/segmented-train/segmented-train/11jeonghy/0/kdigits0-0.wav 0 (88, 6) /home/climate/workspace/ssp2023/practice/audio/segmented_all/segmented-train/segmented-train/11jeonghy/0/kdigits1-0.wav 0 (116, 6) /home/climate/workspace/ssp2023/practice/audio/segmented_all/segmented-train/segmented-train/11jeonghy/0/kdigits2-0.wav 0 (79, 6) /home/climate/workspace/ssp2023/practice/audio/segmented_all/segmented-train/segmented-train/11jeonghy/0/kdigits3-0.wav 0 (86, 6) /home/climate/workspace/ssp2023/practice/audio/segmented_all/segmented-train/segmented-train/11jeonghy/0/kdigits4-0.wav 0 (50, 6) /home/climate/workspace/ssp2023/practice/audio/segmented_all/segmented-train/segmented-train/11jeonghy/0/kdigits5-0.wav 0 (74, 6) /home/climate/workspace/ssp2023/practice/audio/segmented_all/segmented-train/segmented-train/11jeonghy/0/kdigits6-0.wav 0 (112, 6) /home/climate/workspace/ssp2023/practice/audio/segmented_all/segmented-train/segmented-t

- - - - 
### noise 추가

In [49]:
audioinputpath = current_path+'/audio'
noisefile  = addpath(audioinputpath, 'audio_car.wav')
wnoisefile  = addpath(audioinputpath, 'audio_car_wideband.wav')   # 넓은 주파수 대역에 분포한 잡음

Fs=16000
noise, _ = librosa.load(noisefile, sr=Fs, mono=True)
wnoise, _ = librosa.load(wnoisefile, sr=Fs, mono=True)
# sr: target sampling rate. ‘None’ uses the native sampling rate
# mono = True: convert signal to mono

print(noisefile, noise.shape, noise)
print(wnoisefile, wnoise.shape, wnoise)

Ns = int(Fs*Ts)    # shift number of samples
Nf = int(Fs*Tf)    # frame number of samples
NFFT = int(2**(np.ceil(np.log2(Nf))))   # Nf보다 크거나 같은 2의 거듭제곱을 NFFT 로 정의
hNo = NFFT//2+1
print('Fs = %d, Ns = %d, Nf = %d, NFFT = %d, hNo = %d' % (Fs, Ns, Nf, NFFT, hNo))

/home/climate/workspace/ssp2023/practice/audio/audio_car.wav (175745,) [-0.01342773 -0.0222168  -0.02905273 ... -0.0390625  -0.03930664
 -0.04086304]
/home/climate/workspace/ssp2023/practice/audio/audio_car_wideband.wav (175745,) [-0.05984497 -0.14807129 -0.14700317 ... -0.10241699 -0.10253906
 -0.09594727]
Fs = 16000, Ns = 160, Nf = 320, NFFT = 512, hNo = 257


In [50]:
def generate_mixed_signals_2(speech, noise, SNRs, isdraw=False):
    std_s = np.sqrt(np.mean(speech**2))
    std_n = np.sqrt(np.mean(noise[:len(speech)]**2))
    mixedSig = []
    for snr in SNRs:
        gain = np.power(10, -snr/20)
        gn = noise[:len(speech)]/std_n*std_s*gain
        m = speech + gn
        mixedSig.append(m)

    return mixedSig

In [51]:
audioroot = valroot
audioclean = valclean
labels = labels_val
noisyroots = [addpath(audioroot,'nbnSNR'), addpath(audioroot,'wbnSNR')]
SNRs = [10, 0, -10]

for subname in labels:
    num_files = 0
    for w in range(10):
        for trial in range(10):
            basename = '%d/kdigits%d-%d.wav'%(w,trial,w)
            infile = addpath(audioclean, addpath(subname, basename))            
            num_files += 1
            
            signal, Fs = librosa.load(infile, sr=Fs, mono=True)
            nbnsig = generate_mixed_signals_2(signal, noise, SNRs, False)
            wbnsig = generate_mixed_signals_2(signal, wnoise, SNRs, False)
            noisy = [nbnsig, wbnsig]
            
            for jj in range(len(noisy)):
                for n in range(len(noisy[jj])):
                    outfile = addpath('%s%d'%(noisyroots[jj],SNRs[n]), addpath(subname, basename))
                    wav.writewav(outfile, Fs, noisy[jj][n], maxval=1.0)

outputpaths = []
for jj in range(len(noisy)):
    for n in range(len(noisy[jj])):
        outputpaths.append('%s%d'%(noisyroots[jj],SNRs[n]))

In [52]:
# Nosie model Test
for path in outputpaths:
    print('--------------------------------')
    print('testing', path)
    validation_digits(speechmodels, gmmhmmindexdict, path, labels, 'kdigits', num_trials=10)

--------------------------------
testing /home/climate/workspace/ssp2023/practice/audio/segmented_all/segmented-val/nbnSNR10
/home/climate/workspace/ssp2023/practice/audio/segmented_all/segmented-val/nbnSNR10/chlee/0/kdigits0-0.wav 0 (98, 6) /home/climate/workspace/ssp2023/practice/audio/segmented_all/segmented-val/nbnSNR10/chlee/0/kdigits1-0.wav 0 (105, 6) /home/climate/workspace/ssp2023/practice/audio/segmented_all/segmented-val/nbnSNR10/chlee/0/kdigits2-0.wav 0 (94, 6) /home/climate/workspace/ssp2023/practice/audio/segmented_all/segmented-val/nbnSNR10/chlee/0/kdigits3-0.wav 0 (96, 6) /home/climate/workspace/ssp2023/practice/audio/segmented_all/segmented-val/nbnSNR10/chlee/0/kdigits4-0.wav 0 (100, 6) /home/climate/workspace/ssp2023/practice/audio/segmented_all/segmented-val/nbnSNR10/chlee/0/kdigits5-0.wav 0 (95, 6) /home/climate/workspace/ssp2023/practice/audio/segmented_all/segmented-val/nbnSNR10/chlee/0/kdigits6-0.wav 0 (96, 6) /home/climate/workspace/ssp2023/practice/audio/segment

In [59]:
audioroot = current_path+'/audio/segmented_all/unsegmented-test'
# audioclean = addpath(audioroot,'org')
labels = ['gjang']
noisyroots = [addpath(audioroot,'nbnSNR'), addpath(audioroot,'wbnSNR')]
SNRs = [10, 0, -10]

for subname in labels:
    num_files = 0
    for trial in range(10):
        basename = 'kdigits%d.wav' % (trial)
        # infile = addpath(audioclean, addpath(subname, basename))
        infile = addpath(audioroot, addpath(subname, basename))
        
        print(infile)
        num_files += 1

        signal, Fs = librosa.load(infile, sr=Fs, mono=True)
        nbnsig = generate_mixed_signals_2(signal, np.concatenate((noise,noise,noise)), SNRs, False)
        wbnsig = generate_mixed_signals_2(signal, np.concatenate((wnoise,wnoise,wnoise)), SNRs, False)
        noisy = [nbnsig, wbnsig]

        for jj in range(len(noisy)):
            for n in range(len(noisy[jj])):
                outfile = addpath('%s%d'%(noisyroots[jj],SNRs[n]), addpath(subname, basename))
                wav.writewav(outfile, Fs, noisy[jj][n], maxval=1.0)

outputpaths = []
for jj in range(len(noisy)):
    for n in range(len(noisy[jj])):
        outputpaths.append('%s%d'%(noisyroots[jj],SNRs[n]))

/home/climate/workspace/ssp2023/practice/audio/segmented_all/unsegmented-test/gjang/kdigits0.wav
/home/climate/workspace/ssp2023/practice/audio/segmented_all/unsegmented-test/gjang/kdigits1.wav
/home/climate/workspace/ssp2023/practice/audio/segmented_all/unsegmented-test/gjang/kdigits2.wav
/home/climate/workspace/ssp2023/practice/audio/segmented_all/unsegmented-test/gjang/kdigits3.wav
/home/climate/workspace/ssp2023/practice/audio/segmented_all/unsegmented-test/gjang/kdigits4.wav
/home/climate/workspace/ssp2023/practice/audio/segmented_all/unsegmented-test/gjang/kdigits5.wav
/home/climate/workspace/ssp2023/practice/audio/segmented_all/unsegmented-test/gjang/kdigits6.wav
/home/climate/workspace/ssp2023/practice/audio/segmented_all/unsegmented-test/gjang/kdigits7.wav
/home/climate/workspace/ssp2023/practice/audio/segmented_all/unsegmented-test/gjang/kdigits8.wav
/home/climate/workspace/ssp2023/practice/audio/segmented_all/unsegmented-test/gjang/kdigits9.wav
