Skip to content

Commit

Permalink
Backport binary TOUCH/GAT/GATQ commands
Browse files Browse the repository at this point in the history
Taken from the 1.6 branch, partly written by Trond. I hope the CAS handling is
correct.
  • Loading branch information
dormando committed Sep 27, 2011
1 parent 51c8f31 commit d87f568
Show file tree
Hide file tree
Showing 11 changed files with 249 additions and 4 deletions.
8 changes: 8 additions & 0 deletions items.c
Expand Up @@ -531,6 +531,14 @@ item *do_item_get(const char *key, const size_t nkey) {
return it;
}

item *do_item_touch(const char *key, size_t nkey, uint32_t exptime) {
item *it = do_item_get(key, nkey);
if (it != NULL) {
it->exptime = exptime;
}
return it;
}

/** returns an item whether or not it's expired. */
item *do_item_get_nocheck(const char *key, const size_t nkey) {
item *it = assoc_find(key, nkey);
Expand Down
1 change: 1 addition & 0 deletions items.h
Expand Up @@ -21,5 +21,6 @@ void do_item_flush_expired(void);

item *do_item_get(const char *key, const size_t nkey);
item *do_item_get_nocheck(const char *key, const size_t nkey);
item *do_item_touch(const char *key, const size_t nkey, uint32_t exptime);
void item_stats_reset(void);
extern pthread_mutex_t cache_lock;
91 changes: 91 additions & 0 deletions memcached.c
Expand Up @@ -166,6 +166,7 @@ static rel_time_t realtime(const time_t exptime) {
static void stats_init(void) {
stats.curr_items = stats.total_items = stats.curr_conns = stats.total_conns = stats.conn_structs = 0;
stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = stats.reclaimed = 0;
stats.touch_cmds = stats.touch_misses = 0;
stats.curr_bytes = stats.listen_disabled_num = 0;
stats.accepting_conns = true; /* assuming we start in this state. */

Expand Down Expand Up @@ -1178,6 +1179,78 @@ static void complete_update_bin(conn *c) {
c->item = 0;
}

static void process_bin_touch(conn *c) {
item *it;

protocol_binary_response_get* rsp = (protocol_binary_response_get*)c->wbuf;
char* key = binary_get_key(c);
size_t nkey = c->binary_header.request.keylen;
protocol_binary_request_touch *t = (void *)&c->binary_header;
uint32_t exptime = ntohl(t->message.body.expiration);

if (settings.verbose > 1) {
int ii;
/* May be GAT/GATQ/etc */
fprintf(stderr, "<%d TOUCH ", c->sfd);
for (ii = 0; ii < nkey; ++ii) {
fprintf(stderr, "%c", key[ii]);
}
fprintf(stderr, "\n");
}

it = item_touch(key, nkey, exptime);

if (it) {
/* the length has two unnecessary bytes ("\r\n") */
uint16_t keylen = 0;
uint32_t bodylen = sizeof(rsp->message.body) + (it->nbytes - 2);

pthread_mutex_lock(&c->thread->stats.mutex);
c->thread->stats.touch_cmds++;
c->thread->stats.slab_stats[it->slabs_clsid].touch_hits++;
pthread_mutex_unlock(&c->thread->stats.mutex);

MEMCACHED_COMMAND_TOUCH(c->sfd, ITEM_key(it), it->nkey,
it->nbytes, ITEM_get_cas(it));

if (c->cmd == PROTOCOL_BINARY_CMD_TOUCH) {
bodylen -= it->nbytes - 2;
}

add_bin_header(c, 0, sizeof(rsp->message.body), keylen, bodylen);
rsp->message.header.response.cas = htonll(ITEM_get_cas(it));

// add the flags
rsp->message.body.flags = htonl(strtoul(ITEM_suffix(it), NULL, 10));
add_iov(c, &rsp->message.body, sizeof(rsp->message.body));

/* Add the data minus the CRLF */
if (c->cmd != PROTOCOL_BINARY_CMD_TOUCH) {
add_iov(c, ITEM_data(it), it->nbytes - 2);
}
conn_set_state(c, conn_mwrite);
/* Remember this command so we can garbage collect it later */
c->item = it;
} else {
pthread_mutex_lock(&c->thread->stats.mutex);
c->thread->stats.touch_cmds++;
c->thread->stats.touch_misses++;
pthread_mutex_unlock(&c->thread->stats.mutex);

MEMCACHED_COMMAND_TOUCH(c->sfd, key, nkey, -1, 0);

if (c->noreply) {
conn_set_state(c, conn_new_cmd);
} else {
write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
}
}

if (settings.detail_enabled) {
stats_prefix_record_get(key, nkey, NULL != it);
}
}

static void process_bin_get(conn *c) {
item *it;

Expand Down Expand Up @@ -1736,6 +1809,9 @@ static void dispatch_bin_command(conn *c) {
case PROTOCOL_BINARY_CMD_GETKQ:
c->cmd = PROTOCOL_BINARY_CMD_GETK;
break;
case PROTOCOL_BINARY_CMD_GATQ:
c->cmd = PROTOCOL_BINARY_CMD_GATQ;
break;
default:
c->noreply = false;
}
Expand Down Expand Up @@ -1837,6 +1913,15 @@ static void dispatch_bin_command(conn *c) {
protocol_error = 1;
}
break;
case PROTOCOL_BINARY_CMD_TOUCH:
case PROTOCOL_BINARY_CMD_GAT:
case PROTOCOL_BINARY_CMD_GATQ:
if (extlen == 4 && keylen != 0) {
bin_read_key(c, bin_reading_touch_key, 4);
} else {
protocol_error = 1;
}
break;
default:
write_bin_error(c, PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND, bodylen);
}
Expand Down Expand Up @@ -2064,6 +2149,9 @@ static void complete_nread_binary(conn *c) {
case bin_reading_get_key:
process_bin_get(c);
break;
case bin_reading_touch_key:
process_bin_touch(c);
break;
case bin_reading_stat:
process_bin_stat(c);
break;
Expand Down Expand Up @@ -2413,6 +2501,7 @@ static void server_stats(ADD_STAT add_stats, conn *c) {
APPEND_STAT("cmd_get", "%llu", (unsigned long long)thread_stats.get_cmds);
APPEND_STAT("cmd_set", "%llu", (unsigned long long)slab_stats.set_cmds);
APPEND_STAT("cmd_flush", "%llu", (unsigned long long)thread_stats.flush_cmds);
APPEND_STAT("cmd_touch", "%llu", (unsigned long long)thread_stats.touch_cmds);
APPEND_STAT("get_hits", "%llu", (unsigned long long)slab_stats.get_hits);
APPEND_STAT("get_misses", "%llu", (unsigned long long)thread_stats.get_misses);
APPEND_STAT("delete_misses", "%llu", (unsigned long long)thread_stats.delete_misses);
Expand All @@ -2424,6 +2513,8 @@ static void server_stats(ADD_STAT add_stats, conn *c) {
APPEND_STAT("cas_misses", "%llu", (unsigned long long)thread_stats.cas_misses);
APPEND_STAT("cas_hits", "%llu", (unsigned long long)slab_stats.cas_hits);
APPEND_STAT("cas_badval", "%llu", (unsigned long long)slab_stats.cas_badval);
APPEND_STAT("touch_hits", "%llu", (unsigned long long)slab_stats.touch_hits);
APPEND_STAT("touch_misses", "%llu", (unsigned long long)thread_stats.touch_misses);
APPEND_STAT("auth_cmds", "%llu", (unsigned long long)thread_stats.auth_cmds);
APPEND_STAT("auth_errors", "%llu", (unsigned long long)thread_stats.auth_errors);
APPEND_STAT("bytes_read", "%llu", (unsigned long long)thread_stats.bytes_read);
Expand Down
10 changes: 9 additions & 1 deletion memcached.h
Expand Up @@ -162,7 +162,8 @@ enum bin_substates {
bin_reading_incr_header,
bin_read_flush_exptime,
bin_reading_sasl_auth,
bin_reading_sasl_auth_data
bin_reading_sasl_auth_data,
bin_reading_touch_key,
};

enum protocol {
Expand Down Expand Up @@ -201,6 +202,7 @@ typedef unsigned int rel_time_t;
struct slab_stats {
uint64_t set_cmds;
uint64_t get_hits;
uint64_t touch_hits;
uint64_t delete_hits;
uint64_t cas_hits;
uint64_t cas_badval;
Expand All @@ -215,6 +217,8 @@ struct thread_stats {
pthread_mutex_t mutex;
uint64_t get_cmds;
uint64_t get_misses;
uint64_t touch_cmds;
uint64_t touch_misses;
uint64_t delete_misses;
uint64_t incr_misses;
uint64_t decr_misses;
Expand All @@ -241,8 +245,11 @@ struct stats {
unsigned int conn_structs;
uint64_t get_cmds;
uint64_t set_cmds;
uint64_t touch_cmds;
uint64_t get_hits;
uint64_t get_misses;
uint64_t touch_hits;
uint64_t touch_misses;
uint64_t evictions;
uint64_t reclaimed;
time_t started; /* when the process was started */
Expand Down Expand Up @@ -476,6 +483,7 @@ item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbyt
char *item_cachedump(const unsigned int slabs_clsid, const unsigned int limit, unsigned int *bytes);
void item_flush_expired(void);
item *item_get(const char *key, const size_t nkey);
item *item_touch(const char *key, const size_t nkey, uint32_t exptime);
int item_link(item *it);
void item_remove(item *it);
int item_replace(item *it, item *new_it);
Expand Down
42 changes: 42 additions & 0 deletions protocol_binary.h
Expand Up @@ -105,6 +105,9 @@ extern "C"
PROTOCOL_BINARY_CMD_FLUSHQ = 0x18,
PROTOCOL_BINARY_CMD_APPENDQ = 0x19,
PROTOCOL_BINARY_CMD_PREPENDQ = 0x1a,
PROTOCOL_BINARY_CMD_TOUCH = 0x1c,
PROTOCOL_BINARY_CMD_GAT = 0x1d,
PROTOCOL_BINARY_CMD_GATQ = 0x1e,

PROTOCOL_BINARY_CMD_SASL_LIST_MECHS = 0x20,
PROTOCOL_BINARY_CMD_SASL_AUTH = 0x21,
Expand Down Expand Up @@ -381,6 +384,45 @@ extern "C"
*/
typedef protocol_binary_response_no_extras protocol_binary_response_stats;

/**
* Definition of the packet used by the touch command.
*/
typedef union {
struct {
protocol_binary_request_header header;
struct {
uint32_t expiration;
} body;
} message;
uint8_t bytes[sizeof(protocol_binary_request_header) + 4];
} protocol_binary_request_touch;

/**
* Definition of the packet returned from the touch command
*/
typedef protocol_binary_response_no_extras protocol_binary_response_touch;

/**
* Definition of the packet used by the GAT(Q) command.
*/
typedef union {
struct {
protocol_binary_request_header header;
struct {
uint32_t expiration;
} body;
} message;
uint8_t bytes[sizeof(protocol_binary_request_header) + 4];
} protocol_binary_request_gat;

typedef protocol_binary_request_gat protocol_binary_request_gatq;

/**
* Definition of the packet returned from the GAT(Q)
*/
typedef protocol_binary_response_get protocol_binary_response_gat;
typedef protocol_binary_response_get protocol_binary_response_gatq;

/**
* Definition of a request for a range operation.
* See http://code.google.com/p/memcached/wiki/RangeOps
Expand Down
3 changes: 2 additions & 1 deletion slabs.c
Expand Up @@ -377,7 +377,8 @@ static void do_slabs_stats(ADD_STAT add_stats, void *c) {
(unsigned long long)thread_stats.slab_stats[i].cas_hits);
APPEND_NUM_STAT(i, "cas_badval", "%llu",
(unsigned long long)thread_stats.slab_stats[i].cas_badval);

APPEND_NUM_STAT(i, "touch_hits", "%llu",
(unsigned long long)thread_stats.slab_stats[i].touch_hits);
total++;
}
}
Expand Down
43 changes: 42 additions & 1 deletion t/binary.t
Expand Up @@ -2,7 +2,7 @@

use strict;
use warnings;
use Test::More tests => 3376;
use Test::More tests => 3435;
use FindBin qw($Bin);
use lib "$Bin/lib";
use MemcachedTest;
Expand Down Expand Up @@ -41,6 +41,9 @@ use constant CMD_QUITQ => 0x17;
use constant CMD_FLUSHQ => 0x18;
use constant CMD_APPENDQ => 0x19;
use constant CMD_PREPENDQ => 0x1A;
use constant CMD_TOUCH => 0x1C;
use constant CMD_GAT => 0x1D;
use constant CMD_GATQ => 0x1E;

# REQ and RES formats are divided even though they currently share
# the same format, since they _could_ differ in the future.
Expand Down Expand Up @@ -237,6 +240,23 @@ is($mc->decr("x", 211), 0, "Floor is zero");
}
}

# diag "Touch commands";
{
$mc->flush;
$mc->set("totouch", "toast", 0, 1);
my $res = $mc->touch("totouch", 10);
sleep 2;
$check->("totouch", 0, "toast");

$mc->set("totouch", "toast2", 0, 1);
my ($flags, $val, $i) = $mc->gat("totouch", 10);
is($val, "toast2", "GAT returned correct value");
sleep 2;
$check->("totouch", 0, "toast2");

# Test miss as well
}

# diag "Silent set.";
$mc->silent_mutation(::CMD_SETQ, 'silentset', 'silentsetval');

Expand Down Expand Up @@ -681,6 +701,27 @@ sub get_multi {
return \%return;
}

sub touch {
my $self = shift;
my ($key, $expire) = @_;
my $extra_header = pack "N", $expire;
my $cas = 0;
return $self->_do_command(::CMD_TOUCH, $key, '', $extra_header, $cas);
}

sub gat {
my $self = shift;
my $key = shift;
my $expire = shift;
my $extra_header = pack "N", $expire;
my ($rv, $cas) = $self->_do_command(::CMD_GAT, $key, '', $extra_header);

my $header = substr $rv, 0, 4, '';
my $flags = unpack("N", $header);

return ($flags, $rv, $cas);
}

sub version {
my $self = shift;
return $self->_do_command(::CMD_VERSION, '', '');
Expand Down
2 changes: 1 addition & 1 deletion t/stats.t
Expand Up @@ -58,7 +58,7 @@ my $sock = $server->sock;
my $stats = mem_stats($sock);

# Test number of keys
is(scalar(keys(%$stats)), 39, "39 stats values");
is(scalar(keys(%$stats)), 42, "42 stats values");

# Test initial state
foreach my $key (qw(curr_items total_items bytes cmd_get cmd_set get_hits evictions get_misses
Expand Down

0 comments on commit d87f568

Please sign in to comment.