yield.ReturnAsync inside ParallelForEachAsync #45

Open
galmok opened this issue Jun 26, 2019 · 5 comments

Comments


galmok commented Jun 26, 2019

We are having trouble using ParallelForEachAsync to start a number of async web requests running in parallel (they are actually Azure Blob Storage downloads) and having each result returned using yield.ReturnAsync.

The problem is that before the first result is returned, several of the parallel web requests have already been started and completed, yet only the last of those results is returned to the consumer (which iterates using ForEachAsync).

The producer:

    public static IAsyncEnumerable<(MemoryStream, string)> Stream(this IEnumerable<string> blobNames,
        IBlobManager blobManager,
        CloudBlobContainer container, int maxDegreeOfParallelism = 5)
    {
        return new AsyncEnumerable<(MemoryStream, string)>(async yield =>
        {
            var cancellationToken1 = yield.CancellationToken;

            await blobNames.ParallelForEachAsync(async blobName =>
            {
                Console.WriteLine($"Trying to download blob: {blobName}");

                //TODO: Try-catch, what happens if one fails?
                var memoryStream = await blobManager
                    .DownloadBlobAsStreamAsync(container, blobName, cancellationToken1)
                    .ConfigureAwait(false);

                // Return immediately instead of waiting for all the blobs to complete
                await yield.ReturnAsync((memoryStream, blobName)).ConfigureAwait(false);
            }, maxDegreeOfParallelism, cancellationToken1).ConfigureAwait(false);
        });
    }

The consumer:

        var blobNames = MyFactory.BuildBlobNames(from, to);

        var asyncEnumerable = blobNames.Stream(BlobManager, Container, 4);

        // Assert
        var concurrentList = new ConcurrentBag<string>();
        await asyncEnumerable.ForEachAsync(async tuple =>
        {
            using (var ms = tuple.Item1)
            {
                var decoded = Encoding.UTF8.GetString(ms.ToArray());
                //TODO: Convert to text to assert the content
                concurrentList.Add(decoded);
                Console.WriteLine($"Blob: {tuple.Item2}. Content: {decoded}");
            }
        }, cancellationToken).ConfigureAwait(false);

What did we do wrong?

@felipecruz91

I have exactly the same issue.

@kind-serge
Member

Good try, but this is simply an unsupported scenario with concurrent producers.
Imagine a synchronous version of the producer side:

    IEnumerable<...> Stream(...)
    {
        Parallel.ForEach(... =>
        {
            yield return ...; // won't compile
        });

        yield return ...; // will compile
    }

The yield keyword cannot be used inside the lambda passed to Parallel.ForEach (the compiler rejects it with error CS1621), because the lambda is a separate method and therefore lies outside the scope of the synchronous iterator method.
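As a minimal sketch of that rule, here is a compilable variant of the synchronous analogy. The class and method names (SyncAnalogy, Squares) are hypothetical, and squaring integers stands in for real work. The parallel workers hand their results to a thread-safe ConcurrentQueue, and only the iterator method itself uses yield return. Note that this buffers every result until Parallel.ForEach finishes, so it illustrates where yield is legal, not how to stream results as they complete.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class SyncAnalogy
{
    // Synchronous iterator: 'yield return' is legal in this method body.
    public static IEnumerable<int> Squares(IEnumerable<int> source)
    {
        var results = new ConcurrentQueue<int>();

        // The lambda below is a separate method, so 'yield return' inside it
        // would be rejected by the compiler (CS1621). The concurrent workers
        // hand their results to a thread-safe collection instead.
        Parallel.ForEach(source, x => results.Enqueue(x * x));

        // Only the iterator method itself yields, one item at a time,
        // after the parallel loop has finished. Order is not guaranteed.
        foreach (var square in results)
            yield return square;
    }
}
```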

Since this library only mimics a C# language feature, it cannot impose a similar restriction on the variable yield (in the async version it is an ordinary lambda parameter, not a keyword).

    IAsyncEnumerable<...> Stream(...)
    {
        return new AsyncEnumerable<...>(async yield => // 'yield' is a variable
        {
            await ....ParallelForEachAsync(async ... =>
            {
                // 'yield' is captured by the closure, which is why you can use it here, but you must not
                await yield.ReturnAsync(...);
            }, ...);
        });
    }

If you try the same thing with C# 8.0 async streams, it won't work either, because yield return is not allowed inside a lambda there. So this is something you have to be aware of.

A concurrent producer/consumer pattern with limiters is a somewhat more complex problem, and this library does not offer it as an extension method. I posted a possible solution on StackOverflow.
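For readers on .NET Core 3.0 or later, the following is one possible shape of such a limiter. It is a sketch built on the BCL's System.Threading.Channels and SemaphoreSlim, not on this library, and it is not the StackOverflow answer mentioned above. The names ConcurrentStreaming, StreamConcurrently and download are hypothetical, and the returned type is the BCL's System.Collections.Generic.IAsyncEnumerable<T>, not this library's IAsyncEnumerable. Any number of workers write into a thread-safe channel, at most maxDegreeOfParallelism of them run at once, and the consumer reads each result as soon as it is written.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ConcurrentStreaming
{
    // Hypothetical helper: runs 'download' for every item with at most
    // 'maxDegreeOfParallelism' calls in flight and streams each result to the
    // consumer as soon as it completes (completion order, not input order).
    public static IAsyncEnumerable<TResult> StreamConcurrently<TSource, TResult>(
        this IEnumerable<TSource> source,
        Func<TSource, CancellationToken, Task<TResult>> download,
        int maxDegreeOfParallelism,
        CancellationToken cancellationToken = default)
    {
        if (maxDegreeOfParallelism < 1)
            throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism));

        // Unbounded for brevity; the writer is safe to call from many tasks at once.
        var channel = Channel.CreateUnbounded<TResult>();

        async Task ProduceAsync()
        {
            // The limiter: at most maxDegreeOfParallelism downloads run concurrently.
            var limiter = new SemaphoreSlim(maxDegreeOfParallelism);
            try
            {
                // One task per item; each waits for a free slot before downloading.
                var workers = source.Select(async item =>
                {
                    await limiter.WaitAsync(cancellationToken).ConfigureAwait(false);
                    try
                    {
                        var result = await download(item, cancellationToken).ConfigureAwait(false);
                        await channel.Writer.WriteAsync(result, cancellationToken).ConfigureAwait(false);
                    }
                    finally
                    {
                        limiter.Release(); // free the slot for the next waiting item
                    }
                }).ToList();

                await Task.WhenAll(workers).ConfigureAwait(false);
                channel.Writer.Complete();   // normal end of the stream
            }
            catch (Exception ex)
            {
                channel.Writer.Complete(ex); // the consumer's loop observes the failure
            }
        }

        _ = ProduceAsync(); // the producer keeps running in the background
        return channel.Reader.ReadAllAsync(cancellationToken);
    }
}
```

The consumer would then iterate with await foreach. Two design caveats of this sketch: the unbounded channel provides no back-pressure (Channel.CreateBounded could add it), and abandoning the loop early does not stop downloads that are already in flight.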

One thing I can recommend, if it works for you, is to swap the methods in your case: the producer uses ForEachAsync and the consumer uses ParallelForEachAsync.

@kind-serge
Member

P.S. This is a duplicate of #44.

@galmok
Author

galmok commented Jun 26, 2019 via email

@kind-serge
Member

Using SemaphoreSlim would definitely solve the problem; however, it would hurt the performance of enumeration in the regular, non-concurrent case, because every iteration would have to consult a synchronization primitive. How would you solve that problem?
