Skip to content

Commit

Permalink
Refactored messy and buggy conversion functions
Browse files Browse the repository at this point in the history
  • Loading branch information
mnunberg committed Jan 29, 2012
1 parent ffd6d9d commit a2b5db4
Showing 1 changed file with 131 additions and 110 deletions.
241 changes: 131 additions & 110 deletions convert.c
@@ -1,90 +1,149 @@
#include "perl-couchbase.h"
void plcb_convert_storage(
PLCB_t *object, SV **data_sv, STRLEN *data_len,
uint32_t *flags)


#define CONVERT_DIRECTION_OUT 1
#define CONVERT_DIRECTION_IN 2

static inline SV *
serialize_convert(SV *meth, SV *input, int direction)
{
dSP;
SV *ret;
int count;

SV *sv;
SV *compressed_sv;
SV *result_sv;
ENTER;
SAVETMPS;
PUSHMARK(SP);

XPUSHs(input);
PUTBACK;

if(direction == CONVERT_DIRECTION_OUT) {
count = call_sv(meth, G_SCALAR);
SPAGAIN;

/*for ouptut we must have this function succeed!*/
if(count != 1) {
croak("Serialization method returned nothing!");
}
ret = POPs;
} else {
count = call_sv(meth, G_SCALAR|G_EVAL);
SPAGAIN;

/*if someone has messed up our flags, don't die, but throw a warning*/
if(SvTRUE(ERRSV)) {
warn("Couldn't deserialize data: %s", SvPV_nolen(ERRSV));
ret = input;
} else {
if(count != 1) {
croak("Serialization method returned nothing?");
}
ret = POPs;
}
}

SvREFCNT_inc(ret);

FREETMPS;
LEAVE;
return ret;
}

static inline SV *
compression_convert(SV *meth, SV *input, int direction)
{
dSP;
int count;
SV *converted = newSV(0);
SV *ret;

ENTER;
SAVETMPS;

PUSHMARK(SP);
EXTEND(SP, 2);

PUSHs(sv_2mortal(newRV_inc(input)));
PUSHs(sv_2mortal(newRV_inc(converted)));
PUTBACK;

/*output conversion must succeed*/
if(direction == CONVERT_DIRECTION_OUT) {
if(call_sv(meth, G_SCALAR) != 1) {
croak("Compression method returned nothing");
}
SPAGAIN;

if(!SvTRUE(POPs)) {
croak("Compression method returned error status");
}
input = NULL;

} else {
count = call_sv(meth, G_SCALAR|G_EVAL);
SPAGAIN;

if(SvTRUE(ERRSV)) {
warn("Could not decompress input: %s", SvPV_nolen(ERRSV));
} else if (count == 0) {
warn("Decompression method didn't return anything");
} else if( (ret = POPs) && SvTRUE(ret) == 0 ){
warn("Decompression method returned error status");
} else if (!SvTRUE(converted)) {
warn("Decompression returned empty string");
} else {
input = NULL;
}
}
if(input != NULL) {
/*conversion failed*/
SvREFCNT_dec(converted);
converted = input;
}

FREETMPS;
LEAVE;

return converted;
}

void plcb_convert_storage(
PLCB_t *object, SV **data_sv, STRLEN *data_len,
uint32_t *flags)
{
SV *sv;

if(!(object->my_flags & PLCBf_DO_CONVERSION || SvROK(*data_sv) ) ) {
if(object->my_flags & PLCBf_DO_CONVERSION == 0 && SvROK(*data_sv) == 0) {
return;
}

sv = *data_sv;

if(SvROK(sv)) {
if(!(object->my_flags & PLCBf_USE_STORABLE)) {
die("Serialization not enabled "
"but we were passed a reference");
}
//warn("Serializing...");

ENTER;
SAVETMPS;

PUSHMARK(SP);
XPUSHs(sv);
PUTBACK;

count = call_sv(object->cv_serialize, G_SCALAR);

SPAGAIN;

if (count != 1) {
croak("Serialize method returned nothing");
}

sv = POPs;
SvREFCNT_inc(sv);
PUTBACK;

FREETMPS;
LEAVE;

*data_len = SvLEN(sv);
*data_sv = sv;
sv = serialize_convert(object->cv_serialize, sv,
CONVERT_DIRECTION_OUT);
plcb_storeflags_apply_serialization(object, *flags);
*data_len = SvCUR(sv); /*set this so compression method sees new length*/
}

if( (object->my_flags & PLCBf_USE_COMPRESSION)
&& object->compress_threshold
&& *data_len >= object->compress_threshold ) {

//warn("Compressing..");

compressed_sv = newSV(0);

PUSHMARK(SP);

XPUSHs(sv_2mortal(newRV_inc(sv)));
XPUSHs(sv_2mortal(newRV_inc(compressed_sv)));

PUTBACK;

count = call_sv(object->cv_compress, G_SCALAR);
sv = compression_convert(object->cv_compress, sv,
CONVERT_DIRECTION_OUT);
plcb_storeflags_apply_compression(object, *flags);
}

SPAGAIN;

if (count < 1) {
croak("Compress method returned nothing");
}

result_sv = POPs;
PUTBACK;

if (SvTRUE(result_sv)) {
sv = compressed_sv;
plcb_storeflags_apply_compression(object, *flags);
*data_len = SvLEN(sv);
*data_sv = sv;
} else {
SvREFCNT_dec(compressed_sv);
}
if(*data_sv != sv) {
*data_sv = sv;
*data_len = SvCUR(sv);
}
}

Expand All @@ -94,74 +153,36 @@ void plcb_convert_storage_free(PLCB_t *object, SV *data, uint32_t flags)
plcb_storeflags_has_serialization(object,flags) == 0) {
return;
}

SvREFCNT_dec(data);
}

SV* plcb_convert_retrieval(
PLCB_t *object, const char *data, size_t data_len, uint32_t flags)
{
SV *ret_sv, *input_sv;
dSP;

input_sv = newSVpvn(data, data_len);

if(!plcb_storeflags_has_conversion(object, flags)) {
return input_sv;
}

ret_sv = NULL;
ret_sv = input_sv;

if(plcb_storeflags_has_compression(object, flags)) {
ret_sv = newSV(0);
//warn("Decompressing..");
PUSHMARK(SP);
XPUSHs(sv_2mortal(newRV_inc(input_sv)));
XPUSHs(sv_2mortal(newRV_inc(ret_sv)));
PUTBACK;

if(call_sv(object->cv_decompress, G_SCALAR|G_EVAL) < 1) {
die("decompress method didn't return anything");
}
SPAGAIN;
if(!SvTRUE(POPs)) {
SvREFCNT_dec(ret_sv);
ret_sv = NULL;
} else {
//sv_dump(input_sv);
SvREFCNT_dec(input_sv);
input_sv = NULL;
}
ret_sv = compression_convert(object->cv_decompress, ret_sv,
CONVERT_DIRECTION_IN);
}

if(plcb_storeflags_has_serialization(object, flags)) {
if(ret_sv) {
/*if we decompressed, let the input_sv be the decompressed data
which we are about to deserialize*/
input_sv = ret_sv;
}
//warn("Deserializing..");
ret_sv = NULL;

ENTER;
SAVETMPS;
PUSHMARK(SP);
XPUSHs(input_sv);
PUTBACK;

if(call_sv(object->cv_deserialize, G_SCALAR|G_EVAL) < 1) {
die("Deserialize method didn't return anything");
}
if(!SvTRUE(ERRSV)) {
ret_sv = POPs;
SvREFCNT_inc(ret_sv);
SvREFCNT_dec(input_sv);
input_sv = NULL;
} else {
ret_sv = input_sv;
warn(SvPV_nolen(ERRSV));
}
FREETMPS;
LEAVE;
ret_sv = serialize_convert(object->cv_deserialize, ret_sv,
CONVERT_DIRECTION_IN);
}

if(ret_sv != input_sv) {
SvREFCNT_dec(input_sv);
}

return ret_sv;
}

0 comments on commit a2b5db4

Please sign in to comment.