In [21]:
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import random
import numpy as np
from collections import deque, namedtuple
import matplotlib.pyplot as plt
import math
from typing import Optional, Union
import gym
from gym import logger, spaces
from gym.envs.classic_control import utils
from gym.error import DependencyNotInstalled

### Actor-Critic Objective Function and Constraints
Loss Functions for Critic and Actor
- Critic Loss Function
    - $L_{Critic}=E[(V(s)−(r+γV(s′)))^2]$ 
- Actor Loss Function
    - $L_{Actor}=−E[logπ(a∣s)⋅A(s,a)]−β⋅H(π(⋅∣s))$
- Overall Objective Function
    - $L=L_{Critic}+α⋅L_{Actor}$

### Constraints
State and Action Space Constraints
- State Constraints
    - $𝑠∈𝑆$ : The state must belong to the state space $S$ provided by the environment.
    - $𝑆=[x,x',θ,θ']$ : The state consists of the cart position $x$, cart velocity $x′$,pole angle $θ$, and pole angular velocity $θ$
- Action Constraints
    - $𝑎∈𝐴$ : The action must belong to the discrete action space $A$ provided by the environment.
    - $𝐴={0,1}$ : The agent can apply force to move the cart left or right.

### Termination Conditions
- The episode terminates when
    - The cart position exceeds the allowed range $[−2.4,2.4]$
        - $∣𝑥∣>2.4$->Terminate Episode
    - The maximum time step limit is reached:
        - t≥max_steps->Terminate Episode

### Final Formulation of the Optimization Problem
Objective Function
- $min  θ Actor,θ Critic L=L_{Critic}+α\cdot L_{Actor}$

### $S.t$
- $s∈S,a∈A$
- $|x∣≤2.4$
- $∣θ∣≤12$
- $t≤max steps$

In [26]:
class CartPoleEnv(gym.Env):
    def __init__(self, render_mode: Optional[str] = "human", max_steps: int = 50):
        super().__init__()
        self.max_steps = max_steps  # 시간 제한 스텝 수
        self.current_step = 0      # 현재 스텝 수 초기화
        self.render_mode = render_mode 
        self.gravity = 9.8
        self.masscart = 1.0
        self.masspole = 0.1
        self.total_mass = self.masspole + self.masscart
        self.length = 0.5  # 실제 폴의 절반 길이
        self.polemass_length = self.masspole * self.length
        self.force_mag = 10.0
        self.tau = 0.02  # 상태 업데이트 간격 (초)
        self.kinematics_integrator = "euler"

        self.theta_threshold_radians = 12 * 2 * math.pi / 360
        self.x_threshold = 2.4

        # 추가: 안정성 임계값
        self.x_stable_threshold = 0.5  # 안정성 판단을 위한 x 위치 임계값
        self.theta_stable_threshold = 0.05  # 안정성 판단을 위한 각도 임계값
        self.stable_steps = 0  # 안정적 유지 시간 초기화

        high = np.array(
            [
                self.x_threshold * 2,
                np.finfo(np.float32).max,
                self.theta_threshold_radians * 2,
                np.finfo(np.float32).max,
            ],
            dtype=np.float32,
        )

        self.action_space = spaces.Discrete(2)
        self.observation_space = spaces.Box(-high, high, dtype=np.float32)

        self.render_mode = render_mode

        self.screen_width = 600
        self.screen_height = 400
        self.screen = None
        self.clock = None
        self.isopen = True
        self.state = None
        self.steps_beyond_terminated = None

        # Spec 속성 설정
        self.spec = type('', (), {'id': "CustomCartPole-v0"})()


    def step(self, action):
        err_msg = f"{action!r} ({type(action)}) invalid"
        assert self.action_space.contains(action), err_msg
        assert self.state is not None, "Call reset before using step method."
        x, x_dot, theta, theta_dot = self.state
        force = self.force_mag if action == 1 else -self.force_mag
        costheta = math.cos(theta)
        sintheta = math.sin(theta)

        temp = (
            force + self.polemass_length * theta_dot**2 * sintheta
        ) / self.total_mass
        thetaacc = (self.gravity * sintheta - costheta * temp) / (
            self.length * (4.0 / 3.0 - self.masspole * costheta**2 / self.total_mass)
        )
        xacc = temp - self.polemass_length * thetaacc * costheta / self.total_mass

        if self.kinematics_integrator == "euler":
            x = x + self.tau * x_dot
            x_dot = x_dot + self.tau * xacc
            theta = theta + self.tau * theta_dot
            theta_dot = theta_dot + self.tau * thetaacc
        else:  # semi-implicit euler
            x_dot = x_dot + self.tau * xacc
            x = x + self.tau * x_dot
            theta_dot = theta_dot + self.tau * thetaacc
            theta = theta + self.tau * theta_dot

        self.state = (x, x_dot, theta, theta_dot)
        self.current_step += 1

        terminated = bool(
            x < -self.x_threshold or x > self.x_threshold
            or self.current_step >= self.max_steps
        )

        # 보상 계산
        distance_reward = max(1 - (abs(x) / self.x_threshold), 0)
        angle_reward = max(1 - (abs(theta) / self.theta_threshold_radians), 0)
        force_penalty = (abs(force) / self.force_mag) ** 2

        # 안정성 보너스
        stability_bonus = 0
        if abs(x) < self.x_stable_threshold and abs(theta) < self.theta_stable_threshold:
            self.stable_steps += 1
            stability_bonus = min(self.stable_steps * 0.02, 1.0)  # 안정성 최대치 제한
        else:
            self.stable_steps = 0

        # 종합 보상
        reward = (
            angle_reward * 0.6
            + distance_reward * 0.3
            - force_penalty * 0.1
            + stability_bonus
        )
        reward = max(reward, 0)

        if terminated:
            reward -= 0.5  # 에피소드 종료 시 패널티

        # 보상 스케일링
        reward_scaling_factor = 1.0
        reward *= reward_scaling_factor

        if self.render_mode == "human":
            self.render()

        return np.array(self.state, dtype=np.float32), reward, terminated, False, {}


    def reset(self, *, seed: Optional[int] = None, options: Optional[dict] = None):
        self.np_random, seed = gym.utils.seeding.np_random(seed)
        self.current_step = 0  # 스텝 카운터 초기화
        self.state = np.array([0.0, 0.0, np.pi, 0.0], dtype=np.float32)  # 초기 상태 설정
        self.steps_beyond_terminated = None

        if self.render_mode == "human":
            self.render()
        return np.array(self.state, dtype=np.float32), {}

    def render(self):
        if self.render_mode is None:
            gym.logger.warn(
                "You are calling render method without specifying any render mode. "
                "You can specify the render_mode at initialization, "
                f'e.g. gym("{self.spec.id}", render_mode="rgb_array")'
            )
            return

        try:
            import pygame
            from pygame import gfxdraw
        except ImportError:
            raise DependencyNotInstalled(
                "pygame is not installed, run `pip install gym[classic_control]`"
            )

        if self.screen is None:
            pygame.init()
            if self.render_mode == "human":
                pygame.display.init()
                self.screen = pygame.display.set_mode(
                    (self.screen_width, self.screen_height)
                )
            else:  # mode == "rgb_array"
                self.screen = pygame.Surface((self.screen_width, self.screen_height))
        if self.clock is None:
            self.clock = pygame.time.Clock()

        world_width = self.x_threshold * 2
        scale = self.screen_width / world_width
        polewidth = 10.0
        polelen = scale * (2 * self.length)
        cartwidth = 50.0
        cartheight = 30.0

        if self.state is None:
            return None

        x = self.state

        self.surf = pygame.Surface((self.screen_width, self.screen_height))
        self.surf.fill((255, 255, 255))

        l, r, t, b = -cartwidth / 2, cartwidth / 2, cartheight / 2, -cartheight / 2
        axleoffset = cartheight / 4.0
        cartx = x[0] * scale + self.screen_width / 2.0  # MIDDLE OF CART
        carty = 100  # TOP OF CART
        cart_coords = [(l, b), (l, t), (r, t), (r, b)]
        cart_coords = [(c[0] + cartx, c[1] + carty) for c in cart_coords]
        gfxdraw.aapolygon(self.surf, cart_coords, (0, 0, 0))
        gfxdraw.filled_polygon(self.surf, cart_coords, (0, 0, 0))

        l, r, t, b = (
            -polewidth / 2,
            polewidth / 2,
            polelen - polewidth / 2,
            -polewidth / 2,
        )

        pole_coords = []
        for coord in [(l, b), (l, t), (r, t), (r, b)]:
            coord = pygame.math.Vector2(coord).rotate_rad(-x[2])
            coord = (coord[0] + cartx, coord[1] + carty + axleoffset)
            pole_coords.append(coord)
        gfxdraw.aapolygon(self.surf, pole_coords, (202, 152, 101))
        gfxdraw.filled_polygon(self.surf, pole_coords, (202, 152, 101))

        gfxdraw.aacircle(
            self.surf,
            int(cartx),
            int(carty + axleoffset),
            int(polewidth / 2),
            (129, 132, 203),
        )
        gfxdraw.filled_circle(
            self.surf,
            int(cartx),
            int(carty + axleoffset),
            int(polewidth / 2),
            (129, 132, 203),
        )

        gfxdraw.hline(self.surf, 0, self.screen_width, carty, (0, 0, 0))

        self.surf = pygame.transform.flip(self.surf, False, True)
        self.screen.blit(self.surf, (0, 0))
        if self.render_mode == "human":
            pygame.event.pump()
            self.clock.tick(self.metadata["render_fps"])
            pygame.display.flip()

        elif self.render_mode == "rgb_array":
            return np.transpose(
                np.array(pygame.surfarray.pixels3d(self.screen)), axes=(1, 0, 2)
            )

    def close(self):
        if self.screen is not None:
            import pygame

            pygame.display.quit()
            pygame.quit()
            self.isopen = False

env

In [27]:

env = CartPoleEnv(render_mode="rgb_array", max_steps=200)
state, _ = env.reset()
print("Custom initial state:", state)

Custom initial state: [0.        0.        3.1415927 0.       ]


AC netwrok

In [28]:
import torch
import torch.nn as nn
import torch.distributions as distributions

class ActorNetwork(nn.Module):
    def __init__(self, state_size, action_size, seed):
        super(ActorNetwork, self).__init__()
        self.seed = torch.manual_seed(seed)
        self.fc1 = nn.Linear(state_size, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, action_size)
    
    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        logits = self.fc3(x)  # Policy logits
        return distributions.Categorical(logits=logits)

class CriticNetwork(nn.Module):
    def __init__(self, state_size, seed):
        super(CriticNetwork, self).__init__()
        self.seed = torch.manual_seed(seed)
        self.fc1 = nn.Linear(state_size, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, 1)  # Output state value
    
    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        value = self.fc3(x)  # State value
        return value


Hyper parameter

In [29]:
import torch

# Hyperparameters
BUFFER_SIZE = int(1e5)
BATCH_SIZE = 64
GAMMA = 0.99
TAU = 1e-3
LR = 5e-4
UPDATE_EVERY = 4

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [30]:
# Replay Buffer
class ReplayBuffer:
    def __init__(self, action_size, buffer_size, batch_size, seed):
        self.action_size = action_size
        self.memory = deque(maxlen=buffer_size)
        self.batch_size = batch_size
        self.experience = namedtuple("Experience", field_names=["state", "action", "reward", "next_state", "done"])
        self.seed = random.seed(seed)

    def add(self, state, action, reward, next_state, done):
        e = self.experience(state, action, reward, next_state, done)
        self.memory.append(e)

    def sample(self):
        experiences = random.sample(self.memory, k=self.batch_size)
        states = torch.from_numpy(np.vstack([e.state for e in experiences if e is not None])).float().to(device)
        actions = torch.from_numpy(np.vstack([e.action for e in experiences if e is not None])).long().to(device)
        rewards = torch.from_numpy(np.vstack([e.reward for e in experiences if e is not None])).float().to(device)
        next_states = torch.from_numpy(np.vstack([e.next_state for e in experiences if e is not None])).float().to(device)
        dones = torch.from_numpy(np.vstack([e.done for e in experiences if e is not None]).astype(np.uint8)).float().to(device)

        return (states, actions, rewards, next_states, dones)

    def __len__(self):
        return len(self.memory)

In [31]:
import torch.nn.functional as F
import torch.optim as optim
import random
from collections import deque, namedtuple

class Agent:
    def __init__(self, state_size, action_size, seed, actor_lr=1e-3, critic_lr=1e-3):
        """Actor-Critic Agent 초기화"""
        self.state_size = state_size
        self.action_size = action_size
        self.seed = random.seed(seed)

        # Actor와 Critic 네트워크 정의
        self.actor = ActorNetwork(state_size, action_size, seed).to(device)
        self.critic = CriticNetwork(state_size, seed).to(device)

        # Optimizer 설정
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=actor_lr)
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=critic_lr)

        # Replay Buffer 초기화
        self.memory = ReplayBuffer(action_size, BUFFER_SIZE, BATCH_SIZE, seed)
        self.t_step = 0

    def step(self, state, action, reward, next_state, done, gamma=0.99):
        """경험 저장 및 Replay Buffer 기반 학습"""
        # Replay Buffer에 경험 추가
        self.memory.add(state, action, reward, next_state, done)

        # 매 UPDATE_EVERY 스텝마다 학습
        self.t_step = (self.t_step + 1) % UPDATE_EVERY
        if self.t_step == 0 and len(self.memory) > BATCH_SIZE:
            experiences = self.memory.sample()
            self.learn(experiences, gamma)

    def act(self, state, eps=0.):
        """현재 상태에서 행동 선택 (Actor 네트워크 기반)"""
        state = torch.from_numpy(state).float().unsqueeze(0).to(device)
        self.actor.eval()
        with torch.no_grad():
            action_probs = self.actor(state)  # 정책 확률
        self.actor.train()

        # 확률에 따라 행동 샘플링
        action = action_probs.sample()
        return action.item()

    def learn(self, experiences, gamma):
        """Replay Buffer에서 샘플링한 경험으로 Actor와 Critic 학습"""
        states, actions, rewards, next_states, dones = experiences

        # Critic 업데이트: 상태 가치 계산
        values = self.critic(states)
        next_values = self.critic(next_states).detach()
        targets = rewards + (1 - dones) * gamma * next_values
        critic_loss = F.mse_loss(values, targets)

        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()

        # Actor 업데이트: 정책 그래디언트 계산
        action_probs = self.actor(states)
        log_probs = action_probs.log_prob(actions.squeeze(-1))
        advantages = targets - values.detach()  # Advantage 계산

        # Advantage 정규화
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-5)

        # 엔트로피 보너스 계산
        entropy_bonus = action_probs.entropy().mean()

        # Actor 손실 계산 (엔트로피 보너스 포함)
        actor_loss = -(log_probs * advantages).mean() - 0.01 * entropy_bonus

        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()

        # Optional: 학습 상태 출력
        # print(f"Actor Loss: {actor_loss.item()}, Log Probs: {log_probs.mean().item()}, Advantage: {advantages.mean().item()}, Entropy: {entropy_bonus.item()}")
        # print(f"Critic Loss: {critic_loss.item()}, Values: {values.mean().item()}, Targets: {targets.mean().item()}")


learning

In [33]:
# Actor-Critic 학습 함수
def actor_critic_train(n_episodes=10000, max_t=200, gamma=0.99):
    scores = []
    scores_window = deque(maxlen=100)

    for i_episode in range(1, n_episodes + 1):
        state, _ = env.reset()
        score = 0

        for t in range(max_t):
            action = agent.act(state)
            next_state, reward, done, _, _ = env.step(action)
            agent.step(state, action, reward, next_state, done, gamma)

            state = next_state
            score += reward
            if done:
                break

        scores_window.append(score)
        scores.append(score)

        print(f'\rEpisode {i_episode}\tAverage Score: {np.mean(scores_window):.2f}', end="")
        if i_episode % 100 == 0:
            print(f'\rEpisode {i_episode}\tAverage Score: {np.mean(scores_window):.2f}')
        if np.mean(scores_window) >= 200.0:
            print(f'\nEnvironment solved in {i_episode} episodes!\tAverage Score: {np.mean(scores_window):.2f}')
            torch.save(agent.actor.state_dict(), 'actor_checkpoint.pth')
            torch.save(agent.critic.state_dict(), 'critic_checkpoint.pth')
            break
        if i_episode == n_episodes:
            print(f'\nEnvironment solved in {i_episode} episodes!\tAverage Score: {np.mean(scores_window):.2f}')
            torch.save(agent.actor.state_dict(), 'actor_checkpoint.pth')
            torch.save(agent.critic.state_dict(), 'critic_checkpoint.pth')
            
    return scores

# 환경 및 하이퍼파라미터 설정
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
state_size = env.observation_space.shape[0]
action_size = env.action_space.n

agent = Agent(state_size=state_size, action_size=action_size, seed=0)
scores = actor_critic_train()

Episode 100	Average Score: 15.17
Episode 200	Average Score: 15.66
Episode 300	Average Score: 16.71
Episode 400	Average Score: 15.48
Episode 500	Average Score: 15.64
Episode 600	Average Score: 15.98
Episode 700	Average Score: 14.02
Episode 800	Average Score: 15.01
Episode 900	Average Score: 15.78
Episode 1000	Average Score: 16.06
Episode 1100	Average Score: 15.97
Episode 1200	Average Score: 14.05
Episode 1300	Average Score: 15.37
Episode 1400	Average Score: 16.43
Episode 1500	Average Score: 14.16
Episode 1600	Average Score: 15.48
Episode 1700	Average Score: 17.04
Episode 1800	Average Score: 15.89
Episode 1900	Average Score: 15.37
Episode 2000	Average Score: 15.27
Episode 2100	Average Score: 14.86
Episode 2200	Average Score: 16.39
Episode 2300	Average Score: 16.86
Episode 2400	Average Score: 16.32
Episode 2500	Average Score: 15.67
Episode 2600	Average Score: 16.30
Episode 2700	Average Score: 16.06
Episode 2800	Average Score: 17.45
Episode 2900	Average Score: 16.06
Episode 3000	Average Sc

test

In [17]:
import os
import torch
import torch.nn.functional as F
import random
import numpy as np
import gym
from gym.wrappers import RecordVideo
from collections import deque
from IPython.display import HTML, display
import glob
import base64
import io

# GPU 또는 CPU 설정
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 비디오 표시 함수
def show_video():
    """Display the video from the 'video/' folder."""
    mp4list = glob.glob('video/*.mp4')
    if len(mp4list) > 0:
        mp4 = mp4list[0]
        video = io.open(mp4, 'r+b').read()
        encoded = base64.b64encode(video)
        display(HTML(data='''<video alt="test" autoplay 
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))
    else:
        print("Could not find video. Make sure the 'video/' folder exists and contains recordings.")

# 비디오 녹화 및 실행
# 비디오 녹화 및 실행
def show_video_of_model(agent, env):
    """Run the trained agent in the environment and record a video."""
    if not os.path.exists("video/"):
        os.makedirs("video/")

    # RecordVideo 래퍼로 감싸기
    env = RecordVideo(env, video_folder="video/", episode_trigger=lambda x: True)

    # Actor 모델 로드
    agent.actor.load_state_dict(torch.load('actor_checkpoint.pth', map_location=device))
    
    state, _ = env.reset()
    done = False

    while not done:
        action = agent.act(state)  # Actor의 정책을 사용하여 행동 선택
        state, reward, done, truncated, _ = env.step(action)
        if done or truncated:
            break

    env.close()
    print("Video recording complete. Check the 'video/' folder.")

# 학습된 에이전트로 비디오 생성 및 표시
# 환경 생성
env = CartPoleEnv(render_mode="rgb_array", max_steps=200)

# 에이전트 생성 및 비디오 녹화 실행
agent = Agent(state_size=4, action_size=2, seed=0)
show_video_of_model(agent, env)
show_video()



  logger.warn(
  agent.actor.load_state_dict(torch.load('actor_checkpoint.pth', map_location=device))


MoviePy - Building video c:\Cart_Pole_dqn_ac\AC\video\rl-video-episode-0.mp4.
MoviePy - Writing video c:\Cart_Pole_dqn_ac\AC\video\rl-video-episode-0.mp4



                                                                       

MoviePy - Done !
MoviePy - video ready c:\Cart_Pole_dqn_ac\AC\video\rl-video-episode-0.mp4
Video recording complete. Check the 'video/' folder.


