DataflowLinkOptions.MaxMessages is ignored for a link to encapsulated block #35751

Open
azyobuzin opened this issue May 2, 2020 · 2 comments

@azyobuzin

Area
System.Threading.Tasks.Dataflow
Version
.NET Core SDK 5.0.100-preview.3.20216.6

Summary

DataflowLinkOptions.MaxMessages is ignored when I link a source block to a propagator block created by the DataflowBlock.Encapsulate method. I could not find this behavior described in the documentation.

Repro

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program
{
    static void Main(string[] args)
    {
        var (propagator, actualTargetBlock) = CreatePropagator();

        var outputBlock = new ActionBlock<int>(
            async i =>
            {
                Console.WriteLine(i);
                await Task.Delay(500); // Delay to postpone offer
            },
            new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 });

        propagator.LinkTo(outputBlock);

        // sourceBlock emits 1, 2, 3, 4, 5
        var sourceBlock = new BufferBlock<int>();
        for (var i = 1; i <= 5; i++) sourceBlock.Post(i);

        sourceBlock.LinkTo(propagator, new DataflowLinkOptions() { MaxMessages = 2 });

        // Linking directly to the inner target block works as expected:
        // sourceBlock.LinkTo(actualTargetBlock, new DataflowLinkOptions() { MaxMessages = 2 });

        Thread.Sleep(5000);
    }

    static (IPropagatorBlock<int, int>, ITargetBlock<int>) CreatePropagator()
    {
        // Set BoundedCapacity to 1 so that offers are postponed
        var bufferBlock = new BufferBlock<int>(new DataflowBlockOptions() { BoundedCapacity = 1 });
        var doNothingTransformBlock = new TransformBlock<int, int>(x => x, new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 });
        bufferBlock.LinkTo(doNothingTransformBlock, new DataflowLinkOptions() { PropagateCompletion = true });
        var propagator = DataflowBlock.Encapsulate(bufferBlock, doNothingTransformBlock);
        return (propagator, bufferBlock);
    }
}

Expected output:

1
2

Actual output:

1
2
3
4
5
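
For comparison, here is a minimal sketch of how MaxMessages behaves on a link to a plain, unbounded target that accepts every offer immediately and never postpones. It is not part of the original repro: the class name MaxMessagesBaseline and the RunAsync helper are hypothetical names chosen for illustration, and the five-second timeout is an arbitrary safety margin.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper class, introduced only for this illustration.
static class MaxMessagesBaseline
{
    // Returns the items the target received and the number of items left in the source.
    public static async Task<(List<int> Received, int LeftInSource)> RunAsync()
    {
        var received = new List<int>();

        // Unbounded target: it accepts each offered message right away.
        var target = new ActionBlock<int>(i => received.Add(i));

        var source = new BufferBlock<int>();
        for (var i = 1; i <= 5; i++) source.Post(i);

        source.LinkTo(target, new DataflowLinkOptions { MaxMessages = 2 });

        // Wait until two messages have crossed the link (arbitrary 5 s timeout).
        SpinWait.SpinUntil(() => source.Count == 3, TimeSpan.FromSeconds(5));

        // Let the target finish the messages it already accepted.
        target.Complete();
        await target.Completion;

        return (received, source.Count);
    }

    static async Task Main()
    {
        var (received, left) = await RunAsync();
        Console.WriteLine(string.Join(", ", received)); // prints "1, 2"
        Console.WriteLine(left);                        // prints "3"
    }
}
```

Once two messages have been consumed across the link, the link removes itself and the remaining three items stay in the source. This is the behavior the repro expects from the encapsulated propagator as well.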
Dotnet-GitSync-Bot added the area-System.Threading.Tasks and untriaged labels May 2, 2020
@ghost

ghost commented May 2, 2020

Tagging subscribers to this area: @tarekgh
Notify danmosemsft if you want to be subscribed.

stephentoub added the bug label and removed the untriaged label May 2, 2020
@stephentoub
Member

Thanks for reporting.

I took a look at the repro, and the problem is specific to the case where the encapsulated target of the block postpones messages (this is happening in the repro because the targets are bounded, and there's enough back pressure to cause the target buffer block to postpone rather than immediately accept). When the target block then has space free up and it goes back to the original source to ask for the postponed message, the accounting logic sees it as coming from the encapsulatee rather than the encapsulator and doesn't apply the intended max messages limit.
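
The postponement mentioned above can be observed in isolation. The following is a minimal sketch, assuming a single bounded BufferBlock and using SendAsync as a stand-in for a linked source. It does not reproduce the bug itself; it only shows a full target postponing an offer and later going back to consume it once space frees up. The class name PostponementSketch and the RunAsync helper are hypothetical.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper class, introduced only for this illustration.
static class PostponementSketch
{
    public static async Task<(bool PostedFirst, bool PostedSecond, bool PendingWhileFull, bool SentAfterSpace, int Next)> RunAsync()
    {
        // Same bounding as the inner buffer block in the repro.
        var bounded = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 1 });

        bool postedFirst = bounded.Post(1);  // true: the block has room
        bool postedSecond = bounded.Post(2); // false: Post cannot be postponed, so the full block declines it

        // SendAsync supports postponement: the full block postpones the offer,
        // so the returned task stays incomplete while the block is full.
        Task<bool> send = bounded.SendAsync(3);
        bool pendingWhileFull = !send.IsCompleted;

        // Free the single slot; the block then consumes the postponed message.
        bounded.Receive();
        bool sentAfterSpace = await send;

        // The postponed message is now the buffered item.
        int next = bounded.Receive();

        return (postedFirst, postedSecond, pendingWhileFull, sentAfterSpace, next);
    }

    static async Task Main()
    {
        Console.WriteLine(await RunAsync()); // prints "(True, False, True, True, 3)"
    }
}
```

In the repro, that later consume step is where the accounting goes wrong: the request comes from the encapsulated target rather than from the wrapper returned by Encapsulate, so the MaxMessages limit on the link is not applied.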

tarekgh added this to the 5.0 milestone May 2, 2020
stephentoub modified the milestones: 5.0, Future Jun 7, 2020
joperezr moved this from Needs triage to Future in Triage POD for Meta, Reflection, etc Nov 24, 2021