Skip to content

Commit

Permalink
sch: add socket support (#893)
Browse files Browse the repository at this point in the history
Sessions can create a sch on any socket by mt_sch_get_by_socket.

Signed-off-by: Frank Du <frank.du@intel.com>
  • Loading branch information
frankdjx committed Jun 6, 2024
1 parent 8a5893b commit afa6ed1
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 34 deletions.
8 changes: 4 additions & 4 deletions lib/src/mt_admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,8 @@ static int admin_tx_video_migrate(struct mtl_main_impl* impl, bool* migrated) {
}

dbg("%s, find one busy session(%d,%d)\n", __func__, from_sch->idx, busy_s->idx);
struct mtl_sch_impl* to_sch =
mt_sch_get(impl, quota_mbs, from_sch->type, MT_SCH_MASK_ALL);
struct mtl_sch_impl* to_sch = mt_sch_get_by_socket(impl, quota_mbs, from_sch->type,
MT_SCH_MASK_ALL, from_sch->socket);
if (!to_sch) {
err("%s, no idle sch for session(%d,%d)\n", __func__, from_sch->idx, busy_s->idx);
return -EIO;
Expand Down Expand Up @@ -294,8 +294,8 @@ static int admin_rx_video_migrate(struct mtl_main_impl* impl, bool* migrated) {
}

dbg("%s, find one busy session(%d,%d)\n", __func__, from_sch->idx, busy_s->idx);
struct mtl_sch_impl* to_sch =
mt_sch_get(impl, quota_mbs, from_sch->type, MT_SCH_MASK_ALL);
struct mtl_sch_impl* to_sch = mt_sch_get_by_socket(impl, quota_mbs, from_sch->type,
MT_SCH_MASK_ALL, from_sch->type);
if (!to_sch) {
err("%s, no idle sch for session(%d,%d)\n", __func__, from_sch->idx, busy_s->idx);
return -EIO;
Expand Down
3 changes: 2 additions & 1 deletion lib/src/mt_main.c
Original file line number Diff line number Diff line change
Expand Up @@ -690,7 +690,8 @@ int mtl_get_lcore(mtl_handle mt, unsigned int* lcore) {
return -EIO;
}

return mt_sch_get_lcore(impl, lcore, MT_LCORE_TYPE_USER);
return mt_sch_get_lcore(impl, lcore, MT_LCORE_TYPE_USER,
mt_socket_id(impl, MTL_PORT_P));
}

int mtl_put_lcore(mtl_handle mt, unsigned int lcore) {
Expand Down
1 change: 1 addition & 0 deletions lib/src/mt_main.h
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,7 @@ struct mtl_sch_impl {
/* max tasklet index */
volatile int max_tasklet_idx;
unsigned int lcore;
int socket;
bool run_in_thread; /* Run the tasklet inside one thread instead of a pinned lcore. */
pthread_t tid; /* thread id for run_in_thread */
int t_pid; /* gettid */
Expand Down
54 changes: 30 additions & 24 deletions lib/src/mt_sch.c
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,8 @@ static int sch_start(struct mtl_sch_impl* sch) {
if (!sch->run_in_thread) {
ret = mt_sch_get_lcore(
sch->parent, &sch->lcore,
(sch->type == MT_SCH_TYPE_APP) ? MT_LCORE_TYPE_SCH_USER : MT_LCORE_TYPE_SCH);
(sch->type == MT_SCH_TYPE_APP) ? MT_LCORE_TYPE_SCH_USER : MT_LCORE_TYPE_SCH,
sch->socket);
if (ret < 0) {
err("%s(%d), get lcore fail %d\n", __func__, idx, ret);
sch_unlock(sch);
Expand All @@ -254,7 +255,7 @@ static int sch_start(struct mtl_sch_impl* sch) {

rte_atomic32_set(&sch->started, 1);
if (!sch->run_in_thread)
info("%s(%d), succ on lcore %u\n", __func__, idx, sch->lcore);
info("%s(%d), succ on lcore %u socket %d\n", __func__, idx, sch->lcore, sch->socket);
else
info("%s(%d), succ on tid %" PRIu64 "\n", __func__, idx, sch->tid);
sch_unlock(sch);
Expand Down Expand Up @@ -292,7 +293,8 @@ static int sch_stop(struct mtl_sch_impl* sch) {
}

static struct mtl_sch_impl* sch_request(struct mtl_main_impl* impl, enum mt_sch_type type,
mt_sch_mask_t mask, struct mtl_sch_ops* ops) {
mt_sch_mask_t mask, struct mtl_sch_ops* ops,
int socket) {
struct mtl_sch_impl* sch;

for (int sch_idx = 0; sch_idx < MT_MAX_SCH_NUM; sch_idx++) {
Expand All @@ -313,8 +315,8 @@ static struct mtl_sch_impl* sch_request(struct mtl_main_impl* impl, enum mt_sch_
sch->nb_tasklets = ops->nb_tasklets;
else
sch->nb_tasklets = impl->tasklets_nb_per_sch;
sch->tasklet = mt_rte_zmalloc_socket(sizeof(*sch->tasklet) * sch->nb_tasklets,
mt_socket_id(impl, MTL_PORT_P));
sch->tasklet =
mt_rte_zmalloc_socket(sizeof(*sch->tasklet) * sch->nb_tasklets, socket);
if (!sch->tasklet) {
err("%s(%d), %u tasklet malloc fail\n", __func__, sch_idx, sch->nb_tasklets);
sch_unlock(sch);
Expand All @@ -323,8 +325,8 @@ static struct mtl_sch_impl* sch_request(struct mtl_main_impl* impl, enum mt_sch_
rte_atomic32_inc(&sch->active);
rte_atomic32_inc(&mt_sch_get_mgr(impl)->sch_cnt);
sch_unlock(sch);
info("%s(%d), name %s with %u tasklets, type %d\n", __func__, sch_idx, sch->name,
sch->nb_tasklets, type);
info("%s(%d), name %s with %u tasklets, type %d socket %d\n", __func__, sch_idx,
sch->name, sch->nb_tasklets, type, socket);
return sch;
}
sch_unlock(sch);
Expand Down Expand Up @@ -634,7 +636,7 @@ static inline bool sch_socket_match(int cpu_socket, int dev_socket,
}

int mt_sch_get_lcore(struct mtl_main_impl* impl, unsigned int* lcore,
enum mt_lcore_type type) {
enum mt_lcore_type type, int socket) {
unsigned int cur_lcore;
int ret;
bool skip_numa_check = false;
Expand All @@ -648,17 +650,16 @@ int mt_sch_get_lcore(struct mtl_main_impl* impl, unsigned int* lcore,
do {
cur_lcore = rte_get_next_lcore(cur_lcore, 1, 0);
if ((cur_lcore < RTE_MAX_LCORE) &&
sch_socket_match(rte_lcore_to_socket_id(cur_lcore),
mt_socket_id(impl, MTL_PORT_P), skip_numa_check)) {
sch_socket_match(rte_lcore_to_socket_id(cur_lcore), socket, skip_numa_check)) {
ret = mt_instance_get_lcore(impl, cur_lcore);
if (ret == 0) {
*lcore = cur_lcore;
rte_atomic32_inc(&impl->lcore_cnt);
/* set local lcores info */
mgr->local_lcores_active[cur_lcore] = true;
mgr->local_lcores_type[cur_lcore] = type;
info("%s, succ on manager lcore %d for %s\n", __func__, cur_lcore,
lcore_type_name(type));
info("%s, succ on manager lcore %d for %s socket %d\n", __func__, cur_lcore,
lcore_type_name(type), socket);
return 0;
}
}
Expand All @@ -680,8 +681,7 @@ int mt_sch_get_lcore(struct mtl_main_impl* impl, unsigned int* lcore,
shm_entry = &lcore_shm->lcores_info[cur_lcore];

if ((cur_lcore < RTE_MAX_LCORE) &&
sch_socket_match(rte_lcore_to_socket_id(cur_lcore),
mt_socket_id(impl, MTL_PORT_P), skip_numa_check)) {
sch_socket_match(rte_lcore_to_socket_id(cur_lcore), socket, skip_numa_check)) {
if (!shm_entry->active) {
*lcore = cur_lcore;
shm_entry->active = true;
Expand All @@ -697,8 +697,8 @@ int mt_sch_get_lcore(struct mtl_main_impl* impl, unsigned int* lcore,
mgr->local_lcores_active[cur_lcore] = true;
mgr->local_lcores_type[cur_lcore] = type;
ret = sch_filelock_unlock(mgr);
info("%s, succ on shm lcore %d for %s\n", __func__, cur_lcore,
lcore_type_name(type));
info("%s, succ on shm lcore %d for %s socket %d\n", __func__, cur_lcore,
lcore_type_name(type), socket);
if (ret < 0) {
err("%s, sch_filelock_unlock fail\n", __func__);
return ret;
Expand All @@ -713,7 +713,7 @@ int mt_sch_get_lcore(struct mtl_main_impl* impl, unsigned int* lcore,

if (!skip_numa_check && mt_user_across_numa_core(impl)) {
warn("%s, can't find available lcore from socket %d, try with other numa cpu\n",
__func__, mt_socket_id(impl, MTL_PORT_P));
__func__, socket);
skip_numa_check = true;
goto again;
}
Expand Down Expand Up @@ -854,7 +854,6 @@ int mtl_sch_unregister_tasklet(mtl_tasklet_handle tasklet) {
mtl_tasklet_handle mtl_sch_register_tasklet(struct mtl_sch_impl* sch,
struct mtl_tasklet_ops* tasklet_ops) {
int idx = sch->idx;
struct mtl_main_impl* impl = sch->parent;
struct mt_sch_tasklet_impl* tasklet;

sch_lock(sch);
Expand All @@ -864,7 +863,7 @@ mtl_tasklet_handle mtl_sch_register_tasklet(struct mtl_sch_impl* sch,
if (sch->tasklet[i]) continue;

/* find one empty tasklet slot */
tasklet = mt_rte_zmalloc_socket(sizeof(*tasklet), mt_socket_id(impl, MTL_PORT_P));
tasklet = mt_rte_zmalloc_socket(sizeof(*tasklet), sch->socket);
if (!tasklet) {
err("%s(%d), tasklet malloc fail on %d\n", __func__, idx, i);
sch_unlock(sch);
Expand Down Expand Up @@ -1052,8 +1051,9 @@ int mt_sch_put(struct mtl_sch_impl* sch, int quota_mbs) {
return 0;
}

struct mtl_sch_impl* mt_sch_get(struct mtl_main_impl* impl, int quota_mbs,
enum mt_sch_type type, mt_sch_mask_t mask) {
struct mtl_sch_impl* mt_sch_get_by_socket(struct mtl_main_impl* impl, int quota_mbs,
enum mt_sch_type type, mt_sch_mask_t mask,
int socket) {
int ret, idx;
struct mtl_sch_impl* sch;
struct mt_sch_mgr* mgr = mt_sch_get_mgr(impl);
Expand All @@ -1063,6 +1063,7 @@ struct mtl_sch_impl* mt_sch_get(struct mtl_main_impl* impl, int quota_mbs,
/* first try to find one sch capable with quota */
for (idx = 0; idx < MT_MAX_SCH_NUM; idx++) {
sch = mt_sch_instance(impl, idx);
if (socket != sch->socket) continue;
/* mask check */
if (!(mask & MTL_BIT64(idx))) continue;
/* active and busy check */
Expand All @@ -1071,20 +1072,23 @@ struct mtl_sch_impl* mt_sch_get(struct mtl_main_impl* impl, int quota_mbs,
if (!sch_is_capable(sch, quota_mbs, type)) continue;
ret = mt_sch_add_quota(sch, quota_mbs);
if (ret >= 0) {
info("%s(%d), succ with quota_mbs %d\n", __func__, idx, quota_mbs);
info("%s(%d), succ with quota_mbs %d socket %d\n", __func__, idx, quota_mbs,
socket);
rte_atomic32_inc(&sch->ref_cnt);
sch_mgr_unlock(mgr);
return sch;
}
}

/* no quota, try to create one */
sch = sch_request(impl, type, mask, NULL);
sch = sch_request(impl, type, mask, NULL, socket);
if (!sch) {
err("%s, no free sch\n", __func__);
sch_mgr_unlock(mgr);
return NULL;
}
/* set the socket id */
sch->socket = socket;
idx = sch->idx;
ret = mt_sch_add_quota(sch, quota_mbs);
if (ret < 0) {
Expand Down Expand Up @@ -1316,7 +1320,9 @@ mtl_sch_handle mtl_sch_create(mtl_handle mt, struct mtl_sch_ops* ops) {
return NULL;
}

struct mtl_sch_impl* sch = sch_request(impl, MT_SCH_TYPE_APP, MT_SCH_MASK_ALL, ops);
/* request sch on the MTL_PORT_P socket */
struct mtl_sch_impl* sch = sch_request(impl, MT_SCH_TYPE_APP, MT_SCH_MASK_ALL, ops,
mt_socket_id(impl, MTL_PORT_P));
if (!sch) {
err("%s, sch request fail\n", __func__);
return NULL;
Expand Down
13 changes: 10 additions & 3 deletions lib/src/mt_sch.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,15 @@ static inline void mt_tasklet_set_sleep(struct mt_sch_tasklet_impl* tasklet,

int mt_sch_add_quota(struct mtl_sch_impl* sch, int quota_mbs);

struct mtl_sch_impl* mt_sch_get(struct mtl_main_impl* impl, int quota_mbs,
enum mt_sch_type type, mt_sch_mask_t mask);
struct mtl_sch_impl* mt_sch_get_by_socket(struct mtl_main_impl* impl, int quota_mbs,
enum mt_sch_type type, mt_sch_mask_t mask,
int socket);
static inline struct mtl_sch_impl* mt_sch_get(struct mtl_main_impl* impl, int quota_mbs,
enum mt_sch_type type, mt_sch_mask_t mask) {
/* use the default socket of MTL_PORT_P */
return mt_sch_get_by_socket(impl, quota_mbs, type, mask,
mt_socket_id(impl, MTL_PORT_P));
}
int mt_sch_put(struct mtl_sch_impl* sch, int quota_mbs);

int mt_sch_start_all(struct mtl_main_impl* impl);
Expand All @@ -67,7 +74,7 @@ static inline uint64_t mt_sch_avg_ns_loop(struct mtl_sch_impl* sch) {

int mt_sch_put_lcore(struct mtl_main_impl* impl, unsigned int lcore);
int mt_sch_get_lcore(struct mtl_main_impl* impl, unsigned int* lcore,
enum mt_lcore_type type);
enum mt_lcore_type type, int socket);
bool mt_sch_lcore_valid(struct mtl_main_impl* impl, unsigned int lcore);

#endif
2 changes: 1 addition & 1 deletion lib/src/mt_tap.c
Original file line number Diff line number Diff line change
Expand Up @@ -900,7 +900,7 @@ int mt_tap_init(struct mtl_main_impl* impl) {

rte_atomic32_set(&cni->stop_tap, 0);
tap_ctx->has_lcore = false;
ret = mt_sch_get_lcore(impl, &lcore, MT_LCORE_TYPE_TAP);
ret = mt_sch_get_lcore(impl, &lcore, MT_LCORE_TYPE_TAP, mt_socket_id(impl, MTL_PORT_P));
if (ret < 0) {
err("%s, get lcore fail %d\n", __func__, ret);
mt_tap_uinit(impl);
Expand Down
3 changes: 2 additions & 1 deletion lib/src/st2110/st_rx_video_session.c
Original file line number Diff line number Diff line change
Expand Up @@ -2286,7 +2286,8 @@ static int rv_init_pkt_lcore(struct mtl_main_impl* impl,
}
s->pkt_lcore_ring = ring;

ret = mt_sch_get_lcore(impl, &lcore, MT_LCORE_TYPE_RXV_RING_LCORE);
ret = mt_sch_get_lcore(impl, &lcore, MT_LCORE_TYPE_RXV_RING_LCORE,
mt_socket_id(impl, port));
if (ret < 0) {
err("%s(%d,%d), get lcore fail %d\n", __func__, mgr_idx, idx, ret);
rv_uinit_pkt_lcore(impl, s);
Expand Down

0 comments on commit afa6ed1

Please sign in to comment.