diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
index 832c791af676..fa7bf72e6a68 100644
--- a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
+++ b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
@@ -304,10 +304,16 @@ public class KsqlConfig extends AbstractConfig {
       + "KSQL metastore backup files are located.";
 
   public static final String KSQL_SUPPRESS_ENABLED = "ksql.suppress.enabled";
-  public static final Boolean KSQL_SUPPRESS_ENABLED_DEFAULT = false;
+  public static final Boolean KSQL_SUPPRESS_ENABLED_DEFAULT = true;
   public static final String KSQL_SUPPRESS_ENABLED_DOC =
       "Feature flag for suppression, specifically EMIT FINAL";
 
+  public static final String KSQL_SUPPRESS_BUFFER_SIZE = "ksql.suppress.buffer.size";
+  public static final Long KSQL_SUPPRESS_BUFFER_SIZE_DEFAULT = -1L;
+  public static final String KSQL_SUPPRESS_BUFFER_SIZE_DOC = "Bound the size of the buffer used "
+      + "for suppression. Negative size means the buffer will be unbounded. If the buffer exceeds "
+      + "its max capacity, a StreamsException stating this is thrown";
+
   // Defaults for config NOT defined by this class's ConfigDef:
   static final ImmutableMap NON_KSQL_DEFAULTS = ImmutableMap
       .builder()
@@ -767,6 +773,13 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
             Importance.LOW,
             KSQL_QUERY_ERROR_MAX_QUEUE_SIZE_DOC
         )
+        .define(
+            KSQL_SUPPRESS_BUFFER_SIZE,
+            Type.LONG,
+            KSQL_SUPPRESS_BUFFER_SIZE_DEFAULT,
+            Importance.LOW,
+            KSQL_SUPPRESS_BUFFER_SIZE_DOC
+        )
        .define(
            KSQL_PROPERTIES_OVERRIDES_DENYLIST,
            Type.LIST,
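
For context on how the two settings defined above resolve at runtime, here is a minimal, hedged sketch (not part of the patch; the class name and the override values are illustrative only). It builds a KsqlConfig from override properties and reads the settings back with the same typed getters the planner and the suppress builder use.

    import com.google.common.collect.ImmutableMap;
    import io.confluent.ksql.util.KsqlConfig;

    public final class SuppressConfigResolutionExample {

      public static void main(final String[] args) {
        // Hypothetical overrides: re-disable EMIT FINAL and cap the suppression buffer at 10 bytes.
        final KsqlConfig config = new KsqlConfig(ImmutableMap.of(
            KsqlConfig.KSQL_SUPPRESS_ENABLED, false,
            KsqlConfig.KSQL_SUPPRESS_BUFFER_SIZE, 10L
        ));

        // With this patch the defaults are enabled=true and buffer size=-1 (unbounded);
        // overrides are read back through the getters used in LogicalPlanner and TableSuppressBuilder.
        System.out.println(config.getBoolean(KsqlConfig.KSQL_SUPPRESS_ENABLED)); // false
        System.out.println(config.getLong(KsqlConfig.KSQL_SUPPRESS_BUFFER_SIZE)); // 10
      }
    }
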
diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java b/ksqldb-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java
index 76de6db87d03..f7abf3702ec8 100644
--- a/ksqldb-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java
+++ b/ksqldb-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java
@@ -110,7 +110,9 @@ public LogicalPlanner(
     this.aggregateAnalyzer = new AggregateAnalyzer(functionRegistry);
   }
 
+  // CHECKSTYLE_RULES.OFF: CyclomaticComplexity
   public OutputNode buildPlan() {
+    // CHECKSTYLE_RULES.ON: CyclomaticComplexity
     PlanNode currentNode = buildSourceNode();
 
     if (analysis.getWhereExpression().isPresent()) {
@@ -143,6 +145,10 @@ public OutputNode buildPlan() {
 
     if (analysis.getRefinementInfo().isPresent()
         && analysis.getRefinementInfo().get().getOutputRefinement() == OutputRefinement.FINAL) {
+      if (!ksqlConfig.getBoolean(KsqlConfig.KSQL_SUPPRESS_ENABLED)) {
+        throw new KsqlException("Suppression is currently disabled. You can enable it by setting "
+            + KsqlConfig.KSQL_SUPPRESS_ENABLED + " to true");
+      }
      if (!(analysis.getGroupBy().isPresent() && analysis.getWindowExpression().isPresent())) {
        throw new KsqlException("EMIT FINAL is only supported for windowed aggregations.");
      }
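
The guard added above yields two distinct errors for EMIT FINAL. As a hedged illustration only (the helper class, method name, and boolean parameter are hypothetical, not ksqlDB code), the same precedence can be written as a standalone check:

    import io.confluent.ksql.util.KsqlConfig;
    import io.confluent.ksql.util.KsqlException;

    final class EmitFinalGuardExample {

      private EmitFinalGuardExample() {
      }

      // Mirrors the order of the checks in buildPlan(): the feature flag is evaluated first,
      // then the shape of the query.
      static void validateEmitFinal(final KsqlConfig config, final boolean isWindowedAggregation) {
        if (!config.getBoolean(KsqlConfig.KSQL_SUPPRESS_ENABLED)) {
          throw new KsqlException("Suppression is currently disabled. You can enable it by setting "
              + KsqlConfig.KSQL_SUPPRESS_ENABLED + " to true");
        }
        if (!isWindowedAggregation) {
          throw new KsqlException("EMIT FINAL is only supported for windowed aggregations.");
        }
      }
    }
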
diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/planner/LogicalPlannerTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/planner/LogicalPlannerTest.java
index 517b97514e38..9ad39470701b 100644
--- a/ksqldb-engine/src/test/java/io/confluent/ksql/planner/LogicalPlannerTest.java
+++ b/ksqldb-engine/src/test/java/io/confluent/ksql/planner/LogicalPlannerTest.java
@@ -22,7 +22,6 @@
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsString;
 import static org.junit.Assert.assertThrows;
-
 import io.confluent.ksql.execution.expression.tree.ComparisonExpression;
 import io.confluent.ksql.execution.expression.tree.ComparisonExpression.Type;
 import io.confluent.ksql.execution.expression.tree.UnqualifiedColumnReferenceExp;
@@ -45,6 +44,7 @@
 import io.confluent.ksql.util.KsqlException;
 import io.confluent.ksql.util.MetaStoreFixture;
 import java.util.Collections;
+
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -59,8 +59,10 @@ public class LogicalPlannerTest {
   public void init() {
     metaStore = MetaStoreFixture.getNewMetaStore(TestFunctionRegistry.INSTANCE.get());
     ksqlConfig = new KsqlConfig(Collections.emptyMap());
+
   }
 
+
   @Test
   public void shouldCreatePlanWithTableAsSource() {
     final PlanNode planNode = buildLogicalPlan("select col0 from TEST2 EMIT CHANGES limit 5;");
@@ -309,6 +311,7 @@ public void shouldCreateTableOutputForTableSuppress() {
     final PlanNode logicalPlan = buildLogicalPlan(simpleQuery);
     assertThat(logicalPlan.getNodeOutputType(), equalTo(DataSourceType.KTABLE));
   }
+
   @Test
   public void shouldThrowOnNonWindowedAggregationSuppressions() {
     final String simpleQuery = "SELECT * FROM test2 EMIT FINAL;";
@@ -319,6 +322,23 @@ public void shouldThrowOnNonWindowedAggregationSuppressions() {
 
     assertThat(e.getMessage(), containsString("EMIT FINAL is only supported for windowed aggregations."));
   }
+
+  @Test
+  public void shouldThrowOnSuppressDisabledInConfig() {
+    // Given:
+    KsqlConfig ksqlConfigSuppressDisabled = new KsqlConfig(Collections.singletonMap(KsqlConfig.KSQL_SUPPRESS_ENABLED, false));
+    final String simpleQuery = "SELECT col1,COUNT(*) as COUNT FROM test2 WINDOW TUMBLING (SIZE 2 MILLISECONDS, GRACE PERIOD 1 MILLISECONDS) GROUP BY col1 EMIT FINAL;";
+
+    // When:
+    final Exception e = assertThrows(
+        KsqlException.class,
+        () -> AnalysisTestUtil.buildLogicalPlan(ksqlConfigSuppressDisabled, simpleQuery, metaStore)
+    );
+
+    // Then:
+    assertThat(e.getMessage(), containsString("Suppression is currently disabled. You can enable it by setting ksql.suppress.enabled to true"));
+  }
+
   private PlanNode buildLogicalPlan(final String query) {
     return AnalysisTestUtil.buildLogicalPlan(ksqlConfig, query, metaStore);
   }
diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/suppress.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/suppress.json
index f70258eb35f2..31af15e8f81e 100644
--- a/ksqldb-functional-tests/src/test/resources/query-validation-tests/suppress.json
+++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/suppress.json
@@ -289,6 +289,28 @@
         {"topic": "OUTPUT", "key": "k1", "value": {"COUNT": 1},"window": {"start": 2, "end": 4, "type": "time"},"timestamp": 2}
       ]
     },
+    {
+      "name": "should throw when max buffer size is exceeded",
+      "properties": {
+        "ksql.suppress.buffer.size": 10
+      },
+      "statements": [
+        "CREATE STREAM INPUT (ID STRING KEY, COL1 STRING) WITH (kafka_topic='input_topic',value_format='JSON');",
+        "CREATE TABLE OUTPUT AS SELECT ID, COUNT(*) as COUNT FROM INPUT WINDOW TUMBLING (SIZE 2 MILLISECONDS, GRACE PERIOD 1 MILLISECONDS) GROUP BY ID EMIT FINAL;"
+      ],
+      "inputs": [
+        {"topic": "input_topic", "key": "k1", "value": {"COL1": 1},"timestamp": 0},
+        {"topic": "input_topic", "key": "k1", "value": {"COL1": 4},"timestamp": 1},
+        {"topic": "input_topic", "key": "k1", "value": {"COL1": 1},"timestamp": 2},
+        {"topic": "input_topic", "key": "k1", "value": {"COL1": 3},"timestamp": 1},
+        {"topic": "input_topic", "key": "k1", "value": {"COL1": 2},"timestamp": 0},
+        {"topic": "input_topic", "key": "k1", "value": {"COL1": 1},"timestamp": 5}
+      ],
+      "expectedException": {
+        "type": "org.apache.kafka.streams.errors.StreamsException",
+        "message": "Suppress buffer exceeded its max capacity."
+      }
+    },
     {
       "name": "should throw on non windowed tables",
       "statements": [
diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/TableSuppressBuilder.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/TableSuppressBuilder.java
index 1ee481ab5ad6..35a8d776b2c2 100644
--- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/TableSuppressBuilder.java
+++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/TableSuppressBuilder.java
@@ -26,8 +26,10 @@
 import io.confluent.ksql.schema.ksql.LogicalSchema;
 import io.confluent.ksql.schema.ksql.PhysicalSchema;
 import io.confluent.ksql.serde.SerdeOption;
+import io.confluent.ksql.util.KsqlConfig;
 import java.util.Set;
 import java.util.function.BiFunction;
+
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.KTable;
@@ -91,6 +93,19 @@ KTableHolder build(
         keySerde,
         valueSerde
     );
+
+    final Suppressed.StrictBufferConfig strictBufferConfig;
+    final long maxBytes = queryBuilder.getKsqlConfig().getLong(
+        KsqlConfig.KSQL_SUPPRESS_BUFFER_SIZE);
+
+    if (maxBytes < 0) {
+      strictBufferConfig = Suppressed.BufferConfig.unbounded();
+    } else {
+      strictBufferConfig = Suppressed.BufferConfig
+          .maxBytes(maxBytes)
+          .shutDownWhenFull();
+    }
+
     /* This is a dummy transformValues() call, we do this to ensure that the correct materialized
     with the correct key and val serdes is passed on when we call suppress
     */
@@ -99,7 +114,7 @@ KTableHolder build(
         materialized
     ).suppress(
         (Suppressed) Suppressed
-            .untilWindowCloses(Suppressed.BufferConfig.unbounded())
+            .untilWindowCloses(strictBufferConfig)
             .withName(SUPPRESS_OP_NAME)
     );
 
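
Before the test-side changes, it helps to see how the new branch above maps onto the Kafka Streams Suppressed API. This is a minimal, standalone sketch (class and method names are illustrative, not part of the patch): a negative configured size keeps the previous unbounded buffer, anything else becomes a strict bound that shuts the query down when exceeded.

    import org.apache.kafka.streams.kstream.Suppressed;

    final class SuppressBufferConfigExample {

      private SuppressBufferConfigExample() {
      }

      // Same decision as the new code in TableSuppressBuilder.
      static Suppressed.StrictBufferConfig bufferConfigFor(final long maxBytes) {
        if (maxBytes < 0) {
          // ksql.suppress.buffer.size defaults to -1, i.e. unbounded (previous behaviour).
          return Suppressed.BufferConfig.unbounded();
        }
        // A non-negative size caps the buffer; overflowing it stops the query with a
        // StreamsException rather than emitting results early.
        return Suppressed.BufferConfig
            .maxBytes(maxBytes)
            .shutDownWhenFull();
      }
    }
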
diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/TableSuppressBuilderTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/TableSuppressBuilderTest.java
index f3dff447a01c..d697784ff2b0 100644
--- a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/TableSuppressBuilderTest.java
+++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/TableSuppressBuilderTest.java
@@ -63,6 +63,8 @@ public class TableSuppressBuilderTest {
   @Mock
   private KsqlQueryBuilder queryBuilder;
   @Mock
+  private KsqlConfig ksqlConfig;
+  @Mock
   private ExecutionStep> sourceStep;
   @Mock
   private KTable sourceKTable;
@@ -94,9 +96,11 @@ public class TableSuppressBuilderTest {
   private final QueryContext queryContext = new QueryContext.Stacker()
       .push("bar")
       .getQueryContext();
+
   private TableSuppress tableSuppress;
   private BiFunction, PhysicalSchema> physicalSchemaFactory;
   private BiFunction, Serde, Materialized> materializedFactory;
+  private Long maxBytes = 300L;
   private TableSuppressBuilder builder;
 
   @Rule
@@ -112,6 +116,9 @@ public void init() {
     when(queryBuilder.buildValueSerde(any(), any(), any())).thenReturn(valueSerde);
     when(keySerdeFactory.buildKeySerde(any(), any(), any())).thenReturn(keySerde);
 
+    when(queryBuilder.getKsqlConfig()).thenReturn(ksqlConfig);
+    when(ksqlConfig.getLong(any())).thenReturn(maxBytes);
+
     when(tableHolder.getTable()).thenReturn(sourceKTable);
     when(sourceKTable.transformValues(any(), any(Materialized.class))).thenReturn(preKTable);
     when(preKTable.suppress(any())).thenReturn(suppressedKTable);
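
For readers who want to see the overflow behaviour the new functional test asserts, here is a hedged, standalone Kafka Streams sketch. It is not ksqlDB code: it only approximates the topology the CTAS in suppress.json generates, the topic names are taken from that test, and the broker address and application id are assumptions. With a 10-byte strict buffer the suppress node overflows almost immediately and the application shuts down with a StreamsException.

    import java.time.Duration;
    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Grouped;
    import org.apache.kafka.streams.kstream.Produced;
    import org.apache.kafka.streams.kstream.Suppressed;
    import org.apache.kafka.streams.kstream.TimeWindows;

    public final class SuppressOverflowExample {

      public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        builder.stream("input_topic", Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(TimeWindows.of(Duration.ofMillis(2)).grace(Duration.ofMillis(1)))
            .count()
            // Rough equivalent of ksql.suppress.buffer.size=10 after this patch: a strict
            // 10-byte buffer that shuts the query down instead of emitting early.
            .suppress(Suppressed.untilWindowCloses(
                Suppressed.BufferConfig.maxBytes(10L).shutDownWhenFull()))
            .toStream((windowedKey, count) -> windowedKey.key())
            .to("output_topic", Produced.with(Serdes.String(), Serdes.Long()));

        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "suppress-overflow-example"); // assumed id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        new KafkaStreams(builder.build(), props).start();
      }
    }
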