From 8b7c65027b8467681f8ad8cc65c049505b4c55c9 Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Mon, 18 Nov 2019 15:27:35 -0600 Subject: [PATCH] On rollback, Boundary must reset _lowest_queue_id Fixes #3056 --- lib/wallaroo/core/boundary/boundary.pony | 12 ++++++++---- lib/wallaroo_labs/logging/wallaroo-logging.pony | 2 ++ 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/lib/wallaroo/core/boundary/boundary.pony b/lib/wallaroo/core/boundary/boundary.pony index c0ff8eb91b..9950a5addc 100644 --- a/lib/wallaroo/core/boundary/boundary.pony +++ b/lib/wallaroo/core/boundary/boundary.pony @@ -34,9 +34,11 @@ use "wallaroo/core/routing" use "wallaroo/core/tcp_actor" use "wallaroo/core/topology" use "wallaroo_labs/bytes" +use "wallaroo_labs/logging" use "wallaroo_labs/mort" use "wallaroo_labs/time" +use @l[I32](severity: LogSeverity, category: LogCategory, fmt: Pointer[U8] tag,...) class val OutgoingBoundaryBuilder let _auth: AmbientAuth @@ -351,7 +353,11 @@ actor OutgoingBoundary is (Consumer & TCPActor) _maybe_mute_or_unmute_upstreams() fun ref receive_ack(acked_seq_id: SeqId) => + @l(Log.debug(), Log.boundary(), "worker %s target_worker %s acked_seq_id %lu > _lowest_queue_id %lu\n".cstring(), _worker_name.cstring(), _target_worker.cstring(), acked_seq_id, _lowest_queue_id) ifdef debug then + if not (acked_seq_id > _lowest_queue_id) then + @printf[I32]("not (acked_seq_id %lu > _lowest_queue_id %lu)\n".cstring(), acked_seq_id, _lowest_queue_id) + end Invariant(acked_seq_id > _lowest_queue_id) end @@ -541,10 +547,7 @@ actor OutgoingBoundary is (Consumer & TCPActor) be rollback(payload: ByteSeq val, event_log: EventLog, checkpoint_id: CheckpointId) => - """ - There is nothing for a Boundary to rollback to. - """ - None + _lowest_queue_id = 0 be update_worker_data_service(worker: WorkerName, host: String, service: String) @@ -660,6 +663,7 @@ actor OutgoingBoundary is (Consumer & TCPActor) @printf[I32]("Received StartNormalDataSendingMsg at Boundary\n" .cstring()) end + @l(Log.debug(), Log.boundary(), "received: worker %s target_worker %s sn.last_id_seen %lu\n".cstring(), _worker_name.cstring(), _target_worker.cstring(), sn.last_id_seen) receive_connect_ack(sn.last_id_seen) start_normal_sending() | let aw: AckDataReceivedMsg => diff --git a/lib/wallaroo_labs/logging/wallaroo-logging.pony b/lib/wallaroo_labs/logging/wallaroo-logging.pony index f367d2c29a..5b0c907bdb 100644 --- a/lib/wallaroo_labs/logging/wallaroo-logging.pony +++ b/lib/wallaroo_labs/logging/wallaroo-logging.pony @@ -66,6 +66,7 @@ primitive Log fun global_stream_registry(): LogCategory => LogCategory(9).shl(8) fun local_stream_registry(): LogCategory => LogCategory(10).shl(8) fun routing(): LogCategory => LogCategory(11).shl(8) + fun boundary(): LogCategory => LogCategory(12).shl(8) fun max_category(): LogCategory => LogCategory(40).shl(8) fun manual_category(n: U16): LogCategory => LogCategory(n).shl(8) @@ -96,6 +97,7 @@ primitive Log (default_severity(), global_stream_registry(), "GlobalStreamRegistry") (default_severity(), local_stream_registry(), "LocalStreamRegistry") (default_severity(), routing(), "Routing") + (default_severity(), boundary(), "Boundary") ] // END category_map fun set_defaults() =>