# Function Approximation
 
Daeshik Kang

In [None]:
import gym      
import snake_gym
import collections
import random

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

# Hyperparameters
learning_rate = 0.02  # step size passed to the Adam optimizer
gamma = 0.99  # discount factor applied to the bootstrapped next-state value
buffer_limit = 20000  # maximum number of transitions held by the replay buffer
batch_size = 64  # transitions drawn from the buffer per gradient step

class ReplayBuffer():
    """Bounded FIFO store of transitions used for experience replay.

    Every stored item is a 5-tuple ``(s, a, r, s_prime, done_mask)``. The
    underlying deque is capped at the module-level ``buffer_limit``; once it
    is full, appending a new transition drops the oldest one.
    """

    def __init__(self):
        self.buffer = collections.deque(maxlen=buffer_limit)

    def put(self, data):
        """Append one ``(s, a, r, s_prime, done_mask)`` transition."""
        self.buffer.append(data)

    @staticmethod
    def _stack_states(states):
        """Stack per-transition states into one float32 batch tensor.

        Converting each state separately and stacking avoids the slow
        element-by-element path ``torch.tensor`` takes for a list of arrays.
        """
        if not states:
            return torch.empty(0, dtype=torch.float)
        return torch.stack(
            [torch.as_tensor(state, dtype=torch.float) for state in states]
        )

    def sample(self, n):
        """Draw ``n`` distinct stored transitions uniformly at random.

        Args:
            n: number of transitions to draw.

        Returns:
            Tuple ``(s, a, r, s_prime, done_mask)`` of tensors whose first
            axis has length ``n``. ``s`` and ``s_prime`` are float32; ``a`` is
            int64 with shape ``(n, 1)`` (as required by ``Tensor.gather``);
            ``r`` and ``done_mask`` are float32 with shape ``(n, 1)``.

        Raises:
            ValueError: if ``n`` is larger than the number of stored
                transitions (raised by ``random.sample``).
        """
        mini_batch = random.sample(self.buffer, n)
        states, actions, rewards, next_states, done_masks = [], [], [], [], []

        for s, a, r, s_prime, done_mask in mini_batch:
            states.append(s)
            actions.append([a])
            rewards.append([r])
            next_states.append(s_prime)
            done_masks.append([done_mask])

        # Explicit dtypes: inferred ones depend on what the environment
        # returns (int rewards -> int64, NumPy float64 -> float64) and can
        # then disagree with the float32 network output inside the loss.
        return (
            self._stack_states(states),
            torch.tensor(actions, dtype=torch.long),
            torch.tensor(rewards, dtype=torch.float),
            self._stack_states(next_states),
            torch.tensor(done_masks, dtype=torch.float),
        )

    def size(self):
        """Return the number of transitions currently stored."""
        return len(self.buffer)

class Qnet(nn.Module):
    """Fully connected Q-network mapping a 100-value state to 4 action values.

    Layout: 100 -> 256 -> 256 -> 256 -> 4, with ReLU after every layer except
    the last one.
    """

    def __init__(self):
        super(Qnet, self).__init__()
        self.Input = nn.Linear(100, 256)
        self.Hidden1 = nn.Linear(256, 256)
        self.Hidden2 = nn.Linear(256, 256)
        self.Output = nn.Linear(256, 4)

    def forward(self, x):
        """Return one Q-value per action for input ``x`` (last dim 100)."""
        x = F.relu(self.Input(x))
        x = F.relu(self.Hidden1(x))
        x = F.relu(self.Hidden2(x))
        return self.Output(x)

    def sample_action(self, obs, epsilon):
        """Choose an action epsilon-greedily.

        Args:
            obs: tensor holding one observation; it is flattened to 1-D before
                being fed to the network.
            epsilon: probability of returning a uniformly random action.

        Returns:
            Integer action index in ``[0, number_of_outputs - 1]``.
        """
        # Decide first: the exploration branch needs no network evaluation.
        if random.random() < epsilon:
            return random.randint(0, self.Output.out_features - 1)
        # Action selection is never back-propagated through, so skip autograd.
        with torch.no_grad():
            q_values = self.forward(torch.reshape(obs, (-1,)))
        return q_values.argmax().item()
            
def train(q, q_target, memory, optimizer):
    """Run 10 gradient steps of one-step Q-learning on replayed batches.

    Args:
        q: online network being optimised; called on a
            ``(batch_size, 100)`` float tensor.
        q_target: network used only to evaluate next-state values.
        memory: object whose ``sample(batch_size)`` returns the tensors
            ``(s, a, r, s_prime, done_mask)``.
        optimizer: optimizer that updates the parameters of ``q``.

    Uses the module-level ``batch_size`` and ``gamma``.
    """
    for _ in range(10):
        s, a, r, s_prime, done_mask = memory.sample(batch_size)
        s = torch.reshape(s, (batch_size, 10 * 10))
        s_prime = torch.reshape(s_prime, (batch_size, 10 * 10))

        # Q-value of the action that was actually taken in each transition.
        q_a = q(s).gather(1, a)

        # The bootstrap target is a constant for this update: evaluating the
        # target network without autograd keeps backward() from accumulating
        # never-cleared gradients on q_target's parameters.
        with torch.no_grad():
            max_q_prime = q_target(s_prime).max(1)[0].unsqueeze(1)

        # done_mask is 0 for terminal transitions, which removes the bootstrap term.
        target = r + gamma * max_q_prime * done_mask
        loss = F.smooth_l1_loss(q_a, target)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

def main():
    """Train a DQN agent on the ``Snake-v0`` environment for 10000 episodes.

    Each episode is played epsilon-greedily and its transitions are stored in
    a replay buffer. After an episode, ``train`` is called once the buffer
    holds more than 100 transitions. Every ``print_interval`` episodes the
    target network is synchronised with the online network and the
    accumulated score divided by ``print_interval`` is printed.
    """
    env = gym.make('Snake-v0')
    print(type(env))
    q = Qnet()
    q_target = Qnet()
    q_target.load_state_dict(q.state_dict())
    memory = ReplayBuffer()

    print_interval = 50
    score = 0.0
    optimizer = optim.Adam(q.parameters(), lr=learning_rate)
    render = False

    # try/finally so the environment is closed even if training is
    # interrupted or raises.
    try:
        for n_epi in range(10000):
            # Linear annealing from 8% to 1%
            epsilon = max(0.01, 0.08 - 0.01 * (n_epi / 500))
            s = env.reset()
            done = False

            while not done:
                a = q.sample_action(torch.from_numpy(s).float(), epsilon)
                s_prime, r, done, _ = env.step(a)
                # 0.0 on the terminal step so train() drops the bootstrap term.
                done_mask = 0.0 if done else 1.0
                memory.put((s, a, r, s_prime, done_mask))
                s = s_prime

                score += r

                if render:
                    env.render()

            # `score` is the running sum since the last report, so this check
            # compares a partial sum against the threshold; once switched on,
            # rendering stays on.
            if score / print_interval > 2:
                render = True

            if memory.size() > 100:
                train(q, q_target, memory, optimizer)

            if n_epi % print_interval == 0 and n_epi != 0:
                q_target.load_state_dict(q.state_dict())
                print("n_episode :{}, score : {:.1f}, n_buffer : {}, eps : {:.1f}%".format(
                    n_epi, score / print_interval, memory.size(), epsilon * 100))
                score = 0.0
    finally:
        env.close()

# Start training only when this code is executed directly, not when imported.
if __name__ == '__main__':
    main()

<class 'snake_gym.envs.snake_env.SnakeEnv'>


In [None]:
import gym      
import snake_gym
import collections
import random

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

# Hyperparameters
learning_rate = 0.0005  # step size passed to the Adam optimizer
gamma = 0.98  # discount factor applied to the bootstrapped next-state value
buffer_limit = 10000  # maximum number of transitions held by the replay buffer
batch_size = 50  # transitions drawn from the buffer per gradient step

class ReplayBuffer():
    """Bounded FIFO store of transitions used for experience replay.

    Every stored item is a 5-tuple ``(s, a, r, s_prime, done_mask)``. The
    underlying deque is capped at the module-level ``buffer_limit``; once it
    is full, appending a new transition drops the oldest one.
    """

    def __init__(self):
        self.buffer = collections.deque(maxlen=buffer_limit)

    def put(self, data):
        """Append one ``(s, a, r, s_prime, done_mask)`` transition."""
        self.buffer.append(data)

    @staticmethod
    def _stack_states(states):
        """Stack per-transition states into one float32 batch tensor.

        Converting each state separately and stacking avoids the slow
        element-by-element path ``torch.tensor`` takes for a list of arrays.
        """
        if not states:
            return torch.empty(0, dtype=torch.float)
        return torch.stack(
            [torch.as_tensor(state, dtype=torch.float) for state in states]
        )

    def sample(self, n):
        """Draw ``n`` distinct stored transitions uniformly at random.

        Args:
            n: number of transitions to draw.

        Returns:
            Tuple ``(s, a, r, s_prime, done_mask)`` of tensors whose first
            axis has length ``n``. ``s`` and ``s_prime`` are float32; ``a`` is
            int64 with shape ``(n, 1)`` (as required by ``Tensor.gather``);
            ``r`` and ``done_mask`` are float32 with shape ``(n, 1)``.

        Raises:
            ValueError: if ``n`` is larger than the number of stored
                transitions (raised by ``random.sample``).
        """
        mini_batch = random.sample(self.buffer, n)
        states, actions, rewards, next_states, done_masks = [], [], [], [], []

        for s, a, r, s_prime, done_mask in mini_batch:
            states.append(s)
            actions.append([a])
            rewards.append([r])
            next_states.append(s_prime)
            done_masks.append([done_mask])

        # Explicit dtypes: inferred ones depend on what the environment
        # returns (int rewards -> int64, NumPy float64 -> float64) and can
        # then disagree with the float32 network output inside the loss.
        return (
            self._stack_states(states),
            torch.tensor(actions, dtype=torch.long),
            torch.tensor(rewards, dtype=torch.float),
            self._stack_states(next_states),
            torch.tensor(done_masks, dtype=torch.float),
        )

    def size(self):
        """Return the number of transitions currently stored."""
        return len(self.buffer)

class Qnet(nn.Module):
    """Fully connected Q-network mapping a 100-value state to 4 action values.

    Layout: 100 -> 150 -> 75 -> 4, with ReLU after the first two layers.
    """

    def __init__(self):
        super(Qnet, self).__init__()
        self.fc1 = nn.Linear(100, 150)
        self.fc2 = nn.Linear(150, 75)
        self.fc3 = nn.Linear(75, 4)

    def forward(self, x):
        """Return one Q-value per action for input ``x`` (last dim 100)."""
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.fc3(x)

    def sample_action(self, obs, epsilon):
        """Choose an action epsilon-greedily.

        Args:
            obs: tensor holding one observation; it is flattened to 1-D before
                being fed to the network.
            epsilon: probability of returning a uniformly random action.

        Returns:
            Integer action index in ``[0, number_of_outputs - 1]``.
        """
        # Decide first: the exploration branch needs no network evaluation.
        if random.random() < epsilon:
            return random.randint(0, self.fc3.out_features - 1)
        # Action selection is never back-propagated through, so skip autograd.
        with torch.no_grad():
            q_values = self.forward(torch.reshape(obs, (-1,)))
        return q_values.argmax().item()
            
def train(q, q_target, memory, optimizer):
    """Run 10 gradient steps of one-step Q-learning on replayed batches.

    Args:
        q: online network being optimised; called on a
            ``(batch_size, 100)`` float tensor.
        q_target: network used only to evaluate next-state values.
        memory: object whose ``sample(batch_size)`` returns the tensors
            ``(s, a, r, s_prime, done_mask)``.
        optimizer: optimizer that updates the parameters of ``q``.

    Uses the module-level ``batch_size`` and ``gamma``.
    """
    for _ in range(10):
        s, a, r, s_prime, done_mask = memory.sample(batch_size)
        s = torch.reshape(s, (batch_size, 10 * 10))
        s_prime = torch.reshape(s_prime, (batch_size, 10 * 10))

        # Q-value of the action that was actually taken in each transition.
        q_a = q(s).gather(1, a)

        # The bootstrap target is a constant for this update: evaluating the
        # target network without autograd keeps backward() from accumulating
        # never-cleared gradients on q_target's parameters.
        with torch.no_grad():
            max_q_prime = q_target(s_prime).max(1)[0].unsqueeze(1)

        # done_mask is 0 for terminal transitions, which removes the bootstrap term.
        target = r + gamma * max_q_prime * done_mask
        loss = F.smooth_l1_loss(q_a, target)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

def main():
    """Train a DQN agent on the ``Snake-v0`` environment for 10000 episodes.

    Each episode is played epsilon-greedily and its transitions are stored in
    a replay buffer. After an episode, ``train`` is called once the buffer
    holds more than 1000 transitions. Every ``print_interval`` episodes the
    target network is synchronised with the online network and the
    accumulated score divided by ``print_interval`` is printed.
    """
    env = gym.make('Snake-v0')
    env.energy_consum = True

    q = Qnet()
    q_target = Qnet()
    q_target.load_state_dict(q.state_dict())
    memory = ReplayBuffer()

    print_interval = 20
    score = 0.0
    optimizer = optim.Adam(q.parameters(), lr=learning_rate)
    render = False

    # try/finally so the environment is closed even if training is
    # interrupted or raises.
    try:
        for n_epi in range(10000):
            # Re-seeded before every reset.
            # NOTE(review): presumably so each episode starts from the same
            # layout; confirm this is intended.
            env.seed(1)
            # Linear annealing from 10% to 1%
            epsilon = max(0.01, 0.1 - 0.01 * (n_epi / 200))
            s = env.reset()
            done = False

            while not done:
                a = q.sample_action(torch.from_numpy(s).float(), epsilon)
                s_prime, r, done, _ = env.step(a)
                # 0.0 on the terminal step so train() drops the bootstrap term.
                done_mask = 0.0 if done else 1.0
                memory.put((s, a, r, s_prime, done_mask))
                s = s_prime

                score += r

                if render:
                    env.render()

            # `score` is the running sum since the last report, so this check
            # compares a partial sum against the threshold; once switched on,
            # rendering stays on.
            if score / print_interval > 50:
                render = True

            if memory.size() > 1000:
                train(q, q_target, memory, optimizer)

            if n_epi % print_interval == 0 and n_epi != 0:
                q_target.load_state_dict(q.state_dict())
                print("n_episode :{}, score : {:.1f}, n_buffer : {}, eps : {:.1f}%".format(
                    n_epi, score / print_interval, memory.size(), epsilon * 100))
                score = 0.0
    finally:
        env.close()

# Start training only when this code is executed directly, not when imported.
if __name__ == '__main__':
    main()

n_episode :20, score : 7.8, n_buffer : 233, eps : 9.9%
n_episode :40, score : 8.4, n_buffer : 467, eps : 9.8%
n_episode :60, score : 8.9, n_buffer : 707, eps : 9.7%
n_episode :80, score : 7.9, n_buffer : 924, eps : 9.6%
n_episode :100, score : 5.3, n_buffer : 1338, eps : 9.5%
n_episode :120, score : 3.3, n_buffer : 2784, eps : 9.4%
n_episode :140, score : 9.4, n_buffer : 4055, eps : 9.3%
n_episode :160, score : 9.7, n_buffer : 4598, eps : 9.2%
n_episode :180, score : 16.9, n_buffer : 4864, eps : 9.1%
n_episode :200, score : 16.8, n_buffer : 5250, eps : 9.0%
n_episode :220, score : 17.3, n_buffer : 5729, eps : 8.9%
n_episode :240, score : 19.1, n_buffer : 6548, eps : 8.8%
n_episode :260, score : 19.4, n_buffer : 7775, eps : 8.7%
n_episode :280, score : 15.9, n_buffer : 9022, eps : 8.6%
n_episode :300, score : 15.5, n_buffer : 10000, eps : 8.5%
n_episode :320, score : 16.2, n_buffer : 10000, eps : 8.4%
n_episode :340, score : 22.9, n_buffer : 10000, eps : 8.3%
n_episode :360, score : 17.