In [1]:
import rospy
import numpy as np
import random
from turtlesim.srv import Spawn
from turtlesim.srv import Kill
from turtlesim.srv import SetPen
from turtlesim.msg import Pose
from geometry_msgs.msg import Twist
from pkg_tsr.srv import Task
from std_msgs.msg import Float32
from std_msgs.msg import String
import math

rospy.init_node("Roboty")

In [2]:
kill_robot = rospy.ServiceProxy('kill',Kill)
class Robot():
    position = Pose()
    def __init__(self, name, x, y):
        self.name = name
        create_new_turtle = rospy.ServiceProxy('spawn', Spawn)
        create_new_turtle(x, y, 0, name)
        
        position = Pose(x,y,0,0,0)
        self.batteryDrain = rospy.get_param("robot/batteryDrain")
        self.batteryCharge = rospy.get_param("robot/batteryCharge")
        
        self.set_pen = rospy.ServiceProxy(name+'/set_pen',SetPen)
        self.set_pen(0,0,0,0,1)
        
        self.follow_service = rospy.Service(name+'/go_to_goal', Task, self.go_to_goal)
        self.spiral_service = rospy.Service(name+'/do_spiral_goal', Task, self.do_spiral_goal)
        
        self.go_to = rospy.ServiceProxy(name+'/go_to_goal',Task)
        self.do_spiral = rospy.ServiceProxy(name+'/do_spiral_goal',Task)
        
        self.velocity_pub = rospy.Publisher(name+'/cmd_vel', Twist, queue_size=10)
        self.pose_sub = rospy.Subscriber(name+'/pose',Pose, self.callback_pose)
        self.battery_pub = rospy.Publisher(name+'/battery',Float32, queue_size=10)
        self.battery = 100.0
        
        rospy.set_param(self.name+"/tryb_pracy","autonomiczny")
        
        self.req_sub = rospy.Subscriber(name +'/polecenia',String, self.callback_req)
        self.info_pub = rospy.Publisher('komunikaty_robotow',String, queue_size=10)
        
    def isChargeStation(self,x,y):
        return (x >= 9.5 and y >= 9.5) or (x <= 1.5 and y <= 1.5)
        
    # odbieranie poleceń
    def callback_req(self,req):
        arguments = req.data.split(',')
        #if str(req.data) == "podlej":
        #    self.robots[0].go_to_goal(3,2)
            
        if arguments[0].strip() == "idz":
            x = float(arguments[1])
            y = float(arguments[2])
            komunikat = req.data.split(',',3)[3] 
            self.info_pub.publish(self.name+': '+ f"Zmierzam do punktu ({x},{y})")
            self.go_to(x,y)
            self.info_pub.publish(self.name+': '+ komunikat)
            if self.battery < 30 and not self.isChargeStation(x,y) and not "Rozpoczynam podlewanie" in komunikat:
                self.info_pub.publish(self.name+': '+ "Niski poziom baterii!")
        elif arguments[0].strip() == "spirala":
            self.info_pub.publish(self.name+': ' + "Robię spiralę")
            self.do_spiral(0,0)
            self.info_pub.publish(self.name+': ' + "Skończyłem robić spiralę")
        
    def go_to_goal(self, msg_srv):
        max_speed = 0.3 # dla liniowych i obrotowych
        tolerance = 0.05 # max odleglosc od punktu
        
        # obracanie się w stronę punktu
        while True:
            celTheta = math.atan2(msg_srv.y - self.position.y,msg_srv.x - self.position.x)
            celDelta = celTheta - self.position.theta
            if abs(celDelta) < tolerance*0.005:
                break
            # poprawiamy
            msg_vel = Twist()
            speed = np.clip(0.2*(abs(celDelta))+0.01,0,max_speed)
            if celDelta > 0:
                msg_vel.angular.z = speed
            else:
                msg_vel.angular.z = -speed
            if rospy.get_param(self.name+'/tryb_pracy')=='autonomiczny':
                self.velocity_pub.publish(msg_vel)
            else:
                msg_vel.angular.z=0
                self.velocity_pub.publish(msg_vel)
                return "Interrupted"
            
        # jazda do punktu
        while True:
            celDelta = math.sqrt((msg_srv.x - self.position.x)**2 + (msg_srv.y - self.position.y)**2)
            if celDelta < tolerance:
                break
            #poprawiamy
            msg_vel = Twist()
            msg_vel.linear.x = np.clip(0.25*celDelta+0.025,0,max_speed)
            if rospy.get_param(self.name+'/tryb_pracy')=='autonomiczny':
                self.velocity_pub.publish(msg_vel)
            else:
                msg_vel.linear.x=0
                self.velocity_pub.publish(msg_vel)
                return "Interrupted"
            
        return "finished"
    
    def do_spiral_goal(self, msg_srv):
        start = rospy.get_time()
        while True:
            time = rospy.get_time()-start
            if time > 20:
                break
            msg = Twist()
            msg.linear.x = 0.3
            msg.angular.z = 3/(time+1)
            if rospy.get_param(self.name+'/tryb_pracy')=='autonomiczny':
                self.velocity_pub.publish(msg)
            else:
                return "Interrupted"
            rospy.sleep(1)
        return "finished"    
        
    def callback_pose(self, data):
        #deltaX = data.x - self.position.x
        #deltaY = data.y - self.position.y
        #distance = math.sqrt( deltaX**2 + deltaY**2)
        if self.isChargeStation(data.x,data.y):
            self.battery += self.batteryCharge
        else:
            self.battery -= data.linear_velocity * self.batteryDrain
        self.battery = np.clip(self.battery,0,100)
        self.battery_pub.publish(self.battery)
        self.position = data
    
    def get_params(self):
        try:
            return rospy.get_param(self.name)
        except KeyError:
            print("Brak parametrow")
            
    def unregister(self):
        self.follow_service.shutdown()
        self.spiral_service.shutdown()
        self.pose_sub.unregister()
        self.req_sub.unregister()
        kill_robot(self.name)
        
# funkcja pomocnicza do kierowania robotem
def kieruj(robot,listaPunktow):
    klient = rospy.ServiceProxy(robot.name+'/go_to_goal',Task)
    for punkt in listaPunktow:
        klient(punkt.x,punkt.y)
        
class Punkt():
    x = 0.0
    y = 0.0
    def __init__(self,x,y):
        self.x = x
        self.y = y

In [9]:
# stworzenie robotów
roboty = [
    Robot("R1",1,3),
    Robot("R2",5,10.5),
    Robot("R3",10,8)
]

In [7]:
# ustawienie baterii na inną wartość niż 100%
for robot in roboty:
    robot.battery = random.randint(40,100)
    print(robot.battery)

89
80
40


In [8]:
# usunięcie robotów, serwisów i subscriberów z nimi związnych
for robot in roboty:
    robot.unregister()

[WARN] [1626005167.525960]: Inbound TCP/IP connection failed: connection from sender terminated before handshake header received. 0 bytes were received. Please check sender for additional details.
[WARN] [1626005167.573302]: Inbound TCP/IP connection failed: connection from sender terminated before handshake header received. 0 bytes were received. Please check sender for additional details.
