In [1]:
!pip install pybullet

Collecting pybullet
  Downloading pybullet-3.2.6.tar.gz (80.5 MB)
     ---------------------------------------- 0.0/80.5 MB ? eta -:--:--
     ---------------------------------------- 0.0/80.5 MB ? eta -:--:--
     --------------------------------------- 0.0/80.5 MB 330.3 kB/s eta 0:04:04
     --------------------------------------- 0.1/80.5 MB 762.6 kB/s eta 0:01:46
     ---------------------------------------- 0.2/80.5 MB 1.3 MB/s eta 0:01:02
     ---------------------------------------- 0.4/80.5 MB 2.3 MB/s eta 0:00:35
     ---------------------------------------- 0.6/80.5 MB 3.0 MB/s eta 0:00:27
     ---------------------------------------- 0.8/80.5 MB 3.5 MB/s eta 0:00:23
      --------------------------------------- 1.5/80.5 MB 5.2 MB/s eta 0:00:16
     - -------------------------------------- 2.2/80.5 MB 7.4 MB/s eta 0:00:11
     - -------------------------------------- 2.3/80.5 MB 7.3 MB/s eta 0:00:11
     - -------------------------------------- 2.4/80.5 MB 6.4 MB/s eta 0:00:1

In [3]:
!pip install gym

Collecting gym
  Downloading gym-0.26.2.tar.gz (721 kB)
     ---------------------------------------- 0.0/721.7 kB ? eta -:--:--
     ---------------------------------------- 0.0/721.7 kB ? eta -:--:--
     -- ---------------------------------- 41.0/721.7 kB 960.0 kB/s eta 0:00:01
     ---- ---------------------------------- 81.9/721.7 kB 1.1 MB/s eta 0:00:01
     ----------- -------------------------- 215.0/721.7 kB 1.9 MB/s eta 0:00:01
     ------------------------ ------------- 460.8/721.7 kB 3.2 MB/s eta 0:00:01
     ------------------------------------ - 696.3/721.7 kB 4.0 MB/s eta 0:00:01
     -------------------------------------- 721.7/721.7 kB 3.8 MB/s eta 0:00:00
  Installing build dependencies: started
  Installing build dependencies: finished with status 'done'
  Getting requirements to build wheel: started
  Getting requirements to build wheel: finished with status 'done'
  Preparing metadata (pyproject.toml): started
  Preparing metadata (pyproject.toml): finished with st

In [5]:
!pip install tensorflow

Collecting tensorflow
  Downloading tensorflow-2.16.1-cp311-cp311-win_amd64.whl.metadata (3.5 kB)
Collecting tensorflow-intel==2.16.1 (from tensorflow)
  Downloading tensorflow_intel-2.16.1-cp311-cp311-win_amd64.whl.metadata (5.0 kB)
Collecting absl-py>=1.0.0 (from tensorflow-intel==2.16.1->tensorflow)
  Downloading absl_py-2.1.0-py3-none-any.whl.metadata (2.3 kB)
Collecting astunparse>=1.6.0 (from tensorflow-intel==2.16.1->tensorflow)
  Downloading astunparse-1.6.3-py2.py3-none-any.whl.metadata (4.4 kB)
Collecting flatbuffers>=23.5.26 (from tensorflow-intel==2.16.1->tensorflow)
  Downloading flatbuffers-24.3.25-py2.py3-none-any.whl.metadata (850 bytes)
Collecting gast!=0.5.0,!=0.5.1,!=0.5.2,>=0.2.1 (from tensorflow-intel==2.16.1->tensorflow)
  Downloading gast-0.5.4-py3-none-any.whl.metadata (1.3 kB)
Collecting google-pasta>=0.1.1 (from tensorflow-intel==2.16.1->tensorflow)
  Downloading google_pasta-0.2.0-py3-none-any.whl.metadata (814 bytes)
Collecting h5py>=3.10.0 (from tensorflow-

In [6]:
import gym
import pybullet as p
import pybullet_data
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers
from collections import deque
import random
import matplotlib.pyplot as plt

In [8]:
# Actor (정책 네트워크) 정의
def build_actor(state_shape, action_space):
    model = tf.keras.models.Sequential([
        layers.Input(shape=state_shape),
        layers.Dense(24, activation='relu'),
        layers.Dense(24, activation='relu'),
        layers.Dense(action_space, activation='softmax')
    ])
    return model

# Critic (가치 네트워크) 정의
def build_critic(state_shape):
    model = tf.keras.models.Sequential([
        layers.Input(shape=state_shape),
        layers.Dense(24, activation='relu'),
        layers.Dense(24, activation='relu'),
        layers.Dense(1, activation='linear')
    ])
    return model

# 경험 재생 버퍼
class Memory:
    def __init__(self):
        self.states = deque(maxlen=memory_size)
        self.actions = deque(maxlen=memory_size)
        self.rewards = deque(maxlen=memory_size)
        self.next_states = deque(maxlen=memory_size)
        self.dones = deque(maxlen=memory_size)
        self.log_probs = deque(maxlen=memory_size)

    def store(self, state, action, reward, next_state, done, log_prob):
        self.states.append(state)
        self.actions.append(action)
        self.rewards.append(reward)
        self.next_states.append(next_state)
        self.dones.append(done)
        self.log_probs.append(log_prob)

    def clear(self):
        self.states.clear()
        self.actions.clear()
        self.rewards.clear()
        self.next_states.clear()
        self.dones.clear()
        self.log_probs.clear()

# 어드밴티지 및 리턴 계산
def compute_advantages(rewards, values, next_values, dones, gamma, lambda_gae):
    advantages = []
    returns = []
    gae = 0
    for i in reversed(range(len(rewards))):
        delta = rewards[i] + gamma * next_values[i] * (1 - dones[i]) - values[i]
        gae = delta + gamma * lambda_gae * gae
        advantages.insert(0, gae)
        returns.insert(0, gae + values[i])
    return np.array(advantages), np.array(returns)

# 행동 선택
def choose_action(state, actor):
    state = state.reshape([1, *state.shape])
    action_probs = actor.predict(state).flatten()
    action = np.random.choice(len(action_probs), p=action_probs)
    log_prob = np.log(action_probs[action])
    return action, log_prob

# 학습 과정
def train(memory, actor, critic, actor_optimizer, critic_optimizer, batch_size, epochs, gamma, lambda_gae, epsilon):
    states = np.array(memory.states)
    actions = np.array(memory.actions)
    rewards = np.array(memory.rewards)
    next_states = np.array(memory.next_states)
    dones = np.array(memory.dones)
    old_log_probs = np.array(memory.log_probs)

    values = critic.predict(states)
    next_values = critic.predict(next_states)
    advantages, returns = compute_advantages(rewards, values, next_values, dones, gamma, lambda_gae)

    for epoch in range(epochs):
        indices = np.arange(len(states))
        np.random.shuffle(indices)
        for i in range(0, len(states), batch_size):
            idx = indices[i:i + batch_size]
            batch_states = states[idx]
            batch_actions = actions[idx]
            batch_advantages = advantages[idx]
            batch_returns = returns[idx]
            batch_old_log_probs = old_log_probs[idx]

            with tf.GradientTape() as tape:
                action_probs = actor(batch_states, training=True)
                action_log_probs = tf.math.log(tf.reduce_sum(action_probs * tf.one_hot(batch_actions, actor.output_shape[-1]), axis=1))
                ratios = tf.exp(action_log_probs - batch_old_log_probs)
                clipped_ratios = tf.clip_by_value(ratios, 1 - epsilon, 1 + epsilon)
                actor_loss = -tf.reduce_mean(tf.minimum(ratios * batch_advantages, clipped_ratios * batch_advantages))

            actor_grads = tape.gradient(actor_loss, actor.trainable_variables)
            actor_optimizer.apply_gradients(zip(actor_grads, actor.trainable_variables))

            critic_loss = critic.train_on_batch(batch_states, batch_returns)

    memory.clear()

# 환경 설정
class EndoscopeEnv(gym.Env):
    def __init__(self):
        self.action_space = gym.spaces.Discrete(4)
        self.observation_space = gym.spaces.Box(low=-np.inf, high=np.inf, shape=(6,), dtype=np.float32)  # 6개의 상태 변수
        self.client = p.connect(p.DIRECT)
        self.reset()

    def reset(self):
        p.resetSimulation(physicsClientId=self.client)
        p.setGravity(0, 0, -9.8, physicsClientId=self.client)
        p.setAdditionalSearchPath(pybullet_data.getDataPath())
        p.loadURDF("plane.urdf", physicsClientId=self.client)
        self.maze_size = 5
        self.wall_height = 0.5
        self.wall_thickness = 0.1
        self.agent_start_pos = [0.5, 0.5, 0.1]
        self.goal_pos = [4.5, 4.5, 0.1]
        self.walls = [
            [(0, 0), (self.maze_size, 0)],
            [(0, 0), (0, self.maze_size)],
            [(self.maze_size, 0), (self.maze_size, self.maze_size)],
            [(0, self.maze_size), (self.maze_size, self.maze_size)],
            [(1, 0), (1, 3)],
            [(3, 2), (3, 5)],
            [(2, 2), (4, 2)]
        ]
        for wall in self.walls:
            self.create_wall(wall[0], wall[1])
        self.agent = p.loadURDF("r2d2.urdf", self.agent_start_pos, physicsClientId=self.client)
        self.goal_visual = p.createVisualShape(p.GEOM_SPHERE, radius=0.2, rgbaColor=[1, 0, 0, 1])
        self.goal = p.createMultiBody(baseVisualShapeIndex=self.goal_visual, basePosition=self.goal_pos, physicsClientId=self.client)
        agent_pos = list(p.getBasePositionAndOrientation(self.agent, physicsClientId=self.client)[0])
        self.state = agent_pos + self.goal_pos
        return np.array(self.state)

    def create_wall(self, start_pos, end_pos):
        length = np.linalg.norm(np.array(end_pos) - np.array(start_pos))
        orientation = np.arctan2(end_pos[1] - start_pos[1], end_pos[0] - start_pos[0])
        mid_pos = [(start_pos[0] + end_pos[0]) / 2, (start_pos[1] + end_pos[1]) / 2, self.wall_height / 2]
        wall_visual = p.createVisualShape(p.GEOM_BOX, halfExtents=[length / 2, self.wall_thickness / 2, self.wall_height / 2])
        wall_collision = p.createCollisionShape(p.GEOM_BOX, halfExtents=[length / 2, self.wall_thickness / 2, self.wall_height / 2])
        p.createMultiBody(baseVisualShapeIndex=wall_visual, baseCollisionShapeIndex=wall_collision, basePosition=mid_pos,
                          baseOrientation=p.getQuaternionFromEuler([0, 0, orientation]), physicsClientId=self.client)

    def step(self, action):
        if action == 0:  # forward
            p.applyExternalForce(self.agent, -1, [5, 0, 0], [0, 0, 0], p.WORLD_FRAME, physicsClientId=self.client)
        elif action == 1:  # backward
            p.applyExternalForce(self.agent, -1, [-5, 0, 0], [0, 0, 0], p.WORLD_FRAME, physicsClientId=self.client)
        elif action == 2:  # left
            p.applyExternalForce(self.agent, -1, [0, 5, 0], [0, 0, 0], p.WORLD_FRAME, physicsClientId=self.client)
        elif action == 3:  # right
            p.applyExternalForce(self.agent, -1, [0, -5, 0], [0, 0, 0], p.WORLD_FRAME, physicsClientId=self.client)

        p.stepSimulation(physicsClientId=self.client)

        agent_pos = list(p.getBasePositionAndOrientation(self.agent, physicsClientId=self.client)[0])
        next_state = agent_pos + self.goal_pos
        distance = np.linalg.norm(np.array(agent_pos[:2]) - np.array(self.goal_pos[:2]))

        if distance < 0.1:
            reward = 100
            done = True
        else:
            reward = -distance
            done = False

        return np.array(next_state), reward, done, {}

    def render(self, mode='rgb_array'):
        view_matrix = p.computeViewMatrixFromYawPitchRoll(cameraTargetPosition=[2.5, 2.5, 0],
                                                          distance=5,
                                                          yaw=50,
                                                          pitch=-35,
                                                          roll=0,
                                                          upAxisIndex=2,
                                                          physicsClientId=self.client)
        proj_matrix = p.computeProjectionMatrixFOV(fov=60,
                                                   aspect=1.0,
                                                   nearVal=0.1,
                                                   farVal=100.0,
                                                   physicsClientId=self.client)
        width, height, rgbImg, depthImg, segImg = p.getCameraImage(width=640,
                                                                   height=480,
                                                                   viewMatrix=view_matrix,
                                                                   projectionMatrix=proj_matrix,
                                                                   physicsClientId=self.client)
        return rgbImg

    def close(self):
        p.disconnect(self.client)

In [13]:
# PPO 하이퍼파라미터 설정
gamma = 0.99
lambda_gae = 0.95
epsilon = 0.2
learning_rate_actor = 0.0003
learning_rate_critic = 0.001
epochs = 10
batch_size = 64
memory_size = 2048
episodes = 100
max_steps = 1000  # 에피소드당 최대 스텝 수

In [14]:
# PPO 학습 및 실행
env = EndoscopeEnv()
state_shape = env.observation_space.shape
action_space = env.action_space.n

actor = build_actor(state_shape, action_space)
critic = build_critic(state_shape)

# 모델 컴파일 추가
actor.compile(optimizer=tf.keras.optimizers.Adam(learning_rate_actor))
critic.compile(optimizer=tf.keras.optimizers.Adam(learning_rate_critic), loss='mse')

actor_optimizer = tf.keras.optimizers.Adam(learning_rate_actor)
critic_optimizer = tf.keras.optimizers.Adam(learning_rate_critic)

memory = Memory()

# 이동 경로 시각화를 위한 데이터 저장
episode_paths = []

for episode in range(episodes):
    state = env.reset()
    done = False
    score = 0
    path = [state[:2]]
    step_count = 0

    while not done and step_count < max_steps:  # 스텝 수 제한 추가
        action, log_prob = choose_action(state, actor)
        next_state, reward, done, _ = env.step(action)
        memory.store(state, action, reward, next_state, done, log_prob)
        state = next_state
        score += reward
        path.append(state[:2])
        step_count += 1  # 스텝 수 증가

    print(f"Episode: {episode}, Score: {score}, Steps: {step_count}")  # 스텝 수 출력
    train(memory, actor, critic, actor_optimizer, critic_optimizer, batch_size, epochs, gamma, lambda_gae, epsilon)
    episode_paths.append(path)

env.close()

# Matplotlib을 사용한 시각화
def plot_paths(paths, title):
    plt.figure(figsize=(10, 10))
    for path in paths:
        path = np.array(path)
        plt.plot(path[:, 0], path[:, 1], marker='o')
    plt.title(title)
    plt.xlabel('X')
    plt.ylabel('Y')
    plt.grid()
    plt.show()

# 최근 5개의 에피소드 경로 시각화
plot_paths(episode_paths[-5:], 'Endoscope Path - Last 5 Episodes')


[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 27ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 11ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 12ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 11ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 11ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 11ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 11ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 11ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 11ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 11ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 12ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 11ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 12ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 11

# 미로 확인

In [None]:
# PyBullet 초기화
p.connect(p.DIRECT)
p.setAdditionalSearchPath(pybullet_data.getDataPath())

# 중력 설정
p.setGravity(0, 0, -9.8)

# 평면 생성
plane_id = p.loadURDF("plane.urdf")

# 장기(maze) 벽 생성 함수
def create_wall(start_pos, end_pos):
    length = np.linalg.norm(np.array(end_pos) - np.array(start_pos))
    orientation = np.arctan2(end_pos[1] - start_pos[1], end_pos[0] - start_pos[0])
    mid_pos = [(start_pos[0] + end_pos[0]) / 2, (start_pos[1] + end_pos[1]) / 2, 0.25]
    wall_visual = p.createVisualShape(p.GEOM_BOX, halfExtents=[length / 2, 0.05, 0.25])
    wall_collision = p.createCollisionShape(p.GEOM_BOX, halfExtents=[length / 2, 0.05, 0.25])
    p.createMultiBody(baseVisualShapeIndex=wall_visual, baseCollisionShapeIndex=wall_collision, basePosition=mid_pos,
                      baseOrientation=p.getQuaternionFromEuler([0, 0, orientation]))

# 장기(maze) 벽 생성
walls = [
    [(0, 0), (5, 0)],
    [(0, 0), (0, 5)],
    [(5, 0), (5, 5)],
    [(0, 5), (5, 5)],
    [(1, 0), (1, 3)],
    [(3, 2), (3, 5)],
    [(2, 2), (4, 2)]
]

for wall in walls:
    create_wall(wall[0], wall[1])

# 내시경 에이전트 생성
agent_start_pos = [0.5, 0.5, 0.1]
agent = p.loadURDF("r2d2.urdf", agent_start_pos)

# 목표 위치 설정
goal_pos = [4.5, 4.5, 0.1]
goal_visual = p.createVisualShape(p.GEOM_SPHERE, radius=0.2, rgbaColor=[1, 0, 0, 1])
goal = p.createMultiBody(baseVisualShapeIndex=goal_visual, basePosition=goal_pos)

# PyBullet 시뮬레이션 단계 실행
p.stepSimulation()

# 환경 렌더링
def render_env():
    view_matrix = p.computeViewMatrixFromYawPitchRoll(cameraTargetPosition=[2.5, 2.5, 0],
                                                      distance=6,
                                                      yaw=50,
                                                      pitch=-35,
                                                      roll=0,
                                                      upAxisIndex=2)
    proj_matrix = p.computeProjectionMatrixFOV(fov=60,
                                               aspect=1.0,
                                               nearVal=0.1,
                                               farVal=100.0)
    width, height, rgbImg, _, _ = p.getCameraImage(width=640,
                                                   height=480,
                                                   viewMatrix=view_matrix,
                                                   projectionMatrix=proj_matrix)
    return rgbImg

# 시각화
image = render_env()
plt.imshow(image)
plt.title('3D Environment Visualization')
plt.axis('off')
plt.show()

# PyBullet 연결 종료
p.disconnect()