Skip to content

Commit c696dda

Browse files
committed
Initial commit of StreamingBuffer
1 parent 704d7ef commit c696dda

File tree

2 files changed

+174
-2
lines changed

2 files changed

+174
-2
lines changed

include/cinder/Buffer.h

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
#pragma once
2424

2525
#include "cinder/Cinder.h"
26+
#include <deque>
27+
#include <mutex>
2628

2729
#define DEFAULT_COMPRESSION_LEVEL 6
2830

@@ -79,7 +81,48 @@ class CI_API Buffer {
7981
bool mOwnsData;
8082
};
8183

84+
//! Thread-safe single-producer, single-consumer block-based double-ended byte queue
85+
class CI_API StreamingBuffer {
86+
public:
87+
StreamingBuffer( size_t blockSizeBytes = 65536 );
88+
89+
//! pushes \a sizeBytes bytes at the front of the deque
90+
void pushFront( const void *data, size_t sizeBytes );
91+
//! pops up to \a maxSize bytes from the back of the deque. Returns the number of bytes popped, which may be 0.
92+
size_t popBack( void *output, size_t maxSize );
93+
94+
//! returns the number of bytes currently in the deque
95+
size_t getSize() const;
96+
97+
//! returns \c true if the deque is empty
98+
bool empty() const { return getSize() == 0; }
99+
//! clears all data in the deque but does not deallocate internal storage
100+
void clear();
101+
//! deallocates internal storage to precisely fit current size
102+
void shrinkToFit();
103+
104+
//! Performs a non-destructive copy to \a output, up to \a maxSize bytes. Does not pop any data. Returns number of bytes written
105+
size_t copyTo( void *output, size_t maxSize ) const;
106+
107+
private:
108+
StreamingBuffer( const StreamingBuffer &rhs ) = delete;
109+
StreamingBuffer( StreamingBuffer &&rhs ) = delete;
110+
111+
StreamingBuffer& operator=( const StreamingBuffer &rhs ) = delete;
112+
StreamingBuffer& operator=( StreamingBuffer &&rhs ) = delete;
113+
114+
void allocateNewWriteBlock();
115+
void releaseCurrentReadBlock();
116+
size_t calcSize() const;
117+
118+
std::deque<std::unique_ptr<uint8_t[]>> mBlocks, mUnusedBlocks;
119+
120+
mutable std::mutex mMutex;
121+
const size_t mBlockSize;
122+
size_t mWriteOffset, mReadOffset; // expressed in bytes
123+
};
124+
82125
CI_API Buffer compressBuffer( const Buffer &buffer, int8_t compressionLevel = DEFAULT_COMPRESSION_LEVEL, bool resizeResult = true );
83126
CI_API Buffer decompressBuffer( const Buffer &buffer, bool resizeResult = true, bool useGZip = false );
84127

85-
} //namespace
128+
} //namespace

src/cinder/Buffer.cpp

Lines changed: 130 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,135 @@ void Buffer::write( const DataTargetRef &dataTarget )
127127
os->write( *this );
128128
}
129129

130+
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
131+
// StreamingBuffer
132+
133+
StreamingBuffer::StreamingBuffer( size_t blockSizeBytes )
134+
: mBlockSize( std::max<size_t>( blockSizeBytes, 1 ) )
135+
{
136+
allocateNewWriteBlock();
137+
mReadOffset = 0;
138+
}
139+
140+
void StreamingBuffer::pushFront( const void *data, size_t dataSize )
141+
{
142+
std::lock_guard<std::mutex> lock( mMutex );
143+
size_t offset = 0;
144+
145+
while( offset < dataSize ) {
146+
size_t copyCount = std::min( dataSize - offset, mBlockSize - mWriteOffset );
147+
memcpy( &mBlocks.back()[mWriteOffset], &reinterpret_cast<const uint8_t*>(data)[offset], copyCount );
148+
offset += copyCount;
149+
mWriteOffset += copyCount;
150+
if( mWriteOffset == mBlockSize )
151+
allocateNewWriteBlock();
152+
}
153+
}
154+
155+
size_t StreamingBuffer::popBack( void *output, size_t maxSize )
156+
{
157+
std::lock_guard<std::mutex> lock( mMutex );
158+
size_t offset = 0;
159+
160+
maxSize = std::min( maxSize, calcSize() );
161+
162+
while( offset < maxSize && ( ! mBlocks.empty() ) ) {
163+
// for the last block, we can only read up to `mWriteOffset`; for other blocks we can read up to 'mBlockSize'
164+
size_t copyCount = std::min( maxSize - offset, ( ( mBlocks.size() == 1 ) ? mWriteOffset : mBlockSize ) - mReadOffset );
165+
memcpy( &reinterpret_cast<uint8_t*>(output)[offset], &mBlocks.front()[mReadOffset], copyCount );
166+
offset += copyCount;
167+
mReadOffset += copyCount;
168+
if( mReadOffset == mBlockSize )
169+
releaseCurrentReadBlock();
170+
}
171+
172+
return offset;
173+
}
174+
175+
size_t StreamingBuffer::getSize() const
176+
{
177+
std::lock_guard<std::mutex> lock( mMutex );
178+
return calcSize();
179+
}
180+
181+
void StreamingBuffer::clear()
182+
{
183+
std::lock_guard<std::mutex> lock( mMutex );
184+
while( mBlocks.size() > 1 )
185+
releaseCurrentReadBlock();
186+
mReadOffset = mWriteOffset = 0;
187+
}
188+
189+
void StreamingBuffer::shrinkToFit()
190+
{
191+
std::lock_guard<std::mutex> lock( mMutex );
192+
mUnusedBlocks.clear();
193+
}
194+
195+
size_t StreamingBuffer::copyTo( void *output, size_t maxSize ) const
196+
{
197+
std::lock_guard<std::mutex> lock( mMutex );
198+
size_t offset = 0, copyCount;
199+
200+
// the first buffer we copy from read offset, up to either 'mBlockSize' or in the case of a single block, 'mWriteOffset'
201+
size_t firstBlockSize = ( mBlocks.size() == 1 ) ? mWriteOffset : mBlockSize;
202+
copyCount = std::min( maxSize, firstBlockSize - mReadOffset );
203+
memcpy( &reinterpret_cast<uint8_t*>(output)[0], &mBlocks.front()[mReadOffset], copyCount );
204+
offset += copyCount;
205+
if( maxSize == offset || mBlocks.size() == 1 )
206+
return offset;
207+
208+
// all but the first and last buffer we can copy in its entirety
209+
for( size_t block = 1; block < mBlocks.size() - 1; ++block ) {
210+
copyCount = std::min( maxSize - offset, mBlockSize );
211+
memcpy( &reinterpret_cast<uint8_t*>(output)[offset], &mBlocks[block][0], copyCount );
212+
offset += copyCount;
213+
if( maxSize == offset )
214+
return offset;
215+
}
216+
217+
// last block we should read up to 'writeOffset'
218+
copyCount = std::min( maxSize - offset, mWriteOffset );
219+
memcpy( &reinterpret_cast<uint8_t*>(output)[offset], &mBlocks.back()[0], copyCount );
220+
offset += copyCount;
221+
222+
return offset;
223+
}
224+
225+
// implicitly thread-safe; only called by thread-safe methods
226+
void StreamingBuffer::allocateNewWriteBlock()
227+
{
228+
if( ! mUnusedBlocks.empty() ) {
229+
mBlocks.push_back( std::move( mUnusedBlocks.back() ) );
230+
mUnusedBlocks.pop_back();
231+
}
232+
else
233+
mBlocks.emplace_back( new uint8_t[mBlockSize] );
234+
mWriteOffset = 0;
235+
}
236+
237+
// implicitly thread-safe; only called by thread-safe methods
238+
void StreamingBuffer::releaseCurrentReadBlock()
239+
{
240+
mUnusedBlocks.push_back( std::move( mBlocks.front() ) );
241+
mBlocks.pop_front();
242+
mReadOffset = 0;
243+
}
244+
245+
// implicitly thread-safe; only called by thread-safe methods
246+
size_t StreamingBuffer::calcSize() const
247+
{
248+
if( mBlocks.size() == 1 )
249+
return mWriteOffset - mReadOffset;
250+
else {
251+
size_t frontBlockSize = mBlockSize - mReadOffset;
252+
size_t backBlockSize = mWriteOffset;
253+
return ( mBlocks.size() - 2 ) * mBlockSize + frontBlockSize + backBlockSize;
254+
}
255+
}
256+
257+
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////
258+
// Compression
130259
Buffer compressBuffer( const Buffer &buffer, int8_t compressionLevel, bool resizeResult )
131260
{
132261
/*Initial output buffer size needs to be 0.1% larger than source buffer + 12 bytes*/
@@ -220,4 +349,4 @@ Buffer decompressBuffer( const Buffer &buffer, bool resizeResult, bool useGZip )
220349
return outBuffer;
221350
}
222351

223-
} //namespace
352+
} //namespace

0 commit comments

Comments
 (0)