Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow AtLeastOnceDelivery parameters to be set from deriving classes (as intended) #3810

Merged
merged 11 commits into from
Jul 20, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,9 @@ public ChaosSender(IActorRef destination, IActorRef probe)
_config = Context.System.Settings.Config.GetConfig("akka.persistence.sender.chaos");
_liveProcessingFailureRate = _config.GetDouble("live-processing-failure-rate");
_replayProcessingFailureRate = _config.GetDouble("replay-processing-failure-rate");
}

public override TimeSpan RedeliverInterval { get { return TimeSpan.FromMilliseconds(500); } }
RedeliverInterval = TimeSpan.FromMilliseconds(500);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good example how these properties are now intended to be used; just set them anywhere from within the actor instead of overwriting.

}

public override string PersistenceId { get { return "chaosSender"; } }

Expand Down
36 changes: 24 additions & 12 deletions src/core/Akka.Persistence/AtLeastOnceDelivery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,13 @@ protected AtLeastOnceDeliveryActor(PersistenceSettings.AtLeastOnceDeliverySettin
/// Interval between redelivery attempts.
///
/// The default value can be configure with the 'akka.persistence.at-least-once-delivery.redeliver-interval'
/// configuration key. This method can be overridden by implementation classes to return
/// non-default values.
/// configuration key.
/// </summary>
public virtual TimeSpan RedeliverInterval => _atLeastOnceDeliverySemantic.RedeliverInterval;
public TimeSpan RedeliverInterval
{
get => _atLeastOnceDeliverySemantic.RedeliverInterval;
set => _atLeastOnceDeliverySemantic.RedeliverInterval = value;
}

/// <summary>
/// Maximum number of unconfirmed messages that will be sent at each redelivery burst
Expand All @@ -77,31 +80,40 @@ protected AtLeastOnceDeliveryActor(PersistenceSettings.AtLeastOnceDeliverySettin
/// this helps prevent an overwhelming amount of messages to be sent at once.
///
/// The default value can be configure with the 'akka.persistence.at-least-once-delivery.redelivery-burst-limit'
/// configuration key. This method can be overridden by implementation classes to return
/// non-default values.
/// configuration key.
/// </summary>
public virtual int RedeliveryBurstLimit => _atLeastOnceDeliverySemantic.RedeliveryBurstLimit;
public int RedeliveryBurstLimit
{
get => _atLeastOnceDeliverySemantic.RedeliveryBurstLimit;
set => _atLeastOnceDeliverySemantic.RedeliveryBurstLimit = value;
}

/// <summary>
/// After this number of delivery attempts a <see cref="UnconfirmedWarning" /> message will be sent to
/// <see cref="ActorBase.Self" />. The count is reset after restart.
///
/// The default value can be configure with the 'akka.persistence.at-least-once-delivery.warn-after-number-of-unconfirmed-attempts'
/// configuration key. This method can be overridden by implementation classes to return
/// non-default values.
/// configuration key.
/// </summary>
public int WarnAfterNumberOfUnconfirmedAttempts => _atLeastOnceDeliverySemantic.WarnAfterNumberOfUnconfirmedAttempts;
public int WarnAfterNumberOfUnconfirmedAttempts
{
get => _atLeastOnceDeliverySemantic.WarnAfterNumberOfUnconfirmedAttempts;
set => _atLeastOnceDeliverySemantic.WarnAfterNumberOfUnconfirmedAttempts = value;
}

/// <summary>
/// Maximum number of unconfirmed messages, that this actor is allowed to hold in the memory.
/// if this number is exceeded, <see cref="AtLeastOnceDeliverySemantic.Deliver" /> will not accept more
/// messages and it will throw <see cref="MaxUnconfirmedMessagesExceededException" />.
///
/// The default value can be configure with the 'akka.persistence.at-least-once-delivery.max-unconfirmed-messages'
/// configuration key. This method can be overridden by implementation classes to return
/// non-default values.
/// configuration key.
/// </summary>
public int MaxUnconfirmedMessages => _atLeastOnceDeliverySemantic.MaxUnconfirmedMessages;
public int MaxUnconfirmedMessages
{
get => _atLeastOnceDeliverySemantic.MaxUnconfirmedMessages;
set => _atLeastOnceDeliverySemantic.MaxUnconfirmedMessages = value;
}

/// <summary>
/// Number of messages that have not been confirmed yet.
Expand Down
36 changes: 24 additions & 12 deletions src/core/Akka.Persistence/AtLeastOnceDeliveryReceiveActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,13 @@ protected AtLeastOnceDeliveryReceiveActor(PersistenceSettings.AtLeastOnceDeliver
/// Interval between redelivery attempts.
///
/// The default value can be configure with the 'akka.persistence.at-least-once-delivery.redeliver-interval'
/// configuration key. This method can be overridden by implementation classes to return
/// non-default values.
/// configuration key.
/// </summary>
public virtual TimeSpan RedeliverInterval => _atLeastOnceDeliverySemantic.RedeliverInterval;
public TimeSpan RedeliverInterval
{
get => _atLeastOnceDeliverySemantic.RedeliverInterval;
set => _atLeastOnceDeliverySemantic.RedeliverInterval = value;
}

/// <summary>
/// Maximum number of unconfirmed messages that will be sent at each redelivery burst
Expand All @@ -52,31 +55,40 @@ protected AtLeastOnceDeliveryReceiveActor(PersistenceSettings.AtLeastOnceDeliver
/// this helps prevent an overwhelming amount of messages to be sent at once.
///
/// The default value can be configure with the 'akka.persistence.at-least-once-delivery.redelivery-burst-limit'
/// configuration key. This method can be overridden by implementation classes to return
/// non-default values.
/// configuration key.
/// </summary>
public int RedeliveryBurstLimit => _atLeastOnceDeliverySemantic.RedeliveryBurstLimit;
public int RedeliveryBurstLimit
{
get => _atLeastOnceDeliverySemantic.RedeliveryBurstLimit;
set => _atLeastOnceDeliverySemantic.RedeliveryBurstLimit = value;
}

/// <summary>
/// After this number of delivery attempts a <see cref="UnconfirmedWarning" /> message will be sent to
/// <see cref="ActorBase.Self" />. The count is reset after restart.
///
/// The default value can be configure with the 'akka.persistence.at-least-once-delivery.warn-after-number-of-unconfirmed-attempts'
/// configuration key. This method can be overridden by implementation classes to return
/// non-default values.
/// configuration key.
/// </summary>
public int WarnAfterNumberOfUnconfirmedAttempts => _atLeastOnceDeliverySemantic.WarnAfterNumberOfUnconfirmedAttempts;
public int WarnAfterNumberOfUnconfirmedAttempts
{
get => _atLeastOnceDeliverySemantic.WarnAfterNumberOfUnconfirmedAttempts;
set => _atLeastOnceDeliverySemantic.WarnAfterNumberOfUnconfirmedAttempts = value;
}

/// <summary>
/// Maximum number of unconfirmed messages, that this actor is allowed to hold in the memory.
/// if this number is exceeded, <see cref="AtLeastOnceDeliverySemantic.Deliver" /> will not accept more
/// messages and it will throw <see cref="MaxUnconfirmedMessagesExceededException" />.
///
/// The default value can be configure with the 'akka.persistence.at-least-once-delivery.max-unconfirmed-messages'
/// configuration key. This method can be overridden by implementation classes to return
/// non-default values.
/// configuration key.
/// </summary>
public int MaxUnconfirmedMessages => _atLeastOnceDeliverySemantic.MaxUnconfirmedMessages;
public int MaxUnconfirmedMessages
{
get => _atLeastOnceDeliverySemantic.MaxUnconfirmedMessages;
set => _atLeastOnceDeliverySemantic.MaxUnconfirmedMessages = value;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All of these settings should be immutable and overloadable via the CTOR, as you mentioned @ondrejpialek

}

/// <summary>
/// Number of messages that have not been confirmed yet.
Expand Down
84 changes: 68 additions & 16 deletions src/core/Akka.Persistence/AtLeastOnceDeliverySemantic.cs
Original file line number Diff line number Diff line change
Expand Up @@ -315,8 +315,10 @@ private RedeliveryTick() { }
private readonly IActorContext _context;
private long _deliverySequenceNr;
private ICancelable _redeliverScheduleCancelable;
private readonly PersistenceSettings.AtLeastOnceDeliverySettings _settings;
private ImmutableSortedDictionary<long, Delivery> _unconfirmed = ImmutableSortedDictionary<long, Delivery>.Empty;
private TimeSpan _redeliverInterval;
private int _warnAfterNumberOfUnconfirmedAttempts;
private int _maxUnconfirmedMessages;

/// <summary>
/// Initializes a new instance of the <see cref="AtLeastOnceDeliverySemantic"/> class.
Expand All @@ -326,18 +328,37 @@ private RedeliveryTick() { }
public AtLeastOnceDeliverySemantic(IActorContext context, PersistenceSettings.AtLeastOnceDeliverySettings settings)
{
_context = context;
_settings = settings;
_deliverySequenceNr = 0;

RedeliverInterval = settings.RedeliverInterval;
RedeliveryBurstLimit = settings.RedeliveryBurstLimit;
WarnAfterNumberOfUnconfirmedAttempts = settings.WarnAfterNumberOfUnconfirmedAttempts;
MaxUnconfirmedMessages = settings.MaxUnconfirmedMessages;
}

/// <summary>
/// Interval between redelivery attempts.
///
/// The default value can be configure with the 'akka.persistence.at-least-once-delivery.redeliver-interval'
/// configuration key. This method can be overridden by implementation classes to return
/// non-default values.
/// configuration key.
/// </summary>
public virtual TimeSpan RedeliverInterval => _settings.RedeliverInterval;
public TimeSpan RedeliverInterval
{
get => _redeliverInterval;
set
{
if (_redeliverInterval != value)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't this looks too surprising? I mean, using setter is usually considered to be simple operation with no side effect (except changing value type). The fact, that it resets schedule may be the source of problems, for example in debugging, in the future.

Copy link
Contributor Author

@ondrejpialek ondrejpialek May 29, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's here so that the semantic behaves "correctly", e.g. if you set a different interval it should send things at a different interval. I agree that having too much logic in setters is not great though (thought I thought it's acceptable and tried to limit the resets by checking for a change).

What do you propose? Have a method instead for changing this? Or have this read only and only accept the value via ctor? Or making this smarter by calculating the correct delay between the elapsed time and new interval?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAIK those configuration values should be defined exactly once - when an actor is created. I'd probably need verify it somehow, but we're having a state machines here, so changing this value in different actor "state" may result in different side effects - and I'm not sure if all of them are acceptable.

Simplest way I can think of ATM to make it break the actor would be to call RedeliverInterval = x in actor's PostStop method after calling base.PostStop(), which is cancelling all scheduled messages. Any call like this one made afterwards would trigger it to schedule the message back again, but never actually cancel.

Copy link
Contributor Author

@ondrejpialek ondrejpialek May 30, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would not happen the way it's currently written as it checks for _redeliverScheduleCancelable being active before restarting it.

if (_redeliverScheduleCancelable != null)
{
	_redeliverScheduleCancelable.Cancel();
	_redeliverScheduleCancelable = null;
	StartRedeliverTask();
}

But if you prefer not to have these mutable I can redo this PR in a different way - the values would be set via ctor only (at the cost of having to read the redelivery settings in two places (both AtLeastOnce actors that use this semantic).

The problem with that approach is that the AtLeastOnceDeliverySemantic is created in the actor's ctor and you should not access virtual fields inside a ctor, so the only way that comes in mind is specifying two ctors for each actor so that you can specify these parameters calling the right base ctor.

Copy link
Contributor Author

@ondrejpialek ondrejpialek May 30, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually there is already a ctor on the actors that takes the redelivery settings. So perhaps we make the properties non-virtual in order not to confuse the developers (as overwriting has no effect currently) and we fix the docs to say to use the overloaded ctor instead of overwriting?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice, but the problem is backwards compatibility. I think, we can stay with your current approach. Any thoughts @Aaronontheweb ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ondrejpialek @Horusiath

Actually there is already a ctor on the actors that takes the redelivery settings. So perhaps we make the properties non-virtual in order not to confuse the developers (as overwriting has no effect currently) and we fix the docs to say to use the overloaded ctor instead of overwriting?

Since this is going into the v1.4 branch and we can afford to take some more liberties with the API there - if overriding the properties already has no effect, I don't see an issue with removing the virtual modifier on those properties and just having developers use the constructor overload instead. I think making these settings immutable and settable through the constructor is probably the best way to go, in order to avoid having those side effects during the processing of the state machine as @Horusiath mentioned.

Can you go ahead with those changes @ondrejpialek ?

{
_redeliverInterval = value;
if (_redeliverScheduleCancelable != null)
{
_redeliverScheduleCancelable.Cancel();
_redeliverScheduleCancelable = null;
StartRedeliverTask();
}
}
}
}

/// <summary>
/// Maximum number of unconfirmed messages that will be sent at each redelivery burst
Expand All @@ -346,31 +367,62 @@ public AtLeastOnceDeliverySemantic(IActorContext context, PersistenceSettings.At
/// this helps prevent an overwhelming amount of messages to be sent at once.
///
/// The default value can be configure with the 'akka.persistence.at-least-once-delivery.redelivery-burst-limit'
/// configuration key. This method can be overridden by implementation classes to return
/// non-default values.
/// configuration key.
/// </summary>
public virtual int RedeliveryBurstLimit => _settings.RedeliveryBurstLimit;
public int RedeliveryBurstLimit { get; set; }

/// <summary>
/// After this number of delivery attempts a <see cref="UnconfirmedWarning" /> message will be sent to
/// <see cref="ActorBase.Self" />. The count is reset after restart.
///
/// The default value can be configure with the 'akka.persistence.at-least-once-delivery.warn-after-number-of-unconfirmed-attempts'
/// configuration key. This method can be overridden by implementation classes to return
/// non-default values.
/// configuration key.
/// </summary>
public virtual int WarnAfterNumberOfUnconfirmedAttempts => _settings.WarnAfterNumberOfUnconfirmedAttempts;
public int WarnAfterNumberOfUnconfirmedAttempts
{
get => _warnAfterNumberOfUnconfirmedAttempts;
set
{
if (_warnAfterNumberOfUnconfirmedAttempts != value)
{
_warnAfterNumberOfUnconfirmedAttempts = value;

var deliveriesAboveThreshold = _unconfirmed.Where(u => u.Value.Attempt >= value)
.Select(u => new UnconfirmedDelivery(u.Key, u.Value.Destination, u.Value.Message))
.ToArray();

if (deliveriesAboveThreshold.Length > 0)
{
_context.Self.Tell(new UnconfirmedWarning(deliveriesAboveThreshold));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, the last thing I would expect from using (this.WarnAfterNumberOfUnconfirmedAttempts = 5) is that it may send messages to actors and allocate extra arrays.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again - this is to make the logic correct - if you start with 10, have 6 unconfirmed, then switch to 5 then you should receive the warning.

What do you think is best? Have this read only (and accept the value only via ctor?) or not trigger immediately but wait for next unconfirmed message?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree with @Horusiath - this should be immutable I think. Pick your strategy and stick with it at the time the actor starts.

}
}
}
}

/// <summary>
/// Maximum number of unconfirmed messages, that this actor is allowed to hold in the memory.
/// if this number is exceeded, <see cref="AtLeastOnceDeliverySemantic.Deliver" /> will not accept more
/// messages and it will throw <see cref="MaxUnconfirmedMessagesExceededException" />.
///
/// The default value can be configure with the 'akka.persistence.at-least-once-delivery.max-unconfirmed-messages'
/// configuration key. This method can be overridden by implementation classes to return
/// non-default values.
/// configuration key.
/// </summary>
public virtual int MaxUnconfirmedMessages => _settings.MaxUnconfirmedMessages;
public int MaxUnconfirmedMessages
{
get => _maxUnconfirmedMessages;
set
{
if (_maxUnconfirmedMessages != value)
{
_maxUnconfirmedMessages = value;
if (_unconfirmed.Count >= value)
{
throw new MaxUnconfirmedMessagesExceededException(
$"{_context.Self} has too many unconfirmed messages. Maximum allowed is {value}");
}
}
}
}

/// <summary>
/// Number of messages, that have not been confirmed yet.
Expand All @@ -380,8 +432,8 @@ public AtLeastOnceDeliverySemantic(IActorContext context, PersistenceSettings.At
private void StartRedeliverTask()
{
if (_redeliverScheduleCancelable != null) return;
var interval = new TimeSpan(RedeliverInterval.Ticks / 2);
_redeliverScheduleCancelable = _context.System.Scheduler.ScheduleTellRepeatedlyCancelable(interval, interval, _context.Self,
var delay = new TimeSpan(RedeliverInterval.Ticks / 2);
_redeliverScheduleCancelable = _context.System.Scheduler.ScheduleTellRepeatedlyCancelable(delay, RedeliverInterval, _context.Self,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this a bug? The first parameter is the initial delay of the very first callback. The second parameter is then the interval this will fire in, so this should be the whole RedeliverInterval, not half?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, must have been a bug. Nice catch.

RedeliveryTick.Instance, _context.Self);
}

Expand Down