Update Segment builder to use column major tables #11776
Codecov Report
@@ Coverage Diff @@
## master #11776 +/- ##
=============================================
+ Coverage 14.45% 62.95% +48.49%
- Complexity 201 1141 +940
=============================================
Files 2342 2367 +25
Lines 125917 127951 +2034
Branches 19370 19743 +373
=============================================
+ Hits 18205 80553 +62348
+ Misses 106170 41677 -64493
- Partials 1542 5721 +4179
... and 1571 files with indirect coverage changes
The overall logic looks good. Let's clean up the TODOs and debug code
To test it completely, let's enable it by default and see if the existing tests can pass
@@ -220,15 +222,20 @@ public String getSegmentName() {
  }

  public void getRecord(int docId, GenericRow buffer) {
    // TODO: start duration
Is the change in this class temporary debugging code?
Removed.

// Check if column major mode should be enabled
try {
  // TODO(Erich): move this so that the code does not directly reference the flag name
Let's move this into TableConfig -> IngestionConfig -> StreamIngestionConfig, and add a field _enableColumnMajorSegmentCreation
Fixed this. is using the deprecated config structure for their tables (from the sample I got) and this is for the new stream config. To make migration simple I added a field for both the old and new configuration methods.
public boolean isColumnMajorEnabled() {
  return _enableColumnMajor;
}

public int getTotalDocCount() {
  return _totalDocs;
}
These seem unused. Suggest removing them and changing these two variables to local variables.
This one is used in a log statement to record what was done while building the segment.
I see them used in RealtimeSegmentDataManager, but this info is already logged in SegmentIndexCreationDriverImpl, which has the best knowledge of how the segment is built. No need to log detailed info at the top level.
Removed.
@@ -104,6 +106,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
  private int _totalDocs;
  private int _docIdCounter;
  private boolean _nullHandlingEnabled;
  private long _durationNS = 0;
Is this used?
Removed.
}

private void indexColumnValue(PinotSegmentColumnReader colReader,
Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex,
(code format) Please follow the Pinot Style
@@ -102,7 +104,7 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive
  private int _totalDocs = 0;
  private File _tempIndexDir;
  private String _segmentName;
  private long _totalRecordReadTime = 0;
  private long _totalRecordReadTimeNS = 0;
Let's keep the naming consistent
- private long _totalRecordReadTimeNS = 0;
+ private long _totalRecordReadTimeNs = 0;
@@ -344,7 +390,7 @@ private void handlePostCreation()
    // Persist creation metadata to disk
    persistCreationMeta(segmentOutputDir, crc, creationTime);

    LOGGER.info("Driver, record read time : {}", _totalRecordReadTime);
    LOGGER.info("Driver, record read time (NS) : {}", _totalRecordReadTimeNS);
Let's keep the log unchanged, but convert the time to ms
   * @param sortedDocIds - If not null, then this provides the sorted order of documents.
   * @param colReader - Used to get the values of the column.
   */
  void indexColumn(String columnName, @Nullable int[] sortedDocIds, IndexSegment colReader)
The third argument is not really a column reader
    _indexCreator.indexColumn(col, sortedDocIds, indexSegment);
  }
} catch (Exception e) {
  _indexCreator.close(); // TODO: Why is this only closed on an exception?
In the regular case, it will be closed after handlePostCreation()
...java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
…he new config structure
@@ -34,6 +34,9 @@ public class StreamIngestionConfig extends BaseJsonConfig {
  @JsonPropertyDescription("All configs for the streams from which to ingest")
  private final List<Map<String, String>> _streamConfigMaps;

  @JsonPropertyDescription("Whether to use column major mode when creating the segment.")
  private boolean _columnMajorSegmentBuilderEnabled;
It seems this PR always uses the config from IndexingConfig instead of this one. Even though ideally it should be configured here, since it is a short-lived config (we want to always enable it in the future), let's remove it from here to avoid confusion.
Both are checked in the RealTimeSegmentDataManager constructor: if there's a Stream Ingestion Config section, it uses that to check for the Segment Builder mode; if there isn't, it checks the old configuration section for the flag.
Since this is a newly added temporary flag, I don't see much value in supporting it in 2 different places. Let's just remove it from here and only keep the one in IndexingConfig for simplicity.
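For reference, the lookup order described in the reply above (new stream ingestion section first, old section as a fallback) could be sketched roughly as follows. The classes here are hypothetical, simplified stand-ins and not Pinot's actual TableConfig, StreamIngestionConfig, or IndexingConfig; note that the review asks to keep only the IndexingConfig flag, which would make the fallback unnecessary.

```java
// Hypothetical, simplified config holders (assumptions for illustration only).
final class StreamIngestionSection {
  final boolean columnMajorSegmentBuilderEnabled;

  StreamIngestionSection(boolean enabled) {
    columnMajorSegmentBuilderEnabled = enabled;
  }
}

final class IndexingSection {
  final boolean columnMajorSegmentBuilderEnabled;

  IndexingSection(boolean enabled) {
    columnMajorSegmentBuilderEnabled = enabled;
  }
}

final class ColumnMajorFlag {
  // streamIngestion is null when the table still uses the older config layout.
  static boolean isEnabled(StreamIngestionSection streamIngestion, IndexingSection indexing) {
    if (streamIngestion != null) {
      // New-style section wins whenever it is present.
      return streamIngestion.columnMajorSegmentBuilderEnabled;
    }
    // Otherwise fall back to the old section; a missing section means "disabled".
    return indexing != null && indexing.columnMajorSegmentBuilderEnabled;
  }
}
```

Keeping the flag in a single place removes the need for this precedence rule altogether, which is the simplification the reviewer is asking for.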
@@ -97,6 +93,7 @@ public enum TimeColumnType {
  private String _segmentNamePrefix = null;
  private String _segmentNamePostfix = null;
  private String _segmentTimeColumnName = null;
  private boolean _segmentEnableColumnMajor = false;
I don't think the changes in this config are required. Neither of the fields is used.
_segmentLogger
    .info("Stopping consumption due to time limit start={} now={} numRowsConsumed={} numRowsIndexed={}",
        _startTimeMs, now, _numRowsConsumed, _numRowsIndexed);
_segmentLogger.info(
Can you revert the format change for the unrelated code? Currently it is very hard to find the relevant changes. Alternatively, you may also file a PR just for the reformat for the files changed in this PR, and we can merge that first then rebase this on top of that
Done.
@@ -297,6 +297,7 @@ public void deleteSegmentFile() {
  private final Semaphore _segBuildSemaphore;
  private final boolean _isOffHeap;
  private final boolean _nullHandlingEnabled;
  private final boolean _enableColumnMajorSegmentBuilder;
We don't need to change this class. RealtimeSegmentConverter has access to TableConfig and we can extract this within RealtimeSegmentConverter
Done.
@@ -303,6 +305,8 @@ public static ChunkCompressionType getDefaultCompressionType(FieldType fieldType
  @Override
  public void indexRow(GenericRow row)
      throws IOException {
    long startNS = System.nanoTime();
(minor) Not used
Removed.
@@ -102,14 +104,14 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
  private Schema _schema;
  private File _indexDir;
  private int _totalDocs;
  private int _docIdCounter;
  private int _docPosOnDisk;
Don't change this. We should not update this when building the segment in column major fashion
Done
if (sortedDocIds != null) {
  int onDiskDocId = 0;
  for (int docId : sortedDocIds) {
    indexColumnValue(colReader, creatorsByIndex, columnName, fieldSpec, dictionaryCreator, docId, onDiskDocId,
For better performance, we want to change the order of the loop:
- Loop over the columns
- Loop over the index creator
- Loop over the docs
Ideally we can directly seal the index creator when a column is indexed. Can be addressed separately
Good catch. Testing this right now.
Made the change.
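The loop order requested in this thread (columns, then index creators, then docs) might look roughly like the sketch below. `ValueSink` and `ColumnMajorLoop` are hypothetical names introduced for illustration; Pinot's real `IndexCreator` interface and `SegmentColumnarIndexCreator` have a different shape.

```java
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for an index creator (assumption, not Pinot's API).
interface ValueSink {
  void add(Object value, int onDiskDocId);
}

final class ColumnMajorLoop {
  // columns: column name -> values indexed by in-memory doc id.
  // sortedDocIds: if not null, position i holds the doc id written at on-disk position i.
  static void build(Map<String, Object[]> columns, int[] sortedDocIds,
      Map<String, List<ValueSink>> creatorsByColumn) {
    for (Map.Entry<String, Object[]> column : columns.entrySet()) {            // 1. columns
      Object[] values = column.getValue();
      for (ValueSink creator : creatorsByColumn.get(column.getKey())) {        // 2. index creators
        for (int onDiskDocId = 0; onDiskDocId < values.length; onDiskDocId++) { // 3. docs
          int docId = sortedDocIds != null ? sortedDocIds[onDiskDocId] : onDiskDocId;
          creator.add(values[docId], onDiskDocId);
        }
      }
    }
  }
}
```

With the docs loop innermost, each creator consumes a whole column in one tight pass, which is also what would make it possible to seal a creator as soon as its column is done.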
// In row oriented:
// - this.indexRow iterates over each column and checks if it isNullValue. If it is then it sets the null
// value vector for that doc id
// - This null value comes from the GenericRow that is created by PinotSegmentRecordReader
// - PinotSegmentRecordReader:L224 is where we figure out the null value stuff
// - PSegRecReader calls PinotSegmentColumnReader.isNull on the doc id to determine if the value for that
// column of that docId is null
// - if it returns true and we are NOT skipping null values we put the default null value into that field
// of the GenericRow
Is this relevant? I don't follow this comment
I put it in because I wanted to see if it would help people better understand the different steps that are involved in the null value logic. But since it didn't help, I'll remove it.
}

@Test
public void testNoRecordsIndexedColumnMajorSegmentBuilder()
Can you also add a test when there are some records indexed?
Done.
… helper method rather than a member variable
LGTM otherwise
@@ -117,6 +113,7 @@ public enum TimeColumnType {
  // Use on-heap or off-heap memory to generate index (currently only affect inverted index and star-tree v2)
  private boolean _onHeap = false;
  private boolean _nullHandlingEnabled = false;
  private boolean _columnMajorSegmentBuilderEnabled = false;
This is not used and not needed.
@@ -194,6 +194,10 @@ public int[] getSortedDocIds() {
    return _sortedDocIds;
  }

  public boolean getSkipDefaultNullValues() {
Hmm, it seems this was not changed. Was there a commit that was not pushed?
@@ -344,12 +394,13 @@ private void handlePostCreation()
    // Persist creation metadata to disk
    persistCreationMeta(segmentOutputDir, crc, creationTime);

    LOGGER.info("Driver, record read time : {}", _totalRecordReadTime);
    LOGGER.info("Driver, record read time : {}", ((float) _totalRecordReadTimeNs) / 1000000.0);
We don't want to log float time
- LOGGER.info("Driver, record read time : {}", ((float) _totalRecordReadTimeNs) / 1000000.0);
+ LOGGER.info("Driver, record read time : {}", TimeUnit.NANOSECONDS.toMillis(_totalRecordReadTimeNs));
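The suggested `TimeUnit.NANOSECONDS.toMillis` call returns a whole number of milliseconds and truncates rather than rounds. A minimal sketch of accumulating in nanoseconds and reporting in milliseconds is shown below; `ReadTimer` is a hypothetical helper for illustration, not a class in Pinot.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper (assumption): accumulates elapsed time in nanoseconds
// and reports it as whole milliseconds.
final class ReadTimer {
  private long _totalReadTimeNs = 0;

  // startNs and stopNs are expected to come from System.nanoTime().
  void record(long startNs, long stopNs) {
    _totalReadTimeNs += stopNs - startNs;
  }

  long totalMillis() {
    // Integer conversion that truncates any sub-millisecond remainder.
    return TimeUnit.NANOSECONDS.toMillis(_totalReadTimeNs);
  }
}
```

Summing in nanoseconds and converting once at log time avoids losing sub-millisecond reads, which would each truncate to zero if converted individually.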
@@ -229,16 +232,21 @@ public void build()
    GenericRow reuse = new GenericRow();
    TransformPipeline.Result reusedResult = new TransformPipeline.Result();
    while (_recordReader.hasNext()) {
      long recordReadStartTime = System.currentTimeMillis();
      long recordReadStopTime = System.currentTimeMillis();
      long recordReadStopTime = System.nanoTime();
My IDE will show the redundant statement, not sure if you need to enable it explicitly
- long recordReadStopTime = System.nanoTime();
+ long recordReadStopTime;
I'd assumed that was a deliberate coding style choice for Pinot: went ahead and got rid of it.
for (int docId : sortedDocIds) {
  indexColumnValue(colReader, creatorsByIndex, columnName, fieldSpec, dictionaryCreator, docId, onDiskDocId,
      nullVec, skipDefaultNullValues);
  onDiskDocId += 1;
(nit)
- onDiskDocId += 1;
+ onDiskDocId++;
  indexMultiValueRow(dictionaryCreator, (Object[]) columnValueToIndex, creatorsByIndex);
}

if (_nullHandlingEnabled && !skipDefaultNullValues) {
Remove the second check
- if (_nullHandlingEnabled && !skipDefaultNullValues) {
+ if (_nullHandlingEnabled) {
No idea how this change got lost, but re-removed it.
This PR updates the Realtime Segment Builder to remove an unnecessary and costly transformation: Pinot would transpose the column-major table into a row-major table and then transform it back into a column-oriented table. With this change, the segment builder stays column-oriented for the entire construction, which improves segment build performance, especially for very wide tables.
In order to minimize disruption and risk, this change is kept behind a table-level configuration flag (columnMajorSegmentBuilderEnabled), which is false by default. If this flag is true, the table's segments are built with the column-oriented process.
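To illustrate the transformation being removed, here is a toy sketch that contrasts the two build paths. It models a segment as one `Object[]` per column and is an assumption-laden simplification, not Pinot code; `SegmentBuildSketch` and its methods are hypothetical names.

```java
// Toy model (assumption): columns[c][docId] holds the value of column c for a doc,
// and sortedDocIds[i] is the doc id that should land at on-disk position i.
final class SegmentBuildSketch {
  // Row-major path: materialize each row from the columns, then scatter it
  // back into per-column output arrays.
  static Object[][] buildViaRows(Object[][] columns, int[] sortedDocIds) {
    int numCols = columns.length;
    int numDocs = sortedDocIds.length;
    Object[][] out = new Object[numCols][numDocs];
    for (int onDisk = 0; onDisk < numDocs; onDisk++) {
      Object[] row = new Object[numCols]; // transpose: column values -> one row
      for (int c = 0; c < numCols; c++) {
        row[c] = columns[c][sortedDocIds[onDisk]];
      }
      for (int c = 0; c < numCols; c++) { // transpose back: row -> columns
        out[c][onDisk] = row[c];
      }
    }
    return out;
  }

  // Column-major path: copy each column directly in sorted doc order,
  // with no intermediate row objects.
  static Object[][] buildViaColumns(Object[][] columns, int[] sortedDocIds) {
    int numCols = columns.length;
    int numDocs = sortedDocIds.length;
    Object[][] out = new Object[numCols][numDocs];
    for (int c = 0; c < numCols; c++) {
      for (int onDisk = 0; onDisk < numDocs; onDisk++) {
        out[c][onDisk] = columns[c][sortedDocIds[onDisk]];
      }
    }
    return out;
  }
}
```

Both methods produce the same output; the column-major version simply skips the per-row allocation and the second pass, and that per-row overhead grows with the number of columns.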