In [1]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import ray
import time
import numpy as np
import pickle



In [2]:
# Start Ray for this notebook with 4 CPUs and the web UI turned off.
# The memory limits are in bytes: 1 GB for Redis and 10 GB for the object
# store (matching the sizes reported in the start-up log).
# NOTE(review): ignore_reinit_error=True presumably makes re-running this cell
# harmless when Ray is already running -- confirm against the Ray docs.
ray.init(
    num_cpus=4,
    include_webui=False,
    ignore_reinit_error=True,
    redis_max_memory=1000000000,
    object_store_memory=10000000000,
)


2019-04-15 00:19:17,983	INFO node.py:423 -- Process STDOUT and STDERR is being redirected to /tmp/ray/session_2019-04-15_00-19-17_9021/logs.
2019-04-15 00:19:18,096	INFO services.py:363 -- Waiting for redis server at 127.0.0.1:64023 to respond...
2019-04-15 00:19:18,231	INFO services.py:363 -- Waiting for redis server at 127.0.0.1:36089 to respond...
2019-04-15 00:19:18,236	INFO services.py:760 -- Starting Redis shard with 1.0 GB max memory.
2019-04-15 00:19:18,278	INFO services.py:1384 -- Starting the Plasma object store with 10.0 GB memory using /dev/shm.


{'node_ip_address': None,
 'redis_address': '10.9.1.17:64023',
 'object_store_address': '/tmp/ray/session_2019-04-15_00-19-17_9021/sockets/plasma_store',
 'webui_url': None,
 'raylet_socket_name': '/tmp/ray/session_2019-04-15_00-19-17_9021/sockets/raylet'}

In [3]:
@ray.remote
def slow_function(i):
    """Sleep for one second, then hand back ``i`` unchanged.

    Used as a stand-in for a slow computation in the timing exercise.
    """
    delay_seconds = 1
    time.sleep(delay_seconds)
    return i

In [4]:
# Pause before starting the clock -- presumably so that the freshly started Ray
# workers are ready and do not skew the measurement (TODO confirm).
time.sleep(10.0)
start_time = time.time()

# Submit all four tasks first and block once on the whole batch, so nothing
# waits on an individual result while the others are still being submitted.
results = ray.get([slow_function.remote(i) for i in range(4)])
end_time = time.time()
duration = end_time - start_time

print(('The results are {}. This took {} seconds. Run the next cell to see if '
       'the exercise was done correctly.').format(results, duration))


The results are [0, 1, 2, 3]. This took 1.006211280822754 seconds. Run the next cell to see if the exercise was done correctly.


In [5]:
# The fetched values must be the plain task inputs, in submission order.
assert results == [0, 1, 2, 3], 'Did you remember to call ray.get?'

# Each task sleeps for one second, so the batch can never finish in under a
# second, and it should finish in barely more than that when run in parallel.
assert duration < 1.1, (
    'The loop took {} seconds. This is too slow.'.format(duration))
assert duration > 1, (
    'The loop took {} seconds. This is too fast.'.format(duration))

print('Success! The example took {} seconds.'.format(duration))

Success! The example took 1.006211280822754 seconds.


In [6]:
@ray.remote
def load_data(filename):
    """Pretend to load a data file.

    ``filename`` is accepted but never read: after a 0.1 second pause the task
    returns a 1000 x 100 array filled with ones.
    """
    num_rows, num_cols = 1000, 100
    time.sleep(0.1)
    return np.ones((num_rows, num_cols))

In [7]:
@ray.remote
def normalize_data(data):
    """Return ``data`` with its mean along axis 0 subtracted.

    The task pauses for 0.1 seconds before doing the subtraction.
    """
    time.sleep(0.1)
    axis0_mean = np.mean(data, axis=0)
    return data - axis0_mean

In [8]:
@ray.remote
def extract_features(normalized_data):
    """Stack the input next to its element-wise square.

    After a 0.1 second pause, returns ``np.hstack`` of ``normalized_data`` and
    ``normalized_data ** 2``.
    """
    time.sleep(0.1)
    squared = normalized_data ** 2
    return np.hstack([normalized_data, squared])


In [9]:
@ray.remote
def compute_loss(features):
    """Return the sum of squared residuals of ``features`` against ones.

    ``features`` must be two-dimensional. Each row is dotted with a vector of
    ones, one is subtracted from each of those values, and the squares of the
    differences are summed. The task pauses for 0.1 seconds before computing.
    """
    # Unpacking the shape first means a non-2-D input fails before the pause.
    num_data, dim = features.shape
    time.sleep(0.1)
    row_totals = np.dot(features, np.ones(dim))
    residuals = row_totals - np.ones(num_data)
    return np.sum(residuals ** 2)

In [10]:

# Every pipeline stage must expose a `.remote` attribute, i.e. it must have
# been wrapped with @ray.remote.
for remote_candidate, failure_message in [
        (load_data, 'load_data must be a remote function'),
        (normalize_data, 'normalize_data must be a remote function'),
        (extract_features, 'extract_features must be a remote function'),
        (compute_loss, 'compute_loss must be a remote function'),
]:
    assert hasattr(remote_candidate, 'remote'), failure_message

In [11]:
# Pause for two seconds, then record the timestamp that the later cell
# subtracts from its end time to obtain `duration` for the pipeline exercise.
time.sleep(2.0)
start_time = time.time()

In [12]:
losses = []
for filename in ('file1', 'file2', 'file3', 'file4'):
    inner_start = time.time()

    # Chain the four stages by passing each stage's handle straight into the
    # next `.remote` call; nothing is fetched inside the loop.
    data = load_data.remote(filename)
    normalized_data = normalize_data.remote(data)
    features = extract_features.remote(normalized_data)
    loss = compute_loss.remote(features)
    losses += [loss]

    inner_end = time.time()

    # Every stage sleeps for 0.1 seconds, so an iteration that takes this long
    # must have waited for at least one stage to finish.
    if 0.1 <= inner_end - inner_start:
        raise Exception('You may be calling ray.get inside of the for loop! '
                        'Doing this will prevent parallelism from being exposed. '
                        'Make sure to only call ray.get once outside of the for loop.')

In [13]:
# Single blocking fetch for all four pipelines, done outside the loop.
losses = ray.get(losses)
print('The losses are {}.'.format(losses) + '\n')
loss = sum(losses)

# Stop the clock that was started before the pipelines were submitted.
end_time = time.time()
duration = end_time - start_time

print(('The loss is {}. This took {} seconds. Run the next cell to see if '
       'the exercise was done correctly.').format(loss, duration))

The losses are [1000.0, 1000.0, 1000.0, 1000.0].

The loss is 4000.0. This took 0.5383255481719971 seconds. Run the next cell to see if the exercise was done correctly.


In [14]:
# Four files, each contributing a loss of 1000.0.
assert loss == 4000

# A single pipeline is four chained stages of 0.1 seconds each, so 0.4 seconds
# is the floor; running the four pipelines in parallel should stay near it.
assert duration < 0.8, (
    'The loop took {} seconds. This is too slow.'.format(duration))
assert duration > 0.4, (
    'The loop took {} seconds. This is too fast.'.format(duration))

print('Success! The example took {} seconds.'.format(duration))

Success! The example took 0.5383255481719971 seconds.


In [15]:
@ray.remote
class Foo(object):
    """Actor holding a single integer counter."""

    def __init__(self):
        """Start the counter at zero."""
        self.counter = 0

    def reset(self):
        """Set the counter back to zero."""
        self.counter = 0

    def increment(self):
        """Wait half a second, add one to the counter and return the new value."""
        time.sleep(0.5)
        self.counter = self.counter + 1
        return self.counter

In [16]:
# The class only gains a `.remote` attribute once it is decorated.
assert hasattr(Foo, 'remote'), (
    'You need to turn "Foo" into an actor with @ray.remote.')


In [17]:
# Create two independent counter actors (f1 first, then f2).
f1, f2 = Foo.remote(), Foo.remote()

In [18]:
# Pause for two seconds, then record the timestamp that the later cell
# subtracts from its end time to obtain `duration` for the actor exercise.
time.sleep(2.0)
start_time = time.time()

In [19]:
# Zero both counters. The second call is the cell's last expression, which is
# why the notebook displays the ObjectID it returns.
# NOTE(review): neither reset is waited on; this presumably relies on each
# actor running its method calls in submission order -- confirm.
f1.reset.remote()
f2.reset.remote()

ObjectID(01000000e27d741a8030458ed0a9f5a4aea7daf7)

In [20]:
# Alternate between the two actors for five rounds (f1, f2, f1, f2, ...),
# collecting the handles first and fetching all ten values in one call.
results = [actor.increment.remote()
           for _ in range(5)
           for actor in (f1, f2)]
results = ray.get(results)
end_time = time.time()
duration = end_time - start_time


In [21]:
# Any ObjectID left in `results` means the values were never fetched.
assert not any(isinstance(result, ray.ObjectID) for result in results), (
    'Looks like "results" is {}. You may have forgotten to call ray.get.'
    .format(results))

In [22]:
# Both counters must have advanced in lock-step from 1 to 5.
assert results == [1, 1, 2, 2, 3, 3, 4, 4, 5, 5]

# Five increments of 0.5 seconds per actor take 2.5 seconds; the two actors
# are expected to work side by side, so the total should stay close to that.
assert duration < 3, (
    'The experiments ran in {} seconds. This is too slow.'.format(duration))
assert duration > 2.5, (
    'The experiments ran in {} seconds. This is too fast.'.format(duration))

print('Success! The example took {} seconds.'.format(duration))

Success! The example took 2.5290303230285645 seconds.


In [23]:
@ray.remote
def f(i):
    """Sleep for a seeded pseudo-random time and report when the task finished.

    The NumPy global generator is seeded with ``5 + i``, so the delay (drawn
    uniformly from [0, 4) seconds) is the same on every run for a given ``i``.
    Returns the pair ``(i, completion timestamp)``.
    """
    np.random.seed(5 + i)
    delay_seconds = np.random.uniform(0, 4)
    time.sleep(delay_seconds)
    finished_at = time.time()
    return i, finished_at


In [24]:
time.sleep(2.0)
start_time = time.time()

# Launch six tasks, then block only until three of them are done.
result_ids = [f.remote(i) for i in range(6)]
initial_results, remaining_results = ray.wait(
    result_ids, num_returns=3, timeout=None)

# Stop the clock before fetching anything, so `duration` covers only the wait
# for the first three tasks.
end_time = time.time()
duration = end_time - start_time

# Swap the handles for the actual (index, timestamp) pairs.
initial_results = ray.get(initial_results)
remaining_results = ray.get(remaining_results)
assert len(initial_results) == 3
assert len(remaining_results) == 3

In [26]:
# Every result is an (index, completion timestamp) pair as returned by `f`;
# split the two batches into their indices and their timestamps.
initial_indices = [result[0] for result in initial_results]
initial_times = [result[1] for result in initial_results]
remaining_indices = [result[0] for result in remaining_results]
remaining_times = [result[1] for result in remaining_results]

# Between them, the two batches must contain each of the six tasks.
assert set(initial_indices + remaining_indices) == set(range(6))

# Upper bound: waiting for the first three tasks should not take this long.
assert duration < 1.5, ('The initial batch of three tasks was retrieved in '
                        '{} seconds. This is too slow.'.format(duration))

# Lower bound: the first three tasks cannot all have finished this quickly.
# (The message previously said "too slow" here, contradicting the check, and
# both messages said "ten tasks" although only three of six are waited for.)
assert duration > 0.8, ('The initial batch of three tasks was retrieved in '
                        '{} seconds. This is too fast.'.format(duration))

# Make sure the initial results actually completed first.
assert max(initial_times) < min(remaining_times)

print('Success! The example took {} seconds.'.format(duration))

Success! The example took 1.1988871097564697 seconds.


In [27]:
# Build a stand-in for a large model: 50 named arrays ('variable0' through
# 'variable49'), each holding one million samples from a standard normal.
neural_net_weights = dict(
    ('variable{}'.format(i), np.random.normal(size=1000000))
    for i in range(50)
)

In [28]:
# Time a round trip of the weights through Ray (ray.put / ray.get) ...
print('Ray - serializing')
start = time.time()
x_id = ray.put(neural_net_weights)
elapsed = time.time() - start
print('time: ', elapsed)

print('\nRay - deserializing')
start = time.time()
x_val = ray.get(x_id)
elapsed = time.time() - start
print('time: ', elapsed)

# ... and the same round trip through pickle, for comparison.
print('\npickle - serializing')
start = time.time()
serialized = pickle.dumps(neural_net_weights)
elapsed = time.time() - start
print('time: ', elapsed)

print('\npickle - deserializing')
start = time.time()
deserialized = pickle.loads(serialized)
elapsed = time.time() - start
print('time: ', elapsed)

Ray - serializing
time:  0.1253950595855713

Ray - deserializing
time:  0.000698089599609375

pickle - serializing
time:  0.37827324867248535

pickle - deserializing
time:  0.17782354354858398


In [29]:
@ray.remote
def use_weights(weights, i):
    """Return ``i`` unchanged; ``weights`` is accepted but never used.

    The task exists only so that a large ``weights`` argument can be passed to
    many remote calls in the timing exercise.
    """
    return i

In [30]:
time.sleep(2.0)
start_time = time.time()

# Put the weights into the object store once and give every task the same
# handle, instead of passing the dictionary itself to each of the 20 calls.
neural_net_weights_id = ray.put(neural_net_weights)
results = [use_weights.remote(neural_net_weights_id, i) for i in range(20)]
results = ray.get(results)

end_time = time.time()
duration = end_time - start_time

In [31]:
# Each task echoes its index, so the results must be 0..19 in order.
assert results == list(range(20))

# Twenty calls sharing one stored copy of the weights should finish well
# within a second.
assert duration < 1, (
    'The experiments ran in {} seconds. This is too slow.'.format(duration))

print('Success! The example took {} seconds.'.format(duration))

Success! The example took 0.11236715316772461 seconds.
