In [21]:
import numpy as np
import torch

# Scratch check: assigning through a boolean mask writes the right-hand values
# into the selected positions of the destination tensor, in order.
# A torch.bool mask is used because indexing with a uint8 (ByteTensor) mask is
# deprecated in PyTorch.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')  # CPU fallback

a = torch.tensor([1, 1, 1, 1], dtype=torch.bool, device=device)
b = torch.zeros(4, device=device)
b[a] = torch.tensor([56., 6., 7., 8.], device=device)
b

tensor([56.,  6.,  7.,  8.], device='cuda:0')

In [None]:
import torch
import random
from torch import nn
from torch import optim
import torch.nn.functional as F
from collections import namedtuple

In [None]:
# Record type for one environment step. Build instances with
# Transition(state, action, next_state, reward) and read fields by name.
Transition = namedtuple('Transition', ['state', 'action', 'next_state', 'reward'])

# Hyper-parameters shared by the cells below.
GAMMA = 0.99            # discount factor applied to the next-state value
NUM_EPISODES = 100_000  # number of episodes run by the training loop
BATCH_SIZE = 7          # transitions sampled per replay step
CAPACITY = 20           # maximum number of transitions kept in ReplayMemory

In [None]:
class ReplayMemory:
    """Fixed-size circular buffer of ``Transition`` records.

    Attributes:
        capacity: maximum number of records held at once.
        memory: list holding the stored records.
        index: slot that the next ``push`` will write to.
    """

    def __init__(self, CAPACITY):
        """Create an empty buffer that holds at most ``CAPACITY`` records."""
        self.capacity, self.memory, self.index = CAPACITY, [], 0

    def push(self, state, action, next_state, reward):
        """Store one transition, overwriting the oldest slot once full."""
        buffer = self.memory

        # While the buffer is still growing, open a placeholder slot first so
        # the indexed assignment below always has somewhere to land.
        if not len(buffer) >= self.capacity:
            buffer.append(None)

        buffer[self.index] = Transition(state, action, next_state, reward)

        # Advance the write cursor, wrapping back to 0 at capacity.
        self.index = (self.index + 1) % self.capacity

    def sample(self, batch_size):
        """Return ``batch_size`` distinct stored records chosen at random."""
        return random.sample(self.memory, k=batch_size)

    def __len__(self):
        """Number of records currently stored."""
        stored = self.memory
        return len(stored)

In [None]:
class Brain:
    """DQN learner: a small fully connected Q-network plus its replay memory.

    Attributes:
        num_actions: number of discrete actions (size of the output layer).
        memory: ReplayMemory holding past transitions.
        device: torch.device the network lives on (CUDA when available, else CPU).
        model: the Q-network, mapping a state batch to one value per action.
        optimizer: Adam optimizer over the network parameters.
    """

    def __init__(self, num_states, num_actions):
        """Build the Q-network, its optimizer and an empty replay memory.

        Args:
            num_states: size of the input layer.
            num_actions: size of the output layer.
        """
        self.num_actions = num_actions
        self.memory = ReplayMemory(CAPACITY)

        # Use the GPU when there is one, otherwise fall back to the CPU so the
        # class can still be constructed on a CPU-only machine.
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

        # Network structure: num_states -> 512 -> 512 -> num_actions with ReLU.
        self.model = nn.Sequential()
        self.model.add_module('fc1', nn.Linear(num_states, 512))
        self.model.add_module('relu1', nn.ReLU())
        self.model.add_module('fc2', nn.Linear(512, 512))
        self.model.add_module('relu2', nn.ReLU())
        self.model.add_module('fc3', nn.Linear(512, num_actions))
        self.model.to(self.device)

        # self.model.parameters(): all weights of the network that makes up
        # self.model.
        self.optimizer = optim.Adam(self.model.parameters(), lr=0.0001)

    def replay(self):
        """Run one optimisation step on a minibatch drawn from replay memory.

        Does nothing until the memory holds at least BATCH_SIZE transitions.
        A transition whose ``next_state`` is None is treated as terminal: its
        next-state value stays 0, so its target is just the reward.
        """
        if len(self.memory) < BATCH_SIZE:
            return

        transitions = self.memory.sample(BATCH_SIZE)

        # Transpose a list of Transitions into one Transition of tuples.
        batch = Transition(*zip(*transitions))

        # Concatenate the per-step tensors along dim 0. The .to() calls are
        # no-ops when the tensors already live on self.device.
        state_batch = torch.cat(batch.state).to(self.device)
        action_batch = torch.cat(batch.action).to(self.device)
        reward_batch = torch.cat(batch.reward).to(self.device)

        # Boolean mask of the transitions that have a successor state.
        # (uint8 masks are deprecated for indexing, hence torch.bool.)
        non_final_mask = torch.tensor(
            [s is not None for s in batch.next_state],
            dtype=torch.bool,
            device=self.device,
        )
        non_final_next_states = [s for s in batch.next_state if s is not None]

        self.model.eval()

        # Q(s, a): pick, along dim 1, the value of the action actually taken.
        state_action_values = self.model(state_batch).gather(1, action_batch)

        # max_a' Q(s', a') for non-terminal transitions, 0 for terminal ones.
        next_state_values = torch.zeros(BATCH_SIZE, device=self.device)
        if non_final_next_states:
            # Guarded because torch.cat raises on an empty list, which happens
            # when every sampled transition is terminal.
            with torch.no_grad():
                best_next = self.model(
                    torch.cat(non_final_next_states).to(self.device)
                ).max(1)[0]
            next_state_values[non_final_mask] = best_next

        # Target: reward + gamma * max_a' Q(s', a').
        expected_state_action_values = reward_batch + GAMMA * next_state_values

        self.model.train()

        # smooth_l1_loss is the Huber loss.
        loss = F.smooth_l1_loss(
            state_action_values, expected_state_action_values.unsqueeze(1)
        )

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def decide_action(self, state, episode):
        """Choose an action epsilon-greedily.

        Epsilon is ``0.5 / (episode + 1)``, so exploration decays with the
        episode number.

        Args:
            state: tensor fed to the network; the greedy branch takes the
                argmax over dim 1 and reshapes it to (1, 1).
            episode: episode number used to compute epsilon.

        Returns:
            A (1, 1) long tensor on ``self.device`` holding the action index.
        """
        epsilon = 0.5 * (1 / (episode + 1))

        # Both the explore/exploit draw and the random action come from the
        # `random` module, so a single seed controls the policy.
        if epsilon <= random.random():
            self.model.eval()
            with torch.no_grad():
                # .view(1, 1) reshapes the argmax index to size 1x1.
                action = self.model(state.to(self.device)).max(1)[1].view(1, 1)
        else:
            action = torch.tensor(
                [[random.randrange(self.num_actions)]], dtype=torch.long
            )

        return action.to(self.device)

In [None]:
class Agent:
    """Facade over a ``Brain``: every method forwards to ``self.brain``."""

    def __init__(self, num_states, num_actions):
        """Create the underlying Brain with the given layer sizes."""
        self.brain = Brain(num_states, num_actions)

    def update_q_function(self):
        """Trigger one replay step on the brain."""
        self.brain.replay()

    def get_action(self, state, episode):
        """Return the action the brain decides on for ``state`` at ``episode``."""
        chosen = self.brain.decide_action(state, episode)
        return chosen

    def memorize(self, state, action, next_state, reward):
        """Push one transition into the brain's replay memory."""
        replay_buffer = self.brain.memory
        replay_buffer.push(state, action, next_state, reward)

In [None]:
import DQN
from Environment import *

# Training driver: one environment, one agent, NUM_EPISODES episodes.
env = environment()
agent = DQN.Agent(69, 8)  # 69 inputs, 8 actions

for epi in range(NUM_EPISODES):

    # Fresh episode: initial state from the environment, not finished yet.
    now_state, done = env.reset(), False

    while not done:
        action = agent.get_action(now_state, epi)
        next_state, reward, done = env.step(action)

        # NOTE(review): Brain.replay treats a next_state of None as terminal.
        # env.step is not visible here, so confirm it returns None when done.
        agent.memorize(now_state, action, next_state, reward)
        agent.update_q_function()

        # Only advance the state while the episode is still running.
        if not done:
            now_state = next_state

    # The inner loop exits only once the episode is finished.
    print('Episode :', epi)