In [1]:
import os
import cv2
import numpy as np
from tensorflow.keras.preprocessing.image import img_to_array, load_img
from tensorflow.keras.applications import VGG16
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.layers import GlobalAveragePooling2D
from tensorflow.keras.preprocessing.sequence import pad_sequences
import shutil
from tensorflow.keras.models import load_model
from tensorflow.keras.layers import Layer
import tensorflow as tf
from tensorflow.keras.utils import register_keras_serializable
from tensorflow.keras import layers, models



In [4]:
import tensorflow as tf
from tensorflow.keras import layers, models

# Define a custom transformer encoder layer
class TransformerEncoder1(layers.Layer):
    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1, **kwargs):
        super(TransformerEncoder1, self).__init__(**kwargs)
        self.att = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim, output_shape=embed_dim)
        self.ffn = models.Sequential(
            [
                layers.Dense(ff_dim, activation="relu"),
                layers.Dense(embed_dim)  # FFN output matches embed_dim
            ]
        )
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)

    def call(self, inputs, training=None):
        # Multi-head self-attention layer
        attn_output = self.att(inputs, inputs)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)  # Residual connection + normalization

        # Feed-forward network
        ffn_output = self.ffn(out1)  # FFN output dimension matches embed_dim
        ffn_output = self.dropout2(ffn_output, training=training)

        # Residual connection + normalization
        return self.layernorm2(out1 + ffn_output)

# Define the Transformer-based model
def create_transformer_model(input_shape, embed_dim=512, num_heads=8, ff_dim=512):
    inputs = layers.Input(shape=input_shape)

    # Transformer Encoder block
    x = TransformerEncoder1(embed_dim, num_heads, ff_dim)(inputs)

    # Global pooling to reduce sequence dimension
    x = layers.GlobalAveragePooling1D()(x)

    # Dense layers for classification
    x = layers.Dense(128, activation='relu')(x)
    x = layers.Dropout(0.5)(x)  # Dropout to prevent overfitting
    outputs = layers.Dense(13, activation='softmax')(x)  # 13 output units for multi-class classification

    model = models.Model(inputs=inputs, outputs=outputs)

    # Compile the model
    optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)

    model.compile(optimizer=optimizer,
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    return model


# Create the transformer model
input_shape = (500, 512)  # e.g., (n_frames, feature_dim)
transformer_model1 = create_transformer_model(input_shape)

# Model summary
transformer_model1.summary()


In [5]:
from tensorflow.keras.models import load_model


saved_model_path1 = '/Users/prabeshsharma/Documents/Unsual_activity_Detection/Saved_Model/transformer_video_classifierAbnormal.h5'
model1 = load_model(saved_model_path1, custom_objects={'TransformerEncoder': TransformerEncoder1})

# Check model summary after loading
model1.summary()



In [9]:

weights_path = '/Users/prabeshsharma/Documents/Unsual_activity_Detection/Saved_Model/vgg16_weights_tf_dim_ordering_tf_kernels_notop.h5'

# Initialize the VGG16 model with the downloaded weights
base_model = VGG16(weights=weights_path, include_top=False, input_shape=(224, 224, 3))
feature_extractor = Model(inputs=base_model.input, outputs=GlobalAveragePooling2D()(base_model.output))

# Define the list of class names
class_names = [
    "Abuse", "Arrest", "Arson","Assault","Burglary","Explosion","Fighting",
    "RoadAccident","Robbery","Shooting","Shoplifting","Stealing","Vandalism"]



# Extract frames from video
def extract_frames(video_path, output_folder, frame_rate=10):
    video_cap = cv2.VideoCapture(video_path)
    success, image = video_cap.read()
    count = 0
    frame_count = 0

    while success:
        if count % frame_rate == 0:
            image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
            frame_filename = os.path.join(output_folder, f"frame_{frame_count}.png")
            cv2.imwrite(frame_filename, image)  # Save frame as PNG file
            frame_count += 1
        success, image = video_cap.read()
        count += 1

    video_cap.release()

# Extract features from frames
def extract_features_from_frames(frames_folder, model):
    features_list = []
    for frame_file in sorted(os.listdir(frames_folder)):
        img_path = os.path.join(frames_folder, frame_file)
        
        # Read image using OpenCV
        img = cv2.imread(img_path)
        img = cv2.resize(img, (224, 224))  # Resize to target size
        img = img.astype("float32") / 255.0  # Normalize pixel values
        
        # Expand dimensions to match the input shape expected by the model
        img_array = np.expand_dims(img, axis=0)
        
        # Extract features using the model
        features = model.predict(img_array, verbose=0)
        features_list.append(features.flatten())
    
    return np.array(features_list)

# Classify video
def classify_video(features, model):
    max_length = 500  # Assuming 500 was the sequence length used during training
    padded_features = pad_sequences([features], maxlen=max_length, padding='post')
    
    prediction = model.predict(padded_features)
    predicted_class_index = np.argmax(prediction, axis=1)[0]
    
    predicted_class_name = class_names[predicted_class_index]
    
    return predicted_class_name

# Cleanup frames
def cleanup_frames(frames_folder):
    shutil.rmtree(frames_folder)

# Example usage:
video_path = '/Users/prabeshsharma/Documents/Unsual_activity_Detection/2024-10-17 09.30.48.mp4'
frames_output_folder = 'ExtractedFrame'
os.makedirs(frames_output_folder, exist_ok=True)

# Step 1: Extract frames
extract_frames(video_path, frames_output_folder)

# Step 2: Extract features
features = extract_features_from_frames(frames_output_folder, feature_extractor)

# Step 3: Load the trained model
# Load the model with the custom layer registered
# model = load_model('/Users/prabeshsharma/Documents/Unsual_activity_Detection/Saved_Model/kerasTransformer.h5')
# Step 4: Classify the video
predicted_class = classify_video(features, model1)
print(f"The predicted class for the video is: {predicted_class}")

# Step 5: Cleanup frames
cleanup_frames(frames_output_folder)


[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 70ms/step
The predicted class for the video is: RoadAccident
