In [1]:


import numpy as np
import random
import matplotlib.pyplot as plt
from collections import deque
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from tqdm import trange

# global vars
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED) 

# Environment class
class GridWorldEnv:
    def __init__(self, relative_ignorability=True, grid_size=2):
        self.relative_ignorability = relative_ignorability
        self.actions= ['right', 'down', 'left', 'up']
        self.action_to_delta = {'right': (0, 1), 'down': (1, 0), 'left': (0, -1), 'up': (-1, 0)}
        self.action_space = len(self.actions)
        self.grid_size=grid_size
        self.obs_space = 2  # x, y position

    def reset(self):
        self.pos = (0, 0)
        self.mode = np.random.choice([0, 1])  # Relatively ignorable hidden variable
        
        return np.array(self.pos)

    def step(self, action_idx):
        action = self.actions[action_idx]
        dx, dy = self.action_to_delta[action]

        

        new_x = np.clip(self.pos[0] + dx, 0, self.grid_size - 1)
        new_y = np.clip(self.pos[1] + dy, 0, self.grid_size - 1)
        self.pos = (new_x, new_y)

        reward = -0.1
        done = False
        if self.pos == (0, grid_size):  # Goal
            reward = (10 if self.mode == 0 else 8) if self.relative_ignorability else (-10 if self.mode == 0 else 10)
            #reward = np.random.normal(loc=reward_mean, scale=1, size=1)
            done = True
        elif self.pos == (grid_size, grid_size):  # Trap
            reward = -10 if self.relative_ignorability else (10 if self.mode == 0 else -10)
            #reward = np.random.normal(loc=reward_mean, scale=1, size=1)
            done = True

        return np.array(self.pos), reward, done

# Simple DQN
class DQN(nn.Module):
    def __init__(self, input_size, output_size):
        super(DQN, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(input_size, 64),
            nn.ReLU(),
            nn.Linear(64,output_size)
        )

    def forward(self, x):
        return self.fc(x)

# Training loop
def train_dqn(env, episodes=2000, seed=42, gamma=0.95,batch_size=64, epsilon_min = 0.2, epsilon_decay=0.99995, lr=0.0001):
    model = DQN(input_size=env.obs_space, output_size=env.action_space)
    target_model = DQN(input_size=env.obs_space, output_size=env.action_space)
    target_model.load_state_dict(model.state_dict())
    optimizer = optim.Adam(model.parameters(), lr=lr)
    memory = deque(maxlen=10000)
    #gamma = gamma
    #batch_size = 64
    epsilon = 1.0
    #epsilon_min = 0.2
    #epsilon_decay = 0.99995

    rewards = []

    for ep in range(episodes):
        state = torch.tensor(env.reset(), dtype=torch.float32)
        total_reward = 0
        done = False

        while not done:
            if random.random() < epsilon:
                action = np.random.randint(env.action_space)
            else:
                with torch.no_grad():
                    q_vals = model(state.unsqueeze(0))
                action = torch.argmax(q_vals).item()

            next_state_np, reward, done = env.step(action)
            next_state = torch.tensor(next_state_np, dtype=torch.float32)
            memory.append((state, action, reward, next_state, done))
            state = next_state
            total_reward += reward

            if len(memory) >= batch_size:
                batch = random.sample(memory, batch_size)
                states, actions, rewards_batch, next_states, dones = zip(*batch)

                states = torch.stack(states)
                actions = torch.tensor(actions)
                rewards_batch = torch.tensor(rewards_batch)
                next_states = torch.stack(next_states)
                dones = torch.tensor(dones, dtype=torch.float32)

                q_vals = model(states).gather(1, actions.unsqueeze(1)).squeeze()
                with torch.no_grad():
                    max_next_q = target_model(next_states).max(dim=1)[0]
                targets = rewards_batch + gamma * max_next_q * (1 - dones)

                loss = F.mse_loss(q_vals, targets)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

        if ep % 10 == 0:
            target_model.load_state_dict(model.state_dict())

        epsilon = max(epsilon_min, epsilon * epsilon_decay)
        rewards.append(total_reward)

    return rewards




In [2]:


# POMDP GridWorld
class GridWorldPOMDP:
    def __init__(self, relative_ignorability=True, grid_size=2):
        self.relative_ignorability = relative_ignorability
        self.grid_size=grid_size
        self.actions= ['right', 'down', 'left', 'up']
        self.action_to_delta = {'right': (0, 1), 'down': (1, 0), 'left': (0, -1), 'up': (-1, 0)}
        self.action_space = len(self.actions)
        self.grid_size=grid_size
        self.obs_space = 3 #x, y, belief (latent "mode")

    def reset(self):
        self.pos = (0, 0)
        self.mode = np.random.choice([0, 1])  # hidden variable
        self.observed_variable  = np.random.normal(loc=self.mode)
        self.belief_mode = 0.5  # initial belief over hidden variable
        return np.array([*self.pos, self.belief_mode], dtype=np.float32)

    def step(self, action_idx):
        action = self.actions[action_idx]
        dx, dy = self.actions_to_delta[action]
        new_x = np.clip(self.pos[0] + dx, 0, self.grid_size - 1)
        new_y = np.clip(self.pos[1] + dy, 0, self.grid_size - 1)
        self.pos = (new_x, new_y)
        
        reward = -0.1
        done = False
        if self.pos == (0, self.grid_size):  # Goal
            reward = (10 if self.mode == 0 else 9) if self.relative_ignorability else (-10 if self.mode == 0 else 10)
            done = True
        elif self.pos == (self.grid_size, self.grid_size):  # Trap
            reward = -10 if self.relative_ignorability else (10 if self.mode == 0 else -10)
            done = True

        # observe new evidence
        self.observed_variable = np.random.normal(loc=self.mode, scale=0.15)
    
        # update belief based on new observation
        self.update_belief(obs=self.observed_variable)
    
        return np.array([*self.pos, self.belief_mode], dtype=np.float32), reward, done

    def update_belief(self, obs):
        p1 = np.exp(-(obs - 1)**2 / 2)
        p0 = np.exp(-(obs - 0)**2 / 2)
        prior = self.belief_mode
        self.belief_mode = (p1 * prior) / (p1 * prior + p0 * (1 - prior) + 1e-8)
        

# Neural Net with belief input
class BeliefDQN(nn.Module):
    def __init__(self, input_size, output_size):
        super().__init__()
        self.fc = nn.Sequential(
            nn.Linear(input_size, 64), 
            nn.ReLU(),
            nn.Linear(64, output_size)
        )

    def forward(self, x):
        return self.fc(x)

# Training function
def train_pomdp_agent(env, episodes=2000, batch_size=64,seed=42, gamma=0.95, epsilon_decay=0.99995, epsilon_min=0.2, lr=0.0001):
    
    model = BeliefDQN(input_size=env.obs_space, output_size=env.action_space)
    target_model = BeliefDQN()
    target_model.load_state_dict(model.state_dict())
    optimizer = optim.Adam(model.parameters(), lr=lr)

    memory = deque(maxlen=10000)

    epsilon = 1.0
    
    rewards = []

    for ep in trange(episodes):
        state = torch.tensor(env.reset(), dtype=torch.float32)
        total_reward = 0
        done = False

        while not done:
            if random.random() < epsilon:
                action = np.random.randint(env.action_space)
            else:
                with torch.no_grad():
                    q_vals = model(state.unsqueeze(0))
                action = torch.argmax(q_vals).item()

            next_obs, reward, done = env.step(action)
            
            next_obs[2] = env.belief_mode  # update belief component
            next_state = torch.tensor(next_obs, dtype=torch.float32)

            memory.append((state, action, reward, next_state, done))
            state = next_state
            total_reward += reward

            if len(memory) >= batch_size:
                batch = random.sample(memory, batch_size)
                states, actions, rewards_batch, next_states, dones = zip(*batch)

                states = torch.stack(states)
                actions = torch.tensor(actions)
                rewards_batch = torch.tensor(rewards_batch)
                next_states = torch.stack(next_states)
                dones = torch.tensor(dones, dtype=torch.float32)

                q_vals = model(states).gather(1, actions.unsqueeze(1)).squeeze()
                with torch.no_grad():
                    max_next_q = target_model(next_states).max(1)[0]
                targets = rewards_batch + gamma * max_next_q * (1 - dones)

                loss = F.mse_loss(q_vals, targets)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

        if ep % 10 == 0:
            target_model.load_state_dict(model.state_dict())

        epsilon = max(epsilon_min, epsilon * epsilon_decay)
        rewards.append(total_reward)

    return rewards





In [None]:
M = 3
episodes = 1000
seed_range = range(M*350)
seeds = [random.randint(0,7392479) for _ in range(M)]
m=0
gamma = 0.9
batch_size = 32
epsilon = 1.0
epsilon_min = 0.05
epsilon_decay = 0.995
lr=0.001
gamma=0.9
grid_size = 5

resultsarray = np.empty((M, episodes, 2, 2))
for m in range(m,M):
    print(m)
    rewards_ri = train_dqn(GridWorldEnv(relative_ignorability=True), episodes=episodes, seed=seeds[m], lr=lr, epsilon_min=epsilon_min,gamma=gamma, epsilon_decay=epsilon_decay)
    
    rewards_non_ri = train_dqn(GridWorldEnv(relative_ignorability=False), episodes=episodes, seed=seeds[m], lr=lr, epsilon_min=epsilon_min,gamma=gamma, epsilon_decay=epsilon_decay)
    
    
    rewards_pomdp_ri = train_pomdp_agent(GridWorldPOMDP(relative_ignorability=True), episodes= episodes, seed=seeds[m], lr=lr, epsilon_min=epsilon_min,gamma=gamma, epsilon_decay=epsilon_decay)
    rewards_pomdp_nonri = train_pomdp_agent(GridWorldPOMDP(relative_ignorability=False), episodes=episodes, seed=seeds[m], lr=lr, epsilon_min=epsilon_min,gamma=gamma, epsilon_decay=epsilon_decay)

    resultsarray[m,:,0,0] = rewards_ri
    resultsarray[m,:,0,1] = rewards_non_ri
    resultsarray[m,:,1,0] = rewards_pomdp_ri
    resultsarray[m,:,1,1] = rewards_pomdp_nonri

0


In [None]:
import pandas as pd
resultsavg = np.nanmean(resultsarray, axis=0)
# Compute rolling average
window = 50
smoothed = np.empty_like(resultsavg)
for i in range(resultsavg.shape[1]):  # RI vs Non-RI
    for j in range(resultsavg.shape[2]):  # Vanilla vs POMDP
        smoothed[:, i, j] = pd.Series(resultsavg[:, i, j]).rolling(window=window, min_periods=1).mean()

# Plot
plt.figure(figsize=(10, 6))
plt.plot(smoothed[10:, 0, 0], label="RI Mode + Vanilla Q-learning", color="green")
plt.plot(smoothed[10:, 0, 1], label="Non-RI Mode + Vanilla Q-learning", color="green", linestyle="--")
plt.plot(smoothed[10:, 1, 0], label="RI Mode + POMDP Q-learning", color = "black")
plt.plot(smoothed[10:, 1, 1], label="Non-RI Mode + POMDP Q-learning", color="black",linestyle="--")
plt.xlabel("Episode")
plt.ylabel("Smoothed Total Reward")
plt.title("Smoothed Deep Q-Learning Performance\nUnder Relative Ignorability vs. Non-Ignorability")
plt.legend()
plt.grid(True)
plt.tight_layout()
#plt.show()
plt.savefig("reward_curves.png")

In [None]:
np.save('resultsarray.npy', resultsarray)

#resultsarray = np.load("/kaggle/input/relative-ignorability-simulation-results/resultsarray.npy")
#resultsavg[1000:1005,0,0]