# Adding to ROSLib - Solutions

The first step is to make sure you are talking to your Create

In [None]:
import os
os.environ['ROS_DOMAIN_ID']="0"
os.environ['RMW_IMPLEMENTATION']="rmw_cyclonedds_cpp"
!printenv | grep 'ROS'
!printenv | grep 'RMW'

In [None]:
!ros2 topic list

In [None]:
%%writefile Subs/ROS2Lib.py

'''
Stolen from  Kate and Maddie

This library allows you to control the create with a number of simple, synchronous calls.
'''

import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
import time

from irobot_create_msgs.action import DriveDistance
from irobot_create_msgs.action import RotateAngle
from irobot_create_msgs.msg import LedColor
from irobot_create_msgs.msg import LightringLeds
from irobot_create_msgs.msg import AudioNoteVector 
from irobot_create_msgs.msg import AudioNote 
from builtin_interfaces.msg import Duration  
import geometry_msgs.msg

from rclpy.qos import qos_profile_sensor_data
from sensor_msgs.msg import BatteryState
from irobot_create_msgs.action import DockServo
from irobot_create_msgs.action import Undock

class Audio(Node):
    '''
    Set up a node that lets you publish notes to the speaker
    '''
    def __init__(self, namespace = '/Picard'):   
        '''
        define the node and set up the publisher
        '''
        super().__init__('audio_publisher')
        
        self.audio_publisher = self.create_publisher(AudioNoteVector, namespace + '/cmd_audio', 10)
        self.audio = AudioNoteVector()
        self.append = False
        
    def beep(self, frequency = 440):
        '''
        publish the requested frequency for 1 second
        '''
        self.audio.notes = [AudioNote(frequency = frequency, max_runtime = Duration(sec = 1, nanosec = 0))]
        self.audio_publisher.publish(self.audio)

class Lights(Node):
    '''
    Create a publisher that will update the LED color on the Create
    '''
    def __init__(self, namespace = '/Picard'):  
        '''
        Create a node, nitialize possible colors and create the publisher as part of this node.
        '''
        super().__init__('led_publisher')

        red = LedColor(red=255, green=0, blue=0)
        green = LedColor(red=0, green=255, blue=0)
        blue = LedColor(red=0, green=0, blue=255)
        yellow = LedColor(red=255, green=255, blue=0)
        pink = LedColor(red=255, green=0, blue=255)
        cyan = LedColor(red=0, green=255, blue=255)
        purple = LedColor(red=127, green=0, blue=255)
        white = LedColor(red=255, green=255, blue=255)
        grey = LedColor(red=189, green=189, blue=189)
        tufts_blue = LedColor(red=98, green=166, blue=10)
        tufts_brown = LedColor(red=94, green=75, blue=60)
        self.colors = [white, red, green, blue, yellow, pink, cyan, purple, grey,tufts_blue,tufts_brown]

        self.lights = self.create_publisher(LightringLeds, namespace + '/cmd_lightring', 10)
        self.lightring = LightringLeds()
        self.lightring.override_system = True  # override the Create error lighting

    def set_color(self, led_colors):
        '''
        set up the proper message type and publish it
        '''
        current_time = self.get_clock().now()
        self.lightring.leds = [self.colors[led_colors]]*6 
        self.lightring.header.stamp = current_time.to_msg()
        self.lights.publish(self.lightring)

    def reset(self):
        '''
        Release contriol of the lights and "gives" it back to the robot. 
        '''
        print('Resetting color to white')
        self.lightring.override_system = False
        white = [self.colors[0]]*6
        self.lightring.leds = white

        self.lights.publish(self.lightring)

class TwistIt(Node):
    '''
    Control the speed of the Create - both linearly and rotationally to form a twist.
    '''
    def __init__(self, namespace = '/Picard'):   
        '''
        define the node and the publisher
        '''
        super().__init__('twist_publisher')
        self.twist_publisher = self.create_publisher(geometry_msgs.msg.Twist, namespace + '/cmd_vel', 10)

    def move(self, x,y,z,th, speed, turn):
        '''
        publish the desired twist
        '''
        twist = geometry_msgs.msg.Twist()
        twist.linear.x = x * speed
        twist.linear.y = y * speed
        twist.linear.z = z * speed
        twist.angular.x = 0.0
        twist.angular.y = 0.0
        twist.angular.z = th * turn
        self.twist_publisher.publish(twist)

class Drive(Node):
    '''
    Set up a node, 'drive_distance_action_client', that is an action 
    client and will drive the Create across a desired distance.  The done flag
    tells the wait function in CreateLib that you are done
    '''
    def __init__(self, namespace = '/Picard'):
        #  define an action client for driving the Create
        self.done = False
        super().__init__('drive_distance_action_client')
        self._action = ActionClient(self, DriveDistance, namespace + '/drive_distance')
        # note that the action client uses message type DriveDistance and we defined the namespace
        
    def set_goal(self, distance = 0.5, speed = 0.15):
        '''
        Set the goal of speed and distance and sets the callback to know when the goal is accepted
        '''
        self.done = False
        goal_msg = DriveDistance.Goal()
        goal_msg.distance = distance
        goal_msg.max_translation_speed = speed

        self._action.wait_for_server()  # Wait for the server to be available and then send the goal.
        self._send_goal_future = self._action.send_goal_async(goal_msg)
        self._send_goal_future.add_done_callback(self.goal_request_callback)

    def goal_request_callback(self, future): # A callback that is executed when the future is complete.
        '''
        run this when the action server responds with either go or no-go and set up another callback for when the action is done
        '''
        goal_handle = future.result()
        if not goal_handle.accepted:
            self.get_logger().info('Goal rejected :(')
            self.done = True
            return
        self.get_logger().info('Goal accepted :)')
        # goal accepted - now wait for it to be executed
        self._get_result_future = goal_handle.get_result_async()
        self._get_result_future.add_done_callback(self.get_result_callback)

    def get_result_callback(self, future):
        '''
        run this when the action is done
        '''        
        result = future.result().result
        self.get_logger().info('Result: {0}'.format(result))
        self.done = True

class Rotate(Node):
    '''
    Set up a node, 'rotate_action_client', that is an action client and 
    will rotate the Create through a desired angle. The methods are very
    similar to those in Drive
    '''
    def __init__(self, namespace = '/Picard'):
        #  define an action client for turning the Create
        self.done = False
        super().__init__('rotate_action_client')
        self._action = ActionClient(self, RotateAngle, namespace + '/rotate_angle')
        
    def set_goal(self, angle=1.57, max_rotation_speed=0.5):
        self.done = False
        goal_msg = RotateAngle.Goal()
        goal_msg.angle = angle 
        goal_msg.max_rotation_speed = max_rotation_speed

        self._action.wait_for_server() # wait for server
        self._send_goal_future = self._action.send_goal_async(goal_msg)  
        self._send_goal_future.add_done_callback(self.goal_response_callback)
        
    def goal_response_callback(self, future):  # execute when action server says go or no-go.
        goal_handle = future.result()
        if not goal_handle.accepted:
            self.get_logger().info('Goal rejected :(')
            self.done = True
            return
        self.get_logger().info('Goal accepted :)')
        self._get_result_future = goal_handle.get_result_async()
        self._get_result_future.add_done_callback(self.get_result_callback)

    def get_result_callback(self, future):  #run when goal is completed
        result = future.result().result
        self.get_logger().info('Result: {0}'.format(result))
        self.done = True
        
class Dock(Node):
    '''
    Set up a node, 'dock_action_client', that is an action client and 
    will dock the Create. The methods are very similar to those in Drive
    '''
    def __init__(self, namespace = '/Picard'):
        #  define an action client for turning the Create
        self.done = False
        super().__init__('dock_action_client')
        self._action = ActionClient(self, DockServo, namespace + '/dock')
        
    def set_goal(self):
        self.done = False
        goal_msg = DockServo.Goal()

        self._action.wait_for_server() # wait for server
        self._send_goal_future = self._action.send_goal_async(goal_msg)  
        self._send_goal_future.add_done_callback(self.goal_response_callback)
        
    def goal_response_callback(self, future):  # execute when action server says go or no-go.
        goal_handle = future.result()
        if not goal_handle.accepted:
            self.get_logger().info('Goal rejected :(')
            self.done = True
            return
        self.get_logger().info('Goal accepted :)')
        self._get_result_future = goal_handle.get_result_async()
        self._get_result_future.add_done_callback(self.get_result_callback)

    def get_result_callback(self, future):  #run when goal is completed
        result = future.result().result
        self.get_logger().info('Result: {0}'.format(result))
        self.done = True
        
class unDock(Node):
    '''
    Set up a node, 'undock_action_client', that is an action client and 
    will undock the Create. The methods are very similar to those in Drive
    '''
    def __init__(self, namespace = '/Picard'):
        #  define an action client for turning the Create
        self.done = False
        super().__init__('undock_action_client')
        self._action = ActionClient(self, Undock, namespace + '/undock')
        
    def set_goal(self):
        self.done = False
        goal_msg = Undock.Goal()

        self._action.wait_for_server() # wait for server 
        self._send_goal_future = self._action.send_goal_async(goal_msg) 
        self._send_goal_future.add_done_callback(self.goal_response_callback)
      
    def goal_response_callback(self, future):  # execute when action server says go or no-go.
        goal_handle = future.result()
        if not goal_handle.accepted:
            self.get_logger().info('Goal rejected :(')
            self.done = True
            return
        self.get_logger().info('Goal accepted :)')
        self._get_result_future = goal_handle.get_result_async()
        self._get_result_future.add_done_callback(self.get_result_callback)

    def get_result_callback(self, future):  #run when goal is completed
        result = future.result().result
        self.get_logger().info('Result: {0}'.format(result))
        self.done = True

        
class Battery(Node):
    '''
    Set up a node that lets you subscribe to the battery level
    '''
    def __init__(self, namespace = '/Picard'):   
        '''
        define the node and set up the subscriber
        '''
        super().__init__('battery_subscription')
        
        self.subscription = self.create_subscription(BatteryState, namespace + '/battery_state', self.callback, qos_profile_sensor_data)
        self.done = False
        
    def callback(self, msg: BatteryState):
        self.charge = 100*msg.percentage
        self.done = True   # tell the parent program you are done - you have data


And you will have to edit the CreateLib file as well

In [None]:
%%writefile Subs/CreateLib.py
'''
This library talks to the ROS library, setting up some key behaviors
'''

import rclpy, os
from Subs.ROS2Lib import Drive, Rotate, Lights, Audio, TwistIt, Battery, Dock, unDock
from Subs.TCPLib import TCPServer
import time

class Create():
    def __init__ (self, namespace = ''):
        rclpy.init(args = None)
        self.namespace = namespace
        self.drive_client = Drive(namespace)
        self.rotate_client = Rotate(namespace)
        self.led_publisher = Lights(namespace)
        self.audio_publisher = Audio(namespace)
        self.twist_publisher = TwistIt(namespace)
        self.serial = None
        
        self.battery_sub = Battery(namespace)
        self.dock_client = Dock(namespace)
        self.undock_client = unDock(namespace)
        
        print('ros domain: ' + str(os.environ['ROS_DOMAIN_ID']))
        print('middleware: ' + str(os.environ['RMW_IMPLEMENTATION']))
        time.sleep(1)

    def LED(self,color):
        '''
        changes the color of the LED
        '''
        led_colors = color
        print('publish LED ', end = '')
        self.led_publisher.set_color(led_colors)
        time.sleep(1)
        print('done')

    def beep(self, frequency = 440):
        '''
        Beeps
        '''
        print('publish beep ', end = '')
        self.audio_publisher.beep(frequency)
        time.sleep(1)
        print('done')
        
    def twist(self, x, y, z, th, speed, turn):
        '''
        twists the Create - move in x,y,z and rotate theta
        '''
        print('publish twist ', end = '')
        self.twist_publisher.move(x,y,z,th, speed, turn)
        print('done')
            
    def turn(self,angle = 90, speed = 0.5):
        '''
        rotates a given angle
        '''
        
        angle = angle/180*3.1415
        print('turn %0.2f: goal' % angle, end = '')
        self.rotate_client.set_goal(float(angle), speed)
        print(' set ', end = '')
        self.wait(self.rotate_client)
        print('done')

    def forward(self,dist = 0.5):
        '''
        goes the distance and then stops the ROS2 connection
        '''
        speed = 0.25
        print('forward %0.2f: goal' % dist, end = '')
        self.drive_client.set_goal(float(dist),speed)
        print(' set ', end = '')
        self.wait(self.drive_client)
        print('done')

    def dockIt(self):
        '''
        docks the robot
        '''
        print('docking goal', end = '')
        self.dock_client.set_goal()
        print(' set ', end = '')
        self.wait(self.dock_client)
        print('done')

    def undockIt(self):
        '''
        undocks the robot
        '''
        print('undocking goal', end = '')
        future = self.undock_client.set_goal()
        print(' set ', end = '')
        self.wait(self.undock_client)
        #rclpy.spin_until_future_complete(self.undock_client, future)
        print('done')

    def battery(self):
        print('ask battery ', end = '')
        self.wait(self.battery_sub)
        print('done')
        return(' %0.1f ' % self.battery_sub.charge)
        
    def wait(self, client):
        rclpy.spin_once(client)
        while not client.done:
            #time.sleep(0.1)
            print('...', end = '')
            rclpy.spin_once(client)
            
    def close(self):
        print('closing ', end = '')
        self.drive_client.destroy_node()
        self.rotate_client.destroy_node()
        self.led_publisher.destroy_node()
        self.audio_publisher.destroy_node()
        rclpy.shutdown()
        print('done')

# ----------------------------------------serial calls using serial over TCP------------------------- 

    def serial_init(self, IP, PORT, timeout = 0):
        self.serial = TCPServer (IP, PORT, timeout)
        
    def serial_write(self, string):
        if self.serial:
            self.serial.write(string)
        else:
            print('serial not initialized')
            
    def serial_write_binary(self, string):
        if self.serial:
            self.serial.write_binary(string)
        else:
            print('serial not initialized')
            
    def serial_abort(self):
        self.serial.write_binary(b'\x03')
            
    def serial_run(self, code):
        code = code.replace('\n','\r\n')
        code = code.replace('\t','    ')
        if self.serial:
            self.serial.write_binary(b'\x05') # Ctrl E
            self.serial.write(code)
            self.serial.write_binary(b'\x04')  #Ctrl D
        else:
            print('serial not initialized')
            
    def serial_read(self):
        if self.serial:
            return self.serial.read()
        else:
            print('serial not initialized')
        return None
        
    def serial_close(self):
        if self.serial:
            return self.serial.close()
        else:
            print('serial not initialized')


Now restart the kernel and try it all out below

In [None]:
import time, os
from Subs.CreateLib import Create

MyCreate = Create('/rogers')

MyCreate.undockIt()
print(MyCreate.battery())
MyCreate.turn(-45)
MyCreate.forward(0.1)
time.sleep(3) #wait a little so that it is ready to dock again
MyCreate.beep()
MyCreate.dockIt()
MyCreate.close()