Add row-based schema validation code to detect schema mismatch #5984

Closed · wants to merge 1 commit
PinotDataType.java
@@ -586,22 +586,32 @@ public boolean isSingleValue() {

  public PinotDataType getSingleValueType() {
    switch (this) {
+     case BYTE:
Contributor:

If this PR is just adding a check, why does it need to modify the functionality?

Contributor Author:

This change makes getSingleValueType() return the single-value type itself when called on a type that is already single-valued, instead of handling only the array types.

Contributor Author:

This method is called in https://github.com/apache/incubator-pinot/blob/2cfaed37cf581362b87a36e924cdd5744d430e03/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/DataTypeTransformer.java#L112
If the single-value types of source and dest are the same, the data types are treated as matching. E.g. if source is STRING_ARRAY and dest is STRING, the data types match, even though we should still mark the single-value/multi-value mismatch flag.
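A minimal sketch of the distinction described above, using only the enum values and methods visible in this diff (the example class name itself is made up):

```java
import org.apache.pinot.common.utils.PinotDataType;

public class SingleValueTypeExample {
  public static void main(String[] args) {
    PinotDataType source = PinotDataType.STRING_ARRAY;
    PinotDataType dest = PinotDataType.STRING;

    // With this PR, STRING.getSingleValueType() returns STRING itself, so the
    // two types compare as the same data type and no data type mismatch is flagged...
    System.out.println(source.getSingleValueType() == dest.getSingleValueType()); // true

    // ...while isSingleValue() still distinguishes the two shapes, which is
    // exactly what the single-value/multi-value mismatch flag is for.
    System.out.println(source.isSingleValue()); // false
    System.out.println(dest.isSingleValue());   // true
  }
}
```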

+     case BYTES:
      case BYTE_ARRAY:
        return BYTE;
+     case CHARACTER:
      case CHARACTER_ARRAY:
        return CHARACTER;
+     case SHORT:
      case SHORT_ARRAY:
        return SHORT;
+     case INTEGER:
      case INTEGER_ARRAY:
        return INTEGER;
+     case LONG:
      case LONG_ARRAY:
        return LONG;
+     case FLOAT:
      case FLOAT_ARRAY:
        return FLOAT;
+     case DOUBLE:
      case DOUBLE_ARRAY:
        return DOUBLE;
+     case STRING:
      case STRING_ARRAY:
        return STRING;
+     case OBJECT:
      case OBJECT_ARRAY:
        return OBJECT;
      default:
pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/DataTypeTransformer.java
@@ -24,8 +24,10 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import javax.annotation.Nullable;
import org.apache.pinot.common.utils.PinotDataType;
import org.apache.pinot.spi.data.FieldSpec;

@@ -85,7 +87,7 @@ public GenericRow transform(GenericRow record) {
        continue;
      }
      PinotDataType dest = entry.getValue();
-     value = standardize(column, value, dest.isSingleValue());
+     value = standardize(record, column, value, dest.isSingleValue());
      // NOTE: The standardized value could be null for empty Collection/Map/Object[].
      if (value == null) {
        record.putValue(column, null);
@@ -109,6 +111,9 @@
        }
      }
      if (source != dest) {
+       if (source.getSingleValueType() != dest.getSingleValueType()) {
+         putValueAsSetToKey(record, GenericRow.DATA_TYPE_MISMATCH_KEY, column);
+       }
        value = dest.convert(value, source);
      }

@@ -127,28 +132,39 @@
   */
  @VisibleForTesting
  @Nullable
- static Object standardize(String column, @Nullable Object value, boolean isSingleValue) {
+ static Object standardize(GenericRow record, String column, @Nullable Object value, boolean isSingleValue) {
+   return standardize(record, column, value, isSingleValue, 1);
+ }
+
+ static Object standardize(GenericRow record, String column, @Nullable Object value, boolean isSingleValue, int level) {
    if (value == null) {
      return null;
    }
+   // If it's a single-value column and the value is a Collection/Map/Object[], mark the key.
    if (value instanceof Collection) {
-     return standardizeCollection(column, (Collection) value, isSingleValue);
+     return standardizeCollection(record, column, (Collection) value, isSingleValue, level);
    }
    if (value instanceof Map) {
-     return standardizeCollection(column, ((Map) value).values(), isSingleValue);
+     // If it's a map structure, mark the key.
+     putValueAsSetToKey(record, GenericRow.MULTI_VALUE_STRUCTURE_MISMATCH_KEY, column);
+     Collection values = ((Map) value).values();
+     return standardizeCollection(record, column, values, isSingleValue, level);
    }
    if (value instanceof Object[]) {
+     if (isSingleValue && level == 1) {
+       putValueAsSetToKey(record, GenericRow.SINGLE_VALUE_MULTI_VALUE_FIELD_MISMATCH_KEY, column);
+     }
      Object[] values = (Object[]) value;
      int numValues = values.length;
      if (numValues == 0) {
        return null;
      }
      if (numValues == 1) {
-       return standardize(column, values[0], isSingleValue);
+       return standardize(record, column, values[0], isSingleValue, level + 1);
      }
      List<Object> standardizedValues = new ArrayList<>(numValues);
      for (Object singleValue : values) {
-       Object standardizedValue = standardize(column, singleValue, true);
+       Object standardizedValue = standardize(record, column, singleValue, true, level + 1);
        if (standardizedValue != null) {
          standardizedValues.add(standardizedValue);
        }
@@ -164,20 +180,27 @@
          Arrays.toString(values), column);
      return standardizedValues.toArray();
    }
+   // If it's a multi-value column and the level is 1, mark the key.
+   if (!isSingleValue && level == 1) {
+     putValueAsSetToKey(record, GenericRow.SINGLE_VALUE_MULTI_VALUE_FIELD_MISMATCH_KEY, column);
+   }
    return value;
  }

- private static Object standardizeCollection(String column, Collection collection, boolean isSingleValue) {
+ private static Object standardizeCollection(GenericRow record, String column, Collection collection, boolean isSingleValue, int level) {
+   if (isSingleValue && level == 1) {
+     putValueAsSetToKey(record, GenericRow.SINGLE_VALUE_MULTI_VALUE_FIELD_MISMATCH_KEY, column);
+   }
    int numValues = collection.size();
    if (numValues == 0) {
      return null;
    }
    if (numValues == 1) {
-     return standardize(column, collection.iterator().next(), isSingleValue);
+     return standardize(record, column, collection.iterator().next(), isSingleValue, level + 1);
    }
    List<Object> standardizedValues = new ArrayList<>(numValues);
    for (Object singleValue : collection) {
-     Object standardizedValue = standardize(column, singleValue, true);
+     Object standardizedValue = standardize(record, column, singleValue, true, level + 1);
      if (standardizedValue != null) {
        standardizedValues.add(standardizedValue);
      }
@@ -193,4 +216,13 @@
        .checkState(!isSingleValue, "Cannot read single-value from Collection: %s for column: %s", collection, column);
    return standardizedValues.toArray();
  }
+
+ private static void putValueAsSetToKey(GenericRow record, String key, String value) {
+   Set<String> valueSet = (Set) record.getValue(key);
+   if (valueSet == null) {
+     valueSet = new HashSet<>();
+     record.putValue(key, valueSet);
+   }
+   valueSet.add(value);
+ }
}
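A hedged usage sketch of how the new standardize overload marks mismatch columns on the row. It assumes the GenericRow mismatch keys added by this PR and that standardize stays package-visible for tests; the column names are made up:

```java
package org.apache.pinot.core.data.recordtransformer;

import java.util.Collections;
import java.util.Set;
import org.apache.pinot.spi.data.readers.GenericRow;

// Must live in the same package as DataTypeTransformer, since standardize is package-private.
public class StandardizeMarkingExample {
  public static void main(String[] args) {
    GenericRow record = new GenericRow();

    // A single-value column fed an Object[]: the value still standardizes to "SF",
    // but the column is flagged under the single-value/multi-value mismatch key.
    Object value = DataTypeTransformer.standardize(record, "city", new Object[]{"SF"}, true);
    Set<String> svMvMismatch =
        (Set<String>) record.getValue(GenericRow.SINGLE_VALUE_MULTI_VALUE_FIELD_MISMATCH_KEY);
    System.out.println(value + " -> " + svMvMismatch); // SF -> [city]

    // A Map value: the column is flagged under the multi-value structure mismatch key.
    DataTypeTransformer.standardize(record, "tags", Collections.singletonMap("k", "news"), false);
    Set<String> structureMismatch =
        (Set<String>) record.getValue(GenericRow.MULTI_VALUE_STRUCTURE_MISMATCH_KEY);
    System.out.println(structureMismatch); // [tags]
  }
}
```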
SegmentCreator.java
@@ -23,6 +23,7 @@
import java.io.IOException;
import java.util.Map;
import org.apache.commons.configuration.ConfigurationException;
+import org.apache.pinot.spi.data.IngestionSchemaValidator;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
@@ -43,7 +44,8 @@ public interface SegmentCreator extends Closeable {
   * @throws Exception
   */
  void init(SegmentGeneratorConfig segmentCreationSpec, SegmentIndexCreationInfo segmentIndexCreationInfo,
-     Map<String, ColumnIndexCreationInfo> indexCreationInfoMap, Schema schema, File outDir)
+     Map<String, ColumnIndexCreationInfo> indexCreationInfoMap, Schema schema, File outDir,
+     IngestionSchemaValidator ingestionSchemaValidator)
      throws Exception;

/**
SegmentColumnarIndexCreator.java
@@ -57,6 +57,8 @@
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.FieldSpec.FieldType;
+import org.apache.pinot.spi.data.IngestionSchemaValidator;
+import org.apache.pinot.spi.data.RowBasedSchemaValidationResults;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.utils.TimeUtils;
@@ -79,6 +81,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
  // TODO Refactor class name to match interface name
  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentColumnarIndexCreator.class);
  private SegmentGeneratorConfig config;
+ private IngestionSchemaValidator _ingestionSchemaValidator;
  private Map<String, ColumnIndexCreationInfo> indexCreationInfoMap;
  private Map<String, SegmentDictionaryCreator> _dictionaryCreatorMap = new HashMap<>();
  private Map<String, ForwardIndexCreator> _forwardIndexCreatorMap = new HashMap<>();
@@ -96,11 +99,13 @@

  @Override
  public void init(SegmentGeneratorConfig segmentCreationSpec, SegmentIndexCreationInfo segmentIndexCreationInfo,
-     Map<String, ColumnIndexCreationInfo> indexCreationInfoMap, Schema schema, File outDir)
+     Map<String, ColumnIndexCreationInfo> indexCreationInfoMap, Schema schema, File outDir,
+     IngestionSchemaValidator ingestionSchemaValidator)
      throws Exception {
    docIdCounter = 0;
    config = segmentCreationSpec;
    this.indexCreationInfoMap = indexCreationInfoMap;
+   _ingestionSchemaValidator = ingestionSchemaValidator;

    // Check that the output directory does not exist
    Preconditions.checkState(!outDir.exists(), "Segment output directory: %s already exists", outDir);
@@ -304,6 +309,7 @@

  @Override
  public void indexRow(GenericRow row) {
+   validateRowBasedSchemas(row);
    for (Map.Entry<String, ForwardIndexCreator> entry : _forwardIndexCreatorMap.entrySet()) {
      String columnName = entry.getKey();
      ForwardIndexCreator forwardIndexCreator = entry.getValue();
@@ -400,6 +406,7 @@ public void seal()
      nullValueVectorCreator.seal();
    }
    writeMetadata();
+   gatherRowBasedSchemaValidationResults();
  }

private void writeMetadata()
@@ -558,6 +565,32 @@
}
}

+ private void validateRowBasedSchemas(GenericRow row) {
+   if (_ingestionSchemaValidator == null) {
+     return;
+   }
+   RowBasedSchemaValidationResults rowBasedSchemaValidationResults = _ingestionSchemaValidator.getRowBasedSchemaValidationResults();
+
+   if (row.getValue(GenericRow.MULTI_VALUE_STRUCTURE_MISMATCH_KEY) != null) {
+     Set<String> columns = (Set) row.getValue(GenericRow.MULTI_VALUE_STRUCTURE_MISMATCH_KEY);
+     rowBasedSchemaValidationResults.collectMultiValueStructureMismatchColumns(columns);
+   }
+   if (row.getValue(GenericRow.DATA_TYPE_MISMATCH_KEY) != null) {
+     Set<String> columns = (Set) row.getValue(GenericRow.DATA_TYPE_MISMATCH_KEY);
+     rowBasedSchemaValidationResults.collectDataTypeMismatchColumns(columns);
+   }
+   if (row.getValue(GenericRow.SINGLE_VALUE_MULTI_VALUE_FIELD_MISMATCH_KEY) != null) {
+     Set<String> columns = (Set) row.getValue(GenericRow.SINGLE_VALUE_MULTI_VALUE_FIELD_MISMATCH_KEY);
+     rowBasedSchemaValidationResults.collectSingleValueMultiValueFieldMismatchColumns(columns);
+   }
+ }
+
+ private void gatherRowBasedSchemaValidationResults() {
+   if (_ingestionSchemaValidator != null) {
+     _ingestionSchemaValidator.getRowBasedSchemaValidationResults().gatherRowBasedSchemaValidationResults();
+   }
+ }

/**
* Helper method to check whether the given value is a valid property value.
* <p>Value is invalid iff:
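Neither IngestionSchemaValidator nor RowBasedSchemaValidationResults is shown in this diff, so here is a minimal sketch of the accumulation pattern the calls above imply: per-row column sets merged during indexing, then summarized once at seal time. Only the collect*/gather* method names come from this PR; the class body, field names, and output are assumptions:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of a results accumulator; not the PR's actual implementation.
public class RowBasedSchemaValidationResultsSketch {
  private final Set<String> _dataTypeMismatchColumns = new HashSet<>();
  private final Set<String> _singleValueMultiValueFieldMismatchColumns = new HashSet<>();
  private final Set<String> _multiValueStructureMismatchColumns = new HashSet<>();

  public void collectDataTypeMismatchColumns(Set<String> columns) {
    _dataTypeMismatchColumns.addAll(columns);
  }

  public void collectSingleValueMultiValueFieldMismatchColumns(Set<String> columns) {
    _singleValueMultiValueFieldMismatchColumns.addAll(columns);
  }

  public void collectMultiValueStructureMismatchColumns(Set<String> columns) {
    _multiValueStructureMismatchColumns.addAll(columns);
  }

  // Called once from seal(); turns the accumulated sets into a final report.
  public void gatherRowBasedSchemaValidationResults() {
    if (!_dataTypeMismatchColumns.isEmpty()) {
      System.out.println("Data type mismatch columns: " + _dataTypeMismatchColumns);
    }
    if (!_singleValueMultiValueFieldMismatchColumns.isEmpty()) {
      System.out.println("SV/MV mismatch columns: " + _singleValueMultiValueFieldMismatchColumns);
    }
    if (!_multiValueStructureMismatchColumns.isEmpty()) {
      System.out.println("MV structure mismatch columns: " + _multiValueStructureMismatchColumns);
    }
  }
}
```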
SegmentIndexCreationDriverImpl.java
@@ -187,7 +187,8 @@ public void build()

    try {
      // Initialize the index creation using the per-column statistics information
-     indexCreator.init(config, segmentIndexCreationInfo, indexCreationInfoMap, dataSchema, tempIndexDir);
+     indexCreator.init(config, segmentIndexCreationInfo, indexCreationInfoMap, dataSchema, tempIndexDir,
+         _ingestionSchemaValidator);

      // Build the index
      recordReader.rewind();