# 语音识别
#### 顾翔，沈贝宁、那铭心、季诚
#### 本题由顾翔负责
---
## 运行环境
* python3
* 安装有pandas、scikit-learn、numpy库
* 运行软件：Anaconda、jupyter notebook
----
* 注：本题使用到的算法（特征处理STFT，训练GMM-HMM），过于复杂，因此代码参考较多

### 一、数据处理
* 该次作业数据处理极为复杂，首先,需要从文件汇总.wav数据集，根据语音样本转化为矩阵
* 接着对数据进行短时傅里叶变换STFT，首先语音将会被分成几个小块，然后对每一小块进行傅里叶变换FFT，得到一个二维的FFT“图片”，这个“图片”即我们通常所说的频谱。选择不同的FFT的大小会得到不同频率分辨率下的结果，而重叠这些窗口可以让我们以增加数据大小为代价来控制时间分辨率。
* 将STFT后的数据使用滑窗提取峰值，最终得到的是一个 105×6×216 的三维数组，最终每一个语音波形都得到216个重叠帧，每一帧用最大的6个峰值对应的频率来描述。

In [16]:
fpaths = [] # 文件路径
labels = [] # 每一个语音文件所包含的单词内容
spoken = [] # 所有语音文件所包含的单词内容列表
import os
for f in os.listdir('./audio')[0:]: 
    for w in os.listdir('./audio/' + f):
        fpaths.append('./audio/' + f + '/' + w)
        labels.append(f)
        if f not in spoken:
            spoken.append(f)
print('Words spoken:', spoken)

Words spoken: ['apple', 'banana', 'kiwi', 'lime', 'orange', 'peach', 'pineapple']


In [17]:
from scipy.io import wavfile
import numpy as np

data = np.zeros((len(fpaths), 32000)) # 每一行为一个语音样本，每一列为样本的一个特征
maxsize = -1 # 最大的列数，每一个音频文件的时长不同，所以在循环中需要不断更改
for n,file in enumerate(fpaths):
    _, d = wavfile.read(file)
    data[n, :d.shape[0]] = d
    if d.shape[0] > maxsize:
        maxsize = d.shape[0]
data = data[:, :maxsize]

#Each sample file is one row in data, and has one entry in labels
print('Number of files total:', data.shape[0])
all_labels = np.zeros(data.shape[0])
for n, l in enumerate(set(labels)):
    all_labels[np.array([i for i, _ in enumerate(labels) if _ == l])] = n
print('data',data.shape)
print('Labels and label indices', all_labels)

Number of files total: 105
data (105, 6966)
Labels and label indices [5. 5. 5. 5. 5. 5. 5. 5. 5. 5. 5. 5. 5. 5. 5. 6. 6. 6. 6. 6. 6. 6. 6. 6.
 6. 6. 6. 6. 6. 6. 2. 2. 2. 2. 2. 2. 2. 2. 2. 2. 2. 2. 2. 2. 2. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1.
 1. 1. 1. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 4. 4. 4. 4. 4. 4.
 4. 4. 4. 4. 4. 4. 4. 4. 4.]


In [18]:
import scipy

"""
函数 stft()：实现短时傅里叶变换STFT
输入参数 x：原始数据，ndarray；
        fftsize：FFT大小，默认为64
        overlap_pct：重叠窗口大小(FFT大小的倍数)，默认为0.5
输出参数 raw[:,:(fftsize//2)]：二维STFT谱
"""
def stft(x, fftsize=64, overlap_pct=.5):
    hop = int(fftsize * (1 - overlap_pct))
    w = scipy.hanning(fftsize + 1)[:-1] # 转换为汉明窗口
    raw = np.array([np.fft.rfft(w * x[i:i + fftsize]) for i in range(0, len(x) - fftsize, hop)])
    return raw[:, :(fftsize // 2)]

In [19]:
from numpy.lib.stride_tricks import as_strided

"""
函数 peakfind()：STFT的峰值检测
输入参数 x：STFT谱；
        n_peaks：最终选择峰值数
        l_size：窗口左半部分大小，默认为3
        r_size：窗口中间部分大小，默认为3
        c_size：窗口右半部分大小，默认为3
        f：对每部分采用的统计函数，默认为均值np.mean
输出参数 heights：波峰对应的高度（幅值）
        top[:n_peaks]：前n_peaks个波峰的频率值
"""
def peakfind(x, n_peaks, l_size=3, r_size=3, c_size=3, f=np.mean):
    win_size = l_size + r_size + c_size
    shape = x.shape[:-1] + (x.shape[-1] - win_size + 1, win_size)
    strides = x.strides + (x.strides[-1],)
    xs = as_strided(x, shape=shape, strides=strides)
    def is_peak(x):
        centered = (np.argmax(x) == l_size + int(c_size/2))
        l = x[:l_size]
        c = x[l_size:l_size + c_size]
        r = x[-r_size:]
        passes = np.max(c) > np.max([f(l), f(r)])
        if centered and passes:
            return np.max(c)
        else:
            return -1
    r = np.apply_along_axis(is_peak, 1, xs)
    top = np.argsort(r, None)[::-1]
    heights = r[top[:n_peaks]]
    top[top > -1] = top[top > -1] + l_size + int(c_size / 2.)
    return heights, top[:n_peaks]

In [20]:
all_obs = []
for i in range(data.shape[0]):
    d = np.abs(stft(data[i, :]))
    n_dim = 6
    obs = np.zeros((n_dim, d.shape[0]))
    for r in range(d.shape[0]):
        _, t = peakfind(d[r, :], n_peaks=n_dim)
        obs[:, r] = t.copy()
    if i % 10 == 0:
        print("Processed obs %s" % i)
    all_obs.append(obs)
    
all_obs = np.atleast_3d(all_obs)
print (all_obs.shape)

  w = scipy.hanning(fftsize + 1)[:-1] # 转换为汉明窗口


Processed obs 0
Processed obs 10
Processed obs 20
Processed obs 30
Processed obs 40
Processed obs 50
Processed obs 60
Processed obs 70
Processed obs 80
Processed obs 90
Processed obs 100
(105, 6, 216)


### 二、GMM-HMM
* GMM决定了隐马尔科夫模型中状态与输入语音帧之间的符合情况，HMM用来处理在时间轴上的声学可变性
* HMM使用前后向算法

In [21]:
import scipy.stats as st
import numpy as np

class GmmHmm:
    
    def __init__(self, n_states):
        self.n_states = n_states
        self.random_state = np.random.RandomState(0)
        
        # 随机初始化
        self.prior = self._normalize(self.random_state.rand(self.n_states, 1))
        self.A = self._stochasticize(self.random_state.rand(self.n_states, self.n_states))
        
        self.mu = None
        self.covs = None
        self.n_dims = None
           
    def _forward(self, B):
        log_likelihood = 0.
        T = B.shape[1]
        alpha = np.zeros(B.shape)
        for t in range(T):
            if t == 0:
                alpha[:, t] = B[:, t] * self.prior.ravel()
            else:
                alpha[:, t] = B[:, t] * np.dot(self.A.T, alpha[:, t - 1])
         
            alpha_sum = np.sum(alpha[:, t])
            alpha[:, t] /= alpha_sum
            log_likelihood = log_likelihood + np.log(alpha_sum)
        return log_likelihood, alpha
    
    def _backward(self, B):
        T = B.shape[1]
        beta = np.zeros(B.shape);
           
        beta[:, -1] = np.ones(B.shape[0])
            
        for t in range(T - 1)[::-1]:
            beta[:, t] = np.dot(self.A, (B[:, t + 1] * beta[:, t + 1]))
            beta[:, t] /= np.sum(beta[:, t])
        return beta
    
    def _state_likelihood(self, obs):
        obs = np.atleast_2d(obs)
        B = np.zeros((self.n_states, obs.shape[1]))
        for s in range(self.n_states):
            #N 这一部分需要 0.14版本以上的Scipy
            np.random.seed(self.random_state.randint(1))
            B[s, :] = st.multivariate_normal.pdf(
                obs.T, mean=self.mu[:, s].T, cov=self.covs[:, :, s].T)
        return B
    
    def _normalize(self, x):
        return (x + (x == 0)) / np.sum(x)
    
    def _stochasticize(self, x):
        return (x + (x == 0)) / np.sum(x, axis=1)
    
    def _em_init(self, obs):
        
        if self.n_dims is None:
            self.n_dims = obs.shape[0]
        if self.mu is None:
            subset = self.random_state.choice(np.arange(self.n_dims), size=self.n_states, replace=False)
            self.mu = obs[:, subset]
        if self.covs is None:
            self.covs = np.zeros((self.n_dims, self.n_dims, self.n_states))
            self.covs += np.diag(np.diag(np.cov(obs)))[:, :, None]
        return self
    
    def _em_step(self, obs): 
        obs = np.atleast_2d(obs)
        B = self._state_likelihood(obs)
        T = obs.shape[1]
        
        log_likelihood, alpha = self._forward(B)
        beta = self._backward(B)
        
        xi_sum = np.zeros((self.n_states, self.n_states))
        gamma = np.zeros((self.n_states, T))
        
        for t in range(T - 1):
            partial_sum = self.A * np.dot(alpha[:, t], (beta[:, t] * B[:, t + 1]).T)
            xi_sum += self._normalize(partial_sum)
            partial_g = alpha[:, t] * beta[:, t]
            gamma[:, t] = self._normalize(partial_g)
              
        partial_g = alpha[:, -1] * beta[:, -1]
        gamma[:, -1] = self._normalize(partial_g)
        
        expected_prior = gamma[:, 0]
        expected_A = self._stochasticize(xi_sum)
        
        expected_mu = np.zeros((self.n_dims, self.n_states))
        expected_covs = np.zeros((self.n_dims, self.n_dims, self.n_states))
        
        gamma_state_sum = np.sum(gamma, axis=1)
        #在除法运算之前，将0改为1
        gamma_state_sum = gamma_state_sum + (gamma_state_sum == 0)
        
        for s in range(self.n_states):
            gamma_obs = obs * gamma[s, :]
            expected_mu[:, s] = np.sum(gamma_obs, axis=1) / gamma_state_sum[s]
            partial_covs = np.dot(gamma_obs, obs.T) / gamma_state_sum[s] - np.dot(expected_mu[:, s], expected_mu[:, s].T)
            # 对称
            partial_covs = np.triu(partial_covs) + np.triu(partial_covs).T - np.diag(partial_covs)
        
        #加上对角元素以保证半正定
        expected_covs += .01 * np.eye(self.n_dims)[:, :, None]
        
        self.prior = expected_prior
        self.mu = expected_mu
        self.covs = expected_covs
        self.A = expected_A
        return log_likelihood
    
    def fit(self, obs, n_iter=15):
        # obj支持2D或者3D数组
        # 2D数组的形式为 n_features, n_dims
        # 3D数组的形式为 n_examples, n_features, n_dims
        # 例如一个语音片段若包含6个特征，并且由105个不同的样本
        # 那么这个obj数组为三维，大小为(105,6,X)，其中X为frames的个数
        # 对于只包含一个样本的数据obj大小为(6,X)
        if len(obs.shape) == 2:
            for i in range(n_iter):
                self._em_init(obs)
                log_likelihood = self._em_step(obs)
        elif len(obs.shape) == 3:
            count = obs.shape[0]
            for n in range(count):
                for i in range(n_iter):
                    self._em_init(obs[n, :, :])
                    log_likelihood = self._em_step(obs[n, :, :])
        return self
    
    def transform(self, obs):
        # obj支持2D或者3D数组
        # 2D数组的形式为 n_features, n_dims
        # 3D数组的形式为 n_examples, n_features, n_dims
        # 例如一个语音片段若包含6个特征，并且由105个不同的样本
        # 那么这个obj数组为三维，大小为(105,6,X)，其中X为frames的个数
        # 对于只包含一个样本的数据obj大小为(6,X)
        if len(obs.shape) == 2:
            B = self._state_likelihood(obs)
            log_likelihood, _ = self._forward(B)
            return log_likelihood
        elif len(obs.shape) == 3:
            count = obs.shape[0]
            out = np.zeros((count,))
            for n in range(count):
                B = self._state_likelihood(obs[n, :, :])
                log_likelihood, _ = self._forward(B)
                out[n] = log_likelihood
            return out

In [22]:
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(all_obs,
                                                    all_labels,
                                                    train_size=0.8,
                                                    test_size=0.2,
                                                    random_state =666,
                                                    stratify=all_labels)

# 由于HMM得到的结果是概率，所以我们需要对特征进行归一化
for n,i in enumerate(all_obs):
    all_obs[n] /= all_obs[n].sum(axis=0)

print('Size of training matrix:', X_train.shape)
print('Size of testing matrix:', X_test.shape)

Size of training matrix: (84, 6, 216)
Size of testing matrix: (21, 6, 216)


### 三、结果分析
* 效果并不是十分理想

In [23]:
ys = set(all_labels)
ms = [GmmHmm(6) for y in ys]
_ = [m.fit(X_train[y_train == y, :, :]) for m, y in zip(ms, ys)]
ps = [m.transform(X_test) for m in ms]
res = np.vstack(ps)
predicted_labels = np.argmax(res, axis=0)
missed = (predicted_labels != y_test)
print('Test accuracy: %.2f percent' % (100 * (1 - np.mean(missed))))

  alpha[:, t] /= alpha_sum
  log_likelihood = log_likelihood + np.log(alpha_sum)
  beta[:, t] /= np.sum(beta[:, t])


Test accuracy: 14.29 percent
