Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Do not allow access to new streaming endpoints using HTTP1.x #5193

Merged
merged 4 commits into from
Apr 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public CloseQueryHandler(final Server server) {

@Override
public void handle(final RoutingContext routingContext) {

final Optional<CloseQueryArgs> closeQueryArgs = ServerUtils
.deserialiseObject(routingContext.getBody(), routingContext, CloseQueryArgs.class);
if (!closeQueryArgs.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ private ErrorCodes() {
public static final int ERROR_MAX_PUSH_QUERIES_EXCEEDED = 8;
public static final int ERROR_FAILED_AUTHENTICATION = 9;
public static final int ERROR_FAILED_AUTHORIZATION = 10;
public static final int ERROR_HTTP2_ONLY = 11;

public static final int ERROR_CODE_INTERNAL_ERROR = 100;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.confluent.ksql.api.server;

import static io.confluent.ksql.api.server.QueryStreamHandler.DELIMITED_CONTENT_TYPE;
import static io.confluent.ksql.api.server.ServerUtils.checkHttp2;
import static io.confluent.ksql.api.server.ServerUtils.deserialiseObject;
import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;

Expand Down Expand Up @@ -64,6 +65,11 @@ public InsertsStreamHandler(final Context ctx, final Endpoints endpoints,

@Override
public void handle(final RoutingContext routingContext) {

if (!checkHttp2(routingContext)) {
return;
}

// The record parser takes in potentially fragmented buffers from the request and spits
// out the chunks delimited by newline
final RecordParser recordParser = RecordParser.newDelimited("\n", routingContext.request());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package io.confluent.ksql.api.server;

import static io.confluent.ksql.api.server.ServerUtils.checkHttp2;
import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;

Expand Down Expand Up @@ -56,9 +57,14 @@ public QueryStreamHandler(final Endpoints endpoints,
this.server = Objects.requireNonNull(server);
}


@Override
public void handle(final RoutingContext routingContext) {

if (!checkHttp2(routingContext)) {
return;
}

final String contentType = routingContext.getAcceptableContentType();
final QueryStreamResponseWriter queryStreamResponseWriter;
if (DELIMITED_CONTENT_TYPE.equals(contentType) || contentType == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.confluent.ksql.api.server.protocol.PojoCodec;
import io.confluent.ksql.api.server.protocol.PojoDeserializerErrorHandler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpVersion;
import io.vertx.ext.web.RoutingContext;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -67,6 +68,17 @@ public static String convertCommaSeparatedWilcardsToRegex(final String csv) {
return out.toString();
}

public static boolean checkHttp2(final RoutingContext routingContext) {
if (routingContext.request().version() != HttpVersion.HTTP_2) {
routingContext.fail(BAD_REQUEST.code(),
new KsqlApiException("This endpoint is only available when using HTTP2",
ErrorCodes.ERROR_HTTP2_ONLY));
return false;
} else {
return true;
}
}

private static class HttpResponseErrorHandler implements PojoDeserializerErrorHandler {

private final RoutingContext routingContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -849,7 +849,6 @@ private void shouldRejectWhenInternalErrorInProcessingQuery(final String query)
queryResponse.responseObject);
}


private static void validateInsertStreamError(final int errorCode, final String message,
final JsonObject error, final long sequence) {
assertThat(error.size(), is(4));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.api;

import static io.confluent.ksql.api.server.ErrorCodes.ERROR_HTTP2_ONLY;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

import io.confluent.ksql.api.utils.QueryResponse;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.HttpResponse;
import io.vertx.ext.web.client.WebClientOptions;
import org.junit.Test;

public class Http2OnlyStreamTest extends BaseApiTest {

protected WebClientOptions createClientOptions() {
return new WebClientOptions()
.setDefaultHost("localhost")
.setDefaultPort(server.getListeners().get(0).getPort())
.setReusePort(true);
}

@Test
public void shouldRejectQueryUsingHttp11() throws Exception {

// Given:
JsonObject requestBody = new JsonObject().put("sql", DEFAULT_PULL_QUERY);
JsonObject properties = new JsonObject().put("prop1", "val1").put("prop2", 23);
requestBody.put("properties", properties);

// Then
shouldRejectRequestUsingHttp11("/query-stream", requestBody);
}

@Test
public void shouldRejectInsertsUsingHttp11() throws Exception {

// Given:
JsonObject requestBody = new JsonObject().put("target", "test-stream");

// Then:
shouldRejectRequestUsingHttp11("/inserts-stream", requestBody);
}

private void shouldRejectRequestUsingHttp11(final String uri, final JsonObject request)
throws Exception {
// When
HttpResponse<Buffer> response = sendRequest(uri, request.toBuffer());

// Then
assertThat(response.statusCode(), is(400));
QueryResponse queryResponse = new QueryResponse(response.bodyAsString());
validateError(ERROR_HTTP2_ONLY, "This endpoint is only available when using HTTP2",
queryResponse.responseObject);
}


}