From a14856a854f17ff578aec053f22f09e1aa24ffc7 Mon Sep 17 00:00:00 2001 From: Takuro Iizuka Date: Thu, 4 Apr 2024 14:32:37 -0700 Subject: [PATCH 01/12] WIP --- CMakeLists.txt | 6 +++-- include/lng/system.h | 49 +++++++++-------------------------- src/dpdk-runtime.cc | 0 src/dpdk-runtime.h | 6 +++++ src/system.cc | 61 ++++++++++++++++++++++++++++++++++++++++++-- 5 files changed, 81 insertions(+), 41 deletions(-) delete mode 100644 src/dpdk-runtime.cc create mode 100644 src/dpdk-runtime.h diff --git a/CMakeLists.txt b/CMakeLists.txt index c31151b..35c3867 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -31,7 +31,9 @@ endif() if(LIBDOCAGPU_FOUND) set(LNG_WITH_DOCA 1) add_compile_definitions(LNG_WITH_DOCA=1) -elseif(LIBDPDK_FOUND) +endif() + +if(LIBDPDK_FOUND) set(LNG_WITH_DPDK 1) add_compile_definitions(LNG_WITH_DPDK=1) endif() @@ -119,7 +121,7 @@ if(LNG_WITH_DOCA OR LNG_WITH_DPDK) target_link_libraries(dpdk_frame_builder_tcp lng-core ${LIBDPDK_STATIC_LDFLAGS}) endif() -if(LNG_WITH_NVIDIA) +if(LNG_WITH_DOCA) add_executable(test_tcp test/test_tcp.cc) target_link_libraries(test_tcp lng-core ${LIBDOCAGPU_STATIC_LDFLAGS}) add_executable(test_udp test/test_udp.cc) diff --git a/include/lng/system.h b/include/lng/system.h index 05e7c94..2cee435 100644 --- a/include/lng/system.h +++ b/include/lng/system.h @@ -10,59 +10,34 @@ namespace lng { class System { + + struct Impl; + public: System(); + System(const System&) = delete; + System(const System&&) = delete; template ::value>::type* = nullptr> T create_actor(const std::string id, int cpu_id, Args... args) { auto actor(std::make_shared(id, cpu_id, args...)); - actors_[id] = actor; + register_actor(id, actor); return *actor; } - void start() - { - // TODO: Considering tree dependency - for (auto& [id, actor] : actors_) { - actor->start(); - } + void start(); - // Make sure to be running all actors - for (auto& [n, actor] : actors_) { - actor->wait_until(Actor::State::Running); - } - } + void stop(); - void stop() - { - // TODO: Considering tree dependency - for (auto& [id, actor] : actors_) { - actor->stop(); - } + void terminate(); - // Make sure to be ready all actors - for (auto& [n, actor] : actors_) { - actor->wait_until(Actor::State::Ready); - } - } - - void terminate() - { - // TODO: Considering tree dependency - for (auto& [n, actor] : actors_) { - actor->terminate(); - } - - // Make sure to finalize all actors - for (auto& [n, actor] : actors_) { - actor->wait_until(Actor::State::Fin); - } - } private: - std::unordered_map> actors_; + void register_actor(const std::string& id, const std::shared_ptr& actor); + + std::shared_ptr impl_; }; } diff --git a/src/dpdk-runtime.cc b/src/dpdk-runtime.cc deleted file mode 100644 index e69de29..0000000 diff --git a/src/dpdk-runtime.h b/src/dpdk-runtime.h new file mode 100644 index 0000000..5c611f5 --- /dev/null +++ b/src/dpdk-runtime.h @@ -0,0 +1,6 @@ +namespace lng { + +class DPDKRuntime { +}; + +} // lng diff --git a/src/system.cc b/src/system.cc index 181b6d4..9056da0 100644 --- a/src/system.cc +++ b/src/system.cc @@ -1,20 +1,77 @@ #include "lng/system.h" -#if defined(LNG_WITH_NV) +#if defined(LNG_WITH_DOCA) #include "lng/doca-util.h" #endif +#if defined(LNG_WITH_DPDK) +#include "dpdk-runtime.h" +#endif + #include "log.h" namespace lng { +struct System::Impl { + + std::unordered_map> actors; + +#if defined(LNG_WITH_DPDK) + DPDKRuntime dpdk_rt; +#endif +}; + System::System() + : impl_(new Impl) { log::debug("System is initialized"); -#if defined(LNG_WITH_NV) +#if defined(LNG_WITH_DOCA) doca_log_backend_create_standard(); #endif } +void System::start() +{ + // TODO: Considering tree dependency + for (auto& [id, actor] : impl_->actors) { + actor->start(); + } + + // Make sure to be running all actors + for (auto& [n, actor] : impl_->actors) { + actor->wait_until(Actor::State::Running); + } +} + +void System::stop() +{ + // TODO: Considering tree dependency + for (auto& [id, actor] : impl_->actors) { + actor->stop(); + } + + // Make sure to be ready all actors + for (auto& [n, actor] : impl_->actors) { + actor->wait_until(Actor::State::Ready); + } +} + +void System::terminate() +{ + // TODO: Considering tree dependency + for (auto& [n, actor] : impl_->actors) { + actor->terminate(); + } + + // Make sure to finalize all actors + for (auto& [n, actor] : impl_->actors) { + actor->wait_until(Actor::State::Fin); + } +} + +void System::register_actor(const std::string& id, const std::shared_ptr& actor) { + impl_->actors[id] = actor; +} + } // lng From b1b56e4074d9bd6f6990d19b959d28ebb1aa8223 Mon Sep 17 00:00:00 2001 From: Takuro Iizuka Date: Thu, 4 Apr 2024 23:41:07 +0000 Subject: [PATCH 02/12] WIP --- CMakeLists.txt | 3 ++- include/lng/runtime.h | 51 +++++++++++++++++++++++++++++++++++++ include/lng/stream.h | 10 +++++--- include/lng/system.h | 12 +++++++++ src/dpdk-runtime.h | 6 ----- src/runtime.cc | 59 +++++++++++++++++++++++++++++++++++++++++++ src/stream.cc | 36 +++++--------------------- src/system.cc | 34 +++++++++++++++---------- 8 files changed, 157 insertions(+), 54 deletions(-) create mode 100644 include/lng/runtime.h delete mode 100644 src/dpdk-runtime.h create mode 100644 src/runtime.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index 35c3867..25bfb01 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -58,10 +58,11 @@ set(LNG_CORE_SRC src/receiver-actor.cc src/event.cc src/log.cc + src/runtime.cc src/stream.cc src/system.cc ) -if(LNG_WITH_CUDA) +if(LNG_WITH_CUDA AND LNG_WITH_DOCA) list(APPEND LNG_CORE_SRC src/protocol_tcp.cu src/protocol_udp.cu) diff --git a/include/lng/runtime.h b/include/lng/runtime.h new file mode 100644 index 0000000..f30e783 --- /dev/null +++ b/include/lng/runtime.h @@ -0,0 +1,51 @@ +#ifndef LNG_RUNTIME_H +#define LNG_RUNTIME_H + +#if defined(LNG_WITH_DPDK) +struct rte_mempool; +#endif + +namespace lng { + +class Runtime { +public: + enum Type { + DPDK, + DOCA + }; + + virtual void start() = 0; + virtual void stop() = 0; +}; + +#if defined(LNG_WITH_DOCA) +class DOCARuntime : public Runtime { +public: + DOCARuntime(); + + virtual void start() { } + + virtual void stop() { } +}; +#endif + +#if defined(LNG_WITH_DPDK) + +class DPDKRuntime : public Runtime { +public: + virtual void start(); + + virtual void stop(); + + rte_mempool* get_mempool() { + return mbuf_pool_; + } + +private: + rte_mempool* mbuf_pool_; +}; +#endif + +} // lng + +#endif diff --git a/include/lng/stream.h b/include/lng/stream.h index e8d35c8..ae0f22d 100644 --- a/include/lng/stream.h +++ b/include/lng/stream.h @@ -11,6 +11,8 @@ struct rte_mempool; namespace lng { +class DPDKRuntime; + template class Stream { public: @@ -61,7 +63,7 @@ class MemoryStream : public Stream { class DPDKStream : public Stream { struct Impl { - rte_mempool* mbuf_pool; + std::shared_ptr rt; uint16_t port_id; uint16_t tcp_port; bool send_ack(rte_mbuf* recv_mbuf, size_t length); @@ -69,7 +71,7 @@ class DPDKStream : public Stream { void wait_for_3wayhandshake(); bool check_target_packet(rte_mbuf* recv_mbuf); - Impl(uint16_t port_id); + Impl(const std::shared_ptr& rt, uint16_t port_id); ~Impl(); private: @@ -77,8 +79,8 @@ class DPDKStream : public Stream { }; public: - DPDKStream(uint16_t port_id) - : impl_(new Impl(port_id)) + DPDKStream(const std::shared_ptr& rt, uint16_t port_id) + : impl_(new Impl(rt, port_id)) { } diff --git a/include/lng/system.h b/include/lng/system.h index 2cee435..ae368c8 100644 --- a/include/lng/system.h +++ b/include/lng/system.h @@ -6,6 +6,8 @@ #include #include "lng/actor.h" +#include "lng/runtime.h" +#include "lng/stream.h" namespace lng { @@ -27,6 +29,14 @@ class System { return *actor; } + template::value>::type* = nullptr, + typename std::enable_if::value>::type* = nullptr> + std::shared_ptr create_stream(Args... args) + { + return std::make_shared(select_runtime(Runtime::DPDK), args...); + } + void start(); void stop(); @@ -37,6 +47,8 @@ class System { private: void register_actor(const std::string& id, const std::shared_ptr& actor); + std::shared_ptr select_runtime(Runtime::Type type); + std::shared_ptr impl_; }; diff --git a/src/dpdk-runtime.h b/src/dpdk-runtime.h deleted file mode 100644 index 5c611f5..0000000 --- a/src/dpdk-runtime.h +++ /dev/null @@ -1,6 +0,0 @@ -namespace lng { - -class DPDKRuntime { -}; - -} // lng diff --git a/src/runtime.cc b/src/runtime.cc new file mode 100644 index 0000000..014e252 --- /dev/null +++ b/src/runtime.cc @@ -0,0 +1,59 @@ +#if defined(LNG_WITH_DOCA) +#include "lng/doca-util.h" +#endif + +#if defined(LNG_WITH_DPDK) +#include +#include +#include +#include +#endif + +#include "lng/runtime.h" + +#include "log.h" + +namespace lng { + +#if defined(LNG_WITH_DOCA) +DOCARuntime::DOCARuntime() { + doca_log_backend_create_standard(); +} +#endif + +#if defined(LNG_WITH_DPDK) +void DPDKRuntime::start() { + // Initializion the environment abstraction layer + std::vector arguments = { "." }; + std::vector args; + for (auto& a : arguments) { + args.push_back(&a[0]); + } + args.push_back(nullptr); + + int ret = rte_eal_init(args.size() - 1, args.data()); + if (ret < 0) { + throw std::runtime_error("Cannot initialize DPDK"); + } + + // Allocates mempool to hold the mbufs + constexpr uint32_t n = 8192 - 1; + constexpr uint32_t cache_size = 256; + constexpr uint32_t data_room_size = RTE_PKTMBUF_HEADROOM + 10 * 1024; + + mbuf_pool_ = rte_pktmbuf_pool_create("mbuf_pool", n, cache_size, 0, data_room_size, rte_socket_id()); + if (mbuf_pool_ == nullptr) { + throw std::runtime_error(fmt::format("Cannot create mbuf pool, n={}, cache_size={}, priv_size=0, data_room_size={}", + n, cache_size, data_room_size)); + } + +} + +void DPDKRuntime::stop() { + rte_mempool_free(mbuf_pool_); + rte_eal_cleanup(); +} + +#endif + +} // lng diff --git a/src/stream.cc b/src/stream.cc index 696aa78..7755a2b 100644 --- a/src/stream.cc +++ b/src/stream.cc @@ -6,39 +6,18 @@ #endif #include "lng/stream.h" +#include "lng/runtime.h" #include "log.h" namespace lng { -DPDKStream::Impl::Impl(uint16_t port_id) - : port_id(port_id) +DPDKStream::Impl::Impl(const std::shared_ptr& rt, uint16_t port_id) + : rt(rt), port_id(port_id) { - constexpr uint32_t mtu = 9000; - - // Initializion the environment abstraction layer - std::vector arguments = { "." }; - std::vector args; - for (auto& a : arguments) { - args.push_back(&a[0]); - } - args.push_back(nullptr); + int ret = 0; - int ret = rte_eal_init(args.size() - 1, args.data()); - if (ret < 0) { - throw std::runtime_error("Cannot initialize DPDK"); - } - - // Allocates mempool to hold the mbufs - constexpr uint32_t n = 8192 - 1; - constexpr uint32_t cache_size = 256; - constexpr uint32_t data_room_size = RTE_PKTMBUF_HEADROOM + 10 * 1024; - - mbuf_pool = rte_pktmbuf_pool_create("mbuf_pool", n, cache_size, 0, data_room_size, rte_socket_id()); - if (mbuf_pool == nullptr) { - throw std::runtime_error(fmt::format("Cannot create mbuf pool, n={}, cache_size={}, priv_size=0, data_room_size={}", - n, cache_size, data_room_size)); - } + constexpr uint32_t mtu = 9000; // Initializing all ports if (!rte_eth_dev_is_valid_port(port_id)) { @@ -111,7 +90,7 @@ DPDKStream::Impl::Impl(uint16_t port_id) rte_eth_rxconf rxconf = dev_info.default_rxconf; // rxconf.offloads = port_conf.rxmode.offloads; for (auto q = 0; q < rx_rings; q++) { - ret = rte_eth_rx_queue_setup(port_id, q, rx_desc_size, rte_eth_dev_socket_id(port_id), &rxconf, mbuf_pool); + ret = rte_eth_rx_queue_setup(port_id, q, rx_desc_size, rte_eth_dev_socket_id(port_id), &rxconf, rt->get_mempool()); if (ret < 0) { throw std::runtime_error(fmt::format("Failed to setup Rx queue: {}", strerror(-ret))); } @@ -199,7 +178,6 @@ DPDKStream::Impl::~Impl() // std::cout << "Bandwidth : " << total_gbits/elapsed << " Gbps" << std::endl; } - rte_eal_cleanup(); } void DPDKStream::put(rte_mbuf* v) @@ -220,7 +198,7 @@ bool DPDKStream::get(rte_mbuf** vp) bool DPDKStream::Impl::send_flag_packet(rte_mbuf* recv_mbuf, size_t length, uint8_t tcp_flags) { - auto ack_mbuf = rte_pktmbuf_copy(recv_mbuf, mbuf_pool, 0, sizeof(rte_ipv4_hdr) + sizeof(rte_tcp_hdr) + sizeof(rte_ether_hdr)); + auto ack_mbuf = rte_pktmbuf_copy(recv_mbuf, rt->get_mempool(), 0, sizeof(rte_ipv4_hdr) + sizeof(rte_tcp_hdr) + sizeof(rte_ether_hdr)); auto* eth = rte_pktmbuf_mtod_offset(ack_mbuf, rte_ether_hdr*, 0); diff --git a/src/system.cc b/src/system.cc index 9056da0..80037fc 100644 --- a/src/system.cc +++ b/src/system.cc @@ -1,24 +1,13 @@ #include "lng/system.h" -#if defined(LNG_WITH_DOCA) -#include "lng/doca-util.h" -#endif - -#if defined(LNG_WITH_DPDK) -#include "dpdk-runtime.h" -#endif - +#include "lng/runtime.h" #include "log.h" namespace lng { struct System::Impl { - + std::unordered_map> runtimes; std::unordered_map> actors; - -#if defined(LNG_WITH_DPDK) - DPDKRuntime dpdk_rt; -#endif }; System::System() @@ -27,12 +16,21 @@ System::System() log::debug("System is initialized"); #if defined(LNG_WITH_DOCA) - doca_log_backend_create_standard(); + impl_->runtimes[Runtime::DOCA] = std::make_shared(); #endif + +#if defined(LNG_WITH_DPDK) + impl_->runtimes[Runtime::DPDK] = std::make_shared(); +#endif + } void System::start() { + for (auto& [_, rt] : impl_->runtimes) { + rt->start(); + } + // TODO: Considering tree dependency for (auto& [id, actor] : impl_->actors) { actor->start(); @@ -55,6 +53,10 @@ void System::stop() for (auto& [n, actor] : impl_->actors) { actor->wait_until(Actor::State::Ready); } + + for (auto& [_, rt] : impl_->runtimes) { + rt->stop(); + } } void System::terminate() @@ -74,4 +76,8 @@ void System::register_actor(const std::string& id, const std::shared_ptr& impl_->actors[id] = actor; } +std::shared_ptr System::select_runtime(Runtime::Type type) { + return impl_->runtimes[type]; +} + } // lng From c204ea584c74138aa57f8cd5afcc8587e0a24210 Mon Sep 17 00:00:00 2001 From: Takuro Iizuka Date: Sat, 6 Apr 2024 01:39:22 +0000 Subject: [PATCH 03/12] Separate Stream and Queueable interface --- include/lng/receiver-actor.h | 28 +++++++++---------- include/lng/stream.h | 23 ++++++++++++---- include/lng/system.h | 20 +++++++++++--- src/runtime.cc | 4 ++- src/stream.cc | 49 ++++++++++++++++++---------------- src/system.cc | 38 +++++++++++++++++++------- test/basic.cc | 14 +++++----- test/dpdk_echo_udp.cc | 24 +++++++---------- test/dpdk_frame_builder_tcp.cc | 40 ++++++++++++++------------- 9 files changed, 145 insertions(+), 95 deletions(-) diff --git a/include/lng/receiver-actor.h b/include/lng/receiver-actor.h index 58ab102..f2e14f3 100644 --- a/include/lng/receiver-actor.h +++ b/include/lng/receiver-actor.h @@ -9,9 +9,9 @@ class Receiver : public Actor { public: Receiver(const std::string& id, int cpu_id, - DPDKStream* dpdk_st, - Stream* valid, - Stream* ready) + const std::shared_ptr& dpdk_st, + const std::shared_ptr>& valid, + const std::shared_ptr>& ready) : Actor(id, cpu_id) , nic_stream_(dpdk_st) , vaild_payload_stream_(valid) @@ -23,19 +23,19 @@ class Receiver : public Actor { virtual void main() override; private: - DPDKStream* nic_stream_; - Stream* vaild_payload_stream_; - Stream* ready_payload_stream_; + std::shared_ptr nic_stream_; + std::shared_ptr> vaild_payload_stream_; + std::shared_ptr> ready_payload_stream_; }; class FrameBuilder : public Actor { public: FrameBuilder(const std::string& id, int cpu_id, - Stream* valid_payload, - Stream* ready_payload, - Stream* valid_frame, - Stream* ready_frame) + const std::shared_ptr>& valid_payload, + const std::shared_ptr>& ready_payload, + const std::shared_ptr>& valid_frame, + const std::shared_ptr>& ready_frame) : Actor(id, cpu_id) , vaild_payload_stream_(valid_payload) , ready_payload_stream_(ready_payload) @@ -51,10 +51,10 @@ class FrameBuilder : public Actor { virtual void main() override; private: - Stream* vaild_payload_stream_; - Stream* ready_payload_stream_; - Stream* vaild_frame_stream_; - Stream* ready_frame_stream_; + std::shared_ptr> vaild_payload_stream_; + std::shared_ptr> ready_payload_stream_; + std::shared_ptr> vaild_frame_stream_; + std::shared_ptr> ready_frame_stream_; size_t frame_id_; size_t write_head_; Frame* next_frame_; diff --git a/include/lng/stream.h b/include/lng/stream.h index ae0f22d..7072ce1 100644 --- a/include/lng/stream.h +++ b/include/lng/stream.h @@ -13,15 +13,21 @@ namespace lng { class DPDKRuntime; -template class Stream { public: + virtual void start() = 0; + virtual void stop() = 0; +}; + +template +class Queueable { + public: virtual void put(T v) = 0; virtual bool get(T* vp) = 0; }; template -class MemoryStream : public Stream { +class MemoryStream : public Stream, public Queueable { struct Impl { #ifdef BLOCKINGQUEUE @@ -37,6 +43,9 @@ class MemoryStream : public Stream { { } + virtual void start() {} + virtual void stop() {} + virtual void put(T v) { impl_->queue.enqueue(v); @@ -60,7 +69,7 @@ class MemoryStream : public Stream { #if defined(LNG_WITH_DOCA) || defined(LNG_WITH_DPDK) -class DPDKStream : public Stream { +class DPDKStream : public Stream, public Queueable { struct Impl { std::shared_ptr rt; @@ -71,8 +80,9 @@ class DPDKStream : public Stream { void wait_for_3wayhandshake(); bool check_target_packet(rte_mbuf* recv_mbuf); - Impl(const std::shared_ptr& rt, uint16_t port_id); - ~Impl(); + Impl(const std::shared_ptr& rt, uint16_t port_id) + : rt(rt), port_id(port_id) + {} private: bool send_flag_packet(rte_mbuf* tar, size_t length, uint8_t tcp_flags); @@ -84,6 +94,9 @@ class DPDKStream : public Stream { { } + virtual void start(); + virtual void stop(); + virtual void put(rte_mbuf* v); virtual bool get(rte_mbuf** vp); diff --git a/include/lng/system.h b/include/lng/system.h index ae368c8..b4ffb1a 100644 --- a/include/lng/system.h +++ b/include/lng/system.h @@ -30,11 +30,23 @@ class System { } template::value>::type* = nullptr, - typename std::enable_if::value>::type* = nullptr> + typename std::enable_if::value>::type* = nullptr > std::shared_ptr create_stream(Args... args) { - return std::make_shared(select_runtime(Runtime::DPDK), args...); + auto stream(std::make_shared(args...)); + register_stream(stream); + return stream; + } + + template::value>::type* = nullptr > + std::shared_ptr create_stream(Args... args) + { + register_runtime(Runtime::DPDK); + + auto stream(std::make_shared(std::dynamic_pointer_cast(select_runtime(Runtime::DPDK)), args...)); + register_stream(stream); + return stream; } void start(); @@ -46,6 +58,8 @@ class System { private: void register_actor(const std::string& id, const std::shared_ptr& actor); + void register_stream(const std::shared_ptr& stream); + void register_runtime(Runtime::Type type); std::shared_ptr select_runtime(Runtime::Type type); diff --git a/src/runtime.cc b/src/runtime.cc index 014e252..107d4f4 100644 --- a/src/runtime.cc +++ b/src/runtime.cc @@ -41,12 +41,14 @@ void DPDKRuntime::start() { constexpr uint32_t cache_size = 256; constexpr uint32_t data_room_size = RTE_PKTMBUF_HEADROOM + 10 * 1024; + log::info("Trying to create mempool"); + mbuf_pool_ = rte_pktmbuf_pool_create("mbuf_pool", n, cache_size, 0, data_room_size, rte_socket_id()); if (mbuf_pool_ == nullptr) { + log::error("Failed to create mempool"); throw std::runtime_error(fmt::format("Cannot create mbuf pool, n={}, cache_size={}, priv_size=0, data_room_size={}", n, cache_size, data_room_size)); } - } void DPDKRuntime::stop() { diff --git a/src/stream.cc b/src/stream.cc index 7755a2b..e87d723 100644 --- a/src/stream.cc +++ b/src/stream.cc @@ -12,13 +12,33 @@ namespace lng { -DPDKStream::Impl::Impl(const std::shared_ptr& rt, uint16_t port_id) - : rt(rt), port_id(port_id) +void DPDKStream::Impl::wait_for_3wayhandshake() +{ + while (true) { + rte_mbuf* v; + if (!rte_eth_rx_burst(port_id, 0, &v, 1)) { + continue; + } + auto* tcp = rte_pktmbuf_mtod_offset(v, rte_tcp_hdr*, sizeof(rte_ipv4_hdr) + sizeof(rte_ether_hdr)); + if (!(tcp->tcp_flags & RTE_TCP_SYN_FLAG)) + continue; + send_synack(v); + + tcp_port = tcp->dst_port; + + rte_pktmbuf_free(v); + break; + } +} + +void DPDKStream::start() { int ret = 0; constexpr uint32_t mtu = 9000; + auto port_id = impl_->port_id; + // Initializing all ports if (!rte_eth_dev_is_valid_port(port_id)) { throw std::runtime_error(fmt::format("Port {} is not valid", port_id)); @@ -90,7 +110,7 @@ DPDKStream::Impl::Impl(const std::shared_ptr& rt, uint16_t port_id) rte_eth_rxconf rxconf = dev_info.default_rxconf; // rxconf.offloads = port_conf.rxmode.offloads; for (auto q = 0; q < rx_rings; q++) { - ret = rte_eth_rx_queue_setup(port_id, q, rx_desc_size, rte_eth_dev_socket_id(port_id), &rxconf, rt->get_mempool()); + ret = rte_eth_rx_queue_setup(port_id, q, rx_desc_size, rte_eth_dev_socket_id(port_id), &rxconf, impl_->rt->get_mempool()); if (ret < 0) { throw std::runtime_error(fmt::format("Failed to setup Rx queue: {}", strerror(-ret))); } @@ -135,30 +155,13 @@ DPDKStream::Impl::Impl(const std::shared_ptr& rt, uint16_t port_id) log::info("Link status is {}", link.link_status ? "up" : "down"); - wait_for_3wayhandshake(); + impl_->wait_for_3wayhandshake(); } -void DPDKStream::Impl::wait_for_3wayhandshake() +void DPDKStream::stop() { - while (true) { - rte_mbuf* v; - if (!rte_eth_rx_burst(port_id, 0, &v, 1)) { - continue; - } - auto* tcp = rte_pktmbuf_mtod_offset(v, rte_tcp_hdr*, sizeof(rte_ipv4_hdr) + sizeof(rte_ether_hdr)); - if (!(tcp->tcp_flags & RTE_TCP_SYN_FLAG)) - continue; - send_synack(v); - - tcp_port = tcp->dst_port; + auto port_id = impl_->port_id; - rte_pktmbuf_free(v); - break; - } -} - -DPDKStream::Impl::~Impl() -{ auto ret = rte_eth_dev_stop(port_id); if (ret < 0) { log::error("Failed to stop device: {}", strerror(-ret)); diff --git a/src/system.cc b/src/system.cc index 80037fc..5b1015c 100644 --- a/src/system.cc +++ b/src/system.cc @@ -7,6 +7,7 @@ namespace lng { struct System::Impl { std::unordered_map> runtimes; + std::vector> streams; std::unordered_map> actors; }; @@ -14,15 +15,6 @@ System::System() : impl_(new Impl) { log::debug("System is initialized"); - -#if defined(LNG_WITH_DOCA) - impl_->runtimes[Runtime::DOCA] = std::make_shared(); -#endif - -#if defined(LNG_WITH_DPDK) - impl_->runtimes[Runtime::DPDK] = std::make_shared(); -#endif - } void System::start() @@ -31,6 +23,10 @@ void System::start() rt->start(); } + for (auto& st : impl_->streams) { + st->start(); + } + // TODO: Considering tree dependency for (auto& [id, actor] : impl_->actors) { actor->start(); @@ -54,6 +50,10 @@ void System::stop() actor->wait_until(Actor::State::Ready); } + for (auto& st : impl_->streams) { + st->stop(); + } + for (auto& [_, rt] : impl_->runtimes) { rt->stop(); } @@ -76,6 +76,26 @@ void System::register_actor(const std::string& id, const std::shared_ptr& impl_->actors[id] = actor; } +void System::register_stream(const std::shared_ptr& stream) { + impl_->streams.push_back(stream); +} + +void System::register_runtime(Runtime::Type type) { +#if defined(LNG_WITH_DOCA) + if (type == Runtime::DOCA && !impl_->runtimes.count(type)) { + impl_->runtimes[type] = std::make_shared(); + return; + } +#endif + +#if defined(LNG_WITH_DPDK) + if (type == Runtime::DPDK && !impl_->runtimes.count(type)) { + impl_->runtimes[type] = std::make_shared(); + return; + } +#endif +} + std::shared_ptr System::select_runtime(Runtime::Type type) { return impl_->runtimes[type]; } diff --git a/test/basic.cc b/test/basic.cc index 77182ef..45ec83f 100644 --- a/test/basic.cc +++ b/test/basic.cc @@ -11,7 +11,7 @@ using namespace lng; template class Producer : public Actor { public: - Producer(const std::string& id, int cpu_id, Stream* s) + Producer(const std::string& id, int cpu_id, const std::shared_ptr>& s) : Actor(id, cpu_id) , stream_(s) , v_(0) @@ -25,14 +25,14 @@ class Producer : public Actor { } private: - Stream* stream_; + std::shared_ptr> stream_; int v_; }; template class Consumer : public Actor { public: - Consumer(const std::string& id, int cpu_id, Stream* s) + Consumer(const std::string& id, int cpu_id, const std::shared_ptr> s) : Actor(id, cpu_id) , stream_(s) { @@ -48,7 +48,7 @@ class Consumer : public Actor { } private: - Stream* stream_; + std::shared_ptr> stream_; }; int main() @@ -56,10 +56,10 @@ int main() try { System sys; - MemoryStream stream; + auto stream(sys.create_stream>()); - auto consumer(sys.create_actor>("/consumer", 4, reinterpret_cast*>(&stream))); - auto producer(sys.create_actor>("/producer", 5, reinterpret_cast*>(&stream))); + auto consumer(sys.create_actor>("/consumer", 4, stream)); + auto producer(sys.create_actor>("/producer", 5, stream)); sys.start(); diff --git a/test/dpdk_echo_udp.cc b/test/dpdk_echo_udp.cc index 6f01b23..97ee892 100644 --- a/test/dpdk_echo_udp.cc +++ b/test/dpdk_echo_udp.cc @@ -20,7 +20,7 @@ void handler_sigint(int sig) class Receiver : public Actor { public: - Receiver(const std::string& id, int cpu_id, Stream* is, Stream* os) + Receiver(const std::string& id, int cpu_id, const std::shared_ptr>& is, const std::shared_ptr>& os) : Actor(id, cpu_id) , inner_stream_(is) , outer_stream_(os) @@ -38,13 +38,13 @@ class Receiver : public Actor { } private: - Stream* inner_stream_; - Stream* outer_stream_; + std::shared_ptr> inner_stream_; + std::shared_ptr> outer_stream_; }; class Sender : public Actor { public: - Sender(const std::string& id, int cpu_id, Stream* is, Stream* os) + Sender(const std::string& id, int cpu_id, const std::shared_ptr>& is, const std::shared_ptr>& os) : Actor(id, cpu_id) , inner_stream_(is) , outer_stream_(os) @@ -62,8 +62,8 @@ class Sender : public Actor { } private: - Stream* inner_stream_; - Stream* outer_stream_; + std::shared_ptr> inner_stream_; + std::shared_ptr> outer_stream_; }; // +----------+ +--------+ @@ -76,15 +76,11 @@ int main() System sys; - DPDKStream outer_stream(2); - MemoryStream inner_stream; + auto outer_stream(sys.create_stream(2)); + auto inner_stream(sys.create_stream>()); - auto receiver(sys.create_actor("/receiver", 4, - reinterpret_cast*>(&inner_stream), - reinterpret_cast*>(&outer_stream))); - auto sender(sys.create_actor("/sender", 5, - reinterpret_cast*>(&inner_stream), - reinterpret_cast*>(&outer_stream))); + auto receiver(sys.create_actor("/receiver", 4, inner_stream, outer_stream)); + auto sender(sys.create_actor("/sender", 5, inner_stream, outer_stream)); sys.start(); diff --git a/test/dpdk_frame_builder_tcp.cc b/test/dpdk_frame_builder_tcp.cc index 3105721..c636235 100644 --- a/test/dpdk_frame_builder_tcp.cc +++ b/test/dpdk_frame_builder_tcp.cc @@ -22,7 +22,9 @@ void handler_sigint(int sig) class FrameReceiver : public Actor { public: - FrameReceiver(const std::string& id, int cpu_id, Stream* valid_frame, Stream* ready_frame) + FrameReceiver(const std::string& id, int cpu_id, + const std::shared_ptr>& valid_frame, + const std::shared_ptr>& ready_frame) : Actor(id, cpu_id) , valid_stream_(valid_frame) , ready_stream_(ready_frame) @@ -40,8 +42,8 @@ class FrameReceiver : public Actor { } private: - Stream* valid_stream_; - Stream* ready_stream_; + std::shared_ptr> valid_stream_; + std::shared_ptr> ready_stream_; }; // +----------+ +--------+ @@ -54,37 +56,37 @@ int main() System sys; - DPDKStream dpdk_stream(1); - MemoryStream valid_frame_stream; - MemoryStream ready_frame_stream; - MemoryStream valid_payload_stream; - MemoryStream ready_payload_stream; + auto dpdk_stream = sys.create_stream(1); + auto valid_frame_stream = sys.create_stream>(); + auto ready_frame_stream = sys.create_stream>(); + auto valid_payload_stream = sys.create_stream>(); + auto ready_payload_stream = sys.create_stream>(); const int num_pays = 1024; std::unique_ptr pays[num_pays]; for (int i = 0; i < num_pays; ++i) { pays[i].reset(new Payloads); - ready_payload_stream.put(pays[i].get()); + ready_payload_stream->put(pays[i].get()); } const int num_frames = 8; std::unique_ptr frames[num_frames]; for (int i = 0; i < num_frames; ++i) { frames[i].reset(new Frame); - ready_frame_stream.put(frames[i].get()); + ready_frame_stream->put(frames[i].get()); } auto receiver(sys.create_actor("/frame/build/eth", 4, - &dpdk_stream, - &valid_payload_stream, - &ready_payload_stream)); + dpdk_stream, + valid_payload_stream, + ready_payload_stream)); auto frame_builder(sys.create_actor("/frame/build", 5, - &valid_payload_stream, - &ready_payload_stream, - &valid_frame_stream, - &ready_frame_stream)); + valid_payload_stream, + ready_payload_stream, + valid_frame_stream, + ready_frame_stream)); auto frame_receiver(sys.create_actor("/frame", 6, - &valid_frame_stream, - &ready_frame_stream)); + valid_frame_stream, + ready_frame_stream)); sys.start(); From e7b0b34ed229da20f6440f5d1dbd42e6a416b5ea Mon Sep 17 00:00:00 2001 From: Takuro Iizuka Date: Mon, 8 Apr 2024 21:29:26 +0000 Subject: [PATCH 04/12] Introduced setup and teardown which is called from actor thread when it transit from/to Running --- include/lng/actor.h | 2 ++ include/lng/receiver-actor.h | 1 + include/lng/stream.h | 5 +++++ src/actor.cc | 8 ++++++++ src/receiver-actor.cc | 13 +++++++++++-- src/stream.cc | 2 -- 6 files changed, 27 insertions(+), 4 deletions(-) diff --git a/include/lng/actor.h b/include/lng/actor.h index e4ec55a..bd0ec36 100644 --- a/include/lng/actor.h +++ b/include/lng/actor.h @@ -69,6 +69,8 @@ class Actor { void wait_until(State to); protected: + virtual void setup() {} + virtual void teardown() {} virtual void main() = 0; private: diff --git a/include/lng/receiver-actor.h b/include/lng/receiver-actor.h index f2e14f3..16b7204 100644 --- a/include/lng/receiver-actor.h +++ b/include/lng/receiver-actor.h @@ -20,6 +20,7 @@ class Receiver : public Actor { } protected: + virtual void setup() override; virtual void main() override; private: diff --git a/include/lng/stream.h b/include/lng/stream.h index f55b850..5965b0c 100644 --- a/include/lng/stream.h +++ b/include/lng/stream.h @@ -107,11 +107,16 @@ class DPDKStream : public Stream, public Queueable { { return impl_->send_ack(recv_mbuf, length); } + bool check_target_packet(rte_mbuf* recv_mbuf) { return impl_->check_target_packet(recv_mbuf); } + void wait_for_3wayhandshake() { + impl_->wait_for_3wayhandshake(); + } + private: std::shared_ptr impl_; }; diff --git a/src/actor.cc b/src/actor.cc index e45f960..91bbd54 100644 --- a/src/actor.cc +++ b/src/actor.cc @@ -87,12 +87,20 @@ void Actor::entry_point(Actor* obj) obj->impl_->cvar.notify_all(); } + if (from == State::Started && to == State::Running) { + obj->setup(); + } + if (to == State::Running) { obj->main(); } else if (to == State::Fin) { return; } + if (from == State::Stopped && to == State::Ready) { + obj->teardown(); + } + } catch (const std::exception& e) { log::error("Exception on {} : {}", obj->impl_->id, e.what()); } catch (...) { diff --git a/src/receiver-actor.cc b/src/receiver-actor.cc index 54f2ee7..05d7b42 100644 --- a/src/receiver-actor.cc +++ b/src/receiver-actor.cc @@ -1,10 +1,12 @@ -#include "lng/receiver-actor.h" - #include #include #include #include +#include "lng/receiver-actor.h" + +#include "log.h" + namespace lng { void Payloads::Clear() @@ -44,6 +46,13 @@ uint32_t Payloads::ExtractPayloads(rte_mbuf* mbuf) return payload_size; } +void Receiver::setup() +{ + log::debug("Receiver is waiting for 3-way handshake"); + nic_stream_->wait_for_3wayhandshake(); + log::debug("Receiver is awaked from 3-way handshake"); +} + void Receiver::main() { Payloads* pays; diff --git a/src/stream.cc b/src/stream.cc index a0d8f80..8e5ba0e 100644 --- a/src/stream.cc +++ b/src/stream.cc @@ -154,8 +154,6 @@ void DPDKStream::start() } log::info("Link status is {}", link.link_status ? "up" : "down"); - - impl_->wait_for_3wayhandshake(); } void DPDKStream::stop() From 4b3f2175ad2574a78ae409a04a23a46b60edd187 Mon Sep 17 00:00:00 2001 From: Takuro Iizuka Date: Mon, 8 Apr 2024 21:40:48 +0000 Subject: [PATCH 05/12] Fixed build error on non-DOCA environment --- CMakeLists.txt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CMakeLists.txt b/CMakeLists.txt index 25742a5..8034087 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -126,7 +126,9 @@ if(LNG_WITH_DOCA OR LNG_WITH_DPDK) add_executable(dpdk_frame_builder_tcp test/dpdk_frame_builder_tcp.cc) target_link_libraries(dpdk_frame_builder_tcp lng-core ${LIBDPDK_STATIC_LDFLAGS}) +endif() +if(LNG_WITH_DOCA) add_executable(test_doca_echo_udp test/doca_echo_udp.cc) target_link_libraries(test_doca_echo_udp lng-core ${LIBDPDK_STATIC_LDFLAGS}) From 11b9d5ee2b18777beb0f06bbe1d8e4348255b9d0 Mon Sep 17 00:00:00 2001 From: Takuro Iizuka Date: Tue, 9 Apr 2024 00:31:37 +0000 Subject: [PATCH 06/12] WIP: Reimplementing actor to be safe to transit state --- include/lng/receiver-actor.h | 3 +++ src/receiver-actor.cc | 36 ++++++++++++++++++------------------ 2 files changed, 21 insertions(+), 18 deletions(-) diff --git a/include/lng/receiver-actor.h b/include/lng/receiver-actor.h index 16b7204..eb1da18 100644 --- a/include/lng/receiver-actor.h +++ b/include/lng/receiver-actor.h @@ -16,6 +16,7 @@ class Receiver : public Actor { , nic_stream_(dpdk_st) , vaild_payload_stream_(valid) , ready_payload_stream_(ready) + , payloads_(nullptr) { } @@ -27,6 +28,8 @@ class Receiver : public Actor { std::shared_ptr nic_stream_; std::shared_ptr> vaild_payload_stream_; std::shared_ptr> ready_payload_stream_; + + Payloads *payloads_; }; class FrameBuilder : public Actor { diff --git a/src/receiver-actor.cc b/src/receiver-actor.cc index 05d7b42..54a69ff 100644 --- a/src/receiver-actor.cc +++ b/src/receiver-actor.cc @@ -55,31 +55,30 @@ void Receiver::setup() void Receiver::main() { - Payloads* pays; - if (!ready_payload_stream_->get(&pays, 1)) { + if (!payloads_) { + if (!ready_payload_stream_->get(&payloads_, 1)) { + return; + } + payloads_->Clear(); + } + + rte_mbuf* v; + if (!nic_stream_->get(&v, 1)) { return; - } else { - pays->Clear(); } - while (true) { - rte_mbuf* v; - if (nic_stream_->get(&v, 1)) { - // std::cout << "received " << v->pkt_len << " bytes" << std::endl; - if (!nic_stream_->check_target_packet(v)) { - continue; - } + if (!nic_stream_->check_target_packet(v)) { + return; + } - // TODO detect FIN and quit + // TODO detect FIN and quit + auto len = payloads_->ExtractPayloads(v); - auto len = pays->ExtractPayloads(v); + nic_stream_->send_ack(v, len); - nic_stream_->send_ack(v, len); + vaild_payload_stream_->put(&payloads_, 1); - vaild_payload_stream_->put(&pays, 1); - break; - } - } + payloads_ = nullptr; } #ifdef __AVX512F__ @@ -155,6 +154,7 @@ void FrameBuilder::main() bool complete = false; + // WIP/TODO: Make this loop transit-safe while (!complete) { Payloads* pays; if (vaild_payload_stream_->get(&pays, 1)) { From 01afff0965ab4f411fbbcea5ada7b179c92231c5 Mon Sep 17 00:00:00 2001 From: Takuro Iizuka Date: Tue, 9 Apr 2024 07:41:51 +0000 Subject: [PATCH 07/12] Fixed thread ordering --- include/lng/actor.h | 2 +- src/actor.cc | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/include/lng/actor.h b/include/lng/actor.h index bd0ec36..068393d 100644 --- a/include/lng/actor.h +++ b/include/lng/actor.h @@ -43,7 +43,7 @@ class Actor { int cpu_id; Impl(Actor* obj, const std::string& id, int cpu) - : th(entry_point, obj) + : th() , mutex() , cvar() , id(id) diff --git a/src/actor.cc b/src/actor.cc index 91bbd54..d5001c2 100644 --- a/src/actor.cc +++ b/src/actor.cc @@ -23,6 +23,7 @@ namespace { Actor::Actor(const std::string& id, int cpu_id) : impl_(new Impl(this, id, cpu_id)) { + impl_->th = std::move(std::thread(entry_point, this)); log::debug("{} is initialized", impl_->id); } From 3fde7df2bef6265082266765cbdcee8cd0dc0490 Mon Sep 17 00:00:00 2001 From: Takuro Iizuka Date: Tue, 9 Apr 2024 20:31:25 +0000 Subject: [PATCH 08/12] Defferred thread initialization --- src/actor.cc | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/src/actor.cc b/src/actor.cc index d5001c2..426de6b 100644 --- a/src/actor.cc +++ b/src/actor.cc @@ -23,12 +23,16 @@ namespace { Actor::Actor(const std::string& id, int cpu_id) : impl_(new Impl(this, id, cpu_id)) { - impl_->th = std::move(std::thread(entry_point, this)); log::debug("{} is initialized", impl_->id); } void Actor::start() { + if (impl_->state == State::Init) { + impl_->th = std::move(std::thread(entry_point, this)); + wait_until(State::Ready); + } + transit(State::Ready, State::Started); } @@ -60,6 +64,9 @@ static inline void set_affinity(int cpu_id) void Actor::entry_point(Actor* obj) { set_affinity(obj->impl_->cpu_id); + + obj->transit(State::Init, State::Ready); + while (true) { try { @@ -67,11 +74,9 @@ void Actor::entry_point(Actor* obj) State to; { std::unique_lock lock(obj->impl_->mutex); - obj->impl_->cvar.wait(lock, [&] { return obj->impl_->state == State::Init | obj->impl_->state == State::Running | obj->impl_->state == State::Started | obj->impl_->state == State::Stopped | obj->impl_->state == State::Terminated; }); + obj->impl_->cvar.wait(lock, [&] { return obj->impl_->state == State::Running | obj->impl_->state == State::Started | obj->impl_->state == State::Stopped | obj->impl_->state == State::Terminated; }); from = obj->impl_->state; - if (from == State::Init) { - to = State::Ready; - } else if (from == State::Running) { + if (from == State::Running) { to = from; } else if (from == State::Started) { to = State::Running; From 292bf96e352764fc112be1352f4f3986deecea9e Mon Sep 17 00:00:00 2001 From: Takuro Iizuka Date: Tue, 9 Apr 2024 22:00:57 +0000 Subject: [PATCH 09/12] Testing --- include/lng/stream.h | 12 ++++++++++++ src/doca-stream.cc | 12 ++++++++++++ src/receiver-actor.cc | 43 +++++++++++++++++++++++++++++++++++++++++-- src/stream.cc | 5 +++++ 4 files changed, 70 insertions(+), 2 deletions(-) diff --git a/include/lng/stream.h b/include/lng/stream.h index 5965b0c..8fcb116 100644 --- a/include/lng/stream.h +++ b/include/lng/stream.h @@ -33,6 +33,7 @@ class Queueable { public: virtual bool put(T* v, size_t count) = 0; virtual size_t get(T* vp, size_t max) = 0; + virtual size_t count() = 0; }; template @@ -65,6 +66,11 @@ class MemoryStream : public Stream, public Queueable { return impl_->queue.try_dequeue_bulk(vp, max); } + virtual size_t count() + { + return impl_->queue.size_approx(); + } + private: std::shared_ptr impl_; }; @@ -103,6 +109,8 @@ class DPDKStream : public Stream, public Queueable { virtual size_t get(rte_mbuf** vp, size_t max); + virtual size_t count(); + bool send_ack(rte_mbuf* recv_mbuf, uint32_t length) { return impl_->send_ack(recv_mbuf, length); @@ -168,6 +176,8 @@ class DOCAUDPStream : public Stream, public Queueable { { return impl_->get(vp, max); } + + virtual size_t count(); private: std::shared_ptr impl_; @@ -224,6 +234,8 @@ class DOCATCPStream : public Stream, public Queueable { return impl_->get(vp, max); } + virtual size_t count(); + private: std::shared_ptr impl_; }; diff --git a/src/doca-stream.cc b/src/doca-stream.cc index d6cafde..12b02c1 100644 --- a/src/doca-stream.cc +++ b/src/doca-stream.cc @@ -148,6 +148,12 @@ size_t DOCAUDPStream::Impl::get(uint8_t** vp, size_t max) return ret; } +size_t DOCAUDPStream::count() +{ + // TBD + return 0; +} + DOCATCPStream::Impl::Impl(std::string nic_addr, std::string gpu_addr) : sem_fr_idx(0) { @@ -258,6 +264,12 @@ size_t DOCATCPStream::Impl::get(uint8_t** vp, size_t max) return ret; } +size_t DOCATCPStream::count() +{ + // TBD + return 0; +} + } // lng #endif diff --git a/src/receiver-actor.cc b/src/receiver-actor.cc index 54a69ff..6fba37b 100644 --- a/src/receiver-actor.cc +++ b/src/receiver-actor.cc @@ -139,7 +139,7 @@ void lng_memcpy(uint8_t* dst, uint8_t* src, size_t size) void FrameBuilder::main() { - +#if 0 if (!next_frame_) { if (!ready_frame_stream_->get(&next_frame_, 1)) { return; @@ -182,5 +182,44 @@ void FrameBuilder::main() } frame->frame_id = this->frame_id_++; vaild_frame_stream_->put(&frame, 1); +#else + if (!next_frame_) { + if (!ready_frame_stream_->get(&next_frame_, 1)) { + return; + } + } + + Frame* frame = next_frame_; + + // WIP/TODO: Make this loop transit-safe + Payloads* pays; + if (!vaild_payload_stream_->get(&pays, 1)) { + return; + } + + for (int seg = 0; seg < pays->no_of_payload; seg++) { + auto len = pays->segments[seg].length; + if (write_head_ + len < Frame::frame_size) { + lng_memcpy(frame->body + write_head_, pays->segments[seg].payload, len); + write_head_ += len; + } else if (write_head_ < Frame::frame_size) { + size_t bytes_cur_frame = Frame::frame_size - write_head_; + size_t bytes_next_frame = len - bytes_cur_frame; + uint8_t* p = pays->segments[seg].payload; + lng_memcpy(frame->body + write_head_, p, bytes_cur_frame); + lng_memcpy(next_frame_->body, p + bytes_cur_frame, bytes_next_frame); + write_head_ = bytes_next_frame; + + frame->frame_id = this->frame_id_++; + vaild_frame_stream_->put(&frame, 1); + } else { + lng_memcpy(next_frame_->body + write_head_, pays->segments[seg].payload, len); + write_head_ += len; + } + } + + ready_payload_stream_->put(&pays, 1); +#endif } -} + +} // lng diff --git a/src/stream.cc b/src/stream.cc index 8e5ba0e..84e9735 100644 --- a/src/stream.cc +++ b/src/stream.cc @@ -195,6 +195,11 @@ size_t DPDKStream::get(rte_mbuf** vp, size_t max) return rte_eth_rx_burst(impl_->port_id, 0, vp, max); } +size_t DPDKStream::count() +{ + return rte_eth_rx_queue_count(impl_->port_id, 0); +} + bool DPDKStream::Impl::send_flag_packet(rte_mbuf* recv_mbuf, uint32_t length, uint8_t tcp_flags) { From e2f478c27aaf6fa985a0b28447be9a2cb75ca25c Mon Sep 17 00:00:00 2001 From: Takuro Iizuka Date: Wed, 10 Apr 2024 23:36:07 +0000 Subject: [PATCH 10/12] WIP: Implementing transit-safe actor --- .gitignore | 1 + include/lng/receiver-actor.h | 46 +++++++----- include/lng/stream.h | 29 ++++++-- src/receiver-actor.cc | 131 ++++++++++++++++++++++----------- test/dpdk_frame_builder_tcp.cc | 8 +- 5 files changed, 143 insertions(+), 72 deletions(-) diff --git a/.gitignore b/.gitignore index 74b19dc..f20b56f 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,5 @@ GPATH GTAGS GRTAGS build* +.vscode *~ diff --git a/include/lng/receiver-actor.h b/include/lng/receiver-actor.h index eb1da18..2029646 100644 --- a/include/lng/receiver-actor.h +++ b/include/lng/receiver-actor.h @@ -10,13 +10,13 @@ class Receiver : public Actor { Receiver(const std::string& id, int cpu_id, const std::shared_ptr& dpdk_st, - const std::shared_ptr>& valid, - const std::shared_ptr>& ready) + const std::shared_ptr>& valid, + const std::shared_ptr>& ready) : Actor(id, cpu_id) , nic_stream_(dpdk_st) - , vaild_payload_stream_(valid) + , valid_payload_stream_(valid) , ready_payload_stream_(ready) - , payloads_(nullptr) + , payload_(nullptr) { } @@ -26,28 +26,31 @@ class Receiver : public Actor { private: std::shared_ptr nic_stream_; - std::shared_ptr> vaild_payload_stream_; - std::shared_ptr> ready_payload_stream_; + std::shared_ptr> valid_payload_stream_; + std::shared_ptr> ready_payload_stream_; - Payloads *payloads_; + Payload *payload_; }; class FrameBuilder : public Actor { public: FrameBuilder(const std::string& id, int cpu_id, - const std::shared_ptr>& valid_payload, - const std::shared_ptr>& ready_payload, + const std::shared_ptr>& valid_payload, + const std::shared_ptr>& ready_payload, const std::shared_ptr>& valid_frame, const std::shared_ptr>& ready_frame) : Actor(id, cpu_id) - , vaild_payload_stream_(valid_payload) + , valid_payload_stream_(valid_payload) , ready_payload_stream_(ready_payload) - , vaild_frame_stream_(valid_frame) + , valid_frame_stream_(valid_frame) , ready_frame_stream_(ready_frame) + , payload_(nullptr) + , payload_segment_id_(0) + , payload_segment_read_offset_(0) + , frame_(nullptr) , frame_id_(0) - , write_head_(0) - , next_frame_(nullptr) + , frame_write_offset_(0) { } @@ -55,12 +58,21 @@ class FrameBuilder : public Actor { virtual void main() override; private: - std::shared_ptr> vaild_payload_stream_; - std::shared_ptr> ready_payload_stream_; - std::shared_ptr> vaild_frame_stream_; + std::shared_ptr> valid_payload_stream_; + std::shared_ptr> ready_payload_stream_; + std::shared_ptr> valid_frame_stream_; std::shared_ptr> ready_frame_stream_; + #if 0 + Frame* next_frame_; size_t frame_id_; size_t write_head_; - Frame* next_frame_; +#else + Payload* payload_; + size_t payload_segment_id_; + size_t payload_segment_read_offset_; + Frame* frame_; + size_t frame_id_; + size_t frame_write_offset_; + #endif }; } diff --git a/include/lng/stream.h b/include/lng/stream.h index 8fcb116..a96674a 100644 --- a/include/lng/stream.h +++ b/include/lng/stream.h @@ -241,24 +241,39 @@ class DOCATCPStream : public Stream, public Queueable { }; #endif - +#if 0 struct Segment { uint8_t* payload; uint16_t length; }; -struct Payloads { - static constexpr size_t max_payloads = 10; +struct Payload { + static constexpr size_t max_Payload = 10; rte_mbuf* buf = nullptr; - uint8_t no_of_payload = 0; - Segment segments[max_payloads]; + uint8_t segments_num = 0; + Segment segments[max_Payload]; size_t dropped_bytes = 0; void Clear(); - uint32_t ExtractPayloads(rte_mbuf* mbuf); + uint32_t ExtractPayload(rte_mbuf* mbuf); +}; +#else +struct Segment { + uint8_t* addr; + uint16_t size; }; +struct Payload { + static constexpr size_t segments_max = 10; + rte_mbuf* buf = nullptr; + uint8_t segments_num = 0; + Segment segments[segments_max]; + size_t dropped_bytes = 0; + void Clear(); + uint32_t ExtractPayload(rte_mbuf* mbuf); +}; +#endif struct Frame { - static constexpr size_t frame_size = 64 * 1024 * 1024; + static constexpr size_t frame_size = 256;//64 * 1024 * 1024; size_t frame_id; uint8_t body[frame_size]; }; diff --git a/src/receiver-actor.cc b/src/receiver-actor.cc index 6fba37b..c6f71dc 100644 --- a/src/receiver-actor.cc +++ b/src/receiver-actor.cc @@ -9,16 +9,16 @@ namespace lng { -void Payloads::Clear() +void Payload::Clear() { if (this->buf) { rte_pktmbuf_free(this->buf); } this->buf = nullptr; - this->no_of_payload = 0; + this->segments_num = 0; } -uint32_t Payloads::ExtractPayloads(rte_mbuf* mbuf) +uint32_t Payload::ExtractPayload(rte_mbuf* mbuf) { this->buf = mbuf; @@ -26,15 +26,15 @@ uint32_t Payloads::ExtractPayloads(rte_mbuf* mbuf) int header_size = sizeof(rte_ether_hdr) + sizeof(rte_ipv4_hdr) + sizeof(rte_tcp_hdr); while (seg) { - if (this->no_of_payload >= max_payloads) { + if (this->segments_num >= segments_max) { throw std::runtime_error("# of payload overflow"); } uint16_t seg_len = rte_pktmbuf_data_len(seg) - header_size; if (seg_len > 0) { - this->segments[this->no_of_payload].payload = rte_pktmbuf_mtod_offset(seg, uint8_t*, header_size); - this->segments[this->no_of_payload].length = rte_pktmbuf_data_len(seg) - header_size; - this->no_of_payload++; + this->segments[this->segments_num].addr = rte_pktmbuf_mtod_offset(seg, uint8_t*, header_size); + this->segments[this->segments_num].size = rte_pktmbuf_data_len(seg) - header_size; + this->segments_num++; } seg = seg->next; @@ -54,12 +54,41 @@ void Receiver::setup() } void Receiver::main() +#if 0 +{ + Payload* pays; + if (!ready_payload_stream_->get(&pays, 1)) { + return; + } else { + pays->Clear(); + } + + while (true) { + rte_mbuf* v; + if (nic_stream_->get(&v, 1)) { + // std::cout << "received " << v->pkt_len << " bytes" << std::endl; + if (!nic_stream_->check_target_packet(v)) { + continue; + } + + // TODO detect FIN and quit + + auto len = pays->ExtractPayload(v); + + nic_stream_->send_ack(v, len); + + valid_payload_stream_->put(&pays, 1); + break; + } + } +} +#else { - if (!payloads_) { - if (!ready_payload_stream_->get(&payloads_, 1)) { + if (!payload_) { + if (!ready_payload_stream_->get(&payload_, 1)) { return; } - payloads_->Clear(); + payload_->Clear(); } rte_mbuf* v; @@ -72,14 +101,17 @@ void Receiver::main() } // TODO detect FIN and quit - auto len = payloads_->ExtractPayloads(v); + auto len = payload_->ExtractPayload(v); nic_stream_->send_ack(v, len); - vaild_payload_stream_->put(&payloads_, 1); + valid_payload_stream_->put(&payload_, 1); + log::trace("r1"); - payloads_ = nullptr; + payload_ = nullptr; } +#endif + #ifdef __AVX512F__ @@ -156,9 +188,9 @@ void FrameBuilder::main() // WIP/TODO: Make this loop transit-safe while (!complete) { - Payloads* pays; - if (vaild_payload_stream_->get(&pays, 1)) { - for (int seg = 0; seg < pays->no_of_payload; seg++) { + Payload* pays; + if (valid_payload_stream_->get(&pays, 1)) { + for (int seg = 0; seg < pays->segments_num; seg++) { auto len = pays->segments[seg].length; if (write_head_ + len < Frame::frame_size) { lng_memcpy(frame->body + write_head_, pays->segments[seg].payload, len); @@ -181,44 +213,55 @@ void FrameBuilder::main() } } frame->frame_id = this->frame_id_++; - vaild_frame_stream_->put(&frame, 1); + valid_frame_stream_->put(&frame, 1); #else - if (!next_frame_) { - if (!ready_frame_stream_->get(&next_frame_, 1)) { + if (!frame_) { + if (!ready_frame_stream_->get(&frame_, 1)) { return; } } - Frame* frame = next_frame_; - - // WIP/TODO: Make this loop transit-safe - Payloads* pays; - if (!vaild_payload_stream_->get(&pays, 1)) { - return; + if (!payload_) { + if (!valid_payload_stream_->get(&payload_, 1)) { + return; + } + log::trace("0"); } - - for (int seg = 0; seg < pays->no_of_payload; seg++) { - auto len = pays->segments[seg].length; - if (write_head_ + len < Frame::frame_size) { - lng_memcpy(frame->body + write_head_, pays->segments[seg].payload, len); - write_head_ += len; - } else if (write_head_ < Frame::frame_size) { - size_t bytes_cur_frame = Frame::frame_size - write_head_; - size_t bytes_next_frame = len - bytes_cur_frame; - uint8_t* p = pays->segments[seg].payload; - lng_memcpy(frame->body + write_head_, p, bytes_cur_frame); - lng_memcpy(next_frame_->body, p + bytes_cur_frame, bytes_next_frame); - write_head_ = bytes_next_frame; - - frame->frame_id = this->frame_id_++; - vaild_frame_stream_->put(&frame, 1); + + int seg; + for (seg = payload_segment_id_; seg < payload_->segments_num; seg++) { + auto segment_size = payload_->segments[seg].size - payload_segment_read_offset_; + if (frame_write_offset_ + segment_size < Frame::frame_size) { + log::trace("1:w({:5}) <- r({:5},{:5}) len({:5}) val({:#x})", frame_write_offset_, payload_segment_id_, payload_segment_read_offset_, segment_size, *(payload_->segments[seg].addr + payload_segment_read_offset_)); + lng_memcpy(frame_->body + frame_write_offset_, payload_->segments[seg].addr + payload_segment_read_offset_, segment_size); + payload_segment_read_offset_ = 0; + frame_write_offset_ += segment_size; + } else if (frame_write_offset_ < Frame::frame_size) { + size_t copy_size = Frame::frame_size - frame_write_offset_; + log::trace("2:w({:5}) <- r({:5},{:5}) len({:5}) val({:#x})", frame_write_offset_, payload_segment_id_, payload_segment_read_offset_, copy_size, *(payload_->segments[seg].addr + payload_segment_read_offset_)); + lng_memcpy(frame_->body + frame_write_offset_, payload_->segments[seg].addr + payload_segment_read_offset_, copy_size); + payload_segment_id_ = seg; + payload_segment_read_offset_ = segment_size - copy_size; + frame_->frame_id = this->frame_id_++; + valid_frame_stream_->put(&frame_, 1); + frame_ = nullptr; + frame_write_offset_ = 0; + break; } else { - lng_memcpy(next_frame_->body + write_head_, pays->segments[seg].payload, len); - write_head_ += len; + log::trace("3:w({:5}) <- r({:5},{:5}) len({:5}) val({:#x})", frame_write_offset_, payload_segment_id_, payload_segment_read_offset_, segment_size, *(payload_->segments[seg].addr + payload_segment_read_offset_)); + lng_memcpy(frame_->body + frame_write_offset_, payload_->segments[seg].addr + payload_segment_read_offset_, segment_size); + payload_segment_read_offset_ = 0; + frame_write_offset_ += segment_size; } } - ready_payload_stream_->put(&pays, 1); + if (seg == payload_->segments_num && payload_segment_read_offset_ == 0) { + log::trace("4"); + ready_payload_stream_->put(&payload_, 1); + payload_ = nullptr; + payload_segment_id_ = 0; + payload_segment_read_offset_ = 0; + } #endif } diff --git a/test/dpdk_frame_builder_tcp.cc b/test/dpdk_frame_builder_tcp.cc index b6e2662..c1651d2 100644 --- a/test/dpdk_frame_builder_tcp.cc +++ b/test/dpdk_frame_builder_tcp.cc @@ -69,13 +69,13 @@ int main() auto dpdk_stream = sys.create_stream(3); auto valid_frame_stream = sys.create_stream>(); auto ready_frame_stream = sys.create_stream>(); - auto valid_payload_stream = sys.create_stream>(); - auto ready_payload_stream = sys.create_stream>(); + auto valid_payload_stream = sys.create_stream>(); + auto ready_payload_stream = sys.create_stream>(); const int num_pays = 1024; - std::unique_ptr pays[num_pays]; + std::unique_ptr pays[num_pays]; for (int i = 0; i < num_pays; ++i) { - pays[i].reset(new Payloads); + pays[i].reset(new Payload); auto p = pays[i].get(); ready_payload_stream->put(&p, 1); } From f83b39ba96889069553d046217983279ee7bcd2b Mon Sep 17 00:00:00 2001 From: Takuro Iizuka Date: Fri, 12 Apr 2024 00:11:17 +0000 Subject: [PATCH 11/12] Fixed CPU actor to behave as expected --- src/receiver-actor.cc | 112 ++++++++++-------------------------------- src/stream.cc | 16 ++++++ 2 files changed, 41 insertions(+), 87 deletions(-) diff --git a/src/receiver-actor.cc b/src/receiver-actor.cc index c6f71dc..7268e37 100644 --- a/src/receiver-actor.cc +++ b/src/receiver-actor.cc @@ -54,35 +54,6 @@ void Receiver::setup() } void Receiver::main() -#if 0 -{ - Payload* pays; - if (!ready_payload_stream_->get(&pays, 1)) { - return; - } else { - pays->Clear(); - } - - while (true) { - rte_mbuf* v; - if (nic_stream_->get(&v, 1)) { - // std::cout << "received " << v->pkt_len << " bytes" << std::endl; - if (!nic_stream_->check_target_packet(v)) { - continue; - } - - // TODO detect FIN and quit - - auto len = pays->ExtractPayload(v); - - nic_stream_->send_ack(v, len); - - valid_payload_stream_->put(&pays, 1); - break; - } - } -} -#else { if (!payload_) { if (!ready_payload_stream_->get(&payload_, 1)) { @@ -106,12 +77,9 @@ void Receiver::main() nic_stream_->send_ack(v, len); valid_payload_stream_->put(&payload_, 1); - log::trace("r1"); payload_ = nullptr; } -#endif - #ifdef __AVX512F__ @@ -171,50 +139,6 @@ void lng_memcpy(uint8_t* dst, uint8_t* src, size_t size) void FrameBuilder::main() { -#if 0 - if (!next_frame_) { - if (!ready_frame_stream_->get(&next_frame_, 1)) { - return; - } - } - - Frame* frame = next_frame_; - - if (!ready_frame_stream_->get(&next_frame_, 1)) { - return; - } - - bool complete = false; - - // WIP/TODO: Make this loop transit-safe - while (!complete) { - Payload* pays; - if (valid_payload_stream_->get(&pays, 1)) { - for (int seg = 0; seg < pays->segments_num; seg++) { - auto len = pays->segments[seg].length; - if (write_head_ + len < Frame::frame_size) { - lng_memcpy(frame->body + write_head_, pays->segments[seg].payload, len); - write_head_ += len; - } else if (write_head_ < Frame::frame_size) { - size_t bytes_cur_frame = Frame::frame_size - write_head_; - size_t bytes_next_frame = len - bytes_cur_frame; - uint8_t* p = pays->segments[seg].payload; - lng_memcpy(frame->body + write_head_, p, bytes_cur_frame); - lng_memcpy(next_frame_->body, p + bytes_cur_frame, bytes_next_frame); - write_head_ = bytes_next_frame; - - complete = true; - } else { - lng_memcpy(next_frame_->body + write_head_, pays->segments[seg].payload, len); - write_head_ += len; - } - } - ready_payload_stream_->put(&pays, 1); - } - } - frame->frame_id = this->frame_id_++; - valid_frame_stream_->put(&frame, 1); -#else if (!frame_) { if (!ready_frame_stream_->get(&frame_, 1)) { return; @@ -225,44 +149,58 @@ void FrameBuilder::main() if (!valid_payload_stream_->get(&payload_, 1)) { return; } - log::trace("0"); } int seg; for (seg = payload_segment_id_; seg < payload_->segments_num; seg++) { auto segment_size = payload_->segments[seg].size - payload_segment_read_offset_; - if (frame_write_offset_ + segment_size < Frame::frame_size) { - log::trace("1:w({:5}) <- r({:5},{:5}) len({:5}) val({:#x})", frame_write_offset_, payload_segment_id_, payload_segment_read_offset_, segment_size, *(payload_->segments[seg].addr + payload_segment_read_offset_)); - lng_memcpy(frame_->body + frame_write_offset_, payload_->segments[seg].addr + payload_segment_read_offset_, segment_size); - payload_segment_read_offset_ = 0; - frame_write_offset_ += segment_size; - } else if (frame_write_offset_ < Frame::frame_size) { + if (frame_write_offset_ + segment_size > Frame::frame_size) { + // The payload has to be split into two frames size_t copy_size = Frame::frame_size - frame_write_offset_; - log::trace("2:w({:5}) <- r({:5},{:5}) len({:5}) val({:#x})", frame_write_offset_, payload_segment_id_, payload_segment_read_offset_, copy_size, *(payload_->segments[seg].addr + payload_segment_read_offset_)); + log::trace("1:w({:5}) <- r({:5},{:5}) len({:5}) val({:#x})", frame_write_offset_, payload_segment_id_, payload_segment_read_offset_, copy_size, *(payload_->segments[seg].addr + payload_segment_read_offset_)); lng_memcpy(frame_->body + frame_write_offset_, payload_->segments[seg].addr + payload_segment_read_offset_, copy_size); + + // Store payload segment id and read offset for the next frame payload_segment_id_ = seg; payload_segment_read_offset_ = segment_size - copy_size; + + // Frame is pushed to the stream + frame_->frame_id = this->frame_id_++; + valid_frame_stream_->put(&frame_, 1); + frame_ = nullptr; + frame_write_offset_ = 0; + break; + } else if (frame_write_offset_ + segment_size == Frame::frame_size) { + // The payload can fit into the frame exactly + log::trace("2:w({:5}) <- r({:5},{:5}) len({:5}) val({:#x})", frame_write_offset_, payload_segment_id_, payload_segment_read_offset_, segment_size, *(payload_->segments[seg].addr + payload_segment_read_offset_)); + lng_memcpy(frame_->body + frame_write_offset_, payload_->segments[seg].addr + payload_segment_read_offset_, segment_size); + + // Payload segment id and read offset should be point to the next segment because the payload is fully copied + payload_segment_id_ = seg + 1; + payload_segment_read_offset_ = 0; + + // Frame is pushed to the stream frame_->frame_id = this->frame_id_++; valid_frame_stream_->put(&frame_, 1); frame_ = nullptr; frame_write_offset_ = 0; break; } else { + // The payload can fit into the frame with some space left log::trace("3:w({:5}) <- r({:5},{:5}) len({:5}) val({:#x})", frame_write_offset_, payload_segment_id_, payload_segment_read_offset_, segment_size, *(payload_->segments[seg].addr + payload_segment_read_offset_)); lng_memcpy(frame_->body + frame_write_offset_, payload_->segments[seg].addr + payload_segment_read_offset_, segment_size); payload_segment_read_offset_ = 0; frame_write_offset_ += segment_size; } } - + + // Release payload if it is fully copied if (seg == payload_->segments_num && payload_segment_read_offset_ == 0) { - log::trace("4"); ready_payload_stream_->put(&payload_, 1); payload_ = nullptr; payload_segment_id_ = 0; payload_segment_read_offset_ = 0; } -#endif } } // lng diff --git a/src/stream.cc b/src/stream.cc index 84e9735..2794fde 100644 --- a/src/stream.cc +++ b/src/stream.cc @@ -14,6 +14,7 @@ namespace lng { void DPDKStream::Impl::wait_for_3wayhandshake() { + // Wait SYN, send SYN-ACK while (true) { rte_mbuf* v; if (!rte_eth_rx_burst(port_id, 0, &v, 1)) { @@ -29,6 +30,21 @@ void DPDKStream::Impl::wait_for_3wayhandshake() rte_pktmbuf_free(v); break; } + + // Wait ACK + while (true) { + rte_mbuf* v; + if (!rte_eth_rx_burst(port_id, 0, &v, 1)) { + continue; + } + + auto* tcp = rte_pktmbuf_mtod_offset(v, rte_tcp_hdr*, sizeof(rte_ipv4_hdr) + sizeof(rte_ether_hdr)); + if (!(tcp->tcp_flags & RTE_TCP_ACK_FLAG)) + continue; + + rte_pktmbuf_free(v); + break; + } } void DPDKStream::start() From 4ae4172794ec79ea0728955f01043b6859e0c145 Mon Sep 17 00:00:00 2001 From: Takuro Iizuka Date: Fri, 12 Apr 2024 03:35:13 +0000 Subject: [PATCH 12/12] Fix naming convention --- CMakeLists.txt | 4 ++-- test/{dpdk_frame_builder_tcp.cc => dpdk_build_frame_tcp.cc} | 0 2 files changed, 2 insertions(+), 2 deletions(-) rename test/{dpdk_frame_builder_tcp.cc => dpdk_build_frame_tcp.cc} (100%) diff --git a/CMakeLists.txt b/CMakeLists.txt index 8034087..c78ba2a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -124,8 +124,8 @@ if(LNG_WITH_DOCA OR LNG_WITH_DPDK) add_executable(test_dpdk_echo_udp test/dpdk_echo_udp.cc) target_link_libraries(test_dpdk_echo_udp lng-core ${LIBDPDK_STATIC_LDFLAGS}) - add_executable(dpdk_frame_builder_tcp test/dpdk_frame_builder_tcp.cc) - target_link_libraries(dpdk_frame_builder_tcp lng-core ${LIBDPDK_STATIC_LDFLAGS}) + add_executable(test_dpdk_build_frame_tcp test/dpdk_build_frame_tcp.cc) + target_link_libraries(test_dpdk_build_frame_tcp lng-core ${LIBDPDK_STATIC_LDFLAGS}) endif() if(LNG_WITH_DOCA) diff --git a/test/dpdk_frame_builder_tcp.cc b/test/dpdk_build_frame_tcp.cc similarity index 100% rename from test/dpdk_frame_builder_tcp.cc rename to test/dpdk_build_frame_tcp.cc