Skip to content

Commit

Permalink
On rollback, Boundary must reset _lowest_queue_id
Browse files Browse the repository at this point in the history
Fixes #3056
  • Loading branch information
slfritchie authored and jtfmumm committed Nov 22, 2019
1 parent fcea2e3 commit 8b7c650
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 4 deletions.
12 changes: 8 additions & 4 deletions lib/wallaroo/core/boundary/boundary.pony
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@ use "wallaroo/core/routing"
use "wallaroo/core/tcp_actor"
use "wallaroo/core/topology"
use "wallaroo_labs/bytes"
use "wallaroo_labs/logging"
use "wallaroo_labs/mort"
use "wallaroo_labs/time"

use @l[I32](severity: LogSeverity, category: LogCategory, fmt: Pointer[U8] tag,...)

class val OutgoingBoundaryBuilder
let _auth: AmbientAuth
Expand Down Expand Up @@ -351,7 +353,11 @@ actor OutgoingBoundary is (Consumer & TCPActor)
_maybe_mute_or_unmute_upstreams()

fun ref receive_ack(acked_seq_id: SeqId) =>
@l(Log.debug(), Log.boundary(), "worker %s target_worker %s acked_seq_id %lu > _lowest_queue_id %lu\n".cstring(), _worker_name.cstring(), _target_worker.cstring(), acked_seq_id, _lowest_queue_id)
ifdef debug then
if not (acked_seq_id > _lowest_queue_id) then
@printf[I32]("not (acked_seq_id %lu > _lowest_queue_id %lu)\n".cstring(), acked_seq_id, _lowest_queue_id)
end
Invariant(acked_seq_id > _lowest_queue_id)
end

Expand Down Expand Up @@ -541,10 +547,7 @@ actor OutgoingBoundary is (Consumer & TCPActor)
be rollback(payload: ByteSeq val, event_log: EventLog,
checkpoint_id: CheckpointId)
=>
"""
There is nothing for a Boundary to rollback to.
"""
None
_lowest_queue_id = 0

be update_worker_data_service(worker: WorkerName,
host: String, service: String)
Expand Down Expand Up @@ -660,6 +663,7 @@ actor OutgoingBoundary is (Consumer & TCPActor)
@printf[I32]("Received StartNormalDataSendingMsg at Boundary\n"
.cstring())
end
@l(Log.debug(), Log.boundary(), "received: worker %s target_worker %s sn.last_id_seen %lu\n".cstring(), _worker_name.cstring(), _target_worker.cstring(), sn.last_id_seen)
receive_connect_ack(sn.last_id_seen)
start_normal_sending()
| let aw: AckDataReceivedMsg =>
Expand Down
2 changes: 2 additions & 0 deletions lib/wallaroo_labs/logging/wallaroo-logging.pony
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ primitive Log
fun global_stream_registry(): LogCategory => LogCategory(9).shl(8)
fun local_stream_registry(): LogCategory => LogCategory(10).shl(8)
fun routing(): LogCategory => LogCategory(11).shl(8)
fun boundary(): LogCategory => LogCategory(12).shl(8)
fun max_category(): LogCategory => LogCategory(40).shl(8)
fun manual_category(n: U16): LogCategory => LogCategory(n).shl(8)

Expand Down Expand Up @@ -96,6 +97,7 @@ primitive Log
(default_severity(), global_stream_registry(), "GlobalStreamRegistry")
(default_severity(), local_stream_registry(), "LocalStreamRegistry")
(default_severity(), routing(), "Routing")
(default_severity(), boundary(), "Boundary")
] // END category_map

fun set_defaults() =>
Expand Down

0 comments on commit 8b7c650

Please sign in to comment.