Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support STRING and BYTES for no dictionary columns in realtime consuming segments #4791

Merged
merged 1 commit into from Jan 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -37,6 +37,7 @@
import org.apache.pinot.core.io.readerwriter.PinotDataBufferMemoryManager;
import org.apache.pinot.core.io.readerwriter.impl.FixedByteSingleColumnMultiValueReaderWriter;
import org.apache.pinot.core.io.readerwriter.impl.FixedByteSingleColumnSingleValueReaderWriter;
import org.apache.pinot.core.io.readerwriter.impl.VarByteSingleColumnSingleValueReaderWriter;
import org.apache.pinot.core.realtime.impl.RealtimeSegmentConfig;
import org.apache.pinot.core.realtime.impl.RealtimeSegmentStatsHistory;
import org.apache.pinot.core.realtime.impl.dictionary.BaseMutableDictionary;
Expand Down Expand Up @@ -76,6 +77,9 @@ public class MutableSegmentImpl implements MutableSegment {
private static final int MIN_ROWS_TO_INDEX = 1000_000; // Min size of recordIdMap for updatable metrics.
private static final int MIN_RECORD_ID_MAP_CACHE_SIZE = 10000; // Min overflow map size for updatable metrics.

private static final int NODICT_VARIABLE_WIDTH_ESTIMATED_AVERAGE_VALUE_LENGTH_DEFAULT = 100;
private static final int NODICT_VARIABLE_WIDTH_ESTIMATED_NUMBER_OF_VALUES_DEFAULT = 100_000;

private final Logger _logger;
private final long _startTimeMillis = System.currentTimeMillis();

Expand Down Expand Up @@ -183,15 +187,22 @@ public long getLatestIngestionTimestamp() {
_maxNumValuesMap.put(column, 0);

// Check whether to generate raw index for the column while consuming
// Only support generating raw index on single-value non-string columns that do not have inverted index while
// Only support generating raw index on single-value columns that do not have inverted index while
// consuming. After consumption completes and the segment is built, all single-value columns can have raw index
FieldSpec.DataType dataType = fieldSpec.getDataType();
int indexColumnSize = FieldSpec.DataType.INT.size();
if (noDictionaryColumns.contains(column) && fieldSpec.isSingleValueField()
&& dataType != FieldSpec.DataType.STRING && !invertedIndexColumns.contains(column)) {
// No dictionary
indexColumnSize = dataType.size();
boolean isFixedWidthColumn = dataType.isFixedWidth();
int forwardIndexColumnSize = -1;
if (isNoDictionaryColumn(noDictionaryColumns, invertedIndexColumns, fieldSpec, column)) {
// no dictionary
// each forward index entry will be equal to size of data for that row
// For INT, LONG, FLOAT, DOUBLE it is equal to the number of fixed bytes used to store the value,
if (isFixedWidthColumn) {
forwardIndexColumnSize = dataType.size();
}
} else {
// dictionary encoded index
// each forward index entry will contain a 4 byte dictionary ID
forwardIndexColumnSize = FieldSpec.DataType.INT.size();
int dictionaryColumnSize;
if (dataType == FieldSpec.DataType.STRING) {
dictionaryColumnSize = _statsHistory.getEstimatedAvgColSize(column);
Expand All @@ -211,19 +222,41 @@ public long getLatestIngestionTimestamp() {
}

DataFileReader indexReaderWriter;
if (fieldSpec.isSingleValueField()) {
String allocationContext =
buildAllocationContext(_segmentName, column, V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION);
indexReaderWriter = new FixedByteSingleColumnSingleValueReaderWriter(_capacity, indexColumnSize, _memoryManager,
allocationContext);

// create forward index reader/writer
if (forwardIndexColumnSize == -1) {
// for STRING/BYTES SV column, we support raw index in consuming segments
// RealtimeSegmentStatsHistory does not have the stats for no-dictionary columns
// from previous consuming segments
// TODO: Add support for updating RealtimeSegmentStatsHistory with average column value size for no dictionary columns as well
// TODO: Use the stats to get estimated average length
// Use a smaller capacity as opposed to segment flush size
int initialCapacity = Math.min(_capacity, NODICT_VARIABLE_WIDTH_ESTIMATED_NUMBER_OF_VALUES_DEFAULT);
String allocationContext = buildAllocationContext(_segmentName, column, V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION);
indexReaderWriter = new VarByteSingleColumnSingleValueReaderWriter(_memoryManager, allocationContext, initialCapacity, NODICT_VARIABLE_WIDTH_ESTIMATED_AVERAGE_VALUE_LENGTH_DEFAULT);
} else {
// TODO: Start with a smaller capacity on FixedByteSingleColumnMultiValueReaderWriter and let it expand
String allocationContext =
buildAllocationContext(_segmentName, column, V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION);
indexReaderWriter =
new FixedByteSingleColumnMultiValueReaderWriter(MAX_MULTI_VALUES_PER_ROW, avgNumMultiValues, _capacity,
indexColumnSize, _memoryManager, allocationContext);
// two possible cases can lead here:
// (1) dictionary encoded forward index
// (2) raw forward index for fixed width types -- INT, LONG, FLOAT, DOUBLE
if (fieldSpec.isSingleValueField()) {
// SV column -- both dictionary encoded and raw index are supported on SV
// columns for both fixed and variable width types
String allocationContext =
buildAllocationContext(_segmentName, column, V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION);
indexReaderWriter = new FixedByteSingleColumnSingleValueReaderWriter(_capacity, forwardIndexColumnSize, _memoryManager,
allocationContext);
} else {
// MV column -- only dictionary encoded index is supported on MV columns
// for both fixed and variable width types
// TODO: Start with a smaller capacity on FixedByteSingleColumnMultiValueReaderWriter and let it expand
mcvsubbu marked this conversation as resolved.
Show resolved Hide resolved
String allocationContext =
buildAllocationContext(_segmentName, column, V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION);
indexReaderWriter =
new FixedByteSingleColumnMultiValueReaderWriter(MAX_MULTI_VALUES_PER_ROW, avgNumMultiValues, _capacity,
forwardIndexColumnSize, _memoryManager, allocationContext);
}
}

_indexReaderWriterMap.put(column, indexReaderWriter);

if (invertedIndexColumns.contains(column)) {
Expand All @@ -240,6 +273,20 @@ public long getLatestIngestionTimestamp() {
_recordIdMap = enableMetricsAggregationIfPossible(config, noDictionaryColumns);
}

/**
* Decide whether a given column should be dictionary encoded or not
* @param noDictionaryColumns no dictionary column set
* @param invertedIndexColumns inverted index column set
* @param fieldSpec field spec of column
* @param column column name
* @return true if column is no-dictionary, false if dictionary encoded
*/
private boolean isNoDictionaryColumn(Set<String> noDictionaryColumns, Set<String> invertedIndexColumns,
FieldSpec fieldSpec, String column) {
return noDictionaryColumns.contains(column) && fieldSpec.isSingleValueField()
&& !invertedIndexColumns.contains(column);
}

public SegmentPartitionConfig getSegmentPartitionConfig() {
return _segmentPartitionConfig;
}
Expand Down Expand Up @@ -336,14 +383,15 @@ private void addForwardIndex(GenericRow row, int docId, Map<String, Object> dict
String column = fieldSpec.getName();
Object value = row.getValue(column);
if (fieldSpec.isSingleValueField()) {
// SV column
FixedByteSingleColumnSingleValueReaderWriter indexReaderWriter =
(FixedByteSingleColumnSingleValueReaderWriter) _indexReaderWriterMap.get(column);
Integer dictId = (Integer) dictIdMap.get(column);
if (dictId != null) {
// Column with dictionary
// SV column with dictionary
indexReaderWriter.setInt(docId, dictId);
} else {
// No-dictionary column
// No-dictionary SV column
FieldSpec.DataType dataType = fieldSpec.getDataType();
switch (dataType) {
case INT:
Expand All @@ -358,6 +406,12 @@ private void addForwardIndex(GenericRow row, int docId, Map<String, Object> dict
case DOUBLE:
indexReaderWriter.setDouble(docId, (Double) value);
break;
case STRING:
indexReaderWriter.setString(docId, (String) value);
break;
case BYTES:
indexReaderWriter.setBytes(docId, (byte[]) value);
break;
default:
throw new UnsupportedOperationException(
"Unsupported data type: " + dataType + " for no-dictionary column: " + column);
Expand Down
Expand Up @@ -92,7 +92,7 @@ public byte[] getBytes(int row) {

@Override
public byte[] getBytes(int row, ReaderContext context) {
throw new UnsupportedOperationException();
return getBytes(row);
}

@Override
Expand Down Expand Up @@ -144,4 +144,12 @@ public void setString(int row, String string) {
public void setBytes(int row, byte[] bytes) {
throw new UnsupportedOperationException();
}

public int getLengthOfShortestElement() {
throw new UnsupportedOperationException();
}

public int getLengthOfLongestElement() {
throw new UnsupportedOperationException();
}
}
Expand Up @@ -71,6 +71,16 @@ public FixedByteSingleColumnSingleValueReaderWriter(int numRowsPerChunk, int col
addBuffer();
}

@Override
public int getLengthOfShortestElement() {
return _columnSizesInBytes;
}

@Override
public int getLengthOfLongestElement() {
return _columnSizesInBytes;
}

@Override
public void close()
throws IOException {
Expand Down
@@ -0,0 +1,120 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.core.io.readerwriter.impl;

import java.io.IOException;
import org.apache.pinot.common.utils.StringUtil;
import org.apache.pinot.core.io.readerwriter.BaseSingleColumnSingleValueReaderWriter;
import org.apache.pinot.core.io.readerwriter.PinotDataBufferMemoryManager;
import org.apache.pinot.core.io.writer.impl.MutableOffHeapByteArrayStore;

public class VarByteSingleColumnSingleValueReaderWriter extends BaseSingleColumnSingleValueReaderWriter {
private final MutableOffHeapByteArrayStore _byteArrayStore;
private int _lengthOfShortestElement;
private int _lengthOfLongestElement;

public VarByteSingleColumnSingleValueReaderWriter(
PinotDataBufferMemoryManager memoryManager,
String allocationContext,
int estimatedMaxNumberOfValues,
int estimatedAverageStringLength) {
_byteArrayStore = new MutableOffHeapByteArrayStore(memoryManager, allocationContext, estimatedMaxNumberOfValues, estimatedAverageStringLength);
mcvsubbu marked this conversation as resolved.
Show resolved Hide resolved
_lengthOfShortestElement = Integer.MAX_VALUE;
_lengthOfLongestElement = Integer.MIN_VALUE;
}

@Override
public int getLengthOfShortestElement() {
return _lengthOfShortestElement;
}

@Override
public int getLengthOfLongestElement() {
return _lengthOfLongestElement;
}

@Override
public void close()
throws IOException {
_byteArrayStore.close();
}

@Override
public void setInt(int row, int i) {
throw new UnsupportedOperationException();
}

@Override
public int getInt(int row) {
throw new UnsupportedOperationException();
}

@Override
public void setLong(int row, long l) {
throw new UnsupportedOperationException();
}

@Override
public long getLong(int row) {
throw new UnsupportedOperationException();
}

@Override
public void setFloat(int row, float f) {
throw new UnsupportedOperationException();
}

@Override
public float getFloat(int row) {
throw new UnsupportedOperationException();
}

@Override
public void setDouble(int row, double d) {
throw new UnsupportedOperationException();
}

@Override
public double getDouble(int row) {
throw new UnsupportedOperationException();
}

@Override
public void setString(int row, String val) {
byte[] serializedValue = StringUtil.encodeUtf8(val);
setBytes(row, serializedValue);
}

@Override
public String getString(int row) {
return StringUtil.decodeUtf8(_byteArrayStore.get(row));
}

@Override
public void setBytes(int row, byte[] value) {
_byteArrayStore.add(value);
_lengthOfLongestElement = Math.max(_lengthOfLongestElement, value.length);
_lengthOfShortestElement = Math.min(_lengthOfShortestElement, value.length);
}

@Override
public byte[] getBytes(int row) {
return _byteArrayStore.get(row);
}
}
Expand Up @@ -22,6 +22,7 @@
import org.apache.pinot.core.common.Block;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.data.partition.PartitionFunction;
import org.apache.pinot.core.io.readerwriter.BaseSingleColumnSingleValueReaderWriter;
import org.apache.pinot.core.segment.creator.ColumnStatistics;
import org.apache.pinot.core.segment.index.data.source.ColumnDataSource;

Expand All @@ -30,12 +31,15 @@

public class RealtimeNoDictionaryColStatistics implements ColumnStatistics {

final BaseSingleColumnSingleValueReaderWriter _forwardIndex;
final BlockValSet _blockValSet;
final int _numDocIds;
final String _operatorName;

public RealtimeNoDictionaryColStatistics(ColumnDataSource dataSource) {
_operatorName = dataSource.getOperatorName();
// no-dictionary is only supported for SV columns
_forwardIndex = (BaseSingleColumnSingleValueReaderWriter)dataSource.getForwardIndex();
Block block = dataSource.nextBlock();
_numDocIds = block.getMetadata().getEndDocId() + 1;
_blockValSet = block.getBlockValueSet();
Expand Down Expand Up @@ -63,12 +67,12 @@ public int getCardinality() {

@Override
public int getLengthOfShortestElement() {
return lengthOfDataType(); // Only fixed length data types supported.
return _forwardIndex.getLengthOfShortestElement();
}

@Override
public int getLengthOfLargestElement() {
return lengthOfDataType(); // Only fixed length data types supported.
return _forwardIndex.getLengthOfLongestElement();
}

@Override
Expand Down Expand Up @@ -105,19 +109,4 @@ public int getNumPartitions() {
public Set<Integer> getPartitions() {
return null;
}

private int lengthOfDataType() {
switch (_blockValSet.getValueType()) {
case INT:
return Integer.BYTES;
case LONG:
return Long.BYTES;
case FLOAT:
return Float.BYTES;
case DOUBLE:
return Double.BYTES;
default:
throw new UnsupportedOperationException();
}
}
}
Expand Up @@ -190,4 +190,8 @@ protected Block getNextBlock() {
public String getOperatorName() {
return _operatorName;
}

public DataFileReader getForwardIndex() {
return _forwardIndex;
}
}