Skip to content

Commit

Permalink
tweaks
Browse files Browse the repository at this point in the history
  • Loading branch information
hadashiA committed Sep 8, 2023
1 parent 91667a3 commit fa65a01
Showing 1 changed file with 21 additions and 25 deletions.
46 changes: 21 additions & 25 deletions src/UniTask/Assets/Plugins/UniTask/Runtime/Linq/Merge.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@ sealed class _Merge : MoveNextSource, IUniTaskAsyncEnumerator<T>
readonly Queue<(T, Exception)> resultQueue = new Queue<(T, Exception)>();
readonly CancellationToken cancellationToken;

bool allCompleted;

public T Current { get; private set; }

public _Merge(IUniTaskAsyncEnumerable<T>[] sources, CancellationToken cancellationToken)
Expand Down Expand Up @@ -100,7 +98,7 @@ public UniTask<bool> MoveNextAsync()
else
{
Current = queuedValue;
completionSource.TrySetResult(Volatile.Read(ref allCompleted));
completionSource.TrySetResult(!IsCompletedAll());
}
return new UniTask<bool>(this, completionSource.Version);
}
Expand Down Expand Up @@ -155,39 +153,22 @@ void GetResultAt(int index, UniTask<bool>.Awaiter awaiter)
lock (states)
{
states[index] = hasNext ? MergeSourceState.Pending : MergeSourceState.Completed;

// Check to complete
if (!hasNext)
{
allCompleted = true;
for (var i = 0; i < length; i++)
{
if (states[i] != MergeSourceState.Completed)
{
allCompleted = false;
break;
}
}
}
}
}
catch (Exception ex)
{
if (completionSource.GetStatus(completionSource.Version).IsCompleted())
if (!completionSource.TrySetException(ex))
{
lock (resultQueue)
{
resultQueue.Enqueue((default, ex));
}
}
else
{
completionSource.TrySetException(ex);
}
return;
}

if (hasNext || Volatile.Read(ref allCompleted))
var completed = IsCompletedAll();
if (hasNext || completed)
{
if (completionSource.GetStatus(completionSource.Version).IsCompleted())
{
Expand All @@ -199,7 +180,7 @@ void GetResultAt(int index, UniTask<bool>.Awaiter awaiter)
else
{
Current = enumerators[index].Current;
completionSource.TrySetResult(!Volatile.Read(ref allCompleted));
completionSource.TrySetResult(!completed);
}
}
}
Expand All @@ -220,6 +201,21 @@ bool TryDequeue(out T value, out Exception ex)
ex = default;
return false;
}
}

bool IsCompletedAll()
{
lock (states)
{
for (var i = 0; i < length; i++)
{
if (states[i] != MergeSourceState.Completed)
{
return false;
}
}
return true;
}
}
}
}
}

0 comments on commit fa65a01

Please sign in to comment.