Skip to content

Commit

Permalink
Fixed DDataShardCoordinator NullReferenceException (#6892)
Browse files Browse the repository at this point in the history
* fixed NRE bug with DDataShardCoordinator

* Revert "Bump Verify.Xunit from 20.8.0 to 20.8.1 (#6890)"

This reverts commit 0a66f94.

* Revert "Revert "Bump Verify.Xunit from 20.8.0 to 20.8.1 (#6890)""

This reverts commit f63b24a.
  • Loading branch information
Aaronontheweb committed Aug 24, 2023
1 parent 8b2ded4 commit 0c49fdb
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 25 deletions.
33 changes: 17 additions & 16 deletions src/contrib/cluster/Akka.Cluster.Sharding/DDataShardCoordinator.cs
Expand Up @@ -5,6 +5,7 @@
// </copyright>
//-----------------------------------------------------------------------

#nullable enable
using System;
using System.Collections.Immutable;
using System.Linq;
Expand Down Expand Up @@ -86,11 +87,11 @@ private RememberEntitiesLoadTimeout()
private readonly LWWRegisterKey<ShardCoordinator.CoordinatorState> _coordinatorStateKey;
private ImmutableHashSet<(IActorRef, GetShardHome)> _getShardHomeRequests = ImmutableHashSet<(IActorRef, GetShardHome)>.Empty;
private int _initialStateRetries = 0;
private readonly IActorRef _rememberEntitiesStore;
private readonly IActorRef? _rememberEntitiesStore;
private readonly bool _rememberEntities;

public ITimerScheduler Timers { get; set; }
public IStash Stash { get; set; }
public ITimerScheduler Timers { get; set; } = null!;
public IStash Stash { get; set; } = null!;

private string TypeName => _baseImpl.TypeName;
private ClusterShardingSettings Settings => _baseImpl.Settings;
Expand All @@ -103,7 +104,7 @@ private RememberEntitiesLoadTimeout()
IShardAllocationStrategy allocationStrategy,
IActorRef replicator,
int majorityMinCap,
IRememberEntitiesProvider rememberEntitiesStoreProvider)
IRememberEntitiesProvider? rememberEntitiesStoreProvider)
{
_replicator = replicator;
var log = Context.GetLogger();
Expand Down Expand Up @@ -162,7 +163,9 @@ protected override bool Receive(object message)
/// <returns></returns>
private Receive WaitingForInitialState(IImmutableSet<ShardId> rememberedShards)
{
bool Receive(object message)
return ReceiveDelegate;

bool ReceiveDelegate(object message)
{
switch (message)
{
Expand All @@ -181,7 +184,7 @@ bool Receive(object message)
case GetFailure m when m.Key.Equals(_coordinatorStateKey):
_initialStateRetries++;
var template =
"{0}: The ShardCoordinator was unable to get an initial state within 'waiting-for-state-timeout': {1} millis (retrying). Has ClusterSharding been started on all nodes?";
"{0}: The ShardCoordinator was unable to get an initial state within 'waiting-for-state-timeout': {1} millis (retrying). Has ClusterSharding been started on all nodes?";
if (_initialStateRetries == 1)
Log.Info(template, TypeName, _stateReadConsistency.Timeout.TotalMilliseconds);
else if (_initialStateRetries < 5)
Expand Down Expand Up @@ -230,8 +233,6 @@ bool Receive(object message)
}
return ReceiveTerminated(message);
}

return Receive;
}

private void OnInitialState(CoordinatorState loadedState, IImmutableSet<ShardId> rememberedShards)
Expand Down Expand Up @@ -321,13 +322,15 @@ private bool WaitingForStateInitialized(object message)
/// <returns></returns>
private Receive WaitingForUpdate<TEvent>(
TEvent evt,
ShardId shardId,
ShardId? shardId,
bool waitingForStateWrite,
bool waitingForRememberShard,
Action<TEvent> afterUpdateCallback)
where TEvent : IDomainEvent
{
bool Receive(object message)
return ReceiveDelegate;

bool ReceiveDelegate(object message)
{
switch (message)
{
Expand Down Expand Up @@ -382,7 +385,7 @@ bool Receive(object message)
m.ErrorMessage,
evt,
_terminating ? "Coordinator will be terminated due to Terminate message received"
: "Coordinator will be restarted");
: "Coordinator will be restarted");
if (_terminating)
{
Context.Stop(Self);
Expand All @@ -405,7 +408,7 @@ bool Receive(object message)
return true;

case RememberEntitiesCoordinatorStore.UpdateDone m:
if (!shardId.Contains(m.ShardId))
if (shardId != null && !shardId.Equals(m.ShardId))
{
Log.Warning("{0}: Saw remember entities update complete for shard id [{1}], while waiting for [{2}]",
TypeName,
Expand Down Expand Up @@ -438,7 +441,7 @@ bool Receive(object message)
return true;

case RememberEntitiesCoordinatorStore.UpdateFailed m:
if (shardId.Contains(m.ShardId))
if (shardId != null && shardId.Equals(m.ShardId))
{
OnRememberEntitiesUpdateFailed(m.ShardId);
}
Expand All @@ -452,7 +455,7 @@ bool Receive(object message)
return true;

case RememberEntitiesTimeout m:
if (shardId.Contains(m.ShardId))
if (shardId != null && shardId.Equals(m.ShardId))
{
OnRememberEntitiesUpdateFailed(m.ShardId);
}
Expand All @@ -479,8 +482,6 @@ bool Receive(object message)
return true;
}
}

return Receive;
}

private void UnbecomeAfterUpdate<TEvent>(TEvent evt, Action<TEvent> afterUpdateCallback) where TEvent : IDomainEvent
Expand Down
Expand Up @@ -5,6 +5,7 @@
// </copyright>
//-----------------------------------------------------------------------

#nullable enable
using System;
using System.Collections.Immutable;
using Akka.Actor;
Expand Down Expand Up @@ -77,7 +78,7 @@ protected override bool ReceiveRecover(object message)
switch (message)
{
case EventSourcedRememberEntitiesCoordinatorStore.MigrationMarker _:
case SnapshotOffer offer when offer.Snapshot is EventSourcedRememberEntitiesCoordinatorStore.State _:
case SnapshotOffer { Snapshot: EventSourcedRememberEntitiesCoordinatorStore.State }:
throw new IllegalStateException(
"state-store is set to persistence but a migration has taken place to remember-entities-store=eventsourced. You can not downgrade.");

Expand All @@ -87,10 +88,10 @@ protected override bool ReceiveRecover(object message)

switch (evt)
{
case ShardRegionRegistered _:
case ShardRegionRegistered:
State = State.Updated(evt);
return true;
case ShardRegionProxyRegistered _:
case ShardRegionProxyRegistered:
State = State.Updated(evt);
return true;
case ShardRegionTerminated regionTerminated:
Expand Down Expand Up @@ -125,7 +126,7 @@ protected override bool ReceiveRecover(object message)
return true;
}
return false;
case SnapshotOffer offer when offer.Snapshot is CoordinatorState state:
case SnapshotOffer { Snapshot: CoordinatorState state }:
if (VerboseDebug)
Log.Debug("{0}: receiveRecover SnapshotOffer {1}", TypeName, state);
State = state.WithRememberEntities(Settings.RememberEntities);
Expand All @@ -135,7 +136,7 @@ protected override bool ReceiveRecover(object message)
State = State.Copy(unallocatedShards: ImmutableHashSet<ShardId>.Empty);
return true;

case RecoveryCompleted _:
case RecoveryCompleted:
State = State.WithRememberEntities(Settings.RememberEntities);
_baseImpl.WatchStateActors();
return true;
Expand All @@ -158,12 +159,12 @@ private bool WaitingForStateInitialized(object message)
{
switch (message)
{
case Terminate _:
case Terminate:
Log.Debug("{0}: Received termination message before state was initialized", TypeName);
Context.Stop(Self);
return true;

case StateInitialized _:
case StateInitialized:
_baseImpl.ReceiveStateInitialized();
Log.Debug("{0}: Coordinator initialization completed", TypeName);
Context.Become(msg => _baseImpl.Active(msg) || ReceiveSnapshotResult(msg));
Expand All @@ -175,8 +176,7 @@ private bool WaitingForStateInitialized(object message)
return true;
}

if (_baseImpl.ReceiveTerminated(message)) return true;
else return ReceiveSnapshotResult(message);
return _baseImpl.ReceiveTerminated(message) || ReceiveSnapshotResult(message);
}

private bool ReceiveSnapshotResult(object message)
Expand Down

0 comments on commit 0c49fdb

Please sign in to comment.