Skip to content

Commit

Permalink
Allow submitting topologies as inactive/deactivated (#448)
Browse files Browse the repository at this point in the history
  • Loading branch information
dan-blanchard committed Aug 28, 2018
1 parent 0b4b2cb commit f3fb5a9
Showing 1 changed file with 22 additions and 9 deletions.
31 changes: 22 additions & 9 deletions streamparse/cli/submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from six import itervalues

from ..dsl.component import JavaComponentSpec
from ..thrift import ShellComponent
from ..thrift import ShellComponent, SubmitOptions, TopologyInitialStatus

from ..util import (activate_env, get_config, get_env_config,
get_nimbus_client, get_topology_definition,
Expand Down Expand Up @@ -75,7 +75,7 @@ def _kill_existing_topology(topology_name, force, wait, nimbus_client):


def _submit_topology(topology_name, topology_class, remote_jar_path, config,
env_config, nimbus_client, options=None):
env_config, nimbus_client, options=None, active=True):
if options.get('pystorm.log.path'):
print("Routing Python logging to {}.".format(options['pystorm.log.path']))
sys.stdout.flush()
Expand All @@ -90,10 +90,14 @@ def _submit_topology(topology_name, topology_class, remote_jar_path, config,

print("Submitting {} topology to nimbus...".format(topology_name), end='')
sys.stdout.flush()
nimbus_client.submitTopology(name=topology_name,
uploadedJarLocation=remote_jar_path,
jsonConf=json.dumps(options),
topology=topology_class.thrift_topology)
initial_status = (TopologyInitialStatus.ACTIVE if active
else TopologyInitialStatus.INACTIVE)
submit_options = SubmitOptions(initial_status=initial_status)
nimbus_client.submitTopologyWithOpts(name=topology_name,
uploadedJarLocation=remote_jar_path,
jsonConf=json.dumps(options),
topology=topology_class.thrift_topology,
options=submit_options)
print('done')


Expand Down Expand Up @@ -145,7 +149,7 @@ def submit_topology(name=None, env_name=None, options=None, force=False,
wait=None, simple_jar=True, override_name=None,
requirements_paths=None, local_jar_path=None,
remote_jar_path=None, timeout=None, config_file=None,
overwrite_virtualenv=False, user='root'):
overwrite_virtualenv=False, user='root', active=True):
"""Submit a topology to a remote Storm cluster."""
config = get_config(config_file=config_file)
name, topology_file = get_topology_definition(name, config_file=config_file)
Expand Down Expand Up @@ -236,7 +240,8 @@ def submit_topology(name=None, env_name=None, options=None, force=False,
remote_jar_path = _upload_jar(nimbus_client, local_jar_path)
_kill_existing_topology(override_name, force, wait, nimbus_client)
_submit_topology(override_name, topology_class, remote_jar_path, config,
env_config, nimbus_client, options=options)
env_config, nimbus_client, options=options,
active=active)
_post_submit_hooks(override_name, env_name, env_config, options)


Expand All @@ -255,6 +260,13 @@ def subparser_hook(subparsers):
help='Force a topology to submit by killing any '
'currently running topologies with the same '
'name.')
subparser.add_argument('-i', '--inactive',
help='Submit topology as inactive instead of active.'
' This is useful if you are migrating the '
'topology to a new environment and already '
'have it running actively in an older one.',
action='store_false',
dest='active')
subparser.add_argument('-j', '--local_jar_path',
help='Path to a prebuilt JAR to upload to Nimbus. '
'This is useful when you have multiple '
Expand Down Expand Up @@ -296,4 +308,5 @@ def main(args):
timeout=args.timeout,
config_file=args.config,
overwrite_virtualenv=args.overwrite_virtualenv,
user=args.user)
user=args.user,
active=args.active)

0 comments on commit f3fb5a9

Please sign in to comment.