From 1154d0a8703bcf3fbc6e0c6f9df1f189ae09ef64 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 20 Dec 2022 20:47:52 +0200 Subject: [PATCH] [improve][broker] Omit making a copy of CommandAck when there are no broker interceptors (#18997) --- .../apache/pulsar/broker/PulsarService.java | 8 +- .../broker/intercept/BrokerInterceptor.java | 38 -------- .../broker/intercept/BrokerInterceptors.java | 2 +- .../pulsar/broker/service/Producer.java | 27 ++++-- .../service/PulsarCommandSenderImpl.java | 10 ++- .../pulsar/broker/service/ServerCnx.java | 88 ++++++++++++------- .../pulsar/broker/web/PreInterceptFilter.java | 4 +- .../service/MessageCumulativeAckTest.java | 14 +-- 8 files changed, 99 insertions(+), 92 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 0a49d1092d3dc..4bde8e90cfee3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -779,8 +779,12 @@ public void start() throws PulsarServerException { this.defaultOffloader = createManagedLedgerOffloader(defaultOffloadPolicies); this.brokerInterceptor = BrokerInterceptors.load(config); - brokerService.setInterceptor(getBrokerInterceptor()); - this.brokerInterceptor.initialize(this); + // use getter to support mocking getBrokerInterceptor method in tests + BrokerInterceptor interceptor = getBrokerInterceptor(); + if (interceptor != null) { + brokerService.setInterceptor(interceptor); + interceptor.initialize(this); + } brokerService.start(); // Load additional servlets diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java index 08b6c1559e5f3..0ade5e0b91bb3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java @@ -231,44 +231,6 @@ default void onFilter(ServletRequest request, ServletResponse response, FilterCh */ void initialize(PulsarService pulsarService) throws Exception; - BrokerInterceptor DISABLED = new BrokerInterceptorDisabled(); - - /** - * Broker interceptor disabled implementation. - */ - class BrokerInterceptorDisabled implements BrokerInterceptor { - - @Override - public void onPulsarCommand(BaseCommand command, ServerCnx cnx) throws InterceptException { - // no-op - } - - @Override - public void onConnectionClosed(ServerCnx cnx) { - // no-op - } - - @Override - public void onWebserviceRequest(ServletRequest request) { - // no-op - } - - @Override - public void onWebserviceResponse(ServletRequest request, ServletResponse response) { - // no-op - } - - @Override - public void initialize(PulsarService pulsarService) throws Exception { - // no-op - } - - @Override - public void close() { - // no-op - } - } - /** * Close this broker interceptor. */ diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java index e2e6b2e051b72..e7f82742a97cc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java @@ -89,7 +89,7 @@ public static BrokerInterceptor load(ServiceConfiguration conf) throws IOExcepti if (interceptors != null && !interceptors.isEmpty()) { return new BrokerInterceptors(interceptors); } else { - return DISABLED; + return null; } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index bc101e31d2723..5b62e3261e64f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater; import org.apache.bookkeeper.mledger.Position; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.intercept.BrokerInterceptor; import org.apache.pulsar.broker.service.BrokerServiceException.TopicClosedException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException; import org.apache.pulsar.broker.service.Topic.PublishContext; @@ -70,6 +71,7 @@ public class Producer { private final boolean userProvidedProducerName; private final long producerId; private final String appId; + private final BrokerInterceptor brokerInterceptor; private Rate msgIn; private Rate chunkedMessageRate; // it records msg-drop rate only for non-persistent topic @@ -156,6 +158,7 @@ public Producer(Topic topic, TransportCnx cnx, long producerId, String producerN this.topicEpoch = topicEpoch; this.clientAddress = cnx.clientSourceAddress(); + this.brokerInterceptor = cnx.getBrokerService().getInterceptor(); } /** @@ -271,8 +274,10 @@ private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, l MessagePublishContext messagePublishContext = MessagePublishContext.get(this, sequenceId, msgIn, headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker, position); - this.cnx.getBrokerService().getInterceptor() - .onMessagePublish(this, headersAndPayload, messagePublishContext); + if (brokerInterceptor != null) { + brokerInterceptor + .onMessagePublish(this, headersAndPayload, messagePublishContext); + } topic.publishMessage(headersAndPayload, messagePublishContext); } @@ -281,8 +286,10 @@ private void publishMessageToTopic(ByteBuf headersAndPayload, long lowestSequenc MessagePublishContext messagePublishContext = MessagePublishContext.get(this, lowestSequenceId, highestSequenceId, msgIn, headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker, position); - this.cnx.getBrokerService().getInterceptor() - .onMessagePublish(this, headersAndPayload, messagePublishContext); + if (brokerInterceptor != null) { + brokerInterceptor + .onMessagePublish(this, headersAndPayload, messagePublishContext); + } topic.publishMessage(headersAndPayload, messagePublishContext); } @@ -538,8 +545,10 @@ public void run() { producer.chunkedMessageRate.recordEvent(); } producer.publishOperationCompleted(); - producer.cnx.getBrokerService().getInterceptor().messageProduced( - (ServerCnx) producer.cnx, producer, startTimeNs, ledgerId, entryId, this); + if (producer.brokerInterceptor != null) { + producer.brokerInterceptor.messageProduced( + (ServerCnx) producer.cnx, producer, startTimeNs, ledgerId, entryId, this); + } recycle(); } @@ -806,8 +815,10 @@ public void publishTxnMessage(TxnID txnID, long producerId, long sequenceId, lon MessagePublishContext messagePublishContext = MessagePublishContext.get(this, sequenceId, highSequenceId, msgIn, headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker, null); - this.cnx.getBrokerService().getInterceptor() - .onMessagePublish(this, headersAndPayload, messagePublishContext); + if (brokerInterceptor != null) { + brokerInterceptor + .onMessagePublish(this, headersAndPayload, messagePublishContext); + } topic.publishTxnMessage(txnID, headersAndPayload, messagePublishContext); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java index 2bc933e75fd88..b5f4d17801cf2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java @@ -381,10 +381,12 @@ private void interceptAndWriteCommand(BaseCommand command) { } private void safeIntercept(BaseCommand command, ServerCnx cnx) { - try { - this.interceptor.onPulsarCommand(command, cnx); - } catch (Exception e) { - log.error("Failed to execute command {} on broker interceptor.", command.getType(), e); + if (this.interceptor != null) { + try { + this.interceptor.onPulsarCommand(command, cnx); + } catch (Exception e) { + log.error("Failed to execute command {} on broker interceptor.", command.getType(), e); + } } } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 88e51341a8c8f..1239bf72c52bc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -59,7 +59,6 @@ import java.util.stream.Collectors; import javax.naming.AuthenticationException; import javax.net.ssl.SSLSession; -import lombok.val; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedLedgerException; @@ -184,6 +183,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { private final boolean enableSubscriptionPatternEvaluation; private final int maxSubscriptionPatternLength; private final TopicListService topicListService; + private final BrokerInterceptor brokerInterceptor; private State state; private volatile boolean isActive = true; private String authRole = null; @@ -296,6 +296,7 @@ public ServerCnx(PulsarService pulsar, String listenerName) { this.maxSubscriptionPatternLength = conf.getSubscriptionPatternMaxLength(); this.topicListService = new TopicListService(pulsar, this, enableSubscriptionPatternEvaluation, maxSubscriptionPatternLength); + this.brokerInterceptor = this.service != null ? this.service.getInterceptor() : null; } @Override @@ -312,7 +313,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception { } log.info("New connection from {}", remoteAddress); this.ctx = ctx; - this.commandSender = new PulsarCommandSenderImpl(getBrokerService().getInterceptor(), this); + this.commandSender = new PulsarCommandSenderImpl(brokerInterceptor, this); this.service.getPulsarStats().recordConnectionCreate(); cnxsPerThread.get().add(this); } @@ -323,8 +324,9 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { connectionController.decreaseConnection(ctx.channel().remoteAddress()); isActive = false; log.info("Closed connection from {}", remoteAddress); - BrokerInterceptor brokerInterceptor = getBrokerService().getInterceptor(); - brokerInterceptor.onConnectionClosed(this); + if (brokerInterceptor != null) { + brokerInterceptor.onConnectionClosed(this); + } cnxsPerThread.get().remove(this); @@ -338,7 +340,9 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { if (producerFuture.isDone() && !producerFuture.isCompletedExceptionally()) { Producer producer = producerFuture.getNow(null); producer.closeNow(true); - brokerInterceptor.producerClosed(this, producer, producer.getMetadata()); + if (brokerInterceptor != null) { + brokerInterceptor.producerClosed(this, producer, producer.getMetadata()); + } } }); @@ -352,7 +356,9 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { Consumer consumer = consumerFuture.getNow(null); try { consumer.close(); - brokerInterceptor.consumerClosed(this, consumer, consumer.getMetadata()); + if (brokerInterceptor != null) { + brokerInterceptor.consumerClosed(this, consumer, consumer.getMetadata()); + } } catch (BrokerServiceException e) { log.warn("Consumer {} was already closed: {}", consumer, e); } @@ -691,7 +697,9 @@ private void completeConnect(int clientProtoVersion, String clientVersion, boole if (isNotBlank(clientVersion) && !clientVersion.contains(" ") /* ignore default version: pulsar client */) { this.clientVersion = clientVersion.intern(); } - getBrokerService().getInterceptor().onConnectionCreated(this); + if (brokerInterceptor != null) { + brokerInterceptor.onConnectionCreated(this); + } } // According to auth result, send newConnected or newAuthChallenge command. @@ -1148,7 +1156,9 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { log.info("[{}] Created subscription on topic {} / {}", remoteAddress, topicName, subscriptionName); commandSender.sendSuccessResponse(requestId); - getBrokerService().getInterceptor().consumerCreated(this, consumer, metadata); + if (brokerInterceptor != null) { + brokerInterceptor.consumerCreated(this, consumer, metadata); + } } else { // The consumer future was completed before by a close command try { @@ -1491,8 +1501,10 @@ private void buildProducerAndAddTopic(Topic topic, long producerId, String produ commandSender.sendProducerSuccessResponse(requestId, producerName, producer.getLastSequenceId(), producer.getSchemaVersion(), newTopicEpoch, true /* producer is ready now */); - getBrokerService().getInterceptor(). - producerCreated(this, producer, metadata); + if (brokerInterceptor != null) { + brokerInterceptor. + producerCreated(this, producer, metadata); + } return; } else { // The producer's future was completed before by @@ -1553,8 +1565,10 @@ private void buildProducerAndAddTopic(Topic topic, long producerId, String produ commandSender.sendProducerSuccessResponse(requestId, producerName, producer.getLastSequenceId(), producer.getSchemaVersion(), Optional.empty(), false/* producer is not ready now */); - getBrokerService().getInterceptor(). - producerCreated(this, producer, metadata); + if (brokerInterceptor != null) { + brokerInterceptor. + producerCreated(this, producer, metadata); + } } }); } @@ -1637,24 +1651,27 @@ protected void handleAck(CommandAck ack) { final boolean hasRequestId = ack.hasRequestId(); final long requestId = hasRequestId ? ack.getRequestId() : 0; final long consumerId = ack.getConsumerId(); - final CommandAck finalAck = getBrokerService().getInterceptor() != null ? new CommandAck().copyFrom(ack) : null; + // It is necessary to make a copy of the CommandAck instance for the interceptor. + final CommandAck copyOfAckForInterceptor = brokerInterceptor != null ? new CommandAck().copyFrom(ack) : null; if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) { Consumer consumer = consumerFuture.getNow(null); consumer.messageAcked(ack).thenRun(() -> { - if (hasRequestId) { - ctx.writeAndFlush(Commands.newAckResponse( - requestId, null, null, consumerId)); - } - getBrokerService().getInterceptor().messageAcked(this, consumer, finalAck); + if (hasRequestId) { + ctx.writeAndFlush(Commands.newAckResponse( + requestId, null, null, consumerId)); + } + if (brokerInterceptor != null) { + brokerInterceptor.messageAcked(this, consumer, copyOfAckForInterceptor); + } }).exceptionally(e -> { - if (hasRequestId) { - ctx.writeAndFlush(Commands.newAckResponse(requestId, - BrokerServiceException.getClientErrorCode(e), - e.getMessage(), consumerId)); - } - return null; - }); + if (hasRequestId) { + ctx.writeAndFlush(Commands.newAckResponse(requestId, + BrokerServiceException.getClientErrorCode(e), + e.getMessage(), consumerId)); + } + return null; + }); } } @@ -1840,7 +1857,9 @@ protected void handleCloseProducer(CommandCloseProducer closeProducer) { remoteAddress, producerId); commandSender.sendSuccessResponse(requestId); producers.remove(producerId, producerFuture); - getBrokerService().getInterceptor().producerClosed(this, producer, producer.getMetadata()); + if (brokerInterceptor != null) { + brokerInterceptor.producerClosed(this, producer, producer.getMetadata()); + } }); } @@ -1884,7 +1903,9 @@ protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) { consumers.remove(consumerId, consumerFuture); commandSender.sendSuccessResponse(requestId); log.info("[{}] Closed consumer, consumerId={}", remoteAddress, consumerId); - getBrokerService().getInterceptor().consumerClosed(this, consumer, consumer.getMetadata()); + if (brokerInterceptor != null) { + brokerInterceptor.consumerClosed(this, consumer, consumer.getMetadata()); + } } catch (BrokerServiceException e) { log.warn("[{]] Error closing consumer {} : {}", remoteAddress, consumer, e); commandSender.sendErrorResponse(requestId, BrokerServiceException.getClientErrorCode(e), e.getMessage()); @@ -2710,7 +2731,9 @@ public ChannelHandlerContext ctx() { @Override protected void interceptCommand(BaseCommand command) throws InterceptException { - getBrokerService().getInterceptor().onPulsarCommand(command, this); + if (brokerInterceptor != null) { + brokerInterceptor.onPulsarCommand(command, this); + } } @Override @@ -2993,12 +3016,15 @@ public ByteBufPair newMessageAndIntercept(long consumerId, long ledgerId, long e ackSet, epoch); ByteBufPair res = Commands.serializeCommandMessageWithSize(command, metadataAndPayload); try { - val brokerInterceptor = getBrokerService().getInterceptor(); - brokerInterceptor.onPulsarCommand(command, this); + if (brokerInterceptor != null) { + brokerInterceptor.onPulsarCommand(command, this); + } CompletableFuture consumerFuture = consumers.get(consumerId); if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) { Consumer consumer = consumerFuture.getNow(null); - brokerInterceptor.messageDispatched(this, consumer, ledgerId, entryId, metadataAndPayload); + if (brokerInterceptor != null) { + brokerInterceptor.messageDispatched(this, consumer, ledgerId, entryId, metadataAndPayload); + } } } catch (Exception e) { log.error("Exception occur when intercept messages.", e); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PreInterceptFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PreInterceptFilter.java index e760c64d986f8..1ebea67d6036d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PreInterceptFilter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PreInterceptFilter.java @@ -66,7 +66,9 @@ public void doFilter(ServletRequest servletRequest, ServletResponse servletRespo } try { RequestWrapper requestWrapper = new RequestWrapper((HttpServletRequest) servletRequest); - interceptor.onWebserviceRequest(requestWrapper); + if (interceptor != null) { + interceptor.onWebserviceRequest(requestWrapper); + } filterChain.doFilter(requestWrapper, servletResponse); } catch (InterceptException e) { exceptionHandler.handle(servletResponse, e); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java index 6b106bfd47de7..0a227f6812c93 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java @@ -99,6 +99,12 @@ public void setup() throws Exception { doReturn(pulsarResources).when(pulsar).getPulsarResources(); }); + eventLoopGroup = new NioEventLoopGroup(); + brokerService = spyWithClassAndConstructorArgs(BrokerService.class, pulsar, eventLoopGroup); + PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> { + doReturn(brokerService).when(pulsar).getBrokerService(); + }); + serverCnx = spyWithClassAndConstructorArgs(ServerCnx.class, pulsar); doReturn(true).when(serverCnx).isActive(); doReturn(true).when(serverCnx).isWritable(); @@ -106,13 +112,7 @@ public void setup() throws Exception { when(serverCnx.getRemoteEndpointProtocolVersion()).thenReturn(ProtocolVersion.v12.getValue()); when(serverCnx.ctx()).thenReturn(mock(ChannelHandlerContext.class)); doReturn(new PulsarCommandSenderImpl(null, serverCnx)) - .when(serverCnx).getCommandSender(); - - eventLoopGroup = new NioEventLoopGroup(); - brokerService = spyWithClassAndConstructorArgs(BrokerService.class, pulsar, eventLoopGroup); - PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> { - doReturn(brokerService).when(pulsar).getBrokerService(); - }); + .when(serverCnx).getCommandSender(); String topicName = TopicName.get("MessageCumulativeAckTest").toString(); PersistentTopic persistentTopic = new PersistentTopic(topicName, mock(ManagedLedger.class), brokerService);