<a href="https://colab.research.google.com/github/7amzaGH/NeptuNet-AUV-Intelligent-System/blob/main/navigation/navigation_full_system.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
"""
Pipeline Following Controller
Converts YOLO bounding box detections to motor commands
"""

import numpy as np
from dataclasses import dataclass

@dataclass
class ControlCommand:
    """Motor control commands (normalized -1 to 1)"""
    surge: float = 0.0      # Forward/Backward
    sway: float = 0.0       # Right/Left
    heave: float = 0.0      # Up/Down
    yaw: float = 0.0        # Rotation

class PipelineController:
    """
    Generates control commands to center pipeline in camera view
    while maintaining forward motion.
    """

    def __init__(self, frame_width=640, frame_height=480,
                 tolerance=20, camera_angle=45.0):
        self.frame_width = frame_width
        self.frame_height = frame_height
        self.tolerance = tolerance
        self.camera_angle = camera_angle

        self.target_pipeline_width = 0.3  # 30% of frame

        # Control gains - tuned experimentally
        self.kp_horizontal = 0.01
        self.kp_vertical = 0.01
        self.kp_distance = 0.5

    def compute_control(self, box, depth_data=None):
        """
        Convert bounding box to control commands.

        Args:
            box: [x1, y1, x2, y2] from YOLO detection
            depth_data: dict with 'depth' and 'target_depth' keys

        Returns:
            ControlCommand with normalized values
        """
        cmd = ControlCommand()

        if box is None or len(box) != 4:
            return cmd

        # Calculate box center and dimensions
        x_center = (box[0] + box[2]) / 2
        y_center = (box[1] + box[3]) / 2
        box_width = box[2] - box[0]

        frame_center_x = self.frame_width / 2
        frame_center_y = self.frame_height / 2

        # Position errors
        error_x = x_center - frame_center_x
        error_y = y_center - frame_center_y

        # Constant forward motion when pipeline detected
        cmd.surge = 0.5

        # Horizontal centering
        if abs(error_x) > self.tolerance:
            cmd.sway = np.clip(error_x * self.kp_horizontal, -1.0, 1.0)

        # Vertical centering (depends on camera orientation)
        if abs(error_y) > self.tolerance:
            if self.camera_angle > 60:
                # Downward camera: adjust forward speed
                speed_adj = -error_y * self.kp_vertical * 0.3
                cmd.surge += np.clip(speed_adj, -0.2, 0.2)
            else:
                # Forward camera: adjust altitude
                cmd.heave = np.clip(-error_y * self.kp_vertical, -1.0, 1.0)

        # Safety: slow down if too close
        desired_width = self.frame_width * self.target_pipeline_width
        if box_width > desired_width * 1.5:
            cmd.surge *= 0.5

        # Depth control
        if depth_data:
            depth_error = depth_data['depth'] - depth_data['target_depth']
            if abs(depth_error) > 0.2:
                cmd.heave += np.clip(depth_error * 0.5, -0.3, 0.3)

        # Yaw correction for large lateral errors
        if abs(error_x) > self.frame_width * 0.2:
            cmd.yaw = np.clip(error_x * 0.005, -0.3, 0.3)

        # Final clipping
        cmd.surge = np.clip(cmd.surge, -1.0, 1.0)
        cmd.sway = np.clip(cmd.sway, -1.0, 1.0)
        cmd.heave = np.clip(cmd.heave, -1.0, 1.0)
        cmd.yaw = np.clip(cmd.yaw, -1.0, 1.0)

        return cmd

    def get_binary_commands(self, box, depth_data=None):
        """
        Simple on/off commands for basic thruster control.

        Returns:
            Dictionary with binary direction commands
        """
        cmd = self.compute_control(box, depth_data)

        return {
            "left": 1 if cmd.sway < -0.1 else 0,
            "right": 1 if cmd.sway > 0.1 else 0,
            "up": 1 if cmd.heave > 0.1 else 0,
            "down": 1 if cmd.heave < -0.1 else 0,
            "forward": 1 if cmd.surge > 0.1 else 0,
            "backward": 1 if cmd.surge < -0.1 else 0,
        }


if __name__ == "__main__":
    controller = PipelineController(frame_width=640, frame_height=480)

    # Test with sample detection
    bbox = [200, 150, 400, 350]
    depth = {'depth': 2.3, 'target_depth': 2.0}

    cmd = controller.compute_control(bbox, depth)
    print(f"Control: surge={cmd.surge:.2f}, sway={cmd.sway:.2f}, "
          f"heave={cmd.heave:.2f}, yaw={cmd.yaw:.2f}")

    binary = controller.get_binary_commands(bbox, depth)
    print(f"Binary: {binary}")

Control: surge=0.50, sway=0.00, heave=0.15, yaw=0.00
Binary: {'left': 0, 'right': 0, 'up': 1, 'down': 0, 'forward': 1, 'backward': 0}


In [4]:
"""
Motion Stabilization with PID Control
Maintains stable orientation using IMU feedback
"""

import time
import numpy as np
#from controller import ControlCommand


class PIDController:
    """
    Standard PID controller implementation.
    """

    def __init__(self, kp=0.5, ki=0.1, kd=0.2, output_limit=1.0):
        self.kp = kp
        self.ki = ki
        self.kd = kd
        self.output_limit = output_limit

        self.integral = 0.0
        self.prev_error = 0.0
        self.prev_time = time.time()

    def update(self, error):
        """
        Calculate control output.

        Args:
            error: Difference between setpoint and current value

        Returns:
            Control signal
        """
        current_time = time.time()
        dt = current_time - self.prev_time
        if dt <= 0:
            dt = 0.01

        # PID terms
        p_term = self.kp * error

        self.integral += error * dt
        self.integral = np.clip(self.integral, -10, 10)  # anti-windup
        i_term = self.ki * self.integral

        d_term = self.kd * (error - self.prev_error) / dt

        output = p_term + i_term + d_term
        output = np.clip(output, -self.output_limit, self.output_limit)

        self.prev_error = error
        self.prev_time = current_time

        return output

    def reset(self):
        """Reset internal state."""
        self.integral = 0.0
        self.prev_error = 0.0
        self.prev_time = time.time()


class MotionController:
    """
    High-level motion control with attitude stabilization.
    Uses PID feedback from IMU to maintain level orientation.
    """

    def __init__(self):
        # PID controllers for each axis
        self.pid_roll = PIDController(kp=0.5, ki=0.1, kd=0.2)
        self.pid_pitch = PIDController(kp=0.5, ki=0.1, kd=0.2)
        self.pid_yaw = PIDController(kp=0.3, ki=0.05, kd=0.15)
        self.pid_depth = PIDController(kp=1.0, ki=0.2, kd=0.5)

        # Target orientation (level)
        self.target_roll = 0.0
        self.target_pitch = 0.0
        self.target_yaw = 0.0

    def stabilize(self, cmd, imu_data, depth_data):
        """
        Apply stabilization corrections to control commands.

        Args:
            cmd: Raw ControlCommand from pipeline controller
            imu_data: dict with 'roll', 'pitch', 'yaw' in degrees
            depth_data: dict with 'depth' and 'target_depth'

        Returns:
            Stabilized ControlCommand
        """
        stabilized = ControlCommand()

        # Attitude stabilization
        roll_error = self.target_roll - imu_data['roll']
        roll_correction = self.pid_roll.update(roll_error)

        pitch_error = self.target_pitch - imu_data['pitch']
        pitch_correction = self.pid_pitch.update(pitch_error)

        stabilized.sway = cmd.sway - roll_correction * 0.3
        stabilized.surge = cmd.surge - pitch_correction * 0.3

        # Heading control
        yaw_error = self._normalize_angle(self.target_yaw - imu_data['yaw'])
        yaw_correction = self.pid_yaw.update(yaw_error)
        stabilized.yaw = cmd.yaw + yaw_correction

        # Depth control
        depth_error = depth_data['target_depth'] - depth_data['depth']
        depth_correction = self.pid_depth.update(depth_error)
        stabilized.heave = cmd.heave + depth_correction

        # Clip outputs
        stabilized.surge = np.clip(stabilized.surge, -1.0, 1.0)
        stabilized.sway = np.clip(stabilized.sway, -1.0, 1.0)
        stabilized.heave = np.clip(stabilized.heave, -1.0, 1.0)
        stabilized.yaw = np.clip(stabilized.yaw, -1.0, 1.0)

        return stabilized

    def set_target_heading(self, yaw):
        """Set desired heading angle in degrees."""
        self.target_yaw = yaw

    def _normalize_angle(self, angle):
        """Normalize angle to [-180, 180] range."""
        while angle > 180:
            angle -= 360
        while angle < -180:
            angle += 360
        return angle


if __name__ == "__main__":
    #from controller import PipelineController

    controller = PipelineController()
    motion = MotionController()

    # Test with tilted AUV
    bbox = [200, 150, 400, 350]
    imu = {'roll': 5.0, 'pitch': -3.0, 'yaw': 90.0}
    depth = {'depth': 2.3, 'target_depth': 2.0}

    raw_cmd = controller.compute_control(bbox, depth)
    print(f"Raw: surge={raw_cmd.surge:.2f}, sway={raw_cmd.sway:.2f}")

    stable_cmd = motion.stabilize(raw_cmd, imu, depth)
    print(f"Stabilized: surge={stable_cmd.surge:.2f}, sway={stable_cmd.sway:.2f}")

Raw: surge=0.50, sway=0.00
Stabilized: surge=0.20, sway=0.30


In [5]:
"""
Path Planning and Search Behavior
Handles pipeline loss and search patterns
"""

import numpy as np
import time
#from controller import ControlCommand


class PathPlanner:
    """
    State machine for navigation behavior.
    Manages transitions between SEARCHING, FOLLOWING, and LOST states.
    """

    def __init__(self, lost_threshold=30):
        self.state = "SEARCHING"
        self.lost_counter = 0
        self.lost_threshold = lost_threshold

        self.search_start_time = time.time()

    def update_state(self, detection_found):
        """
        Update state based on detection status.

        Args:
            detection_found: True if pipeline detected in current frame

        Returns:
            Current state string
        """
        if detection_found:
            self.state = "FOLLOWING"
            self.lost_counter = 0
        else:
            self.lost_counter += 1

            if self.lost_counter > self.lost_threshold:
                self.state = "LOST"
            elif self.state == "FOLLOWING":
                self.state = "SEARCHING"

        return self.state

    def get_search_command(self):
        """
        Generate search pattern when pipeline is not visible.
        Uses sweeping motion to relocate pipeline.

        Returns:
            ControlCommand for search behavior
        """
        cmd = ControlCommand()

        # Slow forward motion with oscillating rotation
        cmd.surge = 0.2

        search_time = time.time() - self.search_start_time
        cmd.yaw = 0.3 * np.sin(search_time * 0.5)

        return cmd

    def compute_waypoint_heading(self, current_pos, waypoint):
        """
        Calculate required heading to reach waypoint.

        Args:
            current_pos: (x, y) in meters
            waypoint: (x, y) target position

        Returns:
            Heading in degrees [0, 360)
        """
        dx = waypoint[0] - current_pos[0]
        dy = waypoint[1] - current_pos[1]

        target_heading = np.degrees(np.arctan2(dy, dx))

        if target_heading < 0:
            target_heading += 360

        return target_heading

    def get_distance_to_waypoint(self, current_pos, waypoint):
        """Calculate Euclidean distance to waypoint."""
        dx = waypoint[0] - current_pos[0]
        dy = waypoint[1] - current_pos[1]
        return np.sqrt(dx**2 + dy**2)

    def is_waypoint_reached(self, current_pos, waypoint, tolerance=1.0):
        """
        Check if within acceptance radius of waypoint.

        Args:
            current_pos: Current (x, y) position
            waypoint: Target (x, y) position
            tolerance: Acceptance radius in meters

        Returns:
            True if waypoint reached
        """
        distance = self.get_distance_to_waypoint(current_pos, waypoint)
        return distance < tolerance


if __name__ == "__main__":
    #from controller import PipelineController

    planner = PathPlanner(lost_threshold=30)
    controller = PipelineController()

    print("Testing state transitions:")

    detections = [
        (1, True, "Pipeline visible"),
        (2, True, "Following"),
        (3, False, "Lost detection"),
        (4, False, "Searching..."),
        (5, False, "Still searching..."),
    ]

    for frame, detected, note in detections:
        state = planner.update_state(detected)
        print(f"Frame {frame}: {note} -> State: {state}")

        if state == "LOST":
            search_cmd = planner.get_search_command()
            print(f"  Search command: surge={search_cmd.surge:.2f}, yaw={search_cmd.yaw:.2f}")

    print("\nTesting waypoint navigation:")
    current = (0.0, 0.0)
    waypoint = (10.0, 5.0)

    heading = planner.compute_waypoint_heading(current, waypoint)
    distance = planner.get_distance_to_waypoint(current, waypoint)
    reached = planner.is_waypoint_reached(current, waypoint)

    print(f"Current: {current}, Target: {waypoint}")
    print(f"Heading: {heading:.1f}¬∞, Distance: {distance:.2f}m, Reached: {reached}")

Testing state transitions:
Frame 1: Pipeline visible -> State: FOLLOWING
Frame 2: Following -> State: FOLLOWING
Frame 3: Lost detection -> State: SEARCHING
Frame 4: Searching... -> State: SEARCHING
Frame 5: Still searching... -> State: SEARCHING

Testing waypoint navigation:
Current: (0.0, 0.0), Target: (10.0, 5.0)
Heading: 26.6¬∞, Distance: 11.18m, Reached: False


In [6]:
"""
Complete AUV Navigation System Integration
Shows how to use all three modules together
"""

import time
#from controller import PipelineController
#from motion_control import MotionController
#from path_planning import PathPlanner


def main():
    """
    Main navigation loop
    Integrates: YOLO detection ‚Üí Controller ‚Üí Motion Control ‚Üí Thrusters
    """

    print("ü§ñ AUV Navigation System Starting...")
    print("="*60)

    # === INITIALIZE ALL MODULES ===
    controller = PipelineController(
        frame_width=640,
        frame_height=480,
        camera_angle=45.0
    )

    motion = MotionController()
    planner = PathPlanner(lost_threshold=30)

    print("‚úÖ All modules initialized\n")

    # === SIMULATION: Main Control Loop ===
    print("Starting navigation simulation...\n")

    # Simulate different scenarios over time
    scenarios = [
        # (time, bbox, imu, depth, description)
        (1, [220, 190, 420, 290], {'roll': 0, 'pitch': 0, 'yaw': 90},
         {'depth': 2.0, 'target_depth': 2.0}, "Pipeline centered"),

        (2, [320, 190, 520, 290], {'roll': 2, 'pitch': -1, 'yaw': 90},
         {'depth': 2.1, 'target_depth': 2.0}, "Pipeline drifting right"),

        (3, [400, 200, 600, 300], {'roll': 3, 'pitch': 1, 'yaw': 92},
         {'depth': 2.3, 'target_depth': 2.0}, "Far right, too deep"),

        (4, None, {'roll': 5, 'pitch': 2, 'yaw': 95},
         {'depth': 2.5, 'target_depth': 2.0}, "PIPELINE LOST!"),

        (5, None, {'roll': 4, 'pitch': 1, 'yaw': 98},
         {'depth': 2.4, 'target_depth': 2.0}, "Still searching..."),
    ]

    for t, bbox, imu, depth, description in scenarios:
        print(f"‚è±Ô∏è  Time: {t}s - {description}")
        print("-" * 60)

        # === STEP 1: Update Navigation State ===
        detection_found = bbox is not None
        state = planner.update_state(detection_found)
        print(f"üì° State: {state}")

        # === STEP 2: Get Control Command ===
        if state == "FOLLOWING":
            # Normal pipeline following
            raw_cmd = controller.compute_control(bbox, depth)
            print(f"üéÆ Raw Control: surge={raw_cmd.surge:+.2f}, "
                  f"sway={raw_cmd.sway:+.2f}, heave={raw_cmd.heave:+.2f}")

        elif state == "LOST":
            # Execute search pattern
            raw_cmd = planner.get_search_command()
            print(f"üîç Search Pattern: surge={raw_cmd.surge:+.2f}, "
                  f"yaw={raw_cmd.yaw:+.2f}")
        else:
            # Searching state (just lost)
            raw_cmd = planner.get_search_command()
            print(f"üîç Initiating search...")

        # === STEP 3: Apply Motion Stabilization ===
        stabilized_cmd = motion.stabilize(raw_cmd, imu, depth)
        print(f"‚ö° Stabilized: surge={stabilized_cmd.surge:+.2f}, "
              f"sway={stabilized_cmd.sway:+.2f}, "
              f"heave={stabilized_cmd.heave:+.2f}, "
              f"yaw={stabilized_cmd.yaw:+.2f}")

        # === STEP 4: Send to Thrusters (Your Hardware Interface) ===
        # send_to_thrusters(stabilized_cmd)
        print(f"üöÄ Commands sent to thrusters\n")

        time.sleep(1)

    print("="*60)
    print("‚úÖ Simulation Complete")


def simple_example():
    """
    Simplified example for quick testing
    """
    print("\nüìù SIMPLE EXAMPLE\n")

    # Initialize
    controller = PipelineController()

    # YOLO detected pipeline at these coordinates
    bbox = [300, 200, 500, 300]  # [x1, y1, x2, y2]

    # Depth sensor reading
    depth = {'depth': 2.5, 'target_depth': 2.0}

    # Get control command
    cmd = controller.compute_control(bbox, depth)

    print(f"Pipeline detected at: {bbox}")
    print(f"Depth: {depth['depth']}m (target: {depth['target_depth']}m)")
    print(f"\nControl Commands:")
    print(f"  Forward/Back: {cmd.surge:+.2f}")
    print(f"  Left/Right:   {cmd.sway:+.2f}")
    print(f"  Up/Down:      {cmd.heave:+.2f}")
    print(f"  Rotation:     {cmd.yaw:+.2f}")

    # Or get simple binary commands
    binary = controller.get_binary_commands(bbox, depth)
    print(f"\nBinary Commands: {binary}")


# === HOW TO USE IN YOUR REAL SYSTEM ===
def real_world_integration():
    """
    Template for integrating with actual hardware
    """

    # Initialize navigation system
    controller = PipelineController(
        frame_width=640,
        frame_height=480,
        camera_angle=45.0  # Adjust to your camera mount
    )
    motion = MotionController()
    planner = PathPlanner()

    # Main loop (runs continuously)
    while True:
        # === GET SENSOR DATA ===
        # frame = camera.read()                    # Get camera frame
        # bbox = yolo_model.detect(frame)          # Run YOLO detection
        # imu = imu_sensor.read()                  # Read IMU
        # depth = depth_sensor.read()              # Read depth

        # === NAVIGATION LOGIC ===
        # state = planner.update_state(bbox is not None)

        # if state == "FOLLOWING":
        #     cmd = controller.compute_control(bbox, depth)
        # else:
        #     cmd = planner.get_search_command()

        # === STABILIZATION ===
        # stable_cmd = motion.stabilize(cmd, imu, depth)

        # === SEND TO THRUSTERS ===
        # thruster_interface.send(stable_cmd)

        # time.sleep(0.05)  # 20Hz control loop

        pass  # Remove this and add your code


if __name__ == "__main__":
    # Run main simulation
    main()

    # Or run simple example
    simple_example()

    # See real_world_integration() for actual implementation template

ü§ñ AUV Navigation System Starting...
‚úÖ All modules initialized

Starting navigation simulation...

‚è±Ô∏è  Time: 1s - Pipeline centered
------------------------------------------------------------
üì° State: FOLLOWING
üéÆ Raw Control: surge=+0.50, sway=+0.00, heave=+0.00
‚ö° Stabilized: surge=+0.50, sway=+0.00, heave=+0.00, yaw=-1.00
üöÄ Commands sent to thrusters

‚è±Ô∏è  Time: 2s - Pipeline drifting right
------------------------------------------------------------
üì° State: FOLLOWING
üéÆ Raw Control: surge=+0.50, sway=+1.00, heave=+0.00
‚ö° Stabilized: surge=+0.26, sway=+1.00, heave=-0.17, yaw=-1.00
üöÄ Commands sent to thrusters

‚è±Ô∏è  Time: 3s - Far right, too deep
------------------------------------------------------------
üì° State: FOLLOWING
üéÆ Raw Control: surge=+0.50, sway=+1.00, heave=+0.15
‚ö° Stabilized: surge=+0.77, sway=+1.00, heave=-0.33, yaw=-0.70
üöÄ Commands sent to thrusters

‚è±Ô∏è  Time: 4s - PIPELINE LOST!
---------------------------------------