Skip to content

Commit

Permalink
Allow reading RTSP message body by Content-Length.
Browse files Browse the repository at this point in the history
Related to Issue: #8941.

RTSP message body's format is not regulated by the RTSP spec, meaning it can
use either CRLF or LF as its line terminator. The old code assumes every line
ends with CRLF (RTSP message and the message body); the new code will rely on
the Content-Length information to receive the bytes for the message body.

#minor-release

PiperOrigin-RevId: 377475565
  • Loading branch information
claincly authored and marcbaechinger committed Jun 8, 2021
1 parent b56b769 commit 69f9e23
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 64 deletions.
Expand Up @@ -17,6 +17,7 @@

import static com.google.android.exoplayer2.source.rtsp.RtspMessageUtil.isRtspStartLine;
import static com.google.android.exoplayer2.util.Assertions.checkArgument;
import static com.google.android.exoplayer2.util.Assertions.checkState;
import static com.google.android.exoplayer2.util.Assertions.checkStateNotNull;

import android.os.Handler;
Expand All @@ -31,6 +32,7 @@
import com.google.common.base.Ascii;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Ints;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
Expand Down Expand Up @@ -251,10 +253,10 @@ public void close() {
private final class Receiver implements Loadable {

/** ASCII dollar encapsulates the RTP packets in interleaved mode (RFC2326 Section 10.12). */
private static final byte RTSP_INTERLEAVED_MESSAGE_MARKER = '$';
private static final byte INTERLEAVED_MESSAGE_MARKER = '$';

private final DataInputStream dataInputStream;
private final RtspMessageBuilder messageBuilder;
private final MessageParser messageParser;
private volatile boolean loadCanceled;

/**
Expand All @@ -266,7 +268,7 @@ private final class Receiver implements Loadable {
*/
public Receiver(InputStream inputStream) {
dataInputStream = new DataInputStream(inputStream);
messageBuilder = new RtspMessageBuilder();
messageParser = new MessageParser();
}

@Override
Expand All @@ -279,7 +281,7 @@ public void load() throws IOException {
while (!loadCanceled) {
// TODO(internal b/172331505) Use a buffered read.
byte firstByte = dataInputStream.readByte();
if (firstByte == RTSP_INTERLEAVED_MESSAGE_MARKER) {
if (firstByte == INTERLEAVED_MESSAGE_MARKER) {
handleInterleavedBinaryData();
} else {
handleRtspMessage(firstByte);
Expand All @@ -289,36 +291,11 @@ public void load() throws IOException {

/** Handles an entire RTSP message. */
private void handleRtspMessage(byte firstByte) throws IOException {
@Nullable
ImmutableList<String> messageLines = messageBuilder.addLine(handleRtspMessageLine(firstByte));
while (messageLines == null) {
messageLines = messageBuilder.addLine(handleRtspMessageLine(dataInputStream.readByte()));
}

if (!closed) {
messageListener.onRtspMessageReceived(messageLines);
messageListener.onRtspMessageReceived(messageParser.parseNext(firstByte, dataInputStream));
}
}

/** Returns the byte representation of a complete RTSP line, with CRLF line terminator. */
private byte[] handleRtspMessageLine(byte firstByte) throws IOException {
ByteArrayOutputStream messageByteStream = new ByteArrayOutputStream();

byte[] peekedBytes = new byte[2];
peekedBytes[0] = firstByte;
peekedBytes[1] = dataInputStream.readByte();
messageByteStream.write(peekedBytes);

while (peekedBytes[0] != Ascii.CR || peekedBytes[1] != Ascii.LF) {
// Shift the CRLF buffer.
peekedBytes[0] = peekedBytes[1];
peekedBytes[1] = dataInputStream.readByte();
messageByteStream.write(peekedBytes[1]);
}

return messageByteStream.toByteArray();
}

private void handleInterleavedBinaryData() throws IOException {
int channel = dataInputStream.readUnsignedByte();
int size = dataInputStream.readUnsignedShort();
Expand Down Expand Up @@ -354,38 +331,91 @@ public LoadErrorAction onLoadError(
return Loader.DONT_RETRY;
}
}

/** Processes RTSP messages line-by-line. */
private static final class RtspMessageBuilder {
private static final class MessageParser {

@IntDef({STATE_READING_FIRST_LINE, STATE_READING_RTSP_HEADER, STATE_READING_RTSP_BODY})
@IntDef({STATE_READING_FIRST_LINE, STATE_READING_HEADER, STATE_READING_BODY})
@interface ReadingState {}

private static final int STATE_READING_FIRST_LINE = 1;
private static final int STATE_READING_RTSP_HEADER = 2;
private static final int STATE_READING_RTSP_BODY = 3;
private static final int STATE_READING_HEADER = 2;
private static final int STATE_READING_BODY = 3;

private final List<String> messageLines;

@ReadingState private int state;
private long messageBodyLength;
private long receivedMessageBodyLength;

/** Creates a new instance. */
public RtspMessageBuilder() {
public MessageParser() {
messageLines = new ArrayList<>();
state = STATE_READING_FIRST_LINE;
}

/**
* Add a line to the builder.
* Receives and parses an entire RTSP message.
*
* @param lineBytes The complete RTSP message line in UTF-8 byte array, including CRLF.
* @return A list of completed RTSP message lines, without the CRLF line terminators; or {@code
* null} if the message is not yet complete.
* @param firstByte The first byte received for the RTSP message.
* @param dataInputStream The {@link DataInputStream} on which RTSP messages are received.
* @return An {@link ImmutableList} of the lines that make up an RTSP message.
*/
public ImmutableList<String> parseNext(byte firstByte, DataInputStream dataInputStream)
throws IOException {
@Nullable
ImmutableList<String> parsedMessageLines =
addMessageLine(parseNextLine(firstByte, dataInputStream));

while (parsedMessageLines == null) {
if (state == STATE_READING_BODY) {
if (messageBodyLength > 0) {
// Message body's format is not regulated under RTSP, so it could use LF (instead of
// RTSP's CRLF) as line ending. The length of the message body is included in the RTSP
// Content-Length header.
// Assume the message body length is within a 32-bit integer.
int messageBodyLengthInt = Ints.checkedCast(messageBodyLength);
checkState(messageBodyLengthInt != C.LENGTH_UNSET);
byte[] messageBodyBytes = new byte[messageBodyLengthInt];
dataInputStream.readFully(messageBodyBytes, /* off= */ 0, messageBodyLengthInt);
parsedMessageLines = addMessageBody(messageBodyBytes);
} else {
throw new IllegalStateException("Expects a greater than zero Content-Length.");
}
} else {
parsedMessageLines =
addMessageLine(parseNextLine(dataInputStream.readByte(), dataInputStream));
}
}
return parsedMessageLines;
}

/** Returns the byte representation of a complete RTSP line, with CRLF line terminator. */
private static byte[] parseNextLine(byte firstByte, DataInputStream dataInputStream)
throws IOException {
ByteArrayOutputStream messageByteStream = new ByteArrayOutputStream();

byte[] peekedBytes = new byte[2];
peekedBytes[0] = firstByte;
peekedBytes[1] = dataInputStream.readByte();
messageByteStream.write(peekedBytes);

while (peekedBytes[0] != Ascii.CR || peekedBytes[1] != Ascii.LF) {
// Shift the CRLF buffer.
peekedBytes[0] = peekedBytes[1];
peekedBytes[1] = dataInputStream.readByte();
messageByteStream.write(peekedBytes[1]);
}

return messageByteStream.toByteArray();
}

/**
* Returns a list of completed RTSP message lines, without the CRLF line terminators; or {@code
* null} if the message is not yet complete.
*/
@Nullable
public ImmutableList<String> addLine(byte[] lineBytes) throws ParserException {
// Trim CRLF.
private ImmutableList<String> addMessageLine(byte[] lineBytes) throws ParserException {
// Trim CRLF. RTSP lists are terminated by a CRLF.
checkArgument(
lineBytes.length >= 2
&& lineBytes[lineBytes.length - 2] == Ascii.CR
Expand All @@ -397,11 +427,11 @@ public ImmutableList<String> addLine(byte[] lineBytes) throws ParserException {
switch (state) {
case STATE_READING_FIRST_LINE:
if (isRtspStartLine(line)) {
state = STATE_READING_RTSP_HEADER;
state = STATE_READING_HEADER;
}
break;

case STATE_READING_RTSP_HEADER:
case STATE_READING_HEADER:
// Check if the line contains RTSP Content-Length header.
long contentLength = RtspMessageUtil.parseContentLengthHeader(line);
if (contentLength != C.LENGTH_UNSET) {
Expand All @@ -411,7 +441,7 @@ public ImmutableList<String> addLine(byte[] lineBytes) throws ParserException {
if (line.isEmpty()) {
// An empty line signals the end of the header section.
if (messageBodyLength > 0) {
state = STATE_READING_RTSP_BODY;
state = STATE_READING_BODY;
} else {
ImmutableList<String> linesToReturn = ImmutableList.copyOf(messageLines);
reset();
Expand All @@ -420,26 +450,54 @@ public ImmutableList<String> addLine(byte[] lineBytes) throws ParserException {
}
break;

case STATE_READING_RTSP_BODY:
receivedMessageBodyLength += lineBytes.length;
if (receivedMessageBodyLength >= messageBodyLength) {
ImmutableList<String> linesToReturn = ImmutableList.copyOf(messageLines);
reset();
return linesToReturn;
}
break;
case STATE_READING_BODY:
// Message body must be handled by addMessageBody().

default:
throw new IllegalStateException();
}
return null;
}

/** Returns a list of completed RTSP message lines, without the line terminators. */
private ImmutableList<String> addMessageBody(byte[] messageBodyBytes) {
checkState(state == STATE_READING_BODY);

String messageBody;
if (messageBodyBytes.length > 0
&& messageBodyBytes[messageBodyBytes.length - 1] == Ascii.LF) {
if (messageBodyBytes.length > 1
&& messageBodyBytes[messageBodyBytes.length - 2] == Ascii.CR) {
// Line ends with CRLF.
messageBody =
new String(
messageBodyBytes,
/* offset= */ 0,
/* length= */ messageBodyBytes.length - 2,
CHARSET);
} else {
// Line ends with LF.
messageBody =
new String(
messageBodyBytes,
/* offset= */ 0,
/* length= */ messageBodyBytes.length - 1,
CHARSET);
}
} else {
throw new IllegalArgumentException("Message body is empty or does not end with a LF.");
}

messageLines.add(messageBody);
ImmutableList<String> linesToReturn = ImmutableList.copyOf(messageLines);
reset();
return linesToReturn;
}

private void reset() {
messageLines.clear();
state = STATE_READING_FIRST_LINE;
messageBodyLength = 0;
receivedMessageBodyLength = 0;
}
}
}
Expand Up @@ -38,6 +38,7 @@
import com.google.android.exoplayer2.C;
import com.google.android.exoplayer2.ParserException;
import com.google.android.exoplayer2.util.Util;
import com.google.common.base.Ascii;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
Expand Down Expand Up @@ -107,6 +108,8 @@ public RtspAuthUserInfo(String username, String password) {
Pattern.compile("Basic realm=\"([\\w\\s@.]+)\"");

private static final String RTSP_VERSION = "RTSP/1.0";
private static final String LF = new String(new byte[] {Ascii.LF});
private static final String CRLF = new String(new byte[] {Ascii.CR, Ascii.LF});

/**
* Serializes an {@link RtspRequest} to an {@link ImmutableList} of strings.
Expand Down Expand Up @@ -167,7 +170,7 @@ public static ImmutableList<String> serializeResponse(RtspResponse response) {
* removed.
*/
public static byte[] convertMessageToByteArray(List<String> message) {
return Joiner.on("\r\n").join(message).getBytes(RtspMessageChannel.CHARSET);
return Joiner.on(CRLF).join(message).getBytes(RtspMessageChannel.CHARSET);
}

/** Removes the user info from the supplied {@link Uri}. */
Expand Down Expand Up @@ -211,7 +214,7 @@ public static byte[] getStringBytes(String s) {
/** Returns the corresponding String representation of the {@link RtspRequest.Method} argument. */
public static String toMethodString(@RtspRequest.Method int method) {
switch (method) {
case RtspRequest.METHOD_ANNOUNCE:
case METHOD_ANNOUNCE:
return "ANNOUNCE";
case METHOD_DESCRIBE:
return "DESCRIBE";
Expand Down Expand Up @@ -291,7 +294,7 @@ public static RtspResponse parseResponse(List<String> lines) {
List<String> headerLines = lines.subList(1, messageBodyOffset);
RtspHeaders headers = new RtspHeaders.Builder().addAll(headerLines).build();

String messageBody = Joiner.on("\r\n").join(lines.subList(messageBodyOffset + 1, lines.size()));
String messageBody = Joiner.on(CRLF).join(lines.subList(messageBodyOffset + 1, lines.size()));
return new RtspResponse(statusCode, headers, messageBody);
}

Expand All @@ -314,7 +317,7 @@ public static RtspRequest parseRequest(List<String> lines) {
List<String> headerLines = lines.subList(1, messageBodyOffset);
RtspHeaders headers = new RtspHeaders.Builder().addAll(headerLines).build();

String messageBody = Joiner.on("\r\n").join(lines.subList(messageBodyOffset + 1, lines.size()));
String messageBody = Joiner.on(CRLF).join(lines.subList(messageBodyOffset + 1, lines.size()));
return new RtspRequest(requestUri, method, headers, messageBody);
}

Expand All @@ -324,6 +327,11 @@ public static boolean isRtspStartLine(String line) {
|| STATUS_LINE_PATTERN.matcher(line).matches();
}

/** Returns the lines in an RTSP message body split by the line terminator used in body. */
public static String[] splitRtspMessageBody(String body) {
return Util.split(body, body.contains(CRLF) ? CRLF : LF);
}

/**
* Returns the length in bytes if the line contains a Content-Length header, otherwise {@link
* C#LENGTH_UNSET}.
Expand Down
Expand Up @@ -41,8 +41,6 @@
private static final Pattern MEDIA_DESCRIPTION_PATTERN =
Pattern.compile("(\\S+)\\s(\\S+)\\s(\\S+)\\s(\\S+)");

private static final String CRLF = "\r\n";

private static final String VERSION_TYPE = "v";
private static final String ORIGIN_TYPE = "o";
private static final String SESSION_TYPE = "s";
Expand Down Expand Up @@ -71,7 +69,7 @@ public static SessionDescription parse(String sdpString) throws ParserException
@Nullable MediaDescription.Builder mediaDescriptionBuilder = null;

// Lines are separated by an CRLF.
for (String line : Util.split(sdpString, CRLF)) {
for (String line : RtspMessageUtil.splitRtspMessageBody(sdpString)) {
if ("".equals(line)) {
continue;
}
Expand Down

0 comments on commit 69f9e23

Please sign in to comment.