From b25dd987b219dcb8a3105beb7251503828ade023 Mon Sep 17 00:00:00 2001 From: Almog Gavra Date: Wed, 20 May 2020 09:37:12 -0700 Subject: [PATCH] feat: introduce RegexClassifier for classifying errors via cfg (#5412) --- .../io/confluent/ksql/util/KsqlConfig.java | 19 ++++ .../ksql/query/MissingTopicClassifier.java | 2 +- .../confluent/ksql/query/QueryExecutor.java | 41 +++++++- .../confluent/ksql/query/RegexClassifier.java | 89 +++++++++++++++++ .../ksql/query/RegexClassifierTest.java | 96 +++++++++++++++++++ 5 files changed, 242 insertions(+), 5 deletions(-) create mode 100644 ksqldb-engine/src/main/java/io/confluent/ksql/query/RegexClassifier.java create mode 100644 ksqldb-engine/src/test/java/io/confluent/ksql/query/RegexClassifierTest.java diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java index 1837d1d8deff..64924f42df2b 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java @@ -27,7 +27,9 @@ import io.confluent.ksql.errors.ProductionExceptionHandlerUtil; import io.confluent.ksql.logging.processing.ProcessingLogConfig; import io.confluent.ksql.model.SemanticVersion; +import io.confluent.ksql.query.QueryError; import io.confluent.ksql.testing.EffectivelyImmutable; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -262,6 +264,16 @@ public class KsqlConfig extends AbstractConfig { + "behavior, and instead throw an exception to ensure that no data is missed, set " + "ksql.timestamp.skip.invalid to true."; + public static final String KSQL_ERROR_CLASSIFIER_REGEX_PREFIX = "ksql.error.classifier.regex"; + public static final String KSQL_ERROR_CLASSIFIER_REGEX_PREFIX_DOC = "Any configuration with the " + + "regex prefix will create a new classifier that will be configured to classify anything " + + "that matches 
the content as the specified type. The value must match " + + " (for example " + KSQL_ERROR_CLASSIFIER_REGEX_PREFIX + ".invalid" + + "=\"USER .*InvalidTopicException.*\"). The type can be one of " + + GrammaticalJoiner.or().join(Arrays.stream(QueryError.Type.values())) + + " and the regex pattern will be matched against the error class name and message of any " + + "uncaught error and subsequent error causes in the Kafka Streams applications."; + private enum ConfigGeneration { LEGACY, CURRENT @@ -624,6 +636,13 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) { Importance.LOW, KSQL_QUERY_PULL_MAX_QPS_DOC ) + .define( + KSQL_ERROR_CLASSIFIER_REGEX_PREFIX, + Type.STRING, + "", + Importance.LOW, + KSQL_ERROR_CLASSIFIER_REGEX_PREFIX_DOC + ) .withClientSslSupport(); for (final CompatibilityBreakingConfigDef compatibilityBreakingConfigDef diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/MissingTopicClassifier.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/MissingTopicClassifier.java index 6f33a9b42599..8b2462b0c46b 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/query/MissingTopicClassifier.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/query/MissingTopicClassifier.java @@ -48,7 +48,7 @@ public MissingTopicClassifier( @Override public Type classify(final Throwable e) { - LOG.warn("Attempting to classify error for {}", queryId, e); + LOG.info("Attempting to classify error for {}", queryId); for (String requiredTopic : requiredTopics) { if (!topicClient.isTopicExists(requiredTopic)) { LOG.warn("Query {} requires topic {} which cannot be found.", queryId, requiredTopic); diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java index d8836819ee95..c856bfaadef8 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java +++ 
b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java @@ -17,7 +17,9 @@ import static io.confluent.ksql.util.KsqlConfig.KSQL_SHUTDOWN_TIMEOUT_MS_CONFIG; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; import io.confluent.ksql.GenericRow; import io.confluent.ksql.errors.ProductionExceptionHandlerUtil; import io.confluent.ksql.execution.builder.KsqlQueryBuilder; @@ -230,6 +232,14 @@ public PersistentQueryMetadata buildQuery( applicationId )); + final QueryErrorClassifier topicClassifier = new MissingTopicClassifier( + applicationId, + extractTopics(built.topology), + serviceContext.getTopicClient()); + final QueryErrorClassifier classifier = buildConfiguredClassifiers(ksqlConfig, applicationId) + .map(topicClassifier::and) + .orElse(topicClassifier); + return new PersistentQueryMetadata( statementText, built.kafkaStreams, @@ -248,10 +258,7 @@ public PersistentQueryMetadata buildQuery( overrides, queryCloseCallback, ksqlConfig.getLong(KSQL_SHUTDOWN_TIMEOUT_MS_CONFIG), - new MissingTopicClassifier( - applicationId, - extractTopics(built.topology), - serviceContext.getTopicClient())); + classifier); } private TransientQueryQueue buildTransientQueryQueue( @@ -317,6 +324,32 @@ private Map buildStreamsProperties( return newStreamsProperties; } + private static Optional buildConfiguredClassifiers( + final KsqlConfig cfg, + final String queryId + ) { + final Map regexPrefixes = cfg.originalsWithPrefix( + KsqlConfig.KSQL_ERROR_CLASSIFIER_REGEX_PREFIX + ); + + final ImmutableList.Builder builder = ImmutableList.builder(); + for (final Object value : regexPrefixes.values()) { + final String classifier = (String) value; + builder.add(RegexClassifier.fromConfig(classifier, queryId)); + } + final ImmutableList classifiers = builder.build(); + + if (classifiers.isEmpty()) { + return Optional.empty(); + } + + QueryErrorClassifier combined = Iterables.get(classifiers, 0); + 
for (final QueryErrorClassifier classifier : Iterables.skip(classifiers, 1)) { + combined = combined.and(classifier); + } + return Optional.ofNullable(combined); + } + private static Set extractTopics(final Topology topology) { final Set usedTopics = new HashSet<>(); for (final Subtopology subtopology : topology.describe().subtopologies()) { diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/RegexClassifier.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/RegexClassifier.java new file mode 100644 index 000000000000..2f96e813a183 --- /dev/null +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/query/RegexClassifier.java @@ -0,0 +1,89 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.query; + +import io.confluent.ksql.query.QueryError.Type; +import java.util.Objects; +import java.util.regex.Pattern; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The {@code RegexClassifier} classifies errors based on regex patterns of + * the stack trace. This class is intended as "last resort" so that noisy + * errors can be classified without deploying a new version of code. 
+ */
+public final class RegexClassifier implements QueryErrorClassifier {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RegexClassifier.class);
+
+  private final Pattern pattern;
+  private final Type type;
+  private final String queryId;
+
+  /**
+   * Builds a classifier from a config value of the form {@code <TYPE> <PATTERN>}, for
+   * example {@code USER .*InvalidTopicException.*}. A value with no pattern, an unknown
+   * type or an invalid regex is logged and yields a classifier that always returns UNKNOWN.
+   *
+   * @param config the configuration specifying the type and pattern
+   * @param queryId the id of the query, used in log messages
+   * @return the classifier
+   */
+  public static QueryErrorClassifier fromConfig(final String config, final String queryId) {
+    final String[] split = config.split("\\s", 2);
+    try {
+      if (split.length == 2) {
+        final Type errorType = Type.valueOf(split[0].toUpperCase(java.util.Locale.ROOT));
+        return new RegexClassifier(errorType, Pattern.compile(split[1], Pattern.DOTALL), queryId);
+      }
+      LOG.warn("Ignoring invalid configuration for RegexClassifier: {}", config);
+    } catch (final IllegalArgumentException e) {
+      // Unknown type name, or an invalid regex (PatternSyntaxException is a subclass).
+      LOG.warn("Ignoring invalid configuration for RegexClassifier: {}", config, e);
+    }
+    return err -> Type.UNKNOWN;
+  }
+
+  private RegexClassifier(final Type type, final Pattern pattern, final String queryId) {
+    this.type = Objects.requireNonNull(type, "type");
+    this.pattern = Objects.requireNonNull(pattern, "pattern");
+    this.queryId = Objects.requireNonNull(queryId, "queryId");
+  }
+
+  /**
+   * Returns the configured type if {@code e}, or any throwable in its cause chain, has a
+   * class name or message that fully matches the pattern; otherwise returns UNKNOWN.
+   */
+  @Override
+  public Type classify(final Throwable e) {
+    LOG.info("Attempting to classify for {} under regex pattern {}.", queryId, pattern);
+    for (Throwable error = e; error != null; error = error.getCause()) {
+      if (matches(error)) {
+        LOG.warn("Classified error for queryId {} under regex pattern {} as type {}.",
+            queryId, pattern, type);
+        return type;
+      }
+    }
+    return Type.UNKNOWN;
+  }
+
+  private boolean matches(final Throwable e) {
+    final String message = e.getMessage(); // null when the throwable carries no message
+    return pattern.matcher(e.getClass().getName()).matches()
+        || (message != null && pattern.matcher(message).matches());
+  }
+}
diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/query/RegexClassifierTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/query/RegexClassifierTest.java
new file mode 100644
index 000000000000..f42c9b999a59
--- 
/dev/null
+++ b/ksqldb-engine/src/test/java/io/confluent/ksql/query/RegexClassifierTest.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2020 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"; you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.query;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.mockito.Mockito.when;
+
+import io.confluent.ksql.query.QueryError.Type;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+/** Unit tests for classifiers built by {@link RegexClassifier#fromConfig}. */
+@RunWith(MockitoJUnitRunner.class)
+public class RegexClassifierTest {
+
+  @Mock
+  private Throwable error;
+  @Mock
+  private Throwable cause;
+
+  @Test
+  public void shouldClassifyWithRegex() {
+    // Given:
+    final QueryErrorClassifier classifier = RegexClassifier.fromConfig("USER .*foo.*", "id");
+    givenMessage(error, "foo");
+
+    // When:
+    final Type type = classifier.classify(error);
+
+    // Then:
+    assertThat(type, is(Type.USER));
+  }
+
+  @Test
+  public void shouldClassifyWithRegexInCause() {
+    // Given:
+    final QueryErrorClassifier classifier = RegexClassifier.fromConfig("USER .*foo.*", "id");
+    when(error.getCause()).thenReturn(cause);
+    givenMessage(error, "bar");
+    givenMessage(cause, "foo");
+
+    // When:
+    final Type type = classifier.classify(error);
+
+    // Then:
+    assertThat(type, is(Type.USER));
+  }
+
+  @Test
+  public void shouldClassifyAsUnknownIfBadRegex() {
+    // Given:
+    final QueryErrorClassifier classifier = RegexClassifier.fromConfig("USER", "id");
+    // No stubbing here: the fallback classifier for a bad config never inspects the error.
+
+    // When:
+    final Type type = classifier.classify(error);
+
+    // Then:
+    assertThat(type, is(Type.UNKNOWN));
+  }
+
+  @Test
+  public void shouldClassifyAsUnknown() {
+    // Given:
+    final QueryErrorClassifier classifier = RegexClassifier.fromConfig("USER .*foo.*", "id");
+    when(error.getCause()).thenReturn(cause);
+    givenMessage(error, "bar");
+    givenMessage(cause, "baz");
+
+    // When:
+    final Type type = classifier.classify(error);
+
+    // Then:
+    assertThat(type, is(Type.UNKNOWN));
+  }
+
+  private void givenMessage(final Throwable error, final String message) {
+    when(error.getMessage()).thenReturn(message);
+  }
+}
\ No newline at end of file