Presence vector #4585

Merged: 16 commits, Oct 28, 2019
Changes from 14 commits
File: IndexingConfig.java
@@ -87,6 +87,9 @@ public class IndexingConfig {
@ConfigKey("aggregateMetrics")
private boolean _aggregateMetrics;

@ConfigKey("nullHandlingEnabled")
private boolean _nullHandlingEnabled;

/**
* The list of columns for which the variable length dictionary needs to be enabled in offline
* segments. This is only valid for string and bytes columns and has no impact for columns of
@@ -239,6 +242,14 @@ public void setVarLengthDictionaryColumns(List<String> varLengthDictionaryColumns) {
_varLengthDictionaryColumns = varLengthDictionaryColumns;
}

public boolean isNullHandlingEnabled() {
return _nullHandlingEnabled;
}

public void setNullHandlingEnabled(boolean nullHandlingEnabled) {
_nullHandlingEnabled = nullHandlingEnabled;
}

@Override
public String toString() {
final StringBuilder result = new StringBuilder();
@@ -296,7 +307,9 @@ public boolean equals(Object o) {
.isEqual(_starTreeIndexSpec, that._starTreeIndexSpec) && EqualityUtils
.isEqual(_segmentPartitionConfig, that._segmentPartitionConfig) && EqualityUtils
.isEqual(_bloomFilterColumns, that._bloomFilterColumns) && EqualityUtils
- .isEqual(_varLengthDictionaryColumns, that._varLengthDictionaryColumns);
+ .isEqual(_varLengthDictionaryColumns, that._varLengthDictionaryColumns) && EqualityUtils
+ .isEqual(_aggregateMetrics, that._aggregateMetrics) && EqualityUtils
+ .isEqual(_nullHandlingEnabled, that._nullHandlingEnabled);
}

@Override
@@ -316,6 +329,8 @@ public int hashCode() {
result = EqualityUtils.hashCodeOf(result, _segmentPartitionConfig);
result = EqualityUtils.hashCodeOf(result, _bloomFilterColumns);
result = EqualityUtils.hashCodeOf(result, _varLengthDictionaryColumns);
result = EqualityUtils.hashCodeOf(result, _aggregateMetrics);
result = EqualityUtils.hashCodeOf(result, _nullHandlingEnabled);
return result;
}
}
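The flag above is read from the table's indexingConfig section and exposed through a standard getter/setter pair. A minimal usage sketch; the surrounding setup is assumed for illustration and is not part of this PR:

// Hedged sketch: enabling null handling programmatically via the new
// getter/setter; IndexingConfig comes from the diff above.
IndexingConfig indexingConfig = new IndexingConfig();
indexingConfig.setNullHandlingEnabled(true);
assert indexingConfig.isNullHandlingEnabled();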
File: SegmentMetadata.java
@@ -103,6 +103,8 @@ public interface SegmentMetadata {

String getBloomFilterFileName(String column);

String getNullValueVectorFileName(String column);

String getCreatorName();

char getPaddingCharacter();
File: DataSource.java
@@ -22,6 +22,7 @@
import org.apache.pinot.core.segment.index.readers.BloomFilterReader;
import org.apache.pinot.core.segment.index.readers.Dictionary;
import org.apache.pinot.core.segment.index.readers.InvertedIndexReader;
import org.apache.pinot.core.segment.index.readers.NullValueVectorReader;


public abstract class DataSource extends BaseOperator {
@@ -45,4 +46,10 @@ public abstract class DataSource extends BaseOperator {
* Returns the bloom filter for the data source if exists, or {@code null} if not.
*/
public abstract BloomFilterReader getBloomFilter();

/**
* Returns the null value vector for the data source if exists, or {@code null} if not.
*/
public abstract NullValueVectorReader getNullValueVector();

}
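Since the vector is only present when null handling is enabled for the table, a caller should guard against null. A hedged usage sketch; the dataSource and docId variables are assumed to be in scope:

// Hedged sketch: consult the presence vector, if any, for a given docId.
NullValueVectorReader nullValueVector = dataSource.getNullValueVector();
boolean isNull = nullValueVector != null && nullValueVector.isNull(docId);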
File: GenericRow.java
@@ -90,6 +90,13 @@ public boolean isNullValue(String fieldName) {
return _nullValueFields.contains(fieldName);
}

/**
* Returns whether this row has a null value for any of the columns.
*/
public boolean hasNullValues() {
return !_nullValueFields.isEmpty();
}

/**
* Sets the value for the given field.
*/
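Based on the calls visible in this PR (isNullValue, hasNullValues, getNullValueFields, putDefaultNullValue), the per-row null bookkeeping can be sketched as a set of field names. This is an illustrative reconstruction, not the actual GenericRow source:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative sketch only; the real GenericRow may differ.
public class GenericRowNullSketch {
  private final Map<String, Object> _fieldToValueMap = new HashMap<>();
  private final Set<String> _nullValueFields = new HashSet<>();

  // Stores the default value in place of a null and remembers that the field was null.
  public void putDefaultNullValue(String fieldName, Object defaultValue) {
    _fieldToValueMap.put(fieldName, defaultValue);
    _nullValueFields.add(fieldName);
  }

  public Set<String> getNullValueFields() {
    return _nullValueFields;
  }

  public boolean isNullValue(String fieldName) {
    return _nullValueFields.contains(fieldName);
  }

  public boolean hasNullValues() {
    return !_nullValueFields.isEmpty();
  }
}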
File: HLRealtimeSegmentDataManager.java
@@ -188,7 +188,8 @@ public HLRealtimeSegmentDataManager(final RealtimeSegmentZKMetadata realtimeSegm
getMemoryManager(realtimeTableDataManager.getConsumerDir(), segmentName,
indexLoadingConfig.isRealtimeOffheapAllocation(),
indexLoadingConfig.isDirectRealtimeOffheapAllocation(), serverMetrics))
- .setStatsHistory(realtimeTableDataManager.getStatsHistory()).build();
+ .setStatsHistory(realtimeTableDataManager.getStatsHistory())
+ .setNullHandlingEnabled(indexingConfig.isNullHandlingEnabled()).build();
realtimeSegment = new MutableSegmentImpl(realtimeSegmentConfig);

notifier = realtimeTableDataManager;
@@ -272,7 +273,7 @@ public void run() {
new RealtimeSegmentConverter(realtimeSegment, tempSegmentFolder.getAbsolutePath(), schema,
tableNameWithType, timeColumnName, realtimeSegmentZKMetadata.getSegmentName(), sortedColumn,
HLRealtimeSegmentDataManager.this.invertedIndexColumns, noDictionaryColumns,
- varLengthDictionaryColumns, null/*StarTreeIndexSpec*/); // Star tree not supported for HLC.
+ varLengthDictionaryColumns, null/*StarTreeIndexSpec*/, indexingConfig.isNullHandlingEnabled()); // Star tree not supported for HLC.

segmentLogger.info("Trying to build segment");
final long buildStartTime = System.nanoTime();
File: LLRealtimeSegmentDataManager.java
@@ -259,6 +259,7 @@ public Map<String, File> getMetadataFiles() {
private String _stopReason = null;
private final Semaphore _segBuildSemaphore;
private final boolean _isOffHeap;
private final boolean _nullHandlingEnabled;

// TODO each time this method is called, we print reason for stop. Good to print only once.
private boolean endCriteriaReached() {
@@ -687,7 +688,8 @@ protected SegmentBuildDescriptor buildSegmentInternal(boolean forCommit) {
RealtimeSegmentConverter converter =
new RealtimeSegmentConverter(_realtimeSegment, tempSegmentFolder.getAbsolutePath(), _schema,
_tableNameWithType, _timeColumnName, _segmentZKMetadata.getSegmentName(), _sortedColumn,
- _invertedIndexColumns, _noDictionaryColumns, _varLengthDictionaryColumns, _starTreeIndexSpec);
+ _invertedIndexColumns, _noDictionaryColumns, _varLengthDictionaryColumns, _starTreeIndexSpec,
+ _nullHandlingEnabled);
segmentLogger.info("Trying to build segment");
try {
converter.build(_segmentVersion, _serverMetrics);
@@ -1139,6 +1141,8 @@ public LLRealtimeSegmentDataManager(RealtimeSegmentZKMetadata segmentZKMetadata,

_isOffHeap = indexLoadingConfig.isRealtimeOffheapAllocation();

_nullHandlingEnabled = indexingConfig.isNullHandlingEnabled();

// Start new realtime segment
RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder =
new RealtimeSegmentConfig.Builder().setSegmentName(_segmentNameStr).setStreamName(_streamTopic)
@@ -1149,14 +1153,15 @@ public LLRealtimeSegmentDataManager(RealtimeSegmentZKMetadata segmentZKMetadata,
.setInvertedIndexColumns(invertedIndexColumns).setRealtimeSegmentZKMetadata(segmentZKMetadata)
.setOffHeap(_isOffHeap).setMemoryManager(_memoryManager)
.setStatsHistory(realtimeTableDataManager.getStatsHistory())
- .setAggregateMetrics(indexingConfig.isAggregateMetrics());
+ .setAggregateMetrics(indexingConfig.isAggregateMetrics())
+ .setNullHandlingEnabled(_nullHandlingEnabled);

// Create message decoder
_messageDecoder = StreamDecoderProvider.create(_partitionLevelStreamConfig, _schema);
_clientId = _streamPartitionId + "-" + NetUtil.getHostnameOrAddress();

// Create record transformer
- _recordTransformer = CompositeTransformer.getDefaultTransformer(_schema);
+ _recordTransformer = CompositeTransformer.getDefaultTransformer(schema);
makeStreamConsumer("Starting");
makeStreamMetadataProvider("Starting");

File: SegmentGeneratorConfig.java
@@ -113,6 +113,7 @@ public enum TimeColumnType {
// Use on-heap or off-heap memory to generate index (currently only affect inverted index and star-tree v2)
private boolean _onHeap = false;
private boolean _checkTimeColumnValidityDuringGeneration = true;
private boolean _nullHandlingEnabled = false;

public SegmentGeneratorConfig() {
}
@@ -162,6 +163,7 @@ public SegmentGeneratorConfig(SegmentGeneratorConfig config) {
_onHeap = config._onHeap;
_recordReaderPath = config._recordReaderPath;
_checkTimeColumnValidityDuringGeneration = config._checkTimeColumnValidityDuringGeneration;
_nullHandlingEnabled = config._nullHandlingEnabled;
}

@@ -218,6 +220,8 @@ public SegmentGeneratorConfig(@Nullable TableConfig tableConfig, Schema schema)

SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
_hllConfig = validationConfig.getHllConfig();

_nullHandlingEnabled = indexingConfig.isNullHandlingEnabled();
}

public SegmentGeneratorConfig(Schema schema) {
@@ -680,4 +684,12 @@ private String getQualifyingFields(FieldType type, boolean excludeVirtualColumns
Collections.sort(fields);
return StringUtils.join(fields, ",");
}

public boolean isNullHandlingEnabled() {
return _nullHandlingEnabled;
}

public void setNullHandlingEnabled(boolean nullHandlingEnabled) {
_nullHandlingEnabled = nullHandlingEnabled;
}
}
File: MutableSegmentImpl.java
@@ -50,10 +50,12 @@
import org.apache.pinot.core.realtime.impl.dictionary.BaseOffHeapMutableDictionary;
import org.apache.pinot.core.realtime.impl.dictionary.MutableDictionaryFactory;
import org.apache.pinot.core.realtime.impl.invertedindex.RealtimeInvertedIndexReader;
import org.apache.pinot.core.realtime.impl.nullvalue.RealtimeNullValueVectorReaderWriter;
import org.apache.pinot.core.segment.creator.impl.V1Constants;
import org.apache.pinot.core.segment.index.SegmentMetadataImpl;
import org.apache.pinot.core.segment.index.data.source.ColumnDataSource;
import org.apache.pinot.core.segment.index.readers.BloomFilterReader;
import org.apache.pinot.core.segment.index.readers.NullValueVectorReader;
import org.apache.pinot.core.segment.virtualcolumn.VirtualColumnContext;
import org.apache.pinot.core.segment.virtualcolumn.VirtualColumnProvider;
import org.apache.pinot.core.segment.virtualcolumn.VirtualColumnProviderFactory;
@@ -86,12 +88,14 @@ public class MutableSegmentImpl implements MutableSegment {
private final PinotDataBufferMemoryManager _memoryManager;
private final RealtimeSegmentStatsHistory _statsHistory;
private final SegmentPartitionConfig _segmentPartitionConfig;
private final boolean _nullHandlingEnabled;

private final Map<String, BaseMutableDictionary> _dictionaryMap = new HashMap<>();
private final Map<String, DataFileReader> _indexReaderWriterMap = new HashMap<>();
private final Map<String, Integer> _maxNumValuesMap = new HashMap<>();
private final Map<String, RealtimeInvertedIndexReader> _invertedIndexMap = new HashMap<>();
private final Map<String, BloomFilterReader> _bloomFilterMap = new HashMap<>();
private final Map<String, RealtimeNullValueVectorReaderWriter> _nullValueVectorMap = new HashMap<>();
private final IdMap<FixedIntArray> _recordIdMap;
private boolean _aggregateMetrics;

@@ -142,6 +146,7 @@ public long getLatestIngestionTimestamp() {
_memoryManager = config.getMemoryManager();
_statsHistory = config.getStatsHistory();
_segmentPartitionConfig = config.getSegmentPartitionConfig();
_nullHandlingEnabled = config.isNullHandlingEnabled();

Collection<FieldSpec> allFieldSpecs = _schema.getAllFieldSpecs();
List<FieldSpec> physicalFieldSpecs = new ArrayList<>(allFieldSpecs.size());
@@ -225,6 +230,10 @@ public long getLatestIngestionTimestamp() {
if (invertedIndexColumns.contains(column)) {
_invertedIndexMap.put(column, new RealtimeInvertedIndexReader());
}

if (_nullHandlingEnabled) {
_nullValueVectorMap.put(column, new RealtimeNullValueVectorReaderWriter());
}
}

// Metric aggregation can be enabled only if config is specified, and all dimensions have dictionary,
@@ -262,6 +271,9 @@ public boolean index(GenericRow row, @Nullable RowMetadata rowMetadata) {
// Add forward and inverted indices for new document.
addForwardIndex(row, docId, dictIdMap);
addInvertedIndex(docId, dictIdMap);
if (_nullHandlingEnabled) {
addNullValueVector(row, docId);
}

// Update number of document indexed at last to make the latest record queryable
canTakeMore = _numDocsIndexed++ < _capacity;
@@ -379,6 +391,22 @@ private void addInvertedIndex(int docId, Map<String, Object> dictIdMap) {
}
}

/**
* Checks if the row has any null fields and updates the
* corresponding column null value vectors.
* @param row the row being ingested
* @param docId the docId assigned to this row
*/
private void addNullValueVector(GenericRow row, int docId) {
if (!row.hasNullValues()) {
return;
}

for (String columnName : row.getNullValueFields()) {
_nullValueVectorMap.get(columnName).setNull(docId);
}
}

private boolean aggregateMetrics(GenericRow row, int docId) {
for (MetricFieldSpec metricFieldSpec : _physicalMetricFieldSpecs) {
String column = metricFieldSpec.getName();
@@ -453,7 +481,7 @@ public ColumnDataSource getDataSource(String columnName) {
} else {
return new ColumnDataSource(fieldSpec, _numDocsIndexed, _maxNumValuesMap.get(columnName),
_indexReaderWriterMap.get(columnName), _invertedIndexMap.get(columnName), _dictionaryMap.get(columnName),
- _bloomFilterMap.get(columnName));
+ _bloomFilterMap.get(columnName), _nullValueVectorMap.get(columnName));
}
}

@@ -471,9 +499,18 @@ public List<StarTreeV2> getStarTrees() {
public GenericRow getRecord(int docId, GenericRow reuse) {
for (FieldSpec fieldSpec : _physicalFieldSpecs) {
String column = fieldSpec.getName();
- reuse.putField(column, IndexSegmentUtils
- .getValue(docId, fieldSpec, _indexReaderWriterMap.get(column), _dictionaryMap.get(column),
- _maxNumValuesMap.getOrDefault(column, 0)));
+ Object value = IndexSegmentUtils
+ .getValue(docId, fieldSpec, _indexReaderWriterMap.get(column), _dictionaryMap.get(column),
+ _maxNumValuesMap.getOrDefault(column, 0));
+ reuse.putValue(column, value);

if (_nullHandlingEnabled) {
NullValueVectorReader reader = _nullValueVectorMap.get(column);
// If column has null value for this docId, set that accordingly in GenericRow
if (reader != null && reader.isNull(docId)) {
reuse.putDefaultNullValue(column, value);
}
}
}
return reuse;
}
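The RealtimeNullValueVectorReaderWriter used above exposes setNull(docId) at ingestion time and isNull(docId) at read time. A minimal sketch of such a reader/writer backed by a RoaringBitmap follows; the bitmap choice is an assumption for illustration, and the actual class may use a thread-safe variant:

import org.roaringbitmap.RoaringBitmap;

// Illustrative sketch of a mutable presence vector; not the PR's actual class.
public class NullValueVectorSketch {
  private final RoaringBitmap _nullBitmap = new RoaringBitmap();

  // Marks the docId as holding a null value for this column.
  public void setNull(int docId) {
    _nullBitmap.add(docId);
  }

  // Returns true if the docId was recorded as null.
  public boolean isNull(int docId) {
    return _nullBitmap.contains(docId);
  }
}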
File: BitmapDocIdSet.java
@@ -18,8 +18,11 @@
*/
package org.apache.pinot.core.operator.docidsets;

import java.util.Arrays;
import java.util.Iterator;
import org.apache.pinot.core.common.BlockDocIdIterator;
import org.apache.pinot.core.operator.dociditerators.BitmapDocIdIterator;
import org.roaringbitmap.RoaringBitmap;
import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
import org.roaringbitmap.buffer.MutableRoaringBitmap;

@@ -33,7 +36,8 @@ public class BitmapDocIdSet implements FilterBlockDocIdSet {
public BitmapDocIdSet(ImmutableRoaringBitmap[] bitmaps, int startDocId, int endDocId, boolean exclusive) {
int numBitmaps = bitmaps.length;
if (numBitmaps > 1) {
- MutableRoaringBitmap orBitmap = MutableRoaringBitmap.or(bitmaps);
+ Iterator iterator = Arrays.asList(bitmaps).iterator();
+ MutableRoaringBitmap orBitmap = MutableRoaringBitmap.or(iterator, startDocId, endDocId + 1);
[Review thread on this change]

Contributor: Is this change about a bug fix or API change?

Author: @kishoreg: can you please clarify? I'm not sure we need this change for the presence vector.

Author: I believe this might be slightly more efficient (based on a conversation with Kishore).

Contributor: This could backfire because there is an extra step of selecting the range for each bitmap. I checked the code, and this step seems to always be redundant, because we always use 0 as startDocId and numDocs-1 as endDocId. Also, this is not related to this PR. @kishoreg, do you see an actual performance gain from this change?

if (exclusive) {
orBitmap.flip(startDocId, endDocId + 1);
}
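For reference, the two union variants discussed in the thread differ only in that the second restricts the result to a docId range. A self-contained sketch; the class name and sample values are illustrative:

import java.util.Arrays;
import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
import org.roaringbitmap.buffer.MutableRoaringBitmap;

public class OrVariantsSketch {
  public static void main(String[] args) {
    MutableRoaringBitmap first = MutableRoaringBitmap.bitmapOf(1, 5, 9);
    MutableRoaringBitmap second = MutableRoaringBitmap.bitmapOf(2, 5, 10);

    // Plain union of all bitmaps.
    MutableRoaringBitmap plain = MutableRoaringBitmap.or(first, second);

    // Range-limited union: only values in [0, 8) are kept in the result.
    MutableRoaringBitmap ranged = MutableRoaringBitmap.or(
        Arrays.<ImmutableRoaringBitmap>asList(first, second).iterator(), 0L, 8L);

    System.out.println(plain);  // {1,2,5,9,10}
    System.out.println(ranged); // {1,2,5}
  }
}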
File: RealtimeSegmentConverter.java
@@ -55,11 +55,12 @@ public class RealtimeSegmentConverter {
private List<String> noDictionaryColumns;
private StarTreeIndexSpec starTreeIndexSpec;
private List<String> varLengthDictionaryColumns;
private final boolean _nullHandlingEnabled;

public RealtimeSegmentConverter(MutableSegmentImpl realtimeSegment, String outputPath, Schema schema,
String tableName, String timeColumnName, String segmentName, String sortedColumn,
List<String> invertedIndexColumns, List<String> noDictionaryColumns,
- List<String> varLengthDictionaryColumns, StarTreeIndexSpec starTreeIndexSpec) {
+ List<String> varLengthDictionaryColumns, StarTreeIndexSpec starTreeIndexSpec, boolean nullHandlingEnabled) {
if (new File(outputPath).exists()) {
throw new IllegalAccessError("path already exists:" + outputPath);
}
@@ -77,12 +78,13 @@ public RealtimeSegmentConverter(MutableSegmentImpl realtimeSegment, String outpu
this.noDictionaryColumns = noDictionaryColumns;
this.varLengthDictionaryColumns = varLengthDictionaryColumns;
this.starTreeIndexSpec = starTreeIndexSpec;
this._nullHandlingEnabled = nullHandlingEnabled;
}

public RealtimeSegmentConverter(MutableSegmentImpl realtimeSegment, String outputPath, Schema schema,
String tableName, String timeColumnName, String segmentName, String sortedColumn) {
this(realtimeSegment, outputPath, schema, tableName, timeColumnName, segmentName, sortedColumn, new ArrayList<>(),
- new ArrayList<>(), new ArrayList<>(), null/*StarTreeIndexSpec*/);
+ new ArrayList<>(), new ArrayList<>(), null/*StarTreeIndexSpec*/, false/*nullHandlingEnabled*/);
}

public void build(@Nullable SegmentVersion segmentVersion, ServerMetrics serverMetrics)
@@ -135,6 +137,7 @@ public void build(@Nullable SegmentVersion segmentVersion, ServerMetrics serverM
genConfig.setSegmentName(segmentName);
SegmentPartitionConfig segmentPartitionConfig = realtimeSegmentImpl.getSegmentPartitionConfig();
genConfig.setSegmentPartitionConfig(segmentPartitionConfig);
genConfig.setNullHandlingEnabled(_nullHandlingEnabled);
final SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
RealtimeSegmentSegmentCreationDataSource dataSource =
new RealtimeSegmentSegmentCreationDataSource(realtimeSegmentImpl, reader, dataSchema);
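With the widened constructor, a call site that builds an offline segment from a consuming segment passes the flag explicitly. A hedged sketch; all variables are assumed to be in scope:

// Hedged sketch of a call site for the extended constructor above.
RealtimeSegmentConverter converter = new RealtimeSegmentConverter(
    realtimeSegment, outputPath, schema, tableName, timeColumnName, segmentName,
    sortedColumn, invertedIndexColumns, noDictionaryColumns,
    varLengthDictionaryColumns, null/*StarTreeIndexSpec*/, true/*nullHandlingEnabled*/);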