Skip to content

Commit

Permalink
Merge pull request #14 from fixstars/feature/develop
Browse files Browse the repository at this point in the history
doca gpunetio and dpdk gpu direct, tcp and udp
  • Loading branch information
iitaku committed Jun 28, 2024
2 parents 65102db + d2b75a3 commit 6dedefd
Show file tree
Hide file tree
Showing 34 changed files with 4,989 additions and 492 deletions.
24 changes: 23 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ file(GENERATE OUTPUT ${PROJECT_BINARY_DIR}/VERSION CONTENT "${LNG_KIT_VERSION_S}
set(LNG_CORE_SRC
src/actor.cc
src/receiver-actor.cc
src/receiver-actor.cu
src/event.cc
src/log.cc
src/runtime.cc
Expand All @@ -67,6 +68,7 @@ set(LNG_CORE_SRC
)
if(LNG_WITH_CUDA AND LNG_WITH_DOCA)
list(APPEND LNG_CORE_SRC
src/receiver-actor-gpu.cc
src/protocol_tcp.cu
src/protocol_udp.cu)
endif()
Expand All @@ -76,7 +78,8 @@ if(LNG_WITH_DOCA)
src/doca-common-util-internal.cc
src/doca-stream.cc
src/doca-tcp-util.cc
src/doca-udp-util.cc)
src/doca-udp-util.cc
)
endif()
add_library(lng-core ${LNG_CORE_SRC})

Expand Down Expand Up @@ -126,6 +129,22 @@ if(LNG_WITH_DOCA OR LNG_WITH_DPDK)

add_executable(test_dpdk_build_frame_tcp test/dpdk_build_frame_tcp.cc)
target_link_libraries(test_dpdk_build_frame_tcp lng-core ${LIBDPDK_STATIC_LDFLAGS})

add_executable(test_dpdk_gpu_build_frame_udp test/dpdk_gpu_build_frame_udp.cc)
target_include_directories(
test_dpdk_gpu_build_frame_udp
PRIVATE
${CMAKE_CUDA_TOOLKIT_INCLUDE_DIRECTORIES}
)
target_link_libraries(test_dpdk_gpu_build_frame_udp lng-core ${LIBDPDK_STATIC_LDFLAGS})

add_executable(test_dpdk_gpu_build_frame_tcp test/dpdk_gpu_build_frame_tcp.cc)
target_include_directories(
test_dpdk_gpu_build_frame_tcp
PRIVATE
${CMAKE_CUDA_TOOLKIT_INCLUDE_DIRECTORIES}
)
target_link_libraries(test_dpdk_gpu_build_frame_tcp lng-core ${LIBDPDK_STATIC_LDFLAGS})
endif()

if(LNG_WITH_DOCA)
Expand All @@ -134,6 +153,9 @@ if(LNG_WITH_DOCA)

add_executable(test_doca_build_frame_tcp test/doca_build_frame_tcp.cc)
target_link_libraries(test_doca_build_frame_tcp lng-core ${LIBDPDK_STATIC_LDFLAGS})

add_executable(test_doca_build_frame_udp test/doca_build_frame_udp.cc)
target_link_libraries(test_doca_build_frame_udp lng-core ${LIBDPDK_STATIC_LDFLAGS})
endif()

# if(LNG_WITH_NVIDIA)
Expand Down
91 changes: 79 additions & 12 deletions app/client_tcp.cc
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ struct client_args {
std::vector<uint8_t> send_buf;
uint32_t send_buf_num;
uint32_t chunk_size;
uint32_t check_ack_freq;
bool ignore_ack;
};

int init_rte_env(void* arg1)
Expand Down Expand Up @@ -390,7 +392,7 @@ void init(
const std::string& log_level)
{

std::vector<std::string> arguments = { ".", "--lcores", lcores, "--socket-mem", socket_mem, "--iova-mode", iova_mode, "--log-level", log_level, "--file-prefix", dev_pci_addrs[0] };
std::vector<std::string> arguments = { ".", "--lcores", lcores, "--socket-mem", socket_mem, "--iova-mode", iova_mode, "--log-level", log_level, "--file-prefix", dev_pci_addrs[0] + lcores };

for (auto& dev_pci_addr : dev_pci_addrs) {
arguments.push_back("-a");
Expand Down Expand Up @@ -542,10 +544,17 @@ int sending_tcp_data(void* arg1)
uint16_t nb_txd = arg->nb_txd;

uint32_t seqn = arg->seqn;
uint32_t wait_seqn = arg->seqn;
uint32_t ackn = arg->ackn;

uint32_t check_ack_freq = arg->check_ack_freq;

const size_t chunk_size = arg->chunk_size; // 16MiB

bool ignore_ack = arg->ignore_ack;

bool is_first = true;

// NOTE: Assume frame index is always zero to make things easy
std::vector<std::tuple<std::vector<struct rte_mbuf*>, size_t>> bss;
{
Expand All @@ -565,8 +574,18 @@ int sending_tcp_data(void* arg1)

auto ts1 = std::chrono::high_resolution_clock::now();

size_t tx_time = 0;
size_t rtt_measured = 0;

double sum_rtt = 0;
double sum2_rtt = 0;
double max_rtt = -1;
double min_rtt = 1000000;

while (g_running) {

auto tx_st = std::chrono::high_resolution_clock::now();

for (const auto& bs_info : bss) {
const auto& reference_bs = std::get<0>(bs_info);
const auto& payload_size = std::get<1>(bs_info);
Expand All @@ -584,20 +603,45 @@ int sending_tcp_data(void* arg1)
seqn += rte_pktmbuf_pkt_len(b) - b->l2_len - b->l3_len - b->l4_len;
}

if (!is_first) {
if (!ignore_ack && ((tx_time + 1) % check_ack_freq == 0)) {
auto ns = wait_packet(
dev_port_id,
[&]() {
return g_running;
},
[&](const rte_ipv4_hdr*, const rte_tcp_hdr* tcp) {
return (tcp->tcp_flags & RTE_TCP_ACK_FLAG) && (tcp->dst_port == rte_cpu_to_be_16(arg->client_port)) && (rte_be_to_c\
pu_32(tcp->recv_ack) == wait_seqn);
});
auto tx_ed = std::chrono::high_resolution_clock::now();

double elapsed = std::chrono::duration_cast<std::chrono::nanoseconds>(tx_ed - tx_st).count() / 1000.0;
if (tx_time > 10) {
rtt_measured++;
sum_rtt += elapsed;
sum2_rtt += elapsed * elapsed;
max_rtt = std::max(max_rtt, elapsed);
min_rtt = std::min(min_rtt, elapsed);
}
}
} else {
is_first = false;
}
wait_seqn = seqn;

// Transmit
auto nb = rte_eth_tx_burst(dev_port_id, 0, bs.data(), bs.size());
if (nb != bs.size()) {
throw std::runtime_error("Failed to send data");
if (tx_time % check_ack_freq == 0) {
tx_st = std::chrono::high_resolution_clock::now();
}

size_t transmitted_num = 0;
while (transmitted_num < bs.size()) {
auto nb = rte_eth_tx_burst(dev_port_id, 0, bs.data() + transmitted_num, bs.size() - transmitted_num);
transmitted_num += nb;
}

auto ns = wait_packet(
dev_port_id,
[&]() {
return g_running;
},
[&](const rte_ipv4_hdr*, const rte_tcp_hdr* tcp) {
return (tcp->tcp_flags & RTE_TCP_ACK_FLAG) && (tcp->dst_port == rte_cpu_to_be_16(arg->client_port)) && (rte_be_to_cpu_32(tcp->recv_ack) == seqn);
});
tx_time++;

if (!g_running) {
break;
Expand Down Expand Up @@ -642,6 +686,16 @@ int sending_tcp_data(void* arg1)
if (!rte_eth_stats_get(dev_port_id, &stats)) {
print_statistics(elapsed, stats.obytes);
}

if (n > 0) {
printf("*****************************\n");
printf("average rtt : %f usec \n", sum_rtt / rtt_measured);
printf("std_dev rtt : %f usec \n", std::sqrt(sum2_rtt / rtt_measured - (sum_rtt / rtt_measured) * (sum_rtt / rtt_measured)));
printf("minimum rtt : %f usec \n", (double)min_rtt);
printf("maximum rtt : %f usec \n", (double)max_rtt);
printf("*****************************\n");
}

return 0;
}

Expand Down Expand Up @@ -858,10 +912,19 @@ int main(int argc, char** argv)
.help("specify the chunk size for one rx")
.scan<'u', uint32_t>();

program.add_argument("--check_ack_interval")
.default_value<uint32_t>(1)
.help("specify checking ack frequency")
.scan<'u', uint32_t>();

program.add_argument("--output_sent_file")
.default_value<std::string>("")
.help("specify the filename");

program.add_argument("--ignore_ack")
.help("switch ignore ack")
.flag();

try {
program.parse_args(argc, argv);
} catch (const std::exception& err) {
Expand All @@ -886,6 +949,8 @@ int main(int argc, char** argv)
uint32_t frame_num = program.get<uint32_t>("--frame_num");
uint32_t chunk_size = program.get<uint32_t>("--chunk_size");
std::string output_file = program.get<std::string>("--output_sent_file");
uint32_t check_ack_freq = program.get<uint32_t>("--check_ack_interval");
bool ignore_ack = (program["--ignore_ack"] == true);

auto socket_mem = get_socket_mem(lcores);

Expand Down Expand Up @@ -916,6 +981,8 @@ int main(int argc, char** argv)
client_argses.at(i).send_buf = send_buf;
client_argses.at(i).send_buf_num = frame_num;
client_argses.at(i).chunk_size = chunk_size;
client_argses.at(i).check_ack_freq = check_ack_freq;
client_argses.at(i).ignore_ack = ignore_ack;
}

std::sort(client_argses.begin(), client_argses.end(),
Expand Down
31 changes: 23 additions & 8 deletions app/client_udp.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@

namespace fs = std::filesystem;

// Application-level header placed at the start of every UDP payload built by
// this client, i.e. directly after the Ethernet, IPv4 and UDP headers.
// Its size is included in both the per-packet header length and the UDP
// datagram length computed by the packet maker.
struct udp_payload_header {
// Per-destination-port sequence number. The sender starts it at 0 and, after
// each packet, advances it by that packet's payload byte count (excluding this
// header). It is written as-is by the sender, with no byte-order conversion.
uint32_t seqn;
};

bool g_running = true;
void catch_int(int sig_num)
{
Expand Down Expand Up @@ -91,7 +95,7 @@ class UDPPacketMaker {
const uint32_t l2_len = sizeof(struct rte_ether_hdr);
const uint32_t l3_len = sizeof(struct rte_ipv4_hdr);
const uint32_t l4_len = sizeof(rte_udp_hdr);
const uint32_t hdr_len = l2_len + l3_len + l4_len;
const uint32_t hdr_len = l2_len + l3_len + l4_len + sizeof(struct udp_payload_header);
const uint16_t mss = mtu - l4_len - l3_len;

std::vector<struct rte_mbuf*> bufs;
Expand Down Expand Up @@ -134,7 +138,7 @@ class UDPPacketMaker {
uint8_t* body = reinterpret_cast<uint8_t*>(rte_pktmbuf_append(buf, segmented_payload_size));
memcpy(body, payload_ptr, segmented_payload_size);

uint16_t dgram_len = segmented_payload_size + sizeof(struct rte_udp_hdr);
uint16_t dgram_len = segmented_payload_size + sizeof(struct rte_udp_hdr) + sizeof(struct udp_payload_header);

remaining -= segmented_payload_size;
payload_ptr += segmented_payload_size;
Expand Down Expand Up @@ -431,7 +435,7 @@ int sending_udp_data(void* arg1)
uint32_t itr = 0;
int ports_num = arg->udp_pkt_maker->dst_ports_.size();
std::vector<uint64_t> count_bytes(ports_num, 0);
std::vector<uint8_t> send_pkt_id(ports_num, 0);
std::vector<uint32_t> seqn(ports_num, 0);

uint32_t num_itr = (NUM_DUP - 1 + arg->send_buf_num) / NUM_DUP;

Expand All @@ -452,9 +456,10 @@ int sending_udp_data(void* arg1)
count_bytes.at(idx) += rte_cpu_to_be_16(udp->dgram_len);
udp->dst_port = arg->udp_pkt_maker->dst_ports_.at(idx);

auto* body = rte_pktmbuf_mtod_offset(bs[i], uint8_t*, bs[i]->l2_len + bs[i]->l3_len + bs[i]->l4_len);
body[0] = send_pkt_id.at(idx);
send_pkt_id.at(idx)++;
auto* udp_custom_header = rte_pktmbuf_mtod_offset(bs[i], struct udp_payload_header*, bs[i]->l2_len + bs[i]->l3_len + bs[i]->l4_len);
udp_custom_header->seqn = seqn.at(idx);
seqn.at(idx) += rte_pktmbuf_pkt_len(bs[i]) - bs[i]->l2_len - bs[i]->l3_len - bs[i]->l4_len - sizeof(struct udp_payload_header);
// std::cout << idx << " " << rte_cpu_to_be_16(udp->dgram_len) << " " << udp_custom_header->seqn << " udp_custom_header->seqn" << std::endl;
// std::cout << bs[i]->l2_len << " " << bs[i]->l3_len << " " << bs[i]->l4_len << " l2 l3 l4" << std::endl;
itr++;
}
Expand All @@ -471,6 +476,16 @@ int sending_udp_data(void* arg1)
if (!g_running) {
break;
}

if (bandwidth_in_gbps) {
sent_in_bytes += payload_size;

auto stop_time = (8 * sent_in_bytes / (bandwidth_in_gbps * 1024.0 * 1024.0 * 1024.0)) - (std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() - ts1).count() / 1000.0);

if (stop_time > 0) {
usleep(static_cast<int>(stop_time * 1000 * 1000));
}
}
}

if ((n++ % 1000) == 0) {
Expand Down Expand Up @@ -818,13 +833,13 @@ int main(int argc, char** argv)
.help("specify the log level");

program.add_argument("--frame_size")
.default_value<size_t>(256 * 1024 * 1024)
.default_value<size_t>(MTU - sizeof(struct rte_ipv4_hdr) - sizeof(struct rte_udp_hdr) - sizeof(struct udp_payload_header))
.help("specify the one frame size")
.scan<'u', size_t>();
;

program.add_argument("--frame_num")
.default_value<uint32_t>(128)
.default_value<uint32_t>(1024 * 1024)
.help("specify the # of frame")
.scan<'u', uint32_t>();

Expand Down
43 changes: 41 additions & 2 deletions include/lng/doca-kernels.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,64 @@

namespace lng {

void init_udp_kernels(std::vector<cudaStream_t>& streams);
void launch_udp_kernels(struct rx_queue* rxq,
void init_udp_echo_kernels(std::vector<cudaStream_t>& streams);
void launch_udp_echo_kernels(struct rx_queue* rxq,
struct tx_queue* txq,
struct tx_buf* tx_buf_arr,
struct semaphore* sem_rx,
struct semaphore* sem_fr,
struct semaphore* sem_reply,
std::vector<cudaStream_t>& streams);
void init_udp_framebuilding_kernels(std::vector<cudaStream_t>& streams);
void launch_udp_framebuilding_kernels(struct rx_queue* rxq,
struct semaphore* sem_rx,
struct semaphore* sem_fr,
uint8_t* tar_buf, size_t frame_size,
uint8_t* tmp_buf,
std::vector<cudaStream_t>& streams);

void init_tcp_kernels(std::vector<cudaStream_t>& streams);
void launch_tcp_kernels(struct rx_queue* rxq,
struct tx_queue* txq,
struct tx_buf* tx_buf_arr,
struct semaphore* sem_rx,
struct semaphore* sem_pay,
struct semaphore* sem_fr,
uint8_t* tar_bufs, size_t frame_size,
uint8_t* tmp_buf,
uint32_t* first_ackn, int* is_fin,
std::vector<cudaStream_t>& streams, int id);
void frame_check(uint8_t* frame, size_t frame_size, int* res, cudaStream_t stream);

// temporary here
void init_dpdk_udp_framebuilding_kernels(std::vector<cudaStream_t>& streams);
void launch_dpdk_udp_framebuilding_kernels(
struct rte_gpu_comm_list* comm_list, int comm_list_entries,
// struct semaphore* sem_fr,
uint32_t* quit_flag_ptr,
uint8_t* tar_buf, size_t frame_size,
uint8_t* tmp_buf,
std::vector<cudaStream_t>& streams);

void init_dpdk_tcp_framebuilding_kernels(std::vector<cudaStream_t>& streams);
void launch_dpdk_tcp_framebuilding_kernels(
struct rte_gpu_comm_list* comm_list, int comm_list_entries,
struct rte_gpu_comm_list* comm_list_recv, int comm_list_recv_entries,
struct rte_gpu_comm_list* comm_list_ack, int comm_list_ack_entries,
struct rte_gpu_comm_list* comm_list_frame, int comm_list_frame_entries,
// struct semaphore* sem_fr,
uint32_t* quit_flag_ptr,
uint32_t* seqn,
uint8_t* tar_buf, size_t frame_size,
uint8_t* tmp_buf,
std::vector<cudaStream_t>& streams);
void cpu_3way_handshake(
struct rte_gpu_comm_list* comm_list_recv, int comm_list_recv_entries,
struct rte_gpu_comm_list* comm_list_ack, int comm_list_ack_entries,
uint32_t* quit_flag_ptr,
uint32_t* seqn);
// void print_header_cpu(uint8_t* ack);
// void set_header_cpu(uint8_t* ack);
}

#endif
Loading

0 comments on commit 6dedefd

Please sign in to comment.