In [4]:
# Import libraries
import numpy as np
import gym
from gym import spaces

In [12]:
# Define a simple grid environment where the agent moves towards a goal
# We are setting the goal at position 4,4
class SimpleGoalEnv(gym.Env):
    def __init__(self):
        super(SimpleGoalEnv, self).__init__()
        # 4 possible actions: up, down, left, right
        self.action_space = spaces.Discrete(4) 
        self.observation_space = spaces.Box(low=np.array([0, 0]), high=np.array([4, 4]), dtype=np.float32)
        self.goal = np.array([4, 4])
        self.state = None

    def reset(self):
        self.state = np.array([0, 0])
        return self.state

    def step(self, action):
        if action == 0:
            # move up
            self.state[1] += 1 
        elif action == 1:
            # move down
            self.state[1] -= 1 
        elif action == 2:
            # move left
            self.state[0] -= 1
        elif action == 3:
            # move right
            self.state[0] += 1 

        self.state = np.clip(self.state, 0, 4)
        done = np.array_equal(self.state, self.goal)
        # Reward is 1 if goal is reached, otherwise reward is -0.1
        reward = 1 if done else -0.1 
        return self.state, reward, done, {}

# HER replay buffer - manages the storage and sampling of experiences
class HERReplayBuffer:
    def __init__(self, size, goal_selection_strategy='future'):
        self.size = size
        self.buffer = []
        self.position = 0
        self.goal_selection_strategy = goal_selection_strategy

    # Add method stores complete episodes in the buffer
    def add(self, episode):
        if len(self.buffer) < self.size:
            self.buffer.append(None)
        self.buffer[self.position] = episode
        self.position = (self.position + 1) % self.size

    # Samples episodes from the buffer
    def sample(self, batch_size):
        batch_size = min(batch_size, len(self.buffer))  # Adjust batch size to the number of available episodes
        indices = np.random.choice(len(self.buffer), batch_size, replace=False)
        return [self.buffer[idx] for idx in indices]

    # Create new experiences by replacing goals from stored episodes with states that were actually encountered in the episodes
    def sample_hindsight(self, batch_size):
        episodes = self.sample(batch_size)
        her_episodes = []
        for episode in episodes:
            num_steps = len(episode['states'])
            for t in range(num_steps - 1):  # Ensure there's a next state available
                if num_steps > 1:
                    new_goal = episode['states'][np.random.randint(t + 1, num_steps)]
                    her_episode = {
                        'state': episode['states'][t],
                        'action': episode['actions'][t],
                        'reward': 1 if np.array_equal(episode['states'][t + 1], new_goal) else -0.1,
                        'next_state': episode['states'][t + 1],
                        'goal': new_goal
                    }
                    her_episodes.append(her_episode)
        return her_episodes

# Agent that randomly selects actions
class RandomAgent:
    def __init__(self, action_space):
        self.action_space = action_space

    def act(self, state):
        return self.action_space.sample()

# Training loop with HER
# Run an episode (agent interacts with environment, storing states and actions)
# Add completed episodes to the replay buffer
# Sample and print HER experiences
def train(env, agent, buffer, num_episodes, batch_size):
    for episode in range(num_episodes):
        state = env.reset()
        episode_data = {'states': [], 'actions': []}
        done = False

        while not done:
            action = agent.act(state)
            next_state, reward, done, _ = env.step(action)
            episode_data['states'].append(state)
            episode_data['actions'].append(action)
            state = next_state

        buffer.add(episode_data)

        # HER sampling and learning
        her_samples = buffer.sample_hindsight(batch_size)
        for sample in her_samples:
            # For this demo, we'll just print the HER samples instead of updating the policy
            print(f"State: {sample['state']}, Action: {sample['action']}, Reward: {sample['reward']}, Next State: {sample['next_state']}, Goal: {sample['goal']}")

# Initialize environment, agent, and HER buffer
env = SimpleGoalEnv()
agent = RandomAgent(env.action_space)
her_buffer = HERReplayBuffer(size=1000)

# Train the agent with HER
train(env, agent, her_buffer, num_episodes=10, batch_size=5)


State: [1 0], Action: 3, Reward: -0.1, Next State: [2 0], Goal: [1 1]
State: [2 0], Action: 3, Reward: -0.1, Next State: [1 0], Goal: [2 1]
State: [1 0], Action: 2, Reward: -0.1, Next State: [ 1 -1], Goal: [2 1]
State: [ 1 -1], Action: 1, Reward: -0.1, Next State: [ 1 -1], Goal: [0 2]
State: [ 1 -1], Action: 1, Reward: -0.1, Next State: [0 0], Goal: [2 1]
State: [0 0], Action: 2, Reward: -0.1, Next State: [0 1], Goal: [4 4]
State: [0 1], Action: 0, Reward: -0.1, Next State: [0 2], Goal: [0 1]
State: [0 2], Action: 0, Reward: -0.1, Next State: [0 3], Goal: [0 0]
State: [0 3], Action: 0, Reward: -0.1, Next State: [0 2], Goal: [2 0]
State: [0 2], Action: 1, Reward: -0.1, Next State: [0 1], Goal: [3 1]
State: [0 1], Action: 1, Reward: -0.1, Next State: [1 1], Goal: [1 0]
State: [1 1], Action: 3, Reward: -0.1, Next State: [0 1], Goal: [2 1]
State: [0 1], Action: 2, Reward: -0.1, Next State: [1 1], Goal: [4 3]
State: [1 1], Action: 3, Reward: -0.1, Next State: [1 0], Goal: [ 4 -1]
State: [1 