Skip to content

Commit

Permalink
get multi support
Browse files Browse the repository at this point in the history
  • Loading branch information
frsyuki committed Jun 27, 2009
1 parent 5b6871f commit dfa2745
Show file tree
Hide file tree
Showing 4 changed files with 182 additions and 36 deletions.
164 changes: 154 additions & 10 deletions src/gate_memtext_retrieval.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
#include "gate_memtext_impl.h"
#include "gate_memtext_retrieval.h"

#ifndef MEMTEXT_MULTI_MAX
#define MEMTEXT_MULTI_MAX 1024
#endif

namespace memxy {
namespace memtext {

Expand All @@ -41,16 +45,16 @@ static int request_get_single(void* user,
memcached_return err;
const char* const key = r->key[0];
size_t const keylen = r->key_len[0];
char* val;
size_t vallen;
uint32_t flags;
char* val;

{
proxy_client::ref mc( proxy_client::get() );
val = memcached_get(*mc, key, keylen, &vallen, &flags, &err);
}

if(err) {
if(err && err != MEMCACHED_NOTFOUND) {
send_error(fd, err);
return 0;
}
Expand All @@ -76,23 +80,163 @@ static int request_get_single(void* user,
p[0] = '\r'; p[1] = '\n'; p += 2;
}

struct iovec vb[3];
vb[0].iov_base = header;
vb[0].iov_len = p - header;
vb[1].iov_base = const_cast<char*>(val);
vb[1].iov_len = vallen;
vb[2].iov_base = const_cast<char*>("\r\nEND\r\n");
vb[2].iov_len = 7;
struct iovec vec[3];
vec[0].iov_base = header;
vec[0].iov_len = p - header;
vec[1].iov_base = const_cast<char*>(val);
vec[1].iov_len = vallen;
vec[2].iov_base = const_cast<char*>("\r\nEND\r\n");
vec[2].iov_len = 7;

core::writev(fd, vb, 3, &::free, header);
core::writev(fd, vec, 3, &::free, header);
return 0;
}


class multi_set {
public:
multi_set() : val(NULL), vallen(0), flags(0) { }
~multi_set() { ::free(val); }

char* val;
size_t vallen;
uint32_t flags;

private:
multi_set(const multi_set&);
};

class multi_set_carry {
public:
multi_set_carry(multi_set* set, size_t num, size_t buffer_size) :
m_alloc((char*)::malloc(sizeof(char**)*num + buffer_size)),
m_num(num)
{
if(!m_alloc) { throw std::bad_alloc(); }
for(size_t i=0; i < num; ++i) {
((char**)m_alloc)[i] = set[i].val;
set[i].val = NULL;
}
}

~multi_set_carry()
{
for(size_t i=0; i < m_num; ++i) {
::free( ((char**)m_alloc)[i] );
}
free(m_alloc);
}

char* buffer() { return m_alloc + sizeof(char**)*m_num; }

char* operator[] (size_t i) { return ((char**)m_alloc)[i]; }

private:
char* m_alloc;
size_t m_num;
multi_set_carry(const multi_set&);
};

static int request_get_multi(void* user,
memtext_command cmd,
memtext_request_retrieval* r,
bool require_cas)
{
int fd = CAST_USER(user);

if(r->key_num > MEMTEXT_MULTI_MAX) {
send_error(fd, MEMCACHED_CLIENT_ERROR);
return 0;
}

memcached_return err;
multi_set multi[r->key_num];
size_t found_keys = 0;
size_t total_keylen = 0;

{
proxy_client::ref mc( proxy_client::get() );
err = memcached_mget(*mc, r->key, r->key_len, r->key_num);

if(err) {
send_error(fd, err);
return 0;
}

for(unsigned int i=0; i < r->key_num; ++i) {
multi[i].val = memcached_fetch(*mc, r->key[i], &r->key_len[i],
&multi[i].vallen, &multi[i].flags, &err);

if(err) {
if(err == MEMCACHED_NOTFOUND) { continue; }
if(err == MEMCACHED_END) { break; }
send_error(fd, err);
return 0;
}

if(multi[i].val) {
++found_keys;
total_keylen += r->key_len[i];
}
}
}

if(found_keys == 0) {
send_static(fd, "END\r\n");
return 0;
}

std::auto_ptr<multi_set_carry> carry( new multi_set_carry(
multi, sizeof(multi)/sizeof(multi_set),
found_keys*HEADER_SIZE(0) + total_keylen) );

char* header = carry->buffer();
char* p = header;
struct iovec vec[found_keys*2 + 1]; // +1: last END
struct iovec* pv = vec;

unsigned int i;
for(i=0; i < r->key_num; ++i) {
if( (*carry)[i] != NULL ) {
break;
}
}

pv->iov_base = p;
memcpy(p, "VALUE ", 6); p += 6;
goto header_set;

for(; i < r->key_num; ++i) {
if( (*carry)[i] == NULL ) {
continue;
}

pv->iov_base = p;
memcpy(p, "\r\nEND\r\nVALUE ", 13); p += 13;
header_set:
memcpy(p, r->key[i], r->key_len[i]); p += r->key_len[i];
p += sprintf(p, " %"PRIu32" %lu", multi[i].flags, multi[i].vallen);
if(require_cas) {
// FIXME cas
p += sprintf(p, " %"PRIu64"\r\n", (uint64_t)0);
} else {
p[0] = '\r'; p[1] = '\n'; p += 2;
}
pv->iov_len = p - header;
header = p;
++pv;

pv->iov_base = (*carry)[i];
pv->iov_len = multi[i].vallen;
++pv;
}

pv->iov_base = (void*)"\r\nEND\r\n";
pv->iov_len = 7;
++pv;

core::writev(fd, vec, sizeof(vec)/sizeof(iovec), carry);

return 0;
}

Expand Down
34 changes: 17 additions & 17 deletions src/memproto/memtext.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,42 +54,42 @@ typedef enum {


typedef struct {
const char** key;
unsigned* key_len;
char** key;
size_t* key_len;
unsigned key_num;
} memtext_request_retrieval;

typedef struct {
const char* key;
unsigned key_len;
const char* data;
unsigned data_len;
char* key;
size_t key_len;
char* data;
size_t data_len;
unsigned short flags;
uint32_t exptime;
bool noreply;
} memtext_request_storage;

typedef struct {
const char* key;
unsigned key_len;
const char* data;
unsigned data_len;
char* key;
size_t key_len;
char* data;
size_t data_len;
unsigned short flags;
uint32_t exptime;
bool noreply;
uint64_t cas_unique;
} memtext_request_cas;

typedef struct {
const char* key;
unsigned key_len;
char* key;
size_t key_len;
uint32_t exptime;
bool noreply;
} memtext_request_delete;

typedef struct {
const char* key;
unsigned key_len;
char* key;
size_t key_len;
uint64_t value;
bool noreply;
} memtext_request_numeric;
Expand Down Expand Up @@ -138,7 +138,7 @@ typedef struct {
memtext_command command;

size_t key_pos[MEMTEXT_MAX_MULTI_GET];
unsigned int key_len[MEMTEXT_MAX_MULTI_GET];
size_t key_len[MEMTEXT_MAX_MULTI_GET];
unsigned int keys;

size_t flags;
Expand All @@ -148,15 +148,15 @@ typedef struct {
uint64_t cas_unique;

size_t data_pos;
unsigned int data_len;
size_t data_len;

memtext_callback callback;

void* user;
} memtext_parser;

void memtext_init(memtext_parser* ctx, memtext_callback* callback, void* user);
int memtext_execute(memtext_parser* ctx, const char* data, size_t len, size_t* off);
int memtext_execute(memtext_parser* ctx, char* data, size_t len, size_t* off);

#ifdef __cplusplus
}
Expand Down
12 changes: 6 additions & 6 deletions src/memproto/memtext.rl
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@
CALLBACK(cb, memtext_callback_retrieval);
if(cb) {
memtext_request_retrieval req = {
(const char**)ctx->key_pos,
(char**)ctx->key_pos,
ctx->key_len,
ctx->keys
};
Expand Down Expand Up @@ -301,17 +301,17 @@ void memtext_init(memtext_parser* ctx, memtext_callback* callback, void* user)
ctx->user = user;
}

int memtext_execute(memtext_parser* ctx, const char* data, size_t len, size_t* off)
int memtext_execute(memtext_parser* ctx, char* data, size_t len, size_t* off)
{
if(len <= *off) { return 0; }

const char* p = data + *off;
const char* pe = data + len;
const char* eof = pe;
char* p = data + *off;
char* pe = data + len;
char* eof = pe;
int cs = ctx->cs;
int top = ctx->top;
int* stack = ctx->stack;
const char* pos;
char* pos;
char numbuf[NUM_BUF_MAX+1];

//printf("execute, len:%lu, off:%lu\n", len, *off);
Expand Down
8 changes: 5 additions & 3 deletions src/mpsrc/wavy_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,11 @@ class thread_init {

void wait_init()
{
pthread_scoped_lock lk(m_init->start_mutex);
while(!m_init->started) {
m_init->start_cond.wait(m_init->start_mutex);
{
pthread_scoped_lock lk(m_init->start_mutex);
while(!m_init->started) {
m_init->start_cond.wait(m_init->start_mutex);
}
}
m_init.reset();
}
Expand Down

0 comments on commit dfa2745

Please sign in to comment.