Perform batch RPC requests to ethereum node #832

Open

Uxio0 opened this Issue May 9, 2018 · 33 comments
@Uxio0 (Contributor) commented May 9, 2018

  • Version: 4.2.0
  • Python: 3.6
  • OS: osx/linux/win

What was wrong?

There's currently no support for batch requests when using HTTPProvider. Even with the IPC/WS connectors, I think a speed-up could be possible using batch requests.

How can it be fixed?

Implementing batch requests
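
For illustration, here's a minimal sketch of what a JSON-RPC batch request could look like over plain HTTP, independent of web3.py (the endpoint URL and the use of the requests library are assumptions):

import requests

# a batch is just a JSON array of standard request objects, sent in one POST
batch = [
    {"jsonrpc": "2.0", "method": "eth_blockNumber", "params": [], "id": 1},
    {"jsonrpc": "2.0", "method": "eth_gasPrice", "params": [], "id": 2},
]
response = requests.post("http://localhost:8545", json=batch)

# the node replies with an array of response objects, matched up by "id"
for item in response.json():
    print(item["id"], item.get("result", item.get("error")))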

@carver (Collaborator) commented May 9, 2018

I don't understand what you mean by batching, here. IPC is a streaming connection. Can you elaborate?

@pipermerriam (Member) commented May 9, 2018

@carver web3js at least historically had a feature for asynchronous request batching via an API that let you stack up some JSON-RPC requests, and then submit them as a batch. The requests would be issued concurrently. (feature may still exist, not sure).

I'm good with exploring this alongside #657

@Uxio0 (Contributor, Author) commented May 9, 2018

I'm using this feature with HTTP requests to the JSON-RPC API (no web3 involved) and it's working very fast (compared to doing everything in multiple requests, or even using a ThreadPoolExecutor), so I think it can't harm web3 to at least have that option.

@carver (Collaborator) commented May 9, 2018

web3js at least historically had a feature for asynchronous request batching via an API that let you stack up some JSON-RPC requests, and then submit them as a batch

Right, that makes sense in web3js, because the HTTP request to your node has a lot of overhead (HTTP headers) for each message. But there is roughly 0 overhead for each message in an IPC stream. So I'm skeptical that it will provide any benefit in that context.

I'm less sure about the internals of websockets. There might be some benefit there.

Batching over an HTTP connection is likely to have benefits. 👍

@Uxio0 (Contributor, Author) commented May 9, 2018

Hi @carver,

I agree about that, but:

  • I'm talking about HTTP; it's not possible to use IPC/WS in every environment.
  • Even using IPC/WS, it's possible that Geth/Parity and other nodes have optimizations depending on whether they receive a batch request or multiple streamed requests. At least for me, it's faster to send batches than synchronous IPC requests (Python 3.6, web3 4.2.0).

Hope I have explained myself better :)

@carver (Collaborator) commented May 9, 2018

Ah yes, thanks. I updated the issue to indicate that HTTP is the primary target. 👍

I will be happy if we find that it boosts speed for everything else, too!

@pipermerriam (Member) commented May 9, 2018

@carver having now thought about this for a few minutes, I think this is going to be easy if we build it on top of #657, since everything under web3.async.* will be coroutine based, and thus batching them to run concurrently (in the asyncio sense of concurrency) should be really easy to expose.

Here's an imagined API:

w3 = Web3(...)

batch = w3.async.create_batch()
batch.eth.getBalance(addr_a)
batch.eth.getBalance(addr_b)
batch.eth.getBalance(addr_c)
bal_a, bal_b, bal_c = await batch.execute()

@dangell7 commented May 9, 2018

Interesting idea. What happens when one tries to send multiple transactions? In my system, I have to wait until the first transaction clears before sending the next, probably because getTransactionCount doesn't take pending transactions into consideration. I didn't really dive into the problem, but wanted to bring it up.

@pipermerriam (Member) commented May 10, 2018

@dangell7 good point. We may choose to start with a minimal initial implementation which only allows batching read-only requests that don't modify state.

@boneyard93501 (Contributor) commented May 21, 2018

@pipermerriam anybody working on this? it's not all that far off the poc i did a while ago. might be able to resurrect that without too many changes as a wip starting point.

@carver (Collaborator) commented May 21, 2018

@boneyard93501 not that I've seen

@boneyard93501 (Contributor) commented May 21, 2018

@carver cool. if you don't mind, i'll take a stab at it starting wednesday.

@pipermerriam (Member) commented May 21, 2018

@boneyard93501 are you looking at starting into the web3.async API? If not, can you provide a proposal for what you're looking to build? Basic API and a basic description of how it'll work.

@boneyard93501 (Contributor) commented May 23, 2018

@pipermerriam @carver
if a solution is supposed to be provider agnostic, i think a thread-based pub-sub solution is probably the most straightforward (and could be wrapped in an event loop, if needed). quick and dirty outline, untested, unoptimized/un-toolzed:

import queue
import threading


WORKER_TIMEOUT = 5.0  # seconds an idle worker waits before shutting itself down


def batch_request_handler(rpc_request_queue, provider, response_processing_fn):
    ''' pull jobs off the queue, run them against the provider, and hand
        (job, result, error) tuples to the user-supplied callback '''
    while True:
        try:
            try:
                # block for up to WORKER_TIMEOUT; an idle worker shuts itself down
                job = rpc_request_queue.get(timeout=WORKER_TIMEOUT)
            except queue.Empty:
                response_processing_fn(('', '', 'TimeoutError'))
                break
            if job is None:  # sentinel: no more work
                rpc_request_queue.task_done()
                break
            try:
                result = provider._make_request(job)
                response_processing_fn((job, result, ''))
            except Exception as exc:
                response_processing_fn((job, '', exc))
            finally:
                rpc_request_queue.task_done()
        except (KeyboardInterrupt, SystemExit):
            break


def batch_request(provider, payload, response_processing_fn, max_request_threads=10, max_request_q=200):
    '''
        could change provider to provider_type and then create the provider in the
        worker to avoid any possible thread-safety issues now or down the road.
        should also take a threading.Event, and adjust the worker loop condition
        accordingly, so the user can kill it all at will
    '''
    rpc_request_queue = queue.Queue(maxsize=max_request_q)

    request_args = (rpc_request_queue, provider, response_processing_fn)
    req_threads = []

    for i in range(max_request_threads):
        t = threading.Thread(target=batch_request_handler, args=request_args)
        t.daemon = True  # not strictly needed since we join, but helps avoid zombies
        t.name = f'batch request thread {i}'
        req_threads.append(t)

    [t.start() for t in req_threads]

    for job in payload:
        rpc_request_queue.put(job)

    rpc_request_queue.join()  # block until every job has been processed
    [t.join() for t in req_threads]  # workers exit via their idle timeout

 

# user side
class SuperSimpleClientBatchProcessingClass:
    def __init__(self, batch_size):
        ''' track progress of a batch as result callbacks arrive '''
        self.batch_size = batch_size
        self.jobs_processed = 0

    def job_update(self, data):
        ''' only requirement from the web3py side is to accept a (named) tuple '''
        self.jobs_processed += 1
        if data[1]:
            pass  # do something with successes
        else:
            pass  # do something with failures, including TimeoutError

    @property
    def done(self):
        return self.jobs_processed == self.batch_size

    @property
    def progress(self):
        return {'batch_size': self.batch_size, 'processed_jobs': self.jobs_processed}
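
hypothetical usage of the sketch above (the endpoint URL and the shape of payload are assumptions; provider._make_request comes from the outline and is itself assumed):

from web3 import HTTPProvider

provider = HTTPProvider('http://localhost:8545')
payload = [...]  # one pre-built JSON-RPC request per job
tracker = SuperSimpleClientBatchProcessingClass(batch_size=len(payload))
batch_request(provider, payload, tracker.job_update)
print(tracker.progress)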

@pipermerriam (Member) commented May 23, 2018

@boneyard93501 echoing what @voith said in another issue.

  1. I think asyncio is going to be our go-to for concurrency (and conversely, I'm hesitant to add any thread-based concurrency for that reason).
  2. I don't have a specific concern here, but I know historically that users who've tried to use web3.py in conjunction with threads have had issues, so a thread-based solution seems likely to have unexpected problems.

@boneyard93501 (Contributor) commented May 23, 2018

k. converting the above to an asyncio Queue is fairly trivial, although you'll want to make threaded allowances (concurrent.futures, ThreadPoolExecutor). for event-loop registration purposes, i was trying to find the (repo for the) async API but came up short. where can i find it? thx.

now, wrapping providers is nice and may have some benefits, but the real benefit of batch processing comes from muxing over websocket(s). how do you feel about being opinionated and a) providing a ws-based batch-processing solution only (once a suitable ws implementation is in place) and b) providing examples in the documentation on how to wrap providers right now?

@pipermerriam (Member) commented May 23, 2018

i was trying to find the (repo for the) async API but came up short. where can i find it? thx.

Is this what you are looking for?

https://docs.python.org/3.5/library/asyncio.html

how do you feel about being opinionated and a) providing a ws-based batch-processing solution only (once a suitable ws implementation is in place) and b) providing examples in the documentation on how to wrap providers right now?

Not sure I fully understand the question. Generally, we are going to be cautious about releasing an API that we aren't sure about, because of our commitment to a graceful deprecation cycle for breaking API changes.

I think the code and approach you are taking will be useful, but I'm not confident it's something we can release prior to having some baseline web3.async API to build on. Basically, we've removed all async from web3 to make room for a sensible async approach. We think our plans for a web3.async namespace are that approach; however, we need the basics in place before we can start adding features like async batched requests. It's hard for me to evaluate the API you're proposing without already having some basis for what it's being built upon... make sense?

@boneyard93501 (Contributor) commented May 23, 2018

@pipermerriam actually, i was looking for the web3.async API, which i gather doesn't exist just yet.

other than that, i proposed (well, attempted to) not implementing any batch functionality until we know what the websocket revision looks like, and instead addressing batching with user-side implementation examples in the documentation.

@jakublipinski commented Aug 11, 2018

@pipermerriam @boneyard93501 I compared the execution time between requesting the transaction receipts for a particular block synchronously and asynchronously. I used the asyncio and aiohttp modules for async HTTP requests, basing my code on the excellent article by @pawelmhm.
The results are very promising. When the program is run locally on a machine with a running parity node I get:

sync: 1.843s
async: 0.264s

When it's run over the Internet the results are:

sync: 8.701s
async: 0.757s

Overall I get 7x-11x execution time improvement if the requests are batched and sent asynchronously. My code below:

import timeit
import asyncio

from aiohttp import ClientSession

from web3.providers.base import JSONBaseProvider
from web3.providers import HTTPProvider
from web3 import Web3

# synchronously request receipts for given transactions
def sync_receipts(web3, transactions):
    for tran in transactions:
        web3.eth.getTransactionReceipt(tran)

# asynchronous JSON RPC API request
async def async_make_request(session, url, method, params):
    base_provider = JSONBaseProvider()
    request_data = base_provider.encode_rpc_request(method, params)
    async with session.post(url, data=request_data,
                        headers={'Content-Type': 'application/json'}) as response:
        content = await response.read()
    response = base_provider.decode_rpc_response(content)
    return response

async def run(node_address, transactions):
    tasks = []

    # Fetch all responses within one Client session,
    # keep connection alive for all requests.
    async with ClientSession() as session:
        for tran in transactions:
            task = asyncio.ensure_future(async_make_request(session, node_address,
                                                            'eth_getTransactionReceipt',[tran.hex()]))
            tasks.append(task)

        responses = await asyncio.gather(*tasks)
        return responses

if __name__ == "__main__":
    eth_node_address = "http://localhost:8545"
    web3 = Web3(HTTPProvider(eth_node_address))

    block = web3.eth.getBlock(web3.eth.blockNumber)
    transactions = block['transactions']

    start_time = timeit.default_timer()
    sync_receipts(web3, transactions)
    print('sync: {:.3f}s'.format(timeit.default_timer() - start_time))

    start_time = timeit.default_timer()
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(run(eth_node_address, transactions))
    loop.run_until_complete(future)
    print('async: {:.3f}s'.format(timeit.default_timer() - start_time))

@pipermerriam (Member) commented Aug 13, 2018

Yes, these look like very nice performance boosts. I'd be excited to see someone start working on the web3.async API so that users can start taking advantage of the performance gains.

@medvedev1088 (Contributor) commented Aug 19, 2018

@jakublipinski Have you tried batch JSON RPC requests https://www.jsonrpc.org/specification#batch? I found that it significantly boosts performance.

@voith (Contributor) commented Aug 20, 2018

Looks like I had a totally incorrect understanding of this feature. I was thinking of batching in terms of concurrency, but after reading the JSON-RPC documentation, batching seems to mean packing several request objects into a single request.

rpc call Batch:

--> [
        {"jsonrpc": "2.0", "method": "sum", "params": [1,2,4], "id": "1"},
        {"jsonrpc": "2.0", "method": "notify_hello", "params": [7]},
        {"jsonrpc": "2.0", "method": "subtract", "params": [42,23], "id": "2"},
        {"foo": "boo"},
        {"jsonrpc": "2.0", "method": "foo.get", "params": {"name": "myself"}, "id": "5"},
        {"jsonrpc": "2.0", "method": "get_data", "id": "9"} 
    ]
<-- [
        {"jsonrpc": "2.0", "result": 7, "id": "1"},
        {"jsonrpc": "2.0", "result": 19, "id": "2"},
        {"jsonrpc": "2.0", "error": {"code": -32600, "message": "Invalid Request"}, "id": null},
        {"jsonrpc": "2.0", "error": {"code": -32601, "message": "Method not found"}, "id": "5"},
        {"jsonrpc": "2.0", "result": ["hello", 5], "id": "9"}
    ]

Thanks @medvedev1088

@Uxio0 (Contributor, Author) commented Aug 20, 2018

@medvedev1088 @voith nice! That's what I meant from the beginning. I got really good performance doing that.

@medvedev1088 (Contributor) commented Aug 20, 2018

@Uxio0 @voith Any ideas on how this could be added to web3.py? I'm afraid it will require substantial refactoring.

@Uxio0 (Contributor, Author) commented Aug 20, 2018

@medvedev1088 I would suggest using the same approach as web3.js: https://github.com/ethereum/wiki/wiki/JavaScript-API#batch-requests

Reading the source code, I understand it would need a lot of refactoring, though.

@voith (Contributor) commented Aug 20, 2018

@Uxio0 @medvedev1088 One thing that's at the top of my mind is that the middlewares will need some refactoring. Middlewares at the moment work under the assumption that there will always be a single method being called. This feature will need middlewares to accept a list of methods.

if we build it on top of #657, since everything under web3.async.* will be coroutine based, and thus batching them to run concurrently

@pipermerriam It doesn't seem like we need concurrency to get batching to work (#832 (comment)). All we need is support for packing multiple request objects into a single request.

cc @boneyard93501

@pipermerriam (Member) commented Aug 20, 2018

It doesn't seem like we need concurrency to get batching to work

@voith yes, I was reading batching to mean something different and have since learned about the batching of API requests.

Yes, this is likely to require some underlying architecture changes. I wouldn't be surprised if we ended up needing a major version bump to really fit this in (though we can add it as an opt-in feature ahead of that schedule or something).

In general, supporting this in web3.py is something I can get behind; it'll just take sorting out how to do it cleanly.

@jakublipinski commented Aug 21, 2018

I updated my test code to include batch requests, and it turns out batching is indeed faster than synchronous requests, but still about 2.5x slower than the asynchronous approach:

sync: 80.447s
async: 17.552s
batch: 52.311s

(Tested by requesting receipts from the parity node for the last 20 blocks. The script was run on the same machine.)

I looked at the web3.py code and can confirm that it would be much easier to introduce batching / async requests if the middlewares were refactored so that there is more control over when and how they're called.
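
The batch variant behind these numbers isn't shown here; a minimal sketch of what it might look like (use of the requests library is an assumption, and the method mirrors the earlier async snippet):

import requests

# pack one request object per receipt into a single JSON-RPC batch payload,
# then send them all in one HTTP POST
def batch_receipts(node_address, transactions):
    payload = [
        {"jsonrpc": "2.0", "method": "eth_getTransactionReceipt",
         "params": [tran.hex()], "id": i}
        for i, tran in enumerate(transactions)
    ]
    response = requests.post(node_address, json=payload)
    return response.json()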

@medvedev1088 (Contributor) commented Aug 22, 2018

@jakublipinski great results! I think combining async and batch will be even faster.

I'm also curious to see the results when run against a remote Ethereum node, e.g. Infura. Batching eliminates many round trips, so the performance boost will differ from a run against a local node, since latency is often the culprit behind poor performance.

@jakublipinski commented Aug 22, 2018

@medvedev1088 The following are the test results with Infura mainnet (requesting all receipts from the last 5 blocks):

sync: 96.470s
async: 3.491s
batch: 3.467s

Async requests make sense only if there are several of them. Batching allows you to combine all the requests into one. There will be no benefit from using both batching and async.

@medvedev1088 (Contributor) commented Aug 22, 2018

@jakublipinski I meant the situation where you want to export many blocks. Imagine you need to export 6M blocks: you can't possibly batch them into a single request. What you can do, though, is create batches of 100 blocks and then run those batches in parallel.
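
A hedged sketch of that pattern (names like fetch_batch and BATCH_SIZE are illustrative, not an existing API, and the node URL is an assumption):

import asyncio
from aiohttp import ClientSession

NODE_URL = "http://localhost:8545"
BATCH_SIZE = 100

def make_batch(block_numbers):
    # one JSON-RPC batch payload covering a chunk of block numbers
    return [
        {"jsonrpc": "2.0", "method": "eth_getBlockByNumber",
         "params": [hex(n), False], "id": n}
        for n in block_numbers
    ]

async def fetch_batch(session, block_numbers):
    async with session.post(NODE_URL, json=make_batch(block_numbers)) as resp:
        return await resp.json()

async def fetch_blocks(first, last):
    numbers = list(range(first, last + 1))
    chunks = [numbers[i:i + BATCH_SIZE] for i in range(0, len(numbers), BATCH_SIZE)]
    async with ClientSession() as session:
        # each batch is one HTTP round trip; the batches themselves run concurrently
        results = await asyncio.gather(*(fetch_batch(session, c) for c in chunks))
    return [item for batch in results for item in batch]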

@jakublipinski commented Aug 22, 2018

@medvedev1088 Yes, you can use async requests in such a case.

@pipermerriam (Member) commented Aug 27, 2018

I'm curious to hear other people's ideas on how to implement this. Here is the best I've thought of thus far.

First, we need to modify how web3 makes requests to support the concept of building a request. I'm thinking that all of the callables that currently issue requests get a new API like the following.

>>> w3.eth.getBalance.build_request('0x1234abcd...')
Request(['eth_getBalance', ['0x1234abcd...']])

Then we can do something like this.

>>> w3.batch_request(
...     w3.eth.getBalance.build_request('0x1234abcd...'),
...     w3.eth.getCode.build_request('0x1234abcd...'),
...     ...
... )
...
(
    80000000000,  # result of `getBalance`
    '0x63deadbeef',  # result of `getCode`
    ...
)

Updates will also have to happen for how middlewares process requests and responses, but I think this will be doable by adding a new Manager.batch_request_blocking which handles the different logic.

This is a reasonably significant change, as all of the existing web3 modules will need to be updated so that the request functions have the new build_request API, which might be a new class with a __call__ implementation, or maybe doable as a decorator.
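
One hypothetical shape for that wrapper (none of these names exist in web3.py today; request_blocking is assumed to mirror the existing manager call path):

from collections import namedtuple

Request = namedtuple('Request', ['method', 'params'])

class RPCMethod:
    ''' callable wrapper: normal calls go straight through to the manager,
        while build_request() merely describes the request for later batching '''
    def __init__(self, manager, rpc_name):
        self.manager = manager
        self.rpc_name = rpc_name

    def __call__(self, *params):
        return self.manager.request_blocking(self.rpc_name, list(params))

    def build_request(self, *params):
        return Request(self.rpc_name, list(params))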

@Zapata referenced this issue Sep 8, 2018: Implement JSON-RPC batch #1310 (open, 0 of 16 tasks complete)