In [2]:
!pip install gym

Collecting gym
  Downloading gym-0.24.1.tar.gz (696 kB)
[K     |████████████████████████████████| 696 kB 12.9 MB/s eta 0:00:01
[?25h  Installing build dependencies ... [?25ldone
[?25h  Getting requirements to build wheel ... [?25ldone
[?25h    Preparing wheel metadata ... [?25ldone
Collecting cloudpickle>=1.2.0
  Downloading cloudpickle-2.1.0-py3-none-any.whl (25 kB)
Collecting gym-notices>=0.0.4
  Downloading gym_notices-0.0.7-py3-none-any.whl (2.7 kB)
Building wheels for collected packages: gym
  Building wheel for gym (PEP 517) ... [?25ldone
[?25h  Created wheel for gym: filename=gym-0.24.1-py3-none-any.whl size=793137 sha256=1a6e0d518bd75f98673b92275f2f3f9ba94cfa1b1807d6b6a6a4aa0b5b439e2c
  Stored in directory: /home/hshwang/.cache/pip/wheels/5a/e9/0b/5536e77ed2edbbf067ecff287ec039633d40daee4d8dac7716
Successfully built gym
Installing collected packages: gym-notices, cloudpickle, gym
Successfully installed cloudpickle-2.1.0 gym-0.24.1 gym-notices-0.0.7


In [3]:
import gym
import numpy as np
import torch
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt

In [102]:
# Use the GPU only when CUDA is actually usable. is_available must be *called*:
# the bare function object is always truthy, which selected 'cuda' even on
# CPU-only machines.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [103]:
# Create the CartPole-v1 environment.
env = gym.make('CartPole-v1')
# NOTE(review): this overrides a private attribute; presumably it is the episode
# step cap read by gym's time-limit handling - confirm for the installed gym version.
env._max_episode_steps=1000
# Echo the attribute back to confirm the override.
print(f'에피소드 종료까지 최대 상한 스텝: {env._max_episode_steps}')

에피소드 종료까지 최대 상한 스텝: 1000


In [104]:
# Print the built-in help text of the underlying (unwrapped) environment object.
help(env.unwrapped)

Help on CartPoleEnv in module gym.envs.classic_control.cartpole object:

class CartPoleEnv(gym.core.Env)
 |  CartPoleEnv(*args, **kwds)
 |  
 |  ### Description
 |  
 |  This environment corresponds to the version of the cart-pole problem described by Barto, Sutton, and Anderson in
 |  ["Neuronlike Adaptive Elements That Can Solve Difficult Learning Control Problem"](https://ieeexplore.ieee.org/document/6313077).
 |  A pole is attached by an un-actuated joint to a cart, which moves along a frictionless track.
 |  The pendulum is placed upright on the cart and the goal is to balance the pole by applying forces
 |   in the left and right direction on the cart.
 |  
 |  ### Action Space
 |  
 |  The action is a `ndarray` with shape `(1,)` which can take values `{0, 1}` indicating the direction
 |   of the fixed force the cart is pushed with.
 |  
 |  | Num | Action                 |
 |  |-----|------------------------|
 |  | 0   | Push cart to the left  |
 |  | 1   | Push cart to the righ

In [105]:
# Reset the environment once and keep the value it returns as the initial state.
state = env.reset()
# Observation size: length of the first axis of that state.
n_state = state.shape[0]
# Number of available actions, as reported by env.action_space.n.
n_action = env.action_space.n
print(f'관찰의 차원: {n_state}')
print(f'취할 수 있는 행동: {n_action}')

관찰의 차원: 4
취할 수 있는 행동: 2


In [120]:
class NN_model(nn.Module):
    '''
    Actor-critic network with a shared trunk and two output heads.

    Architecture:
        input -> Linear(input_dim, 64) -> ReLU -> Linear(64, 64) -> ReLU
              -> policy head: Linear(64, output_dim)  (unnormalized action scores)
              -> value head:  Linear(64, 1)           (state value, last axis squeezed)

    Args:
        input_dim: size of a state vector (defaults to the notebook's n_state).
        output_dim: number of actions (defaults to the notebook's n_action).
    '''
    def __init__(self,input_dim=n_state,output_dim=n_action):
        super(NN_model,self).__init__()
        self.lin1 = nn.Linear(input_dim,64)
        self.lin2 = nn.Linear(64,64)
        self.policy_out = nn.Linear(64,output_dim)
        self.value_out = nn.Linear(64,1)

    def forward(self,x):
        '''
        Run the network.

        Args:
            x: float tensor whose last axis has size input_dim.
        Returns:
            (policy, value): policy holds one score per action on the last axis;
            value is the value head's output with its trailing size-1 axis removed.
        '''
        x = F.relu(self.lin1(x))
        x = F.relu(self.lin2(x))
        policy = self.policy_out(x)
        value = self.value_out(x).squeeze(dim=-1)
        return policy, value

    def get_action(self,x):
        '''
        Sample an action from the current policy.

        Args:
            x: a single state (1-D) or a batch of states (2-D), array-like.
        Returns:
            A single sampled action index when x holds one state, otherwise a
            numpy array with one sampled action per row of x.
        '''
        # Put the input where the weights live instead of relying on a
        # notebook-level `device` variable.
        x = torch.as_tensor(np.asarray(x), dtype=torch.float32,
                            device=self.lin1.weight.device)
        # A lone state becomes a batch of one. (The previous `x.shape == 1` test
        # compared a tuple with an int and therefore never fired.)
        if x.dim() == 1:
            x = x.unsqueeze(0)
        # Sampling needs no gradients.
        with torch.no_grad():
            policy,_ = self.forward(x)
        # Softmax in float64 so each row sums to 1 within np.random.choice's tolerance.
        probs = F.softmax(policy.double(),dim=-1).cpu().numpy()
        n_actions = probs.shape[-1]
        if probs.shape[0] == 1:
            return np.random.choice(n_actions, p=probs[0])
        return np.array([np.random.choice(n_actions, p=row) for row in probs])

In [130]:
def A2C_loss(state,action,rewards,next_state,done,gamma=0.99,agent=None):
    '''
    Compute the advantage actor-critic (A2C) loss for one n-step rollout.

    Loss:
        total = actor_loss + critic_loss
        actor_loss  = -mean(log pi(a|s) * advantage) - 0.001 * mean(entropy)
            "-log pi * advantage": actor term (exploitation)
            "entropy":             -sum_a pi(a|s) log pi(a|s) (exploration bonus)
        critic_loss = mean((V(s) - target)^2)
        target_i    = r_i + gamma*r_{i+1} + ... + gamma^(n-i) * V(s_n) * (1 - done_n)
        advantage   = target - V(s), treated as a constant for the actor term.

    Args:
        state: state(s) the rollout started from, shape [B, state_dim] or
            [state_dim]. Row i is taken to be the state that preceded rewards[i];
            B == 1 gives a single n-step target.
        action: action indices; action[i] is the action taken in state[i]. Only
            the first B entries are used.
        rewards: the n rewards collected along the rollout, in time order.
        next_state: states reached after each step; only the last one is used,
            to bootstrap the return.
        done: termination flag(s); the last one decides whether to bootstrap.
        gamma: discount factor.
        agent: module returning (policy_scores, value) for a batch of states.
            When omitted, the notebook-level variable `agent` is looked up at
            call time.
    Returns:
        Scalar tensor with the total loss (differentiable w.r.t. the agent).
    Raises:
        ValueError: no agent is available, the rollout is empty, or the inputs
            have inconsistent lengths.
    '''
    if agent is None:
        # Resolve the global lazily; binding it as a default value forced the
        # network to exist before this function could even be defined.
        agent = globals().get('agent')
        if agent is None:
            raise ValueError('A2C_loss needs an agent: pass agent=... or define a global `agent`')

    # Build every tensor on the device that holds the agent's weights.
    first_param = next(agent.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device('cpu')

    def to_tensor(data,dtype):
        if torch.is_tensor(data):
            return data.to(device=run_device,dtype=dtype)
        return torch.as_tensor(np.asarray(data),dtype=dtype,device=run_device)

    states = to_tensor(state,torch.float32)
    if states.dim() == 1:
        states = states.unsqueeze(0)
    next_states = to_tensor(next_state,torch.float32)
    if next_states.dim() == 1:
        next_states = next_states.unsqueeze(0)
    actions = to_tensor(action,torch.long).reshape(-1)
    rewards = to_tensor(rewards,torch.float32).reshape(-1)
    dones = to_tensor(done,torch.bool).reshape(-1)

    batch = states.shape[0]
    n_steps = rewards.shape[0]
    if n_steps == 0 or next_states.shape[0] == 0 or dones.shape[0] == 0:
        raise ValueError('A2C_loss needs at least one transition')
    if batch > n_steps or actions.shape[0] < batch:
        raise ValueError('each state needs its own action and reward')

    policy,value = agent(states)
    value = value.reshape(-1)

    # Bootstrap from the critic's estimate of the last reached state, unless the
    # episode ended there. No gradient flows through the target.
    with torch.no_grad():
        _,last_value = agent(next_states[-1:])
    running = last_value.reshape(-1)[0]*(~dones[-1]).to(last_value.dtype)

    # Walk the rewards backwards to accumulate the discounted return from every step.
    returns_to_go = []
    for reward in rewards.flip(0):
        running = reward + gamma*running
        returns_to_go.append(running)
    target_value = torch.stack(returns_to_go[::-1])[:batch].detach()

    probs = F.softmax(policy,dim=-1)
    logprobs = F.log_softmax(policy,dim=-1)
    logp_actions = logprobs.gather(1,actions[:batch].unsqueeze(1)).squeeze(1)

    advantage = (target_value-value).detach()
    # Entropy is a sum over actions, averaged over the batch afterwards.
    entropy = -(probs*logprobs).sum(dim=-1)

    actor_loss = -(logp_actions*advantage).mean()-0.001*entropy.mean()
    critic_loss = F.mse_loss(value,target_value)
    total_loss = actor_loss + critic_loss

    return total_loss

In [None]:
def play_or_train_agent(env,agent,optimizer,train=True,TD_step=0):
    '''
    Run one episode in `env`, optionally training `agent` with A2C along the way.

    The episode is split into rollouts. With TD_step >= 0 a rollout lasts at most
    TD_step+1 environment steps; with TD_step == -1 (Monte-Carlo) one rollout runs
    until the episode ends. A new action is sampled from the agent at every step.
    When `train` is true, one optimizer step is taken after each rollout, using
    A2C_loss on the rollout's first state and first action together with all of
    its rewards, next states and done flags.

    Args:
        env: environment with reset() -> state and
            step(action) -> (state, reward, done, info).
        agent: object with get_action(state); when training it is also passed
            to A2C_loss.
        optimizer: optimizer used for the updates; only touched when train is true.
        train: True to learn while playing, False to only play.
        TD_step: extra look-ahead steps per update (0 = one-step TD), or -1 for
            Monte-Carlo.
    Returns:
        (total_reward, loss) when train is true, where total_reward sums every
        reward of the episode and loss is the value of the last update's loss;
        None when train is false.
    Raises:
        ValueError: TD_step is smaller than -1.
    '''
    if TD_step < -1:
        raise ValueError(f'TD_step must be -1 (Monte-Carlo) or >= 0, got {TD_step}')

    total_reward = 0
    loss = None
    done = False
    state = env.reset()

    # One pass of this loop = one rollout (+ one update); stop when the episode ends.
    while not done:
        # Remember where the rollout started: that is the state being evaluated.
        rollout_state = state
        actions, rewards, new_states, dones = [],[],[],[]

        while not done and (TD_step == -1 or len(rewards) <= TD_step):
            # Re-sample the action for the state the agent is actually in.
            action = agent.get_action(state)
            new_state,reward,done,_ = env.step(action)
            actions.append(action)
            rewards.append(reward)
            new_states.append(new_state)
            dones.append(done)

            # Count every reward, including the one from the terminal step.
            total_reward += reward
            state = new_state.copy()

        if train:
            optimizer.zero_grad()
            # agent goes in by keyword: the sixth positional slot of A2C_loss is gamma.
            loss = A2C_loss([rollout_state],actions[:1],rewards,new_states,dones,agent=agent)
            loss.backward()
            optimizer.step()

    if train:
        return total_reward, loss.item()
    else:
        return None

In [None]:
from tqdm import trange
from IPython.display import clear_output

# Fresh network and optimizer for this training run.
agent = NN_model().to(device)
optimizer = optim.Adam(agent.parameters(),lr=1e-04)

# Per-epoch history: mean episode reward and mean reported loss.
episode_rewards = []
A2C_losses = []
max_episodes = 100
mini_sessions = 50

for episode in trange(max_episodes):
    # Each outer iteration averages over `mini_sessions` training episodes.
    mini_reward = []
    mini_loss = []
    for mini_session in range(mini_sessions):
        episode_reward, total_loss = play_or_train_agent(env,agent=agent,optimizer=optimizer,train=True)
        mini_reward.append(episode_reward)
        mini_loss.append(total_loss)

    mean_reward = np.mean(mini_reward)
    episode_rewards.append(mean_reward)
    A2C_losses.append(np.mean(mini_loss))

    # Redraw the learning curve in place instead of stacking outputs.
    clear_output(True)
    print("Episode",episode)

    plt.figure(figsize=[12, 10])
    plt.title("Total reward per each episode")
    plt.plot(episode_rewards)
    plt.grid()
    plt.show()

    # Stop early once the averaged reward reaches the target score.
    if mean_reward >= 500:
        print(f"Agent finds solution! Final score : {mean_reward}")
        break