Skip to content
This repository has been archived by the owner on Nov 29, 2021. It is now read-only.

Commit

Permalink
libeio-based implementation of asyncronous interaction.
Browse files Browse the repository at this point in the history
Step 3. Callback implementation.
  • Loading branch information
Ivan (egorich) Egorov committed Jun 5, 2010
1 parent a322cf4 commit 5c935f2
Showing 1 changed file with 52 additions and 28 deletions.
80 changes: 52 additions & 28 deletions src/zlib.h
Expand Up @@ -183,15 +183,11 @@ class ZipLib : ObjectWrap {
Request *request = Request::Write(proc, buffer, cb);

eio_custom(Self::DoPushRequest, EIO_PRI_DEFAULT,
Self::OnRequestPushed, request);
Self::DoHandleCallbacks, request);

ev_ref(EV_DEFAULT_UC);
proc->Ref();

// Blob out;
// int r = proc->Write(buffer->data(), buffer->length(), out);
//
// DoCallback(cb, r, out);
return Undefined();
}

Expand All @@ -210,15 +206,11 @@ class ZipLib : ObjectWrap {

Request *request = Request::Close(proc, cb);
eio_custom(Self::DoPushRequest, EIO_PRI_DEFAULT,
Self::OnRequestPushed, request);
Self::DoHandleCallbacks, request);

ev_ref(EV_DEFAULT_UC);
proc->Ref();

// Blob out;
// int r = proc->Close(out);
//
// DoCallback(cb, r, out);
return Undefined();
}

Expand All @@ -228,37 +220,52 @@ class ZipLib : ObjectWrap {

Request *request = Request::Destroy(proc);
eio_custom(Self::DoPushRequest, EIO_PRI_DEFAULT,
Self::OnRequestPushed, request);
Self::DoHandleCallbacks, request);

ev_ref(EV_DEFAULT_UC);
proc->Ref();

//proc->Destroy();

return Undefined();
}


private:
static DoPushRequest(void *rawRequest) {
static void DoPushRequest(void *rawRequest) {
Request *request = static_cast<Request*>(rawRequest);

Self *self = request->self();
pthread_mutex_lock(&self->mutex_);
self->queue_.Push(request);
pthread_mutex_lock(&self->requestsMutex_);
self->requestsQueue_.Push(request);
bool startProcessing = !self->processorActive_;
if (!startProcessing) self->processorActive_ = true;
pthread_mutex_unlock(&self->mutex_);
pthread_mutex_unlock(&self->requestsMutex_);

if (startProcessing) {
self->DoProcess();
}
}

static void DoHandleCallbacks(void *rawRequest) {
Request *request;

while (ReentrantPop(callbackQueue_, callbackMutex_, request)) {
Self *self = request->self();
self->DoCallback(request->callback(),
request->status(), request->output());

ev_unref(EV_DEFAULT_UC);
self->Unref();
}
}

static void DoHandleCallbacks2(EV_P_ ev_async *evt, int revents) {
DoHandleCallbacks(0);
}

void DoProcess() {
Request *request;

while (ReentrantPop(request)) {
while (ReentrantPop(requestsQueue_, requestsMutex_, request)) {
switch (request->kind()) {
case RWrite:
request->setResult(
Expand All @@ -277,20 +284,23 @@ class ZipLib : ObjectWrap {
}
}

pthread_mutex_lock(&mutex_);
pthread_mutex_lock(&requestsMutex_);
processorActive_ = false;
pthread_mutex_unlock(&mutex_);
pthread_mutex_unlock(&requestsMutex_);

ev_async_send(callbackNotify_);
}

bool ReentrantPop(Request*& request) {
static bool ReentrantPop(Queue<Request*> &queue, pthread_mutex_t &mutex,
Request*& request) {
request = 0;

pthread_mutex_lock(&mutex_);
bool result = self->queue_.length() != 0;
pthread_mutex_lock(&mutex);
bool result = queue.length() != 0;
if (result) {
request = self->queue_.Pop();
request = queue.Pop();
}
pthread_mutex_unlock(&mutex_);
pthread_mutex_unlock(&mutex);

return result;
}
Expand All @@ -300,7 +310,15 @@ class ZipLib : ObjectWrap {
ZipLib()
: ObjectWrap(), state_(Self::Idle), processorActive_(false)
{
mutex_ = PTHREAD_MUTEX_INITIALIZER;
requestsMutex_ = PTHREAD_MUTEX_INITIALIZER;

// Lazy init. Safe to do it here as this always happen in JS-thread.
if (!callbackInitialized_) {
callbackMutex_ = PTHREAD_MUTEX_INITIALIZER;
callbackNotify_ = ev_async_init(EV_DEFAULT_UC_ Self::DoHandleCallbacks2);
ev_async_start(&callbackNotify_);
ev_unref();
}
}


Expand Down Expand Up @@ -435,8 +453,14 @@ class ZipLib : ObjectWrap {
private:
Processor processor_;
State state_;
pthread_mutex_t mutex_;
Queue<Request*> queue_;

pthread_mutex_t requestsMutex_;
Queue<Request*> requestsQueue_;

static bool callbackInitialized_ = false;
static pthread_mutex_t callbackMutex_;
static Queue<Request*> callbackQueue_;
static ev_async callbackNotify_;

volatile bool processorActive_;
};
Expand Down

0 comments on commit 5c935f2

Please sign in to comment.