In [10]:
%%bash --bg
# [Cell 1] - Start ROS Core
source /opt/ros/noetic/setup.bash
roscore


In [11]:
%%bash --bg
# [Cell 2] - Start Gazebo Simulation
source /opt/ros/noetic/setup.bash
export TURTLEBOT3_MODEL=burger
roslaunch turtlebot3_gazebo turtlebot3_world.launch


In [12]:
# [Revised Cell 3] - Autonomous Navigation with Integrated Obstacle Avoidance
import rospy
import numpy as np
from geometry_msgs.msg import Twist
from sensor_msgs.msg import LaserScan
from nav_msgs.msg import OccupancyGrid, Odometry

class AutonomousNavigator:
    def __init__(self):
        # ROS Initialization
        if not rospy.core.is_initialized():
            rospy.init_node('autonomous_navigator', anonymous=True)
        
        # Publishers/Subscribers
        self.cmd_vel_pub = rospy.Publisher('/cmd_vel', Twist, queue_size=10)
        rospy.Subscriber('/scan', LaserScan, self.scan_callback)
        rospy.Subscriber('/odom', Odometry, self.odom_callback)
        
        # Configuration
        self.rate = rospy.Rate(10)
        self.safe_distance = 0.5  # meters
        self.current_pose = None
        self.latest_scan = None
        
        # Path Planning Parameters
        self.grid_map = np.zeros((10, 10))  # Will be updated with real map data
        self.path = []
        self.current_waypoint = 0
        
        # Initialize after 2 seconds to ensure subscribers are ready
        rospy.Timer(rospy.Duration(2), self.initialize_navigation, oneshot=True)

    def initialize_navigation(self, event):
        """Start navigation after initial setup"""
        self.replan_path()
        self.follow_path()

    def scan_callback(self, data):
        """Process laser scan data for obstacle detection"""
        self.latest_scan = data.ranges
        self.check_obstacles()

    def odom_callback(self, data):
        """Store current robot position"""
        self.current_pose = data.pose.pose

    def check_obstacles(self):
        """Check for obstacles in the immediate path"""
        if self.latest_scan is None:
            return
            
        # Analyze front 90-degree sector
        front_scan = np.array(self.latest_scan[-45:] + self.latest_scan[:45])
        obstacle_distance = np.nanmin(front_scan)
        
        if obstacle_distance < self.safe_distance:
            self.avoid_obstacle()

    def avoid_obstacle(self):
        """Emergency obstacle avoidance maneuver"""
        stop_cmd = Twist()
        stop_cmd.linear.x = 0.0
        self.cmd_vel_pub.publish(stop_cmd)
        
        # Turn until path is clear
        turn_cmd = Twist()
        turn_cmd.angular.z = 0.5 if np.random.rand() > 0.5 else -0.5
        self.cmd_vel_pub.publish(turn_cmd)
        rospy.sleep(1)
        
        # Replan path after obstacle detection
        self.replan_path()

    def replan_path(self):
        """Dynamic path replanning with obstacle avoidance"""
        if self.current_pose is None:
            return
            
        # Convert current position to grid coordinates
        start_x = int(self.current_pose.position.x / 0.2)
        start_y = int(self.current_pose.position.y / 0.2)
        goal = (9, 9)  # Fixed goal for demonstration
        
        # Replan path with updated grid map
        self.path = self.astar(self.grid_map, (start_x, start_y), goal)
        self.current_waypoint = 0
        
        if not self.path:
            rospy.logwarn("No valid path found!")
            return
            
        rospy.loginfo("New path computed with %d waypoints", len(self.path))

    def follow_path(self):
        """Follow the computed path with continuous obstacle checking"""
        while not rospy.is_shutdown() and self.current_waypoint < len(self.path):
            target = self.path[self.current_waypoint]
            self.move_to_waypoint(target)
            self.rate.sleep()

    def move_to_waypoint(self, target):
        """Adaptive movement controller with obstacle awareness"""
        cmd = Twist()
        cmd.linear.x = 0.2  # Base forward speed
        
        # Simple proportional controller for steering
        if self.current_pose is not None:
            dx = target[0] - self.current_pose.position.x
            dy = target[1] - self.current_pose.position.y
            angle_to_target = np.arctan2(dy, dx)
            cmd.angular.z = 0.5 * angle_to_target  # P-controller
            
        self.cmd_vel_pub.publish(cmd)
        
        # Check if waypoint is reached
        distance = np.sqrt(dx**2 + dy**2)
        if distance < 0.1:  # 10cm tolerance
            self.current_waypoint += 1

    @staticmethod
    def astar(grid, start, goal):
        """Improved A* implementation with 8-direction movement"""
        # [Keep the original A* implementation but add obstacle checking]
        # ... (same as original A* code but using current grid map) ...
        return path  # Return computed path

# Initialize and run
if __name__ == '__main__':
    try:
        navigator = AutonomousNavigator()
        rospy.spin()
    except rospy.ROSInterruptException:
        pass

In [9]:
# ---------------------------------------------
# Cell 3: Autonomous Navigation with A* and Obstacle Avoidance
# ---------------------------------------------
import rospy
from geometry_msgs.msg import Twist
from nav_msgs.msg import OccupancyGrid
import math
from queue import PriorityQueue
import time

# --- A* Algorithm Implementation ---
def astar(grid, start, goal):
    """
    A* path planning on a 2D grid.
    
    grid: 2D list with 0 for free space and 1 for obstacles.
    start: tuple (x, y) starting coordinate.
    goal: tuple (x, y) goal coordinate.
    
    Returns a list of coordinates from start to goal.
    """
    rows, cols = len(grid), len(grid[0])
    
    # Helper: check if a cell is valid
    def valid(cell):
        x, y = cell
        return 0 <= x < rows and 0 <= y < cols and grid[x][y] == 0
    
    # Heuristic: Euclidean distance
    def heuristic(cell, goal):
        return math.sqrt((cell[0] - goal[0])**2 + (cell[1] - goal[1])**2)
    
    # Movements: 4-connected grid (up, down, left, right)
    moves = [(-1, 0), (1, 0), (0, -1), (0, 1)]
    
    # Priority queue: (cost, cell, path)
    queue = PriorityQueue()
    queue.put((0 + heuristic(start, goal), 0, start, [start]))
    visited = set()
    
    while not queue.empty():
        est_total_cost, cost, current, path = queue.get()
        if current in visited:
            continue
        visited.add(current)
        
        if current == goal:
            return path
        
        for move in moves:
            next_cell = (current[0] + move[0], current[1] + move[1])
            if valid(next_cell) and next_cell not in visited:
                new_cost = cost + 1  # assume uniform cost
                est_total = new_cost + heuristic(next_cell, goal)
                new_path = path + [next_cell]
                queue.put((est_total, new_cost, next_cell, new_path))
    return []  # no path found

# --- Example Occupancy Grid Map ---
# 0 means free space, 1 means obstacle.
# For simplicity, we use a 10x10 grid.
grid_map = [
    [0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
    [0, 1, 1, 1, 0, 0, 0, 1, 1, 0],
    [0, 1, 0, 0, 0, 1, 0, 0, 1, 0],
    [0, 1, 0, 1, 0, 1, 0, 1, 1, 0],
    [0, 0, 0, 1, 0, 0, 0, 0, 0, 0],
    [0, 1, 0, 0, 0, 1, 1, 1, 0, 0],
    [0, 1, 1, 1, 0, 0, 0, 1, 0, 0],
    [0, 0, 0, 1, 1, 1, 0, 1, 0, 0],
    [0, 1, 0, 0, 0, 0, 0, 0, 0, 0],
    [0, 1, 1, 1, 1, 1, 1, 1, 1, 0]
]

# Define start and goal points on the grid (row, col)
start = (0, 0)
goal  = (9, 9)

# Compute the path using A*
path = astar(grid_map, start, goal)
print("Computed Path:", path)

# --- ROS Node to Follow the Path ---
class AutonomousNavigator:
    def __init__(self):
        rospy.init_node('autonomous_navigator', anonymous=True)
        self.cmd_vel_pub = rospy.Publisher('/cmd_vel', Twist, queue_size=10)
        self.rate = rospy.Rate(10)
        
        # Assume each grid cell corresponds to 0.2 meters in the Gazebo world.
        self.scale = 0.2
        
        # Convert grid path to world coordinates (simple conversion)
        self.waypoints = [(x * self.scale, y * self.scale) for x, y in path]
        print("Waypoints in world coordinates:", self.waypoints)
    
    def move_to_waypoint(self, target):
        """
        Simple controller: Move forward until close to target.
        This is a naive approach for illustration.
        """
        tol = 0.1  # tolerance in meters
        cmd = Twist()
        
        # In a real scenario, you would use odometry to know your current position.
        # For this example, we will simulate the process with a loop.
        # Here, we assume that calling this function sends the command and then waits.
        # In practice, you'd replace this with a proper feedback controller.
        
        # Set a constant forward speed for demonstration
        cmd.linear.x = 0.1
        cmd.angular.z = 0.0
        
        # Publish for a fixed duration to simulate movement
        move_duration = 3  # seconds (adjust as needed)
        start_time = time.time()
        while time.time() - start_time < move_duration and not rospy.is_shutdown():
            self.cmd_vel_pub.publish(cmd)
            self.rate.sleep()
    
    def follow_path(self):
        """
        Iterate through the list of waypoints and command the robot to move.
        This is a naive sequential follower.
        """
        for waypoint in self.waypoints:
            print("Moving towards waypoint:", waypoint)
            self.move_to_waypoint(waypoint)
            time.sleep(1)
            
        # After reaching the final waypoint, stop the robot.
        stop_cmd = Twist()
        stop_cmd.linear.x = 0.0
        stop_cmd.angular.z = 0.0
        self.cmd_vel_pub.publish(stop_cmd)
        print("Goal reached. Robot stopped.")
    
    def run(self):
        self.follow_path()

# Launch the autonomous navigator
navigator = AutonomousNavigator()
navigator.run()


Computed Path: [(0, 0), (0, 1), (0, 2), (0, 3), (0, 4), (0, 5), (0, 6), (0, 7), (0, 8), (0, 9), (1, 9), (2, 9), (3, 9), (4, 9), (5, 9), (6, 9), (7, 9), (8, 9), (9, 9)]
Waypoints in world coordinates: [(0.0, 0.0), (0.0, 0.2), (0.0, 0.4), (0.0, 0.6000000000000001), (0.0, 0.8), (0.0, 1.0), (0.0, 1.2000000000000002), (0.0, 1.4000000000000001), (0.0, 1.6), (0.0, 1.8), (0.2, 1.8), (0.4, 1.8), (0.6000000000000001, 1.8), (0.8, 1.8), (1.0, 1.8), (1.2000000000000002, 1.8), (1.4000000000000001, 1.8), (1.6, 1.8), (1.8, 1.8)]
Moving towards waypoint: (0.0, 0.0)
Moving towards waypoint: (0.0, 0.2)
Moving towards waypoint: (0.0, 0.4)
Moving towards waypoint: (0.0, 0.6000000000000001)
Moving towards waypoint: (0.0, 0.8)
Moving towards waypoint: (0.0, 1.0)
Moving towards waypoint: (0.0, 1.2000000000000002)
Moving towards waypoint: (0.0, 1.4000000000000001)


ROSInterruptException: ROS shutdown request

In [13]:
def astar(grid, start, goal):
    """
    grid: 2D NumPy array (0=free, 1=obstacle)
    start, goal: (x, y) tuple indices in the grid.
    """
    open_set = [Node(start[0], start[1], 0.0, None)]
    closed_set = set()
    
    while open_set:
        open_set.sort()  # sort nodes by f-cost
        current = open_set.pop(0)
        
        if (current.x, current.y) == goal:
            # Reconstruct path by following the parent pointers.
            path = []
            while current is not None:
                path.append((current.x, current.y))
                current = current.parent
            return path[::-1]  # Reverse so that it is from start to goal.
        
        closed_set.add((current.x, current.y))
        
        # Consider neighbors for 8-connected grid.
        for dx, dy, cost in [(-1, 0, 1.0), (1, 0, 1.0), (0, -1, 1.0), (0, 1, 1.0),
                             (-1, -1, 1.4), (1, -1, 1.4), (-1, 1, 1.4), (1, 1, 1.4)]:
            nx = current.x + dx
            ny = current.y + dy
            # Check grid boundaries.
            if nx < 0 or ny < 0 or nx >= grid.shape[0] or ny >= grid.shape[1]:
                continue
            # Skip obstacles.
            if grid[nx, ny] != 0:
                continue
            if (nx, ny) in closed_set:
                continue
            
            neighbor = Node(nx, ny, current.cost + cost, current)
            neighbor.f = neighbor.cost + heuristic((nx, ny), goal)
            
            # Skip if a better path for this neighbor exists.
            skip = False
            for node in open_set:
                if (node.x, node.y) == (nx, ny) and node.f <= neighbor.f:
                    skip = True
                    break
            if skip:
                continue
            open_set.append(neighbor)
    
    return None  # If no path is found.


In [14]:
class SimpleAStarNavigator:
    def __init__(self):
        rospy.init_node('simple_a_star_navigator')
        
        # Parameters: can be adjusted via ROS parameters.
        self.map_topic = rospy.get_param('~map_topic', '/map')
        self.cmd_pub = rospy.Publisher('/cmd_vel', Twist, queue_size=10)
        self.path_pub = rospy.Publisher('/a_star_path', Marker, queue_size=1)
        
        self.map_data = None
        rospy.Subscriber(self.map_topic, OccupancyGrid, self.map_callback)
        
        # Fixed start and goal cells (grid indices). In practice, these would be dynamic.
        self.start_cell = (10, 10)
        self.goal_cell = (50, 50)
        
        rospy.sleep(2)  # Allow time for the map callback to run.
        if self.map_data is None:
            rospy.logerr("Map data is not available!")
            return
        
        grid = self.process_map(self.map_data)
        rospy.loginfo("Running A* on a grid of shape %s", grid.shape)
        path = astar(grid, self.start_cell, self.goal_cell)
        
        if path is None:
            rospy.logerr("No path found!")
        else:
            rospy.loginfo("Path found with %d nodes", len(path))
            self.publish_path(path)
            # For this simple example, we directly drive along the path.
            self.drive_along_path(path)
        
        rospy.spin()


In [15]:
def map_callback(self, msg):
    self.map_data = msg

def process_map(self, msg):
    """Convert OccupancyGrid message into a 2D numpy array (free:0, occupied:1)."""
    width = msg.info.width
    height = msg.info.height
    data = np.array(msg.data).reshape(height, width)  # row-major order
    grid = np.zeros((height, width), dtype=np.int8)
    grid[data >= 50] = 1  # Treat occupancy values >= 50 as obstacles.
    return grid


In [16]:
def publish_path(self, path):
        """Publish the computed path as a visualization marker for RViz."""
        marker = Marker()
        marker.header.frame_id = "map"
        marker.header.stamp = rospy.Time.now()
        marker.ns = "a_star"
        marker.id = 0
        marker.type = Marker.LINE_STRIP  # This creates a line connecting points.
        marker.action = Marker.ADD
        marker.scale.x = 0.05  # Line thickness
        marker.color.a = 1.0
        marker.color.r = 1.0
        
        # Add points along the path.
        for cell in path:
            pt = Point()
            # Convert cell indices to world coordinates.
            resolution = 0.05  # Adjust according to your map's resolution.
            pt.x = cell[1] * resolution  # x coordinate (column)
            pt.y = cell[0] * resolution  # y coordinate (row)
            pt.z = 0.0
            marker.points.append(pt)
        
        self.path_pub.publish(marker)


In [17]:
def drive_along_path(self, path):
        """
        For each waypoint in the path, send a simple forward command.
        Note: In a realistic application, you would implement a PID controller or 
        another feedback mechanism to precisely track waypoints.
        """
        rate = rospy.Rate(10)
        for cell in path:
            twist = Twist()
            twist.linear.x = 0.1  # Fixed forward velocity.
            twist.angular.z = 0.0  # For simplicity, no rotation is controlled here.
            self.cmd_pub.publish(twist)
            rate.sleep()
        # Stop the robot after the path is complete.
        self.cmd_pub.publish(Twist())


In [18]:
if __name__ == '__main__':
    try:
        SimpleAStarNavigator()
    except rospy.ROSInterruptException:
        pass


ROSException: rospy.init_node() has already been called with different arguments: ('autonomous_navigator', ['/home/saif/Desktop/Project/.venv/lib/python3.8/site-packages/ipykernel_launcher.py', '--f=/run/user/1000/jupyter/runtime/kernel-v3adb035e13b6b4f68123f78640598fa20ad2dfb4f.json'], True, None, False, False)