In [36]:
import csv
import os
import cv2
import numpy as np
from PIL import Image
from tensorflow.keras.preprocessing.image import img_to_array, load_img
from tensorflow.keras.models import load_model

In [38]:
# code to stitch up any video from data
def stitch_frames(sample_folder):
    frames = []
    for img_path in os.listdir( sample_folder):
        frame_path = os.path.join(sample_folder, img_path)
        frame = cv2.imread(frame_path)
        frames.append(frame)
    return frames

data_dir="/kaggle/input/ps2-testset/PS2 Test"

video_folder = 'video61'
output_video_path = "output2.mp4"

# Sort sample folders
sample_folders = sorted(os.listdir(os.path.join(data_dir,video_folder)))
img=os.listdir(os.path.join(data_dir,video_folder, sample_folders[0]))
# Initialize video writer
frame_height, frame_width, _ = cv2.imread(os.path.join(data_dir,video_folder, sample_folders[0], img[0])).shape
fourcc = cv2.VideoWriter_fourcc(*'mp4v')

out = cv2.VideoWriter(output_video_path, fourcc, 10, (frame_width, frame_height))

# Stitch frames and write to video
for sample in sample_folders:
    sample_folder = os.path.join(data_dir,video_folder, sample)
    
    frames = stitch_frames(sample_folder)
    for frame in frames:
        out.write(frame)

# Release video writer
out.release()



In [22]:
# preprocessing for the input images
def detect_black_border(image):
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    thresh = cv2.threshold(gray, 10, 255, cv2.THRESH_BINARY)[1]
    contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    largest_contour = sorted(contours, key=cv2.contourArea, reverse=True)[0]
    x, y, w, h = cv2.boundingRect(largest_contour)
    shrink_factor = 0.05
    x += int(w * shrink_factor)
    y += int(h * shrink_factor)
    w = int(w * (1 - 2 * shrink_factor))
    h = int(h * (1 - 2 * shrink_factor))
    return x, y, w, h

def img_preprocess(img, x, y, w, h, target_size=(224, 224)):
    image = img
    if x >= 0 and y >= 0 and w > 0 and h > 0:
        bbox = (x, y, x + w, y + h)
        cropped_image = Image.fromarray(image).crop(bbox)
        resized_image = cropped_image.resize(target_size, Image.BILINEAR)
    else:
        resized_image = cv2.resize(image, target_size, interpolation=cv2.INTER_LINEAR)
    return img_to_array(resized_image)

In [23]:


# change this variable to the path of the saved model uploaded in the models folder as pixelcnn.h5
model=load_model("/kaggle/input/final-model/pixelcnn.h5")

In [42]:
#code for detecting smoke from realtime mp4 videos. frames are taken 10 at a time
# detect smoke function to detect smoke using HSV thresholding techniques
def get_hsv(image):
    blur = cv2.GaussianBlur(image, (21, 21), 0)
    hsv = cv2.cvtColor(blur, cv2.COLOR_BGR2HSV)
    lower = np.array([0, 0, 50], dtype="uint8")
    upper = np.array([179, 50, 255], dtype="uint8")
    mask = cv2.inRange(hsv, lower, upper)
    no_smoke_pixels = cv2.countNonZero(mask)

    # Threshold of 12600 pixels has been set
    return no_smoke_pixels > 12600

def predict_smoke(frames):
    res=[]
    
    for frame in frames:
        res.append(get_hsv(frame))

            
    return 1 if round(res.count(True)/10)==0 else 0
    


def label_frame(frame, label):
    font = cv2.FONT_HERSHEY_SIMPLEX
    bottomLeftCornerOfText = (int(frame.shape[1]/2 - 100), frame.shape[0] - 20)
    fontScale = 1
    fontColor = (0, 0, 255) if label == 1 else (0, 255, 0)
    lineType = 2

    cv2.putText(frame, label, 
                bottomLeftCornerOfText, 
                font, 
                fontScale,
                fontColor,
                lineType)

def read_first_frame(video_path):
    cap = cv2.VideoCapture(video_path)
    
    # Check if the video opened successfully
    if not cap.isOpened():
        print("Error: Unable to open video.")
        return None
    
    # Read the first frame
    ret, frame = cap.read()
    
    # Release the video capture object
    cap.release()
    
    # Check if the frame was read successfully
    if not ret:
        print("Error: Unable to read first frame.")
        return None
    
    return frame


video_path = "/kaggle/working/output2.mp4"
output_video_path = "labeled_video4.mp4"
frames_per_set = 10

first_frame = read_first_frame(video_path)

if first_frame is not None:
    
    x1,y1,w,h=detect_black_border(first_frame)



cap = cv2.VideoCapture(video_path)
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = int(cap.get(cv2.CAP_PROP_FPS))

fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_video_path, fourcc, fps, (frame_width, frame_height))


frames_buffer = []
temp=[]
while True:
    
    ret, frame = cap.read()
    if not ret:
        break

    frames_buffer.append(frame)
    img=img_preprocess(frame,x1,y1,w,h)
    temp.append(img)

    if len(frames_buffer) == frames_per_set:
        prediction = predict_smoke(temp)
        label = "Smoke detected" if prediction == 1 else "No Smoke"
        for frame in frames_buffer:
            label_frame(frame, label)
            out.write(frame)
        frames_buffer = []
        temp=[]

cap.release()
out.release()



KeyboardInterrupt: 