Be notified of new releases
Create your free GitHub account today to subscribe to this repository for new releases and build software alongside 28 million developers.Sign up
We are excited to announce the release of Wallaroo 0.6.0. The most significant change in this release is a complete overhaul of the Wallaroo API to make it cleaner, simpler, and more intuitive. As a result of these changes, this is a breaking release. We also want to thank Github users ChristianWitts and cristaloleg for their contributions to Wallaroo last month!
We would love to hear what you think of the new API and how you plan to use Wallaroo. Please reach out to us! We’re available on Twitter, IRC, GitHub, by email, our mailing list, or our subreddit. We love questions!
What is Wallaroo
Wallaroo is a modern, extensible framework that makes it simple to get stateful streaming data and event-driven applications to production fast, regardless of scale.
If you are interested in installing Wallaroo, our installation documentation provides the various ways you can get up and running.
Feel free to use the table of contents below to help you navigate to sections you might find relevant.
Table of Contents
- New Features and Improvements
Python 3 Support for Connectors
Connectors API Update
Streamlined Wallaroo Python API
- Converting to the New API
- Installing Wallaroo
- Upgrading Wallaroo
New Features and Improvements
Python 3 Support for Connectors
The Connectors API has been updated to work with Python 3.5 and up, and all of the example connectors have been tested against Python 3.5. Prior to this work Connectors would only work under Python 2.7.
Connectors API Update
We’ve made some changes to the connectors API when defining applications and pipelines to bring it more in line with how other built-in sources and sinks are defined. These changes only impact your
application_setup code and should not require code changes in the connector scripts.
An example application has been updated in this release and the documentation includes all relevant details if you’re getting started. You’re encouraged to keep reading the section on the streamlined API below as this is all relevant to how your application code should be updated. For quick reference, the new source and sink configuration constructors look like this:
source_config = wallaroo.experimental.SourceConnectorConfig( "source_name", encoder=source_encode_function, decoder=source_decode_function, port=7100) sink_config = wallaroo.experimental.SinkConnectorConfig( "sink_name", encoder=sink_encode_function, decoder=sink_decode_function, port=7200)
As before, the names must match what you pass to the connector scripts so the right data flows to each part. Ports are now assigned explicitly and should be unique for each connector. These configuration values can be passed into source and sink pipeline components respectively. Read on for more information on how to use the streamlined pipeline components.
Streamlined Wallaroo Python API
The original Wallaroo Python API has existed in roughly the same form since September 2017. Based on user feedback and continuous internal experimentation, we decided it was time to streamline the API both to create a better developer experience and to allow us to more easily add functionality to Wallaroo in the future. We’re going to describe the new API in isolation in this section. If you want to know how to convert from the old to the new API, see here.
Defining a Simple Wallaroo Application
A Wallaroo application includes one or more sources. You use
wallaroo.source(...) to define a stream originating from a source. Each source stream can be followed by one or more computation stages (we will describe how to define computations themselves later on). A linear sequence from a source through zero or more computations constitutes a partial pipeline. For example:
inputs = wallaroo.source("Source Name", source_config) partial_pipeline = inputs.to(my_computation)
This defines a partial pipeline that could be diagrammed as followed:
Source -> my_computation ->
The hanging arrow at the end of this diagram indicates that the pipeline is partial. We can still add more stages, and to complete the pipeline we need one or more sinks.
You create a complete pipeline by terminating a partial pipeline with a call to
to_sinks. For example:
inputs = wallaroo.source("Source Name", source_config) complete_pipeline = (inputs .to(my_computation) .to_sink(sink_config))
Our pipeline is now complete:
Source -> my_computation -> Sink
Unless a call to
to using a stateless computation is preceded by a call to
key_by (which partitions messages by key), there are no guarantees around the order in which messages will be processed. That's because Wallaroo might parallelize a stateless computation if that is beneficial for scaling. That means the execution graph for the above pipeline could look like this:
/-> my_stateless_computation -\ / \ Source ----> my_stateless_computation ----> Sink \ / \-> my_stateless_computation -/
Some messages will be routed to each of the parallel computation instances. When they merge again at the sink, these messages will be interleaved in a non-deterministic fashion.
Merging Partial Pipelines
You can merge two partial pipelines to form a new partial pipeline. For example:
inputs1 = wallaroo.source("Source 1", source_config) partial_pipeline1 = inputs1.to(computation1) inputs2 = wallaroo.source("Source 2", source_config) partial_pipeline2 = inputs2.to(computation2) partial_pipeline = inputs1.merge(inputs2)
The resulting partial pipeline could be
Source1 -> computation1 ->\ \ -> / Source2 -> computation2 ->/
Again, the hanging arrow indicates we can still add more stages, and that to complete the pipeline we still need one or more sinks. You could also merge this partial pipeline with additional partial pipelines. When you merge partial pipelines in this way, you are not creating a join in the sense familiar from SQL joins. Instead, you are combining two streams into one, with messages from the first stream interwoven with messages from the second. That combined stream is then passed to the next stage following the hanging arrow.
The following is an example of a complete pipeline including a merge where we first add one more computation before the sink:
pipeline = (inputs1.merge(inputs2) .to(computation3) .to_sink(sink_config))
The corresponding diagram for this definition would look like this:
Source1 -> computation1 ->\ \ -> computation3 -> Sink / Source2 -> computation2 ->/
Building an Application
Once you have defined a complete pipeline, you must pass it into
wallaroo.build_application(app_name, pipeline) in order to build the application object you must return from the
For a simple application with a decoder, computation, and encoder, the
application_setup function might look like
def application_setup(args): inputs = wallaroo.source("Source Name", source_config) pipeline = (inputs .to(computation) .to_sink(sink_config)) return wallaroo.build_application("Application Name", pipeline)
There are two types of computations that can be added to a Wallaroo pipeline: stateless and state computations. The API for stateless computation has not changed, so we will only discuss state computations here.
A state computation takes an input message and a state object, does some work which might involve updating that state, and then optionally returns an output that will be sent downstream. Here is an example of a simple state computation taken from our Word Count example:
class WordTotal(object): def __init__(self): self.count = 0 @wallaroo.state_computation(name="count word", state=WordTotal) def count_word(word, word_total): word_total.count = word_total.count + 1 return WordCount(word, word_total.count)
count_word function takes an input called
word and the state representing the running total called
word_total. We specify the associated state class by passing
WordTotal as the decorator argument
state. In this case, the function always returns an output
WordCount object (defined elsewhere).
To add this state computation to a Wallaroo pipeline, we simply add the following to our pipeline definition (see above for a description of how to define a complete pipeline):
If you want to partition this state by the word that is being counted, then you must define a key_extractor function:
@wallaroo.key_extractor def extract_word(word): return word
You must then precede the
to call for our state computation with a
key_by call using the key_extractor, as in the following example:
Converting to the New API
Converting an old Wallaroo application to the new API should be a relatively straightforward process. This section will take you through what is required.
Defining an Application Source
In the old API, applications were defined using an
ApplicationBuilder object. This is no longer a part of the API. Instead, you begin by defining a source and one or more stages immediately following the source. The old Word Count application began this way:
def application_setup(args): in_host, in_port = wallaroo.tcp_parse_input_addrs(args) out_host, out_port = wallaroo.tcp_parse_output_addrs(args) ab = wallaroo.ApplicationBuilder("Word Count Application") ab.new_pipeline("Split and Count", wallaroo.TCPSourceConfig(in_host, in_port, decoder))
The new version begins like this:
def application_setup(args): in_host, in_port = wallaroo.tcp_parse_input_addrs(args) out_host, out_port = wallaroo.tcp_parse_output_addrs(args) lines = wallaroo.source("Split and Count", wallaroo.TCPSourceConfig(in_host, in_port, decode_lines))
Both require an entry point function called
application_setup and some setup around the TCP source and sink addresses. In the old example, we create an
ApplicationBuilder and add a pipeline to it. In the new version, we forego the
ApplicationBuilder and instead define a source stream using
Adding a Stateless Computation Stage
For Word Count, the first computation splits the incoming lines into individual words. The definition of stateless computations themselves is the same across APIs, but what has changed is how they are added to the pipeline definition. In the old API, you had to choose between
to_parallel(), depending on whether the computation was going to be parallelized. However, one of the design goals of Wallaroo is to provide a scale-independent API that allows you to focus on your business logic. We decided that this choice between
to_parallel represented a conflation of concerns. In the new API, all computations are potentially parallelized, and there is only one corresponding call,
With the old API, we had:
ab = wallaroo.ApplicationBuilder("Word Count Application") ab.new_pipeline("Split and Count", wallaroo.TCPSourceConfig(in_host, in_port, decoder)) ab.to_parallel(split)
With the new:
lines = wallaroo.source("Split and Count", wallaroo.TCPSourceConfig(in_host, in_port, decode_lines)) pipeline = (lines .to(split))
This brings us to another difference. In the old API, each API call mutated the
ApplicationBuilder. In the new API, a call to
to() returns a new immutable pipeline object. You can chain many of these calls together to define a more complex pipeline.
Adding a State Computation Stage
The next stage in the Word Count application involves state: this is where we keep a running total for each word. In the old API, this looked like:
ab.to_state_partition(count_word, WordTotals, "word totals", partition, word_partitions)
You passed in the state computation, the corresponding state class, a string uniquely identifying that state, a partition function for determining how to partition state, and a list of initial partitions. With the new API, you would do the following instead:
First of all, notice that the partitioning by key is no longer conflated with the definition of the state computation stage itself. If you remove the
key_by call, then all messages will be sent to the same state (corresponding to the former
to_stateful call). Second, notice that you no longer assign the state a unique string and you no longer provide an initial list of partitions.
There are a couple of related differences. First, in the old API, you defined a partition function as follows:
@wallaroo.partition def partition(word): if word >= "a" and word <= "z": return word else: return "!"
In the old version, we used the first letter of the word as our partitioning key because if we had used words themselves as keys, Wallaroo would have created a new actor to handle every word. In the new version of Wallaroo, we use two levels of hashing when partitioning so that we can reduce the numbers of actors required to handle large sets of keys. The new version uses the following function instead:
@wallaroo.key_extractor def extract_word(word): return word
We have renamed the decorator from
key_extractor to more closely describe what the function is actually doing.
The last point we need to explain is the difference between the old and new state computation definition itself. Let’s take a look at the old version:
@wallaroo.state_computation(name="Count Word") def count_word(word, word_totals): word_totals.update(word) return (word_totals.get_count(word), True)
The state computation takes an input (here it’s
word) and our state object (here
word_totals). It then returns a tuple. The first element of the tuple is either an optional output or
None if there is no output. The second element is a Boolean telling Wallaroo whether state was updated. In the new API, we no longer return this Boolean. Instead, the state computation can either return an output or
None, and that’s it. Here’s the new example:
@wallaroo.state_computation(name="count word", state=WordTotal) def count_word(word, word_total): word_total.count = word_total.count + 1 return WordCount(word, word_total.count)
The other difference to note is that instead of passing the state class (
WordTotal) into the pipeline definition
to() call, we pass it in as an argument to the state computation decorator (
Applications with Multiple Sources
In the old API, defining an application involved adding one or more pipelines to an
ApplicationBuilder using the
new_pipeline call. These pipelines intersected by referring to the same state name string in a
to_state_partition call. So, for example, in our Market Spread example, we have two sources of data. The first is a stream of orders that are checked against market data and then potentially generate rejection alerts that are sent to a sink. The second is a stream of market data updates that are used to keep our market data state up to date. The definition of the application looks like this:
ab = wallaroo.ApplicationBuilder("market-spread") ab.new_pipeline( "Orders", wallaroo.TCPSourceConfig(order_host, order_port, order_decoder) ).to_state_partition( check_order, SymbolData, "symbol-data", symbol_partition_function, symbol_partitions ).to_sink(wallaroo.TCPSinkConfig(out_host, out_port, order_result_encoder) ).new_pipeline( "Market Data", wallaroo.TCPSourceConfig(nbbo_host, nbbo_port, market_data_decoder) ).to_state_partition( update_market_data, SymbolData, "symbol-data", symbol_partition_function, symbol_partitions ).done()
Because you passed
”symbol-data” as a state name to the
to_state_partition call in each pipeline definition, these pipelines would intersect there. However, this was error-prone and involved some unexpected behavior, the most notable of which is that the state partition would only use the config information provided in one of the two
With the new API, you would do the same thing as follows:
orders = wallaroo.source("Orders", wallaroo.TCPSourceConfig(order_host, order_port, decode_order)) market_data = wallaroo.source("Market Data", wallaroo.TCPSourceConfig(nbbo_host, nbbo_port, decode_market_data)) pipeline = (orders.merge(market_data) .key_by(extract_symbol) .to(check_market_data) .to_sink(wallaroo.TCPSinkConfig(out_host, out_port, encode_order_result)))
market_data source streams are first defined separately. Then they are merged together via the
merge() call (see above for more detail about how this works).
Building an Application
Finally, once we have defined an application, we must return it so that Wallaroo can transform it into an execution graph in a running cluster. The old way of doing this was via another call to the
Since we no longer use an
ApplicationBuilder, we do the following instead, passing the name of the application and the pipeline we defined into
return wallaroo.build_application("Word Count Application", pipeline)
If you have made no changes to Wallaroo or Pony since installation, your best bet will be to start from scratch, following the instructions of your choice.
Upgrading Wallaroo via Wallaroo Up
The normal Wallaroo Up installation instructions will install new versions next to existing versions.
Upgrading the Wallaroo Docker image
To upgrade the Wallaroo Docker image, run the following command to get the latest image. If you don't allow a non-root user to run Docker commands, you'll need to add
sudo to the front of the command.
docker pull wallaroo-labs-docker-wallaroolabs.bintray.io/release/wallaroo:0.6.0
Upgrading Wallaroo Source Code
If you mounted the Wallaroo source code to your local machine using the directory recommended in setup, in
/tmp/wallaroo-docker (UNIX & MacOS users) or
c:/wallaroo-docker (Windows users), then you will need to move the existing directory in order to get the latest source code. The latest Wallaroo source code will be copied to this directory automatically when a new container is started with the latest Docker image.
UNIX & MacOS Users
For UNIX users, you can move the directory with the following command:
mv /tmp/wallaroo-docker/wallaroo-src/ /tmp/wallaroo-docker/wallaroo-0.5.4-src/
For Windows users, you can move the directory with the following command:
move c:/wallaroo-docker/wallaroo-src/ c:/wallaroo-docker/wallaroo-0.5.4-src
Once done moving, you can re-create the
wallaroo-src directory with the following command:
Upgrading Wallaroo in Vagrant
To upgrade your Wallaroo installation in Vagrant, you’ll want to follow the latest installation instructions for Wallaroo in Vagrant.
If you have modified your old Vagrant VM in any way that you intend to persist, you should persist your changes now. For example, copy any edited or new files from the old Vagrant VM to the new one.
Upgrading Wallaroo when compiled from source
These instructions are for Ubuntu Linux. It's assumed that if you are using a different operating system then you are able to translate these instructions to your OS of choice.
Upgrading ponyc to 0.25.0
ponyc can be upgraded with the following command:
sudo apt-get install --only-upgrade ponyc=0.25.0
Verify you are now on the correct version of ponyc by running:
You should get the following output:
How to Upgrade Wallaroo
Once you're on the latest ponyc and pony stable, you're ready to switch over to Wallaroo 0.6.0.
If you have made prior changes to the Wallaroo code, you’ll need to re-implement those changes. To get the latest release, assuming that you previously installed to the directory we recommended in setup, you’ll need to run the following:
To get a new copy of the Wallaroo repository, run the following commands:
cd ~/wallaroo-tutorial/ curl -L -o wallaroo-0.6.0.tar.gz 'https://wallaroo-labs.bintray.com/wallaroolabs-ftp/wallaroo/0.6.0/wallaroo-0.6.0.tar.gz' mkdir wallaroo-0.6.0 tar -C wallaroo-0.6.0 --strip-components=1 -xzf wallaroo-0.6.0.tar.gz rm wallaroo-0.6.0.tar.gz cd wallaroo-0.6.0
You can then run the following commands to build the necessary tools to continue developing using Wallaroo 0.6.0:
cd ~/wallaroo-tutorial/wallaroo-0.6.0 make build-machida build-machida3 build-giles-all build-utils-cluster_shutdown
[0.6.0] - 2018-11-30
- Gradually back off when attempting to reconnect on data channel
- There is no longer a problem when using more workers than there are partitions
- No longer treat state computation stages as a special case. This results in fewer allocations and better performance
- Python 3 Support for Connectors
- Add parallel stateless steps to joining workers
- Streamlined Wallaroo Python API
- Connectors API Update
- Simplify the Python API for adding a computation to a pipeline