Frame format for data transfer and short-term storage. #12745

Merged
gianm merged 10 commits into apache:master from gianm:frames on Jul 9, 2022

Conversation

Contributor

@gianm gianm commented Jul 5, 2022

As we move towards query execution plans that involve more transfer
of data between servers, it's important to have a data format that
provides for doing this more efficiently than the options available to
us today.

This patch adds:

  • Columnar frames, which support fast querying. Writes are faster than
    on the segment format. Querying is slower than equivalent operations
    on the segment format, due to lack of indexes and due to various choices
    intended to support fast writes as well as reasonably fast reads. Benchmarks
    below.
  • Row-based frames, which support fast sorting via memory comparison
    and fast whole-row copies via memory copying.
  • Frame files, a container format that can be stored on disk or
    transferred between servers.

Central classes:

The code in this patch is not used in production yet. Therefore, the
patch involves minimal changes outside of the org.apache.druid.frame
package. The main one is a change to StorageAdapter.getNumRows to
change unknown-cardinality from Integer.MAX_VALUE to
DimensionDictionarySelector.CARDINALITY_UNKNOWN. All the callers
I found can handle this OK, and it is more sound: it better matches the
behavior of dimension selectors created from the adapters.

Future patches in the #12262 sequence will use these frames for data
transfer and short-term storage.

Benchmarks for queries on frames vs. traditional segments (mmap):

Benchmark              (query)  (rowsPerSegment)   (storageType)  (vectorize)  Mode  Cnt    Score   Error  Units
SqlBenchmark.querySql        0           2000000            mmap        false  avgt   15    6.296 ± 0.081  ms/op
SqlBenchmark.querySql        0           2000000       frame-row        false  avgt   15   88.495 ± 0.579  ms/op
SqlBenchmark.querySql        0           2000000  frame-columnar        false  avgt   15   13.715 ± 0.562  ms/op
SqlBenchmark.querySql       10           2000000            mmap        false  avgt   15  251.530 ± 4.862  ms/op
SqlBenchmark.querySql       10           2000000       frame-row        false  avgt   15  626.003 ± 4.862  ms/op
SqlBenchmark.querySql       10           2000000  frame-columnar        false  avgt   15  466.353 ± 0.603  ms/op
SqlBenchmark.querySql       18           2000000            mmap        false  avgt   15  172.775 ± 0.890  ms/op
SqlBenchmark.querySql       18           2000000       frame-row        false  avgt   15  225.835 ± 2.350  ms/op
SqlBenchmark.querySql       18           2000000  frame-columnar        false  avgt   15  177.613 ± 1.210  ms/op

Benchmark              (query)  (rowsPerSegment)   (storageType)  (vectorize)  Mode  Cnt    Score    Error  Units
SqlBenchmark.querySql        0           2000000            mmap        force  avgt   15    0.509 ±  0.013  ms/op
SqlBenchmark.querySql        0           2000000  frame-columnar        force  avgt   15    7.524 ±  0.123  ms/op
SqlBenchmark.querySql       10           2000000            mmap        force  avgt   15  174.626 ± 10.985  ms/op
SqlBenchmark.querySql       10           2000000  frame-columnar        force  avgt   15  455.922 ± 19.296  ms/op
SqlBenchmark.querySql       18           2000000            mmap        force  avgt   15   38.537 ±  1.182  ms/op
SqlBenchmark.querySql       18           2000000  frame-columnar        force  avgt   15   50.755 ±  0.751  ms/op

The idea is we should use row-based frames when data is expected to
be sorted, and columnar frames when data is expected to be queried.

Outside the org.apache.druid.frame package, this patch also adjusts
SqlBenchmark to add benchmarks for queries on frames, and adds a
"forEach" method to Sequence.

lgtm-com bot commented Jul 5, 2022

This pull request introduces 9 alerts when merging a85ce92 into 06251c5 - view on LGTM.com

new alerts:

  • 4 for Result of multiplication cast to wider type
  • 2 for Dereferenced variable may be null
  • 1 for Useless comparison test
  • 1 for Unused format argument
  • 1 for Missing format argument

Contributor Author

gianm commented Jul 6, 2022

This pull request introduces 9 alerts when merging a85ce92 into 06251c5 - view on LGTM.com

Fixed these in the latest patch.

@FrankChen021
Member

@gianm I think introducing columnar-based format is very helpful.

One thing I don't intuitively understand is why the row-based format improves final query performance, since converting between data formats on large amounts of data could be time-consuming.

In ClickHouse, a columnar format is always used to transfer intermediate sub-query results between servers. When the final result is returned to the client in row-based format, we find that converting from the columnar format to the row-based format and serializing each field into text is very time-consuming.

Could you show me where the columnar data format is converted into row-based format?

Contributor Author

gianm commented Jul 6, 2022

@FrankChen021 good question! The advantage of the row-based format is that you can sort and merge it really fast. When there's a sort key, that's at the beginning of each row, and is designed in such a way that it can be compared as bytes. So data can be sorted using a single memory comparison, no matter the key length, without any decoding or deserialization. With columnar data, sorting requires at least one separate memory access per key part, and generally also requires decoding prior to comparison.

Sorted streams of frames can also be merged really fast, using a min-heap of input frames with a memcmp-based comparator. I tried implementing this sort-and-merge stuff with both columnar and row-based frames, and found row-based was 2–3x faster. The code for this isn't in this patch set, but it would be part of the next one.

So, the idea is we can use row-based frames when they have an advantage, and columnar frames when they don't. (I expect columnar frames would be faster for most ops that aren't comparison-related.)

Some relevant code in this patch:

  • RowBasedFrameWriter implements creation of row-based frames.
  • StringFieldWriter is an example of the field writers used to create row-based frames. Note that it is able to avoid serialization and deserialization: if the DimensionSelector it's reading from supports direct utf8 access, then it uses lookupNameUtf8 (by way of FrameWriterUtils.getUtf8ByteBufferFromStringSelector) to copy the input data without a serde round trip.
  • FrameComparisonWidgetImpl implements memcmp based comparison of row-based frames.
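
To make the comparison-and-merge idea concrete, here is a minimal, self-contained sketch (illustrative only, not code from this patch): it merges several already-sorted streams of byte-comparable keys using a min-heap with an unsigned, memcmp-style comparator. Real frame merging operates on frames, selectors, and FrameComparisonWidgetImpl rather than plain byte[] keys.

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    import java.util.PriorityQueue;

    public class MergeSortedKeyStreams
    {
      // Min-heap entry: the current key from one input stream, plus the stream it came from.
      private static class Entry
      {
        final byte[] key;
        final Iterator<byte[]> source;

        Entry(byte[] key, Iterator<byte[]> source)
        {
          this.key = key;
          this.source = source;
        }
      }

      public static void merge(List<Iterator<byte[]>> sortedInputs)
      {
        // Order the heap by unsigned, lexicographic byte comparison: one "memcmp" per
        // comparison, with no decoding of the individual key parts.
        final PriorityQueue<Entry> heap =
            new PriorityQueue<>((a, b) -> Arrays.compareUnsigned(a.key, b.key));

        for (final Iterator<byte[]> input : sortedInputs) {
          if (input.hasNext()) {
            heap.add(new Entry(input.next(), input));
          }
        }

        while (!heap.isEmpty()) {
          final Entry smallest = heap.poll();
          // Emit the smallest key, then refill the heap from the stream it came from.
          System.out.println(Arrays.toString(smallest.key));
          if (smallest.source.hasNext()) {
            heap.add(new Entry(smallest.source.next(), smallest.source));
          }
        }
      }
    }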

Contributor Author

gianm commented Jul 6, 2022

The jdk15 tests are failing because of an issue with AllocateDirectMap from datasketches-memory. The patch uses this library to map frame files > 2GB in size, because MappedByteBuffer can't handle them. The library only formally supports up through jdk13, and it appears that in this case, it's using a method that is no longer available in jdk15. I'll need to find a solution to this. The rest of the tests are passing, so this doesn't need to block review.
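
For anyone wondering why plain MappedByteBuffer isn't enough here: FileChannel.map rejects regions larger than Integer.MAX_VALUE bytes, so a frame file over 2GB can't be covered by a single mapping. A small sketch of that limitation (this is not what the patch does; it uses datasketches-memory's AllocateDirectMap instead):

    import java.io.IOException;
    import java.nio.MappedByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.file.Path;
    import java.nio.file.StandardOpenOption;

    public class MapLimitExample
    {
      public static void main(String[] args) throws IOException
      {
        try (FileChannel channel = FileChannel.open(Path.of(args[0]), StandardOpenOption.READ)) {
          final long size = channel.size();
          if (size > Integer.MAX_VALUE) {
            // channel.map(READ_ONLY, 0, size) would throw IllegalArgumentException here,
            // since a single MappedByteBuffer cannot exceed Integer.MAX_VALUE bytes.
            System.out.println("Too large for one MappedByteBuffer mapping: " + size + " bytes");
          } else {
            final MappedByteBuffer buf = channel.map(FileChannel.MapMode.READ_ONLY, 0, size);
            System.out.println("Mapped " + buf.capacity() + " bytes");
          }
        }
      }
    }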

Contributor Author

gianm commented Jul 6, 2022

For now I've skipped the datasketches-memory mapping tests on JDK 14+, since we don't officially support those versions yet. These versions are able to process frame files up to 2GB in size, but won't be able to process larger ones. I'll continue looking into better options that allow things to work fully on higher JDK versions.

Contributor

@paul-rogers paul-rogers left a comment

Great work! This is a ton of new functionality.
Here are a few initial comments as I wrap my brain around the frame abstraction.

* Frames are written with {@link org.apache.druid.frame.write.FrameWriter} and read with
* {@link org.apache.druid.frame.read.FrameReader}.
*
* Frame format:
Contributor

Did we consider using Protobuf or Thrift for the header portion? With a home-grown format, we will add code over time to handle version compatibility. Protobuf does that for us already.

Contributor Author

I did not consider it at the time but I'm considering it now. I definitely see the benefit of using a format that allows easier evolution. My immediate thought is we don't use protobuf or thrift for anything in core right now and I hesitate to add them just for this. We do use JSON/Smile for headers in the segment format. Maybe that would be OK here. Frames are going to be relatively large (I expect 1–8MB will be typical) so the overhead of reading one few-bytes-long Smile header should be minimal, if not negligible. What do you think?
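
To give a feel for the size involved, here's a tiny standalone example of a Smile-encoded header using Jackson (the header fields below are hypothetical, not the actual frame header):

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.dataformat.smile.SmileFactory;
    import java.util.Map;

    public class SmileHeaderExample
    {
      public static void main(String[] args) throws Exception
      {
        final ObjectMapper smileMapper = new ObjectMapper(new SmileFactory());

        // Hypothetical header fields; a real frame header would be versioned deliberately.
        final byte[] headerBytes = smileMapper.writeValueAsBytes(
            Map.of("version", 1, "frameType", "columnar", "numRows", 1000)
        );

        // A few tens of bytes, versus frames expected to be 1-8 MB.
        System.out.println("header size = " + headerBytes.length + " bytes");

        final Map<?, ?> roundTripped = smileMapper.readValue(headerBytes, Map.class);
        System.out.println(roundTripped);
      }
    }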

Contributor Author

I left this as-is for now. But I added this comment:

Note to developers: if we end up needing to add more fields here, consider introducing a Smile (or Protobuf, etc) header to make it simpler to add more fields.

*
* This operation allocates memory on-heap to store the decompressed frame.
*/
public static Frame decompress(final Memory memory, final long position, final long length)
Contributor

This static method means that there is only one way to decompress. It requires that the decompressor itself be static. Should the codec be a separate class that contains the compressor, decompressor, and methods to do the work?

Contributor Author

Ah, good point. We have an interface for this: CompressionStrategy. There's implementations for LZF, LZ4, ZSTD, and UNCOMPRESSED. I didn't use it because it's oriented around ByteBuffer, not Memory, and some work would be needed to make it work properly on Memory. I also didn't see a case where we'd actually want to use any non-LZ4 compressor at this time.

I'm thinking that if we don't migrate CompressionStrategy to Memory now, we should at least future-proof this compressed frame format by adding an additional byte for compression strategy. (CompressionStrategy impls are identified by a byte code.) It would always be LZ4 (0x1) at first, so it's compatible with a world where CompressionStrategy takes over.

Contributor Author

I added a byte at the start of the compressed format to indicate the compression strategy. This will be helpful if we want to implement additional compression strategies in the future. However, right now, it's always going to be LZ4 (0x1), since I didn't do the work to migrate CompressionStrategy to Memory.

* - NNN bytes: LZ4-compressed frame
* - 8 bytes: 64-bit xxhash checksum of prior content, including 16-byte header and compressed frame, little-endian long
*/
public class Frame
Contributor

Frame is concrete. It appears to assume heap memory. One could imagine wanting a direct memory version for better memory control. Should this class be an interface (or abstract) with, at present, one concrete implementation?

Contributor

Related: what is the memory management policy for a frame? When on heap, GC will handle releasing memory. If in direct memory, how is ownership handled?

Contributor Author

@gianm gianm Jul 6, 2022

It uses Memory, a class that can be either on-heap, "direct" off-heap memory, or regions of a memory-mapped file. So the abstraction you're inquiring about is bundled within the Memory interface.

As to memory management, my expectation is whoever created the Memory is responsible for disposing of it, if that is necessary. I'll add this comment:

Frames wrap some {@link Memory}. If the memory is backed by a resource that requires explicit releasing, such as direct off-heap memory or a memory-mapped file, the creator of the Memory is responsible for releasing that resource when the frame is no longer needed.

private static class Selector implements DoubleColumnSelector
{
private final Memory dataRegion;
private final ReadableFieldPointer fieldPointer;
Contributor

A few more comments would be helpful here.

At the memory level, a field pointer might point to the byte offset of the data. We see this in getDouble() below. That then requires something to advance the pointer 4 + 1 bytes per value.

It can be handy to have a "row" context pointer. If we know we're reading row 5, we do the math: 5 * (4 + 1) = 25 is the byte offset.

The row form moves the field width calculations into the reader: we don't need another layer that knows how much to advance the value.

Contributor Author

A few more comments would be helpful here.

I added some brief comments. Please let me know if you had something else in mind.

It can be handy to have a "row" context pointer. If we know we're reading row 5, we do the math: 5 * (4 + 1) = 25 is the byte offset.

With the way the code is currently structured, the reader doesn't need to know it's part of a row at all, and doesn't need to be aware of the row format. I thought that was a nice property. Please let me know if I'm missing something.

}

@Override
public long writeTo(final WritableMemory memory, final long position, final long maxSize)
Contributor

Example of the above comment. Here, the caller has to keep track of the offset for each field, and update that offset using the returned length. That's quite a bit of accounting if we have, say, 100 fields!

This class appears to be static: it accepts the memory, the position and a value (from a selector?) but yet another thing keeps track of offsets.

Might be cleaner if this object is created per field and tracks offsets internally. If we assume sequential writes (that is, row-by-row), then the write is only moving forward: works for fixed-width and variable-width values. If we allow random writes (not sure how we'd do that), can we explain the logic?

Did we consider using a row offset so we can do the math for fixed fields as above? For variable-width fields, is there

Contributor Author

RowBasedFrameWriter is the main place this is used; it uses the return value of these field-writer functions (bytes written) to increase a single bytesWritten pointer that it tracks. It seems clean to me but please let me know if I'm missing something.
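
As a rough sketch of that caller-side pattern (not the actual RowBasedFrameWriter code; the types' packages, the -1 convention, and the loop shape are simplifications of mine), the bookkeeping boils down to:

    // Assumes org.apache.datasketches.memory.WritableMemory and this patch's FieldWriter.
    // Each field writer appends at the given position and returns the number of bytes written.
    long writeOneRow(
        final WritableMemory memory,
        final long startPosition,
        final long maxSize,
        final List<FieldWriter> fieldWriters
    )
    {
      long bytesWritten = 0;
      for (final FieldWriter fieldWriter : fieldWriters) {
        final long len = fieldWriter.writeTo(memory, startPosition + bytesWritten, maxSize - bytesWritten);
        if (len < 0) {
          return -1; // assumed "not enough space" signal; the real code's convention may differ
        }
        bytesWritten += len;
      }
      return bytesWritten;
    }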

return rowPointer.position() + (long) Integer.BYTES * fieldCount;
} else {
return rowPointer.position() + memory.getInt(rowPointer.position() + (long) Integer.BYTES * (fieldNumber - 1));
}
Contributor

A comment to explain this logic would be helpful. A quick read suggests that field 0 is somehow special. For any other field, we get an indirection from our memory. For field 0, we seem to get a position at the end of...what? Maybe a small memory layout diagram would help?

Contributor Author

I added some comments here, and added a format description to RowReader.

Quick answer is: a row with N fields consists of N pointers (to the end of each field), followed by the N fields concatenated together.
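
In code form, the boundary math for that layout looks roughly like this (an illustrative sketch mirroring the snippet above, with the end pointers copied into an int[]; all offsets are relative to the start of the row):

    // fieldEnds[k] = offset, from the start of the row, at which field k ends; these are
    // the N ints stored at the front of the row.
    static long fieldStartPosition(final long rowStart, final int fieldCount, final int fieldNumber, final int[] fieldEnds)
    {
      final long pointerBlockSize = (long) Integer.BYTES * fieldCount;
      if (fieldNumber == 0) {
        // The first field begins immediately after the block of end pointers.
        return rowStart + pointerBlockSize;
      } else {
        // Every other field begins where the previous field ended.
        return rowStart + fieldEnds[fieldNumber - 1];
      }
    }

    static long fieldEndPosition(final long rowStart, final int fieldNumber, final int[] fieldEnds)
    {
      return rowStart + fieldEnds[fieldNumber];
    }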


/**
* Reads fields written by {@link StringFieldWriter} or {@link StringArrayFieldWriter}.
*/
Contributor

Perhaps a bit of discussion about the data layout? Is the underlying string a string? Or, an index into a dictionary, as in segments? For a string, what is the layout? Size + data?

Contributor Author

Fair! I added some details to all the Reader implementations, and linked to them from the Writers, to make them easier to find. Quick note for this one: it's an actual string, done that way to enable sorting on keys as bytes directly. (So, we don't want indirection.)
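
A small standalone illustration of that design choice (not code from this patch): because the field stores the actual UTF-8 bytes rather than a dictionary code, keys order correctly under a plain unsigned byte comparison, with no dictionary lookup.

    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    public class Utf8FieldOrderExample
    {
      public static void main(String[] args)
      {
        final byte[] apple = "apple".getBytes(StandardCharsets.UTF_8);
        final byte[] applesauce = "applesauce".getBytes(StandardCharsets.UTF_8);
        final byte[] banana = "banana".getBytes(StandardCharsets.UTF_8);

        // Unsigned lexicographic comparison of the raw bytes already yields the right order.
        System.out.println(Arrays.compareUnsigned(apple, applesauce) < 0);  // true: prefix sorts first
        System.out.println(Arrays.compareUnsigned(applesauce, banana) < 0); // true

        // With a dictionary-coded column, the stored integer codes would not be comparable
        // without first looking the values up in the dictionary.
      }
    }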

return true;
}

private static class Selector implements DimensionSelector
Contributor

Perhaps a note about the relationship between a reader and a selector? A naive reader (me) would expect them to be the same thing. I have a string reader and an index. As I advance the index, I can read strings from string columns. A brief note on the model used here would be helpful.

Contributor Author

The Selectors are core Druid things used by the query engines and various other stuff. The main two nonvectorized ones are ColumnValueSelector and DimensionSelector. Their javadocs have some notes about what they are and how they are used. So, this is provided so we have a way to plug frame data into all the other Druid stuff that works with selectors.

public int getValueCardinality()
{
return CARDINALITY_UNKNOWN;
}
Contributor

Some of these attributes seem related more to the type of the data than the actual data. Should there be a reader which can deliver strings, and can provide an object with the type info? That is, separate the representation into data and metadata?

Contributor Author

@gianm gianm Jul 6, 2022

This is implementing DimensionSelector, an interface we've had since time immemorial. It's used for reading string and multi-value string fields/columns. I think @clintropolis has been working on teasing some of these things apart, as part of the effort to enable dictionary coding and indexing for things that aren't strings. So he may be able to comment more on future plans for this interface. In this patch, I'm taking it as it is.

Member

I got a bit side-tracked with some other stuff and haven't gotten back to DimensionSelector yet, but I'm still thinking about it in the back of my mind. I mainly want to do 2 things to it: 1) decouple the dictionary-encoded selector part from strings so we can more easily make dictionary-encoded selectors for other column types and use them in places, and 2) split out the 'use it like an array' usages from the dictionary-encoded uses, since that's the reason we have to decorate these selectors with stuff like the cardinality and methods like 'nameLookupPossibleInAdvance', so that callers can check what type of selector they got and know what they can do with it. I would agree the selector itself might not be the best place for this stuff to live, so I will keep this in mind.

@paul-rogers
Contributor

@gianm , thanks for the PR description! Since there are no uses of this code yet, can you perhaps describe how it might be used? Or, point to some UTs? For example, how might I use a frame to write a set of 10 rows, with a schema that includes a long (fixed-width), string (variable-width), array (variable-length arrays of variable-width values) and complex (fixed-width, but non-scalar values)? Then, how would I read those back? Does my writable frame have to get converted to a readable one somehow? How does that work? Thanks!

@paul-rogers
Contributor

@FrankChen021, @gianm, comment on the row vs. columnar discussion. Over on Apache Drill, our internal "value vector" formats are roughly similar to Gian's columnar frame. What we found was that, for local operators such as project, select and so on, columnar is fast. (Adding or removing a column requires adding or removing just one vector, the rest are unchanged.)

Filtering is a wash. Yes, we can apply a predicate only on the target columns, but we still have to copy the entire row when removing the "misses". That is actually more costly with columns than with rows because of all the per-column twiddling needed.

The killer, however, is on exchanges, such as a hash exchange. You have a batch of, say 1000 rows. You want to send it to 100 different receivers. You've got to do row-based slices based on hash, then build up an "outgoing" batch. If you buffer, say, 100 rows for each of your 100 servers (so the batch is efficient), you end up buffering 10K rows, and causing a pipeline stall.

Our theory is that row-based exchanges (as we had in Apache Impala) would be more efficient: buffer when you can, but if x ms. goes by with no additional data, send what you have to keep things flowing. A row based format makes this more feasible. (Obviously lots of details that I'm skipping over.)

There was a very lively debate on this over at the Drill project, if anyone is interested.

In short, I like the approach of having both options in the frame abstraction: it provides tuning options. Even better would be if the surrounding code could be agnostic: a column writer is a column writer, independent of format. (There may be two different implementations, but the API is the same.)

That way, a planner could decide when it is faster to be row-based vs. column based for any given operator.

Contributor Author

gianm commented Jul 6, 2022

@gianm , thanks for the PR description! Since there are no uses of this code yet, can you perhaps describe how it might be used? Or, point to some UTs? For example, how might I use a frame to write a set of 10 rows, with a schema that includes a long (fixed-width), string (variable-width), array (variable-length arrays of variable-width values) and complex (fixed-width, but non-scalar values)? Then, how would I read those back? Does my writable frame have to get converted to a readable one somehow? How does that work? Thanks!

Sure! For writing, check out FrameSequenceBuilder in the tests. It creates frames from a ColumnSelectorFactory that comes from a Cursor. The ColumnSelectorFactory interface (or its vectorized cousin, VectorColumnSelectorFactory) is generally what we use to interface with data things. There's implementations for regular segments, realtime segments, lists of java objects, etc.

In your specific example, it'd depend where the 10 rows came from. If they were in a regular segment, you'd want to get the ColumnSelectorFactory by calling QueryableIndexStorageAdapter.makeCursors. If they were java objects, you'd want to use a RowBasedColumnSelectorFactory.

Then, for reading, check out FrameReader. The most straightforward thing to do is frameReader.makeCursorFactory(frame).makeCursors(...). This gets you another Cursor, where you can get a ColumnSelectorFactory, and the cycle may begin anew.

Contributor Author

gianm commented Jul 6, 2022

Does my writable frame have to get converted to a readable one somehow? How does that work?

When a FrameWriter initially builds up a frame row-by-row, it does that using memory allocated from an arena. When done, it needs to be converted to the format described in the class-level javadoc for Frame. That's done by calling FrameWriter.writeTo or FrameWriter.toByteArray.

Once it's in that format, it can be wrapped with Frame.wrap and read however you want. It can also be modified in limited ways in-place: for example, the FrameSort class sorts frames in-place.

Contributor Author

gianm commented Jul 6, 2022

@paul-rogers thanks for the color from experience in Drill. It makes sense that exchanges/shuffles would be a good use for row-based frames: sorting, hashing, and copying are all things that work best if you can reference the entire row as a contiguous region in memory.

I'm interested in a pointer to the JIRA or dev list thread(s) you're referring to.

@paul-rogers
Contributor

@gianm, The Drill discussion I mentioned was spread across a number of venues, and was mixed in with the perennial "should Drill use Arrow" discussion. Some "lessons learned" appear in the discussion of this PR. The discussion moved to this issue. There were no conclusions, just varying points of view; observations about reality, and the continued aspirations of the magical powers of columnar formats.

Druid is different (it is worth repeating), so the specific issues don't apply. The two key points that are relevant:

  • Columnar is said to be faster, especially with non-null data, stored in direct memory and with operations that can be compiled down to SIMD instructions, such as simple aggregations. Though, in practice, SQL uses nulls, our tools use Java, and only Gandiva has provided a SIMD implementation.
  • Columnar can lead to very poor performance during shuffles (exchanges) at scale because of the buffering issues mentioned above.

There is nothing from that discussion that would cause this PR to change. Just some "school of hard knocks" learning that we want to avoid repeating as we move ahead.

@paul-rogers
Contributor

@gianm, thanks for the explanation! Having just suffered through trying to find info in a PR from many months back, I wonder if we can capture these answers as either Javadoc, or a brief README.md, in the frame package? Would be super helpful for others as they dive into the code.

Contributor Author

gianm commented Jul 7, 2022

@gianm, thanks for the explanation! Having just suffered through trying to find info in a PR from many months back, I wonder if we can capture these answers as either Javadoc, or a brief README.md, in the frame package? Would be super helpful for others as they dive into the code.

Yeah, that's a good idea. I'm planning to go through and add some explanation about the field/column formats. In addition to that I'm down to add some info on why we have both row/columnar formats. Right now the Frame class javadoc seems like a good place. That's the most central thing to this project.

Contributor

fjy commented Jul 7, 2022

Putting in my +1 as this code has already gone through extensive testing at scale and otherwise.

@Test
public void test_makeColumnValueSelector_singleString_notArray()
{
writeToMemory(Collections.singletonList("foo"));
Contributor

Thanks for the unit tests! Tedious to write, but a great way to catch issues and document the API.

Should there be tests that verify more than one value? What happens if there are zero values? What is the max quantity? What happens when we hit this limit? Or, is there no limit and we keep adding memory? Should we test at least a few generations of the "add memory" use case?

Contributor

Also, all the tests in this area are for a single column. Do we have tests that show how to write a row of, say, a string, long and array? That would show how we coordinate the offsets, how we write the values together, and how we read the entire row.

Contributor Author

Do we have tests that show how to write a row of, say, a string, long and array? That would show how we coordinate the offsets, how we write the values together, and how we read the entire row.

There isn't for the field writers, since the field writers aren't aware of the row format. But! For an example, check out RowBasedFrameWriter#writeDataUsingFieldWriters, the main production usage.

Contributor

@LakshSingla LakshSingla left a comment

I have gone through the code once (excluding the tests). Leaving some comments (read: doubts) as a newbie, mostly pertaining to the readability of the code.

regionEnd = memory.getLong(HEADER_SIZE + rowOrderSize + (long) regionNumber * Long.BYTES);

if (regionEnd < regionStart || regionEnd > numBytes) {
throw new ISE(
Contributor

nit: Should these be IAE as above?

Contributor Author

Hmm. Sure. I changed them.

final int numRegions = memory.getInt(Byte.BYTES + Long.BYTES + Integer.BYTES);
final boolean permuted = memory.getByte(Byte.BYTES + Long.BYTES + Integer.BYTES + Integer.BYTES) != 0;

if (numBytes != memory.getCapacity()) {
Contributor

It seems that we have extracted the information necessary to generate the Frame object above. Should the code that checks the correctness of the frame be extracted into a separate method and called here? That would also allow for quicker testing in case that is desirable.

Contributor Author

Good idea. I moved it to a new validate method. The code looks cleaner now.

import org.apache.druid.frame.FrameType;
import org.apache.druid.frame.write.RowBasedFrameWriter;
import org.apache.druid.segment.data.ReadableOffset;

Contributor

nit: Is Javadoc required for this class? I am unable to understand the update() method.

Contributor Author

I added a few comments. Hopefully they make sense. Please let me know if it's still murky.

*
* @return true if reservation was successful, false otherwise.
*/
public boolean reserve(final int bytes)
Contributor

Would it be more appropriate to rename reserve() to reserveAdditional() or equivalent? While the Javadoc specifies that it is the amount of space available after the cursor, that might not be apparent from the name. (For reference, realloc() in C takes the total size we want the existing chunk reallocated to, not the diff.)

Contributor Author

Sure, that makes sense. I renamed it to reserveAdditional and added some more notes about usage.

* Rewinds the cursor a certain number of bytes, effectively erasing them. This number of bytes must not exceed
* the current block. Typically, it is used to erase the memory most recently advanced by {@link #advanceCursor}.
*/
public void rewindCursor(final int bytes)
Contributor

I had a doubt here: since the usage of AppendableMemory is such that we want to dynamically allocate memory on the fly, it seems unfair that the caller of this method is expected to remember the last block size allocated rather than the total memory allocated in the AppendableMemory. Per my intuition, blocks are a concept that should remain internal to AppendableMemory, and rewind should also allow rewinding past the current block. Any thoughts on this?

(The additional blocks would stay as is, and not be freed, since we have just rewound the cursor. This would mean advanceCursor would also need some updated logic.)

Contributor Author

The expected usage is that callers will call reserveAdditional, then write some stuff, then call advanceCursor. Then, they might want to undo it. (This happens when undoing the most recent row written.) Callers don't need to undo more than the most recent call to advanceCursor, so the block boundary thing isn't a problem in practice. I wrote some javadocs clarifying this.
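
For reference, that calling pattern looks roughly like this (a sketch using the reserveAdditional / advanceCursor / rewindCursor names from this patch; how the bytes actually get written into the reserved region, and the exact package, are left out):

    // Sketch only: appends one row of a known size, and optionally undoes it.
    boolean appendRow(final AppendableMemory appendableMemory, final int rowSize, final boolean keepRow)
    {
      // Ensure rowSize bytes are available after the cursor (may allocate another block).
      if (!appendableMemory.reserveAdditional(rowSize)) {
        return false; // could not reserve more memory
      }

      // ... write the row's bytes into the reserved region here ...

      // Commit the bytes that were just written.
      appendableMemory.advanceCursor(rowSize);

      if (!keepRow) {
        // Undo only the most recent advanceCursor; per the javadoc, callers never need to
        // rewind further than that, so this never crosses a block boundary.
        appendableMemory.rewindCursor(rowSize);
        return false;
      }

      return true;
    }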

Member

@clintropolis clintropolis left a comment

overall lgtm 🤘 I did a rather detailed review and didn't spot anything obvious that needs to be changed now, and between the test coverage looking pretty good and the fact nothing is really using it yet, it seems pretty safe to merge.

I've many thoughts on the stuff in this PR that I'm still formulating, but none of them blockers, more just thinking forward how we can consolidate and solidify some stuff in the future.

I think there is a fair bit of overlap here with some of the goals of the TypeStrategy stuff I previously added to allow expressions to participate in buffer/vector aggregators, though it's primarily based on ByteBuffer while this stuff is all using Memory, and your stuff has a cooler approach to comparison which leads to some subtle differences in how values are stored (particularly double and float in the row-based frames). Beyond the ByteBuffer difference, consolidation might also be a bit tricky because that stuff is in core while this stuff needs to be in processing because the selectors are tied to the writers/readers, though that could probably be shuffled a bit? Or maybe we can merge core into processing? heh

TypeStrategy does have more complete handling of arrays, since it delegates writing array contents to the TypeStrategy of whatever is inside the array, whereas here we currently only have support for string arrays. I think this is fine for now given that true array columns don't yet exist and the initial use of the stuff here is mainly in service of SQL insert statements where a string array is translated into a multi-value string column, but I think it does mean there are some limitations using this stuff for numeric and other types of arrays, which can be constructed at query time with expressions, that we will need to think about how to deal with in the near future, if I understood what I saw correctly.

I am actually planning on adding true array of literals (strings/numbers) typed columns within my nested data columns in the near future, which I was also thinking would be useful to add as regular columns as well, so at that point it would be possible to have true arrays in segments and maybe that's the appropriate time to try to further solve the array problem.

Also there is perhaps another opportunity to share some stuff between the compressed frames stuff in this PR and the variable-sized blob compression column stuff I have in #12753, though I haven't really thought quite enough about it to know if it should actually be shared or not, since there are slight differences and somewhat different purposes, but mechanically it needs to do quite similar things, so worth looking closer at, I think.

Anyway, cool stuff 🚀 so I'll go ahead and leave a +1, but definitely will keep looking over this stuff and thinking about the road ahead.

*/
public class ComplexFieldWriter implements FieldWriter
{
public static final byte NULL_BYTE = 0x00;
Member

nit: you explained to me offline the reason for not using the constants from NullHandling, but it might be worth leaving a comment here about why: the values are flipped so that nulls come first in a compare. Also it might be nice for these to be shared constants across all writers? Other field writers define their own copies of this, and the column writers just use inline 0 or 1 values...
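
For the record, a tiny illustration of that ordering argument (illustrative only): each field starts with a marker byte, and because NULL_BYTE = 0x00 sorts below the not-null marker 0x01 under unsigned comparison, null fields come first.

    final byte[] nullField = {0x00};           // just the null marker
    final byte[] nonNullField = {0x01, 'a'};   // marker followed by (stand-in) value bytes
    // Unsigned lexicographic comparison: the null field sorts before any non-null field.
    System.out.println(java.util.Arrays.compareUnsigned(nullField, nonNullField) < 0); // true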

Contributor Author

@gianm gianm Jul 8, 2022

I added some comments about why the values were chosen.

As to whether the field writers should/shouldn't have their own versions. I dunno. I thought it was cleaner and more self contained for each field writer to define its own constants, even if they ended up all being the same. I think sharing them would be fine too. I'd like to leave it this way since I don't think it makes a big difference, and I slightly prefer it this way.


if (compressionBuffer == null || compressionBuffer.capacity() < requiredSize) {
// Re-allocate a larger buffer.
compressionBuffer = ByteBuffer.allocate(requiredSize);
Member

I wonder if this should use the compression buffer allocation stuff, or did you not want to deal with tracking and closing buffers?

I'm unsure whether some compression strategies require specific buffer types. I know this came up recently in the zstd PR #12408, which needed direct buffers, but I recall that we adjusted that to work with heap memory too. I don't fully remember whether lzf has strong opinions on buffer type, and I guess it doesn't really matter too much right now since this is fixed to use lz4.

Contributor Author

Yeah, I figured that we're only using LZ4 now, so there wasn't a need to bring CompressionStrategy into it. There's definitely some potential follow up work around making the compressors all work with Memory and having their buffer allocation fit in better here.

Contributor Author

gianm commented Jul 8, 2022

@clintropolis yeah the similarities between what is needed here and the TypeStrategy stuff are interesting. I didn't end up using TypeStrategy because of the unique requirements around comparisons, and also because of wanting to provide selectors. However I agree some thought about how to bring these things closer together would be good. Maybe expressions could use these field writers? And we could update the field writers to support numeric arrays?

@paul-rogers
Contributor

@gianm, thanks for the explanations. LGTM (non-binding).

@LakshSingla
Contributor

Thanks for addressing the comments. Non-binding LGTM 💯.

Contributor

@vogievetsky vogievetsky left a comment

The web console part looks 👍

@vogievetsky vogievetsky merged commit 9c925b4 into apache:master Jul 9, 2022
@gianm gianm deleted the frames branch July 9, 2022 04:19
gianm added a commit to gianm/druid that referenced this pull request Aug 2, 2022
Follow-up to apache#12745. This patch adds three new concepts:

1) Frame channels are interfaces for doing nonblocking reads and writes
   of frames.

2) Frame processors are interfaces for doing nonblocking processing of
   frames received from input channels and sent to output channels.

3) Cluster-by keys, which can be used for sorting or partitioning.

The patch also adds SuperSorter, a user of these concepts, both to
illustrate how they are used, and also because it is going to be useful
in future work.

Central classes:

- ReadableFrameChannel. Implementations include
  BlockingQueueFrameChannel (in-memory channel that implements both interfaces),
  ReadableFileFrameChannel (file-based channel),
  ReadableByteChunksFrameChannel (byte-stream-based channel), and others.

- WritableFrameChannel. Implementations include BlockingQueueFrameChannel
  and WritableStreamFrameChannel (byte-stream-based channel).

- ClusterBy, a sorting or partitioning key.

- FrameProcessor, nonblocking processor of frames. Implementations include
  FrameChannelBatcher, FrameChannelMerger, and FrameChannelMuxer.

- FrameProcessorExecutor, an executor service that runs FrameProcessors.

- SuperSorter, a class that uses frame channels and processors to
  do parallel external merge sort of any amount of data (as long as there
  is enough disk space).
gianm added a commit that referenced this pull request Aug 5, 2022
* Frame processing and channels.

* Additional tests, fixes.

* Changes from review.

* Better implementation for ReadableInputStreamFrameChannel.

* Rename getFrameFileReference -> newFrameFileReference.

* Add InterruptedException to runIncrementally; add more tests.

* Cancellation adjustments.

* Review adjustments.

* Refactor BlockingQueueFrameChannel, rename doneReading and doneWriting to close.

* Additional changes from review.

* Additional changes.

* Fix test.

* Adjustments.

* Adjustments.
@abhishekagarwal87 abhishekagarwal87 added this to the 24.0.0 milestone Aug 26, 2022