In [None]:
!pip install gymnasium
!pip install tensorflow==2.3.0
!pip install keras
!pip install keras-rl2

Collecting gymnasium
  Downloading gymnasium-0.29.0-py3-none-any.whl (953 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m953.8/953.8 kB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
Collecting farama-notifications>=0.0.1 (from gymnasium)
  Downloading Farama_Notifications-0.0.4-py3-none-any.whl (2.5 kB)
Installing collected packages: farama-notifications, gymnasium
Successfully installed farama-notifications-0.0.4 gymnasium-0.29.0
[31mERROR: Could not find a version that satisfies the requirement tensorflow==2.3.0 (from versions: 2.8.0rc0, 2.8.0rc1, 2.8.0, 2.8.1, 2.8.2, 2.8.3, 2.8.4, 2.9.0rc0, 2.9.0rc1, 2.9.0rc2, 2.9.0, 2.9.1, 2.9.2, 2.9.3, 2.10.0rc0, 2.10.0rc1, 2.10.0rc2, 2.10.0rc3, 2.10.0, 2.10.1, 2.11.0rc0, 2.11.0rc1, 2.11.0rc2, 2.11.0, 2.11.1, 2.12.0rc0, 2.12.0rc1, 2.12.0, 2.12.1, 2.13.0rc0, 2.13.0rc1, 2.13.0rc2, 2.13.0)[0m[31m
[0m[31mERROR: No matching distribution found for tensorflow==2.3.0[0m[31m
Collecting keras-rl2
  Downloading keras_rl2-1.0.5-py3-non

In [None]:
# Import required libraries
import numpy as np
import cv2
import matplotlib.pyplot as plt
import PIL.Image as Image
import gym
import random
from gym import Env, spaces
import time

# Environment Definition
class Experiment_Scope(Env):
    """
        Cue-gated left / middle / right decision task exposed through the Gym ``Env`` interface.

        Every episode renders a grayscale screen split into three vertical parts and
        draws ``number_of_circles`` striped circles on it, at most one per part.
        Actions are ignored until the go cue has played; afterwards every action moves
        ``decision_var`` by -5, 0 or +5. Reaching +100 is a "right" decision, reaching
        -100 is a "left" decision, and running out of steps counts as "middle / nothing".

        Observation (length 5):
            [left intensity, middle intensity, right intensity, decision_var, cue_played]
            where each intensity is ``level * 25 / 100`` for the level drawn from
            ``intensity_interval`` (0 when the part holds no circle).

        Actions (``Discrete(3)``):
            0 -> decision_var - 5, 1 -> no change, 2 -> decision_var + 5

        Rewards:
            +200  decision matches a part that holds a circle, or timeout while the
                  stimulus is in the middle / absent
            -100  wrong decision or wrong timeout
            -300  decision before the go cue (kept for completeness; not reachable
                  while pre-cue actions are ignored)
               0  every other step
    """
    metadata = {"render_modes": ["rgb_array"]} # Metadata

    def __init__(self, observation_shape: tuple = (902, 600, 1), radius_interval: tuple = (10, 50), intensity_interval: tuple = (0, 5), number_of_circles: int = 1, go_cue_interval: tuple = (20, 41), max_steps: int = 200):
        """
            Initializes the Experiment_Scope object.

            Parameters:
            - observation_shape (tuple): (width, height, channels) of the rendered screen (default: (902, 600, 1))
            - radius_interval (tuple): Half-open interval [low, high) for circle radius (default: (10, 50))
            - intensity_interval (tuple): Half-open interval [low, high) for circle intensity level (default: (0, 5))
            - number_of_circles (int): Number of circles, 0 to 3 (default: 1)
            - go_cue_interval (tuple): Half-open interval [low, high) for the go cue time step (default: (20, 41))
            - max_steps (int): Time step at which an undecided episode ends (default: 200)

            Raises:
                ValueError: If number_of_circles is not between 0 and 3.
        """
        super(Experiment_Scope, self).__init__()

        # Each circle occupies its own screen part, so more than three cannot be placed.
        if not 0 <= number_of_circles <= 3:
            raise ValueError("number_of_circles must be between 0 and 3 (one circle per screen part)")

        self.observation_shape = observation_shape
        self.observation_space = spaces.Box(low=-105, high=105, shape=(5, ))

        self.action_space = spaces.Discrete(3)

        # Change applied to decision_var per action; mirrors the (action - 1) * 5 rule
        # that step() has always used.
        self._action_to_value = {
            0 : -5, # Left
            1 : 0, # No Op
            2 : +5, # Right
        }

        self.radius_interval = radius_interval
        self.intensity_interval = intensity_interval
        self.number_of_circles = number_of_circles
        self.cue_interval = go_cue_interval
        self.max_steps = max_steps

        self.render_mode = 'rgb_array'
        # Blank white screen with the same (height, width, 1) uint8 layout that
        # _generate_image() produces, so render() is consistent before and after reset().
        self.screen = np.full((observation_shape[1], observation_shape[0], 1), 255, dtype=np.uint8)
        self.screen_part = ""
        self.intensity_vector = None

        self.decision_var = 0
        self.is_done = False
        self.time_step = 1
        self.go_cue_time = self._sample_go_cue_time()
        self.cue_played = False

    def _sample_go_cue_time(self) -> int:
        """
            Draws the time step at which the go cue plays.

            Returns:
                int: A value in [cue_interval[0], cue_interval[1]) drawn from the
                    environment's own random generator, so reset(seed=...) controls it.
        """
        return int(self.np_random.integers(self.cue_interval[0], self.cue_interval[1]))

    def _get_obs(self) -> np.ndarray:
        """
            Builds the observation vector for the current state.

            Returns:
                np.ndarray: Array of length 5, in the dtype of ``observation_space``:
                    - [0:3]: left, middle and right intensities divided by 100.
                    - [3]: The value of the decision variable.
                    - [4]: 1 if the go cue has played, otherwise 0.
        """
        left, middle, right = (value / 100 for value in self.intensity_vector[:3])
        return np.array(
            [left, middle, right, self.decision_var, int(self.cue_played)],
            dtype=self.observation_space.dtype,
        )

    def _get_info(self) -> dict:
        """
            Returns a dictionary containing information about the object.

            Returns:
                dict: A dictionary with the following keys:
                    - 'is_done': A boolean indicating whether the task is done or not.
                    - 'time_step': An integer representing the time step of the task.
        """
        return {'is_done': self.is_done, "time_step": self.time_step}

    def _generate_image(self) -> tuple:
        """
            Generates an image with striped ("Gabor"-like) circles on different parts of the screen.

            Returns:
                tuple: (image, screen_part, screen_intensities) where
                    - image is a (height, width, 1) uint8 array,
                    - screen_part names the occupied parts joined by '-' in the order
                      right, middle, left ("" when there is no circle),
                    - screen_intensities holds ``level * 25`` for the left, middle and
                      right part (0 for an empty part).
        """
        width = self.observation_shape[0]
        height = self.observation_shape[1]
        third = (width - 2) // 3
        two_thirds = 2 * (width - 2) // 3
        stripe_color = 255
        stripe_thickness = 6

        image = np.full((height, width, 1), 255, dtype=np.uint8)
        screen_intensities = np.zeros((3, ))
        free_parts = [0, 1, 2]  # 0 = left, 1 = middle, 2 = right
        occupied = set()

        for _ in range(self.number_of_circles):
            # int(...) keeps plain Python ints, which is what the cv2 drawing calls receive.
            level = int(self.np_random.integers(self.intensity_interval[0], self.intensity_interval[1]))
            gray = 255 - (2 ** (2 * level) - 1)  # higher level -> darker circle
            radius = int(self.np_random.integers(self.radius_interval[0], self.radius_interval[1]))

            # Pick a part that has no circle yet.
            part = free_parts.pop(int(self.np_random.integers(len(free_parts))))
            occupied.add(part)

            # Keep the whole circle inside its own third of the screen.
            x_ranges = {
                0: (radius, third - 1 - radius),
                1: (third + radius, two_thirds - 1 - radius),
                2: (two_thirds + radius, (width - 2) - radius),
            }
            x_low, x_high = x_ranges[part]
            center_x = int(self.np_random.integers(x_low, x_high))
            center_y = int(self.np_random.integers(radius, height - 1 - radius))
            screen_intensities[part] = level * 25

            cv2.circle(image, (center_x, center_y), radius, gray, -1)

            # Gabor Effect: three parallel white diagonal stripes across the disc.
            cv2.line(image, (center_x, center_y - radius), (center_x - radius, center_y), stripe_color, stripe_thickness)
            cv2.line(image, (center_x + radius, center_y), (center_x, center_y + radius), stripe_color, stripe_thickness)
            diagonal = int(np.sqrt(2) / 2 * radius)
            cv2.line(
                image,
                (center_x + diagonal, center_y - diagonal),
                (center_x - diagonal, center_y + diagonal),
                stripe_color,
                stripe_thickness,
            )

        # Screen Split lines, drawn over the full image height.
        cv2.line(image, (third, 0), (third, height - 1), 0, 1)
        cv2.line(image, (two_thirds, 0), (two_thirds, height - 1), 0, 1)

        screen_part = "-".join(
            name for index, name in ((2, "right"), (1, "middle"), (0, "left")) if index in occupied
        )

        return (image, screen_part, screen_intensities)

    def reset(self, seed=None, options=None):
        """
            Resets the environment to its initial state.

            Args:
                seed (int): Optional. Seed for the environment's random generator; the
                    stimulus and the go cue time are drawn from that generator.
                options (dict): Optional. Accepted for API compatibility; not used.

            Returns:
                    observation: The initial observation of the environment.
                    info: Additional information about the environment's state.

            Notes:
                - This method should be called before starting a new episode or when the environment needs to be reset.
        """
        super().reset(seed=seed)

        self.screen, self.screen_part, self.intensity_vector = self._generate_image()

        self.decision_var = 0
        self.is_done = False
        self.time_step = 1
        # The cue belongs to a single episode, so it has to be cleared here as well.
        self.cue_played = False
        self.go_cue_time = self._sample_go_cue_time()

        return self._get_obs(), self._get_info()

    def _decision_reward(self, side: str) -> int:
        """
            Scores a committed 'left' or 'right' decision.

            Returns:
                int: -300 if the go cue has not played, 200 if ``side`` is part of
                    ``screen_part``, otherwise -100.
        """
        if not self.cue_played:
            # Make decision before go cue sound
            return -300
        # Make decision after go cue sound
        return 200 if side in self.screen_part else -100

    def step(self, action):
        """
            Executes a single step in the decision-making process.

            Args:
                action (int): The action to take (0, 1 or 2).

            Returns:
                tuple: (observation, reward, terminated, truncated, info). ``truncated``
                    is always False; every episode end is reported through ``terminated``.
        """
        if self.time_step >= self.go_cue_time:
            self.cue_played = True

        if not self.cue_played:
            # Waiting period: the action is ignored and only time advances.
            self.time_step += 1
            return self._get_obs(), 0, self.is_done, False, self._get_info()

        # int() also accepts the 0-d arrays that policies commonly return.
        self.decision_var += self._action_to_value[int(action)]

        if self.decision_var >= 100:
            # Right Decision
            self.is_done = True
            reward = self._decision_reward('right')

        elif self.decision_var <= -100:
            # Left Decision
            self.is_done = True
            reward = self._decision_reward('left')

        elif self.time_step >= self.max_steps:
            # Middle or No Stimulus
            self.is_done = True
            if self.screen_part == "" or "middle" in self.screen_part:
                reward = 200
            else:
                reward = -100

        else:
            reward = 0
            self.time_step += 1

        return self._get_obs(), reward, self.is_done, False, self._get_info()

    def render(self):
        """
            Renders the screen.

            Returns:
                The rendered screen.
        """
        return self.screen


In [None]:
# Create the task environment with its default parameters.
env = Experiment_Scope()

  and should_run_async(code)


In [None]:
# Smoke test: play a few episodes with uniformly random actions and print each return.
episodes = 20  # number of random-policy episodes
for episode in range(1, episodes+1):
    state, info = env.reset()
    done = False
    score = 0

    while not done:
        action = env.action_space.sample()
        # step() returns five values (observation, reward, terminated, truncated, info);
        # unpacking only four raises ValueError.
        n_state, reward, terminated, truncated, info = env.step(action)
        done = bool(terminated or truncated)
        score += reward
    print('Episode:{} Score:{}'.format(episode, score))

ValueError: ignored

In [None]:
import numpy as np
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Flatten
# from tensorflow.keras.optimizers import Adam
from tensorflow.keras.optimizers.legacy import Adam
import tensorflow as tf

In [None]:
# Show the initial observation, i.e. the first element of the (observation, info) pair from reset().
env.reset()[0]

  and should_run_async(code)


array([0., 0., 1., 0., 0.])

In [None]:
# Sizes the Q-network is built from.
states = env.observation_space.shape  # observation shape as a tuple
actions = env.action_space.n  # number of discrete actions

In [None]:
# Display the observation shape and the number of actions.
states, actions

((5,), 3)

In [None]:
def build_model(states, actions):
  """
  Build the Q-network: two hidden ReLU layers of 24 units and one linear output per action.

  Args:
    states: Observation shape as a tuple (e.g. ``env.observation_space.shape``) or a
      single integer dimension.
    actions (int): Number of discrete actions, i.e. the size of the output layer.

  Returns:
    An uncompiled Keras ``Sequential`` model that takes input of shape ``(1, *states)``
    and produces output of shape ``(actions,)``.
  """
  # Accept both a shape tuple and a bare integer.
  obs_shape = tuple(int(dim) for dim in np.atleast_1d(states))
  model = Sequential()
  # The leading 1 is the observation window length (one observation per sample);
  # flattening it away lets the Dense layers yield a flat vector of Q-values.
  model.add(Flatten(input_shape=(1,) + obs_shape))
  model.add(Dense(24, activation='relu'))
  model.add(Dense(24, activation='relu'))
  model.add(Dense(actions, activation='linear'))
  return model

In [None]:
# Instantiate the Q-network for this environment's observation shape and action count.
model = build_model(states, actions)

In [None]:
# Print the layer-by-layer architecture and parameter counts.
model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense (Dense)               (None, 1, 24)             144       
                                                                 
 dense_1 (Dense)             (None, 1, 24)             600       
                                                                 
 dense_2 (Dense)             (None, 1, 3)              75        
                                                                 
 reshape (Reshape)           (None, 3)                 0         
                                                                 
Total params: 819
Trainable params: 819
Non-trainable params: 0
_________________________________________________________________


In [None]:
from rl.agents import DQNAgent
from rl.policy import BoltzmannQPolicy
from rl.memory import SequentialMemory

In [None]:
def build_agent(model, actions):
  """
  Wrap a Q-network in a keras-rl DQN agent.

  The agent explores with a Boltzmann policy, replays from a 1000-entry sequential
  memory with a window length of 1, warms up for 10 steps, uses a target model update
  setting of 1e-2, and has double DQN disabled.

  Args:
    model: The Keras model that estimates Q-values.
    actions (int): Number of discrete actions.

  Returns:
    The configured (not yet compiled) ``DQNAgent``.
  """
  return DQNAgent(
    model=model,
    policy=BoltzmannQPolicy(),
    memory=SequentialMemory(limit=1000, window_length=1),
    nb_actions=actions,
    nb_steps_warmup=10,
    target_model_update=1e-2,
    enable_double_dqn=False,
  )

In [None]:
# Build the DQN agent around the Q-network, compile it with Adam (reporting MAE),
# and train it on `env` for 60,000 steps without rendering.
# NOTE(review): the recorded output of this cell is an AttributeError; presumably a
# keras-rl2 / TensorFlow / Gym API incompatibility - confirm before relying on this cell.
dqn = build_agent(model, actions)
dqn.compile(Adam(learning_rate=1e-3), metrics=['mae'])
dqn.fit(env, nb_steps=60000, visualize=False, verbose=1)

AttributeError: ignored

In [None]:
import tensorflow as tf
from keras.layers import Dense, Flatten
import gym
from rl.agents.dqn import DQNAgent
from rl.policy import BoltzmannQPolicy
from rl.memory import SequentialMemory

# NOTE(review): this rebinds the notebook-global `env` (previously the Experiment_Scope
# instance) as well as `states` and `actions`; cells run after this one see these
# values - confirm that is intended.
env = gym.make('CartPole-v0')
states = env.observation_space.shape[0]  # size of the first observation dimension
actions = env.action_space.n  # number of discrete actions
episodes = 10  # not referenced again in this cell

def buildModel(statez, actiones):
    """
    Build a small fully connected Q-network.

    Args:
        statez (int): Length of one observation vector.
        actiones (int): Number of discrete actions (output units).

    Returns:
        An uncompiled ``tf.keras.Sequential`` model: the (1, statez) input is flattened,
        passed through two 24-unit ReLU layers, and mapped to ``actiones`` linear outputs.
    """
    layers = [
        Flatten(input_shape=(1, statez)),
        Dense(24, activation='relu'),
        Dense(24, activation='relu'),
        Dense(actiones, activation='linear'),
    ]
    return tf.keras.Sequential(layers)

def buildAgent(modell, actionz):
    """
    Wrap a Q-network in a keras-rl DQN agent.

    Args:
        modell: The Keras model that estimates Q-values.
        actionz (int): Number of discrete actions.

    Returns:
        A ``DQNAgent`` (not yet compiled) using a Boltzmann policy, a 50000-entry
        sequential memory with window length 1, 10 warm-up steps and a target model
        update setting of 1e-2.
    """
    return DQNAgent(
        model=modell,
        policy=BoltzmannQPolicy(),
        memory=SequentialMemory(limit=50000, window_length=1),
        nb_actions=actionz,
        nb_steps_warmup=10,
        target_model_update=1e-2,
    )

# Build the network and agent for the environment bound above, compile with Adam
# (reporting MAE) and train for 50,000 steps without rendering.
# NOTE(review): this rebinds the notebook-global `model`, and the name `DQN` is bound
# again by the later `from stable_baselines3 import DQN` import. The recorded output of
# this cell ends in an AttributeError - presumably a keras-rl2 / TensorFlow
# incompatibility; confirm.
model = buildModel(states, actions)
DQN = buildAgent(model, actions)
DQN.compile(tf.keras.optimizers.Adam(learning_rate=1e-3), metrics=['mae'])
DQN.fit(env, nb_steps=50000, visualize=False, verbose=1)

  logger.warn(
  deprecation(
  deprecation(


AttributeError: ignored

In [None]:
# Small scratch array used to check indexing.
arr = np.array([1, 2, 3])

In [None]:
# Display the first element of the scratch array.
arr[0]

In [None]:
!pip install stable-baselines3

In [None]:
!pip install shimmy

In [None]:
import gymnasium as gym

from stable_baselines3 import DQN


# Train a DQN agent on whatever the notebook-global `env` currently refers to,
# then checkpoint it to disk.
model = DQN("MlpPolicy", env, verbose=1)
model.learn(total_timesteps=100000, log_interval=4)
model.save("dqn_cartpole")

# Drop the in-memory agent and restore it from the checkpoint to demonstrate
# the save / load round trip.
del model
model = DQN.load("dqn_cartpole")

# Play one episode with the greedy (deterministic) policy and accumulate its return.
obs, info = env.reset()
total_reward = 0
terminated = truncated = False
while not (terminated or truncated):
    action, _states = model.predict(obs, deterministic=True)
    obs, reward, terminated, truncated, info = env.step(action)
    total_reward += reward

In [None]:
# Display the return of the episode played above.
total_reward

In [None]:
# Play one more greedy episode, recording observation index 3 after every step
# in addition to the episode return.
obs, info = env.reset()
total_reward = 0
dec_var_over_time = []
terminated = truncated = False
while not (terminated or truncated):
    action, _states = model.predict(obs, deterministic=True)
    obs, reward, terminated, truncated, info = env.step(action)
    dec_var_over_time.append(obs[3])
    total_reward += reward

In [None]:
# Display the return of the traced episode.
total_reward

In [None]:
# Display the recorded per-step values of observation index 3.
dec_var_over_time

In [None]:
import matplotlib.pyplot as plt
# Plot the per-step values recorded from observation index 3 during the traced episode.
# Uses the explicit Axes interface and labels the figure so it can be read on its own.
fig, ax = plt.subplots()
ax.plot(dec_var_over_time)
ax.set(
    title="Decision variable over one episode",
    xlabel="Step",
    ylabel="obs[3] (decision variable)",
)
plt.show()