# Territorial.io Brain Model Training (DQN)
## Train a Deep Q-Network for strategic decision making

**Instructions:**
1. Upload this notebook to [Kaggle](https://kaggle.com/notebooks)
2. Enable GPU: Settings → Accelerator → GPU T4 x2 (free)
3. Run all cells
4. Download `brain_model.pth` from the Output tab
5. Place it in `territorial_bot/models/brain_model.pth`

This notebook:
- Builds a simulated Territorial.io game environment
- Trains a Double DQN agent (policy network + target network, with experience replay) using reinforcement learning
- Exports the trained model weights for use in the live bot

In [None]:
!pip install -q torch numpy matplotlib tqdm

In [None]:
import os
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
from typing import List, Tuple, Optional
import matplotlib.pyplot as plt
from tqdm import tqdm
from pathlib import Path

# Every artifact this notebook produces (plots, checkpoints) is written here.
OUTPUT_DIR = '/kaggle/working'
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Use the GPU when one is visible, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(f'Using device: {device}')
if device.type == 'cuda':
    print(f'GPU: {torch.cuda.get_device_name(0)}')

# Reproducibility: seed the Python, NumPy and PyTorch generators (in that order).
SEED = 42
for seed_fn in (random.seed, np.random.seed, torch.manual_seed):
    seed_fn(SEED)

## 1. Simulated Game Environment

We simulate a simplified Territorial.io grid world for RL training.
The agent expands its territory on a 20x20 grid while three scripted enemies expand toward it.

In [None]:
class TerritorialEnv:
    """
    Simplified Territorial.io simulation environment.

    Grid cells can be:
      0 = neutral
      1 = own territory
     -1 = enemy territory (all enemies share this value on the grid;
          per-enemy ownership is tracked in ``enemy_cells``)

    Actions: 0=STAY, 1=N, 2=NE, 3=E, 4=SE, 5=S, 6=SW, 7=W, 8=NW

    Args:
        max_steps: Episode length limit. ``step`` reports ``done=True`` once
            this many steps have been taken. Defaults to 500.
    """

    GRID_SIZE = 20
    NUM_ENEMIES = 3

    # Direction vectors (row, col), indexed by action id
    DIRECTIONS = [
        (0, 0),   # STAY
        (-1, 0),  # N
        (-1, 1),  # NE
        (0, 1),   # E
        (1, 1),   # SE
        (1, 0),   # S
        (1, -1),  # SW
        (0, -1),  # W
        (-1, -1), # NW
    ]

    def __init__(self, max_steps: int = 500):
        self.grid = None
        self.own_cells = set()
        self.enemy_cells = [set() for _ in range(self.NUM_ENEMIES)]
        self.step_count = 0
        self.max_steps = max_steps
        self.reset()

    def reset(self) -> np.ndarray:
        """Reset the environment to a fresh episode and return the initial state.

        The agent gets a 3x3 block in a randomly chosen corner; each enemy
        gets a 2x2 block in one of the remaining corners.
        """
        self.grid = np.zeros((self.GRID_SIZE, self.GRID_SIZE), dtype=np.float32)
        self.step_count = 0

        # Top-left anchors of the four corner spawn areas
        corners = [
            (1, 1), (1, self.GRID_SIZE-4),
            (self.GRID_SIZE-4, 1), (self.GRID_SIZE-4, self.GRID_SIZE-4)
        ]
        own_corner = random.choice(corners)
        self.own_cells = set()
        for dr in range(3):
            for dc in range(3):
                r, c = own_corner[0] + dr, own_corner[1] + dc
                self.grid[r, c] = 1.0
                self.own_cells.add((r, c))

        # Place enemies (2x2 blocks in the other corners). With more enemies
        # than free corners the extra enemy sets simply stay empty.
        remaining_corners = [c for c in corners if c != own_corner]
        self.enemy_cells = [set() for _ in range(self.NUM_ENEMIES)]
        for i, corner in enumerate(remaining_corners[:self.NUM_ENEMIES]):
            for dr in range(2):
                for dc in range(2):
                    r, c = corner[0] + dr, corner[1] + dc
                    self.grid[r, c] = -1.0
                    self.enemy_cells[i].add((r, c))

        return self._get_state()

    def step(self, action: int) -> Tuple[np.ndarray, float, bool, dict]:
        """Execute one action and return ``(next_state, reward, done, info)``.

        Args:
            action: Index into ``DIRECTIONS`` (0..8).

        Returns:
            next_state: 64-feature state vector (see ``_get_state``).
            reward: Shaped reward (+0.5 per cell gained, +0.3 per enemy cell
                removed, -0.01 per step), replaced by a terminal reward of
                -10 (no cells left), +20 (more than 60% of the grid owned)
                or +15 (all enemies eliminated) when one of those applies.
            done: True on any terminal condition or once ``max_steps`` is hit.
            info: Dict with ``own_pct``, ``own_cells`` and ``enemy_cells``.

        Raises:
            IndexError: If ``action`` is not a valid index into ``DIRECTIONS``.
        """
        # Reject invalid actions up front. Without this check a negative
        # action would silently wrap around via Python's negative indexing
        # (e.g. -1 would act as NW) instead of failing.
        if not 0 <= action < len(self.DIRECTIONS):
            raise IndexError(
                f'action {action} out of range [0, {len(self.DIRECTIONS) - 1}]'
            )

        self.step_count += 1
        prev_own = len(self.own_cells)
        prev_enemy_total = sum(len(e) for e in self.enemy_cells)

        # Execute own action
        if action != 0:  # Not STAY
            self._expand_territory(action)

        # Execute enemy actions (simple AI); eliminated enemies are skipped
        for i in range(self.NUM_ENEMIES):
            if self.enemy_cells[i]:
                self._enemy_expand(i)

        # Compute reward
        curr_own = len(self.own_cells)
        curr_enemy_total = sum(len(e) for e in self.enemy_cells)

        territory_gain = (curr_own - prev_own) * 0.5
        enemy_loss = (prev_enemy_total - curr_enemy_total) * 0.3
        time_penalty = -0.01

        reward = territory_gain + enemy_loss + time_penalty

        # Check done conditions (terminal rewards override the shaped reward)
        total_cells = self.GRID_SIZE * self.GRID_SIZE
        own_pct = curr_own / total_cells

        done = False
        if curr_own == 0:
            reward = -10.0  # Died
            done = True
        elif own_pct > 0.6:
            reward = +20.0  # Dominant victory
            done = True
        elif all(len(e) == 0 for e in self.enemy_cells):
            reward = +15.0  # Eliminated all enemies
            done = True
        elif self.step_count >= self.max_steps:
            # Time limit: the shaped reward of this step is kept as-is
            done = True

        info = {
            'own_pct': own_pct,
            'own_cells': curr_own,
            'enemy_cells': curr_enemy_total,
        }

        return self._get_state(), reward, done, info

    def _expand_territory(self, action: int):
        """Expand own territory in the direction given by ``action``.

        Every owned cell proposes the cell one step away in that direction;
        roughly a third of the in-bounds, not-yet-owned candidates (at least
        one, if any exist) are captured at random.
        """
        dr, dc = self.DIRECTIONS[action]
        new_cells = set()

        for r, c in list(self.own_cells):
            nr, nc = r + dr, c + dc
            if 0 <= nr < self.GRID_SIZE and 0 <= nc < self.GRID_SIZE:
                if self.grid[nr, nc] != 1.0:  # Not already own
                    new_cells.add((nr, nc))

        # Expand to adjacent cells (not all, simulate troop cost)
        expand_count = max(1, len(new_cells) // 3)
        expand_cells = random.sample(list(new_cells), min(expand_count, len(new_cells)))

        for r, c in expand_cells:
            old_val = self.grid[r, c]
            self.grid[r, c] = 1.0
            self.own_cells.add((r, c))
            # Remove from enemy if it was enemy territory. The grid does not
            # record which enemy owned the cell, so discard it from all sets.
            if old_val == -1.0:
                for enemy_set in self.enemy_cells:
                    enemy_set.discard((r, c))

    def _enemy_expand(self, enemy_idx: int):
        """Simple enemy AI: claim one cell in the direction of the agent.

        The direction is the sign of the vector from the enemy's centroid to
        the agent's centroid. The first enemy cell whose neighbour in that
        direction is in bounds and not already enemy-owned captures it.
        """
        if not self.enemy_cells[enemy_idx] or not self.own_cells:
            return

        # Find direction toward own territory
        enemy_center = np.mean(list(self.enemy_cells[enemy_idx]), axis=0)
        own_center = np.mean(list(self.own_cells), axis=0)

        diff = own_center - enemy_center
        dr = int(np.sign(diff[0]))
        dc = int(np.sign(diff[1]))

        # Expand one cell
        for r, c in list(self.enemy_cells[enemy_idx]):
            nr, nc = r + dr, c + dc
            if 0 <= nr < self.GRID_SIZE and 0 <= nc < self.GRID_SIZE:
                if self.grid[nr, nc] != -1.0:
                    old_val = self.grid[nr, nc]
                    self.grid[nr, nc] = -1.0
                    self.enemy_cells[enemy_idx].add((nr, nc))
                    if old_val == 1.0:
                        self.own_cells.discard((nr, nc))
                    break

    def _get_state(self) -> np.ndarray:
        """Convert the grid to a state vector (64 float32 features).

        Layout:
          0-2    own / enemy / neutral fraction of the grid
          3-5    the same three fractions again
          6      number of own border cells / 100
          7      left at zero
          8-10   one-hot game phase (early / mid / late) from own fraction
          11-13  left at zero
          14-63  grid values sampled at 5 evenly spaced rows x 10 columns

        NOTE(review): slots 3-5 duplicate 0-2 and slots 7 and 11-13 are never
        written here; presumably this mirrors the feature layout expected by
        the live bot. Confirm before changing.
        """
        total = self.GRID_SIZE * self.GRID_SIZE
        own_count = len(self.own_cells)
        enemy_count = sum(len(e) for e in self.enemy_cells)
        neutral_count = total - own_count - enemy_count

        state = np.zeros(64, dtype=np.float32)

        # Territory percentages
        state[0] = own_count / total
        state[1] = enemy_count / total
        state[2] = neutral_count / total

        # Normalized counts
        state[3] = own_count / total
        state[4] = enemy_count / total
        state[5] = neutral_count / total

        # Border counts
        own_borders = self._count_borders(self.own_cells)
        state[6] = own_borders / 100.0

        # Game phase
        own_pct = own_count / total
        if own_pct < 0.05:
            state[8] = 1.0  # early
        elif own_pct < 0.25:
            state[9] = 1.0  # mid
        else:
            state[10] = 1.0  # late

        # Compact grid (5x10 = 50 features, starting at index 14)
        sample_rows = np.linspace(0, self.GRID_SIZE - 1, 5, dtype=int)
        sample_cols = np.linspace(0, self.GRID_SIZE - 1, 10, dtype=int)
        idx = 14
        for r in sample_rows:
            for c in sample_cols:
                if idx < 64:
                    state[idx] = self.grid[r, c]
                    idx += 1

        return state

    def _count_borders(self, cells: set) -> int:
        """Count cells with at least one 4-neighbour outside ``cells``.

        Off-grid neighbours are never in ``cells``, so cells on the edge of
        the map always count as border cells.
        """
        count = 0
        for r, c in cells:
            for dr, dc in [(-1,0),(1,0),(0,-1),(0,1)]:
                nr, nc = r+dr, c+dc
                if (nr, nc) not in cells:
                    count += 1
                    break  # count each cell at most once
        return count

    def render(self, ax=None):
        """Draw the current grid on ``ax`` (a new 6x6 figure if ``ax`` is None)."""
        if ax is None:
            _, ax = plt.subplots(1, 1, figsize=(6, 6))

        display = np.zeros((self.GRID_SIZE, self.GRID_SIZE, 3))
        display[self.grid == 0] = [0.5, 0.5, 0.5]   # neutral = gray
        display[self.grid == 1] = [0.0, 0.4, 1.0]   # own = blue
        display[self.grid == -1] = [1.0, 0.2, 0.2]  # enemy = red

        ax.imshow(display)
        ax.set_title(f'Step {self.step_count} | Own: {len(self.own_cells)}')
        ax.axis('off')


# Smoke-test the environment and snapshot it at three points of a random rollout
env = TerritorialEnv()
state = env.reset()
print(f'State shape: {state.shape}')
print(f'State sample: {state[:10]}')

fig, axes = plt.subplots(1, 3, figsize=(15, 5))

# Panel 0: freshly reset board
env.render(axes[0])
axes[0].set_title('Initial State')

# Panels 1 and 2: keep playing random non-STAY moves (actions 1..8).
# If an episode happens to end, the env is reset and that panel shows the
# fresh board instead.
rollout_plan = [(20, 'After 20 Steps'), (50, 'After 70 Steps')]
for panel, (n_steps, title) in enumerate(rollout_plan, start=1):
    for _ in range(n_steps):
        s, r, done, info = env.step(random.randint(1, 8))
        if done:
            env.reset()
            break
    env.render(axes[panel])
    axes[panel].set_title(title)

plt.tight_layout()
plt.savefig(f'{OUTPUT_DIR}/env_visualization.png', dpi=100)
plt.show()
print('Environment visualization saved.')

## 2. DQN Architecture

In [None]:
class DQNNetwork(nn.Module):
    """
    Fully connected Q-network: state vector in, one Q-value per action out.

    Layer stack: state_size -> 256 -> 256 -> 128 -> action_size, with ReLU
    after every hidden layer. The module is stored as ``self.net`` (an
    ``nn.Sequential``), so the state-dict keys are ``net.0.*``, ``net.2.*``,
    ``net.4.*`` and ``net.6.*``; keep that layout for weight compatibility
    with brain_system.py.
    """
    def __init__(self, state_size: int = 64, action_size: int = 9):
        super().__init__()
        widths = [state_size, 256, 256, 128]
        layers = []
        # Hidden layers: Linear followed by ReLU for each consecutive width pair
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            layers.append(nn.Linear(fan_in, fan_out))
            layers.append(nn.ReLU())
        # Output head: raw Q-values, no activation
        layers.append(nn.Linear(widths[-1], action_size))
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Return the Q-values produced by the layer stack for input ``x``."""
        q_values = self.net(x)
        return q_values


class ReplayMemory:
    """Fixed-capacity experience replay buffer (oldest transitions drop out first)."""

    def __init__(self, capacity: int = 10000):
        # A bounded deque evicts from the left once `capacity` is reached.
        self.memory = deque((), maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        """Store one transition as a 5-tuple."""
        transition = (state, action, reward, next_state, done)
        self.memory.append(transition)

    def sample(self, batch_size: int):
        """Return `batch_size` distinct stored transitions chosen at random."""
        return random.sample(self.memory, batch_size)

    def __len__(self):
        """Number of transitions currently stored."""
        stored = len(self.memory)
        return stored


# Hyperparameters
# -- problem dimensions
STATE_SIZE = 64            # length of the state vector fed to the network
ACTION_SIZE = 9            # number of discrete actions (network outputs)
# -- learning
GAMMA = 0.95               # discount factor applied to the bootstrapped Q-value
LEARNING_RATE = 0.001      # Adam step size
BATCH_SIZE = 64            # transitions sampled per training step
MEMORY_SIZE = 10000        # replay buffer capacity
# -- exploration (epsilon is multiplied by EPSILON_DECAY once per episode)
EPSILON_START = 1.0
EPSILON_END = 0.05
EPSILON_DECAY = 0.995
# -- schedule
TARGET_UPDATE_FREQ = 10    # episodes between target-network syncs
EPISODES = 1000
MAX_STEPS = 500            # per-episode step cap used by the training/eval loops

# Initialize networks: the policy net is built first, then the target net,
# which immediately receives a copy of the policy weights and stays in eval mode.
policy_net, target_net = (
    DQNNetwork(STATE_SIZE, ACTION_SIZE).to(device) for _ in range(2)
)
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

# Training machinery: only the policy net is optimized.
memory = ReplayMemory(MEMORY_SIZE)
criterion = nn.MSELoss()
optimizer = optim.Adam(policy_net.parameters(), lr=LEARNING_RATE)

total_params = sum(weights.numel() for weights in policy_net.parameters())
print(f'DQN parameters: {total_params:,}')
print(policy_net)

## 3. DQN Training Loop

In [None]:
def select_action(state: np.ndarray, epsilon: float) -> int:
    """Pick an action epsilon-greedily.

    With probability ``epsilon`` a uniformly random action is returned;
    otherwise the action with the highest Q-value under ``policy_net``.
    """
    explore = random.random() < epsilon
    if explore:
        return random.randrange(ACTION_SIZE)

    # Exploit: a single forward pass on a batch of one, without autograd.
    with torch.no_grad():
        batch = torch.as_tensor(state, dtype=torch.float32).unsqueeze(0).to(device)
        best_action = policy_net(batch).argmax(dim=1)
    return best_action.item()


def train_step() -> Optional[float]:
    """Run one Double DQN update on a sampled minibatch.

    Returns the scalar MSE loss, or ``None`` while the replay buffer holds
    fewer than ``BATCH_SIZE`` transitions.
    """
    if len(memory) < BATCH_SIZE:
        return None

    # Unzip the sampled transitions into per-field tuples
    transitions = memory.sample(BATCH_SIZE)
    states, actions, rewards, next_states, dones = zip(*transitions)

    state_batch = torch.tensor(np.array(states), dtype=torch.float32, device=device)
    next_state_batch = torch.tensor(np.array(next_states), dtype=torch.float32, device=device)
    action_batch = torch.tensor(actions, dtype=torch.long, device=device)
    reward_batch = torch.tensor(rewards, dtype=torch.float32, device=device)
    done_batch = torch.tensor(dones, dtype=torch.bool, device=device)

    # Q(s, a) for the actions that were actually taken
    predicted_q = policy_net(state_batch).gather(
        1, action_batch.unsqueeze(1)
    ).squeeze(1)

    # Double DQN target: the policy net chooses the next action, the target
    # net evaluates it. Terminal transitions bootstrap from zero.
    with torch.no_grad():
        greedy_next = policy_net(next_state_batch).argmax(dim=1, keepdim=True)
        bootstrap = target_net(next_state_batch).gather(1, greedy_next).squeeze(1)
        bootstrap = bootstrap.masked_fill(done_batch, 0.0)
        td_target = reward_batch + GAMMA * bootstrap

    loss = criterion(predicted_q, td_target)

    # Gradient step with the global grad norm clipped to 1.0
    optimizer.zero_grad()
    loss.backward()
    nn.utils.clip_grad_norm_(policy_net.parameters(), 1.0)
    optimizer.step()

    return loss.item()


# Training
# Runs EPISODES episodes of epsilon-greedy play, performing one train_step()
# per environment step. The lists and `best_model_path` defined here are read
# by the plotting / evaluation / export cells further down.
env = TerritorialEnv()
epsilon = EPSILON_START

# Per-episode statistics (one entry per episode) and per-update losses
episode_rewards = []
episode_lengths = []
episode_own_pcts = []
losses = []

best_avg_reward = float('-inf')
best_model_path = f'{OUTPUT_DIR}/brain_model_best.pth'

print(f'Training DQN for {EPISODES} episodes...')
print('=' * 70)

for episode in tqdm(range(1, EPISODES + 1)):
    state = env.reset()
    total_reward = 0.0
    step = 0
    
    # NOTE(review): the env also ends an episode by itself after its own
    # max_steps (500), and that time-limit `done` is stored like any other
    # terminal flag, so train_step() does not bootstrap past it.
    for step in range(MAX_STEPS):
        action = select_action(state, epsilon)
        next_state, reward, done, info = env.step(action)
        
        memory.push(state, action, reward, next_state, done)
        state = next_state
        total_reward += reward
        
        # One gradient update per env step (None until the buffer has a full batch)
        loss = train_step()
        if loss is not None:
            losses.append(loss)
        
        if done:
            break
    
    # Decay epsilon (multiplicative, once per episode, floored at EPSILON_END)
    epsilon = max(EPSILON_END, epsilon * EPSILON_DECAY)
    
    # Update target network (hard copy every TARGET_UPDATE_FREQ episodes)
    if episode % TARGET_UPDATE_FREQ == 0:
        target_net.load_state_dict(policy_net.state_dict())
    
    episode_rewards.append(total_reward)
    episode_lengths.append(step + 1)  # `step` is the 0-based index of the last step taken
    episode_own_pcts.append(info.get('own_pct', 0))
    
    # Save best model, judged by the mean reward of the last 50 training
    # (i.e. still exploratory) episodes. Nothing is written before episode 50,
    # so the evaluation cell needs EPISODES >= 50 to find this file.
    if episode >= 50:
        avg_reward = np.mean(episode_rewards[-50:])
        if avg_reward > best_avg_reward:
            best_avg_reward = avg_reward
            torch.save({
                'policy_net': policy_net.state_dict(),
                'target_net': target_net.state_dict(),
                'epsilon': epsilon,
                'episode': episode,
            }, best_model_path)
    
    # Periodic logging (every 100 episodes, averaged over the last 100
    # episodes / last 1000 updates)
    if episode % 100 == 0:
        avg_r = np.mean(episode_rewards[-100:])
        avg_own = np.mean(episode_own_pcts[-100:])
        avg_loss = np.mean(losses[-1000:]) if losses else 0
        print(
            f'Episode {episode:5d}/{EPISODES} | '
            f'Avg Reward: {avg_r:7.2f} | '
            f'Avg Own%: {avg_own:.1%} | '
            f'ε: {epsilon:.3f} | '
            f'Loss: {avg_loss:.4f} | '
            f'Memory: {len(memory)}'
        )

print('=' * 70)
print(f'Training complete! Best avg reward: {best_avg_reward:.2f}')

## 4. Training Curves

In [None]:
def smooth(data, window=50):
    """Return the moving average of ``data`` over ``window`` samples.

    Only fully covered windows are kept, so the result has
    ``len(data) - window + 1`` entries. Input shorter than ``window`` is
    returned unchanged (the same object, not a copy).
    """
    if len(data) >= window:
        kernel = np.ones(window) / window  # uniform weights summing to 1
        return np.convolve(data, kernel, mode='valid')
    return data


fig, axes = plt.subplots(2, 2, figsize=(14, 10))

# The three per-episode panels share the same layout, so draw them from a table:
# (axis, series, color, title, y-label)
per_episode_panels = [
    (axes[0, 0], episode_rewards, 'blue', 'Episode Rewards', 'Total Reward'),
    (axes[0, 1], episode_lengths, 'green', 'Episode Lengths', 'Steps'),
    (axes[1, 0], episode_own_pcts, 'orange',
     'Territory Ownership at Episode End', 'Own Territory %'),
]
for ax, series, color, title, ylabel in per_episode_panels:
    ax.plot(series, alpha=0.3, color=color, label='Raw')
    ax.plot(smooth(series), color=color, label='Smoothed')
    ax.set_title(title)
    ax.set_xlabel('Episode')
    ax.set_ylabel(ylabel)
    ax.legend()
    ax.grid(True)

# Training loss is indexed by update step rather than episode and uses a
# wider smoothing window; the panel stays empty if no update ever ran.
if losses:
    loss_ax = axes[1, 1]
    loss_ax.plot(losses, alpha=0.2, color='red', label='Raw')
    loss_ax.plot(smooth(losses, 200), color='red', label='Smoothed')
    loss_ax.set_title('Training Loss')
    loss_ax.set_xlabel('Training Step')
    loss_ax.set_ylabel('MSE Loss')
    loss_ax.legend()
    loss_ax.grid(True)

plt.suptitle('DQN Training Progress', fontsize=14)
plt.tight_layout()
plt.savefig(f'{OUTPUT_DIR}/brain_training_curves.png', dpi=100)
plt.show()
print('Training curves saved.')

## 5. Evaluate Trained Agent

In [None]:
# Restore the best checkpoint written during training and switch to eval mode
checkpoint = torch.load(best_model_path, map_location=device)
policy_net.load_state_dict(checkpoint['policy_net'])
policy_net.eval()

print('Evaluating trained agent (10 episodes, greedy policy)...')
eval_rewards = []
eval_own_pcts = []

fig, axes = plt.subplots(2, 5, figsize=(20, 8))

# One evaluation episode per subplot (row-major over the 2x5 grid)
for ep, ax in enumerate(axes.flat):
    state = env.reset()
    total_reward = 0.0

    for step in range(MAX_STEPS):
        # Always take the arg-max action: no epsilon exploration here
        with torch.no_grad():
            state_t = torch.FloatTensor(state).unsqueeze(0).to(device)
            action = policy_net(state_t).argmax(dim=1).item()

        state, reward, done, info = env.step(action)
        total_reward += reward

        if done:
            break

    eval_rewards.append(total_reward)
    eval_own_pcts.append(info.get('own_pct', 0))

    # Show the board as it looked when the episode ended
    env.render(ax)
    ax.set_title(
        f'Ep {ep+1}: R={total_reward:.1f} Own={info["own_pct"]:.1%}',
        fontsize=9
    )

plt.suptitle('Trained Agent Evaluation (10 Episodes)', fontsize=12)
plt.tight_layout()
plt.savefig(f'{OUTPUT_DIR}/agent_evaluation.png', dpi=100)
plt.show()

print(f'\nEvaluation Results:')
print(f'  Avg Reward: {np.mean(eval_rewards):.2f} ± {np.std(eval_rewards):.2f}')
print(f'  Avg Own Territory: {np.mean(eval_own_pcts):.1%} ± {np.std(eval_own_pcts):.1%}')
print(f'  Best Episode: {max(eval_rewards):.2f} reward, {max(eval_own_pcts):.1%} territory')

## 6. Export Final Model

In [None]:
# Save final model
# NOTE(review): if the evaluation cell above has run, `policy_net` holds the
# best-checkpoint weights it loaded, while `target_net` is still the network
# from the end of training and 'episode' is recorded as EPISODES. Confirm
# that this mix is what the live bot should receive.
final_model_path = f'{OUTPUT_DIR}/brain_model.pth'
export_payload = {
    'policy_net': policy_net.state_dict(),
    'target_net': target_net.state_dict(),
    'epsilon': EPSILON_END,  # Start with low epsilon for deployment
    'episode': EPISODES,
    'state_size': STATE_SIZE,
    'action_size': ACTION_SIZE,
    'hyperparams': {
        'gamma': GAMMA,
        'lr': LEARNING_RATE,
        'batch_size': BATCH_SIZE,
    }
}
torch.save(export_payload, final_model_path)
print(f'Final brain model saved to: {final_model_path}')

# Verify: reload the file on CPU into a fresh network and run one forward pass
ckpt = torch.load(final_model_path, map_location='cpu')
test_model = DQNNetwork(STATE_SIZE, ACTION_SIZE)
test_model.load_state_dict(ckpt['policy_net'])
test_model.eval()

dummy_state = torch.randn(1, STATE_SIZE)
with torch.no_grad():
    q_vals = test_model(dummy_state)
print(f'Test Q-values shape: {q_vals.shape}')
print(f'Test Q-values: {q_vals.numpy()}')
print('Brain model verification passed!')

# List output files (name and size in KB), sorted by path
print('\nOutput files:')
for out_file in sorted(Path(OUTPUT_DIR).iterdir()):
    print(f'  {out_file.name}: {out_file.stat().st_size / 1024:.1f} KB')

print('\n✅ DONE! Download brain_model.pth')
print('   Place it in territorial_bot/models/brain_model.pth')