Skip to content

Commit

Permalink
refactor: Cleanup and refactoring of new server API (#5368)
Browse files Browse the repository at this point in the history
  • Loading branch information
purplefox committed May 18, 2020
1 parent 62dacca commit a56c54a
Show file tree
Hide file tree
Showing 61 changed files with 770 additions and 1,099 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ private static <T> void handleErrorResponse(
"Received %d response from server: %s. Error code: %d",
response.statusCode(),
errorResponse.getString("message"),
errorResponse.getInteger("errorCode")
errorResponse.getInteger("error_code")
)));
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import io.confluent.ksql.api.client.ColumnType;
import io.confluent.ksql.api.client.Row;
import io.confluent.ksql.api.client.util.RowUtil;
import io.confluent.ksql.api.server.protocol.QueryResponseMetadata;
import io.confluent.ksql.rest.client.KsqlRestClientException;
import io.confluent.ksql.rest.entity.QueryResponseMetadata;
import io.vertx.core.Context;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonArray;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package io.confluent.ksql.api.client.impl;

import io.confluent.ksql.api.client.util.JsonMapper;
import io.confluent.ksql.api.server.protocol.QueryResponseMetadata;
import io.confluent.ksql.rest.entity.QueryResponseMetadata;
import io.confluent.ksql.util.VertxUtils;
import io.vertx.core.Context;
import io.vertx.core.buffer.Buffer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import io.confluent.ksql.api.client.StreamedQueryResult;
import io.confluent.ksql.api.client.util.RowUtil;
import io.confluent.ksql.api.server.KsqlApiException;
import io.confluent.ksql.api.server.protocol.QueryResponseMetadata;
import io.confluent.ksql.rest.entity.QueryResponseMetadata;
import io.vertx.core.Context;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonArray;
Expand Down Expand Up @@ -75,14 +75,10 @@ protected void handleRow(final Buffer buff) {
}
} else if (json instanceof JsonObject) {
final JsonObject error = (JsonObject) json;
if ("error".equals(error.getString("status"))) {
queryResult.handleError(new KsqlApiException(
error.getString("message"),
error.getInteger("errorCode"))
);
} else {
throw new RuntimeException("Unexpected response from server: " + error);
}
queryResult.handleError(new KsqlApiException(
error.getString("message"),
error.getInteger("error_code"))
);
} else {
throw new RuntimeException("Could not decode JSON: " + json);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

package io.confluent.ksql.api.client;

import static io.confluent.ksql.api.server.ErrorCodes.ERROR_CODE_UNKNOWN_QUERY_ID;
import static io.confluent.ksql.rest.Errors.ERROR_CODE_BAD_REQUEST;
import static io.confluent.ksql.test.util.AssertEventually.assertThatEventually;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
Expand All @@ -33,9 +33,9 @@
import io.confluent.ksql.api.client.impl.StreamedQueryResultImpl;
import io.confluent.ksql.api.client.util.RowUtil;
import io.confluent.ksql.api.server.KsqlApiException;
import io.confluent.ksql.api.server.PushQueryId;
import io.confluent.ksql.parser.exception.ParseFailedException;
import io.confluent.ksql.rest.client.KsqlRestClientException;
import io.confluent.ksql.rest.entity.PushQueryId;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.WebClient;
Expand All @@ -45,7 +45,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -487,7 +486,7 @@ public void shouldHandleErrorResponseFromTerminatePushQuery() {
assertThat(e.getCause(), instanceOf(KsqlRestClientException.class));
assertThat(e.getCause().getMessage(), containsString("Received 400 response from server"));
assertThat(e.getCause().getMessage(), containsString("No query with id"));
assertThat(e.getCause().getMessage(), containsString("Error code: " + ERROR_CODE_UNKNOWN_QUERY_ID));
assertThat(e.getCause().getMessage(), containsString("Error code: " + ERROR_CODE_BAD_REQUEST));
}

protected Client createJavaClient() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package io.confluent.ksql.api.tck;

import io.confluent.ksql.GenericRow;
import io.confluent.ksql.api.endpoints.BlockingQueryPublisher;
import io.confluent.ksql.api.impl.BlockingQueryPublisher;
import io.confluent.ksql.api.server.PushQueryHandle;
import io.confluent.ksql.query.BlockingRowQueue;
import io.confluent.ksql.query.TransientQueryQueue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
package io.confluent.ksql.api.auth;

import static io.confluent.ksql.api.server.ServerUtils.convertCommaSeparatedWilcardsToRegex;
import static io.confluent.ksql.rest.Errors.ERROR_CODE_UNAUTHORIZED;
import static io.netty.handler.codec.http.HttpResponseStatus.UNAUTHORIZED;

import io.confluent.ksql.api.server.ErrorCodes;
import io.confluent.ksql.api.server.KsqlApiException;
import io.confluent.ksql.api.server.Server;
import io.confluent.ksql.rest.server.KsqlRestConfig;
Expand Down Expand Up @@ -77,7 +77,7 @@ public void handle(final RoutingContext routingContext) {
// Not authenticated
routingContext
.fail(UNAUTHORIZED.code(), new KsqlApiException("Failed authentication",
ErrorCodes.ERROR_FAILED_AUTHENTICATION));
ERROR_CODE_UNAUTHORIZED));
} else {
routingContext.setUser(new AuthPluginUser(principal));
routingContext.next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.api.endpoints;
package io.confluent.ksql.api.impl;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.GenericRow;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.api.endpoints;
package io.confluent.ksql.api.impl;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.api.auth.ApiSecurityContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.api.endpoints;
package io.confluent.ksql.api.impl;

import io.confluent.ksql.api.server.InsertResult;
import io.confluent.ksql.api.server.InsertsStreamSubscriber;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.api.endpoints;
package io.confluent.ksql.api.impl;

import io.confluent.ksql.GenericRow;
import io.confluent.ksql.api.server.InsertResult;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.api.endpoints;
package io.confluent.ksql.api.impl;

import io.confluent.ksql.GenericRow;
import io.confluent.ksql.api.server.ErrorCodes;
import io.confluent.ksql.api.server.KsqlApiException;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.SchemaConverters;
Expand All @@ -44,7 +44,7 @@ public static Struct extractKey(final JsonObject values, final LogicalSchema log
final Object value = values.getValue(field.name());
if (value == null) {
throw new KsqlApiException("Key field must be specified: " + field.name(),
ErrorCodes.ERROR_CODE_MISSING_KEY_FIELD);
Errors.ERROR_CODE_BAD_REQUEST);
}
final Object coercedValue = coerceObject(value,
SchemaConverters.connectToSqlConverter().toSqlType(field.schema()),
Expand Down Expand Up @@ -92,7 +92,7 @@ private static Object coerceObject(
.orElseThrow(() -> new KsqlApiException(
String.format("Can't coerce a field of type %s (%s) into type %s", value.getClass(),
value, sqlType),
ErrorCodes.ERROR_CODE_CANNOT_COERCE_FIELD))
Errors.ERROR_CODE_BAD_REQUEST))
.orElse(null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.api.endpoints;
package io.confluent.ksql.api.impl;

import io.confluent.ksql.api.auth.ApiSecurityContext;
import io.confluent.ksql.security.KsqlSecurityContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.api.endpoints;
package io.confluent.ksql.api.impl;

import io.confluent.ksql.GenericRow;
import io.confluent.ksql.api.spi.QueryPublisher;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.api.endpoints;
package io.confluent.ksql.api.impl;

import io.confluent.ksql.api.server.PushQueryHandle;
import io.confluent.ksql.api.spi.QueryPublisher;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@

package io.confluent.ksql.api.server;

import static io.confluent.ksql.api.server.ErrorCodes.ERROR_CODE_INTERNAL_ERROR;
import static io.confluent.ksql.rest.Errors.ERROR_CODE_SERVER_ERROR;

import io.confluent.ksql.api.server.protocol.InsertAck;
import io.confluent.ksql.api.server.protocol.InsertError;
import io.confluent.ksql.reactive.BaseSubscriber;
import io.confluent.ksql.rest.entity.InsertAck;
import io.confluent.ksql.rest.entity.InsertError;
import io.vertx.core.Context;
import io.vertx.core.http.HttpServerResponse;
import java.util.Objects;
Expand Down Expand Up @@ -105,7 +105,7 @@ private void handleFailedInsert(final InsertResult result) {
((KsqlApiException) exception).getErrorCode(),
exception.getMessage());
} else {
insertError = new InsertError(result.sequenceNumber(), ERROR_CODE_INTERNAL_ERROR,
insertError = new InsertError(result.sequenceNumber(), ERROR_CODE_SERVER_ERROR,
"Error in processing inserts");
}
insertsStreamResponseWriter.writeError(insertError).end();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.api.server;

import static io.confluent.ksql.rest.Errors.ERROR_CODE_UNAUTHORIZED;
import static io.netty.handler.codec.http.HttpResponseStatus.UNAUTHORIZED;

import io.confluent.ksql.api.auth.AuthenticationPlugin;
import io.confluent.ksql.api.auth.AuthenticationPluginHandler;
import io.confluent.ksql.api.auth.JaasAuthProvider;
import io.confluent.ksql.api.auth.KsqlAuthorizationProviderHandler;
import io.confluent.ksql.rest.server.KsqlRestConfig;
import io.confluent.ksql.security.KsqlSecurityExtension;
import io.vertx.core.Handler;
import io.vertx.ext.auth.AuthProvider;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.AuthHandler;
import io.vertx.ext.web.handler.BasicAuthHandler;
import java.util.Optional;

public final class AuthHandlers {

private AuthHandlers() {
}

static void setupAuthHandlers(final Server server, final Router router) {
final Optional<AuthHandler> jaasAuthHandler = getJaasAuthHandler(server);
final KsqlSecurityExtension securityExtension = server.getSecurityExtension();
final Optional<AuthenticationPlugin> authenticationPlugin = server.getAuthenticationPlugin();
final Optional<Handler<RoutingContext>> pluginHandler =
authenticationPlugin.map(plugin -> new AuthenticationPluginHandler(server, plugin));

if (jaasAuthHandler.isPresent() || authenticationPlugin.isPresent()) {
router.route().handler(AuthHandlers::pauseHandler);

router.route().handler(rc -> wrappedAuthHandler(rc, jaasAuthHandler, pluginHandler));

// For authorization use auth provider configured via security extension (if any)
securityExtension.getAuthorizationProvider()
.ifPresent(ksqlAuthorizationProvider -> router.route()
.handler(new KsqlAuthorizationProviderHandler(server.getWorkerExecutor(),
ksqlAuthorizationProvider)));

router.route().handler(AuthHandlers::resumeHandler);
}
}

private static void wrappedAuthHandler(final RoutingContext routingContext,
final Optional<AuthHandler> jaasAuthHandler,
final Optional<Handler<RoutingContext>> pluginHandler) {
if (jaasAuthHandler.isPresent()) {
// If we have a Jaas handler configured and we have Basic credentials then we should auth
// with that
final String authHeader = routingContext.request().getHeader("Authorization");
if (authHeader != null && authHeader.toLowerCase().startsWith("basic ")) {
jaasAuthHandler.get().handle(routingContext);
return;
}
}
// Fall through to authing with any authentication plugin
if (pluginHandler.isPresent()) {
pluginHandler.get().handle(routingContext);
} else {
// Fail the request as unauthorized - this will occur if no auth plugin but Jaas handler
// is configured, but auth header is not basic auth
routingContext
.fail(UNAUTHORIZED.code(),
new KsqlApiException("Unauthorized", ERROR_CODE_UNAUTHORIZED));
}
}

private static Optional<AuthHandler> getJaasAuthHandler(final Server server) {
final String authMethod = server.getConfig()
.getString(KsqlRestConfig.AUTHENTICATION_METHOD_CONFIG);
switch (authMethod) {
case KsqlRestConfig.AUTHENTICATION_METHOD_BASIC:
return Optional.of(basicAuthHandler(server));
case KsqlRestConfig.AUTHENTICATION_METHOD_NONE:
return Optional.empty();
default:
throw new IllegalStateException(String.format(
"Unexpected value for %s: %s",
KsqlRestConfig.AUTHENTICATION_METHOD_CONFIG,
authMethod
));
}
}

private static AuthHandler basicAuthHandler(final Server server) {
final AuthProvider authProvider = new JaasAuthProvider(server, server.getConfig());
final String realm = server.getConfig().getString(KsqlRestConfig.AUTHENTICATION_REALM_CONFIG);
final AuthHandler basicAuthHandler = BasicAuthHandler.create(authProvider, realm);
// It doesn't matter what we set here as we actually do the authorisation at the
// authentication stage and cache the result, but we must add an authority or
// no authorisation will be done
basicAuthHandler.addAuthority("ksql");
return basicAuthHandler;
}

private static void pauseHandler(final RoutingContext routingContext) {
// prevent auth handler from reading request body
routingContext.request().pause();
routingContext.next();
}

private static void resumeHandler(final RoutingContext routingContext) {
// Un-pause body handling as async auth provider calls have completed by this point
routingContext.request().resume();
routingContext.next();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@

package io.confluent.ksql.api.server;

import static io.confluent.ksql.api.server.ErrorCodes.ERROR_CODE_UNKNOWN_QUERY_ID;
import static io.confluent.ksql.rest.Errors.ERROR_CODE_BAD_REQUEST;
import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;

import io.confluent.ksql.api.server.protocol.CloseQueryArgs;
import io.confluent.ksql.rest.entity.CloseQueryArgs;
import io.vertx.core.Handler;
import io.vertx.ext.web.RoutingContext;
import java.util.Objects;
Expand Down Expand Up @@ -50,7 +50,7 @@ public void handle(final RoutingContext routingContext) {
routingContext
.fail(BAD_REQUEST.code(),
new KsqlApiException("No query with id " + closeQueryArgs.get().queryId,
ERROR_CODE_UNKNOWN_QUERY_ID));
ERROR_CODE_BAD_REQUEST));
return;
}
query.get().close();
Expand Down
Loading

0 comments on commit a56c54a

Please sign in to comment.