In [1]:
import os
import cv2
import numpy as np
import json
from tqdm import tqdm

### 비디오 데이터 전처리 함수

In [2]:
def preprocess_video_every_3_seconds(video_path:str, frame_size:tuple, block_nums:int, frame_rate=3):
    """
    Split a video into consecutive 3-second blocks and sample frames evenly from each block.

    Every sampled frame is resized, converted to grayscale, given a trailing
    channel axis and scaled to float32 by dividing by 255. Only complete
    3-second blocks are kept, so every block has the same number of frames.

    Args:
    video_path (str): Path to the video file.
    frame_size (tuple): Target size handed to ``cv2.resize`` as ``dsize``,
        which OpenCV interprets as (width, height).
    block_nums (int): Maximum number of 3-second blocks to return.
    frame_rate (int): Number of frames to keep per second, i.e.
        ``frame_rate * 3`` frames per block (capped by the number of frames
        a 3-second window actually contains).

    Returns:
    numpy.ndarray: Array of shape (num_blocks, frames_per_block, height, width, 1)
    with ``num_blocks <= block_nums``. An empty array is returned when the
    reported FPS is 0 or the video holds no complete block.
    """
    window_seconds = 3

    vidcap = cv2.VideoCapture(video_path)
    try:
        fps = vidcap.get(cv2.CAP_PROP_FPS)
        interval = int(fps * window_seconds)  # number of frames read per block

        # Offsets (within a block) of the frames to keep, spread evenly over the
        # whole window instead of taking only the first few consecutive frames.
        wanted = frame_rate * window_seconds
        if interval > 0:
            sample_offsets = {(k * interval) // wanted for k in range(wanted)}
        else:
            sample_offsets = set()

        sequences = []
        while sample_offsets and len(sequences) < block_nums:
            frames = []
            frames_read = 0
            for offset in range(interval):
                success, frame = vidcap.read()
                if not success:
                    break
                frames_read += 1
                if offset not in sample_offsets:
                    continue  # frame is not part of the sample; skip the resize work

                frame = cv2.resize(frame, frame_size, interpolation=cv2.INTER_AREA)
                gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                gray_frame = np.expand_dims(gray_frame, axis=-1)  # add the channel axis
                gray_frame = gray_frame.astype(np.float32) / 255.0
                frames.append(gray_frame)

            # The video ended inside this block: drop the incomplete block so
            # that all returned blocks share one shape.
            if frames_read < interval:
                break

            sequences.append(np.array(frames))
    finally:
        # Release the capture handle even if decoding or resizing raised
        vidcap.release()

    return np.array(sequences)


In [3]:
def parse_annotations(annotations:list):
    """
    Collect the block numbers marked as highlights across all annotations.

    Args:
    annotations (List): List of annotation dictionaries; each must provide a
        'highlight' entry holding an iterable of block numbers.

    Returns:
    Dict: Maps every block number found under 'highlight' to 1. Block numbers
    that no annotation lists are absent from the mapping.

    Raises:
    KeyError: If an annotation has no 'highlight' entry.
    """
    highlight_map = {}

    for annot in annotations:
        for num in annot['highlight']:
            highlight_map[num] = 1

    return highlight_map

### 오디오 데이터 전처리 함수

In [4]:
import librosa 
import librosa.display as dsp
from IPython.display import Audio
from moviepy.editor import VideoFileClip

In [5]:
def extract_audio(video_path, audio_path):
    """
    Write the audio track of a video file to ``audio_path``.

    Args:
    video_path (str): Path of the video to open with ``VideoFileClip``.
    audio_path (str): Destination path handed to ``write_audiofile``.

    Raises:
    ValueError: If the opened clip exposes no audio track.
    """
    # Open the video file
    video_clip = VideoFileClip(video_path)
    try:
        audio_clip = video_clip.audio
        if audio_clip is None:
            raise ValueError(f"No audio track found in video: {video_path}")

        # Extract the audio and save it
        audio_clip.write_audiofile(audio_path, verbose=False, logger=None)
    finally:
        # Close the file even when extraction fails
        video_clip.close()

In [6]:
def preprocess_audio(audio_path, sample_rate=22050, n_fft=2048, hop_length=512, n_mels=130, segment_duration=3):
    """
    Load an audio file and turn each full segment into a dB-scaled mel spectrogram.

    The waveform is loaded with ``librosa.load`` at ``sample_rate`` and cut into
    back-to-back segments of ``segment_duration`` seconds. Samples left over
    after the last full segment are not used.

    Args:
    audio_path (str): Path of the audio file to load.
    sample_rate (int): Sampling rate requested from ``librosa.load``.
    n_fft (int): ``n_fft`` forwarded to ``librosa.feature.melspectrogram``.
    hop_length (int): ``hop_length`` forwarded to ``librosa.feature.melspectrogram``.
    n_mels (int): ``n_mels`` forwarded to ``librosa.feature.melspectrogram``.
    segment_duration (int): Segment length in seconds.

    Returns:
    numpy.ndarray: The per-segment dB spectrograms stacked with ``np.array``.
    """
    waveform, sr = librosa.load(audio_path, sr=sample_rate)

    # Segment size in samples, and how many whole segments the waveform holds
    samples_per_segment = int(sr * segment_duration)
    full_segments = len(waveform) // samples_per_segment

    def _mel_db(chunk):
        # Mel power spectrogram converted to dB relative to the chunk's own maximum
        mel = librosa.feature.melspectrogram(y=chunk, sr=sr, n_fft=n_fft, hop_length=hop_length, n_mels=n_mels)
        return librosa.power_to_db(mel, ref=np.max)

    return np.array([
        _mel_db(waveform[k * samples_per_segment:(k + 1) * samples_per_segment])
        for k in range(full_segments)
    ])

In [7]:
# Path of the source movie file and destination of the extracted audio
video_path = 'data/원천데이터/2~5분/test.mp4'
audio_path = 'audio.wav'

# Extract the audio track
extract_audio(video_path, audio_path)

### 전처리 진행

In [8]:
# Duration groups to process; each entry is interpolated into the label-file
# name and the source-video directory. The full list is kept for reference:
# video_length = ['2~5분', '5~20분']
video_length = ['2~5분']

In [9]:
new_video_data = []
output_json_path = 'processed/label/processed_video_data.json'

# Output locations are identical for every duration group
output_video_dir = 'processed/video/'
output_wav_dir = 'processed/wav/'
output_audio_dir = 'processed/audio/'

# Create the output directories up front so the first write does not fail
# when the notebook is run on a fresh checkout
for out_dir in (output_video_dir, output_wav_dir, output_audio_dir, os.path.dirname(output_json_path)):
    os.makedirs(out_dir, exist_ok=True)

# Running index shared by all duration groups so output file names never collide
video_idx = 1

for leng in video_length:

    json_path = f'data/라벨링데이터/video_summary_validation_data({leng}).json'
    video_path = f'data/원천데이터/{leng}/'

    with open(json_path, 'r', encoding='utf-8') as f:
        label_data = json.load(f)

    for item in tqdm(label_data):
        input_video_name = item['filename'] + '.mp4'
        input_video_path = os.path.join(video_path, input_video_name)

        output_video_name = f"processed_video_{video_idx}.npy"
        output_video_path = os.path.join(output_video_dir, output_video_name)

        output_wav_name = f"processed_video_{video_idx}.wav"
        output_wav_path = os.path.join(output_wav_dir, output_wav_name)

        output_audio_name = f"processed_audio_{video_idx}.npy"
        output_audio_path = os.path.join(output_audio_dir, output_audio_name)

        # Skip labels whose source video is missing (the index is not consumed)
        if not os.path.exists(input_video_path):
            print(f"Not Found : {input_video_path}")
            continue

        ######################################
        # Split off the audio track (wav) from the mp4 and save it
        extract_audio(input_video_path, output_wav_path)

        ######################################
        # Preprocess the video frames and save them
        # The last entry of "three_secs" plus one is used as the block count
        blocks_num = item["three_secs"][-1] + 1

        output = preprocess_video_every_3_seconds(input_video_path, (256, 256), blocks_num)
        np.save(output_video_path, output)

        ######################################
        # Preprocess the audio and save it in a trainable format
        # Placed last, after the video step, to allow for save/sync time of the wav file
        mel_spectrogram_segments = preprocess_audio(output_wav_path)
        np.save(output_audio_path, mel_spectrogram_segments)

        # Rewrite the label record so it points at the processed files;
        # the existing 'category' value is kept as-is
        item['filename'] = output_video_name
        item['video_path'] = output_video_path
        item['audio_path'] = output_audio_path
        item['quality'] = '256 256'  # space-separated so it can be split() when the data is used later

        video_idx += 1
        new_video_data.append(item)

    print(f"Process Finish :: {leng}")

# Save the updated labels for the preprocessed data
with open(output_json_path, 'w', encoding='utf-8') as f:
    json.dump(new_video_data, f, ensure_ascii=False, indent=2)

100%|██████████| 99/99 [58:58<00:00, 35.75s/it]  

Process Finish :: 2~5분





In [11]:
from moviepy.editor import VideoFileClip, concatenate_videoclips

def extract_and_merge_segments(input_video_path, segments, output_video_path):
    """
    Cut (start, end) segments out of a video and write them, joined in order, to a new file.

    Failures on individual segments, and on the final concatenation/write, are
    reported with ``print`` instead of being raised.

    Args:
    input_video_path (str): Path of the video opened with ``VideoFileClip``.
    segments: Iterable of (start, end) pairs passed to ``subclip``.
    output_video_path (str): Destination passed to ``write_videofile``.
    """
    # Load the video
    video = VideoFileClip(input_video_path)
    try:
        # Keep every clip paired with the segment it came from, so the debug
        # output stays correct even when some segments are skipped.
        extracted = []
        for start, end in segments:
            try:
                clip = video.subclip(start, end)
                if clip.duration is not None and clip.duration > 0:
                    extracted.append(((start, end), clip))
                else:
                    print(f"Invalid clip duration for segment ({start}, {end}): {clip.duration}")
            except Exception as e:
                print(f"Error processing segment ({start}, {end}): {e}")

        # Debug information for clip durations
        for i, ((start, end), clip) in enumerate(extracted):
            print(f"Clip {i}: start={start}, end={end}, duration={clip.duration}")

        # Concatenate clips
        if extracted:
            try:
                final_clip = concatenate_videoclips([clip for _, clip in extracted])
                # Write the result to a file
                final_clip.write_videofile(output_video_path, codec="libx264")
            except Exception as e:
                print(f"Error concatenating clips: {e}")
        else:
            print("No valid clips to concatenate")
    finally:
        # Release the source video whether or not writing succeeded
        video.close()


In [13]:
# Example usage


Clip 0: start=3, end=6, duration=537.96
Clip 1: start=15, end=18, duration=525.96
Moviepy - Building video output_video.mp4.
MoviePy - Writing audio in output_videoTEMP_MPY_wvf_snd.mp3


                                                                        

MoviePy - Done.
Moviepy - Writing video output_video.mp4

Error concatenating clips: must be real number, not NoneType




In [11]:
from moviepy.editor import VideoFileClip, concatenate_videoclips

def make_clip_video(path, save_path, segments):
    """
    Join the given segments of a video into one file.

    Args:
    path (str): Path of the video opened with ``VideoFileClip``.
    save_path (str): Destination passed to ``write_videofile``.
    segments: Iterable of pairs; ``segment[0]`` and ``segment[1]`` are passed
        to ``subclip`` as its two arguments.
    """
    clip_video = VideoFileClip(path)
    try:
        clips = [clip_video.subclip(segment[0], segment[1]) for segment in segments]

        combined = concatenate_videoclips(clips)
        # Write with the source clip's fps value
        combined.write_videofile(save_path, fps=clip_video.fps)
    finally:
        # Release the source video even if concatenation or writing fails
        clip_video.close()

In [12]:
# Pairs whose two values are handed to subclip for each excerpt
segments = [(3, 6), (15, 18)]
input_video = "Pipeline/test.mp4"  # Path to the input video
output_video = "Pipeline/output_video.mp4"  # Path to save the output video

# Cut the excerpts out of input_video and write them, joined, to output_video
make_clip_video(input_video, output_video, segments)

Moviepy - Building video Pipeline/output_video.mp4.
MoviePy - Writing audio in output_videoTEMP_MPY_wvf_snd.mp3


                                                                      

MoviePy - Done.
Moviepy - Writing video Pipeline/output_video.mp4





TypeError: must be real number, not NoneType

In [2]:
import os
# Look up the HOME environment variable (os.getenv returns None when it is unset)
os.getenv("HOME")

'/Users/idaeho'

- 아래 코드로 moviepy.editor import 에러를 해결할 수 있었다.

In [2]:
import os 
# Set both ffmpeg-location variables to the same binary path
for _ffmpeg_var in ("IMAGEIO_FFMPEG_EXE", "FFMPEG_BINARY"):
    os.environ[_ffmpeg_var] = "/opt/homebrew/bin/ffmpeg"

In [3]:
from moviepy.editor import VideoFileClip

In [1]:
# Scratch inputs for the truncation check in the next cell
min_length, ratio = 35, 0.5

In [2]:
# int() truncates the product toward zero rather than rounding it
int(min_length * ratio)

17