In [2]:
import cv2
import numpy as np
from pathlib import Path
from typing import List, Tuple, Optional
import json
from datetime import datetime

class KeyframeExtractor:
    """
    Reusable keyframe extraction utility for detecting scene changes in videos.
    """
    
    def __init__(self, output_dir: str = "extracted_keyframes"):
        """
        Initialize the keyframe extractor.
        
        Args:
            output_dir: Directory where keyframes will be saved
        """
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        
    def extract_keyframes(
        self,
        video_path: str,
        threshold: float = 100.0,
        min_interval_seconds: float = 1.0,
        max_keyframes: Optional[int] = None,
        save_metadata: bool = True
    ) -> List[dict]:
        """
        Extract keyframes from video when significant scene changes occur.
        
        Args:
            video_path: Path to the input video file
            threshold: Scene change detection threshold (0-255, higher = more different)
            min_interval_seconds: Minimum time between keyframes in seconds
            max_keyframes: Maximum number of keyframes to extract (None = unlimited)
            save_metadata: Whether to save extraction metadata as JSON
            
        Returns:
            List of dictionaries containing keyframe info:
            [
                {
                    'frame_number': int,
                    'timestamp': float,
                    'file_path': str,
                    'diff_score': float
                },
                ...
            ]
        """
        cap = cv2.VideoCapture(video_path)
        
        if not cap.isOpened():
            raise ValueError(f"Cannot open video file: {video_path}")
        
        # Get video properties
        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        duration = total_frames / fps
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        
        print(f"\n{'='*70}")
        print(f"KEYFRAME EXTRACTION")
        print(f"{'='*70}")
        print(f"Video: {Path(video_path).name}")
        print(f"Duration: {duration:.2f}s | Frames: {total_frames} | FPS: {fps:.2f}")
        print(f"Resolution: {width}x{height}")
        print(f"Threshold: {threshold} | Min interval: {min_interval_seconds}s")
        print(f"{'='*70}\n")
        
        min_interval_frames = int(min_interval_seconds * fps)
        
        keyframes = []
        frame_count = 0
        last_keyframe_idx = -min_interval_frames
        
        # Read first frame
        ret, prev_frame = cap.read()
        if not ret:
            cap.release()
            return keyframes
        
        # Save first frame as keyframe
        first_keyframe = self._save_keyframe(
            frame=prev_frame,
            video_path=video_path,
            frame_number=0,
            timestamp=0.0,
            diff_score=0.0,
            keyframe_index=0
        )
        keyframes.append(first_keyframe)
        print(f"✓ Keyframe 0: t=0.00s (first frame)")
        
        prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
        frame_count = 1
        
        while True:
            ret, curr_frame = cap.read()
            
            if not ret:
                break
            
            # Check if max keyframes reached
            if max_keyframes and len(keyframes) >= max_keyframes:
                print(f"\nReached maximum keyframes limit ({max_keyframes})")
                break
            
            # Check minimum interval
            if frame_count - last_keyframe_idx < min_interval_frames:
                frame_count += 1
                continue
            
            # Convert to grayscale for comparison
            curr_gray = cv2.cvtColor(curr_frame, cv2.COLOR_BGR2GRAY)
            
            # Calculate frame difference
            frame_diff = cv2.absdiff(prev_gray, curr_gray)
            diff_score = np.mean(frame_diff)
            
            # Extract keyframe if significant change detected
            if diff_score > threshold:
                timestamp = frame_count / fps
                
                keyframe_info = self._save_keyframe(
                    frame=curr_frame,
                    video_path=video_path,
                    frame_number=frame_count,
                    timestamp=timestamp,
                    diff_score=diff_score,
                    keyframe_index=len(keyframes)
                )
                
                keyframes.append(keyframe_info)
                last_keyframe_idx = frame_count
                
                print(f"✓ Keyframe {len(keyframes)-1}: t={timestamp:.2f}s "
                      f"(frame {frame_count}, diff={diff_score:.2f})")
            
            prev_gray = curr_gray
            frame_count += 1
            
            # Progress indicator every 100 frames
            if frame_count % 100 == 0:
                progress = (frame_count / total_frames) * 100
                print(f"  Processing... {progress:.1f}% ({frame_count}/{total_frames} frames)", 
                      end='\r')
        
        # Always save last frame if it's not too close to the previous keyframe
        if frame_count - last_keyframe_idx >= min_interval_frames:
            cap.set(cv2.CAP_PROP_POS_FRAMES, total_frames - 1)
            ret, last_frame = cap.read()
            if ret:
                last_keyframe = self._save_keyframe(
                    frame=last_frame,
                    video_path=video_path,
                    frame_number=total_frames - 1,
                    timestamp=duration,
                    diff_score=0.0,
                    keyframe_index=len(keyframes)
                )
                keyframes.append(last_keyframe)
                print(f"✓ Keyframe {len(keyframes)-1}: t={duration:.2f}s (last frame)")
        
        cap.release()
        
        # Save metadata
        if save_metadata:
            self._save_metadata(
                video_path=video_path,
                keyframes=keyframes,
                threshold=threshold,
                min_interval_seconds=min_interval_seconds,
                video_info={
                    'fps': fps,
                    'total_frames': total_frames,
                    'duration': duration,
                    'width': width,
                    'height': height
                }
            )
        
        print(f"\n{'='*70}")
        print(f"✓ EXTRACTION COMPLETE")
        print(f"{'='*70}")
        print(f"Total keyframes extracted: {len(keyframes)}")
        print(f"Average keyframe interval: {duration/len(keyframes):.2f}s")
        print(f"Keyframes saved to: {self.output_dir}")
        print(f"{'='*70}\n")
        
        return keyframes
    
    def _save_keyframe(
        self,
        frame: np.ndarray,
        video_path: str,
        frame_number: int,
        timestamp: float,
        diff_score: float,
        keyframe_index: int
    ) -> dict:
        """Save a single keyframe and return its metadata."""
        video_name = Path(video_path).stem
        filename = f"{video_name}_keyframe_{keyframe_index:04d}_f{frame_number:06d}_t{timestamp:.2f}s.jpg"
        file_path = self.output_dir / filename
        
        # Save with high quality
        cv2.imwrite(str(file_path), frame, [cv2.IMWRITE_JPEG_QUALITY, 95])
        
        return {
            'keyframe_index': keyframe_index,
            'frame_number': frame_number,
            'timestamp': timestamp,
            'file_path': str(file_path),
            'filename': filename,
            'diff_score': diff_score
        }
    
    def _save_metadata(
        self,
        video_path: str,
        keyframes: List[dict],
        threshold: float,
        min_interval_seconds: float,
        video_info: dict
    ):
        """Save extraction metadata as JSON."""
        metadata = {
            'extraction_date': datetime.now().isoformat(),
            'source_video': str(Path(video_path).absolute()),
            'video_info': video_info,
            'extraction_params': {
                'threshold': threshold,
                'min_interval_seconds': min_interval_seconds
            },
            'keyframes': keyframes,
            'total_keyframes': len(keyframes)
        }
        
        video_name = Path(video_path).stem
        metadata_path = self.output_dir / f"{video_name}_metadata.json"
        
        with open(metadata_path, 'w') as f:
            json.dump(metadata, indent=2, fp=f)
        
        print(f"✓ Metadata saved: {metadata_path}")
    

# Example usage and testing
if __name__ == "__main__":
    extractor = KeyframeExtractor(output_dir="keyframes_output")
    
    # Example 1: Basic scene change detection
    print("Example 1: Scene change detection")
    keyframes = extractor.extract_keyframes(
        video_path="realistic_meeting_recording.mp4",
        threshold=100.0,          # Adjust based on your video
        min_interval_seconds=2.0,  # At least 2 seconds between keyframes
        save_metadata=True
    )
    
    print(f"\nExtracted {len(keyframes)} keyframes")
    print("\nFirst 3 keyframes:")
    for kf in keyframes[:3]:
        print(f"  • {kf['filename']} at {kf['timestamp']:.2f}s")

Example 1: Scene change detection

KEYFRAME EXTRACTION
Video: realistic_meeting_recording.mp4
Duration: 150.00s | Frames: 4500 | FPS: 30.00
Resolution: 1920x1080
Threshold: 100.0 | Min interval: 2.0s

✓ Keyframe 0: t=0.00s (first frame)
✓ Keyframe 1: t=45.00s (frame 1350, diff=210.44)
✓ Keyframe 2: t=120.00s (frame 3600, diff=210.34)
✓ Keyframe 3: t=150.00s (last frame)ames)
✓ Metadata saved: keyframes_output\realistic_meeting_recording_metadata.json

✓ EXTRACTION COMPLETE
Total keyframes extracted: 4
Average keyframe interval: 37.50s
Keyframes saved to: keyframes_output


Extracted 4 keyframes

First 3 keyframes:
  • realistic_meeting_recording_keyframe_0000_f000000_t0.00s.jpg at 0.00s
  • realistic_meeting_recording_keyframe_0001_f001350_t45.00s.jpg at 45.00s
  • realistic_meeting_recording_keyframe_0002_f003600_t120.00s.jpg at 120.00s


In [None]:
import cv2
from pathlib import Path

def extract_uniform_frames(
    video_path: str, 
    output_dir: str = "uniform_frames",
    num_frames: int = 10,
    display_time: float = 1.0  # seconds to display each frame if needed
):
    """
    Extract frames uniformly across the video.

    Args:
        video_path: Path to video file
        output_dir: Folder to save extracted frames
        num_frames: Total frames to extract
        display_time: Optional, time to display each frame (not for saving)
    """
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise ValueError(f"Cannot open video: {video_path}")
    
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    fps = cap.get(cv2.CAP_PROP_FPS)
    
    # Calculate frame indices to extract
    step = max(total_frames // num_frames, 1)
    frame_indices = [i*step for i in range(num_frames)]
    
    extracted_frames = []
    for idx, frame_no in enumerate(frame_indices):
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_no)
        ret, frame = cap.read()
        if not ret:
            continue
        # Save frame
        filename = output_dir / f"frame_{idx:03d}_f{frame_no:06d}.jpg"
        cv2.imwrite(str(filename), frame, [cv2.IMWRITE_JPEG_QUALITY, 95])
        extracted_frames.append(filename)
        
        # Optional: Display frame for a while
        cv2.imshow("Frame", frame)
        cv2.waitKey(int(display_time * 1000))  # display_time in ms
    
    cap.release()
    cv2.destroyAllWindows()
    
    print(f"Extracted {len(extracted_frames)} frames to {output_dir}")
    return extracted_frames

# Example usage
if __name__ == "__main__":
    frames = extract_uniform_frames(
        video_path="test_video.mp4",
        num_frames=10,
        display_time=1.0  # display each frame for 1 second
    )


Extracted 10 frames to uniform_frames


In [1]:
import cv2
from pathlib import Path

def extract_uniform_frames(
    video_path: str, 
    output_dir: str = "uniform_frames2",
    num_frames: int = 10,
    display_time: float = 1.0  # seconds to display each frame if needed
):
    """
    Extract frames uniformly across the video.

    Args:
        video_path: Path to video file
        output_dir: Folder to save extracted frames
        num_frames: Total frames to extract
        display_time: Optional, time to display each frame (not for saving)
    """
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise ValueError(f"Cannot open video: {video_path}")
    
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    fps = cap.get(cv2.CAP_PROP_FPS)
    
    # Calculate frame indices to extract
    step = max(total_frames // num_frames, 1)
    frame_indices = [i*step for i in range(num_frames)]
    
    extracted_frames = []
    for idx, frame_no in enumerate(frame_indices):
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_no)
        ret, frame = cap.read()
        if not ret:
            continue
        # Save frame
        filename = output_dir / f"frame_{idx:03d}_f{frame_no:06d}.jpg"
        cv2.imwrite(str(filename), frame, [cv2.IMWRITE_JPEG_QUALITY, 95])
        extracted_frames.append(filename)
        
        # Optional: Display frame for a while
        cv2.imshow("Frame", frame)
        cv2.waitKey(int(display_time * 1000))  # display_time in ms
    
    cap.release()
    cv2.destroyAllWindows()
    
    print(f"Extracted {len(extracted_frames)} frames to {output_dir}")
    return extracted_frames

# Example usage
if __name__ == "__main__":
    frames = extract_uniform_frames(
        video_path="test_video2.mp4",
        num_frames=10,
        display_time=1.0  # display each frame for 1 second
    )


Extracted 10 frames to uniform_frames2
