Elfi model docs and refactoring (#128)
* ElfiModel documentation and refactoring

* More documentation and refactoring of the internal model representation format

* Review changes
Jarno Lintusaari committed Apr 10, 2017
1 parent 86d729d commit 86171ed
Showing 10 changed files with 297 additions and 198 deletions.
108 changes: 55 additions & 53 deletions elfi/compiler.py
@@ -10,51 +10,53 @@

class Compiler:
@classmethod
def compile(cls, source_net, output_net):
def compile(cls, source_net, compiled_net):
"""
Parameters
----------
source_net : nx.DiGraph
output_net : nx.DiGraph
compiled_net : nx.DiGraph
Returns
-------
compiled_net : nx.DiGraph
"""
raise NotImplementedError


class OutputCompiler(Compiler):
@classmethod
def compile(cls, source_net, output_net):
def compile(cls, source_net, compiled_net):
"""Compiles the nodes present in the source_net
"""
logger.debug("{} compiling...".format(cls.__name__))

# Make a structural copy of the source_net
output_net.add_nodes_from(source_net.nodes())
output_net.add_edges_from(source_net.edges(data=True))
compiled_net.add_nodes_from(source_net.nodes())
compiled_net.add_edges_from(source_net.edges(data=True))

# Compile the nodes to computation nodes
for name, data in output_net.nodes_iter(data=True):
for name, data in compiled_net.nodes_iter(data=True):
state = source_net.node[name]
data['output'] = state['class'].compile_output(state)
if '_output' in state and '_operation' in state:
raise ValueError("Cannot compile: both _output and _operation present")

if '_output' in state:
data['output'] = state['_output']
elif '_operation' in state:
data['operation'] = state['_operation']
else:
raise ValueError("Cannot compile, no _output or _operation present")

return output_net
return compiled_net


class ObservedCompiler(Compiler):
@classmethod
def compile(cls, source_net, output_net):
def compile(cls, source_net, compiled_net):
"""Adds observed nodes to the computation graph
Parameters
----------
source_net : nx.DiGraph
output_net : nx.DiGraph
Returns
-------
output_net : nx.DiGraph
"""
logger.debug("{} compiling...".format(cls.__name__))

@@ -63,93 +65,93 @@ def compile(cls, source_net, output_net):

for node in nx.topological_sort(source_net):
state = source_net.node[node]
if state.get('observable'):
if state.get('_observable'):
observable.append(node)
cls.make_observed_copy(node, output_net)
elif state.get('uses_observed'):
cls.make_observed_copy(node, compiled_net)
elif state.get('_uses_observed'):
uses_observed.append(node)
obs_node = cls.make_observed_copy(node, output_net, args_to_tuple)
obs_node = cls.make_observed_copy(node, compiled_net, args_to_tuple)
# Make edge to the using node
output_net.add_edge(obs_node, node, param='observed')
compiled_net.add_edge(obs_node, node, param='observed')
else:
continue

# Copy the edges
if not state.get('stochastic'):
if not state.get('_stochastic'):
obs_node = observed_name(node)
for parent in source_net.predecessors(node):
if parent in observable:
link_parent = observed_name(parent)
else:
link_parent = parent

output_net.add_edge(link_parent, obs_node, source_net[parent][node].copy())
compiled_net.add_edge(link_parent, obs_node, source_net[parent][node].copy())

# Check that there are no stochastic nodes in the ancestors
# TODO: move to loading phase when checking that stochastic observables get their data?
for node in uses_observed:
# Use the observed version to query observed ancestors in the output_net
# Use the observed version to query observed ancestors in the compiled_net
obs_node = observed_name(node)
for ancestor_node in nx.ancestors(output_net, obs_node):
if 'stochastic' in source_net.node.get(ancestor_node, {}):
for ancestor_node in nx.ancestors(compiled_net, obs_node):
if '_stochastic' in source_net.node.get(ancestor_node, {}):
raise ValueError("Observed nodes must be deterministic. Observed data"
"depends on a non-deterministic node {}."
.format(ancestor_node))

return output_net
return compiled_net

@classmethod
def make_observed_copy(cls, node, output_net, output_data=None):
def make_observed_copy(cls, node, compiled_net, operation=None):
obs_node = observed_name(node)

if output_net.has_node(obs_node):
if compiled_net.has_node(obs_node):
raise ValueError("Observed node {} already exists!".format(obs_node))

if output_data is None:
output_dict = output_net.node[node].copy()
if operation is None:
compiled_dict = compiled_net.node[node].copy()
else:
output_dict = dict(output=output_data)
compiled_dict = dict(operation=operation)

output_net.add_node(obs_node, output_dict)
compiled_net.add_node(obs_node, compiled_dict)
return obs_node
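# Illustrative sketch (not part of this commit): the wiring ObservedCompiler
# adds for a node flagged with '_uses_observed'. Node names are hypothetical;
# the exact name produced by observed_name() is not shown in this diff.
import networkx as nx

compiled_net = nx.DiGraph()
compiled_net.add_node('distance',
                      operation=lambda simulated, observed: abs(simulated - observed))
# An observed-side twin is added; its operation gathers the observed values of
# the parents (args_to_tuple in the code above, a lambda stands in here) ...
compiled_net.add_node('distance_observed', operation=lambda *parents: parents)
# ... and it is fed to the original node under the 'observed' parameter.
compiled_net.add_edge('distance_observed', 'distance', param='observed')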


class BatchSizeCompiler(Compiler):
@classmethod
def compile(cls, source_net, output_net):
def compile(cls, source_net, compiled_net):
logger.debug("{} compiling...".format(cls.__name__))

_node = '_batch_size'
for node, d in source_net.nodes_iter(data=True):
if d.get('uses_batch_size'):
if not output_net.has_node(_node):
output_net.add_node(_node)
output_net.add_edge(_node, node, param='batch_size')
return output_net
if d.get('_uses_batch_size'):
if not compiled_net.has_node(_node):
compiled_net.add_node(_node)
compiled_net.add_edge(_node, node, param='batch_size')
return compiled_net


class RandomStateCompiler(Compiler):
@classmethod
def compile(cls, source_net, output_net):
def compile(cls, source_net, compiled_net):
logger.debug("{} compiling...".format(cls.__name__))

_random_node = '_random_state'
for node, d in source_net.nodes_iter(data=True):
if 'stochastic' in d:
if not output_net.has_node(_random_node):
output_net.add_node(_random_node)
output_net.add_edge(_random_node, node, param='random_state')
return output_net
if '_stochastic' in d:
if not compiled_net.has_node(_random_node):
compiled_net.add_node(_random_node)
compiled_net.add_edge(_random_node, node, param='random_state')
return compiled_net


class ReduceCompiler(Compiler):
@classmethod
def compile(cls, source_net, output_net):
def compile(cls, source_net, compiled_net):
logger.debug("{} compiling...".format(cls.__name__))

outputs = output_net.graph['outputs']
output_ancestors = all_ancestors(output_net, outputs)
for node in output_net.nodes():
outputs = compiled_net.graph['outputs']
output_ancestors = all_ancestors(compiled_net, outputs)
for node in compiled_net.nodes():
if node not in output_ancestors:
output_net.remove_node(node)
return output_net
compiled_net.remove_node(node)
return compiled_net
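Taken together, the compilers translate a source graph whose node state uses the private keys above (`_output`, `_operation`, `_stochastic`, `_uses_batch_size`, ...) into a computable graph. Below is a minimal sketch, not part of the diff, with made-up node names; it assumes networkx 1.x (the `nodes_iter`/`.node` API used in this module), and the compiler order shown is only for illustration.

import networkx as nx
from elfi.compiler import OutputCompiler, BatchSizeCompiler, RandomStateCompiler

source_net = nx.DiGraph()
# A constant node stores its value directly under '_output'.
source_net.add_node('mu', _output=0.0)
# A simulator node exposes a callable under '_operation' and is flagged so the
# batch size and random state nodes get wired to it.
source_net.add_node('sim',
                    _operation=lambda mu, batch_size, random_state:
                        random_state.normal(mu, size=batch_size),
                    _stochastic=True, _uses_batch_size=True)
source_net.add_edge('mu', 'sim', param='mu')

compiled_net = nx.DiGraph(outputs=['sim'])
compiled_net = OutputCompiler.compile(source_net, compiled_net)       # 'output'/'operation' keys
compiled_net = BatchSizeCompiler.compile(source_net, compiled_net)    # adds '_batch_size'
compiled_net = RandomStateCompiler.compile(source_net, compiled_net)  # adds '_random_state'
# compiled_net now has 'mu' with {'output': 0.0}, 'sim' with an 'operation',
# and '_batch_size'/'_random_state' nodes feeding 'sim'.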
57 changes: 46 additions & 11 deletions elfi/executor.py
@@ -8,7 +8,36 @@

class Executor:
"""
Responsible for computing the graph
Responsible for computing the graph G
The format of the computable graph G is `nx.DiGraph`. The execution order of the nodes
is fixed and follows the topological ordering of G. The following properties are
required.
### Keys in G.graph dictionary
outputs : list
lists all the names of the nodes whose outputs are returned.
### Keys in edge dictionaries, G[parent_name][child_name]
param : str or int
The parent node output is passed as a parameter with this name to the child node.
Integers are interpreted as positional parameters.
### Keys in node dictionaries, G.node
operation : callable
Executed with the parameters specified by the incoming edges
output : variable
Existing output value, used as is
Notes
-----
You cannot have both operation and output in the same node dictionary
"""

@classmethod
@@ -25,12 +54,19 @@ def execute(cls, G):
"""

for node in nx_alphabetical_topological_sort(G):
for node in nx_constant_topological_sort(G):
attr = G.node[node]
fn = attr['output']
logger.debug("Executing {}".format(node))
if callable(fn):
G.node[node] = cls._run(fn, node, G)
if attr.keys() >= {'operation', 'output'}:
raise ValueError('Generative graph has both op and output present')

if 'operation' in attr:
op = attr['operation']
G.node[node] = cls._run(op, node, G)
elif 'output' not in attr:
raise ValueError('Generative graph has no op or output present')

# Make a result dict based on the requested outputs
result = {k:G.node[k]['output'] for k in G.graph['outputs']}
return result

@@ -56,8 +92,9 @@ def _run(fn, node, G):
return output


def nx_alphabetical_topological_sort(G, nbunch=None, reverse=False):
"""Return a list of nodes in topological sort order.
def nx_constant_topological_sort(G, nbunch=None, reverse=False):
"""Return a list of nodes in a constant topological sort order. This implementations is
adapted from `networkx.topological_sort`.
Modified version of networkx.topological_sort. The difference is that this version
will always return the same order for the same graph G given that the nodes
@@ -92,10 +129,8 @@ def nx_alphabetical_topological_sort(G, nbunch=None, reverse=False):
Notes
-----
This algorithm is based on a description and proof in
The Algorithm Design Manual [1]_ .
The implementation is adapted from networkx.topological_sort.
This algorithm is based on a description and proof in The Algorithm Design
Manual [1].
References
----------
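For a concrete sense of the constant ordering, a small sketch (not part of the diff) that imports the function from elfi.executor, where it is defined above:

import networkx as nx
from elfi.executor import nx_constant_topological_sort

# Same edges, different insertion order.
G1 = nx.DiGraph()
G1.add_edges_from([('a', 'c'), ('b', 'c')])
G2 = nx.DiGraph()
G2.add_edges_from([('b', 'c'), ('a', 'c')])

# Plain networkx.topological_sort may order 'a' and 'b' differently for the
# two graphs; the constant variant is expected to return the same list,
# e.g. ['a', 'b', 'c'], for both.
order1 = nx_constant_topological_sort(G1)
order2 = nx_constant_topological_sort(G2)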
8 changes: 6 additions & 2 deletions elfi/loader.py
@@ -67,8 +67,10 @@ def load(cls, context, output_net, batch_index):
continue
elif node in batch:
output_net.node[node]['output'] = batch[node]
output_net.node[node].pop('operation', None)
elif node not in output_net.graph['outputs']:
# Add output so that it can be stored when the results come
# This item is missing from the batch, so add it to the requested outputs
# so that it can be stored when the results arrive
output_net.graph['outputs'].append(node)

return output_net
@@ -87,10 +89,12 @@ class RandomStateLoader(Loader):

@classmethod
def load(cls, context, output_net, batch_index):
key = 'output'
seed = context.seed
if seed is False:
# Get the random_state of the respective worker by delaying the evaluation
random_state = get_np_random
key = 'operation'
elif isinstance(seed, (int, np.uint32)):
random_state = np.random.RandomState(context.seed)
else:
@@ -104,7 +108,7 @@ def load(cls, context, output_net, batch_index):

_random_node = '_random_state'
if output_net.has_node(_random_node):
output_net.node[_random_node]['output'] = random_state
output_net.node[_random_node][key] = random_state

return output_net
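# Illustrative sketch (not part of this commit) of the two cases handled above,
# using made-up graphs; '_random_state' is the node added by RandomStateCompiler
# and the lambda stands in for get_np_random.
import numpy as np
import networkx as nx

# Integer seed: a concrete RandomState is stored under 'output' and used as is.
eager_net = nx.DiGraph()
eager_net.add_node('_random_state', output=np.random.RandomState(42))

# seed is False: a callable is stored under 'operation' instead, so each worker
# resolves its own random state only when the Executor runs the node.
delayed_net = nx.DiGraph()
delayed_net.add_node('_random_state', operation=lambda: np.random.RandomState())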

