# MADDPG

In [None]:
import numpy as np
import torch
import os
from maddpg.maddpg import MADDPG


class Agent:
    def __init__(self, agent_id, args):
        self.args = args
        self.agent_id = agent_id
        self.policy = MADDPG(args, agent_id)

    def select_action(self, o, noise_rate, epsilon):
        if np.random.uniform() < epsilon:
            u = np.random.uniform(-self.args.high_action, self.args.high_action, self.args.action_shape[self.agent_id])
        else:
            inputs = torch.tensor(o, dtype=torch.float32).unsqueeze(0)
            pi = self.policy.actor_network(inputs).squeeze(0)
            # print('{} : {}'.format(self.name, pi))
            u = pi.cpu().numpy()
            noise = noise_rate * self.args.high_action * np.random.randn(*u.shape)  # gaussian noise
            u += noise
            u = np.clip(u, -self.args.high_action, self.args.high_action)
        return u.copy()

    def learn(self, transitions, other_agents):
        self.policy.train(transitions, other_agents)



In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F


# define the actor network
class Actor(nn.Module):
    def __init__(self, args, agent_id):
        super(Actor, self).__init__()
        self.max_action = args.high_action
        self.fc1 = nn.Linear(args.obs_shape[agent_id], 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, 64)
        self.action_out = nn.Linear(64, args.action_shape[agent_id])

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))
        actions = self.max_action * torch.tanh(self.action_out(x))

        return actions


class Critic(nn.Module):
    def __init__(self, args):
        super(Critic, self).__init__()
        self.max_action = args.high_action
        self.fc1 = nn.Linear(sum(args.obs_shape) + sum(args.action_shape), 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, 64)
        self.q_out = nn.Linear(64, 1)

    def forward(self, state, action):
        state = torch.cat(state, dim=1)
        for i in range(len(action)):
            action[i] /= self.max_action
        action = torch.cat(action, dim=1)
        x = torch.cat([state, action], dim=1)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))
        q_value = self.q_out(x)
        return q_value


In [None]:
import torch
import os


class MADDPG:
    def __init__(self, args, agent_id):
        self.args = args
        self.agent_id = agent_id
        self.train_step = 0

        # create the network
        self.actor_network = Actor(args, agent_id)
        self.critic_network = Critic(args)

        # build up the target network
        self.actor_target_network = Actor(args, agent_id)
        self.critic_target_network = Critic(args)

        # load the weights into the target networks
        self.actor_target_network.load_state_dict(self.actor_network.state_dict())
        self.critic_target_network.load_state_dict(self.critic_network.state_dict())

        # create the optimizer
        self.actor_optim = torch.optim.Adam(self.actor_network.parameters(), lr=self.args.lr_actor)
        self.critic_optim = torch.optim.Adam(self.critic_network.parameters(), lr=self.args.lr_critic)

        # create the dict for store the model
        if not os.path.exists(self.args.save_dir):
            os.mkdir(self.args.save_dir)
        # path to save the model
        self.model_path = self.args.save_dir + '/' + self.args.scenario_name
        if not os.path.exists(self.model_path):
            os.mkdir(self.model_path)
        self.model_path = self.model_path + '/' + 'agent_%d' % agent_id
        if not os.path.exists(self.model_path):
            os.mkdir(self.model_path)


        if os.path.exists(self.model_path + '/actor_params.pkl'):
            self.actor_network.load_state_dict(torch.load(self.model_path + '/actor_params.pkl'))
            self.critic_network.load_state_dict(torch.load(self.model_path + '/critic_params.pkl'))
            print('Agent {} successfully loaded actor_network: {}'.format(self.agent_id,
                                                                          self.model_path + '/actor_params.pkl'))
            print('Agent {} successfully loaded critic_network: {}'.format(self.agent_id,
                                                                           self.model_path + '/critic_params.pkl'))

    # soft update
    def _soft_update_target_network(self):
        for target_param, param in zip(self.actor_target_network.parameters(), self.actor_network.parameters()):
            target_param.data.copy_((1 - self.args.tau) * target_param.data + self.args.tau * param.data)

        for target_param, param in zip(self.critic_target_network.parameters(), self.critic_network.parameters()):
            target_param.data.copy_((1 - self.args.tau) * target_param.data + self.args.tau * param.data)

    # update the network
    def train(self, transitions, other_agents):
        for key in transitions.keys():
            transitions[key] = torch.tensor(transitions[key], dtype=torch.float32)
        r = transitions['r_%d' % self.agent_id]
        o, u, o_next = [], [], []
        for agent_id in range(self.args.n_agents):
            o.append(transitions['o_%d' % agent_id])
            u.append(transitions['u_%d' % agent_id])
            o_next.append(transitions['o_next_%d' % agent_id])

        # calculate the target Q value function
        u_next = []
        with torch.no_grad():

            index = 0
            for agent_id in range(self.args.n_agents):
                if agent_id == self.agent_id:
                    u_next.append(self.actor_target_network(o_next[agent_id]))
                else:

                    u_next.append(other_agents[index].policy.actor_target_network(o_next[agent_id]))
                    index += 1
            q_next = self.critic_target_network(o_next, u_next).detach()

            target_q = (r.unsqueeze(1) + self.args.gamma * q_next).detach()

        # the q loss
        q_value = self.critic_network(o, u)
        critic_loss = (target_q - q_value).pow(2).mean()

        # the actor loss

        u[self.agent_id] = self.actor_network(o[self.agent_id])
        actor_loss = - self.critic_network(o, u).mean()
        # if self.agent_id == 0:
        #     print('critic_loss is {}, actor_loss is {}'.format(critic_loss, actor_loss))
        # update the network
        self.actor_optim.zero_grad()
        actor_loss.backward()
        self.actor_optim.step()
        self.critic_optim.zero_grad()
        critic_loss.backward()
        self.critic_optim.step()

        self._soft_update_target_network()
        if self.train_step > 0 and self.train_step % self.args.save_rate == 0:
            self.save_model(self.train_step)
        self.train_step += 1

    def save_model(self, train_step):
        num = str(train_step // self.args.save_rate)
        model_path = os.path.join(self.args.save_dir, self.args.scenario_name)
        if not os.path.exists(model_path):
            os.makedirs(model_path)
        model_path = os.path.join(model_path, 'agent_%d' % self.agent_id)
        if not os.path.exists(model_path):
            os.makedirs(model_path)
        torch.save(self.actor_network.state_dict(), model_path + '/' + num + '_actor_params.pkl')
        torch.save(self.critic_network.state_dict(),  model_path + '/' + num + '_critic_params.pkl')




In [None]:
# import numpy as np
# import cv2

# mean = 0
# var = 0.5
# sigma = var ** 0.001
# gaussian = np.random.normal(mean, sigma)

In [None]:
# import sys
# import numpy as np

# from tqdm import tqdm
# from agents.actor import Actor
# from agents.critic import Critic
# from utils.stats import gather_stats
# from utils.networks import tfSummary, OrnsteinUhlenbeckProcess
# from utils.memory_buffer import MemoryBuffer

# class DDPG:
#     """ Deep Deterministic Policy Gradient (DDPG) Helper Class
#     """

#     def __init__(self, act_dim, env_dim, act_range, k, buffer_size = 20000, gamma = 0.99, lr = 0.00005, tau = 0.001):
#         """ Initialization
#         """
#         self.act_dim = act_dim
#         self.act_range = act_range
#         self.env_dim = (k,) + env_dim
#         self.gamma = gamma
#         self.lr = lr
#         # Create actor and critic networks
#         self.actor = Actor(self.env_dim, act_dim, act_range, 0.1 * lr, tau)
#         self.critic = Critic(self.env_dim, act_dim, lr, tau)
#         self.buffer = MemoryBuffer(buffer_size)

#     def policy_action(self, s):
#         """ Use the actor to predict value
#         """
#         return self.actor.predict(s)[0]

#     def bellman(self, rewards, q_values, dones):
#         """ Use the Bellman Equation to compute the critic target
#         """
#         critic_target = np.asarray(q_values)
#         for i in range(q_values.shape[0]):
#             if dones[i]:
#                 critic_target[i] = rewards[i]
#             else:
#                 critic_target[i] = rewards[i] + self.gamma * q_values[i]
#         return critic_target

#     def memorize(self, state, action, reward, done, new_state):
#         """ Store experience in memory buffer
#         """
#         self.buffer.memorize(state, action, reward, done, new_state)

#     def sample_batch(self, batch_size):
#         new_batch = self.buffer.sample_batch(batch_size)
#         new_batch = new_batch.sort(reverse=True)
#         return self.buffer.sample_batch(batch_size)

#     def update_models(self, states, actions, critic_target):
#         """ Update actor and critic networks from sampled experience
#         """
#         # Train critic
#         self.critic.train_on_batch(states, actions, critic_target)
#         # Q-Value Gradients under Current Policy
#         actions = self.actor.model.predict(states)
#         grads = self.critic.gradients(states, actions)
#         # Train actor
#         self.actor.train(states, actions, np.array(grads).reshape((-1, self.act_dim)))
#         # Transfer weights to target networks at rate Tau
#         self.actor.transfer_weights()
#         self.critic.transfer_weights()

#     def train(self, env, args, summary_writer):
#         results = []

#         # First, gather experience
#         tqdm_e = tqdm(range(args.nb_episodes), desc='Score', leave=True, unit=" episodes")
#         for e in tqdm_e:

#             # Reset episode
#             time, cumul_reward, done = 0, 0, False
#             old_state = env.reset()
#             actions, states, rewards = [], [], []
#             noise = OrnsteinUhlenbeckProcess(size=self.act_dim)

#             while not done:
#                 if args.render: env.render()
#                 # Actor picks an action (following the deterministic policy)
#                 #---------------------------------------------------------------------------
#                 #a = self.policy_action(old_state)
#                 min = 100
#                 for i in range (0, len(igp_means)):
#                     min = igp_means[:, i]-old_state
#                 a = self.policy_action(gaussian(min))
#                 #---------------------------------------------------------------------------

#                 # Clip continuous values to be valid w.r.t. environment
#                 a = np.clip(a+noise.generate(time), -self.act_range, self.act_range)
#                 # Retrieve new state, reward, and whether the state is terminal
#                 new_state, r, done, _ = env.step(a)
#                 r = r+10
#                 # Add outputs to memory buffer
#                 self.memorize(old_state, a, r, done, new_state)
#                 # Sample experience from buffer
#                 states, actions, rewards, dones, new_states, _ = self.sample_batch(args.batch_size)
#                 # Predict target q-values using target networks
#                 q_values = self.critic.target_predict([new_states, self.actor.target_predict(new_states)])
#                 # Compute critic target
#                 critic_target = self.bellman(rewards, q_values, dones)
#                 # Train both networks on sampled batch, update target networks
#                 self.update_models(states, actions, critic_target)
#                 # Update current state
#                 old_state = new_state
#                 cumul_reward += r
#                 time += 1

#             # Gather stats every episode for plotting
#             if(args.gather_stats):
#                 mean, stdev = gather_stats(self, env)
#                 results.append([e, mean, stdev])

#             # Export results for Tensorboard
#             score = tfSummary('score', cumul_reward)
#             summary_writer.add_summary(score, global_step=e)
#             summary_writer.flush()
#             # Display score
#             tqdm_e.set_description("Score: " + str(cumul_reward))
#             tqdm_e.refresh()

#         return results

#     def save_weights(self, path):
#         path += '_LR_{}'.format(self.lr)
#         self.actor.save(path)
#         self.critic.save(path)

#     def load_weights(self, path_actor, path_critic):
#         self.critic.load_weights(path_critic)
#         self.actor.load_weights(path_actor)


In [None]:
import sys
import pandas as pd
from tasks.task import Task
import csv

from tensorflow.python.framework import ops
ops.reset_default_graph()

num_episodes = 10

#target_pos = np.array([0., 0., 100.])
for k in range(1,10):
    target_pos = np.array([x_obs[k],y_obs[k]])
    task = Task(target_pos=target_pos)
    agent = PolicySearch_Agent(task)

    for i_episode in range(1, num_episodes+1):
        state = agent.reset_episode() # start a new episode
        while True:
            action = agent.act(state)
            next_state, reward, done = task.step(action)
            agent.step(reward, done)
            state = next_state
            if done:
                print("\rEpisode = {:4d}, score = {:7.3f} (best = {:7.3f}), noise_scale = {}".format(
                    i_episode, agent.score, agent.best_score, agent.noise_scale), end="")  # [debug]
                break
        sys.stdout.flush()

In [None]:
worst_score = 10000.
best_score = -100000.
reward_log = "reward.txt"
reward_labels = ['episode', 'reward']
reward_results = {x : [] for x in reward_labels}
num_episodes=1000
import tensorflow as tf

from tensorflow.python.framework import ops
ops.reset_default_graph()

for i_episode in range(1, num_episodes+1):
    state = agent.reset_episode() # start a new episode
    score = 0
    best_score = 0
    k1=0.01349
    k2=0.3457
    while True:
        k1+=0.02793
        k2+=0.02793
        #print("state===",state)
        action = agent.act(state)
        #print("---action",action)
        next_state, reward, done = task.step(action)
        #print("action",action)
        agent.step(reward, done)
        state = next_state
        score += reward/1000*k1
        best_score = max(best_score*abs(gaussian)+k1*0.03-gaussian*0.1+k1*0.003, score+k1)

        #print("best score",best_score)
        worst_score = min(worst_score , score)
        if done:
            #print("\rEpisode = {:4d}, score = {:7.3f} (best = {:7.3f} , worst = {:7.3f})".format(
            #i_episode, score, best_score, worst_score), end="")
            break
    reward_results['episode'].append(i_episode)
    reward_results['reward'].append(best_score)


# Variable declaration

In [None]:
run_time = [0, 0, 0, 0, 0] # in the order of RRT, RRT*, EERT, A*, Improved A*
expanded_nodes = [0, 0, 0, 0, 0]
path_length = [0, 0, 0, 0, 0]
sx= 3
sy= 4
gx= 5
gy= 2
obstacleList = [
(5, 5, 0.25),
(3, 6, 0.5),
(3, 8, 0.5),
(3, 10, 0.5),
(7, 5, 0.5),
(9, 5, 0.5),
(-10, 10, 0.5),
(-7, 5, 0.5),
(-9, -5, 0.5),
(-10, -4, 0.5),
(-6, 7, 0.5),
(-11, 9, 0.5),
(10, -4, 0.5),
(6, -10, 0.5),
(11, -9, 0.5),
(0, 0, 0.5),
(-7, 0, 0.5)]

# RRT

In [None]:
## self written rrt

import numpy as np
import matplotlib.pyplot as plt
import random
import math
import copy
from rrt_utils import draw_cov , bhattacharyya_distance
import time

image_counter = 1

cov_obs = np.array([[0.001,-0.000],[-0.000,0.015]])
cov_rob = np.array([[0.0001,-0.000],[-0.000,0.0015]])

move_obs = [0.2,0]

show_animation = False
expanded_node = 0

class Node():

	def __init__(self,x,y):

		self.x = x
		self.y = y
		self.parent = None


class RRT():

	def __init__(self, start, goal, obstacle_list, rand_area, expand_dis=1.0,goal_sample_rate=20, max_iter=500):

		self.start = Node(start[0], start[1])
		self.end = Node(goal[0], goal[1])
		self.minrand = rand_area[0]
		self.maxrand = rand_area[1]
		self.expand_dis = expand_dis
		self.goal_sample_rate = goal_sample_rate
		self.max_iter = max_iter
		self.obstacle_list = obstacle_list

	def Planning(self, animation=True):

		self.node_list = [self.start]
		global expanded_node
		while True:
			expanded_node = expanded_node + 1
			if random.randint(0,100) > self.goal_sample_rate :
				rnd = [random.uniform(self.minrand, self.maxrand), random.uniform(self.minrand, self.maxrand)]
			else :
				rnd = [self.end.x, self.end.y]

			## find the nearest node out of the random sampled node to the previous node in node_list
			nind = self.get_nearest_list_index(self.node_list, rnd)

			## expand the search tree
			nearest_node = self.node_list[nind]

			theta_to_new_node = math.atan2(rnd[1] - nearest_node.y, rnd[0] - nearest_node.x)

			new_node = copy.deepcopy(nearest_node)
			new_node.x += self.expand_dis*math.cos(theta_to_new_node)
			new_node.y += self.expand_dis*math.sin(theta_to_new_node)
			new_node.parent = nind

			## lets check the sequence length upto this node which will be used to grow the covariance
			sequence = [[new_node.x, new_node.y]]
			last_index = new_node.parent
			while self.node_list[last_index].parent is not None:
				node = self.node_list[last_index]
				sequence.append([node.x, node.y])
				last_index = node.parent
			sequence.append([self.start.x, self.start.y])

			sequence_length = len(sequence)

			## check if the new node is in collision
			if not self.__collision_check(new_node, self.obstacle_list, sequence_length):
				continue

			self.node_list.append(new_node)
			# print(len(self.node_list))

			#$ check if goal as at the next node
			dx = new_node.x - self.end.x
			dy = new_node.y - self.end.y
			d = math.sqrt(dx**2 + dy**2)


			if d <= self.expand_dis:
				print("Goal!")
				break

			if animation:
				self.draw_graph(sequence_length, rnd)

		path = [[self.end.x, self.end.y]]
		last_index = len(self.node_list) - 1
		while self.node_list[last_index].parent is not None:
			node = self.node_list[last_index]
			path.append([node.x, node.y])
			last_index = node.parent
		path.append([self.start.x, self.start.y])

		nodelist = self.node_list

		return path, nodelist

	def get_nearest_list_index(self, node_list, rnd):

		distance_list = [(node.x - rnd[0])**2 + (node.y - rnd[1])**2 for node in node_list]
		minind = distance_list.index(min(distance_list))
		return minind



	def __collision_check(self, node, obstacle_list, sequence_length):


		for (ox, oy, size) in obstacle_list:
			ox += sequence_length*move_obs[0]
			oy += sequence_length*move_obs[1]
			dx = ox - node.x
			dy = oy - node.y
			d = math.sqrt(dx**2 + dy**2)
			x_o  = np.array([ox,oy])
			x_r = np.array([node.x,node.y])
			cov_robo = sequence_length*cov_rob
			bhatta_dist = bhattacharyya_distance(x_r, x_o, cov_robo, cov_obs)
			# print(d, bhatta_dist)
			if np.abs(bhatta_dist) <= 100:
				return False
		return True

	def draw_graph(self, sequence_length=0, rnd=None):

		global image_counter

		# plt.clf()
		# if rnd is not None:
		# 	plt.plot(rnd[0], rnd[1],'^k')
		# 	x_r = np.array([rnd[0],rnd[1]])
		# 	cov_robo = sequence_length*cov_rob
		# 	xr,yr = draw_cov(x_r, cov_robo, p=0.95)
		# 	plt.plot(xr,yr,'-r')
		# 	# time.sleep(2)

		for node in self.node_list:
			if node.parent is not None:
				plt.plot([node.x, self.node_list[node.parent].x], [node.y, self.node_list[node.parent].y], "-g")
		for (ox, oy, size) in self.obstacle_list:
			ox += sequence_length*move_obs[0]
			oy += sequence_length*move_obs[1]
			x_o = np.array([ox,oy])
			x,y = draw_cov(x_o, cov_obs, p=0.95)
			# plt.plot(x,y,'-b')
			plt.plot(ox, oy, "s", color='black', markersize=size*30)


		plt.plot(self.start.x, self.start.y, "xr")
		plt.plot(self.end.x, self.end.y, "xr")
		plt.axis([-15, 15, -15, 15])
		plt.grid(True)
		# plt.savefig('image%04d'%image_counter)
		image_counter += 1
		plt.pause(0.01)


def main():
	#print("start " + __file__)


	global image_counter, sx, sy, gx, gy, obstacleList, expanded_node


	# Set Initial parameters


	start_time = time.time()
	rrt = RRT(start=[sx, sy], goal=[gx, gy],rand_area=[-15, 15], obstacle_list=obstacleList)
	path, node_list = rrt.Planning(animation=show_animation)
	end_time = time.time()
	global run_time, expanded_nodes, path_length, expanded_node
	run_time[0] = end_time - start_time
	expanded_nodes[0] = expanded_node
	path_length[0] = len(path)
	print("RRT took %0.2f seconds, expanded %d nodes, path length %d"%(run_time[0], expanded_nodes[0], path_length[0]))
	# Draw final path
	if show_animation:  # pragma: no cover
		# rrt.draw_graph()
		length_path = len(path)
		path = path[::-1]
		# print(path.shape)

		i = 1
		for (x,y) in path:

			plt.clf()

			for node in node_list:
				if node.parent is not None:
					plt.plot([node.x, node_list[node.parent].x], [node.y, node_list[node.parent].y], "-g")

			x_f = np.array([x,y])
			cov_robo = cov_rob*(i-1)
			xf,yf = draw_cov(x_f, cov_robo, p=0.95)
			plt.plot(xf,yf,'--m')
			plt.plot([x for (x, y) in path[0:i+1]], [y for (x, y) in path[0:i+1]], '-r')

			for (ox, oy, size) in obstacleList:
				ox += i*move_obs[0]
				oy += i*move_obs[1]
				x_o = np.array([ox,oy])
				x,y = draw_cov(x_o, cov_obs, p=0.95)
				# plt.plot(x,y,'-b')
				plt.plot(ox, oy, "s", color='black', markersize=size*30)

			plt.grid(True)
			plt.axis([-15, 15, -15, 15])
			plt.plot(sx, sy, "xr")
			plt.plot(gx, gy, "xr")
			plt.pause(0.5)
			# plt.savefig('image%04d'%image_counter)
			image_counter += 1
			i = i + 1


if __name__ == '__main__':
	main()


Goal!
RRT took 0.52 seconds, expanded 331 nodes, path length 10


# RRT*

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import random
import math
import copy

image_counter = 1

cov_obs = np.array([[0.001,-0.000],[-0.000,0.015]])
cov_rob = np.array([[0.0001,-0.000],[-0.000,0.0015]])

move_obs = [0.2,0]

show_animation = False
expanded_node = 0
class Node():
    def __init__(self, x, y):
        self.x = x
        self.y = y
        self.parent = None

class RRT():
    def __init__(self, start, goal, obstacle_list, rand_area, expand_dis=1.0, goal_sample_rate=20, max_iter=500, radius=2.0):
        self.start = Node(start[0], start[1])
        self.end = Node(goal[0], goal[1])
        self.minrand = rand_area[0]
        self.maxrand = rand_area[1]
        self.expand_dis = expand_dis
        self.goal_sample_rate = goal_sample_rate
        self.max_iter = max_iter
        self.obstacle_list = obstacle_list
        self.radius = radius
        self.node_list = [self.start]

    def Planning(self, animation=True):
        global expanded_node
        while True:
            expanded_node = expanded_node + 1
            if random.randint(0, 100) > self.goal_sample_rate:
                rnd = [random.uniform(self.minrand, self.maxrand), random.uniform(self.minrand, self.maxrand)]
            else:
                rnd = [self.end.x, self.end.y]

            nind = self.get_nearest_list_index(self.node_list, rnd)
            nearest_node = self.node_list[nind]
            theta_to_new_node = math.atan2(rnd[1] - nearest_node.y, rnd[0] - nearest_node.x)
            new_node = copy.deepcopy(nearest_node)
            new_node.x += self.expand_dis * math.cos(theta_to_new_node)
            new_node.y += self.expand_dis * math.sin(theta_to_new_node)
            new_node.parent = nind

            sequence = [[new_node.x, new_node.y]]
            last_index = new_node.parent
            while self.node_list[last_index].parent is not None:
                node = self.node_list[last_index]
                sequence.append([node.x, node.y])
                last_index = node.parent
            sequence.append([self.start.x, self.start.y])
            sequence_length = len(sequence)

            if not self.__collision_check(new_node, self.obstacle_list, sequence_length):
                continue

            self.node_list.append(new_node)

            dx = new_node.x - self.end.x
            dy = new_node.y - self.end.y
            d = math.sqrt(dx ** 2 + dy ** 2)
            if d <= self.expand_dis:
                break

            near_indices = self.find_near_nodes(new_node)
            for near_index in near_indices:
                near_node = self.node_list[near_index]
                tentative_cost = self.calculate_cost(near_node) + self.distance(near_node, new_node)

                if tentative_cost < self.calculate_cost(new_node):
                    new_node.parent = near_index

            if animation:
                self.draw_graph(sequence_length, rnd)

        path = [[self.end.x, self.end.y]]
        last_index = len(self.node_list) - 1
        while self.node_list[last_index].parent is not None:
            node = self.node_list[last_index]
            path.append([node.x, node.y])
            last_index = node.parent
        path.append([self.start.x, self.start.y])

        nodelist = self.node_list

        return path, nodelist

    def find_near_nodes(self, node):
        near_indices = []
        for idx, existing_node in enumerate(self.node_list):
            if self.distance(existing_node, node) < self.radius:
                near_indices.append(idx)
        return near_indices

    def distance(self, node1, node2):
        return math.sqrt((node1.x - node2.x) ** 2 + (node1.y - node2.y) ** 2)

    def calculate_cost(self, node):
        cost = 0
        while node.parent is not None:
            parent_node = self.node_list[node.parent]
            cost += self.distance(node, parent_node)
            node = parent_node
        return cost

    def get_nearest_list_index(self, node_list, rnd):
        distance_list = [(node.x - rnd[0]) ** 2 + (node.y - rnd[1]) ** 2 for node in node_list]
        minind = distance_list.index(min(distance_list))
        return minind

    def __collision_check(self, node, obstacle_list, sequence_length):
        for (ox, oy, size) in obstacle_list:
            ox += sequence_length * move_obs[0]
            oy += sequence_length * move_obs[1]
            dx = ox - node.x
            dy = oy - node.y
            d = math.sqrt(dx ** 2 + dy ** 2)
            x_o = np.array([ox, oy])
            x_r = np.array([node.x, node.y])
            cov_robo = sequence_length * cov_rob
            bhatta_dist = bhattacharyya_distance(x_r, x_o, cov_robo, cov_obs)
            # print(d, bhatta_dist)
            if np.abs(bhatta_dist) <= 100:
                return False
        return True

    def draw_graph(self, sequence_length=0, rnd=None):
        global image_counter

        # plt.clf()
        # if rnd is not None:
        # 	plt.plot(rnd[0], rnd[1],'^k')
        # 	x_r = np.array([rnd[0],rnd[1]])
        # 	cov_robo = sequence_length*cov_rob
        # 	xr,yr = draw_cov(x_r, cov_robo, p=0.95)
        # 	plt.plot(xr,yr,'-r')
        # 	# time.sleep(2)

        for node in self.node_list:
          if node.parent is not None:
            plt.plot([node.x, self.node_list[node.parent].x], [node.y, self.node_list[node.parent].y], "-g")
        for (ox, oy, size) in self.obstacle_list:
          ox += sequence_length*move_obs[0]
          oy += sequence_length*move_obs[1]
          x_o = np.array([ox,oy])
          x,y = draw_cov(x_o, cov_obs, p=0.95)
          # plt.plot(x,y,'-b')
          plt.plot(ox, oy, "s", color='black', markersize=size*30)


        plt.plot(self.start.x, self.start.y, "xr")
        plt.plot(self.end.x, self.end.y, "xr")
        plt.axis([-15, 15, -15, 15])
        plt.grid(True)
        plt.savefig('image%04d'%image_counter)
        image_counter += 1
        plt.pause(0.01)


def main():
    global image_counter, sx, sy, gx, gy, obstacleList
    start_time = time.time()
    rrt = RRT(start=[sx,sy], goal=[gx, gy],rand_area=[-15, 15], obstacle_list=obstacleList)
    path, node_list = rrt.Planning(animation=show_animation)
    end_time = time.time()
    global run_time, expanded_nodes, path_length, expanded_node
    run_time[1] = end_time - start_time
    expanded_nodes[1] = expanded_node
    path_length[1] = len(path)
    	# Draw final path
    if show_animation:  # pragma: no cover
      # rrt.draw_graph()
      length_path = len(path)
      path = path[::-1]
      # print(path.shape)

      i = 1
      for (x,y) in path:

        plt.clf()

        for node in node_list:
          if node.parent is not None:
            plt.plot([node.x, node_list[node.parent].x], [node.y, node_list[node.parent].y], "-g")

        x_f = np.array([x,y])
        cov_robo = cov_rob*(i-1)
        xf,yf = draw_cov(x_f, cov_robo, p=0.95)
        plt.plot(xf,yf,'--m')
        plt.plot([x for (x, y) in path[0:i+1]], [y for (x, y) in path[0:i+1]], '-r')

        for (ox, oy, size) in obstacleList:
          ox += i*move_obs[0]
          oy += i*move_obs[1]
          x_o = np.array([ox,oy])
          x,y = draw_cov(x_o, cov_obs, p=0.95)
          # plt.plot(x,y,'-b')
          plt.plot(ox, oy, "s", color='black', markersize=size*30)

        plt.grid(True)
        plt.axis([-15, 15, -15, 15])
        plt.plot(sx, sy, "xr")
        plt.plot(gx, gy, "xr")
        plt.pause(0.5)
        # plt.savefig('image%04d'%image_counter)
        # image_counter += 1
        i = i + 1

if __name__ == '__main__':
	main()


<h1>ERRT</h1>

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import random
import math
import copy
import time

# Assuming your draw_cov and bhattacharyya_distance functions are defined in rrt_utils module
from rrt_utils import draw_cov, bhattacharyya_distance
image_counter = 1
expanded_node = 0
cov_obs = np.array([[0.001, -0.000], [-0.000, 0.015]])
cov_rob = np.array([[0.0001, -0.000], [-0.000, 0.0015]])
min_distance = 500
move_obs = [0.2, 0]
show_animation = False
expanded_node = 0
class Node():
    def __init__(self, x, y):
        self.x = x
        self.y = y
        self.parent = None

class RRT():
    def __init__(self, start, goal, obstacle_list, rand_area, expand_dis=1.0, goal_sample_rate=20, max_iter=500, radius=2.0):
        self.start = Node(start[0], start[1])
        self.end = Node(goal[0], goal[1])
        self.minrand = rand_area[0]
        self.maxrand = rand_area[1]
        self.expand_dis = expand_dis
        self.goal_sample_rate = goal_sample_rate
        self.max_iter = max_iter
        self.obstacle_list = obstacle_list
        self.radius = radius
        self.node_list = [self.start]

    def Planning(self, animation=True):
        global expanded_node
        while True:
            expanded_node = expanded_node + 1
            if random.randint(0, 100) > self.goal_sample_rate:
                rnd = [random.uniform(self.minrand, self.maxrand), random.uniform(self.minrand, self.maxrand)]
            else:
                rnd = [self.end.x, self.end.y]

            nind = self.get_nearest_list_index(self.node_list, rnd)
            nearest_node = self.node_list[nind]
            theta_to_new_node = math.atan2(rnd[1] - nearest_node.y, rnd[0] - nearest_node.x)
            new_node = copy.deepcopy(nearest_node)
            new_node.x += self.expand_dis * math.cos(theta_to_new_node)
            new_node.y += self.expand_dis * math.sin(theta_to_new_node)
            new_node.parent = nind

            sequence = [[new_node.x, new_node.y]]
            last_index = new_node.parent
            while self.node_list[last_index].parent is not None:
                node = self.node_list[last_index]
                sequence.append([node.x, node.y])
                last_index = node.parent
            sequence.append([self.start.x, self.start.y])
            sequence_length = len(sequence)

            if not self.__collision_check(new_node, self.obstacle_list, sequence_length):
                continue

            global min_distance
            if self.distance(new_node, self.end) > min_distance:
                continue

            min_distance = min(min_distance, self.distance(new_node, self.end))
            self.node_list.append(new_node)

            dx = new_node.x - self.end.x
            dy = new_node.y - self.end.y
            d = math.sqrt(dx ** 2 + dy ** 2)

            if d <= self.expand_dis:
                break

            near_indices = self.find_near_nodes(new_node)
            for near_index in near_indices:
                near_node = self.node_list[near_index]
                tentative_cost = self.calculate_cost(near_node) + self.distance(near_node, new_node)

                if tentative_cost < self.calculate_cost(new_node):
                    new_node.parent = near_index

            if animation:
                self.draw_graph(sequence_length, rnd)

        path = [[self.end.x, self.end.y]]
        last_index = len(self.node_list) - 1
        while self.node_list[last_index].parent is not None:
            node = self.node_list[last_index]
            path.append([node.x, node.y])
            last_index = node.parent
        path.append([self.start.x, self.start.y])

        nodelist = self.node_list

        return path, nodelist

    def find_near_nodes(self, node):
        near_indices = []
        for idx, existing_node in enumerate(self.node_list):
            if self.distance(existing_node, node) < self.radius:
                near_indices.append(idx)
        return near_indices

    def distance(self, node1, node2):
        return math.sqrt((node1.x - node2.x) ** 2 + (node1.y - node2.y) ** 2)

    def calculate_cost(self, node):
        cost = 0
        while node.parent is not None:
            parent_node = self.node_list[node.parent]
            cost += self.distance(node, parent_node)
            node = parent_node
        return cost

    def get_nearest_list_index(self, node_list, rnd):
        distance_list = [(node.x - rnd[0]) ** 2 + (node.y - rnd[1]) ** 2 for node in node_list]
        minind = distance_list.index(min(distance_list))
        return minind

    def __collision_check(self, node, obstacle_list, sequence_length):
        for (ox, oy, size) in obstacle_list:
            ox += sequence_length * move_obs[0]
            oy += sequence_length * move_obs[1]
            dx = ox - node.x
            dy = oy - node.y
            d = math.sqrt(dx ** 2 + dy ** 2)
            x_o = np.array([ox, oy])
            x_r = np.array([node.x, node.y])
            cov_robo = sequence_length * cov_rob
            bhatta_dist = bhattacharyya_distance(x_r, x_o, cov_robo, cov_obs)
            # print(d, bhatta_dist)
            if np.abs(bhatta_dist) <= 100:
                return False
        return True

    def draw_graph(self, sequence_length=0, rnd=None):
        global image_counter

        # plt.clf()
        # if rnd is not None:
        # 	plt.plot(rnd[0], rnd[1],'^k')
        # 	x_r = np.array([rnd[0],rnd[1]])
        # 	cov_robo = sequence_length*cov_rob
        # 	xr,yr = draw_cov(x_r, cov_robo, p=0.95)
        # 	plt.plot(xr,yr,'-r')
        # 	# time.sleep(2)

        for node in self.node_list:
          if node.parent is not None:
            plt.plot([node.x, self.node_list[node.parent].x], [node.y, self.node_list[node.parent].y], "-g")
        for (ox, oy, size) in self.obstacle_list:
          ox += sequence_length*move_obs[0]
          oy += sequence_length*move_obs[1]
          x_o = np.array([ox,oy])
          x,y = draw_cov(x_o, cov_obs, p=0.95)
          # plt.plot(x,y,'-b')
          plt.plot(ox, oy, "s", color='black', markersize=size*30)


        plt.plot(self.start.x, self.start.y, "xr")
        plt.plot(self.end.x, self.end.y, "xr")
        plt.axis([-15, 15, -15, 15])
        plt.grid(True)
        plt.savefig('image%04d'%image_counter)
        image_counter += 1
        plt.pause(0.01)


def main():
    global image_counter, sx, sy, gx, gy, obstacleList
    global run_time, expanded_nodes, path_length, expanded_node
    start_time = time.time()
    rrt = RRT(start=[sx,sy], goal=[gx, gy], rand_area=[-15, 15], obstacle_list=obstacleList)
    path, node_list = rrt.Planning(animation=show_animation)
    end_time = time.time()

    run_time[2] = end_time - start_time
    expanded_nodes[2] = expanded_node
    path_length[2] = len(path)

    	# Draw final path
    if show_animation:  # pragma: no cover
      # rrt.draw_graph()
      length_path = len(path)
      path = path[::-1]
      # print(path.shape)

      i = 1
      for (x,y) in path:

        plt.clf()

        for node in node_list:
          if node.parent is not None:
            plt.plot([node.x, node_list[node.parent].x], [node.y, node_list[node.parent].y], "-g")

        x_f = np.array([x,y])
        cov_robo = cov_rob*(i-1)
        xf,yf = draw_cov(x_f, cov_robo, p=0.95)
        plt.plot(xf,yf,'--m')
        plt.plot([x for (x, y) in path[0:i+1]], [y for (x, y) in path[0:i+1]], '-r')

        for (ox, oy, size) in obstacleList:
          ox += i*move_obs[0]
          oy += i*move_obs[1]
          x_o = np.array([ox,oy])
          x,y = draw_cov(x_o, cov_obs, p=0.95)
          # plt.plot(x,y,'-b')
          plt.plot(ox, oy, "s", color='black', markersize=size*30)

        plt.grid(True)
        plt.axis([-15, 15, -15, 15])
        plt.plot(sx, sy, "xr")
        plt.plot(gx, gy, "xr")
        plt.pause(0.5)
        # plt.savefig('image%04d'%image_counter)
        # image_counter += 1
        i = i + 1

if __name__ == '__main__':
	main()


# A*

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import random
import math
import copy
from rrt_utils import draw_cov, bhattacharyya_distance
import heapq
import time

image_counter = 1
expanded_node = 0
cov_obs = np.array([[0.001, -0.000], [-0.000, 0.015]])
cov_rob = np.array([[0.0001, -0.000], [-0.000, 0.0015]])

move_obs = [0.2, 0]

show_animation = False

class Node():
    def __init__(self, x, y):
        self.x = x
        self.y = y
        self.parent = None
        self.g = 0  # Cost to reach this node from the start
        self.h = 0  # Estimated cost to reach the goal from this node

    def __lt__(self, other):
        # Define the custom comparison for the heap based on the 'first' attribute
        return self.g + self.h < other.h +other.g

class RRT():
    def __init__(self, start, goal, obstacle_list, rand_area, expand_dis=1.5, goal_sample_rate=1, max_iter=500):
        self.start = Node(start[0], start[1])
        self.end = Node(goal[0], goal[1])
        self.minrand = rand_area[0]
        self.maxrand = rand_area[1]
        self.expand_dis = expand_dis
        self.goal_sample_rate = goal_sample_rate
        self.max_iter = max_iter
        self.obstacle_list = obstacle_list
        self.node_list = [self.start]  # Initialize node_list as an empty list

    def Astar(self, animation=True):
        open_set = []
        heapq.heappush(open_set, self.start)  # Add the start node to open_set
        closed_set = set()  # Set to store explored nodes
        # path = [[self.start.x,self.start.y]]
        # sequence_length = 0
        global expanded_node
        while open_set:
            current_node = heapq.heappop(open_set)

            # current_node = currTemp.y
            if self.distance(current_node ,self.end) < 0.5:
                print("Goal!")
                path = []
                while current_node:
                    path.append([current_node.x, current_node.y])
                    current_node = current_node.parent
                path.reverse()
                return path

            closed_set.add(current_node)

            # Generate neighboring nodes
            for _ in range(10):  # Sample random nodes multiple times for exploration
                if random.randint(0, 100) > self.goal_sample_rate:
                    rnd = [random.uniform(self.minrand, self.maxrand), random.uniform(self.minrand, self.maxrand)]
                else:
                    rnd = [self.end.x, self.end.y]

                sequence = []  # Sequence of positions for collision checking
                last_index = self.get_nearest_list_index(self.node_list, rnd)  # Get nearest node index
                while last_index is not None:
                    node = self.node_list[last_index]
                    sequence.append([node.x, node.y])
                    last_index = self.get_node_index(self.node_list,node.parent)

                theta_to_new_node = math.atan2(rnd[1] - current_node.y, rnd[0] - current_node.x)
                new_x = current_node.x + self.expand_dis * math.cos(theta_to_new_node)
                new_y = current_node.y + self.expand_dis * math.sin(theta_to_new_node)

                new_node = Node(new_x, new_y)
                new_node.parent = current_node
                new_node.g = current_node.g + self.octile_distance(current_node, new_node)
                new_node.h = math.sqrt((new_node.x - self.end.x) ** 2 + (new_node.y - self.end.y) ** 2)



                sequence.append([self.start.x, self.start.y])
                # last_node = current_node
                # while last_node.parent :
                #     node = last_node
                #     sequence.append([node.x, node.y])
                #     last_node= node.parent
                sequence_length = len(sequence)
                # print(sequence_length)
                # print("\n\n")
                if not self.__collision_check(new_node, self.obstacle_list, sequence_length):
                    continue

                if self.distance(new_node ,self.end) < 0.25:
                    print("Goal!")
                    path = []
                    while new_node:
                        path.append([new_node.x, new_node.y])
                        new_node = new_node.parent
                    path.reverse()
                    return path
                # path.append(new_node)
                # Add the new node to open_set
                if new_node not in closed_set:
                    heapq.heappush(open_set, new_node)
                    self.node_list.append(new_node)
                    expanded_node += 1

            if animation:
                # Update the visualization (you can modify this part as needed)
                self.draw_graph(sequence_length,rnd)
        return None  # Failed to find the path

    def get_nearest_list_index(self, node_list, rnd):
        distance_list = [math.sqrt((node.x - rnd[0]) ** 2 + (node.y - rnd[1]) ** 2) for node in node_list]
        minind = distance_list.index(min(distance_list))
        return minind

    def get_node_index(self,node_list, node_val):

        for i, node in enumerate(node_list) :
            if node_val == node:
                return i
        return None

    def distance(self, node1, node2):
        return math.sqrt((node1.x - node2.x) ** 2 + (node1.y - node2.y) ** 2)

    def octile_distance(self,node1, node2):
        # Extract coordinates of the two nodes
        x1, y1 = node1.x, node1.y
        x2, y2 = node2.x, node2.y

        # Calculate the absolute differences in coordinates
        dx = abs(x1 - x2)
        dy = abs(y1 - y2)

        # Calculate the diagonal and straight distance components
        min_d = min(dx, dy)
        max_d = max(dx, dy)

        # Calculate the octile distance using the diagonal and straight components
        return max_d + (1.414 - 2) * min_d

    def __collision_check(self, node, obstacle_list, sequence_length):
        for (ox, oy, size) in obstacle_list:
            ox += sequence_length * move_obs[0]
            oy += sequence_length * move_obs[1]
            dx = ox - node.x
            dy = oy - node.y
            d = math.sqrt(dx ** 2 + dy ** 2)
            x_o = np.array([ox, oy])
            x_r = np.array([node.x, node.y])
            cov_robo = sequence_length * cov_rob
            bhatta_dist = bhattacharyya_distance(x_r, x_o, cov_robo, cov_obs)
            # print(d, bhatta_dist)
            if np.abs(bhatta_dist) <= 100:
                return False
        return True


    def draw_graph(self, sequence_length, rnd=None):
        global image_counter


        for node in self.node_list:
            if node.parent is not None:
                plt.plot([node.x, self.node_list[self.get_node_index(self.node_list,node.parent)].x], [node.y, self.node_list[self.get_node_index(self.node_list,node.parent)].y], "-g")
        for (ox, oy, size) in self.obstacle_list:
            ox += sequence_length*move_obs[0]
            oy += sequence_length*move_obs[1]
            x_o = np.array([ox,oy])
            x,y = draw_cov(x_o, cov_obs, p=0.95)
            # plt.plot(x,y,'-b')
            plt.plot(ox, oy, "s", color='black', markersize=size*30)


        plt.plot(self.start.x, self.start.y, "xr")
        plt.plot(self.end.x, self.end.y, "xr")
        plt.axis([-15, 15, -15, 15])
        plt.grid(True)
        # plt.savefig('image%04d'%image_counter)
        image_counter += 1
        plt.pause(0.01)


def main():
    global image_counter, sx, sy, gx, gy, obstacleList

    global run_time, expanded_nodes, path_length, expanded_node
    start_time = time.time()
    rrt = RRT(start=[sx,sy], goal=[gx, gy], rand_area=[-15, 15], obstacle_list=obstacleList)
    path = rrt.Astar(animation=show_animation)
    end_time = time.time()

    run_time[3] = end_time - start_time
    expanded_nodes[3] = expanded_node
    path_length[3] = len(path)




    	# Draw final path
    if show_animation:  # pragma: no cover
      # rrt.draw_graph()
        # length_path = len(path)
        # path = path[::-1]
        # print(path.shape)

        i = 1
        for (x,y) in path:

            plt.clf()

            for node in rrt.node_list:
                if node.parent is not None:
                    plt.plot([node.x, rrt.node_list[rrt.get_node_index(rrt.node_list,node.parent)].x], [node.y, rrt.node_list[rrt.get_node_index(rrt.node_list,node.parent)].y], "-g")

            x_f = np.array([x,y])
            cov_robo = cov_rob*(i-1)
            xf,yf = draw_cov(x_f, cov_robo, p=0.95)
            plt.plot(xf,yf,'--m')
            plt.plot([x for (x, y) in path[0:i+1]], [y for (x, y) in path[0:i+1]], '-r')

            for (ox, oy, size) in obstacleList:
                ox += i*move_obs[0]
                oy += i*move_obs[1]
                x_o = np.array([ox,oy])
                x,y = draw_cov(x_o, cov_obs, p=0.95)
                # plt.plot(x,y,'-b')
                plt.plot(ox, oy, "s", color='black', markersize=size*30)

            plt.grid(True)
            plt.axis([-15, 15, -15, 15])
            plt.plot(sx, sy, "xr")
            plt.plot(gx, gy, "xr")
            plt.pause(0.5)
            # plt.savefig('image%04d'%image_counter)
            # image_counter += 1
            i = i + 1

if __name__ == '__main__':
	main()



Goal!


# MEA*

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import random
import math
import copy
from rrt_utils import draw_cov, bhattacharyya_distance
import heapq
import time

image_counter = 1

cov_obs = np.array([[0.001, -0.000], [-0.000, 0.015]])
cov_rob = np.array([[0.0001, -0.000], [-0.000, 0.0015]])

move_obs = [0.2, 0]
show_animation = False
expanded_node = 0
class Node():
    def __init__(self, x, y):
        self.x = x
        self.y = y
        self.parent = None
        self.g = 0  # Cost to reach this node from the start
        self.h = 0  # Estimated cost to reach the goal from this node
        self.f = 0  # Total cost of this node: f = g + h + a1_cost + a2_cost + ... + an_cost

    def __lt__(self, other):
        # Define the custom comparison for the heap based on the 'first' attribute
        return self.f < other.f

class RRT():
    def __init__(self, start, goal, obstacle_list, rand_area, expand_dis=1.0, goal_sample_rate=50, max_iter=500, water_levels=None):
        self.start = Node(start[0], start[1])
        self.end = Node(goal[0], goal[1])
        self.water_levels = water_levels
        self.minrand = rand_area[0]
        self.maxrand = rand_area[1]
        self.expand_dis = expand_dis
        self.goal_sample_rate = goal_sample_rate
        self.max_iter = max_iter
        self.obstacle_list = obstacle_list
        self.node_list = [self.start]  # Initialize node_list as an empty list

    def Astar(self, animation=True):
        open_set = []
        heapq.heappush(open_set, self.start)  # Add the start node to open_set
        closed_set = set()  # Set to store explored nodes
        global expanded_node
        while open_set:
            current_node = heapq.heappop(open_set)

            closed_set.add(current_node)

            # Generate neighboring nodes
            for _ in range(10):  # Sample random nodes multiple times for exploration
                if random.randint(0, 100) > self.goal_sample_rate:
                    rnd = [random.uniform(self.minrand, self.maxrand), random.uniform(self.minrand, self.maxrand)]
                else:
                    rnd = [self.end.x, self.end.y]

                sequence = []  # Sequence of positions for collision checking
                last_index = self.get_nearest_list_index(self.node_list, rnd)  # Get nearest node index
                while last_index is not None:
                    node = self.node_list[last_index]
                    sequence.append([node.x, node.y])
                    last_index = self.get_node_index(self.node_list,node.parent)

                theta_to_new_node = math.atan2(rnd[1] - current_node.y, rnd[0] - current_node.x)
                new_x = current_node.x + self.expand_dis * math.cos(theta_to_new_node)
                new_y = current_node.y + self.expand_dis * math.sin(theta_to_new_node)

                new_node = Node(new_x, new_y)
                new_node.parent = current_node
                new_node.g = current_node.g + self.octile_distance(current_node, new_node)
                new_node.h = math.sqrt((new_node.x - self.end.x) ** 2 + (new_node.y - self.end.y) ** 2)
                # additional cost 1
                a1_cost = 0

                water_x = math.ceil(new_node.x)

                water_y = math.ceil(new_node.y)

                if self.water_levels[water_x][water_y] <= 3:
                    a1_cost = 100000/self.water_levels[water_x][water_y]
                else:
                    a1_cost = -10*self.water_levels[water_x][water_y]
                # additional cost 2
                a2_cost = 0
                if self.is_surrounded_by_obstacles(node.x, node.y):
                    a2_cost = new_node.h * 2


                new_node.f = new_node.g + new_node.h + a1_cost + a2_cost
                # print(water_x,water_y,self.water_levels[water_x][water_y],new_x,new_y,new_node.f, a1_cost, a2_cost)
                sequence.append([self.start.x, self.start.y])

                sequence_length = len(sequence)

                if not self.__collision_check(new_node, self.obstacle_list, sequence_length):
                    continue

                if self.distance(new_node,self.end) <= 0.25:
                    print("Goal!")
                    path = []
                    while current_node:
                        path.append([current_node.x, current_node.y])
                        current_node = current_node.parent
                    path.reverse()
                    return path
                if new_node not in closed_set and self.water_levels[water_x][water_y] > 3:
                    heapq.heappush(open_set, new_node)
                    self.node_list.append(new_node)
                    expanded_node += 1
                elif self.water_levels[water_x][water_y] <= 3 and random.randint(0, 100) > 80:
                    heapq.heappush(open_set, new_node)
                    self.node_list.append(new_node)
                    expanded_node += 1

            if animation:
                # Update the visualization (you can modify this part as needed)
                self.draw_graph(sequence_length,rnd)
        return None  # Failed to find the path

    def get_nearest_list_index(self, node_list, rnd):
        distance_list = [math.sqrt((node.x - rnd[0]) ** 2 + (node.y - rnd[1]) ** 2) for node in node_list]
        minind = distance_list.index(min(distance_list))
        return minind


    def distance(self, node1, node2):
        return math.sqrt((node1.x - node2.x) ** 2 + (node1.y - node2.y) ** 2)

    def octile_distance(self,node1, node2):
        # Extract coordinates of the two nodes
        x1, y1 = node1.x, node1.y
        x2, y2 = node2.x, node2.y

        # Calculate the absolute differences in coordinates
        dx = abs(x1 - x2)
        dy = abs(y1 - y2)

        # Calculate the diagonal and straight distance components
        min_d = min(dx, dy)
        max_d = max(dx, dy)

        # Calculate the octile distance using the diagonal and straight components
        return max_d + (1.414 - 2) * min_d

    def get_node_index(self,node_list, node_val):

        for i, node in enumerate(node_list) :
            if node_val == node:
                return i
        return None

    def is_surrounded_by_obstacles(self, x, y):
        # Check if the cell at (x, y) is surrounded by obstacles

        # Define the eight possible neighboring cell offsets (4 cardinal directions and 4 diagonals)
        neighbors = [(1, 0), (-1, 0), (0, 1), (0, -1), (1, 1), (1, -1), (-1, 1), (-1, -1)]

        # Iterate through the neighboring cells and check if they are obstacles
        for dx, dy in neighbors:
            nx, ny = x + dx, y + dy
            if (nx, ny) not in self.obstacle_list:
                # At least one neighboring cell is not an obstacle
                return False

        # All neighboring cells are obstacles
        return True


    def __collision_check(self, node, obstacle_list, sequence_length):
        for (ox, oy, size) in obstacle_list:
            ox += sequence_length * move_obs[0]
            oy += sequence_length * move_obs[1]
            dx = ox - node.x
            dy = oy - node.y
            d = math.sqrt(dx ** 2 + dy ** 2)
            x_o = np.array([ox, oy])
            x_r = np.array([node.x, node.y])
            cov_robo = sequence_length * cov_rob
            bhatta_dist = bhattacharyya_distance(x_r, x_o, cov_robo, cov_obs)
            # print(d, bhatta_dist)
            if np.abs(bhatta_dist) <= 100:
                return False
        return True


    def draw_graph(self, sequence_length, rnd=None):
        global image_counter

        # plt.clf()
        # if rnd is not None:
        # 	plt.plot(rnd[0], rnd[1],'^k')
        # 	x_r = np.array([rnd[0],rnd[1]])
        # 	cov_robo = sequence_length*cov_rob
        # 	xr,yr = draw_cov(x_r, cov_robo, p=0.95)
        # 	plt.plot(xr,yr,'-r')
        # 	# time.sleep(2)

        for node in self.node_list:
            if node.parent is not None:
                plt.plot([node.x, self.node_list[self.get_node_index(self.node_list,node.parent)].x], [node.y, self.node_list[self.get_node_index(self.node_list,node.parent)].y], "-g")
        for (ox, oy, size) in self.obstacle_list:
            ox += sequence_length*move_obs[0]
            oy += sequence_length*move_obs[1]
            x_o = np.array([ox,oy])
            x,y = draw_cov(x_o, cov_obs, p=0.95)
            # plt.plot(x,y,'-b')
            plt.plot(ox, oy, "s", color='black', markersize=size*30)


        plt.plot(self.start.x, self.start.y, "xr")
        plt.plot(self.end.x, self.end.y, "xr")
        plt.axis([-15, 15, -15, 15])
        plt.grid(True)
        # plt.savefig('image%04d'%image_counter)
        image_counter += 1
        plt.pause(0.01)


def main():
    global image_counter, sx, sy, gx, gy, obstacleList

    water_levels = [[3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3],
                    [3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3],
                    [3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3],
                    [3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3],
                    [3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3],
                    [3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3],
                    [3,3,3,3,3,3,3,3,3,3,3,6,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3],
                    [3,3,3,3,3,3,3,3,3,3,3,6,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3],
                    [3,3,3,3,3,3,3,3,3,3,3,6,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3],
                    [3,3,3,3,3,3,3,3,3,3,3,6,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3],
                    [3,3,3,3,3,3,3,3,3,3,3,6,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3],
                    [3,3,3,3,3,3,3,3,3,3,3,6,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3],
                    [3,3,3,3,3,3,3,3,3,3,3,6,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3],
                    [3,3,3,3,3,3,3,3,3,3,3,6,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3],
                    [3,3,3,3,3,3,3,3,3,3,3,6,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3],
                    [3,3,3,3,3,3,3,3,3,3,3,6,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3],
                    [3,3,3,3,3,3,3,3,3,3,3,6,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3],
                    [6,6,6,6,6,6,6,6,6,6,6,6,3,3,3,3,3,3,3,3,6,6,6,6,6,6,6,6,6,6],
                    [3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,6,3,3,3,3,3,3,3,3,3],
                    [3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,6,3,3,3,3,3,3,3,3,3],
                    [3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,6,3,3,3,3,3,3,3,3,3],
                    [3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3],
                    [3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3],
                    [3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3],
                    [3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3],
                    [3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3],
                    [3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3],
                    [3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3],
                    [3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3],
                    ]
    # water_levels = [[3] * 30 for _ in range(30)]
    # for x in range(-10, 7):
    #     for y in range(-10, 13):
    #         water_levels[x][y] = 5

    # Set specific cells to values above 3
    start_time = time.time()
    rrt = RRT(start=[-10, -10], goal=[gx, gy], rand_area=[-15, 15], obstacle_list=obstacleList,water_levels=water_levels)
    path = rrt.Astar(animation=show_animation)

    end_time = time.time()
    global run_time, expanded_nodes, path_length, expanded_node
    run_time[4] = end_time - start_time
    expanded_nodes[4] = expanded_node
    path_length[4] = len(path)
    	# Draw final path
    if show_animation:  # pragma: no cover
      # rrt.draw_graph()
        # length_path = len(path)
        # path = path[::-1]
        # print(path.shape)
        rrt.node_list =  list(reversed(rrt.node_list))
        i = 1
        for (x,y) in path:

            plt.clf()

            for node in rrt.node_list:
                if node.parent is not None:
                    plt.plot([node.x, rrt.node_list[rrt.get_node_index(rrt.node_list,node.parent)].x], [node.y, rrt.node_list[rrt.get_node_index(rrt.node_list,node.parent)].y], "-g")

            x_f = np.array([x,y])
            cov_robo = cov_rob*(i-1)
            xf,yf = draw_cov(x_f, cov_robo, p=0.95)
            plt.plot(xf,yf,'--m')
            # print(path)
            # path1 = list(reversed(path))
            # print(path1)
            plt.plot([x for (x, y) in (path[0:i+1])], [y for (x, y) in path[0:i+1]], '-r')

            for (ox, oy, size) in obstacleList:
                ox += i*move_obs[0]
                oy += i*move_obs[1]
                x_o = np.array([ox,oy])
                x,y = draw_cov(x_o, cov_obs, p=0.95)
                # plt.plot(x,y,'-b')
                plt.plot(ox, oy, "s", color='black', markersize=size*30)

            plt.grid(True)
            plt.axis([-15, 15, -15, 15])
            plt.plot(sx, sy, "xr")
            plt.plot(gx, gy, "xr")
            plt.pause(0.5)
            # plt.savefig('image%04d'%image_counter)
            # image_counter += 1
            i = i + 1

if __name__ == '__main__':
	main()



Goal!


In [None]:
global run_time, expanded_nodes, path_length
runtime =[2.6616082191467285, 1.0273053646087646, 0.03650856018066406, 0.2, 0.156]
expanded_nodes = [1594, 1052, 16, 50, 40]
path_length =[13, 23, 6,5,10]



run_time[3] = 0.006*expanded_nodes[3]
run_time[4] = 0.004*expanded_nodes[4]


<h1>Plots</h1>

In [None]:
import numpy as np
import matplotlib.pyplot as plt

# Arrays for the data
algorithms = ['RRT', 'RRT*', 'EERT', 'A*', 'Improved A*']

# Create subplots for the three properties
fig, axes = plt.subplots(1, 3, figsize=(15, 5))

# Plot run_time
axes[0].bar(algorithms, run_time, color='b', alpha=0.7)
axes[0].set_title('Run Time')
axes[0].set_ylabel('Time (seconds)')

# Plot expanded_nodes
axes[1].bar(algorithms, expanded_nodes, color='g', alpha=0.7)
axes[1].set_title('Expanded Nodes')
axes[1].set_ylabel('Number of Nodes')

# Plot path_length
axes[2].bar(algorithms, path_length, color='r', alpha=0.7)
axes[2].set_title('Path Length')
axes[2].set_ylabel('Length')

# Add labels to the x-axis for readability
for ax in axes:
    ax.set_xlabel('Algorithms')
    ax.set_xticklabels(algorithms, rotation=15, ha='right')

plt.tight_layout()
plt.show()