### 3 Steps
- First step is to camera is stable or not 
- second step is to detect ROI by connected component analysis
- Block based approach is used 1 and 2 step
- Using K-temporal info decide it's smoke or not
br

In [2]:
import cv2
import numpy as np

# Open video file
cap = cv2.VideoCapture('../demoVideos/test (1).mp4')

while True:
    ret, frame = cap.read()
    frame = cv2.resize(frame, (960,540))
    
    if not ret:
        break
        
    cv2.imshow('Frames', frame)
    
    if cv2.waitKey(25) & 0xFF == ord('q'):
        break   

# Release video capture object and clse all windows
cap.release()
cv2.destroyAllWindows()

## Research Paper Implementation

In [26]:
# WX and WY (Sub-block size):
# These parameters define the size of the sub-blocks used for analysis.
# Larger values (e.g., 32x32) will make the algorithm 
# less sensitive to small movements efficient for computation

def camera_motion_detection(BG, BI, WX=20, WY=20, Th=0.5):
    # Calculate BD(t, x, y) as per equation (1)
    # Absolute block mean difference
    BD = np.abs(BG - BI)
    
    # Calculate sub-block average AD(i, j) as per equation (2)
    M, N = BD.shape
    i_blocks = M // WY
    j_blocks = N // WX
    
    AD = np.zeros((i_blocks, j_blocks))
    
    for i in range(i_blocks):
        for j in range(j_blocks):
            y_start, y_end = i*WY, (i+1)*WY
            x_start, x_end = j*WX, (j+1)*WX
            AD[i, j] = np.mean(BD[y_start:y_end, x_start:x_end])
    
    # Count sub-blocks with AD(i, j) > Th
    blocks_above_threshold = np.sum(AD > Th)
    total_blocks = i_blocks * j_blocks
    
    # Check if more than half of sub-blocks are above threshold
    if blocks_above_threshold > 0.9*total_blocks:
        return BI.copy(), BD, True
    else:
        # No significant camera motion, segment blobs
        return BG, BD, False
    

In [27]:
def blob_segmentation(BD, Prev_BB, Th=1):
    # Create binary image BB(t,x,y) as per equation (3)
    BB = (BD >= Th).astype(int)
    
    # Apply bitwise AND with previous frame's BB if available (equation 4)
    if prev_BB is not None:
        BB = np.logical_and(BB, prev_BB).astype(int)

#     # Apply morphological operations to remove small blobs
#     dilation_kernel = np.ones((2,2), dtype=bool)
#     erosion_kernel = np.ones((1,1), dtype=bool)
    
    BB = binary_dilation(BB)
    BB = binary_erosion(BB)
    
    # Apply connected component algorithm
    labeled_array, num_features = label(BB)
    
    return BB, labeled_array, num_features

In [29]:
import cv2
import numpy as np
from scipy.ndimage import label, binary_erosion, binary_dilation

# Open video file
BLOCK_SIZE = 4
prev_BB = None
cap = cv2.VideoCapture('../demoVideos/smoke.mp4')

# Initial frame processing
ret, frame = cap.read()
frame = cv2.resize(frame, (960,540))
yuv_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2YUV)
y_channel = yuv_frame[:, :, 0].astype(np.float32)
BG = cv2.boxFilter(y_channel, ddepth=-1, ksize=(BLOCK_SIZE, BLOCK_SIZE))

while True:
    ret, frame = cap.read()
    
    if not ret:
        break
        
    # Sequential Frame processing
    frame = cv2.resize(frame, (960,540))
    yuv_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2YUV)
    y_channel = yuv_frame[:, :, 0].astype(np.float32)
    BI = cv2.boxFilter(y_channel, ddepth=-1, ksize=(BLOCK_SIZE, BLOCK_SIZE))

    new_BG, BD, is_moving = camera_motion_detection(BG, BI)
    BG = new_BG
    
    cv2.imshow('Difference', BD)

    if not is_moving:
        BB, BLOBS, N = blob_segmentation(BD, Prev_BB = prev_BB)
        blobs_visual = (BLOBS.astype(np.float32) / BLOBS.max()) * 255
        cv2.imshow('BB Visualization', blobs_visual)
#         print('NO of BLOBs: ',N)
        prev_BB = BB
    else:
        prev_BB = None
    
    cv2.putText(frame, "motion", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0) if is_moving else (0, 0, 255), 2)
    
    cv2.imshow('Frames', frame)
    
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break   

# Release video capture object and clse all windows
cap.release()
cv2.destroyAllWindows()

In [4]:
cv2.destroyAllWindows()

### Smoke-detection approach using spatial and temporal analyses, which is based on the block-processing technique. This method analyzes energy-based and color-based features within the spatial, temporal, and spatial-temporal domains, before all the proposed features are combined using an SVM classifier

## High Magnitude Optical Flow Extraction

In [1]:
import cv2
import numpy as np

def detect_smoke_farneback(video_path):
    cap = cv2.VideoCapture(video_path)
    ret, first_frame = cap.read()
    first_frame = cv2.resize(first_frame, (960,540))
    prev_gray = cv2.cvtColor(first_frame, cv2.COLOR_BGR2GRAY)
    
    while True:
        ret, frame = cap.read()
        frame = cv2.resize(frame, (960,540))
        if not ret:
            break
        
        # Apply Gaussian blur to reduce noise
        frame_blur = cv2.GaussianBlur(frame, (5, 5), 0)
        gray = cv2.cvtColor(frame_blur, cv2.COLOR_BGR2GRAY)
        
        # Calculate dense optical flow using Farneback method
        flow = cv2.calcOpticalFlowFarneback(prev_gray, gray, None,
                                            pyr_scale=0.5, levels=3, winsize=15, 
                                            iterations=3, poly_n=5, poly_sigma=1.1, flags=0)
        
        # Compute magnitude and angle of 2D vector
        magnitude, angle = cv2.cartToPolar(flow[..., 0], flow[..., 1])
        
        # Normalize magnitude for visualization
        magnitude_norm = cv2.normalize(magnitude, None, 0, 255, cv2.NORM_MINMAX)
        
        # Create a fresh mask for each frame
        mask = np.zeros_like(frame)
        mask[..., 1] = 255  # Set green channel to 255
        
        # Threshold for high magnitude
        high_magnitude_mask = magnitude > 0.1
        mask[..., 0][high_magnitude_mask] = angle[high_magnitude_mask] * 180 / np.pi / 2
        mask[..., 2][high_magnitude_mask] = magnitude_norm[high_magnitude_mask]
        
        # Convert HSV to RGB (BGR) color representation
        rgb = cv2.cvtColor(mask, cv2.COLOR_HSV2BGR)
        result = cv2.addWeighted(frame, 1, rgb, 2, 0)
     
        cv2.imshow('Smoke Detection Farneback', result)
        
        k = cv2.waitKey(30) & 0xff
        if k == 27:  # Press 'Esc' to exit
            break
        
        # Update previous frame
        prev_gray = gray
    
    cv2.destroyAllWindows()
    cap.release()

# Usage
video_path = '../demoVideos/smoke.mp4'
detect_smoke_farneback(video_path)


In [259]:
cv2.destroyAllWindows()
cap.release()

### MOG Background Subtraction & Optical Flow

In [38]:
import cv2
import numpy as np

avgx = []
avgy = []

def detect_smoke_farneback_mog2(video_path):
    start_frame = 2000  
    
    cap = cv2.VideoCapture(video_path)
#     cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)
    
    fgbg = cv2.createBackgroundSubtractorMOG2(detectShadows=True)
    ret, first_frame = cap.read()
    if not ret:
        print("Failed to read video")
        return
    
    first_frame = cv2.resize(first_frame, (960, 540))
    prev_gray = cv2.cvtColor(first_frame, cv2.COLOR_BGR2GRAY)
    
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        
        frame = cv2.resize(frame, (960, 540))
        
        # Background subtraction
        fgmask = fgbg.apply(frame)
        
        # Apply morphological operations to reduce noise
        kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
        fgmask = cv2.morphologyEx(fgmask, cv2.MORPH_CLOSE, kernel)
        fgmask = cv2.morphologyEx(fgmask, cv2.MORPH_OPEN, kernel)
        
        # Calculate Farneback optical flow
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        flow = cv2.calcOpticalFlowFarneback(prev_gray, gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)
        
        # Get average flow direction
        avgx.append(np.mean(flow[fgmask > 0, 0]))
        avgy.append(np.mean(flow[fgmask > 0, 1]))

        if len(avgx) > 30: 
            avg_fx = np.mean(avgx)
            avg_fy = np.mean(avgy)
            avg_direction_angle = np.arctan2(avg_fy, avg_fx)
            avg_direction_degrees = np.degrees(avg_direction_angle)

            # Set text on black_frame
            font = cv2.FONT_HERSHEY_SIMPLEX
            cv2.putText(frame, f"Direction wrt X+: {avg_direction_degrees:.2f} degrees", (10, 30), font, 1, (0, 255, 0), 1, cv2.LINE_AA)

            avgx.pop(0)
            avgy.pop(0)
        
        
        step = 15
        for y in range(0, frame.shape[0], step):
            for x in range(0, frame.shape[1], step):
                fx, fy = flow[y, x]
                if fgmask[y, x] > 0:  # Only draw arrows on detected regions
                    cv2.arrowedLine(frame, (x, y), (int(x + fx), int(y + fy)), (0, 255, 0), 2, tipLength=4)
        
        # Display the result
        cv2.imshow('Smoke Detection (Farneback + MOG2)', frame)
        cv2.imshow('Foreground Mask', fgmask)
        
        if cv2.waitKey(30) & 0xFF == ord('q'):
            break
        
        # Update previous frame
        prev_gray = gray
    
    cap.release()
    cv2.destroyAllWindows()

video_path = '../demoVideos/smoke.mp4'
detect_smoke_farneback_mog2(video_path)

  return _methods._mean(a, axis=axis, dtype=dtype,
  ret = ret.dtype.type(ret / rcount)


In [21]:
cv2.destroyAllWindows()

## K-Means Clustering & Thresholding + Optical Flow

In [39]:
import cv2
import numpy as np
from sklearn.cluster import KMeans
from scipy.ndimage import gaussian_filter

def kmeans_clustering_lab(image, n_clusters=3):
    lab_image = cv2.cvtColor(image, cv2.COLOR_BGR2Lab)
    reshaped_image = lab_image.reshape((-1, 3))
    kmeans = KMeans(n_clusters=n_clusters, random_state=42).fit(reshaped_image)
    clustered_image = kmeans.labels_.reshape(image.shape[:2])
    return clustered_image

def iterative_threshold_segmentation(image):
    hsv_image = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
    lower_bound = np.array([0, 50, 0], dtype=np.uint8)
    upper_bound = np.array([180, 200, 255], dtype=np.uint8)
    
    thresholded_image = cv2.inRange(hsv_image, lower_bound, upper_bound)
    return thresholded_image

def merge_segmentations(kmeans_result, threshold_result):
    merged_result = np.logical_and(kmeans_result, threshold_result)
    return merged_result.astype(np.uint8) * 255

def shen_filter(image):
    # Apply Gaussian filter as an approximation for Shen filter
    return gaussian_filter(image, sigma=1)

def remove_noise(image):
    # Use connected components to remove small regions
    num_labels, labels_im = cv2.connectedComponents(image)
    sizes = np.bincount(labels_im.ravel())
    mask_sizes = sizes > 1000  # Filter out small regions
    mask_sizes[0] = 0  # Background is set to False
    cleaned_image = mask_sizes[labels_im]
    return cleaned_image.astype(np.uint8) * 255

In [44]:
def process_video(video_path):
    cap = cv2.VideoCapture(video_path)
    ret, first_frame = cap.read()
    first_frame = cv2.resize(first_frame, (640, 480))
    prev_gray = cv2.cvtColor(first_frame, cv2.COLOR_BGR2GRAY)
    
    while cap.isOpened():
        ret, frame = cap.read()
        frame = cv2.resize(frame, (640, 480))
        if not ret:
            break

        # Step 1: K-means Clustering in Lab Color Space
        kmeans_result = kmeans_clustering_lab(frame, n_clusters=3)

        # Step 2: Iterative Threshold Segmentation in HSV Color Space
        threshold_result = iterative_threshold_segmentation(frame)

        # Step 3: Merge the Results
        merged_result = merge_segmentations(kmeans_result, threshold_result)

        # Step 4: Denoising
        filtered_result = shen_filter(merged_result)
        final_result = remove_noise(filtered_result)
        invert_mask = cv2.bitwise_not(final_result)
        
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        flow = cv2.calcOpticalFlowFarneback(prev_gray, gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)
        
        # Get average flow direction
        avgx.append(np.mean(flow[invert_mask > 0, 0]))
        avgy.append(np.mean(flow[invert_mask > 0, 1]))

        if len(avgx) > 30: 
            avg_fx = np.mean(avgx)
            avg_fy = np.mean(avgy)
            avg_direction_angle = np.arctan2(avg_fy, avg_fx)
            avg_direction_degrees = np.degrees(avg_direction_angle)

            # Set text on black_frame
            font = cv2.FONT_HERSHEY_SIMPLEX
            cv2.putText(frame, f"Direction wrt X+: {avg_direction_degrees:.2f} degrees", (10, 30), font, 1, (0, 255, 0), 1, cv2.LINE_AA)

            avgx.pop(0)
            avgy.pop(0)
        
        
        step = 15
        for y in range(0, frame.shape[0], step):
            for x in range(0, frame.shape[1], step):
                fx, fy = flow[y, x]
                if invert_mask[y, x] > 0:  # Only draw arrows on detected regions
                    cv2.arrowedLine(frame, (x, y), (int(x + fx), int(y + fy)), (0, 255, 0), 2, tipLength=4)
        
        # Display the result
        cv2.imshow('Smoke Detection (Farneback + Thresholding + KMeans)', frame)
        cv2.imshow('Foreground Mask', invert_mask)
        
        if cv2.waitKey(30) & 0xFF == ord('q'):
            break
        
        # Update previous frame
        prev_gray = gray

    cap.release()
    cv2.destroyAllWindows()

# Run the smoke segmentation on a sample video
video_path = '../demoVideos/smoke.mp4'
process_video(video_path)


In [23]:
cv2.destroyAllWindows()