-
Notifications
You must be signed in to change notification settings - Fork 1k
/
LWWRegister.cs
220 lines (193 loc) · 7.88 KB
/
LWWRegister.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
//-----------------------------------------------------------------------
// <copyright file="LWWRegister.cs" company="Akka.NET Project">
// Copyright (C) 2009-2023 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2023 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------
using System;
using System.Collections.Generic;
using Akka.Cluster;
namespace Akka.DistributedData
{
/// <summary>
/// Delegate responsible for managing <see cref="LWWRegister{T}"/> clock.
/// </summary>
/// <typeparam name="T">TBD</typeparam>
/// <param name="currentTimestamp">The current timestamp value of the <see cref="LWWRegister{T}"/>.</param>
/// <param name="value">The register value to set and associate with the returned timestamp.</param>
/// <returns>Next timestamp</returns>
public delegate long Clock<in T>(long currentTimestamp, T value);
/// <summary>
/// INTERNAL API
///
/// Marker interface for serialization
/// </summary>
internal interface ILWWRegisterKey
{
Type RegisterType { get; }
}
/// <summary>
/// Key types for <see cref="LWWRegister{T}"/>
/// </summary>
/// <typeparam name="T">TBD</typeparam>
[Serializable]
public sealed class LWWRegisterKey<T> : Key<LWWRegister<T>>, ILWWRegisterKey
{
/// <summary>
/// TBD
/// </summary>
/// <param name="id">TBD</param>
public LWWRegisterKey(string id) : base(id)
{
}
public Type RegisterType { get; } = typeof(T);
}
/// <summary>
/// INTERNAL API
///
/// Marker interface for serialization
/// </summary>
internal interface ILWWRegister
{
Type RegisterType { get; }
}
/// <summary>
/// Implements a 'Last Writer Wins Register' CRDT, also called a 'LWW-Register'.
///
/// It is described in the paper
/// <a href="http://hal.upmc.fr/file/index/docid/555588/filename/techreport.pdf">A comprehensive study of Convergent and Commutative Replicated Data Types</a>.
///
/// Merge takes the register with highest timestamp. Note that this
/// relies on synchronized clocks. <see cref="LWWRegister{T}"/> should only be used when the choice of
/// value is not important for concurrent updates occurring within the clock skew.
///
/// Merge takes the register updated by the node with lowest address (<see cref="UniqueAddress"/> is ordered)
/// if the timestamps are exactly the same.
///
/// Instead of using timestamps based on `DateTime.UtcNow` time it is possible to
/// use a timestamp value based on something else, for example an increasing version number
/// from a database record that is used for optimistic concurrency control.
///
/// For first-write-wins semantics you can use the <see cref="LWWRegister{T}.ReverseClock"/> instead of the
/// [[LWWRegister#defaultClock]]
///
/// This class is immutable, i.e. "modifying" methods return a new instance.
/// </summary>
/// <typeparam name="T">TBD</typeparam>
[Serializable]
public sealed class LWWRegister<T> : IReplicatedData<LWWRegister<T>>, IReplicatedDataSerialization, IEquatable<LWWRegister<T>>, ILWWRegister
{
/// <summary>
/// Default clock is using max between DateTime.UtcNow.Ticks and current timestamp + 1.
/// </summary>
public static readonly Clock<T> DefaultClock =
(timestamp, _) => Math.Max(DateTime.UtcNow.Ticks, timestamp + 1);
/// <summary>
/// Reverse clock can be used for first-write-wins semantics. It's counting backwards,
/// using min between -DateTime.UtcNow.Ticks and current timestamp - 1.
/// </summary>
public static readonly Clock<T> ReverseClock =
(timestamp, _) => Math.Min(-DateTime.UtcNow.Ticks, timestamp - 1);
/// <summary>
/// TBD
/// </summary>
/// <param name="node">TBD</param>
/// <param name="initial">TBD</param>
public LWWRegister(UniqueAddress node, T initial)
{
UpdatedBy = node;
Value = initial;
Timestamp = DefaultClock(0L, initial);
}
/// <summary>
/// TBD
/// </summary>
/// <param name="node">TBD</param>
/// <param name="value">TBD</param>
/// <param name="timestamp">TBD</param>
public LWWRegister(UniqueAddress node, T value, long timestamp)
{
UpdatedBy = node;
Value = value;
Timestamp = timestamp;
}
/// <summary>
/// TBD
/// </summary>
/// <param name="node">TBD</param>
/// <param name="initial">TBD</param>
/// <param name="clock">TBD</param>
public LWWRegister(UniqueAddress node, T initial, Clock<T> clock)
{
UpdatedBy = node;
Value = initial;
Timestamp = clock(0L, initial);
}
/// <summary>
/// Returns a timestamp used to determine precedence in current register updates.
/// </summary>
public long Timestamp { get; }
/// <summary>
/// Returns value of the current register.
/// </summary>
public T Value { get; }
/// <summary>
/// Returns a unique address of the last cluster node, that updated current register value.
/// </summary>
public UniqueAddress UpdatedBy { get; }
/// <summary>
/// Change the value of the register.
///
/// You can provide your <paramref name="clock"/> implementation instead of using timestamps based
/// on DateTime.UtcNow.Ticks time. The timestamp can for example be an
/// increasing version number from a database record that is used for optimistic
/// concurrency control.
/// </summary>
/// <param name="node">TBD</param>
/// <param name="value">TBD</param>
/// <param name="clock">TBD</param>
/// <returns>TBD</returns>
public LWWRegister<T> WithValue(UniqueAddress node, T value, Clock<T> clock = null)
{
var c = clock ?? DefaultClock;
return new LWWRegister<T>(node, value, c(Timestamp, value));
}
/// <summary>
/// TBD
/// </summary>
/// <param name="other">TBD</param>
/// <returns>TBD</returns>
public LWWRegister<T> Merge(LWWRegister<T> other)
{
if (other.Timestamp > Timestamp) return other;
if (other.Timestamp < Timestamp) return this;
if (other.UpdatedBy.Uid < UpdatedBy.Uid) return other;
return this;
}
/// <summary>
/// TBD
/// </summary>
/// <param name="other">TBD</param>
/// <returns>TBD</returns>
public IReplicatedData Merge(IReplicatedData other) => Merge((LWWRegister<T>)other);
public bool Equals(LWWRegister<T> other)
{
if (ReferenceEquals(other, null)) return false;
if (ReferenceEquals(this, other)) return true;
return Timestamp == other.Timestamp && UpdatedBy == other.UpdatedBy && Equals(Value, other.Value);
}
public override bool Equals(object obj) => obj is LWWRegister<T> register && Equals(register);
public override int GetHashCode()
{
unchecked
{
var hashCode = UpdatedBy.GetHashCode();
hashCode = (hashCode * 397) ^ EqualityComparer<T>.Default.GetHashCode(Value);
hashCode = (hashCode * 397) ^ Timestamp.GetHashCode();
return hashCode;
}
}
public override string ToString() => $"LWWRegister(value={Value}, timestamp={Timestamp}, updatedBy={UpdatedBy})";
public Type RegisterType { get; } = typeof(T);
}
}