Skip to content

Commit

Permalink
st2110/video: add force numa option (#894)
Browse files Browse the repository at this point in the history
test with:
./build/app/RxTxApp --config_file
tests/script/loop_json/st20p_1v_planar10.json --force_rx_video_numa 0
./build/app/RxTxApp --config_file
tests/script/loop_json/st20p_1v_planar10.json --force_tx_video_numa 0

Signed-off-by: Frank Du <frank.du@intel.com>
  • Loading branch information
frankdjx committed Jun 6, 2024
1 parent afa6ed1 commit 6385771
Show file tree
Hide file tree
Showing 29 changed files with 299 additions and 132 deletions.
2 changes: 2 additions & 0 deletions app/src/app_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,8 @@ struct st_app_context {
bool tx_display;
bool rx_display;
uint16_t rx_burst_size;
int force_tx_video_numa;
int force_rx_video_numa;

bool ptp_systime_sync;
int ptp_sync_cnt;
Expand Down
10 changes: 10 additions & 0 deletions app/src/args.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ enum st_args_cmd {
ST_ARG_BIND_NUMA,
ST_ARG_NOT_BIND_NUMA,
ST_ARG_FORCE_NUMA,
ST_ARG_FORCE_TX_VIDEO_NUMA,
ST_ARG_FORCE_RX_VIDEO_NUMA,

ST_ARG_CONFIG_FILE = 0x300,
ST_ARG_TEST_TIME,
Expand Down Expand Up @@ -206,6 +208,8 @@ static struct option st_app_args_options[] = {
{"bind_numa", no_argument, 0, ST_ARG_BIND_NUMA},
{"not_bind_numa", no_argument, 0, ST_ARG_NOT_BIND_NUMA},
{"force_numa", required_argument, 0, ST_ARG_FORCE_NUMA},
{"force_tx_video_numa", required_argument, 0, ST_ARG_FORCE_TX_VIDEO_NUMA},
{"force_rx_video_numa", required_argument, 0, ST_ARG_FORCE_RX_VIDEO_NUMA},

{"config_file", required_argument, 0, ST_ARG_CONFIG_FILE},
{"test_time", required_argument, 0, ST_ARG_TEST_TIME},
Expand Down Expand Up @@ -603,6 +607,12 @@ int st_app_parse_args(struct st_app_context* ctx, struct mtl_init_params* p, int
p->port_params[port].socket_id = atoi(optarg);
}
break;
case ST_ARG_FORCE_TX_VIDEO_NUMA:
ctx->force_tx_video_numa = atoi(optarg);
break;
case ST_ARG_FORCE_RX_VIDEO_NUMA:
ctx->force_rx_video_numa = atoi(optarg);
break;
case ST_ARG_SHAPING:
if (!strcmp(optarg, "narrow"))
ctx->tx_pacing_type = ST21_PACING_NARROW;
Expand Down
4 changes: 4 additions & 0 deletions app/src/legacy/rx_video_app.c
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,10 @@ static int app_rx_video_init(struct st_app_context* ctx, st_json_video_session_t
if (ctx->enable_timing_parser) ops.flags |= ST20_RX_FLAG_TIMING_PARSER_STAT;
if (ctx->rx_video_multi_thread) ops.flags |= ST20_RX_FLAG_USE_MULTI_THREADS;
ops.rx_burst_size = ctx->rx_burst_size;
if (ctx->force_rx_video_numa >= 0) {
ops.flags |= ST20_RX_FLAG_FORCE_NUMA;
ops.socket_id = ctx->force_rx_video_numa;
}

st_pthread_mutex_init(&s->st20_wake_mutex, NULL);
st_pthread_cond_init(&s->st20_wake_cond, NULL);
Expand Down
4 changes: 4 additions & 0 deletions app/src/legacy/tx_video_app.c
Original file line number Diff line number Diff line change
Expand Up @@ -791,6 +791,10 @@ static int app_tx_video_init(struct st_app_context* ctx, st_json_video_session_t
if (ctx->tx_ts_first_pkt) ops.flags |= ST20_TX_FLAG_RTP_TIMESTAMP_FIRST_PKT;
if (ctx->tx_ts_epoch) ops.flags |= ST20_TX_FLAG_RTP_TIMESTAMP_EPOCH;
if (ctx->tx_no_bulk) ops.flags |= ST20_TX_FLAG_DISABLE_BULK;
if (ctx->force_tx_video_numa >= 0) {
ops.flags |= ST20_TX_FLAG_FORCE_NUMA;
ops.socket_id = ctx->force_tx_video_numa;
}

if (video && video->enable_rtcp) {
ops.flags |= ST20_TX_FLAG_ENABLE_RTCP;
Expand Down
4 changes: 4 additions & 0 deletions app/src/rx_st20p_app.c
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,10 @@ static int app_rx_st20p_init(struct st_app_context* ctx,
if (st20p && st20p->enable_rtcp) ops.flags |= ST20P_RX_FLAG_ENABLE_RTCP;
if (ctx->enable_timing_parser) ops.flags |= ST20P_RX_FLAG_TIMING_PARSER_STAT;
if (ctx->rx_video_multi_thread) ops.flags |= ST20P_RX_FLAG_USE_MULTI_THREADS;
if (ctx->force_rx_video_numa >= 0) {
ops.flags |= ST20P_RX_FLAG_FORCE_NUMA;
ops.socket_id = ctx->force_rx_video_numa;
}

s->width = ops.width;
s->height = ops.height;
Expand Down
4 changes: 4 additions & 0 deletions app/src/rx_st22p_app.c
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ static int app_rx_st22p_init(struct st_app_context* ctx,
ops.flags |= ST22P_RX_FLAG_BLOCK_GET;
ops.framebuff_cnt = s->framebuff_cnt;
if (st22p && st22p->enable_rtcp) ops.flags |= ST22P_RX_FLAG_ENABLE_RTCP;
if (ctx->force_rx_video_numa >= 0) {
ops.flags |= ST22P_RX_FLAG_FORCE_NUMA;
ops.socket_id = ctx->force_rx_video_numa;
}

s->width = ops.width;
s->height = ops.height;
Expand Down
3 changes: 3 additions & 0 deletions app/src/rxtx_app.c
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,9 @@ static void st_app_ctx_init(struct st_app_context* ctx) {
ctx->rtp_lcore[i] = -1;
}

ctx->force_tx_video_numa = -1;
ctx->force_rx_video_numa = -1;

ctx->last_stat_time_ns = st_app_get_monotonic_time();
}

Expand Down
4 changes: 4 additions & 0 deletions app/src/tx_st20p_app.c
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,10 @@ static int app_tx_st20p_init(struct st_app_context* ctx, st_json_st20p_session_t
if (ctx->tx_ts_first_pkt) ops.flags |= ST20P_TX_FLAG_RTP_TIMESTAMP_FIRST_PKT;
if (ctx->tx_ts_epoch) ops.flags |= ST20P_TX_FLAG_RTP_TIMESTAMP_EPOCH;
if (ctx->tx_no_bulk) ops.flags |= ST20P_TX_FLAG_DISABLE_BULK;
if (ctx->force_tx_video_numa >= 0) {
ops.flags |= ST20P_TX_FLAG_FORCE_NUMA;
ops.socket_id = ctx->force_tx_video_numa;
}

s->width = ops.width;
s->height = ops.height;
Expand Down
4 changes: 4 additions & 0 deletions app/src/tx_st22p_app.c
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,10 @@ static int app_tx_st22p_init(struct st_app_context* ctx, st_json_st22p_session_t
ops.flags |= ST22P_TX_FLAG_BLOCK_GET;
if (st22p && st22p->enable_rtcp) ops.flags |= ST22P_TX_FLAG_ENABLE_RTCP;
if (ctx->tx_no_bulk) ops.flags |= ST22P_TX_FLAG_DISABLE_BULK;
if (ctx->force_tx_video_numa >= 0) {
ops.flags |= ST22P_TX_FLAG_FORCE_NUMA;
ops.socket_id = ctx->force_tx_video_numa;
}

s->width = ops.width;
s->height = ops.height;
Expand Down
28 changes: 28 additions & 0 deletions include/st20_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ extern "C" {
* performance since the object enqueue/dequeue will be acted one by one.
*/
#define ST20_TX_FLAG_DISABLE_BULK (MTL_BIT32(10))
/**
* Flag bit in flags of struct st20_tx_ops.
* Force the numa of the created session, both CPU and memory.
*/
#define ST20_TX_FLAG_FORCE_NUMA (MTL_BIT32(11))

/**
* Flag bit in flags of struct st22_tx_ops.
Expand Down Expand Up @@ -131,6 +136,11 @@ extern "C" {
* performance since the object enqueue/dequeue will be acted one by one.
*/
#define ST22_TX_FLAG_DISABLE_BULK (MTL_BIT32(7))
/**
* Flag bit in flags of struct st22_tx_ops.
* Force the numa of the created session, both CPU and memory.
*/
#define ST22_TX_FLAG_FORCE_NUMA (MTL_BIT32(8))

/**
* Flag bit in flags of struct st20_rx_ops, for non MTL_PMD_DPDK_USER.
Expand All @@ -153,6 +163,11 @@ extern "C" {
* If enabled, simulate random packet loss, test usage only.
*/
#define ST20_RX_FLAG_SIMULATE_PKT_LOSS (MTL_BIT32(3))
/**
* Flag bit in flags of struct st20_rx_ops.
* Force the numa of the created session, both CPU and memory.
*/
#define ST20_RX_FLAG_FORCE_NUMA (MTL_BIT32(4))

/**
* Flag bit in flags of struct st20_rx_ops.
Expand Down Expand Up @@ -233,6 +248,11 @@ extern "C" {
* If enabled, simulate random packet loss, test usage only.
*/
#define ST22_RX_FLAG_SIMULATE_PKT_LOSS (MTL_BIT32(4))
/**
* Flag bit in flags of struct st22_rx_ops.
* Force the numa of the created session, both CPU and memory.
*/
#define ST22_RX_FLAG_FORCE_NUMA (MTL_BIT32(5))

/**
* Flag bit in flags of struct st22_rx_ops.
Expand Down Expand Up @@ -1213,6 +1233,8 @@ struct st20_tx_ops {
* tasklet routine.
*/
int (*notify_rtp_done)(void* priv);
/** Use this socket if ST20_TX_FLAG_FORCE_NUMA is on, default use the NIC numa */
int socket_id;
};

/**
Expand Down Expand Up @@ -1327,6 +1349,8 @@ struct st22_tx_ops {
* tasklet routine.
*/
int (*notify_rtp_done)(void* priv);
/** Use this socket if ST22_TX_FLAG_FORCE_NUMA is on, default use the NIC numa */
int socket_id;
};

/**
Expand Down Expand Up @@ -1542,6 +1566,8 @@ struct st20_rx_ops {
* routine.
*/
int (*notify_rtp_ready)(void* priv);
/** Use this socket if ST20_RX_FLAG_FORCE_NUMA is on, default use the NIC numa */
int socket_id;
};

/**
Expand Down Expand Up @@ -1640,6 +1666,8 @@ struct st22_rx_ops {
* routine.
*/
int (*notify_rtp_ready)(void* priv);
/** Use this socket if ST22_RX_FLAG_FORCE_NUMA is on, default use the NIC numa */
int socket_id;
};

/**
Expand Down
21 changes: 20 additions & 1 deletion include/st_pipeline_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,8 @@ enum st22p_tx_flag {
* st22p_tx_put_ext_frame.
*/
ST22P_TX_FLAG_EXT_FRAME = (MTL_BIT32(8)),
/** Force the numa of the created session, both CPU and memory */
ST22P_TX_FLAG_FORCE_NUMA = (MTL_BIT32(9)),
/** Enable the st22p_tx_get_frame block behavior to wait until a frame becomes
available or timeout(default: 1s, use st22p_tx_set_block_timeout to customize) */
ST22P_TX_FLAG_BLOCK_GET = (MTL_BIT32(15)),
Expand Down Expand Up @@ -461,6 +463,8 @@ enum st20p_tx_flag {
* performance since the object enqueue/dequeue will be acted one by one.
*/
ST20P_TX_FLAG_DISABLE_BULK = (MTL_BIT32(10)),
/** Force the numa of the created session, both CPU and memory */
ST20P_TX_FLAG_FORCE_NUMA = (MTL_BIT32(11)),
/** Enable the st20p_tx_get_frame block behavior to wait until a frame becomes
available or (default: 1s, use st20p_tx_set_block_timeout to customize) */
ST20P_TX_FLAG_BLOCK_GET = (MTL_BIT32(15)),
Expand Down Expand Up @@ -491,6 +495,8 @@ enum st22p_rx_flag {
* callback(query_ext_frame in st22p_rx_ops) to let MTL can get the frame when needed.
*/
ST22P_RX_FLAG_EXT_FRAME = (MTL_BIT32(4)),
/** Force the numa of the created session, both CPU and memory */
ST22P_RX_FLAG_FORCE_NUMA = MTL_BIT32(5),

/** Enable the st22p_rx_get_frame block behavior to wait until a frame becomes
available or timeout(default: 1s, use st22p_rx_set_block_timeout to customize) */
Expand Down Expand Up @@ -535,6 +541,8 @@ enum st20p_rx_flag {
* If enabled, simulate random packet loss, test usage only.
*/
ST20P_RX_FLAG_SIMULATE_PKT_LOSS = (MTL_BIT32(5)),
/** Force the numa of the created session, both CPU and memory */
ST20P_RX_FLAG_FORCE_NUMA = (MTL_BIT32(6)),

/** Enable the st20p_rx_get_frame block behavior to wait until a frame becomes
available or (default: 1s, use st20p_rx_set_block_timeout to customize) */
Expand Down Expand Up @@ -620,11 +628,12 @@ struct st22_encoder_create_req {
uint16_t framebuff_cnt;
/** thread count, set by lib */
uint32_t codec_thread_cnt;

/** max size for frame(encoded code stream), set by plugin */
size_t max_codestream_size;
/** the flag indicated by plugin to customize the behavior */
uint32_t resp_flag;
/** numa socket id, set by lib */
int socket_id;
};

/** The structure info for st22 encoder dev. */
Expand Down Expand Up @@ -679,6 +688,8 @@ struct st22_decoder_create_req {
uint32_t codec_thread_cnt;
/** the flag indicated by plugin to customize the behavior */
uint32_t resp_flag;
/** numa socket id, set by lib */
int socket_id;
};

/** The structure info for st22 decoder dev. */
Expand Down Expand Up @@ -894,6 +905,8 @@ struct st20p_tx_ops {
* Ex, cast to struct st10_vsync_meta for ST_EVENT_VSYNC.
*/
int (*notify_event)(void* priv, enum st_event event, void* args);
/** Use this socket if ST20P_TX_FLAG_FORCE_NUMA is on, default use the NIC numa */
int socket_id;
};

/** The structure describing how to create a rx st2110-20 pipeline session. */
Expand Down Expand Up @@ -964,6 +977,8 @@ struct st20p_rx_ops {
*/
int (*notify_detected)(void* priv, const struct st20_detect_meta* meta,
struct st20_detect_reply* reply);
/** Use this socket if ST20P_RX_FLAG_FORCE_NUMA is on, default use the NIC numa */
int socket_id;
};

/** The structure describing how to create a tx st2110-22 pipeline session. */
Expand Down Expand Up @@ -1033,6 +1048,8 @@ struct st22p_tx_ops {
* Ex, cast to struct st10_vsync_meta for ST_EVENT_VSYNC.
*/
int (*notify_event)(void* priv, enum st_event event, void* args);
/** Use this socket if ST22P_TX_FLAG_FORCE_NUMA is on, default use the NIC numa */
int socket_id;
};

/** The structure describing how to create a rx st2110-22 pipeline session. */
Expand Down Expand Up @@ -1095,6 +1112,8 @@ struct st22p_rx_ops {
*/
int (*query_ext_frame)(void* priv, struct st_ext_frame* ext_frame,
struct st22_rx_frame_meta* meta);
/** Use this socket if ST22P_RX_FLAG_FORCE_NUMA is on, default use the NIC numa */
int socket_id;
};

/**
Expand Down
8 changes: 4 additions & 4 deletions lib/src/mt_admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,8 @@ static int admin_tx_video_migrate(struct mtl_main_impl* impl, bool* migrated) {
}

dbg("%s, find one busy session(%d,%d)\n", __func__, from_sch->idx, busy_s->idx);
struct mtl_sch_impl* to_sch = mt_sch_get_by_socket(impl, quota_mbs, from_sch->type,
MT_SCH_MASK_ALL, from_sch->socket);
struct mtl_sch_impl* to_sch = mt_sch_get_by_socket(
impl, quota_mbs, from_sch->type, MT_SCH_MASK_ALL, mt_sch_socket_id(from_sch));
if (!to_sch) {
err("%s, no idle sch for session(%d,%d)\n", __func__, from_sch->idx, busy_s->idx);
return -EIO;
Expand Down Expand Up @@ -294,8 +294,8 @@ static int admin_rx_video_migrate(struct mtl_main_impl* impl, bool* migrated) {
}

dbg("%s, find one busy session(%d,%d)\n", __func__, from_sch->idx, busy_s->idx);
struct mtl_sch_impl* to_sch = mt_sch_get_by_socket(impl, quota_mbs, from_sch->type,
MT_SCH_MASK_ALL, from_sch->type);
struct mtl_sch_impl* to_sch = mt_sch_get_by_socket(
impl, quota_mbs, from_sch->type, MT_SCH_MASK_ALL, mt_sch_socket_id(from_sch));
if (!to_sch) {
err("%s, no idle sch for session(%d,%d)\n", __func__, from_sch->idx, busy_s->idx);
return -EIO;
Expand Down
3 changes: 2 additions & 1 deletion lib/src/mt_main.h
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,8 @@ struct mtl_sch_impl {
/* max tasklet index */
volatile int max_tasklet_idx;
unsigned int lcore;
int socket;
/* the socket id this sch attached to */
int socket_id;
bool run_in_thread; /* Run the tasklet inside one thread instead of a pinned lcore. */
pthread_t tid; /* thread id for run_in_thread */
int t_pid; /* gettid */
Expand Down
11 changes: 6 additions & 5 deletions lib/src/mt_sch.c
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ static int sch_start(struct mtl_sch_impl* sch) {
ret = mt_sch_get_lcore(
sch->parent, &sch->lcore,
(sch->type == MT_SCH_TYPE_APP) ? MT_LCORE_TYPE_SCH_USER : MT_LCORE_TYPE_SCH,
sch->socket);
mt_sch_socket_id(sch));
if (ret < 0) {
err("%s(%d), get lcore fail %d\n", __func__, idx, ret);
sch_unlock(sch);
Expand All @@ -255,7 +255,8 @@ static int sch_start(struct mtl_sch_impl* sch) {

rte_atomic32_set(&sch->started, 1);
if (!sch->run_in_thread)
info("%s(%d), succ on lcore %u socket %d\n", __func__, idx, sch->lcore, sch->socket);
info("%s(%d), succ on lcore %u socket %d\n", __func__, idx, sch->lcore,
mt_sch_socket_id(sch));
else
info("%s(%d), succ on tid %" PRIu64 "\n", __func__, idx, sch->tid);
sch_unlock(sch);
Expand Down Expand Up @@ -863,7 +864,7 @@ mtl_tasklet_handle mtl_sch_register_tasklet(struct mtl_sch_impl* sch,
if (sch->tasklet[i]) continue;

/* find one empty tasklet slot */
tasklet = mt_rte_zmalloc_socket(sizeof(*tasklet), sch->socket);
tasklet = mt_rte_zmalloc_socket(sizeof(*tasklet), mt_sch_socket_id(sch));
if (!tasklet) {
err("%s(%d), tasklet malloc fail on %d\n", __func__, idx, i);
sch_unlock(sch);
Expand Down Expand Up @@ -1063,7 +1064,7 @@ struct mtl_sch_impl* mt_sch_get_by_socket(struct mtl_main_impl* impl, int quota_
/* first try to find one sch capable with quota */
for (idx = 0; idx < MT_MAX_SCH_NUM; idx++) {
sch = mt_sch_instance(impl, idx);
if (socket != sch->socket) continue;
if (socket != mt_sch_socket_id(sch)) continue;
/* mask check */
if (!(mask & MTL_BIT64(idx))) continue;
/* active and busy check */
Expand All @@ -1088,7 +1089,7 @@ struct mtl_sch_impl* mt_sch_get_by_socket(struct mtl_main_impl* impl, int quota_
return NULL;
}
/* set the socket id */
sch->socket = socket;
sch->socket_id = socket;
idx = sch->idx;
ret = mt_sch_add_quota(sch, quota_mbs);
if (ret < 0) {
Expand Down
4 changes: 4 additions & 0 deletions lib/src/mt_sch.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ static inline bool mt_sch_started(struct mtl_sch_impl* sch) {
return false;
}

static inline int mt_sch_socket_id(struct mtl_sch_impl* sch) {
return sch->socket_id;
}

static inline void mt_sch_enable_allow_sleep(struct mtl_sch_impl* sch, bool enable) {
sch->allow_sleep = enable;
}
Expand Down
Loading

0 comments on commit 6385771

Please sign in to comment.