Skip to content

Commit

Permalink
[wasm-mt] A version of LowLevelLifoSemaphore that uses callbacks on t…
Browse files Browse the repository at this point in the history
…he browser (#84491)

This is part of #84489 - landing support for async JS interop on threadpool threads in multi-threaded WebAssembly.

This PR adds two things:

1. A new unmanaged `LifoSemaphoreAsyncWait` semaphore that can use Emscripten's ability to push C calls from one thread to another in order to implement a callback-based semaphore - when a thread wants to wait, it sets up a success callback and a timeout callback, and then can return to the JS event loop.  When the semaphore is released, Emscripten will trigger the callback to run on the waiting thread.  If the wait times out, the timeout callback will run.
2. A new managed `LowLevelLifoAsyncWaitSemaphore` that doesn't have the normal `Wait()` function, and instead needs to use the callback-based `PrepareAsyncWait()` function.  Also refactored `LowLevelLifoSemaphore` to pull out a common `LowLevelLifoSemaphoreBase` class to share with the async wait version.

* [wasm-mt][mono] Add new LifoSemaphoreAsyncWait C primitive

Add a new kind of LifoSemaphore that has a callback-based wait
function, instead of a blocking wait using Emscripten's ability to
send work from one webworker to another in C.

This will allow us to wait for a semaphore from the JS event loop in a
web worker.

* [wasm-mt][mono] split LowLevelLifoSemaphore into two kinds

A normal LowLevelLifoSemaphore that can do a synchronous wait
and another that can do a callback-based wait from the JS event loop

* Add LowLevelLifoSemaphoreBase

Move the counts to the base class

Move Release to the base class, make ReleaseCore abstract

* make a new LowLevelLifoAsyncWaitSemaphore for wasm-mt

* Revert unintentional package-lock.json changes

* fix possible null dereference

* use a separate icall for async wait InitInternal

instead of magic constants that are otherwise not needed in managed

* remove dead code; fixup comments

* LowLevelLifoSemaphore: decrement timeoutMs if we lost InterlockedCompareExchange

When a thread wakes after waiting for a semaphore to be released, if
it raced with another thread that is also trying to update the
semaphore counts and loses, it has to go back to waiting again.

In that case, decrement the remaining timeout by the elapsed wait time
so that the next wait is shorter.

* better timeout decrement code

* move timeoutMs == 0 check to PrepareAsyncWaitCore

make PrepareAsyncWaitCore static and remove a redundant argument
  • Loading branch information
lambdageek committed Apr 21, 2023
1 parent f107b4b commit dcb34de
Show file tree
Hide file tree
Showing 14 changed files with 878 additions and 219 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace System.Threading
/// <summary>
/// A LIFO semaphore implemented using the PAL's semaphore with uninterruptible waits.
/// </summary>
internal sealed partial class LowLevelLifoSemaphore : IDisposable
internal sealed partial class LowLevelLifoSemaphore : LowLevelLifoSemaphoreBase, IDisposable
{
private Semaphore? _semaphore;

Expand All @@ -34,7 +34,7 @@ public bool WaitCore(int timeoutMs)
[LibraryImport(RuntimeHelpers.QCall, EntryPoint = "WaitHandle_CorWaitOnePrioritizedNative")]
private static partial int WaitNative(SafeWaitHandle handle, int timeoutMs);

public void ReleaseCore(int count)
protected override void ReleaseCore(int count)
{
Debug.Assert(_semaphore != null);
Debug.Assert(count > 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace System.Threading
/// A LIFO semaphore.
/// Waits on this semaphore are uninterruptible.
/// </summary>
internal sealed partial class LowLevelLifoSemaphore : IDisposable
internal sealed partial class LowLevelLifoSemaphore : LowLevelLifoSemaphoreBase, IDisposable
{
private WaitSubsystem.WaitableObject _semaphore;

Expand All @@ -27,7 +27,7 @@ private bool WaitCore(int timeoutMs)
return WaitSubsystem.Wait(_semaphore, timeoutMs, false, true) == WaitHandle.WaitSuccess;
}

private void ReleaseCore(int count)
protected override void ReleaseCore(int count)
{
WaitSubsystem.ReleaseSemaphore(_semaphore, count);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2531,6 +2531,7 @@
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.Windows.cs" Condition="'$(TargetsWindows)' == 'true'" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\LowLevelLifoSemaphore.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\LowLevelLifoSemaphore.Windows.cs" Condition="'$(TargetsWindows)' == 'true'" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\LowLevelLifoSemaphoreBase.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PreAllocatedOverlapped.cs" Condition="('$(TargetsBrowser)' != 'true' and '$(TargetsWasi)' != 'true') or '$(FeatureWasmThreads)' == 'true'" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\RegisteredWaitHandle.Portable.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\ThreadPoolBoundHandle.cs" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public bool WaitCore(int timeoutMs)
return success;
}

public void ReleaseCore(int count)
protected override void ReleaseCore(int count)
{
Debug.Assert(count > 0);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,29 +11,13 @@ namespace System.Threading
/// A LIFO semaphore.
/// Waits on this semaphore are uninterruptible.
/// </summary>
internal sealed partial class LowLevelLifoSemaphore : IDisposable
internal sealed partial class LowLevelLifoSemaphore : LowLevelLifoSemaphoreBase, IDisposable
{
private CacheLineSeparatedCounts _separated;

private readonly int _maximumSignalCount;
private readonly int _spinCount;
private readonly Action _onWait;

private const int SpinSleep0Threshold = 10;

public LowLevelLifoSemaphore(int initialSignalCount, int maximumSignalCount, int spinCount, Action onWait)
: base (initialSignalCount, maximumSignalCount, spinCount, onWait)
{
Debug.Assert(initialSignalCount >= 0);
Debug.Assert(initialSignalCount <= maximumSignalCount);
Debug.Assert(maximumSignalCount > 0);
Debug.Assert(spinCount >= 0);

_separated = default;
_separated._counts.SignalCount = (uint)initialSignalCount;
_maximumSignalCount = maximumSignalCount;
_spinCount = spinCount;
_onWait = onWait;

Create(maximumSignalCount);
}

Expand Down Expand Up @@ -144,55 +128,6 @@ public bool Wait(int timeoutMs, bool spinWait)
}
}

public void Release(int releaseCount)
{
Debug.Assert(releaseCount > 0);
Debug.Assert(releaseCount <= _maximumSignalCount);

int countOfWaitersToWake;
Counts counts = _separated._counts;
while (true)
{
Counts newCounts = counts;

// Increase the signal count. The addition doesn't overflow because of the limit on the max signal count in constructor.
newCounts.AddSignalCount((uint)releaseCount);

// Determine how many waiters to wake, taking into account how many spinners and waiters there are and how many waiters
// have previously been signaled to wake but have not yet woken
countOfWaitersToWake =
(int)Math.Min(newCounts.SignalCount, (uint)counts.WaiterCount + counts.SpinnerCount) -
counts.SpinnerCount -
counts.CountOfWaitersSignaledToWake;
if (countOfWaitersToWake > 0)
{
// Ideally, limiting to a maximum of releaseCount would not be necessary and could be an assert instead, but since
// WaitForSignal() does not have enough information to tell whether a woken thread was signaled, and due to the cap
// below, it's possible for countOfWaitersSignaledToWake to be less than the number of threads that have actually
// been signaled to wake.
if (countOfWaitersToWake > releaseCount)
{
countOfWaitersToWake = releaseCount;
}

// Cap countOfWaitersSignaledToWake to its max value. It's ok to ignore some woken threads in this count, it just
// means some more threads will be woken next time. Typically, it won't reach the max anyway.
newCounts.AddUpToMaxCountOfWaitersSignaledToWake((uint)countOfWaitersToWake);
}

Counts countsBeforeUpdate = _separated._counts.InterlockedCompareExchange(newCounts, counts);
if (countsBeforeUpdate == counts)
{
Debug.Assert(releaseCount <= _maximumSignalCount - counts.SignalCount);
if (countOfWaitersToWake > 0)
ReleaseCore(countOfWaitersToWake);
return;
}

counts = countsBeforeUpdate;
}
}

private bool WaitForSignal(int timeoutMs)
{
Debug.Assert(timeoutMs > 0 || timeoutMs == -1);
Expand All @@ -201,13 +136,15 @@ private bool WaitForSignal(int timeoutMs)

while (true)
{
if (!WaitCore(timeoutMs))
int startWaitTicks = timeoutMs != -1 ? Environment.TickCount : 0;
if (timeoutMs == 0 || !WaitCore(timeoutMs))
{
// Unregister the waiter. The wait subsystem used above guarantees that a thread that wakes due to a timeout does
// not observe a signal to the object being waited upon.
_separated._counts.InterlockedDecrementWaiterCount();
return false;
}
int endWaitTicks = timeoutMs != -1 ? Environment.TickCount : 0;

// Unregister the waiter if this thread will not be waiting anymore, and try to acquire the semaphore
Counts counts = _separated._counts;
Expand Down Expand Up @@ -238,132 +175,15 @@ private bool WaitForSignal(int timeoutMs)
}

counts = countsBeforeUpdate;
if (timeoutMs != -1) {
int waitMs = endWaitTicks - startWaitTicks;
if (waitMs >= 0 && waitMs < timeoutMs)
timeoutMs -= waitMs;
else
timeoutMs = 0;
}
}
}
}

private struct Counts : IEquatable<Counts>
{
private const byte SignalCountShift = 0;
private const byte WaiterCountShift = 32;
private const byte SpinnerCountShift = 48;
private const byte CountOfWaitersSignaledToWakeShift = 56;

private ulong _data;

private Counts(ulong data) => _data = data;

private uint GetUInt32Value(byte shift) => (uint)(_data >> shift);
private void SetUInt32Value(uint value, byte shift) =>
_data = (_data & ~((ulong)uint.MaxValue << shift)) | ((ulong)value << shift);
private ushort GetUInt16Value(byte shift) => (ushort)(_data >> shift);
private void SetUInt16Value(ushort value, byte shift) =>
_data = (_data & ~((ulong)ushort.MaxValue << shift)) | ((ulong)value << shift);
private byte GetByteValue(byte shift) => (byte)(_data >> shift);
private void SetByteValue(byte value, byte shift) =>
_data = (_data & ~((ulong)byte.MaxValue << shift)) | ((ulong)value << shift);

public uint SignalCount
{
get => GetUInt32Value(SignalCountShift);
set => SetUInt32Value(value, SignalCountShift);
}

public void AddSignalCount(uint value)
{
Debug.Assert(value <= uint.MaxValue - SignalCount);
_data += (ulong)value << SignalCountShift;
}

public void IncrementSignalCount() => AddSignalCount(1);

public void DecrementSignalCount()
{
Debug.Assert(SignalCount != 0);
_data -= (ulong)1 << SignalCountShift;
}

public ushort WaiterCount
{
get => GetUInt16Value(WaiterCountShift);
set => SetUInt16Value(value, WaiterCountShift);
}

public void IncrementWaiterCount()
{
Debug.Assert(WaiterCount < ushort.MaxValue);
_data += (ulong)1 << WaiterCountShift;
}

public void DecrementWaiterCount()
{
Debug.Assert(WaiterCount != 0);
_data -= (ulong)1 << WaiterCountShift;
}

public void InterlockedDecrementWaiterCount()
{
var countsAfterUpdate = new Counts(Interlocked.Add(ref _data, unchecked((ulong)-1) << WaiterCountShift));
Debug.Assert(countsAfterUpdate.WaiterCount != ushort.MaxValue); // underflow check
}

public byte SpinnerCount
{
get => GetByteValue(SpinnerCountShift);
set => SetByteValue(value, SpinnerCountShift);
}

public void IncrementSpinnerCount()
{
Debug.Assert(SpinnerCount < byte.MaxValue);
_data += (ulong)1 << SpinnerCountShift;
}

public void DecrementSpinnerCount()
{
Debug.Assert(SpinnerCount != 0);
_data -= (ulong)1 << SpinnerCountShift;
}

public byte CountOfWaitersSignaledToWake
{
get => GetByteValue(CountOfWaitersSignaledToWakeShift);
set => SetByteValue(value, CountOfWaitersSignaledToWakeShift);
}

public void AddUpToMaxCountOfWaitersSignaledToWake(uint value)
{
uint availableCount = (uint)(byte.MaxValue - CountOfWaitersSignaledToWake);
if (value > availableCount)
{
value = availableCount;
}
_data += (ulong)value << CountOfWaitersSignaledToWakeShift;
}

public void DecrementCountOfWaitersSignaledToWake()
{
Debug.Assert(CountOfWaitersSignaledToWake != 0);
_data -= (ulong)1 << CountOfWaitersSignaledToWakeShift;
}

public Counts InterlockedCompareExchange(Counts newCounts, Counts oldCounts) =>
new Counts(Interlocked.CompareExchange(ref _data, newCounts._data, oldCounts._data));

public static bool operator ==(Counts lhs, Counts rhs) => lhs.Equals(rhs);
public static bool operator !=(Counts lhs, Counts rhs) => !lhs.Equals(rhs);

public override bool Equals([NotNullWhen(true)] object? obj) => obj is Counts other && Equals(other);
public bool Equals(Counts other) => _data == other._data;
public override int GetHashCode() => (int)_data + (int)(_data >> 32);
}

[StructLayout(LayoutKind.Sequential)]
private struct CacheLineSeparatedCounts
{
private readonly Internal.PaddingFor32 _pad1;
public Counts _counts;
private readonly Internal.PaddingFor32 _pad2;
}
}
}

0 comments on commit dcb34de

Please sign in to comment.