In [42]:
import os
import cv2

# Define the dataset path
dataset_path = r"C:\Users\Lenovo\OneDrive - Alexandria University\Desktop\Projects\TheftClassification\Shop DataSet"
non_lifters_path = os.path.join(dataset_path, 'non shop lifters')
lifters_path = os.path.join(dataset_path, 'shop lifters')

# Function to get video info (number of frames, dimensions)
def get_video_info(video_path):
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        return None
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    cap.release()
    return frame_count, frame_width, frame_height

# Explore videos in both classes
def explore_videos(directory):
    video_files = os.listdir(directory)
    video_info = []
    for video in video_files:
        video_path = os.path.join(directory, video)
        info = get_video_info(video_path)
        if info:
            frame_count, frame_width, frame_height = info
            video_info.append({
                'file': video,
                'frames': frame_count,
                'width': frame_width,
                'height': frame_height
            })
    return video_info

# Explore the non-lifters videos
print("Non-shoplifters:")
non_lifters_info = explore_videos(non_lifters_path)
for video in non_lifters_info:
    print(video)

# Explore the shoplifters videos
print("\nShoplifters:")
lifters_info = explore_videos(lifters_path)
for video in lifters_info:
    print(video)

# Print out some basic statistics
print(f"\nTotal non-shoplifters videos: {len(non_lifters_info)}")
print(f"Total shoplifters videos: {len(lifters_info)}")

Non-shoplifters:
{'file': 'shop_lifter_n_0.mp4', 'frames': 450, 'width': 704, 'height': 576}
{'file': 'shop_lifter_n_0_1.mp4', 'frames': 450, 'width': 704, 'height': 576}
{'file': 'shop_lifter_n_1.mp4', 'frames': 450, 'width': 704, 'height': 576}
{'file': 'shop_lifter_n_10.mp4', 'frames': 475, 'width': 704, 'height': 576}
{'file': 'shop_lifter_n_100.mp4', 'frames': 400, 'width': 704, 'height': 576}
{'file': 'shop_lifter_n_100_1.mp4', 'frames': 400, 'width': 704, 'height': 576}
{'file': 'shop_lifter_n_101.mp4', 'frames': 300, 'width': 704, 'height': 576}
{'file': 'shop_lifter_n_101_1.mp4', 'frames': 300, 'width': 704, 'height': 576}
{'file': 'shop_lifter_n_102.mp4', 'frames': 275, 'width': 704, 'height': 576}
{'file': 'shop_lifter_n_102_1.mp4', 'frames': 275, 'width': 704, 'height': 576}
{'file': 'shop_lifter_n_103.mp4', 'frames': 275, 'width': 704, 'height': 576}
{'file': 'shop_lifter_n_103_1.mp4', 'frames': 275, 'width': 704, 'height': 576}
{'file': 'shop_lifter_n_104.mp4', 'frames': 

In [44]:
import os
import shutil
from sklearn.model_selection import train_test_split

# Directories with your videos
shop_lifters_dir = r"C:\Users\Lenovo\OneDrive - Alexandria University\Desktop\Projects\TheftClassification\Shop DataSet\shop lifters"
non_shop_lifters_dir = r"C:\Users\Lenovo\OneDrive - Alexandria University\Desktop\Projects\TheftClassification\Shop DataSet\non shop lifters"

# Directories for split data
preprocessed_train_dir = r"C:\Users\Lenovo\OneDrive - Alexandria University\Desktop\Projects\TheftClassification\Preprocessed2\train"
preprocessed_valid_dir = r"C:\Users\Lenovo\OneDrive - Alexandria University\Desktop\Projects\TheftClassification\Preprocessed2\valid"
preprocessed_test_dir = r"C:\Users\Lenovo\OneDrive - Alexandria University\Desktop\Projects\TheftClassification\Preprocessed2\test"

# Create directories if not already present
for folder in [preprocessed_train_dir, preprocessed_valid_dir, preprocessed_test_dir]:
    os.makedirs(folder + '/shop_lifters', exist_ok=True)
    os.makedirs(folder + '/non_shop_lifters', exist_ok=True)

# Get video file paths and labels
shoplifters_video_paths = [os.path.join(shop_lifters_dir, f) for f in os.listdir(shop_lifters_dir) if f.endswith('.mp4')]
non_shoplifters_video_paths = [os.path.join(non_shop_lifters_dir, f) for f in os.listdir(non_shop_lifters_dir) if f.endswith('.mp4')]

# Labels (1 for shoplifters, 0 for non-shoplifters)
shoplifters_labels = [1] * len(shoplifters_video_paths)
non_shoplifters_labels = [0] * len(non_shoplifters_video_paths)

# Combine video paths and labels
video_paths = shoplifters_video_paths + non_shoplifters_video_paths
labels = shoplifters_labels + non_shoplifters_labels

# Split into train + temp (for valid/test), stratify by labels
X_train, X_temp, y_train, y_temp = train_test_split(video_paths, labels, test_size=0.3, stratify=labels, random_state=42)

# Split temp into validation and test, stratify by labels
X_valid, X_test, y_valid, y_test = train_test_split(X_temp, y_temp, test_size=0.5, stratify=y_temp, random_state=42)

# Function to move files to the respective directories
def move_files(file_paths, labels, target_dir):
    for file_path, label in zip(file_paths, labels):
        if label == 1:  # Shoplifters
            target_folder = os.path.join(target_dir, 'shop_lifters')
        else:  # Non-shoplifters
            target_folder = os.path.join(target_dir, 'non_shop_lifters')
        shutil.copy(file_path, target_folder)  # Copy video to target folder

# Move videos to train/validation/test directories
move_files(X_train, y_train, preprocessed_train_dir)
move_files(X_valid, y_valid, preprocessed_valid_dir)
move_files(X_test, y_test, preprocessed_test_dir)

print("Train, validation, and test split completed.")


Train, validation, and test split completed.


In [45]:
import os
import cv2
import numpy as np
import random
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import Sequence
import tensorflow as tf

# Function to adjust frame count to the target frames
def adjust_frame_count(frames, target_frame_count=290):
    frame_count = len(frames)
    
    if frame_count == target_frame_count:
        return frames
    if frame_count > target_frame_count:
        indices = np.linspace(0, frame_count - 1, target_frame_count).astype(int)
        return [frames[i] for i in indices]
    else:
        last_frame = frames[-1]
        while len(frames) < target_frame_count:
            frames.append(last_frame)
        return frames

In [46]:
# Function to apply consistent augmentation for the same video
def augment_frame(frame, augmentation_params):
    if augmentation_params.get("flip", False):
        frame = cv2.flip(frame, 1)  # Horizontal flip

    angle = augmentation_params.get("rotation", 0)
    if angle != 0:
        h, w = frame.shape[:2]
        M = cv2.getRotationMatrix2D((w // 2, h // 2), angle, 1)
        frame = cv2.warpAffine(frame, M, (w, h))

    brightness = augmentation_params.get("brightness", 0)
    contrast = augmentation_params.get("contrast", 1.0)
    frame = cv2.convertScaleAbs(frame, alpha=contrast, beta=brightness)

    return frame

# Generate augmentation parameters for each video
def generate_augmentation_params():
    return {
        "flip": random.random() > 0.5,
        "rotation": random.uniform(-7, 7),
        "brightness": random.randint(-20, 20),
        "contrast": random.uniform(0.85, 1.15),
    }

In [50]:
# Preprocess video with optional augmentation for training data
def preprocess_video(video_path, desired_size=(200, 200), target_frames=150, augment=False):
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Failed to open video: {video_path}")
        return None

    if augment:
        augmentation_params = generate_augmentation_params()
    else:
        augmentation_params = None

    frames = []
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        resized_frame = cv2.resize(frame, desired_size)
        if augment:
            resized_frame = augment_frame(resized_frame, augmentation_params)
        frames.append(resized_frame)

    cap.release()
    frames = adjust_frame_count(frames, target_frames)
    frames = np.array(frames) / 255.0  # Normalize to [0, 1]

    return frames

In [52]:
class VideoDataGenerator(Sequence):
    def __init__(self, video_paths, labels, batch_size, target_frames=150, desired_size=(200, 200), augment=False, shuffle=True):
        self.video_paths = video_paths
        self.labels = labels
        self.batch_size = batch_size
        self.target_frames = target_frames
        self.desired_size = desired_size
        self.augment = augment
        self.shuffle = shuffle
        self.on_epoch_end()
    
    def __len__(self):
        return int(np.floor(len(self.video_paths) / self.batch_size))
    
    def __getitem__(self, index):
        batch_indexes = self.indexes[index * self.batch_size:(index + 1) * self.batch_size]
        batch_video_paths = [self.video_paths[i] for i in batch_indexes]
        batch_labels = [self.labels[i] for i in batch_indexes]
        X, y = self.__data_generation(batch_video_paths, batch_labels)
        return X, y
    
    def on_epoch_end(self):
        self.indexes = np.arange(len(self.video_paths))
        if self.shuffle:
            np.random.shuffle(self.indexes)
    
    def __data_generation(self, batch_video_paths, batch_labels):
        X = np.empty((self.batch_size, self.target_frames, *self.desired_size, 3))
        y = np.empty((self.batch_size), dtype=int)
        for i, (video_path, label) in enumerate(zip(batch_video_paths, batch_labels)):
            X[i,] = preprocess_video(video_path, desired_size=self.desired_size, target_frames=self.target_frames, augment=self.augment)
            y[i] = label
        return X, y

In [54]:
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.metrics import Precision, Recall

def build_3d_cnn_model(input_shape):
    model = models.Sequential()
    
    # 3D Convolutional layers
    model.add(layers.Conv3D(4, kernel_size=(3, 3, 3), activation='relu', input_shape=input_shape))
    model.add(layers.MaxPooling3D(pool_size=(2, 2, 2)))
    model.add(layers.Conv3D(8, kernel_size=(3, 3, 3), activation='relu'))
    model.add(layers.MaxPooling3D(pool_size=(2, 2, 2)))
    model.add(layers.Conv3D(16, kernel_size=(3, 3, 3), activation='relu'))
    model.add(layers.MaxPooling3D(pool_size=(2, 2, 2)))
    
    # Flatten the output from the convolutional layers
    model.add(layers.Flatten())
    
    # Fully connected layers
    model.add(layers.Dense(128, activation='relu'))  # First fully connected layer
    model.add(layers.Dropout(0.5))  # Dropout for regularization
    model.add(layers.Dense(64, activation='relu'))  # Second fully connected layer
    model.add(layers.Dropout(0.25))  # Another Dropout layer for regularization
    
    # Output layer for binary classification
    model.add(layers.Dense(1, activation='sigmoid'))  
    
    return model

# Define input shape
input_shape = (150, 200, 200, 3)
model = build_3d_cnn_model(input_shape)
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy', 'Precision', 'Recall'])

  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


In [58]:
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from sklearn.utils.class_weight import compute_class_weight

# Parameters for the generator
batch_size = 4
target_frames = 150
desired_size = (200, 200)

# Create data generators
train_generator = VideoDataGenerator(X_train, y_train, batch_size=batch_size, target_frames=target_frames, desired_size=desired_size, augment=True)  # Augmentation ON
valid_generator = VideoDataGenerator(X_valid, y_valid, batch_size=batch_size, target_frames=target_frames, desired_size=desired_size, augment=False)  # Augmentation OFF
test_generator = VideoDataGenerator(X_test, y_test, batch_size=batch_size, target_frames=target_frames, desired_size=desired_size, augment=False)  # Augmentation OFF

# Calculate class weights to handle class imbalance
class_weights = compute_class_weight(class_weight='balanced', classes=np.unique(y_train), y=y_train)
class_weight_dict = {0: class_weights[0], 1: class_weights[1]}

# Define callbacks
early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5, min_lr=1e-6)

# Train the model
history = model.fit(train_generator,
                    epochs=25,
                    validation_data=valid_generator,
                    class_weight=class_weight_dict,
                    callbacks=[early_stopping, reduce_lr])

Epoch 1/25


  self._warn_if_super_not_called()


[1m149/149[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m504s[0m 3s/step - Precision: 0.4735 - Recall: 0.3792 - accuracy: 0.5817 - loss: 1.5422 - val_Precision: 0.0000e+00 - val_Recall: 0.0000e+00 - val_accuracy: 0.6250 - val_loss: 0.6817 - learning_rate: 0.0010
Epoch 2/25
[1m149/149[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m503s[0m 3s/step - Precision: 0.3789 - Recall: 0.3350 - accuracy: 0.5347 - loss: 0.6937 - val_Precision: 0.0000e+00 - val_Recall: 0.0000e+00 - val_accuracy: 0.6250 - val_loss: 0.6603 - learning_rate: 0.0010
Epoch 3/25
[1m149/149[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m471s[0m 3s/step - Precision: 0.3619 - Recall: 0.1859 - accuracy: 0.6196 - loss: 0.6726 - val_Precision: 0.5238 - val_Recall: 0.9167 - val_accuracy: 0.6562 - val_loss: 0.6930 - learning_rate: 0.0010
Epoch 4/25
[1m149/149[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m465s[0m 3s/step - Precision: 0.4003 - Recall: 0.6203 - accuracy: 0.4947 - loss: 0.6924 - val_Precision: 0.3750 - val_

In [59]:
# Evaluate the model on the test set
test_loss, test_accuracy, test_precision, test_recall = model.evaluate(test_generator)
print(f"Test Accuracy: {test_accuracy:.4f}, Precision: {test_precision:.4f}, Recall: {test_recall:.4f}")

[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m71s[0m 2s/step - Precision: 0.9651 - Recall: 1.0000 - accuracy: 0.9862 - loss: 0.0369
Test Accuracy: 0.9844, Precision: 0.9608, Recall: 1.0000


In [60]:
model.save('shoplifting_detection_model22_2.keras')  # Save as .keras
model.save('shoplifting_detection_model22_2.h5')     # Save as .h5

