Permalink
Browse files

DeviceReadBuffer: Deal with device drivers which segment their intern…

…al buffers.

Some device drivers segment their buffers into pieces, and will return at
most one "piece" at a time.  For efficent processing, we may need to process
more than one "piece" during each loop.
  • Loading branch information...
jpoet committed Jan 27, 2013
1 parent 4f0bfd9 commit bbfdaceeefdbfe37de3fcac49f37ccfda7d2aa6c
@@ -33,7 +33,7 @@ DeviceReadBuffer::DeviceReadBuffer(
max_poll_wait(2500 /*ms*/),
size(0), used(0),
read_quanta(0),
read_quanta(0), dev_buffer_count(1),
dev_read_size(0), readThreshold(0),
buffer(NULL), readPtr(NULL),
@@ -72,7 +72,8 @@ DeviceReadBuffer::~DeviceReadBuffer()
}
bool DeviceReadBuffer::Setup(const QString &streamName, int streamfd,
uint readQuanta, uint deviceBufferSize)
uint readQuanta, uint deviceBufferSize,
uint deviceBufferCount)
{
QMutexLocker locker(&lock);
@@ -90,6 +91,7 @@ bool DeviceReadBuffer::Setup(const QString &streamName, int streamfd,
paused = false;
read_quanta = (readQuanta) ? readQuanta : read_quanta;
dev_buffer_count = deviceBufferCount;
size = gCoreContext->GetNumSetting(
"HDRingbufferSize", 50 * read_quanta) * 1024;
used = 0;
@@ -329,6 +331,12 @@ void DeviceReadBuffer::run(void)
RunProlog();
uint errcnt = 0;
uint cnt;
ssize_t len;
size_t read_size;
size_t unused;
size_t total;
size_t throttle = dev_read_size * dev_buffer_count / 2;
lock.lock();
runWait.wakeAll();
@@ -360,31 +368,36 @@ void DeviceReadBuffer::run(void)
}
}
// Limit read size for faster return from read
size_t unused = (size_t) WaitForUnused(read_quanta);
size_t read_size = min(dev_read_size, unused);
// if read_size > 0 do the read...
ssize_t len = 0;
if (read_size)
/* Some device drivers segment their buffer into small pieces,
* So allow for the reading of multiple buffers */
for (cnt = 0, len = 0, total = 0;
dorun && len >= 0 && cnt < dev_buffer_count; ++cnt)
{
len = read(_stream_fd, writePtr, read_size);
if (!CheckForErrors(len, read_size, errcnt))
// Limit read size for faster return from read
unused = static_cast<size_t>(WaitForUnused(read_quanta));
read_size = min(dev_read_size, unused);
// if read_size > 0 do the read...
if (read_size)
{
if (errcnt > 5)
len = read(_stream_fd, writePtr, read_size);
if (!CheckForErrors(len, read_size, errcnt))
break;
else
continue;
errcnt = 0;
// if we wrote past the official end of the buffer,
// copy to start
if (writePtr + len > endPtr)
memcpy(buffer, endPtr, writePtr + len - endPtr);
IncrWritePointer(len);
total += len;
}
errcnt = 0;
// if we wrote past the official end of the buffer, copy to start
if (writePtr + len > endPtr)
memcpy(buffer, endPtr, writePtr + len - endPtr);
IncrWritePointer(len);
}
if (errcnt > 5)
break;
// Slow down reading if not under load
if (len < static_cast<int32_t>(dev_read_size / 2))
if (errcnt == 0 && total < throttle)
usleep(1000);
}
@@ -41,7 +41,8 @@ class DeviceReadBuffer : protected MThread
bool Setup(const QString &streamName,
int streamfd,
uint readQuanta = sizeof(TSPacket),
uint deviceBufferSize = 0);
uint deviceBufferSize = 0,
uint deviceBufferCount = 1);
void Start(void);
void Reset(const QString &streamName, int streamfd);
@@ -102,6 +103,7 @@ class DeviceReadBuffer : protected MThread
size_t size;
size_t used;
size_t read_quanta;
size_t dev_buffer_count;
size_t dev_read_size;
size_t readThreshold;
unsigned char *buffer;

0 comments on commit bbfdace

Please sign in to comment.