# Helper classes

## Location

In [1]:
import numpy as np


class Location:
    """A cell of the grid identified by its ``x`` and ``y`` coordinates."""

    def __init__(self, x, y):
        self.x = x
        self.y = y

    def toNumpy(self):
        """Return the coordinates as an integer array ordered ``[y, x]``."""
        return np.array([self.y, self.x], dtype=int)

    def dist(self, other):
        """Return the L1 (Manhattan) distance between this location and ``other``."""
        return np.linalg.norm(self.toNumpy() - other.toNumpy(), ord=1)

    def __eq__(self, other):
        """Two locations are equal when both coordinates match.

        Objects without ``x``/``y`` attributes (e.g. ``None``) yield
        ``NotImplemented`` so that ``==`` evaluates to ``False`` instead of
        raising ``AttributeError``. Duck typing is used on purpose: instances
        created before the class cell was re-run still compare correctly.
        """
        try:
            return self.x == other.x and self.y == other.y
        except AttributeError:
            return NotImplemented

    def __lt__(self, other):
        """Strict component-wise ordering: both ``x`` and ``y`` must be smaller."""
        try:
            return self.x < other.x and self.y < other.y
        except AttributeError:
            return NotImplemented

    def __repr__(self):
        return '(' + str(self.x) + ', ' + str(self.y) + ')'

# Networks

In [2]:
import torch


# Use the first CUDA GPU when one is available; otherwise fall back to the CPU
# so that every later `.to(DEVICE)` call also works on machines without CUDA.
DEVICE = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

## FC

In [3]:
class QNet(torch.nn.Module):
    """Fully connected Q-network: two ReLU hidden layers and a linear output head.

    Args:
        states_dim: number of input features.
        actions_dim: number of outputs (one Q-value per action).
        n_neurons: width of both hidden layers.
    """

    def __init__(self, states_dim, actions_dim, n_neurons=128):
        super().__init__()
        hidden = n_neurons
        # Attribute names and creation order are kept stable: they define the
        # state_dict keys and the order in which parameters are initialised.
        self.fc1 = torch.nn.Linear(states_dim, hidden)
        self.act1 = torch.nn.ReLU()
        self.fc2 = torch.nn.Linear(hidden, hidden)
        self.act2 = torch.nn.ReLU()
        self.fc3 = torch.nn.Linear(hidden, actions_dim)

    def forward(self, x):
        """Apply the layers in sequence and return the raw output of ``fc3``."""
        for layer in (self.fc1, self.act1, self.fc2, self.act2, self.fc3):
            x = layer(x)
        return x

## Conv

In [4]:
class QConv(torch.nn.Module):
    """Convolutional Q-network over a single-channel ``size`` x ``size`` grid.

    Two conv/ReLU/max-pool stages are followed by three linear layers that
    produce one value per action.

    Args:
        size: side length of the square input grid.
        actions_dim: number of outputs (one Q-value per action).

    Raises:
        ValueError: if ``size`` is so small that nothing is left after the
            second pooling stage.
    """

    def __init__(self, size, actions_dim):
        super(QConv, self).__init__()
        self.conv1 = torch.nn.Conv2d(in_channels=1, out_channels=6, kernel_size=3, padding=2)
        self.actv1 = torch.nn.ReLU()
        self.pool1 = torch.nn.MaxPool2d(kernel_size=2, stride=2)

        self.conv2 = torch.nn.Conv2d(in_channels=6, out_channels=16, kernel_size=3, padding=0)
        self.actv2 = torch.nn.ReLU()
        self.pool2 = torch.nn.MaxPool2d(kernel_size=2, stride=2)

        # Spatial side length after conv1 (k=3, pad=2) -> pool1 (/2)
        # -> conv2 (k=3, pad=0) -> pool2 (/2).
        n = ((((size - 3 + 2 * 2) + 1)//2 - 3) + 1) // 2
        if n < 1:
            raise ValueError('size=%r is too small for QConv (feature map would be empty)' % (size,))
        self.fc1 = torch.nn.Linear(n * n * 16, 128)
        self.actv3 = torch.nn.ReLU()

        self.fc2 = torch.nn.Linear(128, 32)
        self.actv4 = torch.nn.ReLU()

        self.fc3 = torch.nn.Linear(32, actions_dim)

    def forward(self, x):
        """Return the network output for ``x``.

        ``x`` may be a single grid shaped ``(1, size, size)``, which gives a
        vector of ``actions_dim`` values, or a batch shaped
        ``(N, 1, size, size)``, which gives ``(N, actions_dim)``.
        """
        x = self.conv1(x)
        x = self.actv1(x)
        x = self.pool1(x)

        x = self.conv2(x)
        x = self.actv2(x)
        x = self.pool2(x)

        # Flatten the feature maps: everything for an unbatched (C, H, W)
        # input, everything except the batch axis for (N, C, H, W).
        if x.dim() == 3:
            x = x.reshape(-1)
        else:
            x = x.reshape(x.size(0), -1)
        x = self.fc1(x)
        x = self.actv3(x)
        x = self.fc2(x)
        x = self.actv4(x)
        x = self.fc3(x)
        return x

# GYM

## Agents

### Classic

In [5]:
from pandas.core.arrays.sparse import dtype
class Agent:
    """Base agent: keeps its grid location, a ``purpose`` flag and an ``attack`` value.

    Args:
        size: side length of the grid, stored as ``area_size``.
    """

    def __init__(self, size):
        self.area_size = size
        self.location, self.purpose, self.attack = Location(0, 0), 0, 0.1

    def reset(self):
        """Restore the initial location (0, 0), purpose 0 and attack 0.1."""
        self.location, self.purpose, self.attack = Location(0, 0), 0, 0.1

    def get_state(self):
        """Return the integer array ``[y, x, purpose]`` describing the agent."""
        position = self.location.toNumpy()
        return np.concatenate((position, [self.purpose])).astype(int)

### Table

In [6]:
class TableAgent(Agent):
    """Tabular Q-learning agent.

    The Q-table is indexed by the first five entries of the state vector
    followed by the action, i.e. it has shape
    ``(size, size, 2, size, size, 5)``.
    """

    def __init__(self, size):
        super().__init__(size)
        table_shape = (size, size, 2, size, size, 5)
        self.q_table = np.zeros(table_shape)
        self.alpha = 0.1    # learning rate
        self.gamma = 0.6    # discount factor
        self.epsilon = 0.2  # stored only; get_action uses its own `epsilon` argument

    @staticmethod
    def _cell(state):
        """Turn the first five state entries into a Q-table index tuple."""
        return tuple(state[i] for i in range(5))

    def get_action(self, state, epsilon=-1):
        """Pick an action epsilon-greedily.

        With the default ``epsilon=-1`` the random branch can never trigger,
        so the action with the largest table value is always returned.
        """
        explore = np.random.uniform(0, 1) < epsilon
        if explore:
            return np.random.randint(5)
        return np.argmax(self.q_table[self._cell(state)])

    def update(self, state, action, reward, next_state):
        """Apply one Q-learning update for ``(state, action, reward, next_state)``."""
        entry = self._cell(state) + (action,)
        old_value = self.q_table[entry]
        next_max = np.max(self.q_table[self._cell(next_state)])
        new_value = (1 - self.alpha) * old_value + self.alpha * (reward + self.gamma * next_max)
        self.q_table[entry] = new_value

### Net

In [7]:
class NetAgent(Agent):
    """Q-learning agent whose action values come from a neural network.

    Args:
        size: side length of the grid.
        mode: ``'fc'`` builds a ``QNet`` fed with the 5-element state vector;
            any other value builds a ``QConv`` fed with the rendered grid.
    """

    def __init__(self, size, mode='fc'):
        super().__init__(size)
        self.net = QNet(states_dim=5, actions_dim=5).to(DEVICE) if mode == 'fc' \
                   else QConv(size, actions_dim=5).to(DEVICE)
        self.loss = torch.nn.MSELoss()
        self.optimizer = torch.optim.Adam(self.net.parameters(), lr=1e-3, weight_decay=1e-5)
        self.gamma = 0.6    # discount factor
        self.epsilon = 0.2  # stored only; get_action uses its own `epsilon` argument

    def get_action(self, state, epsilon=-1):
        """Pick an action epsilon-greedily from the network output.

        With the default ``epsilon=-1`` the random branch never triggers and
        the index of the largest network output is returned as an ``int``.
        """
        if np.random.uniform(0, 1) < epsilon:
            return np.random.randint(5)
        state = torch.FloatTensor(state).to(DEVICE)
        # Pure inference: no autograd graph is needed to pick the arg-max.
        with torch.no_grad():
            q_values = self.net(state).reshape(-1)
        return int(q_values.argmax())

    def update(self, state, action, reward, done, next_state):
        """Run one gradient step towards the one-step TD target.

        The target ``reward + gamma * (1 - done) * max_a Q(next_state, a)`` is
        computed without gradient tracking so that only the prediction for
        ``(state, action)`` is differentiated.
        """
        state = torch.FloatTensor(state).to(DEVICE)
        next_state = torch.FloatTensor(next_state).to(DEVICE)

        old_value = self.net(state).reshape(-1)[action]
        with torch.no_grad():
            next_max = self.net(next_state).reshape(-1).max()
            # Plain Python floats keep the target a float32 tensor on DEVICE
            # even when `reward` arrives as a NumPy scalar or `done` as a bool.
            new_value = float(reward) + self.gamma * (1.0 - float(done)) * next_max

        self.optimizer.zero_grad()
        loss_val = self.loss(old_value, new_value)
        loss_val.backward()
        self.optimizer.step()

    def stop_train(self):
        """Switch the network to evaluation mode."""
        self.net.eval()

    def start_train(self):
        """Switch the network to training mode."""
        self.net.train()

### Hybrid

In [8]:
class HybridAgent(Agent):
    """Agent that learns a Q-table and trains a network to reproduce its values.

    Args:
        size: side length of the grid.
        type_net: ``'fc'`` builds a ``QNet`` fed with the state vector; any
            other value builds a ``QConv`` fed with the rendered grid.
    """

    def __init__(self, size, type_net='fc'):
        super().__init__(size)
        self.type_net = type_net
        self.net = QNet(states_dim=5, actions_dim=5).to(DEVICE) if type_net == 'fc' \
                   else QConv(size, actions_dim=5).to(DEVICE)
        self.loss = torch.nn.MSELoss()
        self.optimizer = torch.optim.Adam(self.net.parameters(), lr=1e-3, weight_decay=1e-5)
        self.q_table = np.zeros((size, size, 2, size, size, 5))
        self.alpha = 0.1    # table learning rate
        self.gamma = 0.6    # discount factor
        self.epsilon = 0.2  # stored only; get_action uses its own `epsilon` argument

    def _net_q_values(self, state, render):
        """Run the network on the input matching ``type_net`` and flatten the output."""
        # Only the input the network actually consumes is converted to a tensor.
        net_input = state if self.type_net == 'fc' else render
        net_input = torch.FloatTensor(net_input).to(DEVICE)
        return self.net(net_input).reshape(-1)

    def get_action(self, state, render, epsilon=-1, mode='net'):
        """Pick an action epsilon-greedily.

        Args:
            state: state vector; indexes the Q-table and feeds the ``'fc'`` net.
            render: grid rendering; feeds the convolutional net.
            epsilon: probability of a random action (default ``-1`` disables it).
            mode: ``'net'`` takes the arg-max of the network output, any other
                value takes the arg-max of the Q-table row.
        """
        if np.random.uniform(0, 1) < epsilon:
            return np.random.randint(5)

        if mode == 'net':
            # Pure inference: no autograd graph is needed to pick the arg-max.
            with torch.no_grad():
                q_values = self._net_q_values(state, render)
            return int(q_values.argmax())

        return np.argmax(self.q_table[state[0], state[1], state[2],
                                      state[3], state[4]])

    def update(self, state, action, reward, render, next_state):
        """Update the Q-table entry, then fit the network output to the new table value."""
        old_value = self.q_table[state[0], state[1], state[2], state[3],
                                 state[4], action]
        next_max = np.max(self.q_table[next_state[0], next_state[1], next_state[2],
                                       next_state[3], next_state[4]])
        new_value = (1 - self.alpha) * old_value + self.alpha * (reward + self.gamma * next_max)
        self.q_table[state[0], state[1], state[2], state[3], state[4], action] = new_value

        pred = self._net_q_values(state, render)[action]
        # `pred` is a 0-dim tensor, so the target is built 0-dim as well;
        # a (1,)-shaped target makes MSELoss warn about broadcasting.
        target_value = torch.tensor(float(new_value), dtype=torch.float32, device=DEVICE)

        self.optimizer.zero_grad()
        loss_val = self.loss(pred, target_value)
        loss_val.backward()
        self.optimizer.step()

    def stop_train(self):
        """Switch the network to evaluation mode."""
        self.net.eval()

    def start_train(self):
        """Switch the network to training mode."""
        self.net.train()

## Target

In [9]:
class Target:
    """The prey: a grid location plus a pre-generated route it follows when it escapes.

    Args:
        size: side length of the grid.
        x, y: initial coordinates; each is drawn at random when ``None``.
        n_routes: number of escape steps generated up front.
        escape: value the agent's ``attack`` must exceed for a capture. When
            ``None`` (default) a fresh ``np.random.random()`` is drawn for this
            instance, rather than one value shared by every instance because
            the default was evaluated when the class was defined.
    """

    def __init__(self, size, x=None, y=None, n_routes=100, escape=None):
        self.area_size = size
        # Snapshot of the initial location and route, restored by reset().
        self.backup = {'location': None, 'route': None}
        self.location = self.create_location(x, y, size)
        self.route = self.create_escape_route(n_routes)
        self.escape = np.random.random() if escape is None else escape

    def create_location(self, x, y, size):
        """Return the initial location and store a copy of it in ``backup``.

        Cell (0, 0) is never used: if the coordinates resolve to it, both are
        redrawn until they differ from it.

        Raises:
            ValueError: if (0, 0) has to be avoided but the grid has no other cell.
        """
        x = x if x is not None else np.random.randint(size)
        y = y if y is not None else np.random.randint(size)
        if x == 0 and y == 0 and size < 2:
            # Redrawing could only ever produce (0, 0) again and would never stop.
            raise ValueError('size must be at least 2 to place the target away from (0, 0)')
        while x == 0 and y == 0:
            x = np.random.randint(size)
            y = np.random.randint(size)
        self.backup['location'] = Location(x, y)
        return Location(x, y)

    def create_escape_route(self, n, step=5):
        """Generate ``n`` consecutive escape locations starting from ``self.location``.

        Each location lies exactly ``step`` cells (Manhattan distance) from the
        previous one and inside the grid. A copy of the route is stored in
        ``backup``.

        NOTE(review): candidates are redrawn until one fits inside the grid, so
        this does not terminate if no in-grid cell lies ``step`` away.
        """
        area = self.area_size
        x, y = self.location.x, self.location.y
        route = []
        for _ in range(n):
            while True:
                # randint's upper bound is exclusive: `step + 1` makes +step
                # as reachable as -step.
                dx = np.random.randint(-step, step + 1)
                dy = step - abs(dx) if np.random.uniform(0, 1) < 0.5 else abs(dx) - step
                new_x, new_y = x + dx, y + dy
                if 0 <= new_x < area and 0 <= new_y < area:
                    break
            route.append(Location(new_x, new_y))
            x, y = new_x, new_y
        self.backup['route'] = list(route)
        return route

    def run(self):
        """Move to the next location of the route (IndexError once the route is used up)."""
        self.location = self.route.pop(0)

    def reset(self):
        """Restore the initial location and the full escape route from ``backup``."""
        self.location = Location(self.backup['location'].x, self.backup['location'].y)
        self.route = list(self.backup['route'])

    def get_state(self):
        """Return the current location as the integer array ``[y, x]``."""
        return self.location.toNumpy()

    def update(self):
        """Place the target at cell (0, 0)."""
        self.location = Location(0, 0)

## Environment

### HunterWorldEnv

In [10]:
import gym
from gym import spaces
import pandas as pd
from IPython.display import display
from IPython.display import clear_output


class HunterWorldEnv(gym.Env):
    """Square grid world with one hunting agent and one target.

    Actions 0-3 move the agent by one cell (0: y+1, 1: x+1, 2: y-1, 3: x-1,
    clipped to the grid); action 4 is an attack. A successful attack sets the
    agent's ``purpose`` to 1 and places the target at cell (0, 0). The episode
    ends when a move puts the agent on the target's cell while ``purpose`` is 1.

    Args:
        agent: object providing ``location``, ``purpose``, ``attack``,
            ``reset()`` and ``get_state()``.
        target: object providing ``location``, ``escape``, ``run()``,
            ``update()``, ``reset()`` and ``get_state()``.
        size: side length of the grid.
        render_mode: ``'human'`` displays a coloured table, any other value
            makes ``render()`` return the numeric grid.
    """

    metadata = {"render_modes": ["human", "network"]}

    def __init__(self, agent, target, size=10, render_mode='human'):
        self.size = size
        self.agent = agent
        self.target = target
        self.render_mode = render_mode
        self.observation_space = spaces.Dict(
            {
                "agent": spaces.Box(0, np.array([size - 1, size - 1, 1]), dtype=int),
                "target": spaces.Box(0, size - 1, shape=(2,), dtype=int),
            }
        )
        self.action_space = spaces.Discrete(5)
        # Offsets are added to Location.toNumpy(), i.e. they are ordered [dy, dx].
        # Action 4 (attack) has no direction and is handled separately in step().
        self._action_to_direction = {
            0: np.array([1, 0]),
            1: np.array([0, 1]),
            2: np.array([-1, 0]),
            3: np.array([0, -1]),
        }

    def _get_obs(self):
        """Return the agent state, the target state and their concatenation under ``'state'``."""
        agent = self.agent.get_state()
        target = self.target.get_state()
        return {"agent": agent,
                "target": target,
                "state": np.concatenate([agent, target])}

    def _get_info(self):
        """Return the agent-to-target distance under the key ``'distance'``."""
        return {"distance": self.agent.location.dist(self.target.location)}

    def reset(self):
        """Reset agent and target and return ``(observation, info)``."""
        #super().reset(seed=SEED)
        self.agent.reset()
        self.target.reset()
        observation = self._get_obs()
        info = self._get_info()
        return observation, info

    def step(self, action):
        """Apply ``action`` and return ``(observation, reward, done, info)``.

        Rewards:
            * attack on the target's cell while ``purpose`` is 0: +1000, both
              when the capture succeeds and when the target escapes (in the
              latter case the agent's attack grows by 0.1 and the target moves
              to its next route location);
            * attack in any other situation: -500;
            * move: minus the distance to the target, or +1000 with
              ``done=True`` when the agent reaches the target's cell while
              ``purpose`` is 1.
        """
        if action == 4:
            if self.agent.location == self.target.location and self.agent.purpose == 0:
                if self.agent.attack > self.target.escape:
                    self.agent.purpose = 1
                    self.target.update()
                    return self._get_obs(), 1000, False, self._get_info()
                # NOTE(review): a failed attack earns the same +1000 as a capture -
                # confirm this is intended.
                self.agent.attack += 0.1
                self.target.run()
                return self._get_obs(), 1000, False, self._get_info()
            return self._get_obs(), -500, False, self._get_info()

        direction = self._action_to_direction[action]
        new_loc = np.clip(self.agent.location.toNumpy() + direction, 0, self.size - 1)
        # toNumpy() is ordered [y, x] while Location takes (x, y).
        self.agent.location = Location(new_loc[1], new_loc[0])

        observation = self._get_obs()
        info = self._get_info()
        reward = -info['distance']

        if self.agent.location == self.target.location and self.agent.purpose == 1:
            return observation, 1000, True, info

        return observation, reward, False, info

    def render(self):
        """Dispatch to ``render_human()`` or ``render_network()`` based on ``render_mode``."""
        if self.render_mode == 'human':
            return self.render_human()
        return self.render_network()

    def render_human(self):
        """Replace the cell output with a coloured table of the grid.

        ``A`` marks the agent, ``T`` the target while ``purpose`` is 0 and ``O``
        afterwards, and ``X`` the agent standing on the target while ``purpose`` is 0.
        """
        clear_output(wait=True)

        def cell_color(val):
            color = 'white'
            if val == 'A':
                color = 'blue'
            if val == 'T':
                color = 'green'
            if val == 'X':
                color = 'red'
            if val == 'O':
                color = 'yellow'
            return 'color: %s' % color

        n = self.size
        desk = np.full((n, n), '.').astype(str)
        desk[self.agent.location.y, self.agent.location.x] = 'A'
        desk[self.target.location.y, self.target.location.x] = 'T' if self.agent.purpose == 0 \
                                                                 else 'O'
        if self.agent.location == self.target.location and self.agent.purpose == 0:
            desk[self.agent.location.y, self.agent.location.x] = 'X'
        styler = pd.DataFrame(desk).style
        # Styler.applymap is deprecated in favour of Styler.map (pandas >= 2.1);
        # use the new name when it exists and keep the old one as a fallback.
        colorize = getattr(styler, 'map', None) or styler.applymap
        display(colorize(cell_color))

    def render_network(self):
        """Return the grid as an integer array of shape ``(1, size, size)``.

        Cell codes: 0 empty, 1 agent, 2 target while ``purpose`` is 0, 3 target
        afterwards, 4 agent standing on the target while ``purpose`` is 0.
        """
        n = self.size
        desk = np.full((n, n), 0).astype(int)
        desk[self.agent.location.y, self.agent.location.x] = 1
        desk[self.target.location.y, self.target.location.x] = 2 if self.agent.purpose == 0 \
                                                                 else 3
        if self.agent.location == self.target.location and self.agent.purpose == 0:
            desk[self.agent.location.y, self.agent.location.x] = 4
        return desk.reshape(1, n, n)

# Check

In [11]:
import pickle

SEED, SIZE, TYPE = 42, 10, 'conv'


def _load_artifact(path):
    """Unpickle and return the object stored at ``path``."""
    # NOTE: pickle.load can execute arbitrary code contained in the file;
    # only load artifacts that come from a trusted source.
    with open(path, 'rb') as file:
        return pickle.load(file)


target = _load_artifact('artifacts/target.gym')
agent_tab = _load_artifact('artifacts/agent_tab.gym')
agent_net = _load_artifact('artifacts/agent_net.gym')
agent_hyb = _load_artifact('artifacts/agent_hyb.gym')

# Step counter per agent, incremented by the rollout cells below.
battle = dict.fromkeys(('Table', 'DQN', 'Hybrid'), 0)

In [12]:
from time import sleep


# Roll out the tabular agent (get_action's default epsilon disables random
# moves), drawing every step and counting the steps in battle['Table'].
env = HunterWorldEnv(agent_tab, target, SIZE)
obs, _ = env.reset()
env.render()
sleep(1)

state, done = obs['state'], False
while not done:
    action = agent_tab.get_action(state)
    obs, reward, done, info = env.step(action)
    state = obs['state']
    env.render()
    sleep(0.3)
    battle['Table'] = battle['Table'] + 1

Unnamed: 0,0,1,2,3,4,5,6,7,8,9
0,O,.,.,.,.,.,.,.,.,.
1,.,.,.,.,.,.,.,.,.,.
2,.,.,.,.,.,.,.,.,.,.
3,.,.,.,.,.,.,.,.,.,.
4,.,.,.,.,.,.,.,.,.,.
5,.,.,.,.,.,.,.,.,.,.
6,.,.,.,.,.,.,.,.,.,.
7,.,.,.,.,.,.,.,.,.,.
8,.,.,.,.,.,.,.,.,.,.
9,.,.,.,.,.,.,.,.,.,.


In [13]:
from time import sleep


def _dqn_input(env, obs):
    """Return what the DQN agent consumes: the grid render for 'conv', else the state vector."""
    env.render_mode = 'network'
    return env.render() if TYPE == 'conv' else obs['state']


# Roll out the network agent, drawing every step and counting the steps in battle['DQN'].
agent_net.stop_train()
env = HunterWorldEnv(agent_net, target, SIZE)

obs, _ = env.reset()
env.render_mode = 'human'
env.render()
sleep(1)

state, done = _dqn_input(env, obs), False
while not done:
    action = agent_net.get_action(state)
    obs, reward, done, info = env.step(action)
    state = _dqn_input(env, obs)
    # Switch back to the table view for the on-screen frame.
    env.render_mode = 'human'
    env.render()
    sleep(0.3)
    battle['DQN'] = battle['DQN'] + 1

Unnamed: 0,0,1,2,3,4,5,6,7,8,9
0,O,.,.,.,.,.,.,.,.,.
1,.,.,.,.,.,.,.,.,.,.
2,.,.,.,.,.,.,.,.,.,.
3,.,.,.,.,.,.,.,.,.,.
4,.,.,.,.,.,.,.,.,.,.
5,.,.,.,.,.,.,.,.,.,.
6,.,.,.,.,.,.,.,.,.,.
7,.,.,.,.,.,.,.,.,.,.
8,.,.,.,.,.,.,.,.,.,.
9,.,.,.,.,.,.,.,.,.,.


In [14]:
from time import sleep


def _network_view(env):
    """Return the numeric grid render, leaving ``env.render_mode`` set to 'network'."""
    env.render_mode = 'network'
    return env.render()


# Roll out the hybrid agent, drawing every step and counting the steps in battle['Hybrid'].
agent_hyb.stop_train()
env = HunterWorldEnv(agent_hyb, target, SIZE)

obs, _ = env.reset()
state = obs['state']
env.render_mode = 'human'
env.render()
sleep(1)

render, done = _network_view(env), False
while not done:
    action = agent_hyb.get_action(state, render)
    obs, _, done, _ = env.step(action)
    state = obs['state']
    render = _network_view(env)
    # Switch back to the table view for the on-screen frame.
    env.render_mode = 'human'
    env.render()
    sleep(0.3)
    battle['Hybrid'] = battle['Hybrid'] + 1

Unnamed: 0,0,1,2,3,4,5,6,7,8,9
0,O,.,.,.,.,.,.,.,.,.
1,.,.,.,.,.,.,.,.,.,.
2,.,.,.,.,.,.,.,.,.,.
3,.,.,.,.,.,.,.,.,.,.
4,.,.,.,.,.,.,.,.,.,.
5,.,.,.,.,.,.,.,.,.,.
6,.,.,.,.,.,.,.,.,.,.
7,.,.,.,.,.,.,.,.,.,.
8,.,.,.,.,.,.,.,.,.,.
9,.,.,.,.,.,.,.,.,.,.


In [15]:
# One-row table: number of steps each agent needed to finish its episode.
pd.DataFrame(
    {agent_name: [n_steps] for agent_name, n_steps in battle.items()},
    index=[0],
)

Unnamed: 0,Table,DQN,Hybrid
0,31,31,31
