In [15]:
from IPython import display
%matplotlib inline
import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
import torch
import omegaconf
from gymnasium import spaces
import gymnasium as gym


import mbrl.models as models
from mbrl.models import Model
import mbrl.planning as planning
import mbrl.util.common as common_util
import mbrl.util as util
from pyoperon.sklearn import SymbolicRegressor


%load_ext autoreload
%autoreload 2

mpl.rcParams.update({"font.size": 16})

device = 'cuda:0' if torch.cuda.is_available() else 'cpu'

In [16]:
class Simple1DMDP(gym.Env):
    def __init__(self):
        super(Simple1DMDP, self).__init__()
        
        # define the action space
        self.action_space = spaces.Box(low=-1, high=1, shape=(1,), dtype=np.float32)
        
        # define the observation space: continuous single dimension for position
        self.observation_space = spaces.Box(low=-10, high=10, shape=(1,), dtype=np.float32)
        # Initialize state and episode length
        self.state = 0.0
        self.episode_length = 10
        self.current_step = 0

    def reset(self, seed=None):
        # reset the state to 0 and the step counter
        self.state = 0.0
        self.current_step = 0
        return np.array([self.state], dtype=np.float32), {}

    def step(self, action):  
        # update state based on action
        self.state += action
        # clip in the case we go outside of [-10,10]
        self.state = np.clip(self.state, -10.,10.)
        
        # calculate reward
        reward = np.cos(2 * np.pi * self.state) * np.exp(np.abs(self.state) / 3)
        # increment step counter
        self.current_step += 1
        # check if episode is terminated
        terminated = self.current_step >= self.episode_length
        # set placeholder for truncated
        truncated = False
        # set placeholder for info
        info = {}
        return np.array([self.state], dtype=np.float32), reward, terminated, truncated, info

    def render(self, mode='human'):
        # simple print rendering
        print(f"Step: {self.current_step}, State: {self.state}")

In [42]:
def term_fn(a, next_obs):
    pass

def reward_fn(a, next_obs):
    return np.cos(2 * np.pi * next_obs) * np.exp(np.abs(next_obs) / 3)

In [17]:
class SymbolicModel(Model):
    def __init__(self, symbols, population_size, generations, max_length):
        super().__init__("cpu")
        self.reg = SymbolicRegressor(population_size = population_size,
                                     allowed_symbols=symbols,
                                     optimizer_iterations=10,
                                     generations = generations,
                                     n_threads=32,
                                     max_evaluations=int(1e6),
                                     max_length = max_length,
                                     tournament_size = 3)
    def forward(self,x):
        return self.reg.predict(x), None
    
    def loss(self, model_in, target):
        return self.reg.score(model_in, target)
    
    def eval_score(self, model_in, target):
        return self.reg.score(model_in, target)

In [32]:
# Register the custom environment
gym.envs.registration.register(
    id='Simple1DMDP-v0',
    entry_point=Simple1DMDP
)


env = gym.make('Simple1DMDP-v0')

# Reset the environment
state = env.reset()
print(f"Initial State: {state}")

for _ in range(10):
    action = env.action_space.sample()  # Random action
    state, reward, terminated, truncated, info = env.step(action)

    print(f"Action: {action}, State: {state}, Reward: {reward}")
    env.render()

Initial State: (array([0.], dtype=float32), {})
Action: [-0.81615496], State: [[-0.81615496]], Reward: [0.53004694]
Step: 1, State: [-0.81615496]
Action: [-0.12825458], State: [[-0.94440955]], Reward: [1.2872664]
Step: 2, State: [-0.94440955]
Action: [0.96853346], State: [[0.02412391]], Reward: [0.99651563]
Step: 3, State: [0.02412391]
Action: [0.70285285], State: [[0.72697675]], Reward: [-0.18368363]
Step: 4, State: [0.72697675]
Action: [-0.29498503], State: [[0.43199173]], Reward: [-1.0510392]
Step: 5, State: [0.43199173]
Action: [-0.78522635], State: [[-0.35323462]], Reward: [-0.67959213]
Step: 6, State: [-0.35323462]
Action: [0.2519107], State: [[-0.10132393]], Reward: [0.83172154]
Step: 7, State: [-0.10132393]
Action: [0.4385841], State: [[0.33726016]], Reward: [-0.5832286]
Step: 8, State: [0.33726016]
Action: [0.48986024], State: [[0.8271204]], Reward: [0.6137009]
Step: 9, State: [0.8271204]
Action: [-0.7058208], State: [[0.12129962]], Reward: [0.7532014]
Step: 10, State: [0.1212

In [40]:
seed = 0
env.reset()
rng = np.random.default_rng(seed=seed)
generator = torch.Generator(device=device)
generator.manual_seed(seed)
obs_shape = env.observation_space.shape
act_shape = env.action_space.shape

In [35]:
trial_length = 500
num_trials = 10
ensemble_size = 7

# Everything with "???" indicates an option with a missing value.
# Our utility functions will fill in these details using the 
# environment information
cfg_dict = {
    # dynamics model configuration
    "dynamics_model": {
        "_target_": "mbrl.models.GaussianMLP",
        "device": device,
        "num_layers": 4,
        "ensemble_size": ensemble_size,
        "hid_size": 200,
        "in_size": "???",
        "out_size": "???",
        "deterministic": True,
        "propagation_method": "fixed_model",
        # can also configure activation function for GaussianMLP
        "activation_fn_cfg": {
            "_target_": "torch.nn.SiLU"}
    },
    # options for training the dynamics model
    "algorithm": {
        "learned_rewards": False,
        "target_is_delta": True,
        "normalize": True,
    },
    # these are experiment specific options
    "overrides": {
        "trial_length": trial_length,
        "num_steps": num_trials * trial_length,
        "model_batch_size": 256,
        "validation_ratio": 0.05
    }
}
cfg = omegaconf.OmegaConf.create(cfg_dict)

In [43]:
# Create a 1-D dynamics model for this environment
dynamics_model = common_util.create_one_dim_tr_model(cfg, obs_shape, act_shape)

# Create a gym-like environment to encapsulate the model
model_env = models.ModelEnv(env, dynamics_model, term_fn, reward_fn, generator=generator)