Two changes: (#5470)
(1) PR #5256 added support for deriving the number of docs per chunk
for the var-byte raw index creator from the column length. This was
done specifically to support text blobs. High-QPS use cases that don't
want this feature see a negative impact, because the chunk size grows
(numDocsPerChunk was previously hardcoded to 1000) and, depending on
the access pattern, we may end up decompressing a bigger chunk to read
the values for a set of docIds. This commit makes the derivation
configurable; the default behaviour is the same as before (1000 docs per chunk).
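
For illustration, here is a minimal self-contained sketch of the new opt-in,
mirroring the SegmentColumnarIndexCreator.shouldDeriveNumDocsPerChunk helper
added in this commit (the class wrapper, column names, and main method are
illustrative, not part of the commit):

import java.util.HashMap;
import java.util.Map;

public class DeriveNumDocsPerChunkSketch {
  // Property key added to FieldConfig in this commit.
  private static final String DERIVE_NUM_DOCS_PER_CHUNK_RAW_INDEX_KEY = "derive.num.docs.per.chunk.raw.index";

  // Mirrors the shouldDeriveNumDocsPerChunk helper in the diff below: a column
  // opts in only if its fieldConfigList properties set the key to "true".
  public static boolean shouldDeriveNumDocsPerChunk(String columnName,
      Map<String, Map<String, String>> columnProperties) {
    if (columnProperties != null) {
      Map<String, String> properties = columnProperties.get(columnName);
      return properties != null && Boolean.parseBoolean(properties.get(DERIVE_NUM_DOCS_PER_CHUNK_RAW_INDEX_KEY));
    }
    return false;
  }

  public static void main(String[] args) {
    // Per-column properties, as SegmentGeneratorConfig now assembles them from fieldConfigList.
    Map<String, Map<String, String>> columnProperties = new HashMap<>();
    Map<String, String> blobColumnProperties = new HashMap<>();
    blobColumnProperties.put(DERIVE_NUM_DOCS_PER_CHUNK_RAW_INDEX_KEY, "true");
    columnProperties.put("blobCol", blobColumnProperties);

    // The text-blob column opts in to the derived chunk size; every other
    // column keeps the old default of 1000 docs per chunk.
    System.out.println(shouldDeriveNumDocsPerChunk("blobCol", columnProperties));  // true
    System.out.println(shouldDeriveNumDocsPerChunk("otherCol", columnProperties)); // false
  }
}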

(2) PR #4791 added support for noDict on STRING/BYTES columns in
consuming segments. This change particularly impacts use cases that set
noDict on their STRING dimension columns for other performance reasons
and also want metrics aggregation: they no longer get aggregateMetrics,
because the new implementation honors their table config setting of
noDict on STRING/BYTES. Without metrics aggregation, memory pressure
increases. So to continue aggregating metrics for such cases, we now
create a dictionary even if the column is part of the noDictionary set
in the table config.
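
For illustration, a minimal sketch of the resulting decision in the
consuming-segment code (the standalone method, String-typed dataType parameter,
and sample values are illustrative simplifications of the isNoDictionaryColumn
change in the diff below):

import java.util.Collections;
import java.util.Set;

public class NoDictionaryOverrideSketch {
  // Even if a STRING/BYTES dimension column is listed in noDictionaryColumns,
  // keep the dictionary when metrics aggregation is enabled, so aggregation
  // can continue.
  static boolean isNoDictionaryColumn(Set<String> noDictionaryColumns, Set<String> invertedIndexColumns,
      boolean isDimension, boolean isSingleValue, String dataType, boolean aggregateMetrics, String column) {
    if (!noDictionaryColumns.contains(column)) {
      return false; // not in the noDictionary set, so create a dictionary
    }
    if (isDimension && aggregateMetrics && (dataType.equals("STRING") || dataType.equals("BYTES"))) {
      return false; // override noDict: a dictionary is required for metrics aggregation
    }
    // Honor noDict for single-value columns without an inverted index.
    return isSingleValue && !invertedIndexColumns.contains(column);
  }

  public static void main(String[] args) {
    Set<String> noDict = Collections.singleton("country");
    Set<String> invIdx = Collections.emptySet();
    // With aggregateMetrics enabled, the STRING dimension stays dictionary-encoded
    // despite being in noDictionaryColumns...
    System.out.println(isNoDictionaryColumn(noDict, invIdx, true, true, "STRING", true, "country"));  // false
    // ...and goes raw (no dictionary) once aggregateMetrics is off.
    System.out.println(isNoDictionaryColumn(noDict, invIdx, true, true, "STRING", false, "country")); // true
  }
}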

Co-authored-by: Siddharth Teotia <steotia@steotia-mn1.linkedin.biz>
siddharthteotia and Siddharth Teotia committed May 31, 2020
1 parent de97edc commit ee21e79
Showing 6 changed files with 72 additions and 8 deletions.
@@ -102,6 +102,9 @@ public enum TimeColumnType {
private boolean _skipTimeValueCheck = false;
private boolean _nullHandlingEnabled = false;

// constructed from FieldConfig
private Map<String, Map<String, String>> _columnProperties = new HashMap<>();

@Deprecated
public SegmentGeneratorConfig() {
}
@@ -174,12 +177,24 @@ public SegmentGeneratorConfig(TableConfig tableConfig, Schema schema) {
_invertedIndexCreationColumns = indexingConfig.getInvertedIndexColumns();
}

List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList();
if (fieldConfigList != null) {
for (FieldConfig fieldConfig : fieldConfigList) {
_columnProperties.put(fieldConfig.getName(), fieldConfig.getProperties());
}
}

extractTextIndexColumnsFromTableConfig(tableConfig);

_nullHandlingEnabled = indexingConfig.isNullHandlingEnabled();
}
}

@Nonnull
public Map<String, Map<String, String>> getColumnProperties() {
return _columnProperties;
}

/**
* Set time column details using the given time column
*/
@@ -219,7 +219,7 @@ public long getLatestIngestionTimestamp() {
FieldSpec.DataType dataType = fieldSpec.getDataType();
boolean isFixedWidthColumn = dataType.isFixedWidth();
int forwardIndexColumnSize = -1;
if (isNoDictionaryColumn(noDictionaryColumns, invertedIndexColumns, textIndexColumns, fieldSpec, column)) {
if (isNoDictionaryColumn(noDictionaryColumns, invertedIndexColumns, fieldSpec, column)) {
// no dictionary
// each forward index entry will be equal to size of data for that row
// For INT, LONG, FLOAT, DOUBLE it is equal to the number of fixed bytes used to store the value,
@@ -329,9 +329,30 @@ public long getLatestIngestionTimestamp() {
* @return true if column is no-dictionary, false if dictionary encoded
*/
private boolean isNoDictionaryColumn(Set<String> noDictionaryColumns, Set<String> invertedIndexColumns,
Set<String> textIndexColumns, FieldSpec fieldSpec, String column) {
return textIndexColumns.contains(column) || (noDictionaryColumns.contains(column) && fieldSpec.isSingleValueField()
&& !invertedIndexColumns.contains(column));
FieldSpec fieldSpec, String column) {
FieldSpec.DataType dataType = fieldSpec.getDataType();
if (noDictionaryColumns.contains(column)) {
// Earlier, noDict was not supported in consuming segments for STRING and BYTES columns,
// so even if the user had the column in the noDictionaryColumns set of the table config,
// we still created a dictionary in consuming segments.
// PR #4791 added that support. This particularly impacts use cases that set noDict on
// their STRING dimension columns for other performance reasons and also want metrics
// aggregation: they no longer get to aggregateMetrics because the new implementation
// honors their table config setting of noDict on STRING/BYTES, and without metrics
// aggregation memory pressure increases. So to continue aggregating metrics for such
// cases, we create a dictionary even if the column is part of the noDictionary set
// from the table config.
if (fieldSpec instanceof DimensionFieldSpec && _aggregateMetrics && (dataType == FieldSpec.DataType.STRING ||
dataType == FieldSpec.DataType.BYTES)) {
_logger.info("Not creating dictionary in consuming segment for column {} of type {}", column, dataType.toString());
return false;
}
// Otherwise, don't create a dictionary if the column is a member of the noDictionary set,
// is single-valued, and doesn't have an inverted index
return fieldSpec.isSingleValueField() && !invertedIndexColumns.contains(column);
}
// column is not a part of noDictionary set, so create dictionary
return false;
}

public SegmentPartitionConfig getSegmentPartitionConfig() {
@@ -53,6 +53,7 @@
import org.apache.pinot.core.segment.creator.impl.inv.OnHeapBitmapInvertedIndexCreator;
import org.apache.pinot.core.segment.creator.impl.inv.text.LuceneTextIndexCreator;
import org.apache.pinot.core.segment.creator.impl.nullvalue.NullValueVectorCreator;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.FieldSpec.FieldType;
@@ -193,9 +194,10 @@ public void init(SegmentGeneratorConfig segmentCreationSpec, SegmentIndexCreatio
getColumnCompressionType(segmentCreationSpec, fieldSpec);

// Initialize forward index creator
boolean deriveNumDocsPerChunk = shouldDeriveNumDocsPerChunk(columnName, segmentCreationSpec.getColumnProperties());
_forwardIndexCreatorMap.put(columnName,
getRawIndexCreatorForColumn(_indexDir, compressionType, columnName, fieldSpec.getDataType(), totalDocs,
indexCreationInfo.getLengthOfLongestEntry()));
indexCreationInfo.getLengthOfLongestEntry(), deriveNumDocsPerChunk));

// Initialize text index creator
if (_textIndexColumns.contains(columnName)) {
@@ -213,6 +215,14 @@ }
}
}

public static boolean shouldDeriveNumDocsPerChunk(String columnName, Map<String, Map<String, String>> columnProperties) {
if (columnProperties != null) {
Map<String, String> properties = columnProperties.get(columnName);
return properties != null && Boolean.parseBoolean(properties.get(FieldConfig.DERIVE_NUM_DOCS_PER_CHUNK_RAW_INDEX_KEY));
}
return false;
}

/**
* Helper method that returns compression type to use based on segment creation spec and field type.
* <ul>
@@ -539,6 +549,13 @@ public static SingleValueRawIndexCreator getRawIndexCreatorForColumn(File file,
ChunkCompressorFactory.CompressionType compressionType, String column, FieldSpec.DataType dataType, int totalDocs,
int lengthOfLongestEntry)
throws IOException {
return getRawIndexCreatorForColumn(file, compressionType, column, dataType, totalDocs, lengthOfLongestEntry, false);
}

public static SingleValueRawIndexCreator getRawIndexCreatorForColumn(File file,
ChunkCompressorFactory.CompressionType compressionType, String column, FieldSpec.DataType dataType, int totalDocs,
int lengthOfLongestEntry, boolean deriveNumDocsPerChunk)
throws IOException {

SingleValueRawIndexCreator indexCreator;
switch (dataType) {
@@ -561,7 +578,8 @@ public static SingleValueRawIndexCreator getRawIndexCreatorForColumn(File file,
case STRING:
case BYTES:
indexCreator =
new SingleValueVarByteRawIndexCreator(file, compressionType, column, totalDocs, lengthOfLongestEntry);
new SingleValueVarByteRawIndexCreator(file, compressionType, column, totalDocs, lengthOfLongestEntry,
deriveNumDocsPerChunk);
break;

default:
@@ -28,15 +28,23 @@


public class SingleValueVarByteRawIndexCreator extends BaseSingleValueRawIndexCreator {
private static final int DEFAULT_NUM_DOCS_PER_CHUNK = 1000;
private static final int TARGET_MAX_CHUNK_SIZE = 1024 * 1024;

private final VarByteChunkSingleValueWriter _indexWriter;

public SingleValueVarByteRawIndexCreator(File baseIndexDir, ChunkCompressorFactory.CompressionType compressionType,
String column, int totalDocs, int maxLength)
throws IOException {
this(baseIndexDir, compressionType, column, totalDocs, maxLength, false);
}

public SingleValueVarByteRawIndexCreator(File baseIndexDir, ChunkCompressorFactory.CompressionType compressionType,
String column, int totalDocs, int maxLength, boolean deriveNumDocsPerChunk)
throws IOException {
File file = new File(baseIndexDir, column + V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
_indexWriter = new VarByteChunkSingleValueWriter(file, compressionType, totalDocs, getNumDocsPerChunk(maxLength), maxLength);
int numDocsPerChunk = deriveNumDocsPerChunk ? getNumDocsPerChunk(maxLength) : DEFAULT_NUM_DOCS_PER_CHUNK;
_indexWriter = new VarByteChunkSingleValueWriter(file, compressionType, totalDocs, numDocsPerChunk, maxLength);
}

@VisibleForTesting
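
The @VisibleForTesting method cut off above is getNumDocsPerChunk. Its body is
not visible in this view, so the following is only a plausible sketch of a
derivation consistent with TARGET_MAX_CHUNK_SIZE; the per-row offset overhead
and the exact formula are assumptions, not the committed code:

public class NumDocsPerChunkSketch {
  private static final int TARGET_MAX_CHUNK_SIZE = 1024 * 1024; // from the diff above
  // Assumed overhead: a 4-byte offset stored per row in the chunk header.
  private static final int CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE = Integer.BYTES;

  static int getNumDocsPerChunk(int lengthOfLongestEntry) {
    int bytesPerDoc = lengthOfLongestEntry + CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
    // Fit as many docs as keep the chunk near the 1MB target, but never fewer
    // than one doc per chunk, even for entries larger than the target itself.
    return Math.max(TARGET_MAX_CHUNK_SIZE / bytesPerDoc, 1);
  }

  public static void main(String[] args) {
    System.out.println(getNumDocsPerChunk(100));       // 10082: short strings pack ~1MB chunks
    System.out.println(getNumDocsPerChunk(2_000_000)); // 1: a multi-MB text blob gets its own chunk
  }
}

Compared to the old hardcoded 1000 docs per chunk, such a derivation keeps
chunks near 1MB: far smaller than 1000 multi-MB blobs, but larger than the
roughly 100KB chunks that 1000 short entries used to produce, which is exactly
the high-QPS regression the new derive.num.docs.per.chunk.raw.index flag
guards against.
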
@@ -362,9 +362,10 @@ void createV1ForwardIndexForTextIndex(String column, IndexLoadingConfig indexLoa
int lengthOfLongestEntry = StringUtil.encodeUtf8(stringDefaultValue).length;
int dictionaryElementSize = 0;

boolean deriveNumDocsPerChunk = SegmentColumnarIndexCreator.shouldDeriveNumDocsPerChunk(column, indexLoadingConfig.getColumnProperties());
SingleValueVarByteRawIndexCreator rawIndexCreator =
new SingleValueVarByteRawIndexCreator(_indexDir, ChunkCompressorFactory.CompressionType.SNAPPY, column,
totalDocs, lengthOfLongestEntry);
totalDocs, lengthOfLongestEntry, deriveNumDocsPerChunk);

for (int docId = 0; docId < totalDocs; docId++) {
rawIndexCreator.index(docId, defaultValue);
@@ -36,6 +36,7 @@ public class FieldConfig extends BaseJsonConfig {
public static String BLOOM_FILTER_COLUMN_KEY = "bloom.filter";
public static String ON_HEAP_DICTIONARY_COLUMN_KEY = "onheap.dictionary";
public static String VAR_LENGTH_DICTIONARY_COLUMN_KEY = "var.length.dictionary";
public static String DERIVE_NUM_DOCS_PER_CHUNK_RAW_INDEX_KEY = "derive.num.docs.per.chunk.raw.index";

public static String TEXT_INDEX_REALTIME_READER_REFRESH_KEY = "text.index.realtime.reader.refresh";
// Lucene creates a query result cache if this option is enabled
