In [47]:
import os.path as osp
from operator import itemgetter
from typing import Optional, Tuple
import tempfile
from collections import deque
import os
import cv2

from mmengine import Config
from mmaction.apis import inference_recognizer, init_recognizer
from mmaction.visualization import ActionVisualizer

# All parameters are defined directly in the notebook.

# --- Model ---
# Path to the recognizer config file.
config = '../configs/recognition/tsn/tsn_imagenet-pretrained-r50_8xb32-1x1x3-100e_xiandemo-rgb.py'
# Trained checkpoint to load.
checkpoint = '../work_dirs/xiandemo_tsn/epoch_100.pth'
# Device the model runs on.
device = 'cuda:0'

# --- Input / output files ---
# Input video (or frame directory) path.
# Alternative sample: '../data/xian_video_dataset/val/1 (1).mp4'
video = '../data/xian_video_dataset/pre/VID_20240905_104444.mp4'
# Text file with the class labels.
label_file = '../data/xian_video_dataset/label_map.txt'
# Where the final annotated video is written.
out_filename = '../data/xian_video_dataset/output_tsn_video.mp4'

# --- Rendering ---
# FPS of the output video.
fps = 30
# Font size of the overlay text.
font_scale = None
# Font colour of the overlay text.
font_color = 'white'
# Target resolution as (width, height).
target_resolution = 256, 256

# --- Clip sampling ---
# Number of frames used for each inference call.
clip_length = 15
# Frame interval.
interval = 1
# Load the model configuration from the config file.
cfg = Config.fromfile(config)

print(f"-------- cfg: {cfg.test_pipeline[1]}")

# Override the frame sampling of test-pipeline step 1 (the SampleFrames step
# in the recorded run) so that short clips can be sampled: 2 clips of 3 frames.
cfg.test_pipeline[1].num_clips = 2
cfg.test_pipeline[1].clip_len = 3

print(f"--------- cfg: {cfg.test_pipeline[1]}")

# Build the recognizer from the patched config and load the checkpoint weights.
model = init_recognizer(cfg, checkpoint, device=device)

# Read the class labels, one per line. The list index of a label is used later
# as the class index, so no lines are skipped. The `with` block closes the file
# handle deterministically instead of leaking it.
with open(label_file) as label_fh:
    labels = [line.strip() for line in label_fh]

print(f"labels: {labels}")

# Read the video and classify it clip by clip.
#
# Every `clip_length` consecutive frames are written to a temporary .mp4,
# classified with the recognizer, and then re-written with the predicted label
# drawn on each frame. The paths of the annotated clips are collected in
# `temp_videos`; the (last frame index, label) pair of every clip in `results`.


def _write_clip(path, frames, caption=None):
    """Encode `frames` to `path` as an mp4v video at the configured `fps`.

    Args:
        path: Destination file; an existing file is overwritten.
        frames: Non-empty indexable sequence of equally sized image arrays.
        caption: Optional text drawn onto every frame (in place) before the
            frame is written.
    """
    height, width = frames[0].shape[:2]
    writer = cv2.VideoWriter(path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height))
    try:
        for clip_frame in frames:
            if caption is not None:
                cv2.putText(clip_frame, caption, (10, 30),
                            cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2)
            writer.write(clip_frame)
    finally:
        # Always finalize the container, even if writing a frame fails.
        writer.release()


# NOTE(review): hard-coded absolute path; consider moving it to the config cell.
temp_dir = '/root/project/research/action/mmaction2/demo_out/temp'
# NamedTemporaryFile needs the directory to exist.
os.makedirs(temp_dir, exist_ok=True)

cap = cv2.VideoCapture(video)
if not cap.isOpened():
    raise IOError(f"Cannot open video: {video}")

num_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
print(f"num_frames: {num_frames}")
frame_queue = deque(maxlen=clip_length)

results = []      # (index of the clip's last frame, predicted label), one entry per clip
temp_videos = []  # paths of the temporary annotated clips
i = -1            # index of the most recently read frame

try:
    # Read until the decoder reports the end of the stream. The reported frame
    # count is only metadata, so it is not used as the loop bound.
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        i += 1

        # Buffer frames until a full clip is available.
        frame_queue.append(frame)
        if len(frame_queue) < clip_length:
            continue

        # Reserve a unique file name, then close the handle right away so that
        # OpenCV is the only writer of the file.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4", dir=temp_dir) as temp_video:
            temp_video_path = temp_video.name
        print(f"temp_video_path: {temp_video_path}, temp_video.name: {temp_video.name}")

        # First pass: write the raw frames so the recognizer sees unannotated input.
        _write_clip(temp_video_path, frame_queue)

        # Run inference and rank the classes by score.
        pred_result = inference_recognizer(model, temp_video_path)
        pred_scores = pred_result.pred_score.tolist()
        score_sorted = sorted(enumerate(pred_scores), key=itemgetter(1), reverse=True)
        top5_label = score_sorted[:5]
        print(f"top5_label: {top5_label}")
        top_label = labels[score_sorted[0][0]]
        results.append((i, top_label))
        # `results` now accumulates over the whole video; log only the newest
        # entry to keep the output short.
        print(f"results: {results[-1:]}")

        # Second pass: overwrite the clip with the predicted action drawn on
        # every frame.
        _write_clip(temp_video_path, frame_queue, caption=f"Action: {top_label}")

        temp_videos.append(temp_video_path)
        # Start a fresh clip. Trailing frames that never fill a whole clip are
        # not written to any temporary video.
        frame_queue.clear()
finally:
    # Release the capture even if inference or encoding raised.
    cap.release()

-------- cfg: {'type': 'SampleFrames', 'clip_len': 1, 'frame_interval': 1, 'num_clips': 25, 'test_mode': True}
--------- cfg: {'type': 'SampleFrames', 'clip_len': 3, 'frame_interval': 1, 'num_clips': 2, 'test_mode': True}
Loads checkpoint by local backend from path: ../work_dirs/xiandemo_tsn/epoch_100.pth
labels: ['en', 'na', 'si', 'tie', 'zhuan']
num_frames: 551
temp_video_path: /root/project/research/action/mmaction2/demo_out/temp/tmpx7b16j6j.mp4, temp_video.name: /root/project/research/action/mmaction2/demo_out/temp/tmpx7b16j6j.mp4
top5_label: [(3, 0.9619326591491699), (2, 0.037561289966106415), (4, 0.00019449315732344985), (1, 0.00016191926260944456), (0, 0.00014959697728045285)]
results: [(14, 'tie')]
temp_video_path: /root/project/research/action/mmaction2/demo_out/temp/tmphhagaxx1.mp4, temp_video.name: /root/project/research/action/mmaction2/demo_out/temp/tmphhagaxx1.mp4
top5_label: [(2, 0.9983556866645813), (3, 0.0016058186301961541), (1, 1.706915099930484e-05), (0, 1.517859163

In [48]:
# Open every temporary annotated clip so they can be merged into one long video.
from moviepy.editor import VideoFileClip, concatenate_videoclips

clips = list(map(VideoFileClip, temp_videos))

In [49]:
# Concatenate the per-clip videos and encode the result with libx264.
if not clips:
    # No clip was produced (e.g. the input had fewer frames than one clip);
    # fail with a clear message instead of an obscure error from inside moviepy.
    raise ValueError("No clips to merge: the inference cell produced no temporary videos.")

final_video = concatenate_videoclips(clips)
final_video.write_videofile(out_filename, codec="libx264")

Moviepy - Building video ../data/xian_video_dataset/output_tsn_video.mp4.
Moviepy - Writing video ../data/xian_video_dataset/output_tsn_video.mp4



                                                              

Moviepy - Done !
Moviepy - video ready ../data/xian_video_dataset/output_tsn_video.mp4


In [50]:
# Delete the temporary clip files.

# Release the readers that the moviepy clips (if the merge cell was run) still
# hold on the temporary files before removing them.
for opened_clip in globals().get('clips', []):
    opened_clip.close()

for temp_path in temp_videos:
    try:
        os.remove(temp_path)
    except FileNotFoundError:
        # Already removed by an earlier run of this cell; re-running is harmless.
        continue

In [51]:
# Display the generated video inside the notebook.
from IPython.display import Video

# Reuse the configured output path so this cell always shows the file that was
# actually written, even when `out_filename` is changed in the config section.
Video(out_filename, width=640, height=480)