diff --git a/library/core/src/main/java/com/google/android/exoplayer2/extractor/ts/TsExtractor.java b/library/core/src/main/java/com/google/android/exoplayer2/extractor/ts/TsExtractor.java index 7b63ce8..5f39e2f 100644 --- a/library/core/src/main/java/com/google/android/exoplayer2/extractor/ts/TsExtractor.java +++ b/library/core/src/main/java/com/google/android/exoplayer2/extractor/ts/TsExtractor.java @@ -16,6 +16,7 @@ package com.google.android.exoplayer2.extractor.ts; import android.support.annotation.IntDef; +import android.util.Log; import android.util.SparseArray; import android.util.SparseBooleanArray; import android.util.SparseIntArray; @@ -43,11 +44,13 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; /** * Facilitates the extraction of data from the MPEG-2 TS container format. */ public final class TsExtractor implements Extractor { + private static final String TAG = "TsExtractor"; /** * Factory for {@link TsExtractor} instances. 
@@ -105,7 +108,7 @@ public final class TsExtractor implements Extractor { private static final long E_AC3_FORMAT_IDENTIFIER = Util.getIntegerCodeForString("EAC3"); private static final long HEVC_FORMAT_IDENTIFIER = Util.getIntegerCodeForString("HEVC"); - private static final int BUFFER_PACKET_COUNT = 5; // Should be at least 2 + private static final int BUFFER_PACKET_COUNT = 50; // Should be at least 2 private static final int BUFFER_SIZE = TS_PACKET_SIZE * BUFFER_PACKET_COUNT; @Mode private final int mode; @@ -122,6 +125,9 @@ public final class TsExtractor implements Extractor { private int remainingPmts; private boolean tracksEnded; private TsPayloadReader id3Reader; + private final ArrayBlockingQueue tsPacketBlockingQueue; + private boolean isThreadInited = false; + private Thread parserThread; public TsExtractor() { this(0); @@ -168,6 +174,7 @@ public final class TsExtractor implements Extractor { trackIds = new SparseBooleanArray(); tsPayloadReaders = new SparseArray<>(); continuityCounters = new SparseIntArray(); + tsPacketBlockingQueue = new ArrayBlockingQueue<>(BUFFER_PACKET_COUNT * 100); resetPayloadReaders(); } @@ -217,17 +224,24 @@ public final class TsExtractor implements Extractor { @Override public int read(ExtractorInput input, PositionHolder seekPosition) throws IOException, InterruptedException { + if (!isThreadInited) { + parserThread = new Thread(new TsParser(tsPacketBlockingQueue)); + parserThread.setName("parserThread"); + parserThread.start(); + isThreadInited = true; + } + byte[] data = tsPacketBuffer.data; // Shift bytes to the start of the buffer if there isn't enough space left at the end - if (BUFFER_SIZE - tsPacketBuffer.getPosition() < TS_PACKET_SIZE) { + if (BUFFER_SIZE - tsPacketBuffer.getPosition() < BUFFER_PACKET_COUNT * TS_PACKET_SIZE) { int bytesLeft = tsPacketBuffer.bytesLeft(); if (bytesLeft > 0) { System.arraycopy(data, tsPacketBuffer.getPosition(), data, 0, bytesLeft); } tsPacketBuffer.reset(data, bytesLeft); } - // Read more 
bytes until there is at least one packet size - while (tsPacketBuffer.bytesLeft() < TS_PACKET_SIZE) { + // Read more bytes until there is at least one * BUFFER_PACKET_COUNT packet size + while (tsPacketBuffer.bytesLeft() < BUFFER_PACKET_COUNT * TS_PACKET_SIZE) { int limit = tsPacketBuffer.limit(); int read = input.read(data, limit, BUFFER_SIZE - limit); if (read == C.RESULT_END_OF_INPUT) { @@ -236,20 +250,30 @@ public final class TsExtractor implements Extractor { tsPacketBuffer.setLimit(limit + read); } - // Note: see ISO/IEC 13818-1, section 2.4.3.2 for detailed information on the format of - // the header. final int limit = tsPacketBuffer.limit(); + ParsableByteArray packetGroup = new ParsableByteArray(BUFFER_PACKET_COUNT * TS_PACKET_SIZE); int position = tsPacketBuffer.getPosition(); while (position < limit && data[position] != TS_SYNC_BYTE) { position++; } tsPacketBuffer.setPosition(position); - int endOfPacket = position + TS_PACKET_SIZE; + int endOfPacket = position + BUFFER_PACKET_COUNT * TS_PACKET_SIZE; if (endOfPacket > limit) { return RESULT_CONTINUE; } + System.arraycopy(data, tsPacketBuffer.getPosition(), packetGroup.data, 0, BUFFER_PACKET_COUNT * TS_PACKET_SIZE); + packetGroup.setPosition(0); + packetGroup.setLimit(BUFFER_PACKET_COUNT * TS_PACKET_SIZE); + tsPacketBlockingQueue.put(packetGroup); + tsPacketBuffer.setPosition(endOfPacket); + return RESULT_CONTINUE; + } + + private int easyParse(ParsableByteArray tsPacketBuffer, int endOfPacket) { + // Note: see ISO/IEC 13818-1, section 2.4.3.2 for detailed information on the format of + // the header. 
tsPacketBuffer.skipBytes(1); tsPacketBuffer.readBytes(tsScratch, 3); if (tsScratch.readBit()) { // transport_error_indicator @@ -297,7 +321,7 @@ public final class TsExtractor implements Extractor { tsPacketBuffer.setLimit(endOfPacket); payloadReader.consume(tsPacketBuffer, payloadUnitStartIndicator); Assertions.checkState(tsPacketBuffer.getPosition() <= endOfPacket); - tsPacketBuffer.setLimit(limit); + tsPacketBuffer.setLimit(endOfPacket); } } @@ -558,5 +582,36 @@ public final class TsExtractor implements Extractor { }
+  /**
+   * Consumer side of the packet queue: takes the packet groups that {@code read} puts on the
+   * queue and hands every TS packet they contain to {@code easyParse}.
+   *
+   * <p>The loop only terminates when the executing thread is interrupted while waiting on the
+   * queue.
+   *
+   * <p>NOTE(review): {@code easyParse} runs on the parser thread; whether the extractor state it
+   * touches is also accessed from the thread that calls {@code read} is not visible here and
+   * should be confirmed.
+   */
+  class TsParser implements Runnable {
+
+    private final ArrayBlockingQueue<ParsableByteArray> sharedQueue;
+
+    /**
+     * @param sharedQueue The queue from which packet groups are taken.
+     */
+    public TsParser(ArrayBlockingQueue<ParsableByteArray> sharedQueue) {
+      this.sharedQueue = sharedQueue;
+    }
+
+    @Override
+    public void run() {
+      while (true) {
+        ParsableByteArray packetGroup;
+        try {
+          // Blocks until a packet group is available.
+          packetGroup = sharedQueue.take();
+        } catch (InterruptedException ex) {
+          Log.d(TAG, "Exception caught; interrupting parserThread");
+          // Restore the interrupt flag so that code further up the stack can observe it.
+          Thread.currentThread().interrupt();
+          return;
+        }
+        parsePacketGroup(packetGroup);
+      }
+    }
+
+    /**
+     * Parses every complete TS packet found between offset 0 and the limit of {@code packetGroup}.
+     *
+     * @param packetGroup A buffer holding consecutive TS packets. Its position and limit are
+     *     modified while parsing.
+     */
+    private void parsePacketGroup(ParsableByteArray packetGroup) {
+      // The limit is narrowed to a single packet for each easyParse call, so the end of the whole
+      // group has to be remembered up front.
+      final int groupLimit = packetGroup.limit();
+      int position = 0;
+      while (true) {
+        // Widen the limit again: without this the sync byte search below would be bounded by the
+        // end of the previous packet and could never advance.
+        packetGroup.setLimit(groupLimit);
+        while (position < groupLimit && packetGroup.data[position] != TS_SYNC_BYTE) {
+          position++;
+        }
+        int endOfPacket = position + TS_PACKET_SIZE;
+        if (endOfPacket > groupLimit) {
+          // No complete packet is left in this group.
+          return;
+        }
+        packetGroup.setPosition(position);
+        packetGroup.setLimit(endOfPacket);
+        easyParse(packetGroup, endOfPacket);
+        // Continue after the packet that was just parsed, so that a resynchronisation shifts all
+        // following packets as well.
+        position = endOfPacket;
+      }
+    }
+  }
 }