In [1]:
import torch, os, random, cv2, itertools
import numpy as np
from torch import nn
import torch.nn.functional as F
import torch.optim as optim
from collections import deque
from vizdoom import DoomGame

In [2]:
# Release cached CUDA memory left over from an earlier run in this kernel.
# No kernel restart is required after this call.
torch.cuda.empty_cache()

# Scenario configuration. The default is the original local path; set the
# VIZDOOM_CONFIG environment variable to run the notebook on a machine where
# that absolute path does not exist.
CONFIG_PATH = os.environ.get("VIZDOOM_CONFIG", "B:/Pytorch/RL/basic.cfg")

game = DoomGame()

# Load configuration (must happen before the game is initialised)
game.load_config(CONFIG_PATH)

# Initialize the game
game.init()

# Run the networks on the GPU when one is available, otherwise on the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [3]:
# Grab the current game state; its screen buffer is inspected in a later cell.
state = game.get_state()

In [4]:
# Buttons exposed by the loaded scenario, in the order used by action vectors.
game.get_available_buttons()

[<Button.MOVE_LEFT: 11>, <Button.MOVE_RIGHT: 10>, <Button.ATTACK: 0>]

In [5]:
# Shape of the raw screen buffer, before any resizing is applied.
print("Environment shape: ", state.screen_buffer.shape)

Environment shape:  (3, 240, 320)


In [6]:
# NOTE(review): the label reads "Actions taken", but the value printed is the
# list of available buttons, not actions that have been executed.
print("Actions taken: ", game.get_available_buttons())

Actions taken:  [<Button.MOVE_LEFT: 11>, <Button.MOVE_RIGHT: 10>, <Button.ATTACK: 0>]


In [7]:
# Check whether the current episode has already ended.
print("Is episode finished:", game.is_episode_finished())

Is episode finished: False


In [8]:
# Keep the button list in a variable and report how many buttons there are.
buttons = game.get_available_buttons()
print(buttons)
print("Number of buttons:", len(buttons))

[<Button.MOVE_LEFT: 11>, <Button.MOVE_RIGHT: 10>, <Button.ATTACK: 0>]
Number of buttons: 3


In [9]:
def get_game_actions(game):
    """Build the discrete action set as a list of 0/1 button vectors.

    Every on/off combination of the game's available buttons is generated and
    then filtered. A vector is dropped when it:

    * presses no button at all (do nothing),
    * presses MOVE_LEFT and MOVE_RIGHT together,
    * presses every button at once (only applied when there are 2+ buttons,
      so a single-button game still keeps its one action).

    The MOVE_LEFT / MOVE_RIGHT positions are looked up by button name instead
    of being assumed to be indices 0 and 1, and the other filters scale with
    the number of buttons, so the function works for any button layout.

    Args:
        game: Object exposing ``get_available_buttons()``; each returned
            button must provide a ``name`` attribute.

    Returns:
        list[list[int]]: The surviving button vectors, in
        ``itertools.product([0, 1], ...)`` order. The position of a vector in
        this list is the action index used by the agent.
    """
    button_names = game.get_available_buttons()
    num_buttons = len(button_names)

    # Locate the two movement buttons by name; either may be absent.
    names = [btn.name for btn in button_names]
    left_idx = names.index("MOVE_LEFT") if "MOVE_LEFT" in names else None
    right_idx = names.index("MOVE_RIGHT") if "MOVE_RIGHT" in names else None
    has_both_moves = left_idx is not None and right_idx is not None

    filtered_actions = []
    for bits in itertools.product([0, 1], repeat=num_buttons):
        vec = list(bits)
        pressed = sum(vec)

        # Do nothing.
        if pressed == 0:
            continue
        # Contradictory movement: left and right at the same time.
        if has_both_moves and vec[left_idx] and vec[right_idx]:
            continue
        # All buttons pressed.
        if num_buttons > 1 and pressed == num_buttons:
            continue

        filtered_actions.append(vec)

    print("Button order in action vector:")
    for idx, btn in enumerate(button_names):
        print(f"index {idx}: {btn}")

    print("\nFiltered action vectors:")
    for idx, vec in enumerate(filtered_actions):
        active = [name for name, bit in zip(names, vec) if bit == 1]
        print(f"action index: {idx} → {vec} → active buttons: {active}")

    return filtered_actions

# Build the action set once; action indices used later refer to positions in this list.
game_actions = get_game_actions(game)
game_actions

Button order in action vector:
index 0: Button.MOVE_LEFT
index 1: Button.MOVE_RIGHT
index 2: Button.ATTACK

Filtered action vectors:
action index: 0 → [0, 0, 1] → active buttons: ['ATTACK']
action index: 1 → [0, 1, 0] → active buttons: ['MOVE_RIGHT']
action index: 2 → [0, 1, 1] → active buttons: ['MOVE_RIGHT', 'ATTACK']
action index: 3 → [1, 0, 0] → active buttons: ['MOVE_LEFT']
action index: 4 → [1, 0, 1] → active buttons: ['MOVE_LEFT', 'ATTACK']


[[0, 0, 1], [0, 1, 0], [0, 1, 1], [1, 0, 0], [1, 0, 1]]

In [10]:
# Smoke test: send one action with only the third button (ATTACK in this
# scenario's button order) pressed and display the returned reward.
game.make_action([0, 0, 1])

-1.0

In [11]:
class ReplayBuffer():
    """Bounded FIFO store of transitions with random mini-batch sampling."""

    def __init__(self, max_buffer_size=10000, batch_size=16):
        # The deque drops its oldest entry on its own once it holds
        # ``max_buffer_size`` items.
        self.batch_size = batch_size
        self.buffer = deque(maxlen=max_buffer_size)

    def __len__(self):
        """Return how many transitions are currently stored."""
        return len(self.buffer)

    def add_sample(self, element: tuple):
        """Append one transition tuple at the newest end of the buffer."""
        self.buffer.append(element)

    def get_batch(self):
        """Return a mini-batch of stored transitions.

        While the buffer holds ``batch_size`` items or fewer, everything
        stored is returned in insertion order. Otherwise ``batch_size`` items
        are drawn at random without replacement.
        """
        stored = self.buffer
        if len(stored) <= self.batch_size:
            return list(stored)
        return random.sample(stored, k=self.batch_size)

In [12]:
class QNetwork(nn.Module):
    """Small convolutional Q-network: two conv/pool stages and two linear layers.

    Args:
        conv_features: Output channels of the first convolution; the second
            convolution produces twice as many.
        num_actions: Number of Q-values produced per input.
        input_shape: ``(channels, height, width)`` of one input frame. The
            channel count sets ``conv1``'s input channels, and the whole shape
            is used to size the first linear layer.

    Layer attribute names (``conv1``, ``pool1``, ``conv2``, ``pool2``, ``fc1``,
    ``fc2``) are kept so previously saved state dicts still load.
    """

    def __init__(self, conv_features=64, num_actions=3, input_shape=(3, 120, 160)):
        super().__init__()
        # Take the channel count from input_shape rather than assuming RGB.
        in_channels = input_shape[0]
        self.conv1 = nn.Conv2d(in_channels=in_channels, out_channels=conv_features, kernel_size=3)
        self.pool1 = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(in_channels=conv_features, out_channels=conv_features*2, kernel_size=3)
        self.pool2 = nn.MaxPool2d(2, 2)

        self.flatten_size = self._get_flatten_size(input_shape)

        self.fc1 = nn.Linear(self.flatten_size, 1024)
        self.fc2 = nn.Linear(1024, num_actions)

    def _extract_features(self, x):
        """Run the convolutional stack shared by sizing and the forward pass."""
        x = self.pool1(F.gelu(self.conv1(x)))
        x = self.pool2(F.gelu(self.conv2(x)))
        return x

    def _get_flatten_size(self, input_shape):
        """Return the number of conv features produced for one input frame."""
        with torch.no_grad():
            dummy = torch.zeros(1, *input_shape)
            # Batch size is 1, so numel() is the per-sample feature count.
            return self._extract_features(dummy).numel()

    def forward(self, x):
        """Map a batch of frames ``(N, C, H, W)`` to Q-values ``(N, num_actions)``."""
        x = self._extract_features(x)
        x = torch.flatten(x, 1)
        x = F.gelu(self.fc1(x))
        return self.fc2(x)


In [13]:
# --- Reproducibility ---------------------------------------------------------
# Seed the Python, NumPy and PyTorch generators used for exploration, replay
# sampling and weight initialisation. The game engine itself is not seeded here.
SEED = 0
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# --- Hyperparameters ---------------------------------------------------------
GAMMA = 0.95                   # discount factor
BATCH_SIZE = 16                # transitions per gradient step
BUFFER_SIZE = 5000             # replay buffer capacity
EPSILON = 1.0                  # initial exploration rate
EPSILON_DECAY_RATE = 0.995     # multiplicative decay applied per training step
DECAY = EPSILON_DECAY_RATE     # legacy alias, kept so the old name still resolves
MIN_EPSILON = 0.1              # exploration floor (the earlier 0.01 definition
                               # was overwritten by this one and never took effect)
LEARNING_RATE = 1e-4
TOTAL_NUM_STEPS = 20000        # environment steps in the training loop
N_START_TRAINING = 2000        # steps collected before gradient updates begin
TARGET_UPDATE = 50             # steps between target-network syncs
LOG_LOSS_EVERY_N_STEPS = 100


# --- Networks, replay buffer, optimiser --------------------------------------
q_net = QNetwork(num_actions=len(game_actions))
target_q_net = QNetwork(num_actions=len(game_actions))
buffer = ReplayBuffer(max_buffer_size=BUFFER_SIZE, batch_size=BATCH_SIZE)
optimizer = optim.AdamW(q_net.parameters(), lr=LEARNING_RATE)

# Start the target network as an exact copy of the online network.
target_q_net.load_state_dict(q_net.state_dict())
q_net.to(device=device)
target_q_net.to(device=device)

QNetwork(
  (conv1): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1))
  (pool1): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1))
  (pool2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (fc1): Linear(in_features=136192, out_features=1024, bias=True)
  (fc2): Linear(in_features=1024, out_features=5, bias=True)
)

In [14]:
def get_tensored_state(game_state):
    """Turn a raw channel-first frame into a scaled float tensor on ``device``.

    The frame is rearranged to channel-last for OpenCV, resized to 160x120
    (width x height), rearranged back to channel-first, cast to float, moved
    to ``device`` and divided by 255.
    """
    hwc_frame = game_state.transpose(1, 2, 0)
    small_frame = cv2.resize(hwc_frame, (160, 120))
    chw_tensor = torch.from_numpy(small_frame).permute(2, 0, 1)
    on_device = chw_tensor.float().to(device)
    return on_device / 255.0

def select_action(state_tensor, q_net, epsilon):
    """Pick an action index with an epsilon-greedy policy.

    With probability ``epsilon`` a uniformly random index into
    ``game_actions`` is returned; otherwise the index with the highest
    predicted Q-value is returned.

    The previous version only explored while ``epsilon > MIN_EPSILON``.
    Because the training loop clamps epsilon to exactly ``MIN_EPSILON``,
    exploration stopped completely once the floor was reached. Epsilon is now
    used directly as the exploration probability at every value.

    Args:
        state_tensor: One preprocessed state without a batch dimension; a
            batch dimension is added before the network is called.
        q_net: Callable mapping a batch of states to per-action Q-values.
        epsilon: Exploration probability. ``0.0`` (or less) is fully greedy
            and draws no random number.

    Returns:
        int: Index of the selected action.
    """
    # Explore: the epsilon > 0 guard keeps greedy evaluation from touching the RNG.
    if epsilon > 0 and random.random() < epsilon:
        return random.randrange(len(game_actions))

    # Exploit: no gradients are needed for action selection.
    with torch.no_grad():
        q_values = q_net(state_tensor.unsqueeze(0))
    return torch.argmax(q_values, dim=-1).item()

In [None]:
## Training

# Decay a local copy of the exploration rate instead of the EPSILON constant,
# so the configured value is preserved and re-running this cell restarts the
# schedule from it.
epsilon = EPSILON

# Neither network changes mode during training, so set the modes once.
q_net.train()
target_q_net.eval()

game.new_episode()
for step in range(TOTAL_NUM_STEPS):
    # Get current state
    state = game.get_state().screen_buffer
    state_tensor = get_tensored_state(state)

    # Select action (returns index)
    action = select_action(state_tensor, q_net, epsilon)
    action_vector = game_actions[action]

    # Execute action; the finished flag is read after the action is applied
    reward = game.make_action(action_vector)
    done = game.is_episode_finished()
    did_attack = action_vector[2] == 1

    # Penalize blind attack
    # NOTE(review): this only triggers when the step reward is exactly 0. The
    # sample attack earlier in this notebook returned -1.0, so confirm that
    # this branch can actually fire for the loaded scenario.
    if did_attack and reward == 0:
        reward -= 10

    # Get next state; a finished episode has no next frame, so use zeros
    if done:
        next_state_tensor = torch.zeros_like(state_tensor)
        game.new_episode()
    else:
        next_state = game.get_state().screen_buffer
        next_state_tensor = get_tensored_state(next_state)

    # Store in buffer as numpy / Python
    buffer.add_sample((
        state_tensor.cpu().numpy(),
        action,          # int
        reward,          # float
        next_state_tensor.cpu().numpy(),
        done             # bool
    ))

    if step < N_START_TRAINING and step % 500 == 0:
        print(f"Buffer filled so far: {len(buffer)}")

    # Start training once buffer has enough samples
    if step > N_START_TRAINING:
        # Sample minibatch
        sampled_batches = buffer.get_batch()

        # Convert to tensors on device
        states = torch.from_numpy(np.stack([b[0] for b in sampled_batches])).float().to(device)
        actions = torch.tensor([b[1] for b in sampled_batches], dtype=torch.int64, device=device)
        rewards = torch.tensor([b[2] for b in sampled_batches], dtype=torch.float32, device=device)
        next_states = torch.from_numpy(np.stack([b[3] for b in sampled_batches])).float().to(device)
        dones = torch.tensor([b[4] for b in sampled_batches], dtype=torch.bool, device=device)

        # Double DQN target calculation: the online network picks the next
        # action, the target network evaluates it
        with torch.no_grad():
            q_values_main = q_net(next_states)
            next_actions = torch.argmax(q_values_main, dim=-1)
            target_q_values = target_q_net(next_states)

        target_q_values_max = target_q_values.gather(1, next_actions.unsqueeze(1)).squeeze(1)
        # (~dones) zeroes the bootstrap term for terminal transitions
        target = rewards + GAMMA * target_q_values_max * (~dones)

        # Predicted Q-values for actions actually taken
        predicted = q_net(states).gather(1, actions.unsqueeze(1)).squeeze(1)

        # Compute loss
        loss = F.mse_loss(predicted, target)

        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(q_net.parameters(), max_norm=1.0)
        optimizer.step()

        # Logging
        if step % LOG_LOSS_EVERY_N_STEPS == 0:
            print(f"[step]: {step}, [loss]: {loss.item():.4f}, [epsilon]: {epsilon:.3f}")

        # Update target network
        if step % TARGET_UPDATE == 0:
            target_q_net.load_state_dict(q_net.state_dict())

        # Decay exploration once per training step, never below the floor
        epsilon = max(MIN_EPSILON, epsilon * EPSILON_DECAY_RATE)


Buffer filled so far: 1
Buffer filled so far: 501
Buffer filled so far: 1001
Buffer filled so far: 1501
[step]: 2100, [loss]: 2.6392, [epsilon]: 0.609
[step]: 2200, [loss]: 0.9626, [epsilon]: 0.369
[step]: 2300, [loss]: 0.8260, [epsilon]: 0.223
[step]: 2400, [loss]: 3.0183, [epsilon]: 0.135
[step]: 2500, [loss]: 0.1130, [epsilon]: 0.100
[step]: 2600, [loss]: 0.1056, [epsilon]: 0.100
[step]: 2700, [loss]: 0.4663, [epsilon]: 0.100
[step]: 2800, [loss]: 0.9471, [epsilon]: 0.100
[step]: 2900, [loss]: 0.2758, [epsilon]: 0.100
[step]: 3000, [loss]: 0.3405, [epsilon]: 0.100
[step]: 3100, [loss]: 15.1039, [epsilon]: 0.100
[step]: 3200, [loss]: 0.3922, [epsilon]: 0.100
[step]: 3300, [loss]: 0.3703, [epsilon]: 0.100
[step]: 3400, [loss]: 0.6406, [epsilon]: 0.100
[step]: 3500, [loss]: 0.4991, [epsilon]: 0.100
[step]: 3600, [loss]: 0.7357, [epsilon]: 0.100
[step]: 3700, [loss]: 0.4499, [epsilon]: 0.100
[step]: 3800, [loss]: 705.7902, [epsilon]: 0.100
[step]: 3900, [loss]: 0.3096, [epsilon]: 0.100


In [16]:
MODEL_FILE_PATH = "models/double_dqn.pth"

# Create the target directory first so saving also works on a fresh checkout
# where "models/" does not exist yet.
os.makedirs(os.path.dirname(MODEL_FILE_PATH), exist_ok=True)

# Only the online network's weights are saved (state_dict, not the module).
torch.save(q_net.state_dict(), MODEL_FILE_PATH)
print(f"Model state_dict saved to {MODEL_FILE_PATH}")

Model state_dict saved to models/double_dqn.pth


In [37]:
def evaluate_agent(q_net, num_episodes=10, eval_epsilon=0.0):
    """Play full episodes with ``q_net`` and summarise rewards and lengths.

    The network is switched to eval mode. The game is initialised once (only
    if it is not already running) and is always closed before returning, even
    when an episode raises.

    Args:
        q_net: Network passed to ``select_action``.
        num_episodes: Number of episodes to play; must be positive.
        eval_epsilon: Exploration rate passed to ``select_action``.

    Returns:
        dict: ``mean_reward``, ``std_reward``, ``min_reward``, ``max_reward``,
        ``mean_length``, ``std_length``, ``success_rate`` (fraction of
        episodes with a positive total reward), plus the raw
        ``episode_rewards`` and ``episode_lengths`` lists.

    Raises:
        ValueError: If ``num_episodes`` is not positive.
    """
    if num_episodes <= 0:
        raise ValueError("num_episodes must be a positive integer")

    q_net.eval()

    episode_rewards = []
    episode_lengths = []
    success_count = 0

    # Initialise once instead of on every episode.
    if not game.is_running():
        game.init()

    try:
        for ep in range(num_episodes):
            game.new_episode()
            done = False
            ep_reward = 0
            ep_length = 0

            while not done:
                state = game.get_state().screen_buffer
                state_tensor = get_tensored_state(state)

                # select_action already disables gradients for the forward pass
                action_idx = select_action(state_tensor, q_net, epsilon=eval_epsilon)
                action_vector = game_actions[action_idx]

                # Execute action
                reward = game.make_action(action_vector)
                done = game.is_episode_finished()

                # Penalize blind attack
                # NOTE(review): this applies the training-time reward shaping,
                # so reported rewards can differ from raw game rewards
                # whenever the branch fires.
                did_attack = action_vector[2] == 1
                if did_attack and reward == 0:
                    reward -= 10

                ep_reward += reward
                ep_length += 1

            print(f"Episode {ep+1}/{num_episodes} | Reward: {ep_reward} | Length: {ep_length}")
            episode_rewards.append(ep_reward)
            episode_lengths.append(ep_length)

            if ep_reward > 0:
                success_count += 1
    finally:
        # Release the game even if an episode fails part-way through.
        game.close()

    metrics = {
        'mean_reward': np.mean(episode_rewards).item(),
        'std_reward': np.std(episode_rewards).item(),
        'min_reward': np.min(episode_rewards).item(),
        'max_reward': np.max(episode_rewards).item(),
        'mean_length': np.mean(episode_lengths).item(),
        'std_length': np.std(episode_lengths).item(),
        'success_rate': (success_count / num_episodes),
        'episode_rewards': episode_rewards,    # keep as raw list
        'episode_lengths': episode_lengths     # keep as raw list
    }

    return metrics


# Evaluate the trained online network with the default settings
# (10 episodes, eval_epsilon=0.0) and show the summary dictionary.
metrics = evaluate_agent(q_net)
print(metrics)

Episode 1/10 | Reward: 95.0 | Length: 6
Episode 2/10 | Reward: 75.0 | Length: 26
Episode 3/10 | Reward: 95.0 | Length: 6
Episode 4/10 | Reward: 73.0 | Length: 28
Episode 5/10 | Reward: 95.0 | Length: 6
Episode 6/10 | Reward: 95.0 | Length: 6
Episode 7/10 | Reward: 71.0 | Length: 30
Episode 8/10 | Reward: 71.0 | Length: 25
Episode 9/10 | Reward: 95.0 | Length: 6
Episode 10/10 | Reward: 95.0 | Length: 6
{'mean_reward': 86.0, 'std_reward': 11.072488428533127, 'min_reward': 71.0, 'max_reward': 95.0, 'mean_length': 14.5, 'std_length': 10.480935072788114, 'success_rate': 1.0, 'episode_rewards': [95.0, 75.0, 95.0, 73.0, 95.0, 95.0, 71.0, 71.0, 95.0, 95.0], 'episode_lengths': [6, 26, 6, 28, 6, 6, 30, 25, 6, 6]}


In [33]:
def record_episode_video(q_net,filename="videos/doom_agent.mp4", fps=20, max_steps=300, eval_epsilon=0.0):
    """Play one episode with ``q_net`` and write its frames to a video file.

    The game is initialised if it is not already running and is always closed
    before the video is written. The output directory is created when missing,
    and success is only reported when the video writer actually opened.

    Args:
        q_net: Network passed to ``select_action``.
        filename: Output video path.
        fps: Frame rate of the written video.
        max_steps: Upper bound on the number of steps recorded.
        eval_epsilon: Exploration rate passed to ``select_action``.

    Returns:
        tuple: ``(episode_reward, episode_length)`` for the recorded episode.
    """
    q_net.eval()
    frames = []
    episode_reward = 0
    episode_length = 0

    if not game.is_running():
        game.init()

    try:
        game.new_episode()
        done = False

        while not done and episode_length < max_steps:
            state = game.get_state().screen_buffer
            state_tensor = get_tensored_state(state)

            # select_action already disables gradients for the forward pass
            action_idx = select_action(state_tensor, q_net, epsilon=eval_epsilon)
            action_vector = game_actions[action_idx]

            reward = game.make_action(action_vector)
            done = game.is_episode_finished()
            episode_reward += reward
            episode_length += 1

            # Record the frame the action was chosen from: (C,H,W) → (H,W,C)
            frames.append(state.transpose(1, 2, 0))
    finally:
        # Release the game even if the episode fails part-way through.
        game.close()

    if not frames:
        print("⚠️ No frames captured, video not saved.")
        return episode_reward, episode_length

    # Create the output directory first; the writer does not create it.
    out_dir = os.path.dirname(filename)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    # save video
    height, width, _ = frames[0].shape
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    video = cv2.VideoWriter(filename, fourcc, fps, (width, height))

    # Check the writer explicitly rather than assuming it opened, so a bad
    # path or codec is not reported as a saved video.
    if not video.isOpened():
        video.release()
        print(f"⚠️ Could not open {filename} for writing, video not saved.")
        return episode_reward, episode_length

    try:
        for f in frames:
            # Frames are treated as RGB; OpenCV writes BGR.
            video.write(cv2.cvtColor(f, cv2.COLOR_RGB2BGR))
    finally:
        video.release()

    print(f"✅ Video saved as {filename} 🎥")
    return episode_reward, episode_length

# Record one episode with the default settings (eval_epsilon=0.0) and keep its
# total reward and length.
episode_reward, episode_length = record_episode_video(q_net)

✅ Video saved as videos/doom_agent.mp4 🎥
