Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve async lock #1520

Merged
merged 10 commits into from
Sep 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/ReleaseNotes.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
* [Core] MQTT Packets being sent over web socket transport are now setting the web socket frame boundaries correctly (#1499).
* [Core] Add support for attaching and detaching events from different threads.
* [Core] Fixed a deadlock in _AsyncLock_ implementation (#1520).
* [Client] Keep alive mechanism now uses the configured timeout value from the options (thanks to @Stannieman, #1495).
* [Client] The _PingAsync_ will fallback to the timeout specified in the client options when the cancellation token cannot be cancelled.
* [Server] A DISCONNECT packet is no longer sent to MQTT clients < 5.0.0 (thanks to @logicaloud, #1506).
Expand Down
66 changes: 66 additions & 0 deletions Source/MQTTnet.Benchmarks/AsyncLockBenchmark.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System;
using System.Threading;
using System.Threading.Tasks;
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Jobs;
using MQTTnet.Internal;

namespace MQTTnet.Benchmarks
{
[SimpleJob(RuntimeMoniker.Net60)]
[MemoryDiagnoser]
public class AsyncLockBenchmark : BaseBenchmark
{
[Benchmark]
public async Task Synchronize_100_Tasks()
{
const int tasksCount = 100;

var tasks = new Task[tasksCount];
var asyncLock = new AsyncLock();
var globalI = 0;

for (var i = 0; i < tasksCount; i++)
{
tasks[i] = Task.Run(
async () =>
{
using (await asyncLock.WaitAsync(CancellationToken.None).ConfigureAwait(false))
{
var localI = globalI;
await Task.Delay(5).ConfigureAwait(false); // Increase the chance for wrong data.
localI++;
globalI = localI;
}
});
}

await Task.WhenAll(tasks).ConfigureAwait(false);

if (globalI != tasksCount)
{
throw new Exception($"Code is broken ({globalI})!");
}
}

[Benchmark]
public async Task Wait_100_000_Times()
{
var asyncLock = new AsyncLock();

using (var cancellationToken = new CancellationTokenSource())
{
for (var i = 0; i < 100000; i++)
{
using (await asyncLock.WaitAsync(cancellationToken.Token).ConfigureAwait(false))
{
}
}
}
}
}
}
2 changes: 1 addition & 1 deletion Source/MQTTnet.Benchmarks/MQTTnet.Benchmarks.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="BenchmarkDotNet" Version="0.13.1" />
<PackageReference Include="BenchmarkDotNet" Version="0.13.2" />
<PackageReference Include="Microsoft.AspNetCore" Version="2.2.0" />
</ItemGroup>

Expand Down
4 changes: 4 additions & 0 deletions Source/MQTTnet.Benchmarks/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public static void Main(string[] args)
Console.WriteLine("c = SubscribeBenchmark");
Console.WriteLine("d = UnsubscribeBenchmark");
Console.WriteLine("e = MessageDeliveryBenchmark");
Console.WriteLine("f = AsyncLockBenchmark");

var pressedKey = Console.ReadKey(true);
switch (pressedKey.KeyChar)
Expand Down Expand Up @@ -75,6 +76,9 @@ public static void Main(string[] args)
case 'e':
BenchmarkRunner.Run<MessageDeliveryBenchmark>();
break;
case 'f':
BenchmarkRunner.Run<AsyncLockBenchmark>();
break;
}

Console.ReadLine();
Expand Down
28 changes: 28 additions & 0 deletions Source/MQTTnet.TestApp/AsyncLockTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Internal;

namespace MQTTnet.TestApp
{
public sealed class AsyncLockTest
{
public async Task Run()
{
var asyncLock = new AsyncLock();

using (var cancellationToken = new CancellationTokenSource())
{
for (var i = 0; i < 100000; i++)
{
using (await asyncLock.WaitAsync(cancellationToken.Token).ConfigureAwait(false))
{
}
}
}
}
}
}
5 changes: 5 additions & 0 deletions Source/MQTTnet.TestApp/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public static void Main()
Console.WriteLine("c = Start QoS 0 benchmark");
Console.WriteLine("d = Start server with logging");
Console.WriteLine("e = Start Message Throughput Test");
Console.WriteLine("f = Start AsyncLock Test");

var pressedKey = Console.ReadKey(true);
if (pressedKey.KeyChar == '1')
Expand Down Expand Up @@ -88,6 +89,10 @@ public static void Main()
{
Task.Run(new MessageThroughputTest().Run);
}
else if (pressedKey.KeyChar == 'f')
{
Task.Run(new AsyncLockTest().Run);
}

Thread.Sleep(Timeout.Infinite);
}
Expand Down
141 changes: 95 additions & 46 deletions Source/MQTTnet.Tests/Internal/AsyncLock_Tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// See the LICENSE file in the project root for more information.

using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
Expand All @@ -13,6 +14,80 @@ namespace MQTTnet.Tests.Internal
[TestClass]
public class AsyncLock_Tests
{
[TestMethod]
public void Lock_10_Parallel_Tasks()
{
const int ThreadsCount = 10;

var threads = new Task[ThreadsCount];
var @lock = new AsyncLock();
var globalI = 0;
for (var i = 0; i < ThreadsCount; i++)
{
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
threads[i] = Task.Run(
async () =>
{
using (await @lock.WaitAsync(CancellationToken.None))
{
var localI = globalI;
await Task.Delay(10); // Increase the chance for wrong data.
localI++;
globalI = localI;
}
});
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
}

Task.WaitAll(threads);
Assert.AreEqual(ThreadsCount, globalI);
}

[TestMethod]
public void Lock_10_Parallel_Tasks_With_Dispose_Doesnt_Lockup()
{
const int ThreadsCount = 10;

var threads = new Task[ThreadsCount];
var @lock = new AsyncLock();
var globalI = 0;
for (var i = 0; i < ThreadsCount; i++)
{
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
threads[i] = Task.Run(
async () =>
{
using (await @lock.WaitAsync(CancellationToken.None))
{
var localI = globalI;
await Task.Delay(10); // Increase the chance for wrong data.
localI++;
globalI = localI;
}
})
.ContinueWith(
x =>
{
if (globalI == 5)
{
@lock.Dispose();
@lock = new AsyncLock();
}

if (x.Exception != null)
{
Debug.WriteLine(x.Exception.GetBaseException().GetType().Name);
}
});
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
}

Task.WaitAll(threads);

// Expect only 6 because the others are failing due to disposal (if (globalI == 5)).
Assert.AreEqual(6, globalI);
}

[TestMethod]
public async Task Lock_Serial_Calls()
{
Expand Down Expand Up @@ -45,58 +120,32 @@ public async Task Test_Cancellation()
}
}

//[TestMethod]
//public async Task Test_Cancellation_With_Later_Access()
//{
// var @lock = new AsyncLock();

// var releaser = await @lock.WaitAsync().ConfigureAwait(false);

// try
// {
// using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(3)))
// {
// await @lock.WaitAsync(cts.Token).ConfigureAwait(false);
// }
// }
// catch (OperationCanceledException)
// {
// }

// releaser.Dispose();

// using (await @lock.WaitAsync().ConfigureAwait(false))
// {
// // When the method finished, the thread got access.
// }
//}

[TestMethod]
public void Lock_10_Parallel_Tasks()
public async Task Test_Cancellation_With_Later_Access()
{
const int ThreadsCount = 10;
var asyncLock = new AsyncLock();

var threads = new Task[ThreadsCount];
var @lock = new AsyncLock();
var globalI = 0;
for (var i = 0; i < ThreadsCount; i++)
var releaser = await asyncLock.WaitAsync(CancellationToken.None).ConfigureAwait(false);

try
{
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
threads[i] = Task.Run(async () =>
using (var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(3)))
{
using (var releaser = await @lock.WaitAsync(CancellationToken.None))
{
var localI = globalI;
await Task.Delay(10); // Increase the chance for wrong data.
localI++;
globalI = localI;
}
});
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
await asyncLock.WaitAsync(timeout.Token).ConfigureAwait(false);
}

Assert.Fail("Exception should be thrown!");
}
catch (OperationCanceledException)
{
}

Task.WaitAll(threads);
Assert.AreEqual(ThreadsCount, globalI);
releaser.Dispose();

using (await asyncLock.WaitAsync(CancellationToken.None).ConfigureAwait(false))
{
// When the method finished, the thread got access.
}
}
}
}
}
4 changes: 2 additions & 2 deletions Source/MQTTnet/Implementations/PlatformAbstractionLayer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ namespace MQTTnet.Implementations
public static class PlatformAbstractionLayer
{
#if NET452
public static Task CompletedTask => Task.FromResult(0);
public static Task CompletedTask { get; } = Task.FromResult(0);

public static byte[] EmptyByteArray { get; } = new byte[0];
#else
public static Task CompletedTask => Task.CompletedTask;
public static Task CompletedTask { get; } = Task.CompletedTask;

public static byte[] EmptyByteArray { get; } = Array.Empty<byte>();
#endif
Expand Down
Loading