Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
import java.nio.file.StandardOpenOption;
import java.util.Map;
import java.util.TreeMap;
import javax.annotation.Nullable;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -72,6 +75,22 @@ private LuceneTextIndexCombined() {
*/
public static void combineLuceneIndexFiles(File luceneIndexDir, String outputFilePath)
throws IOException {
// Delegate to the extended overload with no segment directory and no column name,
// so no docIdMapping file is looked up or included in the combined output.
combineLuceneIndexFiles(luceneIndexDir, outputFilePath, null, null);
}

/**
* Combines all files from a Lucene text index directory into a single file.
* Also collects the docIdMapping file from the segment directory if present.
*
* @param luceneIndexDir the Lucene index directory to combine
* @param outputFilePath the output file path to write the combined data
* @param segmentIndexDir the segment index directory (optional, used to find docIdMapping file)
* @param column the column name (optional, used to find docIdMapping file)
* @throws IOException if any file operations fail
*/
public static void combineLuceneIndexFiles(File luceneIndexDir, String outputFilePath,
@Nullable File segmentIndexDir, @Nullable String column)
throws IOException {
if (!luceneIndexDir.exists() || !luceneIndexDir.isDirectory()) {
throw new IllegalArgumentException(
"Lucene index directory does not exist or is not a directory: " + luceneIndexDir);
Expand All @@ -80,7 +99,7 @@ public static void combineLuceneIndexFiles(File luceneIndexDir, String outputFil
LOGGER.info("Combining Lucene text index files from directory: {}", luceneIndexDir.getAbsolutePath());

// Step 1: Collect all files and calculate total size
Map<String, FileInfo> fileInfoMap = collectFiles(luceneIndexDir);
Map<String, FileInfo> fileInfoMap = collectFiles(luceneIndexDir, segmentIndexDir, column);
int fileCount = fileInfoMap.size();

if (fileCount == 0) {
Expand Down Expand Up @@ -115,11 +134,18 @@ public static void combineLuceneIndexFiles(File luceneIndexDir, String outputFil

/**
* Collects all files from the Lucene index directory and their metadata.
* Also collects the docIdMapping file from the segment directory if present.
*
* @param luceneIndexDir the Lucene index directory
* @param segmentIndexDir the segment index directory (optional, used to find docIdMapping file)
* @param column the column name (optional, used to find docIdMapping file)
*/
private static Map<String, FileInfo> collectFiles(File luceneIndexDir)
private static Map<String, FileInfo> collectFiles(File luceneIndexDir, @Nullable File segmentIndexDir,
@Nullable String column)
throws IOException {
Map<String, FileInfo> fileInfoMap = new TreeMap<>(); // Use TreeMap for consistent ordering

// Collect files from the Lucene index directory
File[] files = luceneIndexDir.listFiles();
if (files != null) {
for (File file : files) {
Expand All @@ -132,6 +158,19 @@ private static Map<String, FileInfo> collectFiles(File luceneIndexDir)
}
}

// Collect the docIdMapping file from the segment directory if it exists
if (segmentIndexDir != null && column != null) {
File segmentDir = SegmentDirectoryPaths.findSegmentDirectory(segmentIndexDir);
File docIdMappingFile = new File(segmentDir,
column + V1Constants.Indexes.LUCENE_TEXT_INDEX_DOCID_MAPPING_FILE_EXTENSION);
if (docIdMappingFile.exists() && docIdMappingFile.isFile()) {
String mappingFileName = docIdMappingFile.getName();
long mappingFileSize = docIdMappingFile.length();
fileInfoMap.put(mappingFileName, new FileInfo(docIdMappingFile, mappingFileName, mappingFileSize));
LOGGER.info("Including docIdMapping file: {} ({} bytes)", mappingFileName, mappingFileSize);
}
}

return fileInfoMap;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public class LuceneTextIndexCreator extends AbstractTextIndexCreator {
private IndexWriter _indexWriter;
private File _segmentDirectory = null;
private int _nextDocId = 0;
private final TextIndexConfig _config;

public static HashSet<String> getDefaultEnglishStopWordsSet() {
return new HashSet<>(
Expand Down Expand Up @@ -118,6 +119,7 @@ public LuceneTextIndexCreator(String column, File segmentIndexDir, boolean commi
_commitOnClose = commit;
_combineAndCleanupFiles = combineAndCleanupFiles;
_segmentDirectory = segmentIndexDir;
_config = config;
String luceneAnalyzerClass = config.getLuceneAnalyzerClass();
try {
// segment generation is always in V1 and later we convert (as part of post creation processing)
Expand Down Expand Up @@ -380,7 +382,7 @@ private void combineAndCleanupTextIndexFiles()
// Find the lucene text index directory first
File textIndexFile = SegmentDirectoryPaths.findTextIndexIndexFile(_segmentDirectory, _textColumn);
if (textIndexFile != null && textIndexFile.exists()) {
LuceneTextIndexCombined.combineLuceneIndexFiles(textIndexFile, outputFilePath);
LuceneTextIndexCombined.combineLuceneIndexFiles(textIndexFile, outputFilePath, _segmentDirectory, _textColumn);
} else {
LOGGER.warn("Text index directory not found for combining: {}", _textColumn);
}
Expand All @@ -396,6 +398,18 @@ private void combineAndCleanupTextIndexFiles()
} else {
LOGGER.warn("Text index directory not found or does not exist for column: {}", _textColumn);
}

// Delete the lucene mapping file if it exists
File docIdMappingFile = new File(SegmentDirectoryPaths.findSegmentDirectory(_segmentDirectory),
_textColumn + V1Constants.Indexes.LUCENE_TEXT_INDEX_DOCID_MAPPING_FILE_EXTENSION);
if (docIdMappingFile.exists()) {
try {
FileUtils.delete(docIdMappingFile);
LOGGER.info("Successfully deleted Lucene text index mapping file: {}", docIdMappingFile.getAbsolutePath());
} catch (IOException e) {
LOGGER.warn("Failed to delete Lucene text index mapping file: {}", docIdMappingFile.getAbsolutePath(), e);
}
}
}

@Override
Expand All @@ -407,6 +421,18 @@ public void close()
try {
// based on the commit flag set in IndexWriterConfig, this will decide to commit or not
_indexWriter.close();
// Build docIdMapping file if storeInSegmentFile is true
// This allows the mapping file to be available during read without building it on-the-fly
if (_config.isStoreInSegmentFile()) {
//Check if mapping file already exists
File mappingFile = new File(SegmentDirectoryPaths.findSegmentDirectory(_segmentDirectory),
_textColumn + V1Constants.Indexes.LUCENE_TEXT_INDEX_DOCID_MAPPING_FILE_EXTENSION);
if (!mappingFile.exists()) {
LOGGER.info("lucene doc IdMapping file doesn't exists for column: {}, building mapping file", _textColumn);
// Build the docId mapping file so it's available during segment load
buildMappingFile(_segmentDirectory, _textColumn, _indexDirectory, null);
}
}
_indexDirectory.close();
} catch (Exception e) {
throw new RuntimeException("Caught exception while closing the Lucene index for column: " + _textColumn, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,10 @@ public LuceneTextIndexReader(String column, PinotDataBuffer indexBuffer, int num
}
PinotDataBuffer docIdMappingBuffer = LuceneTextIndexBufferReader.extractDocIdMappingBuffer(indexBuffer, column);
// Initialize docId translator
long startTime = System.currentTimeMillis();
_docIdTranslator = createDocIdTranslator(docIdMappingBuffer, config, numDocs);
LOGGER.info("Time taken to create docIdTranslator for column {}: {} ms", column,
System.currentTimeMillis() - startTime);
// Initialize analyzer and query parser
_analyzer = TextIndexUtils.getAnalyzer(config);
_queryParserClass = config.getLuceneQueryParserClass();
Expand Down Expand Up @@ -446,9 +449,15 @@ private DocIdTranslator createDocIdTranslator(PinotDataBuffer docIdMappingBuffer
}

if (docIdMappingBuffer != null) {
return new DefaultDocIdTranslator(docIdMappingBuffer);
// Ensure the buffer is in little endian format as expected by DefaultDocIdTranslator
// Create a view with little endian byte order if the buffer is not already in little endian
PinotDataBuffer littleEndianBuffer = docIdMappingBuffer.order() == ByteOrder.LITTLE_ENDIAN
? docIdMappingBuffer
: docIdMappingBuffer.view(0, docIdMappingBuffer.size(), ByteOrder.LITTLE_ENDIAN);
return new DefaultDocIdTranslator(littleEndianBuffer);
}

LOGGER.info("building doc id mapping for text index column: {}", _column);
// Create a new buffer and populate it
int length = Integer.BYTES * numDocs;
String desc = "Text index docId mapping buffer: " + _column;
Expand Down
Loading