/
SegmentPreProcessor.java
125 lines (113 loc) · 5.68 KB
/
SegmentPreProcessor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.core.segment.index.loader;
import java.io.File;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.data.Schema;
import org.apache.pinot.common.segment.ReadMode;
import org.apache.pinot.core.segment.creator.impl.V1Constants;
import org.apache.pinot.core.segment.index.SegmentMetadataImpl;
import org.apache.pinot.core.segment.index.loader.bloomfilter.BloomFilterHandler;
import org.apache.pinot.core.segment.index.loader.columnminmaxvalue.ColumnMinMaxValueGenerator;
import org.apache.pinot.core.segment.index.loader.columnminmaxvalue.ColumnMinMaxValueGeneratorMode;
import org.apache.pinot.core.segment.index.loader.defaultcolumn.DefaultColumnHandler;
import org.apache.pinot.core.segment.index.loader.defaultcolumn.DefaultColumnHandlerFactory;
import org.apache.pinot.core.segment.index.loader.invertedindex.InvertedIndexHandler;
import org.apache.pinot.core.segment.store.SegmentDirectory;
/**
 * Opens a segment with mmap and runs all pre-processing steps on it. (This can be slow)
 * <p>Pre-processing steps, in the order they are applied:
 * <ul>
 *   <li>Use {@link DefaultColumnHandler} to update auto-generated default columns (only when a schema is given)</li>
 *   <li>Use {@link InvertedIndexHandler} to create inverted indices</li>
 *   <li>Use {@link BloomFilterHandler} to create bloom filters</li>
 *   <li>Use {@link ColumnMinMaxValueGenerator} to add min/max value to column metadata (unless the configured
 *   mode is {@code NONE})</li>
 * </ul>
 */
public class SegmentPreProcessor implements AutoCloseable {
  private final File _indexDir;
  private final IndexLoadingConfig _indexLoadingConfig;
  private final Schema _schema;
  private final SegmentDirectory _segmentDirectory;
  // Not final: re-read from the index directory after the default column update.
  private SegmentMetadataImpl _segmentMetadata;

  /**
   * Reads the segment metadata from {@code indexDir} and opens the segment directory in mmap mode.
   *
   * @param indexDir directory holding the segment
   * @param indexLoadingConfig config handed to the index handlers and queried for the min/max generator mode
   * @param schema schema used to update default columns; when {@code null} that step is skipped
   * @throws Exception propagated from reading the metadata or opening the segment directory
   */
  public SegmentPreProcessor(@Nonnull File indexDir, @Nonnull IndexLoadingConfig indexLoadingConfig,
      @Nullable Schema schema)
      throws Exception {
    _indexDir = indexDir;
    _indexLoadingConfig = indexLoadingConfig;
    _schema = schema;
    _segmentMetadata = new SegmentMetadataImpl(indexDir);
    // mmap is always used here because it is safest and performs well without impact from -Xmx params.
    // This is not the final load of the segment.
    _segmentDirectory = SegmentDirectory.createFromLocalFS(indexDir, _segmentMetadata, ReadMode.mmap);
  }

  /**
   * Runs the pre-processing steps and saves the result through the segment writer. Returns immediately, without
   * touching any file, when the segment metadata reports zero documents.
   *
   * @throws Exception propagated from any of the handlers or from the segment writer
   */
  public void process()
      throws Exception {
    if (_segmentMetadata.getTotalDocs() == 0) {
      return;
    }

    // Leftover inverted index temp files must be gone before the segment is processed.
    removeInvertedIndexTempFiles();

    try (SegmentDirectory.Writer writer = _segmentDirectory.createWriter()) {
      // Step 1: bring default columns in line with the schema, if one was supplied.
      if (_schema != null) {
        DefaultColumnHandler columnHandler =
            DefaultColumnHandlerFactory.getDefaultColumnHandler(_indexDir, _schema, _segmentMetadata, writer);
        columnHandler.updateDefaultColumns();
        // Re-read the metadata from the index directory before running the remaining steps.
        _segmentMetadata = new SegmentMetadataImpl(_indexDir);
      }

      // Step 2: inverted indices according to the index config.
      new InvertedIndexHandler(_indexDir, _segmentMetadata, _indexLoadingConfig, writer).createInvertedIndices();

      // Step 3: bloom filters, if required.
      new BloomFilterHandler(_indexDir, _segmentMetadata, _indexLoadingConfig, writer).createBloomFilters();

      // Step 4: min/max value in column metadata according to the prune mode.
      // For star-tree index, because it can only increase the range, min/max value can still be used in pruner.
      ColumnMinMaxValueGeneratorMode minMaxMode = _indexLoadingConfig.getColumnMinMaxValueGeneratorMode();
      if (minMaxMode != ColumnMinMaxValueGeneratorMode.NONE) {
        new ColumnMinMaxValueGenerator(_segmentMetadata, writer, minMaxMode).addColumnMinMaxValue();
        // NOTE: This step may modify the segment metadata. When adding new steps after this, un-comment the next line.
        // _segmentMetadata = new SegmentMetadataImpl(_indexDir);
      }

      writer.save();
    }
  }

  /**
   * Removes existing inverted index temp files from the index directory.
   * <p>NOTE: This fixes the issue of temporary files not getting deleted after creating new inverted indexes. Every
   * file directly under the index directory whose name ends with the bitmap inverted index file extension followed
   * by '.tmp' is deleted on a best-effort basis. Nothing happens when the directory cannot be listed.
   */
  private void removeInvertedIndexTempFiles() {
    File[] children = _indexDir.listFiles();
    if (children == null) {
      return;
    }
    String tempSuffix = V1Constants.Indexes.BITMAP_INVERTED_INDEX_FILE_EXTENSION + ".tmp";
    for (File candidate : children) {
      String candidateName = candidate.getName();
      if (candidateName.endsWith(tempSuffix)) {
        FileUtils.deleteQuietly(candidate);
      }
    }
  }

  /**
   * Closes the segment directory opened by the constructor.
   *
   * @throws Exception propagated from closing the segment directory
   */
  @Override
  public void close()
      throws Exception {
    _segmentDirectory.close();
  }
}