In [None]:
import cv2
import numpy as np

# Input video
video_path = "videoplayback.mp4"
cap = cv2.VideoCapture(video_path)

# Optical flow params
farneback_params = dict(
    pyr_scale=0.5,
    levels=3,
    winsize=15,
    iterations=3,
    poly_n=5,
    poly_sigma=1.2,
    flags=0
)

ret, prev_frame = cap.read()
if not ret:
    print("Error: Cannot read video")
    cap.release()
    exit()

prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)

# Writer for explosion-only output
fourcc = cv2.VideoWriter_fourcc(*'XVID')
out = cv2.VideoWriter("explosion_detected.avi", fourcc, 20.0, (prev_frame.shape[1], prev_frame.shape[0]))

while True:
    ret, frame = cap.read()
    if not ret:
        break

    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

    # Frame difference (for sudden brightness increase = explosion)
    diff = cv2.absdiff(gray, prev_gray)
    _, thresh = cv2.threshold(diff, 50, 255, cv2.THRESH_BINARY)

    # If large bright change -> possible explosion
    explosion_area = np.sum(thresh) / 255
    if explosion_area > 5000:  # tune threshold
        # Optical Flow to find direction of sparks
        flow = cv2.calcOpticalFlowFarneback(prev_gray, gray, None, **farneback_params)
        mag, ang = cv2.cartToPolar(flow[..., 0], flow[..., 1])
        
        # Draw motion vectors
        step = 16
        h, w = gray.shape
        for y in range(0, h, step):
            for x in range(0, w, step):
                if mag[y, x] > 2:  # strong motion only
                    dx = int(x + flow[y, x, 0])
                    dy = int(y + flow[y, x, 1])
                    cv2.arrowedLine(frame, (x, y), (dx, dy), (0, 0, 255), 2)

        cv2.putText(frame, "Explosion Detected!", (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0,0,255), 3)
        out.write(frame)
        cv2.imshow("Explosion & Flow", frame)

    prev_gray = gray

    if cv2.waitKey(30) & 0xFF == 27:
        break

cap.release()
out.release()
cv2.destroyAllWindows()


In [1]:
import cv2
import numpy as np
import matplotlib.pyplot as plt
from collections import deque
import argparse
import os

class ExplosionDetector:
    def __init__(self, video_path):
        self.video_path = video_path
        self.cap = cv2.VideoCapture(video_path)
        self.fps = int(self.cap.get(cv2.CAP_PROP_FPS))
        self.total_frames = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))
        
        # Detection parameters
        self.brightness_threshold = 50  # Sudden brightness increase
        self.motion_threshold = 30      # Motion magnitude threshold
        self.explosion_duration = 10    # Frames to consider as one explosion
        
        # Storage for analysis
        self.explosions = []
        self.frame_brightness = []
        self.optical_flow_history = deque(maxlen=5)
        
        # For visualization
        self.explosion_count = 0
        self.current_explosion_frames = []
        
    def calculate_brightness(self, frame):
        """Calculate average brightness of frame"""
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        return np.mean(gray)
    
    def detect_motion_vectors(self, prev_gray, curr_gray):
        """Calculate optical flow to detect motion patterns"""
        # Calculate dense optical flow
        flow = cv2.calcOpticalFlowPyrLK(prev_gray, curr_gray, 
                                      corners=cv2.goodFeaturesToTrack(prev_gray, 100, 0.3, 7),
                                      nextPts=None,
                                      winSize=(15, 15),
                                      maxLevel=2,
                                      criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03))
        
        if flow[0] is not None:
            good_old = flow[0][flow[1].ravel() == 1]
            good_new = flow[2][flow[1].ravel() == 1]
            
            if len(good_old) > 0 and len(good_new) > 0:
                # Calculate motion vectors
                motion_vectors = good_new - good_old
                return motion_vectors
        
        return np.array([])
    
    def detect_radial_motion(self, motion_vectors, center_point):
        """Detect if motion is radiating outward from center (explosion pattern)"""
        if len(motion_vectors) == 0:
            return False, []
        
        # Calculate angles of motion vectors relative to center
        radial_count = 0
        radial_vectors = []
        
        for vector in motion_vectors:
            angle = np.arctan2(vector[1], vector[0])
            magnitude = np.linalg.norm(vector)
            
            if magnitude > self.motion_threshold:
                radial_vectors.append((angle, magnitude))
                radial_count += 1
        
        # If significant radial motion detected
        is_radial = radial_count > len(motion_vectors) * 0.3
        return is_radial, radial_vectors
    
    def detect_explosion_in_frame(self, frame, prev_frame, frame_idx):
        """Detect explosion characteristics in current frame"""
        current_brightness = self.calculate_brightness(frame)
        prev_brightness = self.calculate_brightness(prev_frame) if prev_frame is not None else current_brightness
        
        brightness_change = current_brightness - prev_brightness
        
        # Convert to grayscale for motion detection
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY) if prev_frame is not None else gray
        
        # Detect motion vectors
        motion_vectors = self.detect_motion_vectors(prev_gray, gray)
        
        # Find potential explosion center (brightest region)
        blur = cv2.GaussianBlur(gray, (21, 21), 0)
        _, max_val, _, max_loc = cv2.minMaxLoc(blur)
        
        # Check for radial motion from bright center
        is_radial, radial_vectors = self.detect_radial_motion(motion_vectors, max_loc)
        
        # Explosion detection criteria
        is_explosion = (
            brightness_change > self.brightness_threshold and  # Sudden brightness increase
            is_radial and                                       # Radial motion pattern
            len(radial_vectors) > 5                            # Sufficient motion vectors
        )
        
        return is_explosion, {
            'brightness_change': brightness_change,
            'explosion_center': max_loc,
            'radial_vectors': radial_vectors,
            'frame_idx': frame_idx
        }
    
    def draw_explosion_analysis(self, frame, explosion_data):
        """Draw explosion analysis on frame"""
        if not explosion_data:
            return frame
        
        center = explosion_data['explosion_center']
        radial_vectors = explosion_data['radial_vectors']
        
        # Draw explosion center
        cv2.circle(frame, center, 20, (0, 0, 255), 3)
        cv2.putText(frame, 'EXPLOSION CENTER', (center[0]-50, center[1]-30), 
                   cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)
        
        # Draw radial motion vectors
        scale_factor = 50
        for angle, magnitude in radial_vectors:
            end_x = int(center[0] + scale_factor * magnitude * np.cos(angle))
            end_y = int(center[1] + scale_factor * magnitude * np.sin(angle))
            
            cv2.arrowedLine(frame, center, (end_x, end_y), (0, 255, 255), 2, tipLength=0.3)
        
        # Add explosion counter
        cv2.putText(frame, f'Explosions Detected: {self.explosion_count}', 
                   (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        
        # Highlight explosion with colored border
        h, w = frame.shape[:2]
        cv2.rectangle(frame, (5, 5), (w-5, h-5), (0, 255, 255), 5)
        
        return frame
    
    def process_video(self):
        """Main processing loop"""
        frame_idx = 0
        prev_frame = None
        last_explosion_frame = -self.explosion_duration
        
        print(f"Processing video: {self.video_path}")
        print(f"Total frames: {self.total_frames}, FPS: {self.fps}")
        
        while True:
            ret, frame = self.cap.read()
            if not ret:
                break
            
            # Detect explosion
            is_explosion, explosion_data = self.detect_explosion_in_frame(frame, prev_frame, frame_idx)
            
            # Store brightness for analysis
            self.frame_brightness.append(self.calculate_brightness(frame))
            
            if is_explosion and (frame_idx - last_explosion_frame) > self.explosion_duration:
                self.explosion_count += 1
                last_explosion_frame = frame_idx
                self.explosions.append(explosion_data)
                print(f"Explosion #{self.explosion_count} detected at frame {frame_idx} ({frame_idx/self.fps:.2f}s)")
            
            # Draw analysis on frame
            if is_explosion or (frame_idx - last_explosion_frame) < self.explosion_duration:
                frame = self.draw_explosion_analysis(frame, explosion_data if is_explosion else self.explosions[-1] if self.explosions else None)
            
            # Display frame
            frame_resized = cv2.resize(frame, (1280, 720))  # Resize for display
            cv2.imshow('Explosion Detection', frame_resized)
            
            # Control playback
            key = cv2.waitKey(int(1000/self.fps)) & 0xFF
            if key == ord('q'):
                break
            elif key == ord(' '):  # Pause/unpause
                cv2.waitKey(0)
            
            prev_frame = frame.copy()
            frame_idx += 1
        
        self.cap.release()
        cv2.destroyAllWindows()
        
        self.generate_analysis_report()
    
    def generate_analysis_report(self):
        """Generate detailed analysis report"""
        print("\n" + "="*60)
        print("EXPLOSION ANALYSIS REPORT")
        print("="*60)
        print(f"Video: {self.video_path}")
        print(f"Total Explosions Detected: {self.explosion_count}")
        print(f"Video Duration: {self.total_frames/self.fps:.2f} seconds")
        print(f"Explosion Rate: {self.explosion_count/(self.total_frames/self.fps):.3f} explosions/second")
        
        if self.explosions:
            print("\nExplosion Details:")
            for i, explosion in enumerate(self.explosions, 1):
                frame_idx = explosion['frame_idx']
                timestamp = frame_idx / self.fps
                print(f"  Explosion {i}:")
                print(f"    Time: {timestamp:.2f}s (Frame {frame_idx})")
                print(f"    Center: {explosion['explosion_center']}")
                print(f"    Brightness Change: {explosion['brightness_change']:.2f}")
                print(f"    Motion Vectors: {len(explosion['radial_vectors'])}")
        
        # Plot brightness over time
        self.plot_brightness_analysis()
    
    def plot_brightness_analysis(self):
        """Plot brightness analysis with explosion markers"""
        if not self.frame_brightness:
            return
        
        time_axis = np.arange(len(self.frame_brightness)) / self.fps
        
        plt.figure(figsize=(12, 6))
        plt.plot(time_axis, self.frame_brightness, 'b-', linewidth=1, label='Frame Brightness')
        
        # Mark explosions
        explosion_times = [exp['frame_idx']/self.fps for exp in self.explosions]
        explosion_brightness = [self.frame_brightness[exp['frame_idx']] for exp in self.explosions if exp['frame_idx'] < len(self.frame_brightness)]
        
        if explosion_times:
            plt.scatter(explosion_times[:len(explosion_brightness)], explosion_brightness, 
                       color='red', s=100, marker='*', label='Explosions', zorder=5)
        
        plt.xlabel('Time (seconds)')
        plt.ylabel('Average Frame Brightness')
        plt.title(f'Brightness Analysis - {self.explosion_count} Explosions Detected')
        plt.legend()
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.show()

def main():
    parser = argparse.ArgumentParser(description='Explosion Detection and Analysis')
    parser.add_argument('video_path', help='Path to video file')
    parser.add_argument('--brightness_threshold', type=float, default=50, 
                       help='Brightness change threshold for explosion detection')
    parser.add_argument('--motion_threshold', type=float, default=30,
                       help='Motion magnitude threshold')
    
    args = parser.parse_args()
    
    if not os.path.exists(args.video_path):
        print(f"Error: Video file '{args.video_path}' not found!")
        return
    
    # Create detector instance
    detector = ExplosionDetector(args.video_path)
    detector.brightness_threshold = args.brightness_threshold
    detector.motion_threshold = args.motion_threshold
    
    # Process video
    detector.process_video()

if __name__ == "__main__":
    # For direct execution without command line args
    import sys
    if len(sys.argv) == 1:
        # Option 1: Hardcode your video path here
        video_path = r"C:\path\to\your\video.mp4"  # Replace with your actual path
        
        # Option 2: Or let user input the path
        if not os.path.exists(video_path) or video_path == r"videoplayback.mp4":
            video_path = input("Enter video file path: ")
        
        if os.path.exists(video_path):
            print(f"Loading video: {video_path}")
            detector = ExplosionDetector(video_path)
            detector.process_video()
        else:
            print("File not found!")
    else:
        main()

usage: ipykernel_launcher.py [-h]
                             [--brightness_threshold BRIGHTNESS_THRESHOLD]
                             [--motion_threshold MOTION_THRESHOLD]
                             video_path
ipykernel_launcher.py: error: the following arguments are required: video_path


AttributeError: 'tuple' object has no attribute 'tb_frame'

In [2]:
import cv2
import numpy as np

def detect_explosions():
    video_path = "Fire-Cracker.mp4"              # 🔹 Input path fixed here
    output_path = "Explosion_Output.mp4"         # 🔹 Output video path

    cap = cv2.VideoCapture(video_path)

    fps = int(cap.get(cv2.CAP_PROP_FPS))
    width  = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

    ret, prev_frame = cap.read()
    if not ret:
        print("Error reading video.")
        return

    prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)

    # Auto thresholds (relative to scene brightness)
    avg_brightness = np.mean(prev_gray)
    explosion_threshold = avg_brightness * 200   # motion threshold
    min_flow_magnitude = 2.0

    explosion_count = 0
    cooldown = 0  # avoid double counting

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        diff = cv2.absdiff(prev_gray, gray)
        _, thresh = cv2.threshold(diff, 40, 255, cv2.THRESH_BINARY)
        motion_area = np.sum(thresh) / 255

        if motion_area > explosion_threshold and cooldown == 0:
            explosion_count += 1
            cooldown = fps // 2  # half-second cooldown
            cv2.putText(frame, f"Explosion #{explosion_count}", (30, 50),
                        cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0,0,255), 3)

            # Optical flow for spark/fire direction
            flow = cv2.calcOpticalFlowFarneback(prev_gray, gray, None,
                                                0.5, 3, 15, 3, 5, 1.2, 0)
            step = 20
            h, w = gray.shape
            y, x = np.mgrid[step//2:h:step, step//2:w:step].reshape(2,-1).astype(int)
            fx, fy = flow[y, x].T

            for (x1, y1, dx, dy) in zip(x, y, fx, fy):
                if dx**2 + dy**2 > min_flow_magnitude**2:
                    cv2.arrowedLine(frame, (x1, y1),
                                    (int(x1+dx), int(y1+dy)),
                                    (0,255,0), 2, tipLength=0.5)

        if cooldown > 0:
            cooldown -= 1

        prev_gray = gray.copy()
        out.write(frame)

    cap.release()
    out.release()
    print(f"✅ Done. Explosions detected: {explosion_count}")
    print(f"💾 Output saved at {output_path}")


# Run it
detect_explosions()


✅ Done. Explosions detected: 8
💾 Output saved at Explosion_Output.mp4


Updated

In [11]:
import cv2
import numpy as np

def detect_explosions():
    video_path = "Fire-Cracker.mp4"              # Input path
    output_path = "Explosion_Output-1.mp4"         # Output video

    cap = cv2.VideoCapture(video_path)

    fps = int(cap.get(cv2.CAP_PROP_FPS))
    width  = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

    ret, prev_frame = cap.read()
    if not ret:
        print("❌ Error reading video.")
        return

    prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)

    # Auto thresholds (relative to scene brightness)
    avg_brightness = np.mean(prev_gray)
    explosion_threshold = avg_brightness * 200   # motion threshold
    min_flow_magnitude = 2.0

    explosion_count = 0
    cooldown = 0       # prevents double-counting
    show_arrows = False

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        diff = cv2.absdiff(prev_gray, gray)
        _, thresh = cv2.threshold(diff, 40, 255, cv2.THRESH_BINARY)
        motion_area = np.sum(thresh) / 255

        # Detect explosion only when a big spike occurs & cooldown is over
        if motion_area > explosion_threshold and cooldown == 0:
            explosion_count += 1
            cooldown = fps * 2   # 2 seconds cooldown to avoid duplicates
            show_arrows = True

            cv2.putText(frame, f"Explosion #{explosion_count}", (30, 50),
                        cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0,0,255), 3)

        # If explosion is happening (sparks still moving)
        if show_arrows:
            # Optical flow (motion direction)
            flow = cv2.calcOpticalFlowFarneback(prev_gray, gray, None,
                                                0.5, 3, 15, 3, 5, 1.2, 0)
            step = 20
            h, w = gray.shape
            y, x = np.mgrid[step//2:h:step, step//2:w:step].reshape(2,-1).astype(int)
            fx, fy = flow[y, x].T

            # Draw arrows for sparks
            for (x1, y1, dx, dy) in zip(x, y, fx, fy):
                if dx**2 + dy**2 > min_flow_magnitude**2:
                    cv2.arrowedLine(frame, (x1, y1),
                                    (int(x1+dx), int(y1+dy)),
                                    (0,255,0), 2, tipLength=0.5)

            # Highlight explosion region with circle
            contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
            for cnt in contours:
                if cv2.contourArea(cnt) > 500:  # ignore noise
                    (x_c, y_c), radius = cv2.minEnclosingCircle(cnt)
                    center = (int(x_c), int(y_c))
                    cv2.circle(frame, center, int(radius), (0,0,255), 3)

            # Stop showing arrows if motion dies down
            if motion_area < explosion_threshold * 0.2:
                show_arrows = False

        if cooldown > 0:
            cooldown -= 1

        # Show total explosions on every frame
        cv2.putText(frame, f"Total Explosions: {explosion_count}", (30, height - 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255,255,0), 2)

        prev_gray = gray.copy()
        out.write(frame)

    cap.release()
    out.release()

    print(f"✅ Done. Total Explosions Detected: {explosion_count}")
    print(f"💾 Output saved as {output_path}")


# Run it
detect_explosions()


✅ Done. Total Explosions Detected: 4
💾 Output saved as Explosion_Output-1.mp4


In [12]:
#!/usr/bin/env python3
"""
Stable Explosion Detector

- Input: "Fire-Cracker.mp4" (fixed filename - change if needed)
- Output: "Explosion_Output_stable.mp4"

Behavior:
- Confirms an explosion only when area + brightness spike + motion magnitude hold for N frames.
- Keeps arrows and a highlight circle visible until motion decays for several frames.
- Uses sparse optical flow (PyrLK) for arrow directions inside each active bbox.
- Designed to be robust to short transient sparks and to generalize to other videos.

Tune thresholds near the top if you see false positives/negatives.
"""

import cv2
import numpy as np
import os
from collections import deque
import math
import time

# ---------- PARAMETERS (tweak these if needed) ----------
INPUT_VIDEO = "Fire-Cracker.mp4"
OUTPUT_VIDEO = "Explosion_Output-2.mp4"

ROLLING_BG_FRAMES = 6         # frames used to compute rolling background brightness
BRIGHT_DELTA = 15.0           # local brightness must exceed rolling background by this
MIN_AREA_FRAC = 0.002         # min motion area fraction of frame to consider (e.g., 0.002 -> 0.2%)
MIN_FLOW_MAG = 1.5            # mean optical-flow magnitude threshold inside bbox for candidate
MIN_CONFIRM_FRAMES = 2        # candidate must satisfy criteria for this many consecutive frames to be confirmed
DECAY_FRAMES = 10             # after motion drops, keep showing arrows/circle for this many frames
IOU_MATCH_THRESH = 0.25       # IoU threshold to match detection to existing event
MAX_FEATURES_PER_BOX = 80     # max points to sample inside bbox for PyrLK
FEATURE_QUALITY = 0.01
FEATURE_MIN_DIST = 6
MOTION_DIFF_THRESH = 30       # threshold for absdiff -> binary motion map
DOWNSCALE_WIDTH = None        # set to int width to downscale for speed, or None to keep original
# ------------------------------------------------------

def box_iou(boxA, boxB):
    xA,yA,wA,hA = boxA; xB,yB,wB,hB = boxB
    xA2,yA2 = xA+wA, yA+hA
    xB2,yB2 = xB+wB, yB+hB
    xi1 = max(xA, xB); yi1 = max(yA, yB)
    xi2 = min(xA2, xB2); yi2 = min(yA2, yB2)
    inter_w = max(0, xi2-xi1); inter_h = max(0, yi2-yi1)
    inter = inter_w * inter_h
    union = wA*hA + wB*hB - inter
    return inter/union if union>0 else 0.0

def clamp_bbox_to_frame(x,y,w,h,fw,fh):
    x = max(0, min(x, fw-1))
    y = max(0, min(y, fh-1))
    w = max(1, min(w, fw-x))
    h = max(1, min(h, fh-y))
    return (x,y,w,h)

def detect_explosions_stable():
    if not os.path.exists(INPUT_VIDEO):
        raise FileNotFoundError(f"Input video not found: {INPUT_VIDEO}")

    cap = cv2.VideoCapture(INPUT_VIDEO)
    if not cap.isOpened():
        raise RuntimeError("Cannot open video")

    fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT) or 0)
    orig_w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH) or 0)
    orig_h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT) or 0)

    # optional downscale for speed
    if DOWNSCALE_WIDTH and orig_w>0:
        scale = DOWNSCALE_WIDTH / orig_w
        W = DOWNSCALE_WIDTH
        H = int(orig_h * scale)
    else:
        W, H = orig_w, orig_h

    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    out = cv2.VideoWriter(OUTPUT_VIDEO, fourcc, fps, (W, H))
    if not out.isOpened():
        # fallback to avi
        out = cv2.VideoWriter(os.path.splitext(OUTPUT_VIDEO)[0]+".avi", cv2.VideoWriter_fourcc(*"XVID"), fps, (W,H))

    ret, first = cap.read()
    if not ret:
        cap.release()
        raise RuntimeError("Could not read first frame")
    if (W,H) != (orig_w, orig_h):
        first = cv2.resize(first, (W,H))
    prev_gray = cv2.cvtColor(first, cv2.COLOR_BGR2GRAY)
    prev_gray = cv2.GaussianBlur(prev_gray, (5,5), 0)

    # rolling background brightness (global) - initialize with first frame
    bg_brightness_hist = deque([float(np.mean(prev_gray))]*ROLLING_BG_FRAMES, maxlen=ROLLING_BG_FRAMES)

    # Events structure
    # Each event is a dict:
    # {id, bbox, active (bool), frames_confirmed (int), decay (int), last_seen_frame (int)}
    events = []
    next_event_id = 1
    confirmed_count = 0

    frame_idx = 0
    start_time = time.time()

    # LK params for sparse flow
    feature_params = dict(maxCorners=MAX_FEATURES_PER_BOX, qualityLevel=FEATURE_QUALITY,
                          minDistance=FEATURE_MIN_DIST, blockSize=7)
    lk_params = dict(winSize=(21,21), maxLevel=3,
                     criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 30, 0.01))

    while True:
        ret, frame = cap.read()
        frame_idx += 1
        if not ret:
            break
        if (W,H) != (orig_w, orig_h):
            frame = cv2.resize(frame, (W,H))

        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        gray = cv2.GaussianBlur(gray, (5,5), 0)

        # motion map via frame diff
        diff = cv2.absdiff(gray, prev_gray)
        _, motion_map = cv2.threshold(diff, MOTION_DIFF_THRESH, 255, cv2.THRESH_BINARY)
        kernel = np.ones((5,5), np.uint8)
        motion_map = cv2.morphologyEx(motion_map, cv2.MORPH_CLOSE, kernel, iterations=1)
        motion_map = cv2.morphologyEx(motion_map, cv2.MORPH_OPEN, kernel, iterations=1)

        # contours of motion
        contours_info = cv2.findContours(motion_map, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        contours = contours_info[0] if len(contours_info)==2 else contours_info[1]

        # Update rolling bg brightness (exclude current frame then compute local delta against mean of history)
        bg_mean = float(np.mean(bg_brightness_hist))

        # frame-relative thresholds
        frame_area = float(W * H)
        min_area_pixels = max(40, int(MIN_AREA_FRAC * frame_area))

        # For each motion contour, compute local metrics and try to match / create events
        detected_boxes = []
        candidate_infos = []  # hold info for each candidate contour
        for cnt in contours:
            area = cv2.contourArea(cnt)
            if area < min_area_pixels:
                continue
            x,y,w,h = cv2.boundingRect(cnt)
            x,y,w,h = clamp_bbox_to_frame(x,y,w,h,W,H)
            detected_boxes.append((x,y,w,h))

            # local brightness
            roi = gray[y:y+h, x:x+w]
            if roi.size == 0:
                continue
            local_mean = float(np.mean(roi))
            bright_delta_local = local_mean - bg_mean

            # estimate mean optical flow magnitude inside bbox using sparse LK on prev_gray -> gray
            mask = np.zeros_like(prev_gray)
            mask[y:y+h, x:x+w] = 255
            pts = cv2.goodFeaturesToTrack(prev_gray, mask=mask, **feature_params)
            mean_flow_mag = 0.0
            if pts is not None and len(pts) > 0:
                # calc pyrLK for these points
                next_pts, status, err = cv2.calcOpticalFlowPyrLK(prev_gray, gray, pts, None, **lk_params)
                good_old = pts[status.flatten()==1].reshape(-1,2)
                good_new = next_pts[status.flatten()==1].reshape(-1,2)
                if len(good_old) > 0:
                    deltas = good_new - good_old
                    mags = np.linalg.norm(deltas, axis=1)
                    mean_flow_mag = float(np.mean(mags))
                else:
                    mean_flow_mag = 0.0
            else:
                mean_flow_mag = 0.0

            candidate_infos.append({
                "bbox": (x,y,w,h),
                "area": area,
                "local_mean": local_mean,
                "bright_delta": bright_delta_local,
                "mean_flow_mag": mean_flow_mag
            })

        # Match/update existing events with detected boxes
        matched_event_idxs = set()
        for info in candidate_infos:
            x,y,w,h = info["bbox"]
            matched = False
            for ei, ev in enumerate(events):
                iou = box_iou((x,y,w,h), ev["bbox"])
                if iou > IOU_MATCH_THRESH:
                    # update event
                    ev["bbox"] = (
                        min(ev["bbox"][0], x),
                        min(ev["bbox"][1], y),
                        max(ev["bbox"][0]+ev["bbox"][2], x+w) - min(ev["bbox"][0], x),
                        max(ev["bbox"][1]+ev["bbox"][3], y+h) - min(ev["bbox"][1], y)
                    )
                    ev["last_seen_frame"] = frame_idx
                    # reset decay if motion present
                    ev["decay"] = 0
                    matched = True
                    matched_event_idxs.add(ei)
                    # If event not yet active (confirming), evaluate confirmation logic below
                    break
            if not matched:
                # new candidate: check multi-criteria (area already > min_area)
                cond_bright = info["bright_delta"] >= BRIGHT_DELTA
                cond_flow = info["mean_flow_mag"] >= MIN_FLOW_MAG
                # Candidate: if brightness spike AND flow strong => start / increment candidate
                if cond_bright and cond_flow:
                    # create a new temporary event as 'candidate'
                    ev = {
                        "id": next_event_id,
                        "bbox": info["bbox"],
                        "active": False,               # becomes True after confirmation frames
                        "frames_confirmed": 1,
                        "decay": 0,
                        "last_seen_frame": frame_idx
                    }
                    events.append(ev)
                    next_event_id += 1
                else:
                    # Not meeting both criteria: do not create a candidate.
                    # This avoids counting small random sparks or reflections.
                    pass

        # Now update confirmation for candidate events and handle activation / decay
        for ev in events:
            # compute motion & flow inside current ev bbox if present in this frame
            x,y,w,h = ev["bbox"]
            x,y,w,h = clamp_bbox_to_frame(int(x),int(y),int(w),int(h),W,H)
            roi_motion = motion_map[y:y+h, x:x+w]
            motion_pixels = int(np.sum(roi_motion)/255)

            # sample points inside bbox to compute flow mag
            pts = None
            mask = np.zeros_like(prev_gray)
            mask[y:y+h, x:x+w] = 255
            pts = cv2.goodFeaturesToTrack(prev_gray, mask=mask, **feature_params)
            mean_flow_mag = 0.0
            if pts is not None and len(pts) > 0:
                next_pts, status, err = cv2.calcOpticalFlowPyrLK(prev_gray, gray, pts, None, **lk_params)
                good_old = pts[status.flatten()==1].reshape(-1,2)
                good_new = next_pts[status.flatten()==1].reshape(-1,2)
                if len(good_old)>0:
                    deltas = good_new - good_old
                    mags = np.linalg.norm(deltas, axis=1)
                    mean_flow_mag = float(np.mean(mags))
            else:
                mean_flow_mag = 0.0

            # Decide confirmation: if it was previously created as candidate, increase frames_confirmed when current metrics are still strong
            # To avoid re-checking brightness here we approximate by requiring motion_pixels and mean_flow_mag strong
            area_ok = (motion_pixels >= max(1, int(0.5 * min_area_pixels)))  # some motion inside bbox
            flow_ok = (mean_flow_mag >= MIN_FLOW_MAG * 0.7)
            if not ev["active"]:
                if area_ok and flow_ok:
                    ev["frames_confirmed"] += 1
                    if ev["frames_confirmed"] >= MIN_CONFIRM_FRAMES:
                        ev["active"] = True
                        confirmed_count += 1
                        # event becomes active; reset decay
                        ev["decay"] = 0
                else:
                    # decay candidate if not sustained
                    ev["frames_confirmed"] = max(0, ev["frames_confirmed"] - 1)
            else:
                # active event: if motion dies, increase decay; else reset decay
                if area_ok or flow_ok:
                    ev["decay"] = 0
                else:
                    ev["decay"] += 1

        # Remove old events that decayed fully or not seen for long time
        events = [ev for ev in events if ev["decay"] <= DECAY_FRAMES and (frame_idx - ev["last_seen_frame"]) <= (fps * 8)]

        # --- DRAWING: for each active event draw circle and arrows (arrows persist while event.decay <= DECAY_FRAMES)
        display = frame.copy()
        for ev in events:
            if not ev["active"]:
                continue
            x,y,w,h = ev["bbox"]
            x,y,w,h = clamp_bbox_to_frame(int(x),int(y),int(w),int(h),W,H)

            # Motion mask inside bbox & highlight circle
            roi_motion = motion_map[y:y+h, x:x+w]
            contours_roi_info = cv2.findContours(roi_motion, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
            contours_roi = contours_roi_info[0] if len(contours_roi_info)==2 else contours_roi_info[1]
            # If ROI has contours, compute an enclosing circle on combined mask
            combined_mask = np.zeros((h, w), dtype=np.uint8)
            for c in contours_roi:
                if cv2.contourArea(c) > 20:
                    cv2.drawContours(combined_mask, [c], -1, 255, -1)
            if np.count_nonzero(combined_mask) > 0:
                ys, xs = np.where(combined_mask == 255)
                center_local = (int(np.mean(xs)), int(np.mean(ys)))
                # convert to frame coords
                center_global = (x + center_local[0], y + center_local[1])
                # radius: use max distance to edge of combined mask
                radius = max(10, int(0.5 * math.sqrt(w*w + h*h)))
                cv2.circle(display, center_global, radius, (0,0,255), 3)
            else:
                # fallback circle on bbox center
                cx, cy = x + w//2, y + h//2
                radius = max(10, int(0.5 * math.sqrt(w*w + h*h)))
                cv2.circle(display, (cx,cy), radius, (0,0,255), 2)

            # arrows: sample features inside bbox and compute pyrLK for direction
            mask = np.zeros_like(prev_gray)
            mask[y:y+h, x:x+w] = 255
            pts = cv2.goodFeaturesToTrack(prev_gray, mask=mask, maxCorners=MAX_FEATURES_PER_BOX,
                                          qualityLevel=FEATURE_QUALITY, minDistance=FEATURE_MIN_DIST, blockSize=7)
            if pts is not None and len(pts) > 0:
                next_pts, status, err = cv2.calcOpticalFlowPyrLK(prev_gray, gray, pts, None, **lk_params)
                good_old = pts[status.flatten()==1].reshape(-1,2)
                good_new = next_pts[status.flatten()==1].reshape(-1,2)
                for (ox, oy), (nxp, nyp) in zip(good_old, good_new):
                    ox_i, oy_i = int(ox), int(oy)
                    nx_i, ny_i = int(nxp), int(nyp)
                    dx, dy = nx_i - ox_i, ny_i - oy_i
                    if abs(dx) + abs(dy) > 1:
                        cv2.arrowedLine(display, (ox_i, oy_i), (nx_i, ny_i), (0,255,255), 1, tipLength=0.35)

        # overlay explosion count live
        cv2.putText(display, f"Total Explosions: {confirmed_count}", (12, H-18),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255,255,0), 2)

        out.write(display)

        # update rolling brightness and prev frame
        bg_brightness_hist.append(float(np.mean(gray)))
        prev_gray = gray.copy()

        # optional progress log
        if frame_idx % 50 == 0:
            elapsed = time.time() - start_time
            print(f"[{frame_idx}/{total_frames if total_frames>0 else '?'}] confirmed={confirmed_count} elapsed={elapsed:.1f}s")

    cap.release()
    out.release()
    print("Finished. Total confirmed explosions:", confirmed_count)
    print("Output saved to:", OUTPUT_VIDEO)

if __name__ == "__main__":
    detect_explosions_stable()


[50/548] confirmed=0 elapsed=0.6s
[100/548] confirmed=12 elapsed=17.5s
[150/548] confirmed=15 elapsed=38.7s
[200/548] confirmed=22 elapsed=67.1s
[250/548] confirmed=25 elapsed=99.9s
[300/548] confirmed=38 elapsed=122.8s
[350/548] confirmed=47 elapsed=157.4s
[400/548] confirmed=47 elapsed=191.9s
[450/548] confirmed=47 elapsed=215.1s
[500/548] confirmed=48 elapsed=215.8s
Finished. Total confirmed explosions: 48
Output saved to: Explosion_Output-2.mp4


In [None]:
import cv2
import numpy as np

def detect_explosions():
    video_path = "Fire-Cracker.mp4"       # Input
    output_path = "Explosion_Output-3.mp4"  # Output

    cap = cv2.VideoCapture(video_path)

    fps = int(cap.get(cv2.CAP_PROP_FPS))
    width  = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

    ret, prev_frame = cap.read()
    if not ret:
        print("❌ Error reading video.")
        return

    prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)

    explosion_detected = False
    explosion_count = 0

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        diff = cv2.absdiff(prev_gray, gray)
        _, thresh = cv2.threshold(diff, 50, 255, cv2.THRESH_BINARY)

        # Find contours of motion regions
        contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

        if contours:
            largest = max(contours, key=cv2.contourArea)
            area = cv2.contourArea(largest)

            if area > 5000:  # big enough = explosion
                if not explosion_detected:
                    explosion_detected = True
                    explosion_count += 1
                    print(f"💥 Explosion detected at frame {int(cap.get(cv2.CAP_PROP_POS_FRAMES))}")

                # Draw circle around explosion
                (x_c, y_c), radius = cv2.minEnclosingCircle(largest)
                center = (int(x_c), int(y_c))
                cv2.circle(frame, center, int(radius), (0,0,255), 4)

                # Compute optical flow for sparks/fire direction
                flow = cv2.calcOpticalFlowFarneback(prev_gray, gray, None,
                                                    0.5, 3, 15, 3, 5, 1.2, 0)
                step = 20
                h, w = gray.shape
                y, x = np.mgrid[step//2:h:step, step//2:w:step].reshape(2,-1).astype(int)
                fx, fy = flow[y, x].T

                for (x1, y1, dx, dy) in zip(x, y, fx, fy):
                    if dx**2 + dy**2 > 4:  # only strong motion
                        cv2.arrowedLine(frame, (x1, y1),
                                        (int(x1+dx), int(y1+dy)),
                                        (0,255,0), 2, tipLength=0.5)

        prev_gray = gray.copy()

        # Overlay total explosions
        cv2.putText(frame, f"Total Explosions: {explosion_count}", (30, height - 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255,255,0), 2)

        out.write(frame)

    cap.release()
    out.release()

    print(f"✅ Done. Final Explosions Counted: {explosion_count}")
    print(f"💾 Output saved as {output_path}")


# Run it
detect_explosions()


In [16]:
import cv2
import numpy as np

# Input/output paths
input_path = "Fire-Cracker.mp4"
output_path = "explosion_detected-4.mp4"

# Video setup
cap = cv2.VideoCapture(input_path)
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, int(cap.get(cv2.CAP_PROP_FPS)),
                      (int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))))

# Background subtractor
fgbg = cv2.createBackgroundSubtractorMOG2(history=200, varThreshold=100, detectShadows=False)

# For optical flow
ret, prev_frame = cap.read()
prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)

explosion_detected = False
explosion_count = 0
frames_since_explosion = 0
arrow_persistence = 15  # keep arrows for these many frames after sparks die

while True:
    ret, frame = cap.read()
    if not ret:
        break

    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

    # Foreground mask for explosion regions
    fgmask = fgbg.apply(gray)
    fgmask = cv2.medianBlur(fgmask, 5)
    fgmask = cv2.threshold(fgmask, 200, 255, cv2.THRESH_BINARY)[1]

    contours, _ = cv2.findContours(fgmask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    current_explosion = False
    for contour in contours:
        if cv2.contourArea(contour) > 600:  # adjust threshold
            x, y, w, h = cv2.boundingRect(contour)
            center = (x + w // 2, y + h // 2)
            cv2.circle(frame, center, max(w, h) // 2, (0, 0, 255), 2)

            current_explosion = True

    # Explosion event logic
    if current_explosion and not explosion_detected:
        explosion_detected = True
        explosion_count += 1
        frames_since_explosion = 0
    elif not current_explosion and explosion_detected:
        frames_since_explosion += 1
        if frames_since_explosion > arrow_persistence:
            explosion_detected = False

    # Optical flow arrows (for sparks/fire direction)
    flow = cv2.calcOpticalFlowFarneback(prev_gray, gray, None,
                                        0.5, 3, 15, 3, 5, 1.2, 0)
    step = 20
    h, w = gray.shape
    y, x = np.mgrid[step//2:h:step, step//2:w:step].astype(int)
    fx, fy = flow[y, x].T

    if explosion_detected:
        for (x1, y1, dx, dy) in zip(x, y, fx, fy):
            if dx*dx + dy*dy > 1:  # only strong motions
                cv2.arrowedLine(frame, (x1, y1), (int(x1+dx), int(y1+dy)),
                                (0, 255, 0), 2, tipLength=0.4)

    prev_gray = gray

    # Label
    cv2.putText(frame, f"Explosions Detected: {explosion_count}",
                (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)

    out.write(frame)

cap.release()
out.release()
print(f"Processing complete. Output saved as {output_path}")


ValueError: The truth value of an array with more than one element is ambiguous. Use a.any() or a.all()

DEEP

In [13]:
import cv2
import numpy as np
from collections import deque
import os

class ExplosionDetector:
    def __init__(self):
        # Parameters for explosion detection
        self.brightness_threshold = 200
        self.area_threshold = 50
        self.frame_history = 10
        self.motion_threshold = 5
        
        # Store previous frames for comparison
        self.prev_frames = deque(maxlen=self.frame_history)
        self.prev_gray = None
        
        # Explosion tracking
        self.explosion_count = 0
        self.explosion_regions = []
        
    def detect_explosion(self, frame):
        # Convert to grayscale
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        
        # Store current frame
        if self.prev_gray is not None:
            self.prev_frames.append(self.prev_gray)
        self.prev_gray = gray
        
        if len(self.prev_frames) < self.frame_history:
            return frame, False, None
        
        # Calculate brightness changes
        brightness_changes = []
        for prev_frame in self.prev_frames:
            diff = cv2.absdiff(gray, prev_frame)
            brightness_changes.append(diff)
        
        if not brightness_changes:
            return frame, False, None
            
        # Average the brightness changes
        avg_change = np.mean(brightness_changes, axis=0).astype(np.uint8)
        
        # Apply Gaussian blur to reduce noise
        avg_change = cv2.GaussianBlur(avg_change, (5, 5), 0)
        
        # Threshold to find bright areas
        _, bright_mask = cv2.threshold(avg_change, self.brightness_threshold, 255, cv2.THRESH_BINARY)
        
        # Morphological operations to clean up the mask
        kernel = np.ones((5, 5), np.uint8)
        bright_mask = cv2.morphologyEx(bright_mask, cv2.MORPH_CLOSE, kernel)
        bright_mask = cv2.morphologyEx(bright_mask, cv2.MORPH_OPEN, kernel)
        
        # Find contours in the bright areas
        contours, _ = cv2.findContours(bright_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        
        explosion_detected = False
        explosion_center = None
        
        for contour in contours:
            area = cv2.contourArea(contour)
            if area > self.area_threshold:
                explosion_detected = True
                
                # Calculate center of explosion
                M = cv2.moments(contour)
                if M["m00"] != 0:
                    cX = int(M["m10"] / M["m00"])
                    cY = int(M["m01"] / M["m00"])
                    explosion_center = (cX, cY)
                    
                    # Draw contour and center
                    cv2.drawContours(frame, [contour], -1, (0, 0, 255), 2)
                    cv2.circle(frame, explosion_center, 5, (0, 255, 0), -1)
                    
                    # Store explosion region
                    self.explosion_regions.append({
                        'center': explosion_center,
                        'contour': contour,
                        'frame_count': 0
                    })
        
        return frame, explosion_detected, explosion_center
    
    def analyze_motion_direction(self, frame, explosion_center):
        if explosion_center is None or len(self.prev_frames) < 2:
            return frame
            
        # Calculate optical flow for motion direction
        prev_frame = self.prev_frames[-1]
        current_frame = self.prev_gray
        
        # Use Lucas-Kanade method for optical flow
        feature_params = dict(maxCorners=100, qualityLevel=0.3, minDistance=7, blockSize=7)
        p0 = cv2.goodFeaturesToTrack(prev_frame, mask=None, **feature_params)
        
        if p0 is not None:
            p1, st, err = cv2.calcOpticalFlowPyrLK(prev_frame, current_frame, p0, None)
            
            if p1 is not None:
                # Select good points
                good_new = p1[st == 1]
                good_old = p0[st == 1]
                
                # Draw motion vectors
                for i, (new, old) in enumerate(zip(good_new, good_old)):
                    a, b = new.ravel()
                    c, d = old.ravel()
                    
                    # Only draw vectors near explosion center
                    dist_to_explosion = np.sqrt((a - explosion_center[0])**2 + (b - explosion_center[1])**2)
                    if dist_to_explosion < 100:  # Only within 100 pixels of explosion
                        motion_vector = (a - c, b - d)
                        vector_magnitude = np.sqrt(motion_vector[0]**2 + motion_vector[1]**2)
                        
                        if vector_magnitude > self.motion_threshold:
                            # Draw arrow for direction
                            cv2.arrowedLine(frame, (int(c), int(d)), (int(a), int(b)), (255, 0, 0), 2)
        
        return frame
    
    def update_explosion_regions(self, frame):
        # Update and draw explosion regions
        for region in self.explosion_regions:
            region['frame_count'] += 1
            
            # Draw expanding circle to represent ongoing explosion
            radius = min(50, region['frame_count'] * 5)
            cv2.circle(frame, region['center'], radius, (0, 255, 255), 2)
        
        # Remove old explosions
        self.explosion_regions = [r for r in self.explosion_regions if r['frame_count'] < 10]
        
        return frame

def process_video(video_path, output_path="output_video.mp4"):
    # Check if video file exists
    if not os.path.exists(video_path):
        print(f"Error: Video file '{video_path}' not found.")
        return
    
    # Initialize detector
    detector = ExplosionDetector()
    
    # Open video
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print("Error: Could not open video.")
        return
    
    # Get video properties
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    
    # Define codec and create VideoWriter
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    
    frame_count = 0
    explosion_detected_in_video = False
    
    print("Processing video...")
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        
        # Detect explosion
        processed_frame, explosion_detected, explosion_center = detector.detect_explosion(frame)
        
        if explosion_detected:
            explosion_detected_in_video = True
            detector.explosion_count += 1
            
            # Analyze motion direction
            processed_frame = detector.analyze_motion_direction(processed_frame, explosion_center)
            
            # Add explosion count text
            cv2.putText(processed_frame, f"Explosion #{detector.explosion_count}", (10, 30), 
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
        
        # Update and draw explosion regions
        processed_frame = detector.update_explosion_regions(processed_frame)
        
        # Display total explosion count
        cv2.putText(processed_frame, f"Total Explosions: {detector.explosion_count}", (10, 70), 
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
        
        # Display frame number
        cv2.putText(processed_frame, f"Frame: {frame_count}", (10, height - 20), 
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
        
        # Write to output video
        out.write(processed_frame)
        
        frame_count += 1
        if frame_count % 30 == 0:  # Print progress every 30 frames
            print(f"Processed {frame_count} frames...")
    
    # Release everything
    cap.release()
    out.release()
    
    print(f"Analysis complete. Total explosions detected: {detector.explosion_count}")
    print(f"Processed video saved to: {output_path}")
    
    if not explosion_detected_in_video:
        print("No explosions detected in the video.")
        
    return detector.explosion_count

if __name__ == "__main__":
    # Process the Fire-Cracker.mp4 video
    video_file = "Fire-Cracker.mp4"
    output_file = "firecracker_explosion_analysis.mp4"
    
    explosion_count = process_video(video_file, output_file)
    
    print(f"\nDone! Open '{output_file}' to see the explosion analysis.")
    print(f"Total explosions detected: {explosion_count}")

Processing video...
Processed 30 frames...
Processed 60 frames...
Processed 90 frames...
Processed 120 frames...
Processed 150 frames...
Processed 180 frames...
Processed 210 frames...
Processed 240 frames...
Processed 270 frames...
Processed 300 frames...
Processed 330 frames...
Processed 360 frames...
Processed 390 frames...
Processed 420 frames...
Processed 450 frames...
Processed 480 frames...
Processed 510 frames...
Processed 540 frames...
Analysis complete. Total explosions detected: 10
Processed video saved to: firecracker_explosion_analysis.mp4

Done! Open 'firecracker_explosion_analysis.mp4' to see the explosion analysis.
Total explosions detected: 10


In [14]:
import cv2
import numpy as np
from collections import deque
import os

class ImprovedExplosionDetector:
    def __init__(self):
        # Parameters for explosion detection
        self.brightness_threshold = 220  # Increased for better detection
        self.area_threshold = 100  # Increased to filter out small bright areas
        self.frame_history = 5
        self.motion_threshold = 3
        
        # Store previous frames for comparison
        self.prev_frames = deque(maxlen=self.frame_history)
        self.prev_gray = None
        
        # Explosion tracking
        self.explosion_count = 0
        self.explosion_regions = []
        self.explosion_cooldown = 0  # Prevents multiple counts for the same explosion
        
    def detect_explosion(self, frame):
        # Convert to grayscale
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        
        # Store current frame
        if self.prev_gray is not None:
            self.prev_frames.append(self.prev_gray)
        self.prev_gray = gray
        
        if len(self.prev_frames) < self.frame_history:
            return frame, False, None
        
        # Calculate brightness changes
        brightness_changes = []
        for prev_frame in self.prev_frames:
            diff = cv2.absdiff(gray, prev_frame)
            brightness_changes.append(diff)
        
        if not brightness_changes:
            return frame, False, None
            
        # Average the brightness changes
        avg_change = np.mean(brightness_changes, axis=0).astype(np.uint8)
        
        # Apply Gaussian blur to reduce noise
        avg_change = cv2.GaussianBlur(avg_change, (9, 9), 0)
        
        # Threshold to find bright areas
        _, bright_mask = cv2.threshold(avg_change, self.brightness_threshold, 255, cv2.THRESH_BINARY)
        
        # Morphological operations to clean up the mask
        kernel = np.ones((7, 7), np.uint8)
        bright_mask = cv2.morphologyEx(bright_mask, cv2.MORPH_CLOSE, kernel)
        bright_mask = cv2.morphologyEx(bright_mask, cv2.MORPH_OPEN, kernel)
        
        # Find contours in the bright areas
        contours, _ = cv2.findContours(bright_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        
        explosion_detected = False
        explosion_center = None
        largest_contour = None
        max_area = 0
        
        # Find the largest bright area (most likely the explosion)
        for contour in contours:
            area = cv2.contourArea(contour)
            if area > max_area:
                max_area = area
                largest_contour = contour
        
        # Check if the largest area meets our threshold
        if largest_contour is not None and max_area > self.area_threshold:
            explosion_detected = True
            
            # Calculate center of explosion
            M = cv2.moments(largest_contour)
            if M["m00"] != 0:
                cX = int(M["m10"] / M["m00"])
                cY = int(M["m01"] / M["m00"])
                explosion_center = (cX, cY)
                
                # Draw contour and center
                cv2.drawContours(frame, [largest_contour], -1, (0, 0, 255), 2)
                cv2.circle(frame, explosion_center, 5, (0, 255, 0), -1)
                
                # Store explosion region (only one per frame)
                if self.explosion_cooldown <= 0:
                    self.explosion_regions.append({
                        'center': explosion_center,
                        'contour': largest_contour,
                        'frame_count': 0
                    })
                    self.explosion_cooldown = 15  # Set cooldown to prevent multiple counts
        
        # Decrease cooldown counter
        if self.explosion_cooldown > 0:
            self.explosion_cooldown -= 1
        
        return frame, explosion_detected, explosion_center
    
    def analyze_motion_direction(self, frame, explosion_center):
        if explosion_center is None or len(self.prev_frames) < 2:
            return frame
            
        # Calculate optical flow for motion direction
        prev_frame = self.prev_frames[-1]
        current_frame = self.prev_gray
        
        # Create a mask around the explosion area
        mask = np.zeros_like(current_frame)
        x, y = explosion_center
        cv2.circle(mask, (x, y), 100, 255, -1)  # Create a circular mask around explosion
        
        # Use Lucas-Kanade method for optical flow
        feature_params = dict(maxCorners=200, qualityLevel=0.1, minDistance=5, blockSize=7)
        p0 = cv2.goodFeaturesToTrack(prev_frame, mask=mask, **feature_params)
        
        if p0 is not None:
            p1, st, err = cv2.calcOpticalFlowPyrLK(prev_frame, current_frame, p0, None)
            
            if p1 is not None:
                # Select good points
                good_new = p1[st == 1]
                good_old = p0[st == 1]
                
                # Draw motion vectors
                for i, (new, old) in enumerate(zip(good_new, good_old)):
                    a, b = new.ravel()
                    c, d = old.ravel()
                    
                    # Calculate motion vector
                    motion_vector = (a - c, b - d)
                    vector_magnitude = np.sqrt(motion_vector[0]**2 + motion_vector[1]**2)
                    
                    # Only draw significant movements
                    if vector_magnitude > self.motion_threshold:
                        # Draw arrow for direction
                        cv2.arrowedLine(frame, (int(c), int(d)), (int(a), int(b)), (255, 0, 0), 2, tipLength=0.3)
        
        return frame
    
    def update_explosion_regions(self, frame):
        # Update and draw only the most recent explosion region
        if self.explosion_regions:
            region = self.explosion_regions[-1]  # Only keep the most recent explosion
            region['frame_count'] += 1
            
            # Draw expanding circle to represent ongoing explosion
            radius = min(80, region['frame_count'] * 4)
            cv2.circle(frame, region['center'], radius, (0, 255, 255), 2)
            
            # Remove if it's been too long
            if region['frame_count'] > 20:
                self.explosion_regions.pop()
        
        return frame

def process_video(video_path, output_path="output_video.mp4"):
    # Check if video file exists
    if not os.path.exists(video_path):
        print(f"Error: Video file '{video_path}' not found.")
        return 0
    
    # Initialize detector
    detector = ImprovedExplosionDetector()
    
    # Open video
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print("Error: Could not open video.")
        return 0
    
    # Get video properties
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    
    # Define codec and create VideoWriter
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    
    frame_count = 0
    explosion_detected_in_video = False
    
    print("Processing video...")
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        
        # Detect explosion
        processed_frame, explosion_detected, explosion_center = detector.detect_explosion(frame)
        
        if explosion_detected and detector.explosion_cooldown == 14:  # Count only once per explosion
            explosion_detected_in_video = True
            detector.explosion_count += 1
            
            # Analyze motion direction
            processed_frame = detector.analyze_motion_direction(processed_frame, explosion_center)
            
            # Add explosion count text
            cv2.putText(processed_frame, f"Explosion #{detector.explosion_count}", (10, 30), 
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
        
        # Update and draw explosion regions
        processed_frame = detector.update_explosion_regions(processed_frame)
        
        # Display total explosion count
        cv2.putText(processed_frame, f"Total Explosions: {detector.explosion_count}", (10, 70), 
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
        
        # Display frame number
        cv2.putText(processed_frame, f"Frame: {frame_count}", (10, height - 20), 
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
        
        # Write to output video
        out.write(processed_frame)
        
        frame_count += 1
        if frame_count % 30 == 0:  # Print progress every 30 frames
            print(f"Processed {frame_count} frames...")
    
    # Release everything
    cap.release()
    out.release()
    
    print(f"Analysis complete. Total explosions detected: {detector.explosion_count}")
    print(f"Processed video saved to: {output_path}")
    
    if not explosion_detected_in_video:
        print("No explosions detected in the video.")
        
    return detector.explosion_count

if __name__ == "__main__":
    # Process the Fire-Cracker.mp4 video
    video_file = "Fire-Cracker.mp4"
    output_file = "improved_explosion_analysis.mp4"
    
    explosion_count = process_video(video_file, output_file)
    
    print(f"\nDone! Open '{output_file}' to see the explosion analysis.")
    print(f"Total explosions detected: {explosion_count}")

Processing video...
Processed 30 frames...
Processed 60 frames...
Processed 90 frames...
Processed 120 frames...
Processed 150 frames...
Processed 180 frames...
Processed 210 frames...
Processed 240 frames...
Processed 270 frames...
Processed 300 frames...
Processed 330 frames...
Processed 360 frames...
Processed 390 frames...
Processed 420 frames...
Processed 450 frames...
Processed 480 frames...
Processed 510 frames...
Processed 540 frames...
Analysis complete. Total explosions detected: 2
Processed video saved to: improved_explosion_analysis.mp4

Done! Open 'improved_explosion_analysis.mp4' to see the explosion analysis.
Total explosions detected: 2


In [15]:
import cv2
import numpy as np
from collections import deque
import os

class ImprovedExplosionDetector:
    def __init__(self):
        # Parameters for explosion detection
        self.brightness_threshold = 220  # Increased for better detection
        self.area_threshold = 100  # Increased to filter out small bright areas
        self.frame_history = 5
        self.motion_threshold = 3
        
        # Store previous frames for comparison
        self.prev_frames = deque(maxlen=self.frame_history)
        self.prev_gray = None
        
        # Explosion tracking
        self.explosion_count = 0
        self.explosion_regions = []
        self.explosion_cooldown = 0  # Prevents multiple counts for the same explosion
        
    def detect_explosion(self, frame):
        # Convert to grayscale
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        
        # Store current frame
        if self.prev_gray is not None:
            self.prev_frames.append(self.prev_gray)
        self.prev_gray = gray
        
        if len(self.prev_frames) < self.frame_history:
            return frame, False, None
        
        # Calculate brightness changes
        brightness_changes = []
        for prev_frame in self.prev_frames:
            diff = cv2.absdiff(gray, prev_frame)
            brightness_changes.append(diff)
        
        if not brightness_changes:
            return frame, False, None
            
        # Average the brightness changes
        avg_change = np.mean(brightness_changes, axis=0).astype(np.uint8)
        
        # Apply Gaussian blur to reduce noise
        avg_change = cv2.GaussianBlur(avg_change, (9, 9), 0)
        
        # Threshold to find bright areas
        _, bright_mask = cv2.threshold(avg_change, self.brightness_threshold, 255, cv2.THRESH_BINARY)
        
        # Morphological operations to clean up the mask
        kernel = np.ones((7, 7), np.uint8)
        bright_mask = cv2.morphologyEx(bright_mask, cv2.MORPH_CLOSE, kernel)
        bright_mask = cv2.morphologyEx(bright_mask, cv2.MORPH_OPEN, kernel)
        
        # Find contours in the bright areas
        contours, _ = cv2.findContours(bright_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        
        explosion_detected = False
        explosion_center = None
        largest_contour = None
        max_area = 0
        
        # Find the largest bright area (most likely the explosion)
        for contour in contours:
            area = cv2.contourArea(contour)
            if area > max_area:
                max_area = area
                largest_contour = contour
        
        # Check if the largest area meets our threshold
        if largest_contour is not None and max_area > self.area_threshold:
            explosion_detected = True
            
            # Calculate center of explosion
            M = cv2.moments(largest_contour)
            if M["m00"] != 0:
                cX = int(M["m10"] / M["m00"])
                cY = int(M["m01"] / M["m00"])
                explosion_center = (cX, cY)
                
                # Draw contour and center
                cv2.drawContours(frame, [largest_contour], -1, (0, 0, 255), 2)
                cv2.circle(frame, explosion_center, 5, (0, 255, 0), -1)
                
                # Store explosion region (only one per frame)
                if self.explosion_cooldown <= 0:
                    self.explosion_regions.append({
                        'center': explosion_center,
                        'contour': largest_contour,
                        'frame_count': 0
                    })
                    self.explosion_cooldown = 15  # Set cooldown to prevent multiple counts
        
        # Decrease cooldown counter
        if self.explosion_cooldown > 0:
            self.explosion_cooldown -= 1
        
        return frame, explosion_detected, explosion_center
    
    def analyze_motion_direction(self, frame, explosion_center):
        if explosion_center is None or len(self.prev_frames) < 2:
            return frame
            
        # Calculate optical flow for motion direction
        prev_frame = self.prev_frames[-1]
        current_frame = self.prev_gray
        
        # Create a mask around the explosion area
        mask = np.zeros_like(current_frame)
        x, y = explosion_center
        cv2.circle(mask, (x, y), 100, 255, -1)  # Create a circular mask around explosion
        
        # Use Lucas-Kanade method for optical flow
        feature_params = dict(maxCorners=200, qualityLevel=0.1, minDistance=5, blockSize=7)
        p0 = cv2.goodFeaturesToTrack(prev_frame, mask=mask, **feature_params)
        
        if p0 is not None:
            p1, st, err = cv2.calcOpticalFlowPyrLK(prev_frame, current_frame, p0, None)
            
            if p1 is not None:
                # Select good points
                good_new = p1[st == 1]
                good_old = p0[st == 1]
                
                # Draw motion vectors
                for i, (new, old) in enumerate(zip(good_new, good_old)):
                    a, b = new.ravel()
                    c, d = old.ravel()
                    
                    # Calculate motion vector
                    motion_vector = (a - c, b - d)
                    vector_magnitude = np.sqrt(motion_vector[0]**2 + motion_vector[1]**2)
                    
                    # Only draw significant movements
                    if vector_magnitude > self.motion_threshold:
                        # Draw arrow for direction
                        cv2.arrowedLine(frame, (int(c), int(d)), (int(a), int(b)), (255, 0, 0), 2, tipLength=0.3)
        
        return frame
    
    def update_explosion_regions(self, frame):
        # Update and draw only the most recent explosion region
        if self.explosion_regions:
            region = self.explosion_regions[-1]  # Only keep the most recent explosion
            region['frame_count'] += 1
            
            # Draw expanding circle to represent ongoing explosion
            radius = min(80, region['frame_count'] * 4)
            cv2.circle(frame, region['center'], radius, (0, 255, 255), 2)
            
            # Remove if it's been too long
            if region['frame_count'] > 20:
                self.explosion_regions.pop()
        
        return frame

def process_video(video_path, output_path="output_video.mp4"):
    # Check if video file exists
    if not os.path.exists(video_path):
        print(f"Error: Video file '{video_path}' not found.")
        return 0
    
    # Initialize detector
    detector = ImprovedExplosionDetector()
    
    # Open video
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print("Error: Could not open video.")
        return 0
    
    # Get video properties
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    
    # Define codec and create VideoWriter
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    
    frame_count = 0
    explosion_detected_in_video = False
    
    print("Processing video...")
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        
        # Detect explosion
        processed_frame, explosion_detected, explosion_center = detector.detect_explosion(frame)
        
        if explosion_detected and detector.explosion_cooldown == 14:  # Count only once per explosion
            explosion_detected_in_video = True
            detector.explosion_count += 1
            
            # Analyze motion direction
            processed_frame = detector.analyze_motion_direction(processed_frame, explosion_center)
            
            # Add explosion count text
            cv2.putText(processed_frame, f"Explosion #{detector.explosion_count}", (10, 30), 
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
        
        # Update and draw explosion regions
        processed_frame = detector.update_explosion_regions(processed_frame)
        
        # Display total explosion count
        cv2.putText(processed_frame, f"Total Explosions: {detector.explosion_count}", (10, 70), 
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
        
        # Display frame number
        cv2.putText(processed_frame, f"Frame: {frame_count}", (10, height - 20), 
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
        
        # Write to output video
        out.write(processed_frame)
        
        frame_count += 1
        if frame_count % 30 == 0:  # Print progress every 30 frames
            print(f"Processed {frame_count} frames...")
    
    # Release everything
    cap.release()
    out.release()
    
    print(f"Analysis complete. Total explosions detected: {detector.explosion_count}")
    print(f"Processed video saved to: {output_path}")
    
    if not explosion_detected_in_video:
        print("No explosions detected in the video.")
        
    return detector.explosion_count

if __name__ == "__main__":
    # Process the Fire-Cracker.mp4 video
    video_file = "Fire-Cracker.mp4"
    output_file = "improved_explosion_analysis-1.mp4"
    
    explosion_count = process_video(video_file, output_file)
    
    print(f"\nDone! Open '{output_file}' to see the explosion analysis.")
    print(f"Total explosions detected: {explosion_count}")

Processing video...
Processed 30 frames...
Processed 60 frames...
Processed 90 frames...
Processed 120 frames...
Processed 150 frames...
Processed 180 frames...
Processed 210 frames...
Processed 240 frames...
Processed 270 frames...
Processed 300 frames...
Processed 330 frames...
Processed 360 frames...
Processed 390 frames...
Processed 420 frames...
Processed 450 frames...
Processed 480 frames...
Processed 510 frames...
Processed 540 frames...
Analysis complete. Total explosions detected: 2
Processed video saved to: improved_explosion_analysis-1.mp4

Done! Open 'improved_explosion_analysis-1.mp4' to see the explosion analysis.
Total explosions detected: 2


In [1]:
import cv2
import numpy as np
import os

class ExplosionAnalyzer:
    def __init__(self):
        # Detection parameters
        self.brightness_threshold = 220
        self.area_threshold = 150
        self.motion_threshold = 1.5
        
        # State variables
        self.prev_frame = None
        self.prev_gray = None
        self.explosion_count = 0
        self.explosion_active = False
        self.explosion_cooldown = 0
        self.explosion_center = None
        self.explosion_lifetime = 0
        
    def detect_explosion(self, frame):
        # Convert to grayscale
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        
        if self.prev_gray is None:
            self.prev_gray = gray
            self.prev_frame = frame.copy()
            return frame, False, None
        
        # Calculate absolute difference between frames
        frame_diff = cv2.absdiff(gray, self.prev_gray)
        
        # Apply threshold to find significant changes
        _, thresh = cv2.threshold(frame_diff, self.brightness_threshold, 255, cv2.THRESH_BINARY)
        
        # Apply morphological operations to clean up the mask
        kernel = np.ones((9, 9), np.uint8)
        thresh = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, kernel)
        thresh = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, kernel)
        
        # Find contours in the thresholded image
        contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        
        explosion_detected = False
        explosion_center = None
        max_area = 0
        largest_contour = None
        
        # Find the largest bright area
        for contour in contours:
            area = cv2.contourArea(contour)
            if area > max_area:
                max_area = area
                largest_contour = contour
        
        # Check if the largest area meets our threshold
        if largest_contour is not None and max_area > self.area_threshold:
            explosion_detected = True
            
            # Calculate center of explosion
            M = cv2.moments(largest_contour)
            if M["m00"] != 0:
                cX = int(M["m10"] / M["m00"])
                cY = int(M["m01"] / M["m00"])
                explosion_center = (cX, cY)
                
                # Draw contour and center
                cv2.drawContours(frame, [largest_contour], -1, (0, 0, 255), 3)
                cv2.circle(frame, explosion_center, 8, (0, 255, 0), -1)
        
        # Update previous frame
        self.prev_gray = gray
        self.prev_frame = frame.copy()
        
        return frame, explosion_detected, explosion_center
    
    def analyze_explosion_dynamics(self, frame, explosion_center):
        if explosion_center is None or self.prev_frame is None:
            return frame
        
        # Convert frames to grayscale
        prev_gray = cv2.cvtColor(self.prev_frame, cv2.COLOR_BGR2GRAY)
        curr_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        
        # Calculate dense optical flow using Farneback method
        flow = cv2.calcOpticalFlowFarneback(prev_gray, curr_gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)
        
        # Create a grid for motion vectors
        h, w = flow.shape[:2]
        y, x = explosion_center
        
        # Define the region around the explosion to analyze
        y_start = max(0, y - 100)
        y_end = min(h, y + 100)
        x_start = max(0, x - 100)
        x_end = min(w, x + 100)
        
        step = 12  # Sample every 12 pixels
        
        # Draw motion vectors in the explosion region
        for i in range(y_start, y_end, step):
            for j in range(x_start, x_end, step):
                dx, dy = flow[i, j]
                magnitude = np.sqrt(dx**2 + dy**2)
                
                # Only draw significant movements
                if magnitude > self.motion_threshold:
                    # Draw arrow for direction
                    end_point = (int(j + dx*3), int(i + dy*3))  # Scale for visibility
                    cv2.arrowedLine(frame, (j, i), end_point, (255, 0, 0), 2, tipLength=0.4)
        
        return frame
    
    def update_explosion_state(self, explosion_detected, explosion_center):
        # Handle explosion cooldown
        if self.explosion_cooldown > 0:
            self.explosion_cooldown -= 1
        
        # Detect new explosion
        if explosion_detected and not self.explosion_active and self.explosion_cooldown <= 0:
            self.explosion_active = True
            self.explosion_count += 1
            self.explosion_center = explosion_center
            self.explosion_lifetime = 0
            self.explosion_cooldown = 25  # Prevent multiple detections
        
        # Update active explosion
        if self.explosion_active:
            self.explosion_lifetime += 1
            if self.explosion_lifetime > 25:  # Explosion lasts for 25 frames
                self.explosion_active = False

def process_video(input_path, output_path="explosion_analysis_output.mp4"):
    """
    Process a video to detect explosions, highlight them, show direction arrows, and count explosions.
    
    Args:
        input_path (str): Path to the input video file
        output_path (str): Path to save the output video with analysis
    """
    
    # Verify input file exists
    if not os.path.exists(input_path):
        print(f"Error: Input video '{input_path}' not found.")
        return 0
    
    # Initialize explosion analyzer
    analyzer = ExplosionAnalyzer()
    
    # Open video
    cap = cv2.VideoCapture(input_path)
    if not cap.isOpened():
        print("Error: Could not open video.")
        return 0
    
    # Get video properties
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    
    # Define codec and create VideoWriter
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    
    frame_count = 0
    
    print("Processing video for explosion detection...")
    print("Press 'q' to stop processing early if running in a compatible environment.")
    
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        
        # Detect explosion in current frame
        processed_frame, explosion_detected, explosion_center = analyzer.detect_explosion(frame)
        
        # Update explosion state and count
        analyzer.update_explosion_state(explosion_detected, explosion_center)
        
        # If explosion is active, analyze and visualize
        if analyzer.explosion_active:
            # Analyze explosion dynamics (sparks, fire direction)
            processed_frame = analyzer.analyze_explosion_dynamics(processed_frame, analyzer.explosion_center)
            
            # Draw expanding circle to represent ongoing explosion
            radius = min(100, analyzer.explosion_lifetime * 5)
            cv2.circle(processed_frame, analyzer.explosion_center, radius, (0, 255, 255), 3)
            
            # Add explosion count text
            cv2.putText(processed_frame, f"EXPLOSION #{analyzer.explosion_count}", (20, 40), 
                        cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0, 0, 255), 3)
        
        # Display total explosion count
        cv2.putText(processed_frame, f"Total Explosions: {analyzer.explosion_count}", (20, 90), 
                    cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255, 255, 255), 2)
        
        # Display frame number
        cv2.putText(processed_frame, f"Frame: {frame_count}", (20, height - 20), 
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
        
        # Write to output video
        out.write(processed_frame)
        
        frame_count += 1
        
        # Print progress every 50 frames
        if frame_count % 50 == 0:
            print(f"Processed {frame_count} frames...")
    
    # Release resources
    cap.release()
    out.release()
    
    print(f"Analysis complete. Total explosions detected: {analyzer.explosion_count}")
    print(f"Processed video saved to: {output_path}")
        
    return analyzer.explosion_count

if __name__ == "__main__":
    # Configuration
    INPUT_VIDEO = "Fire-Cracker.mp4"  # Your video file path
    OUTPUT_VIDEO = "Explosion_Analysis_Result-1.mp4"  # Output video file
    
    # Process the video
    explosion_count = process_video(INPUT_VIDEO, OUTPUT_VIDEO)
    
    # Results summary
    print("\n" + "="*50)
    print("EXPLOSION ANALYSIS RESULTS")
    print("="*50)
    print(f"Input video: {INPUT_VIDEO}")
    print(f"Output video: {OUTPUT_VIDEO}")
    print(f"Total explosions detected: {explosion_count}")
    print("="*50)
    
    # Instructions for viewing
    print("\nINSTRUCTIONS:")
    print(f"1. Open '{OUTPUT_VIDEO}' to view the analysis results")
    print("2. Explosions are highlighted with:")
    print("   - Red contours: Explosion boundaries")
    print("   - Green dot: Epicenter of explosion")
    print("   - Blue arrows: Direction of sparks/fire")
    print("   - Yellow circle: Expanding shockwave")
    print("3. Explosion count is displayed at the top of the video")

Processing video for explosion detection...
Press 'q' to stop processing early if running in a compatible environment.
Processed 50 frames...
Processed 100 frames...
Processed 150 frames...
Processed 200 frames...
Processed 250 frames...
Processed 300 frames...
Processed 350 frames...
Processed 400 frames...
Processed 450 frames...
Processed 500 frames...
Analysis complete. Total explosions detected: 2
Processed video saved to: Explosion_Analysis_Result-1.mp4

EXPLOSION ANALYSIS RESULTS
Input video: Fire-Cracker.mp4
Output video: Explosion_Analysis_Result-1.mp4
Total explosions detected: 2

INSTRUCTIONS:
1. Open 'Explosion_Analysis_Result-1.mp4' to view the analysis results
2. Explosions are highlighted with:
   - Red contours: Explosion boundaries
   - Green dot: Epicenter of explosion
   - Blue arrows: Direction of sparks/fire
   - Yellow circle: Expanding shockwave
3. Explosion count is displayed at the top of the video


In [2]:
import cv2
import numpy as np
import os

class ExplosionAnalyzer:
    def __init__(self):
        # Detection parameters
        self.brightness_threshold = 210
        self.area_threshold = 100
        self.motion_threshold = 1.0
        
        # State variables
        self.prev_frame = None
        self.prev_gray = None
        self.prev_bright_mask = None
        self.explosion_count = 0
        self.explosion_active = False
        self.explosion_cooldown = 0
        self.explosion_center = None
        self.explosion_lifetime = 0
        self.explosion_regions = []
        
    def detect_explosion(self, frame):
        # Convert to grayscale
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        
        if self.prev_gray is None:
            self.prev_gray = gray
            self.prev_frame = frame.copy()
            return frame, False, None
        
        # Calculate absolute difference between frames
        frame_diff = cv2.absdiff(gray, self.prev_gray)
        
        # Apply threshold to find significant changes
        _, thresh = cv2.threshold(frame_diff, self.brightness_threshold, 255, cv2.THRESH_BINARY)
        
        # Apply morphological operations to clean up the mask
        kernel = np.ones((7, 7), np.uint8)
        thresh = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, kernel)
        thresh = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, kernel)
        
        # Find contours in the thresholded image
        contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        
        explosion_detected = False
        explosion_center = None
        max_area = 0
        largest_contour = None
        
        # Find the largest bright area
        for contour in contours:
            area = cv2.contourArea(contour)
            if area > max_area:
                max_area = area
                largest_contour = contour
        
        # Check if the largest area meets our threshold
        if largest_contour is not None and max_area > self.area_threshold:
            explosion_detected = True
            
            # Calculate center of explosion
            M = cv2.moments(largest_contour)
            if M["m00"] != 0:
                cX = int(M["m10"] / M["m00"])
                cY = int(M["m01"] / M["m00"])
                explosion_center = (cX, cY)
                
                # Draw contour and center
                cv2.drawContours(frame, [largest_contour], -1, (0, 0, 255), 3)
                cv2.circle(frame, explosion_center, 8, (0, 255, 0), -1)
        
        # Update previous frame and bright mask
        self.prev_gray = gray
        self.prev_frame = frame.copy()
        self.prev_bright_mask = thresh
        
        return frame, explosion_detected, explosion_center
    
    def analyze_explosion_dynamics(self, frame, explosion_center):
        if explosion_center is None or self.prev_frame is None or self.prev_bright_mask is None:
            return frame
        
        # Convert current frame to grayscale
        curr_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        
        # Create a bright mask for current frame
        _, curr_thresh = cv2.threshold(curr_gray, 200, 255, cv2.THRESH_BINARY)
        
        # Find contours in current bright areas
        curr_contours, _ = cv2.findContours(curr_thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        
        # Find contours in previous bright areas
        prev_contours, _ = cv2.findContours(self.prev_bright_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        
        # For each bright region in current frame, find direction from explosion center
        for contour in curr_contours:
            area = cv2.contourArea(contour)
            if area > 20:  # Only consider significant bright areas
                M = cv2.moments(contour)
                if M["m00"] != 0:
                    cX = int(M["m10"] / M["m00"])
                    cY = int(M["m01"] / M["m00"])
                    
                    # Calculate direction from explosion center to this bright spot
                    dir_x = cX - explosion_center[0]
                    dir_y = cY - explosion_center[1]
                    
                    # Normalize direction vector
                    magnitude = np.sqrt(dir_x**2 + dir_y**2)
                    if magnitude > 10:  # Only draw arrows for significant movement
                        # Scale for visibility
                        scale = 30 / magnitude if magnitude > 0 else 0
                        end_x = explosion_center[0] + int(dir_x * scale)
                        end_y = explosion_center[1] + int(dir_y * scale)
                        
                        # Draw arrow from explosion center to bright spot
                        cv2.arrowedLine(frame, explosion_center, (end_x, end_y), 
                                       (255, 0, 0), 2, tipLength=0.3)
        
        return frame
    
    def update_explosion_state(self, explosion_detected, explosion_center):
        # Handle explosion cooldown
        if self.explosion_cooldown > 0:
            self.explosion_cooldown -= 1
        
        # Detect new explosion
        if explosion_detected and not self.explosion_active and self.explosion_cooldown <= 0:
            self.explosion_active = True
            self.explosion_count += 1
            self.explosion_center = explosion_center
            self.explosion_lifetime = 0
            self.explosion_cooldown = 25  # Prevent multiple detections
            self.explosion_regions.append({
                'center': explosion_center,
                'lifetime': 0,
                'active': True
            })
        
        # Update active explosions
        for region in self.explosion_regions:
            if region['active']:
                region['lifetime'] += 1
                if region['lifetime'] > 30:  # Explosion lasts for 30 frames
                    region['active'] = False
        
        # Check if any explosion is still active
        self.explosion_active = any(region['active'] for region in self.explosion_regions)

def process_video(input_path, output_path="explosion_analysis_output.mp4"):
    """
    Process a video to detect explosions, highlight them, show direction arrows, and count explosions.
    
    Args:
        input_path (str): Path to the input video file
        output_path (str): Path to save the output video with analysis
    """
    
    # Verify input file exists
    if not os.path.exists(input_path):
        print(f"Error: Input video '{input_path}' not found.")
        return 0
    
    # Initialize explosion analyzer
    analyzer = ExplosionAnalyzer()
    
    # Open video
    cap = cv2.VideoCapture(input_path)
    if not cap.isOpened():
        print("Error: Could not open video.")
        return 0
    
    # Get video properties
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    
    # Define codec and create VideoWriter
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    
    frame_count = 0
    
    print("Processing video for explosion detection...")
    print("Analyzing explosion dynamics and direction of sparks...")
    
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        
        # Create a copy for processing
        processed_frame = frame.copy()
        
        # Detect explosion in current frame
        processed_frame, explosion_detected, explosion_center = analyzer.detect_explosion(processed_frame)
        
        # Update explosion state and count
        analyzer.update_explosion_state(explosion_detected, explosion_center)
        
        # If explosion is active, analyze and visualize
        if analyzer.explosion_active:
            # Analyze explosion dynamics (sparks, fire direction)
            processed_frame = analyzer.analyze_explosion_dynamics(processed_frame, analyzer.explosion_center)
            
            # Draw expanding circles for all active explosions
            for region in analyzer.explosion_regions:
                if region['active']:
                    radius = min(120, region['lifetime'] * 5)
                    cv2.circle(processed_frame, region['center'], radius, (0, 255, 255), 2)
            
            # Add explosion count text
            cv2.putText(processed_frame, f"EXPLOSION #{analyzer.explosion_count}", (20, 40), 
                        cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0, 0, 255), 3)
        
        # Display total explosion count
        cv2.putText(processed_frame, f"Total Explosions: {analyzer.explosion_count}", (20, 90), 
                    cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255, 255, 255), 2)
        
        # Display frame number
        cv2.putText(processed_frame, f"Frame: {frame_count}", (20, height - 20), 
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
        
        # Write to output video
        out.write(processed_frame)
        
        frame_count += 1
        
        # Print progress every 50 frames
        if frame_count % 50 == 0:
            print(f"Processed {frame_count} frames...")
    
    # Release resources
    cap.release()
    out.release()
    
    print(f"Analysis complete. Total explosions detected: {analyzer.explosion_count}")
    print(f"Processed video saved to: {output_path}")
        
    return analyzer.explosion_count

if __name__ == "__main__":
    # Configuration
    INPUT_VIDEO = "Fire-Cracker.mp4"  # Your video file path
    OUTPUT_VIDEO = "Explosion_Analysis_Result-2.mp4"  # Output video file
    
    # Process the video
    explosion_count = process_video(INPUT_VIDEO, OUTPUT_VIDEO)
    
    # Results summary
    print("\n" + "="*50)
    print("EXPLOSION ANALYSIS RESULTS")
    print("="*50)
    print(f"Input video: {INPUT_VIDEO}")
    print(f"Output video: {OUTPUT_VIDEO}")
    print(f"Total explosions detected: {explosion_count}")
    print("="*50)
    
    # Instructions for viewing
    print("\nINSTRUCTIONS:")
    print(f"1. Open '{OUTPUT_VIDEO}' to view the analysis results")
    print("2. Explosions are highlighted with:")
    print("   - Red contours: Explosion boundaries")
    print("   - Green dot: Epicenter of explosion")
    print("   - Blue arrows: Direction of sparks/fire")
    print("   - Yellow circle: Expanding shockwave")
    print("3. Explosion count is displayed at the top of the video")

Processing video for explosion detection...
Analyzing explosion dynamics and direction of sparks...
Processed 50 frames...
Processed 100 frames...
Processed 150 frames...
Processed 200 frames...
Processed 250 frames...
Processed 300 frames...
Processed 350 frames...
Processed 400 frames...
Processed 450 frames...
Processed 500 frames...
Analysis complete. Total explosions detected: 3
Processed video saved to: Explosion_Analysis_Result-2.mp4

EXPLOSION ANALYSIS RESULTS
Input video: Fire-Cracker.mp4
Output video: Explosion_Analysis_Result-2.mp4
Total explosions detected: 3

INSTRUCTIONS:
1. Open 'Explosion_Analysis_Result-2.mp4' to view the analysis results
2. Explosions are highlighted with:
   - Red contours: Explosion boundaries
   - Green dot: Epicenter of explosion
   - Blue arrows: Direction of sparks/fire
   - Yellow circle: Expanding shockwave
3. Explosion count is displayed at the top of the video


In [3]:
import cv2
import numpy as np
import os

class ExplosionAnalyzer:
    def __init__(self):
        # Detection parameters
        self.brightness_threshold = 180  # Lowered for better detection
        self.area_threshold = 50
        
        # State variables
        self.prev_frame = None
        self.prev_gray = None
        self.explosion_count = 0
        self.active_explosions = []  # List to track all active explosions
        
    def detect_explosions(self, frame):
        # Convert to grayscale
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        
        if self.prev_gray is None:
            self.prev_gray = gray
            self.prev_frame = frame.copy()
            return frame, []
        
        # Calculate absolute difference between frames
        frame_diff = cv2.absdiff(gray, self.prev_gray)
        
        # Apply threshold to find significant changes
        _, thresh = cv2.threshold(frame_diff, self.brightness_threshold, 255, cv2.THRESH_BINARY)
        
        # Apply morphological operations to clean up the mask
        kernel = np.ones((5, 5), np.uint8)
        thresh = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, kernel)
        thresh = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, kernel)
        
        # Find contours in the thresholded image
        contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        
        explosion_centers = []
        
        # Process all contours that meet our threshold
        for contour in contours:
            area = cv2.contourArea(contour)
            if area > self.area_threshold:
                # Calculate center of bright area
                M = cv2.moments(contour)
                if M["m00"] != 0:
                    cX = int(M["m10"] / M["m00"])
                    cY = int(M["m01"] / M["m00"])
                    explosion_centers.append((cX, cY))
        
        # Update previous frame
        self.prev_gray = gray
        self.prev_frame = frame.copy()
        
        return frame, explosion_centers
    
    def update_explosion_tracking(self, current_centers):
        # Update existing explosions and add new ones
        updated_explosions = []
        
        # First, update existing explosions
        for explosion in self.active_explosions:
            center, lifetime, arrows = explosion
            
            # Find if this explosion still exists in current frame
            found_match = False
            for curr_center in current_centers:
                distance = np.sqrt((center[0] - curr_center[0])**2 + (center[1] - curr_center[1])**2)
                if distance < 50:  # If close enough, consider it the same explosion
                    updated_explosions.append((curr_center, lifetime + 1, arrows))
                    found_match = True
                    break
            
            # If no match found but explosion is still recent, keep it with same center
            if not found_match and lifetime < 30:
                updated_explosions.append((center, lifetime + 1, arrows))
        
        # Add new explosions
        for center in current_centers:
            # Check if this is a new explosion (not close to any existing one)
            is_new = True
            for explosion in updated_explosions:
                existing_center, _, _ = explosion
                distance = np.sqrt((center[0] - existing_center[0])**2 + (center[1] - existing_center[1])**2)
                if distance < 50:
                    is_new = False
                    break
            
            if is_new:
                updated_explosions.append((center, 1, []))
                self.explosion_count += 1
        
        self.active_explosions = updated_explosions
    
    def calculate_directions(self, frame):
        # Calculate optical flow for direction analysis
        if self.prev_frame is None:
            return frame
        
        # Convert frames to grayscale
        prev_gray = cv2.cvtColor(self.prev_frame, cv2.COLOR_BGR2GRAY)
        curr_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        
        # Calculate dense optical flow using Farneback method
        flow = cv2.calcOpticalFlowFarneback(prev_gray, curr_gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)
        
        # Update arrows for each active explosion
        updated_explosions = []
        for explosion in self.active_explosions:
            center, lifetime, old_arrows = explosion
            
            # Create new arrows based on optical flow
            new_arrows = []
            h, w = flow.shape[:2]
            x, y = center
            
            # Define the region around the explosion to analyze
            y_start = max(0, y - 80)
            y_end = min(h, y + 80)
            x_start = max(0, x - 80)
            x_end = min(w, x + 80)
            
            step = 15  # Sample every 15 pixels
            
            # Create motion vectors in the explosion region
            for i in range(y_start, y_end, step):
                for j in range(x_start, x_end, step):
                    dx, dy = flow[i, j]
                    magnitude = np.sqrt(dx**2 + dy**2)
                    
                    # Only create arrows for significant movements
                    if magnitude > 1.0:
                        # Store arrow information
                        new_arrows.append(((j, i), (int(j + dx*3), int(i + dy*3))))
            
            updated_explosions.append((center, lifetime, new_arrows))
        
        self.active_explosions = updated_explosions
        return frame
    
    def draw_explosion_effects(self, frame):
        # Draw all effects for active explosions
        for explosion in self.active_explosions:
            center, lifetime, arrows = explosion
            
            # Draw explosion center (green dot)
            cv2.circle(frame, center, 6, (0, 255, 0), -1)
            
            # Draw expanding circle (yellow)
            radius = min(100, lifetime * 3)
            cv2.circle(frame, center, radius, (0, 255, 255), 2)
            
            # Draw all arrows (blue)
            for start, end in arrows:
                cv2.arrowedLine(frame, start, end, (255, 0, 0), 2, tipLength=0.3)
            
            # Draw explosion number near center
            explosion_idx = self.active_explosions.index(explosion) + 1
            cv2.putText(frame, f"{explosion_idx}", 
                       (center[0] + 10, center[1] - 10), 
                       cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
        
        return frame

def process_video(input_path, output_path="explosion_analysis_output.mp4"):
    """
    Process a video to detect explosions, highlight them, show direction arrows, and count explosions.
    
    Args:
        input_path (str): Path to the input video file
        output_path (str): Path to save the output video with analysis
    """
    
    # Verify input file exists
    if not os.path.exists(input_path):
        print(f"Error: Input video '{input_path}' not found.")
        return 0
    
    # Initialize explosion analyzer
    analyzer = ExplosionAnalyzer()
    
    # Open video
    cap = cv2.VideoCapture(input_path)
    if not cap.isOpened():
        print("Error: Could not open video.")
        return 0
    
    # Get video properties
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    
    # Define codec and create VideoWriter
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    
    frame_count = 0
    
    print("Processing video for explosion detection...")
    print("Analyzing explosion dynamics and direction of sparks...")
    
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        
        # Create a copy for processing
        processed_frame = frame.copy()
        
        # Detect explosions in current frame
        processed_frame, explosion_centers = analyzer.detect_explosions(processed_frame)
        
        # Update explosion tracking
        analyzer.update_explosion_tracking(explosion_centers)
        
        # Calculate directions for active explosions
        processed_frame = analyzer.calculate_directions(processed_frame)
        
        # Draw all explosion effects (dots, circles, arrows)
        processed_frame = analyzer.draw_explosion_effects(processed_frame)
        
        # Display total explosion count
        cv2.putText(processed_frame, f"Total Explosions: {analyzer.explosion_count}", (20, 40), 
                    cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255, 255, 255), 2)
        
        # Display active explosion count
        cv2.putText(processed_frame, f"Active Explosions: {len(analyzer.active_explosions)}", (20, 80), 
                    cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255, 255, 255), 2)
        
        # Display frame number
        cv2.putText(processed_frame, f"Frame: {frame_count}", (20, height - 20), 
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
        
        # Write to output video
        out.write(processed_frame)
        
        frame_count += 1
        
        # Print progress every 50 frames
        if frame_count % 50 == 0:
            print(f"Processed {frame_count} frames...")
    
    # Release resources
    cap.release()
    out.release()
    
    print(f"Analysis complete. Total explosions detected: {analyzer.explosion_count}")
    print(f"Processed video saved to: {output_path}")
        
    return analyzer.explosion_count

if __name__ == "__main__":
    # Configuration
    INPUT_VIDEO = "Fire-Cracker.mp4"  # Your video file path
    OUTPUT_VIDEO = "Explosion_Analysis_Result-3.mp4"  # Output video file
    
    # Process the video
    explosion_count = process_video(INPUT_VIDEO, OUTPUT_VIDEO)
    
    # Results summary
    print("\n" + "="*50)
    print("EXPLOSION ANALYSIS RESULTS")
    print("="*50)
    print(f"Input video: {INPUT_VIDEO}")
    print(f"Output video: {OUTPUT_VIDEO}")
    print(f"Total explosions detected: {explosion_count}")
    print("="*50)
    
    # Instructions for viewing
    print("\nINSTRUCTIONS:")
    print(f"1. Open '{OUTPUT_VIDEO}' to view the analysis results")
    print("2. Explosions are highlighted with:")
    print("   - Green dot: Epicenter of explosion (persists while fire/sparks exist)")
    print("   - Blue arrows: Direction of sparks/fire (persists while fire/sparks exist)")
    print("   - Yellow circle: Expanding shockwave")
    print("   - Red numbers: Identification number for each explosion")
    print("3. Explosion count is displayed at the top of the video")

Processing video for explosion detection...
Analyzing explosion dynamics and direction of sparks...
Processed 50 frames...
Processed 100 frames...
Processed 150 frames...
Processed 200 frames...
Processed 250 frames...
Processed 300 frames...
Processed 350 frames...
Processed 400 frames...
Processed 450 frames...
Processed 500 frames...
Analysis complete. Total explosions detected: 31
Processed video saved to: Explosion_Analysis_Result-3.mp4

EXPLOSION ANALYSIS RESULTS
Input video: Fire-Cracker.mp4
Output video: Explosion_Analysis_Result-3.mp4
Total explosions detected: 31

INSTRUCTIONS:
1. Open 'Explosion_Analysis_Result-3.mp4' to view the analysis results
2. Explosions are highlighted with:
   - Green dot: Epicenter of explosion (persists while fire/sparks exist)
   - Blue arrows: Direction of sparks/fire (persists while fire/sparks exist)
   - Yellow circle: Expanding shockwave
   - Red numbers: Identification number for each explosion
3. Explosion count is displayed at the top of t

In [None]:
import cv2
import numpy as np
import os

class AccurateExplosionDetector:
    def __init__(self):
        # Detection parameters - tuned for firecracker videos
        self.brightness_threshold = 200
        self.area_threshold = 100
        self.min_explosion_duration = 5  # frames
        self.max_explosion_duration = 30  # frames
        
        # State variables
        self.prev_frame = None
        self.prev_gray = None
        self.explosion_count = 0
        self.active_explosions = []  # List of active explosions with their properties
        
    def detect_explosions(self, frame):
        # Convert to grayscale and apply slight blur to reduce noise
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        gray = cv2.GaussianBlur(gray, (5, 5), 0)
        
        if self.prev_gray is None:
            self.prev_gray = gray
            self.prev_frame = frame.copy()
            return frame, []
        
        # Calculate absolute difference between frames
        frame_diff = cv2.absdiff(gray, self.prev_gray)
        
        # Apply threshold to find significant changes
        _, thresh = cv2.threshold(frame_diff, self.brightness_threshold, 255, cv2.THRESH_BINARY)
        
        # Apply morphological operations to clean up the mask
        kernel = np.ones((7, 7), np.uint8)
        thresh = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, kernel)
        thresh = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, kernel)
        
        # Find contours in the thresholded image
        contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        
        explosion_centers = []
        
        # Process all contours that meet our threshold
        for contour in contours:
            area = cv2.contourArea(contour)
            if area > self.area_threshold:
                # Calculate center of bright area
                M = cv2.moments(contour)
                if M["m00"] != 0:
                    cX = int(M["m10"] / M["m00"])
                    cY = int(M["m01"] / M["m00"])
                    
                    # Additional check: ensure this is actually a bright area in the current frame
                    if gray[cY, cX] > 200:  # Pixel at center should be bright
                        explosion_centers.append((cX, cY))
        
        # Update previous frame
        self.prev_gray = gray
        self.prev_frame = frame.copy()
        
        return frame, explosion_centers
    
    def update_explosion_tracking(self, current_centers):
        # First, update existing explosions
        updated_explosions = []
        
        for explosion in self.active_explosions:
            center, lifetime, max_brightness = explosion
            
            # Check if this explosion still exists in current frame
            found_match = False
            min_distance = float('inf')
            closest_center = None
            
            for curr_center in current_centers:
                distance = np.sqrt((center[0] - curr_center[0])**2 + (center[1] - curr_center[1])**2)
                if distance < min_distance:
                    min_distance = distance
                    closest_center = curr_center
            
            # If close enough, consider it the same explosion
            if min_distance < 50 and closest_center is not None:
                # Update with new center and increment lifetime
                updated_explosions.append((closest_center, lifetime + 1, max_brightness))
                found_match = True
            elif lifetime < self.max_explosion_duration:
                # Keep the explosion active even if not detected in this frame
                updated_explosions.append((center, lifetime + 1, max_brightness))
        
        # Add new explosions
        for center in current_centers:
            # Check if this is a new explosion (not close to any existing one)
            is_new = True
            for explosion in updated_explosions:
                existing_center, _, _ = explosion
                distance = np.sqrt((center[0] - existing_center[0])**2 + (center[1] - existing_center[1])**2)
                if distance < 50:
                    is_new = False
                    break
            
            if is_new:
                # Get brightness at center for new explosion
                gray = cv2.cvtColor(self.prev_frame, cv2.COLOR_BGR2GRAY)
                brightness = gray[center[1], center[0]]
                updated_explosions.append((center, 1, brightness))
                self.explosion_count += 1
                print(f"New explosion detected at frame {len(self.active_explosions)}")
        
        # Remove explosions that have been active too long
        self.active_explosions = [exp for exp in updated_explosions if exp[1] <= self.max_explosion_duration]
    
    def calculate_directions(self, frame):
        if self.prev_frame is None:
            return frame
        
        # Convert frames to grayscale
        prev_gray = cv2.cvtColor(self.prev_frame, cv2.COLOR_BGR2GRAY)
        curr_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        
        # Calculate dense optical flow using Farneback method
        flow = cv2.calcOpticalFlowFarneback(prev_gray, curr_gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)
        
        # For each active explosion, calculate direction vectors
        for i, explosion in enumerate(self.active_explosions):
            center, lifetime, max_brightness = explosion
            x, y = center
            
            # Only show arrows for explosions that are still bright
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            if gray[y, x] > 150 and lifetime > 1:  # Only if center is still bright
                # Create a grid around the explosion center
                h, w = flow.shape[:2]
                y_start = max(0, y - 60)
                y_end = min(h, y + 60)
                x_start = max(0, x - 60)
                x_end = min(w, x + 60)
                
                step = 12  # Sample every 12 pixels
                
                # Draw direction arrows
                for i in range(y_start, y_end, step):
                    for j in range(x_start, x_end, step):
                        dx, dy = flow[i, j]
                        magnitude = np.sqrt(dx**2 + dy**2)
                        
                        # Only draw arrows for significant movement
                        if magnitude > 1.5:
                            # Draw arrow from point to direction of movement
                            end_point = (int(j + dx*3), int(i + dy*3))
                            cv2.arrowedLine(frame, (j, i), end_point, (255, 0, 0), 2, tipLength=0.3)
        
        return frame
    
    def draw_explosion_effects(self, frame):
        # Draw all effects for active explosions
        for i, explosion in enumerate(self.active_explosions):
            center, lifetime, max_brightness = explosion
            x, y = center
            
            # Only draw if explosion is still active and bright
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            current_brightness = gray[y, x]
            
            if lifetime >= self.min_explosion_duration and current_brightness > 100:
                # Draw explosion center (green dot)
                cv2.circle(frame, center, 6, (0, 255, 0), -1)
                
                # Draw expanding circle (yellow)
                radius = min(80, lifetime * 3)
                cv2.circle(frame, center, radius, (0, 255, 255), 2)
                
                # Draw explosion number near center
                cv2.putText(frame, f"{i+1}", (x + 10, y - 10), 
                           cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
        
        return frame

def process_video(input_path, output_path="explosion_analysis_output.mp4"):
    """
    Process a video to detect explosions, highlight them, show direction arrows, and count explosions.
    
    Args:
        input_path (str): Path to the input video file
        output_path (str): Path to save the output video with analysis
    """
    
    # Verify input file exists
    if not os.path.exists(input_path):
        print(f"Error: Input video '{input_path}' not found.")
        return 0
    
    # Initialize explosion analyzer
    analyzer = AccurateExplosionDetector()
    
    # Open video
    cap = cv2.VideoCapture(input_path)
    if not cap.isOpened():
        print("Error: Could not open video.")
        return 0
    
    # Get video properties
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    
    # Define codec and create VideoWriter
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    
    frame_count = 0
    
    print("Processing video for explosion detection...")
    print("Analyzing explosion dynamics and direction of sparks...")
    
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        
        # Create a copy for processing
        processed_frame = frame.copy()
        
        # Detect explosions in current frame
        processed_frame, explosion_centers = analyzer.detect_explosions(processed_frame)
        
        # Update explosion tracking
        analyzer.update_explosion_tracking(explosion_centers)
        
        # Calculate directions for active explosions
        processed_frame = analyzer.calculate_directions(processed_frame)
        
        # Draw all explosion effects (dots, circles, arrows)
        processed_frame = analyzer.draw_explosion_effects(processed_frame)
        
        # Display total explosion count
        cv2.putText(processed_frame, f"Total Explosions: {analyzer.explosion_count}", (20, 40), 
                    cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255, 255, 255), 2)
        
        # Display active explosion count
        active_count = len([exp for exp in analyzer.active_explosions if exp[1] >= analyzer.min_explosion_duration])
        cv2.putText(processed_frame, f"Active Explosions: {active_count}", (20, 80), 
                    cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255, 255, 255), 2)
        
        # Display frame number
        cv2.putText(processed_frame, f"Frame: {frame_count}", (20, height - 20), 
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
        
        # Write to output video
        out.write(processed_frame)
        
        frame_count += 1
        
        # Print progress every 50 frames
        if frame_count % 50 == 0:
            print(f"Processed {frame_count} frames...")
    
    # Release resources
    cap.release()
    out.release()
    
    print(f"Analysis complete. Total explosions detected: {analyzer.explosion_count}")
    print(f"Processed video saved to: {output_path}")
        
    return analyzer.explosion_count

if __name__ == "__main__":
    # Configuration
    INPUT_VIDEO = "Fire-Cracker.mp4"  # Your video file path
    OUTPUT_VIDEO = "Accurate_Explosion_Analysis_Result.mp4"  # Output video file
    
    # Process the video
    explosion_count = process_video(INPUT_VIDEO, OUTPUT_VIDEO)
    
    # Results summary
    print("\n" + "="*50)
    print("EXPLOSION ANALYSIS RESULTS")
    print("="*50)
    print(f"Input video: {INPUT_VIDEO}")
    print(f"Output video: {OUTPUT_VIDEO}")
    print(f"Total explosions detected: {explosion_count}")
    print("="*50)
    
    # Instructions for viewing
    print("\nINSTRUCTIONS:")
    print(f"1. Open '{OUTPUT_VIDEO}' to view the analysis results")
    print("2. Explosions are highlighted with:")
    print("   - Green dot: Epicenter of explosion")
    print("   - Blue arrows: Direction of sparks/fire")
    print("   - Yellow circle: Expanding shockwave")
    print("   - Red numbers: Identification number for each explosion")
    print("3. Explosion count is displayed at the top of the video")