In [91]:
# pip install gymnasium
from __future__ import annotations
import random
import string
from itertools import product
from typing import Dict, List, Optional, Tuple

import gymnasium as gym
from gymnasium import spaces


class ARShotEnv(gym.Env):
    """
    Associative Retrieval (AR) with a 'shot' query.

    Поток токенов:
      - shot_mode="after_pairs":
          [! k : v !] x P  +  [! query_key : shot]          → длина T = 5*P + 4
      - shot_mode="after_any_colon":
          [! k : v !] x P  +  [! k : v !] x E + [! k : shot] → T = 5*(P+E) + 4
        (query_key ранее уже встречался как полная пара)

    Наблюдение — один токен за шаг.
    Награда выдаётся только, когда текущий токен == 'shot':
        reward = 1, если действие == правильному value; иначе 0. Эпизод завершается.

    Важные флаги:
      - deterministic_vocab=True  → порядок универсума фиксирован (не зависит от seed)
      - full_universe_vocab=True → в env.vocab добавляется весь универсум токенов по длинам
      - randomize_pairs=True     → ключи и значения для ЭПИЗОДА случайны (но из фикс. универсума)
      - include_pass_token=True  → добавить 'pass' к спец-токенам (можно использовать как no-op)


    # Отображение токенов в ID и обратно
    obs_id = env.token_to_id["zA"]   # например 1287
    print(obs_id) # 1287
    tok = env.id_to_token[obs_id]    # вернёт "zA"
    print(tok) # "zA"
    """

    metadata = {"render_modes": []}

    def __init__(
        self,
        n_pairs: int = 6,
        rng_seed: Optional[int] = None,

        # где появится shot в смысле "сколько полных пар точно показать сначала"
        prefix_pairs_range: Optional[Tuple[int, int]] = None,  # по умолчанию (1, n_pairs)
        query_from_any_shown: bool = True,  # иначе берём последний из показанных

        shot_mode: str = "after_pairs",  # "after_pairs" | "after_any_colon"
        max_extra_pairs_before_shot: int = 0,  # только для "after_any_colon"

        # словари (если None — берём из универсума согласно режимам ниже)
        keys_vocab: Optional[List[str]] = None,
        values_vocab: Optional[List[str]] = None,

        # диапазоны длин токенов (включительно); чаще всего (2,2)
        key_token_len_range: Tuple[int, int] = (2, 2),
        value_token_len_range: Tuple[int, int] = (2, 2),

        # алфавиты для НЕДЕТЕРМИНИРОВАННОЙ генерации
        key_charset: str = string.ascii_letters + string.digits,
        value_charset: str = string.ascii_letters + string.digits,

        # управление словарём и его стабильностью
        deterministic_vocab: bool = True,
        full_universe_vocab: bool = True,
        randomize_pairs: bool = True,
        include_pass_token: bool = False,
    ):
        super().__init__()

        # RNG для динамики эпизодов и (опционально) случайного выбора пар
        self.rng = random.Random(rng_seed)

        # --- проверки параметров ---
        assert n_pairs >= 1, "n_pairs must be >= 1"
        if prefix_pairs_range is None:
            prefix_pairs_range = (1, n_pairs)
        min_p, max_p = prefix_pairs_range
        if not (1 <= min_p <= max_p <= n_pairs):
            raise ValueError("prefix_pairs_range must satisfy 1 <= min <= max <= n_pairs")
        if shot_mode not in ("after_pairs", "after_any_colon"):
            raise ValueError("shot_mode must be 'after_pairs' or 'after_any_colon'")

        self.n_pairs = n_pairs
        self.prefix_pairs_range = (min_p, max_p)
        self.query_from_any_shown = query_from_any_shown
        self.shot_mode = shot_mode
        self.max_extra_pairs_before_shot = max(0, int(max_extra_pairs_before_shot))

        # ---- SPECIAL tokens
        self.SPECIAL = ["!", ":", "shot", "pass"]
        if include_pass_token:
            self.SPECIAL.append("pass")
        reserved = set(self.SPECIAL)

        # ---- детерминированный универсум токенов по диапазону длин
        def det_tokens_for_range(length_range: Tuple[int, int]) -> List[str]:
            """
            Генерирует все токены в лексикографическом порядке по фиксированному алфавиту:
              digits + lowercase + uppercase = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
            Для длины L: перебор product(charset, repeat=L).
            """
            lo, hi = length_range
            if lo < 1 or hi < lo:
                raise ValueError("length_range must satisfy 1 <= lo <= hi")
            charset = "0123456789" + string.ascii_lowercase + string.ascii_uppercase
            out: List[str] = []
            for L in range(lo, hi + 1):
                for tup in product(charset, repeat=L):
                    out.append("".join(tup))
            # спец-строк тут нет, но фильтр оставим
            return [t for t in out if t not in reserved]

        # ---- случайная генерация токенов (используется только если deterministic_vocab=False и словари не заданы)
        def random_tokens(need: int, charset: str, length_range: Tuple[int, int], avoid: set[str]) -> List[str]:
            lo, hi = length_range
            if lo < 1 or hi < lo:
                raise ValueError("length_range must satisfy 1 <= lo <= hi")
            tokens: List[str] = []
            seen = set(avoid)
            attempts = 0
            while len(tokens) < need:
                attempts += 1
                L = self.rng.randint(lo, hi)
                cand = "".join(self.rng.choice(charset) for _ in range(L))
                if cand and cand not in seen:
                    tokens.append(cand)
                    seen.add(cand)
                if attempts > 100_000:
                    raise RuntimeError("Failed to generate enough unique random tokens; enlarge charset/lengths.")
            return tokens

        # ---- строим словари источников для выбора пар в эпизодах (keys_vocab/values_vocab) ----
        if keys_vocab is not None:
            seen = set()
            keys_vocab = [t for t in keys_vocab if (t not in reserved) and (t not in seen and not seen.add(t))]
            if len(keys_vocab) < n_pairs:
                raise ValueError("Provided keys_vocab has fewer unique tokens than n_pairs.")
        if values_vocab is not None:
            seen = set()
            values_vocab = [t for t in values_vocab if (t not in reserved) and (t not in seen and not seen.add(t))]

        # если словари не заданы — берём из универсума согласно режимам
        if keys_vocab is None or values_vocab is None:
            if deterministic_vocab:
                key_universe = det_tokens_for_range(key_token_len_range)
                val_universe = det_tokens_for_range(value_token_len_range)

                if randomize_pairs:
                    # случайно выбираем пары (но из фиксированного универсума)
                    if len(key_universe) < n_pairs:
                        raise ValueError("Not enough deterministic tokens for keys.")
                    keys_vocab = self.rng.sample(key_universe, n_pairs)

                    key_set = set(keys_vocab)
                    val_candidates = [t for t in val_universe if t not in key_set]
                    if len(val_candidates) < n_pairs:
                        raise ValueError("Not enough deterministic tokens for values after excluding keys.")
                    values_vocab = self.rng.sample(val_candidates, n_pairs)
                else:
                    # берём первые n_pairs (фиксировано; пары не рандомизируются)
                    if len(key_universe) < n_pairs:
                        raise ValueError("Not enough deterministic tokens for keys.")
                    keys_vocab = key_universe[:n_pairs]

                    key_set = set(keys_vocab)
                    val_candidates = [t for t in val_universe if t not in key_set]
                    if len(val_candidates) < n_pairs:
                        raise ValueError("Not enough deterministic tokens for values after excluding keys.")
                    values_vocab = val_candidates[:n_pairs]
            else:
                # полностью случайные словари (не фиксированный универсум/порядок)
                if keys_vocab is None:
                    keys_vocab = random_tokens(
                        need=n_pairs, charset=key_charset, length_range=key_token_len_range, avoid=reserved
                    )
                avoid_for_values = reserved | set(keys_vocab)
                if values_vocab is None:
                    values_vocab = random_tokens(
                        need=n_pairs, charset=value_charset, length_range=value_token_len_range, avoid=avoid_for_values
                    )

        # финальные проверки
        if set(keys_vocab) & set(values_vocab):
            raise ValueError("keys_vocab and values_vocab must be disjoint.")
        if len(keys_vocab) < n_pairs or len(values_vocab) < n_pairs:
            raise ValueError("Not enough tokens in keys_vocab/values_vocab for n_pairs.")

        self.keys_vocab = list(keys_vocab)
        self.values_vocab = list(values_vocab)

        # ---- строим env.vocab (пространство наблюдений/действий) ----
        if deterministic_vocab and full_universe_vocab:
            U_keys = det_tokens_for_range(key_token_len_range)
            U_vals = det_tokens_for_range(value_token_len_range)
            # объединяем в стабильном порядке: сначала U_keys, затем добавляем из U_vals всё, чего нет в U_keys
            universe = U_keys + [t for t in U_vals if t not in set(U_keys)]
            self.vocab = self.SPECIAL + universe
        else:
            # компактный словарь: только спец + выбранные ключи/значения
            self.vocab = self.SPECIAL + self.keys_vocab + self.values_vocab

        self.token_to_id = {tok: i for i, tok in enumerate(self.vocab)}
        self.id_to_token = {i: tok for tok, i in self.token_to_id.items()}

        # gym spaces
        self.observation_space = spaces.Discrete(len(self.vocab))
        self.action_space = spaces.Discrete(len(self.vocab))

        # состояние эпизода
        self._tokens: List[int] = []
        self._ptr: int = 0
        self._query_key: Optional[str] = None
        self._mapping: Dict[str, str] = {}

    # ---------- helpers ----------
    def _tok(self, s: str) -> int:
        return self.token_to_id[s]

    def _append_full_pair_tokens(self, stream: List[str], key: str):
        """Добавить токены полной пары: ! key : value !"""
        stream += ["!", key, ":", self._mapping[key], "!"]

    def _build_after_pairs(self) -> List[str]:
        # сэмплируем n_pairs уникальных ключей и значений из словарей-источников
        keys = self.rng.sample(self.keys_vocab, self.n_pairs)
        values = self.rng.sample(self.values_vocab, self.n_pairs)
        self._mapping = {k: v for k, v in zip(keys, values)}

        min_p, max_p = self.prefix_pairs_range
        shown_pairs = self.rng.randint(min_p, max_p)
        shown_order = self.rng.sample(keys, shown_pairs)

        stream: List[str] = []
        for k in shown_order:
            self._append_full_pair_tokens(stream, k)

        self._query_key = self.rng.choice(shown_order) if self.query_from_any_shown else shown_order[-1]
        stream += ["!", self._query_key, ":", "shot"]
        return stream

    def _build_after_any_colon(self) -> List[str]:
        keys = self.rng.sample(self.keys_vocab, self.n_pairs)
        values = self.rng.sample(self.values_vocab, self.n_pairs)
        self._mapping = {k: v for k, v in zip(keys, values)}

        min_p, max_p = self.prefix_pairs_range
        min_p = max(1, min_p)  # нужен хотя бы один k:v, чтобы было что вспоминать
        shown_pairs = self.rng.randint(min_p, max_p)

        shown_order = self.rng.sample(keys, shown_pairs)

        stream: List[str] = []
        for k in shown_order:
            self._append_full_pair_tokens(stream, k)

        # ключ для запроса из показанных
        self._query_key = self.rng.choice(shown_order) if self.query_from_any_shown else shown_order[-1]

        # опциональные дополнительные пары перед повторным появлением ключа
        remaining_keys = [k for k in keys if k not in shown_order]
        extra_cap = min(self.max_extra_pairs_before_shot, len(remaining_keys))
        extra_pairs = self.rng.randint(0, extra_cap)
        self.rng.shuffle(remaining_keys)
        for k in remaining_keys[:extra_pairs]:
            self._append_full_pair_tokens(stream, k)

        # повторный показ query_key, но вместо value → 'shot'
        stream += ["!", self._query_key, ":", "shot"]
        return stream

    def _build_episode(self):
        if self.shot_mode == "after_pairs":
            stream = self._build_after_pairs()
        else:
            stream = self._build_after_any_colon()
        self._tokens = [self._tok(s) for s in stream]
        self._ptr = 0

    # ---------- Gym API ----------
    def reset(self, *, seed: Optional[int] = None, options: Optional[dict] = None):
        if seed is not None:
            self.rng.seed(seed)
        self._build_episode()
        obs = self._tokens[self._ptr]
        info = {
            "mapping": self._mapping.copy(),
            "query_key": self._query_key,
            "vocab": self.vocab,
        }
        return obs, info

    def step(self, action: int):
        assert 0 <= self._ptr < len(self._tokens), "Episode finished. Call reset()."

        cur_tok_id = self._tokens[self._ptr]
        cur_tok = self.id_to_token[cur_tok_id]

        reward = 0.0
        terminated = False
        truncated = False

        if cur_tok == "shot":
            correct_value = self._mapping[self._query_key]
            reward = 1.0 if action == self._tok(correct_value) else 0.0
            terminated = True

        self._ptr += 1
        if self._ptr >= len(self._tokens):
            terminated = True

        obs = self._tok("pass") if (terminated or truncated) else self._tokens[self._ptr]
        info = {
            "query_key": self._query_key,
            "correct_value": self._mapping[self._query_key],
            "was_shot_step": (cur_tok == "shot"),
        }
        return obs, reward, terminated, truncated, info

    # ---------- utils ----------
    def decode_stream(self) -> List[str]:
        return [self.id_to_token[t] for t in self._tokens]

    def render(self):
        print(" ".join(self.decode_stream()))


In [97]:
#  Размер словаря: 4 (SPECIAL: '!', ':', 'shot', 'eos') + 62*62 = 3848
# (Если include_pass_token=True, будет 3849.
env = ARShotEnv(
    n_pairs=20, 
    rng_seed=None,
    shot_mode="after_pairs", # "after_any_colon" -> shot после любого ключа; "after_pairs" -> shot после последней пары
    key_token_len_range=(2, 2),
    value_token_len_range=(2, 2),
    deterministic_vocab=True,
    full_universe_vocab=True,    # <— ВАЖНО
    randomize_pairs=True,          # ВАЖНО: пары выбираем случайно из универсума
    include_pass_token=True
)

"""
T = 5P+4 if shot_mode="after_pairs"
T = 5(P+E)+4 if shot_mode="after_any_colon"
"""

obs, info = env.reset()
print("Vocab size:", env.observation_space.n)  # 4 спец + 80 keys + 80 values = 164
print("Obs.space:", env.observation_space)
print("Action space:", env.action_space)
print("Query key:", info["query_key"])
print("Stream:", env.decode_stream())
print("Stream:", "".join(env.decode_stream()))
print("Len stream:", len(env.decode_stream()))
print("Total number of tokens:", len(env.decode_stream()))

done = False
total = 0
t = 0

print(f"t={t:<3} obs={obs:<3} token='{env.id_to_token[obs]}'")
while not done:
    tok = env.id_to_token[obs]
    if tok == "shot":
        act_tok = info["mapping"][info["query_key"]]
    else:
        act_tok = "pass"
    obs, r, done, _, _ = env.step(env.token_to_id[act_tok])
    total += r
    t += 1
    obs_token = env.id_to_token[obs] if obs in env.id_to_token else "pass"
    print(f"t={t:<3} obs={obs:<6} token='{obs_token}' \tact_tok='{act_tok}' \tr={r}")
print("Reward:", total)

Vocab size: 3849
Obs.space: Discrete(3849)
Action space: Discrete(3849)
Query key: jZ
Stream: ['!', 'R4', ':', 'Ba', '!', '!', 'zH', ':', 'kR', '!', '!', '3L', ':', 'dX', '!', '!', 'Wi', ':', 'gL', '!', '!', 'CK', ':', 'Nd', '!', '!', 'b8', ':', '0A', '!', '!', 'xw', ':', 'Uj', '!', '!', 'jZ', ':', 'EN', '!', '!', 'QJ', ':', 'cL', '!', '!', 'wX', ':', 'Bt', '!', '!', 'Np', ':', 'Cb', '!', '!', 'zN', ':', 'vI', '!', '!', 'PO', ':', '6u', '!', '!', 'DD', ':', 'hZ', '!', '!', 'jZ', ':', 'shot']
Stream: !R4:Ba!!zH:kR!!3L:dX!!Wi:gL!!CK:Nd!!b8:0A!!xw:Uj!!jZ:EN!!QJ:cL!!wX:Bt!!Np:Cb!!zN:vI!!PO:6u!!DD:hZ!!jZ:shot
Len stream: 74
Total number of tokens: 74
t=0   obs=0   token='!'
t=1   obs=3295   token='R4' 	act_tok='pass' 	r=0.0
t=2   obs=1      token=':' 	act_tok='pass' 	r=0.0
t=3   obs=2309   token='Ba' 	act_tok='pass' 	r=0.0
t=4   obs=0      token='!' 	act_tok='pass' 	r=0.0
t=5   obs=0      token='!' 	act_tok='pass' 	r=0.0
t=6   obs=2218   token='zH' 	act_tok='pass' 	r=0.0
t=7   obs=1      to

In [81]:
len(env.vocab)

3849

In [82]:
info["mapping"]

{'qY': '9O',
 'Am': 'eL',
 'V4': 'OB',
 'uo': 'eG',
 '2v': 'pa',
 'Hk': 'My',
 'VR': '5L',
 'OO': 'q4',
 'E6': 'Eo',
 'fF': 'Tq',
 '0g': 'sf',
 'fs': 'KI',
 'VJ': 'D2',
 'zl': '6Y',
 '5Z': 'Kv',
 'iz': 'Fm',
 '41': '88',
 'ZR': 'bd',
 'AO': '3n',
 'gp': 's0'}

In [83]:
env.keys_vocab

['E6',
 'zl',
 'iz',
 '2v',
 'VJ',
 '41',
 'Hk',
 'ZR',
 'VR',
 'uo',
 'gp',
 'OO',
 'qY',
 'V4',
 '0g',
 'Am',
 'fF',
 'AO',
 '5Z',
 'fs']

In [84]:
obs_id = env.token_to_id["pass"]   # например 1287
print(obs_id)
tok = env.id_to_token[obs_id]    # вернёт "zA"
print(tok)

4
pass
