In [1]:
import os
import sys
sys.path.insert(0, os.path.abspath('/Users/denoyer/workspace/rlstructures'))

from rlstructures import Agent,DictTensor
import torch
import os
import sys
import gym
from gym.wrappers import TimeLimit
from rlstructures.env_wrappers import NDiscreteGymEnv
from rlstructures.batchers import EpisodeBatcher,Batcher

import gym
from gym.utils import seeding



In [2]:
# %% [markdown]
# We redefine the previous environment. 
# As a little change, we allow one to provide a dictionary as an argument to the reset function
# Each environment will be associated with an env_id specified by the user (for debugging for instance)
# %%
class MyEnv(gym.Env):
    """Toy 1-D environment in which a scalar position ``x`` is pushed left or right.

    Each action moves ``x`` by 0.3 (action ``0`` moves left, any other action
    moves right). The episode is flagged as done as soon as ``x`` leaves the
    interval [-1, 1]. Observations are dictionaries with the keys ``"x"``,
    ``"identifier"`` (a random number drawn once per episode) and ``"env_id"``
    (the identifier provided by the user at reset time).
    """

    def __init__(self):
        super().__init__()

    def seed(self,seed=None):
        """(Re)create the random number generator used by ``reset``.

        Args:
            seed: value forwarded to ``gym.utils.seeding.np_random``.

        Returns:
            A one-element list holding the seed returned by
            ``seeding.np_random`` (the original version returned ``None``;
            callers that ignore the return value are unaffected).
        """
        self.np_random,seed=seeding.np_random(seed)
        return [seed]

    def reset(self,env_info=None):
        """Start a new episode and return the first observation.

        Args:
            env_info: dictionary that must contain the key ``"env_id"``.
                Defaults to ``{"env_id": 0}`` when omitted.

        Returns:
            The observation dictionary (``"x"``, ``"identifier"``, ``"env_id"``).
        """
        # A fresh dict is built on each call instead of sharing one mutable default.
        if env_info is None:
            env_info={"env_id":0}
        assert "env_id" in env_info
        # Make reset() usable even if seed() was never called explicitly.
        if getattr(self,"np_random",None) is None:
            self.seed()
        self.env_id=env_info["env_id"]
        self.x=self.np_random.rand()*2.0-1.0
        self.identifier=self.np_random.rand()
        obs={"x":self.x,"identifier":self.identifier,"env_id":self.env_id}
        return obs

    def step(self,action):
        """Apply ``action`` and return ``(observation, reward, done, info)``.

        The reward is the new value of ``x``; ``info`` is always an empty dict.
        """
        if action==0:
            self.x-=0.3
        else:
            self.x+=0.3
        done = self.x<-1 or self.x>1

        obs={"x":self.x,"identifier":self.identifier,"env_id":self.env_id}
        reward=self.x
        return obs,reward,done,{}

# %% [markdown]
# # Multithread Batchers
# 
# RLStructures provides two different multithread batchers. A precise explanation of how multithread batchers work will be given in another example.
 
# 1) The EpisodeBatcher allows one to sample complete episodes. 
# 2) The Batcher allows one to sample only n timesteps

# Let us first illustrate the EpisodeBatcher.
# Agent:
# * For multithread batchers, the Agent has a few differences: i) a buffer argument is added to the constructor; ii) the `__call__` method has two additional arguments (slots, position_in_slots), whose meaning will be given later; iii) the agent gives details about the information that it generates (specs)

# Our specific agent also takes as input (through agent_info) an agent_id, to illustrate the use of agent_info
# %%

class UniformAgent(Agent):
    """Agent that samples actions from a randomly drawn categorical distribution.

    Its internal state is a single ``"timestep"`` counter that is incremented
    at every call.
    """

    def __init__(self,buffer,n_actions):
        """Store the number of actions and forward ``buffer`` to the base class."""
        super().__init__(buffer=buffer)
        self.n_actions=n_actions

    def __call__(self,state,observation,agent_info, slots,position_in_slots):
        """Sample one action per element of ``observation``.

        ``slots`` and ``position_in_slots`` are accepted but not used here.

        Returns:
            A triplet ``(state before acting, agent output, state after acting)``.
            The output holds the sampled ``"action"``, the
            ``"action_probabilities"`` and the ``"agent_id"`` read from
            ``agent_info``.
        """
        batch_size=observation.n_elems()
        # When no state is provided, start with a zeroed timestep counter.
        current_state=(
            state
            if state is not None
            else DictTensor({"timestep":torch.zeros(batch_size).long()})
        )

        logits=torch.randn(batch_size,self.n_actions)
        probs=torch.softmax(logits,dim=1)
        sampled=torch.distributions.Categorical(probs).sample()
        next_state=DictTensor({"timestep":current_state["timestep"]+1})
        # The action probabilities are exported alongside the action itself.
        agent_output=DictTensor({
            "action":sampled,
            "action_probabilities":probs,
            "agent_id":agent_info["agent_id"],
        })
        return current_state,agent_output,next_state

    def specs_output(self):
        """Describe size and dtype of each output variable (used to set up the buffer)."""
        return {
            "action": {"size": torch.Size([]), "dtype": torch.int64},
            "agent_id": {"size": torch.Size([]), "dtype": torch.int64},
            "action_probabilities": {"size": torch.Size([self.n_actions]), "dtype": torch.float32},
        }

    def specs_state(self):
        """Describe size and dtype of each state variable (used to set up the buffer)."""
        return {"timestep": {"size": torch.Size([]), "dtype": torch.int64}}

# %% [markdown]
# Now, let us declare functions to create the agent and the environment

# %%

def create_env(seed=0,max_episode_steps=100,n_envs=4):
    """Build a vectorized environment made of ``n_envs`` time-limited ``MyEnv``.

    Args:
        seed: seed forwarded to ``NDiscreteGymEnv``.
        max_episode_steps: episode length limit applied through ``TimeLimit``.
        n_envs: number of environments to create (defaults to 4, the value
            that was previously hard-coded).

    Returns:
        The ``NDiscreteGymEnv`` wrapping the created environments.

    Raises:
        ValueError: if ``n_envs`` is smaller than 1.
    """
    if n_envs<1:
        raise ValueError("n_envs must be at least 1, got "+str(n_envs))
    envs=[
        TimeLimit(MyEnv(), max_episode_steps=max_episode_steps)
        for _ in range(n_envs)
    ]
    return NDiscreteGymEnv(envs,seed=seed)

def create_agent(buffer=None,n_actions=None):
    """Factory returning a ``UniformAgent``.

    The ``buffer`` argument must be specified, even though it is declared
    with a ``None`` default.
    """
    return UniformAgent(buffer=buffer,n_actions=n_actions)



In [None]:
    import torch.multiprocessing as mp

    # force=True keeps this cell re-runnable: without it, calling
    # set_start_method a second time in the same kernel raises a RuntimeError.
    mp.set_start_method("spawn",force=True)
    batcher=EpisodeBatcher(
            n_timesteps=100,
            n_slots=128,
            n_threads=4,
            seeds=[1,2,3,4],
            create_agent=create_agent,
            agent_args={"n_actions":2},
            create_env=create_env,
            env_args={"max_episode_steps":100}
    )
    # We want to sample 32 episodes (note that since our batcher has 4 threads
    # with 4 envs per thread, we must sample 16, 32, 48, ... episodes).
    print("Starting the acquisition process")
    n_episodes=32

    # Information to pass to the agents and the environments (one entry per episode)
    agent_info=DictTensor({"agent_id":torch.arange(n_episodes)})
    env_info=DictTensor({"env_id":torch.arange(n_episodes)})

    # Running the batcher. It is a non-blocking function that launches the acquisition.
    batcher.execute(n_episodes=n_episodes,agent_info=agent_info,env_info=env_info)
    # Getting episodes -- note that get is a blocking function, such that the
    # process will wait until the end of the acquisition.
    # The non-blocking variant can be used as get(blocking=False), returning
    # None if the acquisition process is not finished.
    trajectories=batcher.get()

    # Index of the trajectory displayed below
    idx=5
    print("Lengths of trajectories = ",trajectories.lengths)
    print("For trajectory #"+str(idx))
    print("\th_t: ",trajectories["timestep"][idx])
    print("\tz_t: ",trajectories["_timestep"][idx])
    print("\tagent_id: ",trajectories["agent_id"][idx])
    print("\tenv_id: ",trajectories["env_id"][idx].squeeze(-1))

    print("\tReward received after each action ",trajectories["_reward"][idx])
    print("\tAction at each timestep ",trajectories["action"][idx])
    

[DEBUG] Creating buffer for 'timestep' of size (128, 100) and type torch.int64
[DEBUG] Creating buffer for 'action' of size (128, 100) and type torch.int64
[DEBUG] Creating buffer for 'agent_id' of size (128, 100) and type torch.int64
[DEBUG] Creating buffer for 'action_probabilities' of size (128, 100, 2) and type torch.float32
[DEBUG] Creating buffer for 'x' of size (128, 100, 1) and type torch.float32
[DEBUG] Creating buffer for 'identifier' of size (128, 100, 1) and type torch.float32
[DEBUG] Creating buffer for 'env_id' of size (128, 100, 1) and type torch.float32
[DEBUG] Creating buffer for 'reward' of size (128, 100) and type torch.float32
[DEBUG] Creating buffer for 'done' of size (128, 100) and type torch.bool
[DEBUG] Creating buffer for 'initial_state' of size (128, 100) and type torch.bool
[DEBUG] Creating buffer for 'last_action' of size (128, 100) and type torch.int64
[DEBUG] Creating buffer for '_timestep' of size (128, 100) and type torch.int64
[DEBUG] Creating buffer fo