Skip to content

Commit

Permalink
Use RTSP Setup response timeout value in KeepAliveMonitor intervalMs
Browse files Browse the repository at this point in the history
Set KeepAliveMonitor to send a keep-alive message at half the timeout value, if provided, by the RTSP Setup response.

Issue: androidx/media#662
PiperOrigin-RevId: 570946237
  • Loading branch information
microkatz authored and Copybara-Service committed Oct 5, 2023
1 parent 3becf4e commit 812d767
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,12 @@
public static final int RTSP_STATE_PLAYING = 2;

private static final String TAG = "RtspClient";
private static final long DEFAULT_RTSP_KEEP_ALIVE_INTERVAL_MS = 30_000;

/**
* The default divisor used on the session timeout value to be set as the {@link
* KeepAliveMonitor#intervalMs}.
*/
private static final int DEFAULT_RTSP_KEEP_ALIVE_INTERVAL_DIVISOR = 2;

/** A listener for session information update. */
public interface SessionInfoListener {
Expand Down Expand Up @@ -153,6 +158,7 @@ public interface PlaybackEventListener {
private RtspMessageChannel messageChannel;
@Nullable private RtspAuthUserInfo rtspAuthUserInfo;
@Nullable private String sessionId;
private long sessionTimeoutMs;
@Nullable private KeepAliveMonitor keepAliveMonitor;
@Nullable private RtspAuthenticationInfo rtspAuthenticationInfo;
private @RtspState int rtspState;
Expand Down Expand Up @@ -194,6 +200,7 @@ public RtspClient(
this.messageSender = new MessageSender();
this.uri = RtspMessageUtil.removeUserInfo(uri);
this.messageChannel = new RtspMessageChannel(new MessageListener());
this.sessionTimeoutMs = RtspMessageUtil.DEFAULT_RTSP_TIMEOUT_MS;
this.rtspAuthUserInfo = RtspMessageUtil.parseUserInfo(uri);
this.pendingSeekPositionUs = C.TIME_UNSET;
this.rtspState = RTSP_STATE_UNINITIALIZED;
Expand Down Expand Up @@ -741,6 +748,7 @@ private void onSetupResponseReceived(RtspSetupResponse response) {

rtspState = RTSP_STATE_READY;
sessionId = response.sessionHeader.sessionId;
sessionTimeoutMs = response.sessionHeader.timeoutMs;
continueSetupRtspTrack();
}

Expand All @@ -749,7 +757,9 @@ private void onPlayResponseReceived(RtspPlayResponse response) {

rtspState = RTSP_STATE_PLAYING;
if (keepAliveMonitor == null) {
keepAliveMonitor = new KeepAliveMonitor(DEFAULT_RTSP_KEEP_ALIVE_INTERVAL_MS);
keepAliveMonitor =
new KeepAliveMonitor(
/* intervalMs= */ sessionTimeoutMs / DEFAULT_RTSP_KEEP_ALIVE_INTERVAL_DIVISOR);
keepAliveMonitor.start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,13 +407,19 @@ public void parseSessionHeader_withSessionIdContainingSpecialCharacters_succeeds
}

@Test
public void parseSessionHeader_withSessionIdContainingSpecialCharactersAndTimeout_succeeds()
throws Exception {
String sessionHeaderString = "610a63df-9b57.4856_97ac$665f+56e9c04;timeout=60";
public void parseSessionHeader_usingDefaultTimeout_succeeds() throws Exception {
String sessionHeaderString = "610a63df-9b57.4856_97ac$665f+56e9c04";
RtspMessageUtil.RtspSessionHeader sessionHeader =
RtspMessageUtil.parseSessionHeader(sessionHeaderString);
assertThat(sessionHeader.sessionId).isEqualTo("610a63df-9b57.4856_97ac$665f+56e9c04");
assertThat(sessionHeader.timeoutMs).isEqualTo(60_000);
assertThat(sessionHeader.timeoutMs).isEqualTo(RtspMessageUtil.DEFAULT_RTSP_TIMEOUT_MS);
}

@Test
public void parseSessionHeader_withCustomTimeout_succeeds() throws Exception {
String sessionHeaderString = "610a63df-9b57.4856_97ac$665f+56e9c04;timeout=30";
RtspMessageUtil.RtspSessionHeader sessionHeader =
RtspMessageUtil.parseSessionHeader(sessionHeaderString);
assertThat(sessionHeader.timeoutMs).isEqualTo(30_000);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.SocketFactory;
import org.junit.After;
Expand All @@ -55,6 +57,7 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.robolectric.annotation.Config;
import org.robolectric.shadows.ShadowLooper;

/** Playback testing for RTSP. */
@Config(sdk = 29)
Expand Down Expand Up @@ -103,9 +106,12 @@ public void prepare_withSupportedTrack_playsTrackUntilEnded() throws Exception {
new ResponseProvider(
clock,
ImmutableList.of(aacRtpPacketStreamDump, mpeg2tsRtpPacketStreamDump),
fakeRtpDataChannel);
fakeRtpDataChannel,
RtspMessageUtil.DEFAULT_RTSP_TIMEOUT_MS,
/* optionsRequestCounter= */ Optional.empty());
rtspServer = new RtspServer(responseProvider);
ExoPlayer player = createExoPlayer(rtspServer.startAndGetPortNumber(), rtpDataChannelFactory);
ExoPlayer player =
createExoPlayer(clock, rtspServer.startAndGetPortNumber(), rtpDataChannelFactory);

PlaybackOutput playbackOutput = PlaybackOutput.register(player, capturingRenderersFactory);
player.prepare();
Expand All @@ -125,8 +131,13 @@ public void prepare_noSupportedTrack_throwsPreparationError() throws Exception {
rtspServer =
new RtspServer(
new ResponseProvider(
clock, ImmutableList.of(mpeg2tsRtpPacketStreamDump), fakeRtpDataChannel));
ExoPlayer player = createExoPlayer(rtspServer.startAndGetPortNumber(), rtpDataChannelFactory);
clock,
ImmutableList.of(mpeg2tsRtpPacketStreamDump),
fakeRtpDataChannel,
RtspMessageUtil.DEFAULT_RTSP_TIMEOUT_MS,
/* optionsRequestCounter= */ Optional.empty()));
ExoPlayer player =
createExoPlayer(clock, rtspServer.startAndGetPortNumber(), rtpDataChannelFactory);

AtomicReference<Throwable> playbackError = new AtomicReference<>();
player.prepare();
Expand Down Expand Up @@ -158,7 +169,7 @@ public void prepare_withUdpUnsupportedWithFallback_fallsbackToTcpAndPlaysUntilEn
new UdpDataSourceRtpDataChannelFactory(DEFAULT_TIMEOUT_MS), rtpTcpDataChannelFactory);
rtspServer = new RtspServer(responseProviderSupportingOnlyTcp);
ExoPlayer player =
createExoPlayer(rtspServer.startAndGetPortNumber(), forwardingRtpDataChannelFactory);
createExoPlayer(clock, rtspServer.startAndGetPortNumber(), forwardingRtpDataChannelFactory);

PlaybackOutput playbackOutput = PlaybackOutput.register(player, capturingRenderersFactory);
player.prepare();
Expand All @@ -183,7 +194,8 @@ public void prepare_withUdpUnsupportedWithoutFallback_throwsRtspPlaybackExceptio
ImmutableList.of(aacRtpPacketStreamDump, mpeg2tsRtpPacketStreamDump),
fakeUdpRtpDataChannel);
rtspServer = new RtspServer(responseProvider);
ExoPlayer player = createExoPlayer(rtspServer.startAndGetPortNumber(), rtpDataChannelFactory);
ExoPlayer player =
createExoPlayer(clock, rtspServer.startAndGetPortNumber(), rtpDataChannelFactory);

AtomicReference<PlaybackException> playbackError = new AtomicReference<>();
player.prepare();
Expand Down Expand Up @@ -220,7 +232,7 @@ public void prepare_withUdpUnsupportedWithUdpFallback_throwsRtspUdpUnsupportedTr
new ForwardingRtpDataChannelFactory(rtpDataChannelFactory, rtpDataChannelFactory);
rtspServer = new RtspServer(responseProviderSupportingOnlyTcp);
ExoPlayer player =
createExoPlayer(rtspServer.startAndGetPortNumber(), forwardingRtpDataChannelFactory);
createExoPlayer(clock, rtspServer.startAndGetPortNumber(), forwardingRtpDataChannelFactory);

AtomicReference<PlaybackException> playbackError = new AtomicReference<>();
player.prepare();
Expand All @@ -240,8 +252,39 @@ public void onPlayerError(PlaybackException error) {
assertThat(playbackError.get()).hasCauseThat().hasMessageThat().isEqualTo("SETUP 461");
}

@Test
public void play_withCustomSessionTimeoutDuration_sendsKeepAliveOptionsRequest()
throws Exception {
FakeUdpDataSourceRtpDataChannel fakeRtpDataChannel = new FakeUdpDataSourceRtpDataChannel();
RtpDataChannel.Factory rtpDataChannelFactory = (trackId) -> fakeRtpDataChannel;
FakeClock fakeClock = new FakeClock(/* initialTimeMs= */ 0, true);
Optional<AtomicInteger> optionsRequestCounter = Optional.of(new AtomicInteger());
ResponseProvider responseProvider =
new ResponseProvider(
fakeClock,
ImmutableList.of(aacRtpPacketStreamDump),
fakeRtpDataChannel,
/* sessionTimeoutMs= */ 30_000L,
optionsRequestCounter);
rtspServer = new RtspServer(responseProvider);
ExoPlayer player =
createExoPlayer(fakeClock, rtspServer.startAndGetPortNumber(), rtpDataChannelFactory);
player.prepare();
player.play();
TestPlayerRunHelper.runUntilPlaybackState(player, Player.STATE_READY);
// Reset optionsRequestCounter to count requests made by the keep-alive monitor
optionsRequestCounter.get().getAndSet(0);

fakeClock.advanceTime(/* timeDiffMs= */ 16_000L);
ShadowLooper.idleMainLooper();

assertThat(optionsRequestCounter.get().get()).isEqualTo(1);

player.release();
}

private ExoPlayer createExoPlayer(
int serverRtspPortNumber, RtpDataChannel.Factory rtpDataChannelFactory) {
Clock clock, int serverRtspPortNumber, RtpDataChannel.Factory rtpDataChannelFactory) {
ExoPlayer player =
new ExoPlayer.Builder(applicationContext, capturingRenderersFactory)
.setClock(clock)
Expand All @@ -260,11 +303,14 @@ private ExoPlayer createExoPlayer(
private static class ResponseProvider implements RtspServer.ResponseProvider {

protected static final String SESSION_ID = "00000000";
private static final String SESSION_TIMEOUT_HEADER_TAG = ";timeout=";

protected final Clock clock;
protected final ArrayList<RtpPacketStreamDump> dumpsForSetUpTracks;
protected final List<RtpPacketStreamDump> dumpsForSetUpTracks = new ArrayList<>();
protected final ImmutableList<RtpPacketStreamDump> rtpPacketStreamDumps;
private final RtspMessageChannel.InterleavedBinaryDataListener binaryDataListener;
private final long sessionTimeoutMs;
private final Optional<AtomicInteger> optionsRequestCounter;

protected RtpPacketTransmitter packetTransmitter;

Expand All @@ -275,15 +321,21 @@ private static class ResponseProvider implements RtspServer.ResponseProvider {
* @param rtpPacketStreamDumps A list of {@link RtpPacketStreamDump}.
* @param binaryDataListener A {@link RtspMessageChannel.InterleavedBinaryDataListener} to send
* RTP data.
* @param sessionTimeoutMs Duration RTSP server will keep the session active without receiving
* any requests.
* @param optionsRequestCounter for how many RTSP Options requests were sent.
*/
public ResponseProvider(
ResponseProvider(
Clock clock,
List<RtpPacketStreamDump> rtpPacketStreamDumps,
RtspMessageChannel.InterleavedBinaryDataListener binaryDataListener) {
RtspMessageChannel.InterleavedBinaryDataListener binaryDataListener,
long sessionTimeoutMs,
Optional<AtomicInteger> optionsRequestCounter) {
this.clock = clock;
this.rtpPacketStreamDumps = ImmutableList.copyOf(rtpPacketStreamDumps);
this.binaryDataListener = binaryDataListener;
dumpsForSetUpTracks = new ArrayList<>();
this.sessionTimeoutMs = sessionTimeoutMs;
this.optionsRequestCounter = optionsRequestCounter;
}

/** Returns a list of the received SETUP requests' corresponding {@link RtpPacketStreamDump}. */
Expand All @@ -295,6 +347,7 @@ public ImmutableList<RtpPacketStreamDump> getDumpsForSetUpTracks() {

@Override
public RtspResponse getOptionsResponse() {
optionsRequestCounter.ifPresent(AtomicInteger::getAndIncrement);
return new RtspResponse(
/* status= */ 200,
new RtspHeaders.Builder()
Expand All @@ -316,9 +369,15 @@ public RtspResponse getSetupResponse(Uri requestedUri, RtspHeaders headers) {
packetTransmitter = new RtpPacketTransmitter(rtpPacketStreamDump, clock);
}
}

return new RtspResponse(
/* status= */ 200, headers.buildUpon().add(RtspHeaders.SESSION, SESSION_ID).build());
/* status= */ 200,
headers
.buildUpon()
.add(
RtspHeaders.SESSION,
// Convert sessionTimeoutMs to seconds
SESSION_ID + SESSION_TIMEOUT_HEADER_TAG + (sessionTimeoutMs / 1000))
.build());
}

@Override
Expand Down Expand Up @@ -348,7 +407,12 @@ public ResponseProviderSupportingOnlyTcp(
Clock clock,
List<RtpPacketStreamDump> rtpPacketStreamDumps,
RtspMessageChannel.InterleavedBinaryDataListener binaryDataListener) {
super(clock, rtpPacketStreamDumps, binaryDataListener);
super(
clock,
rtpPacketStreamDumps,
binaryDataListener,
RtspMessageUtil.DEFAULT_RTSP_TIMEOUT_MS,
/* optionsRequestCounter= */ Optional.empty());
}

@Override
Expand Down

0 comments on commit 812d767

Please sign in to comment.