Skip to content

Commit

Permalink
Update our thrift.py module to include Storm 1.1.1 thrift bits (#414)
Browse files Browse the repository at this point in the history
  • Loading branch information
dan-blanchard committed Dec 22, 2017
1 parent 348208b commit 9370337
Show file tree
Hide file tree
Showing 2 changed files with 234 additions and 5 deletions.
15 changes: 11 additions & 4 deletions streamparse/cli/submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,18 @@

import simplejson as json
from fabric.api import env
from pkg_resources import parse_version
from six import itervalues

from ..dsl.component import JavaComponentSpec
from ..thrift import ShellComponent

from ..util import (activate_env, get_config, get_env_config,
get_nimbus_client, get_topology_definition,
get_topology_from_file, set_topology_serializer,
ssh_tunnel, warn)
from .common import (add_ackers, add_config, add_debug, add_environment, add_name,
add_options, add_override_name, add_pool_size,
get_topology_from_file, nimbus_storm_version,
set_topology_serializer, ssh_tunnel, warn)
from .common import (add_ackers, add_config, add_debug, add_environment,
add_name, add_options, add_override_name, add_pool_size,
add_requirements, add_timeout, add_wait, add_workers,
resolve_options)
from .jar import jar_for_deploy
Expand Down Expand Up @@ -80,6 +81,12 @@ def _submit_topology(topology_name, topology_class, remote_jar_path, config,

set_topology_serializer(env_config, config, topology_class)

# Check if topology name is okay on Storm versions that support that
if nimbus_storm_version(nimbus_client) >= parse_version('1.1.0'):
if not nimbus_client.isTopologyNameAllowed(topology_name):
raise ValueError('Nimbus says {} is an invalid name for a '
'Storm topology.'.format(topology_name))

print("Submitting {} topology to nimbus...".format(topology_name), end='')
sys.stdout.flush()
nimbus_client.submitTopology(name=topology_name,
Expand Down

0 comments on commit 9370337

Please sign in to comment.