# In [None]:
import os
from collections import deque

import cv2
import joblib
import mediapipe as mp
import numpy as np
import torch
from torchvision import transforms
from ultralytics import YOLO

class VideoAnomalyPredictor:
    """Classify a video frame by frame and vote on a single video-level label.

    Each frame is resized, run through a YOLO detector to find people, each
    person crop is run through MediaPipe Pose, and the resulting feature
    vectors are scaled and fed to a trained torch model.

    NOTE(review): ``extract_features`` calls ``self.calculate_interactions``
    and ``self.get_temporal_features``, and ``initialize_components`` builds
    an ``InteractionTracker``. None of these are defined in this class or
    imported by the visible file; they must be supplied elsewhere (another
    notebook cell, a subclass, ...) for the predictor to run.
    """

    def __init__(self, config):
        """Store the configuration, select a device and load all components.

        Args:
            config: Mapping with the required keys ``model_save_path`` and
                ``scaler_save_path``. The optional keys
                ``label_mapping_path`` (default ``'label_mapping.joblib'``)
                and ``yolo_weights`` (default ``'yolov8x-seg.pt'``) override
                the built-in file names.
        """
        self.config = config
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.load_models()
        self.initialize_components()

    def load_models(self):
        """Load the trained model, scaler, label mapping and detection models."""
        # SECURITY: torch.load and joblib.load both unpickle arbitrary Python
        # objects; only point these paths at files from a trusted source.
        # NOTE(review): this loads a fully pickled model object. Recent
        # PyTorch releases changed the default of ``weights_only``; confirm
        # this call still works on the deployed torch version.
        self.model = torch.load(self.config['model_save_path'], map_location=self.device)
        self.model.eval()
        self.scaler = joblib.load(self.config['scaler_save_path'])

        # label -> class index as saved at training time; the inverse is what
        # turns a predicted index back into a label.
        self.label_mapping = joblib.load(
            self.config.get('label_mapping_path', 'label_mapping.joblib')
        )
        self.reverse_mapping = {v: k for k, v in self.label_mapping.items()}

        # Load detection models
        self.yolo = YOLO(self.config.get('yolo_weights', 'yolov8x-seg.pt'))
        self.mp_pose = mp.solutions.pose.Pose(
            static_image_mode=False,
            min_detection_confidence=0.5,
            model_complexity=2
        )

    def initialize_components(self):
        """Initialize feature tracking components"""
        # Bounded buffer of up to 32 entries. It is not read anywhere in the
        # visible code (presumably consumed by get_temporal_features).
        self.frame_buffer = deque(maxlen=32)
        self.tracker = InteractionTracker(window_size=30, feature_dim=4)

    def preprocess_frame(self, frame):
        """Resize a BGR frame to 224x224 and convert it to RGB."""
        frame = cv2.resize(frame, (224, 224))
        return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    def extract_features(self, frame):
        """Extract the four feature groups for a single frame.

        Args:
            frame: Image array to analyse. ``predict_video`` passes the
                output of ``preprocess_frame``.

        Returns:
            Tuple ``(spatial, temporal, tracked, group)``; ``predict_video``
            concatenates these into one feature vector.
        """
        # Detect people
        results = self.yolo(frame)[0]
        detections = []

        if results.boxes is not None and len(results.boxes) > 0:
            # Class id 0 is the one treated as "person".
            person_indices = [i for i, cls in enumerate(results.boxes.cls) if int(cls) == 0]

            for idx in person_indices:
                box = results.boxes[idx]
                x1, y1, x2, y2 = map(int, box.xyxy[0].tolist())
                cropped = frame[y1:y2, x1:x2]

                # A degenerate box yields an empty crop, which the pose model
                # cannot process; treat it like "no pose found".
                results_pose = self.mp_pose.process(cropped) if cropped.size else None
                if results_pose is not None and results_pose.pose_landmarks:
                    kps = np.array([[lm.x, lm.y, lm.z, lm.visibility]
                                    for lm in results_pose.pose_landmarks.landmark])
                else:
                    # Same shape as a real result: 33 landmarks x 4 values.
                    kps = np.zeros((33, 4))

                detections.append({
                    'bbox': [x1, y1, x2, y2],
                    'keypoints': kps,
                    'confidence': float(box.conf)
                })

        # Calculate interactions and temporal features
        interactions = self.calculate_interactions(detections)
        temporal = self.get_temporal_features(frame)
        tracked = self.tracker.update(interactions)
        group = self.tracker.group_analysis(detections)

        # NOTE(review): the original returned an undefined name ``spatial``
        # (NameError on every call). The per-frame interaction vector is the
        # only per-frame feature computed here that was otherwise unused in
        # the result, so it fills that slot. Confirm against the training
        # pipeline; a mismatch in length would make scaler.transform raise.
        spatial = interactions

        return spatial, temporal, tracked, group

    def predict_video(self, video_path, output_path=None):
        """Predict a label for every frame of a video and vote on the result.

        Args:
            video_path: Path of the video to open with ``cv2.VideoCapture``.
            output_path: Optional directory. When given, annotated frames are
                written there as ``frame_NNNN.jpg``; otherwise frames are
                shown in a window and ``q`` stops playback early.

        Returns:
            The ``(label, confidence)`` tuple from ``aggregate_predictions``.

        Raises:
            ValueError: If the video cannot be opened.
        """
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            cap.release()
            raise ValueError(f"Could not open video: {video_path}")

        if output_path:
            # cv2.imwrite returns False instead of raising when the target
            # directory is missing, so make sure it exists up front.
            os.makedirs(output_path, exist_ok=True)

        # Class indices, one per processed frame (not label strings:
        # aggregate_predictions counts integers).
        predictions = []

        try:
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break

                # Preprocess and extract features
                # NOTE(review): the detector receives the RGB frame produced
                # by preprocess_frame; confirm this matches the colour order
                # used when the training features were extracted.
                processed_frame = self.preprocess_frame(frame)
                spatial, temporal, tracked, group = self.extract_features(processed_frame)

                # Combine and normalize features
                features = np.concatenate([spatial, temporal, tracked, group])
                features = self.scaler.transform([features])

                # Make prediction
                with torch.no_grad():
                    tensor = torch.FloatTensor(features).to(self.device)
                    outputs = self.model(tensor)
                    pred = torch.argmax(outputs).item()
                label = self.reverse_mapping[pred]
                predictions.append(pred)

                # Visualization (drawn on the original, full-size frame)
                cv2.putText(frame, f"Prediction: {label}", (10, 30),
                            cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

                if output_path:
                    cv2.imwrite(os.path.join(output_path, f"frame_{len(predictions):04d}.jpg"), frame)
                else:
                    cv2.imshow('Prediction', frame)
                    if cv2.waitKey(1) & 0xFF == ord('q'):
                        break
        finally:
            # Release the capture even if feature extraction or the model raised.
            cap.release()
            if not output_path:
                # Windows are only created in display mode.
                cv2.destroyAllWindows()

        return self.aggregate_predictions(predictions)

    def aggregate_predictions(self, predictions):
        """Combine per-frame class indices into a final video prediction.

        Args:
            predictions: Sequence of non-negative integer class indices, one
                per frame.

        Returns:
            ``(label, confidence)`` where ``label`` is the most frequent
            class looked up in ``self.reverse_mapping`` (lowest index wins a
            tie) and ``confidence`` is the fraction of frames that voted for
            it. Returns ``(None, 0.0)`` when ``predictions`` is empty.
        """
        if len(predictions) == 0:
            return None, 0.0

        counts = np.bincount(predictions)
        winner = int(np.argmax(counts))
        confidence = float(counts[winner]) / len(predictions)
        return self.reverse_mapping[winner], confidence

if __name__ == "__main__":
    # Example run: build a predictor from saved artefacts and classify one video.
    # "class_pattern" and "interaction_weights" are part of the configuration
    # but are not read by the VideoAnomalyPredictor code visible in this file.
    config = dict(
        model_save_path="final_model.pth",
        scaler_save_path="final_scaler.joblib",
        class_pattern=r"^([A-Za-z]+)\d+",
        interaction_weights=dict(tracked=2.5, group=3.0),
    )

    # Placeholder path: replace with a real video before running.
    video_path = "path/to/your/video.mp4"

    predictor = VideoAnomalyPredictor(config)
    prediction, confidence = predictor.predict_video(video_path)

    summary = f"Video Prediction: {prediction} (Confidence: {confidence:.2%})"
    print(summary)