From 9d87d0da3466bd88cdf3336844643882d272460b Mon Sep 17 00:00:00 2001 From: Frank Du Date: Fri, 7 Jun 2024 16:42:35 +0800 Subject: [PATCH] mcast: move socket based join to socket module (#897) also add mcast support for af_pkt Signed-off-by: Frank Du --- lib/src/datapath/mt_dp_socket.c | 18 +----- lib/src/datapath/mt_shared_queue.c | 17 ++++++ lib/src/dev/mt_af_xdp.c | 30 ++-------- lib/src/dev/mt_dev.c | 2 +- lib/src/mt_arp.c | 3 +- lib/src/mt_flow.c | 2 +- lib/src/mt_main.h | 2 + lib/src/mt_mcast.c | 4 ++ lib/src/mt_socket.c | 68 ++++++++++++++++++++++ lib/src/mt_socket.h | 6 ++ lib/src/st2110/st_rx_ancillary_session.c | 1 - lib/src/st2110/st_rx_audio_session.c | 1 - lib/src/st2110/st_rx_video_session.c | 1 - tests/script/native_af_xdp_json/mcast.json | 4 +- 14 files changed, 109 insertions(+), 50 deletions(-) diff --git a/lib/src/datapath/mt_dp_socket.c b/lib/src/datapath/mt_dp_socket.c index 0e01b0785..8eef0f939 100644 --- a/lib/src/datapath/mt_dp_socket.c +++ b/lib/src/datapath/mt_dp_socket.c @@ -6,6 +6,7 @@ #include "mt_dp_socket.h" #include "../mt_log.h" +#include "../mt_socket.h" #include "../mt_stat.h" #include "../mt_util.h" #ifndef WINDOWSENV @@ -479,26 +480,11 @@ static int rx_socket_init_fd(struct mt_rx_socket_entry* entry, int fd, bool reus /* join multicast group, will drop automatically when socket fd closed */ if (mt_is_multicast_ip(flow->dip_addr)) { - uint32_t source = *(uint32_t*)flow->sip_addr; - if (source == 0) { - struct ip_mreq mreq; - memset(&mreq, 0, sizeof(mreq)); - memcpy(&mreq.imr_multiaddr.s_addr, flow->dip_addr, MTL_IP_ADDR_LEN); - memcpy(&mreq.imr_interface.s_addr, mt_sip_addr(impl, port), MTL_IP_ADDR_LEN); - ret = setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)); - } else { - struct ip_mreq_source mreq; - memset(&mreq, 0, sizeof(mreq)); - memcpy(&mreq.imr_multiaddr.s_addr, flow->dip_addr, MTL_IP_ADDR_LEN); - memcpy(&mreq.imr_interface.s_addr, mt_sip_addr(impl, port), MTL_IP_ADDR_LEN); - memcpy(&mreq.imr_sourceaddr.s_addr, flow->sip_addr, MTL_IP_ADDR_LEN); - ret = setsockopt(fd, IPPROTO_IP, IP_ADD_SOURCE_MEMBERSHIP, &mreq, sizeof(mreq)); - } + ret = mt_socket_fd_join_multicast(impl, port, flow, fd); if (ret < 0) { err("%s(%d,%d), join multicast fail %d\n", __func__, port, fd, ret); return ret; } - info("%s(%d,%d), join multicast succ\n", __func__, port, fd); } return 0; diff --git a/lib/src/datapath/mt_shared_queue.c b/lib/src/datapath/mt_shared_queue.c index 8020bca07..30e25bcc4 100644 --- a/lib/src/datapath/mt_shared_queue.c +++ b/lib/src/datapath/mt_shared_queue.c @@ -8,6 +8,7 @@ #include "../dev/mt_dev.h" #include "../mt_flow.h" #include "../mt_log.h" +#include "../mt_socket.h" #include "../mt_stat.h" #include "../mt_util.h" @@ -79,6 +80,10 @@ static int rsq_entry_free(struct mt_rsq_entry* entry) { mt_ring_dequeue_clean(entry->ring); rte_ring_free(entry->ring); } + if (entry->mcast_fd) { + close(entry->mcast_fd); + entry->mcast_fd = -1; + } info("%s(%d), succ on q %u idx %d\n", __func__, rsqm->port, entry->queue_id, entry->idx); mt_rte_free(entry); @@ -181,6 +186,7 @@ struct mt_rsq_entry* mt_rsq_get(struct mtl_main_impl* impl, enum mtl_port port, entry->queue_id = q; entry->idx = idx; entry->parent = rsqm; + entry->mcast_fd = -1; rte_memcpy(&entry->flow, flow, sizeof(entry->flow)); if (rsqm->queue_mode == MT_QUEUE_MODE_XDP) { @@ -224,6 +230,17 @@ struct mt_rsq_entry* mt_rsq_get(struct mtl_main_impl* impl, enum mtl_port port, return NULL; } + if (mt_pmd_is_dpdk_af_packet(impl, port) && mt_is_multicast_ip(flow->dip_addr)) { + /* join multicast group, will drop automatically when socket fd closed */ + entry->mcast_fd = mt_socket_get_multicast_fd(impl, port, flow); + if (entry->mcast_fd < 0) { + err("%s(%d,%d), get multicast socket fd fail %d\n", __func__, port, idx, + entry->mcast_fd); + rsq_entry_free(entry); + return NULL; + } + } + rsq_lock(rsq_queue); MT_TAILQ_INSERT_HEAD(&rsq_queue->head, entry, next); rte_atomic32_inc(&rsq_queue->entry_cnt); diff --git a/lib/src/dev/mt_af_xdp.c b/lib/src/dev/mt_af_xdp.c index d18b55589..b1c772a9e 100644 --- a/lib/src/dev/mt_af_xdp.c +++ b/lib/src/dev/mt_af_xdp.c @@ -13,6 +13,7 @@ #include "../mt_flow.h" #include "../mt_instance.h" #include "../mt_log.h" +#include "../mt_socket.h" #include "../mt_stat.h" #include "../mt_util.h" @@ -943,6 +944,7 @@ struct mt_rx_xdp_entry* mt_rx_xdp_get(struct mtl_main_impl* impl, enum mtl_port } entry->parent = impl; entry->port = port; + entry->mcast_fd = -1; rte_memcpy(&entry->flow, flow, sizeof(entry->flow)); struct mt_xdp_priv* xdp = mt_if(impl, port)->xdp; @@ -1000,35 +1002,12 @@ struct mt_rx_xdp_entry* mt_rx_xdp_get(struct mtl_main_impl* impl, enum mtl_port /* join multicast group, will drop automatically when socket fd closed */ int mcast_fd = -1; if (mt_is_multicast_ip(flow->dip_addr)) { - int ret; - mcast_fd = socket(AF_INET, SOCK_DGRAM, 0); + mcast_fd = mt_socket_get_multicast_fd(impl, port, flow); if (mcast_fd < 0) { - err("%s(%d,%u), create multicast socket fail\n", __func__, port, q); + err("%s(%d,%u), get multicast socket fd fail\n", __func__, port, q); mt_rx_xdp_put(entry); return NULL; } - uint32_t source = *(uint32_t*)flow->sip_addr; - if (source == 0) { - struct ip_mreq mreq; - memset(&mreq, 0, sizeof(mreq)); - memcpy(&mreq.imr_multiaddr.s_addr, flow->dip_addr, MTL_IP_ADDR_LEN); - memcpy(&mreq.imr_interface.s_addr, mt_sip_addr(impl, port), MTL_IP_ADDR_LEN); - ret = setsockopt(mcast_fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)); - } else { - struct ip_mreq_source mreq; - memset(&mreq, 0, sizeof(mreq)); - memcpy(&mreq.imr_multiaddr.s_addr, flow->dip_addr, MTL_IP_ADDR_LEN); - memcpy(&mreq.imr_interface.s_addr, mt_sip_addr(impl, port), MTL_IP_ADDR_LEN); - memcpy(&mreq.imr_sourceaddr.s_addr, flow->sip_addr, MTL_IP_ADDR_LEN); - ret = - setsockopt(mcast_fd, IPPROTO_IP, IP_ADD_SOURCE_MEMBERSHIP, &mreq, sizeof(mreq)); - } - if (ret < 0) { - err("%s(%d), join multicast fail %d\n", __func__, port, ret); - mt_rx_xdp_put(entry); - return NULL; - } - info("%s(%d), join multicast succ\n", __func__, port); } entry->mcast_fd = mcast_fd; @@ -1048,6 +1027,7 @@ int mt_rx_xdp_put(struct mt_rx_xdp_entry* entry) { if (entry->mcast_fd > 0) { close(entry->mcast_fd); + entry->mcast_fd = -1; } if (entry->flow_rsp) { diff --git a/lib/src/dev/mt_dev.c b/lib/src/dev/mt_dev.c index d881afcce..fed3b72e9 100644 --- a/lib/src/dev/mt_dev.c +++ b/lib/src/dev/mt_dev.c @@ -82,7 +82,7 @@ static const struct mt_dev_driver_info dev_drvs[] = { .drv_type = MT_DRV_DPDK_AF_PKT, .flow_type = MT_FLOW_ALL, .flags = MT_DRV_F_USE_KERNEL_CTL | MT_DRV_F_RX_POOL_COMMON | MT_DRV_F_RX_NO_FLOW | - MT_DRV_F_KERNEL_BASED, + MT_DRV_F_KERNEL_BASED | MT_DRV_F_MCAST_IN_DP, }, { .name = "kernel_socket", diff --git a/lib/src/mt_arp.c b/lib/src/mt_arp.c index 12ad71639..7db26da7f 100644 --- a/lib/src/mt_arp.c +++ b/lib/src/mt_arp.c @@ -358,11 +358,10 @@ int mt_arp_uinit(struct mtl_main_impl* impl) { int mt_arp_get_mac(struct mtl_main_impl* impl, uint8_t dip[MTL_IP_ADDR_LEN], struct rte_ether_addr* ea, enum mtl_port port, int timeout_ms) { int ret; - struct mt_interface* inf = mt_if(impl, port); dbg("%s(%d), start to get mac for ip %d.%d.%d.%d\n", __func__, port, dip[0], dip[1], dip[2], dip[3]); - if ((inf->drv_info.flags & MT_DRV_F_USE_KERNEL_CTL) || mt_has_virtio_user(impl, port)) { + if (mt_drv_use_kernel_ctl(impl, port) || mt_has_virtio_user(impl, port)) { ret = mt_socket_get_mac(impl, mt_kernel_if_name(impl, port), dip, ea, timeout_ms); if (ret < 0) { dbg("%s(%d), failed to get mac from socket %d\n", __func__, port, ret); diff --git a/lib/src/mt_flow.c b/lib/src/mt_flow.c index 2b94c4308..eb4463ded 100644 --- a/lib/src/mt_flow.c +++ b/lib/src/mt_flow.c @@ -214,7 +214,7 @@ static struct mt_rx_flow_rsp* rx_flow_create(struct mt_interface* inf, uint16_t /* no flow if MT_DRV_F_RX_NO_FLOW */ if (inf->drv_info.flags & MT_DRV_F_RX_NO_FLOW) return rsp; - if (inf->drv_info.flags & MT_DRV_F_USE_KERNEL_CTL) { + if (mt_drv_use_kernel_ctl(impl, port)) { ret = mt_socket_add_flow(impl, port, q, flow); if (ret < 0) { err("%s(%d), socket add flow fail for queue %d\n", __func__, port, q); diff --git a/lib/src/mt_main.h b/lib/src/mt_main.h index b1fc752fd..f4774f485 100644 --- a/lib/src/mt_main.h +++ b/lib/src/mt_main.h @@ -916,6 +916,8 @@ struct mt_rsq_entry { struct mt_rx_flow_rsp* flow_rsp; struct mt_rsq_impl* parent; struct rte_ring* ring; + /* wa for MTL_PMD_DPDK_AF_PACKET */ + int mcast_fd; uint32_t stat_enqueue_cnt; uint32_t stat_dequeue_cnt; uint32_t stat_enqueue_fail_cnt; diff --git a/lib/src/mt_mcast.c b/lib/src/mt_mcast.c index f66c38fd8..7dbd9208f 100644 --- a/lib/src/mt_mcast.c +++ b/lib/src/mt_mcast.c @@ -562,6 +562,8 @@ int mt_mcast_join(struct mtl_main_impl* impl, uint32_t group_addr, uint32_t sour return 0; } + if (mt_drv_mcast_in_dp(impl, port)) return 0; + if (mcast->group_num >= MT_MCAST_GROUP_MAX) { err("%s(%d), reach max multicast group number!\n", __func__, port); return -EIO; @@ -660,6 +662,8 @@ int mt_mcast_leave(struct mtl_main_impl* impl, uint32_t group_addr, uint32_t sou return 0; } + if (mt_drv_mcast_in_dp(impl, port)) return 0; + struct mt_mcast_impl* mcast = get_mcast(impl, port); int group_num = mcast->group_num; struct mt_interface* inf = mt_if(impl, port); diff --git a/lib/src/mt_socket.c b/lib/src/mt_socket.c index 3f833173b..3783fcf34 100644 --- a/lib/src/mt_socket.c +++ b/lib/src/mt_socket.c @@ -420,6 +420,57 @@ int mt_socket_remove_flow(struct mtl_main_impl* impl, enum mtl_port port, int fl return mt_instance_del_flow(impl, if_nametoindex(if_name), flow_id); } + +int mt_socket_fd_join_multicast(struct mtl_main_impl* impl, enum mtl_port port, + struct mt_rxq_flow* flow, int fd) { + uint32_t source = *(uint32_t*)flow->sip_addr; + int ret; + + if (!mt_is_multicast_ip(flow->dip_addr)) { + err("%s(%d), not multicast dip\n", __func__, port); + return -EIO; + } + + if (source == 0) { + struct ip_mreq mreq; + memset(&mreq, 0, sizeof(mreq)); + memcpy(&mreq.imr_multiaddr.s_addr, flow->dip_addr, MTL_IP_ADDR_LEN); + memcpy(&mreq.imr_interface.s_addr, mt_sip_addr(impl, port), MTL_IP_ADDR_LEN); + ret = setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)); + } else { + struct ip_mreq_source mreq; + memset(&mreq, 0, sizeof(mreq)); + memcpy(&mreq.imr_multiaddr.s_addr, flow->dip_addr, MTL_IP_ADDR_LEN); + memcpy(&mreq.imr_interface.s_addr, mt_sip_addr(impl, port), MTL_IP_ADDR_LEN); + memcpy(&mreq.imr_sourceaddr.s_addr, flow->sip_addr, MTL_IP_ADDR_LEN); + ret = setsockopt(fd, IPPROTO_IP, IP_ADD_SOURCE_MEMBERSHIP, &mreq, sizeof(mreq)); + } + + return ret; +} + +int mt_socket_get_multicast_fd(struct mtl_main_impl* impl, enum mtl_port port, + struct mt_rxq_flow* flow) { + int ret; + int mcast_fd = socket(AF_INET, SOCK_DGRAM, 0); + if (mcast_fd < 0) { + err("%s(%d), create multicast socket fail\n", __func__, port); + return mcast_fd; + } + + ret = mt_socket_fd_join_multicast(impl, port, flow, mcast_fd); + if (ret < 0) { + err("%s(%d), setsockopt fail %d\n", __func__, port, ret); + close(mcast_fd); + return ret; + } else { + /* return the fd */ + uint8_t* ip = flow->dip_addr; + info("%s(%d), join %u.%u.%u.%u succ\n", __func__, port, ip[0], ip[1], ip[2], ip[3]); + return mcast_fd; + } +} + #else int mt_socket_get_if_ip(const char* if_name, uint8_t ip[MTL_IP_ADDR_LEN], uint8_t netmask[MTL_IP_ADDR_LEN]) { @@ -474,6 +525,23 @@ int mt_socket_remove_flow(struct mtl_main_impl* impl, enum mtl_port port, int fl MTL_MAY_UNUSED(dst_port); return -ENOTSUP; } + +int mt_socket_get_multicast_fd(struct mtl_main_impl* impl, enum mtl_port port, + struct mt_rxq_flow* flow) { + MTL_MAY_UNUSED(impl); + MTL_MAY_UNUSED(port); + MTL_MAY_UNUSED(flow); + return -ENOTSUP; +} + +int mt_socket_fd_join_multicast(struct mtl_main_impl* impl, enum mtl_port port, + struct mt_rxq_flow* flow, int fd) { + MTL_MAY_UNUSED(impl); + MTL_MAY_UNUSED(port); + MTL_MAY_UNUSED(flow); + MTL_MAY_UNUSED(fd); + return -ENOTSUP; +} #endif int mtl_get_if_ip(char* if_name, uint8_t ip[MTL_IP_ADDR_LEN], diff --git a/lib/src/mt_socket.h b/lib/src/mt_socket.h index cb6a56b9e..136e9a7e2 100644 --- a/lib/src/mt_socket.h +++ b/lib/src/mt_socket.h @@ -31,4 +31,10 @@ int mt_socket_add_flow(struct mtl_main_impl* impl, enum mtl_port port, uint16_t int mt_socket_remove_flow(struct mtl_main_impl* impl, enum mtl_port port, int flow_id, uint16_t dst_port); +int mt_socket_fd_join_multicast(struct mtl_main_impl* impl, enum mtl_port port, + struct mt_rxq_flow* flow, int fd); + +int mt_socket_get_multicast_fd(struct mtl_main_impl* impl, enum mtl_port port, + struct mt_rxq_flow* flow); + #endif diff --git a/lib/src/st2110/st_rx_ancillary_session.c b/lib/src/st2110/st_rx_ancillary_session.c index 8e20532c3..5fe03cfe6 100644 --- a/lib/src/st2110/st_rx_ancillary_session.c +++ b/lib/src/st2110/st_rx_ancillary_session.c @@ -319,7 +319,6 @@ static int rx_ancillary_session_init_mcast(struct mtl_main_impl* impl, for (int i = 0; i < ops->num_port; i++) { if (!mt_is_multicast_ip(ops->ip_addr[i])) continue; port = mt_port_logic2phy(s->port_maps, i); - if (mt_drv_mcast_in_dp(impl, port)) continue; if (ops->flags & ST20_RX_FLAG_DATA_PATH_ONLY) { info("%s(%d), skip mcast join for port %d\n", __func__, s->idx, i); return 0; diff --git a/lib/src/st2110/st_rx_audio_session.c b/lib/src/st2110/st_rx_audio_session.c index 338188e77..aa61c1778 100644 --- a/lib/src/st2110/st_rx_audio_session.c +++ b/lib/src/st2110/st_rx_audio_session.c @@ -702,7 +702,6 @@ static int rx_audio_session_init_mcast(struct mtl_main_impl* impl, for (int i = 0; i < ops->num_port; i++) { if (!mt_is_multicast_ip(ops->ip_addr[i])) continue; port = mt_port_logic2phy(s->port_maps, i); - if (mt_drv_mcast_in_dp(impl, port)) continue; if (ops->flags & ST20_RX_FLAG_DATA_PATH_ONLY) { info("%s(%d), skip mcast join for port %d\n", __func__, s->idx, i); return 0; diff --git a/lib/src/st2110/st_rx_video_session.c b/lib/src/st2110/st_rx_video_session.c index 574ebdcae..9815334ea 100644 --- a/lib/src/st2110/st_rx_video_session.c +++ b/lib/src/st2110/st_rx_video_session.c @@ -2847,7 +2847,6 @@ static int rv_init_mcast(struct mtl_main_impl* impl, struct st_rx_video_session_ for (int i = 0; i < ops->num_port; i++) { if (!mt_is_multicast_ip(ops->ip_addr[i])) continue; port = mt_port_logic2phy(s->port_maps, i); - if (mt_drv_mcast_in_dp(impl, port)) continue; if (ops->flags & ST20_RX_FLAG_DATA_PATH_ONLY) { info("%s(%d), skip mcast join for port %d\n", __func__, s->idx, i); return 0; diff --git a/tests/script/native_af_xdp_json/mcast.json b/tests/script/native_af_xdp_json/mcast.json index 7cac7002d..53f00605b 100644 --- a/tests/script/native_af_xdp_json/mcast.json +++ b/tests/script/native_af_xdp_json/mcast.json @@ -1,10 +1,10 @@ { "interfaces": [ { - "name": "native_af_xdp:enp0s3np0", + "name": "native_af_xdp:enp175s0f0np0", }, { - "name": "native_af_xdp:enp0s4np0", + "name": "native_af_xdp:enp175s0f1np1", } ], "tx_sessions": [