Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to turn off Druid Catalog specific validation done on catalog defined tables in Druid #16465

Merged
merged 9 commits into from
May 23, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.testsEx.catalog;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import org.apache.druid.catalog.model.Columns;
import org.apache.druid.catalog.model.TableMetadata;
Expand All @@ -29,14 +30,21 @@
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.sql.SqlTaskStatus;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.sql.http.SqlQuery;
import org.apache.druid.testing.utils.DataLoaderHelper;
import org.apache.druid.testing.utils.MsqTestQueryHelper;
import org.apache.druid.testsEx.cluster.CatalogClient;
import org.apache.druid.testsEx.cluster.DruidClusterClient;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.util.Map;

import static org.junit.Assert.assertTrue;

/**
* Tests that expect succesfully ingestng data into catalog defined tables and querying the data
* gives expected results.
Expand Down Expand Up @@ -445,4 +453,171 @@ public void testInsertWithMultiClusteringFromQuery() throws Exception

msqHelper.testQueriesFromFile(queryFile, tableName);
}

/**
* Adding a new column during ingestion that is not defined in a sealed table, should fail with
* proper validation error. Disabling catalog validation, through context parameter, and issuing ingest
* query again, should succeed.
*/
@Test
public void testInsertNonDefinedColumnIntoSealedCatalogTableWithValidationDisabled() throws Exception
{
String queryFile = "/catalog/sealedWithValidationDisabled_select.sql";
String tableName = "testInsertNonDefinedColumnIntoSealedCatalogTableWithValidationDisabled" + operationName;
TableMetadata table = TableBuilder.datasource(tableName, "P1D")
.column(Columns.TIME_COLUMN, Columns.LONG)
.property(DatasourceDefn.SEALED_PROPERTY, true)
.build();

client.createTable(table, true);
LOG.info("table created:\n%s", client.readTable(table.id()));
String queryInline =
StringUtils.format(dmlPrefixPattern, tableName) + "\n"
+ "SELECT\n"
+ " TIME_PARSE(a) AS __time,\n"
+ " f AS extra\n"
+ "FROM TABLE(\n"
+ " EXTERN(\n"
+ " '{\"type\":\"inline\",\"data\":\"2022-12-26T12:34:56,extra,10,\\\"20\\\",2.0,foo\"}',\n"
+ " '{\"type\":\"csv\",\"findColumnsFromHeader\":false,\"columns\":[\"a\",\"b\",\"c\",\"d\",\"e\",\"f\"]}'\n"
+ " )\n"
+ ") "
+ " EXTEND (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e FLOAT, f VARCHAR)\n"
+ "PARTITIONED BY DAY\n";

LOG.info("Running query:\n%s", queryInline);
SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTaskWithExpectedStatusCode(
sqlQueryFromString(
queryInline,
ImmutableMap.of()
),
null,
null,
HttpResponseStatus.BAD_REQUEST
);
assertTrue(sqlTaskStatus.getError() != null && sqlTaskStatus.getError()
.getUnderlyingException()
.getMessage()
.equals(
StringUtils.format("Column [extra] is not defined in the target table [druid.%s] strict schema", tableName))
);

// Submit the task and wait for the datasource to get loaded
LOG.info("Running query:\n%s", queryInline);
sqlTaskStatus = msqHelper.submitMsqTaskSuccesfully(
queryInline,
ImmutableMap.of(QueryContexts.CATALOG_VALIDATION_ENABLED, false)
);

if (sqlTaskStatus.getState().isFailure()) {
Assert.fail(StringUtils.format(
"Unable to start the task successfully.\nPossible exception: %s",
sqlTaskStatus.getError()
Dismissed Show dismissed Hide dismissed
));
}

msqHelper.pollTaskIdForSuccess(sqlTaskStatus.getTaskId());
dataLoaderHelper.waitUntilDatasourceIsReady(tableName);

msqHelper.testQueriesFromFile(queryFile, tableName);
}

/**
* Assigning a column during ingestion, to an input type that is not compatible with the defined type of the
* column, should result in a proper validation error. Disabling catalog validation, through context parameter, and
* issuing ingest query again, should succeed.
*
* In this test we define the table as
* <p>
* __time LONG
* double_col DOUBLE
* <p>
* And insert the following data:
* <p>
* __time, varchar_col1, bigint_col1, float_col1, varchar_col2
* 2022-12-26T12:34:56,extra,10,"20",2.0,foo
* <p>
* even though the data is written
* as
* <p>
* 2022-12-26T12:34:56,extra
* <p>
* When querying the table with query: 'SELECT * from ##tableName', the data is returned as:
* <p>
* __time, double_col
* 2022-12-26T12:34:56,0.0
* <p>
* because the broker knows the double_col column to be a DOUBLE, and so converts to null (0.0) at query time.
*/
@Test
public void testInsertWithIncompatibleTypeAssignmentWithValidationDisabled() throws Exception
{
String tableName = "testInsertWithIncompatibleTypeAssignmentWithValidationDisabled" + operationName;
String queryFile = "/catalog/incompatibleTypeAssignmentWithValidationDisabled_select.sql";
TableMetadata table = TableBuilder.datasource(tableName, "P1D")
.column(Columns.TIME_COLUMN, Columns.LONG)
.column("double_col", "DOUBLE")
.property(DatasourceDefn.SEALED_PROPERTY, true)
.build();

client.createTable(table, true);
LOG.info("table created:\n%s", client.readTable(table.id()));
String queryInline =
StringUtils.format(
"INSERT INTO %s\n"
+ "SELECT\n"
+ " TIME_PARSE(a) AS __time,\n"
+ " b AS double_col\n"
+ "FROM TABLE(\n"
+ " EXTERN(\n"
+ " '{\"type\":\"inline\",\"data\":\"2022-12-26T12:34:56,extra,10,\\\"20\\\",2.0,foo\"}',\n"
+ " '{\"type\":\"csv\",\"findColumnsFromHeader\":false,\"columns\":[\"a\",\"b\",\"c\",\"d\",\"e\",\"f\"]}'\n"
+ " )\n"
+ ") "
+ " EXTEND (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e FLOAT, f VARCHAR)\n"
+ "PARTITIONED BY DAY\n",
tableName
);

LOG.info("Running query:\n%s", queryInline);
SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTaskWithExpectedStatusCode(
sqlQueryFromString(
queryInline,
ImmutableMap.of()
),
null,
null,
HttpResponseStatus.BAD_REQUEST
);
assertTrue(sqlTaskStatus.getError() != null && sqlTaskStatus.getError()
.getUnderlyingException()
.getMessage()
.equals(
"Cannot assign to target field 'double_col' of type DOUBLE from source field 'double_col' of type VARCHAR (line [4], column [3])")
);

// Submit the task and wait for the datasource to get loaded
LOG.info("Running query:\n%s", queryInline);
sqlTaskStatus = msqHelper.submitMsqTaskSuccesfully(
queryInline,
ImmutableMap.of(QueryContexts.CATALOG_VALIDATION_ENABLED, false)
);

if (sqlTaskStatus.getState().isFailure()) {
Assert.fail(StringUtils.format(
"Unable to start the task successfully.\nPossible exception: %s",
sqlTaskStatus.getError()
Dismissed Show dismissed Hide dismissed
));
}

msqHelper.pollTaskIdForSuccess(sqlTaskStatus.getTaskId());
dataLoaderHelper.waitUntilDatasourceIsReady(tableName);

msqHelper.testQueriesFromFile(queryFile, tableName);
}

private static SqlQuery sqlQueryFromString(String queryString, Map<String, Object> context)
{
return new SqlQuery(queryString, null, false, false, false, context, null);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[
{
"query": "SELECT * FROM %%DATASOURCE%%",
"expectedResults": [
{
"__time": 1672058096000,
"double_col": 0.0
}
]
}
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[
{
"query": "SELECT * FROM %%DATASOURCE%%",
"expectedResults": [
{
"__time": 1672058096000,
"extra": "foo"
}
]
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,14 @@ public boolean isWindowingStrictValidation()
);
}

public boolean isCatalogValidationEnabled()
{
return getBoolean(
QueryContexts.CATALOG_VALIDATION_ENABLED,
QueryContexts.DEFAULT_CATALOG_VALIDATION_ENABLED
);
}

public QueryResourceId getQueryResourceId()
{
return new QueryResourceId(getString(QueryContexts.QUERY_RESOURCE_ID));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public class QueryContexts
public static final String UNCOVERED_INTERVALS_LIMIT_KEY = "uncoveredIntervalsLimit";
public static final String MIN_TOP_N_THRESHOLD = "minTopNThreshold";
public static final String WINDOWING_STRICT_VALIDATION = "windowingStrictValidation";
public static final String CATALOG_VALIDATION_ENABLED = "catalogValidationEnabled";
// Unique identifier for the query, that is used to map the global shared resources (specifically merge buffers) to the
// query's runtime
public static final String QUERY_RESOURCE_ID = "queryResourceId";
Expand Down Expand Up @@ -126,6 +127,7 @@ public class QueryContexts
public static final int DEFAULT_IN_FUNCTION_EXPR_THRESHOLD = 2;
public static final boolean DEFAULT_ENABLE_TIME_BOUNDARY_PLANNING = false;
public static final boolean DEFAULT_WINDOWING_STRICT_VALIDATION = true;
public static final boolean DEFAULT_CATALOG_VALIDATION_ENABLED = true;

@SuppressWarnings("unused") // Used by Jackson serialization
public enum Vectorize
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,24 @@ public void testDefaultWindowingStrictValidation()
);
}

@Test
public void testCatalogValidationEnabled()
{
Assert.assertEquals(
QueryContexts.DEFAULT_CATALOG_VALIDATION_ENABLED,
QueryContext.empty().isCatalogValidationEnabled()
);
Assert.assertTrue(QueryContext.of(ImmutableMap.of(
QueryContexts.CATALOG_VALIDATION_ENABLED,
true
)).isCatalogValidationEnabled());
Assert.assertFalse(QueryContext.of(ImmutableMap.of(
QueryContexts.CATALOG_VALIDATION_ENABLED,
false
)).isCatalogValidationEnabled());
}


@Test
public void testGetEnableJoinLeftScanDirect()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -534,9 +534,12 @@ private RelDataType validateTargetType(
);
}
}
if (tableMetadata == null) {
final boolean isCatalogValidationEnabled = plannerContext.queryContext().isCatalogValidationEnabled();
if (tableMetadata == null || !isCatalogValidationEnabled) {
return sourceType;
}

// disable sealed mode validation if catalog validation is disabled.
final boolean isStrict = tableMetadata.isSealed();
final List<Map.Entry<String, RelDataType>> fields = new ArrayList<>();
for (RelDataTypeField sourceField : sourceFields) {
Expand Down Expand Up @@ -592,6 +595,8 @@ private RelDataType validateTargetType(
// matches above.
final RelDataType targetType = typeFactory.createStructType(fields);
final SqlValidatorTable target = insertNs.resolve().getTable();

// disable type checking if catalog validation is disabled.
checkTypeAssignment(scope, target, sourceType, targetType, insert);
return targetType;
}
Expand Down