In SegmentPurger, use table config to generate the segment #5325

Merged
merged 1 commit into from
May 2, 2020

SegmentGeneratorConfig.java
@@ -43,7 +43,6 @@
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.FieldSpec;
@@ -195,7 +194,8 @@ public SegmentGeneratorConfig(@Nullable TableConfig tableConfig, Schema schema)
if (noDictionaryColumnMap != null) {
Map<String, ChunkCompressorFactory.CompressionType> serializedNoDictionaryColumnMap =
noDictionaryColumnMap.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
e -> (ChunkCompressorFactory.CompressionType) ChunkCompressorFactory.CompressionType.valueOf(e.getValue())));
e -> (ChunkCompressorFactory.CompressionType) ChunkCompressorFactory.CompressionType
.valueOf(e.getValue())));
this.setRawIndexCompressionType(serializedNoDictionaryColumnMap);
}
}
@@ -214,7 +214,13 @@ public SegmentGeneratorConfig(@Nullable TableConfig tableConfig, Schema schema)
setStarTreeV2BuilderConfigs(starTreeV2BuilderConfigs);
}

if (indexingConfig.isCreateInvertedIndexDuringSegmentGeneration()) {
// NOTE: There are 2 ways to configure creating inverted index during segment generation:
// - Set 'generate.inverted.index.before.push' to 'true' in custom config (deprecated)
// - Enable 'createInvertedIndexDuringSegmentGeneration' in indexing config
// TODO: Clean up the table configs with the deprecated settings, and always use the one in the indexing config
Map<String, String> customConfigs = tableConfig.getCustomConfig().getCustomConfigs();
if ((customConfigs != null && Boolean.parseBoolean(customConfigs.get("generate.inverted.index.before.push")))
|| indexingConfig.isCreateInvertedIndexDuringSegmentGeneration()) {
_invertedIndexCreationColumns = indexingConfig.getInvertedIndexColumns();
}

@@ -340,6 +346,8 @@ public void setRawIndexCreationColumns(List<String> rawIndexCreationColumns) {
_rawIndexCreationColumns.addAll(rawIndexCreationColumns);
}

// NOTE: Should always be extracted from the table config
@Deprecated
public void setInvertedIndexCreationColumns(List<String> indexCreationColumns) {
Preconditions.checkNotNull(indexCreationColumns);
_invertedIndexCreationColumns.addAll(indexCreationColumns);
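The NOTE added above describes the two ways to enable inverted index creation at segment generation time. As a minimal sketch of the non-deprecated path (the table name, column name, and the TableConfigBuilder package are illustrative assumptions; the builder calls mirror the test changes further down in this diff):

```java
import java.util.Collections;

import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder; // package assumed

public class InvertedIndexFromTableConfigSketch {

  public static SegmentGeneratorConfig buildGeneratorConfig() {
    // Enable inverted index creation through the indexing config (the non-deprecated path)
    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
        .setTableName("myTable") // illustrative table name
        .setInvertedIndexColumns(Collections.singletonList("d1")) // illustrative column
        .setCreateInvertedIndexDuringSegmentGeneration(true)
        .build();
    Schema schema = new Schema.SchemaBuilder()
        .addSingleValueDimension("d1", FieldSpec.DataType.INT)
        .build();
    // The constructor reads the indexing config and picks up the inverted index columns itself
    return new SegmentGeneratorConfig(tableConfig, schema);
  }
}
```

With the flag set, the constructor populates the inverted index creation columns from indexingConfig.getInvertedIndexColumns(), so callers no longer need the now-deprecated setInvertedIndexCreationColumns.
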

RawIndexConverter.java
@@ -21,7 +21,6 @@
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.io.FileUtils;
@@ -84,8 +83,8 @@ public class RawIndexConverter {
* NOTE: original segment should be in V1 format.
* TODO: support V3 format
*/
public RawIndexConverter(@Nonnull String rawTableName, @Nonnull File originalIndexDir,
@Nonnull File convertedIndexDir, @Nullable String columnsToConvert)
public RawIndexConverter(String rawTableName, File originalIndexDir, File convertedIndexDir,
@Nullable String columnsToConvert)
throws Exception {
FileUtils.copyDirectory(originalIndexDir, convertedIndexDir);
IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
@@ -119,15 +118,15 @@ public boolean convert()
for (String columnToConvert : StringUtils.split(_columnsToConvert, ',')) {
FieldSpec fieldSpec = schema.getFieldSpecFor(columnToConvert);
if (fieldSpec == null) {
LOGGER.warn("Skip converting column: {} because is does not exist in the schema");
LOGGER.warn("Skip converting column: {} because is does not exist in the schema", columnsToConvert);
continue;
}
if (!fieldSpec.isSingleValueField()) {
LOGGER.warn("Skip converting column: {} because it's a multi-value column");
LOGGER.warn("Skip converting column: {} because it's a multi-value column", columnsToConvert);
continue;
}
if (!_originalSegmentMetadata.hasDictionary(columnToConvert)) {
LOGGER.warn("Skip converting column: {} because its index is not dictionary-based");
LOGGER.warn("Skip converting column: {} because its index is not dictionary-based", columnsToConvert);
continue;
}
columnsToConvert.add(fieldSpec);

SegmentConverter.java
@@ -75,8 +75,8 @@ public class SegmentConverter {

public SegmentConverter(List<File> inputIndexDirs, File workingDir, String tableName, String segmentName,
int totalNumPartition, RecordTransformer recordTransformer, @Nullable RecordPartitioner recordPartitioner,
@Nullable RecordAggregator recordAggregator, @Nullable List<String> groupByColumns,
TableConfig tableConfig, boolean skipTimeValueCheck) {
@Nullable RecordAggregator recordAggregator, @Nullable List<String> groupByColumns, TableConfig tableConfig,
boolean skipTimeValueCheck) {
_inputIndexDirs = inputIndexDirs;
_workingDir = workingDir;
_recordTransformer = recordTransformer;
@@ -152,13 +152,8 @@ private void buildSegment(String outputPath, String tableName, String segmentNam
throws Exception {
SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
segmentGeneratorConfig.setOutDir(outputPath);
segmentGeneratorConfig.setTableName(tableName);
segmentGeneratorConfig.setSegmentName(segmentName);
segmentGeneratorConfig.setSkipTimeValueCheck(_skipTimeValueCheck);
IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
if (indexingConfig != null && indexingConfig.getInvertedIndexColumns() != null) {
segmentGeneratorConfig.setInvertedIndexCreationColumns(indexingConfig.getInvertedIndexColumns());
}
SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
driver.init(segmentGeneratorConfig, recordReader);
driver.build();
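With the table-config-aware constructor, SegmentConverter's buildSegment no longer sets the table name or inverted index columns by hand. A condensed sketch of the resulting build path (method and parameter names are illustrative; the calls are the ones visible in the diff above):

```java
import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.RecordReader;

public class BuildSegmentSketch {

  public static void buildSegment(String outputPath, String segmentName, TableConfig tableConfig,
      Schema schema, RecordReader recordReader) throws Exception {
    SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema);
    config.setOutDir(outputPath);
    config.setSegmentName(segmentName);
    // Index creation settings (table name, inverted index columns) are derived from the table config
    SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
    driver.init(config, recordReader);
    driver.build();
  }
}
```
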

SegmentPurger.java
@@ -21,17 +21,13 @@
import com.google.common.base.Preconditions;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.pinot.common.segment.ReadMode;
import org.apache.pinot.core.data.readers.PinotSegmentRecordReader;
import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.core.segment.store.ColumnIndexType;
import org.apache.pinot.core.segment.store.SegmentDirectory;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
@@ -47,31 +43,31 @@
public class SegmentPurger {
private static final Logger LOGGER = LoggerFactory.getLogger(SegmentPurger.class);

private final String _rawTableName;
private final File _originalIndexDir;
private final File _indexDir;
private final File _workingDir;
private final TableConfig _tableConfig;
private final RecordPurger _recordPurger;
private final RecordModifier _recordModifier;

private int _numRecordsPurged;
private int _numRecordsModified;

public SegmentPurger(String rawTableName, File originalIndexDir, File workingDir, @Nullable RecordPurger recordPurger,
public SegmentPurger(File indexDir, File workingDir, TableConfig tableConfig, @Nullable RecordPurger recordPurger,
@Nullable RecordModifier recordModifier) {
Preconditions.checkArgument(recordPurger != null || recordModifier != null,
"At least one of record purger and modifier should be non-null");
_rawTableName = rawTableName;
_originalIndexDir = originalIndexDir;
_indexDir = indexDir;
_workingDir = workingDir;
_tableConfig = tableConfig;
_recordPurger = recordPurger;
_recordModifier = recordModifier;
}

public File purgeSegment()
throws Exception {
SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_originalIndexDir);
SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
String segmentName = segmentMetadata.getName();
LOGGER.info("Start purging table: {}, segment: {}", _rawTableName, segmentName);
LOGGER.info("Start purging table: {}, segment: {}", _tableConfig.getTableName(), segmentName);

try (PurgeRecordReader purgeRecordReader = new PurgeRecordReader()) {
// Make a first pass through the data to see if records need to be purged or modified
@@ -84,11 +80,8 @@ public File purgeSegment()
return null;
}

Schema schema = purgeRecordReader.getSchema();
// FIXME: make table config available here, and pass it to the SegmentGeneratorConfig
SegmentGeneratorConfig config = new SegmentGeneratorConfig(null, schema);
SegmentGeneratorConfig config = new SegmentGeneratorConfig(_tableConfig, purgeRecordReader.getSchema());
config.setOutDir(_workingDir.getPath());
config.setTableName(_rawTableName);
config.setSegmentName(segmentName);

// Keep index creation time the same as original segment because both segments use the same raw data.
@@ -105,30 +98,14 @@
config.setSegmentTimeUnit(segmentMetadata.getTimeUnit());
}

// Generate inverted index if it exists in the original segment
// TODO: once the column metadata correctly reflects whether inverted index exists for the column, use that
// instead of reading the segment
// TODO: uniform the behavior of Pinot Hadoop segment generation, segment converter and purger
List<String> invertedIndexCreationColumns = new ArrayList<>();
try (SegmentDirectory segmentDirectory = SegmentDirectory
.createFromLocalFS(_originalIndexDir, segmentMetadata, ReadMode.mmap);
SegmentDirectory.Reader reader = segmentDirectory.createReader()) {
for (String column : schema.getColumnNames()) {
if (reader.hasIndexFor(column, ColumnIndexType.INVERTED_INDEX)) {
invertedIndexCreationColumns.add(column);
}
}
}
config.setInvertedIndexCreationColumns(invertedIndexCreationColumns);

SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
purgeRecordReader.rewind();
driver.init(config, purgeRecordReader);
driver.build();
}

LOGGER.info("Finish purging table: {}, segment: {}, purged {} records, modified {} records", _rawTableName,
segmentName, _numRecordsPurged, _numRecordsModified);
LOGGER.info("Finish purging table: {}, segment: {}, purged {} records, modified {} records",
_tableConfig.getTableName(), segmentName, _numRecordsPurged, _numRecordsModified);
return new File(_workingDir, segmentName);
}

@@ -160,7 +137,7 @@ private class PurgeRecordReader implements RecordReader {

PurgeRecordReader()
throws Exception {
_pinotSegmentRecordReader = new PinotSegmentRecordReader(_originalIndexDir);
_pinotSegmentRecordReader = new PinotSegmentRecordReader(_indexDir);
}

public Schema getSchema() {
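Taken together, the purger is now constructed from the segment directory, a working directory, and the table config, and it derives the segment generation settings (including inverted index columns) from that config instead of re-reading the original segment's indexes. A minimal usage sketch (the SegmentPurger package and the purge/modify rules are assumptions modeled on the test changes below):

```java
import java.io.File;

import org.apache.pinot.core.minion.SegmentPurger; // package assumed
import org.apache.pinot.spi.config.table.TableConfig;

public class SegmentPurgerSketch {

  public static File purge(File indexDir, File workingDir, TableConfig tableConfig) throws Exception {
    // Purge rows where d1 == 0 and rewrite rows where d2 == 0, mirroring the test's rules
    SegmentPurger.RecordPurger recordPurger = row -> row.getValue("d1").equals(0);
    SegmentPurger.RecordModifier recordModifier = row -> {
      if (row.getValue("d2").equals(0)) {
        row.putValue("d2", Integer.MAX_VALUE);
        return true; // record was modified
      }
      return false;
    };
    SegmentPurger purger = new SegmentPurger(indexDir, workingDir, tableConfig, recordPurger, recordModifier);
    // Returns the purged segment directory, or null if nothing was purged or modified
    return purger.purgeSegment();
  }
}
```
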

SegmentPurgerTest.java
@@ -34,7 +34,6 @@
import org.apache.pinot.core.segment.store.SegmentDirectory;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.DimensionFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
@@ -61,6 +60,7 @@ public class SegmentPurgerTest {
private static final String D1 = "d1";
private static final String D2 = "d2";

private TableConfig _tableConfig;
private File _originalIndexDir;
private int _expectedNumRecordsPurged;
private int _expectedNumRecordsModified;
@@ -70,10 +70,11 @@ public void setUp()
throws Exception {
FileUtils.deleteDirectory(TEMP_DIR);

Schema schema = new Schema();
schema.addField(new DimensionFieldSpec(D1, FieldSpec.DataType.INT, true));
schema.addField(new DimensionFieldSpec(D2, FieldSpec.DataType.INT, true));
TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
_tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
.setInvertedIndexColumns(Collections.singletonList(D1)).setCreateInvertedIndexDuringSegmentGeneration(true)
.build();
Schema schema = new Schema.SchemaBuilder().addSingleValueDimension(D1, FieldSpec.DataType.INT)
.addSingleValueDimension(D2, FieldSpec.DataType.INT).build();

List<GenericRow> rows = new ArrayList<>(NUM_ROWS);
for (int i = 0; i < NUM_ROWS; i++) {
@@ -85,17 +86,15 @@ } else if (value2 == 0) {
} else if (value2 == 0) {
_expectedNumRecordsModified++;
}
row.putField(D1, value1);
row.putField(D2, value2);
row.putValue(D1, value1);
row.putValue(D2, value2);
rows.add(row);
}
GenericRowRecordReader genericRowRecordReader = new GenericRowRecordReader(rows);

SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema);
SegmentGeneratorConfig config = new SegmentGeneratorConfig(_tableConfig, schema);
config.setOutDir(ORIGINAL_SEGMENT_DIR.getPath());
config.setTableName(TABLE_NAME);
config.setSegmentName(SEGMENT_NAME);
config.setInvertedIndexCreationColumns(Collections.singletonList(D1));

SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
driver.init(config, genericRowRecordReader);
@@ -112,15 +111,15 @@ public void testPurgeSegment()
// Modify records with d2 = 0 to d2 = Integer.MAX_VALUE
SegmentPurger.RecordModifier recordModifier = row -> {
if (row.getValue(D2).equals(0)) {
row.putField(D2, Integer.MAX_VALUE);
row.putValue(D2, Integer.MAX_VALUE);
return true;
} else {
return false;
}
};

SegmentPurger segmentPurger =
new SegmentPurger(TABLE_NAME, _originalIndexDir, PURGED_SEGMENT_DIR, recordPurger, recordModifier);
new SegmentPurger(_originalIndexDir, PURGED_SEGMENT_DIR, _tableConfig, recordPurger, recordModifier);
File purgedIndexDir = segmentPurger.purgeSegment();

// Check the purge/modify counter in segment purger