Skip to content

Commit

Permalink
Work in progress refactoring reachability.
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisCummins committed Dec 14, 2018
1 parent 49f1080 commit f184474
Show file tree
Hide file tree
Showing 12 changed files with 432 additions and 144 deletions.
2 changes: 2 additions & 0 deletions .project/.idea/dictionaries/cec.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

68 changes: 54 additions & 14 deletions experimental/compilers/reachability/BUILD
@@ -1,13 +1,59 @@
# Experiments in learning reachability analysis.

load("@org_pubref_rules_protobuf//python:rules.bzl", "py_proto_library")

# Library target wrapping control_flow_graph.py. Visibility is limited to this
# package and its subpackages.
py_library(
    name = "control_flow_graph",
    srcs = ["control_flow_graph.py"],
    visibility = ["//experimental/compilers/reachability:__subpackages__"],
    deps = [
        "//third_party/py/absl",
        "//third_party/py/networkx",
    ],
)

# Test target for the :control_flow_graph library.
py_test(
    name = "control_flow_graph_test",
    srcs = ["control_flow_graph_test.py"],
    deps = [
        ":control_flow_graph",
        "//third_party/py/absl",
        "//third_party/py/networkx",
        "//third_party/py/pytest",
    ],
)

# Library target wrapping control_flow_graph_generator.py, which builds on
# :control_flow_graph. Visibility is limited to this package and its
# subpackages.
py_library(
    name = "control_flow_graph_generator",
    srcs = ["control_flow_graph_generator.py"],
    visibility = ["//experimental/compilers/reachability:__subpackages__"],
    deps = [
        ":control_flow_graph",
        "//third_party/py/absl",
        "//third_party/py/networkx",
        "//third_party/py/numpy",
    ],
)

# Test target for the :control_flow_graph_generator library.
py_test(
    name = "control_flow_graph_generator_test",
    srcs = ["control_flow_graph_generator_test.py"],
    deps = [
        ":control_flow_graph_generator",
        "//third_party/py/absl",
        "//third_party/py/networkx",
        "//third_party/py/pytest",
    ],
)

py_binary(
name = "eval_model",
srcs = ["eval_model.py"],
deps = [
":reachability_py_pb2",
":train_model",
"//deeplearning/clgen:telemetry",
"//deeplearning/clgen/corpuses:atomizers",
"//experimental/compilers/reachability/proto:reachability_py_pb2",
"//labm8:pbutil",
"//third_party/py/absl",
"//third_party/py/humanize",
Expand All @@ -16,27 +62,21 @@ py_binary(
],
)

py_binary(
name = "reachability",
srcs = ["reachability.py"],
deps = [
"//experimental/compilers/reachability/proto:reachability_py_pb2",
"//labm8:fmt",
"//labm8:fs",
"//labm8:graph",
"//third_party/py/absl",
"//third_party/py/numpy",
],
py_proto_library(
name = "reachability_py_pb2",
protos = ["reachability.proto"],
deps = ["//third_party/py/protobuf"],
visibility = ["//experimental/compilers/reachability:__subpackages__"],
)

py_binary(
name = "train_model",
srcs = ["train_model.py"],
deps = [
":reachability",
":control_flow_graph",
":reachability_py_pb2",
"//deeplearning/clgen:telemetry",
"//deeplearning/clgen/corpuses:atomizers",
"//experimental/compilers/reachability/proto:reachability_py_pb2",
"//labm8:pbutil",
"//third_party/py/absl",
"//third_party/py/humanize",
Expand Down
56 changes: 56 additions & 0 deletions experimental/compilers/reachability/control_flow_graph.py
@@ -0,0 +1,56 @@
"""A class representing a control flow graph."""
import typing

import networkx as nx
from absl import flags


FLAGS = flags.FLAGS


class ControlFlowGraph(nx.DiGraph):
  """A control flow graph.

  For a control flow graph to be considered "valid", the following properties
  should be adhered to:

   * All nodes in the graph should have a unique "name" attribute. This can be
     set at creation time, e.g.: cfg.add_node(0, name='foo').
   * The graph should be fully connected.

  Use the IsValidControlFlowGraph() method to check if a graph instance has
  these properties. Note that the connectivity property is not yet enforced by
  that method (see the TODO in its body).
  """

  def __init__(self, name: str = "cfg"):
    """Create an empty control flow graph.

    Args:
      name: The name of the graph, forwarded to nx.DiGraph as a graph
        attribute.
    """
    super().__init__(name=name)

  def _ReachableNodes(self, src) -> set:
    """Return the set of nodes reachable from src by a non-empty path.

    Args:
      src: The node to start from.

    Returns:
      The descendants of src, plus src itself if src lies on a cycle (a self
      loop being the shortest possible cycle).
    """
    reachable = nx.descendants(self, src)
    # nx.descendants() never reports the source node itself, so src reaches
    # itself only if one of its predecessors is src (a self loop) or is
    # reachable from src (a longer cycle).
    if any(pred == src or pred in reachable
           for pred in self.predecessors(src)):
      reachable.add(src)
    return reachable

  def IsReachable(self, src, dst) -> bool:
    """Return whether dst node is reachable from src.

    A node is reachable from itself only if it lies on a cycle, e.g. it has a
    self loop. A self loop on dst does not make dst reachable from unrelated
    nodes.

    Args:
      src: The node to start from.
      dst: The node to look for.

    Returns:
      True if there is a non-empty path from src to dst.
    """
    return dst in self._ReachableNodes(src)

  def Reachables(self, src) -> typing.Iterator[bool]:
    """Return whether each node is reachable from the src node.

    Args:
      src: The node to start from.

    Returns:
      An iterator of booleans, one per node, in the iteration order of
      self.nodes.
    """
    # Compute the reachable set once instead of once per destination node.
    reachable = self._ReachableNodes(src)
    for dst in self.nodes:
      yield dst in reachable

  def IsValidControlFlowGraph(self) -> bool:
    """Return true if the graph is a valid control flow graph.

    Checks that the graph is non-empty and that every node carries a unique
    'name' attribute. Connectivity is not checked yet.
    """
    number_of_nodes = self.number_of_nodes()
    # CFGs must contain a node.
    if not number_of_nodes:
      return False
    # CFGs must be fully connected.
    # TODO(cec): Not enforced yet. nx.number_connected_components() is not
    # implemented for directed graphs, and a connected graph has exactly one
    # component rather than number_of_nodes components.
    # nx.is_weakly_connected(self) is a candidate replacement.
    node_data = [data for _, data in self.nodes(data=True)]
    # All nodes must have a name.
    if not all('name' in data for data in node_data):
      return False
    # All node names must be unique.
    return len({data['name'] for data in node_data}) == number_of_nodes
@@ -0,0 +1,99 @@
"""A generator for control flow graphs."""
import typing

import numpy as np
from absl import flags

from experimental.compilers.reachability import control_flow_graph as cfg


FLAGS = flags.FLAGS


class UniqueNameSequence(object):
  """An endless sequence of unique letter-based names.

  With base_char 'a' the sequence is: a, b, ..., z, aa, ab, ..., zz, aaa, ...
  (the same scheme as spreadsheet column labels). With base_char 'A' the
  letters are upper case. Every name is preceded by the optional prefix.

  Instances are iterators: next(sequence) returns the next unused name.
  """

  def __init__(self, base_char: str, prefix: str = ''):
    """Instantiate a name sequence.

    Args:
      base_char: The first letter of the alphabet to use. Either 'a' or 'A'.
      prefix: A string to prepend to every name in the sequence.

    Raises:
      ValueError: If base_char is not 'a' or 'A'.
    """
    if base_char not in {'a', 'A'}:
      raise ValueError(f"Invalid base_char '{base_char}'")
    self._base_ord = ord(base_char)
    self._prefix = prefix
    self._i = 0

  def StringInSequence(self, i: int) -> str:
    """Return the name at position i of the sequence.

    Args:
      i: The zero-based position in the sequence.

    Returns:
      The prefix followed by one or more letters.

    Raises:
      ValueError: If i is negative.
    """
    if i < 0:
      raise ValueError(f"Sequence index must be non-negative, got {i}")

    # Bijective base-26 conversion. There is no "zero" digit, so work with the
    # one-based position and subtract one before each division. Letters are
    # produced least-significant first.
    letters = []
    remaining = i + 1
    while remaining:
      remaining, offset = divmod(remaining - 1, 26)
      letters.append(chr(self._base_ord + offset))

    return self._prefix + ''.join(reversed(letters))

  def __iter__(self) -> 'UniqueNameSequence':
    """Return self, so that the sequence can be used wherever an iterator is."""
    return self

  def __next__(self) -> str:
    """Return the next unused name and advance the sequence."""
    s = self.StringInSequence(self._i)
    self._i += 1
    return s


class ControlFlowGraphGenerator(object):
  """A generator for control flow graphs."""

  def __init__(self, rand: np.random.RandomState,
               num_nodes_min_max: typing.Tuple[int, int],
               connections_scaling_param: float):
    """Instantiate a control flow graph generator.

    Args:
      rand: A random state instance.
      num_nodes_min_max: The (low, high) bounds on the number of CFG nodes.
        The tuple is passed unchanged to RandomState.randint(), so the upper
        bound is exclusive.
      connections_scaling_param: Scaling parameter to use to determine the
        likelihood of edges between CFG nodes. Larger values make edges more
        likely.
    """
    self._rand = rand
    self._num_nodes_min_max = num_nodes_min_max
    self._connections_scaling_param = connections_scaling_param

  @property
  def rand(self) -> np.random.RandomState:
    """The random state used to generate graphs."""
    return self._rand

  @property
  def num_nodes_min_max(self) -> typing.Tuple[int, int]:
    """The (low, high) bounds on the number of nodes in generated graphs."""
    return self._num_nodes_min_max

  @property
  def connections_scaling_param(self) -> float:
    """The scaling parameter controlling the likelihood of edges."""
    return self._connections_scaling_param

  def GenerateOne(self) -> 'cfg.ControlFlowGraph':
    """Create a random CFG.

    Nodes are the integers [0, num_nodes), each with a unique 'name'
    attribute. Edges are drawn at random, no node has an edge to itself, and
    whenever the graph has at least two nodes every node is given at least one
    outgoing edge.

    Returns:
      A ControlFlowGraph instance.
    """
    num_nodes = self.rand.randint(*self.num_nodes_min_max)

    graph = cfg.ControlFlowGraph()
    names = UniqueNameSequence('A')
    for node in range(num_nodes):
      graph.add_node(node, name=names.StringInSequence(node))

    # Scale uniform random samples and round them to {0, 1} to obtain a random
    # adjacency matrix. Entry [src][dst] == 1 denotes an edge src -> dst.
    adjacency_matrix = np.rint(np.clip(
        self.rand.rand(num_nodes, num_nodes) * self.connections_scaling_param,
        0, 1))
    for src, dst in zip(*np.nonzero(adjacency_matrix)):
      # CFG nodes cannot be connected to self.
      if src != dst:
        graph.add_edge(int(src), int(dst))

    # Give every node without a successor an edge to a randomly selected other
    # node. This is only possible when there is another node to connect to.
    if num_nodes > 1:
      for node in range(num_nodes):
        if graph.out_degree(node) == 0:
          # Draw uniformly from the num_nodes - 1 other nodes: sample an index
          # into the node list with `node` removed, then shift indices at or
          # above `node` up by one. randint()'s upper bound is exclusive.
          dst = int(self.rand.randint(0, num_nodes - 1))
          if dst >= node:
            dst += 1
          graph.add_edge(node, dst)

    return graph
@@ -0,0 +1,67 @@
"""Unit tests for :control_flow_graph_generator."""
import sys
import typing

import pytest
from absl import app
from absl import flags

from experimental.compilers.reachability import control_flow_graph_generator


FLAGS = flags.FLAGS


def test_UniqueNameSequence_next():
  """Successive next() calls walk the sequence from its first name."""
  sequence = control_flow_graph_generator.UniqueNameSequence('a')
  for expected_name in ('a', 'b', 'c'):
    assert next(sequence) == expected_name


def test_UniqueNameSequence_StringInSequence_single_char():
  """Small indices produce single-letter names."""
  sequence = control_flow_graph_generator.UniqueNameSequence('a')
  expected_names = ((0, 'a'), (1, 'b'), (2, 'c'), (25, 'z'))
  for index, expected_name in expected_names:
    assert sequence.StringInSequence(index) == expected_name


def test_UniqueNameSequence_StringInSequence_multi_char():
  """Indices past the end of the alphabet produce two-letter names."""
  sequence = control_flow_graph_generator.UniqueNameSequence('a')
  expected_names = ((26, 'aa'), (27, 'ab'), (28, 'ac'))
  for index, expected_name in expected_names:
    assert sequence.StringInSequence(index) == expected_name


def test_UniqueNameSequence_StringInSequence_prefix():
  """The prefix is prepended to the generated name."""
  prefixed_sequence = control_flow_graph_generator.UniqueNameSequence(
      'a', prefix='prefix_')
  first_name = prefixed_sequence.StringInSequence(0)
  assert first_name == 'prefix_a'


def test_UniqueNameSequence_StringInSequence_base_char():
  """An upper case base char produces upper case names."""
  upper_sequence = control_flow_graph_generator.UniqueNameSequence('A')
  first_name = upper_sequence.StringInSequence(0)
  assert first_name == 'A'


def test_UniqueNameSequence_StringInSequence_invalid_base_char():
  """Constructing a sequence with an unsupported base char raises ValueError."""
  sequence_class = control_flow_graph_generator.UniqueNameSequence
  with pytest.raises(ValueError):
    sequence_class('AA')


def main(argv: typing.List[str]):
  """Run this file's tests under pytest and exit with pytest's return code.

  Args:
    argv: The command line arguments. Anything after the first element is
      rejected.

  Raises:
    app.UsageError: If any arguments follow the first element of argv.
  """
  unknown_args = argv[1:]
  if unknown_args:
    raise app.UsageError(
        "Unknown arguments: '{}'.".format(' '.join(unknown_args)))
  exit_code = pytest.main([__file__, '-vv'])
  sys.exit(exit_code)


if __name__ == '__main__':
  # Parse a fixed flag list before handing control to absl. FLAGS is the
  # module-level alias of flags.FLAGS.
  # NOTE(review): '-v=1' presumably sets absl's logging verbosity - confirm.
  FLAGS(['argv[0]', '-v=1'])
  app.run(main)

0 comments on commit f184474

Please sign in to comment.