Skip to content

Commit

Permalink
Port of #5999: Add Cluster.Sharding ActorInitializationException spec (
Browse files Browse the repository at this point in the history
…#6001)

* Add Cluster.Sharding ActorInitializationException spec

* Update API Verify list
  • Loading branch information
Arkatufus committed Jun 15, 2022
1 parent 52dfcc1 commit e3be344
Show file tree
Hide file tree
Showing 5 changed files with 322 additions and 0 deletions.
@@ -0,0 +1,169 @@
// //-----------------------------------------------------------------------
// // <copyright file="ShardEntityFailureSpec.cs" company="Akka.NET Project">
// // Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// // Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// // </copyright>
// //-----------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Cluster.Sharding.Internal;
using Akka.Cluster.Tools.Singleton;
using Akka.Configuration;
using Akka.Event;
using Akka.TestKit;
using Akka.Util;
using FluentAssertions;
using FluentAssertions.Extensions;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Cluster.Sharding.Tests
{
public class ShardEntityFailureSpec: AkkaSpec
{
private static readonly Config Config = ConfigurationFactory.ParseString(@"
akka.loglevel = DEBUG
akka.actor.provider = cluster
akka.persistence.journal.plugin = ""akka.persistence.journal.inmem""
akka.remote.dot-netty.tcp.port = 0")
.WithFallback(ClusterSingletonManager.DefaultConfig())
.WithFallback(ClusterSharding.DefaultConfig());

private sealed class EntityEnvelope
{
public readonly long Id;
public readonly object Payload;
public EntityEnvelope(long id, object payload)
{
Id = id;
Payload = payload;
}
}

public ShardEntityFailureSpec(ITestOutputHelper helper) : base(Config, helper)
{
}

private class ConstructorFailActor : ActorBase
{
private static bool _thrown;
private readonly ILoggingAdapter _log = Context.GetLogger();

public ConstructorFailActor()
{
if (!_thrown)
{
_thrown = true;
throw new Exception("EXPLODING CONSTRUCTOR!");
}
}

protected override bool Receive(object message)
{
_log.Info("Msg {0}", message);
Sender.Tell($"ack {message}");
return true;
}
}

private class PreStartFailActor : ActorBase
{
private static bool _thrown;
private readonly ILoggingAdapter _log = Context.GetLogger();

protected override void PreStart()
{
base.PreStart();
if (!_thrown)
{
_thrown = true;
throw new Exception("EXPLODING PRE-START!");
}
}

protected override bool Receive(object message)
{
_log.Info("Msg {0}", message);
Sender.Tell($"ack {message}");
return true;
}
}

[Theory(DisplayName = "Persistent shard must recover from transient failures inside sharding entity constructor and PreStart method")]
[MemberData(nameof(PropsFactory))]
public async Task Persistent_Shard_must_recover_from_failing_entity(Props entityProp)
{
ExtractEntityId extractEntityId = message =>
{
switch (message)
{
case EntityEnvelope env:
return (env.Id.ToString(), env.Payload);
}
return Option<(string, object)>.None;
};

ExtractShardId extractShardId = message =>
{
switch (message)
{
case EntityEnvelope msg:
return msg.Id.ToString();
}
return null;
};

var settings = ClusterShardingSettings.Create(Sys);
settings = settings.WithTuningParameters(settings.TuningParameters.WithEntityRestartBackoff(1.Seconds()));
var provider = new EventSourcedRememberEntitiesProvider("cats", settings);

var props = Props.Create(() => new Shard(
"cats",
"shard-1",
_ => entityProp,
settings,
extractEntityId,
extractShardId,
PoisonPill.Instance,
provider
));

Sys.EventStream.Subscribe<Error>(TestActor);

var persistentShard = Sys.ActorOf(props);

persistentShard.Tell(new EntityEnvelope(1, "Start"));

// entity died here
var err = ExpectMsg<Error>();
err.Cause.Should().BeOfType<ActorInitializationException>();

// Need to wait for the internal state to reset, else everything we sent will go to dead letter
await AwaitConditionAsync(() =>
{
persistentShard.Tell(Shard.GetCurrentShardState.Instance);
var failedState = ExpectMsg<Shard.CurrentShardState>();
return failedState.EntityIds.Count == 0;
});

// entity should be restarted when it received this message
persistentShard.Tell(new EntityEnvelope(1, "Restarted"));
ExpectMsg("ack Restarted");

persistentShard.Tell(Shard.GetCurrentShardState.Instance);
var state = ExpectMsg<Shard.CurrentShardState>();
state.EntityIds.Count.Should().Be(1);
state.EntityIds.First().Should().Be("1");
}

public static IEnumerable<object[]> PropsFactory()
{
yield return new object[] { Props.Create(() => new PreStartFailActor()) };
yield return new object[] { Props.Create(() => new ConstructorFailActor()) };
}
}
}
Expand Up @@ -164,6 +164,96 @@ public class TuningParameters
LeastShardAllocationAbsoluteLimit = leastShardAllocationAbsoluteLimit;
LeastShardAllocationRelativeLimit = leastShardAllocationRelativeLimit;
}

public TuningParameters WithCoordinatorFailureBackoff(TimeSpan coordinatorFailureBackoff)
=> Copy(coordinatorFailureBackoff: coordinatorFailureBackoff);
public TuningParameters WithRetryInterval(TimeSpan retryInterval)
=> Copy(retryInterval: retryInterval);
public TuningParameters WithBufferSize(int bufferSize)
=> Copy(bufferSize: bufferSize);
public TuningParameters WithHandOffTimeout(TimeSpan handOffTimeout)
=> Copy(handOffTimeout: handOffTimeout);
public TuningParameters WithShardStartTimeout(TimeSpan shardStartTimeout)
=> Copy(shardStartTimeout: shardStartTimeout);
public TuningParameters WithShardFailureBackoff(TimeSpan shardFailureBackoff)
=> Copy(shardFailureBackoff: shardFailureBackoff);
public TuningParameters WithEntityRestartBackoff(TimeSpan entityRestartBackoff)
=> Copy(entityRestartBackoff: entityRestartBackoff);
public TuningParameters WithRebalanceInterval(TimeSpan rebalanceInterval)
=> Copy(rebalanceInterval: rebalanceInterval);
public TuningParameters WithSnapshotAfter(int snapshotAfter)
=> Copy(snapshotAfter: snapshotAfter);
public TuningParameters WithKeepNrOfBatches(int keepNrOfBatches)
=> Copy(keepNrOfBatches: keepNrOfBatches);
public TuningParameters WithLeastShardAllocationRebalanceThreshold(int leastShardAllocationRebalanceThreshold)
=> Copy(leastShardAllocationRebalanceThreshold: leastShardAllocationRebalanceThreshold);
public TuningParameters WithLeastShardAllocationMaxSimultaneousRebalance(int leastShardAllocationMaxSimultaneousRebalance)
=> Copy(leastShardAllocationMaxSimultaneousRebalance: leastShardAllocationMaxSimultaneousRebalance);
public TuningParameters WithWaitingForStateTimeout(TimeSpan waitingForStateTimeout)
=> Copy(waitingForStateTimeout: waitingForStateTimeout);
public TuningParameters WithUpdatingStateTimeout(TimeSpan updatingStateTimeout)
=> Copy(updatingStateTimeout: updatingStateTimeout);
public TuningParameters WithEntityRecoveryStrategy(string entityRecoveryStrategy)
=> Copy(entityRecoveryStrategy: entityRecoveryStrategy);
public TuningParameters WithEntityRecoveryConstantRateStrategyFrequency(TimeSpan entityRecoveryConstantRateStrategyFrequency)
=> Copy(entityRecoveryConstantRateStrategyFrequency: entityRecoveryConstantRateStrategyFrequency);
public TuningParameters WithEntityRecoveryConstantRateStrategyNumberOfEntities(int entityRecoveryConstantRateStrategyNumberOfEntities)
=> Copy(entityRecoveryConstantRateStrategyNumberOfEntities: entityRecoveryConstantRateStrategyNumberOfEntities);
public TuningParameters WithCoordinatorStateWriteMajorityPlus(int coordinatorStateWriteMajorityPlus)
=> Copy(coordinatorStateWriteMajorityPlus: coordinatorStateWriteMajorityPlus);
public TuningParameters WithCoordinatorStateReadMajorityPlus(int coordinatorStateReadMajorityPlus)
=> Copy(coordinatorStateReadMajorityPlus: coordinatorStateReadMajorityPlus);
public TuningParameters WithLeastShardAllocationAbsoluteLimit(int leastShardAllocationAbsoluteLimit)
=> Copy(leastShardAllocationAbsoluteLimit: leastShardAllocationAbsoluteLimit);
public TuningParameters WithLeastShardAllocationRelativeLimit(double leastShardAllocationRelativeLimit)
=> Copy(leastShardAllocationRelativeLimit: leastShardAllocationRelativeLimit);

private TuningParameters Copy(
TimeSpan? coordinatorFailureBackoff = null,
TimeSpan? retryInterval = null,
int? bufferSize = null,
TimeSpan? handOffTimeout = null,
TimeSpan? shardStartTimeout = null,
TimeSpan? shardFailureBackoff = null,
TimeSpan? entityRestartBackoff = null,
TimeSpan? rebalanceInterval = null,
int? snapshotAfter = null,
int? keepNrOfBatches = null,
int? leastShardAllocationRebalanceThreshold = null,
int? leastShardAllocationMaxSimultaneousRebalance = null,
TimeSpan? waitingForStateTimeout = null,
TimeSpan? updatingStateTimeout = null,
string entityRecoveryStrategy = null,
TimeSpan? entityRecoveryConstantRateStrategyFrequency = null,
int? entityRecoveryConstantRateStrategyNumberOfEntities = null,
int? coordinatorStateWriteMajorityPlus = null,
int? coordinatorStateReadMajorityPlus = null,
int? leastShardAllocationAbsoluteLimit = null,
double? leastShardAllocationRelativeLimit = null)
=> new TuningParameters(
coordinatorFailureBackoff: coordinatorFailureBackoff ?? CoordinatorFailureBackoff,
retryInterval: retryInterval ?? RetryInterval,
bufferSize: bufferSize ?? BufferSize,
handOffTimeout: handOffTimeout ?? HandOffTimeout,
shardStartTimeout: shardStartTimeout ?? ShardStartTimeout,
shardFailureBackoff: shardFailureBackoff ?? ShardFailureBackoff,
entityRestartBackoff: entityRestartBackoff ?? EntityRestartBackoff,
rebalanceInterval: rebalanceInterval ?? RebalanceInterval,
snapshotAfter: snapshotAfter ?? SnapshotAfter,
keepNrOfBatches: keepNrOfBatches ?? KeepNrOfBatches,
leastShardAllocationRebalanceThreshold: leastShardAllocationRebalanceThreshold ?? LeastShardAllocationRebalanceThreshold,
leastShardAllocationMaxSimultaneousRebalance: leastShardAllocationMaxSimultaneousRebalance ?? LeastShardAllocationMaxSimultaneousRebalance,
waitingForStateTimeout: waitingForStateTimeout ?? WaitingForStateTimeout,
updatingStateTimeout: updatingStateTimeout ?? UpdatingStateTimeout,
entityRecoveryStrategy: entityRecoveryStrategy ?? EntityRecoveryStrategy,
entityRecoveryConstantRateStrategyFrequency: entityRecoveryConstantRateStrategyFrequency ?? EntityRecoveryConstantRateStrategyFrequency,
entityRecoveryConstantRateStrategyNumberOfEntities:
entityRecoveryConstantRateStrategyNumberOfEntities ?? EntityRecoveryConstantRateStrategyNumberOfEntities,
coordinatorStateWriteMajorityPlus: coordinatorStateWriteMajorityPlus ?? CoordinatorStateWriteMajorityPlus,
coordinatorStateReadMajorityPlus: coordinatorStateReadMajorityPlus ?? CoordinatorStateReadMajorityPlus,
leastShardAllocationAbsoluteLimit: leastShardAllocationAbsoluteLimit ?? LeastShardAllocationAbsoluteLimit,
leastShardAllocationRelativeLimit: leastShardAllocationRelativeLimit ?? LeastShardAllocationRelativeLimit
);
}

public enum StateStoreMode
Expand Down
Expand Up @@ -309,6 +309,27 @@ namespace Akka.Cluster.Sharding
int coordinatorStateReadMajorityPlus,
int leastShardAllocationAbsoluteLimit,
double leastShardAllocationRelativeLimit) { }
public Akka.Cluster.Sharding.TuningParameters WithBufferSize(int bufferSize) { }
public Akka.Cluster.Sharding.TuningParameters WithCoordinatorFailureBackoff(System.TimeSpan coordinatorFailureBackoff) { }
public Akka.Cluster.Sharding.TuningParameters WithCoordinatorStateReadMajorityPlus(int coordinatorStateReadMajorityPlus) { }
public Akka.Cluster.Sharding.TuningParameters WithCoordinatorStateWriteMajorityPlus(int coordinatorStateWriteMajorityPlus) { }
public Akka.Cluster.Sharding.TuningParameters WithEntityRecoveryConstantRateStrategyFrequency(System.TimeSpan entityRecoveryConstantRateStrategyFrequency) { }
public Akka.Cluster.Sharding.TuningParameters WithEntityRecoveryConstantRateStrategyNumberOfEntities(int entityRecoveryConstantRateStrategyNumberOfEntities) { }
public Akka.Cluster.Sharding.TuningParameters WithEntityRecoveryStrategy(string entityRecoveryStrategy) { }
public Akka.Cluster.Sharding.TuningParameters WithEntityRestartBackoff(System.TimeSpan entityRestartBackoff) { }
public Akka.Cluster.Sharding.TuningParameters WithHandOffTimeout(System.TimeSpan handOffTimeout) { }
public Akka.Cluster.Sharding.TuningParameters WithKeepNrOfBatches(int keepNrOfBatches) { }
public Akka.Cluster.Sharding.TuningParameters WithLeastShardAllocationAbsoluteLimit(int leastShardAllocationAbsoluteLimit) { }
public Akka.Cluster.Sharding.TuningParameters WithLeastShardAllocationMaxSimultaneousRebalance(int leastShardAllocationMaxSimultaneousRebalance) { }
public Akka.Cluster.Sharding.TuningParameters WithLeastShardAllocationRebalanceThreshold(int leastShardAllocationRebalanceThreshold) { }
public Akka.Cluster.Sharding.TuningParameters WithLeastShardAllocationRelativeLimit(double leastShardAllocationRelativeLimit) { }
public Akka.Cluster.Sharding.TuningParameters WithRebalanceInterval(System.TimeSpan rebalanceInterval) { }
public Akka.Cluster.Sharding.TuningParameters WithRetryInterval(System.TimeSpan retryInterval) { }
public Akka.Cluster.Sharding.TuningParameters WithShardFailureBackoff(System.TimeSpan shardFailureBackoff) { }
public Akka.Cluster.Sharding.TuningParameters WithShardStartTimeout(System.TimeSpan shardStartTimeout) { }
public Akka.Cluster.Sharding.TuningParameters WithSnapshotAfter(int snapshotAfter) { }
public Akka.Cluster.Sharding.TuningParameters WithUpdatingStateTimeout(System.TimeSpan updatingStateTimeout) { }
public Akka.Cluster.Sharding.TuningParameters WithWaitingForStateTimeout(System.TimeSpan waitingForStateTimeout) { }
}
}
namespace Akka.Cluster.Sharding.External
Expand Down
Expand Up @@ -309,6 +309,27 @@ namespace Akka.Cluster.Sharding
int coordinatorStateReadMajorityPlus,
int leastShardAllocationAbsoluteLimit,
double leastShardAllocationRelativeLimit) { }
public Akka.Cluster.Sharding.TuningParameters WithBufferSize(int bufferSize) { }
public Akka.Cluster.Sharding.TuningParameters WithCoordinatorFailureBackoff(System.TimeSpan coordinatorFailureBackoff) { }
public Akka.Cluster.Sharding.TuningParameters WithCoordinatorStateReadMajorityPlus(int coordinatorStateReadMajorityPlus) { }
public Akka.Cluster.Sharding.TuningParameters WithCoordinatorStateWriteMajorityPlus(int coordinatorStateWriteMajorityPlus) { }
public Akka.Cluster.Sharding.TuningParameters WithEntityRecoveryConstantRateStrategyFrequency(System.TimeSpan entityRecoveryConstantRateStrategyFrequency) { }
public Akka.Cluster.Sharding.TuningParameters WithEntityRecoveryConstantRateStrategyNumberOfEntities(int entityRecoveryConstantRateStrategyNumberOfEntities) { }
public Akka.Cluster.Sharding.TuningParameters WithEntityRecoveryStrategy(string entityRecoveryStrategy) { }
public Akka.Cluster.Sharding.TuningParameters WithEntityRestartBackoff(System.TimeSpan entityRestartBackoff) { }
public Akka.Cluster.Sharding.TuningParameters WithHandOffTimeout(System.TimeSpan handOffTimeout) { }
public Akka.Cluster.Sharding.TuningParameters WithKeepNrOfBatches(int keepNrOfBatches) { }
public Akka.Cluster.Sharding.TuningParameters WithLeastShardAllocationAbsoluteLimit(int leastShardAllocationAbsoluteLimit) { }
public Akka.Cluster.Sharding.TuningParameters WithLeastShardAllocationMaxSimultaneousRebalance(int leastShardAllocationMaxSimultaneousRebalance) { }
public Akka.Cluster.Sharding.TuningParameters WithLeastShardAllocationRebalanceThreshold(int leastShardAllocationRebalanceThreshold) { }
public Akka.Cluster.Sharding.TuningParameters WithLeastShardAllocationRelativeLimit(double leastShardAllocationRelativeLimit) { }
public Akka.Cluster.Sharding.TuningParameters WithRebalanceInterval(System.TimeSpan rebalanceInterval) { }
public Akka.Cluster.Sharding.TuningParameters WithRetryInterval(System.TimeSpan retryInterval) { }
public Akka.Cluster.Sharding.TuningParameters WithShardFailureBackoff(System.TimeSpan shardFailureBackoff) { }
public Akka.Cluster.Sharding.TuningParameters WithShardStartTimeout(System.TimeSpan shardStartTimeout) { }
public Akka.Cluster.Sharding.TuningParameters WithSnapshotAfter(int snapshotAfter) { }
public Akka.Cluster.Sharding.TuningParameters WithUpdatingStateTimeout(System.TimeSpan updatingStateTimeout) { }
public Akka.Cluster.Sharding.TuningParameters WithWaitingForStateTimeout(System.TimeSpan waitingForStateTimeout) { }
}
}
namespace Akka.Cluster.Sharding.External
Expand Down

0 comments on commit e3be344

Please sign in to comment.