Add ability to turn off Druid Catalog-specific validation done on catalog-defined tables in Druid #16465

Merged: 9 commits, May 23, 2024
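
In short: the PR adds a query-context flag, catalogValidationEnabled (default true). Setting it to false skips the catalog-driven checks (sealed-table column checks and source-to-target type-assignment checks) when validating ingestion queries against catalog-defined tables. A minimal caller-side sketch, assuming only the context key introduced in the diff below:

import com.google.common.collect.ImmutableMap;

import java.util.Map;

public class DisableCatalogValidationExample
{
  public static void main(String[] args)
  {
    // Query context for an INSERT/REPLACE that opts out of catalog validation.
    // The key name mirrors QueryContexts.CATALOG_VALIDATION_ENABLED in the diff below.
    final Map<String, Object> context = ImmutableMap.of("catalogValidationEnabled", false);
    System.out.println(context); // prints {catalogValidationEnabled=false}
  }
}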
@@ -621,6 +621,14 @@ public boolean isWindowingStrictValidation()
);
}

public boolean isCatalogValidationEnabled()
{
return getBoolean(
QueryContexts.CATALOG_VALIDATION_ENABLED,
QueryContexts.DEFAULT_CATALOG_VALIDATION_ENABLED
);
}

public QueryResourceId getQueryResourceId()
{
return new QueryResourceId(getString(QueryContexts.QUERY_RESOURCE_ID));
@@ -88,6 +88,7 @@ public class QueryContexts
public static final String UNCOVERED_INTERVALS_LIMIT_KEY = "uncoveredIntervalsLimit";
public static final String MIN_TOP_N_THRESHOLD = "minTopNThreshold";
public static final String WINDOWING_STRICT_VALIDATION = "windowingStrictValidation";
public static final String CATALOG_VALIDATION_ENABLED = "catalogValidationEnabled";
// Unique identifier for the query, used to map the global shared resources (specifically merge buffers) to the
// query's runtime
public static final String QUERY_RESOURCE_ID = "queryResourceId";
@@ -126,6 +127,7 @@ public class QueryContexts
public static final int DEFAULT_IN_FUNCTION_EXPR_THRESHOLD = 2;
public static final boolean DEFAULT_ENABLE_TIME_BOUNDARY_PLANNING = false;
public static final boolean DEFAULT_WINDOWING_STRICT_VALIDATION = true;
public static final boolean DEFAULT_CATALOG_VALIDATION_ENABLED = true;

@SuppressWarnings("unused") // Used by Jackson serialization
public enum Vectorize
@@ -160,6 +160,24 @@ public void testDefaultWindowingStrictValidation()
);
}

@Test
public void testCatalogValidationEnabled()
{
Assert.assertEquals(
QueryContexts.DEFAULT_CATALOG_VALIDATION_ENABLED,
QueryContext.empty().isCatalogValidationEnabled()
);
Assert.assertTrue(QueryContext.of(ImmutableMap.of(
QueryContexts.CATALOG_VALIDATION_ENABLED,
true
)).isCatalogValidationEnabled());
Assert.assertFalse(QueryContext.of(ImmutableMap.of(
QueryContexts.CATALOG_VALIDATION_ENABLED,
false
)).isCatalogValidationEnabled());
}

@Test
public void testGetEnableJoinLeftScanDirect()
{
@@ -537,7 +537,10 @@ private RelDataType validateTargetType(
if (tableMetadata == null) {
return sourceType;
}
-final boolean isStrict = tableMetadata.isSealed();
+
+final boolean isCatalogValidationEnabled = plannerContext.queryContext().isCatalogValidationEnabled();
+// disable sealed mode validation if catalog validation is disabled.
+final boolean isStrict = tableMetadata.isSealed() && isCatalogValidationEnabled;
final List<Map.Entry<String, RelDataType>> fields = new ArrayList<>();
for (RelDataTypeField sourceField : sourceFields) {
final String colName = sourceField.getName();
@@ -592,7 +595,11 @@ private RelDataType validateTargetType(
// matches above.
final RelDataType targetType = typeFactory.createStructType(fields);
final SqlValidatorTable target = insertNs.resolve().getTable();
-checkTypeAssignment(scope, target, sourceType, targetType, insert);
+
+// disable type checking if catalog validation is disabled.
+if (isCatalogValidationEnabled) {
+  checkTypeAssignment(scope, target, sourceType, targetType, insert);
+}
return targetType;
}

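
Taken together, the two changes above gate all catalog-driven validation behind the new flag. A self-contained sketch (hypothetical names, not code from the PR) of the combined behavior:

public class CatalogValidationGatesSketch
{
  // Gate 1: sealed-table column checks apply only while catalog validation is on.
  static boolean isStrict(boolean tableIsSealed, boolean catalogValidationEnabled)
  {
    return tableIsSealed && catalogValidationEnabled;
  }

  public static void main(String[] args)
  {
    System.out.println(isStrict(true, false)); // false: sealed table accepts undeclared columns
    System.out.println(isStrict(true, true));  // true: sealed table rejects undeclared columns
    // Gate 2: checkTypeAssignment(...) is skipped entirely when the flag is false,
    // so incompatible source-to-target types no longer fail validation.
  }
}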
@@ -36,6 +36,7 @@
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
@@ -56,9 +57,20 @@

import javax.annotation.Nullable;

import java.util.HashMap;
import java.util.Map;

@SqlTestFrameworkConfig.ComponentSupplier(CatalogIngestionDmlComponentSupplier.class)
public abstract class CalciteCatalogIngestionDmlTest extends CalciteIngestionDmlTest
{

private static final Map<String, Object> CONTEXT_WITH_VALIDATION_DISABLED;

static {
CONTEXT_WITH_VALIDATION_DISABLED = new HashMap<>(DEFAULT_CONTEXT);
CONTEXT_WITH_VALIDATION_DISABLED.put(QueryContexts.CATALOG_VALIDATION_ENABLED, false);
}

private final String operationName;
private final String dmlPrefixPattern;

@@ -919,7 +931,7 @@ public void testInsertAddNonDefinedColumnIntoSealedCatalogTable()
" b AS dim1,\n" +
" 1 AS cnt,\n" +
" c AS m2,\n" +
" CAST(d AS BIGINT) AS extra2\n" +
" d AS extra2\n" +
"FROM TABLE(inline(\n" +
" data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
" format => 'csv'))\n" +
@@ -933,6 +945,65 @@
.verify();
}

/**
 * Adding a new column during ingestion that is not defined in a sealed table should succeed when catalog
 * validation is disabled, since the sealed-table column check is skipped.
 */
@Test
public void testInsertAddNonDefinedColumnIntoSealedCatalogTableAndValidationDisabled()
{
ExternalDataSource externalDataSource = new ExternalDataSource(
new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"),
new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0),
RowSignature.builder()
.add("a", ColumnType.STRING)
.add("b", ColumnType.STRING)
.add("c", ColumnType.LONG)
.add("d", ColumnType.STRING)
.add("e", ColumnType.STRING)
.build()
);
final RowSignature signature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("dim1", ColumnType.STRING)
.add("cnt", ColumnType.LONG)
.add("m2", ColumnType.LONG)
.add("extra2", ColumnType.STRING)
.build();
testIngestionQuery()
.context(CONTEXT_WITH_VALIDATION_DISABLED)
.sql(StringUtils.format(dmlPrefixPattern, "fooSealed") + "\n" +
"SELECT\n" +
" TIME_PARSE(a) AS __time,\n" +
" b AS dim1,\n" +
" 1 AS cnt,\n" +
" c AS m2,\n" +
" d AS extra2\n" +
"FROM TABLE(inline(\n" +
" data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
" format => 'csv'))\n" +
" (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
"PARTITIONED BY ALL TIME")
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectTarget("fooSealed", signature)
.expectResources(dataSourceWrite("fooSealed"), Externals.externalRead("EXTERNAL"))
.expectQuery(
newScanQueryBuilder()
.dataSource(externalDataSource)
.intervals(querySegmentSpec(Filtration.eternity()))
.virtualColumns(
expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG),
expressionVirtualColumn("v1", "1", ColumnType.LONG)
)
// Scan query lists columns in alphabetical order independent of the
// SQL project list or the defined schema.
.columns("b", "c", "d", "v0", "v1")
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
.build()
)
.verify();
}

/**
* Inserting into a catalog table with a WITH source succeeds
@@ -1104,6 +1175,10 @@ public void testInsertIntoExistingStrictNoDefinedSchema()
.verify();
}

/**
 * Assigning a column during ingestion to an input type that is incompatible with the column's defined type
 * should result in a validation error.
 */
@Test
public void testInsertIntoExistingWithIncompatibleTypeAssignment()
{
@@ -1120,6 +1195,48 @@
.verify();
}

/**
 * Assigning a column during ingestion to an input type that is incompatible with the column's defined type
 * should succeed when catalog validation is disabled, since the type-assignment check is skipped.
 */
@Test
public void testInsertIntoExistingWithIncompatibleTypeAssignmentAndValidationDisabled()
{
final RowSignature signature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("dim1", ColumnType.STRING_ARRAY)
.build();
testIngestionQuery()
.context(CONTEXT_WITH_VALIDATION_DISABLED)
.sql(StringUtils.format(dmlPrefixPattern, "foo") + "\n"
+ "SELECT\n"
+ " __time AS __time,\n"
+ " ARRAY[dim1] AS dim1\n"
+ "FROM foo\n"
+ "PARTITIONED BY ALL TIME")
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectTarget("foo", signature)
.expectResources(dataSourceRead("foo"), dataSourceWrite("foo"))
.expectQuery(
newScanQueryBuilder()
.dataSource("foo")
.intervals(querySegmentSpec(Filtration.eternity()))
.virtualColumns(
expressionVirtualColumn("v0", "array(\"dim1\")", ColumnType.STRING_ARRAY)
)
// Scan query lists columns in alphabetical order independent of the
// SQL project list or the defined schema.
.columns("__time", "v0")
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
.build()
)
.verify();
}

/**
 * Assigning a complex-type column during ingestion to an input type that is incompatible with the column's
 * defined type should result in a validation error.
 */
@Test
public void testGroupByInsertIntoExistingWithIncompatibleTypeAssignment()
{
@@ -1135,4 +1252,42 @@ public void testGroupByInsertIntoExistingWithIncompatibleTypeAssignment()
"Cannot assign to target field 'unique_dim1' of type COMPLEX<hyperUnique> from source field 'unique_dim1' of type VARCHAR ARRAY (line [4], column [3])")
.verify();
}

/**
 * Assigning a complex-type column during ingestion to an input type that is incompatible with the column's
 * defined type should succeed when catalog validation is disabled, since the type-assignment check is skipped.
 */
@Test
public void testGroupByInsertIntoExistingWithIncompatibleTypeAssignmentAndValidationDisabled()
{
final RowSignature signature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("unique_dim1", ColumnType.STRING_ARRAY)
.build();
testIngestionQuery()
.context(CONTEXT_WITH_VALIDATION_DISABLED)
.sql(StringUtils.format(dmlPrefixPattern, "foo") + "\n"
+ "SELECT\n"
+ " __time AS __time,\n"
+ " ARRAY[dim1] AS unique_dim1\n"
+ "FROM foo\n"
+ "PARTITIONED BY ALL TIME")
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectTarget("foo", signature)
.expectResources(dataSourceRead("foo"), dataSourceWrite("foo"))
.expectQuery(
newScanQueryBuilder()
.dataSource("foo")
.intervals(querySegmentSpec(Filtration.eternity()))
.virtualColumns(
expressionVirtualColumn("v0", "array(\"dim1\")", ColumnType.STRING_ARRAY)
)
// Scan query lists columns in alphabetical order independent of the
// SQL project list or the defined schema.
.columns("__time", "v0")
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
.build()
)
.verify();
}
}