In [62]:
# Khai báo các thư viện cần thiết
import torch
import gym
import numpy as np
from gym.spaces import Discrete
import random
import os
import pickle
from collections import deque
import torch.nn as nn
import torch.nn.functional as F

In [63]:
# Model cho MazeNet

class MazeNetCombined(nn.Module):
    def __init__(self, local_size=11, global_size=10, num_actions=4):
        super(MazeNetCombined, self).__init__()
        
        # Quan sát cục bộ
        self.conv1_local = nn.Conv2d(1, 32, kernel_size=3, padding=1)
        self.conv2_local = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.conv3_local = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.conv4_local = nn.Conv2d(128, 256, kernel_size=3, padding=1)
        
        # Quan sát toàn cục
        self.conv1_global = nn.Conv2d(1, 32, kernel_size=3, padding=1)
        self.conv2_global = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.conv3_global = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.conv4_global = nn.Conv2d(128, 256, kernel_size=3, padding=1)
        
        # Fully Connected cho vị trí hiện tại
        self.fc_position = nn.Linear(2, 32)

        # Tầng Fully Connected cuối cùng
        self.fc1 = nn.Linear(256 * local_size * local_size + 256 * global_size * global_size + 32, 256)
        self.fc2 = nn.Linear(256, 128)
        self.dropout_fc = nn.Dropout(p=0.5)  # Dropout trước tầng FC3
        self.fc3 = nn.Linear(128, num_actions)

    def forward(self, local_obs, global_obs, position):
        # Xử lý local_obs
        x_local = F.relu(self.conv1_local(local_obs))
        x_local = F.relu(self.conv2_local(x_local))
        x_local = F.relu(self.conv3_local(x_local))
        x_local = F.relu(self.conv4_local(x_local))
        x_local = x_local.view(x_local.size(0), -1)
    
        # Xử lý global_obs
        x_global = F.relu(self.conv1_global(global_obs))
        x_global = F.relu(self.conv2_global(x_global))
        x_global = F.relu(self.conv3_global(x_global))
        x_global = F.relu(self.conv4_global(x_global))
        x_global = x_global.view(x_global.size(0), -1)
        # Xử lý vị trí hiện tại
        x_position = F.relu(self.fc_position(position))

        # Kết hợp tất cả
        x = torch.cat((x_local, x_global, x_position), dim=1)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.dropout_fc(x)  # Dropout trước FC3
        x = self.fc3(x)

        return x


In [None]:
# Môi trường Mê cung

class MazeEnv(gym.Env):
    # Môi trường mê cung
    # Maze: Mê cung được tạo ra ngẫu nhiên với các ô đường và tường
    # Với các ô đường được đánh dấu là 0 và các ô tường được đánh dấu là -1, ô đích được đánh dấu là 2
    # Discovered_maze: Mê cung đã được khám phá trong quá trình di chuyển của tác tử
    # Với các ô đường được đánh dấu là 0, các ô tường được đánh dấu là -5, đích được đánh dấu là 10, ô chưa được khám phá được đánh dấu là 1
    # Giá trị các ô đường giảm dần mỗi lần tác tử đi qua
    # Agent_position: Vị trí hiện tại của tác tử trong mê cung
    # Base_position: Vị trí ban đầu của tác tử trong mê cung
    # Goal_position: Vị trí đích trong mê cung
    # Buff: Biến để xác định xem tác tử có đang sử dụng buff senrigan hay không
    # Debuff: Biến để xác định xem tác tử có đang bị debuff shin no meiro hay không
    
    def __init__(self, maze_size, max_steps = 15, path_percent = 70):
        """
        Khởi tạo môi trường Mê cung.

        Args:
        - maze_size (int): Kích thước của mê cung (ví dụ: 50x50).
        - max_steps (int): Số bước tối đa cho mỗi tập.
        - path_percent (int): Tỷ lệ phần trăm ô đường trong mê cung (0-100).
        """
        # Đặt các thông số của môi trường

        super(MazeEnv, self).__init__()  # Kế thừa từ gym.Env
        self.data_set_name = ""
        self.export_data = {}
        
        self.maze_size = maze_size
        self.max_steps = max_steps
        self.path_percent = path_percent

        self.maze = np.ones((maze_size, maze_size), dtype=int)  # Mặc định là chưa biết (1)
        self.goal_position = (maze_size * 5 // 6, maze_size * 5 // 6)  # Đích cố định tại giữa khu vực đích
        self.buff = False
        self.debuff = False

        # Định nghĩa action_space (0: lên, 1: xuống, 2: trái, 3: phải)
        self.action_space = Discrete(4)
    
    # Các phương thức liên quan đến tạo data set
    def create_data_set(self, data_set_name):
        self.data_set_name = data_set_name
        self.data_export_count = 0
        os.makedirs(data_set_name, exist_ok=True)
        print(f"Data_set created: {data_set_name}")
    
    def export_maze_training_data(self):
        if self.data_set_name == "":
            return;
        self.data_export_count += 1
        self.export_data['agent_end_position'] = self.agent_position
        with open(f"{self.data_set_name}/maze_data_{self.data_export_count}.pkl", "wb") as f:
            pickle.dump(self.export_data, f)
    
    # Các phương thức liên quan đến tái tạo mê cung
    def reset(self):
        self.base_position = (self.maze_size // 6 - 1, self.maze_size // 6 - 1)
        self.agent_position = self.base_position
        return self.regenerate_maze(0, self.path_percent)
    
    def regenerate_maze(self, adder = 0, path_percent = 70, ):
        self.maze = self.generate_maze(adder, path_percent)
        self.create_discovered_maze()
        self.discover_maze()
        self.export_data['local_obs'] = self.get_observation()[2:9, 2:9]
        self.export_data['agent_start_position'] = self.agent_position
        return (self.get_observation(), self.downsample(self.discovered_maze), self.agent_position)

    def generate_maze(self, adder = 0, path_percent=70):
        if self.agent_position[0] >= self.maze_size * 2 // 3 and self.agent_position[1] >= self.maze_size * 2 // 3:
            number_of_path = 1 + adder
        else:
            number_of_path = 2 + adder
            
        while True:  # Sử dụng vòng lặp để tạo lại mê cung nếu không hợp lệ
            total_cells = self.maze_size * self.maze_size
            num_paths = int(total_cells * path_percent / 100)
            num_walls = total_cells - num_paths
    
            # Tạo danh sách ngẫu nhiên các giá trị (đường hoặc tường)
            maze_values = [0] * num_paths + [-1] * num_walls
            random.shuffle(maze_values)  # Xáo trộn các giá trị
    
            # Điền vào mê cung
            self.maze = np.array(maze_values).reshape(self.maze_size, self.maze_size)
    
            # Đặt điểm bắt đầu và đích
            self.maze[self.agent_position] = 0  # Đảm bảo điểm bắt đầu là đường (0)
            self.maze[self.goal_position] = 2  # Đảm bảo điểm đích là đường (2)
    
            # Kiểm tra tính hợp lệ
            if self.validate_maze(number_of_path):
                break  # Nếu mê cung hợp lệ, thoát vòng lặp
        return self.maze

    def validate_maze(self, number_of_path = 0):
        stack = [self.agent_position]
        visited = set()
        d = 0
        
        while stack:
            x, y = stack.pop()
            if self.debuff:
                if (x, y) == self.goal_position:
                    return False

            if (x, y) == self.goal_position:
                d += 1
                if d == number_of_path:
                    return True
                continue
            
            if (x, y) in visited:
                continue
            visited.add((x, y))

            # Thêm các ô lân cận vào stack
            neighbors = self.get_neighbors(x, y)
            for nx, ny in neighbors:
                if self.maze[nx, ny] > -1 and (nx, ny) not in visited:  # Chỉ đi qua đường
                    stack.append((nx, ny))
                    
        return False or self.debuff  # Không có đường tới đích

    def get_neighbors(self, x, y):
        """
        Lấy danh sách các ô lân cận.
        """
        neighbors = []
        for dx, dy in [(-1, 0), (1, 0), (0, -1), (0, 1)]:
            nx, ny = x + dx, y + dy
            if 0 <= nx < self.maze_size and 0 <= ny < self.maze_size:
                neighbors.append((nx, ny))
        return neighbors
    
    # Các phương thức liên quan đến mê cung được khám phá
    def create_discovered_maze(self):
        self.discovered_maze = np.ones((self.maze_size + 10, self.maze_size + 10), dtype = int) # Mặc định là chưa biết (1)
        self.discovered_maze[0 : 5, :] = -5
        self.discovered_maze[:, 0 : 5] = -5
        self.discovered_maze[self.maze_size + 5 : self.maze_size + 10, :] = -5
        self.discovered_maze[:, self.maze_size + 5 : self.maze_size + 10] = -5

    def discover_maze(self):
        x, y = self.agent_position
        obs_size = 11  # Kích thước quan sát (11x11)
        if self.buff:
            half_size = 5
        else:
            half_size = 3

        # Xác định giới hạn của vùng quan sát trong mê cung
        min_x = max(0, x - half_size)
        max_x = min(self.maze_size, x + half_size + 1)
        min_y = max(0, y - half_size)
        max_y = min(self.maze_size, y + half_size + 1)

        # Điền dữ liệu từ mê cung vào vùng khám phá
        for x in range(min_x, max_x):
            for y in range(min_y, max_y):
                if self.discovered_maze[x + 5, y + 5] == 1:
                    self.discovered_maze[x + 5, y + 5] = self.maze[x, y] * 5
        
    # Phương thức chính để thực hiện hành động trong môi trường
    def step(self, action):
        """
        Thực hiện hành động và cập nhật trạng thái của môi trường.
    
        Args:
        - action (int): Hành động (0: lên, 1: xuống, 2: trái, 3: phải)
    
        Returns:
        - observation (np.array): Vùng quan sát quanh tác tử
        - done (bool): Trạng thái kết thúc
        """
        # Lấy vị trí hiện tại của tác tử
        x, y = self.agent_position
    
        # Xác định vị trí mới dựa trên hành động
        if action == 0:  # Lên
            new_x, new_y = x - 1, y
        elif action == 1:  # Xuống
            new_x, new_y = x + 1, y
        elif action == 2:  # Trái
            new_x, new_y = x, y - 1
        elif action == 3:  # Phải
            new_x, new_y = x, y + 1
        
        # Khởi tạo biến done
        done = False

        # Cập nhật vị trí tác tử
        if self.valid_check((new_x, new_y)):  # Nếu vị trí mới hợp lệ và không phải là tường
            # Nếu vị trí mới hợp lệ, cập nhật vị trí tác tử
            self.agent_position = (new_x, new_y)
            self.discover_maze()

            # Kiểm tra nếu đến đích
            if self.agent_position == self.goal_position:
                done = True
            
            # Giảm giá trị ô đã khám phá 
            if self.discovered_maze[new_x + 5, new_y + 5] > -5:
                self.discovered_maze[new_x + 5, new_y + 5] -= 1        
                
        # Tạo quan sát hiện tại
        local_obs = self.get_observation()

        #Tạo quan sát toàn mê cung
        global_obs = self.downsample(self.discovered_maze)

        return local_obs, global_obs, self.agent_position, done

    # Phương thức để downsample mê cung
    def downsample(self, maze, block_size = 5):
        """
        Downsample toàn bộ mê cung bằng cách lấy tổng của các ô.
    
        Args:
        - maze: Mê cung kích thước lớn (ví dụ: 50x50).
        - block_size: Kích thước mỗi khối để downsample (ví dụ: 5x5).
    
        Returns:
        - Downsampled maze.
        """
        from skimage.measure import block_reduce
        return block_reduce(maze, block_size=(block_size, block_size), func=np.sum)

    # Xuất dữ liệu mê cung
    def render(self):
        render_maze = np.zeros((self.maze_size, self.maze_size), dtype = int) 
        for i in range(self.maze_size):
            for j in range(self.maze_size):
                if self.maze[i][j] == -1:
                    render_maze[i][j] = 1
                    continue
                if self.maze[i][j] == 2:
                    render_maze[i][j] = 10
                    continue
                if self.discovered_maze[i + 5, j + 5] < 0:
                    render_maze[i][j] = 2
        print(render_maze)
    
    # Phương thức để lấy quan sát hiện tại của tác tử
    def get_observation(self):
        x, y = self.agent_position
        observation = np.zeros((11,11), dtype = int)
        observation[0 : 11, 0 : 11] = self.discovered_maze[x: x + 11, y : y + 11]
        return observation

    # Phương thức để kích hoạt buff
    def activate_buff(self, buff):
        self.regenerate_maze()
        if buff == 'senrigan':
            self.buff = True
        else:
            self.buff = False
        if buff == 'slime-san onegai':
            self.bfs(self.agent_position, 50)
        if buff == 'tou no hikari':
            min_x = max(0, self.agent_position[0] - 10)
            max_x = min(self.maze_size, self.agent_position[0] + 11)
            min_y = max(0, self.agent_position[1] - 10)
            max_y = min(self.maze_size, self.agent_position[1] + 11)
            for x in range(min_x, max_x):
                for y in range(min_y, max_y):
                    if self.discovered_maze[x + 5, y + 5] == 1:
                        self.discovered_maze[x + 5, y + 5] = 5 * self.maze[x, y] 
        if buff == 'unmei no michi':
            self.regenerate_maze(1)
        return self.get_observation(), self.downsample(self.discovered_maze), self.agent_position
    
    # Phương thức để kích hoạt debuff
    def activate_debuff(self, debuff):
        if debuff == 'waamu houru':
            self.agent_position = (random.randint(self.maze_size // 3, self.maze_size), random.randint(self.maze_size // 3, self.maze_size))
        if debuff == 'shin no meiro':
            self.debuff = True
        else:
            self.debuff = False
        return self.regenerate_maze()
    
    # Thuật toán BFS phục vụ buff slime-san onegai
    def bfs(self, position, step = 50):
        """
        Thuật toán BFS (Breadth-First Search)
    
        Args:
        - position: Đỉnh bắt đầu tìm kiếm.
        - step: số bược di chuyển
    
        """
        # Tập các đỉnh đã duyệt
        visited = set()
        visited_order = []
    
        # Hàng đợi (FIFO) để quản lý các đỉnh
        queue = deque([position])
    
        # Bắt đầu duyệt đồ thị
        while queue:
            # Lấy một đỉnh từ hàng đợi
            current = queue.popleft()
            x,y = current
            
            # Kiểm tra nếu đỉnh chưa được duyệt
            if current not in visited:
                visited.add(current)
                visited_order.append(current)
                if self.discovered_maze[x + 5, y + 5] == 1:
                    self.discovered_maze[5 + x, 5 + y] = 5 * self.maze[x, y]
                if len(visited_order) >= step:
                    break
                neighbors = self.get_neighbors(x, y)
                for nx, ny in neighbors:
                    if self.maze[nx, ny] > -1 and (nx, ny) not in visited:  # Chỉ đi qua đường
                        queue.append((nx, ny))

    # Phương thức để kiểm tra tính hợp lệ của một vị trí
    def valid_check(self, p1):
        if 0 <= p1[0] < self.maze_size and 0 <= p1[1] < self.maze_size and self.maze[p1] > -1:
            return True
        return False

    # Phương thức xác định hệ số tau
    def tau_coefficient(self, coefficient = 2):
        return ((abs(self.agent_position[0] - self.goal_position[0]) + abs(self.agent_position[1] - self.goal_position[1])) / (4 / 3 * self.maze_size)) ** coefficient

In [65]:
# Phương thức để tải model
def load_model(model_path, device = "cuda"):
    # Load the model
    model = torch.load(model_path, map_location=device, weights_only=False)
    model.eval()  # Chuyển model về chế độ đánh giá (evaluation mode)
    return model

In [66]:
# Tải model
while True:
    model_path = input("Enter the model name: ")
    if os.path.exists(model_path + "/model.pth"):
        break
    else:
        print("Model not found. Please try again.")

model = load_model(model_path + "/model.pth")
with open(model_path + "/model_info.pkl", "rb") as f:
    model_info = pickle.load(f)
    buff = model_info['buff']
    tau_start = model_info['tau_start']
    tau_end = model_info['tau_end']
    tau_decay = model_info['tau_decay']
    tau_decay_exponent = model_info['tau_decay_exponent']

print(f"Model loaded: {model_path}")

Model loaded: model05


In [67]:
# Khởi tạo các siêu tham số
maze_size = 30
max_steps = 15
path_percent = 70
num_actions = 4

max_episodes = 50000
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {device}")

Using device: cuda


In [68]:
# Chọn hành động
def select_action(env, policy_net, local_obs, global_obs, position, tau, num_actions, device):
    """
    Chọn hành động dựa trên chiến lược Boltzmann Exploration.

    Args:
    - policy_net (nn.Module): Mạng chính để dự đoán giá trị Q(s, a).
    - local_obs (np.array): Quan sát cục bộ (ví dụ: 11x11).
    - global_obs (np.array): Quan sát toàn bộ mê cung đã downsample (ví dụ: 10x10).
    - position (list or np.array): Vị trí hiện tại của tác tử (dx, dy).
    - tau (float): Xác suất chọn hành động ngẫu nhiên (khám phá).
    - num_actions (int): Số lượng hành động (ví dụ: 4: lên, xuống, trái, phải).
    - device (torch.device): Thiết bị thực thi (CPU hoặc GPU).

    Returns:
    - action (int): Hành động được chọn (0, 1, 2, 3).
    """
    p1 = (position[0] - 1, position[1])
    p2 = (position[0] + 1, position[1])
    p3 = (position[0], position[1] - 1)
    p4 = (position[0], position[1] + 1)

    # Loại bỏ các hành động không hợp lệ (nếu cần)
    valid_actions = []
    if env.valid_check(p1): valid_actions.append(0)  # Lên
    if env.valid_check(p2): valid_actions.append(1)  # Xuống
    if env.valid_check(p3): valid_actions.append(2)  # Trái
    if env.valid_check(p4): valid_actions.append(3)  # Phải

    # Chuyển đổi các quan sát thành Tensor để đưa vào mạng
    local_obs_tensor = torch.tensor(local_obs, dtype=torch.float32).unsqueeze(0).unsqueeze(0).to(device)
    global_obs_tensor = torch.tensor(global_obs, dtype=torch.float32).unsqueeze(0).unsqueeze(0).to(device)
    position_tensor = torch.tensor(position, dtype=torch.float32).unsqueeze(0).to(device)

    # Dự đoán giá trị Q(s, a) cho tất cả các hành động
    q_values = policy_net(local_obs_tensor, global_obs_tensor, position_tensor)

    # Lấy giá trị Q chỉ cho các hành động hợp lệ
    valid_q_values = q_values.squeeze()[valid_actions]  # Chỉ giữ Q của các hành động hợp lệ

    # Tính xác suất Boltzmann (softmax)
    probabilities = torch.softmax(valid_q_values / tau, dim=0)

    # Chọn hành động dựa trên xác suất Boltzmann
    selected_index = torch.multinomial(probabilities, num_samples=1).item()
    action = valid_actions[selected_index]
    
    return action


In [69]:
# Vòng lặp chạy chương trình
counter = 0
step_min = 1000
step_max = 0
done_count = 0
env = MazeEnv(maze_size=maze_size, max_steps=max_steps, path_percent=path_percent)
env.reset()
env.create_data_set(model_path + "/dataset")
for episode in range(max_episodes):
    if counter % env.max_steps == 0:
        if counter:
            env.export_maze_training_data()
        
        # Khởi tạo trạng thái môi trường
        local_obs, global_obs, position = env.regenerate_maze()
        local_obs, global_obs, position = env.activate_buff(buff)
        tau = tau_start * env.tau_coefficient(tau_decay_exponent)
    counter += 1

    # Chọn hành động
    action = select_action(env, model, local_obs, global_obs, position, tau, num_actions, device)

    # Thực hiện hành động trong môi trường
    local_obs, global_obs, position, done = env.step(action)
    
    # Giảm tau theo thời gian
    tau = max(tau_end, tau * tau_decay)
    
    # reset môi trường nếu đạt được mục tiêu
    if done:
        env.export_maze_training_data()
        done_count += 1
        print(f"Done counter: {done_count}")
        print(f"Number of steps: {counter}")
        if counter > step_max:
            step_max = counter
        if counter < step_min: 
            step_min = counter
        counter = 0
        local_obs, global_obs, position = env.reset()
        local_obs, global_obs, position = env.activate_buff(buff)
        tau = tau_start * env.tau_coefficient(env.tau_coefficient())

print(f"average number of steps: {(max_episodes - counter) // done_count}")
print(f"max: {step_max}")
print(f"min: {step_min}")


Data_set created: model05/dataset
Done counter: 1
Number of steps: 54
Done counter: 2
Number of steps: 88
Done counter: 3
Number of steps: 44
Done counter: 4
Number of steps: 42
Done counter: 5
Number of steps: 50
Done counter: 6
Number of steps: 48
Done counter: 7
Number of steps: 60
Done counter: 8
Number of steps: 78
Done counter: 9
Number of steps: 52
Done counter: 10
Number of steps: 48
Done counter: 11
Number of steps: 46
Done counter: 12
Number of steps: 44
Done counter: 13
Number of steps: 44
Done counter: 14
Number of steps: 42
Done counter: 15
Number of steps: 44
Done counter: 16
Number of steps: 44
Done counter: 17
Number of steps: 44
Done counter: 18
Number of steps: 62
Done counter: 19
Number of steps: 62
Done counter: 20
Number of steps: 64
Done counter: 21
Number of steps: 68
Done counter: 22
Number of steps: 44
Done counter: 23
Number of steps: 42
Done counter: 24
Number of steps: 76
Done counter: 25
Number of steps: 50
Done counter: 26
Number of steps: 54
Done counter: