Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Fix deadlock in ExtensibleLoadManagerImplTest.initializeState #20130

Conversation

lifepuzzlefun
Copy link
Contributor

@lifepuzzlefun lifepuzzlefun commented Apr 18, 2023

fix #20157

deadlock stack:

"configuration-metadata-store-186-1@14606" prio=5 tid=0x175 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
	  at org.apache.pulsar.broker.resources.BaseResources.get(BaseResources.java:88)
	  at org.apache.pulsar.broker.resources.NamespaceResources$IsolationPolicyResources.getIsolationDataPolicies(NamespaceResources.java:177)
	  at org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies.getIsolationPolicies(SimpleResourceAllocationPolicies.java:49)
	  at org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies.areIsolationPoliciesPresent(SimpleResourceAllocationPolicies.java:59)
	  at org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.applyNamespacePolicies(LoadManagerShared.java:103)
	  at org.apache.pulsar.broker.loadbalance.extensions.policies.IsolationPoliciesHelper.applyIsolationPolicies(IsolationPoliciesHelper.java:52)
	  at org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerIsolationPoliciesFilter.filter(BrokerIsolationPoliciesFilter.java:53)
	  at org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.lambda$selectAsync$12(ExtensibleLoadManagerImpl.java:394)
	  at org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl$$Lambda$1347/0x0000000801645428.apply(Unknown Source:-1)
	  at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1187)
	  at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2309)
	  at org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.selectAsync(ExtensibleLoadManagerImpl.java:385)
	  at java.lang.invoke.LambdaForm$DMH/0x0000000800e1d000.invokeVirtual(LambdaForm$DMH:-1)
	  at java.lang.invoke.LambdaForm$MH/0x0000000800fac800.invoke(LambdaForm$MH:-1)
	  at java.lang.invoke.LambdaForm$MH/0x0000000800f46000.invoke(LambdaForm$MH:-1)
	  at java.lang.invoke.LambdaForm$MH/0x0000000800c14400.invokeExact_MT(LambdaForm$MH:-1)
	  at java.lang.invoke.MethodHandle.invokeWithArguments(MethodHandle.java:732)
	  at org.mockito.internal.util.reflection.InstrumentationMemberAccessor$Dispatcher$ByteBuddy$MBY4HIiM.invokeWithArguments(Unknown Source:-1)
	  at org.mockito.internal.util.reflection.InstrumentationMemberAccessor.invoke(InstrumentationMemberAccessor.java:239)
	  at org.mockito.internal.util.reflection.ModuleMemberAccessor.invoke(ModuleMemberAccessor.java:55)
	  at org.mockito.internal.creation.bytebuddy.MockMethodAdvice.tryInvoke(MockMethodAdvice.java:333)
	  at org.mockito.internal.creation.bytebuddy.MockMethodAdvice.access$500(MockMethodAdvice.java:60)
	  at org.mockito.internal.creation.bytebuddy.MockMethodAdvice$RealMethodCall.invoke(MockMethodAdvice.java:253)
	  at org.mockito.internal.invocation.InterceptedInvocation.callRealMethod(InterceptedInvocation.java:142)
	  at org.mockito.internal.stubbing.answers.CallsRealMethods.answer(CallsRealMethods.java:45)
	  at org.mockito.Answers.answer(Answers.java:99)
	  at org.mockito.internal.handler.MockHandlerImpl.handle(MockHandlerImpl.java:110)
	  at org.mockito.internal.handler.NullResultGuardian.handle(NullResultGuardian.java:29)
	  at org.mockito.internal.handler.InvocationNotifierHandler.handle(InvocationNotifierHandler.java:34)
	  at org.mockito.internal.creation.bytebuddy.MockMethodInterceptor.doIntercept(MockMethodInterceptor.java:82)
	  at org.mockito.internal.creation.bytebuddy.MockMethodAdvice.handle(MockMethodAdvice.java:151)
	  at org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.selectAsync(ExtensibleLoadManagerImpl.java:383)
	  at org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.lambda$assign$6(ExtensibleLoadManagerImpl.java:336)
	  at org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl$$Lambda$1343/0x0000000801644b18.apply(Unknown Source:-1)
	  at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1187)
	  at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2309)
	  at org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.lambda$assign$10(ExtensibleLoadManagerImpl.java:333)
	  at org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl$$Lambda$1342/0x00000008016446c8.apply(Unknown Source:-1)
	  at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:409)
	  at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:243)
	  at org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.assign(ExtensibleLoadManagerImpl.java:327)
	  at java.lang.invoke.LambdaForm$DMH/0x00000008013ac800.invokeVirtual(LambdaForm$DMH:-1)
	  at java.lang.invoke.LambdaForm$MH/0x00000008013ad400.invoke(LambdaForm$MH:-1)
	  at java.lang.invoke.LambdaForm$MH/0x0000000800f44800.invoke(LambdaForm$MH:-1)
	  at java.lang.invoke.LambdaForm$MH/0x0000000800c14400.invokeExact_MT(LambdaForm$MH:-1)
	  at java.lang.invoke.MethodHandle.invokeWithArguments(MethodHandle.java:732)
	  at org.mockito.internal.util.reflection.InstrumentationMemberAccessor$Dispatcher$ByteBuddy$MBY4HIiM.invokeWithArguments(Unknown Source:-1)
	  at org.mockito.internal.util.reflection.InstrumentationMemberAccessor.invoke(InstrumentationMemberAccessor.java:239)
	  at org.mockito.internal.util.reflection.ModuleMemberAccessor.invoke(ModuleMemberAccessor.java:55)
	  at org.mockito.internal.creation.bytebuddy.MockMethodAdvice.tryInvoke(MockMethodAdvice.java:333)
	  at org.mockito.internal.creation.bytebuddy.MockMethodAdvice.access$500(MockMethodAdvice.java:60)
	  at org.mockito.internal.creation.bytebuddy.MockMethodAdvice$RealMethodCall.invoke(MockMethodAdvice.java:253)
	  at org.mockito.internal.invocation.InterceptedInvocation.callRealMethod(InterceptedInvocation.java:142)
	  at org.mockito.internal.stubbing.answers.CallsRealMethods.answer(CallsRealMethods.java:45)
	  at org.mockito.Answers.answer(Answers.java:99)
	  at org.mockito.internal.handler.MockHandlerImpl.handle(MockHandlerImpl.java:110)
	  at org.mockito.internal.handler.NullResultGuardian.handle(NullResultGuardian.java:29)
	  at org.mockito.internal.handler.InvocationNotifierHandler.handle(InvocationNotifierHandler.java:34)
	  at org.mockito.internal.creation.bytebuddy.MockMethodInterceptor.doIntercept(MockMethodInterceptor.java:82)
	  at org.mockito.internal.creation.bytebuddy.MockMethodAdvice.handle(MockMethodAdvice.java:151)
	  at org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.assign(ExtensibleLoadManagerImpl.java:325)
	  at org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerWrapper.findBrokerServiceUrl(ExtensibleLoadManagerWrapper.java:66)
	  at org.apache.pulsar.broker.namespace.NamespaceService.lambda$internalGetWebServiceUrl$9(NamespaceService.java:302)
	  at org.apache.pulsar.broker.namespace.NamespaceService$$Lambda$1337/0x000000080163bb68.apply(Unknown Source:-1)
	  at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1187)
	  at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2309)
	  at org.apache.pulsar.broker.namespace.NamespaceService.internalGetWebServiceUrl(NamespaceService.java:285)
	  at org.apache.pulsar.broker.namespace.NamespaceService.getWebServiceUrlAsync(NamespaceService.java:265)
	  at java.lang.invoke.LambdaForm$DMH/0x00000008013ac800.invokeVirtual(LambdaForm$DMH:-1)
	  at java.lang.invoke.LambdaForm$MH/0x0000000801640400.invoke(LambdaForm$MH:-1)
	  at java.lang.invoke.LambdaForm$MH/0x0000000800f44800.invoke(LambdaForm$MH:-1)
	  at java.lang.invoke.LambdaForm$MH/0x0000000800c14400.invokeExact_MT(LambdaForm$MH:-1)
	  at java.lang.invoke.MethodHandle.invokeWithArguments(MethodHandle.java:732)
	  at org.mockito.internal.util.reflection.InstrumentationMemberAccessor$Dispatcher$ByteBuddy$MBY4HIiM.invokeWithArguments(Unknown Source:-1)
	  at org.mockito.internal.util.reflection.InstrumentationMemberAccessor.invoke(InstrumentationMemberAccessor.java:239)
	  at org.mockito.internal.util.reflection.ModuleMemberAccessor.invoke(ModuleMemberAccessor.java:55)
	  at org.mockito.internal.creation.bytebuddy.MockMethodAdvice.tryInvoke(MockMethodAdvice.java:333)
	  at org.mockito.internal.creation.bytebuddy.MockMethodAdvice.access$500(MockMethodAdvice.java:60)
	  at org.mockito.internal.creation.bytebuddy.MockMethodAdvice$RealMethodCall.invoke(MockMethodAdvice.java:253)
	  at org.mockito.internal.invocation.InterceptedInvocation.callRealMethod(InterceptedInvocation.java:142)
	  at org.mockito.internal.stubbing.answers.CallsRealMethods.answer(CallsRealMethods.java:45)
	  at org.mockito.Answers.answer(Answers.java:99)
	  at org.mockito.internal.handler.MockHandlerImpl.handle(MockHandlerImpl.java:110)
	  at org.mockito.internal.handler.NullResultGuardian.handle(NullResultGuardian.java:29)
	  at org.mockito.internal.handler.InvocationNotifierHandler.handle(InvocationNotifierHandler.java:34)
	  at org.mockito.internal.creation.bytebuddy.MockMethodInterceptor.doIntercept(MockMethodInterceptor.java:82)
	  at org.mockito.internal.creation.bytebuddy.MockMethodAdvice.handle(MockMethodAdvice.java:151)
	  at org.apache.pulsar.broker.namespace.NamespaceService.getWebServiceUrlAsync(NamespaceService.java:248)
	  at org.apache.pulsar.broker.web.PulsarWebResource.isBundleOwnedByAnyBroker(PulsarWebResource.java:614)
	  at org.apache.pulsar.broker.admin.impl.NamespacesBase.lambda$internalUnloadNamespaceBundleAsync$125(NamespacesBase.java:1052)
	  at org.apache.pulsar.broker.admin.impl.NamespacesBase$$Lambda$1336/0x000000080163b920.apply(Unknown Source:-1)
	  at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1150)
	  at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
	  at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
	  at org.apache.pulsar.metadata.impl.ZKMetadataStore.lambda$existsFromStore$7(ZKMetadataStore.java:351)
	  at org.apache.pulsar.metadata.impl.ZKMetadataStore$$Lambda$445/0x00000008010e0678.run(Unknown Source:-1)
	  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	  at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
	  at java.util.concurrent.FutureTask.run(FutureTask.java:-1)
	  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	  at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	  at java.lang.Thread.run(Thread.java:833)

Motivation

this can be seen in local environment sometimes.

why deadlock happen ?

initializeState will call admin.namespaces().unload("public/default");

so in the broker side one call stack is

  • org.apache.pulsar.broker.admin.v2.Namespaces.internalUnloadNamespaceBundleAsync
    |_ org.apache.pulsar.broker.namespace.NamespaceService.isNamespaceBundleOwned
    |_ org.apache.pulsar.metadata.api.MetadataStore.exists
    |_ AbstractMetadataStore#existsCache -> asyncLoad
    |_ AbstractMetadataStore.existsFromStore
    |_ ZKMetadataStore.existsFromStore line:351 // we are on the configuration-metadata-store internal executor thread now.

so we triggered the CompletableFuture here and the following thing is always execute on this thread. ( the stack above is the following thing )

then the code will call on metadata store IsolationPolicyResources.getIsolationDataPolicies
so only one thread is here to trigger BatchMetadataStore to flush metadata call. but it won't be executed.
an deadlock occurs.

Modifications

force cache loading callback complete on forkjoinPool to avoid other task execute on metadataStore thread by CompletableFuture.

Verifying this change

the unit test should not fail on ExtensibleLoadManagerImplTest.initializeState

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Apr 18, 2023
@nodece
Copy link
Member

nodece commented Apr 18, 2023

Could you share the deadlock thread?

when metadata store thread is execute other async code path which will be executed on the metadata store an deadlock is occured.
@lifepuzzlefun lifepuzzlefun force-pushed the fix_dead_lock_on_metadata_store branch from 1c83bf2 to 1a17f37 Compare April 18, 2023 16:04
@lifepuzzlefun
Copy link
Contributor Author

lifepuzzlefun commented Apr 18, 2023

Could you share the deadlock thread?

more information applied, maybe we can only make the call internalUnloadNamespaceBundleAsync -> pulsar().getNamespaceService().isNamespaceBundleOwned jump to forkjoinPool.

.thenCompose(bundle ->
pulsar().getNamespaceService().isNamespaceBundleOwned(bundle)
.exceptionally(ex -> {
if (log.isDebugEnabled()) {
log.debug("Failed to validate cluster ownership for {}-{}, {}",
namespaceName.toString(), bundleRange, ex.getMessage(), ex);
}
return false;
})
)

@lifepuzzlefun lifepuzzlefun changed the title [improve][broker] Fix deadlock in ExtensibleLoadManagerImplTest.initializeState [fix][broker] Fix deadlock in ExtensibleLoadManagerImplTest.initializeState Apr 19, 2023
@lifepuzzlefun
Copy link
Contributor Author

@nodece @heesung-sn @Demogorgon314 can you take a look ? thank you :- )

} else {
// Do not refresh if we're not connected
return CompletableFuture.completedFuture(oldValue);
return CompletableFuture.supplyAsync(() -> oldValue);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to use the executor in this class.

Copy link
Contributor Author

@lifepuzzlefun lifepuzzlefun May 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hi @Technoboy- after read the Caffeine asyncReload method javadoc. i change back to the origin CompletableFuture.completedFuture so there wont be any thread exchange. and the callback will execute on the origin cache.get thread which wont make deadlock like this cause.

Copy link
Contributor

@Technoboy- Technoboy- left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to use the executor in this class.

Copy link
Member

@nodece nodece left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like you want to switch a thread to call the callback, I also do a similar thing: #13809, and I'm not sure whether this is correct, but I know we can avoid timeout by not using 'CompleteFuture#join' and 'CompletetableFuture#get'.

Let me know your thoughts.

@lhotari
Copy link
Member

lhotari commented May 4, 2023

@lifepuzzlefun thanks for the good investigation work. It would be useful to fix this deadlock. @merlimat can you provide advice how to address this? thanks

@lifepuzzlefun
Copy link
Contributor Author

copy that. sorry guys, i'm a little busy right now. i'll handle the comments when i am available.

@lifepuzzlefun
Copy link
Contributor Author

lifepuzzlefun commented May 7, 2023

@nodece hi, i think current metadataStore assume the caller should take the callback thread carefully and know if the logic will need to change thread. the most of origin logic will try to change thread explicitly to other thread. I agree with merlimat's comment #13809 (review) . maybe we can hide all the callback thread change logic inside the metadataStore which will prevent the deadlock happen again.

@lifepuzzlefun lifepuzzlefun requested a review from nodece May 7, 2023 06:42
@nodece
Copy link
Member

nodece commented May 7, 2023

@nodece hi, i think current metadataStore assume the caller should take the callback thread carefully and know if the logic will need to change thread. the most of origin logic will try to change thread explicitly to other thread. I agree with merlimat's comment #13809 (review) . maybe we can hide all the callback thread change logic inside the metadataStore which will prevent the deadlock happen again.

I agreed that, and I think we can make a discussion on the ML.

Copy link
Contributor

@poorbarcode poorbarcode left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the root cause is that the interface BrokerFilter introduced by PIP-192, which declares a non-asynchronous API filter[1], and implementation BrokerIsolationPoliciesFilter uses a blocking call IsolationPolicyResources.getIsolationDataPolicies.

@Demogorgon314 Can we make the API BrokerFilter.filter asynchronous?

https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java#L45

@Demogorgon314
Copy link
Member

@poorbarcode Can you explain why making the API BrokerFilter.filter asynchronous can resolve this issue? Do we need a timeout for IsolationPolicyResources.getIsolationDataPolicies method?

@github-actions
Copy link

The pr had no activity for 30 days, mark with Stale label.

@github-actions github-actions bot added the Stale label Jun 22, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Flaky-test: ExtensibleLoadManagerImplTest.initializeState
6 participants