diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java index 97c017eca98..3636d4c6efb 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java @@ -586,22 +586,32 @@ public boolean isSingleValue() { public PinotDataType getSingleValueType() { switch (this) { + case BYTE: + case BYTES: case BYTE_ARRAY: return BYTE; + case CHARACTER: case CHARACTER_ARRAY: return CHARACTER; + case SHORT: case SHORT_ARRAY: return SHORT; + case INTEGER: case INTEGER_ARRAY: return INTEGER; + case LONG: case LONG_ARRAY: return LONG; + case FLOAT: case FLOAT_ARRAY: return FLOAT; + case DOUBLE: case DOUBLE_ARRAY: return DOUBLE; + case STRING: case STRING_ARRAY: return STRING; + case OBJECT: case OBJECT_ARRAY: return OBJECT; default: diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/DataTypeTransformer.java b/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/DataTypeTransformer.java index 4ab665cf170..a8bee3e6776 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/DataTypeTransformer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/DataTypeTransformer.java @@ -24,8 +24,10 @@ import java.util.Arrays; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import javax.annotation.Nullable; import org.apache.pinot.common.utils.PinotDataType; import org.apache.pinot.spi.data.FieldSpec; @@ -85,7 +87,7 @@ public GenericRow transform(GenericRow record) { continue; } PinotDataType dest = entry.getValue(); - value = standardize(column, value, dest.isSingleValue()); + value = standardize(record, column, value, dest.isSingleValue()); // NOTE: The standardized value could be 
null for empty Collection/Map/Object[]. if (value == null) { record.putValue(column, null); @@ -109,6 +111,9 @@ public GenericRow transform(GenericRow record) { } } if (source != dest) { + if (source.getSingleValueType() != dest.getSingleValueType()) { + putValueAsSetToKey(record, GenericRow.DATA_TYPE_MISMATCH_KEY, column); + } value = dest.convert(value, source); } @@ -127,28 +132,39 @@ public GenericRow transform(GenericRow record) { */ @VisibleForTesting @Nullable - static Object standardize(String column, @Nullable Object value, boolean isSingleValue) { + static Object standardize(GenericRow record, String column, @Nullable Object value, boolean isSingleValue) { + return standardize(record, column, value, isSingleValue, 1); + } + + static Object standardize(GenericRow record, String column, @Nullable Object value, boolean isSingleValue, int level) { if (value == null) { return null; } + // If it's single-value column and the value is Collection/Map/Object[], mark the key. if (value instanceof Collection) { - return standardizeCollection(column, (Collection) value, isSingleValue); + return standardizeCollection(record, column, (Collection) value, isSingleValue, level); } if (value instanceof Map) { - return standardizeCollection(column, ((Map) value).values(), isSingleValue); + // If it's a map structure, mark the key. 
+ putValueAsSetToKey(record, GenericRow.MULTI_VALUE_STRUCTURE_MISMATCH_KEY, column); + Collection values = ((Map) value).values(); + return standardizeCollection(record, column, values, isSingleValue, level); } if (value instanceof Object[]) { + if (isSingleValue && level == 1) { + putValueAsSetToKey(record, GenericRow.SINGLE_VALUE_MULTI_VALUE_FIELD_MISMATCH_KEY, column); + } Object[] values = (Object[]) value; int numValues = values.length; if (numValues == 0) { return null; } if (numValues == 1) { - return standardize(column, values[0], isSingleValue); + return standardize(record, column, values[0], isSingleValue, level + 1); } List standardizedValues = new ArrayList<>(numValues); for (Object singleValue : values) { - Object standardizedValue = standardize(column, singleValue, true); + Object standardizedValue = standardize(record, column, singleValue, true, level + 1); if (standardizedValue != null) { standardizedValues.add(standardizedValue); } @@ -164,20 +180,27 @@ static Object standardize(String column, @Nullable Object value, boolean isSingl Arrays.toString(values), column); return standardizedValues.toArray(); } + // If it's multi-value column and the level is 1, mark the key. 
+ if (!isSingleValue && level == 1) { + putValueAsSetToKey(record, GenericRow.SINGLE_VALUE_MULTI_VALUE_FIELD_MISMATCH_KEY, column); + } return value; } - private static Object standardizeCollection(String column, Collection collection, boolean isSingleValue) { + private static Object standardizeCollection(GenericRow record, String column, Collection collection, boolean isSingleValue, int level) { + if (isSingleValue && level == 1) { + putValueAsSetToKey(record, GenericRow.SINGLE_VALUE_MULTI_VALUE_FIELD_MISMATCH_KEY, column); + } int numValues = collection.size(); if (numValues == 0) { return null; } if (numValues == 1) { - return standardize(column, collection.iterator().next(), isSingleValue); + return standardize(record, column, collection.iterator().next(), isSingleValue, level + 1); } List standardizedValues = new ArrayList<>(numValues); for (Object singleValue : collection) { - Object standardizedValue = standardize(column, singleValue, true); + Object standardizedValue = standardize(record, column, singleValue, true, level + 1); if (standardizedValue != null) { standardizedValues.add(standardizedValue); } @@ -193,4 +216,13 @@ private static Object standardizeCollection(String column, Collection collection .checkState(!isSingleValue, "Cannot read single-value from Collection: %s for column: %s", collection, column); return standardizedValues.toArray(); } + + private static void putValueAsSetToKey(GenericRow record, String key, String value) { + Set valueSet = (Set) record.getValue(key); + if (valueSet == null) { + valueSet = new HashSet<>(); + record.putValue(key, valueSet); + } + valueSet.add(value); + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentCreator.java index 2ba6246b5b2..adf6141c387 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentCreator.java +++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentCreator.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.util.Map; import org.apache.commons.configuration.ConfigurationException; +import org.apache.pinot.spi.data.IngestionSchemaValidator; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig; @@ -43,7 +44,8 @@ public interface SegmentCreator extends Closeable { * @throws Exception */ void init(SegmentGeneratorConfig segmentCreationSpec, SegmentIndexCreationInfo segmentIndexCreationInfo, - Map indexCreationInfoMap, Schema schema, File outDir) + Map indexCreationInfoMap, Schema schema, File outDir, + IngestionSchemaValidator ingestionSchemaValidator) throws Exception; /** diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java index 4489dc82ea6..2ab5427d6ad 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java @@ -57,6 +57,8 @@ import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.FieldSpec.DataType; import org.apache.pinot.spi.data.FieldSpec.FieldType; +import org.apache.pinot.spi.data.IngestionSchemaValidator; +import org.apache.pinot.spi.data.RowBasedSchemaValidationResults; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.utils.TimeUtils; @@ -79,6 +81,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator { // TODO Refactor class name to match interface name private static final Logger LOGGER = LoggerFactory.getLogger(SegmentColumnarIndexCreator.class); private 
SegmentGeneratorConfig config; + private IngestionSchemaValidator _ingestionSchemaValidator; private Map indexCreationInfoMap; private Map _dictionaryCreatorMap = new HashMap<>(); private Map _forwardIndexCreatorMap = new HashMap<>(); @@ -96,11 +99,13 @@ public class SegmentColumnarIndexCreator implements SegmentCreator { @Override public void init(SegmentGeneratorConfig segmentCreationSpec, SegmentIndexCreationInfo segmentIndexCreationInfo, - Map indexCreationInfoMap, Schema schema, File outDir) + Map indexCreationInfoMap, Schema schema, File outDir, + IngestionSchemaValidator ingestionSchemaValidator) throws Exception { docIdCounter = 0; config = segmentCreationSpec; this.indexCreationInfoMap = indexCreationInfoMap; + _ingestionSchemaValidator = ingestionSchemaValidator; // Check that the output directory does not exist Preconditions.checkState(!outDir.exists(), "Segment output directory: %s already exists", outDir); @@ -304,6 +309,7 @@ private boolean createDictionaryForColumn(ColumnIndexCreationInfo info, SegmentG @Override public void indexRow(GenericRow row) { + validateRowBasedSchemas(row); for (Map.Entry entry : _forwardIndexCreatorMap.entrySet()) { String columnName = entry.getKey(); ForwardIndexCreator forwardIndexCreator = entry.getValue(); @@ -400,6 +406,7 @@ public void seal() nullValueVectorCreator.seal(); } writeMetadata(); + gatherRowBasedSchemaValidationResults(); } private void writeMetadata() @@ -558,6 +565,32 @@ public static void addColumnMinMaxValueInfo(PropertiesConfiguration properties, } } + private void validateRowBasedSchemas(GenericRow row) { + if (_ingestionSchemaValidator == null) { + return; + } + RowBasedSchemaValidationResults rowBasedSchemaValidationResults = _ingestionSchemaValidator.getRowBasedSchemaValidationResults(); + + if (row.getValue(GenericRow.MULTI_VALUE_STRUCTURE_MISMATCH_KEY) != null) { + Set columns = (Set) row.getValue(GenericRow.MULTI_VALUE_STRUCTURE_MISMATCH_KEY); + 
rowBasedSchemaValidationResults.collectMultiValueStructureMismatchColumns(columns); + } + if (row.getValue(GenericRow.DATA_TYPE_MISMATCH_KEY) != null) { + Set columns = (Set) row.getValue(GenericRow.DATA_TYPE_MISMATCH_KEY); + rowBasedSchemaValidationResults.collectDataTypeMismatchColumns(columns); + } + if (row.getValue(GenericRow.SINGLE_VALUE_MULTI_VALUE_FIELD_MISMATCH_KEY) != null) { + Set columns = (Set) row.getValue(GenericRow.SINGLE_VALUE_MULTI_VALUE_FIELD_MISMATCH_KEY); + rowBasedSchemaValidationResults.collectSingleValueMultiValueFieldMismatchColumns(columns); + } + } + + private void gatherRowBasedSchemaValidationResults() { + if (_ingestionSchemaValidator != null) { + _ingestionSchemaValidator.getRowBasedSchemaValidationResults().gatherRowBasedSchemaValidationResults(); + } + } + /** * Helper method to check whether the given value is a valid property value. *

Value is invalid iff: diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java index cabdc47a8ae..e5a4b844043 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java @@ -187,7 +187,8 @@ public void build() try { // Initialize the index creation using the per-column statistics information - indexCreator.init(config, segmentIndexCreationInfo, indexCreationInfoMap, dataSchema, tempIndexDir); + indexCreator.init(config, segmentIndexCreationInfo, indexCreationInfoMap, dataSchema, tempIndexDir, + _ingestionSchemaValidator); // Build the index recordReader.rewind(); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/recordtransformer/DataTypeTransformerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/recordtransformer/DataTypeTransformerTest.java index a60c460ea0b..be4ad8dc9b8 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/recordtransformer/DataTypeTransformerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/recordtransformer/DataTypeTransformerTest.java @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.pinot.spi.data.readers.GenericRow; import org.testng.annotations.Test; import static org.testng.Assert.assertEquals; @@ -39,17 +40,18 @@ public void testStandardize() { /** * Tests for Map */ + GenericRow record = new GenericRow(); // Empty Map Map map = Collections.emptyMap(); - assertNull(DataTypeTransformer.standardize(COLUMN, map, true)); - assertNull(DataTypeTransformer.standardize(COLUMN, map, false)); + assertNull(DataTypeTransformer.standardize(record, COLUMN, map, true)); + 
assertNull(DataTypeTransformer.standardize(record, COLUMN, map, false)); // Map with single entry String expectedValue = "testValue"; map = Collections.singletonMap("testKey", expectedValue); - assertEquals(DataTypeTransformer.standardize(COLUMN, map, true), expectedValue); - assertEquals(DataTypeTransformer.standardize(COLUMN, map, false), expectedValue); + assertEquals(DataTypeTransformer.standardize(record, COLUMN, map, true), expectedValue); + assertEquals(DataTypeTransformer.standardize(record, COLUMN, map, false), expectedValue); // Map with multiple entries Object[] expectedValues = new Object[]{"testValue1", "testValue2"}; @@ -58,12 +60,12 @@ public void testStandardize() { map.put("testKey2", "testValue2"); try { // Should fail because Map with multiple entries cannot be standardized as single value - DataTypeTransformer.standardize(COLUMN, map, true); + DataTypeTransformer.standardize(record, COLUMN, map, true); fail(); } catch (Exception e) { // Expected } - assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(COLUMN, map, false), expectedValues); + assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(record, COLUMN, map, false), expectedValues); /** * Tests for List @@ -71,24 +73,24 @@ public void testStandardize() { // Empty List List list = Collections.emptyList(); - assertNull(DataTypeTransformer.standardize(COLUMN, list, true)); - assertNull(DataTypeTransformer.standardize(COLUMN, list, false)); + assertNull(DataTypeTransformer.standardize(record, COLUMN, list, true)); + assertNull(DataTypeTransformer.standardize(record, COLUMN, list, false)); // List with single entry list = Collections.singletonList(expectedValue); - assertEquals(DataTypeTransformer.standardize(COLUMN, list, true), expectedValue); - assertEquals(DataTypeTransformer.standardize(COLUMN, list, false), expectedValue); + assertEquals(DataTypeTransformer.standardize(record, COLUMN, list, true), expectedValue); + assertEquals(DataTypeTransformer.standardize(record, 
COLUMN, list, false), expectedValue); // List with multiple entries list = Arrays.asList(expectedValues); try { // Should fail because List with multiple entries cannot be standardized as single value - DataTypeTransformer.standardize(COLUMN, list, true); + DataTypeTransformer.standardize(record, COLUMN, list, true); fail(); } catch (Exception e) { // Expected } - assertEquals((Object[]) DataTypeTransformer.standardize(COLUMN, list, false), expectedValues); + assertEquals((Object[]) DataTypeTransformer.standardize(record, COLUMN, list, false), expectedValues); /** * Tests for Object[] @@ -96,24 +98,24 @@ public void testStandardize() { // Empty Object[] Object[] values = new Object[0]; - assertNull(DataTypeTransformer.standardize(COLUMN, values, true)); - assertNull(DataTypeTransformer.standardize(COLUMN, values, false)); + assertNull(DataTypeTransformer.standardize(record, COLUMN, values, true)); + assertNull(DataTypeTransformer.standardize(record, COLUMN, values, false)); // Object[] with single entry values = new Object[]{expectedValue}; - assertEquals(DataTypeTransformer.standardize(COLUMN, values, true), expectedValue); - assertEquals(DataTypeTransformer.standardize(COLUMN, values, false), expectedValue); + assertEquals(DataTypeTransformer.standardize(record, COLUMN, values, true), expectedValue); + assertEquals(DataTypeTransformer.standardize(record, COLUMN, values, false), expectedValue); // Object[] with multiple entries values = new Object[]{"testValue1", "testValue2"}; try { // Should fail because Object[] with multiple entries cannot be standardized as single value - DataTypeTransformer.standardize(COLUMN, values, true); + DataTypeTransformer.standardize(record, COLUMN, values, true); fail(); } catch (Exception e) { // Expected } - assertEquals((Object[]) DataTypeTransformer.standardize(COLUMN, values, false), expectedValues); + assertEquals((Object[]) DataTypeTransformer.standardize(record, COLUMN, values, false), expectedValues); /** * Tests for nested 
Map/List/Object[] @@ -121,32 +123,32 @@ public void testStandardize() { // Map with empty List map = Collections.singletonMap("testKey", Collections.emptyList()); - assertNull(DataTypeTransformer.standardize(COLUMN, map, true)); - assertNull(DataTypeTransformer.standardize(COLUMN, map, false)); + assertNull(DataTypeTransformer.standardize(record, COLUMN, map, true)); + assertNull(DataTypeTransformer.standardize(record, COLUMN, map, false)); // Map with single-entry List map = Collections.singletonMap("testKey", Collections.singletonList(expectedValue)); - assertEquals(DataTypeTransformer.standardize(COLUMN, map, true), expectedValue); - assertEquals(DataTypeTransformer.standardize(COLUMN, map, false), expectedValue); + assertEquals(DataTypeTransformer.standardize(record, COLUMN, map, true), expectedValue); + assertEquals(DataTypeTransformer.standardize(record, COLUMN, map, false), expectedValue); // Map with one empty Map and one single-entry Map map = new HashMap<>(); map.put("testKey1", Collections.emptyMap()); map.put("testKey2", Collections.singletonMap("testKey", expectedValue)); // Can be standardized into single value because empty Map should be ignored - assertEquals(DataTypeTransformer.standardize(COLUMN, map, true), expectedValue); - assertEquals(DataTypeTransformer.standardize(COLUMN, map, false), expectedValue); + assertEquals(DataTypeTransformer.standardize(record, COLUMN, map, true), expectedValue); + assertEquals(DataTypeTransformer.standardize(record, COLUMN, map, false), expectedValue); // Map with multi-entries List map = Collections.singletonMap("testKey", Arrays.asList(expectedValues)); try { // Should fail because Map with multiple entries cannot be standardized as single value - DataTypeTransformer.standardize(COLUMN, map, true); + DataTypeTransformer.standardize(record, COLUMN, map, true); fail(); } catch (Exception e) { // Expected } - assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(COLUMN, map, false), expectedValues); + 
assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(record, COLUMN, map, false), expectedValues); // Map with one empty Map, one single-entry List and one single-entry Object[] map = new HashMap<>(); @@ -155,12 +157,12 @@ public void testStandardize() { map.put("testKey3", new Object[]{"testValue2"}); try { // Should fail because Map with multiple entries cannot be standardized as single value - DataTypeTransformer.standardize(COLUMN, map, true); + DataTypeTransformer.standardize(record, COLUMN, map, true); fail(); } catch (Exception e) { // Expected } - assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(COLUMN, map, false), expectedValues); + assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(record, COLUMN, map, false), expectedValues); // List with two single-entry Maps and one empty Map list = Arrays @@ -168,35 +170,35 @@ public void testStandardize() { Collections.emptyMap()); try { // Should fail because List with multiple entries cannot be standardized as single value - DataTypeTransformer.standardize(COLUMN, list, true); + DataTypeTransformer.standardize(record, COLUMN, list, true); fail(); } catch (Exception e) { // Expected } - assertEquals((Object[]) DataTypeTransformer.standardize(COLUMN, list, false), expectedValues); + assertEquals((Object[]) DataTypeTransformer.standardize(record, COLUMN, list, false), expectedValues); // Object[] with two single-entry Maps values = new Object[]{Collections.singletonMap("testKey", "testValue1"), Collections.singletonMap("testKey", "testValue2")}; try { // Should fail because Object[] with multiple entries cannot be standardized as single value - DataTypeTransformer.standardize(COLUMN, values, true); + DataTypeTransformer.standardize(record, COLUMN, values, true); fail(); } catch (Exception e) { // Expected } - assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(COLUMN, values, false), expectedValues); + assertEqualsNoOrder((Object[]) 
DataTypeTransformer.standardize(record, COLUMN, values, false), expectedValues); // Object[] with one empty Object[], one multi-entries List of nested Map/List/Object[] values = new Object[]{new Object[0], Collections.singletonList( Collections.singletonMap("testKey", "testValue1")), Collections.singletonMap("testKey", Arrays.asList(new Object[]{"testValue2"}, Collections.emptyMap()))}; try { - DataTypeTransformer.standardize(COLUMN, values, true); + DataTypeTransformer.standardize(record, COLUMN, values, true); fail(); } catch (Exception e) { // Expected } - assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(COLUMN, values, false), expectedValues); + assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(record, COLUMN, values, false), expectedValues); } } diff --git a/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java b/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java index c28a2d87861..994f684f82b 100644 --- a/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java @@ -93,10 +93,17 @@ public void setUp() driver.init(config); driver.build(); IngestionSchemaValidator ingestionSchemaValidator = driver.getIngestionSchemaValidator(); - Assert.assertFalse(ingestionSchemaValidator.getDataTypeMismatchResult().isMismatchDetected()); - Assert.assertFalse(ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().isMismatchDetected()); - Assert.assertFalse(ingestionSchemaValidator.getMultiValueStructureMismatchResult().isMismatchDetected()); - Assert.assertFalse(ingestionSchemaValidator.getMissingPinotColumnResult().isMismatchDetected()); + + Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getDataTypeMismatchResult() + .isMismatchDetected()); + Assert.assertFalse( + 
ingestionSchemaValidator.getFileBasedSchemaValidationResults().getSingleValueMultiValueFieldMismatchResult() + .isMismatchDetected()); + Assert.assertFalse( + ingestionSchemaValidator.getFileBasedSchemaValidationResults().getMultiValueStructureMismatchResult() + .isMismatchDetected()); + Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getMissingPinotColumnResult() + .isMismatchDetected()); _indexSegments.add(ImmutableSegmentLoader.load(new File(INDEX_DIR, driver.getSegmentName()), ReadMode.mmap)); _segmentNames.add(driver.getSegmentName()); } diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java index fcc5653e620..46c76375c62 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java @@ -52,8 +52,10 @@ import org.apache.pinot.spi.config.table.TableCustomConfig; import org.apache.pinot.spi.data.DateTimeFieldSpec; import org.apache.pinot.spi.data.DateTimeFormatSpec; +import org.apache.pinot.spi.data.RowBasedSchemaValidationResults; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.IngestionSchemaValidator; +import org.apache.pinot.spi.data.SchemaValidationResults; import org.apache.pinot.spi.data.readers.FileFormat; import org.apache.pinot.spi.data.readers.RecordReaderConfig; import org.apache.pinot.spi.utils.DataSizeUtils; @@ -91,10 +93,13 @@ public class SegmentCreationMapper extends Mapper= org.apache.avro.Schema.Type.STRING.ordinal()) { // the column is a complex structure - 
_singleValueMultiValueFieldMismatch.addMismatchReason(String.format( - "The Pinot column: %s is 'multi-value' column but the column: %s from input %s schema is 'single-value' column.", - columnName, avroColumnSchema.getName(), getInputSchemaType())); + _fileBasedSchemaValidationResults.getSingleValueMultiValueFieldMismatchResult().addMismatchReason(String + .format( + "The Pinot column: %s is 'multi-value' column but the column: %s from input %s schema is 'single-value' column.", + columnName, avroColumnName, getInputSchemaType())); } + // check multi-value column structure mismatch if (avroColumnType != org.apache.avro.Schema.Type.ARRAY) { // multi-value column should use array structure for now. - _multiValueStructureMismatch.addMismatchReason(String.format( + _fileBasedSchemaValidationResults.getMultiValueStructureMismatchResult().addMismatchReason(String.format( "The Pinot column: %s is 'multi-value' column but the column: %s from input %s schema is of '%s' type, which should have been of 'array' type.", - columnName, avroColumnSchema.getName(), getInputSchemaType(), avroColumnType.getName())); + columnName, avroColumnName, getInputSchemaType(), avroColumnType.getName())); + } else if (avroColumnSchema.getElementType().getType().ordinal() < org.apache.avro.Schema.Type.STRING + .ordinal()) { + // even though the column schema is of array type, the element type of that array could be of complex type like array, map, etc. 
+ _fileBasedSchemaValidationResults.getMultiValueStructureMismatchResult().addMismatchReason(String.format( + "The Pinot column: %s is 'multi-value' column and it's of 'array' type in input %s schema, but the element type is of '%s' type, which should have been of 'primitive' type.", + columnName, getInputSchemaType(), avroColumnSchema.getElementType().getType())); } } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/IngestionSchemaValidator.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/IngestionSchemaValidator.java index 045327ac9e3..67a3da9d331 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/IngestionSchemaValidator.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/IngestionSchemaValidator.java @@ -28,11 +28,7 @@ public interface IngestionSchemaValidator { String getInputSchemaType(); - SchemaValidatorResult getDataTypeMismatchResult(); + SchemaValidationResults getFileBasedSchemaValidationResults(); - SchemaValidatorResult getSingleValueMultiValueFieldMismatchResult(); - - SchemaValidatorResult getMultiValueStructureMismatchResult(); - - SchemaValidatorResult getMissingPinotColumnResult(); + RowBasedSchemaValidationResults getRowBasedSchemaValidationResults(); } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/RowBasedSchemaValidationResults.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/RowBasedSchemaValidationResults.java new file mode 100644 index 00000000000..655275a1da9 --- /dev/null +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/RowBasedSchemaValidationResults.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spi.data; + +import java.util.HashSet; +import java.util.Set; + + +/** + * This is the extension class on top of {@code SchemaValidationResults} class, since row based schema validation will + * be called much more frequently than the file base schema validation. We collect all the mismatch columns into the hash + * set and then generate the mismatch message all at once. + */ +public class RowBasedSchemaValidationResults extends SchemaValidationResults { + + private Set _dataTypeMismatchColumns = new HashSet<>(); + private Set _singleValueMultiValueFieldMismatchColumns = new HashSet<>(); + private Set _multiValueStructureMismatchColumns = new HashSet<>(); + + public void collectDataTypeMismatchColumns(Set columns) { + _dataTypeMismatchColumns.addAll(columns); + } + + public void collectSingleValueMultiValueFieldMismatchColumns(Set columns) { + _singleValueMultiValueFieldMismatchColumns.addAll(columns); + } + + public void collectMultiValueStructureMismatchColumns(Set columns) { + _multiValueStructureMismatchColumns.addAll(columns); + } + + public void gatherRowBasedSchemaValidationResults() { + if (!_dataTypeMismatchColumns.isEmpty()) { + _dataTypeMismatch.addMismatchReason(String.format("Found data type mismatch from the following Pinot columns: %s", + _dataTypeMismatchColumns.toString())); + } + if (!_singleValueMultiValueFieldMismatchColumns.isEmpty()) { + _singleValueMultiValueFieldMismatch.addMismatchReason(String + .format("Found single-value multi-value field mismatch from the following Pinot columns: %s", + 
_singleValueMultiValueFieldMismatchColumns.toString())); + } + if (!_multiValueStructureMismatchColumns.isEmpty()) { + _multiValueStructureMismatch.addMismatchReason(String + .format("Found multi-value structure mismatch from the following Pinot columns: %s", + _multiValueStructureMismatchColumns.toString())); + } + } +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/SchemaValidationResults.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/SchemaValidationResults.java new file mode 100644 index 00000000000..2d793c0fca1 --- /dev/null +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/SchemaValidationResults.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.spi.data; + +public class SchemaValidationResults { + SchemaValidatorResult _dataTypeMismatch = new SchemaValidatorResult(); + SchemaValidatorResult _singleValueMultiValueFieldMismatch = new SchemaValidatorResult(); + SchemaValidatorResult _multiValueStructureMismatch = new SchemaValidatorResult(); + SchemaValidatorResult _missingPinotColumnResult = new SchemaValidatorResult(); + + public SchemaValidatorResult getDataTypeMismatchResult() { + return _dataTypeMismatch; + } + + public SchemaValidatorResult getSingleValueMultiValueFieldMismatchResult() { + return _singleValueMultiValueFieldMismatch; + } + + public SchemaValidatorResult getMultiValueStructureMismatchResult() { + return _multiValueStructureMismatch; + } + + public SchemaValidatorResult getMissingPinotColumnResult() { + return _missingPinotColumnResult; + } +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java index 5c45d6baa18..fc3f9a356da 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java @@ -61,6 +61,24 @@ public class GenericRow { */ public static final String SKIP_RECORD_KEY = "$SKIP_RECORD_KEY$"; + /** + * This key is used to identify whether there is data type mismatch so that it requires a data type conversion. + * E.g. the Pinot column is of int type, whereas the input column is of long type. + */ + public static final String DATA_TYPE_MISMATCH_KEY = "$DATA_TYPE_MISMATCH_KEY$"; + + /** + * This key is used to identify whether the input value is a map structure for multi-value column. + * This is necessary for us to identify whether there is any existing use case that is leveraging this way to fetch values. 
+ public static final String MULTI_VALUE_STRUCTURE_MISMATCH_KEY = "$MULTI_VALUE_STRUCTURE_MISMATCH_KEY$"; + + /** + * This key is used to identify whether there is a single-value multi-value mismatch. E.g. the Pinot column is single-value, + * whereas the input data is a Collection/Map/Object[]. + */ + public static final String SINGLE_VALUE_MULTI_VALUE_FIELD_MISMATCH_KEY = "$SINGLE_VALUE_MULTI_VALUE_FIELD_MISMATCH_KEY$"; + private final Map _fieldToValueMap = new HashMap<>(); private final Set _nullValueFields = new HashSet<>();