=cls #17846 Use CRDTs instead of PersistentActor to remember the state of the ShardCoordinator #17871
Conversation
Can one of the repo owners verify this patch?
@patriknw I have some questions about cluster sharding. I need to understand the "underlying idea". For now, only two questions.
Hi @SmLin. There are two things that are persisted.
Let's try to figure out the coordinator state first. It is possible to send a message to the replicator in postStop, but crashes must also be handled, and if the node is removed from the cluster the replicator might not be alive any more.
@patriknw The PR is not ready for review yet, so don't waste your time on it. I will notify you when it is ready.
@SmLin My thinking around the design: I think the coordinator should remain a singleton, because it is important to have a central decision maker for where the shards are to be allocated.

We could start with just saving the coordinator state in a LWWRegister, using WriteMajority and ReadMajority. The LWWRegister is pretty safe here since the singleton failover is not instant, but we can later create a more customized data type that makes use of akka.cluster.Member.isOlderThan; we want to use the data from the newest singleton. That can be rather useful for singletons in general. That has the small consistency problems that I sketched in the issue, but perhaps I'm looking too much for the bad scenarios, and perhaps we can make it more safe somehow. The state of the …

I will be on vacation for the next 4 weeks, so if I'm unresponsive here that is not because I'm not interested. Thanks for working on this.
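A minimal sketch of what that first step could look like, assuming a placeholder State type and a made-up key name (the real state type and recovery flow are what this PR defines):

import scala.concurrent.duration._
import akka.actor.Actor
import akka.cluster.Cluster
import akka.cluster.ddata.{ DistributedData, LWWRegister, LWWRegisterKey }
import akka.cluster.ddata.Replicator.{ Get, ReadMajority, Update, WriteMajority }

// Placeholder for the coordinator's state; the real type is defined by cluster sharding.
final case class State(shards: Map[String, String])

class CoordinatorStateSketch extends Actor {
  implicit val cluster: Cluster = Cluster(context.system)
  val replicator = DistributedData(context.system).replicator
  val CoordinatorStateKey = LWWRegisterKey[State]("ShardCoordinatorState") // key name assumed

  // Save the full coordinator state as a last-writer-wins register,
  // acknowledged by a majority of the nodes.
  def saveState(state: State): Unit =
    replicator ! Update(CoordinatorStateKey, LWWRegister(state),
      WriteMajority(5.seconds))(_.withValue(state))

  // On singleton failover, recover the state with a majority read.
  def loadState(): Unit =
    replicator ! Get(CoordinatorStateKey, ReadMajority(5.seconds))

  def receive = Actor.emptyBehavior
}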
@patriknw Thank you too.
case g @ GetSuccess(RegionsKey, _) =>
  val regions: Map[ActorRef, Set[(ShardId, ActorRef)]] = g.get(RegionsKey).entries
  regions.foreach { case (region, shards) =>
    // TODO: Would the Coordinator be notified by the `Terminated` message if the region was terminated before `watch`?
yes, it will
import akka.cluster.ddata.Replicator.Update
import akka.cluster.ddata.Replicator.UpdateSuccess
import akka.cluster.ddata.Replicator.UpdateTimeout
import akka.cluster.ddata.Replicator.WriteMajority
please use a wildcard import for these
Thanks a lot for this idea and PR, @SmLin! There are some aspects that I cannot currently think through in full, revolving around the semantics of shard allocation. Currently you send back the result immediately, before knowing that the CRDT update has been disseminated. The old implementation makes sure that the allocation has been persisted before it can be handed out and acted upon, which ensures that two different regions cannot create the same shard when the coordinator's machine fails and a new coordinator makes a different decision.

The other point is related: the old implementation pulled shard location information lazily into the regions when needed, but CRDT replication would allow us to just listen to the same data at all regions, saving roundtrips to the coordinator.

One thing I really like about this approach is that the CRDT's lifetime is coupled to the lifetime of the cluster itself. Persistence means that allocations can outlive the whole cluster, but when starting from scratch we may as well just reallocate things instead. The only missing feature then is that instantiated entities will not automatically be resurrected, but my thinking goes in the direction of making that an optional add-on, since not everybody needs that, and then we would have completely separated the sharding from the persistence concerns.
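A sketch of the deferred-reply alternative described above; every name here is illustrative, not the actual coordinator protocol:

import akka.actor.{ Actor, ActorRef, Stash }
import akka.cluster.ddata.Replicator.UpdateSuccess

// Hold the allocation answer until the replicator has acknowledged the CRDT
// update, mirroring what PersistentActor's persist() callback guarantees today.
class AllocationSketch extends Actor with Stash {
  case class GetShardHome(shard: String)
  case class ShardHome(shard: String, region: ActorRef)

  def receive = idle

  def idle: Receive = {
    case GetShardHome(shard) =>
      val region = decideAllocation(shard) // pick a region for the shard
      sendUpdate(shard, region)            // Update(...) to the replicator
      context.become(waitingForAck(shard, region, sender()))
  }

  def waitingForAck(shard: String, region: ActorRef, replyTo: ActorRef): Receive = {
    case UpdateSuccess(_, _) =>
      replyTo ! ShardHome(shard, region)   // only now hand out the decision
      unstashAll()
      context.become(idle)
    case _ => stash()                      // defer everything else meanwhile
  }

  // stand-ins for the real coordinator internals
  def decideAllocation(shard: String): ActorRef = ???
  def sendUpdate(shard: String, region: ActorRef): Unit = ???
}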
@rkuhn I have two ways.
Yes, I agree that the first proposal is the way to go; optimizing the use of the CRDTs can and should be a second step. Please create a ticket so that we have it properly on the radar.
# Settings for the coordinator singleton. Same layout as akka.cluster.singleton.

# Timeout of waiting for the initial distributed state (the initial state will be queried again if the timeout hits)
waiting-for-state-timeout = 30 s
a more reasonable default is 5 s
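For reference, overriding it in application.conf might then look like this (assuming the setting sits under akka.cluster.sharding with the rest of the sharding configuration):

akka.cluster.sharding {
  # query the initial state again if it has not arrived within this timeout
  waiting-for-state-timeout = 5 s
}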
@SmLin I took a first look. I think it is in the right direction. I appreciate the step-wise approach you take on this. Great work! To set the expectations right, we will probably have to support both this and the persistent sharding, and we need to discuss how to package those. Anyway, don't worry about that now. Please continue in this PR, and separate the different steps with separate commits.
@rkuhn @patriknw I am stuck on the consistency requirements for the case of duplicated shards. To avoid such duplication, the coordinator must either propagate state changes to the whole cluster (WriteAll in the case of CRDTs) or synchronize its state with all regions before moving to the active state. But in the second step I will change the source of distributed state changes from the coordinator to the regions and shards, so after the second step duplication will be avoided automatically "by design".
I have to point out one case where avoiding shard duplication is impossible: if the cluster becomes fragmented (due to a broken link between two datacenters, or some other cause), the situation ends up with two identical coordinators, one in each cluster fragment.
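Under the WriteAll alternative, the only change to the earlier sketch would be the write consistency level; a hedged example, reusing the hypothetical names from the sketch above:

import scala.concurrent.duration._
import akka.cluster.ddata.LWWRegister
import akka.cluster.ddata.Replicator.{ Update, WriteAll }

// Demand acknowledgement from every node before the update counts as
// successful; this is what ruling out duplicate shards would cost in latency.
def saveStateEverywhere(state: State): Unit =
  replicator ! Update(CoordinatorStateKey, LWWRegister(state),
    WriteAll(10.seconds))(_.withValue(state))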
"The ShardCoordinator was unable to update a distributed state with error {}. Was retrying with event {}.", | ||
error, | ||
evt) | ||
sendUpdate(evt) |
ModifyFailure only happens if your modify function in the Update message throws an exception, and that should never happen unless you have made a programming error. No point in trying it again; it will probably fail again. Instead you should just re-throw the cause.
Could it happen due to message/update reordering? And would it be fixed after some time?
I don't think so. A typical error would be something like NullPointerException, i.e. it should never happen unless we have a bug somewhere, and then we don't want to continue anyway.
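Following that advice, the response handling could look like this sketch (evt and sendUpdate are the PR's own names; DomainEvent, CoordinatorStateKey and handleUpdateSuccess are assumed):

import akka.cluster.ddata.Replicator.{ ModifyFailure, UpdateSuccess, UpdateTimeout }

def waitingForUpdate(evt: DomainEvent): Receive = {
  case UpdateSuccess(CoordinatorStateKey, _) =>
    handleUpdateSuccess(evt) // the write reached a majority; act on the event
  case UpdateTimeout(CoordinatorStateKey, _) =>
    sendUpdate(evt)          // transient: the majority write can simply be retried
  case ModifyFailure(CoordinatorStateKey, _, cause, _) =>
    throw cause              // programming error in the modify function; do not retry
}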
Do I understand correctly that this means the issue must not be solved in this PR?
yes
@patriknw It turns out that the ddata implementation (without straightforward message exchanges between coordinator, regions and shards) is tremendously slow, since gossip dissemination has a built-in delay. So I see two variants:
The probability of shard duplication, even at moments of cluster failure, seems to be very small. I don't know exactly, but it is possible that such a failure could only happen if … Currently I think this is a complete solution. Any better solution would require an absolutely different idea and implementation. Sorry, I have no time to do such an implementation.
@@ -155,6 +155,12 @@ object ShardRegion {
  @SerialVersionUID(1L) final case object GracefulShutdown extends ShardRegionCommand

  /**
   * We must be sure that a shard is initialized before we start sending messages to it.
what is the reason for the changes in this file?
As mentioned elsewhere: sometimes a persistent shard can fail during the replay process due to temporary journal unavailability. User messages sent to such a shard just after its creation, but before the failure, would be lost. This code removes that problem.
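A minimal sketch of that buffering idea (all names here are illustrative, not the actual ShardRegion internals):

import akka.actor.{ Actor, ActorRef }

// Buffer messages per shard until the shard actor has signalled that its
// recovery has completed, then flush the buffered messages in order.
class BufferingSketch extends Actor {
  case class ShardInitialized(shardId: String) // sent by the shard when ready

  var shards = Map.empty[String, ActorRef]                 // initialized shards
  var buffers = Map.empty[String, Vector[(Any, ActorRef)]] // shards still starting

  def receive = {
    case ShardInitialized(id) =>
      shards += id -> sender()
      buffers.getOrElse(id, Vector.empty).foreach { case (m, s) => sender().tell(m, s) }
      buffers -= id
    case msg =>
      val id = extractShardId(msg)
      shards.get(id) match {
        case Some(shard) => shard.tell(msg, sender()) // initialized: forward directly
        case None        => buffers += id -> (buffers.getOrElse(id, Vector.empty) :+ (msg -> sender()))
      }
  }

  def extractShardId(msg: Any): String = ??? // stand-in for the user-provided function
}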
@SmLin I agree that the solution in this PR is good, and it is a great advantage that it is so similar to the current persistent implementation, since we need to support both approaches in 2.4 (as described in my comment above). Would you be able to wrap up that part, or would you like me to make those adjustments?
I will try to do it by myself. Thank you for the review.
Thanks!
To clarify my idea: we should make it possible to switch between the two implementations with a config setting. The dependency on distributed-data should be in provided scope, so the user must define the dependency and switch the config setting to use this implementation. Does that make sense?
Yes, it makes sense. Perhaps the same could now also be done for the akka-persistence dependency.
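A sketch of what that switch could look like in application.conf, assuming a setting named state-store-mode under akka.cluster.sharding (the exact key name here is an assumption):

akka.cluster.sharding {
  # "persistence" (default) or "ddata"
  state-store-mode = "ddata"
}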
@patriknw I wrote two implementations of the coordinator, selected by a config parameter. I duplicated the tests for the ddata coordinator; currently I don't know how to make that more elegant. Perhaps you could help me.
  context.watch(region)
- sender() ! RegisterAck(self)
+ region ! RegisterAck(self)
sender() and region are the same, right? But now we use region here and sender() a few lines above.
@SmLin Excellent separation! I have a few comments, but it looks like this can be merged soon. We are releasing 2.4-RC1 on Friday, so it would be great if we can merge it on Thursday at the latest. Could you also add a section in the docs about the alternative ddata mode:
Regarding the tests, I think you could split them up into an abstract class and two implementations. The only difference should be the config (and some initial creation of actors). See an example of a similar split:
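(The linked example is elided above.) Independently of that link, the suggested split could look like this sketch; AkkaSpec stands in for whatever in-tree test base class the sharding tests already use, and the config key is assumed as before:

import com.typesafe.config.{ Config, ConfigFactory }

// Shared test cases written once, parameterized by the state-store config.
abstract class ShardCoordinatorSpec(config: Config) extends AkkaSpec(config) {
  "The ShardCoordinator" must {
    "allocate shards to regions" in {
      // common assertions that hold for either coordinator implementation
    }
  }
}

class PersistentShardCoordinatorSpec extends ShardCoordinatorSpec(
  ConfigFactory.parseString("akka.cluster.sharding.state-store-mode = persistence"))

class DDataShardCoordinatorSpec extends ShardCoordinatorSpec(
  ConfigFactory.parseString("akka.cluster.sharding.state-store-mode = ddata"))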
case evt: ClusterDomainEvent ⇒ receiveClusterEvent(evt)
case state: CurrentClusterState ⇒ receiveClusterState(state)
case msg: CoordinatorMessage ⇒ receiveCoordinatorMessage(msg)
case cmd: ShardRegionCommand ⇒ receiveCommand(cmd)
case msg if extractEntityId.isDefinedAt(msg) ⇒ deliverMessage(msg, sender())
case msg: RestartShard ⇒ deliverMessage(msg, sender())
The RestartShard message handling has not existed before; it is a bug fix.
great catch, it's scheduled by receiveTerminated
LGTM. I will add a few notes about experimental status and such in the docs, but I can take care of that.
@patriknw Squashed.
thanks!
Refs #17846
Currently this is not a real PR; it is only to show my attempt to change the cluster-sharding dependency from akka-persistence to akka-ddata. I will update it incrementally when I find time for this.
Idea:
Comments are welcome!
It is very possible that I am missing something in this concept. Please correct me.
And I am sorry for my English.