
Backport of #5967 - Allow PersistentShardCoordinator to tolerate duplicate ShardHomeAllocated messages #5970

Closed
@@ -88,14 +88,14 @@ protected override bool ReceiveRecover(object message)
                 switch (evt)
                 {
                     case ShardRegionRegistered _:
-                        State = State.Updated(evt);
+                        State = State.Updated(evt, true);
                         return true;
                     case ShardRegionProxyRegistered _:
-                        State = State.Updated(evt);
+                        State = State.Updated(evt, true);
                         return true;
                     case ShardRegionTerminated regionTerminated:
                         if (State.Regions.ContainsKey(regionTerminated.Region))
-                            State = State.Updated(evt);
+                            State = State.Updated(evt, true);
                         else
                             //Log.Debug(
                             //    "{0}: ShardRegionTerminated, but region {1} was not registered. This inconsistency is due to that " +
@@ -107,13 +107,13 @@ protected override bool ReceiveRecover(object message)
                         return true;
                     case ShardRegionProxyTerminated proxyTerminated:
                         if (State.RegionProxies.Contains(proxyTerminated.RegionProxy))
-                            State = State.Updated(evt);
+                            State = State.Updated(evt, true);
                         return true;
                     case ShardHomeAllocated _:
-                        State = State.Updated(evt);
+                        State = State.Updated(evt, true);
                         return true;
                     case ShardHomeDeallocated _:
-                        State = State.Updated(evt);
+                        State = State.Updated(evt, true);
                         return true;
                     case ShardCoordinatorInitialized _:
                         // not used here
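The diff above passes `true` for the new `isRecovering` flag at every call site inside `ReceiveRecover`, so every event replayed from the journal is applied leniently. A minimal Python sketch of that idea (hypothetical names; this is not Akka.NET code, just a model of the replay behavior):

```python
# Hypothetical model of the ReceiveRecover change: events replayed from the
# journal are applied with a recovery flag, so duplicate records are
# tolerated instead of crashing the coordinator.

def apply_event(allocations, event, is_recovering):
    """allocations: dict shard -> region; event: ('allocated', shard, region)."""
    kind, shard, region = event
    if kind == "allocated" and shard in allocations and not is_recovering:
        raise ValueError(f"Shard {shard} already allocated")
    # during recovery a duplicate simply overwrites the previous entry
    return {**allocations, shard: region}

def receive_recover(journal):
    state = {}
    for event in journal:
        # mirrors State.Updated(evt, true) in the diff: always lenient on replay
        state = apply_event(state, event, True)
    return state
```

With this shape, a journal containing a duplicate allocation for the same shard replays cleanly, whereas applying the same duplicate outside recovery still raises.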
34 changes: 28 additions & 6 deletions src/contrib/cluster/Akka.Cluster.Sharding/ShardCoordinator.cs
@@ -1144,12 +1144,15 @@ public override string ToString()
         }

         /// <summary>
-        /// TBD
+        /// Feed an event into the ShardCoordinator state.
         /// </summary>
-        /// <param name="e">TBD</param>
-        /// <exception cref="ArgumentException">TBD</exception>
-        /// <returns>TBD</returns>
-        public CoordinatorState Updated(IDomainEvent e)
+        /// <param name="e">The event to process.</param>
+        /// <param name="isRecovering">A flag indicating that we're recovering state previously stored in the database.
+        /// We need to be more tolerant when this happens, in the name of accelerating recovery, so the system doesn't compromise
+        /// itself and go offline.</param>
+        /// <exception cref="ArgumentException">Thrown if an event is illegal in the current state.</exception>
+        /// <returns>An updated copy of this state.</returns>
+        public CoordinatorState Updated(IDomainEvent e, bool isRecovering = false)
         {
             switch (e)
             {
@@ -1188,7 +1191,26 @@ public CoordinatorState Updated(IDomainEvent e)
                 if (!Regions.TryGetValue(message.Region, out var shardRegions))
                     throw new ArgumentException($"Region {message.Region} not registered: {this}", nameof(e));
                 if (Shards.ContainsKey(message.Shard))
-                    throw new ArgumentException($"Shard {message.Shard} already allocated: {this}", nameof(e));
+                {
+                    if (!isRecovering)
+                        throw new ArgumentException($"Shard {message.Shard} already allocated: {this}",
+                            nameof(e));
+
+                    // per https://github.com/akkadotnet/akka.net/issues/5604
+                    // we're going to allow the new value to overwrite the previous one
+                    var newRegions = Regions;
Review comment (Member):

A couple of questions: 1) Is this safe at all times? 2) Will there be a way to disable this new behavior?

Reply (Member):

  1. Not sure - the alternative is keeping the current behavior, which is that the ShardCoordinator blows up, the cluster needs to be restarted, and all of the ShardCoordinator data has to be deleted. Given that, I think this is probably safer than the current defaults. The worst case scenario I can imagine here is that the Shard was actually allocated to two different nodes, but if that were the case then the sharding system would already be in very bad shape (i.e. violating its consistency rules), and this change would allow the newest home for the shard to supersede the old one, which would stop receiving message traffic. In the logs where I saw this occurring there were duplicate records for the same Shard in the same ShardRegion IActorRef - so this change would just make the recovery idempotent.

  2. We could add it, but I don't think it should be necessary.
+                    var previousRegion = Shards[message.Shard];
+                    if (Regions.TryGetValue(previousRegion, out var previousShards))
+                    {
+                        newRegions = newRegions.SetItem(previousRegion,
+                            previousShards.Remove(message.Shard));
+                    }
+                    var newUnallocatedShardsRecovery = RememberEntities ? UnallocatedShards.Remove(message.Shard) : UnallocatedShards;
+                    return Copy(
+                        shards: Shards.SetItem(message.Shard, message.Region),
+                        regions: newRegions.SetItem(message.Region, shardRegions.Add(message.Shard)),
+                        unallocatedShards: newUnallocatedShardsRecovery);
+                }
+
                 var newUnallocatedShards = RememberEntities ? UnallocatedShards.Remove(message.Shard) : UnallocatedShards;
                 return Copy(
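The tolerant branch above removes the shard from its previous region's set before assigning it to the new region, which is what makes replaying duplicate ShardHomeAllocated records idempotent. A small Python model of that logic (hypothetical names; the real implementation is the C# diff above, which uses immutable collections):

```python
# Hypothetical model of the tolerant ShardHomeAllocated handling: when
# recovering and the shard is already allocated, the new allocation
# supersedes the old one instead of throwing.

def updated(shards, regions, shard, region, is_recovering=False):
    """shards: dict shard -> region; regions: dict region -> set of shards."""
    if region not in regions:
        raise ValueError(f"Region {region} not registered")
    new_regions = {r: set(s) for r, s in regions.items()}
    if shard in shards:
        if not is_recovering:
            raise ValueError(f"Shard {shard} already allocated")
        # remove the shard from its previous home before reassigning it
        new_regions[shards[shard]].discard(shard)
    new_regions[region].add(shard)
    return {**shards, shard: region}, new_regions

# a duplicate allocation seen during recovery is now a no-op rather than a crash:
shards, regions = updated({}, {"rA": set(), "rB": set()}, "s1", "rA")
shards, regions = updated(shards, regions, "s1", "rA", is_recovering=True)
```

Applying the same event twice during recovery yields the same state, and a record naming a different region moves the shard to its newest home, matching the reasoning in the review reply above.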
Expand Down