In [1]:
# !pip install gymnasium[accept-rom-license]

In [2]:
from gymnasium.utils.play import play
import gymnasium as gym


def play_env():
    """Launch BankHeist for manual play through gymnasium's ``play`` helper.

    The environment is created in ``rgb_array`` render mode, its advertised
    frame rate is set to 15 and the picture is shown at 3x zoom.
    """
    manual_env = gym.make("ALE/BankHeist-v5", render_mode="rgb_array")
    manual_env.metadata.update(render_fps=15)
    play(manual_env, zoom=3)

# play_env()

In [3]:
from tqdm.auto import tqdm
import numpy as np

import gymnasium as gym
import math
import random
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple, deque
from itertools import count

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [4]:
class DQN(nn.Module):
    """Three-layer fully connected network over a flattened 68x68 grid.

    ``n_observations`` is accepted but not used: the first linear layer is
    hard-wired to ``68 * 68`` input features. The pooling and convolution
    modules are constructed (so the convolution weights are part of the
    state dict) but ``forward`` does not apply them.
    """

    def __init__(self, n_observations, n_actions):
        super().__init__()

        # Built but bypassed by forward(); module names and construction
        # order are kept so saved state dicts keep the same keys.
        self.max_pool = nn.MaxPool2d(kernel_size=8, stride=2)
        self.conv1 = nn.Conv2d(1, 16, kernel_size=5, padding=2)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=3, padding=1)

        # Dense stack: 68*68 -> 2048 -> 512 -> n_actions.
        grid_cells = 1 * 68 * 68
        self.layer1 = nn.Linear(grid_cells, 2048)
        self.layer2 = nn.Linear(2048, 512)
        self.layer3 = nn.Linear(512, n_actions)

    def forward(self, x):
        """Score every action for a single state or for a batch of states.

        ``x`` needs a leading batch dimension and 68 * 68 values per sample
        (e.g. shape ``[batch, 68, 68]``). The result has shape
        ``[batch, n_actions]`` and every row is a softmax distribution.
        """
        # A channel axis is inserted and then flattened away again; the
        # convolution / pooling path is currently disabled.
        flat = x.unsqueeze(1).flatten(1)
        hidden = F.relu(self.layer1(flat))
        hidden = F.relu(self.layer2(hidden))
        scores = self.layer3(hidden)
        return F.softmax(scores, dim=1)

In [5]:
# Grayscale BankHeist rendered in a live window ("human" render mode);
# frameskip=2 is passed through to the ALE environment.
env = gym.make(
    "ALE/BankHeist-v5",
    obs_type="grayscale",
    render_mode="human",
    frameskip=2,
)
env.reset()
env.render()

  logger.warn(


In [6]:
# Hyperparameters.
# BATCH_SIZE is the number of transitions sampled from the replay buffer
# GAMMA is the discount factor
# EPS_START is the starting value of epsilon
# EPS_END is the final value of epsilon (equal to EPS_START here, so the
#   schedule is flat)
# EPS_DECAY controls the rate of exponential decay of epsilon, higher means a slower decay
# TAU is the update rate of the target network
# LR is the learning rate of the ``AdamW`` optimizer
BATCH_SIZE = 128
GAMMA = 0.99
EPS_START = 0.2
EPS_END = 0.2
EPS_DECAY = 10000
TAU = 0.005
LR = 1e-4

# Number of network outputs. This is hard-coded, env.action_space is not
# consulted; it has to match the output layer stored in the checkpoint,
# otherwise load_state_dict below raises a size-mismatch error.
n_actions = 6
# Number of values in one raw observation as returned by env.reset().
# DQN accepts this argument but does not use it (its first layer is fixed
# at 68 * 68 inputs).
state, info = env.reset()
n_observations = state.size

model = DQN(n_observations, n_actions).to(device)
# map_location makes a checkpoint that was saved from a CUDA device loadable
# on a CPU-only machine (and vice versa).
# NOTE: torch.load unpickles the file - only load checkpoints you trust.
model.load_state_dict(torch.load('./policy_net_2.pt', map_location=device))

<All keys matched successfully>

In [7]:
# Part of the frame that is kept; everything outside this window is dropped.
_CROP_ROWS = slice(41, 177)
_CROP_COLS = slice(12, 148)

# Codes written into the converted observation.
_FLOOR = 0
_WALL = 1
_PLAYER = 2
_POLICE = 3
_BANK = 4
# Partially filled boxes that match nothing else get the same code as police.
_OTHER = 3

# Grayscale values that identify the player and the police inside a box.
_PLAYER_GRAY = 110
_POLICE_GRAY = 41

# The four 4x4 pixel patterns that are recognised as a bank.
_BANK_BOXES = np.array([[[0, 0, 0, 0],
                         [0, 0, 142, 142],
                         [0, 142, 142, 142],
                         [0, 142, 0, 142]],
                        [[0, 0, 0, 0],
                         [142, 142, 0, 0],
                         [142, 142, 142, 0],
                         [142, 0, 142, 0]],
                        [[0, 142, 0, 142],
                         [0, 142, 0, 142],
                         [0, 142, 0, 142],
                         [142, 142, 142, 142]],
                        [[142, 0, 142, 0],
                         [142, 0, 142, 0],
                         [142, 0, 142, 0],
                         [142, 142, 142, 142]]], dtype=np.uint8)


def _classify_box(box: np.ndarray) -> int:
    """Return the code for one pixel box; the first matching rule wins."""
    filled = box != 0
    if filled.all():  # walls
        return _WALL
    if not filled.any():  # floor
        return _FLOOR
    if (box == _PLAYER_GRAY).any():  # player character
        return _PLAYER
    if (box == _POLICE_GRAY).any():  # police
        return _POLICE
    if any(np.array_equal(box, pattern) for pattern in _BANK_BOXES):  # banks
        return _BANK
    # Everything else (reward digits, bombs, ...).
    # NOTE(review): the previous implementation tested `box in bank_boxes`
    # here, which is an element-wise broadcast comparison and is true for
    # every box that contains a zero. Its final branch (code 5, bombs) was
    # therefore unreachable and such boxes were written as 3. That output is
    # preserved on purpose - presumably the saved policy was trained on this
    # encoding; confirm before introducing a separate code.
    return _OTHER


def convert_observation(observation: np.ndarray) -> np.ndarray:
    """Convert a grayscale frame into a small grid of category codes.

    The frame is cropped to rows 41..176 and columns 12..147, the crop is cut
    into 4x4 pixel boxes, and every box is written as a 2x2 patch of one code
    (0 floor, 1 wall, 2 player, 3 police / other, 4 bank) into the result.

    Args:
        observation: 2-D grayscale frame; a 210x160 frame gives a 136x136 crop.

    Returns:
        ``uint8`` array with half the height and width of the crop
        (68x68 for a 210x160 frame).
    """
    playfield = observation[_CROP_ROWS, _CROP_COLS]
    height, width = playfield.shape[0], playfield.shape[1]
    grid = np.zeros((height // 2, width // 2), dtype=np.uint8)

    # Step through the crop in strides of 4 pixels. Only boxes that actually
    # exist are visited; iterating past the crop would just produce empty
    # slices and empty writes.
    for top in range(0, height, 4):
        out_top = top // 2
        for left in range(0, width, 4):
            out_left = left // 2
            box = playfield[top:top + 4, left:left + 4]
            grid[out_top:out_top + 2, out_left:out_left + 2] = _classify_box(box)

    return grid

In [8]:
# convert_observation(state)

In [9]:
# Action id that is never sent to the environment and the id sent instead.
# NOTE(review): presumably 1 is FIRE and 0 is NOOP in the ALE action set -
# confirm against env.unwrapped.get_action_meanings().
_SUPPRESSED_ACTION = 1
_REPLACEMENT_ACTION = 0


def select_action(state, eps_threshold=0.1):
    """Pick an action epsilon-greedily with the global ``model``.

    Args:
        state: batched observation tensor accepted by ``model``; only the
            first row of the model output is used.
        eps_threshold: probability of taking a uniformly random action
            instead of the model's highest-scoring one.

    Returns:
        ``torch.long`` tensor of shape ``[1, 1]`` on ``device`` holding the
        chosen action id. Action 1 is always replaced by action 0.
    """
    if random.random() > eps_threshold:
        with torch.no_grad():
            # max(1) returns the largest value of each row together with its
            # column index; the index is the highest-scoring action.
            action = model(state).max(1).indices[0].item()
    else:
        # Sample over every network output. The previous randint(5) could
        # never explore the last action even though the greedy branch can
        # return it.
        action = np.random.randint(n_actions)

    if action == _SUPPRESSED_ACTION:
        action = _REPLACEMENT_ACTION
    return torch.tensor([[action]], device=device, dtype=torch.long)

In [None]:
# Play one episode with the loaded policy.
observation, info = env.reset()
done = False

# Every step, including the first one, goes through the same loop body so
# that its terminated / truncated flags are always honoured.
while not done:
    state = convert_observation(observation)
    # Add the batch dimension the network expects.
    state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
    action = select_action(state)
    observation, reward, terminated, truncated, info = env.step(action.item())

    done = terminated or truncated

print('Complete')