Skip to content

Commit

Permalink
fixes in packet reordering in route and mabr
Browse files Browse the repository at this point in the history
  • Loading branch information
jeanlf committed May 21, 2024
1 parent 43a396e commit 46ea11a
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 30 deletions.
4 changes: 4 additions & 0 deletions Changelog
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@
## Filters
- DVB-I MABR flute aux and demux

## Misc
- added packet reordering rule for netcap replay from file


#04/2024: GPAC 2.4

## Emscripten|WebAssembly(WASM) support
Expand Down
6 changes: 3 additions & 3 deletions include/gpac/route.h
Original file line number Diff line number Diff line change
Expand Up @@ -215,11 +215,11 @@ GF_Err gf_route_dmx_process(GF_ROUTEDmx *routedmx);

/*! Sets reordering on.
\param routedmx the ROUTE demultiplexer
\param force_reorder if TRUE, the order flag in ROUTE/LCT is ignored and objects are gathered for the given time. Otherwise, if order flag is set in ROUTE/LCT, an object is considered done as soon as a new object starts
\param timeout_ms maximum delay to wait before considering the object is done when ROUTE/LCT order is not used. A value of 0 implies waiting forever (default value is 5s).
\param reorder_needed if TRUE, the order flag in ROUTE/LCT is ignored and objects are gathered for the given time. Otherwise, if order flag is set in ROUTE/LCT, an object is considered done as soon as a new object starts
\param timeout_us maximum delay in microseconds to wait before considering the object is done when ROUTE/LCT order is not used. A value of 0 implies any out-of-order packet triggers a download completion (default value is 1 ms).
\return error code if any
*/
GF_Err gf_route_set_reorder(GF_ROUTEDmx *routedmx, Bool force_reorder, u32 timeout_ms);
GF_Err gf_route_set_reorder(GF_ROUTEDmx *routedmx, Bool reorder_needed, u32 timeout_us);

/*! Allow segments to be sent while being downloaded.
Expand Down
10 changes: 6 additions & 4 deletions share/doc/man/gpac-filters.1
Original file line number Diff line number Diff line change
Expand Up @@ -6621,7 +6621,7 @@ The cached MPD is assigned the following headers:
.br
* `x-route-ll`: boolean value, if yes indicates that the indicated first segment is currently being received (low latency signaling).
.br
* `x-route-loop`: boolean value, if yes indicates a loop in the service has been detected (usually pcap replay loop).
* `x-route-loop`: boolean value, if yes indicates a loop (e.g. pcap replay) in the service has been detected - only checked if .I cloop is set.
.br

.br
Expand Down Expand Up @@ -6744,9 +6744,11 @@ max_segs (uint, default: 0): maximum number of segments to keep on disk
.br
odir (str): output directory for standalone mode
.br
reorder (bool, default: false): ignore order flag in ROUTE/LCT packets, avoiding considering object done when TOI changes
reorder (bool, default: true): consider packets are not always in order - if false, this will evaluate an LCT object as done when TOI changes
.br
rtimeout (uint, default: 5000): default timeout in ms to wait when gathering out-of-order packets
cloop (bool, default: false): check for loops based on TOI (used for capture replay)
.br
rtimeout (uint, default: 1000): default timeout in us to wait when gathering out-of-order packets
.br
fullseg (bool, default: false): only dispatch full segments in cache mode (always true for other modes)
.br
Expand All @@ -6764,7 +6766,7 @@ repair (enum, default: simple): repair mode for corrupted files
.br
repair_url (cstr): repair url
.br
max_sess (uint, default: 1): max number of concurrent HTTP repair sassions
max_sess (uint, default: 1): max number of concurrent HTTP repair sessions
.br

.br
Expand Down
13 changes: 7 additions & 6 deletions src/filters/in_route.c
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ void routein_on_event_file(ROUTEInCtx *ctx, GF_ROUTEEventType evt, u32 evt_param
break;
}
} else if (!is_defer_repair && (ctx->sync_tsi == finfo->tsi)) {
if (ctx->last_toi > finfo->toi) {
if (ctx->cloop && (ctx->last_toi > finfo->toi + 100)) {
GF_LOG(GF_LOG_WARNING, GF_LOG_ROUTE, ("[ROUTE] Loop detected on service %d for TSI %u: prev TOI %u this toi %u\n", ctx->tune_service_id, finfo->tsi, ctx->last_toi, finfo->toi));

gf_route_dmx_purge_objects(ctx->route_dmx, evt_param);
Expand Down Expand Up @@ -662,7 +662,7 @@ static Bool routein_process_event(GF_Filter *filter, const GF_FilterEvent *evt)
if (!ctx->initial_play_forced)
ctx->nb_playing++;
ctx->initial_play_forced = GF_FALSE;
} else {
} else if (evt->base.type==GF_FEVT_STOP) {
ctx->nb_playing--;
}
return GF_TRUE;
Expand All @@ -685,8 +685,9 @@ static const GF_FilterArgs ROUTEInArgs[] =
{ OFFS(tsidbg), "gather only objects with given TSI (debug)", GF_PROP_UINT, "0", NULL, GF_FS_ARG_HINT_EXPERT},
{ OFFS(max_segs), "maximum number of segments to keep on disk", GF_PROP_UINT, "0", NULL, GF_FS_ARG_HINT_EXPERT},
{ OFFS(odir), "output directory for standalone mode", GF_PROP_STRING, NULL, NULL, GF_FS_ARG_HINT_ADVANCED},
{ OFFS(reorder), "ignore order flag in ROUTE/LCT packets, avoiding considering object done when TOI changes", GF_PROP_BOOL, "false", NULL, GF_FS_ARG_HINT_EXPERT},
{ OFFS(rtimeout), "default timeout in ms to wait when gathering out-of-order packets", GF_PROP_UINT, "5000", NULL, GF_FS_ARG_HINT_EXPERT},
{ OFFS(reorder), "consider packets are not always in order - if false, this will evaluate an LCT object as done when TOI changes", GF_PROP_BOOL, "true", NULL, GF_FS_ARG_HINT_EXPERT},
{ OFFS(cloop), "check for loops based on TOI (used for capture replay)", GF_PROP_BOOL, "false", NULL, 0},
{ OFFS(rtimeout), "default timeout in us to wait when gathering out-of-order packets", GF_PROP_UINT, "1000", NULL, GF_FS_ARG_HINT_EXPERT},
{ OFFS(fullseg), "only dispatch full segments in cache mode (always true for other modes)", GF_PROP_BOOL, "false", NULL, GF_FS_ARG_HINT_ADVANCED},
{ OFFS(repair), "repair mode for corrupted files\n"
"- no: no repair is performed\n"
Expand All @@ -695,7 +696,7 @@ static const GF_FilterArgs ROUTEInArgs[] =
"- full: HTTP-based repair, not yet implemented"
, GF_PROP_UINT, "simple", "no|simple|strict|full", GF_FS_ARG_HINT_EXPERT},
{ OFFS(repair_url), "repair url", GF_PROP_NAME, NULL, NULL, 0},
{ OFFS(max_sess), "max number of concurrent HTTP repair sassions", GF_PROP_UINT, "1", NULL, 0},
{ OFFS(max_sess), "max number of concurrent HTTP repair sessions", GF_PROP_UINT, "1", NULL, 0},
{0}
};

Expand Down Expand Up @@ -723,7 +724,7 @@ GF_FilterRegister ROUTEInRegister = {
"- `x-route`: integer value, indicates the ROUTE service ID.\n"
"- `x-route-first-seg`: string value, indicates the name of the first segment (completely or currently being) retrieved from the broadcast.\n"
"- `x-route-ll`: boolean value, if yes indicates that the indicated first segment is currently being received (low latency signaling).\n"
"- `x-route-loop`: boolean value, if yes indicates a loop in the service has been detected (usually pcap replay loop).\n"
"- `x-route-loop`: boolean value, if yes indicates a loop (e.g. pcap replay) in the service has been detected - only checked if [-cloop]() is set.\n"
" \n"
"The cached files are assigned the following headers:\n"
"- `x-route`: boolean value, if yes indicates the file comes from an ROUTE session.\n"
Expand Down
2 changes: 1 addition & 1 deletion src/filters/in_route.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ typedef struct
{
//options
char *src, *ifce, *odir, *repair_url;
Bool gcache, kc, skipr, reorder, fullseg;
Bool gcache, kc, skipr, reorder, fullseg, cloop;
u32 buffer, timeout, stats, max_segs, tsidbg, rtimeout, nbcached, repair;
u32 max_sess;
s32 tunein, stsi;
Expand Down
29 changes: 14 additions & 15 deletions src/media_tools/route_dmx.c
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ typedef struct
GF_LCTFragInfo *frags;
GF_LCTObjectStatus status;
u32 start_time_ms, download_time_ms;
u32 last_gather_time;
u64 last_gather_time;
u8 closed_flag;
u8 force_keep;
//flag set when the last chunk has been declared in ll_map
Expand Down Expand Up @@ -203,8 +203,8 @@ struct __gf_routedmx {
u8 *unz_buffer;
u32 unz_buffer_size;

u32 reorder_timeout;
Bool force_reorder;
u64 reorder_timeout;
Bool force_in_order;
Bool progressive_dispatch;
u32 nrt_max_seg;

Expand Down Expand Up @@ -391,7 +391,7 @@ static GF_ROUTEDmx *gf_route_dmx_new_internal(const char *ifce, u32 sock_buffer_
//create static bs
routedmx->bs = gf_bs_new((char*)&e, 1, GF_BITSTREAM_READ);

routedmx->reorder_timeout = 5000;
routedmx->reorder_timeout = 1000;

routedmx->on_event = on_event;
routedmx->udta = udta;
Expand Down Expand Up @@ -621,7 +621,7 @@ GF_Err gf_route_set_reorder(GF_ROUTEDmx *routedmx, Bool force_reorder, u32 timeo
{
if (!routedmx) return GF_BAD_PARAM;
routedmx->reorder_timeout = timeout_ms;
routedmx->force_reorder = force_reorder;
routedmx->force_in_order = !force_reorder;
return GF_OK;
}

Expand Down Expand Up @@ -1399,7 +1399,8 @@ static GF_Err gf_route_service_gather_object(GF_ROUTEDmx *routedmx, GF_ROUTEServ
GF_LCTObject *obj = s->last_active_obj;
GF_FLUTELLMapEntry *ll_map = NULL;

if (routedmx->force_reorder)
//not on a broadcast channel, ignore in_order flag
if (!routedmx->force_in_order)
in_order = GF_FALSE;

if (fdt_symbol_length) {
Expand Down Expand Up @@ -1595,9 +1596,7 @@ static GF_Err gf_route_service_gather_object(GF_ROUTEDmx *routedmx, GF_ROUTEServ
} else {
gf_route_obj_to_reservoir(routedmx, s, o);
}
}
//note that if not in order and no timeout, we wait forever !
else if (in_order || routedmx->reorder_timeout) {
} else {
count = gf_list_count(s->objects);
for (i=0; i<count; i++) {
u32 new_count;
Expand All @@ -1617,12 +1616,13 @@ static GF_Err gf_route_service_gather_object(GF_ROUTEDmx *routedmx, GF_ROUTEServ
) {
continue;
}
else if (!in_order) {
u32 elapsed = gf_sys_clock() - o->last_gather_time;
//packets not in order and timeout used
else if (!in_order && routedmx->reorder_timeout) {
u64 elapsed = gf_sys_clock_high_res() - o->last_gather_time;
if (elapsed < routedmx->reorder_timeout)
continue;

GF_LOG(GF_LOG_WARNING, GF_LOG_ROUTE, ("[%s] Object TSI %u TOI %u timeout after %d ms - forcing dispatch\n", s->log_name, o->tsi, o->toi, elapsed ));
GF_LOG(GF_LOG_WARNING, GF_LOG_ROUTE, ("[%s] Object TSI %u TOI %u timeout after %d us - forcing dispatch\n", s->log_name, o->tsi, o->toi, elapsed ));
} else if (o->rlct && !o->rlct->tsi_init) {
GF_LOG(GF_LOG_DEBUG, GF_LOG_ROUTE, ("[%s] Object TSI %u TOI %u incomplete (tune-in) - forcing dispatch\n", s->log_name, o->tsi, o->toi, toi ));
}
Expand Down Expand Up @@ -1677,7 +1677,7 @@ static GF_Err gf_route_service_gather_object(GF_ROUTEDmx *routedmx, GF_ROUTEServ
}
return GF_EOS;
}
obj->last_gather_time = gf_sys_clock();
obj->last_gather_time = gf_sys_clock_high_res();

if (!size) {
GF_LOG(GF_LOG_DEBUG, GF_LOG_ROUTE, ("[%s] Empty LCT packet TSI %u TOI %u\n", s->log_name, tsi, toi));
Expand Down Expand Up @@ -2593,7 +2593,6 @@ static GF_Err dmx_process_service_dvb_flute(GF_ROUTEDmx *routedmx, GF_ROUTEServi
u32 nb_read, cp , v, C, psi, S, O, H, /*Res, A,*/ B, hdr_len, cc, tsi, toi, pos;
u32 /*a_G=0, a_U=0,*/ a_S=0, a_M=0/*, a_A=0, a_H=0, a_D=0*/;
u64 transfert_length=0;
Bool in_order = GF_TRUE;
u32 start_offset=0;
GF_ROUTELCTChannel *rlct=NULL;
GF_LCTObject *gather_object=NULL;
Expand Down Expand Up @@ -2735,7 +2734,7 @@ static GF_Err dmx_process_service_dvb_flute(GF_ROUTEDmx *routedmx, GF_ROUTEServi
}
}

e = gf_route_service_gather_object(routedmx, s, tsi, toi, start_offset, routedmx->buffer + pos, nb_read-pos, (u32) transfert_length, B, in_order, rlct, &gather_object, ESI, fdt_symbol_length);
e = gf_route_service_gather_object(routedmx, s, tsi, toi, start_offset, routedmx->buffer + pos, nb_read-pos, (u32) transfert_length, B, GF_FALSE, rlct, &gather_object, ESI, fdt_symbol_length);

start_offset += (nb_read ) * ESI;

Expand Down
3 changes: 2 additions & 1 deletion src/utils/os_net.c
Original file line number Diff line number Diff line change
Expand Up @@ -1668,7 +1668,8 @@ static void gf_netcap_load_pck(GF_NetcapFilter *nf, u64 now)
s->cap_info->patch_offset = 0;
break;
}
if (netcap_filter_pck(nf->read_sock_selected, nf->pck_len, GF_FALSE))
//apply rule(s) except when sending delayed packets
if (!nf->reorder_resume_pos && netcap_filter_pck(nf->read_sock_selected, nf->pck_len, GF_FALSE))
nf->read_sock_selected = NULL;

if (!nf->read_sock_selected) {
Expand Down

0 comments on commit 46ea11a

Please sign in to comment.