diff --git a/CMakeLists.txt b/CMakeLists.txt index fa12d47..2e66488 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -29,13 +29,17 @@ include(FetchContent) FetchContent_Declare( task - # FETCHCONTENT_SOURCE_DIR_TASK ${CMAKE_SOURCE_DIR}/../task + # SOURCE_DIR ${CMAKE_SOURCE_DIR}/../task GIT_REPOSITORY https://github.com/bemanproject/task - GIT_TAG 16a5916 + GIT_TAG 6ff5b89 FIND_PACKAGE_ARGS 0.2.0 EXACT NAMES beman.task COMPONENTS task_headers ) FetchContent_MakeAvailable(task) +include(CheckLibraryExists) +set(POSTGRESROOT /Library/PostgreSQL/18) +check_library_exists(pq PQsocket ${POSTGRESROOT}/lib HAS_POSTGRES) + add_subdirectory(src/beman/net) #=============================================================================== @@ -52,6 +56,7 @@ if(BEMAN_NET_BUILD_TESTS) add_subdirectory(tests/beman/net) endif() -if(BEMAN_NET_BUILD_EXAMPLES) +if(${BEMAN_NET_BUILD_EXAMPLES}) + message(NOTICE "Building examples") add_subdirectory(examples) endif() diff --git a/Makefile b/Makefile index 1bc894c..70bcb83 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,7 @@ # SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception SANITIZERS = none debug msan asan usan tsan -.PHONY: default gcc clang run update check ce todo distclean clean build test all format $(SANITIZERS) +.PHONY: default gcc clang run update check ce todo distclean clean build test all clang-format format $(SANITIZERS) COMPILER=system CXX_BASE=$(CXX:$(dir $(CXX))%=%) @@ -89,7 +89,7 @@ check: todo: bin/mk-todo.py -format: +clang-format format: git clang-format main clean: diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 3961058..dd3ae61 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -33,7 +33,13 @@ set(EXAMPLES taps task ) + +if(${HAS_POSTGRES}) + list(APPEND EXAMPLES postgres postgres-talk populate-postgres) +endif() set(xEXAMPLES taps) +set(xEXAMPLES memory-stream) +set(xEXAMPLES postgres-talk) foreach(EXAMPLE ${EXAMPLES}) set(EXAMPLE_TARGET beman.net.examples.${EXAMPLE}) @@ -42,6 +48,14 @@ foreach(EXAMPLE ${EXAMPLES}) # XXX target_compile_definitions( ${EXAMPLE_TARGET} PRIVATE BEMAN_NET_USE_URING) endif() target_sources(${EXAMPLE_TARGET} PRIVATE ${EXAMPLE}.cpp) + if(${HAS_POSTGRES}) + target_include_directories( + ${EXAMPLE_TARGET} + PRIVATE ${POSTGRESROOT}/include + ) + target_link_directories(${EXAMPLE_TARGET} PRIVATE ${POSTGRESROOT}/lib) + target_link_libraries(${EXAMPLE_TARGET} PRIVATE pq) + endif() target_link_libraries(${EXAMPLE_TARGET} PRIVATE beman::net_headers) target_link_libraries(${EXAMPLE_TARGET} PRIVATE beman::task_headers) endforeach() diff --git a/examples/demo_algorithm.hpp b/examples/demo_algorithm.hpp index 95369ef..ecb681f 100644 --- a/examples/demo_algorithm.hpp +++ b/examples/demo_algorithm.hpp @@ -141,6 +141,8 @@ struct when_any_t { template struct env; template + struct state_env_base; + template struct state_base; template struct state_value; @@ -183,7 +185,7 @@ struct demo::into_error_t::receiver { template struct demo::into_error_t::sender { using sender_concept = ex::sender_t; - template + template static consteval auto get_completion_signatures() { // static_assert(sizeof...(Env) <= 1u); if constexpr (sizeof...(Env) <= 1) @@ -238,16 +240,24 @@ inline auto demo::into_expected_t::operator()(Sender&& s) const { // ---------------------------------------------------------------------------- -template -struct demo::when_any_t::state_base { - ::std::size_t total{}; - Receiver receiver{}; - ::std::atomic<::std::size_t> done_count{}; - ::std::atomic<::std::size_t> ready_count{}; +template +struct demo::when_any_t::state_env_base { + virtual auto get_receiver_env() const noexcept -> Env = 0; ::demo::ex::inplace_stop_source source{}; +}; +// ---------------------------------------------------------------------------- + +template +struct demo::when_any_t::state_base : demo::when_any_t::state_env_base> { + ::std::size_t total; + Receiver receiver; + ::std::atomic<::std::size_t> done_count{}; + ::std::atomic<::std::size_t> ready_count{}; + + auto get_receiver_env() const noexcept -> ex::env_of_t override { return ex::get_env(this->receiver); } template - state_base(std::size_t tot, R&& rcvr) : total(tot), receiver(::std::forward(rcvr)) {} + state_base(::std::size_t tot, R&& r) : total(tot), receiver(std::forward(r)) {} virtual ~state_base() = default; auto complete() -> bool { if (0u == this->done_count++) { @@ -292,13 +302,18 @@ struct demo::when_any_t::state_value : demo::when_any_t::state_base { // ---------------------------------------------------------------------------- -template +template struct demo::when_any_t::env { - demo::when_any_t::state_base* state; + demo::when_any_t::state_env_base* state; + auto query(const ex::get_stop_token_t&) const noexcept -> ex::inplace_stop_token { return this->state->source.get_token(); } - //-dk:TODO when_any_t::env: set up query forwarding + template + requires requires(const Query& q, const Env& e) { q(e); } + auto query(const Query& q) const noexcept { + return q(this->state->get_receiver_env()); + } }; // ---------------------------------------------------------------------------- @@ -308,7 +323,7 @@ struct demo::when_any_t::receiver { using receiver_concept = ::demo::ex::receiver_t; demo::when_any_t::state_value* state; - auto get_env() const noexcept -> env { return {this->state}; } + auto get_env() const noexcept -> demo::when_any_t::env> { return {this->state}; } template auto set_error(E&& error) && noexcept -> void { if (this->state->complete()) { @@ -360,26 +375,25 @@ template struct demo::when_any_t::sender { ::beman::execution::detail::product_type<::std::remove_cvref_t...> sender; using sender_concept = ex::sender_t; - template + template static consteval auto get_completion_signatures() { - return ::beman::execution::detail::meta::unique< - ::beman::execution::detail::meta::combine())...>>(); + return ::beman::execution::detail::meta::unique<::beman::execution::detail::meta::combine< + decltype(ex::get_completion_signatures>())...>>(); } template - auto connect(Receiver&& receiver) && -> state< - ::std::index_sequence_for, - ::std::remove_cvref_t, - demo::detail::variant_from_list_t())>>>>, - demo::detail::variant_from_list_t())>>, - Sender...> { - return {::std::forward(receiver), ::std::move(this->sender)}; + auto connect(Receiver&& receiver) && { + using completion_signatures = decltype(ex::get_completion_signatures, + decltype(ex::get_env(receiver))>()); + return state< + ::std::index_sequence_for, + ::std::remove_cvref_t, + demo::detail::variant_from_list_t< + ex::detail::transform>>>, + demo::detail::variant_from_list_t>, + Sender...>{::std::forward(receiver), ::std::move(this->sender)}; } }; diff --git a/examples/demo_http.hpp b/examples/demo_http.hpp new file mode 100644 index 0000000..1497f4e --- /dev/null +++ b/examples/demo_http.hpp @@ -0,0 +1,88 @@ +// examples/demo_http.hpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#ifndef INCLUDED_EXAMPLES_DEMO_HTTP +#define INCLUDED_EXAMPLES_DEMO_HTTP + +#include +#include +#include +#include "demo_stream.hpp" +#include +#include +#include +#include +#include + +// ---------------------------------------------------------------------------- + +namespace demo::http { +namespace ex = ::beman::execution; +namespace net = ::beman::net; + +struct request { + std::string method; + std::string url; + std::string version; + std::vector headers; +}; + +template +struct http_client { + Stream stream; + + public: + http_client(Stream s) : stream{std::move(s)} {} + auto request() -> ex::task<> { + std::vector method, url, version; + if (!(co_await stream.read(method, ' '))) { + co_return; + } + std::cout << "read method='" << std::string_view(method) << "'\n"; + + if (!(co_await stream.read(url, ' '))) { + co_return; + } + std::cout << "read url='" << std::string_view(url) << "'\n"; + + if (!(co_await stream.read(version, "\r\n"))) { + co_return; + } + std::cout << "read version='" << std::string_view(version) << "'\n"; + + std::vector header; + while (co_await stream.read(header, "\r\n") && !header.empty()) { + std::cout << "read header line='" << std::string_view(header) << "'\n"; + header.clear(); + } + std::cout << "read HTTP GET request\n"; + + co_return; + } +}; + +struct no_error_env { + using error_types = ::beman::execution::completion_signatures<>; +}; + +template +::beman::execution::task +http_server(::beman::net::io_context& io, unsigned short port, SenderFactory fun) { + ::beman::net::ip::tcp::endpoint ep(::beman::net::ip::address_v4::any(), port); + ::beman::net::ip::tcp::acceptor server(io, ep); + while (true) { + auto [client, addr] = co_await ::beman::net::async_accept(server); + std::cout << "connection from " << addr << "\n"; + fun(demo::http::http_client(demo::tcp_stream(std::move(client)))); + } +} +} // namespace demo::http + +namespace demo { +using http::http_client; +using http::http_server; +} // namespace demo + +// ---------------------------------------------------------------------------- + +#endif diff --git a/examples/demo_stream.hpp b/examples/demo_stream.hpp new file mode 100644 index 0000000..a65c423 --- /dev/null +++ b/examples/demo_stream.hpp @@ -0,0 +1,179 @@ +// examples/demo_stream.hpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#ifndef INCLUDED_EXAMPLES_DEMO_STREAM +#define INCLUDED_EXAMPLES_DEMO_STREAM + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +// ---------------------------------------------------------------------------- + +namespace demo::stream { +namespace ex = ::beman::execution; +namespace net = ::beman::net; + +struct tcp { + ::beman::net::ip::tcp::socket socket; + template + auto receive(Buffer buffer) { + return net::async_receive(this->socket, buffer); + } +}; + +struct memory_base { + virtual ~memory_base() = default; + virtual auto add_receive_data(std::string_view data) -> std::size_t = 0; +}; +struct context { + using update_fun = std::function; + std::unordered_map streams; + std::vector> updates; + std::size_t next_index{}; + void next() { + auto& update = updates[next_index]; + memory_base* stream = streams[update.first]; + if (stream == nullptr || update.second(*stream)) { + this->updates.erase(this->updates.begin() + next_index); + } else if (this->updates.size() == ++this->next_index) { + this->next_index = 0u; + } + } + void add(std::string name, update_fun fun) { this->updates.emplace_back(std::move(name), std::move(fun)); } + auto async_run_one() { + return ex::just() | ex::then([this] { this->next(); }); + } + auto async_run() { + return ex::just() | + net::repeat_effect_until(ex::read_env(ex::get_scheduler) | ex::let_value([this](auto sched) { + return ex::schedule(sched) | + ex::let_value([this] { return this->async_run_one(); }); + }), + [this] { return this->updates.empty(); }); + } +}; +struct memory { + template + struct state : memory_base { + using operation_state_concept = ex::operation_state_t; + Receiver receiver; + Buffer buffer; + memory* self; + + template + state(R&& r, B&& b, memory* s) : receiver(std::forward(r)), buffer(std::forward(b)), self(s) {} + auto start() & noexcept { + this->self->receive_state = this; + this->self->ctxt->streams[this->self->name] = this; + } + auto add_receive_data(std::string_view data) -> std::size_t override { + const auto& vec{this->buffer.data()}; + std::size_t n{std::min(data.size(), vec[0].iov_len)}; + std::copy(data.begin(), data.begin() + n, static_cast(vec[0].iov_base)); + this->self->ctxt->streams[this->self->name] = nullptr; + ex::set_value(std::move(this->receiver), n); + return n; + } + }; + template + struct receive_sender { + using sender_concept = ex::sender_t; + template + static consteval auto get_completion_signatures() { + return ex::completion_signatures{}; + } + memory* self; + Buffer buffer; + + template + state, Buffer> connect(Receiver&& receiver) && { + static_assert(ex::operation_state, Buffer>>); + return {std::forward(receiver), std::move(this->buffer), this->self}; + } + }; + template + auto receive(Buffer buffer) { + static_assert(ex::sender>); + return receive_sender{this, std::move(buffer)}; + } + + context* ctxt; + std::string name; + memory_base* receive_state{}; + memory(context& c, std::string n) : ctxt(&c), name(std::move(n)) {} +}; + +template +struct basic_buffered { + using buffer_t = ::std::array; + using buffer_iterator = typename buffer_t::iterator; + + Stream stream; + buffer_t buffer{}; + buffer_iterator it{buffer.begin()}; + buffer_iterator end{buffer.begin()}; + + template + basic_buffered(Args&&... args) : stream(std::forward(args)...) {} + static constexpr std::size_t length(char) { return 1u; } + bool consume(auto& to, char sentinel) { + auto end{::std::find(this->it, this->end, sentinel)}; + to.insert(to.end(), this->it, end); + this->it = end; + return end == this->end; + } + template <::std::size_t Size> + static constexpr std::size_t length(const char (&)[Size]) { + return Size - 1u; + } + template <::std::size_t Size> + bool consume(auto& to, const char (&sentinel)[Size]) { + auto end{::std::search(this->it, this->end, sentinel, sentinel + Size - 1)}; + bool rc(end == this->end); + if (rc) { + end -= std::min(std::size_t(std::distance(this->it, end)), Size); + } + to.insert(to.end(), this->it, end); + this->it = end; + return rc; + } + auto read(auto& to, const auto& sentinel) -> ex::task { + while (this->consume(to, sentinel)) { + if (std::size_t(std::distance(this->end, this->buffer.end())) < this->buffer.size() / 2) { + this->end = std::move(this->it, this->end, this->buffer.begin()); + this->it = this->buffer.begin(); + } + std::size_t n{co_await this->stream.receive(net::buffer(std::string_view(this->end, this->buffer.end())))}; + if (n == 0) { + co_return false; + } + this->end += n; + } + this->it += length(sentinel); + + co_return true; + } +}; +} // namespace demo::stream + +namespace demo { +using demo::stream::basic_buffered; +using tcp_stream = demo::stream::basic_buffered; +using mem_stream = demo::stream::basic_buffered; +using demo::stream::context; +using demo::stream::memory; +using demo::stream::memory_base; +} // namespace demo + +// ---------------------------------------------------------------------------- + +#endif diff --git a/examples/empty.cpp b/examples/empty.cpp index 862272a..48ecab4 100644 --- a/examples/empty.cpp +++ b/examples/empty.cpp @@ -28,6 +28,8 @@ std::unordered_map files{ }; auto main() -> int { + std::cout << std::unitbuf << "hello world\n"; +#if 0 net::io_context context; net::ip::tcp::endpoint ep(net::ip::address_v4::any(), 12345); net::ip::tcp::acceptor server(context, ep); @@ -35,4 +37,5 @@ auto main() -> int { ex::sync_wait(net::async_accept(server)); context.run(); +#endif } diff --git a/examples/memory-stream.cpp b/examples/memory-stream.cpp new file mode 100644 index 0000000..146ecae --- /dev/null +++ b/examples/memory-stream.cpp @@ -0,0 +1,39 @@ +// examples/memory-stream.cpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#include "demo_stream.hpp" +#include "demo_http.hpp" +#include +#include +#include +#include +#include "demo_algorithm.hpp" +#include "demo_http.hpp" +#include +#include +#include + +namespace ex = ::beman::execution; +namespace net = ::beman::net; + +// ---------------------------------------------------------------------------- + +int main() { + std::cout << std::unitbuf; + demo::context context; + std::string request( + "GET /some/url HTTP/1.1\r\nHost: localhost:12345\r\nUser-Agent: memory/0.0\r\nAccept: */*\r\n\r\n"); + context.add("foo", [data = std::string_view(request)](demo::memory_base& s) mutable { + if (std::size_t n{s.add_receive_data(data.substr(0, std::min(std::size_t(2), data.size())))}) { + data = data.substr(n); + return false; + } + return true; + }); + + auto reader{[](auto client) -> ex::task<> { + co_await client.request(); + }(demo::http_client(demo::mem_stream(context, "foo")))}; + + ex::sync_wait(ex::when_all(context.async_run(), std::move(reader))); +} diff --git a/examples/populate-postgres.cpp b/examples/populate-postgres.cpp new file mode 100644 index 0000000..b14c30c --- /dev/null +++ b/examples/populate-postgres.cpp @@ -0,0 +1,54 @@ +// examples/postgres.cpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#include +#include +#include +#include +#include +#include + +namespace pq { +using connection = std::unique_ptr; +using result = std::unique_ptr; + +struct error { + std::string msg; + error(const connection& conn) : msg(PQerrorMessage(conn.get())) {} + const char* what() const noexcept { return msg.c_str(); }; + friend std::ostream& operator<<(std::ostream& os, const error& err) { return os << err.msg; } +}; +} // namespace pq + +int main() { + std::cout << std::unitbuf; + pq::connection conn(PQconnectdb("user=sruser dbname=sruser")); + + if (PQstatus(conn.get()) != CONNECTION_OK) { + std::cout << "Connection to database failed: " << pq::error(conn) << '\n'; + ; + return 1; + } + const char* const query_version = "SELECT version(), pg_sleep(0)"; + pq::result res(PQexec(conn.get(), query_version)); + if (PQresultStatus(res.get()) != PGRES_TUPLES_OK) { + std::cout << "SELECT failed: " << pq::error(conn) << '\n'; + return 1; + } + std::cout << "PostgreSQL version: " << PQgetvalue(res.get(), 0, 0) << '\n'; + + std::string input("/usr/share/dict/words"); + std::cout << "populating from '" << input << "'\n"; + std::ifstream file(input); + std::string message; + for (std::size_t i{1}; i < 100 && std::getline(file, message); ++i) { + std::ostringstream ins; + ins << "insert into messages (key, message) values(" << i << ", '" << message << "');"; + std::cout << "inserting: " << ins.str() << '\n'; + pq::result res(PQexec(conn.get(), ins.str().c_str())); + if (PQresultStatus(res.get()) != PGRES_COMMAND_OK) { + std::cout << "INSERT failed: " << pq::error(conn) << '\n'; + return 1; + } + } +} diff --git a/examples/postgres-talk.cpp b/examples/postgres-talk.cpp new file mode 100644 index 0000000..df283c2 --- /dev/null +++ b/examples/postgres-talk.cpp @@ -0,0 +1,154 @@ +// examples/postgres-talk.cpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +// examples/http-server.cpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#include +#include +#include +#include +#include "demo_algorithm.hpp" +#include "demo_http.hpp" +#include +#include +#include + +namespace ex = beman::execution; +namespace net = beman::net; +using namespace std::chrono_literals; + +// PQconnectdb(const char* connstr) -> PGconn* +// PQexec(const PGconn *conn, const char* query) -> PGresult* - query the database +// PQsendQuery(const PGconn *conn, const char* query) - send a query +// PQconsumeInput(const PGconn *conn) - consume available input, clear socket stat +// PQgetResult(const PGconn *conn) -> PGresult* - get result, return nullptr if no more results, or would block +// PQsetnonblocking(const PGconn *conn, int arg) - set non-blocking mode to avoid write blocks +// PQsocket(const PGconn *conn) - get socket +// PQflush(const PGconn *conn) - flush output buffer, return 1 if still pending data +// PQisBusy(const PGconn *conn) -> int - PQgetResult() would block +// PQsetSingleRowMode(PGconn *conn) - set single row mode, return 0 on failure +// PQsetChunkedMode(PGconn *conn, int arg) - set chunked mode, return 0 on failure + +namespace { +const std::string connection_string("user=sruser dbname=sruser"); +const std::string query("select *, pg_sleep(0.5) from messages where 0 < key and key < 5;"); + +inline constexpr auto print_result{[](const PGresult* result) noexcept { + std::cout << "n=" << PQntuples(result) << ", m=" << PQnfields(result) << "\n"; + for (int i = 0, n = PQntuples(result); i < n; ++i) { + for (int j = 0, m = PQnfields(result); j < m; ++j == m || std::cout << ", ") { + std::cout << PQgetvalue(result, i, j); + } + std::cout << "\n"; + } +}}; +} // namespace + +namespace pg { +struct connection { + std::unique_ptr conn; + net::ip::tcp::socket socket; + connection(net::io_context& io, PGconn* c) + : conn(c ? c : throw std::runtime_error("connection failed")), + socket(io, io.make_socket(PQsocket(conn.get()))) {} + operator PGconn*() const { return conn.get(); } +}; + +struct error { + std::string msg; + friend std::ostream& operator<<(std::ostream& out, const error& e) { return out << e.msg; } +}; + +struct env { + using error_types = ex::completion_signatures; +}; + +struct result { + std::unique_ptr res; + result(PGresult* r) : res(r) {} + operator PGresult*() const noexcept { return res.get(); } +}; + +auto exec2(pg::connection& conn, const char* query) { + return ex::just() | ex::then([&conn, query] noexcept { PQsendQuery(conn, query); }) | + net::repeat_effect_until(net::async_poll(conn.socket, net::event_type::out) | + ex::upon_error([](auto&&) noexcept {}) | ex::then([](auto&&...) noexcept {}), + [&conn] noexcept { return not PQflush(conn); }) | + net::repeat_effect_until(net::async_poll(conn.socket, net::event_type::in) | + ex::upon_error([](auto&&) noexcept {}) | + ex::then([&conn](auto&&...) noexcept { PQconsumeInput(conn); }), + [&conn] noexcept { return !PQisBusy(conn); }) | + ex::then([&conn] noexcept { return pg::result(PQgetResult(conn)); }); +} +ex::task exec(pg::connection& conn, const char* query) { + PQsendQuery(conn, query); + while (PQflush(conn)) { + co_await net::async_poll(conn.socket, net::event_type::out); + } + while (PQisBusy(conn)) { + co_await net::async_poll(conn.socket, net::event_type::in); + if (!PQconsumeInput(conn)) { + co_yield ex::with_error(pg::error(PQerrorMessage(conn))); + } + } + pg::result res(PQgetResult(conn)); + if (!res) { + co_yield ex::with_error(pg::error(PQerrorMessage(conn))); + } + co_return std::move(res); +} +} // namespace pg + +// ---------------------------------------------------------------------------- + +auto main() -> int { + std::cout << std::unitbuf << "Postgres Example\n"; + net::io_context io; + pg::connection conn(io, PQconnectdb(connection_string.c_str())); + ex::counting_scope scope; + auto spawn{ + [&](ex::sender auto s) { ex::spawn(ex::starts_on(io.get_scheduler(), std::move(s)), scope.get_token()); }}; + +#if 0 + spawn(demo::http_server(io, 12345, [&spawn](auto client) noexcept { + std::cout << "got a client\n"; + spawn([](auto client)->ex::task{ + std::cout << "reading request\n"; + co_await client.request(); + std::cout << "client done\n"; + }(std::move(client))); + })); +#endif + + struct io_env { + using scheduler_type = decltype(io.get_scheduler()); + using error_types = ex::completion_signatures<>; + }; + + auto timer{[]() -> ex::task { + while (true) { + std::cout << "time=" << std::chrono::system_clock::now() << "\n"; + co_await net::resume_after(co_await ex::read_env(ex::get_scheduler), 1s); + } + }()}; + + auto request{pg::exec2(conn, query.c_str()) | ex::then(print_result) | + ex::upon_error([](pg::error e) noexcept { std::cout << "database error=" << e << "\n"; })}; + + if constexpr (false) { + spawn(std::move(timer)); + spawn(std::move(request) | ex::then([&scope] noexcept { scope.request_stop(); })); + ex::sync_wait(ex::when_all(io.async_run(), scope.join())); + } else if constexpr (false) { + ex::inplace_stop_source source; + ex::sync_wait(ex::when_all( + io.async_run(), + ex::starts_on(io.get_scheduler(), + ex::write_env(std::move(timer), ex::env{ex::prop{ex::get_stop_token, source.get_token()}})), + std::move(request) | ex::then([&source] noexcept { source.request_stop(); }))); + } else { + ex::sync_wait( + demo::when_any(io.async_run(), std::move(request), ex::starts_on(io.get_scheduler(), std::move(timer)))); + } +} diff --git a/examples/postgres.cpp b/examples/postgres.cpp new file mode 100644 index 0000000..95e6170 --- /dev/null +++ b/examples/postgres.cpp @@ -0,0 +1,162 @@ +// examples/postgres.cpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace ex = beman::execution; +namespace net = beman::net; +using namespace std::chrono_literals; + +namespace pg { +struct connection { + using handle_t = std::unique_ptr; + + handle_t handle; + net::ip::tcp::socket socket; + connection(net::io_context& io, PGconn* conn) : handle(conn), socket(io, io.make_socket(PQsocket(handle.get()))) { + if (PQstatus(conn) != CONNECTION_OK) { + throw std::runtime_error(std::string("Connection to database failed: ") + PQerrorMessage(conn)); + } + } + connection(connection&&) noexcept = default; + + net::ip::tcp::socket& get_socket() { return this->socket; } + operator PGconn*() { return this->handle.get(); } + operator const PGconn*() const { return this->handle.get(); } +}; + +struct error { + std::string msg; + explicit error(const char* m) : msg(m) {} + error(const connection& conn) : msg(PQerrorMessage(conn)) {} + const char* what() const noexcept { return msg.c_str(); }; + friend std::ostream& operator<<(std::ostream& os, const error& err) { return os << err.msg; } +}; + +struct result { + std::unique_ptr res; + explicit result(PGresult* r) : res(r) {} + operator PGresult*() const { return this->res.get(); } +}; + +// PQsetnonblocking(const PGconn *conn, int arg) - set non-blocking mode to avoid write blocks +// PQsocket(const PGconn *conn) - get socket +// PQflush(const PGconn *conn) - flush output buffer, return 1 if still pending data +// PQisBusy(const PGconn *conn) - PQgetResult() would block +// PQconsumeInput(const PGconn *conn) - consume available input, clear socket stat +// PQgetResult(const PGconn *conn) - get result, return nullptr if no more results, or would block +// PQsetSingleRowMode(PGconn *conn) - set single row mode, return 0 on failure +// PQsetChunkedMode(PGconn *conn, int arg) - set chunked mode, return 0 on failure + +struct exec { + using sender_concept = ex::sender_t; + using completion_signatures = ex::completion_signatures; + template + static consteval completion_signatures get_completion_signatures() noexcept { + return {}; + } + + template + struct state { + using operation_state_concept = ex::operation_state_t; + + struct env { + using error_types = ex::completion_signatures; + }; + static ex::task work([[maybe_unused]] connection& conn, + [[maybe_unused]] std::string query) noexcept { + std::cout << "exec.work()\n"; + if (!PQsendQuery(conn, query.c_str())) { + std::cout << "PQsendQuery failed: " << PQerrorMessage(conn) << "\n"; + co_yield ex::with_error(pg::error(conn)); + } + PQflush(conn); + while (PQisBusy(conn)) { + std::cout << "co_awaiting poll\n"; + auto evs = co_await net::async_poll(conn.get_socket(), net::event_type::in); + std::cout << "co_awaiting done=" << evs << "\n"; + if (!PQconsumeInput(conn)) { + std::cout << "PQconsumeInput failed: " << PQerrorMessage(conn) << "\n"; + co_yield ex::with_error(pg::error(conn)); + } + } + if (pg::result res{PQgetResult(conn)}) { + co_return std::move(res); + } else { + co_yield ex::with_error(pg::error(conn)); + } + std::unreachable(); + } + using inner_state_t = + ex::connect_result_t(), std::string{})), Receiver&&>; + + inner_state_t inner_state; + + state(Receiver&& r, connection& conn, std::string query) + : inner_state(ex::connect(work(conn, std::move(query)), std::forward(r))) {} + + auto start() noexcept -> void { ex::start(this->inner_state); } + }; + + template + auto connect(Receiver&& receiver) && noexcept { + return state{std::forward(receiver), conn, std::move(query)}; + } + + exec(connection& conn, std::string query) : conn(conn), query(std::move(query)) {} + connection& conn; + std::string query; +}; + +} // namespace pg + +int main() { + std::cout << std::unitbuf << "PostgreSQL example\n"; + try { + net::io_context io; + pg::connection conn(io, PQconnectdb("user=sruser dbname=sruser")); + ex::counting_scope scope; + + auto spawn{ + [&](ex::sender auto s) { ex::spawn(ex::starts_on(io.get_scheduler(), std::move(s)), scope.get_token()); }}; + + struct noexcept_env { + using error_types = ex::completion_signatures<>; + using scheduler_type = decltype(io.get_scheduler()); + }; + + spawn(std::invoke([]() noexcept -> ex::task { + while (true) { + std::cout << "time=" << std::chrono::system_clock::now() << "\n" << std::flush; + co_await net::resume_after(co_await ex::read_env(ex::get_scheduler), 1s); + } + })); + + std::string query("select *, pg_sleep(1.1) from messages where 0 <= key and key < 3;"); + + spawn(pg::exec(conn, query) | ex::then([](pg::result res) noexcept { + for (int i{}, n{PQntuples(res)}; i < n; ++i) { + for (int j{}, m{PQnfields(res)}; j < m; ++j) { + std::cout << PQgetvalue(res, i, j) << ','; + } + std::cout << '\n'; + } + std::cout << "query done\n" << std::flush; + }) | + ex::upon_error([](pg::error error) noexcept { std::cout << "query error: " << error << "\n"; }) | + ex::then([&scope]() noexcept { scope.request_stop(); })); + + ex::sync_wait(ex::when_all(scope.join(), io.async_run()) | ex::upon_stopped([]() noexcept {})); + } catch (const pg::error& e) { + std::cout << "Error: " << e << '\n'; + } +} diff --git a/examples/postgres.txt b/examples/postgres.txt new file mode 100644 index 0000000..5aa0a14 --- /dev/null +++ b/examples/postgres.txt @@ -0,0 +1,15 @@ +psql -U postgres +> create role sruser password 'sruser' createdb login; +> create database sruser; + +psql -d sruser -U postgres +> create table messages(key integer, message text); +> create unique index msg_index on messages(key); +> grant all on messages to sruser; + +psql -U sruser +> insert into messages (key, message) values(0, 'zero'); + +touch ~/.pgpass +chmod go-rwx ~/.pgpass +echo '*:*:*:sruser:sruser' > ~/.pgpass diff --git a/examples/print_completions.hpp b/examples/print_completions.hpp new file mode 100644 index 0000000..3566c5d --- /dev/null +++ b/examples/print_completions.hpp @@ -0,0 +1,52 @@ +// examples/print_completions.cpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +template +struct print_completion_signatures_t; +template +struct print_completion_signatures_t> { + void operator()(std::ostream& os) const { ((os << typeid(Signatures).name() << ','), ...); } +}; +template +inline constexpr print_completion_signatures_t print_completion_signatures{}; + +struct print_completions_t { + template + struct sender { + using sender_concept = ex::sender_t; + template + static consteval auto get_completion_signatures() noexcept { + return ex::get_completion_signatures(); + } + + template + struct state { + using operation_state_concept = ex::operation_state_t; + using state_t = ex::connect_result_t; + using env_t = ex::env_of_t; + + state_t state_; + state(Receiver&& r, Sender&& s) + : state_(ex::connect(std::forward(s), std::forward(r))) {} + auto start() noexcept -> void { + std::cout << "completion_signatures<"; + print_completion_signatures())>(std::cout); + std::cout << ">\n"; + ex::start(this->state_); + } + }; + + std::remove_cvref_t sender; + template + auto connect(Receiver&& r) && { + return state{std::forward(r), std::move(sender)}; + } + }; + + template + auto operator()(Sender&& sndr) const { + return sender{std::forward(sndr)}; + } +}; + +[[maybe_unused]] inline constexpr print_completions_t print_completions{}; diff --git a/examples/sync_run.hpp b/examples/sync_run.hpp new file mode 100644 index 0000000..1955ffd --- /dev/null +++ b/examples/sync_run.hpp @@ -0,0 +1,28 @@ +// examples/sync_run.hpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#ifndef INCLUDED_EXAMPLES_SYNC_RUN +#define INCLUDED_EXAMPLES_SYNC_RUN + +struct sync_run_env { + ex::run_loop& loop; + auto query(const ex::get_scheduler_t&) const noexcept { return this->loop.get_scheduler(); } +}; +struct sync_run_receiver { + ex::run_loop& loop; + using receiver_concept = ex::receiver_t; + + auto get_env() const noexcept { return sync_run_env{this->loop}; } + auto set_value() noexcept { this->loop.finish(); } +}; +auto sync_run(ex::run_loop& loop, ex::sender auto snd) { + auto state{ex::connect(std::move(snd), sync_run_receiver{loop})}; + ex::start(state); + std::cout << "running loop\n"; + loop.run(); + std::cout << "running loop done\n"; +} + +// ---------------------------------------------------------------------------- + +#endif diff --git a/include/beman/net/detail/basic_socket.hpp b/include/beman/net/detail/basic_socket.hpp index 7937cb7..d9353fc 100644 --- a/include/beman/net/detail/basic_socket.hpp +++ b/include/beman/net/detail/basic_socket.hpp @@ -21,8 +21,8 @@ class beman::net::basic_socket : public ::beman::net::socket_base { private: ::beman::net::detail::context_base* d_context; - protocol_type d_protocol{::beman::net::ip::tcp::v6()}; - ::beman::net::detail::socket_id d_id{::beman::net::detail::socket_id::invalid}; + protocol_type d_protocol{::beman::net::ip::tcp::v6()}; //-dk:TODO should initialize based on protocol_type + ::beman::net::detail::socket_id d_id{::beman::net::detail::socket_id::invalid}; public: basic_socket() : d_context(nullptr) {} diff --git a/include/beman/net/detail/basic_socket_acceptor.hpp b/include/beman/net/detail/basic_socket_acceptor.hpp index 2a6fb1c..33cc69d 100644 --- a/include/beman/net/detail/basic_socket_acceptor.hpp +++ b/include/beman/net/detail/basic_socket_acceptor.hpp @@ -59,8 +59,9 @@ class beman::net::basic_socket_acceptor : public ::beman::net::socket_base { } basic_socket_acceptor(::beman::net::io_context&, const protocol_type&, const native_handle_type&); basic_socket_acceptor(const basic_socket_acceptor&) = delete; - basic_socket_acceptor(basic_socket_acceptor&& other) + basic_socket_acceptor(basic_socket_acceptor&& other) noexcept : ::beman::net::socket_base(), + d_context(other.d_context), d_protocol(other.d_protocol), d_id(::std::exchange(other.d_id, ::beman::net::detail::socket_id::invalid)) {} template diff --git a/include/beman/net/detail/basic_stream_socket.hpp b/include/beman/net/detail/basic_stream_socket.hpp index 550f691..9a35969 100644 --- a/include/beman/net/detail/basic_stream_socket.hpp +++ b/include/beman/net/detail/basic_stream_socket.hpp @@ -29,6 +29,8 @@ class beman::net::basic_stream_socket : public basic_socket { basic_stream_socket& operator=(basic_stream_socket&&) = default; basic_stream_socket(::beman::net::detail::context_base* context, ::beman::net::detail::socket_id id) : basic_socket(context, id) {} + basic_stream_socket(::beman::net::io_context& context, ::beman::net::detail::socket_id id) + : basic_socket(context.get_scheduler().get_context(), id) {} basic_stream_socket(::beman::net::io_context& context, const endpoint_type& endpoint) : beman::net::basic_socket( context.get_scheduler().get_context(), ::std::invoke([p = endpoint.protocol(), &context] { diff --git a/include/beman/net/detail/context_base.hpp b/include/beman/net/detail/context_base.hpp index 0132869..961dbf9 100644 --- a/include/beman/net/detail/context_base.hpp +++ b/include/beman/net/detail/context_base.hpp @@ -26,6 +26,7 @@ struct beman::net::detail::context_base { virtual auto complete() -> void = 0; }; + using poll_operation = ::beman::net::detail::io_operation<::std::tuple>; using accept_operation = ::beman::net::detail::io_operation< ::std::tuple<::beman::net::detail::endpoint, ::socklen_t, ::std::optional<::beman::net::detail::socket_id>>>; using connect_operation = ::beman::net::detail::io_operation<::std::tuple<::beman::net::detail::endpoint>>; @@ -49,8 +50,9 @@ struct beman::net::detail::context_base { virtual auto run_one() noexcept -> ::std::size_t = 0; - virtual auto cancel(::beman::net::detail::io_base*, ::beman::net::detail::io_base*) -> void = 0; - virtual auto schedule(::beman::net::detail::context_base::task*) -> void = 0; + virtual auto cancel(::beman::net::detail::io_base*, ::beman::net::detail::io_base*) -> void = 0; + virtual auto schedule(::beman::net::detail::context_base::task*) -> void = 0; + virtual auto poll(::beman::net::detail::context_base::poll_operation*) -> ::beman::net::detail::submit_result = 0; virtual auto accept(::beman::net::detail::context_base::accept_operation*) -> ::beman::net::detail::submit_result = 0; virtual auto connect(::beman::net::detail::context_base::connect_operation*) diff --git a/include/beman/net/detail/event_type.hpp b/include/beman/net/detail/event_type.hpp index 6d44ea2..a25097d 100644 --- a/include/beman/net/detail/event_type.hpp +++ b/include/beman/net/detail/event_type.hpp @@ -5,6 +5,7 @@ #define INCLUDED_INCLUDE_BEMAN_NET_DETAIL_EVENT_TYPE #include +#include // ---------------------------------------------------------------------------- @@ -13,6 +14,20 @@ enum class event_type { none = 0x00, in = 0x01, out = 0x02, in_out = 0x03 }; constexpr ::beman::net::event_type operator&(::beman::net::event_type e0, ::beman::net::event_type e1) { return ::beman::net::event_type(::std::uint8_t(e0) & ::std::uint8_t(e1)); } +inline std::ostream& operator<<(std::ostream& os, ::beman::net::event_type e) { + switch (e) { + default: + return os << "invalid(" << ::std::uint8_t(e) << ")"; + case ::beman::net::event_type::none: + return os << "none"; + case ::beman::net::event_type::in: + return os << "in"; + case ::beman::net::event_type::out: + return os << "out"; + case ::beman::net::event_type::in_out: + return os << "in|out"; + } +} } // namespace beman::net // ---------------------------------------------------------------------------- diff --git a/include/beman/net/detail/execution.hpp b/include/beman/net/detail/execution.hpp index 64d67e8..ba43a59 100644 --- a/include/beman/net/detail/execution.hpp +++ b/include/beman/net/detail/execution.hpp @@ -28,6 +28,7 @@ using ::beman::execution::env; using ::beman::execution::env_of_t; using ::beman::execution::error_types_of_t; using ::beman::execution::get_env; +using ::beman::execution::sends_stopped; using ::beman::execution::value_types_of_t; using ::beman::execution::get_completion_scheduler; @@ -64,6 +65,7 @@ using ::beman::execution::connect_t; using ::beman::execution::start; using ::beman::execution::start_t; +using ::beman::execution::inline_scheduler; using ::beman::execution::just; using ::beman::execution::just_error; using ::beman::execution::just_stopped; diff --git a/include/beman/net/detail/io_context.hpp b/include/beman/net/detail/io_context.hpp index 4409f73..7490db5 100644 --- a/include/beman/net/detail/io_context.hpp +++ b/include/beman/net/detail/io_context.hpp @@ -68,6 +68,9 @@ class beman::net::io_context { io_context(::beman::net::detail::context_base& context) : d_owned(), d_context(context) {} io_context(io_context&&) = delete; + auto make_socket(::beman::net::detail::native_handle_type fd) -> ::beman::net::detail::socket_id { + return this->d_context.make_socket(fd); + } auto make_socket(int d, int t, int p, ::std::error_code& error) -> ::beman::net::detail::socket_id { return this->d_context.make_socket(d, t, p, error); } @@ -93,7 +96,7 @@ class beman::net::io_context { auto listen(::beman::net::detail::socket_id id, int no, ::std::error_code& error) { this->d_context.listen(id, no, error); } - auto get_scheduler() -> scheduler_type { return scheduler_type(&this->d_context); } + auto get_scheduler() noexcept -> scheduler_type { return scheduler_type(&this->d_context); } template struct run_one_state { @@ -115,6 +118,10 @@ class beman::net::io_context { using completion_signatures = ::beman::execution::completion_signatures<::beman::execution::set_value_t(std::size_t), ::beman::execution::set_stopped_t()>; + template + static consteval auto get_completion_signatures() { + return completion_signatures{}; + } beman::net::io_context* _context; template @@ -125,14 +132,24 @@ class beman::net::io_context { auto async_run_one() { return run_one_sender{this}; } auto async_run() { - return beman::execution::let_value(beman::execution::just(), [this, last_count = std::size_t(1)]() mutable { - (void)last_count; //-dk:TODO remove this once no compiler complains about last_count being unused - return beman::net::repeat_effect_until( - beman::execution::just(), - [this] { return this->async_run_one(); }() | - beman::execution::then([&last_count](std::size_t count) { last_count = count; }), - [&last_count] { return last_count == 0; }); - }); + return beman::execution::write_env( + beman::execution::read_env(beman::execution::get_scheduler) | + beman::execution::let_value([this, last_count = std::size_t(1)](auto sched) mutable noexcept { + (void)last_count; //-dk:TODO remove this once no compiler complains about last_count being + // unused + return beman::net::repeat_effect_until( + beman::execution::just(), + beman::execution::starts_on( + sched, + this->async_run_one() | + beman::execution::then( + [&last_count](std::size_t count) noexcept { last_count = count; })), + [&last_count]() noexcept { return last_count == 0; }); + }), + beman::execution::env{beman::execution::prop{beman::execution::get_stop_token, + beman::execution::never_stop_token{}}}) | + beman::execution::upon_error([](auto&&) noexcept {}); + ; } ::std::size_t run_one() noexcept { return this->d_context.run_one(); } ::std::size_t run() { diff --git a/include/beman/net/detail/io_context_scheduler.hpp b/include/beman/net/detail/io_context_scheduler.hpp index 872c6f8..040755e 100644 --- a/include/beman/net/detail/io_context_scheduler.hpp +++ b/include/beman/net/detail/io_context_scheduler.hpp @@ -76,6 +76,9 @@ class beman::net::detail::io_context_scheduler { auto cancel(beman::net::detail::io_base* cancel_op, beman::net::detail::io_base* op) -> void { this->d_context->cancel(cancel_op, op); } + auto poll(::beman::net::detail::context_base::poll_operation* op) -> ::beman::net::detail::submit_result { + return this->d_context->poll(op); + } auto accept(::beman::net::detail::context_base::accept_operation* op) -> ::beman::net::detail::submit_result { return this->d_context->accept(op); } diff --git a/include/beman/net/detail/iocp_context.hpp b/include/beman/net/detail/iocp_context.hpp index 33756de..33fa685 100644 --- a/include/beman/net/detail/iocp_context.hpp +++ b/include/beman/net/detail/iocp_context.hpp @@ -308,6 +308,9 @@ struct iocp_context final : context_base { ::PostQueuedCompletionStatus(iocp_handle, 0, 0, nullptr); } + auto poll(poll_operation* op) -> submit_result override { + return submit_result{}; //-dk:TODO + } auto accept(accept_operation* op) -> submit_result override { SOCKET listen_socket = static_cast(native_handle(op->id)); int family = sockets[op->id].address_family; diff --git a/include/beman/net/detail/merge_completion_signatures.hpp b/include/beman/net/detail/merge_completion_signatures.hpp new file mode 100644 index 0000000..499c028 --- /dev/null +++ b/include/beman/net/detail/merge_completion_signatures.hpp @@ -0,0 +1,51 @@ +// include/beman/net/detail/merge_completion_signatures.hpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#ifndef INCLUDED_INCLUDE_BEMAN_NET_DETAIL_MERGE_COMPLETION_SIGNATURES +#define INCLUDED_INCLUDE_BEMAN_NET_DETAIL_MERGE_COMPLETION_SIGNATURES + +// ---------------------------------------------------------------------------- + +namespace beman::net::detail { +template +struct merge_completion_signatures_unique; +template +struct merge_completion_signatures_unique> { + using type = S0; +}; +template +struct merge_completion_signatures_unique<::beman::execution::completion_signatures, + ::beman::execution::completion_signatures> { + using type = std::conditional_t, + ::beman::execution::completion_signatures>; +}; +template +struct merge_completion_signatures_unique> { + using type = typename merge_completion_signatures_unique< + typename merge_completion_signatures_unique>::type, + ::beman::execution::completion_signatures>::type; +}; + +template +struct merge_completion_signatures_helper; +template +struct merge_completion_signatures_helper { + using type = typename merge_completion_signatures_unique::type; +}; +template +struct merge_completion_signatures_helper { + using type = typename merge_completion_signatures_helper::type, + S2, + Sigs...>::type; +}; + +template +inline consteval auto merge_completion_signatures() { + return typename merge_completion_signatures_helper::type(); +} +} // namespace beman::net::detail + +// ---------------------------------------------------------------------------- + +#endif diff --git a/include/beman/net/detail/operations.hpp b/include/beman/net/detail/operations.hpp index a6ac80b..b9b94e7 100644 --- a/include/beman/net/detail/operations.hpp +++ b/include/beman/net/detail/operations.hpp @@ -13,6 +13,7 @@ // ---------------------------------------------------------------------------- namespace beman::net::detail { +struct poll_desc; struct accept_desc; struct connect_desc; struct send_desc; @@ -22,6 +23,9 @@ struct receive_from_desc; } // namespace beman::net::detail namespace beman::net { +using async_poll_t = ::beman::net::detail::sender_cpo<::beman::net::detail::poll_desc>; +inline constexpr async_poll_t async_poll{}; + using async_accept_t = ::beman::net::detail::sender_cpo<::beman::net::detail::accept_desc>; inline constexpr async_accept_t async_accept{}; @@ -42,6 +46,31 @@ inline constexpr async_receive_from_t async_receive_from{}; // ---------------------------------------------------------------------------- +struct beman::net::detail::poll_desc { + using operation = ::beman::net::detail::context_base::poll_operation; + template + struct data { + using completion_signature = ::beman::net::detail::ex::set_value_t(::beman::net::event_type); + + Socket& d_socket; + ::beman::net::event_type d_mask; + data(Socket& socket, ::beman::net::event_type mask) : d_socket(socket), d_mask(mask) {} + + auto id() const { return this->d_socket.id(); } + auto events() const -> ::beman::net::event_type { return this->d_mask; } + auto set_value([[maybe_unused]] operation& o, auto&& receiver) { + ::beman::net::detail::ex::set_value(::std::move(receiver), + //::std::get<0>(o) + ::beman::net::event_type{}); + } + auto get_scheduler() { return this->d_socket.get_scheduler(); } + auto submit([[maybe_unused]] auto* base) -> ::beman::net::detail::submit_result { + ::std::get<1>(*base) = this->d_mask; + return this->get_scheduler().poll(base); + } + }; +}; + struct beman::net::detail::accept_desc { using operation = ::beman::net::detail::context_base::accept_operation; template diff --git a/include/beman/net/detail/poll_context.hpp b/include/beman/net/detail/poll_context.hpp index 6f63e19..7537aa9 100644 --- a/include/beman/net/detail/poll_context.hpp +++ b/include/beman/net/detail/poll_context.hpp @@ -16,6 +16,7 @@ #include #include #include +#include //-dk:TODO remove // ---------------------------------------------------------------------------- @@ -200,6 +201,16 @@ struct beman::net::detail::poll_context final : ::beman::net::detail::context_ba tsk->next = this->d_tasks; this->d_tasks = tsk; } + auto poll(::beman::net::detail::context_base::poll_operation* op) + -> ::beman::net::detail::submit_result override final { + op->context = this; + op->work = [](::beman::net::detail::context_base&, ::beman::net::detail::io_base* o) { + auto& cmp(*static_cast(o)); + cmp.complete(); + return ::beman::net::detail::submit_result::submit; + }; + return this->add_outstanding(op); + } auto accept(::beman::net::detail::context_base::accept_operation* completion) -> ::beman::net::detail::submit_result override final { completion->work = [](::beman::net::detail::context_base& ctxt, ::beman::net::detail::io_base* comp) { diff --git a/include/beman/net/detail/repeat_effect_until.hpp b/include/beman/net/detail/repeat_effect_until.hpp index fae5719..22887e0 100644 --- a/include/beman/net/detail/repeat_effect_until.hpp +++ b/include/beman/net/detail/repeat_effect_until.hpp @@ -5,6 +5,7 @@ #define INCLUDED_INCLUDE_BEMAN_NET_DETAIL_REPEAT_EFFECT_UNTIL #include +#include #include #include #include @@ -19,6 +20,23 @@ struct repeat_effect_until_t : beman::execution::sender_adaptor_closure(upstream), std::forward(body), std::forward(predicate)}; } + template + struct adaptor : beman::execution::sender_adaptor_closure> { + std::remove_cvref_t body; + std::remove_cvref_t predicate; + template + adaptor(B&& b, P&& p) : body(std::forward(b)), predicate(std::forward

(p)) {} + template + auto operator()(Upstream&& upstream) && { + return repeat_effect_until_t{}( + std::forward(upstream), std::move(this->body), std::move(this->predicate)); + } + }; + template + auto operator()(Body&& body, Predicate&& predicate) const { + return adaptor{std::forward(body), std::forward(predicate)}; + } + template struct sender { - using sender_concept = beman::execution::sender_t; - using completion_signatures = - beman::execution::completion_signatures; - template - static constexpr auto get_completion_signatures() -> completion_signatures { - return {}; + using sender_concept = beman::execution::sender_t; + using completion_signatures = beman::execution::completion_signatures; + template + static consteval auto get_completion_signatures() { + return net::detail::merge_completion_signatures< + completion_signatures, + ex::error_types_of_t, ex::completion_signatures>, + ex::error_types_of_t, ex::completion_signatures>, + std::conditional_t || ex::sends_stopped, + ex::completion_signatures, + ex::completion_signatures<>>, + std::conditional_t()()), + ex::completion_signatures<>, + ex::completion_signatures>>(); } Upstream upstream; @@ -88,7 +111,8 @@ struct repeat_effect_until_t : beman::execution::sender_adaptor_closure - auto connect(Receiver&& receiver) { + auto connect(Receiver&& receiver) noexcept( + std::is_nothrow_constructible_v, Receiver>) { return state>{std::move(this->upstream), std::move(this->body), std::move(this->predicate), diff --git a/include/beman/net/detail/scope.hpp b/include/beman/net/detail/scope.hpp index 8c02074..a3fb850 100644 --- a/include/beman/net/detail/scope.hpp +++ b/include/beman/net/detail/scope.hpp @@ -56,7 +56,8 @@ class beman::net::detail::scope { }; auto run() -> auto { - return beman::execution::when_all(this->_counting_scope.join(), this->_io_context.async_run()); + //-dk:TODO return beman::execution::when_all(this->_counting_scope.join(), this->_io_context.async_run()); + return beman::execution::when_all(this->_counting_scope.join()); } auto get_token() -> token { return {this}; }