DataTable V3 implementation and measure data table serialization cost on server #6710

Merged
merged 1 commit into from Apr 2, 2021

mqliang
Contributor

@mqliang mqliang commented Mar 23, 2021

Description

This PR:

  • Adds a positional data section to the tail of the data table and bumps the data table version to V3.
  • Data in the positional data section is a set of key/value pairs that must be positional (the value of a given key stays locatable even after serialization), so keys are defined in an enum and values are stored in a String[].
  • Currently there is only one KV pair (response_serialization_cost) in the positional data section. If more KV pairs are added, a utility function such as getOffsetForValueOfGivenKey() can locate the value of a given key.
  • Measures the data table serialization cost on the server and puts the cost in the positional data section.

Upgrade Notes

Does this PR prevent a zero down-time upgrade? (Assume upgrade order: Controller, Broker, Server, Minion)

  • Yes (Please label as backward-incompat, and complete the section below on Release Notes)

Does this PR fix a zero-downtime upgrade introduced earlier?

  • Yes (Please label this as backward-incompat, and complete the section below on Release Notes)

Does this PR otherwise need attention when creating release notes? Things to consider:

  • New configuration options
  • Deprecation of configurations
  • Signature changes to public methods/interfaces
  • New plugins added or old plugins removed
  • Yes (Please label this PR as release-notes and complete the section on Release Notes)

Release Notes

If you have tagged this as either backward-incompat or release-notes,
you MUST add text here that you would like to see appear in release notes of the
next release.

If you have a series of commits adding or enabling a feature, then
add this section only in final commit that marks the feature completed.
Refer to earlier release notes to see examples of text

Documentation

If you have introduced a new feature or configuration, please add it to the documentation as well.
See https://docs.pinot.apache.org/developers/developers-and-contributors/update-document

@codecov-io
codecov-io commented Mar 23, 2021

Codecov Report

Merging #6710 (636ec0e) into master (8dbb70b) will decrease coverage by 8.00%.
The diff coverage is 83.86%.

@@            Coverage Diff             @@
##           master    #6710      +/-   ##
==========================================
- Coverage   73.83%   65.82%   -8.01%     
==========================================
  Files        1396     1405       +9     
  Lines       67765    68161     +396     
  Branches     9807     9853      +46     
==========================================
- Hits        50035    44870    -5165     
- Misses      14485    20100    +5615     
+ Partials     3245     3191      -54     
Flag Coverage Δ
integration ?
unittests 65.82% <83.86%> (-0.18%) ⬇️

Impacted Files Coverage Δ
...org/apache/pinot/common/utils/CommonConstants.java 21.15% <ø> (-13.47%) ⬇️
...e/pinot/core/common/datatable/DataTableImplV2.java 0.00% <0.00%> (-89.46%) ⬇️
...core/operator/blocks/IntermediateResultsBlock.java 76.21% <0.00%> (-5.41%) ⬇️
...core/query/executor/ServerQueryExecutorV1Impl.java 46.19% <0.00%> (-33.70%) ⬇️
...e/pinot/core/transport/InstanceRequestHandler.java 55.88% <0.00%> (-22.06%) ⬇️
...pinot/server/starter/helix/HelixServerStarter.java 0.00% <0.00%> (-51.99%) ⬇️
...e/pinot/core/query/reduce/BrokerReduceService.java 68.54% <33.33%> (-25.81%) ⬇️
...che/pinot/core/query/scheduler/QueryScheduler.java 68.96% <66.66%> (-13.09%) ⬇️
...pinot/core/common/datatable/DataTableImplBase.java 76.53% <76.53%> (ø)
.../pinot/core/common/datatable/DataTableBuilder.java 86.72% <85.71%> (-0.32%) ⬇️
... and 366 more

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data

import org.apache.pinot.spi.utils.ByteArray;
import org.apache.pinot.spi.utils.BytesUtils;


public class DataTableImplV2 implements DataTable {
private static final int VERSION = 2;
public class DataTableImplV2V3 implements DataTable {
Contributor

(nit) suggest not including the version name in class name. It should just be DataTableImpl. Tomorrow if we bump up the version to 4, then the name will be DataTableImplV2V3V4 which is undesirable

Contributor Author

@mqliang mqliang Mar 23, 2021

I named it DataTableImplV2V3 since V2 and V3 share a lot of common logic. If V2 and V3 end up with major differences, as you suggest:

Since we are anyway bumping up the version, how about we move the existing metadata of key-value pairs to the end of file to keep consistency in the format. So, all the metadata stuff (aka key-value pairs) + new positional stuff can be a file footer.

If we do that, I vote for putting the V2 logic into DataTableImplV2 and the V3 logic into DataTableImplV3, and extracting the common logic (e.g. serializing/de-serializing metadata/dictionaryMap) into DataTableUtils.java.

move the existing metadata of key-value pairs to the end of file

Actually, I considered that. I also considered making the metadata a String[] instead of a Map<String, String> and making all metadata keys enum values, with "serialization_cpu_times_ns" as part of the metadata. In other words, "serialization_cpu_times_ns" is part of the metadata and the footer section contains only metadata. In this way:

  • All metadata is positional, so we can replace values in the metadata even after the data table is serialized. (The bytes of a Map<String, String> are not positional because the iteration order of a hash map is not guaranteed to be stable, whereas the iteration order of an array is deterministic.)
  • Metadata was previously a Map<String, String>, where we need to write the keys (of type string) to the byte buffer. When it is replaced with a String[], we don't write the enum constant itself, just the value (length+bytes) corresponding to the ordinal/position of the constant. So less data is transferred between server and broker.

But if we change it this way, as I previously stated, I vote to keep the current DataTableImplV2.java as it is and create a DataTableImplV3.java for all V3 logic (extracting the common logic into DataTableUtils.java). Otherwise, putting all V2/V3 logic in the same file will make the code hard to read.

Contributor

Let's discuss the approach again by moving the metadata to the end of the payload. I think we both are inclined towards doing that since all the metadata (existing + new) will be together in the footer.

Coming to naming, my initial suggestion of not including version was indeed because they share the logic. So tomorrow if we move to v4 and still share a lot of common logic, we can continue to retain the name DataTableImpl and not DataTableImplv2v3v4 as everything will be in the same file as long as it is readable.

I agree that moving the metadata is a change which will make some code unreadable if we try to keep everything in the same file. So yes, if we go down this path, I agree we should create a new class.

public class DataTableImplV2V3 implements DataTable {
public static final int VERSION_2 = 2;
public static final int VERSION_3 = 3;
public static final int DEFAULT_VERSION = VERSION_3;
Contributor

Change this to CURRENT_VERSION ?

@@ -61,12 +65,15 @@
private final byte[] _variableSizeDataBytes;
private final ByteBuffer _variableSizeData;
private final Map<String, String> _metadata;
// Only V3 has _positionalData
private final String[] _positionalData;
Contributor

I would suggest calling this footer and add some comments on the structure of footer. Please give some example as well.

Contributor

Also update the javadocs in class DataTableBuilder because that's where the structure of the file is listed


/**
* Construct data table with results. (Server side)
*/
public DataTableImplV2(int numRows, DataSchema dataSchema, Map<String, Map<Integer, String>> dictionaryMap,
byte[] fixedSizeDataBytes, byte[] variableSizeDataBytes) {
public DataTableImplV2V3(int version, int numRows, DataSchema dataSchema,
Contributor

Are we passing version number to the constructor so that we can do backward compatibility tests between V2 and V3 ? Other than tests, I don't see why server should decide a version. It should always write the data table with CURRENT_VERSION


dataOutputStream.writeInt(_positionalData.length);
for (String entry : _positionalData) {
byte[] bytes = StringUtil.encodeUtf8(entry);
Contributor

Some comments on the format here would be useful. We don't write the enum constant itself. Just the value (length+bytes) corresponding to the ordinal/position of the constant. Correct ?

Contributor Author

yes, your understanding is correct.

@@ -344,6 +395,20 @@ public void addException(ProcessingException processingException) {
return byteArrayOutputStream.toByteArray();
}

private byte[] serializePositionalData()
Contributor

This is actually not doing the serialization to the main output stream opened by the caller toBytes().
Like the other serialization functions, this function first writes to a temporary output stream and then converts it to a byte array, which is returned to the caller and written to the main stream. I think the reason for doing that is that we don't know upfront the length of the byte[] array to allocate.

However, for this footer we can probably do it differently, and it might be faster:

  • Write a loop to go over each entry and keep a running sum of size
  • At the end of loop, allocate byte array of that size
  • Start another loop and go over each entry again and fill out the pre-allocated byte array.
  • Return the filled byte array

This avoids the unnecessary creation of streams at lines 400 and 401, writing to them, and then converting to a byte array. We can write directly to the byte array, which I think can be faster.
For the other serialization functions that follow this approach, we can fix them later outside this PR if need be.
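The two-pass idea in the list above can be sketched as follows. This is only an illustration under assumptions: the class name `TwoPassFooterWriter` is hypothetical, and the layout (an entry count followed by length-prefixed UTF-8 values) mirrors the loop shown in the diff rather than the final merged code.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the PR.
final class TwoPassFooterWriter {
  /** Serializes each value as (int length, UTF-8 bytes) into an exactly sized array. */
  static byte[] serialize(String[] values) {
    byte[][] encoded = new byte[values.length][];
    int size = Integer.BYTES; // leading entry count
    // Pass 1: encode each entry once and keep a running sum of the bytes needed.
    for (int i = 0; i < values.length; i++) {
      encoded[i] = values[i].getBytes(StandardCharsets.UTF_8);
      size += Integer.BYTES + encoded[i].length;
    }
    // Pass 2: fill the pre-allocated buffer; ByteBuffer writes big-endian by default.
    ByteBuffer buffer = ByteBuffer.allocate(size);
    buffer.putInt(values.length);
    for (byte[] bytes : encoded) {
      buffer.putInt(bytes.length);
      buffer.put(bytes);
    }
    return buffer.array();
  }
}
```

Whether this is actually faster than the stream-based version would need the benchmark discussed in this thread.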

Contributor Author

I will write a benchmark to compare these two serialization approaches. If the proposed approach is better, I will send a PR to address it. Created an issue to track this: #6714

Contributor

Let's not spend time on that optimization now. It can always be done later and is not a must-have for this change.

if (version == VERSION_3 && byteBuffer.hasRemaining()) {
int positionalDataStart = variableSizeDataStart + variableSizeDataLength;
int positionalDataLength = byteBuffer.remaining();
byteBuffer.position(positionalDataStart);
Contributor

Since we are using byteBuffer.remaining() to compute the length of positional data, it implies we are treating it as a footer of a specific format (name-value pairs as defined in the enum) even though we are not calling it that. So, technically, no other structure can come after it, as we would fail to distinguish between the positional data and whatever comes after it. I don't think we should limit that flexibility. Even if we call this a footer, let's please write the length of the footer as well before line 348.

Contributor Author

Done. The length of the footer is now written into the header.
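A minimal sketch of why an explicit footer length in the header helps: the reader slices the footer by (start, length) instead of relying on remaining(), so other data can follow it. The two-int header and the class name `FooterBoundsSketch` are assumptions for illustration, not the actual DataTable header layout.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical layout: [footerStart:int][footerLength:int][body][footer][extra].
final class FooterBoundsSketch {
  static final int HEADER_SIZE = 2 * Integer.BYTES;

  static byte[] write(byte[] body, byte[] footer, byte[] extra) {
    ByteBuffer buffer = ByteBuffer.allocate(HEADER_SIZE + body.length + footer.length + extra.length);
    buffer.putInt(HEADER_SIZE + body.length); // absolute start offset of the footer
    buffer.putInt(footer.length);             // explicit footer length
    buffer.put(body).put(footer).put(extra);
    return buffer.array();
  }

  /** Locates the footer from the header, ignoring whatever was written after it. */
  static byte[] readFooter(byte[] bytes) {
    ByteBuffer buffer = ByteBuffer.wrap(bytes);
    int start = buffer.getInt();
    int length = buffer.getInt();
    return Arrays.copyOfRange(bytes, start, start + length);
  }
}
```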

@siddharthteotia
Contributor

siddharthteotia commented Mar 23, 2021

With the addition of new data structure in this PR, there are essentially two places in DataTable where the key-value / name-value style structure is located.

  • First is the existing DataTable metadata which is also a series of key-value pairs where key is string and value is some statistic/metric. This is towards the beginning of the byte stream
  • Second is the structure introduced in this PR which is written as a footer.

Since we are anyway bumping up the version, how about we move the existing metadata of key-value pairs to the end of file to keep consistency in the format. So, all the metadata stuff (aka key-value pairs) + new positional stuff can be a file footer.

@siddharthteotia
Contributor

siddharthteotia commented Mar 23, 2021

With this PR, we should resolve a couple of TODOs introduced in PR #6680

  • Expose the serialization time through an API at the DataTable level and log it in QueryScheduler. You need to serialize before the logging line. Currently it is after.
  • Revisit this. The execution CPU time is not yet serialized as part of the metadata. Maybe we can just remove line 258.

@mcvsubbu
Contributor

Any reason we are restricting the trailer (or footer) to have only key-value pairs? We don't need to place that restriction as long as the length is also encoded up front. It can be any serialized object, right?

@mqliang
Contributor Author

mqliang commented Mar 23, 2021

@mcvsubbu

Any reason we are restricting the trailer (or footer) to have only key-value pairs? We don't need to place that restriction as long as the length is also encoded up front. It can be any serialized object, right?

You are right, it can be any serialized object, but restricting it to contain only KV pairs has the following benefits:

  • Any object can be added as a KV pair: just (key, serialized_object). So it's easy to add a new section to the footer in the future.
  • For all KV pairs in the footer, we put their keys in an enum, so when serializing the footer, the order of the KV pairs is deterministic. This makes all KV pairs positional/locatable, so we are able to replace the value of a given key in the footer even after serialization.
  • If we want to add a new object to the data table and we are OK with putting it into the footer as a KV pair, we don't need to bump up the version.

Here is the pseudocode to serialize/de-serialize the footer:
enum footerkeys {
	k0,
	k1,
	k2,
}

String[] footerkeysToStr = new String[]{
	"k0",
	"k1",
	"k2",
}

function byte[] serializeFooter() {
	byte[] bytes;
	for (key in footerkeys) {
	    // each value is written as (length, bytes), in enum order
	    String data = encode_to_str(value_of_key(key));
	    bytes = append(bytes, len(data.toBytes()));
	    bytes = append(bytes, data.toBytes());
	}
	return bytes;
}

function String[] deSerializeFooter(byte[] bytes) {
	String[] values = new String[len(footerkeys)];
	for (int i = 0; i < len(footerkeys); i++) {
	   int data_len = bytes.nextInt();
	   values[i] = bytes.nextBytesofLens(data_len);
	}
	return values;
}

// If values[i] is a complex object instead of a string, we can deserialize it even further:
String[] footerKVpairs = deSerializeFooter(bytes);
Object_i = deserialize(footerKVpairs[i].toBytes());

So, if we want to add a new object to the footer, we add it as a KV pair; as long as its key is added as the last entry of the enum, an old broker will just ignore the extra one, so the change is backward-compatible.
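The pseudocode and the backward-compatibility claim above can be sketched in Java roughly as follows. The class `OrderedFooterSketch`, the `FooterKey` names, and the `knownKeys` parameter are hypothetical; the point is only that a reader that knows fewer keys stops early and never touches values appended by a newer writer.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class OrderedFooterSketch {
  // Hypothetical keys; new keys may only be appended at the end of the enum.
  enum FooterKey { K0, K1, K2 }

  /** Writes each value as (int length, UTF-8 bytes) in enum order; no key is written. */
  static byte[] serialize(String[] valuesInKeyOrder) {
    byte[][] encoded = new byte[valuesInKeyOrder.length][];
    int size = 0;
    for (int i = 0; i < encoded.length; i++) {
      encoded[i] = valuesInKeyOrder[i].getBytes(StandardCharsets.UTF_8);
      size += Integer.BYTES + encoded[i].length;
    }
    ByteBuffer buffer = ByteBuffer.allocate(size);
    for (byte[] bytes : encoded) {
      buffer.putInt(bytes.length).put(bytes);
    }
    return buffer.array();
  }

  /** Reads the first knownKeys values and ignores anything a newer writer appended. */
  static String[] deserialize(byte[] bytes, int knownKeys) {
    ByteBuffer buffer = ByteBuffer.wrap(bytes);
    String[] values = new String[knownKeys];
    for (int i = 0; i < knownKeys; i++) {
      byte[] data = new byte[buffer.getInt()];
      buffer.get(data);
      values[i] = new String(data, StandardCharsets.UTF_8);
    }
    return values;
  }
}
```

A current reader would pass `FooterKey.values().length` as `knownKeys`; an older reader would pass its own, smaller enum size.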

If we make the footer contain not only KV pairs but also other arbitrary serializable objects:

+------------------------------------+
|     
|    serializable object 1
|
+------------------------------------
|
|    serializable object 2
|
+------------------------------------
|
|    KV pairs
|
+------------------------------------

It's not extensible: if we want to add a serializable_object_3 between serializable_object_2 and KV_pairs, we need to bump up the version (and if we bump the version, we can also add it to the middle of the data table, not necessarily to the footer).

That's the reason I prefer a footer that contains only KV pairs: if we want to add a new simple section to the data table and don't want to bump up the version, we add it as a KV pair to the footer. If we want to add a new, very complex section or re-arrange the current sections, we add it to the middle of the data table and bump up the version.

@siddharthteotia
Contributor

KV pair might be misleading here. Within a KV pair, the value part is indeed an arbitrary serialized object. The KV concept in the footer is just to give it some structure. So, we can keep growing the footer by adding a key to the enum and then the corresponding serialized bytes to the payload.

@mcvsubbu
Contributor

@siddharthteotia , @mqliang and I met, and agreed on the following (I have added some extras, so take a look)

  • We will move the metadata to the trailer, retain the other elements in the same order.
  • We will encode the trailer as (int, int, blob)+
  • The first int is the enum ordinal, second int is the length of the blob, the third part is utf8 encoding of a string, or int/long as dictated by the enum. If int/long, then we will encode in network byte order (big-endian). Alternative is to convert it to a string.
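As a rough sketch of the (int, int, blob)+ encoding agreed above, with every value carried as a UTF-8 string blob: the `TrailerKey` names are hypothetical, and skipping ordinals the reader does not know is an assumption of this sketch, made possible by the length prefix.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.EnumMap;
import java.util.Map;

final class TrailerTlvSketch {
  enum TrailerKey { NUM_DOCS_SCANNED, RESPONSE_SER_CPU_TIME_NS } // hypothetical keys

  /** Encodes every entry as (int ordinal, int blobLength, UTF-8 blob). */
  static byte[] serialize(Map<TrailerKey, String> entries) {
    int size = 0;
    for (String value : entries.values()) {
      size += 2 * Integer.BYTES + value.getBytes(StandardCharsets.UTF_8).length;
    }
    ByteBuffer buffer = ByteBuffer.allocate(size);
    for (Map.Entry<TrailerKey, String> entry : entries.entrySet()) {
      byte[] blob = entry.getValue().getBytes(StandardCharsets.UTF_8);
      buffer.putInt(entry.getKey().ordinal()).putInt(blob.length).put(blob);
    }
    return buffer.array();
  }

  /** Decodes entries; ordinals this reader does not know are skipped via the length prefix. */
  static Map<TrailerKey, String> deserialize(byte[] bytes) {
    TrailerKey[] keys = TrailerKey.values();
    Map<TrailerKey, String> result = new EnumMap<>(TrailerKey.class);
    ByteBuffer buffer = ByteBuffer.wrap(bytes);
    while (buffer.hasRemaining()) {
      int ordinal = buffer.getInt();
      byte[] blob = new byte[buffer.getInt()];
      buffer.get(blob);
      if (ordinal >= 0 && ordinal < keys.length) {
        result.put(keys[ordinal], new String(blob, StandardCharsets.UTF_8));
      }
    }
    return result;
  }
}
```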

@siddharthteotia
Contributor

I think (int, int, bytes/blob in utf-8) is preferable as opposed to converting to string

@mcvsubbu
Contributor

I think (int, int, bytes/blob in utf-8) is preferable as opposed to converting to string

Correct, but if the "blob" is an int or long value, then utf8 will mean long ->string->utf8 right? Alternatively, toBigEndian(longValue)

@mqliang
Contributor Author

mqliang commented Mar 24, 2021

@mcvsubbu I just found a defect in using enum values as keys and encoding the trailer as (int, int, bytes/blob in utf-8):

  • We are able to add a new key to the enum without bumping up the version.
  • We are able to omit a key from the trailer without bumping up the version.
  • However, we are unable to remove a key from the enum (if the key is no longer used in a future version).

Namely, say we now have three keys:

// old version:
enum {
    key1,
    key2,
    key3,
}

Now suppose we remove key2 from the enum since it is no longer used.

// new version
enum {
    key1,
    key3,
}

Then, when a new broker receives bytes from an old server, it will interpret the value of key2 as the value of key3.
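The ordinal shift behind this defect can be reproduced in a few lines. The two enums below are hypothetical stand-ins for the old and new versions of the key enum.

```java
final class OrdinalShiftSketch {
  enum OldKeys { KEY1, KEY2, KEY3 } // what the old server writes with
  enum NewKeys { KEY1, KEY3 }       // KEY2 removed, so KEY3's ordinal shifts from 2 to 1

  /** Resolves an ordinal read from the wire the way a new broker would. */
  static String decodeWithNewKeys(int ordinalOnWire) {
    return NewKeys.values()[ordinalOnWire].name();
  }
}
```

Calling `decodeWithNewKeys(OldKeys.KEY2.ordinal())` resolves to KEY3, so the value written for KEY2 is misread; the ordinal the old server writes for KEY3 (2) is out of range for the new enum altogether.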

So a better solution is to use a string as the key and encode the trailer as (int of key length, bytes of key in utf-8, int of value length, bytes of value in utf-8), which is exactly how we encode metadata in V2.

However, if we do it this way, it is equivalent to just moving the metadata section to the end of the data table, and it does not make much sense to bump up the version just to rearrange sections in the data table.

Let's take a step back to what we want to solve:

  • We want to add serialization_cost to the data table, but serialization_cost is not available before serialization.
  • We want to keep backward compatibility.

To add serialization_cost to the data table after serialization, we basically have two options:

  • Append it to the end of the bytes.
  • Put a temporary value for serialization_cost during serialization and, after serialization is done, replace it with the actual value.

So, here is another approach:

  • Don't add a trailer section.
  • Put serialization_cost into the metadata.
  • When we serialize metadata in V2, we encode it as (int of key length, bytes of key in utf-8, int of value length, bytes of value in utf-8). Encoding it this way makes value replacement after serialization impossible, since String.valueOf(1000).length() != String.valueOf(100000).length().
  • In V3, keep all existing logic. However, if the value is a long, we should encode it as (int of key length, bytes of key in utf-8, toBigEndian(longValue)). In the serializeMetadata() function, we can keep a variable that records the start offset of the serialization_cost value:
byte[] bytes;
int serialization_cost_value_start_offset;

offset = 0;
for (String key: metadata.keySet()) {
      keybytes[] = to-utf8(key);
      bytes.append(keybytes.length())
      bytes.append(keybytes)

      offset += 4;
      offset += keybytes.length

      if (key.equals("serialization_cost")) {
            serialization_cost_value_start_offset = offset;
            valuebytes = toBigEndian(value);
            bytes.append(valuebytes)
            offset += 8;
      } else {
            valuebytes = to-utf8(value);
            bytes.append(valuebytes.length())
            bytes.append(valuebytes)
            offset += 4
            offset += valuebytes.length
      }
}

So after serialization, we are able to replace the value of serialization_cost (toBigEndian(longValue) is always 8 bytes, which makes replacement possible):

offset = metadataStartOffset+serialization_cost_value_start_offset
bytes[offset:offset+8] = toBigEndian(actualValue)
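A small Java sketch of the in-place replacement described above, for a single entry. The class and method names are hypothetical; it relies only on the fact that a long always occupies 8 bytes, so overwriting it never changes the total length.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class PatchLongSketch {
  /** Writes (int keyLength, UTF-8 key bytes, 8-byte big-endian long). */
  static byte[] serialize(String key, long placeholder) {
    byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
    return ByteBuffer.allocate(Integer.BYTES + keyBytes.length + Long.BYTES)
        .putInt(keyBytes.length)
        .put(keyBytes)
        .putLong(placeholder)
        .array();
  }

  /** Offset of the long value relative to the start of the entry. */
  static int valueOffset(String key) {
    return Integer.BYTES + key.getBytes(StandardCharsets.UTF_8).length;
  }

  /** Overwrites the 8-byte value in place; the array length is unchanged. */
  static void patch(byte[] bytes, int offset, long actualValue) {
    ByteBuffer.wrap(bytes).putLong(offset, actualValue);
  }
}
```

In a real data table the recorded offset would also have to account for where the metadata section starts, as the `metadataStartOffset` term above indicates.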

Contributor

@Jackie-Jiang Jackie-Jiang left a comment

High level question: why do we need this new field? We should be able to use the metadata field for this

@mqliang
Contributor Author

mqliang commented Mar 24, 2021

@Jackie-Jiang

High level question: why do we need this new field? We should be able to use the metadata field for this

We want to measure the CPU time spent serializing the data table (AKA serialization_cost) on each server and send it back to the broker. Here is the dilemma: we only know the CPU time after serialization has completed, but once serialization has completed, how can we make serialization_cost part of the payload? (It's a chicken-and-egg problem.)

To add serialization_cost to the serialized bytes of the data table, we basically have two options (we don't want to serialize twice):

  • Append it to the end of the bytes.
  • Put a temporary value for serialization_cost during serialization and, after serialization is done, replace it with the actual value.

No matter which option we adopt, we need to bump up the version.
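To make the first option concrete, here is a hedged sketch that measures the current thread's CPU time around a serializer and appends the cost as a trailing 8-byte long. Using `ThreadMXBean` is an assumption about how the CPU time is obtained, the `System.nanoTime()` fallback is illustrative only, and `SerializationCostSketch` is a hypothetical name.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.nio.ByteBuffer;
import java.util.function.Supplier;

final class SerializationCostSketch {
  private static final ThreadMXBean THREAD_MX_BEAN = ManagementFactory.getThreadMXBean();

  /** CPU time of the current thread in ns; falls back to wall-clock time if unsupported. */
  static long cpuTimeNs() {
    return THREAD_MX_BEAN.isCurrentThreadCpuTimeSupported()
        ? THREAD_MX_BEAN.getCurrentThreadCpuTime()
        : System.nanoTime();
  }

  /** Runs the serializer once and appends the measured cost after the payload. */
  static byte[] serializeAndAppendCost(Supplier<byte[]> serializer) {
    long start = cpuTimeNs();
    byte[] payload = serializer.get();
    long cost = cpuTimeNs() - start;
    return ByteBuffer.allocate(payload.length + Long.BYTES).put(payload).putLong(cost).array();
  }
}
```

The payload is serialized only once; the cost is known only afterwards, which is why it can be appended or patched in but not written in the normal order.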

@mcvsubbu
Contributor

Not sure which option @siddharthteotia agrees with, but the alternatives are something like:
7, 8, "12609856" (8 byte string for a number)
vs
7, 4, 12609856 (4-byte integer for a number)

Maybe we can decide based on what looks easier in code.

@mcvsubbu
Contributor

  • Removing enums will break the protocol and is not allowed. We need to state in the comments clearly.
  • We should use a trailer instead of hacking the length. This will be applicable for streaming use cases as well

@mqliang mqliang changed the title from "Add a positional data section to data table and measure data table serialization cost on server" to "Add a trailer section to data table and measure data table serialization cost on server" Mar 24, 2021
@mqliang
Contributor Author

mqliang commented Mar 24, 2021

@mcvsubbu Ready for another round of review.

commit of "implement datatable V3":

  • Add DataTableImplV3. Compared with V2:

    • V3 has a trailer section at the end of the data table.
    • V3 doesn't have a metadata section; all KV pairs are put into the trailer section.
    • V3 has an exceptions section in the middle of the data table. V2 uses metadata to store exceptions (with
      "Exception"+errCode as the key). In V3, all keys are enum values, which must be defined statically, so we cannot use
      "Exception"+errCode to create new keys; instead, a dedicated section stores exceptions.
  • Although the metadata section has been removed in V3, a lot of existing code uses dataTable.getMetadata().get("key")/dataTable.getMetadata().set("key", "value") to get/set metadata KV pairs. To provide the same interface as V2, V3 also implements the getMetadata() method. When serializing, all metadata is moved into the trailer section; when deserializing, all metadata KV pairs in the trailer section are moved into the metadata map.

  • When serializing the trailer section, for each KV pair:

    • if value is int/long, encode it as: [keyOrdinal, bigEndianRepresentationOfValue]
    • if value is string, encode it as: [keyOrdinal, valueLength, Utf8EncodedValue]
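The two encodings listed above could look roughly like this. The `TrailerKey` constants and the per-key `isLong` flag are hypothetical; the sketch assumes the reader learns each value's type from the enum, which is why long values need no length prefix.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.EnumMap;
import java.util.Map;

final class TypedTrailerSketch {
  enum TrailerKey {
    TABLE(false), RESPONSE_SER_CPU_TIME_NS(true); // hypothetical keys

    final boolean isLong;

    TrailerKey(boolean isLong) {
      this.isLong = isLong;
    }
  }

  static void write(ByteBuffer out, TrailerKey key, String value) {
    out.putInt(key.ordinal());
    if (key.isLong) {
      out.putLong(Long.parseLong(value)); // [keyOrdinal, 8-byte big-endian value]
    } else {
      byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
      out.putInt(bytes.length).put(bytes); // [keyOrdinal, valueLength, UTF-8 value]
    }
  }

  static Map<TrailerKey, String> readAll(ByteBuffer in) {
    Map<TrailerKey, String> result = new EnumMap<>(TrailerKey.class);
    while (in.hasRemaining()) {
      TrailerKey key = TrailerKey.values()[in.getInt()];
      if (key.isLong) {
        result.put(key, Long.toString(in.getLong()));
      } else {
        byte[] bytes = new byte[in.getInt()];
        in.get(bytes);
        result.put(key, new String(bytes, StandardCharsets.UTF_8));
      }
    }
    return result;
  }
}
```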

To make review easier, I will @-mention you where V3 differs from V2.

commit of "add responseSerializationCpuTimeNs measurement":

  • Put a temporary value for serialization_cost during serialization and, after serialization is done, replace it with the actual value.

* TODO(@mqliang): revise this if we decide to get/set metadata by
* datable.getTailerData(key)/datable.setTailer(key, value).
*/
private final Map<String, String> _metadata;
Contributor Author

All metadata KV pairs are stored in the trailer in V3. However, to provide the same interface as V2, V3 also implements the Map<String, String> getMetadata() method. We need to copy KV pairs between _metadata and _trailer during serialization/deserialization.
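The copy between the string-keyed metadata map and the enum-keyed trailer might be sketched as below. The key names and the `metadataName` field are hypothetical, and dropping metadata keys that have no enum constant is an assumption of this sketch rather than confirmed behavior of the PR.

```java
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;

final class MetadataTrailerBridgeSketch {
  // Hypothetical keys; each constant carries the string name used in the metadata map.
  enum TrailerKey {
    NUM_DOCS_SCANNED("numDocsScanned"),
    TIME_USED_MS("timeUsedMs");

    final String metadataName;

    TrailerKey(String metadataName) {
      this.metadataName = metadataName;
    }
  }

  /** Before serialization: copy known metadata entries into the enum-keyed trailer. */
  static Map<TrailerKey, String> toTrailer(Map<String, String> metadata) {
    Map<TrailerKey, String> trailer = new EnumMap<>(TrailerKey.class);
    for (TrailerKey key : TrailerKey.values()) {
      String value = metadata.get(key.metadataName);
      if (value != null) {
        trailer.put(key, value);
      }
    }
    return trailer;
  }

  /** After deserialization: rebuild the string-keyed map that V2 callers expect. */
  static Map<String, String> toMetadata(Map<TrailerKey, String> trailer) {
    Map<String, String> metadata = new HashMap<>();
    for (Map.Entry<TrailerKey, String> entry : trailer.entrySet()) {
      metadata.put(entry.getKey().metadataName, entry.getValue());
    }
    return metadata;
  }
}
```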

// Write trailer data (START|SIZE).
dataOutputStream.writeInt(dataOffset);
// Put all meta data into trailer.
_trailer = putAllMetaDataIntoTrailer();
Contributor Author

@mcvsubbu Before serializing _trailer, we need to copy all KV pairs in metadata into the trailer.

/**
* Construct data table from V2 byte array. (broker side)
*/
public DataTableImplV3(ByteBuffer byteBuffer, boolean isV2)
Copy link
Contributor Author

@mcvsubbu This function deserializes V2 bytes into a V3 data table object.

* Metadata is actually a part of _trailer in V3 when serialize DataTable into bytes. When deserialize,
* we extract metadata from _trailer into this _metadata map to provide the same interface with V2.
* */
_metadata = extractMetadataFormTrailer();
Copy link
Contributor Author

@mcvsubbu After deserializing _trailer, we need to copy all metadata KV pairs in _trailer into _metadata.

* V2 stores exceptions as a bunch of KV pairs in metadata, all exceptions has key of "Exception"+errCode.
* To interpret V2 bytes as V3 object, extract exceptions from metadata.
*/
_exceptions = extractExceptionsFormV2Metadata();
Copy link
Contributor Author

@mcvsubbu V2 stores exceptions as a set of KV pairs in metadata; every exception has a key of "Exception"+errCode. To interpret V2 bytes as a V3 object, extract the exceptions from metadata and put them into _exceptions.

* - if value is int/long, encode it as: [keyOrdinal, bigEndianRepresentationOfValue]
* - if value is string, encode it as: [keyOrdinal, valueLength, Utf8EncodedValue]
*/
private byte[] serializeTrailer()
Copy link
Contributor Author

@mcvsubbu This is the code that serializes the trailer.

return byteArrayOutputStream.toByteArray();
}

private Map<TrailerKeys, String> deserializeTrailer(byte[] bytes)
Copy link
Contributor Author

@mcvsubbu This is the code that deserializes the trailer.

@Jackie-Jiang
Copy link
Contributor

Since we are adding a new data table version, please use this opportunity to address the TODOs within the DataTableBuilder.
For the TrailerKeys enum, let's put an id for each key instead of using the ordinal of the enum. This way it is much easier to manage as long as we don't reuse the ids. Also suggest renaming it to MetadataKeys

/**
* Construct data table from V2 byte array. (broker side)
*/
public DataTableImplV3(ByteBuffer byteBuffer, boolean isV2)
Copy link
Contributor

It might be cleaner if you add a data table v2 to v3 converter instead of constructing v3 directly from v2 buffer

@siddharthteotia
Copy link
Contributor

@siddharthteotia , @mqliang and I met, and agreed on the following (I have added some extras, so take a look)

  • We will move the metadata to the trailer, retain the other elements in the same order.
  • We will encode the trailer as (int, int, blob)+
  • The first int is the enum ordinal, second int is the length of the blob, the third part is utf8 encoding of a string, or int/long as dictated by the enum. If int/long, then we will encode in network byte order (big-endian). Alternative is to convert it to a string.

Not sure which option @siddharthteotia agrees with, but the alternatives are something like:
7, 8, "12609856" (8 byte string for a number)
vs
7, 4, 12609856 (4-byte integer for a number)

Maybe we can decide based on what looks easier in code.

@mcvsubbu I agree with the big-endian approach when the value/blob part itself is a fixed-width int or long.

@siddharthteotia
Copy link
Contributor

@mcvsubbu Just found a defect in using enum values as keys and encoding the trailer as (int, int, bytes/blob in UTF-8):

  • We are able to add a new key to the enum without bumping up the version
  • We are able to omit a key from the trailer without bumping up the version
  • However, we are unable to remove a key from the enum (if the key is no longer used in a future version)

Namely, say we now have three keys:

// old version:
enum {
    key1,
    key2,
    key3,
}

Now suppose we remove key2 from the enum because it is no longer used.

// new version
enum {
    key1,
    key3,
}

Then, when a new broker receives bytes from an old server, it will interpret the value of key2 as the value of key3.
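
The misread described above can be reproduced in a few lines. This is an illustrative sketch only; `OldKeys`, `NewKeys`, and `decodeWithNewEnum` are hypothetical names standing in for the sender's and receiver's versions of the key enum.

```java
// Hypothetical demo of how removing an enum constant shifts later ordinals.
final class OrdinalShiftSketch {
  enum OldKeys { KEY1, KEY2, KEY3 }   // enum compiled into the old server
  enum NewKeys { KEY1, KEY3 }         // enum compiled into the new broker

  // Resolve an ordinal written by an old sender against the new enum.
  static String decodeWithNewEnum(int ordinalFromOldSender) {
    NewKeys[] values = NewKeys.values();
    if (ordinalFromOldSender < 0 || ordinalFromOldSender >= values.length) {
      return "<unknown>";
    }
    return values[ordinalFromOldSender].name();
  }
}
```

Here the ordinal the old server wrote for `KEY2` (1) resolves to `KEY3` on the new broker, and the ordinal it wrote for `KEY3` (2) no longer resolves at all.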

So a better solution is to use a string as the key and encode the trailer as (int of key length, bytes of key in UTF-8, int of value length, bytes of value in UTF-8), which is exactly how we encode metadata in V2.

However, if we do it this way, it is equivalent to just moving the metadata section to the end of the data table, and it does not make much sense to bump up the version just to rearrange sections in the data table.

Let's take a step back to what we want to solve:

  • we want to add serialization_cost to the data table, but serialization_cost is not available before serialization.
  • we want to keep backward compatibility

To add serialization_cost to datatable after serialization, basically we have two options:

  • append it to the end of bytes.
  • write a temporary value for serialization_cost during serialization; once serialization is done, replace it with the actual value.

So, here is another approach:

  • don't add a trailer section
  • put serialization_cost into metadata
  • when we serialize metadata in V2, we encode it as (int of key length, bytes of key in UTF-8, int of value length, bytes of value in UTF-8). Encoding it this way makes value replacement after serialization impossible, since String.valueOf("1000").length() != String.valueOf("100000").length().
  • In V3, keep all existing logic. However, if the value is a long, we should encode it as (int of key length, bytes of key in UTF-8, toBigEndian(longValue)). In the serializeMetadata() function, we can keep a variable that records the start offset of serialization_cost.
byte[] bytes;
int serialization_cost_value_start_offset;

offset = 0;
for (String key: metadata.keySet()) {
      keybytes[] = to-utf8(key);
      bytes.append(keybytes.length())
      bytes.append(keybytes)

      // 4 bytes for the key length, then the key itself
      offset += 4;
      offset += keybytes.length

      if (key.equals("serialization_cost")) {
            // remember where the fixed-width 8-byte value starts
            serialization_cost_value_start_offset = offset;
            valuebytes = toBigEndian(value);
            bytes.append(valuebytes)
            offset += 8;
      } else {
            valuebytes = to-utf8(value);
            bytes.append(valuebytes.length())
            bytes.append(valuebytes)
            // 4 bytes for the value length, then the value itself
            offset += 4
            offset += valuebytes.length
      }
}

So after serialization, we are able to replace the value of serialization_cost (toBigEndian(longValue) is always 8 bytes, which makes replacement possible):

offset = metadataStartOffset+serialization_cost_value_start_offset
bytes[offset:offset+8] = toBigEndian(actualValue)
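
In Java, the in-place replacement above could look roughly like the following, assuming the serialized table is held in a `byte[]` and the value offset was recorded during serialization. `PlaceholderPatchSketch` and its methods are hypothetical names, not code from this PR.

```java
import java.nio.ByteBuffer;

// Hypothetical helper: overwrite a fixed-width placeholder after serialization.
final class PlaceholderPatchSketch {
  // Overwrites the 8 bytes at valueOffset with actualValue (big-endian by default).
  static void patchLong(byte[] serialized, int valueOffset, long actualValue) {
    ByteBuffer.wrap(serialized).putLong(valueOffset, actualValue);
  }

  // Reads back the 8-byte big-endian value at valueOffset.
  static long readLong(byte[] serialized, int valueOffset) {
    return ByteBuffer.wrap(serialized).getLong(valueOffset);
  }
}
```

The absolute `putLong(index, value)` overload writes exactly 8 bytes and leaves the rest of the array untouched, so the total length of the serialized bytes does not change.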

@mqliang @mcvsubbu I don't think we should worry about or even allow removal of enums. It is complicating the design plus it's something that is typically not allowed

@siddharthteotia
Copy link
Contributor

With the addition of new data structure in this PR, there are essentially two places in DataTable where the key-value / name-value style structure is located.

  • First is the existing DataTable metadata which is also a series of key-value pairs where key is string and value is some statistic/metric. This is towards the beginning of the byte stream
  • Second is the structure introduced in this PR which is written as a footer.

Since we are anyway bumping up the version, how about we move the existing metadata of key-value pairs to the end of file to keep consistency in the format. So, all the metadata stuff (aka key-value pairs) + new positional stuff can be a file footer.

With this PR, we should resolve a couple of TODOs introduced in PR #6680

  • Expose the serialization time through an API at the DataTable level and log it in QueryScheduler. You need to serialize before the logging line. Currently it is after.
  • Revisit this. The execution cpu time is not yet serialized as part of metadata. Maybe we can just remove line 258.

@mqliang , please make sure to address these TODOs

@mcvsubbu mcvsubbu added the release-notes Referenced by PRs that need attention when compiling the next release notes label Mar 30, 2021
@mcvsubbu
Copy link
Contributor

I have labelled it as backward-incompat and release-notes. Please add appropriate checkin comments mentioning that this change will be backward incompat if servers are upgraded first, so brokers must be upgraded before servers.
Also mention that the compatibility of the protocols will not be retained beyond 0.8.0 (or the next version that is released), create an issue so that we remove all V2 protocol code after 0.8.0 is released.

Copy link
Contributor

@mcvsubbu mcvsubbu left a comment

Add a test for metadata section serialize/deserialize. Be sure to examine the actual bytes in the test and not just call deserialize code. Thanks.

@@ -321,6 +321,9 @@
public static final String CONFIG_OF_ENABLE_THREAD_CPU_TIME_MEASUREMENT =
"pinot.server.instance.enableThreadCpuTimeMeasurement";
public static final boolean DEFAULT_ENABLE_THREAD_CPU_TIME_MEASUREMENT = false;

public static final String CONFIG_OF_CURRENT_DATA_TABLE_VERSION = "pinot.server.instance.currentDataTableVersion";
Copy link
Contributor

We can retain this config forever, to be used for upgrading the protocol.

Copy link
Contributor

Note that by default the protocol version is the latest (3). The config will be used to downgrade the protocol to 2 without having to roll back the server deployment in case there are any issues.

_dataSchema = dataSchema;
_columnOffsets = new int[dataSchema.size()];
_rowSizeInBytes = DataTableUtils.computeColumnOffsets(dataSchema, _columnOffsets);
}

public static void setCurrentDataTableVersion(int version) {
_version = version;
Copy link
Contributor

Throw exception if it is not one of the supported versions
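
One way the requested check might look, as a sketch. The class name is hypothetical; the constants mirror the `VERSION_2`/`VERSION_3` fields shown elsewhere in this diff, and the exception type is an assumption.

```java
// Hypothetical stand-in for the version holder in DataTableBuilder.
final class DataTableVersionSketch {
  static final int VERSION_2 = 2;
  static final int VERSION_3 = 3;
  private static int _version = VERSION_3;

  // Rejects anything other than the supported versions before updating the field.
  static void setCurrentDataTableVersion(int version) {
    if (version != VERSION_2 && version != VERSION_3) {
      throw new IllegalArgumentException("Unsupported data table version: " + version);
    }
    _version = version;
  }

  static int getCurrentDataTableVersion() {
    return _version;
  }
}
```

Validating before the assignment means a bad config value leaves the previously configured version in place.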

Copy link
Contributor Author

done

* Datatable V3 implementation.
* The layout of serialized V3 datatable looks like:
* +-----------------------------------------------+
* | 13 bytes of header: |
Copy link
Contributor

Suggested change
* | 13 bytes of header: |
* | 13 integers of header: |

Copy link
Contributor Author

done

public class DataTableImplV3 extends DataTableImplBase {
private static final int HEADER_SIZE = Integer.BYTES * 13;
// _exceptions stores exceptions as a map of errorCode->errorMessage
private final Map<Integer, String> _exceptions;
Copy link
Contributor

Suggested change
private final Map<Integer, String> _exceptions;
private final Map<Integer, String> _errCodeToExceptionMap;


// Read metadata.
int metadataLength = byteBuffer.getInt();
byte[] trailerBytes = new byte[metadataLength];
Copy link
Contributor

Suggested change
byte[] trailerBytes = new byte[metadataLength];
byte[] metadataBytes = new byte[metadataLength];

Let us keep the naming consistent.

Copy link
Contributor Author

done

}

// Read metadata.
int metadataLength = byteBuffer.getInt();
Copy link
Contributor

Add the case where metadataLength is 0
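
A sketch of what handling that case could look like on the reading side. The class and method names are hypothetical, and the bounds check is an added assumption rather than something requested in this comment.

```java
import java.nio.ByteBuffer;

// Hypothetical reader for a [metadataLength, metadataBytes] section.
final class MetadataLengthSketch {
  private static final byte[] EMPTY = new byte[0];

  static byte[] readMetadataBytes(ByteBuffer buffer) {
    int metadataLength = buffer.getInt();
    if (metadataLength == 0) {
      return EMPTY;  // sender wrote no metadata; nothing to parse
    }
    if (metadataLength < 0 || metadataLength > buffer.remaining()) {
      throw new IllegalArgumentException("Invalid metadata length: " + metadataLength);
    }
    byte[] metadataBytes = new byte[metadataLength];
    buffer.get(metadataBytes);
    return metadataBytes;
  }
}
```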

Copy link
Contributor Author

done

/**
* Serialize metadata section to bytes.
* Format of the bytes looks like:
* [numEntries, bytesOfKV2, bytesOfKV2, bytesOfKV3]
Copy link
Contributor

This description is wrong. The format is:

  • length of metadata section
  • actual metadata
    Metadata can be one of two types -- fixed (Int/long) or var length.
    A fixed length metadata is coded as: (enumOrdinal , metadata value )
    Var length metadata is coded as: (enumOrdinal, metadata length, metadata value)
    All integer values (including ordinal, etc.) are encoded in BigEndian format

Copy link
Contributor Author

Oh, actually the length of the metadata section is written outside of this function, by the caller. So the description of [numEntries, bytesOfKV2, bytesOfKV2, bytesOfKV3] here is correct. I have added comments at the caller to highlight the length-writing logic.
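
Putting the two comments together, a reader for the inner section (after the caller has consumed the length prefix) might be sketched as follows. The two-key enum and the rule that `THREAD_CPU_TIME_NS` carries a long are assumptions made for this example; `TABLE_NAME` is a hypothetical key.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical decoder for [numEntries, entry1, entry2, ...].
final class MetadataDecodeSketch {
  // Assumed keys: TABLE_NAME carries a string, THREAD_CPU_TIME_NS carries a long.
  enum Key { TABLE_NAME, THREAD_CPU_TIME_NS }

  static Map<Key, String> decode(byte[] section) {
    ByteBuffer in = ByteBuffer.wrap(section);  // big-endian by default
    int numEntries = in.getInt();
    Map<Key, String> metadata = new LinkedHashMap<>();
    for (int i = 0; i < numEntries; i++) {
      Key key = Key.values()[in.getInt()];
      if (key == Key.THREAD_CPU_TIME_NS) {
        // Fixed-width entry: (enumOrdinal, value), no length prefix.
        metadata.put(key, Long.toString(in.getLong()));
      } else {
        // Variable-length entry: (enumOrdinal, length, UTF-8 value).
        byte[] utf8 = new byte[in.getInt()];
        in.get(utf8);
        metadata.put(key, new String(utf8, StandardCharsets.UTF_8));
      }
    }
    return metadata;
  }
}
```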

protected ByteBuffer _variableSizeData;
protected Map<String, String> _metadata;

public DataTableImplBase(int numRows, DataSchema dataSchema, Map<String, Map<Integer, String>> dictionaryMap,
Copy link
Contributor

Please add javadoc

Copy link
Contributor Author

done

@@ -91,11 +94,16 @@
private ByteBuffer _currentRowDataByteBuffer;

public DataTableBuilder(DataSchema dataSchema) {
_version = VERSION_3;
Copy link
Contributor

This is not really needed since you already define it at line# 82

Copy link
Contributor Author

done

@@ -138,7 +138,7 @@ public DataTable processQuery(ServerQueryRequest queryRequest, ExecutorService e
String errorMessage = String
.format("Query scheduling took %dms (longer than query timeout of %dms)", querySchedulingTimeMs,
queryTimeoutMs);
- DataTable dataTable = new DataTableImplV2();
+ DataTable dataTable = new DataTableImplV3();
Copy link
Contributor

This seems incorrect since if the protocol config is set to V2, we should not be constructing V3 data table.

I think all these places are constructing empty data table on the server right?

I think we should replace these with DataTableUtils.buildEmptyDataTable() to properly build an empty data table. Also, since DataTableUtils internally uses DataTableBuilder, which is aware of the version, it will build an empty table based on V2 or V3.

Copy link
Contributor

Let's discuss this to see what we need to do here. Might want to cleanup the existing code first to always build empty data table in the same manner. We have 2 options

  • Add a static method to DataTableBuilder -- something like DataTableBuilder.getDefaultTable() , this internally has the version so it will either return new DataTableImplV2() or new DataTableImplV3()
  • Clean the existing code by always using DataTableUtils.buildEmptyDataTable in these situations

For option 2, I am not sure why the existing code (not this PR) is having mixed semantics for constructing empty data table. Several places are directly calling the constructor which sets everything to null whereas in one unique place we are calling DataTableUtils.buildEmptyDataTable(queryContext) to return an empty data table with properly initialized schema

Copy link
Contributor

Discussed this offline with @mqliang. For now we decided to go with option 1. Add a TODO there to follow up with a PR that unifies how empty data tables are constructed everywhere.
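
A rough sketch of option 1, using stand-in types so that it is self-contained. `DataTable`, `DataTableV2`, `DataTableV3`, and `getEmptyDataTable` here are simplified, hypothetical placeholders and not the real Pinot classes or the method name that was eventually merged.

```java
// Hypothetical version-aware factory for empty data tables.
final class EmptyTableFactorySketch {
  interface DataTable { int version(); }

  static final class DataTableV2 implements DataTable {
    public int version() { return 2; }
  }

  static final class DataTableV3 implements DataTable {
    public int version() { return 3; }
  }

  private static int _version = 3;  // default protocol version

  static void setCurrentDataTableVersion(int version) {
    _version = version;
  }

  // Callers no longer pick an implementation; the configured version decides.
  static DataTable getEmptyDataTable() {
    switch (_version) {
      case 2:
        return new DataTableV2();
      case 3:
        return new DataTableV3();
      default:
        throw new IllegalStateException("Unsupported data table version: " + _version);
    }
  }
}
```

With this shape, call sites such as the error paths in this diff would not need to reference a concrete implementation class.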

Copy link
Contributor Author

done

@@ -161,13 +163,15 @@ public void stop() {
queryRequest.getBrokerId(), e);
// For not handled exceptions
serverMetrics.addMeteredGlobalValue(ServerMeter.UNCAUGHT_EXCEPTIONS, 1);
- dataTable = new DataTableImplV2();
+ dataTable = new DataTableImplV3();
Copy link
Contributor

I think we should use DataTableUtils.buildEmptyDataTable()

Copy link
Contributor

Please address this as per approach discussed in #6710

}

@Test
public void testV2V3Compatibility()
Copy link
Contributor

let's also add test cases for

  • v3 data table sent by server is empty
  • v3 data table sent by server has metadata length as 0

Copy link
Contributor Author

v3 data table sent by server has metadata length as 0

That's impossible, since toBytes() in V3 will always add a threadCpuTimeNs KV pair to metadata, so for V3, metadata contains at least 1 KV pair. We can add tests for: an empty data table (numRow = 0); a data table whose metadata only contains the threadCpuTimeNs KV pair; a data table whose metadata has multiple KV pairs, etc.

Copy link
Contributor

@mqliang the idea here is to make sure that the receiver handles things as much as possible even if the sender does something weird (say, someone introduces a bug, or somehow the next rev of the protocol does something funky). See the Robustness principle: https://en.wikipedia.org/wiki/Robustness_principle

DataTable dataTableV2 = dataTableBuilderV2.build(); // create a V2 data table
// Deserialize data table bytes as V3
DataTable newDataTable = DataTableFactory.getDataTable(dataTableV2.toBytes());
Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
Copy link
Contributor

Not sure I follow this test

  • server is constructing a v2 data table and serializing it
  • broker will use DataTableFactory to get the data table. How can broker get it as v3 when the version # will indicate 2 and DataTableFactory will accordingly create DataTableImplV2 ?

Copy link
Contributor Author

Oh, that's from the previous implementation, where we had a converter to convert V2 to V3. I will change the comments here.

Copy link
Contributor Author

Done. I have updated the comments.

@@ -315,7 +313,7 @@ private boolean forceLog(long schedulerWaitMs, long numDocsScanned) {
*/
protected ListenableFuture<byte[]> immediateErrorResponse(ServerQueryRequest queryRequest,
ProcessingException error) {
- DataTable result = new DataTableImplV2();
+ DataTable result = new DataTableImplV3();
Copy link
Contributor

Copy link
Contributor Author

done

Copy link
Contributor

@Jackie-Jiang Jackie-Jiang left a comment

Suggest separating the builder and reader for v2 and v3 because we will need to remove v2 implementations in the next release

* - Always add new keys to the end.
* Otherwise, backward compatibility will be broken.
*/
enum MetadataKeys {
Copy link
Contributor

I still suggest associating an id with each key instead of using ordinal of the enum. The convention here should be always increasing the id when adding new keys.
Id is more flexible than ordinal for the following reasons:

  • Using the ordinal amounts to always using the key's index as its id. If someone accidentally changes the order of the keys, it will break
  • With an id, we can remove keys in a backward-compatible way in two releases if necessary. With the ordinal, we have to keep a placeholder so that the ordinals of other keys don't change

@mqliang @siddharthteotia @mcvsubbu Thoughts?
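
For comparison, a sketch of the id-based variant being suggested here. The enum name, the `TABLE` and `NUM_DOCS_SCANNED` keys, and the specific ids are hypothetical; the duplicate check in the static block is an added assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical enum whose wire id is explicit rather than the ordinal.
enum MetadataKeyWithIdSketch {
  TABLE(0),
  NUM_DOCS_SCANNED(1),
  THREAD_CPU_TIME_NS(3);  // id 2 was retired and must never be reused

  private static final Map<Integer, MetadataKeyWithIdSketch> BY_ID = new HashMap<>();

  static {
    for (MetadataKeyWithIdSketch key : values()) {
      // Fail at class load time if two keys share an id.
      if (BY_ID.put(key._id, key) != null) {
        throw new IllegalStateException("Duplicate metadata key id: " + key._id);
      }
    }
  }

  private final int _id;

  MetadataKeyWithIdSketch(int id) {
    _id = id;
  }

  int getId() {
    return _id;
  }

  // Returns null for an id that is not (or no longer) assigned.
  static MetadataKeyWithIdSketch getById(int id) {
    return BY_ID.get(id);
  }
}
```

In this sketch `THREAD_CPU_TIME_NS` has ordinal 2 but id 3, so retiring a key or reordering the declarations does not change what goes on the wire.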

Copy link
Contributor

@Jackie-Jiang , I prefer enums. We can add a unit test that asserts (A < B< C ...), to catch any re-orders.
If we have to manually insert a value, then duplicate values are possible (by mistake) and that can also cause problems.
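
A sketch of the kind of ordering check mentioned above, written as a plain method so it is self-contained (in the real code base it would presumably be a unit test). The three keys and the `EXPECTED_ORDER` array are hypothetical.

```java
// Hypothetical guard that pins the wire order of enum constants.
final class OrdinalPinSketch {
  enum MetadataKey { UNKNOWN, TABLE, THREAD_CPU_TIME_NS }

  // Names in the order they were released; new keys may only be appended.
  static final String[] EXPECTED_ORDER = {"UNKNOWN", "TABLE", "THREAD_CPU_TIME_NS"};

  static void checkOrdinals() {
    MetadataKey[] keys = MetadataKey.values();
    if (keys.length < EXPECTED_ORDER.length) {
      throw new IllegalStateException("A released metadata key was removed");
    }
    for (int i = 0; i < EXPECTED_ORDER.length; i++) {
      if (!keys[i].name().equals(EXPECTED_ORDER[i])) {
        throw new IllegalStateException("Ordinal " + i + " changed to " + keys[i]);
      }
    }
  }
}
```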

Copy link
Contributor Author

@mqliang mqliang Mar 31, 2021

How about we use the enum for now? We can discuss more; if we later decide to associate an id with each key, then as long as we associate the first key with 0, the second with 1, the third with 2, and so on, the bytes sent on the wire will not change. We can address it in a separate PR; it's just a code-level change and will not change any payloads.

Copy link
Contributor

Yes, we can argue both ways here but my preference would be enum with implicit ordinal as opposed to id based. I agree the latter gives more flexibility to the user but I don't think we need it. So a simple enum with ordinal as id along with clear javadoc highlighting the rules for updating the enum is preferable imo.

Comment on lines 115 to 116
THREAD_CPU_TIME_NS("threadCpuTimeNs"),
;
Copy link
Contributor

(nit)

Suggested change
THREAD_CPU_TIME_NS("threadCpuTimeNs"),
;
THREAD_CPU_TIME_NS("threadCpuTimeNs");

Copy link
Contributor Author

done

* - Always add new keys to the end.
* Otherwise, backward compatibility will be broken.
*/
enum MetadataKeys {
Copy link
Contributor

Suggested change
enum MetadataKeys {
enum MetadataKey {

Copy link
Contributor Author

done

}

// getByOrdinal returns an optional enum key for a given ordinal
public static Optional<MetadataKeys> getByOrdinal(int ordinal) {
Copy link
Contributor

You don't need Optional here, but either:

  • Throw exception for invalid id (suggest this way)
  • Return null for invalid id
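
The first (suggested) option might look like the sketch below. The enum name and its constants are hypothetical, and `IllegalArgumentException` is an assumed choice of exception type.

```java
// Hypothetical enum with a lookup that throws on an invalid ordinal.
enum GetByOrdinalSketch {
  UNKNOWN,
  TABLE,
  THREAD_CPU_TIME_NS;

  // Cache values() once; each call to values() allocates a new array.
  private static final GetByOrdinalSketch[] VALUES = values();

  static GetByOrdinalSketch getByOrdinal(int ordinal) {
    if (ordinal < 0 || ordinal >= VALUES.length) {
      throw new IllegalArgumentException("Invalid metadata key ordinal: " + ordinal);
    }
    return VALUES[ordinal];
  }
}
```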

Copy link
Contributor Author

done

}

// getByName returns an optional enum key for a given name.
public static Optional<MetadataKeys> getByName(String name) {
Copy link
Contributor

Same here

Copy link
Contributor Author

done

return _name;
}

static {
Copy link
Contributor

Put this block following the map definition for better readability

Copy link
Contributor Author

@mqliang mqliang Mar 31, 2021

Oh, the placement of this code block was done by IntelliJ's reformatter. I'd suggest keeping it as it is: if someone later changes this file and runs IntelliJ reformatting before committing, it will be moved here anyway.

@@ -77,6 +77,9 @@
// TODO: 3. Given a data schema, write all values one by one instead of using rowId and colId to position (save time).
// TODO: 4. Store bytes as variable size data instead of String
public class DataTableBuilder {
Copy link
Contributor

Suggest making 2 builders, one for v2 and one for v3. You can extract the common logic into a base class, or just duplicate code because we will deprecate v2 in the next release once v3 is well tested

Copy link
Contributor

We plan to remove all V2 logic after the next release. So, we can keep re-factors and beautifications to a minimum. Please do only what is necessary because all of V2 logic will disappear and someone looking at the code will wonder why we have a base class

Copy link
Contributor Author

+1 for keeping the current logic. Another drawback of having two builders is that all callers need to decide whether to call the v2 builder or the v3 builder based on the instance config, which is ugly.

@@ -96,6 +99,17 @@ public DataTableBuilder(DataSchema dataSchema) {
_rowSizeInBytes = DataTableUtils.computeColumnOffsets(dataSchema, _columnOffsets);
Copy link
Contributor

This won't be correct because we want to fix the float value size (it should be 4 bytes, but v2 uses 8).

Copy link
Contributor Author

As discussed offline, we want this change to focus on the metadata change. I will send a separate PR to bump up the version to V4, dedicated to addressing all TODOs in DataTableBuilder, including:

  • fix the float value size issue
  • Store bytes as variable size data instead of String
  • Use one map of "String->Int" for all columns, instead of one map per column.

@@ -77,6 +77,9 @@
// TODO: 3. Given a data schema, write all values one by one instead of using rowId and colId to position (save time).
// TODO: 4. Store bytes as variable size data instead of String
public class DataTableBuilder {
public static final int VERSION_2 = 2;
public static final int VERSION_3 = 3;
private static int _version = VERSION_3;
Copy link
Contributor

This should not be hardcoded but from the config

Copy link
Contributor Author

@mqliang mqliang Mar 30, 2021

We have a setCurrentDataTableVersion() static function to set versions, which is called in HelixServerStarter

/**
* Base implementation of the DataTable interface.
*/
public abstract class DataTableImplBase implements DataTable {
Copy link
Contributor

Rename to BaseDataTable

Copy link
Contributor Author

done

* Otherwise, backward compatibility will be broken.
*/
enum MetadataKey {
UNKNOWN("unknown"),
Copy link
Contributor

Is UNKNOWN really needed? Can we get rid of it?

Copy link
Contributor

I have found it useful to have one enum reserved that is never used. It is never sent by the sender, but the receiver, if needed, can set it to this value if it encounters a value that it does not know about. In that case, the special case handling is restricted to the layer that first scans the enums, and the other layers above don't need to worry about default cases.
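
The pattern described in this comment could be sketched as follows. The enum name, the `TABLE` key, and the `fromWire` method are hypothetical.

```java
// Hypothetical enum with a reserved sentinel that is never sent on the wire.
enum UnknownKeySketch {
  UNKNOWN,
  TABLE,
  THREAD_CPU_TIME_NS;

  // Maps any ordinal this receiver does not recognize to the sentinel.
  static UnknownKeySketch fromWire(int ordinal) {
    UnknownKeySketch[] all = values();
    return (ordinal >= 0 && ordinal < all.length) ? all[ordinal] : UNKNOWN;
  }
}
```

One caveat with the entry format discussed in this PR: fixed-width entries carry no length, so a receiver that maps an ordinal to `UNKNOWN` has no way to tell how many bytes to skip for that entry's value.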

private static final Set<MetadataKey> _intValueMetadataKey = ImmutableSet
.of(MetadataKey.NUM_SEGMENTS_QUERIED, MetadataKey.NUM_SEGMENTS_PROCESSED, MetadataKey.NUM_SEGMENTS_MATCHED,
MetadataKey.NUM_RESIZES, MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED, MetadataKey.NUM_RESIZES);
// _longValueMetadataKey contains all metadata keys which has value of long type.
Copy link
Contributor

Why do we need _intValueMetadataKey and _longValueMetadataKey? Instead of maintaining two static maps to decide which parameter is long and which is int, can we add a member variable _type for each of the enum options? This will also allow for replacing isIntValueMetadataKey() and isLongValueMetadataKey() functions with getType()?
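
A sketch of the suggested shape, with the value type stored on each constant. The enum name, the nested `ValueType` enum, the `TABLE` key, and the `"table"`/`"numSegmentsQueried"` strings are assumptions for illustration; only `NUM_SEGMENTS_QUERIED` and `THREAD_CPU_TIME_NS("threadCpuTimeNs")` appear in the diff.

```java
// Hypothetical enum where each key declares the type of its value.
enum TypedMetadataKeySketch {
  TABLE("table", ValueType.STRING),
  NUM_SEGMENTS_QUERIED("numSegmentsQueried", ValueType.INT),
  THREAD_CPU_TIME_NS("threadCpuTimeNs", ValueType.LONG);

  enum ValueType { INT, LONG, STRING }

  private final String _name;
  private final ValueType _valueType;

  TypedMetadataKeySketch(String name, ValueType valueType) {
    _name = name;
    _valueType = valueType;
  }

  String getName() {
    return _name;
  }

  // Replaces separate isIntValueMetadataKey()/isLongValueMetadataKey() lookups.
  ValueType getValueType() {
    return _valueType;
  }
}
```

Keeping the type next to the key means a newly added key cannot be forgotten in a separate set.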

Copy link
Contributor Author

done

Copy link
Contributor

Looks good, but wondering if we can use ColumnDataType (which is widely used already) instead of defining a new enum which more or less means the same thing? I think the ordinal position of values in ColumnDataType is already fixed (from a serialization/deserialization point of view), but for safety we can add a comment there saying not to change the ordinal position.

Copy link
Contributor Author

That's a good idea. I see Jackie has some related work to unify the usage of ColumnDataType: #6728. He mentioned that we will consider merging DataType and ColumnDataType in the future, so let's address it separately.

Comment on lines 74 to 83
super();
_numRows = 0;
_numColumns = 0;
_dataSchema = null;
_columnOffsets = null;
_rowSizeInBytes = 0;
_dictionaryMap = null;
_fixedSizeDataBytes = null;
_fixedSizeData = null;
_variableSizeDataBytes = null;
_variableSizeData = null;
Contributor

This block of code including the call to super() is redundant as Java will automatically do this.
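
The behavior the comment above relies on can be seen in a small sketch. The classes below are hypothetical and use non-final fields; a `final` field would still need an explicit assignment, so whether the whole block can be dropped depends on how the real fields are declared.

```java
// Hypothetical classes for illustration; not the PR's actual DataTable types.
class BaseTableSketch {
  boolean _baseInitialized;

  BaseTableSketch() {
    _baseInitialized = true;
  }
}

class EmptyTableSketch extends BaseTableSketch {
  int _numRows;            // implicitly initialized to 0
  byte[] _fixedSizeBytes;  // implicitly initialized to null

  // No constructor is declared, so the compiler generates a no-arg one
  // whose first action is an implicit call to super().
}

final class DefaultInitDemo {
  public static void main(String[] args) {
    EmptyTableSketch table = new EmptyTableSketch();
    System.out.println(table._numRows + " " + table._fixedSizeBytes + " " + table._baseInitialized);
  }
}
```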

Contributor Author

done

_variableSizeDataBytes = null;
_variableSizeData = null;
_metadata = new HashMap<>();
super();
Contributor

This constructor can be removed. super() is redundant.

Contributor Author

Done. super() is redundant, but an explicit no-arg constructor is still needed here: because DataTableImplV2(ByteBuffer byteBuffer) is declared, the compiler no longer generates a default constructor.
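
A short sketch of why the no-arg constructor has to stay once a constructor taking arguments exists. `TableSketch` is a hypothetical class, not DataTableImplV2 itself.

```java
import java.nio.ByteBuffer;

// Hypothetical class for illustration; not the PR's DataTableImplV2.
class TableSketch {
  final int _numRows;

  // Explicit no-arg constructor: required because the ByteBuffer constructor
  // below stops the compiler from generating a default one. The call to
  // super() is still implicit and does not need to be written.
  TableSketch() {
    _numRows = 0;
  }

  // Reads the row count from the buffer's current position.
  TableSketch(ByteBuffer byteBuffer) {
    _numRows = byteBuffer.getInt();
  }
}

final class ConstructorDemo {
  public static void main(String[] args) {
    ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES).putInt(0, 3);
    System.out.println(new TableSketch()._numRows + " " + new TableSketch(buffer)._numRows);
  }
}
```

Deleting the no-arg constructor from this sketch would make `new TableSketch()` a compile error, which matches the author's reply.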

* Construct empty data table. (Server side)
*/
public DataTableImplV3() {
super();
Contributor

call to super() is redundant.

Contributor Author

done

Contributor

@Jackie-Jiang left a comment

LGTM otherwise

Comment on lines 122 to 123
this._name = name;
this._valueType = valueType;
Contributor

(nit)

Suggested change
this._name = name;
this._valueType = valueType;
_name = name;
_valueType = valueType;

Contributor Author

done

}

// getByOrdinal returns an optional enum key for a given ordinal or null if the key does not exist.
public static MetadataKey getByOrdinal(int ordinal) {
Contributor

Suggested change
public static MetadataKey getByOrdinal(int ordinal) {
@Nullable
public static MetadataKey getByOrdinal(int ordinal) {
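
For context, a nullable ordinal lookup of the kind discussed above might be written as follows. This is a sketch with a hypothetical enum; the `@Nullable` annotation is shown only as a comment because it comes from an external annotations library rather than the JDK.

```java
// Hypothetical enum for illustration; not the PR's MetadataKey.
enum KeySketch {
  TABLE,
  NUM_DOCS_SCANNED;

  // values() clones its array on every call, so cache it once.
  private static final KeySketch[] VALUES = values();

  // @Nullable would go here: callers must handle a null result.
  // Returns the key for the given ordinal, or null when it is out of range.
  static KeySketch getByOrdinal(int ordinal) {
    if (ordinal >= 0 && ordinal < VALUES.length) {
      return VALUES[ordinal];
    }
    return null;
  }
}

final class OrdinalLookupDemo {
  public static void main(String[] args) {
    System.out.println(KeySketch.getByOrdinal(1) + " " + KeySketch.getByOrdinal(5));
  }
}
```

Marking the method as nullable documents that an out-of-range ordinal, for example one written by a newer server, is an expected outcome rather than a bug.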

Contributor Author

done

Comment on lines +135 to +139
public static MetadataKey getByName(String name) {
return _nameToEnumKeyMap.getOrDefault(name, null);
}
Contributor

Suggested change
public static MetadataKey getByName(String name) {
return _nameToEnumKeyMap.getOrDefault(name, null);
}
@Nullable
public static MetadataKey getByName(String name) {
return _nameToEnumKeyMap.get(name);
}
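
The suggested change works because `Map.get` already returns null for a missing key, so `getOrDefault(name, null)` adds nothing. A small check of that equivalence, using a hypothetical map rather than the PR's `_nameToEnumKeyMap`:

```java
import java.util.HashMap;
import java.util.Map;

final class LookupDemo {
  // Hypothetical name-to-id map for illustration.
  static final Map<String, Integer> NAME_TO_ID = new HashMap<>();

  static {
    NAME_TO_ID.put("table", 0);
  }

  static Integer viaGetOrDefault(String name) {
    return NAME_TO_ID.getOrDefault(name, null);
  }

  static Integer viaGet(String name) {
    return NAME_TO_ID.get(name);
  }

  public static void main(String[] args) {
    // Both lookups agree for a present key and for a missing key.
    System.out.println(viaGet("table") + " " + viaGetOrDefault("table"));
    System.out.println(viaGet("missing") + " " + viaGetOrDefault("missing"));
  }
}
```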

Contributor

@mqliang , can you please address this?

Contributor Author

done

import org.apache.pinot.spi.utils.ByteArray;
import org.apache.pinot.spi.utils.BytesUtils;

import static org.apache.pinot.core.common.datatable.DataTableUtils.decodeString;
Contributor

(Code style) Avoid using static import. Same for other non-test files

Contributor

@mqliang, can you please fix the static imports? They are in quite a few places.

Contributor Author

Will do it in a follow-up PR.

Contributor Author

fixed in #6738

}
}

public Map<String, String> getMetadata() {
Contributor

Add the @Override annotation to the methods in these classes that implement the interface.

Contributor

@mqliang , can you please address this?

Contributor Author

fixed in #6738

/**
* Helper method to serialize dictionary map.
*/
protected byte[] serializeDictionaryMap(Map<String, Map<Integer, String>> dictionaryMap)
Contributor

No need to have the argument. It always serializes the _dictionaryMap

Contributor Author

done

… on server

DataTable V3 moves the metadata section to the end of the serialized
bytes and uses enum values (instead of Strings in V2) as keys.
This change is backward incompatible if servers are upgraded first,
so brokers must be upgraded before servers. Compatibility of
the protocols will not be retained beyond 0.8.0 (or the next
version that is released).
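
To make the commit message concrete, here is a rough sketch of metadata keyed by enum ordinal instead of by string. The layout, the `Key` constants, and the choice to store every value as a length-prefixed UTF-8 string are simplifying assumptions for illustration; this is not the actual V3 byte format.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.EnumMap;
import java.util.Map;

final class MetadataCodecSketch {
  // Hypothetical keys. Ordinals are part of the wire format, so new keys
  // should only ever be appended at the end.
  enum Key { TABLE, NUM_DOCS_SCANNED, RESPONSE_SER_COST }

  // Assumed layout: [count] then, per entry, [ordinal][length][UTF-8 bytes].
  static byte[] serialize(Map<Key, String> metadata) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeInt(metadata.size());
      for (Map.Entry<Key, String> entry : metadata.entrySet()) {
        byte[] value = entry.getValue().getBytes(StandardCharsets.UTF_8);
        out.writeInt(entry.getKey().ordinal());
        out.writeInt(value.length);
        out.write(value);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  static Map<Key, String> deserialize(byte[] data) {
    ByteBuffer buffer = ByteBuffer.wrap(data);
    Key[] keys = Key.values();
    Map<Key, String> metadata = new EnumMap<>(Key.class);
    int count = buffer.getInt();
    for (int i = 0; i < count; i++) {
      int ordinal = buffer.getInt();
      byte[] value = new byte[buffer.getInt()];
      buffer.get(value);
      // Ordinals unknown to this reader (e.g. from a newer sender) are skipped.
      if (ordinal >= 0 && ordinal < keys.length) {
        metadata.put(keys[ordinal], new String(value, StandardCharsets.UTF_8));
      }
    }
    return metadata;
  }

  public static void main(String[] args) {
    Map<Key, String> metadata = new EnumMap<>(Key.class);
    metadata.put(Key.TABLE, "myTable");
    metadata.put(Key.NUM_DOCS_SCANNED, "42");
    System.out.println(deserialize(serialize(metadata)));
  }
}
```

In this sketch the length prefix lets an older reader skip keys it does not know. A reader that expects string keys cannot parse this layout at all, which is in line with the upgrade-order warning in the commit message.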
@mqliang mqliang closed this Apr 2, 2021
@mqliang mqliang reopened this Apr 2, 2021
@mqliang mqliang closed this Apr 2, 2021
@mqliang mqliang reopened this Apr 2, 2021
@siddharthteotia siddharthteotia merged commit fb7ceb0 into apache:master Apr 2, 2021
dataTableV3.getMetadata().put(key, EXPECTED_METADATA.get(key));
}
newDataTable = DataTableFactory.getDataTable(dataTableV3.toBytes()); // Broker deserialize data table bytes as V2
Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
Contributor

@mqliang , can you please fix the typo? this should be V3

Contributor Author

fixed in #6738

Labels
backward-incompat: Referenced by PRs that introduce or fix backward compat issues
release-notes: Referenced by PRs that need attention when compiling the next release notes
6 participants