***250707 Prof. Songnam Hong <Hyundai Motor Bootcamp Lab Materials> - Q-learning***

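## 0. Visualize
### 0.1. Install required packages
  - `ffmpeg`, `imageio`: packages for encoding, decoding, reading, and writing video and audio.
  - `gymnasium[classic_control]`: installs the classic control environments of the Gymnasium reinforcement learning library.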

In [55]:
!apt-get update -qq
!apt-get install -y ffmpeg > /dev/null
!pip install gymnasium[classic_control] imageio imageio-ffmpeg > /dev/null 2>&1



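### 0.2. Define video helper functions
- To inspect the training result visually, the rollout is saved frame by frame and rendered as a video.
- The function `show_video` is defined to load the saved video and play it inside Colab.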

In [56]:
import os, glob, io, base64
from IPython.display import HTML, display

os.makedirs('video', exist_ok=True)

def show_video(name):
    mp4list = glob.glob(f'video/{name}.mp4')
    if mp4list:
        mp4 = mp4list[0]
        with open(mp4, 'rb') as f:  # read-only binary mode; the file is closed on exit
            video = f.read()
        encoded = base64.b64encode(video)
        display(HTML(f'''
            <video autoplay controls style="max-height: 400px;">
              <source src="data:video/mp4;base64,{encoded.decode('ascii')}" type="video/mp4"/>
            </video>'''))
    else:
        print("Could not find video")


---
## 1. Build the Gym environment

In [57]:
import gymnasium as gym

env = gym.make('CartPole-v1', render_mode='rgb_array')

Gym environment options
- `env.observation_space.shape`: shape of the state (observation) space. CartPole uses a continuous `Box` space, which has no `.n` attribute.
- `env.action_space.n`: number of discrete actions
- Others: https://gymnasium.farama.org/api/env/

In [58]:
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n

print("State  :", state_dim)
print("Action :", action_dim)


State  : 4
Action : 2


---
## 2. Actor-Critic

### 2.1. Define the Actor and Critic networks

In [59]:
import torch
import torch.nn as nn
import torch.nn.functional as F

# Actor (Policy) Network
class PolicyNetwork(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_dim=128):
        super(PolicyNetwork, self).__init__()
        self.fc1 = nn.Linear(state_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, action_dim)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        action_probs = F.softmax(self.fc2(x), dim=-1)
        return action_probs


# Critic (Value) Network
class ValueNetwork(nn.Module):
    def __init__(self, state_dim, hidden_dim=128):
        super(ValueNetwork, self).__init__()
        self.fc1 = nn.Linear(state_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, 1)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        state_value = self.fc2(x)
        return state_value.squeeze(-1)
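
The Actor's final layer produces one raw score (logit) per action, and `F.softmax` turns those scores into a probability distribution. As a minimal sketch of that step in plain Python, the helper below computes the same mapping; the function name `softmax` and the example logits are illustrative assumptions, not part of the notebook.

```python
import math

def softmax(logits):
    """Numerically stable softmax over a list of floats."""
    m = max(logits)                      # subtract the max to avoid overflow in exp
    exps = [math.exp(z - m) for z in logits]
    total = sum(exps)
    return [e / total for e in exps]

# Hypothetical logits for the two CartPole actions (0: push left, 1: push right)
logits = [2.0, 0.5]
probs = softmax(logits)
print(probs, sum(probs))  # two non-negative values that sum to 1 (up to rounding)
```

Equal logits give a uniform distribution, and a larger logit always receives the larger probability, which is why the network can be trained by pushing logits up or down.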

### 2.2. Create the Actor and Critic networks
- Learning rate
  - Actor: 0.0003
  - Critic: 0.001
- Optimizer: Adam

In [60]:
import torch.optim as optim

critic_learning_rate = 1e-3
actor_learning_rate = 3e-4

Actor = PolicyNetwork(state_dim, action_dim)
Critic = ValueNetwork(state_dim)
policy_optimizer = optim.Adam(Actor.parameters(), lr=actor_learning_rate)
value_optimizer = optim.Adam(Critic.parameters(), lr=critic_learning_rate)

In [None]:
def Actor_Critic(env, Actor, Critic, policy_optimizer, value_optimizer, n_episodes, gamma=0.99):
  for epi in range(n_episodes):
    state, _ = env.reset()
    done = False
    while not done:
      # Convert state to tensor
      state = torch.as_tensor(state, dtype=torch.float32)

      # Actor: get action probabilities and sample
      action_probs = Actor(state)
      dist = torch.distributions.Categorical(action_probs)
      action = dist.sample()

      # Environment step
      next_state, reward, terminated, truncated, _ = env.step(action.item())
      done = terminated or truncated

      # Convert next state to tensor
      next_state = torch.as_tensor(next_state, dtype=torch.float32)

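      # Critic: state value estimates (the bootstrap target is held fixed)
      with torch.no_grad():
        next_state_value = Critic(next_state)
      state_value = Critic(state)

      # Compute TD error; bootstrap unless the episode truly terminated
      # (a time-limit truncation still has a meaningful next-state value)
      td_error = reward + gamma * next_state_value * (1.0 - float(terminated)) - state_value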

      # Update Actor (policy)
      policy_loss = -dist.log_prob(action) * td_error.detach()
      policy_optimizer.zero_grad()
      policy_loss.backward()
      policy_optimizer.step()

      # Update Critic (value)
      value_loss = td_error.pow(2)
      value_optimizer.zero_grad()
      value_loss.backward()
      value_optimizer.step()

      # Move to next state
      state = next_state

  return Actor, Critic
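
To make the update inside the loop concrete, here is a small worked example of the one-step TD error and the two per-step losses, written in plain Python without PyTorch. The function names and all numbers are hypothetical, chosen only for illustration; the `terminated` flag plays the role of the end-of-episode mask in the loop.

```python
import math

def td_error(reward, value, next_value, terminated, gamma=0.99):
    """One-step TD error: r + gamma * V(s') - V(s), with no bootstrap at a terminal state."""
    bootstrap = 0.0 if terminated else gamma * next_value
    return reward + bootstrap - value

def actor_critic_losses(log_prob, delta):
    """Per-step losses: actor uses -log pi(a|s) * delta, critic uses delta squared."""
    return -log_prob * delta, delta ** 2

# Hypothetical values: V(s) = 10.0, V(s') = 10.5, reward = 1.0
delta = td_error(reward=1.0, value=10.0, next_value=10.5, terminated=False)
policy_loss, value_loss = actor_critic_losses(math.log(0.5), delta)
print(delta, policy_loss, value_loss)

# At a terminal transition the next-state value is ignored entirely
delta_terminal = td_error(reward=1.0, value=10.0, next_value=10.5, terminated=True)
print(delta_terminal)
```

A positive TD error means the action turned out better than the Critic expected, so minimizing the actor loss raises that action's log-probability; a negative TD error lowers it.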

### 2.3. Run Actor-Critic training


In [62]:
gamma = 0.99        # Discount factor
n_episodes = 1000

Actor, Critic = Actor_Critic(env, Actor, Critic, policy_optimizer, value_optimizer, n_episodes, gamma=gamma)



### 2.4. Check the training result

In [68]:
import imageio

writer = imageio.get_writer('video/actorcritic.mp4', macro_block_size=1, fps=50)
state, _ = env.reset()
done = False
step = 0

while not done:
    step += 1
    frame = env.render()
    writer.append_data(frame)

    state = torch.as_tensor(state, dtype=torch.float32)
    with torch.no_grad():  # no gradients are needed during evaluation
        action_probs = Actor(state)
    action = torch.distributions.Categorical(action_probs).sample().item()
    state, _, terminated, truncated, _ = env.step(action)
    done = terminated or truncated

print("Steps:", step)
writer.close()
env.close()
show_video('actorcritic')

Steps: 379
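
The rollout above samples each action from the policy, so the step count can differ from run to run. A common alternative for evaluation, not used in the original notebook, is to act greedily by always taking the most probable action. The helper below is a minimal sketch of that choice in plain Python; `greedy_action` is a hypothetical name.

```python
def greedy_action(action_probs):
    """Return the index of the most probable action (ties go to the lowest index)."""
    return max(range(len(action_probs)), key=lambda i: action_probs[i])

# Hypothetical policy output for the two CartPole actions
print(greedy_action([0.3, 0.7]))
```

In the evaluation loop this would stand in for the `Categorical(...).sample()` call, for example by applying it to `action_probs.tolist()`. Greedy evaluation removes sampling noise but can behave differently from the stochastic policy that was actually trained.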
