In [None]:
#  Mount Google Drive
from google.colab import drive
drive.mount('/content/drive')



Mounted at /content/drive


In [1]:
!pip install gymnasium

Collecting gymnasium
  Downloading gymnasium-1.0.0-py3-none-any.whl.metadata (9.5 kB)
Collecting farama-notifications>=0.0.1 (from gymnasium)
  Downloading Farama_Notifications-0.0.4-py3-none-any.whl.metadata (558 bytes)
Downloading gymnasium-1.0.0-py3-none-any.whl (958 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m958.1/958.1 kB[0m [31m15.0 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading Farama_Notifications-0.0.4-py3-none-any.whl (2.5 kB)
Installing collected packages: farama-notifications, gymnasium
Successfully installed farama-notifications-0.0.4 gymnasium-1.0.0


In [10]:

import gymnasium as gym
import numpy as np
from itertools import count
from collections import deque
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from torch.distributions import Categorical



device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
batch_size = 32
learning_rate = 1e-2

input_size = 8

hidden_size = 128
num_classes= 2
num_layers = 1



gamma=0.99


log_interval=100

scores, running_scores= [],  []


class Policy(nn.Module):
    def __init__(self):
        super(Policy, self).__init__()
        self.affine1 = nn.Linear(input_size, hidden_size)
        self.dropout = nn.Dropout(p=0.2)
        self.affine2 = nn.Linear(hidden_size, num_classes)

        self.saved_log_probs = []
        self.rewards = []

    def forward(self, x):
        x = self.affine1(x)
        x = self.dropout(x)
        x = F.relu(x)
        action_scores = self.affine2(x)
        return F.softmax(action_scores, dim=1)





env = gym.make("two-step-task-novel-v0", render_mode='human')


policy = Policy()
#comment optimizer if continuing training for a saved checkpoint
optimizer = optim.Adam(policy.parameters(), lr=learning_rate)



In [None]:
# Load the checkpoint from Google Drive
path = "/content/drive/MyDrive/Models/folder_name/"

file_name_trained="best_checkpoint_atThreshold_2562.pth"
checkpoint_tarined = torch.load(f"{path}{file_name_trained}", weights_only=False)

# Load the model and optimizer state
optimizer = optim.Adam(policy.parameters())  # Recreate the optimizer with the same settings as before
policy.load_state_dict(checkpoint_tarined['model_state_dict'])
optimizer.load_state_dict(checkpoint_tarined['optimizer_state_dict'])
policy.train()



Policy(
  (affine1): Linear(in_features=8, out_features=128, bias=True)
  (dropout): Dropout(p=0.2, inplace=False)
  (affine2): Linear(in_features=128, out_features=2, bias=True)
)

In [11]:


# Epsilon value to avoid division by zero during normalization
eps = np.finfo(np.float32).eps.item()

# Function to select an action based on the current state
def select_action(state):
    # Convert the state to a PyTorch tensor, add a batch dimension, and move to the device (CPU or GPU)
    state = torch.from_numpy(state).float().unsqueeze(0).to(device)

    # Get the action probabilities from the policy model
    probs = policy(state)

    # Create a categorical distribution using the action probabilities
    action_dist = Categorical(probs)

    # Sample an action from the distribution
    action = action_dist.sample()

    # Store the log probability of the taken action for later use in the policy loss computation
    policy.saved_log_probs.append(action_dist.log_prob(action))

    # Return the action as a Python integer (scalar)
    return action.item()

# Function to finish an episode, calculate the policy loss, and perform backpropagation
def finish_episode():
    # Initialize the return value (R) and lists to store the policy loss and the computed returns
    R = 0
    policy_loss = []
    returns = deque()

    # Loop through the rewards in reverse order to calculate the discounted returns (R)
    for r in policy.rewards[::-1]:
        R = r + gamma * R  # Apply the discount factor (gamma)
        returns.appendleft(R)  # Store the return value at the beginning of the deque

    # Convert returns to a tensor
    returns = torch.tensor(returns)

    # Normalize the returns (zero mean, unit variance) to stabilize training
    returns = (returns - returns.mean()) / (returns.std() + eps)

    # Compute the policy loss using the log probability of each action and the computed return
    for log_prob, R in zip(policy.saved_log_probs, returns):
        policy_loss.append(-log_prob * R)  # Negative log probability is used for gradient ascent

    # Zero out gradients before backpropagation
    optimizer.zero_grad()

    # Concatenate the policy losses and sum them up
    policy_loss = torch.cat(policy_loss).sum()

    # Perform backpropagation on the policy loss
    policy_loss.backward()

    # Update the model parameters using the optimizer
    optimizer.step()

    # Clear the stored rewards and log probabilities to start fresh for the next episode
    del policy.rewards[:]
    del policy.saved_log_probs[:]

    return

# Main function to run the reinforcement learning training loop
def main():
    # Define the folder path where models will be saved
    folder_path = '/content/drive/MyDrive/Models/folder_name/'

    # Set the threshold for the running reward to consider the task solved
    threshold = 700
    running_reward = 10  # Initialize the running reward

    # Loop over episodes
    for i_episode in range(1001):
        # Reset the environment and get the initial state
        state = env.reset()[0]
        state = np.array(state)
        ep_reward = 0  # Initialize the episode reward

        # Loop over time steps within an episode
        for t in range(1, 10000):  # Limit the maximum number of time steps to avoid infinite loops

            # Select an action based on the current state
            action = select_action(state)

            # Take the action and get the next state, reward, and whether the episode is done
            state, reward, done, _, info = env.step(action)

            # Add the reward for the current time step to the episode reward
            ep_reward += reward

            # Store the reward for this time step for later use in the policy loss calculation
            policy.rewards.append(reward)


            # If the episode is done, break out of the loop
            if done:
                break

        # Append the episode's total reward to the scores list
        scores.append(ep_reward)
        if i_episode ==1:
              return
        # Update the running average of the episode rewards using an exponential moving average
        running_reward = 0.05 * ep_reward + (1 - 0.05) * running_reward

        # Call finish_episode to calculate the policy loss and update the model
        finish_episode()

        # Log the episode results every log_interval episodes
        if i_episode % log_interval == 0:
            print(f'Episode {i_episode} Last reward: {ep_reward} Average reward: {running_reward}')

        # If the running reward exceeds the threshold, save the model checkpoint
        if running_reward >= threshold:
            threshold = running_reward  # Update the threshold to the new running reward
            print(f"Solved! eps {i_episode} Running reward is now {running_reward}")

            # Create a checkpoint dictionary to save the model and optimizer states
            checkpoint = {
                'model_state_dict': policy.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'score': ep_reward
            }

            # Save the checkpoint to a file with details about the threshold and reward
            torch.save(checkpoint, f'best_checkpoint_atThreshold{threshold}_runing{running_reward}_eps{i_episode}_score_{ep_reward}.pth')
            print(f"New best running score: {running_reward} . Checkpoint saved!")

            # Move the checkpoint file to the specified Google Drive folder
            !mv /content/best*.pth /content/drive/MyDrive/Models/folder_name/

        # If it's the last episode (episode 1000), save the final checkpoint
        if i_episode == 1000:
            print(f"last episode! Running reward is now {running_reward}")

            # Create a final checkpoint dictionary
            checkpoint = {
                'model_state_dict': policy.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'score': ep_reward
            }

            # Save the final checkpoint
            torch.save(checkpoint, f'best_checkpoint_attheEnd_score_{ep_reward}_runing{running_reward}.pth')
            print(f"last best score: {ep_reward} . Checkpoint saved!")

            # Move the final checkpoint to the Google Drive folder
            !mv /content/best*.pth /content/drive/MyDrive/Models/folder_name/
            return

if __name__ == '__main__':
    main()


Blu   Gre 
      
  Age   

Episode 0 Last reward: 2259 Average reward: 122.45
Ora   Tur 
      
  Age   



In [9]:
# Import necessary libraries
import gymnasium as gym  # Gymnasium (new version of Gym)
from enum import Enum
import random
import numpy as np
from scipy.stats import norm

from os import truncate

# Enum defining the possible actions the agent can take in the environment: LEFT or RIGHT
class AgentAction(Enum):
    LEFT = 0
    RIGHT = 1

    # Method to get the integer value of the action (0 or 1)
    def get_value(self):
        return self.value

# Enum defining various grid tiles in the environment (e.g., agent, spaceships, aliens)
class GridTile(Enum):
    Agent = 0
    Green_Spaceship = 1
    Blue_Spaceship = 2
    Turquoise_Spaceship = 3
    Orange_Spaceship = 4
    Red_Alien = 5
    Purple_Alien = 6

    # Method to return the first 3 letters of the tile name for printing
    def __str__(self):
        return self.name[:3]

    # Method to get the integer value associated with the grid tile
    def get_value(self):
        return self.value

# Class representing the Two-Step Task, a custom task environment for reinforcement learning
class Two_Step_Task:

    # Initialize the grid size and task setup
    def __init__(self, grid_rows=3, grid_cols=3, fps=1):
        self.grid_rows = grid_rows
        self.grid_cols = grid_cols
        self.reset()

    # Reset the environment to its initial state
    def reset(self, seed=None):
        random.seed(seed)  # Set the seed for reproducibility
        self.state = [0, 0, 0, 0]  # Initial state undefined
        self.assign_stimulus()  # Assign stimulus for the left and right actions and sets state

        # Initialize positions for rendering the agent and the stimuli
        self.left_stim_pos = [0, 0]
        self.right_stim_pos = [0, 2]
        self.agent_pos = [2, 1]

        self.generate_rewards()  # Generate reward probabilities for each action state pair
        self.trial_num = 0  # Initialize trial number
        self.winning_prob_matrix = []  # Reward probabilities matrix
        self.stake = 1  # Initial stake (can be 1 or 5)
        self.score = 0  # Initial score

    # Reset for a new trial
    def reset_trial(self):
        self.state = [0, 0, 0, 0]  # Reset state to initial
        self.assign_stimulus()  # Re-assign stimuli
        self.left_stim_pos = [0, 0]
        self.right_stim_pos = [0, 2]
        self.agent_pos = [2, 1]

    # Generate reward probabilities for each trial based on Gaussian noise
    def generate_rewards(self):
        self.bounds = [0, 9]  # Set reward bounds
        self.sd = 2  # Standard deviation for noise
        self.choices = 1  # Only one choice is made in this novel task
        self.nrtrials = 200  # Number of trials

        # Ensure the bounds are sorted
        self.bounds = np.sort(self.bounds)

        # Initialize rewards matrix
        self.rewards = np.zeros((self.nrtrials, 2, self.choices))

        # Randomly generate reward probabilities for the first trial
        if np.random.rand() < 0.5:
            if np.random.rand() < 0.5:
                x = np.array([0, 4])
            else:
                x = np.array([4, 0])
        else:
            if np.random.rand() < 0.5:
                x = np.array([[5, 9]])
            else:
                x = np.array([[9, 5]])

        # Set the reward for the first trial
        self.rewards[0, :, 0] = x

        # Loop through each trial to update the rewards with Gaussian noise
        for t in range(1, self.nrtrials):
            for s in range(2):  # Two states (based on stimulus)
                for a in range(self.choices):  # One choice per trial
                    # Update reward with noise and ensure it stays within bounds
                    self.rewards[t, s, a] = self.rewards[t - 1, s, a] + norm.rvs(scale=self.sd)
                    self.rewards[t, s, a] = min(self.rewards[t, s, a], max(self.bounds[1] * 2 - self.rewards[t, s, a], self.bounds[0]))
                    self.rewards[t, s, a] = max(self.rewards[t, s, a], min(self.bounds[0] * 2 - self.rewards[t, s, a], self.bounds[1]))

        # Round rewards to 3 decimal places
        self.rewards = np.round(self.rewards, 3)
        return self.rewards

    # Reset the stake randomly (either 1 or 5)
    def reset_stake(self):
        p = random.random()
        if p <= 0.5:
            self.stake = 1
        else:
            self.stake = 5

    # Assign state and a stimulus (spaceship type) to the agent's environment
    def assign_stimulus(self, stim_1=GridTile.Green_Spaceship, stim_2=GridTile.Blue_Spaceship,
                        stim_3=GridTile.Turquoise_Spaceship, stim_4=GridTile.Orange_Spaceship):
        p = random.random()
        if p <= 0.5:
            p = random.random()
            if p <= 0.5:
                self.left_stim = stim_1
                self.right_stim = stim_2
            else:
                self.left_stim = stim_2
                self.right_stim = stim_1
            self.state = [1, 0, 0, 0]  # State for first stimulus pair
        else:
            p = random.random()
            if p <= 0.5:
                self.left_stim = stim_3
                self.right_stim = stim_4
            else:
                self.left_stim = stim_4
                self.right_stim = stim_3
            self.state = [0, 1, 0, 0]  # State for second stimulus pair

        self.state_part2 = [self.left_stim.get_value(), self.right_stim.get_value()]

    # Assign a planet based on the spaceship type (Green, Blue, Turquoise, Orange)
    def assign_planet(self, space_ship: GridTile):
        p = random.random()
        match space_ship:
            case GridTile.Green_Spaceship:
                self.state = [0, 0, 1, 0]  # Red planet
                self.state_part2 = [GridTile.Red_Alien.get_value(), 0]
            case GridTile.Blue_Spaceship:
                self.state = [0, 0, 0, 1]  # Purple planet
                self.state_part2 = [0, GridTile.Purple_Alien.get_value()]
            case GridTile.Turquoise_Spaceship:
                self.state = [0, 0, 1, 0]  # Red planet
                self.state_part2 = [GridTile.Red_Alien.get_value(), 0]
            case GridTile.Orange_Spaceship:
                self.state = [0, 0, 0, 1]  # Purple planet
                self.state_part2 = [0, GridTile.Purple_Alien.get_value()]

    # Perform the agent's action (LEFT or RIGHT) and update the state
    def perform_action(self, agent_action: AgentAction):
        if agent_action == AgentAction.LEFT:
            self.assign_planet(self.left_stim)

        if agent_action == AgentAction.RIGHT:
            self.assign_planet(self.right_stim)

        # Determine the reward index based on the current state and action
        reward_ind_state = self.state.index(1) - 2
        reward_ind_action = 0
        self.reward_prob = self.rewards[self.trial_num, reward_ind_state, reward_ind_action]
        self.winning_prob_matrix = self.rewards[self.trial_num, :, :]
        if self.trial_num <= 199:
            self.trial_num += 1

    # Render the environment's current state on the console
    def render(self):
        for r in range(self.grid_rows):
            for c in range(self.grid_cols):
                if [r, c] == self.agent_pos:
                    print(GridTile.Agent, end=' ')
                elif [r, c] == self.left_stim_pos:
                    print(GridTile(self.left_stim), end=' ')
                elif [r, c] == self.right_stim_pos:
                    print(GridTile(self.right_stim), end=' ')
                else:
                    print(' ', end=' ')
            print()
        print()


# Implementing the custom Gymnasium environment
from gymnasium import spaces
from gymnasium.envs.registration import register
from gymnasium.utils.env_checker import check_env
import numpy as np

# Custom Gymnasium environment class
class TwoStepTaskEnv(gym.Env):
    metadata = {"render_modes": ["human"], 'render_fps': 1}  # Metadata for Gymnasium

    # Initialize the environment
    def __init__(self, grid_rows=3, grid_cols=3, render_mode=None):
        self.grid_rows = grid_rows
        self.grid_cols = grid_cols
        self.render_mode = render_mode
        self.input_size = 8  # Total size of the observation space

        # Initialize the task agent
        self.task_agent = Two_Step_Task(grid_rows=grid_rows, grid_cols=grid_cols, fps=self.metadata['render_fps'])

        # Define action space (discrete action choices)
        self.action_space = spaces.Discrete(len(AgentAction))

        # Define observation space (size of the state vector)
        self.observation_space = spaces.Box(
            low=0,
            high=4000,
            shape=(self.input_size,),
            dtype=np.int64
        )

    # Reset the environment and return the initial observation
    def reset(self, seed=None, options=None):
        super().reset(seed=seed)  # Ensure reproducibility
        self.task_agent.reset(seed=seed)
        observation = self.task_agent.state + self.task_agent.state_part2 + [self.task_agent.stake] + [self.task_agent.score]

        # Reshape observation and return
        obs = np.array([observation], dtype=np.int64).reshape((self.input_size,))
        info = {}  # Optional info dictionary

        # Render environment if necessary
        if self.render_mode == 'human':
            self.render()

        return obs, info

    # Render the environment's current state
    def render(self):
        self.task_agent.render()

    # Step function to perform an action and return the new state, reward, etc.
    def step(self, action):
        self.task_agent.perform_action(AgentAction(action))

        # Calculate reward and termination condition
        reward = 0
        terminated = False
        reward = int(self.task_agent.reward_prob) * self.task_agent.stake
        self.task_agent.score += reward
        self.task_agent.reset_stake()
        self.task_agent.reset_trial()

        # Check for termination condition
        if self.task_agent.trial_num >= 200:
            terminated = True

        # Update observation
        observation = self.task_agent.state + self.task_agent.state_part2 + [self.task_agent.stake] + [self.task_agent.score]
        obs = np.array([observation], dtype=np.int64).reshape((self.input_size,))

        info = {'number of trials': self.task_agent.trial_num,
                'stake': self.task_agent.stake,
                'winning_matrix': self.task_agent.winning_prob_matrix}

        return obs, reward, terminated, False, info


# Register this environment with Gymnasium for use with gym.make()
register(
      id='two-step-task-novel-v0',  # ID for the environment
      entry_point=TwoStepTaskEnv,   # Class used to create the environment
)


  logger.warn(f"Overriding environment {new_spec.id} already in registry.")
