Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 57 additions & 55 deletions src/rpp/rpp/operators/concat.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@

#include <rpp/operators/fwd.hpp>

#include <rpp/disposables/refcount_disposable.hpp>
#include <rpp/operators/details/strategy.hpp>
#include <rpp/utils/utils.hpp>

#include <array>
#include <cassert>
#include <queue>

Expand All @@ -34,7 +34,8 @@ namespace rpp::operators::details
};

template<rpp::constraint::observable TObservable, rpp::constraint::observer TObserver>
class concat_disposable final : public rpp::refcount_disposable
class concat_disposable final : public rpp::composite_disposable
, public rpp::details::enable_wrapper_from_this<concat_disposable<TObservable, TObserver>>
{
public:
concat_disposable(TObserver&& observer)
Expand All @@ -47,44 +48,51 @@ namespace rpp::operators::details

std::atomic<ConcatStage>& stage() { return m_stage; }

void drain(rpp::composite_disposable_wrapper refcounted)
void drain()
{
while (!is_disposed())
{
const auto observable = get_observable();
if (!observable)
{
stage().store(ConcatStage::None, std::memory_order::relaxed);
refcounted.dispose();
if (is_disposed())
if (get_base_child_disposable().is_disposed())
get_observer()->on_completed();
return;
}

if (handle_observable_impl(observable.value(), refcounted))
if (handle_observable_impl(observable.value()))
return;
}
}

void handle_observable(const rpp::constraint::decayed_same_as<TObservable> auto& observable, rpp::composite_disposable_wrapper refcounted)
void handle_observable(const rpp::constraint::decayed_same_as<TObservable> auto& observable)
{
if (handle_observable_impl(observable, refcounted))
if (handle_observable_impl(observable))
return;

drain(refcounted);
drain();
}

rpp::composite_disposable& get_base_child_disposable() { return m_child_disposables[0]; }
rpp::composite_disposable& get_inner_child_disposable() { return m_child_disposables[1]; }

private:
bool handle_observable_impl(const rpp::constraint::decayed_same_as<TObservable> auto& observable, rpp::composite_disposable_wrapper refcounted)
bool handle_observable_impl(const rpp::constraint::decayed_same_as<TObservable> auto& observable)
{
stage().store(ConcatStage::Draining, std::memory_order::relaxed);
observable.subscribe(concat_inner_observer_strategy<TObservable, TObserver>{disposable_wrapper_impl<concat_disposable>{wrapper_from_this()}.lock(), std::move(refcounted)});
observable.subscribe(concat_inner_observer_strategy<TObservable, TObserver>{disposable_wrapper_impl<concat_disposable>{this->wrapper_from_this()}.lock()});

ConcatStage current = ConcatStage::Draining;
return stage().compare_exchange_strong(current, ConcatStage::Processing, std::memory_order::seq_cst);
}

private:
void composite_dispose_impl(interface_disposable::Mode) noexcept override
{
for (auto& d : m_child_disposables)
d.dispose();
}
Comment on lines +90 to +94

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Ensure proper disposal by invoking base class composite_dispose_impl

In the overridden composite_dispose_impl method, consider calling the base class implementation to ensure that any additional disposal logic in rpp::composite_disposable is executed. This ensures comprehensive resource cleanup and guards against potential future changes that may introduce disposal logic in the base class.

Apply this diff to include the base class disposal:

 void composite_dispose_impl(interface_disposable::Mode mode) noexcept override
 {
     for (auto& d : m_child_disposables)
         d.dispose();
+    rpp::composite_disposable::composite_dispose_impl(mode);
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
void composite_dispose_impl(interface_disposable::Mode) noexcept override
{
for (auto& d : m_child_disposables)
d.dispose();
}
void composite_dispose_impl(interface_disposable::Mode mode) noexcept override
{
for (auto& d : m_child_disposables)
d.dispose();
rpp::composite_disposable::composite_dispose_impl(mode);
}


std::optional<TObservable> get_observable()
{
auto queue = get_queue();
Expand All @@ -99,92 +107,86 @@ namespace rpp::operators::details
rpp::utils::value_with_mutex<TObserver> m_observer;
rpp::utils::value_with_mutex<std::queue<TObservable>> m_queue;
std::atomic<ConcatStage> m_stage{};
};

template<rpp::constraint::observable TObservable, rpp::constraint::observer TObserver>
struct concat_observer_strategy_base
{
concat_observer_strategy_base(std::shared_ptr<concat_disposable<TObservable, TObserver>> disposable, rpp::composite_disposable_wrapper refcounted)
: disposable{std::move(disposable)}
, refcounted{std::move(refcounted)}
{
}

concat_observer_strategy_base(std::shared_ptr<concat_disposable<TObservable, TObserver>> disposable)
: concat_observer_strategy_base{disposable, disposable->add_ref()}
{
}

std::shared_ptr<concat_disposable<TObservable, TObserver>> disposable;
rpp::composite_disposable_wrapper refcounted;

void on_error(const std::exception_ptr& err) const
{
disposable->get_observer()->on_error(err);
}

void set_upstream(const disposable_wrapper& d) const { refcounted.add(d); }

bool is_disposed() const { return refcounted.is_disposed(); }
std::array<rpp::composite_disposable, 2> m_child_disposables{};
};

template<rpp::constraint::observable TObservable, rpp::constraint::observer TObserver>
struct concat_inner_observer_strategy : public concat_observer_strategy_base<TObservable, TObserver>
struct concat_inner_observer_strategy
{
static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;

using base = concat_observer_strategy_base<TObservable, TObserver>;
using base::concat_observer_strategy_base;
std::shared_ptr<concat_disposable<TObservable, TObserver>> disposable{};
mutable bool locally_disposed{};

template<typename T>
void on_next(T&& v) const
{
base::disposable->get_observer()->on_next(std::forward<T>(v));
disposable->get_observer()->on_next(std::forward<T>(v));
}

void on_error(const std::exception_ptr& err) const
{
locally_disposed = true;
disposable->get_observer()->on_error(err);
Comment on lines +128 to +131

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Avoid potential memory leaks by handling exceptions properly

In the on_error method, after setting locally_disposed to true, consider disposing of any associated disposables to prevent potential memory leaks.

Apply this diff to dispose of the inner child disposable:

 void on_error(const std::exception_ptr& err) const
 {
     locally_disposed = true;
+    disposable->get_inner_child_disposable().dispose();
     disposable->get_observer()->on_error(err);
 }

Committable suggestion skipped: line range outside the PR's diff.

}

void on_completed() const
{
base::refcounted.clear();
locally_disposed = true;
disposable->get_inner_child_disposable().clear();

ConcatStage current{ConcatStage::Draining};
if (base::disposable->stage().compare_exchange_strong(current, ConcatStage::CompletedWhileDraining, std::memory_order::seq_cst))
if (disposable->stage().compare_exchange_strong(current, ConcatStage::CompletedWhileDraining, std::memory_order::seq_cst))
return;

assert(current == ConcatStage::Processing);

base::disposable->drain(base::refcounted);
disposable->drain();
}

void set_upstream(const disposable_wrapper& d) const { disposable->get_inner_child_disposable().add(d); }

bool is_disposed() const { return locally_disposed || disposable->get_inner_child_disposable().is_disposed(); }
};

template<rpp::constraint::observable TObservable, rpp::constraint::observer TObserver>
struct concat_observer_strategy : public concat_observer_strategy_base<TObservable, TObserver>
struct concat_observer_strategy
{
using base = concat_observer_strategy_base<TObservable, TObserver>;

static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;

std::shared_ptr<concat_disposable<TObservable, TObserver>> disposable;

concat_observer_strategy(TObserver&& observer)
: base{init_state(std::move(observer))}
: disposable{init_state(std::move(observer))}
{
}

template<typename T>
void on_next(T&& v) const
{
ConcatStage current = ConcatStage::None;
if (base::disposable->stage().compare_exchange_strong(current, ConcatStage::Draining, std::memory_order::seq_cst))
base::disposable->handle_observable(std::forward<T>(v), base::disposable->add_ref());
if (disposable->stage().compare_exchange_strong(current, ConcatStage::Draining, std::memory_order::seq_cst))
disposable->handle_observable(std::forward<T>(v));
else
base::disposable->get_queue()->push(std::forward<T>(v));
disposable->get_queue()->push(std::forward<T>(v));
}

void on_error(const std::exception_ptr& err) const
{
disposable->get_observer()->on_error(err);
}

void on_completed() const
{
base::refcounted.dispose();
if (base::disposable->is_disposed())
base::disposable->get_observer()->on_completed();
disposable->get_base_child_disposable().dispose();
if (disposable->stage() == ConcatStage::None)
disposable->get_observer()->on_completed();
}

void set_upstream(const disposable_wrapper& d) const { disposable->get_base_child_disposable().add(d); }

bool is_disposed() const { return disposable->get_base_child_disposable().is_disposed(); }

private:
static std::shared_ptr<concat_disposable<TObservable, TObserver>> init_state(TObserver&& observer)
Expand Down
30 changes: 30 additions & 0 deletions src/tests/rpp/test_concat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <rpp/schedulers/immediate.hpp>
#include <rpp/sources/concat.hpp>
#include <rpp/sources/create.hpp>
#include <rpp/sources/empty.hpp>
#include <rpp/sources/just.hpp>
#include <rpp/subjects/publish_subject.hpp>

Expand Down Expand Up @@ -226,6 +227,35 @@ TEST_CASE_TEMPLATE("concat", TestType, rpp::memory_model::use_stack, rpp::memory
test([](auto&&... vals) {
return rpp::source::just(std::forward<decltype(vals)>(vals).as_dynamic()...) | rpp::ops::concat();
});
SUBCASE("concat completes right")
{
rpp::subjects::publish_subject<rpp::dynamic_observable<int>> subj{};

subj.get_observable() | rpp::ops::concat() | rpp::ops::subscribe(mock);
SUBCASE("on_completed from base")
{
REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(s);
subj.get_observer().on_completed();
}

SUBCASE("on_completed from inner + then from base")
{
subj.get_observer().on_next(rpp::source::empty<int>());

REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(s);
subj.get_observer().on_completed();
}

SUBCASE("on_completed from base + then from inner")
{
rpp::subjects::publish_subject<int> inner{};
subj.get_observer().on_next(inner.get_observable());
subj.get_observer().on_completed();

REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(s);
inner.get_observer().on_completed();
}
}
}
}

Expand Down