Support STRING and BYTES for no dictionary columns in realtime consuming segments #4791
Conversation
@@ -38,11 +41,13 @@
   private static final String LONG_COL_NAME = "longcol";
   private static final String FLOAT_COL_NAME = "floatcol";
   private static final String DOUBLE_COL_NAME = "doublecol";
-  private static final int NUM_ROWS = 1000;
+  private static final String STRING_COL_NAME = "stringcol";
+  private static final int NUM_ROWS = 10;
Sorry, this was a bad change made while I was debugging the test I added. Need to undo this and set NUM_ROWS back to 1000.
Fixed
@@ -420,6 +420,7 @@ public int size() {
       case DOUBLE:
         return Double.BYTES;
       case BYTES:
+      case STRING:
The comment below is not valid anymore, I suppose. Actually, I am not sure what that comment means, so can you please clarify it? Thanks.
indexReaderWriter = new FixedByteSingleColumnSingleValueReaderWriter(_capacity, indexColumnSize, _memoryManager,
    allocationContext);

if (forwardIndexColumnSize > 0) {
I am a little uncomfortable assuming the size of the column to be a certain value when FieldSpec does not export that value. How about we add a method to FieldSpec like isFixedWidthColumn()? We can set forwardIndexColumnSize to -1 in this method, and modify it only if it is a fixed-width no-dictionary column, or a column for which we create a dictionary.
Done. Added isFixedWidthColumn() method to FieldSpec.DataType
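For readers following along, the isFixedWidthColumn() idea could look roughly like the sketch below. DataTypeSketch is a stripped-down, hypothetical stand-in for FieldSpec.DataType, not Pinot's actual code:

```java
// Stand-in for FieldSpec.DataType: fixed-width types know their byte size,
// variable-width types (STRING/BYTES) do not.
public class DataTypeSketch {
  public enum DataType {
    INT(Integer.BYTES), LONG(Long.BYTES), FLOAT(Float.BYTES), DOUBLE(Double.BYTES),
    STRING(-1), BYTES(-1);

    private final int _size;

    DataType(int size) {
      _size = size;
    }

    // A type is fixed-width when every value occupies the same number of bytes.
    public boolean isFixedWidth() {
      return _size > 0;
    }

    public int size() {
      if (!isFixedWidth()) {
        throw new UnsupportedOperationException("Variable-width type has no fixed size: " + this);
      }
      return _size;
    }
  }

  public static void main(String[] args) {
    System.out.println(DataType.LONG.isFixedWidth());   // true
    System.out.println(DataType.STRING.isFixedWidth()); // false
  }
}
```

With this shape, callers never hard-code per-type sizes; they ask the type whether a fixed size even exists.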
-      // No dictionary
-      indexColumnSize = dataType.size();
+          && !invertedIndexColumns.contains(column)) {
+      // No dictionary -- size will be equal to size of data
This does mean that we support no-dictionary for all types of columns. Seems ok, but I am just worried that if another column type is added for which we need some special logic, it may be hard to locate this place to change it. Short of introducing a method isNoDictionarySupportedForColumnType(), I am not sure what else can be done. We can add the isSingleValueField() check inside the new method, though.
Good suggestion. Done
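A hedged sketch of what such a consolidated helper could look like, with the isSingleValueField() check folded in; the signature is illustrative, not the actual Pinot method:

```java
import java.util.Set;

// Sketch of the helper discussed above: every precondition for using a raw
// (no-dictionary) index on a consuming-segment column is checked in one place,
// so future per-type restrictions have a single home.
public class NoDictionaryCheckSketch {
  public static boolean isNoDictionaryColumn(Set<String> noDictionaryColumns, Set<String> invertedIndexColumns,
      boolean isSingleValueField, String column) {
    // Raw index requires: the column opted out of dictionary encoding, no
    // inverted index (inverted indexes are built on dictionary ids), and a
    // single-value field.
    return noDictionaryColumns.contains(column) && !invertedIndexColumns.contains(column) && isSingleValueField;
  }

  public static void main(String[] args) {
    Set<String> noDict = Set.of("stringcol");
    Set<String> inverted = Set.of("intcol");
    System.out.println(isNoDictionaryColumn(noDict, inverted, true, "stringcol")); // true
    System.out.println(isNoDictionaryColumn(noDict, inverted, true, "intcol"));    // false
  }
}
```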
pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
// for STRING/BYTES SV column, we support raw index in consuming segments
// RealtimeSegmentStatsHistory does not have the stats for no-dictionary columns
// from previous consuming segments
// TODO: come up with better estimated values
Cardinality should not be a factor here, since it is a raw index and the actual values are stored. You only need some estimate for the average string length. We can get that from StatsHistory (as long as we update it correctly, of course). The call to construct VarByteSingleColumnSVRW should take _capacity as the number of strings to add, and the averageLen that we can get from stats history.
Cardinality was the wrong word to use in the variable names. I meant number of values (rows).
I have added a TODO to capture the estimated average column length in realtime segment stats history for no-dictionary columns as well. Currently we only do this for dictionary-encoded columns. For now, a constant value of 100 is being used. I will follow up with a PR to add support for this.
Secondly, using _capacity (which is essentially the maxSegmentRows indicated in RealtimeSegmentConfig) directly for allocating the memory for VarByteSVRW might not be good. Note that VarByteSVRW internally uses MutableOffHeapByteArrayStore, which stores data in a list of buffers. Passing _capacity means the byte store will try to allocate a single giant buffer to store the strings/bytes for all rows. This allocation might fail if we are using DIRECT off-heap memory mode, _capacity is very high, and memory is fragmented. So I take min(_capacity, 100_000) as the initial capacity for VarByteSVRW, to begin with a smaller capacity as opposed to _capacity.
Looking at the code, it seems like we already have a TODO to start with smaller capacity for MV columns.
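The capacity clamp described above can be sketched as follows; the constants (a 100_000 cap on the initial capacity and an average value length of 100) are the values quoted in this thread, not values read from Pinot's code:

```java
// Sketch of the capacity clamp: instead of sizing the var-byte writer's first
// buffer for all _capacity rows, start small and let the underlying byte store
// grow buffer by buffer as the consuming segment fills up.
public class CapacityClampSketch {
  static final int MAX_INITIAL_CAPACITY = 100_000;
  static final int ESTIMATED_AVG_VALUE_LENGTH = 100; // placeholder until stats history supplies it

  static int initialCapacity(int segmentCapacity) {
    // A single buffer of segmentCapacity * avgLen bytes may fail to allocate in
    // DIRECT off-heap mode when memory is fragmented, so cap the first buffer.
    return Math.min(segmentCapacity, MAX_INITIAL_CAPACITY);
  }

  public static void main(String[] args) {
    // First-buffer size for a 5M-row segment: ~10 MB instead of ~500 MB.
    long firstBufferBytes = (long) initialCapacity(5_000_000) * ESTIMATED_AVG_VALUE_LENGTH;
    System.out.println(firstBufferBytes); // 10000000
  }
}
```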
public VarByteSingleColumnSingleValueReaderWriter(
    PinotDataBufferMemoryManager memoryManager,
    String allocationContext,
    int estimatedCardinality,
rename to maxNumberOfValues?
Done
@@ -63,12 +68,52 @@ public int getCardinality() {

   @Override
   public int getLengthOfShortestElement() {
-    return lengthOfDataType(); // Only fixed length data types supported.
+    FieldSpec.DataType dataType = _blockValSet.getValueType();
+    if (dataType == FieldSpec.DataType.STRING || dataType == FieldSpec.DataType.BYTES) {
Let us introduce a method isFixedWidth() in FieldSpec.DataType? It will be easier when/if we add new data types.
Done
if (dataType == FieldSpec.DataType.STRING || dataType == FieldSpec.DataType.BYTES) {
  // variable width no dictionary columns
  int minLength = Integer.MAX_VALUE;
  BaseSingleColumnSingleValueReaderWriter readerWriter = (BaseSingleColumnSingleValueReaderWriter) _forwardIndex;
Can we keep track of the shortest and longest element in the fwd index and just read them here? That will save time as well as garbage generation during segment build.
We can definitely compute min and max in one go, rather than walking over the fwd index for each of them separately.
Done
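A minimal sketch of the write-time tracking suggested above: update the shortest/longest lengths on every setBytes() so segment build reads two ints instead of re-scanning the forward index. Class and field names here are illustrative, not Pinot's actual classes:

```java
// Illustrative writer that maintains min/max element lengths as values arrive,
// so getLengthOfShortestElement()/getLengthOfLongestElement() are O(1) reads.
public class LengthTrackingWriterSketch {
  private int _lengthOfShortestElement = Integer.MAX_VALUE;
  private int _lengthOfLongestElement = Integer.MIN_VALUE;

  public void setBytes(int row, byte[] value) {
    // ... the real writer appends value to its off-heap byte store here ...
    _lengthOfShortestElement = Math.min(_lengthOfShortestElement, value.length);
    _lengthOfLongestElement = Math.max(_lengthOfLongestElement, value.length);
  }

  public int getLengthOfShortestElement() {
    return _lengthOfShortestElement;
  }

  public int getLengthOfLongestElement() {
    return _lengthOfLongestElement;
  }

  public static void main(String[] args) {
    LengthTrackingWriterSketch writer = new LengthTrackingWriterSketch();
    writer.setBytes(0, new byte[]{1, 2});
    writer.setBytes(1, new byte[]{1, 2, 3, 4, 5});
    System.out.println(writer.getLengthOfShortestElement()); // 2
    System.out.println(writer.getLengthOfLongestElement());  // 5
  }
}
```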
Force-pushed from e0275d8 to e64b46f
@mcvsubbu, I have addressed the review comments. Also rebased on latest master.
Fixed test failures. Build is passing with latest changes.
/**
 * Returns the forward index for the data source
 */
public abstract DataFileReader getForwardIndex();
Instead of modifying the DataSource interface and returning null in many places, let us modify the ColumnDataSource class to get the forwardIndex. We can then use it and type-cast it to our heart's content in the RealtimeNoDictionaryColStatistics class. Since this class is very specific, the type-casting should work fine without danger of exception.
Done
int forwardIndexColumnSize = -1;
if (isNoDictionarySupportedForColumn(noDictionaryColumns, invertedIndexColumns, fieldSpec, column)) {
  // no dictionary
  // each forward index entry will be equal to size of data
- // each forward index entry will be equal to size of data
+ // each forward index entry will be equal to size of data for that row
Done
if (isNoDictionarySupportedForColumn(noDictionaryColumns, invertedIndexColumns, fieldSpec, column)) {
  // no dictionary
  // each forward index entry will be equal to size of data
  // for INT, LONG, FLOAT, DOUBLE it is equal to the number of fixed bytes used to store the value,
- // for INT, LONG, FLOAT, DOUBLE it is equal to the number of fixed bytes used to store the value,
+ // For INT, LONG, FLOAT, DOUBLE it is equal to the size of the (serialized) raw value,
Done
 * @param column column name
 * @return true if column is no-dictionary, false if dictionary encoded
 */
private boolean isNoDictionarySupportedForColumn(Set<String> noDictionaryColumns, Set<String> invertedIndexColumns,
- private boolean isNoDictionarySupportedForColumn(Set<String> noDictionaryColumns, Set<String> invertedIndexColumns,
+ private boolean isNoDictionaryColumn(Set<String> noDictionaryColumns, Set<String> invertedIndexColumns,
Done
    allocationContext);

// create forward index reader/writer
if (forwardIndexColumnSize != -1) {
It may be easier to read if we invert the if condition and exchange the if and else bodies.
Done
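The inverted-condition suggestion might read like this sketch, branching on the variable-width case first rather than testing forwardIndexColumnSize != -1 at the top. The writer classes are stand-ins for Pinot's reader/writer implementations:

```java
// Illustrative writer selection: the sentinel value -1 marks a variable-width
// (STRING/BYTES) no-dictionary column; any positive size gets a fixed-byte writer.
public class WriterSelectionSketch {
  interface ForwardIndexWriter { }
  static class FixedByteWriter implements ForwardIndexWriter { }
  static class VarByteWriter implements ForwardIndexWriter { }

  static ForwardIndexWriter createWriter(int forwardIndexColumnSize) {
    if (forwardIndexColumnSize == -1) {
      // Variable-width (STRING/BYTES) no-dictionary column.
      return new VarByteWriter();
    } else {
      // Fixed-width raw value, or dictionary id of known size.
      return new FixedByteWriter();
    }
  }

  public static void main(String[] args) {
    System.out.println(createWriter(-1).getClass().getSimpleName());            // VarByteWriter
    System.out.println(createWriter(Integer.BYTES).getClass().getSimpleName()); // FixedByteWriter
  }
}
```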
@@ -76,6 +77,9 @@
   private static final int MIN_ROWS_TO_INDEX = 1000_000; // Min size of recordIdMap for updatable metrics.
   private static final int MIN_RECORD_ID_MAP_CACHE_SIZE = 10000; // Min overflow map size for updatable metrics.

   private static final int NODICT_VARIABLE_WIDTH_ESTIMATED_AVERAGE_VALUE_LENGTH = 100;
Maybe suffix these definitions with DEFAULT at the end (since we will eventually get these from the stats) -- unless you are going to file the stats PR real soon. In that case, just let it be; this definition will probably move into the stats object.
Done. I will put up a PR soon, though.
}
try {
  int[] intValues = new int[NUM_ROWS];
  dataFetcher.fetchIntValues(INT_COL_NAME, docIds, numDocIds, intValues);
Why are we testing this again? This functionality is already tested in testIntValues(), right? You only need to add testStringValue() and testBytesValue(). Since these data types cannot be fetched as any other, we only need to iterate the fetcher through the corresponding value types.
Yes, those were redundant. I included tests for STRING and BYTES in the same unit test.
...a/org/apache/pinot/core/io/readerwriter/impl/VarByteSingleColumnSingleValueReaderWriter.java
Force-pushed from e626b64 to 981e94a
@mcvsubbu, I have addressed the review comments. Please take another look.
Codecov Report
@@ Coverage Diff @@
## master #4791 +/- ##
=============================================
+ Coverage 36.06% 57.34% +21.27%
- Complexity 0 12 +12
=============================================
Files 1174 1174
Lines 62349 62244 -105
Branches 9175 9148 -27
=============================================
+ Hits 22487 35694 +13207
+ Misses 37851 23879 -13972
- Partials 2011 2671 +660
Continue to review full report at Codecov.
lgtm other than a minor comment, thanks
@Override
public void setString(int row, String val) {
  byte[] serializedValue = StringUtil.encodeUtf8(val);
  _byteArrayStore.add(serializedValue);
can we just invoke setBytes() here?
Done. Thanks
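A minimal sketch of the agreed change: setString() encodes to UTF-8 and delegates to setBytes(), leaving a single write path into the byte store. The byte store is mocked with a plain field here; the real writer appends to MutableOffHeapByteArrayStore:

```java
import java.nio.charset.StandardCharsets;

// Illustrative delegation: one conversion at the String boundary, then
// everything below deals only in bytes.
public class SetStringDelegationSketch {
  private byte[] _lastWritten; // stand-in for the off-heap byte store

  public void setBytes(int row, byte[] value) {
    _lastWritten = value; // the real writer appends to an off-heap byte store
  }

  public void setString(int row, String value) {
    setBytes(row, value.getBytes(StandardCharsets.UTF_8));
  }

  public byte[] lastWritten() {
    return _lastWritten;
  }

  public static void main(String[] args) {
    SetStringDelegationSketch writer = new SetStringDelegationSketch();
    writer.setString(0, "pinot");
    System.out.println(writer.lastWritten().length); // 5
  }
}
```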
Force-pushed from 981e94a to 33f780f
Addressed the latest comment. Thanks for the review @mcvsubbu
(1) PR #5256 added support for deriving num docs per chunk for var byte raw index creation from column length. This was done specifically as part of supporting text blobs. Use cases that don't want this feature and are high QPS see a negative impact, since the chunk size increases (the earlier value of numDocsPerChunk was hardcoded to 1000), and based on the access pattern we might end up uncompressing a bigger chunk to get values for a set of docIds. We have made this change configurable, so the default behaviour is the same as before (1000 docs per chunk).

(2) PR #4791 added support for noDict on STRING/BYTES in consuming segments. This change particularly impacts use cases that set noDict on their STRING dimension columns for other performance reasons and also want metrics aggregation. These use cases no longer get aggregateMetrics, because the new implementation honors their table config setting of noDict on STRING/BYTES. Without metrics aggregation, memory pressure increases. So, to continue aggregating metrics for such cases, we will create a dictionary even if the column is part of the noDictionary set in the table config.

Co-authored-by: Siddharth Teotia <steotia@steotia-mn1.linkedin.biz>
Added support for creation of raw index for var length columns in realtime consuming segments.
Issue #4034
cc @mcvsubbu
This is also needed for the text search feature.