## CartPole Example

### 1. Random Walk

In [1]:
import gym
from PIL import Image
# Roll out 200 uniformly random actions on CartPole-v0 and save every rendered
# frame as an animated GIF.
env = gym.make('CartPole-v0')
frames_random = []
try:
    state = env.reset()
    done = False
    for _ in range(200):
        if done:
            # gym treats step() after `done` as undefined behaviour, so start
            # a fresh episode instead of stepping the finished one.
            state = env.reset()
            done = False
        frames_random.append(Image.fromarray(env.render(mode='rgb_array')))
        action = env.action_space.sample()
        state, _reward, done, _info = env.step(action)
finally:
    # Release the render window even if rendering or stepping raised.
    env.close()
print(len(frames_random))
frames_random[0].save('CartPole_random.gif', format='GIF', append_images=frames_random[1:], save_all=True, duration=0.0001)
print("save picture -- CartPole_random.gif")



200


### 2. DQN reinforcement learning

References:
1. https://gym.openai.com/evaluations/eval_EIcM1ZBnQW2LBaFN6FY65g/
2. https://pytorch.org/tutorials/intermediate/reinforcement_q_learning.html

In [1]:
import gym
from torch import nn, optim
import torch.nn.functional as F
import torch
from collections import deque
import numpy as np
import math
import random
from collections import namedtuple
from PIL import Image, ImageDraw
import os

In [2]:
class myDQN(nn.Module):
    """Fully connected network mapping a 4-value input to ``class_num`` outputs.

    Every hidden ``nn.Linear`` is followed by ``nn.LeakyReLU``; the final
    ``nn.Linear`` has no activation.
    """

    def __init__(self, layers, class_num, device):
        """Build the layer stack.

        Args:
            layers: Iterable of hidden-layer widths, in order.
            class_num: Width of the output layer.
            device: Device each ``nn.Linear`` is moved to.
        """
        super().__init__()
        widths = [4, *layers]  # the input width is fixed at 4
        stack = []
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            stack.append(nn.Linear(fan_in, fan_out).to(device))
            stack.append(nn.LeakyReLU())
        stack.append(nn.Linear(widths[-1], class_num).to(device))
        # Registered under the name `linear_layers` so state_dict keys read
        # `linear_layers.<index>.<param>`.
        self.linear_layers = nn.ModuleList(stack)

    def forward(self, x):
        """Apply every layer in order and return the last layer's output."""
        out = x
        for module in self.linear_layers:
            out = module(out)
        return out


class memoryDataset(object):
    """Bounded replay buffer of ``Transition`` namedtuples holding tensors."""

    def __init__(self, maxlen, device):
        """Create an empty buffer.

        Args:
            maxlen: Capacity of the underlying deque; older entries are
                dropped once it is full.
            device: Device every stored tensor is moved to.
        """
        self.device = device
        self.subset = namedtuple('Transition', ('state', 'action', 'next_state', 'reward', 'done'))
        self.memory = deque(maxlen=maxlen)

    def push(self, state, action, next_state, reward, done):
        """Convert one transition to tensors and append it to the buffer.

        NOTE: the value passed as ``reward`` is stored in the ``next_state``
        field and the value passed as ``next_state`` is stored in the
        ``reward`` field. ``DQNCartPoleSolver.run`` in this file calls
        ``push(state, action, reward, next_state, done)``, which cancels the
        swap, so change both sides together or neither.
        """
        def to_device(value, dtype):
            return torch.tensor(value, dtype=dtype).to(self.device)

        record = self.subset(
            state=to_device(state, torch.float),
            action=to_device([action], torch.long),
            next_state=to_device(reward, torch.float),
            reward=to_device(next_state, torch.float),
            # Stored as an integer flag: False -> 0, True -> 1.
            done=to_device([done], torch.long),
        )
        self.memory.append(record)

    def __len__(self):
        """Return the number of stored transitions."""
        return len(self.memory)

    def sample(self, batch_size):
        """Return a ``Transition`` whose fields are tuples over a random sample.

        At most ``batch_size`` transitions are drawn without replacement;
        fewer are drawn when the buffer holds fewer.
        """
        count = min(len(self.memory), batch_size)
        picked = random.sample(self.memory, count)
        return self.subset(*zip(*picked))

In [3]:
class DQNCartPoleSolver():
    """Train, checkpoint and replay a small DQN agent on gym's ``CartPole-v0``.

    The agent gathers transitions with an epsilon-greedy policy, fits the
    network on one sampled mini-batch after every episode, and stops once the
    mean episode length over the last 100 episodes reaches ``n_win_ticks``.
    """

    def __init__(self, n_episodes=10000, n_win_ticks=195, max_env_steps=None, gamma=1.0, epsilon=1.0, epsilon_min=0.01,
                 epsilon_log_decay=0.995, alpha=0.01, alpha_decay=0.01, batch_size=64, monitor=True, quiet=False, device='cpu', pretrained=None, rendering=False):
        """Create the environment, replay memory, network and optimizer.

        Args:
            n_episodes: Maximum number of training episodes.
            n_win_ticks: Mean episode length (last 100 episodes) that counts
                as solved.
            max_env_steps: If given, written to the environment's
                ``_max_episode_steps``.
            gamma: Discount applied to the bootstrapped next-state value.
            epsilon: Initial exploration rate.
            epsilon_min: Lower bound for the exploration rate.
            epsilon_log_decay: Factor used both inside ``get_epsilon`` and to
                shrink ``self.epsilon`` after every ``replay`` call.
            alpha: Learning rate handed to Adam.
            alpha_decay: Handed to Adam as ``weight_decay``.
            batch_size: Number of transitions sampled per ``replay`` call.
            monitor: Accepted for compatibility; not used by this class.
            quiet: Suppress progress output when true.
            device: Device used for the network and the stored tensors.
            pretrained: Accepted for compatibility; not used by this class.
            rendering: Accepted for compatibility; not used by this class.
        """
        self.device = device
        self.memory = memoryDataset(maxlen=100000, device=device)
        self.env = gym.make('CartPole-v0')
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_log_decay
        self.alpha = alpha
        self.alpha_decay = alpha_decay
        self.n_episodes = n_episodes
        self.n_win_ticks = n_win_ticks
        self.batch_size = batch_size
        self.quiet = quiet
        self.class_num = 2
        self.layers = [4, 4, 4]
        self.model = myDQN(self.layers, self.class_num, self.device)
        self.optimizer = optim.Adam(params=self.model.parameters(), lr=alpha, weight_decay=alpha_decay)
        if max_env_steps is not None:
            self.env._max_episode_steps = max_env_steps
        save_folder = os.path.join(os.getcwd(), 'model')
        # exist_ok avoids the check-then-create race of isdir() + mkdir().
        os.makedirs(save_folder, exist_ok=True)
        self.save_path = os.path.join(save_folder, 'model.pkl')

    def choose_action(self, state, epsilon=None):
        """Return an action index for ``state``.

        With ``epsilon`` given, a random action is sampled from the
        environment with probability ``epsilon``; otherwise (and always when
        ``epsilon`` is ``None``) the action with the largest network output
        is returned.
        """
        if epsilon is not None and np.random.random() <= epsilon:
            return self.env.action_space.sample()
        with torch.no_grad():
            # Add a leading batch dimension of one for the network.
            batch = torch.tensor(state, dtype=torch.float).to(self.device).unsqueeze(0)
            q_values = self.model(batch)
        # .item() works on any device, so no explicit CPU/numpy hop is needed.
        return int(q_values.max(dim=1).indices.item())

    def get_epsilon(self, t):
        """Return the exploration rate for episode index ``t``.

        The value is ``1 - log10((t + 1) * epsilon_decay)`` clamped to the
        range ``[epsilon_min, self.epsilon]``.
        """
        scheduled = 1.0 - math.log10((t + 1) * self.epsilon_decay)
        return max(self.epsilon_min, min(self.epsilon, scheduled))

    def replay(self, batch_size):
        """Fit the network on one sampled mini-batch and decay ``self.epsilon``.

        The regression target is ``reward`` for transitions flagged done and
        ``reward + gamma * max_a Q(next_state, a)`` otherwise.
        """
        batch = self.memory.sample(batch_size)
        state = torch.stack(batch.state)
        action = torch.stack(batch.action)
        next_state = torch.stack(batch.next_state)
        reward = torch.stack(batch.reward).view(-1, 1)
        done = torch.stack(batch.done).view(-1, 1)
        with torch.no_grad():
            next_state_value = self.model(next_state).max(dim=1).values.view(-1, 1)
        # Select per row instead of stack/squeeze/gather: a bare squeeze()
        # also drops the batch dimension when the batch holds one transition.
        target_state_value = torch.where(done.bool(), reward, reward + self.gamma * next_state_value)

        self.optimizer.zero_grad()
        state_action_values = self.model(state).gather(1, action)
        loss = F.mse_loss(state_action_values, target_state_value)
        loss.backward()
        self.optimizer.step()
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

    def run(self):
        """Train until solved or until ``n_episodes`` episodes have run.

        Returns:
            ``episode - 100`` for the episode at which the 100-episode mean
            length reached ``n_win_ticks`` (the model is saved first), or the
            index of the last episode when training did not reach it.
        """
        scores = deque(maxlen=100)
        for episode in range(self.n_episodes):
            state = self.env.reset()
            done = False
            ticks = 0
            # self.epsilon only changes in replay(), so the rate is constant
            # for the whole episode.
            explore_rate = self.get_epsilon(episode)
            while not done:
                action = self.choose_action(state, explore_rate)
                next_state, reward, done, _ = self.env.step(action)
                # NOTE: reward/next_state are passed in the opposite order to
                # memoryDataset.push's parameter names; push stores them
                # swapped again, so the Transition fields come out right.
                # Change both places together or neither.
                self.memory.push(state, action, reward, next_state, done)
                state = next_state
                ticks += 1

            scores.append(ticks)
            # Stop once the mean survival time over the most recent 100
            # episodes meets the win condition.
            mean_score = np.mean(scores)
            if mean_score >= self.n_win_ticks and episode >= 100:
                if not self.quiet:
                    print('{} episodes. Solved after {} trials '.format(episode, episode - 100))
                self.save_model()
                return episode - 100
            if episode % 100 == 0 and not self.quiet:
                print('[Episode {}] - Mean survival time over last 100 episodes was {} ticks. epsilon : {}'.format(episode, mean_score, self.epsilon))

            self.replay(self.batch_size)

        if not self.quiet:
            print('Did not solve after {} episodes'.format(episode))
        return episode

    def save_model(self):
        """Write the model, optimizer state and hyper-parameters to ``self.save_path``."""
        param_groups = {
            'model_state_dict': self.model.state_dict(),
            'optimizer_state_dict': self.optimizer.state_dict(),
            'layers': self.layers,
            'class_num': self.class_num,
            'gamma': self.gamma,
            'epsilon': self.epsilon,
            'alpha': self.alpha,
            'alpha_decay': self.alpha_decay,
            'batch_size': self.batch_size,
        }
        torch.save(param_groups, self.save_path)

    def load_model(self):
        """Restore the model and optimizer state from ``self.save_path``."""
        # map_location lets a checkpoint written on a GPU load on a CPU-only
        # host (and the other way round).
        checkpoint = torch.load(self.save_path, map_location=self.device)
        self.model.load_state_dict(checkpoint['model_state_dict'])
        self.optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

    def render_policy_net(self):
        """Play one greedy episode with the saved model and store it as a GIF.

        Returns:
            ``(frames, raw_frames)``: the rendered frames as PIL images and
            as the raw arrays returned by ``env.render``.
        """
        self.load_model()
        #self.env = gym.wrappers.Monitor(self.env, os.path.join(os.getcwd(), 'model','cartpole-1'), force=True)
        frames = []
        raw_frames = []
        try:
            state = self.env.reset()
            done = False
            while not done:
                img = self.env.render(mode='rgb_array')
                raw_frames.append(img)
                frames.append(Image.fromarray(img))
                action = self.choose_action(state)
                state, _reward, done, _info = self.env.step(action)
        finally:
            # Close the render window even if rendering or stepping raised.
            self.env.close()
        print(len(frames))
        frames[0].save('CartPole_result.gif', format='GIF', append_images=frames[1:], save_all=True, duration=0.0001)
        print("save picture -- CartPole_result.gif")
        return frames, raw_frames

In [4]:
# Prefer the GPU when CUDA is available; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
agent = DQNCartPoleSolver(device=device)
# Training is left disabled in this cell; render_policy_net() loads the
# checkpoint from disk before playing an episode.
#agent.run()
frames, raw_frames = agent.render_policy_net()

200
