Skip to content

Commit

Permalink
Fix ORSet.Merge with AddDeltaOperation takes too long to complete (#5686
Browse files Browse the repository at this point in the history
)

* Fix ORSet.Merge with AddDeltaOperation takes too long to complete

* Make sure that insertion is still correct after the changes

* Increase update timeout setting

Co-authored-by: Gregorius Soedharmo <gregorius.soedharmo@petabridge.com>
  • Loading branch information
Arkatufus and Greg-Petabridge committed Feb 24, 2022
1 parent 8faab89 commit 700f33d
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 9 deletions.
82 changes: 76 additions & 6 deletions src/contrib/cluster/Akka.DistributedData.Tests/ReplicatorSpecs.cs
Expand Up @@ -8,6 +8,7 @@
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Diagnostics;
using System.Linq;
using System.Numerics;
using System.Text;
Expand All @@ -33,7 +34,11 @@ static ReplicatorSpecs()
SpecConfig = ConfigurationFactory.ParseString(@"
akka.loglevel = DEBUG
akka.actor.provider = cluster
akka.remote.dot-netty.tcp.port = 0")
akka.remote.dot-netty.tcp.port = 0
akka.remote.dot-netty.tcp.send-buffer-size = 2000000
akka.remote.dot-netty.tcp.receive-buffer-size = 2000000
akka.remote.dot-netty.tcp.maximum-frame-size = 1000000
akka.cluster.sharding.updating-state-timeout = 15s")
.WithFallback(DistributedData.DefaultConfig());
}

Expand Down Expand Up @@ -503,7 +508,7 @@ public async Task Bugfix_4367_ORMultiValueDictionary_WithValueDeltas_DeltaGroup_

// Scenario 1 - add 1 entry with multiple values to all nodes
var keyA = "A";
var entryA = ImmutableHashSet<string>.Empty.Add("1").Add("2");
var entryA = ImmutableHashSet<string>.Empty.Add("1").Add("2").Add("3").Add("4");
await AwaitAssertAsync(async () => {
var m1 = await _replicator1.Ask<UpdateSuccess>(Dsl.Update(
_keyJ,
Expand Down Expand Up @@ -539,7 +544,7 @@ public async Task Bugfix_4367_ORMultiValueDictionary_WithValueDeltas_DeltaGroup_
});

// Scenario 3 - modify set with existing items in it
var entryA1 = entryA.Add("4");
var entryA1 = entryA.Add("6");
ORMultiValueDictionary<string, string> node2EntriesBCA = null;
await AwaitAssertAsync(async () =>
{
Expand All @@ -551,10 +556,10 @@ public async Task Bugfix_4367_ORMultiValueDictionary_WithValueDeltas_DeltaGroup_
s => s.SetItems(Cluster.Cluster.Get(_sys1), keyA, entryA1)));
node2EntriesBCA = changedProbe2.ExpectMsg<Changed>(g => Equals(g.Key, _keyJ)).Get(_keyJ);
node2EntriesBCA.Entries["A"].Should().BeEquivalentTo("1", "2", "4");
node2EntriesBCA.Entries["A"].Should().BeEquivalentTo("1", "2", "3", "4", "6");
var node3EntriesBCA = changedProbe3.ExpectMsg<Changed>(g => Equals(g.Key, _keyJ)).Get(_keyJ).Entries;
node3EntriesBCA["A"].Should().BeEquivalentTo("1", "2", "4");
node3EntriesBCA["A"].Should().BeEquivalentTo("1", "2", "3", "4", "6");
});

// Trigger update from Node2 back to Node 1
Expand All @@ -570,11 +575,76 @@ public async Task Bugfix_4367_ORMultiValueDictionary_WithValueDeltas_DeltaGroup_
s => s.SetItems(Cluster.Cluster.Get(_sys2), keyA, entryA2)));
var node1EntriesBCA = changedProbe1.ExpectMsg<Changed>(g => Equals(g.Key, _keyJ)).Get(_keyJ).Entries;
node1EntriesBCA["A"].Should().BeEquivalentTo("1", "2", "4", "5");
node1EntriesBCA["A"].Should().BeEquivalentTo("1", "2", "3", "4", "5", "6");
});
}

// Reproduction spec for issue #5663
[Fact]
public async Task ORMultiValueDictionary_WithValueDeltas_LargeDataSet()
{
await InitCluster();

var changedProbe2 = CreateTestProbe(_sys2);
_replicator2.Tell(Dsl.Subscribe(_keyJ, changedProbe2.Ref));

var changedProbe3 = CreateTestProbe(_sys3);
_replicator3.Tell(Dsl.Subscribe(_keyJ, changedProbe3.Ref));

var messages = Enumerable.Range(0, 20000).Select(i => i.ToString()).ToList();

// Scenario 1 - add 1 entry with multiple values to all nodes
var keyA = "A";
var entryA = messages.ToImmutableHashSet();

var stopwatch = Stopwatch.StartNew();
try
{
await _replicator1.Ask<UpdateSuccess>(Dsl.Update(
_keyJ,
ORMultiValueDictionary<string, string>.EmptyWithValueDeltas,
new WriteMajority(_timeOut),
s => s.SetItems(Cluster.Cluster.Get(_sys1), keyA, entryA)));
}
finally
{
stopwatch.Stop();
}
Log.Info($"Update time: {stopwatch.ElapsedMilliseconds} ms ({stopwatch.ElapsedMilliseconds / 1000.0} s)");

var node2EntriesA = changedProbe2.ExpectMsg<Changed>(g => Equals(g.Key, _keyJ)).Get(_keyJ).Entries;
node2EntriesA[keyA].Should().BeEquivalentTo(entryA);

var node3EntriesA = changedProbe3.ExpectMsg<Changed>(g => Equals(g.Key, _keyJ)).Get(_keyJ).Entries;
node3EntriesA[keyA].Should().BeEquivalentTo(entryA);

// Scenario 2 - modify set with existing items in it
var entryA1 = entryA.Add("999999").Add("1000000");

stopwatch = Stopwatch.StartNew();
try
{
await _replicator1.Ask<UpdateSuccess>(Dsl.Update(
_keyJ,
ORMultiValueDictionary<string, string>.EmptyWithValueDeltas,
new WriteMajority(_timeOut),
s => s.SetItems(Cluster.Cluster.Get(_sys1), keyA, entryA1)));
}
finally
{
stopwatch.Stop();
}
Log.Info($"Single update time: {stopwatch.ElapsedMilliseconds} ms ({stopwatch.ElapsedMilliseconds / 1000.0} s)");

var node2Changed = changedProbe2.ExpectMsg<Changed>(g => Equals(g.Key, _keyJ), TimeSpan.FromSeconds(3));
var node2EntriesBCA = node2Changed.Get(_keyJ);
node2EntriesBCA.Entries["A"].Should().BeEquivalentTo(entryA1);

var node3Changed = changedProbe3.ExpectMsg<Changed>(g => Equals(g.Key, _keyJ), TimeSpan.FromSeconds(3));
var node3EntriesBCA = node3Changed.Get(_keyJ).Entries;
node3EntriesBCA["A"].Should().BeEquivalentTo(entryA1);
}

protected override void BeforeTermination()
{
Shutdown(_sys1);
Expand Down
6 changes: 3 additions & 3 deletions src/contrib/cluster/Akka.DistributedData/ORSet.cs
Expand Up @@ -613,12 +613,12 @@ public DeltaGroup(ImmutableArray<IReplicatedData> operations)

public IReplicatedData Merge(IReplicatedData other)
{
if (other is AddDeltaOperation)
if (other is AddDeltaOperation thatAdd)
{
// merge AddDeltaOp into last AddDeltaOp in the group, if possible
var last = Operations[Operations.Length - 1];
return last is AddDeltaOperation
? new DeltaGroup(Operations.SetItem(Operations.Length - 1, other.Merge(last)))
return last is AddDeltaOperation thisAdd
? new DeltaGroup(Operations.SetItem(Operations.Length - 1, thisAdd.Merge(thatAdd)))
: new DeltaGroup(Operations.Add(other));
}
else if (other is DeltaGroup @group)
Expand Down

0 comments on commit 700f33d

Please sign in to comment.