-
Notifications
You must be signed in to change notification settings - Fork 1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #859 from aprooks/sample/guaranteedDelivery
Guaranteed delivery sample
- Loading branch information
Showing
3 changed files
with
184 additions
and
2 deletions.
There are no files selected for viewing
151 changes: 151 additions & 0 deletions
151
src/examples/PersistenceExample/GuaranteedDeliveryExampleActor.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,151 @@ | ||
using System; | ||
using System.Collections.Generic; | ||
using System.Linq; | ||
using System.Text; | ||
|
||
using Akka.Persistence; | ||
using Akka.Actor; | ||
|
||
namespace PersistenceExample | ||
{ | ||
public class Message | ||
{ | ||
public Message(string data) | ||
{ | ||
this.Data = data; | ||
} | ||
|
||
public string Data { get; private set; } | ||
} | ||
|
||
public class Confirmable | ||
{ | ||
public Confirmable(long deliveryId, string data) | ||
{ | ||
this.DeliveryId = deliveryId; | ||
this.Data = data; | ||
} | ||
|
||
|
||
public long DeliveryId { get; private set; } | ||
|
||
public string Data { get; private set; } | ||
} | ||
public class Confirmation | ||
{ | ||
public Confirmation(long deliveryId) | ||
{ | ||
this.DeliveryId = deliveryId; | ||
} | ||
|
||
public long DeliveryId { get; private set; } | ||
} | ||
[Serializable] | ||
public class Snap | ||
{ | ||
public Snap(GuaranteedDeliverySnapshot snapshot) | ||
{ | ||
this.Snapshot = snapshot; | ||
} | ||
|
||
public GuaranteedDeliverySnapshot Snapshot { get; private set; } | ||
} | ||
|
||
public class DeliveryActor : UntypedActor | ||
{ | ||
bool Confirming = true; | ||
|
||
protected override void OnReceive(object message) | ||
{ | ||
if (message == "start") | ||
{ | ||
Confirming = true; | ||
} | ||
if (message == "stop") | ||
{ | ||
Confirming = false; | ||
} | ||
if (message is Confirmable) | ||
{ | ||
var msg = message as Confirmable; | ||
if (Confirming) | ||
{ | ||
Console.WriteLine("Confirming delivery of message id: {0} and data: {1}", msg.DeliveryId, msg.Data); | ||
Context.Sender.Tell(new Confirmation(msg.DeliveryId)); | ||
} | ||
else | ||
{ | ||
Console.WriteLine("Ignoring message id: {0} and data: {1}", msg.DeliveryId, msg.Data); | ||
} | ||
} | ||
} | ||
} | ||
/// <summary> | ||
/// GuaranteedDelivery will repeat sending messages, unless confirmed by deliveryId | ||
/// | ||
/// By default, in-memory Journal is used, so this won't survive system restarts. | ||
/// </summary> | ||
public class GuaranteedDeliveryExampleActor : GuaranteedDeliveryActor | ||
{ | ||
public ActorPath DeliveryPath { get; private set; } | ||
|
||
public GuaranteedDeliveryExampleActor(ActorPath deliveryPath) | ||
{ | ||
this.DeliveryPath = deliveryPath; | ||
} | ||
|
||
public override string PersistenceId | ||
{ | ||
get { return "guaranteed-1"; } | ||
} | ||
|
||
protected override bool ReceiveRecover(object message) | ||
{ | ||
if (message is Message) | ||
{ | ||
var messageData = ((Message)message).Data; | ||
Console.WriteLine("recovered {0}",messageData); | ||
Deliver(DeliveryPath, | ||
id => | ||
{ | ||
Console.WriteLine("recovered delivery task: {0}, with deliveryId: {1}", messageData, id); | ||
return new Confirmable(id, messageData); | ||
}); | ||
|
||
} | ||
else if (message is Confirmation) | ||
{ | ||
var deliveryId = ((Confirmation)message).DeliveryId; | ||
Console.WriteLine("recovered confirmation of {0}", deliveryId); | ||
ConfirmDelivery(deliveryId); | ||
} | ||
else | ||
return false; | ||
return true; | ||
} | ||
|
||
protected override bool ReceiveCommand(object message) | ||
{ | ||
if (message == "boom") | ||
throw new Exception("Controlled devastation"); | ||
else if (message is Message) | ||
{ | ||
Persist(message as Message, m => | ||
{ | ||
Deliver(DeliveryPath, | ||
id => | ||
{ | ||
Console.WriteLine("sending: {0}, with deliveryId: {1}", m.Data, id); | ||
return new Confirmable(id, m.Data); | ||
}); | ||
}); | ||
} | ||
else if (message is Confirmation) | ||
{ | ||
Persist(message as Confirmation, m => ConfirmDelivery(m.DeliveryId)); | ||
} | ||
else return false; | ||
return true; | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters