Skip to content

Commit

Permalink
Real-time json index
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackie-Jiang committed Jan 7, 2021
1 parent 7e0398b commit 4bcdbce
Show file tree
Hide file tree
Showing 17 changed files with 960 additions and 341 deletions.
Expand Up @@ -40,6 +40,7 @@
import org.apache.pinot.parsers.AbstractCompiler;
import org.apache.pinot.parsers.utils.BrokerRequestComparisonUtils;
import org.apache.pinot.pql.parsers.pql2.ast.AstNode;
import org.apache.pinot.pql.parsers.pql2.ast.IdentifierAstNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -136,17 +137,23 @@ public BrokerRequest compileToBrokerRequest(String expression)
}

/**
 * Parses the given expression into an AST node.
 * <p>If the expression cannot be parsed (e.g. it is a reserved keyword such as {@code '*'},
 * {@code 'group'} or {@code 'order'}), it is treated as a plain identifier instead of
 * propagating the parse error to the caller.
 *
 * @param expression Expression to parse
 * @return Root AST node of the parsed expression, or an {@code IdentifierAstNode} wrapping the
 *         raw expression when parsing fails
 */
public AstNode parseToAstNode(String expression) {
  try {
    CharStream charStream = new ANTLRInputStream(expression);
    PQL2Lexer lexer = new PQL2Lexer(charStream);
    lexer.setTokenFactory(new CommonTokenFactory(true));
    TokenStream tokenStream = new UnbufferedTokenStream<CommonToken>(lexer);
    PQL2Parser parser = new PQL2Parser(tokenStream);
    parser.setErrorHandler(new BailErrorStrategy());
    // Suppress console error output; failures are handled by the catch block below
    parser.removeErrorListeners();

    // Walk the parse tree and build the AST
    Pql2AstListener listener = new Pql2AstListener(expression);
    new ParseTreeWalker().walk(listener, parser.expression());
    return listener.getRootNode();
  } catch (Exception e) {
    // NOTE: Treat reserved keys as identifiers. E.g. '*', 'group', 'order', etc.
    return new IdentifierAstNode(expression);
  }
}

public TransformExpressionTree compileToExpressionTree(String expression) {
Expand Down
Expand Up @@ -143,7 +143,7 @@ public boolean isFinal() {
}
}

// Minimum time (in minutes) a segment keeps consuming before the end criteria may trigger.
// Declared final: this is a constant and should never be reassigned.
private static final int MINIMUM_CONSUME_TIME_MINUTES = 10;

@VisibleForTesting
public class SegmentBuildDescriptor {
Expand Down Expand Up @@ -233,8 +233,8 @@ public void deleteSegmentFile() {
private final SegmentVersion _segmentVersion;
private final SegmentBuildTimeLeaseExtender _leaseExtender;
private SegmentBuildDescriptor _segmentBuildDescriptor;
private StreamConsumerFactory _streamConsumerFactory;
private StreamPartitionMsgOffsetFactory _streamPartitionMsgOffsetFactory;
private final StreamConsumerFactory _streamConsumerFactory;
private final StreamPartitionMsgOffsetFactory _streamPartitionMsgOffsetFactory;

// Segment end criteria
private volatile long _consumeEndTime = 0;
Expand All @@ -261,10 +261,10 @@ public void deleteSegmentFile() {
private final List<String> _noDictionaryColumns;
private final List<String> _varLengthDictionaryColumns;
private final String _sortedColumn;
private Logger segmentLogger;
private final Logger segmentLogger;
private final String _tableStreamName;
private final PinotDataBufferMemoryManager _memoryManager;
private AtomicLong _lastUpdatedRowsIndexed = new AtomicLong(0);
private final AtomicLong _lastUpdatedRowsIndexed = new AtomicLong(0);
private final String _instanceId;
private final ServerSegmentCompletionProtocolHandler _protocolHandler;
private final long _consumeStartTime;
Expand Down Expand Up @@ -1199,7 +1199,7 @@ public LLRealtimeSegmentDataManager(RealtimeSegmentZKMetadata segmentZKMetadata,
.setNoDictionaryColumns(indexLoadingConfig.getNoDictionaryColumns())
.setVarLengthDictionaryColumns(indexLoadingConfig.getVarLengthDictionaryColumns())
.setInvertedIndexColumns(invertedIndexColumns).setTextIndexColumns(textIndexColumns)
.setFSTIndexColumns(fstIndexColumns)
.setFSTIndexColumns(fstIndexColumns).setJsonIndexColumns(indexLoadingConfig.getJsonIndexColumns())
.setRealtimeSegmentZKMetadata(segmentZKMetadata).setOffHeap(_isOffHeap).setMemoryManager(_memoryManager)
.setStatsHistory(realtimeTableDataManager.getStatsHistory())
.setAggregateMetrics(indexingConfig.isAggregateMetrics()).setNullHandlingEnabled(_nullHandlingEnabled)
Expand Down
Expand Up @@ -18,10 +18,9 @@
*/
package org.apache.pinot.core.indexsegment.mutable;

import java.io.IOException;
import javax.annotation.Nullable;
import org.apache.pinot.core.indexsegment.IndexSegment;
import org.apache.pinot.core.upsert.PartitionUpsertMetadataManager;
import org.apache.pinot.core.upsert.TableUpsertMetadataManager;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.stream.RowMetadata;

Expand All @@ -35,7 +34,8 @@ public interface MutableSegment extends IndexSegment {
* @param rowMetadata the metadata associated with the message
* @return Whether the segment is full (i.e. cannot index more record into it)
*/
boolean index(GenericRow row, @Nullable RowMetadata rowMetadata)
    throws IOException;

/**
* Returns the number of records already indexed into the segment.
Expand Down
Expand Up @@ -51,6 +51,7 @@
import org.apache.pinot.core.realtime.impl.invertedindex.RealtimeLuceneIndexRefreshState;
import org.apache.pinot.core.realtime.impl.invertedindex.RealtimeLuceneIndexRefreshState.RealtimeLuceneReaders;
import org.apache.pinot.core.realtime.impl.invertedindex.RealtimeLuceneTextIndexReader;
import org.apache.pinot.core.realtime.impl.json.MutableJsonIndex;
import org.apache.pinot.core.realtime.impl.nullvalue.MutableNullValueVector;
import org.apache.pinot.core.segment.creator.impl.V1Constants;
import org.apache.pinot.core.segment.index.datasource.ImmutableDataSource;
Expand All @@ -59,7 +60,6 @@
import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.core.segment.index.readers.BloomFilterReader;
import org.apache.pinot.core.segment.index.readers.InvertedIndexReader;
import org.apache.pinot.core.segment.index.readers.JsonIndexReader;
import org.apache.pinot.core.segment.index.readers.MutableDictionary;
import org.apache.pinot.core.segment.index.readers.MutableForwardIndex;
import org.apache.pinot.core.segment.index.readers.ValidDocIndexReader;
Expand Down Expand Up @@ -222,6 +222,7 @@ public long getLatestIngestionTimestamp() {
Set<String> invertedIndexColumns = config.getInvertedIndexColumns();
Set<String> textIndexColumns = config.getTextIndexColumns();
Set<String> fstIndexColumns = config.getFSTIndexColumns();
Set<String> jsonIndexColumns = config.getJsonIndexColumns();

int avgNumMultiValues = config.getAvgNumMultiValues();

Expand Down Expand Up @@ -324,13 +325,17 @@ public long getLatestIngestionTimestamp() {
textIndex = null;
}

// Json index
MutableJsonIndex jsonIndex = jsonIndexColumns.contains(column) ? new MutableJsonIndex() : null;

// Null value vector
MutableNullValueVector nullValueVector = _nullHandlingEnabled ? new MutableNullValueVector() : null;

// TODO: Support range index, json index and bloom filter for mutable segment
// TODO: Support range index and bloom filter for mutable segment
_indexContainerMap.put(column,
new IndexContainer(fieldSpec, partitionFunction, partitions, new NumValuesInfo(), forwardIndex, dictionary,
invertedIndexReader, null, textIndex, fstIndexColumns.contains(column), null, null, nullValueVector));
invertedIndexReader, null, textIndex, fstIndexColumns.contains(column), jsonIndex, null,
nullValueVector));
}

if (_realtimeLuceneReaders != null) {
Expand Down Expand Up @@ -442,7 +447,8 @@ public void addExtraColumns(Schema newSchema) {
// NOTE: Okay for single-writer
@SuppressWarnings("NonAtomicOperationOnVolatileField")
@Override
public boolean index(GenericRow row, @Nullable RowMetadata rowMetadata) {
public boolean index(GenericRow row, @Nullable RowMetadata rowMetadata)
throws IOException {
// Update dictionary first
updateDictionary(row);

Expand Down Expand Up @@ -509,7 +515,8 @@ private void updateDictionary(GenericRow row) {
}
}

private void addNewRow(GenericRow row) {
private void addNewRow(GenericRow row)
throws IOException {
int docId = _numDocsIndexed;
for (Map.Entry<String, IndexContainer> entry : _indexContainerMap.entrySet()) {
String column = entry.getKey();
Expand Down Expand Up @@ -604,6 +611,12 @@ private void addNewRow(GenericRow row) {
if (textIndex != null) {
textIndex.add((String) value);
}

// Update json index
MutableJsonIndex jsonIndex = indexContainer._jsonIndex;
if (jsonIndex != null) {
jsonIndex.add((String) value);
}
} else {
// Multi-value column (always dictionary-encoded)

Expand Down Expand Up @@ -1039,7 +1052,7 @@ private class IndexContainer implements Closeable {
final InvertedIndexReader _rangeIndex;
final RealtimeLuceneTextIndexReader _textIndex;
final boolean _enableFST;
final JsonIndexReader _jsonIndex;
final MutableJsonIndex _jsonIndex;
final BloomFilterReader _bloomFilter;
final MutableNullValueVector _nullValueVector;

Expand All @@ -1054,7 +1067,7 @@ private class IndexContainer implements Closeable {
@Nullable Set<Integer> partitions, NumValuesInfo numValuesInfo, MutableForwardIndex forwardIndex,
@Nullable MutableDictionary dictionary, @Nullable RealtimeInvertedIndexReader invertedIndex,
@Nullable InvertedIndexReader rangeIndex, @Nullable RealtimeLuceneTextIndexReader textIndex, boolean enableFST,
@Nullable JsonIndexReader jsonIndex, @Nullable BloomFilterReader bloomFilter,
@Nullable MutableJsonIndex jsonIndex, @Nullable BloomFilterReader bloomFilter,
@Nullable MutableNullValueVector nullValueVector) {
_fieldSpec = fieldSpec;
_partitionFunction = partitionFunction;
Expand Down
Expand Up @@ -41,6 +41,7 @@ public class RealtimeSegmentConfig {
private final Set<String> _invertedIndexColumns;
private final Set<String> _textIndexColumns;
private final Set<String> _fstIndexColumns;
private final Set<String> _jsonIndexColumns;
private final RealtimeSegmentZKMetadata _realtimeSegmentZKMetadata;
private final boolean _offHeap;
private final PinotDataBufferMemoryManager _memoryManager;
Expand All @@ -55,13 +56,14 @@ public class RealtimeSegmentConfig {
private final String _consumerDir;

// TODO: Clean up this constructor. Most of these things can be extracted from tableConfig.
private RealtimeSegmentConfig(String tableNameWithType, String segmentName, String streamName, Schema schema, String timeColumnName,
int capacity, int avgNumMultiValues, Set<String> noDictionaryColumns, Set<String> varLengthDictionaryColumns,
Set<String> invertedIndexColumns, Set<String> textIndexColumns, Set<String> fstIndexColumns,
RealtimeSegmentZKMetadata realtimeSegmentZKMetadata, boolean offHeap, PinotDataBufferMemoryManager memoryManager,
RealtimeSegmentStatsHistory statsHistory, String partitionColumn, PartitionFunction partitionFunction,
int partitionId, boolean aggregateMetrics, boolean nullHandlingEnabled, String consumerDir,
UpsertConfig.Mode upsertMode, PartitionUpsertMetadataManager partitionUpsertMetadataManager) {
private RealtimeSegmentConfig(String tableNameWithType, String segmentName, String streamName, Schema schema,
String timeColumnName, int capacity, int avgNumMultiValues, Set<String> noDictionaryColumns,
Set<String> varLengthDictionaryColumns, Set<String> invertedIndexColumns, Set<String> textIndexColumns,
Set<String> fstIndexColumns, Set<String> jsonIndexColumns, RealtimeSegmentZKMetadata realtimeSegmentZKMetadata,
boolean offHeap, PinotDataBufferMemoryManager memoryManager, RealtimeSegmentStatsHistory statsHistory,
String partitionColumn, PartitionFunction partitionFunction, int partitionId, boolean aggregateMetrics,
boolean nullHandlingEnabled, String consumerDir, UpsertConfig.Mode upsertMode,
PartitionUpsertMetadataManager partitionUpsertMetadataManager) {
_tableNameWithType = tableNameWithType;
_segmentName = segmentName;
_streamName = streamName;
Expand All @@ -74,6 +76,7 @@ private RealtimeSegmentConfig(String tableNameWithType, String segmentName, Stri
_invertedIndexColumns = invertedIndexColumns;
_textIndexColumns = textIndexColumns;
_fstIndexColumns = fstIndexColumns;
_jsonIndexColumns = jsonIndexColumns;
_realtimeSegmentZKMetadata = realtimeSegmentZKMetadata;
_offHeap = offHeap;
_memoryManager = memoryManager;
Expand Down Expand Up @@ -141,6 +144,10 @@ public Set<String> getFSTIndexColumns() {
return _fstIndexColumns;
}

/**
 * Returns the set of column names for which a JSON index should be maintained
 * in the realtime segment.
 */
public Set<String> getJsonIndexColumns() {
return _jsonIndexColumns;
}

public RealtimeSegmentZKMetadata getRealtimeSegmentZKMetadata() {
return _realtimeSegmentZKMetadata;
}
Expand Down Expand Up @@ -202,6 +209,7 @@ public static class Builder {
private Set<String> _invertedIndexColumns;
private Set<String> _textIndexColumns = new HashSet<>();
private Set<String> _fstIndexColumns = new HashSet<>();
private Set<String> _jsonIndexColumns = new HashSet<>();
private RealtimeSegmentZKMetadata _realtimeSegmentZKMetadata;
private boolean _offHeap;
private PinotDataBufferMemoryManager _memoryManager;
Expand Down Expand Up @@ -286,6 +294,11 @@ public Builder setFSTIndexColumns(Set<String> fstIndexColumns) {
return this;
}

/**
 * Sets the columns for which a JSON index should be created in the realtime segment.
 *
 * @param jsonIndexColumns Names of the JSON index columns
 * @return This builder, for call chaining
 */
public Builder setJsonIndexColumns(Set<String> jsonIndexColumns) {
_jsonIndexColumns = jsonIndexColumns;
return this;
}

public Builder setRealtimeSegmentZKMetadata(RealtimeSegmentZKMetadata realtimeSegmentZKMetadata) {
_realtimeSegmentZKMetadata = realtimeSegmentZKMetadata;
return this;
Expand Down Expand Up @@ -349,9 +362,9 @@ public Builder setPartitionUpsertMetadataManager(PartitionUpsertMetadataManager
public RealtimeSegmentConfig build() {
return new RealtimeSegmentConfig(_tableNameWithType, _segmentName, _streamName, _schema, _timeColumnName,
_capacity, _avgNumMultiValues, _noDictionaryColumns, _varLengthDictionaryColumns, _invertedIndexColumns,
_textIndexColumns, _fstIndexColumns, _realtimeSegmentZKMetadata, _offHeap, _memoryManager,
_statsHistory, _partitionColumn, _partitionFunction, _partitionId, _aggregateMetrics,
_nullHandlingEnabled, _consumerDir, _upsertMode, _partitionUpsertMetadataManager);
_textIndexColumns, _fstIndexColumns, _jsonIndexColumns, _realtimeSegmentZKMetadata, _offHeap, _memoryManager,
_statsHistory, _partitionColumn, _partitionFunction, _partitionId, _aggregateMetrics, _nullHandlingEnabled,
_consumerDir, _upsertMode, _partitionUpsertMetadataManager);
}
}
}

0 comments on commit 4bcdbce

Please sign in to comment.