Skip to content

Commit

Permalink
ecosystem/ffmpeg: improve unicast initial (#917) (#931)
Browse files Browse the repository at this point in the history
Address an issue where the RX is sending
ARP replies to the TX while still being completely uninitialized, which
leads to the session dropping frames. The problem worsens as the TX
outperforms
the RX in terms of initialization speed.

Resolve this by halting the FFmpeg plugin
initialization and starting the scheduled tasklets because we don't want
the CNI thread to respond to the ARP requests sent in the tv_attach
function
until the RX is ready to receive those packets.
  • Loading branch information
DawidWesierski4 committed Jul 17, 2024
1 parent 2732e93 commit a452b02
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 4 deletions.
1 change: 0 additions & 1 deletion ecosystem/ffmpeg_plugin/mtl_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ mtl_handle mtl_dev_get(AVFormatContext* ctx, const struct StDevArgs* args, int*
p.flags |= MTL_FLAG_RX_VIDEO_MIGRATE;
p.flags |= MTL_FLAG_RX_SEPARATE_VIDEO_LCORE;
p.flags |= MTL_FLAG_BIND_NUMA;
p.flags |= MTL_FLAG_DEV_AUTO_START_STOP;
p.log_level = MTL_LOG_LEVEL_INFO; // log level. ERROR, INFO, WARNING

if (args->dma_dev) {
Expand Down
31 changes: 30 additions & 1 deletion ecosystem/ffmpeg_plugin/mtl_st20p_rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ typedef struct MtlSt20pDemuxerContext {
AVRational framerate;
int fb_cnt;
int timeout_sec;
int session_init_retry;

mtl_handle dev_handle;
st20p_rx_handle rx_handle;
Expand Down Expand Up @@ -168,6 +169,13 @@ static int mtl_st20p_read_header(AVFormatContext* ctx) {
return AVERROR(EIO);
}

ret = mtl_start(s->dev_handle);
if (ret < 0) {
err(ctx, "%s, mtl start fail %d\n", __func__, ret);
mtl_st20p_read_close(ctx);
return AVERROR(EIO);
}

info(ctx, "%s(%d), rx handle %p\n", __func__, s->idx, s->rx_handle);
return 0;
}
Expand All @@ -178,7 +186,20 @@ static int mtl_st20p_read_packet(AVFormatContext* ctx, AVPacket* pkt) {
struct st_frame* frame;

dbg("%s(%d), start\n", __func__, s->idx);
frame = st20p_rx_get_frame(s->rx_handle);

if (0 == s->frame_counter) {
/*
* for unicast scenarios, retries may be necessary
* if the transmitter is not yet initialized.
*/
for (int i = 1; i <= s->session_init_retry; i++) {
frame = st20p_rx_get_frame(s->rx_handle);
if (frame) break;
info(ctx, "%s(%d) session initialization retry %d\n", __func__, s->idx, i);
}
} else
frame = st20p_rx_get_frame(s->rx_handle);

if (!frame) {
info(ctx, "%s(%d), st20p_rx_get_frame timeout\n", __func__, s->idx);
return AVERROR(EIO);
Expand Down Expand Up @@ -255,6 +276,14 @@ static const AVOption mtl_st20p_rx_options[] = {
0,
60 * 10,
DEC},
{"init_retry",
"Number of retries to the initial read packet",
OFFSET(session_init_retry),
AV_OPT_TYPE_INT,
{.i64 = 5},
0,
60,
DEC},
{"fb_cnt",
"Frame buffer count",
OFFSET(fb_cnt),
Expand Down
7 changes: 7 additions & 0 deletions ecosystem/ffmpeg_plugin/mtl_st20p_tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,13 @@ static int mtl_st20p_write_header(AVFormatContext* ctx) {
return AVERROR(EIO);
}

ret = mtl_start(s->dev_handle);
if (ret < 0) {
err(ctx, "%s, mtl start fail %d\n", __func__, ret);
mtl_st20p_write_close(ctx);
return AVERROR(EIO);
}

s->tx_handle = st20p_tx_create(s->dev_handle, &ops_tx);
if (!s->tx_handle) {
err(ctx, "%s, st20p_tx_create failed\n", __func__);
Expand Down
38 changes: 37 additions & 1 deletion ecosystem/ffmpeg_plugin/mtl_st22p_rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ typedef struct MtlSt22pDemuxerContext {
int fb_cnt;
int codec_thread_cnt;
int timeout_sec;
int session_init_retry;

mtl_handle dev_handle;
st22p_rx_handle rx_handle;
Expand Down Expand Up @@ -180,6 +181,13 @@ static int mtl_st22p_read_header(AVFormatContext* ctx) {
return AVERROR(EIO);
}

ret = mtl_start(s->dev_handle);
if (ret < 0) {
err(ctx, "%s, mtl start fail %d\n", __func__, ret);
mtl_st22p_read_close(ctx);
return AVERROR(EIO);
}

info(ctx, "%s(%d), rx handle %p\n", __func__, s->idx, s->rx_handle);
return 0;
}
Expand Down Expand Up @@ -287,6 +295,13 @@ static int mtl_st22_read_header(AVFormatContext* ctx) {
st->codecpar->bit_rate =
av_rescale_q(ctx->packet_size, (AVRational){8, 1}, st->time_base);

ret = mtl_start(s->dev_handle);
if (ret < 0) {
err(ctx, "%s, mtl start fail %d\n", __func__, ret);
mtl_st22p_read_close(ctx);
return AVERROR(EIO);
}

info(ctx, "%s(%d), rx handle %p, max packet_size %u\n", __func__, s->idx, s->rx_handle,
ctx->packet_size);
return 0;
Expand All @@ -298,7 +313,20 @@ static int mtl_st22p_read_packet(AVFormatContext* ctx, AVPacket* pkt) {
struct st_frame* frame;

dbg("%s(%d), start\n", __func__, s->idx);
frame = st22p_rx_get_frame(s->rx_handle);

if (0 == s->frame_counter) {
/*
* for unicast scenarios, retries may be necessary
* if the transmitter is not yet initialized.
*/
for (int i = 1; i <= s->session_init_retry; i++) {
frame = st22p_rx_get_frame(s->rx_handle);
if (frame) break;
info(ctx, "%s(%d) session initialization retry %d\n", __func__, s->idx, i);
}
} else
frame = st22p_rx_get_frame(s->rx_handle);

if (!frame) {
info(ctx, "%s(%d), st22p_rx_get_frame timeout\n", __func__, s->idx);
return AVERROR(EIO);
Expand Down Expand Up @@ -409,6 +437,14 @@ static const AVOption mtl_st22p_rx_options[] = {
0,
60 * 10,
DEC},
{"init_retry",
"Number of retries to the initial read packet",
OFFSET(session_init_retry),
AV_OPT_TYPE_INT,
{.i64 = 5},
0,
60,
DEC},
{"fb_cnt",
"Frame buffer count",
OFFSET(fb_cnt),
Expand Down
14 changes: 14 additions & 0 deletions ecosystem/ffmpeg_plugin/mtl_st22p_tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,13 @@ static int mtl_st22p_write_header(AVFormatContext* ctx) {
return AVERROR(EIO);
}

ret = mtl_start(s->dev_handle);
if (ret < 0) {
err(ctx, "%s, mtl start fail %d\n", __func__, ret);
mtl_st22p_write_close(ctx);
return AVERROR(EIO);
}

s->frame_size = st22p_tx_frame_size(s->tx_handle);
info(ctx, "%s(%d), tx_handle %p\n", __func__, s->idx, s->tx_handle);
return 0;
Expand Down Expand Up @@ -211,6 +218,13 @@ static int mtl_st22_write_header(AVFormatContext* ctx) {
return AVERROR(EIO);
}

ret = mtl_start(s->dev_handle);
if (ret < 0) {
err(ctx, "%s, mtl start fail %d\n", __func__, ret);
mtl_st22p_write_close(ctx);
return AVERROR(EIO);
}

s->tx_handle = st22p_tx_create(s->dev_handle, &ops_tx);
if (!s->tx_handle) {
err(ctx, "%s, st22p_tx_create failed\n", __func__);
Expand Down
31 changes: 30 additions & 1 deletion ecosystem/ffmpeg_plugin/mtl_st30p_rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ typedef struct MtlSt30pDemuxerContext {
/* arguments for session */
int fb_cnt;
int timeout_sec;
int session_init_retry;
int sample_rate;
int channels;
enum st30_fmt fmt;
Expand Down Expand Up @@ -180,6 +181,13 @@ static int mtl_st30p_read_header(AVFormatContext* ctx) {
return AVERROR(EIO);
}

ret = mtl_start(s->dev_handle);
if (ret < 0) {
err(ctx, "%s, mtl start fail %d\n", __func__, ret);
mtl_st30p_read_close(ctx);
return AVERROR(EIO);
}

info(ctx, "%s(%d), rx handle %p\n", __func__, s->idx, s->rx_handle);
return 0;
}
Expand All @@ -190,7 +198,20 @@ static int mtl_st30p_read_packet(AVFormatContext* ctx, AVPacket* pkt) {
struct st30_frame* frame;

dbg("%s(%d), start\n", __func__, s->idx);
frame = st30p_rx_get_frame(s->rx_handle);

if (0 == s->frame_counter) {
/*
* for unicast scenarios, retries may be necessary
* if the transmitter is not yet initialized.
*/
for (int i = 1; i <= s->session_init_retry; i++) {
frame = st30p_rx_get_frame(s->rx_handle);
if (frame) break;
info(ctx, "%s(%d) session initialization retry %d\n", __func__, s->idx, i);
}
} else
frame = st30p_rx_get_frame(s->rx_handle);

if (!frame) {
info(ctx, "%s(%d), st30p_rx_get_frame timeout\n", __func__, s->idx);
return AVERROR(EIO);
Expand Down Expand Up @@ -242,6 +263,14 @@ static const AVOption mtl_st30p_rx_options[] = {
0,
60 * 10,
DEC},
{"init_retry",
"Number of retries to the initial read packet",
OFFSET(session_init_retry),
AV_OPT_TYPE_INT,
{.i64 = 5},
0,
60,
DEC},
{"ar",
"audio sampling rate",
OFFSET(sample_rate),
Expand Down
7 changes: 7 additions & 0 deletions ecosystem/ffmpeg_plugin/mtl_st30p_tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,13 @@ static int mtl_st30p_write_header(AVFormatContext* ctx) {
return AVERROR(EIO);
}

ret = mtl_start(s->dev_handle);
if (ret < 0) {
err(ctx, "%s, mtl start fail %d\n", __func__, ret);
mtl_st30p_write_close(ctx);
return AVERROR(EIO);
}

s->tx_handle = st30p_tx_create(s->dev_handle, &ops_tx);
if (!s->tx_handle) {
err(ctx, "%s, st30p_tx_create failed\n", __func__);
Expand Down

0 comments on commit a452b02

Please sign in to comment.