# Notebook for testing a trained DQN agent on the glioblastoma grid environment

## Code:

In [1]:
import random

import gymnasium as gym
import numpy as np
import torch
from gymnasium import spaces

SEED = 42

# Seed the Python, NumPy and PyTorch (CPU) generators with the same value so that
# a fresh "Restart & Run All" draws the same random numbers.
for seed_generator in (random.seed, np.random.seed, torch.manual_seed):
    seed_generator(SEED)
del seed_generator  # loop helper only; keep the notebook namespace clean

# GPU-side seeding and cuDNN settings, applied only when CUDA is usable.
if torch.cuda.is_available():
    torch.cuda.manual_seed(SEED)        # current device
    torch.cuda.manual_seed_all(SEED)    # every visible device
    # Disable cuDNN autotuning and request deterministic kernels.
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True


In [None]:
import sys

# Folder that contains general.py (and presumably the training_* modules imported below).
# NOTE(review): this is an absolute, machine-specific path; adjust it when running elsewhere.
PROJECT_DIR = "/home/martina/codi2/4year/tfg"
# Guard the append so that re-running this cell does not keep adding duplicate entries.
if PROJECT_DIR not in sys.path:
    sys.path.append(PROJECT_DIR)

from general import prepare, testing
from training_dqn import DQNPositionalEncoding
from training_agents import DQNAgent
from training_buffers import ReplayBuffer

In [None]:
class GlioblastomaPositionalEncoding(gym.Env):
    """Grid-world environment over a 2D brain image and its tumor mask.

    The image is split into ``grid_size x grid_size`` square patches of side
    ``block_size = image.shape[0] // grid_size``. The agent occupies one patch.
    Action 0 ends the episode; actions 1/2 move down/right and, when the action
    space has 5 actions, 3/4 move up/left.

    Observations have shape ``(3, block_size, block_size)``: the image patch plus
    two constant channels holding the normalized row and column of the agent.

    ``rewards`` is indexed as
    ``[stop on tumor, stop off tumor, move into tumor patch, move into non-tumor patch]``.

    Notes:
        * Random numbers come from the global ``np.random`` generator, so the
          ``seed`` passed to :meth:`reset` does not influence the random shift or
          the start position; seed ``np.random`` instead.
        * Reaching ``max_steps`` is reported through ``terminated``; ``truncated``
          is always ``False``.
        * ``_get_reward`` treats mask labels 1 and 4 as tumor, while
          ``_random_shift``, ``reset(force_on_target=True)``, ``render`` and
          ``current_patch_overlap_with_lesion`` use every non-zero label.
        * ``tumor_threshold`` is stored but not used by the current reward logic.
    """

    metadata = {"render_modes": ["human"], "render_fps": 4}

    def __init__(self, image_path, mask_path, grid_size=4, tumor_threshold=0.01, rewards = [10.0, -2.0, 2.5, -0.1], action_space=spaces.Discrete(3), max_steps=20, render_mode="human"):
        """Load the image/mask pair and configure the grid.

        Args:
            image_path: Path to a ``.npy`` file with the 2D image.
            mask_path: Path to a ``.npy`` file with the mask (same shape expected).
            grid_size: Number of patches per image side.
            tumor_threshold: Stored on the instance; currently unused.
            rewards: Four reward values, see the class docstring.
            action_space: ``spaces.Discrete(3)`` or ``spaces.Discrete(5)``.
            max_steps: Step limit per episode; ``0`` means no limit.
            render_mode: Only ``"human"`` produces output in :meth:`render`.
        """
        super().__init__()

        self.image = np.load(image_path).astype(np.float32)
        self.mask = np.load(mask_path).astype(np.uint8)

        # Min-max normalize only when the image is not already in [0, 1].
        img_min, img_max = self.image.min(), self.image.max()
        if img_max > 1.0:
            self.image = (self.image - img_min) / (img_max - img_min + 1e-8)

        self.grid_size = grid_size
        self.block_size = self.image.shape[0] // grid_size

        self.action_space = action_space
        self.tumor_threshold = tumor_threshold
        # Copy so that the shared default list (or a caller's list) is never aliased.
        self.rewards = list(rewards)
        self.render_mode = render_mode

        # 3 channels: image patch + row encoding + column encoding.
        self.observation_space = spaces.Box(
            low=0, high=1,
            shape=(3, self.block_size, self.block_size),
            dtype=np.float32
        )

        self.agent_pos = [0, 0]
        self.prev_pos = None
        self.prev_prev_pos = None
        self.current_step = 0
        self.max_steps = sys.maxsize if max_steps == 0 else max_steps

    def _shift_source(self):
        """Return the un-shifted ``(image, mask)`` pair that random shifts start from.

        The pair is captured the first time it is needed, and again whenever
        ``self.image`` / ``self.mask`` were replaced from outside since the last shift.
        """
        replaced = (
            getattr(self, "_shifted_image", None) is not self.image
            or getattr(self, "_shifted_mask", None) is not self.mask
        )
        if replaced:
            self._base_image = self.image
            self._base_mask = self.mask
        return self._base_image, self._base_mask

    def _random_shift(self):
        """Translate image and mask by a random offset of up to 20 pixels per axis.

        The shift is always applied to the un-shifted source, so repeated resets do
        not accumulate translations or progressively crop the image. Pixels shifted
        in from outside are zero. A shift that removes every non-zero mask pixel is
        redrawn; if the source mask has no non-zero pixel at all, the first shift is
        accepted (otherwise the loop could never end).
        """
        pad = 20
        base_image, base_mask = self._shift_source()
        height, width = base_image.shape
        source_has_tumor = bool(np.any(base_mask > 0))

        while True:
            # random offset of the source inside the padded canvas
            y_off = np.random.randint(0, 2*pad+1)
            x_off = np.random.randint(0, 2*pad+1)

            canvas = np.zeros((height + 2*pad, width + 2*pad), dtype=base_image.dtype)
            # keep the mask's own dtype instead of inheriting the image's float dtype
            canvas_mask = np.zeros((height + 2*pad, width + 2*pad), dtype=base_mask.dtype)

            canvas[y_off:y_off+height, x_off:x_off+width] = base_image
            canvas_mask[y_off:y_off+height, x_off:x_off+width] = base_mask

            # crop the central window back to the original size
            new_image = canvas[pad:pad+height, pad:pad+width].copy()
            new_mask = canvas_mask[pad:pad+height, pad:pad+width].copy()

            if not source_has_tumor or np.any(new_mask > 0):
                self.image = self._shifted_image = new_image
                self.mask = self._shifted_mask = new_mask
                return

    def reset(self, seed=None, options=None, force_on_target=False, start_on_zero=False):
        """Start a new episode and return ``(observation, info)``.

        Args:
            seed: Forwarded to ``gym.Env.reset``; does not affect the ``np.random``
                draws made here.
            options: Unused.
            force_on_target: Start on the patch containing a randomly chosen
                non-zero mask pixel. Falls back to a random start when the mask
                has no such pixel.
            start_on_zero: Start at patch ``[0, 0]``; takes precedence over
                ``force_on_target``.
        """
        super().reset(seed=seed)

        self._random_shift()  # apply a fresh random shift on every reset

        if start_on_zero:
            self.agent_pos = [0, 0]
        else:
            tumor_rows, tumor_cols = np.where(self.mask > 0) if force_on_target else ((), ())
            if len(tumor_rows) > 0:
                # pick a random tumor pixel and start on the patch that contains it
                idx = np.random.randint(len(tumor_rows))
                last = self.grid_size - 1
                # clamp: pixels beyond grid_size * block_size would map outside the grid
                self.agent_pos = [
                    min(int(tumor_rows[idx]) // self.block_size, last),
                    min(int(tumor_cols[idx]) // self.block_size, last),
                ]
            else:
                # standard random start
                self.agent_pos = [np.random.randint(self.grid_size), np.random.randint(self.grid_size)]

        self.current_step = 0
        self.prev_pos = None
        self.prev_prev_pos = None
        obs = self._get_obs()
        info = {}
        return obs, info

    def step(self, action):
        """Apply ``action`` and return ``(obs, reward, terminated, truncated, info)``.

        Action 0 always terminates the episode. A move that would leave the grid
        (or an action the current action space does not define) leaves the agent
        in place. ``truncated`` is always ``False``.
        """
        self.current_step += 1
        prev_pos = self.agent_pos.copy()    # position BEFORE applying the action

        if action == 0:  # END episode, regardless of whether the stop was correct
            reward = self._get_reward(action, prev_pos)
            return self._get_obs(), reward, True, False, {}

        # Apply the move while respecting the grid boundaries.
        n_actions = self.action_space.n
        last = self.grid_size - 1
        if n_actions in (3, 5):
            if action == 1 and self.agent_pos[0] < last:      # down
                self.agent_pos[0] += 1
            elif action == 2 and self.agent_pos[1] < last:    # right
                self.agent_pos[1] += 1
            elif n_actions == 5 and action == 3 and self.agent_pos[0] > 0:   # up
                self.agent_pos[0] -= 1
            elif n_actions == 5 and action == 4 and self.agent_pos[1] > 0:   # left
                self.agent_pos[1] -= 1

        reward = self._get_reward(action, prev_pos)

        terminated = self.current_step >= self.max_steps
        obs = self._get_obs()
        info = {}

        # Track previous positions for oscillation detection in the next step.
        self.prev_prev_pos = self.prev_pos.copy() if self.prev_pos is not None else None
        self.prev_pos = prev_pos.copy()

        return obs, reward, terminated, False, info

    def _get_obs(self):
        """Return the ``(3, block_size, block_size)`` float32 observation.

        - Channel 0: image patch under the agent
        - Channel 1: normalized row position (0 to 1)
        - Channel 2: normalized column position (0 to 1)
        """
        r0 = self.agent_pos[0] * self.block_size
        c0 = self.agent_pos[1] * self.block_size

        patch = self.image[r0:r0+self.block_size, c0:c0+self.block_size].astype(np.float32)

        # A 1x1 grid has a single position; avoid dividing by zero and encode it as 0.
        denom = max(self.grid_size - 1, 1)
        pos_row = np.full_like(patch, self.agent_pos[0] / denom)
        pos_col = np.full_like(patch, self.agent_pos[1] / denom)

        return np.stack([patch, pos_row, pos_col], axis=0)

    def _get_reward(self, action, prev_pos):
        """Compute the reward for ``action`` given the position before it was applied.

        Order of checks:
            1. Oscillation (A -> B -> A): the agent moved and landed on the position
               it occupied before the previous step -> ``-1.0``.
            2. Blocked move: a non-stop action that did not change the position -> ``-0.3``.
            3. Otherwise the reward depends on whether the current patch contains
               any pixel labeled 1 or 4 in the mask (see the class docstring).
        """
        moved = prev_pos != self.agent_pos

        # Only a real move can be an oscillation. Without the `moved` guard a STOP
        # (or a second blocked move) right after a blocked move was penalized as an
        # oscillation, hiding the stop reward / out-of-bounds penalty.
        if moved and self.prev_pos is not None and self.agent_pos == self.prev_pos:
            return -1.0

        if action != 0 and not moved:
            return -0.3  # penalty for trying to move out of bounds

        # Mask region under the agent's current patch.
        r0 = self.agent_pos[0] * self.block_size
        c0 = self.agent_pos[1] * self.block_size
        patch_mask = self.mask[r0:r0+self.block_size, c0:c0+self.block_size]

        # Tumor is labeled 1 or 4 in the mask; label 2 is edema and does not count.
        inside = np.sum(np.isin(patch_mask, [1, 4])) > 0

        if action == 0:
            return self.rewards[0] if inside else self.rewards[1]
        # movement
        return self.rewards[2] if inside else self.rewards[3]

    def render(self, show=True):
        """Visualize the image, the mask overlay (red), the grid and the agent patch.

        Does nothing unless ``render_mode == "human"``.

        Args:
            show: If True, display a matplotlib figure and return ``None``.
                If False, return the frame as a ``uint8`` RGB array with a
                step counter drawn in the top-left corner.
        """
        if self.render_mode != "human":
            return  # only render in human mode

        # Grayscale -> RGB so the mask and the agent can be drawn in color.
        vis_img = np.stack([self.image] * 3, axis=-1).astype(np.float32)

        # Tumor mask goes into the red channel; cast to float for blending.
        tumor_overlay = np.zeros_like(vis_img)
        tumor_overlay[..., 0] = (self.mask > 0).astype(float)

        # Transparency of the overlay (I think this is the same value used in the other notebook).
        alpha = 0.4
        vis_img = (1 - alpha) * vis_img + alpha * tumor_overlay

        r0 = self.agent_pos[0] * self.block_size
        c0 = self.agent_pos[1] * self.block_size

        if show:
            # Imported lazily (like PIL below) so the environment works without a display stack.
            import matplotlib.pyplot as plt
            import matplotlib.patches as patches

            fig, ax = plt.subplots(figsize=(3, 3))
            ax.imshow(vis_img, origin='upper')

            # Semi-transparent grid lines.
            for i in range(1, self.grid_size):
                ax.axhline(i * self.block_size, color='white', lw=1, alpha=0.5)
                ax.axvline(i * self.block_size, color='white', lw=1, alpha=0.5)

            # Agent position.
            rect = patches.Rectangle(
                (c0, r0),  # (x, y) anchor; the top-left corner on screen with origin='upper'
                self.block_size,  # width
                self.block_size,  # height
                linewidth=2,
                edgecolor='yellow',
                facecolor='none'
            )
            ax.add_patch(rect)

            ax.set_title(f"Agent at {self.agent_pos} | Step {self.current_step}/{self.max_steps}")
            ax.axis('off')
            plt.show()
            return None

        # No display: draw everything directly on the array and return it.
        # Clip first so out-of-range values cannot wrap around in uint8.
        rgb_array = (np.clip(vis_img, 0.0, 1.0) * 255).astype(np.uint8)

        for i in range(1, self.grid_size):
            line = i * self.block_size
            rgb_array[line-1:line+1, :] = [255, 255, 255]  # horizontal white line
            rgb_array[:, line-1:line+1] = [255, 255, 255]  # vertical white line

        # Yellow rectangle borders around the agent's patch.
        r1 = r0 + self.block_size
        c1 = c0 + self.block_size
        rgb_array[r0:r0+2, c0:c1] = [255, 255, 0]   # top border
        rgb_array[r1-2:r1, c0:c1] = [255, 255, 0]   # bottom border
        rgb_array[r0:r1, c0:c0+2] = [255, 255, 0]   # left border
        rgb_array[r0:r1, c1-2:c1] = [255, 255, 0]   # right border

        # Add the step counter text to the image.
        from PIL import Image, ImageDraw, ImageFont
        pil_img = Image.fromarray(rgb_array)
        draw = ImageDraw.Draw(pil_img)

        try:
            font = ImageFont.truetype("arial.ttf", 16)
        except OSError:
            # Font file not available on this machine: fall back to PIL's built-in font.
            font = ImageFont.load_default()

        step_text = f"Step: {self.current_step}/{self.max_steps}"
        draw.text((5, 5), step_text, fill=(255, 255, 0), font=font)  # yellow text

        return np.array(pil_img)

    def current_patch_overlap_with_lesion(self, pos=None):
        """Count non-zero mask pixels under a patch.

        Args:
            pos: ``(row, col)`` patch coordinates; defaults to the agent's position.

        Returns:
            Number of mask pixels ``> 0`` inside the patch. A value above zero means
            the patch overlaps the lesion.
        """
        if pos is None:
            row, col = self.agent_pos
        else:
            row, col = pos
        # block_size is the patch side in pixels; grid_size is the number of patches per side.
        patch_h = self.block_size
        patch_w = self.block_size

        y0 = row * patch_h
        y1 = y0 + patch_h
        x0 = col * patch_w
        x1 = x0 + patch_w
        # mask region under the patch
        patch_mask = self.mask[y0:y1, x0:x1]
        overlap = np.sum(patch_mask > 0)
        return overlap



In [3]:
# Collect the test split. Each entry is presumably an (image_path, mask_path) pair,
# since an entry is later unpacked into the environment constructor.
test_pairs = prepare(mode="test")

Preparing testing set.
✅ Found 100 pairs out of 100 listed in CSV.


# TESTING:

In [4]:
# Load the trained model to test.
LR = 5e-5  # passed to the network constructor; presumably irrelevant for evaluation

CURRENT_CONFIG = {
    'grid_size': 6,
    # [stop on tumor, stop off tumor, move into tumor patch, move into non-tumor patch]
    'rewards': [10.0, -10.0, 2.5, -0.1],
    'action_space': spaces.Discrete(5),
    'max_steps': 20
}

model_name = 'Extension093'
env = GlioblastomaPositionalEncoding(*test_pairs[0], **CURRENT_CONFIG)

model = DQNPositionalEncoding(env, learning_rate=LR, device='cpu')
# map_location keeps loading working on CPU-only machines even if the checkpoint
# was saved from a GPU. NOTE: torch.load unpickles the file, so only load
# checkpoints from a trusted source.
# NOTE(review): if the network contains dropout/batch-norm layers, consider
# calling model.eval() before testing.
model.load_state_dict(torch.load(f"{model_name}.dat", map_location='cpu'))

agent = DQNAgent(env_config=CURRENT_CONFIG, dnnetwork=model, buffer_class=ReplayBuffer, train_pairs=test_pairs,
                 env_class=GlioblastomaPositionalEncoding,
                 epsilon=0.00)  # epsilon 0: no exploration while testing

In [5]:
# Run one test episode per test pair and store GIFs in a folder named after the model.
overall_results = testing(
    agent,
    test_pairs,
    agent_type="dqn",
    num_episodes=len(test_pairs),
    env_config=CURRENT_CONFIG,
    save_gifs=True,
    gif_folder=f"GIFs_Testing_{model_name}",
)

Saved GIF for episode 0 at GIFs_Testing_Extension093/episode_0_002_58.gif
Saved GIF for episode 10 at GIFs_Testing_Extension093/episode_10_013_86.gif
Saved GIF for episode 20 at GIFs_Testing_Extension093/episode_20_024_49.gif
Saved GIF for episode 30 at GIFs_Testing_Extension093/episode_30_038_84.gif
Saved GIF for episode 40 at GIFs_Testing_Extension093/episode_40_052_98.gif
Saved GIF for episode 50 at GIFs_Testing_Extension093/episode_50_104_74.gif
Saved GIF for episode 60 at GIFs_Testing_Extension093/episode_60_176_99.gif
Saved GIF for episode 70 at GIFs_Testing_Extension093/episode_70_204_52.gif
Saved GIF for episode 80 at GIFs_Testing_Extension093/episode_80_260_62.gif
Saved GIF for episode 90 at GIFs_Testing_Extension093/episode_90_300_107.gif

TEST RESULTS (DQN Agent)
✅Hard Success (correct STAY): 11.00%
   ❌Hard Failure (wrong STAY): 89.00%
✔️Timeout Success (lucky): 0.00%
   ❌Timeout Failure: 0.00%
Average Episode Reward: -8.78
Average Steps to Find Tumor: 17.91
Average Tumor R