Fix modifying Map in policies is not thread safe #9850

Closed

315157973 wants to merge 3 commits into apache:master from 315157973:thread

Conversation

@315157973 (Contributor) commented Mar 9, 2021

#9711

Motivation

The namespace Policies contain Maps, and modifying the Policies goes through the Map API. Using a plain HashMap causes thread-safety issues.

Modifications

HashMap changed to ConcurrentHashMap

Verifying this change

After Jackson serialization and deserialization, the map type is still ConcurrentHashMap.
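
A minimal round-trip check along these lines would demonstrate this (illustrative only: it assumes the map fields are annotated with @JsonDeserialize(as = ConcurrentHashMap.class) as in the diff discussed below, and the class name PoliciesRoundTripCheck is made up):

import java.util.concurrent.ConcurrentHashMap;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.pulsar.common.policies.data.Policies;

public class PoliciesRoundTripCheck {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        Policies policies = new Policies();

        // Serialize and deserialize a Policies instance.
        String json = mapper.writeValueAsString(policies);
        Policies restored = mapper.readValue(json, Policies.class);

        // With @JsonDeserialize(as = ConcurrentHashMap.class) on the field,
        // the deserialized map is expected to remain a ConcurrentHashMap.
        System.out.println(restored.backlog_quota_map instanceof ConcurrentHashMap);
    }
}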

@315157973 (Contributor, Author)

@lhotari Please take a look

@lhotari (Member) commented Mar 9, 2021

@315157973 Good work. I think it would be great if the issue gets fixed with such a simple change.

I think the changes would also have to cover the org.apache.pulsar.common.policies.data.AuthPolicies class in order to fix #9711. The mutations and thread-safe access of subscription_types_enabled and replication_clusters would need a solution too.

@315157973 (Contributor, Author) commented Mar 9, 2021

subscription_types_enabled

subscription_types_enabled creates a new HashSet every time.
replication_clusters only has data added when a namespace is created or the StandaloneWorker is initialized, and there is no multithreading in those scenarios.

@lhotari (Member) commented Mar 9, 2021

subscription_types_enabled creates a new HashSet every time, which can be considered copy-on-write

Copy-on-write alone isn't sufficient in Java, since it can lead to "unsafe publication", where some values in the published reference aren't visible to other threads (safe publication explained).
A quick search turns up a simple demonstration of the problem: https://gist.github.com/synther/2631238 .

Sharing a HashSet with another thread requires the Collections.unmodifiableSet wrapper (unless there is another way to ensure "safe publication"). The Collections.unmodifiableSet wrapper ensures safe publication; this is explained in this SO answer: https://stackoverflow.com/a/5379941

Similarly, sharing a HashMap reference requires the Collections.unmodifiableMap wrapper, unless there is some other means to ensure "safe publication".
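
For illustration, a minimal sketch of the wrapper pattern (the class and field names here are made up, not Pulsar code):

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

class EnabledSubscriptionTypes {
    // Plain (non-volatile) field shared with reader threads.
    private Set<String> enabledTypes = Collections.emptySet();

    void replaceWith(Iterable<String> types) {
        Set<String> fresh = new HashSet<>();               // copy-on-write: build a brand-new set
        types.forEach(fresh::add);
        enabledTypes = Collections.unmodifiableSet(fresh); // publish it through the wrapper
    }

    boolean isEnabled(String type) {
        // Readers may observe an older set, but never a partially constructed one:
        // the wrapper stores the backing set in a final field, which is what gives
        // the safe-publication guarantee described in the linked answer.
        return enabledTypes.contains(type);
    }
}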

@315157973 (Contributor, Author) commented Mar 9, 2021

subscription_types_enabled creates a new HashSet every time, which can be considered copy-on-write

Copy-on-write alone isn't sufficient in Java, since it can lead to "unsafe publication", where some values in the published reference aren't visible to other threads (safe publication explained).
A quick search turns up a simple demonstration of the problem: https://gist.github.com/synther/2631238 .

Sharing a HashSet with another thread requires the Collections.unmodifiableSet wrapper (unless there is another way to ensure "safe publication"). The Collections.unmodifiableSet wrapper ensures safe publication; this is explained in this SO answer: https://stackoverflow.com/a/5379941

Similarly, sharing a HashMap reference requires the Collections.unmodifiableMap wrapper, unless there is some other means to ensure "safe publication".

You're right. Sorry, I missed some points. This Set will only add data when it is created, and it will only be read later.

@lhotari (Member) commented Mar 9, 2021

replication_clusters only has data added when a namespace is created or the StandaloneWorker is initialized, and there is no multithreading in those scenarios.

As long as the field is accessed in another thread, it's necessary to handle it.

@lhotari (Member) commented Mar 9, 2021

What about org.apache.pulsar.common.policies.data.AuthPolicies ? That would need to be handled too.

@lhotari (Member) commented Mar 9, 2021

This Set will only add data when it is created, and it will only be read later.

If this happens in different threads, it must be handled.

@315157973 (Contributor, Author) commented Mar 9, 2021

This Set will only add data when it is created, and it will only be read later.

If this happens in different threads, it must be handled.

Different threads will not add data to the same Set at the same time; each thread constructs its own Set and then adds to it.
After that, no thread modifies it, so it is effectively "unmodifiable".

@lhotari (Member) commented Mar 9, 2021

After that, no thread modifies it, so it is effectively "unmodifiable".

"safe publication" is also needed for objects that aren't modified. I guess there's an existing way to achieve "safe publication" in that case?

@315157973 (Contributor, Author)

After that, no thread modifies it, so it is effectively "unmodifiable".

"safe publication" is also needed for objects that aren't modified. I guess there's an existing way to achieve "safe publication" in that case?

I understand that you are talking about the visibility of the objects inside Policies to other threads, as with double-checked locking.
But this PR only fixes the thread safety of the objects inside Policies, such as the Sets and Maps. Even making the current HashMap concurrent only solves the problem of concurrent operations on the Map.

If we want to solve the visibility problem, we could make all internal fields volatile, but it doesn't seem worth much. All the attributes in Policies are objects, and many of them are assigned directly. Does this kind of short-lived invisibility have a big impact on us?
When a user calls the Get method, old data may be read, but writes to ZK are not affected: when the version differs, the new policy is read again, modified again, and then written to ZK.

If it has a big impact, we can open another issue to address the visibility of all the objects in the Policies.

@315157973 (Contributor, Author)

What about org.apache.pulsar.common.policies.data.AuthPolicies ? That would need to be handled too.

Yes, I will add it soon.

@lhotari (Member) commented Mar 10, 2021

If we want to solve the visibility problem, we could make all internal fields volatile, but it doesn't seem worth much. All the attributes in Policies are objects, and many of them are assigned directly. Does this kind of short-lived invisibility have a big impact on us?

Yes, it's really necessary to achieve "safe publication" when new objects are shared with other threads. Otherwise, the thread safety issues won't be fixed.

Using "volatile" isn't necessary for achieving "safe publication".

Since the description of this PR says "Fixes #9711", I think that should be removed from the description or the remaining thread safety issues should be fixed as part of this PR.

@lhotari (Member) commented Mar 11, 2021

Great work @315157973 , really elegant.

@lhotari (Member) commented Mar 11, 2021

It's great that the thread safety of the data structures is now handled, so we won't run into the infinite loops that would happen with plain HashMaps. With the ConcurrentHashMaps, it will also be possible to handle the data consistency issues.

There are some remaining data consistency ("lost update") issues when mutating AuthPolicies maps, for example here:

updatePolicies(path(POLICIES, namespaceName.toString()), policies -> {
    policies.auth_policies.namespace_auth.remove(role);
    return policies;
});

pulsarResources.getNamespaceResources().set(policiesPath, (policies) -> {
    policies.auth_policies.namespace_auth.put(role, actions);
    return policies;
});

Policies policies = pulsarResources.getNamespaceResources().get(policiesPath)
        .orElseThrow(() -> new NotFoundException(policiesPath + " not found"));
if (remove) {
    if (policies.auth_policies.subscription_auth_roles.get(subscriptionName) != null) {
        policies.auth_policies.subscription_auth_roles.get(subscriptionName).removeAll(roles);
    } else {
        log.info("[{}] Couldn't find role {} while revoking for sub = {}", namespace, subscriptionName, roles);
        result.completeExceptionally(new IllegalArgumentException("couldn't find subscription"));
        return result;
    }
} else {
    policies.auth_policies.subscription_auth_roles.put(subscriptionName, roles);
}

namespaceResources().set(path(POLICIES, namespaceName.toString()), (policies) -> {
    if (!policies.auth_policies.destination_auth.containsKey(topicUri)) {
        policies.auth_policies.destination_auth.put(topicUri, new HashMap<>());
    }
    policies.auth_policies.destination_auth.get(topicUri).put(role, actions);
    return policies;
});

String namespacePath = path(POLICIES, namespaceName.toString());
namespaceResources().set(namespacePath, (p) -> {
    p.auth_policies.destination_auth.get(topicUri).remove(role);
    return p;
});

There might be some other locations besides the ones above.

Would it help to use Map.compute instead of .get(..).removeAll(...) and .put(..,..) for mutations? Isn't Map.compute atomic for ConcurrentHashMap? WDYT?
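
For example, roughly like this for the subscription_auth_roles case (an untested sketch using the names from the snippets above; the error handling for the missing-key case is left out):

policies.auth_policies.subscription_auth_roles.compute(subscriptionName, (key, existingRoles) -> {
    if (remove) {
        if (existingRoles == null) {
            return null;                 // nothing to revoke; the mapping stays absent
        }
        existingRoles.removeAll(roles);  // runs atomically for this key on a ConcurrentHashMap
        return existingRoles;
    } else {
        return roles;                    // grant: replace the role set for this subscription
    }
});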

@315157973 are you thinking of addressing the mutations of the AuthPolicies maps in this PR or separately?

@315157973 (Contributor, Author)


Would it help to use Map.compute instead of .get(..).removeAll(...) and .put(..,..) for mutations? Isn't Map.compute atomic for ConcurrentHashMap? WDYT?

@315157973 are you thinking of addressing the mutations of the AuthPolicies maps in this PR or separately?

The solution I have in mind:
I will open a new PR that adds the ZK version to the policies cache.
When the policies cache in some thread is dirty (holds an old version), the write to ZK will fail. The latest state will then be read and the modification Function applied again, so a dirty cache cannot overwrite newer data.
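
A rough sketch of that retry loop (all type and method names below, such as Versioned, readFromCacheOrZk, writeToZk and BadVersionException, are placeholders for illustration, not existing Pulsar APIs):

import java.util.function.Function;

abstract class VersionedPolicyUpdater<P> {

    static final class Versioned<P> {
        final P policies;
        final long version;   // ZK znode version the cached copy was read at
        Versioned(P policies, long version) { this.policies = policies; this.version = version; }
    }

    static final class BadVersionException extends Exception {}

    abstract Versioned<P> readFromCacheOrZk(String path);
    abstract void writeToZk(String path, P policies, long expectedVersion) throws BadVersionException;
    abstract void invalidateCache(String path);

    void update(String path, Function<P, P> modification) {
        while (true) {
            Versioned<P> cached = readFromCacheOrZk(path);
            P modified = modification.apply(cached.policies);
            try {
                writeToZk(path, modified, cached.version);  // conditional write: fails if the version moved on
                return;
            } catch (BadVersionException e) {
                invalidateCache(path);                      // cache was dirty; re-read and re-apply
            }
        }
    }
}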

@lhotari (Member) commented Mar 11, 2021

The solution I have in mind:
I will open a new PR that adds the ZK version to the policies cache.
When the policies cache in some thread is dirty (holds an old version), the write to ZK will fail. The latest state will then be read and the modification Function applied again, so a dirty cache cannot overwrite newer data.

Yes, I think that it's a good idea to handle the ZK and distributed updates issues separately. Please create a new issue for the data consistency issues around ZK and distributed modifications of policies since that is a broader scope than #9711.

My previous question was about preventing the "lost updates" problems that happen in the local data structures. I consider that problem a part of #9711.

Please take a look at whether you could change the mutation logic to prevent the "lost updates" issue in the locations listed in my previous comment.

@315157973 Could you resolve the local "lost updates" issues as part of this current PR? After that I think it's ok to close #9711 with this PR. WDYT?

@315157973 (Contributor, Author)

I thought of these solutions:

1. Tag Policies with the ZK version; lock when reading the cache and clone a copy of the Policies. When updating ZK, if the version is wrong, re-read the latest state from ZK, redo the modification, and then update.

2. Per namespace, put all namespace operations into an orderedExecutor so that each namespace is handled by a fixed thread, then call back.

I lean toward option 2, and this PR will be closed.

@lhotari

@lhotari (Member) commented Mar 11, 2021

I thought of these solutions:

1. Tag Policies with the ZK version; lock when reading the cache and clone a copy of the Policies. When updating ZK, if the version is wrong, re-read the latest state from ZK, redo the modification, and then update.

2. Per namespace, put all namespace operations into an orderedExecutor so that each namespace is handled by a fixed thread, then call back.

I lean toward option 2, and this PR will be closed.

Sounds good. It would be useful to write a separate issue which explains the current challenges in updating policies in a distributed setup with multiple brokers.

The only problem with this approach is that we cannot close 9711 before the other PRs are delivered. I can provide a fix for 9711 in the meantime (before the ZK & single writer approach is delivered) using Map.compute. That could become obsolete later, but it doesn't cause any harm to do that as an intermediate step since it would fix 9711.

@315157973 (Contributor, Author) commented Mar 11, 2021

I thought of these solutions:
1. Tag Policies with the ZK version; lock when reading the cache and clone a copy of the Policies. When updating ZK, if the version is wrong, re-read the latest state from ZK, redo the modification, and then update.
2. Per namespace, put all namespace operations into an orderedExecutor so that each namespace is handled by a fixed thread, then call back.
I lean toward option 2, and this PR will be closed.

Sounds good. It would be useful to write a separate issue which explains the current challenges in updating policies in a distributed setup with multiple brokers.

The only problem with this approach is that we cannot close 9711 before the other PRs are delivered. I can provide a fix for 9711 in the meantime (before the ZK & single writer approach is delivered) using Map.compute. That could become obsolete later, but it doesn't cause any harm to do that as an intermediate step since it would fix 9711.

If processing is single-threaded, there is no contention, and my PR will fix the problem you raised.
The changes will also be small and can be applied uniformly.
I will submit a PR this week.

@lhotari (Member) commented Mar 11, 2021

If processing is single-threaded, there is no contention, and my PR will fix the problem you raised.

Yes, it will eventually fix it. It would be helpful if you could create a separate issue about the ZK issues that you are planning to address since that is beyond #9711. Are you fine with that?


policies.auth_policies.destination_auth.get(topicUri).put(role, actions);
Set<AuthAction> authActionSet = Collections.newSetFromMap(new ConcurrentHashMap<>());
authActionSet.addAll(actions);
Review comment (Member):

I'd recommend using the Map.compute method here instead of separate lookups and mutations: besides fixing "lost updates", the code is also cleaner even when concurrency is not a concern.
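
Something along these lines, for example (a rough, untested sketch using the names from the hunk above):

// computeIfAbsent creates the per-topic map atomically, so the separate
// containsKey check and put are no longer needed, and concurrent callers
// can no longer overwrite each other's freshly created inner map.
policies.auth_policies.destination_auth
        .computeIfAbsent(topicUri, uri -> new ConcurrentHashMap<>())
        .put(role, actions);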

Comment on lines +297 to +300
pulsarResources.getNamespaceResources().set(policiesPath, (policies) -> {
    Set<AuthAction> authActionSet = Collections.newSetFromMap(new ConcurrentHashMap<>());
    authActionSet.addAll(actions);
    policies.auth_policies.namespace_auth.put(role, authActionSet);
Review comment (Member):

Use Map.compute for the mutation (rationale in the previous comment).

Comment on lines 357 to 361
} else {
    policies.auth_policies.subscription_auth_roles.put(subscriptionName, roles);
    Set<String> roleSet = Collections.newSetFromMap(new ConcurrentHashMap<>());
    roleSet.addAll(roles);
    policies.auth_policies.subscription_auth_roles.put(subscriptionName, roleSet);
}
Review comment (Member):

handle as part of the Map.compute function

public BundlesData bundles;
@SuppressWarnings("checkstyle:MemberName")
public Map<BacklogQuota.BacklogQuotaType, BacklogQuota> backlog_quota_map = Maps.newHashMap();
@JsonDeserialize(as = ConcurrentHashMap.class)
Review comment (Contributor):

I'd rather not follow this approach because it leaves out other fields (that are also non-thread-safe) and other classes that will have the same issue.

Instead, we should try to find a way to ensure that modifications to these policies are synchronized.

Review comment (Member):

@merlimat It's true that there is this uncomfortable feeling. :) However, the ConcurrentHashMap-based solution seems to make a lot of sense in this context. One benefit is that the change doesn't require huge architectural changes, and the risk that it impacts performance is very minor.

Instead, we should try to find a way to ensure that modifications to these policies are synchronized.

Does that help? Wouldn't it mean that reads are also synchronized? Wouldn't that impact performance? Perhaps I misunderstood what you meant by "synchronized".

Review comment (Contributor):

Wouldn't it mean that reads are also synchronized?

Not necessarily; we could enforce always cloning the POJOs before updating them, copy-on-write fashion.
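
For illustration, roughly (deepCopy and policiesCache below are placeholders, not existing Pulsar code):

// Readers keep using the old Policies instance; the writer mutates a private
// copy and then swaps the reference in, copy-on-write style.
Policies current = policiesCache.get(policiesPath);
Policies copy = deepCopy(current);   // e.g. a Jackson round-trip or a manual clone
copy.auth_policies.namespace_auth.put(role, actions);
policiesCache.put(policiesPath, copy);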

Review comment (Member):

@merlimat Yes, makes sense. There were some ideas in that direction in #9711 (comment) .

Review comment (Contributor, Author):

I will create a new PR and use an orderedExecutor to make it thread safe.
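
A minimal sketch of the per-namespace ordering idea (illustrative only; the actual PR would reuse the broker's existing ordered executor rather than this made-up class):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

class NamespaceOrderedExecutor {
    private final ExecutorService[] lanes;

    NamespaceOrderedExecutor(int numThreads) {
        lanes = new ExecutorService[numThreads];
        for (int i = 0; i < numThreads; i++) {
            lanes[i] = Executors.newSingleThreadExecutor();
        }
    }

    // All operations for the same namespace hash to the same single-threaded lane,
    // so policy mutations for that namespace never race with each other.
    <T> CompletableFuture<T> submit(String namespace, Supplier<T> task) {
        int lane = Math.floorMod(namespace.hashCode(), lanes.length);
        return CompletableFuture.supplyAsync(task, lanes[lane]);
    }
}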

@315157973 closed this Mar 11, 2021
@315157973 deleted the thread branch March 12, 2021 13:52