Permalink
Browse files

Change the FragmentWriter queue to be vector based

Previously the fragment writer thread queue was a static queue and if
the number of output buffers was greater than the queue size an overflow
would occur and encoded frames would be dropped, resulting in corrupted
video or missed audio.

- We will do performance optimization later.
- Improve the lock usage based on reviewer's comment
  • Loading branch information...
1 parent 97fd596 commit 471c0b46110756d699d989fed41022768210e521 Dan Vacura committed with James Dong Dec 9, 2009
Showing with 44 additions and 99 deletions.
  1. +44 −99 nodes/pvmp4ffcomposernode/src/pvmp4ffcn_node.cpp
@@ -42,7 +42,6 @@
#ifndef OSCL_MEM_BASIC_FUNCTIONS_H
#include "oscl_mem_basic_functions.h"
#endif
-#include "pvmf_omx_basedec_node.h" // for NUMBER_OUTPUT_BUFFER
#ifdef ANDROID
namespace android
@@ -55,31 +54,28 @@ namespace android
// This class is friend with the composer node it belongs to, in order
// to be able to call the original AddMemFragToTrack which does all
// the work.
-//
-// The queue is implemented using a circular buffer. mBuffer is the
-// start of the array and mEnd points just after the last element in
-// the array.
-// mFirst points to the oldest fragment.
-// mLast points to the next available cell for a new fragment.
-// When the array is empty or full mFirst == mLast.
+
+#define FRAME_QUEUE_DEFAULT_SIZE 20
class FragmentWriter: public Thread
{
public:
FragmentWriter(PVMp4FFComposerNode *composer) :
- Thread(kThreadCallJava), mSize(0), mEnd(mBuffer + kCapacity),
- mFirst(mBuffer), mLast(mBuffer), mComposer(composer),
- mPrevWriteStatus(PVMFSuccess), mTid(NULL), mDropped(0), mExitRequested(false) {}
+ Thread(kThreadCallJava), mComposer(composer),
+ mPrevWriteStatus(PVMFSuccess), mTid(NULL), mExitRequested(false)
+ {
+ mQueue.reserve(FRAME_QUEUE_DEFAULT_SIZE);
+ }
virtual ~FragmentWriter()
{
Mutex::Autolock l(mRequestMutex);
- LOG_ASSERT(0 == mSize, "The queue should be empty by now.");
LOG_ASSERT(mExitRequested, "Deleting an active instance.");
- LOGD_IF(0 < mSize, "Flushing %d frags in dtor", mSize);
- while (0 < mSize) // make sure we are flushed
+ LOGD("Capacity of fragment queue reached %d", mQueue.capacity());
+ LOGD_IF(0 < mQueue.size(), "Flushing %d frags in dtor", mQueue.size());
+ while (0 < mQueue.size()) // make sure we are flushed
{
- decrPendingRequests();
+ releaseQueuedFrame(mQueue.begin());
}
}
@@ -104,10 +100,12 @@ class FragmentWriter: public Thread
while (!done)
{
mRequestMutex.lock();
- done = mSize == 0 || iter > kMaxFlushAttempts;
+ done = mQueue.size() == 0 || iter > kMaxFlushAttempts;
if (!done) mRequestCv.signal();
mRequestMutex.unlock();
- if (!done) usleep(kFlushSleepMicros);
+ if (!done) {
+ usleep(kFlushSleepMicros);
+ }
++iter;
}
LOG_ASSERT(iter <= kMaxFlushAttempts, "Failed to flush");
@@ -123,55 +121,21 @@ class FragmentWriter: public Thread
{
if (mExitRequested) return PVMFErrCancelled;
Mutex::Autolock lock(mRequestMutex);
-
- // When the queue is full, we drop the request. This frees the
- // memory fragment and keeps the system running. Decoders are
- // unhappy when there is no buffer available to write the
- // output.
- // An alternative would be to discard the oldest fragment
- // enqueued to free some space. However that would modify
- // mFirst and require extra locking because the thread maybe
- // in the process of writing it.
- if (mSize == kCapacity)
- {
- ++mDropped;
- LOGW_IF((mDropped % kLogDroppedPeriod) == 0, "Frame %d dropped.", mDropped);
- // TODO: Should we return an error code here?
- return mPrevWriteStatus;
- }
-
- mLast->set(aFrame, aMemFrag, aFormat, aTimestamp, aTrackId, aPort);
- incrPendingRequests();
-
+ Request frame = {aFrame, aMemFrag, aFormat, aTimestamp, aTrackId, aPort};
+ mQueue.push_back(frame);
mRequestCv.signal();
return mPrevWriteStatus;
}
private:
static const bool kThreadCallJava = false;
- static const size_t kLogDroppedPeriod = 10; // Arbitrary.
- // Must match the number of buffers allocated in the decoder.
- static const size_t kCapacity = NUMBER_OUTPUT_BUFFER;
- static const size_t kWarningThreshold = kCapacity * 3 / 4; // Warn at 75%
static const OsclRefCounterMemFrag kEmptyFrag;
// Flush blocks for 2 seconds max.
static const size_t kMaxFlushAttempts = 10;
static const int kFlushSleepMicros = 200 * 1000;
struct Request
{
- void set(Oscl_Vector<OsclMemoryFragment, OsclMemAllocator> aFrame,
- OsclRefCounterMemFrag& aMemFrag, PVMFFormatType aFormat,
- uint32 aTimestamp, int32 aTrackId, PVMp4FFComposerPort *aPort)
- {
- mFrame = aFrame;
- mFrag = aMemFrag;
- mFormat = aFormat;
- mTimestamp = aTimestamp;
- mTrackId = aTrackId;
- mPort = aPort;
- }
-
Oscl_Vector<OsclMemoryFragment, OsclMemAllocator> mFrame;
OsclRefCounterMemFrag mFrag;
PVMFFormatType mFormat;
@@ -180,85 +144,66 @@ class FragmentWriter: public Thread
PVMp4FFComposerPort *mPort;
};
- void incrPendingRequests()
- {
- ++mLast;
- if (mEnd == mLast) mLast = mBuffer;
- ++mSize;
- }
-
- void decrPendingRequests()
+ void releaseQueuedFrame(Request *frame)
{
- mFirst->mFrame.clear();
+ if (!frame) {
+ LOGE("Frame not valid");
+ return;
+ }
+ frame->mFrame.clear();
// Release the memory fragment tracked using a refcount
// class. Need to assign an empty frag to release the memory
// fragment. We cannot wait for the array to wrap around.
- mFirst->mFrag = kEmptyFrag; // FIXME: This assignement to decr the ref count is ugly.
- ++mFirst;
- if (mEnd == mFirst) mFirst = mBuffer;
- --mSize;
+ frame->mFrag = kEmptyFrag; // FIXME: This assignement to decr the ref count is ugly.
+ mQueue.erase(frame);
}
// Called by the base class Thread.
// @return true if there more work to do. false when done.
// @Override Thread
virtual bool threadLoop()
{
+ Request frame;
+ bool addFrame = false;
if (!mTid) mTid = androidGetThreadId();
LOG_ASSERT(androidGetThreadId() == mTid,
"Thread id has changed!: %p != %p", mTid, androidGetThreadId());
+ if (mExitRequested)
+ return false;
- size_t numFrags = 0;
- // Check if there's work to do. Otherwise wait for new fragment.
mRequestMutex.lock();
- numFrags = mSize;
- mRequestMutex.unlock();
-
- bool doneWaiting = numFrags != 0;
- while (!doneWaiting)
- {
- mRequestMutex.lock();
+ if (mQueue.empty())
mRequestCv.wait(mRequestMutex);
- doneWaiting = mSize > 0 || mExitRequested;
- numFrags = mSize;
- mRequestMutex.unlock();
- }
- if (mExitRequested) return false;
+ if (!mQueue.empty()) {
+ // First copy the frame by value, since it will not be protected and the
+ // reference may change, then release the frame by reference
+ frame = mQueue[0];
+ addFrame = true;
+ releaseQueuedFrame(mQueue.begin());
+ }
+ mRequestMutex.unlock();
- LOGW_IF(numFrags > kWarningThreshold, "%d fragments in queue.", numFrags);
- for (size_t i = 0; i < numFrags; ++i)
- {
- // Don't lock the array while we are calling
- // AddMemFragToTrack, which may last a long time, because
- // we are the only thread accessing mFirst.
+ if (addFrame) {
mPrevWriteStatus = mComposer->AddMemFragToTrack(
- mFirst->mFrame, mFirst->mFrag, mFirst->mFormat,
- mFirst->mTimestamp, mFirst->mTrackId, mFirst->mPort);
-
- mRequestMutex.lock();
- decrPendingRequests();
- mRequestMutex.unlock();
+ frame.mFrame, frame.mFrag, frame.mFormat,
+ frame.mTimestamp, frame.mTrackId, frame.mPort);
}
+
return true;
}
- Mutex mRequestMutex; // Protects mRequestCv, mBuffer, mFirst, mLast, mDropped
+ Mutex mRequestMutex; // Protects mRequestCv, mQueue
Condition mRequestCv;
- Request mBuffer[kCapacity];
- size_t mSize;
- void *const mEnd; // Marker for the end of the array.
- Request *mFirst, *mLast;
-
+ Oscl_Vector<Request, OsclMemAllocator> mQueue;
// mComposer with the real implementation of the AddMemFragToTrack method.
PVMp4FFComposerNode *mComposer;
// TODO: lock needed for mPrevWriteStatus? Are int assignement atomic on arm?
PVMFStatus mPrevWriteStatus;
android_thread_id_t mTid;
- size_t mDropped;
// Unlike exitPending(), stays to true once exit has been called.
bool mExitRequested;
};

0 comments on commit 471c0b4

Please sign in to comment.