# PyBullet + DQN algorithm

앞서 PyBullet으로 구성한 진자(pendulum) 환경에 DQN 알고리즘을 적용해 학습시킨다.

- **상태**: `[cos(theta), sin(theta), theta_dot]`
- **행동**: 이산 행동 {-1, 0, 1} 중 하나를 선택하며, 여기에 최대 토크(20.0)를 곱한 값 {-20.0, 0.0, 20.0}이 진자 관절에 토크로 가해진다.
- **보상**: 진자가 위로 설수록, 각속도와 토크가 작을수록 높은 보상 획득.

## 1. Policy (DQN) algorithm

In [18]:
from collections import namedtuple, deque
import matplotlib
import matplotlib.pyplot as plt
import random

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

디버깅한 환경 클래스.

In [None]:
import gymnasium as gym
from gymnasium import spaces
import pybullet as p
import pybullet_data
import numpy as np
import cv2
import os

class PendulumEnv(gym.Env):
    """Single-joint pendulum swing-up environment simulated with PyBullet.

    Observation: ``[cos(theta), sin(theta), theta_dot]`` as float32.
    Action: an integer in {-1, 0, 1}; it is multiplied by ``maxTorque`` and
    applied as a torque on joint 1 of the loaded model.
    Reward: ``-(theta**2 + 0.1*|theta_dot| + 0.001*action**2)``, or 0 on the
    terminating step.
    Termination: ``cos(theta) > 0.99`` and ``|theta_dot| < 0.1`` (treated as
    upright and still). Truncation: after 200 steps.
    """
    metadata = {"render_modes":["human", "rgb_array"]}

    def __init__(self, render_mode="DIRECT", record_video=False, video_path=None):
        """Connect to PyBullet, load the models and define the spaces.

        Args:
            render_mode: ``'human'`` opens the PyBullet GUI; any other value
                uses the headless DIRECT connection.
            record_video: When True, a camera frame is captured on every
                ``step`` and written to a video file by ``close``.
            video_path: Directory for the recorded video. When omitted, the
                video is written to the current working directory.
        """
        super().__init__()

        self.render_mode = render_mode
        self.record_video = record_video
        self.videoPath = video_path
        # Always defined so videoRecord() can report the frame count even
        # when recording is disabled.
        self.frames = []

        # Drop a connection that is still open (e.g. from an earlier
        # instance) before opening a new one.
        if p.isConnected():
            p.disconnect()

        if self.render_mode == 'human':
            self.client = p.connect(p.GUI)
        else:
            self.client = p.connect(p.DIRECT)

        self.g = 9.81
        self.maxTorque = 20.0

        # Camera matrices and the output directory are only needed when
        # recording is enabled.
        if self.record_video:
            self._setup_camera_and_video()

        # Load the ground plane and set gravity.
        p.setAdditionalSearchPath(pybullet_data.getDataPath())
        p.loadURDF("plane.urdf")
        p.setGravity(0, 0, -self.g)

        # The pendulum model is loaded from a path relative to the current
        # working directory.
        modelPath = os.path.join('models', "pendulum_test.urdf")
        print(f"model path: {modelPath}")

        self.modelId = p.loadURDF(modelPath, useFixedBase=True)

        # The camelCase attribute names are kept for existing callers.
        self.actionSpace = spaces.Discrete(3, start=-1)
        high = np.array([1.0, 1.0, 15.0], dtype=np.float32)
        self.observationSpace = spaces.Box(low=-high, high=high, dtype=np.float32)
        # Gymnasium-standard names, so generic tooling that looks up
        # action_space / observation_space also finds the spaces.
        self.action_space = self.actionSpace
        self.observation_space = self.observationSpace

        self.stepCounter = 0

    def _setup_camera_and_video(self):
        """Initialise the frame buffer, camera matrices and output directory."""
        self.frames = []
        self._render_width = 640
        self._render_height = 480

        cam_eye_pos = [0, 2.5, 1.5]
        cam_target_pos = [0, 0, 1]
        cam_up_vector = [0, 0, 1]

        self.view_matrix = p.computeViewMatrix(
            cameraEyePosition=cam_eye_pos,
            cameraTargetPosition=cam_target_pos,
            cameraUpVector=cam_up_vector
        )
        self.projection_matrix = p.computeProjectionMatrixFOV(
            fov=60.0,
            aspect=float(self._render_width) / self._render_height,
            nearVal=0.1,
            farVal=100.0
        )
        if self.videoPath:
            # exist_ok avoids the check-then-create race of os.path.exists.
            os.makedirs(self.videoPath, exist_ok=True)

    def reset(self, seed=None, options=None):
        """Reset the joint to ``pi`` plus uniform noise in [-0.1, 0.1].

        Returns:
            ``(observation, info)`` where ``info`` is an empty dict.
        """
        super().reset(seed=seed)
        self.stepCounter = 0

        # Start at pi (commented in the original as hanging down) with a
        # small random offset.
        initialAngle = np.pi + self.np_random.uniform(low=-0.1, high=0.1)
        p.resetJointState(self.modelId, 1, targetValue=initialAngle, targetVelocity=0)

        # Zero-force velocity control disables the joint's built-in motor so
        # that the torque commands issued in step() take effect.
        p.setJointMotorControl2(self.modelId, 1, p.VELOCITY_CONTROL, force=0)

        return self._get_obs(), {}

    def step(self, action):
        """Apply one action, advance the simulation by one step.

        Args:
            action: Value in {-1, 0, 1}; scaled by ``maxTorque``.

        Returns:
            ``(observation, reward, terminated, truncated, info)``.
        """
        torque = float(action) * self.maxTorque
        p.setJointMotorControl2(self.modelId, 1, p.TORQUE_CONTROL, force=torque)
        p.stepSimulation()

        # Frames are captured only when recording is enabled. They accumulate
        # in memory until close() writes the video.
        if self.record_video:
            img = p.getCameraImage(self._render_width, self._render_height,
                                   viewMatrix=self.view_matrix,
                                   projectionMatrix=self.projection_matrix,
                                   renderer=p.ER_TINY_RENDERER)
            # Force uint8 so OpenCV accepts the frame even if the pixel
            # buffer is returned as a plain sequence of ints.
            rgba = np.array(img[2], dtype=np.uint8).reshape(
                (self._render_height, self._render_width, 4))
            self.frames.append(rgba[:, :, :3])

        obs = self._get_obs()
        reward, terminated = self._calculate_reward_and_done(obs, action)
        self.stepCounter += 1
        truncated = self.stepCounter >= 200  # episode length limit

        return obs, reward, terminated, truncated, {}

    def _get_obs(self):
        """Return ``[cos(theta), sin(theta), theta_dot]`` as a float32 array."""
        jointState = p.getJointState(self.modelId, 1)
        theta, theta_dot = jointState[0], jointState[1]
        # Wrap the angle into [-pi, pi).
        theta = (theta + np.pi) % (2*np.pi) - np.pi
        return np.array([np.cos(theta), np.sin(theta), theta_dot], dtype=np.float32)

    def _calculate_reward_and_done(self, obs, action):
        """Compute the reward and the termination flag for an observation.

        Returns:
            ``(reward, terminated)``. ``reward`` is 0 when terminated,
            otherwise the negative cost described in the class docstring.
        """
        cos_theta, sin_theta, theta_dot = obs

        is_upright = cos_theta > 0.99
        is_slow = abs(theta_dot) < 0.1

        terminated = bool(is_upright and is_slow)

        if terminated:
            # Zero cost is the highest reward this function can return.
            reward = 0
        else:
            # Cost-based reward: angle, angular speed and action magnitude
            # are all penalised.
            theta = np.arctan2(sin_theta, cos_theta)
            reward = -(theta**2 + 0.1 * abs(theta_dot) + 0.001 * (float(action)**2))

        return reward, terminated

    def videoRecord(self):
        """Write the captured frames to ``recordings.mp4``.

        The file goes to ``videoPath``, or to the current working directory
        when no path was given. Nothing is written when recording is disabled
        or no frame has been captured; a diagnostic line is printed instead.
        """
        if self.record_video and self.frames:
            video_dir = self.videoPath or "."
            os.makedirs(video_dir, exist_ok=True)
            videoFile = os.path.join(video_dir, f"recordings.mp4")
            print(f"Video recording start... saving to {videoFile}")
            video = cv2.VideoWriter(videoFile, cv2.VideoWriter_fourcc(*'mp4v'), 30, (self._render_width, self._render_height))
            try:
                for frame in self.frames:
                    # Frames are stored as RGB; OpenCV writes BGR.
                    video.write(cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))
            finally:
                # Release the writer even if a frame fails to convert.
                video.release()
            print("Video saved.")
        else:
            print(f"Function called, but self.record_video {self.record_video} or self.frames {len(self.frames)}")

    def close(self):
        """Save the video (when recording) and disconnect from PyBullet."""
        if self.record_video:
            self.videoRecord()
        p.disconnect(self.client)


### DQN (Deep Q-Network) 이란?
여기서부터는 공부를 하고 해보자. 

앞서 환경을 구성했고, 어떤 알고리즘을 사용할지 정했으니.

당장 할 일이 있으니 그걸 처리하고, 이렇게 계속해서 해보자. 


In [20]:
# One replay-memory record. The field order matches how transitions are
# stored and unpacked positionally elsewhere in this file:
# (state, action, reward, next_state, done).
Transition = namedtuple(
    'Transition',
    ('state', 'action', 'reward', 'next_state', 'done'),
)


class ReplayMemory:
    """Fixed-capacity buffer of transitions; the oldest entries are evicted."""

    def __init__(self, capacity):
        # A bounded deque drops items from the left once it is full.
        self.memory = deque(maxlen=capacity)

    def push(self, *args):
        """Wrap the arguments in a Transition and store it."""
        transition = Transition(*args)
        self.memory.append(transition)

    def sample(self, batch_size):
        """Return ``batch_size`` stored transitions drawn without replacement."""
        return random.sample(self.memory, k=batch_size)

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.memory)

class QNetwork(nn.Module):
    """Three-layer MLP that maps an observation to one Q-value per action."""

    def __init__(self, n_observations, n_actions):
        super().__init__()
        hidden_units = 128
        # Attribute names are the state_dict keys of saved checkpoints.
        self.layer1 = nn.Linear(n_observations, hidden_units)
        self.layer2 = nn.Linear(hidden_units, hidden_units)
        self.layer3 = nn.Linear(hidden_units, n_actions)

    def forward(self, x):
        """Return the Q-values for ``x``; the output layer has no activation."""
        hidden = F.relu(self.layer1(x))
        hidden = F.relu(self.layer2(hidden))
        q_values = self.layer3(hidden)
        return q_values

In [9]:
class DQNAgent:
    """Epsilon-greedy DQN agent with a target network and a replay memory.

    Environment actions are the integers ``start .. start + n - 1`` of the
    discrete action space. Network outputs are indexed ``0 .. n - 1`` and are
    converted to environment actions through ``action_map``.
    """

    def __init__(self, state_size, action_space):
        """Build the networks, the optimizer and the replay memory.

        Args:
            state_size: Length of the observation vector.
            action_space: Discrete space exposing ``n``, ``start`` and
                ``sample()``.
        """
        self.state_size = state_size
        self.action_space = action_space
        self.action_size = action_space.n

        # Network output index -> environment action value.
        self.action_map = {i: i + self.action_space.start for i in range(self.action_size)}

        self.gamma = 0.99
        self.epsilon = 1.0
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        self.learning_rate = 0.001
        self.batch_size = 64
        self.memory_capacity = 10000

        # torch.backends.mps is looked up defensively because it does not
        # exist in older torch releases.
        mps_backend = getattr(torch.backends, "mps", None)
        if torch.cuda.is_available():
            device_name = 'cuda'
        elif mps_backend is not None and mps_backend.is_available():
            device_name = 'mps'
        else:
            device_name = 'cpu'
        self.device = torch.device(device_name)
        print(f"Using device: {self.device}")

        self.policy_net = QNetwork(state_size, self.action_size).to(self.device)
        self.target_net = QNetwork(state_size, self.action_size).to(self.device)
        # The target network starts as a copy of the policy network and is
        # never trained directly.
        self.target_net.load_state_dict(self.policy_net.state_dict())
        self.target_net.eval()

        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=self.learning_rate)
        self.memory = ReplayMemory(self.memory_capacity)

    def act(self, state):
        """Choose an environment action for ``state``.

        With probability ``epsilon`` a random action is sampled from the
        action space; otherwise the action with the highest predicted
        Q-value is returned.
        """
        if random.random() < self.epsilon:
            return self.action_space.sample()

        state = torch.FloatTensor(state).unsqueeze(0).to(self.device)
        with torch.no_grad():
            q_values = self.policy_net(state)

        action_index = int(np.argmax(q_values.cpu().numpy()))
        return self.action_map[action_index]

    def learn(self):
        """Run one gradient step on a sampled minibatch and decay epsilon.

        Does nothing until the memory holds at least ``batch_size``
        transitions. Epsilon is decayed once per call that performs an
        update, i.e. once per learning step rather than once per episode.
        """
        if len(self.memory) < self.batch_size:
            return

        minibatch = self.memory.sample(self.batch_size)
        # Transitions are unpacked positionally in the order in which the
        # training loop pushes them: state, action, reward, next_state, done.
        states, actions, rewards, next_states, dones = zip(*minibatch)

        states = torch.FloatTensor(np.array(states)).to(self.device)
        # Environment action values -> network output indices.
        action_indices = [a - self.action_space.start for a in actions]
        actions = torch.LongTensor(action_indices).unsqueeze(1).to(self.device)
        rewards = torch.FloatTensor(np.asarray(rewards, dtype=np.float32)).to(self.device)
        next_states = torch.FloatTensor(np.array(next_states)).to(self.device)
        dones = torch.FloatTensor(np.asarray(dones, dtype=np.float32)).to(self.device)

        # squeeze(1) removes only the gather dimension, so the result keeps
        # shape (batch,) even when the batch size is 1.
        current_q_values = self.policy_net(states).gather(1, actions).squeeze(1)
        with torch.no_grad():
            next_q_values = self.target_net(next_states).max(1)[0]
        # No bootstrapping from next_state when the transition is marked done.
        target_q_values = rewards + (self.gamma * next_q_values * (1 - dones))

        loss = F.mse_loss(current_q_values, target_q_values)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

    def update_target_net(self):
        """Copy the policy network weights into the target network."""
        self.target_net.load_state_dict(self.policy_net.state_dict())



### 학습 코드

In [None]:
def train():
    """Train a DQN agent on PendulumEnv and save the policy network weights.

    The environment records a frame on every step, and the video is written
    when the environment is closed, so training runs slowly. The weights are
    saved to ``MODEL_PATH`` after the environment has been closed.
    """
    VIDEO_PATH = 'video/train'
    MODEL_PATH = "trained/dqn_pendulum_model.pth"
    env = PendulumEnv(render_mode="DIRECT", record_video=True, video_path=VIDEO_PATH)

    # The environment exposes its spaces under camelCase names.
    state_size = env.observationSpace.shape[0]
    action_space = env.actionSpace

    agent = DQNAgent(state_size, action_space)

    num_episodes = 5  # kept small: each step also renders a video frame
    target_update_frequency = 5

    print("="*50)
    print("DQN 학습을 시작합니다.")
    print("="*50)

    try:
        for episode in range(num_episodes):

            state, _ = env.reset()
            total_reward = 0
            done = False
            truncated = False

            while not (done or truncated):
                action = agent.act(state)

                next_state, reward, done, truncated, _ = env.step(action)

                # Only true termination is stored as "done". A time-limit
                # truncation is not a terminal state, so the learner must
                # still bootstrap from next_state for that transition.
                agent.memory.push(state, action, reward, next_state, bool(done))

                state = next_state
                total_reward += reward

                agent.learn()

            if episode % target_update_frequency == 0:
                agent.update_target_net()

            if episode % 100 == 0:
                print(f"Episode: {episode+1}/{num_episodes}, Score: {-1*total_reward:.2f}, Epsilon: {agent.epsilon:.2f}")

    finally:
        # Closing the environment also writes the recorded video.
        env.close()

    # torch.save does not create missing parent directories.
    model_dir = os.path.dirname(MODEL_PATH)
    if model_dir:
        os.makedirs(model_dir, exist_ok=True)
    torch.save(agent.policy_net.state_dict(), MODEL_PATH)
    print("Training finished and model saved.")


# Run training when this cell/script is executed as the main module.
if __name__ == "__main__":
    train()

model path: models/pendulum_test.urdf
Using device: cuda
DQN 학습을 시작합니다.
Episode: 1/5, Score: 1911.92, Epsilon: 0.50
Video recording start... saving to video/train/recordings.avi
Video saved.
Training finished and model saved.


### 학습된 모델 테스트

In [None]:
def evaluate():
    """Load the trained DQN weights, run greedy episodes and record a video.

    Returns early with an error message when the model file is missing.
    Each episode always runs for ``EVALUATION_STEPS`` simulation steps; the
    terminated/truncated flags returned by the environment are not used to
    stop early.
    """
    # --- Settings ---
    MODEL_PATH = "trained/dqn_pendulum_model.pth"
    VIDEO_PATH = "video"
    NUM_EPISODES = 2  # number of evaluation episodes

    # --- Make sure the model file exists ---
    if not os.path.exists(MODEL_PATH):
        print(f"Error: 모델 파일 '{MODEL_PATH}'을 찾을 수 없습니다.")
        print("먼저 train_dqn.py를 실행하여 모델을 학습하고 저장해주세요.")
        return

    # --- Environment and agent ---
    # render_mode='human' opens the GUI; record_video=True enables saving
    # the video when the environment is closed.
    env = PendulumEnv(render_mode="human", record_video=True, video_path=VIDEO_PATH)

    state_size = env.observationSpace.shape[0]
    action_space = env.actionSpace
    agent = DQNAgent(state_size, action_space)

    # --- Load the trained weights ---
    print(f"Loading trained model from '{MODEL_PATH}'...")
    # map_location lets a checkpoint saved on one device (e.g. CUDA) be
    # loaded on whatever device the agent selected on this machine.
    state_dict = torch.load(MODEL_PATH, map_location=agent.device)
    agent.policy_net.load_state_dict(state_dict)
    agent.policy_net.eval()

    # With epsilon = 0 the agent never explores: act() always returns the
    # action with the highest predicted Q-value.
    agent.epsilon = 0.0
    EVALUATION_STEPS = 240 * 5

    print("Starting evaluation...")

    try:
        for episode in range(NUM_EPISODES):
            state, info = env.reset()
            total_reward = 0

            for step in range(EVALUATION_STEPS):
                action = agent.act(state)

                next_state, reward, done, truncated, info = env.step(action)
                state = next_state
                total_reward += reward

            print(f"Evaluation Episode {episode + 1}: Total Reward = {total_reward:.2f}")

    finally:
        # Closing the environment writes the recorded video into VIDEO_PATH.
        env.close()

# Run the evaluation when this cell/script is executed as the main module.
if __name__ == "__main__":
    evaluate()

startThreads creating 1 threads.
starting thread 0
started thread 0 
argc=2
argv[0] = --unused
argv[1] = --start_demo_name=Physics Server
ExampleBrowserThreadFunc started
X11 functions dynamically loaded using dlopen/dlsym OK!
X11 functions dynamically loaded using dlopen/dlsym OK!
Creating context
Created GL 3.3 context
Direct GLX rendering context obtained
Making context current
GL_VENDOR=NVIDIA Corporation
GL_RENDERER=NVIDIA GeForce RTX 4080 SUPER/PCIe/SSE2
GL_VERSION=3.3.0 NVIDIA 580.65.06
GL_SHADING_LANGUAGE_VERSION=3.30 NVIDIA via Cg compiler
pthread_getconcurrency()=0
Version = 3.3.0 NVIDIA 580.65.06
Vendor = NVIDIA Corporation
Renderer = NVIDIA GeForce RTX 4080 SUPER/PCIe/SSE2
b3Printf: Selected demo: Physics Server
startThreads creating 1 threads.
starting thread 0
started thread 0 
MotionThreadFunc thread started
model path: models/pendulum_test.urdf
Using device: cuda
ven = NVIDIA Corporation
Loading trained model from 'dqn_pendulum_model.pth'...
Starting evaluation...
ven =