//-----------------------------------------------------------------------
// <copyright file="Replicator.cs" company="Akka.NET Project">
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------
using Akka.Actor;
using Akka.DistributedData.Internal;
using Akka.Serialization;
using Akka.Util;
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Security.Cryptography;
using Akka.Cluster;
using Akka.DistributedData.Durable;
using Akka.Event;
using Google.Protobuf;
using Gossip = Akka.DistributedData.Internal.Gossip;
using Status = Akka.DistributedData.Internal.Status;
namespace Akka.DistributedData
{
using Digest = ByteString;
/// <summary>
/// <para>
/// A replicated in-memory data store supporting low latency and high availability
/// requirements.
///
/// The <see cref="Replicator"/> actor takes care of direct replication and gossip based
/// dissemination of Conflict Free Replicated Data Types (CRDTs) to replicas in the
/// cluster.
/// The data types must be convergent CRDTs and implement <see cref="IReplicatedData{T}"/>, i.e.
/// they provide a monotonic merge function and the state changes always converge.
///
/// You can use your own custom <see cref="IReplicatedData{T}"/> or <see cref="IDeltaReplicatedData{T,TDelta}"/> types,
/// and several types are provided by this package, such as:
/// </para>
/// <list type="bullet">
/// <item>
/// <term>Counters</term>
/// <description><see cref="GCounter"/>, <see cref="PNCounter"/></description>
/// </item>
/// <item>
/// <term>Registers</term>
/// <description><see cref="LWWRegister{T}"/>, <see cref="Flag"/></description>
/// </item>
/// <item>
/// <term>Sets</term>
/// <description><see cref="GSet{T}"/>, <see cref="ORSet{T}"/></description>
/// </item>
/// <item>
/// <term>Maps</term>
/// <description><see cref="ORDictionary{TKey,TValue}"/>, <see cref="ORMultiValueDictionary{TKey,TValue}"/>, <see cref="LWWDictionary{TKey,TValue}"/>, <see cref="PNCounterDictionary{TKey}"/></description>
/// </item>
/// </list>
/// <para>
/// For a good introduction to the CRDT subject, watch the
/// <a href="http://www.ustream.tv/recorded/61448875">The Final Causal Frontier</a>
/// and <a href="http://vimeo.com/43903960">Eventually Consistent Data Structures</a>
/// talks by Sean Cribbs and the
/// <a href="http://research.microsoft.com/apps/video/dl.aspx?id=153540">talk by Mark Shapiro</a>,
/// and read the excellent paper <a href="http://hal.upmc.fr/docs/00/55/55/88/PDF/techreport.pdf">
/// A comprehensive study of Convergent and Commutative Replicated Data Types</a>
/// by Mark Shapiro et al.
/// </para>
/// <para>
/// The <see cref="Replicator"/> actor must be started on each node in the cluster, or group of
/// nodes tagged with a specific role. It communicates with other <see cref="Replicator"/> instances
/// with the same path (without address) that are running on other nodes . For convenience it
/// can be used with the <see cref="DistributedData"/> extension but it can also be started as an ordinary
/// actor using the <see cref="Props"/>. If it is started as an ordinary actor it is important
/// that it is given the same name, started on same path, on all nodes.
/// </para>
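/// <para>
/// A minimal sketch of obtaining the replicator through the <see cref="DistributedData"/> extension
/// (the usual entry point):
/// <code>
/// var replicator = DistributedData.Get(system).Replicator;
/// </code>
/// </para>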
/// <para>
/// <a href="paper http://arxiv.org/abs/1603.01529">Delta State Replicated Data Types</a>
/// is supported. delta-CRDT is a way to reduce the need for sending the full state
/// for updates. For example adding element 'c' and 'd' to set {'a', 'b'} would
/// result in sending the delta {'c', 'd'} and merge that with the state on the
/// receiving side, resulting in set {'a', 'b', 'c', 'd'}.
/// </para>
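/// <para>
/// A sketch of two updates that can be propagated as small deltas instead of the full set
/// (assuming a <c>replicator</c> <see cref="IActorRef"/> and a <c>cluster</c> instance are in
/// scope; the names are illustrative):
/// <code>
/// var key = new ORSetKey&lt;string&gt;("letters");
/// replicator.Tell(Dsl.Update(key, ORSet&lt;string&gt;.Empty, WriteLocal.Instance, s => s.Add(cluster, "c")));
/// replicator.Tell(Dsl.Update(key, ORSet&lt;string&gt;.Empty, WriteLocal.Instance, s => s.Add(cluster, "d")));
/// </code>
/// </para>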
/// <para>
/// The protocol for replicating the deltas supports causal consistency if the data type
/// is marked with <see cref="IRequireCausualDeliveryOfDeltas"/>. Otherwise it is only eventually
/// consistent. Without causal consistency, if elements 'c' and 'd' are
/// added in two separate <see cref="Update"/> operations, these deltas may occasionally be propagated
/// to nodes in a different order than the causal order of the updates. In this example that
/// means the set {'a', 'b', 'd'} may be seen before element 'c' is seen. Eventually
/// it will be {'a', 'b', 'c', 'd'}.
/// </para>
/// <para>
/// == Update ==
///
/// To modify and replicate a <see cref="IReplicatedData{T}"/> value you send a <see cref="Update"/> message
/// to the local <see cref="Replicator"/>.
/// The current data value for the `key` of the <see cref="Update"/> is passed as a parameter to the `modify`
/// function of the <see cref="Update"/>. The function is expected to return the new value of the data, which
/// will then be replicated according to the given consistency level.
///
/// The `modify` function is called by the `Replicator` actor and must therefore be a pure
/// function that only uses the data parameter and stable fields from the enclosing scope. It must,
/// for example, not access the `Sender` reference of an enclosing actor.
///
/// <see cref="Update"/> is intended to only be sent from an actor running in the same local `ActorSystem` as
/// the <see cref="Replicator"/>, because the `modify` function is typically not serializable.
///
/// You supply a write consistency level which has the following meaning:
/// <list type="bullet">
/// <item>
/// <term><see cref="WriteLocal"/></term>
/// <description>
/// The value will only be written immediately to the local replica, and will later be disseminated with gossip.
/// </description>
/// </item>
/// <item>
/// <term><see cref="WriteTo"/></term>
/// <description>
/// The value will immediately be written to at least <see cref="WriteTo.Count"/> replicas, including the local replica.
/// </description>
/// </item>
/// <item>
/// <term><see cref="WriteMajority"/></term>
/// <description>
/// The value will immediately be written to a majority of replicas, i.e. at least `N/2 + 1` replicas,
/// where N is the number of nodes in the cluster (or cluster role group).
/// </description>
/// </item>
/// <item>
/// <term><see cref="WriteAll"/></term>
/// <description>
/// The value will immediately be written to all nodes in the cluster (or all nodes in the cluster role group).
/// </description>
/// </item>
/// </list>
///
/// In reply to the <see cref="Update"/>, an <see cref="UpdateSuccess"/> is sent to the sender of the
/// <see cref="Update"/> if the value was successfully replicated according to the supplied consistency
/// level within the supplied timeout. Otherwise an <see cref="IUpdateFailure"/> subclass is
/// sent back. Note that an <see cref="UpdateTimeout"/> reply does not mean that the update completely failed
/// or was rolled back. It may still have been replicated to some nodes, and will eventually
/// be replicated to all nodes with the gossip protocol.
///
/// You will always see your own writes. For example if you send two <see cref="Update"/> messages
/// changing the value of the same `key`, the `modify` function of the second message will
/// see the change that was performed by the first <see cref="Update"/> message.
///
/// In the <see cref="Update"/> message you can pass an optional request context, which the <see cref="Replicator"/>
/// does not care about, but is included in the reply messages. This is a convenient
/// way to pass contextual information (e.g. original sender) without having to use <see cref="Ask"/>
/// or local correlation data structures.
/// </para>
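/// <para>
/// A minimal update sketch (assuming <c>replicator</c> and <c>cluster</c> references are in scope;
/// the names are illustrative):
/// <code>
/// var counterKey = new GCounterKey("counter");
/// replicator.Tell(Dsl.Update(counterKey, GCounter.Empty, WriteLocal.Instance,
///     c => c.Increment(cluster, 1)));
/// // reply: UpdateSuccess on success, otherwise an IUpdateFailure such as UpdateTimeout
/// </code>
/// </para>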
/// <para>
/// == Get ==
///
/// To retrieve the current value of a data entry you send a <see cref="Get"/> message to the
/// <see cref="Replicator"/>. You supply a consistency level which has the following meaning:
/// <list type="bullet">
/// <item>
/// <term><see cref="ReadLocal"/></term>
/// <description>The value will only be read from the local replica.</description>
/// </item>
/// <item>
/// <term><see cref="ReadFrom"/></term>
/// <description>The value will be read and merged from <see cref="ReadFrom.N"/> replicas, including the local replica.</description>
/// </item>
/// <item>
/// <term><see cref="ReadMajority"/></term>
/// <description>
/// The value will be read and merged from a majority of replicas, i.e. at least `N/2 + 1` replicas, where N is the number of nodes in the cluster (or cluster role group).
/// </description>
/// </item>
/// <item>
/// <term><see cref="ReadAll"/></term>
/// <description>The value will be read and merged from all nodes in the cluster (or all nodes in the cluster role group).</description>
/// </item>
/// </list>
///
/// In reply to the <see cref="Get"/>, a <see cref="GetSuccess"/> is sent to the sender of the
/// <see cref="Get"/> if the value was successfully retrieved according to the supplied consistency
/// level within the supplied timeout. Otherwise a <see cref="GetFailure"/> is sent.
/// If the key does not exist the reply will be <see cref="NotFound"/>.
///
/// You will always read your own writes. For example, if you send an <see cref="Update"/> message
/// followed by a <see cref="Get"/> of the same `key`, the <see cref="Get"/> will retrieve the change that was
/// performed by the preceding <see cref="Update"/> message. However, the order of the reply messages is
/// not defined, i.e. in the previous example you may receive the <see cref="GetSuccess"/> before
/// the <see cref="UpdateSuccess"/>.
///
/// In the <see cref="Get"/> message you can pass an optional request context in the same way as for the
/// <see cref="Update"/> message, described above. For example the original sender can be passed and replied
/// to after receiving and transforming <see cref="GetSuccess"/>.
/// </para>
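/// <para>
/// A minimal read sketch, using the <c>counterKey</c> from the update example above (illustrative):
/// <code>
/// replicator.Tell(Dsl.Get(counterKey, ReadLocal.Instance));
/// // in the requesting actor, the reply can be pattern matched:
/// // GetSuccess g when g.Key.Equals(counterKey) => g.Get(counterKey).Value
/// // NotFound => the key does not exist
/// // GetFailure => the consistency level could not be met in time
/// </code>
/// </para>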
/// <para>
/// == Subscribe ==
///
/// You may also register interest in change notifications by sending a <see cref="Subscribe"/>
/// message to the <see cref="Replicator"/>. It will send <see cref="Changed"/> messages to the registered
/// subscriber when the data for the subscribed key is updated. Subscribers will be notified
/// periodically with the configured `notify-subscribers-interval`, and it is also possible to
/// send an explicit <see cref="FlushChanges"/> message to the <see cref="Replicator"/> to notify the subscribers
/// immediately.
///
/// A subscriber is automatically removed when it terminates. A subscriber can
/// also be deregistered with the <see cref="Unsubscribe"/> message.
/// </para>
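/// <para>
/// A minimal subscription sketch (illustrative):
/// <code>
/// replicator.Tell(Dsl.Subscribe(counterKey, Self));
/// // the subscriber then periodically receives:
/// // Changed c when c.Key.Equals(counterKey) => c.Get(counterKey) is the current value
/// </code>
/// </para>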
/// <para>
/// == Delete ==
///
/// A data entry can be deleted by sending a <see cref="Delete"/> message to the local
/// <see cref="Replicator"/>. In reply to the <see cref="Delete"/>, a <see cref="DeleteSuccess"/> is sent to
/// the sender of the <see cref="Delete"/> if the value was successfully deleted according to the supplied
/// consistency level within the supplied timeout. Otherwise a <see cref="ReplicationDeleteFailure"/>
/// is sent. Note that <see cref="ReplicationDeleteFailure"/> does not mean that the delete completely failed or
/// was rolled back. It may still have been replicated to some nodes, and may eventually be replicated
/// to all nodes.
///
/// A deleted key cannot be reused, but it is still recommended to delete unused
/// data entries because that reduces the replication overhead when new nodes join the cluster.
/// Subsequent <see cref="Delete"/>, <see cref="Update"/> and <see cref="Get"/> requests will be replied with <see cref="DataDeleted"/>.
/// Subscribers will receive <see cref="Deleted"/>.
///
/// In the <see cref="Delete"/> message you can pass an optional request context in the same way as for the
/// <see cref="Update"/> message, described above. For example the original sender can be passed and replied
/// to after receiving and transforming <see cref="DeleteSuccess"/>.
/// </para>
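/// <para>
/// A minimal deletion sketch (illustrative):
/// <code>
/// replicator.Tell(Dsl.Delete(counterKey, WriteLocal.Instance));
/// // reply: DeleteSuccess, ReplicationDeleteFailure, or DataDeleted if it was already deleted
/// </code>
/// </para>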
/// <para>
/// == CRDT Garbage ==
///
/// One thing that can be problematic with CRDTs is that some data types accumulate history (garbage).
/// For example a <see cref="GCounter"/> keeps track of one counter per node. If a <see cref="GCounter"/> has been updated
/// from one node it will associate the identifier of that node forever. That can become a problem
/// for long running systems with many cluster nodes being added and removed. To solve this problem
/// the <see cref="Replicator"/> performs pruning of data associated with nodes that have been removed from the
/// cluster. Data types that need pruning have to implement <see cref="IRemovedNodePruning{T}"/>. The pruning consists
/// of several steps:
/// <list type="">
/// <item>When a node is removed from the cluster it is first important that all updates that were
/// done by that node are disseminated to all other nodes. The pruning will not start before the
/// <see cref="ReplicatorSettings.MaxPruningDissemination"/> duration has elapsed. The time measurement is stopped when any
/// replica is unreachable, but it's still recommended to configure this with a certain margin.
/// It should be on the order of minutes.</item>
/// <item>The nodes are ordered by their address and the node ordered first is called the leader.
/// The leader initiates the pruning by adding a <see cref="PruningInitialized"/> marker in the data envelope.
/// This is gossiped to all other nodes and they mark it as seen when they receive it.</item>
/// <item>When the leader sees that all other nodes have seen the <see cref="PruningInitialized"/> marker
/// the leader performs the pruning and changes the marker to <see cref="PruningPerformed"/> so that nobody
/// else will redo the pruning. The data envelope with this pruning state is a CRDT itself.
/// The pruning is typically performed by "moving" the part of the data associated with
/// the removed node to the leader node. For example, a <see cref="GCounter"/> is a `Map` with the node as key
/// and the counts done by that node as value. When pruning the value of the removed node is
/// moved to the entry owned by the leader node. See <see cref="IRemovedNodePruning{T}.Prune"/>.</item>
/// <item>Thereafter the data is always cleared from parts associated with the removed node so that
/// it does not come back when merging. See <see cref="IRemovedNodePruning{T}.PruningCleanup"/></item>
/// <item>When another `maxPruningDissemination` duration has elapsed after pruning the last entry from the
/// removed node, the <see cref="PruningPerformed"/> markers in the data envelope are collapsed into a
/// single tombstone entry, for efficiency. Clients may continue to use old data and therefore
/// all data is always cleared from parts associated with tombstoned nodes.</item>
/// </list>
/// </para>
/// </summary>
internal sealed class Replicator : UntypedActor, IWithUnboundedStash
{
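/// <summary>
/// Creates the <see cref="Actor.Props"/> for a <see cref="Replicator"/> with the given settings,
/// deployed locally and running on the configured dispatcher.
/// </summary>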
public static Props Props(ReplicatorSettings settings) =>
Actor.Props.Create(() => new Replicator(settings)).WithDeploy(Deploy.Local).WithDispatcher(settings.Dispatcher);
private static readonly Digest DeletedDigest = ByteString.Empty;
private static readonly Digest LazyDigest = ByteString.CopyFrom(new byte[] { 0 });
private static readonly Digest NotFoundDigest = ByteString.CopyFrom(new byte[] { 255 });
private static readonly DataEnvelope DeletedEnvelope = new DataEnvelope(DeletedData.Instance);
private readonly ReplicatorSettings _settings;
private readonly Cluster.Cluster _cluster;
private readonly Address _selfAddress;
private readonly UniqueAddress _selfUniqueAddress;
private readonly ICancelable _gossipTask;
private readonly ICancelable _notifyTask;
private readonly ICancelable _pruningTask;
private readonly ICancelable _clockTask;
private readonly Serializer _serializer;
private readonly long _maxPruningDisseminationNanos;
/// <summary>
/// Cluster nodes, doesn't contain selfAddress.
/// </summary>
private ImmutableSortedSet<Address> _nodes = ImmutableSortedSet<Address>.Empty;
// cluster members sorted by age, oldest first, doesn't contain selfAddress, doesn't contain joining and weaklyUp
// only used when prefer-oldest is enabled
private ImmutableSortedSet<Member> _membersByAge = ImmutableSortedSet<Member>.Empty.WithComparer(Member.AgeOrdering);
private ImmutableSortedSet<Address> AllNodes => _nodes.Union(_weaklyUpNodes);
/// <summary>
/// Cluster weaklyUp nodes; doesn't contain joining nodes or selfAddress
/// </summary>
private ImmutableSortedSet<Address> _weaklyUpNodes = ImmutableSortedSet<Address>.Empty;
/// <summary>
/// cluster joining nodes, doesn't contain selfAddress
/// </summary>
private ImmutableSortedSet<Address> _joiningNodes = ImmutableSortedSet<Address>.Empty;
/// <summary>
/// cluster exiting nodes, doesn't contain selfAddress
/// </summary>
private ImmutableSortedSet<Address> _exitingNodes = ImmutableSortedSet<Address>.Empty;
private ImmutableDictionary<UniqueAddress, long> _removedNodes = ImmutableDictionary<UniqueAddress, long>.Empty;
private ImmutableDictionary<UniqueAddress, long> _pruningPerformed = ImmutableDictionary<UniqueAddress, long>.Empty;
private ImmutableHashSet<UniqueAddress> _tombstonedNodes = ImmutableHashSet<UniqueAddress>.Empty;
/// <summary>
/// All nodes sorted with the leader first
/// </summary>
private ImmutableSortedSet<Member> _leader = ImmutableSortedSet<Member>.Empty.WithComparer(Member.LeaderStatusOrdering);
private bool IsLeader => !_leader.IsEmpty && _leader.First().Address == _selfAddress;
private bool IsKnownNode(Address node) => _nodes.Contains(node) || _weaklyUpNodes.Contains(node) ||
_joiningNodes.Contains(node) || _selfAddress == node;
/// <summary>
/// Pruning timeouts are based on a clock that is only advanced when all nodes are reachable.
/// </summary>
private long _previousClockTime;
private long _allReachableClockTime = 0;
private ImmutableHashSet<Address> _unreachable = ImmutableHashSet<Address>.Empty;
/// <summary>
/// The actual data.
/// </summary>
private ImmutableDictionary<string, (DataEnvelope envelope, Digest digest)> _dataEntries = ImmutableDictionary<string, (DataEnvelope, Digest)>.Empty;
/// <summary>
/// Keys that have changed; a Changed event is published to subscribers on FlushChanges.
/// </summary>
private ImmutableHashSet<string> _changed = ImmutableHashSet<string>.Empty;
/// <summary>
/// For splitting up gossip in chunks.
/// </summary>
private long _statusCount = 0;
private int _statusTotChunks = 0;
private readonly Dictionary<string, HashSet<IActorRef>> _subscribers = new Dictionary<string, HashSet<IActorRef>>();
private readonly Dictionary<string, HashSet<IActorRef>> _newSubscribers = new Dictionary<string, HashSet<IActorRef>>();
private ImmutableDictionary<string, IKey> _subscriptionKeys = ImmutableDictionary<string, IKey>.Empty;
private readonly ILoggingAdapter _log;
private readonly bool _hasDurableKeys;
private readonly IImmutableSet<string> _durableKeys;
private readonly IImmutableSet<string> _durableWildcards;
private readonly IActorRef _durableStore;
private readonly DeltaPropagationSelector _deltaPropagationSelector;
private readonly ICancelable _deltaPropagationTask;
private readonly int _maxDeltaSize;
private int _count;
private DateTime _startTime;
public Replicator(ReplicatorSettings settings)
{
_settings = settings;
_cluster = Cluster.Cluster.Get(Context.System);
_selfAddress = _cluster.SelfAddress;
_selfUniqueAddress = _cluster.SelfUniqueAddress;
_log = Context.GetLogger();
_maxDeltaSize = settings.MaxDeltaSize;
if (_cluster.IsTerminated) throw new ArgumentException("Cluster node must not be terminated");
if (!string.IsNullOrEmpty(_settings.Role) && !_cluster.SelfRoles.Contains(_settings.Role))
throw new ArgumentException($"The cluster node {_selfAddress} does not have the role {_settings.Role}");
_gossipTask = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(_settings.GossipInterval, _settings.GossipInterval, Self, GossipTick.Instance, Self);
_notifyTask = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(_settings.NotifySubscribersInterval, _settings.NotifySubscribersInterval, Self, FlushChanges.Instance, Self);
_pruningTask = _settings.PruningInterval != TimeSpan.Zero
? Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(_settings.PruningInterval, _settings.PruningInterval, Self, RemovedNodePruningTick.Instance, Self)
: null;
_clockTask = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(_settings.GossipInterval, _settings.GossipInterval, Self, ClockTick.Instance, Self);
_serializer = Context.System.Serialization.FindSerializerForType(typeof(DataEnvelope));
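// a .NET tick is 100 nanoseconds, hence the factor of 100 in the conversions below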
_maxPruningDisseminationNanos = _settings.MaxPruningDissemination.Ticks * 100;
_previousClockTime = DateTime.UtcNow.Ticks * 100;
_hasDurableKeys = settings.DurableKeys.Count > 0;
var durableKeysBuilder = ImmutableHashSet<string>.Empty.ToBuilder();
var durableWildcardsBuilder = ImmutableHashSet<string>.Empty.ToBuilder();
foreach (var key in settings.DurableKeys)
{
if (key.EndsWith("*"))
durableWildcardsBuilder.Add(key.Substring(0, key.Length - 1));
else
durableKeysBuilder.Add(key);
}
_durableKeys = durableKeysBuilder.ToImmutable();
_durableWildcards = durableWildcardsBuilder.ToImmutable();
_durableStore = _hasDurableKeys
? Context.Watch(Context.ActorOf(_settings.DurableStoreProps.WithDeploy(Deploy.Local), "durableStore"))
: Context.System.DeadLetters;
_deltaPropagationSelector = new ReplicatorDeltaPropagationSelector(this);
// Derive the deltaPropagationInterval from the gossipInterval.
// Normally the delta is propagated to all nodes within the gossip tick, so that
// full state gossip is not needed.
var deltaPropagationInterval = new TimeSpan(Math.Max(
(_settings.GossipInterval.Ticks / _deltaPropagationSelector.GossipInternalDivisor),
TimeSpan.TicksPerMillisecond * 200));
_deltaPropagationTask = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(deltaPropagationInterval, deltaPropagationInterval, Self, DeltaPropagationTick.Instance, Self);
if (_hasDurableKeys)
{
_count = 0;
_startTime = DateTime.UtcNow;
Become(Load);
}
else Become(NormalReceive);
}
private IImmutableList<Address> NodesForReadWrite(bool excludeExiting)
{
if (excludeExiting && !_exitingNodes.IsEmpty)
{
if (_settings.PreferOldest)
return _membersByAge.Where(i => !_exitingNodes.Contains(i.Address)).Select(i => i.Address).ToImmutableList();
else
return _nodes.Except(_exitingNodes).ToImmutableList();
}
else
{
if (_settings.PreferOldest)
return _membersByAge.Select(i => i.Address).ToImmutableList();
else
return _nodes.ToImmutableList();
}
}
protected override void PreStart()
{
if (_hasDurableKeys) _durableStore.Tell(LoadAll.Instance);
// not using LeaderChanged/RoleLeaderChanged because here we need one node independent of data center
_cluster.Subscribe(Self, ClusterEvent.SubscriptionInitialStateMode.InitialStateAsEvents,
typeof(ClusterEvent.IMemberEvent), typeof(ClusterEvent.IReachabilityEvent));
}
protected override void PostStop()
{
_cluster.Unsubscribe(Self);
_gossipTask.Cancel();
_deltaPropagationTask.Cancel();
_notifyTask.Cancel();
_pruningTask?.Cancel();
_clockTask.Cancel();
}
protected override SupervisorStrategy SupervisorStrategy() => new OneForOneStrategy(e =>
{
var fromDurableStore = Equals(Sender, _durableStore) && !Equals(Sender, Context.System.DeadLetters);
if ((e is LoadFailedException || e is ActorInitializationException) && fromDurableStore)
{
_log.Error(e,
"Stopping distributed-data Replicator due to load or startup failure in durable store, caused by: {0}",
e.Message);
Context.Stop(Self);
return Directive.Stop;
}
else return Actor.SupervisorStrategy.DefaultDecider.Decide(e);
});
private bool Load(object message)
{
switch (message)
{
case LoadData load:
_count += load.Data.Count;
foreach (var entry in load.Data)
{
var envelope = entry.Value.DataEnvelope;
var newEnvelope = Write(entry.Key, envelope);
if (!ReferenceEquals(newEnvelope, envelope))
{
_durableStore.Tell(new Store(entry.Key, new DurableDataEnvelope(newEnvelope), null));
}
}
return true;
case LoadAllCompleted _:
_log.Debug("Loading {0} entries from durable store took {1} ms", _count,
(DateTime.UtcNow - _startTime).TotalMilliseconds);
Become(NormalReceive);
Stash.UnstashAll();
Self.Tell(FlushChanges.Instance);
return true;
case GetReplicaCount _:
// 0 until durable data has been loaded, used by test
Sender.Tell(new ReplicaCount(0));
return true;
// ignore scheduled ticks when loading durable data
case RemovedNodePruningTick _:
case FlushChanges _:
case GossipTick _:
// Ignored
return true;
// ignore gossip and replication when loading durable data
case Read _:
case Write _:
case Status _:
case Gossip _:
_log.Debug("ignoring message [{0}] when loading durable data", message.GetType());
return true;
case ClusterEvent.IClusterDomainEvent msg:
return NormalReceive(msg);
default:
Stash.Stash();
return true;
}
}
protected override void OnReceive(object message)
{
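// never invoked: the constructor immediately switches behavior via Become(Load) or Become(NormalReceive)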
throw new NotImplementedException();
}
private bool NormalReceive(object message)
{
switch (message)
{
case IDestinationSystemUid msg:
if (msg.ToSystemUid.HasValue && msg.ToSystemUid != _selfUniqueAddress.Uid)
{
// When restarting a node with same host:port it is possible that a Replicator on another node
// is sending messages to the restarted node even if it hasn't joined the same cluster.
// Therefore we check that the message was intended for this incarnation and otherwise
// it is discarded.
_log.Info("Ignoring message [{0}] from [{1}] intended for system uid [{2}], self uid is [{3}]",
Logging.SimpleName(msg),
Sender,
msg.ToSystemUid,
_selfUniqueAddress.Uid);
}
else
{
switch (msg)
{
case Status s:
ReceiveStatus(s.Digests, s.Chunk, s.TotalChunks); return true;
case Gossip g:
ReceiveGossip(g.UpdatedData, g.SendBack); return true;
}
}
return true;
case ISendingSystemUid msg:
if (msg.FromNode != null && !IsKnownNode(msg.FromNode.Address))
{
// When restarting a node with same host:port it is possible that a Replicator on another node
// is sending messages to the restarted node even if it hasn't joined the same cluster.
// Therefore we check that the message was from a known cluster member
_log.Info("Ignoring message [{0}] from [{1}] unknown node [{2}]", Logging.SimpleName(msg), Sender, msg.FromNode);
}
else
{
switch (msg)
{
case Read r: ReceiveRead(r.Key); return true;
case Write w: ReceiveWrite(w.Key, w.Envelope); return true;
case DeltaPropagation d: ReceiveDeltaPropagation(msg.FromNode, d.ShouldReply, d.Deltas); return true;
}
}
return true;
case Get g: ReceiveGet(g.Key, g.Consistency, g.Request); return true;
case Update msg: ReceiveUpdate(msg.Key, msg.Modify, msg.Consistency, msg.Request); return true;
case ReadRepair rr: ReceiveReadRepair(rr.Key, rr.Envelope); return true;
case FlushChanges _: ReceiveFlushChanges(); return true;
case DeltaPropagationTick _: ReceiveDeltaPropagationTick(); return true;
case GossipTick _: ReceiveGossipTick(); return true;
case ClockTick _: ReceiveClockTick(); return true;
case Subscribe s: ReceiveSubscribe(s.Key, s.Subscriber); return true;
case Unsubscribe u: ReceiveUnsubscribe(u.Key, u.Subscriber); return true;
case Terminated t: ReceiveTerminated(t.ActorRef); return true;
case ClusterEvent.MemberJoined m: ReceiveMemberJoining(m.Member); return true;
case ClusterEvent.MemberWeaklyUp m: ReceiveMemberWeaklyUp(m.Member); return true;
case ClusterEvent.MemberUp m: ReceiveMemberUp(m.Member); return true;
case ClusterEvent.MemberExited m: ReceiveMemberExiting(m.Member); return true;
case ClusterEvent.MemberRemoved m: ReceiveMemberRemoved(m.Member); return true;
case ClusterEvent.IMemberEvent m: ReceiveOtherMemberEvent(m.Member); return true;
case ClusterEvent.UnreachableMember u: ReceiveUnreachable(u.Member); return true;
case ClusterEvent.ReachableMember r: ReceiveReachable(r.Member); return true;
case GetKeyIds _: ReceiveGetKeyIds(); return true;
case Delete d: ReceiveDelete(d.Key, d.Consistency, d.Request); return true;
case RemovedNodePruningTick r: ReceiveRemovedNodePruningTick(); return true;
case GetReplicaCount _: ReceiveGetReplicaCount(); return true;
}
return false;
}
private void ReceiveGet(IKey key, IReadConsistency consistency, object req)
{
var localValue = GetData(key.Id);
_log.Debug("Received get for key {0}, local value {1}, consistency: {2}", key.Id, localValue, consistency);
if (IsLocalGet(consistency))
{
if (localValue == null) Sender.Tell(new NotFound(key, req));
else if (localValue.Data is DeletedData) Sender.Tell(new DataDeleted(key, req));
else Sender.Tell(new GetSuccess(key, req, localValue.Data));
}
else
{
var excludeExiting = consistency is ReadMajorityPlus || consistency is ReadAll;
Context.ActorOf(ReadAggregator.Props(key, consistency, req, NodesForReadWrite(excludeExiting), _unreachable, !_settings.PreferOldest, localValue, Sender)
.WithDispatcher(Context.Props.Dispatcher));
}
}
private bool IsLocalGet(IReadConsistency consistency)
{
if (consistency is ReadLocal) return true;
if (consistency is ReadAll || consistency is ReadMajority) return _nodes.Count == 0;
return false;
}
private void ReceiveRead(string key)
{
Sender.Tell(new ReadResult(GetData(key)));
}
private bool MatchingRole(Member m) => string.IsNullOrEmpty(_settings.Role) || m.HasRole(_settings.Role);
private void ReceiveUpdate(IKey key, Func<IReplicatedData, IReplicatedData> modify, IWriteConsistency consistency, object request)
{
var localValue = GetData(key.Id);
try
{
DataEnvelope envelope;
IReplicatedData delta;
if (localValue == null)
{
var d = modify(null);
if (d is IDeltaReplicatedData withDelta)
{
envelope = new DataEnvelope(withDelta.ResetDelta());
delta = withDelta.Delta ?? DeltaPropagation.NoDeltaPlaceholder;
}
else
{
envelope = new DataEnvelope(d);
delta = null;
}
}
else if (localValue.Data is DeletedData)
{
_log.Debug("Received update for deleted key {0}", key);
Sender.Tell(new DataDeleted(key, request));
return;
}
else
{
var d = modify(localValue.Data);
if (d is IDeltaReplicatedData withDelta)
{
envelope = localValue.Merge(withDelta.ResetDelta());
delta = withDelta.Delta ?? DeltaPropagation.NoDeltaPlaceholder;
}
else
{
envelope = localValue.Merge(d);
delta = null;
}
}
_log.Debug("Received Update for key {0}", key);
// handle the delta
if (delta != null)
{
_deltaPropagationSelector.Update(key.Id, delta);
}
// note that it's important to do deltaPropagationSelector.update before setData,
// so that the latest delta version is used
var newEnvelope = SetData(key.Id, envelope);
var durable = IsDurable(key.Id);
if (IsLocalUpdate(consistency))
{
if (durable)
{
var reply = new StoreReply(
successMessage: new UpdateSuccess(key, request),
failureMessage: new StoreFailure(key, request),
replyTo: Sender);
_durableStore.Tell(new Store(key.Id, new DurableDataEnvelope(newEnvelope), reply));
}
else Sender.Tell(new UpdateSuccess(key, request));
}
else
{
DataEnvelope writeEnvelope;
Delta writeDelta;
if (delta == null || Equals(delta, DeltaPropagation.NoDeltaPlaceholder))
{
writeEnvelope = newEnvelope;
writeDelta = null;
}
else if (delta is IRequireCausualDeliveryOfDeltas)
{
var version = _deltaPropagationSelector.CurrentVersion(key.Id);
writeEnvelope = newEnvelope;
writeDelta = new Delta(newEnvelope.WithData(delta), version, version);
}
else
{
writeEnvelope = newEnvelope.WithData(delta);
writeDelta = null;
}
// When RequiresCausalDeliveryOfDeltas use a deterministic order so that sequence numbers
// of subsequent updates are in sync on the destination nodes.
// The order is also kept when prefer-oldest is enabled.
var shuffle = !(_settings.PreferOldest || (writeDelta?.RequiresCausalDeliveryOfDeltas) == true);
var excludeExiting = consistency is WriteMajorityPlus || consistency is WriteAll;
var writeAggregator = Context.ActorOf(WriteAggregator
.Props(key, writeEnvelope, writeDelta, consistency, request, NodesForReadWrite(excludeExiting), _unreachable, shuffle, Sender, durable)
.WithDispatcher(Context.Props.Dispatcher));
if (durable)
{
var reply = new StoreReply(
successMessage: new UpdateSuccess(key, request),
failureMessage: new StoreFailure(key, request),
replyTo: writeAggregator);
_durableStore.Tell(new Store(key.Id, new DurableDataEnvelope(envelope), reply));
}
}
}
catch (Exception ex)
{
_log.Debug("Received update for key {0}, failed {1}", key, ex.Message);
Sender.Tell(new ModifyFailure(key, "Update failed: " + ex.Message, ex, request));
}
}
private bool IsDurable(string key) =>
_durableKeys.Contains(key) || (_durableWildcards.Count > 0 && _durableWildcards.Any(key.StartsWith));
private bool IsLocalUpdate(IWriteConsistency consistency)
{
if (consistency is WriteLocal) return true;
if (consistency is WriteAll || consistency is WriteMajority) return _nodes.Count == 0;
return false;
}
private void ReceiveWrite(string key, DataEnvelope envelope)
{
WriteAndStore(key, envelope, reply: true);
}
private void WriteAndStore(string key, DataEnvelope writeEnvelope, bool reply)
{
var newEnvelope = Write(key, writeEnvelope);
if (newEnvelope != null)
{
if (IsDurable(key))
{
var storeReply = reply
? new StoreReply(WriteAck.Instance, WriteNack.Instance, Sender)
: null;
_durableStore.Tell(new Store(key, new DurableDataEnvelope(newEnvelope), storeReply));
}
else if (reply) Sender.Tell(WriteAck.Instance);
}
else if (reply) Sender.Tell(WriteNack.Instance);
}
private DataEnvelope Write(string key, DataEnvelope writeEnvelope)
{
switch (GetData(key))
{
case DataEnvelope envelope when envelope.Equals(writeEnvelope):
return envelope;
case DataEnvelope envelope when envelope.Data is DeletedData:
// already deleted
return DeletedEnvelope;
case DataEnvelope envelope:
try
{
// DataEnvelope will mergeDelta when needed
var merged = envelope.Merge(writeEnvelope).AddSeen(_selfAddress);
return SetData(key, merged);
}
catch (ArgumentException e)
{
_log.Warning("Couldn't merge [{0}] due to: {1}", key, e.Message);
return null;
}
default:
// no existing data for the key
if (writeEnvelope.Data is IReplicatedDelta withDelta)
writeEnvelope = writeEnvelope.WithData(withDelta.Zero.MergeDelta(withDelta));
return SetData(key, writeEnvelope.AddSeen(_selfAddress));
}
}
private void ReceiveReadRepair(string key, DataEnvelope writeEnvelope)
{
WriteAndStore(key, writeEnvelope, reply: false);
Sender.Tell(ReadRepairAck.Instance);
}
private void ReceiveGetKeyIds()
{
var keys = _dataEntries
.Where(kvp => !(kvp.Value.envelope.Data is DeletedData))
.Select(x => x.Key)
.ToImmutableHashSet();
Sender.Tell(new GetKeysIdsResult(keys));
}
private void ReceiveDelete(IKey key, IWriteConsistency consistency, object request)
{
var envelope = GetData(key.Id);
if (envelope?.Data is DeletedData)
{
// already deleted
Sender.Tell(new DataDeleted(key, request));
}
else
{
SetData(key.Id, DeletedEnvelope);
var durable = IsDurable(key.Id);
if (IsLocalUpdate(consistency))
{
if (durable)
{
var reply = new StoreReply(
successMessage: new DeleteSuccess(key, request),
failureMessage: new StoreFailure(key, request),
replyTo: Sender);
_durableStore.Tell(new Store(key.Id, new DurableDataEnvelope(DeletedEnvelope), reply));
}
else Sender.Tell(new DeleteSuccess(key, request));
}
else
{
var excludeExiting = consistency is WriteMajorityPlus || consistency is WriteAll;
var writeAggregator = Context.ActorOf(WriteAggregator
.Props(key, DeletedEnvelope, null, consistency, request, NodesForReadWrite(excludeExiting), _unreachable, !_settings.PreferOldest, Sender, durable)
.WithDispatcher(Context.Props.Dispatcher));
if (durable)
{
var reply = new StoreReply(
successMessage: new DeleteSuccess(key, request),
failureMessage: new StoreFailure(key, request),
replyTo: writeAggregator);
_durableStore.Tell(new Store(key.Id, new DurableDataEnvelope(DeletedEnvelope), reply));
}
}
}
}
private DataEnvelope SetData(string key, DataEnvelope envelope)
{
var deltaVersions = envelope.DeltaVersions;
var currentVersion = _deltaPropagationSelector.CurrentVersion(key);
var newEnvelope = currentVersion == 0 || currentVersion == deltaVersions.VersionAt(_selfUniqueAddress)
? envelope
: envelope.WithDeltaVersions(deltaVersions.Merge(VersionVector.Create(_selfUniqueAddress, currentVersion)));
Digest digest;
if (_subscribers.ContainsKey(key) && !_changed.Contains(key))
{
var oldDigest = GetDigest(key);
var dig = Digest(newEnvelope);
if (!Equals(dig, oldDigest))
_changed = _changed.Add(key);
digest = dig;
}
else if (newEnvelope.Data is DeletedData) digest = DeletedDigest;
else digest = LazyDigest;
_dataEntries = _dataEntries.SetItem(key, (newEnvelope, digest));
if (newEnvelope.Data is DeletedData)
{
_deltaPropagationSelector.Delete(key);
}
return newEnvelope;
}
private Digest GetDigest(string key)
{
var contained = _dataEntries.TryGetValue(key, out var value);
if (contained)
{
if (value.digest == LazyDigest)
{
var digest = Digest(value.envelope);
_dataEntries = _dataEntries.SetItem(key, (value.envelope, digest));
return digest;
}
return value.digest;
}
return NotFoundDigest;
}
private Digest Digest(DataEnvelope envelope)
{
if (Equals(envelope.Data, DeletedData.Instance)) return DeletedDigest;
var bytes = _serializer.ToBinary(envelope.WithoutDeltaVersions());
var serialized = SHA1.Create().ComputeHash(bytes);
return ByteString.CopyFrom(serialized);
}
private DataEnvelope GetData(string key)
{
return !_dataEntries.TryGetValue(key, out var value) ? null : value.envelope;
}
private long GetDeltaSequenceNr(string key, UniqueAddress from)
{
return _dataEntries.TryGetValue(key, out var tuple) ? tuple.envelope.DeltaVersions.VersionAt(@from) : 0L;
}
private bool IsNodeRemoved(UniqueAddress node, IEnumerable<string> keys)
{
if (_removedNodes.ContainsKey(node)) return true;
return keys.Any(key => _dataEntries.TryGetValue(key, out var tuple) && tuple.envelope.Pruning.ContainsKey(node));
}
private void Notify(string keyId, HashSet<IActorRef> subs)
{
var key = _subscriptionKeys[keyId];
var envelope = GetData(keyId);
if (envelope != null)
{
var msg = envelope.Data is DeletedData
? (object)new DataDeleted(key, null)
: new Changed(key, envelope.Data);
foreach (var sub in subs) sub.Tell(msg);
}
}
private void ReceiveFlushChanges()
{
if (_subscribers.Count != 0)
{
foreach (var key in _changed)
{
if (_subscribers.TryGetValue(key, out var subs))
Notify(key, subs);
}
}
// Changed event is sent to new subscribers even though the key has not changed,
// i.e. send current value
if (_newSubscribers.Count != 0)
{
foreach (var kvp in _newSubscribers)
{
Notify(kvp.Key, kvp.Value);
if (!_subscribers.TryGetValue(kvp.Key, out var set))