Skip to content

Commit

Permalink
feature: add current_thread runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
8sileus committed Apr 27, 2024
1 parent 0b790bc commit adfc766
Show file tree
Hide file tree
Showing 50 changed files with 787 additions and 383 deletions.
16 changes: 9 additions & 7 deletions examples/benchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,14 @@ auto main(int argc, char **argv) -> int {
}
SET_LOG_LEVEL(zedio::log::LogLevel::Trace);
auto num_threads = std::stoi(argv[1]);
Runtime::options()
.scheduler()
.set_num_workers(num_threads)
.driver()
.set_submit_interval(0)
.build()
.block_on(main_loop());
if (num_threads > 1) {
runtime::MultiThreadBuilder::options()
.set_num_workers(num_threads)
.set_submit_interval(0)
.build()
.block_on(main_loop());
} else {
runtime::CurrentThreadBuilder::default_create().block_on(main_loop());
}
return 0;
}
2 changes: 1 addition & 1 deletion examples/echo_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ auto main(int argc, char **argv) -> int {
}
SET_LOG_LEVEL(zedio::log::LogLevel::Trace);
auto num_threas = std::stoi(argv[1]);
auto runtime = Runtime::options().scheduler().set_num_workers(num_threas).build();
auto runtime = zedio::runtime::Builder<>::options().set_num_workers(num_threas).build();
runtime.block_on(server());
return 0;
}
2 changes: 1 addition & 1 deletion examples/ping_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,6 @@ auto main(int argc, char **argv) -> int {
auto port = std::stoi(argv[2]);
auto addr = SocketAddr::parse(ip, port).value();
auto client_num = std::stoi(argv[3]);
Runtime::create().block_on(client(addr, client_num));
zedio::runtime::Builder<>::default_create().block_on(client(addr, client_num));
return 0;
}
2 changes: 1 addition & 1 deletion examples/redis_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,6 @@ auto client() -> Task<void> {

auto main() -> int {
SET_LOG_LEVEL(zedio::log::LogLevel::Debug);
auto runtime = Runtime::options().scheduler().set_num_workers(1).build();
auto runtime = zedio::runtime::Builder<>::options().set_num_workers(1).build();
runtime.block_on(client());
}
2 changes: 1 addition & 1 deletion examples/rpc_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,6 @@ auto client() -> Task<void> {

auto main() -> int {
SET_LOG_LEVEL(zedio::log::LogLevel::Debug);
auto runtime = Runtime::options().scheduler().set_num_workers(1).build();
auto runtime = zedio::runtime::Builder<>::options().set_num_workers(1).build();
runtime.block_on(client());
}
2 changes: 1 addition & 1 deletion examples/rpc_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class RpcServer {
}

void run() {
auto runtime = Runtime::create();
auto runtime = zedio::runtime::Builder<>::default_create();
runtime.block_on([this]() -> Task<void> {
auto has_addr = SocketAddr::parse(host_, port_);
if (!has_addr) {
Expand Down
4 changes: 2 additions & 2 deletions examples/simple_echo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ auto process(TcpStream stream) -> Task<void> {
if (!ret) {
LOG_ERROR("{}", ret.error());
break;
}
}
auto len = ret.value();
LOG_DEBUG("{}", std::string_view(buf, len));
if (len == 0) {
Expand All @@ -34,5 +34,5 @@ auto server() -> Task<void> {
}

auto main() -> int {
Runtime::create().block_on(server());
zedio::runtime::Builder<>::default_create().block_on(server());
}
3 changes: 1 addition & 2 deletions examples/unwrap_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,7 @@ auto main(int argc, char **argv) -> int {
auto port = std::stoi(argv[2]);
auto num_threads = std::stoi(argv[3]);
auto num_connections = std::stoi(argv[4]);
Runtime::options()
.scheduler()
zedio::runtime::Builder<>::options()
.set_num_workers(num_threads)
.build()
.block_on(client(ip, static_cast<uint16_t>(port), num_connections));
Expand Down
3 changes: 1 addition & 2 deletions examples/unwrap_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ auto main(int argc, char **argv) -> int {
auto ip = argv[1];
auto port = std::stoi(argv[2]);
auto num_threads = std::stoi(argv[3]);
Runtime::options()
.scheduler()
zedio::runtime::Builder<>::options()
.set_num_workers(num_threads)
.build()
.block_on(server(ip, static_cast<uint16_t>(port)));
Expand Down
2 changes: 1 addition & 1 deletion tests/channel_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ auto test(std::size_t n) -> Task<void> {

auto main() -> int {
SET_LOG_LEVEL(LogLevel::Trace);
auto runtime = Runtime::options().scheduler().set_num_workers(4).build();
auto runtime = zedio::runtime::Builder<>::options().set_num_workers(4).build();
runtime.block_on(test(1000));
return 0;
}
2 changes: 1 addition & 1 deletion tests/condition_variable_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ auto test(std::size_t n) -> Task<void> {

auto main() -> int {
SET_LOG_LEVEL(LogLevel::Trace);
auto runtime = Runtime::options().scheduler().set_num_workers(4).build();
auto runtime = zedio::runtime::Builder<>::options().set_num_workers(4).build();
runtime.block_on(test(10000));
return 0;
}
3 changes: 2 additions & 1 deletion tests/file_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ auto test() -> Task<void> {
}

auto main() -> int {
auto runtime = Runtime::options().scheduler().set_num_workers(1).build();
// auto runtime = zedio::runtime::Builder<>::options().set_num_workers(1).build();
auto runtime = zedio::runtime::Builder<zedio::runtime::Kind::CurrentThread>::default_create();
runtime.block_on(test());
return 0;
}
3 changes: 2 additions & 1 deletion tests/io_buf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,5 +150,6 @@ auto test() -> Task<void> {
}

auto main() -> int {
return Runtime::options().scheduler().set_num_workers(1).build().block_on(test());
zedio::runtime::Builder<>::default_create().block_on(test());
return 0;
}
3 changes: 2 additions & 1 deletion tests/latch_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,6 @@ auto test(std::size_t n) -> Task<void> {

auto main() -> int {
SET_LOG_LEVEL(LogLevel::Trace);
return Runtime::options().scheduler().set_num_workers(4).build().block_on(test(100000));
zedio::runtime::Builder<>::default_create().block_on(test(100000));
return 0;
}
3 changes: 2 additions & 1 deletion tests/mutex_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,6 @@ auto test(std::size_t n) -> Task<void> {

auto main() -> int {
SET_LOG_LEVEL(LogLevel::Trace);
return Runtime::options().scheduler().set_num_workers(4).build().block_on(test(100000));
zedio::runtime::Builder<>::default_create().block_on(test(100000));
return 0;
}
4 changes: 2 additions & 2 deletions tests/queue_test.cpp
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
#define BOOST_TEST_MODULE ring_buffer_test

#include "zedio/runtime/queue.hpp"
#include "zedio/runtime/multi_thread/queue.hpp"

#include <boost/test/included/unit_test.hpp>

#include <thread>

using namespace zedio::runtime::detail;
using namespace zedio::runtime::multi_thread;

BOOST_AUTO_TEST_SUITE(ring_buffer_test)

Expand Down
3 changes: 2 additions & 1 deletion tests/semaphore_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,5 +69,6 @@ auto test() -> Task<void> {

auto main() -> int {
SET_LOG_LEVEL(LogLevel::Trace);
return Runtime::options().scheduler().set_num_workers(4).build().block_on(test());
runtime::MultiThreadBuilder::default_create().block_on(test());
return 0;
}
6 changes: 5 additions & 1 deletion tests/signal_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,13 @@ auto signal_test() -> async::Task<void> {
co_await signal::ctrl_c();
LOG_DEBUG("sigint after");
t.join();
LOG_DEBUG("join finished");
}

int main() {
SET_LOG_LEVEL(zedio::log::LogLevel::Trace);
return Runtime::create().block_on(signal_test());
// auto runtime = zedio::runtime::Builder<>::default_create();
auto runtime = runtime::CurrentThreadBuilder::default_create();
runtime.block_on(signal_test());
return 0;
}
2 changes: 1 addition & 1 deletion tests/stdio_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,6 @@ auto test() -> Task<void> {

auto main() -> int {
SET_LOG_LEVEL(LogLevel::Debug);
auto runtime = Runtime::options().scheduler().set_num_workers(1).build();
auto runtime = zedio::runtime::CurrentThreadBuilder::default_create();
runtime.block_on(test());
}
3 changes: 2 additions & 1 deletion tests/tcp_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,5 +76,6 @@ auto test() -> Task<void> {

auto main() -> int {
SET_LOG_LEVEL(zedio::log::LogLevel::Trace);
return Runtime::options().scheduler().set_num_workers(1).build().block_on(test());
runtime::MultiThreadBuilder::default_create().block_on(test());
return 0;
}
3 changes: 2 additions & 1 deletion tests/timeout_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,6 @@ auto main_test() -> Task<void> {

auto main() -> int {
SET_LOG_LEVEL(zedio::log::LogLevel::Trace);
return Runtime::options().scheduler().set_num_workers(1).build().block_on(main_test());
runtime::CurrentThreadBuilder::default_create().block_on(main_test());
return 0;
}
21 changes: 16 additions & 5 deletions tests/udp_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,21 @@ auto client(std::string_view ip, uint16_t port) -> Task<void> {
std::string_view w_buf = "udp ping";
char r_buf[1024] = {};
while (true) {
co_await sock.send_to(w_buf, addr);
auto [len, addr] = (co_await sock.recv_from(r_buf)).value();
if(auto ret=co_await sock.send_to(w_buf, addr);!ret){
LOG_ERROR("{}", ret.error());
break;
}
auto ret = co_await sock.recv_from(r_buf);
if (!ret) {
LOG_ERROR("{}", ret.error());
break;
}
auto [len, addr] = ret.value();
if (len == 0) {
break;
}
LOG_INFO("read: {} from {}", len, addr);
r_buf[len] = 0;
LOG_INFO("client read: {} from {}", r_buf, addr);
}
}

Expand All @@ -31,7 +40,8 @@ auto server(std::string_view ip, uint16_t port) -> Task<void> {
char buf[1024] = {};
for (int i = 0; i < 100; i += 1) {
auto [len, addr] = (co_await sock.recv_from(buf)).value();
LOG_INFO("read: {} from {}", len, addr);
buf[len] = 0;
LOG_INFO("server read: {} from {}", buf, addr);
co_await sock.send_to({buf, len}, addr);
}
}
Expand All @@ -40,6 +50,7 @@ auto main() -> int {
SET_LOG_LEVEL(zedio::log::LogLevel::Trace);
auto ip = "127.0.0.1";
auto port = 9999;
Runtime::create().block_on(server(ip, static_cast<uint16_t>(port)));
runtime::CurrentThreadBuilder::default_create().block_on(
server(ip, static_cast<uint16_t>(port)));
return 0;
}
3 changes: 2 additions & 1 deletion tests/unix_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,5 +66,6 @@ auto test() -> Task<void> {

auto main() -> int {
SET_LOG_LEVEL(zedio::log::LogLevel::Trace);
return Runtime::options().scheduler().set_num_workers(1).build().block_on(test());
runtime::CurrentThreadBuilder::default_create().block_on(test());
return 0;
}
30 changes: 4 additions & 26 deletions zedio/common/util/thread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
// C++
#include <thread>

namespace zedio::current_thread {
namespace zedio::util {

static inline auto get_tid() noexcept -> pid_t {
static thread_local pid_t t_id = ::gettid();
Expand All @@ -16,35 +16,13 @@ static inline auto get_tid() noexcept -> pid_t {

static thread_local std::string t_name;

static inline auto set_thread_name(std::string_view name) {
static inline auto set_current_thread_name(std::string_view name) {
pthread_setname_np(pthread_self(), name.data());
t_name = name;
}

static inline auto get_thread_name() -> std::string_view {
static inline auto get_current_thread_name() -> std::string_view {
return t_name;
}

} // namespace zedio::current_thread

namespace zedio::util {
class SpinMutex : Noncopyable {
public:
explicit SpinMutex(int pshared = 0) noexcept {
pthread_spin_init(&mutex_, pshared);
}
~SpinMutex() noexcept {
pthread_spin_destroy(&mutex_);
}

void lock() noexcept {
pthread_spin_lock(&mutex_);
}
void unlock() noexcept {
pthread_spin_unlock(&mutex_);
}

private:
pthread_spinlock_t mutex_;
};

} // namespace zedio::util
21 changes: 4 additions & 17 deletions zedio/io/base/callback.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,10 @@ class Entry;
namespace zedio::io::detail {

struct Callback {

[[nodiscard]]
auto get_coro_handle_and_set_result(int result) -> std::coroutine_handle<> {
auto ret = handle_;
result_ = result;
return ret;
}

union {
std::coroutine_handle<> handle_{nullptr};
int result_;
};

union {
runtime::detail::Entry *entry_{nullptr};
std::chrono::steady_clock::time_point deadline_;
};
std::coroutine_handle<> handle_{nullptr};
int result_;
runtime::detail::Entry *entry_{nullptr};
std::chrono::steady_clock::time_point deadline_;
};

} // namespace zedio::io::detail
4 changes: 2 additions & 2 deletions zedio/log/logger.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ class BaseLogger : util::Noncopyable {
static_cast<DeriverLogger *>(this)->template log<LEVEL>(
LogRecord{buffer.data(),
cur_millisecond,
current_thread::get_tid(),
current_thread::get_thread_name(),
util::get_tid(),
util::get_current_thread_name(),
sl.file_name(),
sl.line(),
std::vformat(fmt, std::make_format_args(std::forward<Args>(args)...))});
Expand Down
Loading

0 comments on commit adfc766

Please sign in to comment.