Skip to content

Commit

Permalink
feat: introduce RegexClassifier for classifying errors via cfg (#5412)
Browse files Browse the repository at this point in the history
  • Loading branch information
agavra committed May 20, 2020
1 parent 0189e0a commit b25dd98
Show file tree
Hide file tree
Showing 5 changed files with 242 additions and 5 deletions.
19 changes: 19 additions & 0 deletions ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import io.confluent.ksql.errors.ProductionExceptionHandlerUtil;
import io.confluent.ksql.logging.processing.ProcessingLogConfig;
import io.confluent.ksql.model.SemanticVersion;
import io.confluent.ksql.query.QueryError;
import io.confluent.ksql.testing.EffectivelyImmutable;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -262,6 +264,16 @@ public class KsqlConfig extends AbstractConfig {
+ "behavior, and instead throw an exception to ensure that no data is missed, set "
+ "ksql.timestamp.skip.invalid to true.";

/**
 * Key prefix for regex-based error classifier configs. Per the doc string below, each
 * config entry carrying this prefix defines one classifier.
 */
public static final String KSQL_ERROR_CLASSIFIER_REGEX_PREFIX = "ksql.error.classifier.regex";
/**
 * Help text for {@link #KSQL_ERROR_CLASSIFIER_REGEX_PREFIX}. It describes the expected
 * value format ({@code <TYPE><whitespace><REGEX>}), gives an example entry and lists the
 * accepted type names.
 */
public static final String KSQL_ERROR_CLASSIFIER_REGEX_PREFIX_DOC = "Any configuration with the "
+ "regex prefix will create a new classifier that will be configured to classify anything "
+ "that matches the content as the specified type. The value must match "
+ "<TYPE><whitespace><REGEX> (for example " + KSQL_ERROR_CLASSIFIER_REGEX_PREFIX + ".invalid"
+ "=\"USER .*InvalidTopicException.*\"). The type can be one of "
// The accepted type names are generated from the QueryError.Type enum constants rather than
// being hard-coded, so the text follows the enum. NOTE(review): the exact rendering depends
// on GrammaticalJoiner.or(), which is not visible here - confirm.
+ GrammaticalJoiner.or().join(Arrays.stream(QueryError.Type.values()))
+ " and the regex pattern will be matched against the error class name and message of any "
+ "uncaught error and subsequent error causes in the Kafka Streams applications.";

private enum ConfigGeneration {
LEGACY,
CURRENT
Expand Down Expand Up @@ -624,6 +636,13 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
Importance.LOW,
KSQL_QUERY_PULL_MAX_QPS_DOC
)
.define(
KSQL_ERROR_CLASSIFIER_REGEX_PREFIX,
Type.STRING,
"",
Importance.LOW,
KSQL_ERROR_CLASSIFIER_REGEX_PREFIX_DOC
)
.withClientSslSupport();

for (final CompatibilityBreakingConfigDef compatibilityBreakingConfigDef
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public MissingTopicClassifier(

@Override
public Type classify(final Throwable e) {
LOG.warn("Attempting to classify error for {}", queryId, e);
LOG.info("Attempting to classify error for {}", queryId);
for (String requiredTopic : requiredTopics) {
if (!topicClient.isTopicExists(requiredTopic)) {
LOG.warn("Query {} requires topic {} which cannot be found.", queryId, requiredTopic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

import static io.confluent.ksql.util.KsqlConfig.KSQL_SHUTDOWN_TIMEOUT_MS_CONFIG;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.errors.ProductionExceptionHandlerUtil;
import io.confluent.ksql.execution.builder.KsqlQueryBuilder;
Expand Down Expand Up @@ -230,6 +232,14 @@ public PersistentQueryMetadata buildQuery(
applicationId
));

final QueryErrorClassifier topicClassifier = new MissingTopicClassifier(
applicationId,
extractTopics(built.topology),
serviceContext.getTopicClient());
final QueryErrorClassifier classifier = buildConfiguredClassifiers(ksqlConfig, applicationId)
.map(topicClassifier::and)
.orElse(topicClassifier);

return new PersistentQueryMetadata(
statementText,
built.kafkaStreams,
Expand All @@ -248,10 +258,7 @@ public PersistentQueryMetadata buildQuery(
overrides,
queryCloseCallback,
ksqlConfig.getLong(KSQL_SHUTDOWN_TIMEOUT_MS_CONFIG),
new MissingTopicClassifier(
applicationId,
extractTopics(built.topology),
serviceContext.getTopicClient()));
classifier);
}

private TransientQueryQueue buildTransientQueryQueue(
Expand Down Expand Up @@ -317,6 +324,32 @@ private Map<String, Object> buildStreamsProperties(
return newStreamsProperties;
}

/**
 * Builds a single classifier out of every regex classifier found in the supplied config.
 *
 * <p>Each value stored under a key that starts with
 * {@link KsqlConfig#KSQL_ERROR_CLASSIFIER_REGEX_PREFIX} is parsed with
 * {@link RegexClassifier#fromConfig(String, String)}. The parsed classifiers are then
 * chained together, in the order the config returns them, using
 * {@code QueryErrorClassifier#and}.
 *
 * @param cfg the config to read the classifier definitions from
 * @param queryId the id of the query, handed to every classifier that is created
 * @return the chained classifier, or {@link Optional#empty()} if none are configured
 */
private static Optional<QueryErrorClassifier> buildConfiguredClassifiers(
    final KsqlConfig cfg,
    final String queryId
) {
  final Map<String, Object> configured =
      cfg.originalsWithPrefix(KsqlConfig.KSQL_ERROR_CLASSIFIER_REGEX_PREFIX);

  // Iterables.transform is lazy; copying into an ImmutableList parses every entry exactly
  // once, up front, before any classifiers are chained together.
  final Iterable<QueryErrorClassifier> parsed = Iterables.transform(
      configured.values(),
      raw -> RegexClassifier.fromConfig((String) raw, queryId));
  final ImmutableList<QueryErrorClassifier> classifiers = ImmutableList.copyOf(parsed);

  // A sequential reduce folds left to right and yields an empty Optional for an empty list.
  return classifiers.stream().reduce((left, right) -> left.and(right));
}

private static Set<String> extractTopics(final Topology topology) {
final Set<String> usedTopics = new HashSet<>();
for (final Subtopology subtopology : topology.describe().subtopologies()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.query;

import io.confluent.ksql.query.QueryError.Type;
import java.util.Locale;
import java.util.Objects;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@code RegexClassifier} classifies errors by matching a configured regex
 * pattern against the class name and the message of an error and of each error
 * in its cause chain. This class is intended as "last resort" so that noisy
 * errors can be classified without deploying a new version of code.
 */
public final class RegexClassifier implements QueryErrorClassifier {

  private static final Logger LOG = LoggerFactory.getLogger(RegexClassifier.class);

  private final Pattern pattern;
  private final Type type;
  private final String queryId;

  /**
   * Specifying a RegexClassifier in a config requires specifying
   * {@code <TYPE> <PATTERN>}, for example {@code USER .*InvalidTopicException.*}
   *
   * <p>Leading and trailing whitespace is ignored and the type is matched
   * case-insensitively. A configuration that has no pattern, names an unknown
   * type or contains a pattern that does not compile is ignored: a warning is
   * logged and the returned classifier reports {@link Type#UNKNOWN} for every error.
   *
   * @param config the configuration specifying the type and pattern
   * @param queryId the id of the query the classifier is created for, used in log output
   * @return the classifier
   */
  public static QueryErrorClassifier fromConfig(final String config, final String queryId) {
    // Trim and split on a run of whitespace so that neither the type nor the pattern
    // picks up stray separator characters.
    final String[] split = config.trim().split("\\s+", 2);
    if (split.length < 2) {
      LOG.warn("Ignoring invalid configuration for RegexClassifier: {}", config);
      return err -> Type.UNKNOWN;
    }

    try {
      return new RegexClassifier(
          // Locale.ROOT keeps the enum lookup independent of the JVM default locale.
          Type.valueOf(split[0].toUpperCase(Locale.ROOT)),
          Pattern.compile(split[1], Pattern.DOTALL),
          queryId
      );
    } catch (final IllegalArgumentException e) {
      // Thrown by Type.valueOf for an unknown type and, as PatternSyntaxException, by
      // Pattern.compile for a malformed regex. Handled like a missing pattern above.
      LOG.warn("Ignoring invalid configuration for RegexClassifier: {}", config, e);
      return err -> Type.UNKNOWN;
    }
  }

  private RegexClassifier(final Type type, final Pattern pattern, final String queryId) {
    this.type = Objects.requireNonNull(type, "type");
    this.pattern = Objects.requireNonNull(pattern, "pattern");
    this.queryId = Objects.requireNonNull(queryId, "queryId");
  }

  /**
   * Walks the cause chain of {@code e}, starting with {@code e} itself, and returns the
   * configured type as soon as one error matches the pattern.
   *
   * @param e the error to classify
   * @return the configured type on a match, otherwise {@link Type#UNKNOWN}
   */
  @Override
  public Type classify(final Throwable e) {
    LOG.info("Attempting to classify for {} under regex pattern {}.", queryId, pattern);

    for (Throwable error = e; error != null; error = error.getCause()) {
      if (matches(error)) {
        LOG.warn(
            "Classified error for queryId {} under regex pattern {} as type {}.",
            queryId,
            pattern,
            type);
        return type;
      }
    }

    return Type.UNKNOWN;
  }

  /**
   * Checks whether the whole class name or the whole message of {@code e} matches the pattern.
   */
  private boolean matches(final Throwable e) {
    if (pattern.matcher(e.getClass().getName()).matches()) {
      return true;
    }

    // Many exceptions carry no message and Pattern.matcher rejects null input.
    final String message = e.getMessage();
    return message != null && pattern.matcher(message).matches();
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.query;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.when;

import io.confluent.ksql.query.QueryError.Type;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

/**
 * Unit tests for {@link RegexClassifier}, covering matches on the error itself, matches
 * on a cause, and the fallback to {@link Type#UNKNOWN}.
 */
@RunWith(MockitoJUnitRunner.class)
public class RegexClassifierTest {

  @Mock
  private Throwable error;
  @Mock
  private Throwable cause;

  @Test
  public void shouldClassifyWithRegex() {
    // Given:
    final QueryErrorClassifier classifier = RegexClassifier.fromConfig("USER .*foo.*", "id");
    givenMessage(error, "foo");

    // When:
    final Type type = classifier.classify(error);

    // Then:
    assertThat(type, is(Type.USER));
  }

  @Test
  public void shouldClassifyWithRegexOnClassName() {
    // Given:
    final QueryErrorClassifier classifier =
        RegexClassifier.fromConfig("USER .*IllegalStateException.*", "id");

    // When:
    final Type type = classifier.classify(new IllegalStateException("bar"));

    // Then:
    assertThat(type, is(Type.USER));
  }

  @Test
  public void shouldClassifyWithRegexInCause() {
    // Given:
    final QueryErrorClassifier classifier = RegexClassifier.fromConfig("USER .*foo.*", "id");
    when(error.getCause()).thenReturn(cause);
    givenMessage(error, "bar");
    givenMessage(cause, "foo");

    // When:
    final Type type = classifier.classify(error);

    // Then:
    assertThat(type, is(Type.USER));
  }

  @Test
  public void shouldClassifyAsUnknownIfBadRegex() {
    // Given:
    // No message is stubbed here: the classifier built from a config without a pattern
    // never inspects the error, so a stub would be flagged as unnecessary by the runner.
    final QueryErrorClassifier classifier = RegexClassifier.fromConfig("USER", "id");

    // When:
    final Type type = classifier.classify(error);

    // Then:
    assertThat(type, is(Type.UNKNOWN));
  }

  @Test
  public void shouldClassifyAsUnknown() {
    // Given:
    final QueryErrorClassifier classifier = RegexClassifier.fromConfig("USER .*foo.*", "id");
    when(error.getCause()).thenReturn(cause);
    givenMessage(error, "bar");
    givenMessage(cause, "baz");

    // When:
    final Type type = classifier.classify(error);

    // Then:
    assertThat(type, is(Type.UNKNOWN));
  }

  /** Stubs {@code getMessage()} on the given mock so it returns {@code message}. */
  private void givenMessage(final Throwable error, final String message) {
    when(error.getMessage()).thenReturn(message);
  }
}

0 comments on commit b25dd98

Please sign in to comment.