### Imports

In [72]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from collections import namedtuple, deque
import random
import math
from itertools import count
from PIL import Image
import cv2
from tqdm import tqdm

### Hyperparameters

In [73]:
# DQL constants
BUFFER_SIZE = int(1e5)
BATCH_SIZE = 128
GAMMA = 0.999
EPS_START = 0.95
EPS_END = 0.05
EPS_DECAY = 200
TARGET_UPDATE_EVERY = 10
LEARNING_RATE = 5e-4
NUM_EPISODES = 20

# Environment constants
ENV_SIZE = 32
MAX_ENV_STEPS = 50
POP_DENSITY = 0.2
ZOMBIE_FRACTION = 0.3
VISIBILITY = 7 # square's half side length
STATE_ROOT_SIZE = 2 * VISIBILITY + 1
STATE_SIZE = STATE_ROOT_SIZE**2
ACTION_SIZE = 5 # 4 directions + do nothing

DEATH_REWARD = -100
KILL_REWARD = 50
REST_REWARD = 0
MOVE_REWARD = -5

EMPTY_CELL = 0
HUMAN_CELL = 1
AGENT_CELL = 2
ZOMBIE_CELL = 3

# Printing and visualization constants
SHOW_ENV = True
SHOW_ENV_EVERY = 1

### Initializations

In [74]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cpu


## Function definitions

### Environment

In [75]:
class Environment():
        
    def __init__(self):
        print("Initializing environment")
    
    def reset(self):
        ## generate grid
        self.grid = np.zeros((ENV_SIZE, ENV_SIZE), dtype=np.uint8)
        
        ## place people and zombies
        rands = [random.uniform(0, 1) for i in range(ENV_SIZE**2)]
        r = random.randint(0, ENV_SIZE**2-1)
        self.humans_pos = []
        self.zombies_pos = []
        for y in range(ENV_SIZE):
            for x in range(ENV_SIZE):
                if rands[y * ENV_SIZE + x] < POP_DENSITY:
                    if rands[y * ENV_SIZE + x] < POP_DENSITY * ZOMBIE_FRACTION:
                        self.grid[y, x] = ZOMBIE_CELL
                        self.zombies_pos.append((y, x))
                    else:
                        self.grid[y, x] = HUMAN_CELL
                        self.humans_pos.append((y, x))
                    
        # place agent
        self.agent_x = random.randint(0, ENV_SIZE-1) # (second bound is included)
        self.agent_y = random.randint(0, ENV_SIZE-1)
        while self.grid[self.agent_y, self.agent_x] != EMPTY_CELL:
            self.agent_x = random.randint(0, ENV_SIZE-1)
            self.agent_y = random.randint(0, ENV_SIZE-1)
        self.grid[self.agent_y, self.agent_x] = AGENT_CELL
        
        self.episode_step = 0
        state = self.scan_env(self.agent_x, self.agent_y)
        return state
    
    def step(self, action):
        self.episode_step += 1
        
        self.perform_action(action)
        self.zombies_pos = self.move_zombies()
        ## TODO - Later on: make other agents move as well
        
        dead_humans = set()
        dead_zombies = set()
        
        nb_allies, enemies = self.scan_vicinity(self.agent_x, self.agent_y)
        
        # Handle agent's interactions
        dead = False
        reward = REST_REWARD if action == 0 else MOVE_REWARD
        if len(enemies) > 0: # Contact
            for enemy in enemies: #Might want to aggregate this by simply modifying the 'b' value instead (b = 1 + nb_enemies)
                r = np.random.uniform(0, 1)
                b = 1
                k = 1 + nb_allies # << S # maybe add some multiplicative factor to put more importance on having allies
                if r >= b/(b+k): # kill
                    reward += KILL_REWARD
                    self.grid[enemy] = EMPTY_CELL
                    dead_zombies.add(enemy)
                else: # death
                    reward += DEATH_REWARD
                    dead = True
                    ## Probably don't need to update state as it won't be used in replay memory anyway
                    break
        done = self.episode_step >= MAX_ENV_STEPS or dead
        
        # Handle all other interactions
        for (y, x) in self.humans_pos:
            nb_allies, enemies = self.scan_vicinity(x, y)
            if len(enemies) > 0:
                for enemy in enemies:
                    r = np.random.uniform(0, 1)
                    b = 1
                    k = 1 + nb_allies # << S
                    if r >= b/(b+k): # kill
                        self.grid[enemy] = EMPTY_CELL
                        dead_zombies.add(enemy)
                    else: # death
                        self.grid[y, x] = ZOMBIE_CELL
                        self.zombies_pos.append((y, x))
                        dead_humans.add((y, x))
        
        # Clean the dead
        for dead_human in dead_humans:
            self.humans_pos.remove(dead_human)
        for dead_zombie in dead_zombies:
            self.zombies_pos.remove(dead_zombie)
            
        state = self.scan_env(self.agent_x, self.agent_y)
        
        return state, reward, done, None
    
    def scan_vicinity(self, x, y): #Scan the vicinity to record allies and enemies
        enemies = []
        nb_allies = 0
        for j in [(y-1)%ENV_SIZE, y, (y+1)%ENV_SIZE]:
            for i in [(x-1)%ENV_SIZE, x, (x+1)%ENV_SIZE]:
                if not (j == y and i == x):
                    if self.grid[j, i] == HUMAN_CELL:
                        nb_allies += 1
                    elif self.grid[j, i] == ZOMBIE_CELL and (j == y or i == x):
                        enemies.append((j, i))
        return nb_allies, enemies
    
    def scan_env(self, x, y): #Scan environment to return the state
        state = np.zeros((STATE_SIZE,))
        k = 0
        for j in range(y - VISIBILITY, y + VISIBILITY+1):
            for i in range(x - VISIBILITY, x + VISIBILITY+1):
                state[k] = self.grid[j%ENV_SIZE, i%ENV_SIZE]
                k += 1
        state = state.reshape((1, STATE_ROOT_SIZE, STATE_ROOT_SIZE))
        state = np.ascontiguousarray(state, dtype=np.float32)
        state = torch.from_numpy(state)
        state = state.unsqueeze(0)
        return state
    
    def perform_action(self, action):
        x = self.agent_x
        y = self.agent_y
        if action == 0:
            return
        if action == 1: #move right
            self.agent_x = (self.agent_x + 1) % ENV_SIZE
        elif action == 2: #move left
            self.agent_x = (self.agent_x - 1) % ENV_SIZE
        elif action == 3: #move top
            self.agent_y = (self.agent_y + 1) % ENV_SIZE
        else: #move down
            self.agent_y = (self.agent_y - 1) % ENV_SIZE
        # Might want to implement collision avoidance, but this might cause problems if 2 agents want to move
        # together in the same direction.
        if self.grid[self.agent_y, self.agent_x] != EMPTY_CELL: # If cell not empty
            self.agent_x = x
            self.agetn_y = y
        else:
            self.grid[y, x] = EMPTY_CELL
            self.grid[self.agent_y, self.agent_x] = AGENT_CELL
    
    def move_zombies(self):
        new_pos = []
        for (y, x) in self.zombies_pos:
            action = random.randint(0, 4)
            if action == 0:
                new_pos.append((y, x))
            else:
                new_x = x
                new_y = y
                if action == 1: #move right
                    new_x = (x + 1) % ENV_SIZE
                elif action == 2: #move left
                    new_x = (x - 1) % ENV_SIZE
                elif action == 3: #move top
                    new_y = (y + 1) % ENV_SIZE
                else: #move down
                    new_y = (y - 1) % ENV_SIZE
                if self.grid[new_y, new_x] != EMPTY_CELL:
                    new_x = x
                    new_y = y
                else:
                    self.grid[y, x] = EMPTY_CELL
                    self.grid[new_y, new_x] = ZOMBIE_CELL
                new_pos.append((new_y, new_x))
        return new_pos
        
    def render(self):
        img = self.get_image()
        img = img.resize((300, 300))
        cv2.imshow("image", np.array(img))
        cv2.waitKey(1)
    
    def get_image(self):
        env = self.grid * 80
        #env = env[:, None]
        img = Image.fromarray(env, 'L')  # Black and white mode
        return img

### Replay memory

In [76]:
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward'))

class ReplayMemory(object):
    def __init__(self, capacity):
        self.memory = deque([], maxlen=capacity)
        
    def push(self, *args):
        self.memory.append(Transition(*args))
        
    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)
    
    def __len__(self):
        return len(self.memory)

### DQN

In [77]:
class DQN(nn.Module):
    def __init__(self, w, h, action_size):
        super(DQN, self).__init__()
        self.seed = torch.manual_seed(0)
        self.conv1 = nn.Conv2d(1, 16, kernel_size=3, stride=2)
        self.bn1 = nn.BatchNorm2d(16)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=3, stride=2)
        self.bn2 = nn.BatchNorm2d(32)

        # Number of Linear input connections depends on output of conv2d layers
        # and therefore the input image size, so compute it.
        def conv2d_size_out(size, kernel_size = 3, stride = 2):
            return (size - (kernel_size - 1) - 1) // stride  + 1
        convw = conv2d_size_out(conv2d_size_out(w))
        convh = conv2d_size_out(conv2d_size_out(h))
        linear_input_size = convw * convh * 32
        self.head = nn.Linear(linear_input_size, action_size)
        
    def forward(self, x):
        x = x.to(device)
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        return self.head(x.view(x.size(0), -1))

### DQL Agent

In [80]:
class DQLAgent():
    def __init__(self, env):
        self.env = env
        
        # DQNs
        self.policy_net = DQN(STATE_ROOT_SIZE, STATE_ROOT_SIZE, ACTION_SIZE).to(device)
        self.target_net = DQN(STATE_ROOT_SIZE, STATE_ROOT_SIZE, ACTION_SIZE).to(device)
        self.target_net.load_state_dict(self.policy_net.state_dict())
        self.target_net.eval()
        
        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=LEARNING_RATE)
        self.memory = ReplayMemory(BUFFER_SIZE)
        
        self.time_step = 0
        
    def optimize_model(self):
        if len(self.memory) < BATCH_SIZE:
            return
        transitions = self.memory.sample(BATCH_SIZE)
        batch = Transition(*zip(*transitions))
        non_final_mask = torch.tensor(tuple(map(lambda s: s is not None, batch.next_state)), device=device, dtype=torch.bool)
        non_final_next_states = torch.cat([s for s in batch.next_state if s is not None])
        state_batch = torch.cat(batch.state)
        action_batch = torch.cat(batch.action)
        reward_batch = torch.cat(batch.reward)
        
        state_action_values = self.policy_net(state_batch).gather(1, action_batch)
        
        next_state_values = torch.zeros(BATCH_SIZE, device=device)
        next_state_values[non_final_mask] = self.target_net(non_final_next_states).max(1)[0].detach()
        expected_state_action_values = next_state_values * GAMMA + reward_batch
        
        criterion = nn.SmoothL1Loss()
        loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))
        
        self.optimizer.zero_grad()
        loss.backward()
        for param in self.policy_net.parameters():
            param.grad.data.clamp_(-1, 1) # Gradient clipping?
        self.optimizer.step()
        
    def select_action(self, state):
        sample = random.random()
        eps_threshold = EPS_END + (EPS_START - EPS_END) * math.exp(-1. * self.time_step / EPS_DECAY)
        if sample > eps_threshold:
            with torch.no_grad():
                return self.policy_net(state).max(1)[1].view(1, 1)
        else:
            return torch.tensor([[random.randrange(ACTION_SIZE)]], device=device, dtype=torch.long)
    
    def train(self):
        for episode in tqdm(range(NUM_EPISODES)):
            state = self.env.reset() # get initial state
            #state = torch.from_numpy(state)
            for t in count(): # The environment is responsible for returning done=True after some time steps
                action = self.select_action(state)
                next_state, reward, done, _ = self.env.step(action.item())
                #next_state = torch.from_numpy(next_state)
                reward = torch.tensor([reward], device=device)
                
                self.memory.push(state, action, next_state, reward)
                state = next_state
                
                self.optimize_model()
                if done:
                    # TODO: Plot some statistics etc...
                    break
            if episode % TARGET_UPDATE_EVERY == 0:
                self.target_net.load_state_dict(self.policy_net.state_dict())
                
            if SHOW_ENV and episode % SHOW_ENV_EVERY == 0:
                self.env.render()
        print("Training finished")

# Run simulation

In [79]:
env = Environment()
dqlAgent = DQLAgent(env)
dqlAgent.train()

Initializing environment


100%|██████████████████████████████████████████████████████████████████████████████████| 20/20 [00:04<00:00,  4.14it/s]

Training finished



