In [None]:
%pip install opencv-python

In [3]:
import cv2
import os

# Location of the UCF50 dataset folder. The UCF50_DIR environment variable, when
# set, overrides the original local path so the notebook can run on other machines.
data_dir = os.environ.get('UCF50_DIR', r'C:\Users\Admin\OneDrive\Desktop\Ucfdata')

# Sample clip used for the sanity check below
action_category = 'Punch'
video_name = 'v_Punch_g01_c01.avi'

# Construct the path to the video file
video_path = os.path.join(data_dir, action_category, video_name)

# Sanity check: open the clip and show its first frame
cap = cv2.VideoCapture(video_path)
try:
    if not cap.isOpened():
        print(f"Error: Could not open video file '{video_path}'")
    else:
        # Read the first frame
        ret, frame = cap.read()

        if ret:
            # Display the first frame; waitKey(0) blocks until a key is pressed
            cv2.imshow('First Frame', frame)
            cv2.waitKey(0)
            cv2.destroyAllWindows()
        else:
            print(f"Error: Could not read frame from video file '{video_path}'")
finally:
    # Release the capture handle even if reading or displaying the frame raised
    cap.release()


In [None]:
%pip install opencv-python numpy scikit-learn tensorflow

In [1]:
%pip show tensorflow

Name: tensorflow
Version: 2.15.0
Summary: TensorFlow is an open source machine learning framework for everyone.
Home-page: https://www.tensorflow.org/
Author: Google Inc.
Author-email: packages@tensorflow.org
License: Apache 2.0
Location: c:\users\admin\appdata\local\programs\python\python310\lib\site-packages
Requires: tensorflow-intel
Required-by: 
Note: you may need to restart the kernel to use updated packages.


In [7]:
import numpy as np

from sklearn.ensemble import IsolationForest
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
from sklearn.preprocessing import OneHotEncoder


In [8]:
import tensorflow as tf 
from tensorflow.keras.models import Sequential
from tensorflow import keras
from tensorflow.keras import layers
import matplotlib.pyplot as plt

In [9]:
# Clip sampling parameters
seq_len = 5  # Number of leading frames taken from each video
img_height, img_width = 90, 90  # Frame size after resizing

# Each sub-folder of data_dir is treated as one action class. Keep only
# directories (stray files in the dataset folder are not classes) and sort the
# names so the class order is deterministic across machines and runs; the list
# is later indexed by the model's predicted class index.
classes = sorted(
    entry for entry in os.listdir(data_dir)
    if os.path.isdir(os.path.join(data_dir, entry))
)

def frames_extraction(video_path):
    """Read the first ``seq_len`` frames of a video and resize them.

    Args:
        video_path: Path of the video file to open with ``cv2.VideoCapture``.

    Returns:
        A list of resized frames. It holds ``seq_len`` frames when every read
        succeeds and fewer when a read fails first (for example, the video is
        shorter than ``seq_len`` frames or cannot be opened).
    """
    frames_list = []
    vidObj = cv2.VideoCapture(video_path)
    try:
        while len(frames_list) < seq_len:
            success, image = vidObj.read()
            if not success:
                print("Defected frame")
                break
            # cv2.resize takes the target size as (width, height)
            frames_list.append(cv2.resize(image, (img_width, img_height)))
    finally:
        # Free the decoder handle; one capture is opened per video in the dataset
        vidObj.release()
    return frames_list



In [None]:
def create_data(input_dir, max_videos_per_class=600):
    """Build the frame-sequence dataset from a folder of per-class sub-folders.

    Args:
        input_dir: Dataset root; each name in the module-level ``classes`` list
            is looked up as a sub-folder of it.
        max_videos_per_class: Upper bound on the number of files read from each
            class folder (defaults to the original cap of 600).

    Returns:
        A tuple ``(X, Y)`` of NumPy arrays: ``X`` stacks the frame lists returned
        by ``frames_extraction`` and ``Y`` holds the matching class-folder names.
        Videos that yield fewer than ``seq_len`` frames are skipped.
    """
    X = []
    Y = []
    for c in classes:
        class_dir = os.path.join(input_dir, c)
        if not os.path.isdir(class_dir):
            # Stray files next to the class folders would make os.listdir fail
            continue
        print(c)
        # Sort so the selected files and their order are the same on every run
        files_list = sorted(os.listdir(class_dir))
        for f in files_list[:max_videos_per_class]:
            frames = frames_extraction(os.path.join(class_dir, f))
            if len(frames) == seq_len:
                X.append(frames)
                Y.append(c)
    X = np.asarray(X)
    Y = np.asarray(Y)
    return X, Y

# Create and preprocess data
X, Y = create_data(data_dir)


In [None]:

# Split the data into training (60%), validation (20%), and testing (20%) sets
X_train, X_temp, y_train, y_temp = train_test_split(X, Y, test_size=0.4, shuffle=True, random_state=1)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, shuffle=True, random_state=1)

# Convert labels to one-hot encoding.
# `sparse_output` is the current name of the former `sparse` keyword, which
# newer scikit-learn releases no longer accept.
encoder = OneHotEncoder(sparse_output=False)
y_train_encoded = encoder.fit_transform(y_train.reshape(-1, 1))
y_val_encoded = encoder.transform(y_val.reshape(-1, 1))
y_test_encoded = encoder.transform(y_test.reshape(-1, 1))

# Data preprocessing for Conv3D model: scale by 255 (assumes 8-bit pixel values).
# Casting to float32 first avoids the float64 array a plain division would create.
X_train = X_train.astype(np.float32) / 255.0
X_val = X_val.astype(np.float32) / 255.0
X_test = X_test.astype(np.float32) / 255.0

# Data preprocessing for Isolation Forest: one flat feature vector per clip.
# The arrays above are already scaled, so they must not be divided by 255 again.
X_train_flatten = X_train.reshape(X_train.shape[0], -1)
X_val_flatten = X_val.reshape(X_val.shape[0], -1)
X_test_flatten = X_test.reshape(X_test.shape[0], -1)


In [None]:
# Model for violent action detection

# One output unit per category the label encoder produced, so the softmax width
# always matches the one-hot training targets.
num_classes = y_train_encoded.shape[1]

model_violence = keras.Sequential()
# padding='same' keeps the temporal axis at seq_len through each convolution.
# With the default 'valid' padding and seq_len = 5 the axis shrinks 5 -> 3 -> 1
# (conv, pool), leaving the second 3x3x3 convolution with nothing to slide over.
model_violence.add(layers.Conv3D(32, kernel_size=(3, 3, 3), padding='same', activation='relu', kernel_initializer='he_uniform', input_shape=(seq_len, img_height, img_width, 3)))
model_violence.add(layers.MaxPooling3D(pool_size=(2, 2, 2)))
model_violence.add(layers.BatchNormalization(center=True, scale=True))
model_violence.add(layers.Dropout(0.5))
model_violence.add(layers.Conv3D(64, kernel_size=(3, 3, 3), padding='same', activation='relu', kernel_initializer='he_uniform'))
model_violence.add(layers.MaxPooling3D(pool_size=(2, 2, 2)))
model_violence.add(layers.BatchNormalization(center=True, scale=True))
model_violence.add(layers.Dropout(0.5))
model_violence.add(layers.Flatten())
model_violence.add(layers.Dense(256, activation='relu', kernel_initializer='he_uniform'))
model_violence.add(layers.Dense(256, activation='relu', kernel_initializer='he_uniform'))
model_violence.add(layers.Dense(num_classes, activation='softmax'))

# `learning_rate` is the supported keyword; the old `lr` alias is deprecated
# and not accepted by newer Keras releases.
model_violence.compile(loss='categorical_crossentropy',
                       optimizer=keras.optimizers.Adam(learning_rate=0.001),
                       metrics=['accuracy'])


In [None]:
# Train the model for violent action detection.
# Fit settings are gathered in one mapping so they can be read and tuned at a glance.
training_options = {
    'batch_size': 5,
    'epochs': 25,
    'verbose': 1,
    'validation_data': (X_val, y_val_encoded),
}
history_violence = model_violence.fit(X_train, y_train_encoded, **training_options)


In [None]:
# Evaluate the model for violent action detection
# evaluate() returns [loss, accuracy] because the model was compiled with metrics=['accuracy']
scores_violence = model_violence.evaluate(X_test, y_test_encoded)
print(f"Violent Action Detection Test Accuracy: {scores_violence[1]*100}")

# Collapse softmax outputs and one-hot targets to integer class indices
y_pred_violence = model_violence.predict(X_test)
y_pred_violence = np.argmax(y_pred_violence, axis=1)
y_test_violence = np.argmax(y_test_encoded, axis=1)

# Report per-class metrics under the class names rather than bare indices.
# Passing `labels` explicitly keeps the rows aligned with `target_names` even
# when a class happens to be absent from the test split.
class_names = [str(name) for name in encoder.categories_[0]]
print(classification_report(
    y_test_violence,
    y_pred_violence,
    labels=np.arange(len(class_names)),
    target_names=class_names,
    zero_division=0,
))


In [None]:
# Model for anomalous crowd behavior detection using Isolation Forests.
# contamination=0.1 sets the expected share of outliers in the training data.
# random_state is fixed (same seed as the train/test split) so the flagged
# samples are reproducible from run to run.
iso_forest = IsolationForest(contamination=0.1, random_state=1)
iso_forest.fit(X_train_flatten)


In [None]:
# Predict on test set (IsolationForest.predict returns +1 for inliers, -1 for outliers)
anomaly_pred = iso_forest.predict(X_test_flatten)

# Evaluate the anomalous crowd behavior detection.
# UCF50 is not annotated for anomalies, so every clip is labelled normal (+1).
# NOTE(review): with this placeholder ground truth the report only shows how
# many test clips the forest flags; it is not a real accuracy measurement.
anomaly_true = np.ones_like(anomaly_pred)
print("Anomalous Crowd Behavior Detection Classification Report:")
# Explicit labels/names keep both rows present and readable; zero_division=0
# silences the undefined-metric warnings caused by the "anomaly" row having no
# true samples.
print(classification_report(
    anomaly_true,
    anomaly_pred,
    labels=[-1, 1],
    target_names=['anomaly', 'normal'],
    zero_division=0,
))



In [None]:
# Visualization of Conv3D model training history:
# accuracy curves on the left panel, loss curves on the right.
fig, (accuracy_ax, loss_ax) = plt.subplots(1, 2, figsize=(12, 4))

# (axes, training key, validation key, training label, validation label, y-axis label)
panels = (
    (accuracy_ax, 'accuracy', 'val_accuracy', 'Training Accuracy', 'Validation Accuracy', 'Accuracy'),
    (loss_ax, 'loss', 'val_loss', 'Training Loss', 'Validation Loss', 'Loss'),
)
for ax, train_key, val_key, train_label, val_label, y_label in panels:
    ax.plot(history_violence.history[train_key], label=train_label)
    ax.plot(history_violence.history[val_key], label=val_label)
    ax.set_xlabel('Epochs')
    ax.set_ylabel(y_label)
    ax.legend()
    ax.set_title('Conv3D Model Training History')

plt.show()

In [None]:

# Live surveillance using webcam
cap = cv2.VideoCapture(0)

# Index -> label lookup in the encoder's category order: the model was trained
# on the encoder's one-hot targets, so its output units follow that order.
class_names = encoder.categories_[0]

# Frames collected for the next prediction window
frames_buffer = []

try:
    if not cap.isOpened():
        print("Error: Could not open webcam")

    while cap.isOpened():
        # Read a frame from the webcam; stop instead of resizing a missing frame
        ret, frame = cap.read()
        if not ret:
            print("Error: Could not read frame from webcam")
            break

        # Resize the frame to match the model input size
        frame = cv2.resize(frame, (img_width, img_height))

        # Display the frame
        cv2.imshow('Live Surveillance', frame)

        # Add the frame to the buffer
        frames_buffer.append(frame)

        # Perform violence detection every seq_len frames
        if len(frames_buffer) == seq_len:
            # Stack to (1, seq_len, img_height, img_width, 3) and scale by 255,
            # the same preprocessing the training data received
            sequence = np.asarray(frames_buffer, dtype=np.float32)
            sequence = sequence.reshape((1, seq_len, img_height, img_width, 3)) / 255.0

            # Predict violence using the Conv3D model (verbose=0 avoids a
            # progress bar per window flooding the cell output)
            violence_result = model_violence.predict(sequence, verbose=0)
            predicted_class = class_names[np.argmax(violence_result)]
            print(f"Violence Detection Result: {predicted_class}")

            # Reset the buffer for the next sequence
            frames_buffer = []

        # Break the loop if 'q' is pressed
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Release the webcam and close the OpenCV window, even after an error
    cap.release()
    cv2.destroyAllWindows()