From 42ab00ffb82464765b708fca4c43b43869886e29 Mon Sep 17 00:00:00 2001 From: guangning Date: Thu, 15 Dec 2022 17:17:07 +0800 Subject: [PATCH 1/6] Update interceptor handler exception --- .../pulsar/broker/web/ExceptionHandler.java | 24 +++++++++++++------ 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ExceptionHandler.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ExceptionHandler.java index 16eda7fc58c22..33cb5ab4a965a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ExceptionHandler.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ExceptionHandler.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.web; +import com.google.gson.Gson; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; @@ -25,6 +26,10 @@ import javax.servlet.http.HttpServletResponse; import javax.ws.rs.core.Response; import org.apache.pulsar.common.intercept.InterceptException; +import org.apache.pulsar.common.policies.data.ErrorData; +import org.eclipse.jetty.http.HttpField; +import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.MetaData; @@ -35,16 +40,21 @@ public class ExceptionHandler { public void handle(ServletResponse response, Exception ex) throws IOException { if (ex instanceof InterceptException) { - String reason = ex.getMessage(); - byte[] content = reason.getBytes(StandardCharsets.UTF_8); - MetaData.Response info = new MetaData.Response(); + String errorData = new Gson().toJson(new ErrorData(ex.getMessage())); + byte[] errorBytes = errorData.getBytes(StandardCharsets.UTF_8); + int errCode = ((InterceptException) ex).getErrorCode(); + HttpFields httpFields = new HttpFields(); + HttpField httpField = new HttpField(HttpHeader.CONTENT_TYPE, "application/json;charset=utf-8"); + httpFields.add(httpField); + MetaData.Response info = new MetaData.Response(HttpVersion.HTTP_1_1, errCode, httpFields); info.setHttpVersion(HttpVersion.HTTP_1_1); - info.setReason(reason); - info.setStatus(((InterceptException) ex).getErrorCode()); - info.setContentLength(content.length); + info.setReason(errorData); + info.setStatus(errCode); + info.setContentLength(errorBytes.length); if (response instanceof org.eclipse.jetty.server.Response) { ((org.eclipse.jetty.server.Response) response).getHttpChannel().sendResponse(info, - ByteBuffer.wrap(content), true); + ByteBuffer.wrap(errorBytes), + true); } else { ((HttpServletResponse) response).sendError(((InterceptException) ex).getErrorCode(), ex.getMessage()); From 2ad5376dc424ba8c42cc1e362a82c50a1dd0dbaf Mon Sep 17 00:00:00 2001 From: guangning Date: Thu, 15 Dec 2022 17:39:28 +0800 Subject: [PATCH 2/6] Fixed json ser function --- .../org/apache/pulsar/broker/web/ExceptionHandler.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ExceptionHandler.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ExceptionHandler.java index 33cb5ab4a965a..d3c9217f581d1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ExceptionHandler.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ExceptionHandler.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.broker.web; -import com.google.gson.Gson; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; @@ -27,6 +26,7 @@ import javax.ws.rs.core.Response; import org.apache.pulsar.common.intercept.InterceptException; import org.apache.pulsar.common.policies.data.ErrorData; +import org.apache.pulsar.common.util.ObjectMapperFactory; import org.eclipse.jetty.http.HttpField; import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpHeader; @@ -40,16 +40,16 @@ public class ExceptionHandler { public void handle(ServletResponse response, Exception ex) throws IOException { if (ex instanceof InterceptException) { - String errorData = new Gson().toJson(new ErrorData(ex.getMessage())); + String errorData = ObjectMapperFactory.getThreadLocal().writeValueAsString(new ErrorData(ex.getMessage())); byte[] errorBytes = errorData.getBytes(StandardCharsets.UTF_8); - int errCode = ((InterceptException) ex).getErrorCode(); + int errorCode = ((InterceptException) ex).getErrorCode(); HttpFields httpFields = new HttpFields(); HttpField httpField = new HttpField(HttpHeader.CONTENT_TYPE, "application/json;charset=utf-8"); httpFields.add(httpField); - MetaData.Response info = new MetaData.Response(HttpVersion.HTTP_1_1, errCode, httpFields); + MetaData.Response info = new MetaData.Response(HttpVersion.HTTP_1_1, errorCode, httpFields); info.setHttpVersion(HttpVersion.HTTP_1_1); info.setReason(errorData); - info.setStatus(errCode); + info.setStatus(errorCode); info.setContentLength(errorBytes.length); if (response instanceof org.eclipse.jetty.server.Response) { ((org.eclipse.jetty.server.Response) response).getHttpChannel().sendResponse(info, From 42dac5bb2b4ff0dcc78b118cf0cd5829e5542f52 Mon Sep 17 00:00:00 2001 From: guangning Date: Sun, 18 Dec 2022 10:09:59 +0800 Subject: [PATCH 3/6] Remove logic to jetty.server.Response for avoid unnecessary checks --- .../pulsar/broker/web/ExceptionHandler.java | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ExceptionHandler.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ExceptionHandler.java index d3c9217f581d1..036f12f26c628 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ExceptionHandler.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ExceptionHandler.java @@ -40,18 +40,18 @@ public class ExceptionHandler { public void handle(ServletResponse response, Exception ex) throws IOException { if (ex instanceof InterceptException) { - String errorData = ObjectMapperFactory.getThreadLocal().writeValueAsString(new ErrorData(ex.getMessage())); - byte[] errorBytes = errorData.getBytes(StandardCharsets.UTF_8); - int errorCode = ((InterceptException) ex).getErrorCode(); - HttpFields httpFields = new HttpFields(); - HttpField httpField = new HttpField(HttpHeader.CONTENT_TYPE, "application/json;charset=utf-8"); - httpFields.add(httpField); - MetaData.Response info = new MetaData.Response(HttpVersion.HTTP_1_1, errorCode, httpFields); - info.setHttpVersion(HttpVersion.HTTP_1_1); - info.setReason(errorData); - info.setStatus(errorCode); - info.setContentLength(errorBytes.length); if (response instanceof org.eclipse.jetty.server.Response) { + String errorData = ObjectMapperFactory.getThreadLocal().writeValueAsString(new ErrorData(ex.getMessage())); + byte[] errorBytes = errorData.getBytes(StandardCharsets.UTF_8); + int errorCode = ((InterceptException) ex).getErrorCode(); + HttpFields httpFields = new HttpFields(); + HttpField httpField = new HttpField(HttpHeader.CONTENT_TYPE, "application/json;charset=utf-8"); + httpFields.add(httpField); + MetaData.Response info = new MetaData.Response(HttpVersion.HTTP_1_1, errorCode, httpFields); + info.setHttpVersion(HttpVersion.HTTP_1_1); + info.setReason(errorData); + info.setStatus(errorCode); + info.setContentLength(errorBytes.length); ((org.eclipse.jetty.server.Response) response).getHttpChannel().sendResponse(info, ByteBuffer.wrap(errorBytes), true); From e6fe0e2813fba403650c373c8f4f291dc1ffc018 Mon Sep 17 00:00:00 2001 From: guangning Date: Sun, 18 Dec 2022 11:42:49 +0800 Subject: [PATCH 4/6] Update style --- .../java/org/apache/pulsar/broker/web/ExceptionHandler.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ExceptionHandler.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ExceptionHandler.java index 036f12f26c628..aed6fbdc10cca 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ExceptionHandler.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ExceptionHandler.java @@ -41,7 +41,8 @@ public class ExceptionHandler { public void handle(ServletResponse response, Exception ex) throws IOException { if (ex instanceof InterceptException) { if (response instanceof org.eclipse.jetty.server.Response) { - String errorData = ObjectMapperFactory.getThreadLocal().writeValueAsString(new ErrorData(ex.getMessage())); + String errorData = ObjectMapperFactory + .getThreadLocal().writeValueAsString(new ErrorData(ex.getMessage())); byte[] errorBytes = errorData.getBytes(StandardCharsets.UTF_8); int errorCode = ((InterceptException) ex).getErrorCode(); HttpFields httpFields = new HttpFields(); From b264f721d99f9a7533e1fcf448969f4f68891a20 Mon Sep 17 00:00:00 2001 From: guangning Date: Sun, 18 Dec 2022 16:28:39 +0800 Subject: [PATCH 5/6] Add test for interceptor --- .../intercept/BrokerInterceptorTest.java | 31 +++++++++++++++++++ .../intercept/CounterBrokerInterceptor.java | 20 ++++++++++-- 2 files changed, 48 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java index 4b12fbdf1f4f8..c1543abdf4184 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.intercept; +import com.google.common.collect.Sets; import lombok.Cleanup; import okhttp3.Call; import okhttp3.Callback; @@ -32,6 +33,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.nar.NarClassLoader; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; @@ -40,7 +42,9 @@ import java.io.IOException; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -269,8 +273,35 @@ public void onResponse(Call call, Response response) throws IOException { Awaitility.await().until(() -> !interceptor.getResponseList().isEmpty()); CounterBrokerInterceptor.ResponseEvent responseEvent = interceptor.getResponseList().get(0); Assert.assertEquals(responseEvent.getRequestUri(), "/admin/v3/test/asyncGet/my-topic/1000"); + Assert.assertEquals(responseEvent.getResponseStatus(), javax.ws.rs.core.Response.noContent().build().getStatus()); } + public void requestInterceptorFailedTest() { + Set allowedClusters = new HashSet<>(); + allowedClusters.add(configClusterName); + TenantInfoImpl tenantInfo = new TenantInfoImpl(new HashSet<>(), allowedClusters); + try { + admin.tenants().createTenant("test-interceptor-failed-tenant", tenantInfo); + Assert.fail("Create tenant because interceptor should fail"); + } catch (PulsarAdminException e) { + Assert.assertEquals(e.getHttpError(), "Create tenant failed"); + } + + try { + admin.namespaces().createNamespace("public/test-interceptor-failed-namespace"); + Assert.fail("Create namespace because interceptor should fail"); + } catch (PulsarAdminException e) { + Assert.assertEquals(e.getHttpError(), "Create namespace failed"); + } + + try { + admin.topics().createNonPartitionedTopic("persistent://public/default/test-interceptor-failed-topic"); + Assert.fail("Create topic because interceptor should fail"); + } catch (PulsarAdminException e) { + Assert.assertEquals(e.getHttpError(), "Create topic failed"); + } + } + } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java index e436444ec7c10..6cda131a67e74 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java @@ -33,6 +33,7 @@ import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.Entry; +import org.apache.http.HttpStatus; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.Producer; @@ -43,6 +44,7 @@ import org.apache.pulsar.common.api.proto.CommandAck; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.api.proto.TxnAction; +import org.apache.pulsar.common.intercept.InterceptException; import org.eclipse.jetty.server.Response; @@ -216,10 +218,20 @@ public void onConnectionClosed(ServerCnx cnx) { } @Override - public void onWebserviceRequest(ServletRequest request) { + public void onWebserviceRequest(ServletRequest request) throws IOException, ServletException, InterceptException { count.incrementAndGet(); + String url = ((HttpServletRequest) request).getRequestURL().toString(); if (log.isDebugEnabled()) { - log.debug("[{}] On [{}] Webservice request", count, ((HttpServletRequest) request).getRequestURL().toString()); + log.debug("[{}] On [{}] Webservice request", count, url); + } + if (url.contains("/admin/v2/tenants/test-interceptor-failed-tenant")) { + throw new InterceptException(HttpStatus.SC_PRECONDITION_FAILED, "Create tenant failed"); + } + if (url.contains("/admin/v2/namespaces/public/test-interceptor-failed-namespace")) { + throw new InterceptException(HttpStatus.SC_PRECONDITION_FAILED, "Create namespace failed"); + } + if (url.contains("/admin/v2/persistent/public/default/test-interceptor-failed-topic")) { + throw new InterceptException(HttpStatus.SC_PRECONDITION_FAILED, "Create topic failed"); } } @@ -227,7 +239,8 @@ public void onWebserviceRequest(ServletRequest request) { public void onWebserviceResponse(ServletRequest request, ServletResponse response) { count.incrementAndGet(); if (log.isDebugEnabled()) { - log.debug("[{}] On [{}] Webservice response {}", count, ((HttpServletRequest) request).getRequestURL().toString(), response); + log.debug("[{}] On [{}] Webservice response {}", + count, ((HttpServletRequest) request).getRequestURL().toString(), response); } if (response instanceof Response) { Response res = (Response) response; @@ -235,6 +248,7 @@ public void onWebserviceResponse(ServletRequest request, ServletResponse respons } } + @Override public void onFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException { From 3b1963f6f8d8e9e7fb5b73175d533e676fbb86d0 Mon Sep 17 00:00:00 2001 From: guangning Date: Sun, 18 Dec 2022 17:15:35 +0800 Subject: [PATCH 6/6] Fixed import --- .../apache/pulsar/broker/intercept/BrokerInterceptorTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java index c1543abdf4184..bc6cc6fc5be28 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.broker.intercept; -import com.google.common.collect.Sets; import lombok.Cleanup; import okhttp3.Call; import okhttp3.Callback;