Skip to content

Commit

Permalink
Merge pull request #455 from Parsely/fix/permissions_issues_update_vi…
Browse files Browse the repository at this point in the history
…rtualenv

Fix permissions when env user cannot be used for ssh login
  • Loading branch information
dan-blanchard committed Apr 8, 2019
2 parents 0a64bd5 + e80b327 commit 2aecc4b
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 55 deletions.
8 changes: 8 additions & 0 deletions doc/source/quickstart.rst
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,14 @@ This implies a few requirements about the user you specify per environment:
2. Must have write access to the ``virtualenv_root`` on all servers in your
Storm cluster

If you would like to use your system user for creating the SSH connection to
the Storm cluster, you can omit the ``user`` setting from your ``config.json``.

By default the ``root`` user is used for creating virtualenvs when you do not
specify a ``user`` in your ``config.json``. To override this, set the
``sudo_user`` option in your ``config.json``. ``sudo_user`` will default to
``user`` if one is specified.

streamparse also assumes that virtualenv is installed on all Storm servers.

Once an environment is configured, we could deploy our wordcount topology like
Expand Down
43 changes: 38 additions & 5 deletions streamparse/cli/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"""
import argparse
import copy
import warnings

from ruamel import yaml
from six import integer_types, string_types
Expand Down Expand Up @@ -220,11 +221,24 @@ def add_timeout(parser):
)


def add_user(parser):
""" Add --user option to parser """
parser.add_argument(
"--user", help="User argument to sudo when deleting files.", default="root"
)
def add_user(parser, allow_short=False):
""" Add --user option to parser
Set allow_short to add -u as well.
"""
args = ["--user"]
if allow_short:
args.insert(0, "-u")

kwargs = {
"help": "User argument to sudo when creating and deleting virtualenvs.",
"default": None,
"type": option_alias("sudo_user"),
"dest": "options",
"action": _StoreDictAction,
}

parser.add_argument(*args, **kwargs)


def add_wait(parser):
Expand Down Expand Up @@ -303,6 +317,9 @@ def resolve_options(
if venv_option in env_config:
storm_options[venv_option] = env_config[venv_option]

# Set sudo_user default to SSH user if we have one
storm_options["sudo_user"] = env_config.get("user", None)

# Override options with topology options
storm_options.update(topology_class.config)

Expand Down Expand Up @@ -330,4 +347,20 @@ def resolve_options(
if storm_options.get("topology.workers") is None:
storm_options["topology.workers"] = num_storm_workers

# If sudo_user was not present anywhere, set it to "root"
storm_options.setdefault("sudo_user", "root")

return storm_options


def warn_about_deprecated_user(user, func_name):
if user is not None:
warnings.warn(
(
"The 'user' argument to '{}' will be removed in the next "
"major release of streamparse. Provide the 'sudo_user' key to"
" the 'options' dict argument instead."
).format(func_name),
DeprecationWarning,
)

48 changes: 20 additions & 28 deletions streamparse/cli/remove_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@

from __future__ import absolute_import, print_function

from fabric.api import env, execute, parallel, run, sudo
from pkg_resources import parse_version
from fabric.api import env, execute, parallel

from .common import (
add_config,
Expand All @@ -14,21 +13,23 @@
add_override_name,
add_pattern,
add_pool_size,
add_user,
resolve_options,
warn_about_deprecated_user,
)
from ..util import (
activate_env,
get_env_config,
get_topology_definition,
get_topology_from_file,
get_logfiles_cmd,
get_nimbus_client,
nimbus_storm_version,
ssh_tunnel,
run_cmd,
)


@parallel
def _remove_logs(
topology_name, pattern, remove_worker_logs, user, is_old_storm, remove_all_artifacts
topology_name, pattern, remove_worker_logs, user, remove_all_artifacts
):
"""
Actual task to remove logs on all servers in parallel.
Expand All @@ -40,42 +41,36 @@ def _remove_logs(
include_all_artifacts=remove_all_artifacts,
)
rm_pipe = " | xargs rm -f"
if user == env.user:
run(ls_cmd + rm_pipe, warn_only=True)
else:
sudo(ls_cmd + rm_pipe, warn_only=True, user=user)
run_cmd(ls_cmd + rm_pipe, user, warn_only=True)


def remove_logs(
topology_name=None,
env_name=None,
pattern=None,
remove_worker_logs=False,
user="root",
user=None,
override_name=None,
remove_all_artifacts=False,
options=None,
config_file=None,
):
"""Remove all Python logs on Storm workers in the log.path directory."""
if override_name is not None:
topology_name = override_name
else:
topology_name = get_topology_definition(topology_name, config_file=config_file)[
0
]
warn_about_deprecated_user(user, "remove_logs")
topology_name, topology_file = get_topology_definition(
override_name or topology_name, config_file=config_file
)
topology_class = get_topology_from_file(topology_file)
env_name, env_config = get_env_config(env_name, config_file=config_file)
with ssh_tunnel(env_config) as (host, port):
nimbus_client = get_nimbus_client(env_config, host=host, port=port)
is_old_storm = nimbus_storm_version(nimbus_client) < parse_version("1.0")
activate_env(env_name, options=options)
storm_options = resolve_options(options, env_config, topology_class, topology_name)
activate_env(env_name, storm_options, config_file=config_file)
execute(
_remove_logs,
topology_name,
pattern,
remove_worker_logs,
user,
is_old_storm,
# TODO: Remove "user" in next major version
user or storm_options["sudo_user"],
remove_all_artifacts,
hosts=env.storm_workers,
)
Expand All @@ -101,10 +96,7 @@ def subparser_hook(subparsers):
add_override_name(subparser)
add_pattern(subparser)
add_pool_size(subparser)
# Not using add_user because we need -u for backward compatibility
subparser.add_argument(
"-u", "--user", help="User argument to sudo when deleting logs.", default="root"
)
add_user(subparser, allow_short=True)
subparser.add_argument(
"-w",
"--remove_worker_logs",
Expand All @@ -123,7 +115,7 @@ def main(args):
env_name=args.environment,
pattern=args.pattern,
remove_worker_logs=args.remove_worker_logs,
user=args.user,
options=args.options,
override_name=args.override_name,
remove_all_artifacts=args.remove_all_artifacts,
config_file=args.config,
Expand Down
5 changes: 3 additions & 2 deletions streamparse/cli/submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
add_wait,
add_workers,
resolve_options,
warn_about_deprecated_user,
)
from .jar import jar_for_deploy
from .kill import _kill_topology
Expand Down Expand Up @@ -194,10 +195,11 @@ def submit_topology(
timeout=None,
config_file=None,
overwrite_virtualenv=False,
user="root",
user=None,
active=True,
):
"""Submit a topology to a remote Storm cluster."""
warn_about_deprecated_user(user, "submit_topology")
config = get_config(config_file=config_file)
name, topology_file = get_topology_definition(name, config_file=config_file)
env_name, env_config = get_env_config(env_name, config_file=config_file)
Expand Down Expand Up @@ -390,6 +392,5 @@ def main(args):
timeout=args.timeout,
config_file=args.config,
overwrite_virtualenv=args.overwrite_virtualenv,
user=args.user,
active=args.active,
)
32 changes: 14 additions & 18 deletions streamparse/cli/update_virtualenv.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import os
from io import open

from fabric.api import env, execute, parallel, prefix, put, puts, run, show, sudo
from fabric.api import env, execute, parallel, put, puts, run, show
from fabric.contrib.files import exists
from six import string_types

Expand All @@ -24,6 +24,7 @@
add_requirements,
add_user,
resolve_options,
warn_about_deprecated_user,
)
from ..util import (
activate_env,
Expand All @@ -32,39 +33,33 @@
get_env_config,
get_topology_definition,
get_topology_from_file,
run_cmd,
)


def _run_cmd(cmd, user, warn_only=False):
if user == env.user:
run(cmd, warn_only=warn_only)
else:
sudo(cmd, warn_only=warn_only, user=user)


@parallel
def _create_or_update_virtualenv(
virtualenv_root,
virtualenv_name,
requirements_paths,
virtualenv_flags=None,
overwrite_virtualenv=False,
user="root",
user=None,
):
with show("output"):
virtualenv_path = "/".join((virtualenv_root, virtualenv_name))
if overwrite_virtualenv:
puts("Removing virtualenv if it exists in {}".format(virtualenv_root))
cmd = "rm -rf {}".format(virtualenv_path)
_run_cmd(cmd, user, warn_only=True)
run_cmd(cmd, user, warn_only=True)
if not exists(virtualenv_path):
if virtualenv_flags is None:
virtualenv_flags = ""
puts("virtualenv not found in {}, creating one.".format(virtualenv_root))
cmd = "virtualenv --never-download {} {}".format(
virtualenv_path, virtualenv_flags
)
_run_cmd(cmd, user)
run_cmd(cmd, user)

if isinstance(requirements_paths, string_types):
requirements_paths = [requirements_paths]
Expand All @@ -73,19 +68,20 @@ def _create_or_update_virtualenv(
puts("Uploading {} to temporary file.".format(requirements_path))
temp_req = run("mktemp /tmp/streamparse_requirements-XXXXXXXXX.txt")
temp_req_paths.append(temp_req)
put(requirements_path, temp_req)
put(requirements_path, temp_req, mode="0666")

puts("Updating virtualenv: {}".format(virtualenv_name))
pip_path = "/".join((virtualenv_path, "bin", "pip"))
# Make sure we're using latest pip so options work as expected
run("{} install --upgrade 'pip>=9.0,!=19.0'".format(pip_path), pty=False)
run(
run_cmd("{} install --upgrade 'pip>=9.0,!=19.0'".format(pip_path), user)
run_cmd(
(
"{} install -r {} --exists-action w --upgrade "
"--upgrade-strategy only-if-needed --progress-bar off"
).format(pip_path, " -r ".join(temp_req_paths)),
pty=False,
user,
)

run("rm -f {}".format(" ".join(temp_req_paths)))


Expand All @@ -97,7 +93,7 @@ def create_or_update_virtualenvs(
requirements_paths=None,
config_file=None,
overwrite_virtualenv=False,
user="root",
user=None,
):
"""Create or update virtualenvs on remote servers.
Expand All @@ -113,6 +109,7 @@ def create_or_update_virtualenvs(
if it already exists.
:param user: Who to delete virtualenvs as when using overwrite_virtualenv
"""
warn_about_deprecated_user(user, "create_or_update_virtualenvs")
config = get_config()
topology_name, topology_file = get_topology_definition(
topology_name, config_file=config_file
Expand Down Expand Up @@ -159,7 +156,7 @@ def create_or_update_virtualenvs(
virtualenv_flags=storm_options.get("virtualenv_flags"),
hosts=env.storm_workers,
overwrite_virtualenv=overwrite_virtualenv,
user=user,
user=user or storm_options["sudo_user"],
)


Expand Down Expand Up @@ -191,5 +188,4 @@ def main(args):
requirements_paths=args.requirements,
config_file=args.config,
overwrite_virtualenv=args.overwrite_virtualenv,
user=args.user,
)
11 changes: 9 additions & 2 deletions streamparse/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@

import requests
import simplejson as json
from fabric.api import env, hide, local, settings
from fabric.api import env, hide, local, settings, show, run, sudo
from fabric.colors import red, yellow
from pkg_resources import parse_version
from texttable import Texttable
from six import iteritems, itervalues
from six import itervalues
from six.moves.socketserver import UDPServer, TCPServer
from thriftpy.protocol import TBinaryProtocolFactory
from thriftpy.rpc import make_client
Expand Down Expand Up @@ -599,3 +599,10 @@ def set_topology_serializer(env_config, config, topology_class):
inner_shell = thrift_spout.spout_object.shell
if inner_shell is not None:
inner_shell.script = "-s {} {}".format(serializer, inner_shell.script)


def run_cmd(cmd, user, **kwargs):
with show("everything"):
return (
run(cmd, **kwargs) if user == env.user else sudo(cmd, user=user, **kwargs)
)

0 comments on commit 2aecc4b

Please sign in to comment.