+ 用于可视化该数据集，理解标注文件的含义

In [2]:
import cv2
import numpy as np 
from pathlib import Path
import json
from tqdm import tqdm

In [2]:
# Directory that holds the per-video annotation .txt files of the Coffee_room_01 scene.
ann_path = Path("Coffee_room_01/Annotation_files/")

In [3]:
def _int(x, y):
    return int(x), int(y)

In [4]:
def show_videos(ann_path):
    """Play every annotated video of a scene with its bounding box and state label drawn on it.

    For each ``*.txt`` file in ``ann_path`` the video ``<stem>.avi`` is opened from the
    ``Videos`` directory that sits next to ``ann_path``. Frames are read sequentially, one
    per annotation row, and shown in an OpenCV window until the rows run out or the user
    presses 'q'.

    Annotation layout as parsed here: line 1 is the first frame of the fall, line 2 the
    last frame of the fall, and every following line is
    ``frame_id, fall_direction, x1, y1, x2, y2``.

    Args:
        ann_path (pathlib.Path): directory containing the annotation ``.txt`` files.
    """
    cap = None   # tracked so the capture is released on every exit path
    ann = None   # keeps the error message below valid even if no file was reached
    try:
        video_path = ann_path.parents[0].joinpath("Videos")
        cv2.namedWindow("videos", cv2.WINDOW_KEEPRATIO)
        print("Press 'q' to exit this function.")
        for ann in ann_path.glob("*.txt"):
            with ann.open('r') as fd:
                lines = [line.strip('\n') for line in fd]
            fall_frame_start = int(lines[0])  # first frame of the fall
            fall_frame_end = int(lines[1])    # last frame of the fall; later frames show the person lying down
            frame_info = [line.replace(',', ' ') for line in lines[2:]]
            # ndmin=2 keeps a single annotation row as a (1, 6) array so the
            # 6-way unpacking in the loop below still works.
            frame_info = np.loadtxt(frame_info, ndmin=2)

            video_file = video_path.joinpath(ann.stem + ".avi")
            cap = cv2.VideoCapture(str(video_file))
            frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT)

            # fall_direction is the fall direction, numbered clockwise from 1 to 8:
            # 1 is north (the default value), 2 is north-east, 3 is east, ...
            for frame_id, fall_direction, x1, y1, x2, y2 in frame_info:
                if cap.isOpened() and frame_id < frame_count:
                    _, img = cap.read()

                    if img is None:
                        continue
                    img = cv2.rectangle(img, _int(x1, y1), _int(x2, y2), color=(0, 255, 0), thickness=1)
                    text = f"{frame_id:2.0f}, {fall_direction:2.0f}"
                    img = cv2.putText(img, text, (14, 17), cv2.FONT_HERSHEY_SIMPLEX, 0.5,  color=(0, 255, 0), thickness=1)

                    if fall_frame_start <= frame_id <= fall_frame_end:
                        img = cv2.putText(img, "falling", (14, 34), cv2.FONT_HERSHEY_SIMPLEX, 0.5,  color=(0, 0, 255), thickness=1)
                    elif fall_direction == 1:
                        img = cv2.putText(img, "normal", (14, 34), cv2.FONT_HERSHEY_SIMPLEX, 0.5,  color=(0, 255, 0), thickness=1)
                    else:
                        # Outside the fall interval with a non-default direction.
                        img = cv2.putText(img, "faint", (14, 34), cv2.FONT_HERSHEY_SIMPLEX, 0.5,  color=(255, 255, 0), thickness=1)

                    cv2.imshow("videos", img)
                    key = cv2.waitKey(1)
                    if key & 0XFFFF == ord('q'):
                        return
            cap.release()
            cap = None
    except ValueError:
        print(f"value error: ann_file => {ann}")
    finally:
        # Reached on normal completion, on 'q', and on errors: free the capture and the window.
        if cap is not None:
            cap.release()
        cv2.destroyAllWindows()
        


In [5]:
# Play the annotated videos found under ann_path; press 'q' in the window to stop.
show_videos(ann_path=ann_path)

Press 'q' to exit this function.


尝试使用sklearn进行分类

In [4]:
import json
from pathlib import Path

def load_json(json_file):
    """Open ``json_file`` for reading and return its decoded JSON content."""
    with open(json_file, 'r') as fp:
        return json.load(fp)

def save_json(json_obj, json_file):
    """Write ``json_obj`` to ``json_file`` as JSON indented by 4 spaces."""
    with open(json_file, mode='w') as out:
        json.dump(json_obj, out, indent=4)

In [5]:
# Load one annotation file and print its 'info' entry, which describes the annotation fields.
ann_dict = load_json("FallDataset/Coffee_room_01/annotations/video (1).json")
print(ann_dict['info'])

{'label': 'normal 0 | falling 1 | faint 2', 'fall_direction': 'int 1~8', 'bbox': 'x1, y1, w, h', 'keypoints': 'x1, y1, v1, ...., x12, y12, v12', 'keypoints_order': ['left_shoulder', 'right_shoulder', 'left_elbow', 'right_elbow', 'left_wrist', 'right_wrist', 'left_hip', 'right_hip', 'left_knee', 'right_knee', 'left_ankle', 'right_ankle'], 'skeleton': [[0, 1], [6, 7], [0, 2], [1, 3], [2, 4], [3, 5], [0, 6], [1, 7], [6, 8], [7, 9], [8, 10], [9, 11]]}


In [6]:
def get_xy_from_json(json_file):
    """Build the feature matrix and label vector for one annotation JSON file.

    An annotation is kept only when both its ``keypoints`` and ``bbox`` are non-empty
    and the bbox width (``bbox[2]``) and height (``bbox[3]``) are positive.

    Each kept annotation becomes one row of ``x``: the flattened keypoint triplets, with
    the first component of every triplet divided by the bbox width and the second by the
    bbox height (the third component is left as is), followed by the width/height ratio.

    Args:
        json_file: path of the annotation file; handed to ``load_json`` as a string.

    Returns:
        tuple: ``(x, y)`` where ``x`` is a float array with one row per kept annotation
        and ``y`` is the array of their ``label`` values. When no annotation is kept,
        ``x`` has shape ``(0, 0)`` and ``y`` has shape ``(0,)``.
    """
    ann_dict = load_json(str(json_file))
    kpts, bboxes, labels = [], [], []
    for ann in ann_dict['annotations']:
        kpt = ann['keypoints']
        bbox = ann['bbox']
        if kpt != [] and bbox != [] \
            and bbox[2] > 0 and bbox[3] > 0:
            kpts.append(kpt)
            bboxes.append(bbox)
            labels.append(ann['label'])

    num_frame = len(kpts)
    if num_frame == 0:
        # Nothing usable in this file: return empty arrays instead of failing on the
        # 2-D indexing below.
        return np.empty((0, 0), dtype=float), np.array(labels, dtype=int)

    bboxes = np.array(bboxes, dtype=float)
    w, h = bboxes[:, 2], bboxes[:, 3]
    ratio = w / h  # w/h

    # Force a float array: the in-place true division below cannot be stored in an
    # integer array, which is what np.array() yields when every keypoint value is an int.
    kpts = np.array(kpts, dtype=float).reshape((num_frame, -1, 3))
    kpts[:, :, 0] /= w[:, None]
    kpts[:, :, 1] /= h[:, None]

    x = np.concatenate([kpts.reshape((num_frame, -1)), ratio[:, None]], axis=1)
    y = np.array(labels)

    return x, y

加载标注文件，得到输入X和输出Y

In [7]:
def load_data(annotations_dirs):
    """Load the features and labels of every ``*.json`` annotation file, grouped by directory.

    Args:
        annotations_dirs: a single directory (``str`` or ``Path``) or an iterable of them.

    Returns:
        tuple: ``(X, Y)``, two lists with one entry per directory. ``X[i]`` stacks the
        feature rows of all files in directory ``i`` and ``Y[i]`` the matching labels.

    Raises:
        ValueError: if a directory yields no usable annotation at all.
    """
    # A lone path is wrapped; any other iterable (list, tuple, ...) is taken as a collection.
    if isinstance(annotations_dirs, (str, Path)):
        annotations_dirs = [annotations_dirs]
    else:
        annotations_dirs = list(annotations_dirs)

    X, Y = [], []
    for ann_dir in tqdm(annotations_dirs):
        ann_dir = Path(ann_dir)
        x, y = [], []
        # glob() order depends on the filesystem; sorting keeps the sample order stable.
        for json_file in sorted(ann_dir.glob("*.json")):
            _x, _y = get_xy_from_json(json_file)
            if len(_x) == 0:
                # A file without valid annotations contributes no rows.
                continue
            x.append(_x)
            y.append(_y)
        if not x:
            raise ValueError(f"no usable annotations found in {ann_dir}")
        X.append(np.concatenate(x, axis=0))
        Y.append(np.concatenate(y, axis=0))
    return X, Y

In [8]:
# Load features and labels of four scenes; X and Y each hold one array per directory.
X, Y = load_data(annotations_dirs=[
    "FallDataset/Coffee_room_01/annotations",
    "FallDataset/Coffee_room_02/annotations",
    "FallDataset/Home_01/annotations",
    "FallDataset/Home_02/annotations",
])

100%|██████████| 4/4 [00:02<00:00,  1.77it/s]


分析各类别样本数和分布

In [9]:
def count_class_samples(Y, num_class=3):
    """Count how many labels fall into each class ``0 .. num_class - 1``.

    Args:
        Y: either one sequence of integer labels, or a list/tuple of such sequences
            (for example one label array per directory).
        num_class (int): number of classes to count.

    Returns:
        np.ndarray: shape ``(num_class,)`` for a single label sequence, or
        ``(len(Y), num_class)`` when ``Y`` is a list/tuple of label sequences.
    """
    def _count(labels):
        labels = np.asarray(labels)
        # Vectorised count; the builtin sum() would iterate the array element by element.
        return [int(np.count_nonzero(labels == cls)) for cls in range(num_class)]

    # A list/tuple whose items are themselves array-like is treated as a group of label
    # sequences; a flat list of scalar labels is counted as one sequence.
    if isinstance(Y, (list, tuple)) and all(np.ndim(y) > 0 for y in Y):
        return np.array([_count(y) for y in Y])
    return np.array(_count(Y))

# Class counts per directory: one row per entry of Y, one column per class.
num_samples = count_class_samples(Y)
num_samples

array([[ 8780,  1394,  4001],
       [10362,   411,  1814],
       [ 2542,   418,  3006],
       [ 4828,    93,  1073]])

In [10]:
# 每个视频目录的类别比例
percentage_per_dir = num_samples / num_samples.sum(axis=1)[:, None]
percentage_per_dir

array([[0.61940035, 0.09834215, 0.2822575 ],
       [0.82323032, 0.03265274, 0.14411695],
       [0.42608113, 0.07006369, 0.50385518],
       [0.80547214, 0.01551552, 0.17901235]])

In [11]:
# 总的类别比例
num_samples_all = num_samples.sum(axis=0)
percentage_all = num_samples_all / num_samples_all.sum()
num_samples_all, percentage_all

(array([26512,  2316,  9894]), array([0.68467538, 0.05981096, 0.25551366]))

得到总的输入样本和输出样本

In [12]:
# Merge the per-directory arrays into one feature matrix and one label vector.
x_all = np.concatenate(X, axis=0)
y_all = np.concatenate(Y, axis=0)

# O、划分数据集（train/test）

In [13]:
from sklearn.model_selection import train_test_split
from sklearn import datasets
import pandas as pd

1、 简单随机抽样
+ 在所有样本中按比例随机抽取，有小概率导致划分结果的类别分布变化较大。

In [None]:
# Simple random 70/30 split; the fixed random_state makes the split reproducible on re-run.
x_train, x_test, y_train, y_test = train_test_split(x_all, y_all, test_size=0.3, random_state=0)
# pd.Series(...).value_counts() replaces the deprecated top-level pd.value_counts().
[data.shape for data in [x_train, x_test, y_train, y_test]], pd.Series(y_train).value_counts()

2、分层抽样
+ 分别在各类别中按比例划分，确保划分数据集前后，类别分布基本不变。

In [14]:
# Stratified 70/30 split: class proportions are preserved in both parts.
# The fixed random_state makes the split reproducible on re-run.
x_train, x_test, y_train, y_test = train_test_split(x_all, y_all, test_size=0.3, stratify=y_all, random_state=0)
# pd.Series(...).value_counts() replaces the deprecated top-level pd.value_counts().
[data.shape for data in [x_train, x_test, y_train, y_test]], pd.Series(y_train).value_counts()

([(27105, 37), (11617, 37), (27105,), (11617,)],
 0    18558
 2     6926
 1     1621
 dtype: int64)

# 一、过采样策略——数据不平衡

In [16]:
# 通过采样策略解决数据样本不均衡问题
from imblearn.over_sampling import RandomOverSampler, SMOTE, BorderlineSMOTE, ADASYN
num_train = count_class_samples(y_train)   # 各类别的训练样本数
num_original = {i:int(num_train[i]) for i in range(3)} # 总数据集各类别样本数
sampling_strategy  = dict()
max_samples = max(num_train)
over_ratio = 0.8  # 过采样比例， 将少数类的样本提高到 over_ratio * max_samples
for i, num in enumerate(num_train):
    sampling_strategy[i] = int(num) if num == max_samples else int(max_samples * over_ratio)
print(f"over sampler: {num_original} => {sampling_strategy}")

over sampler: {0: 18558, 1: 1621, 2: 6926} => {0: 18558, 1: 14846, 2: 14846}


## 1、随机过采样
+ 设置随机数种子random_state为常数，使得采样过程可重现
+ sampling_strategy 为字典时（例如上面计算得到的 {0: 18558, 1: 14846, 2: 14846}），按指定的各类别样本数生成数据集

In [17]:
# Random over-sampling to the per-class targets in sampling_strategy; fixed seed for repeatability.
ros = RandomOverSampler(sampling_strategy=sampling_strategy, random_state=0)
x_ros, y_ros = ros.fit_resample(x_train, y_train)
count_class_samples(y_ros)

array([18558, 14846, 14846])

## 2、少数类的插值采样

In [18]:
# SMOTE over-sampling to the same per-class targets; shows resampled vs. original label counts.
smo = SMOTE(sampling_strategy=sampling_strategy, random_state=0)
x_smo, y_smo = smo.fit_resample(x_train, y_train)
y_smo.shape, y_train.shape, 

((48250,), (27105,))

## 3、边界类样本采样

In [19]:
# Borderline-SMOTE over-sampling to the same per-class targets.
bsmo = BorderlineSMOTE(kind='borderline-1', sampling_strategy=sampling_strategy,
                       random_state=42)   # kind may be 'borderline-1' or 'borderline-2'
x_bsmo, y_bsmo = bsmo.fit_resample(x_train, y_train)
y_bsmo.shape, y_train.shape

((48250,), (27105,))

## 4、自适应合成抽样

In [20]:
# ADASYN (adaptive synthetic) over-sampling, configured with the same sampling_strategy.
ana = ADASYN(sampling_strategy=sampling_strategy, random_state=0)
x_ana, y_ana = ana.fit_resample(x_train, y_train)
y_ana.shape, y_train.shape

((48337,), (27105,))

# 二、欠采样策略——数据不平衡
+ 对样本数较多的类别，随机丢弃一部分样本，使其样本数接近样本数较少的类别，从而达到数据平衡
+ 缺点：没有充分利用多数类的样本信息，如果强行欠采样到与最少样本类别一致，可能会丢失大量的样本。

In [21]:
from imblearn.under_sampling import ClusterCentroids, RandomUnderSampler, NearMiss

1、 随机丢弃一部分样本
+ sampling_strategy 为默认值 'auto'时， 将所有类别的样本数欠采样到与样本数最少的类别相同。

In [22]:
# Random under-sampling with the default 'auto' strategy; compare class counts after and before.
rus = RandomUnderSampler(sampling_strategy='auto', random_state=0)
x_rus, y_rus = rus.fit_resample(x_train, y_train)
y_rus.shape, count_class_samples(y_rus), count_class_samples(y_train)

((4863,), array([1621, 1621, 1621]), array([18558,  1621,  6926]))

2、根据 k-means 中心生成
+ 计算量比较大，生成时间较长

In [23]:
# Cluster-centroid under-sampling; compare class counts after and before resampling.
cc = ClusterCentroids(sampling_strategy='auto', random_state=0)
x_cc, y_cc = cc.fit_resample(x_train, y_train)
count_class_samples(y_cc), count_class_samples(y_train)

(array([1621, 1621, 1621]), array([18558,  1621,  6926]))

3、根据邻近样本规则进行下采样，含三种方式

In [24]:
# NearMiss under-sampling (version 1 of its selection rules); compare class counts after and before.
nm = NearMiss(sampling_strategy='auto', version=1)
x_nm, y_nm = nm.fit_resample(x_train, y_train)
count_class_samples(y_nm), count_class_samples(y_train)

(array([1621, 1621, 1621]), array([18558,  1621,  6926]))

# 三、获取分类结果

1、定义分类器

In [25]:
from sklearn.gaussian_process import GaussianProcessClassifier
from sklearn.gaussian_process.kernels import RBF
from sklearn.neural_network import MLPClassifier
from sklearn import svm

class Classifer:
    """Small wrapper that builds one scikit-learn classifier and reports per-class scores.

    The wrapped estimator is stored in ``self.clf``. ``train_test_split`` stores its
    result in ``self.x_train``, ``self.x_test``, ``self.y_train`` and ``self.y_test``,
    which ``fit`` and ``scores`` fall back to when called without data.
    """

    def __init__(self, clf_type='svm', optimizer='sgd', hidden_layer_sizes=(64,), activation='relu'):
        """
        Args:
            clf_type: 'svm' | 'gpc' | 'mlp' (case-insensitive).
            optimizer: 'lbfgs' | 'sgd' | 'adam'; only used by the MLP, as its ``solver``.
            hidden_layer_sizes: hidden layer sizes; only used by the MLP.
            activation: 'logistic' | 'relu' | 'tanh' | 'identity'; only used by the MLP.
        """
        clf_type = clf_type.lower()
        assert clf_type in ['svm', 'gpc', 'mlp']
        if clf_type == 'svm':
            self.clf = svm.SVC()
        elif clf_type == 'mlp':
            self.clf = MLPClassifier(solver=optimizer,
                    alpha=1e-5,
                    hidden_layer_sizes=hidden_layer_sizes,
                    activation=activation,
                    random_state=1)
        else:
            kernel = 1. * RBF(1.)
            self.clf = GaussianProcessClassifier(kernel=kernel, random_state=0)

    def train_test_split(self, x, y, test_size=0.3, stratify=None, verbose=False, random_state=None):
        """Split ``x`` / ``y`` into a train and a test part and store both on the instance.

        Args:
            x, y (array-like): inputs and outputs to split.
            test_size (float): fraction of samples placed in the test part, default 0.3.
            stratify (array-like): ``None`` (default) for a plain random split; pass ``y``
                for a stratified split.
            verbose (bool): print the shapes and per-class counts of both parts.
            random_state: forwarded to the split function; set it for a repeatable split.
        """
        # The bare name below resolves to the module-level train_test_split function,
        # not to this method, because method bodies do not see class-level names.
        data = train_test_split(x, y, test_size=test_size, stratify=stratify,
                                random_state=random_state)
        self.x_train, self.x_test, self.y_train, self.y_test = data
        if verbose:
            print(f"{self.x_train.shape=}\t{self.y_train.shape=}")
            print(f"{self.x_test.shape=}\t{self.y_test.shape=}")
            print(f"train: the number of each class =>\n{pd.Series(self.y_train).value_counts()}")
            print(f"test: the number of each class =>\n{pd.Series(self.y_test).value_counts()}")

    def fit(self, x=None, y=None):
        """Fit the classifier on ``x`` / ``y``, or on the stored training split if either is None."""
        if x is None or y is None:
            self.clf.fit(self.x_train, self.y_train)
        else:
            self.clf.fit(x, y)

    def scores(self, x=None, y=None, num_class=3):
        """Print and return the overall score and the score restricted to each class.

        Args:
            x, y: evaluation data; each falls back to the stored test split when None.
            num_class (int): classes ``0 .. num_class - 1`` are scored individually.

        Returns:
            tuple: ``(total_score, {class_index: class_score})``, rounded to 3 decimals.
            A class with no sample in ``y`` gets ``nan``.
        """
        x_test = x if x is not None else self.x_test
        # asarray lets plain Python lists be compared element-wise below.
        y_test = np.asarray(y if y is not None else self.y_test)

        s_all = round(self.clf.score(x_test, y_test), 3)
        scores = dict()
        for i in range(num_class):
            in_class = y_test == i
            if in_class.any():
                # Weight 1 for samples of class i and 0 for all others, so the score
                # only reflects how well class i itself is recognised.
                si = round(self.clf.score(x_test, y_test, in_class.astype(float)), 3)
            else:
                # All weights would be zero, which leaves the weighted score undefined.
                si = float('nan')
            scores[i] = si
            print(f"score of class {i:3d} = {si}")
        print(f"total score = {s_all}")
        return s_all, scores

2、统计数据集

In [28]:
# Training sets to compare: the original training split plus every resampled variant.
# Keys starting with "o_" are over-sampled sets, keys starting with "u_" under-sampled sets.
train_data = dict(
    original_data=(x_train, y_train),           # original training split
    o_RandomOverSampler=(x_ros, y_ros),     # random over-sampling
    o_SMOTE=(x_smo, y_smo),                 # SMOTE interpolation over-sampling
    o_BorderlineSMOTE=(x_bsmo, y_bsmo),     # borderline SMOTE over-sampling
    o_ADASYN=(x_ana, y_ana),                # adaptive synthetic over-sampling
    u_RandomUnderSampler=(x_rus, y_rus),    # random under-sampling
    u_ClusterCentroids=(x_cc, y_cc),        # cluster-centroid under-sampling
    u_NearMiss=(x_nm, y_nm),                # nearest-neighbour (NearMiss) under-sampling
)

2、SVM

In [51]:
# Train one SVM per training-set variant and score each on the shared test split.
svm_results = {}
for data_name, (x_fit, y_fit) in train_data.items():
    print(f"{data_name=}".center(50, '-'))
    clf = Classifer(clf_type='svm')
    clf.fit(x_fit, y_fit)
    svm_results[data_name] = clf.scores(x_test, y_test)

------------data_name='original_data'-------------
self.x_train.shape=(27105, 37)	self.y_train.shape=(27105,)
self.x_test.shape=(11617, 37)	self.y_test.shape=(11617,)
train: the number of each class =>
0    18558
2     6926
1     1621
dtype: int64
test: the number of each class=>
0    7954
2    2968
1     695
dtype: int64
score of class   0 = 0.996
score of class   1 = 0.131
score of class   2 = 0.686
total score = 0.865
---------data_name='o_RandomOverSampler'----------
self.x_train.shape=(27105, 37)	self.y_train.shape=(27105,)
self.x_test.shape=(11617, 37)	self.y_test.shape=(11617,)
train: the number of each class =>
0    18558
2     6926
1     1621
dtype: int64
test: the number of each class=>
0    7954
2    2968
1     695
dtype: int64
score of class   0 = 0.997
score of class   1 = 0.112
score of class   2 = 0.696
total score = 0.868
---------------data_name='o_SMOTE'----------------
self.x_train.shape=(27105, 37)	self.y_train.shape=(27105,)
self.x_test.shape=(11617, 37)	self.y_tes

3、MLP

In [30]:
# Train one MLP per training-set variant and score each on the shared test split.
# Iterates train_data: the previously used name `data` is not defined anywhere in the
# notebook, so the loop failed with a NameError on a fresh kernel.
mlp_results = dict()
for data_name, dt in train_data.items():
    clf = Classifer(clf_type='mlp',
                optimizer='sgd',
                hidden_layer_sizes=(64,),
                activation='relu')
    print(f"{data_name=}".center(50, '-'))
    clf.fit(dt[0], dt[1])
    mlp_results[data_name] = clf.scores(x_test, y_test)

------------data_name='original_data'-------------




score of class   0 = 0.987
score of class   1 = 0.292
score of class   2 = 0.774
total score = 0.891
---------data_name='o_RandomOverSampler'----------




score of class   0 = 0.92
score of class   1 = 0.866
score of class   2 = 0.773
total score = 0.879
---------------data_name='o_SMOTE'----------------




score of class   0 = 0.899
score of class   1 = 0.832
score of class   2 = 0.813
total score = 0.873
----------data_name='o_BorderlineSMOTE'-----------




score of class   0 = 0.874
score of class   1 = 0.875
score of class   2 = 0.592
total score = 0.802
---------------data_name='o_ADASYN'---------------




score of class   0 = 0.879
score of class   1 = 0.845
score of class   2 = 0.698
total score = 0.831
---------data_name='u_RandomUnderSampler'---------




score of class   0 = 0.919
score of class   1 = 0.714
score of class   2 = 0.502
total score = 0.8
----------data_name='u_ClusterCentroids'----------




score of class   0 = 0.868
score of class   1 = 0.788
score of class   2 = 0.378
total score = 0.738
--------------data_name='u_NearMiss'--------------
score of class   0 = 0.814
score of class   1 = 0.679
score of class   2 = 0.48
total score = 0.72




4、GPC
+ 内存占用太大，一跑就死机

In [31]:
# Disabled: the Gaussian process classifier used too much memory on this data.
# The loop iterates train_data (the name `data` is not defined in this notebook).
# gpc_results = dict()
# for data_name, dt in train_data.items():
#     clf = Classifer(clf_type='gpc')
#     print(f"{data_name=}".center(50, '-'))
#     # clf.train_test_split(dt[0], dt[1], verbose=True)   # split into train and test sets
#     # clf.fit()
#     # gpc_results[data_name] = clf.scores()
#     clf.fit(dt[0], dt[1])
#     gpc_results[data_name] = clf.scores(x_test, y_test)

------------data_name='original_data'-------------
