Skip to content
Browse files

Converted most of the code to Perl6 now

  • Loading branch information...
1 parent ca1041e commit 35404c7230b4c9290e383fd53a65d52a54909712 @cosimo committed Sep 24, 2010
Showing with 148 additions and 64 deletions.
  1. +148 −64 lib/Cache/Memcached.pm
View
212 lib/Cache/Memcached.pm
@@ -1,6 +1,9 @@
+use v6;
+use String::CRC32;
+
class Cache::Memcached:auth<cosimo>:ver<0.01>;
-=begin perl5
+=begin pod
use Storable ();
use Socket qw( MSG_NOSIGNAL PF_INET PF_UNIX IPPROTO_TCP SOCK_STREAM );
use IO::Handle ();
@@ -18,7 +21,7 @@ use fields qw{
parser_class
buck2sock
};
-=end
+=end pod
has Int $!debug is rw;
has Bool $!no_rehash is rw;
@@ -49,15 +52,15 @@ method F_COMPRESS () { return 2 }
# Size savings required before saving compressed value
method COMPRESS_SAVINGS () { return 0.20 } # percent
-=begin perl5
+=begin pod
use vars qw($VERSION $HAVE_ZLIB $FLAG_NOSIGNAL $HAVE_SOCKET6);
BEGIN {
$HAVE_ZLIB = eval "use Compress::Zlib (); 1;";
$HAVE_SOCKET6 = eval "use Socket6 qw(AF_INET6 PF_INET6); 1;";
}
my $HAVE_XS = eval "use Cache::Memcached::GetParserXS; 1;";
$HAVE_XS = 0 if $ENV{NO_XS};
-=end
+=end pod
our $VERSION = '0.01';
our $HAVE_ZLIB = 0;
@@ -71,7 +74,7 @@ my %host_dead; # host -> unixtime marked dead until
my %cache_sock; # host -> socket
my $PROTO_TCP;
-=begin perl5
+=begin pod
$self->{'debug'} = $args->{'debug'} || 0;
$self->{'no_rehash'} = $args->{'no_rehash'};
$self->{'stats'} = {};
@@ -90,13 +93,13 @@ my $PROTO_TCP;
return $self;
}
-=end
+=end pod
method set_pref_ip ($ip) {
$!pref_ip = $ip
}
-=begin perl5
+=begin pod
sub set_servers {
my Cache::Memcached $self = shift;
my ($list) = @_;
@@ -114,30 +117,30 @@ sub set_servers {
return $self;
}
-=end
+=end pod
method set_servers (Array @servers) {
unless @servers { @servers = () }
@!servers = @servers;
$!active = +@servers;
- @!buckets = undef;
+ @!buckets = Nil;
$!bucketcount = 0;
$.init_buckets();
@!buck2sock = ();
- $!_single_sock = undef;
+ $!_single_sock = Mu;
if @servers == 1 {
$!_single_sock = @servers[0];
}
}
-=begin perl5
+=begin pod
sub set_cb_connect_fail {
my Cache::Memcached $self = shift;
$self->{'cb_connect_fail'} = shift;
}
-=end
+=end pod
method set_cb_connect_fail (&callback) {
$!cb_connect_fail = &callback;
@@ -148,7 +151,7 @@ sub set_connect_timeout {
my Cache::Memcached $self = shift;
$self->{'connect_timeout'} = shift;
}
-=end
+=end pod
method set_connect_timeout ($timeout) {
$!connect_timeout = $timeout;
@@ -160,7 +163,7 @@ sub set_debug {
my ($dbg) = @_;
$self->{'debug'} = $dbg || 0;
}
-=end
+=end pod
method set_debug (Bool $debug = 0) {
$!debug = $debug;
@@ -172,7 +175,7 @@ sub set_readonly {
my ($ro) = @_;
$self->{'readonly'} = $ro;
}
-=end
+=end pod
method set_readonly (Bool $ro = 0) {
$!readonly = $ro;
@@ -184,7 +187,7 @@ sub set_norehash {
my ($val) = @_;
$self->{'no_rehash'} = $val;
}
-=end
+=end pod
method set_norehash (Bool $no_rehash = 0) {
$!no_rehash = $no_rehash;
@@ -196,7 +199,7 @@ sub set_compress_threshold {
my ($thresh) = @_;
$self->{'compress_threshold'} = $thresh;
}
-=end
+=end pod
method set_compress_threshold (Num $comp_thr) {
$!compress_threshold = $comp_thr;
@@ -208,7 +211,7 @@ sub enable_compress {
my ($enable) = @_;
$self->{'compress_enable'} = $enable;
}
-=end
+=end pod
method enable_compress (Bool $comp = 1) {
$!compress_enable = $comp;
@@ -220,7 +223,7 @@ sub forget_dead_hosts {
%host_dead = ();
$self->{'buck2sock'} = [];
}
-=end
+=end pod
method forget_dead_hosts () {
%host_dead = ();
@@ -233,7 +236,7 @@ sub set_stat_callback {
my ($stat_callback) = @_;
$self->{'stat_callback'} = $stat_callback;
}
-=end
+=end pod
method set_stat_callback (&callback) {
&!stat_callback = &callback;
@@ -254,12 +257,12 @@ sub _dead_sock {
$self->{'buck2sock'} = [] if $self;
return $ret; # 0 or undef, probably, depending on what caller wants
}
-=end
+=end pod
method _dead_sock ($sock, $ret, $dead_for) {
if my $ipport = %sock_map{$sock} {
my $now = time();
- %host_dead{$ipport} = $now + $dead if $dead_for;
+ %host_dead{$ipport} = $now + $dead_for if $dead_for;
%cache_sock.delete($ipport);
%sock_map.delete($sock);
}
@@ -277,7 +280,7 @@ sub _close_sock {
}
$self->{'buck2sock'} = [];
}
-=end
+=end pod
method _close_sock ($sock) {
if my $ipport = %sock_map{$sock} {
@@ -327,7 +330,7 @@ sub _connect_sock { # sock, sin, timeout
return $ret;
}
-=end
+=end pod
method _connect_sock ($sock, $sin, $timeout = 0.25) {
@@ -439,7 +442,7 @@ sub sock_to_host { # (host) #why is this public? I wouldn't have to worry about
return $sock;
}
-=end
+=end pod
# Why is this public? I wouldn't have to worry about undef $self if it weren't.
method sock_to_host ($host) {
@@ -471,7 +474,7 @@ method sock_to_host ($host) {
# TODO connect fail callback
#my &cb = &!cb_connect_fail;
#if &cb { &cb->() }
- return _dead_sock($sock, undef, 20 + 10.rand.Int;
+ return _dead_sock($sock, Mu, 20 + 10.rand.Int);
}
%sock_map{$sock} = $host;
@@ -499,15 +502,15 @@ sub get_sock { # (key)
}
return undef;
}
-=end
+=end pod
method get_sock ($key) {
if $!_single_sock {
return $.sock_to_host($!_single_sock);
}
- return undef unless $!active;
+ return unless $!active;
# TODO $key array
my $hv = _hashfunc($key);
@@ -518,11 +521,11 @@ method get_sock ($key) {
my $host = @!buckets[ $hv % $!bucketcount ];
my $sock = $.sock_to_host($host);
return $sock if $sock;
- return undef if $!no_rehash;
+ return if $!no_rehash;
$hv += _hashfunc($tries ~ $real_key); # stupid, but works
}
- return undef;
+ return;
}
=begin pod
@@ -539,7 +542,7 @@ sub init_buckets {
}
$self->{'bucketcount'} = scalar @{$self->{'buckets'}};
}
-=end
+=end pod
method init_buckets () {
return if @!buckets;
@@ -568,7 +571,7 @@ sub disconnect_all {
%cache_sock = ();
$self->{'buck2sock'} = [];
}
-=end
+=end pod
method disconnect_all () {
for %cache_sock.values -> $sock {
@@ -650,23 +653,24 @@ sub _write_and_read {
return $ret;
}
-=end
+=end pod
# writes a line, then reads result. by default stops reading after a
# single line, but caller can override the $check_complete subref,
# which gets passed a scalarref of buffer read thus far.
method _write_and_read ($sock, $line, $check_complete) {
my $res;
- my $ret = undef;
+ my $ret = Mu;
my $offset = 0;
-
+
&check_complete //= sub ($ret) {
return $ret.rindex("\r\n") + 2 == $ret.chars;
};
# state: 0 - writing, 1 - reading, 2 - done
my $state = 0;
+ my $copy_state = -1;
while True {
@@ -675,38 +679,39 @@ method _write_and_read ($sock, $line, $check_complete) {
$copy_state = $state;
}
- WRITE_LINE:
-
- my $res = $sock.send($line);
- next unless $res.defined;
- unless $res > 0 {
- $._close_sock($sock);
- return undef;
- }
+ my $to_send = $line.chars;
+
+ while $to_send > 0 {
+ $res = $sock.send($line);
+ last unless $res.defined;
+ if $res == 0 {
+ $._close_sock($sock);
+ return;
+ }
+ $to_send -= $res;
- if $res == $line.chars { # all sent
- $state = 1;
- }
- else { # we only succeeded in sending some of it
- $line = $line.substr($res); # delete the part we sent
- goto WRITE_LINE;
+ if $to_send == 0 { # all sent
+ $state = 1;
+ }
+ else { # we only succeeded in sending some of it
+ $line = $line.substr($res); # delete the part we sent
+ }
}
- my $res = $sock.recv();
+ $res = $sock.recv();
next unless $res.defined;
if $res.chars == 0 {
$._close_sock($sock);
- return undef;
+ return;
}
$offset += $res;
$state = 2 if &check_complete.($ret);
-
}
unless $state == 2 {
$._dead_sock($sock); # improperly finished
- return undef;
+ return;
}
return $ret;
@@ -735,7 +740,7 @@ sub delete {
return defined $res && $res eq "DELETED\r\n";
}
*remove = \&delete;
-=end
+=end pod
method delete ($key, $time = "") {
@@ -779,7 +784,7 @@ sub append {
sub prepend {
_set("prepend", @_);
}
-=end
+=end pod
method add ($key, $value) {
$._set('add', $key, $value);
@@ -861,7 +866,7 @@ sub _set {
return defined $res && $res eq "STORED\r\n";
}
-=end
+=end pod
method _set ($cmdname, $key, $val, $exptime = 0) {
return 0 if ! $!active || $!readonly;
@@ -872,7 +877,7 @@ method _set ($cmdname, $key, $val, $exptime = 0) {
my $sock = $._get_sock($key);
return 0 unless $sock;
- my $app_or_prep = $cmdname eq 'append' || $cmdname eq 'prepend' ?? 1 :: 0;
+ my $app_or_prep = ($cmdname eq 'append' or $cmdname eq 'prepend') ?? 1 !! 0;
%!stats{$cmdname}++;
my $flags = 0;
@@ -907,15 +912,13 @@ method decr ($key, $offset) {
}
sub _incrdecr ($cmdname, $key, $value) {
- return undef if ! $!active || $!readonly;
+ return if ! $!active || $!readonly;
my $stime;
- my $etime;
- my $value;
- my $stime = time() if &!stat_callback;
+ $stime = time() if &!stat_callback;
my $sock = $.get_sock($key);
- return undef unless $sock;
+ return unless $sock;
%!stats{$cmdname}++;
$value = 1 unless defined $value;
@@ -946,6 +949,7 @@ sub get {
return $r->{$kval};
}
+=end
method get ($key) {
@@ -961,6 +965,7 @@ method get ($key) {
}
+=begin pod
sub get_multi {
my Cache::Memcached $self = shift;
return {} unless $self->{'active'};
@@ -1194,11 +1199,22 @@ sub _load_multi {
return;
}
+=end pod
+=begin pod
sub _hashfunc {
return (crc32($_[0]) >> 16) & 0x7fff;
}
+=end pod
+
+method _hashfunc(Str $key) {
+ my $hash = String::CRC32.crc32($key);
+ $hash +>= 16;
+ $hash +&= 0x7FFF;
+ return $hash;
+}
+=begin pod
sub flush_all {
my Cache::Memcached $self = shift;
@@ -1213,8 +1229,28 @@ sub flush_all {
return $success;
}
+=end pod
+
+method flush_all () {
+ my $success = 1;
+ my @hosts = @!buckets;
-# returns array of lines, or () on failure.
+ for @hosts -> $host {
+ my $sock = $.sock_to_host($host);
+ my @res = $.run_command($sock, "flush_all\r\n");
+ $success = 0 unless @res == 1 && @res[0] eq "OK\r\n";
+ }
+
+ return $success;
+}
+
+#
+# <---------------------------------- CONVERTED TIL HERE
+#
+
+=begin pod
+
+# Returns array of lines, or () on failure.
sub run_command {
my Cache::Memcached $self = shift;
my ($sock, $cmd) = @_;
@@ -1230,6 +1266,31 @@ sub run_command {
return map { "$_\r\n" } split(/\r\n/, $ret);
}
+=end pod
+
+# Returns array of lines, or () on failure.
+method run_command ($sock, $cmd) {
+
+ return unless $sock;
+
+ my $ret;
+ my $line = $cmd;
+
+ while (my $res = self._write_and_read($sock, $line)) {
+ $line = Mu;
+ $ret ~= $res;
+ last if $ret ~~ /[ OK | END | ERROR ] \r\n $/;
+ }
+
+ $ret .= chop;
+ $ret .= chop;
+
+ $ret.split("\r\n") ==> map { "$_\r\n" } ==> my @lines;
+
+ return @lines;
+}
+
+=begin pod
sub stats {
my Cache::Memcached $self = shift;
my ($types) = @_;
@@ -1313,7 +1374,9 @@ sub stats {
return $stats_hr;
}
+=end pod
+=begin pod
sub stats_reset {
my Cache::Memcached $self = shift;
my ($types) = @_;
@@ -1329,9 +1392,27 @@ sub stats_reset {
}
return 1;
}
+=end pod
+
+method stats_reset ($types) {
+ return 0 unless $!active;
+
+ for @!buckets -> $host {
+ my $sock = self.sock_to_host($host);
+ next unless $sock;
+ my $ok = self._write_and_read($sock, "stats reset");
+ unless (defined $ok && $ok eq "RESET\r\n") {
+ self._dead_sock($sock);
+ }
+ }
+
+ return 1;
+}
1;
-__END__
+
+
+=begin pod
=head1 NAME
@@ -1621,3 +1702,6 @@ Anatoly Vorobey <mellon@pobox.com>
Brad Whitaker <whitaker@danga.com>
Jamie McCarthy <jamie@mccarthy.vg>
+
+=end pod
+

0 comments on commit 35404c7

Please sign in to comment.
Something went wrong with that request. Please try again.