forked from akkadotnet/akka.net
-
Notifications
You must be signed in to change notification settings - Fork 2
/
TestActor.cs
104 lines (90 loc) · 3.58 KB
/
TestActor.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
//-----------------------------------------------------------------------
// <copyright file="TestActor.cs" company="Akka.NET Project">
// Copyright (C) 2009-2021 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2021 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------
using System.Collections.Immutable;
using System.Linq;
using Akka.Actor;
using Akka.Persistence.Journal;
namespace Akka.Persistence.TCK.Query
{
internal class TestActor : UntypedPersistentActor, IWithUnboundedStash
{
public static Props Props(string persistenceId) => Actor.Props.Create(() => new TestActor(persistenceId));
public sealed class DeleteCommand
{
public DeleteCommand(long toSequenceNr)
{
ToSequenceNr = toSequenceNr;
}
public long ToSequenceNr { get; }
}
public TestActor(string persistenceId)
{
PersistenceId = persistenceId;
}
public override string PersistenceId { get; }
protected override void OnRecover(object message)
{
}
protected override void OnCommand(object message)
{
switch (message)
{
case DeleteCommand delete:
DeleteMessages(delete.ToSequenceNr);
Become(WhileDeleting(Sender)); // need to wait for delete ACK to return
break;
case string cmd:
var sender = Sender;
Persist(cmd, e => sender.Tell($"{e}-done"));
break;
}
}
protected Receive WhileDeleting(IActorRef originalSender)
{
return message =>
{
switch (message)
{
case DeleteMessagesSuccess success:
originalSender.Tell($"{success.ToSequenceNr}-deleted");
Become(OnCommand);
Stash.UnstashAll();
break;
case DeleteMessagesFailure failure:
Log.Error(failure.Cause, "Failed to delete messages to sequence number [{0}].", failure.ToSequenceNr);
originalSender.Tell($"{failure.ToSequenceNr}-deleted-failed");
Become(OnCommand);
Stash.UnstashAll();
break;
default:
Stash.Stash();
break;
}
return true;
};
}
}
public class ColorFruitTagger : IWriteEventAdapter
{
public static IImmutableSet<string> Colors { get; } = ImmutableHashSet.Create("green", "black", "blue");
public static IImmutableSet<string> Fruits { get; } = ImmutableHashSet.Create("apple", "banana");
public string Manifest(object evt) => string.Empty;
public object ToJournal(object evt)
{
if (evt is string s)
{
var colorTags = Colors.Aggregate(ImmutableHashSet<string>.Empty, (acc, color) => s.Contains(color) ? acc.Add(color) : acc);
var fruitTags = Fruits.Aggregate(ImmutableHashSet<string>.Empty, (acc, color) => s.Contains(color) ? acc.Add(color) : acc);
var tags = colorTags.Union(fruitTags);
return tags.IsEmpty
? evt
: new Tagged(evt, tags);
}
return evt;
}
}
}