Skip to content

Commit

Permalink
Added support for UnrestrictedStash (#6325)
Browse files Browse the repository at this point in the history
  • Loading branch information
ismaelhamed committed Jan 18, 2023
1 parent d0d1369 commit 7b1090d
Show file tree
Hide file tree
Showing 20 changed files with 160 additions and 65 deletions.
20 changes: 15 additions & 5 deletions docs/articles/actors/receive-actor-api.md
Expand Up @@ -790,12 +790,15 @@ static void Main(string[] args)

## Stash

The `IWithUnboundedStash` interface enables an actor to temporarily stash away messages that can not or should not be handled using the actor's current behavior. Upon changing the actor's message handler, i.e., right before invoking `Context.BecomeStacked()` or `Context.UnbecomeStacked()`;, all stashed messages can be "un-stashed", thereby prepending them to the actor's mailbox. This way, the stashed messages can be processed in the same order as they have been received originally. An actor that implements `IWithUnboundedStash` will automatically get a dequeue-based mailbox.
The `IWithStash` interface enables an actor to temporarily stash away messages that can not or should not be handled using the actor's current behavior. Upon changing the actor's message handler, i.e., right before invoking `Context.Become()` or `Context.Unbecome()`, all stashed messages can be "un-stashed", thereby prepending them to the actor's mailbox. This way, the stashed messages can be processed in the same order as they have been received originally.

Here is an example of the `IWithUnboundedStash` interface in action:
> [!NOTE]
> The interface `IWithStash` implements the marker interface `IRequiresMessageQueue<IDequeBasedMessageQueueSemantics>` which requests the system to automatically choose a deque-based mailbox implementation for the actor (defaults to an unbounded deque mailbox). If you want more control over the mailbox, see the documentation on mailboxes: [Mailboxes](xref:mailboxes).

Here is an example of the `IWithStash` interface in action:

```csharp
public class ActorWithProtocol : ReceiveActor, IWithUnboundedStash
public class ActorWithProtocol : ReceiveActor, IWithStash
{
public IStash Stash { get; set; }

Expand Down Expand Up @@ -827,11 +830,18 @@ public class ActorWithProtocol : ReceiveActor, IWithUnboundedStash
}
```

Invoking `Stash()` adds the current message (the message that the actor received last) to the actor's stash. It is typically invoked when handling the default case in the actor's message handler to stash messages that aren't handled by the other cases. It is illegal to stash the same message twice; to do so results in an `IllegalStateException` being thrown. The stash may also be bounded in which case invoking `Stash()` may lead to a capacity violation, which results in a `StashOverflowException`. The capacity of the stash can be configured using the stash-capacity setting (an `Int`) of the mailbox's configuration.
Invoking `Stash()` adds the current message (the message that the actor received last) to the actor's stash. It is typically invoked when handling the default case in the actor's message handler to stash messages that aren't handled by the other cases. It is illegal to stash the same message twice; to do so results in an `IllegalStateException` being thrown. The stash may also be bounded in which case invoking `Stash()` may lead to a capacity violation, which results in a `StashOverflowException`. The capacity of the stash can be configured using the stash-capacity setting (an `int`) of the mailbox's configuration.

Invoking `UnstashAll()` enqueues messages from the stash to the actor's mailbox until the capacity of the mailbox (if any) has been reached (note that messages from the stash are prepended to the mailbox). In case a bounded mailbox overflows, a `MessageQueueAppendFailedException` is thrown. The stash is guaranteed to be empty after calling `UnstashAll()`.

Note that the `stash` is part of the ephemeral actor state, unlike the mailbox. Therefore, it should be managed like other parts of the actor's state which have the same property. The `IWithUnboundedStash` interface implementation of `PreRestart` will call `UnstashAll()`, which is usually the desired behavior.
Note that the stash is part of the ephemeral actor state, unlike the mailbox. Therefore, it should be managed like other parts of the actor's state which have the same property.

However, the `IWithStash` interface implementation of `PreRestart` will call `UnstashAll()`. This means that before the actor restarts, it will transfer all stashed messages back to the actors mailbox.

The result of this is that when an actor is restarted, any stashed messages will be delivered to the new incarnation of the actor. This is usually the desired behavior.

> [!NOTE]
> If you want to enforce that your actor can only work with an unbounded stash, then you should use the `IWithUnboundedStash` interface instead.

## Killing an Actor

Expand Down
20 changes: 15 additions & 5 deletions docs/articles/actors/untyped-actor-api.md
Expand Up @@ -708,12 +708,15 @@ static void Main(string[] args)

## Stash

The `IWithUnboundedStash` interface enables an actor to temporarily stash away messages that can not or should not be handled using the actor's current behavior. Upon changing the actor's message handler, i.e., right before invoking `Context.BecomeStacked()` or `Context.UnbecomeStacked()`;, all stashed messages can be "un-stashed", thereby prepending them to the actor's mailbox. This way, the stashed messages can be processed in the same order as they have been received originally. An actor that implements `IWithUnboundedStash` will automatically get a dequeue-based mailbox.
The `IWithStash` interface enables an actor to temporarily stash away messages that can not or should not be handled using the actor's current behavior. Upon changing the actor's message handler, i.e., right before invoking `Context.Become()` or `Context.Unbecome()`, all stashed messages can be "un-stashed", thereby prepending them to the actor's mailbox. This way, the stashed messages can be processed in the same order as they have been received originally.

Here is an example of the `IWithUnboundedStash` interface in action:
> [!NOTE]
> The interface `IWithStash` implements the marker interface `IRequiresMessageQueue<IDequeBasedMessageQueueSemantics>` which requests the system to automatically choose a deque-based mailbox implementation for the actor (defaults to an unbounded deque mailbox). If you want more control over the mailbox, see the documentation on mailboxes: [Mailboxes](xref:mailboxes).
Here is an example of the `IWithStash` interface in action:

```csharp
public class ActorWithProtocol : UntypedActor, IWithUnboundedStash
public class ActorWithProtocol : UntypedActor, IWithStash
{
public IStash Stash { get; set; }

Expand Down Expand Up @@ -748,11 +751,18 @@ public class ActorWithProtocol : UntypedActor, IWithUnboundedStash
}
```

Invoking `Stash()` adds the current message (the message that the actor received last) to the actor's stash. It is typically invoked when handling the default case in the actor's message handler to stash messages that aren't handled by the other cases. It is illegal to stash the same message twice; to do so results in an `IllegalStateException` being thrown. The stash may also be bounded in which case invoking `Stash()` may lead to a capacity violation, which results in a `StashOverflowException`. The capacity of the stash can be configured using the stash-capacity setting (an `Int`) of the mailbox's configuration.
Invoking `Stash()` adds the current message (the message that the actor received last) to the actor's stash. It is typically invoked when handling the default case in the actor's message handler to stash messages that aren't handled by the other cases. It is illegal to stash the same message twice; to do so results in an `IllegalStateException` being thrown. The stash may also be bounded in which case invoking `Stash()` may lead to a capacity violation, which results in a `StashOverflowException`. The capacity of the stash can be configured using the stash-capacity setting (an `int`) of the mailbox's configuration.

Invoking `UnstashAll()` enqueues messages from the stash to the actor's mailbox until the capacity of the mailbox (if any) has been reached (note that messages from the stash are prepended to the mailbox). In case a bounded mailbox overflows, a `MessageQueueAppendFailedException` is thrown. The stash is guaranteed to be empty after calling `UnstashAll()`.

Note that the `stash` is part of the ephemeral actor state, unlike the mailbox. Therefore, it should be managed like other parts of the actor's state which have the same property. The `IWithUnboundedStash` interface implementation of `PreRestart` will call `UnstashAll()`, which is usually the desired behavior.
Note that the stash is part of the ephemeral actor state, unlike the mailbox. Therefore, it should be managed like other parts of the actor's state which have the same property.

However, the `IWithStash` interface implementation of `PreRestart` will call `UnstashAll()`. This means that before the actor restarts, it will transfer all stashed messages back to the actor’s mailbox.

The result of this is that when an actor is restarted, any stashed messages will be delivered to the new incarnation of the actor. This is usually the desired behavior.

> [!NOTE]
> If you want to enforce that your actor can only work with an unbounded stash, then you should use the `IWithUnboundedStash` interface instead.
## Killing an Actor

Expand Down
1 change: 1 addition & 0 deletions docs/cSpell.json
Expand Up @@ -26,6 +26,7 @@
"CRDT",
"datacenter",
"denormalization",
"deque",
"deserialization",
"downstream",
"downstreams",
Expand Down
Expand Up @@ -1178,14 +1178,15 @@ namespace Akka.Actor
void Become(Akka.Actor.UntypedReceive receive);
void BecomeStacked(Akka.Actor.UntypedReceive receive);
}
[System.ObsoleteAttribute("Bounded stashing is not yet implemented. Unbounded stashing will be used instead " +
"[0.7.0]")]
public interface IWithBoundedStash : Akka.Actor.IActorStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IBoundedDequeBasedMessageQueueSemantics> { }
[System.ObsoleteAttribute("Use `IWithStash` with a configured BoundedDeque-based mailbox instead.")]
public interface IWithBoundedStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IBoundedDequeBasedMessageQueueSemantics> { }
public interface IWithStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IDequeBasedMessageQueueSemantics> { }
public interface IWithTimers
{
Akka.Actor.ITimerScheduler Timers { get; set; }
}
public interface IWithUnboundedStash : Akka.Actor.IActorStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics> { }
public interface IWithUnboundedStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics> { }
public interface IWithUnrestrictedStash : Akka.Actor.IActorStash { }
public interface IWrappedMessage
{
object Message { get; }
Expand Down
Expand Up @@ -1180,14 +1180,15 @@ namespace Akka.Actor
void Become(Akka.Actor.UntypedReceive receive);
void BecomeStacked(Akka.Actor.UntypedReceive receive);
}
[System.ObsoleteAttribute("Bounded stashing is not yet implemented. Unbounded stashing will be used instead " +
"[0.7.0]")]
public interface IWithBoundedStash : Akka.Actor.IActorStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IBoundedDequeBasedMessageQueueSemantics> { }
[System.ObsoleteAttribute("Use `IWithStash` with a configured BoundedDeque-based mailbox instead.")]
public interface IWithBoundedStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IBoundedDequeBasedMessageQueueSemantics> { }
public interface IWithStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IDequeBasedMessageQueueSemantics> { }
public interface IWithTimers
{
Akka.Actor.ITimerScheduler Timers { get; set; }
}
public interface IWithUnboundedStash : Akka.Actor.IActorStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics> { }
public interface IWithUnboundedStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics> { }
public interface IWithUnrestrictedStash : Akka.Actor.IActorStash { }
public interface IWrappedMessage
{
object Message { get; }
Expand Down
Expand Up @@ -1178,14 +1178,15 @@ namespace Akka.Actor
void Become(Akka.Actor.UntypedReceive receive);
void BecomeStacked(Akka.Actor.UntypedReceive receive);
}
[System.ObsoleteAttribute("Bounded stashing is not yet implemented. Unbounded stashing will be used instead " +
"[0.7.0]")]
public interface IWithBoundedStash : Akka.Actor.IActorStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IBoundedDequeBasedMessageQueueSemantics> { }
[System.ObsoleteAttribute("Use `IWithStash` with a configured BoundedDeque-based mailbox instead.")]
public interface IWithBoundedStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IBoundedDequeBasedMessageQueueSemantics> { }
public interface IWithStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IDequeBasedMessageQueueSemantics> { }
public interface IWithTimers
{
Akka.Actor.ITimerScheduler Timers { get; set; }
}
public interface IWithUnboundedStash : Akka.Actor.IActorStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics> { }
public interface IWithUnboundedStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics> { }
public interface IWithUnrestrictedStash : Akka.Actor.IActorStash { }
public interface IWrappedMessage
{
object Message { get; }
Expand Down
Expand Up @@ -209,7 +209,7 @@ namespace Akka.Persistence
{
public static Akka.Persistence.DiscardToDeadLetterStrategy Instance { get; }
}
public abstract class Eventsourced : Akka.Actor.ActorBase, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics>, Akka.Persistence.IPersistenceRecovery, Akka.Persistence.IPersistenceStash, Akka.Persistence.IPersistentIdentity
public abstract class Eventsourced : Akka.Actor.ActorBase, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics>, Akka.Persistence.IPersistenceRecovery, Akka.Persistence.IPersistenceStash, Akka.Persistence.IPersistentIdentity
{
public static readonly System.Func<Akka.Actor.Envelope, bool> UnstashFilterPredicate;
protected Eventsourced() { }
Expand Down Expand Up @@ -270,7 +270,7 @@ namespace Akka.Persistence
{
Akka.Persistence.Recovery Recovery { get; }
}
public interface IPersistenceStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics>
public interface IPersistenceStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics>
{
Akka.Persistence.IStashOverflowStrategy InternalStashOverflowStrategy { get; }
}
Expand Down Expand Up @@ -844,7 +844,7 @@ namespace Akka.Persistence.Journal
protected System.Exception TryUnwrapException(System.Exception e) { }
protected abstract System.Threading.Tasks.Task<System.Collections.Immutable.IImmutableList<System.Exception>> WriteMessagesAsync(System.Collections.Generic.IEnumerable<Akka.Persistence.AtomicWrite> messages);
}
public abstract class AsyncWriteProxy : Akka.Persistence.Journal.AsyncWriteJournal, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics>
public abstract class AsyncWriteProxy : Akka.Persistence.Journal.AsyncWriteJournal, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics>
{
protected AsyncWriteProxy() { }
public Akka.Actor.IStash Stash { get; set; }
Expand Down Expand Up @@ -980,7 +980,7 @@ namespace Akka.Persistence.Journal
public System.Collections.Generic.IDictionary<string, System.Collections.Generic.LinkedList<Akka.Persistence.IPersistentRepresentation>> Update(string pid, long seqNr, System.Func<Akka.Persistence.IPersistentRepresentation, Akka.Persistence.IPersistentRepresentation> updater) { }
protected override System.Threading.Tasks.Task<System.Collections.Immutable.IImmutableList<System.Exception>> WriteMessagesAsync(System.Collections.Generic.IEnumerable<Akka.Persistence.AtomicWrite> messages) { }
}
public class PersistencePluginProxy : Akka.Actor.ActorBase, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics>
public class PersistencePluginProxy : Akka.Actor.ActorBase, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics>
{
public PersistencePluginProxy(Akka.Configuration.Config config) { }
public Akka.Actor.IStash Stash { get; set; }
Expand Down

0 comments on commit 7b1090d

Please sign in to comment.