Permalink
Browse files

Make compatible with new master

Refactored sync client to use start_event_loop directly instead of
libcouchbase_wait()

Added timers for async client
  • Loading branch information...
mnunberg committed Jan 27, 2012
1 parent 15f6c44 commit b7fd258936059c5722e204ced45a9a1ea483f75c
Showing with 281 additions and 104 deletions.
  1. +44 −34 Client.xs
  2. +1 −0 async.c
  3. +111 −18 async_events.c
  4. +33 −25 callbacks.c
  5. +1 −0 idx_constants.pl
  6. +3 −0 lib/Couchbase/Client/Async.pm
  7. +5 −4 lib/Couchbase/Client/IDXConst_const.pm
  8. +19 −8 perl-couchbase-async.h
  9. +19 −13 perl-couchbase.h
  10. +45 −2 t/CouchAsync.pm
View
@@ -1,12 +1,14 @@
#include "perl-couchbase.h"
#include "plcb-util.h"
+#include <libcouchbase/libevent_io_opts.h>
+static inline void
+wait_for_single_response(PLCB_t *object)
+{
+ object->npending = 1;
+ object->io_ops->run_event_loop(object->io_ops);
+}
-//#define libcouchbase_wait(x) \
-// libcouchbase_flush_buffers(x, NULL);
-
-#define libcouchbase_behavior_set_syncmode(x, y)
-
static void PLCB_cleanup(PLCB_t *object)
{
if(object->instance) {
@@ -31,32 +33,40 @@ static void PLCB_cleanup(PLCB_t *object)
SV *PLCB_construct(const char *pkg, AV *options)
{
libcouchbase_t instance;
- libcouchbase_error_t oprc;
+ libcouchbase_error_t err;
+ struct libcouchbase_io_opt_st *io_ops;
SV *blessed_obj;
PLCB_t *object;
char *host = NULL, *username = NULL, *password = NULL, *bucket = NULL;
plcb_ctor_cbc_opts(options,
&host, &username, &password, &bucket);
-
- instance = libcouchbase_create(host, username, password, bucket, NULL);
+
+
+ io_ops = libcouchbase_create_io_ops(
+ LIBCOUCHBASE_IO_OPS_DEFAULT, NULL, &err);
+ if(err != LIBCOUCHBASE_SUCCESS) {
+ die("Couldn't create new IO operations: %d", err);
+ }
+
+ //io_ops = libcouchbase_create_libevent_io_opts(NULL);
+ instance = libcouchbase_create(host, username, password, bucket, io_ops);
if(!instance) {
die("Failed to create instance");
}
Newxz(object, 1, PLCB_t);
+ object->io_ops = io_ops;
plcb_ctor_conversion_opts(object, options);
plcb_ctor_init_common(object, instance);
-
+
libcouchbase_set_cookie(instance, object);
- libcouchbase_behavior_set_syncmode(instance, LIBCOUCHBASE_SYNCHRONOUS);
if(libcouchbase_connect(instance) == LIBCOUCHBASE_SUCCESS) {
libcouchbase_wait(instance);
- warn("Connected!");
}
plcb_setup_callbacks(object);
@@ -73,7 +83,7 @@ SV *PLCB_construct(const char *pkg, AV *options)
inst_name = obj_name->instance;
#define bless_return(object, rv, av) \
- return plcb_ret_blessed_rv(object, av);
+ return plcb_ret_blessed_rv(object, av);
static SV *PLCB_set_common(SV *self,
@@ -98,7 +108,7 @@ static SV *PLCB_set_common(SV *self,
plcb_get_str_or_die(value, sval, vlen, "Value");
syncp = &(object->sync);
- plcb_sync_initialize(syncp, self, skey, klen);
+ plcb_sync_initialize(syncp, object, skey, klen);
/*Clear existing error status first*/
@@ -115,9 +125,7 @@ static SV *PLCB_set_common(SV *self,
if(err != LIBCOUCHBASE_SUCCESS) {
plcb_ret_set_err(object, ret_av, err);
} else {
- warn("Waiting..");
- libcouchbase_wait(instance);
- warn("Done!");
+ wait_for_single_response(object);
plcb_ret_set_err(object, ret_av, syncp->err);
}
bless_return(object, ret_rv, ret_av);
@@ -147,21 +155,19 @@ static SV *PLCB_arithmetic_common(SV *self,
plcb_get_str_or_die(key, skey, nkey, "Key");
syncp = &(object->sync);
- plcb_sync_initialize(syncp, self, skey, nkey);
+ plcb_sync_initialize(syncp, object, skey, nkey);
ret_av = newAV();
err = libcouchbase_arithmetic(
instance, syncp, skey, nkey, delta,
exp, do_create, initial
);
if(err != LIBCOUCHBASE_SUCCESS) {
- plcb_ret_set_err(object, ret_av, err);
+ plcb_ret_set_err(object, ret_av, err);
} else {
- if(!syncp->received) {
- libcouchbase_wait(instance);
- }
+ wait_for_single_response(object);
plcb_ret_set_err(object, ret_av, syncp->err);
-
+
if(syncp->err == LIBCOUCHBASE_SUCCESS) {
plcb_ret_set_numval(object, ret_av, syncp->arithmetic, syncp->cas);
}
@@ -188,7 +194,7 @@ static SV *PLCB_get_common(SV *self, SV *key, int exp_offset)
ret_av = newAV();
syncp = &(object->sync);
- plcb_sync_initialize(syncp, self, skey, klen);
+ plcb_sync_initialize(syncp, object, skey, klen);
av_clear(object->errors);
if(exp_offset) {
@@ -202,11 +208,12 @@ static SV *PLCB_get_common(SV *self, SV *key, int exp_offset)
exp_arg);
if(err != LIBCOUCHBASE_SUCCESS) {
- plcb_ret_set_err(object, ret_av, err);
+ plcb_ret_set_err(object, ret_av, err);
} else {
- libcouchbase_wait(instance);
- plcb_ret_set_err(object, ret_av, syncp->err);
- if(syncp->err == LIBCOUCHBASE_SUCCESS) {
+ wait_for_single_response(object);
+
+ plcb_ret_set_err(object, ret_av, syncp->err);
+ if(syncp->err == LIBCOUCHBASE_SUCCESS) {
plcb_ret_set_strval(
object, ret_av, syncp->value, syncp->nvalue,
syncp->store_flags, syncp->cas);
@@ -270,14 +277,15 @@ SV *PLCB_remove(SV *self, SV *key, uint64_t cas)
av_clear(object->errors);
syncp = &(object->sync);
- plcb_sync_initialize(syncp, self, skey, key_len);
+ plcb_sync_initialize(syncp, object, skey, key_len);
if( (err = libcouchbase_remove(instance, syncp, skey, key_len, cas))
!= LIBCOUCHBASE_SUCCESS) {
- plcb_ret_set_err(object, ret_av, err);
+ plcb_ret_set_err(object, ret_av, err);
} else {
- libcouchbase_wait(instance);
- plcb_ret_set_err(object, ret_av, syncp->err);
+ wait_for_single_response(object);
+
+ plcb_ret_set_err(object, ret_av, syncp->err);
}
bless_return(object, ret_rv, ret_av);
}
@@ -312,7 +320,9 @@ SV *PLCB_stats(SV *self, AV *stats)
SvREFCNT_dec(ret_hvref);
ret_hvref = &PL_sv_undef;
}
- libcouchbase_wait(instance);
+
+ wait_for_single_response(object);
+
} else {
for(; curidx >= 0; curidx--) {
tmpsv = av_fetch(stats, curidx, 0);
@@ -324,7 +334,7 @@ SV *PLCB_stats(SV *self, AV *stats)
skey = SvPV(*tmpsv, nkey);
err = libcouchbase_server_stats(instance, NULL, skey, nkey);
if(err == LIBCOUCHBASE_SUCCESS) {
- libcouchbase_wait(instance);
+ wait_for_single_response(object);
}
}
}
@@ -538,5 +548,5 @@ PLCB_stats(self, ...)
OUTPUT:
RETVAL
-
+
INCLUDE: Async.xs
View
@@ -375,6 +375,7 @@ extract_async_options(PLCBA_t *async, AV *options)
_assert_get_cv(CBEVMOD, cv_evmod, "update_event");
_assert_get_cv(CBERR, cv_err, "error");
_assert_get_cv(CBWAITDONE, cv_waitdone, "waitdone");
+ _assert_get_cv(CBTIMERMOD, cv_timermod, "update_timer");
if( (tmpsv = av_fetch(options, PLCBA_CTORIDX_BLESS_EVENT, 0)) ) {
if(SvTRUE(*tmpsv)) {
View
@@ -18,6 +18,43 @@
#endif
+static inline void
+plcb_call_sv_with_args_noret(SV *code,
+ int mortalize,
+ int nargs,
+ ...)
+{
+ va_list ap;
+ SV *cursv;
+
+ dSP;
+
+ ENTER;
+ SAVETMPS;
+ PUSHMARK(SP);
+ EXTEND(SP, nargs);
+
+ va_start(ap, nargs);
+ while(nargs) {
+ cursv = va_arg(ap, SV*);
+ if(mortalize) {
+ cursv = sv_2mortal(cursv);
+ }
+ PUSHs(cursv);
+ nargs--;
+ }
+ va_end(ap);
+
+ PUTBACK;
+
+ call_sv(code, G_DISCARD);
+
+
+ FREETMPS;
+ LEAVE;
+}
+
+
static void *create_event(plcba_cbcio *cbcio)
{
PLCBA_c_event *cevent;
@@ -27,6 +64,7 @@ static void *create_event(plcba_cbcio *cbcio)
Newxz(cevent, 1, PLCBA_c_event);
cevent->pl_event = newAV();
+ cevent->evtype = PLCBA_EVTYPE_IO;
av_store(cevent->pl_event, PLCBA_EVIDX_OPAQUE,
newSViv(PTR2IV(cevent)));
@@ -50,6 +88,7 @@ static void destroy_event(plcba_cbcio *cbcio, void *event)
PLCBA_c_event *cevent = (PLCBA_c_event*)event;
PLCBA_t *async = (PLCBA_t*)cbcio->cookie;
+ //warn("Event destruction requested");
if(cevent == async->cevents) {
if(cevent->next) {
@@ -92,18 +131,9 @@ modify_event_perl(PLCBA_t *async, PLCBA_c_event *cevent,
sv_setiv(*tmpsv, cevent->fd);
}
- ENTER;
- SAVETMPS;
- PUSHMARK(SP);
- EXTEND(SP, 3);
- mPUSHs(newRV_inc( (SV*)cevent->pl_event));
- mPUSHi(action);
- mPUSHi(flags);
- PUTBACK;
- call_sv(async->cv_evmod, G_DISCARD);
-
- FREETMPS;
- LEAVE;
+ plcb_call_sv_with_args_noret(async->cv_evmod, 1, 3,
+ newRV_inc( (SV*)(cevent->pl_event)),
+ newSViv(action), newSViv(flags));
/*set the current flags*/
if(action != PLCBA_EVACTION_SUSPEND && action != PLCBA_EVACTION_RESUME) {
@@ -170,21 +200,73 @@ static void delete_event(plcba_cbcio *cbcio,
}
-/*We need to resume watching on all events here*/
+/*
+ destroy_timer == destroy_event
+*/
+
+
+static void *create_timer(plcba_cbcio *cbcio)
+{
+ PLCBA_c_event *cevent = create_event(cbcio);
+ cevent->evtype = PLCBA_EVTYPE_TIMER;
+ //warn("Created timer %p", cevent);
+ return cevent;
+}
+static inline void
+modify_timer_perl(PLCBA_t *async,PLCBA_c_event *cevent,
+ uint32_t usecs, PLCBA_evaction_t action)
+{
+ SV **tmpsv;
+ dSP;
+ //warn("Calling cv_timermod");
+ plcb_call_sv_with_args_noret(async->cv_timermod,
+ 1, 3,
+ newRV_inc( (SV*)cevent->pl_event ),
+ newSViv(action), newSVuv(usecs));
+}
+static int update_timer(plcba_cbcio *cbcio,
+ void *event, uint32_t usecs,
+ void *cb_data,
+ plcba_c_evhandler handler)
+{
+ /*we cannot do any sane caching or clever magic like we do for I/O
+ watchers, because the time will always be different*/
+ PLCBA_c_event *cevent = (PLCBA_c_event*)event;
+
+ cevent->c.handler = handler;
+ cevent->c.arg = cb_data;
+
+ modify_timer_perl(cbcio->cookie, cevent, usecs, PLCBA_EVACTION_WATCH);
+ return 0;
+}
+
+static void delete_timer(plcba_cbcio *cbcio, void *event)
+{
+ PLCBA_c_event *cevent = (PLCBA_c_event*)event;
+ //warn("Deletion requested for timer!");
+ modify_timer_perl(cbcio->cookie, cevent, 0, PLCBA_EVACTION_UNWATCH);
+}
+
+
+/*We need to resume watching on all events here*/
static void run_event_loop(plcba_cbcio *cbcio)
{
PLCBA_t *async;
PLCBA_c_event *cevent;
async = (PLCBA_t*)cbcio->cookie;
+ //warn("Resuming events..");
for(cevent = async->cevents; cevent; cevent = cevent->next) {
- cevent->state = PLCBA_EVSTATE_ACTIVE;
- modify_event_perl(async, cevent, PLCBA_EVACTION_RESUME, cevent->flags);
+ if(cevent->evtype == PLCBA_EVTYPE_IO && cevent->fd > 0) {
+ cevent->state = PLCBA_EVSTATE_ACTIVE;
+ modify_event_perl(
+ async, cevent, PLCBA_EVACTION_RESUME, cevent->flags);
+ }
}
- warn("Running event loop...");
+ //warn("Running event loop...");
}
/*
@@ -205,10 +287,13 @@ static void stop_event_loop(plcba_cbcio *cbcio)
async = cbcio->cookie;
for(cevent = async->cevents; cevent; cevent = cevent->next) {
- cevent->state = PLCBA_EVSTATE_SUSPENDED;
- modify_event_perl(async, cevent, PLCBA_EVACTION_SUSPEND, -1);
+ if(cevent->evtype == PLCBA_EVTYPE_IO && cevent->fd > 0) {
+ cevent->state = PLCBA_EVSTATE_SUSPENDED;
+ modify_event_perl(async, cevent, PLCBA_EVACTION_SUSPEND, -1);
+ }
}
+ //warn("Calling cv_waitdone");
PUSHMARK(SP);
call_sv(async->cv_waitdone, G_DISCARD|G_NOARGS);
}
@@ -223,10 +308,18 @@ plcba_make_io_opts(PLCBA_t *async)
cbcio->cookie = async;
+ /* i/o events */
cbcio->create_event = create_event;
cbcio->destroy_event = destroy_event;
cbcio->update_event = update_event;
cbcio->delete_event = delete_event;
+
+ /* timer events */
+ cbcio->create_timer = create_timer;
+ cbcio->destroy_timer = destroy_event;
+ cbcio->delete_timer = delete_timer;
+ cbcio->update_timer = update_timer;
+
cbcio->run_event_loop = run_event_loop;
cbcio->stop_event_loop = stop_event_loop;
Oops, something went wrong.

0 comments on commit b7fd258

Please sign in to comment.