In [1]:
import ray
# Start a local Ray instance; the startup log it prints is shown below this cell.
ray.init()

Usage stats collection is enabled by default for nightly wheels. To disable this, run the following command: `ray disable-usage-stats` before starting Ray. See https://docs.ray.io/en/master/cluster/usage-stats.html for more details.


2023-01-28 10:51:46,197	INFO worker.py:1555 -- Started a local Ray instance.


0,1
Python version:,3.7.15
Ray version:,3.0.0.dev0


See details of connections

In [20]:
# List the nodes Ray knows about, with their addresses and resources (here: one local node).
ray.nodes()

[{'NodeID': '6af46194a6a491969ac45340d9f8dd32a43699de37cbca23d51ed560',
  'Alive': True,
  'NodeManagerAddress': '127.0.0.1',
  'NodeManagerHostname': 'cloudy',
  'NodeManagerPort': 52189,
  'ObjectManagerPort': 52187,
  'ObjectStoreSocketName': 'tcp://127.0.0.1:52636',
  'RayletSocketName': 'tcp://127.0.0.1:53222',
  'MetricsExportPort': 55843,
  'NodeName': '127.0.0.1',
  'alive': True,
  'Resources': {'object_store_memory': 379942502.0,
   'node:127.0.0.1': 1.0,
   'memory': 759885006.0,
   'CPU': 8.0}}]

[2m[36m(pid=)[0m [2023-01-26 21:30:52,045 C 17320 15044] (raylet.exe) util.cc:59:  Check failed: e.size() == ep.size() 
[2m[36m(pid=)[0m *** StackTrace Information ***
[2m[36m(pid=)[0m unknown
[2m[36m(pid=)[0m unknown
[2m[36m(pid=)[0m unknown
[2m[36m(pid=)[0m unknown
[2m[36m(pid=)[0m unknown
[2m[36m(pid=)[0m unknown
[2m[36m(pid=)[0m unknown
[2m[36m(pid=)[0m unknown
[2m[36m(pid=)[0m unknown
[2m[36m(pid=)[0m unknown
[2m[36m(pid=)[0m unknown
[2m[36m(pid=)[0m unknown
[2m[36m(pid=)[0m unknown
[2m[36m(pid=)[0m BaseThreadInitThunk
[2m[36m(pid=)[0m RtlUserThreadStart
[2m[36m(pid=)[0m 
	(2) raylet has lagging heartbeats due to slow network or busy workload.


In [2]:
#example of Ray API
import time
# The toy "database": eight words, looked up by their index.
database = "Learning Ray Flexible Distributed Python for Data Science".split()
def retrieve(item):
    """Simulate a slow lookup of ``database[item]``.

    Sleeps ``item / 10`` seconds first, so fetching items 0..7 one after another
    costs about 0.0 + 0.1 + ... + 0.7 = 2.8 seconds in total.

    Returns:
        The pair ``(item, database[item])``.
    """
    delay_seconds = item / 10.
    time.sleep(delay_seconds)
    value = database[item]
    return item, value
def print_runtime(input_data, start_time, decimals=1):
    """Print the time elapsed since ``start_time``, then every item of ``input_data``.

    Args:
        input_data: iterable of results; each is printed (via ``str``) on its own line.
        start_time: a ``time.time()`` timestamp taken before the work started.
        decimals: number of decimal places for the elapsed seconds.
    """
    elapsed = time.time() - start_time
    seconds_text = format(elapsed, '.{}f'.format(decimals))
    print('Runtime:' + seconds_text + ' seconds,data:')
    print('\n'.join(str(entry) for entry in input_data))
# Sequential baseline: fetch all 8 items one after another and time it.
start = time.time()
data = list(map(retrieve, range(8)))
print_runtime(data, start, decimals=1)

Runtime:2.8 seconds,data:
(0, 'Learning')
(1, 'Ray')
(2, 'Flexible')
(3, 'Distributed')
(4, 'Python')
(5, 'for')
(6, 'Data')
(7, 'Science')


In [4]:
@ray.remote
def retrieve_task(item):
    """Ray task wrapper around ``retrieve``: the same lookup, run in a Ray worker.

    ``retrieve_task.remote(item)`` returns an ObjectRef immediately; fetch the
    ``(item, word)`` result with ``ray.get``.
    """
    result = retrieve(item)
    return result

In [5]:
# Launch all 8 lookups as Ray tasks; .remote() does not wait, ray.get blocks for every result.
start = time.time()
data_references = list(map(retrieve_task.remote, range(8)))
data = ray.get(data_references)
print_runtime(data, start, decimals=2)

Runtime:0.81 seconds,data:
(0, 'Learning')
(1, 'Ray')
(2, 'Flexible')
(3, 'Distributed')
(4, 'Python')
(5, 'for')
(6, 'Data')
(7, 'Science')


In [7]:
# Store the database in Ray's object store once; the task below reads it through this ObjectRef.
database_object_ref = ray.put(database)
@ray.remote
def retrieve_task(item):
    """Ray task that looks ``item`` up in the object-store copy of the database.

    NOTE: this intentionally redefines ``retrieve_task`` from an earlier cell;
    every call after this cell runs uses this version.

    Returns:
        The pair ``(item, word)``, after sleeping ``item / 10`` seconds.
    """
    records = ray.get(database_object_ref)  # fetch the list put into the object store
    time.sleep(item / 10.)
    return item, records[item]

In [8]:
# Same timing experiment, now with tasks that read the database from the object store.
start = time.time()
data_references = list(map(retrieve_task.remote, range(8)))
data = ray.get(data_references)
print_runtime(data, start, decimals=2)

Runtime:0.77 seconds,data:
(0, 'Learning')
(1, 'Ray')
(2, 'Flexible')
(3, 'Distributed')
(4, 'Python')
(5, 'for')
(6, 'Data')
(7, 'Science')


Building an RL maze

In [1]:
import random 

class Discrete:
    """A discrete space holding the integers ``0 .. n - 1``."""

    def __init__(self, num_actions: int):
        """Create a space with ``num_actions`` values.

        For example, ``Discrete(4)`` can encode a move in one of the four
        cardinal directions.
        """
        self.n = num_actions

    def sample(self):
        """Return a uniformly random value from ``0 .. n - 1``."""
        highest = self.n - 1
        return random.randint(0, highest)

In [3]:
# Draw one random action from a 4-action space (prints an integer from 0 to 3).
space = Discrete(num_actions=4)
print(space.sample())

3


In [10]:
from IPython.display import clear_output
class Environment:
    """A 5x5 grid maze in which a seeker must walk from (0, 0) to the goal at (4, 4).

    Positions are ``(row, col)`` tuples; ``render`` draws row 0 at the top.
    Observations encode the seeker's cell as the integer ``5 * row + col`` (0..24).
    The reward is 1 whenever the seeker stands on the goal, and the episode is
    then done.
    """
    # Class-level defaults, so a freshly constructed environment can be stepped
    # before reset() has ever been called.
    seeker, goal = (0, 0), (4, 4)

    def __init__(self, *args, **kwargs):
        """Set up 4 actions and 25 observations; any arguments are accepted and ignored."""
        self.action_space = Discrete(4)
        self.observation_space = Discrete(5 * 5)

    @property
    def info(self):
        """Current seeker and goal positions, rebuilt on every access.

        A dict stored once on the class would be frozen at class creation and keep
        reporting the seeker at (0, 0), no matter where it had moved.
        """
        return {'seeker': self.seeker, 'goal': self.goal}

    def reset(self):
        """Put the seeker at (0, 0) and the goal at (4, 4); return the observation."""
        self.seeker = (0, 0)
        self.goal = (4, 4)
        return self.get_observation()

    def get_observation(self):
        """Encode the seeker position as the integer ``5 * row + col``."""
        return 5 * self.seeker[0] + self.seeker[1]

    def get_reward(self):
        """Return 1 if the seeker is on the goal, else 0."""
        return 1 if self.seeker == self.goal else 0

    def is_done(self):
        """The episode is over once the seeker has found the goal."""
        return self.seeker == self.goal

    def step(self, action):
        """Move the seeker one cell; a move off the grid leaves it against the wall.

        Args:
            action: 0 = down, 1 = left, 2 = up, 3 = right.

        Returns:
            ``(observation, reward, done, info)``, where ``info`` holds the
            current seeker and goal positions.

        Raises:
            ValueError: for any other action.
        """
        row, col = self.seeker
        if action == 0:  # move down
            self.seeker = (min(row + 1, 4), col)
        elif action == 1:  # move left
            self.seeker = (row, max(col - 1, 0))
        elif action == 2:  # move up
            self.seeker = (max(row - 1, 0), col)
        elif action == 3:  # move right
            self.seeker = (row, min(col + 1, 4))
        else:
            raise ValueError("invalid action")
        return self.get_observation(), self.get_reward(), self.is_done(), self.info

    def render(self, *args, **kwargs):
        """Clear the cell output and print the grid.

        'G' marks the goal and 'S' the seeker. 'S' is drawn last, so it hides 'G'
        when they share a cell.
        """
        clear_output()
        grid = [['| ' for _ in range(5)] + ["|\n"] for _ in range(5)]
        grid[self.goal[0]][self.goal[1]] = '|G'
        grid[self.seeker[0]][self.seeker[1]] = '|S'
        print(''.join([''.join(grid_row) for grid_row in grid]))


In [12]:
import time 
environment = Environment()

# Random walk: keep taking uniformly random actions until the seeker reaches the
# goal, pausing and redrawing the grid after every move.
while not environment.is_done():
    environment.step(environment.action_space.sample())
    time.sleep(0.8)
    environment.render()

| | | | | |
| | | | | |
| | | | | |
| | | | | |
| | | | |S|



In [18]:
import numpy as np
class Policy:
    """Epsilon-greedy policy backed by a table of state-action values."""

    def __init__(self, env):
        """Create an all-zero value table with one row per observation and one column per action.

        Args:
            env: object exposing ``action_space`` and ``observation_space``, each
                with an integer ``n``; ``action_space`` must also offer ``sample()``.
        """
        n_actions = env.action_space.n
        n_states = env.observation_space.n
        self.state_action_table = [[0] * n_actions for _ in range(n_states)]
        self.action_space = env.action_space

    def get_action(self, state, explore=True, epsilon=0.1):
        """Suggest an action for ``state``.

        When ``explore`` is True, a random action is returned with probability
        ``epsilon``. Otherwise the highest-valued action is returned; ties go to
        the lowest index, as with ``np.argmax``.
        """
        # The random draw only happens when explore is True (short-circuit).
        explore_now = explore and random.uniform(0, 1) < epsilon
        if explore_now:
            return self.action_space.sample()
        return np.argmax(self.state_action_table[state])

In [21]:
class Simulation(object):
    """Plays episodes of an environment under a given policy."""

    def __init__(self, env):
        """Store the environment to roll out.

        Args:
            env: object with ``reset()`` returning a state and ``step(action)``
                returning ``(next_state, reward, done, info)``. ``render()`` is only
                needed when rendering.
        """
        self.env = env

    def rollouts(self, policy, render=False, explore=True, epsilon=0.1, max_steps=None):
        """Play one episode from ``env.reset()`` and return its experiences.

        Args:
            policy: object with ``get_action(state, explore, epsilon)``.
            render: if True, sleep 0.8 s and call ``env.render()`` after every step.
            explore: forwarded to ``policy.get_action``.
            epsilon: forwarded to ``policy.get_action``.
            max_steps: optional cap on the number of steps. ``None`` (the default)
                runs until the environment reports done. That never happens for a
                greedy policy that cannot reach the goal, so set a cap when
                termination must be guaranteed.

        Returns:
            A list of ``[state, action, reward, next_state]`` lists, in step order.
        """
        experiences = []
        state = self.env.reset()
        done = False
        while not done and (max_steps is None or len(experiences) < max_steps):
            action = policy.get_action(state, explore, epsilon)
            next_state, reward, done, _info = self.env.step(action)
            experiences.append([state, action, reward, next_state])
            state = next_state
            if render:
                time.sleep(0.8)
                self.env.render()
        return experiences

In [None]:
# Roll out one fully random episode (epsilon=1.0) with an untrained policy, rendering
# every step. rollouts never modifies the value table, so it prints all zeros.
untrained_policy = Policy(environment)
sim = Simulation(environment)
exp = sim.rollouts(untrained_policy, render=True, epsilon=1.0)
for action_values in untrained_policy.state_action_table:
    print(action_values)

In [23]:
def update_policy(policy, experiences, weight=0.1, discount_factor=0.9):
    """Apply one tabular Q-learning update per experience, in the given order.

    For each ``(state, action, reward, next_state)``::

        Q[s][a] <- (1 - weight) * Q[s][a] + weight * (reward + discount_factor * max(Q[next_state]))

    Args:
        policy: object whose ``state_action_table`` (list of per-state value lists)
            is updated in place.
        experiences: iterable of ``(state, action, reward, next_state)`` records.
        weight: learning rate blending the old value with the new target.
        discount_factor: weight of the best value reachable from ``next_state``.
    """
    table = policy.state_action_table
    for s, a, r, s_next in experiences:
        # The target is computed before the write, so s == s_next still sees the old value.
        target = r + discount_factor * np.max(table[s_next])
        table[s][a] = (1 - weight) * table[s][a] + weight * target

In [26]:
def train_policy(env, num_episodes=10000, weight=0.1, discount_factor=0.9):
    """Train a fresh policy on ``env`` with sequential Q-learning.

    Each of the ``num_episodes`` episodes is rolled out with ``Simulation.rollouts``'
    default exploration settings and immediately folded into the value table.

    Returns:
        The trained ``Policy``.
    """
    policy, simulation = Policy(env), Simulation(env)
    for _episode in range(num_episodes):
        episode_experiences = simulation.rollouts(policy)
        update_policy(policy, episode_experiences, weight, discount_factor)
    return policy

In [27]:
# Sequential, single-process training (10000 episodes with the default settings).
trained_policy = train_policy(env=environment)

In [28]:
def evaluate_policy(env, policy, num_episodes=10, render=True):
    """Evaluate a policy by its average episode length over greedy rollouts.

    Args:
        env: environment to roll out in.
        policy: policy to evaluate; exploration is switched off (``explore=False``).
        num_episodes: number of episodes to average over; must be positive.
        render: forwarded to ``Simulation.rollouts`` (defaults to rendering every step).

    Returns:
        The average number of steps per episode.

    Raises:
        ValueError: if ``num_episodes`` is not positive; the average is undefined.
    """
    if num_episodes <= 0:
        raise ValueError(f"num_episodes must be positive, got {num_episodes}")
    simulation = Simulation(env)
    steps = sum(
        len(simulation.rollouts(policy, render=render, explore=False))
        for _ in range(num_episodes)
    )
    average = steps / num_episodes
    print(f"{average} steps on average for a total of {num_episodes} episodes")
    return average
# Average episode length of the sequentially trained policy; the value is the cell output.
evaluate_policy(env=environment, policy=trained_policy)


| | | | | |
| | | | | |
| | | | | |
| | | | | |
| | | | |S|

8.0 steps on averagefor a total of 10 episodes


8.0

In [36]:
# Stop the running Ray instance so the next cell can start a fresh one with ray.init().
ray.shutdown()

In [37]:
import ray
# Restart Ray (the previous cell shut it down).
ray.init()
# Store one environment in the object store; the remote task and actor below read it via env_ref.
environment = Environment()
env_ref = ray.put(environment)
@ray.remote
def create_policy():
    """Ray task: build a fresh, untrained ``Policy`` for the environment stored at ``env_ref``.

    Calling ``create_policy.remote()`` returns an ObjectRef to the new policy.
    """
    return Policy(ray.get(env_ref))
@ray.remote
class SimulationActor(Simulation):
    """Ray actor that runs ``Simulation`` rollouts on the environment stored at ``env_ref``."""

    def __init__(self):
        # Each actor resolves env_ref itself and simulates on the environment it gets back.
        super().__init__(ray.get(env_ref))
@ray.remote
def update_policy_task(policy_ref, experiences_list):
    """Ray task that applies a batch of rollouts to a policy, in list order.

    Args:
        policy_ref: the policy to update. Despite the name, Ray resolves an ObjectRef
            passed as a top-level task argument before the task runs, so this is
            the ``Policy`` object itself. Its ``state_action_table`` is updated here.
        experiences_list: list of ObjectRefs, one per rollout. Refs nested inside a
            list are not resolved automatically, so they are fetched here.

    Returns:
        The updated policy; the caller receives a new ObjectRef to it.
    """
    # One ray.get over the whole list fetches every rollout together instead of
    # blocking on each ref in turn; the values come back in list order.
    for experiences in ray.get(experiences_list):
        update_policy(policy_ref, experiences)
    return policy_ref
def train_policy_parallel(num_episodes=1000, num_simulations=4):
    """Train a policy using ``num_simulations`` rollout actors in parallel.

    Each of the ``num_episodes`` rounds starts one rollout per actor. It then chains
    an ``update_policy_task`` onto the previous policy ref, so
    ``num_episodes * num_simulations`` episodes are applied in total. The ``.remote()``
    calls only schedule work; the final ``ray.get`` waits for the whole chain.

    Returns:
        The trained ``Policy`` object.
    """
    policy_ref = create_policy.remote()
    actors = [SimulationActor.remote() for _ in range(num_simulations)]
    for _round in range(num_episodes):
        # Each rollout receives the latest policy ref, so it waits for the previous update.
        rollout_refs = [actor.rollouts.remote(policy_ref) for actor in actors]
        policy_ref = update_policy_task.remote(policy_ref, rollout_refs)
    return ray.get(policy_ref)

Usage stats collection is enabled by default for nightly wheels. To disable this, run the following command: `ray disable-usage-stats` before starting Ray. See https://docs.ray.io/en/master/cluster/usage-stats.html for more details.


2023-01-28 14:34:39,186	INFO worker.py:1555 -- Started a local Ray instance.


In [38]:
# Train with the parallel Ray implementation, then evaluate it like the sequential policy.
parallel_policy = train_policy_parallel(num_episodes=1000, num_simulations=4)
evaluate_policy(env=environment, policy=parallel_policy)

| | | | | |
| | | | | |
| | | | | |
| | | | | |
| | | | |S|

8.0 steps on averagefor a total of 10 episodes


8.0