In [1]:
import gym
from gym import spaces
import numpy as np
import torch
from torch import nn
from win32com.client import Dispatch
import subprocess
import os
import signal

import shutil

In [2]:
import os
import time


def wait_path_exists(path, sleep_time=0.25, peridos_to_wait=40,
                     flag_path='C:/Users/user/Taikyu_project/stuff/Super Fish/Realtime/gamemaker_flg.txt'):
    """Poll until ``path`` exists and the GameMaker flag file is absent, or give up.

    Args:
        path: File or directory whose appearance is awaited.
        sleep_time: Seconds to sleep between two polls.
        peridos_to_wait: Number of polling periods tolerated before giving up
            (the loop stops once the sleep counter exceeds this value).
            The misspelled name is kept so existing keyword callers still work.
        flag_path: Flag file that must NOT exist for the wait to finish.
            Defaults to the previously hard-coded location.

    Returns:
        bool: True if ``path`` exists and ``flag_path`` does not when the
        function returns, False if the wait timed out first.
    """

    def _ready():
        # Both conditions must hold: the awaited file is there and the flag is gone.
        return os.path.exists(path) and not os.path.exists(flag_path)

    time_counter = 0
    while not _ready():
        time.sleep(sleep_time)
        time_counter += 1
        if time_counter > peridos_to_wait:
            break

    # Re-check after the loop so a file that appeared during the last sleep still counts.
    return _ready()


In [3]:
import os
import yaml
from attrdict import AttrDict

def find_file(file_name):
    """Locate ``file_name`` somewhere under the current working directory.

    The directory tree rooted at ``os.getcwd()`` is walked and the first
    directory that contains a file with exactly this name wins.

    Args:
        file_name: Bare file name (not a path) to look for.

    Returns:
        str: Full path of the first match.

    Raises:
        FileNotFoundError: If no directory in the tree contains the file.
    """
    cur_dir = os.getcwd()

    for root, _dirs, files in os.walk(cur_dir):
        if file_name in files:
            return os.path.join(root, file_name)

    # The message must reference the parameter; the previous text used an
    # undefined name and therefore raised NameError instead of this error.
    raise FileNotFoundError(f"File '{file_name}' not found in subdirectories of {cur_dir}")

def load_config(config_path):
    """Load a YAML configuration file and wrap it in an ``AttrDict``.

    ``config_path`` is treated as a file name: ".yml" is appended when it is
    missing, the file is then searched for with ``find_file`` and parsed with
    ``yaml.FullLoader``.
    """
    yml_suffix = ".yml"
    file_name = config_path if config_path.endswith(yml_suffix) else config_path + yml_suffix
    resolved_path = find_file(file_name)
    with open(resolved_path) as stream:
        raw_config = yaml.load(stream, Loader=yaml.FullLoader)
    return AttrDict(raw_config)



# Name of the YAML config; load_config() looks for it under the current working directory.
config_file='config.yml'

# Parsed configuration, accessed with attribute syntax by the cells below.
config = load_config(config_file)

In [4]:
def start_iwanna(folder, exe):
    """Start the game executable and return its ``subprocess.Popen`` handle.

    Note that the working directory of the current Python process is switched
    to ``folder`` first (a process-wide side effect); ``exe`` is then launched
    from there without any extra arguments.
    """
    os.chdir(folder)
    command = [exe]
    game_process = subprocess.Popen(command)
    return game_process

In [5]:
def close_iwanna(process):
    """Ask the given process to terminate by sending ``SIGTERM`` to its pid.

    Any ``OSError`` raised by ``os.kill`` is propagated to the caller.
    """
    target_pid = process.pid
    os.kill(target_pid, signal.SIGTERM)

In [6]:
def actions_to_controls(walk_length, jump_height, n_frames=6):
    """Expand a (walk, jump) action into per-frame button states.

    Args:
        walk_length: Signed number of frames to walk; positive presses
            "right", negative presses "left", zero stands still.
        jump_height: Number of frames the jump is held; values <= 0 mean
            no jump.
        n_frames: Number of frames (rows) to generate. Defaults to 6, the
            previously hard-coded length.

    Returns:
        numpy.ndarray: ``int8`` array of shape ``(n_frames, 4)`` whose columns
        are ``[left, right, jump, jump_release]``. Jump is pressed on the
        first frame only; the release is emitted on the frame where the hold
        counter reaches zero (never, if that is beyond ``n_frames``).
    """
    # Continuous policies produce floats. Without snapping to whole frames the
    # walk counter never hits exactly 0 and the direction flips every frame.
    walk_length = int(round(float(walk_length)))
    jump_height = int(round(float(jump_height)))

    controls_array = np.zeros([n_frames, 4], dtype=np.int8)

    jump_held = False

    for i in range(n_frames):

        if walk_length > 0:
            controls_left, controls_right = 0, 1
            walk_length -= 1
        elif walk_length < 0:
            controls_left, controls_right = 1, 0
            walk_length += 1
        else:
            controls_left, controls_right = 0, 0

        # The jump button is pressed once, on the first frame with a positive height.
        if jump_height > 0 and not jump_held:
            controls_jump = 1
            jump_held = True
        else:
            controls_jump = 0
        # The hold counter ticks down every frame, whether or not a jump is active.
        jump_height -= 1

        if jump_height <= 0 and jump_held:
            controls_jump_release = 1
            jump_held = False
        else:
            controls_jump_release = 0

        controls_array[i] = [controls_left, controls_right, controls_jump, controls_jump_release]

    return controls_array

In [7]:
def write_controls_txt(controls, file_path, rewrite=True):
    """Write per-frame controls to a text file, one frame per line.

    Each row of ``controls`` becomes one line made of its elements'
    ``str()`` values concatenated without separators (e.g. ``[0, 1, 0, 0]``
    -> ``"0100"``).

    Args:
        controls: Iterable of rows (each an iterable of button states).
        file_path: Destination file.
        rewrite: If True the file is truncated first; otherwise the lines are
            appended to the existing content.
    """
    lines = [''.join(str(x) for x in control) + '\n' for control in controls]

    # Appending nothing must not create the file.
    if not rewrite and not lines:
        return

    # Open the file once instead of once per row, so all rows are written
    # through a single handle.
    with open(file_path, 'w' if rewrite else 'a') as f:
        f.writelines(lines)

In [8]:
def read_start_conditions(file_path):
    """Parse the start-conditions file.

    The first four lines are interpreted, in order, as the integer seed, the
    integer timeline start and the float player x / y start coordinates.

    Returns:
        tuple: ``(seed, timeline_start, player_x_start, player_y_start)``.
    """
    with open(file_path) as source:
        rows = list(map(str.rstrip, source))
    return int(rows[0]), int(rows[1]), float(rows[2]), float(rows[3])

In [9]:
def write_start_conditions(file_path, seed, timeline_start, player_x_start, player_y_start, rewrite=True):
    """Store the start conditions, one value per line.

    The values are written in the order seed, timeline start, player x,
    player y. The file is always opened in write mode, so its previous
    content is replaced regardless of ``rewrite``.
    """
    values = (seed, timeline_start, player_x_start, player_y_start)
    payload = ''.join(f'{value}\n' for value in values)

    with open(file_path, 'w') as out:
        out.write(payload)

In [10]:
# Sanity check: show the stored (seed, timeline_start, player_x_start, player_y_start).
read_start_conditions(r'D:\fangames\I wanna be the Palladium ver1.0\Sadistic_Music_Factory\Realtime\start_conditions.txt')

(3186, 0, 403.0, 567.4)

In [11]:
#write_controls_txt(controls, r'D:\fangames\I wanna be the Palladium ver1.0\Sadistic_Music_Factory\Realtime\controls\controls_test_test.txt')

In [12]:
from taikyu_screenshots.dataset import load_image_obs, zero_screenshot_load, zero_screenshot_part_load

In [13]:
class PythonFlg():
    """File-based flag: the flag is "raised" while the flag file exists.

    Attributes are ``raised`` (0 or 1), ``flg_path`` (location of the flag
    file) and ``flg_file`` (the open handle while raised, otherwise None).
    """

    def __init__(self, flg_path):
        """Start lowered and delete any flag file left at ``flg_path``."""
        self.raised = 0
        self.flg_path = flg_path
        # Defined up front so lower_flg() never hits a missing attribute.
        self.flg_file = None
        if os.path.exists(self.flg_path):
            os.remove(self.flg_path)

    def raise_flg(self):
        """Create the flag file and keep it open; no-op if already raised.

        Raises:
            OSError: If the flag file cannot be created. The flag then stays
                lowered.
        """
        if self.raised == 0:
            # Open first and flip the state afterwards, so a failed open()
            # does not leave the object claiming the flag is raised.
            self.flg_file = open(self.flg_path, "w")
            self.raised = 1

    def lower_flg(self):
        """Close and delete the flag file; no-op if the flag is not raised."""
        if self.raised == 1:
            if self.flg_file is not None:
                self.flg_file.close()
                self.flg_file = None
            try:
                os.remove(self.flg_path)
            except FileNotFoundError:
                # The file is already gone, which is the state we want.
                pass
            self.raised = 0

In [14]:
# Manual check of PythonFlg; constructing it removes a flag file left at this path, if any.
flg = PythonFlg(r'D:\fangames\I wanna be the Palladium ver1.0\Sadistic_Music_Factory\Realtime\controls\python_flg.txt')

In [15]:
# Raise the flag: creates the flag file and keeps it open.
flg.raise_flg()

In [16]:
# Lower the flag: closes and deletes the flag file.
flg.lower_flg()

In [17]:
# Write a test control sequence (walk_length=0, jump_height=5) to a scratch controls file.
write_controls_txt(actions_to_controls(0,5), r'D:\fangames\I wanna be the Palladium ver1.0\Sadistic_Music_Factory\Realtime\controls\controls_test.txt')

In [88]:
class IwannaEnv(gym.Env):
    """Gym environment that drives the game by exchanging files with it.

    Python writes per-step control files into ``Realtime/controls/`` and
    reads screenshots from ``Realtime/screenshots/`` below ``root_folder``.
    A ``PythonFlg`` flag file is raised while Python is preparing files and
    lowered once they are ready.

    Actions are ``(walk_length, jump_height)`` pairs that are expanded with
    ``actions_to_controls``; observations come from ``load_image_obs``.
    """

    def __init__(self, root_folder, exe_folder, exe, config, seed=None, timeline_start=None, player_x_start=None, player_y_start=None):
        """Set up spaces, folders, reference screenshots and start conditions.

        Args:
            root_folder: Folder containing ``Realtime/``; paths are built by
                string concatenation, so it must end with a path separator.
            exe_folder: Folder the game executable is started from.
            exe: Name of the game executable.
            config: Configuration object providing ``environment.image_size``
                and ``parameters.edges_dataset.*``.
            seed, timeline_start, player_x_start, player_y_start: Start
                conditions. If any of them is None, all four are read from
                ``Realtime/start_conditions.txt``; otherwise they are written
                to that file.
        """
        self.config = config
        self.flg = PythonFlg(root_folder + 'Realtime/python_flg.txt')

        super(IwannaEnv, self).__init__()

        image_size = self.config.environment.image_size
        self.observation_space = spaces.Box(low=0, high=1,
                                            shape=(4, image_size, image_size),
                                            dtype=np.float32)

        # Action = (walk_length, jump_height). walk_length is signed
        # (negative = left, positive = right), jump_height is non-negative.
        # The previous bounds (low=[-6, 6], high=[0, 6]) only allowed walking
        # left and pinned the jump to 6.
        self.action_space = spaces.Box(low=np.array([-6, 0], dtype=np.float32),
                                       high=np.array([6, 6], dtype=np.float32),
                                       dtype=np.float32)

        self.exe_folder = exe_folder
        self.exe = exe
        self.root_folder = root_folder
        self.screenshot_folder = root_folder + 'Realtime/screenshots/'
        self.screenshot_add_folder = root_folder + 'Realtime/screenshots_add/'
        self.controls_folder = root_folder + 'Realtime/controls/'

        edges_cfg = self.config.parameters.edges_dataset
        self.zero_screenshot = zero_screenshot_load(root_folder + 'Realtime/Zero_screenshot.png',
                                                    image_size=image_size,
                                                    pad=edges_cfg.pad,
                                                    aperture_size=edges_cfg.aperture_size)
        self.zero_screenshot_part = zero_screenshot_part_load(root_folder + 'Realtime/Zero_screenshot_part.png',
                                                    image_size=image_size,
                                                    part_size=edges_cfg.part_size,
                                                    player_x=edges_cfg.zero_screenshot_player_x,
                                                    player_y=edges_cfg.zero_screenshot_player_y,
                                                    pad=edges_cfg.pad,
                                                    aperture_size=edges_cfg.aperture_size)

        # NOTE(review): ``self.seed`` is a plain attribute here; in gym
        # versions where ``Env.seed`` is a method this shadows it — confirm
        # nothing calls ``env.seed(...)``.
        if seed is None or timeline_start is None or player_x_start is None or player_y_start is None:
            # Same file that set_start_conditions() writes.
            self.seed, self.timeline_start, self.player_x_start, self.player_y_start = read_start_conditions(root_folder + 'Realtime/start_conditions.txt')
        else:
            self.set_start_conditions(seed, timeline_start, player_x_start, player_y_start)

        # Timeline start left-padded with zeros to 4 characters; used in folder names.
        self.timeline_start_str = str(self.timeline_start).rjust(4, '0')

        self.timestep = 6

        self.process = None

    @staticmethod
    def _recreate_dir(path):
        """Delete ``path`` if it exists and create it again, empty."""
        if os.path.exists(path):
            shutil.rmtree(path)
        os.makedirs(path)

    def _stop_process(self):
        """Terminate the game process if one was started; report failures instead of raising."""
        if self.process is None:
            return
        try:
            close_iwanna(self.process)
        except PermissionError:
            print('Отказано в доступе при попытке остановки процесса')
        except OSError:
            print('Процесс не существует')
        self.process = None

    def reset(self):
        """Restart the game and return the first observation.

        Recreates the per-episode screenshot and control folders, writes an
        all-zero control file, launches the executable and waits for the
        first screenshot files to appear.
        """
        self.flg.raise_flg()
        try:
            # A previous game instance may already have exited on its own.
            self._stop_process()

            self.timestep = 6

            self.episode_screenshots_folder = self.screenshot_folder + f'screenshots{self.timeline_start_str}_{self.seed}/'

            self.episode_screenshots_add_folder = self.screenshot_add_folder + f'screenshots{self.timeline_start_str}_{self.seed}/'

            print(self.episode_screenshots_folder)

            self.episode_controls_folder = self.controls_folder + f'controls{self.timeline_start_str}_{self.seed}/'

            # Fresh folder for screenshots
            self._recreate_dir(self.episode_screenshots_folder)

            # Fresh folder for additional screenshots
            self._recreate_dir(self.episode_screenshots_add_folder)

            # Fresh folder for recorded inputs
            self._recreate_dir(self.episode_controls_folder)

            write_controls_txt(actions_to_controls(0,0), self.episode_controls_folder + f'controls{0}.txt')

            img_path = self.episode_screenshots_folder + f'{self.timestep}.png'
            add_img_path = self.episode_screenshots_add_folder + f'{self.timestep}.png'
        finally:
            # Never leave the flag raised if the preparation above failed.
            self.flg.lower_flg()

        self.process = start_iwanna(self.exe_folder, self.exe)

        wait_path_exists(img_path)
        wait_path_exists(add_img_path)

        observation_img, _, _, _ = load_image_obs(img_path, self.zero_screenshot, self.zero_screenshot_part, self.config)

        return observation_img

    def step(self, action):
        """Send one action to the game and return ``(observation, reward, terminal, info)``.

        ``action`` is unpacked as ``(walk_length, jump_height)``. The control
        file for the current timestep is written while the flag is raised;
        afterwards the method waits for the matching screenshot and info file.
        """
        print(action)

        self.flg.raise_flg()
        try:
            write_controls_txt(actions_to_controls(*action), self.episode_controls_folder + f'controls{self.timestep}.txt')
        finally:
            self.flg.lower_flg()

        img_path = self.episode_screenshots_folder + f'{self.timestep}.png'
        add_info_path = self.episode_screenshots_add_folder + f'{self.timestep}.txt'

        wait_path_exists(img_path)
        wait_path_exists(add_info_path)

        observation_img, _, reward, terminal = load_image_obs(img_path, self.zero_screenshot, self.zero_screenshot_part, self.config)

        info = {}

        # Screenshots and control files are numbered in increments of 6.
        self.timestep += 6

        return observation_img, reward, terminal, info

    def render(self, mode='human'):
        """Rendering is not implemented."""
        pass

    def close(self):
        """Stop the game process (if any) and lower the flag."""
        self._stop_process()

        self.flg.lower_flg()

    def __del__(self):
        """Lower the flag when the environment object is destroyed."""
        # __init__ may have failed before ``flg`` was assigned.
        flg = getattr(self, 'flg', None)
        if flg is not None:
            flg.lower_flg()

    def set_start_conditions(self, seed, timeline_start, player_x_start, player_y_start):
        """Write the start conditions to ``Realtime/start_conditions.txt`` and store them on the instance."""
        # Use the instance's root folder; a bare ``root_folder`` is not
        # defined inside this method.
        write_start_conditions(self.root_folder + 'Realtime/start_conditions.txt', seed, timeline_start, player_x_start, player_y_start)
        self.seed = seed
        self.timeline_start = timeline_start
        self.player_x_start = player_x_start
        self.player_y_start = player_y_start
        # Keep the folder-name component in sync with the new timeline start.
        self.timeline_start_str = str(self.timeline_start).rjust(4, '0')
 

In [89]:
from stable_baselines3 import TD3
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.noise import NormalActionNoise, OrnsteinUhlenbeckActionNoise

# Create environment
iwanna_env = IwannaEnv(root_folder='D:/fangames/I wanna be the Palladium ver1.0/Sadistic_Music_Factory/',
               exe_folder='D:/fangames/I wanna be the Palladium ver1.0/',
               exe='test.exe',
               config=config)

# The noise objects for TD3: zero-mean Gaussian noise with sigma 0.1 per action dimension
n_actions = iwanna_env.action_space.shape[-1]
action_noise = NormalActionNoise(mean=np.zeros(n_actions), sigma=0.1 * np.ones(n_actions))

# Instantiate the agent
# NOTE(review): the output recorded for this cell is an AssertionError from
# NatureCNN rejecting the float32 Box(0, 1) observation space. Either expose
# uint8 images in [0, 255], or (if the installed stable-baselines3 supports it)
# pass normalize_images=False via policy_kwargs — TODO confirm against the
# installed version.
model = TD3("CnnPolicy", iwanna_env, action_noise=action_noise, buffer_size=10000, verbose=1)
# Train the agent (log_interval=10) and save the resulting model
model.learn(total_timesteps=10000, log_interval=10)
model.save("td3_iwanna_test")

Using cuda device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.


AssertionError: You should use NatureCNN only with images not with Box([[[0. 0. 0. ... 0. 0. 0.]
  [0. 0. 0. ... 0. 0. 0.]
  [0. 0. 0. ... 0. 0. 0.]
  ...
  [0. 0. 0. ... 0. 0. 0.]
  [0. 0. 0. ... 0. 0. 0.]
  [0. 0. 0. ... 0. 0. 0.]]

 [[0. 0. 0. ... 0. 0. 0.]
  [0. 0. 0. ... 0. 0. 0.]
  [0. 0. 0. ... 0. 0. 0.]
  ...
  [0. 0. 0. ... 0. 0. 0.]
  [0. 0. 0. ... 0. 0. 0.]
  [0. 0. 0. ... 0. 0. 0.]]

 [[0. 0. 0. ... 0. 0. 0.]
  [0. 0. 0. ... 0. 0. 0.]
  [0. 0. 0. ... 0. 0. 0.]
  ...
  [0. 0. 0. ... 0. 0. 0.]
  [0. 0. 0. ... 0. 0. 0.]
  [0. 0. 0. ... 0. 0. 0.]]], [[[1. 1. 1. ... 1. 1. 1.]
  [1. 1. 1. ... 1. 1. 1.]
  [1. 1. 1. ... 1. 1. 1.]
  ...
  [1. 1. 1. ... 1. 1. 1.]
  [1. 1. 1. ... 1. 1. 1.]
  [1. 1. 1. ... 1. 1. 1.]]

 [[1. 1. 1. ... 1. 1. 1.]
  [1. 1. 1. ... 1. 1. 1.]
  [1. 1. 1. ... 1. 1. 1.]
  ...
  [1. 1. 1. ... 1. 1. 1.]
  [1. 1. 1. ... 1. 1. 1.]
  [1. 1. 1. ... 1. 1. 1.]]

 [[1. 1. 1. ... 1. 1. 1.]
  [1. 1. 1. ... 1. 1. 1.]
  [1. 1. 1. ... 1. 1. 1.]
  ...
  [1. 1. 1. ... 1. 1. 1.]
  [1. 1. 1. ... 1. 1. 1.]
  [1. 1. 1. ... 1. 1. 1.]]], (3, 64, 64), float32)
(you are probably using `CnnPolicy` instead of `MlpPolicy` or `MultiInputPolicy`)
If you are using a custom environment,
please check it using our env checker:
https://stable-baselines3.readthedocs.io/en/master/common/env_checker.html

In [84]:
# Stop the game process and lower the Python flag.
iwanna_env.close()

In [39]:
# Make sure the Python flag is lowered (does nothing if it is not raised).
iwanna_env.flg.lower_flg()

In [20]:
# Results of the manual environment calls below, keyed by call index.
obs = {}

In [40]:
# Re-create the environment and start an episode; reset() launches the game
# and returns the first observation.
iwanna_env = IwannaEnv(root_folder='D:/fangames/I wanna be the Palladium ver1.0/Sadistic_Music_Factory/',
               exe_folder='D:/fangames/I wanna be the Palladium ver1.0/',
               exe='test.exe',
               config=config)
obs[0] = iwanna_env.reset()

  logger.warn(


D:/fangames/I wanna be the Palladium ver1.0/Sadistic_Music_Factory/Realtime/screenshots/screenshots0000_3186/


In [22]:
# Two manual steps with (walk_length, jump_height) actions; each entry stores the tuple returned by step().
obs[1] = iwanna_env.step((1,3))
obs[2] = iwanna_env.step((-4,0))

In [26]:
# NOTE(review): step() in this notebook returns (observation, reward, terminal, info);
# the names used here suggest a different (possibly older) return layout — confirm.
observation_img, action, reward, terminal = obs[2]

In [28]:
# Inspect the second element of the unpacked step result.
action

array([1, 3])

In [65]:
# Stop the game process and lower the Python flag after the manual session.
iwanna_env.close()

In [None]:
   
# NOTE(review): this cell declares a second class named ``IwannaEnv``; running it
# rebinds the name and replaces the environment defined earlier in the notebook.
# The body looks like a pasted copy of the ``gym.Env`` base class kept for
# reference — confirm and consider removing it or renaming the class.
# Names used below (Dict, Any, Optional, Tuple, Union, List, seeding, ActType,
# ObsType, RenderFrame) are not imported in the cells visible here, so the cell
# would presumably fail with NameError if executed as is.
class IwannaEnv(gym.Env):
    r"""The main OpenAI Gym class.
    It encapsulates an environment with arbitrary behind-the-scenes dynamics.
    An environment can be partially or fully observed.
    The main API methods that users of this class need to know are:
    - :meth:`step` - Takes a step in the environment using an action returning the next observation, reward,
      if the environment terminated and observation information.
    - :meth:`reset` - Resets the environment to an initial state, returning the initial observation and observation information.
    - :meth:`render` - Renders the environment observation with modes depending on the output
    - :meth:`close` - Closes the environment, important for rendering where pygame is imported
    And set the following attributes:
    - :attr:`action_space` - The Space object corresponding to valid actions
    - :attr:`observation_space` - The Space object corresponding to valid observations
    - :attr:`reward_range` - A tuple corresponding to the minimum and maximum possible rewards
    - :attr:`spec` - An environment spec that contains the information used to initialise the environment from `gym.make`
    - :attr:`metadata` - The metadata of the environment, i.e. render modes
    - :attr:`np_random` - The random number generator for the environment
    Note: a default reward range set to :math:`(-\infty,+\infty)` already exists. Set it if you want a narrower range.
    """

    # Set this in SOME subclasses
    metadata: Dict[str, Any] = {"render_modes": []}
    # define render_mode if your environment supports rendering
    render_mode: Optional[str] = None
    reward_range = (-float("inf"), float("inf"))
    spec: "EnvSpec" = None

    # Set these in ALL subclasses
    action_space: spaces.Space[ActType]
    observation_space: spaces.Space[ObsType]

    # Created
    _np_random: Optional[np.random.Generator] = None

    @property
    def np_random(self) -> np.random.Generator:
        """Returns the environment's internal :attr:`_np_random` that if not set will initialise with a random seed."""
        if self._np_random is None:
            self._np_random, seed = seeding.np_random()
        return self._np_random

    @np_random.setter
    def np_random(self, value: np.random.Generator):
        self._np_random = value

    def step(self, action: ActType) -> Tuple[ObsType, float, bool, bool, dict]:
        """Run one timestep of the environment's dynamics.
        When end of episode is reached, you are responsible for calling :meth:`reset` to reset this environment's state.
        Accepts an action and returns either a tuple `(observation, reward, terminated, truncated, info)`.
        Args:
            action (ActType): an action provided by the agent
        Returns:
            observation (object): this will be an element of the environment's :attr:`observation_space`.
                This may, for instance, be a numpy array containing the positions and velocities of certain objects.
            reward (float): The amount of reward returned as a result of taking the action.
            terminated (bool): whether a `terminal state` (as defined under the MDP of the task) is reached.
                In this case further step() calls could return undefined results.
            truncated (bool): whether a truncation condition outside the scope of the MDP is satisfied.
                Typically a timelimit, but could also be used to indicate agent physically going out of bounds.
                Can be used to end the episode prematurely before a `terminal state` is reached.
            info (dictionary): `info` contains auxiliary diagnostic information (helpful for debugging, learning, and logging).
                This might, for instance, contain: metrics that describe the agent's performance state, variables that are
                hidden from observations, or individual reward terms that are combined to produce the total reward.
                It also can contain information that distinguishes truncation and termination, however this is deprecated in favour
                of returning two booleans, and will be removed in a future version.
            (deprecated)
            done (bool): A boolean value for if the episode has ended, in which case further :meth:`step` calls will return undefined results.
                A done signal may be emitted for different reasons: Maybe the task underlying the environment was solved successfully,
                a certain timelimit was exceeded, or the physics simulation has entered an invalid state.
        """
        raise NotImplementedError

    def reset(
        self,
        *,
        seed: Optional[int] = None,
        options: Optional[dict] = None,
    ) -> Tuple[ObsType, dict]:
        """Resets the environment to an initial state and returns the initial observation.
        This method can reset the environment's random number generator(s) if ``seed`` is an integer or
        if the environment has not yet initialized a random number generator.
        If the environment already has a random number generator and :meth:`reset` is called with ``seed=None``,
        the RNG should not be reset. Moreover, :meth:`reset` should (in the typical use case) be called with an
        integer seed right after initialization and then never again.
        Args:
            seed (optional int): The seed that is used to initialize the environment's PRNG.
                If the environment does not already have a PRNG and ``seed=None`` (the default option) is passed,
                a seed will be chosen from some source of entropy (e.g. timestamp or /dev/urandom).
                However, if the environment already has a PRNG and ``seed=None`` is passed, the PRNG will *not* be reset.
                If you pass an integer, the PRNG will be reset even if it already exists.
                Usually, you want to pass an integer *right after the environment has been initialized and then never again*.
                Please refer to the minimal example above to see this paradigm in action.
            options (optional dict): Additional information to specify how the environment is reset (optional,
                depending on the specific environment)
        Returns:
            observation (object): Observation of the initial state. This will be an element of :attr:`observation_space`
                (typically a numpy array) and is analogous to the observation returned by :meth:`step`.
            info (dictionary):  This dictionary contains auxiliary information complementing ``observation``. It should be analogous to
                the ``info`` returned by :meth:`step`.
        """
        # Initialize the RNG if the seed is manually passed
        if seed is not None:
            self._np_random, seed = seeding.np_random(seed)

    def render(self) -> Optional[Union[RenderFrame, List[RenderFrame]]]:
        """Compute the render frames as specified by render_mode attribute during initialization of the environment.
        The set of supported modes varies per environment. (And some
        third-party environments may not support rendering at all.)
        By convention, if render_mode is:
        - None (default): no render is computed.
        - human: render return None.
          The environment is continuously rendered in the current display or terminal. Usually for human consumption.
        - rgb_array: return a single frame representing the current state of the environment.
          A frame is a numpy.ndarray with shape (x, y, 3) representing RGB values for an x-by-y pixel image.
        - rgb_array_list: return a list of frames representing the states of the environment since the last reset.
          Each frame is a numpy.ndarray with shape (x, y, 3), as with `rgb_array`.
        - ansi: Return a strings (str) or StringIO.StringIO containing a
          terminal-style text representation for each time step.
          The text can include newlines and ANSI escape sequences (e.g. for colors).
        Note:
            Make sure that your class's metadata 'render_modes' key includes
            the list of supported modes. It's recommended to call super()
            in implementations to use the functionality of this method.
        """
        raise NotImplementedError

    def close(self):
        """Override close in your subclass to perform any necessary cleanup.
        Environments will automatically :meth:`close()` themselves when
        garbage collected or when the program exits.
        """
        pass