In [26]:
import wave
import numpy as np
import os
import sys

class FeatureExtractor():
    '''특징값(FBANK, MFCC)을 추출하는 클래스
    sample_frequency: 입력 파형 샘플링 주파수[Hz]
    frame_length: 프레임 사이즈
    frame_shift: 분석간격(프레임 시프트) [msec]
    num_mel_nins: Mel 필터 뱅크 수(=FBANK 특징 차원 수)
    num_ceps: MFCC 특징 차원 수(0차원 포함)
    lifter_coef: 리프터링 처리 매개변수
    low_frequency: 저주파수 대역 제거 cut-off 주파수 [Hz]
    high_frequency: 고주파수 대역 재거 cut-off 주파수 [Hz]
    dither: 디더링 처리 매개변수(잡음 크기)
    '''
    def __init__(self, sample_frequency=16000, frame_length=25, frame_shift=10, num_mel_bins=23, num_ceps=13, lifter_coef=22, low_frequency=20,high_frequency=8000, dither=1.0):
        #샘플링 주파수[Hz]
        self.sample_freq = sample_frequency
        # 창폭을 msec에서 샘플 수로 변환
        self.frame_size = int(sample_frequency * frame_length * 0.001)
        self.frame_shift = int(sample_frequency * frame_shift * 0.001)
        #mel 필터 뱅크 수
        self.num_mel_bins = num_mel_bins
        #MFCC 차원 수(0차원 포함)
        self.num_ceps = num_ceps
        #리프터링 매개변수
        self.lifter_coef = lifter_coef
        # 저주파수 대역 제거 절단 주파수[Hz]
        self.low_frequency = low_frequency
        # 고주파수 대역 제거 절단 주파수[Hz]
        self.high_frequency = high_frequency
        #디더링 계수
        self.dither_coef = dither
        # FFT 포인트 수 = 창폭 이상의 2제곱
        self.fft_size=1
        while self.fft_size < self.frame_size:
            self.fft_size *=2
        
        # Mel 필터뱅크를 작성
        self.mel_filter_bank = self.MakeMelFilterBank()
        
        self.dct_matrix = self.MakeDCTMatrix()
        
        self.lifter = self.MakeLifter()
        
    def Herz2Mel(self, herz):
        return (1127.0 * np.log(1.0 + herz / 700))
    def MakeMelFilterBank(self):
        '''Mel 필터뱅크 생성
        '''
        #Mel 축에서 최대 주파수
        mel_high_freq = self.Herz2Mel(self.high_frequency)
        # Mel 축에서 최소 주파수
        mel_low_freq = self.Herz2Mel(self.low_frequency)
        #최소에서 최대 주파수 까지
        #Mel 축 위에서 동일 간격으로 주파수를 취함
        mel_points = np.linspace(mel_low_freq, mel_high_freq, self.num_mel_bins+2)
        #파워 스펙트럼 차원 수 = FFT사이즈 /2 + 1
        dim_spectrum = int(self.fft_size /2 +1)
        #멜 필터 뱅크(필터 수 * 스펙트럼 차원 수)
        mel_filter_bank = np.zeros((self.num_mel_bins, dim_spectrum))
        for m in range(self.num_mel_bins):
            left_mel = mel_points[m]
            center_mel = mel_points[m+1]
            right_mel = mel_points[m+2]
            #파워 스펙트럼의 각 bin에 대응하는 가중치 계산
            for n in range(dim_spectrum):
                #각 bin에 대응하는 Hz축 주파수 계산
                freq = 1.0 * n *self.sample_freq/2 /dim_spectrum
                # Mel 주파수로 변환
                mel = self.Herz2Mel(freq)
                
                if mel > left_mel and mel < right_mel :
                    if mel <= center_mel:
                        weight=(mel - left_mel) / (center_mel - left_mel)
                    else:
                        weight=(right_mel- mel)/ (right_mel - center_mel)
                    mel_filter_bank[m][n]=weight
                    
        return mel_filter_bank
    
    def ExtractWindow(self, waveform, start_index, num_samples):
        '''
        1레임 분량의 파형 데이터를 추출하고 전처리하고, 로그 파워값을 계산
        '''
        window = waveform[start_index:start_index + self.frame_size].copy()
        
        #디더링
        # (-dither coef ~ dither_coef 사이 값을 난수로 추가)
        if self.dither_coef >0:
            window = window +np.random.rand(self.frame_size) *(2 *self.dither_coef)- self.dither_coef
        #직류 성분 제거
        window= window- np.mean(window)
        # 아래 처리를 실행하기 전에 파워를 구함
        power= np.sum(window **2)
        #로그 계산시 -inf가 출력되지 않도록 플로어링 처리
        if power < 1E-10:
            power = 1E-10
        #로그를 취한다
        log_power = np.log(power)
        
        #Pre Emphasis(고역 강조)
        #window[i]= 1.0 *window[i] -0.97 *window[i-1]
        window = np.convolve(window, np.array([1.0, -0.97]), mode='same')# numpy 이동평균, 두 배열의 선형 컨볼루션을 반환.
        #numpy convolve는 0번째 요소는 처리되지 않기 때문에(window[i-1]이 없기에,window[0-1]을 window[0]으로 대체하여 처리한다
        window[0]-=0.97*window[0]
        
        #해밍 창 함수를 적용
        window *=np.hamming(self.frame_size)
        
        return window, log_power
    
    def ComputeFBANK(self, waveform):
        '''
        출력1: fbank_features: 로그 Mel 필터 뱅크 특징
        출력2: log_power: 로그 파워값(MFCC 추출시에 사용)
        '''
        #파형 데이터 총 샘플 수
        num_samples = np.size(waveform)
        # 특징값의 총 프레임 수를 계산
        num_frames = (num_samples - self.frame_size) // self.frame_shift + 1
        # Mel 필터 뱅크 특징
        fbank_features = np.zeros((num_frames, self.num_mel_bins))
        # 로그 파워(MFCC 특성 구할때 사용)
        log_power = np.zeros(num_frames)
        #1프레임마다 특징값을 계산한다
        for frame in range(num_frames):
            #분석 시작 위치는
            # 프레임 번호(0에서 시작) * 프레임 시프트
            start_index = frame * self.frame_shift
            # 1프레임 분량의 파형을 추출하여 전처리를 수행하고, 로그값도얻는다 
            window, log_pow = self.ExtractWindow(waveform, start_index, num_samples)
            #파워 스펙트럼을 계산
            spectrum = np.fft.fft(window, n=self.fft_size)
            spectrum = spectrum[:int(self.fft_size/2)+1]
            spectrum = np.abs(spectrum) ** 2
            
            #Mel 필터 뱅크 계산
            fbank = np.dot(spectrum, self.mel_filter_bank.T)
            
            #로그 게산시 -inf가 출력되지 않도록 플로어링 처리
            fbank[fbank<0.1] = 0.1
            
            #로그를 취해서 fbank_features에 첨가한다
            fbank_features[frame] = np.log(fbank)
            
            # 로그 파워 값을 log_power에 첨가
            log_power[frame] = log_pow
            
        return fbank_features, log_power
        
    def MakeDCTMatrix(self):
        ''' 이산 코사인 변환(DCT)의 기저 행렬을 작성
        '''
        N = self.num_mel_bins
        # DCT 기저행렬(기저수(=MFCC 차원수) X FBANK 차원 수)
        dct_matrix = np.zeros((self.num_ceps, self.num_mel_bins))
        for k in range(self.num_ceps):
            if k == 0:
                dct_matrix[k]= np.ones(self.num_mel_bins) * 1.0 / np.sqrt(N)
            else:
                dct_matrix[k] = np.sqrt(2/N) * np.cos((2.0 * np.arange(N)+1*k*np.pi) / (2*N))
        
        return dct_matrix
    
    def MakeLifter(self):
        '''
        리프터 계산하기
        '''
        Q = self.lifter_coef
        I = np.arange(self.num_ceps)
        lifter = 1.0 + 0.5 * Q * np.sin(np.pi * I / Q)
        return lifter
    
    def ComputeMFCC(self, waveform):
        '''
        MFCC 계산하기
        '''
        
        #FBANK 및 로그 파워 계산하기
        fbank, log_power = self.ComputeFBANK(waveform)
        
        #DCT 기저 행렬과 곱셈으로 DCT 구현함
        
        mfcc = np.dot(fbank, self.dct_matrix.T)
        #리프터링하기
        mfcc *=self.lifter
        
        #MFCC 0차원 값을 전처리 하기전에
        #파형 로그 파워로 치환한다
        mfcc[:,0] = log_power
        
        return mfcc


In [30]:
if __name__ == '__main__':
    
    train_small_wav_scp = "./data/label/train_small/wav.scp"
    train_small_out_dir = './mfcc/train_small/'
    train_large_wav_scp = './data/label/train_large/wav.scp'
    train_large_out_dir = './mfcc/train_large/'
    dev_wav_scp = './data/label/dev/wav.scp'
    dev_out_dir = './mfcc/dev/'
    test_wav_scp = './data/label/test/wav.scp'
    test_out_dir = './mfcc/test/'
    
    # 샘플링 주파수 [Hz]
    sample_frequency = 16000
    # 프레임 길이 [msec]
    frame_length = 25
    # 프레임 스프트[msec]
    frame_shift = 10
    # 저주파수 대역 제거 절단 주파수 [Hz]
    low_frequency = 20
    # 고주파수 대역 제거 절단 주파수 [Hz]
    high_frequency = sample_frequency / 2
    # 로그 Mel 필터 뱅크 feature 차원 수
    num_mel_bins = 40
    #디더링 계수
    dither = 1.0
    
    feat_extractor=FeatureExtractor(sample_frequency=sample_frequency,
                                   frame_length=frame_length,
                                   frame_shift=frame_shift,
                                   num_mel_bins=num_mel_bins,
                                   low_frequency=low_frequency,
                                   high_frequency=high_frequency,
                                   dither=dither)
    # wav 파일 목록과 출력 위치를 리스트로 생성
    wav_scp_list = [train_small_wav_scp,
                   train_large_wav_scp,
                   dev_wav_scp,
                   test_wav_scp]
    out_dir_list = [train_small_out_dir,
                   train_large_out_dir,
                   dev_out_dir,
                   test_out_dir]
    
    for (wav_scp, out_dir) in zip(wav_scp_list, out_dir_list):
        print('Input wav scp: %s' % (wav_scp))
        print('Output directory: %s' % (out_dir))
        
        #특징값 파일경로, 프레임 수,
        #차원수를 기록한 리스트
        feat_scp=os.path.join(out_dir, 'feats.scp')
        
        os.makedirs(out_dir, exist_ok=True)
        
        #wav 목록 읽기 모드
        # 특징값 리스트 쓰기 모드로 열기
        with open (wav_scp, mode='r') as file_wav, \
                open (feat_scp, mode='w') as file_feat:
            # wav 목록을 1행씩 읽어온다
            for line in file_wav:
                # 각 행에는 발화 ID와 wav 파일 경로가
                # 스페이스로 구분되어 있으므로
                # split 함수를 써서 스페이스 구분 행을
                # 리스트형 변수로 변환
                parts = line.split()
                # 0번째가 발화 ID
                utterance_id = parts[0]
                # 1번째가 wav 파일 경로
                wav_path = parts[1]
                
                with wave.open(wav_path) as wav:
                    if wav.getframerate() !=sample_frequency:
                        sys.stderr.write("The expected sampling rate is 16000")
                        exit(1)
                        
                    if wav.getnchannels() != 1:
                        sys.stderr.write('This program supports monaural wav file only.\n')
                        exit(1)
                        
                    num_samples = wav.getnframes()
                    
                    waveform = wav.readframes(num_samples)
                    
                    waveform = np.frombuffer(waveform, dtype=np.int16) #바이너리 읽기
                    
                    mfcc = feat_extractor.ComputeMFCC(waveform)
                    
                (num_frames, num_dims) = np.shape(mfcc)
                
                out_file = os.path.splitext(os.path.basename(wav_path))[0]
                out_file = os.path.join(os.path.abspath(out_dir), out_file + '.bin')
                
                mfcc = mfcc.astype(np.float32)
                
                mfcc.tofile(out_file)
                
                file_feat.write("%s %s %d %d\n" %
                    (utterance_id, out_file, num_frames, num_dims))

Input wav scp: ./data/label/train_small/wav.scp
Output directory: ./mfcc/train_small/
Input wav scp: ./data/label/train_large/wav.scp
Output directory: ./mfcc/train_large/
Input wav scp: ./data/label/dev/wav.scp
Output directory: ./mfcc/dev/
Input wav scp: ./data/label/test/wav.scp
Output directory: ./mfcc/test/
