Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-14033 .NET: Fix MessagingTest.TestRemoteListen flakiness #8685

Merged
merged 1 commit into from Jan 22, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
128 changes: 110 additions & 18 deletions modules/platforms/dotnet/Apache.Ignite.Core.Tests/MessagingTest.cs
Expand Up @@ -334,14 +334,35 @@ public void TestRemoteListen([Values(true, false)] bool async)
}
}

/// <summary>
/// Tests that <see cref="IMessaging.StopRemoteListen"/> guarantees that all handlers are removed
/// upon method exit.
/// </summary>
[Test]
[Ignore("IGNITE-14032")]
public void TestStopRemoteListenRemovesAllCallbacksUponExit()
{
const string topic = "topic";

var messaging =_grid1.GetMessaging();
var listenId = messaging.RemoteListen(MessagingTestHelper.GetListener("first"), topic);

TestUtils.AssertHandleRegistryHasItems(-1, 1, _grid1, _grid2, _grid3);

messaging.Send(1, topic);
messaging.StopRemoteListen(listenId);

TestUtils.AssertHandleRegistryHasItems(-1, 0, _grid1, _grid2, _grid3);
}

/// <summary>
/// Tests RemoteListen.
/// </summary>
private void TestRemoteListen(object topic, bool async = false)
{
var messaging =_grid1.GetMessaging();

var listener = MessagingTestHelper.GetListener();
var listener = MessagingTestHelper.GetListener("first");
var listenId = async
? messaging.RemoteListenAsync(listener, topic).Result
: messaging.RemoteListen(listener, topic);
Expand All @@ -353,9 +374,10 @@ private void TestRemoteListen(object topic, bool async = false)
CheckNoMessage(NextId());

// Test multiple subscriptions for the same filter
var listener2 = MessagingTestHelper.GetListener("second");
var listenId2 = async
? messaging.RemoteListenAsync(listener, topic).Result
: messaging.RemoteListen(listener, topic);
? messaging.RemoteListenAsync(listener2, topic).Result
: messaging.RemoteListen(listener2, topic);

CheckSend(topic, msg: messaging, remoteListen: true, repeatMultiplier: 2); // expect twice the messages

Expand All @@ -364,6 +386,14 @@ private void TestRemoteListen(object topic, bool async = false)
else
messaging.StopRemoteListen(listenId2);

// Wait for all to unsubscribe: StopRemoteListen (both sync and async) does not remove remote listeners
// upon exit. Remote listeners are removed with disco messages after some delay -
// see TestStopRemoteListenRemovesAllCallbacksUponExit.
TestUtils.AssertHandleRegistryHasItems(
(int)MessagingTestHelper.SleepTimeout.TotalMilliseconds,
1,
_grid1, _grid2, _grid3);

CheckSend(topic, msg: messaging, remoteListen: true); // back to normal after unsubscription

// Test message type mismatch
Expand Down Expand Up @@ -464,7 +494,7 @@ public void TestRemoteListenMultithreaded()
if (sharedResult.Length != 0)
{
Assert.Fail("Unexpected messages ({0}): {1}; last sent message: {2}", sharedResult.Length,
string.Join(",", sharedResult), lastMsg);
string.Join(",", sharedResult.Select(x => x.ToString())), lastMsg);
}
}

Expand Down Expand Up @@ -570,17 +600,14 @@ private static int NextId()
public static class MessagingTestHelper
{
/** */
public static readonly ConcurrentStack<string> ReceivedMessages = new ConcurrentStack<string>();
public static readonly ConcurrentStack<ReceivedMessage> ReceivedMessages = new ConcurrentStack<ReceivedMessage>();

/** */
private static readonly ConcurrentStack<string> Failures = new ConcurrentStack<string>();

/** */
private static readonly CountdownEvent ReceivedEvent = new CountdownEvent(0);

/** */
private static readonly ConcurrentStack<Guid> LastNodeIds = new ConcurrentStack<Guid>();

/** */
public static volatile bool ListenResult = true;

Expand All @@ -598,7 +625,6 @@ public static void ClearReceived(int expectedCount)
{
ReceivedMessages.Clear();
ReceivedEvent.Reset(expectedCount);
LastNodeIds.Clear();
}

/// <summary>
Expand All @@ -611,18 +637,28 @@ public static void ClearReceived(int expectedCount)
public static void VerifyReceive(IClusterGroup cluster, IEnumerable<string> expectedMessages,
Func<IEnumerable<string>, IEnumerable<string>> resultFunc, int expectedRepeat)
{
expectedMessages = expectedMessages.SelectMany(x => Enumerable.Repeat(x, expectedRepeat)).ToArray();
var expectedMessagesStr = string.Join(", ", expectedMessages);

// check if expected message count has been received; Wait returns false if there were none.
Assert.IsTrue(ReceivedEvent.Wait(MessageTimeout),
string.Format("expectedMessages: {0}, expectedRepeat: {1}, remaining: {2}",
expectedMessages, expectedRepeat, ReceivedEvent.CurrentCount));
expectedMessagesStr, expectedRepeat, ReceivedEvent.CurrentCount));

expectedMessages = expectedMessages.SelectMany(x => Enumerable.Repeat(x, expectedRepeat));
var receivedMessages = ReceivedMessages.ToArray();
var actualMessages = resultFunc(receivedMessages.Select(m => m.Message)).ToArray();

Assert.AreEqual(expectedMessages, resultFunc(ReceivedMessages));
CollectionAssert.AreEqual(
expectedMessages,
actualMessages,
string.Format("Expected messages: '{0}', actual messages: '{1}', expectedRepeat: {2}",
expectedMessagesStr,
string.Join(", ", receivedMessages.Select(x => x.ToString())),
expectedRepeat));

// check that all messages came from local node.
var localNodeId = cluster.Ignite.GetCluster().GetLocalNode().Id;
Assert.AreEqual(localNodeId, LastNodeIds.Distinct().Single());
Assert.AreEqual(localNodeId, ReceivedMessages.Select(m => m.NodeId).Distinct().Single());

AssertFailures();
}
Expand All @@ -631,9 +667,9 @@ public static void ClearReceived(int expectedCount)
/// Gets the message listener.
/// </summary>
/// <returns>New instance of message listener.</returns>
public static IMessageListener<string> GetListener()
public static RemoteListener GetListener(string name = null)
{
return new RemoteListener();
return new RemoteListener(name);
}

/// <summary>
Expand All @@ -651,15 +687,25 @@ public static void AssertFailures()
/// <summary>
/// Remote listener.
/// </summary>
private class RemoteListener : IMessageListener<string>
public class RemoteListener : IMessageListener<string>
{
/** */
private readonly string _name;

/** */
public RemoteListener(string name)
{
_name = name;
}

/** <inheritdoc /> */
public bool Invoke(Guid nodeId, string message)
{
var receivedMessage = new ReceivedMessage(message, nodeId, GetHashCode(), _name);

try
{
LastNodeIds.Push(nodeId);
ReceivedMessages.Push(message);
ReceivedMessages.Push(receivedMessage);

ReceivedEvent.Signal();

Expand All @@ -674,6 +720,52 @@ public bool Invoke(Guid nodeId, string message)
}
}
}

/// <summary>
/// Received message data.
/// </summary>
public class ReceivedMessage
{
/** */
private readonly string _message;

/** */
private readonly Guid _nodeId;

/** */
private readonly int _listenerId;

/** */
private readonly string _listenerName;

/** */
public ReceivedMessage(string message, Guid nodeId, int listenerId, string listenerName)
{
_message = message;
_nodeId = nodeId;
_listenerId = listenerId;
_listenerName = listenerName;
}

/** */
public string Message
{
get { return _message; }
}

/** */
public Guid NodeId
{
get { return _nodeId; }
}

/** <inheritdoc /> */
public override string ToString()
{
return string.Format(
"ReceivedMessage [{0}, {1}, {2}, {3}]", _message, _nodeId, _listenerId, _listenerName);
}
}
}

/// <summary>
Expand Down