From 69f9e23268e1df9b96414fec69dc2373294c6697 Mon Sep 17 00:00:00 2001 From: claincly Date: Fri, 4 Jun 2021 11:07:02 +0100 Subject: [PATCH] Allow reading RTSP message body by Content-Length. Related to Issue: #8941. RTSP message body's format is not regulated by the RTSP spec, meaning it can use either CRLF or LF as its line terminator. The old code assumes every line ends with CRLF (RTSP message and the message body); the new code will rely on the Content-Length information to receive the bytes for the message body. #minor-release PiperOrigin-RevId: 377475565 --- .../source/rtsp/RtspMessageChannel.java | 166 ++++++++++++------ .../source/rtsp/RtspMessageUtil.java | 16 +- .../source/rtsp/SessionDescriptionParser.java | 4 +- .../source/rtsp/RtspMessageChannelTest.java | 26 ++- .../source/rtsp/RtspMessageUtilTest.java | 14 ++ 5 files changed, 162 insertions(+), 64 deletions(-) diff --git a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspMessageChannel.java b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspMessageChannel.java index ab8d7760337..d45af667dcb 100644 --- a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspMessageChannel.java +++ b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspMessageChannel.java @@ -17,6 +17,7 @@ import static com.google.android.exoplayer2.source.rtsp.RtspMessageUtil.isRtspStartLine; import static com.google.android.exoplayer2.util.Assertions.checkArgument; +import static com.google.android.exoplayer2.util.Assertions.checkState; import static com.google.android.exoplayer2.util.Assertions.checkStateNotNull; import android.os.Handler; @@ -31,6 +32,7 @@ import com.google.common.base.Ascii; import com.google.common.base.Charsets; import com.google.common.collect.ImmutableList; +import com.google.common.primitives.Ints; import java.io.ByteArrayOutputStream; import java.io.Closeable; import java.io.DataInputStream; @@ -251,10 +253,10 @@ public void close() { private final class Receiver implements Loadable { /** ASCII dollar encapsulates the RTP packets in interleaved mode (RFC2326 Section 10.12). */ - private static final byte RTSP_INTERLEAVED_MESSAGE_MARKER = '$'; + private static final byte INTERLEAVED_MESSAGE_MARKER = '$'; private final DataInputStream dataInputStream; - private final RtspMessageBuilder messageBuilder; + private final MessageParser messageParser; private volatile boolean loadCanceled; /** @@ -266,7 +268,7 @@ private final class Receiver implements Loadable { */ public Receiver(InputStream inputStream) { dataInputStream = new DataInputStream(inputStream); - messageBuilder = new RtspMessageBuilder(); + messageParser = new MessageParser(); } @Override @@ -279,7 +281,7 @@ public void load() throws IOException { while (!loadCanceled) { // TODO(internal b/172331505) Use a buffered read. byte firstByte = dataInputStream.readByte(); - if (firstByte == RTSP_INTERLEAVED_MESSAGE_MARKER) { + if (firstByte == INTERLEAVED_MESSAGE_MARKER) { handleInterleavedBinaryData(); } else { handleRtspMessage(firstByte); @@ -289,36 +291,11 @@ public void load() throws IOException { /** Handles an entire RTSP message. */ private void handleRtspMessage(byte firstByte) throws IOException { - @Nullable - ImmutableList messageLines = messageBuilder.addLine(handleRtspMessageLine(firstByte)); - while (messageLines == null) { - messageLines = messageBuilder.addLine(handleRtspMessageLine(dataInputStream.readByte())); - } - if (!closed) { - messageListener.onRtspMessageReceived(messageLines); + messageListener.onRtspMessageReceived(messageParser.parseNext(firstByte, dataInputStream)); } } - /** Returns the byte representation of a complete RTSP line, with CRLF line terminator. */ - private byte[] handleRtspMessageLine(byte firstByte) throws IOException { - ByteArrayOutputStream messageByteStream = new ByteArrayOutputStream(); - - byte[] peekedBytes = new byte[2]; - peekedBytes[0] = firstByte; - peekedBytes[1] = dataInputStream.readByte(); - messageByteStream.write(peekedBytes); - - while (peekedBytes[0] != Ascii.CR || peekedBytes[1] != Ascii.LF) { - // Shift the CRLF buffer. - peekedBytes[0] = peekedBytes[1]; - peekedBytes[1] = dataInputStream.readByte(); - messageByteStream.write(peekedBytes[1]); - } - - return messageByteStream.toByteArray(); - } - private void handleInterleavedBinaryData() throws IOException { int channel = dataInputStream.readUnsignedByte(); int size = dataInputStream.readUnsignedShort(); @@ -354,38 +331,91 @@ public LoadErrorAction onLoadError( return Loader.DONT_RETRY; } } + /** Processes RTSP messages line-by-line. */ - private static final class RtspMessageBuilder { + private static final class MessageParser { - @IntDef({STATE_READING_FIRST_LINE, STATE_READING_RTSP_HEADER, STATE_READING_RTSP_BODY}) + @IntDef({STATE_READING_FIRST_LINE, STATE_READING_HEADER, STATE_READING_BODY}) @interface ReadingState {} private static final int STATE_READING_FIRST_LINE = 1; - private static final int STATE_READING_RTSP_HEADER = 2; - private static final int STATE_READING_RTSP_BODY = 3; + private static final int STATE_READING_HEADER = 2; + private static final int STATE_READING_BODY = 3; private final List messageLines; @ReadingState private int state; private long messageBodyLength; - private long receivedMessageBodyLength; /** Creates a new instance. */ - public RtspMessageBuilder() { + public MessageParser() { messageLines = new ArrayList<>(); state = STATE_READING_FIRST_LINE; } /** - * Add a line to the builder. + * Receives and parses an entire RTSP message. * - * @param lineBytes The complete RTSP message line in UTF-8 byte array, including CRLF. - * @return A list of completed RTSP message lines, without the CRLF line terminators; or {@code - * null} if the message is not yet complete. + * @param firstByte The first byte received for the RTSP message. + * @param dataInputStream The {@link DataInputStream} on which RTSP messages are received. + * @return An {@link ImmutableList} of the lines that make up an RTSP message. + */ + public ImmutableList parseNext(byte firstByte, DataInputStream dataInputStream) + throws IOException { + @Nullable + ImmutableList parsedMessageLines = + addMessageLine(parseNextLine(firstByte, dataInputStream)); + + while (parsedMessageLines == null) { + if (state == STATE_READING_BODY) { + if (messageBodyLength > 0) { + // Message body's format is not regulated under RTSP, so it could use LF (instead of + // RTSP's CRLF) as line ending. The length of the message body is included in the RTSP + // Content-Length header. + // Assume the message body length is within a 32-bit integer. + int messageBodyLengthInt = Ints.checkedCast(messageBodyLength); + checkState(messageBodyLengthInt != C.LENGTH_UNSET); + byte[] messageBodyBytes = new byte[messageBodyLengthInt]; + dataInputStream.readFully(messageBodyBytes, /* off= */ 0, messageBodyLengthInt); + parsedMessageLines = addMessageBody(messageBodyBytes); + } else { + throw new IllegalStateException("Expects a greater than zero Content-Length."); + } + } else { + parsedMessageLines = + addMessageLine(parseNextLine(dataInputStream.readByte(), dataInputStream)); + } + } + return parsedMessageLines; + } + + /** Returns the byte representation of a complete RTSP line, with CRLF line terminator. */ + private static byte[] parseNextLine(byte firstByte, DataInputStream dataInputStream) + throws IOException { + ByteArrayOutputStream messageByteStream = new ByteArrayOutputStream(); + + byte[] peekedBytes = new byte[2]; + peekedBytes[0] = firstByte; + peekedBytes[1] = dataInputStream.readByte(); + messageByteStream.write(peekedBytes); + + while (peekedBytes[0] != Ascii.CR || peekedBytes[1] != Ascii.LF) { + // Shift the CRLF buffer. + peekedBytes[0] = peekedBytes[1]; + peekedBytes[1] = dataInputStream.readByte(); + messageByteStream.write(peekedBytes[1]); + } + + return messageByteStream.toByteArray(); + } + + /** + * Returns a list of completed RTSP message lines, without the CRLF line terminators; or {@code + * null} if the message is not yet complete. */ @Nullable - public ImmutableList addLine(byte[] lineBytes) throws ParserException { - // Trim CRLF. + private ImmutableList addMessageLine(byte[] lineBytes) throws ParserException { + // Trim CRLF. RTSP lists are terminated by a CRLF. checkArgument( lineBytes.length >= 2 && lineBytes[lineBytes.length - 2] == Ascii.CR @@ -397,11 +427,11 @@ public ImmutableList addLine(byte[] lineBytes) throws ParserException { switch (state) { case STATE_READING_FIRST_LINE: if (isRtspStartLine(line)) { - state = STATE_READING_RTSP_HEADER; + state = STATE_READING_HEADER; } break; - case STATE_READING_RTSP_HEADER: + case STATE_READING_HEADER: // Check if the line contains RTSP Content-Length header. long contentLength = RtspMessageUtil.parseContentLengthHeader(line); if (contentLength != C.LENGTH_UNSET) { @@ -411,7 +441,7 @@ public ImmutableList addLine(byte[] lineBytes) throws ParserException { if (line.isEmpty()) { // An empty line signals the end of the header section. if (messageBodyLength > 0) { - state = STATE_READING_RTSP_BODY; + state = STATE_READING_BODY; } else { ImmutableList linesToReturn = ImmutableList.copyOf(messageLines); reset(); @@ -420,14 +450,8 @@ public ImmutableList addLine(byte[] lineBytes) throws ParserException { } break; - case STATE_READING_RTSP_BODY: - receivedMessageBodyLength += lineBytes.length; - if (receivedMessageBodyLength >= messageBodyLength) { - ImmutableList linesToReturn = ImmutableList.copyOf(messageLines); - reset(); - return linesToReturn; - } - break; + case STATE_READING_BODY: + // Message body must be handled by addMessageBody(). default: throw new IllegalStateException(); @@ -435,11 +459,45 @@ public ImmutableList addLine(byte[] lineBytes) throws ParserException { return null; } + /** Returns a list of completed RTSP message lines, without the line terminators. */ + private ImmutableList addMessageBody(byte[] messageBodyBytes) { + checkState(state == STATE_READING_BODY); + + String messageBody; + if (messageBodyBytes.length > 0 + && messageBodyBytes[messageBodyBytes.length - 1] == Ascii.LF) { + if (messageBodyBytes.length > 1 + && messageBodyBytes[messageBodyBytes.length - 2] == Ascii.CR) { + // Line ends with CRLF. + messageBody = + new String( + messageBodyBytes, + /* offset= */ 0, + /* length= */ messageBodyBytes.length - 2, + CHARSET); + } else { + // Line ends with LF. + messageBody = + new String( + messageBodyBytes, + /* offset= */ 0, + /* length= */ messageBodyBytes.length - 1, + CHARSET); + } + } else { + throw new IllegalArgumentException("Message body is empty or does not end with a LF."); + } + + messageLines.add(messageBody); + ImmutableList linesToReturn = ImmutableList.copyOf(messageLines); + reset(); + return linesToReturn; + } + private void reset() { messageLines.clear(); state = STATE_READING_FIRST_LINE; messageBodyLength = 0; - receivedMessageBodyLength = 0; } } } diff --git a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspMessageUtil.java b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspMessageUtil.java index 0127f52f66e..4ef559fdae6 100644 --- a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspMessageUtil.java +++ b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspMessageUtil.java @@ -38,6 +38,7 @@ import com.google.android.exoplayer2.C; import com.google.android.exoplayer2.ParserException; import com.google.android.exoplayer2.util.Util; +import com.google.common.base.Ascii; import com.google.common.base.Joiner; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableListMultimap; @@ -107,6 +108,8 @@ public RtspAuthUserInfo(String username, String password) { Pattern.compile("Basic realm=\"([\\w\\s@.]+)\""); private static final String RTSP_VERSION = "RTSP/1.0"; + private static final String LF = new String(new byte[] {Ascii.LF}); + private static final String CRLF = new String(new byte[] {Ascii.CR, Ascii.LF}); /** * Serializes an {@link RtspRequest} to an {@link ImmutableList} of strings. @@ -167,7 +170,7 @@ public static ImmutableList serializeResponse(RtspResponse response) { * removed. */ public static byte[] convertMessageToByteArray(List message) { - return Joiner.on("\r\n").join(message).getBytes(RtspMessageChannel.CHARSET); + return Joiner.on(CRLF).join(message).getBytes(RtspMessageChannel.CHARSET); } /** Removes the user info from the supplied {@link Uri}. */ @@ -211,7 +214,7 @@ public static byte[] getStringBytes(String s) { /** Returns the corresponding String representation of the {@link RtspRequest.Method} argument. */ public static String toMethodString(@RtspRequest.Method int method) { switch (method) { - case RtspRequest.METHOD_ANNOUNCE: + case METHOD_ANNOUNCE: return "ANNOUNCE"; case METHOD_DESCRIBE: return "DESCRIBE"; @@ -291,7 +294,7 @@ public static RtspResponse parseResponse(List lines) { List headerLines = lines.subList(1, messageBodyOffset); RtspHeaders headers = new RtspHeaders.Builder().addAll(headerLines).build(); - String messageBody = Joiner.on("\r\n").join(lines.subList(messageBodyOffset + 1, lines.size())); + String messageBody = Joiner.on(CRLF).join(lines.subList(messageBodyOffset + 1, lines.size())); return new RtspResponse(statusCode, headers, messageBody); } @@ -314,7 +317,7 @@ public static RtspRequest parseRequest(List lines) { List headerLines = lines.subList(1, messageBodyOffset); RtspHeaders headers = new RtspHeaders.Builder().addAll(headerLines).build(); - String messageBody = Joiner.on("\r\n").join(lines.subList(messageBodyOffset + 1, lines.size())); + String messageBody = Joiner.on(CRLF).join(lines.subList(messageBodyOffset + 1, lines.size())); return new RtspRequest(requestUri, method, headers, messageBody); } @@ -324,6 +327,11 @@ public static boolean isRtspStartLine(String line) { || STATUS_LINE_PATTERN.matcher(line).matches(); } + /** Returns the lines in an RTSP message body split by the line terminator used in body. */ + public static String[] splitRtspMessageBody(String body) { + return Util.split(body, body.contains(CRLF) ? CRLF : LF); + } + /** * Returns the length in bytes if the line contains a Content-Length header, otherwise {@link * C#LENGTH_UNSET}. diff --git a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/SessionDescriptionParser.java b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/SessionDescriptionParser.java index 57c3794c239..8b91cae609e 100644 --- a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/SessionDescriptionParser.java +++ b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/SessionDescriptionParser.java @@ -41,8 +41,6 @@ private static final Pattern MEDIA_DESCRIPTION_PATTERN = Pattern.compile("(\\S+)\\s(\\S+)\\s(\\S+)\\s(\\S+)"); - private static final String CRLF = "\r\n"; - private static final String VERSION_TYPE = "v"; private static final String ORIGIN_TYPE = "o"; private static final String SESSION_TYPE = "s"; @@ -71,7 +69,7 @@ public static SessionDescription parse(String sdpString) throws ParserException @Nullable MediaDescription.Builder mediaDescriptionBuilder = null; // Lines are separated by an CRLF. - for (String line : Util.split(sdpString, CRLF)) { + for (String line : RtspMessageUtil.splitRtspMessageBody(sdpString)) { if ("".equals(line)) { continue; } diff --git a/library/rtsp/src/test/java/com/google/android/exoplayer2/source/rtsp/RtspMessageChannelTest.java b/library/rtsp/src/test/java/com/google/android/exoplayer2/source/rtsp/RtspMessageChannelTest.java index 3d008631f64..a7125d03b1d 100644 --- a/library/rtsp/src/test/java/com/google/android/exoplayer2/source/rtsp/RtspMessageChannelTest.java +++ b/library/rtsp/src/test/java/com/google/android/exoplayer2/source/rtsp/RtspMessageChannelTest.java @@ -68,11 +68,21 @@ public void rtspMessageChannelReceive_threeRtspMessagesAndTwoInterleavedBinary_p .build(), "v=安卓アンドロイド\r\n"); + RtspResponse describeResponse2 = + new RtspResponse( + 200, + new RtspHeaders.Builder() + .add(RtspHeaders.CSEQ, "4") + .add(RtspHeaders.CONTENT_TYPE, "application/sdp") + .add(RtspHeaders.CONTENT_LENGTH, "73") + .build(), + "v=安卓アンドロイド\n" + "o=test 2890844526 2890842807 IN IP4 127.0.0.1\n"); + RtspResponse setupResponse = new RtspResponse( 200, new RtspHeaders.Builder() - .add(RtspHeaders.CSEQ, "3") + .add(RtspHeaders.CSEQ, "5") .add(RtspHeaders.TRANSPORT, "RTP/AVP/TCP;unicast;interleaved=0-1") .build(), ""); @@ -99,6 +109,8 @@ public void rtspMessageChannelReceive_threeRtspMessagesAndTwoInterleavedBinary_p convertMessageToByteArray(serializeResponse(optionsResponse))); serverOutputStream.write( convertMessageToByteArray(serializeResponse(describeResponse))); + serverOutputStream.write( + convertMessageToByteArray(serializeResponse(describeResponse2))); serverOutputStream.write(Bytes.concat(new byte[] {'$'}, interleavedData1)); serverOutputStream.write(Bytes.concat(new byte[] {'$'}, interleavedData2)); serverOutputStream.write( @@ -120,7 +132,7 @@ public void rtspMessageChannelReceive_threeRtspMessagesAndTwoInterleavedBinary_p new RtspMessageChannel( message -> { receivedRtspResponses.add(message); - if (receivedRtspResponses.size() == 3 && receivedInterleavedData.size() == 2) { + if (receivedRtspResponses.size() == 4 && receivedInterleavedData.size() == 2) { receivingFinished.set(true); } }); @@ -150,9 +162,17 @@ public void rtspMessageChannelReceive_threeRtspMessagesAndTwoInterleavedBinary_p "content-length: 28", "", "v=安卓アンドロイド"), + /* describeResponse2 */ + ImmutableList.of( + "RTSP/1.0 200 OK", + "cseq: 4", + "content-type: application/sdp", + "content-length: 73", + "", + "v=安卓アンドロイド\n" + "o=test 2890844526 2890842807 IN IP4 127.0.0.1"), /* setupResponse */ ImmutableList.of( - "RTSP/1.0 200 OK", "cseq: 3", "transport: RTP/AVP/TCP;unicast;interleaved=0-1", "")) + "RTSP/1.0 200 OK", "cseq: 5", "transport: RTP/AVP/TCP;unicast;interleaved=0-1", "")) .inOrder(); assertThat(receivedInterleavedData) .containsExactly( diff --git a/library/rtsp/src/test/java/com/google/android/exoplayer2/source/rtsp/RtspMessageUtilTest.java b/library/rtsp/src/test/java/com/google/android/exoplayer2/source/rtsp/RtspMessageUtilTest.java index 85fcc1aa5dc..3454ea3b83e 100644 --- a/library/rtsp/src/test/java/com/google/android/exoplayer2/source/rtsp/RtspMessageUtilTest.java +++ b/library/rtsp/src/test/java/com/google/android/exoplayer2/source/rtsp/RtspMessageUtilTest.java @@ -482,4 +482,18 @@ public void parseWWWAuthenticateHeader_withDigestAuthentication_succeeds() throw assertThat(authenticationInfo.realm).isEqualTo("LIVE555 Streaming Media"); assertThat(authenticationInfo.opaque).isEmpty(); } + + @Test + public void splitRtspMessageBody_withCrLfLineTerminatorMessageBody_splitsMessageBody() { + String[] lines = RtspMessageUtil.splitRtspMessageBody("line1\r\nline2\r\nline3"); + + assertThat(lines).asList().containsExactly("line1", "line2", "line3").inOrder(); + } + + @Test + public void splitRtspMessageBody_withLfLineTerminatorMessageBody_splitsMessageBody() { + String[] lines = RtspMessageUtil.splitRtspMessageBody("line1\nline2\nline3"); + + assertThat(lines).asList().containsExactly("line1", "line2", "line3").inOrder(); + } }