In [15]:
import os
import cv2
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv3D, MaxPooling3D, Flatten, Dense, Dropout
from sklearn.model_selection import train_test_split
from tqdm import tqdm

# Set random seed for reproducibility
np.random.seed(42)
tf.random.set_seed(42)

# Configuration
IMG_SIZE = (64, 64)  # Resize frames to 64x64
FRAME_COUNT = 16  # Number of frames per video clip
BATCH_SIZE = 8
EPOCHS = 20

# Function to preprocess a single video
def preprocess_video(video_path, img_size=IMG_SIZE, frame_count=FRAME_COUNT):
    cap = cv2.VideoCapture(video_path)
    frames = []
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    
    # Calculate frame step to uniformly sample frame_count frames
    frame_step = max(1, total_frames // frame_count)
    
    for i in range(0, total_frames, frame_step):
        cap.set(cv2.CAP_PROP_POS_FRAMES, i)
        ret, frame = cap.read()
        if not ret:
            break
        # Resize and normalize frame
        frame = cv2.resize(frame, img_size)
        frame = frame / 255.0  # Normalize to [0,1]
        frames.append(frame)
        if len(frames) == frame_count:
            break
    
    cap.release()
    
    # If fewer frames, pad with zeros
    while len(frames) < frame_count:
        frames.append(np.zeros((img_size[0], img_size[1], 3)))
    
    return np.array(frames[:frame_count])

# Load dataset
def load_dataset(data_dir, classes=['Violence', 'NonViolence']):
    X, y = [], []
    for label, class_name in enumerate(classes):
        class_dir = os.path.join(data_dir, class_name)
        for video_file in tqdm(os.listdir(class_dir), desc=f"Loading {class_name}"):
            if video_file.endswith('.mp4'):
                video_path = os.path.join(class_dir, video_file)
                frames = preprocess_video(video_path)
                X.append(frames)
                y.append(label)
    return np.array(X), np.array(y)

# Define 3D CNN model
def create_model(input_shape=(FRAME_COUNT, IMG_SIZE[0], IMG_SIZE[1], 3)):
    model = Sequential([
        Conv3D(32, (3, 3, 3), activation='relu', padding='same', input_shape=input_shape),
        MaxPooling3D((2, 2, 2)),
        Conv3D(64, (3, 3, 3), activation='relu', padding='same'),
        MaxPooling3D((2, 2, 2)),
        Conv3D(128, (3, 3, 3), activation='relu', padding='same'),
        MaxPooling3D((2, 2, 2)),
        Flatten(),
        Dense(256, activation='relu'),
        Dropout(0.5),
        Dense(1, activation='sigmoid')  # Binary classification
    ])
    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
    return model

# Main function
def main():
    # Set dataset path
    data_dir = r'C:\Users\Namo\fight recognization\extract\Real Life Violence Dataset'
    
    # Load and preprocess data
    print("Loading dataset...")
    X, y = load_dataset(data_dir)
    
    # Split into train and test sets
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    
    # Create and train model
    model = create_model()
    print("Training model...")
    model.fit(
        X_train, y_train,
        validation_data=(X_test, y_test),
        batch_size=BATCH_SIZE,
        epochs=EPOCHS,
        verbose=1
    )
    
    # Evaluate model
    print("Evaluating model...")
    loss, accuracy = model.evaluate(X_test, y_test)
    print(f"Test Accuracy: {accuracy:.4f}")
    
    # Save model
    model.save('violence_detection_model.h5')
    print("Model saved as 'violence_detection_model.h5'")

if __name__ == '__main__':
    main()

Loading dataset...


Loading Violence: 100%|████████████████████████████████████████████████████████████| 1000/1000 [04:41<00:00,  3.55it/s]
Loading NonViolence: 100%|█████████████████████████████████████████████████████████| 1000/1000 [02:32<00:00,  6.55it/s]


Training model...
Epoch 1/20
[1m195/195[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m53s[0m 254ms/step - accuracy: 0.5353 - loss: 0.6960 - val_accuracy: 0.7801 - val_loss: 0.4619
Epoch 2/20
[1m195/195[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m49s[0m 251ms/step - accuracy: 0.7652 - loss: 0.4853 - val_accuracy: 0.8491 - val_loss: 0.3669
Epoch 3/20
[1m195/195[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m55s[0m 280ms/step - accuracy: 0.8175 - loss: 0.3844 - val_accuracy: 0.8645 - val_loss: 0.3070
Epoch 4/20
[1m195/195[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m56s[0m 287ms/step - accuracy: 0.8441 - loss: 0.3330 - val_accuracy: 0.8798 - val_loss: 0.2781
Epoch 5/20
[1m195/195[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m58s[0m 299ms/step - accuracy: 0.8754 - loss: 0.3020 - val_accuracy: 0.8849 - val_loss: 0.2758
Epoch 6/20
[1m195/195[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m58s[0m 296ms/step - accuracy: 0.8876 - loss: 0.2602 - val_accuracy: 0.9028 - val_loss:



Test Accuracy: 0.8977
Model saved as 'violence_detection_model.h5'


In [1]:
import os
import cv2
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv3D, MaxPooling3D, Flatten, Dense, Dropout, BatchNormalization
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score, roc_curve
from sklearn.utils.class_weight import compute_class_weight
from tqdm import tqdm
import matplotlib.pyplot as plt
import seaborn as sns
import random

# Set random seed for reproducibility
np.random.seed(42)
tf.random.set_seed(42)
random.seed(42)

# Configuration
IMG_SIZE = (64, 64)
FRAME_COUNT = 16
BATCH_SIZE = 8
EPOCHS = 50
LEARNING_RATE = 0.001

class VideoAugmentation:
    """Class for video data augmentation techniques"""
    
    @staticmethod
    def random_horizontal_flip(frames, prob=0.5):
        """Randomly flip video horizontally"""
        if random.random() < prob:
            return np.flip(frames, axis=2)  # Flip along width axis
        return frames
    
    @staticmethod
    def random_brightness_adjustment(frames, prob=0.5, factor_range=(0.7, 1.3)):
        """Randomly adjust brightness"""
        if random.random() < prob:
            factor = random.uniform(*factor_range)
            frames = frames * factor
            frames = np.clip(frames, 0, 1)
        return frames
    
    @staticmethod
    def random_contrast_adjustment(frames, prob=0.5, factor_range=(0.8, 1.2)):
        """Randomly adjust contrast"""
        if random.random() < prob:
            factor = random.uniform(*factor_range)
            mean = np.mean(frames)
            frames = (frames - mean) * factor + mean
            frames = np.clip(frames, 0, 1)
        return frames
    
    @staticmethod
    def random_rotation(frames, prob=0.3, angle_range=(-10, 10)):
        """Randomly rotate frames"""
        if random.random() < prob:
            angle = random.uniform(*angle_range)
            h, w = frames.shape[1:3]
            center = (w // 2, h // 2)
            rotation_matrix = cv2.getRotationMatrix2D(center, angle, 1.0)
            
            rotated_frames = []
            for frame in frames:
                rotated_frame = cv2.warpAffine(frame, rotation_matrix, (w, h))
                rotated_frames.append(rotated_frame)
            return np.array(rotated_frames)
        return frames
    
    @staticmethod
    def random_temporal_crop(frames, prob=0.3):
        """Randomly crop temporal sequence"""
        if random.random() < prob and len(frames) > FRAME_COUNT // 2:
            start_idx = random.randint(0, len(frames) - FRAME_COUNT // 2)
            end_idx = start_idx + FRAME_COUNT // 2
            cropped_frames = frames[start_idx:end_idx]
            # Duplicate frames to maintain sequence length
            while len(cropped_frames) < FRAME_COUNT:
                cropped_frames = np.concatenate([cropped_frames, cropped_frames])
            return cropped_frames[:FRAME_COUNT]
        return frames
    
    @staticmethod
    def random_frame_dropout(frames, prob=0.2, dropout_rate=0.1):
        """Randomly drop frames and duplicate others"""
        if random.random() < prob:
            n_frames = len(frames)
            n_drop = int(n_frames * dropout_rate)
            drop_indices = random.sample(range(n_frames), n_drop)
            
            # Replace dropped frames with adjacent frames
            for idx in drop_indices:
                if idx > 0:
                    frames[idx] = frames[idx - 1]
                elif idx < n_frames - 1:
                    frames[idx] = frames[idx + 1]
        return frames
    
    @staticmethod
    def add_gaussian_noise(frames, prob=0.3, noise_factor=0.02):
        """Add gaussian noise to frames"""
        if random.random() < prob:
            noise = np.random.normal(0, noise_factor, frames.shape)
            frames = frames + noise
            frames = np.clip(frames, 0, 1)
        return frames

def preprocess_video(video_path, img_size=IMG_SIZE, frame_count=FRAME_COUNT, augment=False):
    """Enhanced video preprocessing with augmentation support"""
    if not os.path.exists(video_path):
        print(f"Warning: Video not found: {video_path}")
        return None
    
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Warning: Could not open video: {video_path}")
        return None
    
    frames = []
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    
    if total_frames == 0:
        cap.release()
        return None
    
    # Calculate frame step to uniformly sample frame_count frames
    frame_step = max(1, total_frames // frame_count)
    
    for i in range(0, total_frames, frame_step):
        cap.set(cv2.CAP_PROP_POS_FRAMES, i)
        ret, frame = cap.read()
        if not ret:
            break
        # Resize and normalize frame
        frame = cv2.resize(frame, img_size)
        frame = frame / 255.0  # Normalize to [0,1]
        frames.append(frame)
        if len(frames) == frame_count:
            break
    
    cap.release()
    
    # If fewer frames, pad with zeros or duplicate last frame
    while len(frames) < frame_count:
        if len(frames) > 0:
            frames.append(frames[-1])  # Duplicate last frame
        else:
            frames.append(np.zeros((img_size[0], img_size[1], 3)))
    
    frames = np.array(frames[:frame_count])
    
    # Apply augmentations if training
    if augment:
        augmenter = VideoAugmentation()
        frames = augmenter.random_horizontal_flip(frames)
        frames = augmenter.random_brightness_adjustment(frames)
        frames = augmenter.random_contrast_adjustment(frames)
        frames = augmenter.random_rotation(frames)
        frames = augmenter.random_temporal_crop(frames)
        frames = augmenter.random_frame_dropout(frames)
        frames = augmenter.add_gaussian_noise(frames)
    
    return frames

def load_dataset(data_dir, classes=['Violence', 'NonViolence'], augment_training=True):
    """Load dataset with optional augmentation"""
    X, y = [], []
    
    for label, class_name in enumerate(classes):
        class_dir = os.path.join(data_dir, class_name)
        if not os.path.exists(class_dir):
            print(f"Warning: Directory not found: {class_dir}")
            continue
            
        video_files = [f for f in os.listdir(class_dir) if f.endswith(('.mp4', '.avi', '.mov'))]
        
        for video_file in tqdm(video_files, desc=f"Loading {class_name}"):
            video_path = os.path.join(class_dir, video_file)
            frames = preprocess_video(video_path, augment=False)  # Base version
            
            if frames is not None:
                X.append(frames)
                y.append(label)
                
                # Add augmented versions for training data (especially for minority class)
                if augment_training and (label == 0 or len(video_files) < 100):  # Violence class or small dataset
                    for _ in range(2):  # Create 2 augmented versions
                        aug_frames = preprocess_video(video_path, augment=True)
                        if aug_frames is not None:
                            X.append(aug_frames)
                            y.append(label)
    
    return np.array(X), np.array(y)

def create_model(input_shape=(FRAME_COUNT, IMG_SIZE[0], IMG_SIZE[1], 3)):
    """Enhanced 3D CNN model with batch normalization"""
    model = Sequential([
        Conv3D(32, (3, 3, 3), activation='relu', padding='same', input_shape=input_shape),
        BatchNormalization(),
        MaxPooling3D((2, 2, 2)),
        
        Conv3D(64, (3, 3, 3), activation='relu', padding='same'),
        BatchNormalization(),
        MaxPooling3D((2, 2, 2)),
        
        Conv3D(128, (3, 3, 3), activation='relu', padding='same'),
        BatchNormalization(),
        MaxPooling3D((2, 2, 2)),
        
        Flatten(),
        Dense(256, activation='relu'),
        BatchNormalization(),
        Dropout(0.5),
        
        Dense(128, activation='relu'),
        Dropout(0.3),
        
        Dense(1, activation='sigmoid')  # Binary classification
    ])
    
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE),
        loss='binary_crossentropy', 
        metrics=['accuracy', 'precision', 'recall']
    )
    return model

def plot_training_history(history):
    """Plot training history"""
    fig, axes = plt.subplots(2, 2, figsize=(15, 10))
    
    # Accuracy
    axes[0, 0].plot(history.history['accuracy'], label='Training Accuracy')
    axes[0, 0].plot(history.history['val_accuracy'], label='Validation Accuracy')
    axes[0, 0].set_title('Model Accuracy')
    axes[0, 0].set_xlabel('Epoch')
    axes[0, 0].set_ylabel('Accuracy')
    axes[0, 0].legend()
    
    # Loss
    axes[0, 1].plot(history.history['loss'], label='Training Loss')
    axes[0, 1].plot(history.history['val_loss'], label='Validation Loss')
    axes[0, 1].set_title('Model Loss')
    axes[0, 1].set_xlabel('Epoch')
    axes[0, 1].set_ylabel('Loss')
    axes[0, 1].legend()
    
    # Precision
    axes[1, 0].plot(history.history['precision'], label='Training Precision')
    axes[1, 0].plot(history.history['val_precision'], label='Validation Precision')
    axes[1, 0].set_title('Model Precision')
    axes[1, 0].set_xlabel('Epoch')
    axes[1, 0].set_ylabel('Precision')
    axes[1, 0].legend()
    
    # Recall
    axes[1, 1].plot(history.history['recall'], label='Training Recall')
    axes[1, 1].plot(history.history['val_recall'], label='Validation Recall')
    axes[1, 1].set_title('Model Recall')
    axes[1, 1].set_xlabel('Epoch')
    axes[1, 1].set_ylabel('Recall')
    axes[1, 1].legend()
    
    plt.tight_layout()
    plt.savefig('training_history.png', dpi=300, bbox_inches='tight')
    plt.show()

def plot_confusion_matrix(y_true, y_pred, classes=['NonViolence', 'Violence']):
    """Plot confusion matrix"""
    cm = confusion_matrix(y_true, y_pred)
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=classes, yticklabels=classes)
    plt.title('Confusion Matrix')
    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')
    plt.savefig('confusion_matrix.png', dpi=300, bbox_inches='tight')
    plt.show()

def plot_roc_curve(y_true, y_prob):
    """Plot ROC curve"""
    fpr, tpr, _ = roc_curve(y_true, y_prob)
    auc_score = roc_auc_score(y_true, y_prob)
    
    plt.figure(figsize=(8, 6))
    plt.plot(fpr, tpr, label=f'ROC Curve (AUC = {auc_score:.3f})')
    plt.plot([0, 1], [0, 1], 'k--', label='Random Classifier')
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('ROC Curve')
    plt.legend()
    plt.grid(True)
    plt.savefig('roc_curve.png', dpi=300, bbox_inches='tight')
    plt.show()

def evaluate_model_comprehensive(model, X_test, y_test, classes=['NonViolence', 'Violence']):
    """Comprehensive model evaluation"""
    print("=" * 50)
    print("COMPREHENSIVE MODEL EVALUATION")
    print("=" * 50)
    
    # Get predictions
    y_prob = model.predict(X_test).flatten()
    y_pred = (y_prob > 0.5).astype(int)
    
    # Basic metrics
    loss, accuracy, precision, recall = model.evaluate(X_test, y_test, verbose=0)
    f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
    auc_score = roc_auc_score(y_test, y_prob)
    
    print(f"Test Accuracy:  {accuracy:.4f}")
    print(f"Test Precision: {precision:.4f}")
    print(f"Test Recall:    {recall:.4f}")
    print(f"Test F1-Score:  {f1_score:.4f}")
    print(f"Test AUC Score: {auc_score:.4f}")
    print(f"Test Loss:      {loss:.4f}")
    
    # Classification report
    print("\nDETAILED CLASSIFICATION REPORT:")
    print("-" * 50)
    print(classification_report(y_test, y_pred, target_names=classes, digits=4))
    
    # Confusion matrix
    print("\nCONFUSION MATRIX:")
    print("-" * 20)
    cm = confusion_matrix(y_test, y_pred)
    print(f"True Negatives:  {cm[0,0]}")
    print(f"False Positives: {cm[0,1]}")
    print(f"False Negatives: {cm[1,0]}")
    print(f"True Positives:  {cm[1,1]}")
    
    # Plot visualizations
    plot_confusion_matrix(y_test, y_pred, classes)
    plot_roc_curve(y_test, y_prob)
    
    return {
        'accuracy': accuracy,
        'precision': precision,
        'recall': recall,
        'f1_score': f1_score,
        'auc_score': auc_score,
        'loss': loss
    }

def main():
    """Main training function"""
    # Set dataset path
    data_dir = r'C:\Users\Namo\fight recognization\extract\Real Life Violence Dataset'
    
    print("Loading dataset with augmentation...")
    X, y = load_dataset(data_dir, augment_training=True)
    
    if len(X) == 0:
        print("No data loaded. Please check your dataset path.")
        return
    
    print(f"Total samples: {len(X)}")
    print(f"Violence samples: {np.sum(y)}")
    print(f"Non-violence samples: {len(y) - np.sum(y)}")
    
    # Split into train and test sets
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )
    
    print(f"Training samples: {len(X_train)}")
    print(f"Testing samples: {len(X_test)}")
    
    # Calculate class weights for imbalanced dataset
    class_weights = compute_class_weight(
        'balanced', 
        classes=np.unique(y_train), 
        y=y_train
    )
    class_weight_dict = {i: class_weights[i] for i in range(len(class_weights))}
    print(f"Class weights: {class_weight_dict}")
    
    # Create model
    model = create_model()
    model.summary()
    
    # Define callbacks
    callbacks = [
        EarlyStopping(
            monitor='val_loss',
            patience=10,
            restore_best_weights=True,
            verbose=1
        ),
        ReduceLROnPlateau(
            monitor='val_loss',
            factor=0.5,
            patience=5,
            min_lr=1e-7,
            verbose=1
        ),
        ModelCheckpoint(
            'best_violence_detection_model.keras',
            monitor='val_accuracy',
            save_best_only=True,
            verbose=1
        )
    ]
    
    # Train model
    print("Training model...")
    history = model.fit(
        X_train, y_train,
        validation_data=(X_test, y_test),
        batch_size=BATCH_SIZE,
        epochs=EPOCHS,
        class_weight=class_weight_dict,
        callbacks=callbacks,
        verbose=1
    )
    
    # Plot training history
    plot_training_history(history)
    
    # Load best model and evaluate
    model.load_weights('best_violence_detection_model.keras')
    metrics = evaluate_model_comprehensive(model, X_test, y_test)
    
    # Save final model
    model.save('violence_detection_model_final.keras')
    print("\nModel saved as 'violence_detection_model_final.keras'")
    
    # Save metrics to file
    with open('model_metrics.txt', 'w') as f:
        f.write("Violence Detection Model Metrics\n")
        f.write("=" * 40 + "\n")
        for metric, value in metrics.items():
            f.write(f"{metric.capitalize()}: {value:.4f}\n")
    
    print("Training complete! Check the generated plots and metrics file.")

if __name__ == '__main__':
    main()

Loading dataset with augmentation...


Loading Violence:   1%|▊                                                             | 13/1000 [00:19<24:32,  1.49s/it]


KeyboardInterrupt: 

In [3]:
import tensorflow as tf
print(tf.__version__)

2.18.1


In [None]:
import os
import cv2
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv3D, MaxPooling3D, Flatten, Dense, Dropout, BatchNormalization
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score, roc_curve
from sklearn.utils.class_weight import compute_class_weight
from tqdm import tqdm
import matplotlib.pyplot as plt
import seaborn as sns
import random

# Set random seed for reproducibility
np.random.seed(42)
tf.random.set_seed(42)
random.seed(42)

# Configuration
IMG_SIZE = (64, 64)
FRAME_COUNT = 16
BATCH_SIZE = 8
EPOCHS = 50
LEARNING_RATE = 0.001

class VideoAugmentation:
    """Class for video data augmentation techniques"""
    
    @staticmethod
    def random_horizontal_flip(frames, prob=0.5):
        if random.random() < prob:
            return np.flip(frames, axis=2)
        return frames
    
    @staticmethod
    def random_brightness_adjustment(frames, prob=0.5, factor_range=(0.7, 1.3)):
        if random.random() < prob:
            factor = random.uniform(*factor_range)
            frames = frames * factor
            frames = np.clip(frames, 0, 1)
        return frames
    
    @staticmethod
    def random_contrast_adjustment(frames, prob=0.5, factor_range=(0.8, 1.2)):
        if random.random() < prob:
            factor = random.uniform(*factor_range)
            mean = np.mean(frames)
            frames = (frames - mean) * factor + mean
            frames = np.clip(frames, 0, 1)
        return frames
    
    @staticmethod
    def random_rotation(frames, prob=0.3, angle_range=(-10, 10)):
        if random.random() < prob:
            angle = random.uniform(*angle_range)
            h, w = frames.shape[1:3]
            center = (w // 2, h // 2)
            rotation_matrix = cv2.getRotationMatrix2D(center, angle, 1.0)
            rotated_frames = []
            for frame in frames:
                rotated_frame = cv2.warpAffine(frame, rotation_matrix, (w, h))
                rotated_frames.append(rotated_frame)
            return np.array(rotated_frames)
        return frames
    
    @staticmethod
    def random_temporal_crop(frames, prob=0.3):
        if random.random() < prob and len(frames) > FRAME_COUNT // 2:
            start_idx = random.randint(0, len(frames) - FRAME_COUNT // 2)
            end_idx = start_idx + FRAME_COUNT // 2
            cropped_frames = frames[start_idx:end_idx]
            while len(cropped_frames) < FRAME_COUNT:
                cropped_frames = np.concatenate([cropped_frames, cropped_frames])
            return cropped_frames[:FRAME_COUNT]
        return frames
    
    @staticmethod
    def random_frame_dropout(frames, prob=0.2, dropout_rate=0.1):
        if random.random() < prob:
            n_frames = len(frames)
            n_drop = int(n_frames * dropout_rate)
            drop_indices = random.sample(range(n_frames), n_drop)
            for idx in drop_indices:
                if idx > 0:
                    frames[idx] = frames[idx - 1]
                elif idx < n_frames - 1:
                    frames[idx] = frames[idx + 1]
        return frames
    
    @staticmethod
    def add_gaussian_noise(frames, prob=0.3, noise_factor=0.02):
        if random.random() < prob:
            noise = np.random.normal(0, noise_factor, frames.shape)
            frames = frames + noise
            frames = np.clip(frames, 0, 1)
        return frames

def preprocess_video(video_path, img_size=IMG_SIZE, frame_count=FRAME_COUNT, augment=False):
    if not os.path.exists(video_path):
        print(f"Warning: Video not found: {video_path}")
        return None
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Warning: Could not open video: {video_path}")
        return None
    frames = []
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    if total_frames == 0:
        cap.release()
        return None
    frame_step = max(1, total_frames // frame_count)
    for i in range(0, total_frames, frame_step):
        cap.set(cv2.CAP_PROP_POS_FRAMES, i)
        ret, frame = cap.read()
        if not ret:
            break
        frame = cv2.resize(frame, img_size)
        frame = frame / 255.0
        frames.append(frame)
        if len(frames) == frame_count:
            break
    cap.release()
    while len(frames) < frame_count:
        if len(frames) > 0:
            frames.append(frames[-1])
        else:
            frames.append(np.zeros((img_size[0], img_size[1], 3)))
    frames = np.array(frames[:frame_count])
    if augment:
        augmenter = VideoAugmentation()
        frames = augmenter.random_horizontal_flip(frames)
        frames = augmenter.random_brightness_adjustment(frames)
        frames = augmenter.random_contrast_adjustment(frames)
        frames = augmenter.random_rotation(frames)
        frames = augmenter.random_temporal_crop(frames)
        frames = augmenter.random_frame_dropout(frames)
        frames = augmenter.add_gaussian_noise(frames)
    return frames

def load_dataset(data_dir, classes=['Violence', 'NonViolence'], augment_training=True):
    X, y = [], []
    for label, class_name in enumerate(classes):
        class_dir = os.path.join(data_dir, class_name)
        if not os.path.exists(class_dir):
            print(f"Warning: Directory not found: {class_dir}")
            continue
        video_files = [f for f in os.listdir(class_dir) if f.endswith(('.mp4', '.avi', '.mov'))]
        for video_file in tqdm(video_files, desc=f"Loading {class_name}"):
            video_path = os.path.join(class_dir, video_file)
            frames = preprocess_video(video_path, augment=False)
            if frames is not None:
                X.append(frames)
                y.append(label)
                if augment_training and (label == 0 or len(video_files) < 100):
                    for _ in range(2):
                        aug_frames = preprocess_video(video_path, augment=True)
                        if aug_frames is not None:
                            X.append(aug_frames)
                            y.append(label)
    return np.array(X), np.array(y)

def create_model(input_shape=(FRAME_COUNT, IMG_SIZE[0], IMG_SIZE[1], 3)):
    model = Sequential([
        Conv3D(32, (3, 3, 3), activation='relu', padding='same', input_shape=input_shape),
        BatchNormalization(),
        MaxPooling3D((2, 2, 2)),
        Conv3D(64, (3, 3, 3), activation='relu', padding='same'),
        BatchNormalization(),
        MaxPooling3D((2, 2, 2)),
        Conv3D(128, (3, 3, 3), activation='relu', padding='same'),
        BatchNormalization(),
        MaxPooling3D((2, 2, 2)),
        Flatten(),
        Dense(256, activation='relu'),
        BatchNormalization(),
        Dropout(0.5),
        Dense(128, activation='relu'),
        Dropout(0.3),
        Dense(1, activation='sigmoid')
    ])
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE),
        loss='binary_crossentropy',
        metrics=['accuracy', 'precision', 'recall']
    )
    return model

def convert_to_tflite(model, output_path='violence_detection_model.tflite', quantization='float16'):
    """Convert Keras model to TFLite with optional quantization"""
    converter = tf.lite.TFLiteConverter.from_keras_model(model)
    converter.optimizations = [tf.lite.Optimize.DEFAULT]
    
    if quantization == 'float16':
        converter.target_spec.supported_types = [tf.float16]
    elif quantization == 'int8':
        converter.inference_type = tf.int8
        converter.inference_input_type = tf.int8
        converter.inference_output_type = tf.int8
        # Representative dataset for quantization (optional, improves accuracy)
        def representative_dataset():
            for _ in range(100):
                yield [np.random.rand(1, FRAME_COUNT, IMG_SIZE[0], IMG_SIZE[1], 3).astype(np.float32)]
        converter.representative_dataset = representative_dataset
    
    tflite_model = converter.convert()
    
    with open(output_path, 'wb') as f:
        f.write(tflite_model)
    print(f"TFLite model saved as '{output_path}'")

def plot_training_history(history):
    fig, axes = plt.subplots(2, 2, figsize=(15, 10))
    axes[0, 0].plot(history.history['accuracy'], label='Training Accuracy')
    axes[0, 0].plot(history.history['val_accuracy'], label='Validation Accuracy')
    axes[0, 0].set_title('Model Accuracy')
    axes[0, 0].set_xlabel('Epoch')
    axes[0, 0].set_ylabel('Accuracy')
    axes[0, 0].legend()
    axes[0, 1].plot(history.history['loss'], label='Training Loss')
    axes[0, 1].plot(history.history['val_loss'], label='Validation Loss')
    axes[0, 1].set_title('Model Loss')
    axes[0, 1].set_xlabel('Epoch')
    axes[0, 1].set_ylabel('Loss')
    axes[0, 1].legend()
    axes[1, 0].plot(history.history['precision'], label='Training Precision')
    axes[1, 0].plot(history.history['val_precision'], label='Validation Precision')
    axes[1, 0].set_title('Model Precision')
    axes[1, 0].set_xlabel('Epoch')
    axes[1, 0].set_ylabel('Precision')
    axes[1, 0].legend()
    axes[1, 1].plot(history.history['recall'], label='Training Recall')
    axes[1, 1].plot(history.history['val_recall'], label='Validation Recall')
    axes[1, 1].set_title('Model Recall')
    axes[1, 1].set_xlabel('Epoch')
    axes[1, 1].set_ylabel('Recall')
    axes[1, 1].legend()
    plt.tight_layout()
    plt.savefig('training_history.png', dpi=300, bbox_inches='tight')
    plt.show()

def plot_confusion_matrix(y_true, y_pred, classes=['NonViolence', 'Violence']):
    cm = confusion_matrix(y_true, y_pred)
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=classes, yticklabels=classes)
    plt.title('Confusion Matrix')
    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')
    plt.savefig('confusion_matrix.png', dpi=300, bbox_inches='tight')
    plt.show()

def plot_roc_curve(y_true, y_prob):
    fpr, tpr, _ = roc_curve(y_true, y_prob)
    auc_score = roc_auc_score(y_true, y_prob)
    plt.figure(figsize=(8, 6))
    plt.plot(fpr, tpr, label=f'ROC Curve (AUC = {auc_score:.3f})')
    plt.plot([0, 1], [0, 1], 'k--', label='Random Classifier')
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('ROC Curve')
    plt.legend()
    plt.grid(True)
    plt.savefig('roc_curve.png', dpi=300, bbox_inches='tight')
    plt.show()

def evaluate_model_comprehensive(model, X_test, y_test, classes=['NonViolence', 'Violence']):
    print("=" * 50)
    print("COMPREHENSIVE MODEL EVALUATION")
    print("=" * 50)
    y_prob = model.predict(X_test).flatten()
    y_pred = (y_prob > 0.5).astype(int)
    loss, accuracy, precision, recall = model.evaluate(X_test, y_test, verbose=0)
    f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
    auc_score = roc_auc_score(y_test, y_prob)
    print(f"Test Accuracy:  {accuracy:.4f}")
    print(f"Test Precision: {precision:.4f}")
    print(f"Test Recall:    {recall:.4f}")
    print(f"Test F1-Score:  {f1_score:.4f}")
    print(f"Test AUC Score: {auc_score:.4f}")
    print(f"Test Loss:      {loss:.4f}")
    print("\nDETAILED CLASSIFICATION REPORT:")
    print("-" * 50)
    print(classification_report(y_test, y_pred, target_names=classes, digits=4))
    print("\nCONFUSION MATRIX:")
    print("-" * 20)
    cm = confusion_matrix(y_test, y_pred)
    print(f"True Negatives:  {cm[0,0]}")
    print(f"False Positives: {cm[0,1]}")
    print(f"False Negatives: {cm[1,0]}")
    print(f"True Positives:  {cm[1,1]}")
    plot_confusion_matrix(y_test, y_pred, classes)
    plot_roc_curve(y_test, y_prob)
    return {
        'accuracy': accuracy,
        'precision': precision,
        'recall': recall,
        'f1_score': f1_score,
        'auc_score': auc_score,
        'loss': loss
    }

def evaluate_tflite_model(tflite_path, X_test, y_test, classes=['NonViolence', 'Violence']):
    """Evaluate TFLite model"""
    interpreter = tf.lite.Interpreter(model_path=tflite_path)
    interpreter.allocate_tensors()
    input_details = interpreter.get_input_details()
    output_details = interpreter.get_output_details()
    
    y_prob = []
    for sample in tqdm(X_test, desc="Evaluating TFLite model"):
        interpreter.set_tensor(input_details[0]['index'], sample[np.newaxis, ...].astype(np.float32))
        interpreter.invoke()
        prob = interpreter.get_tensor(output_details[0]['index'])[0]
        y_prob.append(prob)
    
    y_prob = np.array(y_prob).flatten()
    y_pred = (y_prob > 0.5).astype(int)
    
    auc_score = roc_auc_score(y_test, y_prob)
    print("\nTFLite Model Evaluation:")
    print(f"AUC Score: {auc_score:.4f}")
    print("\nClassification Report:")
    print(classification_report(y_test, y_pred, target_names=classes, digits=4))
    
    plot_confusion_matrix(y_test, y_pred, classes)
    plot_roc_curve(y_test, y_prob)

def main():
    data_dir = r'C:\Users\Namo\fight recognization\extract\Real Life Violence Dataset'
    print("Loading dataset with augmentation...")
    X, y = load_dataset(data_dir, augment_training=True)
    if len(X) == 0:
        print("No data loaded. Please check your dataset path.")
        return
    print(f"Total samples: {len(X)}")
    print(f"Violence samples: {np.sum(y)}")
    print(f"Non-violence samples: {len(y) - np.sum(y)}")
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )
    print(f"Training samples: {len(X_train)}")
    print(f"Testing samples: {len(X_test)}")
    class_weights = compute_class_weight(
        'balanced',
        classes=np.unique(y_train),
        y=y_train
    )
    class_weight_dict = {i: class_weights[i] for i in range(len(class_weights))}
    print(f"Class weights: {class_weight_dict}")
    model = create_model()
    model.summary()
    callbacks = [
        EarlyStopping(
            monitor='val_loss',
            patience=10,
            restore_best_weights=True,
            verbose=1
        ),
        ReduceLROnPlateau(
            monitor='val_loss',
            factor=0.5,
            patience=5,
            min_lr=1e-7,
            verbose=1
        ),
        ModelCheckpoint(
            'best_violence_detection_model.keras',
            monitor='val_accuracy',
            save_best_only=True,
            verbose=1
        )
    ]
    print("Training model...")
    history = model.fit(
        X_train, y_train,
        validation_data=(X_test, y_test),
        batch_size=BATCH_SIZE,
        epochs=EPOCHS,
        class_weight=class_weight_dict,
        callbacks=callbacks,
        verbose=1
    )
    plot_training_history(history)
    model.load_weights('best_violence_detection_model.keras')
    metrics = evaluate_model_comprehensive(model, X_test, y_test)
    print("\nConverting to TFLite (Float16)...")
    convert_to_tflite(model, 'violence_detection_model_float16.tflite', quantization='float16')
    print("\nConverting to TFLite (Int8)...")
    convert_to_tflite(model, 'violence_detection_model_int8.tflite', quantization='int8')
    print("\nEvaluating TFLite Float16 model...")
    evaluate_tflite_model('violence_detection_model_float16.tflite', X_test, y_test)
    with open('model_metrics.txt', 'w') as f:
        f.write("Violence Detection Model Metrics\n")
        f.write("=" * 40 + "\n")
        for metric, value in metrics.items():
            f.write(f"{metric.capitalize()}: {value:.4f}\n")
    print("Training complete! Check the generated plots, metrics file, and TFLite models.")

if __name__ == '__main__':
    main()

Loading dataset with augmentation...


Loading Violence:  20%|████████████▎                                                | 202/1000 [03:16<24:34,  1.85s/it]