In [11]:
import cv2
import yaml
import torch
import torch.cuda
import numpy as np
from ultralytics import YOLO
from tkinter import Tk, filedialog
from scipy.spatial.distance import cdist

In [None]:
class ViolenceFeatureExtractor:
    def __init__(self, detection_model_path, segmentation_model_path, pose_model_path):
        # Initialize device and GPU settings
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.print_gpu_info()
        
        # GPU optimization settings
        if torch.cuda.is_available():
            torch.backends.cudnn.benchmark = True
            torch.backends.cudnn.deterministic = False
            torch.cuda.empty_cache()
            torch.cuda.set_per_process_memory_fraction(0.8)
        
        # Load models and move to GPU
        self.detection_model = YOLO(detection_model_path).to(self.device)
        self.segmentation_model = YOLO(segmentation_model_path).to(self.device)
        self.pose_model = YOLO(pose_model_path).to(self.device)
        
        # Define violence-related objects and relevant classes
        self.violence_objects = ['knife', 'gun', 'baseball bat', 'stick', 'bottle']
        self.relevant_classes = ['person'] + self.violence_objects
        
        # Define colors for visualization
        self.colors = {
            'violence': (0, 0, 255),    # Red
            'person': (0, 255, 0),      # Green
            'interaction': (255, 0, 0),  # Blue
            'keypoint': (255, 255, 0),  # Yellow
            'connection': (0, 255, 255)  # Cyan
        }
        
        # Performance and detection settings
        self.frame_skip = 2
        self.input_size = 640
        self.conf_threshold = 0.5
        self.interaction_threshold = 0.5  # For person-to-person interaction detection

    def print_gpu_info(self):
        """Print GPU information"""
        print("\nGPU Information:")
        if torch.cuda.is_available():
            print(f"GPU Device: {torch.cuda.get_device_name(0)}")
            print(f"CUDA Version: {torch.version.cuda}")
            print(f"Total GPU Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.2f} GB")
            print(f"Available Memory: {torch.cuda.memory_allocated(0) / 1e9:.2f} GB")
        else:
            print("No GPU available. Using CPU.")

    def preprocess_frame(self, frame):
        """Preprocess frame for model input"""
        try:
            # Convert to RGB
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            
            # Calculate size to maintain aspect ratio
            h, w = frame_rgb.shape[:2]
            r = self.input_size / max(h, w)
            new_h, new_w = int(h * r), int(w * r)
            
            # Resize
            resized = cv2.resize(frame_rgb, (new_w, new_h))
            
            # Create canvas of input_size x input_size
            canvas = np.zeros((self.input_size, self.input_size, 3), dtype=np.uint8)
            
            # Calculate padding
            pad_h = (self.input_size - new_h) // 2
            pad_w = (self.input_size - new_w) // 2
            
            # Place resized image on canvas
            canvas[pad_h:pad_h+new_h, pad_w:pad_w+new_w] = resized
            
            # Normalize
            normalized = canvas.astype(np.float32) / 255.0
            
            return normalized, (r, pad_w, pad_h)
            
        except Exception as e:
            print(f"Error in preprocessing: {e}")
            return None, None

    def analyze_person_interactions(self, person_boxes):
        """Analyze interactions between detected people"""
        interactions = []
        if len(person_boxes) < 2:
            return interactions

        for i in range(len(person_boxes)):
            for j in range(i + 1, len(person_boxes)):
                box1 = person_boxes[i]
                box2 = person_boxes[j]
                
                # Calculate centers
                center1 = [(box1[0] + box1[2])/2, (box1[1] + box1[3])/2]
                center2 = [(box2[0] + box2[2])/2, (box2[1] + box2[3])/2]
                
                # Calculate distance and box sizes
                distance = np.sqrt((center1[0] - center2[0])**2 + (center1[1] - center2[1])**2)
                box1_size = (box1[2] - box1[0]) * (box1[3] - box1[1])
                box2_size = (box2[2] - box2[0]) * (box2[3] - box2[1])
                avg_size = (box1_size + box2_size) / 2
                
                # Check for close interaction
                if distance < avg_size * self.interaction_threshold:
                   interactions.append({
                         'person1_idx': i,
                         'person2_idx': j,
                         'distance': distance,
                         'relative_distance': distance / avg_size,
                         'center1': center1,
                         'center2': center2,
                         'box1': box1,
                         'box2': box2
                     })
        
        return interactions

    def calculate_motion_features(self, prev_poses, current_poses):
        """Calculate motion features between consecutive frames"""
        try:
            if not prev_poses or not current_poses:
                return {
                    'average_speed': 0,
                    'motion_intensity': 0,
                    'sudden_movements': 0
                }

            # Convert poses to numpy arrays
            prev_poses = np.array(prev_poses)
            current_poses = np.array(current_poses)

            if prev_poses.shape == current_poses.shape:
                # Calculate displacement
                displacement = np.linalg.norm(current_poses - prev_poses, axis=2)
                average_speed = np.mean(displacement)
                motion_intensity = np.std(displacement)
                sudden_movements = np.sum(displacement > np.mean(displacement) + 2 * np.std(displacement))

                return {
                    'average_speed': float(average_speed),
                    'motion_intensity': float(motion_intensity),
                    'sudden_movements': int(sudden_movements)
                }
            
            return {
                'average_speed': 0,
                'motion_intensity': 0,
                'sudden_movements': 0
            }
            
        except Exception as e:
            print(f"Error in motion calculation: {e}")
            return {
                'average_speed': 0,
                'motion_intensity': 0,
                'sudden_movements': 0
            }
    def analyze_poses_for_violence(self, poses):
        """Analyze poses for potential aggressive/violent behavior"""
        try:
            if not poses:
                return False

            for pose in poses:
                # Convert pose to numpy array for calculations
                pose_array = np.array(pose)
                
                # Check for rapid arm movements (high confidence keypoints only)
                arm_keypoints = [5, 7, 9, 6, 8, 10]  # Shoulders, elbows, wrists
                arm_positions = pose_array[arm_keypoints]
                arm_confidences = arm_positions[:, 2]
                
                if np.mean(arm_confidences) > 0.5:
                    # Calculate arm angles and velocities
                    # Add your specific pose analysis logic here
                    return True
                    
            return False
            
        except Exception as e:
            print(f"Error in pose analysis: {e}")
            return False

    def rescale_coords(self, x, y, scale_info):
        """Rescale coordinates back to original image size"""
        scale, pad_w, pad_h = scale_info
        x_orig = (x - pad_w) / scale
        y_orig = (y - pad_h) / scale
        return int(x_orig), int(y_orig)

    def draw_detections(self, frame, det_results, pose_results, interactions, scale_info):
        """Draw detections, poses, and interactions"""
        display_frame = frame.copy()

        # Draw object detections
        for result in det_results:
            boxes = result.boxes
            for box in boxes:
                try:
                    # Get box coordinates and rescale them
                    x1, y1, x2, y2 = map(float, box.xyxy[0].cpu().numpy())
                    x1, y1 = self.rescale_coords(x1, y1, scale_info)
                    x2, y2 = self.rescale_coords(x2, y2, scale_info)
                    
                    cls = result.names[int(box.cls[0])]
                    conf = float(box.conf[0])

                    # Only draw relevant classes
                    if cls in self.relevant_classes:
                        color = (self.colors['violence'] if cls in self.violence_objects 
                                else self.colors['person'])

                        cv2.rectangle(display_frame, (x1, y1), (x2, y2), color, 2)
                        label = f'{cls} {conf:.2f}'
                        
                        (text_w, text_h), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 2)
                        cv2.rectangle(display_frame, (x1, y1-text_h-5), (x1+text_w, y1), color, -1)
                        cv2.putText(display_frame, label, (x1, y1-5), 
                                  cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)

                except Exception as e:
                    print(f"Error in detection drawing: {e}")
                    continue

        # Draw interactions
        for interaction in interactions:
            try:
            # Get centers from interaction data
                 x1, y1 = self.rescale_coords(interaction['center1'][0], interaction['center1'][1], scale_info)
                 x2, y2 = self.rescale_coords(interaction['center2'][0], interaction['center2'][1], scale_info)
              
            # Draw line between interacting people
                 cv2.line(display_frame, (x1, y1), (x2, y2), self.colors['interaction'], 2)
            
            # Optional: Draw interaction distance
                 mid_point = ((x1 + x2)//2, (y1 + y2)//2)
                 distance_label = f"D: {interaction['relative_distance']:.2f}"
                 cv2.putText(display_frame, distance_label, mid_point, 
                         cv2.FONT_HERSHEY_SIMPLEX, 0.5, self.colors['interaction'], 2)
            
            except Exception as e:
                 print(f"Error drawing interaction: {e}")
                 continue

        # Draw pose keypoints and connections
        if pose_results:
            for result in pose_results:
                if result.keypoints is not None:
                    for kpts in result.keypoints:
                        try:
                            keypoints_data = kpts.data[0].cpu().numpy()
                            
                            # Draw keypoints
                            for keypoint in keypoints_data:
                                x, y, conf = keypoint
                                if conf > 0.5:
                                    x, y = self.rescale_coords(x, y, scale_info)
                                    cv2.circle(display_frame, (x, y), 4, self.colors['keypoint'], -1)

                            # Draw connections
                            connections = [(5,7), (7,9), (6,8), (8,10), (5,6), 
                                         (11,13), (13,15), (12,14), (14,16), (11,12)]
                            for connection in connections:
                                pt1 = keypoints_data[connection[0]]
                                pt2 = keypoints_data[connection[1]]
                                
                                if pt1[2] > 0.5 and pt2[2] > 0.5:
                                    x1, y1 = self.rescale_coords(pt1[0], pt1[1], scale_info)
                                    x2, y2 = self.rescale_coords(pt2[0], pt2[1], scale_info)
                                    cv2.line(display_frame, (x1, y1), (x2, y2),
                                           self.colors['connection'], 2)
                        except Exception as e:
                            print(f"Error in pose drawing: {e}")
                            continue

        # Add violence indicators
        if self.current_risk_level > 0.7:  # High risk threshold
            cv2.putText(display_frame, "HIGH RISK", (10, 60), 
                       cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)

        # Add frame information
        cv2.putText(display_frame, "Press 'q' to quit, 'p' to pause/resume", 
                   (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)

        return display_frame

    def extract_features(self, frame, prev_frame_data=None):
        """Extract violence-relevant features from frame"""
        try:
            # Preprocess frame
            processed_frame, scale_info = self.preprocess_frame(frame)
            if processed_frame is None:
                return None, frame

            # Convert to tensor and add batch dimension
            frame_tensor = torch.from_numpy(processed_frame).permute(2, 0, 1).unsqueeze(0).to(self.device)

            # Run models with GPU acceleration
            with torch.cuda.amp.autocast():
                det_results = self.detection_model(frame_tensor, verbose=False)
                pose_results = self.pose_model(frame_tensor, verbose=False)

            # Initialize features
            features = {
                'objects': [],
                'poses': [],
                'interactions': [],
                'motion': {},
                'violence_indicators': {
                    'weapon_present': False,
                    'close_interaction': False,
                    'rapid_motion': False,
                    'aggressive_pose': False
                }
            }

            # Process relevant detections
            person_boxes = []
            for result in det_results:
                for box in result.boxes:
                    try:
                        cls = result.names[int(box.cls[0])]
                        if cls in self.relevant_classes:
                            conf = float(box.conf[0])
                            box_coords = box.xyxy[0].cpu().numpy().tolist()
                            
                            features['objects'].append({
                                'class': cls,
                                'confidence': conf,
                                'box': box_coords
                            })
                            
                            if cls == 'person':
                                person_boxes.append(box_coords)
                            elif cls in self.violence_objects:
                                features['violence_indicators']['weapon_present'] = True
                    except Exception as e:
                        print(f"Error processing detection: {e}")
                        continue

            # Analyze person interactions
            if len(person_boxes) >= 2:
                interactions = self.analyze_person_interactions(person_boxes)
                features['interactions'] = interactions
                features['violence_indicators']['close_interaction'] = len(interactions) > 0

            # Process poses and analyze for violence
            if pose_results:
                for result in pose_results:
                    if result.keypoints is not None:
                        for kpts in result.keypoints:
                            try:
                                pose_data = kpts.data[0].cpu().numpy().tolist()
                                features['poses'].append(pose_data)
                            except Exception as e:
                                print(f"Error processing pose: {e}")
                                continue

                features['violence_indicators']['aggressive_pose'] = self.analyze_poses_for_violence(features['poses'])

            # Calculate motion features
            if prev_frame_data and 'poses' in prev_frame_data:
                motion_features = self.calculate_motion_features(
                    prev_frame_data['poses'], features['poses'])
                features['motion'] = motion_features
                
                features['violence_indicators']['rapid_motion'] = motion_features.get('average_speed', 0) > 10

            # Calculate overall risk level
            risk_weights = {
                'weapon_present': 0.4,
                'close_interaction': 0.3,
                'rapid_motion': 0.2,
                'aggressive_pose': 0.1
            }
            
            self.current_risk_level = sum(
                risk_weights[indicator] * int(value)
                for indicator, value in features['violence_indicators'].items()
            )

            # Draw detections
            annotated_frame = self.draw_detections(
                frame, det_results, pose_results, 
                features['interactions'], scale_info
            )

            return features, annotated_frame

        except Exception as e:
            print(f"Error in feature extraction: {e}")
            return None, frame

In [13]:

def process_video(video_path, extractor, output_path):
    """Analyse a video frame by frame, write an annotated copy and save features.

    Every ``extractor.frame_skip``-th frame is passed to
    ``extractor.extract_features``. Annotated frames are shown in a window and
    written to ``<video_path without extension>_analyzed.mp4``. Per-frame
    features are collected and dumped as YAML to ``output_path`` when
    processing ends, including after an error or when the user quits.

    Keys while the window has focus: ``q`` quits, ``p`` toggles pause.

    Args:
        video_path: Path of the input video.
        extractor: Object providing ``frame_skip`` and
            ``extract_features(frame, prev_frame_data)``.
        output_path: Destination of the YAML feature file.

    Returns:
        Dict with ``metadata`` and ``frames`` entries, or None when the video
        cannot be opened.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print("Error: Could not open video file")
        cap.release()
        return

    # Keep the real (possibly fractional) frame rate: truncating 29.97 to 29
    # skews timestamps, and a reported rate of 0 made them divide by zero.
    fps = float(cap.get(cv2.CAP_PROP_FPS))
    if not fps > 0:
        print("Warning: video reports no valid FPS; assuming 30")
        fps = 30.0
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    # Guard against a zero / negative skip, which would break the modulo below.
    frame_skip = max(1, int(extractor.frame_skip))

    # Only every frame_skip-th frame is written, so the writer runs at
    # fps / frame_skip to keep the output at real-time speed.
    output_video_path = video_path.rsplit('.', 1)[0] + '_analyzed.mp4'
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_video_path, fourcc, fps / frame_skip,
                          (frame_width, frame_height))

    # Initialize data storage
    video_data = {
        'metadata': {
            'path': video_path,
            'fps': fps,
            'frame_count': frame_count,
            'width': frame_width,
            'height': frame_height
        },
        'frames': []
    }

    frame_idx = 0
    prev_frame_data = None
    paused = False

    try:
        while True:
            if not paused:
                ret, frame = cap.read()
                if not ret:
                    break

                # Skip frames if needed
                if frame_idx % frame_skip != 0:
                    frame_idx += 1
                    continue

                # Extract features and get annotated frame
                features, annotated_frame = extractor.extract_features(frame, prev_frame_data)

                if features is not None:
                    video_data['frames'].append({
                        'frame_index': frame_idx,
                        'timestamp': frame_idx / fps,
                        'features': features
                    })
                    prev_frame_data = features
                    out.write(annotated_frame)

                    # Show progress
                    if frame_idx % (30 * frame_skip) == 0:
                        if frame_count > 0:
                            progress = (frame_idx / frame_count) * 100
                            print(f"Processing: {progress:.1f}% complete")
                        else:
                            # Some sources do not report a frame count.
                            print(f"Processing: frame {frame_idx}")
                        if torch.cuda.is_available():
                            print(f"GPU Memory: {torch.cuda.memory_allocated() / 1e9:.2f} GB")

                    # Display frame
                    cv2.imshow('Violence Detection Analysis', annotated_frame)

                frame_idx += 1

            # Handle key events. This must also run while paused: previously
            # it was nested under `if not paused`, so pressing 'p' left the
            # loop spinning with no way to resume or quit.
            key = cv2.waitKey(30 if paused else 1) & 0xFF
            if key == ord('q'):
                break
            elif key == ord('p'):
                paused = not paused
                print("Paused - Press 'p' to resume" if paused else "Resumed")

    except Exception as e:
        print(f"Error during processing: {e}")
        import traceback
        traceback.print_exc()

    finally:
        # Cleanup
        cap.release()
        out.release()
        cv2.destroyAllWindows()

        # Final GPU cleanup
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

        # Save whatever was collected, even after an error or early quit.
        try:
            with open(output_path, 'w') as f:
                yaml.dump(video_data, f, default_flow_style=False)
            print(f"Features saved to: {output_path}")
            print(f"Analyzed video saved to: {output_video_path}")
        except Exception as e:
            print(f"Error saving data: {e}")

    return video_data

if __name__ == "__main__":
    # Hidden Tk root so that only the file dialog is shown.
    root = Tk()
    root.withdraw()

    # Ask for the video to analyse. Patterns are given as a tuple: the
    # semicolon-joined form ("*.mp4;*.avi") is not a portable Tk pattern list.
    video_path = filedialog.askopenfilename(
        title="Select Video File",
        filetypes=[("Video Files", ("*.mp4", "*.avi"))]
    )

    # The root window is no longer needed once the dialog has closed.
    root.destroy()

    if not video_path:
        print("No video file selected")
        # SystemExit instead of the site-provided exit() helper, which is not
        # guaranteed to exist and is awkward inside a notebook kernel.
        raise SystemExit

In [10]:
# Model weights (machine-specific absolute paths).
detection_model_path = r'C:\Users\harme\Desktop\violence detection\yolo11m.pt'
segmentation_model_path = r'C:\Users\harme\Desktop\violence detection\yolo11m-seg.pt'
pose_model_path = r'C:\Users\harme\Desktop\violence detection\yolo11m-pose.pt'

# Build the feature extractor from the three weight files.
extractor = ViolenceFeatureExtractor(
    detection_model_path=detection_model_path,
    segmentation_model_path=segmentation_model_path,
    pose_model_path=pose_model_path,
)

# Analyse the video chosen in the file dialog and save the features as YAML.
output_path = r'C:\Users\harme\Desktop\video-detect-gpu\violence_features.yaml'
video_data = process_video(video_path, extractor, output_path)
print("Analysis complete!")




GPU Information:
GPU Device: NVIDIA GeForce RTX 3050 Laptop GPU
CUDA Version: 11.8
Total GPU Memory: 4.29 GB
Available Memory: 0.00 GB


  with torch.cuda.amp.autocast():


Processing: 0.0% complete
GPU Memory: 0.30 GB
Processing: 90.9% complete
GPU Memory: 0.30 GB
Features saved to: C:\Users\harme\Desktop\video-detect-gpu\violence_features.yaml
Analyzed video saved to: C:/Users/harme/Desktop/video-detect-gpu/NV_1_analyzed.mp4
Analysis complete!
