#### 任务：基于HMM模型实现KTH视频数据的动作分类
1.	下载KTH数据集文件，并从数据集的视频中提取特征；奇数索引的视频数据作为训练集，偶数索引的视频数据作为测试集。
##### 实验1
2.	在提取特征后的训练数据的基础上，实现一个空间聚类算法（例如 K-Means，任何聚类算法均可）。
3.	为每类动作训练单独的隐马尔可夫模型（每个元音单独聚类，然后分别学习 HMM，输入是与每个2D点关联的聚类类别号）。对于每个测试数据，针对每个HMM计算其对数似然，即log P(O|M)，并获取给出最高对数似然的HMM类别，即对测试数据进行分类 判别。给出混淆矩阵并描述你的发现。
4.	改变2中的聚类数量（变量M）和3中隐藏节点的数量（变量N）并计算分类准确率。给出不同M和N取值下的混淆矩阵，并描述你的发现。
##### 实验2
5.	实现动态时间规整算法（Dynamic Time Warping），并重复1-4。
##### 实验3
6.	学习一个HMM模型，该HMM可以使用维特比解码(Viterbi Decoding)来执行分类（注意：本题目不是让你生成多个分类器并进行分类判别，而是生成一个HMM模型，可以执行多个类别判别）。将分类准确率与题目 3 和4的结果进行比较，并描述你的发现。




In [1]:
import cv2
import numpy as np
from skimage.transform import resize
from sklearn.cluster import KMeans
from hmmlearn import hmm
import os 
import matplotlib.pyplot as plt

In [None]:
提取特征

In [None]:
def extract_features_dt_f(video_file):
    cap = cv2.VideoCapture(video_file)
    frame_rate = cap.get(cv2.CAP_PROP_FPS)
    dense_sampling_interval = frame_rate
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    total_frames = min(frame_count, frame_rate * 100)  
    dense_trajectories = []
    prev_gray = None
    frame_number = 0
    hsv = np.zeros((160, 120, 3))
    while frame_number < total_frames:
        ret, frame = cap.read()
        if not ret:
            break
        frame = cv2.resize(frame, (120, 160))
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        if prev_gray is not None:
            flow = cv2.calcOpticalFlowFarneback(prev_gray, gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)
            mag, ang = cv2.cartToPolar(flow[..., 0], flow[..., 1])
            hsv[..., 0] = ang * 180 / np.pi / 2
            hsv[..., 2] = cv2.normalize(mag, None, 0, 255, cv2.NORM_MINMAX)
            dense_trajectories.append(hsv.flatten())
        
        prev_gray = gray
        frame_number += dense_sampling_interval
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)

    cap.release()
    return np.array(dense_trajectories),np.array(dense_trajectories).shape[0]

提取特征的示例

In [2]:
from datasets import extract_features_dt_f
# Example usage:
video_path = r"data\jogging\person01_jogging_d1_uncomp.avi"  
features,_ = extract_features_dt_f(video_path)
features = np.array(features)
print(f'Shape of features: {features.shape}')
num_frames=features.shape[0]
print(features)
n_features=features.shape[1]

Shape of features: (16, 57600)
[[ 1.79402969e+02  0.00000000e+00  6.97106495e-02 ...  1.33374924e+02
   0.00000000e+00  1.02969337e+00]
 [ 8.76478500e+01  0.00000000e+00  1.28269106e-01 ...  3.30666199e+01
   0.00000000e+00  2.07341144e-10]
 [ 7.53964996e+00  0.00000000e+00  3.57635784e+00 ...  1.70400116e+02
   0.00000000e+00 -1.43300571e-09]
 ...
 [ 9.47653046e+01  0.00000000e+00  1.15680850e+00 ...  8.50442047e+01
   0.00000000e+00  2.57939649e+00]
 [ 1.10593462e+01  0.00000000e+00  8.45990628e-02 ...  1.03541100e+02
   0.00000000e+00  3.94400731e-02]
 [ 9.13240356e+01  0.00000000e+00  7.83343017e-02 ...  6.04778633e+01
   0.00000000e+00  1.43436951e-09]]


读取数据

In [3]:

import pickle
#---------------------dt-----------------------
with open('Data/data_dt_f.pkl', 'rb') as f:
     data = pickle.load(f)
# 从.pkl文件加载数据
with open('Data/labels_dt.pkl', 'rb') as f:
    labels = pickle.load(f)
with open('Data/frames.pkl', 'rb') as f:
     frames = pickle.load(f)

In [4]:
# print(data_pad[103].shape)
print(data[399].shape)
#num_frames=data[399].shape[0]
n_features=data[399].shape[1]

(13, 57600)


In [5]:
assert len(data)==599
print(data[103].shape)
data[590].shape

(12, 57600)


(21, 57600)

In [6]:
data=np.array(data)

  data=np.array(data)


In [7]:
data.shape

(599,)

In [8]:
len(data[209])

17

将各个动作视频分开，因为后续要单独聚类

In [9]:
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle

actions = ["walking", "boxing", "handclapping", "jogging", "running", "handwaving"]
data_walk=data[0:100]
data_box=data[100:200]
data_handclap=data[200:299]
data_jog=data[299:399]
data_run=data[399:499]
data_wave=data[499:599]

frames_walk=frames[0:100]
frames_box=frames[100:200]
frames_handclap=frames[200:299]
frames_jog=frames[299:399]
frames_run=frames[399:499]
frames_wave=frames[499:599]

处理标签

In [10]:
labels_ = []  # 用来存储分割后的序列列表
for i,label in enumerate(labels):
    for j in range(frames[i]):
        labels_.append(label)
labels_ = np.array(labels_)

In [11]:
assert len(labels_)==11254

In [12]:
import numpy as np
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
# 假设 data 是您的数据集，形状为 (598, 50, 15552)
# 重新组织数据，将每个视频的帧、x坐标和y坐标合并成特征向量
data_walk=np.vstack(data_walk)
data_box=np.vstack(data_box)
data_handclap=np.vstack(data_handclap)
data_jog=np.vstack(data_jog)
data_run=np.vstack(data_run)
data_wave=np.vstack(data_wave)
data=np.vstack(data)

# 检查序列长度是否符合
assert data_walk.shape[0]==sum(frames_walk)
assert data_box.shape[0]==sum(frames_box)
assert data_handclap.shape[0]==sum(frames_handclap)
assert data_jog.shape[0]==sum(frames_jog)
assert data_run.shape[0]==sum(frames_run)
assert data_wave.shape[0]==sum(frames_wave)

In [13]:
# 对数据进行PCA降维
n_pca=260     #主成分个数
pca=PCA(n_components=n_pca)
pca.fit(data)
data_pca_walk = pca.transform(data_walk)
data_pca_box = pca.transform(data_box)
data_pca_handclap= pca.transform(data_handclap)
data_pca_jog = pca.transform(data_jog)
data_pca_run = pca.transform(data_run)
data_pca_wave = pca.transform(data_wave)
data_pca=pca.transform(data)

# 对每个数据点进行LDA降维
n_lda=5
lda =  LinearDiscriminantAnalysis(n_components=n_lda)
lda.fit(np.concatenate([data_pca_walk,data_pca_box,data_pca_handclap,data_pca_jog,data_pca_run,data_pca_wave]),labels_)
data_lda = lda.transform(data_pca)

In [None]:
# 绘制PCA降维后主成分的累积可解释方差比例的曲线
import matplotlib.pyplot as plt
plt.plot(np.cumsum(pca.explained_variance_ratio_))
plt.xlabel('Number of Principal Components')
plt.ylabel('Cumulative Explained Variance Ratio')
plt.title('Cumulative Explained Variance Ratio vs. Number of Components')
plt.show()

In [None]:
# 绘制LDA降维后的主成分累积可解释方差比例的曲线
import matplotlib.pyplot as plt
plt.plot(np.cumsum(lda.explained_variance_ratio_))
plt.xlabel('Number of Principal Components')
plt.ylabel('Cumulative Explained Variance Ratio')
plt.title('Cumulative Explained Variance Ratio vs. Number of Components')
plt.show()

对各个类别的视频进行单独聚类 

In [None]:
# 对各种视频单独进行KMeans聚类
num_clusters = 6  

kmeans_walk = KMeans(n_clusters=num_clusters, random_state=0,n_init=300)
clusters_walk = kmeans_walk.fit_predict(data_pca_walk) 

kmeans_box = KMeans(n_clusters=num_clusters, random_state=0,n_init=300)
clusters_box = kmeans_box.fit_predict(data_pca_box)

kmeans_handclap = KMeans(n_clusters=num_clusters, random_state=0,n_init=300)
clusters_handclap = kmeans_handclap.fit_predict(data_pca_handclap) 

kmeans_jog = KMeans(n_clusters=num_clusters, random_state=0,n_init=300)
clusters_jog = kmeans_jog.fit_predict(data_pca_jog) 

kmeans_run = KMeans(n_clusters=num_clusters, random_state=0,n_init=300)
clusters_run = kmeans_run.fit_predict(data_pca_run) 

kmeans_wave = KMeans(n_clusters=num_clusters, random_state=0,n_init=300)
clusters_wave = kmeans_wave.fit_predict(data_pca_wave) 

实验3中对所有数据进行一起聚类

In [None]:
# 对所有视频一起进行KMeans聚类
num_clusters_=18
kmeans = KMeans(n_clusters=num_clusters_, random_state=0,n_init=700,tol=0.00001)
clusters=kmeans.fit_predict(data_lda)

可视化聚类效果

In [None]:
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D

# 创建一个3D图形
fig = plt.figure(figsize=(22,9))
ax = fig.add_subplot(111,projection='3d')

# 根据聚类标签绘制不同颜色的数据点
for label in np.unique(clusters_walk):
    indices = np.where(clusters_walk == label)
    ax.scatter(data_pca_walk[indices, 0], data_pca_walk[indices, 1], data_pca_walk[indices, 2], label=f'Cluster {label}') 
    
ax.set_xlabel('Principal Component 1')
ax.set_ylabel('Principal Component 2')
ax.set_zlabel('Principal Component 3')
ax.set_title('KMeans Clustering Visualization')

plt.legend()
plt.show()

In [None]:
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D

# 创建一个3D图形
fig = plt.figure(figsize=(22,9))
ax = fig.add_subplot(111,projection='3d')

# 根据聚类标签绘制不同颜色的数据点
for label in np.unique(clusters):
    indices = np.where(clusters == label)
    ax.scatter(data_lda[indices, 0], data_lda[indices, 1], data_lda[indices, 2], label=f'Cluster {label}') 
    
ax.set_xlabel('Principal Component 1')
ax.set_ylabel('Principal Component 2')
ax.set_zlabel('Principal Component 3')
ax.set_title('KMeans Clustering Visualization')

plt.legend()
plt.show()

获取聚类后每个视频对应的标签序列

In [None]:
clusters_walk_ = []  # 用来存储分割后的序列列表
start_idx = 0
for frame in frames_walk:
    # 根据frames数组中的值，从data中分割出一个序列
    sequence = clusters_walk[start_idx:start_idx + frame]
    # 将该序列添加到sequences列表中
    clusters_walk_.append(sequence)
    # 更新起始索引
    start_idx += frame
data_walk = np.array(clusters_walk_)

clusters_box_ = []  # 用来存储分割后的序列列表
start_idx = 0
for frame in frames_box:
    # 根据frames数组中的值，从data中分割出一个序列
    sequence = clusters_box[start_idx:start_idx + frame]
    # 将该序列添加到sequences列表中
    clusters_box_.append(sequence)
    # 更新起始索引
    start_idx += frame
data_box = np.array(clusters_box_)

clusters_handclap_ = []  # 用来存储分割后的序列列表
start_idx = 0
for frame in frames_handclap:
    # 根据frames数组中的值，从data中分割出一个序列
    sequence = clusters_handclap[start_idx:start_idx + frame]
    # 将该序列添加到sequences列表中
    clusters_handclap_.append(sequence)
    # 更新起始索引
    start_idx += frame
data_handclap = np.array(clusters_handclap_)

clusters_jog_ = []  # 用来存储分割后的序列列表
start_idx = 0
for frame in frames_jog:
    # 根据frames数组中的值，从data中分割出一个序列
    sequence = clusters_jog[start_idx:start_idx + frame]
    # 将该序列添加到sequences列表中
    clusters_jog_.append(sequence)
    # 更新起始索引
    start_idx += frame
data_jog = np.array(clusters_jog_)

clusters_run_ = []  # 用来存储分割后的序列列表
start_idx = 0
for frame in frames_run:
    # 根据frames数组中的值，从data中分割出一个序列
    sequence = clusters_run[start_idx:start_idx + frame]
    # 将该序列添加到sequences列表中
    clusters_run_.append(sequence)
    # 更新起始索引
    start_idx += frame
data_run= np.array(clusters_run_)

clusters_wave_ = []  # 用来存储分割后的序列列表
start_idx = 0
for frame in frames_wave:
    # 根据frames数组中的值，从data中分割出一个序列
    sequence = clusters_wave[start_idx:start_idx + frame]
    # 将该序列添加到sequences列表中
    clusters_wave_.append(sequence)
    # 更新起始索引
    start_idx += frame
data_wave = np.array(clusters_wave_)

clusters_ = []  # 用来存储分割后的序列列表
start_idx = 0
for frame in frames:
    # 根据frames数组中的值，从data中分割出一个序列
    sequence = clusters[start_idx:start_idx + frame]
    # 将该序列添加到sequences列表中
    clusters_.append(sequence)
    # 更新起始索引
    start_idx += frame
data_all = np.array(clusters_)

# 输出聚类结果
for video_index, cluster_label in enumerate(data_walk):
    print(f"walk视频{video_index+1}的聚类结果：{cluster_label}")
    
for video_index, cluster_label in enumerate(data_box):
    print(f"box视频{video_index+1}的聚类结果：{cluster_label}")
    
for video_index, cluster_label in enumerate(data_handclap):
    print(f"handclap视频{video_index+1}的聚类结果：{cluster_label}")
    
for video_index, cluster_label in enumerate(data_jog):
    print(f"jog视频{video_index+1}的聚类结果：{cluster_label}")
    
for video_index, cluster_label in enumerate(data_run):
    print(f"run视频{video_index+1}的聚类结果：{cluster_label}")

for video_index, cluster_label in enumerate(data_wave):
    print(f"wave视频{video_index+1}的聚类结果：{cluster_label}")

加载标签数据

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle

# 划分训练集和测试集
split=0.5
X_train_walk=data_walk[::2]
X_test_walk=data_walk[1::2]

X_train_box=data_box[::2]
X_test_box=data_box[1::2]

X_train_handclap=data_handclap[::2]
X_test_handclap=data_handclap[1::2]

X_train_jog=data_jog[::2]
X_test_jog=data_jog[1::2]

X_train_run=data_run[::2]
X_test_run=data_run[1::2]

X_train_wave=data_wave[::2]
X_test_wave=data_wave[1::2]

# 制作训练序列
data_walk=np.concatenate([i for i in X_train_walk]).reshape(-1,1)
data_box=np.concatenate([i for i in X_train_box]).reshape(-1,1)
data_handclap=np.concatenate([i for i in X_train_handclap]).reshape(-1,1)
data_jog=np.concatenate([i for i in X_train_jog]).reshape(-1,1)
data_run=np.concatenate([i for i in X_train_run]).reshape(-1,1)
data_wave=np.concatenate([i for i in X_train_wave]).reshape(-1,1)

# 序列长度
x_len_walk=frames_walk[::2]
x_len_box=frames_box[::2]
x_len_handclap=frames_handclap[::2]
x_len_jog=frames_jog[::2]
x_len_run=frames_run[::2]
x_len_wave=frames_wave[::2]

x_len_walk_=frames_walk[1::2]
x_len_box_=frames_box[1::2]
x_len_handclap_=frames_handclap[1::2]
x_len_jog_=frames_jog[1::2]
x_len_run_=frames_run[1::2]
x_len_wave_=frames_wave[1::2]

labels_walk=labels[0:100]
labels_box=labels[100:200]
labels_handclap=labels[200:299]
labels_jog=labels[299:399]
labels_run=labels[399:499]
labels_wave=labels[499:599]

In [None]:
print(len(data_handclap))
print(sum(x_len_handclap))
assert len(data_walk)==sum(x_len_walk)
assert len(data_box)==sum(x_len_box)
assert len(data_handclap)==sum(x_len_handclap)
assert len(data_jog)==sum(x_len_jog)
assert len(data_run)==sum(x_len_run)
assert len(data_wave)==sum(x_len_wave)

### 实验1

训练隐马尔可夫分类模型

hmm中的参数：

	n_components：一个整数，指定了状态的数量。
	covariance_type：一个字符串，指定了使用方差矩阵的类型。可以为：
		'spherical'：对每个状态，该状态的所有特征的方差都是同一个值。
		'diag'：每个状态的方差矩阵为对角矩阵。
		'full'：每个状态的方差矩阵为普通的矩阵。
		'tied'：所有状态都是用同一个普通的方差矩阵。
	min_covar：一个浮点数。给出了方差矩阵对角线上元素的最小值，用于防止过拟合。
	startprob_prior：一个数组，形状为(n_components, )。初始状态的先验概率分布。
	transmat_prior：一个数字，形状为(n_components, n_components )。先验的状态转移矩阵。
	algorithm：一个字符串。指定了Decoder算法。可以为 'viterbi'（维特比算法）或者'map' 。
	random_state：指定随机数种子。
	tol：指定迭代收敛阈值。
	verbose：指定打印日志。
	params：一个字符串。控制在训练过程中，哪些参数能够得到更新（你也可以指定它们的组合形式）：
		's'：初始概率。
		't'：转移概率。
		'm'：均值。
		'c'：偏差。
	init_params：一个字符串。控制在训练之前，先初始化哪些参数（你也可以指定它们的组合形式）：
		's'：初始概率。
		't'：转移概率。
		'm'：均值。
		'c'：偏差。

每个类别的视频数据训练单独的隐马尔可夫模型

In [None]:
from hmmlearn import hmm

n_hmm=6      # 隐藏节点个数
# assert n_hmm<10
n_iter=300              # 代最大个数   调大点 >200
algorithm='map'     # 解码算法   最大后验概率解码       'viterbi'维特比解码算法
tol=0.001           # 相邻两次迭代之间对数似然变化 迭代终止阈值
init_params='sc' # stemc  stec-56% sec-57% sc-58% c-51%
n_features=220 
assert n_features>=num_clusters 
min_covar=0.1
# 迭代算法：EM算法
#-----------高斯分布---------
# model_walk = hmm.GaussianHMM(n_components=n_hmm,covariance_type="full", n_iter=n_iter,algorithm=algorithm,verbose=True,tol=tol,init_params=init_params,min_covar=min_covar)
# model_box = hmm.GaussianHMM(n_components=n_hmm, covariance_type="full", n_iter=n_iter,algorithm=algorithm,verbose=True,tol=tol,init_params=init_params)
# model_handclap= hmm.GaussianHMM(n_components=n_hmm, covariance_type="full",n_iter=n_iter,algorithm=algorithm,verbose=True,tol=tol,init_params=init_params)
# model_jog = hmm.GaussianHMM(n_components=n_hmm, covariance_type="full", n_iter=n_iter,algorithm=algorithm,verbose=True,tol=tol,init_params=init_params)
# model_run = hmm.GaussianHMM(n_components=n_hmm, covariance_type="full", n_iter=n_iter,algorithm=algorithm,verbose=True,tol=tol,init_params=init_params)
# model_handwave = hmm.GaussianHMM(n_components=n_hmm, covariance_type="full", n_iter=n_iter,algorithm=algorithm,verbose=True,tol=tol,init_params=init_params)
#--------混合高斯分布------------
# model_walk = hmm.GMMHMM(n_components=n_hmm,covariance_type="full", n_iter=n_iter,algorithm=algorithm,verbose=True,tol=tol,init_params=init_params,min_covar=min_covar)
# model_box = hmm.GMMHMM(n_components=n_hmm, covariance_type="full", n_iter=n_iter,algorithm=algorithm,verbose=True,tol=tol,init_params=init_params,min_covar=min_covar)
# model_handclap= hmm.GMMHMM(n_components=n_hmm, covariance_type="full",n_iter=n_iter,algorithm=algorithm,verbose=True,tol=tol,init_params=init_params,min_covar=min_covar)
# model_jog = hmm.GMMHMM(n_components=n_hmm, covariance_type="full", n_iter=n_iter,algorithm=algorithm,verbose=True,tol=tol,init_params=init_params,min_covar=min_covar)
# model_run = hmm.GMMHMM(n_components=n_hmm, covariance_type="full", n_iter=n_iter,algorithm=algorithm,verbose=True,tol=tol,init_params=init_params,min_covar=min_covar)
# model_handwave = hmm.GMMHMM(n_components=n_hmm, covariance_type="full", n_iter=n_iter,algorithm=algorithm,verbose=True,tol=tol,init_params=init_params,min_covar=min_covar)
#--------多项式--------------------

model_walk = hmm.CategoricalHMM(n_components=n_hmm, n_iter=n_iter,algorithm=algorithm,verbose=True,tol=tol,init_params=init_params,n_features=n_features)
model_box = hmm.CategoricalHMM(n_components=n_hmm,  n_iter=n_iter,algorithm=algorithm,verbose=True,tol=tol,init_params=init_params,n_features=n_features)
model_handclap= hmm.CategoricalHMM(n_components=n_hmm, n_iter=n_iter,algorithm=algorithm,verbose=True,tol=tol,init_params=init_params,n_features=n_features)
model_jog = hmm.CategoricalHMM(n_components=n_hmm,  n_iter=n_iter,algorithm=algorithm,verbose=True,tol=tol,init_params=init_params,n_features=n_features)
model_run = hmm.CategoricalHMM(n_components=n_hmm, n_iter=n_iter,algorithm=algorithm,verbose=True,tol=tol,init_params=init_params,n_features=n_features)
model_handwave = hmm.CategoricalHMM(n_components=n_hmm,  n_iter=n_iter,algorithm=algorithm,verbose=True,tol=tol,init_params=init_params,n_features=n_features)

model_walk.fit(data_walk,x_len_walk)
model_box.fit(data_box,x_len_box)
model_handclap.fit(data_handclap,x_len_handclap)
model_jog.fit(data_jog,x_len_jog)
model_run.fit(data_run,x_len_run)
model_handwave.fit(data_wave,x_len_wave)

制作测试集和训练集

In [None]:
# 制作测试集和训练集
train_labels_walk=labels_walk[::2]
train_labels_box=labels_box[::2]
train_labels_handclap=labels_handclap[::2]
train_labels_jog=labels_jog[::2]
train_labels_run=labels_run[::2]
train_labels_wave=labels_wave[::2]
X_train=np.concatenate([X_train_walk,X_train_box,X_train_handclap,X_train_jog,X_train_run,X_train_wave])
y_train=np.concatenate([train_labels_walk,train_labels_box,train_labels_handclap,train_labels_jog,train_labels_run,train_labels_wave])
test_labels_walk=labels_walk[1::2]
test_labels_box=labels_box[1::2]
test_labels_handclap=labels_handclap[1::2]
test_labels_jog=labels_jog[1::2]
test_labels_run=labels_run[1::2]
test_labels_wave=labels_wave[1::2]
X_test=np.concatenate([X_test_walk,X_test_box,X_test_handclap,X_test_jog,X_test_run,X_test_wave])
y_test=np.concatenate([test_labels_walk,test_labels_box,test_labels_handclap,test_labels_jog,test_labels_run,test_labels_wave])

测试训练集的分类准确率

In [None]:
correct_count = 0
total_count = len(X_train)
for i, x in enumerate(X_train):
    # 计算两个类别模型预测的概率
    score=[model_walk.score(x.reshape(-1, 1)),model_box.score(x.reshape(-1, 1)),model_handclap.score(x.reshape(-1, 1)),
           model_jog.score(x.reshape(-1, 1)),model_run.score(x.reshape(-1, 1)),model_handwave.score(x.reshape(-1, 1))]
    
    predicted_label=score.index(max(score))
    if predicted_label == y_train[i]:
        correct_count += 1

# 计算分类器的准确率
train_accuracy = correct_count / total_count
print("分类器在训练集的准确率：", train_accuracy)

测试测试集的分类准确率

In [None]:
# 在测试集上进行预测
correct_count = 0
total_count = len(X_test)
predict_labels_test=[]
for i, x in enumerate(X_test):
    # 计算两个类别模型预测的概率
    score=[model_walk.score(x.reshape(-1, 1)),model_box.score(x.reshape(-1, 1)),model_handclap.score(x.reshape(-1, 1)),
           model_jog.score(x.reshape(-1, 1)),model_run.score(x.reshape(-1, 1)),model_handwave.score(x.reshape(-1, 1))]
    #print(score)           #对数似然值
    predicted_label=score.index(max(score))
    #print(predicted_label)
    predict_labels_test.append(predicted_label)
    if predicted_label == y_test[i]:
        correct_count += 1
# 计算分类器的准确率
test_accuracy = correct_count / total_count
print("分类器在测试集的准确率：", test_accuracy)

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns 
from sklearn.metrics import confusion_matrix

conf_matrix = confusion_matrix(y_test, predict_labels_test)

# 绘制热力图
plt.figure(figsize=(16, 12))
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues', xticklabels=['walk', 'box', 'handclap', 'jog', 'run', 'handwave'], yticklabels=['walk', 'box', 'handclap', 'jog', 'run', 'handwave'])
plt.xlabel('Predicted Label')
plt.ylabel('True Label')
plt.title(f'Confusion Matrix in M={num_clusters},N={n_hmm}')
plt.show()

### 实验2

动态时间规划(DTW)

In [None]:
import numpy as np
from collections import defaultdict
from fastdtw import fastdtw
from scipy.spatial.distance import euclidean
from sklearn.metrics import accuracy_score

def dtw_distance(x1, x2):
    # 计算动态时间规整距离
    alignment = fastdtw(x1, x2)[1]
    dtw_dist = 0
    for i, j in alignment:
        dtw_dist += abs(x1[i] - x2[j])
    return dtw_dist

class KNN:
    def __init__(self, k=3):
        self.k = k
    def fit(self, X, y):
        self.X_train = X
        self.y_train = y
    def predict(self, X):
        y_pred = [self._predict(x) for x in X]
        return y_pred
    def _predict(self, x):
        # 计算样本与所有训练数据的DTW距离
        distances = [dtw_distance(x, x_train) for x_train in self.X_train]
        # 根据距离排序，找到K个最近邻的样本
        k_indices = np.argsort(distances)[:self.k]
        k_nearest_labels = [self.y_train[i] for i in k_indices]
        # 计算距离的倒数，作为权重
        weights = [1 / (distances[i] + 1e-6) for i in k_indices]
        # 通过加权投票确定新样本的类别
        weighted_votes = defaultdict(float)
        for label, weight in zip(k_nearest_labels, weights):
            weighted_votes[label] += weight

        most_common = max(weighted_votes, key=weighted_votes.get)
        return most_common

In [None]:
#制作训练集和测试集
data_train=data_all[::2]
data_test=data_all[1::2]
data_hmm=np.concatenate([i for i in data_train]).reshape(-1,1)
frames_train=frames[::2]
frames_test=frames[1::2]
labels_train=labels[::2]
labels_test=labels[1::2]

基于DTW的KNN分类模型对观测序列做分类判别

In [None]:
# 使用手写KNN算法进行分类
k=40
knn_dtw = KNN(k=k)

# 将训练样本和标签转换为 NumPy 数组
labels_train = np.array(labels_train)
train = np.concatenate([X_train_walk, X_train_box, X_train_handclap, X_train_jog, X_train_run, X_train_wave])
test = np.concatenate([X_test_walk, X_test_box, X_test_handclap, X_test_jog, X_test_run, X_test_wave])

# 重新训练 KNN 模型
knn_dtw.fit(train, labels_train)

predictions_train = knn_dtw.predict(train)
predictions = knn_dtw.predict(test)
print("Predictions:", predictions)

### 实验3

训练一个包含所有视频数据的隐马尔可夫模型

In [None]:
n_hmm_=23
n_iter_=350
algorithm='map'
tol=0.001
n_features_=260
model_hmm = hmm.CategoricalHMM(n_components=n_hmm_, n_iter=n_iter_,algorithm=algorithm,verbose=True,tol=tol,init_params=init_params,n_features=n_features_)

In [None]:
#检查序列长度
assert sum(frames_train)==len(data_hmm)

In [None]:
#训练模型
model_hmm.fit(data_hmm,frames_train)

对观测序列进行解码

In [None]:
train_decodered=[]
algorithm_decode='map'
for i, x in enumerate(data_train):    
    train_decodered.append(model_hmm.decode(x.reshape(-1,1),frames_train[i],algorithm=algorithm_decode)[1])
test_decodered=[]
for i, x in enumerate(data_test):    
    test_decodered.append(model_hmm.decode(x.reshape(-1,1),frames_test[i],algorithm=algorithm_decode)[1])

基于DTW的KNN分类模型对解码序列做分类判别

In [None]:
from sklearn.neighbors import KNeighborsClassifier
# 使用手写KNN算法进行分类
k=40
knn = KNeighborsClassifier(k=k)
# 将训练样本和标签转换为 NumPy 数组
train_decodered = np.array(train_decodered)
labels_train = np.array(labels_train)

knn.fit(train_decodered, labels_train)

predictions_train = knn.predict(train_decodered)
predictions = knn.predict(test_decodered)
print("Predictions:", predictions)

计算模型在训练集和测试集的准确率并绘制混淆矩阵

In [None]:
# 绘制混淆矩阵
count=0
total_count=len(predictions_train)
for i,prediction in enumerate(predictions_train):
    if prediction==labels_train[i]:
        count+=1
train_accuracy=count/total_count
print(f"在训练集上的Accuracy: {train_accuracy}")

count=0
total_count=len(predictions)
for i,prediction in enumerate(predictions):
    if prediction==labels_test[i]:
        count+=1
test_accuracy=count/total_count
print(f"在测试集上的Accuracy: {test_accuracy}")

import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix
conf_matrix = confusion_matrix(labels_test,predictions)

plt.figure(figsize=(16, 12))
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues', xticklabels=['walk', 'box', 'handclap', 'jog', 'run', 'handwave'], yticklabels=['walk', 'box', 'handclap', 'jog', 'run', 'handwave'])
plt.xlabel('Predicted Label')
plt.ylabel('True Label')
plt.title(f'Confusion Matrix in M={num_clusters_},N={n_hmm_}')
plt.show()