Skip to content

Commit

Permalink
[improve][misc] Use shared Jackson ObjectMapper to reduce overhead an…
Browse files Browse the repository at this point in the history
…d remove ThreadLocal solution (#19160)
  • Loading branch information
lhotari committed Jan 13, 2023
1 parent fb6e0b8 commit ec102fb
Show file tree
Hide file tree
Showing 165 changed files with 1,194 additions and 536 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public void cleanup() {
Object ongoingStubbing = MethodUtils.invokeMethod(mockingProgress, "pullOngoingStubbing");
if (ongoingStubbing != null) {
Object mock = MethodUtils.invokeMethod(ongoingStubbing, "getMock");
if (mock != null) {
if (mock != null && MockUtil.isMock(mock)) {
LOG.warn("Invalid usage of Mockito detected on thread {}."
+ " There is ongoing stubbing on mock of class={} instance={}",
thread, mock.getClass().getName(), mock);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pulsar.tests;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import org.apache.commons.lang3.ClassUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This TestNG listener contains cleanup for some singletons or caches.
*/
public class SingletonCleanerListener extends BetweenTestClassesListenerAdapter {
private static final Logger LOG = LoggerFactory.getLogger(SingletonCleanerListener.class);
private static final Method OBJECTMAPPERFACTORY_CLEANCACHES_METHOD;
private static final Method OBJECTMAPPERFACTORY_REFRESH_METHOD;

static {
Class<?> objectMapperFactoryClazz =
null;
try {
objectMapperFactoryClazz = ClassUtils.getClass("org.apache.pulsar.common.util.ObjectMapperFactory");
} catch (ClassNotFoundException e) {
LOG.warn("Cannot find ObjectMapperFactory class", e);
}

Method clearCachesMethod = null;
try {
if (objectMapperFactoryClazz != null) {
clearCachesMethod =
objectMapperFactoryClazz
.getMethod("clearCaches");
}
} catch (NoSuchMethodException e) {
LOG.warn("Cannot find method for cleaning singleton ObjectMapper caches", e);
}
OBJECTMAPPERFACTORY_CLEANCACHES_METHOD = clearCachesMethod;

Method refreshMethod = null;
try {
if (objectMapperFactoryClazz != null) {
refreshMethod =
objectMapperFactoryClazz
.getMethod("refresh");
}
} catch (NoSuchMethodException e) {
LOG.warn("Cannot find method for refreshing singleton ObjectMapper instances", e);
}
OBJECTMAPPERFACTORY_REFRESH_METHOD = refreshMethod;
}

@Override
protected void onBetweenTestClasses(Class<?> endedTestClass, Class<?> startedTestClass) {
cleanObjectMapperFactoryCaches();
refreshObjectMapperFactory();
}

// Call ObjectMapperFactory.clearCaches() using reflection to clear up classes held in
// the singleton Jackson ObjectMapper instances
private static void cleanObjectMapperFactoryCaches() {
if (OBJECTMAPPERFACTORY_CLEANCACHES_METHOD != null) {
try {
OBJECTMAPPERFACTORY_CLEANCACHES_METHOD.invoke(null);
} catch (IllegalAccessException | InvocationTargetException e) {
LOG.warn("Cannot clean singleton ObjectMapper caches", e);
}
}
}

// Call ObjectMapperFactory.refresh() using reflection to release ObjectMapper instances
// that might be holding on classloaders and classes
private static void refreshObjectMapperFactory() {
if (OBJECTMAPPERFACTORY_REFRESH_METHOD != null) {
try {
OBJECTMAPPERFACTORY_REFRESH_METHOD.invoke(null);
} catch (IllegalAccessException | InvocationTargetException e) {
LOG.warn("Cannot refresh ObjectMapper instances", e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ static Pair<NarClassLoader, LedgerOffloaderFactory> getOffloaderFactory(String n
.build();
String configStr = ncl.getServiceDefinition(PULSAR_OFFLOADER_SERVICE_NAME);

OffloaderDefinition conf = ObjectMapperFactory.getThreadLocalYaml()
OffloaderDefinition conf = ObjectMapperFactory.getYamlMapper().getObjectMapper()
.readValue(configStr, OffloaderDefinition.class);
if (StringUtils.isEmpty(conf.getOffloaderFactoryClass())) {
throw new IOException(
Expand Down Expand Up @@ -120,7 +120,7 @@ public static OffloaderDefinition getOffloaderDefinition(String narPath, String
.build()) {
String configStr = ncl.getServiceDefinition(PULSAR_OFFLOADER_SERVICE_NAME);

return ObjectMapperFactory.getThreadLocalYaml().readValue(configStr, OffloaderDefinition.class);
return ObjectMapperFactory.getYamlMapper().reader().readValue(configStr, OffloaderDefinition.class);
}
}

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1495,7 +1495,7 @@ flexible messaging model and an intuitive client API.</description>
<properties>
<property>
<name>listener</name>
<value>org.apache.pulsar.tests.PulsarTestListener,org.apache.pulsar.tests.AnnotationListener,org.apache.pulsar.tests.FailFastNotifier,org.apache.pulsar.tests.MockitoCleanupListener,org.apache.pulsar.tests.FastThreadLocalCleanupListener,org.apache.pulsar.tests.ThreadLeakDetectorListener</value>
<value>org.apache.pulsar.tests.PulsarTestListener,org.apache.pulsar.tests.AnnotationListener,org.apache.pulsar.tests.FailFastNotifier,org.apache.pulsar.tests.MockitoCleanupListener,org.apache.pulsar.tests.FastThreadLocalCleanupListener,org.apache.pulsar.tests.ThreadLeakDetectorListener,org.apache.pulsar.tests.SingletonCleanerListener</value>
</property>
</properties>
</configuration>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public CompletableFuture<Optional<CacheGetResult<LocalPolicies>>> getLocalPolici
public CompletableFuture<Void> setLocalPoliciesWithVersion(NamespaceName ns, LocalPolicies policies,
Optional<Long> version) {
try {
byte[] content = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(policies);
byte[] content = ObjectMapperFactory.getMapper().writer().writeValueAsBytes(policies);
return getStore().put(joinPath(LOCAL_POLICIES_ROOT, ns.toString()), content, version)
.thenApply(__ -> null);
} catch (JsonProcessingException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public AdditionalServletDefinition getAdditionalServletDefinition(

private AdditionalServletDefinition getAdditionalServletDefinition(NarClassLoader ncl) throws IOException {
String configStr = ncl.getServiceDefinition(ADDITIONAL_SERVLET_FILE);
return ObjectMapperFactory.getThreadLocalYaml().readValue(
return ObjectMapperFactory.getYamlMapper().reader().readValue(
configStr, AdditionalServletDefinition.class
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public void testLoadEventListener() throws Exception {

NarClassLoader mockLoader = mock(NarClassLoader.class);
when(mockLoader.getServiceDefinition(eq(AdditionalServletUtils.ADDITIONAL_SERVLET_FILE)))
.thenReturn(ObjectMapperFactory.getThreadLocalYaml().writeValueAsString(def));
.thenReturn(ObjectMapperFactory.getYamlMapper().writer().writeValueAsString(def));
Class listenerClass = MockAdditionalServlet.class;
when(mockLoader.loadClass(eq(MockAdditionalServlet.class.getName())))
.thenReturn(listenerClass);
Expand Down Expand Up @@ -80,7 +80,7 @@ public void testLoadEventListenerWithBlankListerClass() throws Exception {

NarClassLoader mockLoader = mock(NarClassLoader.class);
when(mockLoader.getServiceDefinition(eq(AdditionalServletUtils.ADDITIONAL_SERVLET_FILE)))
.thenReturn(ObjectMapperFactory.getThreadLocalYaml().writeValueAsString(def));
.thenReturn(ObjectMapperFactory.getYamlMapper().writer().writeValueAsString(def));
Class listenerClass = MockAdditionalServlet.class;
when(mockLoader.loadClass(eq(MockAdditionalServlet.class.getName())))
.thenReturn(listenerClass);
Expand Down Expand Up @@ -108,7 +108,7 @@ public void testLoadEventListenerWithWrongListerClass() throws Exception {

NarClassLoader mockLoader = mock(NarClassLoader.class);
when(mockLoader.getServiceDefinition(eq(AdditionalServletUtils.ADDITIONAL_SERVLET_FILE)))
.thenReturn(ObjectMapperFactory.getThreadLocalYaml().writeValueAsString(def));
.thenReturn(ObjectMapperFactory.getYamlMapper().writer().writeValueAsString(def));
Class listenerClass = Runnable.class;
when(mockLoader.loadClass(eq(Runnable.class.getName())))
.thenReturn(listenerClass);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
*/
package org.apache.pulsar.broker.admin;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -445,8 +446,12 @@ protected AutoSubscriptionCreationOverrideImpl autoSubscriptionCreationOverride(
.build();
}

public static ObjectMapper jsonMapper() {
return ObjectMapperFactory.getThreadLocal();
protected ObjectWriter objectWriter() {
return ObjectMapperFactory.getMapper().writer();
}

protected ObjectReader objectReader() {
return ObjectMapperFactory.getMapper().reader();
}

protected Set<String> clusters() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1301,7 +1301,7 @@ protected void internalSetRetention(RetentionPolicies retention) {
policies.retention_policies = retention;
namespaceResources().setPolicies(namespaceName, p -> policies);
log.info("[{}] Successfully updated retention configuration: namespace={}, map={}", clientAppId(),
namespaceName, jsonMapper().writeValueAsString(retention));
namespaceName, objectWriter().writeValueAsString(retention));
} catch (RestException pfe) {
throw pfe;
} catch (Exception e) {
Expand Down Expand Up @@ -1540,7 +1540,7 @@ protected void internalSetSubscriptionAuthMode(SubscriptionAuthMode subscription
return policies;
});
log.info("[{}] Successfully updated subscription auth mode: namespace={}, map={}", clientAppId(),
namespaceName, jsonMapper().writeValueAsString(authMode));
namespaceName, objectWriter().writeValueAsString(authMode));

} catch (RestException pfe) {
throw pfe;
Expand Down Expand Up @@ -1590,7 +1590,7 @@ protected void internalSetPolicies(String fieldName, Object value) {
field.set(policies, value);
namespaceResources().setPolicies(namespaceName, p -> policies);
log.info("[{}] Successfully updated {} configuration: namespace={}, value={}", clientAppId(), fieldName,
namespaceName, jsonMapper().writeValueAsString(value));
namespaceName, objectWriter().writeValueAsString(value));

} catch (RestException pfe) {
throw pfe;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.pulsar.common.naming.SystemTopicNames.isTransactionCoordinatorAssign;
import static org.apache.pulsar.common.naming.SystemTopicNames.isTransactionInternalName;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectReader;
import com.github.zafarkhaja.semver.Version;
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
Expand Down Expand Up @@ -1313,12 +1314,14 @@ protected void internalGetManagedLedgerInfo(AsyncResponse asyncResponse, boolean
for (int i = 0; i < partitionMetadata.partitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
try {
final ObjectReader managedLedgerInfoReader = objectReader()
.forType(ManagedLedgerInfo.class);
futures.add(pulsar().getAdminClient().topics()
.getInternalInfoAsync(topicNamePartition.toString())
.thenApply((response) -> {
try {
return Pair.of(topicNamePartition.toString(), jsonMapper()
.readValue(response, ManagedLedgerInfo.class));
return Pair.of(topicNamePartition.toString(), managedLedgerInfoReader
.readValue(response));
} catch (JsonProcessingException e) {
throw new UncheckedIOException(e);
}
Expand Down Expand Up @@ -1347,7 +1350,7 @@ protected void internalGetManagedLedgerInfo(AsyncResponse asyncResponse, boolean
partitionedManagedLedgerInfo.partitions.put(info.getKey(), info.getValue());
}
asyncResponse.resume((StreamingOutput) output -> {
jsonMapper().writer().writeValue(output, partitionedManagedLedgerInfo);
objectWriter().writeValue(output, partitionedManagedLedgerInfo);
});
}
});
Expand Down Expand Up @@ -1384,7 +1387,7 @@ protected void internalGetManagedLedgerInfoForNonPartitionedTopic(AsyncResponse
@Override
public void getInfoComplete(ManagedLedgerInfo info, Object ctx) {
asyncResponse.resume((StreamingOutput) output -> {
jsonMapper().writer().writeValue(output, info);
objectWriter().writeValue(output, info);
});
}
@Override
Expand Down Expand Up @@ -3317,7 +3320,7 @@ protected CompletableFuture<Void> internalSetBacklogQuota(BacklogQuota.BacklogQu
clientAppId(),
namespaceName,
topicName.getLocalName(),
jsonMapper().writeValueAsString(backLogQuotaMap));
objectWriter().writeValueAsString(backLogQuotaMap));
} catch (JsonProcessingException ignore) {
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@
import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -2697,7 +2696,7 @@ public void object(Map<String, Object> data) throws Exception {
} else {
first = true;
}
String json = ObjectMapperFactory.getThreadLocal().writeValueAsString(data);
String json = objectWriter().writeValueAsString(data);
out.write(json);
}

Expand Down
Loading

0 comments on commit ec102fb

Please sign in to comment.