
# 提取视频帧

extract_frames_from_non_hate_video 函数：
从非仇恨视频中按固定间隔提取帧，目标帧数由 target_frames 参数决定，默认是 100 帧。
如果提取到的有效帧数不足 target_frames 帧（接近单色的帧会被跳过），会用白帧填充。

extract_frames_from_hate_video 函数：
从仇恨视频中提取的帧数也是由 target_frames 参数决定，默认是 100 帧，优先从标注的仇恨时间段中提取。
如果提取的帧数不足 target_frames 帧，会先从非仇恨时间段中随机提取帧补充；若仍不足，再用白帧填充。

In [4]:
import pandas as pd
import cv2
import numpy as np
import os
from typing import List, Tuple

# Define the function to get video information
def get_video_info(video_path):
    """Return the frame count and frame rate OpenCV reports for a video.

    Args:
        video_path: Path of the video file to inspect.

    Returns:
        A ``(total_frames, fps)`` tuple. When the file cannot be opened an
        error line is printed and ``(None, None)`` is returned instead.
    """
    capture = cv2.VideoCapture(video_path)
    if capture.isOpened():
        # Query both properties while the handle is still open.
        frame_count = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
        frame_rate = capture.get(cv2.CAP_PROP_FPS)
        capture.release()
        return frame_count, frame_rate

    print(f"Error opening video file: {video_path}")
    return None, None

# Function to check if a frame is near monochrome (all black/white)
def is_monochrome(frame: np.ndarray, threshold: int = 15) -> bool:
    """Report whether a frame is close to a single flat shade.

    A frame counts as monochrome when the standard deviation of its
    grayscale pixel values is strictly below ``threshold``.

    Args:
        frame: Image array. Multi-channel input is converted with
            ``cv2.COLOR_BGR2GRAY``; 2-D or single-channel input is treated
            as already grayscale and used as-is.
        threshold: Upper bound (exclusive) on the grayscale standard
            deviation for the frame to be considered monochrome.

    Returns:
        ``True`` if the frame is near-monochrome, ``False`` otherwise.
    """
    # cvtColor with BGR2GRAY rejects input that has no colour channels, so
    # grayscale frames bypass the conversion.
    already_gray = frame.ndim == 2 or (frame.ndim == 3 and frame.shape[2] == 1)
    gray_frame = frame if already_gray else cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    # Wrap in bool() so callers get a plain Python bool, as annotated.
    return bool(np.std(gray_frame) < threshold)

# Define the function to extract and filter frames from a non-hate video
def extract_frames_from_non_hate_video(video_path, target_frames=100):
    """Sample usable frames from a video at a fixed stride and pad to a fixed count.

    Frames are read at indices ``0, interval, 2 * interval, ...`` where
    ``interval = max(1, total_frames // target_frames)``. Frames that cannot
    be read or that ``is_monochrome`` flags are skipped. If fewer than
    ``target_frames`` frames survive, the list is padded with white frames.

    Args:
        video_path: Path of the video file to sample.
        target_frames: Number of frames to return.

    Returns:
        A list of ``target_frames`` frame arrays, or an empty list when
        ``target_frames`` is not positive or the video cannot be opened.
        All padding entries reference the same white array object.
    """
    if target_frames <= 0:
        # Nothing was requested; this also avoids dividing by zero below.
        return []

    total_frames, _fps = get_video_info(video_path)
    if total_frames is None:
        return []  # Video info could not be retrieved.

    cap = cv2.VideoCapture(video_path)
    try:
        interval = max(1, total_frames // target_frames)
        extracted_frames = []

        for i in range(0, total_frames, interval):
            cap.set(cv2.CAP_PROP_POS_FRAMES, i)
            ret, frame = cap.read()
            if not ret or is_monochrome(frame):
                continue  # Unreadable or near-monochrome frame.
            extracted_frames.append(frame)
            if len(extracted_frames) == target_frames:
                break

        missing = target_frames - len(extracted_frames)
        if missing > 0:
            if extracted_frames:
                # Reuse the shape and dtype of a real frame.
                white_frame = np.full_like(extracted_frames[0], 255)
            else:
                # No usable frame yet: read the first frame only to learn the resolution.
                cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
                ret, frame = cap.read()
                if ret:
                    white_frame = np.full_like(frame, 255)
                else:
                    # Resolution unknown; fall back to 1080p (1920x1080).
                    white_frame = np.full((1080, 1920, 3), 255, dtype=np.uint8)
            extracted_frames.extend([white_frame] * missing)

        return extracted_frames
    finally:
        # Release the capture even if reading or filtering raised.
        cap.release()

# Define a function to save frames to disk
def save_frames(frames, output_dir, video_name):
    """Write frames to ``output_dir`` as sequentially numbered PNG files.

    Files are named ``{video_name}_frame_{i:03d}.png`` where ``i`` is the
    position of the frame in ``frames``.

    Args:
        frames: Iterable of image arrays to write.
        output_dir: Destination directory; created if it does not exist.
        video_name: Prefix used for every output file name.
    """
    # exist_ok avoids the check-then-create race if another caller creates
    # the directory between the test and the mkdir.
    os.makedirs(output_dir, exist_ok=True)

    for i, frame in enumerate(frames):
        frame_filename = f"{video_name}_frame_{i:03d}.png"
        frame_path = os.path.join(output_dir, frame_filename)
        # imwrite signals failure through its return value; report it
        # instead of silently losing the frame.
        if not cv2.imwrite(frame_path, frame):
            print(f"Failed to write frame: {frame_path}")

# Define the directory for non-hate videos
non_hate_videos_dir = '../Datas/non_hate_videos'

# Define the output directory for extracted frames
output_frames_dir = '../Datas/extract_non_hate_frame'

# List all non-hate videos
non_hate_video_files = [f for f in os.listdir(non_hate_videos_dir) if f.endswith('.mp4')]

# Process each non-hate video and save the frames
for video_file in non_hate_video_files:
    video_path = os.path.join(non_hate_videos_dir, video_file)
    frames = extract_frames_from_non_hate_video(video_path)
    # splitext strips only the final extension, so a name such as
    # "clip.v2.mp4" keeps its full stem instead of collapsing to "clip".
    video_name = os.path.splitext(video_file)[0]
    save_frames(frames, output_frames_dir, video_name)

# Let's print out a success message
print("Finished extracting and saving frames for non-hate videos.")


Finished extracting and saving frames for non-hate videos.


In [2]:
# Define the function to convert a time string to a frame index
def time_to_frame_index(time_str, fps):
    """Convert a clock-style timestamp to a frame index.

    Accepted formats are ``HH:MM:SS``, ``MM:SS`` and ``SS``. The seconds
    field may carry a fractional part (for example ``00:00:01.5``).

    Args:
        time_str (str): Timestamp to convert.
        fps (float): Frames per second of the video.

    Returns:
        int: ``total_seconds * fps`` truncated toward zero.

    Raises:
        ValueError: If the string has more than three fields or a field is
            not numeric.
    """
    parts = time_str.strip().split(':')
    if len(parts) > 3:
        raise ValueError(f"Unsupported time format: {time_str!r}")

    *leading, seconds_field = parts
    total_seconds = float(seconds_field)
    # Walk right-to-left so minutes get weight 60 and hours 3600.
    for power, field in enumerate(reversed(leading), start=1):
        total_seconds += int(field) * 60 ** power
    return int(total_seconds * fps)

# Define the function to extract frames from hate videos based on annotations
def extract_frames_from_hate_video(video_path, hate_intervals, target_frames=100):
    """Collect frames from annotated intervals, then top up to a fixed count.

    Frames are taken in ascending index order from the annotated intervals
    first. If that yields fewer than ``target_frames`` usable frames, the
    remaining frame indices are visited in random order (``np.random``), and
    any shortfall after that is padded with white frames. Unreadable frames
    and frames flagged by ``is_monochrome`` are skipped throughout.

    Args:
        video_path: Path of the video file.
        hate_intervals: Iterable of ``(start_time, end_time)`` pairs in the
            string format accepted by ``time_to_frame_index``.
        target_frames: Number of frames to return.

    Returns:
        A list of ``target_frames`` frame arrays, or an empty list when
        ``target_frames`` is not positive or the video cannot be opened.
        All padding entries reference the same white array object.
    """
    if target_frames <= 0:
        return []

    total_frames, fps = get_video_info(video_path)
    if total_frames is None:
        return []  # Video info could not be retrieved.

    # A set merges overlapping intervals and makes the membership test used
    # for the filler candidates O(1) instead of a list scan per frame.
    hate_frame_indices = set()
    for start_time, end_time in hate_intervals:
        start_frame = time_to_frame_index(start_time, fps)
        end_frame = time_to_frame_index(end_time, fps)
        hate_frame_indices.update(range(start_frame, min(end_frame, total_frames)))

    cap = cv2.VideoCapture(video_path)
    try:
        extracted_frames = []

        def collect(indices):
            """Append usable frames at ``indices`` until the target is reached."""
            for idx in indices:
                if len(extracted_frames) >= target_frames:
                    break
                cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
                ret, frame = cap.read()
                if not ret or is_monochrome(frame):
                    continue  # Unreadable or near-monochrome frame.
                extracted_frames.append(frame)

        collect(sorted(hate_frame_indices))

        if len(extracted_frames) < target_frames:
            # Only build and shuffle the filler candidates when they are needed.
            additional_frame_indices = [
                i for i in range(total_frames) if i not in hate_frame_indices
            ]
            np.random.shuffle(additional_frame_indices)
            collect(additional_frame_indices)

        missing = target_frames - len(extracted_frames)
        if missing > 0:
            if extracted_frames:
                white_frame = np.full_like(extracted_frames[0], 255)
            else:
                # No usable frame yet: read the first frame only to learn the resolution.
                cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
                ret, frame = cap.read()
                if ret:
                    white_frame = np.full_like(frame, 255)
                else:
                    # Resolution unknown; fall back to 1080p (1920x1080).
                    white_frame = np.full((1080, 1920, 3), 255, dtype=np.uint8)
            extracted_frames += [white_frame] * missing

        return extracted_frames
    finally:
        # Release the capture even if reading or filtering raised.
        cap.release()

import ast

# Define the directory for hate videos
hate_videos_dir = '../Datas/hate_videos'

# Define the output directory for extracted hate frames
output_hate_frames_dir = '../Datas/extract_hate_frame'
# Load annotations
annotations_path = '../Datas/HateMM_annotation.csv'
annotations = pd.read_csv(annotations_path)
# Process each hate video and save the frames based on annotations
for index, row in annotations.iterrows():
    if row['label'] != 'Hate':
        continue

    # Construct the video path
    video_path = os.path.join(hate_videos_dir, row['video_file_name'])

    # NOTE(review): this used to be eval(), which executes whatever code the
    # CSV cell contains. literal_eval only parses Python literals.
    # Assumes hate_snippet holds a literal such as a list of
    # [start, end] string pairs - TODO confirm against the CSV.
    hate_intervals = ast.literal_eval(row['hate_snippet'])

    # Extract frames from the hate video
    frames = extract_frames_from_hate_video(video_path, hate_intervals)

    # splitext strips only the final extension, so names with extra dots
    # keep their full stem and do not collide on disk.
    video_name = os.path.splitext(row['video_file_name'])[0]
    save_frames(frames, output_hate_frames_dir, video_name)

# Let's print out a success message
print("Finished extracting and saving frames for hate videos.")


NameError: name 'pd' is not defined

In [None]:
import pandas as pd
import cv2
import numpy as np
import os
from concurrent.futures import ThreadPoolExecutor
from moviepy.editor import VideoFileClip

# 定义获取视频信息的函数
def get_video_info(video_path):
    """Look up the total frame count and FPS of a video through OpenCV.

    Args:
        video_path: Path of the video file to inspect.

    Returns:
        ``(total_frames, fps)`` on success. If the file cannot be opened,
        an error line is printed and ``(None, None)`` is returned.
    """
    reader = cv2.VideoCapture(video_path)
    opened = reader.isOpened()
    if not opened:
        print(f"Error opening video file: {video_path}")
        return None, None

    # Frame count is truncated to int; FPS is returned as OpenCV reports it.
    info = (
        int(reader.get(cv2.CAP_PROP_FRAME_COUNT)),
        reader.get(cv2.CAP_PROP_FPS),
    )
    reader.release()
    return info

# 检测单色帧的函数
def is_monochrome(frame: np.ndarray, threshold: int = 15) -> bool:
    """Report whether a frame is close to a single flat shade.

    The frame is monochrome when the standard deviation of its grayscale
    pixel values is strictly below ``threshold``.

    Args:
        frame: Image array. Multi-channel input is converted with
            ``cv2.COLOR_BGR2GRAY``; 2-D or single-channel input is treated
            as already grayscale.
        threshold: Exclusive upper bound on the grayscale standard deviation.

    Returns:
        ``True`` for a near-monochrome frame, ``False`` otherwise.
    """
    if frame.ndim == 2 or (frame.ndim == 3 and frame.shape[2] == 1):
        # BGR2GRAY rejects input without colour channels; use it directly.
        gray_frame = frame
    else:
        gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    # bool() turns the NumPy scalar into the plain bool the annotation promises.
    return bool(np.std(gray_frame) < threshold)

# 从非仇恨视频中提取帧的函数
def extract_frames_from_non_hate_video(video_path, target_frames=100):
    """Sample usable frames from a video at a fixed stride and pad to a fixed count.

    Frames are read at indices ``0, interval, 2 * interval, ...`` with
    ``interval = max(1, total_frames // target_frames)``. Unreadable frames
    and frames flagged by ``is_monochrome`` are skipped; any shortfall is
    padded with white frames.

    Args:
        video_path: Path of the video file to sample.
        target_frames: Number of frames to return.

    Returns:
        A list of ``target_frames`` frame arrays, or an empty list when
        ``target_frames`` is not positive or the video cannot be opened.
        All padding entries reference the same white array object.
    """
    if target_frames <= 0:
        # Nothing was requested; this also avoids dividing by zero below.
        return []

    total_frames, _fps = get_video_info(video_path)
    if total_frames is None:
        return []

    cap = cv2.VideoCapture(video_path)
    try:
        interval = max(1, total_frames // target_frames)
        extracted_frames = []

        for i in range(0, total_frames, interval):
            cap.set(cv2.CAP_PROP_POS_FRAMES, i)
            ret, frame = cap.read()
            if not ret or is_monochrome(frame):
                continue
            extracted_frames.append(frame)
            if len(extracted_frames) == target_frames:
                break

        missing = target_frames - len(extracted_frames)
        if missing > 0:
            if extracted_frames:
                white_frame = np.full_like(extracted_frames[0], 255)
            else:
                # No usable frame yet: read frame 0 only to learn the
                # resolution, falling back to 1080p if that fails too.
                cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
                ret, frame = cap.read()
                if ret:
                    white_frame = np.full_like(frame, 255)
                else:
                    white_frame = np.full((1080, 1920, 3), 255, dtype=np.uint8)
            extracted_frames.extend([white_frame] * missing)

        return extracted_frames
    finally:
        # Release the capture even if reading or filtering raised.
        cap.release()

# 保存帧到磁盘的函数
def save_frames(frames, output_dir, video_name):
    """Write frames to ``output_dir`` as sequentially numbered PNG files.

    Files are named ``{video_name}_frame_{i:03d}.png`` where ``i`` is the
    position of the frame in ``frames``.

    Args:
        frames: Iterable of image arrays to write.
        output_dir: Destination directory; created if it does not exist.
        video_name: Prefix used for every output file name.
    """
    # This function is called from several worker threads that share one
    # output directory. A separate exists() check followed by makedirs()
    # can raise FileExistsError when two workers race; exist_ok avoids that.
    os.makedirs(output_dir, exist_ok=True)

    for i, frame in enumerate(frames):
        frame_path = os.path.join(output_dir, f"{video_name}_frame_{i:03d}.png")
        # imwrite signals failure through its return value; report it
        # instead of silently losing the frame.
        if not cv2.imwrite(frame_path, frame):
            print(f"Failed to write frame: {frame_path}")

# 时间字符串转帧索引的函数
def time_to_frame_index(time_str, fps):
    """Convert a clock-style timestamp to a frame index.

    Accepted formats are ``HH:MM:SS``, ``MM:SS`` and ``SS``; the seconds
    field may carry a fractional part.

    Args:
        time_str: Timestamp string to convert.
        fps: Frames per second of the video.

    Returns:
        ``total_seconds * fps`` truncated toward zero, as an ``int``.

    Raises:
        ValueError: If the string has more than three fields or a field is
            not numeric.
    """
    fields = time_str.strip().split(':')
    if len(fields) > 3:
        raise ValueError(f"Unsupported time format: {time_str!r}")

    # Missing leading fields (hours, or hours and minutes) default to zero.
    padded = ['0'] * (3 - len(fields)) + fields
    hours, minutes, seconds = int(padded[0]), int(padded[1]), float(padded[2])
    return int((hours * 3600 + minutes * 60 + seconds) * fps)

# 从仇恨视频中提取帧的函数
def extract_frames_from_hate_video(video_path, hate_intervals, target_frames=100):
    """Collect frames from annotated intervals, then top up to a fixed count.

    Frames come first from the annotated intervals in ascending index order.
    If that is not enough, the remaining frame indices are visited in random
    order (``np.random``), and any shortfall after that is padded with white
    frames. Unreadable frames and frames flagged by ``is_monochrome`` are
    skipped throughout.

    Args:
        video_path: Path of the video file.
        hate_intervals: Iterable of ``(start_time, end_time)`` pairs in the
            string format accepted by ``time_to_frame_index``.
        target_frames: Number of frames to return.

    Returns:
        A list of ``target_frames`` frame arrays, or an empty list when
        ``target_frames`` is not positive or the video cannot be opened.
        All padding entries reference the same white array object.
    """
    if target_frames <= 0:
        return []

    total_frames, fps = get_video_info(video_path)
    if total_frames is None:
        return []

    # A set merges overlapping intervals and keeps the "not in" test below
    # O(1) per frame instead of scanning a list each time.
    hate_frame_indices = set()
    for start_time, end_time in hate_intervals:
        start_frame = time_to_frame_index(start_time, fps)
        end_frame = time_to_frame_index(end_time, fps)
        hate_frame_indices.update(range(start_frame, min(end_frame, total_frames)))

    cap = cv2.VideoCapture(video_path)
    try:
        extracted_frames = []

        def collect(indices):
            """Append usable frames at ``indices`` until the target is reached."""
            for idx in indices:
                if len(extracted_frames) >= target_frames:
                    break
                cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
                ret, frame = cap.read()
                if not ret or is_monochrome(frame):
                    continue
                extracted_frames.append(frame)

        collect(sorted(hate_frame_indices))

        if len(extracted_frames) < target_frames:
            # Only build and shuffle the filler candidates when they are needed.
            additional_frame_indices = [
                i for i in range(total_frames) if i not in hate_frame_indices
            ]
            np.random.shuffle(additional_frame_indices)
            collect(additional_frame_indices)

        missing = target_frames - len(extracted_frames)
        if missing > 0:
            if extracted_frames:
                white_frame = np.full_like(extracted_frames[0], 255)
            else:
                # No usable frame yet: read frame 0 only to learn the
                # resolution, falling back to 1080p if that fails too.
                cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
                ret, frame = cap.read()
                if ret:
                    white_frame = np.full_like(frame, 255)
                else:
                    white_frame = np.full((1080, 1920, 3), 255, dtype=np.uint8)
            extracted_frames.extend([white_frame] * missing)

        return extracted_frames
    finally:
        # Release the capture even if reading or filtering raised.
        cap.release()

# Input locations: the annotation table and the two raw-video folders
annotations_path = '../Datas/HateMM_annotation.csv'
hate_videos_dir = '../Datas/hate_videos'
non_hate_videos_dir = '../Datas/non_hate_videos'

# Output locations for the extracted frame images
output_hate_frames_dir = '../Datas/extract_hate_frame'
output_frames_dir = '../Datas/extract_non_hate_frame'

# Load the annotation CSV into a DataFrame
annotations = pd.read_csv(annotations_path)

# 提取非仇恨视频帧
def process_non_hate_video(video_file):
    """Extract frames from one non-hate video and save them to disk.

    Args:
        video_file: File name of the video inside ``non_hate_videos_dir``.
    """
    video_path = os.path.join(non_hate_videos_dir, video_file)
    frames = extract_frames_from_non_hate_video(video_path)
    # splitext strips only the final extension, so a name such as
    # "clip.v2.mp4" keeps its full stem instead of collapsing to "clip".
    video_name = os.path.splitext(video_file)[0]
    save_frames(frames, output_frames_dir, video_name)
    print(f"Finished extracting and saving frames for non-hate video: {video_file}")

# 提取仇恨视频帧
def process_hate_video(index, row):
    """Extract and save frames for one annotation row if it is labelled 'Hate'.

    Rows with any other label are ignored.

    Args:
        index: Row index from the annotation table (not used).
        row: Mapping with the keys ``label``, ``video_file_name`` and
            ``hate_snippet``.
    """
    import ast  # Local import keeps this block self-contained.

    if row['label'] != 'Hate':
        return

    video_path = os.path.join(hate_videos_dir, row['video_file_name'])
    # NOTE(review): this used to be eval(), which executes whatever code the
    # CSV cell contains. literal_eval only parses Python literals.
    # Assumes hate_snippet holds a literal such as a list of
    # [start, end] string pairs - TODO confirm against the CSV.
    hate_intervals = ast.literal_eval(row['hate_snippet'])
    frames = extract_frames_from_hate_video(video_path, hate_intervals)
    # splitext strips only the final extension so dotted names keep their stem.
    video_name = os.path.splitext(row['video_file_name'])[0]
    save_frames(frames, output_hate_frames_dir, video_name)
    print(f"Finished extracting and saving frames for hate video: {row['video_file_name']}")

# Process the non-hate videos in parallel
non_hate_video_files = [f for f in os.listdir(non_hate_videos_dir) if f.endswith('.mp4')]
with ThreadPoolExecutor(max_workers=4) as executor:
    pending = [
        (video_file, executor.submit(process_non_hate_video, video_file))
        for video_file in non_hate_video_files
    ]
    for video_file, future in pending:
        # result() re-raises any exception from the worker. Without it,
        # failures inside the pool would pass unnoticed.
        try:
            future.result()
        except Exception as exc:
            print(f"Failed to process non-hate video {video_file}: {exc}")

# Process the hate videos in parallel
with ThreadPoolExecutor(max_workers=4) as executor:
    # iterrows() yields (index, row) tuples. Passing them straight to
    # executor.map would hand each tuple over as a single argument, so
    # every call to the two-parameter worker would fail with TypeError.
    # Submit with the tuple unpacked instead.
    pending = [
        (row['video_file_name'], executor.submit(process_hate_video, index, row))
        for index, row in annotations.iterrows()
    ]
    for video_file, future in pending:
        try:
            future.result()
        except Exception as exc:
            print(f"Failed to process hate video {video_file}: {exc}")

print("Finished extracting and saving frames for all videos.")
# 主要改进点
# 并行处理：使用 ThreadPoolExecutor 实现多线程并行处理，提高处理速度。
# 错误处理：增加了对视频无法打开或无法读取帧的错误处理。
# 代码重构：将重复的代码提取到函数中，提高代码的可读性和可维护性。
# 日志记录：增加了打印日志信息，以便跟踪处理进度和错误。
# 1. 统一帧数
# 上采样：对帧数较少的视频，通过插值或重复帧来增加帧数。
# 下采样：对帧数较多的视频，通过随机采样或固定间隔采样来减少帧数。
# 2. 数据增强
# 通过数据增强技术，生成更多样本来平衡数据集。以下是一些常见的数据增强技术：

# 旋转：随机旋转帧图像。
# 缩放：随机缩放帧图像。
# 平移：随机平移帧图像。
# 翻转：随机翻转帧图像。
# 噪声：添加随机噪声。

In [2]:
import cv2

def calculate_frames_for_duration(video_path, duration_in_seconds=50):
    """Compute how many frames span a given duration at the video's frame rate.

    The result is ``int(duration_in_seconds * fps)``. It is not checked
    against the actual length of the video.

    Args:
        video_path: Path of the video whose FPS is read.
        duration_in_seconds: Duration to convert into a frame count.

    Returns:
        A ``(frame_count, fps)`` tuple, or ``(None, None)`` when the video
        cannot be opened (an error line is printed in that case).
    """
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            print(f"Error opening video file: {video_path}")
            # Return a pair, matching the success path, so callers that
            # unpack two values do not crash on failure.
            return None, None
        fps = cap.get(cv2.CAP_PROP_FPS)
    finally:
        cap.release()
    return int(duration_in_seconds * fps), fps

# Example: compute how many frames 50 seconds of video correspond to
video_path = '../Datas/hate_videos/hate_video_2.mp4'
result = calculate_frames_for_duration(video_path)
# Check for a failure result before unpacking, so an unopenable video does
# not raise "cannot unpack" here.
frames, fps = result if result is not None else (None, None)
if frames is not None:
    print(f"The video has {fps} FPS, and 50 seconds is approximately {frames} frames.")


The video has 25.0 FPS, and 50 seconds is approximately 1250 frames.
