Skip to content

Commit

Permalink
[improve][broker] avoid creating new objects when intercepting (apach…
Browse files Browse the repository at this point in the history
  • Loading branch information
mattisonchao authored and Technoboy- committed May 30, 2024
1 parent 3fd59d2 commit 3e17c63
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.broker.ClassLoaderSwitcher;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Producer;
Expand All @@ -51,16 +50,20 @@
public class BrokerInterceptorWithClassLoader implements BrokerInterceptor {

private final BrokerInterceptor interceptor;
private final NarClassLoader classLoader;
private final NarClassLoader narClassLoader;

@Override
public void beforeSendMessage(Subscription subscription,
Entry entry,
long[] ackSet,
MessageMetadata msgMetadata) {
try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
final ClassLoader previousContext = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(narClassLoader);
this.interceptor.beforeSendMessage(
subscription, entry, ackSet, msgMetadata);
} finally {
Thread.currentThread().setContextClassLoader(previousContext);
}
}

Expand All @@ -70,141 +73,217 @@ public void beforeSendMessage(Subscription subscription,
long[] ackSet,
MessageMetadata msgMetadata,
Consumer consumer) {
try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
final ClassLoader previousContext = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(narClassLoader);
this.interceptor.beforeSendMessage(
subscription, entry, ackSet, msgMetadata, consumer);
} finally {
Thread.currentThread().setContextClassLoader(previousContext);
}
}

@Override
public void onMessagePublish(Producer producer, ByteBuf headersAndPayload,
Topic.PublishContext publishContext) {
try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
final ClassLoader previousContext = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(narClassLoader);
this.interceptor.onMessagePublish(producer, headersAndPayload, publishContext);
} finally {
Thread.currentThread().setContextClassLoader(previousContext);
}
}

@Override
public void producerCreated(ServerCnx cnx, Producer producer,
Map<String, String> metadata){
try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
final ClassLoader previousContext = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(narClassLoader);
this.interceptor.producerCreated(cnx, producer, metadata);
} finally {
Thread.currentThread().setContextClassLoader(previousContext);
}
}

@Override
public void producerClosed(ServerCnx cnx,
Producer producer,
Map<String, String> metadata) {
try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
final ClassLoader previousContext = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(narClassLoader);
this.interceptor.producerClosed(cnx, producer, metadata);
} finally {
Thread.currentThread().setContextClassLoader(previousContext);
}
}

@Override
public void consumerCreated(ServerCnx cnx,
Consumer consumer,
Map<String, String> metadata) {
try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
this.interceptor.consumerCreated(
cnx, consumer, metadata);
final ClassLoader previousContext = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(narClassLoader);
this.interceptor.consumerCreated(cnx, consumer, metadata);
} finally {
Thread.currentThread().setContextClassLoader(previousContext);
}
}

@Override
public void consumerClosed(ServerCnx cnx,
Consumer consumer,
Map<String, String> metadata) {
try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
final ClassLoader previousContext = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(narClassLoader);
this.interceptor.consumerClosed(cnx, consumer, metadata);
} finally {
Thread.currentThread().setContextClassLoader(previousContext);
}
}


@Override
public void messageProduced(ServerCnx cnx, Producer producer, long startTimeNs, long ledgerId,
long entryId, Topic.PublishContext publishContext) {
try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
final ClassLoader previousContext = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(narClassLoader);
this.interceptor.messageProduced(cnx, producer, startTimeNs, ledgerId, entryId, publishContext);
} finally {
Thread.currentThread().setContextClassLoader(previousContext);
}
}

@Override
public void messageDispatched(ServerCnx cnx, Consumer consumer, long ledgerId,
long entryId, ByteBuf headersAndPayload) {
try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
final ClassLoader previousContext = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(narClassLoader);
this.interceptor.messageDispatched(cnx, consumer, ledgerId, entryId, headersAndPayload);
} finally {
Thread.currentThread().setContextClassLoader(previousContext);
}
}

@Override
public void messageAcked(ServerCnx cnx, Consumer consumer,
CommandAck ackCmd) {
try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
final ClassLoader previousContext = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(narClassLoader);
this.interceptor.messageAcked(cnx, consumer, ackCmd);
} finally {
Thread.currentThread().setContextClassLoader(previousContext);
}
}

@Override
public void txnOpened(long tcId, String txnID) {
this.interceptor.txnOpened(tcId, txnID);
final ClassLoader previousContext = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(narClassLoader);
this.interceptor.txnOpened(tcId, txnID);
} finally {
Thread.currentThread().setContextClassLoader(previousContext);
}
}

@Override
public void txnEnded(String txnID, long txnAction) {
this.interceptor.txnEnded(txnID, txnAction);
final ClassLoader previousContext = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(narClassLoader);
this.interceptor.txnEnded(txnID, txnAction);
} finally {
Thread.currentThread().setContextClassLoader(previousContext);
}
}

@Override
public void onConnectionCreated(ServerCnx cnx) {
try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
final ClassLoader previousContext = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(narClassLoader);
this.interceptor.onConnectionCreated(cnx);
} finally {
Thread.currentThread().setContextClassLoader(previousContext);
}
}

@Override
public void onPulsarCommand(BaseCommand command, ServerCnx cnx) throws InterceptException {
try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
final ClassLoader previousContext = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(narClassLoader);
this.interceptor.onPulsarCommand(command, cnx);
} finally {
Thread.currentThread().setContextClassLoader(previousContext);
}
}

@Override
public void onConnectionClosed(ServerCnx cnx) {
try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
final ClassLoader previousContext = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(narClassLoader);
this.interceptor.onConnectionClosed(cnx);
} finally {
Thread.currentThread().setContextClassLoader(previousContext);
}
}

@Override
public void onWebserviceRequest(ServletRequest request) throws IOException, ServletException, InterceptException {
try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
final ClassLoader previousContext = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(narClassLoader);
this.interceptor.onWebserviceRequest(request);
} finally {
Thread.currentThread().setContextClassLoader(previousContext);
}
}

@Override
public void onWebserviceResponse(ServletRequest request, ServletResponse response)
throws IOException, ServletException {
try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
final ClassLoader previousContext = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(narClassLoader);
this.interceptor.onWebserviceResponse(request, response);
} finally {
Thread.currentThread().setContextClassLoader(previousContext);
}
}

@Override
public void initialize(PulsarService pulsarService) throws Exception {
try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
final ClassLoader previousContext = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(narClassLoader);
this.interceptor.initialize(pulsarService);
} finally {
Thread.currentThread().setContextClassLoader(previousContext);
}
}

@Override
public void close() {
try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
final ClassLoader previousContext = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(narClassLoader);
interceptor.close();
} finally {
Thread.currentThread().setContextClassLoader(previousContext);
}

try {
classLoader.close();
narClassLoader.close();
} catch (IOException e) {
log.warn("Failed to close the broker interceptor class loader", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void testLoadBrokerEventListener() throws Exception {
BrokerInterceptorWithClassLoader returnedPhWithCL = BrokerInterceptorUtils.load(metadata, "");
BrokerInterceptor returnedPh = returnedPhWithCL.getInterceptor();

assertSame(mockLoader, returnedPhWithCL.getClassLoader());
assertSame(mockLoader, returnedPhWithCL.getNarClassLoader());
assertTrue(returnedPh instanceof MockBrokerInterceptor);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public void close() {
new BrokerInterceptorWithClassLoader(interceptor, narLoader);
ClassLoader curClassLoader = Thread.currentThread().getContextClassLoader();
// test class loader
assertEquals(brokerInterceptorWithClassLoader.getClassLoader(), narLoader);
assertEquals(brokerInterceptorWithClassLoader.getNarClassLoader(), narLoader);
// test initialize
brokerInterceptorWithClassLoader.initialize(mock(PulsarService.class));
assertEquals(Thread.currentThread().getContextClassLoader(), curClassLoader);
Expand Down

0 comments on commit 3e17c63

Please sign in to comment.