-
Notifications
You must be signed in to change notification settings - Fork 19
/
RedisJournal.cs
153 lines (131 loc) · 6.19 KB
/
RedisJournal.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
// -----------------------------------------------------------------------
// <copyright file="RedisJournal.cs" company="Petabridge, LLC">
// Copyright (C) 2013-2021 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Configuration;
using Akka.Persistence.Journal;
using Akka.Persistence.Redis.Query;
using Akka.Util.Internal;
using StackExchange.Redis;
namespace Akka.Persistence.Redis.Journal
{
public class RedisJournal : AsyncWriteJournal
{
protected static readonly RedisPersistence Extension = RedisPersistence.Get(Context.System);
private readonly HashSet<IActorRef> _newEventsSubscriber = new HashSet<IActorRef>();
private readonly RedisSettings _settings;
private readonly JournalHelper _journalHelper;
private readonly Lazy<IDatabase> _database;
private readonly ActorSystem _system;
public IDatabase Database => _database.Value;
public bool IsClustered { get; private set; }
protected bool HasNewEventSubscribers => _newEventsSubscriber.Count != 0;
public RedisJournal(Config journalConfig)
{
_settings = RedisSettings.Create(journalConfig.WithFallback(Extension.DefaultJournalConfig));
_journalHelper = new JournalHelper(Context.System, _settings.KeyPrefix);
_system = Context.System;
_database = new Lazy<IDatabase>(() =>
{
var redisConnection = ConnectionMultiplexer.Connect(_settings.ConfigurationString);
IsClustered = redisConnection.IsClustered();
if (_settings.DatabaseFromConnectionString && !IsClustered)
{
var conf = ConfigurationOptions.Parse(_settings.ConfigurationString);
if (conf.DefaultDatabase.HasValue)
return redisConnection.GetDatabase(conf.DefaultDatabase.Value);
}
// for Redis Cluster, the database is 0 https://redis.io/topics/cluster-spec#implemented-subset
if (IsClustered)
return redisConnection.GetDatabase(0);
return redisConnection.GetDatabase(_settings.Database);
});
}
protected override bool ReceivePluginInternal(object message)
{
switch (message)
{
case SubscribeNewEvents _:
_newEventsSubscriber.Add(Sender);
Context.Watch(Sender);
return true;
}
return false;
}
public override async Task<long> ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr)
{
var highestSequenceNr =
await Database.StringGetAsync(_journalHelper.GetHighestSequenceNrKey(persistenceId, IsClustered));
return highestSequenceNr.IsNull ? 0L : (long) highestSequenceNr;
}
public override async Task ReplayMessagesAsync(
IActorContext context,
string persistenceId,
long fromSequenceNr,
long toSequenceNr,
long max,
Action<IPersistentRepresentation> recoveryCallback)
{
var journals = await Database.SortedSetRangeByScoreAsync(
_journalHelper.GetJournalKey(persistenceId, IsClustered),
fromSequenceNr,
toSequenceNr,
skip: 0L,
take: max);
foreach (var journal in journals)
recoveryCallback(_journalHelper.PersistentFromBytes(journal));
}
protected override async Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr)
{
await Database.SortedSetRemoveRangeByScoreAsync(
_journalHelper.GetJournalKey(persistenceId, IsClustered),
-1,
toSequenceNr);
}
protected override async Task<IImmutableList<Exception>> WriteMessagesAsync(IEnumerable<AtomicWrite> messages)
{
var writeTasks = messages.Select(WriteBatchAsync).ToArray();
var result = await Task<IImmutableList<Exception>>
.Factory
.ContinueWhenAll(
writeTasks,
tasks => tasks.Select(t => t.IsFaulted ? TryUnwrapException(t.Exception) : null)
.ToImmutableList());
if (HasNewEventSubscribers)
foreach (var subscriber in _newEventsSubscriber)
subscriber.Tell(NewEventAppended.Instance);
return result;
}
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
private async Task WriteBatchAsync(AtomicWrite aw)
{
var eventList = new List<SortedSetEntry>();
var payloads = aw.Payload.AsInstanceOf<IImmutableList<IPersistentRepresentation>>();
foreach (var payload in payloads)
{
var bytes = _journalHelper.PersistentToBytes(payload.WithTimestamp(DateTime.UtcNow.Ticks));
// save the payload
eventList.Add(new SortedSetEntry(bytes, payload.SequenceNr));
}
var transaction = Database.CreateTransaction();
transaction.SortedSetAddAsync(
_journalHelper.GetJournalKey(aw.PersistenceId, IsClustered),
eventList.ToArray());
// set highest sequence number key
transaction.StringSetAsync(
_journalHelper.GetHighestSequenceNrKey(aw.PersistenceId, IsClustered),
aw.HighestSequenceNr);
if (!await transaction.ExecuteAsync())
throw new Exception(
$"{nameof(WriteMessagesAsync)}: failed to write {nameof(IPersistentRepresentation)} to redis");
}
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
}
}