
Use your library to read from multiple locations in a file #66

Closed
JohannesLichtenberger opened this issue Oct 16, 2022 · 15 comments

@JohannesLichtenberger

Hello,

I want to use io_uring for my database prototype [1]. I have interfaces for doing I/O to read and write page fragments, which are only word-aligned but otherwise do not have a predefined length.

A parent page of the leaf data page fragments in a trie index has at most N (currently 4 by default) references to leaf data page fragments. In order to reconstruct a leaf data page in memory, at most N of these page fragments have to be read. Currently, this is the function that reads the page fragments in parallel: https://github.com/sirixdb/sirix/blob/1385d96916015f9703a6ef6e34de0c268e1536a7/bundles/sirix-core/src/main/java/org/sirix/access/trx/page/NodePageReadOnlyTrx.java#L544

I guess it would make sense to switch to Java 19 and to virtual threads for this. In the I/O layer I'd use your library instead of the simple FileChannel-based implementation to read page fragments: https://github.com/sirixdb/sirix/blob/master/bundles/sirix-core/src/main/java/org/sirix/io/filechannel/FileChannelReader.java

Do you think this makes sense? However, I'm not sure... I guess I would have to change the interfaces to return Futures with the pages, so that async I/O already happens in the I/O layer itself: https://github.com/sirixdb/sirix/blob/master/bundles/sirix-core/src/main/java/org/sirix/io/Reader.java
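
Roughly, I imagine something like this (just a sketch; AsyncReader and readAsync are placeholder names, not the current Reader API, and Page/PageReference/PageReadOnlyTrx are the existing SirixDB types):

import java.util.concurrent.CompletableFuture;

// Hypothetical async variant of the Reader interface (sketch only).
public interface AsyncReader extends AutoCloseable {
  // An io_uring-backed implementation would complete the future from its event loop;
  // the existing blocking implementations could return an already-completed future.
  CompletableFuture<Page> readAsync(PageReference reference, PageReadOnlyTrx pageReadTrx);
}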

@JohannesLichtenberger
Author

I think the library would also not be compatible with virtual threads, because of the Future return type?

@ikorennoy ikorennoy self-assigned this Oct 16, 2022
@ikorennoy
Owner

ikorennoy commented Oct 16, 2022

Hello!

Do you maybe have some upper boundary for the page fragment size? As I can see in the FileChannelReader implementation, you first read the page fragment size, then allocate a buffer of that size, and then try to read the whole fragment. I assume you have some upper boundary, and in that case you can use the BufRing API. It allows you to pre-register a buffer pool in io_uring, and then when you read, you don't need to allocate a buffer: you just tell io_uring to choose a buffer from the pool and read into it. In your case, the BufRing buffer size would be the expected upper boundary of a page fragment. The number of buffers is a bit trickier; you should choose it carefully to avoid unnecessary memory consumption. I'd say ringEntries/2 should be enough, but it needs to be tested. Unfortunately, the BufRing API only works starting with Linux kernel 5.19. If it's not possible to use such a recent kernel, I think a plain read into a DirectBuffer would work pretty well. BufRing API example.
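
Just to illustrate the idea behind a pre-registered buffer pool (a plain-JDK sketch of the concept, not jasyncfio's actual BufRing API; with BufRing the kernel picks the buffer for you, so even this bookkeeping goes away):

import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;

// Sketch: a fixed pool of direct buffers, allocated once up front and reused for reads,
// where each buffer is as large as the expected upper bound of a page fragment.
final class FragmentBufferPool {
    private final ArrayBlockingQueue<ByteBuffer> buffers;

    FragmentBufferPool(int count, int maxFragmentSize) {
        buffers = new ArrayBlockingQueue<>(count);
        for (int i = 0; i < count; i++) {
            buffers.add(ByteBuffer.allocateDirect(maxFragmentSize));
        }
    }

    ByteBuffer take() throws InterruptedException {
        return buffers.take();   // borrow a buffer for one read
    }

    void release(ByteBuffer buffer) {
        buffer.clear();
        buffers.add(buffer);     // hand it back after the fragment has been processed
    }
}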

Also, io_uring works really well when you need to submit several reads at a time (which, as far as I understand, is your case). In the previous issue I posted some benchmark results; I'll duplicate them here. To be precise, the benchmark was done with Kotlin Coroutines without CompletableFuture, but the core API is the same, so I don't think the results would be completely different:

FileChannel, buffer size 4096 bytes, one thread:

IOPS=16794, BW=65MiB/s, IOS/call=1/0
IOPS=16739, BW=65MiB/s, IOS/call=1/0
IOPS=16872, BW=65MiB/s, IOS/call=1/0

io_uring, buffer size 4096, one thread, 32 operations per iteration:

IOPS=76544, BW=299MiB/s, IOS/call=32/32
IOPS=79811, BW=311MiB/s, IOS/call=32/32
IOPS=79645, BW=311MiB/s, IOS/call=31/31

If you use more threads, the difference is even more dramatic:

FileChannel, buffer 8192 bytes, four threads:

IOPS=54958, BW=429MiB/s, IOS/call=1/0
IOPS=55163, BW=430MiB/s, IOS/call=1/0
IOPS=55294, BW=431MiB/s, IOS/call=1/0

io_uring, buffer 8192 bytes four threads, 32 operations per iteration:

IOPS=183975, BW=1437MiB/s, IOS/call=32/32
IOPS=184222, BW=1439MiB/s, IOS/call=32/32
IOPS=185061, BW=1445MiB/s, IOS/call=31/31

About Project Loom: I've tried integrating Loom and io_uring and didn't come up with anything fundamentally different from the current implementation. The problem is that io_uring is a completion-based interface, which means it performs the I/O operation and returns a result that you usually want to pass to some callback (i.e., complete a CompletableFuture, resume a Kotlin coroutine, etc.). Compare this to epoll, which is a readiness-based interface and for which there is already an implementation integrated with Project Loom. Readiness means it does not perform the actual I/O; it only notifies you when a file descriptor is ready for I/O, and then a thread (virtual or OS, it doesn't matter) can perform the I/O without being blocked. To illustrate, here is some code from JDK 19 (it is executed on a virtual thread).

n = tryRead(fd, b, off, len); // because the socket is in non-blocking mode, even if the data is not yet ready, read returns immediately with the result EAGAIN
while (IOStatus.okayToRetry(n) && isOpen()) {
    park(fd, Net.POLLIN); // <- here we register the thread with epoll in the event loop and call LockSupport.park, which stops it and releases the OS thread
    n = tryRead(fd, b, off, len); // this line is called when epoll returns the event that the socket is ready and the event loop wakes up the thread
}

Then, when epoll sends a notification, the event loop wakes up the virtual thread, and it continues from where it was stopped.

Thread t = map.remove(fdVal);
if (t != null) {
    LockSupport.unpark(t); // wakes up the virtual thread
}

So it is clear that virtual threads work well with the readiness model. I tried to reach the JVM developers and ask them about the approach they will use with io_uring. As far as I understand, they have a prototype with it for the network, but it's not public yet. I guess when the implementation for file I/O is done, it will probably return a Future because of io_uring's nature, and it will be a modification of AsynchronousFileChannelImpl that internally uses an event loop and io_uring instead of a thread pool.

About the compatibility of io_uring and Project Loom: I believe one of the main features of a virtual thread is that you can safely block it, at least in the JDK's synchronization primitives that internally use LockSupport.park. So if you are going to use virtual threads, it's not necessary to change the Reader interface. You only need to ensure that the thread on which read is executed is a virtual thread, so you can safely call get(), and that CompletableFuture works with virtual threads (I'm sure it does, or at least it will).
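
For example, something along these lines should be safe (a sketch; readAsync and process are placeholders for an asynchronous read returning a CompletableFuture and for whatever you do with the bytes):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch: each task runs on its own virtual thread; get() parks only the virtual thread,
// the carrier OS thread stays free to run other virtual threads in the meantime.
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
    executor.submit(() -> {
        CompletableFuture<byte[]> pending = readAsync(offset, length); // placeholder async read
        byte[] fragment = pending.get();                               // blocks the virtual thread only
        process(fragment);                                             // placeholder
        return null;
    });
}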

If you decide to write an integration with io_uring, I would love to join the code review.

Thank you for your interest!

@JohannesLichtenberger
Author

I guess the problem in my case is that currently at most 1024 records are stored in a page fragment of the trie (specified in a Constants class, IIRC). We currently store XML and JSON databases and optionally allow storing a DeweyID for each node (to quickly derive the relationships to other nodes), but DeweyIDs have a variable size. They are stored sorted and Huffman-encoded alongside the actual records. I think overlong records themselves are stored in overflow pages and simply referenced. That said, if DeweyIDs are stored, there's no upper bound on a page fragment's size. The size is variable and only padded to a word (a multiple of 8 bytes).

The only method which is issuing parallel but synchronous I/O is the method you already saw:
https://github.com/sirixdb/sirix/blob/1385d96916015f9703a6ef6e34de0c268e1536a7/bundles/sirix-core/src/main/java/org/sirix/access/trx/page/NodePageReadOnlyTrx.java#L544

To be honest, I'm not sure if I should simply implement something similar to the package for the FileChannel-based I/O: https://github.com/sirixdb/sirix/tree/master/bundles/sirix-core/src/main/java/org/sirix/io/filechannel

And simply call .get() on the Future. However, I'm not sure if that's really the intended way to use io_uring.

What might be clearer is that I can most likely simply change the getPreviousPageFragments method to use virtual threads after switching to Java 19.
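
I.e., roughly like this (just a sketch; reader.read(...) stands for the existing synchronous FileChannel-based read, and Page/PageReference are the existing types):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Sketch: read the (at most N) page fragments in parallel, one virtual thread per fragment,
// then wait for all of them before reconstructing the full page in memory.
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
    List<Future<Page>> futures = new ArrayList<>();
    for (PageReference reference : pageFragmentReferences) {                // at most N references
        futures.add(executor.submit(() -> reader.read(reference, this)));   // placeholder for the sync read
    }
    List<Page> fragments = new ArrayList<>();
    for (Future<Page> future : futures) {
        fragments.add(future.get());
    }
    // combine the fragments into the full page here
}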

I also wonder whether it may be possible to serialize the page fragments asynchronously to disk. Currently, during a postorder traversal, the trx intent log is checked for modifications, and the changed fragments are serialized synchronously and appended to the file. During each write of a page fragment, the reference from the parent is updated with a SHA256 checksum of the actual child content (as in ZFS, for instance). Obviously, once the commit method returns, all data must have been flushed to disk.

BTW: PRs are more than welcome as I really struggle with I/O performance knowledge. Thanks for the small fixes already :)

@JohannesLichtenberger
Author

It's only of limited scope, but there's a coroutine-based preorder iterator that traverses the binary JSON or XML records and prefetches the right siblings of a node while descending to the children, so at least some parallel I/O is involved there as well: https://github.com/sirixdb/sirix/tree/master/bundles/sirix-kotlin-api

@ikorennoy
Owner

> That said, if DeweyIDs are stored, there's no upper bound on a page fragment's size.

Got it, but then this could be a problem. Imagine a situation where really big page fragments are read: judging by the code, a buffer of the size of the whole page fragment is allocated immediately and then read in one go, which could lead to unexpectedly large memory consumption. Maybe you should think about reading page fragments in parts into a buffer of a fixed size (4096 or 8192 bytes), processing the result, and then reading the next chunk into the same buffer.
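
A rough sketch of what I mean with a plain FileChannel (processChunk is just a placeholder for the incremental deserialization):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

// Sketch: stream a page fragment through one fixed-size buffer instead of allocating
// a buffer as large as the whole fragment.
void readInChunks(FileChannel channel, long offset, long fragmentLength) throws IOException {
    ByteBuffer buffer = ByteBuffer.allocateDirect(8192);      // fixed chunk size
    long position = offset;
    long remaining = fragmentLength;
    while (remaining > 0) {
        buffer.clear();
        buffer.limit((int) Math.min(buffer.capacity(), remaining));
        int read = channel.read(buffer, position);
        if (read <= 0) {
            break;                                            // end of file or nothing read
        }
        buffer.flip();
        processChunk(buffer);                                 // placeholder
        position += read;
        remaining -= read;
    }
}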

> And simply call .get() on the Future. However, I'm not sure if that's really the intended way to use io_uring.

When using a JVM before Project Loom, a get() call on the CompletableFuture will really block the thread. But it all depends on the API in which this CompletableFuture is used: if we design the API in such a way that we don't need to call get() but can use the available CompletableFuture combinators (whenComplete, thenApply, etc.), then there is no problem and we get asynchronous I/O. However, this is quite tricky.

This problem arises because Java has no built-in support for asynchronous programming; all we have are combinators on primitives like CompletableFuture. Third-party libraries like Project Reactor or RxJava also have nothing but combinators to offer. So in Java, when you want to use asynchronous operations, there is unfortunately no way to combine their results in an intuitive way, unlike in Kotlin with coroutines, where a read() operation can use io_uring inside and be completely asynchronous, and the coroutine runtime and the Kotlin compiler take care of the rest.
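
With combinators only, it looks roughly like this (a sketch; readAsync and combineFragments are placeholders):

import java.util.concurrent.CompletableFuture;

// Sketch: combine two asynchronous reads without ever calling get();
// the merging function runs once both futures have completed.
CompletableFuture<Page> page =
    readAsync(leftOffset, length)                                    // placeholder async reads
        .thenCombine(readAsync(rightOffset, length),
                     (left, right) -> combineFragments(left, right)) // placeholder merge
        .whenComplete((result, error) -> {
            if (error != null) {
                // handle the failure here instead of blocking with get()
            }
        });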

In the case of Project Loom, however, things generally become similar to Kotlin Coroutines. If read() is called on a virtual thread, you can safely call get() on the CompletableFuture and wait for the result of the read. The JVM runtime will park the virtual thread (quite similar to what the Kotlin Coroutines runtime does), and the OS thread will be released. When the result of the operation is passed to the CompletableFuture, the runtime will wake up the virtual thread.

> What might be clearer is that I can most likely simply change the getPreviousPageFragments method to use virtual threads after switching to Java 19.

If you use FileChannel (or even AsynchronousFileChannelImpl), this will not make the read non-blocking, because internally these classes use the usual blocking OS call (pread). So after switching to virtual threads, both the virtual thread and the OS carrier thread will be blocked during the read.

> I also wonder whether it may be possible to serialize the page fragments asynchronously to disk. Currently, during a postorder traversal, the trx intent log is checked for modifications, and the changed fragments are serialized synchronously and appended to the file. During each write of a page fragment, the reference from the parent is updated with a SHA256 checksum of the actual child content (as in ZFS, for instance). Obviously, once the commit method returns, all data must have been flushed to disk.

If I understand correctly, it can be done roughly this way:

// submit all page-fragment writes without waiting for each one individually
CompletableFuture[] writes = new CompletableFuture[pageFragments.length];
for (int i = 0; i < pageFragments.length; i++) {
    writes[i] = asyncFile.write(pageFragments[i]);
}
// wait for all writes to complete, then flush the device cache
CompletableFuture.allOf(writes).get();
asyncFile.dataSync().get();

In this case the page fragments are written asynchronously; at the end we wait for all the writes to finish and then call dataSync() to flush the cache to the device.
Again, if you are writing this before Project Loom, design the API so that you don't need to call get() but can use combinators instead.
If you can use Project Loom, just make sure that this code runs on a virtual thread, and then you can call get() without fear of blocking the OS thread.

> BTW: PRs are more than welcome as I really struggle with I/O performance knowledge. Thanks for the small fixes already :)

You're welcome!

@JohannesLichtenberger
Author

Yes, I think we should switch to virtual threads, implement an I/O package org.sirix.io.iouring, and change the read method to return a Future<Page> or CompletableFuture<Page>. The existing implementations can simply return a completed future, I guess, whereas the implementation based on your library returns a "real" future. Do you know if virtual threads already work with CompletableFuture::get calls, suspending the virtual thread and using the OS carrier thread for other tasks in the meantime?
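
I.e., the existing FileChannel-based reader could simply do something like this (a sketch, with read(...) being the current blocking method):

import java.util.concurrent.CompletableFuture;

// Sketch: a synchronous implementation fulfils the async interface with an already-completed
// future, while the jasyncfio-backed implementation would return a "real" pending future.
public CompletableFuture<Page> readAsync(PageReference reference, PageReadOnlyTrx pageReadTrx) {
  return CompletableFuture.completedFuture(read(reference, pageReadTrx));
}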

@JohannesLichtenberger
Author

Maybe the new package would become obsolete once the JDK supports io_uring transparently through the standard I/O APIs, for instance for FileChannels.

Regarding the DeweyID/max buffer size issue: I think it's more of a theoretical problem, as JSON "trees" usually aren't that deep and the variable DeweyID sizes grow with the maximum level, for instance. However, storage space is currently blown up considerably, and the time to import a 3.8 GB JSON file on my laptop increases from around 3:30 to 5:30 minutes, so DeweyIDs are already disabled by default and I'm thinking about other tricks (maybe not storing leaf-node DeweyIDs at all...). Storing SHA256 hashes truncated to 128 bits for each node, however, "only" adds around 8 seconds, and storage space is not blown up because leaf-node hashes are not stored.

That said, UmbraDB uses differently sized buffers (but in size classes), so they have an upper bound rather than truly variable-sized buffers.

@JohannesLichtenberger
Author

> > That said, if DeweyIDs are stored, there's no upper bound on a page fragment's size.
>
> Got it, but then this could be a problem. Imagine a situation where really big page fragments are read: judging by the code, a buffer of the size of the whole page fragment is allocated immediately and then read in one go, which could lead to unexpectedly large memory consumption. Maybe you should think about reading page fragments in parts into a buffer of a fixed size (4096 or 8192 bytes), processing the result, and then reading the next chunk into the same buffer.

I guess this won't work, or is at least very complicated to achieve. Right now I'm serializing the DeweyIDs first and then the records in the UnorderedKeyValuePage. Thus, after each record, for instance, we'd have to check whether it crosses a boundary of the chunked buffer. I'm also not sure whether it's a real issue to read, say, 50 KB instead of 4 KB at once. Usually the page fragments might not be much larger, and even if they are, I think it may not be an issue? UmbraDB's largest buffer frame size class is 512 KiB.

> > And simply call .get() on the Future. However, I'm not sure if that's really the intended way to use io_uring.
>
> When using a JVM before Project Loom, a get() call on the CompletableFuture will really block the thread. But it all depends on the API in which this CompletableFuture is used: if we design the API in such a way that we don't need to call get() but can use the available CompletableFuture combinators (whenComplete, thenApply, etc.), then there is no problem and we get asynchronous I/O. However, this is quite tricky.

Yes, the page fragments themselves must be read before the getPreviousPageFragments method returns. For simplicity, I think we should simply switch to Java 19. If the OS thread is not blocked by calling .get(), we're fine.

> > What might be clearer is that I can most likely simply change the getPreviousPageFragments method to use virtual threads after switching to Java 19.
>
> If you use FileChannel (or even AsynchronousFileChannelImpl), this will not make the read non-blocking, because internally these classes use the usual blocking OS call (pread). So after switching to virtual threads, both the virtual thread and the OS carrier thread will be blocked during the read.

Is it documented somewhere which I/O calls simply "work" with virtual threads and where the unpark method is used? By "work" I mean that the runtime is able to suspend the method, free the underlying OS thread, and schedule the resumed function again once the I/O has finished. So the virtual thread approach will use at most 4 OS threads per transaction in our case to reconstruct a full page in memory, as is already the case now? But with your library they'll be reused, so we can only gain latency improvements :-)

> > I also wonder whether it may be possible to serialize the page fragments asynchronously to disk. Currently, during a postorder traversal, the trx intent log is checked for modifications, and the changed fragments are serialized synchronously and appended to the file. During each write of a page fragment, the reference from the parent is updated with a SHA256 checksum of the actual child content (as in ZFS, for instance). Obviously, once the commit method returns, all data must have been flushed to disk.
>
> If I understand correctly, it can be done roughly this way:
>
> // submit all page-fragment writes without waiting for each one individually
> CompletableFuture[] writes = new CompletableFuture[pageFragments.length];
> for (int i = 0; i < pageFragments.length; i++) {
>     writes[i] = asyncFile.write(pageFragments[i]);
> }
> // wait for all writes to complete, then flush the device cache
> CompletableFuture.allOf(writes).get();
> asyncFile.dataSync().get();

In the case of writes, we don't have a bunch of page fragments of the same page. The fragments exist because of a sliding-snapshot versioning algorithm that reduces write and read amplification: instead of copy-on-write of a whole data page (UnorderedKeyValuePage), only the changed records, plus records which fall out of a window, are written to a new page fragment. This (single) page fragment is appended to the data file. We might, however, somehow be able to write all child pages of an index page in the trie asynchronously, but that's also not trivial because the pages are traversed recursively in postorder (NodeTrx::commit): https://github.com/sirixdb/sirix/blob/721877f0f58a9a8775c66b762cba192442b1ce2b/bundles/sirix-core/src/main/java/org/sirix/access/trx/page/NodePageTrx.java#L328

Kind regards
Johannes

@JohannesLichtenberger
Author

"Unfortunately, there’s one more imperfection in the initial virtual thread proposal: When a virtual thread executes a native method or a foreign function or it executes code inside a synchronized block or method, the virtual thread will be pinned to its carrier thread. A pinned thread will not unmount in situations where it otherwise would."

From https://blogs.oracle.com/javamagazine/post/java-loom-virtual-threads-platform-threads... maybe when using virtual threads we just have to wait for io_uring support, sadly.
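
As far as I understand, the synchronized part at least has a known workaround: replace synchronized blocks around blocking calls with a java.util.concurrent lock, so a waiting virtual thread can unmount instead of pinning its carrier (just a sketch, not SirixDB code; writeFragment/appendToFile are made-up names):

import java.util.concurrent.locks.ReentrantLock;

// Sketch: blocking on a ReentrantLock parks the virtual thread, whereas parking inside a
// synchronized block would pin the virtual thread to its carrier OS thread.
private final ReentrantLock commitLock = new ReentrantLock();

void writeFragment(byte[] fragment) {
    commitLock.lock();
    try {
        appendToFile(fragment);   // placeholder for the blocking write
    } finally {
        commitLock.unlock();
    }
}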

@JohannesLichtenberger
Author

That said, we probably still can use the Future return type approach and use your library without using virtual threads in the first place. I think the I/O code might be one of the worst parts of SirixDB currently, and the write path should be faster when SirixDB is used as an embedded library in comparison to, let's say, MongoDB, because of the additional client/server overhead there. In my naive test it was on par when importing a 3.8 GB JSON document, after fixing several performance issues mainly regarding boxing/unboxing of primitive wrappers and other fixes. However, it was naively running a JUnit test via Gradle in IntelliJ against a MongoDB Docker container... so it's not a JMH benchmark as of now, meaning warm-up overhead plus IntelliJ and Gradle overhead...

@ikorennoy
Owner

> Is it documented somewhere which I/O calls simply "work" with virtual threads and where the unpark method is used?

No, not really. But there's a simple rule here: it's only possible if the OS provides such an API. In Linux before io_uring, there was actually no such API that worked in general and allowed asynchronous I/O for files. There is AIO, but it has quite a few limitations, and epoll only works for sockets.

> That said, we probably still can use the Future return type approach and use your library without using virtual threads in the first place. I think the I/O code might be one of the worst parts of SirixDB currently, and the write path should be faster when SirixDB is used as an embedded library in comparison to, let's say, MongoDB, because of the additional client/server overhead there. In my naive test it was on par when importing a 3.8 GB JSON document, after fixing several performance issues mainly regarding boxing/unboxing of primitive wrappers and other fixes. However, it was naively running a JUnit test via Gradle in IntelliJ against a MongoDB Docker container... so it's not a JMH benchmark as of now, meaning warm-up overhead plus IntelliJ and Gradle overhead...

I think that in parallel with adapting jasyncfio it would be good to write benchmarks and get some numbers on throughput and latency for the current implementation, and also to compare them with MongoDB, for example. Then you can try to optimize the current I/O implementation. I noticed some simple things in the code that you can safely change now, e.g.:

buffer = ByteBuffer.allocate(dataLength).order(ByteOrder.nativeOrder());

There is no point in allocating a HeapByteBuffer, since a DirectByteBuffer is allocated internally anyway, the read is performed into it, and the result is then copied into the HeapByteBuffer, which is one extra copy. Then the result of the read is copied yet again into a byte array:

 buffer.get(page);

It is desirable to avoid this as well and to work directly with the allocated buffer.
I think you can start with those places and get rid of unnecessary copying where it's safe to do so.
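
Roughly like this (a sketch; dataFileChannel, position and dataLength stand for the reader's channel, offset and fragment size):

import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Sketch: read into a direct buffer and keep working with it, instead of
// heap buffer -> internal direct buffer -> heap buffer -> byte[].
ByteBuffer buffer = ByteBuffer.allocateDirect(dataLength).order(ByteOrder.nativeOrder());
dataFileChannel.read(buffer, position);   // one copy, straight into the direct buffer
buffer.flip();
// ... deserialize directly from `buffer` instead of copying it into a byte[] first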

Probably using jasyncfio will naturally avoid unnecessary copying, since the library only works with DirectBuffer :)

@JohannesLichtenberger
Author

JohannesLichtenberger commented Oct 18, 2022

> > That said, we probably still can use the Future return type approach and use your library without using virtual threads in the first place. I think the I/O code might be one of the worst parts of SirixDB currently, and the write path should be faster when SirixDB is used as an embedded library in comparison to, let's say, MongoDB, because of the additional client/server overhead there. In my naive test it was on par when importing a 3.8 GB JSON document, after fixing several performance issues mainly regarding boxing/unboxing of primitive wrappers and other fixes. However, it was naively running a JUnit test via Gradle in IntelliJ against a MongoDB Docker container... so it's not a JMH benchmark as of now, meaning warm-up overhead plus IntelliJ and Gradle overhead...
>
> I think that in parallel with adapting jasyncfio it would be good to write benchmarks and get some numbers on throughput and latency for the current implementation, and also to compare them with MongoDB, for example. Then you can try to optimize the current I/O implementation. I noticed some simple things in the code that you can safely change now, e.g.:
>
> buffer = ByteBuffer.allocate(dataLength).order(ByteOrder.nativeOrder());
>
> There is no point in allocating a HeapByteBuffer, since a DirectByteBuffer is allocated internally anyway, the read is performed into it, and the result is then copied into the HeapByteBuffer, which is one extra copy. Then the result of the read is copied yet again into a byte array:
>
> buffer.get(page);

Do you mean to use a DirectByteBuffer instead? I didn't know that; there's a simple array() method, which, however, obviously only works for heap byte buffers.

> It is desirable to avoid this as well and to work directly with the allocated buffer. I think you can start with those places and get rid of unnecessary copying where it's safe to do so.

I think that's sadly not possible, as afterward I want to perform byte operations (for instance, decompress via Snappy and, in the future, decrypt the read bytes), and those operations almost always need either a byte array or a stream (currently I'm using the stream).

Thanks for your valuable input all the time :-)

@JohannesLichtenberger
Author

Also, there's a lot of copying here that I'm not sure how to change because of the decompression (and, hopefully in the future, decryption) handlers:

@NotNull
private Page getPage(PageReadOnlyTrx pageReadTrx, byte[] page) throws IOException {
  // wrap the raw bytes and run them through the byte handler pipeline (e.g. decompression)
  final var inputStream = byteHandler.deserialize(new ByteArrayInputStream(page));
  final Bytes<ByteBuffer> input = Bytes.elasticByteBuffer();
  // copy the decompressed bytes into the Bytes buffer for the page persister
  BytesUtils.doWrite(input, inputStream.readAllBytes());
  final var deserializedPage = pagePersiter.deserializePage(pageReadTrx, input, type);
  input.clear();
  return deserializedPage;
}

@ikorennoy
Owner

> Do you mean to use a DirectByteBuffer instead? I didn't know that; there's a simple array() method, which, however, obviously only works for heap byte buffers.

Yes, but given that you really need the intermediate operations on streams (compression, encryption), you can keep the heap buffer for now and just remove as much of the extra copying as possible.

To be safe with optimizations, it would be nice to profile SirixDB. If you use IntelliJ IDEA on Linux, you already have a very accurate profiler built in (async-profiler).

> Also, there's a lot of copying here that I'm not sure how to change because of the decompression (and, hopefully in the future, decryption) handlers:

In fact, the intermediate handlers act as decorators of the main stream, so no copying happens :)
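
For example, with a plain JDK decorator (Snappy's input stream works the same way; `page` is the byte array you already have):

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.zip.InflaterInputStream;

// Sketch: the decompressing stream just wraps the source stream and decompresses lazily
// while you read from it; the wrapped byte[] is not copied up front.
InputStream decompressed = new InflaterInputStream(new ByteArrayInputStream(page));
byte[] chunk = new byte[4096];
int read;
while ((read = decompressed.read(chunk)) != -1) {
    // consume `read` decompressed bytes
}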

Also, I wanted to add something about benchmarks. I recommend not trying to write neat microbenchmarks at this point. You can write a few macrobenchmarks instead; it will be much easier and immediately useful. By macrobenchmark, I mean exactly what you described as a naive test:

> In my naive test it was on par when importing a 3.8 GB JSON

It's actually an extremely good approach; you just need to do it a bit more carefully to avoid unnecessary overhead and amortize JVM costs. These macrobenchmarks will also allow you to profile SirixDB and see which subsystems are taking up most of the CPU time.

> Thanks for your valuable input all the time :-)

You're welcome, I hope my experience will be useful!

If you have any more questions about file I/O or anything related, feel free to contact korennoy.ilya at gmail.com. I suggest moving further discussion to email, as it is drifting a little away from jasyncfio :)

@JohannesLichtenberger
Author

JohannesLichtenberger commented Oct 18, 2022

Yes, I already added a benchmarks bundle/module a couple of years ago and modified it recently, but unfortunately I'm not able to run the tests anymore because the runner, for some reason, is not able to find them (even though the simple regex should be OK, IMHO). Up until now I've only had XMark / XQuery tests, but for a couple of years now I've been concentrating all my effort on storing JSON in our native binary format and querying it via JSONiq :-) And yes, if you'd like, you can also join a channel on Discord: https://discord.gg/yC33wVpv7t :)
