Skip to content

Commit

Permalink
Merge pull request #409 from codywilbourn/cli-improvements
Browse files Browse the repository at this point in the history
Add CLI options
  • Loading branch information
dan-blanchard committed Dec 14, 2017
2 parents c9777ce + 0a60495 commit d638c30
Show file tree
Hide file tree
Showing 13 changed files with 120 additions and 85 deletions.
7 changes: 7 additions & 0 deletions streamparse/cli/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ def add_ackers(parser):
dest='options')


def add_config(parser):
""" Add --config option to parser """
parser.add_argument('--config',
help='Specify path to config.json',
type=argparse.FileType('r'))


def add_debug(parser):
""" Add --debug option to parser """
parser.add_argument('-d', '--debug',
Expand Down
11 changes: 6 additions & 5 deletions streamparse/cli/kill.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,20 @@
from ..thrift import KillOptions
from ..util import (get_topology_definition, get_env_config, get_nimbus_client,
ssh_tunnel)
from .common import add_environment, add_name, add_timeout, add_wait
from .common import add_config, add_environment, add_name, add_timeout, add_wait


def _kill_topology(topology_name, nimbus_client, wait=None):
kill_opts = KillOptions(wait_secs=wait)
nimbus_client.killTopologyWithOpts(name=topology_name, options=kill_opts)


def kill_topology(topology_name=None, env_name=None, wait=None, timeout=None):
def kill_topology(topology_name=None, env_name=None, wait=None, timeout=None, config_file=None):
# For kill, we allow any topology name to be specified, because people
# should be able to kill topologies not in their local branch
if topology_name is None:
topology_name = get_topology_definition(topology_name)[0]
env_name, env_config = get_env_config(env_name)
topology_name = get_topology_definition(topology_name, config_file=config_file)[0]
env_name, env_config = get_env_config(env_name, config_file=config_file)
# Use ssh tunnel with Nimbus if use_ssh_for_nimbus is unspecified or True
with ssh_tunnel(env_config) as (host, port):
nimbus_client = get_nimbus_client(env_config, host=host, port=port,
Expand All @@ -34,6 +34,7 @@ def subparser_hook(subparsers):
description=__doc__,
help=main.__doc__)
subparser.set_defaults(func=main)
add_config(subparser)
add_environment(subparser)
add_name(subparser)
add_timeout(subparser)
Expand All @@ -43,4 +44,4 @@ def subparser_hook(subparsers):
def main(args):
""" Kill the specified Storm topology """
kill_topology(topology_name=args.name, env_name=args.environment,
wait=args.wait, timeout=args.timeout)
wait=args.wait, timeout=args.timeout, config_file=args.config)
9 changes: 5 additions & 4 deletions streamparse/cli/list.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from ..util import (get_env_config, get_nimbus_client, print_stats_table,
ssh_tunnel)
from ..thrift import TopologySummary
from .common import add_environment, add_timeout
from .common import add_config, add_environment, add_timeout


def _list_topologies(nimbus_client):
Expand All @@ -16,9 +16,9 @@ def _list_topologies(nimbus_client):
return cluster_summary.topologies


def list_topologies(env_name, timeout=None):
def list_topologies(env_name, timeout=None, config_file=None):
"""Prints out all running Storm topologies"""
env_name, env_config = get_env_config(env_name)
env_name, env_config = get_env_config(env_name, config_file=config_file)
# Use ssh tunnel with Nimbus if use_ssh_for_nimbus is unspecified or True
with ssh_tunnel(env_config) as (host, port):
nimbus_client = get_nimbus_client(env_config, host=host, port=port,
Expand All @@ -37,10 +37,11 @@ def subparser_hook(subparsers):
description=__doc__,
help=main.__doc__)
subparser.set_defaults(func=main)
add_config(subparser)
add_environment(subparser)
add_timeout(subparser)


def main(args):
""" List the currently running Storm topologies """
list_topologies(args.environment, timeout=args.timeout)
list_topologies(args.environment, timeout=args.timeout, config_file=args.config)
12 changes: 7 additions & 5 deletions streamparse/cli/remove_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from fabric.api import env, execute, parallel, run, sudo
from pkg_resources import parse_version

from .common import (add_environment, add_name, add_override_name, add_pattern,
from .common import (add_config, add_environment, add_name, add_override_name, add_pattern,
add_pool_size)
from ..util import (activate_env, get_env_config, get_topology_definition,
get_logfiles_cmd, get_nimbus_client, nimbus_storm_version,
Expand All @@ -32,13 +32,13 @@ def _remove_logs(topology_name, pattern, remove_worker_logs, user, is_old_storm,

def remove_logs(topology_name=None, env_name=None, pattern=None,
remove_worker_logs=False, user='root', override_name=None,
remove_all_artifacts=False, options=None):
remove_all_artifacts=False, options=None, config_file=None):
"""Remove all Python logs on Storm workers in the log.path directory."""
if override_name is not None:
topology_name = override_name
else:
topology_name = get_topology_definition(topology_name)[0]
env_name, env_config = get_env_config(env_name)
topology_name = get_topology_definition(topology_name, config_file=config_file)[0]
env_name, env_config = get_env_config(env_name, config_file=config_file)
with ssh_tunnel(env_config) as (host, port):
nimbus_client = get_nimbus_client(env_config, host=host, port=port)
is_old_storm = nimbus_storm_version(nimbus_client) < parse_version('1.0')
Expand All @@ -58,6 +58,7 @@ def subparser_hook(subparsers):
'also any other files for the topology in its '
'workers-artifacts subdirectories.',
action='store_true')
add_config(subparser)
add_environment(subparser)
add_name(subparser)
add_override_name(subparser)
Expand All @@ -80,4 +81,5 @@ def main(args):
pattern=args.pattern,
remove_worker_logs=args.remove_worker_logs,
user=args.user, override_name=args.override_name,
remove_all_artifacts=args.remove_all_artifacts)
remove_all_artifacts=args.remove_all_artifacts,
config_file=args.config)
13 changes: 7 additions & 6 deletions streamparse/cli/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,16 @@
from ..util import (get_config, get_env_config, get_topology_definition,
get_topology_from_file, local_storm_version,
set_topology_serializer, storm_lib_version)
from .common import (add_ackers, add_debug, add_environment, add_name,
from .common import (add_ackers, add_config, add_debug, add_environment, add_name,
add_options, add_workers, resolve_options)
from .jar import jar_for_deploy


def run_local_topology(name=None, env_name=None, time=0, options=None):
def run_local_topology(name=None, env_name=None, time=0, options=None, config_file=None):
"""Run a topology locally using Flux and `storm jar`."""
name, topology_file = get_topology_definition(name)
config = get_config()
env_name, env_config = get_env_config(env_name)
name, topology_file = get_topology_definition(name, config_file=config_file)
config = get_config(config_file=config_file)
env_name, env_config = get_env_config(env_name, config_file=config_file)
topology_class = get_topology_from_file(topology_file)

set_topology_serializer(env_config, config, topology_class)
Expand Down Expand Up @@ -83,6 +83,7 @@ def subparser_hook(subparsers):
formatter_class=RawDescriptionHelpFormatter)
subparser.set_defaults(func=main)
add_ackers(subparser)
add_config(subparser)
add_debug(subparser)
add_environment(subparser)
add_name(subparser)
Expand All @@ -99,4 +100,4 @@ def subparser_hook(subparsers):
def main(args):
"""Run the local topology with the given arguments"""
run_local_topology(name=args.name, time=args.time, options=args.options,
env_name=args.environment)
env_name=args.environment, config_file=args.config)
13 changes: 8 additions & 5 deletions streamparse/cli/slot_usage.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from prettytable import PrettyTable
from six import iteritems

from .common import add_environment
from .common import add_config, add_environment
from ..util import get_ui_json, get_ui_jsons, storm_lib_version


Expand All @@ -20,15 +20,16 @@ def subparser_hook(subparsers):
description=__doc__,
help=main.__doc__)
subparser.set_defaults(func=main)
add_config(subparser)
add_environment(subparser)


def display_slot_usage(env_name):
def display_slot_usage(env_name, config_file=None):
print('Querying Storm UI REST service for slot usage stats (this can take a while)...')
topology_summary_path = '/api/v1/topology/summary'
topology_detail_path = '/api/v1/topology/{topology}'
component_path = '/api/v1/topology/{topology}/component/{component}'
topo_summary_json = get_ui_json(env_name, topology_summary_path)
topo_summary_json = get_ui_json(env_name, topology_summary_path, config_file=config_file)
topology_ids = [x['id'] for x in topo_summary_json['topologies']]
# Keep track of the number of workers used by each topology on each machine
topology_worker_ports = defaultdict(lambda: defaultdict(set))
Expand All @@ -37,7 +38,8 @@ def display_slot_usage(env_name):
topology_components = dict()
topology_detail_jsons = get_ui_jsons(env_name,
(topology_detail_path.format(topology=topology)
for topology in topology_ids))
for topology in topology_ids),
config_file=config_file)

for topology in topology_ids:
topology_detail_json = topology_detail_jsons[topology_detail_path.format(topology=topology)]
Expand All @@ -49,7 +51,8 @@ def display_slot_usage(env_name):
(component_path.format(topology=topology,
component=comp)
for topology, comp_list in iteritems(topology_components)
for comp in comp_list))
for comp in comp_list),
config_file=config_file)

for request_url, comp_detail in iteritems(comp_details):
topology = request_url.split('/')[4]
Expand Down
57 changes: 30 additions & 27 deletions streamparse/cli/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,32 +8,32 @@
from itertools import chain

from pkg_resources import parse_version
from six import iteritems, string_types
from six import string_types
from six.moves import map, zip

from ..util import (get_env_config, get_ui_json, get_ui_jsons,
print_stats_table, storm_lib_version)
from .common import add_environment, add_name

from .common import add_config, add_environment, add_name


def display_stats(env_name, topology_name=None, component_name=None,
all_components=None):
all_components=None, config_file=None):
env_name = env_name
if topology_name and all_components:
_print_all_components(env_name, topology_name)
_print_all_components(env_name, topology_name, config_file=config_file)
elif topology_name and component_name:
_print_component_status(env_name, topology_name, component_name)
_print_component_status(env_name, topology_name, component_name, config_file=config_file)
elif topology_name:
_print_topology_status(env_name, topology_name)
_print_topology_status(env_name, topology_name, config_file=config_file)
else:
_print_cluster_status(env_name)
_print_cluster_status(env_name, config_file=config_file)


def _print_cluster_status(env_name):
def _print_cluster_status(env_name, config_file=None):
jsons = get_ui_jsons(env_name, ["/api/v1/cluster/summary",
"/api/v1/topology/summary",
"/api/v1/supervisor/summary"])
"/api/v1/supervisor/summary"],
config_file=config_file)
# Print Cluster Summary
ui_cluster_summary = jsons["/api/v1/cluster/summary"]
columns = ['stormVersion', 'nimbusUptime', 'supervisors', 'slotsTotal',
Expand All @@ -53,16 +53,16 @@ def _print_cluster_status(env_name):
{'host': 'l', 'uptime': 'l'})


def _get_topology_ui_detail(env_name, topology_name):
env_name = get_env_config(env_name)[0]
def _get_topology_ui_detail(env_name, topology_name, config_file=None):
env_name = get_env_config(env_name, config_file=config_file)[0]
topology_id = _get_topology_id(env_name, topology_name)
detail_url = '/api/v1/topology/%s' % topology_id
detail = get_ui_json(env_name, detail_url)
detail = get_ui_json(env_name, detail_url, config_file=config_file)
return detail


def _print_topology_status(env_name, topology_name):
ui_detail = _get_topology_ui_detail(env_name, topology_name)
def _print_topology_status(env_name, topology_name, config_file=None):
ui_detail = _get_topology_ui_detail(env_name, topology_name, config_file=config_file)
# Print topology summary
columns = ['name', 'id', 'status', 'uptime', 'workersTotal',
'executorsTotal', 'tasksTotal']
Expand All @@ -86,41 +86,42 @@ def _print_topology_status(env_name, topology_name):
{'boltId': 'l'})


def _get_component_ui_detail(env_name, topology_name, component_names):
def _get_component_ui_detail(env_name, topology_name, component_names, config_file=None):
if isinstance(component_names, string_types):
component_names = [component_names]
env_name = get_env_config(env_name)[0]
topology_id = _get_topology_id(env_name, topology_name)
env_name = get_env_config(env_name, config_file=config_file)[0]
topology_id = _get_topology_id(env_name, topology_name, config_file=config_file)
base_url = '/api/v1/topology/%s/component/%s'
detail_urls = [base_url % (topology_id, name) for name in component_names]
detail = get_ui_jsons(env_name, detail_urls)
detail = get_ui_jsons(env_name, detail_urls, config_file=config_file)
if len(detail) == 1:
return list(detail.values())[0]
else:
return detail


def _print_all_components(env_name, topology_name):
def _print_all_components(env_name, topology_name, config_file=None):
topology_ui_detail = _get_topology_ui_detail(env_name, topology_name)
spouts = map(lambda spout: (spout['spoutId'],
topology_ui_detail.get('spouts', {})))
bolts = map(lambda spout: (spout['boltId'],
topology_ui_detail.get('bolts', {})))
ui_details = _get_component_ui_detail(env_name, topology_name, chain(spouts,
bolts))
bolts),
config_file=config_file)
names_and_keys = zip(map(lambda ui_detail: (ui_detail['name'],
ui_details.values())),
ui_details.keys())
for component_name, key in names_and_keys:
_print_component_status(env_name, topology_name,
component_name, ui_details[key])
component_name, ui_details[key], config_file=config_file)


def _print_component_status(env_name, topology_name, component_name,
ui_detail=None):
ui_detail=None, config_file=None):
if not ui_detail:
ui_detail = _get_component_ui_detail(env_name, topology_name,
component_name)
component_name, config_file=config_file)
_print_component_summary(ui_detail)
if ui_detail.get("componentType") == "spout":
_print_spout_stats(ui_detail)
Expand Down Expand Up @@ -181,11 +182,11 @@ def _print_spout_executors(ui_detail):
columns, 'r', {'host': 'l'})


def _get_topology_id(env_name, topology_name):
def _get_topology_id(env_name, topology_name, config_file=None):
"""Get toplogy ID from summary json provided by UI api
"""
summary_url = '/api/v1/topology/summary'
topology_summary = get_ui_json(env_name, summary_url)
topology_summary = get_ui_json(env_name, summary_url, config_file=config_file)
for topology in topology_summary["topologies"]:
if topology_name == topology["name"]:
return topology["id"]
Expand All @@ -203,6 +204,7 @@ def subparser_hook(subparsers):
subparser.add_argument('-c', '--component',
help='Topology component (bolt/spout) name as '
'specified in Clojure topology specification')
add_config(subparser)
add_environment(subparser)
add_name(subparser)

Expand All @@ -212,7 +214,8 @@ def main(args):
storm_version = storm_lib_version()
if storm_version >= parse_version('0.9.2-incubating'):
display_stats(args.environment, topology_name=args.name,
component_name=args.component, all_components=args.all)
component_name=args.component, all_components=args.all,
config_file=args.config)
else:
print("ERROR: Storm {0} does not support this command."
.format(storm_version))
Expand Down

0 comments on commit d638c30

Please sign in to comment.