diff --git a/.cvsignore b/.gitignore similarity index 100% rename from .cvsignore rename to .gitignore diff --git a/Changes b/Changes index 98a1808..cb97610 100644 --- a/Changes +++ b/Changes @@ -1,5 +1,13 @@ Revision history for AnyEvent-Memcached +0.02_01 2009-12-18 + * Add gets/cas methods + * Add incget method + * Add options to rget + * Rewrite/enhance tests + * Remove excess dependencies + * Fix documentation + 0.02 2009-12-18 * First non-dev release diff --git a/MANIFEST b/MANIFEST index 82373aa..e5dd1eb 100644 --- a/MANIFEST +++ b/MANIFEST @@ -1,4 +1,4 @@ -.cvsignore +.gitignore Changes examples/test.pl inc/Module/AutoInstall.pm @@ -30,6 +30,8 @@ t/02-usage-memdb.t t/03-storable.t t/04-hashing.t t/05-hashing-with-next.t -t/99-dist.t t/check.pl +t/lib/Test/AE/MC.pm +t/lib/Test/AE/MD.pm t/pod.t +xt/99-dist.t diff --git a/MANIFEST.SKIP b/MANIFEST.SKIP index a4399a0..17f923d 100644 --- a/MANIFEST.SKIP +++ b/MANIFEST.SKIP @@ -29,4 +29,5 @@ ^dist/ ^makeall\.sh$ ^tmp/ +^t/lib/Test/AE/tdb.* ^AnyEvent-Memcached-.* \ No newline at end of file diff --git a/META.yml b/META.yml index 4641367..2aafbea 100644 --- a/META.yml +++ b/META.yml @@ -4,7 +4,6 @@ author: - 'Mons Anderson ' build_requires: ExtUtils::MakeMaker: 6.42 - Test::If: 0 Test::More: 0 Test::NoWarnings: 0 lib::abs: 0.90 @@ -22,19 +21,18 @@ no_index: - examples - inc - t + - xt provides: AnyEvent::Memcached: file: lib/AnyEvent/Memcached.pm - version: 0.02 + version: 0.02_01 requires: AnyEvent: 5.0 - AnyEvent::Connection: 0.02 - Devel::Leak::Cb: 0 - Object::Event: 1.101 + AnyEvent::Connection: 0.05 Storable: 0 String::CRC32: 0 common::sense: 2 perl: 5.8.8 resources: license: http://dev.perl.org/licenses/ -version: 0.02 +version: 0.02_01 diff --git a/Makefile.PL b/Makefile.PL index bc55804..fa07cb1 100644 --- a/Makefile.PL +++ b/Makefile.PL @@ -3,21 +3,19 @@ use inc::Module::Install; name 'AnyEvent-Memcached'; all_from 'lib/AnyEvent/Memcached.pm'; author 'Mons Anderson '; -version my $v = '0.02'; +version my $v = '0.02_01'; license 'perl'; build_requires 'Test::More'; -build_requires 'Test::If'; build_requires 'Test::NoWarnings'; -build_requires 'lib::abs', '0.90'; +build_requires 'lib::abs', '0.90'; requires 'common::sense', '2'; requires 'Storable'; -requires 'Object::Event', '1.101'; requires 'AnyEvent', '5.0'; -requires 'AnyEvent::Connection', '0.02'; +requires 'AnyEvent::Connection', '0.05'; requires 'String::CRC32'; -requires 'Devel::Leak::Cb'; +#requires 'Devel::Leak::Cb'; provides 'AnyEvent::Memcached' => { version => $v, diff --git a/README b/README index 205c67f..ab2b722 100644 --- a/README +++ b/README @@ -1,16 +1,6 @@ NAME AnyEvent::Memcached - AnyEvent memcached client -VERSION - Version 0.02 - -NOTICE - Interface is stabilized. Some features could be rewritten in future with - minor changes. - - Documentation is incomplete yet, look at tests, examples, sources for - now ;) - SYNOPSIS use AnyEvent::Memcached; @@ -20,6 +10,9 @@ SYNOPSIS compress_threshold => 10000, namespace => 'my-namespace:', + # May use another hashing algo: + hasher => 'AnyEvent::Memcached::Hash::WithNext', + cv => $cv, # AnyEvent->condvar: group callback ); @@ -32,13 +25,15 @@ SYNOPSIS shift or warn "Set failed: @_" } ); + # Single get $memd->get( 'key', cb => sub { my ($value,$err) = shift; $err and return warn "Get failed: @_"; warn "Value for key is $value"; } ); - $memd->mget( [ 'key1', 'key2' ], cb => sub { + # Multi-get + $memd->get( [ 'key1', 'key2' ], cb => sub { my ($values,$err) = shift; $err and return warn "Get failed: @_"; warn "Value for key1 is $values->{key1} and value for key2 is $values->{key2}" @@ -47,18 +42,60 @@ SYNOPSIS # Additionally there is rget (see memcachedb-1.2.1-beta) $memd->rget( 'fromkey', 'tokey', cb => sub { - my ($value,$err) = shift; + my ($values,$err) = shift; $err and warn "Get failed: @_"; + while (my ($key,$value) = each %$values) { + # ... + } } ); + # Rget with sorted responce values + $memd->rget( 'fromkey', 'tokey', rv => 'array' cb => sub { + my ($values,$err) = shift; + $err and warn "Get failed: @_"; + for (0 .. $#values/2) { + my ($key,$value) = @$values[$_*2,$_*2+1]; + } + } ); + +DESCRIPTION + Asyncronous "memcached/memcachedb" client for AnyEvent framework + +NOTICE + There is a notices in Cache::Memcached::AnyEvent related to this module. + They all has been fixed + + Prerequisites + We no longer need Object::Event and Devel::Leak::Cb. At all, the + dependency list is like in Cache::Memcached + AnyEvent + + Binary protocol + It seems to me, that usage of binary protocol from pure perl gives + very little advantage. So for now I don't implement it + + Unimplemented Methods + There is a note, that get_multi is not implementeted. In fact, it + was implemented by method "get", but the documentation was wrong. + + In general, this module follows the spirit of AnyEvent rather than + correspondence to Cache::Memcached interface. + METHODS new %args Currently supported options: servers =item namespace =item debug =item cv =item compress_threshold - =item compress_enable =item timeout =item noreply + =item compress_enable =item timeout =item hasher + If set, will use instance of this class for hashing instead of + default. For implementing your own hashing, see sources of + AnyEvent::Memcached::Hash and AnyEvent::Memcached::Hash::With::Next + + noreply If true, additional connection will established for noreply - commands. Beware, feature is under development and may be unstable + commands. + + cas If true, will enable cas/gets commands (since they are not suppotred + in memcachedb) set_servers Setup server list @@ -78,6 +115,28 @@ METHODS undef Error happens, see $err + cas( $key, $cas, $value, [cv => $cv], [ expire => $expire ], cb => $cb->( $rc, $err ) ) + $memd->gets($key, cb => sub { + my $value = shift; + unless (@_) { # No errors + my ($cas,$val) = @$value; + # Change your value in $val + $memd->cas( $key, $cas, $value, cb => sub { + my $rc = shift; + if ($rc) { + # stored + } else { + # ... + } + }); + } + }) + + $rc is the same, as for "set" + + Store the $value on the server under the $key, but only if CAS value + associated with this key is equal to $cas. See also "gets" + add( $key, $value, [cv => $cv], [ expire => $expire ], cb => $cb->( $rc, $err ) ) Like "set", but only stores in memcache if the key doesn't already exist. @@ -99,18 +158,20 @@ METHODS get( $key, [cv => $cv], [ expire => $expire ], cb => $cb->( $rc, $err ) ) Retrieve the value for a $key. $key should be a scalar - mget( $keys : ARRAYREF, [cv => $cv], [ expire => $expire ], cb => $cb->( $rc, $err ) ) - NOT IMPLEMENTED YET + get( $keys : ARRAYREF, [cv => $cv], [ expire => $expire ], cb => $cb->( $values_hash, $err ) ) + Retrieve the values for a $keys. Return a hash with keys/values - Retrieve the values for a $keys. + gets( $key, [cv => $cv], [ expire => $expire ], cb => $cb->( $rc, $err ) ) + Retrieve the value and its CAS for a $key. $key should be a scalar. - get_multi : Alias for mget. - NOT IMPLEMENTED YET + $rc is a reference to an array [$cas, $value], or nothing for + non-existent key gets( $keys : ARRAYREF, [cv => $cv], [ expire => $expire ], cb => $cb->( $rc, $err ) ) - Retrieve the value and its CAS for a $key. $key should be a scalar. + Retrieve the values and their CAS for a $keys. - NOT IMPLEMENTED YET + $rc is a hash reference with $rc->{$key} is a reference to an array + [$cas, $value] delete( $key, [cv => $cv], [ noreply => 1 ], cb => $cb->( $rc, $err ) ) Delete $key and its value from the cache. @@ -137,7 +198,7 @@ METHODS decr( $key, $decrement, [cv => $cv], [ noreply => 1 ], cb => $cb->( $rc, $err ) ) Opposite to "incr" - rget( $from, $till, [ max => 100 ], [ '+left' => 1 ], [ '+right' => 1 ], [cv => $cv], cb => $cb->( $rc, $err ) ) + rget( $from, $till, [ max => 100 ], [ '+left' => 1 ], [ '+right' => 1 ], [cv => $cv], [ rv => 'array' ], cb => $cb->( $rc, $err ) ) Memcachedb 1.2.1-beta implements rget method, that allows to look through the whole storage @@ -158,13 +219,18 @@ METHODS max Maximum number of results to fetch. 100 is the maximum and is the default + rv If passed rv => 'array', then the return value will be arrayref with + values in order, returned by memcachedb. + + incadd ( $key, $increment, [cv => $cv], [ noreply => 1 ], cb => $cb->( $rc, $err ) ) + Increment key, and if it not exists, add it with initial value. If add + fails, try again to incr or fail + destroy Shutdown object as much, as possible, incl cleaning of incapsulated objects BUGS - Since there is developer release, there may be a lot of bugs - Feature requests are welcome Bug reports are welcome diff --git a/dist/AnyEvent-Memcached-0.02_01.tar.gz b/dist/AnyEvent-Memcached-0.02_01.tar.gz new file mode 100644 index 0000000..18847e8 Binary files /dev/null and b/dist/AnyEvent-Memcached-0.02_01.tar.gz differ diff --git a/lib/AnyEvent/Memcached.pm b/lib/AnyEvent/Memcached.pm index eaff9aa..5faf07e 100644 --- a/lib/AnyEvent/Memcached.pm +++ b/lib/AnyEvent/Memcached.pm @@ -6,19 +6,9 @@ use 5.8.8; AnyEvent::Memcached - AnyEvent memcached client -=head1 VERSION - -Version 0.02 - -=head1 NOTICE - -Interface is stabilized. Some features could be rewritten in future with minor changes. - -Documentation is incomplete yet, look at tests, examples, sources for now ;) - =cut -our $VERSION = '0.02'; +our $VERSION = '0.02_01'; =head1 SYNOPSIS @@ -29,6 +19,9 @@ our $VERSION = '0.02'; debug => 1, compress_threshold => 10000, namespace => 'my-namespace:', + + # May use another hashing algo: + hasher => 'AnyEvent::Memcached::Hash::WithNext', cv => $cv, # AnyEvent->condvar: group callback ); @@ -42,13 +35,15 @@ our $VERSION = '0.02'; shift or warn "Set failed: @_" } ); + # Single get $memd->get( 'key', cb => sub { my ($value,$err) = shift; $err and return warn "Get failed: @_"; warn "Value for key is $value"; } ); - $memd->mget( [ 'key1', 'key2' ], cb => sub { + # Multi-get + $memd->get( [ 'key1', 'key2' ], cb => sub { my ($values,$err) = shift; $err and return warn "Get failed: @_"; warn "Value for key1 is $values->{key1} and value for key2 is $values->{key2}" @@ -57,10 +52,48 @@ our $VERSION = '0.02'; # Additionally there is rget (see memcachedb-1.2.1-beta) $memd->rget( 'fromkey', 'tokey', cb => sub { - my ($value,$err) = shift; + my ($values,$err) = shift; + $err and warn "Get failed: @_"; + while (my ($key,$value) = each %$values) { + # ... + } + } ); + + # Rget with sorted responce values + $memd->rget( 'fromkey', 'tokey', rv => 'array' cb => sub { + my ($values,$err) = shift; $err and warn "Get failed: @_"; + for (0 .. $#values/2) { + my ($key,$value) = @$values[$_*2,$_*2+1]; + } } ); +=head1 DESCRIPTION + +Asyncronous C client for L framework + +=head1 NOTICE + +There is a notices in L related to this module. They all has been fixed + +=over 4 + +=item Prerequisites + +We no longer need L and L. At all, the dependency list is like in L + L + +=item Binary protocol + +It seems to me, that usage of binary protocol from pure perl gives very little advantage. So for now I don't implement it + +=item Unimplemented Methods + +There is a note, that get_multi is not implementeted. In fact, it was implemented by method L, but the documentation was wrong. + +=back + +In general, this module follows the spirit of L rather than correspondence to L interface. + =cut use common::sense 2;m{ @@ -69,15 +102,14 @@ use warnings; }x; use Carp; -use Devel::Leak::Cb; use AnyEvent 5; +#use Devel::Leak::Cb; use AnyEvent::Socket; use AnyEvent::Handle; use AnyEvent::Connection; +use AnyEvent::Connection::Util; use AnyEvent::Memcached::Conn; -use base 'Object::Event'; -use String::CRC32; use Storable (); use AnyEvent::Memcached::Peer; @@ -96,10 +128,6 @@ BEGIN { $HAVE_ZLIB = eval "use Compress::Zlib (); 1;"; } -sub _hash($) { - return (crc32($_[0]) >> 16) & 0x7fff; -} - =head1 METHODS =head2 new %args @@ -115,9 +143,18 @@ Currently supported options: =item compress_threshold =item compress_enable =item timeout +=item hasher + +If set, will use instance of this class for hashing instead of default. +For implementing your own hashing, see sources of L and L + =item noreply -If true, additional connection will established for noreply commands. Beware, feature is under development and may be unstable +If true, additional connection will established for noreply commands. + +=item cas + +If true, will enable cas/gets commands (since they are not suppotred in memcachedb) =back @@ -127,7 +164,7 @@ sub new { my $self = bless {}, shift; my %args = @_; $self->{namespace} = exists $args{namespace} ? delete $args{namespace} : ''; - for (qw( debug cv compress_threshold compress_enable timeout noreply)) { + for (qw( debug cv compress_threshold compress_enable timeout noreply cas)) { $self->{$_} = exists $args{$_} ? delete $args{$_} : 0; } $self->{_bucker} = $args{bucker} || 'AnyEvent::Memcached::Buckets'; @@ -200,21 +237,10 @@ sub _handle_errors { } } -sub _p4k { - my $self = shift; - my $key = shift; - die "Should not be used (_p4k) at @{[ (caller)[1,2] ]}\n"; - my ($hv, $real_key) = ref $key ? - (int($key->[0]), $key->[1]) : - (_hash($key), $key); - my $bucket = $hv % $self->{bucketcount}; - return wantarray ? ( $self->{buckets}[$bucket], $real_key ) : $self->{buckets}[$bucket]; -} - sub _do { my $self = shift; - my $key = shift;utf8::encode($key) if utf8::is_utf8($key); - my $command = shift;utf8::encode($command) if utf8::is_utf8($command); + my $key = shift; utf8::decode($key) xor utf8::encode($key) if utf8::is_utf8($key); + my $command = shift; utf8::decode($command) xor utf8::encode($command) if utf8::is_utf8($command); my $worker = shift; # CODE my %args = @_; my $servers = $self->{hash}->servers($key); @@ -233,11 +259,12 @@ sub _do { if ($args{noreply}) { for my $srv ( keys %$servers ) { for my $real (@{ $servers->{$srv} }) { - my $cmd = sprintf($command, $real).' noreply'; + my $cmd = $command.' noreply'; + substr($cmd, index($cmd,'%s'),2) = $real; $self->{peers}{$srv}{nrc}->request($cmd); $self->{peers}{$srv}{lastnr} = $cmd; unless ($self->{peers}{$srv}{nrc}->handles('command')) { - $self->{peers}{$srv}{nrc}->reg_cb(command => cb { + $self->{peers}{$srv}{nrc}->reg_cb(command => sub { # cb { shift; warn "Got data from $srv noreply connection (while shouldn't): @_\nLast noreply command was $self->{peers}{$srv}{lastnr}\n"; }); @@ -250,7 +277,7 @@ sub _do { } $_ and $_->begin for $self->{cv}, $args{cv}; my $cv = AE::cv { - use Data::Dumper; + #use Data::Dumper; #warn Dumper $res,\%res,\%err; if ($res != -1) { $args{cb}($res); @@ -260,7 +287,7 @@ sub _do { $args{cb}($res{$key}); } else { - $args{cb}(undef, Dumper($err{$key})); + $args{cb}(undef, dumper($err{$key})); } #warn "cv end"; @@ -269,10 +296,11 @@ sub _do { for my $srv ( keys %$servers ) { for my $real (@{ $servers->{$srv} }) { $cv->begin; - my $cmd = sprintf $command, $real; + my $cmd = $command; + substr($cmd, index($cmd,'%s'),2) = $real; $self->{peers}{$srv}{con}->command( $cmd, - cb => cb { + cb => sub { # cb { if (defined( local $_ = shift )) { my ($ok,$fail) = $worker->($_); if (defined $ok) { @@ -299,11 +327,14 @@ sub _set { my $self = shift; my $cmd = shift; my $key = shift; + my $cas; + if ($cmd eq 'cas') { + $cas = shift; + } my $val = shift; my %args = @_; return $args{cb}(undef, "Readonly") if $self->{readonly}; #warn "cv begin"; - #(my $peer,$key) = $self->_p4k($key) or return $args{cb}(undef, "Peer dead???"); use bytes; # return bytes from length() @@ -333,8 +364,8 @@ sub _set { my $expire = int($args{expire} || 0); return $self->_do( $key, - "$cmd $self->{namespace}%s $flags $expire $len\015\012$val", - cb { + "$cmd $self->{namespace}%s $flags $expire $len".(defined $cas ? ' '.$cas : '')."\015\012$val", + sub { # cb { local $_ = shift; if ($_ eq 'STORED') { return 1 } elsif ($_ eq 'NOT_STORED') { return 0 } @@ -349,8 +380,6 @@ sub _set { my %err; my $res; my $cv = AE::cv { - use Data::Dumper; - warn Dumper $res,\%res,\%err; if ($res != -1) { $args{cb}($res); } @@ -359,7 +388,7 @@ sub _set { $args{cb}($res{$key}); } else { - $args{cb}(undef, Dumper($err{$key})); + $args{cb}(undef, dumper($err{$key})); } warn "cv end"; @@ -373,7 +402,7 @@ sub _set { $cv->begin; $self->{peers}{$srv}{con}->command( "$cmd $self->{namespace}$real $flags $expire $len\015\012$val", - cb => cb { + cb => sub { # cb { if (defined( local $_ = shift )) { if ($_ eq 'STORED') { $res{$real}{$srv} = 1; @@ -427,6 +456,28 @@ Error happens, see C<$err> =back +=head2 cas( $key, $cas, $value, [cv => $cv], [ expire => $expire ], cb => $cb->( $rc, $err ) ) + + $memd->gets($key, cb => sub { + my $value = shift; + unless (@_) { # No errors + my ($cas,$val) = @$value; + # Change your value in $val + $memd->cas( $key, $cas, $value, cb => sub { + my $rc = shift; + if ($rc) { + # stored + } else { + # ... + } + }); + } + }) + +C<$rc> is the same, as for L + +Store the C<$value> on the server under the C<$key>, but only if CAS value associated with this key is equal to C<$cas>. See also L + =head2 add( $key, $value, [cv => $cv], [ expire => $expire ], cb => $cb->( $rc, $err ) ) Like C, but only stores in memcache if the key doesn't already exist. @@ -450,6 +501,11 @@ B command first appeared in memcached 1.2.4. =cut sub set { shift->_set( set => @_) } +sub cas { + my $self = shift; + unless ($self->{cas}) { shift;shift;my %args = @_;return $args{cb}(undef, "CAS not enabled") } + $self->_set( cas => @_) +} sub add { shift->_set( add => @_) } sub replace { shift->_set( replace => @_) } sub append { shift->_set( append => @_) } @@ -459,21 +515,21 @@ sub prepend { shift->_set( prepend => @_) } Retrieve the value for a $key. $key should be a scalar -=head2 mget( $keys : ARRAYREF, [cv => $cv], [ expire => $expire ], cb => $cb->( $rc, $err ) ) +=head2 get( $keys : ARRAYREF, [cv => $cv], [ expire => $expire ], cb => $cb->( $values_hash, $err ) ) -B +Retrieve the values for a $keys. Return a hash with keys/values -Retrieve the values for a $keys. +=head2 gets( $key, [cv => $cv], [ expire => $expire ], cb => $cb->( $rc, $err ) ) -=head2 get_multi : Alias for mget. +Retrieve the value and its CAS for a $key. $key should be a scalar. -B +C<$rc> is a reference to an array [$cas, $value], or nothing for non-existent key =head2 gets( $keys : ARRAYREF, [cv => $cv], [ expire => $expire ], cb => $cb->( $rc, $err ) ) -Retrieve the value and its CAS for a $key. $key should be a scalar. +Retrieve the values and their CAS for a $keys. -B +C<$rc> is a hash reference with $rc->{$key} is a reference to an array [$cas, $value] =cut @@ -491,17 +547,20 @@ sub _deflate { if ($_->{flags} & F_STORABLE) { eval{ $_->{data} = Storable::thaw($_->{data}); 1 } or delete $_->{data}; } - $_ = $_->{data}; + if (exists $_->{cas}) { + $_ = [$_->{cas},$_->{data}]; + } else { + $_ = $_->{data}; + } } return; } -sub get { +sub _get { my $self = shift; - my ($cmd) = (caller(0))[3] =~ /([^:]+)$/; + my $cmd = shift; my $keys = shift; my %args = @_; - #$self->{connected} or return $self->connect( $cmd => $keys, \%args ); my $array; if (ref $keys and ref $keys eq 'ARRAY') { $array = 1; @@ -512,8 +571,6 @@ sub get { my %res; my $cv = AE::cv { $self->_deflate(\%res); - #use Data::Dumper; - #warn Dumper \%res; $args{cb}( $array ? \%res : $res{ $keys } ); $_ and $_->end for $args{cv}, $self->{cv}; }; @@ -521,13 +578,19 @@ sub get { #warn "server for $key = $srv, $self->{peers}{$srv}"; $cv->begin; my $keys = join(' ',map "$self->{namespace}$_", @{ $servers->{$srv} }); - $self->{peers}{$srv}{con}->request( "get $keys" ); - $self->{peers}{$srv}{con}->reader( id => $srv.'+'.$keys, res => \%res, namespace => $self->{namespace}, cb => cb { + $self->{peers}{$srv}{con}->request( "$cmd $keys" ); + $self->{peers}{$srv}{con}->reader( id => $srv.'+'.$keys, res => \%res, namespace => $self->{namespace}, cb => sub { # cb { $cv->end; }); } return; } +sub get { shift->_get(get => @_) } +sub gets { + my $self = shift; + unless ($self->{cas}) { shift;my %args = @_;return $args{cb}(undef, "CAS not enabled") } + $self->_get(gets => @_) +} =head2 delete( $key, [cv => $cv], [ noreply => 1 ], cb => $cb->( $rc, $err ) ) @@ -555,7 +618,7 @@ sub delete { return $self->_do( $key, "delete $self->{namespace}%s$time", - cb { + sub { # cb { local $_ = shift; if ($_ eq 'DELETED') { return 1 } elsif ($_ eq 'NOT_FOUND') { return 0 } @@ -564,32 +627,6 @@ sub delete { cb => $args{cb}, noreply => $args{noreply}, ); - (my $peer,$key) = $self->_p4k($key) or return $args{cb}(undef, "Peer dead???"); - - if ($args{noreply}) { - $self->{peers}{$peer}{con}->request("delete $self->{namespace}$key noreply"); - $args{cb}(1) if $args{cb}; - } else { - $_ and $_->begin for $self->{cv}, $args{cv}; - $self->{peers}{$peer}{con}->command( - "delete $self->{namespace}$key$time", - cb => cb { - if (defined( local $_ = shift )) { - if ($_ eq 'DELETED') { - $args{cb}(1); - } elsif ($_ eq 'NOT_FOUND') { - $args{cb}(0); - } else { - $args{cb}(undef,$_); - } - } else { - $args{cb}(undef, @_); - } - $_ and $_->end for $args{cv}, $self->{cv}; - } - ); - } - return; } *del = \&delete; *remove = \&delete; @@ -619,7 +656,7 @@ sub _delta { return $self->_do( $key, "$cmd $self->{namespace}%s $val", - cb { + sub { # cb { local $_ = shift; if ($_ eq 'NOT_FOUND') { return 0 } elsif (/^(\d+)$/) { return $1 eq '0' ? '0E0' : $_ } @@ -628,33 +665,6 @@ sub _delta { cb => $args{cb}, noreply => $args{noreply}, ); - (my $peer,$key) = $self->_p4k($key) or return $args{cb}(undef, "Peer dead???"); - if ($args{noreply}) { - $self->{peers}{$peer}{con}->request("$cmd $self->{namespace}$key $val noreply"); - $args{cb}(1) if $args{cb}; - } else { - $_ and $_->begin for $self->{cv}, $args{cv}; - $self->{peers}{$peer}{con}->command( - "$cmd $self->{namespace}$key $val", - cb => cb { - if (defined( local $_ = shift )) { - if ($_ eq 'NOT_FOUND') { - $args{cb}(undef); - } - elsif( /^(\d+)$/ ) { - $args{cb}($1 eq '0' ? '0E0' : $1); - } - else { - $args{cb}(undef,$_); - } - } else { - $args{cb}(undef, @_); - } - $_ and $_->end for $args{cv}, $self->{cv}; - } - ); - } - return; } sub incr { shift->_delta(@_) } sub decr { shift->_delta(@_) } @@ -669,7 +679,7 @@ sub decr { shift->_delta(@_) } # rget ($from,$till, '+left' => 1, '+right' => 0, max => 10, cb => sub { ... } ); -=head2 rget( $from, $till, [ max => 100 ], [ '+left' => 1 ], [ '+right' => 1 ], [cv => $cv], cb => $cb->( $rc, $err ) ) +=head2 rget( $from, $till, [ max => 100 ], [ '+left' => 1 ], [ '+right' => 1 ], [cv => $cv], [ rv => 'array' ], cb => $cb->( $rc, $err ) ) Memcachedb 1.2.1-beta implements rget method, that allows to look through the whole storage @@ -695,6 +705,10 @@ If true, then finishing key will be included in results. true by default Maximum number of results to fetch. 100 is the maximum and is the default +=item rv + +If passed rv => 'array', then the return value will be arrayref with values in order, returned by memcachedb. + =back =cut @@ -767,6 +781,45 @@ sub rget { return; } +=head2 incadd ( $key, $increment, [cv => $cv], [ noreply => 1 ], cb => $cb->( $rc, $err ) ) + +Increment key, and if it not exists, add it with initial value. If add fails, try again to incr or fail + +=cut + +sub incadd { + my $self = shift; + my $key = shift; + my $val = shift; + my %args = @_; + $self->incr($key => $val, cb => sub { + if (my $rc = shift or @_) { + #if (@_) { + # warn("incr failed: @_"); + #} else { + # warn "incr ok"; + #} + $args{cb}($rc, @_); + } + else { + $self->add( $key, $val, cb => sub { + if ( my $rc = shift or @_ ) { + #if (@_) { + # warn("add failed: @_"); + #} else { + # warn "add ok"; + #} + $args{cb}($val, @_); + } + else { + #warn "add failed, try again"; + $self->incr_add($key,$val,%args); + } + }); + } + }); +} + =head2 destroy Shutdown object as much, as possible, incl cleaning of incapsulated objects @@ -792,8 +845,6 @@ sub DESTROY { =head1 BUGS -Since there is developer release, there may be a lot of bugs - Feature requests are welcome Bug reports are welcome @@ -809,7 +860,6 @@ Copyright 2009 Mons Anderson, all rights reserved. This program is free software; you can redistribute it and/or modify it under the same terms as Perl itself. - =cut 1; # End of AnyEvent::Memcached diff --git a/lib/AnyEvent/Memcached/Conn.pm b/lib/AnyEvent/Memcached/Conn.pm index 65a1ee5..bce84b0 100644 --- a/lib/AnyEvent/Memcached/Conn.pm +++ b/lib/AnyEvent/Memcached/Conn.pm @@ -27,6 +27,10 @@ sub reader { undef $reader; $args{cb}( $result ); } + elsif (substr($_,0,5) eq 'ERROR') { + undef $reader; + $args{cb}( undef, $_ ); + } elsif (!length) { warn "Skip empty line"; $self->{h}->unshift_read( line => $reader); @@ -44,7 +48,7 @@ sub reader { my $v = { data => $data, flags => $flags, - $cas ? (cas => $cas) : (), + defined $cas ? (cas => $cas) : (), }; if ($ar) { push @$result, $key, $v; diff --git a/lib/AnyEvent/Memcached/Hash/WithNext.pm b/lib/AnyEvent/Memcached/Hash/WithNext.pm index e41e319..37750ba 100644 --- a/lib/AnyEvent/Memcached/Hash/WithNext.pm +++ b/lib/AnyEvent/Memcached/Hash/WithNext.pm @@ -1,8 +1,21 @@ package AnyEvent::Memcached::Hash::WithNext; +=head1 NAME + +AnyEvent::Memcached::Hash::WithNext - Hashing algorythm for AE::Memcached + +=head1 SYNOPSIS + + my $memd = AnyEvent::Memcached->new( + servers => [ "10.0.0.15:10001", "10.0.0.15:10002", "10.0.0.15:10003" ], + # ... + hasher => 'AnyEvent::Memcached::Hash::WithNext', + ); + $memd->set(key => "val", ...) # will put key on 2 servers + =head1 DESCRIPTION -Uses the same hashing, as default, but always put key to server, next after choosen. Rusult is twice-replicated data. Useful for usage with memcachdb +Uses the same hashing, as default, but always put key to server, next after choosen. Result is twice-replicated data. Useful for usage with memcachdb =cut @@ -24,4 +37,10 @@ sub peers { return $peers; } +=head1 AUTHOR + +Mons Anderson, C<< >> + +=cut + 1; \ No newline at end of file diff --git a/makeall.sh b/makeall.sh index bccf55d..3d392d2 100755 --- a/makeall.sh +++ b/makeall.sh @@ -4,18 +4,17 @@ MODULE=`perl -ne 'print $1 if m{all_from.+?([\w/.]+)}' Makefile.PL`; perl=perl $perl -v -rm -rf MANIFEST.bak MANIFEST Makefile.old && \ -echo > '_.tar.gz' && \ +rm -rf MANIFEST.bak Makefile.old && \ pod2text $MODULE > README && \ $perl -i -lpne 's{^\s+$}{};s{^ ((?: {8})+)}{" "x(4+length($1)/2)}se;' README && \ $perl Makefile.PL && \ -rm *.tar.gz && \ make manifest && \ +make && \ TEST_AUTHOR=1 make test && \ +TEST_AUTHOR=1 runprove 'xt/*.t' && \ make disttest && \ make dist && \ cp -f *.tar.gz dist/ && \ -perl tmp/cpants.pl && \ make clean && \ rm -rf MANIFEST.bak Makefile.old && \ echo "All is OK" diff --git a/t/01-usage-memd.t b/t/01-usage-memd.t index 5e46853..d8e5940 100644 --- a/t/01-usage-memd.t +++ b/t/01-usage-memd.t @@ -1,8 +1,9 @@ #!/usr/bin/env perl -w -use lib::abs; -our $noreply = 1; -our $testaddr = $ENV{MEMCACHED_SERVER} || "127.0.0.1:11211"; +use lib::abs 'lib','../lib';#, '../../AE-Cnn/lib'; +use Test::AE::MC; +use common::sense; + do + lib::abs::path('.').'/check.pl'; $@ and die; exit; require Test::NoWarnings; diff --git a/t/02-usage-memdb.t b/t/02-usage-memdb.t index 045d6f4..381e25b 100644 --- a/t/02-usage-memdb.t +++ b/t/02-usage-memdb.t @@ -1,8 +1,10 @@ #!/usr/bin/env perl -w -use lib::abs; -our $noreply = 0; -our $testaddr = $ENV{MEMCACHEDB_SERVER} || "127.0.0.1:21201"; # Default memcachedb port +use lib::abs 'lib','../lib';#, '../../AE-Cnn/lib'; +use Test::AE::MD; +use common::sense; + do + lib::abs::path('.').'/check.pl'; $@ and die; + exit; require Test::NoWarnings; diff --git a/t/03-storable.t b/t/03-storable.t index 633b140..69f7e32 100644 --- a/t/03-storable.t +++ b/t/03-storable.t @@ -1,53 +1,37 @@ #!/usr/bin/env perl -w -our $testaddr = $ENV{MEMCACHED_SERVER} || "127.0.0.1:21201"; # Default memcachedb port - +use lib::abs 'lib','../lib';#, '../../AE-Cnn/lib'; +use Test::AE::MC; use common::sense; -use lib::abs '../lib'; -use Test::More; -use AnyEvent::Impl::Perl; -use AnyEvent; -use AnyEvent::Socket; -use AnyEvent::Memcached; - -my ($host,$port) = split ':',$testaddr;$host ||= '127.0.0.1'; # allow *_SERVER=:port -$testaddr = join ':', $host,$port; - -alarm 10; -my $cv = AnyEvent->condvar; -$cv->begin(sub { $cv->send }); -$cv->begin; -my $cg;$cg = tcp_connect $host,$port, sub { - undef $cg; - @_ or plan skip_all => "No memcached instance running at $testaddr\n"; - diag "testing $testaddr"; +runtest { + my ($host,$port) = @_; + diag "testing $host : $port"; require Test::NoWarnings;Test::NoWarnings->import; plan tests => 5 + 1; - + my $cv = AE::cv; + my $memd = AnyEvent::Memcached->new( - servers => $testaddr, + servers => "$host:$port", cv => $cv, debug => 0, namespace => "AE::Memd::t/$$/" . (time() % 100) . "/", compress_enable => 1, compress_threshold => 1, # Almost everything is greater than 1 ); - + isa_ok($memd, 'AnyEvent::Memcached'); # Repeated structures will be compressed - $memd->set(key1 => { some => 'struct'x10 }, cb => sub { + $memd->set(key1 => { some => 'struct'x10, "\0" => "\1" }, cb => sub { ok(shift,"set key1") or diag " Error: @_"; $memd->get("key1", cb => sub { - is_deeply(shift, { some => 'struct'x10 }, "get key1") or diag " Error: @_"; + is_deeply(shift, { some => 'struct'x10, "\0" => "\1" }, "get key1") or diag " Error: @_"; }); }); - $memd->get("test", cb => sub { + $memd->get("test%s", cb => sub { ok !shift, 'no value'; ok !@_, 'no errors'; }); - $cv->end; #connect -}, sub { 1 }; - -$cv->end; -$cv->recv; + + $cv->recv; +}; diff --git a/t/check.pl b/t/check.pl index 9f686e5..ecc5412 100644 --- a/t/check.pl +++ b/t/check.pl @@ -1,40 +1,80 @@ use common::sense; -use lib::abs '../lib'; -use Test::More; -use AnyEvent::Impl::Perl; -use AnyEvent 5; -use AnyEvent::Socket; -use AnyEvent::Memcached; - -our $testaddr; -our $noreply; -my ($host,$port) = split ':',$testaddr;$host ||= '127.0.0.1'; # allow *_SERVER=:port -$testaddr = join ':', $host,$port; - -alarm 10; -my $cv;$cv = AE::cv { $cv->send; }; -#my $cv = AnyEvent->condvar; -#$cv->begin(sub { $cv->send }); - -my $memd; -$cv->begin; -my $cg;$cg = tcp_connect $host,$port, sub { - undef $cg; - @_ or plan skip_all => "No memcached instance running at $testaddr\n"; - diag "testing $testaddr"; +runtest { + my ($host,$port,%args) = @_; + my $cv;$cv = AE::cv; + diag "testing $host:$port"; require Test::NoWarnings;Test::NoWarnings->import; - plan tests => 27+1; + plan tests => 52+1; - $memd = AnyEvent::Memcached->new( - servers => [ $testaddr ], + my $memd = AnyEvent::Memcached->new( + servers => [ "$host:$port" ], cv => $cv, debug => 0, - noreply => $noreply, + %args, namespace => "AE::Memd::t/$$/" . (time() % 100) . "/", ); isa_ok($memd, 'AnyEvent::Memcached'); -#=for rem + $cv->begin; + $memd->set('cas2','val2',cb => sub { ok(shift,"set cas2 as val1") or diag " Error: @_"; }); + $memd->set('cas1','val1',cb => sub { + ok(shift,"set cas as val1") or diag " Error: @_"; + $memd->gets('cas1',cb => sub { + my $value = shift; + if ($value) { + ok $value, 'got result' or diag " Error: @_"; + is ref $value,'ARRAY', 'retval is array'; + is $value->[1], 'val1', 'value correct'; + # Now, break the value + $memd->set('cas1','val2',cb => sub { + ok(shift,"set cas as val2") or diag " Error: @_"; + $memd->cas('cas1', $value->[0], 'val3',cb => sub { + ok(!shift,"try cas as val3"); + ok(!@_, 'cas have no errors') or diag " Error: @_"; + $memd->gets('cas1',cb => sub { + ok my $value = shift, 'gets again'; + $memd->cas('cas1', $value->[0], 'val4',cb => sub { + ok(shift,"set cas as val4"); + ok(!@_, 'cas have no errors') or diag " Error: @_"; + + #Now, test 2 keys at once + $memd->gets(['cas1','cas2'], cb => sub { + ok my $values = shift, 'got gets* result' or diag " Error: @_"; + is ref $values, 'HASH', 'retval is hash'; + ok exists $values->{cas1}, 'have cas1'; + ok exists $values->{cas2}, 'have cas2'; + is ref $values->{cas1}, 'ARRAY', 'value 1 correct'; + is ref $values->{cas2}, 'ARRAY', 'value 2 correct'; + $memd->cas('cas1', $values->{cas1}[0], 'val5',cb => sub { + ok(shift,"set cas1 as val5"); + ok(!@_, 'cas1 have no errors') or diag " Error: @_"; + }); + $memd->cas('cas2', $values->{cas2}[0], 'val5',cb => sub { + ok(shift,"set cas2 as val5"); + ok(!@_, 'cas2 have no errors') or diag " Error: @_"; + }); + + }); + + }); + }); + }); + + }); + } else { + my $error = shift; + SKIP: { + if ($error =~ /not enabled/) { + skip "gets not enabled",19; + } else { + fail "gets failed"; + diag "$error"; + skip "gets failed",18; + } + } + } + }); + }); $memd->set("key1", "val1", cb => sub { ok(shift,"set key1 as val1") or diag " Error: @_"; $memd->get("key1", cb => sub { @@ -123,7 +163,6 @@ }); }); }); -#=cut $memd->replace("key-noexist", "bogus", cb => sub { ok(!shift , "replace key-noexist properly failed"); }); @@ -146,9 +185,20 @@ }); }); }); + $memd->incadd(iakey => 42, cb => sub { + is $_[0],42, 'incadd works as add'; + $memd->get(iakey => cb => sub { + is $_[0],42, 'incadd works as add (get check)'; + $memd->incadd(iakey => 42, cb => sub { + is $_[0], 42*2, 'incadd works as inc'; + $memd->get(iakey => cb => sub { + is $_[0],42*2, 'incadd works as inc (get check)'; + }); + }); + }); + }); + $cv->end; + $cv->recv; + $memd->destroy(); - $cv->end; #connect -}, sub { 1 }; - -$cv->recv; -$memd->destroy(); +}; diff --git a/t/lib/Test/AE/MC.pm b/t/lib/Test/AE/MC.pm new file mode 100644 index 0000000..08c7a45 --- /dev/null +++ b/t/lib/Test/AE/MC.pm @@ -0,0 +1,64 @@ +package #hide + Test::AE::MC; + +# Memcached test class + +use Test::More; +use AnyEvent::Impl::Perl; +use AnyEvent; +use AnyEvent::Socket; +BEGIN{ eval q{use AnyEvent::Memcached;1} or BAIL_OUT("$@") } +use common::sense; +use utf8; + +sub import { + *{caller().'::runtest'} = \&runtest; + @_ = 'Test::More'; + goto &{ Test::More->can('import') }; +} + +sub runtest(&) { + my $cx = shift; + my $code = sub { + alarm 10; + eval { + $cx->(@_,noreply => 1, cas => 1); + 1; + } or do { + warn "DIED $@"; + die "$@"; + } + + }; + my ($host,$port); + if (defined $ENV{MEMCACHED_SERVER}) { + my $testaddr = $ENV{MEMCACHED_SERVER}; + ($host,$port) = split ':',$testaddr;$host ||= '127.0.0.1'; # allow *_SERVER=:port + my $do; + my $cv = AE::cv; + $port; + my $cg;$cg = tcp_connect $host,$port, sub { + undef $cg; + @_ or plan skip_all => "No memcached instance running at $testaddr\n"; + $cv->send; #connect + }, sub { 1 }; + $cv->recv; + $code->($host,$port); + } else { + system("memcached -h > /dev/null") == 0 or plan skip_all => "Can't run memcached"; + eval q{use Test::TCP;1 } or plan skip_all => "No Test::TCP"; + $host = "127.0.0.1"; + test_tcp( + client => sub { + $port = shift; + $code->($host,$port); + }, + server => sub { + my $port = shift; + exec("memcached -l $host -p $port") or plan skip_all => "Can't run memcached"; + }, + ) + } +} + +1; diff --git a/t/lib/Test/AE/MD.pm b/t/lib/Test/AE/MD.pm new file mode 100644 index 0000000..d38793a --- /dev/null +++ b/t/lib/Test/AE/MD.pm @@ -0,0 +1,69 @@ +package #hide + Test::AE::MD; + +# MemcacheDB test class + +use AnyEvent::Impl::Perl; +use AnyEvent; +use AnyEvent::Socket; +use AnyEvent::Memcached; +use common::sense; +use utf8; +use Test::More; +use lib::abs; + +sub import { + *{caller().'::runtest'} = \&runtest; + @_ = 'Test::More'; + goto &{ Test::More->can('import') }; +} + +sub runtest(&) { + my $cx = shift; + my $code = sub { + alarm 10; + $cx->(@_,cas => 0, noreply => 0,); + }; + my ($host,$port); + if (defined $ENV{MEMCACHEDB_SERVER}) { + my $testaddr = $ENV{MEMCACHEDB_SERVER}; + ($host,$port) = split ':',$testaddr;$host ||= '127.0.0.1'; # allow *_SERVER=:port + my $do; + my $cv = AE::cv; + $port; + my $cg;$cg = tcp_connect $host,$port, sub { + undef $cg; + @_ or plan skip_all => "No memcachedb instance running at $testaddr\n"; + $cv->send; #connect + }, sub { 1 }; + $cv->recv; + $code->($host,$port); + } else { + system("memcachedb -h > /dev/null") == 0 or plan skip_all => "Can't run memcachedb"; + eval q{use Test::TCP;1} or plan skip_all => "No Test::TCP"; + $host = "127.0.0.1"; + my $db = lib::abs::path('tdb'); + $db .= '1' while -e $db; + mkdir $db or plan skip_all => "Can't create test db $db: $!"; + test_tcp( + client => sub { + $port = shift; + my $pid = shift; + $code->($host,$port); + kill TERM => $pid; + kill KILL => $pid; # Don't like to kill it, but should. + }, + server => sub { + my $port = shift; + close STDERR; + exec("memcachedb -l $host -p $port -H $db") or + plan skip_all => "Can't run memcachedb"; + }, + ); + unlink $_ for (<$db/*>); + rmdir $db; + + } +} + +1; diff --git a/t/pod.t b/t/pod.t index 17a14cc..67b4757 100644 --- a/t/pod.t +++ b/t/pod.t @@ -5,10 +5,9 @@ use warnings; use lib::abs '../lib'; use Test::More; -use Test::If - sub { chdir lib::abs::path '..' }, - 'Test::Pod 1.22', -; +BEGIN { + chdir lib::abs::path('..') and eval q{use Test::Pod 1.22; 1} or plan skip_all => "Prereq not met"; +} all_pod_files_ok(); exit 0; diff --git a/t/99-dist.t b/xt/99-dist.t similarity index 100% rename from t/99-dist.t rename to xt/99-dist.t