New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ConcurrentSubscripiton avoid concurrent access for invalid demand #1015
Conversation
...lk-concurrent-internal/src/main/java/io/servicetalk/concurrent/internal/ConcurrentUtils.java
Outdated
Show resolved
Hide resolved
private static long mapInvalidRequestN(long n) { | ||
// We map zero to a negative number because zero could later be overwritten by a subsequent legit value of | ||
// n, and we want to ensure the invalid use gets propagated. | ||
return n == CANCELLED ? CANCELLED + 1 : n == 0 ? -1 : n; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Imho chaining stuff like this is really not readable. Consider using a branch
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Scottmitch ^^
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I prefer to keep the code as is as it is more compact and I don't think readability is dramatically different between the two options:
current option:
return n == CANCELLED ? CANCELLED + 1 : n == 0 ? -1 : n;
suggested option:
if (n == CANCELLED) {
return CANCELLED + 1;
}
return n == 0 ? -1 : n;
...urrent-internal/src/main/java/io/servicetalk/concurrent/internal/ConcurrentSubscription.java
Outdated
Show resolved
Hide resolved
...k-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherFlatMapSingleTest.java
Show resolved
Hide resolved
825e5c2
to
71b94a4
Compare
humm ... ci failures
|
@servicetalk-bot - test this please |
71b94a4
to
064f892
Compare
build failure attributed to #744 |
@servicetalk-bot - test this please |
ci failure...
|
@servicetalk-bot - test this please |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice simplification of ConcurrentSubscription
💯
...lk-concurrent-internal/src/main/java/io/servicetalk/concurrent/internal/ConcurrentUtils.java
Show resolved
Hide resolved
...urrent-internal/src/main/java/io/servicetalk/concurrent/internal/ConcurrentSubscription.java
Outdated
Show resolved
Hide resolved
...urrent-internal/src/main/java/io/servicetalk/concurrent/internal/ConcurrentSubscription.java
Show resolved
Hide resolved
...urrent-internal/src/main/java/io/servicetalk/concurrent/internal/ConcurrentSubscription.java
Outdated
Show resolved
Hide resolved
...urrent-internal/src/main/java/io/servicetalk/concurrent/internal/ConcurrentSubscription.java
Outdated
Show resolved
Hide resolved
...urrent-internal/src/main/java/io/servicetalk/concurrent/internal/ConcurrentSubscription.java
Outdated
Show resolved
Hide resolved
suspected CI related failure ... https://ci.servicetalk.io/job/servicetalk-java11-prb/1280/consoleText
|
@servicetalk-bot - test this please |
Benchmarks were revised to provide multi-threaded perf for non-reentrant case and more stack depth options for recursion. Summary:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Final comments then LGTM
servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/ConcurrentUtilsTest.java
Show resolved
Hide resolved
...lk-concurrent-internal/src/main/java/io/servicetalk/concurrent/internal/ConcurrentUtils.java
Show resolved
Hide resolved
...urrent-internal/src/main/java/io/servicetalk/concurrent/internal/ConcurrentSubscription.java
Outdated
Show resolved
Hide resolved
build failure attributed to #1040 |
02f1224
to
d259d2c
Compare
Motivation: ConcurrentSubscription currently propagates invalid demand without any concurrency protection. In general this is invalid use of the API but may invalidate underlying data structures that are not thread safe and result in undefined results. Modifications: - Use a simpler locking scheme inspired by Publisher#flatMapMerge design which allows for re-entry and also notification is another thread has attempted to acquire the lock which will trigger re-processing. Result: ConcurrentSubscripiton no longer allows any concurrent access and uses a more common/shareable locking utility.
d259d2c
to
5c9f925
Compare
discussed relaxing the exception handling with @NiteshKant offline. I've modified this PR to incorporate @NiteshKant's suggestions and we can enhance later if necessary. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💯
…ple#1015) Motivation: ConcurrentSubscription currently propagates invalid demand without any concurrency protection. In general this is invalid use of the API but may invalidate underlying data structures that are not thread safe and result in undefined results. Modifications: - Use a simpler locking scheme inspired by Publisher#flatMapMerge design which allows for re-entry and also notification is another thread has attempted to acquire the lock which will trigger re-processing. Result: ConcurrentSubscripiton no longer allows any concurrent access and uses a more common/shareable locking utility.
Motivation:
ConcurrentSubscription currently propagates invalid demand without any
concurrency protection. In general this is invalid use of the API but
may invalidate underlying data structures that are not thread safe and
result in undefined results.
Modifications:
which allows for re-entry and also notification is another thread has
attempted to acquire the lock which will trigger re-processing.
Result:
ConcurrentSubscripiton no longer allows any concurrent access and uses a
more common/shareable locking utility.