
Introduce filter_predicate to tame event bus traffic. #34

Closed

Conversation

cburgdorf
Contributor

@cburgdorf cburgdorf commented Mar 8, 2019

What was wrong?

Obviously, the Endpoint provides APIs to subscribe to specific events (subscribe, stream, wait_for, request). However, these APIs only filter events within our current Endpoint, so that specific callsites receive specific events.

But that doesn't mean these are the only events that are pushed into our Endpoint. Which events are pushed into which endpoints is controlled by the Endpoint that broadcasts the event (assuming we are listening to that Endpoint at all).

The logic for that is rather simple so far:

  1. If an event is only intended for a specific Endpoint, only push it into that Endpoint

  2. Otherwise, push it into every listening Endpoint.
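The two rules above can be sketched in a few lines. Note that `BroadcastConfig` mirrors the lahja concept, but `ProducerSketch` and its attributes are illustrative stand-ins, not the actual implementation:

```python
from typing import Any, Dict, List, Optional


class BroadcastConfig:
    """Hypothetical stand-in for lahja's BroadcastConfig."""
    def __init__(self, filter_endpoint: Optional[str] = None) -> None:
        self.filter_endpoint = filter_endpoint


class ProducerSketch:
    """Toy producer illustrating the two dispatch rules above."""
    def __init__(self) -> None:
        # listener name -> queue of events pushed into that listener
        self.listeners: Dict[str, List[Any]] = {}

    def add_listener(self, name: str) -> None:
        self.listeners[name] = []

    def broadcast(self, event: Any, config: Optional[BroadcastConfig] = None) -> None:
        if config is not None and config.filter_endpoint is not None:
            # Rule 1: intended for one specific Endpoint only
            self.listeners[config.filter_endpoint].append(event)
        else:
            # Rule 2: push into every listening Endpoint
            for queue in self.listeners.values():
                queue.append(event)


producer = ProducerSketch()
producer.add_listener('a')
producer.add_listener('b')
producer.broadcast('global')                          # reaches 'a' and 'b'
producer.broadcast('targeted', BroadcastConfig('a'))  # reaches only 'a'
```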

The producer of the events that are broadcast to all listening endpoints does not and should not care about who needs which events. However, this can become a bottleneck.

Let's use Trinity as an example. Once we start broadcasting all peer messages from the PeerPool across the listening endpoints, pushing every new block, receipt, transaction etc. into every listening Endpoint is very wasteful.

We may be tempted to say, "well, if a process isn't interested in these events, why listen to the PeerPool events at all?" Because we may be interested in some of the events. E.g. the EthStats plugin periodically wants to know the current peer count, but it does not need all the constant chatter of connected peers.

To put some numbers on this, imagine sending events to 10 processes:

                                                                    +++Globals+++                                                                     
|Consumer processes |   Total time   | Total aggregated time | Propagated events  |Received events | Propagated EPS |  Received EPS  |
|        10         |    1.49226     |        7.68962        |        100         |      1000      |     67.013     |    670.126     |
                                                                +++Process Details+++                                                                 
|      Process      |Processed Events|   First sent   | Last received  |    Fastest     |    Slowest     |      AVG       | Total duration | Total aggregated time |
|    consumer_0     |      100       |  12:32:36.439  |  12:32:37.806  |    0.00115     |    0.00282     |    0.00151     |    1.36743     |        0.15127        |
|    consumer_1     |      100       |  12:32:36.439  |  12:32:37.808  |    0.00225     |    0.00521     |    0.00286     |    1.36916     |        0.28572        |
|    consumer_2     |      100       |  12:32:36.439  |  12:32:37.810  |    0.00362     |    0.00745     |    0.00424     |    1.37069     |        0.42381        |
|    consumer_3     |      100       |  12:32:36.439  |  12:32:37.812  |    0.00479     |    0.00968     |    0.00557     |    1.37288     |        0.55673        |
|    consumer_4     |      100       |  12:32:36.439  |  12:32:37.814  |    0.00586     |    0.01194     |    0.00690     |    1.37482     |        0.69032        |
|    consumer_5     |      100       |  12:32:36.439  |  12:32:37.817  |    0.00709     |    0.01427     |    0.00837     |    1.37840     |        0.83749        |
|    consumer_7     |      100       |  12:32:36.439  |  12:32:37.821  |    0.00923     |    0.01922     |    0.01116     |    1.38228     |        1.11622        |
|    consumer_6     |      100       |  12:32:36.439  |  12:32:37.819  |    0.00814     |    0.01677     |    0.00977     |    1.37999     |        0.97717        |
|    consumer_8     |      100       |  12:32:36.439  |  12:32:37.823  |    0.01021     |    0.02218     |    0.01255     |    1.38440     |        1.25522        |
|    consumer_9     |      100       |  12:32:36.439  |  12:32:37.825  |    0.01154     |    0.02634     |    0.01396     |    1.38599     |        1.39567        |

When actually only one of them needs it:

$ python scripts/perf_benchmark.py --num-processes 1 --num-events 100 --payload-bytes 1000000
                                                                    +++Globals+++                                                                     
|Consumer processes |   Total time   | Total aggregated time | Propagated events  |Received events | Propagated EPS |  Received EPS  |
|         1         |    0.28560     |        0.23907        |        100         |      100       |    350.145     |    350.145     |
                                                                +++Process Details+++                                                                 
|      Process      |Processed Events|   First sent   | Last received  |    Fastest     |    Slowest     |      AVG       | Total duration | Total aggregated time |
|    consumer_0     |      100       |  12:33:42.726  |  12:33:42.944  |    0.00124     |    0.00631     |    0.00239     |    0.21890     |        0.23907        |

How was it fixed?

With this PR, when we start listening to another Endpoint, we can provide a filter_predicate. It's a bit like subscribe or stream, but on a macro level. It lets us shield our Endpoint from any events that we do not want to reach it at all, because we know that no callsite is interested in them.

The filter_predicate is a Callable[[BaseEvent], bool] that is expected to return True to let an event through, or False to shield from it.

The exact rules are as follows:

  1. No filter_predicate -> not shielding from anything
  2. Events specifically sent to us via a custom BroadcastConfig -> not shielded
  3. Events broadcast across all listening Endpoints -> only sent to us if filter_predicate(event) returns True.
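The three rules can be condensed into a single decision function. The name `should_push` and its parameters are hypothetical, chosen for illustration; the real check lives inside the broadcasting Endpoint:

```python
from typing import Any, Callable, Optional


def should_push(event: Any,
                targeted_at: Optional[str],
                listener_name: str,
                filter_predicate: Optional[Callable[[Any], bool]]) -> bool:
    # Rule 2: events explicitly sent to us via a BroadcastConfig
    # are never shielded.
    if targeted_at is not None:
        return targeted_at == listener_name
    # Rule 1: no filter_predicate -> not shielding from anything.
    if filter_predicate is None:
        return True
    # Rule 3: broadcasts reach us only if the predicate returns True.
    return filter_predicate(event)
```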

Cute Animal Picture

put a cute animal picture link inside the parentheses

@cburgdorf cburgdorf force-pushed the christoph/perf/filter-predicate branch from fe6bf88 to d61c088 Compare March 8, 2019 14:31
Member

@pipermerriam pipermerriam left a comment


I suspect you're still working on this but just in case it isn't on the TODO list, a code example in the docs showing how to use this new API would be valuable.


if config is None or config.filter_endpoint is None:
    # Check if a `filter_predicate` applies
    if listening_endpoint.config.filter_predicate is None:
Member


Up-front I want to say I'm not sure this suggestion is actually better, but I want to toss it out and see if it sticks.

This value checking seems... unfortunate? There are lots of logic branches that might be reducible if the None version of at least one of these could be captured via the same API as when it is present. I.e., maybe there should always be a filter_predicate that unconditionally returns True.

Similarly with the config, could there be a default empty config which gets attached by default which exhibits the same behavior as if it was left out?

This article is topical here: https://blog.conjur.org/special-cases-are-a-code-smell/
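The suggestion above is essentially the null-object pattern. A minimal sketch, with hypothetical class and function names (not lahja's actual API):

```python
from typing import Any, Callable


def accept_all(event: Any) -> bool:
    # Null-object predicate: always present, but behaves like "no filter".
    return True


class ListenerConfigSketch:
    """Hypothetical config that always carries a predicate, so the
    dispatch code never has to branch on None."""
    def __init__(self,
                 filter_predicate: Callable[[Any], bool] = accept_all) -> None:
        self.filter_predicate = filter_predicate


def should_push(event: Any, config: ListenerConfigSketch) -> bool:
    # A single unconditional path replaces the nested None checks.
    return config.filter_predicate(event)
```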

Contributor Author


I agree. In fact, a lot of this is based on a misconception I had around Python's default class attribute handling that scared me. I thought that the following was a footgun in Python:

class Foo:
    y = ''

Similar to

class Foo:
    y = []

But while the latter is indeed a footgun, the former isn't, because strings are immutable in Python. Based on this wrong assumption I used a lot of Optional[str] which could have been just str preassigned to ''.
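A quick demonstration of why only the mutable case is a footgun:

```python
class WithList:
    y = []   # the footgun: one list shared by every instance


class WithStr:
    y = ''   # safe: strings are immutable, so no shared state can leak


a, b = WithList(), WithList()
a.y.append(1)        # mutates the single class-level list
# b.y is now [1] as well

c, d = WithStr(), WithStr()
c.y += 'x'           # rebinds c.y as a fresh instance attribute
# d.y is still ''
```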

Will clean it up.

@cburgdorf cburgdorf force-pushed the christoph/perf/filter-predicate branch from d61c088 to 7e3c90d Compare March 11, 2019 16:14
@cburgdorf
Contributor Author

@pipermerriam This is now ready for review. Notice that I also renamed all the connect_to_endpoints* APIs to add_listener_endpoints* because I think that does a better job of describing what these APIs do and also rhymes well with ListenerConfig.

I find connect_to_endpoints* a bit vague as it doesn't make it clear what kind of connection we are talking about. These APIs really add an Endpoint as a listener of another Endpoint. I also added an example that demonstrates how to use the filter_predicate.

Trinity PR that uses this will follow shortly.

@lithp
Contributor

lithp commented Mar 16, 2019

I spent some time skimming over this and I wonder if there's a different architecture which might work better.

It took me a while to understand what was happening: add_listener_endpoints accepts a ListenerConfig. This makes it sound like this Endpoint will listen for events which come from those endpoints and somehow tell them to filter, but really this endpoint will push messages to the ListenerEndpoints and do the filtering on its own side.

I think this design has a few drawbacks:

  • it's not entirely intuitive. If I hear that lahja allows filter_predicate I'll assume an API which lets me connect to an Endpoint and only receive selected messages from it.
  • lahja is now more tightly tied to trinity; anyone who wants to use this feature will have to either know ahead of time which events every listener will want, or implement their own versions of AvailableEndpointsUpdated and EventBusConnected.
  • because the filtering is set up at connect time, it remains static. This ties trinity's hands: it has to load all the plugins and ask for their filters before setting up the Endpoint. There's no way for it to create the endpoint first and then change the filter_predicate as it loads plugins.
  • there's also no way to implement logic such as "Now I care about PeerCandidates" or "My PeerPool is full, I no longer care about PeerCandidates".

Since this is a performance optimization, maybe the interface doesn't have to change at all!

The Endpoint knows exactly which event_types it's expecting: the intersection of the keys of self._queues and self._handlers. What if Endpoints kept each other informed about changes to the set of event_types they're expecting, and only pushed events which matched the expected set of the listener? Keeping each other informed could be accomplished either by sending special messages or by using methods/attributes of the proxy objects. I think this would take about an equal amount of code and have about the same performance, but it would keep the current user interface and also support dynamically reconfiguring the filter_predicate!

@cburgdorf
Contributor Author

cburgdorf commented Mar 18, 2019

Thank you for the review Brian. Let me dig in!

If I hear that lahja allows filter_predicate I'll assume an API which lets me connect to an Endpoint and only receive selected messages from it.

Wait, but this is what it does, no?

producer.add_listener_endpoints(
    ListenerConfig.from_name(
        'receiver',
        filter_predicate=lambda ev: not isinstance(ev, BadEvent),
    )
)

What happens here is that the receiver Endpoint starts to listen for events from producer, except that all events are filtered by the filter_predicate, and this filtering happens on the producer side.

anyone who wants to use this feature will have to either know ahead of time which events every listener will want, or implement their own versions of

In Trinity, every (BaseIsolated)Plugin is listening for events from all other endpoints. The listener is the one who gets to decide on its own filter_predicate, and it can do so by overriding should_receive_broadcast.

Here's an example that I'm currently using to work on the sync.

https://github.com/cburgdorf/trinity/blob/c84f49fa6ca1c04e13af0f211e2194de9594fcd9/trinity/plugins/builtin/tmp_sync/plugin.py#L62-L70

    @staticmethod
    def should_receive_broadcast(event: BaseEvent) -> bool:
        return any((
            isinstance(event, PeerJoinedEvent),
            isinstance(event, PeerLeftEvent),
            isinstance(event, PeerPoolMessageEvent) and isinstance(event.cmd, NewBlock),
            BaseIsolatedPlugin.should_receive_broadcast(event),
        ))

It is true that the listener needs to come up with the filter_predicate at design time and cannot dynamically decide which events it wants to receive. I will add, though, that allowing the filter_predicate to be updated at a later time would be a relatively quick and easy addition to this PR.

There's also no way to implement logic such as: "Now I care about PeerCandidates." or "My PeerPool is full, I no longer care about PeerCandidates"

Yes, that is true but I would argue that these things can happen on a micro level using the subscribe, stream and wait_for APIs. Of course that doesn't shut down the actual traffic between the endpoints but I'm not sure if that's really needed on that level of granularity.

What if Endpoints kept each other informed about changes to the set of event_types they're expecting and only pushed events which matched the expected set of the listener? Keeping each other informed could be accomplished either by sending special messages or by using methods/attributes of the proxy objects.

While I like the idea in general I'm hesitant because I think it is much harder and might be a case of YAGNI. Here are some thoughts:

  1. Lahja doesn't dictate the topology of the mesh. Let's assume we have endpoints A, B and C, and A wants to receive from B and C (in other words, it did B.add_listener_endpoints(a) and C.add_listener_endpoints(a)). B and C, however, aren't interested in updates from A at all. With that topology, there is no implicit communication channel for A to send subscription updates to B and C other than the initial call to add_listener_endpoints(...).

I'm not saying this is something that can't be solved but given the fact that one is free how to setup the topology, automatically broadcasting event subscriptions between the right endpoints may not be as simple as it looks on first sight.

  2. The filter_predicate allows any kind of custom logic for the filtering. For instance, let's assume the peer pool sends PeerPoolMessageEvent types that have a cmd property that can be something like NewBlock, Transactions etc. This mimics what the PeerPool in Trinity currently does.

The filter_predicate allows us to drill into the cmd property and filter things based on it. So, merely deriving the filtering from the usage of something like async for ev in eventbus.stream(PeerPoolMessageEvent) may not be enough.

Not sure how we'd solve that other than redesigning the events so that every command has its own event type. But even then, the filter_predicate allows you to do more flexible things (e.g. only receive events with more than 100 transactions).
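For illustration, such a cmd-aware predicate might look like this (the classes are minimal stand-ins for the Trinity types mentioned above, not their real definitions):

```python
from typing import Any, List


class Transactions:
    def __init__(self, txns: List[Any]) -> None:
        self.txns = txns


class PeerPoolMessageEvent:
    def __init__(self, cmd: Any) -> None:
        self.cmd = cmd


def filter_predicate(event: Any) -> bool:
    # Drill into the cmd payload: only let through Transactions
    # commands carrying more than 100 transactions.
    return (
        isinstance(event, PeerPoolMessageEvent)
        and isinstance(event.cmd, Transactions)
        and len(event.cmd.txns) > 100
    )
```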

So, basically what I'm saying is, maybe the filter_predicate is simple enough to get us the performance optimization that we need for now?

@pipermerriam
Member

So dropping this in favor of #55 right?

@cburgdorf
Contributor Author

Yes, will close this.

@cburgdorf cburgdorf closed this May 14, 2019