In [1]:
from google.colab import drive

drive.mount('/content/gdrive', force_remount=True)

Mounted at /content/gdrive


In [2]:
!unzip -q '/content/gdrive/MyDrive/nnfl-2021-assignment-2.zip' 

In [3]:
import tensorflow as tf
import tensorflow.keras as keras
from tensorflow.keras.models import Sequential
import os
import pandas as pd
from tqdm.auto import tqdm
from tensorflow.keras.layers import *
import cv2
import numpy as np

In [4]:
import scipy.misc
# from tensorflow.keras.applications.resnet_v2 import ResNet50V2
from tensorflow.keras.preprocessing import image
from tensorflow.keras.applications.resnet_v2 import preprocess_input, decode_predictions
from tensorflow.keras import layers
from tensorflow.keras.layers import Input, Add, Dense, Activation, ZeroPadding2D, BatchNormalization, Flatten, Conv2D, AveragePooling2D, MaxPooling2D, GlobalMaxPooling2D
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.initializers import random_uniform, glorot_uniform, constant, identity
from tensorflow.python.framework.ops import EagerTensor
from matplotlib.pyplot import imshow
from tensorflow.python.keras import backend as K
from keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint

%matplotlib inline

In [None]:
# Thoughts:
# Add data augmentation
# Normalization

# Why use an LSTM at all?

In [5]:
def load_video(path, resize, max_frames=0):
    cap = cv2.VideoCapture(path)
    frames = []
    # try:
    while True:
        # print('HO')
        ret, frame = cap.read()
        if not ret:
            break
        # frame = crop_center_square(frame)
        frame = cv2.resize(frame, resize)
        frame = frame[:, :, [2, 1, 0]]
        frames.append(frame)

        if len(frames) == max_frames:
            break
    # finally:
    # cap.release()
    return np.array(frames)


def get_labels(train_labels_list, train_vids_list, indices):
  labels = []
  for i in indices:
    label = train_labels_list.loc[train_labels_list["File"] == train_vids_list[i]]["True_Label"] == "fire"
    labels.append(label.astype('int'))

  return np.asarray(labels)

def get_one_label(train_labels_list, video):
  label = train_labels_list.loc[train_labels_list["File"] == video]["True_Label"] == "fire"
  return label.astype(int)

In [6]:
# load list of files
train_vids_list = os.listdir("./train")
test_vids_list = os.listdir("./test")

# Load labels
train_labels_list = pd.read_csv('train.csv')

In [7]:
class SppnetLayer(Layer):
    '''This layer takes an input tensor and pools the tensor
      in local spatial bins.
      This layer uses Max pooling.
      It accepts input in tensorflow format. # channels last

    # Input
        list of filter in form [x,y,z] 
    # Input shape : 4d tensor [None, X,Y, channels]
    # Output shape : 3d tensor [None,pooled dim, channels] 

    '''
    def __init__(self, filters = [1], **kwargs):
        self.filters = filters
        super(SppnetLayer, self).__init__(**kwargs)

    def compute_output_shape(self, input_shape):
        length = 0;
        for f_size in self.filters:
            length+= (f_size*f_size)
        return (input_shape[0],length*input_shape[3])
      
    def get_config(self):
        config = {'filters': self.filters}
        base_config = super(SppnetLayer, self).get_config()
        return dict(list(base_config.items()) + list(config.items()))

    def call(self, inputs):
      output = []
      for f_size in self.filters:
        win_size = K.int_shape(inputs)[1]/f_size
        #print(win_size)
        win_size = int(win_size)
        for x_start in range(0,f_size):
          for y_start in range(0,f_size):
            X = int(x_start*win_size)
            Y = int(y_start*win_size)
            result = K.max(inputs[:,X:X+win_size,Y:Y+win_size,:],axis = (1,2))
            output.append(result)
      output = K.concatenate(output)
      return output

In [8]:

# MODELS:
def make_model(input_shape):
  cnn = Sequential()
  cnn.add(Conv2D(32, (3, 3), activation='relu', padding='same', input_shape=input_shape))
  cnn.add(MaxPooling2D(pool_size=(3, 3)))
  cnn.add(Conv2D(64, (3, 3), activation='relu', padding='same'))
  cnn.add(MaxPooling2D(pool_size=(3, 3)))
  # cnn.add(Conv2D(128, (3, 3), activation='relu', padding='same'))
  # cnn.add(MaxPooling2D(pool_size=(3, 3)))
  cnn.add(Flatten())
  cnn.add(Dense(256, activation='relu'))
  cnn.add(Dense(128, activation='relu'))

  cl = Sequential()
  cl.add(TimeDistributed(cnn, input_shape=(30, 100, 100, 3)))
  cl.add(LSTM(256, input_shape=(30, 128),return_sequences=True,activation='tanh'))
  cl.add(Dense(32,activation='relu'))
  cl.add(Dense(1,activation='sigmoid'))


  return cl

In [14]:
earlystop = EarlyStopping(monitor='loss',
                          min_delta=0.004,
                          patience=10,
                          verbose=0,
                          mode='auto')

learning_rate_reduction = ReduceLROnPlateau(monitor='loss', 
                                            patience=3, 
                                            verbose=1, 
                                            factor=0.5, 
                                            min_lr=0.00001,
                                            min_delta=0.0015)

# checkpoint = ModelCheckpoint(filepath='/content/Checkpoints', 
#                              monitor="val_loss",
#                              verbose=1, 
#                              save_best_only=True)

callbacks = [earlystop, learning_rate_reduction] #, checkpoint]

In [15]:
# Load the data:
data, labels = [], []

for i in tqdm(train_vids_list):
  j = load_video(f'train/{i}', resize=(100, 100), max_frames=30)
  data.append(j)
  labels.append(get_one_label(train_labels_list, i))


  0%|          | 0/120 [00:00<?, ?it/s]

In [10]:
data = np.asarray(data)
labels = np.asarray(labels)
print(data.shape)
print(labels.shape)

(120, 30, 100, 100, 3)
(120, 1)


In [11]:
cl = make_model(input_shape=(100, 100, 3))
cl.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=1e-5), loss="binary_crossentropy", metrics=['accuracy']) #add precision and recall
cl.summary()

Model: "sequential_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 time_distributed (TimeDistr  (None, 30, 128)          2035008   
 ibuted)                                                         
                                                                 
 lstm (LSTM)                 (None, 30, 256)           394240    
                                                                 
 dense_2 (Dense)             (None, 30, 32)            8224      
                                                                 
 dense_3 (Dense)             (None, 30, 1)             33        
                                                                 
Total params: 2,437,505
Trainable params: 2,437,505
Non-trainable params: 0
_________________________________________________________________


In [20]:
cl.fit(data, labels, batch_size=4, epochs=100,validation_split=0.2)# callbacks=callbacks)

ValueError: ignored

In [None]:
# Load the data:
test_data, test_labels = [], []

for i in tqdm(test_vids_list):
  j = load_video(f'test/{i}', resize=(100, 100), max_frames=30)
  test_data.append(j)

test_data = np.asarray(test_data)
print(test_data.shape)
test_labels = []
test_labels.extend(cl.predict(test_data, batch_size=4))
test_labels = np.asarray(test_labels)
print(test_labels.shape)

  0%|          | 0/20 [00:00<?, ?it/s]

(20, 30, 100, 100, 3)
(20, 30, 1)


In [None]:
fin_labels = np.mean(test_labels, axis=1) < 0.5
fin_labels = fin_labels.astype(int)

In [None]:
#fin_labels

In [None]:
fire_labels = []
for i in fin_labels:
  if i[0] == 1:
    fire_labels.append('fire')
  else:
    fire_labels.append('not_fire')

In [None]:
def get_one_label(train_labels_list, video):
  label = train_labels_list.loc[train_labels_list["File"] == video]["True_Label"] == "fire"
  return label.astype(int)

test_file = pd.read_csv('test.csv')
l = [None for i in range(len(fire_labels))]
test_file['Label'] = l

for i, vid in enumerate(test_vids_list):
  test_file.loc[test_file['File'] == vid, "Label"] = fire_labels[i]

In [None]:
#test_file

In [None]:
test_file.to_csv('out3.csv', index=False)