Derive num docs per chunk from max column value length for varbyte raw index creator #5256

Merged · 2 commits · Apr 17, 2020
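In brief, this change replaces the fixed NUM_DOCS_PER_CHUNK = 1000 in SingleValueVarByteRawIndexCreator with a value derived from the longest entry in the column, so that a chunk stays near a 1 MB target instead of ballooning for columns with very large values. A worked sketch of the derivation (formula and constants taken from the diff below):

    // Worked sketch of the new derivation (mirrors getNumDocsPerChunk below).
    static int numDocsPerChunk(int lengthOfLongestEntry) {
      int targetMaxChunkSize = 1024 * 1024;                        // TARGET_MAX_CHUNK_SIZE
      int overheadPerEntry = lengthOfLongestEntry + Integer.BYTES; // payload + 4-byte header offset per row
      return Math.max(targetMaxChunkSize / overheadPerEntry, 1);
    }
    // numDocsPerChunk(100)       -> 10082 docs per ~1 MB chunk
    // numDocsPerChunk(2_000_000) -> 1    (the old fixed 1000 docs/chunk would have
    //                                     sized such a chunk at roughly 2 GB)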
@@ -42,9 +42,7 @@ public class VarByteChunkSingleValueReader extends BaseChunkSingleValueReader {
   */
  public VarByteChunkSingleValueReader(PinotDataBuffer pinotDataBuffer) {
    super(pinotDataBuffer);
-
-   int chunkHeaderSize = _numDocsPerChunk * Integer.BYTES;
-   _maxChunkSize = chunkHeaderSize + (_lengthOfLongestEntry * _numDocsPerChunk);
+   _maxChunkSize = _numDocsPerChunk * (VarByteChunkSingleValueWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE + _lengthOfLongestEntry);
  }

  @Override
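For context, the chunk layout this formula reflects (inferred from the writer's setBytes further down, not prose from the PR):

    // Worst-case chunk layout (n = _numDocsPerChunk):
    //
    //   [off_0][off_1]...[off_{n-1}] [value_0][value_1]...[value_{n-1}]
    //    \__ n * Integer.BYTES __/    \_ up to n * _lengthOfLongestEntry _/
    //
    // hence _maxChunkSize = n * (CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE + _lengthOfLongestEntry),
    // now computed identically by reader and writer.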
@@ -50,6 +50,7 @@
 @NotThreadSafe
 public class VarByteChunkSingleValueWriter extends BaseChunkSingleValueWriter {
   private static final int CURRENT_VERSION = 2;
+  public static final int CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE = Integer.BYTES;

   private final int _chunkHeaderSize;
   private int _chunkHeaderOffset;
@@ -70,11 +71,11 @@ public VarByteChunkSingleValueWriter(File file, ChunkCompressorFactory.Compressi
       throws FileNotFoundException {

     super(file, compressionType, totalDocs, numDocsPerChunk,
-        ((numDocsPerChunk * Integer.BYTES) + (lengthOfLongestEntry * numDocsPerChunk)), // chunkSize
+        numDocsPerChunk * (CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE + lengthOfLongestEntry), // chunkSize
         lengthOfLongestEntry, CURRENT_VERSION);

     _chunkHeaderOffset = 0;
-    _chunkHeaderSize = numDocsPerChunk * Integer.BYTES;
+    _chunkHeaderSize = numDocsPerChunk * CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
     _chunkDataOffSet = _chunkHeaderSize;
   }

Expand All @@ -87,7 +88,7 @@ public void setString(int row, String string) {
@Override
public void setBytes(int row, byte[] bytes) {
_chunkBuffer.putInt(_chunkHeaderOffset, _chunkDataOffSet);
_chunkHeaderOffset += Integer.BYTES;
_chunkHeaderOffset += CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;

_chunkBuffer.position(_chunkDataOffSet);
_chunkBuffer.put(bytes);
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.segment.creator.impl.fwd;

+import com.google.common.annotations.VisibleForTesting;
 import java.io.File;
 import java.io.IOException;
 import org.apache.pinot.core.io.compression.ChunkCompressorFactory;
@@ -27,15 +28,21 @@


public class SingleValueVarByteRawIndexCreator extends BaseSingleValueRawIndexCreator {
private static final int NUM_DOCS_PER_CHUNK = 1000; // TODO: Auto-derive this based on metadata.
private static final int TARGET_MAX_CHUNK_SIZE = 1024 * 1024;

private final VarByteChunkSingleValueWriter _indexWriter;

public SingleValueVarByteRawIndexCreator(File baseIndexDir, ChunkCompressorFactory.CompressionType compressionType,
String column, int totalDocs, int maxLength)
throws IOException {
File file = new File(baseIndexDir, column + V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
_indexWriter = new VarByteChunkSingleValueWriter(file, compressionType, totalDocs, NUM_DOCS_PER_CHUNK, maxLength);
_indexWriter = new VarByteChunkSingleValueWriter(file, compressionType, totalDocs, getNumDocsPerChunk(maxLength), maxLength);
}

@VisibleForTesting
public static int getNumDocsPerChunk(int lengthOfLongestEntry) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic can be pushed down to the VarByteChunkSingleValueWriter?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can be. The call to super() in the constructor of VarByteChunkSingleValueWriter makes things slightly since you have to call this function two times (as part of the call to super). I think the constructor of VarByteChunkSingleValueWriter and its base class can be refactored a little bit to make this logic private to the writer.

I have a follow-up coming up for the TODO mentioned in the PR description. Will do as part of that

int overheadPerEntry = lengthOfLongestEntry + VarByteChunkSingleValueWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
return Math.max(TARGET_MAX_CHUNK_SIZE / overheadPerEntry, 1);
}

@Override
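A minimal sketch of what that follow-up refactor might look like — hypothetical, not code from this PR; the factory name forLongestEntry and moving TARGET_MAX_CHUNK_SIZE and the helper into the writer are assumptions:

    // Hypothetical only: derive numDocsPerChunk once in a static factory, so the
    // existing constructor (and its super() call) never has to invoke the helper twice.
    public static VarByteChunkSingleValueWriter forLongestEntry(File file,
        ChunkCompressorFactory.CompressionType compressionType, int totalDocs, int lengthOfLongestEntry)
        throws FileNotFoundException {
      int numDocsPerChunk =
          Math.max(TARGET_MAX_CHUNK_SIZE / (lengthOfLongestEntry + CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE), 1);
      return new VarByteChunkSingleValueWriter(file, compressionType, totalDocs, numDocsPerChunk,
          lengthOfLongestEntry);
    }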
@@ -21,6 +21,7 @@
 import com.google.common.base.Preconditions;
 import java.io.File;
 import java.io.IOException;
+import java.nio.charset.Charset;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -338,7 +339,7 @@ void createV1ForwardIndexForTextIndex(String column, IndexLoadingConfig indexLoa
   int totalDocs = _segmentMetadata.getTotalDocs();
   Object defaultValue = fieldSpec.getDefaultNullValue();
   String stringDefaultValue = (String) defaultValue;
-  int lengthOfLongestEntry = stringDefaultValue.length();
+  int lengthOfLongestEntry = StringUtil.encodeUtf8(stringDefaultValue).length;
   int dictionaryElementSize = 0;

   SingleValueVarByteRawIndexCreator rawIndexCreator =
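Why bytes rather than String.length(): the writer sizes chunks in bytes, while length() counts UTF-16 code units, so a non-ASCII default value would understate lengthOfLongestEntry. A small illustration with plain JDK calls (hypothetical value, not code from this PR):

    // Illustration only: char count vs. UTF-8 byte count.
    String stringDefaultValue = "héllo";
    int chars = stringDefaultValue.length();                                               // 5 UTF-16 code units
    int bytes = stringDefaultValue.getBytes(java.nio.charset.StandardCharsets.UTF_8).length; // 6 bytes ("é" -> 2)
    // Using 'chars' here would set lengthOfLongestEntry one byte too small.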
@@ -44,6 +44,7 @@
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
 import org.apache.pinot.core.io.reader.DataFileReader;
+import org.apache.pinot.core.io.reader.impl.ChunkReaderContext;
 import org.apache.pinot.core.io.reader.impl.v1.VarByteChunkSingleValueReader;
 import org.apache.pinot.core.segment.creator.TextIndexType;
 import org.apache.pinot.core.segment.creator.impl.inv.text.LuceneTextIndexCreator;
@@ -162,8 +163,9 @@ private void createTextIndexForColumn(ColumnMetadata columnMetadata)
   try (LuceneTextIndexCreator textIndexCreator = new LuceneTextIndexCreator(column, segmentDirectory, true)) {
     try (DataFileReader forwardIndexReader = getForwardIndexReader(columnMetadata)) {
       VarByteChunkSingleValueReader forwardIndex = (VarByteChunkSingleValueReader) forwardIndexReader;
+      ChunkReaderContext readerContext = forwardIndex.createContext();
       for (int docID = 0; docID < numDocs; docID++) {
-        Object docToAdd = forwardIndex.getString(docID);
+        Object docToAdd = forwardIndex.getString(docID, readerContext);
         textIndexCreator.addDoc(docToAdd, docID);
       }
       textIndexCreator.seal();

Review thread on the createContext() line:

Contributor: +1
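Context on the reader change (our reading of the reader implementation, not text from the PR): a ChunkReaderContext holds the most recently decompressed chunk, so creating it once outside the loop lets consecutive docIDs in the same chunk reuse that buffer rather than paying decompression on every call. The pattern, with the names used in the diff above:

    // Reuse one context across reads.
    ChunkReaderContext readerContext = forwardIndex.createContext();
    for (int docID = 0; docID < numDocs; docID++) {
      // Consecutive docIDs usually land in the chunk already held by the context,
      // so only chunk boundaries trigger decompression.
      Object docToAdd = forwardIndex.getString(docID, readerContext);
      textIndexCreator.addDoc(docToAdd, docID);
    }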
@@ -29,6 +29,7 @@
 import org.apache.pinot.core.io.reader.impl.ChunkReaderContext;
 import org.apache.pinot.core.io.reader.impl.v1.VarByteChunkSingleValueReader;
 import org.apache.pinot.core.io.writer.impl.v1.VarByteChunkSingleValueWriter;
+import org.apache.pinot.core.segment.creator.impl.fwd.SingleValueVarByteRawIndexCreator;
 import org.apache.pinot.core.segment.memory.PinotDataBuffer;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -136,4 +137,69 @@ public void testBackwardCompatibility()
       }
     }
   }
+
+  @Test
+  public void testVarCharWithDifferentSizes() throws Exception {
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.SNAPPY, 10, 1000);
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.PASS_THROUGH, 10, 1000);
+
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.SNAPPY, 100, 1000);
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.PASS_THROUGH, 100, 1000);
+
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.SNAPPY, 1000, 1000);
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.PASS_THROUGH, 1000, 1000);
+
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.SNAPPY, 10000, 100);
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.PASS_THROUGH, 10000, 100);
+
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.SNAPPY, 100000, 10);
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.PASS_THROUGH, 100000, 10);
+
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.SNAPPY, 1000000, 10);
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.PASS_THROUGH, 1000000, 10);
+
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.SNAPPY, 2000000, 10);
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.PASS_THROUGH, 2000000, 10);
+  }
+
+  private void testLargeVarcharHelper(ChunkCompressorFactory.CompressionType compressionType, int numChars, int numDocs)
+      throws Exception {
+    String[] expected = new String[numDocs];
+    Random random = new Random();
+
+    File outFile = new File(TEST_FILE);
+    FileUtils.deleteQuietly(outFile);
+
+    int maxStringLengthInBytes = 0;
+    for (int i = 0; i < numDocs; i++) {
+      expected[i] = RandomStringUtils.random(random.nextInt(numChars));
+      maxStringLengthInBytes = Math.max(maxStringLengthInBytes, expected[i].getBytes(UTF_8).length);
+    }
+
+    int numDocsPerChunk = SingleValueVarByteRawIndexCreator.getNumDocsPerChunk(maxStringLengthInBytes);
+    VarByteChunkSingleValueWriter writer =
+        new VarByteChunkSingleValueWriter(outFile, compressionType, numDocs, numDocsPerChunk,
+            maxStringLengthInBytes);
+
+    for (int i = 0; i < numDocs; i += 2) {
+      writer.setString(i, expected[i]);
+      writer.setBytes(i + 1, expected[i].getBytes(UTF_8));
+    }
+
+    writer.close();
+
+    try (VarByteChunkSingleValueReader reader = new VarByteChunkSingleValueReader(
+        PinotDataBuffer.mapReadOnlyBigEndianFile(outFile))) {
+      ChunkReaderContext context = reader.createContext();
+
+      for (int i = 0; i < numDocs; i += 2) {
+        String actual = reader.getString(i, context);
+        Assert.assertEquals(actual, expected[i]);
+        Assert.assertEquals(actual.getBytes(UTF_8), expected[i].getBytes(UTF_8));
+        Assert.assertEquals(reader.getBytes(i + 1), expected[i].getBytes(UTF_8));
+      }
+    }
+
+    FileUtils.deleteQuietly(outFile);
+  }
 }