Skip to content

Commit

Permalink
[fix][broker] EntryFilters fix NoClassDefFoundError due to closed cla…
Browse files Browse the repository at this point in the history
…ssloader (apache#22767)

(cherry picked from commit caccd54)
  • Loading branch information
eolivelli authored and nikhil-ctds committed May 31, 2024
1 parent 699e443 commit ad8472b
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,8 @@ protected EntryFilter load(EntryFilterMetaData metadata)
+ " does not implement entry filter interface");
}
EntryFilter pi = (EntryFilter) filter;
return new EntryFilterWithClassLoader(pi, ncl);
// the classloader is shared with the broker, the instance doesn't own it
return new EntryFilterWithClassLoader(pi, ncl, false);
} catch (Throwable e) {
if (e instanceof IOException) {
throw (IOException) e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,23 @@
public class EntryFilterWithClassLoader implements EntryFilter {
private final EntryFilter entryFilter;
private final NarClassLoader classLoader;
private final boolean classLoaderOwned;

public EntryFilterWithClassLoader(EntryFilter entryFilter, NarClassLoader classLoader) {
public EntryFilterWithClassLoader(EntryFilter entryFilter, NarClassLoader classLoader, boolean classLoaderOwned) {
this.entryFilter = entryFilter;
this.classLoader = classLoader;
this.classLoaderOwned = classLoaderOwned;
}

@Override
public FilterResult filterEntry(Entry entry, FilterContext context) {
return entryFilter.filterEntry(entry, context);
ClassLoader currentClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(classLoader);
return entryFilter.filterEntry(entry, context);
} finally {
Thread.currentThread().setContextClassLoader(currentClassLoader);
}
}

@VisibleForTesting
Expand All @@ -48,11 +56,20 @@ public EntryFilter getEntryFilter() {

@Override
public void close() {
entryFilter.close();
ClassLoader currentClassLoader = Thread.currentThread().getContextClassLoader();
try {
classLoader.close();
} catch (IOException e) {
log.error("close EntryFilterWithClassLoader failed", e);
Thread.currentThread().setContextClassLoader(classLoader);
entryFilter.close();
} finally {
Thread.currentThread().setContextClassLoader(currentClassLoader);
}
if (classLoaderOwned) {
log.info("Closing classloader {} for EntryFilter {}", classLoader, entryFilter.getClass().getName());
try {
classLoader.close();
} catch (IOException e) {
log.error("close EntryFilterWithClassLoader failed", e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -239,9 +239,9 @@ public void testFilter() throws Exception {
hasFilterField.setAccessible(true);
NarClassLoader narClassLoader = mock(NarClassLoader.class);
EntryFilter filter1 = new EntryFilterTest();
EntryFilterWithClassLoader loader1 = spyWithClassAndConstructorArgsRecordingInvocations(EntryFilterWithClassLoader.class, filter1, narClassLoader);
EntryFilterWithClassLoader loader1 = spyWithClassAndConstructorArgsRecordingInvocations(EntryFilterWithClassLoader.class, filter1, narClassLoader, false);
EntryFilter filter2 = new EntryFilter2Test();
EntryFilterWithClassLoader loader2 = spyWithClassAndConstructorArgsRecordingInvocations(EntryFilterWithClassLoader.class, filter2, narClassLoader);
EntryFilterWithClassLoader loader2 = spyWithClassAndConstructorArgsRecordingInvocations(EntryFilterWithClassLoader.class, filter2, narClassLoader, false);
field.set(dispatcher, List.of(loader1, loader2));
hasFilterField.set(dispatcher, true);

Expand Down Expand Up @@ -371,9 +371,9 @@ public void testFilteredMsgCount(String topic) throws Throwable {
hasFilterField.setAccessible(true);
NarClassLoader narClassLoader = mock(NarClassLoader.class);
EntryFilter filter1 = new EntryFilterTest();
EntryFilterWithClassLoader loader1 = spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter1, narClassLoader);
EntryFilterWithClassLoader loader1 = spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter1, narClassLoader, false);
EntryFilter filter2 = new EntryFilter2Test();
EntryFilterWithClassLoader loader2 = spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter2, narClassLoader);
EntryFilterWithClassLoader loader2 = spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter2, narClassLoader, false);
field.set(dispatcher, List.of(loader1, loader2));
hasFilterField.set(dispatcher, true);

Expand Down Expand Up @@ -463,10 +463,10 @@ public void testEntryFilterRescheduleMessageDependingOnConsumerSharedSubscriptio
NarClassLoader narClassLoader = mock(NarClassLoader.class);
EntryFilter filter1 = new EntryFilterTest();
EntryFilterWithClassLoader loader1 =
spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter1, narClassLoader);
spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter1, narClassLoader, false);
EntryFilter filter2 = new EntryFilterTest();
EntryFilterWithClassLoader loader2 =
spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter2, narClassLoader);
spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter2, narClassLoader, false);
field.set(dispatcher, List.of(loader1, loader2));
hasFilterField.set(dispatcher, true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ public void testAvgMessagesPerEntry() throws Exception {
EntryFilter filter = new EntryFilterProducerTest();
EntryFilterWithClassLoader
loader = spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter,
narClassLoader);
narClassLoader, false);
Pair<String, List<EntryFilter>> entryFilters = Pair.of("filter", List.of(loader));

PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ public void testSubscriptionStats(final String topic, final String subName, bool
NarClassLoader narClassLoader = mock(NarClassLoader.class);
EntryFilter filter1 = new EntryFilterTest();
EntryFilterWithClassLoader loader1 =
spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter1, narClassLoader);
spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter1, narClassLoader, false);
field.set(dispatcher, List.of(loader1));
hasFilterField.set(dispatcher, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -135,6 +136,7 @@ public class NarClassLoader extends URLClassLoader {
* The NAR for which this <tt>ClassLoader</tt> is responsible.
*/
private final File narWorkingDirectory;
private final AtomicBoolean closed = new AtomicBoolean();

private static final String TMP_DIR_PREFIX = "pulsar-nar";

Expand Down Expand Up @@ -292,4 +294,18 @@ protected String findLibrary(final String libname) {
public String toString() {
return NarClassLoader.class.getName() + "[" + narWorkingDirectory.getPath() + "]";
}

@Override
protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
if (closed.get()) {
log.warn("Loading class {} from a closed classloader ({})", name, this);
}
return super.loadClass(name, resolve);
}

@Override
public void close() throws IOException {
closed.set(true);
super.close();
}
}

0 comments on commit ad8472b

Please sign in to comment.