Optimize node execution order (#238)
* Further optimizations for iterating through a large pool of simulations

* Linting and CHANGELOG
Jarno Lintusaari committed Sep 8, 2017
1 parent 832046a commit 23f7a6d
Showing 8 changed files with 60 additions and 37 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.rst
@@ -1,6 +1,11 @@
 Changelog
 =========
 
+dev
+---
+
+- Further performance improvements for rerunning inference using stored data via caches
+
 0.6.2 (2017-09-06)
 ------------------
 
3 changes: 3 additions & 0 deletions elfi/client.py
@@ -326,4 +326,7 @@ def load_data(cls, compiled_net, context, batch_index):
         loaded_net = RandomStateLoader.load(context, loaded_net, batch_index)
         loaded_net = PoolLoader.load(context, loaded_net, batch_index)
 
+        # Add the cache from the context
+        loaded_net.graph['_executor_cache'] = context.caches['executor']
+
         return loaded_net
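Note that the executor cache is attached to the loaded net by reference, so whatever the executor memoizes while running one batch is visible when the next batch is loaded. A minimal sketch of the mechanism, with a hypothetical helper name (make_net):

import networkx as nx

def make_net(shared_cache):
    """Build a fresh per-batch graph that shares one long-lived cache dict."""
    net = nx.DiGraph()
    net.add_edge('prior', 'simulator')
    # Stored by reference: mutations made during execution persist in shared_cache
    net.graph['_executor_cache'] = shared_cache
    return net

cache = {}  # lives in the ComputationContext, not in any single net
net1 = make_net(cache)
net1.graph['_executor_cache']['sort_order'] = ['prior', 'simulator']
net2 = make_net(cache)  # a later batch sees what the earlier one memoized
assert net2.graph['_executor_cache']['sort_order'] == ['prior', 'simulator']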
57 changes: 36 additions & 21 deletions elfi/executor.py
@@ -99,27 +99,42 @@ def get_execution_order(cls, G):
             nodes that require execution
 
         """
-        nodes = set()
-        order = nx_constant_topological_sort(G)
-        dep_graph = nx.DiGraph(G.edges())
-
-        for node in order:
-            attr = G.node[node]
-            if attr.keys() >= {'operation', 'output'}:
-                raise ValueError('Generative graph has both op and output present')
-
-            # Remove nodes from dependency graph whose outputs are present
-            if 'output' in attr:
-                dep_graph.remove_node(node)
-            elif 'operation' not in attr:
-                raise ValueError('Generative graph has no op or output present')
-
-        for output_node in G.graph['outputs']:
-            if dep_graph.has_node(output_node):
-                nodes.update([output_node])
-                nodes.update(nx.ancestors(dep_graph, output_node))
-
-        return [n for n in order if n in nodes]
+        # Get the cache dict if it exists
+        cache = G.graph.get('_executor_cache', {})
+
+        output_nodes = G.graph['outputs']
+        # Filter those output nodes that have an operation to run
+        needed = tuple(sorted(node for node in output_nodes if 'operation' in G.node[node]))
+
+        if needed not in cache:
+            # Resolve the nodes that need to be executed in the graph
+            nodes_to_execute = set(needed)
+
+            if 'sort_order' not in cache:
+                cache['sort_order'] = nx_constant_topological_sort(G)
+            sort_order = cache['sort_order']
+
+            # Resolve the dependencies of the needed nodes
+            dep_graph = nx.DiGraph(G.edges())
+            for node in sort_order:
+                attr = G.node[node]
+                if attr.keys() >= {'operation', 'output'}:
+                    raise ValueError('Generative graph has both op and output present')
+
+                # Remove those nodes from the dependency graph whose outputs are present
+                if 'output' in attr:
+                    dep_graph.remove_node(node)
+                elif 'operation' not in attr:
+                    raise ValueError('Generative graph has no op or output present')
+
+            # Add the dependencies of the needed nodes
+            for needed_node in needed:
+                nodes_to_execute.update(nx.ancestors(dep_graph, needed_node))
+
+            # Turn into a sorted list and cache it
+            cache[needed] = [n for n in sort_order if n in nodes_to_execute]
+
+        return cache[needed]

     @staticmethod
     def _run(fn, node, G):
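Stripped of ELFI internals, the new get_execution_order memoizes two things: the topological sort of the whole graph (under 'sort_order') and the final execution order for each set of requested outputs (keyed by the sorted tuple needed). The following is a self-contained sketch of that scheme; it uses nx.topological_sort where ELFI uses its order-stable nx_constant_topological_sort, and the node attributes are placeholders:

import networkx as nx

def execution_order(G, requested, cache):
    """Return the nodes to run for `requested`, reusing cached results."""
    # Outputs that already have a stored 'output' need no operation run
    needed = tuple(sorted(n for n in requested if 'operation' in G.nodes[n]))
    if needed not in cache:
        if 'sort_order' not in cache:
            cache['sort_order'] = list(nx.topological_sort(G))
        sort_order = cache['sort_order']

        # Nodes whose output is already present drop out of the dependency
        # graph, so their ancestors are not pulled in either
        dep_graph = nx.DiGraph(G.edges())
        for node in sort_order:
            if 'output' in G.nodes[node]:
                dep_graph.remove_node(node)

        to_execute = set(needed)
        for node in needed:
            to_execute.update(nx.ancestors(dep_graph, node))
        cache[needed] = [n for n in sort_order if n in to_execute]
    return cache[needed]

G = nx.DiGraph([('mu', 'sim'), ('sim', 'dist')])
G.nodes['mu']['output'] = 0.0             # value already stored, e.g. from a pool
G.nodes['sim']['operation'] = 'simulate'  # placeholder operations
G.nodes['dist']['operation'] = 'distance'
cache = {}
print(execution_order(G, ['dist'], cache))  # ['sim', 'dist']
print(execution_order(G, ['dist'], cache))  # second call hits the cache

The payoff shows up in repeated batches: with outputs stored in a pool, the ancestor resolution runs once per distinct output set instead of once per batch.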
5 changes: 2 additions & 3 deletions elfi/loader.py
@@ -164,9 +164,8 @@ def load(cls, context, compiled_net, batch_index):
         elif isinstance(seed, (int, np.int32, np.uint32)):
             # TODO: In the future, we could use https://pypi.python.org/pypi/randomstate to enable
             # jumps?
-            sub_seed, context.sub_seed_cache = get_sub_seed(seed,
-                                                            batch_index,
-                                                            cache=context.sub_seed_cache)
+            cache = context.caches.get('sub_seed', None)
+            sub_seed = get_sub_seed(seed, batch_index, cache=cache)
             random_state = np.random.RandomState(sub_seed)
         else:
             raise ValueError("Seed of type {} is not supported".format(seed))
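With the new signature, the loader hands the context's sub-seed cache to get_sub_seed and gets a plain integer back; the cache dict is updated in place rather than reassigned. Assuming the post-commit API, usage looks like this sketch:

import numpy as np
from elfi.utils import get_sub_seed

master_seed, cache = 42, {}
streams = []
for batch_index in range(3):
    # One deterministic, distinct sub-seed per batch; the cache speeds up reruns
    sub_seed = get_sub_seed(master_seed, batch_index, cache=cache)
    streams.append(np.random.RandomState(sub_seed))

# Rerunning a batch with a cold cache reproduces the same stream
rerun = np.random.RandomState(get_sub_seed(master_seed, 0, cache={}))
assert streams[0].randint(2**31) == rerun.randint(2**31)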
6 changes: 4 additions & 2 deletions elfi/model/elfi_model.py
@@ -120,7 +120,7 @@ def random_name(length=4, prefix=''):
     return prefix + str(uuid.uuid4().hex[0:length])
 
 
-# TODO: move to another file
+# TODO: move to another file?
 class ComputationContext:
     """Container object for key components for consistent computation results.
@@ -167,9 +167,11 @@ def __init__(self, batch_size=None, seed=None, pool=None):

         self._batch_size = batch_size or 1
         self._seed = random_seed() if seed is None else seed
-        self.sub_seed_cache = {}
         self._pool = pool
 
+        # Caches are not used if their key is not found in this dict
+        self.caches = {'executor': {}, 'sub_seed': {}}
+
         # Count the number of submissions from this context
         self.num_submissions = 0

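As the comment says, a cache is skipped when its key is absent, so removing a key from context.caches is the opt-out switch. A hypothetical usage sketch:

from elfi.model.elfi_model import ComputationContext

context = ComputationContext(batch_size=10, seed=0)
context.caches['executor'].clear()  # drop memoized execution orders, keep caching on
del context.caches['sub_seed']      # disable sub-seed caching entirely; the loader
                                    # then calls get_sub_seed with cache=None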
3 changes: 2 additions & 1 deletion elfi/store.py
@@ -692,13 +692,14 @@ def append(self, array):

     @property
     def memmap(self):
+        """Return a NumPy memory map to the array data."""
         if not self.initialized:
             raise IndexError("NpyArray is not initialized")
 
         if self._memmap is None:
             order = 'F' if self.fortran_order else 'C'
             self._memmap = np.memmap(self.fs, dtype=self.dtype, shape=self.shape,
-                                    offset=self.header_length, order=order)
+                                     offset=self.header_length, order=order)
         return self._memmap
 
     def _init_from_file_header(self):
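The memmap property maps only the .npy payload, skipping header_length bytes of header. The same pattern with plain NumPy, reading the offset back from a file written by np.save (file name is illustrative):

import numpy as np

data = np.arange(12, dtype=np.float64).reshape(3, 4)
np.save('batches.npy', data)

# Recover dtype, shape and memory order from the .npy header,
# then map the payload that follows it
with open('batches.npy', 'rb') as f:
    version = np.lib.format.read_magic(f)  # advances past the magic string
    shape, fortran_order, dtype = np.lib.format.read_array_header_1_0(f)
    offset = f.tell()

order = 'F' if fortran_order else 'C'
mm = np.memmap('batches.npy', dtype=dtype, shape=shape, offset=offset,
               mode='r', order=order)
assert np.array_equal(mm, data)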
16 changes: 7 additions & 9 deletions elfi/utils.py
@@ -81,13 +81,12 @@ def get_sub_seed(seed, sub_seed_index, high=2**31, cache=None):
     high : int
         upper limit for the range of sub seeds (exclusive)
     cache : dict or None, optional
-        If provided, cached state will be used to compute the next sub_seed.
+        If provided, cached state will be used to compute the next sub_seed and then updated.
 
     Returns
     -------
-    int or tuple
-        The seed will be from the interval [0, high - 1]. If cache is provided, will also return
-        the updated cache.
+    int
+        The seed will be from the interval [0, high - 1].
 
     Notes
     -----
@@ -121,9 +120,8 @@ def get_sub_seed(seed, sub_seed_index, high=2**31, cache=None):
         seen.update(sub_seeds)
         n_unique = len(seen)
 
-    sub_seed = sub_seeds[-1]
     if cache is not None:
-        cache = {'random_state': random_state, 'seen': seen}
-        return sub_seed, cache
-    else:
-        return sub_seed
+        cache['random_state'] = random_state
+        cache['seen'] = seen
+
+    return sub_seeds[-1]
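Two things changed here: the function now mutates the caller's cache in place instead of returning a replacement dict, and it always returns just the seed. A simplified re-creation of the cached scheme (not ELFI's exact algorithm, which draws candidates in batches):

import numpy as np

def toy_sub_seed(seed, index, high=2**31, cache=None):
    """The index-th distinct draw from a seeded stream, resuming from cache."""
    state = cache if cache is not None else {}
    rs = state.setdefault('random_state', np.random.RandomState(seed))
    seen = state.setdefault('seen', [])
    while len(seen) <= index:
        candidate = int(rs.randint(high))
        if candidate not in seen:  # rejection step keeps sub-seeds unique
            seen.append(candidate)
    return seen[index]

cache = {}
warm = [toy_sub_seed(123, i, cache=cache) for i in range(5)]  # state advances in place
cold = [toy_sub_seed(123, i, cache={}) for i in range(5)]     # replays from scratch
assert warm == cold  # deterministic either way; the cache only skips the replay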
2 changes: 1 addition & 1 deletion tests/functional/test_randomness.py
@@ -57,7 +57,7 @@ def test_get_sub_seed():
     cache = {}
     sub_seeds_cached = []
     for i in range(n):
-        sub_seed, cache = get_sub_seed(seed, i, n, cache=cache)
+        sub_seed = get_sub_seed(seed, i, n, cache=cache)
         sub_seeds_cached.append(sub_seed)
 
     assert np.array_equal(sub_seeds, sub_seeds_cached)