# Learning: Dynamic Obstacles and Advanced Planning

This comprehensive notebook guides you through building a complete system for handling **dynamic obstacles** in a grid map environment.

## What You'll Learn

1. Creating grid maps with **static and dynamic obstacles**
2. Using **filters** to estimate object positions (Kalman Filter, Particle Filter)
3. Creating **costmaps with relative velocity** for dynamic obstacle avoidance
4. Building a **replanning planner** that adapts to dynamic environments
5. Researching and implementing a **controller adapted for dynamic planning**

## Instructions

- Read each section carefully
- Look for `# TODO:` comments - these indicate where you need to write code
- Fill in the `...` placeholders
- Run cells as you complete them to test your work

## Prerequisites

- Understanding of classes (see [Understanding Classes and OOP](../oop_fundamentals/understanding_classes_and_oop.ipynb))
- Understanding of planners (see [Building a Custom Planner](../planning/learning_build_planner.ipynb))
- Understanding of costmaps (see [Building a Custom Costmap](../costmaps/learning_build_costmap.ipynb))
- Understanding of filters (see [Sensor Algorithms](sensor_algorithms.ipynb))

In [None]:
import sys

sys.path.insert(0, '../../../src')

import matplotlib.pyplot as plt
import numpy as np

from simple_autonomous_car import (
    BasePlanner,
    Car,
    CarState,
    GoalPlanner,
    GridCostmap,
    GridMap,
    KalmanFilter,
    ParticleFilter,
)

## Step 1: Creating Grid Maps with Static and Dynamic Obstacles

First, let's extend the GridMap to support dynamic obstacles.

**TODO**: Create a class that manages both static and dynamic obstacles!

In [None]:
class DynamicGridMap:
    """
    Grid map with both static and dynamic obstacles.

    TODO: Fill in the implementation!
    """

    def __init__(self, width: float, height: float, resolution: float = 0.5):
        """
        Initialize dynamic grid map.

        TODO: Store width, height, resolution
        TODO: Initialize lists for static and dynamic obstacles
        """
        self.width = width
        self.height = height
        self.resolution = resolution

        # TODO: Initialize obstacle lists
        self.static_obstacles = []  # List of [x, y] positions
        self.dynamic_obstacles = []  # List of dicts: {"position": [x, y], "velocity": [vx, vy], "size": radius}

    def add_static_obstacle(self, position: np.ndarray, size: float = 1.0):
        """
        Add a static obstacle.

        TODO: Append to static_obstacles list
        """
        # TODO: Add obstacle with position and size
        self.static_obstacles.append({"position": position.copy(), "size": size})

    def add_dynamic_obstacle(self, position: np.ndarray, velocity: np.ndarray, size: float = 1.0):
        """
        Add a dynamic obstacle.

        TODO: Append to dynamic_obstacles list
        """
        # TODO: Add obstacle with position, velocity, and size
        self.dynamic_obstacles.append({
            "position": position.copy(),
            "velocity": velocity.copy(),
            "size": size
        })

    def update_dynamic_obstacles(self, dt: float):
        """
        Update positions of dynamic obstacles.

        TODO: For each dynamic obstacle, update position += velocity * dt
        TODO: Handle boundary conditions (bounce or wrap around)
        """
        # TODO: Update each dynamic obstacle's position
        for obstacle in self.dynamic_obstacles:
            obstacle["position"] += obstacle["velocity"] * dt

            # TODO: Handle boundaries (bounce back)
            x, y = obstacle["position"]
            vx, vy = obstacle["velocity"]

            # Bounce off walls
            if x < -self.width/2 or x > self.width/2:
                obstacle["velocity"][0] = -vx  # Reverse x velocity
                obstacle["position"][0] = np.clip(x, -self.width/2, self.width/2)
            if y < -self.height/2 or y > self.height/2:
                obstacle["velocity"][1] = -vy  # Reverse y velocity
                obstacle["position"][1] = np.clip(y, -self.height/2, self.height/2)

    def get_all_obstacles(self, time: float = 0.0) -> np.ndarray:
        """
        Get all obstacle positions at a given time.

        TODO: Combine static and dynamic obstacles
        TODO: For dynamic obstacles, predict position at time
        """
        obstacles = []

        # TODO: Add static obstacles
        for obs in self.static_obstacles:
            obstacles.append(obs["position"])

        # TODO: Add dynamic obstacles (predict position at time)
        for obs in self.dynamic_obstacles:
            predicted_pos = obs["position"] + obs["velocity"] * time
            obstacles.append(predicted_pos)

        return np.array(obstacles) if obstacles else np.array([]).reshape(0, 2)

# Test the dynamic grid map
dynamic_map = DynamicGridMap(width=50.0, height=50.0, resolution=0.5)

# Add static obstacles
dynamic_map.add_static_obstacle(np.array([10.0, 10.0]), size=2.0)
dynamic_map.add_static_obstacle(np.array([-10.0, -10.0]), size=1.5)

# Add dynamic obstacles
dynamic_map.add_dynamic_obstacle(
    position=np.array([0.0, 0.0]),
    velocity=np.array([2.0, 1.0]),  # Moving at 2 m/s in x, 1 m/s in y
    size=1.5
)

print(f"Static obstacles: {len(dynamic_map.static_obstacles)}")
print(f"Dynamic obstacles: {len(dynamic_map.dynamic_obstacles)}")
print(f"All obstacles at t=0: {len(dynamic_map.get_all_obstacles(0.0))}")
print(f"All obstacles at t=5: {len(dynamic_map.get_all_obstacles(5.0))}")

## Step 2: Using Filters to Estimate Object Positions

Filters help us estimate the true position and velocity of dynamic obstacles from noisy measurements.

**TODO**: Implement object tracking using filters!

In [None]:
class ObjectTracker:
    """
    Tracks a dynamic object using a filter.

    TODO: Fill in the implementation!
    """

    def __init__(self, initial_position: np.ndarray, filter_type: str = "kalman"):
        """
        Initialize object tracker.

        TODO: Create filter (KalmanFilter or ParticleFilter)
        """
        self.filter_type = filter_type

        # TODO: Initialize filter based on type
        if filter_type == "kalman":
            # Kalman filter for [x, y, vx, vy]
            initial_state = np.array([initial_position[0], initial_position[1], 0.0, 0.0])
            initial_cov = np.eye(4) * 1.0
            process_noise = np.eye(4) * 0.1
            measurement_noise = np.eye(2) * 0.5  # Position measurement noise

            self.filter = KalmanFilter(
                initial_state=initial_state,
                initial_covariance=initial_cov,
                process_noise=process_noise,
                measurement_noise=measurement_noise,
            )
        else:  # particle
            # Particle filter for [x, y]
            initial_cov = np.eye(2) * 1.0
            self.filter = ParticleFilter(
                initial_state=initial_position,
                initial_covariance=initial_cov,
                num_particles=100,
            )

    def predict(self, dt: float):
        """
        Predict object state forward.

        TODO: Call filter.predict()
        """
        # TODO: Predict forward
        self.filter.predict(dt)

    def update(self, measurement: np.ndarray):
        """
        Update with new measurement.

        TODO: Call filter.update()
        """
        # TODO: Update with measurement
        if self.filter_type == "kalman":
            # Extract position from measurement
            self.filter.update(measurement[:2])
        else:
            self.filter.update(measurement)

    def get_position(self) -> np.ndarray:
        """Get estimated position."""
        state = self.filter.get_state()
        return state[:2]  # First two elements are position

    def get_velocity(self) -> np.ndarray:
        """Get estimated velocity."""
        if self.filter_type == "kalman":
            state = self.filter.get_state()
            return state[2:4]  # vx, vy
        else:
            # Particle filter doesn't track velocity directly
            # Could estimate from position history
            return np.array([0.0, 0.0])

# Test object tracking
tracker = ObjectTracker(np.array([0.0, 0.0]), filter_type="kalman")

# Simulate tracking a moving object
true_positions = []
estimated_positions = []

true_pos = np.array([0.0, 0.0])
true_vel = np.array([1.0, 0.5])

for step in range(10):
    # True position (unknown to us)
    true_pos += true_vel * 0.1
    true_positions.append(true_pos.copy())

    # Noisy measurement
    measurement = true_pos + np.random.normal(0, 0.3, 2)

    # Update tracker
    tracker.update(measurement)
    tracker.predict(0.1)

    estimated_positions.append(tracker.get_position())

true_positions = np.array(true_positions)
estimated_positions = np.array(estimated_positions)

print(f"Final true position: {true_positions[-1]}")
print(f"Final estimated position: {estimated_positions[-1]}")
print(f"Estimation error: {np.linalg.norm(true_positions[-1] - estimated_positions[-1]):.3f}m")

## Step 3: Costmap with Relative Velocity Obstacles

For dynamic obstacles, we need to inflate based on relative velocity (time to collision).

**TODO**: Create a costmap that accounts for relative velocity!

In [None]:
class DynamicCostmap(GridCostmap):
    """
    Costmap that accounts for relative velocity of obstacles.

    TODO: Fill in the implementation!
    """

    def __init__(self, *args, **kwargs):
        """
        Initialize dynamic costmap.

        TODO: Call super().__init__()
        TODO: Store time_horizon for collision prediction
        """
        super().__init__(*args, **kwargs)
        self.time_horizon = kwargs.get("time_horizon", 2.0)  # Look ahead 2 seconds
        self.object_trackers = {}  # Dictionary of {object_id: ObjectTracker}

    def add_tracked_object(self, object_id: str, tracker: ObjectTracker):
        """
        Add a tracked object.

        TODO: Store tracker
        """
        self.object_trackers[object_id] = tracker

    def update_with_dynamic_obstacles(self, car_state: CarState, dt: float):
        """
        Update costmap accounting for relative velocity.

        TODO:
        1. Clear costmap
        2. For each tracked object:
           - Get estimated position and velocity
           - Calculate relative velocity (object_vel - car_vel)
           - Predict future positions over time_horizon
           - Inflate costmap around predicted positions
        """
        # TODO: Clear costmap
        self.costmap.fill(0.0)

        np.array([car_state.velocity * np.cos(car_state.heading),
                           car_state.velocity * np.sin(car_state.heading)])

        # TODO: Process each tracked object
        for obj_id, tracker in self.object_trackers.items():
            tracker.get_position()
            tracker.get_velocity()

            # TODO: Calculate relative velocity

            # TODO: Predict future positions
            for t in np.arange(0, self.time_horizon, 0.1):
                future_pos = ...  # obj_pos + rel_vel * t

                # TODO: Mark as occupied in costmap
                # Use self._world_to_grid() and set costmap value
                row, col = self._world_to_grid(future_pos, car_state)
                if 0 <= row < self.height_pixels and 0 <= col < self.width_pixels:
                    # Higher cost for closer time (more urgent)
                    urgency = 1.0 - (t / self.time_horizon)
                    self.costmap[row, col] = max(self.costmap[row, col], urgency)

        # TODO: Inflate obstacles
        from simple_autonomous_car.costmap.inflation import inflate_obstacles
        self.costmap = inflate_obstacles(self.costmap, self.inflation_radius, self.resolution)

# Test dynamic costmap
dynamic_costmap = DynamicCostmap(
    width=40.0,
    height=40.0,
    resolution=0.5,
    time_horizon=2.0,
)

# Add a tracked object
tracker = ObjectTracker(np.array([10.0, 10.0]), filter_type="kalman")
tracker.filter.state[2:4] = np.array([2.0, 1.0])  # Set velocity
dynamic_costmap.add_tracked_object("obstacle_1", tracker)

car_state = CarState(x=0.0, y=0.0, heading=0.0, velocity=5.0)
dynamic_costmap.update_with_dynamic_obstacles(car_state, dt=0.1)

print("Costmap updated with dynamic obstacles")
print(f"Max cost: {np.max(dynamic_costmap.costmap):.3f}")

## Step 4: Replanning Planner for Dynamic Environments

A planner that replans at regular intervals to adapt to moving obstacles.

**TODO**: Implement a replanning planner!

In [None]:
class ReplanningPlanner(BasePlanner):
    """
    Planner that replans at regular intervals for dynamic environments.

    TODO: Fill in the implementation!
    """

    def __init__(self, base_planner: BasePlanner, replan_interval: float = 1.0, name: str = "replanning_planner"):
        """
        Initialize replanning planner.

        TODO: Store base_planner and replan_interval
        TODO: Initialize last_replan_time and current_plan
        """
        super().__init__(name=name)
        self.base_planner = base_planner
        self.replan_interval = replan_interval
        self.last_replan_time = -float('inf')
        self.current_plan = np.array([]).reshape(0, 2)

    def plan(
        self,
        car_state: CarState,
        perception_data: Optional[dict] = None,
        costmap: Optional["BaseCostmap"] = None,
        goal: Optional[np.ndarray] = None,
        current_time: float = 0.0,
    ) -> np.ndarray:
        """
        Plan path, replanning if needed.

        TODO:
        1. Check if enough time has passed since last replan
        2. If yes, call base_planner.plan() to get new plan
        3. Update last_replan_time and current_plan
        4. Return current_plan
        """
        # TODO: Check if replanning is needed
        time_since_replan = current_time - self.last_replan_time

        if time_since_replan >= self.replan_interval:
            # TODO: Replan using base planner
            new_plan = self.base_planner.plan(
                car_state=car_state,
                perception_data=perception_data,
                costmap=costmap,
                goal=goal,
            )

            # TODO: Update plan and time
            self.current_plan = new_plan
            self.last_replan_time = current_time

        return self.current_plan

# Test replanning planner
grid_map = GridMap.create_random_map(width=30.0, height=30.0, num_obstacles=5, seed=42)
base_planner = GoalPlanner(grid_map=grid_map, resolution=0.5)
replanning_planner = ReplanningPlanner(base_planner, replan_interval=1.0)

car_state = CarState(x=-10.0, y=-10.0, heading=0.0)
goal = np.array([10.0, 10.0])

# First plan
plan1 = replanning_planner.plan(car_state, goal=goal, current_time=0.0)
print(f"Plan at t=0.0: {len(plan1)} waypoints")

# Should not replan immediately
plan2 = replanning_planner.plan(car_state, goal=goal, current_time=0.5)
print(f"Plan at t=0.5 (no replan): {len(plan2)} waypoints")

# Should replan after interval
plan3 = replanning_planner.plan(car_state, goal=goal, current_time=1.5)
print(f"Plan at t=1.5 (replan): {len(plan3)} waypoints")

## Step 5: Controller Adapted for Dynamic Planning

A controller that can handle frequent replanning and adapts to changing plans.

**TODO**: Research and implement a controller that works well with dynamic replanning!

In [None]:
class DynamicController(BaseController):
    """
    Controller adapted for dynamic environments with frequent replanning.

    This controller:
    - Uses shorter lookahead when plan changes frequently
    - Adapts velocity based on plan stability
    - Handles plan discontinuities smoothly

    TODO: Fill in the implementation!
    """

    def __init__(
        self,
        base_lookahead: float = 8.0,
        min_lookahead: float = 3.0,
        target_velocity: float = 8.0,
        name: str = "dynamic_controller",
    ):
        """
        Initialize dynamic controller.

        TODO: Store parameters and initialize plan history
        """
        super().__init__(name=name)
        self.base_lookahead = base_lookahead
        self.min_lookahead = min_lookahead
        self.target_velocity = target_velocity
        self.previous_plan = None

    def _plan_stability(self, current_plan: np.ndarray) -> float:
        """
        Calculate plan stability (0.0 = completely different, 1.0 = identical).

        TODO: Compare current_plan with previous_plan
        """
        if self.previous_plan is None or len(current_plan) == 0:
            return 0.5  # Neutral stability

        # TODO: Compare plans (e.g., distance between first few waypoints)
        n_compare = min(5, len(current_plan), len(self.previous_plan))
        if n_compare == 0:
            return 0.0

        distances = np.linalg.norm(
            current_plan[:n_compare] - self.previous_plan[:n_compare],
            axis=1
        )

        # Stability is inverse of average distance (normalized)
        avg_distance = np.mean(distances)
        stability = np.exp(-avg_distance / 5.0)  # Decay with distance

        return stability

    def compute_control(
        self,
        car_state: CarState,
        perception_data: Optional[Dict[str, PerceptionPoints]] = None,
        costmap: Optional["BaseCostmap"] = None,
        plan: Optional[np.ndarray] = None,
        dt: float = 0.1,
    ) -> Dict[str, float]:
        """
        Compute control adapted for dynamic planning.

        TODO:
        1. Calculate plan stability
        2. Adapt lookahead distance based on stability (shorter if unstable)
        3. Adapt velocity based on stability (slower if unstable)
        4. Use Pure Pursuit-like logic with adaptive parameters
        5. Store current plan as previous_plan
        """
        if plan is None or len(plan) == 0:
            return {"acceleration": 0.0, "steering_rate": 0.0}

        # TODO: Calculate plan stability
        self._plan_stability(plan)

        # TODO: Adapt lookahead based on stability
        lookahead = ...  # base_lookahead * stability + min_lookahead * (1 - stability)

        # TODO: Adapt velocity based on stability
        target_vel = ...  # target_velocity * stability

        # TODO: Find lookahead point on plan
        car_pos = car_state.position()
        distances = np.linalg.norm(plan - car_pos, axis=1)

        # Find point at lookahead distance
        lookahead_idx = None
        for i, dist in enumerate(distances):
            if dist >= lookahead:
                lookahead_idx = i
                break

        if lookahead_idx is None:
            lookahead_idx = len(plan) - 1

        lookahead_point = plan[lookahead_idx]

        # TODO: Calculate steering (similar to Pure Pursuit)
        direction = lookahead_point - car_pos
        desired_heading = np.arctan2(direction[1], direction[0])
        heading_error = desired_heading - car_state.heading
        heading_error = np.arctan2(np.sin(heading_error), np.cos(heading_error))

        steering_rate = 2.0 * heading_error  # Proportional control

        # TODO: Calculate velocity control
        velocity_error = target_vel - car_state.velocity
        acceleration = 0.5 * velocity_error

        # TODO: Store plan for next iteration
        self.previous_plan = plan.copy()

        return {
            "acceleration": acceleration,
            "steering_rate": steering_rate
        }

# Test dynamic controller
controller = DynamicController(base_lookahead=10.0, min_lookahead=3.0, target_velocity=8.0)

car_state = CarState(x=0.0, y=0.0, heading=0.0, velocity=5.0)
plan1 = np.array([[5, 0], [10, 0], [15, 0]])
plan2 = np.array([[5, 2], [10, 3], [15, 4]])  # Different plan

control1 = controller.compute_control(car_state, plan=plan1)
print(f"Control 1 (first plan): {control1}")

control2 = controller.compute_control(car_state, plan=plan2)
print(f"Control 2 (changed plan): {control2}")
print("Notice how controller adapts to plan changes!")

## Step 6: Complete Dynamic Obstacle System

Now let's put it all together!

In [None]:
def simulate_dynamic_environment():
    """Simulate navigation in dynamic environment."""
    # Create dynamic map
    dynamic_map = DynamicGridMap(width=50.0, height=50.0)

    # Add obstacles
    dynamic_map.add_static_obstacle(np.array([15.0, 15.0]), size=2.0)
    dynamic_map.add_dynamic_obstacle(
        position=np.array([0.0, 10.0]),
        velocity=np.array([1.0, 0.0]),
        size=1.5
    )

    # Create trackers for dynamic obstacles
    trackers = {}
    for i, obs in enumerate(dynamic_map.dynamic_obstacles):
        tracker = ObjectTracker(obs["position"], filter_type="kalman")
        tracker.filter.state[2:4] = obs["velocity"]  # Set velocity
        trackers[f"obs_{i}"] = tracker

    # Create costmap
    costmap = DynamicCostmap(width=40.0, height=40.0, resolution=0.5, time_horizon=2.0)
    for obj_id, tracker in trackers.items():
        costmap.add_tracked_object(obj_id, tracker)

    # Create planner
    grid_map = GridMap(width=50.0, height=50.0, obstacles=dynamic_map.get_all_obstacles(0.0))
    base_planner = GoalPlanner(grid_map=grid_map, resolution=0.5)
    replanning_planner = ReplanningPlanner(base_planner, replan_interval=0.5)

    # Create controller
    controller = DynamicController(base_lookahead=8.0, min_lookahead=3.0)

    # Create car
    car = Car(initial_state=CarState(x=-20.0, y=-20.0, heading=0.0, velocity=5.0))
    goal = np.array([20.0, 20.0])

    # Simulate
    dt = 0.1
    trajectory = [car.state.position().copy()]

    for step in range(100):
        t = step * dt

        # Update dynamic obstacles
        dynamic_map.update_dynamic_obstacles(dt)

        # Update trackers (simulate measurements with noise)
        for obj_id, tracker in trackers.items():
            # Get true position from dynamic_map
            obs_idx = int(obj_id.split("_")[1])
            true_pos = dynamic_map.dynamic_obstacles[obs_idx]["position"]

            # Noisy measurement
            measurement = true_pos + np.random.normal(0, 0.2, 2)
            tracker.update(measurement)
            tracker.predict(dt)

        # Update costmap
        costmap.update_with_dynamic_obstacles(car.state, dt)

        # Replan if needed
        # Update grid_map with current obstacle positions
        current_obstacles = dynamic_map.get_all_obstacles(t)
        grid_map = GridMap(width=50.0, height=50.0, obstacles=current_obstacles)
        base_planner.grid_map = grid_map

        plan = replanning_planner.plan(car.state, costmap=costmap, goal=goal, current_time=t)

        # Compute control
        control = controller.compute_control(car.state, costmap=costmap, plan=plan, dt=dt)

        # Update car
        car.update(dt, acceleration=control["acceleration"], steering_rate=control["steering_rate"])
        trajectory.append(car.state.position().copy())

        # Check goal
        if np.linalg.norm(car.state.position() - goal) < 2.0:
            print(f"Goal reached at step {step}!")
            break

    # Visualize
    fig, ax = plt.subplots(figsize=(12, 12))

    # Plot static obstacles
    for obs in dynamic_map.static_obstacles:
        circle = plt.Circle(obs["position"], obs["size"], color='red', alpha=0.5)
        ax.add_patch(circle)

    # Plot dynamic obstacles (final positions)
    for obs in dynamic_map.dynamic_obstacles:
        circle = plt.Circle(obs["position"], obs["size"], color='orange', alpha=0.5)
        ax.add_patch(circle)

    # Plot trajectory
    trajectory = np.array(trajectory)
    ax.plot(trajectory[:, 0], trajectory[:, 1], 'g-', linewidth=2, label='Trajectory')

    # Plot goal
    ax.plot(goal[0], goal[1], 'ro', markersize=12, label='Goal', markeredgecolor='black', markeredgewidth=2)

    ax.set_xlim(-25, 25)
    ax.set_ylim(-25, 25)
    ax.set_aspect('equal')
    ax.grid(True, alpha=0.3)
    ax.legend()
    ax.set_title('Dynamic Obstacle Navigation')
    plt.show()

print("Running dynamic obstacle simulation...")
simulate_dynamic_environment()

## Summary

You've learned how to:

1. **Create dynamic grid maps** with static and moving obstacles
2. **Use filters** (Kalman, Particle) to track object positions and velocities
3. **Build dynamic costmaps** that account for relative velocity and time-to-collision
4. **Implement replanning planners** that adapt to changing environments
5. **Design controllers** that handle frequent plan changes smoothly

### Key Concepts

- **Static obstacles**: Fixed position obstacles
- **Dynamic obstacles**: Moving obstacles with velocity
- **Filtering**: Estimating true state from noisy measurements
- **Relative velocity**: Object velocity relative to car (for collision prediction)
- **Replanning**: Updating plan at regular intervals
- **Plan stability**: How much the plan changes between updates

### Next Steps

- Experiment with different filter types (Kalman vs Particle)
- Tune replanning frequency vs computation cost
- Implement more sophisticated collision prediction
- Add obstacle velocity estimation from perception data
- Research MPC (Model Predictive Control) for dynamic environments