Skip to content

Commit

Permalink
stats(), and compression/conversion fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
mnunberg committed Jan 22, 2012
1 parent 2a3a82d commit 4e47012
Show file tree
Hide file tree
Showing 9 changed files with 500 additions and 43 deletions.
147 changes: 139 additions & 8 deletions Client.xs
Expand Up @@ -113,7 +113,53 @@ if( (tmp = av_fetch(options, opt_idx, 0)) && SvTRUE(*tmp) ) { \

static inline void initialize_conversion_settings(PLCB_t *object, AV *options)
{

SV **tmpsv;
AV *methav;
int dummy;

#define meth_assert_getpairs(flag, optidx) \
((object->my_flags & flag) \
? \
(((tmpsv = av_fetch(options, optidx, 0)) && SvROK(*tmpsv) && \
(methav = (AV*)SvRV(*tmpsv))) ? 1 : \
(void*)die(\
"Flag %s specified but no methods provided", #flag)) \
: 0)

#define meth_assert_assign(target_field, source_idx, diemsg) \
if((tmpsv = av_fetch(methav, source_idx, 0)) == NULL) { \
die("Nothing in IDX=%d (%s)", source_idx, diemsg); \
} \
if(! ((SvROK(*tmpsv) && SvTYPE(SvRV(*tmpsv)) == SVt_PVCV) ) ) { \
die("Expected CODE reference at IDX=%d: %s",source_idx, diemsg); \
} \
object->target_field = newRV_inc(SvRV(*tmpsv));


if( (tmpsv = av_fetch(options, PLCB_CTORIDX_MYFLAGS, 0))
&& SvIOK(*tmpsv)) {
object->my_flags = SvUV(*tmpsv);
}

if(meth_assert_getpairs(PLCBf_USE_COMPRESSION,
PLCB_CTORIDX_COMP_METHODS)) {
meth_assert_assign(cv_compress, 0, "Compression");
meth_assert_assign(cv_decompress, 1, "Decompression");
}

if(meth_assert_getpairs(PLCBf_USE_STORABLE,
PLCB_CTORIDX_SERIALIZE_METHODS)) {
meth_assert_assign(cv_serialize, 0, "Serialize");
meth_assert_assign(cv_deserialize, 1, "Deserialize");

}

if( (tmpsv = av_fetch(options, PLCB_CTORIDX_COMP_THRESHOLD, 0))
&& SvIOK(*tmpsv)) {
object->compress_threshold = SvIV(*tmpsv);
} else {
object->compress_threshold = 0;
}
}

static void PLCB_cleanup(PLCB_t *object)
Expand All @@ -126,6 +172,14 @@ static void PLCB_cleanup(PLCB_t *object)
SvREFCNT_dec(object->errors);
object->errors = NULL;
}
#define _free_cv(fld) \
if(object->fld) { \
SvREFCNT_dec(object->fld); object->fld = NULL; \
}
_free_cv(cv_compress); _free_cv(cv_decompress);
_free_cv(cv_serialize); _free_cv(cv_deserialize);
#undef _free_cv

}

/*Construct a new libcouchbase object*/
Expand All @@ -147,8 +201,6 @@ SV *PLCB_construct(const char *pkg, AV *options)
die("Failed to create instance");
}



Newxz(object, 1, PLCB_t);
object->instance = instance;
object->errors = newAV();
Expand All @@ -157,7 +209,9 @@ SV *PLCB_construct(const char *pkg, AV *options)
}

initialize_conversion_settings(object, options);


libcouchbase_set_cookie(instance, object);

if(libcouchbase_connect(instance) == LIBCOUCHBASE_SUCCESS) {
libcouchbase_wait(instance);
}
Expand Down Expand Up @@ -208,7 +262,7 @@ static SV *PLCB_set_common(SV *self,

/*Clear existing error status first*/
av_clear(object->errors);
ret_av = newAV();
ret_av = newAV();

exp = exp_offset ? time(NULL) + exp_offset : 0;

Expand Down Expand Up @@ -284,7 +338,6 @@ static SV *PLCB_get_common(SV *self, SV *key, int exp_offset)
time_t *exp_arg;

mk_instance_vars(self, instance, object);

plcb_get_str_or_die(key, skey, klen, "Key");

ret_av = newAV();
Expand Down Expand Up @@ -379,6 +432,57 @@ SV *PLCB_remove(SV *self, SV *key, uint64_t cas)
bless_return(object, ret_rv, ret_av);
}

SV *PLCB_stats(SV *self, AV *stats)
{
libcouchbase_t instance;
PLCB_t *object;
char *skey;
STRLEN nkey;
int curidx;
libcouchbase_error_t err;

SV *ret_hvref;
SV **tmpsv;

mk_instance_vars(self, instance, object);
if(object->stats_hv) {
die("Hrrm.. stats_hv should be NULL");
}

av_clear(object->errors);
object->stats_hv = newHV();
ret_hvref = newRV_noinc((SV*)object->stats_hv);

if(stats == NULL || (curidx = av_len(stats)) == -1) {
skey = NULL;
nkey = 0;
curidx = -1;
err = libcouchbase_server_stats(instance, NULL, NULL, 0);
if(err != LIBCOUCHBASE_SUCCESS) {
SvREFCNT_dec(ret_hvref);
ret_hvref = &PL_sv_undef;
}
libcouchbase_wait(instance);
} else {
for(; curidx >= 0; curidx--) {
tmpsv = av_fetch(stats, curidx, 0);
if(tmpsv == NULL
|| SvTYPE(*tmpsv) == SVt_NULL
|| (!SvPOK(*tmpsv))) {
continue;
}
skey = SvPV(*tmpsv, nkey);
err = libcouchbase_server_stats(instance, NULL, skey, nkey);
if(err == LIBCOUCHBASE_SUCCESS) {
libcouchbase_wait(instance);
}
}
}

object->stats_hv = NULL;
return ret_hvref;
}

/*Used for set/get/replace/add common interface*/
static libcouchbase_storage_t PLCB_XS_setmap[] = {
LIBCOUCHBASE_SET,
Expand Down Expand Up @@ -446,7 +550,9 @@ PLCB_set(self, key, value, ...)
OUTPUT:
RETVAL

SV *PLCB_arithmetic(self, key, ...)

SV *
PLCB_arithmetic(self, key, ...)
SV *self
SV *key

Expand Down Expand Up @@ -504,6 +610,7 @@ SV *PLCB_arithmetic(self, key, ...)
OUTPUT:
RETVAL


SV *
PLCB_cas(self, key, value, cas_sv, ...)
SV *self
Expand Down Expand Up @@ -554,5 +661,29 @@ PLCB_remove(self, key, ...)
OUTPUT:
RETVAL

SV *PLCB_get_errors(self)

SV *
PLCB_get_errors(self)
SV *self


SV *
PLCB_stats(self, ...)
SV *self

PREINIT:
SV *klist;

CODE:
if( items < 2 ) {
klist = NULL;
} else {
klist = ST(1);
if(! (SvROK(klist) && SvTYPE(SvRV(klist)) >= SVt_PVAV) ) {
die("Usage: stats( ['some', 'keys', ...] )");
}
}
RETVAL = PLCB_stats(self, (klist) ? (AV*)SvRV(klist) : NULL);

OUTPUT:
RETVAL
2 changes: 2 additions & 0 deletions Makefile.PL
Expand Up @@ -23,7 +23,9 @@ WriteMakefile(
'Test::More' => 0,
},
NEEDS_LINKING => 1,
OPTIMIZE => '-O0 -ggdb3',
LIBS => ['-L/sources/libcouchbase/.libs -lcouchbase'],
#LIBS => ['-lcouchbase'],
INC => '-I/sources/libcouchbase/include /sources/memcached/include',
dist => { COMPRESS => 'gzip -9f', SUFFIX => 'gz', },
clean => { FILES => 'Couchbase-Client-*' },
Expand Down
61 changes: 58 additions & 3 deletions callbacks.c
Expand Up @@ -3,6 +3,7 @@
#include "XSUB.h"

#include "perl-couchbase.h"
//#include <string.h>

void plcb_callback_get(
libcouchbase_t instance,
Expand Down Expand Up @@ -73,8 +74,7 @@ void plcb_callback_error(

object = (PLCB_t*)libcouchbase_get_cookie(instance);
av_push(object->errors,
newRV_noinc((SV*)av_make(2, elem_list))
);
newRV_noinc((SV*)av_make(2, elem_list)));
}

#ifdef PLCB_HAVE_CONNFAIL
Expand All @@ -100,6 +100,60 @@ static void keyop_callback(
NULL, 0, 0, 0);
}

static void stat_callback(
libcouchbase_t instance, const void *cookie,
const char *server,
libcouchbase_error_t err,
const void *stat_key, size_t nkey,
const void *bytes, size_t nbytes)
{
PLCB_t *object;
SV *server_sv, *data_sv, *key_sv;
dSP;



if(! (stat_key || bytes) ) {
warn("Got all statistics");
return;
}

server_sv = newSVpvn(server, strlen(server));
if(nkey) {
key_sv = newSVpvn(stat_key, nkey);
fprintf(stderr, "stat_callback(): ");
fwrite(stat_key, nkey, 1, stderr);
fprintf(stderr, "\n");
} else {
key_sv = newSVpvn("", 0);
}

if(nbytes) {
data_sv = newSVpvn(bytes, nbytes);
} else {
data_sv = newSVpvn("", 0);
}

object = (PLCB_t*)libcouchbase_get_cookie(instance);
if(!object->stats_hv) {
die("We have nothing to write our stats to!");
}

ENTER;
SAVETMPS;

PUSHMARK(SP);
XPUSHs(sv_2mortal(newRV_inc((SV*)object->stats_hv)));
XPUSHs(sv_2mortal(server_sv));
XPUSHs(sv_2mortal(key_sv));
XPUSHs(sv_2mortal(data_sv));
PUTBACK;

call_pv(PLCB_STATS_SUBNAME, G_DISCARD);
FREETMPS;
LEAVE;
}

void plcb_setup_callbacks(PLCB_t *object)
{
libcouchbase_t instance = object->instance;
Expand All @@ -113,6 +167,7 @@ void plcb_setup_callbacks(PLCB_t *object)
libcouchbase_set_touch_callback(instance, keyop_callback);
libcouchbase_set_remove_callback(instance, keyop_callback);
libcouchbase_set_arithmetic_callback(instance, arithmetic_callback);

libcouchbase_set_stat_callback(instance, stat_callback);

libcouchbase_set_cookie(instance, object);
}

0 comments on commit 4e47012

Please sign in to comment.