In [4]:
import cv2
import os
import numpy as np

def extract_frames_from_folder(folder_path, output_folder, resize_shape=(224, 224), apply_noise_reduction=True, apply_optical_flow=True):
    for video_filename in os.listdir(folder_path):
        video_path = os.path.join(folder_path, video_filename)
        output_subfolder = os.path.join(output_folder, video_filename.split('.')[0])

        extract_frames(video_path, output_subfolder, resize_shape, apply_noise_reduction, apply_optical_flow)

def extract_frames(video_path, output_folder, resize_shape=(224, 224), apply_noise_reduction=True, apply_optical_flow=True):
    # Open the video file
    cap = cv2.VideoCapture(video_path)

    # Get frames per second (fps) and total number of frames
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    # Create the output folder if it doesn't exist
    os.makedirs(output_folder, exist_ok=True)

    frame_count = 0

    # Initialize previous frame for optical flow
    prev_frame = None

    while True:
        # Read a frame from the video
        ret, frame = cap.read()

        if not ret:
            break

        # Apply noise reduction (Gaussian blur) if specified
        if apply_noise_reduction:
            frame = cv2.GaussianBlur(frame, (5, 5), 0)

        # Resize the frame to the desired shape
        frame = cv2.resize(frame, resize_shape)

        # Convert the frame to uint8 if it's not already
        if frame.dtype != 'uint8':
            frame = (frame * 255).astype('uint8')

        # Apply optical flow if specified
        if apply_optical_flow and prev_frame is not None:
            # Convert frames to grayscale
            prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
            current_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

            # Compute optical flow using Farneback method
            flow = cv2.calcOpticalFlowFarneback(prev_gray, current_gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)


            # Save optical flow as an image file
            flow_filename = os.path.join(output_folder, f"flow_{frame_count:04d}.jpg")
            save_optical_flow_image(flow, flow_filename)

        # Save the frame as an image file
        frame_filename = os.path.join(output_folder, f"frame_{frame_count:04d}.jpg")
        cv2.imwrite(frame_filename, frame)

        frame_count += 1
        prev_frame = frame

    # Release the video capture object
    cap.release()
    ####
def save_optical_flow_image(flow, filename):
    # Calculate polar coordinates
    mag, ang = cv2.cartToPolar(flow[..., 0], flow[..., 1])

    # Convert polar coordinates to HSV color representation
    hsv = np.zeros((flow.shape[0], flow.shape[1], 3), dtype=np.uint8)
    hsv[..., 0] = ang * 180 / np.pi / 2
    hsv[..., 1] = 255
    hsv[..., 2] = cv2.normalize(mag, None, 0, 255, cv2.NORM_MINMAX)

    # Convert HSV to BGR
    flow_bgr = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)

    # Save the optical flow as an image file
    cv2.imwrite(filename, flow_bgr)

#####

# Specify the paths to the "normal" and "anomaly" folders
normal_folder_path = "dataset/normal"
anomaly_folder_path = "dataset/anomaly"

# Specify the output folders for frames and optical flow
output_normal_folder = "normal_output"
output_anomaly_folder = "anomaly_output"

# Resize shape for the frames
resize_shape = (224, 224)

# Extract frames from the "normal" folder with noise reduction and optical flow
extract_frames_from_folder(normal_folder_path, output_normal_folder, resize_shape, apply_noise_reduction=True, apply_optical_flow=True)

# Extract frames from the "anomaly" folder with noise reduction and optical flow
extract_frames_from_folder(anomaly_folder_path, output_anomaly_folder, resize_shape, apply_noise_reduction=True, apply_optical_flow=True)


In [15]:
import cv2
import os
import numpy as np
from tensorflow.keras.applications.resnet50 import ResNet50, preprocess_input
from tensorflow.keras.models import Model
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score, classification_report

# Function to load and preprocess frames using ResNet
def load_and_preprocess_frames(folder_path, model, resize_shape=(224, 224)):
    data = []
    labels = []
    for video_folder in os.listdir(folder_path):
        video_folder_path = os.path.join(folder_path, video_folder)
        for frame_filename in os.listdir(video_folder_path):
            frame_path = os.path.join(video_folder_path, frame_filename)
            image = cv2.imread(frame_path)
            image = cv2.resize(image, resize_shape)
            image = preprocess_input(image)  # Preprocess according to ResNet requirements
            data.append(image)
            labels.append(1 if "normal" in folder_path else 0)
    return np.array(data), np.array(labels)

# Specify the paths to the "normal" and "anomaly" folders
normal_folder_path = "normal_output"
anomaly_folder_path = "anomaly_output"

# Load ResNet50 model pre-trained on ImageNet data
resnet_model = ResNet50(weights='imagenet', include_top=False, input_shape=(224, 224, 3))

# Create a new model by removing the last layers of ResNet
feature_extractor = Model(inputs=resnet_model.input, outputs=resnet_model.layers[-1].output)

# Load and preprocess data for normal and anomaly classes using ResNet
normal_data, normal_labels = load_and_preprocess_frames(normal_folder_path, feature_extractor)
anomaly_data, anomaly_labels = load_and_preprocess_frames(anomaly_folder_path, feature_extractor)

# Concatenate normal and anomaly data
all_data = np.concatenate((normal_data, anomaly_data))
all_labels = np.concatenate((normal_labels, anomaly_labels))

# Split data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(all_data, all_labels, test_size=0.2, random_state=42)

# Flatten the feature vectors for SVM
X_train_flatten = X_train.reshape((X_train.shape[0], -1))
X_test_flatten = X_test.reshape((X_test.shape[0], -1))

# Train a Support Vector Machine (SVM) classifier
svm_classifier = SVC(probability=True)
svm_classifier.fit(X_train_flatten, y_train)

# Make predictions on the test set
predictions = svm_classifier.predict(X_test_flatten)

# Evaluate the model
accuracy = accuracy_score(y_test, predictions)
print("Accuracy:", accuracy)

# Display classification report
print("Classification Report:\n", classification_report(y_test, predictions))


KeyboardInterrupt: 

In [16]:
######
# Modify the load_and_preprocess_frames function
def load_and_preprocess_frames(folder_path, model, resize_shape=(224, 224), batch_size=50):
    data = []
    labels = []
    for video_filename in os.listdir(folder_path):
        video_path = os.path.join(folder_path, video_filename)
        
        # Extract frames from the video
        frames = extract_frames(video_path, resize_shape=resize_shape)
        
        # Preprocess each frame using the ResNet model
        preprocessed_frames = [preprocess_input(frame) for frame in frames]
        
        # Stack frames to create a 4D tensor
        video_data = np.stack(preprocessed_frames, axis=0)
        
        # Append data and label
        data.append(video_data)
        labels.append(1 if "normal" in folder_path else 0)
        
        # Process in batches
        if len(data) == batch_size:
            yield np.concatenate(data), np.array(labels)
            data, labels = [], []

    # Process the remaining frames
    if data:
        yield np.concatenate(data), np.array(labels)

# Now load and preprocess data for normal and anomaly classes using ResNet in batches
batch_size = 50
normal_data_batches = load_and_preprocess_frames(normal_folder_path, feature_extractor, batch_size=batch_size)
anomaly_data_batches = load_and_preprocess_frames(anomaly_folder_path, feature_extractor, batch_size=batch_size)

# Concatenate normal and anomaly data in batches
all_data_batches = []
all_labels_batches = []

for normal_batch, anomaly_batch in zip(normal_data_batches, anomaly_data_batches):
    batch_data, batch_labels = np.concatenate((normal_batch[0], anomaly_batch[0])), np.concatenate((normal_batch[1], anomaly_batch[1]))
    all_data_batches.append(batch_data)
    all_labels_batches.append(batch_labels)

# Concatenate all batches
all_data = np.concatenate(all_data_batches)
all_labels = np.concatenate(all_labels_batches)

# Split data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(all_data, all_labels, test_size=0.2, random_state=42)

######
# Flatten the feature vectors for SVM
X_train_flatten = X_train.reshape((X_train.shape[0], -1))
X_test_flatten = X_test.reshape((X_test.shape[0], -1))

# Train a Support Vector Machine (SVM) classifier
svm_classifier = SVC(probability=True)
svm_classifier.fit(X_train_flatten, y_train)

# Make predictions on the test set
predictions = svm_classifier.predict(X_test_flatten)

# Evaluate the model
accuracy = accuracy_score(y_test, predictions)
print("Accuracy:", accuracy)

# Display classification report
print("Classification Report:\n", classification_report(y_test, predictions))


MemoryError: Unable to allocate 25.3 GiB for an array with shape (45059, 224, 224, 3) and data type float32