Permalink
Browse files

IMPALA-80: Dynamic progress reporting for the shell

This patch adds dynamic progress reporting to the shell. It introduces
two new command-line flags for the shell:

   --live_progress - prints the number of completed vs. total scan ranges
   --live_summary - prints a periodically updated exec summary

In addition to the command line flags, these options can be set from
within the shell using:

   set LIVE_SUMMARY=True
   set LIVE_PROGRESS=True

The new options are listed under shell options. Both reports are updated
at most once per second; for longer-running queries, the update interval
is adjusted to the time between two RPC calls that fetch the query
status. To provide this information in the ExecSummary, the Thrift
structure for the ExecSummary was extended to contain a progress
indicator. The output is printed to stderr and is only available in
interactive mode.

An example video is available here:

https://asciinema.org/a/5wi7ypckx4ol4ha1hlg3e3q1k

Change-Id: I70b2ab5fa74dc2ba5bc3b338ef13ddc6ccf367d2
Reviewed-on: http://gerrit.cloudera.org:8080/508
Tested-by: Internal Jenkins
Reviewed-by: Martin Grund <mgrund@cloudera.com>
  • Loading branch information...
grundprinzip committed Jul 7, 2015
1 parent 0bcfefe commit 5a7109c5c68acb6cfd1b1e5e758e492276f4a653
@@ -580,14 +580,23 @@ Status ImpalaServer::GetRuntimeProfileStr(const TUniqueId& query_id,
}
Status ImpalaServer::GetExecSummary(const TUniqueId& query_id, TExecSummary* result) {
- // Search for the query id in the active query map
+ // Search for the query id in the active query map.
{
shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, true);
if (exec_state != NULL) {
lock_guard<mutex> l(*exec_state->lock(), adopt_lock_t());
if (exec_state->coord() != NULL) {
- lock_guard<SpinLock> lock(exec_state->coord()->GetExecSummaryLock());
- *result = exec_state->coord()->exec_summary();
+ TExecProgress progress;
+ {
+ lock_guard<SpinLock> lock(exec_state->coord()->GetExecSummaryLock());
+ *result = exec_state->coord()->exec_summary();
+
+ // Update the current scan range progress for the summary.
+ progress.__set_num_completed_scan_ranges(
+ exec_state->coord()->progress().num_complete());
+ progress.__set_total_scan_ranges(exec_state->coord()->progress().total());
+ }
+ result->__set_progress(progress);
return Status::OK();
}
}
@@ -72,6 +72,12 @@ struct TPlanNodeExecSummary {
9: optional bool is_broadcast
}
+// Progress counters for an in-flight query.
+struct TExecProgress {
+ 1: optional i64 total_scan_ranges
+ 2: optional i64 num_completed_scan_ranges
+}
+
// Execution summary of an entire query.
struct TExecSummary {
// State of the query.
@@ -91,5 +97,7 @@ struct TExecSummary {
// even if status is okay, in which case it contains errors that impala skipped
// over.
5: optional list<string> error_logs
-}
+ // Optional record indicating the query progress
+ 6: optional TExecProgress progress
+}
View
@@ -28,7 +28,6 @@
from thrift.transport.TTransport import TBufferedTransport, TTransportException
from thrift.Thrift import TApplicationException
-
class RpcStatus:
"""Convenience enum to describe Rpc return statuses"""
OK = 0
@@ -122,14 +121,15 @@ def build_summary_table(self, summary, idx, is_fragment_root, indent_level,
setattr(max_stats, attr, 0)
node = summary.nodes[idx]
- for stats in node.exec_stats:
- for attr in attrs:
- val = getattr(stats, attr)
- if val is not None:
- setattr(agg_stats, attr, getattr(agg_stats, attr) + val)
- setattr(max_stats, attr, max(getattr(max_stats, attr), val))
-
- if len(node.exec_stats) > 0:
+ if node.exec_stats is not None:
+ for stats in node.exec_stats:
+ for attr in attrs:
+ val = getattr(stats, attr)
+ if val is not None:
+ setattr(agg_stats, attr, getattr(agg_stats, attr) + val)
+ setattr(max_stats, attr, max(getattr(max_stats, attr), val))
+
+ if node.exec_stats is not None and node.exec_stats:
avg_time = agg_stats.latency_ns / len(node.exec_stats)
else:
avg_time = 0
@@ -171,8 +171,11 @@ def prettyprint_units(unit_val):
def prettyprint_time(time_val):
return prettyprint(time_val, ["ns", "us", "ms", "s"], 1000.0)
+ hosts = 0
+ if node.exec_stats is not None:
+ hosts = len(node.exec_stats)
row = [ label_prefix + node.label,
- len(node.exec_stats),
+ hosts,
prettyprint_time(avg_time),
prettyprint_time(max_stats.latency_ns),
prettyprint_units(cardinality),
@@ -288,7 +291,7 @@ def execute_query(self, query):
raise RPCException("Error executing the query")
return last_query_handle
- def wait_to_finish(self, last_query_handle):
+ def wait_to_finish(self, last_query_handle, periodic_callback=None):
loop_start = time.time()
while True:
query_state = self.get_query_state(last_query_handle)
@@ -299,6 +302,8 @@ def wait_to_finish(self, last_query_handle):
raise QueryStateException(self.get_warning_log(last_query_handle))
else:
raise DisconnectedException("Not connected to impalad.")
+
+ if periodic_callback is not None: periodic_callback()
time.sleep(self._get_sleep_interval(loop_start))
def fetch(self, query_handle):
@@ -419,7 +424,6 @@ def _get_sleep_interval(self, start_time):
return 0.1
elif elapsed < 60.0:
return 0.5
-
return 1.0
def get_column_names(self, last_query_handle):
View
@@ -33,6 +33,7 @@
from impala_shell_config_defaults import impala_shell_defaults
from option_parser import get_option_parser, get_config_from_file
from shell_output import DelimitedOutputFormatter, OutputStream, PrettyOutputFormatter
+from shell_output import OverwritingStdErrOutputStream
from subprocess import call
VERSION_FORMAT = "Impala Shell v%(version)s (%(git_hash)s) built on %(build_date)s"
@@ -94,6 +95,14 @@ class ImpalaShell(cmd.Cmd):
# Separator for queries in the history file.
HISTORY_FILE_QUERY_DELIM = '_IMP_DELIM_'
+ VALID_SHELL_OPTIONS = {
+ 'LIVE_PROGRESS' : (lambda x: x in ("true", "TRUE", "True", "1"), "print_progress"),
+ 'LIVE_SUMMARY' : (lambda x: x in ("true", "TRUE", "True", "1"), "print_summary")
+ }
+
+ # Minimum time in seconds between two calls to get the exec summary.
+ PROGRESS_UPDATE_INTERVAL = 1.0
+
def __init__(self, options):
cmd.Cmd.__init__(self)
self.is_alive = True
@@ -127,6 +136,8 @@ def __init__(self, options):
self.write_delimited = options.write_delimited
self.print_header = options.print_header
+ self.progress_stream = OverwritingStdErrOutputStream()
+
self.set_query_options = {}
self._populate_command_list()
@@ -137,6 +148,9 @@ def __init__(self, options):
self.last_query_handle = None;
self.query_handle_closed = None
+ self.print_summary = options.print_summary
+ self.print_progress = options.print_progress
+
try:
self.readline = __import__('readline')
self.readline.set_history_length(HISTORY_LENGTH)
@@ -171,7 +185,8 @@ def _disable_readline(self):
def _print_options(self, default_options, set_options):
# Prints the current query options
- # with default values distinguished from set values by brackets []
+ # with default values distinguished from set values by brackets [], followed by
+ # shell-local options.
if not default_options and not set_options:
print '\tNo options available.'
else:
@@ -180,6 +195,13 @@ def _print_options(self, default_options, set_options):
print '\n'.join(["\t%s: %s" % (k, set_options[k])])
else:
print '\n'.join(["\t%s: [%s]" % (k, default_options[k])])
+ self._print_shell_options()
+
+ def _print_shell_options(self):
+ """Prints shell options, which are local and independent of query options."""
+ print "\nShell Options"
+ for x in self.VALID_SHELL_OPTIONS:
+ print "\t%s: %s" % (x, self.__dict__[self.VALID_SHELL_OPTIONS[x][1]])
def do_shell(self, args):
"""Run a command on the shell
@@ -390,14 +412,20 @@ def do_summary(self, args):
print_to_stderr("Summary not available")
return CmdStatus.SUCCESS
output = []
- table = self.construct_table_header(["Operator", "#Hosts", "Avg Time", "Max Time",
- "#Rows", "Est. #Rows", "Peak Mem",
- "Est. Peak Mem", "Detail"])
+ table = self._default_summary_table()
self.imp_client.build_summary_table(summary, 0, False, 0, False, output)
formatter = PrettyOutputFormatter(table)
self.output_stream = OutputStream(formatter, filename=self.output_file)
self.output_stream.write(output)
+ def _handle_shell_options(self, token, value):
+ try:
+ handle = self.VALID_SHELL_OPTIONS[token]
+ self.__dict__[handle[1]] = handle[0](value)
+ return True
+ except KeyError:
+ return False
+
def do_set(self, args):
"""Set or display query options.
@@ -420,12 +448,13 @@ def do_set(self, args):
print_to_stderr("Error: SET <option>=<value>")
return CmdStatus.ERROR
option_upper = tokens[0].upper()
- if option_upper not in self.imp_client.default_query_options.keys():
- print "Unknown query option: %s" % (tokens[0])
- print "Available query options, with their values (defaults shown in []):"
- self._print_options(self.imp_client.default_query_options, self.set_query_options)
- return CmdStatus.ERROR
- self.set_query_options[option_upper] = tokens[1]
+ if not self._handle_shell_options(option_upper, tokens[1]):
+ if option_upper not in self.imp_client.default_query_options.keys():
+ print "Unknown query option: %s" % (tokens[0])
+ print "Available query options, with their values (defaults shown in []):"
+ self._print_options(self.imp_client.default_query_options, self.set_query_options)
+ return CmdStatus.ERROR
+ self.set_query_options[option_upper] = tokens[1]
self._print_if_verbose('%s set to %s' % (option_upper, tokens[1]))
def do_unset(self, args):
@@ -629,10 +658,50 @@ def _format_outputstream(self):
if self.print_header:
self.output_stream.write([column_names])
else:
- prettytable = self.construct_table_header(column_names)
+ prettytable = self.construct_table_with_header(column_names)
formatter = PrettyOutputFormatter(prettytable)
self.output_stream = OutputStream(formatter, filename=self.output_file)
+ def _periodic_wait_callback(self):
+ """If enough time elapsed since the last call to the periodic callback,
+ execute the RPC to get the query exec summary and depending on the set options
+ print either the progress or the summary or both to stderr.
+ """
+ if not self.print_progress and not self.print_summary: return
+
+ checkpoint = time.time()
+ if checkpoint - self.last_summary > self.PROGRESS_UPDATE_INTERVAL:
+ summary = self.imp_client.get_summary(self.last_query_handle)
+ if summary and summary.progress:
+ progress = summary.progress
+
+ # If the data is not complete return and wait for a good result.
+ if not progress.total_scan_ranges and not progress.num_completed_scan_ranges:
+ self.last_summary = time.time()
+ return
+
+ data = ""
+ if self.print_progress and progress.total_scan_ranges > 0:
+ val = ((summary.progress.num_completed_scan_ranges * 100) /
+ summary.progress.total_scan_ranges)
+ fragment_text = "[%s%s] %s%%\n" % ("#" * val, " " * (100 - val), val)
+ data += fragment_text
+
+ if self.print_summary:
+ table = self._default_summary_table()
+ output = []
+ self.imp_client.build_summary_table(summary, 0, False, 0, False, output)
+ formatter = PrettyOutputFormatter(table)
+ data += formatter.format(output) + "\n"
+
+ self.progress_stream.write(data)
+ self.last_summary = time.time()
+
+ def _default_summary_table(self):
+ return self.construct_table_with_header(["Operator", "#Hosts", "Avg Time", "Max Time",
+ "#Rows", "Est. #Rows", "Peak Mem",
+ "Est. Peak Mem", "Detail"])
+
def _execute_stmt(self, query, is_insert=False):
""" The logic of executing any query statement
@@ -650,7 +719,11 @@ def _execute_stmt(self, query, is_insert=False):
self.last_query_handle = self.imp_client.execute_query(query)
self.query_handle_closed = False
- wait_to_finish = self.imp_client.wait_to_finish(self.last_query_handle)
+ self.last_summary = time.time()
+ wait_to_finish = self.imp_client.wait_to_finish(self.last_query_handle,
+ self._periodic_wait_callback)
+ # Reset the progress stream.
+ self.progress_stream.clear()
if is_insert:
# retrieve the error log
@@ -729,7 +802,7 @@ def _no_cancellation_error(self, error):
if ImpalaShell.CANCELLATION_ERROR not in str(error):
return True
- def construct_table_header(self, column_names):
+ def construct_table_with_header(self, column_names):
""" Constructs the table header for a given query handle.
Should be called after the query has finished and before data is fetched.
@@ -1008,6 +1081,10 @@ def execute_queries_non_interactive_mode(options):
sys.exit(1)
if options.query or options.query_file:
+ if options.print_progress or options.print_summary:
+ print_to_stderr("Error: Live reporting is available for interactive mode only.")
+ sys.exit(1)
+
execute_queries_non_interactive_mode(options)
sys.exit(0)
@@ -39,5 +39,7 @@
'user': getpass.getuser(),
'ssl': False,
'ca_cert': None,
- 'config_file': os.path.expanduser("~/.impalarc")
- }
+ 'config_file': os.path.expanduser("~/.impalarc"),
+ 'print_progress' : False,
+ 'print_summary' : False
+ }
View
@@ -134,6 +134,10 @@ def get_option_parser(defaults):
"Specifying this option within a config file will have "
"no effect. Only specify this as a option in the commandline."
))
+ parser.add_option("--live_summary", dest="print_summary", action="store_true",
+ help="Print a query summary every 1s while the query is running.")
+ parser.add_option("--live_progress", dest="print_progress", action="store_true",
+ help="Print a query progress every 1s while the query is running.")
# add default values to the help text
for option in parser.option_list:
View
@@ -14,6 +14,7 @@
# limitations under the License.
import csv
+import re
import sys
from cStringIO import StringIO
@@ -86,3 +87,40 @@ def __del__(self):
# Don't close the file handle if it points to sys.stdout.
if self.filename and self.handle != sys.stdout:
self.handle.close()
+
+
+class OverwritingStdErrOutputStream(object):
+ """This class is used to write output to stderr and overwrite the previous text as
+ soon as new content needs to be written."""
+
+ # ANSI escape code that moves the cursor up one line.
+ UP = "\x1b[A"
+
+ def __init__(self):
+ self.last_line_count = 0
+ self.last_clean_text = ""
+
+ def _clean_before(self):
+ sys.stderr.write(self.UP * self.last_line_count)
+ sys.stderr.write(self.last_clean_text)
+
+ def write(self, data):
+ """This method will erase the previously printed text on screen by going
+ up as many new lines as the old text had and overwriting it with whitespace.
+ Afterwards, the new text will be printed."""
+ self._clean_before()
+ new_line_count = data.count("\n")
+ sys.stderr.write(self.UP * min(new_line_count, self.last_line_count))
+ sys.stderr.write(data)
+
+ # Cache the line count and the old text where all text was replaced by
+ # whitespace.
+ self.last_line_count = new_line_count
+ self.last_clean_text = re.sub(r"[^\s]", " ", data)
+
+ def clear(self):
+ sys.stderr.write(self.UP * self.last_line_count)
+ sys.stderr.write(self.last_clean_text)
+ sys.stderr.write(self.UP * self.last_line_count)
+ self.last_line_count = 0
+ self.last_clean_text = ""
Oops, something went wrong.

0 comments on commit 5a7109c

Please sign in to comment.