Conversation
WalkthroughWalkthroughThe recent updates to the ReactivePlusPlus library enhance its error-handling capabilities with a new retry mechanism for observables, allowing for automatic resubscription on encountering errors. This functionality is encapsulated in the Changes
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (invoked as PR comments)
Additionally, you can add CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Actionable comments posted: 0
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (2)
- src/rpp/rpp/operators/on_error_resume_next.hpp (1 hunks)
- src/rpp/rpp/operators/retry.hpp (1 hunks)
Files skipped from review due to trivial changes (1)
- src/rpp/rpp/operators/on_error_resume_next.hpp
Additional comments not posted (6)
src/rpp/rpp/operators/retry.hpp (6)
1-16: Header and includes are standard.The file header and includes are appropriate and follow standard practices.
20-34:retry_state_tis well-structured with atomic operations.The class effectively manages retry state with thread safety considerations using atomic operations.
36-77:retry_observer_strategycorrectly handles retry logic.The strategy manages observer interactions, error handling, and disposal states effectively.
79-101:drainfunction robustly manages retries.The function effectively handles the retry loop with proper error handling and state management.
103-122:retry_tencapsulates retry logic effectively.The class provides a clear structure for managing retries, with efficient memory management.
126-147: Namespacerpp::operatorsprovides clear documentation.The
retryfunction is well-documented, offering clear guidance for users.
There was a problem hiding this comment.
Actionable comments posted: 0
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (1)
- src/rpp/rpp/operators/retry.hpp (1 hunks)
Files skipped from review as they are similar to previous changes (1)
- src/rpp/rpp/operators/retry.hpp
BENCHMARK RESULTS (AUTOGENERATED)
|
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| Subscribe empty callbacks to empty observable | 303.00 ns | 2.16 ns | 2.16 ns | 1.00 |
| Subscribe empty callbacks to empty observable via pipe operator | 305.01 ns | 2.16 ns | 2.16 ns | 1.00 |
Sources
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| from array of 1 - create + subscribe + immediate | 699.44 ns | 0.31 ns | 0.31 ns | 1.00 |
| from array of 1 - create + subscribe + current_thread | 1050.33 ns | 3.70 ns | 3.43 ns | 1.08 |
| concat_as_source of just(1 immediate) create + subscribe | 2252.28 ns | 106.74 ns | 124.97 ns | 0.85 |
| defer from array of 1 - defer + create + subscribe + immediate | 729.05 ns | 0.31 ns | 0.31 ns | 1.00 |
| interval - interval + take(3) + subscribe + immediate | 2168.38 ns | 59.19 ns | 59.24 ns | 1.00 |
| interval - interval + take(3) + subscribe + current_thread | 2980.71 ns | 32.40 ns | 32.46 ns | 1.00 |
| from array of 1 - create + as_blocking + subscribe + new_thread | 30939.05 ns | 28918.55 ns | 27261.09 ns | 1.06 |
| from array of 1000 - create + as_blocking + subscribe + new_thread | 44364.57 ns | 51753.52 ns | 51775.00 ns | 1.00 |
| concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe | 3574.26 ns | 123.75 ns | 138.45 ns | 0.89 |
Filtering Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+take(1)+subscribe | 1102.36 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just+filter(true)+subscribe | 855.87 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just(1,2)+skip(1)+subscribe | 1016.16 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just(1,1,2)+distinct_until_changed()+subscribe | 854.10 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just(1,2)+first()+subscribe | 1227.73 ns | 0.62 ns | 0.31 ns | 2.00 |
| immediate_just(1,2)+last()+subscribe | 904.75 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just+take_last(1)+subscribe | 1122.30 ns | 17.28 ns | 17.91 ns | 0.96 |
| immediate_just(1,2,3)+element_at(1)+subscribe | 857.43 ns | 0.31 ns | 0.31 ns | 1.00 |
Schedulers
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate scheduler create worker + schedule | 293.77 ns | 2.16 ns | 2.16 ns | 1.00 |
| current_thread scheduler create worker + schedule | 367.88 ns | 5.86 ns | 5.87 ns | 1.00 |
| current_thread scheduler create worker + schedule + recursive schedule | 820.91 ns | 57.32 ns | 55.85 ns | 1.03 |
Transforming Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+map(v*2)+subscribe | 853.64 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just+scan(10, std::plus)+subscribe | 886.13 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just+flat_map(immediate_just(v*2))+subscribe | 2356.59 ns | 162.65 ns | 158.70 ns | 1.02 |
| immediate_just+buffer(2)+subscribe | 1539.70 ns | 13.59 ns | 13.90 ns | 0.98 |
| immediate_just+window(2)+subscribe + subscsribe inner | 2386.22 ns | 1052.26 ns | 1091.53 ns | 0.96 |
Conditional Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+take_while(false)+subscribe | 858.52 ns | - | - | 0.00 |
| immediate_just+take_while(true)+subscribe | 860.17 ns | 0.31 ns | 0.31 ns | 1.00 |
Utility Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just(1)+subscribe_on(immediate)+subscribe | 2125.06 ns | 0.31 ns | 0.31 ns | 1.00 |
Combining Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe | 3430.07 ns | 175.82 ns | 173.15 ns | 1.02 |
| immediate_just(1) + merge_with(immediate_just(2)) + subscribe | 3667.55 ns | 166.78 ns | 175.26 ns | 0.95 |
| immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe | - | 135.30 ns | 135.51 ns | 1.00 |
| immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe | 3534.75 ns | 983.78 ns | 1004.59 ns | 0.98 |
| immediate_just(1) + zip(immediate_just(2)) + subscribe | 2124.02 ns | 208.66 ns | 207.71 ns | 1.00 |
Subjects
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| publish_subject with 1 observer - on_next | 34.57 ns | 14.71 ns | 14.60 ns | 1.01 |
| subscribe 100 observers to publish_subject | 200322.00 ns | 15309.64 ns | 15476.84 ns | 0.99 |
| 100 on_next to 100 observers to publish_subject | 32626.84 ns | 17313.90 ns | 20245.56 ns | 0.86 |
Scenarios
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| basic sample | 1441.56 ns | 12.65 ns | 12.66 ns | 1.00 |
| basic sample with immediate scheduler | 1396.15 ns | 5.55 ns | 5.24 ns | 1.06 |
Aggregating Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+reduce(10, std::plus)+subscribe | 905.57 ns | 0.31 ns | 0.31 ns | 1.00 |
Error Handling Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe | 2038.68 ns | 971.10 ns | 965.70 ns | 1.01 |
| create(on_error())+retry(1)+subscribe | 598.86 ns | 109.62 ns | - | 0.00 |
ci-macos
General
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| Subscribe empty callbacks to empty observable | 1187.15 ns | 4.64 ns | 5.42 ns | 0.86 |
| Subscribe empty callbacks to empty observable via pipe operator | 1508.41 ns | 5.71 ns | 5.44 ns | 1.05 |
Sources
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| from array of 1 - create + subscribe + immediate | 2286.86 ns | 0.40 ns | 0.33 ns | 1.22 |
| from array of 1 - create + subscribe + current_thread | 3087.13 ns | 38.86 ns | 46.17 ns | 0.84 |
| concat_as_source of just(1 immediate) create + subscribe | 6513.60 ns | 389.64 ns | 463.26 ns | 0.84 |
| defer from array of 1 - defer + create + subscribe + immediate | 2382.32 ns | 0.29 ns | 0.33 ns | 0.91 |
| interval - interval + take(3) + subscribe + immediate | 6259.24 ns | 150.07 ns | 158.37 ns | 0.95 |
| interval - interval + take(3) + subscribe + current_thread | 7144.83 ns | 111.19 ns | 132.00 ns | 0.84 |
| from array of 1 - create + as_blocking + subscribe + new_thread | 104630.44 ns | 105356.60 ns | 111711.90 ns | 0.94 |
| from array of 1000 - create + as_blocking + subscribe + new_thread | 107706.10 ns | 107828.80 ns | 122468.10 ns | 0.88 |
| concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe | 9733.28 ns | 460.85 ns | 521.99 ns | 0.88 |
Filtering Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+take(1)+subscribe | 3696.19 ns | 0.27 ns | 0.33 ns | 0.84 |
| immediate_just+filter(true)+subscribe | 2496.69 ns | 0.40 ns | 0.32 ns | 1.23 |
| immediate_just(1,2)+skip(1)+subscribe | 3420.55 ns | 0.28 ns | 0.32 ns | 0.86 |
| immediate_just(1,1,2)+distinct_until_changed()+subscribe | 2487.50 ns | 0.56 ns | 0.65 ns | 0.86 |
| immediate_just(1,2)+first()+subscribe | 3759.93 ns | 0.27 ns | 0.33 ns | 0.84 |
| immediate_just(1,2)+last()+subscribe | 2824.09 ns | 0.30 ns | 0.33 ns | 0.93 |
| immediate_just+take_last(1)+subscribe | 4347.43 ns | 0.31 ns | 0.33 ns | 0.96 |
| immediate_just(1,2,3)+element_at(1)+subscribe | 2526.49 ns | 0.27 ns | 0.33 ns | 0.84 |
Schedulers
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate scheduler create worker + schedule | 999.31 ns | 4.83 ns | 5.77 ns | 0.84 |
| current_thread scheduler create worker + schedule | 1535.71 ns | 42.71 ns | 51.33 ns | 0.83 |
| current_thread scheduler create worker + schedule + recursive schedule | 2498.10 ns | 243.94 ns | 285.29 ns | 0.86 |
Transforming Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+map(v*2)+subscribe | 2454.15 ns | 4.89 ns | 6.16 ns | 0.79 |
| immediate_just+scan(10, std::plus)+subscribe | 2868.41 ns | 0.57 ns | 0.65 ns | 0.87 |
| immediate_just+flat_map(immediate_just(v*2))+subscribe | 6476.37 ns | 508.55 ns | 563.80 ns | 0.90 |
| immediate_just+buffer(2)+subscribe | 2986.51 ns | 82.07 ns | 90.45 ns | 0.91 |
| immediate_just+window(2)+subscribe + subscsribe inner | 6432.45 ns | 2791.11 ns | 3348.55 ns | 0.83 |
Conditional Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+take_while(false)+subscribe | 2557.26 ns | - | - | 0.00 |
| immediate_just+take_while(true)+subscribe | 2464.55 ns | 0.27 ns | 0.33 ns | 0.84 |
Utility Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just(1)+subscribe_on(immediate)+subscribe | 6424.16 ns | 5.74 ns | 6.59 ns | 0.87 |
Combining Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe | 9406.92 ns | 513.08 ns | 616.68 ns | 0.83 |
| immediate_just(1) + merge_with(immediate_just(2)) + subscribe | 10103.87 ns | 541.20 ns | 615.16 ns | 0.88 |
| immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe | - | 576.71 ns | 652.43 ns | 0.88 |
| immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe | 9342.73 ns | 2222.78 ns | 2663.51 ns | 0.83 |
| immediate_just(1) + zip(immediate_just(2)) + subscribe | 6153.80 ns | 989.34 ns | 1162.84 ns | 0.85 |
Subjects
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| publish_subject with 1 observer - on_next | 89.67 ns | 60.40 ns | 66.13 ns | 0.91 |
| subscribe 100 observers to publish_subject | 402135.00 ns | 48039.23 ns | 55122.70 ns | 0.87 |
| 100 on_next to 100 observers to publish_subject | 64833.35 ns | 20436.14 ns | 22465.44 ns | 0.91 |
Scenarios
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| basic sample | 3194.79 ns | 79.58 ns | 91.36 ns | 0.87 |
| basic sample with immediate scheduler | 3197.01 ns | 22.32 ns | 25.10 ns | 0.89 |
Aggregating Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+reduce(10, std::plus)+subscribe | 3055.51 ns | 0.28 ns | 0.31 ns | 0.89 |
Error Handling Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe | 7938.20 ns | 5210.70 ns | 5531.59 ns | 0.94 |
| create(on_error())+retry(1)+subscribe | 2168.93 ns | 351.81 ns | - | 0.00 |
ci-ubuntu-clang
General
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| Subscribe empty callbacks to empty observable | 276.84 ns | 1.54 ns | 1.56 ns | 0.99 |
| Subscribe empty callbacks to empty observable via pipe operator | 282.89 ns | 1.54 ns | 1.54 ns | 1.00 |
Sources
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| from array of 1 - create + subscribe + immediate | 570.43 ns | 0.31 ns | 0.31 ns | 1.00 |
| from array of 1 - create + subscribe + current_thread | 785.93 ns | 4.32 ns | 4.02 ns | 1.08 |
| concat_as_source of just(1 immediate) create + subscribe | 2388.50 ns | 135.35 ns | 135.76 ns | 1.00 |
| defer from array of 1 - defer + create + subscribe + immediate | 777.93 ns | 0.31 ns | 0.31 ns | 1.00 |
| interval - interval + take(3) + subscribe + immediate | 2220.55 ns | 58.30 ns | 58.36 ns | 1.00 |
| interval - interval + take(3) + subscribe + current_thread | 3206.03 ns | 30.86 ns | 30.86 ns | 1.00 |
| from array of 1 - create + as_blocking + subscribe + new_thread | 30509.37 ns | 28199.08 ns | 27933.41 ns | 1.01 |
| from array of 1000 - create + as_blocking + subscribe + new_thread | 41048.04 ns | 37078.26 ns | 36567.84 ns | 1.01 |
| concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe | 3666.62 ns | 157.74 ns | 157.15 ns | 1.00 |
Filtering Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+take(1)+subscribe | 1163.68 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just+filter(true)+subscribe | 848.48 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just(1,2)+skip(1)+subscribe | 1078.42 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just(1,1,2)+distinct_until_changed()+subscribe | 861.14 ns | 0.62 ns | 0.62 ns | 1.01 |
| immediate_just(1,2)+first()+subscribe | 1386.91 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just(1,2)+last()+subscribe | 1007.37 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just+take_last(1)+subscribe | 1173.00 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just(1,2,3)+element_at(1)+subscribe | 861.01 ns | 0.31 ns | 0.31 ns | 1.00 |
Schedulers
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate scheduler create worker + schedule | 284.70 ns | 1.54 ns | 1.54 ns | 1.00 |
| current_thread scheduler create worker + schedule | 396.77 ns | 4.78 ns | 4.50 ns | 1.06 |
| current_thread scheduler create worker + schedule + recursive schedule | 856.80 ns | 55.85 ns | 55.72 ns | 1.00 |
Transforming Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+map(v*2)+subscribe | 850.66 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just+scan(10, std::plus)+subscribe | 963.24 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just+flat_map(immediate_just(v*2))+subscribe | 2236.02 ns | 136.68 ns | 140.84 ns | 0.97 |
| immediate_just+buffer(2)+subscribe | 1515.33 ns | 13.59 ns | 13.59 ns | 1.00 |
| immediate_just+window(2)+subscribe + subscsribe inner | 2417.96 ns | 939.81 ns | 927.46 ns | 1.01 |
Conditional Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+take_while(false)+subscribe | 844.01 ns | - | - | 0.00 |
| immediate_just+take_while(true)+subscribe | 833.39 ns | 0.31 ns | 0.31 ns | 1.00 |
Utility Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just(1)+subscribe_on(immediate)+subscribe | 2001.22 ns | 0.31 ns | 0.31 ns | 1.00 |
Combining Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe | 3334.01 ns | 158.01 ns | 159.02 ns | 0.99 |
| immediate_just(1) + merge_with(immediate_just(2)) + subscribe | 3750.04 ns | 145.91 ns | 147.45 ns | 0.99 |
| immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe | - | 144.33 ns | 145.28 ns | 0.99 |
| immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe | 3379.53 ns | 855.86 ns | 849.83 ns | 1.01 |
| immediate_just(1) + zip(immediate_just(2)) + subscribe | 2213.24 ns | 200.73 ns | 200.40 ns | 1.00 |
Subjects
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| publish_subject with 1 observer - on_next | 54.16 ns | 17.85 ns | 17.85 ns | 1.00 |
| subscribe 100 observers to publish_subject | 208961.00 ns | 16078.98 ns | 15985.26 ns | 1.01 |
| 100 on_next to 100 observers to publish_subject | 42802.08 ns | 17443.39 ns | 20745.52 ns | 0.84 |
Scenarios
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| basic sample | 1283.59 ns | 11.75 ns | 11.11 ns | 1.06 |
| basic sample with immediate scheduler | 1305.22 ns | 6.17 ns | 5.86 ns | 1.05 |
Aggregating Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+reduce(10, std::plus)+subscribe | 1016.62 ns | 0.31 ns | 0.31 ns | 1.00 |
Error Handling Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe | 2165.30 ns | 1217.26 ns | 1230.05 ns | 0.99 |
| create(on_error())+retry(1)+subscribe | 651.18 ns | 145.65 ns | - | 0.00 |
ci-windows
General
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| Subscribe empty callbacks to empty observable | 561.78 ns | 4.81 ns | 4.94 ns | 0.97 |
| Subscribe empty callbacks to empty observable via pipe operator | 586.28 ns | 4.72 ns | 4.94 ns | 0.96 |
Sources
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| from array of 1 - create + subscribe + immediate | 1155.75 ns | 4.63 ns | 5.55 ns | 0.83 |
| from array of 1 - create + subscribe + current_thread | 1417.54 ns | 15.44 ns | 15.74 ns | 0.98 |
| concat_as_source of just(1 immediate) create + subscribe | 3730.72 ns | 171.40 ns | 174.06 ns | 0.98 |
| defer from array of 1 - defer + create + subscribe + immediate | 1194.53 ns | 4.94 ns | 5.24 ns | 0.94 |
| interval - interval + take(3) + subscribe + immediate | 3681.68 ns | 133.23 ns | 133.31 ns | 1.00 |
| interval - interval + take(3) + subscribe + current_thread | 3462.54 ns | 53.80 ns | 55.25 ns | 0.97 |
| from array of 1 - create + as_blocking + subscribe + new_thread | 120255.56 ns | 114160.00 ns | 114811.11 ns | 0.99 |
| from array of 1000 - create + as_blocking + subscribe + new_thread | 129255.56 ns | 129800.00 ns | 132911.11 ns | 0.98 |
| concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe | 5301.84 ns | 211.42 ns | 205.81 ns | 1.03 |
Filtering Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+take(1)+subscribe | 1820.35 ns | 12.88 ns | 12.86 ns | 1.00 |
| immediate_just+filter(true)+subscribe | 1313.72 ns | 12.38 ns | 12.38 ns | 1.00 |
| immediate_just(1,2)+skip(1)+subscribe | 1725.97 ns | 13.01 ns | 13.13 ns | 0.99 |
| immediate_just(1,1,2)+distinct_until_changed()+subscribe | 1651.66 ns | 15.95 ns | 15.95 ns | 1.00 |
| immediate_just(1,2)+first()+subscribe | 2042.57 ns | 12.96 ns | 12.95 ns | 1.00 |
| immediate_just(1,2)+last()+subscribe | 2040.24 ns | 13.78 ns | 13.79 ns | 1.00 |
| immediate_just+take_last(1)+subscribe | 2075.72 ns | 59.15 ns | 59.68 ns | 0.99 |
| immediate_just(1,2,3)+element_at(1)+subscribe | 1367.98 ns | 13.79 ns | 13.77 ns | 1.00 |
Schedulers
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate scheduler create worker + schedule | 477.66 ns | 6.18 ns | 6.17 ns | 1.00 |
| current_thread scheduler create worker + schedule | 656.30 ns | 14.54 ns | 13.68 ns | 1.06 |
| current_thread scheduler create worker + schedule + recursive schedule | 1084.11 ns | 102.53 ns | 107.43 ns | 0.95 |
Transforming Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+map(v*2)+subscribe | 1313.12 ns | 12.40 ns | 12.26 ns | 1.01 |
| immediate_just+scan(10, std::plus)+subscribe | 1429.08 ns | 21.60 ns | 21.58 ns | 1.00 |
| immediate_just+flat_map(immediate_just(v*2))+subscribe | 3464.61 ns | 202.36 ns | 207.43 ns | 0.98 |
| immediate_just+buffer(2)+subscribe | 2632.37 ns | 58.51 ns | 57.92 ns | 1.01 |
| immediate_just+window(2)+subscribe + subscsribe inner | 4010.85 ns | 1320.07 ns | 1290.18 ns | 1.02 |
Conditional Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+take_while(false)+subscribe | 1638.47 ns | 11.45 ns | 11.44 ns | 1.00 |
| immediate_just+take_while(true)+subscribe | 1328.72 ns | 12.38 ns | 12.37 ns | 1.00 |
Utility Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just(1)+subscribe_on(immediate)+subscribe | 3178.55 ns | 7.40 ns | 7.71 ns | 0.96 |
Combining Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe | 5159.64 ns | 222.52 ns | 221.45 ns | 1.00 |
| immediate_just(1) + merge_with(immediate_just(2)) + subscribe | 5429.05 ns | 215.96 ns | 213.27 ns | 1.01 |
| immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe | - | 198.03 ns | 197.27 ns | 1.00 |
| immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe | 5404.23 ns | 947.31 ns | 953.15 ns | 0.99 |
| immediate_just(1) + zip(immediate_just(2)) + subscribe | 3557.34 ns | 531.27 ns | 525.58 ns | 1.01 |
Subjects
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| publish_subject with 1 observer - on_next | 37.29 ns | 19.68 ns | 20.02 ns | 0.98 |
| subscribe 100 observers to publish_subject | 266400.00 ns | 27856.41 ns | 28978.05 ns | 0.96 |
| 100 on_next to 100 observers to publish_subject | 51882.35 ns | 38670.37 ns | 32658.06 ns | 1.18 |
Scenarios
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| basic sample | 1892.96 ns | 57.32 ns | 62.66 ns | 0.91 |
| basic sample with immediate scheduler | 2305.52 ns | 36.72 ns | 36.72 ns | 1.00 |
Aggregating Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+reduce(10, std::plus)+subscribe | 1483.88 ns | 19.97 ns | 20.01 ns | 1.00 |
Error Handling Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe | 2188.35 ns | 335.96 ns | 350.50 ns | 0.96 |
| create(on_error())+retry(1)+subscribe | 1711.94 ns | 145.86 ns | - | 0.00 |
There was a problem hiding this comment.
Actionable comments posted: 0
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (2)
- src/rpp/rpp/operators/retry.hpp (1 hunks)
- src/tests/rpp/test_retry.cpp (1 hunks)
Files skipped from review as they are similar to previous changes (1)
- src/rpp/rpp/operators/retry.hpp
Additional comments not posted (2)
src/tests/rpp/test_retry.cpp (2)
23-58: Test case "retry handles errors properly" is well-structured.The test case effectively verifies the retry mechanism with different retry counts using a mock observer. The use of trompeloeil for sequence verification is appropriate.
61-77: Test case "retry handles stack overflow" is well-designed.This test case effectively stress tests the retry mechanism with a large retry count, ensuring it handles potential stack overflow scenarios. The use of constexpr for the retry count is a good practice.
There was a problem hiding this comment.
Actionable comments posted: 0
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (1)
- src/tests/rpp/test_repeat.cpp (2 hunks)
Additional comments not posted (1)
src/tests/rpp/test_repeat.cpp (1)
141-157: LGTM! The test case effectively handles stack overflow.The test case "repeat handles stack overflow" is well-implemented. It correctly sets up a mock observer and verifies the behavior of the
repeatoperator under a high load scenario. The use of Trompeloeil for setting expectations is appropriate.
|
TODO: doxygen |
There was a problem hiding this comment.
Actionable comments posted: 1
Outside diff range, codebase verification and nitpick comments (1)
src/examples/rpp/doxygen/retry.cpp (1)
1-30: Examples are well-implemented.The examples demonstrate the retry operator effectively.
Add comments for clarity.
Consider adding comments to explain the purpose of each example block for better understanding.
//! [retry] +// Example of retrying the observable twice upon error rpp::source::concat(rpp::source::just(1, 2, 3), rpp::source::error<int>({})) | rpp::operators::retry(2) | rpp::operators::subscribe([](int v) { std::cout << v << " "; }, [](const std::exception_ptr&) { std::cout << "error" << std::endl; }, []() { std::cout << "completed" << std::endl; }); // Output: 1 2 3 1 2 3 1 2 3 error //! [retry] //! [retry_infinitely] +// Example of retrying the observable indefinitely, limited by take(10) rpp::source::concat(rpp::source::just(1, 2, 3), rpp::source::error<int>({})) | rpp::operators::retry() | rpp::operators::take(10) | rpp::operators::subscribe([](int v) { std::cout << v << " "; }, [](const std::exception_ptr&) { std::cout << "error" << std::endl; }, []() { std::cout << "completed" << std::endl; }); // Output: 1 2 3 1 2 3 1 2 3 1 completed //! [retry_infinitely]
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (5)
- src/examples/rpp/doxygen/retry.cpp (1 hunks)
- src/rpp/rpp/operators.hpp (1 hunks)
- src/rpp/rpp/operators/fwd.hpp (1 hunks)
- src/rpp/rpp/operators/retry.hpp (1 hunks)
- src/tests/rpp/test_retry.cpp (1 hunks)
Files skipped from review as they are similar to previous changes (1)
- src/tests/rpp/test_retry.cpp
Additional comments not posted (3)
src/rpp/rpp/operators.hpp (1)
132-132: Inclusion ofretry.hppis appropriate.The addition of the
retry.hppheader file aligns with the new retry functionality.src/rpp/rpp/operators/fwd.hpp (2)
111-111: LGTM! Verify the implementation ofretry().The function declaration is consistent with existing patterns. Ensure that the implementation aligns with the intended retry mechanism.
Run the following script to verify the implementation of
retry():Verification successful
Implementation of
retry()VerifiedThe
retry()function is implemented insrc/rpp/rpp/operators/retry.hppand is used across multiple test cases and examples, indicating its correct functionality.
- Implementation: Located in
src/rpp/rpp/operators/retry.hpp.- Tests: Found in
src/tests/rpp/test_retry.cpp, covering various scenarios.- Examples: Documented in
src/examples/rpp/doxygen/retry.cpp.Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify the implementation of `retry()`. # Test: Search for the function implementation. Expect: Implementation details. ast-grep --lang cpp --pattern $'auto retry() { $$$ }'Length of output: 54
Script:
#!/bin/bash # Description: Search for the keyword `retry` in the repository to locate its implementation or usage. # Test: Use `rg` to find occurrences of `retry` and include context lines. rg 'retry' -A 5Length of output: 18294
109-109: LGTM! Verify the implementation ofretry(size_t count).The function declaration is consistent with existing patterns. Ensure that the implementation aligns with the intended retry mechanism.
Run the following script to verify the implementation of
retry(size_t count):Verification successful
Implementation of
retry(size_t count)is verified.The function is implemented in
src/rpp/rpp/operators/retry.hppand is actively used in test cases and examples, confirming its presence and functionality in the codebase.Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify the implementation of `retry(size_t count)`. # Test: Search for the function implementation. Expect: Implementation details. ast-grep --lang cpp --pattern $'auto retry(size_t count) { $$$ }'Length of output: 66
Script:
#!/bin/bash # Search for any implementation of a function named `retry` in the codebase. ast-grep --lang cpp --pattern 'auto retry($$$) { $$$ }'Length of output: 57
Script:
#!/bin/bash # Search for references to the `retry` function in the codebase. rg 'retry' -A 5Length of output: 18294
for more information, see https://pre-commit.ci
There was a problem hiding this comment.
Actionable comments posted: 0
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (2)
- src/benchmarks/benchmarks.cpp (1 hunks)
- src/rpp/rpp/operators/retry.hpp (1 hunks)
Files skipped from review as they are similar to previous changes (1)
- src/rpp/rpp/operators/retry.hpp
Additional comments not posted (1)
src/benchmarks/benchmarks.cpp (1)
658-675: Verify the retry mechanism's effectiveness.The section tests the retry mechanism by creating an observable that immediately triggers an error and applies a retry operator with a single retry attempt. Ensure that the retry logic is correctly implemented and that the observable resubscribes as expected.
Consider adding assertions or checks to confirm the retry behavior and expected outcomes.
|



Summary by CodeRabbit
New Features
Improvements
constinstances, improving usability in immutable contexts.Tests