Skip to content

Commit

Permalink
feat: add config for suppress buffer (#5969)
Browse files Browse the repository at this point in the history
* fix: add config to disable suppress andbound buffer size

* fix: add suppress feature flag and buffer configs

* fix: fix checkstyle

* fix: fix checstyle violation

* fix: address feedback

* fix: add commmit suggestions

* fix: import order

* test: fix tablesuppressbuilder test

* test: fix logical planner test
  • Loading branch information
nae701 committed Aug 9, 2020
1 parent cce2c10 commit 53b0d4d
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 3 deletions.
Expand Up @@ -304,10 +304,16 @@ public class KsqlConfig extends AbstractConfig {
+ "KSQL metastore backup files are located.";

public static final String KSQL_SUPPRESS_ENABLED = "ksql.suppress.enabled";
public static final Boolean KSQL_SUPPRESS_ENABLED_DEFAULT = false;
public static final Boolean KSQL_SUPPRESS_ENABLED_DEFAULT = true;
public static final String KSQL_SUPPRESS_ENABLED_DOC =
"Feature flag for suppression, specifically EMIT FINAL";

public static final String KSQL_SUPPRESS_BUFFER_SIZE = "ksql.suppress.buffer.size";
public static final Long KSQL_SUPPRESS_BUFFER_SIZE_DEFAULT = -1L;
public static final String KSQL_SUPPRESS_BUFFER_SIZE_DOC = "Bound the size of the buffer used "
+ "for suppression. Negative size means the buffer will be unbounded. If the buffer exceeds "
+ "its max capacity, a StreamsException stating this is thrown";

// Defaults for config NOT defined by this class's ConfigDef:
static final ImmutableMap<String, ?> NON_KSQL_DEFAULTS = ImmutableMap
.<String, Object>builder()
Expand Down Expand Up @@ -767,6 +773,13 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
Importance.LOW,
KSQL_QUERY_ERROR_MAX_QUEUE_SIZE_DOC
)
.define(
KSQL_SUPPRESS_BUFFER_SIZE,
Type.LONG,
KSQL_SUPPRESS_BUFFER_SIZE_DEFAULT,
Importance.LOW,
KSQL_SUPPRESS_BUFFER_SIZE_DOC
)
.define(
KSQL_PROPERTIES_OVERRIDES_DENYLIST,
Type.LIST,
Expand Down
Expand Up @@ -110,7 +110,9 @@ public LogicalPlanner(
this.aggregateAnalyzer = new AggregateAnalyzer(functionRegistry);
}

// CHECKSTYLE_RULES.OFF: CyclomaticComplexity
public OutputNode buildPlan() {
// CHECKSTYLE_RULES.ON: CyclomaticComplexity
PlanNode currentNode = buildSourceNode();

if (analysis.getWhereExpression().isPresent()) {
Expand Down Expand Up @@ -143,6 +145,10 @@ public OutputNode buildPlan() {

if (analysis.getRefinementInfo().isPresent()
&& analysis.getRefinementInfo().get().getOutputRefinement() == OutputRefinement.FINAL) {
if (!ksqlConfig.getBoolean(KsqlConfig.KSQL_SUPPRESS_ENABLED)) {
throw new KsqlException("Suppression is currently disabled. You can enable it by setting "
+ KsqlConfig.KSQL_SUPPRESS_ENABLED + " to true");
}
if (!(analysis.getGroupBy().isPresent() && analysis.getWindowExpression().isPresent())) {
throw new KsqlException("EMIT FINAL is only supported for windowed aggregations.");
}
Expand Down
Expand Up @@ -22,7 +22,6 @@
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertThrows;

import io.confluent.ksql.execution.expression.tree.ComparisonExpression;
import io.confluent.ksql.execution.expression.tree.ComparisonExpression.Type;
import io.confluent.ksql.execution.expression.tree.UnqualifiedColumnReferenceExp;
Expand All @@ -45,6 +44,7 @@
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.MetaStoreFixture;
import java.util.Collections;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -59,8 +59,10 @@ public class LogicalPlannerTest {
public void init() {
metaStore = MetaStoreFixture.getNewMetaStore(TestFunctionRegistry.INSTANCE.get());
ksqlConfig = new KsqlConfig(Collections.emptyMap());

}


@Test
public void shouldCreatePlanWithTableAsSource() {
final PlanNode planNode = buildLogicalPlan("select col0 from TEST2 EMIT CHANGES limit 5;");
Expand Down Expand Up @@ -309,6 +311,7 @@ public void shouldCreateTableOutputForTableSuppress() {
final PlanNode logicalPlan = buildLogicalPlan(simpleQuery);
assertThat(logicalPlan.getNodeOutputType(), equalTo(DataSourceType.KTABLE));
}

@Test
public void shouldThrowOnNonWindowedAggregationSuppressions() {
final String simpleQuery = "SELECT * FROM test2 EMIT FINAL;";
Expand All @@ -319,6 +322,23 @@ public void shouldThrowOnNonWindowedAggregationSuppressions() {

assertThat(e.getMessage(), containsString("EMIT FINAL is only supported for windowed aggregations."));
}

@Test
public void shouldThrowOnSuppressDisabledInConfig() {
// Given:
KsqlConfig ksqlConfigSuppressDisabled = new KsqlConfig(Collections.singletonMap(KsqlConfig.KSQL_SUPPRESS_ENABLED, false));
final String simpleQuery = "SELECT col1,COUNT(*) as COUNT FROM test2 WINDOW TUMBLING (SIZE 2 MILLISECONDS, GRACE PERIOD 1 MILLISECONDS) GROUP BY col1 EMIT FINAL;";

// When:
final Exception e = assertThrows(
KsqlException.class,
() -> AnalysisTestUtil.buildLogicalPlan(ksqlConfigSuppressDisabled, simpleQuery, metaStore)
);

// Then:
assertThat(e.getMessage(), containsString("Suppression is currently disabled. You can enable it by setting ksql.suppress.enabled to true"));
}

private PlanNode buildLogicalPlan(final String query) {
return AnalysisTestUtil.buildLogicalPlan(ksqlConfig, query, metaStore);
}
Expand Down
Expand Up @@ -289,6 +289,28 @@
{"topic": "OUTPUT", "key": "k1", "value": {"COUNT": 1},"window": {"start": 2, "end": 4, "type": "time"},"timestamp": 2}
]
},
{
"name": "should throw when max buffer size is exceeded",
"properties": {
"ksql.suppress.buffer.size": 10
},
"statements": [
"CREATE STREAM INPUT (ID STRING KEY, COL1 STRING) WITH (kafka_topic='input_topic',value_format='JSON');",
"CREATE TABLE OUTPUT AS SELECT ID, COUNT(*) as COUNT FROM INPUT WINDOW TUMBLING (SIZE 2 MILLISECONDS, GRACE PERIOD 1 MILLISECONDS) GROUP BY ID EMIT FINAL;"
],
"inputs": [
{"topic": "input_topic", "key": "k1", "value": {"COL1": 1},"timestamp": 0},
{"topic": "input_topic", "key": "k1", "value": {"COL1": 4},"timestamp": 1},
{"topic": "input_topic", "key": "k1", "value": {"COL1": 1},"timestamp": 2},
{"topic": "input_topic", "key": "k1", "value": {"COL1": 3},"timestamp": 1},
{"topic": "input_topic", "key": "k1", "value": {"COL1": 2},"timestamp": 0},
{"topic": "input_topic", "key": "k1", "value": {"COL1": 1},"timestamp": 5}
],
"expectedException": {
"type": "org.apache.kafka.streams.errors.StreamsException",
"message": "Suppress buffer exceeded its max capacity."
}
},
{
"name": "should throw on non windowed tables",
"statements": [
Expand Down
Expand Up @@ -26,8 +26,10 @@
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.PhysicalSchema;
import io.confluent.ksql.serde.SerdeOption;
import io.confluent.ksql.util.KsqlConfig;
import java.util.Set;
import java.util.function.BiFunction;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.KTable;
Expand Down Expand Up @@ -91,6 +93,19 @@ <K> KTableHolder<K> build(
keySerde,
valueSerde
);

final Suppressed.StrictBufferConfig strictBufferConfig;
final long maxBytes = queryBuilder.getKsqlConfig().getLong(
KsqlConfig.KSQL_SUPPRESS_BUFFER_SIZE);

if (maxBytes < 0) {
strictBufferConfig = Suppressed.BufferConfig.unbounded();
} else {
strictBufferConfig = Suppressed.BufferConfig
.maxBytes(maxBytes)
.shutDownWhenFull();
}

/* This is a dummy transformValues() call, we do this to ensure that the correct materialized
with the correct key and val serdes is passed on when we call suppress
*/
Expand All @@ -99,7 +114,7 @@ <K> KTableHolder<K> build(
materialized
).suppress(
(Suppressed<? super K>) Suppressed
.untilWindowCloses(Suppressed.BufferConfig.unbounded())
.untilWindowCloses(strictBufferConfig)
.withName(SUPPRESS_OP_NAME)
);

Expand Down
Expand Up @@ -63,6 +63,8 @@ public class TableSuppressBuilderTest {
@Mock
private KsqlQueryBuilder queryBuilder;
@Mock
private KsqlConfig ksqlConfig;
@Mock
private ExecutionStep<KTableHolder<Struct>> sourceStep;
@Mock
private KTable<Struct, GenericRow> sourceKTable;
Expand Down Expand Up @@ -94,9 +96,11 @@ public class TableSuppressBuilderTest {
private final QueryContext queryContext = new QueryContext.Stacker()
.push("bar")
.getQueryContext();

private TableSuppress<Struct> tableSuppress;
private BiFunction<LogicalSchema, Set<SerdeOption>, PhysicalSchema> physicalSchemaFactory;
private BiFunction<Serde<Struct>, Serde<GenericRow>, Materialized> materializedFactory;
private Long maxBytes = 300L;
private TableSuppressBuilder builder;

@Rule
Expand All @@ -112,6 +116,9 @@ public void init() {

when(queryBuilder.buildValueSerde(any(), any(), any())).thenReturn(valueSerde);
when(keySerdeFactory.buildKeySerde(any(), any(), any())).thenReturn(keySerde);
when(queryBuilder.getKsqlConfig()).thenReturn(ksqlConfig);
when(ksqlConfig.getLong(any())).thenReturn(maxBytes);

when(tableHolder.getTable()).thenReturn(sourceKTable);
when(sourceKTable.transformValues(any(), any(Materialized.class))).thenReturn(preKTable);
when(preKTable.suppress(any())).thenReturn(suppressedKTable);
Expand Down

0 comments on commit 53b0d4d

Please sign in to comment.