diff --git a/CHANGELOG.md b/CHANGELOG.md index 6f19c289..802143df 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,8 @@ ## Changelog for 24.12 * ice: update driver to 1.14.9 +* st2110/20: add force numa option support on session level, see ST20_TX_FLAG_FORCE_NUMA/ST20_RX_FLAG_FORCE_NUMA +* st2110/30: add force numa option support on session level, see ST30_TX_FLAG_FORCE_NUMA/ST30_RX_FLAG_FORCE_NUMA ## Changelog for 24.06 diff --git a/app/src/app_base.h b/app/src/app_base.h index f87883a6..be8e0b08 100644 --- a/app/src/app_base.h +++ b/app/src/app_base.h @@ -570,6 +570,8 @@ struct st_app_context { uint16_t rx_burst_size; int force_tx_video_numa; int force_rx_video_numa; + int force_tx_audio_numa; + int force_rx_audio_numa; bool ptp_systime_sync; int ptp_sync_cnt; diff --git a/app/src/args.c b/app/src/args.c index 307ec36e..d89eaf38 100644 --- a/app/src/args.c +++ b/app/src/args.c @@ -67,6 +67,8 @@ enum st_args_cmd { ST_ARG_FORCE_NUMA, ST_ARG_FORCE_TX_VIDEO_NUMA, ST_ARG_FORCE_RX_VIDEO_NUMA, + ST_ARG_FORCE_TX_AUDIO_NUMA, + ST_ARG_FORCE_RX_AUDIO_NUMA, ST_ARG_CONFIG_FILE = 0x300, ST_ARG_TEST_TIME, @@ -211,6 +213,8 @@ static struct option st_app_args_options[] = { {"force_numa", required_argument, 0, ST_ARG_FORCE_NUMA}, {"force_tx_video_numa", required_argument, 0, ST_ARG_FORCE_TX_VIDEO_NUMA}, {"force_rx_video_numa", required_argument, 0, ST_ARG_FORCE_RX_VIDEO_NUMA}, + {"force_tx_audio_numa", required_argument, 0, ST_ARG_FORCE_TX_AUDIO_NUMA}, + {"force_rx_audio_numa", required_argument, 0, ST_ARG_FORCE_RX_AUDIO_NUMA}, {"config_file", required_argument, 0, ST_ARG_CONFIG_FILE}, {"test_time", required_argument, 0, ST_ARG_TEST_TIME}, @@ -615,6 +619,12 @@ int st_app_parse_args(struct st_app_context* ctx, struct mtl_init_params* p, int case ST_ARG_FORCE_RX_VIDEO_NUMA: ctx->force_rx_video_numa = atoi(optarg); break; + case ST_ARG_FORCE_TX_AUDIO_NUMA: + ctx->force_tx_audio_numa = atoi(optarg); + break; + case ST_ARG_FORCE_RX_AUDIO_NUMA: + ctx->force_rx_audio_numa = atoi(optarg); + break; case ST_ARG_SHAPING: if (!strcmp(optarg, "narrow")) ctx->tx_pacing_type = ST21_PACING_NARROW; diff --git a/app/src/legacy/rx_audio_app.c b/app/src/legacy/rx_audio_app.c index a35c4260..a11b36c8 100644 --- a/app/src/legacy/rx_audio_app.c +++ b/app/src/legacy/rx_audio_app.c @@ -384,6 +384,11 @@ static int app_rx_audio_init(struct st_app_context* ctx, st_json_audio_session_t s->enable_timing_parser_meta = true; } + if (ctx->force_rx_audio_numa >= 0) { + ops.flags |= ST30_RX_FLAG_FORCE_NUMA; + ops.socket_id = ctx->force_rx_audio_numa; + } + st_pthread_mutex_init(&s->st30_wake_mutex, NULL); st_pthread_cond_init(&s->st30_wake_cond, NULL); diff --git a/app/src/legacy/tx_audio_app.c b/app/src/legacy/tx_audio_app.c index 3d56b88b..8911038e 100644 --- a/app/src/legacy/tx_audio_app.c +++ b/app/src/legacy/tx_audio_app.c @@ -476,6 +476,11 @@ static int app_tx_audio_init(struct st_app_context* ctx, st_json_audio_session_t ops.rl_accuracy_ns = ctx->tx_audio_rl_accuracy_us * 1000; ops.rl_offset_ns = ctx->tx_audio_rl_offset_us * 1000; + if (ctx->force_tx_audio_numa >= 0) { + ops.flags |= ST30_TX_FLAG_FORCE_NUMA; + ops.socket_id = ctx->force_tx_audio_numa; + } + handle = st30_tx_create(ctx->st, &ops); if (!handle) { err("%s(%d), st30_tx_create fail\n", __func__, idx); diff --git a/app/src/rx_st30p_app.c b/app/src/rx_st30p_app.c index df5f8ed2..b7204fe6 100644 --- a/app/src/rx_st30p_app.c +++ b/app/src/rx_st30p_app.c @@ -128,6 +128,11 @@ static int app_rx_st30p_init(struct st_app_context* ctx, ops.flags |= ST30P_RX_FLAG_BLOCK_GET; ops.framebuff_cnt = 3; + if (ctx->force_rx_audio_numa >= 0) { + ops.flags |= ST30P_RX_FLAG_FORCE_NUMA; + ops.socket_id = ctx->force_rx_audio_numa; + } + s->num_port = ops.port.num_port; handle = st30p_rx_create(ctx->st, &ops); diff --git a/app/src/rxtx_app.c b/app/src/rxtx_app.c index 7f08847b..5c17be47 100644 --- a/app/src/rxtx_app.c +++ b/app/src/rxtx_app.c @@ -214,6 +214,8 @@ static void st_app_ctx_init(struct st_app_context* ctx) { ctx->force_tx_video_numa = -1; ctx->force_rx_video_numa = -1; + ctx->force_tx_audio_numa = -1; + ctx->force_rx_audio_numa = -1; ctx->last_stat_time_ns = st_app_get_monotonic_time(); } diff --git a/app/src/tx_st30p_app.c b/app/src/tx_st30p_app.c index aa1aedde..daac3004 100644 --- a/app/src/tx_st30p_app.c +++ b/app/src/tx_st30p_app.c @@ -213,6 +213,11 @@ static int app_tx_st30p_init(struct st_app_context* ctx, st_json_st30p_session_t if (ctx->tx_audio_dedicate_queue) ops.flags |= ST30P_TX_FLAG_DEDICATE_QUEUE; + if (ctx->force_tx_audio_numa >= 0) { + ops.flags |= ST30P_TX_FLAG_FORCE_NUMA; + ops.socket_id = ctx->force_tx_audio_numa; + } + handle = st30p_tx_create(ctx->st, &ops); if (!handle) { err("%s(%d), st30p_tx_create fail\n", __func__, idx); diff --git a/include/st30_api.h b/include/st30_api.h index 430cace5..1485be2e 100644 --- a/include/st30_api.h +++ b/include/st30_api.h @@ -65,6 +65,8 @@ typedef struct st_rx_audio_session_handle_impl* st30_rx_handle; * If use dedicated queue for TX. */ #define ST30_TX_FLAG_DEDICATE_QUEUE (MTL_BIT32(7)) +/** Force the numa of the created session, both CPU and memory */ +#define ST30_TX_FLAG_FORCE_NUMA (MTL_BIT32(8)) /** * Flag bit in flags of struct st30_rx_ops, for non MTL_PMD_DPDK_USER. @@ -77,6 +79,8 @@ typedef struct st_rx_audio_session_handle_impl* st30_rx_handle; * If enable the rtcp. */ #define ST30_RX_FLAG_ENABLE_RTCP (MTL_BIT32(1)) +/** Force the numa of the created session, both CPU and memory */ +#define ST30_RX_FLAG_FORCE_NUMA (MTL_BIT32(2)) /** * Flag bit in flags of struct st30_rx_ops. * Enable the timing analyze in the stat dump @@ -404,6 +408,8 @@ struct st30_tx_ops { uint32_t rl_accuracy_ns; /** Optional for ST30_TX_PACING_WAY_RL, the offset time(us) for warmup check point */ int32_t rl_offset_ns; + /** Use this socket if ST30_TX_FLAG_FORCE_NUMA is on, default use the NIC numa */ + int socket_id; /** * size for each sample group, @@ -499,6 +505,8 @@ struct st30_rx_ops { */ int (*notify_timing_parser_result)(void* priv, enum mtl_session_port port, struct st30_rx_tp_meta* tp); + /** Use this socket if ST30_RX_FLAG_FORCE_NUMA is on, default use the NIC numa */ + int socket_id; /** * size for each sample group, diff --git a/include/st30_pipeline_api.h b/include/st30_pipeline_api.h index 0558e304..1b572508 100644 --- a/include/st30_pipeline_api.h +++ b/include/st30_pipeline_api.h @@ -41,6 +41,8 @@ enum st30p_tx_flag { * If use dedicated queue for TX. */ ST30P_TX_FLAG_DEDICATE_QUEUE = (MTL_BIT32(7)), + /** Force the numa of the created session, both CPU and memory */ + ST30P_TX_FLAG_FORCE_NUMA = (MTL_BIT32(8)), /** Enable the st30p_tx_get_frame block behavior to wait until a frame becomes available or timeout(default: 1s, use st30p_tx_set_block_timeout to customize)*/ @@ -141,6 +143,8 @@ struct st30p_tx_ops { uint32_t rl_accuracy_ns; /** Optional for ST30_TX_PACING_WAY_RL, the offset time(us) for warmup check point */ int32_t rl_offset_ns; + /** Use this socket if ST30P_TX_FLAG_FORCE_NUMA is on, default use the NIC numa */ + int socket_id; }; /** @@ -174,6 +178,8 @@ enum st30p_rx_flag { * Use st30_rx_get_queue_meta to get the queue meta(queue number etc) info. */ ST30P_RX_FLAG_DATA_PATH_ONLY = (MTL_BIT32(0)), + /** Force the numa of the created session, both CPU and memory */ + ST30P_RX_FLAG_FORCE_NUMA = (MTL_BIT32(2)), /** Enable the st30p_rx_get_frame block behavior to wait until a frame becomes available or timeout(default: 1s, use st30p_rx_set_block_timeout to customize) */ @@ -214,6 +220,8 @@ struct st30p_rx_ops { * tasklet routine. */ int (*notify_frame_available)(void* priv); + /** Use this socket if ST30P_RX_FLAG_FORCE_NUMA is on, default use the NIC numa */ + int socket_id; }; /** diff --git a/lib/src/st2110/pipeline/st30_pipeline_rx.c b/lib/src/st2110/pipeline/st30_pipeline_rx.c index e5e60ab0..b9b82877 100644 --- a/lib/src/st2110/pipeline/st30_pipeline_rx.c +++ b/lib/src/st2110/pipeline/st30_pipeline_rx.c @@ -131,6 +131,13 @@ static int rx_st30p_create_transport(struct mtl_main_impl* impl, struct st30p_rx ops_rx.type = ST30_TYPE_FRAME_LEVEL; ops_rx.notify_frame_ready = rx_st30p_frame_ready; + if (ops->flags & ST30P_RX_FLAG_DATA_PATH_ONLY) + ops_rx.flags |= ST30_RX_FLAG_DATA_PATH_ONLY; + if (ops->flags & ST30P_RX_FLAG_FORCE_NUMA) { + ops_rx.socket_id = ops->socket_id; + ops_rx.flags |= ST30_RX_FLAG_FORCE_NUMA; + } + transport = st30_rx_create(impl, &ops_rx); if (!transport) { err("%s(%d), transport create fail\n", __func__, idx); @@ -151,10 +158,9 @@ static int rx_st30p_uinit_fbs(struct st30p_rx_ctx* ctx) { return 0; } -static int rx_st30p_init_fbs(struct mtl_main_impl* impl, struct st30p_rx_ctx* ctx, - struct st30p_rx_ops* ops) { +static int rx_st30p_init_fbs(struct st30p_rx_ctx* ctx, struct st30p_rx_ops* ops) { int idx = ctx->idx; - int soc_id = mt_socket_id(impl, MTL_PORT_P); + int soc_id = ctx->socket_id; struct st30p_rx_frame* frames; frames = mt_rte_zmalloc_socket(sizeof(*frames) * ctx->framebuff_cnt, soc_id); @@ -392,13 +398,23 @@ st30p_rx_handle st30p_rx_create(mtl_handle mt, struct st30p_rx_ops* ops) { return NULL; } - ctx = mt_rte_zmalloc_socket(sizeof(*ctx), mt_socket_id(impl, MTL_PORT_P)); + enum mtl_port port = mt_port_by_name(impl, ops->port.port[MTL_SESSION_PORT_P]); + if (port >= MTL_PORT_MAX) return NULL; + int socket = mt_socket_id(impl, port); + + if (ops->flags & ST30P_RX_FLAG_FORCE_NUMA) { + socket = ops->socket_id; + info("%s, ST30P_RX_FLAG_FORCE_NUMA to socket %d\n", __func__, socket); + } + + ctx = mt_rte_zmalloc_socket(sizeof(*ctx), socket); if (!ctx) { - err("%s, ctx malloc fail\n", __func__); + err("%s, ctx malloc fail on socket %d\n", __func__, socket); return NULL; } ctx->idx = idx; + ctx->socket_id = socket; ctx->ready = false; ctx->impl = impl; ctx->type = MT_ST30_HANDLE_PIPELINE_RX; @@ -420,7 +436,7 @@ st30p_rx_handle st30p_rx_create(mtl_handle mt, struct st30p_rx_ops* ops) { ctx->framebuff_cnt = ops->framebuff_cnt; /* init fbs */ - ret = rx_st30p_init_fbs(impl, ctx, ops); + ret = rx_st30p_init_fbs(ctx, ops); if (ret < 0) { err("%s(%d), init fbs fail %d\n", __func__, idx, ret); st30p_rx_free(ctx); diff --git a/lib/src/st2110/pipeline/st30_pipeline_rx.h b/lib/src/st2110/pipeline/st30_pipeline_rx.h index d134b126..5163f9bc 100644 --- a/lib/src/st2110/pipeline/st30_pipeline_rx.h +++ b/lib/src/st2110/pipeline/st30_pipeline_rx.h @@ -24,6 +24,7 @@ struct st30p_rx_frame { struct st30p_rx_ctx { struct mtl_main_impl* impl; int idx; + int socket_id; enum mt_handle_type type; /* for sanity check */ char ops_name[ST_MAX_NAME_LEN]; diff --git a/lib/src/st2110/pipeline/st30_pipeline_tx.c b/lib/src/st2110/pipeline/st30_pipeline_tx.c index 5239ae55..b6464dda 100644 --- a/lib/src/st2110/pipeline/st30_pipeline_tx.c +++ b/lib/src/st2110/pipeline/st30_pipeline_tx.c @@ -159,6 +159,10 @@ static int tx_st30p_create_transport(struct mtl_main_impl* impl, struct st30p_tx } if (ops->flags & ST30P_TX_FLAG_DEDICATE_QUEUE) ops_tx.flags |= ST30_TX_FLAG_DEDICATE_QUEUE; + if (ops->flags & ST30P_TX_FLAG_FORCE_NUMA) { + ops_tx.socket_id = ops->socket_id; + ops_tx.flags |= ST30_TX_FLAG_FORCE_NUMA; + } ops_tx.pacing_way = ops->pacing_way; ops_tx.fmt = ops->fmt; @@ -207,10 +211,9 @@ static int tx_st30p_uinit_fbs(struct st30p_tx_ctx* ctx) { return 0; } -static int tx_st30p_init_fbs(struct mtl_main_impl* impl, struct st30p_tx_ctx* ctx, - struct st30p_tx_ops* ops) { +static int tx_st30p_init_fbs(struct st30p_tx_ctx* ctx, struct st30p_tx_ops* ops) { int idx = ctx->idx; - int soc_id = mt_socket_id(impl, MTL_PORT_P); + int soc_id = ctx->socket_id; struct st30p_tx_frame* frames; frames = mt_rte_zmalloc_socket(sizeof(*frames) * ctx->framebuff_cnt, soc_id); @@ -471,13 +474,23 @@ st30p_tx_handle st30p_tx_create(mtl_handle mt, struct st30p_tx_ops* ops) { return NULL; } - ctx = mt_rte_zmalloc_socket(sizeof(*ctx), mt_socket_id(impl, MTL_PORT_P)); + enum mtl_port port = mt_port_by_name(impl, ops->port.port[MTL_SESSION_PORT_P]); + if (port >= MTL_PORT_MAX) return NULL; + int socket = mt_socket_id(impl, port); + + if (ops->flags & ST30P_RX_FLAG_FORCE_NUMA) { + socket = ops->socket_id; + info("%s, ST30P_RX_FLAG_FORCE_NUMA to socket %d\n", __func__, socket); + } + + ctx = mt_rte_zmalloc_socket(sizeof(*ctx), socket); if (!ctx) { - err("%s, ctx malloc fail\n", __func__); + err("%s, ctx malloc fail on socket %d\n", __func__, socket); return NULL; } ctx->idx = idx; + ctx->socket_id = socket; ctx->ready = false; ctx->impl = impl; ctx->type = MT_ST30_HANDLE_PIPELINE_TX; @@ -501,7 +514,7 @@ st30p_tx_handle st30p_tx_create(mtl_handle mt, struct st30p_tx_ops* ops) { ctx->framebuff_cnt = ops->framebuff_cnt; /* init fbs */ - ret = tx_st30p_init_fbs(impl, ctx, ops); + ret = tx_st30p_init_fbs(ctx, ops); if (ret < 0) { err("%s(%d), init fbs fail %d\n", __func__, idx, ret); st30p_tx_free(ctx); diff --git a/lib/src/st2110/pipeline/st30_pipeline_tx.h b/lib/src/st2110/pipeline/st30_pipeline_tx.h index 91907f1e..1ff92b2a 100644 --- a/lib/src/st2110/pipeline/st30_pipeline_tx.h +++ b/lib/src/st2110/pipeline/st30_pipeline_tx.h @@ -25,6 +25,7 @@ struct st30p_tx_frame { struct st30p_tx_ctx { struct mtl_main_impl* impl; int idx; + int socket_id; enum mt_handle_type type; /* for sanity check */ char ops_name[ST_MAX_NAME_LEN]; diff --git a/lib/src/st2110/st_header.h b/lib/src/st2110/st_header.h index 8ac5eb65..5640fa0a 100644 --- a/lib/src/st2110/st_header.h +++ b/lib/src/st2110/st_header.h @@ -785,6 +785,7 @@ struct st_tx_audio_session_rl_info { struct st_tx_audio_session_impl { int idx; /* index for current session */ + int socket_id; struct st30_tx_ops ops; char ops_name[ST_MAX_NAME_LEN]; int recovery_idx; @@ -872,6 +873,7 @@ struct st_tx_audio_session_impl { struct st_tx_audio_sessions_mgr { struct mtl_main_impl* parent; + int socket_id; int idx; /* index for current sessions mgr */ int max_idx; /* max session index */ struct mt_sch_tasklet_impl* tasklet; @@ -955,6 +957,7 @@ struct st_rx_audio_tp { struct st_rx_audio_session_impl { int idx; /* index for current session */ struct st_rx_audio_sessions_mgr* mgr; + int socket_id; bool attached; struct st30_rx_ops ops; char ops_name[ST_MAX_NAME_LEN]; @@ -1046,6 +1049,7 @@ struct st_tx_ancillary_session_pacing { struct st_tx_ancillary_session_impl { int idx; /* index for current session */ + int socket_id; struct st_tx_ancillary_sessions_mgr* mgr; struct st40_tx_ops ops; char ops_name[ST_MAX_NAME_LEN]; @@ -1115,6 +1119,7 @@ struct st_tx_ancillary_session_impl { struct st_tx_ancillary_sessions_mgr { struct mtl_main_impl* parent; + int socket_id; int idx; /* index for current sessions mgr */ int max_idx; /* max session index */ struct mt_sch_tasklet_impl* tasklet; @@ -1138,6 +1143,7 @@ struct st_tx_ancillary_sessions_mgr { struct st_rx_ancillary_session_impl { int idx; /* index for current session */ + int socket_id; struct st_rx_ancillary_sessions_mgr* mgr; bool attached; struct st40_rx_ops ops; diff --git a/lib/src/st2110/st_rx_ancillary_session.c b/lib/src/st2110/st_rx_ancillary_session.c index 5fe03cfe..03cec377 100644 --- a/lib/src/st2110/st_rx_ancillary_session.c +++ b/lib/src/st2110/st_rx_ancillary_session.c @@ -332,19 +332,17 @@ static int rx_ancillary_session_init_mcast(struct mtl_main_impl* impl, return 0; } -static int rx_ancillary_session_init_sw(struct mtl_main_impl* impl, - struct st_rx_ancillary_sessions_mgr* mgr, +static int rx_ancillary_session_init_sw(struct st_rx_ancillary_sessions_mgr* mgr, struct st_rx_ancillary_session_impl* s) { char ring_name[32]; struct rte_ring* ring; unsigned int flags, count; int mgr_idx = mgr->idx, idx = s->idx; - enum mtl_port port = mt_port_logic2phy(s->port_maps, MTL_SESSION_PORT_P); snprintf(ring_name, 32, "%sM%dS%d_PKT", ST_RX_ANCILLARY_PREFIX, mgr_idx, idx); flags = RING_F_SP_ENQ | RING_F_SC_DEQ; /* single-producer and single-consumer */ count = s->ops.rtp_ring_size; - ring = rte_ring_create(ring_name, count, mt_socket_id(impl, port), flags); + ring = rte_ring_create(ring_name, count, s->socket_id, flags); if (count <= 0) { err("%s(%d,%d), invalid rtp_ring_size %d\n", __func__, mgr_idx, idx, count); return -ENOMEM; @@ -413,7 +411,7 @@ static int rx_ancillary_session_attach(struct mtl_main_impl* impl, return ret; } - ret = rx_ancillary_session_init_sw(impl, mgr, s); + ret = rx_ancillary_session_init_sw(mgr, s); if (ret < 0) { err("%s(%d), rx_ancillary_session_init_rtps fail %d\n", __func__, idx, ret); rx_ancillary_session_uinit(impl, s); @@ -611,22 +609,24 @@ static int rx_ancillary_sessions_mgr_init(struct mtl_main_impl* impl, } static struct st_rx_ancillary_session_impl* rx_ancillary_sessions_mgr_attach( - struct st_rx_ancillary_sessions_mgr* mgr, struct st40_rx_ops* ops) { + struct mtl_sch_impl* sch, struct st40_rx_ops* ops) { + struct st_rx_ancillary_sessions_mgr* mgr = &sch->rx_anc_mgr; int midx = mgr->idx; - struct mtl_main_impl* impl = mgr->parent; int ret; struct st_rx_ancillary_session_impl* s; + int socket = mt_sch_socket_id(sch); /* find one empty slot in the mgr */ for (int i = 0; i < ST_MAX_RX_ANC_SESSIONS; i++) { if (!rx_ancillary_session_get_empty(mgr, i)) continue; - s = mt_rte_zmalloc_socket(sizeof(*s), mt_socket_id(impl, MTL_PORT_P)); + s = mt_rte_zmalloc_socket(sizeof(*s), socket); if (!s) { err("%s(%d), session malloc fail on %d\n", __func__, midx, i); rx_ancillary_session_put(mgr, i); return NULL; } + s->socket_id = socket; ret = rx_ancillary_session_init(mgr, s, i); if (ret < 0) { err("%s(%d), init fail on %d\n", __func__, midx, i); @@ -796,14 +796,19 @@ st40_rx_handle st40_rx_create(mtl_handle mt, struct st40_rx_ops* ops) { return NULL; } - s_impl = mt_rte_zmalloc_socket(sizeof(*s_impl), mt_socket_id(impl, MTL_PORT_P)); + enum mtl_port port = mt_port_by_name(impl, ops->port[MTL_SESSION_PORT_P]); + if (port >= MTL_PORT_MAX) return NULL; + int socket = mt_socket_id(impl, port); + + s_impl = mt_rte_zmalloc_socket(sizeof(*s_impl), socket); if (!s_impl) { - err("%s, s_impl malloc fail\n", __func__); + err("%s, s_impl malloc fail on socket %d\n", __func__, socket); return NULL; } quota_mbs = 0; - sch = mt_sch_get(impl, quota_mbs, MT_SCH_TYPE_DEFAULT, MT_SCH_MASK_ALL); + sch = + mt_sch_get_by_socket(impl, quota_mbs, MT_SCH_TYPE_DEFAULT, MT_SCH_MASK_ALL, socket); if (!sch) { mt_rte_free(s_impl); err("%s, get sch fail\n", __func__); @@ -821,7 +826,7 @@ st40_rx_handle st40_rx_create(mtl_handle mt, struct st40_rx_ops* ops) { } mt_pthread_mutex_lock(&sch->rx_anc_mgr_mutex); - s = rx_ancillary_sessions_mgr_attach(&sch->rx_anc_mgr, ops); + s = rx_ancillary_sessions_mgr_attach(sch, ops); mt_pthread_mutex_unlock(&sch->rx_anc_mgr_mutex); if (!s) { err("%s, rx_ancillary_sessions_mgr_attach fail\n", __func__); diff --git a/lib/src/st2110/st_rx_audio_session.c b/lib/src/st2110/st_rx_audio_session.c index aa61c177..5abc2f70 100644 --- a/lib/src/st2110/st_rx_audio_session.c +++ b/lib/src/st2110/st_rx_audio_session.c @@ -108,10 +108,8 @@ static int rx_audio_session_free_frames(struct st_rx_audio_session_impl* s) { return 0; } -static int rx_audio_session_alloc_frames(struct mtl_main_impl* impl, - struct st_rx_audio_session_impl* s) { - enum mtl_port port = mt_port_logic2phy(s->port_maps, MTL_SESSION_PORT_P); - int soc_id = mt_socket_id(impl, port); +static int rx_audio_session_alloc_frames(struct st_rx_audio_session_impl* s) { + int soc_id = s->socket_id; int idx = s->idx; size_t size = s->st30_frame_size; struct st_frame_trans* st30_frame; @@ -158,14 +156,12 @@ static int rx_audio_session_free_rtps(struct st_rx_audio_session_impl* s) { return 0; } -static int rx_audio_session_alloc_rtps(struct mtl_main_impl* impl, - struct st_rx_audio_sessions_mgr* mgr, +static int rx_audio_session_alloc_rtps(struct st_rx_audio_sessions_mgr* mgr, struct st_rx_audio_session_impl* s) { char ring_name[32]; struct rte_ring* ring; unsigned int flags, count; int mgr_idx = mgr->idx, idx = s->idx; - enum mtl_port port = mt_port_logic2phy(s->port_maps, MTL_SESSION_PORT_P); snprintf(ring_name, 32, "%sM%dS%d_RTP", ST_RX_AUDIO_PREFIX, mgr_idx, idx); flags = RING_F_SP_ENQ | RING_F_SC_DEQ; /* single-producer and single-consumer */ @@ -174,7 +170,7 @@ static int rx_audio_session_alloc_rtps(struct mtl_main_impl* impl, err("%s(%d,%d), invalid rtp_ring_size %d\n", __func__, mgr_idx, idx, count); return -ENOMEM; } - ring = rte_ring_create(ring_name, count, mt_socket_id(impl, port), flags); + ring = rte_ring_create(ring_name, count, s->socket_id, flags); if (!ring) { err("%s(%d,%d), rte_ring_create fail\n", __func__, mgr_idx, idx); return -ENOMEM; @@ -722,8 +718,7 @@ static int rx_audio_session_uinit_sw(struct st_rx_audio_session_impl* s) { return 0; } -static int rx_audio_session_init_sw(struct mtl_main_impl* impl, - struct st_rx_audio_sessions_mgr* mgr, +static int rx_audio_session_init_sw(struct st_rx_audio_sessions_mgr* mgr, struct st_rx_audio_session_impl* s) { enum st30_type st30_type = s->ops.type; int idx = s->idx; @@ -731,10 +726,10 @@ static int rx_audio_session_init_sw(struct mtl_main_impl* impl, switch (st30_type) { case ST30_TYPE_FRAME_LEVEL: - ret = rx_audio_session_alloc_frames(impl, s); + ret = rx_audio_session_alloc_frames(s); break; case ST30_TYPE_RTP_LEVEL: - ret = rx_audio_session_alloc_rtps(impl, mgr, s); + ret = rx_audio_session_alloc_rtps(mgr, s); break; default: err("%s(%d), error st30_type %d\n", __func__, idx, st30_type); @@ -839,7 +834,7 @@ static int rx_audio_session_attach(struct mtl_main_impl* impl, return ret; } - ret = rx_audio_session_init_sw(impl, mgr, s); + ret = rx_audio_session_init_sw(mgr, s); if (ret < 0) { err("%s(%d), rx_audio_session_init_sw fail %d\n", __func__, idx, ret); rx_audio_session_uinit(impl, s); @@ -1057,22 +1052,24 @@ static int rx_audio_session_init(struct st_rx_audio_sessions_mgr* mgr, } static struct st_rx_audio_session_impl* rx_audio_sessions_mgr_attach( - struct st_rx_audio_sessions_mgr* mgr, struct st30_rx_ops* ops) { + struct mtl_sch_impl* sch, struct st30_rx_ops* ops) { + struct st_rx_audio_sessions_mgr* mgr = &sch->rx_a_mgr; int midx = mgr->idx; - struct mtl_main_impl* impl = mgr->parent; int ret; struct st_rx_audio_session_impl* s; + int socket = mt_sch_socket_id(sch); /* find one empty slot in the mgr */ for (int i = 0; i < ST_SCH_MAX_RX_AUDIO_SESSIONS; i++) { if (!rx_audio_session_get_empty(mgr, i)) continue; - s = mt_rte_zmalloc_socket(sizeof(*s), mt_socket_id(impl, MTL_PORT_P)); + s = mt_rte_zmalloc_socket(sizeof(*s), socket); if (!s) { err("%s(%d), session malloc fail on %d\n", __func__, midx, i); rx_audio_session_put(mgr, i); return NULL; } + s->socket_id = socket; ret = rx_audio_session_init(mgr, s, i); if (ret < 0) { err("%s(%d), init fail on %d\n", __func__, midx, i); @@ -1255,14 +1252,24 @@ st30_rx_handle st30_rx_create(mtl_handle mt, struct st30_rx_ops* ops) { return NULL; } - s_impl = mt_rte_zmalloc_socket(sizeof(*s_impl), mt_socket_id(impl, MTL_PORT_P)); + enum mtl_port port = mt_port_by_name(impl, ops->port[MTL_SESSION_PORT_P]); + if (port >= MTL_PORT_MAX) return NULL; + int socket = mt_socket_id(impl, port); + + if (ops->flags & ST30_RX_FLAG_FORCE_NUMA) { + socket = ops->socket_id; + info("%s, ST30_RX_FLAG_FORCE_NUMA to socket %d\n", __func__, socket); + } + + s_impl = mt_rte_zmalloc_socket(sizeof(*s_impl), socket); if (!s_impl) { - err("%s, s_impl malloc fail\n", __func__); + err("%s, s_impl malloc fail on socket %d\n", __func__, socket); return NULL; } quota_mbs = impl->main_sch->data_quota_mbs_limit / impl->rx_audio_sessions_max_per_sch; - sch = mt_sch_get(impl, quota_mbs, MT_SCH_TYPE_DEFAULT, MT_SCH_MASK_ALL); + sch = + mt_sch_get_by_socket(impl, quota_mbs, MT_SCH_TYPE_DEFAULT, MT_SCH_MASK_ALL, socket); if (!sch) { mt_rte_free(s_impl); err("%s, get sch fail\n", __func__); @@ -1280,7 +1287,7 @@ st30_rx_handle st30_rx_create(mtl_handle mt, struct st30_rx_ops* ops) { } mt_pthread_mutex_lock(&sch->rx_a_mgr_mutex); - s = rx_audio_sessions_mgr_attach(&sch->rx_a_mgr, ops); + s = rx_audio_sessions_mgr_attach(sch, ops); mt_pthread_mutex_unlock(&sch->rx_a_mgr_mutex); if (!s) { err("%s(%d), rx_audio_sessions_mgr_attach fail\n", __func__, sch->idx); diff --git a/lib/src/st2110/st_tx_ancillary_session.c b/lib/src/st2110/st_tx_ancillary_session.c index aeb141cc..c3c7cc82 100644 --- a/lib/src/st2110/st_tx_ancillary_session.c +++ b/lib/src/st2110/st_tx_ancillary_session.c @@ -76,10 +76,8 @@ static int tx_ancillary_session_free_frames(struct st_tx_ancillary_session_impl* return 0; } -static int tx_ancillary_session_alloc_frames(struct mtl_main_impl* impl, - struct st_tx_ancillary_session_impl* s) { - enum mtl_port port = mt_port_logic2phy(s->port_maps, MTL_SESSION_PORT_P); - int soc_id = mt_socket_id(impl, port); +static int tx_ancillary_session_alloc_frames(struct st_tx_ancillary_session_impl* s) { + int soc_id = s->socket_id; int idx = s->idx; struct st_frame_trans* frame_info; @@ -1170,7 +1168,7 @@ static int tx_ancillary_sessions_mgr_init_hw(struct mtl_main_impl* impl, snprintf(ring_name, 32, "%sM%dP%d", ST_TX_ANCILLARY_PREFIX, mgr_idx, port); flags = RING_F_MP_HTS_ENQ | RING_F_SC_DEQ; /* multi-producer and single-consumer */ count = ST_TX_ANC_SESSIONS_RING_SIZE; - ring = rte_ring_create(ring_name, count, mt_socket_id(impl, port), flags); + ring = rte_ring_create(ring_name, count, mgr->socket_id, flags); if (!ring) { err("%s(%d), rte_ring_create fail for port %d\n", __func__, mgr_idx, port); tx_ancillary_sessions_mgr_uinit_hw(mgr, port); @@ -1298,9 +1296,9 @@ static int tx_ancillary_session_mempool_init(struct mtl_main_impl* impl, char pool_name[32]; snprintf(pool_name, 32, "%sM%dS%dP%d_HDR", ST_TX_ANCILLARY_PREFIX, mgr->idx, idx, i); - struct rte_mempool* mbuf_pool = - mt_mempool_create(impl, port, pool_name, n, MT_MBUF_CACHE_SIZE, - sizeof(struct mt_muf_priv_data), hdr_room_size); + struct rte_mempool* mbuf_pool = mt_mempool_create_by_socket( + impl, pool_name, n, MT_MBUF_CACHE_SIZE, sizeof(struct mt_muf_priv_data), + hdr_room_size, s->socket_id); if (!mbuf_pool) { tx_ancillary_session_mempool_free(s); return -ENOMEM; @@ -1324,9 +1322,9 @@ static int tx_ancillary_session_mempool_init(struct mtl_main_impl* impl, } else { char pool_name[32]; snprintf(pool_name, 32, "%sM%dS%d_CHAIN", ST_TX_ANCILLARY_PREFIX, mgr->idx, idx); - struct rte_mempool* mbuf_pool = - mt_mempool_create(impl, port, pool_name, n, MT_MBUF_CACHE_SIZE, - sizeof(struct mt_muf_priv_data), chain_room_size); + struct rte_mempool* mbuf_pool = mt_mempool_create_by_socket( + impl, pool_name, n, MT_MBUF_CACHE_SIZE, sizeof(struct mt_muf_priv_data), + chain_room_size, s->socket_id); if (!mbuf_pool) { tx_ancillary_session_mempool_free(s); return -ENOMEM; @@ -1338,18 +1336,16 @@ static int tx_ancillary_session_mempool_init(struct mtl_main_impl* impl, return 0; } -static int tx_ancillary_session_init_rtp(struct mtl_main_impl* impl, - struct st_tx_ancillary_sessions_mgr* mgr, +static int tx_ancillary_session_init_rtp(struct st_tx_ancillary_sessions_mgr* mgr, struct st_tx_ancillary_session_impl* s) { char ring_name[32]; struct rte_ring* ring; unsigned int flags, count = s->ops.rtp_ring_size; int mgr_idx = mgr->idx, idx = s->idx; - enum mtl_port port = mt_port_logic2phy(s->port_maps, MTL_SESSION_PORT_P); snprintf(ring_name, 32, "%sM%dS%d_PKT", ST_TX_ANCILLARY_PREFIX, mgr_idx, idx); flags = RING_F_SP_ENQ | RING_F_SC_DEQ; /* single-producer and single-consumer */ - ring = rte_ring_create(ring_name, count, mt_socket_id(impl, port), flags); + ring = rte_ring_create(ring_name, count, s->socket_id, flags); if (!ring) { err("%s(%d,%d), rte_ring_create fail\n", __func__, mgr_idx, idx); tx_ancillary_session_mempool_free(s); @@ -1402,9 +1398,9 @@ static int tx_ancillary_session_init_sw(struct mtl_main_impl* impl, } if (ops->type == ST40_TYPE_RTP_LEVEL) { - ret = tx_ancillary_session_init_rtp(impl, mgr, s); + ret = tx_ancillary_session_init_rtp(mgr, s); } else { - ret = tx_ancillary_session_alloc_frames(impl, s); + ret = tx_ancillary_session_alloc_frames(s); } if (ret < 0) { err("%s(%d), fail %d\n", __func__, idx, ret); @@ -1752,6 +1748,7 @@ static int tx_ancillary_sessions_mgr_init(struct mtl_main_impl* impl, mgr->parent = impl; mgr->idx = idx; + mgr->socket_id = mt_sch_socket_id(sch); for (i = 0; i < ST_MAX_TX_ANC_SESSIONS; i++) { rte_spinlock_init(&mgr->mutex[i]); @@ -1775,22 +1772,24 @@ static int tx_ancillary_sessions_mgr_init(struct mtl_main_impl* impl, } static struct st_tx_ancillary_session_impl* tx_ancillary_sessions_mgr_attach( - struct st_tx_ancillary_sessions_mgr* mgr, struct st40_tx_ops* ops) { + struct mtl_sch_impl* sch, struct st40_tx_ops* ops) { + struct st_tx_ancillary_sessions_mgr* mgr = &sch->tx_anc_mgr; int midx = mgr->idx; - struct mtl_main_impl* impl = mgr->parent; int ret; struct st_tx_ancillary_session_impl* s; + int socket = mt_sch_socket_id(sch); /* find one empty slot in the mgr */ for (int i = 0; i < ST_MAX_TX_ANC_SESSIONS; i++) { if (!tx_ancillary_session_get_empty(mgr, i)) continue; - s = mt_rte_zmalloc_socket(sizeof(*s), mt_socket_id(impl, MTL_PORT_P)); + s = mt_rte_zmalloc_socket(sizeof(*s), socket); if (!s) { err("%s(%d), session malloc fail on %d\n", __func__, midx, i); tx_ancillary_session_put(mgr, i); return NULL; } + s->socket_id = socket; ret = tx_ancillary_session_init(mgr, s, i); if (ret < 0) { err("%s(%d), init fail on %d\n", __func__, midx, i); @@ -1982,14 +1981,19 @@ st40_tx_handle st40_tx_create(mtl_handle mt, struct st40_tx_ops* ops) { return NULL; } - s_impl = mt_rte_zmalloc_socket(sizeof(*s_impl), mt_socket_id(impl, MTL_PORT_P)); + enum mtl_port port = mt_port_by_name(impl, ops->port[MTL_SESSION_PORT_P]); + if (port >= MTL_PORT_MAX) return NULL; + int socket = mt_socket_id(impl, port); + + s_impl = mt_rte_zmalloc_socket(sizeof(*s_impl), socket); if (!s_impl) { - err("%s, s_impl malloc fail\n", __func__); + err("%s, s_impl malloc fail on socket %d\n", __func__, socket); return NULL; } quota_mbs = 0; - sch = mt_sch_get(impl, quota_mbs, MT_SCH_TYPE_DEFAULT, MT_SCH_MASK_ALL); + sch = + mt_sch_get_by_socket(impl, quota_mbs, MT_SCH_TYPE_DEFAULT, MT_SCH_MASK_ALL, socket); if (!sch) { mt_rte_free(s_impl); err("%s, get sch fail\n", __func__); @@ -2007,7 +2011,7 @@ st40_tx_handle st40_tx_create(mtl_handle mt, struct st40_tx_ops* ops) { } mt_pthread_mutex_lock(&sch->tx_anc_mgr_mutex); - s = tx_ancillary_sessions_mgr_attach(&sch->tx_anc_mgr, ops); + s = tx_ancillary_sessions_mgr_attach(sch, ops); mt_pthread_mutex_unlock(&sch->tx_anc_mgr_mutex); if (!s) { err("%s, tx_ancillary_sessions_mgr_attach fail\n", __func__); diff --git a/lib/src/st2110/st_tx_audio_session.c b/lib/src/st2110/st_tx_audio_session.c index 71e78676..368c9224 100644 --- a/lib/src/st2110/st_tx_audio_session.c +++ b/lib/src/st2110/st_tx_audio_session.c @@ -75,10 +75,8 @@ static int tx_audio_session_free_frames(struct st_tx_audio_session_impl* s) { return 0; } -static int tx_audio_session_alloc_frames(struct mtl_main_impl* impl, - struct st_tx_audio_session_impl* s) { - enum mtl_port port = mt_port_logic2phy(s->port_maps, MTL_SESSION_PORT_P); - int soc_id = mt_socket_id(impl, port); +static int tx_audio_session_alloc_frames(struct st_tx_audio_session_impl* s) { + int soc_id = s->socket_id; int idx = s->idx; struct st_frame_trans* frame_info; @@ -1602,7 +1600,7 @@ static int tx_audio_sessions_mgr_init_hw(struct mtl_main_impl* impl, snprintf(ring_name, 32, "%sM%dP%d", ST_TX_AUDIO_PREFIX, mgr_idx, port); flags = RING_F_MP_HTS_ENQ | RING_F_SC_DEQ; /* multi-producer and single-consumer */ count = ST_TX_AUDIO_SESSIONS_RING_SIZE; - ring = rte_ring_create(ring_name, count, mt_socket_id(impl, port), flags); + ring = rte_ring_create(ring_name, count, mgr->socket_id, flags); if (!ring) { err("%s(%d), rte_ring_create fail for port %d\n", __func__, mgr_idx, port); tx_audio_sessions_mgr_uinit_hw(mgr, port); @@ -1732,9 +1730,9 @@ static int tx_audio_session_mempool_init(struct mtl_main_impl* impl, char pool_name[32]; snprintf(pool_name, 32, "%sM%dS%dP%d_HDR_%d", ST_TX_AUDIO_PREFIX, mgr->idx, idx, i, s->recovery_idx); - struct rte_mempool* mbuf_pool = - mt_mempool_create(impl, port, pool_name, n, MT_MBUF_CACHE_SIZE, - sizeof(struct mt_muf_priv_data), hdr_room_size); + struct rte_mempool* mbuf_pool = mt_mempool_create_by_socket( + impl, pool_name, n, MT_MBUF_CACHE_SIZE, sizeof(struct mt_muf_priv_data), + hdr_room_size, s->socket_id); if (!mbuf_pool) { tx_audio_session_mempool_free(s); return -ENOMEM; @@ -1759,8 +1757,8 @@ static int tx_audio_session_mempool_init(struct mtl_main_impl* impl, char pool_name[32]; snprintf(pool_name, 32, "%sM%dS%d_CHAIN_%d", ST_TX_AUDIO_PREFIX, mgr->idx, idx, s->recovery_idx); - struct rte_mempool* mbuf_pool = mt_mempool_create( - impl, port, pool_name, n, MT_MBUF_CACHE_SIZE, 0, chain_room_size); + struct rte_mempool* mbuf_pool = mt_mempool_create_by_socket( + impl, pool_name, n, MT_MBUF_CACHE_SIZE, 0, chain_room_size, s->socket_id); if (!mbuf_pool) { tx_audio_session_mempool_free(s); return -ENOMEM; @@ -1772,18 +1770,16 @@ static int tx_audio_session_mempool_init(struct mtl_main_impl* impl, return 0; } -static int tx_audio_session_init_rtp(struct mtl_main_impl* impl, - struct st_tx_audio_sessions_mgr* mgr, +static int tx_audio_session_init_rtp(struct st_tx_audio_sessions_mgr* mgr, struct st_tx_audio_session_impl* s) { char ring_name[32]; struct rte_ring* ring; unsigned int flags, count = s->ops.rtp_ring_size; int mgr_idx = mgr->idx, idx = s->idx; - enum mtl_port port = mt_port_logic2phy(s->port_maps, MTL_SESSION_PORT_P); snprintf(ring_name, 32, "%sM%dS%d_PKT", ST_TX_AUDIO_PREFIX, mgr_idx, idx); flags = RING_F_SP_ENQ | RING_F_SC_DEQ; /* single-producer and single-consumer */ - ring = rte_ring_create(ring_name, count, mt_socket_id(impl, port), flags); + ring = rte_ring_create(ring_name, count, s->socket_id, flags); if (!ring) { err("%s(%d,%d), rte_ring_create fail\n", __func__, mgr_idx, idx); return -ENOMEM; @@ -1806,8 +1802,7 @@ static int tx_audio_session_uinit_trans_ring(struct st_tx_audio_session_impl* s) return 0; } -static int tx_audio_session_init_trans_ring(struct mtl_main_impl* impl, - struct st_tx_audio_sessions_mgr* mgr, +static int tx_audio_session_init_trans_ring(struct st_tx_audio_sessions_mgr* mgr, struct st_tx_audio_session_impl* s) { struct mt_u64_fifo* ring; unsigned int count = ST_TX_AUDIO_SESSIONS_RING_SIZE; @@ -1821,7 +1816,7 @@ static int tx_audio_session_init_trans_ring(struct mtl_main_impl* impl, } for (int port = 0; port < num_port; port++) { - ring = mt_u64_fifo_init(count, mt_socket_id(impl, port)); + ring = mt_u64_fifo_init(count, s->socket_id); if (!ring) { err("%s(%d,%d), mt_u64_fifo_init fail\n", __func__, mgr_idx, idx); tx_audio_session_uinit_trans_ring(s); @@ -1949,7 +1944,7 @@ static int tx_audio_session_init_sw(struct mtl_main_impl* impl, return ret; } - ret = tx_audio_session_init_trans_ring(impl, mgr, s); + ret = tx_audio_session_init_trans_ring(mgr, s); if (ret < 0) { err("%s(%d), mbuf ring init fail %d\n", __func__, idx, ret); tx_audio_session_uinit_sw(mgr, s); @@ -1957,9 +1952,9 @@ static int tx_audio_session_init_sw(struct mtl_main_impl* impl, } if (ops->type == ST30_TYPE_RTP_LEVEL) { - ret = tx_audio_session_init_rtp(impl, mgr, s); + ret = tx_audio_session_init_rtp(mgr, s); } else { - ret = tx_audio_session_alloc_frames(impl, s); + ret = tx_audio_session_alloc_frames(s); } if (ret < 0) { err("%s(%d), mode init fail %d\n", __func__, idx, ret); @@ -2389,6 +2384,7 @@ static int tx_audio_sessions_mgr_init(struct mtl_main_impl* impl, mgr->parent = impl; mgr->idx = idx; + mgr->socket_id = mt_sch_socket_id(sch); mgr->tx_hang_detect_time_thresh = NS_PER_S; for (i = 0; i < ST_SCH_MAX_TX_AUDIO_SESSIONS; i++) { @@ -2413,22 +2409,24 @@ static int tx_audio_sessions_mgr_init(struct mtl_main_impl* impl, } static struct st_tx_audio_session_impl* tx_audio_sessions_mgr_attach( - struct st_tx_audio_sessions_mgr* mgr, struct st30_tx_ops* ops) { + struct mtl_sch_impl* sch, struct st30_tx_ops* ops) { + struct st_tx_audio_sessions_mgr* mgr = &sch->tx_a_mgr; int midx = mgr->idx; - struct mtl_main_impl* impl = mgr->parent; int ret; struct st_tx_audio_session_impl* s; + int socket = mt_sch_socket_id(sch); /* find one empty slot in the mgr */ for (int i = 0; i < ST_SCH_MAX_TX_AUDIO_SESSIONS; i++) { if (!tx_audio_session_get_empty(mgr, i)) continue; - s = mt_rte_zmalloc_socket(sizeof(*s), mt_socket_id(impl, MTL_PORT_P)); + s = mt_rte_zmalloc_socket(sizeof(*s), socket); if (!s) { err("%s(%d), session malloc fail on %d\n", __func__, midx, i); tx_audio_session_put(mgr, i); return NULL; } + s->socket_id = socket; ret = tx_audio_session_init(mgr, s, i); if (ret < 0) { err("%s(%d), init fail on %d\n", __func__, midx, i); @@ -2685,14 +2683,24 @@ st30_tx_handle st30_tx_create(mtl_handle mt, struct st30_tx_ops* ops) { return NULL; } - s_impl = mt_rte_zmalloc_socket(sizeof(*s_impl), mt_socket_id(impl, MTL_PORT_P)); + enum mtl_port port = mt_port_by_name(impl, ops->port[MTL_SESSION_PORT_P]); + if (port >= MTL_PORT_MAX) return NULL; + int socket = mt_socket_id(impl, port); + + if (ops->flags & ST30_TX_FLAG_FORCE_NUMA) { + socket = ops->socket_id; + info("%s, ST30_TX_FLAG_FORCE_NUMA to socket %d\n", __func__, socket); + } + + s_impl = mt_rte_zmalloc_socket(sizeof(*s_impl), socket); if (!s_impl) { - err("%s, s_impl malloc fail\n", __func__); + err("%s, s_impl malloc fail on socket %d\n", __func__, socket); return NULL; } quota_mbs = impl->main_sch->data_quota_mbs_limit / impl->tx_audio_sessions_max_per_sch; - sch = mt_sch_get(impl, quota_mbs, MT_SCH_TYPE_DEFAULT, MT_SCH_MASK_ALL); + sch = + mt_sch_get_by_socket(impl, quota_mbs, MT_SCH_TYPE_DEFAULT, MT_SCH_MASK_ALL, socket); if (!sch) { mt_rte_free(s_impl); err("%s, get sch fail\n", __func__); @@ -2710,7 +2718,7 @@ st30_tx_handle st30_tx_create(mtl_handle mt, struct st30_tx_ops* ops) { } mt_pthread_mutex_lock(&sch->tx_a_mgr_mutex); - s = tx_audio_sessions_mgr_attach(&sch->tx_a_mgr, ops); + s = tx_audio_sessions_mgr_attach(sch, ops); mt_pthread_mutex_unlock(&sch->tx_a_mgr_mutex); if (!s) { err("%s, tx_audio_sessions_mgr_attach fail\n", __func__);