In [24]:
import cv2
import numpy as np
import tensorrt as trt
import torch
import os
from collections import deque
import gym
from gym import spaces
from stable_baselines3 import DQN
from stable_baselines3.common.vec_env import DummyVecEnv

# Ensure CUDA is available
if not torch.cuda.is_available():
    raise RuntimeError("CUDA is not available. Please check your GPU and PyTorch installation.")

class CombinedVisualizer:
    def __init__(self, window_size=100):
        self.window_size = window_size
        self.activities = deque(maxlen=window_size)
        self.activity_counts = {
            "walking": 0,
            "standing": 0,
            "sitting": 0,
            "pawing": 0,
            "Unknown": 0,
        }
        self.total_frames = 0

    def update(self, activity):
        self.activities.append(activity)
        self.activity_counts[activity] += 1
        self.total_frames += 1

    def create_visualization(self, frame):
        frame_height, frame_width = frame.shape[:2]
        defrag_height = 150
        defrag_width = frame_width
        defrag_image = np.ones((defrag_height, defrag_width, 3), dtype=np.uint8) * 255

        colors = {
            "walking": (0, 0, 255),  # Red
            "standing": (255, 0, 0),  # Blue
            "sitting": (0, 255, 0),  # Green
            "pawing": (255, 165, 0),  # Orange
            "Unknown": (128, 128, 128),  # Gray
        }

        segment_width = defrag_width // self.window_size
        for i, activity in enumerate(self.activities):
            x_start = i * segment_width
            x_end = x_start + segment_width
            color = colors.get(activity, (128, 128, 128))
            cv2.rectangle(defrag_image, (x_start, 0), (x_end, 80), color, -1)

        font = cv2.FONT_HERSHEY_SIMPLEX
        legend_items = [
            ("Walking", (0, 0, 255)),
            ("Standing", (255, 0, 0)),
            ("Sitting", (0, 255, 0)),
            ("Pawing", (255, 165, 0)),
            ("Unknown", (128, 128, 128)),
        ]

        x_offset = 10
        y_offset = 120
        for text, color in legend_items:
            cv2.rectangle(
                defrag_image,
                (x_offset, y_offset - 15),
                (x_offset + 20, y_offset + 5),
                color,
                -1,
            )
            percentage = (
                self.activity_counts.get(text.lower(), 0) / max(1, self.total_frames)
            ) * 100
            cv2.putText(
                defrag_image,
                f"{text}: {percentage:.1f}%",
                (x_offset + 30, y_offset),
                font,
                0.5,
                (0, 0, 0),
                1,
            )
            x_offset += 160

        combined_height = frame_height + defrag_height
        combined_image = np.zeros((combined_height, frame_width, 3), dtype=np.uint8)
        combined_image[:frame_height] = frame
        combined_image[frame_height:] = defrag_image

        return combined_image

class TensorRTInference:
    """TensorRT inference class for ViTPose model, optimized for GPU with TensorRT 10.8.0.43"""
    def __init__(self, engine_path):
        """Initialize TensorRT engine and allocate buffers on GPU."""
        self.logger = trt.Logger(trt.Logger.INFO)
        self.runtime = trt.Runtime(self.logger)

        # Load engine from file
        with open(engine_path, 'rb') as f:
            engine_bytes = f.read()
            self.engine = self.runtime.deserialize_cuda_engine(engine_bytes)

        if not self.engine:
            raise RuntimeError(f"Failed to load TensorRT engine from {engine_path}")

        self.context = self.engine.create_execution_context()
        
        # Define input and output shapes
        self.input_shape = (1, 3, 256, 192)  # ViTPose standard input
        self.output_shape = (1, 17, 64, 48)  # ViTPose standard output (17 keypoints)
        
        # Tensor names (adjust based on your engine)
        self.input_name = "input"
        self.output_name = "output"
        
        # Allocate GPU buffers
        self.allocate_buffers()

    def allocate_buffers(self):
        """Allocate CUDA memory for inputs and outputs."""
        self.inputs = []
        self.outputs = []
        self.bindings = []

        num_io_tensors = self.engine.num_io_tensors
        for i in range(num_io_tensors):
            tensor_name = self.engine.get_tensor_name(i)
            tensor_shape = self.engine.get_tensor_shape(tensor_name)
            tensor_dtype = trt.nptype(self.engine.get_tensor_dtype(tensor_name))
            
            if tensor_dtype == np.float32:
                torch_dtype = torch.float32
            elif tensor_dtype == np.float16:
                torch_dtype = torch.float16
            else:
                raise ValueError(f"Unsupported dtype: {tensor_dtype}")

            # Explicitly allocate on GPU (CUDA)
            tensor = torch.zeros(tuple(tensor_shape), dtype=torch_dtype, device='cuda')
            self.bindings.append(tensor.data_ptr())
            
            if self.engine.get_tensor_mode(tensor_name) == trt.TensorIOMode.INPUT:
                self.inputs.append(tensor)
            else:
                self.outputs.append(tensor)

            self.context.set_tensor_address(tensor_name, tensor.data_ptr())

    def preprocess_image(self, img):
        """Preprocess image for model input on GPU."""
        img = cv2.resize(img, (192, 256))
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = img.astype(np.float32) / 255.0
        img = img.transpose(2, 0, 1)  # HWC to CHW
        img = np.expand_dims(img, axis=0)  # Add batch dimension
        
        # Move to GPU
        return torch.from_numpy(img).to('cuda')

    def postprocess_output(self, output):
        """Convert model output to keypoints on GPU, then move to CPU for further processing."""
        heatmaps = output.reshape(self.output_shape)
        keypoints = {}
        for batch_idx in range(heatmaps.shape[0]):
            person_keypoints = torch.zeros((17, 3), device='cuda')  # Process on GPU
            for kpt_idx in range(17):
                heatmap = heatmaps[batch_idx, kpt_idx]
                flat_idx = torch.argmax(heatmap)
                y, x = torch.unravel_index(flat_idx, heatmap.shape)
                orig_x = x * (192 / 48)  # Scale to input width
                orig_y = y * (256 / 64)  # Scale to input height
                confidence = heatmap[y, x]
                person_keypoints[kpt_idx] = torch.tensor([orig_x, orig_y, confidence], device='cuda')
            keypoints[batch_idx] = person_keypoints.cpu().numpy()  # Move to CPU for compatibility
        return keypoints
    
    def infer(self, img):
        """Run inference on an image using GPU."""
        preprocessed = self.preprocess_image(img)
        self.inputs[0].copy_(preprocessed)
        
        # Execute inference asynchronously on GPU
        self.context.execute_async_v3(stream_handle=torch.cuda.current_stream().cuda_stream)
        torch.cuda.synchronize()  # Wait for GPU computation to finish
        
        output = self.outputs[0]  # Already on GPU
        keypoints = self.postprocess_output(output)
        return keypoints

class HorseGaitEnv(gym.Env):
    """Custom Gym environment for horse gait RL."""
    def __init__(self, monitor):
        super(HorseGaitEnv, self).__init__()
        self.monitor = monitor
        self.action_space = spaces.Discrete(5)  # 5 actions: walking, standing, sitting, pawing, unknown
        # State space: keypoints (17 * 3), movement buffer, front paw history
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(17*3 + 10 + 30*2,), dtype=np.float32)
        self.current_keypoints = None
        self.frame_count = 0

    def reset(self):
        self.monitor.prev_positions = None
        self.monitor.movement_buffer.clear()
        self.monitor.front_paw_positions_history.clear()
        self.frame_count = 0
        return self._get_observation()

    def step(self, action):
        predicted_state = ["walking", "standing", "sitting", "pawing", "unknown"][action]
        heuristic_state = self.monitor.detect_state(self.current_keypoints)  # Use existing heuristic
        
        # Reward function
        reward = 1.0 if predicted_state == heuristic_state else -0.5
        
        # Additional reward shaping for pawing
        if predicted_state == "pawing" and self.monitor.detect_pawing_pattern(
            [self.current_keypoints[0][7][:2], self.current_keypoints[0][10][:2]],  # Front paws
            [self.monitor.calculate_angle(*[np.array(self.current_keypoints[0][i][:2]) for i in [5, 6, 7]]),
             self.monitor.calculate_angle(*[np.array(self.current_keypoints[0][i][:2]) for i in [8, 9, 10]])])
        ):
            reward += 2.0  # Bonus for correct pawing detection
        
        self.monitor.visualizer.update(predicted_state)
        self.frame_count += 1
        
        done = self.frame_count >= 1000  # Arbitrary episode length
        return self._get_observation(), reward, done, {}

    def _get_observation(self):
        if self.current_keypoints is None:
            return np.zeros(self.observation_space.shape, dtype=np.float32)
        
        keypoints_flat = self.current_keypoints[0].flatten()  # 17 keypoints * 3 (x, y, conf)
        movement_buffer = np.array(self.monitor.movement_buffer + [0] * (10 - len(self.monitor.movement_buffer)))
        paw_history = np.array(list(self.monitor.front_paw_positions_history) + [[0, 0]] * (30 - len(self.monitor.front_paw_positions_history))).flatten()
        return np.concatenate([keypoints_flat, movement_buffer, paw_history])

class HorseGaitMonitor:
    def __init__(self, model_path, output_dir="monitoring_output"):
        """Initialize the horse gait monitoring system with TensorRT on GPU and RL."""
        self.model = TensorRTInference(model_path)
        
        self.output_dir = output_dir
        os.makedirs(output_dir, exist_ok=True)

        self.pose_dirs = {
            "standing": os.path.join(output_dir, "standing"),
            "walking": os.path.join(output_dir, "walking"),
            "sitting": os.path.join(output_dir, "sitting"),
            "pawing": os.path.join(output_dir, "pawing"),
        }
        for dir_path in self.pose_dirs.values():
            os.makedirs(dir_path, exist_ok=True)

        self.prev_positions = None
        self.movement_buffer = deque(maxlen=10)
        self.state_buffer = deque(maxlen=15)
        self.visualizer = CombinedVisualizer()
        
        # New variables for improved pawing detection
        self.front_paw_positions_history = deque(maxlen=30)
        self.front_leg_angles_history = deque(maxlen=30)
        self.pawing_pattern_count = 0
        self.pawing_cooldown = 0
        self.pawing_detection_threshold = 3
        self.pawing_angle_threshold = (100, 140)

        # RL Integration
        self.env = HorseGaitEnv(self)
        self.rl_model = DQN("MlpPolicy", DummyVecEnv([lambda: self.env]), verbose=1, device="cuda")
        self.is_training = True  # Toggle training mode

    def calculate_angle(self, p1, p2, p3):
        """Calculate angle between three points (CPU-based for simplicity)."""
        v1 = p1 - p2
        v2 = p3 - p2
        cos_angle = np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2))
        angle = np.degrees(np.arccos(np.clip(cos_angle, -1.0, 1.0)))
        return angle

    def detect_pawing_pattern(self, front_paw_positions, front_leg_angles):
        """Detect pawing pattern by analyzing the history of front paw movements and leg angles."""
        self.front_paw_positions_history.append(front_paw_positions)
        self.front_leg_angles_history.append(front_leg_angles)
        
        if len(self.front_paw_positions_history) < 10:
            return False
            
        for leg_idx in range(2):
            y_positions = np.array([pos[leg_idx][1] for pos in self.front_paw_positions_history])
            y_movements = np.diff(y_positions)
            direction_changes = np.sign(y_movements[1:]) != np.sign(y_movements[:-1])
            num_direction_changes = np.sum(direction_changes)
            
            angles = np.array([angles[leg_idx] for angles in self.front_leg_angles_history])
            
            if num_direction_changes >= 3:
                vertical_range = np.max(y_positions) - np.min(y_positions)
                angles_in_range = np.any((angles >= self.pawing_angle_threshold[0]) & 
                                        (angles <= self.pawing_angle_threshold[1]))
                
                if vertical_range > 15 and angles_in_range:
                    return True
                    
        return False

    def detect_state(self, keypoints):
        """Enhanced detect_state method with improved pawing detection (used for heuristic rewards)."""
        movement_detected = False

        for person_id, kp_array in keypoints.items():
            leg_points = {
                "L_F_Hip": kp_array[5][:2],
                "L_F_Knee": kp_array[6][:2],
                "L_F_Paw": kp_array[7][:2],
                "R_F_Hip": kp_array[8][:2],
                "R_F_Knee": kp_array[9][:2],
                "R_F_Paw": kp_array[10][:2],
                "L_B_Hip": kp_array[11][:2],
                "L_B_Knee": kp_array[12][:2],
                "L_B_Paw": kp_array[13][:2],
                "R_B_Hip": kp_array[14][:2],
                "R_B_Knee": kp_array[15][:2],
                "R_B_Paw": kp_array[16][:2]
            }

            angles = {
                "left_front": self.calculate_angle(
                    np.array(leg_points["L_F_Hip"]),
                    np.array(leg_points["L_F_Knee"]),
                    np.array(leg_points["L_F_Paw"])
                ),
                "right_front": self.calculate_angle(
                    np.array(leg_points["R_F_Hip"]),
                    np.array(leg_points["R_F_Knee"]),
                    np.array(leg_points["R_F_Paw"])
                ),
                "left_back": self.calculate_angle(
                    np.array(leg_points["L_B_Hip"]),
                    np.array(leg_points["L_B_Knee"]),
                    np.array(leg_points["L_B_Paw"])
                ),
                "right_back": self.calculate_angle(
                    np.array(leg_points["R_B_Hip"]),
                    np.array(leg_points["R_B_Knee"]),
                    np.array(leg_points["R_B_Paw"])
                )
            }
            
            front_paw_positions = [leg_points["L_F_Paw"], leg_points["R_F_Paw"]]
            front_leg_angles = [angles["left_front"], angles["right_front"]]
            
            if self.pawing_cooldown > 0:
                self.pawing_cooldown -= 1
                return "pawing"
                
            if self.detect_pawing_pattern(front_paw_positions, front_leg_angles):
                self.pawing_pattern_count += 1
                if self.pawing_pattern_count >= self.pawing_detection_threshold:
                    self.pawing_pattern_count = 0
                    self.pawing_cooldown = 15
                    return "pawing"
            else:
                self.pawing_pattern_count = max(0, self.pawing_pattern_count - 0.5)
            
            if all(angle < 90 for angle in angles.values()):
                return "sitting"

            paw_positions = np.array([
                leg_points["L_F_Paw"],
                leg_points["R_F_Paw"],
                leg_points["L_B_Paw"],
                leg_points["R_B_Paw"]
            ])

            if self.prev_positions is not None:
                movements = np.linalg.norm(paw_positions - self.prev_positions, axis=1)
                self.movement_buffer.append(np.mean(movements))
                if len(self.movement_buffer) > 10:
                    self.movement_buffer.pop(0)
                movement_detected = np.mean(self.movement_buffer) > 5.0

            self.prev_positions = paw_positions

        current_state = "walking" if movement_detected else "standing"
        self.state_buffer.append(current_state)
        if len(self.state_buffer) > 15:
            self.state_buffer.pop(0)
        return max(set(self.state_buffer), key=self.state_buffer.count)

    def draw_state_annotation(self, frame, state):
        """Draw state annotation on frame without detailed visualization."""
        return frame

    def save_frame(self, frame, state, frame_count):
        """Save frame to appropriate directory based on state."""
        if state in self.pose_dirs:
            filename = os.path.join(self.pose_dirs[state], f"frame_{frame_count}.jpg")
            cv2.imwrite(filename, frame)

    def draw_keypoints(self, frame, keypoints):
        """Return original frame without keypoints drawing."""
        return frame

    def process_video(self, video_path):
        """Process video file and analyze horse gait on GPU with RL."""
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            raise ValueError(f"Could not open video file: {video_path}")

        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = int(cap.get(cv2.CAP_PROP_FPS))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        combined_height = height + 150
        output_path = os.path.join(self.output_dir, "video_with_analysis_rl.mp4")
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, combined_height))

        frame_count = 0
        obs = self.env.reset()

        cv2.namedWindow("Horse Gait Analysis", cv2.WINDOW_NORMAL)

        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                keypoints = self.model.infer(frame)
                self.env.current_keypoints = keypoints

                # RL Prediction
                action, _ = self.rl_model.predict(obs, deterministic=not self.is_training)
                current_state = ["walking", "standing", "sitting", "pawing", "unknown"][action]
                
                obs, reward, done, info = self.env.step(action)
                
                if self.is_training:
                    self.rl_model.learn(total_timesteps=1, reset_num_timesteps=False)

                annotated_frame = frame.copy()
                self.save_frame(annotated_frame, current_state, frame_count)

                combined_display = self.visualizer.create_visualization(annotated_frame)
                cv2.imshow("Horse Gait Analysis", combined_display)
                out.write(combined_display)

                frame_count += 1
                if done:
                    obs = self.env.reset()

                if cv2.waitKey(1) & 0xFF == ord("q"):
                    break

        finally:
            cap.release()
            out.release()
            cv2.destroyAllWindows()
            print(f"\nProcessing complete! Output saved to: {output_path}")

def main():
    """Main function to run the horse gait analysis on GPU with RL."""
    engine_path = "C:/Users/hp/Downloads/vitpose-l-ap10k.engine"
    try:
        print(f"Using GPU: {torch.cuda.get_device_name(0)}")
        monitor = HorseGaitMonitor(engine_path)
        monitor.process_video("E:/vitpose/strach.mp4")
    except Exception as e:
        print(f"Error processing video: {str(e)}")
        raise

if __name__ == "__main__":
    main()

IndentationError: unindent does not match any outer indentation level (<tokenize>, line 317)

In [2]:
import cv2
import numpy as np
import tensorrt as trt
import torch
import os
from collections import deque

# Ensure CUDA is available
if not torch.cuda.is_available():
    raise RuntimeError("CUDA is not available. Please check your GPU and PyTorch installation.")

class CombinedVisualizer:
    def __init__(self, window_size=100):
        self.window_size = window_size
        self.activities = deque(maxlen=window_size)
        self.activity_counts = {
            "walking": 0,
            "standing": 0,
            "sitting": 0,
            "pawing": 0,
            "Unknown": 0,
        }
        self.total_frames = 0

    def update(self, activity):
        self.activities.append(activity)
        self.activity_counts[activity] += 1
        self.total_frames += 1

    def create_visualization(self, frame):
        frame_height, frame_width = frame.shape[:2]
        defrag_height = 150
        defrag_width = frame_width
        defrag_image = np.ones((defrag_height, defrag_width, 3), dtype=np.uint8) * 255

        colors = {
            "walking": (0, 0, 255),  # Red
            "standing": (255, 0, 0),  # Blue
            "sitting": (0, 255, 0),  # Green
            "pawing": (255, 165, 0),  # Orange
            "Unknown": (128, 128, 128),  # Gray
        }

        segment_width = defrag_width // self.window_size
        for i, activity in enumerate(self.activities):
            x_start = i * segment_width
            x_end = x_start + segment_width
            color = colors.get(activity, (128, 128, 128))
            cv2.rectangle(defrag_image, (x_start, 0), (x_end, 80), color, -1)

        font = cv2.FONT_HERSHEY_SIMPLEX
        legend_items = [
            ("Walking", (0, 0, 255)),
            ("Standing", (255, 0, 0)),
            ("Sitting", (0, 255, 0)),
            ("Pawing", (255, 165, 0)),
            ("Unknown", (128, 128, 128)),
        ]

        x_offset = 10
        y_offset = 120
        for text, color in legend_items:
            cv2.rectangle(
                defrag_image,
                (x_offset, y_offset - 15),
                (x_offset + 20, y_offset + 5),
                color,
                -1,
            )
            percentage = (
                self.activity_counts.get(text.lower(), 0) / max(1, self.total_frames)
            ) * 100
            cv2.putText(
                defrag_image,
                f"{text}: {percentage:.1f}%",
                (x_offset + 30, y_offset),
                font,
                0.5,
                (0, 0, 0),
                1,
            )
            x_offset += 160

        combined_height = frame_height + defrag_height
        combined_image = np.zeros((combined_height, frame_width, 3), dtype=np.uint8)
        combined_image[:frame_height] = frame
        combined_image[frame_height:] = defrag_image

        return combined_image

class TensorRTInference:
    """TensorRT inference class for ViTPose model, optimized for GPU with TensorRT 10.8.0.43"""
    def __init__(self, engine_path):
        """Initialize TensorRT engine and allocate buffers on GPU."""
        self.logger = trt.Logger(trt.Logger.INFO)
        self.runtime = trt.Runtime(self.logger)

        # Load engine from file
        with open(engine_path, 'rb') as f:
            engine_bytes = f.read()
            self.engine = self.runtime.deserialize_cuda_engine(engine_bytes)

        if not self.engine:
            raise RuntimeError(f"Failed to load TensorRT engine from {engine_path}")

        self.context = self.engine.create_execution_context()
        
        # Define input and output shapes
        self.input_shape = (1, 3, 256, 192)  # ViTPose standard input
        self.output_shape = (1, 17, 64, 48)  # ViTPose standard output (17 keypoints)
        
        # Tensor names (adjust based on your engine)
        self.input_name = "input"
        self.output_name = "output"
        
        # Allocate GPU buffers
        self.allocate_buffers()

    def allocate_buffers(self):
        """Allocate CUDA memory for inputs and outputs."""
        self.inputs = []
        self.outputs = []
        self.bindings = []

        num_io_tensors = self.engine.num_io_tensors
        for i in range(num_io_tensors):
            tensor_name = self.engine.get_tensor_name(i)
            tensor_shape = self.engine.get_tensor_shape(tensor_name)
            tensor_dtype = trt.nptype(self.engine.get_tensor_dtype(tensor_name))
            
            if tensor_dtype == np.float32:
                torch_dtype = torch.float32
            elif tensor_dtype == np.float16:
                torch_dtype = torch.float16
            else:
                raise ValueError(f"Unsupported dtype: {tensor_dtype}")

            # Explicitly allocate on GPU (CUDA)
            tensor = torch.zeros(tuple(tensor_shape), dtype=torch_dtype, device='cuda')
            self.bindings.append(tensor.data_ptr())
            
            if self.engine.get_tensor_mode(tensor_name) == trt.TensorIOMode.INPUT:
                self.inputs.append(tensor)
            else:
                self.outputs.append(tensor)

            self.context.set_tensor_address(tensor_name, tensor.data_ptr())

    def preprocess_image(self, img):
        """Preprocess image for model input on GPU."""
        # Convert to RGB and resize on CPU first (cv2 doesn't support GPU natively)
        img = cv2.resize(img, (192, 256))
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = img.astype(np.float32) / 255.0
        img = img.transpose(2, 0, 1)  # HWC to CHW
        img = np.expand_dims(img, axis=0)  # Add batch dimension
        
        # Move to GPU
        return torch.from_numpy(img).to('cuda')

    def postprocess_output(self, output):
        """Convert model output to keypoints on GPU, then move to CPU for further processing."""
        heatmaps = output.reshape(self.output_shape)
        keypoints = {}
        for batch_idx in range(heatmaps.shape[0]):
            person_keypoints = torch.zeros((17, 3), device='cuda')  # Process on GPU
            for kpt_idx in range(17):
                heatmap = heatmaps[batch_idx, kpt_idx]
                flat_idx = torch.argmax(heatmap)
                y, x = torch.unravel_index(flat_idx, heatmap.shape)
                orig_x = x * (192 / 48)  # Scale to input width
                orig_y = y * (256 / 64)  # Scale to input height
                confidence = heatmap[y, x]
                person_keypoints[kpt_idx] = torch.tensor([orig_x, orig_y, confidence], device='cuda')
            keypoints[batch_idx] = person_keypoints.cpu().numpy()  # Move to CPU for compatibility
        return keypoints
    
    def infer(self, img):
        """Run inference on an image using GPU."""
        preprocessed = self.preprocess_image(img)
        self.inputs[0].copy_(preprocessed)
        
        # Execute inference asynchronously on GPU
        self.context.execute_async_v3(stream_handle=torch.cuda.current_stream().cuda_stream)
        torch.cuda.synchronize()  # Wait for GPU computation to finish
        
        output = self.outputs[0]  # Already on GPU
        keypoints = self.postprocess_output(output)
        return keypoints

class HorseGaitMonitor:
    def __init__(self, model_path, output_dir="monitoring_output"):
        """Initialize the horse gait monitoring system with TensorRT on GPU."""
        self.model = TensorRTInference(model_path)
        
        self.output_dir = output_dir
        os.makedirs(output_dir, exist_ok=True)

        self.pose_dirs = {
            "standing": os.path.join(output_dir, "standing"),
            "walking": os.path.join(output_dir, "walking"),
            "sitting": os.path.join(output_dir, "sitting"),
            "pawing": os.path.join(output_dir, "pawing"),
        }
        for dir_path in self.pose_dirs.values():
            os.makedirs(dir_path, exist_ok=True)

        self.prev_positions = None
        self.movement_buffer = []
        self.state_buffer = []
        self.visualizer = CombinedVisualizer()
        
        # New variables for improved pawing detection
        self.front_paw_positions_history = deque(maxlen=30)  # Store past positions for pattern analysis
        self.front_leg_angles_history = deque(maxlen=30)     # Store past angles for pattern analysis
        self.pawing_pattern_count = 0                       # Counter for potential pawing patterns
        self.pawing_cooldown = 0                           # Cooldown after pawing is detected
        self.pawing_detection_threshold = 3                # Number of repetitions to confirm pawing
        self.pawing_angle_threshold = (100, 140)           # Angle range for potential pawing

    def calculate_angle(self, p1, p2, p3):
        """Calculate angle between three points (CPU-based for simplicity)."""
        v1 = p1 - p2
        v2 = p3 - p2
        cos_angle = np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2))
        angle = np.degrees(np.arccos(np.clip(cos_angle, -1.0, 1.0)))
        return angle

    def detect_pawing_pattern(self, front_paw_positions, front_leg_angles):
        """
        Detect pawing pattern by analyzing the history of front paw movements and leg angles.
        Pawing is characterized by repetitive vertical movement of front legs with specific angle patterns.
        """
        # Add current positions and angles to history
        self.front_paw_positions_history.append(front_paw_positions)
        self.front_leg_angles_history.append(front_leg_angles)
        
        # Need enough history to detect pattern
        if len(self.front_paw_positions_history) < 10:
            return False
            
        # Check for pawing in either front leg
        for leg_idx in range(2):  # 0 for left front, 1 for right front
            # Extract vertical (y) position history for this leg
            y_positions = np.array([pos[leg_idx][1] for pos in self.front_paw_positions_history])
            
            # Calculate vertical movements (positive values = downward movement)
            y_movements = np.diff(y_positions)
            
            # Check angle history for this leg
            angles = np.array([angles[leg_idx] for angles in self.front_leg_angles_history])
            
            # Pawing pattern detection:
            # 1. Find direction changes (up to down and down to up)
            direction_changes = np.sign(y_movements[1:]) != np.sign(y_movements[:-1])
            num_direction_changes = np.sum(direction_changes)
            
            # 2. Check if we have at least 3 direction changes (up-down-up or down-up-down)
            if num_direction_changes >= 3:
                # 3. Check vertical range of movement (should be significant for pawing)
                vertical_range = np.max(y_positions) - np.min(y_positions)
                
                # 4. Check if angles were in pawing range at some point
                angles_in_range = np.any((angles >= self.pawing_angle_threshold[0]) & 
                                         (angles <= self.pawing_angle_threshold[1]))
                
                # Confirm pawing if movement range is significant and angles were appropriate
                if vertical_range > 15 and angles_in_range:
                    return True
                    
        return False

    def detect_state(self, keypoints):
        """Enhanced detect_state method with improved pawing detection."""
        movement_detected = False

        for person_id, kp_array in keypoints.items():
            leg_points = {
                "L_F_Hip": kp_array[5][:2],
                "L_F_Knee": kp_array[6][:2],
                "L_F_Paw": kp_array[7][:2],
                "R_F_Hip": kp_array[8][:2],
                "R_F_Knee": kp_array[9][:2],
                "R_F_Paw": kp_array[10][:2],
                "L_B_Hip": kp_array[11][:2],
                "L_B_Knee": kp_array[12][:2],
                "L_B_Paw": kp_array[13][:2],
                "R_B_Hip": kp_array[14][:2],
                "R_B_Knee": kp_array[15][:2],
                "R_B_Paw": kp_array[16][:2]
            }

            angles = {
                "left_front": self.calculate_angle(
                    np.array(leg_points["L_F_Hip"]),
                    np.array(leg_points["L_F_Knee"]),
                    np.array(leg_points["L_F_Paw"])
                ),
                "right_front": self.calculate_angle(
                    np.array(leg_points["R_F_Hip"]),
                    np.array(leg_points["R_F_Knee"]),
                    np.array(leg_points["R_F_Paw"])
                ),
                "left_back": self.calculate_angle(
                    np.array(leg_points["L_B_Hip"]),
                    np.array(leg_points["L_B_Knee"]),
                    np.array(leg_points["L_B_Paw"])
                ),
                "right_back": self.calculate_angle(
                    np.array(leg_points["R_B_Hip"]),
                    np.array(leg_points["R_B_Knee"]),
                    np.array(leg_points["R_B_Paw"])
                )
            }
            
            # Extract front paw positions for pattern analysis
            front_paw_positions = [
                leg_points["L_F_Paw"],
                leg_points["R_F_Paw"]
            ]
            
            # Extract front leg angles for pattern analysis
            front_leg_angles = [
                angles["left_front"],
                angles["right_front"]
            ]
            
            # Improved pawing detection using repetitive pattern analysis
            is_pawing_pattern = self.detect_pawing_pattern(front_paw_positions, front_leg_angles)
            
            # State determination with cooldown logic for pawing
            if self.pawing_cooldown > 0:
                self.pawing_cooldown -= 1
                return "pawing"
                
            if is_pawing_pattern:
                self.pawing_pattern_count += 1
                if self.pawing_pattern_count >= self.pawing_detection_threshold:
                    self.pawing_pattern_count = 0
                    self.pawing_cooldown = 15  # Keep "pawing" state for 15 frames after detection
                    return "pawing"
            else:
                # Gradually decrease the pattern count when no pawing is detected
                self.pawing_pattern_count = max(0, self.pawing_pattern_count - 0.5)
            
            # Check for sitting behavior
            if all(angle < 90 for angle in angles.values()):
                return "sitting"

            # Movement detection for walking vs standing
            paw_positions = np.array([
                leg_points["L_F_Paw"],
                leg_points["R_F_Paw"],
                leg_points["L_B_Paw"],
                leg_points["R_B_Paw"]
            ])

            if self.prev_positions is not None:
                movements = np.linalg.norm(paw_positions - self.prev_positions, axis=1)
                self.movement_buffer.append(np.mean(movements))
                if len(self.movement_buffer) > 10:
                    self.movement_buffer.pop(0)
                movement_detected = np.mean(self.movement_buffer) > 5.0

            self.prev_positions = paw_positions

        current_state = "walking" if movement_detected else "standing"
        self.state_buffer.append(current_state)
        if len(self.state_buffer) > 15:
            self.state_buffer.pop(0)
        return max(set(self.state_buffer), key=self.state_buffer.count)

    def draw_state_annotation(self, frame, state):
        """Draw state annotation on frame without detailed visualization."""
        # Simply return the original frame without any annotations
        return frame

    def save_frame(self, frame, state, frame_count):
        """Save frame to appropriate directory based on state."""
        if state in self.pose_dirs:
            filename = os.path.join(self.pose_dirs[state], f"frame_{frame_count}.jpg")
            cv2.imwrite(filename, frame)

    def draw_keypoints(self, frame, keypoints):
        """Return original frame without keypoints drawing."""
        # Simply return the original frame without drawing keypoints
        return frame

    def process_video(self, video_path):
        """Process video file and analyze horse gait on GPU."""
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            raise ValueError(f"Could not open video file: {video_path}")

        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = int(cap.get(cv2.CAP_PROP_FPS))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        combined_height = height + 150  # Add space for visualizer
        output_path = os.path.join(self.output_dir, "video_with_analysis2.mp4")
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, combined_height))

        frame_count = 0
        last_announced_state = None

        cv2.namedWindow("Horse Gait Analysis", cv2.WINDOW_NORMAL)

        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                keypoints = self.model.infer(frame)
                current_state = self.detect_state(keypoints)
                
                # No keypoint visualization
                annotated_frame = frame.copy()
                
                self.save_frame(annotated_frame, current_state, frame_count)

                self.visualizer.update(current_state)
                combined_display = self.visualizer.create_visualization(annotated_frame)

                cv2.imshow("Horse Gait Analysis", combined_display)
                out.write(combined_display)

                frame_count += 1

                if cv2.waitKey(1) & 0xFF == ord("q"):
                    break

        finally:
            cap.release()
            out.release()
            cv2.destroyAllWindows()
            print(f"\nProcessing complete! Output saved to: {output_path}")

def main():
    """Main function to run the horse gait analysis on GPU."""
    engine_path = "C:/Users/hp/Downloads/vitpose-l-ap10k.engine"
    try:
        # Print GPU info
        print(f"Using GPU: {torch.cuda.get_device_name(0)}")
        monitor = HorseGaitMonitor(engine_path)
        monitor.process_video(r"E:\vitpose\strach.mp4")
    except Exception as e:
        print(f"Error processing video: {str(e)}")
        raise

if __name__ == "__main__":
    main()

Using GPU: NVIDIA GeForce RTX 3050 Laptop GPU

Processing complete! Output saved to: monitoring_output\video_with_analysis2.mp4


In [18]:
import torch
import tensorrt as trt
import os

# Model paths
ONNX_PATH = "C:/Users/hp/Downloads/vitpose-l-ap10k.onnx"
TRT_PATH = ONNX_PATH.replace('.onnx', '.engine')

# Model configuration
C, H, W = (3, 256, 192)
input_names = ["input_0"]
output_names = ["output_0"]

# Device configuration
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Create dummy input tensor
inputs = torch.randn(1, C, H, W).to(device)

# Dynamic axes configuration
dynamic_axes = {
    'input_0': {0: 'batch_size'},
    'output_0': {0: 'batch_size'}
}

def export_engine(onnx, im, file, workspace=4, verbose=False, prefix='Tensorrt'):
    logger = trt.Logger(trt.Logger.INFO)
    if verbose:
        logger.min_severity = trt.Logger.Severity.VERBOSE
    
    # Initialize builder and config
    builder = trt.Builder(logger)
    config = builder.create_builder_config()
    config.set_memory_pool_limit(trt.MemoryPoolType.WORKSPACE, workspace << 30)
    
    # Create network
    flag = (1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH))
    network = builder.create_network(flag)
    
    # Parse ONNX
    parser = trt.OnnxParser(network, logger)
    if not parser.parse_from_file(str(onnx)):
        raise RuntimeError(f'failed to load ONNX file: {onnx}')

    # Process inputs and outputs
    inputs = [network.get_input(i) for i in range(network.num_inputs)]
    outputs = [network.get_output(i) for i in range(network.num_outputs)]
    
    # Print input and output details
    for inp in inputs:
        print(f'{prefix} input "{inp.name}" with shape{inp.shape} {inp.dtype}')
    for out in outputs:
        print(f'{prefix} output "{out.name}" with shape{out.shape} {out.dtype}')

    # Set profile for dynamic shapes
    profile = builder.create_optimization_profile()
    for inp in inputs:
        profile.set_shape(
            inp.name, 
            (1, *im.shape[1:]),                           # min shape
            (max(1, im.shape[0] // 2), *im.shape[1:]),    # optimal shape
            im.shape                                       # max shape
        )
    config.add_optimization_profile(profile)

    # Force FP32 precision - no FP16 configuration
    print(f'{prefix} building FP32 engine')
    
    # Build and save engine
    try:
        serialized_engine = builder.build_serialized_network(network, config)
    except AttributeError:
        plan = builder.build_engine(network, config)
        serialized_engine = plan.serialize()
        plan.destroy()
    
    if serialized_engine is None:
        raise RuntimeError(f'{prefix} failed to build TensorRT engine')
    
    with open(file, 'wb') as f:
        f.write(serialized_engine)
    
    return True

# Check if ONNX file exists
if not os.path.exists(ONNX_PATH):
    raise FileNotFoundError(f"ONNX file not found at {ONNX_PATH}")

# Convert to TensorRT
print(f"Converting {ONNX_PATH} to TensorRT engine (FP32)...")
success = export_engine(
    onnx=ONNX_PATH,
    im=inputs,
    file=TRT_PATH,
    verbose=False
)

if success:
    print(f"Successfully converted to TensorRT engine (FP32): {TRT_PATH}")
else:
    print("Conversion failed!")

Converting C:/Users/hp/Downloads/vitpose-l-ap10k.onnx to TensorRT engine (FP32)...
Tensorrt input "input_0" with shape(-1, 3, 256, 192) DataType.FLOAT
Tensorrt output "output_0" with shape(-1, 17, 64, 48) DataType.FLOAT
Tensorrt building FP32 engine
Successfully converted to TensorRT engine (FP32): C:/Users/hp/Downloads/vitpose-l-ap10k.engine


In [24]:

import torch
import tensorrt as trt
import numpy as np
import onnxruntime
import cv2

class TensorRTValidator:
    def __init__(self, engine_path):
        self.logger = trt.Logger(trt.Logger.INFO)
        self.runtime = trt.Runtime(self.logger)
        
        with open(engine_path, 'rb') as f:
            engine_bytes = f.read()
            self.engine = self.runtime.deserialize_cuda_engine(engine_bytes)
            
        if not self.engine:
            raise RuntimeError(f"Failed to load TensorRT engine from {engine_path}")
            
        self.context = self.engine.create_execution_context()
        self.allocate_buffers()

    def allocate_buffers(self):
        self.inputs = []
        self.outputs = []
        self.bindings = []

        num_io_tensors = self.engine.num_io_tensors
        for i in range(num_io_tensors):
            tensor_name = self.engine.get_tensor_name(i)
            tensor_shape = self.engine.get_tensor_shape(tensor_name)
            tensor_dtype = trt.nptype(self.engine.get_tensor_dtype(tensor_name))
            
            if tensor_dtype == np.float32:
                torch_dtype = torch.float32
            elif tensor_dtype == np.float16:
                torch_dtype = torch.float16
            else:
                raise ValueError(f"Unsupported dtype: {tensor_dtype}")

            tensor = torch.zeros(tuple(tensor_shape), dtype=torch_dtype, device='cuda')
            self.bindings.append(tensor.data_ptr())
            
            if self.engine.get_tensor_mode(tensor_name) == trt.TensorIOMode.INPUT:
                self.inputs.append(tensor)
            else:
                self.outputs.append(tensor)

            self.context.set_tensor_address(tensor_name, tensor.data_ptr())

    def infer(self, input_tensor):
        self.inputs[0].copy_(input_tensor)
        self.context.execute_async_v3(stream_handle=torch.cuda.current_stream().cuda_stream)
        torch.cuda.synchronize()
        return self.outputs[0].clone()

def preprocess_image(image_path, target_size=(256, 192)):
    """Load and preprocess image for model input."""
    img = cv2.imread(image_path)
    if img is None:
        raise ValueError(f"Failed to load image from {image_path}")
    
    orig_img = img.copy()
    img = cv2.resize(img, (target_size[1], target_size[0]))  # (width, height)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img = img.astype(np.float32) / 255.0  # Ensure float32 here
    mean = np.array([0.485, 0.456, 0.406], dtype=np.float32)
    std = np.array([0.229, 0.224, 0.225], dtype=np.float32)
    img = (img - mean) / std
    # Convert to tensor and ensure float32
    img_tensor = torch.from_numpy(img.transpose(2, 0, 1)).unsqueeze(0).to(torch.float32).cuda()
    return img_tensor, orig_img

def extract_keypoints(heatmap, threshold=0.1):
    """Extract keypoints from heatmap tensor."""
    batch_size, num_keypoints, height, width = heatmap.shape
    keypoints = []
    
    for b in range(batch_size):
        batch_keypoints = []
        for k in range(num_keypoints):
            heatmap_k = heatmap[b, k]
            max_val = torch.max(heatmap_k)
            if max_val < threshold:
                batch_keypoints.append((-1, -1, float(max_val)))
                continue
            y, x = torch.where(heatmap_k == max_val)
            batch_keypoints.append((int(x[0]), int(y[0]), float(max_val)))
        keypoints.append(batch_keypoints)
    return keypoints

def draw_keypoints(image, keypoints, color=(0, 255, 0)):
    """Draw keypoints on the image."""
    for x, y, conf in keypoints:
        if x != -1 and y != -1:
            cv2.circle(image, (x, y), 5, color, -1)
    return image

def validate_models_with_image(onnx_path, engine_path, image_path):
    print("Starting model validation with image...")
    
    input_tensor, orig_img = preprocess_image(image_path)
    
    # ONNX inference
    print("\nRunning ONNX inference...")
    providers = ['CUDAExecutionProvider', 'CPUExecutionProvider']
    ort_session = onnxruntime.InferenceSession(onnx_path, providers=providers)
    ort_inputs = {ort_session.get_inputs()[0].name: input_tensor.cpu().numpy()}  # float32 ensured
    onnx_output_np = ort_session.run(None, ort_inputs)[0]
    onnx_output = torch.from_numpy(onnx_output_np).cuda()
    print("ONNX output shape:", onnx_output.shape)
    
    # TensorRT inference
    print("\nRunning TensorRT inference...")
    trt_validator = TensorRTValidator(engine_path)
    trt_output = trt_validator.infer(input_tensor)
    print("TensorRT output shape:", trt_output.shape)
    
    # Compare outputs
    diff = torch.abs(onnx_output - trt_output)
    max_diff = float(torch.max(diff))
    mean_diff = float(torch.mean(diff))
    is_close = torch.allclose(onnx_output, trt_output, rtol=1e-3, atol=1e-3)
    
    print("\nValidation Results:")
    print(f"Outputs match within tolerance: {is_close}")
    print(f"Maximum absolute difference: {max_diff:.6f}")
    print(f"Mean absolute difference: {mean_diff:.6f}")
    
    # Extract keypoints
    onnx_keypoints = extract_keypoints(onnx_output)[0]
    trt_keypoints = extract_keypoints(trt_output)[0]
    
    # Scale keypoints to original image size
    orig_h, orig_w = orig_img.shape[:2]
    scale_x = orig_w / 48
    scale_y = orig_h / 64
    onnx_keypoints_scaled = [(int(x * scale_x), int(y * scale_y), conf) if x != -1 else (-1, -1, conf) for x, y, conf in onnx_keypoints]
    trt_keypoints_scaled = [(int(x * scale_x), int(y * scale_y), conf) if x != -1 else (-1, -1, conf) for x, y, conf in trt_keypoints]
    
    # Print keypoints
    print("\nONNX Keypoints (x, y, confidence):")
    for i, (x, y, conf) in enumerate(onnx_keypoints_scaled):
        print(f"Keypoint {i}: ({x}, {y}), Confidence: {conf:.6f}")
    
    print("\nTensorRT Keypoints (x, y, confidence):")
    for i, (x, y, conf) in enumerate(trt_keypoints_scaled):
        print(f"Keypoint {i}: ({x}, {y}), Confidence: {conf:.6f}")
    
    # Visualize
    onnx_img = draw_keypoints(orig_img.copy(), onnx_keypoints_scaled, color=(0, 255, 0))
    trt_img = draw_keypoints(orig_img.copy(), trt_keypoints_scaled, color=(255, 0, 0))
    
    cv2.imshow("ONNX Keypoints", onnx_img)
    cv2.imshow("TensorRT Keypoints", trt_img)
    cv2.waitKey(0)
    cv2.destroyAllWindows()

if __name__ == "__main__":
    if not torch.cuda.is_available():
        raise RuntimeError("CUDA is not available. Please check your PyTorch installation.")
        
    ONNX_PATH = "C:/Users/hp/Downloads/vitpose-l-ap10k.onnx"
    TRT_PATH = ONNX_PATH.replace('.onnx', '.engine')
    IMAGE_PATH = "E:/lame.webp"  # Replace with your image path
    
    print(f"CUDA Device: {torch.cuda.get_device_name()}")
    print(f"CUDA Version: {torch.version.cuda}")
    
    validate_models_with_image(ONNX_PATH, TRT_PATH, IMAGE_PATH)

CUDA Device: NVIDIA GeForce RTX 3050 Laptop GPU
CUDA Version: 12.1
Starting model validation with image...

Running ONNX inference...
ONNX output shape: torch.Size([1, 17, 64, 48])

Running TensorRT inference...
TensorRT output shape: torch.Size([1, 17, 64, 48])

Validation Results:
Outputs match within tolerance: True
Maximum absolute difference: 0.000544
Mean absolute difference: 0.000002

ONNX Keypoints (x, y, confidence):
Keypoint 0: (285, 102), Confidence: 0.964143
Keypoint 1: (224, 91), Confidence: 0.919919
Keypoint 2: (204, 234), Confidence: 0.887964
Keypoint 3: (469, 193), Confidence: 0.698245
Keypoint 4: (735, 91), Confidence: 0.838982
Keypoint 5: (530, 346), Confidence: 0.960392
Keypoint 6: (490, 469), Confidence: 0.938401
Keypoint 7: (449, 591), Confidence: 0.910693
Keypoint 8: (428, 336), Confidence: 0.885177
Keypoint 9: (408, 459), Confidence: 0.951775
Keypoint 10: (367, 581), Confidence: 0.921609
Keypoint 11: (775, 265), Confidence: 0.904391
Keypoint 12: (775, 367), Confi

In [2]:
print(trt.__version__)

10.8.0.43


In [12]:
import os
import torch
import os
import torch
import tensorrt as trt
import onnx
import onnxruntime
from easy_ViTPose.easy_ViTPose.vit_models.model import ViTPose
from easy_ViTPose.easy_ViTPose.configs.ViTPose_coco import model_small as model_cfg

In [13]:
def export_engine(onnx, im, file, half, dynamic, workspace=4, verbose=False, prefix='Tensorrt'):
    logger = trt.Logger(trt.Logger.INFO)
    if verbose:
        logger.min_severity = trt.Logger.Severity.VERBOSE
    
    # Initialize builder and config
    builder = trt.Builder(logger)
    config = builder.create_builder_config()
    config.set_memory_pool_limit(trt.MemoryPoolType.WORKSPACE, workspace << 30)
    
    # Create network
    flag = (1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH))
    network = builder.create_network(flag)
    
    # Parse ONNX
    parser = trt.OnnxParser(network, logger)
    if not parser.parse_from_file(str(onnx)):
        raise RuntimeError(f'failed to load ONNX file: {onnx}')

    # Process inputs and outputs
    inputs = [network.get_input(i) for i in range(network.num_inputs)]
    outputs = [network.get_output(i) for i in range(network.num_outputs)]
    
    # Print input and output details
    for inp in inputs:
        print(f'{prefix} input "{inp.name}" with shape{inp.shape} {inp.dtype}')
    for out in outputs:
        print(f'{prefix} output "{out.name}" with shape{out.shape} {out.dtype}')

    # Handle dynamic batching
    if dynamic:
        if im.shape[0] <= 1:
            print(f'{prefix} WARNING ⚠️ --dynamic model requires maximum --batch-size argument')
        profile = builder.create_optimization_profile()
        for inp in inputs:
            profile.set_shape(
                inp.name, 
                (1, *im.shape[1:]),                           # min shape
                (max(1, im.shape[0] // 2), *im.shape[1:]),    # optimal shape
                im.shape                                       # max shape
            )
        config.add_optimization_profile(profile)

    # Configure precision
    print(f'{prefix} building FP{16 if builder.platform_has_fast_fp16 and half else 32} engine')
    if builder.platform_has_fast_fp16 and half:
        config.set_flag(trt.BuilderFlag.FP16)

    # Build and save engine
    try:
        # First try the new API (TensorRT 8.4+)
        serialized_engine = builder.build_serialized_network(network, config)
    except AttributeError:
        # Fallback for older versions
        plan = builder.build_engine(network, config)
        serialized_engine = plan.serialize()
        plan.destroy()
    
    if serialized_engine is None:
        raise RuntimeError(f'{prefix} failed to build TensorRT engine')
    
    with open(file, 'wb') as f:
        f.write(serialized_engine)
    
    return True

In [15]:
import torch

ONNX_PATH = "C:/Users/hp/Downloads/vitpose-l-ap10k.onnx"
TRT_PATH = ONNX_PATH.replace('.onnx', '.engine')
C, H, W = (3, 256, 192)

input_names = ["input_0"]
output_names = ["output_0"]

# Use CUDA if available, otherwise CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Create a dummy input tensor
inputs = torch.randn(1, C, H, W).to(device)

dynamic_axes = {'input_0' : {0 : 'batch_size'},
                'output_0' : {0 : 'batch_size'}}


In [16]:
export_engine(ONNX_PATH, inputs, TRT_PATH, False, True, verbose=False)

Tensorrt input "input_0" with shape(-1, 3, 256, 192) DataType.FLOAT
Tensorrt output "output_0" with shape(-1, 17, 64, 48) DataType.FLOAT
Tensorrt building FP32 engine


True

In [26]:
import tensorrt as trt
logger = trt.Logger(trt.Logger.INFO)
runtime = trt.Runtime(logger)
with open("C:/Users/hp/Downloads/vitpose-l-ap10k.engine", 'rb') as f:
    engine = runtime.deserialize_cuda_engine(f.read())
for i in range(engine.num_io_tensors):
    name = engine.get_tensor_name(i)
    shape = engine.get_tensor_shape(name)
    mode = engine.get_tensor_mode(name)
    dtype = engine.get_tensor_dtype(name)
    print(f"Tensor {i}: {name}, Shape: {shape}, Mode: {mode}, Dtype: {dtype}")

Tensor 0: input_0, Shape: (1, 3, 256, 192), Mode: TensorIOMode.INPUT, Dtype: DataType.FLOAT
Tensor 1: output_0, Shape: (1, 17, 64, 48), Mode: TensorIOMode.OUTPUT, Dtype: DataType.FLOAT


In [19]:
import cv2
import numpy as np
import tensorrt as trt
import torch
import os
from collections import deque

class CombinedVisualizer:
    def __init__(self, window_size=100):
        self.window_size = window_size
        self.activities = deque(maxlen=window_size)
        self.activity_counts = {
            "walking": 0,
            "standing": 0,
            "sitting": 0,
            "pawing": 0,
            "Unknown": 0,
        }
        self.total_frames = 0

    def update(self, activity):
        self.activities.append(activity)
        self.activity_counts[activity] += 1
        self.total_frames += 1

    def create_visualization(self, frame):
        frame_height, frame_width = frame.shape[:2]
        defrag_height = 150
        defrag_width = frame_width
        defrag_image = np.ones((defrag_height, defrag_width, 3), dtype=np.uint8) * 255

        colors = {
            "walking": (0, 0, 255),  # Red
            "standing": (255, 0, 0),  # Blue
            "sitting": (0, 255, 0),  # Green
            "pawing": (255, 165, 0),  # Orange
            "Unknown": (128, 128, 128),  # Gray
        }

        segment_width = defrag_width // self.window_size
        for i, activity in enumerate(self.activities):
            x_start = i * segment_width
            x_end = x_start + segment_width
            color = colors.get(activity, (128, 128, 128))
            cv2.rectangle(defrag_image, (x_start, 0), (x_end, 80), color, -1)

        font = cv2.FONT_HERSHEY_SIMPLEX
        legend_items = [
            ("Walking", (0, 0, 255)),
            ("Standing", (255, 0, 0)),
            ("Sitting", (0, 255, 0)),
            ("Pawing", (255, 165, 0)),
            ("Unknown", (128, 128, 128)),
        ]

        x_offset = 10
        y_offset = 120
        for text, color in legend_items:
            cv2.rectangle(
                defrag_image,
                (x_offset, y_offset - 15),
                (x_offset + 20, y_offset + 5),
                color,
                -1,
            )
            percentage = (
                self.activity_counts.get(text.lower(), 0) / max(1, self.total_frames)
            ) * 100
            cv2.putText(
                defrag_image,
                f"{text}: {percentage:.1f}%",
                (x_offset + 30, y_offset),
                font,
                0.5,
                (0, 0, 0),
                1,
            )
            x_offset += 160

        combined_height = frame_height + defrag_height
        combined_image = np.zeros((combined_height, frame_width, 3), dtype=np.uint8)
        combined_image[:frame_height] = frame
        combined_image[frame_height:] = defrag_image

        return combined_image

class TensorRTInference:
    """TensorRT inference class for ViTPose model, updated for TensorRT 10.8.0.43"""
    def __init__(self, engine_path):
        """Initialize TensorRT engine and allocate buffers."""
        self.logger = trt.Logger(trt.Logger.INFO)
        self.runtime = trt.Runtime(self.logger)

        # Load engine from file
        with open(engine_path, 'rb') as f:
            engine_bytes = f.read()
            self.engine = self.runtime.deserialize_cuda_engine(engine_bytes)

        if not self.engine:
            raise RuntimeError(f"Failed to load TensorRT engine from {engine_path}")

        self.context = self.engine.create_execution_context()
        
        # Define input and output shapes (adjust if your .engine file specifies otherwise)
        self.input_shape = (1, 3, 256, 192)  # ViTPose standard input
        self.output_shape = (1, 17, 64, 48)  # ViTPose standard output (17 keypoints)
        
        # Tensor names (assuming defaults; adjust based on your engine)
        self.input_name = "input"
        self.output_name = "output"
        
        # Allocate device buffers
        self.allocate_buffers()

    def allocate_buffers(self):
        """Allocate CUDA memory for inputs and outputs using TensorRT 10.x API."""
        self.inputs = []
        self.outputs = []
        self.bindings = []

        num_io_tensors = self.engine.num_io_tensors
        for i in range(num_io_tensors):
            tensor_name = self.engine.get_tensor_name(i)
            tensor_shape = self.engine.get_tensor_shape(tensor_name)
            tensor_dtype = trt.nptype(self.engine.get_tensor_dtype(tensor_name))
            
            if tensor_dtype == np.float32:
                torch_dtype = torch.float32
            elif tensor_dtype == np.float16:
                torch_dtype = torch.float16
            else:
                raise ValueError(f"Unsupported dtype: {tensor_dtype}")

            tensor = torch.zeros(tuple(tensor_shape), dtype=torch_dtype, device='cuda')
            self.bindings.append(tensor.data_ptr())
            
            if self.engine.get_tensor_mode(tensor_name) == trt.TensorIOMode.INPUT:
                self.inputs.append(tensor)
            else:
                self.outputs.append(tensor)

            self.context.set_tensor_address(tensor_name, tensor.data_ptr())

    def preprocess_image(self, img):
        """Preprocess image for model input."""
        img = cv2.resize(img, (192, 256))
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = img.astype(np.float32) / 255.0
        img = img.transpose(2, 0, 1)  # HWC to CHW
        img = np.expand_dims(img, axis=0)  # Add batch dimension
        return img

    def postprocess_output(self, output):
        """Convert model output to keypoints."""
        heatmaps = output.reshape(self.output_shape)
        keypoints = {}
        for batch_idx in range(heatmaps.shape[0]):
            person_keypoints = np.zeros((17, 3))  # 17 keypoints (x, y, confidence)
            for kpt_idx in range(17):
                heatmap = heatmaps[batch_idx, kpt_idx]
                flat_idx = np.argmax(heatmap)
                y, x = np.unravel_index(flat_idx, heatmap.shape)
                orig_x = x * (192 / 48)  # Scale to input width
                orig_y = y * (256 / 64)  # Scale to input height
                confidence = heatmap[y, x]
                person_keypoints[kpt_idx] = [orig_x, orig_y, confidence]
            keypoints[batch_idx] = person_keypoints
        return keypoints

    def infer(self, img):
        """Run inference on an image."""
        preprocessed = self.preprocess_image(img)
        torch_input = torch.from_numpy(preprocessed).cuda()
        self.inputs[0].copy_(torch_input)
        
        self.context.execute_async_v3(stream_handle=torch.cuda.current_stream().cuda_stream)
        torch.cuda.synchronize()
        
        output = self.outputs[0].cpu().numpy()
        keypoints = self.postprocess_output(output)
        return keypoints

class HorseGaitMonitor:
    def __init__(self, model_path, output_dir="monitoring_output"):
        """Initialize the horse gait monitoring system with TensorRT."""
        self.model = TensorRTInference(model_path)
        
        self.output_dir = output_dir
        os.makedirs(output_dir, exist_ok=True)

        self.pose_dirs = {
            "standing": os.path.join(output_dir, "standing"),
            "walking": os.path.join(output_dir, "walking"),
            "sitting": os.path.join(output_dir, "sitting"),
            "pawing": os.path.join(output_dir, "pawing"),
        }
        for dir_path in self.pose_dirs.values():
            os.makedirs(dir_path, exist_ok=True)

        self.prev_positions = None
        self.movement_buffer = []
        self.state_buffer = []
        self.visualizer = CombinedVisualizer()

    def calculate_angle(self, p1, p2, p3):
        """Calculate angle between three points."""
        v1 = p1 - p2
        v2 = p3 - p2
        cos_angle = np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2))
        angle = np.degrees(np.arccos(np.clip(cos_angle, -1.0, 1.0)))
        return angle

    def detect_state(self, keypoints):
        """Detect the horse's current state based on keypoints."""
        movement_detected = False

        for person_id, kp_array in keypoints.items():
            leg_points = {
                "L_F_Hip": kp_array[5][:2],
                "L_F_Knee": kp_array[6][:2],
                "L_F_Paw": kp_array[7][:2],
                "R_F_Hip": kp_array[8][:2],
                "R_F_Knee": kp_array[9][:2],
                "R_F_Paw": kp_array[10][:2],
                "L_B_Hip": kp_array[11][:2],
                "L_B_Knee": kp_array[12][:2],
                "L_B_Paw": kp_array[13][:2],
                "R_B_Hip": kp_array[14][:2],
                "R_B_Knee": kp_array[15][:2],
                "R_B_Paw": kp_array[16][:2]
            }

            angles = {
                "left_front": self.calculate_angle(
                    np.array(leg_points["L_F_Hip"]),
                    np.array(leg_points["L_F_Knee"]),
                    np.array(leg_points["L_F_Paw"])
                ),
                "right_front": self.calculate_angle(
                    np.array(leg_points["R_F_Hip"]),
                    np.array(leg_points["R_F_Knee"]),
                    np.array(leg_points["R_F_Paw"])
                ),
                "left_back": self.calculate_angle(
                    np.array(leg_points["L_B_Hip"]),
                    np.array(leg_points["L_B_Knee"]),
                    np.array(leg_points["L_B_Paw"])
                ),
                "right_back": self.calculate_angle(
                    np.array(leg_points["R_B_Hip"]),
                    np.array(leg_points["R_B_Knee"]),
                    np.array(leg_points["R_B_Paw"])
                )
            }

            if any(100 <= angle <= 130 for angle in [angles["left_front"], angles["right_front"]]):
                return "pawing"
            if all(angle < 90 for angle in angles.values()):
                return "sitting"

            paw_positions = np.array([
                leg_points["L_F_Paw"],
                leg_points["R_F_Paw"],
                leg_points["L_B_Paw"],
                leg_points["R_B_Paw"]
            ])

            if self.prev_positions is not None:
                movements = np.linalg.norm(paw_positions - self.prev_positions, axis=1)
                self.movement_buffer.append(np.mean(movements))
                if len(self.movement_buffer) > 10:
                    self.movement_buffer.pop(0)
                movement_detected = np.mean(self.movement_buffer) > 5.0

            self.prev_positions = paw_positions

        current_state = "walking" if movement_detected else "standing"
        self.state_buffer.append(current_state)
        if len(self.state_buffer) > 15:
            self.state_buffer.pop(0)
        return max(set(self.state_buffer), key=self.state_buffer.count)

    def draw_state_annotation(self, frame, state):
        """Draw state annotation on frame."""
        annotated_frame = frame.copy()
        colors = {
            "standing": (255, 0, 0),  # Blue
            "walking": (0, 0, 255),   # Red
            "sitting": (0, 255, 0),   # Green
            "pawing": (255, 165, 0)   # Orange
        }
        cv2.putText(
            annotated_frame,
            f"State: {state.upper()}",
            (10, 30),
            cv2.FONT_HERSHEY_SIMPLEX,
            1,
            colors.get(state, (128, 128, 128)),
            2
        )
        return annotated_frame

    def save_frame(self, frame, state, frame_count):
        """Save frame to appropriate directory based on state."""
        if state in self.pose_dirs:
            filename = os.path.join(self.pose_dirs[state], f"frame_{frame_count}.jpg")
            cv2.imwrite(filename, frame)

    def process_video(self, video_path):
        """Process video file and analyze horse gait."""
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            raise ValueError(f"Could not open video file: {video_path}")

        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = int(cap.get(cv2.CAP_PROP_FPS))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        combined_height = height + 150  # Add space for visualizer
        output_path = os.path.join(self.output_dir, "video_with_analysis.mp4")
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, combined_height))

        frame_count = 0
        last_announced_state = None

        cv2.namedWindow("Horse Gait Analysis", cv2.WINDOW_NORMAL)

        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                keypoints = self.model.infer(frame)
                current_state = self.detect_state(keypoints)

                if current_state != last_announced_state:
                    print(f"Frame {frame_count}: Horse is now {current_state}")
                    last_announced_state = current_state

                annotated_frame = self.draw_state_annotation(frame, current_state)
                self.save_frame(annotated_frame, current_state, frame_count)

                self.visualizer.update(current_state)
                combined_display = self.visualizer.create_visualization(annotated_frame)

                cv2.imshow("Horse Gait Analysis", combined_display)
                out.write(combined_display)

                frame_count += 1
                if frame_count % 30 == 0:
                    progress = (frame_count / total_frames) * 100
                    print(f"Processing progress: {progress:.1f}%")

                if cv2.waitKey(1) & 0xFF == ord("q"):
                    break

        finally:
            cap.release()
            out.release()
            cv2.destroyAllWindows()
            print(f"\nProcessing complete! Output saved to: {output_path}")

def main():
    """Main function to run the horse gait analysis."""
    engine_path = "C:/Users/hp/Downloads/vitpose-l-ap10k.engine"
    try:
        monitor = HorseGaitMonitor(engine_path)
        monitor.process_video("E:/walk.mp4")
    except Exception as e:
        print(f"Error processing video: {str(e)}")
        raise

if __name__ == "__main__":
    main()

Frame 0: Horse is now standing
Frame 1: Horse is now walking
Frame 5: Horse is now pawing
Frame 6: Horse is now walking


  cos_angle = np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2))


Frame 10: Horse is now pawing
Frame 11: Horse is now walking
Frame 12: Horse is now pawing
Frame 13: Horse is now walking
Frame 14: Horse is now pawing
Frame 15: Horse is now walking
Frame 22: Horse is now pawing
Frame 23: Horse is now walking
Frame 28: Horse is now pawing
Frame 29: Horse is now walking
Processing progress: 0.0%
Frame 30: Horse is now sitting
Frame 32: Horse is now walking
Frame 33: Horse is now pawing
Frame 34: Horse is now walking
Frame 39: Horse is now pawing
Frame 41: Horse is now walking
Frame 43: Horse is now sitting
Frame 44: Horse is now walking
Processing progress: 0.0%
Frame 60: Horse is now pawing
Frame 61: Horse is now walking
Frame 63: Horse is now pawing
Frame 64: Horse is now walking
Processing progress: 0.0%

Processing complete! Output saved to: monitoring_output\video_with_analysis.mp4


KeyboardInterrupt: 

In [4]:
import tensorrt as trt

In [5]:
print(trt.__version__)

10.8.0.43


In [1]:
import cv2
import numpy as np
import tensorrt as trt
import torch
import os
from collections import deque

# Ensure CUDA is available
if not torch.cuda.is_available():
    raise RuntimeError("CUDA is not available. Please check your GPU and PyTorch installation.")

class CombinedVisualizer:
    def __init__(self, window_size=100):
        self.window_size = window_size
        self.activities = deque(maxlen=window_size)
        self.activity_counts = {
            "walking": 0,
            "standing": 0,
            "sitting": 0,
            "pawing": 0,
            "Unknown": 0,
        }
        self.total_frames = 0

    def update(self, activity):
        self.activities.append(activity)
        self.activity_counts[activity] += 1
        self.total_frames += 1

    def create_visualization(self, frame):
        frame_height, frame_width = frame.shape[:2]
        defrag_height = 150
        defrag_width = frame_width
        defrag_image = np.ones((defrag_height, defrag_width, 3), dtype=np.uint8) * 255

        colors = {
            "walking": (0, 0, 255),  # Red
            "standing": (255, 0, 0),  # Blue
            "sitting": (0, 255, 0),  # Green
            "pawing": (255, 165, 0),  # Orange
            "Unknown": (128, 128, 128),  # Gray
        }

        segment_width = defrag_width // self.window_size
        for i, activity in enumerate(self.activities):
            x_start = i * segment_width
            x_end = x_start + segment_width
            color = colors.get(activity, (128, 128, 128))
            cv2.rectangle(defrag_image, (x_start, 0), (x_end, 80), color, -1)

        font = cv2.FONT_HERSHEY_SIMPLEX
        legend_items = [
            ("Walking", (0, 0, 255)),
            ("Standing", (255, 0, 0)),
            ("Sitting", (0, 255, 0)),
            ("Pawing", (255, 165, 0)),
            ("Unknown", (128, 128, 128)),
        ]

        x_offset = 10
        y_offset = 120
        for text, color in legend_items:
            cv2.rectangle(
                defrag_image,
                (x_offset, y_offset - 15),
                (x_offset + 20, y_offset + 5),
                color,
                -1,
            )
            percentage = (
                self.activity_counts.get(text.lower(), 0) / max(1, self.total_frames)
            ) * 100
            cv2.putText(
                defrag_image,
                f"{text}: {percentage:.1f}%",
                (x_offset + 30, y_offset),
                font,
                0.5,
                (0, 0, 0),
                1,
            )
            x_offset += 160

        combined_height = frame_height + defrag_height
        combined_image = np.zeros((combined_height, frame_width, 3), dtype=np.uint8)
        combined_image[:frame_height] = frame
        combined_image[frame_height:] = defrag_image

        return combined_image

class TensorRTInference:
    """TensorRT inference class for ViTPose model, optimized for GPU with TensorRT 10.8.0.43"""
    def __init__(self, engine_path):
        """Initialize TensorRT engine and allocate buffers on GPU."""
        self.logger = trt.Logger(trt.Logger.INFO)
        self.runtime = trt.Runtime(self.logger)

        # Load engine from file
        with open(engine_path, 'rb') as f:
            engine_bytes = f.read()
            self.engine = self.runtime.deserialize_cuda_engine(engine_bytes)

        if not self.engine:
            raise RuntimeError(f"Failed to load TensorRT engine from {engine_path}")

        self.context = self.engine.create_execution_context()
        
        # Define input and output shapes
        self.input_shape = (1, 3, 256, 192)  # ViTPose standard input
        self.output_shape = (1, 17, 64, 48)  # ViTPose standard output (17 keypoints)
        
        # Tensor names (adjust based on your engine)
        self.input_name = "input"
        self.output_name = "output"
        
        # Allocate GPU buffers
        self.allocate_buffers()

    def allocate_buffers(self):
        """Allocate CUDA memory for inputs and outputs."""
        self.inputs = []
        self.outputs = []
        self.bindings = []

        num_io_tensors = self.engine.num_io_tensors
        for i in range(num_io_tensors):
            tensor_name = self.engine.get_tensor_name(i)
            tensor_shape = self.engine.get_tensor_shape(tensor_name)
            tensor_dtype = trt.nptype(self.engine.get_tensor_dtype(tensor_name))
            
            if tensor_dtype == np.float32:
                torch_dtype = torch.float32
            elif tensor_dtype == np.float16:
                torch_dtype = torch.float16
            else:
                raise ValueError(f"Unsupported dtype: {tensor_dtype}")

            # Explicitly allocate on GPU (CUDA)
            tensor = torch.zeros(tuple(tensor_shape), dtype=torch_dtype, device='cuda')
            self.bindings.append(tensor.data_ptr())
            
            if self.engine.get_tensor_mode(tensor_name) == trt.TensorIOMode.INPUT:
                self.inputs.append(tensor)
            else:
                self.outputs.append(tensor)

            self.context.set_tensor_address(tensor_name, tensor.data_ptr())

    def preprocess_image(self, img):
        """Preprocess image for model input on GPU."""
        # Convert to RGB and resize on CPU first (cv2 doesn't support GPU natively)
        img = cv2.resize(img, (192, 256))
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = img.astype(np.float32) / 255.0
        img = img.transpose(2, 0, 1)  # HWC to CHW
        img = np.expand_dims(img, axis=0)  # Add batch dimension
        
        # Move to GPU
        return torch.from_numpy(img).to('cuda')

    def postprocess_output(self, output):
        """Convert model output to keypoints on GPU, then move to CPU for further processing."""
        heatmaps = output.reshape(self.output_shape)
        keypoints = {}
        for batch_idx in range(heatmaps.shape[0]):
            person_keypoints = torch.zeros((17, 3), device='cuda')  # Process on GPU
            for kpt_idx in range(17):
                heatmap = heatmaps[batch_idx, kpt_idx]
                flat_idx = torch.argmax(heatmap)
                y, x = torch.unravel_index(flat_idx, heatmap.shape)
                orig_x = x * (192 / 48)  # Scale to input width
                orig_y = y * (256 / 64)  # Scale to input height
                confidence = heatmap[y, x]
                person_keypoints[kpt_idx] = torch.tensor([orig_x, orig_y, confidence], device='cuda')
            keypoints[batch_idx] = person_keypoints.cpu().numpy()  # Move to CPU for compatibility
        return keypoints
    def infer(self, img):
        """Run inference on an image using GPU."""
        preprocessed = self.preprocess_image(img)
        self.inputs[0].copy_(preprocessed)
        
        # Execute inference asynchronously on GPU
        self.context.execute_async_v3(stream_handle=torch.cuda.current_stream().cuda_stream)
        torch.cuda.synchronize()  # Wait for GPU computation to finish
        
        output = self.outputs[0]  # Already on GPU
        keypoints = self.postprocess_output(output)
        return keypoints

class HorseGaitMonitor:
    def __init__(self, model_path, output_dir="monitoring_output"):
        """Initialize the horse gait monitoring system with TensorRT on GPU."""
        self.model = TensorRTInference(model_path)
        
        self.output_dir = output_dir
        os.makedirs(output_dir, exist_ok=True)

        self.pose_dirs = {
            "standing": os.path.join(output_dir, "standing"),
            "walking": os.path.join(output_dir, "walking"),
            "sitting": os.path.join(output_dir, "sitting"),
            "pawing": os.path.join(output_dir, "pawing"),
        }
        for dir_path in self.pose_dirs.values():
            os.makedirs(dir_path, exist_ok=True)

        self.prev_positions = None
        self.movement_buffer = []
        self.state_buffer = []
        self.visualizer = CombinedVisualizer()

    def calculate_angle(self, p1, p2, p3):
        """Calculate angle between three points (CPU-based for simplicity)."""
        v1 = p1 - p2
        v2 = p3 - p2
        cos_angle = np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2))
        angle = np.degrees(np.arccos(np.clip(cos_angle, -1.0, 1.0)))
        return angle

    def detect_state(self, keypoints):
        """Detect the horse's current state based on keypoints."""
        movement_detected = False

        for person_id, kp_array in keypoints.items():
            leg_points = {
                "L_F_Hip": kp_array[5][:2],
                "L_F_Knee": kp_array[6][:2],
                "L_F_Paw": kp_array[7][:2],
                "R_F_Hip": kp_array[8][:2],
                "R_F_Knee": kp_array[9][:2],
                "R_F_Paw": kp_array[10][:2],
                "L_B_Hip": kp_array[11][:2],
                "L_B_Knee": kp_array[12][:2],
                "L_B_Paw": kp_array[13][:2],
                "R_B_Hip": kp_array[14][:2],
                "R_B_Knee": kp_array[15][:2],
                "R_B_Paw": kp_array[16][:2]
            }

            angles = {
                "left_front": self.calculate_angle(
                    np.array(leg_points["L_F_Hip"]),
                    np.array(leg_points["L_F_Knee"]),
                    np.array(leg_points["L_F_Paw"])
                ),
                "right_front": self.calculate_angle(
                    np.array(leg_points["R_F_Hip"]),
                    np.array(leg_points["R_F_Knee"]),
                    np.array(leg_points["R_F_Paw"])
                ),
                "left_back": self.calculate_angle(
                    np.array(leg_points["L_B_Hip"]),
                    np.array(leg_points["L_B_Knee"]),
                    np.array(leg_points["L_B_Paw"])
                ),
                "right_back": self.calculate_angle(
                    np.array(leg_points["R_B_Hip"]),
                    np.array(leg_points["R_B_Knee"]),
                    np.array(leg_points["R_B_Paw"])
                )
            }

            if any(100 <= angle <= 130 for angle in [angles["left_front"], angles["right_front"]]):
                return "pawing"
            if all(angle < 90 for angle in angles.values()):
                return "sitting"

            paw_positions = np.array([
                leg_points["L_F_Paw"],
                leg_points["R_F_Paw"],
                leg_points["L_B_Paw"],
                leg_points["R_B_Paw"]
            ])

            if self.prev_positions is not None:
                movements = np.linalg.norm(paw_positions - self.prev_positions, axis=1)
                self.movement_buffer.append(np.mean(movements))
                if len(self.movement_buffer) > 10:
                    self.movement_buffer.pop(0)
                movement_detected = np.mean(self.movement_buffer) > 5.0

            self.prev_positions = paw_positions

        current_state = "walking" if movement_detected else "standing"
        self.state_buffer.append(current_state)
        if len(self.state_buffer) > 15:
            self.state_buffer.pop(0)
        return max(set(self.state_buffer), key=self.state_buffer.count)

    def draw_state_annotation(self, frame, state):
        """Draw state annotation on frame."""
        annotated_frame = frame.copy()
        colors = {
            "standing": (255, 0, 0),  # Blue
            "walking": (0, 0, 255),   # Red
            "sitting": (0, 255, 0),   # Green
            "pawing": (255, 165, 0)   # Orange
        }
        cv2.putText(
            annotated_frame,
            f"State: {state.upper()}",
            (10, 30),
            cv2.FONT_HERSHEY_SIMPLEX,
            1,
            colors.get(state, (128, 128, 128)),
            2
        )
        return annotated_frame

    def save_frame(self, frame, state, frame_count):
        """Save frame to appropriate directory based on state."""
        if state in self.pose_dirs:
            filename = os.path.join(self.pose_dirs[state], f"frame_{frame_count}.jpg")
            cv2.imwrite(filename, frame)

    def process_video(self, video_path):
        """Process video file and analyze horse gait on GPU."""
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            raise ValueError(f"Could not open video file: {video_path}")

        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = int(cap.get(cv2.CAP_PROP_FPS))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        combined_height = height + 150  # Add space for visualizer
        output_path = os.path.join(self.output_dir, "video_with_analysis2.mp4")
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, combined_height))

        frame_count = 0
        last_announced_state = None

        cv2.namedWindow("Horse Gait Analysis", cv2.WINDOW_NORMAL)

        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                keypoints = self.model.infer(frame)
                current_state = self.detect_state(keypoints)
                

                if current_state != last_announced_state:
                    print(f"Frame {frame_count}: Horse is now {current_state}")
                    last_announced_state = current_state

                annotated_frame = self.draw_state_annotation(frame, current_state)
                self.save_frame(annotated_frame, current_state, frame_count)

                self.visualizer.update(current_state)
                combined_display = self.visualizer.create_visualization(annotated_frame)

                cv2.imshow("Horse Gait Analysis", combined_display)
                out.write(combined_display)

                frame_count += 1
                if frame_count % 30 == 0:
                    progress = (frame_count / total_frames) * 100
                    print(f"Processing progress: {progress:.1f}%")

                if cv2.waitKey(1) & 0xFF == ord("q"):
                    break

        finally:
            cap.release()
            out.release()
            cv2.destroyAllWindows()
            print(f"\nProcessing complete! Output saved to: {output_path}")

def main():
    """Main function to run the horse gait analysis on GPU."""
    engine_path = "C:/Users/hp/Downloads/vitpose-l-ap10k.engine"
    try:
        # Print GPU info
        print(f"Using GPU: {torch.cuda.get_device_name(0)}")
        monitor = HorseGaitMonitor(engine_path)
        monitor.process_video("E:/vitpose/strach.mp4")
    except Exception as e:
        print(f"Error processing video: {str(e)}")
        raise

if __name__ == "__main__":
    main()

Using GPU: NVIDIA GeForce RTX 3050 Laptop GPU
Frame 0: Horse is now standing
Frame 12: Horse is now pawing
Frame 14: Horse is now standing
Frame 27: Horse is now pawing
Frame 28: Horse is now standing
Processing progress: 3.7%
Frame 58: Horse is now pawing
Frame 59: Horse is now standing
Processing progress: 7.4%
Processing progress: 11.2%
Processing progress: 14.9%
Processing progress: 18.6%
Processing progress: 22.3%
Processing progress: 26.1%
Processing progress: 29.8%
Frame 242: Horse is now pawing
Frame 243: Horse is now standing
Processing progress: 33.5%
Processing progress: 37.2%
Frame 329: Horse is now pawing
Processing progress: 40.9%
Frame 330: Horse is now standing
Frame 355: Horse is now pawing
Frame 357: Horse is now standing
Processing progress: 44.7%
Frame 368: Horse is now pawing
Frame 369: Horse is now standing
Frame 382: Horse is now pawing
Frame 384: Horse is now standing
Processing progress: 48.4%
Frame 395: Horse is now pawing
Frame 396: Horse is now standing
Proc

In [21]:
import cv2
import numpy as np
import matplotlib.pyplot as plt

import pycuda.driver as cuda
import pycuda.autoinit
from easy_ViTPose.easy_ViTPose.vit_utils import engine as engine_utils # TRT Engine creation/save/load utils

from time import time
from PIL import Image
from torchvision.transforms import transforms

from vit_utils.visualization import draw_points_and_skeleton, joints_dict
from vit_utils.dist_util import get_dist_info, init_dist
from vit_utils.top_down_eval import keypoints_from_heatmaps

TRT_PATH = "C:/Users/hp/Downloads/vitpose-l-ap10k.engine"

# SETUP TRT
logger = trt.Logger(trt.Logger.ERROR)
trt_runtime = trt.Runtime(logger)

print("Loading cached TensorRT engine from {}".format(TRT_PATH))
trt_engine = engine_utils.load_engine(trt_runtime, TRT_PATH)

# This allocates memory for network inputs/outputs on both CPU and GPU
inputs, outputs, bindings, stream = engine_utils.allocate_buffers(trt_engine)

# Execution context is needed for inference
context = trt_engine.create_execution_context()

ModuleNotFoundError: No module named 'utils'

In [14]:
# Prepare input data
IMG_PATH = "E:/lame.webp"
img = cv2.cvtColor(cv2.imread(IMG_PATH), cv2.COLOR_BGR2RGB)
org_h, org_w = img.shape[:2]
img_input = cv2.resize(img, (192, 256), interpolation=cv2.INTER_LINEAR)
img_input = img_input.astype(np.float32).transpose(2, 0, 1)[None, ...] / 255

# Copy it into appropriate place into memory
# (inputs was returned earlier by allocate_buffers())
np.copyto(inputs[0].host, img_input.ravel())

# Feed to model
tic = time()
bs = 1
# Fetch output from the model
heatmaps = engine_utils.do_inference(
    context, bindings=bindings, inputs=inputs,
    outputs=outputs, stream=stream)[0]
heatmaps = heatmaps.reshape((1, 25, 64, 48))

elapsed_time = time()-tic
print(f">>> Output size: {heatmaps.shape} ---> {elapsed_time:.4f} sec. elapsed [{elapsed_time**-1: .1f} fps]\n")    

points, prob = keypoints_from_heatmaps(heatmaps=heatmaps, center=np.array([[org_w//2, org_h//2]]), scale=np.array([[org_w, org_h]]),
                                        unbiased=True, use_udp=True)
points = np.concatenate([points[:, :, ::-1], prob], axis=2)

# Visualization 
for pid, point in enumerate(points):
    img = draw_points_and_skeleton(img.copy(), point, joints_dict()['coco']['skeleton'], person_index=pid,
                                    points_color_palette='gist_rainbow', skeleton_color_palette='jet',
                                    points_palette_samples=10, confidence_threshold=0.4)
    
    plt.figure(figsize=(5,10))
    plt.imshow(img)
    plt.title("Result")
    plt.axis('off')
    plt.show()

NameError: name 'inputs' is not defined