In [1]:
import numpy as np
from PIL import Image
import os

import random
import gymnasium as gym
from gymnasium import spaces

from stable_baselines3 import PPO

In [2]:
class ImagePairEnvironment(gym.Env):
    def __init__(self):
        super(ImagePairEnvironment, self).__init__()
        
        # Define the base directory for images
        self.image_dir = "./ResizedSymbols"

         # Define image pairs with paths and points, using placeholders for paths
        self.learning_pairs = {
            'A+B': {'A': {'points': 10, 'prob': 0.75}, 
                    'B': {'points': 10, 'prob': 0.25}},
            'C+D': {'C': {'points': 1, 'prob': 0.75}, 
                    'D': {'points': 1, 'prob': 0.25}},
            'E+F': {'E': {'points': 10, 'prob': 0.75}, 
                    'F': {'points': 10, 'prob': 0.25}},
            'G+H': {'G': {'points': 1, 'prob': 0.75}, 
                    'H': {'points': 1, 'prob': 0.25}}
        }

        self.transfer_pairs = {
            'A+C': {'A': {'points': 10, 'prob': 0.75}, 
                    'C': {'points': 1, 'prob': 0.75}},
            'B+D': {'B': {'points': 10, 'prob': 0.25}, 
                    'D': {'points': 1, 'prob': 0.25}},
            'E+H': {'E': {'points': 10, 'prob': 0.75}, 
                    'H': {'points': 1, 'prob': 0.25}},
            'F+G': {'F': {'points': 10, 'prob': 0.25}, 
                    'G': {'points': 1, 'prob': 0.75}}
        }

        self.phase = 'learning'
        self.episode_count = 0
        self.current_pair = None
        
        # Define the observation and action space
        self.observation_space = spaces.Box(low=0, high=1, shape=(2, 400), dtype=np.float64)
        self.action_space = spaces.Discrete(2)  # 0: choose left image, 1: choose right image

    def get_image_path(self, symbol, has_indicator=False):
        """Return the path to the image file based on the symbol and indicator presence."""
        filename = f"Symbol{symbol}{'_R' if has_indicator else ''}.png"
        return os.path.join(self.image_dir, filename)

    def preprocess_image(self, path):
        """Load and preprocess the image from the given path."""
        img = Image.open(path).convert('L')  # Convert to grayscale
        img_data = np.array(img).flatten() / 255.0  # Normalize pixel values
        return img_data

    def reset(self, seed=None):
        self.episode_count = 0
        if self.phase == 'learning':
            self.current_pair = random.choice(list(self.learning_pairs.keys())) 
        elif self.phase == 'transfer':
            self.current_pair = random.choice(list(self.transfer_pairs.keys()))
        return self.get_image_observation(self.current_pair)
    
    def get_image_observation(self, pair_key):
        # Swap the order 50% of the time
        if random.random() < 0.5:
            left_image, right_image = pair_key.split('+')
        else:
            right_image, left_image = pair_key.split('+')
        
        # Load and preprocess images from their respective paths
        pair_data = self.learning_pairs if self.phase == 'learning' else self.transfer_pairs
        
        withSymbolL = np.random.rand() < pair_data[pair_key][left_image]['prob']
        left_img_data = self.preprocess_image(self.get_image_path(left_image, withSymbolL))
        
        withSymbolR = np.random.rand() < pair_data[pair_key][right_image]['prob']
        right_img_data = self.preprocess_image(self.get_image_path(right_image, withSymbolR))
        
        return np.array([left_img_data, right_img_data]), {'left_image': left_image, 'leftHasSymbol': withSymbolL, 'right_image': right_image, 'rightHasSymbol': withSymbolR,}
    
    def step(self, action):
        # Identify which images are on the left and right
        observation, images = self.get_image_observation(self.current_pair)
        left_image = images['left_image']
        right_image = images['right_image']

        # Determine the chosen image
        chosen_image = left_image if action == 0 else right_image
        if action == 0:
            chosen_image = left_image
            hasSymbol = images['leftHasSymbol']
        else:
            chosen_image = right_image
            hasSymbol = images['rightHasSymbol']

        pair_data = self.learning_pairs[self.current_pair] if self.phase == 'learning' else self.transfer_pairs[self.current_pair]
        
        # Apply the 95% predictor rule
        points = pair_data[chosen_image]['points']

        # Hand a reward 95% of the times with a symbol and 5% of the times without a symbol
        reward = points if (hasSymbol and np.random.rand() < 0.95) else (points if (not hasSymbol and np.random.rand() < 0.05) else 0)
     
        # Prepare next state
        done = False
        self.episode_count += 1
        if self.phase == 'learning':
            self.current_pair = random.choice(list(self.learning_pairs.keys())) 
        elif self.phase == 'transfer':
            self.current_pair = random.choice(list(self.transfer_pairs.keys()))
        
        #if self.episode_count >= 10000:
            #done = True
        truncated = False
        info = {'hasSymbol': hasSymbol}
        
        #print(self.episode_count)
        return observation, reward, done, truncated, info


In [None]:
# Initialize the environment and PPO agent
env = ImagePairEnvironment()
model = PPO("MlpPolicy", env, verbose=1, gamma=1)

# Train the agent
learning_timesteps = 10000  # Adjust if needed
model.learn(total_timesteps=learning_timesteps)

Using cuda device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
-----------------------------
| time/              |      |
|    fps             | 282  |
|    iterations      | 1    |
|    time_elapsed    | 7    |
|    total_timesteps | 2048 |
-----------------------------
------------------------------------------
| time/                   |              |
|    fps                  | 231          |
|    iterations           | 2            |
|    time_elapsed         | 17           |
|    total_timesteps      | 4096         |
| train/                  |              |
|    approx_kl            | 0.0014977283 |
|    clip_fraction        | 0            |
|    clip_range           | 0.2          |
|    entropy_loss         | -0.693       |
|    explained_variance   | -0.0013      |
|    learning_rate        | 0.0003       |
|    loss                 | 905          |
|    n_updates            | 10           |
|    policy_gradient_loss | -0.000862    |
|    va

<stable_baselines3.ppo.ppo.PPO at 0x1e05aca0fe0>

In [55]:
# Set the environment to the transfer phase
env.phase = 'transfer'
vec_env = model.get_env()
# Test the model
num_episodes = 2  # Number of episodes to test
runs = 10000

for episode in range(num_episodes):
    obs = vec_env.reset()
    #obs = env.reset()
    done = False
    total_reward = 0
    while not done:
        action, _states = model.predict(obs, deterministic=True)
        obs, reward, done, _, _ = env.step(action)
        total_reward += reward
        if env.episode_count == runs:
            done = True
    print(f"Episode {episode + 1}: Total Reward = {total_reward}")

Episode 1: Total Reward = 27801
Episode 2: Total Reward = 27123


In [4]:
env.phase = 'transfer'  # Switch to transfer phase for testing
vec_env = model.get_env()
obs = vec_env.reset()
done = False
rewards = []
hasSymbol = np.empty(0)
runs = 10000

while not done:
    action, _states = model.predict(obs, deterministic=True)
    obs, reward, done, truncated, info = env.step(action)
    rewards.append(reward)
    hasSymbol = np.append(hasSymbol, info['hasSymbol'])
    if env.episode_count == runs:
        done = True

print(f"Total reward in transfer phase: {sum(rewards)}")
# Calculate the percentage of True values
true_count = np.sum(hasSymbol)
total_count = len(hasSymbol)
percentage_true = (true_count / total_count) * 100

print(f"Percentage of True values: {percentage_true}%")


Total reward in transfer phase: 27454
Percentage of True values: 50.51%
