Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
3c81ece
started on creating a postgres connection
dietmarkuehl Apr 12, 2026
2030284
saving current intermediate state
dietmarkuehl Apr 16, 2026
d22b028
made some progress on the poll() implementation
dietmarkuehl Apr 18, 2026
ecabdc4
fixed the code to have poll() work for the poll_context
dietmarkuehl Apr 19, 2026
5d39b75
making progress on querying the database in a nice way
dietmarkuehl Apr 19, 2026
aa136c7
added sync_run to run a local run_loop
dietmarkuehl Apr 19, 2026
958a6ee
trying to fix the run_loop existing on a timer
dietmarkuehl Apr 19, 2026
35bd5dc
made some progress on fixing async_run
dietmarkuehl Apr 20, 2026
846e5e9
a first cut at an asynchronous postgres demo
dietmarkuehl Apr 22, 2026
7e5a81c
clang-format and some refinement to improve set_error completions
dietmarkuehl Apr 22, 2026
fdb0cf5
added an initial print completions function
dietmarkuehl Apr 24, 2026
31b182d
adding an example and building postgresql binding
dietmarkuehl Apr 27, 2026
cd4444c
Merge branch 'main' into postgres
dietmarkuehl Apr 27, 2026
1a57468
fixed the CMakeLists.txt to conditionally include postgres
dietmarkuehl Apr 27, 2026
a12f7f3
some clean-up and better prepration for the postgres presentation
dietmarkuehl Apr 28, 2026
4c80cb8
minor usability improvement for socket creation
dietmarkuehl Apr 28, 2026
5c3aba0
started adding a simple HTTP server
dietmarkuehl May 2, 2026
f3ee267
added an async in-memory stream
dietmarkuehl May 2, 2026
ae1058e
added an in-memory stream
dietmarkuehl May 2, 2026
e8ce047
a bit more progress on the HTTP support
dietmarkuehl May 4, 2026
1ffb393
fixed noexcept of repeat_effect_until::connect
dietmarkuehl May 4, 2026
56bf564
improved the completion signature of repeat_effect_until
dietmarkuehl May 4, 2026
e6e6db2
fixed when_any version to actually stop
dietmarkuehl May 4, 2026
593329c
clang format
dietmarkuehl May 4, 2026
361c877
restored building examples
dietmarkuehl May 4, 2026
48857e6
fix/work around issues located by CI
dietmarkuehl May 4, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,17 @@ include(FetchContent)

FetchContent_Declare(
task
# FETCHCONTENT_SOURCE_DIR_TASK ${CMAKE_SOURCE_DIR}/../task
# SOURCE_DIR ${CMAKE_SOURCE_DIR}/../task
GIT_REPOSITORY https://github.com/bemanproject/task
GIT_TAG 16a5916
GIT_TAG 6ff5b89
FIND_PACKAGE_ARGS 0.2.0 EXACT NAMES beman.task COMPONENTS task_headers
)
FetchContent_MakeAvailable(task)

include(CheckLibraryExists)
set(POSTGRESROOT /Library/PostgreSQL/18)
check_library_exists(pq PQsocket ${POSTGRESROOT}/lib HAS_POSTGRES)
Comment on lines +40 to +41
Copy link

Copilot AI Apr 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

POSTGRESROOT is hard-coded to /Library/PostgreSQL/18, which is platform- and install-specific. Make this configurable (CACHE PATH) and/or use find_package(PostgreSQL) / pkg-config to locate libpq in a cross-platform way; otherwise the build system will behave unexpectedly on non-macOS or different Postgres installs.

Suggested change
set(POSTGRESROOT /Library/PostgreSQL/18)
check_library_exists(pq PQsocket ${POSTGRESROOT}/lib HAS_POSTGRES)
find_package(PostgreSQL QUIET)
if(PostgreSQL_FOUND)
set(HAS_POSTGRES TRUE)
else()
set(HAS_POSTGRES FALSE)
endif()

Copilot uses AI. Check for mistakes.

add_subdirectory(src/beman/net)

#===============================================================================
Expand All @@ -52,6 +56,7 @@ if(BEMAN_NET_BUILD_TESTS)
add_subdirectory(tests/beman/net)
endif()

if(BEMAN_NET_BUILD_EXAMPLES)
if(${BEMAN_NET_BUILD_EXAMPLES})
message(NOTICE "Building examples")
add_subdirectory(examples)
endif()
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception

SANITIZERS = none debug msan asan usan tsan
.PHONY: default gcc clang run update check ce todo distclean clean build test all format $(SANITIZERS)
.PHONY: default gcc clang run update check ce todo distclean clean build test all clang-format format $(SANITIZERS)

COMPILER=system
CXX_BASE=$(CXX:$(dir $(CXX))%=%)
Expand Down Expand Up @@ -89,7 +89,7 @@ check:
todo:
bin/mk-todo.py

format:
clang-format format:
git clang-format main

clean:
Expand Down
14 changes: 14 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,13 @@ set(EXAMPLES
taps
task
)

if(${HAS_POSTGRES})
list(APPEND EXAMPLES postgres postgres-talk populate-postgres)
endif()
set(xEXAMPLES taps)
set(xEXAMPLES memory-stream)
set(xEXAMPLES postgres-talk)

foreach(EXAMPLE ${EXAMPLES})
set(EXAMPLE_TARGET beman.net.examples.${EXAMPLE})
Expand All @@ -42,6 +48,14 @@ foreach(EXAMPLE ${EXAMPLES})
# XXX target_compile_definitions( ${EXAMPLE_TARGET} PRIVATE BEMAN_NET_USE_URING)
endif()
target_sources(${EXAMPLE_TARGET} PRIVATE ${EXAMPLE}.cpp)
if(${HAS_POSTGRES})
target_include_directories(
${EXAMPLE_TARGET}
PRIVATE ${POSTGRESROOT}/include
)
target_link_directories(${EXAMPLE_TARGET} PRIVATE ${POSTGRESROOT}/lib)
target_link_libraries(${EXAMPLE_TARGET} PRIVATE pq)
endif()
Comment thread
dietmarkuehl marked this conversation as resolved.
target_link_libraries(${EXAMPLE_TARGET} PRIVATE beman::net_headers)
target_link_libraries(${EXAMPLE_TARGET} PRIVATE beman::task_headers)
endforeach()
70 changes: 42 additions & 28 deletions examples/demo_algorithm.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ struct when_any_t {
template <typename>
struct env;
template <typename>
struct state_env_base;
template <typename>
struct state_base;
template <ex::receiver, typename, typename>
struct state_value;
Expand Down Expand Up @@ -183,7 +185,7 @@ struct demo::into_error_t::receiver {
template <demo::ex::sender Sender, typename Fun>
struct demo::into_error_t::sender {
using sender_concept = ex::sender_t;
template <typename... Env>
template <typename, typename... Env>
static consteval auto get_completion_signatures() {
// static_assert(sizeof...(Env) <= 1u);
if constexpr (sizeof...(Env) <= 1)
Expand Down Expand Up @@ -238,16 +240,24 @@ inline auto demo::into_expected_t::operator()(Sender&& s) const {

// ----------------------------------------------------------------------------

template <typename Receiver>
struct demo::when_any_t::state_base {
::std::size_t total{};
Receiver receiver{};
::std::atomic<::std::size_t> done_count{};
::std::atomic<::std::size_t> ready_count{};
template <typename Env>
struct demo::when_any_t::state_env_base {
virtual auto get_receiver_env() const noexcept -> Env = 0;
::demo::ex::inplace_stop_source source{};
};

// ----------------------------------------------------------------------------

template <typename Receiver>
struct demo::when_any_t::state_base : demo::when_any_t::state_env_base<ex::env_of_t<Receiver>> {
::std::size_t total;
Receiver receiver;
::std::atomic<::std::size_t> done_count{};
::std::atomic<::std::size_t> ready_count{};

auto get_receiver_env() const noexcept -> ex::env_of_t<Receiver> override { return ex::get_env(this->receiver); }
template <typename R>
state_base(std::size_t tot, R&& rcvr) : total(tot), receiver(::std::forward<R>(rcvr)) {}
state_base(::std::size_t tot, R&& r) : total(tot), receiver(std::forward<R>(r)) {}
virtual ~state_base() = default;
auto complete() -> bool {
if (0u == this->done_count++) {
Expand Down Expand Up @@ -292,13 +302,18 @@ struct demo::when_any_t::state_value : demo::when_any_t::state_base<Receiver> {

// ----------------------------------------------------------------------------

template <typename Receiver>
template <typename Env>
struct demo::when_any_t::env {
demo::when_any_t::state_base<Receiver>* state;
demo::when_any_t::state_env_base<Env>* state;

auto query(const ex::get_stop_token_t&) const noexcept -> ex::inplace_stop_token {
return this->state->source.get_token();
}
//-dk:TODO when_any_t::env: set up query forwarding
template <typename Query>
requires requires(const Query& q, const Env& e) { q(e); }
auto query(const Query& q) const noexcept {
return q(this->state->get_receiver_env());
}
};

// ----------------------------------------------------------------------------
Expand All @@ -308,7 +323,7 @@ struct demo::when_any_t::receiver {
using receiver_concept = ::demo::ex::receiver_t;
demo::when_any_t::state_value<Receiver, Value, Error>* state;

auto get_env() const noexcept -> env<Receiver> { return {this->state}; }
auto get_env() const noexcept -> demo::when_any_t::env<ex::env_of_t<Receiver>> { return {this->state}; }
template <typename E>
auto set_error(E&& error) && noexcept -> void {
if (this->state->complete()) {
Expand Down Expand Up @@ -360,26 +375,25 @@ template <demo::ex::sender... Sender>
struct demo::when_any_t::sender {
::beman::execution::detail::product_type<::std::remove_cvref_t<Sender>...> sender;
using sender_concept = ex::sender_t;
template <typename Env>
template <typename, typename Env>
static consteval auto get_completion_signatures() {
return ::beman::execution::detail::meta::unique<
::beman::execution::detail::meta::combine<decltype(ex::get_completion_signatures<Sender, Env>())...>>();
return ::beman::execution::detail::meta::unique<::beman::execution::detail::meta::combine<
decltype(ex::get_completion_signatures<Sender, when_any_t::env<Env>>())...>>();
}

template <demo::ex::receiver Receiver>
auto connect(Receiver&& receiver) && -> state<
::std::index_sequence_for<Sender...>,
::std::remove_cvref_t<Receiver>,
demo::detail::variant_from_list_t<ex::detail::transform<
demo::detail::decayed_set_value_t,
demo::detail::make_type_list_t<ex::detail::filter<
demo::detail::is_set_value,
decltype(ex::get_completion_signatures<decltype(*this), decltype(ex::get_env(receiver))>())>>>>,
demo::detail::variant_from_list_t<ex::detail::filter<
demo::detail::is_set_error,
decltype(ex::get_completion_signatures<decltype(*this), decltype(ex::get_env(receiver))>())>>,
Sender...> {
return {::std::forward<Receiver>(receiver), ::std::move(this->sender)};
auto connect(Receiver&& receiver) && {
using completion_signatures = decltype(ex::get_completion_signatures<std::remove_cvref_t<decltype(*this)>,
decltype(ex::get_env(receiver))>());
return state<
::std::index_sequence_for<Sender...>,
::std::remove_cvref_t<Receiver>,
demo::detail::variant_from_list_t<
ex::detail::transform<demo::detail::decayed_set_value_t,
demo::detail::make_type_list_t<
ex::detail::filter<demo::detail::is_set_value, completion_signatures>>>>,
demo::detail::variant_from_list_t<ex::detail::filter<demo::detail::is_set_error, completion_signatures>>,
Sender...>{::std::forward<Receiver>(receiver), ::std::move(this->sender)};
}
};

Expand Down
88 changes: 88 additions & 0 deletions examples/demo_http.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// examples/demo_http.hpp -*-C++-*-
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception

#ifndef INCLUDED_EXAMPLES_DEMO_HTTP
#define INCLUDED_EXAMPLES_DEMO_HTTP

#include <beman/execution/execution.hpp>
#include <beman/execution/task.hpp>
#include <beman/net/net.hpp>
#include "demo_stream.hpp"
#include <algorithm>
#include <array>
#include <cstddef>
#include <string_view>
#include <vector>

// ----------------------------------------------------------------------------

namespace demo::http {
namespace ex = ::beman::execution;
namespace net = ::beman::net;

struct request {
std::string method;
std::string url;
std::string version;
std::vector<std::string> headers;
};

template <typename Stream>
struct http_client {
Stream stream;

public:
http_client(Stream s) : stream{std::move(s)} {}
auto request() -> ex::task<> {
std::vector<char> method, url, version;
if (!(co_await stream.read(method, ' '))) {
co_return;
}
std::cout << "read method='" << std::string_view(method) << "'\n";

if (!(co_await stream.read(url, ' '))) {
co_return;
}
std::cout << "read url='" << std::string_view(url) << "'\n";

if (!(co_await stream.read(version, "\r\n"))) {
co_return;
}
std::cout << "read version='" << std::string_view(version) << "'\n";

std::vector<char> header;
while (co_await stream.read(header, "\r\n") && !header.empty()) {
std::cout << "read header line='" << std::string_view(header) << "'\n";
header.clear();
}
std::cout << "read HTTP GET request\n";

co_return;
}
};

struct no_error_env {
using error_types = ::beman::execution::completion_signatures<>;
};

template <typename SenderFactory>
::beman::execution::task<void, demo::http::no_error_env>
http_server(::beman::net::io_context& io, unsigned short port, SenderFactory fun) {
::beman::net::ip::tcp::endpoint ep(::beman::net::ip::address_v4::any(), port);
::beman::net::ip::tcp::acceptor server(io, ep);
while (true) {
auto [client, addr] = co_await ::beman::net::async_accept(server);
std::cout << "connection from " << addr << "\n";
fun(demo::http::http_client(demo::tcp_stream(std::move(client))));
}
}
} // namespace demo::http

namespace demo {
using http::http_client;
using http::http_server;
} // namespace demo

// ----------------------------------------------------------------------------

#endif
Loading
Loading