Skip to content

Commit

Permalink
AdapterIO: Further development of the interface
Browse files Browse the repository at this point in the history
* Implement Open/Close mechanism.
* Implement SetBuffer.
* Implement timeout handling.
* Improve Seek mechanism, this is now working by locking the
thread until the backend call SeekCompleted.
  • Loading branch information
Numerio committed Jun 30, 2016
1 parent 92cb0c5 commit 345dba5
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 34 deletions.
14 changes: 9 additions & 5 deletions headers/private/media/AdapterIO.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,26 +54,30 @@ class BAdapterIO : public BMediaIO {
virtual status_t SetSize(off_t size);
virtual status_t GetSize(off_t* size) const;

virtual status_t Open();
virtual void Close();

void SeekCompleted();
status_t SetBuffer(BPositionIO* buffer);

BInputAdapter* BuildInputAdapter();

protected:
friend class BInputAdapter;

void SetBuffer(BPositionIO* io);
virtual status_t SeekRequested(off_t position);

ssize_t BackWrite(const void* buffer, size_t size);

status_t SeekRequested(off_t position);
status_t SeekCompleted(off_t position);

private:
status_t _EvaluateWait(off_t position);

int32 fFlags;
bigtime_t fTimeout;

RelativePositionIO* fBuffer;
off_t fTotalSize;
bool fOpened;
sem_id fSeekSem;

BInputAdapter* fInputAdapter;

Expand Down
117 changes: 88 additions & 29 deletions src/kits/media/AdapterIO.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,23 @@

#include <MediaIO.h>

#include <stdio.h>

#include "debug.h"


#define TIMEOUT_QUANTA 100000


class RelativePositionIO : public BPositionIO
{
public:
RelativePositionIO(BAdapterIO* owner, BPositionIO* buffer)
RelativePositionIO(BAdapterIO* owner, BPositionIO* buffer, bigtime_t timeout)
:
BPositionIO(),
fOwner(owner),
fBackPosition(0),
fStartOffset(0),
fBuffer(buffer)
fBuffer(buffer),
fTimeout(timeout)
{
fOwner->GetFlags(&fFlags);
}
Expand Down Expand Up @@ -84,8 +86,15 @@ class RelativePositionIO : public BPositionIO
if (ret != B_OK)
return B_ERROR;

while(bufferSize < position) {
snooze(100000);
bigtime_t totalTimeOut = 0;

while(bufferSize <= position) {
if (fTimeout != B_INFINITE_TIMEOUT && totalTimeOut >= fTimeout)
return B_TIMED_OUT;

snooze(TIMEOUT_QUANTA);

totalTimeOut += TIMEOUT_QUANTA;
GetSize(&bufferSize);
}
return B_OK;
Expand Down Expand Up @@ -147,10 +156,15 @@ class RelativePositionIO : public BPositionIO
{
AutoWriteLocker _(fLock);

off_t currentPos = Position();
off_t ret = fBuffer->WriteAt(fBackPosition, buffer, size);
fBackPosition += ret;
return fBuffer->Seek(currentPos, SEEK_SET);
return ret;
}

void SetBuffer(BPositionIO* buffer)
{
delete fBuffer;
fBuffer = buffer;
}

bool IsStreaming() const
Expand Down Expand Up @@ -188,19 +202,26 @@ class RelativePositionIO : public BPositionIO
int32 fFlags;

mutable RWLocker fLock;

bigtime_t fTimeout;
};


BAdapterIO::BAdapterIO(int32 flags, bigtime_t timeout)
:
fFlags(flags),
fTimeout(timeout),
fBuffer(NULL),
fTotalSize(0),
fOpened(false),
fSeekSem(-1),
fInputAdapter(NULL)
{
CALLED();

fBuffer = new RelativePositionIO(this, new BMallocIO());
fBuffer = new RelativePositionIO(this, new BMallocIO(), timeout);

if (fBuffer->IsSeekable())
fSeekSem = create_sem(0, "BAdapterIO seek sem");
}


Expand All @@ -214,6 +235,8 @@ BAdapterIO::~BAdapterIO()
{
CALLED();

Close();

delete fInputAdapter;
delete fBuffer;
}
Expand All @@ -233,7 +256,8 @@ BAdapterIO::ReadAt(off_t position, void* buffer, size_t size)
{
CALLED();

printf("read at %d %d \n", (int)position, (int)size);
TRACE("BAdapterIO::ReadAt %" B_PRId64 " %" B_PRId64 "\n", position, size);

status_t ret = _EvaluateWait(position+size);
if (ret != B_OK)
return ret;
Expand All @@ -260,6 +284,7 @@ BAdapterIO::Seek(off_t position, uint32 seekMode)
{
CALLED();

// TODO: Support seekModes
status_t ret = _EvaluateWait(position);
if (ret != B_OK)
return ret;
Expand Down Expand Up @@ -305,34 +330,43 @@ BAdapterIO::GetSize(off_t* size) const
}


ssize_t
BAdapterIO::BackWrite(const void* buffer, size_t size)
status_t
BAdapterIO::Open()
{
CALLED();

return fBuffer->BackWrite(buffer, size);
fOpened = true;
return B_OK;
}


status_t
BAdapterIO::_EvaluateWait(off_t pos)
void
BAdapterIO::Close()
{
CALLED();

status_t err = fBuffer->EvaluatePosition(pos);
fOpened = false;
}

if (err != B_WOULD_BLOCK) {
if (err != B_RESOURCE_UNAVAILABLE && err != B_OK)
return B_UNSUPPORTED;

if (err == B_RESOURCE_UNAVAILABLE && fBuffer->IsStreaming()
&& fBuffer->IsSeekable()) {
if (SeekRequested(pos) != B_OK)
return B_UNSUPPORTED;
}
}
void
BAdapterIO::SeekCompleted()
{
CALLED();
release_sem(fSeekSem);
}

return fBuffer->WaitForData(pos);

status_t
BAdapterIO::SetBuffer(BPositionIO* buffer)
{
// We can't change the buffer while we
// are running.
if (fOpened)
return B_ERROR;

fBuffer->SetBuffer(buffer);
return B_OK;
}


Expand All @@ -356,12 +390,37 @@ BAdapterIO::SeekRequested(off_t position)
}


ssize_t
BAdapterIO::BackWrite(const void* buffer, size_t size)
{
CALLED();

return fBuffer->BackWrite(buffer, size);
}


status_t
BAdapterIO::SeekCompleted(off_t position)
BAdapterIO::_EvaluateWait(off_t pos)
{
CALLED();

return fBuffer->ResetStartOffset(position);
status_t err = fBuffer->EvaluatePosition(pos);

if (err != B_WOULD_BLOCK) {
if (err != B_RESOURCE_UNAVAILABLE && err != B_OK)
return B_UNSUPPORTED;

if (err == B_RESOURCE_UNAVAILABLE && fBuffer->IsStreaming()
&& fBuffer->IsSeekable()) {
if (SeekRequested(pos) != B_OK)
return B_UNSUPPORTED;

acquire_sem(fSeekSem);
fBuffer->ResetStartOffset(pos);
}
}

return fBuffer->WaitForData(pos);
}


Expand Down

0 comments on commit 345dba5

Please sign in to comment.