In [2]:
import os
import cv2
import torch
from torch.utils.data import Dataset, DataLoader

## 导入文件

### 文件名获取

In [3]:
def load_dataset_files(data_path, begin, end):
    """ 获取 1~10 文件夹下以 M_ 开头的视频文件和对应的标注文件 """


    matched_files = []

    # 遍历 1~10 的文件夹
    for i in range(begin, end + 1):  # 修改为 end + 1，确保包含 'end'
        folder_path = os.path.join(data_path, str(i))  # 构造文件夹路径

        if not os.path.isdir(folder_path):  # 确保该路径是文件夹
            continue

        # 筛选文件夹中的文件并构建标注文件字典
        txt_files = {os.path.basename(txt).replace(".txt", ""): txt for txt in os.listdir(folder_path) if txt.endswith(".txt")}

        for file in os.listdir(folder_path):
            if file.startswith("M_") and file.lower().endswith('.mp4'):  # 只筛选 M_ 开头的 mp4 文件
                video_path = os.path.join(folder_path, file)
                #video_files.append(video_path)

                base_name = file.replace("M_", "").replace(".MP4", "")  # 去掉 M_ 前缀并去掉后缀
                annotation_file_name = base_name + ".txt"  # 得到相应的 txt 文件名
                 # 拼接绝对路径
                annotation_file_path = os.path.join(folder_path, annotation_file_name)
                
                # 检查文件是否存在
                if os.path.exists(annotation_file_path):
                    matched_files.append((video_path, annotation_file_path))
                else:
                    print(f"{video_path} 视频未找到相对应的文本文件")  # 如果没有对应的标注文件，加入 None

    print(f"找到 {len(matched_files)} 对（视频-文本）文件对")
    return matched_files

In [4]:
matched_files = load_dataset_files(r"D:\ESD\test",1,5)
for char in matched_files:
    print(char)
    print(r"\n")

找到 10 对（视频-文本）文件对
('D:\\ESD\\test\\1\\M_20230822145107_U2291907_1_001_0001-01.MP4', 'D:\\ESD\\test\\1\\20230822145107_U2291907_1_001_0001-01.txt')
\n
('D:\\ESD\\test\\1\\M_20230822145107_U2291907_1_001_0002-01.MP4', 'D:\\ESD\\test\\1\\20230822145107_U2291907_1_001_0002-01.txt')
\n
('D:\\ESD\\test\\2\\M_20230824101337_U2464637_1_001_0001-01.MP4', 'D:\\ESD\\test\\2\\20230824101337_U2464637_1_001_0001-01.txt')
\n
('D:\\ESD\\test\\2\\M_20230824101337_U2464637_1_001_0002-01.MP4', 'D:\\ESD\\test\\2\\20230824101337_U2464637_1_001_0002-01.txt')
\n
('D:\\ESD\\test\\3\\M_20230828105533_U2802633_1_001_0001-01.MP4', 'D:\\ESD\\test\\3\\20230828105533_U2802633_1_001_0001-01.txt')
\n
('D:\\ESD\\test\\3\\M_20230828105533_U2802633_1_001_0002-01.MP4', 'D:\\ESD\\test\\3\\20230828105533_U2802633_1_001_0002-01.txt')
\n
('D:\\ESD\\test\\4\\M_20230829124054_U2997853_1_001_0001-01.MP4', 'D:\\ESD\\test\\4\\20230829124054_U2997853_1_001_0001-01.txt')
\n
('D:\\ESD\\test\\4\\M_20230829124054_U2997853_1_001_0002-0

In [17]:
def load_annotation_file(annotation_file_path):
    """ 打开标注文件并获取标注信息（假设包含 Frame 和 Phase） """
    annotations = []
    try:
        # 打开并读取标注文件
        with open(annotation_file_path, 'r') as file:
            lines = file.readlines()

            # 跳过表头（如果有的话）
            if lines[0].startswith("Frame"):
                lines = lines[1:]

            # 遍历每一行，提取标注信息
            for line in lines:
                # 去除行尾换行符，并按空格分隔
                parts = line.strip().split()

                if len(parts) == 2:  # 确保每行包含 2 部分 (Frame 和 Phase)
                    try:
                        frame = int(parts[0])  # 帧号
                        phase = parts[1]  # 阶段

                        # 将解析后的数据保存为字典
                        annotations.append({
                            'frame': frame,
                            'phase': phase
                        })
                    except ValueError:
                        print(f"⚠️ 无法解析标注文件中的行: {line}")
                else:
                    print(f"⚠️ 标注文件格式不正确: {line}")

    except FileNotFoundError:
        print(f"⚠️ 找不到文件: {annotation_file_path}")
    except Exception as e:
        print(f"⚠️ 打开标注文件时发生错误: {e}")
    if annotations:
        for i in range(1, len(annotations)):
            prev_frame = annotations[i - 1]['frame']
            curr_frame = annotations[i]['frame']
            if curr_frame != prev_frame + 1:
                print(f"⚠️ 帧号不连续: {prev_frame} -> {curr_frame}")
    return annotations

In [18]:
annotations = load_annotation_file(r"D:\\ESD\\test\\1\\20230822145107_U2291907_1_001_0001-01.txt")

In [19]:
length = len(annotations)
print(length)

35250


In [10]:
def frams_check_in(video_path):
    """ 对齐标注帧数和实际帧数 """
    
    # 打开视频文件
    cap = cv2.VideoCapture(video_path)
    
    if not cap.isOpened():
        print(f"无法打开视频文件: {video_path}")
        return None, None
    
    # 获取视频的帧率 (fps) 和总帧数
    fps = cap.get(cv2.CAP_PROP_FPS)  # 获取帧率
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))  # 获取视频总帧数
    
    cap.release()  # 释放视频文件资源
    
    return fps, total_frames


In [12]:
video_path = r"D:\\ESD\\test\\1\\M_20230822145107_U2291907_1_001_0001-01.MP4"
fps, total_frames = frams_check_in(video_path)

if fps is not None and total_frames is not None:
    print(f"视频帧率: {fps} FPS")
    print(f"视频总帧数: {total_frames} 帧")

视频帧率: 50.0 FPS
视频总帧数: 40060 帧


In [37]:
def annotations_prepare_frams(matched_files):
    for video_path, annotation_file_path in matched_files:
        print(f"处理视频文件: {video_path}")
        print(f"处理标注文件: {annotation_file_path}")

        # 解析标注文件
        annotations = []
        try:
            # 打开并读取标注文件
            with open(annotation_file_path, 'r') as file:
                lines = file.readlines()

                # 跳过表头
                if lines[0].startswith("Frame"):
                    lines = lines[1:]

                # 遍历每一行，提取标注信息
                for line in lines:
                    # 去除行尾换行符，并按空格分隔
                    parts = line.strip().split()

                    if len(parts) == 2:  # 确保每行包含 2 部分 (Frame 和 Phase)
                        try:
                            frame = int(parts[0])  # 帧号
                            phase = parts[1]  # 阶段

                            # 将解析后的数据保存为字典
                            annotations.append({
                                'frame': frame,
                                'phase': phase
                            })
                        except ValueError:
                            print(f"⚠️ 无法解析标注文件中的行: {line}")
                    else:
                        print(f"⚠️ 标注文件格式不正确: {line}")

        except FileNotFoundError:
            print(f"⚠️ 找不到文件: {annotation_file_path}")
            continue
        except Exception as e:
            print(f"⚠️ 打开标注文件时发生错误: {e}")
            continue

        # 输出标注数目
        print(f"标注的数目为: {len(annotations)}")

        # 打开视频文件
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            print(f"无法打开视频文件: {video_path}")
            continue

        # 获取视频的帧率 (fps) 和总帧数
        fps = cap.get(cv2.CAP_PROP_FPS)  # 获取帧率
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))  # 获取视频总帧数
        print(f"视频帧率: {fps}")
        print(f"视频总帧数: {total_frames}")


        print("\n")
        # 释放视频文件资源
        cap.release()

In [None]:
annotations_prepare_frams(matched_files)

In [20]:
import cv2
import random

def parse_annotations(annotation_file_path):
    annotations = []
    try:
        with open(annotation_file_path, 'r') as file:
            lines = file.readlines()
            if lines[0].startswith("Frame"):
                lines = lines[1:]
            for line in lines:
                parts = line.strip().split()
                if len(parts) == 2:
                    try:
                        frame = int(parts[0])  # 读取帧号
                        phase = parts[1]       # 读取阶段
                        annotations.append({
                            'frame': frame,
                            'phase': phase
                        })
                    except ValueError:
                        print(f"⚠️ 无法解析标注文件中的行: {line}")
                else:
                    print(f"⚠️ 标注文件格式不正确: {line}")
    except FileNotFoundError:
        print(f"⚠️ 找不到文件: {annotation_file_path}")
    except Exception as e:
        print(f"⚠️ 读取标注文件时发生错误: {e}")
    return annotations

def annotations_prepare_frames(matched_files):
    for video_path, annotation_file_path in matched_files:
        print(f"处理视频文件: {video_path}")
        print(f"处理标注文件: {annotation_file_path}")

        # 解析标注文件
        annotations = parse_annotations(annotation_file_path)
        print(f"标注数量: {len(annotations)}")

        # 打开视频文件，获取总帧数
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            print(f"⚠️ 无法打开视频文件: {video_path}")
            continue
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        cap.release()
        print(f"视频总帧数: {total_frames}")

        # 过滤掉超出视频总帧数的标注
        valid_annotations = [a for a in annotations if a['frame'] < total_frames]

        # 检查帧号连续性
        check_frame_continuity(valid_annotations)

        # 验证对齐结果
        validate_alignment(video_path, valid_annotations)


# 执行处理
annotations_prepare_frames(matched_files)



处理视频文件: D:\ESD\test\1\M_20230822145107_U2291907_1_001_0001-01.MP4
处理标注文件: D:\ESD\test\1\20230822145107_U2291907_1_001_0001-01.txt
标注数量: 35250
视频总帧数: 40060


NameError: name 'check_frame_continuity' is not defined

NameError: name 'annotations_prepare_frams' is not defined