Skip to content

Commit

Permalink
audio: add numa option support (#904)
Browse files Browse the repository at this point in the history
test with:
./build/app/RxTxApp --config_file
tests/script/audio_json/st30p_loop.json --force_tx_audio_numa 0
and
./build/app/RxTxApp --config_file
tests/script/audio_json/st30p_loop.json --force_rx_audio_numa 0

Signed-off-by: Frank Du <frank.du@intel.com>
  • Loading branch information
frankdjx committed Jun 17, 2024
1 parent 8191b25 commit 363bd62
Show file tree
Hide file tree
Showing 19 changed files with 208 additions and 95 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
## Changelog for 24.12

* ice: update driver to 1.14.9
* st2110/20: add force numa option support on session level, see ST20_TX_FLAG_FORCE_NUMA/ST20_RX_FLAG_FORCE_NUMA
* st2110/30: add force numa option support on session level, see ST30_TX_FLAG_FORCE_NUMA/ST30_RX_FLAG_FORCE_NUMA

## Changelog for 24.06

Expand Down
2 changes: 2 additions & 0 deletions app/src/app_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,8 @@ struct st_app_context {
uint16_t rx_burst_size;
int force_tx_video_numa;
int force_rx_video_numa;
int force_tx_audio_numa;
int force_rx_audio_numa;

bool ptp_systime_sync;
int ptp_sync_cnt;
Expand Down
10 changes: 10 additions & 0 deletions app/src/args.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ enum st_args_cmd {
ST_ARG_FORCE_NUMA,
ST_ARG_FORCE_TX_VIDEO_NUMA,
ST_ARG_FORCE_RX_VIDEO_NUMA,
ST_ARG_FORCE_TX_AUDIO_NUMA,
ST_ARG_FORCE_RX_AUDIO_NUMA,

ST_ARG_CONFIG_FILE = 0x300,
ST_ARG_TEST_TIME,
Expand Down Expand Up @@ -211,6 +213,8 @@ static struct option st_app_args_options[] = {
{"force_numa", required_argument, 0, ST_ARG_FORCE_NUMA},
{"force_tx_video_numa", required_argument, 0, ST_ARG_FORCE_TX_VIDEO_NUMA},
{"force_rx_video_numa", required_argument, 0, ST_ARG_FORCE_RX_VIDEO_NUMA},
{"force_tx_audio_numa", required_argument, 0, ST_ARG_FORCE_TX_AUDIO_NUMA},
{"force_rx_audio_numa", required_argument, 0, ST_ARG_FORCE_RX_AUDIO_NUMA},

{"config_file", required_argument, 0, ST_ARG_CONFIG_FILE},
{"test_time", required_argument, 0, ST_ARG_TEST_TIME},
Expand Down Expand Up @@ -615,6 +619,12 @@ int st_app_parse_args(struct st_app_context* ctx, struct mtl_init_params* p, int
case ST_ARG_FORCE_RX_VIDEO_NUMA:
ctx->force_rx_video_numa = atoi(optarg);
break;
case ST_ARG_FORCE_TX_AUDIO_NUMA:
ctx->force_tx_audio_numa = atoi(optarg);
break;
case ST_ARG_FORCE_RX_AUDIO_NUMA:
ctx->force_rx_audio_numa = atoi(optarg);
break;
case ST_ARG_SHAPING:
if (!strcmp(optarg, "narrow"))
ctx->tx_pacing_type = ST21_PACING_NARROW;
Expand Down
5 changes: 5 additions & 0 deletions app/src/legacy/rx_audio_app.c
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,11 @@ static int app_rx_audio_init(struct st_app_context* ctx, st_json_audio_session_t
s->enable_timing_parser_meta = true;
}

if (ctx->force_rx_audio_numa >= 0) {
ops.flags |= ST30_RX_FLAG_FORCE_NUMA;
ops.socket_id = ctx->force_rx_audio_numa;
}

st_pthread_mutex_init(&s->st30_wake_mutex, NULL);
st_pthread_cond_init(&s->st30_wake_cond, NULL);

Expand Down
5 changes: 5 additions & 0 deletions app/src/legacy/tx_audio_app.c
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,11 @@ static int app_tx_audio_init(struct st_app_context* ctx, st_json_audio_session_t
ops.rl_accuracy_ns = ctx->tx_audio_rl_accuracy_us * 1000;
ops.rl_offset_ns = ctx->tx_audio_rl_offset_us * 1000;

if (ctx->force_tx_audio_numa >= 0) {
ops.flags |= ST30_TX_FLAG_FORCE_NUMA;
ops.socket_id = ctx->force_tx_audio_numa;
}

handle = st30_tx_create(ctx->st, &ops);
if (!handle) {
err("%s(%d), st30_tx_create fail\n", __func__, idx);
Expand Down
5 changes: 5 additions & 0 deletions app/src/rx_st30p_app.c
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@ static int app_rx_st30p_init(struct st_app_context* ctx,
ops.flags |= ST30P_RX_FLAG_BLOCK_GET;
ops.framebuff_cnt = 3;

if (ctx->force_rx_audio_numa >= 0) {
ops.flags |= ST30P_RX_FLAG_FORCE_NUMA;
ops.socket_id = ctx->force_rx_audio_numa;
}

s->num_port = ops.port.num_port;

handle = st30p_rx_create(ctx->st, &ops);
Expand Down
2 changes: 2 additions & 0 deletions app/src/rxtx_app.c
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,8 @@ static void st_app_ctx_init(struct st_app_context* ctx) {

ctx->force_tx_video_numa = -1;
ctx->force_rx_video_numa = -1;
ctx->force_tx_audio_numa = -1;
ctx->force_rx_audio_numa = -1;

ctx->last_stat_time_ns = st_app_get_monotonic_time();
}
Expand Down
5 changes: 5 additions & 0 deletions app/src/tx_st30p_app.c
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,11 @@ static int app_tx_st30p_init(struct st_app_context* ctx, st_json_st30p_session_t

if (ctx->tx_audio_dedicate_queue) ops.flags |= ST30P_TX_FLAG_DEDICATE_QUEUE;

if (ctx->force_tx_audio_numa >= 0) {
ops.flags |= ST30P_TX_FLAG_FORCE_NUMA;
ops.socket_id = ctx->force_tx_audio_numa;
}

handle = st30p_tx_create(ctx->st, &ops);
if (!handle) {
err("%s(%d), st30p_tx_create fail\n", __func__, idx);
Expand Down
8 changes: 8 additions & 0 deletions include/st30_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ typedef struct st_rx_audio_session_handle_impl* st30_rx_handle;
* If use dedicated queue for TX.
*/
#define ST30_TX_FLAG_DEDICATE_QUEUE (MTL_BIT32(7))
/** Force the numa of the created session, both CPU and memory */
#define ST30_TX_FLAG_FORCE_NUMA (MTL_BIT32(8))

/**
* Flag bit in flags of struct st30_rx_ops, for non MTL_PMD_DPDK_USER.
Expand All @@ -77,6 +79,8 @@ typedef struct st_rx_audio_session_handle_impl* st30_rx_handle;
* If enable the rtcp.
*/
#define ST30_RX_FLAG_ENABLE_RTCP (MTL_BIT32(1))
/** Force the numa of the created session, both CPU and memory */
#define ST30_RX_FLAG_FORCE_NUMA (MTL_BIT32(2))
/**
* Flag bit in flags of struct st30_rx_ops.
* Enable the timing analyze in the stat dump
Expand Down Expand Up @@ -404,6 +408,8 @@ struct st30_tx_ops {
uint32_t rl_accuracy_ns;
/** Optional for ST30_TX_PACING_WAY_RL, the offset time(us) for warmup check point */
int32_t rl_offset_ns;
/** Use this socket if ST30_TX_FLAG_FORCE_NUMA is on, default use the NIC numa */
int socket_id;

/**
* size for each sample group,
Expand Down Expand Up @@ -499,6 +505,8 @@ struct st30_rx_ops {
*/
int (*notify_timing_parser_result)(void* priv, enum mtl_session_port port,
struct st30_rx_tp_meta* tp);
/** Use this socket if ST30_RX_FLAG_FORCE_NUMA is on, default use the NIC numa */
int socket_id;

/**
* size for each sample group,
Expand Down
8 changes: 8 additions & 0 deletions include/st30_pipeline_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ enum st30p_tx_flag {
* If use dedicated queue for TX.
*/
ST30P_TX_FLAG_DEDICATE_QUEUE = (MTL_BIT32(7)),
/** Force the numa of the created session, both CPU and memory */
ST30P_TX_FLAG_FORCE_NUMA = (MTL_BIT32(8)),

/** Enable the st30p_tx_get_frame block behavior to wait until a frame becomes
available or timeout(default: 1s, use st30p_tx_set_block_timeout to customize)*/
Expand Down Expand Up @@ -141,6 +143,8 @@ struct st30p_tx_ops {
uint32_t rl_accuracy_ns;
/** Optional for ST30_TX_PACING_WAY_RL, the offset time(us) for warmup check point */
int32_t rl_offset_ns;
/** Use this socket if ST30P_TX_FLAG_FORCE_NUMA is on, default use the NIC numa */
int socket_id;
};

/**
Expand Down Expand Up @@ -174,6 +178,8 @@ enum st30p_rx_flag {
* Use st30_rx_get_queue_meta to get the queue meta(queue number etc) info.
*/
ST30P_RX_FLAG_DATA_PATH_ONLY = (MTL_BIT32(0)),
/** Force the numa of the created session, both CPU and memory */
ST30P_RX_FLAG_FORCE_NUMA = (MTL_BIT32(2)),

/** Enable the st30p_rx_get_frame block behavior to wait until a frame becomes
available or timeout(default: 1s, use st30p_rx_set_block_timeout to customize) */
Expand Down Expand Up @@ -214,6 +220,8 @@ struct st30p_rx_ops {
* tasklet routine.
*/
int (*notify_frame_available)(void* priv);
/** Use this socket if ST30P_RX_FLAG_FORCE_NUMA is on, default use the NIC numa */
int socket_id;
};

/**
Expand Down
28 changes: 22 additions & 6 deletions lib/src/st2110/pipeline/st30_pipeline_rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,13 @@ static int rx_st30p_create_transport(struct mtl_main_impl* impl, struct st30p_rx
ops_rx.type = ST30_TYPE_FRAME_LEVEL;
ops_rx.notify_frame_ready = rx_st30p_frame_ready;

if (ops->flags & ST30P_RX_FLAG_DATA_PATH_ONLY)
ops_rx.flags |= ST30_RX_FLAG_DATA_PATH_ONLY;
if (ops->flags & ST30P_RX_FLAG_FORCE_NUMA) {
ops_rx.socket_id = ops->socket_id;
ops_rx.flags |= ST30_RX_FLAG_FORCE_NUMA;
}

transport = st30_rx_create(impl, &ops_rx);
if (!transport) {
err("%s(%d), transport create fail\n", __func__, idx);
Expand All @@ -151,10 +158,9 @@ static int rx_st30p_uinit_fbs(struct st30p_rx_ctx* ctx) {
return 0;
}

static int rx_st30p_init_fbs(struct mtl_main_impl* impl, struct st30p_rx_ctx* ctx,
struct st30p_rx_ops* ops) {
static int rx_st30p_init_fbs(struct st30p_rx_ctx* ctx, struct st30p_rx_ops* ops) {
int idx = ctx->idx;
int soc_id = mt_socket_id(impl, MTL_PORT_P);
int soc_id = ctx->socket_id;
struct st30p_rx_frame* frames;

frames = mt_rte_zmalloc_socket(sizeof(*frames) * ctx->framebuff_cnt, soc_id);
Expand Down Expand Up @@ -392,13 +398,23 @@ st30p_rx_handle st30p_rx_create(mtl_handle mt, struct st30p_rx_ops* ops) {
return NULL;
}

ctx = mt_rte_zmalloc_socket(sizeof(*ctx), mt_socket_id(impl, MTL_PORT_P));
enum mtl_port port = mt_port_by_name(impl, ops->port.port[MTL_SESSION_PORT_P]);
if (port >= MTL_PORT_MAX) return NULL;
int socket = mt_socket_id(impl, port);

if (ops->flags & ST30P_RX_FLAG_FORCE_NUMA) {
socket = ops->socket_id;
info("%s, ST30P_RX_FLAG_FORCE_NUMA to socket %d\n", __func__, socket);
}

ctx = mt_rte_zmalloc_socket(sizeof(*ctx), socket);
if (!ctx) {
err("%s, ctx malloc fail\n", __func__);
err("%s, ctx malloc fail on socket %d\n", __func__, socket);
return NULL;
}

ctx->idx = idx;
ctx->socket_id = socket;
ctx->ready = false;
ctx->impl = impl;
ctx->type = MT_ST30_HANDLE_PIPELINE_RX;
Expand All @@ -420,7 +436,7 @@ st30p_rx_handle st30p_rx_create(mtl_handle mt, struct st30p_rx_ops* ops) {

ctx->framebuff_cnt = ops->framebuff_cnt;
/* init fbs */
ret = rx_st30p_init_fbs(impl, ctx, ops);
ret = rx_st30p_init_fbs(ctx, ops);
if (ret < 0) {
err("%s(%d), init fbs fail %d\n", __func__, idx, ret);
st30p_rx_free(ctx);
Expand Down
1 change: 1 addition & 0 deletions lib/src/st2110/pipeline/st30_pipeline_rx.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ struct st30p_rx_frame {
struct st30p_rx_ctx {
struct mtl_main_impl* impl;
int idx;
int socket_id;
enum mt_handle_type type; /* for sanity check */

char ops_name[ST_MAX_NAME_LEN];
Expand Down
25 changes: 19 additions & 6 deletions lib/src/st2110/pipeline/st30_pipeline_tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,10 @@ static int tx_st30p_create_transport(struct mtl_main_impl* impl, struct st30p_tx
}
if (ops->flags & ST30P_TX_FLAG_DEDICATE_QUEUE)
ops_tx.flags |= ST30_TX_FLAG_DEDICATE_QUEUE;
if (ops->flags & ST30P_TX_FLAG_FORCE_NUMA) {
ops_tx.socket_id = ops->socket_id;
ops_tx.flags |= ST30_TX_FLAG_FORCE_NUMA;
}
ops_tx.pacing_way = ops->pacing_way;

ops_tx.fmt = ops->fmt;
Expand Down Expand Up @@ -207,10 +211,9 @@ static int tx_st30p_uinit_fbs(struct st30p_tx_ctx* ctx) {
return 0;
}

static int tx_st30p_init_fbs(struct mtl_main_impl* impl, struct st30p_tx_ctx* ctx,
struct st30p_tx_ops* ops) {
static int tx_st30p_init_fbs(struct st30p_tx_ctx* ctx, struct st30p_tx_ops* ops) {
int idx = ctx->idx;
int soc_id = mt_socket_id(impl, MTL_PORT_P);
int soc_id = ctx->socket_id;
struct st30p_tx_frame* frames;

frames = mt_rte_zmalloc_socket(sizeof(*frames) * ctx->framebuff_cnt, soc_id);
Expand Down Expand Up @@ -471,13 +474,23 @@ st30p_tx_handle st30p_tx_create(mtl_handle mt, struct st30p_tx_ops* ops) {
return NULL;
}

ctx = mt_rte_zmalloc_socket(sizeof(*ctx), mt_socket_id(impl, MTL_PORT_P));
enum mtl_port port = mt_port_by_name(impl, ops->port.port[MTL_SESSION_PORT_P]);
if (port >= MTL_PORT_MAX) return NULL;
int socket = mt_socket_id(impl, port);

if (ops->flags & ST30P_RX_FLAG_FORCE_NUMA) {
socket = ops->socket_id;
info("%s, ST30P_RX_FLAG_FORCE_NUMA to socket %d\n", __func__, socket);
}

ctx = mt_rte_zmalloc_socket(sizeof(*ctx), socket);
if (!ctx) {
err("%s, ctx malloc fail\n", __func__);
err("%s, ctx malloc fail on socket %d\n", __func__, socket);
return NULL;
}

ctx->idx = idx;
ctx->socket_id = socket;
ctx->ready = false;
ctx->impl = impl;
ctx->type = MT_ST30_HANDLE_PIPELINE_TX;
Expand All @@ -501,7 +514,7 @@ st30p_tx_handle st30p_tx_create(mtl_handle mt, struct st30p_tx_ops* ops) {

ctx->framebuff_cnt = ops->framebuff_cnt;
/* init fbs */
ret = tx_st30p_init_fbs(impl, ctx, ops);
ret = tx_st30p_init_fbs(ctx, ops);
if (ret < 0) {
err("%s(%d), init fbs fail %d\n", __func__, idx, ret);
st30p_tx_free(ctx);
Expand Down
1 change: 1 addition & 0 deletions lib/src/st2110/pipeline/st30_pipeline_tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ struct st30p_tx_frame {
struct st30p_tx_ctx {
struct mtl_main_impl* impl;
int idx;
int socket_id;
enum mt_handle_type type; /* for sanity check */

char ops_name[ST_MAX_NAME_LEN];
Expand Down
6 changes: 6 additions & 0 deletions lib/src/st2110/st_header.h
Original file line number Diff line number Diff line change
Expand Up @@ -785,6 +785,7 @@ struct st_tx_audio_session_rl_info {

struct st_tx_audio_session_impl {
int idx; /* index for current session */
int socket_id;
struct st30_tx_ops ops;
char ops_name[ST_MAX_NAME_LEN];
int recovery_idx;
Expand Down Expand Up @@ -872,6 +873,7 @@ struct st_tx_audio_session_impl {

struct st_tx_audio_sessions_mgr {
struct mtl_main_impl* parent;
int socket_id;
int idx; /* index for current sessions mgr */
int max_idx; /* max session index */
struct mt_sch_tasklet_impl* tasklet;
Expand Down Expand Up @@ -955,6 +957,7 @@ struct st_rx_audio_tp {
struct st_rx_audio_session_impl {
int idx; /* index for current session */
struct st_rx_audio_sessions_mgr* mgr;
int socket_id;
bool attached;
struct st30_rx_ops ops;
char ops_name[ST_MAX_NAME_LEN];
Expand Down Expand Up @@ -1046,6 +1049,7 @@ struct st_tx_ancillary_session_pacing {

struct st_tx_ancillary_session_impl {
int idx; /* index for current session */
int socket_id;
struct st_tx_ancillary_sessions_mgr* mgr;
struct st40_tx_ops ops;
char ops_name[ST_MAX_NAME_LEN];
Expand Down Expand Up @@ -1115,6 +1119,7 @@ struct st_tx_ancillary_session_impl {

struct st_tx_ancillary_sessions_mgr {
struct mtl_main_impl* parent;
int socket_id;
int idx; /* index for current sessions mgr */
int max_idx; /* max session index */
struct mt_sch_tasklet_impl* tasklet;
Expand All @@ -1138,6 +1143,7 @@ struct st_tx_ancillary_sessions_mgr {

struct st_rx_ancillary_session_impl {
int idx; /* index for current session */
int socket_id;
struct st_rx_ancillary_sessions_mgr* mgr;
bool attached;
struct st40_rx_ops ops;
Expand Down
Loading

0 comments on commit 363bd62

Please sign in to comment.