In [1]:
#!/usr/bin/env python3
"""
Enhanced Topic Testing Script for my_steel Robot

This script discovers topics on the ROS2 network, subscribes to a configurable
set of important topics, collects simple statistics (message count, sample
payload snippets, message rate) and prints a summary report at the end.

Usage: python3 scripts/test_topics.py [duration_seconds]

The script is conservative: subscribers use a QoS profile suitable for
interacting with micro-ROS bridges (RELIABLE by default) but this can be
adjusted in the configuration below.
"""
import sys
import time
import threading
from typing import Dict, List, Any

import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy

# Import message types
from std_msgs.msg import Int32
from sensor_msgs.msg import Imu, JointState, Range, Illuminance
from geometry_msgs.msg import Twist
from rcl_interfaces.msg import ParameterEvent, Log


class TopicTester(Node):
    """Extensible topic testing node that collects basic stats for topics."""

    def __init__(self):
        super().__init__('topic_tester')

        # Default QoS profile (reliable, keep last, depth=10)
        self.qos_profile = QoSProfile(
            reliability=ReliabilityPolicy.RELIABLE,
            history=HistoryPolicy.KEEP_LAST,
            depth=10,
        )

        # Topic configurations: add or update entries here to watch more topics
        self.topic_configs: Dict[str, Dict] = {
            '/ddd/imu': {
                'msg_type': Imu,
                'description': 'IMU sensor data (acceleration, gyroscope)',
                'expected_rate': 5.0, # Angepasst an die Firmware (delay(200))
                'timeout': 5.0,
                'test_fields': ['linear_acceleration', 'angular_velocity', 'header'],
            },
            '/ddd/range_tof': {
                'msg_type': Range,
                'description': 'Time-of-Flight distance sensor data',
                'expected_rate': 5.0, # Angepasst an die Firmware (delay(200))
                'timeout': 3.0,
                'test_fields': ['range', 'header'],
            },
            '/ddd/illuminance': {
                'msg_type': Illuminance,
                'description': 'Ambient light sensor data',
                'expected_rate': 5.0, # Angepasst an die Firmware (delay(200))
                'timeout': 3.0,
                'test_fields': ['illuminance', 'header'],
            },
            '/ddd/cmd_vel': {
                'msg_type': Twist,
                'description': 'Motor velocity commands',
                'expected_rate': 10.0,
                'timeout': 3.0,
                'test_fields': ['linear', 'angular'],
            },
            '/joint_states': {
                'msg_type': JointState,
                'description': 'Joint state information',
                'expected_rate': 5.0, # Angepasst an die Firmware (delay(200))
                'timeout': 3.0,
                'test_fields': ['name', 'position', 'velocity'],
            },
            '/pico_count': {
                'msg_type': Int32,
                'description': 'Pico heartbeat counter',
                'expected_rate': 1.0,
                'timeout': 3.0,
                'test_fields': ['data'],
            },
            '/parameter_events': {
                'msg_type': ParameterEvent,
                'description': 'Parameter change events',
                'expected_rate': 0.1,
                'timeout': 10.0,
                'test_fields': ['node'],
            },
            '/rosout': {
                'msg_type': Log,
                'description': 'ROS logging messages',
                'expected_rate': 1.0,
                'timeout': 5.0,
                'test_fields': ['level', 'name', 'msg'],
            },
        }

        # Runtime tracking structures
        self.subscribers: Dict[str, Any] = {}
        self.message_counts: Dict[str, int] = {}
        self.last_message_time: Dict[str, float] = {}
        self.first_message_time: Dict[str, float] = {}
        self.sample_messages: Dict[str, Any] = {}
        self.lock = threading.Lock()

    def create_subscriber_for_topic(self, topic_name: str, config: Dict):
        """Create a subscription for topic_name using the configured msg_type."""
        msg_type = config.get('msg_type')
        if msg_type is None:
            self.get_logger().warning(f'No msg_type for {topic_name}, skipping')
            return

        def _cb(msg, topic=topic_name):
            now = time.time()
            with self.lock:
                self.message_counts[topic] = self.message_counts.get(topic, 0) + 1
                self.last_message_time[topic] = now
                if topic not in self.first_message_time:
                    self.first_message_time[topic] = now
                if topic not in self.sample_messages:
                    self.sample_messages[topic] = []
                if len(self.sample_messages[topic]) < 3:
                    try:
                        self.sample_messages[topic].append(str(msg))
                    except Exception:
                        self.sample_messages[topic].append('<unserializable message>')

        qos = config.get('qos', self.qos_profile)
        sub = self.create_subscription(msg_type, topic_name, _cb, qos)
        self.subscribers[topic_name] = sub
        self.get_logger().info(f'🔗 Subscribed to {topic_name}')

    def discover_and_test_topics(self):
        """Discover topics and subscribe to those in the configuration."""
        topic_list = self.get_topic_names_and_types()
        available = {name for (name, _) in topic_list}
        self.get_logger().info(f'🔍 Found {len(available)} topics:')
        for t in sorted(available):
            self.get_logger().info(f'  - {t}')

        for topic_name, cfg in self.topic_configs.items():
            if topic_name not in available:
                self.get_logger().warning(f'⚠️  Topic {topic_name} not available — subscribing anyway')
            self.create_subscriber_for_topic(topic_name, cfg)

    def compute_rate(self, topic: str) -> float:
        with self.lock:
            count = self.message_counts.get(topic, 0)
            start = self.first_message_time.get(topic)
            last = self.last_message_time.get(topic)
        if count <= 1 or start is None or last is None:
            return 0.0
        elapsed = last - start
        if elapsed <= 0.0:
            return float(count)
        return float(count) / elapsed

    def print_status_report(self):
        """Print a summary report for all configured topics."""
        now = time.time()
        print('\n' + '=' * 80)
        print('📊 TOPIC TEST REPORT')
        print('=' * 80)

        for topic_name, cfg in self.topic_configs.items():
            desc = cfg.get('description', '')
            expected = cfg.get('expected_rate', None)
            timeout = cfg.get('timeout', 5.0)
            with self.lock:
                count = self.message_counts.get(topic_name, 0)
                last = self.last_message_time.get(topic_name, 0)
                samples = self.sample_messages.get(topic_name, [])
            rate = self.compute_rate(topic_name)
            
            status = '❓ Not tested'
            if count == 0:
                status = '🚫 NOT_AVAILABLE'
            else:
                if (now - last) > timeout:
                    status = f'⏰ TIMEOUT (no msg in {now-last:.1f}s)'
                else:
                    status = '✅ OK'
                    if expected is not None and rate < expected * 0.7:
                        status = '⚠️ LOW_RATE'

            print(f"\n🔸 {topic_name}\n   Description: {desc}\n   Status: {status}")
            print(f"   Messages: {count} (Observed Rate: {rate:.2f} Hz, Expected: {expected} Hz)")
            if samples:
                print('   Sample messages:')
                for s in samples:
                    print('    ', s[:250] + ('...' if len(s) > 250 else ''))

        print('\nTest complete.\n')

    def run_test(self, duration: float = 30.0):
        """Run the test for duration seconds and periodically print progress."""
        self.discover_and_test_topics()

        start_time = time.time()
        next_report = start_time + 5.0
        self.get_logger().info(f'🚀 Starting topic test for {duration}s...')

        while rclpy.ok():
            now = time.time()
            elapsed = now - start_time
            if now >= next_report:
                self.get_logger().info(f'⏱ Runtime: {elapsed:.1f}s')
                next_report = now + 5.0
            if elapsed >= duration:
                break
            rclpy.spin_once(self, timeout_sec=0.1)

        self.print_status_report()


def main():
    rclpy.init()
    duration = 30.0
    if len(sys.argv) > 1:
        try:
            duration = float(sys.argv[1])
        except Exception:
            print('Invalid duration argument, using default 30s')

    tester = TopicTester()
    try:
        tester.run_test(duration)
    except KeyboardInterrupt:
        print('Interrupted by user')
    finally:
        tester.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()


Invalid duration argument, using default 30s


[INFO] [1759250319.969226441] [topic_tester]: 🔍 Found 10 topics:
[INFO] [1759250319.973919709] [topic_tester]:   - /ddd/cmd_vel
[INFO] [1759250319.977142852] [topic_tester]:   - /ddd/illuminance
[INFO] [1759250319.980558587] [topic_tester]:   - /ddd/imu
[INFO] [1759250319.983950197] [topic_tester]:   - /ddd/odom
[INFO] [1759250319.987427637] [topic_tester]:   - /ddd/range
[INFO] [1759250319.990905241] [topic_tester]:   - /ddd/range_tof
[INFO] [1759250319.994142180] [topic_tester]:   - /joint_states
[INFO] [1759250319.999112114] [topic_tester]:   - /parameter_events
[INFO] [1759250320.002538412] [topic_tester]:   - /pico_count
[INFO] [1759250320.005670953] [topic_tester]:   - /rosout
[INFO] [1759250320.012622419] [topic_tester]: 🔗 Subscribed to /ddd/imu
[INFO] [1759250320.017792839] [topic_tester]: 🔗 Subscribed to /ddd/range_tof
[INFO] [1759250320.355478745] [topic_tester]: 🔗 Subscribed to /ddd/illuminance
[INFO] [1759250320.360072431] [topic_tester]: 🔗 Subscribed to /ddd/cmd_vel
[INFO]


📊 TOPIC TEST REPORT

🔸 /ddd/imu
   Description: IMU sensor data (acceleration, gyroscope)
   Status: 🚫 NOT_AVAILABLE
   Messages: 0 (Observed Rate: 0.00 Hz, Expected: 5.0 Hz)

🔸 /ddd/range_tof
   Description: Time-of-Flight distance sensor data
   Status: 🚫 NOT_AVAILABLE
   Messages: 0 (Observed Rate: 0.00 Hz, Expected: 5.0 Hz)

🔸 /ddd/illuminance
   Description: Ambient light sensor data
   Status: 🚫 NOT_AVAILABLE
   Messages: 0 (Observed Rate: 0.00 Hz, Expected: 5.0 Hz)

🔸 /ddd/cmd_vel
   Description: Motor velocity commands
   Status: 🚫 NOT_AVAILABLE
   Messages: 0 (Observed Rate: 0.00 Hz, Expected: 10.0 Hz)

🔸 /joint_states
   Description: Joint state information
   Status: 🚫 NOT_AVAILABLE
   Messages: 0 (Observed Rate: 0.00 Hz, Expected: 5.0 Hz)

🔸 /pico_count
   Description: Pico heartbeat counter
   Status: 🚫 NOT_AVAILABLE
   Messages: 0 (Observed Rate: 0.00 Hz, Expected: 1.0 Hz)

🔸 /parameter_events
   Description: Parameter change events
   Status: 🚫 NOT_AVAILABLE
   Messages