Skip to content

Commit

Permalink
Add row based schema validation code to detect schema mismatch
Browse files Browse the repository at this point in the history
  • Loading branch information
Jack Li(Analytics Engineering) committed Sep 6, 2020
1 parent 8a31bf7 commit 57acbde
Show file tree
Hide file tree
Showing 16 changed files with 477 additions and 146 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -586,22 +586,32 @@ public boolean isSingleValue() {

public PinotDataType getSingleValueType() {
switch (this) {
case BYTE:
case BYTES:
case BYTE_ARRAY:
return BYTE;
case CHARACTER:
case CHARACTER_ARRAY:
return CHARACTER;
case SHORT:
case SHORT_ARRAY:
return SHORT;
case INTEGER:
case INTEGER_ARRAY:
return INTEGER;
case LONG:
case LONG_ARRAY:
return LONG;
case FLOAT:
case FLOAT_ARRAY:
return FLOAT;
case DOUBLE:
case DOUBLE_ARRAY:
return DOUBLE;
case STRING:
case STRING_ARRAY:
return STRING;
case OBJECT:
case OBJECT_ARRAY:
return OBJECT;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.pinot.common.utils.PinotDataType;
import org.apache.pinot.spi.data.FieldSpec;
Expand Down Expand Up @@ -85,7 +87,7 @@ public GenericRow transform(GenericRow record) {
continue;
}
PinotDataType dest = entry.getValue();
value = standardize(column, value, dest.isSingleValue());
value = standardize(record, column, value, dest.isSingleValue());
// NOTE: The standardized value could be null for empty Collection/Map/Object[].
if (value == null) {
record.putValue(column, null);
Expand All @@ -109,6 +111,9 @@ public GenericRow transform(GenericRow record) {
}
}
if (source != dest) {
if (source.getSingleValueType() != dest.getSingleValueType()) {
putValueAsSetToKey(record, GenericRow.DATA_TYPE_MISMATCH_KEY, column);
}
value = dest.convert(value, source);
}

Expand All @@ -127,28 +132,39 @@ public GenericRow transform(GenericRow record) {
*/
@VisibleForTesting
@Nullable
static Object standardize(String column, @Nullable Object value, boolean isSingleValue) {
static Object standardize(GenericRow record, String column, @Nullable Object value, boolean isSingleValue) {
return standardize(record, column, value, isSingleValue, 1);
}

static Object standardize(GenericRow record, String column, @Nullable Object value, boolean isSingleValue, int level) {
if (value == null) {
return null;
}
// If it's a single-value column and the value is a Collection/Map/Object[], mark the key.
if (value instanceof Collection) {
return standardizeCollection(column, (Collection) value, isSingleValue);
return standardizeCollection(record, column, (Collection) value, isSingleValue, level);
}
if (value instanceof Map) {
return standardizeCollection(column, ((Map) value).values(), isSingleValue);
// If it's a map structure, mark the key.
putValueAsSetToKey(record, GenericRow.MULTI_VALUE_STRUCTURE_MISMATCH_KEY, column);
Collection values = ((Map) value).values();
return standardizeCollection(record, column, values, isSingleValue, level);
}
if (value instanceof Object[]) {
if (isSingleValue && level == 1) {
putValueAsSetToKey(record, GenericRow.SINGLE_VALUE_MULTI_VALUE_FIELD_MISMATCH_KEY, column);
}
Object[] values = (Object[]) value;
int numValues = values.length;
if (numValues == 0) {
return null;
}
if (numValues == 1) {
return standardize(column, values[0], isSingleValue);
return standardize(record, column, values[0], isSingleValue, level + 1);
}
List<Object> standardizedValues = new ArrayList<>(numValues);
for (Object singleValue : values) {
Object standardizedValue = standardize(column, singleValue, true);
Object standardizedValue = standardize(record, column, singleValue, true, level + 1);
if (standardizedValue != null) {
standardizedValues.add(standardizedValue);
}
Expand All @@ -164,20 +180,27 @@ static Object standardize(String column, @Nullable Object value, boolean isSingl
Arrays.toString(values), column);
return standardizedValues.toArray();
}
// If it's a multi-value column but the top-level (level 1) value is a plain single value, mark the key.
if (!isSingleValue && level == 1) {
putValueAsSetToKey(record, GenericRow.SINGLE_VALUE_MULTI_VALUE_FIELD_MISMATCH_KEY, column);
}
return value;
}

private static Object standardizeCollection(String column, Collection collection, boolean isSingleValue) {
private static Object standardizeCollection(GenericRow record, String column, Collection collection, boolean isSingleValue, int level) {
if (isSingleValue && level == 1) {
putValueAsSetToKey(record, GenericRow.SINGLE_VALUE_MULTI_VALUE_FIELD_MISMATCH_KEY, column);
}
int numValues = collection.size();
if (numValues == 0) {
return null;
}
if (numValues == 1) {
return standardize(column, collection.iterator().next(), isSingleValue);
return standardize(record, column, collection.iterator().next(), isSingleValue, level + 1);
}
List<Object> standardizedValues = new ArrayList<>(numValues);
for (Object singleValue : collection) {
Object standardizedValue = standardize(column, singleValue, true);
Object standardizedValue = standardize(record, column, singleValue, true, level + 1);
if (standardizedValue != null) {
standardizedValues.add(standardizedValue);
}
Expand All @@ -193,4 +216,13 @@ private static Object standardizeCollection(String column, Collection collection
.checkState(!isSingleValue, "Cannot read single-value from Collection: %s for column: %s", collection, column);
return standardizedValues.toArray();
}

/**
 * Adds {@code value} to the set stored in {@code record} under {@code key}.
 *
 * <p>If nothing is stored under {@code key} yet, a new empty {@link HashSet} is created and
 * registered on the record before {@code value} is added to it.
 *
 * @param record row that carries the marker set
 * @param key key under which the marker set is stored on the row
 * @param value entry to add to the marker set (callers in this class pass a column name)
 */
private static void putValueAsSetToKey(GenericRow record, String key, String value) {
  Object existing = record.getValue(key);
  // Anything already stored under this key is expected to be a Set<String> created by this method.
  @SuppressWarnings("unchecked")
  Set<String> markedValues = (Set<String>) existing;
  if (markedValues == null) {
    markedValues = new HashSet<>();
    record.putValue(key, markedValues);
  }
  markedValues.add(value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.io.IOException;
import java.util.Map;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.pinot.spi.data.IngestionSchemaValidator;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
Expand All @@ -43,7 +44,8 @@ public interface SegmentCreator extends Closeable {
* @throws Exception
*/
void init(SegmentGeneratorConfig segmentCreationSpec, SegmentIndexCreationInfo segmentIndexCreationInfo,
Map<String, ColumnIndexCreationInfo> indexCreationInfoMap, Schema schema, File outDir)
Map<String, ColumnIndexCreationInfo> indexCreationInfoMap, Schema schema, File outDir,
IngestionSchemaValidator ingestionSchemaValidator)
throws Exception;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.FieldSpec.FieldType;
import org.apache.pinot.spi.data.IngestionSchemaValidator;
import org.apache.pinot.spi.data.RowBasedSchemaValidationResults;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.utils.TimeUtils;
Expand All @@ -79,6 +81,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
// TODO Refactor class name to match interface name
private static final Logger LOGGER = LoggerFactory.getLogger(SegmentColumnarIndexCreator.class);
private SegmentGeneratorConfig config;
private IngestionSchemaValidator _ingestionSchemaValidator;
private Map<String, ColumnIndexCreationInfo> indexCreationInfoMap;
private Map<String, SegmentDictionaryCreator> _dictionaryCreatorMap = new HashMap<>();
private Map<String, ForwardIndexCreator> _forwardIndexCreatorMap = new HashMap<>();
Expand All @@ -96,11 +99,13 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {

@Override
public void init(SegmentGeneratorConfig segmentCreationSpec, SegmentIndexCreationInfo segmentIndexCreationInfo,
Map<String, ColumnIndexCreationInfo> indexCreationInfoMap, Schema schema, File outDir)
Map<String, ColumnIndexCreationInfo> indexCreationInfoMap, Schema schema, File outDir,
IngestionSchemaValidator ingestionSchemaValidator)
throws Exception {
docIdCounter = 0;
config = segmentCreationSpec;
this.indexCreationInfoMap = indexCreationInfoMap;
_ingestionSchemaValidator = ingestionSchemaValidator;

// Check that the output directory does not exist
Preconditions.checkState(!outDir.exists(), "Segment output directory: %s already exists", outDir);
Expand Down Expand Up @@ -304,6 +309,7 @@ private boolean createDictionaryForColumn(ColumnIndexCreationInfo info, SegmentG

@Override
public void indexRow(GenericRow row) {
validateRowBasedSchemas(row);
for (Map.Entry<String, ForwardIndexCreator> entry : _forwardIndexCreatorMap.entrySet()) {
String columnName = entry.getKey();
ForwardIndexCreator forwardIndexCreator = entry.getValue();
Expand Down Expand Up @@ -400,6 +406,7 @@ public void seal()
nullValueVectorCreator.seal();
}
writeMetadata();
gatherRowBasedSchemaValidationResults();
}

private void writeMetadata()
Expand Down Expand Up @@ -558,6 +565,32 @@ public static void addColumnMinMaxValueInfo(PropertiesConfiguration properties,
}
}

/**
 * Forwards the schema-mismatch markers carried by {@code row} to the row-based validation results
 * of the configured ingestion schema validator.
 *
 * <p>Does nothing when no validator is configured. For each of the three marker keys
 * ({@code MULTI_VALUE_STRUCTURE_MISMATCH_KEY}, {@code DATA_TYPE_MISMATCH_KEY} and
 * {@code SINGLE_VALUE_MULTI_VALUE_FIELD_MISMATCH_KEY}) the row is looked up once; a non-null value
 * is handed to the matching {@code collect...} method.
 *
 * @param row the row about to be indexed
 */
@SuppressWarnings("unchecked")
private void validateRowBasedSchemas(GenericRow row) {
  if (_ingestionSchemaValidator == null) {
    return;
  }
  RowBasedSchemaValidationResults rowBasedSchemaValidationResults =
      _ingestionSchemaValidator.getRowBasedSchemaValidationResults();

  // NOTE(review): the marker values are assumed to be Set<String> of column names, as written by the
  // data type transformer; confirm no other producer stores a different type under these keys.
  Object multiValueStructureMismatch = row.getValue(GenericRow.MULTI_VALUE_STRUCTURE_MISMATCH_KEY);
  if (multiValueStructureMismatch != null) {
    Set<String> columns = (Set<String>) multiValueStructureMismatch;
    rowBasedSchemaValidationResults.collectMultiValueStructureMismatchColumns(columns);
  }

  Object dataTypeMismatch = row.getValue(GenericRow.DATA_TYPE_MISMATCH_KEY);
  if (dataTypeMismatch != null) {
    Set<String> columns = (Set<String>) dataTypeMismatch;
    rowBasedSchemaValidationResults.collectDataTypeMismatchColumns(columns);
  }

  Object singleValueMultiValueMismatch = row.getValue(GenericRow.SINGLE_VALUE_MULTI_VALUE_FIELD_MISMATCH_KEY);
  if (singleValueMultiValueMismatch != null) {
    Set<String> columns = (Set<String>) singleValueMultiValueMismatch;
    rowBasedSchemaValidationResults.collectSingleValueMultiValueFieldMismatchColumns(columns);
  }
}

/**
 * Asks the row-based validation results of the configured ingestion schema validator to gather
 * their results. Does nothing when no validator is configured.
 */
private void gatherRowBasedSchemaValidationResults() {
  if (_ingestionSchemaValidator == null) {
    return;
  }
  _ingestionSchemaValidator.getRowBasedSchemaValidationResults().gatherRowBasedSchemaValidationResults();
}

/**
* Helper method to check whether the given value is a valid property value.
* <p>Value is invalid iff:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,8 @@ public void build()

try {
// Initialize the index creation using the per-column statistics information
indexCreator.init(config, segmentIndexCreationInfo, indexCreationInfoMap, dataSchema, tempIndexDir);
indexCreator.init(config, segmentIndexCreationInfo, indexCreationInfoMap, dataSchema, tempIndexDir,
_ingestionSchemaValidator);

// Build the index
recordReader.rewind();
Expand Down
Loading

0 comments on commit 57acbde

Please sign in to comment.