In [None]:
!pip install stable-baselines3 gym numpy
!pip install 'shimmy>=0.2.1'

In [None]:
%matplotlib inline



In [18]:
import gym
from gym import spaces
import numpy as np
import matplotlib.pyplot as plt
from IPython.display import clear_output, display
from gazebo_msgs.msg import ModelStates, ModelState
from gazebo_msgs.srv import SetModelState
import rospy
import sys
from clover.srv import SetVelocity, SetVelocityRequest, SetRates, SetRatesRequest, Navigate, NavigateRequest
import numpy as np
from geometry_msgs.msg import Wrench, Vector3, PoseStamped
from mavros_msgs.srv import CommandBool, CommandTOL, SetMode
from std_msgs.msg import String
from mavros_msgs.msg import State
from std_srvs.srv import Empty
import tf.transformations as transformations
import math
import random 

class PositionControlEnv(gym.Env):
    metadata = {'render.modes': ['human']}
    def __init__(self):
        super(PositionControlEnv, self).__init__()

        # Define the low and high bounds for each action Kp, Kd, Ki
        low_bounds = np.array([0, 0, 0], dtype=np.float32)
        high_bounds = np.array([1, 1, 0.0001], dtype=np.float32)
        
        # Create the action space with custom bounds for each action
        self.action_space = spaces.Box(low=low_bounds, high=high_bounds, dtype=np.float32)
        

        # State: current position [-10, 10] + target position
        self.observation_space = spaces.Box(low=-1000, high=1000, shape=(3,), dtype=np.float32)

        self.max_step_per_episode = 512 
        self.flipped = False
        rospy.init_node('jupyter_velocity_controller', anonymous=True)
        # Create a ROS publisher on the vision_pose topic
        self.vision_pose_pub = rospy.Publisher('/mavros/vision_pose/pose', PoseStamped, queue_size=10)
        #self.position_subscriber = rospy.Subscriber('/mavros/local_position/pose', PoseStamped, self.position_callback)
        self.set_velocity_service = rospy.ServiceProxy('/set_velocity', SetVelocity)
        self.set_rates_service = rospy.ServiceProxy('/set_rates', SetRates)
        self.navigate_service = rospy.ServiceProxy('/navigate', Navigate)
        

    def reset(self):
        self.generate_random_positions()
        self.previous_error = np.array([0.0, 0.0, 0.0])
        self.sum_error = np.array([0.0, 0.0, 0.0])
        self.current_step = 0
        self.step_to_success = 0
        self.halfway = False
        self.set_velocity(0, 0, 0)
        rospy.sleep(3)
        self.set_rates(0, 0, 0, 0)
        
        if not self.flipped:
            self.land()
        else:
            self.flipped = False
        #self.reset_world()
        '''self.flipped = False
        self.set_rates(0, 0, 0, 0.55)
        rospy.sleep(2)'''
        state = np.array([np.linalg.norm(self.position_error), np.linalg.norm(self.position_error-self.previous_error), np.linalg.norm(self.sum_error+self.position_error)])
        return state

    def generate_random_positions(self):
        # Generate random coordinates
        x = random.uniform(-1, 9)
        y = random.uniform(-1, 9)
        z = random.uniform(0.3, 3)
        a = random.uniform(-1, 9)
        b = random.uniform(-1, 9)
        c = 0.1
        self.target_position = np.array([5, 5, 1.5])
        self.current_position = np.array([0, 0, 0.1])
        #self.target_position = np.array([x, y, z])
        #self.current_position = np.array([a, b, c])
        self.position_error = self.target_position - self.current_position
        self.distance = np.linalg.norm(self.position_error)
        print("Current start_point: ", self.current_position)
        print("Current target: ", self.target_position)
        

    def set_model_state(self):
        rospy.wait_for_service('/gazebo/set_model_state')
        try:
            set_state = rospy.ServiceProxy('/gazebo/set_model_state', SetModelState)

            state_msg = ModelState()
            state_msg.model_name = 'clover'
            state_msg.pose.position.x = self.current_position[0]  # Set your x position
            state_msg.pose.position.y = self.current_position[1]  # Set your y position
            state_msg.pose.position.z = self.current_position[2]  # Set your z position
            # Set orientation using either quaternion or roll, pitch, yaw
            state_msg.pose.orientation.x = 0.0
            state_msg.pose.orientation.y = 0.0
            state_msg.pose.orientation.z = 0.0
            state_msg.pose.orientation.w = 1.0
            
            resp = set_state(state_msg)
            print("Model state reset.")
        except rospy.ServiceException as e:
            print("Service set model state call failed: %s" % e)
    
    def land(self, land=True):
        rospy.wait_for_service('/mavros/cmd/land')
        try:
            land_service = rospy.ServiceProxy('/mavros/cmd/land', CommandTOL)
            land_service(0.0, 0.0, 0.0, 0.0, 0.0)
            print("Landing starts.")
            while self.check_arm_status() == True:
                rospy.sleep(1)
            rospy.sleep(5)
            print("Landing command completed.")
        except rospy.ServiceException as e:
            print("Service landing call failed: %s" % e)
    

    def check_arm_status(self):
        try:
            current_state = rospy.wait_for_message("/mavros/state", State, timeout=5)
            is_armed = current_state.armed
            return is_armed
            #print("Is the drone armed?", is_armed)
        
        except rospy.ROSException as e:
            print("Failed to get state message within timeout:", str(e))
        except rospy.ROSInterruptException:
            print("Node was shutdown")
        
    def reset_world(self):
        try:
            reset_world_service = rospy.ServiceProxy('/gazebo/reset_world', Empty)
            response = reset_world_service()
            rospy.sleep(2)
            print("World reset successful.", response)
        except rospy.ServiceException as e:
            print("World reset failed: %s" % e)
            
    def step(self, action):
        if self.current_step ==0:
            #self.set_rates(0, 0, 0, 0.579)
            #rospy.sleep(5)
            self.set_model_state()
            #self.current_position, _ = self.get_current_position()
            
        Kpid = action.reshape(-1)
        position_error = self.position_error
        #PID control logic 
        velocity = position_error * Kpid[0] + (position_error-self.previous_error)* Kpid[1] + (position_error + self.sum_error) * Kpid[2]
        normalized_v = np.array([x / (abs(x) + 1) for x in velocity]) # velocity ~ [-1,1]
        
        #print(Kpid)
        
        self.set_velocity(normalized_v[0], normalized_v[1],normalized_v[2])
        rospy.sleep(0.04)# need to be mverify: control frequency 250Hz?
        
        self.current_step += 1
        
        
        self.current_position, self.flipped = self.get_current_position()
        self.previous_error = position_error
        self.sum_error += position_error
        self.position_error = self.target_position - self.current_position # update position error after taking a step
        
        if self.flipped: #self.fail_threshold:
            self.set_rates(0, 0, 0, 0) #stop flipping
            reward = -1000 #self.fail_reward
            self.step_to_success = 0
            print("Drone tends to flip upside down, manual restart")
            done = True
            
        if not (-2 <= self.current_position[0] < 10) or not (-2 <= self.current_position[1] < 10) or self.current_position[2] > 4:
            reward = 0 #self.fail_reward
            self.step_to_success = 0
            print("Drone flies too far, manual restart")
            done = True
        elif np.linalg.norm(self.position_error) <= 0.25: #self.success_threshold:
            self.step_to_success += 1
            done = False
            reward = 0
            if self.step_to_success == 50:
                #average_speed  = self.distance / (self.current_step*0.04)
                average_speed  = self.distance / ((self.current_step-50)*0.04) 
                reward = math.exp(average_speed*10)
                print("Reach the goal! Average flying speed:", average_speed, "m/s")
                #done = True
            
        elif np.linalg.norm(self.position_error) < (self.distance / 2): 
            reward = 0
            if self.halfway == False:
                halfway_speed  = self.distance / (self.current_step*0.04)
                reward = math.exp(halfway_speed*5)
                self.halfway = True
                print("Half way done! Keep going")
            self.step_to_success = 0
            done = False
        else:
            self.step_to_success = 0
            reward = 0
            done = False

        if self.current_step >= self.max_step_per_episode: #flexible self.max_steps_per_episode:
            print("Episode ends")
            reward = 0
            done = True
        
        state = np.array([np.linalg.norm(self.position_error), np.linalg.norm(self.position_error-self.previous_error), np.linalg.norm(self.sum_error+self.position_error)])
        
        return state, reward, done, {}

    
    def close(self):
        if hasattr(self, 'figure'):
            plt.close(self.figure)

    #velocity controller
    def position_callback(self, msg):
        self.current_position = msg.pose
        #print("Position received:", self.current_position)
        
    def get_current_position(self):
        try:
            model_states = rospy.wait_for_message('/gazebo/model_states', ModelStates, timeout=5)
            PI_2 = math.pi / 6 #modified to prevent large angle
            #position_message = rospy.wait_for_message('/mavros/local_position/pose', PoseStamped, timeout=5)
            drone_index = model_states.name.index('clover')  
            position = model_states.pose[drone_index].position
            #print("Current position:", position.x, position.y, position.z)
            
            #check if the drone is flipped upside down
            orientation = model_states.pose[drone_index].orientation
            
            quaternion = [orientation.x, orientation.y, orientation.z, orientation.w]
            #angle = self.quaternion_to_inclined_angle(quaternion)

            roll, pitch, yaw = transformations.euler_from_quaternion(quaternion)
            flipped = abs(roll) > PI_2 or abs(pitch) > PI_2

            return  np.array([position.x, position.y, position.z]), flipped#, angle
        except rospy.ROSException as e:
            print("Failed to get position message within timeout:", str(e))
            return np.array([0, 0, 0]), True#, 0
        except rospy.ROSInterruptException:
            print("Node was shutdown")
            
    def quaternion_to_inclined_angle(self, q):
            # Normalize the quaternion
            q = q / np.linalg.norm(q)
            qw, qx, qy, qz = q
            
            # Transform the local up vector (0, 0, 1) using the quaternion
            # This effectively gives us the drone's up vector in global coordinates
            up_vector_transformed = np.array([
                2*(qx*qz + qw*qy),
                2*(qy*qz - qw*qx),
                2*(qw*qw - 0.5 + qz*qz)
            ])
            
            # Calculate the angle between the transformed up vector and the global up vector (0, 0, 1)
            # This is the inclined angle of the drone relative to the vertical axis
            dot_product = up_vector_transformed.dot(np.array([0, 0, 1]))
            inclined_angle = np.arccos(dot_product)
            
            # Convert the angle to degrees for easier interpretation
            inclined_angle_degrees = np.degrees(inclined_angle)
            
            return inclined_angle_degrees
        
    def set_velocity(self, vx, vy, vz):
        velocity_request = SetVelocityRequest()
        velocity_request.vx = vx
        velocity_request.vy = vy
        velocity_request.vz = vz
        velocity_request.frame_id = 'map'
        velocity_request.auto_arm = 1
        try:
            response = self.set_velocity_service(velocity_request)
            #print("velocity setpoint: ", vx, vy, vz)
            
        except rospy.ServiceException as e:
            print("Service call failed: %s" % e)

    def set_rates(self, r, p, y, t):
        rates_request = SetRatesRequest()
        rates_request.roll_rate = r
        rates_request.pitch_rate = p
        rates_request.yaw_rate = y
        rates_request.thrust = t
        rates_request.auto_arm = 1
        try:
            response = self.set_rates_service(rates_request)
            print("rates setpoint: ", r, p, y, t)
            #print("Service call successful. Response: ", response)
        except rospy.ServiceException as e:
            print("Service call failed: %s" % e)



In [19]:
import rospy
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv
from std_srvs.srv import Empty, EmptyRequest
from stable_baselines3.common.callbacks import BaseCallback, CheckpointCallback

class PauseSimulationCallback(BaseCallback):
    def __init__(self, verbose=0):
        super(PauseSimulationCallback, self).__init__(verbose)
        self.pause_service = rospy.ServiceProxy('/gazebo/pause_physics', Empty)
        self.unpause_service = rospy.ServiceProxy('/gazebo/unpause_physics', Empty)
        
    def _on_step(self) -> bool:
        return True  # return False if you want to stop the training

    def _on_rollout_start(self) -> None:
        # This method will be called at the start of each rollout
        self.unpause_simulation()

    def _on_rollout_end(self) -> None:
        # This method will be called at the end of each rollout,
        # i.e., after collecting a bunch of experience but before updating the policy.
        self.pause_simulation()

    def pause_simulation(self):
        try:
            self.pause_service(EmptyRequest())
            print("Simulation paused")
        except rospy.ServiceException as e:
            print("Failed to pause simulation: %s" % e)

    def unpause_simulation(self):
        try:
            self.unpause_service(EmptyRequest())
            print("Simulation unpaused")
        except rospy.ServiceException as e:
            print("Failed to unpause simulation: %s" % e)

# Usage
env = DummyVecEnv([lambda: PositionControlEnv()])
model = PPO("MlpPolicy", env, n_steps=512, verbose=1)

# Initialize the callback
pause_simulation_callback = PauseSimulationCallback()
checkpoint_callback = CheckpointCallback(save_freq=1024, save_path='./models/', name_prefix='PID_DRL_model')
# Combine the callbacks
callback_list = [pause_simulation_callback, checkpoint_callback]

# Start training
#model.learn(total_timesteps=10000, callback=callback_list)


Using cpu device


In [12]:
model = PPO.load("./models/PID_DRL_model_14000_steps", env=env, n_steps=512)
model.learn(total_timesteps=10000, callback=callback_list)


Current start_point:  [1.57521632 1.06912925 0.1       ]
Current target:  [-0.19573214  1.051712    2.05706316]
rates setpoint:  0 0 0 0
Landing starts.
Landing command completed.
Simulation unpaused
Model state reset.
Half way done! Keep going
Reach the goal! Average flying speed: 0.37492084498224443 m/s
Current start_point:  [0.44293743 4.12872124 0.1       ]
Current target:  [7.7714107  0.68974439 1.95006782]
rates setpoint:  0 0 0 0
Landing starts.
Landing command completed.
Model state reset.
Half way done! Keep going
Reach the goal! Average flying speed: 0.9065463874377099 m/s
Current start_point:  [3.20467973 8.49254782 0.1       ]
Current target:  [2.4290567  2.9679765  1.75562099]
rates setpoint:  0 0 0 0
Landing starts.
Landing command completed.
Model state reset.
Simulation paused
----------------------------
| time/              |     |
|    fps             | 3   |
|    iterations      | 1   |
|    time_elapsed    | 146 |
|    total_timesteps | 512 |
----------------------

KeyboardInterrupt: 

In [None]:
model = PPO.load("./models/PID_DRL_model_15000_steps", env=env, n_steps=512)
eval_env = PositionControlEnv()
eval_env.max_step_per_episode = 1000
obs = eval_env.reset()
step_to_success = 0
for _ in range(10000): 
    action, _states = model.predict(obs, deterministic=True)
    print(action)
    obs, reward, done, info = eval_env.step(action)
    if done:
        print("Game is Over", info)
        break
print("Game is Over", info)
eval_env.close()


Current start_point:  [0.  0.  0.1]
Current target:  [5.  5.  1.5]
