Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more options to json index #9543

Merged
2 commits merged on Oct 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -1346,7 +1346,7 @@ public LLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableCo
.setVarLengthDictionaryColumns(indexLoadingConfig.getVarLengthDictionaryColumns())
.setInvertedIndexColumns(invertedIndexColumns).setTextIndexColumns(indexLoadingConfig.getTextIndexColumns())
.setFSTIndexColumns(indexLoadingConfig.getFSTIndexColumns())
.setJsonIndexColumns(indexLoadingConfig.getJsonIndexColumns())
.setJsonIndexConfigs(indexLoadingConfig.getJsonIndexConfigs())
.setH3IndexConfigs(indexLoadingConfig.getH3IndexConfigs()).setSegmentZKMetadata(segmentZKMetadata)
.setOffHeap(_isOffHeap).setMemoryManager(_memoryManager)
.setStatsHistory(realtimeTableDataManager.getStatsHistory())
Expand Down
Expand Up @@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.commons.lang3.tuple.Pair;
Expand All @@ -37,6 +38,7 @@
import org.apache.pinot.common.utils.request.RequestUtils;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.JsonIndexConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
Expand Down Expand Up @@ -436,12 +438,18 @@ private static boolean isIndexedJSONColumn(String columnName, @Nullable TableCon
return false;
}

// Ignore jsonIndexColumns when jsonIndexConfigs is configured
Map<String, JsonIndexConfig> jsonIndexConfigs = indexingConfig.getJsonIndexConfigs();
if (jsonIndexConfigs != null) {
return jsonIndexConfigs.containsKey(columnName);
}

List<String> jsonIndexColumns = indexingConfig.getJsonIndexColumns();
if (jsonIndexColumns == null) {
return false;
if (jsonIndexColumns != null) {
return jsonIndexColumns.contains(columnName);
}

return jsonIndexColumns.contains(columnName);
return false;
}

/** @return symbolic representation of function operator delimited by spaces. */
Expand Down
Expand Up @@ -116,7 +116,7 @@ private IndexSegment createRealtimeSegment(int index)
RealtimeSegmentConfig realtimeSegmentConfig = new RealtimeSegmentConfig.Builder()
.setTableNameWithType(REALTIME_TABLE_NAME).setSegmentName(segmentName).setSchema(SCHEMA).setCapacity(100000)
.setAvgNumMultiValues(2).setNoDictionaryColumns(Collections.emptySet())
.setJsonIndexColumns(Collections.emptySet()).setVarLengthDictionaryColumns(Collections.emptySet())
.setJsonIndexConfigs(Collections.emptyMap()).setVarLengthDictionaryColumns(Collections.emptySet())
.setInvertedIndexColumns(Collections.emptySet()).setSegmentZKMetadata(new SegmentZKMetadata(segmentName))
.setMemoryManager(new DirectMemoryManager(segmentName)).setStatsHistory(statsHistory).setAggregateMetrics(false)
.setNullHandlingEnabled(true).setIngestionAggregationConfigs(Collections.emptyList()).build();
Expand Down
Expand Up @@ -105,7 +105,7 @@ public MutableInvertedIndex newInvertedIndex(MutableIndexContext.Inverted contex

@Override
public MutableJsonIndex newJsonIndex(MutableIndexContext.Json context) {
return new MutableJsonIndexImpl();
return new MutableJsonIndexImpl(context.getJsonIndexConfig());
}

@Override
Expand Down
Expand Up @@ -90,6 +90,7 @@
import org.apache.pinot.segment.spi.partition.PartitionFunction;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.JsonIndexConfig;
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.config.table.ingestion.AggregationConfig;
Expand Down Expand Up @@ -244,7 +245,7 @@ public long getLatestIngestionTimestamp() {
Set<String> invertedIndexColumns = config.getInvertedIndexColumns();
Set<String> textIndexColumns = config.getTextIndexColumns();
Set<String> fstIndexColumns = config.getFSTIndexColumns();
Set<String> jsonIndexColumns = config.getJsonIndexColumns();
Map<String, JsonIndexConfig> jsonIndexConfigs = config.getJsonIndexConfigs();
Map<String, H3IndexConfig> h3IndexConfigs = config.getH3IndexConfigs();

int avgNumMultiValues = config.getAvgNumMultiValues();
Expand Down Expand Up @@ -354,8 +355,9 @@ public long getLatestIngestionTimestamp() {
}

// Json index
JsonIndexConfig jsonIndexConfig = jsonIndexConfigs.get(column);
MutableJsonIndex jsonIndex =
jsonIndexColumns.contains(column) ? indexProvider.newJsonIndex(context.forJsonIndex()) : null;
jsonIndexConfig != null ? indexProvider.newJsonIndex(context.forJsonIndex(jsonIndexConfig)) : null;

// H3 index
// TODO consider making this overridable
Expand Down
Expand Up @@ -31,6 +31,7 @@
import org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager;
import org.apache.pinot.segment.spi.partition.PartitionFunction;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.JsonIndexConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.config.table.ingestion.AggregationConfig;
import org.apache.pinot.spi.data.Schema;
Expand All @@ -49,7 +50,7 @@ public class RealtimeSegmentConfig {
private final Set<String> _invertedIndexColumns;
private final Set<String> _textIndexColumns;
private final Set<String> _fstIndexColumns;
private final Set<String> _jsonIndexColumns;
private final Map<String, JsonIndexConfig> _jsonIndexConfigs;
private final Map<String, H3IndexConfig> _h3IndexConfigs;
private final SegmentZKMetadata _segmentZKMetadata;
private final boolean _offHeap;
Expand All @@ -72,11 +73,11 @@ public class RealtimeSegmentConfig {
private RealtimeSegmentConfig(String tableNameWithType, String segmentName, String streamName, Schema schema,
String timeColumnName, int capacity, int avgNumMultiValues, Set<String> noDictionaryColumns,
Set<String> varLengthDictionaryColumns, Set<String> invertedIndexColumns, Set<String> textIndexColumns,
Set<String> fstIndexColumns, Set<String> jsonIndexColumns, Map<String, H3IndexConfig> h3IndexConfigs,
SegmentZKMetadata segmentZKMetadata, boolean offHeap, PinotDataBufferMemoryManager memoryManager,
RealtimeSegmentStatsHistory statsHistory, String partitionColumn, PartitionFunction partitionFunction,
int partitionId, boolean aggregateMetrics, boolean nullHandlingEnabled, String consumerDir,
UpsertConfig.Mode upsertMode, String upsertComparisonColumn,
Set<String> fstIndexColumns, Map<String, JsonIndexConfig> jsonIndexConfigs,
Map<String, H3IndexConfig> h3IndexConfigs, SegmentZKMetadata segmentZKMetadata, boolean offHeap,
PinotDataBufferMemoryManager memoryManager, RealtimeSegmentStatsHistory statsHistory, String partitionColumn,
PartitionFunction partitionFunction, int partitionId, boolean aggregateMetrics, boolean nullHandlingEnabled,
String consumerDir, UpsertConfig.Mode upsertMode, String upsertComparisonColumn,
PartitionUpsertMetadataManager partitionUpsertMetadataManager,
PartitionDedupMetadataManager partitionDedupMetadataManager, List<FieldConfig> fieldConfigList,
List<AggregationConfig> ingestionAggregationConfigs) {
Expand All @@ -92,7 +93,7 @@ private RealtimeSegmentConfig(String tableNameWithType, String segmentName, Stri
_invertedIndexColumns = invertedIndexColumns;
_textIndexColumns = textIndexColumns;
_fstIndexColumns = fstIndexColumns;
_jsonIndexColumns = jsonIndexColumns;
_jsonIndexConfigs = jsonIndexConfigs;
_h3IndexConfigs = h3IndexConfigs;
_segmentZKMetadata = segmentZKMetadata;
_offHeap = offHeap;
Expand Down Expand Up @@ -165,8 +166,8 @@ public Set<String> getFSTIndexColumns() {
return _fstIndexColumns;
}

public Set<String> getJsonIndexColumns() {
return _jsonIndexColumns;
public Map<String, JsonIndexConfig> getJsonIndexConfigs() {
return _jsonIndexConfigs;
}

public Map<String, H3IndexConfig> getH3IndexConfigs() {
Expand Down Expand Up @@ -254,7 +255,7 @@ public static class Builder {
private Set<String> _invertedIndexColumns;
private Set<String> _textIndexColumns = new HashSet<>();
private Set<String> _fstIndexColumns = new HashSet<>();
private Set<String> _jsonIndexColumns = new HashSet<>();
private Map<String, JsonIndexConfig> _jsonIndexConfigs = new HashMap<>();
private Map<String, H3IndexConfig> _h3IndexConfigs = new HashMap<>();
private SegmentZKMetadata _segmentZKMetadata;
private boolean _offHeap;
Expand Down Expand Up @@ -344,8 +345,8 @@ public Builder setFSTIndexColumns(Set<String> fstIndexColumns) {
return this;
}

public Builder setJsonIndexColumns(Set<String> jsonIndexColumns) {
_jsonIndexColumns = jsonIndexColumns;
public Builder setJsonIndexConfigs(Map<String, JsonIndexConfig> jsonIndexConfigs) {
_jsonIndexConfigs = jsonIndexConfigs;
return this;
}

Expand Down Expand Up @@ -437,11 +438,10 @@ public Builder setIngestionAggregationConfigs(List<AggregationConfig> ingestionA
public RealtimeSegmentConfig build() {
return new RealtimeSegmentConfig(_tableNameWithType, _segmentName, _streamName, _schema, _timeColumnName,
_capacity, _avgNumMultiValues, _noDictionaryColumns, _varLengthDictionaryColumns, _invertedIndexColumns,
_textIndexColumns, _fstIndexColumns, _jsonIndexColumns, _h3IndexConfigs, _segmentZKMetadata, _offHeap,
_textIndexColumns, _fstIndexColumns, _jsonIndexConfigs, _h3IndexConfigs, _segmentZKMetadata, _offHeap,
_memoryManager, _statsHistory, _partitionColumn, _partitionFunction, _partitionId, _aggregateMetrics,
_nullHandlingEnabled, _consumerDir, _upsertMode, _upsertComparisonColumn,
_partitionUpsertMetadataManager, _partitionDedupMetadataManager, _fieldConfigList,
_ingestionAggregationConfigs);
_nullHandlingEnabled, _consumerDir, _upsertMode, _upsertComparisonColumn, _partitionUpsertMetadataManager,
_partitionDedupMetadataManager, _fieldConfigList, _ingestionAggregationConfigs);
}
}
}
Expand Up @@ -37,6 +37,7 @@
import org.apache.pinot.segment.local.segment.creator.impl.inv.json.BaseJsonIndexCreator;
import org.apache.pinot.segment.spi.index.creator.JsonIndexCreator;
import org.apache.pinot.segment.spi.index.mutable.MutableJsonIndex;
import org.apache.pinot.spi.config.table.JsonIndexConfig;
import org.apache.pinot.spi.exception.BadQueryRequestException;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.sql.parsers.CalciteSqlParser;
Expand All @@ -49,6 +50,7 @@
* Json index for mutable segment.
*/
public class MutableJsonIndexImpl implements MutableJsonIndex {
private final JsonIndexConfig _jsonIndexConfig;
private final Map<String, RoaringBitmap> _postingListMap;
private final IntList _docIdMapping;
private final ReentrantReadWriteLock.ReadLock _readLock;
Expand All @@ -57,7 +59,8 @@ public class MutableJsonIndexImpl implements MutableJsonIndex {
private int _nextDocId;
private int _nextFlattenedDocId;

public MutableJsonIndexImpl() {
public MutableJsonIndexImpl(JsonIndexConfig jsonIndexConfig) {
_jsonIndexConfig = jsonIndexConfig;
_postingListMap = new HashMap<>();
_docIdMapping = new IntArrayList();

Expand All @@ -73,7 +76,8 @@ public MutableJsonIndexImpl() {
public void add(String jsonString)
throws IOException {
try {
List<Map<String, String>> flattenedRecords = JsonUtils.flatten(JsonUtils.stringToJsonNode(jsonString));
List<Map<String, String>> flattenedRecords =
JsonUtils.flatten(JsonUtils.stringToJsonNode(jsonString), _jsonIndexConfig);
_writeLock.lock();
try {
addFlattenedRecords(flattenedRecords);
Expand All @@ -90,8 +94,8 @@ public void add(String jsonString)
*/
private void addFlattenedRecords(List<Map<String, String>> records) {
int numRecords = records.size();
Preconditions
.checkState(_nextFlattenedDocId + numRecords >= 0, "Got more than %s flattened records", Integer.MAX_VALUE);
Preconditions.checkState(_nextFlattenedDocId + numRecords >= 0, "Got more than %s flattened records",
Integer.MAX_VALUE);
for (int i = 0; i < numRecords; i++) {
_docIdMapping.add(_nextDocId);
}
Expand Down Expand Up @@ -125,15 +129,15 @@ public MutableRoaringBitmap getMatchingDocIds(String filterString) {
// order to get the correct result, and it cannot be nested
RoaringBitmap matchingFlattenedDocIds = getMatchingFlattenedDocIds(filter.getPredicate());
MutableRoaringBitmap matchingDocIds = new MutableRoaringBitmap();
matchingFlattenedDocIds
.forEach((IntConsumer) flattenedDocId -> matchingDocIds.add(_docIdMapping.getInt(flattenedDocId)));
matchingFlattenedDocIds.forEach(
(IntConsumer) flattenedDocId -> matchingDocIds.add(_docIdMapping.getInt(flattenedDocId)));
matchingDocIds.flip(0, (long) _nextDocId);
return matchingDocIds;
} else {
RoaringBitmap matchingFlattenedDocIds = getMatchingFlattenedDocIds(filter);
MutableRoaringBitmap matchingDocIds = new MutableRoaringBitmap();
matchingFlattenedDocIds
.forEach((IntConsumer) flattenedDocId -> matchingDocIds.add(_docIdMapping.getInt(flattenedDocId)));
matchingFlattenedDocIds.forEach(
(IntConsumer) flattenedDocId -> matchingDocIds.add(_docIdMapping.getInt(flattenedDocId)));
return matchingDocIds;
}
} finally {
Expand Down Expand Up @@ -174,8 +178,8 @@ private RoaringBitmap getMatchingFlattenedDocIds(FilterContext filter) {
}
case PREDICATE: {
Predicate predicate = filter.getPredicate();
Preconditions
.checkArgument(!isExclusive(predicate.getType()), "Exclusive predicate: %s cannot be nested", predicate);
Preconditions.checkArgument(!isExclusive(predicate.getType()), "Exclusive predicate: %s cannot be nested",
predicate);
return getMatchingFlattenedDocIds(predicate);
}
default:
Expand All @@ -192,8 +196,7 @@ private RoaringBitmap getMatchingFlattenedDocIds(Predicate predicate) {
ExpressionContext lhs = predicate.getLhs();
Preconditions.checkArgument(lhs.getType() == ExpressionContext.Type.IDENTIFIER,
"Left-hand side of the predicate must be an identifier, got: %s (%s). Put double quotes around the identifier"
+ " if needed.",
lhs, lhs.getType());
+ " if needed.", lhs, lhs.getType());
String key = lhs.getIdentifier();

// Support 2 formats:
Expand Down
Expand Up @@ -122,8 +122,10 @@ public JsonIndexCreator newJsonIndexCreator(IndexCreationContext.Json context)
"Json index is currently only supported on single-value columns");
Preconditions.checkState(context.getFieldSpec().getDataType().getStoredType() == FieldSpec.DataType.STRING,
"Json index is currently only supported on STRING columns");
return context.isOnHeap() ? new OnHeapJsonIndexCreator(context.getIndexDir(), context.getFieldSpec().getName())
: new OffHeapJsonIndexCreator(context.getIndexDir(), context.getFieldSpec().getName());
return context.isOnHeap() ? new OnHeapJsonIndexCreator(context.getIndexDir(), context.getFieldSpec().getName(),
context.getJsonIndexConfig())
: new OffHeapJsonIndexCreator(context.getIndexDir(), context.getFieldSpec().getName(),
context.getJsonIndexConfig());
}

@Override
Expand Down
Expand Up @@ -60,6 +60,7 @@
import org.apache.pinot.spi.config.table.FSTType;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.JsonIndexConfig;
import org.apache.pinot.spi.config.table.SegmentZKPropsConfig;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.DateTimeFormatSpec;
Expand Down Expand Up @@ -167,11 +168,10 @@ public void init(SegmentGeneratorConfig segmentCreationSpec, SegmentIndexCreatio
fstIndexColumns.add(columnName);
}

Set<String> jsonIndexColumns = new HashSet<>();
for (String columnName : _config.getJsonIndexCreationColumns()) {
Map<String, JsonIndexConfig> jsonIndexConfigs = _config.getJsonIndexConfigs();
for (String columnName : jsonIndexConfigs.keySet()) {
Preconditions.checkState(schema.hasColumn(columnName),
"Cannot create text index for column: %s because it is not in schema", columnName);
jsonIndexColumns.add(columnName);
"Cannot create json index for column: %s because it is not in schema", columnName);
}

Map<String, H3IndexConfig> h3IndexConfigs = _config.getH3IndexConfigs();
Expand Down Expand Up @@ -278,8 +278,10 @@ public void init(SegmentGeneratorConfig segmentCreationSpec, SegmentIndexCreatio
(String[]) columnIndexCreationInfo.getSortedUniqueElementsArray())));
}

if (jsonIndexColumns.contains(columnName)) {
_jsonIndexCreatorMap.put(columnName, _indexCreatorProvider.newJsonIndexCreator(context.forJsonIndex()));
JsonIndexConfig jsonIndexConfig = jsonIndexConfigs.get(columnName);
if (jsonIndexConfig != null) {
_jsonIndexCreatorMap.put(columnName,
_indexCreatorProvider.newJsonIndexCreator(context.forJsonIndex(jsonIndexConfig)));
}

H3IndexConfig h3IndexConfig = h3IndexConfigs.get(columnName);
Expand Down
Expand Up @@ -34,6 +34,7 @@
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.index.creator.JsonIndexCreator;
import org.apache.pinot.segment.spi.memory.CleanerUtil;
import org.apache.pinot.spi.config.table.JsonIndexConfig;
import org.apache.pinot.spi.utils.JsonUtils;
import org.roaringbitmap.Container;
import org.roaringbitmap.RoaringBitmap;
Expand Down Expand Up @@ -61,20 +62,21 @@ public abstract class BaseJsonIndexCreator implements JsonIndexCreator {
static final String DICTIONARY_FILE_NAME = "dictionary.buf";
static final String INVERTED_INDEX_FILE_NAME = "inverted.index.buf";

final JsonIndexConfig _jsonIndexConfig;
final File _indexFile;
final File _tempDir;
final File _dictionaryFile;
final File _invertedIndexFile;
final IntList _numFlattenedRecordsList = new IntArrayList();
final Map<String, RoaringBitmapWriter<RoaringBitmap>> _postingListMap = new TreeMap<>();
final RoaringBitmapWriter.Wizard<Container, RoaringBitmap> _bitmapWriterWizard =
RoaringBitmapWriter.writer();
final RoaringBitmapWriter.Wizard<Container, RoaringBitmap> _bitmapWriterWizard = RoaringBitmapWriter.writer();

int _nextFlattenedDocId;
int _maxValueLength;

BaseJsonIndexCreator(File indexDir, String columnName)
BaseJsonIndexCreator(File indexDir, String columnName, JsonIndexConfig jsonIndexConfig)
throws IOException {
_jsonIndexConfig = jsonIndexConfig;
_indexFile = new File(indexDir, columnName + V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION);
_tempDir = new File(indexDir, columnName + TEMP_DIR_SUFFIX);
if (_tempDir.exists()) {
Expand All @@ -89,7 +91,7 @@ public abstract class BaseJsonIndexCreator implements JsonIndexCreator {
@Override
public void add(String jsonString)
throws IOException {
addFlattenedRecords(JsonUtils.flatten(JsonUtils.stringToJsonNode(jsonString)));
addFlattenedRecords(JsonUtils.flatten(JsonUtils.stringToJsonNode(jsonString), _jsonIndexConfig));
}

/**
Expand All @@ -98,8 +100,8 @@ public void add(String jsonString)
void addFlattenedRecords(List<Map<String, String>> records)
throws IOException {
int numRecords = records.size();
Preconditions
.checkState(_nextFlattenedDocId + numRecords >= 0, "Got more than %s flattened records", Integer.MAX_VALUE);
Preconditions.checkState(_nextFlattenedDocId + numRecords >= 0, "Got more than %s flattened records",
Integer.MAX_VALUE);
_numFlattenedRecordsList.add(numRecords);
for (Map<String, String> record : records) {
for (Map.Entry<String, String> entry : record.entrySet()) {
Expand Down