Skip to content

Commit

Permalink
feat: add config to make error messages configurable (#4121)
Browse files Browse the repository at this point in the history
* feat: add config to make error messages configurable

* refactor the config

* changes
  • Loading branch information
stevenpyzhang committed Dec 17, 2019
1 parent d595985 commit cedf47e
Show file tree
Hide file tree
Showing 13 changed files with 297 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import io.confluent.ksql.parser.KsqlParser.ParsedStatement;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
import io.confluent.ksql.query.id.SpecificQueryIdGenerator;
import io.confluent.ksql.rest.ErrorMessages;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.client.RestResponse;
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.entity.KsqlErrorMessage;
Expand Down Expand Up @@ -381,6 +383,10 @@ protected void registerWebSocketEndpoints(final ServerContainer container) {
final StatementParser statementParser = new StatementParser(ksqlEngine);
final Optional<KsqlAuthorizationValidator> authorizationValidator =
KsqlAuthorizationValidatorFactory.create(ksqlConfigNoPort, serviceContext);
final Errors errorHandler = new Errors(restConfig.getConfiguredInstance(
KsqlRestConfig.KSQL_SERVER_ERROR_MESSAGES,
ErrorMessages.class
));

container.addEndpoint(
ServerEndpointConfig.Builder
Expand All @@ -403,6 +409,7 @@ public <T> T getEndpointInstance(final Class<T> endpointClass) {
Duration.ofMillis(config.getLong(
KsqlRestConfig.DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG)),
authorizationValidator,
errorHandler,
securityExtension,
serverState
);
Expand Down Expand Up @@ -498,14 +505,29 @@ static KsqlRestApplication buildApplication(
final Optional<KsqlAuthorizationValidator> authorizationValidator =
KsqlAuthorizationValidatorFactory.create(ksqlConfig, serviceContext);

final Errors errorHandler = new Errors(restConfig.getConfiguredInstance(
KsqlRestConfig.KSQL_SERVER_ERROR_MESSAGES,
ErrorMessages.class
));

final StreamedQueryResource streamedQueryResource = new StreamedQueryResource(
ksqlEngine,
commandStore,
Duration.ofMillis(
restConfig.getLong(KsqlRestConfig.STREAMED_QUERY_DISCONNECT_CHECK_MS_CONFIG)),
Duration.ofMillis(restConfig.getLong(DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG)),
versionChecker::updateLastRequestTime,
authorizationValidator
authorizationValidator,
errorHandler
);

final KsqlResource ksqlResource = new KsqlResource(
ksqlEngine,
commandStore,
Duration.ofMillis(restConfig.getLong(DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG)),
versionChecker::updateLastRequestTime,
authorizationValidator,
errorHandler
);

final List<String> managedTopics = new LinkedList<>();
Expand All @@ -526,14 +548,6 @@ static KsqlRestApplication buildApplication(
metricsPrefix
);

final KsqlResource ksqlResource = new KsqlResource(
ksqlEngine,
commandStore,
Duration.ofMillis(restConfig.getLong(DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG)),
versionChecker::updateLastRequestTime,
authorizationValidator
);

final List<KsqlServerPrecondition> preconditions = restConfig.getConfiguredInstances(
KsqlRestConfig.KSQL_SERVER_PRECONDITIONS,
KsqlServerPrecondition.class
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

package io.confluent.ksql.rest.server;

import io.confluent.ksql.rest.DefaultErrorMessages;
import io.confluent.ksql.rest.ErrorMessages;
import io.confluent.ksql.util.KsqlException;
import io.confluent.rest.RestConfig;
import java.util.Map;
Expand Down Expand Up @@ -64,6 +66,12 @@ public class KsqlRestConfig extends RestConfig {
+ "will not start serving requests until all preconditions are satisfied. Until that time, "
+ "requests will return a 503 error";

static final String KSQL_SERVER_ERROR_MESSAGES =
KSQL_CONFIG_PREFIX + "server.error.messages";
private static final String KSQL_SERVER_ERRORS_DOC =
"A class the implementing " + ErrorMessages.class.getSimpleName() + " interface."
+ "This allows the KSQL server to return pluggable error messages.";

static final String KSQL_SERVER_ENABLE_UNCAUGHT_EXCEPTION_HANDLER =
KSQL_CONFIG_PREFIX + "server.exception.uncaught.handler.enable";

Expand Down Expand Up @@ -136,6 +144,12 @@ public class KsqlRestConfig extends RestConfig {
15000L,
Importance.LOW,
KSQL_COMMAND_RUNNER_BLOCKED_THRESHHOLD_ERROR_MS_DOC
).define(
KSQL_SERVER_ERROR_MESSAGES,
Type.CLASS,
DefaultErrorMessages.class,
Importance.LOW,
KSQL_SERVER_ERRORS_DOC
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import io.confluent.ksql.rest.server.validation.CustomValidators;
import io.confluent.ksql.rest.server.validation.RequestValidator;
import io.confluent.ksql.rest.util.CommandStoreUtil;
import io.confluent.ksql.rest.util.ErrorResponseUtil;
import io.confluent.ksql.rest.util.TerminateCluster;
import io.confluent.ksql.security.KsqlAuthorizationValidator;
import io.confluent.ksql.services.SandboxedServiceContext;
Expand Down Expand Up @@ -102,22 +101,25 @@ public class KsqlResource implements KsqlConfigurable {
private final Optional<KsqlAuthorizationValidator> authorizationValidator;
private RequestValidator validator;
private RequestHandler handler;
private Errors errorHandler;


public KsqlResource(
final KsqlEngine ksqlEngine,
final CommandQueue commandQueue,
final Duration distributedCmdResponseTimeout,
final ActivenessRegistrar activenessRegistrar,
final Optional<KsqlAuthorizationValidator> authorizationValidator
final Optional<KsqlAuthorizationValidator> authorizationValidator,
final Errors errorHandler
) {
this(
ksqlEngine,
commandQueue,
distributedCmdResponseTimeout,
activenessRegistrar,
Injectors.DEFAULT,
authorizationValidator
authorizationValidator,
errorHandler
);
}

Expand All @@ -127,7 +129,8 @@ public KsqlResource(
final Duration distributedCmdResponseTimeout,
final ActivenessRegistrar activenessRegistrar,
final BiFunction<KsqlExecutionContext, ServiceContext, Injector> injectorFactory,
final Optional<KsqlAuthorizationValidator> authorizationValidator
final Optional<KsqlAuthorizationValidator> authorizationValidator,
final Errors errorHandler
) {
this.ksqlEngine = Objects.requireNonNull(ksqlEngine, "ksqlEngine");
this.commandQueue = Objects.requireNonNull(commandQueue, "commandQueue");
Expand All @@ -138,6 +141,7 @@ public KsqlResource(
this.injectorFactory = Objects.requireNonNull(injectorFactory, "injectorFactory");
this.authorizationValidator = Objects
.requireNonNull(authorizationValidator, "authorizationValidator");
this.errorHandler = Objects.requireNonNull(errorHandler, "errorHandler");
}

@Override
Expand Down Expand Up @@ -233,10 +237,9 @@ public Response handleKsqlStatements(
} catch (final KsqlStatementException e) {
return Errors.badStatement(e.getRawMessage(), e.getSqlStatement());
} catch (final KsqlException e) {
return ErrorResponseUtil.generateResponse(
e, Errors.badRequest(e));
return errorHandler.generateResponse(e, Errors.badRequest(e));
} catch (final Exception e) {
return ErrorResponseUtil.generateResponse(
return errorHandler.generateResponse(
e, Errors.serverErrorForStatement(e, request.getKsql()));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import io.confluent.ksql.rest.server.resources.KsqlConfigurable;
import io.confluent.ksql.rest.server.resources.KsqlRestException;
import io.confluent.ksql.rest.util.CommandStoreUtil;
import io.confluent.ksql.rest.util.ErrorResponseUtil;
import io.confluent.ksql.security.KsqlAuthorizationValidator;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
Expand Down Expand Up @@ -80,6 +79,7 @@ public class StreamedQueryResource implements KsqlConfigurable {
private final ObjectMapper objectMapper;
private final ActivenessRegistrar activenessRegistrar;
private final Optional<KsqlAuthorizationValidator> authorizationValidator;
private final Errors errorHandler;
private KsqlConfig ksqlConfig;

public StreamedQueryResource(
Expand All @@ -88,7 +88,8 @@ public StreamedQueryResource(
final Duration disconnectCheckInterval,
final Duration commandQueueCatchupTimeout,
final ActivenessRegistrar activenessRegistrar,
final Optional<KsqlAuthorizationValidator> authorizationValidator
final Optional<KsqlAuthorizationValidator> authorizationValidator,
final Errors errorHandler
) {
this(
ksqlEngine,
Expand All @@ -97,7 +98,8 @@ public StreamedQueryResource(
disconnectCheckInterval,
commandQueueCatchupTimeout,
activenessRegistrar,
authorizationValidator
authorizationValidator,
errorHandler
);
}

Expand All @@ -109,7 +111,8 @@ public StreamedQueryResource(
final Duration disconnectCheckInterval,
final Duration commandQueueCatchupTimeout,
final ActivenessRegistrar activenessRegistrar,
final Optional<KsqlAuthorizationValidator> authorizationValidator
final Optional<KsqlAuthorizationValidator> authorizationValidator,
final Errors errorHandler
) {
this.ksqlEngine = Objects.requireNonNull(ksqlEngine, "ksqlEngine");
this.statementParser = Objects.requireNonNull(statementParser, "statementParser");
Expand All @@ -122,6 +125,7 @@ public StreamedQueryResource(
this.activenessRegistrar =
Objects.requireNonNull(activenessRegistrar, "activenessRegistrar");
this.authorizationValidator = authorizationValidator;
this.errorHandler = Objects.requireNonNull(errorHandler, "errorHandler");;
}

@Override
Expand Down Expand Up @@ -224,12 +228,11 @@ private Response handleStatement(
"Statement type `%s' not supported for this resource",
statement.getClass().getName()));
} catch (final TopicAuthorizationException e) {
return Errors.accessDeniedFromKafka(e);
return errorHandler.accessDeniedFromKafkaResponse(e);
} catch (final KsqlStatementException e) {
return Errors.badStatement(e.getRawMessage(), e.getSqlStatement());
} catch (final KsqlException e) {
return ErrorResponseUtil.generateResponse(
e, Errors.badRequest(e));
return errorHandler.generateResponse(e, Errors.badRequest(e));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.confluent.ksql.parser.tree.PrintTopic;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.entity.KsqlErrorMessage;
import io.confluent.ksql.rest.entity.KsqlRequest;
import io.confluent.ksql.rest.entity.StreamedRow;
Expand Down Expand Up @@ -60,6 +61,7 @@
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import javax.ws.rs.core.Response;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -93,6 +95,7 @@ public class WSQueryEndpoint {
private final UserServiceContextFactory serviceContextFactory;
private final DefaultServiceContextFactory defaultServiceContextFactory;
private final ServerState serverState;
private final Errors errorHandler;

private WebSocketSubscriber<?> subscriber;
private ServiceContext serviceContext;
Expand All @@ -109,6 +112,7 @@ public WSQueryEndpoint(
final ActivenessRegistrar activenessRegistrar,
final Duration commandQueueCatchupTimeout,
final Optional<KsqlAuthorizationValidator> authorizationValidator,
final Errors errorHandler,
final KsqlSecurityExtension securityExtension,
final ServerState serverState
) {
Expand All @@ -124,6 +128,7 @@ public WSQueryEndpoint(
activenessRegistrar,
commandQueueCatchupTimeout,
authorizationValidator,
errorHandler,
securityExtension,
RestServiceContextFactory::create,
RestServiceContextFactory::create,
Expand All @@ -145,6 +150,7 @@ public WSQueryEndpoint(
final ActivenessRegistrar activenessRegistrar,
final Duration commandQueueCatchupTimeout,
final Optional<KsqlAuthorizationValidator> authorizationValidator,
final Errors errorHandler,
final KsqlSecurityExtension securityExtension,
final UserServiceContextFactory serviceContextFactory,
final DefaultServiceContextFactory defaultServiceContextFactory,
Expand Down Expand Up @@ -172,6 +178,7 @@ public WSQueryEndpoint(
this.defaultServiceContextFactory =
Objects.requireNonNull(defaultServiceContextFactory, "defaultServiceContextFactory");
this.serverState = Objects.requireNonNull(serverState, "serverState");
this.errorHandler = Objects.requireNonNull(errorHandler, "errorHandler");;
}

@SuppressWarnings("unused")
Expand Down Expand Up @@ -221,6 +228,12 @@ public void onOpen(final Session session, final EndpointConfig unused) {
HANDLER_MAP
.getOrDefault(type, WSQueryEndpoint::handleUnsupportedStatement)
.handle(this, new RequestContext(session, request, serviceContext), statement);
} catch (final TopicAuthorizationException e) {
log.debug("Error processing request", e);
SessionUtil.closeSilently(
session,
CloseCodes.CANNOT_ACCEPT,
errorHandler.kafkaAuthorizationErrorMessage(e));
} catch (final Exception e) {
log.debug("Error processing request", e);
SessionUtil.closeSilently(session, CloseCodes.CANNOT_ACCEPT, e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.query.id.SpecificQueryIdGenerator;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.entity.CommandId;
import io.confluent.ksql.rest.entity.CommandId.Action;
import io.confluent.ksql.rest.entity.CommandId.Type;
Expand Down Expand Up @@ -218,7 +219,8 @@ private class KsqlServer {
fakeCommandQueue,
Duration.ofMillis(0),
()->{},
Optional.of((sc, metastore, statement) -> { })
Optional.of((sc, metastore, statement) -> { }),
mock(Errors.class)
);

this.statementExecutor.configure(ksqlConfig);
Expand Down
Loading

0 comments on commit cedf47e

Please sign in to comment.