Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AsyncEnumerableEx.Merge doesn't behave the same in all cases #1284

Open
josh-endries opened this issue Sep 5, 2020 · 4 comments
Open

AsyncEnumerableEx.Merge doesn't behave the same in all cases #1284

josh-endries opened this issue Sep 5, 2020 · 4 comments

Comments

@josh-endries
Copy link

josh-endries commented Sep 5, 2020

I think I found a bug, or at least behavior that I wasn't expecting. I'm using AsyncEnumerableEx.Merge to try and parallelize a few IAsyncEnumerables. For testing, I'm using this method:

        static async IAsyncEnumerable<string> Foo(string prefix)
        {
            var rng = new Random();
            Console.WriteLine(prefix);
            await Task.Delay(rng.Next(1000, 5000));
            yield return $"{prefix} 1";
            await Task.Delay(rng.Next(1000, 5000));
            yield return $"{prefix} 2";
            await Task.Delay(rng.Next(1000, 5000));
            yield return $"{prefix} 3";
        }

When I run it with this:

return AsyncEnumerableEx.Merge(Foo("a"), Foo("b"), Foo("c"));

I get the expected output, ordered "a b c" console lines, followed by mixed prefix lines.

However, when I run it with this:

return AsyncEnumerableEx.Merge(new List<string> { "a", "b", "c" }.Select(x => Foo(x)));

It blocks, I get a and its prefix lines, then b and its prefix lines, and c then its prefix lines.

I would have expected, perhaps incorrectly, both forms of the method to work the same way.

<PackageReference Include="System.Interactive.Async" Version="4.1.1" />

Using ToList after the Select doesn't seem to work either, but ToArray does.

@quinmars
Copy link
Contributor

quinmars commented Sep 6, 2020

This seems to be a known issue, but I do not know which route @bartdesmet is going to take. See a comment in the source code:

            //
            // REVIEW: This implementation does not exploit concurrency. We should not introduce such behavior in order to
            //         avoid breaking changes, but we could introduce a parallel ConcurrentMerge implementation. It is
            //         unfortunate though that the Merge overload accepting an array has always been concurrent, so we can't
            //         change that either (in order to have consistency where Merge is non-concurrent, and ConcurrentMerge
            //         is). We could consider a breaking change to Ix Async to streamline this, but we should do so when
            //         shipping with the BCL interfaces (which is already a breaking change to existing Ix Async users). If
            //         we go that route, we can either have:
            //
            //         - All overloads of Merge are concurrent
            //           - and continue to be named Merge, or,
            //           - are renamed to ConcurrentMerge for clarity (likely alongside a ConcurrentZip).
            //         - All overloads of Merge are non-concurrent
            //           - and are simply SelectMany operator macros (maybe more optimized)
            //         - Have ConcurrentMerge next to Merge overloads
            //           - where ConcurrentMerge may need a degree of concurrency parameter (and maybe other options), and,
            //           - where the overload set of both families may be asymmetric

@bartdesmet
Copy link
Collaborator

Given that Merge always had the inconsistent behavior, we cannot really change that without breaking people's code. Even for a major version update, I'd consider that worrisome.

My current take on this issue is to have ConcurrentMerge (and things like ConcurrentZip) alongside the existing Merge. We could also have an XyzMerge (with the prefix TBD) that is explicitly non-concurrent for all overloads. Or just ask people to use SelectMany for that.

database64128 added a commit to database64128/shadowsocks-uri-generator that referenced this issue Apr 21, 2021
- New dependencies: System.Linq.Async, System.Interactive.Async
- This commit includes a custom wrapper `ConcurrentMerge` around `AsyncEnumerableEx.Merge<TSource>(params IAsyncEnumerable<TSource>[] sources)`, because other variants of the method break concurrency. See dotnet/reactive#1284.
@Ilchert
Copy link

Ilchert commented Jun 15, 2021

I added a simpler repro of this problem.
Actual output is

1-start
1-end
2-start
2-end

instead of expected

1-start
2-start
1-end
2-end
public static async Task Main()
{    
    var e = new[] { Get1(), Get2() };
    await e.Merge().ForEachAwaitAsync(_ => Task.CompletedTask);
}
public static async IAsyncEnumerable<int> Get1()
{
    Console.WriteLine("1-start");
    await Task.Delay(1000);
    Console.WriteLine("1-end");
    yield return 1;
}
public static async IAsyncEnumerable<int> Get2()
{
    Console.WriteLine("2-start");
    await Task.Delay(1000);
    Console.WriteLine("2-end");
    yield return 2;
}

Also I cant find tests for Merge :(.

@Arithmomaniac
Copy link

#1254 is essentially the same issue as this one.

Personally, I think the non-concurrent Merge overloads should get the Obsolete attribute applied to them, directing people to use the concurrent version with an array or to use SelectMany.

In a future version, they can then be removed entirely; leaving them in is a footgun that helps no one when non-concurrent merge via SelectMany is so trivial.

If you want the actually ConcurrentMerge functionality, you can try SuperLinq, which added it recently: viceroypenguin/SuperLinq#35

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

5 participants