In [1]:
import cv2
import numpy as np
import tensorflow as tf
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
import joblib

In [2]:
# Load the TFLite version of MobileNet SSD model for human detection
# NOTE(review): this module-level interpreter is independent of the one that
# HumanDetector creates for itself from the path passed in main(); nothing in
# the class-based pipeline visible in this notebook reads it.
tflite_model_path = "../model/mobilenet/1.tflite"
interpreter = tf.lite.Interpreter(model_path=tflite_model_path)
# Allocate the interpreter's tensors so their details can be queried below.
interpreter.allocate_tensors()

In [3]:
# Get input and output details
# Each call returns a list of per-tensor metadata dicts for the module-level
# `interpreter`. These globals are inspection aids only: the classes defined
# later keep their own copies on the HumanDetector instance.
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

In [6]:
from dataclasses import dataclass
from typing import List, Tuple, Dict

In [8]:

@dataclass
class ModelConfig:
    """Tunable settings shared by the human detector and the action classifiers."""
    # Target size handed to cv2.resize when preparing a crop for the CNN.
    cnn_img_size: Tuple[int, int] = (64, 64)
    # Target size handed to cv2.resize for every frame of an I3D clip.
    i3d_frame_size: Tuple[int, int] = (224, 224)
    # Clip length for I3D; also the cap on the rolling frame buffer.
    max_frames: int = 32
    # A detection is kept only when its score is strictly greater than this.
    confidence_threshold: float = 0.2
    # Class id that HumanDetector.detect treats as "person".
    # NOTE(review): must match the detector model's label map - confirm.
    human_class_id: int = 1
    # Nominal SSD input size; presumably (width, height) like the other size
    # fields, which are passed straight to cv2.resize - TODO confirm.
    ssd_input_size: Tuple[int, int] = (300, 300)

@dataclass
class ModelPaths:
    """Filesystem locations of the model artifacts the pipeline loads."""
    # TFLite model file given to HumanDetector.
    ssd_model: str
    # File loaded with joblib.load as the per-person CNN classifier.
    cnn_model: str
    # SavedModel directory of the I3D clip classifier.
    i3d_model: str
    # Text file with one I3D action label per line.
    label_map: str

class ActionPredictor:
    """Runs the two action classifiers used by the pipeline.

    * ``cnn_model`` - a single-frame classifier loaded with joblib; its output
      index is mapped to a name through ``cnn_labels``.
    * ``i3d_model`` - a SavedModel scored on a fixed-length clip; its output
      index is mapped to a name through the label file.
    """

    def __init__(self, config: ModelConfig, model_paths: ModelPaths):
        """Load both models and the I3D label list.

        Args:
            config: Sizes and clip length used during preprocessing.
            model_paths: Locations of the CNN file, I3D directory and label map.
        """
        self.config = config
        # NOTE(review): joblib.load unpickles the file and can therefore run
        # arbitrary code - only load model files from a trusted source.
        self.cnn_model = joblib.load(model_paths.cnn_model)
        self.i3d_model = self._load_i3d_model(model_paths.i3d_model)
        self.labels = self._load_labels(model_paths.label_map)
        # Output index -> action name for the CNN classifier.
        self.cnn_labels = {
            0: 'clapping',
            1: 'dancing',
            2: 'laughing',
            3: 'running'
        }

    def _load_i3d_model(self, model_dir: str) -> tf.keras.Model:
        """Load the I3D SavedModel from ``model_dir`` using the 'train' tag set.

        NOTE(review): the annotation says ``tf.keras.Model`` but the value is
        whatever ``load_v2`` returns; ``predict_i3d`` only relies on its
        ``signatures`` mapping.
        """
        return tf.compat.v1.saved_model.load_v2(model_dir, tags=['train'])

    def _load_labels(self, label_file: str) -> List[str]:
        """Return the stripped lines of ``label_file``, one label per line."""
        with open(label_file, "r") as f:
            return [line.strip() for line in f]

    def preprocess_cnn_frame(self, frame: np.ndarray) -> np.ndarray:
        """Turn one BGR crop into a CNN input batch of size one.

        The crop is resized to ``config.cnn_img_size``, converted to grayscale,
        scaled by 1/255 and given a leading batch axis.
        """
        frame = cv2.resize(frame, self.config.cnn_img_size)
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        frame = img_to_array(frame) / 255.0
        return np.expand_dims(frame, axis=0)

    def preprocess_i3d_frames(self, frames: List[np.ndarray]) -> tf.Tensor:
        """Turn a list of BGR frames into one float32 tensor for I3D.

        Each frame is resized to ``config.i3d_frame_size``, converted to RGB and
        scaled by 1/255; the frames are stacked along a new leading axis.
        """
        processed_frames = []
        for frame in frames:
            frame = cv2.resize(frame, self.config.i3d_frame_size)
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            processed_frames.append(frame / 255.0)
        return tf.convert_to_tensor(processed_frames, dtype=tf.float32)

    def predict_cnn(self, frame: np.ndarray) -> Tuple[str, float]:
        """Classify a single crop with the CNN model.

        Returns:
            ``(label, confidence)`` for the highest-scoring class, with the
            confidence converted to a plain Python float.

        Raises:
            KeyError: If the winning index has no entry in ``cnn_labels``.
        """
        preprocessed = self.preprocess_cnn_frame(frame)
        predictions = self.cnn_model.predict(preprocessed)
        # Plain int/float so callers get built-in types, as the annotation says.
        class_idx = int(np.argmax(predictions, axis=1)[0])
        return self.cnn_labels[class_idx], float(predictions[0][class_idx])

    def predict_i3d(self, frames: List[np.ndarray]) -> Dict[str, float]:
        """Score a clip with the I3D model.

        The first ``config.max_frames`` frames are used; a shorter clip is
        padded by repeating its last frame. The caller's list is never
        modified.

        Returns:
            The five most probable labels mapped to their softmax probability,
            or an empty dict when ``frames`` is empty or inference fails.
        """
        if not frames:
            # Nothing to pad from; the original indexed frames[-1] and crashed.
            return {}

        clip_length = self.config.max_frames
        # Work on a copy: `frames` is typically the caller's rolling buffer.
        clip = list(frames[:clip_length])
        if len(clip) < clip_length:
            clip.extend([clip[-1]] * (clip_length - len(clip)))

        preprocessed = self.preprocess_i3d_frames(clip)
        input_tensor = tf.expand_dims(preprocessed, axis=0)

        try:
            signature_key = list(self.i3d_model.signatures.keys())[0]
            predictions = self.i3d_model.signatures[signature_key](input_tensor)
            logits = predictions['default'].numpy()[0]
            probabilities = tf.nn.softmax(logits).numpy()

            # Get top 5 predictions, most probable first
            top_indices = np.argsort(probabilities)[-5:][::-1]
            return {
                self.labels[idx]: float(probabilities[idx])
                for idx in top_indices
            }
        except Exception as e:
            # Deliberate best-effort: a failed clip yields no scene labels
            # instead of aborting the whole video loop.
            print(f"I3D prediction error: {str(e)}")
            return {}

class HumanDetector:
    """Person detector backed by a TFLite SSD model.

    ``detect`` understands the post-processed SSD output layout: one
    ``[1, N, 4]`` tensor of normalized ``[ymin, xmin, ymax, xmax]`` boxes plus
    two ``[1, N]`` tensors holding the class id and the score of each box.
    Models that only expose raw box encodings / class predictions cannot be
    decoded here (that needs the anchor boxes and NMS), so they yield no boxes.
    """

    def __init__(self, model_path: str, config: "ModelConfig"):
        """Create the interpreter and make sure its input has a concrete size.

        Args:
            model_path: Path of the ``.tflite`` model file.
            config: Supplies the confidence threshold, person class id and the
                fallback input size for dynamically-shaped models.
        """
        self.config = config
        self.interpreter = tf.lite.Interpreter(model_path=model_path)
        self.interpreter.allocate_tensors()
        self.input_details = self.interpreter.get_input_details()
        self.output_details = self.interpreter.get_output_details()

        # Print model details for debugging
        print("Input details:", self.input_details)
        print("Output details:", self.output_details)

        self._fix_dynamic_input_shape()

        # Get actual required input shape from model
        self.input_shape = self.input_details[0]['shape']
        print(f"Required input shape: {self.input_shape}")

    def _fix_dynamic_input_shape(self) -> None:
        """Resize a dynamically-shaped input tensor to ``config.ssd_input_size``.

        A model whose shape signature has -1 for height/width reports a
        placeholder shape such as ``[1, 1, 1, 3]``; using that shape would
        shrink every frame to a single pixel.
        """
        details = self.input_details[0]
        signature = details.get('shape_signature')
        if signature is None or len(signature) != 4:
            return
        if int(signature[1]) > 0 and int(signature[2]) > 0:
            return

        # Treated as (width, height), like the other size fields in the config.
        width, height = self.config.ssd_input_size
        batch = max(1, int(details['shape'][0]))
        channels = int(details['shape'][3])
        self.interpreter.resize_tensor_input(
            details['index'], [batch, int(height), int(width), channels]
        )
        # Shapes changed, so tensors must be re-allocated and details re-read.
        self.interpreter.allocate_tensors()
        self.input_details = self.interpreter.get_input_details()
        self.output_details = self.interpreter.get_output_details()

    def preprocess_frame(self, frame: np.ndarray) -> np.ndarray:
        """
        Preprocess frame for MobileNet SSD model:
        - Resize to required dimensions
        - Convert to float32
        - Normalize to [0, 1]

        NOTE(review): the frame keeps OpenCV's BGR channel order and a [0, 1]
        scale; confirm both against the preprocessing the model was trained
        with.
        """
        # Input shape is [batch, height, width, channels]; cv2.resize wants
        # its target size as (width, height).
        height, width = int(self.input_shape[1]), int(self.input_shape[2])
        input_frame = cv2.resize(frame, (width, height))

        # Convert to float32 and normalize to [0, 1]
        input_frame = input_frame.astype(np.float32) / 255.0

        # Add batch dimension
        return np.expand_dims(input_frame, axis=0)

    @staticmethod
    def _split_outputs(tensors):
        """Pick ``(boxes, classes, scores)`` out of the model's output tensors.

        Returns None (after printing the reason) when the tensors do not follow
        the post-processed layout described on the class.
        """
        boxes = None
        for tensor in tensors:
            if tensor.ndim == 3 and tensor.shape[2] == 4:
                boxes = tensor[0]
        if boxes is None:
            print("Could not find detection output tensor")
            return None

        per_box = [t[0] for t in tensors if t.ndim == 2 and t.shape[1] == len(boxes)]
        if len(per_box) < 2:
            print("Output tensors do not include per-detection class and score "
                  "tensors; cannot build detections")
            return None

        classes, scores = per_box[0], per_box[1]
        # Exporters disagree on tensor order. Class ids are whole numbers while
        # scores are fractional, so swap when the data says the order is reversed.
        first_integral = bool(np.all(classes == np.round(classes)))
        second_integral = bool(np.all(scores == np.round(scores)))
        if second_integral and not first_integral:
            classes, scores = scores, classes
        return boxes, classes, scores

    def detect(self, frame: np.ndarray) -> List[Tuple[int, int, int, int]]:
        """Detect humans in frame using MobileNet SSD.

        Args:
            frame: Image as produced by OpenCV (height x width x channels).

        Returns:
            Pixel boxes ``(startX, startY, endX, endY)`` clipped to the frame,
            one per detection whose score exceeds
            ``config.confidence_threshold`` and whose class equals
            ``config.human_class_id``. Empty when the frame is empty or the
            model outputs cannot be interpreted.

        NOTE(review): models exported with the detection post-processing op
        commonly report zero-based class ids; confirm ``human_class_id``
        against the model's label map.
        """
        if frame is None or frame.size == 0:
            return []

        original_h, original_w = frame.shape[:2]

        # Preprocess the frame
        input_tensor = self.preprocess_frame(frame)

        # Set the input tensor
        self.interpreter.set_tensor(self.input_details[0]['index'], input_tensor)
        self.interpreter.invoke()

        # Get output tensors
        tensors = [self.interpreter.get_tensor(output['index'])
                   for output in self.output_details]
        parsed = self._split_outputs(tensors)
        if parsed is None:
            return []
        boxes, classes, scores = parsed

        threshold = self.config.confidence_threshold
        wanted_class = self.config.human_class_id
        human_boxes = []
        for (ymin, xmin, ymax, xmax), class_id, score in zip(boxes, classes, scores):
            if float(score) <= threshold or int(class_id) != wanted_class:
                continue

            # Convert normalized coordinates to pixels, clipped to the image
            startX = max(0, int(float(xmin) * original_w))
            startY = max(0, int(float(ymin) * original_h))
            endX = min(original_w, int(float(xmax) * original_w))
            endY = min(original_h, int(float(ymax) * original_h))

            # Only add valid boxes
            if endX > startX and endY > startY:
                human_boxes.append((startX, startY, endX, endY))

        return human_boxes

class ActionRecognitionSystem:
    """Ties the human detector and the action predictor together per frame."""

    def __init__(self, config: ModelConfig, model_paths: ModelPaths):
        """Build the detector and predictor and start with an empty clip buffer."""
        self.config = config
        self.human_detector = HumanDetector(model_paths.ssd_model, config)
        self.action_predictor = ActionPredictor(config, model_paths)
        self.frame_buffer = []

    def process_frame(self, frame: np.ndarray) -> Dict:
        """Analyse one frame.

        Returns an empty dict for a missing/empty frame; otherwise a dict with
        the number of people, their boxes, one CNN ``(action, confidence)``
        pair per usable box, and the I3D scene scores (empty until the buffer
        holds ``config.max_frames`` frames).
        """
        if frame is None or frame.size == 0:
            return {}

        clip = self.frame_buffer
        clip_limit = self.config.max_frames

        # Rolling window of the most recent frames for the clip model
        clip.append(frame.copy())
        if len(clip) > clip_limit:
            del clip[0]

        people = self.human_detector.detect(frame)

        # One CNN prediction per person crop
        per_person = []
        for left, top, right, bottom in people:
            if any(coord < 0 for coord in (left, top, right, bottom)):
                continue
            crop = frame[top:bottom, left:right]
            if crop.size == 0:
                continue
            label, score = self.action_predictor.predict_cnn(crop)
            per_person.append((label, score))

        # Scene-level prediction only once the window is full
        if len(clip) >= clip_limit:
            scene = self.action_predictor.predict_i3d(clip)
        else:
            scene = {}

        return {
            'num_people': len(people),
            'human_boxes': people,
            'cnn_actions': per_person,
            'i3d_actions': scene
        }

def process_video(video_path: str, system: ActionRecognitionSystem):
    """Process video file and display frame-by-frame analysis.

    Every frame is run through ``system.process_frame``; person boxes, the
    per-person CNN labels and the scene-level I3D labels are drawn on the
    frame, which is shown in a window named "Frame". Press ``q`` to stop.

    Args:
        video_path: Path handed to ``cv2.VideoCapture``.
        system: The recognition pipeline that analyses each frame.

    The capture is always released and all OpenCV windows are closed, even
    when processing is interrupted.
    """
    cap = cv2.VideoCapture(video_path)

    try:
        if not cap.isOpened():
            # Previously a bad path made the loop exit silently.
            print(f"Could not open video: {video_path}")
            return

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            result = system.process_frame(frame)
            if not result:
                # process_frame returns {} for an empty frame; nothing to draw.
                continue

            # Draw bounding boxes and predictions on the frame
            for box in result['human_boxes']:
                startX, startY, endX, endY = box
                cv2.rectangle(frame, (startX, startY), (endX, endY), (0, 255, 0), 2)

            # Display CNN predictions, one text line per person
            for i, (action, confidence) in enumerate(result['cnn_actions']):
                cv2.putText(frame, f"Person {i+1}: {action} ({confidence:.2f})",
                            (10, 30 + i * 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)

            # Display I3D predictions below the per-person lines
            y_offset = 30 + len(result['cnn_actions']) * 30
            for i, (action, confidence) in enumerate(result['i3d_actions'].items()):
                cv2.putText(frame, f"Scene: {action} ({confidence:.2f})",
                            (10, y_offset + i * 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)

            # Display the frame
            cv2.imshow("Frame", frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

    finally:
        cap.release()
        cv2.destroyAllWindows()

def main():
    """Build the recognition system with default settings and run it on the demo video."""
    settings = ModelConfig()
    artifacts = ModelPaths(
        ssd_model="../model/mobilenet/1.tflite",
        cnn_model="../model/cnn/HAR_CNN.joblib",
        i3d_model="../model/i3d/",
        label_map="../model/i3d/label_map.txt",
    )
    recognizer = ActionRecognitionSystem(settings, artifacts)

    # Input clip for the demo run
    clip_path = "../data/no-test.mp4"
    process_video(clip_path, recognizer)

# Entry point: run the demo when this code is executed directly. In a notebook
# kernel __name__ is "__main__", so executing this cell starts main().
if __name__ == "__main__":
    main()
    

Input details: [{'name': 'normalized_input_image_tensor', 'index': 0, 'shape': array([1, 1, 1, 3], dtype=int32), 'shape_signature': array([ 1, -1, -1,  3], dtype=int32), 'dtype': <class 'numpy.float32'>, 'quantization': (0.0, 0), 'quantization_parameters': {'scales': array([], dtype=float32), 'zero_points': array([], dtype=int32), 'quantized_dimension': 0}, 'sparsity_parameters': {}}]
Output details: [{'name': 'raw_outputs/box_encodings', 'index': 298, 'shape': array([ 1, 33,  4], dtype=int32), 'shape_signature': array([ 1, -1,  4], dtype=int32), 'dtype': <class 'numpy.float32'>, 'quantization': (0.0, 0), 'quantization_parameters': {'scales': array([], dtype=float32), 'zero_points': array([], dtype=int32), 'quantized_dimension': 0}, 'sparsity_parameters': {}}, {'name': 'raw_outputs/class_predictions', 'index': 302, 'shape': array([ 1, 33, 91], dtype=int32), 'shape_signature': array([ 1, -1, 91], dtype=int32), 'dtype': <class 'numpy.float32'>, 'quantization': (0.0, 0), 'quantization_par

KeyboardInterrupt: 