Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[C++] Fix thread safety issue for multi topic consumer #14380

Merged

Conversation

BewareMyPower
Copy link
Contributor

Motivation

In C++ client, if a consumer subscribes multiple topics, a
MultiTopicsConsumerImpl object, which manages a vector of
ConsumerImpls (consumers_ field), will be created. However,
consumers_ could be accessed by multiple threads, while no
mutex is locked to protect the access to make it thread safe.

Modifications

  • Add a SynchronizedHashMap class, which implements some thread safe
    methods of traverse, remove, find, clear operations. Since the
    forEach methods could call other methods, use the recursive mutex
    instead of the default mutex.
  • Add a related test SynchronizedHashMapTest to test the methods and
    the thread safety of SynchronizedHashMap.
  • Use SynchronizedHashMap as the type of
    MultiTopicsConsumerImpl::consumers_.

**Motivation**

In C++ client, if a consumer subscribes multiple topics, a
`MultiTopicsConsumerImpl` object, which manages a vector of
`ConsumerImpl`s (`consumers_` field), will be created. However,
`consumers_` could be accessed by multiple threads, while no
mutex is locked to protect the access to make it thread safe.

**Modifications**

- Add a `SynchronizedHashMap` class, which implements some thread safe
  methods of traverse, remove, find, clear operations. Since the
  `forEach` methods could call other methods, use the recursive mutex
  instead of the default mutex.
- Add a related test `SynchronizedHashMapTest` to test the methods and
  the thread safety of `SynchronizedHashMap`.
- Use `SynchronizedHashMap` as the type of
  `MultiTopicsConsumerImpl::consumers_`.
@BewareMyPower
Copy link
Contributor Author

Hi @merlimat could you take a look at this PR?

@merlimat merlimat merged commit f94eba9 into apache:master Mar 10, 2022
@gaoran10 gaoran10 added cherry-picked/branch-2.9 Archived: 2.9 is end of life release/2.9.2 and removed release/2.9.3 labels Mar 11, 2022
gaoran10 pushed a commit that referenced this pull request Mar 11, 2022
* [C++] Fix thread safety issue for multi topic consumer

**Motivation**

In C++ client, if a consumer subscribes multiple topics, a
`MultiTopicsConsumerImpl` object, which manages a vector of
`ConsumerImpl`s (`consumers_` field), will be created. However,
`consumers_` could be accessed by multiple threads, while no
mutex is locked to protect the access to make it thread safe.

**Modifications**

- Add a `SynchronizedHashMap` class, which implements some thread safe
  methods of traverse, remove, find, clear operations. Since the
  `forEach` methods could call other methods, use the recursive mutex
  instead of the default mutex.
- Add a related test `SynchronizedHashMapTest` to test the methods and
  the thread safety of `SynchronizedHashMap`.
- Use `SynchronizedHashMap` as the type of
  `MultiTopicsConsumerImpl::consumers_`.

* Add findFirstValueIf method

* Remove unnecessary return value of forEach

* Fix incorrect calls of forEachValue

* Add missed header

(cherry picked from commit f94eba9)
codelipenghui pushed a commit that referenced this pull request Mar 12, 2022
* [C++] Fix thread safety issue for multi topic consumer

**Motivation**

In C++ client, if a consumer subscribes multiple topics, a
`MultiTopicsConsumerImpl` object, which manages a vector of
`ConsumerImpl`s (`consumers_` field), will be created. However,
`consumers_` could be accessed by multiple threads, while no
mutex is locked to protect the access to make it thread safe.

**Modifications**

- Add a `SynchronizedHashMap` class, which implements some thread safe
  methods of traverse, remove, find, clear operations. Since the
  `forEach` methods could call other methods, use the recursive mutex
  instead of the default mutex.
- Add a related test `SynchronizedHashMapTest` to test the methods and
  the thread safety of `SynchronizedHashMap`.
- Use `SynchronizedHashMap` as the type of
  `MultiTopicsConsumerImpl::consumers_`.

* Add findFirstValueIf method

* Remove unnecessary return value of forEach

* Fix incorrect calls of forEachValue

* Add missed header

(cherry picked from commit f94eba9)
@codelipenghui codelipenghui modified the milestones: 2.11.0, 2.10.0 Mar 12, 2022
@BewareMyPower BewareMyPower deleted the bewaremypower/cpp-synchronized-map branch March 13, 2022 14:44
codelipenghui pushed a commit that referenced this pull request Mar 18, 2022
* [C++] Fix thread safety issue for multi topic consumer

**Motivation**

In C++ client, if a consumer subscribes multiple topics, a
`MultiTopicsConsumerImpl` object, which manages a vector of
`ConsumerImpl`s (`consumers_` field), will be created. However,
`consumers_` could be accessed by multiple threads, while no
mutex is locked to protect the access to make it thread safe.

**Modifications**

- Add a `SynchronizedHashMap` class, which implements some thread safe
  methods of traverse, remove, find, clear operations. Since the
  `forEach` methods could call other methods, use the recursive mutex
  instead of the default mutex.
- Add a related test `SynchronizedHashMapTest` to test the methods and
  the thread safety of `SynchronizedHashMap`.
- Use `SynchronizedHashMap` as the type of
  `MultiTopicsConsumerImpl::consumers_`.

* Add findFirstValueIf method

* Remove unnecessary return value of forEach

* Fix incorrect calls of forEachValue

* Add missed header

(cherry picked from commit f94eba9)
@codelipenghui codelipenghui added the cherry-picked/branch-2.8 Archived: 2.8 is end of life label Mar 18, 2022
Nicklee007 pushed a commit to Nicklee007/pulsar that referenced this pull request Apr 20, 2022
* [C++] Fix thread safety issue for multi topic consumer

**Motivation**

In C++ client, if a consumer subscribes multiple topics, a
`MultiTopicsConsumerImpl` object, which manages a vector of
`ConsumerImpl`s (`consumers_` field), will be created. However,
`consumers_` could be accessed by multiple threads, while no
mutex is locked to protect the access to make it thread safe.

**Modifications**

- Add a `SynchronizedHashMap` class, which implements some thread safe
  methods of traverse, remove, find, clear operations. Since the
  `forEach` methods could call other methods, use the recursive mutex
  instead of the default mutex.
- Add a related test `SynchronizedHashMapTest` to test the methods and
  the thread safety of `SynchronizedHashMap`.
- Use `SynchronizedHashMap` as the type of
  `MultiTopicsConsumerImpl::consumers_`.

* Add findFirstValueIf method

* Remove unnecessary return value of forEach

* Fix incorrect calls of forEachValue

* Add missed header
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
cherry-picked/branch-2.8 Archived: 2.8 is end of life cherry-picked/branch-2.9 Archived: 2.9 is end of life doc-not-needed Your PR changes do not impact docs release/2.8.4 release/2.9.2 type/bug The PR fixed a bug or issue reported a bug
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants