# 5 — Topics: Publishers & Subscribers

> Learn real-time data flow between nodes using the Publish/Subscribe pattern.

**Goal:** Write a publisher and subscriber from scratch, understand message types, and see data flowing between two nodes.

In [None]:
#| default_exp topics_pubsub

## 5.1 The Publish/Subscribe Pattern

```
┌──────────────┐                           ┌──────────────┐
│  Publisher    │   /my_topic               │  Subscriber  │
│  Node        │──────────────────────────► │  Node        │
│              │   (msg type: String)       │              │
└──────────────┘                           └──────────────┘
```

- **Publisher:** Creates messages and sends them on a topic
- **Subscriber:** Listens on a topic and processes messages in a callback
- **Decoupled:** Publisher doesn't know (or care) who's listening
- **Many-to-many:** Multiple publishers and subscribers can share a topic

## 5.2 A Simple Publisher

Let's publish a string message every second:

In [None]:
#| export

SIMPLE_PUBLISHER = '''
#!/usr/bin/env python3
"""Publishes a String message to /chatter every second."""

import rclpy
from rclpy.node import Node
from std_msgs.msg import String      # <-- import the message type


class SimplePublisher(Node):
    def __init__(self):
        super().__init__('simple_publisher')

        # Create a publisher
        #   arg1: message type (String)
        #   arg2: topic name ('/chatter')
        #   arg3: QoS queue size
        self.publisher_ = self.create_publisher(
            String,          # message type
            'chatter',       # topic name
            10               # queue size (QoS depth)
        )

        # Timer triggers publishing every 1 second
        self.timer = self.create_timer(1.0, self.timer_callback)
        self.count = 0
        self.get_logger().info('Publisher started on /chatter')

    def timer_callback(self):
        # Create and populate a message
        msg = String()
        msg.data = f'Hello ROS2! Message #{self.count}'

        # Publish it!
        self.publisher_.publish(msg)

        self.get_logger().info(f'Published: "{msg.data}"')
        self.count += 1


def main(args=None):
    rclpy.init(args=args)
    node = SimplePublisher()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()
'''

print(SIMPLE_PUBLISHER)

### Key Points

1. **`create_publisher(String, 'chatter', 10)`**
   - `String` — the message type from `std_msgs.msg`
   - `'chatter'` — the topic name (becomes `/chatter` in the global namespace)
   - `10` — QoS history depth (how many undelivered messages to buffer)

2. **`self.publisher_.publish(msg)`** — sends the message to all subscribers

3. The **timer** drives the publish rate. Without it, you'd need another trigger.

## 5.3 A Simple Subscriber

Now let's write a node that listens to `/chatter`:

In [None]:
#| export

SIMPLE_SUBSCRIBER = '''
#!/usr/bin/env python3
"""Subscribes to /chatter and logs received messages."""

import rclpy
from rclpy.node import Node
from std_msgs.msg import String


class SimpleSubscriber(Node):
    def __init__(self):
        super().__init__('simple_subscriber')

        # Create a subscription
        self.subscription = self.create_subscription(
            String,                    # message type
            'chatter',                 # topic name
            self.listener_callback,    # callback function
            10                         # QoS queue size
        )
        self.get_logger().info('Subscriber started — listening on /chatter')

    def listener_callback(self, msg: String):
        """Called every time a message arrives on /chatter."""
        self.get_logger().info(f'Received: "{msg.data}"')


def main(args=None):
    rclpy.init(args=args)
    node = SimpleSubscriber()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()
'''

print(SIMPLE_SUBSCRIBER)

### How to Run (Two Terminals)

```bash
# Terminal 1 — Publisher
ros2 run my_robot_pkg simple_publisher

# Terminal 2 — Subscriber
ros2 run my_robot_pkg simple_subscriber
```

You'll see:
```
# Terminal 1:
[INFO] Published: "Hello ROS2! Message #0"
[INFO] Published: "Hello ROS2! Message #1"

# Terminal 2:
[INFO] Received: "Hello ROS2! Message #0"
[INFO] Received: "Hello ROS2! Message #1"
```

## 5.4 Inspecting Topics from the CLI

While both nodes are running:

```bash
# List all topics
ros2 topic list -t
# /chatter [std_msgs/msg/String]
# /parameter_events [rcl_interfaces/msg/ParameterEvent]
# /rosout [rcl_interfaces/msg/Log]

# Echo the topic (see messages in real time)
ros2 topic echo /chatter
# data: Hello ROS2! Message #42

# Check the publish rate
ros2 topic hz /chatter
# average rate: 1.000

# Publish a message manually (for testing)
ros2 topic pub /chatter std_msgs/msg/String "{data: 'manual message'}"

# One-shot publish (doesn't repeat)
ros2 topic pub --once /chatter std_msgs/msg/String "{data: 'just once'}"
```

## 5.5 Common Message Types

In robotics, you'll use these messages constantly:

### `geometry_msgs/msg/Twist` — Velocity Commands

```python
from geometry_msgs.msg import Twist

cmd = Twist()
cmd.linear.x = 0.5    # forward speed (m/s)
cmd.linear.y = 0.0    # strafe (usually 0 for diff-drive)
cmd.linear.z = 0.0    # vertical (usually 0)
cmd.angular.x = 0.0   # roll rate
cmd.angular.y = 0.0   # pitch rate
cmd.angular.z = 0.3   # yaw rate (turning) (rad/s)
```

Published on `/cmd_vel` to control robot movement.

### `sensor_msgs/msg/Image` — Camera Images

```python
from sensor_msgs.msg import Image
# Fields: header, height, width, encoding, step, data
# We'll use cv_bridge to convert to/from numpy arrays
```

### `sensor_msgs/msg/LaserScan` — LIDAR Data

```python
from sensor_msgs.msg import LaserScan
# Fields: ranges[], angle_min, angle_max, range_min, range_max
```

## 5.6 A Robot Example: Velocity Publisher

Let's write a practical publisher — a node that sends velocity commands:

In [None]:
#| export

VELOCITY_PUBLISHER = '''
#!/usr/bin/env python3
"""Publishes Twist messages to /cmd_vel — makes a robot drive in a circle."""

import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist


class CircleDriver(Node):
    def __init__(self):
        super().__init__('circle_driver')

        # Declare parameters with defaults
        self.declare_parameter('linear_speed', 0.2)   # m/s
        self.declare_parameter('angular_speed', 0.5)   # rad/s
        self.declare_parameter('publish_rate', 10.0)    # Hz

        rate = self.get_parameter('publish_rate').value

        self.publisher_ = self.create_publisher(Twist, 'cmd_vel', 10)
        self.timer = self.create_timer(1.0 / rate, self.timer_callback)

        self.get_logger().info(
            f'CircleDriver started — publishing to /cmd_vel at {rate} Hz'
        )

    def timer_callback(self):
        msg = Twist()
        msg.linear.x = self.get_parameter('linear_speed').value
        msg.angular.z = self.get_parameter('angular_speed').value
        self.publisher_.publish(msg)


def main(args=None):
    rclpy.init(args=args)
    node = CircleDriver()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        # IMPORTANT: send zero velocity on shutdown!
        stop_msg = Twist()  # all zeros
        node.publisher_.publish(stop_msg)
        node.get_logger().info('Stopping — sent zero velocity')
        node.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()
'''

print(VELOCITY_PUBLISHER)

### Safety Note

Notice we send a **zero velocity** on shutdown. This is critical — if your node crashes and the last command was "drive forward", the robot will keep driving! Always ensure graceful shutdown sends a stop command.

## 5.7 QoS in Practice

For most cases, the integer shorthand (queue depth) is sufficient. For advanced control:

```python
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy, DurabilityPolicy

# Sensor data QoS (used by camera drivers)
sensor_qos = QoSProfile(
    reliability=ReliabilityPolicy.BEST_EFFORT,
    durability=DurabilityPolicy.VOLATILE,
    history=HistoryPolicy.KEEP_LAST,
    depth=1     # only keep the latest frame
)

# Use it:
self.create_subscription(Image, '/camera/image_raw', self.callback, sensor_qos)
```

### Pre-built QoS Profiles

```python
from rclpy.qos import qos_profile_sensor_data, qos_profile_system_default

# qos_profile_sensor_data = BEST_EFFORT, VOLATILE, KEEP_LAST(5)
# qos_profile_system_default = RELIABLE, VOLATILE, KEEP_LAST(10)
```

**Use `qos_profile_sensor_data` for camera topics** — it's what camera drivers use.

## 5.8 Topic Remapping

What if two nodes use different topic names? **Remap** at launch time:

```bash
# The publisher uses /chatter, but the subscriber expects /messages
ros2 run my_robot_pkg simple_subscriber --ros-args -r chatter:=messages

# Or in a launch file:
Node(
    package='my_robot_pkg',
    executable='simple_subscriber',
    remappings=[('chatter', 'messages')]
)
```

Remapping is powerful — it lets you reuse nodes without changing their code.

## 5.9 Recording and Replaying Topics (rosbag2)

**rosbag2** records topic data to disk so you can replay it later:

```bash
# Record all topics
ros2 bag record -a -o my_recording

# Record specific topics
ros2 bag record /chatter /cmd_vel -o my_recording

# Inspect the recording
ros2 bag info my_recording

# Replay (publishes recorded messages back on their topics)
ros2 bag play my_recording

# Replay at half speed
ros2 bag play my_recording --rate 0.5
```

This is incredibly useful for:
- **Testing** — record real camera data, replay at your desk
- **Debugging** — replay the exact sequence that caused a bug
- **Demos** — show results without real hardware

## 5.10 Exercises

1. **String echo:** Write a subscriber that reverses the string and republishes on `/chatter_reversed`
2. **Counter:** Write a publisher that sends `Int32` messages counting up, and a subscriber that computes a running average
3. **rosbag:** Record `/chatter` for 10 seconds, then replay it while running the subscriber
4. **QoS experiment:** Set the publisher to RELIABLE and subscriber to BEST_EFFORT — verify they connect. Then reverse it — observe the silent failure.

---

**Next →** [Notebook 06: Camera Setup in ROS2](06_camera_setup.ipynb)