In [1]:
import torch

#check if CUDA is available
if torch.cuda.is_available():
  device = torch.device("cuda")
  print("Running on the GPU")
else:
  device = torch.device("cpu")
  print("Running on the CPU")

Running on the GPU


In [1]:
import cv2
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras import optimizers
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Dropout, GlobalAveragePooling2D, concatenate, Activation
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import to_categorical
from sklearn.metrics import classification_report
from tensorflow.keras.preprocessing.image import ImageDataGenerator

In [4]:
# Load the dataset
train_path = 'Train'
test_path = 'Test'
class_names = ['Fire', 'NonFire']
train_data = []
train_labels = []
test_data = []
test_labels = []

In [5]:
# Load the training data
for class_name in class_names:
    class_path = os.path.join(train_path, class_name)
    for video_name in os.listdir(class_path):
        video_path = os.path.join(class_path, video_name)
        cap = cv2.VideoCapture(video_path)
        while(cap.isOpened()):
            ret, frame = cap.read()
            if ret == False:
                break
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            resized = cv2.resize(gray, (112, 112), interpolation = cv2.INTER_AREA)
            train_data.append(resized.reshape((112, 112, 1)))
            train_labels.append(class_names.index(class_name))

In [6]:
#Load the testing data
for class_name in class_names:
  class_path = os.path.join(test_path, class_name)
  for video_name in os.listdir(class_path):
    video_path = os.path.join(class_path, video_name)
    cap = cv2.VideoCapture(video_path)
    while(cap.isOpened()):
      ret, frame = cap.read()
      if ret == False:
        break
      gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
      resized = cv2.resize(gray, (112, 112), interpolation = cv2.INTER_AREA)
      test_data.append(resized.reshape((112, 112, 1)))
      test_labels.append(class_names.index(class_name))

In [7]:
#Convert the data into numpy arrays
train_data = np.array(train_data)
train_labels = np.array(train_labels)
test_data = np.array(test_data)
test_labels = np.array(test_labels)

In [8]:
#One-hot encode the labels
train_labels = to_categorical(train_labels)
test_labels = to_categorical(test_labels)

In [2]:
# Define the SA-FDS model
def sa_fds(input_shape, num_classes):
    input_img = Input(shape=input_shape)

    x = Conv2D(32, (3, 3), padding='same', activation='relu')(input_img)
    x = MaxPooling2D(pool_size=(2, 2))(x)

    # Fire module 1
    squeeze1 = Conv2D(8, (1, 1), padding='same', activation='relu')(x)
    expand1 = concatenate([Conv2D(32, (1, 1), padding='same', activation='relu')(squeeze1),
                           Conv2D(32, (3, 3), padding='same', activation='relu')(squeeze1)])
    x = Dropout(0.2)(expand1)
    x = MaxPooling2D(pool_size=(2, 2))(x)

    # Fire module 2
    squeeze2 = Conv2D(8, (1, 1), padding='same', activation='relu')(x)
    expand2 = concatenate([Conv2D(32, (1, 1), padding='same', activation='relu')(squeeze2),
                           Conv2D(32, (3, 3), padding='same', activation='relu')(squeeze2)])
    x = Dropout(0.2)(expand2)
    x = MaxPooling2D(pool_size=(2, 2))(x)

    # Fire module 3
    squeeze3 = Conv2D(8, (1, 1), padding='same', activation='relu')(x)
    expand3 = concatenate([Conv2D(32, (1, 1), padding='same', activation='relu')(squeeze3),
                           Conv2D(32, (3, 3), padding='same', activation='relu')(squeeze3)])
    x = Dropout(0.5)(expand3)

    # Output layer
    x = Conv2D(num_classes, (1, 1), padding='valid')(x)
    x = GlobalAveragePooling2D()(x)
    output = Activation('softmax')(x)

    model = Model(inputs=input_img, outputs=output)
    return model

In [10]:
# Define the input image size
input_shape = (112, 112, 1)
num_classes = 2

In [19]:
# Define the batch size and number of epochs
batch_size = 32
epochs = 100

In [20]:
# Define the optimizer and loss function
optimizer = optimizers.Adam(learning_rate=0.001, beta_1=0.9, beta_2=0.99, decay=0.001)
loss = 'binary_crossentropy'

In [21]:
# Define the SA-FDS model
model = sa_fds(input_shape, num_classes)

In [22]:
#Compile the model
model.compile(optimizer=optimizer, loss=loss, metrics=['accuracy'])

In [23]:
# Define the data augmentation parameters
train_datagen = ImageDataGenerator(
    rescale=1./255,
    rotation_range=20,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.2,
    brightness_range=[0.5, 1.5],
    zoom_range=[0.8, 1.2],
    horizontal_flip=True,
    fill_mode='nearest'
)

test_datagen = ImageDataGenerator(rescale=1./255)

In [24]:
# Create the train and test generators
train_generator = train_datagen.flow(np.array(train_data), np.array(train_labels), batch_size=batch_size)
test_generator = test_datagen.flow(np.array(test_data), np.array(test_labels), batch_size=batch_size)

In [25]:
# Train the model
model.fit(train_generator, steps_per_epoch=len(train_generator), epochs=epochs, validation_data=test_generator, validation_steps=len(test_generator))

Epoch 1/100


2023-04-01 22:22:07.195471: I tensorflow/stream_executor/cuda/cuda_dnn.cc:384] Loaded cuDNN version 8600
2023-04-01 22:22:07.639709: I tensorflow/stream_executor/cuda/cuda_blas.cc:1614] TensorFloat-32 will be used for the matrix multiplication. This will only be logged once.


Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
Epoch 24/100
Epoch 25/100
Epoch 26/100
Epoch 27/100
Epoch 28/100
Epoch 29/100
Epoch 30/100
Epoch 31/100
Epoch 32/100
Epoch 33/100
Epoch 34/100
Epoch 35/100
Epoch 36/100
Epoch 37/100
Epoch 38/100
Epoch 39/100
Epoch 40/100
Epoch 41/100
Epoch 42/100
Epoch 43/100
Epoch 44/100
Epoch 45/100
Epoch 46/100
Epoch 47/100
Epoch 48/100
Epoch 49/100
Epoch 50/100
Epoch 51/100
Epoch 52/100
Epoch 53/100
Epoch 54/100
Epoch 55/100
Epoch 56/100
Epoch 57/100
Epoch 58/100
Epoch 59/100
Epoch 60/100
Epoch 61/100
Epoch 62/100
Epoch 63/100
Epoch 64/100
Epoch 65/100
Epoch 66/100
Epoch 67/100
Epoch 68/100
Epoch 69/100
Epoch 70/100
Epoch 71/100
Epoch 72/100
Epoch 73/100
Epoch 74/100
Epoch 75/100
Epoch 76/100
Epoch 77/100
Epoch 78/100
Epoch 7

<keras.callbacks.History at 0x7f3b8970ea00>

In [26]:
# Test the model
test_loss, test_acc = model.evaluate(test_generator, steps=len(test_generator))
print('Accuracy : ', test_acc)

Accuracy :  0.9826533794403076


In [27]:
#save the model
model.save('SA-FDS.h5')

In [28]:
#load the model
from tensorflow.keras.models import load_model
loaded_model = load_model('SA-FDS.h5')

**Test on New Videos**

In [1]:
import tkinter as tk
from tkinter import filedialog
from PIL import ImageTk, Image
from tensorflow.keras.models import load_model
import cv2
import os
import numpy as np

# Load the trained model and define other necessary variables
model = load_model('SA-FDS.h5')
class_names = ['Fire', 'Non Fire']
confidence_threshold = 0.8

# Define the thresholds for temporal detection
T1 = 0.7
T2 = 60

# Define variables for temporal detection
F_ref = None

# Create the Tkinter application window
window = tk.Tk()
window.title("FireWatch")
window.geometry("800x600")

# Define custom styles
title_style = {
    "font": ("Arial", 24, "bold"),
    "fg": "black",
}

button_style = {
    "font": ("Arial", 14),
    "padx": 20,
    "pady": 10,
    "bg": "lightblue"
}

output_style = {
    "font": ("Arial", 16),
    "pady": 20
}

video_frame_style = {
    "pady": 10
}

# Create the title label
title_label = tk.Label(window, text="FireWatch", **title_style)
title_label.pack()

# Create the video frame
video_frame = tk.Frame(window, **video_frame_style)
video_frame.pack(pady=10)

# Create the output label
output_label = tk.Label(window, text="", **output_style)
output_label.pack()

# Create the video player function
def play_video():
    video_path = filedialog.askopenfilename(filetypes=[("Video files", "*.mp4")])
    if video_path:
        process_video(video_path)

# Process the video
def process_video(video_path):
    global F_ref

    cap = cv2.VideoCapture(video_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    video_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    video_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    fourcc = cv2.VideoWriter_fourcc(*'hvc1')
    output_path = "output_video.mp4"
    out = cv2.VideoWriter(output_path, fourcc, fps, (video_width, video_height))

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # Perform frame differencing
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        resized = cv2.resize(gray, (112, 112), interpolation=cv2.INTER_AREA)
        x = np.expand_dims(resized, axis=0) / 255.0
        print(x.shape)

        # Pass current frame through the SA-FDS model for prediction
        prediction = model.predict(x)
        cnn_label = class_names[np.argmax(prediction)]
        confidence_score = np.max(prediction)

        # Temporal detection algorithm
        if F_ref is None:
            prediction = cnn_label
        else:
            if confidence_score > T1:
                prediction = cnn_label
            else:
                mse = np.mean((resized.astype("float") - F_ref.astype("float")) ** 2)
                distance = np.sqrt(mse)
                if distance > T2:
                    prediction = 'Fire'
                else:
                    prediction = 'Non Fire'

        # Update F_ref with current frame
        F_ref = resized

        text = f'{prediction} ({confidence_score:.2f})'
        cv2.putText(frame, text, (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)
        out.write(frame)

        # Display the processed frame in the Tkinter window
        image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        image = image.resize((video_width, video_height), Image.ANTIALIAS)
        img_tk = ImageTk.PhotoImage(image)
        video_label.configure(image=img_tk)
        video_label.image = img_tk

        # Update the output label
        output_label.configure(text=text)

        window.update()

    cap.release()
    out.release()

# Create the browse button
browse_button = tk.Button(window, text="Browse Video", command=play_video, **button_style)
browse_button.pack()

# Create the video label
video_label = tk.Label(video_frame)
video_label.pack()

# Run the Tkinter event loop
window.mainloop()

(1, 112, 112)


  image = image.resize((video_width, video_height), Image.ANTIALIAS)


(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 11

(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)
(1, 112, 112)


**Test on Live Stream from Modified CCTV Camera**

In [None]:
import cv2
import os
import tensorflow as tf
import tkinter as tk
from PIL import Image, ImageTk
import threading
import numpy as np
from tensorflow.keras.models import load_model

# Load the trained model and define other necessary variables
model = load_model('SA-FDS_Retrain.h5')
class_names = ['Fire', 'Non Fire']
confidence_threshold = 0.8
T1 = 0.7
T2 = 60
F_ref = None

# Create the Tkinter application window
window = tk.Tk()
window.title("FireWatch")
window.geometry("800x600")

# Create custom styles (you can define these as needed)
title_style = {
    "font": ("Arial", 24, "bold"),
    "fg": "black",
}

button_style = {
    "font": ("Arial", 14),
    "padx": 20,
    "pady": 10,
    "bg": "lightblue"
}

video_frame_style = {
    "pady": 10
}

# Create the title label
title_label = tk.Label(window, text="FireWatch", **title_style)
title_label.pack()

# Create a label to display the video feed
video_label = tk.Label(window)
video_label.pack()

# Create a function to display the video stream
def display_video(rtsp_url):
    global F_ref  # Declare F_ref as global

    cap = cv2.VideoCapture(rtsp_url)
    
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        resized = cv2.resize(gray, (112, 112), interpolation=cv2.INTER_AREA)
        x = np.expand_dims(resized, axis=0) / 255.0

        prediction = model.predict(x)
        cnn_label = class_names[np.argmax(prediction)]
        confidence_score = np.max(prediction)

        if F_ref is None:
            prediction = cnn_label
        else:
            if confidence_score > T1:
                prediction = cnn_label
            else:
                mse = np.mean((resized.astype("float") - F_ref.astype("float")) ** 2)
                distance = np.sqrt(mse)
                if distance > T2:
                    prediction = 'Fire'
                else:
                    prediction = 'Non Fire'

        F_ref = resized

        text = f'{prediction} ({confidence_score:.2f})'
        cv2.putText(frame, text, (140, 220), cv2.FONT_HERSHEY_SIMPLEX, 2, (0, 0, 255), 2, cv2.LINE_AA)

        image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        image = image.resize((640, 480), Image.ANTIALIAS)
        img_tk = ImageTk.PhotoImage(image)
        video_label.configure(image=img_tk)
        video_label.image = img_tk

        window.update()

    cap.release()

# Function to start the live stream
def start_stream():
    rtsp_url = entry_rtsp.get()
    threading.Thread(target=display_video, args=(rtsp_url,), daemon=True).start()

# Create an entry field for entering the RTSP URL
entry_rtsp = tk.Entry(window, font=("Arial", 14))
entry_rtsp.pack()

# Create a start button
start_button = tk.Button(window, text="Start Live Stream", command=start_stream, **button_style)
start_button.pack()

# Run the Tkinter event loop
window.mainloop()



  image = image.resize((640, 480), Image.ANTIALIAS)


