In [1]:
!pip install gym



In [13]:
import gym
from gym import spaces
import numpy as np


class HomeostaticEnv(gym.Env):
    """
    Custom Environment that follows gym interface.
    This is a stationary gridworld where resources are randomly distributed and actions
    are chosen randomly.
    """

    def __init__(self, grid_size=8,
                 field_of_view=1,
                 n_stats=3,
                 set_point=1,
                 initial_stats=[0.5, 0.5],
                 loc=[3, 3],
                 n_actions=4,
                 stat_decrease_per_step=0.005,
                 ep_length=50,
                 resource_percentile=70):
        super(HomeostaticEnv, self).__init__()

        # Agent parameters
        # The initial location of the agent
        self.loc = loc  # 2D location [x, y] where x = rows and y = columns
        self.n_actions = n_actions # The number of actions
        self.action_space = spaces.Discrete(n_actions)
        self.ep_length = ep_length # The number of timesteps in an episode

        # Stats parameters
        self.n_stats = n_stats  # Number of features (one for each stat)
        self.set_point = set_point  # Goal of stats
        self.initial_stats = initial_stats  # The range of initial stats: [x, y] x = lowest initial stat, y = highest
        self.stat_decrease_per_step = stat_decrease_per_step  # Stat decreases over time

        # Grid world parameters
        self.grid_size = grid_size  # Size of the gridworld (grid_size x grid_size)
        #self.field_of_view = field_of_view  # How many cells the agent can see in each direction
        #self.view_size = 2 * self.field_of_view + 1  # The height and width of the view square

        # Resource parameters
        self.resource_percentile = resource_percentile  # Resources affect stat above this

        # Initialize the environment --Reset
        #self.grid = np.zeros((self.n_stats, self.grid_size, self.grid_size))  # empty world grid
        self.reset_stats()
        self.reset_grid()
        self.reset()


    ######## Reset
    def reset(self):
        self.time_step = 0  # resets step timer
        self.dead = False
        self.done = False

    def reset_stats(self):
        """Initialize stats of the agent."""
        self.stats = []
        for i in range(self.n_stats):
            self.stats.append(np.random.uniform(self.initial_stats[0], self.initial_stats[1]))

    def reset_grid(self):
        """Reset both thresholds and distribution of resources."""
        self.thresholds = []
        # Randomly generate resources
        self.grid = np.random.rand(self.n_stats, self.grid_size, self.grid_size)

        for stat in range(self.n_stats):
            # Normalize to ensure the same total amount of each resource (total amount = 1)
            total_resource = np.sum(self.grid[stat, :, :])
            self.grid[stat, :, :] /= total_resource
            # Returns a single stat value: the 95th percentile (resource_percentile=95)(high) in the whole grid world
            self.thresholds.append(np.percentile(self.grid[stat, :, :], self.resource_percentile))


    ####### Step
    def step(self, action):
        """Update locations, stats, and rewards."""
        self.time_step += 1

        # Check if this episode terminates
        if self.time_step == self.ep_length:
            self.done = True

        # Move
        if action == 0:  # move down
            self.loc[0] = min(self.grid_size - 1, self.loc[0] + 1)
        elif action == 1:  # move up
            self.loc[0] = max(0, self.loc[0] - 1)
        elif action == 2:  # move right
            self.loc[1] = min(self.grid_size - 1, self.loc[1] + 1)
        elif action == 3:  # move left
            self.loc[1] = max(0, self.loc[1] - 1)

        reward = self.step_stats()

        # Return step information
        return reward, self.done, self.dead

    def step_stats(self):
        """Update stats and dead."""
        #self.old_stats = copy.deepcopy(self.stats)

        for i in range(self.n_stats):
            # All stats decrease over time
            self.stats[i] -= self.stat_decrease_per_step
            # Stats only increase when resources are above thresholds
            if self.grid[i, self.loc[0], self.loc[1]] > self.thresholds[i]:
                self.stats[i] += self.grid[i, self.loc[0], self.loc[1]]

            if self.stats[i] < 0:
                self.dead = True

        return self.sq_dev_reward()

    def sq_dev_reward(self):
        return 0.2 - sum([(self.set_point - stat) ** 2 for stat in self.stats])

    def render(self, mode='human'):
        # Simple text-based rendering
        print(f"Position: {self.loc}, Stats: {np.round(self.stats, 2)}")

  and should_run_async(code)


In [15]:
# Example usage
if __name__ == "__main__":
    env = HomeostaticEnv()
    total_reward = 0

    while not done and not dead:
        action = env.action_space.sample()  # Random action
        reward, done, dead = env.step(action)
        total_reward += reward
        env.render()

    print(f"Total reward: {total_reward}")

Position: [4, 4], Stats: [0.5 0.5 0.5]
Position: [4, 3], Stats: [0.52 0.52 0.49]
Position: [4, 4], Stats: [0.51 0.52 0.48]
Position: [5, 4], Stats: [0.51 0.54 0.48]
Position: [5, 5], Stats: [0.53 0.53 0.48]
Position: [5, 4], Stats: [0.53 0.55 0.47]
Position: [5, 3], Stats: [0.52 0.58 0.49]
Position: [4, 3], Stats: [0.54 0.6  0.49]
Position: [3, 3], Stats: [0.54 0.6  0.48]
Position: [3, 4], Stats: [0.56 0.59 0.48]
Position: [3, 5], Stats: [0.55 0.59 0.5 ]
Position: [3, 4], Stats: [0.57 0.58 0.5 ]
Position: [2, 4], Stats: [0.57 0.58 0.49]
Position: [2, 3], Stats: [0.56 0.59 0.49]
Position: [2, 2], Stats: [0.56 0.59 0.48]
Position: [3, 2], Stats: [0.55 0.58 0.51]
Position: [3, 3], Stats: [0.55 0.58 0.5 ]
Position: [3, 4], Stats: [0.57 0.57 0.5 ]
Position: [3, 3], Stats: [0.56 0.57 0.49]
Position: [3, 2], Stats: [0.56 0.56 0.51]
Position: [3, 1], Stats: [0.58 0.56 0.53]
Position: [2, 1], Stats: [0.57 0.55 0.53]
Position: [1, 1], Stats: [0.57 0.57 0.55]
Position: [0, 1], Stats: [0.56 0.57 0

  and should_run_async(code)
