Skip to content

Commit

Permalink
fix(recording): send PLIs if recording stops flowing
Browse files Browse the repository at this point in the history
There are some scenarios where capture for video streams just freezes
for periods of time when folks are recording themselves alone in a room.
I suspect there's something wrong with KMS's nack-pli procedures in
plain RTP/AVPF endpoints, but can't (and won't) confirm. This issue is
hard to reproduce, though - and hard to debug without a reproducible
scenario.

This commit makes SFU fire a manual PLI salvo when the recording end
reports a stall (up to 3 PLIs over 6 seconds after a minimum of 500 ms
of stalled media). This should help, but needs to deployed and tested
alongside a sanity scan to detect capture gaps proactively.

The behavior is _disabled by default_. The configuration flag for this
is `recordingPliOnNotFlowing`.
  • Loading branch information
prlanzarin committed Mar 8, 2023
1 parent 8d6fd67 commit 35b428c
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 7 deletions.
2 changes: 2 additions & 0 deletions config/default.example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ fsBridgeMode: 'RTP'
recordScreenSharing: true
# Whether to record camera raw files
recordWebcams: true
# Fire a max of 3 PLIs over 6 seconds if recording stops flowing for 500 ms
recordingPliOnNotFlowing: false
# Base path where recording raw files will be stored
# WARNING => THE FOLLOWING PARROT CEASED TO BE
recordingBasePath: /var/kurento
Expand Down
78 changes: 74 additions & 4 deletions lib/screenshare/screenshare.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,14 @@ const RECORDING_ADAPTER = config.has('recordingAdapter')
const GENERATE_TS_ON_RECORDING_EVT = config.has('recordingGenerateTsOnRecEvt')
? config.get('recordingGenerateTsOnRecEvt')
: false;
const RECORDING_PLI_ON_NOT_FLOWING = config.has('recordingPliOnNotFlowing')
? config.get('recordingPliOnNotFlowing')
: false;

const LOG_PREFIX = "[screenshare]";
const PLI_SHOTS = 3;
const PLI_FREQ = 2000;
const REC_FLOW_TIMER = 500;

module.exports = class Screenshare extends BaseProvider {
static getCustomMediaSpec (bitrate) {
Expand Down Expand Up @@ -105,6 +111,8 @@ module.exports = class Screenshare extends BaseProvider {
nativeSubMediaId: null, // ?: string (<T>)
hgaPubMediaId: null, // ?: string (<T>)
};
this._pliInterval = null;
this._pliShots = 0;

this._trackMCSEvents();
}
Expand Down Expand Up @@ -413,24 +421,38 @@ module.exports = class Screenshare extends BaseProvider {
}
}

_mediaStateRecording (event, endpoint) {
_handleHGARecStateChange (event, endpoint) {
const { mediaId , state } = event;
const { name, details } = state;

if (mediaId !== endpoint) {
return;
}
const { name, details } = state;

switch (name) {
case "MediaStateChanged":
break;
case "MediaFlowOutStateChange":
case "MediaFlowInStateChange":
if (details === 'NOT_FLOWING' && this.status !== C.MEDIA_PAUSED) {
Logger.debug(`Recording media STOPPED FLOWING on endpoint ${endpoint}`,
Logger.warn(`Recording media STOPPED FLOWING on endpoint ${endpoint}`,
this._getFullPresenterLogMetadata(this._connectionId));

if (RECORDING_PLI_ON_NOT_FLOWING && this.hgaRecordingSet.flowTracker == null) {
this.hgaRecordingSet.flowTracker = setTimeout(() => {
this._pliSalvo(this.hgaRecordingSet.nativeSubMediaId);
}, REC_FLOW_TIMER);
}
} else if (details === 'FLOWING') {
Logger.debug(`Recording media STARTED FLOWING on endpoint ${endpoint}`,
this._getFullPresenterLogMetadata(this._connectionId));
this._clearPliSalvo();

if (this.hgaRecordingSet.flowTracker) {
clearTimeout(this.hgaRecordingSet.flowTracker);
this.hgaRecordingSet.flowTracker = null;
}

if (!this._startRecordingEventFired && !GENERATE_TS_ON_RECORDING_EVT) {
Logger.debug('Firing recording event via flowing event',
this._getFullPresenterLogMetadata(this._connectionId));
Expand All @@ -455,8 +477,50 @@ module.exports = class Screenshare extends BaseProvider {

/* ======= RECORDING METHODS ======= */

_requestKeyframe (mediaId) {
return this.mcs.requestKeyframe(mediaId).catch((error) => {
Logger.warn(`requestKeyframe failed for ${mediaId}: ${error.message}`, {
...this._getFullPresenterLogMetadata(this._connectionId),
error,
});
});
}

_pliSalvo (endpoint) {
if (this._pliInterval || endpoint == null) return;

Logger.warn(
`Firing recording PLI salvo: ${endpoint}`,
this._getFullPresenterLogMetadata(this._connectionId)
);
this._requestKeyframe(endpoint);
this._pliShots++
this._pliInterval = setInterval(() => {
if (this._pliShots >= PLI_SHOTS) {
this._clearPliSalvo();
} else {
this._pliShots++;
this._requestKeyframe(endpoint);
}
}, PLI_FREQ);
}

_clearPliSalvo () {
if (this._pliInterval) {
clearInterval(this._pliInterval);
this._pliInterval = null;
}

this._pliShots = 0;
}

async _stopHGARecordingSet () {
const { nativeSubMediaId, hgaPubMediaId } = this.hgaRecordingSet;
const { nativeSubMediaId, hgaPubMediaId, flowTracker } = this.hgaRecordingSet;

if (flowTracker) {
clearTimeout(flowTracker);
this.hgaRecordingSet.flowTracker = null;
}

if (nativeSubMediaId) {
try {
Expand Down Expand Up @@ -548,6 +612,10 @@ module.exports = class Screenshare extends BaseProvider {
);
this.hgaRecordingSet.hgaPubMediaId = hgaMediaId;

this.mcs.onEvent(C.MEDIA_STATE, hgaMediaId, (event) => {
this._handleHGARecStateChange(event, hgaMediaId);
});

// Step 3
nativeOptions.descriptor = hgaAnswer;
nativeOptions.mediaId = nativeMediaId;
Expand Down Expand Up @@ -1000,6 +1068,8 @@ module.exports = class Screenshare extends BaseProvider {
async stopPresenter () {
// Set this right away to avoid trailing stops
this.status = C.MEDIA_STOPPING;
// Clear PLI interval if presenter
this._clearPliSalvo();
// Stop the recording procedures if needed.
this._stopRecording();
// Send stopRtmpBroadcast message to akka-apps
Expand Down
76 changes: 73 additions & 3 deletions lib/video/video.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ const { hrTime } = require('../common/utils.js');
const Messaging = require('../bbb/messages/Messaging');
const BaseProvider = require('../base/base-provider.js');
const SHOULD_RECORD = config.get('recordWebcams');
const LOG_PREFIX = "[video]";
const errors = require('../base/errors');
const DEFAULT_MEDIA_SPECS = config.get('conference-media-specs');
const SUBSCRIBER_SPEC_SLAVE = config.has('videoSubscriberSpecSlave')
Expand All @@ -24,6 +23,14 @@ const RECORDING_ADAPTER = config.has('recordingAdapter')
const GENERATE_TS_ON_RECORDING_EVT = config.has('recordingGenerateTsOnRecEvt')
? config.get('recordingGenerateTsOnRecEvt')
: false;
const RECORDING_PLI_ON_NOT_FLOWING = config.has('recordingPliOnNotFlowing')
? config.get('recordingPliOnNotFlowing')
: false;

const LOG_PREFIX = "[video]";
const PLI_SHOTS = 3;
const PLI_FREQ = 2000;
const REC_FLOW_TIMER = 500;

let sources = {};

Expand Down Expand Up @@ -71,6 +78,8 @@ module.exports = class Video extends BaseProvider {
nativeSubMediaId: null, // ?: string (<T>)
hgaPubMediaId: null, // ?: string (<T>)
};
this._pliInterval = null;
this._pliShots = 0;

this._bindEventHandlers();
this._trackBigBlueButtonEvents();
Expand Down Expand Up @@ -358,7 +367,7 @@ module.exports = class Video extends BaseProvider {
}
}

_mediaStateRecording (event, endpoint) {
_handleHGARecStateChange (event, endpoint) {
const { mediaId , state } = event;
const { name, details } = state;

Expand All @@ -374,7 +383,22 @@ module.exports = class Video extends BaseProvider {
if (details === 'NOT_FLOWING' && this.status !== C.MEDIA_PAUSED) {
Logger.warn(`Recording media STOPPED FLOWING on endpoint ${endpoint}`,
this._getLogMetadata());

if (RECORDING_PLI_ON_NOT_FLOWING && this.hgaRecordingSet.flowTracker == null) {
this.hgaRecordingSet.flowTracker = setTimeout(() => {
this._pliSalvo(this.hgaRecordingSet.nativeSubMediaId);
}, REC_FLOW_TIMER);
}
} else if (details === 'FLOWING') {
Logger.debug(`Recording media STARTED FLOWING on endpoint ${endpoint}`,
this._getLogMetadata());
this._clearPliSalvo();

if (this.hgaRecordingSet.flowTracker) {
clearTimeout(this.hgaRecordingSet.flowTracker);
this.hgaRecordingSet.flowTracker = null;
}

if (!this._startRecordingEventFired && !GENERATE_TS_ON_RECORDING_EVT) {
Logger.debug('Firing recording event via flowing event',
this._getLogMetadata());
Expand All @@ -383,6 +407,7 @@ module.exports = class Video extends BaseProvider {
}
}
break;

case "Recording":
if (!this._startRecordingEventFired && GENERATE_TS_ON_RECORDING_EVT) {
Logger.debug('Firing recording event via experimental event',
Expand Down Expand Up @@ -474,6 +499,41 @@ module.exports = class Video extends BaseProvider {

/* ======= RECORDING METHODS ======= */

_requestKeyframe (mediaId) {
return this.mcs.requestKeyframe(mediaId).catch((error) => {
Logger.warn(`requestKeyframe failed for ${mediaId}: ${error.message}`, {
...this._getLogMetadata(),
error,
});
});
}

_pliSalvo (endpoint) {
if (this._pliInterval || endpoint == null) return;

Logger.warn(`Firing recording PLI salvo: ${endpoint}`, this._getLogMetadata());

this._requestKeyframe(endpoint);
this._pliShots++
this._pliInterval = setInterval(() => {
if (this._pliShots >= PLI_SHOTS) {
this._clearPliSalvo();
} else {
this._pliShots++;
this._requestKeyframe(endpoint);
}
}, PLI_FREQ);
}

_clearPliSalvo () {
if (this._pliInterval) {
clearInterval(this._pliInterval);
this._pliInterval = null;
}

this._pliShots = 0;
}

shouldRecord () {
return this.isRecorded && this.shared && this.record;
}
Expand Down Expand Up @@ -515,7 +575,12 @@ module.exports = class Video extends BaseProvider {
}

async _stopHGARecordingSet () {
const { nativeSubMediaId, hgaPubMediaId } = this.hgaRecordingSet;
const { nativeSubMediaId, hgaPubMediaId, flowTracker } = this.hgaRecordingSet;

if (flowTracker) {
clearTimeout(flowTracker);
this.hgaRecordingSet.flowTracker = null;
}

if (nativeSubMediaId) {
try {
Expand Down Expand Up @@ -594,6 +659,10 @@ module.exports = class Video extends BaseProvider {
);
this.hgaRecordingSet.hgaPubMediaId = hgaMediaId;

this.mcs.onEvent(C.MEDIA_STATE, hgaMediaId, (event) => {
this._handleHGARecStateChange(event, hgaMediaId);
});

// Step 3
nativeOptions.descriptor = hgaAnswer;
nativeOptions.mediaId = nativeMediaId;
Expand Down Expand Up @@ -876,6 +945,7 @@ module.exports = class Video extends BaseProvider {
return new Promise((resolve) => {
this._untrackBigBlueButtonEvents();
this._untrackMCSEvents();
this._clearPliSalvo();

switch (this.status) {
case C.MEDIA_STOPPED: {
Expand Down

0 comments on commit 35b428c

Please sign in to comment.