Skip to content

refactor concat to speedup it#677

Merged
AlexInLog merged 2 commits into
v2from
refactor_concat
Nov 10, 2024
Merged

refactor concat to speedup it#677
AlexInLog merged 2 commits into
v2from
refactor_concat

Conversation

@AlexInLog

@AlexInLog AlexInLog commented Nov 7, 2024

Copy link
Copy Markdown
Owner

#671

Summary by CodeRabbit

  • New Features

    • Enhanced memory management and lifecycle handling for disposables in the reactive programming framework.
    • Simplified method signatures for improved usability.
    • Expanded test coverage for the concat operator, including new scenarios for completion behavior and disposal management.
  • Bug Fixes

    • Improved handling of disposables and observer notifications for better performance and reliability.

@AlexInLog AlexInLog changed the title refactor concat refactor concat to speedup it Nov 7, 2024
@coderabbitai

coderabbitai Bot commented Nov 7, 2024

Copy link
Copy Markdown
Contributor
📝 Walkthrough
📝 Walkthrough

Walkthrough

The pull request introduces significant modifications to the concat_disposable class in the rpp::operators::details namespace. It now inherits from rpp::composite_disposable and implements rpp::details::enable_wrapper_from_this, altering memory management and lifecycle handling. The drain, handle_observable, and handle_observable_impl methods have been simplified by removing the refcounted parameter. Additionally, the concat_inner_observer_strategy and concat_observer_strategy structures have been redefined to eliminate their base class inheritance, enhancing the encapsulation and management of disposables.

Changes

File Path Change Summary
src/rpp/rpp/operators/concat.hpp - concat_disposable class now inherits from rpp::composite_disposable and rpp::details::enable_wrapper_from_this.
- Removed refcounted parameter from drain, handle_observable, and handle_observable_impl methods.
- Simplified definitions of concat_inner_observer_strategy and concat_observer_strategy by removing inheritance from concat_observer_strategy_base.
src/tests/rpp/test_concat.cpp - Expanded test cases for concat operator to cover new scenarios, including completion behavior and disposal checks.
- Included new header <rpp/sources/empty.hpp> for testing with empty observables.

Possibly related PRs

  • Fix disposables issue #640: This PR modifies the concat_state_t class in concat.hpp, which is closely related to the concat_disposable class in the main PR, as both involve changes to memory management and lifecycle handling of disposables.
  • remove useless calls #644: This PR removes disposal calls in the on_error method within concat.hpp, which directly affects the error handling logic of the concat_disposable class in the main PR, indicating a shared focus on resource management during error scenarios.
  • Fix deadlocks in operators related to is_disposed #661: This PR addresses modifications in the is_disposed logic across various operators, which relates to the memory management and lifecycle handling improvements made in the main PR.
  • add new benchmarks #676: The addition of a benchmark for the concat() operator in this PR is directly related to the changes made in the main PR, as it tests the functionality of the concat operator that has been modified.

🐰 In the land of code where bunnies play,
The concat_disposable found a new way.
With simpler methods and a lighter load,
It hops through disposables, a smoother road.
So let’s cheer for changes, both bright and bold,
In the garden of logic, new stories unfold! 🌼✨


Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Outside diff range and nitpick comments (2)
src/rpp/rpp/operators/concat.hpp (2)

170-174: Optimize memory ordering in atomic operations for performance

The use of std::memory_order::seq_cst in compare_exchange_strong provides the strongest memory ordering guarantees but may introduce unnecessary performance overhead. Consider using std::memory_order_acq_rel or std::memory_order_release if sufficient, to improve performance without compromising thread safety.

Apply this diff to adjust the memory order:

 if (disposable->stage().compare_exchange_strong(current, ConcatStage::Draining, std::memory_order::seq_cst))
     disposable->handle_observable(std::forward<T>(v));
 else
     disposable->get_queue()->push(std::forward<T>(v));

Change to:

 if (disposable->stage().compare_exchange_strong(current, ConcatStage::Draining, std::memory_order_acq_rel))
     disposable->handle_observable(std::forward<T>(v));
 else
     disposable->get_queue()->push(std::forward<T>(v));

141-146: Review the use of strong memory order in compare_exchange_strong

Similar to earlier, in the on_completed method, the use of std::memory_order::seq_cst may be overly strict. Assess whether a weaker memory order can be utilized to enhance performance while maintaining correctness.

Consider modifying the memory order:

 if (disposable->stage().compare_exchange_strong(current, ConcatStage::CompletedWhileDraining, std::memory_order::seq_cst))
     return;

Change to:

 if (disposable->stage().compare_exchange_strong(current, ConcatStage::CompletedWhileDraining, std::memory_order_acq_rel))
     return;
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 9aa557c and 97594ce.

📒 Files selected for processing (1)
  • src/rpp/rpp/operators/concat.hpp (4 hunks)
🧰 Additional context used
📓 Learnings (1)
src/rpp/rpp/operators/concat.hpp (1)
Learnt from: victimsnino
PR: victimsnino/ReactivePlusPlus#673
File: src/rpp/rpp/operators/concat.hpp:59-59
Timestamp: 2024-11-04T19:42:15.257Z
Learning: In the `concat_disposable` class in `src/rpp/rpp/operators/concat.hpp`, within the `drain` method, it's acceptable to call `get_observer()->on_completed()` even when `is_disposed()` returns true, because `refcounted.dispose()` could cause disposing of `this` object.
🔇 Additional comments (1)
src/rpp/rpp/operators/concat.hpp (1)

60-62: Ensure thread-safe access when checking and modifying stage

In the drain method, when checking if there are no more observables and updating the stage, ensure that the operations are thread-safe to prevent race conditions.

To confirm thread safety, run the following script to analyze the use of atomic operations on m_stage:

✅ Verification successful

Let me gather more context about the implementation to verify thread safety comprehensively.


Let me gather more specific context about the drain method implementation in the concat operator.


Let me check the implementation of get_observable() and related methods to fully understand the thread safety.


Let me check the implementation of pointer_under_lock to understand the thread safety guarantees it provides.


Based on the gathered information, I can now provide the final response.

Thread safety is properly implemented in the concat operator's drain method

The implementation already ensures thread-safe access through multiple mechanisms:

  1. The observer and queue are protected using value_with_mutex, accessed via pointer_under_lock which provides RAII-style mutex locking
  2. Stage transitions are handled using atomic operations with appropriate memory orders:
    • store(ConcatStage::None, std::memory_order::relaxed) for simple state updates
    • compare_exchange_strong with std::memory_order::seq_cst for critical state transitions
  3. The member variables are properly synchronized:
    • m_observer and m_queue use mutex-based synchronization
    • m_stage is atomic
    • m_child_disposables access is synchronized through the stage transitions

The original review comment's concern about thread safety has been addressed in the implementation.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Search for all accesses to m_stage to verify consistent use of atomic operations.

# Expected: All accesses should use atomic operations with appropriate memory orders.

rg --type-add 'cpp:hpp' --files-with-matches 'm_stage' | xargs grep -n 'm_stage'

Length of output: 218


Script:

#!/bin/bash
# Get more context around the m_stage usage and the drain method implementation
rg -B 5 -A 10 "drain\(" --type cpp

# Also check for atomic operations and synchronization mechanisms
ast-grep --pattern 'std::atomic<$_> $_'

Length of output: 24253


Script:

#!/bin/bash
# Get the full implementation context of the concat operator's drain method
rg -B 10 -A 20 "void drain\(\)" src/rpp/rpp/operators/concat.hpp

Length of output: 1164


Script:

#!/bin/bash
# Search for get_observable implementation and related methods
rg -B 5 -A 10 "get_observable\(\)" src/rpp/rpp/operators/concat.hpp

# Also check the ConcatStage enum definition
rg -B 2 -A 5 "enum.*ConcatStage" src/rpp/rpp/operators/concat.hpp

Length of output: 1299


Script:

#!/bin/bash
# Search for pointer_under_lock implementation
rg -B 5 -A 15 "class pointer_under_lock" --type cpp

# Also check if there are any other synchronization mechanisms in the concat operator
rg -B 3 -A 3 "mutex|lock" src/rpp/rpp/operators/concat.hpp

Length of output: 2765

Comment on lines +129 to +132
void on_error(const std::exception_ptr& err) const
{
locally_disposed = true;
disposable->get_observer()->on_error(err);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Avoid potential memory leaks by handling exceptions properly

In the on_error method, after setting locally_disposed to true, consider disposing of any associated disposables to prevent potential memory leaks.

Apply this diff to dispose of the inner child disposable:

 void on_error(const std::exception_ptr& err) const
 {
     locally_disposed = true;
+    disposable->get_inner_child_disposable().dispose();
     disposable->get_observer()->on_error(err);
 }

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines +91 to +95
void composite_dispose_impl(interface_disposable::Mode) noexcept override
{
for (auto& d : m_child_disposables)
d.dispose();
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Ensure proper disposal by invoking base class composite_dispose_impl

In the overridden composite_dispose_impl method, consider calling the base class implementation to ensure that any additional disposal logic in rpp::composite_disposable is executed. This ensures comprehensive resource cleanup and guards against potential future changes that may introduce disposal logic in the base class.

Apply this diff to include the base class disposal:

 void composite_dispose_impl(interface_disposable::Mode mode) noexcept override
 {
     for (auto& d : m_child_disposables)
         d.dispose();
+    rpp::composite_disposable::composite_dispose_impl(mode);
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
void composite_dispose_impl(interface_disposable::Mode) noexcept override
{
for (auto& d : m_child_disposables)
d.dispose();
}
void composite_dispose_impl(interface_disposable::Mode mode) noexcept override
{
for (auto& d : m_child_disposables)
d.dispose();
rpp::composite_disposable::composite_dispose_impl(mode);
}

@github-actions

github-actions Bot commented Nov 7, 2024

Copy link
Copy Markdown
Contributor

BENCHMARK RESULTS (AUTOGENERATED)

ci-ubuntu-gcc

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 298.80 ns 1.85 ns 1.54 ns 1.20
Subscribe empty callbacks to empty observable via pipe operator 306.95 ns 1.85 ns 1.54 ns 1.20

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 687.63 ns 0.31 ns 0.31 ns 1.00
from array of 1 - create + subscribe + current_thread 1044.33 ns 3.43 ns 3.43 ns 1.00
concat_as_source of just(1 immediate) create + subscribe 2248.04 ns 111.12 ns 123.73 ns 0.90
defer from array of 1 - defer + create + subscribe + immediate 718.79 ns 0.31 ns 0.31 ns 1.00
interval - interval + take(3) + subscribe + immediate 2136.23 ns 59.23 ns 59.19 ns 1.00
interval - interval + take(3) + subscribe + current_thread 2968.39 ns 32.42 ns 32.40 ns 1.00
from array of 1 - create + as_blocking + subscribe + new_thread 27802.58 ns 27672.21 ns 28243.23 ns 0.98
from array of 1000 - create + as_blocking + subscribe + new_thread 40073.18 ns 52648.00 ns 49260.17 ns 1.07
concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe 3503.08 ns 130.80 ns 138.01 ns 0.95

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 1129.91 ns 0.31 ns 0.31 ns 1.00
immediate_just+filter(true)+subscribe 844.18 ns 0.31 ns 0.31 ns 1.01
immediate_just(1,2)+skip(1)+subscribe 990.30 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,1,2)+distinct_until_changed()+subscribe 853.43 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+first()+subscribe 1247.01 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+last()+subscribe 953.06 ns 0.31 ns 0.31 ns 1.00
immediate_just+take_last(1)+subscribe 1110.45 ns 17.90 ns 18.22 ns 0.98
immediate_just(1,2,3)+element_at(1)+subscribe 830.07 ns 0.31 ns 0.31 ns 1.00

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 283.12 ns 0.47 ns 0.62 ns 0.76
current_thread scheduler create worker + schedule 383.14 ns 4.32 ns 4.63 ns 0.93
current_thread scheduler create worker + schedule + recursive schedule 809.39 ns 60.79 ns 61.11 ns 0.99

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 863.17 ns 0.31 ns 0.31 ns 1.00
immediate_just+scan(10, std::plus)+subscribe 895.74 ns 0.31 ns 0.31 ns 1.00
immediate_just+flat_map(immediate_just(v*2))+subscribe 2333.16 ns 144.17 ns 144.35 ns 1.00
immediate_just+buffer(2)+subscribe 1568.56 ns 13.59 ns 13.90 ns 0.98
immediate_just+window(2)+subscribe + subscsribe inner 2398.10 ns 1327.71 ns 1372.13 ns 0.97

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 859.51 ns - - 0.00
immediate_just+take_while(true)+subscribe 843.57 ns 0.31 ns 0.31 ns 1.00

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 1993.11 ns 0.31 ns 0.31 ns 1.00

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 3455.68 ns 169.96 ns 152.08 ns 1.12
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 3634.66 ns 187.87 ns 155.52 ns 1.21
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 131.97 ns 138.82 ns 0.95
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 3523.98 ns 1184.21 ns 1119.03 ns 1.06
immediate_just(1) + zip(immediate_just(2)) + subscribe 2117.38 ns 208.29 ns 221.27 ns 0.94
immediate_just(immediate_just(1), immediate_just(1)) + concat() + subscribe 3199.98 ns 238.11 ns 1346.05 ns 0.18

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 34.46 ns 14.66 ns 14.60 ns 1.00
subscribe 100 observers to publish_subject 199414.67 ns 15702.20 ns 15701.14 ns 1.00
100 on_next to 100 observers to publish_subject 27065.64 ns 17270.51 ns 20189.50 ns 0.86

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 1417.24 ns 12.65 ns 12.96 ns 0.98
basic sample with immediate scheduler 1377.76 ns 5.24 ns 5.55 ns 0.94

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 900.03 ns 0.31 ns 0.31 ns 1.00

Error Handling Operators

name rxcpp rpp prev rpp ratio
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe 2049.20 ns 979.15 ns 994.45 ns 0.98
create(on_error())+retry(1)+subscribe 628.35 ns 115.00 ns 112.09 ns 1.03

ci-macos

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 1207.96 ns 0.61 ns 1.30 ns 0.47
Subscribe empty callbacks to empty observable via pipe operator 976.23 ns 0.52 ns 1.44 ns 0.36

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 1923.07 ns 0.23 ns 0.36 ns 0.65
from array of 1 - create + subscribe + current_thread 2440.97 ns 33.41 ns 46.72 ns 0.72
concat_as_source of just(1 immediate) create + subscribe 8692.03 ns 418.44 ns 408.28 ns 1.02
defer from array of 1 - defer + create + subscribe + immediate 3767.98 ns 0.53 ns 0.26 ns 2.03
interval - interval + take(3) + subscribe + immediate 6178.32 ns 145.78 ns 139.89 ns 1.04
interval - interval + take(3) + subscribe + current_thread 7958.06 ns 112.81 ns 113.04 ns 1.00
from array of 1 - create + as_blocking + subscribe + new_thread 84067.14 ns 84161.73 ns 110933.60 ns 0.76
from array of 1000 - create + as_blocking + subscribe + new_thread 97026.18 ns 90960.27 ns 110597.90 ns 0.82
concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe 17675.29 ns 630.15 ns 384.93 ns 1.64

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 5262.21 ns 0.67 ns 0.31 ns 2.18
immediate_just+filter(true)+subscribe 3467.06 ns 0.43 ns 0.32 ns 1.35
immediate_just(1,2)+skip(1)+subscribe 3821.54 ns 0.47 ns 0.32 ns 1.51
immediate_just(1,1,2)+distinct_until_changed()+subscribe 3556.96 ns 0.64 ns 0.61 ns 1.04
immediate_just(1,2)+first()+subscribe 5993.05 ns 0.35 ns 0.31 ns 1.15
immediate_just(1,2)+last()+subscribe 4872.80 ns 0.57 ns 0.32 ns 1.79
immediate_just+take_last(1)+subscribe 5993.34 ns 0.69 ns 0.31 ns 2.25
immediate_just(1,2,3)+element_at(1)+subscribe 4060.25 ns 0.62 ns 0.31 ns 1.98

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 1090.59 ns 1.91 ns 0.53 ns 3.58
current_thread scheduler create worker + schedule 1931.42 ns 53.56 ns 36.37 ns 1.47
current_thread scheduler create worker + schedule + recursive schedule 3580.25 ns 282.43 ns 251.50 ns 1.12

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 3321.27 ns 5.16 ns 4.95 ns 1.04
immediate_just+scan(10, std::plus)+subscribe 4268.37 ns 0.77 ns 0.64 ns 1.19
immediate_just+flat_map(immediate_just(v*2))+subscribe 8973.47 ns 743.46 ns 523.03 ns 1.42
immediate_just+buffer(2)+subscribe 4964.98 ns 123.42 ns 93.31 ns 1.32
immediate_just+window(2)+subscribe + subscsribe inner 10245.57 ns 3989.42 ns 3189.36 ns 1.25

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 3287.83 ns - - 0.00
immediate_just+take_while(true)+subscribe 3081.21 ns 0.55 ns 0.27 ns 2.07

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 8556.78 ns 6.40 ns 6.22 ns 1.03

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 13501.28 ns 710.99 ns 444.95 ns 1.60
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 14443.21 ns 407.93 ns 434.00 ns 0.94
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 589.35 ns 471.19 ns 1.25
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 11636.00 ns 3018.21 ns 2160.15 ns 1.40
immediate_just(1) + zip(immediate_just(2)) + subscribe 10549.80 ns 1584.51 ns 1310.59 ns 1.21
immediate_just(immediate_just(1), immediate_just(1)) + concat() + subscribe 9860.01 ns 1268.38 ns 2956.42 ns 0.43

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 124.24 ns 95.30 ns 54.90 ns 1.74
subscribe 100 observers to publish_subject 567334.00 ns 64367.89 ns 46789.48 ns 1.38
100 on_next to 100 observers to publish_subject 131695.62 ns 35893.71 ns 22267.59 ns 1.61

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 6186.60 ns 146.13 ns 74.57 ns 1.96
basic sample with immediate scheduler 3610.30 ns 33.90 ns 21.94 ns 1.54

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 5603.80 ns 0.48 ns 0.29 ns 1.62

Error Handling Operators

name rxcpp rpp prev rpp ratio
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe 20227.50 ns 11552.40 ns 5604.16 ns 2.06
create(on_error())+retry(1)+subscribe 4313.67 ns 606.78 ns 286.56 ns 2.12

ci-ubuntu-clang

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 267.39 ns 0.64 ns 1.55 ns 0.41
Subscribe empty callbacks to empty observable via pipe operator 265.65 ns 0.63 ns 1.54 ns 0.41

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 562.04 ns 0.31 ns 0.42 ns 0.73
from array of 1 - create + subscribe + current_thread 802.43 ns 4.01 ns 4.01 ns 1.00
concat_as_source of just(1 immediate) create + subscribe 2333.39 ns 128.68 ns 130.11 ns 0.99
defer from array of 1 - defer + create + subscribe + immediate 779.19 ns 0.31 ns 0.31 ns 1.00
interval - interval + take(3) + subscribe + immediate 2206.55 ns 58.35 ns 58.31 ns 1.00
interval - interval + take(3) + subscribe + current_thread 3202.65 ns 30.88 ns 30.88 ns 1.00
from array of 1 - create + as_blocking + subscribe + new_thread 30792.14 ns 28574.28 ns 29929.21 ns 0.95
from array of 1000 - create + as_blocking + subscribe + new_thread 43218.82 ns 39310.67 ns 43192.83 ns 0.91
concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe 3660.16 ns 147.99 ns 148.10 ns 1.00

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 1168.28 ns 0.31 ns 0.31 ns 1.00
immediate_just+filter(true)+subscribe 836.75 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+skip(1)+subscribe 1098.48 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,1,2)+distinct_until_changed()+subscribe 867.86 ns 0.31 ns 0.62 ns 0.50
immediate_just(1,2)+first()+subscribe 1388.47 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+last()+subscribe 997.29 ns 0.31 ns 0.31 ns 1.00
immediate_just+take_last(1)+subscribe 1205.67 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2,3)+element_at(1)+subscribe 859.11 ns 0.31 ns 0.31 ns 1.00

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 282.60 ns 1.54 ns 0.63 ns 2.43
current_thread scheduler create worker + schedule 390.64 ns 4.02 ns 4.32 ns 0.93
current_thread scheduler create worker + schedule + recursive schedule 854.47 ns 54.93 ns 55.55 ns 0.99

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 844.69 ns 0.31 ns 0.31 ns 1.00
immediate_just+scan(10, std::plus)+subscribe 961.84 ns 0.62 ns 0.31 ns 2.00
immediate_just+flat_map(immediate_just(v*2))+subscribe 2236.52 ns 144.18 ns 140.15 ns 1.03
immediate_just+buffer(2)+subscribe 1527.45 ns 14.20 ns 13.59 ns 1.05
immediate_just+window(2)+subscribe + subscsribe inner 2499.79 ns 934.21 ns 893.72 ns 1.05

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 841.44 ns - - 0.00
immediate_just+take_while(true)+subscribe 858.73 ns 0.31 ns 0.31 ns 1.00

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 2019.97 ns 0.31 ns 0.31 ns 1.00

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 3351.43 ns 157.42 ns 156.03 ns 1.01
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 3756.55 ns 141.60 ns 139.21 ns 1.02
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 139.38 ns 142.09 ns 0.98
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 3387.04 ns 834.99 ns 827.82 ns 1.01
immediate_just(1) + zip(immediate_just(2)) + subscribe 2218.68 ns 204.45 ns 202.16 ns 1.01
immediate_just(immediate_just(1), immediate_just(1)) + concat() + subscribe 3225.25 ns 222.23 ns 959.26 ns 0.23

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 52.28 ns 17.74 ns 17.76 ns 1.00
subscribe 100 observers to publish_subject 217406.75 ns 15989.90 ns 16121.65 ns 0.99
100 on_next to 100 observers to publish_subject 43875.12 ns 20615.45 ns 23591.16 ns 0.87

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 1312.66 ns 11.42 ns 12.04 ns 0.95
basic sample with immediate scheduler 1288.47 ns 6.17 ns 5.86 ns 1.05

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 985.80 ns 0.31 ns 0.31 ns 1.00

Error Handling Operators

name rxcpp rpp prev rpp ratio
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe 2182.82 ns 1163.42 ns 1176.18 ns 0.99
create(on_error())+retry(1)+subscribe 657.83 ns 139.75 ns 139.59 ns 1.00

ci-windows

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 559.43 ns 1.85 ns 1.85 ns 1.00
Subscribe empty callbacks to empty observable via pipe operator 580.28 ns 1.85 ns 1.85 ns 1.00

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 1161.64 ns 5.55 ns 5.24 ns 1.06
from array of 1 - create + subscribe + current_thread 1434.92 ns 15.74 ns 15.45 ns 1.02
concat_as_source of just(1 immediate) create + subscribe 3715.20 ns 172.54 ns 170.46 ns 1.01
defer from array of 1 - defer + create + subscribe + immediate 1195.81 ns 5.55 ns 5.24 ns 1.06
interval - interval + take(3) + subscribe + immediate 3358.97 ns 140.97 ns 140.86 ns 1.00
interval - interval + take(3) + subscribe + current_thread 3412.06 ns 59.93 ns 62.47 ns 0.96
from array of 1 - create + as_blocking + subscribe + new_thread 119255.56 ns 110930.00 ns 114730.00 ns 0.97
from array of 1000 - create + as_blocking + subscribe + new_thread 126687.50 ns 128966.67 ns 132100.00 ns 0.98
concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe 5450.26 ns 201.31 ns 204.70 ns 0.98

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 1851.21 ns 19.73 ns 19.74 ns 1.00
immediate_just+filter(true)+subscribe 1631.27 ns 18.80 ns 18.80 ns 1.00
immediate_just(1,2)+skip(1)+subscribe 1715.60 ns 18.50 ns 18.50 ns 1.00
immediate_just(1,1,2)+distinct_until_changed()+subscribe 1347.94 ns 23.46 ns 23.44 ns 1.00
immediate_just(1,2)+first()+subscribe 2393.71 ns 17.29 ns 17.28 ns 1.00
immediate_just(1,2)+last()+subscribe 1717.23 ns 18.51 ns 18.52 ns 1.00
immediate_just+take_last(1)+subscribe 1996.91 ns 64.04 ns 64.38 ns 0.99
immediate_just(1,2,3)+element_at(1)+subscribe 1637.90 ns 21.91 ns 21.91 ns 1.00

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 479.21 ns 4.01 ns 4.32 ns 0.93
current_thread scheduler create worker + schedule 648.80 ns 11.62 ns 11.70 ns 0.99
current_thread scheduler create worker + schedule + recursive schedule 1336.90 ns 103.85 ns 102.07 ns 1.02

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 1330.00 ns 18.82 ns 18.82 ns 1.00
immediate_just+scan(10, std::plus)+subscribe 1437.40 ns 21.27 ns 20.97 ns 1.01
immediate_just+flat_map(immediate_just(v*2))+subscribe 3822.39 ns 173.96 ns 180.34 ns 0.96
immediate_just+buffer(2)+subscribe 2327.21 ns 63.78 ns 65.04 ns 0.98
immediate_just+window(2)+subscribe + subscsribe inner 3994.05 ns 1277.63 ns 1320.02 ns 0.97

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 1322.07 ns 17.57 ns 17.57 ns 1.00
immediate_just+take_while(true)+subscribe 1323.17 ns 18.81 ns 18.81 ns 1.00

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 3097.94 ns 11.11 ns 11.10 ns 1.00

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 5017.51 ns 194.98 ns 201.76 ns 0.97
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 5450.00 ns 181.34 ns 186.19 ns 0.97
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 203.99 ns 196.20 ns 1.04
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 6087.43 ns 946.84 ns 950.84 ns 1.00
immediate_just(1) + zip(immediate_just(2)) + subscribe 3785.26 ns 508.83 ns 519.14 ns 0.98
immediate_just(immediate_just(1), immediate_just(1)) + concat() + subscribe 4843.56 ns 337.97 ns 1172.56 ns 0.29

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 36.76 ns 20.09 ns 20.08 ns 1.00
subscribe 100 observers to publish_subject 264525.00 ns 27064.29 ns 27061.90 ns 1.00
100 on_next to 100 observers to publish_subject 51554.55 ns 35762.07 ns 35741.38 ns 1.00

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 1855.98 ns 95.87 ns 98.19 ns 0.98
basic sample with immediate scheduler 1883.50 ns 68.49 ns 69.04 ns 0.99

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 1448.74 ns 19.43 ns 19.42 ns 1.00

Error Handling Operators

name rxcpp rpp prev rpp ratio
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe 1916.03 ns 367.68 ns 362.69 ns 1.01
create(on_error())+retry(1)+subscribe 1177.52 ns 140.30 ns 138.74 ns 1.01

@codecov

codecov Bot commented Nov 7, 2024

Copy link
Copy Markdown

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 98.47%. Comparing base (9aa557c) to head (e39410f).
Report is 1 commits behind head on v2.

Additional details and impacted files
@@           Coverage Diff           @@
##               v2     #677   +/-   ##
=======================================
  Coverage   98.47%   98.47%           
=======================================
  Files         155      155           
  Lines        9695     9723   +28     
=======================================
+ Hits         9547     9575   +28     
  Misses        148      148           

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Outside diff range and nitpick comments (1)
src/tests/rpp/test_concat.cpp (1)

230-258: Consider adding tests for concurrent completion scenarios

While the current test additions provide good coverage for sequential completion scenarios, given that this is a reactive library, it might be valuable to add tests that verify completion behavior under concurrent conditions:

  1. Simultaneous completion of multiple inner observables
  2. Race conditions between base and inner completions
  3. Completion during subscription/unsubscription

Would you like me to help draft concurrent test scenarios?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 97594ce and e39410f.

📒 Files selected for processing (2)
  • src/rpp/rpp/operators/concat.hpp (4 hunks)
  • src/tests/rpp/test_concat.cpp (2 hunks)
🧰 Additional context used
📓 Learnings (1)
src/rpp/rpp/operators/concat.hpp (1)
Learnt from: victimsnino
PR: victimsnino/ReactivePlusPlus#673
File: src/rpp/rpp/operators/concat.hpp:59-59
Timestamp: 2024-11-04T19:42:15.257Z
Learning: In the `concat_disposable` class in `src/rpp/rpp/operators/concat.hpp`, within the `drain` method, it's acceptable to call `get_observer()->on_completed()` even when `is_disposed()` returns true, because `refcounted.dispose()` could cause disposing of `this` object.
🔇 Additional comments (7)
src/rpp/rpp/operators/concat.hpp (5)

37-38: LGTM! Improved design with better separation of concerns

The changes enhance the design by:

  • Using composition over inheritance with composite_disposable
  • Providing clear separation between base and inner child disposables
  • Enabling proper self-referential wrapper pattern with enable_wrapper_from_this

Also applies to: 77-78, 111-111


128-131: Improve error handling in concat_inner_observer_strategy

The error handling could be improved by ensuring proper cleanup of resources before propagating the error.


90-94: Ensure proper disposal chain by calling base implementation

The composite_dispose_impl override should call the base class implementation to ensure proper cleanup.


175-177: 🛠️ Refactor suggestion

Consider resource cleanup in error handling

The on_error method should consider cleaning up any queued observables before propagating the error to prevent resource leaks.

Apply this diff to improve error handling:

 void on_error(const std::exception_ptr& err) const
 {
+    disposable->get_queue()->clear();  // Clear any queued observables
     disposable->get_observer()->on_error(err);
 }

Likely invalid or redundant comment.


51-65: Consider potential race condition in drain method

The while (!is_disposed()) check followed by operations might be susceptible to a race condition. The disposed state could change between the check and the subsequent operations.

Consider using a more robust synchronization mechanism. Let's verify the impact:

src/tests/rpp/test_concat.cpp (2)

20-20: LGTM: Include added for empty observable support

The addition of <rpp/sources/empty.hpp> is appropriate as it's used in the new test cases for completion behavior.


230-258: LGTM: Comprehensive completion behavior test cases

The new test cases thoroughly verify the completion behavior of the concat operator in various scenarios:

  1. Base subject completion
  2. Inner observable completion followed by base completion
  3. Base completion followed by inner observable completion

These tests align well with the refactoring of concat_disposable mentioned in the PR summary.

Let's verify test coverage for error scenarios:

@AlexInLog

Copy link
Copy Markdown
Owner Author

Actionable comments posted: 0

🧹 Outside diff range and nitpick comments (1)

src/tests/rpp/test_concat.cpp (1)> 230-258: Consider adding tests for concurrent completion scenarios

While the current test additions provide good coverage for sequential completion scenarios, given that this is a reactive library, it might be valuable to add tests that verify completion behavior under concurrent conditions:

  1. Simultaneous completion of multiple inner observables
  2. Race conditions between base and inner completions
  3. Completion during subscription/unsubscription

Would you like me to help draft concurrent test scenarios?

📜 Review details

Yes, draft these scenarios please

@sonarqubecloud

Copy link
Copy Markdown

@AlexInLog AlexInLog merged commit 1726657 into v2 Nov 10, 2024
@AlexInLog AlexInLog deleted the refactor_concat branch November 10, 2024 16:18
This was referenced Nov 10, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant