Skip to content

Commit

Permalink
one palloc to shortcut aggregate
Browse files Browse the repository at this point in the history
  • Loading branch information
baverman committed Mar 13, 2013
1 parent 07c803e commit e4bec73
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 13 deletions.
16 changes: 16 additions & 0 deletions pghll--1.0.sql
Expand Up @@ -18,4 +18,20 @@ CREATE FUNCTION hll_merge2(bytea, bytea)
-- Aggregate over bytea inputs whose running state is itself a bytea;
-- each input row is folded into the state by hll_merge2.
-- No finalfunc is given, so the aggregate result is the final state value.
CREATE AGGREGATE hll_merge(bytea) (
sfunc = hll_merge2,
stype = bytea
);

-- Transition function for the hll_sum aggregate.
-- Takes the running state (internal) and the next bytea input, and returns
-- the updated state. The SQL name hll_sum2 is bound to the C symbol
-- 'hll_sum' in the extension library.
-- Not declared STRICT, so the C code is called with NULL arguments too.
CREATE FUNCTION hll_sum2(internal, bytea)
RETURNS internal
AS 'MODULE_PATHNAME', 'hll_sum'
LANGUAGE C;

-- Final function for the hll_sum aggregate: converts the internal state
-- into a bigint result. No link symbol is given, so the C symbol is
-- expected to have the same name as the SQL function.
CREATE FUNCTION hll_sum_fin(internal)
RETURNS bigint
AS 'MODULE_PATHNAME'
LANGUAGE C;

-- Aggregate over bytea inputs that yields a bigint.
-- hll_sum2 folds every row into an internal state; hll_sum_fin turns that
-- state into the final number.
CREATE AGGREGATE hll_sum(bytea) (
sfunc = hll_sum2,
stype = internal,
finalfunc = hll_sum_fin
);
104 changes: 92 additions & 12 deletions pghll.c
Expand Up @@ -5,14 +5,21 @@

#include "postgres.h"
#include "fmgr.h"
#include "utils/memutils.h"

#ifdef PG_MODULE_MAGIC
PG_MODULE_MAGIC;
#endif

#define UBUF_LEN 16384
#define HLL_LEN 4096

static double alpha_MM(int count) {
/*
 * Working memory for the hll_sum aggregate. Both buffers hold HLL_LEN
 * 32-bit words, which matches the HLL_LEN * 4 byte output limit that
 * hll_sum passes to uncompress().
 */
typedef struct {
/* Scratch buffer: hll_sum decodes each incoming row here (after the first one). */
uint32_t value[HLL_LEN];
/* Accumulated result: the first row is decoded directly into it, later rows are merged into it. */
uint32_t state[HLL_LEN];
} dmerge_state;

/*
 * Returns the scaling factor 0.7213 / (1 + 1.079 / count), multiplied by
 * count squared. The operations are kept in the same order as a single
 * expression would evaluate them, so the floating-point result is unchanged.
 */
static inline double alpha_MM(int count) {
    const double alpha = 0.7213 / (1.0 + 1.079 / count);

    return alpha * count * count;
}

Expand All @@ -28,8 +35,8 @@ static double calc_sum_and_zeros(int count, uint32_t *data, int *zeros) {
int v = (value & (0x1f << shift)) >> shift;
sum += pow(2, -v);
if ( v == 0 ) {
zcount++;
}
zcount++;
}
}
}
*zeros = zcount;
Expand All @@ -54,7 +61,7 @@ static int64 cardinality(int count, uint32_t *data, int enable_long_range_correc
result = estimate;
}
}

return (int64)(result + 0.5);
}

Expand All @@ -70,38 +77,40 @@ static void merge_sets(int count, uint32_t *source, uint32_t *dest) {
int sv = (svalue & (0x1f << shift)) >> shift;
int dv = (dvalue & (0x1f << shift)) >> shift;
if ( sv > dv ) {
result += sv << shift;
result += sv << shift;
} else {
result += dv << shift;
}
}
}
dest[i] = result;
}
}

PG_FUNCTION_INFO_V1(hll_decode);
Datum hll_decode(PG_FUNCTION_ARGS);

/*
 * hll_decode(bytea) -> bytea
 *
 * Inflates a zlib-compressed payload and converts every 32-bit word of the
 * result from network byte order to host byte order.
 *
 * Argument 0: compressed bytea.
 * Returns:    a freshly palloc'd bytea holding at most UBUF_LEN bytes of
 *             decoded words.
 * Errors:     ERRCODE_DATA_EXCEPTION when uncompress() does not return Z_OK
 *             (corrupt input, or output larger than UBUF_LEN).
 */
Datum hll_decode(PG_FUNCTION_ARGS) {
    bytea *data = PG_GETARG_BYTEA_P(0);
    /*
     * The payload starts VARHDRSZ bytes into the allocation, so the header
     * has to be reserved on top of the UBUF_LEN bytes uncompress() may write.
     */
    bytea *udata = (bytea *) palloc(UBUF_LEN + VARHDRSZ);
    uint32_t *body = (uint32_t *) VARDATA(udata);
    uLongf dest_size = UBUF_LEN;
    uint32_t count;
    uint32_t i;
    int res;

    /* VARSIZE() counts the varlena header; only the payload is compressed data. */
    res = uncompress((Bytef *) body, &dest_size,
                     (Bytef *) VARDATA(data), VARSIZE(data) - VARHDRSZ);
    if ( res != Z_OK ) {
        ereport(ERROR, (errcode(ERRCODE_DATA_EXCEPTION), errmsg("can't decode value")));
    }
    SET_VARSIZE(udata, dest_size + VARHDRSZ);

    /* Swap each word exactly once: network order -> host order. */
    count = dest_size / 4;
    for (i = 0; i < count; i++) {
        body[i] = ntohl(body[i]);
    }
    PG_RETURN_BYTEA_P(udata);
}

PG_FUNCTION_INFO_V1(hll_count);
Datum hll_count(PG_FUNCTION_ARGS);
Datum hll_count(PG_FUNCTION_ARGS) {
bytea *arg = PG_GETARG_BYTEA_P(0);
uint32_t *data = (uint32_t *) VARDATA(arg);
Expand All @@ -110,20 +119,91 @@ Datum hll_count(PG_FUNCTION_ARGS) {
}

PG_FUNCTION_INFO_V1(hll_merge);
Datum hll_merge(PG_FUNCTION_ARGS);

/*
 * Transition-style merge of two decoded sketches.
 *
 * Argument 0: running state (bytea), may be NULL before the first row.
 * Argument 1: next decoded value (bytea), may be NULL.
 * Returns:    the value itself when there is no state yet; otherwise the
 *             state after merge_sets() has folded the value into it.
 *             A NULL value leaves the state untouched.
 *
 * Word 0 of the state is used as a shift amount to derive the count handed
 * to merge_sets(); the data merged starts at word 2 of both buffers.
 *
 * NOTE(review): the state buffer is written in place by merge_sets(); this
 * presumably relies on the function only being used as an aggregate
 * transition function -- confirm against the SQL declaration.
 */
Datum hll_merge(PG_FUNCTION_ARGS) {
    bytea *state;
    bytea *value;
    uint32_t *sdata;
    uint32_t *vdata;

    /* Nothing to merge: hand back whatever state we already have. */
    if ( PG_ARGISNULL(1) ) {
        if ( PG_ARGISNULL(0) ) {
            PG_RETURN_NULL();
        }
        PG_RETURN_DATUM(PG_GETARG_DATUM(0));
    }

    value = PG_GETARG_BYTEA_P(1);

    /* First non-NULL row becomes the initial state. */
    if ( PG_ARGISNULL(0) ) {
        PG_RETURN_BYTEA_P(value);
    }

    state = PG_GETARG_BYTEA_P(0);
    sdata = (uint32_t *) VARDATA(state);
    vdata = (uint32_t *) VARDATA(value);

    merge_sets(1 << sdata[0], vdata + 2, sdata + 2);
    PG_RETURN_BYTEA_P(state);
}

PG_FUNCTION_INFO_V1(hll_sum);
Datum hll_sum(PG_FUNCTION_ARGS);

/*
 * Transition function of the hll_sum aggregate.
 *
 * Argument 0: running dmerge_state (internal), NULL before the first row.
 * Argument 1: zlib-compressed sketch (bytea) whose 32-bit words are in
 *             network byte order; may be NULL, in which case the state is
 *             returned unchanged.
 * Returns:    pointer to the dmerge_state, allocated once per group in the
 *             aggregate memory context.
 * Errors:     ERRCODE_DATA_EXCEPTION when called outside an aggregate or
 *             when the payload cannot be inflated into HLL_LEN words.
 *
 * The first row is decoded straight into state->state; every later row is
 * decoded into the state->value scratch buffer and merged into state->state,
 * so no per-row allocation is needed.
 */
Datum hll_sum(PG_FUNCTION_ARGS) {
    MemoryContext aggctx;
    dmerge_state *state;
    uint32_t *value;
    bytea *data;
    uLongf dest_size = HLL_LEN * 4;
    int unpack_res;
    int is_first;
    uint32_t count;
    uint32_t i;

    if (!AggCheckCallContext(fcinfo, &aggctx))
        ereport(ERROR,
                (errcode(ERRCODE_DATA_EXCEPTION),
                 errmsg("hll_sum outside transition context")));

    is_first = PG_ARGISNULL(0);

    /* The SQL declaration is not STRICT, so NULL rows do reach this point. */
    if ( PG_ARGISNULL(1) ) {
        if ( is_first ) {
            PG_RETURN_NULL();
        }
        PG_RETURN_POINTER(PG_GETARG_POINTER(0));
    }
    data = PG_GETARG_BYTEA_P(1);

    if ( is_first ) {
        /*
         * Allocate directly in the aggregate context so the state survives
         * across calls. Zero-fill it so a payload shorter than the buffer
         * never leaves uninitialised words behind.
         */
        state = (dmerge_state *) MemoryContextAllocZero(aggctx, sizeof(dmerge_state));
        value = state->state;
    } else {
        state = (dmerge_state *) PG_GETARG_POINTER(0);
        value = state->value;
    }

    /* VARSIZE() counts the varlena header; only the payload is compressed data. */
    unpack_res = uncompress((Bytef *) value, &dest_size,
                            (Bytef *) VARDATA(data), VARSIZE(data) - VARHDRSZ);
    if ( unpack_res != Z_OK ) {
        ereport(ERROR, (errcode(ERRCODE_DATA_EXCEPTION), errmsg("can't decode value")));
    }

    /* Network order -> host order for every decoded word. */
    count = dest_size / 4;
    for (i = 0; i < count; i++) {
        value[i] = ntohl(value[i]);
    }

    /*
     * NOTE(review): word 0 of the accumulated sketch is trusted as a shift
     * amount here; a range check against HLL_LEN is probably warranted, but
     * the exact count-to-words relation lives in merge_sets() -- confirm.
     */
    if ( !is_first ) {
        merge_sets(1 << state->state[0], value + 2, state->state + 2);
    }

    PG_RETURN_POINTER(state);
}

PG_FUNCTION_INFO_V1(hll_sum_fin);
Datum hll_sum_fin(PG_FUNCTION_ARGS);
Datum hll_sum_fin(PG_FUNCTION_ARGS) {
int64 result = 0;
dmerge_state *state;

if (!PG_ARGISNULL(0)) {
state = (dmerge_state *) PG_GETARG_POINTER(0);
result = cardinality(1 << state->state[0], state->state + 2, 1);
}

PG_RETURN_INT64(result);
}
3 changes: 2 additions & 1 deletion test.py
Expand Up @@ -35,6 +35,7 @@ def create_big_table(cursor):
#conn.commit()

s = time()
cursor.execute('SELECT hll_count(hll_merge(hll_decode(data))) from hll_big_test')
#cursor.execute('SELECT hll_count(hll_merge(hll_decode(data))) from hll_big_test')
cursor.execute('SELECT hll_sum(data) from hll_big_test')
print cursor.fetchone()
print time() - s

0 comments on commit e4bec73

Please sign in to comment.