<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Requirement" data-toc-modified-id="Requirement-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Requirement</a></span></li><li><span><a href="#Load-Data" data-toc-modified-id="Load-Data-2"><span class="toc-item-num">2&nbsp;&nbsp;</span>Load Data</a></span></li><li><span><a href="#Hyperparameters" data-toc-modified-id="Hyperparameters-3"><span class="toc-item-num">3&nbsp;&nbsp;</span>Hyperparameters</a></span></li><li><span><a href="#Split-Dataset" data-toc-modified-id="Split-Dataset-4"><span class="toc-item-num">4&nbsp;&nbsp;</span>Split Dataset</a></span></li><li><span><a href="#Pre-Processing" data-toc-modified-id="Pre-Processing-5"><span class="toc-item-num">5&nbsp;&nbsp;</span>Pre-Processing</a></span><ul class="toc-item"><li><span><a href="#Split-Blocks-For-Train" data-toc-modified-id="Split-Blocks-For-Train-5.1"><span class="toc-item-num">5.1&nbsp;&nbsp;</span>Split Blocks For Train</a></span></li><li><span><a href="#Split-Blocks-For-Test" data-toc-modified-id="Split-Blocks-For-Test-5.2"><span class="toc-item-num">5.2&nbsp;&nbsp;</span>Split Blocks For Test</a></span></li><li><span><a href="#Block-to-Sequence" data-toc-modified-id="Block-to-Sequence-5.3"><span class="toc-item-num">5.3&nbsp;&nbsp;</span>Block to Sequence</a></span></li><li><span><a href="#Get-Trainig-Sequence" data-toc-modified-id="Get-Trainig-Sequence-5.4"><span class="toc-item-num">5.4&nbsp;&nbsp;</span>Get Trainig Sequence</a></span></li><li><span><a href="#Get-Test-Sequence" data-toc-modified-id="Get-Test-Sequence-5.5"><span class="toc-item-num">5.5&nbsp;&nbsp;</span>Get Test Sequence</a></span></li></ul></li><li><span><a href="#Hidden-Markov-Model" data-toc-modified-id="Hidden-Markov-Model-6"><span class="toc-item-num">6&nbsp;&nbsp;</span>Hidden Markov Model</a></span><ul class="toc-item"><li><span><a href="#Parameters" data-toc-modified-id="Parameters-6.1"><span class="toc-item-num">6.1&nbsp;&nbsp;</span>Parameters</a></span></li><li><span><a href="#Initialize-Model" data-toc-modified-id="Initialize-Model-6.2"><span class="toc-item-num">6.2&nbsp;&nbsp;</span>Initialize Model</a></span></li><li><span><a href="#Training" data-toc-modified-id="Training-6.3"><span class="toc-item-num">6.3&nbsp;&nbsp;</span>Training</a></span></li><li><span><a href="#Test" data-toc-modified-id="Test-6.4"><span class="toc-item-num">6.4&nbsp;&nbsp;</span>Test</a></span></li><li><span><a href="#Evaluate" data-toc-modified-id="Evaluate-6.5"><span class="toc-item-num">6.5&nbsp;&nbsp;</span>Evaluate</a></span></li></ul></li></ul></div>

# Requirement
+ Python 3.7
+ numpy 数据科学计算库 1.17.2
+ matplotlib 画图库 3.1.1
+ sklearn 数据科学处理库 0.22
+ hmmlearn 隐马尔可夫模型库 0.2.2

In [1]:
%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from IPython.core.interactiveshell import InteractiveShell
from sklearn.metrics import accuracy_score
from hmmlearn import hmm

from fetch_ORL_people import fetch_ORL_people
InteractiveShell.ast_node_interactivity = "all"
np.set_printoptions(precision=3)

# Load Data
读取ORL数据集，数据集中共40个人脸，每个人10张图片，共计400张图片，存入`faces`
`faces`具有以下属性：
+ `data`: (400,)
+ `images`: (400,56,46) 表示400张56X46的图片
+ `target`: (400,) 400张图片对应的人，即标签，用0-39的数字表示
+ `target_names`: (40,) 表示0-39每个数字对应的人名

在加载数据集后已经做好了如下预处理：
+ 将原来(112,92)大小的照片缩放为(56,46)大小
+ 将原图片通过一个(3,3)的最小值滤波器

In [2]:
faces = fetch_ORL_people('../Data')
faces.keys()

dict_keys(['data', 'images', 'target', 'target_names'])

# Hyperparameters
+ `coeff`：量化范围
+ `eps`：一个微小常数，防止除零错误
+ `n_state`：隐藏状态数
+ `n_obs`：观测状态数
+ `n_person`：人脸类别数
+ `random_split`：True 则随机划分，否则（默认）按原论文划分

In [3]:
coeff = np.array([18., 10., 7.])
eps = 1e-6
n_state = 7
n_obs = 1260
n_person = 40
random_split = False

# Split Dataset
两种划分方法
+ 随机层次划分，训练集和测试集各50%
+ 按原论文划分，取每个人脸第1, 5, 6, 7, 10张图片作为训练集，其余作为测试集


In [4]:
if random_split:
    idx_train, idx_test = train_test_split(np.arange(faces.images.shape[0]), test_size=0.5, stratify=faces.target, random_state=None)
else:
    idx_train = np.tile([0,4,5,7,9], n_person)
    idx_test = np.tile([1,2,3,6,8], n_person)
    for i in range(n_person):
        idx_train[i*5:i*5+5] += i*10
        idx_test[i*5:i*5+5] += i*10

train_faces = faces.images[idx_train]
y_train = faces.target[idx_train]
test_faces = faces.images[idx_test]
y_test = faces.target[idx_test]


# Pre-Processing
## Split Blocks For Train
按照原论文描述，对训练集每张图片做如下处理：
+ 将一张(56,46)大小的图片分割成52个有重叠的块
+ 将52个块分别做SVD奇异值分解得到三个矩阵 $U,S,V$
+ 取$U$矩阵第一个元素，$S$矩阵第一第二个元素，一共三个数作为该块的一个表示，即
$block_i=[U(0,0),S(0),S(1)]$
+ 保存每个块中，三个数的最大值可最小值，方便后面做归一量化（**Quantize**）

In [5]:
def split_block_cell_for_train(faces, n_blocks=52):
    n_faces = faces.shape[0]
    max_coeffs = np.array([-np.inf, -np.inf, -np.inf])
    min_coeffs = np.array([np.inf, 0, 0])
    block_cell = np.zeros(shape=(n_faces, n_blocks, 3))

    for faces_index in range(n_faces):
        img = faces[faces_index]
        for block_index in range(n_blocks):
            block = img[block_index:block_index + 5]
            U, S, V = np.linalg.svd(block)
            block_coeffs = np.array([U[0,0], S[0], S[1]])
            min_coeffs = np.minimum(block_coeffs, min_coeffs)
            max_coeffs = np.maximum(block_coeffs, max_coeffs)
            block_cell[faces_index, block_index] = block_coeffs
            
    return min_coeffs, max_coeffs, block_cell

## Split Blocks For Test
利用训练集获得的信息：
+ min_coeffs
+ max_coeffs
+ delta

对测试集的图片进行类似的处理：
+ 将一张(56,46)大小的图片分割成52个有重叠的块
    + $52=\frac{(H-L)}{L-p}$，其中$H$是图片的高度(56)，$L$是块的高度(5)，$P$是块之间重叠的长度(4)
+ 将52个块分别做SVD奇异值分解得到三个矩阵 $U,S,V$
+ 取$U$矩阵第一个元素，$S$矩阵第一第二个元素，一共三个数作为该块的一个表示，即
$block_i=[U(0,0),S(0),S(1)]$
+ 保证每个块的三个数位于训练集的最大值和最小值区间内，方便后面做归一量化（**Quantize**）

In [6]:
def split_block_cell_for_test(faces, min_coeffs, max_coeffs, n_blocks=52):
    n_faces = faces.shape[0]
    block_cell = np.zeros(shape=(n_faces, n_blocks, 3))
    
    for faces_index in range(n_faces):
        img = faces[faces_index]
        for block_index in range(n_blocks):
            block = img[block_index:block_index + 5]
            U, S, V = np.linalg.svd(block)
            block_coeffs = np.array([U[0,0], S[0], S[1]])
            block_coeffs = np.minimum(block_coeffs, max_coeffs)
            block_coeffs = np.maximum(block_coeffs, min_coeffs)
            block_cell[faces_index, block_index] = block_coeffs
            
    return block_cell

## Block to Sequence
+ 将每个由三个数表示的块(block)变成一个整数表示，即将$block_i=[U(0,0),S(0),S(1)] \rightarrow label \in \mathbb{Z}$
+ 转化原理如下： 先将每个bolck中的三个数分别量化至 [18,10,7]，获得一个三位数，假设这个三位数为 $abc$
    + 可以这样理解，a为18进制，b为10进制，c为7进制
    + 将$abc$转化为一个整数，转化方法即为：$label = a \times 10\times 7 + b \times 7 + c$
    + 显然，$label \in [0,1260)$，其中 $1260=18 \times 10 \times 7$
+ 通过这种方法，将每张图片转化成为一个长度为52的序列表示，一共有200张图片，则有(200,52)的序列矩阵

In [7]:
def block_to_seq(block_cell, min_coeffs, delta):
    n_faces, n_blocks, _ = block_cell.shape
    seq_matrix = np.zeros(shape=(n_faces, n_blocks))
    for faces_index in range(n_faces):
        for block_index in range(n_blocks):
            block_coeffs = block_cell[faces_index, block_index]
            Q_t = np.floor((block_coeffs - min_coeffs) / delta)
            label = Q_t[0]*10*7 + Q_t[1]*7 + Q_t[2]
            seq_matrix[faces_index, block_index] = label
    return seq_matrix

## Get Trainig Sequence
+ 从训练集获取三个信息
    + min_coeffs
    + max_coeffs
    + delta
+ 将训练集转化为序列矩阵(200,52)


In [8]:
min_coeffs, max_coeffs, train_block = split_block_cell_for_train(train_faces)
delta = (max_coeffs - min_coeffs) / (coeff - eps)
seq_train = block_to_seq(train_block, min_coeffs, delta)
seq_train = seq_train.astype(np.int32)

## Get Test Sequence
+ 根据训练集获取的三个信息
    + min_coeffs
    + max_coeffs
    + delta
+ 将测试集转化为序列矩阵(200,52)

In [9]:
test_block = split_block_cell_for_test(test_faces, min_coeffs, max_coeffs)
seq_test = block_to_seq(test_block, min_coeffs, delta)
seq_test = seq_test.astype(np.int32)

# Hidden Markov Model
## Parameters
按照原论文描述，定义马尔科夫模型的三个参数
+ 初始概率分布`startprob`
+ （隐）状态转移矩阵`transmat`
+ 发射矩阵`emissionprob`

In [10]:
startprob = np.zeros(n_state)
startprob[0] = 1.

transmat = np.zeros([n_state, n_state]) + eps
transmat[-1, -1] = 1.
for i in range(n_state-1):
    transmat[i, i] = 0.6
    transmat[i, i+1] = 0.4

transmat /= transmat.sum(1, keepdims=True)
emissionprob = np.ones([n_state, n_obs]) / n_obs


print(f'初始概率分布 {startprob.shape}:\n {startprob}')
print(f'状态转移矩阵 {transmat.shape}:\n {transmat}')
print(f'发射矩阵 {emissionprob.shape}:\n {emissionprob}')

初始概率分布 (7,):
 [1. 0. 0. 0. 0. 0. 0.]
状态转移矩阵 (7, 7):
 [[6.e-01 4.e-01 1.e-06 1.e-06 1.e-06 1.e-06 1.e-06]
 [1.e-06 6.e-01 4.e-01 1.e-06 1.e-06 1.e-06 1.e-06]
 [1.e-06 1.e-06 6.e-01 4.e-01 1.e-06 1.e-06 1.e-06]
 [1.e-06 1.e-06 1.e-06 6.e-01 4.e-01 1.e-06 1.e-06]
 [1.e-06 1.e-06 1.e-06 1.e-06 6.e-01 4.e-01 1.e-06]
 [1.e-06 1.e-06 1.e-06 1.e-06 1.e-06 6.e-01 4.e-01]
 [1.e-06 1.e-06 1.e-06 1.e-06 1.e-06 1.e-06 1.e+00]]
发射矩阵 (7, 1260):
 [[0.001 0.001 0.001 ... 0.001 0.001 0.001]
 [0.001 0.001 0.001 ... 0.001 0.001 0.001]
 [0.001 0.001 0.001 ... 0.001 0.001 0.001]
 ...
 [0.001 0.001 0.001 ... 0.001 0.001 0.001]
 [0.001 0.001 0.001 ... 0.001 0.001 0.001]
 [0.001 0.001 0.001 ... 0.001 0.001 0.001]]


## Initialize Model
+ 初始化模型的三个参数（初始概率，转移矩阵，发射矩阵），由于有四十个人（类别），因此需要40个隐马尔科夫模型，根据每个人的观测序列，更新优化各自的参数
+ hmm.MultinomialHMM 类各个参数含义：
    + `n_components`：隐状态个数
    + `n_features`：观测状态的个数（不是观测序列的长度！）
    + `tol`：模型训练时候的容忍度（决定模型何时收敛）
    + `n_iter`：模型最多训练次数（如果达到该训练次数模型还未收敛则停止）
    + `init_params`：是否需要模型（随机初始化）参数，可选`ste`分别代表初始状态分布，转移矩阵，发射矩阵，由于我们已经自定义了三个参数的初始化，因此这里只传入一个空字符串代表不进行随机初始化
    + `startprob_`：初始概率分布，大小 (n_components,)
    + `transmat_`： 转移矩阵，大小 (n_components, n_components)
    + `emissionprob_`：发射矩阵，大小 (n_components, n_features)

In [11]:
models = [hmm.MultinomialHMM(n_components=n_state, tol=1e-2, n_iter=5, init_params='') for _ in range(n_person)]
for model in models:
    model.startprob_ = startprob
    model.transmat_ = transmat
    model.emissionprob_ = emissionprob
    model.n_features = n_obs

## Training
每个模型各自进行训练，即为每个人（类别）取它的5个观测序列，从而优化它对应的模型的三个参数（初始概率，转移矩阵，发射矩阵），训练时间仅为695ms


In [12]:
%%time
for person_idx, model in enumerate(models):
    seq_idx = np.where(y_train==person_idx)[0]
    obs = seq_train[seq_idx]
    model.fit(obs.T)


CPU times: user 828 ms, sys: 0 ns, total: 828 ms
Wall time: 823 ms


## Test
取测试集的200个观测序列，分别计算每个模型对该观测序列的似然概率（即计算这个序列由该模型生成的概率值），生成测试结果为一个(200,40)的矩阵，代表着每个序列对应的模型的概率，用时稍长，2.98s

In [13]:
%%time
logprob = np.zeros((y_test.size, n_person))
for seq_idx, obs in enumerate(seq_test):
    for person_idx, model in enumerate(models):
        logprob[seq_idx, person_idx] = model.score(np.atleast_2d(obs))


CPU times: user 3.67 s, sys: 0 ns, total: 3.67 s
Wall time: 3.68 s


## Evaluate
评测准确率，取概率值最大的模型对应的类别，就是该图片的类别，准确率同原论文一样达到了96.5%

In [14]:
y_pred = logprob.argmax(1)
accuracy = accuracy_score(y_test, y_pred)
print(f'识别准确率 {accuracy*100}%.')

识别准确率 93.0%.
