# Assignment 1 Question 1

# First Visit Approach - Monte Carlo Algorithm

In [60]:
from typing import Dict, List, Optional, Tuple
import gymnasium as gym
import numpy as np


class TSP(gym.Env):
    """Traveling Salesman Problem (TSP) RL environment for persistent monitoring."""

    def __init__(self, num_targets: int, max_area: int = 30, seed: int = None) -> None:
        super().__init__()
        if seed is not None:
            np.random.seed(seed=seed)

        self.steps: int = 0
        self.num_targets: int = num_targets
        self.max_steps: int = num_targets
        self.max_area: int = max_area

        self.locations: np.ndarray = self._generate_points(self.num_targets)
        self.distances: np.ndarray = self._calculate_distances(self.locations)

        self.obs_low = self._build_obs_low()
        self.obs_high = self._build_obs_high()

        self.observation_space = gym.spaces.Box(low=self.obs_low, high=self.obs_high)
        self.action_space = gym.spaces.Discrete(self.num_targets)

    def _build_obs_low(self) -> np.ndarray:
        return np.concatenate(
            [
                np.array([0], dtype=np.float32),
                np.zeros(self.num_targets, dtype=np.float32),
                np.zeros(2 * self.num_targets, dtype=np.float32),
            ]
        )

    def _build_obs_high(self) -> np.ndarray:
        return np.concatenate(
            [
                np.array([self.num_targets], dtype=np.float32),
                2 * self.max_area * np.ones(self.num_targets, dtype=np.float32),
                self.max_area * np.ones(2 * self.num_targets, dtype=np.float32),
            ]
        )

    def reset(
        self,
        *,
        seed: Optional[int] = None,
        options: Optional[dict] = None,
        init_loc: Optional[int] = 0,
    ) -> Tuple[np.ndarray, Dict[str, None]]:
        self.steps = 0
        self.loc = init_loc
        self.visited_targets = []
        self.dist = self.distances[self.loc]

        state = self._get_state()
        return state, {}

    def _get_state(self) -> np.ndarray:
        return np.concatenate(
            (
                np.array([self.loc]),
                np.array(self.dist),
                np.array(self.locations).reshape(-1),
            ),
            dtype=np.float32,
        )

    def step(
        self, action: int
    ) -> Tuple[np.ndarray, float, bool, bool, Dict[str, None]]:
        self.steps += 1
        past_loc = self.loc
        next_loc = action

        reward = self._get_rewards(past_loc, next_loc)
        self.visited_targets.append(next_loc)

        self.loc, self.dist = next_loc, self.distances[next_loc]
        terminated = self.steps == self.max_steps
        truncated = False

        next_state = self._get_state()
        return next_state, reward, terminated, truncated, {}

    def _generate_points(self, num_points: int) -> np.ndarray:
        points = []
        while len(points) < num_points:
            x, y = np.random.random() * self.max_area, np.random.random() * self.max_area
            if [x, y] not in points:
                points.append([x, y])
        return np.array(points)

    def _calculate_distances(self, locations: List) -> np.ndarray:
        n = len(locations)
        distances = np.zeros((n, n))
        for i in range(n):
            for j in range(n):
                distances[i, j] = np.linalg.norm(locations[i] - locations[j])
        return distances

    def _get_rewards(self, past_loc: int, next_loc: int) -> float:
        if next_loc not in self.visited_targets:
            return -self.distances[past_loc][next_loc]
        return -10000  # Penalize revisits


def train_tsp(env: TSP, max_episodes: int, max_steps: int, gamma: float) -> Tuple[Dict, List[float]]:
    policy, Qvalue, Returns = {}, {}, {}
    ep_rets = []

    for ep in range(max_episodes):
        obs, _ = env.reset(init_loc=env.action_space.sample())
        action = env.action_space.sample()
        episode = []

        for step in range(max_steps):
            next_obs, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            episode.append((obs, action, reward))

            obs = next_obs
            action = policy.get(env.loc, env.action_space.sample())

            if done:
                break

        G = update_policy(episode, gamma, policy, Qvalue, Returns, env)
        ep_rets.append(G)

        print(f"Episode {ep}: {G}")

    return policy, ep_rets


def update_policy(
    episode: List[Tuple[np.ndarray, int, float]],
    gamma: float,
    policy: Dict,
    Qvalue: Dict,
    Returns: Dict,
    env: TSP,
) -> float:
    G = 0
    for obs_, action, reward in reversed(episode):
        G = reward + gamma * G
        state_action_key = (env.loc, action)

        if state_action_key not in Returns:
            Returns[state_action_key] = []
        Returns[state_action_key].append(G)

        Qvalue[state_action_key] = np.mean(Returns[state_action_key])

        best_action = max(
            range(env.action_space.n),
            key=lambda a: Qvalue.get((env.loc, a), float('-inf')),
        )
        policy[env.loc] = best_action
    return G


def test_policy(env: TSP, policy: Dict) -> None:
    for i in range(env.num_targets):
        action = policy[env.loc]
        obs_, reward, terminated, truncated, info = env.step(action)
        print(f"Go to {action}")


if __name__ == "__main__":
    num_targets = 6
    max_episodes = 15000
    max_steps = 10
    gamma = 0.9

    env = TSP(num_targets)
    policy, ep_rets = train_tsp(env, max_episodes, max_steps, gamma)

    print(f"Average return: {np.mean(ep_rets)}")

    print("\nTesting policy:")
    test_policy(env, policy)


Episode 0: -12541.32127968855
Episode 1: -36879.79571067834
Episode 2: -19800.842484646742
Episode 3: -27889.099458173405
Episode 4: -19778.44936113769
Episode 5: -20601.495049019875
Episode 6: -19795.205663916986
Episode 7: -19788.19555938526
Episode 8: -36868.26457727338
Episode 9: -36862.63200724004
Episode 10: -19804.596855469386
Episode 11: -8151.592209766093
Episode 12: -21333.135990177616
Episode 13: -21987.900255253266
Episode 14: -36884.078686447945
Episode 15: -14949.36106332921
Episode 16: -36879.08642692344
Episode 17: -19792.674506641528
Episode 18: -5965.852786050081
Episode 19: -27893.65671422015
Episode 20: -27869.797446886296
Episode 21: -15417.633743155217
Episode 22: -12500.926087035408
Episode 23: -5963.4031578864415
Episode 24: -74.41426657159741
Episode 25: -5953.727844602827
Episode 26: -27881.535290495034
Episode 27: -19804.608628143284
Episode 28: -19804.608628143284
Episode 29: -12517.968000434623
Episode 30: -27888.00805212724
Episode 31: -12504.620136574258


In [61]:
# Every visit Approach

# Every visit Approach Monte Carlo

In [63]:
from typing import Dict, List, Optional, Tuple

import gymnasium as gym
import numpy as np


class TSP(gym.Env):
    """Traveling Salesman Problem (TSP) RL environment for persistent monitoring."""

    def __init__(self, num_targets: int, max_area: int = 30, seed: int = None) -> None:
        """Initialize the TSP environment.

        Args:
            num_targets (int): Number of targets the agent needs to visit.
            max_area (int): Maximum area size within which targets are located.
            seed (int, optional): Random seed for reproducibility. Defaults to None.
        """
        super().__init__()
        if seed is not None:
            np.random.seed(seed=seed)

        self.steps = 0
        self.num_targets = num_targets
        self.max_steps = num_targets
        self.max_area = max_area

        self.locations = self._generate_points(num_targets)
        self.distances = self._calculate_distances(self.locations)

        self.obs_low = np.concatenate([
            np.array([0], dtype=np.float32),
            np.zeros(self.num_targets, dtype=np.float32),
            np.zeros(2 * self.num_targets, dtype=np.float32),
        ])

        self.obs_high = np.concatenate([
            np.array([self.num_targets], dtype=np.float32),
            2 * self.max_area * np.ones(self.num_targets, dtype=np.float32),
            self.max_area * np.ones(2 * self.num_targets, dtype=np.float32),
        ])

        self.observation_space = gym.spaces.Box(low=self.obs_low, high=self.obs_high)
        self.action_space = gym.spaces.Discrete(self.num_targets)

    def reset(self, *, seed: Optional[int] = None, options: Optional[dict] = None, init_loc: Optional[int] = 0) -> Tuple[np.ndarray, Dict[str, None]]:
        """Reset the environment to its initial state."""
        self.steps = 0
        self.loc = init_loc
        self.visited_targets = []
        self.dist = self.distances[self.loc]

        state = np.concatenate((
            np.array([self.loc]),
            np.array(self.dist),
            np.array(self.locations).reshape(-1),
        ), dtype=np.float32)

        return state, {}

    def step(self, action: int) -> Tuple[np.ndarray, float, bool, bool, Dict[str, None]]:
        """Execute an action to move to the next target."""
        self.steps += 1
        past_loc, next_loc = self.loc, action

        reward = self._get_rewards(past_loc, next_loc)
        self.visited_targets.append(next_loc)

        next_dist = self.distances[next_loc]
        terminated = bool(self.steps == self.max_steps)
        truncated = False

        next_state = np.concatenate([
            np.array([next_loc]),
            next_dist,
            np.array(self.locations).reshape(-1),
        ], dtype=np.float32)

        self.loc, self.dist = next_loc, next_dist
        return next_state, reward, terminated, truncated, {}

    def _generate_points(self, num_points: int) -> np.ndarray:
        """Generate random 2D points representing target locations."""
        points = []
        while len(points) < num_points:
            x, y = np.random.random() * self.max_area, np.random.random() * self.max_area
            if [x, y] not in points:
                points.append([x, y])
        return np.array(points)

    def _calculate_distances(self, locations: List) -> np.ndarray:
        """Calculate the pairwise distance matrix for all locations."""
        n = len(locations)
        distances = np.zeros((n, n))
        for i in range(n):
            for j in range(n):
                distances[i, j] = np.linalg.norm(locations[i] - locations[j])
        return distances

    def _get_rewards(self, past_loc: int, next_loc: int) -> float:
        """Return the reward based on the distance or penalize for repeated visits."""
        if next_loc not in self.visited_targets:
            return -self.distances[past_loc][next_loc]
        else:
            return -10000


if __name__ == "__main__":
    num_targets = 6
    max_episodes = 12000
    max_steps = 10

    env = TSP(num_targets)
    ep_rets, policy, Qvalue, Returns = [], {}, {}, {}
    gamma = 0.9

    # Training Loop
    for ep in range(max_episodes):
        ret, episode = 0, []
        obs_, _ = env.reset(init_loc=env.action_space.sample())
        action = env.action_space.sample()

        for step in range(max_steps):
            next_obs, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated

            episode.append((obs_, action, reward))
            obs_ = next_obs
            action = policy.get(env.loc, env.action_space.sample())

            if done:
                break

        G = 0
        visited_state_actions = set()

        # Every-visit Monte Carlo Update
        for obs_, action, reward in reversed(episode):
            G = reward + gamma * G

            state_action_key = (env.loc, action)
            if state_action_key not in visited_state_actions:
                visited_state_actions.add(state_action_key)

                Returns.setdefault(state_action_key, []).append(G)
                Qvalue[state_action_key] = np.mean(Returns[state_action_key])

                best_action = max([a for a in range(env.action_space.n) if a != state_action_key[0]],
                                  key=lambda a: Qvalue.get((env.loc, a), float('-inf')))
                policy[env.loc] = best_action

        ep_rets.append(G)
        print(f"Episode {ep} : {G}")

    print(f"\n\nAverage Episodic return: {np.mean(ep_rets)}")
    print("\n\nThe Optimal Policy after Monte Carlo First Visit Approach")
    print(policy)

    # Testing
    env.reset()
    for i in range(num_targets):
        action = policy[env.loc]
        obs_, reward, terminated, truncated, info = env.step(action)
        print(f"Go to {action}")



Episode 0 : -14974.969535875794
Episode 1 : -94.96245620180164
Episode 2 : -6665.65184564946
Episode 3 : -7401.589703290318
Episode 4 : -7361.0515217684615
Episode 5 : -19818.072102052916
Episode 6 : -5984.568877898646
Episode 7 : -19829.145213918146
Episode 8 : -13251.42204662537
Episode 9 : -131.7300798206674
Episode 10 : -19832.652781649394
Episode 11 : -5998.957736179149
Episode 12 : -19821.403406180907
Episode 13 : -27873.70919453561
Episode 14 : -27903.396926834757
Episode 15 : -19819.60163953508
Episode 16 : -119.20215521428378
Episode 17 : -19799.147079916733
Episode 18 : -12545.95238683127
Episode 19 : -12540.31585376415
Episode 20 : -12515.08533365847
Episode 21 : -105.02664431154705
Episode 22 : -5994.6907939114435
Episode 23 : -12548.570626281375
Episode 24 : -12542.835564064713
Episode 25 : -12566.504340984564
Episode 26 : -12559.852170978447
Episode 27 : -19826.063269942802
Episode 28 : -19825.835971936576
Episode 29 : -27897.332959619434
Episode 30 : -19823.179566548657


In [64]:
# Dynamic Programming Approach

# Dynamic Programming Approach

In [65]:
import numpy as np
from typing import Dict, Tuple

class TSP:
    """Traveling Salesman Problem (TSP) RL environment for persistent monitoring."""

    def __init__(self, num_targets: int, max_area: int = 30, seed: int = None) -> None:
        if seed is not None:
            np.random.seed(seed=seed)

        self.num_targets: int = num_targets
        self.max_area: int = max_area

        self.locations: np.ndarray = self._generate_points(self.num_targets)
        self.distances: np.ndarray = self._calculate_distances(self.locations)

    def _generate_points(self, num_points: int) -> np.ndarray:
        """Generate random 2D points representing target locations."""
        points = []
        while len(points) < num_points:
            x = np.random.random() * self.max_area
            y = np.random.random() * self.max_area
            if [x, y] not in points:
                points.append([x, y])
        return np.array(points)

    def _calculate_distances(self, locations: np.ndarray) -> np.ndarray:
        """Calculate the distance matrix between all target locations."""
        n = len(locations)
        distances = np.zeros((n, n))
        for i in range(n):
            for j in range(n):
                distances[i, j] = np.linalg.norm(locations[i] - locations[j])
        return distances

    def solve_tsp_value_iteration(self, gamma: float = 0.9, max_iterations: int = 10000, theta: float = 1e-6) -> Tuple[Dict, np.ndarray]:
        """Solve TSP using Value Iteration.
        
        Args:
            gamma (float): Discount factor for future rewards.
            max_iterations (int): Maximum number of iterations for value iteration.
            theta (float): Convergence threshold.

        Returns:
            Tuple[Dict, np.ndarray]: Optimal policy and the value function.
        """
        # Number of states: (location, visited_mask)
        V = np.zeros((2 ** self.num_targets, self.num_targets))
        policy = {}

        for iteration in range(max_iterations):
            delta = 0
            for visited_mask in range(2 ** self.num_targets):
                for loc in range(self.num_targets):
                    if visited_mask & (1 << loc):  # If already visited
                        continue

                    v = V[visited_mask][loc]
                    best_value = float('inf')
                    
                    for next_loc in range(self.num_targets):
                        if visited_mask & (1 << next_loc):  # Skip already visited locations
                            continue
                        
                        # Compute next state
                        next_visited_mask = visited_mask | (1 << next_loc)
                        next_value = -self.distances[loc][next_loc] + gamma * V[next_visited_mask][next_loc]
                        
                        best_value = min(best_value, next_value)
                    
                    V[visited_mask][loc] = best_value
                    delta = max(delta, abs(v - best_value))

            if delta < theta:
#                 print(f"Value Iteration converged after {iteration + 1} iterations.")
                break

        # Extract policy from value function
        for visited_mask in range(2 ** self.num_targets):
            for loc in range(self.num_targets):
                if visited_mask & (1 << loc):  # If already visited
                    continue
                
                best_action = None
                best_value = float('inf')
                
                for next_loc in range(self.num_targets):
                    if visited_mask & (1 << next_loc):
                        continue
                    
                    next_visited_mask = visited_mask | (1 << next_loc)
                    next_value = -self.distances[loc][next_loc] + gamma * V[next_visited_mask][next_loc]
                    
                    if next_value < best_value:
                        best_value = next_value
                        best_action = next_loc
                
                policy[(visited_mask, loc)] = best_action

        return policy, V

if __name__ == "__main__":
    num_targets = 6
    env = TSP(num_targets)

    # Solve TSP using value iteration
    optimal_policy, value_function = env.solve_tsp_value_iteration()

    print("Optimal Policy:", optimal_policy)

    # Testing the policy
    current_loc = 0
    visited_mask = 0
    total_reward = 0

    print("\nTesting the learned policy:")
    while visited_mask != (1 << num_targets) - 1:  # Until all locations are visited
        try:
            action = optimal_policy[(visited_mask, current_loc)]
            if action is None:
                raise KeyError
        except KeyError:
            # If the state isn't in the policy, choose the closest unvisited location
            unvisited_locs = [loc for loc in range(num_targets) if not (visited_mask & (1 << loc))]
            action = min(unvisited_locs, key=lambda loc: env.distances[current_loc][loc])

        print(f"At location {current_loc}, going to {action}")
        
        visited_mask |= (1 << action)
        total_reward += -env.distances[current_loc][action]
        current_loc = action
    
    print(f"Total reward (negative of total distance): {total_reward}")


Value Iteration converged after 2 iterations.
Optimal Policy: {(0, 0): 3, (0, 1): 3, (0, 2): 3, (0, 3): 5, (0, 4): 3, (0, 5): 3, (1, 1): 3, (1, 2): 3, (1, 3): 5, (1, 4): 3, (1, 5): 3, (2, 0): 3, (2, 2): 3, (2, 3): 5, (2, 4): 3, (2, 5): 3, (3, 2): 3, (3, 3): 5, (3, 4): 3, (3, 5): 3, (4, 0): 3, (4, 1): 3, (4, 3): 5, (4, 4): 3, (4, 5): 3, (5, 1): 3, (5, 3): 5, (5, 4): 3, (5, 5): 3, (6, 0): 3, (6, 3): 5, (6, 4): 3, (6, 5): 3, (7, 3): 5, (7, 4): 3, (7, 5): 3, (8, 0): 4, (8, 1): 4, (8, 2): 5, (8, 4): 1, (8, 5): 1, (9, 1): 4, (9, 2): 5, (9, 4): 1, (9, 5): 1, (10, 0): 4, (10, 2): 5, (10, 4): 0, (10, 5): 2, (11, 2): 5, (11, 4): 2, (11, 5): 2, (12, 0): 4, (12, 1): 4, (12, 4): 1, (12, 5): 1, (13, 1): 4, (13, 4): 1, (13, 5): 1, (14, 0): 4, (14, 4): 0, (14, 5): 0, (15, 4): 5, (15, 5): 4, (16, 0): 3, (16, 1): 3, (16, 2): 3, (16, 3): 5, (16, 5): 3, (17, 1): 3, (17, 2): 3, (17, 3): 5, (17, 5): 3, (18, 0): 3, (18, 2): 3, (18, 3): 5, (18, 5): 3, (19, 2): 3, (19, 3): 5, (19, 5): 3, (20, 0): 3, (20, 1): 3