In [1]:

import numpy as np
import matplotlib.pyplot as plt
import math
import torch
import matplotlib.pyplot as plt
from collections import namedtuple, deque
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import random
from itertools import count
from PIL import Image
import matplotlib

# Every tensor and network in this notebook is placed on this device.
device = "cpu"

# IPython's display helpers are only needed (and only imported) when
# matplotlib renders inline, i.e. inside a notebook.
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:
    from IPython import display

# Interactive mode lets the training curve refresh while the loop is running.
plt.ion()

# Seed every RNG this notebook draws from. NumPy alone is not enough:
# replay sampling uses the stdlib `random` module, and network initialisation
# and dropout use torch's generator.
np.random.seed(0)
random.seed(0)
torch.manual_seed(0)

In [2]:
# One environment step as stored in the replay buffer.
Transition = namedtuple('Transition', ['state', 'action', 'next_state', 'reward'])


class ReplayMemory(object):
  """Bounded FIFO buffer of `Transition` records with uniform random sampling.

  Once `capacity` records are stored, each new record evicts the oldest one
  (behaviour of `deque(maxlen=...)`).
  """

  def __init__(self, capacity):
    """Create an empty buffer holding at most `capacity` transitions."""
    self.memory = deque(maxlen=capacity)

  def push(self, *args):
    """Build a `Transition` from the positional arguments and store it."""
    record = Transition(*args)
    self.memory.append(record)

  def sample(self, batch_size):
    """Return `batch_size` distinct stored transitions chosen at random.

    Raises ValueError (from `random.sample`) when fewer than `batch_size`
    transitions are stored.
    """
    stored = self.memory
    return random.sample(stored, batch_size)

  def __len__(self):
    """Number of transitions currently stored."""
    return len(self.memory)

In [3]:
class DQN2(nn.Module):
  """Fully connected Q-network: one scalar state feature in, ten action values out.

  The hidden stack widens 1 -> 16 -> 32 -> 64 -> 128 and narrows back to 16,
  with ReLU and Dropout(0.4) after every hidden layer.

  The output head is a plain linear layer. The previous Sigmoid head confined
  every Q-value to (0, 1), which cannot represent the returns produced by the
  -100 / -1 / 100 rewards used in this notebook. Sigmoid has no parameters and
  was the last module, so existing state_dicts still load unchanged.

  Dropout is active whenever the module is in training mode; call `.eval()`
  for deterministic outputs.
  """

  def __init__(self):
    super(DQN2, self).__init__()
    self.net = nn.Sequential(
        nn.Linear(1, 16),
        nn.ReLU(),
        nn.Dropout(0.4),

        nn.Linear(16, 32),
        nn.ReLU(),
        nn.Dropout(0.4),

        nn.Linear(32, 64),
        nn.ReLU(),
        nn.Dropout(0.4),

        nn.Linear(64, 128),
        nn.ReLU(),
        nn.Dropout(0.4),

        nn.Linear(128, 64),
        nn.ReLU(),
        nn.Dropout(0.4),

        nn.Linear(64, 32),
        nn.ReLU(),
        nn.Dropout(0.4),

        nn.Linear(32, 16),
        nn.ReLU(),
        nn.Dropout(0.4),

        # Unbounded linear head: Q-values may be negative or larger than 1.
        nn.Linear(16, 10),
    )

  def forward(self, x):
    """Map a (..., 1) float tensor of states to (..., 10) action values.

    The result lives on the same device as the module and its input, so no
    extra device transfer is needed here.
    """
    return self.net(x)

In [4]:
# Fuel and number of states (the goal position).
# The stray leading `_` that preceded this comment was evaluated as an
# expression (NameError outside IPython), so it has been removed.
fuel = 299
goal = 63
# Gravity
g = 9.81

In [5]:
# Distance travelled when launching at angle theta with speed v
def distance(v, theta, wind_v):
  """Return the horizontal distance of one launch.

  The horizontal speed is `v*cos(theta)` reduced by `wind_v`; it is multiplied
  by `2*v*sin(theta)/g`, using the module-level `g`.
  """
  horizontal_speed = v * np.cos(theta) - wind_v
  return horizontal_speed * 2 * v * np.sin(theta) / g

# Time taken
def time_cost(v, theta):
  """Return `v*sin(theta)/g`, using the module-level `g`.

  NOTE(review): `distance` multiplies by `2*v*sin(theta)/g`, i.e. twice this
  value - confirm whether this is meant to be the full flight time.
  """
  vertical_speed = v * np.sin(theta)
  return vertical_speed / g

In [6]:
# Wind resistance per action (looked up from the launch speed)
def state_wind(v):
  """Return the wind speed associated with launch speed `v`.

  Speeds of magnitude 5-6 map to 0, 7-8 to 1 and 9-10 to 2; the result carries
  the sign of `v`, so negative speeds give 0, -1 and -2.

  Raises:
    ValueError: if `abs(v)` is not one of 5..10. The previous if/elif chain
      fell through and failed with an UnboundLocalError instead.
  """
  wind_by_speed = {5: 0, 6: 0, 7: 1, 8: 1, 9: 2, 10: 2}
  magnitude = wind_by_speed.get(abs(v))
  if magnitude is None:
    raise ValueError(
        'unsupported launch speed %r: magnitude must be one of 5..10' % (v,))
  return magnitude if v > 0 else -magnitude

In [7]:
# Termination check
def is_terminate(fuel, next_state):
  """Report whether the episode ends, printing 'lose' or 'win' when it does.

  The episode is lost when `fuel` is negative or `next_state` is beyond the
  module-level `goal`; it is won when `next_state` lies in
  (goal - 4, goal]. Anything else keeps the episode running.

  Returns:
    True when the episode is over (lost or won), False otherwise.
  """
  # Guard clauses: both losing conditions share one outcome.
  if fuel < 0 or next_state > goal:
    print('lose')
    return True

  if goal - 4 < next_state <= goal:
    print('win')
    return True

  return False


In [8]:
def reward(fuel, next_state):
  """Return the reward for arriving at `next_state` with `fuel` left.

  * -100 when `fuel` is negative or `next_state` is beyond the module-level
    `goal` (the 'lose' outcomes of `is_terminate`).
  * +100 when `next_state` lies in (goal - 4, goal]. This is the same band
    `is_terminate` reports as 'win'. The band used to be (goal - 1, goal], so
    an episode could end as a win yet be scored -1.
  * -1 for every other step.
  """
  if fuel < 0 or next_state > goal:
    return -100

  # Same success band as is_terminate's 'win' branch.
  if goal - 4 < next_state <= goal:
    return 100

  return -1

In [9]:
# Launch table: each speed 5..9 is paired with the angles pi/6 and pi/3, and
# the (speed, angle) pairs are keyed 'action1' .. 'action10' in speed-major order.
action = {
    'action%d' % (2 * (speed - 5) + slot + 1): (speed, angle)
    for speed in range(5, 10)
    for slot, angle in enumerate((np.pi/6, np.pi/3))
}

In [13]:
# Keys of `action` in order; the training loop indexes this list with the
# action index chosen by the network.
action_selection = ['action%d' % k for k in range(1, 11)]
num_action = 10  # not referenced elsewhere in the visible cells

# Replay minibatch size and discount factor used by optimizer_model().
batch_size = 256
gamma = 0.99

# The target network is refreshed every `target_update` episodes.
target_update = 3

n_actions = 10  # not referenced elsewhere in the visible cells

current_s = torch.Tensor([0])  # not referenced elsewhere in the visible cells

# Policy network (trained) and target network (frozen copy used for TD targets).
policy_net = DQN2().to(device)
target_net = DQN2().to(device)
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

optimizer = optim.SGD(policy_net.parameters(), lr=0.001)
memory = ReplayMemory(capacity=10000)

steps_done = 0  # not referenced elsewhere in the visible cells

def select_action(state):
  """Return the index of the highest-valued action for `state`, shaped (1, 1).

  `state` is passed straight to the module-level `policy_net`; the maximum is
  taken over dimension 1 of its output. No gradients are recorded.

  NOTE(review): this is purely greedy - there is no epsilon-greedy branch, and
  `policy_net` is not switched to eval mode here, so dropout stays active
  unless the caller changes the mode.
  """
  with torch.no_grad():
    q_values = policy_net(state)
    best = q_values.max(1).indices
  return best.view(1, 1)



# Step count of each finished episode, appended by the training loop.
episode_durations = []

def plot_durations():
  """Redraw figure 2 with the episode durations recorded so far.

  Once at least 100 episodes exist, a 100-episode moving average is drawn on
  top, left-padded with 99 zeros so it lines up with the raw curve. When
  running inline in IPython the previous cell output is replaced by the
  refreshed figure.
  """
  window = 100
  history = torch.tensor(episode_durations, dtype=torch.float)

  plt.figure(2)
  plt.clf()
  plt.title('Training...')
  plt.xlabel('Episode')
  plt.ylabel('Duration')
  plt.plot(history.numpy())

  if len(history) >= window:
    rolling = history.unfold(0, window, 1).mean(1).view(-1)
    padded = torch.cat((torch.zeros(window - 1), rolling))
    plt.plot(padded.numpy())

  # Short pause so the figure has a chance to update.
  plt.pause(0.001)
  if not is_ipython:
    return
  display.clear_output(wait=True)
  display.display(plt.gcf())
        
# Environment step: returns (next_state, reward, done, {})
def env(current_location, current_action, fuel):
  """Advance the environment by one launch.

  Args:
    current_location: position before the launch.
    current_action: key into the module-level `action` table.
    fuel: passed through to `reward` and `is_terminate`; not modified here.

  Returns:
    (next_state, reward, done, {}) where next_state is `current_location`
    plus the launch distance.
  """
  speed, angle = action[current_action]
  travelled = distance(speed, angle, state_wind(speed))
  landing = current_location + travelled
  # reward() is evaluated before is_terminate(), which prints the outcome.
  step_reward = reward(fuel, landing)
  finished = is_terminate(fuel, landing)
  return (landing, step_reward, finished, {})

In [11]:
def optimizer_model():
  """Run one DQN optimisation step on a minibatch from the replay memory.

  Does nothing until `memory` holds at least `batch_size` transitions.
  Otherwise it samples a batch, computes Q(s, a) with `policy_net`, builds the
  TD target `reward + gamma * max_a' Q_target(s', a')` (with 0 for terminal
  next states), minimises the MSE between the two, and appends the scalar
  loss to the module-level `total_loss` list.

  Reads module-level state: memory, batch_size, gamma, device, policy_net,
  target_net, optimizer, total_loss.
  """
  if len(memory) < batch_size:
    return
  transitions = memory.sample(batch_size)
  # Turn a list of Transitions into one Transition of tuples (batch per field).
  batch = Transition(*zip(*transitions))

  # True where the transition did not end the episode (next_state is a tensor).
  non_final_mask = torch.tensor([s is not None for s in batch.next_state],
                                device=device, dtype=torch.bool)

  # States may have been stored with mixed dtypes; the network needs float.
  state_batch = torch.cat(batch.state).float().to(device)
  # Convert an existing tensor with .to(); torch.tensor(tensor) copy-constructs
  # and emits a warning.
  action_batch = torch.cat(batch.action).to(device=device, dtype=torch.int64)
  # Flatten rewards to shape (B,). Left as (B, 1), adding them to the (B,)
  # next-state values broadcasts to (B, B) and the loss is computed against
  # the wrong targets.
  reward_batch = torch.cat(batch.reward).reshape(-1).float().to(device)

  state_action_values = policy_net(state_batch).gather(1, action_batch)

  next_state_values = torch.zeros(batch_size, device=device)
  # Skip the target network when every sampled transition is terminal;
  # torch.cat on an empty list would raise.
  if non_final_mask.any():
    non_final_next_states = torch.cat(
        [s for s in batch.next_state if s is not None]).float().to(device)
    with torch.no_grad():
      next_state_values[non_final_mask] = target_net(non_final_next_states).max(1)[0]
  expected_state_action_values = (next_state_values * gamma) + reward_batch

  criterion = nn.MSELoss()
  loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))
  # Store a plain float; storing the tensor would keep every step's autograd
  # graph alive for the whole run.
  total_loss.append(loss.item())

  optimizer.zero_grad()
  loss.backward()
  # Clamp each gradient element to [-1, 1]; parameters without a gradient
  # are skipped by the utility.
  nn.utils.clip_grad_value_(policy_net.parameters(), 1.0)
  optimizer.step()

In [14]:
num_episodes = 100
total_loss = []  # filled by optimizer_model(), one entry per optimisation step

for i in range(num_episodes):
  # Every episode restarts at location 0 with the full fuel amount.
  # NOTE(review): fuel is never decreased inside the loop, so the `fuel < 0`
  # branches of reward()/is_terminate() cannot trigger - confirm the intended
  # fuel cost per action.
  location = 0.0
  fuel = 299
  # Float (1, 1) state tensor so every stored state has the same dtype/shape.
  old_state = torch.tensor([[location]], dtype=torch.float, device=device)

  for t in count():
    # Always feed the network a fresh (1, 1) tensor; re-wrapping an existing
    # tensor in torch.Tensor([[...]]) changes its shape.
    action_idx = int(select_action(old_state).item())  # e.g. 5
    action_type = action_selection[action_idx]         # e.g. 'action6'

    # env() already returns the updated location, so take it from there
    # instead of recomputing the distance a second time.
    location, reward_c, done, _ = env(location, action_type, fuel)

    # Shape (1,), so a batch of rewards concatenates to (B,) and adds
    # element-wise to the (B,) next-state values in optimizer_model().
    reward_c = torch.tensor([reward_c], dtype=torch.float, device=device)
    action_idx_t = torch.tensor([[action_idx]], dtype=torch.int64, device=device)

    # Terminal transitions are marked with next_state = None.
    if done:
      next_state_train = None
    else:
      next_state_train = torch.tensor([[location]], dtype=torch.float, device=device)

    memory.push(old_state, action_idx_t, next_state_train, reward_c)
    old_state = next_state_train

    optimizer_model()
    if done:
      episode_durations.append(t + 1)
      plot_durations()
      break

  # Periodically sync the target network with the policy network.
  if i % target_update == 0:
    target_net.load_state_dict(policy_net.state_dict())

print('Complete')

<Figure size 432x288 with 0 Axes>

Complete


<Figure size 432x288 with 0 Axes>