In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import os
import numpy as np
import cv2
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv3D, MaxPooling3D, Flatten, Dense, Dropout, BatchNormalization, Input
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.preprocessing.image import ImageDataGenerator

In [None]:
# Constants
max_frames = 30  # Max frames per video
frame_size = (64, 64)  # Frame size
num_classes = 2  # Number of classes
frame_sampling_rate = 5

In [None]:
# Function to load video data from directories
def load_videos_from_directory(directory):
    videos = []
    labels = []
    class_mapping = {name: idx for idx, name in enumerate(os.listdir(directory))}  # Map folder names to indices
    for label in class_mapping.keys():
        label_dir = os.path.join(directory, label)
        if os.path.isdir(label_dir):
            for video_file in os.listdir(label_dir):
                if video_file.endswith('.mp4'):
                    video_path = os.path.join(label_dir, video_file)
                    # Load and preprocess video
                    frames = load_and_preprocess_video(video_path, frame_sampling_rate)
                    if frames is not None:
                        videos.append(frames)
                        labels.append(class_mapping[label])  # Use the mapped index
    return np.array(videos), np.array(labels)

# Function to load and preprocess individual videos
def load_and_preprocess_video(video_path, frame_sampling_rate):
    cap = cv2.VideoCapture(video_path)
    frames = []
    count = 0
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        if count % frame_sampling_rate == 0:  # Sample every nth frame
            frame = cv2.resize(frame, frame_size)
            frames.append(frame / 255.0)  # Normalize pixel values
        count += 1

    cap.release()

    # Pad or truncate frames to max_frames
    if len(frames) < max_frames:
        frames += [np.zeros(frame_size + (3,))] * (max_frames - len(frames))  # Pad with zeros
    return np.array(frames)[:max_frames]


In [None]:
# Load the dataset
X_train, y_train = load_videos_from_directory('/content/drive/My Drive/archive/TrimedDataset/Train')
X_test, y_test = load_videos_from_directory('/content/drive/My Drive/archive/TrimedDataset/Test')

In [None]:
# Determine the number of unique classes
num_classes = len(np.unique(y_train))

# Print the number of classes to verify
print("Number of classes:", num_classes)

# One-hot encode the labels
y_train = to_categorical(y_train, num_classes=num_classes)
y_test = to_categorical(y_test, num_classes=num_classes)

# Check shapes
print("X_train shape:", X_train.shape)  # Should be (num_samples, max_frames, 64, 64, 3)
print("y_train shape:", y_train.shape)  # Should be (num_samples, num_classes)



In [None]:
# Define the 3D CNN model
model = Sequential()
model.add(Input(shape=(max_frames, frame_size[0], frame_size[1], 3)))
model.add(Conv3D(32, (3, 3, 3), activation='relu'))
model.add(MaxPooling3D(pool_size=(2, 2, 2)))
model.add(Conv3D(64, (3, 3, 3), activation='relu'))
model.add(MaxPooling3D(pool_size=(2, 2, 2)))
model.add(Conv3D(128, (3, 3, 3), activation='relu'))
model.add(MaxPooling3D(pool_size=(2, 2, 2)))
model.add(Flatten())
model.add(Dense(128, activation='relu'))
model.add(Dropout(0.5))
model.add(Dense(num_classes, activation='softmax'))

In [None]:
# Compile the model
optimizer = Adam(learning_rate=0.0001)
model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])


In [None]:
from tensorflow.keras.utils import Sequence
import numpy as np

class VideoDataGenerator(Sequence):
    def __init__(self, X, y, batch_size=16, shuffle=True):
        self.X = X
        self.y = y
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.indices = np.arange(len(self.X))
        self.on_epoch_end()

    def __len__(self):
        return int(np.floor(len(self.X) / self.batch_size))

    def __getitem__(self, index):
        indices = self.indices[index * self.batch_size:(index + 1) * self.batch_size]
        X_batch = self.X[indices]
        y_batch = self.y[indices]
        return X_batch, y_batch

    def on_epoch_end(self):
        if self.shuffle:
            np.random.shuffle(self.indices)

# Create the data generator
train_generator = VideoDataGenerator(X_train, y_train, batch_size=16)


In [None]:
history = model.fit(
    X_train, y_train,
    epochs=50,
    validation_data=(X_test, y_test)
)

In [None]:
model.save('3d_cnn_model2.keras')

In [None]:
!pip install playsound

In [None]:
import cv2
import numpy as np
from tensorflow.keras.models import load_model
from IPython.display import Audio, display  # Import Audio from IPython

# Load the trained model
model = load_model('3d_cnn_model2.keras')

# Define video processing parameters
frame_size = (64, 64)  # Resize frames to this size
max_frames = 30        # Maximum number of frames to sample
frame_sampling_rate = 1  # Sample every nth frame

def load_and_preprocess_video(video_path):
    """Load and preprocess video for anomaly detection."""
    cap = cv2.VideoCapture(video_path)
    frames = []
    count = 0

    while True:
        ret, frame = cap.read()
        if not ret:
            break
        if count % frame_sampling_rate == 0:
            # Resize and normalize frame
            frame = cv2.resize(frame, frame_size)
            frames.append(frame / 255.0)  # Normalize pixel values
        count += 1

    cap.release()

    # Pad or truncate frames to fit max_frames
    if len(frames) < max_frames:
        frames += [np.zeros(frame_size + (3,))] * (max_frames - len(frames))  # Padding
    else:
        frames = frames[:max_frames]  # Truncating

    return np.array(frames)

def detect_anomaly(video_path):
    """Process the video and detect anomalies."""
    video_frames = load_and_preprocess_video(video_path)
    video_frames = np.expand_dims(video_frames, axis=0)  # Add batch dimension

    # Make prediction
    prediction = model.predict(video_frames)

    # Normalize predictions to range 0-1
    prediction_normalized = prediction / np.sum(prediction, axis=1, keepdims=True)

    print("Normalized Prediction:", prediction_normalized)

    # Check for anomaly (assuming the prediction output is structured correctly)
    if prediction_normalized[0][4] > 0.5:  # Adjust the index based on your model's output
        print("Anomaly detected!")
        # Play the warning sound using IPython display
        display(Audio('/content/drive/My Drive/sound.mp3', autoplay=True))  # Use .mp3 or .wav
    else:
        print("No anomaly detected.")
        print(prediction_normalized[0][4])

# Use the function with a video file
video_path = '/content/drive/My Drive/footage3.mp4'  # Path to the input video file
detect_anomaly(video_path)




In [None]:

import cv2
import numpy as np
from keras.models import load_model
from IPython.display import Audio,display

# Load your trained model
model = load_model('3d_cnn_model2.keras')  # Update with your model path

def load_and_preprocess_video(video_path, frame_sampling_rate=5, max_frames=30):
    # Load and preprocess the video
    cap = cv2.VideoCapture(video_path)
    frames = []
    count = 0

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        if count % frame_sampling_rate == 0:
            frame = cv2.resize(frame, (64, 64))
            frames.append(frame / 255.0)  # Normalize pixel values
            if len(frames) == max_frames:
                break
        count += 1

    cap.release()
    # Pad frames if necessary
    while len(frames) < max_frames:
        frames.append(np.zeros((64, 64, 3)))  # Add zero frames to reach max_frames

    return np.array(frames)

def detect_anomaly(video_path):
    # Load and preprocess the video
    video_data = load_and_preprocess_video(video_path)

    # Expand dimensions to match model input shape
    video_data = np.expand_dims(video_data, axis=0)

    # Make predictions
    predictions = model.predict(video_data)

    # Normalize predictions
    prediction_normalized = predictions / np.sum(predictions)

    # Scale predictions to a range of 0.1 to 1.0
    min_val = np.min(prediction_normalized)
    max_val = np.max(prediction_normalized)

    desired_min = 0.1
    desired_max = 1.0

    scaled_predictions = ((prediction_normalized - min_val) / (max_val - min_val)) * (desired_max - desired_min) + desired_min

    # Print scaled predictions
    print("Scaled Predictions:", scaled_predictions)

    # Check the value at index 4 (assuming you want to check the 5th class)
    if scaled_predictions[0][4] < 0.2:  # Adjust the threshold as needed
        print("Anomaly detected!")
        # Play a warning sound
        display(Audio('/content/drive/My Drive/sound.mp3', autoplay=True)) # Update with your sound file path
        print(scaled_predictions[0][4])
    else:
        print("No anomaly detected.")
        print(scaled_predictions[0][4])

# Use the function with a video file
video_path = '/content/drive/My Drive/footage6.mp4'  # Path to the input video file
detect_anomaly(video_path)

In [None]:
!ls '/content/drive/My Drive/archive/TrimedDataset/Test'