Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow AtLeastOnceDelivery parameters to be set from deriving classes (as intended) #3810

Merged
merged 11 commits into from Jul 20, 2019
Expand Up @@ -16,9 +16,10 @@ namespace Akka.Persistence
{
protected AtLeastOnceDeliveryActor() { }
protected AtLeastOnceDeliveryActor(Akka.Persistence.PersistenceSettings.AtLeastOnceDeliverySettings settings) { }
protected AtLeastOnceDeliveryActor(System.Func<Akka.Persistence.PersistenceSettings.AtLeastOnceDeliverySettings, Akka.Persistence.PersistenceSettings.AtLeastOnceDeliverySettings> overrideSettings) { }
public int MaxUnconfirmedMessages { get; }
public virtual System.TimeSpan RedeliverInterval { get; }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm ok with this change.

public virtual int RedeliveryBurstLimit { get; }
public System.TimeSpan RedeliverInterval { get; }
public int RedeliveryBurstLimit { get; }
public int UnconfirmedCount { get; }
public int WarnAfterNumberOfUnconfirmedAttempts { get; }
public override void AroundPostStop() { }
Expand All @@ -35,8 +36,9 @@ namespace Akka.Persistence
{
protected AtLeastOnceDeliveryReceiveActor() { }
protected AtLeastOnceDeliveryReceiveActor(Akka.Persistence.PersistenceSettings.AtLeastOnceDeliverySettings settings) { }
protected AtLeastOnceDeliveryReceiveActor(System.Func<Akka.Persistence.PersistenceSettings.AtLeastOnceDeliverySettings, Akka.Persistence.PersistenceSettings.AtLeastOnceDeliverySettings> overrideSettings) { }
public int MaxUnconfirmedMessages { get; }
public virtual System.TimeSpan RedeliverInterval { get; }
public System.TimeSpan RedeliverInterval { get; }
public int RedeliveryBurstLimit { get; }
public int UnconfirmedCount { get; }
public int WarnAfterNumberOfUnconfirmedAttempts { get; }
Expand All @@ -53,11 +55,11 @@ namespace Akka.Persistence
public class AtLeastOnceDeliverySemantic
{
public AtLeastOnceDeliverySemantic(Akka.Actor.IActorContext context, Akka.Persistence.PersistenceSettings.AtLeastOnceDeliverySettings settings) { }
public virtual int MaxUnconfirmedMessages { get; }
public virtual System.TimeSpan RedeliverInterval { get; }
public virtual int RedeliveryBurstLimit { get; }
public int MaxUnconfirmedMessages { get; }
public System.TimeSpan RedeliverInterval { get; }
public int RedeliveryBurstLimit { get; }
public int UnconfirmedCount { get; }
public virtual int WarnAfterNumberOfUnconfirmedAttempts { get; }
public int WarnAfterNumberOfUnconfirmedAttempts { get; }
public bool AroundReceive(Akka.Actor.Receive receive, object message) { }
public void Cancel() { }
public bool ConfirmDelivery(long deliveryId) { }
Expand Down
Expand Up @@ -136,7 +136,8 @@ internal class ChaosSender : AtLeastOnceDeliveryActor

public ILoggingAdapter Log { get { return _log ?? (_log = Context.GetLogger()); }}

public ChaosSender(IActorRef destination, IActorRef probe)
public ChaosSender(IActorRef destination, IActorRef probe)
: base(x => x.WithRedeliverInterval(TimeSpan.FromMilliseconds(500)))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a new ctor overload, see comment on the base class

{
_destination = destination;
Probe = probe;
Expand All @@ -147,8 +148,6 @@ public ChaosSender(IActorRef destination, IActorRef probe)
_replayProcessingFailureRate = _config.GetDouble("replay-processing-failure-rate");
}

public override TimeSpan RedeliverInterval { get { return TimeSpan.FromMilliseconds(500); } }

public override string PersistenceId { get { return "chaosSender"; } }

protected override bool ReceiveRecover(object message)
Expand Down
30 changes: 20 additions & 10 deletions src/core/Akka.Persistence/AtLeastOnceDelivery.cs
Expand Up @@ -61,14 +61,24 @@ protected AtLeastOnceDeliveryActor(PersistenceSettings.AtLeastOnceDeliverySettin
_atLeastOnceDeliverySemantic = new AtLeastOnceDeliverySemantic(Context, settings);
}

/// <summary>
/// Initializes a new instance of the <see cref="AtLeastOnceDeliveryActor"/> class.
/// </summary>
/// <param name="overrideSettings">A lambda to tweak the default AtLeastOnceDelivery settings.</param>
protected AtLeastOnceDeliveryActor(Func<PersistenceSettings.AtLeastOnceDeliverySettings, PersistenceSettings.AtLeastOnceDeliverySettings> overrideSettings)
Aaronontheweb marked this conversation as resolved.
Show resolved Hide resolved
{
var settings = overrideSettings(Extension.Settings.AtLeastOnceDelivery);
_atLeastOnceDeliverySemantic = new AtLeastOnceDeliverySemantic(Context, settings);
}

/// <summary>
/// Interval between redelivery attempts.
///
/// The default value can be configure with the 'akka.persistence.at-least-once-delivery.redeliver-interval'
/// configuration key. This method can be overridden by implementation classes to return
/// non-default values.
/// configuration key. Custom value may be provided via the
/// <see cref="AtLeastOnceDeliveryActor(PersistenceSettings.AtLeastOnceDeliverySettings)"/> constructor.
/// </summary>
public virtual TimeSpan RedeliverInterval => _atLeastOnceDeliverySemantic.RedeliverInterval;
public TimeSpan RedeliverInterval => _atLeastOnceDeliverySemantic.RedeliverInterval;

/// <summary>
/// Maximum number of unconfirmed messages that will be sent at each redelivery burst
Expand All @@ -77,18 +87,18 @@ protected AtLeastOnceDeliveryActor(PersistenceSettings.AtLeastOnceDeliverySettin
/// this helps prevent an overwhelming amount of messages to be sent at once.
///
/// The default value can be configure with the 'akka.persistence.at-least-once-delivery.redelivery-burst-limit'
/// configuration key. This method can be overridden by implementation classes to return
/// non-default values.
/// configuration key. Custom value may be provided via the
/// <see cref="AtLeastOnceDeliveryActor(PersistenceSettings.AtLeastOnceDeliverySettings)"/> constructor.
/// </summary>
public virtual int RedeliveryBurstLimit => _atLeastOnceDeliverySemantic.RedeliveryBurstLimit;
public int RedeliveryBurstLimit => _atLeastOnceDeliverySemantic.RedeliveryBurstLimit;

/// <summary>
/// After this number of delivery attempts a <see cref="UnconfirmedWarning" /> message will be sent to
/// <see cref="ActorBase.Self" />. The count is reset after restart.
///
/// The default value can be configure with the 'akka.persistence.at-least-once-delivery.warn-after-number-of-unconfirmed-attempts'
/// configuration key. This method can be overridden by implementation classes to return
/// non-default values.
/// configuration key. Custom value may be provided via the
/// <see cref="AtLeastOnceDeliveryActor(PersistenceSettings.AtLeastOnceDeliverySettings)"/> constructor.
/// </summary>
public int WarnAfterNumberOfUnconfirmedAttempts => _atLeastOnceDeliverySemantic.WarnAfterNumberOfUnconfirmedAttempts;

Expand All @@ -98,8 +108,8 @@ protected AtLeastOnceDeliveryActor(PersistenceSettings.AtLeastOnceDeliverySettin
/// messages and it will throw <see cref="MaxUnconfirmedMessagesExceededException" />.
///
/// The default value can be configure with the 'akka.persistence.at-least-once-delivery.max-unconfirmed-messages'
/// configuration key. This method can be overridden by implementation classes to return
/// non-default values.
/// configuration key. Custom value may be provided via the
/// <see cref="AtLeastOnceDeliveryActor(PersistenceSettings.AtLeastOnceDeliverySettings)"/> constructor.
/// </summary>
public int MaxUnconfirmedMessages => _atLeastOnceDeliverySemantic.MaxUnconfirmedMessages;

Expand Down
30 changes: 19 additions & 11 deletions src/core/Akka.Persistence/AtLeastOnceDeliveryReceiveActor.cs
Expand Up @@ -23,27 +23,35 @@ public abstract class AtLeastOnceDeliveryReceiveActor : ReceivePersistentActor
protected AtLeastOnceDeliveryReceiveActor()
{
_atLeastOnceDeliverySemantic = new AtLeastOnceDeliverySemantic(Context, Extension.Settings.AtLeastOnceDelivery);

}

/// <summary>
/// Initializes a new instance of the <see cref="AtLeastOnceDeliveryReceiveActor"/> class.
/// </summary>
/// <param name="settings">TBD</param>
/// <param name="settings">Custom AtLeastOnceDelivery settings</param>
protected AtLeastOnceDeliveryReceiveActor(PersistenceSettings.AtLeastOnceDeliverySettings settings)
{
_atLeastOnceDeliverySemantic = new AtLeastOnceDeliverySemantic(Context, settings);
}

/// <summary>
/// Initializes a new instance of the <see cref="AtLeastOnceDeliveryReceiveActor"/> class.
/// </summary>
/// <param name="overrideSettings">A lambda to tweak the default AtLeastOnceDelivery settings.</param>
protected AtLeastOnceDeliveryReceiveActor(Func<PersistenceSettings.AtLeastOnceDeliverySettings, PersistenceSettings.AtLeastOnceDeliverySettings> overrideSettings)
{
var settings = overrideSettings(Extension.Settings.AtLeastOnceDelivery);
_atLeastOnceDeliverySemantic = new AtLeastOnceDeliverySemantic(Context, settings);
}

/// <summary>
/// Interval between redelivery attempts.
///
/// The default value can be configure with the 'akka.persistence.at-least-once-delivery.redeliver-interval'
/// configuration key. This method can be overridden by implementation classes to return
/// non-default values.
/// configuration key. Custom value may be provided via the
/// <see cref="AtLeastOnceDeliveryReceiveActor(PersistenceSettings.AtLeastOnceDeliverySettings)"/> constructor.
/// </summary>
public virtual TimeSpan RedeliverInterval => _atLeastOnceDeliverySemantic.RedeliverInterval;
public TimeSpan RedeliverInterval => _atLeastOnceDeliverySemantic.RedeliverInterval;

/// <summary>
/// Maximum number of unconfirmed messages that will be sent at each redelivery burst
Expand All @@ -52,8 +60,8 @@ protected AtLeastOnceDeliveryReceiveActor(PersistenceSettings.AtLeastOnceDeliver
/// this helps prevent an overwhelming amount of messages to be sent at once.
///
/// The default value can be configure with the 'akka.persistence.at-least-once-delivery.redelivery-burst-limit'
/// configuration key. This method can be overridden by implementation classes to return
/// non-default values.
/// configuration key. Custom value may be provided via the
/// <see cref="AtLeastOnceDeliveryReceiveActor(PersistenceSettings.AtLeastOnceDeliverySettings)"/> constructor.
/// </summary>
public int RedeliveryBurstLimit => _atLeastOnceDeliverySemantic.RedeliveryBurstLimit;

Expand All @@ -62,8 +70,8 @@ protected AtLeastOnceDeliveryReceiveActor(PersistenceSettings.AtLeastOnceDeliver
/// <see cref="ActorBase.Self" />. The count is reset after restart.
///
/// The default value can be configure with the 'akka.persistence.at-least-once-delivery.warn-after-number-of-unconfirmed-attempts'
/// configuration key. This method can be overridden by implementation classes to return
/// non-default values.
/// configuration key. Custom value may be provided via the
/// <see cref="AtLeastOnceDeliveryReceiveActor(PersistenceSettings.AtLeastOnceDeliverySettings)"/> constructor.
/// </summary>
public int WarnAfterNumberOfUnconfirmedAttempts => _atLeastOnceDeliverySemantic.WarnAfterNumberOfUnconfirmedAttempts;

Expand All @@ -73,8 +81,8 @@ protected AtLeastOnceDeliveryReceiveActor(PersistenceSettings.AtLeastOnceDeliver
/// messages and it will throw <see cref="MaxUnconfirmedMessagesExceededException" />.
///
/// The default value can be configure with the 'akka.persistence.at-least-once-delivery.max-unconfirmed-messages'
/// configuration key. This method can be overridden by implementation classes to return
/// non-default values.
/// configuration key. Custom value may be provided via the
/// <see cref="AtLeastOnceDeliveryReceiveActor(PersistenceSettings.AtLeastOnceDeliverySettings)"/> constructor.
/// </summary>
public int MaxUnconfirmedMessages => _atLeastOnceDeliverySemantic.MaxUnconfirmedMessages;

Expand Down
31 changes: 15 additions & 16 deletions src/core/Akka.Persistence/AtLeastOnceDeliverySemantic.cs
Expand Up @@ -315,7 +315,6 @@ public sealed class RedeliveryTick : INotInfluenceReceiveTimeout, IDeadLetterSup
private readonly IActorContext _context;
private long _deliverySequenceNr;
private ICancelable _redeliverScheduleCancelable;
private readonly PersistenceSettings.AtLeastOnceDeliverySettings _settings;
private ImmutableSortedDictionary<long, Delivery> _unconfirmed = ImmutableSortedDictionary<long, Delivery>.Empty;

/// <summary>
Expand All @@ -326,18 +325,21 @@ public sealed class RedeliveryTick : INotInfluenceReceiveTimeout, IDeadLetterSup
public AtLeastOnceDeliverySemantic(IActorContext context, PersistenceSettings.AtLeastOnceDeliverySettings settings)
{
_context = context;
_settings = settings;
_deliverySequenceNr = 0;

RedeliverInterval = settings.RedeliverInterval;
RedeliveryBurstLimit = settings.RedeliveryBurstLimit;
WarnAfterNumberOfUnconfirmedAttempts = settings.WarnAfterNumberOfUnconfirmedAttempts;
MaxUnconfirmedMessages = settings.MaxUnconfirmedMessages;
}

/// <summary>
/// Interval between redelivery attempts.
///
/// The default value can be configure with the 'akka.persistence.at-least-once-delivery.redeliver-interval'
/// configuration key. This method can be overridden by implementation classes to return
/// non-default values.
/// configuration key.
/// </summary>
public virtual TimeSpan RedeliverInterval => _settings.RedeliverInterval;
public TimeSpan RedeliverInterval { get; }

/// <summary>
/// Maximum number of unconfirmed messages that will be sent at each redelivery burst
Expand All @@ -346,31 +348,28 @@ public AtLeastOnceDeliverySemantic(IActorContext context, PersistenceSettings.At
/// this helps prevent an overwhelming amount of messages to be sent at once.
///
/// The default value can be configure with the 'akka.persistence.at-least-once-delivery.redelivery-burst-limit'
/// configuration key. This method can be overridden by implementation classes to return
/// non-default values.
/// configuration key.
/// </summary>
public virtual int RedeliveryBurstLimit => _settings.RedeliveryBurstLimit;
public int RedeliveryBurstLimit { get; }

/// <summary>
/// After this number of delivery attempts a <see cref="UnconfirmedWarning" /> message will be sent to
/// <see cref="ActorBase.Self" />. The count is reset after restart.
///
/// The default value can be configure with the 'akka.persistence.at-least-once-delivery.warn-after-number-of-unconfirmed-attempts'
/// configuration key. This method can be overridden by implementation classes to return
/// non-default values.
/// configuration key.
/// </summary>
public virtual int WarnAfterNumberOfUnconfirmedAttempts => _settings.WarnAfterNumberOfUnconfirmedAttempts;
public int WarnAfterNumberOfUnconfirmedAttempts { get; }

/// <summary>
/// Maximum number of unconfirmed messages, that this actor is allowed to hold in the memory.
/// if this number is exceeded, <see cref="AtLeastOnceDeliverySemantic.Deliver" /> will not accept more
/// messages and it will throw <see cref="MaxUnconfirmedMessagesExceededException" />.
///
/// The default value can be configure with the 'akka.persistence.at-least-once-delivery.max-unconfirmed-messages'
/// configuration key. This method can be overridden by implementation classes to return
/// non-default values.
/// configuration key.
/// </summary>
public virtual int MaxUnconfirmedMessages => _settings.MaxUnconfirmedMessages;
public int MaxUnconfirmedMessages { get; }

/// <summary>
/// Number of messages, that have not been confirmed yet.
Expand All @@ -380,8 +379,8 @@ public AtLeastOnceDeliverySemantic(IActorContext context, PersistenceSettings.At
private void StartRedeliverTask()
{
if (_redeliverScheduleCancelable != null) return;
var interval = new TimeSpan(RedeliverInterval.Ticks / 2);
_redeliverScheduleCancelable = _context.System.Scheduler.ScheduleTellRepeatedlyCancelable(interval, interval, _context.Self,
var delay = new TimeSpan(RedeliverInterval.Ticks / 2);
_redeliverScheduleCancelable = _context.System.Scheduler.ScheduleTellRepeatedlyCancelable(delay, RedeliverInterval, _context.Self,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this a bug? The first parameter is the initial delay of the very first callback. The second parameter is then the interval this will fire in, so this should be the whole RedeliverInterval, not half?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, must have been a bug. Nice catch.

RedeliveryTick.Instance, _context.Self);
}

Expand Down