# rclpy Snippets

[![nbviewer](https://raw.githubusercontent.com/jupyter/design/master/logos/Badges/nbviewer_badge.svg)](https://nbviewer.jupyter.org/github/CollaborativeRoboticsLab/foundations-of-robotics-labs/blob/master/1-sense-think-act/02-rclpy-snippets.ipynb)
[![Binder](https://mybinder.org/badge_logo.svg)](https://mybinder.org/v2/gh/CollaborativeRoboticsLab/foundations-of-robotics-labs/master?filepath=1-sense-think-act/02-rclpy-snippets.ipynb)

The `Sphero RVR` is a great robot for learning some robotics, but how do we work with ROS? This tutorial will show you how to use snippets of ROS2 code using `rclpy` in python to interact with the RVR.

## Schedule of Notebooks

Try to complete the following notebooks in the order listed below. Each notebook has an estimated duration.

1. [Getting Started](01-getting-started.ipynb) - **60 minutes**
3. [Rclpy Snippets](02-rclpy-snippets.ipynb) - **60 minutes**
4. [Assignment](03-sta-assignment.ipynb) - **120 minutes**

> **Note:**
> 
> **This is the main resource to derive a solution for the lab assignment. You can use this notebook as a reference to complete the lab assignment.**

### Testing the Snippets

To test the snippets, you will need to have the RVR running. You will also need to have the `sphero_rvr_ros` package installed and running the main application stack.

```bash
# in a terminal on the raspberry pi onboard the RVR
# as a reminder, you can run the main application stack with
ros2 launch sphero_rvr_bringup sphero_rvr_bringup.launch
```

> **Hints**
> - You can then run the snippets directly in the jupyter notebook
> - You can open terminals in jupyter environment by clicking `File > New > Terminal` in the top left corner of the jupyter notebook.
> - **if you encounter errors when running the snippets, try restarting the python kernel in the jupyter notebook.**

## Setting up different ROS Environments to Communicate

> **NOTE:**
> 
> We need to set up the ROS network to be able to communicate with the RVR from our computer.
> Refer to the instructions in the [ros tooling notebook](./01-ros-tooling.ipynb) to understand how the network is set up for the ROS environment on the computer.

These snippets include the necessary environment variables to communicate with the RVR from your computer. They just need to be configured for the right RVR and run before any other ROS related code.

## Setting up the remote ROS environment with Python

The Jupyter Notebook is running on another computer. We need to tell the RVR where to send the ROS messages.

**The following snippets will help python find the ROS environment and communicate with the RVR:**

In [None]:
"""
setting up ros enrionment variables
ROS 2 networking setup for shared Wi‑Fi with discovery server
"""

import os

#####################################################################
# # !!!! DON'T FORGET TO CHANGE THIS TO THE ROBOT'S IP ADDRESS !!!! #
#####################################################################

# change the ROS environment variable to the robot's IP address
ROBOT_HOSTNAME = "192.168.1.101"     # robot (or discovery server) IP/hostname
# for example, this could be an IP or a hostname depending on what is reachable on the network
CONSOLE_HOSTNAME = "192.168.1.201"   # this laptop/container IP/hostname
# for example, notice that this is the IP of the computer running this script not the robot

# another example using hostnames is as follows
# ROBOT_HOSTNAME = 'robotics1' # for example, this could be an IP or a hostname depending on what is reachable on the network
# CONSOLE_HOSTNAME = 'group1' # for example, notice that this is the hostname of the computer running this script not the robot

DISCOVERY_SERVER_PORT = 11811        # fastdds discovery server port
DISCOVERY_SERVER_HOST = ROBOT_HOSTNAME
ROS_DOMAIN_ID = "23"                 # the ROS network unique domain ID per robot - change this to match the robot's domain ID

# the following lines set up the networking variables for the scripts to run properly
# ROS 2 networking (Fast DDS discovery server)
os.environ["ROS_DOMAIN_ID"] = ROS_DOMAIN_ID           # isolates traffic by domain
os.environ["ROS_HOSTNAME"] = CONSOLE_HOSTNAME         # advertises the correct address
os.environ["ROS_LOCALHOST_ONLY"] = "0"                # allow LAN comms
os.environ["RMW_IMPLEMENTATION"] = "rmw_fastrtps_cpp" # ensure Fast DDS middleware is used
os.environ["ROS_DISCOVERY_SERVER"] = f"{DISCOVERY_SERVER_HOST}:{DISCOVERY_SERVER_PORT}"  # discovery server location

In [None]:
# ROS 2 uses a communication middleware called RMW (ROS Middleware)
# to start RMW correctly, we need to initialize rclpy
import rclpy  # Initialize rclpy once at the top

# Initialize rclpy once - this must be done before creating any nodes
if not rclpy.ok():
    rclpy.init()
    print('rclpy initialized')

## Receiving Messages

A core part of ROS programming is the use of ***`callback`*** functions. In ROS, A callback function is executed when a message is received on a topic. This is common place in many programs that perform communications. Messaging applications such as Messenger, Slack, or Discord also use a callback pattern to pass messages around the world.

The following code snippet shows how to create a callback function, subscribe to a topic, and execute the callback function when a message is received.

In [None]:
"""receiving message data"""


# import python libraries
import rclpy # ros library for python
from rclpy.node import Node # import the Node class from rclpy
from sensor_msgs.msg import Imu # import the Imu message type definition


# create a class to subscribe to sensor messages and show the data
class SubscribeAndShowNode(Node):
    def __init__(self):
        super().__init__('subscribe_and_show_node')
        # subscribe to the topic ~imu from the rvr namespace
        self.imu_sub = self.create_subscription(Imu, '/rvr_driver/imu/data', self.callback, 10)

    # callback function to receive the data
    def callback(self, msg):
        # print the data received
        print('IMU DATA FROM FRAME: {}'.format(msg.header.frame_id))

# main function
def main():
    print('Starting SubscribeAndShowNode...')

    try:
        # create an instance of the SubscribeAndShowNode class
        # this will subscribe to the topic and show the data
        sub_show_node = SubscribeAndShowNode()

        # spin() simply keeps python from exiting until this node is stopped
        rclpy.spin(sub_show_node)

    except KeyboardInterrupt:
        print('Interrupted')
    finally:
        print('Node has shutdown')

print('the SubscribeAndShowNode class has been declared')

#### Test out subscribing to a topic

Run the following code snippet to see how to subscribe to a topic and print out the message. Inspect the ROS Network Graph to see the node that was created and the topic it is subscribing to.

> ⚠️ **Warning: ROS 2 nodes may not stop with the Jupyter stop button or interrupt!**
>
> The following cell will run forever. You will need to use **Kernel → Restart** to stop it.
> 
> When running `rclpy.spin()` or timer-based ROS 2 nodes in a Jupyter notebook, the stop button and `Kernel → Interrupt` often do **not** work. This is a limitation of ROS 2 and Jupyter integration.
>
> **If a cell running a ROS 2 node will not stop:**
> 
> - Use **Kernel → Restart** to force-stop the node and free resources.
> - After restarting, always re-run the ROS environment setup cells before running any node code again.
> 
> This is expected behaviour for ROS 2 in notebooks and is not a bug in your code.

In [None]:
"""run the main function to start the node and test it"""
main()

## Sending Commands

Sending commands in a ROS application is very similar to receiving messages but instead of receiving messages, you are sending them. Another component in the system will listen for and receive these messages. The following code snippet shows how to create a ***`publisher`***, publish a message, and shutdown the publisher.

In [None]:
"""sending commands to the robot to move it"""


# import python libraries
import rclpy # ros library for python
from rclpy.node import Node # import the Node class from rclpy
from geometry_msgs.msg import Twist # import the Twist message type definition


# create a class to send commands to the robot
class PublishTwistCommand(Node):
    # the constructor
    # the default value for the angular speed is 0.0
    # change this value when instantiating the class to make the robot turn
    def __init__(self, angular_speed=0.0):
        super().__init__('publish_twist_command_node')

        # create a publisher to send commands to the robot
        self.cmd_vel_pub = self.create_publisher(Twist, '/cmd_vel', 10)

        # create a Twist message
        self.twist = Twist()

        # create a rate to publish the data
        self.rate = self.create_rate(2)

        # create a variable to hold the angular speed
        self.angular_speed = angular_speed

    def spin(self):
        # print a message
        print('starting the PublishTwistCommand node')

        # loop until the node is shutdown
        while rclpy.ok():
            # set the linear and angular speeds in the Twist message
            self.twist.linear.x = 0.0
            self.twist.angular.z = self.angular_speed

            # publish the Twist message
            self.cmd_vel_pub.publish(self.twist)

            # sleep for the remainder of the loop
            self.rate.sleep()


# main function
def main():
    print('Starting PublishTwistCommand...')

    try:
        # create an instance of the PublishTwistCommand class
        # this will publish the Twist message
        ptc = PublishTwistCommand(1.0) # change the angular speed to make the robot turn

        # this is an example of a manual loop in ROS
        # this also keeps python from exiting until this node is stopped
        # but it blocks execution until the node is stopped
        ptc.spin()

    except KeyboardInterrupt:
        print('Interrupted')
    finally:
        # stop the robot when the node is stopped
        # send a Twist message with linear and angular speeds set to 0.0
        ptc.cmd_vel_pub.publish(Twist())
        print('Node has shutdown')

print('the PublishTwistCommand class has been declared')

#### Test out publishing to a topic

Run the following code snippet to see how to publish to a topic. Inspect the ROS Network Graph to see the node that was created and the topic it is publishing to.

In [None]:
# run the main function to start the node and test it
main()

## ROS-Style Loops

In robotics it is common to have a loop of activities that runs continuously at regular intervals. This is often referred to as a loop in programming. The best practice for creating loops in a ROS application is to use a ***`Timer`***. The following code snippet shows how to create a ROS-Style loop using a ROS Timer.

In [None]:
"""timer based loop that cycles LED colours"""


import rclpy # ros library for python
from rclpy.node import Node # import the Node class from rclpy
from std_msgs.msg import ColorRGBA # import the ColorRGBA message type definition
import random # python library to generate random numbers


# create a class to send commands to the robot
class TimerBasedLoopNode(Node):
    def __init__(self):
        super().__init__('timer_based_loop_node')
        # create a timer to call the callback function every 1 second
        self.timer = self.create_timer(1.0, self.callback)
        # create a publisher to send commands to the robot
        self.pub = self.create_publisher(ColorRGBA, '/rvr_driver/led/headlight', 10)

    # callback function to send commands to the robot
    def callback(self):
        # create a message
        msg = ColorRGBA()

        # add a random colour to the message
        msg.r = 255.0*random.random()
        msg.g = 255.0*random.random()
        msg.b = 255.0*random.random()
        msg.a = 255.0

        # publish the message
        self.pub.publish(msg)


# main function
def main():
    print('Starting TimerBasedLoopNode...')

    try:
        # create an instance of the TimerBasedLoopNode class
        # this will publish the ColorRGBA message
        tbn = TimerBasedLoopNode()

        # spin() simply keeps python from exiting until this node is stopped
        rclpy.spin(tbn)

    except KeyboardInterrupt:
        print('Interrupted')
    finally:
        print('Node has shutdown')

print('the TimerBasedLoopNode class has been declared')

#### Test out a ROS-Style Loop

Run the following code snippet to see how to create a ROS-Style loop. Inspect the ROS Network Graph to see the node that was created and the topic it is publishing to.

In [None]:
# run the main function to start the node and test it
main()

## Putting it all together

Now that we have seen how to receive messages, send commands, and create ROS-style loops, we can put it all together to create a simple ROS program. The following code snippet shows how to create a ROS node that receives messages from the RVR's colour sensor and sends commands to the RVR's LEDs.

In [None]:
"""ROS node that cycles LED colours based on a colour sensor"""


import rclpy # ros library for python
from rclpy.node import Node # import the Node class from rclpy
from std_msgs.msg import ColorRGBA # import the ColorRGBA message type definition


# create a class to represent the ROS Node
class ObservedColourProjectorNode(Node):
    def __init__(self):
        super().__init__('observed_colour_projector_node')
        # subscribe to the topic ~ground_color from the rvr namespace
        self.ground_color_sub = self.create_subscription(ColorRGBA, '/rvr_driver/ground_color', self.ground_color_cb, 10)

        # create a publisher to send commands to the robot
        self.headlight_color_pub = self.create_publisher(ColorRGBA, '/rvr_driver/led/headlight', 10)

        # create a timer to manage the state of the node
        # (hint) Sometimes node logic is implemented in subscriber callbacks but this is not recommended if the logic is complex or time consuming.
        # Timers are a better place to implement node logic.
        # this is a typical place to see node logic such as state machines
        self.timer = self.create_timer(1.0, self.node_state_timer_cb)

        # state variable to keep track of the current colour
        self.current_color = ColorRGBA()

    # callback function to receive the data
    def ground_color_cb(self, msg):
        # print the data received
        print('COLOUR SENSOR DATA: {}, {}, {}, {}'.format(msg.r, msg.g, msg.b, msg.a))

        # store the data in the state variable
        self.current_color = msg

    # timer callback function to manage the state of the node
    def node_state_timer_cb(self):
        # create a message
        msg = ColorRGBA()

        # add the current sensed colour to the message
        msg.r = self.current_color.r
        msg.g = self.current_color.g
        msg.b = self.current_color.b
        msg.a = 255.0

        # publish the message
        self.headlight_color_pub.publish(msg)


# main function
def main():
    print('Starting ObservedColourProjectorNode...')

    try:
        # create an instance of the ObservedColourProjectorNode class
        # this will subscribe to the topic and show the data
        ocpn = ObservedColourProjectorNode()

        # spin() simply keeps python from exiting until this node is stopped
        rclpy.spin(ocpn)

    except KeyboardInterrupt:
        print('Interrupted')
    finally:
        print('Node has shutdown')

print('the ObservedColourProjectorNode class has been declared')

#### Test out the ROS Program

Run the following code snippet to see how to create a ROS program. Inspect the ROS Network Graph to see the node that was created and the topics it is subscribing and publishing to.

In [None]:
# run the main function to start the node and test it
main()