In [1]:
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
import numpy as np
import torch as torch
import torch.nn as nn
import random
from IPython.display import clear_output
from enum import Enum, auto
import yaml
from types import SimpleNamespace as SN

In [2]:
# Load the QMIX algorithm configuration.
# NOTE(review): yaml.safe_load is the conservative choice for plain config data;
# prefer it over yaml.load if these files could ever come from an untrusted source.
# NOTE(review): neither qmix_config nor env_config is read by the cells visible
# below -- the hyperparameters there are hard-coded.
with open('qmix.yaml', 'r') as f:
    qmix_config = yaml.load(f, Loader=yaml.FullLoader)
# Load the environment configuration.
with open('env.yaml', 'r') as f:
    env_config = yaml.load(f, Loader=yaml.FullLoader)

In [3]:
class Actions(Enum):
    """Discrete moves an agent can make on the grid.

    Member values are the consecutive integers 0..4, so ``value`` doubles as
    the member's index in a one-hot vector.
    """
    NO_OP = 0
    MOVE_UP = auto()
    MOVE_DOWN = auto()
    MOVE_LEFT = auto()
    MOVE_RIGHT = auto()

    @property
    def delta(self):
        """Return the ``(row, col)`` offset this action adds to a position."""
        # Built per call on purpose: a dict assigned in the class body would
        # itself be turned into an enum member.
        offsets = {
            Actions.NO_OP: (0, 0),
            Actions.MOVE_UP: (-1, 0),
            Actions.MOVE_DOWN: (1, 0),
            Actions.MOVE_LEFT: (0, -1),
            Actions.MOVE_RIGHT: (0, 1),
        }
        return offsets.get(self)

    @property
    def one_hot(self):
        """Return a float32 tensor of length ``len(Actions)`` with a 1 at ``value``."""
        encoding = torch.zeros(len(Actions), dtype=torch.float32)
        encoding[self.value] = 1
        return encoding

In [351]:
class GridEnv():
    """Wrap-around grid world holding one goal, one pushable block and one agent.

    Args:
        rows: Number of grid rows.
        cols: Number of grid columns.
        n_agents: Only used to size ``obs_shape``; ``reset`` always places
            exactly one agent.

    Attributes:
        state_shape: Number of cells (``rows * cols``).
        obs_shape: ``(n_agents + 2) * state_shape``.
        action_shape: ``len(Actions)``.
        agent_in_shape: ``obs_shape + action_shape``.
    """

    def __init__(self, rows, cols, n_agents):
        self.rows = rows
        self.cols = cols
        self.n_agents = n_agents

        self.state_shape = self.rows*self.cols
        self.obs_shape = (n_agents+2)*self.state_shape
        self.action_shape = len(Actions)
        self.agent_in_shape = self.obs_shape + self.action_shape

        self.block = None
        self.goal = None
        self.agent = None

        self.reset()

    def get_global_state(self):
        """Return the flattened one-hot cell encodings of goal, block and agent.

        Returns:
            An integer tensor of length ``3 * rows * cols``.
        """
        entities = (self.goal, self.block, self.agent.pos)
        flat_indices = torch.tensor([int(p[0]) * self.cols + int(p[1]) for p in entities])
        return torch.flatten(nn.functional.one_hot(flat_indices, self.rows * self.cols))

    def reset(self):
        """Place goal, block and agent on three distinct random cells.

        A new ``Agent`` is built on every call, which clears its hidden state
        and previous action. The network of the agent being replaced is carried
        over to the new one: without that, each reset would swap in freshly
        initialised weights and discard whatever had been loaded into
        ``agent.local_model`` before.

        Raises:
            ValueError: From ``np.random.choice`` when the grid has fewer than
                three cells.
        """
        positions = np.random.choice(self.rows*self.cols, 1+2, replace=False)
        goal_cell, block_cell, agent_cell = (divmod(int(p), self.cols) for p in positions)

        previous_agent = self.agent
        self.goal = goal_cell
        self.block = block_cell
        self.agent = Agent(self, agent_cell)
        if previous_agent is not None:
            # Keep the existing network instead of the one just constructed.
            self.agent.local_model = previous_agent.local_model

    def get_reward(self):
        """Return ``(0, True)`` when the block is on the goal, else ``(-1, False)``."""
        if list(self.block) == list(self.goal):
            return 0, True
        return -1, False

    def vizualize_grid(self):
        """Render the grid as text: ``.`` empty, ``G`` goal, ``B`` block, ``A`` agent.

        Markers are written in the order G, B, A, so a later marker overwrites
        an earlier one sharing the same cell.
        """
        grid = [list("."*self.cols) for _ in range(self.rows)]

        grid[self.goal[0]][self.goal[1]] = "G"
        grid[self.block[0]][self.block[1]] = "B"
        grid[self.agent.pos[0]][self.agent.pos[1]] = "A"

        return '\n'.join([' '.join(row) for row in grid])

    def __repr__(self):
        return self.vizualize_grid()

In [649]:
class AgentModel(nn.Module):
    """Recurrent Q-network: linear encoder, GRU cell, linear Q-value head.

    Args:
        input_shape: Size of the last dimension of the input fed to ``forward``.
        embed_dim: Width of the encoder output and of the GRU hidden state.
        n_actions: Number of Q-values produced per input row.
    """

    def __init__(self, input_shape, embed_dim, n_actions):
        super().__init__()
        self.embed_dim = embed_dim
        self.n_actions = n_actions
        self.linear1 = nn.Linear(input_shape, embed_dim)
        self.rnn = nn.GRUCell(embed_dim, embed_dim)
        self.linear2 = nn.Linear(embed_dim, n_actions)

    def init_hidden(self):
        """Return a zero hidden state of shape ``(1, embed_dim)``.

        The tensor takes its dtype and device from ``linear1.weight``.
        """
        return self.linear1.weight.new_zeros(1, self.embed_dim)

    def forward(self, inputs, hidden_in):
        """Run one recurrent step.

        Args:
            inputs: Tensor whose last dimension is ``input_shape``.
            hidden_in: Previous hidden state; reshaped to ``(-1, embed_dim)``.

        Returns:
            ``(q_values, hidden_out)``.
        """
        encoded = torch.relu(self.linear1(inputs))
        previous_hidden = hidden_in.reshape(-1, self.embed_dim)
        hidden_out = self.rnn(encoded, previous_hidden)
        return self.linear2(hidden_out), hidden_out

In [768]:
class Agent():
    """Epsilon-greedy agent that moves on a ``GridEnv`` and can push its block.

    Args:
        grid: Environment the agent lives in; must expose ``rows``, ``cols``,
            ``block``, ``agent_in_shape``, ``get_global_state`` and ``get_reward``.
        pos: Initial ``(row, col)`` position.
    """

    def __init__(self, grid, pos):
        self.pos = pos
        self.grid = grid
        self.embed_dim = 64
        self.hidden_states = torch.zeros((self.embed_dim,))
        self.local_model = AgentModel(self.grid.agent_in_shape, self.embed_dim, len(Actions))
        self.prev_action = Actions.NO_OP
        self.epsilon = 0.1
        # NOTE(review): 0.01 is an unusually small discount factor -- confirm
        # it was not meant to be a learning rate.
        self.gamma = 0.01

    def get_target_qvalue(self, reward, future_q_value):
        """Return the one-step target ``reward + gamma * future_q_value``."""
        return reward + self.gamma * future_q_value

    def take_action(self, action):
        """Apply ``action`` on the wrap-around grid.

        If the destination cell holds the block, the block is pushed one cell
        in the same direction and the agent stays where it is; otherwise the
        agent moves to the destination.
        """
        bounds = np.array([self.grid.rows, self.grid.cols])
        delta = np.array(action.delta)
        future_position = (np.asarray(self.pos) + delta) % bounds

        if np.array_equal(future_position, self.grid.block):
            self.grid.block = (np.asarray(self.grid.block) + delta) % bounds
        else:
            self.pos = future_position

    def step(self):
        """Choose and apply one epsilon-greedy action.

        Returns:
            ``(tstep_history, goal_reached)``. ``tstep_history`` is currently
            the placeholder list ``["tstep"]``.
        """
        current_state = self.grid.get_global_state()
        agent_in = torch.unsqueeze(torch.cat((current_state, self.prev_action.one_hot)), 0)

        # Acting needs no gradients; without no_grad the autograd graph would
        # be chained through hidden_states across every timestep.
        with torch.no_grad():
            pred_qvalues, self.hidden_states = self.local_model(agent_in, self.hidden_states)

        if random.random() < self.epsilon:
            action = random.choice(list(Actions))
        else:
            action = Actions(int(torch.argmax(pred_qvalues)))

        self.take_action(action)
        self.prev_action = action
        future_state = self.grid.get_global_state()
        future_in = torch.unsqueeze(torch.cat((future_state, self.prev_action.one_hot)), 0)

        reward, goal_reached = self.grid.get_reward()
        if goal_reached:
            # Terminal transition: there is no future value to bootstrap from.
            best_future_q = 0.0
        else:
            with torch.no_grad():
                future_qvalues, _ = self.local_model(future_in, self.hidden_states)
            best_future_q = future_qvalues.max()

        target_qvalue = self.get_target_qvalue(reward, best_future_q)
        # Clone so that writing the target does not overwrite the prediction.
        target_qvalues = pred_qvalues.clone()
        target_qvalues[0][action.value] = target_qvalue

        # TODO: record the real transition (state, previous action, reward,
        # next state, termination flag and target_qvalues) instead of this
        # placeholder, which is all that downstream code receives today.
        tstep_history = ["tstep"]
        return tstep_history, goal_reached

In [769]:
class ReplayBuffer():
    """Fixed-size store of per-timestep histories, indexed by episode then timestep.

    Args:
        batch_size: Number of episodes returned by ``sample_batch``.
        n_eps: Number of episode slots allocated.
        n_tsteps: Number of timestep slots allocated per episode.

    Attributes:
        eps_size: Number of completed episodes. It is not updated by ``add``;
            the owner of the buffer is expected to increment it.
    """

    def __init__(self, batch_size, n_eps, n_tsteps):
        self.n_eps = n_eps
        self.n_tsteps = n_tsteps
        self.batch_size = batch_size
        self.eps_size = 0
        self.replay_buffer = [[[] for t in range(self.n_tsteps)] for eps in range(self.n_eps)]

    def add(self, episode, t, tstep_history):
        """Append ``tstep_history`` to the slot for ``episode`` at timestep ``t``."""
        self.replay_buffer[episode][t].append(tstep_history)

    def can_sample(self):
        """Return True once at least ``batch_size`` episodes have been completed."""
        return self.eps_size >= self.batch_size

    def least_filled_t(self, ids):
        """Return the smallest number of non-empty timestep slots among ``ids``.

        Args:
            ids: Iterable of episode indices.

        Returns:
            The minimum filled-slot count, or 0 when ``ids`` is empty.
        """
        filled_counts = [
            sum(1 for tstep_history in self.replay_buffer[episode_id] if tstep_history != [])
            for episode_id in ids
        ]
        return min(filled_counts, default=0)

    def sample_batch(self):
        """Return ``batch_size`` episodes truncated to a common length.

        When exactly ``batch_size`` episodes are stored they are returned in
        order; otherwise episodes are drawn at random without replacement.
        Every returned episode is cut to the length of the shortest one in the
        batch.

        Raises:
            AssertionError: If fewer than ``batch_size`` episodes are stored.
        """
        assert self.can_sample()
        if self.eps_size == self.batch_size:
            ids = list(range(self.batch_size))
        else:
            ids = np.random.choice(self.eps_size, size=self.batch_size, replace=False)
        # Computed once per batch so every episode is truncated identically.
        horizon = self.least_filled_t(ids)
        return [self.replay_buffer[episode_id][:horizon] for episode_id in ids]

In [778]:
class QLearner():
    """Runs episodes on a grid, records them in a replay buffer and trains a Q-network.

    The optimizer and ``train`` operate on ``target_model``; its weights are
    copied into the grid agent's ``local_model`` by ``update_agent_model``.

    Args:
        grid: Environment exposing ``agent``, ``agent_in_shape``, ``reset`` and
            ``vizualize_grid``.
        batch_size: Requested episodes per batch; capped at ``n_eps``.
        n_eps: Number of episodes ``run_episodes`` plays.
        n_tsteps: Maximum timesteps per episode.
    """

    def __init__(self, grid, batch_size, n_eps, n_tsteps):
        self.grid = grid
        self.n_tsteps = n_tsteps
        self.n_eps = n_eps
        self.batch_size = min(batch_size, self.n_eps)
        self.n_batches = 1
        self.replay_buffer = ReplayBuffer(self.batch_size, self.n_eps, self.n_tsteps)
        self.update_freq = 2
        self.target_model = AgentModel(
            self.grid.agent_in_shape,
            self.grid.agent.embed_dim,
            len(Actions),
        )
        # Start with the agent's network identical to the trained one.
        self.update_agent_model()
        self.goal_reached_info = []
        self.alpha = 0.01
        self.loss = nn.HuberLoss()
        self.optimizer = torch.optim.Adam([{'params': self.target_model.parameters(), 'lr': self.alpha}])

    def episode(self, num_episode):
        """Reset the grid and step the agent until the goal or ``n_tsteps`` is reached.

        Each step's history is stored in the replay buffer, and the grid is
        redrawn after every step.
        """
        vizualize = True
        self.grid.reset()
        # t counts completed steps (1-based); buffer slots are 0-based.
        for t in range(1, self.n_tsteps + 1):
            step_history, goal_reached = self.grid.agent.step()
            self.replay_buffer.add(num_episode, t - 1, step_history)
            if goal_reached:
                self.goal_reached_info.append(f"Goal reached at episode: {num_episode}, timestep: {t}")
                print("Goal Reached!")
            if vizualize:
                clear_output(wait=True)
                print(f"Epsiode: {num_episode} Timestep: {t}\n{self.grid.vizualize_grid()}")
            if goal_reached:
                break
        print("Episode Finished")

    def batch_generator(self):
        """Return an iterator over ``n_batches`` batches sampled from the replay buffer."""
        return iter([self.replay_buffer.sample_batch() for _ in range(self.n_batches)])

    def update_agent_model(self):
        """Copy the weights of ``target_model`` into the grid agent's ``local_model``."""
        self.grid.agent.local_model.load_state_dict(self.target_model.state_dict())

    def run_episodes(self):
        """Play ``n_eps`` episodes, syncing weights and training along the way.

        After each episode the buffer's episode count is incremented, the agent
        model is synced every ``update_freq`` episodes, and a training pass runs
        once the buffer can be sampled. Nothing is returned.
        """
        for episode_index in range(self.n_eps):
            self.episode(episode_index)
            self.replay_buffer.eps_size += 1
            if episode_index % self.update_freq == 0:
                self.update_agent_model()
            if self.replay_buffer.can_sample():
                self.train(self.batch_generator())

    def train(self, batch_generator):
        """Run one optimisation step per ``(inputs, targets)`` pair in ``batch_generator``.

        NOTE(review): ``batch_generator()`` yields whatever
        ``ReplayBuffer.sample_batch`` returns, i.e. lists of episodes rather
        than tensor pairs. With a batch of two episodes the unpacking below
        yields two lists and the model call fails with
        ``TypeError: linear(): argument 'input' ... must be Tensor, not list``.
        """
        initial_hidden = torch.zeros((self.grid.agent.embed_dim,))
        # NOTE(review): this switches anomaly detection on globally and never
        # switches it back off.
        torch.autograd.set_detect_anomaly(True)
        for inputs, targets in batch_generator:
            self.optimizer.zero_grad()
            predictions, _ = self.target_model(inputs, initial_hidden)
            batch_loss = self.loss(predictions, targets)
            batch_loss.backward()
            self.optimizer.step()

In [779]:
# Build a 6x4 grid and a learner that plays 10 episodes of at most 10 timesteps
# each, requesting 2 episodes per training batch.
g = GridEnv(rows=6, cols=4, n_agents=1)
qlearner = QLearner(grid=g, batch_size=2, n_eps=10, n_tsteps=10)

In [780]:
# NOTE(review): run_episodes() has no return statement, so `gen` is always None.
gen = qlearner.run_episodes()

Epsiode: 1 Timestep: 10
. . . .
. A . .
. G . B
. . . .
. . . .
. . . .
Episode Finished


TypeError: linear(): argument 'input' (position 1) must be Tensor, not list