In [1]:
import os
import matplotlib.pyplot as plt
from PIL import Image
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, Model, Input
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from sklearn.model_selection import train_test_split
import os
import pandas as pd
import cv2
import os
import glob
from tqdm import tqdm
import time

In [2]:
# Base directory that the class folder names below are joined onto.
DIR = os.getcwd()

# Folder names for the two classes.
lifter_dir = 'shop lifters'
non_lifter_dir = 'non shop lifters'

In [3]:
# Two independent, empty per-video metadata tables sharing one schema.
non_lifter_df, lifter_df = (
    pd.DataFrame(columns=['Video', 'Frames', 'FPS', 'Width', 'Height', 'Shoplifter'])
    for _ in range(2)
)

In [4]:
print("[INFO] : Load all the images.....")
# Resolve both class folders against the working directory.
# (The original assigned the shoplifter path to a misspelled name, so
# `lifter_dir` silently stayed relative.)
non_lifter_dir = os.path.join(DIR, non_lifter_dir)
lifter_dir = os.path.join(DIR, lifter_dir)
# Join the pattern as a real path component instead of gluing '*/' onto the
# folder name, which would also match any sibling folder sharing the prefix.
non_lifter_videos = glob.glob(os.path.join(non_lifter_dir, '*.mp4'))
lifter_videos = glob.glob(os.path.join(lifter_dir, '*.mp4'))
len(non_lifter_videos), len(lifter_videos)

[INFO] : Load all the images.....


(531, 324)

In [5]:
# The dataset root is the notebook's working directory.
dataset_path = os.getcwd()

# One sub-folder per class directly under the dataset root.
non_shop_lifters_path, shop_lifters_path = (
    os.path.join(dataset_path, class_folder)
    for class_folder in ("non shop lifters", "shop lifters")
)

def get_unique_videos(folder_path, underscore_count):
    """Return the .mp4 files in a folder, minus those flagged by underscore count.

    A file is dropped when its name contains exactly ``underscore_count``
    underscores; every other ``.mp4`` file is kept.

    Args:
        folder_path: Directory to scan (not recursive).
        underscore_count: Number of underscores that marks a file for removal.

    Returns:
        Sorted list of full paths (``folder_path`` joined with the file name)
        of the files that were kept.
    """
    # os.listdir returns entries in arbitrary order; sort so the list (and any
    # seeded split built from it) is identical on every run and platform.
    video_files = sorted(f for f in os.listdir(folder_path) if f.endswith('.mp4'))

    print(f"\nTotal videos in '{folder_path}' before filtering: {len(video_files)}")

    # Keep every file whose underscore count differs from the flagged value.
    unique_videos = [
        os.path.join(folder_path, f)
        for f in video_files
        if f.count('_') != underscore_count
    ]

    print(f"Total videos in '{folder_path}' after filtering: {len(unique_videos)}")

    return unique_videos

# Each class folder has its own underscore count that marks a file for removal.
non_lifter_videos = get_unique_videos(folder_path=non_shop_lifters_path, underscore_count=4)
lifter_videos = get_unique_videos(folder_path=shop_lifters_path, underscore_count=3)


Total videos in 'f:\Cellula Internship cv\Shop DataSet\non shop lifters' before filtering: 531
Total videos in 'f:\Cellula Internship cv\Shop DataSet\non shop lifters' after filtering: 313

Total videos in 'f:\Cellula Internship cv\Shop DataSet\shop lifters' before filtering: 324
Total videos in 'f:\Cellula Internship cv\Shop DataSet\shop lifters' after filtering: 324


In [6]:
# Spot-check one entry of the filtered non-shoplifter list (a full file path).
non_lifter_videos[5]

'f:\\Cellula Internship cv\\Shop DataSet\\non shop lifters\\shop_lifter_n_102.mp4'

In [14]:
def decode_video(video_path, num_frames=100, start_frame=50, frame_size=(128, 128)):
    """Decode a video into a fixed-length stack of preprocessed grayscale frames.

    Frames are sampled at evenly spaced indices between ``start_frame`` and the
    last frame, converted to grayscale, resized, and passed through the
    MobileNetV2 ``preprocess_input`` function.

    Args:
        video_path: Path to the video. May be a ``str``, ``bytes``, or a
            TensorFlow string tensor (as received under ``tf.py_function``).
        num_frames: Number of frames to return.
        start_frame: First frame index to sample from. Ignored (treated as 0)
            when the video has no frame past this index.
        frame_size: ``(height, width)`` each frame is resized to.

    Returns:
        Tensor of shape ``(num_frames, height, width, 1)``.

    Raises:
        ValueError: If the video cannot be opened or no frame can be read.
    """
    # Normalise the path: tensor -> bytes -> str.
    if hasattr(video_path, "numpy"):
        video_path = video_path.numpy()
    if isinstance(video_path, bytes):
        video_path = video_path.decode('utf-8')
    path = str(video_path)
    num_frames = int(num_frames)

    frames = []
    cap = cv2.VideoCapture(path, apiPreference=cv2.CAP_ANY)
    try:
        if not cap.isOpened():
            raise ValueError(f"Could not open video: {path}")

        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        if total_frames <= 0:
            raise ValueError(f"Video reports no frames: {path}")

        # On short clips the fixed start offset would make the index range run
        # backwards (or go negative), so fall back to sampling from frame 0.
        first = start_frame if 0 <= start_frame < total_frames else 0
        sample_indices = np.linspace(first, total_frames - 1, num_frames, dtype=int)

        for i in sample_indices:
            cap.set(cv2.CAP_PROP_POS_FRAMES, int(i))
            ret, frame = cap.read()
            if not ret:
                # Keep the output length fixed: repeat the last good frame so
                # one unreadable frame does not change the tensor shape.
                if frames:
                    frames.append(frames[-1])
                continue
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            frame = np.expand_dims(frame, axis=-1)  # Add a channel dimension
            frame = tf.image.resize(frame, list(frame_size))
            frame = tf.keras.applications.mobilenet_v2.preprocess_input(frame)
            frames.append(frame)
    finally:
        # Release the capture handle even when decoding fails part-way.
        cap.release()

    if not frames:
        raise ValueError(f"No frames could be read from video: {path}")

    # Reads that failed before the first good frame were skipped above; pad at
    # the end so every video yields exactly `num_frames` frames.
    while len(frames) < num_frames:
        frames.append(frames[-1])

    return tf.stack(frames)


In [15]:
# Decode every video and collect the results into one in-memory tensor
def create_dataset(video_paths, labels, batch_size=8):
    """Decode all videos and stack them, with their labels, into two tensors.

    Args:
        video_paths: Sequence of video file paths.
        labels: Sequence of labels, one per video, in the same order.
        batch_size: Unused; kept so existing callers keep working.

    Returns:
        ``(videos, labels)`` where ``videos`` stacks the per-video tensors
        produced by ``decode_video`` along a new leading axis and ``labels``
        is ``tf.convert_to_tensor`` of the given labels.

    Raises:
        ValueError: If no videos are given or the two sequences differ in length.
    """
    video_paths = list(video_paths)
    all_labels = list(labels)

    # zip() would silently drop the unmatched tail; fail loudly instead.
    if len(video_paths) != len(all_labels):
        raise ValueError(
            f"Got {len(video_paths)} video paths but {len(all_labels)} labels"
        )
    if not video_paths:
        raise ValueError("video_paths is empty; nothing to decode")

    # decode_video is invoked through tf.py_function, so it receives each path
    # as a string tensor.
    all_videos = [
        tf.py_function(decode_video, [video], tf.float32)
        for video in video_paths
    ]

    # Stack all video tensors into one tensor
    all_videos_tensor = tf.stack(all_videos)
    all_labels_tensor = tf.convert_to_tensor(all_labels)

    return all_videos_tensor, all_labels_tensor

In [16]:
# Decode every non-shoplifter video into one tensor; each video gets the string label '0'.
non_litfer_tensor,non_labels = create_dataset(non_lifter_videos, ['0']*len(non_lifter_videos))  # '0' marks the non-shoplifter class

In [17]:
# Sanity check: one entry per non-shoplifter video in both tensors.
non_litfer_tensor.shape, non_labels.shape

(TensorShape([313, 100, 128, 128, 1]), TensorShape([313]))

In [19]:
# Decode every shoplifter video into one tensor; each video gets the string label '1'.
litfer_tensor,lifter_labels = create_dataset(lifter_videos, ['1']*len(lifter_videos))  # '1' marks the shoplifter class

In [21]:
# Sanity check: one entry per shoplifter video in both tensors.
litfer_tensor.shape,lifter_labels.shape

(TensorShape([324, 100, 128, 128, 1]), TensorShape([324]))

In [22]:
# Join both classes along the sample axis: non-shoplifters first, then shoplifters.
data = tf.concat(values=[non_litfer_tensor, litfer_tensor], axis=0)
labels = tf.concat(values=[non_labels, lifter_labels], axis=0)

In [31]:
from sklearn.model_selection import train_test_split

# Convert TensorFlow tensors to NumPy arrays
data_np = data.numpy()
# The labels were created from the strings '0'/'1', so the tensor holds byte
# strings. Cast to integer class indices, which is what
# sparse_categorical_crossentropy requires (a no-op if they are already ints).
labels_np = labels.numpy().astype(np.int32)

# Split the data into training and validation sets
train_data, val_data, train_labels, val_labels = train_test_split(
    data_np,
    labels_np,
    test_size=0.2,
    random_state=42,
    shuffle=True,
)

In [None]:
# Sanity check: training samples and labels must have the same leading dimension.
train_data.shape,train_labels.shape

(509, 100, 128, 128, 1)

((509, 100, 128, 128, 1), (509,))

In [36]:
# Sanity check: validation samples and labels must have the same leading dimension.
val_data.shape,val_labels.shape

((128, 100, 128, 128, 1), (128,))

In [37]:
# Carve a held-out test set (25%) out of the current validation split.
# NOTE: this rebinds val_data/val_labels, so re-running the cell shrinks them again.
val_data, test_data, val_labels, test_labels = train_test_split(
    val_data,
    val_labels,
    test_size=0.25,
    random_state=42,
    shuffle=True,
)

### Model implementation

In [88]:
from tensorflow.keras.applications import MobileNetV2,MobileNetV3Small
from tensorflow.keras.layers import TimeDistributed, LSTM, Dense, Dropout

def create_movinetv2_lstm_grayscale(num_frames, num_classes, frame_size=(128, 128)):
    """Build a per-frame MobileNetV2 + LSTM classifier for grayscale video clips.

    Despite the function name, the backbone is MobileNetV2 (not MoViNet).

    Args:
        num_frames: Number of frames per clip (length of the time axis).
        num_classes: Number of output classes for the softmax head.
        frame_size: ``(height, width)`` of each input frame.

    Returns:
        An uncompiled ``tf.keras.Model`` mapping inputs of shape
        ``(num_frames, height, width, 1)`` to ``num_classes`` softmax scores.
    """
    height, width = frame_size

    # ImageNet-pretrained backbone; global average pooling gives one feature
    # vector per frame.
    base_model = MobileNetV2(
        input_shape=(height, width, 3),
        include_top=False,
        weights='imagenet',
        pooling='avg'
    )

    # Freeze early layers
    for layer in base_model.layers[:100]:
        layer.trainable = False

    # Per-frame feature extractor. The 1x1 convolution is a learned projection
    # from 1 channel to the 3 channels the pretrained backbone expects (it does
    # not simply repeat the grayscale channel).
    frame_processor = tf.keras.Sequential([
        tf.keras.layers.Input(shape=(height, width, 1)),
        tf.keras.layers.Conv2D(3, (1, 1), padding='same'),
        base_model
    ])

    # Full clip-level model
    input_layer = tf.keras.layers.Input(shape=(num_frames, height, width, 1))

    # Apply CNN to each frame independently
    cnn_features = TimeDistributed(frame_processor)(input_layer)

    # Summarise the frame-feature sequence with an LSTM (last state only)
    lstm = LSTM(512, return_sequences=False)(cnn_features)
    x = Dropout(0.5)(lstm)
    output = Dense(num_classes, activation='softmax')(x)

    model = tf.keras.Model(inputs=input_layer, outputs=output)
    return model

In [90]:
# Inputs expected from the earlier cells:
# train_data / val_data: shape (num_samples, num_frames, height, width, channels)
# train_labels / val_labels: shape (num_samples,) containing integer class indices

import tensorflow as tf
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau

# Build the MobileNetV2 + LSTM classifier for 100-frame clips and 2 classes
MODEL = create_movinetv2_lstm_grayscale(num_frames=100, num_classes=2)  # Adjust parameters as needed

# Compile MODEL.
# Only 'accuracy' is tracked here: the built-in F1 metric expects targets of
# shape (batch, num_classes), which does not match the sparse integer labels
# used with sparse_categorical_crossentropy. Compute F1 from predictions after
# training if it is needed.
MODEL.compile(
    optimizer=tf.keras.optimizers.Adam(1e-4),
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)

# Set up callbacks for training
callbacks = [
    # Keep the weights from the epoch with the best validation accuracy on disk
    ModelCheckpoint(
        'best_theft_detection_model.keras',
        monitor='val_accuracy',
        save_best_only=True,
        verbose=1
    ),
    # Stop after 7 epochs without improvement and roll back to the best weights
    EarlyStopping(
        monitor='val_accuracy',
        patience=7,
        restore_best_weights=True,
        verbose=1
    ),
    # Cut the learning rate by 5x when validation loss stalls for 3 epochs
    ReduceLROnPlateau(
        monitor='val_loss',
        factor=0.2,
        patience=3,
        min_lr=1e-6,
        verbose=1
    )
]


In [None]:
# Train the model directly with tensors
# Fit on the in-memory arrays, validating on the validation split each epoch.
history = MODEL.fit(
    train_data,
    train_labels,
    batch_size=8,  # Lower this if the GPU runs out of memory
    epochs=30,
    validation_data=(val_data, val_labels),
    callbacks=callbacks,
    verbose=1,
)

In [None]:

# Evaluate the model on validation data.
# return_dict=True looks metrics up by name, so this keeps working no matter
# how many metrics the model was compiled with (tuple unpacking into two names
# breaks as soon as a third value is returned).
val_metrics = MODEL.evaluate(val_data, val_labels, return_dict=True)
val_loss = val_metrics['loss']
val_accuracy = val_metrics['accuracy']
print(f"Validation Loss: {val_loss:.2f}")
print(f"Validation Accuracy: {val_accuracy:.2f}")

# Save the final model
MODEL.save('final_theft_detection_model.h5')

In [None]:
import matplotlib.pyplot as plt

def plot_training_history(history):
    """Draw train/validation accuracy and loss curves side by side.

    Args:
        history: Object whose ``history`` attribute is a mapping with the keys
            'accuracy', 'val_accuracy', 'loss' and 'val_loss', each holding
            one value per epoch.
    """
    logs = history.history
    fig, (acc_ax, loss_ax) = plt.subplots(1, 2, figsize=(12, 4))

    # Left panel: accuracy per epoch
    acc_ax.plot(logs['accuracy'], label='Train Accuracy')
    acc_ax.plot(logs['val_accuracy'], label='Validation Accuracy')
    acc_ax.set_title('Model Accuracy')
    acc_ax.set_xlabel('Epoch')
    acc_ax.set_ylabel('Accuracy')
    acc_ax.legend()

    # Right panel: loss per epoch
    loss_ax.plot(logs['loss'], label='Train Loss')
    loss_ax.plot(logs['val_loss'], label='Validation Loss')
    loss_ax.set_title('Model Loss')
    loss_ax.set_xlabel('Epoch')
    loss_ax.set_ylabel('Loss')
    loss_ax.legend()

    fig.tight_layout()
    plt.show()

# Visualize the accuracy and loss curves recorded by MODEL.fit
plot_training_history(history)

In [None]:
def predict_on_video(video_path, model, num_frames=None):
    """Classify a single video with a trained model.

    Args:
        video_path: Path to the video file (``str``, ``bytes`` or string tensor).
        model: Trained Keras model used for the prediction.
        num_frames: Number of frames to sample. When ``None``, the frame count
            declared by ``model.input_shape`` is used, falling back to 64 if
            the model does not fix one.

    Returns:
        ``(class_index, confidence)``: the arg-max class as an ``int`` and its
        predicted probability as a ``float``.
    """
    if num_frames is None:
        # input_shape is (batch, frames, height, width, channels); sampling a
        # different frame count than the model was built for fails in predict.
        input_shape = getattr(model, "input_shape", None)
        expected_frames = input_shape[1] if input_shape else None
        num_frames = expected_frames if expected_frames is not None else 64

    # decode_video is written for tf.py_function and reads its path from a
    # tensor, so wrap plain strings before handing them over.
    if isinstance(video_path, (str, bytes)):
        video_path = tf.constant(video_path)

    # Process video using the same preprocessing as in training
    video_tensor = decode_video(video_path, num_frames)
    video_tensor = tf.expand_dims(video_tensor, axis=0)  # Add batch dimension

    # Use the model that was passed in, not the notebook-level global.
    prediction = model.predict(video_tensor)

    # predict() returns a NumPy array, so index it directly (no .numpy()).
    class_index = int(np.argmax(prediction[0]))
    confidence = float(prediction[0][class_index])

    return class_index, confidence