In [2]:
import random
import torch
from torch import nn
from torch.distributions import Categorical
import torch.nn.functional as F
import numpy as np
from collections import deque
import gym

In [3]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [4]:
class TacoYaki():
    def __init__(self):
        self.size = 4
        self.board = np.ones(self.size*self.size)
        self.actions = {}
        self.action_space()
        self.encoded_actions = np.eye(self.size*self.size)
        
    def reset(self):
        self.board = np.ones(self.size*self.size)
    
    def sample_game(self, shuffles):
        self.reset()
        next_state = self.board.copy()
        action = random.choice(list(self.actions.keys()))
        state, _, _, _ = self.step(action)
        replay_buffer = []
        done, score, reward = True, 0, 0
        for i in range(random.randint(4, shuffles)):
            replay_buffer.append((state, action, next_state, score, done, reward))
            next_state = state
            action = random.choice(list(self.actions.keys()))
            state, reward, done, _ = self.step(action)
            score += reward
        replay_buffer.append((state, action, next_state, score, done, reward))
        return self.board, list(reversed(replay_buffer))
        
    def step(self, action):
        x, y = self.action_to_coordinate(action)
        self.board[action] = not self.board[action]
        if x - 1 >= 0:
            index = self.size * (x - 1) + y
            self.board[index] = not self.board[index]
        if y - 1 >= 0:
            index = self.size * x + y - 1
            self.board[index] = not self.board[index]
        if x + 1 < self.size:
            index = self.size * (x + 1) + y
            self.board[index] = not self.board[index]
        if y + 1 < self.size:
            index = self.size * x + y + 1
            self.board[index] = not self.board[index]
        done, reward = self.complete()
        return self.board.copy(), reward, done, 0    
            
    def show_board(self):
        print("----")
        for i in range(self.size):
            for j in range(self.size):
                print(self.board[self.size * i + j], end="\t")
            print()
            
    def action_to_coordinate(self, action):
        if action <= 15 and action >= 0:
            return self.actions[action]
        else:
            print("Actions is not present")
            return -1
            
    def action_space(self):
        for i in range(self.size):
            for j in range(self.size):
                self.actions[self.size*i + j] = (i, j)
                
    def complete(self):
        for cell in self.board:
            if cell == False:
                return False, -1
        return True, 0
    
    def set_state(self, state):
        self.board = state.copy()

In [5]:
env = TacoYaki()
state, replay_buffer = env.sample_game(50)
env.show_board()
for event in replay_buffer:
    state, action, next_state, score, done, reward = event
    env.step(action)
# print(torch.argmax(actor.act(state)).item(), actor.act(state))
env.show_board()

----
1.0	1.0	1.0	1.0	
1.0	0.0	1.0	0.0	
1.0	1.0	1.0	1.0	
1.0	0.0	1.0	1.0	
----
1.0	1.0	1.0	1.0	
1.0	1.0	1.0	1.0	
1.0	1.0	1.0	1.0	
1.0	1.0	1.0	1.0	


In [13]:
class NeuralNet(nn.Module):
    def __init__(self, input_dims, hidden_dims, output_dims):
        super(NeuralNet, self).__init__()

        """ CODE HERE:
                Implement the neural network here
        """
        self.fc1 = nn.Linear(input_dims, hidden_dims)
        self.fc2 = nn.Linear(hidden_dims, hidden_dims)
        self.fc3 = nn.Linear(hidden_dims, output_dims)

    def forward(self, x):
        """ CODE HERE:
                Implement the forward propagation
        """
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.fc3(x)

In [73]:
class Agent:
    def __init__(self, gamma, epsilon, lr, input_dims, batch_size, n_actions,
                 max_mem_size=100000, eps_end=0.01, eps_dec=5e-4):
        self.gamma = gamma
        self.epsilon = epsilon
        self.eps_min = eps_end
        self.eps_dec = eps_dec
        self.lr = lr
        self.action_space = [i for i in range(n_actions)]
        self.mem_size = max_mem_size
        self.batch_size = batch_size
        self.mem_cntr = 0
        self.iter_cntr = 0
        self.replace_target = 100

        self.Q_eval = NeuralNet(input_dims, 256, n_actions).to(device)
        self.optimizer = torch.optim.Adam(self.Q_eval.parameters(), lr=self.lr)
        self.loss = nn.MSELoss()
        
        self.state_memory = np.zeros((self.mem_size, input_dims),
                                     dtype=np.float32)
        self.new_state_memory = np.zeros((self.mem_size, input_dims),
                                         dtype=np.float32)
        self.action_memory = np.zeros(self.mem_size, dtype=np.int32)
        self.reward_memory = np.zeros(self.mem_size, dtype=np.float32)
        self.terminal_memory = np.zeros(self.mem_size, dtype=np.bool)

    def store_transition(self, state, action, reward, state_, terminal):
        index = self.mem_cntr % self.mem_size
        self.state_memory[index] = state
        self.new_state_memory[index] = state_
        self.reward_memory[index] = reward
        self.action_memory[index] = action
        self.terminal_memory[index] = terminal

        self.mem_cntr += 1

    def get_action(self, state):
        if np.random.random() < self.epsilon:  # with probability eps, the agent selects a random action
            action = np.random.choice(self.action_space)
            return action
        else:  # with probability 1 - eps, the agent selects a greedy policy
            with torch.no_grad():
                state_tensor = torch.from_numpy(state).float().unsqueeze(0).to(device)
                q_values = self.Q_eval(state_tensor)
                action = torch.argmax(q_values)
            return action.item()

    def learn(self):
        if self.mem_cntr < self.batch_size:
            return

        self.optimizer.zero_grad()

        max_mem = min(self.mem_cntr, self.mem_size)

        batch = np.random.choice(max_mem, self.batch_size, replace=False)
        batch_index = np.arange(self.batch_size, dtype=np.int32)

        state_batch = torch.tensor(self.state_memory[batch]).to(device)
        new_state_batch = torch.tensor(
                self.new_state_memory[batch]).to(device)
        action_batch = self.action_memory[batch]
        reward_batch = torch.tensor(
                self.reward_memory[batch]).to(device)
        terminal_batch = torch.tensor(
                self.terminal_memory[batch]).to(device)

        q_eval = self.Q_eval.forward(state_batch)[batch_index, action_batch]
        q_next = self.Q_eval.forward(new_state_batch)
        q_next[terminal_batch] = 0.0

        q_target = reward_batch + self.gamma*torch.max(q_next, dim=1)[0]

        loss = self.loss(q_target, q_eval)
        loss.backward()
        self.optimizer.step()

        self.iter_cntr += 1
        self.epsilon = self.epsilon - self.eps_dec \
            if self.epsilon > self.eps_min else self.eps_min

In [None]:
env = gym.make("LunarLander-v2")
agent = Agent(gamma = 0.99, epsilon = 1.0, lr = 0.003, input_dims=8, batch_size = 64, n_actions= 4)

scores , eps_history = deque(maxlen = 100), deque(maxlen = 100)
n_games = 5000
for i in range(n_games):
    score = 0
    done = False
    observation = env.reset()
    while not done:
        action = agent.get_action(observation)
        observation_, reward, done, info = env.step(action)
        score += reward
        
        agent.store_transition(observation,  action, reward, observation_, done)
        agent.learn()
        observation = observation_
    scores.append(score)
    eps_history.append(agent.epsilon)
        
    avg_score = np.mean(scores)
        
    print('episode', i, 'avg score:', avg_score, 'epsilon:', agent.epsilon)

Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  self.terminal_memory = np.zeros(self.mem_size, dtype=np.bool)


episode 0 avg score: -129.68257655489668 epsilon: 0.9980000000000002
episode 1 avg score: -92.9987382157642 epsilon: 0.9645000000000039
episode 2 avg score: -133.31439359162002 epsilon: 0.9265000000000081
episode 3 avg score: -121.15118697313645 epsilon: 0.8870000000000124
episode 4 avg score: -114.66236917440946 epsilon: 0.8435000000000172
episode 5 avg score: -107.91646441191646 epsilon: 0.800000000000022
episode 6 avg score: -107.59517768870307 epsilon: 0.7510000000000274
episode 7 avg score: -141.8807530253879 epsilon: 0.6905000000000341
episode 8 avg score: -136.5952044299328 epsilon: 0.628000000000041
episode 9 avg score: -134.6780522549261 epsilon: 0.5650000000000479
episode 10 avg score: -123.94922716135926 epsilon: 0.49350000000005506
episode 11 avg score: -158.32210283317215 epsilon: 0.3110000000000549
episode 12 avg score: -144.4584248645459 epsilon: 0.24100000000005484
episode 13 avg score: -131.84444251947406 epsilon: 0.14850000000005475
episode 14 avg score: -104.75230049

In [30]:
env = TacoYaki()


mem_size = 40000
learning_rate= 0.001
solved_score = 300
gamma = 0.99
batch_size = 10000
episodes = 10000
shuffles = 30

action_num = 16
hidden_dim = 265
observation_dim = 16

CELoss = nn.CrossEntropyLoss()

target_net = NeuralNet(observation_dim, hidden_dim, action_num).to(device)

# behavior_net.load_state_dict(target_net.state_dict())

optimizer= torch.optim.Adam(target_net.parameters(), lr=learning_rate)

In [31]:
# implementation of Deep value Iteration
recent_losses = deque(maxlen = 100)
testing = []

buffers = []
for ep in range(episodes):
    state, replay_buffer = env.sample_game(5)
    mean_loss = 100
    buffers.append(replay_buffer)
    steps = 0
    while mean_loss > 1:
        state, action, next_state, score, done, reward = random.choice(replay_buffer)
        state_tensor = torch.from_numpy(state).float().unsqueeze(0).to(device)
        testing.append(state)
        pred_actions = target_net.forward(state_tensor)
        target = torch.from_numpy(env.encoded_actions[action]).reshape(1,-1).float().to(device)
#         print(pred_actions, target)
        loss = CELoss(pred_actions, target)
        
        
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        if steps == 0:
            print(loss.item())
        
        recent_losses.append(loss.item())
        steps += 1
        
        if (steps+1) % 1000 == 0:
            mean_loss = np.array(recent_losses).mean()
            print("mean step loss in last 1000 steps ",mean_loss)
    print("\n" + "New episode " + str(ep+1))

# for buffer in buffers:
#     print()
#     for event in buffer:
#         state, action, next_state, score, done, reward = event
#         act= torch.argmax(target_net(state))
#         print(action, act)

2.7840230464935303
mean step loss in last 1000 steps  0.0005283898013294674

New episode 1
5.7897162437438965
mean step loss in last 1000 steps  0.0004430137446615845

New episode 2
11.006105422973633


KeyboardInterrupt: 