Skip to content

Commit

Permalink
Merge pull request #126 from Parsely/feature/dsl_verify
Browse files Browse the repository at this point in the history
Verification for Python Topology
  • Loading branch information
dan-blanchard committed Apr 24, 2015
2 parents 837a6c6 + 42ec2c3 commit e888e57
Show file tree
Hide file tree
Showing 36 changed files with 1,057 additions and 231 deletions.
38 changes: 25 additions & 13 deletions doc/source/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,46 @@ API
Tuples
------

.. autoclass:: streamparse.component.Tuple
.. autoclass:: streamparse.storm.component.Tuple

You should never have to instantiate an instance of a
:class:`streamparse.component.Tuple` yourself as streamparse handles this for you
prior to, for example, a :class:`streamparse.bolt.Bolt`'s ``process()`` method
:class:`streamparse.storm.component.Tuple` yourself as streamparse handles this for you
prior to, for example, a :class:`streamparse.storm.bolt.Bolt`'s ``process()`` method
being called.

None of the emit methods for bolts or spouts require that you pass a
:class:`streamparse.component.Tuple` instance.
:class:`streamparse.storm.component.Tuple` instance.


Components
----------

Both :class:`streamparse.storm.bolt.Bolt` and
:class:`streamparse.storm.spout.Spout` inherit from a common base-class,
:class:`streamparse.storm.component.Component`. It handles the basic
`Multi-Lang IPC between Storm and Python <https://storm.apache.org/documentation/Multilang-protocol.html>`__.

.. autoclass:: streamparse.storm.component.Component
:inherited-members:

Spouts
------
^^^^^^

Spouts are data sources for topologies, they can read from any data source and
emit tuples into streams.

.. autoclass:: streamparse.spout.Spout
:members:

.. autoclass:: streamparse.storm.spout.Spout
:inherited-members:
:show-inheritance:

Bolts
-----
^^^^^

.. autoclass:: streamparse.bolt.Bolt
:members:
.. autoclass:: streamparse.storm.bolt.Bolt
:inherited-members:
:show-inheritance:

.. autoclass:: streamparse.bolt.BatchingBolt
:members:

.. autoclass:: streamparse.storm.bolt.BatchingBolt
:inherited-members:
:show-inheritance:
3 changes: 2 additions & 1 deletion doc/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
'sphinx.ext.doctest',
'sphinx.ext.intersphinx',
'sphinx.ext.ifconfig',
'sphinx.ext.viewcode',
]

# Add any paths that contain templates here, relative to this directory.
Expand All @@ -50,7 +51,7 @@

# General information about the project.
project = u'streamparse'
copyright = u'2014, Parsely'
copyright = u'2014-2015, Parsely'

# The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the
Expand Down
4 changes: 2 additions & 2 deletions doc/source/faq.rst
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ Storm version running on your cluster, then try resubmitting your topology.
:target-path "_build"
:min-lein-version "2.0.0"
:jvm-opts ["-client"]
:dependencies [[org.apache.storm/storm-core "0.9.1-incubating"] ;; this should match your Storm cluster
[com.parsely/streamparse "0.0.3-SNAPSHOT"]]
:dependencies [[org.apache.storm/storm-core "0.9.4"] ;; this should match your Storm cluster
[com.parsely/streamparse "0.0.4-SNAPSHOT"]]
:jar-exclusions [#"log4j\.properties" #"backtype" #"trident" #"META-INF" #"meta-inf" #"\.yaml"]
:uberjar-exclusions [#"log4j\.properties" #"backtype" #"trident" #"META-INF" #"meta-inf" #"\.yaml"]
)
2 changes: 1 addition & 1 deletion examples/drpc/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ In order to have the `sparse` shell command available system-wide.

# Install Leiningen

Follow (these instructions)[http://leiningen.org/] to install Leiningen > 2.0.
Follow [these instructions](http://leiningen.org/) to install Leiningen > 2.0.
On OSX you can also use homebrew to install leiningen

brew install leiningen
Expand Down
20 changes: 11 additions & 9 deletions examples/kafka-jvm/Vagrantfile
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ Vagrant.configure("2") do |config|
chef.add_recipe "supervisor"
chef.add_recipe "storm::singlenode"
chef.add_recipe "streamparse"
chef.add_recipe "kafka"
chef.add_recipe "apache_kafka"

chef.json = {
:java => {
Expand All @@ -42,20 +42,12 @@ Vagrant.configure("2") do |config|

:python => {
"min_version" => "2.7.5",
# "install_method" => "source"
},

:zookeeper => {
:client_port => "2181"
},

:kafka => {
# :version => "0.8.1.1",
# :download_url => "http://mirror.its.dal.ca/apache/kafka",
# :checksum => "123c72a6d7562db836c71362ae9c6bc21e9549cf20c97cccf9acef0a83118f74",
:broker_host_name => "streamparse-box",
},

:storm => {
:deploy => {
:user => "storm",
Expand All @@ -75,6 +67,16 @@ Vagrant.configure("2") do |config|
:ui => {
:childopts => "-Xmx128m"
}
},

:apache_kafka => {
:conf => {
:server => {
:entries => {
"advertised.host.name" => "streamparse-box"
}
}
}
}
}
end
Expand Down
4 changes: 2 additions & 2 deletions examples/kafka-jvm/project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
:target-path "_build"
:min-lein-version "2.0.0"
; :jvm-opts ["-client"]
:dependencies [[org.apache.storm/storm-core "0.9.2-incubating"]
[org.apache.storm/storm-kafka "0.9.2-incubating" :exclusions [org.slf4j/slf4j-api]]
:dependencies [[org.apache.storm/storm-core "0.9.4"]
[org.apache.storm/storm-kafka "0.9.4" :exclusions [org.slf4j/slf4j-api]]
[org.apache.kafka/kafka_2.9.2 "0.8.1.1" :exclusions [com.sun.jmx/jmxri com.sun.jdmk/jmxtools javax.jms/jms org.slf4j/slf4j-api]]
[org.apache.zookeeper/zookeeper "3.4.6" :exclusions [io.netty/netty org.slf4j/slf4j-api org.slf4j/slf4j-log4j12]]
[com.parsely/streamparse "0.0.4-SNAPSHOT"]]
Expand Down
5 changes: 2 additions & 3 deletions examples/kafka-jvm/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
# streamparse==1.0.1
-e git+https://github.com/Parsely/streamparse.git@feature/uberjar#egg=streamparse
kafka-python==0.9.2
streamparse==1.1.0
kafka-python==0.9.3
simplejson==3.6.3
3 changes: 1 addition & 2 deletions examples/kafka-jvm/virtualenvs/pixelcount.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
#streamparse==1.0.1
-e git+https://github.com/Parsely/streamparse.git@feature/uberjar#egg=streamparse
streamparse==1.1.0
simplejson==3.6.3
18 changes: 6 additions & 12 deletions jvm/project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,13 @@
:url "https://raw.githubusercontent.com/Parsely/streamparse/master/LICENSE"
:distribution :repo}
:min-lein-version "2.0.0"
:dependencies [[commons-collections/commons-collections "3.2.1"]
;;[org.clojars.hsestupin/storm-spirit "0.1.0"] <-- depends on wrong version of Storm
:dependencies [
[org.clojure/clojure "1.5.1"]
[org.clojure/data.json "0.2.6"]
[org.apache.storm/storm-core "0.9.4"]
]
:profiles {:dev
{:dependencies [
[org.apache.storm/storm-core "0.9.2-incubating"]
[org.clojure/data.json "0.2.4"]
[org.testng/testng "6.8.5"]
[org.easytesting/fest-assert-core "2.0M8"]
[org.mockito/mockito-all "1.9.0"]
[org.jmock/jmock "2.6.0"]]
}}
:source-paths ["src"]
:test-paths ["test"]
:resource-paths ["resources"]
:aot :all)
:aot :all
)
4 changes: 2 additions & 2 deletions jvm/src/streamparse/commands/run.clj
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
options
topology)
;; sleep for a few seconds to let the topology run locally
(Thread/sleep run-for-secs)
(Thread/sleep (if (= 0 run-for-secs) -1 run-for-secs))
;; shutdown the cluster
(.shutdown cluster))
(catch Exception e
Expand Down Expand Up @@ -58,7 +58,7 @@
(cli args
["-h" "--help" "Show this help screen." :flag true :default false]
["-d" "--debug" "Run with debug mode." :flag true :default false]
["-t" "--time" "Amount of seconds to run cluser before shutting down." :default 5 :parse-fn #(Integer/parseInt %)]
["-t" "--time" "Amount of seconds to run cluser before shutting down. If time <= 0, run indefinitely." :default 0 :parse-fn #(Integer/parseInt %)]
["-o" "--option" "Override topology config option."
:parse-fn -parse-topo-option :assoc-fn -assoc-topo-option])
]
Expand Down
3 changes: 3 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,6 @@ jinja2
-e git+https://github.com/pashinin/fabric@p33#egg=fabric
invoke
six
requests
prettytable
setuptools
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python
"""
Copyright 2014 Parsely, Inc.
Copyright 2014-2015 Parsely, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down
22 changes: 19 additions & 3 deletions streamparse/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,33 @@

from __future__ import absolute_import, print_function, unicode_literals

import streamparse.storm
import streamparse.bolt
import streamparse.cmdln
import streamparse.component
import streamparse.contextmanagers
import streamparse.debug
import streamparse.decorators
from streamparse.storm import bolt, component, spout
import streamparse.dsl
import streamparse.spout
import streamparse.storm
from streamparse.version import __version__, VERSION

__all__ = [
'bolt',
'cmdln',
'component',
'contextmanagers',
'debug',
'decorators',
'dsl',
'spout',
'storm',
'__version__',
'VERSION',
]

__license__ = """
Copyright 2014 Parsely, Inc.
Copyright 2014-2015 Parsely, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down
5 changes: 5 additions & 0 deletions streamparse/bolt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""
Placeholder module to not break API
"""

from .storm.bolt import *
2 changes: 1 addition & 1 deletion streamparse/bootstrap/project/project.jinja2.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
:target-path "_build"
:min-lein-version "2.0.0"
:jvm-opts ["-client"]
:dependencies [[org.apache.storm/storm-core "0.9.2-incubating"]
:dependencies [[org.apache.storm/storm-core "0.9.4"]
[com.parsely/streamparse "0.0.4-SNAPSHOT"]
]
:jar-exclusions [#"log4j\.properties" #"backtype" #"trident" #"META-INF" #"meta-inf" #"\.yaml"]
Expand Down
23 changes: 21 additions & 2 deletions streamparse/cmdln.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from __future__ import absolute_import, print_function, unicode_literals

from docopt import docopt
from pkg_resources import parse_version

from .bootstrap import quickstart
from .ext.invoke import (list_topologies, kill_topology, run_local_topology,
submit_topology, tail_topology, visualize_topology)
submit_topology, tail_topology, visualize_topology,
display_stats, display_worker_uptime,
storm_lib_version)
from .version import __version__ as VERSION


Expand Down Expand Up @@ -34,6 +37,8 @@ def main():
sparse list [-e <env>] [-v]
sparse kill [-n <topology>] [-e <env>] [-v] [--wait <seconds>]
sparse tail [-e <env>] [-n <topology>] [--pattern <regex>]
sparse stats [-e <env>] [-n <topology>] [-c <component>|--all]
sparse worker-uptime [-e <env>]
sparse visualize [-n <topology>] [--flip]
sparse (-h | --help)
sparse --version
Expand All @@ -54,6 +59,8 @@ def main():
have only one topology defined in your
topologies/ directory, streamparse
will use it automatically.
-c --component <component> Topology component (bolt/spout) name as
specified in Clojure topology specification
-o --option <option>... Topology option to use upon submit, e.g.
"-o topology.debug=true" is equivalent to
"--debug". May be repeated for multiple
Expand All @@ -68,7 +75,8 @@ def main():
-w --workers <workers> Set number of Storm workers. Takes
precedence over --par if both set.
-t --time <time> Time (in seconds) to keep local cluster
running [default: 5].
running. If time <= 0, run indefinitely.
[default: 0].
--pattern <regex> Apply pattern to files for "tail"
subcommand.
--flip Flip the visualization to be horizontal.
Expand Down Expand Up @@ -109,6 +117,17 @@ def main():
wait=args["--wait"])
elif args["tail"]:
tail_topology(args["--name"], args["--environment"], args["--pattern"])
elif args["stats"] or args["worker-uptime"]:
storm_version = storm_lib_version()
if storm_version >= parse_version('0.9.2-incubating'):
if args["stats"]:
display_stats(args["--environment"], args.get("--name"),
args.get("--component"), args.get("--all"))
else:
display_worker_uptime(args["--environment"])
else:
print("ERROR: Storm {} does not support this command."
.format(storm_version))
elif args["visualize"]:
visualize_topology(args["--name"])

Expand Down
5 changes: 5 additions & 0 deletions streamparse/component.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""
Placeholder module to not break API
"""

from .storm.component import *
9 changes: 4 additions & 5 deletions streamparse/contextmanagers.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@ def _port_in_use(port, server_type="tcp"):
@contextmanager
def ssh_tunnel(user, host, local_port, remote_port):
if _port_in_use(local_port):
raise Exception("Local port: {} already in use, unable to open ssh "
"tunnel to {}:{}."
.format(local_port, host, remote_port))
raise IOError("Local port: {} already in use, unable to open ssh "
"tunnel to {}:{}.".format(local_port, host, remote_port))

ssh_cmd = ["ssh",
"{user}@{host}".format(user=user, host=host),
Expand All @@ -43,8 +42,8 @@ def ssh_tunnel(user, host, local_port, remote_port):
# Periodically check to see if the ssh command failed and returned a
# value, then raise an Exception
if ssh_proc.poll() is not None:
raise Exception('Unable to open ssh tunnel via: "{}"'
.format(" ".join(ssh_cmd)))
raise IOError('Unable to open ssh tunnel via: "{}"'
.format(" ".join(ssh_cmd)))
time.sleep(0.2)
try:
yield
Expand Down
2 changes: 1 addition & 1 deletion streamparse/debug.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def get_avail_port(self, host, port, search_limit=100, skew=+0):
else:
return _sock, this_port
else:
raise Exception(NO_AVAILABLE_PORT.format(self=self))
raise IOError(NO_AVAILABLE_PORT.format(self=self))

def say(self, m):
print(m, file=self.out)
Expand Down

0 comments on commit e888e57

Please sign in to comment.