Skip to content

Commit

Permalink
[improve][broker] Omit making a copy of CommandAck when there are no …
Browse files Browse the repository at this point in the history
…broker interceptors (#18997)
  • Loading branch information
lhotari committed Dec 20, 2022
1 parent 154d352 commit 1154d0a
Show file tree
Hide file tree
Showing 8 changed files with 99 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -779,8 +779,12 @@ public void start() throws PulsarServerException {
this.defaultOffloader = createManagedLedgerOffloader(defaultOffloadPolicies);

this.brokerInterceptor = BrokerInterceptors.load(config);
brokerService.setInterceptor(getBrokerInterceptor());
this.brokerInterceptor.initialize(this);
// use getter to support mocking getBrokerInterceptor method in tests
BrokerInterceptor interceptor = getBrokerInterceptor();
if (interceptor != null) {
brokerService.setInterceptor(interceptor);
interceptor.initialize(this);
}
brokerService.start();

// Load additional servlets
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,44 +231,6 @@ default void onFilter(ServletRequest request, ServletResponse response, FilterCh
*/
void initialize(PulsarService pulsarService) throws Exception;

BrokerInterceptor DISABLED = new BrokerInterceptorDisabled();

/**
* Broker interceptor disabled implementation.
*/
class BrokerInterceptorDisabled implements BrokerInterceptor {

@Override
public void onPulsarCommand(BaseCommand command, ServerCnx cnx) throws InterceptException {
// no-op
}

@Override
public void onConnectionClosed(ServerCnx cnx) {
// no-op
}

@Override
public void onWebserviceRequest(ServletRequest request) {
// no-op
}

@Override
public void onWebserviceResponse(ServletRequest request, ServletResponse response) {
// no-op
}

@Override
public void initialize(PulsarService pulsarService) throws Exception {
// no-op
}

@Override
public void close() {
// no-op
}
}

/**
* Close this broker interceptor.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public static BrokerInterceptor load(ServiceConfiguration conf) throws IOExcepti
if (interceptors != null && !interceptors.isEmpty()) {
return new BrokerInterceptors(interceptors);
} else {
return DISABLED;
return null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicClosedException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException;
import org.apache.pulsar.broker.service.Topic.PublishContext;
Expand Down Expand Up @@ -70,6 +71,7 @@ public class Producer {
private final boolean userProvidedProducerName;
private final long producerId;
private final String appId;
private final BrokerInterceptor brokerInterceptor;
private Rate msgIn;
private Rate chunkedMessageRate;
// it records msg-drop rate only for non-persistent topic
Expand Down Expand Up @@ -156,6 +158,7 @@ public Producer(Topic topic, TransportCnx cnx, long producerId, String producerN
this.topicEpoch = topicEpoch;

this.clientAddress = cnx.clientSourceAddress();
this.brokerInterceptor = cnx.getBrokerService().getInterceptor();
}

/**
Expand Down Expand Up @@ -271,8 +274,10 @@ private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, l
MessagePublishContext messagePublishContext =
MessagePublishContext.get(this, sequenceId, msgIn, headersAndPayload.readableBytes(),
batchSize, isChunked, System.nanoTime(), isMarker, position);
this.cnx.getBrokerService().getInterceptor()
.onMessagePublish(this, headersAndPayload, messagePublishContext);
if (brokerInterceptor != null) {
brokerInterceptor
.onMessagePublish(this, headersAndPayload, messagePublishContext);
}
topic.publishMessage(headersAndPayload, messagePublishContext);
}

Expand All @@ -281,8 +286,10 @@ private void publishMessageToTopic(ByteBuf headersAndPayload, long lowestSequenc
MessagePublishContext messagePublishContext = MessagePublishContext.get(this, lowestSequenceId,
highestSequenceId, msgIn, headersAndPayload.readableBytes(), batchSize,
isChunked, System.nanoTime(), isMarker, position);
this.cnx.getBrokerService().getInterceptor()
.onMessagePublish(this, headersAndPayload, messagePublishContext);
if (brokerInterceptor != null) {
brokerInterceptor
.onMessagePublish(this, headersAndPayload, messagePublishContext);
}
topic.publishMessage(headersAndPayload, messagePublishContext);
}

Expand Down Expand Up @@ -538,8 +545,10 @@ public void run() {
producer.chunkedMessageRate.recordEvent();
}
producer.publishOperationCompleted();
producer.cnx.getBrokerService().getInterceptor().messageProduced(
(ServerCnx) producer.cnx, producer, startTimeNs, ledgerId, entryId, this);
if (producer.brokerInterceptor != null) {
producer.brokerInterceptor.messageProduced(
(ServerCnx) producer.cnx, producer, startTimeNs, ledgerId, entryId, this);
}
recycle();
}

Expand Down Expand Up @@ -806,8 +815,10 @@ public void publishTxnMessage(TxnID txnID, long producerId, long sequenceId, lon
MessagePublishContext messagePublishContext =
MessagePublishContext.get(this, sequenceId, highSequenceId, msgIn,
headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker, null);
this.cnx.getBrokerService().getInterceptor()
.onMessagePublish(this, headersAndPayload, messagePublishContext);
if (brokerInterceptor != null) {
brokerInterceptor
.onMessagePublish(this, headersAndPayload, messagePublishContext);
}
topic.publishTxnMessage(txnID, headersAndPayload, messagePublishContext);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,10 +381,12 @@ private void interceptAndWriteCommand(BaseCommand command) {
}

private void safeIntercept(BaseCommand command, ServerCnx cnx) {
try {
this.interceptor.onPulsarCommand(command, cnx);
} catch (Exception e) {
log.error("Failed to execute command {} on broker interceptor.", command.getType(), e);
if (this.interceptor != null) {
try {
this.interceptor.onPulsarCommand(command, cnx);
} catch (Exception e) {
log.error("Failed to execute command {} on broker interceptor.", command.getType(), e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
import java.util.stream.Collectors;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
import lombok.val;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
Expand Down Expand Up @@ -184,6 +183,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
private final boolean enableSubscriptionPatternEvaluation;
private final int maxSubscriptionPatternLength;
private final TopicListService topicListService;
private final BrokerInterceptor brokerInterceptor;
private State state;
private volatile boolean isActive = true;
private String authRole = null;
Expand Down Expand Up @@ -296,6 +296,7 @@ public ServerCnx(PulsarService pulsar, String listenerName) {
this.maxSubscriptionPatternLength = conf.getSubscriptionPatternMaxLength();
this.topicListService = new TopicListService(pulsar, this,
enableSubscriptionPatternEvaluation, maxSubscriptionPatternLength);
this.brokerInterceptor = this.service != null ? this.service.getInterceptor() : null;
}

@Override
Expand All @@ -312,7 +313,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
}
log.info("New connection from {}", remoteAddress);
this.ctx = ctx;
this.commandSender = new PulsarCommandSenderImpl(getBrokerService().getInterceptor(), this);
this.commandSender = new PulsarCommandSenderImpl(brokerInterceptor, this);
this.service.getPulsarStats().recordConnectionCreate();
cnxsPerThread.get().add(this);
}
Expand All @@ -323,8 +324,9 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
connectionController.decreaseConnection(ctx.channel().remoteAddress());
isActive = false;
log.info("Closed connection from {}", remoteAddress);
BrokerInterceptor brokerInterceptor = getBrokerService().getInterceptor();
brokerInterceptor.onConnectionClosed(this);
if (brokerInterceptor != null) {
brokerInterceptor.onConnectionClosed(this);
}

cnxsPerThread.get().remove(this);

Expand All @@ -338,7 +340,9 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
if (producerFuture.isDone() && !producerFuture.isCompletedExceptionally()) {
Producer producer = producerFuture.getNow(null);
producer.closeNow(true);
brokerInterceptor.producerClosed(this, producer, producer.getMetadata());
if (brokerInterceptor != null) {
brokerInterceptor.producerClosed(this, producer, producer.getMetadata());
}
}
});

Expand All @@ -352,7 +356,9 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Consumer consumer = consumerFuture.getNow(null);
try {
consumer.close();
brokerInterceptor.consumerClosed(this, consumer, consumer.getMetadata());
if (brokerInterceptor != null) {
brokerInterceptor.consumerClosed(this, consumer, consumer.getMetadata());
}
} catch (BrokerServiceException e) {
log.warn("Consumer {} was already closed: {}", consumer, e);
}
Expand Down Expand Up @@ -691,7 +697,9 @@ private void completeConnect(int clientProtoVersion, String clientVersion, boole
if (isNotBlank(clientVersion) && !clientVersion.contains(" ") /* ignore default version: pulsar client */) {
this.clientVersion = clientVersion.intern();
}
getBrokerService().getInterceptor().onConnectionCreated(this);
if (brokerInterceptor != null) {
brokerInterceptor.onConnectionCreated(this);
}
}

// According to auth result, send newConnected or newAuthChallenge command.
Expand Down Expand Up @@ -1148,7 +1156,9 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
log.info("[{}] Created subscription on topic {} / {}",
remoteAddress, topicName, subscriptionName);
commandSender.sendSuccessResponse(requestId);
getBrokerService().getInterceptor().consumerCreated(this, consumer, metadata);
if (brokerInterceptor != null) {
brokerInterceptor.consumerCreated(this, consumer, metadata);
}
} else {
// The consumer future was completed before by a close command
try {
Expand Down Expand Up @@ -1491,8 +1501,10 @@ private void buildProducerAndAddTopic(Topic topic, long producerId, String produ
commandSender.sendProducerSuccessResponse(requestId, producerName,
producer.getLastSequenceId(), producer.getSchemaVersion(),
newTopicEpoch, true /* producer is ready now */);
getBrokerService().getInterceptor().
producerCreated(this, producer, metadata);
if (brokerInterceptor != null) {
brokerInterceptor.
producerCreated(this, producer, metadata);
}
return;
} else {
// The producer's future was completed before by
Expand Down Expand Up @@ -1553,8 +1565,10 @@ private void buildProducerAndAddTopic(Topic topic, long producerId, String produ
commandSender.sendProducerSuccessResponse(requestId, producerName,
producer.getLastSequenceId(), producer.getSchemaVersion(),
Optional.empty(), false/* producer is not ready now */);
getBrokerService().getInterceptor().
producerCreated(this, producer, metadata);
if (brokerInterceptor != null) {
brokerInterceptor.
producerCreated(this, producer, metadata);
}
}
});
}
Expand Down Expand Up @@ -1637,24 +1651,27 @@ protected void handleAck(CommandAck ack) {
final boolean hasRequestId = ack.hasRequestId();
final long requestId = hasRequestId ? ack.getRequestId() : 0;
final long consumerId = ack.getConsumerId();
final CommandAck finalAck = getBrokerService().getInterceptor() != null ? new CommandAck().copyFrom(ack) : null;
// It is necessary to make a copy of the CommandAck instance for the interceptor.
final CommandAck copyOfAckForInterceptor = brokerInterceptor != null ? new CommandAck().copyFrom(ack) : null;

if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
Consumer consumer = consumerFuture.getNow(null);
consumer.messageAcked(ack).thenRun(() -> {
if (hasRequestId) {
ctx.writeAndFlush(Commands.newAckResponse(
requestId, null, null, consumerId));
}
getBrokerService().getInterceptor().messageAcked(this, consumer, finalAck);
if (hasRequestId) {
ctx.writeAndFlush(Commands.newAckResponse(
requestId, null, null, consumerId));
}
if (brokerInterceptor != null) {
brokerInterceptor.messageAcked(this, consumer, copyOfAckForInterceptor);
}
}).exceptionally(e -> {
if (hasRequestId) {
ctx.writeAndFlush(Commands.newAckResponse(requestId,
BrokerServiceException.getClientErrorCode(e),
e.getMessage(), consumerId));
}
return null;
});
if (hasRequestId) {
ctx.writeAndFlush(Commands.newAckResponse(requestId,
BrokerServiceException.getClientErrorCode(e),
e.getMessage(), consumerId));
}
return null;
});
}
}

Expand Down Expand Up @@ -1840,7 +1857,9 @@ protected void handleCloseProducer(CommandCloseProducer closeProducer) {
remoteAddress, producerId);
commandSender.sendSuccessResponse(requestId);
producers.remove(producerId, producerFuture);
getBrokerService().getInterceptor().producerClosed(this, producer, producer.getMetadata());
if (brokerInterceptor != null) {
brokerInterceptor.producerClosed(this, producer, producer.getMetadata());
}
});
}

Expand Down Expand Up @@ -1884,7 +1903,9 @@ protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
consumers.remove(consumerId, consumerFuture);
commandSender.sendSuccessResponse(requestId);
log.info("[{}] Closed consumer, consumerId={}", remoteAddress, consumerId);
getBrokerService().getInterceptor().consumerClosed(this, consumer, consumer.getMetadata());
if (brokerInterceptor != null) {
brokerInterceptor.consumerClosed(this, consumer, consumer.getMetadata());
}
} catch (BrokerServiceException e) {
log.warn("[{]] Error closing consumer {} : {}", remoteAddress, consumer, e);
commandSender.sendErrorResponse(requestId, BrokerServiceException.getClientErrorCode(e), e.getMessage());
Expand Down Expand Up @@ -2710,7 +2731,9 @@ public ChannelHandlerContext ctx() {

@Override
protected void interceptCommand(BaseCommand command) throws InterceptException {
getBrokerService().getInterceptor().onPulsarCommand(command, this);
if (brokerInterceptor != null) {
brokerInterceptor.onPulsarCommand(command, this);
}
}

@Override
Expand Down Expand Up @@ -2993,12 +3016,15 @@ public ByteBufPair newMessageAndIntercept(long consumerId, long ledgerId, long e
ackSet, epoch);
ByteBufPair res = Commands.serializeCommandMessageWithSize(command, metadataAndPayload);
try {
val brokerInterceptor = getBrokerService().getInterceptor();
brokerInterceptor.onPulsarCommand(command, this);
if (brokerInterceptor != null) {
brokerInterceptor.onPulsarCommand(command, this);
}
CompletableFuture<Consumer> consumerFuture = consumers.get(consumerId);
if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
Consumer consumer = consumerFuture.getNow(null);
brokerInterceptor.messageDispatched(this, consumer, ledgerId, entryId, metadataAndPayload);
if (brokerInterceptor != null) {
brokerInterceptor.messageDispatched(this, consumer, ledgerId, entryId, metadataAndPayload);
}
}
} catch (Exception e) {
log.error("Exception occur when intercept messages.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ public void doFilter(ServletRequest servletRequest, ServletResponse servletRespo
}
try {
RequestWrapper requestWrapper = new RequestWrapper((HttpServletRequest) servletRequest);
interceptor.onWebserviceRequest(requestWrapper);
if (interceptor != null) {
interceptor.onWebserviceRequest(requestWrapper);
}
filterChain.doFilter(requestWrapper, servletResponse);
} catch (InterceptException e) {
exceptionHandler.handle(servletResponse, e);
Expand Down
Loading

0 comments on commit 1154d0a

Please sign in to comment.