
Commit

Merge pull request #393 from Parsely/fix/remove_logs_user
Make sparse tail and sparse remove_logs work better with Storm 1.x
dan-blanchard committed Aug 30, 2017
2 parents bafb72b + 4105e78 commit a18b5a4
Showing 6 changed files with 123 additions and 31 deletions.
9 changes: 9 additions & 0 deletions streamparse/cli/common.py
@@ -119,6 +119,15 @@ def add_pattern(parser):
help='Pattern of log files to operate on.')


def add_pool_size(parser):
""" Add --pool_size option to parser """
parser.add_argument('--pool_size',
help='Number of simultaneous SSH connections to use when updating '
'virtualenvs, removing logs, or tailing logs.',
default=10,
type=int)


def add_requirements(parser):
""" Add --requirements option to parser """
parser.add_argument('-r', '--requirements',
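The new `add_pool_size` helper wires a `--pool_size` flag into each subcommand so its value can be assigned to Fabric's `env.pool_size`, which caps how many SSH connections a `@parallel` task opens at once. A minimal sketch of that mechanism (illustrative only; the task and host names below are made up and are not part of this commit):

```python
# Illustrative sketch, not part of this commit: how Fabric 1.x's env.pool_size
# limits concurrency for tasks decorated with @parallel.
from fabric.api import env, execute, parallel, run

@parallel
def _check_log_dir():
    # execute() fans this task out over env.hosts, but opens at most
    # env.pool_size SSH connections at any one time.
    run('ls /var/log/storm | head')

env.pool_size = 10  # same default as the new --pool_size option
env.hosts = ['worker1.example.com', 'worker2.example.com', 'worker3.example.com']
execute(_check_log_dir)
```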
55 changes: 42 additions & 13 deletions streamparse/cli/remove_logs.py
@@ -4,30 +4,47 @@

from __future__ import absolute_import, print_function

from fabric.api import env, execute, parallel, sudo
from fabric.api import env, execute, parallel, run, sudo
from pkg_resources import parse_version

from .common import add_environment, add_name, add_pattern
from ..util import activate_env, get_topology_definition, get_logfiles_cmd
from .common import (add_environment, add_name, add_override_name, add_pattern,
add_pool_size)
from ..util import (activate_env, get_env_config, get_topology_definition,
get_logfiles_cmd, get_nimbus_client, nimbus_storm_version,
ssh_tunnel)


@parallel
def _remove_logs(topology_name, pattern, remove_worker_logs):
def _remove_logs(topology_name, pattern, remove_worker_logs, user, is_old_storm,
remove_all_artifacts):
"""
Actual task to remove logs on all servers in parallel.
"""
ls_cmd = get_logfiles_cmd(topology_name=topology_name, pattern=pattern,
include_worker_logs=remove_worker_logs)
rm_pipe = " | xargs rm"
sudo(ls_cmd + rm_pipe, warn_only=True)
include_worker_logs=remove_worker_logs,
include_all_artifacts=remove_all_artifacts)
rm_pipe = " | xargs rm -f"
if user == env.user:
run(ls_cmd + rm_pipe, warn_only=True)
else:
sudo(ls_cmd + rm_pipe, warn_only=True, user=user)


def remove_logs(topology_name=None, env_name=None, pattern=None,
remove_worker_logs=False):
remove_worker_logs=False, user='root', override_name=None,
remove_all_artifacts=False):
"""Remove all Python logs on Storm workers in the log.path directory."""
topology_name = get_topology_definition(topology_name)[0]
if override_name is not None:
topology_name = override_name
else:
topology_name = get_topology_definition(topology_name)[0]
env_name, env_config = get_env_config(env_name)
with ssh_tunnel(env_config) as (host, port):
nimbus_client = get_nimbus_client(env_config, host=host, port=port)
is_old_storm = nimbus_storm_version(nimbus_client) < parse_version('1.0')
activate_env(env_name)
execute(_remove_logs, topology_name, pattern, remove_worker_logs,
hosts=env.storm_workers)
execute(_remove_logs, topology_name, pattern, remove_worker_logs, user,
is_old_storm, remove_all_artifacts, hosts=env.storm_workers)


def subparser_hook(subparsers):
@@ -36,19 +53,31 @@ def subparser_hook(subparsers):
description=__doc__,
help=main.__doc__)
subparser.set_defaults(func=main)
subparser.add_argument('-A', '--remove_all_artifacts',
help='Remove not only topology-specific logs, but '
'also any other files for the topology in its '
'workers-artifacts subdirectories.',
action='store_true')
add_environment(subparser)
add_name(subparser)
add_override_name(subparser)
add_pattern(subparser)
add_pool_size(subparser)
subparser.add_argument('-u', '--user',
help="User argument to sudo when deleting logs.")
help="User argument to sudo when deleting logs.",
default='root')
subparser.add_argument('-w', '--remove_worker_logs',
help='Remove not only topology-specific logs, but '
'also worker logs that may be shared between '
'topologies.',
action='store_true')


def main(args):
""" Remove logs from Storm workers. """
env.pool_size = args.pool_size
remove_logs(topology_name=args.name, env_name=args.environment,
pattern=args.pattern,
remove_worker_logs=args.remove_worker_logs)
remove_worker_logs=args.remove_worker_logs,
user=args.user, override_name=args.override_name,
remove_all_artifacts=args.remove_all_artifacts)
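Taken together, `_remove_logs` now builds a single shell pipeline per worker and chooses between `run` and `sudo` based on the requested user. A rough illustration of what ends up executing on each worker (the log path and topology name here are assumptions, not values from this commit):

```python
# Illustrative only -- the log path and topology name are made up.
ls_cmd = ("cd /var/log/storm && "
          "find . -maxdepth 4 -type f -wholename '*wordcount*.log'")
cmd = ls_cmd + " | xargs rm -f"  # -f so an empty match or vanished file is not an error

# When --user matches the SSH user, a plain run() is enough:
#     run(cmd, warn_only=True)
# Otherwise the deletion happens as that user via sudo:
#     sudo(cmd, warn_only=True, user='storm')
print(cmd)
```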
7 changes: 5 additions & 2 deletions streamparse/cli/submit.py
@@ -22,8 +22,9 @@
get_topology_from_file, set_topology_serializer,
ssh_tunnel, warn)
from .common import (add_ackers, add_debug, add_environment, add_name,
add_options, add_override_name, add_requirements,
add_timeout, add_wait, add_workers, resolve_options)
add_options, add_override_name, add_pool_size,
add_requirements, add_timeout, add_wait, add_workers,
resolve_options)
from .jar import jar_for_deploy
from .kill import _kill_topology
from .list import _list_topologies
@@ -247,6 +248,7 @@ def subparser_hook(subparsers):
add_name(subparser)
add_options(subparser)
add_override_name(subparser)
add_pool_size(subparser)
add_requirements(subparser)
subparser.add_argument('-R', '--remote_jar_path',
help='Path to a prebuilt JAR that already exists on '
@@ -266,6 +268,7 @@

def main(args):
""" Submit a Storm topology to Nimbus. """
env.pool_size = args.pool_size
submit_topology(name=args.name, env_name=args.environment,
options=args.options, force=args.force, wait=args.wait,
simple_jar=args.simple_jar,
31 changes: 23 additions & 8 deletions streamparse/cli/tail.py
@@ -5,32 +5,44 @@
from __future__ import absolute_import, print_function

from fabric.api import env, execute, parallel, run
from pkg_resources import parse_version

from .common import add_environment, add_name, add_pattern
from ..util import activate_env, get_logfiles_cmd, get_topology_definition
from .common import (add_environment, add_name, add_override_name, add_pattern,
add_pool_size)
from ..util import (activate_env, get_env_config, get_logfiles_cmd,
get_topology_definition, get_nimbus_client,
nimbus_storm_version, ssh_tunnel)


@parallel
def _tail_logs(topology_name, pattern, follow, num_lines):
def _tail_logs(topology_name, pattern, follow, num_lines, is_old_storm):
"""
Actual task to run tail on all servers in parallel.
"""
ls_cmd = get_logfiles_cmd(topology_name=topology_name, pattern=pattern)
ls_cmd = get_logfiles_cmd(topology_name=topology_name, pattern=pattern,
is_old_storm=is_old_storm)
tail_pipe = " | xargs tail -n {}".format(num_lines)
if follow:
tail_pipe += " -f"
run(ls_cmd + tail_pipe)


def tail_topology(topology_name=None, env_name=None, pattern=None, follow=False,
num_lines=10):
num_lines=10, override_name=None):
"""Follow (tail -f) the log files on remote Storm workers.
Will use the `log_path` and `workers` properties from config.json.
"""
topology_name = get_topology_definition(topology_name)[0]
if override_name is not None:
topology_name = override_name
else:
topology_name = get_topology_definition(topology_name)[0]
env_name, env_config = get_env_config(env_name)
with ssh_tunnel(env_config) as (host, port):
nimbus_client = get_nimbus_client(env_config, host=host, port=port)
is_old_storm = nimbus_storm_version(nimbus_client) < parse_version('1.0')
activate_env(env_name)
execute(_tail_logs, topology_name, pattern, follow, num_lines,
execute(_tail_logs, topology_name, pattern, follow, num_lines, is_old_storm,
hosts=env.storm_workers)


@@ -52,11 +64,14 @@ def subparser_hook(subparsers):
help='tail outputs the last NUM_LINES lines of the '
'logs. (default: %(default)s)')
add_name(subparser)
add_override_name(subparser)
add_pool_size(subparser)
add_pattern(subparser)


def main(args):
""" Tail logs for specified Storm topology. """
env.pool_size = args.pool_size
tail_topology(topology_name=args.name, env_name=args.environment,
pattern=args.pattern, follow=args.follow,
num_lines=args.num_lines)
num_lines=args.num_lines, override_name=args.override_name)
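Both `tail` and `remove_logs` now ask Nimbus for its Storm version and only include the shared worker logs on pre-1.0 clusters, where logs are not yet split per topology. A minimal sketch of that version gate, assuming `nimbus_storm_version` returns a version parsed with `pkg_resources` (the version strings below are made up):

```python
# Sketch of the Storm version gate -- version strings are hypothetical.
from pkg_resources import parse_version

for reported in ('0.10.2', '1.0.2', '1.1.0'):
    is_old_storm = parse_version(reported) < parse_version('1.0')
    # Pre-1.0: worker/supervisor logs are shared between topologies, so
    # tail/remove_logs may need to touch them; 1.0+: logs live under
    # per-topology directories, so they can be skipped.
    print(reported, '->', 'old' if is_old_storm else 'new')
```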
4 changes: 3 additions & 1 deletion streamparse/cli/update_virtualenv.py
@@ -14,7 +14,7 @@
from six import string_types

from .common import (add_environment, add_name, add_override_name,
add_requirements)
add_pool_size, add_requirements)
from ..util import (activate_env, die, get_config, get_env_config,
get_topology_definition)

@@ -109,11 +109,13 @@ def subparser_hook(subparsers):
add_environment(subparser)
add_name(subparser)
add_override_name(subparser)
add_pool_size(subparser)
add_requirements(subparser)


def main(args):
""" Create or update a virtualenv on Storm workers. """
env.pool_size = args.pool_size
create_or_update_virtualenvs(args.environment, args.name,
virtualenv_name=args.override_name,
requirements_paths=args.requirements)
48 changes: 41 additions & 7 deletions streamparse/util.py
@@ -59,7 +59,7 @@ def ssh_tunnel(env_config, local_port=6627, remote_port=None, quiet=False):
host, nimbus_port = get_nimbus_host_port(env_config)
if remote_port is None:
remote_port = nimbus_port
if env_config.get('use_ssh_for_nimbus', True):
if is_ssh_for_nimbus(env_config):
need_setup = True
while _port_in_use(local_port):
if local_port in _active_tunnels:
@@ -122,8 +122,9 @@ def activate_env(env_name=None):

env.storm_workers = get_storm_workers(env_config)
env.user = env_config.get("user")
env.log_path = env_config.get("log_path") or \
env_config.get("log", {}).get("path")
env.log_path = (env_config.get("log_path") or
env_config.get("log", {}).get("path") or
get_nimbus_config(env_config).get('storm.log.dir'))
env.virtualenv_root = env_config.get("virtualenv_root") or \
env_config.get("virtualenv_path")
env.disable_known_hosts = True
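With this change, `log_path` no longer has to be spelled out in config.json: if neither `log_path` nor `log.path` is set, the value falls back to the `storm.log.dir` reported by Nimbus. A small sketch of that fallback chain, with made-up values (not part of this commit):

```python
# Illustrative values only -- not taken from a real config.json or Nimbus.
env_config = {'log': {}}                            # no log_path, no log.path
nimbus_conf = {'storm.log.dir': '/var/log/storm'}   # hypothetical Nimbus reply

log_path = (env_config.get('log_path') or
            env_config.get('log', {}).get('path') or
            nimbus_conf.get('storm.log.dir'))
print(log_path)  # /var/log/storm
```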
@@ -133,6 +134,7 @@
if env_config.get("ssh_password"):
env.password = env_config.get("ssh_password")


def die(msg, error_code=1):
print("{}: {}".format(red("error"), msg))
sys.exit(error_code)
@@ -310,6 +312,25 @@ def get_storm_workers(env_config):
return worker_list


_nimbus_configs = {}
def get_nimbus_config(env_config):
"""Retrieves a dict with all the config info stored in Nimbus
:param env_config: The project's parsed config.
:type env_config: `dict`
:returns: dict of Nimbus settings
"""
nimbus_info = get_nimbus_host_port(env_config)
if nimbus_info not in _nimbus_configs:
with ssh_tunnel(env_config) as (host, port):
nimbus_client = get_nimbus_client(env_config, host=host, port=port)
nimbus_json = nimbus_client.getNimbusConf()
nimbus_conf = json.loads(nimbus_json)
_nimbus_configs[nimbus_info] = nimbus_conf
return _nimbus_configs[nimbus_info]


def is_ssh_for_nimbus(env_config):
"""Check if we need to use SSH access to Nimbus or not.
"""
@@ -433,22 +454,35 @@ def _get_file_names_command(path, patterns):
"""Given a list of bash `find` patterns, return a string for the
bash command that will find those pystorm log files
"""
patterns = "' -o -type f -name '".join(patterns)
if path is None:
raise ValueError('path cannot be None')
patterns = "' -o -type f -wholename '".join(patterns)
return ("cd {path} && "
"find . -maxdepth 4 -type f -name '{patterns}'") \
"find . -maxdepth 4 -type f -wholename '{patterns}'") \
.format(path=path, patterns=patterns)


def get_logfiles_cmd(topology_name=None, pattern=None, include_worker_logs=True):
def get_logfiles_cmd(topology_name=None, pattern=None, include_worker_logs=True,
is_old_storm=False, include_all_artifacts=False):
""" Returns a string representing a command to run on the Storm workers that
will yield all of the logfiles for the given topology that meet the given
pattern (if specified).
"""
log_name_patterns = ["pystorm_{topo_name}*".format(topo_name=topology_name)]
log_name_patterns = ["*{topo_name}*".format(topo_name=topology_name)]
if not include_all_artifacts:
log_name_patterns[0] += '.log'
# The worker logs are separated by topology in Storm 1.0+, so no need to do
# this except on old versions of Storm
if not is_old_storm:
include_worker_logs = False
# list log files found
if include_worker_logs:
log_name_patterns.extend(["worker*", "supervisor*", "access*",
"metrics*"])
if env.log_path is None:
raise ValueError('Cannot find log files if you do not set `log_path` '
'or the `path` key in the `log` dict for your '
'environment in your config.json.')
ls_cmd = _get_file_names_command(env.log_path, log_name_patterns)
if pattern is not None:
ls_cmd += " | egrep '{pattern}'".format(pattern=pattern)
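Switching `find` from `-name` to `-wholename` matters on Storm 1.x, where logs sit under per-topology workers-artifacts subdirectories, so the topology name may appear in the path rather than in the file name itself. A rough illustration of the command string the helper now builds (the log path and pattern are assumptions, not values from this commit):

```python
# Illustrative only: roughly what _get_file_names_command produces for a
# hypothetical log path and pattern list.
def build_find_cmd(path, patterns):
    joined = "' -o -type f -wholename '".join(patterns)
    return ("cd {path} && "
            "find . -maxdepth 4 -type f -wholename '{patterns}'"
            .format(path=path, patterns=joined))

print(build_find_cmd('/var/log/storm', ['*wordcount*.log']))
# cd /var/log/storm && find . -maxdepth 4 -type f -wholename '*wordcount*.log'
```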
