## Lab05: Introduction to Partial Observability 

In this lab, we will explore some of the most basic, classic ideas for partially observable environments and apply them to a simple game, the Lemonade Stand game. The lab consists of two parts. In the first, you will learn about POMDPs and belief-state updates; in the second, you will implement a simple recurrent Q-learning agent.

In a Partially Observable Markov Decision Process (POMDP), the agent does not directly observe the true underlying state of the environment; instead, it receives an observation o at each timestep. The probability of observing o after taking action a and transitioning to state s' is denoted O(o|a, s'), and the transition function between true states is T(s'|s, a). In this lab we focus on discrete states, so we can use recursive Bayesian estimation to maintain our belief about which state we are currently in. When the state and observation spaces are finite, we can maintain a categorical distribution of length |S|, which we call a 'belief vector'. A belief vector is a point on the probability simplex: its entries are non-negative and sum to 1.

Theorem:
Given a fixed observation function O(o|a,s'), a transition function T(s'|s,a) and a previous belief state b, the updated belief after taking action a and observing o is, up to a normalizing constant, $b'(s') \propto O(o|a,s') \sum_s T(s'|s,a)\,b(s)$.

**TO-DO: Try to derive this formula by yourself!**
\begin{align*}
b'(s') &= P(s'|b,a,o) \\
       &\propto P(o|b,a,s')P(s'|b,a) \\
       &= O(o|a,s')P(s'|b,a) \\
       &=  \\
       &= O(o|a,s') \sum_s T(s'|s,a)b(s)
\end{align*}


Then, the belief vector is divided by the constant $N = \sum_{s'} O(o|a,s') \sum_s T(s'|s,a)\,b(s)$ so that its entries sum to 1 and it is a valid point on the probability simplex.
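
The update and normalization above can be sketched in a few lines of plain Python. This is a minimal illustration rather than part of the lab's starter code: the function name `belief_update` and the nested-list layout (`T[s][s_next]` for the chosen action, `obs_lik[s_next]` for O(o|a,s')) are assumptions made for this example, as are the numbers in the two-state usage lines.

```python
def belief_update(b, T, obs_lik):
    """One step of recursive Bayesian estimation for a finite POMDP.

    b        -- current belief, b[s] = P(state = s)
    T        -- transition matrix for the action taken, T[s][s_next] = T(s'|s, a)
    obs_lik  -- obs_lik[s_next] = O(o | a, s') for the observation actually received
    (All three layouts are assumptions made for this sketch.)
    """
    n = len(b)
    # Prediction step: P(s' | b, a) = sum_s T(s'|s,a) b(s)
    predicted = [sum(T[s][s_next] * b[s] for s in range(n)) for s_next in range(n)]
    # Correction step: weight each predicted state by the observation likelihood
    unnormalized = [obs_lik[s_next] * predicted[s_next] for s_next in range(n)]
    # Normalize by N so that the belief sums to 1
    N = sum(unnormalized)
    if N == 0:
        raise ValueError("observation has zero probability under the current belief")
    return [x / N for x in unnormalized]


# Hypothetical two-state example with a "sticky" transition matrix.
b0 = [0.5, 0.5]
T = [[0.9, 0.1],
     [0.2, 0.8]]
obs_lik = [0.8, 0.3]  # O(o | a, s') for s' = 0 and s' = 1
b1 = belief_update(b0, T, obs_lik)
print(b1)
```

When the hidden state never changes, `T` is the identity matrix and the prediction step leaves the belief untouched, so only the likelihood weighting and the normalization remain.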

#### Part I: Modeling a Simple Fixed Opponent in Lemonade

In the previous lab we played the Lemonade Stand game, where you competed against two agents of unknown type. Here we simplify the game so that you compete against only ONE agent with a FIXED strategy. This strategy is:

The agent has one of two types. A ***type3*** agent plays position 3 with probability 70% and position 9 with probability 30%; a ***type9*** agent plays position 9 with probability 70% and position 3 with probability 30%. At the start of the game the agent is assigned one of the two types, and its type never changes.

Let's first try just using a random strategy against this agent:

In [None]:
from agt_server.agents.base_agents.lemonade_agent import LemonadeAgent
from agt_server.local_games.lemonade_arena import LemonadeArena
from agt_server.agents.test_agents.lemonade.stick_agent.my_agent import StickAgent
from agt_server.agents.test_agents.lemonade.always_stay.my_agent import ReserveAgent
from agt_server.agents.test_agents.lemonade.decrement_agent.my_agent import DecrementAgent
from agt_server.agents.test_agents.lemonade.increment_agent.my_agent import IncrementAgent

# First, let's make a dummy bot that always picks position 0,
# so that effectively only two players are competing.
class DummyBot(LemonadeAgent):
    def setup(self):
        self.spot = 0
    def get_action(self):
        return self.spot
    def update(self):
        pass

# second let's code up our Mysterious agent we just described
import random
class MysteriousBot(LemonadeAgent):
    def setup(self):
        self.agent_type = random.choice(['type3', 'type9'])

        if self.agent_type == 'type3':
            self.probs = {3: 0.7, 9: 0.3}
        else:
            # type9
            self.probs = {3: 0.3, 9: 0.7}

    def get_action(self):
        # Choose an action (i.e., a position) based on the fixed probability distribution.
        positions = list(self.probs.keys())
        weights = list(self.probs.values())
        chosen_position = random.choices(positions, weights=weights, k=1)[0]
        return chosen_position
    def update(self):
        pass

class RandomAgent(LemonadeAgent):
    def setup(self):
        pass
    def get_action(self):
        return random.randint(0, 11)
    def update(self):
        pass

randomagent = RandomAgent("Random")
mysteriousagent = MysteriousBot("Mysterious")
dummyagent = DummyBot("PlaceHolder")

arena = LemonadeArena(
    num_rounds=500,
    timeout=1, 
    players=[
        randomagent,
        mysteriousagent,
        dummyagent,
    ]
)
arena.run()

The intuition for doing better in this simple game is to figure out which type the mysterious agent is: type3 or type9? With the tool introduced earlier (recursive Bayesian estimation), you can now infer this from the positions it plays!
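
To get a feel for how quickly the belief settles, the sketch below simulates a fixed-type opponent outside the arena and updates the belief after every observed move. It uses only the standard library; the helper names (`sample_move`, `update_belief`), the seed, and the number of rounds are made up for this illustration, and the likelihood table mirrors the 70%/30% description of the two types.

```python
import random

# Likelihood of each observed position under each type (from the type description).
LIKELIHOOD = {
    "type3": {3: 0.7, 9: 0.3},
    "type9": {3: 0.3, 9: 0.7},
}

def sample_move(agent_type, rng):
    """Draw one position from the opponent's fixed mixed strategy."""
    probs = LIKELIHOOD[agent_type]
    return rng.choices(list(probs.keys()), weights=list(probs.values()), k=1)[0]

def update_belief(belief, observation):
    """Bayes update for a hidden type that never changes (identity transition)."""
    unnormalized = {t: LIKELIHOOD[t][observation] * p for t, p in belief.items()}
    total = sum(unnormalized.values())
    return {t: p / total for t, p in unnormalized.items()}

rng = random.Random(0)   # arbitrary seed, for reproducibility only
true_type = "type9"      # hidden from the learner
belief = {"type3": 0.5, "type9": 0.5}

for round_index in range(200):
    belief = update_belief(belief, sample_move(true_type, rng))

print(belief)  # the mass on the true type should be close to 1 after many rounds
```

Because the type is fixed, every observation only reweights the two hypotheses, so the evidence accumulates round after round.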

In [None]:
class BeliefAgent(LemonadeAgent):
    def setup(self):
        # Initialize belief vector: state 0 -> type3, state 1 -> type9.
        self.b = [0.5, 0.5]
        # Transition matrix T: since the type never changes, it is the identity.
        self.T = [[1, 0],
                  [0, 1]]
        # Observation function O: maps state index to a dict of observation probabilities.
        # For type3 (state 0): P(3)=0.7, P(9)=0.3.
        # For type9 (state 1): P(3)=0.3, P(9)=0.7.
        self.O = {
            0: {3: 0.7, 9: 0.3},
            1: {3: 0.3, 9: 0.7}
        }

    def get_action(self):
        ######### TO-DO: Find the optimal policy in expectation given the current belief state #########
        p3 = self.b[0] * self.O[0][3] + self.b[1] * self.O[1][3]
        p9 = self.b[0] * self.O[0][9] + self.b[1] * self.O[1][9]
        
        best_utility = float('-inf')
        best_action = None
        # Loop over all potential actions (0 to 11)
        for a in range(12):
            # calculate_utility(a, x, y) is assumed to return the utilities of all players
            # when we play `a` and the opponents play `x` and `y`; index [0] is our own utility.
            # The dummy bot always plays 0, so we average over the mysterious agent's move (3 or 9).
            utility = p3 * self.calculate_utility(a, 0, 3)[0] + p9 * self.calculate_utility(a, 0, 9)[0]
            if utility > best_utility:
                best_utility = utility
                best_action = a
        #################################################################################################
        return best_action

    def update(self):
        """
        Update the belief vector given an observation from agent1.
        `observation` should be either 3 or 9.
        
        The belief update is:
            b'(s') = O(observation | s') * b(s') / N
        where N = sum_{s''} O(observation | s'') * b(s'').
        """
        observation = self.get_opp1_last_action()
        if observation not in (3, 9):
            # No usable observation yet (e.g. before the first round): keep the current belief.
            return
        new_b = [0.0, 0.0]

        ############ TO_DO: Update new_b to be your new belief state based on the observation! #########
        for s in range(2):
            # Because the transition is identity, we simply multiply the prior by the observation likelihood.
            new_b[s] = self.O[s][observation] * self.b[s]
        # Normalize the new belief so that it sums to 1.
        total = sum(new_b)
        if total > 0:
            self.b = [x / total for x in new_b]
        else:
            # Fallback: if something went wrong (total==0), reset to uniform belief.
            self.b = [0.5, 0.5]
        #################################################################################################

beliefagent = BeliefAgent("belief")
arena = LemonadeArena(
    num_rounds=500,
    timeout=1, 
    players=[
        beliefagent,
        mysteriousagent,
        dummyagent,
    ]
)
arena.run()

**TO-DO:**

Suppose you start with [0.5, 0.5] as your belief vector. You observe your opponent play 3, then 9, then 3 again. What is your belief about your opponent's type? Work it out by hand.


[ANSWER]:


#### Part II: Handle Partial Observability with Memory-based RL

In this part of the lab, you'll be implementing a recurrent Q-learning agent, with a gated recurrent unit (GRU) as the recurrent network. (Note that the starter code in this part uses an LSTM, `nn.LSTM`, which plays the same role.) Let's first implement the RNN Q-function. In previous assignments, you implemented Q-functions as a linear function of state features and weights, $Q(\phi(s_t), a_t) = \mathbf{w}_a^{\intercal}\phi(s_t)$. Now we'll be using a *recurrent neural network* as our Q-function. If you need a good reference on how recurrent neural networks work, we refer you to page 370 of the [deep learning book](https://www.deeplearningbook.org/contents/rnn.html).

Let $Q_{\theta} : \mathcal{O} \times \mathcal{A} \times \mathcal{H} \rightarrow \mathbb{R}^{|\mathcal{A}|} \times \mathcal{H}$ represent our recurrent Q-function neural network, parameterized by $\theta$. Here, $\mathcal{H}$ is the space of hidden states (normally $\mathbb{R}^d$, where $d$ is the dimension of the hidden state); a hidden state is meant to be a condensed summary of our history of observations and actions. The Q-function takes as input the current observation $\mathbf{o}_t$, the previous action $a_{t - 1}$ and the previous hidden state $\mathbf{h}_{t - 1}$, and outputs the Q-values of _all_ actions at time step $t$, as well as the next hidden state $\mathbf{h}_t$.

We'll be using the PyTorch library to implement our neural network. If you need a reference to implementing RNNs, check out this tutorial on [RNN classification with PyTorch](https://pytorch.org/tutorials/intermediate/char_rnn_classification_tutorial.html). If you need a reference to how GRUs work, check out the [PyTorch documentation for GRUs](https://pytorch.org/docs/stable/generated/torch.nn.GRU.html).
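
As a concrete picture of the signature described above (observation, previous action and previous hidden state in; Q-values for all actions and the next hidden state out), here is a minimal GRU-based sketch built on `nn.GRUCell`. It is an illustration under assumptions, not the lab's reference implementation: the class name `GRUQNetwork`, the choice to one-hot encode the previous action and concatenate it to the observation, and the dimensions in the usage lines are all hypothetical.

```python
import torch
import torch.nn as nn


class GRUQNetwork(nn.Module):
    """Hypothetical recurrent Q-function: (o_t, a_{t-1}, h_{t-1}) -> (Q-values, h_t)."""

    def __init__(self, obs_dim, num_actions, hidden_dim):
        super().__init__()
        self.num_actions = num_actions
        self.hidden_dim = hidden_dim
        # The GRU cell consumes the observation concatenated with a one-hot previous action.
        self.cell = nn.GRUCell(obs_dim + num_actions, hidden_dim)
        # A linear head maps the new hidden state to one Q-value per action.
        self.head = nn.Linear(hidden_dim, num_actions)

    def forward(self, obs, prev_action, prev_hidden):
        # obs: (batch, obs_dim) float, prev_action: (batch,) long,
        # prev_hidden: (batch, hidden_dim) float
        action_one_hot = nn.functional.one_hot(prev_action, self.num_actions).float()
        x = torch.cat([obs, action_one_hot], dim=-1)
        hidden = self.cell(x, prev_hidden)   # h_t
        q_values = self.head(hidden)         # (batch, num_actions)
        return q_values, hidden

    def init_hidden(self, batch_size=1):
        # Start every episode from an all-zero hidden state.
        return torch.zeros(batch_size, self.hidden_dim)


# Usage with made-up sizes: one step for a single agent.
net = GRUQNetwork(obs_dim=24, num_actions=12, hidden_dim=16)
h = net.init_hidden()
q, h = net(torch.zeros(1, 24), torch.tensor([0]), h)
print(q.shape, h.shape)
```

Processing one step at a time with a cell keeps the hidden state explicit, which matches how an agent acts online: it carries `h` from one call to the next and resets it at the start of an episode.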

For a Q-Learning agent on Lemonade, refer to the my_rl_lemonade_agent.py and q_learning.py files. 


**(This Introduction is adapted from CS2951F)**

In [None]:
import random
import math
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque, namedtuple

# from my_rl_lemonade_agent import MyRLAgent
# from my_supervised_agent import MyNRLAgent

from agt_server.agents.base_agents.lemonade_agent import LemonadeAgent
from agt_server.local_games.lemonade_arena import LemonadeArena
from agt_server.agents.test_agents.lemonade.stick_agent.my_agent import StickAgent
from agt_server.agents.test_agents.lemonade.always_stay.my_agent import ReserveAgent
from agt_server.agents.test_agents.lemonade.decrement_agent.my_agent import DecrementAgent
from agt_server.agents.test_agents.lemonade.increment_agent.my_agent import IncrementAgent


# -------------------------
# DRQN (Deep Recurrent Q-Network) Model
# -------------------------
class DRQN(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super(DRQN, self).__init__()
        self.hidden_dim = hidden_dim
        # The LSTM processes sequences (here we use sequence length = 1 each step).
        self.lstm = nn.LSTM(input_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)
    
    def forward(self, x, hidden):
        # x shape: (batch, seq_len, input_dim)
        out, hidden = self.lstm(x, hidden)
        # Use the last time-step’s output
        out = out[:, -1, :]  # shape: (batch, hidden_dim)
        q_values = self.fc(out)  # shape: (batch, output_dim)
        return q_values, hidden

    def init_hidden(self, batch_size=1):
        # Initialize (h, c) to zeros
        h = torch.zeros(1, batch_size, self.hidden_dim)
        c = torch.zeros(1, batch_size, self.hidden_dim)
        return (h, c)

# -------------------------
# Simple Replay Buffer
# -------------------------
Transition = namedtuple('Transition', 
                        ('obs', 'action', 'reward', 'next_obs', 'done', 'hidden'))

class ReplayBuffer:
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)
    
    def push(self, *args):
        self.buffer.append(Transition(*args))
    
    def sample(self, batch_size):
        return random.sample(self.buffer, batch_size)
    
    def __len__(self):
        return len(self.buffer)

# -------------------------
# DRQN Agent Implementation
# -------------------------
class DRQNAgent(LemonadeAgent):
    def setup(self):
        # ----- Parameters -----
        # Number of possible spots (actions)
        self.num_positions = 12  
        # Our observation will be the concatenation of three one-hot encodings
        # (one per agent). Thus, input_dim = 36.
        self.input_dim = 36  
        self.hidden_dim = 128
        self.output_dim = self.num_positions
        
        self.gamma = 0.99           # discount factor
        self.epsilon = 1.0          # initial epsilon for epsilon-greedy
        self.epsilon_min = 0.1
        self.epsilon_decay = 0.995  # decay factor per update
        self.batch_size = 32
        self.replay_capacity = 10000

        # ----- DRQN and Optimizer -----
        self.policy_net = DRQN(self.input_dim, self.hidden_dim, self.output_dim)
        # In a full implementation you might maintain a target network updated less frequently.
        self.target_net = DRQN(self.input_dim, self.hidden_dim, self.output_dim)
        self.target_net.load_state_dict(self.policy_net.state_dict())
        self.target_net.eval()

        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=0.001)

        # ----- Replay Buffer -----
        self.replay_buffer = ReplayBuffer(self.replay_capacity)

        # ----- RNN Hidden State -----
        self.hidden = self.policy_net.init_hidden(batch_size=1)
        # We will store the last observation and action to form a transition.
        self.last_obs = None
        self.last_action = None

    def encode_observation(self):
        """
        TO-DO::: 
        Encode the most recent opponent actions as a 36-dimensional vector.
        For each opponent we use a 12-dimensional one-hot encoding.
        If no action is available, a zero vector is used.
        """
        obs = []
        ############# TO-DO: make one-hot encoding of the LAST action of all three players, put into obs use obs.extend() #########
        
        ###########################################################################################################################
        # Create a tensor of shape (1, 1, input_dim)
        obs_tensor = torch.tensor([obs], dtype=torch.float32)  # shape: (1, input_dim)
        obs_tensor = obs_tensor.unsqueeze(1)  # shape: (1, 1, input_dim), seq_len=1
        
        return obs_tensor

    def get_action(self):
        """
        Select an action using an epsilon-greedy policy based on the current observation.
        """
        obs = self.encode_observation()  # shape: (1, 1, input_dim)
        # Forward pass through the DRQN to get Q-values
        with torch.no_grad():
            q_values, self.hidden = self.policy_net(obs, self.hidden)
            
        ######################### TO-DO: Implement the Epsilon-greedy action selection ###############
        action = None
        ##############################################################################################

        # Store the observation and action for the transition.
        self.last_obs = obs  
        self.last_action = action
        return action

    def update(self):
        """
        Called at every time step. This method:
          1. Obtains the reward and next observation.
          2. Stores the transition in the replay buffer.
          3. Trains the network using a batch sampled from the replay buffer.
          4. Updates the target network and decays epsilon.
        """
        # ---- 1. Get reward and next observation ----
        reward = self.get_last_util()
        done = self.is_done() if hasattr(self, "is_done") else False
        next_obs = self.encode_observation()

        # Detach hidden state to avoid backpropagating through the entire history.
        hidden_detached = (self.hidden[0].detach(), self.hidden[1].detach())
        # ---- 2. Save the transition ----
        self.replay_buffer.push(self.last_obs, self.last_action, reward, next_obs, done, hidden_detached)

        # If the episode ended, reset the hidden state.
        if done:
            self.hidden = self.policy_net.init_hidden(batch_size=1)

        # ---- 3. Train the DRQN if enough transitions have been collected ----
        if len(self.replay_buffer) >= self.batch_size:
            transitions = self.replay_buffer.sample(self.batch_size)
            batch = Transition(*zip(*transitions))
            
            # Each stored observation is shape (1, 1, input_dim); concatenate them along the batch dimension.
            obs_batch = torch.cat(batch.obs, dim=0)       # shape: (batch, 1, input_dim)
            actions_batch = torch.tensor(batch.action, dtype=torch.long)
            rewards_batch = torch.tensor(batch.reward, dtype=torch.float32)
            next_obs_batch = torch.cat(batch.next_obs, dim=0)  # shape: (batch, 1, input_dim)
            dones_batch = torch.tensor(batch.done, dtype=torch.float32)
            
            # For training we reinitialize the hidden state for the whole batch.
            hidden_batch = self.policy_net.init_hidden(batch_size=self.batch_size)
            
            # Compute Q-values for the current observations.
            q_values, _ = self.policy_net(obs_batch, hidden_batch)
            # Select Q-values for the actions that were taken.
            state_action_values = q_values.gather(1, actions_batch.unsqueeze(1)).squeeze(1)
            
            # Compute Q-values for next observations using the target network.
            next_hidden_batch = self.target_net.init_hidden(batch_size=self.batch_size)
            next_q_values, _ = self.target_net(next_obs_batch, next_hidden_batch)
            max_next_q_values = next_q_values.max(1)[0]
            
            # Compute TD target: reward + gamma * max_next_q_value (if not done)
            expected_state_action_values = rewards_batch + (1 - dones_batch) * self.gamma * max_next_q_values
            
            # Compute loss (mean squared error).
            loss = nn.functional.mse_loss(state_action_values, expected_state_action_values.detach())
            
            # Optimize the policy network.
            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()
            
            # ---- 4. Update epsilon and (for simplicity) update the target network every step. ----
            self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)
            self.target_net.load_state_dict(self.policy_net.state_dict())

# -------------------------
# Agent Submission
# -------------------------
name = "DRQNAgent"
nrl_agent_submission = DRQNAgent(name)

# NUM_POSSIBLE_STATES = 144
# INITIAL_STATE = 0
# # Lemonade as 12 possible actions [0 - 11]
# NUM_POSSIBLE_ACTIONS = 12
# LEARNING_RATE = 0.05
# DISCOUNT_FACTOR = 0.90
# EXPLORATION_RATE = 0.05
# ################### SUBMISSION #####################
# Q_agent1 = MyRLAgent("Q1", NUM_POSSIBLE_STATES, NUM_POSSIBLE_ACTIONS,
#                                    INITIAL_STATE, LEARNING_RATE, DISCOUNT_FACTOR, EXPLORATION_RATE, False, "my-qtable.npy")
# Q_agent2 = MyRLAgent("Q2", NUM_POSSIBLE_STATES, NUM_POSSIBLE_ACTIONS,
#                                    INITIAL_STATE, LEARNING_RATE, DISCOUNT_FACTOR, EXPLORATION_RATE, False, "my-qtable.npy")
if __name__ == "__main__":
    arena = LemonadeArena(
        num_rounds=1000,
        timeout=1,
        players=[
            nrl_agent_submission,
            # ReserveAgent("reserve"),
            DecrementAgent("decrease"),
            StickAgent("stick"),
        ]
    )
    arena.run()


You should observe that your DRQN agent starts colluding with the stick agent by playing 0 or 1 most often!