Derive num docs per chunk from max column value length for varbyte raw index creator (#5256)

* Derive numDocsPerChunk from max column value length for var byte raw forward index creator

* Address review comments

Co-authored-by: Siddharth Teotia <steotia@steotia-mn1.linkedin.biz>
siddharthteotia and Siddharth Teotia authored Apr 17, 2020
1 parent d68ef7b commit 142a86f
Showing 6 changed files with 85 additions and 10 deletions.
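The heart of the change is how `SingleValueVarByteRawIndexCreator` now picks `numDocsPerChunk`. Below is a minimal, self-contained sketch of that arithmetic (the class and `main` driver are illustrative only; the constants and formula mirror the ones added in the diff that follows): each doc costs one 4-byte row offset in the chunk header plus up to `lengthOfLongestEntry` bytes of data, and the doc count per chunk is chosen so the worst-case chunk stays near a 1 MiB target, with a floor of one doc per chunk.

```java
// Illustrative sketch only: mirrors the sizing formula introduced in this commit.
public class ChunkSizingSketch {
  // One 4-byte row offset per doc is stored in the chunk header.
  static final int CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE = Integer.BYTES;
  // Worst-case chunk size the creator now targets (1 MiB).
  static final int TARGET_MAX_CHUNK_SIZE = 1024 * 1024;

  // Same formula as SingleValueVarByteRawIndexCreator.getNumDocsPerChunk(...).
  static int getNumDocsPerChunk(int lengthOfLongestEntry) {
    int overheadPerEntry = lengthOfLongestEntry + CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
    return Math.max(TARGET_MAX_CHUNK_SIZE / overheadPerEntry, 1);
  }

  // Worst-case chunk size for that choice, matching the chunkSize passed to the writer.
  static long maxChunkSize(int numDocsPerChunk, int lengthOfLongestEntry) {
    return (long) numDocsPerChunk * (CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE + lengthOfLongestEntry);
  }

  public static void main(String[] args) {
    for (int len : new int[]{10, 1_000, 100_000, 2_000_000}) {
      int docs = getNumDocsPerChunk(len);
      System.out.printf("longest entry = %,d bytes -> %,d docs/chunk, worst-case chunk = %,d bytes%n",
          len, docs, maxChunkSize(docs, len));
    }
  }
}
```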
File 1 of 6: VarByteChunkSingleValueReader

@@ -42,9 +42,7 @@ public class VarByteChunkSingleValueReader extends BaseChunkSingleValueReader {
   */
  public VarByteChunkSingleValueReader(PinotDataBuffer pinotDataBuffer) {
    super(pinotDataBuffer);
-
-    int chunkHeaderSize = _numDocsPerChunk * Integer.BYTES;
-    _maxChunkSize = chunkHeaderSize + (_lengthOfLongestEntry * _numDocsPerChunk);
+    _maxChunkSize = _numDocsPerChunk * (VarByteChunkSingleValueWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE + _lengthOfLongestEntry);
  }

  @Override
File 2 of 6: VarByteChunkSingleValueWriter

@@ -50,6 +50,7 @@
 @NotThreadSafe
 public class VarByteChunkSingleValueWriter extends BaseChunkSingleValueWriter {
   private static final int CURRENT_VERSION = 2;
+  public static final int CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE = Integer.BYTES;

   private final int _chunkHeaderSize;
   private int _chunkHeaderOffset;
@@ -70,11 +71,11 @@ public VarByteChunkSingleValueWriter(File file, ChunkCompressorFactory.Compressi
       throws FileNotFoundException {

     super(file, compressionType, totalDocs, numDocsPerChunk,
-        ((numDocsPerChunk * Integer.BYTES) + (lengthOfLongestEntry * numDocsPerChunk)), // chunkSize
+        numDocsPerChunk * (CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE + lengthOfLongestEntry), // chunkSize
         lengthOfLongestEntry, CURRENT_VERSION);

     _chunkHeaderOffset = 0;
-    _chunkHeaderSize = numDocsPerChunk * Integer.BYTES;
+    _chunkHeaderSize = numDocsPerChunk * CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
     _chunkDataOffSet = _chunkHeaderSize;
   }

@@ -87,7 +88,7 @@ public void setString(int row, String string) {
   @Override
   public void setBytes(int row, byte[] bytes) {
     _chunkBuffer.putInt(_chunkHeaderOffset, _chunkDataOffSet);
-    _chunkHeaderOffset += Integer.BYTES;
+    _chunkHeaderOffset += CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;

     _chunkBuffer.position(_chunkDataOffSet);
     _chunkBuffer.put(bytes);
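A note on what the writer hunks above encode: each chunk is laid out as a header of `numDocsPerChunk` row offsets, 4 bytes (`Integer.BYTES`) apiece, followed by the concatenated variable-length values; `setBytes` records the current data offset in the next header slot and then appends the value bytes. The worst-case chunk size is therefore `numDocsPerChunk * (CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE + lengthOfLongestEntry)`, and with the new named constant the reader and writer now compute that bound from the same expression instead of duplicating `Integer.BYTES` arithmetic.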
File 3 of 6: SingleValueVarByteRawIndexCreator

@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.segment.creator.impl.fwd;

+import com.google.common.annotations.VisibleForTesting;
 import java.io.File;
 import java.io.IOException;
 import org.apache.pinot.core.io.compression.ChunkCompressorFactory;
@@ -27,15 +28,21 @@


 public class SingleValueVarByteRawIndexCreator extends BaseSingleValueRawIndexCreator {
-  private static final int NUM_DOCS_PER_CHUNK = 1000; // TODO: Auto-derive this based on metadata.
+  private static final int TARGET_MAX_CHUNK_SIZE = 1024 * 1024;

   private final VarByteChunkSingleValueWriter _indexWriter;

   public SingleValueVarByteRawIndexCreator(File baseIndexDir, ChunkCompressorFactory.CompressionType compressionType,
       String column, int totalDocs, int maxLength)
       throws IOException {
     File file = new File(baseIndexDir, column + V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
-    _indexWriter = new VarByteChunkSingleValueWriter(file, compressionType, totalDocs, NUM_DOCS_PER_CHUNK, maxLength);
+    _indexWriter = new VarByteChunkSingleValueWriter(file, compressionType, totalDocs, getNumDocsPerChunk(maxLength), maxLength);
   }

+  @VisibleForTesting
+  public static int getNumDocsPerChunk(int lengthOfLongestEntry) {
+    int overheadPerEntry = lengthOfLongestEntry + VarByteChunkSingleValueWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
+    return Math.max(TARGET_MAX_CHUNK_SIZE / overheadPerEntry, 1);
+  }
+
   @Override
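For a sense of the numbers: with the 1 MiB target and a 4-byte offset per row, a column whose longest value is 1,000 bytes now gets 1,048,576 / 1,004 = 1,044 docs per chunk (worst-case chunk just under 1 MiB), while a column whose longest value is 2,000,000 bytes bottoms out at the floor of 1 doc per chunk. Under the previous fixed `NUM_DOCS_PER_CHUNK = 1000`, that same 2 MB value would have implied a worst-case chunk buffer of roughly 2 GB.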
File 4 of 6: createV1ForwardIndexForTextIndex(...)

@@ -21,6 +21,7 @@
 import com.google.common.base.Preconditions;
 import java.io.File;
 import java.io.IOException;
+import java.nio.charset.Charset;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -338,7 +339,7 @@ void createV1ForwardIndexForTextIndex(String column, IndexLoadingConfig indexLoa
     int totalDocs = _segmentMetadata.getTotalDocs();
     Object defaultValue = fieldSpec.getDefaultNullValue();
     String stringDefaultValue = (String) defaultValue;
-    int lengthOfLongestEntry = stringDefaultValue.length();
+    int lengthOfLongestEntry = StringUtil.encodeUtf8(stringDefaultValue).length;
     int dictionaryElementSize = 0;

     SingleValueVarByteRawIndexCreator rawIndexCreator =
File 5 of 6: createTextIndexForColumn(...)

@@ -44,6 +44,7 @@
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
 import org.apache.pinot.core.io.reader.DataFileReader;
+import org.apache.pinot.core.io.reader.impl.ChunkReaderContext;
 import org.apache.pinot.core.io.reader.impl.v1.VarByteChunkSingleValueReader;
 import org.apache.pinot.core.segment.creator.TextIndexType;
 import org.apache.pinot.core.segment.creator.impl.inv.text.LuceneTextIndexCreator;
@@ -162,8 +163,9 @@ private void createTextIndexForColumn(ColumnMetadata columnMetadata)
     try (LuceneTextIndexCreator textIndexCreator = new LuceneTextIndexCreator(column, segmentDirectory, true)) {
       try (DataFileReader forwardIndexReader = getForwardIndexReader(columnMetadata)) {
         VarByteChunkSingleValueReader forwardIndex = (VarByteChunkSingleValueReader) forwardIndexReader;
+        ChunkReaderContext readerContext = forwardIndex.createContext();
         for (int docID = 0; docID < numDocs; docID++) {
-          Object docToAdd = forwardIndex.getString(docID);
+          Object docToAdd = forwardIndex.getString(docID, readerContext);
           textIndexCreator.addDoc(docToAdd, docID);
         }
         textIndexCreator.seal();
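The loop above now creates one `ChunkReaderContext` up front and passes it to every `getString` call, so sequential lookups can reuse chunk-level scratch state instead of rebuilding it per call. A minimal sketch of that access pattern, assuming a raw var-byte forward index file written by `VarByteChunkSingleValueWriter` as in the test below (the file path and doc count here are placeholders, not values from this commit):

```java
import java.io.File;

import org.apache.pinot.core.io.reader.impl.ChunkReaderContext;
import org.apache.pinot.core.io.reader.impl.v1.VarByteChunkSingleValueReader;
import org.apache.pinot.core.segment.memory.PinotDataBuffer;

public class SequentialVarByteScan {
  public static void main(String[] args) throws Exception {
    // Placeholder path and doc count; in Pinot these come from segment metadata.
    File rawForwardIndexFile = new File("column.sv.raw.fwd");
    int numDocs = 1000;

    try (VarByteChunkSingleValueReader reader = new VarByteChunkSingleValueReader(
        PinotDataBuffer.mapReadOnlyBigEndianFile(rawForwardIndexFile))) {
      // One reusable context for the whole scan, mirroring createTextIndexForColumn above.
      ChunkReaderContext context = reader.createContext();
      for (int docId = 0; docId < numDocs; docId++) {
        String value = reader.getString(docId, context);
        // ... hand `value` to whatever consumer needs it (e.g. a text index creator)
      }
    }
  }
}
```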
File 6 of 6: VarByteChunkSingleValueReader/Writer test

@@ -29,6 +29,7 @@
 import org.apache.pinot.core.io.reader.impl.ChunkReaderContext;
 import org.apache.pinot.core.io.reader.impl.v1.VarByteChunkSingleValueReader;
 import org.apache.pinot.core.io.writer.impl.v1.VarByteChunkSingleValueWriter;
+import org.apache.pinot.core.segment.creator.impl.fwd.SingleValueVarByteRawIndexCreator;
 import org.apache.pinot.core.segment.memory.PinotDataBuffer;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -136,4 +137,69 @@ public void testBackwardCompatibility()
       }
     }
   }
+
+  @Test
+  public void testVarCharWithDifferentSizes() throws Exception {
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.SNAPPY, 10, 1000);
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.PASS_THROUGH, 10, 1000);
+
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.SNAPPY, 100, 1000);
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.PASS_THROUGH, 100, 1000);
+
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.SNAPPY, 1000, 1000);
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.PASS_THROUGH, 1000, 1000);
+
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.SNAPPY, 10000, 100);
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.PASS_THROUGH, 10000, 100);
+
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.SNAPPY, 100000, 10);
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.PASS_THROUGH, 100000, 10);
+
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.SNAPPY, 1000000, 10);
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.PASS_THROUGH, 1000000, 10);
+
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.SNAPPY, 2000000, 10);
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.PASS_THROUGH, 2000000, 10);
+  }
+
+  private void testLargeVarcharHelper(ChunkCompressorFactory.CompressionType compressionType, int numChars, int numDocs)
+      throws Exception {
+    String[] expected = new String[numDocs];
+    Random random = new Random();
+
+    File outFile = new File(TEST_FILE);
+    FileUtils.deleteQuietly(outFile);
+
+    int maxStringLengthInBytes = 0;
+    for (int i = 0; i < numDocs; i++) {
+      expected[i] = RandomStringUtils.random(random.nextInt(numChars));
+      maxStringLengthInBytes = Math.max(maxStringLengthInBytes, expected[i].getBytes(UTF_8).length);
+    }
+
+    int numDocsPerChunk = SingleValueVarByteRawIndexCreator.getNumDocsPerChunk(maxStringLengthInBytes);
+    VarByteChunkSingleValueWriter writer =
+        new VarByteChunkSingleValueWriter(outFile, compressionType, numDocs, numDocsPerChunk,
+            maxStringLengthInBytes);
+
+    for (int i = 0; i < numDocs; i += 2) {
+      writer.setString(i, expected[i]);
+      writer.setBytes(i + 1, expected[i].getBytes(UTF_8));
+    }
+
+    writer.close();
+
+    try (VarByteChunkSingleValueReader reader = new VarByteChunkSingleValueReader(
+        PinotDataBuffer.mapReadOnlyBigEndianFile(outFile))) {
+      ChunkReaderContext context = reader.createContext();
+
+      for (int i = 0; i < numDocs; i += 2) {
+        String actual = reader.getString(i, context);
+        Assert.assertEquals(actual, expected[i]);
+        Assert.assertEquals(actual.getBytes(UTF_8), expected[i].getBytes(UTF_8));
+        Assert.assertEquals(reader.getBytes(i + 1), expected[i].getBytes(UTF_8));
+      }
+    }
+
+    FileUtils.deleteQuietly(outFile);
+  }
 }
