diff --git a/src/Akka.sln b/src/Akka.sln index c52324212ab..2c21386bd39 100644 --- a/src/Akka.sln +++ b/src/Akka.sln @@ -240,9 +240,11 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Akka.DependencyInjection.Te EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "AspNetCore", "AspNetCore", "{162F5991-EA57-4221-9B70-F9B6FEC18036}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Samples.Akka.AspNetCore", "examples\AspNetCore\Samples.Akka.AspNetCore\Samples.Akka.AspNetCore.csproj", "{D62F4AD6-318F-4ECC-B875-83FA9933A81B}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Samples.Akka.AspNetCore", "examples\AspNetCore\Samples.Akka.AspNetCore\Samples.Akka.AspNetCore.csproj", "{D62F4AD6-318F-4ECC-B875-83FA9933A81B}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SerializationBenchmarks", "benchmark\SerializationBenchmarks\SerializationBenchmarks.csproj", "{2E4B9584-42CC-4D17-B719-9F462B16C94D}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SerializationBenchmarks", "benchmark\SerializationBenchmarks\SerializationBenchmarks.csproj", "{2E4B9584-42CC-4D17-B719-9F462B16C94D}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DDataStressTest", "examples\Cluster\DData\DDataStressTest\DDataStressTest.csproj", "{44B3DDD6-6103-4E8F-8AC2-0F4BA3CF6B50}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -1123,6 +1125,18 @@ Global {2E4B9584-42CC-4D17-B719-9F462B16C94D}.Release|x64.Build.0 = Release|Any CPU {2E4B9584-42CC-4D17-B719-9F462B16C94D}.Release|x86.ActiveCfg = Release|Any CPU {2E4B9584-42CC-4D17-B719-9F462B16C94D}.Release|x86.Build.0 = Release|Any CPU + {44B3DDD6-6103-4E8F-8AC2-0F4BA3CF6B50}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {44B3DDD6-6103-4E8F-8AC2-0F4BA3CF6B50}.Debug|Any CPU.Build.0 = Debug|Any CPU + {44B3DDD6-6103-4E8F-8AC2-0F4BA3CF6B50}.Debug|x64.ActiveCfg = Debug|Any CPU + {44B3DDD6-6103-4E8F-8AC2-0F4BA3CF6B50}.Debug|x64.Build.0 = Debug|Any CPU + {44B3DDD6-6103-4E8F-8AC2-0F4BA3CF6B50}.Debug|x86.ActiveCfg = Debug|Any CPU + {44B3DDD6-6103-4E8F-8AC2-0F4BA3CF6B50}.Debug|x86.Build.0 = Debug|Any CPU + {44B3DDD6-6103-4E8F-8AC2-0F4BA3CF6B50}.Release|Any CPU.ActiveCfg = Release|Any CPU + {44B3DDD6-6103-4E8F-8AC2-0F4BA3CF6B50}.Release|Any CPU.Build.0 = Release|Any CPU + {44B3DDD6-6103-4E8F-8AC2-0F4BA3CF6B50}.Release|x64.ActiveCfg = Release|Any CPU + {44B3DDD6-6103-4E8F-8AC2-0F4BA3CF6B50}.Release|x64.Build.0 = Release|Any CPU + {44B3DDD6-6103-4E8F-8AC2-0F4BA3CF6B50}.Release|x86.ActiveCfg = Release|Any CPU + {44B3DDD6-6103-4E8F-8AC2-0F4BA3CF6B50}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -1230,6 +1244,7 @@ Global {162F5991-EA57-4221-9B70-F9B6FEC18036} = {D3AF8295-AEB5-4324-AA82-FCC0014AC310} {D62F4AD6-318F-4ECC-B875-83FA9933A81B} = {162F5991-EA57-4221-9B70-F9B6FEC18036} {2E4B9584-42CC-4D17-B719-9F462B16C94D} = {73108242-625A-4D7B-AA09-63375DBAE464} + {44B3DDD6-6103-4E8F-8AC2-0F4BA3CF6B50} = {C50E1A9E-820C-4E75-AE39-6F96A99AC4A7} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {03AD8E21-7507-4E68-A4E9-F4A7E7273164} diff --git a/src/benchmark/Akka.Benchmarks/DData/ORSetBenchmarks.cs b/src/benchmark/Akka.Benchmarks/DData/ORSetBenchmarks.cs new file mode 100644 index 00000000000..9e0c2c0e971 --- /dev/null +++ b/src/benchmark/Akka.Benchmarks/DData/ORSetBenchmarks.cs @@ -0,0 +1,129 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using Akka.Actor; +using Akka.Benchmarks.Configurations; +using Akka.Cluster; +using Akka.DistributedData; +using BenchmarkDotNet.Attributes; + +namespace Akka.Benchmarks.DData +{ + [Config(typeof(MicroBenchmarkConfig))] + public class ORSetBenchmarks + { + [Params(25)] + public int NumElements; + + [Params(10)] + public int NumNodes; + + [Params(100)] + public int Iterations; + + private UniqueAddress[] _nodes; + private string[] _elements; + + private readonly string _user1 = "{\"username\":\"john\",\"password\":\"coltrane\"}"; + private readonly string _user2 = "{\"username\":\"sonny\",\"password\":\"rollins\"}"; + private readonly string _user3 = "{\"username\":\"charlie\",\"password\":\"parker\"}"; + private readonly string _user4 = "{\"username\":\"charles\",\"password\":\"mingus\"}"; + + // has data from all nodes + private ORSet _c1 = ORSet.Empty; + + // has additional items from all nodes + private ORSet _c2 = ORSet.Empty; + + // has removed items from all nodes + private ORSet _c3 = ORSet.Empty; + + [GlobalSetup] + public void Setup() + { + var newNodes = new List(NumNodes); + foreach(var i in Enumerable.Range(0, NumNodes)){ + var address = new Address("akka.tcp", "Sys", "localhost", 2552 + i); + var uniqueAddress = new UniqueAddress(address, i); + newNodes.Add(uniqueAddress); + } + _nodes = newNodes.ToArray(); + + var newElements = new List(NumNodes); + foreach(var i in Enumerable.Range(0, NumElements)){ + newElements.Add(i.ToString()); + } + _elements = newElements.ToArray(); + + _c1 = ORSet.Empty; + foreach(var node in _nodes){ + _c1 = _c1.Add(node, _elements[0]); + } + + // add some data that _c2 doesn't have + _c2 = _c1; + foreach(var node in _nodes.Skip(NumNodes/2)){ + _c2 = _c2.Add(node, _elements[1]); + } + + _c3 = _c1; + foreach(var node in _nodes.Take(NumNodes/2)){ + _c3 = _c3.Remove(node, _elements[0]); + } + } + + [Benchmark] + public void Should_add_node_to_ORSet() + { + for (var i = 0; i < Iterations; i++) + { + var init = ORSet.Empty; + foreach (var node in _nodes) + { + init = init.Add(node, _elements[0]); + } + } + + } + + [Benchmark] + public void Should_add_elements_for_Same_node() + { + for (var i = 0; i < Iterations; i++) + { + var init = ORSet.Empty; + foreach (var element in _elements) + { + init = init.Add(_nodes[0], element); + } + } + } + + [Benchmark] + public void Should_merge_in_new_Elements_from_other_nodes(){ + for (var i = 0; i < Iterations; i++) + { + var c4 = _c1.Merge(_c2); + } + + } + + [Benchmark] + public void Should_merge_in_removed_Elements_from_other_nodes(){ + for (var i = 0; i < Iterations; i++) + { + var c4 = _c1.Merge(_c3); + } + + } + + [Benchmark] + public void Should_merge_in_add_and_removed_Elements_from_other_nodes(){ + for (var i = 0; i < Iterations; i++) + { + var c4 = _c1.Merge(_c2).Merge(_c3); + } + } + } +} diff --git a/src/benchmark/Akka.Benchmarks/DData/VersionVectorBenchmark.cs b/src/benchmark/Akka.Benchmarks/DData/VersionVectorBenchmark.cs new file mode 100644 index 00000000000..dd8fea98b24 --- /dev/null +++ b/src/benchmark/Akka.Benchmarks/DData/VersionVectorBenchmark.cs @@ -0,0 +1,174 @@ +using System; +using System.Collections.Generic; +using System.Collections.Immutable; +using System.Linq; +using System.Text; +using Akka.Benchmarks.Configurations; +using Akka.Cluster; +using Akka.DistributedData; +using BenchmarkDotNet.Attributes; +using FluentAssertions; +using static Akka.DistributedData.VersionVector; + +namespace Akka.Benchmarks.DData +{ + [Config(typeof(MicroBenchmarkConfig))] + public class VersionVectorBenchmarks + { + [Params(100)] + public int ClockSize; + + [Params(1000)] + public int Iterations; + + internal (VersionVector clock, ImmutableSortedSet nodes) CreateVectorClockOfSize(int size) + { + UniqueAddress GenerateUniqueAddress(int nodeCount){ + return new UniqueAddress(new Akka.Actor.Address("akka.tcp", "ClusterSys", "localhost", nodeCount), nodeCount); + } + + return Enumerable.Range(1, size) + .Aggregate((VersionVector.Empty, ImmutableSortedSet.Empty), + (tuple, i) => + { + var (vc, nodes) = tuple; + var node = GenerateUniqueAddress(i); + return (vc.Increment(node), nodes.Add(node)); + }); + } + + internal VersionVector CopyVectorClock(VersionVector vc) + { + var versions = ImmutableDictionary.Empty; + var enumerator = vc.VersionEnumerator; + while(enumerator.MoveNext()){ + var nodePair = enumerator.Current; + versions = versions.SetItem(nodePair.Key, nodePair.Value); + } + + return VersionVector.Create(versions); + } + + private UniqueAddress _firstNode; + private UniqueAddress _lastNode; + private UniqueAddress _middleNode; + private ImmutableSortedSet _nodes; + private VersionVector _vcBefore; + private VersionVector _vcBaseLast; + private VersionVector _vcAfterLast; + private VersionVector _vcConcurrentLast; + private VersionVector _vcBaseMiddle; + private VersionVector _vcAfterMiddle; + private VersionVector _vcConcurrentMiddle; + + [GlobalSetup] + public void Setup() + { + var (vcBefore, nodes) = CreateVectorClockOfSize(ClockSize); + _vcBefore = vcBefore; + _nodes = nodes; + + _firstNode = nodes.First(); + _lastNode = nodes.Last(); + _middleNode = nodes[ClockSize / 2]; + + _vcBaseLast = vcBefore.Increment(_lastNode); + _vcAfterLast = _vcBaseLast.Increment(_firstNode); + _vcConcurrentLast = _vcBaseLast.Increment(_lastNode); + _vcBaseMiddle = _vcBefore.Increment(_middleNode); + _vcAfterMiddle = _vcBaseMiddle.Increment(_firstNode); + _vcConcurrentMiddle = _vcBaseMiddle.Increment(_middleNode); + } + + private void CheckThunkFor(VersionVector vc1, VersionVector vc2, Action thunk, int times) + { + var vcc1 = CopyVectorClock(vc1); + var vcc2 = CopyVectorClock(vc2); + for (var i = 0; i < times; i++) + { + thunk(vcc1, vcc2); + } + } + + private void CompareTo(VersionVector vc1, VersionVector vc2, Ordering ordering) + { + vc1.Compare(vc2).Should().Be(ordering); + } + + private void NotEqual(VersionVector vc1, VersionVector vc2) + { + (vc1 == vc2).Should().BeFalse(); + } + + private void Merge(VersionVector vc1, VersionVector vc2) + { + vc1.Merge(vc2); + } + + [Benchmark] + public void VectorClock_comparisons_should_compare_same() + { + CheckThunkFor(_vcBaseLast, _vcBaseLast, (clock, vectorClock) => CompareTo(clock, vectorClock, Ordering.Same), Iterations); + } + + [Benchmark] + public void VectorClock_comparisons_should_compare_Before_last() + { + CheckThunkFor(_vcBefore, _vcBaseLast, (clock, vectorClock) => CompareTo(clock, vectorClock, Ordering.Before), Iterations); + } + + [Benchmark] + public void VectorClock_comparisons_should_compare_After_last() + { + CheckThunkFor(_vcAfterLast, _vcBaseLast, (clock, vectorClock) => CompareTo(clock, vectorClock, Ordering.After), Iterations); + } + + [Benchmark] + public void VectorClock_comparisons_should_compare_Concurrent_last() + { + CheckThunkFor(_vcAfterLast, _vcConcurrentLast, (clock, vectorClock) => CompareTo(clock, vectorClock, Ordering.Concurrent), Iterations); + } + + [Benchmark] + public void VectorClock_comparisons_should_compare_Before_middle() + { + CheckThunkFor(_vcBefore, _vcBaseMiddle, (clock, vectorClock) => CompareTo(clock, vectorClock, Ordering.Before), Iterations); + } + + [Benchmark] + public void VectorClock_comparisons_should_compare_After_middle() + { + CheckThunkFor(_vcAfterMiddle, _vcBaseMiddle, (clock, vectorClock) => CompareTo(clock, vectorClock, Ordering.After), Iterations); + } + + [Benchmark] + public void VectorClock_comparisons_should_compare_Concurrent_middle() + { + CheckThunkFor(_vcAfterMiddle, _vcConcurrentMiddle, (clock, vectorClock) => CompareTo(clock, vectorClock, Ordering.Concurrent), Iterations); + } + + [Benchmark] + public void VectorClock_comparisons_should_compare_notEquals_Before_Middle() + { + CheckThunkFor(_vcBefore, _vcBaseMiddle, (clock, vectorClock) => NotEqual(clock, vectorClock), Iterations); + } + + [Benchmark] + public void VectorClock_comparisons_should_compare_notEquals_After_Middle() + { + CheckThunkFor(_vcAfterMiddle, _vcBaseMiddle, (clock, vectorClock) => NotEqual(clock, vectorClock), Iterations); + } + + [Benchmark] + public void VectorClock_comparisons_should_compare_notEquals_Concurrent_Middle() + { + CheckThunkFor(_vcAfterMiddle, _vcConcurrentMiddle, (clock, vectorClock) => NotEqual(clock, vectorClock), Iterations); + } + + [Benchmark] + public void VersionVector_merge_Multi_Multi() + { + CheckThunkFor(_vcBefore, _vcAfterLast, (vector, versionVector) => Merge(vector, versionVector), Iterations); + } + } +} \ No newline at end of file diff --git a/src/contrib/cluster/Akka.DistributedData.Tests/ORSetSpec.cs b/src/contrib/cluster/Akka.DistributedData.Tests/ORSetSpec.cs index 97a87a20b00..883ebe461ac 100644 --- a/src/contrib/cluster/Akka.DistributedData.Tests/ORSetSpec.cs +++ b/src/contrib/cluster/Akka.DistributedData.Tests/ORSetSpec.cs @@ -57,16 +57,20 @@ public ORSetSpec(ITestOutputHelper output) [Fact] public void ORSet_must_be_able_to_add_element() { - var c1 = ORSet.Empty; - var c2 = c1.Add(_node1, _user1); - var c3 = c2.Add(_node1, _user2); - var c4 = c3.Add(_node1, _user4); - var c5 = c4.Add(_node1, _user3); - - Assert.Contains(_user1, c5.Elements); - Assert.Contains(_user2, c5.Elements); - Assert.Contains(_user3, c5.Elements); - Assert.Contains(_user4, c5.Elements); + for (var i = 0; i < 100; i++) + { + var c1 = ORSet.Empty; + var c2 = c1.Add(_node1, _user1); + var c3 = c2.Add(_node1, _user2); + var c4 = c3.Add(_node1, _user4); + var c5 = c4.Add(_node1, _user3); + + Assert.Contains(_user1, c5.Elements); + Assert.Contains(_user2, c5.Elements); + Assert.Contains(_user3, c5.Elements); + Assert.Contains(_user4, c5.Elements); + } + } [Fact] diff --git a/src/contrib/cluster/Akka.DistributedData/ORSet.cs b/src/contrib/cluster/Akka.DistributedData/ORSet.cs index 150b29c32ac..95e50398df1 100644 --- a/src/contrib/cluster/Akka.DistributedData/ORSet.cs +++ b/src/contrib/cluster/Akka.DistributedData/ORSet.cs @@ -518,35 +518,38 @@ public AddDeltaOperation(ORSet underlying) public override ORSet Underlying { get; } public override IReplicatedData Merge(IReplicatedData other) { - if (other is AddDeltaOperation) - { - var u = ((AddDeltaOperation)other).Underlying; - // Note that we only merge deltas originating from the same node - return new AddDeltaOperation(new ORSet( - ConcatElementsMap(u.ElementsMap), - Underlying.VersionVector.Merge(u.VersionVector))); - } - else if (other is AtomicDeltaOperation) - { - return new DeltaGroup(ImmutableArray.Create(this, other)); - } - else if (other is DeltaGroup) + switch (other) { - var vector = ((DeltaGroup)other).Operations; - return new DeltaGroup(vector.Add(this)); + case AddDeltaOperation operation: + { + var u = operation.Underlying; + // Note that we only merge deltas originating from the same node + return new AddDeltaOperation(new ORSet( + ConcatElementsMap(u.ElementsMap), + Underlying.VersionVector.Merge(u.VersionVector))); + } + case AtomicDeltaOperation _: + return new DeltaGroup(ImmutableArray.Create(this, other)); + case DeltaGroup dg: + { + var vector = dg.Operations; + return new DeltaGroup(vector.Add(this)); + } + default: + throw new ArgumentException($"Unknown delta operation of type {other.GetType()}", nameof(other)); } - else throw new ArgumentException($"Unknown delta operation of type {other.GetType()}", nameof(other)); } private ImmutableDictionary ConcatElementsMap( ImmutableDictionary thatMap) { - var u = Underlying.ElementsMap.ToBuilder(); - foreach (var entry in thatMap) - { - u[entry.Key] = entry.Value; - } - return u.ToImmutable(); + //var u = Underlying.ElementsMap.ToBuilder(); + //foreach (var entry in thatMap) + //{ + // u[entry.Key] = entry.Value; + //} + //return u.ToImmutable(); + return Underlying.ElementsMap.SetItems(thatMap); } } diff --git a/src/contrib/cluster/Akka.DistributedData/VersionVector.cs b/src/contrib/cluster/Akka.DistributedData/VersionVector.cs index d4d367a2283..d83daec7f38 100644 --- a/src/contrib/cluster/Akka.DistributedData/VersionVector.cs +++ b/src/contrib/cluster/Akka.DistributedData/VersionVector.cs @@ -52,13 +52,19 @@ public static VersionVector Create(ImmutableDictionary vers /// /// Marker to signal that we have reached the end of a version vector. /// - private static readonly KeyValuePair EndMarker = new KeyValuePair(null, long.MinValue); + private static readonly (UniqueAddress addr, long version) EndMarker = (null, long.MinValue); public abstract bool IsEmpty { get; } public abstract int Count { get; } public abstract IEnumerator> VersionEnumerator { get; } + + internal abstract IEnumerable<(UniqueAddress addr, long version)> InternalVersions { get; } + + internal IEnumerator<(UniqueAddress addr, long version)> InternalVersionEnumerator => + InternalVersions.GetEnumerator(); + public static readonly VersionVector Empty = new MultiVersionVector(ImmutableDictionary.Empty); /// @@ -156,15 +162,15 @@ private Ordering CompareOnlyTo(VersionVector other, Ordering order) { if (ReferenceEquals(this, other)) return Ordering.Same; - return Compare(VersionEnumerator, other.VersionEnumerator, + return Compare(InternalVersionEnumerator, other.InternalVersionEnumerator, order == Ordering.Concurrent ? Ordering.FullOrder : order); } - private T NextOrElse(IEnumerator enumerator, T defaultValue) => + private static T NextOrElse(IEnumerator enumerator, T defaultValue) => enumerator.MoveNext() ? enumerator.Current : defaultValue; - private Ordering Compare(IEnumerator> i1, - IEnumerator> i2, Ordering requestedOrder) + private Ordering Compare(IEnumerator<(UniqueAddress addr, long version)> i1, + IEnumerator<(UniqueAddress addr, long version)> i2, Ordering requestedOrder) { var nt1 = NextOrElse(i1, EndMarker); var nt2 = NextOrElse(i2, EndMarker); @@ -177,15 +183,15 @@ private Ordering Compare(IEnumerator> i1, else if (Equals(nt2, EndMarker)) return currentOrder == Ordering.Before ? Ordering.Concurrent : Ordering.After; else { - var nc = nt1.Key.CompareTo(nt2.Key); + var nc = nt1.addr.CompareTo(nt2.addr); if (nc == 0) { - if (nt1.Value < nt2.Value) + if (nt1.version < nt2.version) { if (currentOrder == Ordering.After) return Ordering.Concurrent; currentOrder = Ordering.Before; } - else if (nt1.Value > nt2.Value) + else if (nt1.version > nt2.version) { if (currentOrder == Ordering.Before) return Ordering.Concurrent; currentOrder = Ordering.After; @@ -258,6 +264,15 @@ public SingleVersionVector(UniqueAddress node, long version) public override bool IsEmpty => false; public override int Count => 1; public override IEnumerator> VersionEnumerator => new Enumerator(Node, Version); + + internal override IEnumerable<(UniqueAddress addr, long version)> InternalVersions + { + get + { + yield return (Node, Version); + } + } + public override VersionVector Increment(UniqueAddress node) { var v = Counter.GetAndIncrement(); @@ -274,23 +289,23 @@ public override VersionVector Increment(UniqueAddress node) public override VersionVector Merge(VersionVector other) { - if (other is MultiVersionVector vector1) + switch (other) { - var v2 = vector1.Versions.GetValueOrDefault(Node, 0L); - var mergedVersions = v2 >= Version ? vector1.Versions : vector1.Versions.SetItem(Node, Version); - return new MultiVersionVector(mergedVersions); - } - else if (other is SingleVersionVector vector) - { - if (Node == vector.Node) + case MultiVersionVector vector1: { - return Version >= vector.Version ? this : new SingleVersionVector(vector.Node, vector.Version); + var v2 = vector1.Versions.GetValueOrDefault(Node, 0L); + var mergedVersions = v2 >= Version ? vector1.Versions : vector1.Versions.SetItem(Node, Version); + return new MultiVersionVector(mergedVersions); } - else return new MultiVersionVector( - new KeyValuePair(Node, Version), - new KeyValuePair(vector.Node, vector.Version)); + case SingleVersionVector vector when Node == vector.Node: + return Version >= vector.Version ? this : new SingleVersionVector(vector.Node, vector.Version); + case SingleVersionVector vector: + return new MultiVersionVector( + new KeyValuePair(Node, Version), + new KeyValuePair(vector.Node, vector.Version)); + default: + throw new NotSupportedException("SingleVersionVector doesn't support merge with provided version vector"); } - else throw new NotSupportedException("SingleVersionVector doesn't support merge with provided version vector"); } public override ImmutableHashSet ModifiedByNodes => ImmutableHashSet.Create(Node); @@ -337,6 +352,10 @@ public MultiVersionVector(ImmutableDictionary nodeVersions) public override bool IsEmpty => Versions.IsEmpty; public override int Count => Versions.Count; public override IEnumerator> VersionEnumerator => Versions.GetEnumerator(); + + internal override IEnumerable<(UniqueAddress addr, long version)> InternalVersions => + Versions.Select(x => (x.Key, x.Value)); + public override VersionVector Increment(UniqueAddress node) => new MultiVersionVector(Versions.SetItem(node, Counter.GetAndIncrement())); @@ -346,25 +365,29 @@ public override VersionVector Increment(UniqueAddress node) => public override VersionVector Merge(VersionVector other) { - if (other is MultiVersionVector vector1) + switch (other) { - var merged = vector1.Versions.ToBuilder(); - foreach (var pair in Versions) + case MultiVersionVector vector1: { - var mergedCurrentTime = merged.GetValueOrDefault(pair.Key, 0L); - if (pair.Value >= mergedCurrentTime) - merged.AddOrSet(pair.Key, pair.Value); - } + var merged = vector1.Versions.ToBuilder(); + foreach (var pair in Versions) + { + var mergedCurrentTime = merged.GetValueOrDefault(pair.Key, 0L); + if (pair.Value >= mergedCurrentTime) + merged[pair.Key] = pair.Value; + } - return new MultiVersionVector(merged.ToImmutable()); - } - else if (other is SingleVersionVector vector) - { - var v1 = Versions.GetValueOrDefault(vector.Node, 0L); - var merged = v1 >= vector.Version ? Versions : Versions.SetItem(vector.Node, vector.Version); - return new MultiVersionVector(merged); + return new MultiVersionVector(merged.ToImmutable()); + } + case SingleVersionVector vector: + { + var v1 = Versions.GetValueOrDefault(vector.Node, 0L); + var merged = v1 >= vector.Version ? Versions : Versions.SetItem(vector.Node, vector.Version); + return new MultiVersionVector(merged); + } + default: + throw new NotSupportedException("MultiVersionVector doesn't support merge with provided version vector"); } - else throw new NotSupportedException("MultiVersionVector doesn't support merge with provided version vector"); } public override ImmutableHashSet ModifiedByNodes => Versions.Keys.ToImmutableHashSet(); diff --git a/src/examples/Cluster/DData/DDataStressTest/DDataStressTest.csproj b/src/examples/Cluster/DData/DDataStressTest/DDataStressTest.csproj new file mode 100644 index 00000000000..ee9b9a3f54c --- /dev/null +++ b/src/examples/Cluster/DData/DDataStressTest/DDataStressTest.csproj @@ -0,0 +1,12 @@ + + + + Exe + netcoreapp3.1 + + + + + + + diff --git a/src/examples/Cluster/DData/DDataStressTest/Program.cs b/src/examples/Cluster/DData/DDataStressTest/Program.cs new file mode 100644 index 00000000000..9d366410436 --- /dev/null +++ b/src/examples/Cluster/DData/DDataStressTest/Program.cs @@ -0,0 +1,91 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Cluster; +using Akka.DistributedData; + +namespace DDataStressTest +{ + class Program + { + public const int NumElements = 25; + + public const int NumNodes = 10; + + public const int Iterations = 10000; + + private const string _user1 = "{\"username\":\"john\",\"password\":\"coltrane\"}"; + private const string _user2 = "{\"username\":\"sonny\",\"password\":\"rollins\"}"; + private const string _user3 = "{\"username\":\"charlie\",\"password\":\"parker\"}"; + private const string _user4 = "{\"username\":\"charles\",\"password\":\"mingus\"}"; + + + static async Task Main(string[] args) + { + UniqueAddress[] _nodes; + + string[] _elements; + + + // has data from all nodes + ORSet _c1 = ORSet.Empty; + + // has additional items from all nodes + ORSet _c2 = ORSet.Empty; + + // has removed items from all nodes + ORSet _c3 = ORSet.Empty; + + var newNodes = new List(NumNodes); + foreach (var i in Enumerable.Range(0, NumNodes)) + { + var address = new Address("akka.tcp", "Sys", "localhost", 2552 + i); + var uniqueAddress = new UniqueAddress(address, i); + newNodes.Add(uniqueAddress); + } + _nodes = newNodes.ToArray(); + + var newElements = new List(NumNodes); + foreach (var i in Enumerable.Range(0, NumElements)) + { + newElements.Add(i.ToString()); + } + _elements = newElements.ToArray(); + + _c1 = ORSet.Empty; + foreach (var node in _nodes) + { + _c1 = _c1.Add(node, _elements[0]); + } + + // add some data that _c2 doesn't have + _c2 = _c1; + foreach (var node in _nodes.Skip(NumNodes / 2)) + { + _c2 = _c2.Add(node, _elements[1]); + } + + _c3 = _c1; + foreach (var node in _nodes.Take(NumNodes / 2)) + { + _c3 = _c3.Remove(node, _elements[0]); + } + + var init = ORSet.Empty; + + foreach (var element in _elements) + { + foreach (var node in _nodes) + { + init = init.Add(node, element); + } + } + + _c1.Merge(init).Merge(_c2).Merge(_c3); + + await Task.Delay(5000); + } + } +}