In [1]:
from procgen import ProcgenGym3Env
import gym

In [5]:
import random
import numpy as np

# Constants
# Color names. EnvState._parse_state_bytes uses each per-key color byte as an
# index into this list, so the order of the entries is significant.
COLORS = ["blue", "green", "red"]  # Assuming 3 colors: blue, green, red
# Object "type" codes that asset_for_type compares against.
KEY = 2
LOCKED_DOOR = 1
# Height and width of the frame array allocated by render_game_frame.
WORLD_DIM = 25

# Ordered description of the fields in a serialized state buffer, consumed
# entry by entry by _parse_maze_state_bytes.
#
# Each entry is one of:
#   [type, name]                      a scalar, where type is "int", "float"
#                                     or "string"
#   ["loop", name, length_name, defs] the nested `defs` entries repeated as
#                                     many times as the already-parsed field
#                                     `length_name` says
#
# The order of the entries is the order of the bytes in the buffer. The
# interleaved comments name the serialization call each group of fields
# mirrors.
MAZE_STATE_DICT_TEMPLATE = [
    ["int", "SERIALIZE_VERSION"],
    ["string", "game_name"],
    ["int", "options.paint_vel_info"],
    ["int", "options.use_generated_assets"],
    ["int", "options.use_monochrome_assets"],
    ["int", "options.restrict_themes"],
    ["int", "options.use_backgrounds"],
    ["int", "options.center_agent"],
    ["int", "options.debug_mode"],
    ["int", "options.distribution_mode"],
    ["int", "options.use_sequential_levels"],
    ["int", "options.use_easy_jump"],
    ["int", "options.plain_assets"],
    ["int", "options.physics_mode"],
    ["int", "grid_step"],
    ["int", "level_seed_low"],
    ["int", "level_seed_high"],
    ["int", "game_type"],
    ["int", "game_n"],
    # level_seed_rand_gen.serialize(b)
    ["int", "level_seed_rand_gen.is_seeded"],
    ["string", "level_seed_rand_gen.str"],
    # end level_seed_rand_gen.serialize(b)
    # rand_gen.serialize(b)
    ["int", "rand_gen.is_seeded"],
    ["string", "rand_gen.str"],
    # end rand_gen.serialize(b)
    ["float", "step_data.reward"],
    ["int", "step_data.done"],
    ["int", "step_data.level_complete"],
    ["int", "action"],
    ["int", "timeout"],
    ["int", "current_level_seed"],
    ["int", "prev_level_seed"],
    ["int", "episodes_remaining"],
    ["int", "episode_done"],
    ["int", "last_reward_timer"],
    ["float", "last_reward"],
    ["int", "default_action"],
    ["int", "fixed_asset_seed"],
    ["int", "cur_time"],
    ["int", "is_waiting_for_step"],
    # end Game::serialize(b)
    ["int", "grid_size"],
    # write_entities(b, entities)
    ["int", "ents.size"],
    # for (size_t i = 0; i < ents.size(); i++)
    [
        "loop",
        "ents",
        "ents.size",
        [
            # ents[i]->serialize(b)
            ["float", "x"],
            ["float", "y"],
            ["float", "vx"],
            ["float", "vy"],
            ["float", "rx"],
            ["float", "ry"],
            ["int", "type"],
            ["int", "image_type"],
            ["int", "image_theme"],
            ["int", "render_z"],
            ["int", "will_erase"],
            ["int", "collides_with_entities"],
            ["float", "collision_margin"],
            ["float", "rotation"],
            ["float", "vrot"],
            ["int", "is_reflected"],
            ["int", "fire_time"],
            ["int", "spawn_time"],
            ["int", "life_time"],
            ["int", "expire_time"],
            ["int", "use_abs_coords"],
            ["float", "friction"],
            ["int", "smart_step"],
            ["int", "avoids_collisions"],
            ["int", "auto_erase"],
            ["float", "alpha"],
            ["float", "health"],
            ["float", "theta"],
            ["float", "grow_rate"],
            ["float", "alpha_decay"],
            [
                "float",
                "climber_spawn_x",
            ],
        ],
    ],
    # end ents[i]->serialize(b)
    # end write_entities
    ["int", "use_procgen_background"],
    ["int", "background_index"],
    ["float", "bg_tile_ratio"],
    ["float", "bg_pct_x"],
    ["float", "char_dim"],
    ["int", "last_move_action"],
    ["int", "move_action"],
    ["int", "special_action"],
    ["float", "mixrate"],
    ["float", "maxspeed"],
    ["float", "max_jump"],
    ["float", "action_vx"],
    ["float", "action_vy"],
    ["float", "action_vrot"],
    ["float", "center_x"],
    ["float", "center_y"],
    ["int", "random_agent_start"],
    ["int", "has_useful_vel_info"],
    ["int", "step_rand_int"],
    # asset_rand_gen.serialize(b)
    ["int", "asset_rand_gen.is_seeded"],
    ["string", "asset_rand_gen.str"],
    # end asset_rand_gen.serialize(b)
    ["int", "main_width"],
    ["int", "main_height"],
    ["int", "out_of_bounds_object"],
    ["float", "unit"],
    ["float", "view_dim"],
    ["float", "x_off"],
    ["float", "y_off"],
    ["float", "visibility"],
    ["float", "min_visibility"],
    # grid.serialize(b)
    ["int", "w"],
    ["int", "h"],
    # b->write_vector_int(data)
    ["int", "data.size"],
    # for (auto i : v) {
    ["loop", "data", "data.size", [["int", "i"]]],
    # end b->write_vector_int(data)
    # end grid.serialize(b)
    # end BasicAbstractGame::serialize(b)
    ["int", "maze_dim"],
    ["int", "world_dim"],
    ["int", "END_OF_BUFFER"],
]

class StateValue:
    """A parsed state value paired with the index recorded for it.

    Instances compare by value (``val`` and ``idx``) and have a readable
    ``repr``, matching the behaviour of the dataclass of the same name that
    this file defines in a later cell. Because ``__eq__`` is defined,
    instances are not hashable (again like the dataclass).

    Attributes:
        val: the decoded value.
        idx: the index supplied by whoever built the instance.
    """

    def __init__(self, val, idx):
        self.val = val
        self.idx = idx

    def __repr__(self):
        return f"{type(self).__name__}(val={self.val!r}, idx={self.idx!r})"

    def __eq__(self, other):
        # Only compare against the exact same class; let Python try the
        # reflected comparison (and finally fall back to identity) otherwise.
        if other.__class__ is not self.__class__:
            return NotImplemented
        return (self.val, self.idx) == (other.val, other.idx)

class EnvState:
    """Parsed view of a serialized game-state buffer with color-order helpers.

    The byte layout handled here is the one ``_parse_state_bytes`` reads and
    ``_serialize_state`` writes:

        bytes 0-3                  num_keys   (little-endian unsigned int)
        bytes 4-7                  world_dim  (little-endian unsigned int)
        next num_keys bytes        one byte per key, an index into COLORS
        next 4 bytes               grid_size  (little-endian unsigned int)
        next grid_size**2 bytes    one byte per grid cell

    NOTE(review): this layout is this class's own assumption; confirm that it
    matches what the environment's "get_state" actually returns before
    relying on it. Any bytes after the grid cells are ignored by the parser
    and are therefore not reproduced by the serializer.

    Attributes:
        state_bytes: the serialized state (replaced by ``set_color_order``).
        state_vals: dict with keys "num_keys", "world_dim", "color_order",
            "grid_size" and "grid_data".
    """

    def __init__(self, state_bytes):
        self.state_bytes = state_bytes
        self.state_vals = self._parse_state_bytes(state_bytes)

    def _parse_state_bytes(self, state_bytes):
        """Decode ``state_bytes`` into a dict of state values.

        Raises:
            ValueError: if the buffer is shorter than the sizes it declares,
                or a color byte is not a valid index into COLORS. Failing
                early avoids silently decoding missing bytes as zeros and
                avoids looping over an absurd cell count taken from a buffer
                that is not in the expected layout.
        """
        total = len(state_bytes)

        num_keys = int.from_bytes(state_bytes[:4], byteorder="little")
        world_dim = int.from_bytes(state_bytes[4:8], byteorder="little")

        colors_start = 8
        grid_size_start = colors_start + num_keys
        grid_start = grid_size_start + 4
        if total < grid_start:
            raise ValueError(
                f"state buffer has {total} bytes but at least {grid_start} "
                f"are needed for a header declaring {num_keys} keys"
            )

        # Extract the color order
        color_order = []
        for i in range(num_keys):
            color_idx = state_bytes[colors_start + i]
            if color_idx >= len(COLORS):
                raise ValueError(
                    f"color index {color_idx} for key {i} is out of range "
                    f"(only {len(COLORS)} colors are known)"
                )
            color_order.append(StateValue(COLORS[color_idx], i))

        # Extract the grid size
        grid_size = int.from_bytes(
            state_bytes[grid_size_start:grid_start], byteorder="little"
        )

        # Extract the grid data (one byte per cell)
        cell_count = grid_size * grid_size
        if total - grid_start < cell_count:
            raise ValueError(
                f"state buffer declares a {grid_size}x{grid_size} grid "
                f"({cell_count} cells) but only {total - grid_start} bytes "
                f"follow the header"
            )
        grid_data = [
            StateValue(state_bytes[grid_start + i], i)
            for i in range(cell_count)
        ]

        return {
            "num_keys": num_keys,
            "world_dim": world_dim,
            "color_order": color_order,
            "grid_size": grid_size,
            "grid_data": grid_data,
        }

    def _serialize_state(self, state_vals):
        """Encode ``state_vals`` back into bytes (inverse of the parser).

        The key count written to the buffer is the length of
        ``state_vals["color_order"]`` so the output is always self-consistent.

        Raises:
            ValueError: if a color is not in COLORS or a grid cell value does
                not fit in one byte.
        """
        color_order = state_vals["color_order"]
        grid_data = state_vals["grid_data"]
        return b"".join(
            [
                len(color_order).to_bytes(4, byteorder="little"),
                state_vals["world_dim"].to_bytes(4, byteorder="little"),
                bytes(COLORS.index(color.val) for color in color_order),
                state_vals["grid_size"].to_bytes(4, byteorder="little"),
                bytes(cell.val for cell in grid_data),
            ]
        )

    def get_color_order(self):
        """Get the color order from the game state."""
        return [color.val for color in self.state_vals["color_order"]]

    def set_color_order(self, color_order):
        """Set the color order in the game state and re-serialize it.

        Args:
            color_order: iterable of color names, each of which must be in
                COLORS.

        Raises:
            ValueError: if any name is not in COLORS (nothing is modified in
                that case).
        """
        colors = list(color_order)
        unknown = [color for color in colors if color not in COLORS]
        if unknown:
            raise ValueError(f"unknown colors: {unknown!r}")

        state_vals = self.state_vals
        # Each entry records its own position, as the parser does.
        state_vals["color_order"] = [
            StateValue(color, i) for i, color in enumerate(colors)
        ]
        state_vals["num_keys"] = len(colors)
        self.state_bytes = self._serialize_state(state_vals)

    def randomly_order_colors(self):
        """Randomly order the colors in the game state."""
        color_order = COLORS.copy()
        random.shuffle(color_order)
        self.set_color_order(color_order)

    def get_objects(self):
        # Implement the logic to retrieve the objects from the game state
        # and return them as a list of dictionaries
        # ...
        return []

def game_reset(env):
    """Shuffle the color order stored in the environment's serialized state.

    Fetches the first state buffer via ``env.env.callmethod("get_state")``,
    wraps it in an EnvState, randomizes its color order and pushes the
    resulting bytes back with ``"set_state"``.

    Args:
        env: wrapper whose ``env`` attribute exposes ``callmethod``.
    """
    # Reset the game environment
    # ...  (placeholder: nothing is reset here yet)

    # Randomly order the colors
    raw_state = env.env.callmethod("get_state")[0]
    state = EnvState(raw_state)
    state.randomly_order_colors()
    env.env.callmethod("set_state", [state.state_bytes])

    # ...

def asset_for_type(state, obj_type, obj_theme):
    """Return the asset path for a key or locked door, else ``None``.

    Args:
        state: object providing ``get_color_order()``.
        obj_type: object type code, compared against KEY and LOCKED_DOOR.
        obj_theme: index into the state's color order.

    Returns:
        The asset path string for keys and locked doors; ``None`` for every
        other object type (the color order is not consulted in that case).
    """
    is_key = obj_type == KEY
    if not is_key and not obj_type == LOCKED_DOOR:
        # ... (code for other object types) ...
        return None

    color = state.get_color_order()[obj_theme]
    if is_key:
        return f"misc_assets/key{color.capitalize()}.png"
    return f"misc_assets/lock_{color}.png"

def render_game_frame(state):
    """Build the frame array for ``state``.

    Allocates a zeroed ``(WORLD_DIM, WORLD_DIM, 3)`` uint8 array and resolves
    the asset for every object reported by ``state.get_objects()``. Drawing
    the assets is still a placeholder, so the returned frame is all zeros.
    """
    canvas = np.zeros(shape=(WORLD_DIM, WORLD_DIM, 3), dtype=np.uint8)

    for entity in state.get_objects():
        asset = asset_for_type(state, entity["type"], entity["image_theme"])

        # Load the asset image and draw it on the frame
        # ...

    return canvas

def create_venv(num=1, start_level=100, num_levels=200):
    """Create the "procgen:procgen-heist-v0" gym environment.

    Args:
        num: accepted for the caller's convenience but not used here.
        start_level: forwarded to ``gym.make``.
        num_levels: forwarded to ``gym.make``.

    Returns:
        The environment returned by ``gym.make``.
    """
    make_kwargs = {
        "start_level": start_level,
        "num_levels": num_levels,
        # remove render mode argument to go faster but not produce images
        "render_mode": "rgb_array",
        "distribution_mode": "easy",
    }
    return gym.make("procgen:procgen-heist-v0", **make_kwargs)

# Main game loop
def play_game():
    """Run episodes forever on a single-level heist environment.

    Each outer iteration shuffles the colors with ``game_reset`` and then
    steps the environment until it reports ``done``, re-reading the state and
    rendering a frame after every step.

    NOTE(review): ``choose_action`` is not defined in the visible part of this
    file; it must be provided elsewhere before this function is called.
    """
    env = create_venv(num=1, start_level=0, num_levels=1)

    while True:
        game_reset(env)
        state = EnvState(env.env.callmethod("get_state")[0])

        # `done` has to exist before the loop condition is evaluated, and has
        # to be cleared for every new episode; without this the first check
        # raises UnboundLocalError.
        done = False
        while not done:
            action = choose_action(state)
            obs, reward, done, info = env.step(action)
            state = EnvState(env.env.callmethod("get_state")[0])
            frame = render_game_frame(state)
            # Display the frame
            # ...

        # Game over logic
        # ...



In [14]:
# Build one heist environment and fetch its first serialized state buffer.
env = ProcgenGym3Env(num=1, env_name="heist", render_mode="rgb_array")
state = env.callmethod("get_state")[0]

# Dump the raw buffer for inspection.
print("Game state:", state, sep="\n")

# Mutable copy of the buffer, ready for byte-level edits.
modified_state = bytearray(state)
# modified_state[...] = ...  # Modify specific bytes based on your analysis
# env.callmethod("set_state", [bytes(modified_state)])

# # Render the modified game frame
# obs, _, _, _ = env.step(0)

# # Observe the changes in the rendered frame
# import matplotlib.pyplot as plt
# plt.imshow(obs[0])
# plt.show()

Game state:
b'\x00\x00\x00\x00\x05\x00\x00\x00heist\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\xff\x7f\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00%\x1a\x00\x002494687307 92190159 1190298030 618451215 3261628912 2993377253 1969890602 1223798015 4292816447 4134492598 2699247988 2850748826 3161379077 1355167441 825815071 2295354187 2311187678 1799892702 1353241870 1409947135 1337039691 4096974408 1226102446 2934266403 547305502 1152070128 3202067760 2964920022 3590944193 2450409384 4053942321 45880794 1289505059 2381974668 339800617 2857794385 3747812068 528815177 22745588 3006638444 1977202319 3883467824 3076715338 1429439380 2748360438 3411440242 1232117700 3109189097 2618312936 4292345476 1897835126 3885232167 1239220841 1532205886 2563372818 3730180488 2849646352 1803029588 3611908804 2700496831

In [11]:
from procgen import ProcgenGym3Env
import struct
import typing
from typing import Tuple, Dict, Callable, List, Optional
from dataclasses import dataclass

# Default for the `assert_` argument of _parse_maze_state_bytes and
# _serialize_maze_state: when true, each of them runs the other on its own
# output and asserts that the result round-trips.
DEBUG = (
    False  # slows everything down by ensuring parse & serialize are inverses.
)

# Types and things


@dataclass
class StateValue:
    """A decoded state value together with a byte offset.

    _parse_maze_state_bytes stores in ``idx`` the offset returned by the
    reader, i.e. the position in the buffer just past the value's bytes.
    """
    # The decoded value (int, float or str when produced by the parser).
    val: typing.Any
    # Byte offset immediately after the value in the parsed buffer.
    idx: int

# Mapping from field name to a parsed StateValue, or to a list of nested
# mappings for "loop" fields (see the commented Union below).
StateValues = typing.Dict[
    str, typing.Any
]  # Union[StateValue, List[StateValue], 'StateValues']]
# A pair of ints; presumably a grid coordinate -- TODO confirm, it is not used
# in the visible part of this file.
Square = typing.Tuple[int, int]

def _parse_maze_state_bytes(state_bytes: bytes, assert_=DEBUG) -> StateValues:
    """Decode a serialized state buffer according to MAZE_STATE_DICT_TEMPLATE.

    Args:
        state_bytes: the serialized state.
        assert_: when true, re-serialize the result and assert that it equals
            ``state_bytes``.

    Returns:
        A dict mapping each template field name to a StateValue (the value
        plus the byte offset just past it), or, for "loop" fields, to a list
        of such dicts.
    """

    def _unpack_one(buf, pos, fmt):
        # Decode a single fixed-size value; return it with the next offset.
        width = struct.calcsize(fmt)
        (value,) = struct.unpack(fmt, buf[pos : (pos + width)])
        return value, pos + width

    def _take_int(buf, pos):
        return _unpack_one(buf, pos, "@i")

    def _take_float(buf, pos):
        return _unpack_one(buf, pos, "@f")

    def _take_string(buf, pos):
        # Strings are stored as an int length followed by that many bytes.
        length, pos = _take_int(buf, pos)
        end = pos + length
        return buf[pos:end].decode("ascii"), end

    scalar_readers = {
        "int": _take_int,
        "float": _take_float,
        "string": _take_string,
    }

    def _consume(out, spec, pos):
        # Parse one template entry into `out`; recurses for "loop" entries.
        kind = spec[0]
        label = spec[1]
        reader = scalar_readers.get(kind)
        if reader is not None:
            value, pos = reader(state_bytes, pos)
            out[label] = StateValue(value, pos)
        elif kind == "loop":
            length_label = spec[2]
            element_specs = spec[3]
            repeat = out[length_label].val
            out[label] = []
            for _ in range(repeat):
                element = {}
                for element_spec in element_specs:
                    pos = _consume(element, element_spec, pos)
                out[label].append(element)
        return pos

    # Walk the template in order, threading the byte offset through.
    parsed = {}
    cursor = 0
    for spec in MAZE_STATE_DICT_TEMPLATE:
        cursor = _consume(parsed, spec, cursor)

    if assert_:
        assert (
            _serialize_maze_state(parsed, assert_=False) == state_bytes
        ), "serialize(deserialize(state_bytes)) != state_bytes"
    return parsed

def _serialize_maze_state(state_vals: StateValues, assert_=DEBUG) -> bytes:
    """Encode parsed state values back into a byte buffer.

    Values are written in the iteration order of ``state_vals`` (nested dicts
    and lists are walked depth-first). The Python type of each value picks
    the encoding: int -> "@i", float -> "@f", str -> int length + ASCII bytes.

    Args:
        state_vals: the (possibly nested) values to encode.
        assert_: when true, parse the result and assert that it equals
            ``state_vals``.

    Raises:
        ValueError: if a leaf value is not an int, float or str.
    """

    def _leaves(node):
        # Depth-first walk yielding every non-container value in order.
        if isinstance(node, dict):
            for child in node.values():
                yield from _leaves(child)
        elif isinstance(node, list):
            for child in node:
                yield from _leaves(child)
        else:
            yield node

    def _encode(leaf):
        # Unwrap StateValue, then pack according to the runtime type.
        raw = leaf.val if isinstance(leaf, StateValue) else leaf
        if isinstance(raw, int):
            return struct.pack("@i", raw)
        if isinstance(raw, float):
            return struct.pack("@f", raw)
        if isinstance(raw, str):
            return _encode(len(raw)) + raw.encode("ascii")
        raise ValueError(f"type(val)={type(raw)} not handled")

    # Collect all leaves first, then encode and concatenate them.
    leaves = list(_leaves(state_vals))
    state_bytes = b"".join(_encode(leaf) for leaf in leaves)

    if assert_:
        assert (
            _parse_maze_state_bytes(state_bytes, assert_=False) == state_vals
        ), "deserialize(serialize(state_vals)) != state_vals"
    return state_bytes


def test_state_parsing():
    """Parse the state of a fresh maze environment and print what was found.

    Prints every parsed field, followed by the world dimension and the grid
    size.
    """
    env = ProcgenGym3Env(num=1, env_name="maze", num_levels=1, start_level=0)
    raw_state = env.callmethod("get_state")[0]
    state_values = _parse_maze_state_bytes(raw_state)

    # Look these up before printing so a missing field fails immediately.
    world_dim = state_values["world_dim"].val
    grid_size = state_values["grid_size"].val

    for name, entry in state_values.items():
        print(name, entry)

    print("World Dimension:", world_dim)
    print("Grid Size:", grid_size)

# Run the parsing check when this cell executes; its printed output follows.
test_state_parsing()

SERIALIZE_VERSION StateValue(val=0, idx=4)
game_name StateValue(val='maze', idx=12)
options.paint_vel_info StateValue(val=0, idx=16)
options.use_generated_assets StateValue(val=0, idx=20)
options.use_monochrome_assets StateValue(val=0, idx=24)
options.restrict_themes StateValue(val=0, idx=28)
options.use_backgrounds StateValue(val=1, idx=32)
options.center_agent StateValue(val=0, idx=36)
options.debug_mode StateValue(val=0, idx=40)
options.distribution_mode StateValue(val=1, idx=44)
options.use_sequential_levels StateValue(val=0, idx=48)
options.use_easy_jump StateValue(val=0, idx=52)
options.plain_assets StateValue(val=0, idx=56)
options.physics_mode StateValue(val=0, idx=60)
grid_step StateValue(val=1, idx=64)
level_seed_low StateValue(val=0, idx=68)
level_seed_high StateValue(val=1, idx=72)
game_type StateValue(val=0, idx=76)
game_n StateValue(val=0, idx=80)
level_seed_rand_gen.is_seeded StateValue(val=1, idx=84)
level_seed_rand_gen.str StateValue(val='3481251806 1032365875 21271188