Make readModifyUpdate in MetadataCacheImpl thread-safe #9900

Merged
merged 10 commits into from
Mar 22, 2021

Contributor

@315157973 315157973 commented Mar 13, 2021

Motivation

Modification of namespace Policies is currently not locked, so multiple threads may modify the same local Policies object at the same time, which may cause thread-safety issues.
The Policies object also contains non-thread-safe collections such as HashMap and HashSet, and concurrent operations on these collections are unsafe as well. See, for example, #9711.

Modifications

Use the clone + CAS approach to ensure thread safety.
Visibility: The cached value is cloned, only the clone is modified, and the entire object is eventually written back. The cached object in Caffeine is immediately visible to other threads.
Atomicity: A modification replaces the entire object with the modified clone. There is no intermediate state in which only some attribute values have been modified, so the update is atomic.
Orderliness: Guaranteed by the implementation class of MetadataCache. The cached object now corresponds to a version number, and CAS is used to write it back. If the write-back fails, the operation repeats read -> clone -> modify -> update to preserve order.
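
The read -> clone -> modify -> CAS cycle described above can be sketched roughly as follows. This is only an illustration, not the MetadataCacheImpl code: the class `CloneAndCasSketch`, the `Versioned` holder and the use of an in-memory `AtomicReference` in place of the versioned metadata store are all assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;

public class CloneAndCasSketch {

    /** Hypothetical cache entry: a value plus the version it was written at. */
    public static final class Versioned {
        public final Map<String, String> value;
        public final long version;

        Versioned(Map<String, String> value, long version) {
            this.value = value;
            this.version = version;
        }
    }

    // Stand-in for the versioned store; the real code writes to a metadata service.
    private final AtomicReference<Versioned> cached =
            new AtomicReference<>(new Versioned(new HashMap<>(), 0L));

    /** read -> clone -> modify -> CAS write-back, retried until the write-back wins. */
    public void readModifyUpdate(UnaryOperator<Map<String, String>> modifyFunction) {
        while (true) {
            Versioned current = cached.get();                          // read
            Map<String, String> copy = new HashMap<>(current.value);   // clone
            Map<String, String> updated = modifyFunction.apply(copy);  // modify the copy only
            Versioned next = new Versioned(updated, current.version + 1);
            if (cached.compareAndSet(current, next)) {                 // CAS write-back
                return;
            }
            // Another writer replaced the entry first; start again from a fresh read.
        }
    }

    public Versioned get() {
        return cached.get();
    }

    public static void main(String[] args) throws InterruptedException {
        CloneAndCasSketch cache = new CloneAndCasSketch();
        Runnable writer = () -> {
            for (int i = 0; i < 1000; i++) {
                cache.readModifyUpdate(m -> {
                    int n = Integer.parseInt(m.getOrDefault("counter", "0"));
                    m.put("counter", Integer.toString(n + 1));
                    return m;
                });
            }
        };
        Thread a = new Thread(writer);
        Thread b = new Thread(writer);
        a.start();
        b.start();
        a.join();
        b.join();
        // No increment is lost, because a failed CAS re-reads and re-applies the change.
        System.out.println(cache.get().value.get("counter") + " @ version " + cache.get().version);
    }
}
```

Because every published `Versioned` is a fresh object that is never mutated afterwards, a reader that calls `get()` never observes a half-applied modification.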

Verifying this change

Contributor

@eolivelli eolivelli left a comment

Can we fall into some kind of deadlock?

This becomes similar to making synchronous calls to ZooKeeper: we would implicitly depend on an external execution serializer that is not under our control.

@315157973
Contributor Author

> Can we fall into some kind of deadlock?
>
> This becomes similar to making synchronous calls to ZooKeeper: we would implicitly depend on an external execution serializer that is not under our control.

I don't quite understand what you mean. Could you please explain?

@lhotari
Member

lhotari commented Mar 13, 2021

> OrderedExecutor is used to ensure that all modification operations under each namespace run in a single thread. Different namespaces can be operated on in parallel because they use different objects.

This might not be a sufficient solution for ensuring thread safety. Considering read operations is equally important. How does the solution ensure that read operations are thread safe?
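
To make the concern concrete, here is a minimal sketch of per-key ordered execution built only on the JDK. It is not BookKeeper's OrderedExecutor; `KeyedExecutorSketch` and `executeOrdered` are hypothetical names chosen for the example. Tasks for the same key are serialized on one thread, which protects writers from each other, but nothing in it stops a reader on some other thread from touching the same object while a write is in progress.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class KeyedExecutorSketch {
    private final ExecutorService[] workers;

    public KeyedExecutorSketch(int numThreads) {
        workers = new ExecutorService[numThreads];
        for (int i = 0; i < numThreads; i++) {
            workers[i] = Executors.newSingleThreadExecutor();
        }
    }

    /** Tasks submitted with the same key run on the same single thread, in submission order. */
    public CompletableFuture<Void> executeOrdered(String key, Runnable task) {
        int index = Math.floorMod(key.hashCode(), workers.length);
        return CompletableFuture.runAsync(task, workers[index]);
    }

    public void shutdown() {
        for (ExecutorService worker : workers) {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        KeyedExecutorSketch executor = new KeyedExecutorSketch(4);
        List<Integer> seen = new ArrayList<>(); // deliberately not thread-safe
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            final int n = i;
            // All writes for this namespace key are serialized, so the list is never
            // modified by two threads at once.
            pending.add(executor.executeOrdered("tenant/ns-a", () -> seen.add(n)));
        }
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        executor.shutdown();
        System.out.println(seen.size());
    }
}
```

A thread that iterated over `seen` while the writes were still running would be outside the serialized path, which is the gap the question about read operations points at.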

@315157973
Contributor Author

> > OrderedExecutor is used to ensure that all modification operations under each namespace run in a single thread. Different namespaces can be operated on in parallel because they use different objects.
>
> This might not be a sufficient solution for ensuring thread safety. Considering read operations is equally important. How does the solution ensure that read operations are thread safe?

I have added a snapshot. Please let me know if I have missed anything.

Visibility: A snapshot is generated for each modification. After the modification is complete, the entire snapshot is written to the cache, and the value in Caffeine is immediately visible to all threads.
Atomicity: Since we only modify the snapshot, writing the entire snapshot to the cache at the end is atomic.
Orderliness: Because modifications are single-threaded, they are applied in order.
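
The snapshot idea can be illustrated with a small copy-on-write sketch. It assumes a single writer, as the comment above does, and uses hypothetical names (`SnapshotSketch`, `read`, `update`); a plain volatile field stands in for the Caffeine cache.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class SnapshotSketch {
    // A volatile write publishes the new snapshot to every reader thread.
    private volatile Map<String, String> snapshot = Collections.emptyMap();

    /** Readers only ever see a fully built, read-only snapshot. */
    public Map<String, String> read() {
        return snapshot;
    }

    /** Single-writer update: copy, modify the copy, then publish it with one reference write. */
    public void update(String key, String value) {
        Map<String, String> copy = new HashMap<>(snapshot);
        copy.put(key, value);
        snapshot = Collections.unmodifiableMap(copy);
    }

    public static void main(String[] args) {
        SnapshotSketch policies = new SnapshotSketch();
        policies.update("retention", "1h");
        Map<String, String> before = policies.read();
        policies.update("retention", "2h");
        System.out.println(before.get("retention"));          // prints 1h: the old snapshot is unchanged
        System.out.println(policies.read().get("retention")); // prints 2h: new readers see the new snapshot
    }
}
```

With more than one writer this sketch would lose updates, which is why the later revisions of the PR rely on a versioned compare-and-set instead of a single-threaded executor.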

Contributor

@eolivelli eolivelli left a comment

My concern is that we are falling back to a synchronous pattern for accessing the metadata service, and this may lead to deadlocks.

@@ -166,7 +166,7 @@ private MetadataCacheImpl(MetadataStore store, MetadataSerde<T> serde) {
     T newValueObj;
     byte[] newValue;
     try {
-        newValueObj = modifyFunction.apply(currentValue);
+        newValueObj = modifyFunction.apply(serde.deserialize(serde.serialize(currentValue)));
Contributor

Can we add a comment that explains that we are cloning the value?
An explicit static method, cloneValue, would make it easier to understand what is going on.
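
One possible shape for such a helper is sketched below. It is an assumption-laden illustration: Java serialization stands in for the `serde.serialize` / `serde.deserialize` round trip in the diff, and the `Policies` class here is a hypothetical two-line stand-in, not the real Pulsar class.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashSet;
import java.util.Set;

public class CloneValueSketch {

    /** Hypothetical stand-in for a cached value that holds a non-thread-safe collection. */
    public static class Policies implements Serializable {
        private static final long serialVersionUID = 1L;
        public Set<String> clusters = new HashSet<>();
    }

    /** Deep-copies a value by serializing it and deserializing the bytes again. */
    @SuppressWarnings("unchecked")
    public static <T extends Serializable> T cloneValue(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Failed to clone value", e);
        }
    }

    public static void main(String[] args) {
        Policies cached = new Policies();
        cached.clusters.add("us-west");

        Policies copy = cloneValue(cached);
        copy.clusters.add("us-east"); // mutate the copy only

        System.out.println(cached.clusters);      // prints [us-west]
        System.out.println(copy.clusters.size()); // prints 2
    }
}
```

Giving the round trip a name makes the intent visible at the call site, so the modify function is clearly applied to `cloneValue(currentValue)` rather than to the cached object itself.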

Contributor Author

A comment has been added.
Write operations on the same namespace are now executed by the same thread without taking a lock, so there is no deadlock problem.

@eolivelli
Contributor

If the problem is concurrent modification of the Policies structure,
wouldn't it be better to add a clone operation here, where we execute the mutation function?

public CompletableFuture<Void> readModifyUpdate(String path, Function<T, T> modifyFunction) {

MetadataCacheImpl already handles versioning with respect to the MetadataService, but the problem is that we are modifying the stored value returned from objCache.get().
We should clone that value, apply the modify function, write the result,
and then return the modified (immutable) value.

cc @rdhabalia @merlimat

@315157973
Contributor Author

MetadataCacheImpl is just one implementation of MetadataCache, and we cannot control later implementations. The cache implementation should not have to care about these things, so I moved the clone to namespaceBase.

@eolivelli
Contributor

IMHO we should fix MetadataCacheImpl, and the signature and semantics of the method should be documented.
A cache that lets you modify its internal state is very likely to cause many problems in the future.
If there are other implementations, we must ensure that they don't have this problem.

@315157973
Contributor Author

> IMHO we should fix MetadataCacheImpl, and the signature and semantics of the method should be documented.
> A cache that lets you modify its internal state is very likely to cause many problems in the future.
> If there are other implementations, we must ensure that they don't have this problem.

You will have this problem with any local cache, such as Guava Cache or Caffeine. We cannot require these caches to guarantee it.
Of course, it would be better to have a readClonedModifyUpdate.

@eolivelli
Contributor

@315157973 IMHO there is no need to change the name to readClonedModifyUpdate, because entries in the cache are expected to be managed as immutable.
We should fix that implementation, state this explicitly in the Javadoc, and possibly create a small test suite that asserts the behaviour of any implementation of the MetadataCache interface (for instance, fail if you are able to change the internal state of cached objects by calling "set").
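
A contract check of that kind could look roughly like the sketch below. Everything in it is hypothetical: `SimpleCache` is a heavily reduced interface invented for the example (it is not the MetadataCache API), and `LeakyCache` and `DefensiveCache` exist only to show one implementation that fails the check and one that passes.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class CacheContractSketch {

    /** Hypothetical, heavily reduced cache interface; not the real MetadataCache API. */
    public interface SimpleCache {
        void put(String path, Set<String> value);
        Set<String> get(String path);
    }

    /** Hands out its internal object, so callers can change cached state behind its back. */
    public static class LeakyCache implements SimpleCache {
        private final Map<String, Set<String>> entries = new ConcurrentHashMap<>();

        @Override
        public void put(String path, Set<String> value) {
            entries.put(path, value);
        }

        @Override
        public Set<String> get(String path) {
            return entries.get(path);
        }
    }

    /** Copies on the way in and on the way out, so the cached state stays private. */
    public static class DefensiveCache implements SimpleCache {
        private final Map<String, Set<String>> entries = new ConcurrentHashMap<>();

        @Override
        public void put(String path, Set<String> value) {
            entries.put(path, new HashSet<>(value));
        }

        @Override
        public Set<String> get(String path) {
            return new HashSet<>(entries.get(path));
        }
    }

    /** Contract: mutating a value obtained from the cache must not alter what the cache returns next. */
    public static boolean protectsInternalState(SimpleCache cache) {
        Set<String> seed = new HashSet<>();
        seed.add("original");
        cache.put("/policies", seed);
        try {
            cache.get("/policies").add("tampered");
        } catch (UnsupportedOperationException e) {
            // Returning a read-only view is also an acceptable way to honour the contract.
        }
        return !cache.get("/policies").contains("tampered");
    }

    public static void main(String[] args) {
        System.out.println(protectsInternalState(new LeakyCache()));     // prints false
        System.out.println(protectsInternalState(new DefensiveCache())); // prints true
    }
}
```

Running the same check against every implementation of the interface is what turns "entries are to be treated as immutable" from a convention into something a build can enforce.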

@lhotari
Member

lhotari commented Mar 15, 2021

> I have added a snapshot. Please let me know if I have missed anything.

@315157973 Good work. Please also update the description of this PR to cover the considerations about thread safety in accessing objects.

@lhotari
Member

lhotari commented Mar 15, 2021

> IMHO we should fix MetadataCacheImpl, and the signature and semantics of the method should be documented.
> A cache that lets you modify its internal state is very likely to cause many problems in the future.
> If there are other implementations, we must ensure that they don't have this problem.

@eolivelli @315157973 I agree that the wider problem should be addressed. It seems that all classes that extend org.apache.pulsar.broker.resources.BaseResources are impacted.

@315157973 315157973 changed the title from "Use orderedExecutor to ensure all modification are in a single thread" to "Use orderedExecutor to ensure that namespace-level operations are thread-safe" Mar 15, 2021
@315157973 315157973 changed the title from "Use orderedExecutor to ensure that namespace-level operations are thread-safe" to "Make namespace-level operations thread-safe" Mar 15, 2021
@eolivelli
Contributor

eolivelli commented Mar 15, 2021

@315157973
the mutation function is executed inside this kind of optimistic "lock".
Here we get the version and store it in a stack-local variable:

long expectedVersion = optEntry.get().getValue().getVersion();

then we write to ZK

return store.put(path, newValue, Optional.of(expectedVersion)).thenAccept(stat -> {

so there is no need to add a new version variable to Policies IMHO
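
For readers unfamiliar with the pattern, the following in-memory sketch shows what a version-checked put looks like from the store's side. It is not the Pulsar MetadataStore or the ZooKeeper API: `VersionedStoreSketch`, its `Entry` type and its `BadVersionException` are hypothetical, and the numbering (first write gets version 0) is an assumption of the example.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class VersionedStoreSketch {

    public static final class Entry {
        public final String value;
        public final long version;

        Entry(String value, long version) {
            this.value = value;
            this.version = version;
        }
    }

    /** Hypothetical exception type standing in for a bad-version error. */
    public static class BadVersionException extends RuntimeException {
        public BadVersionException(String message) {
            super(message);
        }
    }

    private final Map<String, Entry> data = new ConcurrentHashMap<>();

    public Optional<Entry> get(String path) {
        return Optional.ofNullable(data.get(path));
    }

    /** Writes only if the stored version still equals expectedVersion (when one is given). */
    public CompletableFuture<Long> put(String path, String value, Optional<Long> expectedVersion) {
        CompletableFuture<Long> result = new CompletableFuture<>();
        try {
            // compute() runs atomically per key, so check and write cannot be interleaved.
            Entry written = data.compute(path, (p, current) -> {
                long currentVersion = current == null ? -1L : current.version;
                if (expectedVersion.isPresent() && expectedVersion.get() != currentVersion) {
                    throw new BadVersionException("BadVersion for " + p);
                }
                return new Entry(value, currentVersion + 1);
            });
            result.complete(written.version);
        } catch (BadVersionException e) {
            result.completeExceptionally(e);
        }
        return result;
    }

    public static void main(String[] args) {
        VersionedStoreSketch store = new VersionedStoreSketch();
        store.put("/policies", "initial", Optional.empty()).join();

        // Two writers read the same version; only the first write-back is accepted.
        long expectedVersion = store.get("/policies").get().version;
        store.put("/policies", "writer-A", Optional.of(expectedVersion)).join();
        boolean rejected = store.put("/policies", "writer-B", Optional.of(expectedVersion))
                .isCompletedExceptionally();
        System.out.println(rejected); // prints true
    }
}
```

The losing writer has to re-read, re-apply its change and try again, which is why the version can live in a local variable of the update call rather than in the Policies object.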

@315157973
Contributor Author

> @315157973
> the mutation function is executed inside this kind of optimistic "lock".
> Here we get the version and store it in a stack-local variable:
>
> long expectedVersion = optEntry.get().getValue().getVersion();
>
> then we write to ZK
>
> return store.put(path, newValue, Optional.of(expectedVersion)).thenAccept(stat -> {
>
> so there is no need to add a new version variable to Policies IMHO

Yes, you are right. It now uses an optimistic lock.

@315157973
Contributor Author

We only need to change the set method of BaseResources to readCloneModifyUpdate; that way the problem is solved for all implementation classes.

@codelipenghui codelipenghui added the type/bug The PR fixed a bug or issue reported a bug label Mar 16, 2021
@codelipenghui codelipenghui added this to the 2.8.0 milestone Mar 16, 2021
Contributor

@eolivelli eolivelli left a comment

I believe we are on our way.

I left one last comment.

thanks

Contributor

@eolivelli eolivelli left a comment

Looks great!

We should probably change the title and the description, because the fix is no longer strictly limited to Policies.

@315157973 315157973 changed the title from "Make namespace-level operations thread-safe" to "Make readModifyUpdate in MetadataCacheImpl thread-safe" Mar 16, 2021
Member

@lhotari lhotari left a comment

LGTM

@@ -166,6 +165,8 @@ private MetadataCacheImpl(MetadataStore store, MetadataSerde<T> serde) {
     T newValueObj;
     byte[] newValue;
     try {
+        // Use clone and CAS zk to ensure thread safety
Contributor

We should do the same operation on the readModifyUpdateOrCreate method as well

@eolivelli eolivelli requested a review from merlimat March 17, 2021 07:02
@eolivelli
Contributor

This error in a test case looks related to this patch:

Error: Tests run: 20, Failures: 1, Errors: 0, Skipped: 1, Time elapsed: 7.434 s <<< FAILURE! - in org.apache.pulsar.metadata.MetadataCacheTest
Error: testReadCloned(org.apache.pulsar.metadata.MetadataCacheTest) Time elapsed: 0.133 s <<< FAILURE!
java.util.concurrent.ExecutionException: org.apache.pulsar.metadata.api.MetadataStoreException$AlreadyExistsException: org.apache.pulsar.metadata.api.MetadataStoreException$BadVersionException: org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /policies
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at org.apache.pulsar.metadata.MetadataCacheTest.testReadCloned(MetadataCacheTest.java:209)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:132)
at org.testng.internal.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:45)
at org.testng.internal.InvokeMethodRunnable.call(InvokeMethodRunnable.java:73)
at org.testng.internal.InvokeMethodRunnable.call(InvokeMethodRunnable.java:11)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.pulsar.metadata.api.MetadataStoreException$AlreadyExistsException: org.apache.pulsar.metadata.api.MetadataStoreException$BadVersionException: org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /policies
at org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl.lambda$create$9(MetadataCacheImpl.java:210)
at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)
at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
at org.apache.pulsar.metadata.impl.ZKMetadataStore.lambda$null$15(ZKMetadataStore.java:217)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
... 1 more
Caused by: org.apache.pulsar.metadata.api.MetadataStoreException$BadVersionException: org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /policies
at org.apache.pulsar.metadata.impl.ZKMetadataStore.getException(ZKMetadataStore.java:297)
... 5 more
Caused by: org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /policies
at org.apache.zookeeper.KeeperException.create(KeeperException.java:122)
at org.apache.zookeeper.KeeperException.create(KeeperException.java:54)
at org.apache.pulsar.metadata.impl.ZKMetadataStore.getException(ZKMetadataStore.java:293)
... 5 more

[INFO] Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 16.252 s - in org.apache.pulsar.metadata.ZKSessionTest
[INFO]
[INFO] Results:
[INFO]
Error: Failures:
Error: org.apache.pulsar.metadata.MetadataCacheTest.testReadCloned(org.apache.pulsar.metadata.MetadataCacheTest)
[INFO] Run 1: PASS
Error: Run 2: MetadataCacheTest.testReadCloned:209 ? Execution org.apache.pulsar.metadata.ap...
[INFO] Run 3: PASS
[INFO]
[INFO]
Error: Tests run: 50, Failures: 1, Errors: 0, Skipped: 0

@315157973
Contributor Author

/pulsarbot run-failure-checks

@codelipenghui codelipenghui merged commit 33063bb into apache:master Mar 22, 2021
yangl pushed a commit to yangl/pulsar that referenced this pull request Mar 22, 2021
@zymap
Member

zymap commented Apr 14, 2021

This is a fix based on the new feature which should be released in 2.8.0. So I will remove the release/2.7.2 label.

@lhotari
Member

lhotari commented Apr 26, 2021

> This is a fix based on the new feature which should be released in 2.8.0. So I will remove the release/2.7.2 label.

There's a severe thread safety issue in 2.7.1, #9711, which this PR fixes. The issue is broader than authorization configuration and this PR addresses the fix for policy updates in a general way.
@zymap @315157973 @eolivelli @codelipenghui Could we backport this fix to 2.7.x?

@eolivelli
Contributor

We cannot pick this patch as it is.
We should create another one.
The change is not trivial and it could introduce additional instability.
I am not convinced that we should try to fix it now that we are close to the RC.
