Permalink
Browse files

update buffers

  • Loading branch information...
1 parent 87ea0a6 commit 84805bcf666cf20326d2acc96a86d22db7bab573 @frsyuki committed May 23, 2010
Showing with 102 additions and 22 deletions.
  1. +40 −11 mp/shared_buffer.h
  2. +36 −6 mp/stream_buffer.h
  3. +26 −5 test/handler.cc
View
@@ -28,6 +28,28 @@
namespace mp {
+/**
+ * shared_buffer:
+ * +-----------------------------+
+ * | filled space | unused space |
+ * +-----------------------------+
+ * ^ buffer()
+ *
+ * +--------------+
+ * buffer_capacity()
+ *
+ * reserve_buffer() +->
+ *
+ * +----+
+ * holded
+ * +------+
+ * holded (not to be freed)
+ *
+ * +-> hold()
+ * +-> skip()
+ *
+ */
+
class shared_buffer {
public:
shared_buffer(size_t init_size = MP_SHARED_BUFFER_INITIAL_BUFFER_SIZE);
@@ -37,14 +59,16 @@ class shared_buffer {
class ref {
public:
ref();
- ref(void* p);
ref(const ref& o);
~ref();
void clear();
- void reset(void* p);
void swap(ref& x);
private:
void* m;
+ private:
+ ref(void* p);
+ void reset(void* p);
+ friend class ref;
};
void reserve_buffer(size_t len,
@@ -53,8 +77,8 @@ class shared_buffer {
void* buffer();
size_t buffer_capacity() const;
- ref hold_buffer(size_t size,
- size_t init_size = MP_SHARED_BUFFER_INITIAL_BUFFER_SIZE);
+ ref hold(size_t len);
+ void skip(size_t len);
private:
char* m_buffer;
@@ -169,7 +193,7 @@ inline size_t shared_buffer::buffer_capacity() const
return m_free;
}
-inline void shared_buffer::reserve(size_t len, size_t init_size)
+inline void shared_buffer::reserve_buffer(size_t len, size_t init_size)
{
if(get_count(m_buffer) == 1) {
// rewind buffer
@@ -181,19 +205,24 @@ inline void shared_buffer::reserve(size_t len, size_t init_size)
}
}
-inline ref shared_buffer::hold_buffer(
- size_t len, size_t init_size)
+inline ref shared_buffer::hold(size_t len)
+{
+ if(m_free < len) { len = m_free; }
+ m_used += len;
+ m_free -= len;
+ return ref(m_buffer);
+}
+
+inline void shared_buffer::skip(size_t len)
{
- reserve(len, init_size);
- char* tmp = m_buffer + m_used;
+ if(m_free < len) { len = m_free; }
m_used += len;
m_free -= len;
- return result_ref(m_buffer);
}
inline void shared_buffer::expand_buffer(size_t len, size_t init_size)
{
- if(m_used == sizeof(count_t)) {
+ if(m_used == sizeof(count_t) && get_count(m_buffer) == 1) {
size_t next_size = (m_used + m_free) * 2;
while(next_size < len + m_used) { next_size *= 2; }
View
@@ -30,6 +30,28 @@
namespace mp {
+/**
+ * stream_buffer:
+ * +-------------------------------------------+
+ * | referenced | unparsed data | unused space |
+ * +-------------------------------------------+
+ * ^ ^ data() ^ buffer()
+ * |
+ * | +---------------+
+ * | data_size()
+ * | +--------------+
+ * | buffer_capacity()
+ * |
+ * | +-> data_consumed()
+ * |
+ * | +-> buffer_filled()
+ * |
+ * | reserve_buffer() +->
+ * |
+ * +-- stream_buffer::ref (reference counter)
+ *
+ */
+
class stream_buffer {
public:
stream_buffer(size_t initial_buffer_size = MP_STREAM_BUFFER_INITIAL_BUFFER_SIZE);
@@ -53,16 +75,19 @@ class stream_buffer {
ref(const ref& o);
~ref();
void clear();
- void push(void* d);
void swap(ref& x);
private:
std::vector<void*> m_array;
struct each_incr;
struct each_decr;
+ private:
+ void push(void* d);
+ void move(void* d);
+ friend class stream_buffer;
};
- ref release();
- void release_to(ref* to);
+ ref retain();
+ void retain_to(ref* to);
private:
char* m_buffer;
@@ -150,6 +175,11 @@ inline void stream_buffer::ref::push(void* d)
incr_count(d);
}
+inline void stream_buffer::ref::move(void* d)
+{
+ m_array.push_back(d);
+}
+
inline void stream_buffer::ref::swap(ref& x)
{
m_array.swap(x.m_array);
@@ -210,14 +240,14 @@ inline void stream_buffer::data_consumed(size_t len)
}
-inline stream_buffer::ref stream_buffer::release()
+inline stream_buffer::ref stream_buffer::retain()
{
ref r;
r.swap(m_ref);
return r;
}
-inline void stream_buffer::release_to(ref* to)
+inline void stream_buffer::retain_to(ref* to)
{
m_ref.push(m_buffer);
to->swap(m_ref);
@@ -261,7 +291,7 @@ inline void stream_buffer::expand_buffer(size_t len, size_t initial_buffer_size)
init_count(tmp);
try {
- m_ref.push(m_buffer);
+ m_ref.move(m_buffer);
} catch (...) { free(tmp); throw; }
memcpy(tmp+sizeof(count_t), m_buffer+m_off, not_used);
View
@@ -39,12 +39,35 @@ class handler : public mp::wavy::handler {
int m_count;
};
+bool timer_handler(int* count, mp::wavy::loop* lo)
+{
+ std::cout << "timer" << std::endl;
+
+ if(++(*count) >= 3) {
+ lo->end();
+ return false;
+ }
+
+ return true;
+}
+
+void my_function()
+{
+ std::cout << "ok" << std::endl;
+}
+
void reader_main(int rpipe)
{
mp::wavy::loop lo;
lo.add_handler<handler>(rpipe, &lo);
+ int count = 0;
+ lo.add_timer(0.1, 0.1, mp::bind(
+ &timer_handler, &count, &lo));
+
+ lo.submit(&my_function);
+
lo.run(4);
}
@@ -56,11 +79,7 @@ void writer_main(int wpipe)
lo.write(wpipe, "test", 4);
}
- lo.start(4);
-
- wait(NULL);
-
- lo.end();
+ lo.flush();
}
int main(void)
@@ -75,5 +94,7 @@ int main(void)
}
writer_main(pair[1]);
+
+ wait(NULL);
}

0 comments on commit 84805bc

Please sign in to comment.