Skip to content

Commit

Permalink
Merge pull request #350 from bundlewrap/resume-file
Browse files Browse the repository at this point in the history
add `—resume-file` to `bw apply` and `bw run`
  • Loading branch information
trehn committed Jun 30, 2017
2 parents 9130cf4 + b5ae288 commit 21bd444
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 7 deletions.
7 changes: 6 additions & 1 deletion bundlewrap/cmdline/apply.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from ..concurrency import WorkerPool
from ..exceptions import ItemDependencyLoop
from ..utils import SkipList
from ..utils.cmdline import get_target_nodes
from ..utils.plot import explain_item_dependency_loop
from ..utils.table import ROW_SEPARATOR, render_table
Expand Down Expand Up @@ -39,6 +40,7 @@ def bw_apply(repo, args):

start_time = datetime.now()
results = []
skip_list = SkipList(args['resume_file'])

def tasks_available():
return bool(pending_nodes)
Expand All @@ -52,14 +54,16 @@ def next_task():
'autoskip_selector': args['autoskip'],
'force': args['force'],
'interactive': args['interactive'],
'skip_list': skip_list,
'workers': args['item_workers'],
'profiling': args['profiling'],
},
}

def handle_result(task_id, return_value, duration):
if return_value is None: # node skipped because it had no items
if return_value is None: # node skipped
return
skip_list.add(task_id)
results.append(return_value)
if args['profiling']:
total_time = 0.0
Expand Down Expand Up @@ -95,6 +99,7 @@ def handle_exception(task_id, exception, traceback):
next_task,
handle_result=handle_result,
handle_exception=handle_exception,
cleanup=skip_list.dump,
pool_id="apply",
workers=args['node_workers'],
)
Expand Down
2 changes: 1 addition & 1 deletion bundlewrap/cmdline/lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def remove_dummy_nodes(targets):
if list(node.items):
_targets.append(node)
else:
io.stdout(_("{x} {node} has no items").format(node=bold(node.name), x=yellow("!")))
io.stdout(_("{x} {node} has no items").format(node=bold(node.name), x=yellow("»")))
return _targets


Expand Down
24 changes: 24 additions & 0 deletions bundlewrap/cmdline/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,18 @@ def build_parser_bw():
dest='summary',
help=_("don't show stats summary"),
)
parser_apply.add_argument(
"-r",
"--resume-file",
default=None,
dest='resume_file',
help=_(
"path to a file that a list of completed nodes will be added to; "
"if the file already exists, any nodes therein will be skipped"
),
metavar=_("PATH"),
type=str,
)

# bw debug
help_debug = _("Start an interactive Python shell for this repository")
Expand Down Expand Up @@ -735,6 +747,18 @@ def build_parser_bw():
"(defaults to {})").format(bw_run_p_default),
type=int,
)
parser_run.add_argument(
"-r",
"--resume-file",
default=None,
dest='resume_file',
help=_(
"path to a file that a list of completed nodes will be added to; "
"if the file already exists, any nodes therein will be skipped"
),
metavar=_("PATH"),
type=str,
)

# bw stats
help_stats = _("Show some statistics about your repository")
Expand Down
21 changes: 18 additions & 3 deletions bundlewrap/cmdline/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,22 @@

from ..concurrency import WorkerPool
from ..exceptions import NodeLockedException
from ..utils import SkipList
from ..utils.cmdline import get_target_nodes
from ..utils.text import mark_for_translation as _
from ..utils.text import bold, error_summary, green, red, yellow
from ..utils.time import format_duration
from ..utils.ui import io


def run_on_node(node, command, may_fail, ignore_locks, log_output):
def run_on_node(node, command, may_fail, ignore_locks, log_output, skip_list):
if node.dummy:
io.stdout(_("{x} {node} is a dummy node").format(node=bold(node.name), x=yellow("!")))
return
io.stdout(_("{x} {node} is a dummy node").format(node=bold(node.name), x=yellow("»")))
return None

if node.name in skip_list:
io.stdout(_("{x} {node} skipped by --resume-file").format(node=bold(node.name), x=yellow("»")))
return None

node.repo.hooks.node_run_start(
node.repo,
Expand Down Expand Up @@ -59,6 +64,7 @@ def run_on_node(node, command, may_fail, ignore_locks, log_output):
node=bold(node.name),
x=red("✘"),
))
return result.return_code


def bw_run(repo, args):
Expand All @@ -74,6 +80,8 @@ def bw_run(repo, args):
)
start_time = datetime.now()

skip_list = SkipList(args['resume_file'])

def tasks_available():
return bool(pending_nodes)

Expand All @@ -88,9 +96,14 @@ def next_task():
args['may_fail'],
args['ignore_locks'],
True,
skip_list,
),
}

def handle_result(task_id, return_value, duration):
if return_value == 0:
skip_list.add(task_id)

def handle_exception(task_id, exception, traceback):
if isinstance(exception, NodeLockedException):
msg = _(
Expand All @@ -111,7 +124,9 @@ def handle_exception(task_id, exception, traceback):
worker_pool = WorkerPool(
tasks_available,
next_task,
handle_result=handle_result,
handle_exception=handle_exception,
cleanup=skip_list.dump,
pool_id="run",
workers=args['node_workers'],
)
Expand Down
4 changes: 4 additions & 0 deletions bundlewrap/concurrency.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ def __init__(
next_task,
handle_result=None,
handle_exception=None,
cleanup=None,
pool_id=None,
workers=4,
):
Expand All @@ -33,6 +34,7 @@ def __init__(
self.next_task = next_task
self.handle_result = handle_result
self.handle_exception = handle_exception
self.cleanup = cleanup

self.number_of_workers = workers
self.idle_workers = set(range(self.number_of_workers))
Expand Down Expand Up @@ -169,6 +171,8 @@ def run(self):
return processed_results
finally:
io.debug(_("shutting down worker pool {pool}").format(pool=self.pool_id))
if self.cleanup:
self.cleanup()
self.executor.shutdown()
io.debug(_("worker pool {pool} has been shut down").format(pool=self.pool_id))

Expand Down
9 changes: 7 additions & 2 deletions bundlewrap/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -542,15 +542,20 @@ def apply(
autoskip_selector="",
interactive=False,
force=False,
skip_list=tuple(),
workers=4,
profiling=False,
):
if not list(self.items):
io.stdout(_("{x} {node} has no items").format(node=bold(self.name), x=yellow("!")))
io.stdout(_("{x} {node} has no items").format(node=bold(self.name), x=yellow("»")))
return None

if self.covered_by_autoskip_selector(autoskip_selector):
io.debug(_("skipping {}, matches autoskip selector").format(self.name))
io.stdout(_("{x} {node} skipped by --skip").format(node=bold(self.name), x=yellow("»")))
return None

if self.name in skip_list:
io.stdout(_("{x} {node} skipped by --resume-file").format(node=bold(self.name), x=yellow("»")))
return None

start = datetime.now()
Expand Down
25 changes: 25 additions & 0 deletions bundlewrap/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,31 @@ def sha1(data):
return hasher.hexdigest()


class SkipList(object):
"""
Used to maintain a list of nodes that have already been visited.
"""
def __init__(self, path):
self.path = path
if path and exists(path):
with open(path) as f:
self._list_items = set(f.read().strip().split("\n"))
else:
self._list_items = set()

def __contains__(self, item):
return item in self._list_items

def add(self, item):
if self.path:
self._list_items.add(item)

def dump(self):
if self.path:
with open(self.path, 'w') as f:
f.write("\n".join(sorted(self._list_items)) + "\n")


@contextmanager
def tempfile():
handle, path = mkstemp()
Expand Down

0 comments on commit 21bd444

Please sign in to comment.