diff --git a/lib/src/mt_admin.c b/lib/src/mt_admin.c index 9156bf4a..3cbda76d 100644 --- a/lib/src/mt_admin.c +++ b/lib/src/mt_admin.c @@ -177,8 +177,8 @@ static int admin_tx_video_migrate(struct mtl_main_impl* impl, bool* migrated) { } dbg("%s, find one busy session(%d,%d)\n", __func__, from_sch->idx, busy_s->idx); - struct mtl_sch_impl* to_sch = - mt_sch_get(impl, quota_mbs, from_sch->type, MT_SCH_MASK_ALL); + struct mtl_sch_impl* to_sch = mt_sch_get_by_socket(impl, quota_mbs, from_sch->type, + MT_SCH_MASK_ALL, from_sch->socket); if (!to_sch) { err("%s, no idle sch for session(%d,%d)\n", __func__, from_sch->idx, busy_s->idx); return -EIO; @@ -294,8 +294,8 @@ static int admin_rx_video_migrate(struct mtl_main_impl* impl, bool* migrated) { } dbg("%s, find one busy session(%d,%d)\n", __func__, from_sch->idx, busy_s->idx); - struct mtl_sch_impl* to_sch = - mt_sch_get(impl, quota_mbs, from_sch->type, MT_SCH_MASK_ALL); + struct mtl_sch_impl* to_sch = mt_sch_get_by_socket(impl, quota_mbs, from_sch->type, + MT_SCH_MASK_ALL, from_sch->type); if (!to_sch) { err("%s, no idle sch for session(%d,%d)\n", __func__, from_sch->idx, busy_s->idx); return -EIO; diff --git a/lib/src/mt_main.c b/lib/src/mt_main.c index 98923b80..591e5f68 100644 --- a/lib/src/mt_main.c +++ b/lib/src/mt_main.c @@ -690,7 +690,8 @@ int mtl_get_lcore(mtl_handle mt, unsigned int* lcore) { return -EIO; } - return mt_sch_get_lcore(impl, lcore, MT_LCORE_TYPE_USER); + return mt_sch_get_lcore(impl, lcore, MT_LCORE_TYPE_USER, + mt_socket_id(impl, MTL_PORT_P)); } int mtl_put_lcore(mtl_handle mt, unsigned int lcore) { diff --git a/lib/src/mt_main.h b/lib/src/mt_main.h index b000caa2..119ab828 100644 --- a/lib/src/mt_main.h +++ b/lib/src/mt_main.h @@ -502,6 +502,7 @@ struct mtl_sch_impl { /* max tasklet index */ volatile int max_tasklet_idx; unsigned int lcore; + int socket; bool run_in_thread; /* Run the tasklet inside one thread instead of a pinned lcore. */ pthread_t tid; /* thread id for run_in_thread */ int t_pid; /* gettid */ diff --git a/lib/src/mt_sch.c b/lib/src/mt_sch.c index ac49bcf2..57699500 100644 --- a/lib/src/mt_sch.c +++ b/lib/src/mt_sch.c @@ -236,7 +236,8 @@ static int sch_start(struct mtl_sch_impl* sch) { if (!sch->run_in_thread) { ret = mt_sch_get_lcore( sch->parent, &sch->lcore, - (sch->type == MT_SCH_TYPE_APP) ? MT_LCORE_TYPE_SCH_USER : MT_LCORE_TYPE_SCH); + (sch->type == MT_SCH_TYPE_APP) ? MT_LCORE_TYPE_SCH_USER : MT_LCORE_TYPE_SCH, + sch->socket); if (ret < 0) { err("%s(%d), get lcore fail %d\n", __func__, idx, ret); sch_unlock(sch); @@ -254,7 +255,7 @@ static int sch_start(struct mtl_sch_impl* sch) { rte_atomic32_set(&sch->started, 1); if (!sch->run_in_thread) - info("%s(%d), succ on lcore %u\n", __func__, idx, sch->lcore); + info("%s(%d), succ on lcore %u socket %d\n", __func__, idx, sch->lcore, sch->socket); else info("%s(%d), succ on tid %" PRIu64 "\n", __func__, idx, sch->tid); sch_unlock(sch); @@ -292,7 +293,8 @@ static int sch_stop(struct mtl_sch_impl* sch) { } static struct mtl_sch_impl* sch_request(struct mtl_main_impl* impl, enum mt_sch_type type, - mt_sch_mask_t mask, struct mtl_sch_ops* ops) { + mt_sch_mask_t mask, struct mtl_sch_ops* ops, + int socket) { struct mtl_sch_impl* sch; for (int sch_idx = 0; sch_idx < MT_MAX_SCH_NUM; sch_idx++) { @@ -313,8 +315,8 @@ static struct mtl_sch_impl* sch_request(struct mtl_main_impl* impl, enum mt_sch_ sch->nb_tasklets = ops->nb_tasklets; else sch->nb_tasklets = impl->tasklets_nb_per_sch; - sch->tasklet = mt_rte_zmalloc_socket(sizeof(*sch->tasklet) * sch->nb_tasklets, - mt_socket_id(impl, MTL_PORT_P)); + sch->tasklet = + mt_rte_zmalloc_socket(sizeof(*sch->tasklet) * sch->nb_tasklets, socket); if (!sch->tasklet) { err("%s(%d), %u tasklet malloc fail\n", __func__, sch_idx, sch->nb_tasklets); sch_unlock(sch); @@ -323,8 +325,8 @@ static struct mtl_sch_impl* sch_request(struct mtl_main_impl* impl, enum mt_sch_ rte_atomic32_inc(&sch->active); rte_atomic32_inc(&mt_sch_get_mgr(impl)->sch_cnt); sch_unlock(sch); - info("%s(%d), name %s with %u tasklets, type %d\n", __func__, sch_idx, sch->name, - sch->nb_tasklets, type); + info("%s(%d), name %s with %u tasklets, type %d socket %d\n", __func__, sch_idx, + sch->name, sch->nb_tasklets, type, socket); return sch; } sch_unlock(sch); @@ -634,7 +636,7 @@ static inline bool sch_socket_match(int cpu_socket, int dev_socket, } int mt_sch_get_lcore(struct mtl_main_impl* impl, unsigned int* lcore, - enum mt_lcore_type type) { + enum mt_lcore_type type, int socket) { unsigned int cur_lcore; int ret; bool skip_numa_check = false; @@ -648,8 +650,7 @@ int mt_sch_get_lcore(struct mtl_main_impl* impl, unsigned int* lcore, do { cur_lcore = rte_get_next_lcore(cur_lcore, 1, 0); if ((cur_lcore < RTE_MAX_LCORE) && - sch_socket_match(rte_lcore_to_socket_id(cur_lcore), - mt_socket_id(impl, MTL_PORT_P), skip_numa_check)) { + sch_socket_match(rte_lcore_to_socket_id(cur_lcore), socket, skip_numa_check)) { ret = mt_instance_get_lcore(impl, cur_lcore); if (ret == 0) { *lcore = cur_lcore; @@ -657,8 +658,8 @@ int mt_sch_get_lcore(struct mtl_main_impl* impl, unsigned int* lcore, /* set local lcores info */ mgr->local_lcores_active[cur_lcore] = true; mgr->local_lcores_type[cur_lcore] = type; - info("%s, succ on manager lcore %d for %s\n", __func__, cur_lcore, - lcore_type_name(type)); + info("%s, succ on manager lcore %d for %s socket %d\n", __func__, cur_lcore, + lcore_type_name(type), socket); return 0; } } @@ -680,8 +681,7 @@ int mt_sch_get_lcore(struct mtl_main_impl* impl, unsigned int* lcore, shm_entry = &lcore_shm->lcores_info[cur_lcore]; if ((cur_lcore < RTE_MAX_LCORE) && - sch_socket_match(rte_lcore_to_socket_id(cur_lcore), - mt_socket_id(impl, MTL_PORT_P), skip_numa_check)) { + sch_socket_match(rte_lcore_to_socket_id(cur_lcore), socket, skip_numa_check)) { if (!shm_entry->active) { *lcore = cur_lcore; shm_entry->active = true; @@ -697,8 +697,8 @@ int mt_sch_get_lcore(struct mtl_main_impl* impl, unsigned int* lcore, mgr->local_lcores_active[cur_lcore] = true; mgr->local_lcores_type[cur_lcore] = type; ret = sch_filelock_unlock(mgr); - info("%s, succ on shm lcore %d for %s\n", __func__, cur_lcore, - lcore_type_name(type)); + info("%s, succ on shm lcore %d for %s socket %d\n", __func__, cur_lcore, + lcore_type_name(type), socket); if (ret < 0) { err("%s, sch_filelock_unlock fail\n", __func__); return ret; @@ -713,7 +713,7 @@ int mt_sch_get_lcore(struct mtl_main_impl* impl, unsigned int* lcore, if (!skip_numa_check && mt_user_across_numa_core(impl)) { warn("%s, can't find available lcore from socket %d, try with other numa cpu\n", - __func__, mt_socket_id(impl, MTL_PORT_P)); + __func__, socket); skip_numa_check = true; goto again; } @@ -854,7 +854,6 @@ int mtl_sch_unregister_tasklet(mtl_tasklet_handle tasklet) { mtl_tasklet_handle mtl_sch_register_tasklet(struct mtl_sch_impl* sch, struct mtl_tasklet_ops* tasklet_ops) { int idx = sch->idx; - struct mtl_main_impl* impl = sch->parent; struct mt_sch_tasklet_impl* tasklet; sch_lock(sch); @@ -864,7 +863,7 @@ mtl_tasklet_handle mtl_sch_register_tasklet(struct mtl_sch_impl* sch, if (sch->tasklet[i]) continue; /* find one empty tasklet slot */ - tasklet = mt_rte_zmalloc_socket(sizeof(*tasklet), mt_socket_id(impl, MTL_PORT_P)); + tasklet = mt_rte_zmalloc_socket(sizeof(*tasklet), sch->socket); if (!tasklet) { err("%s(%d), tasklet malloc fail on %d\n", __func__, idx, i); sch_unlock(sch); @@ -1052,8 +1051,9 @@ int mt_sch_put(struct mtl_sch_impl* sch, int quota_mbs) { return 0; } -struct mtl_sch_impl* mt_sch_get(struct mtl_main_impl* impl, int quota_mbs, - enum mt_sch_type type, mt_sch_mask_t mask) { +struct mtl_sch_impl* mt_sch_get_by_socket(struct mtl_main_impl* impl, int quota_mbs, + enum mt_sch_type type, mt_sch_mask_t mask, + int socket) { int ret, idx; struct mtl_sch_impl* sch; struct mt_sch_mgr* mgr = mt_sch_get_mgr(impl); @@ -1063,6 +1063,7 @@ struct mtl_sch_impl* mt_sch_get(struct mtl_main_impl* impl, int quota_mbs, /* first try to find one sch capable with quota */ for (idx = 0; idx < MT_MAX_SCH_NUM; idx++) { sch = mt_sch_instance(impl, idx); + if (socket != sch->socket) continue; /* mask check */ if (!(mask & MTL_BIT64(idx))) continue; /* active and busy check */ @@ -1071,7 +1072,8 @@ struct mtl_sch_impl* mt_sch_get(struct mtl_main_impl* impl, int quota_mbs, if (!sch_is_capable(sch, quota_mbs, type)) continue; ret = mt_sch_add_quota(sch, quota_mbs); if (ret >= 0) { - info("%s(%d), succ with quota_mbs %d\n", __func__, idx, quota_mbs); + info("%s(%d), succ with quota_mbs %d socket %d\n", __func__, idx, quota_mbs, + socket); rte_atomic32_inc(&sch->ref_cnt); sch_mgr_unlock(mgr); return sch; @@ -1079,12 +1081,14 @@ struct mtl_sch_impl* mt_sch_get(struct mtl_main_impl* impl, int quota_mbs, } /* no quota, try to create one */ - sch = sch_request(impl, type, mask, NULL); + sch = sch_request(impl, type, mask, NULL, socket); if (!sch) { err("%s, no free sch\n", __func__); sch_mgr_unlock(mgr); return NULL; } + /* set the socket id */ + sch->socket = socket; idx = sch->idx; ret = mt_sch_add_quota(sch, quota_mbs); if (ret < 0) { @@ -1316,7 +1320,9 @@ mtl_sch_handle mtl_sch_create(mtl_handle mt, struct mtl_sch_ops* ops) { return NULL; } - struct mtl_sch_impl* sch = sch_request(impl, MT_SCH_TYPE_APP, MT_SCH_MASK_ALL, ops); + /* request sch on the MTL_PORT_P socket */ + struct mtl_sch_impl* sch = sch_request(impl, MT_SCH_TYPE_APP, MT_SCH_MASK_ALL, ops, + mt_socket_id(impl, MTL_PORT_P)); if (!sch) { err("%s, sch request fail\n", __func__); return NULL; diff --git a/lib/src/mt_sch.h b/lib/src/mt_sch.h index c6413174..43b819b3 100644 --- a/lib/src/mt_sch.h +++ b/lib/src/mt_sch.h @@ -50,8 +50,15 @@ static inline void mt_tasklet_set_sleep(struct mt_sch_tasklet_impl* tasklet, int mt_sch_add_quota(struct mtl_sch_impl* sch, int quota_mbs); -struct mtl_sch_impl* mt_sch_get(struct mtl_main_impl* impl, int quota_mbs, - enum mt_sch_type type, mt_sch_mask_t mask); +struct mtl_sch_impl* mt_sch_get_by_socket(struct mtl_main_impl* impl, int quota_mbs, + enum mt_sch_type type, mt_sch_mask_t mask, + int socket); +static inline struct mtl_sch_impl* mt_sch_get(struct mtl_main_impl* impl, int quota_mbs, + enum mt_sch_type type, mt_sch_mask_t mask) { + /* use the default socket of MTL_PORT_P */ + return mt_sch_get_by_socket(impl, quota_mbs, type, mask, + mt_socket_id(impl, MTL_PORT_P)); +} int mt_sch_put(struct mtl_sch_impl* sch, int quota_mbs); int mt_sch_start_all(struct mtl_main_impl* impl); @@ -67,7 +74,7 @@ static inline uint64_t mt_sch_avg_ns_loop(struct mtl_sch_impl* sch) { int mt_sch_put_lcore(struct mtl_main_impl* impl, unsigned int lcore); int mt_sch_get_lcore(struct mtl_main_impl* impl, unsigned int* lcore, - enum mt_lcore_type type); + enum mt_lcore_type type, int socket); bool mt_sch_lcore_valid(struct mtl_main_impl* impl, unsigned int lcore); #endif diff --git a/lib/src/mt_tap.c b/lib/src/mt_tap.c index 9a1a359e..5502c9a4 100644 --- a/lib/src/mt_tap.c +++ b/lib/src/mt_tap.c @@ -900,7 +900,7 @@ int mt_tap_init(struct mtl_main_impl* impl) { rte_atomic32_set(&cni->stop_tap, 0); tap_ctx->has_lcore = false; - ret = mt_sch_get_lcore(impl, &lcore, MT_LCORE_TYPE_TAP); + ret = mt_sch_get_lcore(impl, &lcore, MT_LCORE_TYPE_TAP, mt_socket_id(impl, MTL_PORT_P)); if (ret < 0) { err("%s, get lcore fail %d\n", __func__, ret); mt_tap_uinit(impl); diff --git a/lib/src/st2110/st_rx_video_session.c b/lib/src/st2110/st_rx_video_session.c index f345bdd8..2609393c 100644 --- a/lib/src/st2110/st_rx_video_session.c +++ b/lib/src/st2110/st_rx_video_session.c @@ -2286,7 +2286,8 @@ static int rv_init_pkt_lcore(struct mtl_main_impl* impl, } s->pkt_lcore_ring = ring; - ret = mt_sch_get_lcore(impl, &lcore, MT_LCORE_TYPE_RXV_RING_LCORE); + ret = mt_sch_get_lcore(impl, &lcore, MT_LCORE_TYPE_RXV_RING_LCORE, + mt_socket_id(impl, port)); if (ret < 0) { err("%s(%d,%d), get lcore fail %d\n", __func__, mgr_idx, idx, ret); rv_uinit_pkt_lcore(impl, s);