Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Merge "Change the FragmentWriter queue to be vector based"

  • Loading branch information...
commit 918c4156ea99dadfdae4a5f91c788eec8d5383df 2 parents 1faa812 + 471c0b4
James Dong authored Android (Google) Code Review committed
Showing with 44 additions and 99 deletions.
  1. +44 −99 nodes/pvmp4ffcomposernode/src/pvmp4ffcn_node.cpp
View
143 nodes/pvmp4ffcomposernode/src/pvmp4ffcn_node.cpp
@@ -42,7 +42,6 @@
#ifndef OSCL_MEM_BASIC_FUNCTIONS_H
#include "oscl_mem_basic_functions.h"
#endif
-#include "pvmf_omx_basedec_node.h" // for NUMBER_OUTPUT_BUFFER
#ifdef ANDROID
namespace android
@@ -55,31 +54,28 @@ namespace android
// This class is friend with the composer node it belongs to, in order
// to be able to call the original AddMemFragToTrack which does all
// the work.
-//
-// The queue is implemented using a circular buffer. mBuffer is the
-// start of the array and mEnd points just after the last element in
-// the array.
-// mFirst points to the oldest fragment.
-// mLast points to the next available cell for a new fragment.
-// When the array is empty or full mFirst == mLast.
+
+#define FRAME_QUEUE_DEFAULT_SIZE 20
class FragmentWriter: public Thread
{
public:
FragmentWriter(PVMp4FFComposerNode *composer) :
- Thread(kThreadCallJava), mSize(0), mEnd(mBuffer + kCapacity),
- mFirst(mBuffer), mLast(mBuffer), mComposer(composer),
- mPrevWriteStatus(PVMFSuccess), mTid(NULL), mDropped(0), mExitRequested(false) {}
+ Thread(kThreadCallJava), mComposer(composer),
+ mPrevWriteStatus(PVMFSuccess), mTid(NULL), mExitRequested(false)
+ {
+ mQueue.reserve(FRAME_QUEUE_DEFAULT_SIZE);
+ }
virtual ~FragmentWriter()
{
Mutex::Autolock l(mRequestMutex);
- LOG_ASSERT(0 == mSize, "The queue should be empty by now.");
LOG_ASSERT(mExitRequested, "Deleting an active instance.");
- LOGD_IF(0 < mSize, "Flushing %d frags in dtor", mSize);
- while (0 < mSize) // make sure we are flushed
+ LOGD("Capacity of fragment queue reached %d", mQueue.capacity());
+ LOGD_IF(0 < mQueue.size(), "Flushing %d frags in dtor", mQueue.size());
+ while (0 < mQueue.size()) // make sure we are flushed
{
- decrPendingRequests();
+ releaseQueuedFrame(mQueue.begin());
}
}
@@ -104,10 +100,12 @@ class FragmentWriter: public Thread
while (!done)
{
mRequestMutex.lock();
- done = mSize == 0 || iter > kMaxFlushAttempts;
+ done = mQueue.size() == 0 || iter > kMaxFlushAttempts;
if (!done) mRequestCv.signal();
mRequestMutex.unlock();
- if (!done) usleep(kFlushSleepMicros);
+ if (!done) {
+ usleep(kFlushSleepMicros);
+ }
++iter;
}
LOG_ASSERT(iter <= kMaxFlushAttempts, "Failed to flush");
@@ -123,36 +121,14 @@ class FragmentWriter: public Thread
{
if (mExitRequested) return PVMFErrCancelled;
Mutex::Autolock lock(mRequestMutex);
-
- // When the queue is full, we drop the request. This frees the
- // memory fragment and keeps the system running. Decoders are
- // unhappy when there is no buffer available to write the
- // output.
- // An alternative would be to discard the oldest fragment
- // enqueued to free some space. However that would modify
- // mFirst and require extra locking because the thread maybe
- // in the process of writing it.
- if (mSize == kCapacity)
- {
- ++mDropped;
- LOGW_IF((mDropped % kLogDroppedPeriod) == 0, "Frame %d dropped.", mDropped);
- // TODO: Should we return an error code here?
- return mPrevWriteStatus;
- }
-
- mLast->set(aFrame, aMemFrag, aFormat, aTimestamp, aTrackId, aPort);
- incrPendingRequests();
-
+ Request frame = {aFrame, aMemFrag, aFormat, aTimestamp, aTrackId, aPort};
+ mQueue.push_back(frame);
mRequestCv.signal();
return mPrevWriteStatus;
}
private:
static const bool kThreadCallJava = false;
- static const size_t kLogDroppedPeriod = 10; // Arbitrary.
- // Must match the number of buffers allocated in the decoder.
- static const size_t kCapacity = NUMBER_OUTPUT_BUFFER;
- static const size_t kWarningThreshold = kCapacity * 3 / 4; // Warn at 75%
static const OsclRefCounterMemFrag kEmptyFrag;
// Flush blocks for 2 seconds max.
static const size_t kMaxFlushAttempts = 10;
@@ -160,18 +136,6 @@ class FragmentWriter: public Thread
struct Request
{
- void set(Oscl_Vector<OsclMemoryFragment, OsclMemAllocator> aFrame,
- OsclRefCounterMemFrag& aMemFrag, PVMFFormatType aFormat,
- uint32 aTimestamp, int32 aTrackId, PVMp4FFComposerPort *aPort)
- {
- mFrame = aFrame;
- mFrag = aMemFrag;
- mFormat = aFormat;
- mTimestamp = aTimestamp;
- mTrackId = aTrackId;
- mPort = aPort;
- }
-
Oscl_Vector<OsclMemoryFragment, OsclMemAllocator> mFrame;
OsclRefCounterMemFrag mFrag;
PVMFFormatType mFormat;
@@ -180,23 +144,18 @@ class FragmentWriter: public Thread
PVMp4FFComposerPort *mPort;
};
- void incrPendingRequests()
- {
- ++mLast;
- if (mEnd == mLast) mLast = mBuffer;
- ++mSize;
- }
-
- void decrPendingRequests()
+ void releaseQueuedFrame(Request *frame)
{
- mFirst->mFrame.clear();
+ if (!frame) {
+ LOGE("Frame not valid");
+ return;
+ }
+ frame->mFrame.clear();
// Release the memory fragment tracked using a refcount
// class. Need to assign an empty frag to release the memory
// fragment. We cannot wait for the array to wrap around.
- mFirst->mFrag = kEmptyFrag; // FIXME: This assignement to decr the ref count is ugly.
- ++mFirst;
- if (mEnd == mFirst) mFirst = mBuffer;
- --mSize;
+ frame->mFrag = kEmptyFrag; // FIXME: This assignement to decr the ref count is ugly.
+ mQueue.erase(frame);
}
// Called by the base class Thread.
@@ -204,61 +163,47 @@ class FragmentWriter: public Thread
// @Override Thread
virtual bool threadLoop()
{
+ Request frame;
+ bool addFrame = false;
if (!mTid) mTid = androidGetThreadId();
LOG_ASSERT(androidGetThreadId() == mTid,
"Thread id has changed!: %p != %p", mTid, androidGetThreadId());
+ if (mExitRequested)
+ return false;
- size_t numFrags = 0;
- // Check if there's work to do. Otherwise wait for new fragment.
mRequestMutex.lock();
- numFrags = mSize;
- mRequestMutex.unlock();
-
- bool doneWaiting = numFrags != 0;
- while (!doneWaiting)
- {
- mRequestMutex.lock();
+ if (mQueue.empty())
mRequestCv.wait(mRequestMutex);
- doneWaiting = mSize > 0 || mExitRequested;
- numFrags = mSize;
- mRequestMutex.unlock();
- }
- if (mExitRequested) return false;
+ if (!mQueue.empty()) {
+ // First copy the frame by value, since it will not be protected and the
+ // reference may change, then release the frame by reference
+ frame = mQueue[0];
+ addFrame = true;
+ releaseQueuedFrame(mQueue.begin());
+ }
+ mRequestMutex.unlock();
- LOGW_IF(numFrags > kWarningThreshold, "%d fragments in queue.", numFrags);
- for (size_t i = 0; i < numFrags; ++i)
- {
- // Don't lock the array while we are calling
- // AddMemFragToTrack, which may last a long time, because
- // we are the only thread accessing mFirst.
+ if (addFrame) {
mPrevWriteStatus = mComposer->AddMemFragToTrack(
- mFirst->mFrame, mFirst->mFrag, mFirst->mFormat,
- mFirst->mTimestamp, mFirst->mTrackId, mFirst->mPort);
-
- mRequestMutex.lock();
- decrPendingRequests();
- mRequestMutex.unlock();
+ frame.mFrame, frame.mFrag, frame.mFormat,
+ frame.mTimestamp, frame.mTrackId, frame.mPort);
}
+
return true;
}
- Mutex mRequestMutex; // Protects mRequestCv, mBuffer, mFirst, mLast, mDropped
+ Mutex mRequestMutex; // Protects mRequestCv, mQueue
Condition mRequestCv;
- Request mBuffer[kCapacity];
- size_t mSize;
- void *const mEnd; // Marker for the end of the array.
- Request *mFirst, *mLast;
-
+ Oscl_Vector<Request, OsclMemAllocator> mQueue;
// mComposer with the real implementation of the AddMemFragToTrack method.
PVMp4FFComposerNode *mComposer;
// TODO: lock needed for mPrevWriteStatus? Are int assignement atomic on arm?
PVMFStatus mPrevWriteStatus;
android_thread_id_t mTid;
- size_t mDropped;
// Unlike exitPending(), stays to true once exit has been called.
bool mExitRequested;
};
Please sign in to comment.
Something went wrong with that request. Please try again.