# Imports and Setup

In [1]:
import ray
import time

print('Successfully imported ray!')

Successfully imported ray!


In [2]:
ray.init(ignore_reinit_error=True)

# Sleep a little to improve the accuracy of the timing measurements used below,
# because some workers may still be starting up in the background.
time.sleep(2.0)

2019-12-20 15:42:54,297	INFO resource_spec.py:216 -- Starting Ray with 1.51 GiB memory available for workers and up to 0.78 GiB for objects. You can adjust these settings with ray.init(memory=<bytes>, object_store_memory=<bytes>).


# 1. Simple Parallelization
## Slow Function

In [3]:
# A function simulating a more interesting computation that takes one second.
def slow_function(i):
    time.sleep(1)
    return i

start_time = time.time()

results = []
for i in range(4):
  results.append(slow_function(i))
  
duration = time.time() - start_time
print('Executing the for loop took {:.3f} seconds.'.format(duration))
print('The results are:', results)
print('Run the next cell to check if the exercise was performed correctly.')

Executing the for loop took 4.004 seconds.
The results are: [0, 1, 2, 3]
Run the next cell to check if the exercise was performed correctly.


## Faster Version by executing Parallel
- `@ray.remote` - Decorator for Parallel Function
- `remote_function.get()` - Function Call that returns an ObjectID
- `ray.get(ObjectID)` - Extract Data from the ObjectID

In [4]:
# A function simulating a more interesting computation that takes one second.
@ray.remote
def slow_function(i):
    time.sleep(1)
    return i

start_time = time.time()

results = []
for i in range(4):
  results.append(slow_function.remote(i))

results = ray.get(results)
duration = time.time() - start_time
print('Executing the for loop took {:.3f} seconds.'.format(duration))
print('The results are:', results)
print('Run the next cell to check if the exercise was performed correctly.')

Executing the for loop took 1.023 seconds.
The results are: [0, 1, 2, 3]
Run the next cell to check if the exercise was performed correctly.


In [5]:
assert results == [0, 1, 2, 3], 'Did you remember to call ray.get?'
assert duration < 1.1, ('The loop took {:.3f} seconds. This is too slow.'
                        .format(duration))
assert duration > 1, ('The loop took {:.3f} seconds. This is too fast.'
                      .format(duration))

print('Success! The example took {} seconds.'.format(duration))

Success! The example took 1.022601842880249 seconds.


# 2. Parallel Data Processing with Task Dependencies
An ObjectID Object can be passed to a remote function as an argument without converting to a regular Object using
`ray.get()`

In [8]:
import numpy as np
@ray.remote
def load_data(filename):
    time.sleep(0.1)
    return np.ones((1000, 100))

@ray.remote
def normalize_data(data):
    time.sleep(0.1)
    return data - np.mean(data, axis=0)

@ray.remote
def extract_features(normalized_data):
    time.sleep(0.1)
    return np.hstack([normalized_data, normalized_data ** 2])

@ray.remote
def compute_loss(features):
    num_data, dim = features.shape
    time.sleep(0.1)
    return np.sum((np.dot(features, np.ones(dim)) - np.ones(num_data)) ** 2)

assert hasattr(load_data, 'remote'), 'load_data must be a remote function'
assert hasattr(normalize_data, 'remote'), 'normalize_data must be a remote function'
assert hasattr(extract_features, 'remote'), 'extract_features must be a remote function'
assert hasattr(compute_loss, 'remote'), 'compute_loss must be a remote function'

In [9]:
start_time = time.time()
losses = []
for filename in ['file1', 'file2', 'file3', 'file4']:
    inner_start = time.time()

    data = load_data.remote(filename)
    normalized_data = normalize_data.remote(data)
    features = extract_features.remote(normalized_data)
    loss = compute_loss.remote(features)
    losses.append(loss)
        
    inner_end = time.time()
    
    if inner_end - inner_start >= 0.1:
        raise Exception('You may be calling ray.get inside of the for loop! '
                        'Doing this will prevent parallelism from being exposed. '
                        'Make sure to only call ray.get once outside of the for loop.')
losses = ray.get(losses)
print('The losses are {}.'.format(losses) + '\n')
loss = sum(losses)

duration = time.time() - start_time

print('The loss is {}. This took {:.3f} seconds. Run the next cell to see '
      'if the exercise was done correctly.'.format(loss, duration))

The losses are [1000.0, 1000.0, 1000.0, 1000.0].

The loss is 4000.0. This took 0.467 seconds. Run the next cell to see if the exercise was done correctly.


In [10]:
assert loss == 4000
assert duration < 0.8, ('The loop took {:.3f} seconds. This is too slow.'
                        .format(duration))
assert duration > 0.4, ('The loop took {:.3f} seconds. This is too fast.'
                        .format(duration))

print('Success! The example took {:.3f} seconds.'.format(duration))

Success! The example took 0.467 seconds.


3. Nested Parallelism

In [11]:
ray.init(num_cpus=9, ignore_reinit_error=True)

# Sleep a little to improve the accuracy of the timing measurements used below,
# because some workers may still be starting up in the background.
time.sleep(2.0)

2019-12-20 16:11:38,701	ERROR worker.py:679 -- Calling ray.init() again after it has already been called.


In [12]:
@ray.remote
def compute_gradient(data, current_model):
    time.sleep(0.03)
    return 1

@ray.remote
def train_model(hyperparameters):
    current_model = 0
    # Iteratively improve the current model. This outer loop cannot be parallelized.
    for _ in range(10):
        # EXERCISE: Parallelize the list comprehension in the line below. After you
        # turn "compute_gradient" into a remote function, you will need to call it
        # with ".remote". The results must be retrieved with "ray.get" before "sum"
        # is called.
        total_gradient = sum(ray.get([compute_gradient.remote(j, current_model) for j in range(2)]))
        current_model += total_gradient

    return current_model

assert hasattr(compute_gradient, 'remote'), 'compute_gradient must be a remote function'
assert hasattr(train_model, 'remote'), 'train_model must be a remote function'

In [13]:
# Sleep a little to improve the accuracy of the timing measurements below.
time.sleep(2.0)
start_time = time.time()

# Run some hyperparaameter experiments.
results = []
for hyperparameters in [{'learning_rate': 1e-1, 'batch_size': 100},
                        {'learning_rate': 1e-2, 'batch_size': 100},
                        {'learning_rate': 1e-3, 'batch_size': 100}]:
    results.append(train_model.remote(hyperparameters))

results = ray.get(results)
# EXERCISE: Once you've turned "results" into a list of Ray ObjectIDs
# by calling train_model.remote, you will need to turn "results" back
# into a list of integers, e.g., by doing "results = ray.get(results)".

end_time = time.time()
duration = end_time - start_time

assert all([isinstance(x, int) for x in results]), \
    'Looks like "results" is {}. You may have forgotten to call ray.get.'.format(results)

In [14]:
assert results == [20, 20, 20]
assert duration < 0.5, ('The experiments ran in {:.3f} seconds. This is too '
                         'slow.'.format(duration))
assert duration > 0.3, ('The experiments ran in {:.3f} seconds. This is too '
                        'fast.'.format(duration))

print('Success! The example took {:.3f} seconds.'.format(duration))


Success! The example took 0.416 seconds.
