diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/catalog/ITCatalogIngestAndQueryTest.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/catalog/ITCatalogIngestAndQueryTest.java index eff9e2e197b4..3dc3c4f5d90f 100644 --- a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/catalog/ITCatalogIngestAndQueryTest.java +++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/catalog/ITCatalogIngestAndQueryTest.java @@ -20,6 +20,7 @@ package org.apache.druid.testsEx.catalog; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.inject.Inject; import org.apache.druid.catalog.model.Columns; import org.apache.druid.catalog.model.TableMetadata; @@ -29,14 +30,21 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.msq.sql.SqlTaskStatus; +import org.apache.druid.query.QueryContexts; +import org.apache.druid.sql.http.SqlQuery; import org.apache.druid.testing.utils.DataLoaderHelper; import org.apache.druid.testing.utils.MsqTestQueryHelper; import org.apache.druid.testsEx.cluster.CatalogClient; import org.apache.druid.testsEx.cluster.DruidClusterClient; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.util.Map; + +import static org.junit.Assert.assertTrue; + /** * Tests that expect succesfully ingestng data into catalog defined tables and querying the data * gives expected results. @@ -445,4 +453,171 @@ public void testInsertWithMultiClusteringFromQuery() throws Exception msqHelper.testQueriesFromFile(queryFile, tableName); } + + /** + * Adding a new column during ingestion that is not defined in a sealed table should fail with a + * proper validation error. 
Disabling catalog validation through the + * {@link QueryContexts#CATALOG_VALIDATION_ENABLED} context parameter and issuing the same ingest query again should + * succeed, and the queries in {@code sealedWithValidationDisabled_select.sql} must then return the extra column. + */ + @Test + public void testInsertNonDefinedColumnIntoSealedCatalogTableWithValidationDisabled() throws Exception + { + String queryFile = "/catalog/sealedWithValidationDisabled_select.sql"; + String tableName = "testInsertNonDefinedColumnIntoSealedCatalogTableWithValidationDisabled" + operationName; + TableMetadata table = TableBuilder.datasource(tableName, "P1D") + .column(Columns.TIME_COLUMN, Columns.LONG) + .property(DatasourceDefn.SEALED_PROPERTY, true) + .build(); + + client.createTable(table, true); + LOG.info("table created:\n%s", client.readTable(table.id())); + String queryInline = + StringUtils.format(dmlPrefixPattern, tableName) + "\n" + + "SELECT\n" + + " TIME_PARSE(a) AS __time,\n" + + " f AS extra\n" + + "FROM TABLE(\n" + + " EXTERN(\n" + + " '{\"type\":\"inline\",\"data\":\"2022-12-26T12:34:56,extra,10,\\\"20\\\",2.0,foo\"}',\n" + + " '{\"type\":\"csv\",\"findColumnsFromHeader\":false,\"columns\":[\"a\",\"b\",\"c\",\"d\",\"e\",\"f\"]}'\n" + + " )\n" + + ") " + + " EXTEND (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e FLOAT, f VARCHAR)\n" + + "PARTITIONED BY DAY\n"; + + LOG.info("Running query:\n%s", queryInline); + SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTaskWithExpectedStatusCode( + sqlQueryFromString( + queryInline, + ImmutableMap.of() + ), + null, + null, + HttpResponseStatus.BAD_REQUEST + ); + assertTrue(sqlTaskStatus.getError() != null && sqlTaskStatus.getError() + .getUnderlyingException() + .getMessage() + .equals( + StringUtils.format("Column [extra] is not defined in the target table [druid.%s] strict schema", tableName)) + ); + + // Re-submit the same query with catalog validation disabled and wait for the datasource to get loaded + LOG.info("Running query:\n%s", queryInline); + sqlTaskStatus = msqHelper.submitMsqTaskSuccesfully( + queryInline, + ImmutableMap.of(QueryContexts.CATALOG_VALIDATION_ENABLED, false) + ); + + if (sqlTaskStatus.getState().isFailure()) { 
+ Assert.fail(StringUtils.format( + "Unable to start the task successfully.\nPossible exception: %s", + sqlTaskStatus.getError() + )); + } + + msqHelper.pollTaskIdForSuccess(sqlTaskStatus.getTaskId()); + dataLoaderHelper.waitUntilDatasourceIsReady(tableName); + + msqHelper.testQueriesFromFile(queryFile, tableName); + } + + /** + * Assigning a column during ingestion, to an input type that is not compatible with the defined type of the + * column, should result in a proper validation error. Disabling catalog validation, through context parameter, and + * issuing ingest query again, should succeed. + * + * In this test we define the table as + *

+ * __time LONG + * double_col DOUBLE + *

+ * And insert the following data: + *

+ * __time, varchar_col1, bigint_col1, varchar_col2, float_col1, varchar_col3 + * 2022-12-26T12:34:56,extra,10,"20",2.0,foo + *

+ * The ingest query selects only the first two of these fields, so the row is + * written as + *

+ * 2022-12-26T12:34:56,extra + *

+ * When querying the table with query: 'SELECT * from ##tableName', the data is returned as: + *

+ * __time, double_col + * 2022-12-26T12:34:56,0.0 + *

+ * because the broker knows the double_col column to be a DOUBLE, and so converts to null (0.0) at query time. + */ + @Test + public void testInsertWithIncompatibleTypeAssignmentWithValidationDisabled() throws Exception + { + String tableName = "testInsertWithIncompatibleTypeAssignmentWithValidationDisabled" + operationName; + String queryFile = "/catalog/incompatibleTypeAssignmentWithValidationDisabled_select.sql"; + TableMetadata table = TableBuilder.datasource(tableName, "P1D") + .column(Columns.TIME_COLUMN, Columns.LONG) + .column("double_col", "DOUBLE") + .property(DatasourceDefn.SEALED_PROPERTY, true) + .build(); + + client.createTable(table, true); + LOG.info("table created:\n%s", client.readTable(table.id())); + String queryInline = + StringUtils.format( + "INSERT INTO %s\n" + + "SELECT\n" + + " TIME_PARSE(a) AS __time,\n" + + " b AS double_col\n" + + "FROM TABLE(\n" + + " EXTERN(\n" + + " '{\"type\":\"inline\",\"data\":\"2022-12-26T12:34:56,extra,10,\\\"20\\\",2.0,foo\"}',\n" + + " '{\"type\":\"csv\",\"findColumnsFromHeader\":false,\"columns\":[\"a\",\"b\",\"c\",\"d\",\"e\",\"f\"]}'\n" + + " )\n" + + ") " + + " EXTEND (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e FLOAT, f VARCHAR)\n" + + "PARTITIONED BY DAY\n", + tableName + ); + + LOG.info("Running query:\n%s", queryInline); + SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTaskWithExpectedStatusCode( + sqlQueryFromString( + queryInline, + ImmutableMap.of() + ), + null, + null, + HttpResponseStatus.BAD_REQUEST + ); + assertTrue(sqlTaskStatus.getError() != null && sqlTaskStatus.getError() + .getUnderlyingException() + .getMessage() + .equals( + "Cannot assign to target field 'double_col' of type DOUBLE from source field 'double_col' of type VARCHAR (line [4], column [3])") + ); + + // Re-submit the same query with catalog validation disabled and wait for the datasource to get loaded + // NOTE(review): unlike the sealed-table test, this query hard-codes INSERT INTO instead of dmlPrefixPattern; confirm this is intended. + LOG.info("Running query:\n%s", queryInline); + sqlTaskStatus = msqHelper.submitMsqTaskSuccesfully( + queryInline, + 
ImmutableMap.of(QueryContexts.CATALOG_VALIDATION_ENABLED, false) + ); + + if (sqlTaskStatus.getState().isFailure()) { + Assert.fail(StringUtils.format( + "Unable to start the task successfully.\nPossible exception: %s", + sqlTaskStatus.getError() + )); + } + + msqHelper.pollTaskIdForSuccess(sqlTaskStatus.getTaskId()); + dataLoaderHelper.waitUntilDatasourceIsReady(tableName); + + msqHelper.testQueriesFromFile(queryFile, tableName); + } + + /** + * Builds a {@link SqlQuery} from the given SQL text and query context; every other constructor argument is passed as + * {@code null} or {@code false}. + */ + private static SqlQuery sqlQueryFromString(String queryString, Map context) + { + return new SqlQuery(queryString, null, false, false, false, context, null); + } } diff --git a/integration-tests-ex/cases/src/test/resources/catalog/incompatibleTypeAssignmentWithValidationDisabled_select.sql b/integration-tests-ex/cases/src/test/resources/catalog/incompatibleTypeAssignmentWithValidationDisabled_select.sql new file mode 100644 index 000000000000..4b6fafdae916 --- /dev/null +++ b/integration-tests-ex/cases/src/test/resources/catalog/incompatibleTypeAssignmentWithValidationDisabled_select.sql @@ -0,0 +1,11 @@ +[ + { + "query": "SELECT * FROM %%DATASOURCE%%", + "expectedResults": [ + { + "__time": 1672058096000, + "double_col": 0.0 + } + ] + } +] \ No newline at end of file diff --git a/integration-tests-ex/cases/src/test/resources/catalog/sealedWithValidationDisabled_select.sql b/integration-tests-ex/cases/src/test/resources/catalog/sealedWithValidationDisabled_select.sql new file mode 100644 index 000000000000..a818bcb2ea4c --- /dev/null +++ b/integration-tests-ex/cases/src/test/resources/catalog/sealedWithValidationDisabled_select.sql @@ -0,0 +1,11 @@ +[ + { + "query": "SELECT * FROM %%DATASOURCE%%", + "expectedResults": [ + { + "__time": 1672058096000, + "extra": "foo" + } + ] + } +] \ No newline at end of file diff --git a/processing/src/main/java/org/apache/druid/query/QueryContext.java b/processing/src/main/java/org/apache/druid/query/QueryContext.java index 80a860bb273f..daa6760f8f32 100644 --- 
a/processing/src/main/java/org/apache/druid/query/QueryContext.java +++ b/processing/src/main/java/org/apache/druid/query/QueryContext.java @@ -621,6 +621,14 @@ public boolean isWindowingStrictValidation() ); } + /** + * Returns whether catalog validation is enabled for this query, as given by the + * {@link QueryContexts#CATALOG_VALIDATION_ENABLED} context parameter; falls back to + * {@link QueryContexts#DEFAULT_CATALOG_VALIDATION_ENABLED} when the parameter is absent. + */ + public boolean isCatalogValidationEnabled() + { + return getBoolean( + QueryContexts.CATALOG_VALIDATION_ENABLED, + QueryContexts.DEFAULT_CATALOG_VALIDATION_ENABLED + ); + } + public QueryResourceId getQueryResourceId() { return new QueryResourceId(getString(QueryContexts.QUERY_RESOURCE_ID)); diff --git a/processing/src/main/java/org/apache/druid/query/QueryContexts.java b/processing/src/main/java/org/apache/druid/query/QueryContexts.java index 3010b4fa923c..61520a04bc28 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryContexts.java +++ b/processing/src/main/java/org/apache/druid/query/QueryContexts.java @@ -88,6 +88,7 @@ public class QueryContexts public static final String UNCOVERED_INTERVALS_LIMIT_KEY = "uncoveredIntervalsLimit"; public static final String MIN_TOP_N_THRESHOLD = "minTopNThreshold"; public static final String WINDOWING_STRICT_VALIDATION = "windowingStrictValidation"; + public static final String CATALOG_VALIDATION_ENABLED = "catalogValidationEnabled"; // Unique identifier for the query, that is used to map the global shared resources (specifically merge buffers) to the // query's runtime public static final String QUERY_RESOURCE_ID = "queryResourceId"; @@ -126,6 +127,7 @@ public class QueryContexts public static final int DEFAULT_IN_FUNCTION_EXPR_THRESHOLD = 2; public static final boolean DEFAULT_ENABLE_TIME_BOUNDARY_PLANNING = false; public static final boolean DEFAULT_WINDOWING_STRICT_VALIDATION = true; + public static final boolean DEFAULT_CATALOG_VALIDATION_ENABLED = true; @SuppressWarnings("unused") // Used by Jackson serialization public enum Vectorize diff --git a/processing/src/test/java/org/apache/druid/query/QueryContextsTest.java b/processing/src/test/java/org/apache/druid/query/QueryContextsTest.java index 
09496ece9f2e..2fa414188a11 100644 --- a/processing/src/test/java/org/apache/druid/query/QueryContextsTest.java +++ b/processing/src/test/java/org/apache/druid/query/QueryContextsTest.java @@ -160,6 +160,24 @@ public void testDefaultWindowingStrictValidation() ); } + /** + * Verifies that {@code isCatalogValidationEnabled()} returns the default for an empty context and honors explicit + * {@code true} and {@code false} values. + */ + @Test + public void testCatalogValidationEnabled() + { + Assert.assertEquals( + QueryContexts.DEFAULT_CATALOG_VALIDATION_ENABLED, + QueryContext.empty().isCatalogValidationEnabled() + ); + Assert.assertTrue(QueryContext.of(ImmutableMap.of( + QueryContexts.CATALOG_VALIDATION_ENABLED, + true + )).isCatalogValidationEnabled()); + Assert.assertFalse(QueryContext.of(ImmutableMap.of( + QueryContexts.CATALOG_VALIDATION_ENABLED, + false + )).isCatalogValidationEnabled()); + } + + @Test public void testGetEnableJoinLeftScanDirect() { diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java index d1a47520a90e..a6319c40f668 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java @@ -534,9 +534,12 @@ private RelDataType validateTargetType( ); } } - if (tableMetadata == null) { + final boolean isCatalogValidationEnabled = plannerContext.queryContext().isCatalogValidationEnabled(); + if (tableMetadata == null || !isCatalogValidationEnabled) { return sourceType; } + + // Reached only when catalog validation is enabled (see the early return above); isStrict reflects whether the table is sealed. final boolean isStrict = tableMetadata.isSealed(); final List> fields = new ArrayList<>(); for (RelDataTypeField sourceField : sourceFields) { @@ -592,6 +595,8 @@ private RelDataType validateTargetType( // matches above. final RelDataType targetType = typeFactory.createStructType(fields); final SqlValidatorTable target = insertNs.resolve().getTable(); + + // Type checking is skipped when catalog validation is disabled, because of the early return above. 
checkTypeAssignment(scope, target, sourceType, targetType, insert); return targetType; } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogIngestionDmlTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogIngestionDmlTest.java index bf5f504aca88..621c919b0ed7 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogIngestionDmlTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogIngestionDmlTest.java @@ -36,6 +36,7 @@ import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.query.QueryContexts; import org.apache.druid.query.TableDataSource; import org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFactory; import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; @@ -56,9 +57,20 @@ import javax.annotation.Nullable; +import java.util.HashMap; +import java.util.Map; + @SqlTestFrameworkConfig.ComponentSupplier(CatalogIngestionDmlComponentSupplier.class) public abstract class CalciteCatalogIngestionDmlTest extends CalciteIngestionDmlTest { + + /** + * Copy of {@code DEFAULT_CONTEXT} with {@link QueryContexts#CATALOG_VALIDATION_ENABLED} set to {@code false}. + */ + private static final Map CONTEXT_WITH_VALIDATION_DISABLED; + + static { + CONTEXT_WITH_VALIDATION_DISABLED = new HashMap<>(DEFAULT_CONTEXT); + CONTEXT_WITH_VALIDATION_DISABLED.put(QueryContexts.CATALOG_VALIDATION_ENABLED, false); + } + private final String operationName; private final String dmlPrefixPattern; @@ -919,7 +931,7 @@ public void testInsertAddNonDefinedColumnIntoSealedCatalogTable() " b AS dim1,\n" + " 1 AS cnt,\n" + " c AS m2,\n" + - " CAST(d AS BIGINT) AS extra2\n" + + " d AS extra2\n" + "FROM TABLE(inline(\n" + " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" + " format => 'csv'))\n" + @@ -933,6 +945,65 @@ public void testInsertAddNonDefinedColumnIntoSealedCatalogTable() .verify(); } + /** + * Adding a new column during ingestion that 
is not defined in a sealed table, when catalog validation is disabled, + * should plan accordingly. + */ + @Test + public void testInsertAddNonDefinedColumnIntoSealedCatalogTableAndValidationDisabled() + { + ExternalDataSource externalDataSource = new ExternalDataSource( + new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"), + new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0), + RowSignature.builder() + .add("a", ColumnType.STRING) + .add("b", ColumnType.STRING) + .add("c", ColumnType.LONG) + .add("d", ColumnType.STRING) + .add("e", ColumnType.STRING) + .build() + ); + final RowSignature signature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("dim1", ColumnType.STRING) + .add("cnt", ColumnType.LONG) + .add("m2", ColumnType.LONG) + .add("extra2", ColumnType.STRING) + .build(); + testIngestionQuery() + .context(CONTEXT_WITH_VALIDATION_DISABLED) + .sql(StringUtils.format(dmlPrefixPattern, "fooSealed") + "\n" + + "SELECT\n" + + " TIME_PARSE(a) AS __time,\n" + + " b AS dim1,\n" + + " 1 AS cnt,\n" + + " c AS m2,\n" + + " d AS extra2\n" + + "FROM TABLE(inline(\n" + + " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" + + " format => 'csv'))\n" + + " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" + + "PARTITIONED BY ALL TIME") + .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) + .expectTarget("fooSealed", signature) + .expectResources(dataSourceWrite("fooSealed"), Externals.externalRead("EXTERNAL")) + .expectQuery( + newScanQueryBuilder() + .dataSource(externalDataSource) + .intervals(querySegmentSpec(Filtration.eternity())) + .virtualColumns( + expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG), + expressionVirtualColumn("v1", "1", ColumnType.LONG) + ) + // Scan query lists columns in alphabetical order independent of the + // SQL project list or the defined schema. 
+ .columns("b", "c", "d", "v0", "v1") + .context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT) + .build() + ) + .verify(); + } + /** * Inserting into a catalog table with a WITH source succeeds @@ -1104,6 +1175,10 @@ public void testInsertIntoExistingStrictNoDefinedSchema() .verify(); } + /** + * Assigning a column during ingestion, to an input type that is not compatible with the defined type of the + * column, should result in a proper validation error. + */ @Test public void testInsertIntoExistingWithIncompatibleTypeAssignment() { @@ -1120,6 +1195,48 @@ public void testInsertIntoExistingWithIncompatibleTypeAssignment() .verify(); } + /** + * Assigning a column during ingestion to an input type that is not compatible with the defined type of the + * column, when catalog validation is disabled, should plan without a validation error; the expected target + * signature then carries the source type ({@code STRING_ARRAY} for {@code dim1}). + */ + @Test + public void testInsertIntoExistingWithIncompatibleTypeAssignmentAndValidationDisabled() + { + final RowSignature signature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("dim1", ColumnType.STRING_ARRAY) + .build(); + testIngestionQuery() + .context(CONTEXT_WITH_VALIDATION_DISABLED) + .sql(StringUtils.format(dmlPrefixPattern, "foo") + "\n" + + "SELECT\n" + + " __time AS __time,\n" + + " ARRAY[dim1] AS dim1\n" + + "FROM foo\n" + + "PARTITIONED BY ALL TIME") + .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) + .expectTarget("foo", signature) + .expectResources(dataSourceRead("foo"), dataSourceWrite("foo")) + .expectQuery( + newScanQueryBuilder() + .dataSource("foo") + .intervals(querySegmentSpec(Filtration.eternity())) + .virtualColumns( + expressionVirtualColumn("v0", "array(\"dim1\")", ColumnType.STRING_ARRAY) + ) + // Scan query lists columns in alphabetical order independent of the + // SQL project list or the defined schema. 
+ .columns("__time", "v0") + .context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT) + .build() + ) + .verify(); + } + + /** + * Assigning a complex type column during ingestion, to an input type that is not compatible with the defined type of + * the column, should result in a proper validation error. + */ @Test public void testGroupByInsertIntoExistingWithIncompatibleTypeAssignment() { @@ -1135,4 +1252,42 @@ public void testGroupByInsertIntoExistingWithIncompatibleTypeAssignment() "Cannot assign to target field 'unique_dim1' of type COMPLEX from source field 'unique_dim1' of type VARCHAR ARRAY (line [4], column [3])") .verify(); } + + /** + * Assigning a complex type column during ingestion to an input type that is not compatible with the defined type of + * the column, when catalog validation is disabled, should plan without a validation error; the expected target + * signature then carries the source type ({@code STRING_ARRAY} for {@code unique_dim1}). Despite the test name, the + * expected native query is a plain scan, not a group-by. + */ + @Test + public void testGroupByInsertIntoExistingWithIncompatibleTypeAssignmentAndValidationDisabled() + { + final RowSignature signature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("unique_dim1", ColumnType.STRING_ARRAY) + .build(); + testIngestionQuery() + .context(CONTEXT_WITH_VALIDATION_DISABLED) + .sql(StringUtils.format(dmlPrefixPattern, "foo") + "\n" + + "SELECT\n" + + " __time AS __time,\n" + + " ARRAY[dim1] AS unique_dim1\n" + + "FROM foo\n" + + "PARTITIONED BY ALL TIME") + .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) + .expectTarget("foo", signature) + .expectResources(dataSourceRead("foo"), dataSourceWrite("foo")) + .expectQuery( + newScanQueryBuilder() + .dataSource("foo") + .intervals(querySegmentSpec(Filtration.eternity())) + .virtualColumns( + expressionVirtualColumn("v0", "array(\"dim1\")", ColumnType.STRING_ARRAY) + ) + // Scan query lists columns in alphabetical order independent of the + // SQL project list or the defined schema. + .columns("__time", "v0") + .context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT) + .build() + ) + .verify(); + } }