In [8]:
import tqdm
import random
import pathlib
import itertools
import collections

import cv2
import einops
import numpy as np
import remotezip as rz
import seaborn as sns
import matplotlib.pyplot as plt

import tensorflow as tf
from sklearn.model_selection import train_test_split
import keras
from keras import layers

In [None]:
# 定义生成的每一帧的尺寸
HEIGHT = 224  # 高度为224像素
WIDTH = 224   # 宽度为224像素
class Conv2Plus1D(keras.layers.Layer):
    def __init__(self, filters, kernel_size, padding):
        """
        一个卷积层的组合，首先对空间维度进行卷积操作，
        然后对时间维度进行卷积操作。
        """
        super().__init__()
        self.seq = keras.Sequential([  
            # 空间维度分解卷积
            layers.Conv3D(filters=filters,
                          kernel_size=(1, kernel_size[1], kernel_size[2]),  # 只对高度和宽度进行卷积
                          padding=padding),
            # 时间维度分解卷积
            layers.Conv3D(filters=filters, 
                          kernel_size=(kernel_size[0], 1, 1),  # 只对时间步长进行卷积
                          padding=padding)
        ])

    def call(self, x):
        # 执行序列化卷积操作
        return self.seq(x)
class ResidualMain(keras.layers.Layer):
    """
    模型中的残差模块，包含卷积、层归一化和 ReLU 激活函数。
    """
    def __init__(self, filters, kernel_size):
        super().__init__()
        self.seq = keras.Sequential([
            # 第一个卷积层
            Conv2Plus1D(filters=filters,
                        kernel_size=kernel_size,
                        padding='same'),
            # 层归一化
            layers.LayerNormalization(),
            # ReLU 激活函数
            layers.ReLU(),
            # 第二个卷积层
            Conv2Plus1D(filters=filters, 
                        kernel_size=kernel_size,
                        padding='same'),
            # 层归一化
            layers.LayerNormalization()
        ])

    def call(self, x):
        # 执行序列化的操作
        return self.seq(x)
class Project(keras.layers.Layer):
    """
    通过不同大小的过滤器和下采样，对张量的某些维度进行投影处理。
    """
    def __init__(self, units):
        super().__init__()
        self.seq = keras.Sequential([
            # 全连接层（投影操作）
            layers.Dense(units), 
            # 层归一化
            layers.LayerNormalization()
        ])

    def call(self, x):
        # 执行顺序操作
        return self.seq(x)

class ResizeVideo(keras.layers.Layer):
    def __init__(self, height, width):
        """
        初始化视频尺寸调整层。

        Args:
            height: 调整后的高度。
            width: 调整后的宽度。
        """
        super().__init__()
        self.height = height
        self.width = width
        # 使用 Keras 的 Resizing 层来调整尺寸
        self.resizing_layer = layers.Resizing(self.height, self.width)

    def call(self, video):
        """
        调整视频张量的尺寸。

        Args:
            video: 表示视频的张量，形状为 (batch, time, height, width, channels)。

        Returns:
            调整为新高度和宽度的视频张量。
        """
        # 解析视频的原始形状：b 表示批次大小，t 表示时间步，h 和 w 表示高度和宽度，c 表示通道数
        old_shape = einops.parse_shape(video, 'b t h w c')

        # 将视频重新排列为单张图像的形式，合并批次和时间维度
        images = einops.rearrange(video, 'b t h w c -> (b t) h w c')

        # 调整每一帧的尺寸
        images = self.resizing_layer(images)

        # 将调整后的图像重新排列为视频的形式，分离批次和时间维度
        videos = einops.rearrange(
            images, '(b t) h w c -> b t h w c',
            t=old_shape['t']
        )
        return videos

input_shape = (None, 10, HEIGHT, WIDTH, 3)  # 输入视频的形状，None 表示批次大小不固定
input = layers.Input(shape=(input_shape[1:]))  # 定义输入层，形状为 (时间步数, 高度, 宽度, 通道数)
x = input

# 初始卷积层：执行 2+1D 卷积操作（空间 + 时间分解）
x = Conv2Plus1D(filters=16, kernel_size=(3, 7, 7), padding='same')(x)
x = layers.BatchNormalization()(x)  # 批量归一化层，规范化每批次的特征
x = layers.ReLU()(x)  # 激活函数 ReLU
x = ResizeVideo(HEIGHT // 2, WIDTH // 2)(x)  # 调整视频帧的尺寸到 (HEIGHT/2, WIDTH/2)

# Block 1: 添加第一个残差块并调整尺寸
x = add_residual_block(x, 16, (3, 3, 3))  # 添加残差块，过滤器数为 16，卷积核大小为 3x3x3
x = ResizeVideo(HEIGHT // 4, WIDTH // 4)(x)  # 调整尺寸到 (HEIGHT/4, WIDTH/4)

# Block 2: 添加第二个残差块并调整尺寸
x = add_residual_block(x, 32, (3, 3, 3))  # 过滤器数为 32
x = ResizeVideo(HEIGHT // 8, WIDTH // 8)(x)  # 调整尺寸到 (HEIGHT/8, WIDTH/8)

# Block 3: 添加第三个残差块并调整尺寸
x = add_residual_block(x, 64, (3, 3, 3))  # 过滤器数为 64
x = ResizeVideo(HEIGHT // 16, WIDTH // 16)(x)  # 调整尺寸到 (HEIGHT/16, WIDTH/16)

# Block 4: 添加第四个残差块
x = add_residual_block(x, 128, (3, 3, 3))  # 过滤器数为 128

# 全局平均池化和分类
x = layers.GlobalAveragePooling3D()(x)  # 对时间、空间维度进行全局平均池化，生成特征向量
x = layers.Flatten()(x)  # 展平为 1D 向量
x = layers.Dense(10)(x)  # 全连接层输出 10 个分类

# 定义模型
model = keras.Model(input, x)

# 从训练数据集中获取一个批次的数据
frames, label = next(iter(train_ds))

# 通过 build 方法将模型与输入张量关联，用于可视化或调试
model.build(frames)

# 使用 Keras 提供的工具绘制模型结构
keras.utils.plot_model(
    model,               # 目标模型
    expand_nested=True,  # 展开嵌套的层，例如子模块或自定义层
    dpi=60,              # 设置图片分辨率
    show_shapes=True     # 显示每一层输出的形状
)


# 编译模型
model.compile(
    loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),  # 使用稀疏分类交叉熵作为损失函数
    optimizer=keras.optimizers.Adam(learning_rate=0.0001),              # 优化器为 Adam，学习率设置为 0.0001
    metrics=['accuracy']                                               # 评估指标为准确率
)

# 训练模型
history = model.fit(
    x=train_ds,          # 训练数据集
    epochs=50,           # 训练的迭代次数（轮数）
    validation_data=val_ds  # 验证数据集，用于评估模型在训练过程中未见数据上的表现
)


In [3]:
import os

# 指定文件夹路径
folder_path = r"C:\data\video"

# 获取该路径下的所有文件夹
folders = [f for f in os.listdir(folder_path) if os.path.isdir(os.path.join(folder_path, f))]

# 输出所有文件夹名称
print(folders)


['0-两手托天理三焦（八段锦）', '1-左右开弓似射雕（八段锦）', '10鹿抵（五禽戏）', '11鹿奔（五禽戏）', '12鸟伸（五禽戏）', '13鸟飞（五禽戏）', '14其他', '2-调理脾胃单臂举（八段锦）', '3-五劳七伤往后瞧（八段锦）', '4-摇头摆尾去心火（八段锦）', '5-两手攀足固肾腰（八段锦）', '6-攒拳怒目增气力（八段锦）', '7背后七颠百病消（八段锦）', '8虎举（五禽戏）', '9虎扑（五禽戏）']


In [9]:
import os
import cv2
import numpy as np



def load_video_frames(video_path, frame_size=(224, 224)):
    """
    读取视频并将其转换为一系列帧。
    
    :param video_path: 视频文件的路径
    :param frame_size: 每帧的大小
    :return: 处理后的帧数据 (frame_count, height, width, channels)
    """
    cap = cv2.VideoCapture(video_path)
    frames = []
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        # 调整每帧的大小
        frame = cv2.resize(frame, frame_size)
        frames.append(frame)
    cap.release()
    
    # 转换为NumPy数组
    frames = np.array(frames)
    return frames

def create_dataset(video_dir, frame_size=(224, 224)):
    """
    从指定的目录加载视频文件并为每个视频分配标签
    
    :param video_dir: 存放视频的根目录
    :param frame_size: 每帧的大小
    :return: 视频数据、标签数据
    """
    videos = []
    labels = []
    
    # 获取所有标签（即文件夹名称）
    labels_name = os.listdir(video_dir)
    
    # 遍历每个标签文件夹
    for label_idx, label_name in enumerate(labels_name):
        label_path = os.path.join(video_dir, label_name)
        
        # 如果是文件夹
        if os.path.isdir(label_path):
            # 获取文件夹下的所有视频文件
            video_files = [f for f in os.listdir(label_path) if f.endswith('.mp4')]  # 这里假设视频文件是.mp4格式
            
            for video_file in video_files:
                video_path = os.path.join(label_path, video_file)
                frames = load_video_frames(video_path, frame_size)
                
                if frames.size > 0:  # 如果视频有帧
                    videos.append(frames)  # 保存视频帧
                    labels.append(label_idx)  # 保存标签，label_idx即为当前文件夹的编号（从0开始）
    
    return np.array(videos), np.array(labels)

# 加载数据集
video_dir = r'C:\data\video'
videos, labels = create_dataset(video_dir)

# 打印数据集信息
print(f"视频样本数量: {len(videos)}")
print(f"每个视频的帧数: {videos[0].shape[0] if len(videos) > 0 else '无'}")
print(f"每个帧的形状: {videos[0].shape[1:] if len(videos) > 0 else '无'}")


KeyboardInterrupt: 