Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PoC: Use event bus to retrieve PeerCount from PeerPool by another process #1202

Conversation

cburgdorf
Copy link
Contributor

@cburgdorf cburgdorf commented Aug 21, 2018

What is this?

This is another PoC to demonstrate the concept of using an event bus for inter process communication. This time, a new independent process is interacting with the event bus to receive the current peer count from the PeerPool.

Keep in mind this is a PoC. It is full of bugs and poor ergonomics

What happens:

  1. There's a new independent dummy process being started
test_proc = multiprocessing.Process(
        target=run_test_proc,
        args=(
            test_proc_endpoint,
        ),
    )
test_proc.start()
  1. This process broadcasts a PeerCountRequest on the event bus every second
async def request_peer_count(event_bus: Endpoint):
    while True:
        await asyncio.sleep(1)
        event_bus.broadcast(PeerCountRequest())
  1. It also streams all incoming PeerCountResponse objects
async def receive_peer_count(event_bus: Endpoint):
    async for resp in event_bus.stream(PeerCountResponse):
        print("Peer Count: " + str(resp.payload))
  1. The PeerPool on the other hand listens for PeerCountRequest and answers them with PeerCountResponse
    async def answer_peer_count_requests(self):
        async for req in self.event_bus.stream(PeerCountRequest):
            print("answering!" + str(len(self)))
            self.event_bus.broadcast(PeerCountResponse(len(self)))

Known issues

  1. There's seems to be something wrong with the event loop. I could only get things to work if we don't spawn but rather fork processes. However, while that makes things work somehow I'm seeing duplicate log entries with that.

  2. The communication could be much much more efficient. Currently, the event bus just fans out each and every event to every participating process. In practice, we will want to limit the direction and flow of events to not waste resources

  3. I anticipate the event bus being used a lot for loose async communication with one sender and many receivers. However, as demoed in this PoC, the event bus can also be used for classical request / response type of work. That said, currently the API for that simply doesn't exist and combining broadcast and stream is both unergonomic and wasteful. It wouldn't be too hard to build the following API that would enable a simple request/response pattern between only two actors (without broadcasting to each and every participating actor)

response = await event_bus.request(PeerCountRequest())

@cburgdorf
Copy link
Contributor Author

@pipermerriam I'm currently implementing some ideas from #1206 to make this more ergonomic and efficient.

Once that is done I need to figure out the more generic issue with fork vs spawn and the resulting duplicate log entries that I'm seeing.

@cburgdorf cburgdorf force-pushed the christoph/feat/peer-pool-event-bus-comm branch from efcefc6 to 0a6bef9 Compare August 22, 2018 11:15
@cburgdorf
Copy link
Contributor Author

cburgdorf commented Aug 22, 2018

@pipermerriam in case you checked this yesterday, this is now updated to use the new request/response API introduced in cburgdorf/lahja@b3bda4c

This isn't yet fully implemented (responses still fan out to every endpoint) but should be fairly easy to resolve (implemented in cburgdorf/lahja@c524b8c). The nice thing is that the API for classical request / response situations looks fairly simple now.

response = await event_bus.request(PeerCountRequest())
print("Peer Count: " + str(response.payload))

@cburgdorf cburgdorf force-pushed the christoph/feat/peer-pool-event-bus-comm branch from 38fb9ff to 2499809 Compare August 23, 2018 07:53
@cburgdorf
Copy link
Contributor Author

Superseded by #1211

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

1 participant