In [3]:
import os
import numpy as np
from typing import Optional

from gymnasium import Env
from gymnasium.spaces import MultiDiscrete, Box, Discrete
from gymnasium.error import DependencyNotInstalled

from randomcolors import colors_array

from stable_baselines3 import PPO, A2C
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.callbacks import EvalCallback, StopTrainingOnRewardThreshold
from sb3_contrib import RecurrentPPO

In [167]:
class GameEnv(Env):
    """Colour-betting dice game exposed as a Gymnasium environment.

    Each step the agent picks any subset of the six colours (one binary flag
    per colour). ``num_cubes`` dice are then rolled:

    * every die showing a selected colour earns one credit (``win``),
    * every selected colour that shows on no die costs one credit (``loss``).

    The episode ends when the step budget is used up or the player has no
    credit left.

    Observation: the colour index shown by each die (``uint8``, one per die).
    Action: ``MultiDiscrete([2] * 6)`` -- 1 means "bet on this colour".
    """

    metadata = {
        "render_modes": ["human", "rgb_array"],
        "render_fps": 10,
    }

    # Credit the player starts every episode with.
    START_CREDIT = 10
    # Number of steps after which the episode is ended.
    MAX_STEPS = 100

    def __init__(self, num_cubes=3):
        """Create the environment.

        Args:
            num_cubes: number of dice rolled on every step.
        """
        self.num_cubes = num_cubes

        # Colours that can be bet on, and the reverse lookup colour -> index.
        self.color_map = ['red', 'green', 'blue', 'white', 'yellow', 'pink']
        self.color_to_index = {color: i for i, color in enumerate(self.color_map)}

        # One on/off flag per colour.
        self.action_space = MultiDiscrete(np.array([2] * len(self.color_map), dtype=np.uint8))
        # NOTE(review): the largest index actually emitted is
        # len(color_map) - 1; the upper bound is kept at len(color_map) so the
        # space stays identical to the one existing saved models were trained
        # against.
        self.observation_space = Box(
            low=np.zeros(self.num_cubes),
            high=np.full(self.num_cubes, len(self.color_map)), dtype=np.uint8)

        self.shuffled_color = self.color_shuffle()

        self.player_credit = self.START_CREDIT
        self.step_count = self.MAX_STEPS

    def step(self, action):
        """Play one round with the given colour selection.

        Args:
            action: array of 0/1 flags, one per entry of ``color_map``.

        Returns:
            ``(observation, reward, terminated, truncated, info)``.
            ``truncated`` is always ``False``; running out of steps is
            reported through ``terminated`` as well.

        Raises:
            AssertionError: if ``action`` is not contained in ``action_space``.
        """
        # Validate before touching any state so a bad action leaves the
        # episode untouched.
        assert self.action_space.contains(action), f"invalid action: {action!r}"

        self.step_count -= 1

        # Colours the agent bet on, as names and as indices.
        selected_color = [self.color_map[i] for i, selected in enumerate(action) if selected]
        selected_color_to_index = np.array(
            [self.color_to_index[color] for color in selected_color]
        ).astype(np.uint8)

        # Dice are re-rolled on every step.
        self.cubes = self.roll_cube(self.shuffled_color)
        obs = self._get_obs()

        # For each selected colour, how many dice show it.
        counted_occurences = {
            idx: np.count_nonzero(obs == idx) for idx in selected_color_to_index
        }

        win = sum(counted_occurences.values())
        # Selected colours that did not come up on any die (non-positive).
        loss = -len(set(selected_color).difference(self.cubes))

        total = win + loss
        self.player_credit += total

        # Jackpot: every die matched a selected colour and no bet was wasted.
        jackpot = win == self.num_cubes and loss == 0
        if jackpot and self.player_credit > self.START_CREDIT:
            reward = 1.5
        elif total >= 1 and not self.player_credit < 0:
            reward = 1.0
        elif total == 0:
            reward = 0.0
        else:
            reward = -1.0

        # Credit changes by more than one unit per step, so it can jump from
        # positive straight to negative; "<= 0" catches that case, which an
        # equality test against 0 would miss.
        done = self.step_count <= 0 or self.player_credit <= 0

        info = {
            'selected_color': selected_color,
            'occurences_count': counted_occurences,
            'win_count': win,
            'loss_count': loss,
            'total_count': total,
            'credit_count': self.player_credit
        }
        return obs, reward, done, False, info

    def _get_obs(self):
        """Return the colour index of every die as a ``uint8`` array."""
        return np.array([self.color_to_index[color] for color in self.cubes]).astype(np.uint8)

    def color_shuffle(self):
        """Pick one entry of ``colors_array`` at random and return it as an array.

        NOTE(review): ``colors_array`` comes from the external ``randomcolors``
        module and presumably holds orderings of the six colour names; the
        hard-coded bound draws an index in ``[0, 719)``. Confirm this matches
        ``len(colors_array)``.
        """
        return np.array(colors_array[self.np_random.choice(719)])

    def roll_cube(self, shuffled_colors):
        """Roll ``num_cubes`` dice, each showing one entry of ``shuffled_colors``.

        The dice are drawn one at a time so that, for the default of three
        dice, the random stream is consumed exactly as before.
        """
        return np.array([
            self.np_random.choice(shuffled_colors) for _ in range(self.num_cubes)
        ])

    def reset(
        self,
        seed: Optional[int] = None,
        options: Optional[dict] = None,
    ):
        """Start a new episode.

        Args:
            seed: forwarded to ``Env.reset`` to seed ``np_random``.
            options: accepted for API compatibility; not used.

        Returns:
            ``(observation, info)`` with an empty ``info`` dict.
        """
        super().reset(seed=seed)
        self.player_credit = self.START_CREDIT
        self.step_count = self.MAX_STEPS
        self.shuffled_color = self.color_shuffle()
        self.cubes = self.roll_cube(self.shuffled_color)

        return self._get_obs(), {}
        

In [172]:
# Build the environment and reset it with a fixed seed. The bare trailing
# expression makes the notebook display the (observation, info) pair.
env = GameEnv()
env.reset(seed=42)

(array([0, 5, 3], dtype=uint8), {})

In [224]:
# Total number of environment steps to train for.
timesteps = 1_500_000

In [225]:
# TensorBoard logs and saved models live under ./training.
# Every path component is passed to os.path.join separately so the platform's
# separator is used throughout (no hard-coded "/" inside a component).
log_path = os.path.join("training", "logs")
model_path = os.path.join("training", "models", "{}{}".format("PPO_", timesteps))

In [226]:
# Evaluate on a dedicated environment instance: the evaluation callback resets
# and steps the env it is given, which would corrupt in-progress training
# episodes if it shared the training env object.
eval_env = GameEnv()

# NOTE(review): the per-step reward tops out at 1.5 and an episode lasts at
# most 100 steps, so a mean episode reward of 500 looks unreachable -- confirm
# the intended threshold.
stop_callback = StopTrainingOnRewardThreshold(reward_threshold=500, verbose=1)
eval_callback = EvalCallback(eval_env,
                             callback_on_new_best=stop_callback,
                             eval_freq=1000,
                             best_model_save_path=model_path,
                             verbose=1
                            )


In [219]:
# model = A2C('MlpPolicy', env, verbose=1, tensorboard_log=log_path)

In [193]:
# model = RecurrentPPO('MlpLstmPolicy', env, verbose=1, tensorboard_log=log_path)

In [227]:
# PPO agent with the MLP policy; training metrics are written to log_path
# for TensorBoard.
model = PPO(
    'MlpPolicy',
    env,
    tensorboard_log=log_path,
    verbose=1,
)

Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.


In [228]:
# Train for `timesteps` environment steps. The call is the cell's last
# expression, so the notebook also displays the returned model object.
# NOTE(review): no `callback=` argument is passed, so the `eval_callback`
# built earlier has no effect on this run.
model.learn(total_timesteps=timesteps)

Logging to training/logs/PPO_3
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 50       |
|    ep_rew_mean     | -8.11    |
| time/              |          |
|    fps             | 1937     |
|    iterations      | 1        |
|    time_elapsed    | 1        |
|    total_timesteps | 2048     |
---------------------------------
-----------------------------------------
| rollout/                |             |
|    ep_len_mean          | 49.2        |
|    ep_rew_mean          | -9.18       |
| time/                   |             |
|    fps                  | 1602        |
|    iterations           | 2           |
|    time_elapsed         | 2           |
|    total_timesteps      | 4096        |
| train/                  |             |
|    approx_kl            | 0.009448185 |
|    clip_fraction        | 0.0742      |
|    clip_range           | 0.2         |
|    entropy_loss         | -4.15       |
|    explained_variance   | -0.00342    |

<stable_baselines3.ppo.ppo.PPO at 0x2a1c57d00>

In [234]:
# Roll the trained policy out for a fixed number of environment steps and
# summarise the result. Each loop iteration is ONE step, not one episode; an
# episode ends whenever the env reports terminated or truncated.
obs, _ = env.reset(seed=42)
total_reward = 0
total_credit = 0
loss_count = 0
for step_idx in range(99):
    # action = env.action_space.sample()  # random baseline
    action, _ = model.predict(obs)
    obs, reward, terminated, truncated, info = env.step(action)
    done = terminated or truncated

    # Uncomment for per-step debugging:
    # print("Game roll : ", obs)
    # print("Occurrences count: {}".format(info['occurences_count']))
    # print("Selected color : {} \n".format(info['selected_color']))
    # print('Win :', info['win_count'])
    # print('Loss :', info['loss_count'])
    # print('Total : {}'.format(info['total_count']))
    # print('Credit: {} \n'.format(info['credit_count']))
    print('Episode {} : Reward {} \n'.format(step_idx, reward))

    total_reward += reward
    if done:
        print("DONE HEEREE!! \n\n")
        total_credit += info['credit_count']
        loss_count += 1
        # Keep the fresh observation: the policy must act on the new
        # episode's first state, not on the previous episode's last one.
        obs, _ = env.reset()

print('Loss count: {}'.format(loss_count))
print('Total credit: {} \n'.format(total_credit))
print('Reward {} \n'.format(total_reward))

Episode 0 : Reward -1.0 

Episode 1 : Reward 1.5 

Episode 2 : Reward 0.0 

Episode 3 : Reward 1.0 

Episode 4 : Reward 1.0 

Episode 5 : Reward -1.0 

Episode 6 : Reward 1.0 

Episode 7 : Reward -1.0 

Episode 8 : Reward -1.0 

Episode 9 : Reward 0.0 

Episode 10 : Reward -1.0 

Episode 11 : Reward 1.0 

Episode 12 : Reward -1.0 

Episode 13 : Reward 1.0 

Episode 14 : Reward 0.0 

Episode 15 : Reward 1.0 

Episode 16 : Reward -1.0 

Episode 17 : Reward -1.0 

Episode 18 : Reward 1.0 

Episode 19 : Reward -1.0 

Episode 20 : Reward 0.0 

Episode 21 : Reward -1.0 

Episode 22 : Reward 1.0 

Episode 23 : Reward 0.0 

Episode 24 : Reward -1.0 

Episode 25 : Reward 0.0 

Episode 26 : Reward 1.0 

Episode 27 : Reward 0.0 

Episode 28 : Reward -1.0 

Episode 29 : Reward -1.0 

Episode 30 : Reward -1.0 

Episode 31 : Reward 0.0 

Episode 32 : Reward 1.0 

Episode 33 : Reward 1.0 

Episode 34 : Reward -1.0 

Episode 35 : Reward 1.0 

Episode 36 : Reward -1.0 

Episode 37 : Reward 0.0 

Episod

In [235]:
'

SyntaxError: EOL while scanning string literal (544193499.py, line 1)