diff --git a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspMessageChannel.java b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspMessageChannel.java index ab8d7760337..d45af667dcb 100644 --- a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspMessageChannel.java +++ b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspMessageChannel.java @@ -17,6 +17,7 @@ import static com.google.android.exoplayer2.source.rtsp.RtspMessageUtil.isRtspStartLine; import static com.google.android.exoplayer2.util.Assertions.checkArgument; +import static com.google.android.exoplayer2.util.Assertions.checkState; import static com.google.android.exoplayer2.util.Assertions.checkStateNotNull; import android.os.Handler; @@ -31,6 +32,7 @@ import com.google.common.base.Ascii; import com.google.common.base.Charsets; import com.google.common.collect.ImmutableList; +import com.google.common.primitives.Ints; import java.io.ByteArrayOutputStream; import java.io.Closeable; import java.io.DataInputStream; @@ -251,10 +253,10 @@ public void close() { private final class Receiver implements Loadable { /** ASCII dollar encapsulates the RTP packets in interleaved mode (RFC2326 Section 10.12). */ - private static final byte RTSP_INTERLEAVED_MESSAGE_MARKER = '$'; + private static final byte INTERLEAVED_MESSAGE_MARKER = '$'; private final DataInputStream dataInputStream; - private final RtspMessageBuilder messageBuilder; + private final MessageParser messageParser; private volatile boolean loadCanceled; /** @@ -266,7 +268,7 @@ private final class Receiver implements Loadable { */ public Receiver(InputStream inputStream) { dataInputStream = new DataInputStream(inputStream); - messageBuilder = new RtspMessageBuilder(); + messageParser = new MessageParser(); } @Override @@ -279,7 +281,7 @@ public void load() throws IOException { while (!loadCanceled) { // TODO(internal b/172331505) Use a buffered read. byte firstByte = dataInputStream.readByte(); - if (firstByte == RTSP_INTERLEAVED_MESSAGE_MARKER) { + if (firstByte == INTERLEAVED_MESSAGE_MARKER) { handleInterleavedBinaryData(); } else { handleRtspMessage(firstByte); @@ -289,36 +291,11 @@ public void load() throws IOException { /** Handles an entire RTSP message. */ private void handleRtspMessage(byte firstByte) throws IOException { - @Nullable - ImmutableList messageLines = messageBuilder.addLine(handleRtspMessageLine(firstByte)); - while (messageLines == null) { - messageLines = messageBuilder.addLine(handleRtspMessageLine(dataInputStream.readByte())); - } - if (!closed) { - messageListener.onRtspMessageReceived(messageLines); + messageListener.onRtspMessageReceived(messageParser.parseNext(firstByte, dataInputStream)); } } - /** Returns the byte representation of a complete RTSP line, with CRLF line terminator. */ - private byte[] handleRtspMessageLine(byte firstByte) throws IOException { - ByteArrayOutputStream messageByteStream = new ByteArrayOutputStream(); - - byte[] peekedBytes = new byte[2]; - peekedBytes[0] = firstByte; - peekedBytes[1] = dataInputStream.readByte(); - messageByteStream.write(peekedBytes); - - while (peekedBytes[0] != Ascii.CR || peekedBytes[1] != Ascii.LF) { - // Shift the CRLF buffer. - peekedBytes[0] = peekedBytes[1]; - peekedBytes[1] = dataInputStream.readByte(); - messageByteStream.write(peekedBytes[1]); - } - - return messageByteStream.toByteArray(); - } - private void handleInterleavedBinaryData() throws IOException { int channel = dataInputStream.readUnsignedByte(); int size = dataInputStream.readUnsignedShort(); @@ -354,38 +331,91 @@ public LoadErrorAction onLoadError( return Loader.DONT_RETRY; } } + /** Processes RTSP messages line-by-line. */ - private static final class RtspMessageBuilder { + private static final class MessageParser { - @IntDef({STATE_READING_FIRST_LINE, STATE_READING_RTSP_HEADER, STATE_READING_RTSP_BODY}) + @IntDef({STATE_READING_FIRST_LINE, STATE_READING_HEADER, STATE_READING_BODY}) @interface ReadingState {} private static final int STATE_READING_FIRST_LINE = 1; - private static final int STATE_READING_RTSP_HEADER = 2; - private static final int STATE_READING_RTSP_BODY = 3; + private static final int STATE_READING_HEADER = 2; + private static final int STATE_READING_BODY = 3; private final List messageLines; @ReadingState private int state; private long messageBodyLength; - private long receivedMessageBodyLength; /** Creates a new instance. */ - public RtspMessageBuilder() { + public MessageParser() { messageLines = new ArrayList<>(); state = STATE_READING_FIRST_LINE; } /** - * Add a line to the builder. + * Receives and parses an entire RTSP message. * - * @param lineBytes The complete RTSP message line in UTF-8 byte array, including CRLF. - * @return A list of completed RTSP message lines, without the CRLF line terminators; or {@code - * null} if the message is not yet complete. + * @param firstByte The first byte received for the RTSP message. + * @param dataInputStream The {@link DataInputStream} on which RTSP messages are received. + * @return An {@link ImmutableList} of the lines that make up an RTSP message. + */ + public ImmutableList parseNext(byte firstByte, DataInputStream dataInputStream) + throws IOException { + @Nullable + ImmutableList parsedMessageLines = + addMessageLine(parseNextLine(firstByte, dataInputStream)); + + while (parsedMessageLines == null) { + if (state == STATE_READING_BODY) { + if (messageBodyLength > 0) { + // Message body's format is not regulated under RTSP, so it could use LF (instead of + // RTSP's CRLF) as line ending. The length of the message body is included in the RTSP + // Content-Length header. + // Assume the message body length is within a 32-bit integer. + int messageBodyLengthInt = Ints.checkedCast(messageBodyLength); + checkState(messageBodyLengthInt != C.LENGTH_UNSET); + byte[] messageBodyBytes = new byte[messageBodyLengthInt]; + dataInputStream.readFully(messageBodyBytes, /* off= */ 0, messageBodyLengthInt); + parsedMessageLines = addMessageBody(messageBodyBytes); + } else { + throw new IllegalStateException("Expects a greater than zero Content-Length."); + } + } else { + parsedMessageLines = + addMessageLine(parseNextLine(dataInputStream.readByte(), dataInputStream)); + } + } + return parsedMessageLines; + } + + /** Returns the byte representation of a complete RTSP line, with CRLF line terminator. */ + private static byte[] parseNextLine(byte firstByte, DataInputStream dataInputStream) + throws IOException { + ByteArrayOutputStream messageByteStream = new ByteArrayOutputStream(); + + byte[] peekedBytes = new byte[2]; + peekedBytes[0] = firstByte; + peekedBytes[1] = dataInputStream.readByte(); + messageByteStream.write(peekedBytes); + + while (peekedBytes[0] != Ascii.CR || peekedBytes[1] != Ascii.LF) { + // Shift the CRLF buffer. + peekedBytes[0] = peekedBytes[1]; + peekedBytes[1] = dataInputStream.readByte(); + messageByteStream.write(peekedBytes[1]); + } + + return messageByteStream.toByteArray(); + } + + /** + * Returns a list of completed RTSP message lines, without the CRLF line terminators; or {@code + * null} if the message is not yet complete. */ @Nullable - public ImmutableList addLine(byte[] lineBytes) throws ParserException { - // Trim CRLF. + private ImmutableList addMessageLine(byte[] lineBytes) throws ParserException { + // Trim CRLF. RTSP lists are terminated by a CRLF. checkArgument( lineBytes.length >= 2 && lineBytes[lineBytes.length - 2] == Ascii.CR @@ -397,11 +427,11 @@ public ImmutableList addLine(byte[] lineBytes) throws ParserException { switch (state) { case STATE_READING_FIRST_LINE: if (isRtspStartLine(line)) { - state = STATE_READING_RTSP_HEADER; + state = STATE_READING_HEADER; } break; - case STATE_READING_RTSP_HEADER: + case STATE_READING_HEADER: // Check if the line contains RTSP Content-Length header. long contentLength = RtspMessageUtil.parseContentLengthHeader(line); if (contentLength != C.LENGTH_UNSET) { @@ -411,7 +441,7 @@ public ImmutableList addLine(byte[] lineBytes) throws ParserException { if (line.isEmpty()) { // An empty line signals the end of the header section. if (messageBodyLength > 0) { - state = STATE_READING_RTSP_BODY; + state = STATE_READING_BODY; } else { ImmutableList linesToReturn = ImmutableList.copyOf(messageLines); reset(); @@ -420,14 +450,8 @@ public ImmutableList addLine(byte[] lineBytes) throws ParserException { } break; - case STATE_READING_RTSP_BODY: - receivedMessageBodyLength += lineBytes.length; - if (receivedMessageBodyLength >= messageBodyLength) { - ImmutableList linesToReturn = ImmutableList.copyOf(messageLines); - reset(); - return linesToReturn; - } - break; + case STATE_READING_BODY: + // Message body must be handled by addMessageBody(). default: throw new IllegalStateException(); @@ -435,11 +459,45 @@ public ImmutableList addLine(byte[] lineBytes) throws ParserException { return null; } + /** Returns a list of completed RTSP message lines, without the line terminators. */ + private ImmutableList addMessageBody(byte[] messageBodyBytes) { + checkState(state == STATE_READING_BODY); + + String messageBody; + if (messageBodyBytes.length > 0 + && messageBodyBytes[messageBodyBytes.length - 1] == Ascii.LF) { + if (messageBodyBytes.length > 1 + && messageBodyBytes[messageBodyBytes.length - 2] == Ascii.CR) { + // Line ends with CRLF. + messageBody = + new String( + messageBodyBytes, + /* offset= */ 0, + /* length= */ messageBodyBytes.length - 2, + CHARSET); + } else { + // Line ends with LF. + messageBody = + new String( + messageBodyBytes, + /* offset= */ 0, + /* length= */ messageBodyBytes.length - 1, + CHARSET); + } + } else { + throw new IllegalArgumentException("Message body is empty or does not end with a LF."); + } + + messageLines.add(messageBody); + ImmutableList linesToReturn = ImmutableList.copyOf(messageLines); + reset(); + return linesToReturn; + } + private void reset() { messageLines.clear(); state = STATE_READING_FIRST_LINE; messageBodyLength = 0; - receivedMessageBodyLength = 0; } } } diff --git a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspMessageUtil.java b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspMessageUtil.java index 0127f52f66e..4ef559fdae6 100644 --- a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspMessageUtil.java +++ b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspMessageUtil.java @@ -38,6 +38,7 @@ import com.google.android.exoplayer2.C; import com.google.android.exoplayer2.ParserException; import com.google.android.exoplayer2.util.Util; +import com.google.common.base.Ascii; import com.google.common.base.Joiner; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableListMultimap; @@ -107,6 +108,8 @@ public RtspAuthUserInfo(String username, String password) { Pattern.compile("Basic realm=\"([\\w\\s@.]+)\""); private static final String RTSP_VERSION = "RTSP/1.0"; + private static final String LF = new String(new byte[] {Ascii.LF}); + private static final String CRLF = new String(new byte[] {Ascii.CR, Ascii.LF}); /** * Serializes an {@link RtspRequest} to an {@link ImmutableList} of strings. @@ -167,7 +170,7 @@ public static ImmutableList serializeResponse(RtspResponse response) { * removed. */ public static byte[] convertMessageToByteArray(List message) { - return Joiner.on("\r\n").join(message).getBytes(RtspMessageChannel.CHARSET); + return Joiner.on(CRLF).join(message).getBytes(RtspMessageChannel.CHARSET); } /** Removes the user info from the supplied {@link Uri}. */ @@ -211,7 +214,7 @@ public static byte[] getStringBytes(String s) { /** Returns the corresponding String representation of the {@link RtspRequest.Method} argument. */ public static String toMethodString(@RtspRequest.Method int method) { switch (method) { - case RtspRequest.METHOD_ANNOUNCE: + case METHOD_ANNOUNCE: return "ANNOUNCE"; case METHOD_DESCRIBE: return "DESCRIBE"; @@ -291,7 +294,7 @@ public static RtspResponse parseResponse(List lines) { List headerLines = lines.subList(1, messageBodyOffset); RtspHeaders headers = new RtspHeaders.Builder().addAll(headerLines).build(); - String messageBody = Joiner.on("\r\n").join(lines.subList(messageBodyOffset + 1, lines.size())); + String messageBody = Joiner.on(CRLF).join(lines.subList(messageBodyOffset + 1, lines.size())); return new RtspResponse(statusCode, headers, messageBody); } @@ -314,7 +317,7 @@ public static RtspRequest parseRequest(List lines) { List headerLines = lines.subList(1, messageBodyOffset); RtspHeaders headers = new RtspHeaders.Builder().addAll(headerLines).build(); - String messageBody = Joiner.on("\r\n").join(lines.subList(messageBodyOffset + 1, lines.size())); + String messageBody = Joiner.on(CRLF).join(lines.subList(messageBodyOffset + 1, lines.size())); return new RtspRequest(requestUri, method, headers, messageBody); } @@ -324,6 +327,11 @@ public static boolean isRtspStartLine(String line) { || STATUS_LINE_PATTERN.matcher(line).matches(); } + /** Returns the lines in an RTSP message body split by the line terminator used in body. */ + public static String[] splitRtspMessageBody(String body) { + return Util.split(body, body.contains(CRLF) ? CRLF : LF); + } + /** * Returns the length in bytes if the line contains a Content-Length header, otherwise {@link * C#LENGTH_UNSET}. diff --git a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/SessionDescriptionParser.java b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/SessionDescriptionParser.java index 57c3794c239..8b91cae609e 100644 --- a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/SessionDescriptionParser.java +++ b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/SessionDescriptionParser.java @@ -41,8 +41,6 @@ private static final Pattern MEDIA_DESCRIPTION_PATTERN = Pattern.compile("(\\S+)\\s(\\S+)\\s(\\S+)\\s(\\S+)"); - private static final String CRLF = "\r\n"; - private static final String VERSION_TYPE = "v"; private static final String ORIGIN_TYPE = "o"; private static final String SESSION_TYPE = "s"; @@ -71,7 +69,7 @@ public static SessionDescription parse(String sdpString) throws ParserException @Nullable MediaDescription.Builder mediaDescriptionBuilder = null; // Lines are separated by an CRLF. - for (String line : Util.split(sdpString, CRLF)) { + for (String line : RtspMessageUtil.splitRtspMessageBody(sdpString)) { if ("".equals(line)) { continue; } diff --git a/library/rtsp/src/test/java/com/google/android/exoplayer2/source/rtsp/RtspMessageChannelTest.java b/library/rtsp/src/test/java/com/google/android/exoplayer2/source/rtsp/RtspMessageChannelTest.java index 3d008631f64..a7125d03b1d 100644 --- a/library/rtsp/src/test/java/com/google/android/exoplayer2/source/rtsp/RtspMessageChannelTest.java +++ b/library/rtsp/src/test/java/com/google/android/exoplayer2/source/rtsp/RtspMessageChannelTest.java @@ -68,11 +68,21 @@ public void rtspMessageChannelReceive_threeRtspMessagesAndTwoInterleavedBinary_p .build(), "v=安卓アンドロイド\r\n"); + RtspResponse describeResponse2 = + new RtspResponse( + 200, + new RtspHeaders.Builder() + .add(RtspHeaders.CSEQ, "4") + .add(RtspHeaders.CONTENT_TYPE, "application/sdp") + .add(RtspHeaders.CONTENT_LENGTH, "73") + .build(), + "v=安卓アンドロイド\n" + "o=test 2890844526 2890842807 IN IP4 127.0.0.1\n"); + RtspResponse setupResponse = new RtspResponse( 200, new RtspHeaders.Builder() - .add(RtspHeaders.CSEQ, "3") + .add(RtspHeaders.CSEQ, "5") .add(RtspHeaders.TRANSPORT, "RTP/AVP/TCP;unicast;interleaved=0-1") .build(), ""); @@ -99,6 +109,8 @@ public void rtspMessageChannelReceive_threeRtspMessagesAndTwoInterleavedBinary_p convertMessageToByteArray(serializeResponse(optionsResponse))); serverOutputStream.write( convertMessageToByteArray(serializeResponse(describeResponse))); + serverOutputStream.write( + convertMessageToByteArray(serializeResponse(describeResponse2))); serverOutputStream.write(Bytes.concat(new byte[] {'$'}, interleavedData1)); serverOutputStream.write(Bytes.concat(new byte[] {'$'}, interleavedData2)); serverOutputStream.write( @@ -120,7 +132,7 @@ public void rtspMessageChannelReceive_threeRtspMessagesAndTwoInterleavedBinary_p new RtspMessageChannel( message -> { receivedRtspResponses.add(message); - if (receivedRtspResponses.size() == 3 && receivedInterleavedData.size() == 2) { + if (receivedRtspResponses.size() == 4 && receivedInterleavedData.size() == 2) { receivingFinished.set(true); } }); @@ -150,9 +162,17 @@ public void rtspMessageChannelReceive_threeRtspMessagesAndTwoInterleavedBinary_p "content-length: 28", "", "v=安卓アンドロイド"), + /* describeResponse2 */ + ImmutableList.of( + "RTSP/1.0 200 OK", + "cseq: 4", + "content-type: application/sdp", + "content-length: 73", + "", + "v=安卓アンドロイド\n" + "o=test 2890844526 2890842807 IN IP4 127.0.0.1"), /* setupResponse */ ImmutableList.of( - "RTSP/1.0 200 OK", "cseq: 3", "transport: RTP/AVP/TCP;unicast;interleaved=0-1", "")) + "RTSP/1.0 200 OK", "cseq: 5", "transport: RTP/AVP/TCP;unicast;interleaved=0-1", "")) .inOrder(); assertThat(receivedInterleavedData) .containsExactly( diff --git a/library/rtsp/src/test/java/com/google/android/exoplayer2/source/rtsp/RtspMessageUtilTest.java b/library/rtsp/src/test/java/com/google/android/exoplayer2/source/rtsp/RtspMessageUtilTest.java index 85fcc1aa5dc..3454ea3b83e 100644 --- a/library/rtsp/src/test/java/com/google/android/exoplayer2/source/rtsp/RtspMessageUtilTest.java +++ b/library/rtsp/src/test/java/com/google/android/exoplayer2/source/rtsp/RtspMessageUtilTest.java @@ -482,4 +482,18 @@ public void parseWWWAuthenticateHeader_withDigestAuthentication_succeeds() throw assertThat(authenticationInfo.realm).isEqualTo("LIVE555 Streaming Media"); assertThat(authenticationInfo.opaque).isEmpty(); } + + @Test + public void splitRtspMessageBody_withCrLfLineTerminatorMessageBody_splitsMessageBody() { + String[] lines = RtspMessageUtil.splitRtspMessageBody("line1\r\nline2\r\nline3"); + + assertThat(lines).asList().containsExactly("line1", "line2", "line3").inOrder(); + } + + @Test + public void splitRtspMessageBody_withLfLineTerminatorMessageBody_splitsMessageBody() { + String[] lines = RtspMessageUtil.splitRtspMessageBody("line1\nline2\nline3"); + + assertThat(lines).asList().containsExactly("line1", "line2", "line3").inOrder(); + } }