In [None]:
import numpy as np
import gymnasium as gym
import gymnasium_robotics
import matplotlib.pyplot as plt
import time
from collections import defaultdict

# Make sure the Gymnasium-Robotics environments are registered with Gymnasium
# before any gym.make(...) call further down in this file.
gym.register_envs(gymnasium_robotics)

class DiscretizedFetchPickAndPlace:
    """Discrete-state / discrete-action wrapper around ``FetchPickAndPlace-v3``.

    Observations are reduced to a 5-tuple of bin indices::

        (gripper-to-object distance, object-to-target distance,
         gripper x, gripper y, gripper z)

    Actions are integers in ``[0, num_actions)``. Each integer is read as a
    4-digit base-``action_bins`` number; every digit selects one evenly spaced
    level in ``[-1, 1]`` for the corresponding component of the continuous
    action (dx, dy, dz, gripper).

    Args:
        bins: Number of bins used for every discretized observation feature.
        render_mode: Forwarded unchanged to ``gym.make``.
    """

    def __init__(self, bins=5, render_mode=None):
        self.env = gym.make("FetchPickAndPlace-v3", render_mode=render_mode)
        self.bins = bins
        self.action_bins = 3
        self.num_actions = self.action_bins ** 4
        # One throw-away reset so the observation dict layout can be recorded.
        obs, _ = self.env.reset()
        self.observation_keys = list(obs.keys())

    def reset(self, seed=None):
        """Reset the wrapped env and return ``(discrete_state, info)``."""
        obs, info = self.env.reset(seed=seed)
        return self._discretize_observation(obs), info

    def step(self, discrete_action):
        """Apply a discrete action.

        Returns:
            ``(discrete_state, reward, terminated, truncated, info)`` where all
            but the state are passed through from the wrapped environment.
        """
        continuous_action = self._discrete_to_continuous_action(discrete_action)
        obs, reward, terminated, truncated, info = self.env.step(continuous_action)
        return self._discretize_observation(obs), reward, terminated, truncated, info

    def _discretize_observation(self, obs):
        """Map an observation dict to a hashable tuple of bin indices.

        Reads ``obs['observation'][:3]`` as the gripper position,
        ``obs['observation'][3:6]`` as the object position and
        ``obs['desired_goal']`` as the target position.
        """
        grip_pos = obs['observation'][:3]
        object_pos = obs['observation'][3:6]
        target_pos = obs['desired_goal']

        grip_to_obj = np.linalg.norm(grip_pos - object_pos)
        obj_to_target = np.linalg.norm(object_pos - target_pos)

        discrete_grip_obj = self._discretize_value(grip_to_obj, 0, 1.0, self.bins)
        discrete_obj_target = self._discretize_value(obj_to_target, 0, 1.0, self.bins)
        # NOTE(review): positions are binned over [-1, 1]; values outside that
        # range are clamped into the edge bins. Confirm that this range matches
        # the robot's actual workspace coordinates.
        discrete_gripper_pos = tuple(
            self._discretize_value(coord, -1, 1, self.bins) for coord in grip_pos[:3]
        )
        return (discrete_grip_obj, discrete_obj_target) + discrete_gripper_pos

    def _discretize_value(self, value, min_val, max_val, bins):
        """Return the index of the equal-width bin containing ``value``.

        The result is clamped to ``[0, bins - 1]`` so out-of-range values fall
        into the first or last bin.
        """
        bin_size = (max_val - min_val) / bins
        discretized = int((value - min_val) / bin_size)
        return min(bins - 1, max(0, discretized))

    def _discrete_to_continuous_action(self, discrete_action):
        """Decode a discrete action index into a 4-component continuous action.

        The index is interpreted as a 4-digit base-``action_bins`` number with
        the most significant digit first. Every digit ``d`` is mapped to
        ``-1 + 2 * d / (action_bins - 1)``.

        The gripper component uses the same mapping as the three displacement
        components. It was previously mapped to ``{0.0, 1.0}`` only, which
        never produced a negative command; the Fetch action space spans
        ``[-1, 1]`` for the gripper, so the fingers could not be driven closed.
        """
        digits = []
        remaining = discrete_action
        for _ in range(4):
            remaining, digit = divmod(remaining, self.action_bins)
            digits.append(digit)
        digits.reverse()  # most significant digit first: (dx, dy, dz, gripper)

        span = self.action_bins - 1
        return np.array([-1.0 + 2.0 * digit / span for digit in digits])

    def close(self):
        """Close the wrapped environment."""
        self.env.close()

class MonteCarloAgent:
    """Tabular first-visit Monte Carlo agent with an epsilon-greedy behaviour policy.

    All tables are keyed by state and hold one entry per action:

    * ``Q[state][a]``             - current action-value estimate
    * ``returns_sum[state][a]``   - sum of first-visit returns observed
    * ``returns_count[state][a]`` - number of first-visit returns observed
    * ``policy[state][a]``        - greedy policy probabilities (uniform until
      the state is first updated)

    Args:
        env: Object exposing ``num_actions``, ``reset()`` and ``step(action)``;
            states returned by it must be hashable.
        epsilon: Probability of taking a uniformly random action when exploring.
        gamma: Discount factor applied to future rewards.
    """

    def __init__(self, env, epsilon=0.2, gamma=0.95):
        self.env = env
        self.epsilon = epsilon
        self.gamma = gamma
        self.num_actions = env.num_actions
        self.Q = defaultdict(lambda: np.zeros(self.num_actions))
        self.returns_count = defaultdict(lambda: np.zeros(self.num_actions))
        self.returns_sum = defaultdict(lambda: np.zeros(self.num_actions))
        self.policy = defaultdict(lambda: np.ones(self.num_actions) / self.num_actions)

    def select_action(self, state, exploring=True):
        """Return an action index: random with probability ``epsilon`` when
        ``exploring`` is true, otherwise the greedy action under ``Q``."""
        if exploring and np.random.random() < self.epsilon:
            return np.random.randint(self.num_actions)
        return np.argmax(self.Q[state])

    def update_policy(self, state):
        """Make ``policy[state]`` deterministic on the current greedy action."""
        best_action = np.argmax(self.Q[state])
        probabilities = self.policy[state]
        probabilities[:] = 0.0
        probabilities[best_action] = 1.0

    def generate_episode(self, max_steps=50, exploring=True):
        """Roll out one episode and return it as ``[(state, action, reward), ...]``.

        The rollout stops after ``max_steps`` steps or as soon as the
        environment reports termination or truncation.
        """
        episode = []
        state, _ = self.env.reset()
        for _ in range(max_steps):
            action = self.select_action(state, exploring)
            next_state, reward, terminated, truncated, _ = self.env.step(action)
            episode.append((state, action, reward))
            if terminated or truncated:
                break
            state = next_state
        return episode

    def _update_from_episode(self, episode, improve_policy=False):
        """Apply first-visit Monte Carlo updates for one recorded episode.

        Returns are accumulated while walking the episode backwards, but a
        (state, action) pair is only updated at the time step of its FIRST
        occurrence, so the recorded return is the one that follows the first
        visit.
        """
        first_visit = {}
        for t, (state, action, _) in enumerate(episode):
            first_visit.setdefault((state, action), t)

        G = 0.0
        for t in range(len(episode) - 1, -1, -1):
            state, action, reward = episode[t]
            G = self.gamma * G + reward
            if first_visit[(state, action)] != t:
                continue
            # Tables are keyed by state and indexed by action, so each update
            # touches exactly one scalar entry.
            self.returns_sum[state][action] += G
            self.returns_count[state][action] += 1
            self.Q[state][action] = (
                self.returns_sum[state][action] / self.returns_count[state][action]
            )
            if improve_policy:
                self.update_policy(state)

    def monte_carlo_prediction(self, num_episodes=100):
        """Estimate ``Q`` from ``num_episodes`` rollouts without touching ``policy``."""
        for _ in range(num_episodes):
            self._update_from_episode(self.generate_episode(), improve_policy=False)

    def monte_carlo_control(self, num_episodes=1000):
        """Run on-policy Monte Carlo control.

        Returns:
            List with the undiscounted reward sum of every training episode.
        """
        episode_rewards = []
        for episode in range(num_episodes):
            episode_data = self.generate_episode(exploring=True)
            episode_rewards.append(sum(r for _, _, r in episode_data))
            if episode % 50 == 0:
                # Slicing the last 50 also covers the very first episode.
                print(f"Episode {episode}/{num_episodes}, Avg Reward: {np.mean(episode_rewards[-50:]):.2f}")
            self._update_from_episode(episode_data, improve_policy=True)
        return episode_rewards

    @staticmethod
    def _episode_succeeded(reward, info):
        """Decide success from the last step of an episode.

        Prefers the ``is_success`` entry of ``info``; if it is absent, falls
        back to the previous ``reward > 0`` test.
        """
        flag = info.get("is_success") if isinstance(info, dict) else None
        if flag is None:
            return reward > 0
        return float(flag) > 0

    def evaluate(self, num_episodes=5, render=False, max_steps=100):
        """Run the greedy policy and report average reward and success rate.

        Args:
            num_episodes: Number of evaluation episodes.
            render: If true, evaluate in a freshly created ``"human"``-rendered
                environment that uses the same ``bins`` as the training
                environment; that temporary environment is always closed
                before returning. The agent's own environment is left open.
            max_steps: Upper bound on steps per evaluation episode.

        Returns:
            ``(avg_reward, success_rate)``.
        """
        if render:
            eval_env = DiscretizedFetchPickAndPlace(
                bins=getattr(self.env, "bins", 5), render_mode="human"
            )
        else:
            eval_env = self.env

        total_rewards = 0
        success_count = 0
        try:
            for _ in range(num_episodes):
                state, _ = eval_env.reset()
                episode_reward = 0
                succeeded = False
                for _step in range(max_steps):
                    action = self.select_action(state, exploring=False)
                    state, reward, done, truncated, info = eval_env.step(action)
                    episode_reward += reward
                    # Success is judged on the most recent step of the episode.
                    succeeded = self._episode_succeeded(reward, info)
                    if render:
                        time.sleep(0.05)
                    if done or truncated:
                        break
                if succeeded:
                    success_count += 1
                total_rewards += episode_reward
        finally:
            if render:
                eval_env.close()

        avg_reward = total_rewards / num_episodes
        success_rate = success_count / num_episodes
        return avg_reward, success_rate

    def plot_learning_curve(self, rewards):
        """Plot per-episode reward against episode index and show the figure."""
        plt.figure(figsize=(10, 6))
        plt.plot(rewards)
        plt.title('Learning Curve - Monte Carlo Control')
        plt.xlabel('Episode')
        plt.ylabel('Episode Reward')
        plt.grid(True)
        plt.show()

def main():
    """Train a Monte Carlo control agent on the discretized Fetch task,
    plot its learning curve, then evaluate the greedy policy with rendering.

    The training environment is closed even if training, plotting or
    evaluation raises.
    """
    env = DiscretizedFetchPickAndPlace(bins=5)
    try:
        agent = MonteCarloAgent(env, epsilon=0.2, gamma=0.95)
        print("Training Monte Carlo control agent...")
        rewards = agent.monte_carlo_control(num_episodes=1000)
        agent.plot_learning_curve(rewards)
        print("\nEvaluating learned policy...")
        avg_reward, success_rate = agent.evaluate(num_episodes=10, render=True)
        print(f"Average Reward: {avg_reward:.2f}")
        print(f"Success Rate: {success_rate:.2f}")
    finally:
        env.close()

# Run the training/evaluation demo only when this file is executed as a
# script, not when it is imported.
if __name__ == "__main__":
    main()
