Reliable delivery in Typed, #20984 #28155
Conversation
Through a first skim, looks very cool, but have lots of questions :)
    producer: ActorRef[ProducerController.InternalCommand],
    start: Start[A],
    firstSeqNr: Long): Behavior[InternalCommand] = {
  val requestedSeqNr = firstSeqNr - 1 + RequestWindow
I guess we'd want to watch the consumer as well and tie the lifecycle of the CC to the consumer?
yes, also restart of consumer is a fixme
checkProducerId(producerId, pid, seqNr)
val expectedSeqNr = s.receivedSeqNr + 1
if (seqNr == expectedSeqNr || (first && seqNr >= expectedSeqNr) || (first && seqMsg.producer != s.producer)) {
  logIfChangingProducer(s.producer, seqMsg, pid, seqNr)
What is the scenario when we get a new producer?
Maybe mostly when we introduce the durable producer, which may crash and then continue elsewhere.
Also, for the Sharding use case there is a loose coupling between PC and CC, which could allow the PC to be stopped and started again (possibly elsewhere).
context.log.infoN("from producer [{}], missing [{}], received [{}]", pid, expectedSeqNr, seqNr)
if (resendLost) {
  seqMsg.producer ! Resend(fromSeqNr = expectedSeqNr)
  resending(s.copy(producer = seqMsg.producer))
So we restart from there and drop the current message if `resendLost` is enabled. Probably good to have different logging for the two scenarios, and maybe at warning level when not resending.
No time/size window here before sending the `Resend` message? I'd prefer to change the behavior to some kind of observing state before that.
} else {
  deliverTo ! Delivery(pid, seqNr, msg, context.self)
  waitingForConfirmation(s.copy(producer = seqMsg.producer, receivedSeqNr = seqNr), first)
}
Should there perhaps be 3 options, resend, drop and stop/fail on detected gap?
Would a strategy be better?
Behaviors.unhandled

case Start(_) =>
  Behaviors.unhandled
Probably good to log warn here as well, since it would only happen when using the API incorrectly
 * to ignore lost messages, and then the `ProducerController` will not buffer unconfirmed messages.
 * In that mode it provides only flow control but no reliable delivery.
 */
object ProducerController {
Is there a way for the producer to signal that it is done/reached the end of the messages it wants to send etc?
no, good idea
Changing my mind, better to have that as an application level end message. Because it needs the same delivery mechanism and the application consumer must be able to understand it. When it has been received the application consumer can tear things down.
Ok, so that means the CC stopping on consumer termination?
yes, that is implemented
on producer side I think the producer would have to wait for ack corresponding to that last message before stopping the PC
final case class RequestNextParam[A](currentSeqNr: Long, confirmedSeqNr: Long, sendNextTo: ActorRef[A])

final case class MessageWithConfirmation[A](message: A, replyTo: ActorRef[Long])
Perhaps don't take a boxed primitive as the reply message but a wrapping type? So the idea is that the message is `A` but the response is the sequence number it got? How will a producer correlate those two?
The primary purpose is to use this with `ask` from the P. The reply (`seqNr: Long`) is sent back when everything is confirmed for that message `A`. Having the seqNr as the reply might not be important, it could be `Done`. I exposed the seqNr to both P and C so far.
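For illustration, here is a rough sketch of how a producer could use `ask` with `MessageWithConfirmation` and receive the confirmed seqNr back. This is only a sketch against the in-progress API: the `akka.actor.typed.delivery` package name and the assumption that the ref handed out by the controller accepts `MessageWithConfirmation` are taken from this thread, not a settled API.

```scala
// Hedged sketch, not the final API: ask with MessageWithConfirmation and receive the
// confirmed seqNr (Long) as the reply. Package and ref names are assumptions.
import akka.actor.typed.{ ActorRef, ActorSystem, Scheduler }
import akka.actor.typed.delivery.ProducerController
import akka.actor.typed.scaladsl.AskPattern._
import akka.util.Timeout
import scala.concurrent.Future
import scala.concurrent.duration._

def sendConfirmed[A](
    system: ActorSystem[_],
    next: ActorRef[ProducerController.MessageWithConfirmation[A]], // handed out by the controller (assumed)
    msg: A): Future[Long] = {
  implicit val timeout: Timeout = 5.seconds
  implicit val scheduler: Scheduler = system.scheduler
  // the reply arrives once the message A has been confirmed on the consumer side
  next.ask[Long](replyTo => ProducerController.MessageWithConfirmation(msg, replyTo))
}
```

The producer can correlate reply and message simply because each `ask` gets its own `Future`, even if the reply ends up being `Done` instead of the seqNr.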
final case class RegisterConsumer[A](
    consumerController: ActorRef[ConsumerController.Command[A]],
    replyTo: ActorRef[Done])
    extends Command[A]
This is just here for testability?
no, needed for dynamic registration/replacement of CC, for example when using sharding
hasRequested: Boolean)

// TODO Now the workers have to be registered explicitly/manually.
// We could support automatic registration via Receptionist, similar to how routers work.
Was just going to ask if you had thought about how to find the remote consumer. For this case it makes sense, one producer and many consumers/workers. But for 1:1 reliable delivery between cluster nodes that is potentially short-lived I don't think the Receptionist is a great fit (maybe I just made that use case up though, not sure).
I agree, Receptionist shouldn't be used for short lived.
The point-to-point scenario currently requires some application-specific way to exchange the CC/PC ActorRefs to connect them.
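For example, a minimal point-to-point wiring could look roughly like the sketch below. This is hedged: the `ProducerController`/`ConsumerController` factory signatures, the `resendLost` flag, and the use of `RegisterConsumer` for the initial hand-shake are assumptions based on this PR's diff and discussion, not a settled API.

```scala
// Hedged sketch of point-to-point wiring. Factory signatures, the resendLost flag and
// the RegisterConsumer hand-shake are assumptions taken from this PR's diff/discussion.
import akka.Done
import akka.actor.typed.Behavior
import akka.actor.typed.delivery.{ ConsumerController, ProducerController }
import akka.actor.typed.scaladsl.Behaviors

def guardian(producerId: String): Behavior[Nothing] =
  Behaviors.setup[Nothing] { context =>
    // one controller on each side; spawned from the same guardian here only for brevity
    val consumerController =
      context.spawn(ConsumerController[String](resendLost = true), "consumerController")
    val producerController =
      context.spawn(ProducerController[String](producerId), "producerController")

    // connect the pair; in a real deployment the two ActorRefs would be exchanged in an
    // application-specific way as noted above (plain message, config, Receptionist, ...)
    val ignoreDone = context.spawnAnonymous(Behaviors.ignore[Done])
    producerController ! ProducerController.RegisterConsumer(consumerController, ignoreDone)
    Behaviors.empty
  }
```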
      State(Map.empty, hasRequested = false))
  }
  .narrow
}
This is very cool, kind of pre-baking much of the distributed workers sample.
yup, was fun to write
Yeah this is really nice, I hadn't quite got my head around how this was going to work with the P2P API, but looks good.
With a sequential seqNr or syncId?
Ready for final approval. Let me squash this manually and craft the commit message before merging.
while waiting for the confirmation are stashed by the `ConsumerController` and delivered when the previous
message is confirmed.

The consumer and the `ConsumerController` actors are supposed to be local so that these messages are fast
Suggested change:
- The consumer and the `ConsumerController` actors are supposed to be local so that these messages are fast
+ The consumer and the `ConsumerController` actors are guaranteed to be local so that these messages are fast
That doesn't seem right, it's a recommendation for the user and not an aspect built into the reliable delivery actors.
it's enforced by a check. I'll reformulate this, but "guaranteed" is not the right word because user can do the wrong thing (but will then notice at runtime)
clarified in 64178fa
Java samples mostly looking good, two small things noted.
akka-cluster-sharding-typed/src/test/java/jdocs/delivery/ShardingDocExample.java (two review comments, since resolved)
LGTM, let's merge this behemoth!
thanks, I'll squash
Different approach than in classic AtLeastOnceDelivery because I would like:
* support flow control, with a work pulling approach
* be possible to use with or without persistence (without, it may lose messages if the producer node crashes)
* detect lost messages on the consumer side and let that drive resends, instead of aggressively resending from the producer side
* deliver messages in order and deduplicate resent messages
* have an efficient protocol for acknowledgments over the network (not ack each message), but still have a simple one-by-one protocol for the end user
* support 3 use cases (building blocks)
  * point-to-point
  * work pulling
  * sharding
* optional durable queue, with one event sourced implementation
* protobuf serialization
* ApiMayChange
* reference docs and examples
* api docs
* doc example code missing so far
I count @chbatey's previous reviews as an approval.
val producerController =
  context.spawn(ShardingProducerController(producerId, region, durableQueueBehavior = None), "producerController")

context.spawn(TodoService(producerController), "producer")
Wondering if you could also include an example of actually calling UpdateTodo in the TodoService? What would that look like? In particular, what would the entityRef look like?
The below doesn't work because I get an exception.
EntityTypeKey<TodoService.Command> entityTypeKey1 =
EntityTypeKey.create(TodoService.Command.class, "todo");
EntityRef<TodoService.Command> entityRef1 =
ClusterSharding.get(system).entityRefFor(entityTypeKey1, "123");
CompletionStage<Response> r = entityRef1.ask(replyTo -> new UpdateTodo("123", "tire", false, replyTo),
Duration.ofSeconds(3));
java.lang.ClassCastException: com.agapic.TodoService$UpdateTodo cannot be cast to akka.actor.typed.delivery.ConsumerController$Command
Try "todoService" in the EntityTypeKey instead, because I think the example is already using "todo" for the TodoList
and here you are trying to interact with the TodoService
.
However, I think you misunderstand how it is intended to be used. You try to use it with two levels of Sharding, one for the TodoList and one for the TodoService. The docs say " A single ShardingProducerController per ActorSystem (node) can be shared for sending to all entities of a certain entity type." https://doc.akka.io/docs/akka/current/typed/reliable-delivery.html#sharding
If you have more questions I'd recommend switching over to https://discuss.akka.io/
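To make the intended usage concrete, here is a hedged sketch of the producer side: the service registers itself with the `ShardingProducerController` it was given and addresses entities through the `RequestNext.sendNextTo` ref, rather than through a second `EntityTypeKey`. The `Start`/`RequestNext` shapes and the `TodoList` protocol stub are assumptions based on this PR's docs example, not the exact code.

```scala
// Hedged sketch of a producer-side service sending to sharded entities through the
// ShardingProducerController. Message shapes (Start, RequestNext, sendNextTo) and the
// TodoList stub are assumptions based on this PR's docs example.
import akka.actor.typed.{ ActorRef, Behavior }
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.sharding.typed.ShardingEnvelope
import akka.cluster.sharding.typed.delivery.ShardingProducerController

object TodoList {
  // minimal stand-in for the entity protocol used in the example
  sealed trait Command
  final case class AddTask(item: String) extends Command
}

object TodoProducer {
  def apply(
      producerController: ActorRef[ShardingProducerController.Command[TodoList.Command]])
      : Behavior[ShardingProducerController.RequestNext[TodoList.Command]] =
    Behaviors.setup { context =>
      // register this actor so the controller knows where to signal demand
      producerController ! ShardingProducerController.Start(context.self)
      Behaviors.receiveMessage { next =>
        // when there is demand, pick the target entity by id inside the envelope
        next.sendNextTo ! ShardingEnvelope("todo-123", TodoList.AddTask("buy tires"))
        Behaviors.same
      }
    }
}
```

Under this reading there is only one sharded entity type (the TodoList); the service talks to it through the controller rather than through its own EntityTypeKey.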
Approach

I'm taking a rather different approach than in classic `AtLeastOnceDelivery`, for the reasons listed in the commit message above (flow control with work pulling, optional durability, consumer-side detection of lost messages, ordered and deduplicated delivery, and an efficient acknowledgment protocol).

I have thought of 3 separate usage cases. They are illustrated in the tests in `ReliableDeliverySpec`, `WorkPullingSpec`, and `ReliableDeliveryShardingSpec`.

Point-to-point
ProducerController and ConsumerController are point-to-point with single producer and single consumer.
The ScalaDoc in `ProducerController` and `ConsumerController` describes how it works and the message protocol.

Work Pulling
ConsumerController (CC-1) is started for worker-1, and registered to the Receptionist.
The WorkPullingProducerController receives the Listing from the Receptionist and spawns a ProducerController (PC-1) for CC-1. The PC-1/CC-1 pair is used for delivery of messages selected for worker-1.
Another worker-2 is started and registered, and a corresponding ProducerController (PC-2) is spawned for its ConsumerController (CC-2). The PC-2/CC-2 pair is used for delivery of messages selected for worker-2.
The WorkPullingProducerController randomly selects worker for each message (job) among the workers with demand (according to the PC/CC pair request window).
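A hedged sketch of that wiring follows; the factory signatures and the ServiceKey-based registration are assumptions (modelled on the `ShardingProducerController(producerId, region, durableQueueBehavior = None)` call seen elsewhere in this PR), not necessarily this PR's exact API.

```scala
// Hedged sketch of work pulling wiring. The ServiceKey type, factory signatures and
// package names are assumptions; the worker behavior handling Delivery is omitted.
import akka.actor.typed.Behavior
import akka.actor.typed.delivery.{ ConsumerController, WorkPullingProducerController }
import akka.actor.typed.receptionist.ServiceKey
import akka.actor.typed.scaladsl.Behaviors

object WorkPullingSketch {
  final case class Job(payload: String)

  // every worker's ConsumerController is registered under this key
  val workerServiceKey: ServiceKey[ConsumerController.Command[Job]] =
    ServiceKey("worker-jobs")

  def guardian(): Behavior[Nothing] =
    Behaviors.setup[Nothing] { context =>
      // worker side: the ConsumerController registers itself to the Receptionist (CC-1);
      // the application worker that handles Delivery/Confirmed is not shown here
      context.spawn(ConsumerController[Job](workerServiceKey), "worker-1-cc")

      // producer side: subscribes to the key and spawns a ProducerController per worker (PC-1, PC-2, ...)
      context.spawn(
        WorkPullingProducerController[Job]("jobProducer", workerServiceKey, durableQueueBehavior = None),
        "workPullingController")
      Behaviors.empty
    }
}
```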
Sharding
A ShardingProducerController is started for each producer, typically one per node. Messages to any entity can be sent via this ShardingProducerController. For the first message for an entity it spawns a ProducerController (PC-entity-1), which is used for delivery of messages to that specific entity.
Sharding creates the ShardingConsumerController for the first message to that entity, which spawns the application's entity Behavior.
A ConsumerController (CC-a) is also spawned and is working in pair with the ProducerController (PC-entity-1).
Message for another entity means that the ShardingProducerController will spawn another ProducerController (PC-2) for that entity.
There can be many producers, typically one per node. The ShardingConsumerController will spawn a new ConsumerController for each producer.
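On the consumer side of that flow, the wiring might look roughly like the sketch below. This is hedged: the `ShardingConsumerController` factory shape, the `EntityTypeKey` message type, and the `TodoList` stub are assumptions, not the PR's exact code.

```scala
// Hedged sketch of initializing Cluster Sharding with the ShardingConsumerController,
// which spawns the entity behavior and its ConsumerController pair per entity.
// Factory shapes, package names and the TodoList stub are assumptions.
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior }
import akka.actor.typed.delivery.ConsumerController
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.sharding.typed.delivery.ShardingConsumerController
import akka.cluster.sharding.typed.scaladsl.{ ClusterSharding, Entity, EntityTypeKey }

object TodoList {
  sealed trait Command
  // stand-in entity behavior; a real one would send ConsumerController.Start with a
  // Delivery adapter and confirm each delivered message
  def apply(id: String, consumerController: ActorRef[ConsumerController.Start[Command]]): Behavior[Command] =
    Behaviors.ignore
}

object TodoSharding {
  def init(system: ActorSystem[_]): Unit = {
    val typeKey =
      EntityTypeKey[ConsumerController.SequencedMessage[TodoList.Command]]("todo")
    ClusterSharding(system).init(Entity(typeKey) { entityContext =>
      // ShardingConsumerController spawns the entity and hands it the ref used for Start
      ShardingConsumerController[TodoList.Command, TodoList.Command](start =>
        TodoList(entityContext.entityId, start))
    })
  }
}
```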
TODO:
Refs #20984