In [13]:
# Окружение и обёртка из Masked DQN, но без изменений
import gymnasium as gym
from gymnasium import spaces
from collections import deque
import numpy as np
import copy
import time

class WaterSortEnvFixed(gym.Env):
    """
    Water Sort Puzzle environment with a fixed number of tubes and layers.

    The state is an integer matrix of shape ``(num_tubes, max_layers)``.
    Row ``t`` is tube ``t``; column 0 is the top of a tube and column
    ``max_layers - 1`` is its bottom.  A cell holds a colour index in
    ``[0, num_colors)`` or ``-1`` for an empty slot.

    An action is a pair ``(from_tube, to_tube)``.  Every step costs ``-1``;
    see :meth:`step` for the additional shaping terms.
    """

    def __init__(self, num_tubes=4, max_layers=4, num_empty=1, num_colors=3, max_steps=300):
        """
        Args:
            num_tubes: total number of tubes (rows of the state matrix).
            max_layers: capacity of each tube (columns of the state matrix).
            num_empty: number of tubes left empty when a level is generated.
            num_colors: number of distinct colours.  Level generation places
                ``(num_tubes - num_empty) * max_layers // num_colors`` layers
                of each colour, so that product is expected to be divisible
                by ``num_colors``.
            max_steps: step budget after which an episode is truncated.
        """
        super().__init__()
        self.num_tubes = num_tubes
        self.max_layers = max_layers
        self.num_empty = num_empty
        self.num_colors = num_colors
        self.max_steps = max_steps

        # Action: MultiDiscrete([from, to])
        self.action_space = spaces.MultiDiscrete([num_tubes, num_tubes])
        # Observation: Box of shape (num_tubes, max_layers)
        self.observation_space = spaces.Box(
            low=-1, high=num_colors - 1,
            shape=(num_tubes, max_layers), dtype=int
        )

        self.state = None
        self.steps = 0
        self.prev_action = None
        # Sliding window of the last observations, used for the repeat penalty.
        self.recent_states = deque(maxlen=10)

    def reset(self, seed=None, options=None, previous=False):
        """
        Start a new episode.

        Args:
            seed: forwarded to ``gym.Env.reset`` to seed ``self.np_random``.
            options: accepted for API compatibility; not used.
            previous: when True and a start state was stored by an earlier
                reset, replay that start state instead of generating a level.

        Returns:
            ``(observation, info)`` where ``info`` is an empty dict.
        """
        super().reset(seed=seed)
        self.recent_states.clear()

        if previous and hasattr(self, "prev_state"):
            self.state = self.prev_state.copy()
        else:
            # Generate a new level; retry while the shuffle is already solved.
            while True:
                self.state = np.full((self.num_tubes, self.max_layers), -1, dtype=int)
                filled = self.num_tubes - self.num_empty
                slots = filled * self.max_layers
                counts = slots // self.num_colors
                colors = np.repeat(np.arange(self.num_colors), counts)
                self.np_random.shuffle(colors)
                idx = 0
                for tube in range(filled):
                    for layer in range(self.max_layers):
                        self.state[tube, layer] = colors[idx]
                        idx += 1
                if not self._is_solved():
                    break

        self.steps = 0
        obs = self._get_obs()
        # Remember the start state so that reset(previous=True) can replay it.
        self.prev_state = obs.copy()
        self.prev_action = None
        return obs, {}

    def step(self, action):
        """
        Apply one ``(from, to)`` action.

        Reward: ``-1`` per step; an extra ``-0.05`` for an invalid or
        impossible pour; an extra ``-0.1`` for repeating the previous action
        (the pour is then not executed); ``+1`` when a pour increases the
        number of completed tubes; an extra ``-3.0`` when the resulting
        observation is among the recently seen ones.

        Returns:
            ``(obs, reward, terminated, truncated, info)``.  ``terminated``
            is True when the puzzle is solved.  ``truncated`` is True when
            the step budget is exhausted or no valid action remains.
            ``info["step_limit_reached"]`` and ``info["no_valid_moves"]``
            tell why an *unsolved* episode was cut; both are False for a
            solved one.
        """
        frm, to = action
        reward = -1
        info = {}

        # Validity check.
        if frm == to or frm < 0 or to < 0 or frm >= self.num_tubes or to >= self.num_tubes:
            reward -= 0.05
        elif self.prev_action == (frm, to):
            reward -= 0.1
        elif self._can_pour(frm, to):
            before = self._count_sorted()
            self._pour(frm, to)
            after = self._count_sorted()
            reward += 1 if after > before else 0
        else:
            reward -= 0.05

        self.prev_action = (frm, to)
        self.steps += 1

        obs = self._get_obs()
        done = self._is_solved()
        no_moves = len(self.fast_get_valid_actions(obs.flatten())) == 0
        step_limit = self.steps >= self.max_steps
        truncated = step_limit or no_moves

        # Report the truncation cause so evaluation code can tell failed
        # episodes from solved ones (it reads exactly these two keys).
        info["step_limit_reached"] = step_limit and not done
        info["no_valid_moves"] = no_moves and not done

        # Penalty for revisiting a recent state.
        tup = tuple(obs.flatten())
        if tup in self.recent_states:
            reward -= 3.0
        self.recent_states.append(tup)

        return obs, reward, done, truncated, info

    def _get_obs(self):
        """Return a copy of the state matrix."""
        return self.state.copy()

    def _is_solved(self):
        """True when every tube is either empty or full of a single colour."""
        for tube in self.state:
            if np.all(tube == -1):
                continue
            if np.any(tube == -1):
                return False
            if len(set(tube)) != 1:
                return False
        return True

    def _count_sorted(self):
        """Number of tubes that are full and hold a single colour."""
        cnt = 0
        for tube in self.state:
            if np.any(tube == -1):
                continue
            if len(set(tube)) == 1:
                cnt += 1
        return cnt

    def _find_top(self, tube):
        """Index of the topmost filled layer of ``tube``, or -1 if it is empty."""
        for i in range(self.max_layers):
            if self.state[tube, i] != -1:
                return i
        return -1

    def _can_pour(self, frm, to):
        """
        True when the source is non-empty, the target is not full, and the
        target is empty or its top colour equals the source's top colour.
        """
        f = self._find_top(frm)
        if f < 0:
            return False
        t = self._find_top(to)
        if t == 0:  # top layer sits in slot 0: the target is full
            return False
        c = self.state[frm, f]
        if t < 0:
            return True
        return c == self.state[to, t]

    def _pour(self, frm, to):
        """
        Move the run of equal-coloured layers on top of ``frm`` onto ``to``,
        as many layers as fit.  Callers check ``_can_pour`` first.
        """
        f = self._find_top(frm)
        color = self.state[frm, f]
        # Length of the same-colour run at the top of the source tube.
        cnt = 1
        i = f + 1
        while i < self.max_layers and self.state[frm, i] == color:
            cnt += 1
            i += 1
        t = self._find_top(to)
        # First free slot of the target, counted from the bottom upwards.
        dest = self.max_layers - 1 if t < 0 else t - 1
        while cnt > 0 and dest >= 0:
            if self.state[to, dest] == -1:
                self.state[to, dest] = color
                self.state[frm, f] = -1
                f += 1
                dest -= 1
                cnt -= 1
            else:
                break

    def fast_get_valid_actions(self, flat_obs, *, ignore_prev=False):
        """
        Vectorised list of valid actions for a flattened observation.

        Args:
            flat_obs: array reshaped to ``(num_tubes, max_layers)``.
            ignore_prev: when False, the previously taken action is excluded.

        Returns:
            A list of flat action indices ``from * num_tubes + to``.
        """
        N, K = self.num_tubes, self.max_layers
        obs = flat_obs.reshape(N, K)
        empty = (obs == -1).all(axis=1)
        # Index of the top filled layer per tube (K for an empty tube).  With
        # slot 0 as the top, this index is also the number of free slots.
        tops = np.where(empty, K, (obs != -1).argmax(axis=1))
        free = tops
        topcol = np.full(N, -2, dtype=int)  # -2 marks "no top colour"
        mask_fill = ~empty
        topcol[mask_fill] = obs[mask_fill, tops[mask_fill]]
        can_from = ~empty
        can_to = free > 0
        same_or_empty = (topcol[:, None] == topcol[None, :]) | empty[None, :]
        mask = can_from[:, None] & can_to[None, :] & same_or_empty
        np.fill_diagonal(mask, False)
        if not ignore_prev and self.prev_action is not None:
            prev_from, prev_to = self.prev_action
            # step() records invalid actions too; an out-of-range pair would
            # index outside the mask (or wrap around when negative).
            if 0 <= prev_from < N and 0 <= prev_to < N:
                mask[prev_from, prev_to] = False
        return np.flatnonzero(mask.ravel()).tolist()




In [4]:
class DiscreteActionWrapper(gym.Wrapper):
    """
    Flat interface for the puzzle: a single Discrete action index instead of
    a (from, to) pair, and a 1-D observation instead of a matrix.
    """

    def __init__(self, env):
        super().__init__(env)
        self.env = env
        tube_count = env.num_tubes
        flat_length = tube_count * env.max_layers
        # Action index a encodes the pair divmod(a, tube_count).
        self.action_space = spaces.Discrete(tube_count * tube_count)
        self.observation_space = spaces.Box(
            low=-1,
            high=env.num_colors - 1,
            shape=(flat_length,),
            dtype=int,
        )

    def reset(self, **kwargs):
        """Reset the wrapped env and return its observation flattened."""
        matrix, info = self.env.reset(**kwargs)
        return matrix.flatten(), info

    def step(self, action):
        """Decode the flat action into (from, to) and flatten the result."""
        pair = divmod(action, self.env.num_tubes)
        matrix, reward, done, trunc, info = self.env.step(pair)
        return matrix.flatten(), reward, done, trunc, info

    def fast_get_valid_actions(self, flat_obs, *, ignore_prev=False):
        """Delegate to the wrapped environment."""
        return self.env.fast_get_valid_actions(flat_obs, ignore_prev=ignore_prev)


# Обычный DQN-агент (Vanilla), без маскирования
import torch.nn as nn
import torch.optim as optim


class VanillaDQNAgent(nn.Module):
    """
    Vanilla DQN with a target network.

    Action selection is epsilon-greedy over the whole action space (no
    action masking).
    """

    def __init__(self, state_dim, action_dim, net_arch=(256, 256), lr=1e-4, device='cpu'):
        """
        Args:
            state_dim: size of the flat observation vector.
            action_dim: number of discrete actions.
            net_arch: sizes of the hidden layers (any iterable of ints).
            lr: learning rate of the Adam optimizer.
            device: torch device for both networks.
        """
        super().__init__()
        self.device = device

        # Online Q-network: Linear + ReLU per hidden layer, linear head.
        layers = []
        in_features = state_dim
        for hidden in net_arch:
            layers += [nn.Linear(in_features, hidden), nn.ReLU()]
            in_features = hidden
        layers.append(nn.Linear(in_features, action_dim))
        self.q_net = nn.Sequential(*layers).to(device)

        # Target network: a frozen copy, refreshed through update_target().
        self.q_net_target = copy.deepcopy(self.q_net)
        for p in self.q_net_target.parameters():
            p.requires_grad = False

        self.epsilon = 1.0
        self.optimizer = optim.Adam(self.q_net.parameters(), lr=lr)
        self.to(device)

    def forward(self, x):
        """Return Q-values of the online network for a batch of states."""
        return self.q_net(x)

    def predict_qvalues(self, obs: np.ndarray) -> np.ndarray:
        """Return online Q-values for ``obs`` as a NumPy array (no grad)."""
        t = torch.as_tensor(obs, dtype=torch.float32, device=self.device)
        with torch.no_grad():
            q = self.q_net(t).cpu().numpy()
        return q

    def sample_actions(self, obs: np.ndarray) -> np.ndarray:
        """
        Epsilon-greedy action for each row of a batch of observations.

        With probability ``self.epsilon`` a uniformly random action is taken,
        otherwise the action with the largest Q-value.
        """
        q = self.predict_qvalues(obs)  # shape (B, A)
        batch, n_actions = q.shape
        greedy = q.argmax(axis=1)
        acts = np.empty(batch, dtype=int)
        for i in range(batch):
            if random.random() < self.epsilon:
                acts[i] = random.randrange(n_actions)
            else:
                acts[i] = int(greedy[i])
        return acts

    def update_target(self):
        """Copy the online network's weights into the target network."""
        self.q_net_target.load_state_dict(self.q_net.state_dict())


In [5]:
from collections import deque
import random
import numpy as np
import torch
import matplotlib.pyplot as plt
from IPython.display import clear_output

class ReplayBuffer:
    """Fixed-capacity FIFO store of (s, a, r, s', done) transitions."""

    def __init__(self, capacity=10**5):
        # A bounded deque drops the oldest transition once it is full.
        self.buffer = deque(maxlen=capacity)

    def add(self, state, action, reward, next_state, done):
        """Append one transition."""
        transition = (state, action, reward, next_state, done)
        self.buffer.append(transition)

    def sample(self, batch_size=64):
        """
        Draw ``batch_size`` distinct transitions uniformly at random.

        Returns five arrays: states, actions, rewards (float32),
        next states and done flags (bool).
        """
        picked = random.sample(self.buffer, batch_size)
        columns = list(zip(*picked))
        states = np.array(columns[0])
        actions = np.array(columns[1])
        rewards = np.array(columns[2], dtype=np.float32)
        next_states = np.array(columns[3])
        dones = np.array(columns[4], dtype=bool)
        return states, actions, rewards, next_states, dones

    def __len__(self):
        return len(self.buffer)


def compute_vanilla_dqn_loss(agent: VanillaDQNAgent,
                             env,            # unused; kept only for the signature
                             batch,
                             gamma=0.99,
                             device='cpu'):
    """
    MSE loss for vanilla DQN on one sampled batch.

    The regression target is
      target = r + gamma * max_a' Q_target(s', a') * (1 - done)

    ``batch`` is the 5-tuple (states, actions, rewards, next_states, dones).
    """
    obs_np, act_np, rew_np, next_np, done_np = batch

    obs_t = torch.FloatTensor(obs_np).to(device)
    act_t = torch.LongTensor(act_np).to(device)
    rew_t = torch.FloatTensor(rew_np).to(device)
    next_t = torch.FloatTensor(next_np).to(device)
    done_t = torch.BoolTensor(done_np).to(device)

    # Q(s, a) of the actions that were actually taken -> shape (B,)
    q_all = agent.q_net(obs_t)
    q_taken = q_all.gather(1, act_t.unsqueeze(1)).squeeze(1)

    # Bootstrapped target from the frozen target network.
    with torch.no_grad():
        best_next = agent.q_net_target(next_t).max(dim=1).values
        target = rew_t + gamma * best_next * (~done_t)

    return torch.nn.functional.mse_loss(q_taken, target)


def evaluate_by_mean(env_eval,
                     agent,
                     n_games: int,
                     bfs_timeout: int = 100_000):
    """
    Play ``n_games`` greedy episodes (epsilon = 0, no action mask) and
    aggregate the results.

    An episode counts as a win ("clean") only when the environment reported
    ``terminated`` and neither ``info["step_limit_reached"]`` nor
    ``info["no_valid_moves"]`` is set.  Requiring ``terminated`` keeps the
    classification correct for environments that leave ``info`` empty.

    Every start state is also given to the BFS solver:
      * for a win, the optimal length (if found) goes into the "math" mean;
      * for a failure, the episode is counted as ``n_step_limit`` when BFS
        finds a solution and as ``n_no_moves`` when it does not (within
        ``bfs_timeout`` expansions).

    Returns:
        (mean_reward_all, mean_steps_all,
         mean_reward_clean, mean_steps_clean,
         mean_steps_math, delta_steps,
         n_win_games, n_step_limit, n_no_moves)
        Means over an empty set are NaN.
    """
    nan = float("nan")

    rewards_all, steps_all = [], []
    rewards_clean, steps_clean = [], []
    steps_math = []

    n_win_games = n_step_limit = n_no_moves = 0

    # Act greedily for the whole evaluation; restore epsilon even on error.
    saved_epsilon = agent.epsilon
    agent.epsilon = 0.0
    try:
        for _ in range(n_games):
            obs0, _ = env_eval.reset(previous=False)
            obs = obs0.copy()
            total_r = 0.0
            agent_steps = 0

            # Agent plays until the episode ends.
            while True:
                action = agent.sample_actions(obs[None])[0]
                obs, rew, terminated, truncated, info = env_eval.step(action)
                total_r += rew
                agent_steps += 1
                if terminated or truncated:
                    break

            rewards_all.append(total_r)
            steps_all.append(agent_steps)

            clean_ep = (
                bool(terminated)
                and not info.get("step_limit_reached", False)
                and not info.get("no_valid_moves", False)
            )

            # Reference solution from the episode's start state.
            _path_opt, len_opt, *_ = solve_optimal_path(
                env_eval, obs0, max_expansions=bfs_timeout)

            if clean_ep:
                rewards_clean.append(total_r)
                steps_clean.append(agent_steps)
                n_win_games += 1
                if len_opt is not None:
                    steps_math.append(len_opt)
            elif len_opt is not None:
                # Solvable level that the agent failed to solve.
                n_step_limit += 1
            else:
                # BFS found no solution either.
                n_no_moves += 1
    finally:
        agent.epsilon = saved_epsilon

    # Aggregates.
    mean_reward_all   = float(np.mean(rewards_all)) if rewards_all else nan
    mean_steps_all    = float(np.mean(steps_all)) if steps_all else nan
    mean_reward_clean = float(np.mean(rewards_clean)) if rewards_clean else nan
    mean_steps_clean  = float(np.mean(steps_clean)) if steps_clean else nan
    mean_steps_math   = float(np.mean(steps_math)) if steps_math else nan
    delta_steps       = mean_steps_clean - mean_steps_math if not np.isnan(mean_steps_math) else nan

    return (
        mean_reward_all,   mean_steps_all,
        mean_reward_clean, mean_steps_clean,
        mean_steps_math,   delta_steps,
        n_win_games,       n_step_limit, n_no_moves
    )


def draw_graphs(r_all, s_all,
                r_clean, s_clean,
                s_math, delt,
                n_win, n_limit, n_nomove,
                eval_period, total_timesteps):
    """
    Draw the five-panel evaluation dashboard and return the figure.

    Each of the first nine arguments is a list with one value per
    evaluation round.  ``eval_period`` is the number of environment steps
    between rounds and scales the x axis.  ``total_timesteps`` is accepted
    for call compatibility and is not used.
    """

    def last(series):
        """Latest value of a series, NaN when the series is empty."""
        return series[-1] if series else float("nan")

    def last_count(series):
        """Latest counter as int; int(NaN) would raise, so use a placeholder."""
        return int(series[-1]) if series else "n/a"

    clear_output(wait=True)
    x = np.arange(1, len(r_all) + 1) * eval_period

    fig = plt.figure(figsize=(18, 15))
    gs  = fig.add_gridspec(3, 2,
                           height_ratios=[1, 0.9, 0.6],
                           width_ratios =[1, 0.8])

    # (1) Mean reward
    ax0 = fig.add_subplot(gs[0, 0])
    ax0.set_title("Mean reward")
    ax0.plot(x, r_all,   label="all")
    ax0.plot(x, r_clean, label="clean")
    ax0.set_ylabel("R")
    ax0.grid()
    ax0.legend()

    # (2) Mean steps (+ ideal)
    ax1 = fig.add_subplot(gs[0, 1])
    ax1.set_title("Mean steps")
    ax1.plot(x, s_all,   label="all")
    ax1.plot(x, s_clean, label="clean")
    ax1.plot(x, s_math,  label="ideal")
    ax1.set_ylabel("steps")
    ax1.grid()
    ax1.legend()

    # (3) Episode counters
    ax2 = fig.add_subplot(gs[1, 0])
    ax2.set_title("Episode counters / evaluation")
    ax2.plot(x, n_win,    label="wins")
    ax2.plot(x, n_limit,  label="step-limit")
    ax2.plot(x, n_nomove, label="no-moves")
    ax2.set_xlabel(f"env steps (×{eval_period})")
    ax2.set_ylabel("episodes / 100 runs")
    ax2.grid()
    ax2.legend()

    # (4) Summary table with the latest value of every metric
    ax3 = fig.add_subplot(gs[1, 1])
    ax3.axis("off")
    table_data = [
        ["reward all",   f"{last(r_all):.2f}"],
        ["reward clean", f"{last(r_clean):.2f}"],
        ["steps  all",   f"{last(s_all):.1f}"],
        ["steps  clean", f"{last(s_clean):.1f}"],
        ["steps  math",  f"{last(s_math):.1f}"],
        ["Δ clean-math", f"{last(delt):+.1f}"],
        ["wins",         last_count(n_win)],
        ["step-limit",   last_count(n_limit)],
        ["no-moves",     last_count(n_nomove)],
    ]
    tbl = ax3.table(cellText=table_data,
                    colLabels=["Metric", "Last value"],
                    loc="center", cellLoc="center")
    tbl.scale(1, 2.1)
    tbl.auto_set_font_size(False)
    tbl.set_fontsize(12)

    # (5) Delta-steps plot; the neighbouring grid cell is left blank
    ax4 = fig.add_subplot(gs[2, 0])
    ax_dummy = fig.add_subplot(gs[2, 1])
    ax_dummy.axis("off")
    ax4.set_title("Δ-steps (clean − math)")
    ax4.plot(x, delt, label="Δ steps", color="tab:red")
    ax4.set_xlabel("environment steps")
    ax4.set_ylabel("Δ")
    ax4.grid()
    ax4.legend()

    fig.tight_layout()
    plt.show()

    return fig


In [11]:
def solve_optimal_path(wrapped_env,
                       flat_obs: np.ndarray,
                       max_expansions: int = 100_000):
    """
    Breadth-first search for a shortest solution from the given state.

    ``wrapped_env`` must expose ``.env.num_tubes``, ``.env.max_layers`` and
    ``fast_get_valid_actions(flat_obs, ignore_prev=True)``; the search never
    steps or mutates the environment.

    Returns a 4-tuple:
        path        - list of flat action indices (from * N + to)
        length      - number of moves, or None when no solution was found
        expansions  - number of states expanded
        t_seconds   - wall-clock time spent searching
    """
    inner = wrapped_env.env
    N, K = inner.num_tubes, inner.max_layers

    root = tuple(flat_obs)  # hashable snapshot of the start state

    # -- pure helpers operating on (N, K) matrices ---------------------------
    def top_of(grid, tube):
        """Index of the topmost filled layer, or -1 for an empty tube."""
        for layer in range(K):
            if grid[tube, layer] != -1:
                return layer
        return -1

    def solved(grid):
        """Every tube is either empty or full of one colour."""
        for tube in grid:
            if np.all(tube == -1):
                continue
            if np.any(tube == -1):
                return False
            if len(set(tube)) != 1:
                return False
        return True

    def legal(grid, src, dst):
        """Whether pouring src -> dst is allowed."""
        if src == dst:
            return False
        src_top = top_of(grid, src)
        if src_top == -1:
            return False
        dst_top = top_of(grid, dst)
        if dst_top == 0:  # destination is full
            return False
        if dst_top == -1:
            return True
        return grid[dst, dst_top] == grid[src, src_top]

    def poured(grid, src, dst):
        """Return a NEW matrix with the top colour run moved src -> dst."""
        out = grid.copy()
        src_top = top_of(out, src)
        colour = out[src, src_top]

        # Length of the same-colour run on top of the source.
        run = 1
        while src_top + run < K and out[src, src_top + run] == colour:
            run += 1

        dst_top = top_of(out, dst)
        slot = (K if dst_top == -1 else dst_top) - 1

        while run > 0 and slot >= 0 and out[dst, slot] == -1:
            out[dst, slot] = colour
            out[src, src_top] = -1
            src_top += 1
            slot -= 1
            run -= 1
        return out

    # -- the search itself ----------------------------------------------------
    started = time.time()
    frontier = deque([(root, [])])
    seen = {root}
    expansions = 0

    while frontier and expansions < max_expansions:
        key, moves = frontier.popleft()
        grid = np.array(key).reshape(N, K)

        if solved(grid):
            return moves, len(moves), expansions, time.time() - started

        candidates = wrapped_env.fast_get_valid_actions(np.array(key),
                                                        ignore_prev=True)
        for act in candidates:
            src, dst = divmod(act, N)
            if legal(grid, src, dst):
                child = tuple(poured(grid, src, dst).flatten())
                if child not in seen:
                    seen.add(child)
                    frontier.append((child, moves + [act]))

        expansions += 1

    # Search exhausted or budget spent without reaching a solved state.
    return [], None, expansions, time.time() - started

In [6]:
import random
from collections import deque
from tqdm import trange

import numpy as np
import torch
import torch.nn as nn

def train_vanilla_dqn(
    env,
    env_eval,
    agent,
    total_timesteps: int = 200_000,
    device: str = 'cpu',
    evaluate_interval: int = 10_000
):
    """
    Train a vanilla (unmasked) DQN agent.

    The replay buffer is first filled with ``learning_starts`` uniformly
    random transitions.  Then, for ``total_timesteps`` environment steps, the
    agent acts epsilon-greedily with a linearly decaying epsilon, takes one
    gradient step every ``train_freq`` steps, refreshes the target network
    every ``target_update_interval`` steps and is evaluated (and plotted)
    every ``evaluate_interval`` steps.

    Returns:
        (agent, rewards_history, steps_history, fig) where the histories hold
        one entry per finished training episode and ``fig`` is the most
        recent dashboard figure (None if no evaluation ran).
    """
    # Training hyper-parameters.
    buffer_size = 10**5
    batch_size = 64
    learning_starts = 10_000
    train_freq = 4
    target_update_interval = 10_000
    gamma = 0.99

    exploration_fraction = 0.5
    exploration_final_eps = 0.1
    max_grad_norm = 10.0

    replay_buffer = ReplayBuffer(capacity=buffer_size)

    def linear_epsilon(step):
        """Decay from 1.0 to the final epsilon over the exploration phase."""
        frac = min(step / (exploration_fraction * total_timesteps), 1.0)
        return 1.0 + frac * (exploration_final_eps - 1.0)

    # --- Warm-up: fill the buffer with a uniformly random policy ------------
    obs, _ = env.reset()
    for _ in range(learning_starts):
        action = env.action_space.sample()

        next_obs, reward, done, truncated, _ = env.step(action)
        # NOTE: truncation is stored as terminal, so targets do not bootstrap
        # past a step-limit or dead-end cut.
        replay_buffer.add(obs, action, reward, next_obs, done or truncated)
        obs = next_obs
        if done or truncated:
            obs, _ = env.reset()

    # --- Loggers -------------------------------------------------------------
    rewards_history = []
    steps_history = []

    # One entry per evaluation round (see evaluate_by_mean).
    ev_r_all, ev_s_all = [], []
    ev_r_clean, ev_s_clean = [], []
    ev_s_math, ev_delt = [], []
    ev_n_win, ev_n_limit, ev_n_nomove = [], [], []

    # Start of the first training episode.
    obs, _ = env.reset()
    ep_reward = 0.0
    ep_steps = 0
    figs = None

    for step in trange(total_timesteps):
        global_step = step + 1

        # 1) update epsilon
        agent.epsilon = linear_epsilon(step)

        # 2) epsilon-greedy action (no mask)
        action = agent.sample_actions(obs[None])[0]

        # 3) environment step
        next_obs, reward, done, truncated, _ = env.step(action)
        replay_buffer.add(obs, action, reward, next_obs, done or truncated)
        obs = next_obs

        ep_reward += reward
        ep_steps += 1

        if done or truncated:
            rewards_history.append(ep_reward)
            steps_history.append(ep_steps)
            ep_reward = 0.0
            ep_steps = 0
            obs, _ = env.reset()

        # 4) gradient step
        if global_step % train_freq == 0 and len(replay_buffer) >= batch_size:
            batch = replay_buffer.sample(batch_size)
            loss = compute_vanilla_dqn_loss(agent, env, batch, gamma, device)
            agent.optimizer.zero_grad()
            loss.backward()
            nn.utils.clip_grad_norm_(agent.parameters(), max_grad_norm)
            agent.optimizer.step()

        # 5) target network refresh
        if global_step % target_update_interval == 0:
            agent.update_target()

        # 6) evaluation
        if global_step % evaluate_interval == 0:
            (mean_r_all,  mean_s_all,
             mean_r_clean, mean_s_clean,
             mean_s_math, delta_steps,
             n_win, n_limit, n_no_moves) = evaluate_by_mean(
                env_eval, agent, n_games=100, bfs_timeout=100_000
            )

            ev_r_all.append(mean_r_all)
            ev_s_all.append(mean_s_all)
            ev_r_clean.append(mean_r_clean)
            ev_s_clean.append(mean_s_clean)
            ev_s_math.append(mean_s_math)
            ev_delt.append(delta_steps)
            ev_n_win.append(n_win)
            ev_n_limit.append(n_limit)
            ev_n_nomove.append(n_no_moves)

            superseded_fig = figs
            figs = draw_graphs(
                ev_r_all, ev_s_all,
                ev_r_clean, ev_s_clean,
                ev_s_math, ev_delt,
                ev_n_win, ev_n_limit, ev_n_nomove,
                evaluate_interval, total_timesteps
            )
            # Only the latest dashboard is returned; release the older one so
            # figures do not pile up over a long run.
            if superseded_fig is not None:
                plt.close(superseded_fig)

    return agent, rewards_history, steps_history, figs


In [16]:
import random
import numpy as np
import torch
import matplotlib.pyplot as plt

from pathlib import Path

# Model configurations encoded as "N_K_L":
# N = number of tubes, K = number of empty tubes, L = layers per tube.
filenames = [

    "5_1_4", "7_2_4", "7_2_5",

]

total_timesteps     = 500_000
evaluate_interval   = total_timesteps // 10
seed                = 42

for name in filenames:
    n, k, l = map(int, name.split('_'))

    # Fix the seeds for reproducibility.
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True

    # Build the training and evaluation environments with their wrappers.
    max_steps_per_game = 100
    base_env      = WaterSortEnvFixed(n, l, num_empty=k, num_colors=n-k, max_steps=max_steps_per_game)
    env           = DiscreteActionWrapper(base_env)
    base_env_eval = WaterSortEnvFixed(n, l, num_empty=k, num_colors=n-k, max_steps=max_steps_per_game)
    env_eval      = DiscreteActionWrapper(base_env_eval)

    # The environments draw levels from their own `np_random` and the action
    # space has its own generator; neither is covered by the global seeds
    # above.  Seed them once here: later reset() calls without a seed keep
    # using the seeded generators.  The evaluation env gets a different seed
    # so it does not replay the training level sequence.
    env.reset(seed=seed)
    env.action_space.seed(seed)
    env_eval.reset(seed=seed + 1)

    device = torch.device(
        "cuda" if torch.cuda.is_available()
        else "mps"   if torch.backends.mps.is_available()
        else "cpu"
    )
    print(f"[{name}] Using device: {device}")

    # Create the vanilla agent; hidden sizes scale with the number of
    # ordered tube pairs n * (n - 1).
    lr      = 1e-4
    obs_dim = env.observation_space.shape[0]
    act_dim = env.action_space.n

    agent = VanillaDQNAgent(
        state_dim = obs_dim,
        action_dim= act_dim,
        net_arch  = [(n*(n-1))*25, (n*(n-1))*10],
        lr        = lr,
        device    = device
    )

    # Run the training.
    agent, rewards_history, steps_history, fig = train_vanilla_dqn(
        env,
        env_eval,
        agent,
        total_timesteps   = total_timesteps,
        device            = device,
        evaluate_interval = evaluate_interval
    )

    # Save the weights and the last dashboard.  Create the output folders
    # first so a missing directory cannot discard a finished training run.
    model_path = f"relevante/unmasked/dqn_{n}_{k}_{l}_{total_timesteps}.pth"
    Path(model_path).parent.mkdir(parents=True, exist_ok=True)
    torch.save(agent.state_dict(), model_path)

    if fig is not None:
        out_png = f"graphs/unmasked/dqn_{n}_{k}_{l}_{total_timesteps}.png"
        Path(out_png).parent.mkdir(parents=True, exist_ok=True)
        fig.savefig(out_png)
        plt.close(fig)

    print(f"[{name}] done, model → {model_path}\n")


100%|██████████| 500000/500000 [10:00<00:00, 832.40it/s]

[7_2_5] done, model → relevante/unmasked/dqn_7_2_5_500000.pth




