[WIP] APIs I expected/wanted on "first contact" #89

Open
carver opened this issue May 24, 2019 · 8 comments
Comments

@carver
Contributor

carver commented May 24, 2019

Since this is roughly my first real contact with lahja, I wanted to write down my first impressions. I'll leave this open and [WIP] for a bit, as I collect my thoughts.

Custom APIs

An acknowledgment response

I have a request/response pair, but the response is super simple. Basically, I just want to know when it succeeds. So I don't want to have to define a custom response event type. I just want the caller to hang until the server responds.

This could look almost identical to the current Endpoint.subscribe() API (except I also want an async version, mentioned below). As soon as the handler exits, the vanilla response event is sent to acknowledge completion.

Response with a primitive value

I have a request, but just need a simple value back. Do I really need to define the response event?

Something like, client-side:

result = await endpoint.request(NeedsDoubling(3))
assert result == 6

# also, a blocking version:
endpoint.request_blocking(NeedsDoubling(3))

Server-side

async def doubler(event: NeedsDoubling):
    return event.operand * 2

endpoint.add_async_handler(NeedsDoubling, doubler)

# also
endpoint.add_handler(NeedsDoubling, lambda event: event.operand * 2)

Maybe the acknowledgement becomes redundant with this API, because you could just return None.
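
To make the proposed shape concrete, here is a minimal in-process sketch. ToyEndpoint and Ping are hypothetical stand-ins, not lahja's Endpoint; the sketch only shows how a handler's return value could double as the response, with None serving as a bare acknowledgment.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Dict


@dataclass
class NeedsDoubling:
    operand: int


@dataclass
class Ping:
    """Hypothetical event whose handler returns nothing (a bare ack)."""


class ToyEndpoint:
    """Hypothetical in-process stand-in; this is not lahja's Endpoint."""

    def __init__(self) -> None:
        self._handlers: Dict[type, Callable[[Any], Awaitable[Any]]] = {}

    def add_async_handler(self, event_type: type, handler) -> None:
        self._handlers[event_type] = handler

    def add_handler(self, event_type: type, handler) -> None:
        # Wrap a plain function so request() can always await the handler.
        async def wrapper(event: Any) -> Any:
            return handler(event)

        self._handlers[event_type] = wrapper

    async def request(self, event: Any) -> Any:
        # The handler's return value is the response; None acts as an ack.
        return await self._handlers[type(event)](event)


async def main():
    endpoint = ToyEndpoint()

    async def doubler(event: NeedsDoubling) -> int:
        return event.operand * 2

    endpoint.add_async_handler(NeedsDoubling, doubler)
    endpoint.add_handler(Ping, lambda event: None)

    doubled = await endpoint.request(NeedsDoubling(3))
    ack = await endpoint.request(Ping())  # resolves once the handler has finished
    return doubled, ack
```

Running asyncio.run(main()) exercises both cases. A real implementation would have to serialize the return value across processes, which this sketch skips entirely.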

Connection Retries

If I start both the server and the client, and the client starts slightly before the server, I get a connection-refused error. I'd prefer to have it retry, at least for a while. Probably something like:

  • connect_to_endpoints(ConnectionConfig, timeout=seconds_im_willing_to_wait)

There are likely more APIs that would benefit. Unsure what the default timeout should be, if any.
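
One way the retry could behave, as a sketch in plain asyncio. retry_connect, retry_interval, and FlakyServer are hypothetical names used for illustration only; nothing here is lahja's API, and the choice to retry only on ConnectionRefusedError is an assumption.

```python
import asyncio
import time
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_connect(
    connect: Callable[[], Awaitable[T]],
    timeout: float,
    retry_interval: float = 0.05,
) -> T:
    """Call connect() until it succeeds or `timeout` seconds have elapsed."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            return await connect()
        except ConnectionRefusedError:
            # Re-raise the last refusal if another attempt would miss the deadline.
            if time.monotonic() + retry_interval > deadline:
                raise
            await asyncio.sleep(retry_interval)


class FlakyServer:
    """Hypothetical stand-in for a server that is not listening yet."""

    def __init__(self, failures: int) -> None:
        self.remaining = failures
        self.attempts = 0

    async def connect(self) -> str:
        self.attempts += 1
        if self.remaining > 0:
            self.remaining -= 1
            raise ConnectionRefusedError("server not up yet")
        return "connected"
```

For example, asyncio.run(retry_connect(FlakyServer(2).connect, timeout=1.0)) succeeds on the third attempt, while a server that never comes up surfaces the original ConnectionRefusedError once the timeout is exhausted.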

Synchronous versions

Sync versions of most APIs. I expect the call to block until success (probably by launching a new thread under the hood):

  • Endpoint.connect_to_endpoints()
  • Endpoint.broadcast()
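
A rough sketch of the "new thread under the hood" idea, using only the standard library. BlockingRunner and fake_broadcast are hypothetical; a real sync wrapper would forward to the endpoint's own coroutines rather than to a stand-in.

```python
import asyncio
import threading
from typing import Any, Coroutine, Optional, TypeVar

T = TypeVar("T")


class BlockingRunner:
    """Runs coroutines on a private event loop in a daemon thread."""

    def __init__(self) -> None:
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()

    def run(self, coro: Coroutine[Any, Any, T], timeout: Optional[float] = None) -> T:
        # Schedule on the background loop, then block the calling thread.
        future = asyncio.run_coroutine_threadsafe(coro, self._loop)
        return future.result(timeout)

    def close(self) -> None:
        # Stop the loop from its own thread, then wait for the thread to exit.
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()


async def fake_broadcast(message: str) -> str:
    # Stand-in for an async endpoint call such as a broadcast.
    await asyncio.sleep(0.01)
    return f"sent {message}"
```

A caller without an event loop can then write runner.run(fake_broadcast("hello")) and get the result back synchronously. The trade-off is one extra thread per runner, plus the need to call close() on shutdown.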

Async versions

For example, Endpoint.subscribe(event, some_awaitable_handler).

@pipermerriam
Member

Distilled to the things that aren't already addressed in master or via one of the open pull requests.

This also touches on something that becomes possible once #70 is merged: setting up the server-side request/response handlers with the EndpointAPI.subscribe API, which now accepts a coroutine as the handler.

@carver
Contributor Author

carver commented May 27, 2019

I suppose another thing to do is update the examples in https://github.com/ethereum/lahja/tree/master/examples -- which was where I ended up on first contact.

@carver
Contributor Author

carver commented May 28, 2019

So here's a toy example of something I'd like to do from inside a BaseIsolatedPlugin. I thought it would help give color to the API desires...

def stacked_calls_demo(bus):
    bus.start_serving('task-runner')
    # This ^ is where task requesters will send events

    # Task execution will sometimes need extra data, connect below...

    # Block on the connection below until the endpoint becomes available...
    data_gatherer = bus.connect_to_endpoint('DataGatherer', timeout=10)
    # In the real implementation, this is probably lazier, and doesn't connect until
    # the first data is needed.

    # spawn a thread when this event is received. event expects primitive bool response
    bus.add_handler(RequestLongTask, long_task_runner(data_gatherer))

    # block until sigint || sigterm
    bus.serve_forever()

@curry
def long_task_runner(data_gatherer, event) -> bool:
    """
    returns bool: whether the long task was completed successfully
    """
    if not is_data_available():
        # block until data retrieved
        data = data_gatherer.request(WantData())
        # in the real beam sync, this ^ would probably just be an ACK
        # and the data would be loaded direct from the DB
    else:
        data = get_data()

    result = do_long_processing(data)

    return is_result_valid(result)

Also, maybe this requires making a sync version of the isolated plugin. Maybe it doesn't matter that we're blocking the event loop if we're not using it anywhere in this plugin, but it feels wrong.

@cburgdorf
Contributor

@carver I've updated the examples and added tests to CI so that they don't break again (#101).

@carver
Contributor Author

carver commented May 29, 2019

Also want to be able to run a broadcast from another thread using something like:

future = asyncio.run_coroutine_threadsafe(event_bus.request(GetSomeData(key)), loop)
data = future.result(timeout=10)

(courtesy of Brian)

This crashes (in v0.12.0) with:

  File ".../venv/lib/python3.6/site-packages/lahja/endpoint.py", line 179, in run
    if self._loop != asyncio.get_event_loop():
  File "/usr/lib/python3.6/asyncio/events.py", line 694, in get_event_loop
    return get_event_loop_policy().get_event_loop()
  File "/usr/lib/python3.6/asyncio/events.py", line 602, in get_event_loop
    % threading.current_thread().name)
RuntimeError: There is no current event loop in thread 'ThreadPoolExecutor-0_0'.

@lithp
Contributor

lithp commented May 30, 2019

@carver I think you already figured this out, since you got beam sync working, but the RuntimeError occurs because the decorator runs too soon; it's not async-aware. Wrapping the call to event_bus.request() in another coroutine is enough to get this working:

async def run_request(endpoint):
    return await endpoint.request(Request('hi'))

Here's a working (on 3.7.0) example of blocking an event-loop-less thread on a response from the endpoint: https://gist.github.com/lithp/13ca70023dfedb25ceab7703ed6e4207

I'm not sure exactly what we could do to make the decorator wait a little before checking the event loop, but I'm sure it's possible to fix. It might be worth opening a ticket just for this problem.
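
The timing issue can be reproduced without lahja. The sketch below assumes, as the traceback suggests, that the decorator looks up the event loop at call time; check_loop, ToyEndpoint, and worker are hypothetical names, not lahja code. Calling endpoint.request(...) directly in a loop-less thread trips the check before any coroutine is scheduled, whereas the wrapper coroutine defers the call until it is running on the loop.

```python
import asyncio
import functools


def check_loop(func):
    """Toy decorator: looks up the event loop when the method is *called*."""

    @functools.wraps(func)
    def run(self, *args, **kwargs):
        asyncio.get_event_loop()  # raises RuntimeError in a thread with no loop
        return func(self, *args, **kwargs)

    return run


class ToyEndpoint:
    @check_loop
    async def request(self, payload):
        return f"response to {payload}"


async def run_request(endpoint):
    # endpoint.request() is only called once this coroutine runs on the loop.
    return await endpoint.request("hi")


def worker(endpoint, loop):
    """Runs in a thread without an event loop; returns (direct, wrapped)."""
    try:
        # The argument is evaluated here, in this thread, so the check fails.
        asyncio.run_coroutine_threadsafe(endpoint.request("hi"), loop)
        direct = "ok"
    except RuntimeError:
        direct = "RuntimeError"
    future = asyncio.run_coroutine_threadsafe(run_request(endpoint), loop)
    return direct, future.result(timeout=5)


async def main():
    loop = asyncio.get_running_loop()
    # Run worker() in the default thread pool while this loop keeps running.
    return await loop.run_in_executor(None, worker, ToyEndpoint(), loop)
```

With asyncio.run(main()), the direct call fails inside the worker thread and the wrapped call returns the response, which matches the behavior described above.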

@carver
Contributor Author

carver commented May 31, 2019

I guess it's all really a tangent, because what I really want is a ThreadedEndpoint that will block on:

endpoint.request(GetSomeData(key))

@carver
Contributor Author

carver commented May 31, 2019

FWIW, this is a local lahja patch on v0.12.0 that I'm running, so I don't have to deal with the RuntimeError:

diff --git a/lahja/endpoint.py b/lahja/endpoint.py
index 20dae5e..366aa5b 100644
--- a/lahja/endpoint.py
+++ b/lahja/endpoint.py
@@ -175,11 +175,16 @@ class Endpoint:
         def run(self, *args, **kwargs):  # type: ignore
             if not self._loop:
                 self._loop = asyncio.get_event_loop()
-
-            if self._loop != asyncio.get_event_loop():
-                raise Exception(
-                    'All endpoint methods must be called from the same event loop'
-                )
+            else:
+                try:
+                    current_loop = asyncio.get_event_loop()
+                except RuntimeError:
+                    pass
+                else:
+                    if self._loop != asyncio.get_event_loop():
+                        raise Exception(
+                            'All endpoint methods must be called from the same event loop'
+                        )
 
             return func(self, *args, **kwargs)
         return cast(TFunc, run)
