Skip to content

Commit

Permalink
mcast: move socket based join to socket module (#897)
Browse files Browse the repository at this point in the history
also add mcast support for af_pkt

Signed-off-by: Frank Du <frank.du@intel.com>
  • Loading branch information
frankdjx committed Jun 7, 2024
1 parent 002b38e commit 9d87d0d
Show file tree
Hide file tree
Showing 14 changed files with 109 additions and 50 deletions.
18 changes: 2 additions & 16 deletions lib/src/datapath/mt_dp_socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "mt_dp_socket.h"

#include "../mt_log.h"
#include "../mt_socket.h"
#include "../mt_stat.h"
#include "../mt_util.h"
#ifndef WINDOWSENV
Expand Down Expand Up @@ -479,26 +480,11 @@ static int rx_socket_init_fd(struct mt_rx_socket_entry* entry, int fd, bool reus

/* join multicast group, will drop automatically when socket fd closed */
if (mt_is_multicast_ip(flow->dip_addr)) {
uint32_t source = *(uint32_t*)flow->sip_addr;
if (source == 0) {
struct ip_mreq mreq;
memset(&mreq, 0, sizeof(mreq));
memcpy(&mreq.imr_multiaddr.s_addr, flow->dip_addr, MTL_IP_ADDR_LEN);
memcpy(&mreq.imr_interface.s_addr, mt_sip_addr(impl, port), MTL_IP_ADDR_LEN);
ret = setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq));
} else {
struct ip_mreq_source mreq;
memset(&mreq, 0, sizeof(mreq));
memcpy(&mreq.imr_multiaddr.s_addr, flow->dip_addr, MTL_IP_ADDR_LEN);
memcpy(&mreq.imr_interface.s_addr, mt_sip_addr(impl, port), MTL_IP_ADDR_LEN);
memcpy(&mreq.imr_sourceaddr.s_addr, flow->sip_addr, MTL_IP_ADDR_LEN);
ret = setsockopt(fd, IPPROTO_IP, IP_ADD_SOURCE_MEMBERSHIP, &mreq, sizeof(mreq));
}
ret = mt_socket_fd_join_multicast(impl, port, flow, fd);
if (ret < 0) {
err("%s(%d,%d), join multicast fail %d\n", __func__, port, fd, ret);
return ret;
}
info("%s(%d,%d), join multicast succ\n", __func__, port, fd);
}

return 0;
Expand Down
17 changes: 17 additions & 0 deletions lib/src/datapath/mt_shared_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "../dev/mt_dev.h"
#include "../mt_flow.h"
#include "../mt_log.h"
#include "../mt_socket.h"
#include "../mt_stat.h"
#include "../mt_util.h"

Expand Down Expand Up @@ -79,6 +80,10 @@ static int rsq_entry_free(struct mt_rsq_entry* entry) {
mt_ring_dequeue_clean(entry->ring);
rte_ring_free(entry->ring);
}
if (entry->mcast_fd) {
close(entry->mcast_fd);
entry->mcast_fd = -1;
}
info("%s(%d), succ on q %u idx %d\n", __func__, rsqm->port, entry->queue_id,
entry->idx);
mt_rte_free(entry);
Expand Down Expand Up @@ -181,6 +186,7 @@ struct mt_rsq_entry* mt_rsq_get(struct mtl_main_impl* impl, enum mtl_port port,
entry->queue_id = q;
entry->idx = idx;
entry->parent = rsqm;
entry->mcast_fd = -1;
rte_memcpy(&entry->flow, flow, sizeof(entry->flow));

if (rsqm->queue_mode == MT_QUEUE_MODE_XDP) {
Expand Down Expand Up @@ -224,6 +230,17 @@ struct mt_rsq_entry* mt_rsq_get(struct mtl_main_impl* impl, enum mtl_port port,
return NULL;
}

if (mt_pmd_is_dpdk_af_packet(impl, port) && mt_is_multicast_ip(flow->dip_addr)) {
/* join multicast group, will drop automatically when socket fd closed */
entry->mcast_fd = mt_socket_get_multicast_fd(impl, port, flow);
if (entry->mcast_fd < 0) {
err("%s(%d,%d), get multicast socket fd fail %d\n", __func__, port, idx,
entry->mcast_fd);
rsq_entry_free(entry);
return NULL;
}
}

rsq_lock(rsq_queue);
MT_TAILQ_INSERT_HEAD(&rsq_queue->head, entry, next);
rte_atomic32_inc(&rsq_queue->entry_cnt);
Expand Down
30 changes: 5 additions & 25 deletions lib/src/dev/mt_af_xdp.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "../mt_flow.h"
#include "../mt_instance.h"
#include "../mt_log.h"
#include "../mt_socket.h"
#include "../mt_stat.h"
#include "../mt_util.h"

Expand Down Expand Up @@ -943,6 +944,7 @@ struct mt_rx_xdp_entry* mt_rx_xdp_get(struct mtl_main_impl* impl, enum mtl_port
}
entry->parent = impl;
entry->port = port;
entry->mcast_fd = -1;
rte_memcpy(&entry->flow, flow, sizeof(entry->flow));

struct mt_xdp_priv* xdp = mt_if(impl, port)->xdp;
Expand Down Expand Up @@ -1000,35 +1002,12 @@ struct mt_rx_xdp_entry* mt_rx_xdp_get(struct mtl_main_impl* impl, enum mtl_port
/* join multicast group, will drop automatically when socket fd closed */
int mcast_fd = -1;
if (mt_is_multicast_ip(flow->dip_addr)) {
int ret;
mcast_fd = socket(AF_INET, SOCK_DGRAM, 0);
mcast_fd = mt_socket_get_multicast_fd(impl, port, flow);
if (mcast_fd < 0) {
err("%s(%d,%u), create multicast socket fail\n", __func__, port, q);
err("%s(%d,%u), get multicast socket fd fail\n", __func__, port, q);
mt_rx_xdp_put(entry);
return NULL;
}
uint32_t source = *(uint32_t*)flow->sip_addr;
if (source == 0) {
struct ip_mreq mreq;
memset(&mreq, 0, sizeof(mreq));
memcpy(&mreq.imr_multiaddr.s_addr, flow->dip_addr, MTL_IP_ADDR_LEN);
memcpy(&mreq.imr_interface.s_addr, mt_sip_addr(impl, port), MTL_IP_ADDR_LEN);
ret = setsockopt(mcast_fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq));
} else {
struct ip_mreq_source mreq;
memset(&mreq, 0, sizeof(mreq));
memcpy(&mreq.imr_multiaddr.s_addr, flow->dip_addr, MTL_IP_ADDR_LEN);
memcpy(&mreq.imr_interface.s_addr, mt_sip_addr(impl, port), MTL_IP_ADDR_LEN);
memcpy(&mreq.imr_sourceaddr.s_addr, flow->sip_addr, MTL_IP_ADDR_LEN);
ret =
setsockopt(mcast_fd, IPPROTO_IP, IP_ADD_SOURCE_MEMBERSHIP, &mreq, sizeof(mreq));
}
if (ret < 0) {
err("%s(%d), join multicast fail %d\n", __func__, port, ret);
mt_rx_xdp_put(entry);
return NULL;
}
info("%s(%d), join multicast succ\n", __func__, port);
}
entry->mcast_fd = mcast_fd;

Expand All @@ -1048,6 +1027,7 @@ int mt_rx_xdp_put(struct mt_rx_xdp_entry* entry) {

if (entry->mcast_fd > 0) {
close(entry->mcast_fd);
entry->mcast_fd = -1;
}

if (entry->flow_rsp) {
Expand Down
2 changes: 1 addition & 1 deletion lib/src/dev/mt_dev.c
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ static const struct mt_dev_driver_info dev_drvs[] = {
.drv_type = MT_DRV_DPDK_AF_PKT,
.flow_type = MT_FLOW_ALL,
.flags = MT_DRV_F_USE_KERNEL_CTL | MT_DRV_F_RX_POOL_COMMON | MT_DRV_F_RX_NO_FLOW |
MT_DRV_F_KERNEL_BASED,
MT_DRV_F_KERNEL_BASED | MT_DRV_F_MCAST_IN_DP,
},
{
.name = "kernel_socket",
Expand Down
3 changes: 1 addition & 2 deletions lib/src/mt_arp.c
Original file line number Diff line number Diff line change
Expand Up @@ -358,11 +358,10 @@ int mt_arp_uinit(struct mtl_main_impl* impl) {
int mt_arp_get_mac(struct mtl_main_impl* impl, uint8_t dip[MTL_IP_ADDR_LEN],
struct rte_ether_addr* ea, enum mtl_port port, int timeout_ms) {
int ret;
struct mt_interface* inf = mt_if(impl, port);

dbg("%s(%d), start to get mac for ip %d.%d.%d.%d\n", __func__, port, dip[0], dip[1],
dip[2], dip[3]);
if ((inf->drv_info.flags & MT_DRV_F_USE_KERNEL_CTL) || mt_has_virtio_user(impl, port)) {
if (mt_drv_use_kernel_ctl(impl, port) || mt_has_virtio_user(impl, port)) {
ret = mt_socket_get_mac(impl, mt_kernel_if_name(impl, port), dip, ea, timeout_ms);
if (ret < 0) {
dbg("%s(%d), failed to get mac from socket %d\n", __func__, port, ret);
Expand Down
2 changes: 1 addition & 1 deletion lib/src/mt_flow.c
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ static struct mt_rx_flow_rsp* rx_flow_create(struct mt_interface* inf, uint16_t
/* no flow if MT_DRV_F_RX_NO_FLOW */
if (inf->drv_info.flags & MT_DRV_F_RX_NO_FLOW) return rsp;

if (inf->drv_info.flags & MT_DRV_F_USE_KERNEL_CTL) {
if (mt_drv_use_kernel_ctl(impl, port)) {
ret = mt_socket_add_flow(impl, port, q, flow);
if (ret < 0) {
err("%s(%d), socket add flow fail for queue %d\n", __func__, port, q);
Expand Down
2 changes: 2 additions & 0 deletions lib/src/mt_main.h
Original file line number Diff line number Diff line change
Expand Up @@ -916,6 +916,8 @@ struct mt_rsq_entry {
struct mt_rx_flow_rsp* flow_rsp;
struct mt_rsq_impl* parent;
struct rte_ring* ring;
/* wa for MTL_PMD_DPDK_AF_PACKET */
int mcast_fd;
uint32_t stat_enqueue_cnt;
uint32_t stat_dequeue_cnt;
uint32_t stat_enqueue_fail_cnt;
Expand Down
4 changes: 4 additions & 0 deletions lib/src/mt_mcast.c
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,8 @@ int mt_mcast_join(struct mtl_main_impl* impl, uint32_t group_addr, uint32_t sour
return 0;
}

if (mt_drv_mcast_in_dp(impl, port)) return 0;

if (mcast->group_num >= MT_MCAST_GROUP_MAX) {
err("%s(%d), reach max multicast group number!\n", __func__, port);
return -EIO;
Expand Down Expand Up @@ -660,6 +662,8 @@ int mt_mcast_leave(struct mtl_main_impl* impl, uint32_t group_addr, uint32_t sou
return 0;
}

if (mt_drv_mcast_in_dp(impl, port)) return 0;

struct mt_mcast_impl* mcast = get_mcast(impl, port);
int group_num = mcast->group_num;
struct mt_interface* inf = mt_if(impl, port);
Expand Down
68 changes: 68 additions & 0 deletions lib/src/mt_socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,57 @@ int mt_socket_remove_flow(struct mtl_main_impl* impl, enum mtl_port port, int fl

return mt_instance_del_flow(impl, if_nametoindex(if_name), flow_id);
}

int mt_socket_fd_join_multicast(struct mtl_main_impl* impl, enum mtl_port port,
struct mt_rxq_flow* flow, int fd) {
uint32_t source = *(uint32_t*)flow->sip_addr;
int ret;

if (!mt_is_multicast_ip(flow->dip_addr)) {
err("%s(%d), not multicast dip\n", __func__, port);
return -EIO;
}

if (source == 0) {
struct ip_mreq mreq;
memset(&mreq, 0, sizeof(mreq));
memcpy(&mreq.imr_multiaddr.s_addr, flow->dip_addr, MTL_IP_ADDR_LEN);
memcpy(&mreq.imr_interface.s_addr, mt_sip_addr(impl, port), MTL_IP_ADDR_LEN);
ret = setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq));
} else {
struct ip_mreq_source mreq;
memset(&mreq, 0, sizeof(mreq));
memcpy(&mreq.imr_multiaddr.s_addr, flow->dip_addr, MTL_IP_ADDR_LEN);
memcpy(&mreq.imr_interface.s_addr, mt_sip_addr(impl, port), MTL_IP_ADDR_LEN);
memcpy(&mreq.imr_sourceaddr.s_addr, flow->sip_addr, MTL_IP_ADDR_LEN);
ret = setsockopt(fd, IPPROTO_IP, IP_ADD_SOURCE_MEMBERSHIP, &mreq, sizeof(mreq));
}

return ret;
}

int mt_socket_get_multicast_fd(struct mtl_main_impl* impl, enum mtl_port port,
struct mt_rxq_flow* flow) {
int ret;
int mcast_fd = socket(AF_INET, SOCK_DGRAM, 0);
if (mcast_fd < 0) {
err("%s(%d), create multicast socket fail\n", __func__, port);
return mcast_fd;
}

ret = mt_socket_fd_join_multicast(impl, port, flow, mcast_fd);
if (ret < 0) {
err("%s(%d), setsockopt fail %d\n", __func__, port, ret);
close(mcast_fd);
return ret;
} else {
/* return the fd */
uint8_t* ip = flow->dip_addr;
info("%s(%d), join %u.%u.%u.%u succ\n", __func__, port, ip[0], ip[1], ip[2], ip[3]);
return mcast_fd;
}
}

#else
int mt_socket_get_if_ip(const char* if_name, uint8_t ip[MTL_IP_ADDR_LEN],
uint8_t netmask[MTL_IP_ADDR_LEN]) {
Expand Down Expand Up @@ -474,6 +525,23 @@ int mt_socket_remove_flow(struct mtl_main_impl* impl, enum mtl_port port, int fl
MTL_MAY_UNUSED(dst_port);
return -ENOTSUP;
}

int mt_socket_get_multicast_fd(struct mtl_main_impl* impl, enum mtl_port port,
struct mt_rxq_flow* flow) {
MTL_MAY_UNUSED(impl);
MTL_MAY_UNUSED(port);
MTL_MAY_UNUSED(flow);
return -ENOTSUP;
}

int mt_socket_fd_join_multicast(struct mtl_main_impl* impl, enum mtl_port port,
struct mt_rxq_flow* flow, int fd) {
MTL_MAY_UNUSED(impl);
MTL_MAY_UNUSED(port);
MTL_MAY_UNUSED(flow);
MTL_MAY_UNUSED(fd);
return -ENOTSUP;
}
#endif

int mtl_get_if_ip(char* if_name, uint8_t ip[MTL_IP_ADDR_LEN],
Expand Down
6 changes: 6 additions & 0 deletions lib/src/mt_socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,10 @@ int mt_socket_add_flow(struct mtl_main_impl* impl, enum mtl_port port, uint16_t
int mt_socket_remove_flow(struct mtl_main_impl* impl, enum mtl_port port, int flow_id,
uint16_t dst_port);

int mt_socket_fd_join_multicast(struct mtl_main_impl* impl, enum mtl_port port,
struct mt_rxq_flow* flow, int fd);

int mt_socket_get_multicast_fd(struct mtl_main_impl* impl, enum mtl_port port,
struct mt_rxq_flow* flow);

#endif
1 change: 0 additions & 1 deletion lib/src/st2110/st_rx_ancillary_session.c
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,6 @@ static int rx_ancillary_session_init_mcast(struct mtl_main_impl* impl,
for (int i = 0; i < ops->num_port; i++) {
if (!mt_is_multicast_ip(ops->ip_addr[i])) continue;
port = mt_port_logic2phy(s->port_maps, i);
if (mt_drv_mcast_in_dp(impl, port)) continue;
if (ops->flags & ST20_RX_FLAG_DATA_PATH_ONLY) {
info("%s(%d), skip mcast join for port %d\n", __func__, s->idx, i);
return 0;
Expand Down
1 change: 0 additions & 1 deletion lib/src/st2110/st_rx_audio_session.c
Original file line number Diff line number Diff line change
Expand Up @@ -702,7 +702,6 @@ static int rx_audio_session_init_mcast(struct mtl_main_impl* impl,
for (int i = 0; i < ops->num_port; i++) {
if (!mt_is_multicast_ip(ops->ip_addr[i])) continue;
port = mt_port_logic2phy(s->port_maps, i);
if (mt_drv_mcast_in_dp(impl, port)) continue;
if (ops->flags & ST20_RX_FLAG_DATA_PATH_ONLY) {
info("%s(%d), skip mcast join for port %d\n", __func__, s->idx, i);
return 0;
Expand Down
1 change: 0 additions & 1 deletion lib/src/st2110/st_rx_video_session.c
Original file line number Diff line number Diff line change
Expand Up @@ -2847,7 +2847,6 @@ static int rv_init_mcast(struct mtl_main_impl* impl, struct st_rx_video_session_
for (int i = 0; i < ops->num_port; i++) {
if (!mt_is_multicast_ip(ops->ip_addr[i])) continue;
port = mt_port_logic2phy(s->port_maps, i);
if (mt_drv_mcast_in_dp(impl, port)) continue;
if (ops->flags & ST20_RX_FLAG_DATA_PATH_ONLY) {
info("%s(%d), skip mcast join for port %d\n", __func__, s->idx, i);
return 0;
Expand Down
4 changes: 2 additions & 2 deletions tests/script/native_af_xdp_json/mcast.json
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
{
"interfaces": [
{
"name": "native_af_xdp:enp0s3np0",
"name": "native_af_xdp:enp175s0f0np0",
},
{
"name": "native_af_xdp:enp0s4np0",
"name": "native_af_xdp:enp175s0f1np1",
}
],
"tx_sessions": [
Expand Down

0 comments on commit 9d87d0d

Please sign in to comment.