Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] Omit making a copy of CommandAck when there are no broker interceptors #18997

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -779,8 +779,12 @@ public void start() throws PulsarServerException {
this.defaultOffloader = createManagedLedgerOffloader(defaultOffloadPolicies);

this.brokerInterceptor = BrokerInterceptors.load(config);
brokerService.setInterceptor(getBrokerInterceptor());
this.brokerInterceptor.initialize(this);
// use getter to support mocking getBrokerInterceptor method in tests
BrokerInterceptor interceptor = getBrokerInterceptor();
if (interceptor != null) {
brokerService.setInterceptor(interceptor);
interceptor.initialize(this);
}
brokerService.start();

// Load additional servlets
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,44 +231,6 @@ default void onFilter(ServletRequest request, ServletResponse response, FilterCh
*/
void initialize(PulsarService pulsarService) throws Exception;

BrokerInterceptor DISABLED = new BrokerInterceptorDisabled();
lhotari marked this conversation as resolved.
Show resolved Hide resolved

/**
* Broker interceptor disabled implementation.
*/
class BrokerInterceptorDisabled implements BrokerInterceptor {

@Override
public void onPulsarCommand(BaseCommand command, ServerCnx cnx) throws InterceptException {
// no-op
}

@Override
public void onConnectionClosed(ServerCnx cnx) {
// no-op
}

@Override
public void onWebserviceRequest(ServletRequest request) {
// no-op
}

@Override
public void onWebserviceResponse(ServletRequest request, ServletResponse response) {
// no-op
}

@Override
public void initialize(PulsarService pulsarService) throws Exception {
// no-op
}

@Override
public void close() {
// no-op
}
}

/**
* Close this broker interceptor.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public static BrokerInterceptor load(ServiceConfiguration conf) throws IOExcepti
if (interceptors != null && !interceptors.isEmpty()) {
return new BrokerInterceptors(interceptors);
} else {
return DISABLED;
return null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicClosedException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException;
import org.apache.pulsar.broker.service.Topic.PublishContext;
Expand Down Expand Up @@ -70,6 +71,7 @@ public class Producer {
private final boolean userProvidedProducerName;
private final long producerId;
private final String appId;
private final BrokerInterceptor brokerInterceptor;
private Rate msgIn;
private Rate chunkedMessageRate;
// it records msg-drop rate only for non-persistent topic
Expand Down Expand Up @@ -156,6 +158,7 @@ public Producer(Topic topic, TransportCnx cnx, long producerId, String producerN
this.topicEpoch = topicEpoch;

this.clientAddress = cnx.clientSourceAddress();
this.brokerInterceptor = cnx.getBrokerService().getInterceptor();
}

/**
Expand Down Expand Up @@ -271,8 +274,10 @@ private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, l
MessagePublishContext messagePublishContext =
MessagePublishContext.get(this, sequenceId, msgIn, headersAndPayload.readableBytes(),
batchSize, isChunked, System.nanoTime(), isMarker, position);
this.cnx.getBrokerService().getInterceptor()
.onMessagePublish(this, headersAndPayload, messagePublishContext);
if (brokerInterceptor != null) {
brokerInterceptor
.onMessagePublish(this, headersAndPayload, messagePublishContext);
}
topic.publishMessage(headersAndPayload, messagePublishContext);
}

Expand All @@ -281,8 +286,10 @@ private void publishMessageToTopic(ByteBuf headersAndPayload, long lowestSequenc
MessagePublishContext messagePublishContext = MessagePublishContext.get(this, lowestSequenceId,
highestSequenceId, msgIn, headersAndPayload.readableBytes(), batchSize,
isChunked, System.nanoTime(), isMarker, position);
this.cnx.getBrokerService().getInterceptor()
.onMessagePublish(this, headersAndPayload, messagePublishContext);
if (brokerInterceptor != null) {
brokerInterceptor
.onMessagePublish(this, headersAndPayload, messagePublishContext);
}
topic.publishMessage(headersAndPayload, messagePublishContext);
}

Expand Down Expand Up @@ -538,8 +545,10 @@ public void run() {
producer.chunkedMessageRate.recordEvent();
}
producer.publishOperationCompleted();
producer.cnx.getBrokerService().getInterceptor().messageProduced(
(ServerCnx) producer.cnx, producer, startTimeNs, ledgerId, entryId, this);
if (producer.brokerInterceptor != null) {
producer.brokerInterceptor.messageProduced(
(ServerCnx) producer.cnx, producer, startTimeNs, ledgerId, entryId, this);
}
recycle();
}

Expand Down Expand Up @@ -806,8 +815,10 @@ public void publishTxnMessage(TxnID txnID, long producerId, long sequenceId, lon
MessagePublishContext messagePublishContext =
MessagePublishContext.get(this, sequenceId, highSequenceId, msgIn,
headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker, null);
this.cnx.getBrokerService().getInterceptor()
.onMessagePublish(this, headersAndPayload, messagePublishContext);
if (brokerInterceptor != null) {
brokerInterceptor
.onMessagePublish(this, headersAndPayload, messagePublishContext);
}
topic.publishTxnMessage(txnID, headersAndPayload, messagePublishContext);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,10 +381,12 @@ private void interceptAndWriteCommand(BaseCommand command) {
}

private void safeIntercept(BaseCommand command, ServerCnx cnx) {
try {
this.interceptor.onPulsarCommand(command, cnx);
} catch (Exception e) {
log.error("Failed to execute command {} on broker interceptor.", command.getType(), e);
if (this.interceptor != null) {
try {
this.interceptor.onPulsarCommand(command, cnx);
} catch (Exception e) {
log.error("Failed to execute command {} on broker interceptor.", command.getType(), e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
import java.util.stream.Collectors;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
import lombok.val;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
Expand Down Expand Up @@ -178,6 +177,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
private final boolean enableSubscriptionPatternEvaluation;
private final int maxSubscriptionPatternLength;
private final TopicListService topicListService;
private final BrokerInterceptor brokerInterceptor;
private State state;
private volatile boolean isActive = true;
private String authRole = null;
Expand Down Expand Up @@ -290,6 +290,7 @@ public ServerCnx(PulsarService pulsar, String listenerName) {
this.maxSubscriptionPatternLength = conf.getSubscriptionPatternMaxLength();
this.topicListService = new TopicListService(pulsar, this,
enableSubscriptionPatternEvaluation, maxSubscriptionPatternLength);
this.brokerInterceptor = this.service != null ? this.service.getInterceptor() : null;
}

@Override
Expand All @@ -306,7 +307,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
}
log.info("New connection from {}", remoteAddress);
this.ctx = ctx;
this.commandSender = new PulsarCommandSenderImpl(getBrokerService().getInterceptor(), this);
this.commandSender = new PulsarCommandSenderImpl(brokerInterceptor, this);
this.service.getPulsarStats().recordConnectionCreate();
cnxsPerThread.get().add(this);
}
Expand All @@ -317,8 +318,9 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
connectionController.decreaseConnection(ctx.channel().remoteAddress());
isActive = false;
log.info("Closed connection from {}", remoteAddress);
BrokerInterceptor brokerInterceptor = getBrokerService().getInterceptor();
brokerInterceptor.onConnectionClosed(this);
if (brokerInterceptor != null) {
brokerInterceptor.onConnectionClosed(this);
}

cnxsPerThread.get().remove(this);

Expand All @@ -332,7 +334,9 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
if (producerFuture.isDone() && !producerFuture.isCompletedExceptionally()) {
Producer producer = producerFuture.getNow(null);
producer.closeNow(true);
brokerInterceptor.producerClosed(this, producer, producer.getMetadata());
if (brokerInterceptor != null) {
brokerInterceptor.producerClosed(this, producer, producer.getMetadata());
}
}
});

Expand All @@ -346,7 +350,9 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Consumer consumer = consumerFuture.getNow(null);
try {
consumer.close();
brokerInterceptor.consumerClosed(this, consumer, consumer.getMetadata());
if (brokerInterceptor != null) {
brokerInterceptor.consumerClosed(this, consumer, consumer.getMetadata());
}
} catch (BrokerServiceException e) {
log.warn("Consumer {} was already closed: {}", consumer, e);
}
Expand Down Expand Up @@ -685,7 +691,9 @@ private void completeConnect(int clientProtoVersion, String clientVersion, boole
if (isNotBlank(clientVersion) && !clientVersion.contains(" ") /* ignore default version: pulsar client */) {
this.clientVersion = clientVersion.intern();
}
getBrokerService().getInterceptor().onConnectionCreated(this);
if (brokerInterceptor != null) {
brokerInterceptor.onConnectionCreated(this);
}
}

// According to auth result, send newConnected or newAuthChallenge command.
Expand Down Expand Up @@ -1142,7 +1150,9 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
log.info("[{}] Created subscription on topic {} / {}",
remoteAddress, topicName, subscriptionName);
commandSender.sendSuccessResponse(requestId);
getBrokerService().getInterceptor().consumerCreated(this, consumer, metadata);
if (brokerInterceptor != null) {
brokerInterceptor.consumerCreated(this, consumer, metadata);
}
} else {
// The consumer future was completed before by a close command
try {
Expand Down Expand Up @@ -1485,8 +1495,10 @@ private void buildProducerAndAddTopic(Topic topic, long producerId, String produ
commandSender.sendProducerSuccessResponse(requestId, producerName,
producer.getLastSequenceId(), producer.getSchemaVersion(),
newTopicEpoch, true /* producer is ready now */);
getBrokerService().getInterceptor().
producerCreated(this, producer, metadata);
if (brokerInterceptor != null) {
brokerInterceptor.
producerCreated(this, producer, metadata);
}
return;
} else {
// The producer's future was completed before by
Expand Down Expand Up @@ -1547,8 +1559,10 @@ private void buildProducerAndAddTopic(Topic topic, long producerId, String produ
commandSender.sendProducerSuccessResponse(requestId, producerName,
producer.getLastSequenceId(), producer.getSchemaVersion(),
Optional.empty(), false/* producer is not ready now */);
getBrokerService().getInterceptor().
producerCreated(this, producer, metadata);
if (brokerInterceptor != null) {
brokerInterceptor.
producerCreated(this, producer, metadata);
}
}
});
}
Expand Down Expand Up @@ -1631,24 +1645,27 @@ protected void handleAck(CommandAck ack) {
final boolean hasRequestId = ack.hasRequestId();
final long requestId = hasRequestId ? ack.getRequestId() : 0;
final long consumerId = ack.getConsumerId();
final CommandAck finalAck = getBrokerService().getInterceptor() != null ? new CommandAck().copyFrom(ack) : null;
// It is necessary to make a copy of the CommandAck instance for the interceptor.
final CommandAck copyOfAckForInterceptor = brokerInterceptor != null ? new CommandAck().copyFrom(ack) : null;

if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
Consumer consumer = consumerFuture.getNow(null);
consumer.messageAcked(ack).thenRun(() -> {
if (hasRequestId) {
ctx.writeAndFlush(Commands.newAckResponse(
requestId, null, null, consumerId));
}
getBrokerService().getInterceptor().messageAcked(this, consumer, finalAck);
if (hasRequestId) {
ctx.writeAndFlush(Commands.newAckResponse(
requestId, null, null, consumerId));
}
if (brokerInterceptor != null) {
brokerInterceptor.messageAcked(this, consumer, copyOfAckForInterceptor);
}
}).exceptionally(e -> {
if (hasRequestId) {
ctx.writeAndFlush(Commands.newAckResponse(requestId,
BrokerServiceException.getClientErrorCode(e),
e.getMessage(), consumerId));
}
return null;
});
if (hasRequestId) {
ctx.writeAndFlush(Commands.newAckResponse(requestId,
BrokerServiceException.getClientErrorCode(e),
e.getMessage(), consumerId));
}
return null;
});
}
}

Expand Down Expand Up @@ -1834,7 +1851,9 @@ protected void handleCloseProducer(CommandCloseProducer closeProducer) {
remoteAddress, producerId);
commandSender.sendSuccessResponse(requestId);
producers.remove(producerId, producerFuture);
getBrokerService().getInterceptor().producerClosed(this, producer, producer.getMetadata());
if (brokerInterceptor != null) {
brokerInterceptor.producerClosed(this, producer, producer.getMetadata());
}
});
}

Expand Down Expand Up @@ -1878,7 +1897,9 @@ protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
consumers.remove(consumerId, consumerFuture);
commandSender.sendSuccessResponse(requestId);
log.info("[{}] Closed consumer, consumerId={}", remoteAddress, consumerId);
getBrokerService().getInterceptor().consumerClosed(this, consumer, consumer.getMetadata());
if (brokerInterceptor != null) {
brokerInterceptor.consumerClosed(this, consumer, consumer.getMetadata());
}
} catch (BrokerServiceException e) {
log.warn("[{]] Error closing consumer {} : {}", remoteAddress, consumer, e);
commandSender.sendErrorResponse(requestId, BrokerServiceException.getClientErrorCode(e), e.getMessage());
Expand Down Expand Up @@ -2704,7 +2725,9 @@ public ChannelHandlerContext ctx() {

@Override
protected void interceptCommand(BaseCommand command) throws InterceptException {
getBrokerService().getInterceptor().onPulsarCommand(command, this);
if (brokerInterceptor != null) {
brokerInterceptor.onPulsarCommand(command, this);
}
}

@Override
Expand Down Expand Up @@ -2987,12 +3010,15 @@ public ByteBufPair newMessageAndIntercept(long consumerId, long ledgerId, long e
ackSet, epoch);
ByteBufPair res = Commands.serializeCommandMessageWithSize(command, metadataAndPayload);
try {
val brokerInterceptor = getBrokerService().getInterceptor();
brokerInterceptor.onPulsarCommand(command, this);
if (brokerInterceptor != null) {
brokerInterceptor.onPulsarCommand(command, this);
}
CompletableFuture<Consumer> consumerFuture = consumers.get(consumerId);
if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
Consumer consumer = consumerFuture.getNow(null);
brokerInterceptor.messageDispatched(this, consumer, ledgerId, entryId, metadataAndPayload);
if (brokerInterceptor != null) {
brokerInterceptor.messageDispatched(this, consumer, ledgerId, entryId, metadataAndPayload);
}
}
} catch (Exception e) {
log.error("Exception occur when intercept messages.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ public void doFilter(ServletRequest servletRequest, ServletResponse servletRespo
}
try {
RequestWrapper requestWrapper = new RequestWrapper((HttpServletRequest) servletRequest);
interceptor.onWebserviceRequest(requestWrapper);
if (interceptor != null) {
interceptor.onWebserviceRequest(requestWrapper);
}
filterChain.doFilter(requestWrapper, servletResponse);
} catch (InterceptException e) {
exceptionHandler.handle(servletResponse, e);
Expand Down
Loading