From d83c7878494583d6000146e4cb82a81d67e5f1eb Mon Sep 17 00:00:00 2001 From: Steven Zhang <35498506+stevenpyzhang@users.noreply.github.com> Date: Mon, 30 Sep 2019 13:28:51 -0700 Subject: [PATCH] feat: add KsqlUncaughtExceptionHandler and new KsqlRestConfig for enabling it (#3425) * feat: add KsqlUncaughtExceptionHandler and new KsqlRestConfig for enabling it * change config name * set UncaughtExceptionHandler for persistent query streams threads --- .../ksql/physical/PhysicalPlanBuilder.java | 2 + .../KafkaStreamsUncaughtExceptionHandler.java | 28 ++++++++++++ .../ksql/rest/server/KsqlRestApplication.java | 8 ++++ .../ksql/rest/server/KsqlRestConfig.java | 13 ++++++ .../server/computation/CommandRunner.java | 2 +- .../util/KsqlUncaughtExceptionHandler.java | 43 +++++++++++++++++++ .../server/computation/CommandRunnerTest.java | 2 +- 7 files changed, 96 insertions(+), 2 deletions(-) create mode 100644 ksql-engine/src/main/java/io/confluent/ksql/util/KafkaStreamsUncaughtExceptionHandler.java create mode 100644 ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/KsqlUncaughtExceptionHandler.java diff --git a/ksql-engine/src/main/java/io/confluent/ksql/physical/PhysicalPlanBuilder.java b/ksql-engine/src/main/java/io/confluent/ksql/physical/PhysicalPlanBuilder.java index 6d5471662cbc..4706f33fe574 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/physical/PhysicalPlanBuilder.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/physical/PhysicalPlanBuilder.java @@ -53,6 +53,7 @@ import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.structured.SchemaKStream; import io.confluent.ksql.structured.SchemaKTable; +import io.confluent.ksql.util.KafkaStreamsUncaughtExceptionHandler; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlConstants; import io.confluent.ksql.util.KsqlException; @@ -281,6 +282,7 @@ private QueryMetadata buildPlanForStructuredOutputNode( ); final KafkaStreams streams = kafkaStreamsBuilder.buildKafkaStreams(builder, streamsProperties); + streams.setUncaughtExceptionHandler(new KafkaStreamsUncaughtExceptionHandler()); final Topology topology = builder.build(); diff --git a/ksql-engine/src/main/java/io/confluent/ksql/util/KafkaStreamsUncaughtExceptionHandler.java b/ksql-engine/src/main/java/io/confluent/ksql/util/KafkaStreamsUncaughtExceptionHandler.java new file mode 100644 index 000000000000..b3810d645567 --- /dev/null +++ b/ksql-engine/src/main/java/io/confluent/ksql/util/KafkaStreamsUncaughtExceptionHandler.java @@ -0,0 +1,28 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.util; + +import io.confluent.ksql.engine.KsqlEngine; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KafkaStreamsUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler { + private static final Logger log = LoggerFactory.getLogger(KsqlEngine.class); + + public void uncaughtException(final Thread t, final Throwable e) { + log.error("Unhandled exception caught in streams thread {}.", t.getName(), e); + } +} diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index ed1a6e97dc98..e741a8d0ea5c 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -58,6 +58,7 @@ import io.confluent.ksql.rest.server.state.ServerStateDynamicBinding; import io.confluent.ksql.rest.util.ClusterTerminator; import io.confluent.ksql.rest.util.KsqlInternalTopicUtils; +import io.confluent.ksql.rest.util.KsqlUncaughtExceptionHandler; import io.confluent.ksql.rest.util.ProcessingLogServerUtils; import io.confluent.ksql.rest.util.RocksDBConfigSetterHandler; import io.confluent.ksql.security.KsqlAuthorizationValidator; @@ -107,6 +108,7 @@ import javax.ws.rs.core.Configurable; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.streams.StreamsConfig; +import org.apache.log4j.LogManager; import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.websocket.jsr356.server.ServerContainer; import org.glassfish.hk2.utilities.Binder; @@ -445,8 +447,14 @@ static KsqlRestApplication buildApplication( final MutableFunctionRegistry functionRegistry = new InternalFunctionRegistry(); + if (restConfig.getBoolean(KsqlRestConfig.KSQL_SERVER_ENABLE_UNCAUGHT_EXCEPTION_HANDLER)) { + Thread.setDefaultUncaughtExceptionHandler( + new KsqlUncaughtExceptionHandler(LogManager::shutdown)); + } + final HybridQueryIdGenerator hybridQueryIdGenerator = new HybridQueryIdGenerator(); + final KsqlEngine ksqlEngine = new KsqlEngine( serviceContext, processingLogContext, diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java index d49b0b7f1aa0..be7b86caef82 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java @@ -64,6 +64,13 @@ public class KsqlRestConfig extends RestConfig { + "will not start serving requests until all preconditions are satisfied. Until that time, " + "requests will return a 503 error"; + static final String KSQL_SERVER_ENABLE_UNCAUGHT_EXCEPTION_HANDLER = + KSQL_CONFIG_PREFIX + "server.exception.uncaught.handler.enable"; + + private static final String KSQL_SERVER_UNCAUGHT_EXCEPTION_HANDLER_DOC = + "Whether or not to set KsqlUncaughtExceptionHandler as the UncaughtExceptionHandler " + + "for all threads in the application (this can be overridden). Default is false."; + private static final ConfigDef CONFIG_DEF; static { @@ -97,6 +104,12 @@ public class KsqlRestConfig extends RestConfig { "", Importance.LOW, KSQL_SERVER_PRECONDITIONS_DOC + ).define( + KSQL_SERVER_ENABLE_UNCAUGHT_EXCEPTION_HANDLER, + ConfigDef.Type.BOOLEAN, + false, + Importance.LOW, + KSQL_SERVER_UNCAUGHT_EXCEPTION_HANDLER_DOC ); } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java index d6bae5e4ab1f..43dbddf96b1b 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java @@ -98,7 +98,7 @@ public CommandRunner( * {@link WakeupException} is thrown or the {@link #close()} method is called. */ public void start() { - executor.submit(new Runner()); + executor.execute(new Runner()); executor.shutdown(); } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/KsqlUncaughtExceptionHandler.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/KsqlUncaughtExceptionHandler.java new file mode 100644 index 000000000000..8a86adaea9cf --- /dev/null +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/KsqlUncaughtExceptionHandler.java @@ -0,0 +1,43 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.util; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import io.confluent.ksql.rest.server.KsqlServerMain; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class KsqlUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler { + private final Runnable flusher; + + private static final Logger log = LoggerFactory.getLogger(KsqlServerMain.class); + + public KsqlUncaughtExceptionHandler(final Runnable flusher) { + this.flusher = flusher; + } + + @SuppressFBWarnings + public void uncaughtException(final Thread t, final Throwable e) { + log.error("Unhandled exception caught in thread {}.", t.getName(), e); + System.err.println( + "Unhandled exception caught in thread: " + t.getName() + ". Exception:" + e.getMessage()); + + flusher.run(); + + System.exit(-1); + } +} diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java index ec119cd0bd00..1beed9df005d 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java @@ -196,7 +196,7 @@ public void shouldSubmitTaskOnStart() { // Then: final InOrder inOrder = inOrder(executor); - inOrder.verify(executor).submit(any(Runnable.class)); + inOrder.verify(executor).execute(any(Runnable.class)); inOrder.verify(executor).shutdown(); }