New native database adapter #859
Conversation
Replaces #756
Problems that arose while working on this: pretty much everywhere in Trinity where we interact with the chain, we use the async wrapper/proxy objects. This is how Trinity keeps the networking code from being blocked by expensive CPU-bound operations related to the EVM or Chain. If we want to replace the way we interact with the database, we will also be required to replace this mechanism, i.e. we will still need some semblance of a proxy object for the actual database. With the direction that we are currently moving, this probably means an architecture that looks something like:
Doing this today will involve a lot of boilerplate, since we'd need to write events and handlers for all of the methods. @cburgdorf recently did some similar things. What I think we need for this to be less heavy on the boilerplate code, and thus on the complexity of working with it, is some sort of machinery to automate the concept of serving an object's methods over an event bus. Something like a metaclass that generates all of the events under the hood, as well as the handlers for serving the request/response for those events.
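A rough sketch of that machinery, with a plain pair of asyncio queues standing in for the real event bus; every name here (MethodServer, MethodProxy, the toy Chain) is hypothetical and only illustrates the shape, not Trinity's actual API:

```python
import asyncio
import inspect

# Hypothetical sketch: serve every method call on a wrapped object over a
# queue-based "bus", and answer calls transparently via a proxy, so no
# per-method request/response boilerplate is needed.

class MethodServer:
    def __init__(self, obj):
        self._obj = obj
        self.requests: asyncio.Queue = asyncio.Queue()
        self.responses: asyncio.Queue = asyncio.Queue()

    async def serve_one(self) -> None:
        # A request names a method and carries its arguments.
        name, args, kwargs = await self.requests.get()
        result = getattr(self._obj, name)(*args, **kwargs)
        if inspect.isawaitable(result):
            result = await result
        await self.responses.put(result)

class MethodProxy:
    # __getattr__ generates a coroutine per attribute access, which is
    # the part a metaclass could do once for all methods.
    def __init__(self, server: MethodServer) -> None:
        self._server = server

    def __getattr__(self, name: str):
        async def call(*args, **kwargs):
            await self._server.requests.put((name, args, kwargs))
            return await self._server.responses.get()
        return call

class Chain:
    def get_block_number(self) -> int:
        return 42

async def main() -> int:
    server = MethodServer(Chain())
    serve_task = asyncio.ensure_future(server.serve_one())
    number = await MethodProxy(server).get_block_number()
    await serve_task
    return number

print(asyncio.run(main()))  # prints 42
```

A real version would need request correlation IDs and per-caller response routing, but the proxy/server split is the same idea.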
force-pushed from a946e86 to aebe299
force-pushed from 348c0f1 to 692483a
@carver I ended up running with this to try an easier way to get it working. I made a minimal change. I'm wondering if you can profile beam sync against this branch to see whether we actually gain or lose any measurable performance using this mechanism.
force-pushed from 21cdee5 to 3f3baa0
Hm, maybe a small lift, but nothing obvious. At least, it doesn't seem to hurt! :) At this point, it can be hard to tell because Beam Sync is usually staying caught up. I have a few profiling improvements I've been thinking about, which might help me give you a definitive answer later today.
Cool, I'll work on cleaning this up with intent to merge after some review next week.
Test failures are unrelated to this PR.
Local profiling shows these numbers:
Better profiling would be to do something like running a
@@ -1,6 +1,7 @@
import trio

import pytest
import pytest_trio
extract this re-org to its own PR
trinity/chains/base.py
Outdated
@abstractmethod
async def coro_validate_chain(
        self,
        parent: BlockHeader,
        chain: Tuple[BlockHeader, ...],
        seal_check_random_sample_rate: int = 1) -> None:
    pass
    ...
TODO: extract to stand-alone cleanup PR
trinity/db/eth1/header.py
Outdated
class BaseAsyncHeaderDB(ABC):
def async_thread_method(method: Callable[..., Any]) -> Callable[..., Any]:
This probably belongs somewhere under trinity._utils
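For reference, a minimal sketch of what such a helper could look like if it moved under trinity._utils, assuming the default thread pool executor is acceptable; the HeaderDB and get_score names here are toy stand-ins, not Trinity's real classes:

```python
import asyncio
import functools
from typing import Any, Callable

# Sketch: wrap a blocking method so awaiting it runs the work in the
# default thread pool executor instead of blocking the event loop.
def async_thread_method(method: Callable[..., Any]) -> Callable[..., Any]:
    @functools.wraps(method)
    async def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
        loop = asyncio.get_event_loop()
        # functools.partial carries the bound arguments into the executor.
        return await loop.run_in_executor(
            None, functools.partial(method, self, *args, **kwargs)
        )
    return wrapper

class HeaderDB:
    def get_score(self, block_hash: bytes) -> int:
        return len(block_hash)  # stand-in for a blocking leveldb read

    coro_get_score = async_thread_method(get_score)

print(asyncio.run(HeaderDB().coro_get_score(b'\x00' * 32)))  # prints 32
```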
try:
    manager.wait_stopped()
except KeyboardInterrupt:
    pass
TODO: I don't think this is actually running its cleanup step, because the IPC path is consistently left behind.
trinity/protocol/common/peer.py
Outdated
@@ -149,7 +149,7 @@ def __init__(self,
    super().__init__(token)

def __str__(self) -> str:
    return f"{self.__class__.__name__} {self.remote.uri}"
    return f"{self.__class__.__name__} {self.remote}"
bugfix can be extracted to standalone
trinity/server.py
Outdated
@@ -295,87 +293,3 @@ def _make_receive_server(self) -> BCCReceiveServer:
        peer_pool=self.peer_pool,
        token=self.cancel_token,
    )

def _test() -> None:
removal of these tests can be extracted to standalone
@@ -233,7 +234,8 @@ def _schedule(self, node_key: Hash32, parent: SyncRequest, depth: int,
    called.
    """
    self.committed_nodes += 1
    await self.db.coro_set(request.node_key, request.data)
    loop = asyncio.get_event_loop()
    await loop.run_in_executor(None, self.db.set, request.node_key, request.data)
AFAIK this is the only place that used the base async db methods so I just updated it to run the sync versions asynchronously using a thread.
force-pushed from a5e8ba7 to 8d73204
force-pushed from 8d73204 to b367112
clients = [
    multiprocessing.Process(target=run_client, args=[ipc_path, i])
    for i in range(1)
This benchmark needs to be updated to have a CLI that allows specifying these values (how many clients, how many keys/values), and it needs to be added to the CI run.
trinity/db/manager.py
Outdated
elif result_byte == FAIL_BYTE:
    return False
else:
    raise Exception("Unknown result byte: {result_byte.hex}")
missing f for the f-string.
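The fix the review asks for looks like this; note the `.hex` attribute reference in the quoted line would also need call parentheses to actually render the bytes:

```python
result_byte = b'\x07'
# Without the f prefix, the braces are emitted verbatim instead of being
# interpolated; .hex must also be called, not just referenced.
broken = "Unknown result byte: {result_byte.hex}"
fixed = f"Unknown result byte: {result_byte.hex()}"
print(fixed)  # Unknown result byte: 07
```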
trinity/db/manager.py
Outdated
elif result_byte == FAIL_BYTE:
    raise KeyError(key)
else:
    raise Exception("Unknown result byte: {result_byte.hex}")
missing f for the f-string.
trinity/db/manager.py
Outdated
elif result_byte == FAIL_BYTE:
    raise KeyError(key)
else:
    raise Exception("Unknown result byte: {result_byte.hex}")
missing f for the f-string.
trinity/db/manager.py
Outdated
    *delete_sizes,
)
kv_data = b''.join(itertools.chain(*pending_kv_pairs))
delete_data = b''.join(pending_deletes)
can combine this with the statement above inside the chain call
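One plausible reading of that suggestion (variable names borrowed from the diff, data made up for illustration): feed the key/value pairs straight into the itertools.chain call inside the join, with no intermediate sequence.

```python
import itertools

# Flatten the (key, value) pairs directly inside the join,
# rather than materializing an intermediate chained sequence first.
pending_kv_pairs = [(b'key1', b'value1'), (b'key2', b'value2')]
kv_data = b''.join(itertools.chain.from_iterable(pending_kv_pairs))
print(kv_data)  # b'key1value1key2value2'
```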
force-pushed from d63dc5d to b3613b5
force-pushed from 2c01c2f to 40c2b04
force-pushed from 8efe867 to c3db355
force-pushed from c3db355 to 2dd9c2f
Submitting a batch of comments so you can start reading this review while I continue going through this PR.
manager.logger.info('started db manager')
yield manager
manager.logger.info('exiting db manager')
manager.logger.info('exited db manager')
These three log lines (and the clos.* client lines just below) might not be necessary anymore, now that this has reached the point of a PR waiting to be merged.
@@ -382,7 +382,7 @@ def finalizer():
async with run_peer_pool_event_server(
    event_bus, server_peer_pool, handler_type=LESPeerPoolEventServer
), run_request_server(
    event_bus, FakeAsyncChainDB(chaindb_20.db), server_type=LightRequestServer
    event_bus, AsyncChainDB(chaindb_20.db), server_type=LightRequestServer
This is really cool, being able to drop all these FakeAsyncChainDBs.
@@ -160,8 +159,7 @@ def get_beacon_shell_context(database_dir: Path, trinity_config: TrinityConfig)

trinity_already_running = ipc_path.exists()
if trinity_already_running:
    db_manager = beacon.manager.create_db_consumer_manager(ipc_path)  # type: ignore
    db = db_manager.get_db()
    db = DBClient.connect(ipc_path)
Man, this interface is so much better!
newsfragments/859.feature.rst
Outdated
@@ -0,0 +1 @@
Replace ``multiprocessing`` based database access with a custom implementation.
Since newsfragments is user-facing, maybe mention the performance boost too!
ATOMIC_BATCH Response:

- Success Byte: 0x01
"""
<3 these docstrings
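For context, the status-byte handling those docstrings describe could be sketched roughly like this; the byte values and the bare stream framing here are assumptions for illustration, the real wire format lives in trinity/db/manager.py:

```python
import io

# Assumed constants: a single status byte answers an ATOMIC_BATCH request.
SUCCESS_BYTE = b'\x01'
FAIL_BYTE = b'\x00'

def read_atomic_batch_response(stream: io.BufferedIOBase) -> bool:
    # Read exactly one status byte and map it to success/failure.
    result_byte = stream.read(1)
    if result_byte == SUCCESS_BYTE:
        return True
    elif result_byte == FAIL_BYTE:
        return False
    else:
        raise Exception(f"Unknown result byte: {result_byte.hex()}")

print(read_atomic_batch_response(io.BytesIO(b'\x01')))  # True
```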
self._stopped.set()
break
self.logger.debug('Server accepted connection: %r', addr)
threading.Thread(
What made you decide to do this synchronously w/ a bunch of threads? I'm surprised! Writing a server which responds to requests from a lot of clients seems like exactly the asyncio use case. Maybe it's a performance thing: calls into leveldb block the calling thread?
We did both, and the synchronous implementation outperformed the asyncio implementation by roughly 2-3x. We didn't do any real measuring of why, but I suspect event loop overhead is just way more significant than the gains we get from being able to do work while the i/o is occurring.
try:
    operation = Operation(operation_byte)
except TypeError:
    self.logger.error("Unrecognized database operation: %s", operation_byte.hex())
It might be useful to also print raw_socket here like above; getting out of sync like this is a pretty serious problem! It could be useful to know who's having trouble talking to the database.
I don't actually think we get any useful information from that which would let us know who is misbehaving...
You're right, not without probably too much extra work.
trinity/db/manager.py
Outdated
""" | ||
logger = logging.getLogger('trinity.db.manager.DBManager') | ||
|
||
def __init__(self, db: BaseAtomicDB): |
It might be worth adding a comment here: we're not exactly expecting a BaseAtomicDB, we're expecting one which is also threadsafe.
trinity/db/manager.py
Outdated
class empty:
    pass
Accidental commit?
return cls(s)


class AtomicBatch(BaseDB):
It's frustrating that this so nearly duplicates py-evm's AtomicDBWriteBatch; there might be a refactoring of it which allows it to be used here.
I'll open an issue.
pass


class DBClient(BaseAtomicDB):
Dropping multiprocessing is already a nice improvement, and this gives me some ideas for future PRs:
I expect that we'll see an even larger performance improvement if these methods were to become asynchronous! Because these methods are synchronous, every call to the database blocks the event loop; if the database ever gets backed up, then the other processes will back up along with it, when they could be talking over the network or otherwise letting their other coros truck along.
And this is a far-future idea, but since so much of our database is immutable data, I wonder whether some kind of caching on this side of the process could also improve performance.
I think that any caching would likely need to happen at a higher level but yes, there are future improvements that could be made and now that we fully own this API we have a lot of freedom to explore.
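The far-future caching idea could be sketched like this; DictBackend is a made-up stand-in for the real DBClient, and the safety caveat (values must be immutable once written) is the crux of why this would need care at a higher level:

```python
from functools import lru_cache

class DictBackend:
    """Stand-in for a DBClient: counts how many reads hit the backend."""
    def __init__(self):
        self.store = {}
        self.reads = 0

    def get(self, key: bytes) -> bytes:
        self.reads += 1
        return self.store[key]

class CachingClient:
    def __init__(self, backend: DictBackend) -> None:
        # lru_cache memoizes reads; only safe if values never change,
        # which mostly holds for chain data once it is written.
        self._cached_get = lru_cache(maxsize=1024)(backend.get)

    def __getitem__(self, key: bytes) -> bytes:
        return self._cached_get(key)

backend = DictBackend()
backend.store[b'header'] = b'rlp-bytes'
client = CachingClient(backend)
client[b'header']
client[b'header']
print(backend.reads)  # 1: the second read was served from the cache
```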
def __getitem__(self, key: bytes) -> bytes:
    if self._track_diff is None:
        raise ValidationError("Cannot get data from a write batch, out of context")
These error messages assume that AtomicBatch will only ever be used by DBClient. It might be better for AtomicBatch to have a name which better indicates that it's private to this file, or for AtomicBatch to do its own context management, or for these messages to say something like "cannot use AtomicBatch after it has been finalized".
@@ -163,7 +163,8 @@ def next_batch(self, n: int = 1) -> List[SyncRequest]:
if node_key in self.nodes_cache:
    self.logger.debug2("Node %s already exists in db", encode_hex(node_key))
    return
if await self.db.coro_exists(node_key):
loop = asyncio.get_event_loop()
if await loop.run_in_executor(None, self.db.exists, node_key):
We're already blocking the loop everywhere we call a method on this database, so it probably isn't a big problem to keep the code clean and call self.db.exists directly here, and same with self.db.set below.
Correct, however I don't actually foresee this being an issue, because all of our database operations occur behind HeaderDB or ChainDB, and for the most part we use async_thread_method to mitigate the blocking nature of these calls.
IIRC, making these calls asynchronous was an intentional choice back when this code was written, and it produced a performance gain. Also, I think that this code is going to go away with beam/firehose, so I'm not inclined to do much more than maintain the status quo.
break

try:
    if operation is GET:
I'm not fully on the if-less programming bandwagon but wanted to drop a comment here: I bet this could be turned into an operation.perform(self.db, sock). And a world where the different operations are different subclasses of Operation is also a world where the method to send an ATOMIC_BATCH and the method to receive an ATOMIC_BATCH sit right next to each other, which might make future changes easier.
@ChihChengLiang originally wrote them that way, but I felt they were harder to understand, though that could have just been the code organization or something. Going to leave that idea for a future refactor. Also noting that I tried to minimize overhead, since this is somewhat performance-sensitive code.
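The reviewer's "if-less" dispatch idea could be sketched like this; every name here (Operation, Get, Set, the dict standing in for the db, the success byte) is hypothetical and only shows the shape of the refactor:

```python
from abc import ABC, abstractmethod

class Operation(ABC):
    # Each wire operation knows how to execute itself against the db and
    # produce its response bytes, replacing the if/elif chain.
    @abstractmethod
    def perform(self, db: dict, request: tuple) -> bytes:
        ...

class Get(Operation):
    def perform(self, db: dict, request: tuple) -> bytes:
        (key,) = request
        return db[key]

class Set(Operation):
    def perform(self, db: dict, request: tuple) -> bytes:
        key, value = request
        db[key] = value
        return b'\x01'  # assumed success byte

db: dict = {}
result_set = Set().perform(db, (b'k', b'v'))
result_get = Get().perform(db, (b'k',))
print(result_get)  # b'v'
```

The cost is an extra virtual dispatch per request, which is why the thread above leans toward keeping the plain branch in this hot path.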
It's exciting to see so much multiprocessing code be deleted, this is a great improvement!
force-pushed from 2dd9c2f to 2fc22bd
What was wrong?

Currently, we've been using the multiprocessing facilities to access the database and chain from multiple processes.

How was it fixed?

This implements a custom server that serves the AtomicDatabaseAPI over an IPC socket, and a client that implements the full AtomicDatabaseAPI. This requires converting all of the various Chain, ChainDB and HeaderDB operations to now run in the local process using a thread-based executor.

To-Do
Cute Animal Picture