Skip to content

Commit

Permalink
Merge pull request #238 from Parsely/feature/serializer_setting
Browse files Browse the repository at this point in the history
Add serializer setting to config.json
  • Loading branch information
dan-blanchard committed Apr 11, 2016
2 parents 5e6377d + f9a01e9 commit 728e94f
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 10 deletions.
2 changes: 1 addition & 1 deletion doc/source/quickstart.rst
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ in our ``config.json`` file:
.. code-block:: json
{
"library": "",
"serializer": "json",
"topology_specs": "topologies/",
"virtualenv_specs": "virtualenvs/",
"envs": {
Expand Down
4 changes: 2 additions & 2 deletions streamparse/bootstrap/project/config.jinja2.json
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
{
"library": "",
"serializer": "json",
"topology_specs": "topologies/",
"virtualenv_specs": "virtualenvs/",
"envs": {
"prod": {
"user": "",
"ssh_password": "",
"ssh_password": "",
"nimbus": "",
"workers": [],
"log": {
Expand Down
36 changes: 30 additions & 6 deletions streamparse/cli/submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,8 @@ def _upload_jar(nimbus_client, local_path):


def submit_topology(name=None, env_name="prod", workers=2, ackers=2,
options=None, force=False, debug=False, wait=None):
options=None, force=False, debug=False, wait=None,
simple_jar=True):
"""Submit a topology to a remote Storm cluster."""
config = get_config()
name, topology_file = get_topology_definition(name)
Expand Down Expand Up @@ -212,16 +213,34 @@ def submit_topology(name=None, env_name="prod", workers=2, ackers=2,
for thrift_bolt in itervalues(topology_class.thrift_bolts):
inner_shell = thrift_bolt.bolt_object.shell
if isinstance(inner_shell, ShellComponent):
inner_shell.execution_command = streamparse_run_path
if 'streamparse_run' in inner_shell.execution_command:
inner_shell.execution_command = streamparse_run_path
# Update python paths in spouts
for thrift_spout in itervalues(topology_class.thrift_spouts):
inner_shell = thrift_spout.spout_object.shell
if isinstance(inner_shell, ShellComponent):
inner_shell.execution_command = streamparse_run_path
if 'streamparse_run' in inner_shell.execution_command:
inner_shell.execution_command = streamparse_run_path

serializer = env_config.get('serializer', config.get('serializer', 'json'))
# Set serializer arg in bolts
for thrift_bolt in itervalues(topology_class.thrift_bolts):
inner_shell = thrift_bolt.bolt_object.shell
if isinstance(inner_shell, ShellComponent):
inner_shell.script = '-s {} {}'.format(serializer,
inner_shell.script)
# Set serializer arg in spouts
for thrift_spout in itervalues(topology_class.thrift_spouts):
inner_shell = thrift_spout.spout_object.shell
if isinstance(inner_shell, ShellComponent):
inner_shell.script = '-s {} {}'.format(serializer,
inner_shell.script)


# Check topology for JVM stuff to see if we need to create uber-jar
simple_jar = not any(isinstance(spec, JavaComponentSpec)
for spec in topology_class.specs)
if simple_jar:
simple_jar = not any(isinstance(spec, JavaComponentSpec)
for spec in topology_class.specs)

# Prepare a JAR that doesn't have Storm dependencies packaged
topology_jar = jar_for_deploy(simple_jar=simple_jar)
Expand Down Expand Up @@ -255,6 +274,11 @@ def subparser_hook(subparsers):
add_name(subparser)
add_options(subparser)
add_par(subparser)
subparser.add_argument('-u', '--uber_jar',
help='Build an Uber-JAR even if you have no Java '
'components in your topology. Useful if you '
'are providing your own serializer class.',
dest='simple_jar', action='store_false')
add_wait(subparser)
add_workers(subparser)

Expand All @@ -265,4 +289,4 @@ def main(args):
submit_topology(name=args.name, env_name=args.environment,
workers=args.workers, ackers=args.ackers,
options=args.options, force=args.force, debug=args.debug,
wait=args.wait)
wait=args.wait, simple_jar=args.simple_jar)
12 changes: 11 additions & 1 deletion streamparse/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import os
import sys

from pystorm.component import _SERIALIZERS


def main():
"""main entry point for Python bolts and spouts"""
Expand All @@ -16,6 +18,14 @@ def main():
'classes via ``python -m '
'streamparse.run <class name>``.')
parser.add_argument('target_class', help='The bolt/spout class to start.')
parser.add_argument('-s', '--serializer',
help='The serialization protocol to use to talk to '
'Storm.',
choices=_SERIALIZERS.keys(),
default='json')
# Storm sends everything as one string, which is not great, so when there
# is exactly one argument, split it on whitespace into separate arguments
if len(sys.argv) == 2:
sys.argv = [sys.argv[0]] + sys.argv[1].split()
args = parser.parse_args()
# Add current directory to sys.path so imports will work
sys.path.append(os.getcwd())
Expand All @@ -24,7 +34,7 @@ def main():
mod = importlib.import_module(mod_name)
# Get class from module and run it
cls = getattr(mod, cls_name)
cls().run()
cls(serializer=args.serializer).run()


if __name__ == '__main__':
Expand Down

0 comments on commit 728e94f

Please sign in to comment.