Skip to content

Commit

Permalink
Add more debugging info to invalid calls
Browse files Browse the repository at this point in the history
It's useful to know the method that was called instead
of just the line number where the call happened (which
can change over time).
  • Loading branch information
jtfmumm authored and slfritchie committed Nov 8, 2019
1 parent 5c2c39e commit 40fdf14
Show file tree
Hide file tree
Showing 11 changed files with 110 additions and 110 deletions.
54 changes: 27 additions & 27 deletions lib/wallaroo/core/autoscale/autoscale_phase.pony
Original file line number Diff line number Diff line change
Expand Up @@ -39,62 +39,62 @@ trait _AutoscalePhase
local_topology: LocalTopology, current_worker_count: USize,
response_fn: TryJoinResponseFn)
=>
_invalid_call(); Fail()
_invalid_call(__loc.method_name()); Fail()

fun ref update_checkpoint_id(checkpoint_id: CheckpointId,
rollback_id: RollbackId)
=>
_invalid_call(); Fail()
_invalid_call(__loc.method_name()); Fail()

fun ref grow_checkpoint_barrier_complete() =>
_invalid_call(); Fail()
_invalid_call(__loc.method_name()); Fail()

fun ref grow_autoscale_barrier_complete() =>
_invalid_call(); Fail()
_invalid_call(__loc.method_name()); Fail()

fun ref joining_worker_initialized(worker: WorkerName,
step_group_routing_ids: Map[RoutingId, RoutingId] val)
=>
_invalid_call(); Fail()
_invalid_call(__loc.method_name()); Fail()

fun ref worker_connected_to_joining_workers(worker: WorkerName) =>
_invalid_call(); Fail()
_invalid_call(__loc.method_name()); Fail()

fun ref stop_the_world_for_grow_migration_initiated(coordinator: WorkerName,
joining_workers: Array[WorkerName] val)
=>
_invalid_call(); Fail()
_invalid_call(__loc.method_name()); Fail()

fun ref grow_migration_initiated(checkpoint_id: CheckpointId) =>
_invalid_call(); Fail()
_invalid_call(__loc.method_name()); Fail()

fun ref all_migration_complete() =>
_invalid_call(); Fail()
_invalid_call(__loc.method_name()); Fail()

fun ref receive_grow_migration_ack(worker: WorkerName) =>
_invalid_call(); Fail()
_invalid_call(__loc.method_name()); Fail()

fun ref worker_completed_migration(w: WorkerName) =>
_invalid_call(); Fail()
_invalid_call(__loc.method_name()); Fail()

fun ref pre_register_joining_workers(ws: Array[WorkerName] val) =>
_invalid_call(); Fail()
_invalid_call(__loc.method_name()); Fail()

fun ref receive_hash_partitions(hp: Map[RoutingId, HashPartitions] val) =>
_invalid_call(); Fail()
_invalid_call(__loc.method_name()); Fail()

fun ref inform_of_producers_list(ps: SetIs[Producer] val) =>
_invalid_call(); Fail()
_invalid_call(__loc.method_name()); Fail()

fun ref producer_acked_registering(p: Producer) =>
_invalid_call(); Fail()
_invalid_call(__loc.method_name()); Fail()

fun ref inform_of_boundaries_map(bs: Map[WorkerName, OutgoingBoundary] val)
=>
_invalid_call(); Fail()
_invalid_call(__loc.method_name()); Fail()

fun ref boundary_acked_registering(b: OutgoingBoundary) =>
_invalid_call(); Fail()
_invalid_call(__loc.method_name()); Fail()

fun ref try_shrink(local_topology: LocalTopologyInitializer,
target_workers: Array[WorkerName] val, shrink_count: U64,
Expand All @@ -120,29 +120,29 @@ trait _AutoscalePhase
coordinator: WorkerName, remaining_workers: Array[WorkerName] val,
leaving_workers: Array[WorkerName] val)
=>
_invalid_call(); Fail()
_invalid_call(__loc.method_name()); Fail()

fun ref shrink_checkpoint_barrier_complete() =>
_invalid_call(); Fail()
_invalid_call(__loc.method_name()); Fail()

fun ref shrink_autoscale_barrier_complete() =>
_invalid_call(); Fail()
_invalid_call(__loc.method_name()); Fail()

fun ref leaving_worker_finished_migration(worker: WorkerName) =>
_invalid_call(); Fail()
_invalid_call(__loc.method_name()); Fail()

fun ref receive_leaving_migration_ack(worker: WorkerName) =>
_invalid_call(); Fail()
_invalid_call(__loc.method_name()); Fail()

fun ref producers_disposed() =>
_invalid_call(); Fail()
_invalid_call(__loc.method_name()); Fail()

fun ref autoscale_complete() =>
_invalid_call(); Fail()
_invalid_call(__loc.method_name()); Fail()

fun ref _invalid_call() =>
@printf[I32]("Invalid call on autoscale phase %s\n".cstring(),
name().cstring())
fun ref _invalid_call(method_name: String) =>
@printf[I32]("Invalid call to %s on autoscale phase %s\n".cstring(),
method_name.cstring(), name().cstring())

class _EmptyAutoscalePhase is _AutoscalePhase
fun name(): String => "EmptyAutoscalePhase"
Expand Down
16 changes: 8 additions & 8 deletions lib/wallaroo/core/barrier/barrier_coordinator_phase.pony
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ trait _BarrierCoordinatorPhase
fun ref initiate_barrier(barrier_token: BarrierToken,
result_promise: BarrierResultPromise)
=>
_invalid_call(); Fail()
_invalid_call(__loc.method_name()); Fail()

fun ref source_registration_complete(s: Source) =>
_invalid_call(); Fail()
_invalid_call(__loc.method_name()); Fail()

fun ready_for_next_token(): Bool =>
false
Expand All @@ -51,17 +51,17 @@ trait _BarrierCoordinatorPhase
end

fun ref ack_barrier(s: Sink, barrier_token: BarrierToken) =>
_invalid_call(); Fail()
_invalid_call(__loc.method_name()); Fail()

fun ref worker_ack_barrier(w: WorkerName, barrier_token: BarrierToken) =>
_invalid_call(); Fail()
_invalid_call(__loc.method_name()); Fail()

fun ref barrier_fully_acked(token: BarrierToken) =>
_invalid_call(); Fail()
_invalid_call(__loc.method_name()); Fail()

fun _invalid_call() =>
@printf[I32]("Invalid call on barrier initiator phase %s\n".cstring(),
name().cstring())
fun _invalid_call(method_name: String) =>
@printf[I32]("Invalid call to %s on barrier initiator phase %s\n"
.cstring(), method_name.cstring(), name().cstring())

class _InitialBarrierCoordinatorPhase is _BarrierCoordinatorPhase
fun name(): String => "_InitialBarrierCoordinatorPhase"
Expand Down
4 changes: 2 additions & 2 deletions lib/wallaroo/core/checkpoint/checkpoint_initiator_phase.pony
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ trait _CheckpointInitiatorPhase
method_name.cstring(), name().cstring())
Fail()

fun _unexpected_call(call: String) =>
fun _unexpected_call(method_name: String) =>
"""
Only call this for phase methods that are called directly in response to
control messages received. That's because we can't be sure in that case if
Expand All @@ -111,7 +111,7 @@ trait _CheckpointInitiatorPhase
there are problems to be solved in order to do this safely.
"""
@printf[I32]("UNEXPECTED CALL to %s on checkpoint initiator phase %s. Ignoring!\n"
.cstring(), call.cstring(), name().cstring())
.cstring(), method_name.cstring(), name().cstring())

class _WaitingCheckpointInitiatorPhase is _CheckpointInitiatorPhase
fun name(): String => "_WaitingCheckpointInitiatorPhase"
Expand Down
16 changes: 8 additions & 8 deletions lib/wallaroo/core/common/failing_consumer_sender.pony
Original file line number Diff line number Diff line change
Expand Up @@ -35,32 +35,32 @@ class FailingConsumerSender is TestableConsumerSender
new create(producer_id': RoutingId) =>
_id = producer_id'

fun _invalid_call() =>
@printf[I32]("FailingConsumerSender: Invalid call on Producer %s\n"
.cstring(), _id.string().cstring())
fun _invalid_call(method_name: String) =>
@printf[I32]("FailingConsumerSender: Invalid call to %s on Producer %s\n"
.cstring(), method_name.cstring(), _id.string().cstring())

fun ref send[D: Any val](metric_name: String,
pipeline_time_spent: U64, data: D, key: Key, event_ts: U64,
watermark_ts: U64, msg_uid: MsgId, frac_ids: FractionalMessageId,
latest_ts: U64, metrics_id: U16, worker_ingress_ts: U64,
consumer: Consumer)
=>
_invalid_call(); Fail()
_invalid_call(__loc.method_name()); Fail()

fun ref forward(delivery_msg: DeliveryMsg, pipeline_time_spent: U64,
latest_ts: U64, metrics_id: U16, metric_name: String,
worker_ingress_ts: U64, boundary: OutgoingBoundary)
=>
_invalid_call(); Fail()
_invalid_call(__loc.method_name()); Fail()

fun ref register_producer(consumer_id: RoutingId, consumer: Consumer) =>
_invalid_call(); Fail()
_invalid_call(__loc.method_name()); Fail()

fun ref unregister_producer(consumer_id: RoutingId, consumer: Consumer) =>
_invalid_call(); Fail()
_invalid_call(__loc.method_name()); Fail()

fun ref update_output_watermark(w: U64): (U64, U64) =>
_invalid_call(); Fail()
_invalid_call(__loc.method_name()); Fail()
(0, 0)

fun producer_id(): RoutingId =>
Expand Down
12 changes: 6 additions & 6 deletions lib/wallaroo/core/data_receiver/data_receiver_phase.pony
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,19 @@ trait _DataReceiverPhase
pipeline_time_spent: U64, seq_id: SeqId, latest_ts: U64, metrics_id: U16,
worker_ingress_ts: U64)
=>
_invalid_call(); Fail()
_invalid_call(__loc.method_name()); Fail()

fun ref forward_barrier(input_id: RoutingId, output_id: RoutingId,
token: BarrierToken, seq_id: SeqId)
=>
_invalid_call(); Fail()
_invalid_call(__loc.method_name()); Fail()

fun ref data_connect(highest_seq_id: SeqId) =>
_invalid_call(); Fail()
_invalid_call(__loc.method_name()); Fail()

fun _invalid_call() =>
@printf[I32]("Invalid call on Data Receiver phase %s\n".cstring(),
name().cstring())
fun _invalid_call(method_name: String) =>
@printf[I32]("Invalid call to %s on Data Receiver phase %s\n".cstring(),
method_name.cstring(), name().cstring())

class _DataReceiverNotProcessingPhase is _DataReceiverPhase
fun name(): String => "_DataReceiverNotProcessingPhase"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,34 +56,32 @@ trait LocalTopologyInitializerPhase
else
// If this is not a recovering single worker cluster, then
// initialize has been called during the wrong phase.
_invalid_call()
Fail()
_invalid_call(__loc.method_name()); Fail()
end
else
// If worker_count is None, then we have not yet initialized the
// LocalTopology, which means this has been called during the wrong
// phase.
_invalid_call()
Fail()
_invalid_call(__loc.method_name()); Fail()
end

fun ref begin_reporting() =>
_invalid_call(); Fail()
_invalid_call(__loc.method_name()); Fail()

fun ref report_created(initializable: Initializable) =>
_invalid_call(); Fail()
_invalid_call(__loc.method_name()); Fail()

fun ref report_initialized(initializable: Initializable) =>
_invalid_call(); Fail()
_invalid_call(__loc.method_name()); Fail()

fun ref report_ready_to_work(initializable: Initializable) =>
_invalid_call(); Fail()
_invalid_call(__loc.method_name()); Fail()

fun ref worker_report_ready_to_work(w: WorkerName) =>
_invalid_call(); Fail()
_invalid_call(__loc.method_name()); Fail()

fun ref all_workers_ready_to_work() =>
_invalid_call(); Fail()
_invalid_call(__loc.method_name()); Fail()

fun ref report_event_log_ready_to_work() =>
// !TODO!: For now, this is partially handled by the
Expand All @@ -102,9 +100,9 @@ trait LocalTopologyInitializerPhase
=>
lti._cluster_status_query_not_initialized(conn)

fun _invalid_call() =>
@printf[I32]("Invalid call on local topology initializer phase %s\n"
.cstring(), name().cstring())
fun _invalid_call(method_name: String) =>
@printf[I32]("Invalid call to %s on local topology initializer phase %s\n"
.cstring(), method_name.cstring(), name().cstring())

class _ApplicationAwaitingInitializationPhase is LocalTopologyInitializerPhase
let _initializables: Initializables = Initializables
Expand Down
26 changes: 13 additions & 13 deletions lib/wallaroo/core/recovery/event_log_phase.pony
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,13 @@ trait _EventLogPhase
@printf[I32]("checkpoint_state() for resilient %s, checkpoint_id %s\n"
.cstring(), resilient_id.string().cstring(),
checkpoint_id.string().cstring())
_invalid_call(); Fail()
_invalid_call(__loc.method_name()); Fail()

fun ref state_checkpointed(resilient_id: RoutingId) =>
_invalid_call(); Fail()
_invalid_call(__loc.method_name()); Fail()

fun ref write_initial_checkpoint_id(checkpoint_id: CheckpointId) =>
_invalid_call(); Fail()
_invalid_call(__loc.method_name()); Fail()

fun ref write_checkpoint_id(checkpoint_id: CheckpointId,
promise: Promise[CheckpointId])
Expand All @@ -67,28 +67,28 @@ trait _EventLogPhase
fun ref checkpoint_id_written(checkpoint_id: CheckpointId,
promise: Promise[CheckpointId])
=>
_invalid_call(); Fail()
_invalid_call(__loc.method_name()); Fail()

fun ref expect_rollback_count(count: USize) =>
_invalid_call(); Fail()
_invalid_call(__loc.method_name()); Fail()

fun ref ack_rollback(resilient_id: RoutingId) =>
_invalid_call(); Fail()
_invalid_call(__loc.method_name()); Fail()

fun ref complete_early() =>
_invalid_call(); Fail()
_invalid_call(__loc.method_name()); Fail()

fun ref check_completion() =>
_invalid_call(); Fail()
_invalid_call(__loc.method_name()); Fail()

fun ref dispose(event_log: EventLog ref) =>
event_log._dispose()

fun _invalid_call() =>
@printf[I32]("Invalid call on event log phase %s\n".cstring(),
name().cstring())
fun _invalid_call(method_name: String) =>
@printf[I32]("Invalid call to %s on event log phase %s\n".cstring(),
method_name.cstring(), name().cstring())

fun _unexpected_call(call: String) =>
fun _unexpected_call(method_name: String) =>
"""
Only call this for phase methods that are called directly in response to
control messages received. That's because we can't be sure in that case if
Expand All @@ -105,7 +105,7 @@ trait _EventLogPhase
there are problems to be solved in order to do this safely.
"""
@printf[I32]("UNEXPECTED CALL to %s on event log phase %s. Ignoring!\n"
.cstring(), call.cstring(), name().cstring())
.cstring(), method_name.cstring(), name().cstring())

class _InitialEventLogPhase is _EventLogPhase
fun name(): String => "_InitialEventLogPhase"
Expand Down
Loading

0 comments on commit 40fdf14

Please sign in to comment.