Skip to content

Commit 961e379

Browse files
authored
[API-1824] Fix Compact Serialization as per QA (#787)
* Add Compact serialization tests * Fix compact following QA * Fix compact following QA + associated fixesà * Misc improvements of tests infrastructures * Refactor cluster code for issues identified after events re-ordering * build fix * refactor fix * refactor fix * refactor fix * refactor fix * refactor fix * cleanup
1 parent 0949766 commit 961e379

23 files changed

+861
-394
lines changed

src/Hazelcast.Net.Testing/Accessors/MemberConnectionAccessor.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@ internal class MemberConnectionAccessor: AccessorBase<MemberConnection>
2222
public MemberConnectionAccessor(MemberConnection instance): base(instance)
2323
{ }
2424

25-
public bool Active
25+
public bool Connected
2626
{
27-
get => GetField<bool>("_active");
28-
set => SetField("_active", value);
27+
get => GetField<bool>("_connected");
28+
set => SetField("_connected", value);
2929
}
3030

3131
public Guid MemberId

src/Hazelcast.Net.Testing/RemoteControllerClientExtensions.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,8 +160,14 @@ public static async Task StopMemberWaitRemovedAsync(this IRemoteControllerClient
160160
var removed = new SemaphoreSlim(0);
161161

162162
var subscriptionId = await clientInternal.SubscribeAsync(on => on
163+
.StateChanged((sender, args) =>
164+
{
165+
// removing the only member disconnects the client and triggers this event
166+
if (args.State == ClientState.Disconnected) removed.Release();
167+
})
163168
.MembersUpdated((sender, args) =>
164169
{
170+
// removing one member amongst others triggers this event (but does not disconnect the client)
165171
if (args.RemovedMembers.Count > 0) removed.Release();
166172
}))
167173
.CfAwait();

src/Hazelcast.Net.Tests/Clustering/ConnectMembersTests.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ async Task ConnectMembers(MemberConnectionQueue memberConnectionQueue, Cancellat
6161
var cancellation = new CancellationTokenSource();
6262
var connecting = ConnectMembers(queue, cancellation.Token);
6363

64+
queue.Resume(); // the queue is initially suspended, resume
6465

6566
// -- connects
6667

@@ -211,6 +212,8 @@ async Task ConnectMembers(MemberConnectionQueue memberConnectionQueue, Cancellat
211212
var cancellation = new CancellationTokenSource();
212213
var connecting = ConnectMembers(queue, cancellation.Token);
213214

215+
queue.Resume(); // the queue is initially suspended, resume
216+
214217
// -- connects
215218

216219
var stopwatch = Stopwatch.StartNew();

src/Hazelcast.Net.Tests/Clustering/MemberConnectionTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -397,7 +397,7 @@ private MemberConnection NewActiveMemberConnection(MemberInfo member, Authentica
397397
new SslOptions(), new Int64Sequence(), loggerFactory
398398
);
399399

400-
connection.Accessor().Active = true;
400+
connection.Accessor().Connected = true;
401401
connection.Accessor().MemberId = member.Id;
402402

403403
return connection;

src/Hazelcast.Net.Tests/Clustering/SubscriptionCollectTests.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
using Hazelcast.Protocol.Models;
2525
using Hazelcast.Serialization;
2626
using Hazelcast.Testing;
27+
using Hazelcast.Testing.Logging;
2728
using Hazelcast.Testing.TestServer;
2829
using Microsoft.Extensions.Logging.Abstractions;
2930
using NUnit.Framework;
@@ -92,7 +93,9 @@ public async Task SubscriptionIsCollected()
9293

9394
HConsole.WriteLine(this, "Start client");
9495

95-
var options = new HazelcastOptionsBuilder().With(options =>
96+
var options = new HazelcastOptionsBuilder()
97+
.WithHConsoleLogger()
98+
.With(options =>
9699
{
97100
options.Networking.Addresses.Add("127.0.0.1:11001");
98101
options.Networking.Addresses.Add("127.0.0.1:11002");

src/Hazelcast.Net.Tests/Remote/ClientTests.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
namespace Hazelcast.Tests.Remote
2424
{
2525
[TestFixture]
26-
[Timeout(10_000)]
26+
[Timeout(30_000)]
2727
public class ClientTests : SingleMemberRemoteTestBase
2828
{
2929
[Test]
@@ -40,15 +40,15 @@ public async Task ClientCanConnect()
4040
}
4141

4242
[Test]
43-
public async Task ClientStaringClientWithConfig()
43+
public async Task ClientStartingClientWithConfig()
4444
{
4545
var clientStarting = HazelcastClientFactory.GetNewStartingClient(CreateHazelcastOptions());
4646
await clientStarting.Task;
4747
await clientStarting.Client.DisposeAsync();
4848
}
4949

5050
[Test]
51-
public async Task ClientStaringClientWithConfig2()
51+
public async Task ClientStatringClientWithConfig2()
5252
{
5353
var o = CreateHazelcastOptions();
5454
var clientStarting = HazelcastClientFactory.GetNewStartingClient(options=>
Lines changed: 253 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,253 @@
1+
// Copyright (c) 2008-2022, Hazelcast, Inc. All Rights Reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
using System;
16+
using System.Collections.Generic;
17+
using System.Linq;
18+
using System.Threading;
19+
using System.Threading.Tasks;
20+
using Hazelcast.Core;
21+
using Hazelcast.Exceptions;
22+
using Hazelcast.Messaging;
23+
using Hazelcast.Networking;
24+
using Hazelcast.Partitioning;
25+
using Hazelcast.Protocol.Codecs;
26+
using Hazelcast.Serialization;
27+
using Hazelcast.Testing;
28+
using Hazelcast.Testing.Logging;
29+
using NUnit.Framework;
30+
31+
namespace Hazelcast.Tests.Serialization.Compact;
32+
33+
[TestFixture]
34+
public class CompactQaTests : ClusterRemoteTestBase
35+
{
36+
private IDisposable UseHConsole() => HConsole.Capture(o => o
37+
//.WithFilename("console.out")
38+
.Configure().SetMaxLevel().EnableTimeStamp(origin: DateTime.Now)
39+
.Configure(this).SetPrefix("TEST")
40+
.Configure<SocketConnectionBase>().SetIndent(8).SetPrefix("SOCKET").SetLevel(0)
41+
.Configure<ClientMessageConnection>().SetMinLevel()
42+
.Configure<AsyncContext>().SetMinLevel()
43+
.Configure<Partitioner>().SetLevel(1));
44+
45+
[Test]
46+
[Explicit("See comment in test.")]
47+
public async Task MemberAddressMatch()
48+
{
49+
using var _ = UseHConsole();
50+
51+
var member = await RcClient.StartMemberAsync(RcCluster).CfAwait();
52+
await using var cleanup = new DisposeAsyncAction(async () => await RcClient.StopMemberAsync(RcCluster, member));
53+
54+
var options = CreateHazelcastOptions();
55+
options.Networking.ConnectionRetry.ClusterConnectionTimeoutMilliseconds = 10_000;
56+
options.Networking.Addresses.Clear();
57+
options.Networking.Addresses.Add("127.0.0.2:5701");
58+
59+
// this relies on the connect queue running all the time, but we have now disabled
60+
// the queue when disconnected - so this test cannot work anymore, we will need to
61+
// find a different way to do address re-routing in the future, if other clients
62+
// do it too.
63+
64+
await using var client = await HazelcastClientFactory.StartNewClientAsync(options);
65+
}
66+
67+
[TestCase(true)]
68+
[TestCase(false)]
69+
[Explicit]
70+
public async Task ExceptionPreventsClientFromReconnecting(bool recover)
71+
{
72+
using var _ = UseHConsole();
73+
74+
// in case a test times out, NUnit just reports that it failed, without further
75+
// details - in fact, it does not even say that the test timed out - so by wrapping
76+
// the actual test method this way, we make sure that the test "fails" instead of
77+
// timing out, and we get proper output (eg HConsole output)
78+
await ExceptionPreventsClientFromReconnectingTask(recover).CfAwait(TimeSpan.FromSeconds(60));
79+
}
80+
81+
private async Task ExceptionPreventsClientFromReconnectingTask(bool recover)
82+
{
83+
var throwException = false;
84+
85+
// start a member
86+
var member = await RcClient.StartMemberAsync(RcCluster).CfAwait();
87+
await using var cleanup = new DisposeAsyncAction(async () => await RcClient.StopMemberAsync(RcCluster, member));
88+
89+
// use a clean client + hook into cluster messaging to capture messages (before client starts)
90+
var options = CreateHazelcastOptions();
91+
options.Messaging.RetryTimeoutSeconds = 20;
92+
await using var client = HazelcastClientFactory.CreateClient(options);
93+
// use an internal-level handler, exceptions in user-level handlers are caught
94+
client.Cluster.Connections.ConnectionOpened += (_, _, _, _) =>
95+
{
96+
if (throwException) throw new Exception("bang!");
97+
return default;
98+
};
99+
await client.StartAsync(CancellationToken.None);
100+
101+
var map = await client.GetMapAsync<int, IGenericRecord>("bar");
102+
await map.PutAsync(1, GenericRecordBuilder.Compact("bar1").Build());
103+
104+
HConsole.WriteLine(this, "-------- STOP MEMBER --------");
105+
106+
// stop the member, wait until it is actually removed (else we might reconnect to it)
107+
await RcClient.StopMemberWaitRemovedAsync(client, RcCluster, member).CfAwait();
108+
109+
// trigger exceptions
110+
throwException = true;
111+
if (recover)
112+
{
113+
async Task RecoverAfter(int delayMilliseconds)
114+
{
115+
await Task.Delay(delayMilliseconds);
116+
throwException = false;
117+
}
118+
119+
var recoverTask = RecoverAfter(5000); // fire-and-forget
120+
}
121+
122+
HConsole.WriteLine(this, "-------- START MEMBER --------");
123+
124+
// start another member
125+
member = await RcClient.StartMemberAsync(RcCluster).CfAwait();
126+
127+
HConsole.WriteLine(this, "-------- PUT ASYNC --------");
128+
129+
if (recover)
130+
{
131+
// this will eventually complete once we're able to reconnect
132+
await map.PutAsync(2, GenericRecordBuilder.Compact("bar2").Build()).CfAwait();
133+
}
134+
else
135+
{
136+
// this will never complete because we'll never be able to reconnect
137+
//
138+
// Java: ReconnectMode can be OFF, ON (blocking invocations) or ASYNC (not blocking, triggers HazelcastClientOfflineException)
139+
// .NET: ReconnectMode can be DoNotReconnect = OFF, ReconnectSync or ReconnectAsync - but these two have the same effect
140+
//
141+
// In Java, ASYNC causes any invocation to *immediately* fail with HazelcastClientOfflineException if the client is
142+
// reconnecting, whereas ON causes the invocation to be retried, and it may eventually fail with OperationTimeoutException.
143+
//
144+
// In .NET, invocations are tried (and retried) while the client is reconnecting, until either the client reconnects and
145+
// the invocation succeeds, or it times out. So, essentially, .NET is ON and it makes sense that we get a TaskTimeoutException
146+
// below.
147+
148+
await AssertEx.ThrowsAsync<TaskTimeoutException>(async () => await map.PutAsync(2, GenericRecordBuilder.Compact("bar2").Build()).CfAwait());
149+
}
150+
151+
HConsole.WriteLine(this, "-------- STOP MEMBER --------");
152+
153+
await RcClient.StopMemberAsync(RcCluster, member).CfAwait();
154+
155+
HConsole.WriteLine(this, "-------- END --------");
156+
}
157+
158+
[Test]
159+
public async Task ClusterRestart()
160+
{
161+
//using var _ = UseHConsole();
162+
163+
// start a member
164+
var member = await RcClient.StartMemberAsync(RcCluster).CfAwait();
165+
await using var cleanup = new DisposeAsyncAction(async () => await RcClient.StopMemberAsync(RcCluster, member));
166+
167+
var messages = new List<ClientMessage>();
168+
169+
// use a clean client + hook into cluster messaging to capture messages (before client starts)
170+
await using var client = HazelcastClientFactory.CreateClient(CreateHazelcastOptions());
171+
client.Cluster.Messaging.SendingMessage += message =>
172+
{
173+
messages.Add(message);
174+
return default;
175+
};
176+
await client.StartAsync(CancellationToken.None);
177+
178+
var clientMembers = client.Members;
179+
Assert.That(clientMembers.Count, Is.EqualTo(1));
180+
Assert.That(clientMembers.First().Member.Id, Is.EqualTo(Guid.Parse(member.Uuid)));
181+
HConsole.WriteLine(this, $"CONNECTED TO MEMBER {member.Uuid}");
182+
183+
var map = await client.GetMapAsync<int, IGenericRecord>("bar");
184+
185+
// add values = will publish the corresponding schemas
186+
await map.PutAsync(1, GenericRecordBuilder.Compact("bar1").Build());
187+
await map.PutAsync(2, GenericRecordBuilder.Compact("bar2").Build());
188+
189+
// stop the member, wait until it is actually removed (else we might reconnect to it)
190+
await RcClient.StopMemberWaitRemovedAsync(client, RcCluster, member);
191+
192+
messages.Clear();
193+
194+
// start another member
195+
member = await RcClient.StartMemberAsync(RcCluster);
196+
197+
// ensure that, eventually, the client is going to connect to *that* other member
198+
// and not, because of some timing issues, on the previous one that would not stop
199+
// fast enough - StopMemberWaitRemoved waits for the client to lose its connection
200+
// to the member but the remote controller has no wait of notifying us that the
201+
// member is actually dead for real and not going to accept connections anymore.
202+
await AssertEx.SucceedsEventually(() =>
203+
{
204+
clientMembers = client.Members;
205+
Assert.That(clientMembers.Count, Is.EqualTo(1));
206+
Assert.That(clientMembers.First().Member.Id, Is.EqualTo(Guid.Parse(member.Uuid)));
207+
}, 60_000, 1_000);
208+
209+
// new member, values are lost
210+
var value1 = await map.GetAsync(1);
211+
Assert.That(value1, Is.Null);
212+
213+
// yet we've been reconnected, schemas have been republished
214+
Assert.That(messages.Any(x => x.MessageType == ClientSendAllSchemasCodec.RequestMessageType));
215+
216+
await RcClient.StopMemberAsync(RcCluster, member).CfAwait();
217+
}
218+
219+
[Test]
220+
public async Task ReadSchemaAfterWrite_withObjectValueType()
221+
{
222+
//using var _ = UseHConsole();
223+
224+
// start a member
225+
var member = await RcClient.StartMemberAsync(RcCluster).CfAwait();
226+
await using var cleanup = new DisposeAsyncAction(async () => await RcClient.StopMemberAsync(RcCluster, member));
227+
228+
// use a clean client
229+
await using var client = await CreateAndStartClientAsync();
230+
231+
var map = await client.GetMapAsync<int, object>("bar");
232+
233+
await map.PutAsync(1, GenericRecordBuilder.Compact("bar1").Build());
234+
var value = await map.PutAsync(1, GenericRecordBuilder.Compact("bar2").Build());
235+
Assert.That(value, Is.Not.Null);
236+
Assert.That(value, Is.InstanceOf<IGenericRecord>());
237+
238+
await RcClient.StopMemberAsync(RcCluster, member).CfAwait();
239+
}
240+
241+
protected override HazelcastOptions CreateHazelcastOptions()
242+
{
243+
var options = base.CreateHazelcastOptions();
244+
options.Networking.ReconnectMode = ReconnectMode.ReconnectSync;
245+
options.Messaging.RetryTimeoutSeconds = 10;
246+
return options;
247+
}
248+
249+
protected override HazelcastOptionsBuilder CreateHazelcastOptionsBuilder()
250+
{
251+
return base.CreateHazelcastOptionsBuilder().WithHConsoleLogger();
252+
}
253+
}

src/Hazelcast.Net.Tests/Serialization/Compact/CompactSerializationSerializerTests.cs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -234,8 +234,18 @@ public void ReadImpossibleType()
234234

235235
var input = new ObjectDataInput(output.ToByteArray(), orw, Endianness.BigEndian);
236236

237-
// fails because the type cannot be constructed
238-
Assert.Throws<SerializationException>(() => serializer.Read(input));
237+
// fails because the type cannot be constructed, and therefore we cannot create a
238+
// proper compact serialization registration producing an ImpossibleType = we don't
239+
// know how to deserialize.
240+
Assert.Throws<SerializationException>(() => serializer.Read<ImpossibleType>(input));
241+
242+
// on the other hand, if we don't specify the type, we indicate that we accept any
243+
// object, and therefore since we fail to create a proper registration we fall back
244+
// to generic record.
245+
input.Position = 0;
246+
var obj = serializer.Read(input);
247+
Assert.That(obj, Is.Not.Null);
248+
Assert.That(obj, Is.InstanceOf<IGenericRecord>());
239249
}
240250

241251
public class ImpossibleType

0 commit comments

Comments
 (0)