Only send subscribed messages #36

Closed

Conversation

Contributor

@lithp lithp commented Mar 19, 2019

@cburgdorf Here's a prototype of a different approach to #34. This PR also only sends messages which the remote endpoint is listening for; it doesn't require any interface changes or cognitive overhead for the user, and as a bonus it dynamically changes the set of messages being listened for.

It switches on the type of the event, so it doesn't support subscribing to only a peer's NewTransaction messages, for instance, but that could be added:

  • filter_predicate parameters could be added to subscribe() and stream() and the callables sent to the remote ConnectionFilters.
  • the PeerPoolMessageEvent events could be broken up into multiple types, that would give us the best performance.

The code needs to be cleaned up a lot, so this is still a WIP, but what do you think about this approach?
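To make the core check concrete, here's a minimal sketch of the sender-side filtering this prototype does; RemoteEndpoint, subscribed_events, and send here are illustrative stand-ins, not the exact names in the diff:

from typing import Dict, Set, Type


class RemoteEndpoint:
    """Stand-in for a connected endpoint plus the event types it reported."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.subscribed_events: Set[Type] = set()

    def send(self, event: object) -> None:
        ...  # actually put the event on the remote's receiving queue


def broadcast(remotes: Dict[str, RemoteEndpoint], event: object) -> None:
    for remote in remotes.values():
        # Only forward the event to endpoints which are listening for its type;
        # everything else is dropped before it crosses the process boundary.
        if type(event) in remote.subscribed_events:
            remote.send(event)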

Contributor

@cburgdorf cburgdorf left a comment


Woah! This looks cool! Thanks for jumping on this!

filter_predicate parameters could be added to subscribe() and stream() and the callables sent to the remote ConnectionFilters.

How would that work if different code paths use different filter_predicate configs?

foo.stream(Bar, filter_predicate=lambda ev: ev.x == 0)
foo.stream(Bar, filter_predicate=lambda ev: ev.x == 1)

I guess we would need to collect all filter_predicate callables, run them all, and if any of them would let the event through, let it through. Does that make sense?
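For what it's worth, a minimal sketch of that OR-combination (the names here are made up for illustration, this isn't existing lahja API):

from typing import Callable, List


class Bar:
    def __init__(self, x: int) -> None:
        self.x = x


# Hypothetical: every filter_predicate registered locally for Bar.
predicates: List[Callable[[Bar], bool]] = [
    lambda ev: ev.x == 0,
    lambda ev: ev.x == 1,
]


def should_forward(ev: Bar) -> bool:
    # Forward the event to the remote if *any* registered predicate accepts it;
    # each individual stream() would still apply its own predicate on receipt.
    return any(pred(ev) for pred in predicates)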

the PeerPoolMessageEvent events could be broken up into multiple types, that would give us the best performance.

I agree in general, but at the same time there's some benefit to just staying compatible with the current PeerPool message format (if only to reduce scope creep).

I think that if the filter_predicate feature could be built into the stream / subscribe APIs and work with this approach, that would be the best of both worlds and a very useful feature in general.

I'm 👍 to go down that route.

def put_nowait(self, item_and_config: Tuple[BaseEvent, Optional[BroadcastConfig]]) -> None:
    item, config = item_and_config
    is_response = config is not None and config.filter_event_id
    is_request = hasattr(item, '_id')
Contributor


How do you feel about simplifying this logic and just saying that every event which is exclusively directed toward that endpoint (filter_endpoint matches the name of the endpoint) is considered allowed?

In practice that will include requests and responses because those should be directed towards a single recipient.
As I understand the current logic, this will just let any request and response go through even if it might be missing an exclusive recipient via filter_endpoint.

The current logic relies on this hasattr(item, '_id') which feels like checking for a fragile implementation detail.
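Roughly the check I have in mind, as a sketch (the parameter names are illustrative, not the real attributes):

from typing import Optional, Set, Type


def allowed(item: object,
            config: Optional[object],
            endpoint_name: str,
            subscribed: Set[Type]) -> bool:
    # Anything explicitly addressed to this endpoint passes; requests and
    # responses fall into this bucket because they name a single recipient
    # via filter_endpoint. Everything else must match a local subscription.
    if config is not None and getattr(config, 'filter_endpoint', None) == endpoint_name:
        return True
    return type(item) in subscribed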

Contributor Author

@lithp lithp Mar 20, 2019


Yeah, I agree that using _id isn't ideal! In my defense, BaseEvent also assumes it :) Some of the tests do too; if _id were to disappear it would be caught pretty quickly.

lahja/lahja/misc.py

Lines 42 to 58 in e7928a8

class BaseEvent:

    _origin = ''
    _id: Optional[str] = None
    _config: Optional[BroadcastConfig] = None

    def broadcast_config(self, internal: bool = False) -> BroadcastConfig:
        if internal:
            return BroadcastConfig(
                internal=True,
                filter_event_id=self._id
            )

        return BroadcastConfig(
            filter_endpoint=self._origin,
            filter_event_id=self._id
        )

It looks like filter_endpoint doesn't work though, nothing enforces that it will be set. For example, it's not set in any of lahja's tests! I think I have a solution though, updates incoming.

Contributor

@cburgdorf cburgdorf Mar 20, 2019


It's used in this test.

response = await endpoint3.request(item, BroadcastConfig(filter_endpoint=endpoint1.name))

I think I have a solution though, updates incoming.

Ok, curious what you come up with!

Contributor Author

@lithp lithp Mar 20, 2019


It's used in this test.

Heh, oops, s/any/many/, good catch!

Ok, curious what you come up with!

I spent some time thinking about this yesterday, and I can't find any obviously right solution.

is_request really shouldn't need to be there. If there's nothing on the other side listening for this request then there's no point in sending the request! A future PR could even notice that filter_endpoint was used and throw an exception rather than block the calling coro forever while it waits for a response which will never come.

However, removing is_request causes the tests to fail, and this is because of an annoying race condition:

async def test_request(endpoint: Endpoint) -> None:
    endpoint.subscribe(
        DummyRequestPair,
        lambda ev: endpoint.broadcast(
            # Accessing `ev.property_of_dummy_request_pair` here allows us to validate
            # mypy has the type information we think it has. We run mypy on the tests.
            DummyResponse(ev.property_of_dummy_request_pair), ev.broadcast_config()
        )
    )
    item = DummyRequestPair()
    response = await endpoint.request(item)

Here endpoint.request() blocks forever because this thread moves too quickly. There's no time for notify_all() to tell the endpoint about the new subscription or for the endpoint to react. This can be fixed by adding an await asyncio.sleep(0.05), but that'd have to be added to many of the tests.

I don't think this will be a problem in the real world. I don't think we have anything which subscribes and expects to immediately receive messages, where a dropped initial message would cause problems. Since everything is in different processes trinity doesn't have the same idea of simultaneity that these lahja tests have.

There are some options for dealing with this:

  1. Just add asyncio.sleep() to the tests. There's good precedent here, we often call asyncio.sleep(0) to allow other coroutines a chance to run.
  2. Add a helper, await_subscriptions_updated(endpoint, event_type), which blocks until the endpoint notices the subscription (see the sketch after this list).
  3. Somehow make _notify_subscriptions_changed synchronous. I couldn't find an easy way to do this, there's not really a way for Servers to send messages back to their clients.
  4. Somehow make an asyncio version of BaseManager such that while everything is in the same process we don't need to use any extra threads, and asyncio.sleep(0) is enough to register a subscription.
  5. Give our Endpoints the ability to inspect the BaseManager and notice that the remote Endpoint is running in the same thread as we are. When that's true notify_subscriptions_changed does something different which is synchronous, so we never have a race condition.
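A rough sketch of what (2) could look like; await_subscriptions_updated is a hypothetical test helper, and remote.subscribed_events() is assumed to return the set of event types the remote currently listens for:

import asyncio
from typing import Type


async def await_subscriptions_updated(remote, event_type: Type,
                                      timeout: float = 1.0) -> None:
    # Poll until the remote endpoint reports that it is listening for
    # event_type, so tests don't race the notify_all() notification.
    async def _wait() -> None:
        while event_type not in remote.subscribed_events():
            await asyncio.sleep(0.01)

    await asyncio.wait_for(_wait(), timeout=timeout)

test_request would then call it between subscribe() and request() instead of sleeping for a fixed amount of time.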

I don't like any of these, though (2) might be my favorite. Do you have any thoughts?

Contributor


If there's nothing on the other side listening for this request then there's no point in sending the request!

Correct!

I don't think this will be a problem in the real world. I don't think we have anything which subscribes and expects to immediately receive messages, where a dropped initial message would cause problems. Since everything is in different processes trinity doesn't have the same idea of simultaneity that these lahja tests have.

Yes, I agree, there shouldn't be a guarantee about which message will be the initial one received after subscribing.

Solution 2. would be my favorite as well but I'm easy to please so I would even be ok with 1. 😄

self._queues[event_type].remove(queue)
self._notify_subscriptions_changed()
try:
    for _ in iterations:
Contributor


Instead of the itertools.repeat(True, num_events), wouldn't something like for i in range(num_events) feel more natural?

Contributor Author


Yeah, I'm pretty frustrated by this code! But range(None) and even itertools.repeat(True, None) both throw exceptions, rather than iterating forever. And I could pull this out into a helper, but I think that whatever name I could pick would be more confusing than just inlining this code.

Contributor Author


Looks like islice does the thing this needs! Just pushed a slightly cleaner commit.
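For reference, the trick is that islice() accepts None as its stop value, which means "no limit", so one expression covers both the bounded and the unbounded case (a sketch, not necessarily the exact code in the commit):

import itertools
from typing import Iterator, Optional


def iterations(num_events: Optional[int]) -> Iterator[int]:
    # islice(count(), n) yields n items; islice(count(), None) iterates forever.
    return itertools.islice(itertools.count(), num_events)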

Member


The code above doing the islice probably deserves a comment. It wasn't clear to me on a quick read that it was also handling the possibility that num_events is None and thus needing to do an infinite iteration in that case.

Contributor Author


Just pushed a version which is more verbose but also easier to read.

Contributor

@cburgdorf cburgdorf left a comment


This looks good (after obvious git cleanup) 👍

Two things that I think would be nice:

  1. The tests seem to focus on just checking that subscribed_events behaves as expected, but there doesn't seem to be a test that truly proves that a non-listening endpoint does not receive the broadcast. Off the top of my head I'm not sure if there's an easy way to test that, but if you can think of something I feel such a test would be valuable (see the sketch after this list).

  2. I'd love to see the proposed filter_predicate config added to stream, subscribe, and wait_for, to have the flexibility not to be forced to break up events into smaller events. While I do think these more focused events are actually a good thing, I think having the flexibility of filter_predicate is a valuable API to have. For my current work on the sync, it would allow the BaseProxyPeerPool to mimic the PeerMessage until some future refactoring where the events in both the actual and the proxy peer pool are refactored. Doesn't have to be in this PR though.
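For (1), one hedged idea, assuming the receiving endpoint exposes something inspectable; the fixtures and the _receive_count attribute below are made up for illustration:

import asyncio

import pytest

from lahja import BaseEvent


class UnwantedEvent(BaseEvent):
    pass


@pytest.mark.asyncio
async def test_unsubscribed_event_is_not_sent(endpoint, remote_endpoint):
    # remote_endpoint never subscribes to UnwantedEvent, so the sender-side
    # filter should drop the broadcast before it ever reaches the remote.
    endpoint.broadcast(UnwantedEvent())
    await asyncio.sleep(0.05)  # give any (incorrect) delivery time to land
    # Hypothetical hook: a counter of events the remote actually received.
    assert remote_endpoint._receive_count == 0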

proxy = remote._connected_endpoints[local_config.name]
assert proxy.proxy.subscribed_events() == {TestEvent}

remote.stop()
Member


This looks like something that should be doable via a context manager or in a try/finally. What happens if we forget to explicitly stop these or the assertions fail and these statements don't get hit?

Contributor Author


Just wrapped it into a context manager.
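For reference, a minimal sketch of such a wrapper (running_endpoint is a hypothetical name, not necessarily what the commit adds):

from contextlib import contextmanager


@contextmanager
def running_endpoint(endpoint):
    # Ensures endpoint.stop() runs even if an assertion in the test body fails.
    try:
        yield endpoint
    finally:
        endpoint.stop()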

endpoint.stop()


class TestEvent(BaseEvent):
Member


Kind of annoying, but if you name this EventForTesting then pytest won't try to collect it as a test class.

Contributor Author


I think we're safe; pytest doesn't seem to be collecting it:

(lahja) brian@rhythm:~/ef/lahja$ pytest --collect-only
========================================================================================================= test session starts =========================================================================================================
platform linux -- Python 3.7.0, pytest-4.0.2, py-1.8.0, pluggy-0.9.0 -- /home/brian/.local/share/virtualenvs/lahja-2m5rLbjb/bin/python3.7
cachedir: .pytest_cache
rootdir: /home/brian/ef/lahja, inifile: pytest.ini
plugins: xdist-1.25.0, forked-1.0.2, asyncio-0.9.0
collected 22 items                                                                                                                                                                                                                    
<Module 'tests/core/test_basics.py'>
  <Function 'test_request'>
  <Function 'test_request_can_get_cancelled'>
  <Function 'test_response_must_match'>
  <Function 'test_stream_with_break'>
  <Function 'test_stream_with_num_events'>
  <Function 'test_stream_can_get_cancelled'>
  <Function 'test_stream_cancels_when_parent_task_is_cancelled'>
  <Function 'test_wait_for'>
  <Function 'test_wait_for_can_get_cancelled'>
<Module 'tests/core/test_broadcast_config.py'>
  <Function 'test_broadcasts_to_all_endpoints'>
  <Function 'test_broadcasts_to_specific_endpoint'>
<Module 'tests/core/test_connect.py'>
  <Function 'test_can_not_connect_conflicting_names_blocking'>
  <Function 'test_can_not_connect_conflicting_names'>
  <Function 'test_rejects_duplicates_when_connecting_blocking'>
  <Function 'test_rejects_duplicates_when_connecting'>
  <Function 'test_rejects_duplicates_when_connecting_nowait'>
<Module 'tests/core/test_filter_pred.py'>
  <Function 'test_subscribed_events'>
  <Function 'test_remote_subscribed_events'>
  <Function 'test_can_wait_for_changes'>
<Module 'tests/core/test_import.py'>
  <Function 'test_import'>
<Module 'tests/core/test_internal.py'>
  <Function 'test_internal_propagation'>
<Module 'tests/core/test_stop.py'>
  <Function 'test_can_stop'>

Contributor Author

lithp commented Apr 3, 2019

A quick update on this. The benchmarking test failed with an exception, and I've also seen it fail a different way when running locally. Here's a gist with both failures: https://gist.github.com/lithp/35f66be963b6cf68c1e3fedaa18b814d

The code runs just fine during normal execution, but multiprocessing's atexit handlers cause exceptions when trying to tear everything down.

A solution might be:

  • Put a try/except block somewhere and swallow exceptions which we know can only be caused by the program quitting
  • The problem happens because we have long-lived condition.wait() calls which are still active when the process exits. A different architecture where endpoints directly told their remotes that their subscriptions had updated wouldn't have this problem. It would be a little more complicated though.

I'm looking at exactly why the failures occur during shutdown, hopefully there's a way to get it to shutdown cleanly without resorting to either of the above solutions.

Contributor Author

lithp commented Apr 4, 2019

Today I tried going with a different approach: it might be easier to reason about Endpoints if they always communicate with each other directly. Instead of using a thread to long-poll, Endpoints connect back to the Endpoints which have connected to them and synchronously send messages to tell each other about changes to the messages they're subscribed to. I'm still in the process of getting it working.

@lithp lithp force-pushed the lithp/event_subscriptions branch from c6a22b8 to 60f4743 on April 4, 2019 21:56
@lithp lithp force-pushed the lithp/event_subscriptions branch from b84c948 to 090cf35 on April 5, 2019 01:07
Contributor Author

lithp commented Apr 5, 2019

Okay, I think this is ready for review again. I fixed all the atexit exceptions, squashed the history, and added a test that messages which are filtered out really aren't being sent.

I found it pretty hard to get this working, and I think a big piece is that multiprocessing doesn't play nicely with asyncio. A bigger change might be to change how we're using multiprocessing or replace it with something like zeromq which has an asyncio binding; I think that would make this code a lot easier to work with.

If you don't mind, I think filter_predicate can be added as part of a different PR, this one seems to be working! And that's a further performance optimization that can be added in the future.

Contributor Author

lithp commented Apr 5, 2019

Well... it looks like perf_benchmark now deadlocks. I'll look into it tomorrow 😢

Contributor

@cburgdorf cburgdorf left a comment


A bigger change might be to change how we're using multiprocessing or replace it with something like zeromq which has an asyncio binding

Yep, it's been on my radar as well. Not only because it doesn't play nicely with asyncio but also because it should be more efficient. We could either use something like zeromq or py-nng as mentioned in #31 or we could implement something with asyncio.Streams if we care for a pure Python solution (see my related SO post).

But this is opening a bigger box, so for now, I'm happy with whatever moves us forward with this issue.

If you don't mind, I think filter_predicate can be added as part of a different PR

It will mean we have to break up the peer pool commands into distinct events but that's alright. It actually leads to a nicer, more focused API anyway 👍

Well... it looks like perf_benchmark now deadlocks. I'll look into it tomorrow 😢

😭

This looks good apart from the CI fails. Fingers crossed the remaining issues won't cause you nightmares.

# Often the multiprocessing server shuts down before this thread does, leading
# to exceptions when we try to call methods on our ConditionProxy.
if not multiprocessing.util.is_exiting():
    raise
Contributor


Clever

Member

pipermerriam commented Apr 5, 2019

or we could implement something with asyncio.Streams

This seems like a good hack-day project for while we're here together in June. I suspect a minimal implementation should be pretty straightforward to do.

Also it looks like we may have the option of doing that in cython and gaining some additional performance.

@lithp lithp force-pushed the lithp/event_subscriptions branch from 8964982 to 04fbba8 on April 6, 2019 03:59
@lithp lithp changed the title from "WIP - only send subscribed messages" to "Only send subscribed messages" on Apr 6, 2019
1. The deadlocks should never occur, processes will time out if some
events are dropped.

2. The driver waits for all the consumers to be ready before pushing
events, to ensure that no messages are dropped.
@lithp lithp force-pushed the lithp/event_subscriptions branch from 98778a4 to abc8e7a on April 6, 2019 05:46
Contributor Author

lithp commented Apr 6, 2019

Okay, all the benchmarking now works every time. I know you've already approved it twice but this was a big enough change to perf_benchmark.py that I don't want to just merge without giving you a chance to review it one last time.

Feel free to merge it yourself if you want to build off this! And if I don't hear back I'll merge it sometime next week.

@cburgdorf
Contributor

Awesome! Looks good! The only thing I want to double check is whether it works in Trinity or if any adjustments would be needed. I don't think so but I think it's worth checking. I'm on mobile now but can check it later today or tomorrow.

@cburgdorf
Contributor

Ok, it looks like it doesn't work as a drop-in replacement in Trinity. It breaks the system where the endpoints connect to each other. There might be some changes in Trinity needed to adjust to this PR but I haven't looked into it yet.

Contributor Author

lithp commented Apr 7, 2019

Ah, that's frustrating, and also a little embarrassing; I should have checked myself. What did it break in trinity?

Contributor

cburgdorf commented Apr 8, 2019

What did it break in trinity?

The endpoints aren't subscribing to each other, which then causes things like discovery not answering the peer pool, etc. I guess what happens is that the endpoints send EndpointConnected to main before the main process's subscription has propagated back to the sending endpoint.

Contributor Author

lithp commented Apr 10, 2019

It looks like Ctrl-c also got uglier:

    INFO  04-09 19:16:49               trinity  DB process (pid=19580) terminated
Exception in thread Thread-39:
Traceback (most recent call last):
  File "/home/brian/.local/share/virtualenvs/trinity-mNs_Su9C/lib/python3.7/site-packages/lahja/endpoint.py", line 126, in _run
    self.condition.wait()
  File "/home/brian/.pyenv/versions/3.7.0/lib/python3.7/multiprocessing/managers.py", line 1038, in wait
    return self._callmethod('wait', (timeout,))
  File "/home/brian/.pyenv/versions/3.7.0/lib/python3.7/multiprocessing/managers.py", line 796, in _callmethod
    kind, result = conn.recv()
  File "/home/brian/.pyenv/versions/3.7.0/lib/python3.7/multiprocessing/connection.py", line 250, in recv
    buf = self._recv_bytes()
  File "/home/brian/.pyenv/versions/3.7.0/lib/python3.7/multiprocessing/connection.py", line 407, in _recv_bytes
    buf = self._recv(4)
  File "/home/brian/.pyenv/versions/3.7.0/lib/python3.7/multiprocessing/connection.py", line 383, in _recv
    raise EOFError
EOFError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/brian/.pyenv/versions/3.7.0/lib/python3.7/threading.py", line 917, in _bootstrap_inner
    self.run()
  File "/home/brian/.pyenv/versions/3.7.0/lib/python3.7/threading.py", line 865, in run
    self._target(*self._args, **self._kwargs)
  File "/home/brian/.local/share/virtualenvs/trinity-mNs_Su9C/lib/python3.7/site-packages/lahja/endpoint.py", line 126, in _run
    self.condition.wait()
  File "/home/brian/.pyenv/versions/3.7.0/lib/python3.7/multiprocessing/managers.py", line 1032, in __exit__
    return self._callmethod('release')
  File "/home/brian/.pyenv/versions/3.7.0/lib/python3.7/multiprocessing/managers.py", line 795, in _callmethod
    conn.send((self._id, methodname, args, kwds))
  File "/home/brian/.pyenv/versions/3.7.0/lib/python3.7/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/home/brian/.pyenv/versions/3.7.0/lib/python3.7/multiprocessing/connection.py", line 404, in _send_bytes
    self._send(header + buf)
  File "/home/brian/.pyenv/versions/3.7.0/lib/python3.7/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe

Contributor Author

lithp commented Apr 10, 2019

I think I should admit failure here. I thought this approach would be simpler, and the user interface it exposes is certainly easier to use, but it was obvious how filter_predicate worked, and with that approach we wouldn't have had to worry about these kinds of race conditions or messy exits.

In this PR the sending endpoints wait for subscription updates by long-polling each of the endpoints they connect to. Because of how multiprocessing works this required a thread, and because Python doesn't support interrupting threads there's no easy way I can think of to make shutdowns reliably cleaner. Also, because of the long-polling there's no easy way to block until the other endpoints have updated their subscriptions, so we get race conditions.

I think an easy fix to the race conditions is to add a flag which lets you say "I know the other side will want this message" so it skips the filter. That would only be a stop-gap until this is replaced with a better architecture. I'm currently looking into why this fails on Trinity, hopefully that or a similar fix is enough.

I also have a branch which gets rid of the threads and communicates via endpoints connecting back to the endpoints which connect to them, then sending messages the usual way. I think I'll take a look at getting that working too.

@cburgdorf
Contributor

I think I should admit failure here. I thought this approach would be simpler, and the user interface it exposes is certainly easier to use, but it was obvious how filter_predicate worked, and with that approach we wouldn't have had to worry about these kinds of race conditions or messy exits.

The filter_predicate solution is certainly simpler implementation-wise and it has worked well for me so far, but it is in fact very appealing not to have to worry about manually setting up filters for efficiency!

I think an easy fix to the race conditions is to add a flag which lets you say "I know the other side will want this message" so it skips the filter.

So, you mean the sender would use something like a force flag in the BroadcastConfig to bypass the filter? This would then only be used for certain event types where the initial event is important such as EndpointConnected and maybe ShutdownRequested.

I also have a branch which gets rid of the threads and communicates via endpoints connecting back to the endpoints which connect to them, then sending messages the usual way. I think I'll take a look at getting that working too.

Nice! But I guess the main problem remains that, at the time of sending an event, we have no certainty whether we already know about the receiver's possible subscriptions or not. I guess what we would need for that is that connect_to_endpoints would not only wait until we have established the connection to the remote endpoint but also send initial info about its possible subscriptions to our events.

If you feel that the complexity passes a certain threshold and takes away too much of the time that you'd rather spend on the sync effort, we can also fall back to the filter_predicate solution and push the transparent solution down the road. That said, no pressure from me as I'm on vacation until the 24th anyway. I just wanted to throw this in as an option, so you know you shouldn't feel obligated to finish the solution if the effort gets too high after all.

Contributor Author

lithp commented Apr 11, 2019

So, you mean the sender would use something like a force flag in the BroadcastConfig to bypass the filter? This would then only be used for certain event types where the initial event is important such as EndpointConnected and maybe ShutdownRequested.

Yeah, exactly!
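Something along these lines is what I have in mind; BroadcastConfigSketch, the force flag, and allowed() are illustrative only, not existing lahja API:

from typing import Optional, Set, Type


class BroadcastConfigSketch:
    # Mirrors lahja's BroadcastConfig fields plus a hypothetical `force` flag
    # that bypasses the sender-side subscription filter.
    def __init__(self,
                 filter_endpoint: Optional[str] = None,
                 filter_event_id: Optional[str] = None,
                 internal: bool = False,
                 force: bool = False) -> None:
        self.filter_endpoint = filter_endpoint
        self.filter_event_id = filter_event_id
        self.internal = internal
        self.force = force


def allowed(config: Optional[BroadcastConfigSketch],
            event_type: Type,
            subscribed: Set[Type]) -> bool:
    # force=True short-circuits the filter, for events like EndpointConnected
    # where dropping the very first message would break the handshake.
    if config is not None and config.force:
        return True
    return event_type in subscribed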

Nice! But I guess the main problem remains that, at the time of sending an event, we have no certainty whether we already know about the receiver's possible subscriptions or not. I guess what we would need for that is that connect_to_endpoints would not only wait until we have established the connection to the remote endpoint but also send initial info about its possible subscriptions to our events.

Yep, the idea is to make that part synchronous. connect_to_endpoints shouldn't return until you can safely send messages.

If you feel that the complexity passes a certain threshold and takes away too much of the time that you'd rather spend on the sync effort, we can also fall back to the filter_predicate solution and push the transparent solution down the road. That said, no pressure from me as I'm on vacation until the 24th anyway. I just wanted to throw this in as an option, so you know you shouldn't feel obligated to finish the solution if the effort gets too high after all.

Thanks for the offer! And enjoy your vacation :) I want to give this another try but I'll keep that in mind and switch back to state sync if this takes much longer.

Today I spent more time looking into why this fails with Trinity. I'm still not sure what's happening but I've discovered some things:

  • _connect_receiving_queue is a coroutine, but handle_new_endpoints calls connect_to_endpoints_blocking, which blocks the main thread while it tries to connect.
  • _connect_receiving_queue is also in a precarious spot. It doesn't catch any exceptions, so any exception which a handler raises causes it to stop processing messages. Even worse, nothing is printed! I think that's because the coro is passed into gather(); asyncio plans on returning the exception along with the result of the gather() call.
  • For some reason, when main tries to connect to bjson-rpc-api it receives [Errno 111] Connection refused.
  • When main tries to connect to discovery, it receives [Errno 104] Connection reset by peer

That last exception prevents networking from being told about the discovery service (the AvailableEndpointsUpdated message is never sent), and therefore its requests to the discovery service are dropped, which is why the peer pool complains that the discovery service did not respond in time.

Contributor Author

lithp commented Apr 12, 2019

For some reason, when main tries to connect to bjson-rpc-api it receives [Errno 111] Connection refused.

After spending some quality time with strace, it appears that this happens because the plugin segfaults soon after it starts listening. Here's the output from a core dump:

Program terminated with signal SIGSEGV, Segmentation fault.
#0  __new_sem_wait_fast (definitive_result=1, sem=<optimized out>) at sem_waitcommon.c:135
135	sem_waitcommon.c: No such file or directory.
[Current thread is 1 (Thread 0x7f82a88a9080 (LWP 4921))]
(gdb) bt
#0  __new_sem_wait_fast (definitive_result=1, sem=<optimized out>) at sem_waitcommon.c:135
#1  __new_sem_trywait (sem=0x7fe8718de000) at sem_wait.c:82
#2  0x00007f82a603b183 in semlock_acquire (self=0x7f82a3777d18, args=<optimized out>, kwds=<optimized out>)
    at /home/brian/.pyenv/sources/3.7.0/Python-3.7.0/Modules/_multiprocessing/semaphore.c:309
#3  0x000000000043beca in _PyMethodDef_RawFastCallKeywords (method=0x7f82a623c560 <semlock_methods+64>, self=self@entry=0x7f82a3777d18, args=args@entry=0x7f829c2f99e8, 
    nargs=nargs@entry=0, kwnames=kwnames@entry=0x0) at Objects/call.c:690
#4  0x00000000005bc349 in _PyMethodDescr_FastCallKeywords (descrobj=0x7f82a6447048, args=args@entry=0x7f829c2f99e0, nargs=nargs@entry=1, kwnames=kwnames@entry=0x0)
    at Objects/descrobject.c:288
#5  0x000000000042acb9 in call_function (kwnames=0x0, oparg=1, pp_stack=<synthetic pointer>) at Python/ceval.c:4563
(gdb) py-bt
Traceback (most recent call first):
  File "/home/brian/.pyenv/versions/3.7.0/lib/python3.7/multiprocessing/synchronize.py", line 96, in __enter__
    return self._semlock.__enter__()
  File "/home/brian/.pyenv/versions/3.7.0/lib/python3.7/multiprocessing/synchronize.py", line 231, in __enter__
    return self._lock.__enter__()
  File "/home/brian/ef/lahja/lahja/endpoint.py", line 261, in _notify_subscriptions_changed
    with self._subscriptions_changed:
  File "/home/brian/ef/lahja/lahja/endpoint.py", line 571, in subscribe
    self._notify_subscriptions_changed()
  File "/home/brian/ef/trinity/trinity/endpoint.py", line 54, in auto_connect_new_announced_endpoints
    self.subscribe(AvailableEndpointsUpdated, self.connect_to_other_endpoints)
  File "/home/brian/ef/trinity/trinity/extensibility/plugin.py", line 318, in _prepare_start
    self.event_bus.auto_connect_new_announced_endpoints()
  File "/home/brian/.pyenv/versions/3.7.0/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/home/brian/.pyenv/versions/3.7.0/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/home/brian/.pyenv/versions/3.7.0/lib/python3.7/multiprocessing/spawn.py", line 118, in _main
    return self._bootstrap()
  File "/home/brian/.pyenv/versions/3.7.0/lib/python3.7/multiprocessing/spawn.py", line 105, in spawn_main
    exitcode = _main(fd)
  File "<string>", line 1, in <module>

Contributor Author

lithp commented Apr 12, 2019

The same segfault happens to PeerDiscoveryPlugin, which would explain why main can't connect back to it.

Contributor Author

lithp commented Apr 13, 2019

After spending more time with strace I've figured out why the plugins are segfaulting. They segfault while calling self._notify_subscriptions_changed(), which uses a semaphore internally. The problem is that this semaphore was created in the main process and the memory it was mapped to does not exist in the child processes.

The process is failing on a dereference:

(gdb) disas
Dump of assembler code for function __new_sem_trywait:
=> 0x00007f0922dea840 <+0>:	mov    (%rdi),%rax
   0x00007f0922dea843 <+3>:	test   %eax,%eax
   0x00007f0922dea845 <+5>:	je     0x7f0922dea858 <__new_sem_trywait+24>
   0x00007f0922dea847 <+7>:	lea    -0x1(%rax),%rdx
...
(gdb) info registers
...
rdi            0x7fe8718da000	140636314247168

That address is not part of any of the memory segments; however, it does show up in the main process's strace:

lstat("/dev/shm/DsUT5s", 0x7ffcf6436c80) = -1 ENOENT (No such file or directory)
openat(AT_FDCWD, "/dev/shm/DsUT5s", O_RDWR|O_CREAT|O_EXCL, 0600) = 22
write(22, "\1\0\0\0\0\0\0\0\200\0\0\0\350\177\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 32) = 32
mmap(NULL, 32, PROT_READ|PROT_WRITE, MAP_SHARED, 22, 0) = 0x7fe8718da000
link("/dev/shm/DsUT5s", "/dev/shm/sem.mp-8zv2nf9_") = 0
fstat(22, {st_mode=S_IFREG|0600, st_size=32, ...}) = 0
unlink("/dev/shm/DsUT5s")               = 0
close(22)                               = 0
unlink("/dev/shm/sem.mp-8zv2nf9_")      = 0

(the above is roughly what you'd expect sem_open to do)

The child process is created by these lines:

https://github.com/ethereum/trinity/blob/413882806fe5031e790fa0be11ef1e037ed06886/trinity/extensibility/plugin.py#L284-L294

Under the covers, Process.start() uses a clone() followed by an execve(), which wipes away everything and starts afresh. Python tries to hide this from you by pickling up all the state, sending it down a pipe to the child, and unpickling it on the other side. However, it does not correctly set the semaphores back up, so the child crashes when it tries to access memory which no longer exists.

This would be fixed by not creating an event bus for the Plugin until the new process has already been started. It might also be possible to customize the pickling and recreate the semaphore, since it has been mapped to a real file.
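A minimal sketch of that fix; plugin_main and the plugin name are illustrative, not Trinity's actual plugin API:

import multiprocessing


def plugin_main(endpoint_name: str) -> None:
    # Create the event bus *inside* the child process, after the spawn, so the
    # multiprocessing primitives it owns (semaphores, condition variables) are
    # mapped into memory this process can actually see. Sketch only: construct
    # the Endpoint here, then connect and subscribe as usual.
    print(f"{endpoint_name}: create and connect the Endpoint here")


if __name__ == '__main__':
    # Only picklable configuration (here, just a name) crosses the process
    # boundary; never a live Endpoint carrying OS-level synchronization
    # primitives created in the parent.
    proc = multiprocessing.Process(target=plugin_main, args=('my-plugin',))
    proc.start()
    proc.join()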

And another thing that I discovered: the child processes were hitting SIGSEGV, and the main process received SIGCHLD signals informing it that some children had crashed. However, it never emitted anything to the log or otherwise acted on this information.

Contributor Author

lithp commented Apr 13, 2019

Yeah, that does the trick. I've opened a PR against trinity which changes it to not send Endpoints over process boundaries. Now everything appears to be working!

Contributor Author

lithp commented May 3, 2019

Closing in favor of #42. #42 is based on asyncio which should cause fewer headaches in the long run.

@lithp lithp closed this May 3, 2019