In [None]:
import tensorflow as tf
import numpy as np
from collections import deque

def create_drone_dqn(action_size=6):
    # 3D grid input
    grid_input = tf.keras.Input(shape=(20, 20, 20, 1), name='grid_input')
    x1 = tf.keras.layers.Conv3D(16, 3, activation='relu', padding='same')(grid_input)
    x1 = tf.keras.layers.MaxPooling3D(2)(x1)
    x1 = tf.keras.layers.Conv3D(32, 3, activation='relu', padding='same')(x1)
    x1 = tf.keras.layers.MaxPooling3D(2)(x1)
    x1 = tf.keras.layers.Flatten()(x1)

    # Pose + goal input
    pose_input = tf.keras.Input(shape=(6,), name='pose_input')
    x2 = tf.keras.layers.Dense(64, activation='relu')(pose_input)
    x2 = tf.keras.layers.Dense(64, activation='relu')(x2)

    # Combine
    x = tf.keras.layers.Concatenate()([x1, x2])
    x = tf.keras.layers.Dense(256, activation='relu')(x)
    output = tf.keras.layers.Dense(action_size, activation='linear')(x)

    model = tf.keras.Model(inputs=[grid_input, pose_input], outputs=output)
    model.compile(optimizer=tf.keras.optimizers.Adam(1e-4), loss='mse')
    return model


In [None]:
ACTION_SIZE = 6  # Updated
EPISODES = 1000
EPSILON = 1.0
EPSILON_DECAY = 0.995
EPSILON_MIN = 0.05
GAMMA = 0.99
BATCH_SIZE = 32
MEMORY_SIZE = 10000
TARGET_UPDATE_FREQ = 10

policy_net = create_drone_dqn(ACTION_SIZE)
target_net = create_drone_dqn(ACTION_SIZE)
target_net.set_weights(policy_net.get_weights())

dummy_grid = np.zeros((1, 20, 20, 20, 1), dtype=np.float32)
dummy_pose = np.zeros((1, 6), dtype=np.float32)

policy_net.predict([dummy_grid, dummy_pose])
target_net.predict([dummy_grid, dummy_pose])
memory = deque(maxlen=MEMORY_SIZE)


In [None]:
def get_local_map_3d(global_map, robot_pose, size=20):
    half = size // 2
    x, y, z = robot_pose
    local_map = np.full((size, size, size), 1, dtype=np.int8)  # pad with 1s (obstacles)

    for i in range(-half, half):
        for j in range(-half, half):
            for k in range(-half, half):
                gi, gj, gk = x + i, y + j, z + k
                li, lj, lk = i + half, j + half, k + half  # local map indices

                if (0 <= gi < global_map.shape[0] and
                    0 <= gj < global_map.shape[1] and
                    0 <= gk < global_map.shape[2]):
                    local_map[li, lj, lk] = global_map[gi, gj, gk]
                # else: keep as 1 (padding/obstacle)
    
    return local_map
import random

def safe_move_3d(robot_pose, action_map, action, shape):
    """
    shape: (rows, cols, depth) of the 3D map
    """
    max_x = shape
    max_y = shape
    max_z = shape
    flag = True

    while flag:
        dx, dy, dz = action_map[action]
        new_x = robot_pose[0] + dx
        new_y = robot_pose[1] + dy
        new_z = robot_pose[2] + dz

        if (0 <= new_x < max_x) and (0 <= new_y < max_y) and (0 <= new_z < max_z):
            flag = False
            return [new_x, new_y, new_z]
        else:
            action = random.randint(0, len(action_map) - 1)


In [None]:
import numpy as np

def compute_reward_3d(robot_pose, goal_pose, action, collided, reached_goal,
                      previous_distance, current_distance,
                      explored_cells, visited_set, occ_map, recent_pos):
    reward = 0.0
    rx, ry, rz = robot_pose
    gx, gy, gz = goal_pose
    dx, dy, dz = action

    # Penalize repeated positions (loops)
    if recent_pos.count(tuple(robot_pose)) >= 1:
        reward -= 20 * recent_pos.count(tuple(robot_pose))

    # Axis alignment and directed motion
    if rx == gx:
        reward += 200
        y_diff = gy - ry
        if (y_diff > 0 and dy == 1) or (y_diff < 0 and dy == -1):
            reward += 200
        else:
            reward -= 100

        z_diff = gz - rz
        if (z_diff > 0 and dz == 1) or (z_diff < 0 and dz == -1):
            reward += 200
        else:
            reward -= 100

    if ry == gy:
        reward += 200
        x_diff = gx - rx
        if (x_diff > 0 and dx == 1) or (x_diff < 0 and dx == -1):
            reward += 200
        else:
            reward -= 100

        z_diff = gz - rz
        if (z_diff > 0 and dz == 1) or (z_diff < 0 and dz == -1):
            reward += 200
        else:
            reward -= 100

    if rz == gz:
        reward += 200
        x_diff = gx - rx
        if (x_diff > 0 and dx == 1) or (x_diff < 0 and dx == -1):
            reward += 200
        else:
            reward -= 100

        y_diff = gy - ry
        if (y_diff > 0 and dy == 1) or (y_diff < 0 and dy == -1):
            reward += 200
        else:
            reward -= 100

    # Terminal conditions
    if collided:
        return reward - 1000.0

    if reached_goal:
        return reward + 1000.0

    # Boundary penalties and center reward (assuming size 60x60x60)
    for i, val in enumerate([rx, ry, rz]):
        if val < 3 or val > 56:
            reward -= 20
        else:
            reward += 10

    # Distance shaping
    if current_distance < previous_distance:
        reward += 20.0
    else:
        reward -= 20.0

    # Time penalty
    reward -= 5

    # Closer-to-goal axis-wise reward
    reward += (60 - abs(gx - rx)) + (60 - abs(gy - ry)) + (60 - abs(gz - rz))

    # Exploration bonus
    reward += 0.1 * explored_cells

    # Lookahead for obstacle in 3D
    is_clear = True
    for step in range(1, 11):
        nx = rx + dx * step
        ny = ry + dy * step
        nz = rz + dz * step

        if (0 <= nx < occ_map.shape[0] and
            0 <= ny < occ_map.shape[1] and
            0 <= nz < occ_map.shape[2]):
            if occ_map[nx, ny, nz] >= 1:
                reward -= 10 * (10 - step)
                is_clear = False
                break
        else:
            reward -= 5  # out of bounds
            break

    if is_clear:
        reward += 5

    return reward


In [None]:
def env_step_3d(state, action, goal, direction, huristic, explored, recent_pos):
    """
    Simulates one environment step in a 3D grid.

    Parameters:
        state          : global 3D occupancy map (np.ndarray of shape [X, Y, Z])
        action         : new robot position [x, y, z]
        goal           : goal position [x, y, z]
        direction      : movement direction (dx, dy, dz)
        huristic       : previous distance to goal
        explored       : set of visited positions
        recent_pos     : deque of recent positions

    Returns:
        next_state     : local 3D map (1, 20, 20, 20, 1)
        reward         : reward for the step
        done           : whether the episode should terminate
    """
    # --- Extract local 3D map ---
    next_map = get_local_map_3d(state, action, size=20)
    next_state = next_map.astype(np.float32).reshape(1, 20, 20, 20, 1)

    # --- Collision check ---
    x, y, z = action
    collided = state[x, y, z] >= 1

    # --- Goal check ---
    reached_goal = (x == goal[0]) and (y == goal[1]) and (z == goal[2])

    # --- Distance update ---
    previous_distance = huristic
    current_distance = np.linalg.norm(np.array(goal) - np.array(action))

    # --- Compute reward ---
    reward = compute_reward_3d(
        robot_pose=action,
        goal_pose=goal,
        action=direction,
        collided=collided,
        reached_goal=reached_goal,
        previous_distance=previous_distance,
        current_distance=current_distance,
        explored_cells=10,
        visited_set=explored,
        occ_map=state,
        recent_pos=recent_pos
    )

    # --- Terminate if goal reached or collided ---
    done = reached_goal or collided

    return next_state, reward, done


In [None]:
from collections import deque

MAX_STEPS_EP = 3600

global_map = np.zeros((60, 60, 60), dtype=np.int8)
global_map[10:15, 10:50,0:40] = 1
global_map[30:35, 5:55 ,0:20] = 1
global_map[41:42, 41:42 , 0:55] = 1

robot_goal_map = {
    0: ((5, 5, 5), (8, 55, 45)),
    1: ((5, 5, 5), (50, 3, 10)),
    2: ((5, 5, 5), (25, 25, 25)),
    3: ((8, 8, 8), (38, 42, 12)),
    4: ((5, 55, 5), (50, 20, 30)),
    5: ((25, 25, 25), (5, 5, 5)),
    6: ((8, 30, 10), (45, 10, 20)),
    7: ((55, 55, 55), (20, 10, 10)),
    8: ((50, 30, 10), (5, 30, 45)),
    9: ((3, 40, 50), (40, 3, 3)),
}


action_map = {
    0: (-1, 0, 0),  # up x
    1: (1, 0, 0),   # down x
    2: (0, -1, 0),  # left y
    3: (0, 1, 0),   # right y
    4: (0, 0, -1),  # down z
    5: (0, 0, 1),   # up z
}


for episode in range(EPISODES):
    # --- Reset environment ---
    #print(robot_pose)
    pair_id = episode // 100
    robot_pose, goal_pose = robot_goal_map[pair_id]
    print("pair id: " + str(pair_id) + " robo: " + str(robot_pose) + " goal: " + str(goal_pose))
    robot_pose = list(robot_pose)
    goal_pose = list(goal_pose)
    recent_pos = deque(maxlen=20)
    local_map = get_local_map_3d(global_map, robot_pose, size=20)
    state_grid = local_map.astype(np.float32).reshape(1, 20, 20, 20, 1)
    state_pose = np.array([[robot_pose[0], robot_pose[1], robot_pose[2],goal_pose[0], goal_pose[1], goal_pose[2]]], dtype=np.float32)
    done = False
    total_reward = 0
    #print("rbpose",robot_pose)
    step_count = 0
    while not done and step_count<=MAX_STEPS_EP:
        step_count += 1
        # --- Epsilon-greedy action selection ---
        # if np.random.rand() <= EPSILON:
        #     action = random.randint(0, ACTION_SIZE - 1)
        # else:
        #     q_vals = policy_net.predict([state_grid[np.newaxis, ...], state_pose[np.newaxis, ...]], verbose=0)
        #     action = np.argmax(q_vals[0])

        # --- Take action in environment ---
        #print("current", robot_pose)
        huristic = np.sqrt(np.square(goal_pose[0] - robot_pose[0]) + np.square(goal_pose[1] - robot_pose[1]))
        q_values = policy_net.predict([state_grid, state_pose], verbose=0)
        action = np.argmax(q_values[0])
        move = action_map[action]
       # print(move)
       # robot_pose[0] = robot_pose[0] + move[0]
       # robot_pose[1] = robot_pose[1] + move[1]
        robot_pose = safe_move_3d(robot_pose,action_map,action,60)
        recent_pos.append(tuple(robot_pose))
       # next_grid, reward, done = env_step(global_map, robot_pose,goal_pose,move,huristic,explored,recent_pos)  # Placeholder
       # next_pose = np.array([[robot_pose[0], robot_pose[1], goal_pose[0], goal_pose[1]]], dtype=np.float32)
        next_grid, reward, done = env_step_3d(global_map, robot_pose, goal_pose, move,huristic,10, recent_pos)
        next_pose = np.array([[robot_pose[0], robot_pose[1], robot_pose[2],goal_pose[0], goal_pose[1], goal_pose[2]]], dtype=np.float32)
        #print(robot_pose,reward)
        memory.append((state_grid, state_pose, action, reward, next_grid, next_pose, done))

        state_grid, state_pose = next_grid, next_pose
        total_reward += reward

        # --- Train if enough samples ---
        if len(memory) >= BATCH_SIZE:
            minibatch = random.sample(memory, BATCH_SIZE)

            grids = np.array([x[0] for x in minibatch])
            poses = np.array([x[1] for x in minibatch])
            actions = np.array([x[2] for x in minibatch])
            rewards = np.array([x[3] for x in minibatch])
            next_grids = np.array([x[4] for x in minibatch])
            next_poses = np.array([x[5] for x in minibatch])
            dones = np.array([x[6] for x in minibatch])

          #  print("grids shape:", grids.shape)   # Should be (batch_size, 20, 20, 1)
           # print("poses shape:", poses.shape)   # Should be (batch_size, 4)

            q_targets = policy_net.predict([grids.reshape(-1, 20, 20, 20, 1),poses.reshape(-1, 6)], verbose=0)
            q_next = target_net.predict([next_grids.reshape(-1, 20, 20, 20, 1),next_poses.reshape(-1, 6)], verbose=0)
            #policy_net.fit([grids.reshape(-1, 20, 20, 20, 1),poses.reshape(-1, 6)], q_targets, verbose=0)


           # q_targets = policy_net.predict([grids.reshape(-1,20,20,1), poses.reshape(-1,4)], verbose=0)
            #q_next = target_net.predict([next_grids.reshape(-1,20,20,1), next_poses.reshape(-1,4)], verbose=0)

            for i in range(BATCH_SIZE):
                target = rewards[i]
                if not dones[i]:
                    target += GAMMA * np.amax(q_next[i])
                q_targets[i][actions[i]] = target

            # --- Train policy network ---
            policy_net.fit([grids.reshape(-1, 20, 20, 20, 1),poses.reshape(-1, 6)], q_targets, verbose=0)

    # --- Update target network ---
    if episode % TARGET_UPDATE_FREQ == 0:
        target_net.set_weights(policy_net.get_weights())

    # --- Epsilon decay ---
    if EPSILON > EPSILON_MIN:
        EPSILON *= EPSILON_DECAY
    if episode % 50 == 0:
        policy_net.save(f"/storage/projects2/e19-4yp-g28-peraswarm-local-nav/checkpoint_ep_drone{episode}.h5")
        print(f" Saved checkpoint at episode {episode}")
    print(f"Episode {episode+1}/{EPISODES} - Total reward: {total_reward:.2f} - Epsilon: {EPSILON:.2f}")
    
policy_net.save("path_planing_model_3d_final.h5")
