Skip to content

Commit

Permalink
Fix #391 where sparse run needed nimbus configured in config.json (#392)
Browse files Browse the repository at this point in the history
  • Loading branch information
dan-blanchard committed Aug 29, 2017
1 parent 150c8cd commit e42803f
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 3 deletions.
11 changes: 9 additions & 2 deletions streamparse/cli/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,14 @@ def add_workers(parser):
dest='options')


def resolve_options(cli_options, env_config, topology_class, topology_name):
def resolve_options(cli_options, env_config, topology_class, topology_name,
local_only=False):
"""Resolve potentially conflicting Storm options from three sources:
CLI options > Topology options > config.json options
:param local_only: Whether or not we should talk to Nimbus to get Storm
workers and other info.
"""
storm_options = {}

Expand Down Expand Up @@ -216,7 +220,10 @@ def resolve_options(cli_options, env_config, topology_class, topology_name):
storm_options['pystorm.log.level'] = 'debug'

# If ackers and executors still aren't set, use number of worker nodes
num_storm_workers = len(get_storm_workers(env_config))
if not local_only:
num_storm_workers = len(get_storm_workers(env_config))
else:
num_storm_workers = 1
if storm_options.get('topology.acker.executors') is None:
storm_options['topology.acker.executors'] = num_storm_workers
if storm_options.get('topology.workers') is None:
Expand Down
3 changes: 2 additions & 1 deletion streamparse/cli/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ def run_local_topology(name=None, env_name=None, time=0, options=None):

set_topology_serializer(env_config, config, topology_class)

storm_options = resolve_options(options, env_config, topology_class, name)
storm_options = resolve_options(options, env_config, topology_class, name,
local_only=True)
if storm_options['topology.acker.executors'] != 0:
storm_options['topology.acker.executors'] = 1
storm_options['topology.workers'] = 1
Expand Down

0 comments on commit e42803f

Please sign in to comment.