Presence vector (apache#4585)
This PR adds support for a presence vector inside both mutable and immutable segments. This will enable the query layer to ignore null values in the corresponding columns. Please see this issue for more details: apache#4230

The high-level gist is as follows:

- Create a presence vector per column per segment (controlled by the new nullHandlingEnabled indexing config). The presence vector keeps track of which document IDs have a null value in the corresponding column.
- Expose it through the DataSource interface so that the caller can ignore such document IDs.
- The presence vector is not actually consumed anywhere in this PR. Subsequent PRs will use it to filter out null values in predicates (e.g. select ... from table where column_name != null). A conceptual sketch follows this list.
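
Conceptually, a presence vector is a per-column bitmap over docIds: a bit is set for every docId whose value in that column is null. A minimal sketch of the idea (hypothetical class for illustration only, assuming a RoaringBitmap-backed implementation; the actual reader/writer added by this PR is RealtimeNullValueVectorReaderWriter, shown in the diffs below):

import org.roaringbitmap.RoaringBitmap;

// Hypothetical sketch, not the PR's class: one presence vector per column.
public class PresenceVectorSketch {
  private final RoaringBitmap _nullDocIds = new RoaringBitmap();

  // Ingestion time: record that this docId holds a null in this column.
  public void setNull(int docId) {
    _nullDocIds.add(docId);
  }

  // Query time: e.g. while evaluating "column_name != null", the caller
  // skips every docId for which this returns true.
  public boolean isNull(int docId) {
    return _nullDocIds.contains(docId);
  }
}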
icefury71 authored and chenboat committed Nov 15, 2019
1 parent 4975d31 commit 2c044d7
Showing 40 changed files with 971 additions and 26 deletions.

IndexingConfig.java

@@ -84,6 +84,9 @@ public class IndexingConfig {
   @ConfigKey("aggregateMetrics")
   private boolean _aggregateMetrics;
 
+  @ConfigKey("nullHandlingEnabled")
+  private boolean _nullHandlingEnabled;
+
   /**
    * The list of columns for which the variable length dictionary needs to be enabled in offline
    * segments. This is only valid for string and bytes columns and has no impact for columns of
@@ -228,6 +231,14 @@ public void setVarLengthDictionaryColumns(List<String> varLengthDictionaryColumn
     _varLengthDictionaryColumns = varLengthDictionaryColumns;
   }
 
+  public boolean isNullHandlingEnabled() {
+    return _nullHandlingEnabled;
+  }
+
+  public void setNullHandlingEnabled(boolean nullHandlingEnabled) {
+    _nullHandlingEnabled = nullHandlingEnabled;
+  }
+
   @Override
   public String toString() {
     final StringBuilder result = new StringBuilder();
@@ -285,7 +296,9 @@ public boolean equals(Object o) {
         .isEqual(_starTreeIndexSpec, that._starTreeIndexSpec) && EqualityUtils
         .isEqual(_segmentPartitionConfig, that._segmentPartitionConfig) && EqualityUtils
         .isEqual(_bloomFilterColumns, that._bloomFilterColumns) && EqualityUtils
-        .isEqual(_varLengthDictionaryColumns, that._varLengthDictionaryColumns);
+        .isEqual(_varLengthDictionaryColumns, that._varLengthDictionaryColumns) && EqualityUtils
+        .isEqual(_aggregateMetrics, that._aggregateMetrics) && EqualityUtils
+        .isEqual(_nullHandlingEnabled, that._nullHandlingEnabled);
   }
 
   @Override
@@ -305,6 +318,8 @@ public int hashCode() {
     result = EqualityUtils.hashCodeOf(result, _segmentPartitionConfig);
     result = EqualityUtils.hashCodeOf(result, _bloomFilterColumns);
     result = EqualityUtils.hashCodeOf(result, _varLengthDictionaryColumns);
+    result = EqualityUtils.hashCodeOf(result, _aggregateMetrics);
+    result = EqualityUtils.hashCodeOf(result, _nullHandlingEnabled);
     return result;
   }
 }
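
The new flag is read from the table's indexing config by both the offline segment-generation path (SegmentGeneratorConfig) and the realtime ingestion path (the HL/LL segment data managers), as the later hunks show. A hedged sketch of toggling it, assuming the usual TableConfig accessor from the surrounding codebase (imports elided):

// Sketch only: enable null handling for a table's segments.
public static void enableNullHandling(TableConfig tableConfig) {
  IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
  indexingConfig.setNullHandlingEnabled(true);
}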

SegmentMetadata.java

@@ -103,6 +103,8 @@ public interface SegmentMetadata {
 
   String getBloomFilterFileName(String column);
 
+  String getNullValueVectorFileName(String column);
+
   String getCreatorName();
 
   char getPaddingCharacter();

DataSource.java

@@ -22,6 +22,7 @@
 import org.apache.pinot.core.segment.index.readers.BloomFilterReader;
 import org.apache.pinot.core.segment.index.readers.Dictionary;
 import org.apache.pinot.core.segment.index.readers.InvertedIndexReader;
+import org.apache.pinot.core.segment.index.readers.NullValueVectorReader;
 
 
 public abstract class DataSource extends BaseOperator {
@@ -45,4 +46,10 @@ public abstract class DataSource extends BaseOperator {
    * Returns the bloom filter for the data source if exists, or {@code null} if not.
    */
   public abstract BloomFilterReader getBloomFilter();
+
+  /**
+   * Returns null value vector for the data source if exists, or {@code null} if not.
+   */
+  public abstract NullValueVectorReader getNullValueVector();
+
 }
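
A hedged sketch of a query-side consumer of the new accessor (hypothetical helper, not part of this PR; imports and the caller's knowledge of the segment's document count are assumed):

// Counts non-null values in a column, skipping null docIds via the
// presence vector. A null return from getNullValueVector() means null
// handling is disabled for this data source.
public static int countNonNull(DataSource dataSource, int numDocs) {
  NullValueVectorReader nullVector = dataSource.getNullValueVector();
  int count = 0;
  for (int docId = 0; docId < numDocs; docId++) {
    if (nullVector == null || !nullVector.isNull(docId)) {
      count++;
    }
  }
  return count;
}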

GenericRow.java

@@ -90,6 +90,13 @@ public boolean isNullValue(String fieldName) {
     return _nullValueFields.contains(fieldName);
   }
 
+  /**
+   * Returns whether this row has null values for any of the columns
+   */
+  public boolean hasNullValues() {
+    return !_nullValueFields.isEmpty();
+  }
+
   /**
    * Sets the value for the given field.
    */
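
A hedged sketch of the ingestion-side contract implied by this API: when an upstream transformer meets a null field, it stores the schema default value and records the field as null, so that MutableSegmentImpl.handleNullValues() (further down) can flip the corresponding presence-vector bits. Column names and the default value here are illustrative:

// Sketch only: building a row with one null field.
public static GenericRow sketchRowWithNull() {
  GenericRow row = new GenericRow();
  row.putValue("clicks", 12L);               // hypothetical columns
  row.putDefaultNullValue("referrer", "");   // raw value was null; "" is the assumed schema default
  assert row.hasNullValues();
  assert row.isNullValue("referrer");
  return row;
}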

HLRealtimeSegmentDataManager.java

@@ -188,7 +188,8 @@ public HLRealtimeSegmentDataManager(final RealtimeSegmentZKMetadata realtimeSegm
             getMemoryManager(realtimeTableDataManager.getConsumerDir(), segmentName,
                 indexLoadingConfig.isRealtimeOffheapAllocation(),
                 indexLoadingConfig.isDirectRealtimeOffheapAllocation(), serverMetrics))
-        .setStatsHistory(realtimeTableDataManager.getStatsHistory()).build();
+        .setStatsHistory(realtimeTableDataManager.getStatsHistory())
+        .setNullHandlingEnabled(indexingConfig.isNullHandlingEnabled()).build();
     realtimeSegment = new MutableSegmentImpl(realtimeSegmentConfig);
 
     notifier = realtimeTableDataManager;
@@ -272,7 +273,7 @@ public void run() {
             new RealtimeSegmentConverter(realtimeSegment, tempSegmentFolder.getAbsolutePath(), schema,
                 tableNameWithType, timeColumnName, realtimeSegmentZKMetadata.getSegmentName(), sortedColumn,
                 HLRealtimeSegmentDataManager.this.invertedIndexColumns, noDictionaryColumns,
-                varLengthDictionaryColumns, null/*StarTreeIndexSpec*/); // Star tree not supported for HLC.
+                varLengthDictionaryColumns, null/*StarTreeIndexSpec*/, indexingConfig.isNullHandlingEnabled()); // Star tree not supported for HLC.
 
     segmentLogger.info("Trying to build segment");
     final long buildStartTime = System.nanoTime();

LLRealtimeSegmentDataManager.java

@@ -259,6 +259,7 @@ public Map<String, File> getMetadataFiles() {
   private String _stopReason = null;
   private final Semaphore _segBuildSemaphore;
   private final boolean _isOffHeap;
+  private final boolean _nullHandlingEnabled;
 
   // TODO each time this method is called, we print reason for stop. Good to print only once.
   private boolean endCriteriaReached() {
@@ -689,7 +690,8 @@ protected SegmentBuildDescriptor buildSegmentInternal(boolean forCommit) {
     RealtimeSegmentConverter converter =
         new RealtimeSegmentConverter(_realtimeSegment, tempSegmentFolder.getAbsolutePath(), _schema,
             _tableNameWithType, _timeColumnName, _segmentZKMetadata.getSegmentName(), _sortedColumn,
-            _invertedIndexColumns, _noDictionaryColumns, _varLengthDictionaryColumns, _starTreeIndexSpec);
+            _invertedIndexColumns, _noDictionaryColumns, _varLengthDictionaryColumns, _starTreeIndexSpec,
+            _nullHandlingEnabled);
     segmentLogger.info("Trying to build segment");
     try {
       converter.build(_segmentVersion, _serverMetrics);
@@ -1134,6 +1136,8 @@ public LLRealtimeSegmentDataManager(RealtimeSegmentZKMetadata segmentZKMetadata,
 
     _isOffHeap = indexLoadingConfig.isRealtimeOffheapAllocation();
 
+    _nullHandlingEnabled = indexingConfig.isNullHandlingEnabled();
+
     // Start new realtime segment
     RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder =
         new RealtimeSegmentConfig.Builder().setSegmentName(_segmentNameStr).setStreamName(_streamTopic)
@@ -1144,14 +1148,15 @@ public LLRealtimeSegmentDataManager(RealtimeSegmentZKMetadata segmentZKMetadata,
             .setInvertedIndexColumns(invertedIndexColumns).setRealtimeSegmentZKMetadata(segmentZKMetadata)
             .setOffHeap(_isOffHeap).setMemoryManager(_memoryManager)
             .setStatsHistory(realtimeTableDataManager.getStatsHistory())
-            .setAggregateMetrics(indexingConfig.isAggregateMetrics());
+            .setAggregateMetrics(indexingConfig.isAggregateMetrics())
+            .setNullHandlingEnabled(_nullHandlingEnabled);
 
     // Create message decoder
     _messageDecoder = StreamDecoderProvider.create(_partitionLevelStreamConfig, _schema);
     _clientId = _streamPartitionId + "-" + NetUtil.getHostnameOrAddress();
 
     // Create record transformer
-    _recordTransformer = CompositeTransformer.getDefaultTransformer(_schema);
+    _recordTransformer = CompositeTransformer.getDefaultTransformer(schema);
     makeStreamConsumer("Starting");
     makeStreamMetadataProvider("Starting");
 

SegmentGeneratorConfig.java

@@ -113,6 +113,7 @@ public enum TimeColumnType {
   // Use on-heap or off-heap memory to generate index (currently only affect inverted index and star-tree v2)
   private boolean _onHeap = false;
   private boolean _checkTimeColumnValidityDuringGeneration = true;
+  private boolean _nullHandlingEnabled = false;
 
   public SegmentGeneratorConfig() {
   }
@@ -162,6 +163,7 @@ public SegmentGeneratorConfig(SegmentGeneratorConfig config) {
     _onHeap = config._onHeap;
     _recordReaderPath = config._recordReaderPath;
     _checkTimeColumnValidityDuringGeneration = config._checkTimeColumnValidityDuringGeneration;
+    _nullHandlingEnabled = config._nullHandlingEnabled;
   }
 
   /**
@@ -218,6 +220,8 @@ public SegmentGeneratorConfig(@Nullable TableConfig tableConfig, Schema schema)
 
     SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
     _hllConfig = validationConfig.getHllConfig();
+
+    _nullHandlingEnabled = indexingConfig.isNullHandlingEnabled();
   }
 
   public SegmentGeneratorConfig(Schema schema) {
@@ -680,4 +684,12 @@ private String getQualifyingFields(FieldType type, boolean excludeVirtualColumns
     Collections.sort(fields);
     return StringUtils.join(fields, ",");
   }
+
+  public boolean isNullHandlingEnabled() {
+    return _nullHandlingEnabled;
+  }
+
+  public void setNullHandlingEnabled(boolean nullHandlingEnabled) {
+    _nullHandlingEnabled = nullHandlingEnabled;
+  }
 }
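
A hedged sketch of an offline build that honors the flag (the driver class appears in the converter hunk below; its init/build signatures are assumed from the surrounding codebase, and the TableConfig constructor above would normally pick the flag up from the indexing config anyway):

// Sketch only: generate an offline segment with null handling enabled.
public static void buildOfflineSegment(TableConfig tableConfig, Schema schema) throws Exception {
  SegmentGeneratorConfig genConfig = new SegmentGeneratorConfig(tableConfig, schema);
  genConfig.setNullHandlingEnabled(true);  // explicit here; usually inherited from the table config
  SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
  driver.init(genConfig);
  driver.build();
}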

MutableSegmentImpl.java

@@ -50,10 +50,12 @@
 import org.apache.pinot.core.realtime.impl.dictionary.BaseOffHeapMutableDictionary;
 import org.apache.pinot.core.realtime.impl.dictionary.MutableDictionaryFactory;
 import org.apache.pinot.core.realtime.impl.invertedindex.RealtimeInvertedIndexReader;
+import org.apache.pinot.core.realtime.impl.nullvalue.RealtimeNullValueVectorReaderWriter;
 import org.apache.pinot.core.segment.creator.impl.V1Constants;
 import org.apache.pinot.core.segment.index.SegmentMetadataImpl;
 import org.apache.pinot.core.segment.index.data.source.ColumnDataSource;
 import org.apache.pinot.core.segment.index.readers.BloomFilterReader;
+import org.apache.pinot.core.segment.index.readers.NullValueVectorReader;
 import org.apache.pinot.core.segment.virtualcolumn.VirtualColumnContext;
 import org.apache.pinot.core.segment.virtualcolumn.VirtualColumnProvider;
 import org.apache.pinot.core.segment.virtualcolumn.VirtualColumnProviderFactory;
@@ -86,12 +88,14 @@ public class MutableSegmentImpl implements MutableSegment {
   private final PinotDataBufferMemoryManager _memoryManager;
   private final RealtimeSegmentStatsHistory _statsHistory;
   private final SegmentPartitionConfig _segmentPartitionConfig;
+  private final boolean _nullHandlingEnabled;
 
   private final Map<String, BaseMutableDictionary> _dictionaryMap = new HashMap<>();
   private final Map<String, DataFileReader> _indexReaderWriterMap = new HashMap<>();
   private final Map<String, Integer> _maxNumValuesMap = new HashMap<>();
   private final Map<String, RealtimeInvertedIndexReader> _invertedIndexMap = new HashMap<>();
   private final Map<String, BloomFilterReader> _bloomFilterMap = new HashMap<>();
+  private final Map<String, RealtimeNullValueVectorReaderWriter> _nullValueVectorMap = new HashMap<>();
   private final IdMap<FixedIntArray> _recordIdMap;
   private boolean _aggregateMetrics;
 
@@ -142,6 +146,7 @@ public long getLatestIngestionTimestamp() {
     _memoryManager = config.getMemoryManager();
     _statsHistory = config.getStatsHistory();
     _segmentPartitionConfig = config.getSegmentPartitionConfig();
+    _nullHandlingEnabled = config.isNullHandlingEnabled();
 
     Collection<FieldSpec> allFieldSpecs = _schema.getAllFieldSpecs();
     List<FieldSpec> physicalFieldSpecs = new ArrayList<>(allFieldSpecs.size());
@@ -225,6 +230,10 @@ public long getLatestIngestionTimestamp() {
       if (invertedIndexColumns.contains(column)) {
         _invertedIndexMap.put(column, new RealtimeInvertedIndexReader());
       }
+
+      if (_nullHandlingEnabled) {
+        _nullValueVectorMap.put(column, new RealtimeNullValueVectorReaderWriter());
+      }
     }
 
     // Metric aggregation can be enabled only if config is specified, and all dimensions have dictionary,
@@ -262,6 +271,9 @@ public boolean index(GenericRow row, @Nullable RowMetadata rowMetadata) {
       // Add forward and inverted indices for new document.
       addForwardIndex(row, docId, dictIdMap);
       addInvertedIndex(docId, dictIdMap);
+      if (_nullHandlingEnabled) {
+        handleNullValues(row, docId);
+      }
 
       // Update number of document indexed at last to make the latest record queryable
      canTakeMore = _numDocsIndexed++ < _capacity;
@@ -379,6 +391,22 @@ private void addInvertedIndex(int docId, Map<String, Object> dictIdMap) {
     }
   }
 
+  /**
+   * Check if the row has any null fields and update the
+   * column null value vectors accordingly
+   * @param row specifies row being ingested
+   * @param docId specified docId for this row
+   */
+  private void handleNullValues(GenericRow row, int docId) {
+    if (!row.hasNullValues()) {
+      return;
+    }
+
+    for (String columnName : row.getNullValueFields()) {
+      _nullValueVectorMap.get(columnName).setNull(docId);
+    }
+  }
+
   private boolean aggregateMetrics(GenericRow row, int docId) {
     for (MetricFieldSpec metricFieldSpec : _physicalMetricFieldSpecs) {
       String column = metricFieldSpec.getName();
@@ -453,7 +481,7 @@ public ColumnDataSource getDataSource(String columnName) {
     } else {
       return new ColumnDataSource(fieldSpec, _numDocsIndexed, _maxNumValuesMap.get(columnName),
           _indexReaderWriterMap.get(columnName), _invertedIndexMap.get(columnName), _dictionaryMap.get(columnName),
-          _bloomFilterMap.get(columnName));
+          _bloomFilterMap.get(columnName), _nullValueVectorMap.get(columnName));
     }
   }
 
@@ -471,9 +499,18 @@ public List<StarTreeV2> getStarTrees() {
   public GenericRow getRecord(int docId, GenericRow reuse) {
     for (FieldSpec fieldSpec : _physicalFieldSpecs) {
       String column = fieldSpec.getName();
-      reuse.putField(column, IndexSegmentUtils
+      Object value = IndexSegmentUtils
           .getValue(docId, fieldSpec, _indexReaderWriterMap.get(column), _dictionaryMap.get(column),
-              _maxNumValuesMap.getOrDefault(column, 0)));
+              _maxNumValuesMap.getOrDefault(column, 0));
+      reuse.putValue(column, value);
+
+      if (_nullHandlingEnabled) {
+        NullValueVectorReader reader = _nullValueVectorMap.get(column);
+        // If column has null value for this docId, set that accordingly in GenericRow
+        if (reader.isNull(docId)) {
+          reuse.putDefaultNullValue(column, value);
+        }
+      }
     }
     return reuse;
   }
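
The symmetry above is the core of the feature: index() calls handleNullValues() to set presence-vector bits at ingestion time, and getRecord() consults the same vector to re-mark null fields on the way out. A hedged sketch of that reader/writer contract in isolation (no-arg constructor and both methods taken from the hunks above):

// Sketch only: one column's presence vector over a few ingested docs.
public static void presenceVectorRoundTrip() {
  RealtimeNullValueVectorReaderWriter nullVector = new RealtimeNullValueVectorReaderWriter();
  nullVector.setNull(1);            // docId 1 had a null in this column
  assert !nullVector.isNull(0);
  assert nullVector.isNull(1);
}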

RealtimeSegmentConverter.java

@@ -55,11 +55,12 @@ public class RealtimeSegmentConverter {
   private List<String> noDictionaryColumns;
   private StarTreeIndexSpec starTreeIndexSpec;
   private List<String> varLengthDictionaryColumns;
+  private final boolean _nullHandlingEnabled;
 
   public RealtimeSegmentConverter(MutableSegmentImpl realtimeSegment, String outputPath, Schema schema,
       String tableName, String timeColumnName, String segmentName, String sortedColumn,
       List<String> invertedIndexColumns, List<String> noDictionaryColumns,
-      List<String> varLengthDictionaryColumns, StarTreeIndexSpec starTreeIndexSpec) {
+      List<String> varLengthDictionaryColumns, StarTreeIndexSpec starTreeIndexSpec, boolean nullHandlingEnabled) {
     if (new File(outputPath).exists()) {
       throw new IllegalAccessError("path already exists:" + outputPath);
     }
@@ -77,12 +78,13 @@ public RealtimeSegmentConverter(MutableSegmentImpl realtimeSegment, String outpu
     this.noDictionaryColumns = noDictionaryColumns;
     this.varLengthDictionaryColumns = varLengthDictionaryColumns;
     this.starTreeIndexSpec = starTreeIndexSpec;
+    this._nullHandlingEnabled = nullHandlingEnabled;
   }
 
   public RealtimeSegmentConverter(MutableSegmentImpl realtimeSegment, String outputPath, Schema schema,
       String tableName, String timeColumnName, String segmentName, String sortedColumn) {
     this(realtimeSegment, outputPath, schema, tableName, timeColumnName, segmentName, sortedColumn, new ArrayList<>(),
-        new ArrayList<>(), new ArrayList<>(), null/*StarTreeIndexSpec*/);
+        new ArrayList<>(), new ArrayList<>(), null/*StarTreeIndexSpec*/, false/*nullHandlingEnabled*/);
   }
 
   public void build(@Nullable SegmentVersion segmentVersion, ServerMetrics serverMetrics)
@@ -135,6 +137,7 @@ public void build(@Nullable SegmentVersion segmentVersion, ServerMetrics serverM
     genConfig.setSegmentName(segmentName);
     SegmentPartitionConfig segmentPartitionConfig = realtimeSegmentImpl.getSegmentPartitionConfig();
     genConfig.setSegmentPartitionConfig(segmentPartitionConfig);
+    genConfig.setNullHandlingEnabled(_nullHandlingEnabled);
     final SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
     RealtimeSegmentSegmentCreationDataSource dataSource =
         new RealtimeSegmentSegmentCreationDataSource(realtimeSegmentImpl, reader, dataSchema);

RealtimeSegmentConfig.java

@@ -40,13 +40,14 @@ public class RealtimeSegmentConfig {
   private final RealtimeSegmentStatsHistory _statsHistory;
   private final SegmentPartitionConfig _segmentPartitionConfig;
   private final boolean _aggregateMetrics;
+  private final boolean _nullHandlingEnabled;
 
   private RealtimeSegmentConfig(String segmentName, String streamName, Schema schema, int capacity,
       int avgNumMultiValues, Set<String> noDictionaryColumns, Set<String> varLengthDictionaryColumns,
       Set<String> invertedIndexColumns, RealtimeSegmentZKMetadata realtimeSegmentZKMetadata,
       boolean offHeap, PinotDataBufferMemoryManager memoryManager,
       RealtimeSegmentStatsHistory statsHistory, SegmentPartitionConfig segmentPartitionConfig,
-      boolean aggregateMetrics) {
+      boolean aggregateMetrics, boolean nullHandlingEnabled) {
     _segmentName = segmentName;
     _streamName = streamName;
     _schema = schema;
@@ -61,6 +62,7 @@ private RealtimeSegmentConfig(String segmentName, String streamName, Schema sche
     _statsHistory = statsHistory;
     _segmentPartitionConfig = segmentPartitionConfig;
     _aggregateMetrics = aggregateMetrics;
+    _nullHandlingEnabled = nullHandlingEnabled;
   }
 
   public String getSegmentName() {
@@ -119,6 +121,10 @@ public boolean aggregateMetrics() {
     return _aggregateMetrics;
   }
 
+  public boolean isNullHandlingEnabled() {
+    return _nullHandlingEnabled;
+  }
+
   public static class Builder {
     private String _segmentName;
     private String _streamName;
@@ -134,6 +140,7 @@ public static class Builder {
     private RealtimeSegmentStatsHistory _statsHistory;
     private SegmentPartitionConfig _segmentPartitionConfig;
     private boolean _aggregateMetrics = false;
+    private boolean _nullHandlingEnabled = false;
 
     public Builder() {
     }
@@ -208,11 +215,16 @@ public Builder setAggregateMetrics(boolean aggregateMetrics) {
       return this;
     }
 
+    public Builder setNullHandlingEnabled(boolean nullHandlingEnabled) {
+      _nullHandlingEnabled = nullHandlingEnabled;
+      return this;
+    }
+
     public RealtimeSegmentConfig build() {
       return new RealtimeSegmentConfig(_segmentName, _streamName, _schema, _capacity, _avgNumMultiValues,
           _noDictionaryColumns, _varLengthDictionaryColumns, _invertedIndexColumns,
           _realtimeSegmentZKMetadata, _offHeap, _memoryManager,
-          _statsHistory, _segmentPartitionConfig, _aggregateMetrics);
+          _statsHistory, _segmentPartitionConfig, _aggregateMetrics, _nullHandlingEnabled);
     }
   }
 }
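
A hedged sketch of minimal builder usage with the new flag (the segment name is made up; a real caller must also set the schema, capacity, memory manager and the other fields shown in the LLRealtimeSegmentDataManager hunk above, or MutableSegmentImpl will fail):

// Sketch only: carry the flag from config into a mutable segment.
public static MutableSegmentImpl sketchMutableSegment() {
  RealtimeSegmentConfig config = new RealtimeSegmentConfig.Builder()
      .setSegmentName("myTable__0__0__20191115T000000Z")  // hypothetical name
      .setNullHandlingEnabled(true)
      .build();
  return new MutableSegmentImpl(config);
}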
(Diff truncated; the remaining changed files are not shown.)
