In [None]:
#!/home/wp/Studia/soft_robotics/gym/bin/python
# Enable Interactive Plots
%matplotlib widget
import gymnasium as gym
import numpy as np
from tqdm import tqdm
from collections import defaultdict
from matplotlib import pyplot as plt
import os
import shutil
from manipulator.trunk_environment import TrunkEnv  # Import TrunkEnv

class TrunkAgent:
    """Tabular Q-learning agent with an epsilon-greedy policy.

    Continuous observations are mapped onto a grid (see
    ``discretize_observation``) and the resulting tuple is used as the key of
    a lazily populated Q-table.
    """

    def __init__(self, env: gym.Env, learning_rate: float, epsilon: float, epsilon_decay: float, final_epsilon: float, discount_factor: float = 0.95):
        """Initialize a reinforcement learning agent.

        Args:
            env: Environment whose ``action_space`` is sampled for exploration
                and used to size the Q-value rows.
            learning_rate: Step size applied to each temporal-difference update.
            epsilon: Initial probability of taking a random action.
            epsilon_decay: Amount subtracted from epsilon by ``decay_epsilon``.
            final_epsilon: Lower bound that epsilon never drops below.
            discount_factor: Weight of the estimated future value in the update.
        """
        self.env = env
        self.lr = learning_rate
        self.discount_factor = discount_factor
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.final_epsilon = final_epsilon
        # One temporal-difference error per call to ``update``.
        self.training_error = []
        # Rows are created on first access, so unseen states start at zero.
        self.q_values = defaultdict(self._new_q_row)

        # Define bins for discretization
        N_BINS = 20
        self.bins = [
            np.linspace(-20, 20, N_BINS),  # x-effector
            np.linspace(-30, 0, N_BINS),  # y-effector
            np.linspace(-20, 20, N_BINS),  # x-target
            np.linspace(-30, 0, N_BINS),  # y-target
        ]

    def _new_q_row(self) -> np.ndarray:
        """Return a zero-filled Q-value row for a state seen for the first time."""
        space = self.env.action_space
        # Scalar spaces report an empty shape, so ``shape[0]`` would raise
        # IndexError; their action count is exposed as ``n`` instead.
        n_actions = space.shape[0] if space.shape else space.n
        return np.zeros(n_actions)

    def discretize_observation(self, obs):
        """Discretizes the continuous observation into bins.

        Each component is located among the bin edges of the same position in
        ``self.bins``. Values below the first edge map to -1 and values at or
        above the last edge map to ``len(edges) - 1``.

        Returns:
            A tuple with one bin index per observation component.
        """
        discrete_obs = tuple(
            np.digitize(value, self.bins[i]) - 1 for i, value in enumerate(obs)
        )
        return discrete_obs

    def get_action(self, obs) -> np.ndarray:
        """Choose an action with an epsilon-greedy policy.

        With probability ``self.epsilon`` a random sample of the environment's
        action space is returned; otherwise the result of ``np.argmax`` over
        the Q-values of the discretized observation is returned. Note that the
        greedy branch yields an integer index rather than an array.
        """
        if np.random.random() < self.epsilon:
            return self.env.action_space.sample()
        obs_tuple = self.discretize_observation(obs)
        return np.argmax(self.q_values[obs_tuple])

    def update(self, obs, action, reward, terminated, next_obs):
        """Updates a Q-value of an action.

        Applies one temporal-difference step to ``Q[obs][action]`` and appends
        the error to ``self.training_error``.

        Args:
            obs: Observation in which ``action`` was taken.
            action: Index into the Q-value row of ``obs``.
            reward: Reward received for the transition.
            terminated: Whether the transition ended the episode; if so the
                future value is treated as zero.
            next_obs: Observation reached after the transition.
        """
        tp_obs = self.discretize_observation(obs)
        tp_next_obs = self.discretize_observation(next_obs)
        # Multiplying by ``not terminated`` zeroes the bootstrap term at episode end.
        future_q_value = (not terminated) * np.max(self.q_values[tp_next_obs])
        temporal_difference = reward + self.discount_factor * future_q_value - self.q_values[tp_obs][action]
        self.q_values[tp_obs][action] += self.lr * temporal_difference
        self.training_error.append(float(temporal_difference))

    def decay_epsilon(self):
        """Lower epsilon by ``epsilon_decay``, never going below ``final_epsilon``."""
        self.epsilon = max(self.epsilon - self.epsilon_decay, self.final_epsilon)


# Hyperparameters
learning_rate = 0.1
n_episodes = 10
start_epsilon = 1.0
final_epsilon = 0.05
# Linear schedule: epsilon hits its floor after half of the training episodes.
epsilon_decay = start_epsilon / (n_episodes / 2)

# Remove previous recordings
folder_path = "/home/wp/Studia/soft_robotics/trunk-agent"
if os.path.exists(folder_path):
    shutil.rmtree(folder_path)
os.makedirs(folder_path)

# Environment setup
env = TrunkEnv(render_mode="rgb")
# Record into the very folder that was just wiped; a relative "trunk-agent"
# path only matched it when the notebook ran from one specific directory.
# NOTE(review): RecordVideo episode ids look 0-based, so `x == n_episodes`
# would first match after the training episodes — confirm this is intended.
env = gym.wrappers.RecordVideo(env, video_folder=folder_path, name_prefix="eval", episode_trigger=lambda x: x == n_episodes or x == 1)
# Collects per-episode returns and lengths (read later via env.return_queue / env.length_queue).
env = gym.wrappers.RecordEpisodeStatistics(env=env)
agent = TrunkAgent(env=env, learning_rate=learning_rate, epsilon=start_epsilon, epsilon_decay=epsilon_decay, final_epsilon=final_epsilon)

# Train the agent, keeping the mean TD error of every episode.
episode_td_errors = []
for episode in tqdm(range(n_episodes)):
    obs, info = env.reset()
    # Errors logged by the agent from this index on belong to the current episode.
    first_error_idx = len(agent.training_error)
    done = False
    while not done:
        action = agent.get_action(obs)
        next_obs, reward, terminated, truncated, info = env.step(action)
        agent.update(obs, action, reward, terminated, next_obs)
        done = terminated or truncated
        obs = next_obs
    episode_td_error = agent.training_error[first_error_idx:]
    episode_td_errors.append(np.mean(episode_td_error))
    # Exploration is reduced once per episode, not once per step.
    agent.decay_epsilon()

# Smooth the per-episode TD errors with a moving average.
# A window of 1 leaves the series unchanged.
rolling_window = 1
rolling_mean = np.convolve(
    episode_td_errors, np.ones(rolling_window) / rolling_window, mode='valid'
)

# One panel each for training error, episode return and episode length.
fig, ax = plt.subplots(3, 1, figsize=(10, 12))
panels = (
    (rolling_mean, "Training Error", "Mean Temporal Difference"),
    (env.return_queue, "Episode Rewards", "Reward"),
    (env.length_queue, "Episode Lengths", "Length"),
)
for axis, (series, title, y_label) in zip(ax, panels):
    axis.plot(series)
    axis.set_title(title)
    axis.set_xlabel("Episode")
    axis.set_ylabel(y_label)

plt.tight_layout()
plt.show()

In [None]:

# Evaluate the agent
n_eval_episodes = 100  # Evaluate for 100 episodes
total_rewards = []

# Evaluate the learned policy itself: switch exploration off so that no random
# actions leak into the reported score, and restore it afterwards so training
# can be resumed with the previous epsilon.
training_epsilon = agent.epsilon
agent.epsilon = 0.0
try:
    for _ in range(n_eval_episodes):
        obs, info = env.reset()
        episode_reward = 0
        done = False
        while not done:
            action = agent.get_action(obs)
            obs, reward, terminated, truncated, info = env.step(action)
            episode_reward += reward
            done = terminated or truncated
        total_rewards.append(episode_reward)
finally:
    agent.epsilon = training_epsilon

# NOTE(review): env is never closed in the visible cells; RecordVideo may need
# env.close() to finalize the last recording — confirm before relying on the video.
print(f"Average reward over {n_eval_episodes} evaluation episodes: {np.mean(total_rewards)}")
