In [4]:
import stable_baselines3
import gym
import numpy as np
import itertools

# Environement
import gym
from gym import spaces

# Evaluate the environement
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.callbacks import BaseCallback
from stable_baselines3.common.results_plotter import load_results, ts2xy, plot_results
from stable_baselines3.common.monitor import Monitor

# Agent
from stable_baselines3 import A2C,PPO

# Policy
from stable_baselines3.ppo import MlpPolicy, CnnPolicy
from stable_baselines3.common.policies import ActorCriticPolicy

import matplotlib.pyplot as plt
import os

In [5]:
log_dir = "bf/ppo/mlp/"
os.makedirs(log_dir, exist_ok=False)

FileExistsError: [Errno 17] File exists: 'bf/ppo/mlp/'

# Environement

## Definition

In [53]:
class CustomEnv(gym.Env):
    """Custom Environment that follows gym interface"""
    metadata = {'render.modes': ['human']}

    def __init__(self,dimension,tuiles,quotas):
        super().__init__()
        assert quotas.shape[0]==tuiles.shape[0]
        self.dimension = dimension
        self.tuiles = tuiles
        self.quotas = quotas

        self._state = np.zeros((self.dimension,),dtype=np.int8)

        # /!\ => action = especes
        self.action_space = spaces.Discrete(self.tuiles.shape[0])

        # ETAPE 1 : OBSERVATION = REMPLISSAGE (ON RAJOUTE UNE CASE POUR LES CASES VIDES)
        self.observation_space = spaces.Box(low=0,
                                            high=np.inf,
                                            shape=(self.tuiles.shape[0]+1,),
                                            dtype=np.float32)

        self._episode_ended = False
        self._next_position = self.next_position()
        self._taux = self.taux_remplissage()
    
    
    def next_position(self):
        """
        Determine la prochaine position vide
        :return: None si la grille est full
        """
        pos = np.unravel_index(np.argmin(self._state),self._state.shape)
        if self._state[pos]!=0:
            return None
        return pos
    
    
    def taux_remplissage(self):
        """
        Détermine le taux de remplissage des quotas
        :return: [] + taux_pour les espèces
        """
        taux = np.full((len(self.tuiles)+1,),0.0,dtype=np.float32) # Taux a une case de plus que quota ou tuiles
        ind,c = np.unique(self._state,return_counts=True)
        taux[ind] = c  # Ind sont bien compris entre 1 et n_tuiles, car lues dans state
        mask = self.quotas!=0.0 # Pour les quotas imposés, ie différent de zeros
        mask_tx= np.insert(mask,0,False) # Taux a une case de plus que quota. On ajout False à la fin
        taux[mask_tx] = taux[mask_tx]/self.quotas[mask]
        taux[~(mask_tx)] = 0.0

        return taux
    
    
    def step(self, action):
        """
        Remplit la action_ième case
        Termine si action déjà remplie
        """
        if self._episode_ended:
            # The last action ended the episode. Ignore the current action and start
            # a new episode.
            return self.reset()

        # ACTION ENTRE 0 et N_Tuile-1
        # DANS LE STATE LES ESPECES SONT REPERERES PAR i_TUILE + 1 car 0 Pour case vides
        
        info = {}
        espece_vue = action + 1

        self._episode_ended = False
        reward = 0

        self._state[self._next_position] = action # POSE DU LEGUME
        new_taux = self.taux_remplissage() # NOUVEAU TAUX, LES ANCIENS SONT ENCORE DANS self._taux
        self._next_position = self.next_position()


        if self._taux[espece_vue]<1.0 and self.quotas[action]>0.0:
            # ON AUGMENTE UN QUOTA A REMPLIR
            reward = 0.1
            if new_taux[espece_vue]>1.0:
                # ON LE REMPLI COMPLETEMENT
                reward = 0.2
        else:
            reward = 0 # INCITATION A REMPLIR DES TAUX PROCHES DE PLEINS
        if self._next_position is None:
            reward = 0.5
            self._episode_ended = True

        self._taux = new_taux # ON ENREGISTRE LES NOUVEAUX TAUX
            
        return self.to_observation(), reward, self._episode_ended, info
    
    
    def reset(self):
        self._state = np.random.randint(1,len(self.tuiles)+1,(self.dimension,),np.int8)

        nb_zeros = np.random.randint(0,self.dimension)
        zero_indices = np.random.randint(0,self.dimension,(nb_zeros,))
        self._state[zero_indices] = 0

        self._next_position = self.next_position()
        if self._next_position is None:
            return self.reset()

        self._taux = self.taux_remplissage()
        self._episode_ended = False
        
        return self.to_observation()
    
    
    def to_observation(self):
        return self._taux.copy()
    
    
    def render(self,mode="human"):
        grill = self._state.reshape((int(self.dimension**0.5),int(self.dimension**0.5)))
        img = np.full((grill.shape[0]*16,grill.shape[1]*256),255)
        for r,c in itertools.product(range(grill.shape[0]),range(grill.shape[1])):
            img[r*16:r*16+16,c*16:c*16+16] = 255-grill[r,c]*255

        return img.astype('uint8')
        


## Instanciation

In [54]:
dimension = 25
tuiles = np.array([1,2,3,4,5,6,7,8,9,10])
quotas = np.array([0,3,4,0,1,0,3,4,0,2])

In [55]:
env = CustomEnv(dimension,tuiles,quotas)
eval_env = CustomEnv(dimension,tuiles,quotas) # Use a separate environement for evaluation
env = Monitor(env, log_dir)

# Agent

In [56]:
algo_with_policy = PPO("MlpPolicy", env, verbose=2)

Using cpu device
Wrapping the env in a DummyVecEnv.


## Evaluate initial Agent

In [61]:
# Random Agent, before training
mean_reward, std_reward = evaluate_policy(algo_with_policy, eval_env, n_eval_episodes=10)
print(f"mean_reward:{mean_reward:.2f} +/- {std_reward:.2f}")

KeyboardInterrupt: 

## Train the agent

In [None]:
class MyCallBack(BaseCallback):
    def __init__(self,log_dir,steps_to_print,verbose=0):
        super().__init__()
        self.log_dir = log_dir
        self.steps_to_print = steps_to_print
        
    def _on_step(self) -> bool:
        if self.num_timesteps % self.steps_to_print == 0:
            plt.imshow(self.locals["obs_tensor"].numpy()[0,:,:])
            plt.show()


In [24]:
print_steps = 1
n_steps = 20_000

# Train the agent for 10000 steps
callback = MyCallBack(log_dir,print_steps)
algo_with_policy.learn(total_timesteps=n_steps,callback=callback)

NameError: name 'MyCallBack' is not defined

## Evaluate agent 

In [27]:
# Evaluate the trained agent
mean_reward, std_reward = evaluate_policy(algo_with_policy, env, n_eval_episodes=1000)

print(f"mean_reward:{mean_reward:.2f} +/- {std_reward:.2f}")

mean_reward:-99.00 +/- 0.00
