In [None]:
import os

import cv2
import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.cluster import KMeans
from tensorflow.keras.applications import InceptionV3, ResNet152, VGG19
from tensorflow.keras.applications.inception_v3 import preprocess_input as preprocess_inception
from tensorflow.keras.applications.resnet import preprocess_input as preprocess_resnet_v1
from tensorflow.keras.applications.resnet_v2 import preprocess_input as preprocess_resnet
from tensorflow.keras.applications.vgg19 import preprocess_input as preprocess_vgg
from tensorflow.keras.models import Model
from tensorflow.keras.preprocessing import image

In [None]:
# Hyperparameters
BATCH_SIZE, EPOCHS = 32, 20
img_size = 299
# Last-axis size of the per-frame feature buffer and of the sequence model input.
NUM_FEATURES = 2048
# The sequence length is tied to the number of key-frame clusters.
n_clusters = 5
max_seq_length = n_clusters

In [None]:
# Extracting Features Function
# Maps a backbone name to the `preprocess_input` of the SAME model family.
# ResNet152 is a ResNet v1 network, so it must use the `resnet` (v1)
# preprocessing (RGB->BGR plus ImageNet mean subtraction). The `resnet_v2`
# variant rescales pixels to [-1, 1] and is only correct for ResNet*V2 models.
preprocessing_functions = {
    'inception': preprocess_inception,
    'vgg': preprocess_vgg,
    'resnet': preprocess_resnet_v1
}

# Backbones that have already been built, keyed by model name. Building an
# ImageNet backbone is expensive, so each one is created at most once per kernel.
FEATURE_EXTRACTORS = {}


def extract_features_from_frames(frames, model_name):
    """Run a batch of preprocessed frames through a pooled ImageNet backbone.

    Only the requested backbone is instantiated, and it is cached in
    FEATURE_EXTRACTORS so repeated calls (one per video) reuse the same model
    instead of rebuilding all three networks every time.

    Args:
        frames: Batch of frames already preprocessed for `model_name`.
        model_name: One of 'inception', 'vgg' or 'resnet'.

    Returns:
        The output of `model.predict(frames)` for the selected backbone
        (built with include_top=False and pooling='avg').

    Raises:
        KeyError: If `model_name` is not a supported backbone.
    """
    model = FEATURE_EXTRACTORS.get(model_name)
    if model is None:
        # Only class references are stored here; nothing is built until called.
        builders = {
            'inception': InceptionV3,
            'vgg': VGG19,
            'resnet': ResNet152,
        }
        model = builders[model_name](weights='imagenet', include_top=False, pooling='avg')
        FEATURE_EXTRACTORS[model_name] = model
    return model.predict(frames)

In [None]:
def load_video(path, max_frames=100, resize=(299, 299), model_name='inception'):
    """Decode up to `max_frames` frames of a video and preprocess them for a backbone.

    Frames are resized, converted from OpenCV's BGR order to RGB, and passed
    through the `preprocess_input` function registered for `model_name`.
    Pixels are deliberately kept in the raw 0-255 range before that call: the
    Keras `preprocess_input` functions do their own scaling, so dividing by
    255 first would squash every value into a tiny sliver of the expected range.

    Args:
        path: Path of the video file.
        max_frames: Stop reading once this many frames have been collected.
        resize: Target size handed to `cv2.resize`, i.e. (width, height).
        model_name: Key into `preprocessing_functions`.

    Returns:
        float32 array of shape (num_frames, resize[1], resize[0], 3). When no
        frame can be decoded, an empty array with num_frames == 0 is returned
        so callers can test `frames.shape[0]`.
    """
    cap = cv2.VideoCapture(path)
    frames = []
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame = cv2.resize(frame, resize)
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frames.append(frame)

            if len(frames) == max_frames:
                break
    finally:
        cap.release()

    if not frames:
        # Unreadable or empty video: keep the 4-D layout so downstream shape checks work.
        return np.empty((0, resize[1], resize[0], 3), dtype=np.float32)

    # Preprocessing (expects float pixels in [0, 255]).
    frames = np.asarray(frames, dtype=np.float32)
    return preprocessing_functions[model_name](frames)


In [None]:
def select_keyframes(frames, n_clusters=25):
    """Pick representative frames by clustering the flattened pixels with KMeans.

    For every cluster centre, the frame closest to it is chosen; the chosen
    frames are returned in their original temporal order.

    Args:
        frames: Array whose first axis indexes frames.
        n_clusters: Number of clusters, and therefore of key frames to select.

    Returns:
        The selected frames. When the clip has no more frames than
        `n_clusters` (including an empty clip), every frame is returned
        unchanged, because KMeans cannot fit more clusters than samples.
    """
    num_frames = frames.shape[0]
    if num_frames <= n_clusters:
        return frames

    flatten_frames = frames.reshape(num_frames, -1)
    kmeans = KMeans(n_clusters=n_clusters, random_state=0).fit(flatten_frames)

    # transform() gives a (num_frames, n_clusters) matrix of distances to the
    # cluster centres; argmin over axis 0 is the nearest frame for each centre.
    distances = kmeans.transform(flatten_frames)
    keyframes_idx = np.sort(np.argmin(distances, axis=0))
    return frames[keyframes_idx]

In [None]:
# for 25 frames
import pandas as pd

def prepare_all_videos(csv_path, main_output_directory, model_name, max_seq_length):
    """Turn every video listed in a CSV into a fixed-length sequence of frame features.

    For each row the video is loaded, reduced to at most `max_seq_length` key
    frames and passed through the `model_name` backbone. Sequences shorter
    than `max_seq_length` are zero-padded.

    Args:
        csv_path: CSV with a 'Video Name' column and a 'normalized_labels' column.
        main_output_directory: Directory that the video names are relative to.
        model_name: 'inception', 'resnet' or 'vgg'.
        max_seq_length: Number of timesteps per video.

    Returns:
        ((frame_features, frame_masks), labels) where
        - frame_features is float32 of shape (num_videos, max_seq_length, feature_dim),
        - frame_masks is bool of shape (num_videos, max_seq_length, 1); True
          marks a timestep that holds real frame features and False marks
          padding, which is the convention of the Keras recurrent `mask`
          argument (drop the trailing axis before passing it as a mask),
        - labels is an array with one entry per video.

    Raises:
        ValueError: If `model_name` is not supported.
    """
    import warnings

    input_sizes = {
        'inception': (299, 299),
        'resnet': (224, 224),
        'vgg': (224, 224),
    }
    if model_name not in input_sizes:
        raise ValueError(
            f"Unsupported model_name {model_name!r}; expected one of {sorted(input_sizes)}"
        )
    frame_size = input_sizes[model_name]
    # VGG19 with global average pooling emits 512 values per frame; the other
    # two backbones emit NUM_FEATURES (2048).
    feature_dim = 512 if model_name == 'vgg' else NUM_FEATURES

    df = pd.read_csv(csv_path)
    num_samples = len(df)
    frame_features = np.zeros(shape=(num_samples, max_seq_length, feature_dim), dtype="float32")
    frame_masks = np.zeros(shape=(num_samples, max_seq_length, 1), dtype="bool")
    all_labels = []

    # Use the running position, not the DataFrame index label, to address the
    # output buffers, so a non-default index cannot write to the wrong row.
    for position, (_, row) in enumerate(df.iterrows()):
        video_path = os.path.join(main_output_directory, row['Video Name'])
        all_labels.append(row['normalized_labels'])

        # The preprocessing must match the backbone that extracts the features.
        frames = load_video(video_path, resize=frame_size, model_name=model_name)
        if frames.shape[0] == 0:
            warnings.warn(f"No frames could be read from {video_path}; leaving an all-padding row")
            continue

        # Ask for exactly as many key frames as the sequence can hold, so no
        # key frame is dropped afterwards by truncation.
        frames = select_keyframes(frames, n_clusters=max_seq_length)
        features = extract_features_from_frames(frames, model_name=model_name)[:max_seq_length]

        sequence_length = features.shape[0]
        frame_features[position, :sequence_length, :] = features
        frame_masks[position, :sequence_length, :] = True

    return (frame_features, frame_masks), np.array(all_labels)


In [None]:
train_features, train_labels = prepare_all_videos('train.csv', train_dir, model_name='inception', max_seq_length=max_seq_length)
# prepare_all_videos returns ((features, masks), labels), so `train_features`
# is the (features, masks) tuple at this point.
print(f"Frame features in train set: {train_features[0].shape}")
print(f"Frame masks in train set: {train_features[1].shape}")
# Output recorded from an earlier run:
# Frame features in train set: (380, 25, 2048)
# Frame masks in train set: (380, 25, 1)

In [None]:
# `train_features` is the (features, masks) tuple. Store the two arrays under
# the 'features' / 'masks' keys that are read back when the archive is loaded.
np.savez('train_features_25_inception.npz', features=train_features[0], masks=train_features[1])
np.save('train_labels_25_inception.npy', train_labels)

In [None]:
# Reload the cached training arrays from disk.
train_labels = np.load('train_labels_25_inception.npy')
train_data = np.load('train_features_25_inception.npz')
train_masks = train_data['masks']
train_features = train_data['features']

In [None]:
test_data, test_labels = prepare_all_videos('test.csv', test_dir, model_name='inception', max_seq_length=max_seq_length)
# Unpack the (features, masks) tuple so later cells can use `test_features`
# and `test_masks` as plain arrays, the same way the training arrays are used.
test_features, test_masks = test_data
np.savez('test_features_25_inception.npz', features=test_features, masks=test_masks)
np.save('test_labels_25_inception.npy', test_labels)
print(f"Frame features in test set: {test_features.shape}")   #Frame features in test set: (120, 25, 2048)
print(f"Frame masks in test set: {test_masks.shape}")  #Frame masks in test set: (120, 25, 1)

In [None]:
val_data, val_labels = prepare_all_videos('val_stage1.csv', val_dir, model_name='inception', max_seq_length=max_seq_length)
# Unpack the (features, masks) tuple so later cells can use `val_features`
# and `val_masks` as plain arrays, the same way the training arrays are used.
val_features, val_masks = val_data
np.savez('val_features_5_inception.npz', features=val_features, masks=val_masks)
np.save('val_labels_5_inception.npy', val_labels)
print(f"Frame features in validation set: {val_features.shape}") # Frame features in validation set: (96, 25, 2048)
print(f"Frame masks in validation set: {val_masks.shape}") # Frame masks in validation set: (96, 25, 1)

## Adding Sequence Part

In [None]:
class F1Score(tf.keras.metrics.Metric):
    """F1 metric computed from a streaming Precision and a streaming Recall.

    The value is 2 * P * R / (P + R + epsilon); the epsilon keeps the result
    finite when both precision and recall are zero.
    """

    def __init__(self, name='f1_score', **kwargs):
        super().__init__(name=name, **kwargs)
        self.precision = tf.keras.metrics.Precision()
        self.recall = tf.keras.metrics.Recall()

    def update_state(self, y_true, y_pred, sample_weight=None):
        """Accumulate one batch into both underlying metrics."""
        self.precision.update_state(y_true, y_pred, sample_weight)
        self.recall.update_state(y_true, y_pred, sample_weight)

    def result(self):
        """Return the F1 value for everything accumulated so far."""
        precision = self.precision.result()
        recall = self.recall.result()
        return 2 * ((precision * recall) /
                    (precision + recall + tf.keras.backend.epsilon()))

    def reset_state(self):
        """Clear both underlying metrics (the name current Keras versions call)."""
        for metric in (self.precision, self.recall):
            # Fall back to the legacy method name on Keras versions without reset_state.
            reset = getattr(metric, 'reset_state', None) or metric.reset_states
            reset()

    def reset_states(self):
        """Deprecated alias kept for callers that still use the old method name."""
        self.reset_state()

In [None]:
from keras.callbacks import EarlyStopping
from sklearn.utils import class_weight

early_stopping = EarlyStopping(monitor='val_loss', patience=3)

# Balance the loss using the labels of the training split that is actually
# loaded in this notebook (`train_labels`).
weights = class_weight.compute_class_weight('balanced', classes=np.unique(train_labels), y=train_labels)
# Keys are the positions 0..k-1 of the sorted unique labels.
class_weights = dict(enumerate(weights))

# `tf.keras` is used because `tf` is imported at the top of the notebook.
# NOTE(review): min_lr equals Adam's default learning rate (0.001), so with the
# default optimizer settings this callback cannot lower the rate, and its
# patience (5) is longer than the early-stopping patience (3). Confirm the
# intended values before adding it to a `callbacks` list.
reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5, min_lr=0.001)


In [None]:
from tensorflow.keras.metrics import AUC
from tensorflow.keras.callbacks import EarlyStopping

def inception_lstm_model():
    """Build and compile the LSTM classifier that consumes per-frame features.

    Inputs:
        - frame features of shape (max_seq_length, NUM_FEATURES),
        - a boolean mask of shape (max_seq_length,), True for real timesteps.

    The LSTM returns only its final state (return_sequences=False), so the
    model emits one sigmoid probability per video, which matches the one
    label per video used for training. Returning the full sequence would
    produce one prediction per timestep instead.

    Returns:
        A compiled Keras model with binary cross-entropy loss and accuracy,
        precision, recall, AUC and F1 metrics.
    """
    l2 = tf.keras.regularizers.l2

    frame_features_input = tf.keras.Input((max_seq_length, NUM_FEATURES))
    mask_input = tf.keras.Input((max_seq_length,), dtype="bool")

    layer = tf.keras.layers.LSTM(32, return_sequences=False, kernel_regularizer=l2(0.01))(
        frame_features_input, mask=mask_input
    )
    layer = tf.keras.layers.Dropout(0.5)(layer)
    layer = tf.keras.layers.Dense(16, kernel_regularizer=l2(0.01))(layer)
    layer = tf.keras.layers.Dropout(0.5)(layer)
    output = tf.keras.layers.Dense(1, activation="sigmoid")(layer)

    LSTM_model = tf.keras.Model([frame_features_input, mask_input], output)

    LSTM_model.compile(
        optimizer='adam',
        loss='binary_crossentropy',
        metrics=[
            'accuracy',
            tf.keras.metrics.Precision(name='precision'),
            tf.keras.metrics.Recall(name='recall'),
            tf.keras.metrics.AUC(name='auc'),
            F1Score(),
        ],
    )
    return LSTM_model

In [None]:
from tensorflow import keras
def inception_lstm_experiment(filepath="F:\\Augmented videos 20\\negin\\video_classifier", epochs=100):
    """Train the Inception+LSTM classifier, restore the best weights and evaluate it.

    Reads the notebook-level arrays train/val/test `*_features`, `*_masks`
    and `*_labels`, plus `class_weights`.

    Args:
        filepath: Where the best weights are checkpointed and reloaded from.
            NOTE(review): the default is a machine-specific absolute path.
        epochs: Upper bound on training epochs (early stopping may end sooner).

    Returns:
        (history, seq_model): the Keras History object and the model with the
        best checkpointed weights loaded.
    """
    def as_step_mask(masks):
        # The model's mask input is (batch, timesteps); drop a trailing
        # singleton axis if the stored masks are (batch, timesteps, 1).
        masks = np.asarray(masks)
        return np.squeeze(masks, axis=-1) if masks.ndim == 3 else masks

    checkpoint = keras.callbacks.ModelCheckpoint(
        filepath, save_weights_only=True, save_best_only=True, verbose=1
    )
    early_stopping = EarlyStopping(monitor='val_loss', patience=3)

    seq_model = inception_lstm_model()
    history = seq_model.fit(
        [train_features, as_step_mask(train_masks)],
        train_labels,
        validation_data=([val_features, as_step_mask(val_masks)], val_labels),
        epochs=epochs,
        callbacks=[checkpoint, early_stopping],
        class_weight=class_weights,
    )

    seq_model.load_weights(filepath)
    # The model is compiled with five metrics besides the loss, so look the
    # values up by name instead of unpacking the result positionally.
    results = seq_model.evaluate(
        [test_features, as_step_mask(test_masks)], test_labels, return_dict=True
    )
    print(f"F1 Score: {results['f1_score']}")
    print(f"AUC: {results['auc']}")

    return history, seq_model

In [None]:
# Run the experiment defined in this notebook (the Inception + LSTM pipeline).
_, sequence_model = inception_lstm_experiment()