# Ros Topic

<img src="./images/topic.jpg" width="30%">

ROS Topic to komunikacja na zasadzie Publish - Subscribe. 

Działa to na zasadzie *kanału* komunikacji, gdzie na każdym *topicu* komunikowane są wiadomości w danym temacie i o tym samym formacie danych.

Możemy podejrzeć wiadomości 

In [1]:
from run_in_term import run_lxterminal

In [2]:
import rclpy
from rclpy.node import Node

from std_msgs.msg import String


class MinimalNode(Node):

    def __init__(self):
        super().__init__('minimal_publisher')
    


rclpy.init()

minimal_node = MinimalNode()


# możemy podejrzeć wszytkie topiki aktualnie w systemie

In [5]:
# w konsoli 
#run_lxterminal("ros2 topic list")
# jeśli nie masz otwarte symulacji
run_lxterminal("ros2 launch turtlebot3_gazebo turtlebot3_world.launch.py")


In [9]:
minimal_node.get_topic_names_and_types()

[('/camera/camera_info', ['sensor_msgs/msg/CameraInfo']),
 ('/camera/image_raw', ['sensor_msgs/msg/Image']),
 ('/camera/image_raw/compressed', ['sensor_msgs/msg/CompressedImage']),
 ('/camera/image_raw/compressedDepth', ['sensor_msgs/msg/CompressedImage']),
 ('/camera/image_raw/theora', ['theora_image_transport/msg/Packet']),
 ('/clock', ['rosgraph_msgs/msg/Clock']),
 ('/cmd_vel', ['geometry_msgs/msg/Twist']),
 ('/imu', ['sensor_msgs/msg/Imu']),
 ('/joint_states', ['sensor_msgs/msg/JointState']),
 ('/odom', ['nav_msgs/msg/Odometry']),
 ('/parameter_events', ['rcl_interfaces/msg/ParameterEvent']),
 ('/performance_metrics', ['gazebo_msgs/msg/PerformanceMetrics']),
 ('/robot_description', ['std_msgs/msg/String']),
 ('/rosout', ['rcl_interfaces/msg/Log']),
 ('/scan', ['sensor_msgs/msg/LaserScan']),
 ('/tf', ['tf2_msgs/msg/TFMessage']),
 ('/tf_static', ['tf2_msgs/msg/TFMessage']),
 ('/wiadomosc', ['std_msgs/msg/String'])]

# Stwórzmy najprostrzego *publishera*

Publisher to obiekt pozwalający wysyłać wiadomości na danym topicu

In [6]:
 publisher_wiadomosc = minimal_node.create_publisher(String, 'wiadomosc', 10)

# Wykonaj jeszcze raz wylistowanie topików



Możemy teraz wysłać wiadomości na topic wiadomosc

In [10]:
publisher_wiadomosc.publish("hello") #nie zadziała

TypeError: Expected <class 'std_msgs.msg._string.String'>, got <class 'str'>

# ale wcześniej musimy skonstuować odpowiedni obiekt *message*

In [11]:
import std_msgs

wiadomosc = std_msgs.msg.String()


wiadomosc.data = "hello"

publisher_wiadomosc.publish(wiadomosc)

### oczytajmy tą wiadomość w konsoli
ros2 topic echo -- pokaże wiadomości ale tylko te które powstały po jego stworzeniu, wyślij wiadomość jeszcze raz

In [12]:
run_lxterminal("ros2 topic echo wiadomosc")

In [10]:
from IPython.display import IFrame

IFrame('http://localhost:6080', width=800, height=450)

## możemy też wiadomość wysyłać co jakiś czas, z wykorzystaniem timera

In [14]:
def timer_callback():
        wiadomosc.data = "wiadomość"
        publisher_wiadomosc.publish(wiadomosc)
try:
    timer.cancel()
except NameError:
    pass
timer_period = 0.5  # seconds
timer = minimal_node.create_timer(timer_period, timer_callback)


In [15]:
# Timer będzie działał tylko wtedy gdy będziemy *spinnować*
# zatrzymaj jupter interruptem (kwadrat)
rclpy.spin(minimal_node)

KeyboardInterrupt: 

In [16]:
timer.cancel() # zatrzymajmy timer aby nie było go przy kolejnym spinie

# Ćwiczenie: Publikuj nową wiadomość za każdym razem, np powiększoną o jeden

## Możemy także interaktywnie wysyłać wiadomości z jupyter, za pomocą *interact*
Najłatwiej do tego celu stworzyć funkcję która będzie przyjmować tekst. Wtedy interact automatycznie dobierze jako wejście pole tekstowe

In [7]:
from ipywidgets import interact, interactive, fixed, interact_manual
import ipywidgets as widgets
import std_msgs

#interact manual daje dodatkowy przycisk aby wysyłać wiadomość dopiero po jego naciśnięciu
@interact_manual
def wyslij_wiadomosc(wiadomosc_tekst = "hello world"):
    wiadomosc = std_msgs.msg.String()
    wiadomosc.data = wiadomosc_tekst
    publisher_wiadomosc.publish(wiadomosc)

interactive(children=(Text(value='hello world', continuous_update=False, description='wiadomosc_tekst'), Butto…

In [8]:
publisher_wiadomosc.get_subscription_count()

2

# Ćwiczenie (big):

Posterujmy prawdziwym robocikiem

Robot potrzebuje komend przesyłanych topickiem /cmd_vel

'/cmd_vel', ['geometry_msgs/msg/Twist']

In [16]:
from geometry_msgs.msg import Twist

print(Twist())

# skonstruuj publishera na topic "/cmd_vel"

#publisher_cmd_vel = ...

geometry_msgs.msg.Twist(linear=geometry_msgs.msg.Vector3(x=0.0, y=0.0, z=0.0), angular=geometry_msgs.msg.Vector3(x=0.0, y=0.0, z=0.0))


In [18]:
from IPython.display import IFrame

IFrame('http://localhost:6080', width=800, height=450)

In [19]:
# wyslij komendę troszeczkę robota ruszającą do przodu


troche_predkosc_do_przodu = Twist()
#troche_predkosc_do_przodu.linear.x = ...
# opublikuj wiadomosc

geometry_msgs.msg.Twist(linear=geometry_msgs.msg.Vector3(x=0.0, y=0.0, z=0.0), angular=geometry_msgs.msg.Vector3(x=0.0, y=0.0, z=0.0))

# Zatrzymaj robocika

In [None]:
zatrzymaj_predkosc = Twist()
publisher_cmd_vel.publish(zatrzymaj_predkosc)


# Możemy też dorobić sobie guziki

In [51]:
from ipywidgets import Button, HBox, VBox, Label

publisher_cmd_vel = minimal_node.create_publisher(Twist, 'cmd_vel', 10)
przyciski = ['lewo', 'przod', 'prawo', 'tyl','stop']
items = [Button(description=w) for w in przyciski]



def w_lewo(b):
    print(b)
    w_lewo_twist = Twist()
    w_lewo_twist.angular.z = 0.2
    publisher_cmd_vel.publish(w_lewo_twist)


items[0].on_click(w_lewo)


VBox([HBox([items[0],items[1],items[2]]),HBox([items[4],items[3],items[4]])])

#left_box = VBox([items[0], items[2],items[1]])
#right_box = VBox([items[2], items[3]])
#HBox([left_box, right_box, items[4]])

VBox(children=(HBox(children=(Button(description='lewo', style=ButtonStyle()), Button(description='przod', sty…

Button(description='lewo', style=ButtonStyle())


In [None]:
# Cwiczenie dorob guziki

# Dla ambitnych: zrob fabryke

In [52]:
for item in items:
    item.unobserve_all()

In [55]:
publisher_cmd_vel.publish(Twist())

[Next exercise: 4. ROS Topic - Subscriber](4.%20ROS%20Topic%20-%20Subscriber.ipynb)