Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KREST-10243 Add custom-request-logging to kafka-rest, and log error-codes for when various rate-limiters are triggered #1168

Merged
merged 11 commits into from
Jun 20, 2023
1 change: 1 addition & 0 deletions checkstyle/import_control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@
<allow class="javax.xml.bind.DatatypeConverter" />
<allow class="org.everit.json.schema.ValidationException" />
<allow class="org.hibernate.validator.constraints.URL" />
<allow class="javax.servlet.http.HttpServletRequest" />

<!-- Static field imports -->
<allow class="io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS" />
Expand Down
4 changes: 2 additions & 2 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@
<suppress checks="MethodTypeParameterName" files="AbstractConsumerTest" />

<!-- The tests below are fairly big and complex, so they go over the NCSS threshold. -->
<suppress checks="JavaNCSS" files="ProtobufConverterTest|ProduceActionTest" />
<suppress checks="JavaNCSS" files="ProtobufConverterTest|ProduceActionTest|CustomLogIntegrationTest" />

<!-- TestUtils#encodeComparable contains a complex, switch-like if-else structure. -->
<suppress checks="CyclomaticComplexity" files="TestUtils" />
<suppress checks="CyclomaticComplexity" files="TestUtils|CustomLogIntegrationTest" />

<suppress
checks="ClassDataAbstractionCoupling"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,24 +32,36 @@
import io.confluent.kafkarest.extension.ResourceAccesslistFeature;
import io.confluent.kafkarest.extension.RestResourceExtension;
import io.confluent.kafkarest.ratelimit.RateLimitFeature;
import io.confluent.kafkarest.requestlog.CustomLog;
import io.confluent.kafkarest.requestlog.CustomLogRequestAttributes;
import io.confluent.kafkarest.requestlog.GlobalDosFilterListener;
import io.confluent.kafkarest.requestlog.PerConnectionDosFilterListener;
import io.confluent.kafkarest.resources.ResourcesFeature;
import io.confluent.kafkarest.response.JsonStreamMessageBodyReader;
import io.confluent.kafkarest.response.ResponseModule;
import io.confluent.rest.Application;
import io.confluent.rest.RestConfig;
import io.confluent.rest.exceptions.ConstraintViolationExceptionMapper;
import io.confluent.rest.exceptions.WebApplicationExceptionMapper;
import java.text.SimpleDateFormat;
import java.util.List;
import java.util.Properties;
import java.util.TimeZone;
import javax.ws.rs.core.Configurable;
import org.eclipse.jetty.server.CustomRequestLog;
import org.eclipse.jetty.server.RequestLog;
import org.eclipse.jetty.server.Slf4jRequestLogWriter;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.util.StringUtil;
import org.glassfish.jersey.server.ServerProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Utilities for configuring and running an embedded Kafka server. */
public class KafkaRestApplication extends Application<KafkaRestConfig> {

private static final Logger log = LoggerFactory.getLogger(KafkaRestApplication.class);

List<RestResourceExtension> restResourceExtensions;

public KafkaRestApplication() {
Expand All @@ -69,12 +81,61 @@ public KafkaRestApplication(KafkaRestConfig config, String path) {
}

public KafkaRestApplication(KafkaRestConfig config, String path, String listenerName) {
super(config, path, listenerName);
this(config, path, listenerName, null /* requestLogWriter */, null /* requestLogFormat */);
}

/* This public-constructor exists to facilitate testing with a custom requestLogWriter, and
* requestLogFormat in an integration test in a different package.
*/
public KafkaRestApplication(
KafkaRestConfig config,
String path,
String listenerName,
RequestLog.Writer requestLogWriter,
String requestLogFormat) {
super(
config,
path,
listenerName,
createRequestLog(config, requestLogWriter, requestLogFormat, log, listenerName));

restResourceExtensions =
config.getConfiguredInstances(
KafkaRestConfig.KAFKA_REST_RESOURCE_EXTENSION_CONFIG, RestResourceExtension.class);
config.setMetrics(metrics);

// Set up listeners for dos-filters, needed for custom-logging for when dos-filter rate-limits.
this.addNonGlobalDosfilterListener(new PerConnectionDosFilterListener());
this.addGlobalDosfilterListener(new GlobalDosFilterListener());
}

private static RequestLog createRequestLog(
KafkaRestConfig config,
RequestLog.Writer requestLogWriter,
String requestLogFormat,
Logger log,
String listenerName) {
if (config.getBoolean(KafkaRestConfig.USE_CUSTOM_REQUEST_LOGGING_CONFIG)) {
msn-tldr marked this conversation as resolved.
Show resolved Hide resolved
log.info("For rest-app with listener {}, configuring custom request logging", listenerName);
if (requestLogWriter == null) {
Slf4jRequestLogWriter logWriter = new Slf4jRequestLogWriter();
logWriter.setLoggerName(config.getString(RestConfig.REQUEST_LOGGER_NAME_CONFIG));
requestLogWriter = logWriter;
}

if (requestLogFormat == null) {
requestLogFormat = CustomRequestLog.EXTENDED_NCSA_FORMAT + " %{ms}T";
}

CustomLog customRequestLog =
new CustomLog(
requestLogWriter,
requestLogFormat,
new String[] {CustomLogRequestAttributes.REST_ERROR_CODE});
return customRequestLog;
}
// Return null, as Application's ctor would set-up a default request-logger.
return null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,12 @@ public class KafkaRestConfig extends RestConfig {
+ "requests will be processed for before the connection is closed.";
private static final String STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS_DEFAULT = "500";

public static final String USE_CUSTOM_REQUEST_LOGGING_CONFIG = "use.custom.request.logging";
private static final String USE_CUSTOM_REQUEST_LOGGING_DOC =
"Whether to use custom-request-logging i.e. CustomLog.java. Instead of using"
+ "Jetty's request-logging.";
private static final boolean USE_CUSTOM_REQUEST_LOGGING_DEFAULT = true;

private static final ConfigDef config;
private volatile Metrics metrics;

Expand Down Expand Up @@ -889,7 +895,13 @@ protected static ConfigDef baseKafkaRestConfigDef() {
Type.LONG,
STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS_DEFAULT,
Importance.LOW,
STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS_DOC);
STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS_DOC)
.define(
USE_CUSTOM_REQUEST_LOGGING_CONFIG,
Type.BOOLEAN,
USE_CUSTOM_REQUEST_LOGGING_DEFAULT,
Importance.LOW,
USE_CUSTOM_REQUEST_LOGGING_DOC);
}

private static Properties getPropsFromFile(String propsFile) throws RestConfigException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@
package io.confluent.kafkarest.ratelimit;

import static com.google.common.base.Preconditions.checkArgument;
import static io.confluent.kafkarest.ratelimit.RateLimitExceededException.ErrorCodes.PERMITS_MAX_GLOBAL_LIMIT_EXCEEDED;
import static io.confluent.kafkarest.ratelimit.RateLimitExceededException.ErrorCodes.PERMITS_MAX_PER_CLUSTER_LIMIT_EXCEEDED;
import static java.util.Objects.requireNonNull;

import com.google.common.cache.LoadingCache;
import io.confluent.kafkarest.requestlog.CustomLogRequestAttributes;
import javax.ws.rs.container.ContainerRequestContext;
import javax.ws.rs.container.ContainerRequestFilter;

Expand All @@ -29,6 +32,7 @@
* io.confluent.kafkarest.KafkaRestConfig#RATE_LIMIT_DEFAULT_COST_CONFIG} configs.
*/
final class FixedCostRateLimitRequestFilter implements ContainerRequestFilter {

private final RequestRateLimiter genericRateLimiter;
private final int cost;
private final LoadingCache<String, RequestRateLimiter> perClusterRateLimiterCache;
Expand All @@ -49,9 +53,28 @@ public void filter(ContainerRequestContext requestContext) {
String clusterId = requestContext.getUriInfo().getPathParameters(true).getFirst("clusterId");
if (clusterId != null) {
RequestRateLimiter rateLimiter = perClusterRateLimiterCache.getUnchecked(clusterId);
rateLimiter.rateLimit(cost);
try {
rateLimiter.rateLimit(cost);
} catch (RateLimitExceededException ex) {
// The setProperty() call below maps to HttpServletRequest.setAttribute(), when Jersey is
// running in servlet environment, see
// https://github.com/eclipse-ee4j/jersey/blob/d60da249fdd06a5059472c6d9c1d8a757588e710/containers/jersey-servlet-core/src/main/java/org/glassfish/jersey/servlet/ServletPropertiesDelegate.java#L29
requestContext.setProperty(
CustomLogRequestAttributes.REST_ERROR_CODE, PERMITS_MAX_PER_CLUSTER_LIMIT_EXCEEDED);
throw ex;
}
}

try {
// apply generic (global) rate limiter
genericRateLimiter.rateLimit(cost);
} catch (RateLimitExceededException ex) {
// The setProperty() call below maps to HttpServletRequest.setAttribute, when Jersey is
// running in servlet environment, see
// https://github.com/eclipse-ee4j/jersey/blob/d60da249fdd06a5059472c6d9c1d8a757588e710/containers/jersey-servlet-core/src/main/java/org/glassfish/jersey/servlet/ServletPropertiesDelegate.java#L29
requestContext.setProperty(
CustomLogRequestAttributes.REST_ERROR_CODE, PERMITS_MAX_GLOBAL_LIMIT_EXCEEDED);
throw ex;
}
// apply generic (global) rate limiter
genericRateLimiter.rateLimit(cost);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,27 @@
*/
public final class RateLimitExceededException extends StatusCodeException {

/*
* These error-codes signify which rate-limit got exceeded, and then
* logged with request in CustomLog.
*/
public static class ErrorCodes {

// Only apply to Produce APIs.
public static final int PRODUCE_MAX_REQUESTS_GLOBAL_LIMIT_EXCEEDED = 429001;
public static final int PRODUCE_MAX_REQUESTS_PER_TENANT_LIMIT_EXCEEDED = 429002;
public static final int PRODUCE_MAX_BYTES_GLOBAL_LIMIT_EXCEEDED = 429003;
public static final int PRODUCE_MAX_BYTES_PER_TENANT_LIMIT_EXCEEDED = 429004;

// Only apply to Admin APIs for now.
public static final int PERMITS_MAX_GLOBAL_LIMIT_EXCEEDED = 429005;
public static final int PERMITS_MAX_PER_CLUSTER_LIMIT_EXCEEDED = 429006;

// Apply to all APIs, via the Dos-filters at Jetty layer.
public static final int DOS_FILTER_MAX_REQUEST_LIMIT_EXCEEDED = 429007;
public static final int DOS_FILTER_MAX_REQUEST_PER_CONNECTION_LIMIT_EXCEEDED = 429008;
}

public RateLimitExceededException() {
super(
Response.Status.TOO_MANY_REQUESTS,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright 2023 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.kafkarest.requestlog;

import org.eclipse.jetty.server.CustomRequestLog;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.RequestLog;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* CustomLog implements Jetty's RequestLog interface. It offers the same log-format jetty's
* CustomRequestLog. Additionally, it would append configured request-attributes (see
* requestAttributesToLog) to the end of the log. NOTE - this needs to extend
* AbstractLifeCycle(implicitly implement LifeCycle) to correctly initialise CustomRequestLog which
* implements LifeCycle. This in turn will correctly initialise the input RequestLog.Writer, if it
* also implements LifeCycle.
*/
public class CustomLog extends AbstractLifeCycle implements RequestLog {

private static final Logger log = LoggerFactory.getLogger(CustomLog.class);

private final CustomRequestLog delegateJettyLog;

private final String[] requestAttributesToLog;

public CustomLog(RequestLog.Writer writer, String formatString, String[] requestAttributesToLog) {
for (String attr : requestAttributesToLog) {
// Add format-specifier to log request-attributes as response-headers in Jetty's
// CustomRequestLog.
formatString += " %{" + attr + "}o";
}
this.requestAttributesToLog = requestAttributesToLog;
this.delegateJettyLog = new CustomRequestLog(writer, formatString);
}

@Override
protected synchronized void doStart() throws Exception {
if (this.delegateJettyLog != null) {
this.delegateJettyLog.start();
}
}

@Override
protected void doStop() throws Exception {
if (this.delegateJettyLog != null) {
this.delegateJettyLog.stop();
}
}

@Override
public void log(Request request, Response response) {
// The configured request-attributes are converted to response-headers so Jetty can log them.
// Request-attributes are chosen to propagate custom-info to the request-log, as
// 1. Its idiomatic as per ServletRequest(which is implemented by Jetty's request).
// 2. Places like dosfilter-listeners, ex - GlobalJettyDosFilterListener, only request is
// readily available(Vs response).
// Unfortunately Jetty doesn't provide a way to log request-attributes, hence they are converted
// to response-headers, which can be logged.
for (String attr : this.requestAttributesToLog) {
Object attrVal = request.getAttribute(attr);
if (attrVal != null) {
request.removeAttribute(attr);
response.setHeader(attr, attrVal.toString());
}
}

try {
delegateJettyLog.log(request, response);
} catch (Exception e) {
log.debug(
"Logging with Jetty's CustomRequestLogFailed with exception {}, stack is \n{}",
e,
e.getStackTrace());
} finally {
// Remove the response-headers that were added above just for logging.
for (String attr : this.requestAttributesToLog) {
response.getHttpFields().remove(attr);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2023 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License
*/

package io.confluent.kafkarest.requestlog;

/**
* This class lists the request-attributes that are used to propagate custom-info that will be added
* to the request-log(see CustomLog.java).
*/
public final class CustomLogRequestAttributes {

private CustomLogRequestAttributes() {}

public static final String REST_ERROR_CODE = "REST_ERROR_CODE";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2023 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.kafkarest.requestlog;

import io.confluent.kafkarest.ratelimit.RateLimitExceededException.ErrorCodes;
import javax.servlet.http.HttpServletRequest;
import org.eclipse.jetty.servlets.DoSFilter;
import org.eclipse.jetty.servlets.DoSFilter.Action;
import org.eclipse.jetty.servlets.DoSFilter.OverLimit;

/**
* This class is a Jetty DosFilter.Listener, for the global-dos filter. This on 429s will populate
* relevant metadata as attributed on the request, that later-on can be logged.
*/
public class GlobalDosFilterListener extends DoSFilter.Listener {

@Override
public Action onRequestOverLimit(
HttpServletRequest request, OverLimit overlimit, DoSFilter dosFilter) {
// KREST-10418: we don't use super function to get action object because
// it will log a WARN line, in order to reduce verbosity
Action action = Action.fromDelay(dosFilter.getDelayMs());
if (action.equals(Action.REJECT)) {
request.setAttribute(
CustomLogRequestAttributes.REST_ERROR_CODE,
ErrorCodes.DOS_FILTER_MAX_REQUEST_LIMIT_EXCEEDED);
}
return action;
}
}