Skip to content

Commit

Permalink
Handle groupers correctly at sources
Browse files Browse the repository at this point in the history
  • Loading branch information
jtfmumm committed Nov 3, 2018
1 parent 537ea63 commit 0e83747
Show file tree
Hide file tree
Showing 21 changed files with 147 additions and 82 deletions.
11 changes: 6 additions & 5 deletions examples/python/market_spread/market_spread.py
Expand Up @@ -47,11 +47,10 @@ def application_setup(args):
out_host, out_port = wallaroo.tcp_parse_output_addrs(args)[0]

orders = wallaroo.source("Orders",
wallaroo.TCPSourceConfig(order_host, order_port, order_decoder))
wallaroo.TCPSourceConfig(order_host, order_port, order_decoder))

market_data = wallaroo.source("Market Data",
wallaroo.TCPSourceConfig(nbbo_host, nbbo_port,
market_data_decoder))
wallaroo.TCPSourceConfig(nbbo_host, nbbo_port, market_data_decoder))

pipeline = (orders.merge(market_data)
.key_by(extract_symbol)
Expand All @@ -72,13 +71,14 @@ def __init__(self, last_bid=0.0, last_offer=0.0, should_reject_trades=True):

@wallaroo.key_extractor
def extract_symbol(data):
print("!@ Extracting symbol " + data.symbol)
return data.symbol

@wallaroo.state_computation(name="Check Market Data", state=SymbolData)
def check_market_data(data, state):
if data.is_order:
if state.should_reject_trades:
print("!@Rejecting " + data.symbol + " order")
print("!@Rejecting " + data.symbol + " order with offer of " + str(state.last_offer))
ts = int(time.time() * 100000)
return OrderResult(data, state.last_bid, state.last_offer, ts)
print("!@Not rejecting " + data.symbol + " order")
Expand All @@ -94,7 +94,7 @@ def check_market_data(data, state):
if should_reject_trades:
print("!@Updating " + data.symbol + " as reject")
else:
print("!@Not updating " + data.symbol + " as reject")
print("!@Not updating " + data.symbol + " as reject: offer " + str(data.offer) + " last_offer now: " + str(state.last_offer))

state.should_reject_trades = should_reject_trades
return None
Expand Down Expand Up @@ -122,6 +122,7 @@ def __init__(self, symbol, transact_time, bid, offer):

@wallaroo.decoder(header_length=4, length_fmt=">I")
def order_decoder(bs):
print("!@ Decoding message at Source")
"""
0 - 1b - FixType (U8)
1 - 1b - side (U8)
Expand Down
9 changes: 6 additions & 3 deletions lib/wallaroo/core/grouping/grouping.pony
Expand Up @@ -23,18 +23,21 @@ use "wallaroo/core/topology"
use "wallaroo_labs/mort"


trait val GrouperBuilder
fun apply(): Grouper

trait Grouper
fun ref apply[D: Any val](d: D): Key

primitive OneToOneGroup
primitive OneToOneGroup is GrouperBuilder
fun apply(): OneToOneGrouper =>
OneToOneGrouper

class OneToOneGrouper is Grouper
fun ref apply[D: Any val](d: D): Key =>
"one-to-one-grouping-key"

primitive Shuffle
primitive Shuffle is GrouperBuilder
fun apply(): Shuffler =>
Shuffler

Expand All @@ -47,7 +50,7 @@ class Shuffler is Grouper
fun ref apply[D: Any val](d: D): Key =>
_rand.next().string()

trait val GroupByKey
trait val GroupByKey is GrouperBuilder
fun apply(): KeyGrouper

class val TypedGroupByKey[In: Any val] is GroupByKey
Expand Down
18 changes: 10 additions & 8 deletions lib/wallaroo/core/initialization/application_distributor.pony
Expand Up @@ -106,7 +106,7 @@ actor ApplicationDistributor is Distributor
let frontier = Array[U128]
let processed = SetIs[U128]
let edges = Map[U128, SetIs[U128]]
let groupers = Map[U128, (Shuffle | GroupByKey | None)]
let groupers = Map[U128, GrouperBuilder]

// Add Wallaroo sinks to intermediate graph
for sink_node in logical_graph.sinks() do
Expand Down Expand Up @@ -138,7 +138,7 @@ actor ApplicationDistributor is Distributor
end

while frontier.size() > 0 do
let next_id = frontier.pop()?
let next_id = frontier.shift()?
var node = logical_graph.get_node(next_id)?
if processed.contains(node.id) then continue end
match node.value
Expand All @@ -147,14 +147,14 @@ actor ApplicationDistributor is Distributor
if groupers.contains(node.id) then
groupers(node.id)?
else
None
OneToOneGroup
end

// We initially set this to Shuffle. If this is a stateless
// computation then we'll use Shuffle. If it's a state computation
// and there's no prior key_by, then we use direct routing, set
// later by using None.
var input_grouper: (GroupByKey | Shuffle | None) = Shuffle
var input_grouper: GrouperBuilder = Shuffle

// Create the StepBuilder for this stage. If the stage is a state
// computation, then we can simply create it. If the stage is
Expand All @@ -164,7 +164,7 @@ actor ApplicationDistributor is Distributor
if rb.is_stateful() then
// If this stage is preceded by a key_by, this will be
// ignored. If not, then this default will be used.
input_grouper = None
input_grouper = OneToOneGroup
StepBuilder(_app_name, _app_name, rb,
node.id, rb.routing_group(), grouper, rb.is_stateful())
else
Expand Down Expand Up @@ -278,8 +278,10 @@ actor ApplicationDistributor is Distributor
end

for i_node in node.ins() do
// !@ We're parallelizing all computations
groupers(i_node.id) = input_grouper
// !@ Does this check make sense, or should we override sometimes?
if not groupers.contains(i_node.id) then
groupers(i_node.id) = input_grouper
end

if not frontier.contains(i_node.id) and
not processed.contains(i_node.id)
Expand Down Expand Up @@ -329,7 +331,7 @@ actor ApplicationDistributor is Distributor
if groupers.contains(node.id) then
groupers(node.id)?
else
None
OneToOneGroup
end

let r_builder = RunnerSequenceBuilder(
Expand Down
4 changes: 3 additions & 1 deletion lib/wallaroo/core/initialization/local_topology.pony
Expand Up @@ -34,6 +34,7 @@ use "wallaroo/ent/recovery"
use "wallaroo/ent/router_registry"
use "wallaroo/ent/checkpoint"
use "wallaroo/core/data_channel"
use "wallaroo/core/grouping"
use "wallaroo/core/invariant"
use "wallaroo/core/messages"
use "wallaroo/core/metrics"
Expand Down Expand Up @@ -904,10 +905,11 @@ actor LocalTopologyInitializer is LayoutInitializer

// Set up SourceListener builders
let source_runner_builder = source_data.runner_builder()
let grouper = source_data.grouper()
let sl_builder_builder =
source_data.source_listener_builder_builder()
let sl_builder = sl_builder_builder(_worker_name,
pipeline_name, source_runner_builder, out_router,
pipeline_name, source_runner_builder, grouper, out_router,
_metrics_conn, consume source_reporter, _router_registry,
_outgoing_boundary_builders, _event_log, _auth, this,
_recovering)
Expand Down
Expand Up @@ -21,6 +21,7 @@ use "time"
use "wallaroo_labs/time"
use "wallaroo/core/boundary"
use "wallaroo/core/common"
use "wallaroo/core/grouping"
use "wallaroo/ent/data_receiver"
use "wallaroo/ent/recovery"
use "wallaroo_labs/mort"
Expand All @@ -47,9 +48,9 @@ class ConnectorSourceNotify[In: Any val]

new iso create(source_id: RoutingId, pipeline_name: String, env: Env,
auth: AmbientAuth, handler: FramedSourceHandler[In] val,
runner_builder: RunnerBuilder, router': Router,
metrics_reporter: MetricsReporter iso, event_log: EventLog,
target_router: Router)
runner_builder: RunnerBuilder, grouper_builder: GrouperBuilder,
router': Router, metrics_reporter: MetricsReporter iso,
event_log: EventLog, target_router: Router)
=>
_source_id = source_id
_pipeline_name = pipeline_name
Expand All @@ -58,7 +59,7 @@ class ConnectorSourceNotify[In: Any val]
_auth = auth
_handler = handler
_runner = runner_builder(event_log, auth, None,
target_router)
target_router, grouper_builder)
_router = router'
_metrics_reporter = consume metrics_reporter
_header_size = _handler.header_length()
Expand Down
Expand Up @@ -31,6 +31,7 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
use "collections"
use "wallaroo/core/boundary"
use "wallaroo/core/common"
use "wallaroo/core/grouping"
use "wallaroo/ent/data_receiver"
use "wallaroo/ent/recovery"
use "wallaroo/ent/router_registry"
Expand All @@ -52,6 +53,7 @@ actor ConnectorSourceListener[In: Any val] is SourceListener
let _worker_name: WorkerName
let _pipeline_name: String
let _runner_builder: RunnerBuilder
let _grouper_builder: GrouperBuilder
var _router: Router
let _metrics_conn: MetricsSink
let _metrics_reporter: MetricsReporter
Expand Down Expand Up @@ -79,7 +81,8 @@ actor ConnectorSourceListener[In: Any val] is SourceListener
let _available_sources: Array[ConnectorSource[In]] = _available_sources.create()

new create(env: Env, worker_name: WorkerName, pipeline_name: String,
runner_builder: RunnerBuilder, router: Router, metrics_conn: MetricsSink,
runner_builder: RunnerBuilder, grouper: GrouperBuilder, router: Router,
metrics_conn: MetricsSink,
metrics_reporter: MetricsReporter iso, router_registry: RouterRegistry,
outgoing_boundary_builders: Map[String, OutgoingBoundaryBuilder] val,
event_log: EventLog, auth: AmbientAuth,
Expand All @@ -96,6 +99,7 @@ actor ConnectorSourceListener[In: Any val] is SourceListener
_worker_name = worker_name
_pipeline_name = pipeline_name
_runner_builder = runner_builder
_grouper_builder = grouper
_router = router
_metrics_conn = metrics_conn
_metrics_reporter = consume metrics_reporter
Expand Down Expand Up @@ -134,7 +138,7 @@ actor ConnectorSourceListener[In: Any val] is SourceListener
for i in Range(0, _limit) do
let source_id = _routing_id_gen()
let notify = ConnectorSourceNotify[In](source_id, _pipeline_name,
_env, _auth, _handler, _runner_builder, _router,
_env, _auth, _handler, _runner_builder, _grouper_builder, _router,
_metrics_reporter.clone(), _event_log, _target_router)
let source = ConnectorSource[In](source_id, _auth, this,
consume notify, _event_log, _router,
Expand Down
Expand Up @@ -19,6 +19,7 @@ Copyright 2017 The Wallaroo Authors.
use "collections"
use "wallaroo/core/boundary"
use "wallaroo/core/common"
use "wallaroo/core/grouping"
use "wallaroo/ent/recovery"
use "wallaroo/ent/router_registry"
use "wallaroo/core/initialization"
Expand All @@ -31,6 +32,7 @@ class val ConnectorSourceListenerBuilder[In: Any val]
let _worker_name: WorkerName
let _pipeline_name: String
let _runner_builder: RunnerBuilder
let _grouper_builder: GrouperBuilder
let _router: Router
let _metrics_conn: MetricsSink
let _metrics_reporter: MetricsReporter
Expand All @@ -47,7 +49,8 @@ class val ConnectorSourceListenerBuilder[In: Any val]
let _service: String

new val create(worker_name: WorkerName, pipeline_name: String,
runner_builder: RunnerBuilder, router: Router, metrics_conn: MetricsSink,
runner_builder: RunnerBuilder, grouper: GrouperBuilder, router: Router,
metrics_conn: MetricsSink,
metrics_reporter: MetricsReporter iso, router_registry: RouterRegistry,
outgoing_boundary_builders: Map[String, OutgoingBoundaryBuilder] val,
event_log: EventLog, auth: AmbientAuth,
Expand All @@ -59,6 +62,7 @@ class val ConnectorSourceListenerBuilder[In: Any val]
_worker_name = worker_name
_pipeline_name = pipeline_name
_runner_builder = runner_builder
_grouper_builder = grouper
_router = router
_metrics_conn = metrics_conn
_metrics_reporter = consume metrics_reporter
Expand All @@ -75,7 +79,8 @@ class val ConnectorSourceListenerBuilder[In: Any val]
_service = service

fun apply(env: Env): SourceListener =>
ConnectorSourceListener[In](env, _worker_name, _pipeline_name, _runner_builder,
ConnectorSourceListener[In](env, _worker_name, _pipeline_name,
_runner_builder, _grouper_builder,
_router, _metrics_conn, _metrics_reporter.clone(), _router_registry,
_outgoing_boundary_builders, _event_log, _auth, _layout_initializer,
_recovering, _target_router, _parallelism, _handler, _host, _service)
Expand All @@ -95,15 +100,17 @@ class val ConnectorSourceListenerBuilderBuilder[In: Any val]
_handler = handler

fun apply(worker_name: WorkerName, pipeline_name: String,
runner_builder: RunnerBuilder, router: Router, metrics_conn: MetricsSink,
runner_builder: RunnerBuilder, grouper: GrouperBuilder, router: Router,
metrics_conn: MetricsSink,
metrics_reporter: MetricsReporter iso, router_registry: RouterRegistry,
outgoing_boundary_builders: Map[String, OutgoingBoundaryBuilder] val,
event_log: EventLog, auth: AmbientAuth,
layout_initializer: LayoutInitializer,
recovering: Bool, target_router: Router = EmptyRouter):
ConnectorSourceListenerBuilder[In]
=>
ConnectorSourceListenerBuilder[In](worker_name, pipeline_name, runner_builder,
ConnectorSourceListenerBuilder[In](worker_name, pipeline_name,
runner_builder, grouper,
router, metrics_conn, consume metrics_reporter, router_registry,
outgoing_boundary_builders, event_log, auth,
layout_initializer, recovering, target_router, _parallelism, _handler,
Expand Down
9 changes: 6 additions & 3 deletions lib/wallaroo/core/source/gen_source/gen_source.pony
Expand Up @@ -35,6 +35,7 @@ use "serialise"
use "time"
use "wallaroo/core/boundary"
use "wallaroo/core/common"
use "wallaroo/core/grouping"
use "wallaroo/core/initialization"
use "wallaroo/core/invariant"
use "wallaroo/core/metrics"
Expand Down Expand Up @@ -109,8 +110,9 @@ actor GenSource[V: Any val] is Source
let _msg_id_gen: MsgIdGenerator = MsgIdGenerator

new create(source_id: RoutingId, auth: AmbientAuth, pipeline_name: String,
runner_builder: RunnerBuilder, router': Router, target_router: Router,
generator: GenSourceGenerator[V], event_log: EventLog,
runner_builder: RunnerBuilder, grouper: GrouperBuilder, router': Router,
target_router: Router, generator: GenSourceGenerator[V],
event_log: EventLog,
outgoing_boundary_builders: Map[String, OutgoingBoundaryBuilder] val,
layout_initializer: LayoutInitializer,
metrics_reporter': MetricsReporter iso, router_registry: RouterRegistry)
Expand All @@ -130,7 +132,8 @@ actor GenSource[V: Any val] is Source
_layout_initializer = layout_initializer
_router_registry = router_registry

_runner = runner_builder(event_log, auth, None, target_router)
_runner = runner_builder(event_log, auth, None, target_router,
grouper)
_router = router'

for (target_worker_name, builder) in outgoing_boundary_builders.pairs() do
Expand Down
10 changes: 7 additions & 3 deletions lib/wallaroo/core/source/gen_source/gen_source_listener.pony
Expand Up @@ -33,6 +33,7 @@ use "collections"
use "crypto"
use "wallaroo/core/boundary"
use "wallaroo/core/common"
use "wallaroo/core/grouping"
use "wallaroo/ent/data_receiver"
use "wallaroo/ent/recovery"
use "wallaroo/ent/router_registry"
Expand Down Expand Up @@ -61,6 +62,7 @@ actor GenSourceListener[In: Any val] is SourceListener
let _worker_name: WorkerName
let _pipeline_name: String
let _runner_builder: RunnerBuilder
let _grouper_builder: GrouperBuilder
var _router: Router
let _metrics_conn: MetricsSink
let _metrics_reporter: MetricsReporter
Expand All @@ -75,7 +77,8 @@ actor GenSourceListener[In: Any val] is SourceListener
let _sources: Array[GenSource[In]] = _sources.create()

new create(env: Env, worker_name: WorkerName, pipeline_name: String,
runner_builder: RunnerBuilder, router: Router, metrics_conn: MetricsSink,
runner_builder: RunnerBuilder, grouper: GrouperBuilder, router: Router,
metrics_conn: MetricsSink,
metrics_reporter: MetricsReporter iso, router_registry: RouterRegistry,
outgoing_boundary_builders: Map[String, OutgoingBoundaryBuilder] val,
event_log: EventLog, auth: AmbientAuth,
Expand All @@ -87,6 +90,7 @@ actor GenSourceListener[In: Any val] is SourceListener
_worker_name = worker_name
_pipeline_name = pipeline_name
_runner_builder = runner_builder
_grouper_builder = grouper
_router = router
_metrics_conn = metrics_conn
_metrics_reporter = consume metrics_reporter
Expand Down Expand Up @@ -119,8 +123,8 @@ actor GenSourceListener[In: Any val] is SourceListener
let source_id = try rb.u128_le()? else Fail(); 0 end

let source = GenSource[In](source_id, _auth, _pipeline_name,
_runner_builder, _router, _target_router, _generator, _event_log,
_outgoing_boundary_builders, _layout_initializer,
_runner_builder, _grouper_builder, _router, _target_router, _generator,
_event_log, _outgoing_boundary_builders, _layout_initializer,
_metrics_reporter.clone(), _router_registry)

source.mute(this)
Expand Down

0 comments on commit 0e83747

Please sign in to comment.