Code 1 1   The class gym.Env
https://github.com/openai/gym/blob/master/gym/core.py

In [None]:
#

import gym
from gym import spaces
from gym.utils import seeding
from gym.logger import deprecation
from gym.utils.seeding import RandomNumberGenerator

# Type variables used to parameterise Env (and its wrappers) over the type of
# observations an environment emits and the type of actions it accepts.
ObsType = TypeVar("ObsType")
ActType = TypeVar("ActType")


class Env(Generic[ObsType, ActType]):
    """Base class that defines the environment interface.

    Concrete environments override ``step``, ``reset`` and ``render`` and
    declare ``action_space`` and ``observation_space``.
    """

    # Class-level defaults; a subclass overrides them only where needed.
    metadata = {"render_modes": []}
    render_mode = None  # define render_mode if your environment supports rendering
    reward_range = (-float("inf"), float("inf"))
    spec: "EnvSpec" = None

    # Every subclass is expected to provide these two spaces.
    action_space: spaces.Space[ActType]
    observation_space: spaces.Space[ObsType]

    # Backing field for the ``np_random`` property; created on demand.
    _np_random: RandomNumberGenerator | None = None

    @property
    def np_random(self) -> RandomNumberGenerator:
        """Return the random generator, creating an unseeded one on first use."""
        if self._np_random is not None:
            return self._np_random
        self._np_random, _ = seeding.np_random()
        return self._np_random

    @np_random.setter
    def np_random(self, value: RandomNumberGenerator):
        """Replace the random generator used by this environment."""
        self._np_random = value

    def step(self, action: ActType) -> Union[
        Tuple[ObsType, float, bool, bool, dict], Tuple[ObsType, float, bool, dict]
    ]:
        """Advance the environment by one timestep; subclasses must implement."""
        raise NotImplementedError

    def reset(self, *, seed: Optional[int] = None, return_info: bool = False,
        options: Optional[dict] = None,
    ) -> Union[ObsType, tuple[ObsType, dict]]:
        """Re-seed the random generator when ``seed`` is given.

        The base implementation only handles seeding and returns nothing;
        ``return_info`` and ``options`` are accepted but not used here.
        """
        if seed is None:
            return
        self._np_random, _ = seeding.np_random(seed)

    def render(self) -> Optional[Union[RenderFrame, List[RenderFrame]]]:
        """Render the environment; subclasses must implement."""
        raise NotImplementedError

    def close(self):
        """Hook for cleanup; the base implementation does nothing."""
        pass

    @property
    def unwrapped(self) -> Env:
        """Return the innermost environment, which for a bare Env is itself."""
        return self

    def __str__(self):
        """Return ``<ClassName instance>`` or ``<ClassName<spec id>>`` if a spec is set."""
        cls_name = type(self).__name__
        if self.spec is not None:
            return f"<{cls_name}<{self.spec.id}>>"
        return f"<{cls_name} instance>"

    def __enter__(self):
        """Enter a ``with`` block and hand back the environment itself."""
        return self

    def __exit__(self, *args):
        """Close the environment on leaving a ``with`` block."""
        self.close()
        # False means exceptions raised inside the block are not suppressed.
        return False

#

Code 1 2   The class gym.spaces.Space
https://github.com/openai/gym/blob/master/gym/spaces/space.py

In [None]:
#

from gym.utils import seeding

# Covariant type variable naming the element type of a Space: it is the
# Generic parameter of Space and the annotated return type of Space.sample().
T_cov = TypeVar("T_cov", covariant=True)


class Space(Generic[T_cov]):
    """Base class for spaces: stores a shape, a dtype and a lazily seeded RNG.

    Subclasses implement ``sample`` and ``contains``.
    """

    def __init__(
        self,
        shape: Optional[Sequence[int]] = None,
        dtype: Optional[Type | str] = None,
        seed: Optional[int] = None,
    ):
        """Record shape and dtype, and seed the RNG only if ``seed`` is given."""
        self._shape = tuple(shape) if shape is not None else None
        self.dtype = np.dtype(dtype) if dtype is not None else None
        self._np_random = None
        if seed is not None:
            self.seed(seed)

    @property
    def np_random(self) -> seeding.RandomNumberGenerator:
        """Return the RNG, seeding it on first access if that has not happened yet."""
        rng = self._np_random
        if rng is None:
            self.seed()
            rng = self._np_random
        return rng

    @property
    def shape(self) -> Optional[tuple[int, ...]]:
        """Read-only view of the shape stored at construction (may be None)."""
        return self._shape

    def sample(self) -> T_cov:
        """Draw a random element of the space; subclasses must implement."""
        raise NotImplementedError

    def seed(self, seed: Optional[int] = None) -> list:
        """(Re)create the RNG from ``seed`` and return the seed used, in a list."""
        rng, used_seed = seeding.np_random(seed)
        self._np_random = rng
        return [used_seed]

    def contains(self, x) -> bool:
        """Tell whether ``x`` belongs to the space; subclasses must implement."""
        raise NotImplementedError

    def __contains__(self, x) -> bool:
        """Support ``x in space`` by delegating to ``contains``."""
        return self.contains(x)

    def __setstate__(self, state: Iterable | Mapping):
        """Restore attributes from ``state``.

        The keys ``shape`` and ``np_random`` are stored under their private
        names ``_shape`` and ``_np_random``. The caller's ``state`` object is
        not mutated.
        """
        restored = dict(state)
        for public, private in (("shape", "_shape"), ("np_random", "_np_random")):
            if public in restored:
                restored[private] = restored.pop(public)
        self.__dict__.update(restored)

    def to_jsonable(self, sample_n: Sequence[T_cov]) -> list:
        """Turn a batch of samples into a list (the default JSONable form)."""
        return [*sample_n]

    def from_jsonable(self, sample_n: list) -> list[T_cov]:
        """Inverse of ``to_jsonable``; the default returns the input unchanged."""
        return sample_n

#

Code 1 3   The class gym.spaces.Box
https://github.com/openai/gym/blob/master/gym/spaces/box.py

In [None]:
#

from .space import Space
from gym import logger

class Box(Space[np.ndarray]):
    """A box of arrays described by per-coordinate lower and upper bounds.

    Bounds may be infinite; which coordinates are bounded on each side is
    recorded in ``bounded_below`` and ``bounded_above``.
    """

    def __init__(
        self,
        low: Union[SupportsFloat, np.ndarray],
        high: Union[SupportsFloat, np.ndarray],
        shape: Optional[Sequence[int]] = None,
        dtype: Type = np.float32,
        seed: Optional[int] = None,
    ):
        """Build the box from its bounds.

        Args:
            low: Scalar or array of lower bounds.
            high: Scalar or array of upper bounds.
            shape: Shape of the space. If omitted it is taken from ``low`` or,
                failing that, from ``high``.
            dtype: dtype of the elements of the space.
            seed: Optional seed forwarded to ``Space.__init__``.

        Raises:
            ValueError: If ``shape`` is not given and both bounds are scalars.
        """
        self.dtype = np.dtype(dtype)
        if shape is not None:
            shape = tuple(shape)
        elif not np.isscalar(low):
            shape = low.shape
        elif not np.isscalar(high):
            shape = high.shape
        else:
            raise ValueError("shape must be provided from the shapes of low or high")

        # Capture the boundedness information before replacing np.inf with get_inf
        _low = np.full(shape, low, dtype=float) if np.isscalar(low) else low
        self.bounded_below = -np.inf < _low
        _high = np.full(shape, high, dtype=float) if np.isscalar(high) else high
        self.bounded_above = np.inf > _high

        low = _broadcast(low, dtype, shape, inf_sign="-")
        high = _broadcast(high, dtype, shape, inf_sign="+")

        self._shape: Tuple[int, ...] = shape

        # Store the broadcast bounds, cast to the dtype of the space. The
        # attributes must be assigned from the local arrays: they do not
        # exist on the instance before this point.
        self.low = low.astype(self.dtype)
        self.high = high.astype(self.dtype)
        self.low_repr = _short_repr(self.low)
        self.high_repr = _short_repr(self.high)
        # Forward the seed so that a seed passed to Box actually seeds the RNG.
        super(Box, self).__init__(self.shape, self.dtype, seed)

    @property
    def shape(self) -> Tuple[int, ...]:
        """Has stricter type than gym.Space - never None."""
        return self._shape

    def is_bounded(self, manner: str = "both") -> bool:
        """Tell whether every coordinate is bounded in the requested manner.

        Args:
            manner: One of ``"both"``, ``"below"`` or ``"above"``.

        Raises:
            ValueError: If ``manner`` is not one of the three accepted values.
        """
        below = np.all(self.bounded_below)
        above = np.all(self.bounded_above)
        if manner == "both":
            return below and above
        elif manner == "below":
            return below
        elif manner == "above":
            return above
        else:
            raise ValueError("manner is not in {'below', 'above', 'both'}")

    def sample(self) -> np.ndarray:
        """Draw one random element of the box.

        Each coordinate is sampled according to its interval type:
        unbounded coordinates from a normal distribution, half-bounded ones
        from an exponential distribution shifted to the finite bound, and
        fully bounded ones uniformly between the bounds.
        """
        # For non-float dtypes the upper bound is raised by one so that, after
        # flooring, the original upper bound itself can be produced.
        high = self.high if self.dtype.kind == "f" else self.high.astype("int64") + 1
        sample = np.empty(self.shape)

        # Masking arrays which classify the coordinates according to interval
        # type
        unbounded = ~self.bounded_below & ~self.bounded_above
        upp_bounded = ~self.bounded_below & self.bounded_above
        low_bounded = self.bounded_below & ~self.bounded_above
        bounded = self.bounded_below & self.bounded_above

        # Vectorized sampling by interval type
        sample[unbounded] = self.np_random.normal(
            size=unbounded[unbounded].shape
        )
        sample[low_bounded] = (
            self.np_random.exponential(size=low_bounded[low_bounded].shape)
            + self.low[low_bounded]
        )
        sample[upp_bounded] = (
            -self.np_random.exponential(size=upp_bounded[upp_bounded].shape)
            + self.high[upp_bounded]
        )
        sample[bounded] = self.np_random.uniform(
            low=self.low[bounded], high=high[bounded],
            size=bounded[bounded].shape
        )
        if self.dtype.kind == 'i':
            sample = np.floor(sample)
        return sample.astype(self.dtype)

    def contains(self, x) -> bool:
        """Tell whether ``x`` has a castable dtype, the right shape and lies within bounds."""
        if not isinstance(x, np.ndarray):
            # Non-array inputs (lists, scalars) are converted using the space dtype.
            x = np.asarray(x, dtype=self.dtype)
        return bool(
            np.can_cast(x.dtype, self.dtype)
            and x.shape == self.shape
            and np.all(x >= self.low)
            and np.all(x <= self.high)
        )

    def to_jsonable(self, sample_n):
        """Convert a batch of samples to nested Python lists."""
        return np.array(sample_n).tolist()

    def from_jsonable(self, sample_n: Sequence[SupportsFloat]) -> list[np.ndarray]:
        """Convert each entry of a JSONable batch back to an array."""
        return [np.asarray(sample) for sample in sample_n]

    def __repr__(self) -> str:
        """Return ``Box(low, high, shape, dtype)`` using the short bound representations."""
        return f"Box({self.low_repr}, {self.high_repr}, {self.shape}, {self.dtype})"

    def __eq__(self, other) -> bool:
        """Two boxes are equal when their shapes match and their bounds are close."""
        return (
            isinstance(other, Box)
            and (self.shape == other.shape)
            and np.allclose(self.low, other.low)
            and np.allclose(self.high, other.high)
        )
    
def get_precision(dtype) -> SupportsFloat:
    """Return the decimal precision of a floating dtype, or infinity otherwise.

    Floating dtypes report ``np.finfo(dtype).precision``; every other dtype
    yields ``np.inf``.
    """
    is_floating = np.issubdtype(dtype, np.floating)
    return np.finfo(dtype).precision if is_floating else np.inf


def _broadcast(
    value: Union[SupportsFloat, np.ndarray],
    dtype,
    shape: tuple[int, ...],
    inf_sign: str,
) -> np.ndarray:
    """handle infinite bounds and broadcast at the same time if needed"""
    if np.isscalar(value):
        value = get_inf(dtype, inf_sign) if np.isinf(value) else value  # type: ignore
        value = np.full(shape, value, dtype=dtype)
    else:
        assert isinstance(value, np.ndarray)
        if np.any(np.isinf(value)):
            # create new array with dtype, but maintain old one to preserve np.inf
            temp = value.astype(dtype)
            temp[np.isinf(value)] = get_inf(dtype, inf_sign)
            value = temp
    return value

#

Code 1 4   The class gym.spaces.Discrete
https://github.com/openai/gym/blob/master/gym/spaces/discrete.py

In [None]:
#

from .space import Space


class Discrete(Space[int]):
    """A space of ``n`` consecutive integers beginning at ``start``."""

    def __init__(self, n: int, seed: Optional[int] = None, start: int = 0):
        """Store ``n`` and ``start`` as ints and initialise a scalar int64 space."""
        self.n = int(n)
        self.start = int(start)
        super(Discrete, self).__init__((), np.int64, seed)

    def sample(self) -> int:
        """Draw a random integer from ``start`` up to ``start + n - 1``."""
        offset = self.np_random.integers(self.n)
        return int(self.start + offset)

    def contains(self, x) -> bool:
        """Accept Python ints and zero-dimensional NumPy integers within range."""
        if not isinstance(x, int):
            is_numpy_value = isinstance(x, (np.generic, np.ndarray))
            if not (
                is_numpy_value
                and x.dtype.char in np.typecodes["AllInteger"]
                and x.shape == ()
            ):
                return False
            x = int(x)  # type: ignore
        return self.start <= x < self.start + self.n

    def __repr__(self) -> str:
        """Return ``Discrete(n)``, adding ``start=...`` only when it is non-zero."""
        if self.start == 0:
            return "Discrete(%d)" % self.n
        return "Discrete(%d, start=%d)" % (self.n, self.start)

    def __eq__(self, other) -> bool:
        """Equal when ``other`` is a Discrete with the same ``n`` and ``start``."""
        if not isinstance(other, Discrete):
            return False
        return self.n == other.n and self.start == other.start

    def __setstate__(self, state):
        """Restore state, defaulting ``start`` to 0 when the state lacks it."""
        super().__setstate__(state)
        # Work on a copy so the caller's state object is left untouched.
        restored = dict(state)
        restored.setdefault("start", 0)
        self.__dict__.update(restored)

#

Code 1 5   The class gym.Wrapper
https://github.com/openai/gym/blob/master/gym/core.py

In [None]:
#

class Wrapper(Env[ObsType, ActType]):
    """Wrap an environment and forward everything to it by default.

    ``action_space``, ``observation_space``, ``reward_range`` and ``metadata``
    can be overridden on the wrapper; while an override is unset (``None``)
    the value of the wrapped environment is returned.
    """

    def __init__(self, env: Env):
        """Store the wrapped environment and start with no overrides."""
        self.env = env
        self._action_space: spaces.Space | None = None
        self._observation_space: spaces.Space | None = None
        self._reward_range: tuple[SupportsFloat, SupportsFloat] | None = None
        self._metadata: dict | None = None

    def __getattr__(self, name):
        """Look up attributes missing on the wrapper on the wrapped environment.

        Raises:
            AttributeError: If ``env`` itself is missing, i.e. the wrapper has
                not been initialised yet.
        """
        # __getattr__ only runs when normal lookup failed. If the missing
        # attribute is ``env`` itself, reading ``self.env`` below would call
        # __getattr__ again and recurse without end.
        if name == "env":
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute 'env'"
            )
        return getattr(self.env, name)

    @property
    def spec(self):
        """Spec of the wrapped environment."""
        return self.env.spec

    @classmethod
    def class_name(cls):
        """Name of the wrapper class."""
        return cls.__name__

    @property
    def action_space(self) -> spaces.Space[ActType]:
        """Overridden action space, or the wrapped environment's if unset."""
        if self._action_space is None:
            return self.env.action_space
        return self._action_space

    @action_space.setter
    def action_space(self, space):
        self._action_space = space

    @property
    def observation_space(self) -> spaces.Space:
        """Overridden observation space, or the wrapped environment's if unset."""
        if self._observation_space is None:
            return self.env.observation_space
        return self._observation_space

    @observation_space.setter
    def observation_space(self, space):
        self._observation_space = space

    @property
    def reward_range(self) -> tuple[SupportsFloat, SupportsFloat]:
        """Overridden reward range, or the wrapped environment's if unset."""
        if self._reward_range is None:
            return self.env.reward_range
        return self._reward_range

    @reward_range.setter
    def reward_range(self, value):
        self._reward_range = value

    @property
    def metadata(self) -> dict:
        """Overridden metadata, or the wrapped environment's if unset."""
        if self._metadata is None:
            return self.env.metadata
        return self._metadata

    @metadata.setter
    def metadata(self, value):
        self._metadata = value

    def step(self, action: ActType) -> Tuple[ObsType, float, bool, dict]:
        """Forward ``step`` to the wrapped environment."""
        return self.env.step(action)

    def reset(self, **kwargs) -> Union[ObsType, tuple[ObsType, dict]]:
        """Forward ``reset`` and its keyword arguments to the wrapped environment."""
        return self.env.reset(**kwargs)

    def render(self, *args, **kwargs):
        """Forward ``render`` and all of its arguments to the wrapped environment."""
        return self.env.render(*args, **kwargs)

    def close(self):
        """Close the wrapped environment."""
        return self.env.close()

    def seed(self, seed=None):
        """Forward ``seed`` to the wrapped environment."""
        return self.env.seed(seed)

    def __str__(self):
        """Return ``<WrapperName<inner env string>>``."""
        return f"<{type(self).__name__}{self.env}>"

    def __repr__(self):
        return str(self)

    @property
    def unwrapped(self) -> Env:
        """Innermost environment, reached through every nested wrapper."""
        return self.env.unwrapped

#

In [None]:
Code 1 6   The class gym.wrappers.TimeLimit
https://github.com/openai/gym/blob/master/gym/wrappers/time_limit.py

In [None]:
#

import gym

class TimeLimit(gym.Wrapper):
    """Truncate an episode once a maximum number of steps has elapsed."""

    def __init__(self, env, max_episode_steps=None):
        """Wrap ``env`` with a step limit.

        Args:
            env: Environment to wrap.
            max_episode_steps: Step limit. When omitted and the environment
                has a spec, the limit is read from the spec.
        """
        super(TimeLimit, self).__init__(env)
        if max_episode_steps is None and self.env.spec is not None:
            max_episode_steps = env.spec.max_episode_steps
        if self.env.spec is not None:
            # Keep the spec in sync with the limit actually in force.
            self.env.spec.max_episode_steps = max_episode_steps
        self._max_episode_steps = max_episode_steps
        # Stays None until reset() is called for the first time.
        self._elapsed_steps = None

    def step(self, action):
        """Step the environment and flag truncation once the limit is reached.

        The inner result is passed through ``step_api_compatibility`` to get
        the five values unpacked below, and converted again on return
        according to ``self.new_step_api``.
        NOTE(review): assumes the ``True`` argument selects the
        terminated/truncated step API - confirm against the helper.
        """
        observation, reward, terminated, truncated, info = step_api_compatibility(
            self.env.step(action),
            True,
        )
        self._elapsed_steps += 1
        if self._elapsed_steps >= self._max_episode_steps:
            # The flag must be the value that is returned below; assigning an
            # unrelated local would leave the limit without any effect.
            truncated = True
        return step_api_compatibility(
            (observation, reward, terminated, truncated, info),
            self.new_step_api,
        )

    def reset(self, **kwargs):
        """Restart the step counter and reset the wrapped environment."""
        self._elapsed_steps = 0
        return self.env.reset(**kwargs)

#

Code 4 1   The class gym.spaces.Tuple
https://github.com/openai/gym/blob/master/gym/spaces/tuple.py

In [None]:
#

from .space import Space

class Tuple(Space[tuple], Sequence):
    """A space whose elements are tuples with one entry per sub-space."""

    def __init__(
        self, spaces: Iterable[Space], seed: Optional[Union[int, List[int]]] = None
    ):
        """Build the product space.

        Args:
            spaces: The sub-spaces, in order. Any iterable is accepted.
            seed: Optional seed; an int seeds this space and derives sub-seeds,
                a list gives one seed per sub-space (see ``seed``).
        """
        # Materialise the iterable: the space is iterated repeatedly and needs
        # len(), indexing and equality, none of which a one-shot iterator
        # provides.
        self.spaces = tuple(spaces)
        # ``self.spaces`` must exist before this call, because a non-None seed
        # makes Space.__init__ invoke ``self.seed``, which walks the sub-spaces.
        super(Tuple, self).__init__(None, None, seed)

    def seed(self, seed: Optional[Union[int, List[int]]] = None) -> list:
        """Seed this space and its sub-spaces.

        Args:
            seed: ``None`` seeds every sub-space with ``None``; a list provides
                one seed per sub-space by position; an int seeds this space
                and draws one sub-seed per sub-space from its RNG.

        Returns:
            The list of seeds that were used.

        Raises:
            TypeError: If ``seed`` is neither a list, an int nor ``None``.
        """
        seeds = []

        if isinstance(seed, list):
            for i, space in enumerate(self.spaces):
                seeds += space.seed(seed[i])
        elif isinstance(seed, int):
            seeds = super().seed(seed)
            try:
                subseeds = self.np_random.choice(
                    np.iinfo(int).max,
                    size=len(self.spaces),
                    replace=False,  # unique subseed for each subspace
                )
            except ValueError:
                subseeds = self.np_random.choice(
                    np.iinfo(int).max,
                    size=len(self.spaces),
                    replace=True,  # we get more than INT_MAX subspaces
                )

            for subspace, subseed in zip(self.spaces, subseeds):
                seeds.append(subspace.seed(int(subseed))[0])
        elif seed is None:
            for space in self.spaces:
                seeds += space.seed(seed)
        else:
            raise TypeError("Passed seed not of an expected type: list or int or None")

        return seeds

    def sample(self) -> tuple:
        """Return a tuple holding one sample from each sub-space."""
        return tuple(space.sample() for space in self.spaces)

    def contains(self, x) -> bool:
        """Tell whether ``x`` has one valid entry for every sub-space."""
        if isinstance(x, (list, np.ndarray)):
            x = tuple(x)  # Promote list and ndarray to tuple for contains check
        return (
            isinstance(x, tuple)
            and len(x) == len(self.spaces)
            and all(space.contains(part) for (space, part) in zip(self.spaces, x))
        )

    def __repr__(self) -> str:
        """Return ``Tuple(...)`` listing the sub-spaces."""
        return "Tuple(" + ", ".join([str(s) for s in self.spaces]) + ")"

    def to_jsonable(self, sample_n) -> list:
        """Convert a batch of tuples into one JSONable batch per sub-space."""
        # serialize as list-repr of tuple of vectors
        return [
            space.to_jsonable([sample[i] for sample in sample_n])
            for i, space in enumerate(self.spaces)
        ]

    def from_jsonable(self, sample_n) -> list:
        """Rebuild the list of tuples from per-sub-space JSONable batches."""
        per_space = [
            space.from_jsonable(sample_n[i]) for i, space in enumerate(self.spaces)
        ]
        # Transpose: one batch per sub-space becomes one tuple per sample.
        return list(zip(*per_space))

    def __getitem__(self, index: int) -> Space:
        """Return the sub-space at ``index``."""
        return self.spaces[index]

    def __len__(self) -> int:
        """Number of sub-spaces."""
        return len(self.spaces)

    def __eq__(self, other) -> bool:
        """Equal when ``other`` is a Tuple with equal sub-spaces."""
        return isinstance(other, Tuple) and self.spaces == other.spaces

#

Code 11 2   The class gym.wrappers.TransformReward
https://github.com/openai/gym/blob/master/gym/wrappers/transform_reward.py

In [None]:
#

from gym import RewardWrapper


class TransformReward(RewardWrapper):
    """Reward wrapper that passes every reward through a user-supplied callable."""

    def __init__(self, env, f):
        """Wrap ``env`` and remember ``f``, which must be callable."""
        super(TransformReward, self).__init__(env)
        assert callable(f)
        self.f = f

    def reward(self, reward):
        """Return ``self.f`` applied to ``reward``."""
        transformed = self.f(reward)
        return transformed

#

Code 12 2   The class AtariPreprocessing
https://github.com/openai/gym/blob/master/gym/wrappers/atari_preprocessing.py

In [None]:
#

from gym.spaces import Box


class AtariPreprocessing(gym.Wrapper):
    """Atari preprocessing: no-op resets, frame skipping with pooling of the
    two most recent frames, resizing, and optional grayscale and scaling."""

    def __init__(
        self,
        env: gym.Env,
        noop_max: int = 30,
        frame_skip: int = 4,
        screen_size: int = 84,
        terminal_on_life_loss: bool = False,
        grayscale_obs: bool = True,
        grayscale_newaxis: bool = False,
        scale_obs: bool = False,
    ):
        """Store the options, allocate the frame buffers and set the observation space."""
        super().__init__(env)
        self.noop_max = noop_max
        self.frame_skip = frame_skip
        self.screen_size = screen_size
        self.terminal_on_life_loss = terminal_on_life_loss
        self.grayscale_obs = grayscale_obs
        self.grayscale_newaxis = grayscale_newaxis
        self.scale_obs = scale_obs

        # Two buffers hold the most recent observations for max pooling;
        # grayscale frames keep only the first two axes of the raw shape.
        raw_shape = env.observation_space.shape
        buffer_shape = raw_shape[:2] if grayscale_obs else raw_shape
        self.obs_buffer = [np.empty(buffer_shape, dtype=np.uint8) for _ in range(2)]

        self.ale = env.unwrapped.ale
        self.lives = 0
        self.game_over = False

        if scale_obs:
            _low, _high, _obs_dtype = 0, 1, np.float32
        else:
            _low, _high, _obs_dtype = 0, 255, np.uint8

        if not grayscale_obs:
            _shape = (screen_size, screen_size, 3)
        elif grayscale_newaxis:
            _shape = (screen_size, screen_size, 1)
        else:
            # Grayscale without the extra channel axis.
            _shape = (screen_size, screen_size)
        self.observation_space = Box(
            low=_low, high=_high, shape=_shape, dtype=_obs_dtype
        )

    def _grab_screen(self, buffer_index):
        """Copy the current emulator screen into ``obs_buffer[buffer_index]``."""
        if self.grayscale_obs:
            self.ale.getScreenGrayscale(self.obs_buffer[buffer_index])
        else:
            self.ale.getScreenRGB(self.obs_buffer[buffer_index])

    def step(self, action):
        """Repeat ``action`` for ``frame_skip`` frames and return the pooled observation.

        Rewards are summed over the repeated frames. The loop stops early when
        the episode ends or, with ``terminal_on_life_loss``, when a life is lost.
        """
        total_reward = 0.0

        for t in range(self.frame_skip):
            _, reward, done, info = self.env.step(action)
            total_reward += reward
            self.game_over = done

            if self.terminal_on_life_loss:
                new_lives = self.ale.lives()
                done = done or new_lives < self.lives
                self.lives = new_lives

            if done:
                break
            # Only the last two frames are captured: the second to last goes
            # to buffer 1 and the last one to buffer 0.
            if t == self.frame_skip - 2:
                self._grab_screen(1)
            elif t == self.frame_skip - 1:
                self._grab_screen(0)
        return self._get_obs(), total_reward, done, info

    def reset(self, **kwargs):
        """Reset the environment, then take a random number of no-op steps.

        Returns the observation, plus the info dict when ``return_info`` is
        passed as a true value.
        """
        want_info = kwargs.get("return_info", False)

        def restart():
            # Reset the wrapped env and return the info dict to start from.
            outcome = self.env.reset(**kwargs)
            if want_info:
                _, fresh_info = outcome
                return fresh_info
            return {}

        # NoopReset
        reset_info = restart()

        if self.noop_max > 0:
            noops = self.env.unwrapped.np_random.integers(1, self.noop_max + 1)
        else:
            noops = 0
        for _ in range(noops):
            _, _, done, step_info = self.env.step(0)
            reset_info.update(step_info)
            if done:
                reset_info = restart()

        self.lives = self.ale.lives()
        self._grab_screen(0)
        self.obs_buffer[1].fill(0)

        if want_info:
            return self._get_obs(), reset_info
        return self._get_obs()

    def _get_obs(self):
        """Pool the two buffers, resize to the target size and convert the dtype."""
        newest, previous = self.obs_buffer[0], self.obs_buffer[1]
        if self.frame_skip > 1:  # more efficient in-place pooling
            np.maximum(newest, previous, out=newest)
        resized = cv2.resize(
            self.obs_buffer[0],
            (self.screen_size, self.screen_size),
            interpolation=cv2.INTER_AREA,
        )

        if self.scale_obs:
            obs = np.asarray(resized, dtype=np.float32) / 255.0
        else:
            obs = np.asarray(resized, dtype=np.uint8)

        if self.grayscale_obs and self.grayscale_newaxis:
            obs = np.expand_dims(obs, axis=-1)  # Add a channel axis
        return obs

#

Code 12 3   The class FrameStack
https://github.com/openai/gym/blob/master/gym/wrappers/frame_stack.py

In [None]:
#

from gym.spaces import Box
from gym import ObservationWrapper


class LazyFrames:
    """Hold a list of frames and build the stacked array only when asked.

    With ``lz4_compress`` the frames are stored compressed and decompressed
    on access.
    """

    __slots__ = ("frame_shape", "dtype", "shape", "lz4_compress", "_frames")

    def __init__(self, frames, lz4_compress=False):
        """Record shape and dtype from the first frame and store the frames.

        Args:
            frames: Non-empty list of arrays; shape and dtype are taken from
                ``frames[0]``.
            lz4_compress: Store each frame compressed with ``lz4.block``.
        """
        self.frame_shape = tuple(frames[0].shape)
        self.shape = (len(frames),) + self.frame_shape
        self.dtype = frames[0].dtype
        if lz4_compress:
            # Imported lazily so lz4 is only required when compression is used.
            from lz4.block import compress

            frames = [compress(frame) for frame in frames]
        self._frames = frames
        self.lz4_compress = lz4_compress

    def __array__(self, dtype=None):
        """Return all frames stacked into one array, optionally cast to ``dtype``."""
        arr = self[:]
        if dtype is not None:
            return arr.astype(dtype)
        return arr

    def __len__(self):
        """Number of stored frames."""
        return self.shape[0]

    def __getitem__(self, int_or_slice):
        """Return a single frame for an integer index, or stacked frames for a slice."""
        # NumPy integer scalars are not instances of ``int``. Without the
        # explicit check they would take the slice path below and iterate
        # over a single frame instead of a list of frames.
        if isinstance(int_or_slice, (int, np.integer)):
            return self._check_decompress(self._frames[int_or_slice])  # single frame
        return np.stack(
            [self._check_decompress(f) for f in self._frames[int_or_slice]], axis=0
        )

    def __eq__(self, other):
        """Compare the stacked array with ``other`` using array ``==``."""
        return self.__array__() == other

    def _check_decompress(self, frame):
        """Return ``frame`` as an array, decompressing it first when needed."""
        if self.lz4_compress:
            from lz4.block import decompress

            return np.frombuffer(decompress(frame), dtype=self.dtype).reshape(
                self.frame_shape
            )
        return frame


class FrameStack(ObservationWrapper):
    """Observation wrapper that returns the last ``num_stack`` frames as LazyFrames."""

    def __init__(self, env, num_stack, lz4_compress=False):
        """Create the frame queue and widen the observation space by a leading stack axis."""
        super().__init__(env)
        self.num_stack = num_stack
        self.lz4_compress = lz4_compress

        # A bounded deque drops the oldest frame once num_stack frames are held.
        self.frames = deque(maxlen=num_stack)

        # Repeat the single-frame bounds along a new first axis.
        frame_space = self.observation_space
        stacked_low = np.repeat(frame_space.low[np.newaxis, ...], num_stack, axis=0)
        stacked_high = np.repeat(frame_space.high[np.newaxis, ...], num_stack, axis=0)
        self.observation_space = Box(
            low=stacked_low, high=stacked_high, dtype=frame_space.dtype
        )

    def observation(self):
        """Return the frames currently in the queue wrapped in LazyFrames."""
        return LazyFrames(list(self.frames), self.lz4_compress)

    def step(self, action):
        """Step the environment, queue the new frame and return the stack."""
        frame, reward, done, info = self.env.step(action)
        self.frames.append(frame)
        return self.observation(), reward, done, info

    def reset(self, **kwargs):
        """Reset the environment and fill the whole queue with the first frame.

        Returns the stacked observation, plus the info dict when
        ``return_info`` is passed as a true value.
        """
        want_info = kwargs.get("return_info", False)
        if want_info:
            obs, info = self.env.reset(**kwargs)
        else:
            obs = self.env.reset(**kwargs)
            info = None  # Unused
        for _ in range(self.num_stack):
            self.frames.append(obs)

        if want_info:
            return self.observation(), info
        return self.observation()

#