Skip to content

Commit

Permalink
show progress info on SIGINFO and SIGQUIT
Browse files Browse the repository at this point in the history
  • Loading branch information
trehn committed Aug 8, 2017
1 parent 1c76cf4 commit 2de8f28
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 6 deletions.
4 changes: 3 additions & 1 deletion bundlewrap/cmdline/apply.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from ..concurrency import WorkerPool
from ..exceptions import ItemDependencyLoop
from ..utils import SkipList
from ..utils.cmdline import get_target_nodes
from ..utils.cmdline import count_items, get_target_nodes
from ..utils.plot import explain_item_dependency_loop
from ..utils.table import ROW_SEPARATOR, render_table
from ..utils.text import (
Expand All @@ -31,6 +31,8 @@ def bw_apply(repo, args):
target_nodes = get_target_nodes(repo, args['target'], adhoc_nodes=args['adhoc_nodes'])
pending_nodes = target_nodes[:]

io.progress_set_total(count_items(pending_nodes))

repo.hooks.apply_start(
repo,
args['target'],
Expand Down
4 changes: 4 additions & 0 deletions bundlewrap/cmdline/lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def bw_lock_add(repo, args):
pending_nodes = target_nodes[:]
max_node_name_length = max([len(node.name) for node in target_nodes])
lock_id = randstr(length=4).upper()
io.progress_set_total(len(pending_nodes))

def tasks_available():
return bool(pending_nodes)
Expand All @@ -56,6 +57,7 @@ def next_task():
}

def handle_result(task_id, return_value, duration):
io.progress_advance()
io.stdout(_("{x} {node} locked with ID {id} (expires in {exp})").format(
x=green("✓"),
node=bold(task_id.ljust(max_node_name_length)),
Expand Down Expand Up @@ -89,6 +91,7 @@ def bw_lock_remove(repo, args):
target_nodes = remove_dummy_nodes(target_nodes)
pending_nodes = target_nodes[:]
max_node_name_length = max([len(node.name) for node in target_nodes])
io.progress_set_total(len(pending_nodes))

def tasks_available():
return bool(pending_nodes)
Expand All @@ -102,6 +105,7 @@ def next_task():
}

def handle_result(task_id, return_value, duration):
io.progress_advance()
if return_value is True:
io.stdout(_("{x} {node} lock {id} removed").format(
x=green("✓"),
Expand Down
3 changes: 3 additions & 0 deletions bundlewrap/cmdline/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ def bw_run(repo, args):
errors = []
target_nodes = get_target_nodes(repo, args['target'], adhoc_nodes=args['adhoc_nodes'])
pending_nodes = target_nodes[:]
io.progress_set_total(len(pending_nodes))

repo.hooks.run_start(
repo,
Expand Down Expand Up @@ -101,10 +102,12 @@ def next_task():
}

def handle_result(task_id, return_value, duration):
io.progress_advance()
if return_value == 0:
skip_list.add(task_id)

def handle_exception(task_id, exception, traceback):
io.progress_advance()
if isinstance(exception, NodeLockedException):
msg = _(
"{node_bold} locked by {user} "
Expand Down
4 changes: 3 additions & 1 deletion bundlewrap/cmdline/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from ..concurrency import WorkerPool
from ..plugins import PluginManager
from ..repo import Repository
from ..utils.cmdline import get_target_nodes
from ..utils.cmdline import count_items, get_target_nodes
from ..utils.plot import explain_item_dependency_loop
from ..utils.text import bold, green, mark_for_translation as _, red, yellow
from ..utils.ui import io
Expand All @@ -33,6 +33,8 @@ def bw_test(repo, args):
x=yellow("!"),
))

io.progress_set_total(count_items(pending_nodes))

def tasks_available():
return bool(pending_nodes)

Expand Down
3 changes: 2 additions & 1 deletion bundlewrap/cmdline/verify.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from sys import exit

from ..concurrency import WorkerPool
from ..utils.cmdline import get_target_nodes
from ..utils.cmdline import count_items, get_target_nodes
from ..utils.table import ROW_SEPARATOR, render_table
from ..utils.text import (
blue,
Expand Down Expand Up @@ -105,6 +105,7 @@ def bw_verify(repo, args):
node_stats = {}
pending_nodes = get_target_nodes(repo, args['target'], adhoc_nodes=args['adhoc_nodes'])
start_time = datetime.now()
io.progress_set_total(count_items(pending_nodes))

def tasks_available():
return bool(pending_nodes)
Expand Down
7 changes: 7 additions & 0 deletions bundlewrap/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ def tasks_available():
def next_task():
item, skipped_items = item_queue.pop()
for skipped_item in skipped_items:
io.progress_advance()
handle_apply_result(
node,
skipped_item,
Expand Down Expand Up @@ -199,6 +200,7 @@ def handle_result(task_id, return_value, duration):

handle_apply_result(node, item, status_code, interactive, changes=changes)
if not isinstance(item, DummyItem):
io.progress_advance()
results.append((item.id, status_code, duration))

worker_pool = WorkerPool(
Expand Down Expand Up @@ -834,6 +836,7 @@ def next_task():
}

def handle_result(task_id, return_value, duration):
io.progress_advance()
node_name, bundle_name, item_id = task_id.split(":", 2)
io.stdout("{x} {node} {bundle} {item}".format(
bundle=bold(bundle_name),
Expand Down Expand Up @@ -885,6 +888,8 @@ def verify_items(node, show_all=False, workers=1):
not item.triggered
):
items.append(item)
elif not isinstance(item, DummyItem):
io.progress_advance()

def tasks_available():
return bool(items)
Expand All @@ -899,6 +904,7 @@ def next_task():
if item.error_on_missing_fault:
item._raise_for_faults()
else:
io.progress_advance()
io.stdout(_("{x} {node} {bundle} {item} ({msg})").format(
bundle=bold(item.bundle.name),
item=item.id,
Expand All @@ -913,6 +919,7 @@ def next_task():
}

def handle_result(task_id, return_value, duration):
io.progress_advance()
unless_result, item_status = return_value
node_name, bundle_name, item_id = task_id.split(":", 2)
if not unless_result and not item_status.correct:
Expand Down
7 changes: 7 additions & 0 deletions bundlewrap/utils/cmdline.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@
from .ui import io


def count_items(nodes):
count = 0
for node in nodes:
count += len(node.items)
return count


def get_group(repo, group_name):
try:
return repo.get_group(group_name)
Expand Down
69 changes: 66 additions & 3 deletions bundlewrap/utils/ui.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,19 @@
from os import _exit, environ, getpid, kill
from os.path import join
from select import select
from signal import signal, SIG_DFL, SIGINT, SIGTERM
from signal import signal, SIG_DFL, SIGINFO, SIGINT, SIGQUIT, SIGTERM
import struct
from subprocess import PIPE, Popen
import sys
import termios
from threading import Event, Lock, Thread

from . import STDERR_WRITER, STDOUT_WRITER
from .table import render_table, ROW_SEPARATOR
from .text import ansi_clean, blue, bold, inverse, mark_for_translation as _
from .time import format_duration

INFO_EVENT = Event()
QUIT_EVENT = Event()
SHUTDOWN_EVENT_HARD = Event()
SHUTDOWN_EVENT_SOFT = Event()
Expand Down Expand Up @@ -84,6 +87,16 @@ def sigint_handler(*args, **kwargs):
SHUTDOWN_EVENT_HARD.set()


def siginfo_handler(*args, **kwargs):
"""
This handler is kept short since it interrupts execution of the
main thread. It's safer to handle these events in their own thread
because the main thread might be holding the IO lock while it is
interrupted.
"""
INFO_EVENT.set()


def term_width():
if not TTY:
return 0
Expand Down Expand Up @@ -145,6 +158,9 @@ def __init__(self):
self.debug_mode = False
self.jobs = []
self.lock = Lock()
self.progress = 0
self.progress_start = None
self.progress_total = 0
self._signal_handler_thread = Thread(
target=self._signal_handler_thread_body,
)
Expand All @@ -166,7 +182,9 @@ def activate(self):
),
), 'a')
self._signal_handler_thread.start()
signal(SIGINFO, siginfo_handler)
signal(SIGINT, sigint_handler)
signal(SIGQUIT, siginfo_handler)

def ask(self, question, default, epilogue=None, input_handler=DrainableStdin()):
assert self._active
Expand Down Expand Up @@ -207,7 +225,9 @@ def ask(self, question, default, epilogue=None, input_handler=DrainableStdin()):

def deactivate(self):
self._active = False
signal(SIGINFO, SIG_DFL)
signal(SIGINT, SIG_DFL)
signal(SIGQUIT, SIG_DFL)
self._signal_handler_thread.join()
if self.debug_log_file:
self.debug_log_file.close()
Expand Down Expand Up @@ -238,6 +258,49 @@ def job_del(self, msg):
self.jobs.remove(msg)
self._write_current_job()

def progress_advance(self, increment=1):
with self.lock:
self.progress += increment

def progress_set_total(self, total):
self.progress = 0
self.progress_start = datetime.utcnow()
self.progress_total = total

def progress_show(self):
if INFO_EVENT.is_set():
INFO_EVENT.clear()
table = []
if self.jobs:
table.append([bold(_("Running jobs")), self.jobs[0].strip()])
for job in self.jobs[1:]:
table.append(["", job.strip()])
try:
progress = (self.progress / float(self.progress_total))
elapsed = datetime.utcnow() - self.progress_start
remaining = elapsed / progress - elapsed
except ZeroDivisionError:
pass
else:
table.extend([
ROW_SEPARATOR,
[bold(_("Progress")), "{:.1f}%".format(progress * 100)],
ROW_SEPARATOR,
[bold(_("Elapsed")), format_duration(elapsed)],
ROW_SEPARATOR,
[
bold(_("Remaining")),
_("{} (estimate based on progress)").format(format_duration(remaining))
],
])
io.stdout(blue("i"))
if table:
for line in render_table(table):
io.stdout("{x} {line}".format(x=blue("i"), line=line))
else:
io.stdout(_("{x} No progress info available at this time.").format(x=blue("i")))
io.stdout(blue("i"))

@clear_formatting
@capture_for_debug_logfile
@add_debug_timestamp
Expand Down Expand Up @@ -266,6 +329,7 @@ def _clear_last_job(self):

def _signal_handler_thread_body(self):
while self._active:
self.progress_show()
if QUIT_EVENT.is_set():
if SHUTDOWN_EVENT_HARD.wait(0.1):
self.stderr(_("{x} {signal} cleanup interrupted, exiting...").format(
Expand Down Expand Up @@ -294,8 +358,7 @@ def _signal_handler_thread_body(self):
def _write(self, msg, append_newline=True, err=False):
if not self._active:
return
if self.jobs and TTY:
write_to_stream(STDOUT_WRITER, "\r\033[K")
self._clear_last_job()
if msg is not None:
if append_newline:
msg += "\n"
Expand Down

0 comments on commit 2de8f28

Please sign in to comment.