Skip to content
Browse files

merged pbc branch to master

  • Loading branch information...
1 parent 85a963d commit 79bea382fd2c0753ca9ace79a11bb74c9a1d722b @robinedwards robinedwards committed
Showing with 2,361 additions and 669 deletions.
  1. +21 −4 Changes
  2. +2 −2 dist.ini
  3. +56 −31 lib/Net/Riak.pm
  4. +15 −67 lib/Net/Riak/Bucket.pm
  5. +1 −33 lib/Net/Riak/Client.pm
  6. +0 −15 lib/Net/Riak/Link.pm
  7. +5 −26 lib/Net/Riak/MapReduce.pm
  8. +32 −157 lib/Net/Riak/Object.pm
  9. +2 −20 lib/Net/Riak/Role/Hosts.pm
  10. +78 −0 lib/Net/Riak/Role/PBC.pm
  11. +46 −0 lib/Net/Riak/Role/PBC/Bucket.pm
  12. +35 −0 lib/Net/Riak/Role/PBC/Link.pm
  13. +37 −0 lib/Net/Riak/Role/PBC/MapReduce.pm
  14. +21 −0 lib/Net/Riak/Role/PBC/Message.pm
  15. +131 −0 lib/Net/Riak/Role/PBC/Object.pm
  16. +59 −4 lib/Net/Riak/Role/REST.pm
  17. +73 −0 lib/Net/Riak/Role/REST/Bucket.pm
  18. +52 −0 lib/Net/Riak/Role/REST/Link.pm
  19. +40 −0 lib/Net/Riak/Role/REST/MapReduce.pm
  20. +160 −0 lib/Net/Riak/Role/REST/Object.pm
  21. +8 −1 lib/Net/Riak/Role/UserAgent.pm
  22. +9 −0 lib/Net/Riak/Transport/PBC.pm
  23. +90 −0 lib/Net/Riak/Transport/PBC/Code.pm
  24. +121 −0 lib/Net/Riak/Transport/PBC/Message.pm
  25. +483 −0 lib/Net/Riak/Transport/PBC/Transport.pm
  26. +11 −0 lib/Net/Riak/Transport/REST.pm
  27. +38 −0 lib/Net/Riak/Types.pm
  28. +21 −0 pbc/compile_pbc.pl
  29. +246 −0 pbc/riakclient.proto
  30. +5 −0 t/00_use.t
  31. +0 −209 t/01_basic.t
  32. +25 −0 t/01_store_fetch_object.t
  33. +10 −0 t/02_missing_object.t
  34. +20 −0 t/03_delete_object.t
  35. +0 −12 t/04_bucket.t
  36. +24 −0 t/04_bucket_properties.t
  37. +0 −18 t/05_links.t
  38. +47 −0 t/05_object_siblings.t
  39. +40 −0 t/06_links.t
  40. +66 −0 t/07_map_reduce.t
  41. +15 −0 t/10_list_buckets.t
  42. +36 −0 t/11_get_keys.t
  43. +52 −63 t/90_bug_links.t
  44. 0 t/{02_client.t → client.t}
  45. 0 t/{06_host.t → hosts.t}
  46. +99 −0 t/lib/Test/Riak.pm
  47. +10 −0 t/pbc/server_info.t
  48. +7 −5 t/{03_object.t → rest/populate_object.t}
  49. +1 −1 t/{07_properties.t → rest/properties.t}
  50. +10 −0 t/rest/stats.t
  51. +1 −1 t/{08_stream.t → rest/stream.t}
View
25 Changes
@@ -1,10 +1,27 @@
+0.15
+ - fixed link encoding (Simon Wistow)
+ - added stats method for REST
+ - added server_info method for PBC
+ - added list bucket support for both protocols (0.14+ for REST)
+ - added PBC support
+ - added Test::Riak for running tests on both protocols
+ - added disable_return_body flag, in REST mode prevents body being returned on store
+ - create new object without keys (SymKat <symkat@symkat.com>)
+
+ DEPRECATIONS
+ - $object->status please call $object->client->status (for REST only)
+ - $object->count_links, added ->has_links and ->all_links
+ - disable_return_body will become the default in 0.17
+ - support for providing multiple hosts when connecting (only supported by REST)
+ all load balancing code to be moved out of Net::Riak by 0.17.
+
0.14 Mon Mar 14 08:59:10 GMT 2011
- - fixed delete link (Simon Wistow)
+ - fixed delete link (Simon Wistow)
0.13 Tue, 08 Feb 2011 16:40:07 GMT
- - Enviroment variable for riak when running tests (Robin Edwards)
- - Connection caching (Andrew Sayers)
- - Automatic key generation support (Simon Wistow)
+ - Enviroment variable for riak when running tests (Robin Edwards)
+ - Connection caching (Andrew Sayers)
+ - Automatic key generation support (Simon Wistow)
0.12
- fix _build_link method (reported by gphat)
View
4 dist.ini
@@ -1,9 +1,9 @@
name = Net-Riak
-author = franck cuny <franck@lumberjaph.net>
+author = franck cuny <franck@lumberjaph.net>, robin edwards <robin.ge@gmail.com>
license = Perl_5
copyright_holder = linkfluence
copyright_year = 2011
-version = 0.14
+version = 0.15
[@Filter]
bundle = @Basic
View
87 lib/Net/Riak.pm
@@ -6,19 +6,24 @@ use Moose;
use Net::Riak::Client;
use Net::Riak::Bucket;
+use Net::Riak::Types Client => { -as => 'Client_T' };
with 'Net::Riak::Role::MapReduce';
has client => (
is => 'rw',
- isa => 'Net::Riak::Client',
+ isa => Client_T,
required => 1,
- handles => [qw/is_alive http_request http_response/]
+ handles => [qw/is_alive all_buckets server_info stats/]
);
sub BUILDARGS {
my ($class, %args) = @_;
- my $client = Net::Riak::Client->new(%args);
+
+ my $transport = $args{transport} || 'REST';
+ my $trait = "Net::Riak::Transport::".$transport;
+
+ my $client = Net::Riak::Client->with_traits($trait)->new(%args);
$args{client} = $client;
\%args;
}
@@ -32,10 +37,19 @@ sub bucket {
1;
=head1 SYNOPSIS
-
+
+ # REST interface
my $client = Net::Riak->new(
host => 'http://10.0.0.40:8098',
- ua_timeout => 900
+ ua_timeout => 900,
+ disable_return_body => 1
+ );
+
+ # Or PBC interface.
+ my $client = Net::Riak->new(
+ transport => 'PBC',
+ host => '10.0.0.40',
+ port => 8080
);
my $bucket = $client->bucket('blog');
@@ -45,9 +59,6 @@ sub bucket {
$obj = $bucket->get('new_post');
say "title for ".$obj->key." is ".$obj->data->{title};
- my $req = $client->http_request; # last request
- $client->http_response # last response
-
=head1 DESCRIPTION
=head2 ATTRIBUTES
@@ -56,31 +67,27 @@ sub bucket {
=item B<host>
-URL of the node (default 'http://127.0.0.1:8098'). If your ring is composed with more than one node, you can configure the client to hit more than one host, instead of hitting always the same node. For this, you can do one of the following:
+REST: The URL of the node
-=over 4
+PBC: The hostname of the node
-=item B<all nodes equals>
+default 'http://127.0.0.1:8098'
- my $riak = Net::Riak->new(
- host => [
- 'http://10.0.0.40:8098',
- 'http://10.0.0.41:8098'
- ]
- );
+Note that providing multiple hosts is now deprecated.
+
+=back
-=item B<give weight to nodes>
+=item B<port>
- my $riak = Net::Riak->new(
- host => [
- {node => 'http://10.0.0.40:8098', weight => '0.2'},
- {node => 'http://10.0.0.41:8098', weight => '0.8'}
- ]
- );
+Port of the PBC interface.
=back
-Now, when a request is made, a node is picked at random, according to weight.
+=item B<transport>
+
+Used to select the PB protocol by passing in 'PBC'
+
+=back
=item B<prefix>
@@ -108,10 +115,20 @@ client_id for this client
=back
-=item B<ua_timeout>
+=item B<ua_timeout (REST only)>
timeout for L<LWP::UserAgent> in seconds, defaults to 3.
+=item B<disable_return_body (REST only)>
+
+Disable returning of object content in response in a store operation.
+
+If set to true and the object has siblings these will not be available without an additional fetch.
+
+This will become the default behaviour in 0.17
+
+=back
+
=head1 METHODS
=head2 bucket
@@ -128,6 +145,10 @@ Get the bucket by the specified name. Since buckets always exist, this will alwa
Check if the Riak server for this client is alive
+=head2 all_buckets
+
+List all buckets, requires Riak 0.14+ or PBC connection.
+
=head2 add
my $map_reduce = $client->add('bucket_name', 'key');
@@ -152,16 +173,20 @@ Start assembling a Map/Reduce operation
Start assembling a Map/Reduce operation
-=method http_request
+=head2 server_info (PBC only)
+
+ $client->server_info->{server_version};
-Returns the HTTP::Request object from the last request
+=head2 stats (REST only)
-=method http_response
-
-Returns a HTTP::Response object from the last request
+ say Dumper $client->stats;
=head2 SEE ALSO
Net::Riak::MapReduce
+Net::Riak::Object
+
+Net::Riak::Bucket
+
=cut
View
82 lib/Net/Riak/Bucket.pm
@@ -1,16 +1,14 @@
package Net::Riak::Bucket;
-
-# ABSTRACT: Access and change information about a Riak bucket
-
-use JSON;
use Moose;
-use Carp;
use Net::Riak::Object;
-
+use Net::Riak::Types Client => {-as => 'Client_T'};
with 'Net::Riak::Role::Replica' => {keys => [qw/r w dw/]};
-with 'Net::Riak::Role::Base' => {
- classes => [{ name => 'client', required => 1, }]
-};
+
+has client => (
+ is => 'rw',
+ isa => Client_T,
+ required => 1,
+);
has name => (
is => 'ro',
@@ -37,20 +35,17 @@ sub allow_multiples {
my $self = shift;
if (my $val = shift) {
- my $bool = ($val == 1 ? JSON::true : JSON::false);
+ my $bool = ($val == 1 ? 1 : 0);
$self->set_property('allow_mult', $bool);
}
else {
- return $self->get_property('allow_mult');
+ return $self->get_property('allow_mult') ? 1 : 0;
}
}
sub get_keys {
my ($self, $params) = @_;
- my $key_mode = delete($params->{stream}) ? 'stream' : 'true';
- $params = { props => 'false', keys => $key_mode, %$params };
- my $properties = $self->get_properties($params);
- return $properties->{keys};
+ $self->client->get_keys($self->name, $params);
}
sub get {
@@ -66,12 +61,12 @@ sub get {
}
sub delete_object {
- my ($self, $key) = @_;
+ my ($self, $key, $dw) = @_;
Net::Riak::Object->new(
client => $self->client,
bucket => $self,
key => $key
- )->delete;
+ )->delete($dw);
}
sub set_property {
@@ -87,59 +82,12 @@ sub get_property {
sub get_properties {
my ($self, $params) = @_;
-
- # Callbacks require stream mode
- $params->{keys} = 'stream' if $params->{cb};
-
- $params->{props} = 'true' unless exists $params->{props};
- $params->{keys} = 'false' unless exists $params->{keys};
-
- my $request = $self->client->new_request(
- 'GET', [$self->client->prefix, $self->name], $params
- );
-
- my $response = $self->client->send_request($request);
-
- unless ($response->is_success) {
- die "Error getting bucket properties: ".$response->status_line."\n";
- }
-
- if ($params->{keys} ne 'stream') {
- return JSON::decode_json($response->content);
- }
-
- # In streaming mode, aggregate keys from the multiple returned chunk objects
- else {
- my $json = JSON->new;
- my $props = $json->incr_parse($response->content);
- if ($params->{cb}) {
- while (defined(my $obj = $json->incr_parse)) {
- $params->{cb}->($_) foreach @{$obj->{keys}};
- }
- return %$props ? { props => $props } : {};
- }
- else {
- my @keys = map { $_->{keys} && ref $_->{keys} eq 'ARRAY' ? @{$_->{keys}} : () }
- $json->incr_parse;
- return { props => $props, keys => \@keys };
- }
- }
+ $self->client->get_properties($self->name, $params);
}
sub set_properties {
my ($self, $props) = @_;
-
- my $request = $self->client->new_request(
- 'PUT', [$self->client->prefix, $self->name]
- );
-
- $request->header('Content-Type' => $self->content_type);
- $request->content(JSON::encode_json({props => $props}));
-
- my $response = $self->client->send_request($request);
- unless ($response->is_success) {
- die "Error setting bucket properties: ".$response->status_line."\n";
- }
+ $self->client->set_properties($self, $props);
}
sub new_object {
@@ -169,7 +117,7 @@ sub new_object {
my $obj2 = $bucket->new_object('foo2', {...});
$object->store;
- $bucket->delete_object($key);
+ $bucket->delete_object($key, 3); # optional w val
=head1 DESCRIPTION
View
34 lib/Net/Riak/Client.pm
@@ -2,10 +2,8 @@ package Net::Riak::Client;
use Moose;
use MIME::Base64;
-use Moose::Util::TypeConstraints;
-class_type 'HTTP::Request';
-class_type 'HTTP::Response';
+with 'MooseX::Traits';
has prefix => (
is => 'rw',
@@ -27,40 +25,10 @@ has client_id => (
isa => 'Str',
lazy_build => 1,
);
-has http_request => (
- is => 'rw',
- isa => 'HTTP::Request',
-);
-
-has http_response => (
- is => 'rw',
- isa => 'HTTP::Response',
- handles => ['is_success']
-);
-
-has ua_timeout => (
- is => 'rw',
- isa => 'Int',
- default => 3
-);
-
-with 'Net::Riak::Role::UserAgent';
-with qw/
- Net::Riak::Role::REST
- Net::Riak::Role::Hosts
- /;
-
-
sub _build_client_id {
"perl_net_riak" . encode_base64(int(rand(10737411824)), '');
}
-sub is_alive {
- my $self = shift;
- my $request = $self->new_request('GET', ['ping']);
- my $response = $self->send_request($request);
- $self->is_success ? return 1 : return 0;
-}
1;
View
15 lib/Net/Riak/Link.pm
@@ -20,19 +20,4 @@ has tag => (
default => sub {(shift)->bucket->name}
);
-sub to_link_header {
- my ($self, $client) = @_;
-
- $client ||= $self->client;
-
- my $link = '';
- $link .= '</';
- $link .= $client->prefix . '/';
- $link .= $self->bucket->name . '/';
- $link .= $self->key . '>; riaktag="';
- $link .= $self->tag . '"';
- return $link;
-}
-
1;
-
View
31 lib/Net/Riak/MapReduce.pm
@@ -6,6 +6,8 @@ use JSON;
use Moose;
use Scalar::Util;
+use Data::Dumper;
+
use Net::Riak::LinkPhase;
use Net::Riak::MapReducePhase;
@@ -156,35 +158,12 @@ sub run {
$inputs = $self->inputs;
}
- my $ua_timeout = $self->client->useragent->timeout();
-
my $job = {inputs => $inputs, query => $query};
- if ($timeout) {
- if ($ua_timeout < ($timeout/1000)) {
- $self->client->useragent->timeout(int($timeout/1000));
- }
- $job->{timeout} = $timeout;
- }
-
-
- my $content = JSON::encode_json($job);
- my $request = $self->client->new_request(
- 'POST', [$self->client->mapred_prefix]
- );
- $request->content($content);
-
- my $response = $self->client->send_request($request);
-
- unless ($response->is_success) {
- die "MapReduce query failed: ".$response->status_line;
- }
-
- my $result = JSON::decode_json($response->content);
+ # how phases set to 'keep'.
+ my $p = scalar ( grep { $_->keep } $self->phases);
- if ( $timeout && ( $ua_timeout != $self->client->useragent->timeout() ) ) {
- $self->client->useragent->timeout($ua_timeout);
- }
+ my $result = $self->client->execute_job($job, $timeout, $p);
my @phases = $self->phases;
if (ref $phases[-1] ne 'Net::Riak::LinkPhase') {
View
189 lib/Net/Riak/Object.pm
@@ -2,24 +2,27 @@ package Net::Riak::Object;
# ABSTRACT: holds meta information about a Riak object
-use Carp;
-use JSON;
use Moose;
use Scalar::Util;
use Net::Riak::Link;
with 'Net::Riak::Role::Replica' => {keys => [qw/r w dw/]};
with 'Net::Riak::Role::Base' => {classes =>
- [{name => 'bucket', required => 1}, {name => 'client', required => 1}]};
-
+ [{name => 'bucket', required => 1}]};
+use Net::Riak::Types Client => {-as => 'Client_T'};
+has client => (
+ is => 'rw',
+ isa => Client_T,
+ required => 1,
+);
has key => (is => 'rw', isa => 'Str', required => 0);
-has status => (is => 'rw', isa => 'Int');
has exists => (is => 'rw', isa => 'Bool', default => 0,);
has data => (is => 'rw', isa => 'Any', clearer => '_clear_data');
-has vclock => (is => 'rw', isa => 'Str', predicate => 'has_vclock',);
+has vclock => (is => 'rw', isa => 'Str', predicate => 'has_vclock');
+has vtag => (is => 'rw', isa => 'Str');
has content_type => (is => 'rw', isa => 'Str', default => 'application/json');
-has _headers => (is => 'rw', isa => 'HTTP::Response',);
-has _jsonize => (is => 'rw', isa => 'Bool', lazy => 1, default => 1,);
+has location => ( is => 'rw', isa => 'Str' );
+has _jsonize => (is => 'rw', isa => 'Bool', lazy => 1, default => 1);
has links => (
traits => ['Array'],
is => 'rw',
@@ -31,6 +34,7 @@ has links => (
count_links => 'elements',
append_link => 'push',
has_links => 'count',
+ all_links => 'elements',
},
clearer => '_clear_links',
);
@@ -52,62 +56,31 @@ has siblings => (
clearer => '_clear_siblings',
);
+after count_links => sub {
+ warn "DEPRECATED: count_links method will be removed in the 0.17 release, please use has_links.";
+};
+
sub store {
my ($self, $w, $dw) = @_;
$w ||= $self->w;
$dw ||= $self->dw;
- my $params = {returnbody => 'true', w => $w, dw => $dw};
- my $path = [$self->client->prefix, $self->bucket->name];
- my $method = 'POST';
- if (defined $self->key) {
- push @$path, $self->key;
- $method = 'PUT';
- }
-
- my $request = $self->client->new_request($method, $path, $params);
-
- $request->header('X-Riak-ClientID' => $self->client->client_id);
- $request->header('Content-Type' => $self->content_type);
-
- if ($self->has_vclock) {
- $request->header('X-Riak-Vclock' => $self->vclock);
- }
-
- if ($self->has_links) {
- $request->header('link' => $self->_links_to_header);
- }
-
- if (ref $self->data && $self->content_type eq 'application/json') {
- $request->content(JSON::encode_json($self->data));
- }
- else {
- $request->content($self->data);
- }
-
- my $response = $self->client->send_request($request);
- $self->populate($response, [200, 201, 204, 300]);
- $self;
+ $self->client->store_object($w, $dw, $self);
}
-sub _links_to_header {
- my $self = shift;
- join(', ', map { $_->to_link_header($self->client) } $self->links);
-}
+sub status {
+ my ($self) = @_;
+ warn "DEPRECATED: status method will be removed in the 0.17 release, please use ->client->status.";
+ $self->client->status;
+}
sub load {
my $self = shift;
my $params = {r => $self->r};
- my $request =
- $self->client->new_request('GET',
- [$self->client->prefix, $self->bucket->name, $self->key], $params);
-
- my $response = $self->client->send_request($request);
- $self->populate($response, [200, 300, 404]);
- $self;
+ $self->client->load_object($params, $self);
}
sub delete {
@@ -116,13 +89,7 @@ sub delete {
$dw ||= $self->bucket->dw;
my $params = {dw => $dw};
- my $request =
- $self->client->new_request('DELETE',
- [$self->client->prefix, $self->bucket->name, $self->key], $params);
-
- my $response = $self->client->send_request($request);
- $self->populate($response, [204, 404]);
- $self;
+ $self->client->delete_object($params, $self);
}
sub clear {
@@ -133,109 +100,17 @@ sub clear {
$self;
}
-sub populate {
- my ($self, $http_response, $expected) = @_;
-
- $self->clear;
-
- return if (!$http_response);
-
- my $status = $http_response->code;
- $self->_headers($http_response);
- $self->status($status);
-
- $self->data($http_response->content);
-
- if (!grep { $status == $_ } @$expected) {
- confess "Expected status "
- . (join(', ', @$expected))
- . ", received $status"
- }
-
- if ($status == 404) {
- $self->clear;
- return;
- }
-
- $self->exists(1);
-
- if ($http_response->header('link')) {
- $self->_populate_links($http_response->header('link'));
- }
-
- if ($status == 300) {
- my @siblings = split("\n", $self->data);
- shift @siblings;
- $self->siblings(\@siblings);
- }
-
- if ($status == 201) {
- my $location = $http_response->header('location');
- my ($key) = ($location =~ m!/([^/]+)$!);
- $self->key($key);
- }
-
-
- if ($status == 200 || $status == 201) {
- $self->content_type($http_response->content_type)
- if $http_response->content_type;
- $self->data(JSON::decode_json($self->data))
- if $self->content_type eq 'application/json';
- $self->vclock($http_response->header('X-Riak-Vclock'));
- }
-}
-
-sub _uri_decode {
- my $str = shift;
- $str =~ s/%([a-fA-F0-9]{2,2})/chr(hex($1))/eg;
- return $str;
-}
-
-sub _populate_links {
- my ($self, $links) = @_;
- for my $link (split(',', $links)) {
- if ($link
- =~ /\<\/([^\/]+)\/([^\/]+)\/([^\/]+)\>; ?riaktag=\"([^\']+)\"/)
- {
- my $bucket = _uri_decode($2);
- my $key = _uri_decode($3);
- my $tag = _uri_decode($4);
- my $l = Net::Riak::Link->new(
- bucket => Net::Riak::Bucket->new(
- name => $bucket,
- client => $self->client
- ),
- key => $key,
- tag => $tag
- );
- $self->add_link($l);
- }
- }
-}
-
sub sibling {
my ($self, $id, $r) = @_;
$r ||= $self->bucket->r;
my $vtag = $self->get_sibling($id);
- my $params = {r => $r, vtag => $vtag};
- my $request =
- $self->client->new_request('GET',
- [$self->client->prefix, $self->bucket->name, $self->key], $params);
- my $response = $self->client->send_request($request);
-
- my $obj = Net::Riak::Object->new(
- client => $self->client,
- bucket => $self->bucket,
- key => $self->key
+ return $self->client->retrieve_sibling(
+ $self, {r => $r, vtag => $vtag}
);
- $obj->_jsonize($self->_jsonize);
- $obj->populate($response, [200]);
- $obj;
}
-
sub _build_link {
my ($self,$obj,$tag) = @_;
blessed $obj && $obj->isa('Net::Riak::Link')
@@ -337,10 +212,6 @@ Get or set the data stored in this object.
=item B<content_type>
-=item B<status>
-
-Get the HTTP status from the last operation on this object.
-
=item B<links>
Get an array of L<Net::Riak::Link> objects
@@ -359,7 +230,11 @@ Return an array of Siblings
=over 4
-=item count_links
+=item all_links
+
+Return the number of links
+
+=item has_links
Return the number of links
@@ -445,7 +320,7 @@ Return true if this object has siblings
Return true if this object has no siblings
-=item populate
+=item populate_object
Given the output of RiakUtils.http_request and a list of statuses, populate the object. Only for use by the Riak client library.
View
22 lib/Net/Riak/Role/Hosts.pm
@@ -1,29 +1,11 @@
package Net::Riak::Role::Hosts;
use Moose::Role;
-use Moose::Util::TypeConstraints;
-
-subtype 'RiakHost' => as 'ArrayRef[HashRef]';
-
-coerce 'RiakHost' => from 'Str' => via {
- [{node => $_, weight => 1}];
-};
-coerce 'RiakHost' => from 'ArrayRef' => via {
- my $backends = $_;
- my $weight = 1 / @$backends;
- [map { {node => $_, weight => $weight} } @$backends];
-};
-coerce 'RiakHost' => from 'HashRef' => via {
- my $backends = $_;
- my $total = 0;
- $total += $_ for values %$backends;
- [map { {node => $_, weight => $backends->{$_} / $total} }
- keys %$backends];
-};
+use Net::Riak::Types qw(RiakHost);
has host => (
is => 'rw',
- isa => 'RiakHost',
+ isa => RiakHost,
coerce => 1,
default => 'http://127.0.0.1:8098',
);
View
78 lib/Net/Riak/Role/PBC.pm
@@ -0,0 +1,78 @@
+package Net::Riak::Role::PBC;
+
+use Moose::Role;
+use MooseX::Types::Moose qw/Str Int/;
+
+with qw(
+ Net::Riak::Role::PBC::Message
+ Net::Riak::Role::PBC::Bucket
+ Net::Riak::Role::PBC::MapReduce
+ Net::Riak::Role::PBC::Link
+ Net::Riak::Role::PBC::Object);
+
+use Net::Riak::Types 'Socket';
+use IO::Socket::INET;
+
+has [qw/r w dw/] => (
+ is => 'rw',
+ isa => Int,
+ default => 2
+);
+
+has host => (
+ is => 'ro',
+ isa => Str,
+ required => 1,
+);
+
+has port => (
+ is => 'ro',
+ isa => Int,
+ required => 1,
+);
+
+has socket => (
+ is => 'rw',
+ isa => Socket,
+ predicate => 'has_socket',
+);
+
+sub is_alive {
+ my $self = shift;
+ return $self->send_message('PingReq');
+}
+
+sub connected {
+ my $self = shift;
+ return $self->has_socket && $self->socket->connected ? 1 : 0;
+}
+
+sub connect {
+ my $self = shift;
+ return if $self->has_socket && $self->connected;
+
+ $self->socket(
+ IO::Socket::INET->new(
+ PeerAddr => $self->host,
+ PeerPort => $self->port,
+ Proto => 'tcp',
+ Timeout => 30,
+ )
+ );
+}
+
+sub all_buckets {
+ my $self = shift;
+ my $resp = $self->send_message('ListBucketsReq');
+ return ref ($resp->buckets) eq 'ARRAY' ? @{$resp->buckets} : ();
+}
+
+sub server_info {
+ my $self = shift;
+ my $resp = $self->send_message('GetServerInfoReq');
+ return $resp;
+}
+
+sub stats { die "->stats is only avaliable through the REST interface" }
+
+1;
View
46 lib/Net/Riak/Role/PBC/Bucket.pm
@@ -0,0 +1,46 @@
+package Net::Riak::Role::PBC::Bucket;
+
+use Moose::Role;
+use Data::Dumper;
+
+sub get_properties {
+ my ( $self, $name, $params ) = @_;
+ my $resp = $self->send_message( GetBucketReq => { bucket => $name } );
+ return { props => { %{ $resp->props } } };
+}
+
+sub set_properties {
+ my ( $self, $bucket, $props ) = @_;
+ return $self->send_message(
+ SetBucketReq => {
+ bucket => $bucket->name,
+ props => $props
+ }
+ );
+}
+
+sub get_keys {
+ my ( $self, $name, $params) = @_;
+ my $keys = [];
+
+ my $res = $self->send_message(
+ ListKeysReq => { bucket => $name, },
+ sub {
+ if ( defined $_[0]->keys ) {
+ if ($params->{cb}) {
+ $params->{cb}->($_) for @{ $_[0]->keys };
+ }
+ else {
+ push @$keys, @{ $_[0]->keys };
+ }
+ }
+ }
+ );
+
+ return $params->{cb} ? undef : $keys;
+}
+
+
+
+1;
+
View
35 lib/Net/Riak/Role/PBC/Link.pm
@@ -0,0 +1,35 @@
+package Net::Riak::Role::PBC::Link;
+use Moose::Role;
+use Net::Riak::Link;
+use Net::Riak::Bucket;
+
+sub _populate_links {
+ my ($self, $object, $links) = @_;
+
+ for my $link (@$links) {
+ my $l = Net::Riak::Link->new(
+ bucket => Net::Riak::Bucket->new(
+ name => $link->bucket,
+ client => $self
+ ),
+ key => $link->key,
+ tag => $link->tag
+ );
+ $object->add_link($l);
+ }
+}
+
+sub _links_for_message {
+ my ($self, $object) = @_;
+
+ return [
+ map { {
+ tag => $_->tag,
+ key => $_->key,
+ bucket => $_->bucket->name
+ }
+ } $object->all_links
+ ]
+}
+
+1;
View
37 lib/Net/Riak/Role/PBC/MapReduce.pm
@@ -0,0 +1,37 @@
+package Net::Riak::Role::PBC::MapReduce;
+use Moose::Role;
+use JSON;
+use List::Util 'sum';
+use Data::Dump 'pp';
+
+sub execute_job {
+ my ($self, $job, $timeout, $returned_phases) = @_;
+
+ $job->{timeout} = $timeout;
+
+ my $job_request = JSON::encode_json($job);
+
+ my $results;
+
+ my $resp = $self->send_message( MapRedReq => {
+ request => $job_request,
+ content_type => 'application/json'
+ }, sub { push @$results, $self->decode_phase(shift) })
+ or
+ die "MapReduce query failed!";
+
+
+ return $returned_phases == 1 ? $results->[0] : $results;
+}
+
+sub decode_phase {
+ my ($self, $resp) = @_;
+
+ if (defined $resp->response && length($resp->response)) {
+ return JSON::decode_json($resp->response);
+ }
+
+ return;
+}
+
+1;
View
21 lib/Net/Riak/Role/PBC/Message.pm
@@ -0,0 +1,21 @@
+package Net::Riak::Role::PBC::Message;
+
+use Moose::Role;
+use Net::Riak::Transport::PBC::Message;
+
+sub send_message {
+ my ( $self, $type, $params, $cb ) = @_;
+
+ $self->connect unless $self->connected;
+
+ my $message = Net::Riak::Transport::PBC::Message->new(
+ message_type => $type,
+ params => $params || {},
+ );
+
+ $message->socket( $self->socket );
+
+ return $message->send($cb);
+}
+
+1;
View
131 lib/Net/Riak/Role/PBC/Object.pm
@@ -0,0 +1,131 @@
+package Net::Riak::Role::PBC::Object;
+
+use JSON;
+use Moose::Role;
+use Data::Dumper;
+use List::Util 'first';
+
+sub store_object {
+ my ($self, $w, $dw, $object) = @_;
+
+ my $value = (ref $object->data && $object->content_type eq 'application/json')
+ ? JSON::encode_json($object->data) : $object->data;
+
+ my $content = {
+ content_type => $object->content_type,
+ value => $value,
+ usermeta => undef
+ };
+
+ if ($object->has_links) {
+ $content->{links} = $self->_links_for_message($object);
+ }
+
+ $self->send_message(
+ PutReq => {
+ bucket => $object->bucket->name,
+ key => $object->key,
+ content => $content,
+ }
+ );
+ return $object;
+}
+
+sub load_object {
+ my ( $self, $params, $object ) = @_;
+
+ my $resp = $self->send_message(
+ GetReq => {
+ bucket => $object->bucket->name,
+ key => $object->key,
+ r => $params->{r},
+ }
+ );
+
+ $self->populate_object($object, $resp);
+
+ return $object;
+}
+
+sub delete_object {
+ my ( $self, $params, $object ) = @_;
+
+ my $resp = $self->send_message(
+ DelReq => {
+ bucket => $object->bucket->name,
+ key => $object->key,
+ rw => $params->{dw},
+ }
+ );
+
+ $object;
+}
+
+sub populate_object {
+ my ( $self, $object, $resp) = @_;
+
+ $object->_clear_links;
+ $object->exists(0);
+
+ if ( $resp->content && scalar (@{$resp->content}) > 1) {
+ my %seen;
+ my @vtags = grep { !$seen{$_}++ } map { $_->vtag } @{$resp->content};
+ $object->siblings(\@vtags);
+ }
+
+ my $content = $resp->content ? $resp->content->[0] : undef;
+
+ return unless $content and $resp->vclock;
+
+ $object->vclock($resp->vclock);
+ $object->vtag($content->vtag);
+ $object->content_type($content->content_type);
+
+ if($content->links) {
+ $self->_populate_links($object, $content->links);
+ }
+
+ my $data = ($object->content_type eq 'application/json')
+ ? JSON::decode_json($content->value) : $content->value;
+
+ $object->exists(1);
+
+ $object->data($data);
+}
+
+
+# This emulates the behavior of the existing REST client.
+sub retrieve_sibling {
+ my ($self, $object, $params) = @_;
+
+ my $resp = $self->send_message(
+ GetReq => {
+ bucket => $object->bucket->name,
+ key => $object->key,
+ r => $params->{r},
+ }
+ );
+
+ # hack for loading 1 sibling
+ if ($params->{vtag}) {
+ $resp->{content} = [
+ first {
+ $_->vtag eq $params->{vtag}
+ } @{$resp->content}
+ ];
+ }
+
+ my $sibling = Net::Riak::Object->new(
+ client => $self,
+ bucket => $object->bucket,
+ key => $object->key
+ );
+
+ $sibling->_jsonize($object->_jsonize);
+
+ $self->populate_object($sibling, $resp);
+
+ $sibling;
+}
+
+1;
View
63 lib/Net/Riak/Role/REST.pm
@@ -3,12 +3,36 @@ package Net::Riak::Role::REST;
# ABSTRACT: role for REST operations
use URI;
-use HTTP::Request;
+
use Moose::Role;
+use MooseX::Types::Moose 'Bool';
+use Net::Riak::Types qw/HTTPResponse HTTPRequest/;
+use Data::Dump 'pp';
+with qw/Net::Riak::Role::REST::Bucket
+ Net::Riak::Role::REST::Object
+ Net::Riak::Role::REST::Link
+ Net::Riak::Role::REST::MapReduce
+ /;
+
+has http_request => (
+ is => 'rw',
+ isa => HTTPRequest,
+);
+
+has http_response => (
+ is => 'rw',
+ isa => HTTPResponse,
+ handles => {
+ is_success => 'is_success',
+ status => 'code',
+ }
+);
-requires 'http_request';
-requires 'http_response';
-requires 'useragent';
+has disable_return_body => (
+ is => 'rw',
+ isa => Bool,
+ default => 0
+);
sub _build_path {
my ($self, $path) = @_;
@@ -37,9 +61,40 @@ sub send_request {
$self->http_request($req);
my $r = $self->useragent->request($req);
+
$self->http_response($r);
+ if ($ENV{RIAK_VERBOSE}) {
+ print STDERR pp($r);
+ }
+
return $r;
}
+sub is_alive {
+ my $self = shift;
+ my $request = $self->new_request('HEAD', ['ping']);
+ my $response = $self->send_request($request);
+ $self->is_success ? return 1 : return 0;
+}
+
+sub all_buckets {
+ my $self = shift;
+ my $request = $self->new_request('GET', [$self->prefix], {buckets => 'true'});
+ my $response = $self->send_request($request);
+ die "Failed to fetch buckets.. are you running riak 0.14+?"
+ unless $response->is_success;
+ my $resp = JSON::decode_json($response->content);
+ return ref ($resp->{buckets}) eq 'ARRAY' ? @{$resp->{buckets}} : ();
+}
+
+sub server_info { die "->server_info not supported by the REST interface" }
+
+sub stats {
+ my $self = shift;
+ my $request = $self->new_request('GET', ["stats"]);
+ my $response = $self->send_request($request);
+ return JSON::decode_json($response->content);
+}
+
1;
View
73 lib/Net/Riak/Role/REST/Bucket.pm
@@ -0,0 +1,73 @@
+package Net::Riak::Role::REST::Bucket;
+
+use Moose::Role;
+use JSON;
+
+sub get_properties {
+ my ($self, $name, $params) = @_;
+
+ # Callbacks require stream mode
+ $params->{keys} = 'stream' if $params->{cb};
+
+ $params->{props} = 'true' unless exists $params->{props};
+ $params->{keys} = 'false' unless exists $params->{keys};
+
+ my $request = $self->new_request(
+ 'GET', [$self->prefix, $name], $params
+ );
+
+ my $response = $self->send_request($request);
+
+ unless ($response->is_success) {
+ die "Error getting bucket properties: ".$response->status_line."\n";
+ }
+
+ if ($params->{keys} ne 'stream') {
+ return JSON::decode_json($response->content);
+ }
+
+ # In streaming mode, aggregate keys from the multiple returned chunk objects
+ else {
+ my $json = JSON->new;
+ my $props = $json->incr_parse($response->content);
+ if ($params->{cb}) {
+ while (defined(my $obj = $json->incr_parse)) {
+ $params->{cb}->($_) foreach @{$obj->{keys}};
+ }
+ return %$props ? { props => $props } : {};
+ }
+ else {
+ my @keys = map { $_->{keys} && ref $_->{keys} eq 'ARRAY' ? @{$_->{keys}} : () }
+ $json->incr_parse;
+ return { props => $props, keys => \@keys };
+ }
+ }
+}
+
+sub set_properties {
+ my ($self, $bucket, $props) = @_;
+
+ my $request = $self->new_request(
+ 'PUT', [$self->prefix, $bucket->name]
+ );
+
+ $request->header('Content-Type' => $bucket->content_type);
+ $request->content(JSON::encode_json({props => $props}));
+
+ my $response = $self->send_request($request);
+ unless ($response->is_success) {
+ die "Error setting bucket properties: ".$response->status_line."\n";
+ }
+}
+
+sub get_keys {
+ my ($self, $bucket, $params) = @_;
+
+ my $key_mode = delete($params->{stream}) ? 'stream' : 'true';
+ $params = { props => 'false', keys => $key_mode, %$params };
+ my $properties = $self->get_properties($bucket, $params);
+
+ return $properties->{keys};
+}
+
+1;
View
52 lib/Net/Riak/Role/REST/Link.pm
@@ -0,0 +1,52 @@
+package Net::Riak::Role::REST::Link;
+use Moose::Role;
+use Net::Riak::Link;
+use Net::Riak::Bucket;
+
+sub _populate_links {
+ my ($self, $object, $links) = @_;
+
+ for my $link (split(',', $links)) {
+ if ($link
+ =~ /\<\/([^\/]+)\/([^\/]+)\/([^\/]+)\>; ?riaktag=\"([^\']+)\"/)
+ {
+ my $bucket = _uri_decode($2);
+ my $key = _uri_decode($3);
+ my $tag = _uri_decode($4);
+ my $l = Net::Riak::Link->new(
+ bucket => Net::Riak::Bucket->new(
+ name => $bucket,
+ client => $self
+ ),
+ key => $key,
+ tag => $tag
+ );
+ $object->add_link($l);
+ }
+ }
+}
+
+sub _uri_decode {
+ my $str = shift;
+ $str =~ s/%([a-fA-F0-9]{2,2})/chr(hex($1))/eg;
+ return $str;
+}
+
+sub _links_to_header {
+ my ($self, $object) = @_;
+ join(', ', map { $self->link_to_header($_) } $object->links);
+}
+
+sub link_to_header {
+ my ($self, $link) = @_;
+
+ my $link_header = '';
+ $link_header .= '</';
+ $link_header .= $self->prefix . '/';
+ $link_header .= $link->bucket->name . '/';
+ $link_header .= $link->key . '>; riaktag="';
+ $link_header .= $link->tag . '"';
+ return $link_header;
+}
+
+1;
View
40 lib/Net/Riak/Role/REST/MapReduce.pm
@@ -0,0 +1,40 @@
+package Net::Riak::Role::REST::MapReduce;
+use Moose::Role;
+use JSON;
+use Data::Dumper;
+
+sub execute_job {
+ my ($self, $job, $timeout) = @_;
+
+ # save existing timeout value.
+ my $ua_timeout = $self->useragent->timeout();
+
+ if ($timeout) {
+ if ($ua_timeout < ($timeout/1000)) {
+ $self->useragent->timeout(int($timeout/1000));
+ }
+ $job->{timeout} = $timeout;
+ }
+
+ my $content = JSON::encode_json($job);
+
+ my $request = $self->new_request(
+ 'POST', [$self->mapred_prefix]
+ );
+ $request->content($content);
+
+ my $response = $self->send_request($request);
+
+ # restore time out value
+ if ( $timeout && ( $ua_timeout != $self->useragent->timeout() ) ) {
+ $self->useragent->timeout($ua_timeout);
+ }
+
+ unless ($response->is_success) {
+ die "MapReduce query failed: ".$response->status_line;
+ }
+
+ return JSON::decode_json($response->content);
+}
+
+1;
View
160 lib/Net/Riak/Role/REST/Object.pm
@@ -0,0 +1,160 @@
+package Net::Riak::Role::REST::Object;
+
+use Moose::Role;
+use JSON;
+
+sub store_object {
+ my ($self, $w, $dw, $object) = @_;
+
+ my $params = {returnbody => 'true', w => $w, dw => $dw};
+
+ $params->{returnbody} = 'false'
+ if $self->disable_return_body;
+
+
+ my $request;
+ if ( defined $object->key ) {
+ $request = $self->new_request('PUT',
+ [$self->prefix, $object->bucket->name, $object->key], $params);
+ } else {
+ $request = $self->new_request('POST',
+ [$self->prefix, $object->bucket->name ], $params);
+ }
+
+ $request->header('X-Riak-ClientID' => $self->client_id);
+ $request->header('Content-Type' => $object->content_type);
+
+ if ($object->has_vclock) {
+ $request->header('X-Riak-Vclock' => $object->vclock);
+ }
+
+ if ($object->has_links) {
+ $request->header('link' => $self->_links_to_header($object));
+ }
+
+ if (ref $object->data && $object->content_type eq 'application/json') {
+ $request->content(JSON::encode_json($object->data));
+ }
+ else {
+ $request->content($object->data);
+ }
+
+ my $response = $self->send_request($request);
+ $self->populate_object($object, $response, [200, 201, 204, 300]);
+ return $object;
+}
+
+sub load_object {
+ my ( $self, $params, $object ) = @_;
+
+ my $request =
+ $self->new_request( 'GET',
+ [ $self->prefix, $object->bucket->name, $object->key ], $params );
+
+ my $response = $self->send_request($request);
+ $self->populate_object($object, $response, [ 200, 300, 404 ] );
+ $object;
+}
+
+sub delete_object {
+ my ( $self, $params, $object ) = @_;
+
+ my $request =
+ $self->new_request( 'DELETE',
+ [ $self->prefix, $object->bucket->name, $object->key ], $params );
+
+ my $response = $self->send_request($request);
+ $self->populate_object($object, $response, [ 204, 404 ] );
+ $object;
+}
+
+sub populate_object {
+ my ($self, $obj, $http_response, $expected) = @_;
+
+ $obj->_clear_links;
+ $obj->exists(0);
+
+ return if (!$http_response);
+
+ my $status = $http_response->code;
+
+ $obj->data($http_response->content)
+ unless $self->disable_return_body;
+
+ if ( $http_response->header('location') ) {
+ $obj->key( $http_response->header('location') );
+ $obj->location( $http_response->header('location') );
+ }
+
+ if (!grep { $status == $_ } @$expected) {
+ confess "Expected status "
+ . (join(', ', @$expected))
+ . ", received $status"
+ }
+
+ if ($status == 404) {
+ $obj->clear;
+ return;
+ }
+
+ $obj->exists(1);
+
+ if ($http_response->header('link')) {
+ $self->_populate_links($obj, $http_response->header('link'));
+ }
+
+ if ($status == 300) {
+ my @siblings = split("\n", $obj->data);
+ shift @siblings;
+ my %seen; @siblings = grep { !$seen{$_}++ } @siblings;
+ $obj->siblings(\@siblings);
+ }
+
+ if ($status == 201) {
+ my $location = $http_response->header('location');
+ my ($key) = ($location =~ m!/([^/]+)$!);
+ $obj->key($key);
+ }
+
+
+ if ($status == 200 || $status == 201) {
+ $obj->content_type($http_response->content_type)
+ if $http_response->content_type;
+ $obj->data(JSON::decode_json($obj->data))
+ if $obj->content_type eq 'application/json';
+ $obj->vclock($http_response->header('X-Riak-Vclock'));
+ }
+}
+
+sub retrieve_sibling {
+ my ($self, $object, $params) = @_;
+
+ my $request = $self->new_request(
+ 'GET',
+ [$self->prefix, $object->bucket->name, $object->key],
+ $params
+ );
+
+ my $response = $self->send_request($request);
+
+ my $sibling = Net::Riak::Object->new(
+ client => $self,
+ bucket => $object->bucket,
+ key => $object->key
+ );
+
+ $sibling->_jsonize($object->_jsonize);
+ $self->populate_object($sibling, $response, [200]);
+ $sibling;
+}
+
+
+
+
+1;
+__END__
+
+=item populate_object
+
+Given the output of RiakUtils.http_request and a list of statuses, populate the object. Only for use by the Riak client library.
+
View
9 lib/Net/Riak/Role/UserAgent.pm
@@ -10,6 +10,12 @@ our $CONN_CACHE;
sub connection_cache { $CONN_CACHE ||= LWP::ConnCache->new }
+has ua_timeout => (
+ is => 'rw',
+ isa => 'Int',
+ default => 120
+);
+
has useragent => (
is => 'rw',
isa => 'LWP::UserAgent',
@@ -24,7 +30,8 @@ has useragent => (
@LWP::Protocol::http::EXTRA_SOCK_OPTS = %opts;
my $ua = LWP::UserAgent->new(
- timeout => $self->ua_timeout
+ timeout => $self->ua_timeout,
+ keep_alive => 1,
);
$ua->conn_cache(__PACKAGE__->connection_cache);
View
9 lib/Net/Riak/Transport/PBC.pm
@@ -0,0 +1,9 @@
+package Net::Riak::Transport::PBC;
+
+use Moose::Role;
+
+with qw/
+ Net::Riak::Role::PBC
+ /;
+
+1;
View
90 lib/Net/Riak/Transport/PBC/Code.pm
@@ -0,0 +1,90 @@
+package Net::Riak::Transport::PBC::Code;
+use strict;
+use warnings;
+use base 'Exporter';
+
+our @EXPORT_OK = qw/
+ REQ_CODE
+ RESP_CLASS
+ EXPECTED_RESP
+ RESP_DECODER
+/;
+
+sub EXPECTED_RESP {
+ my $code = shift;
+ return {
+ 1 => 2,
+ 3 => 4,
+ 5 => 6,
+ 7 => 8,
+ 9 => 10,
+ 11 => 12,
+ 13 => 14,
+ 15 => 16,
+ 17 => 18,
+ 19 => 20,
+ 21 => 22,
+ 23 => 24,
+ }->{$code};
+}
+sub RESP_CLASS {
+ my $code = shift;
+
+ return {
+ 0 => 'RpbErrorResp',
+ 2 => 'RpbPingResp',
+ 4 => 'RpbGetClientIdResp',
+ 6 => 'RpbSetClientIdResp',
+ 8 => 'RpbGetServerInfoResp',
+ 10 => 'RpbGetResp',
+ 12 => 'RpbPutResp',
+ 14 => 'RpbDelResp',
+ 16 => 'RpbListBucketsResp',
+ 18 => 'RpbListKeysResp',
+ 20 => 'RpbGetBucketResp',
+ 22 => 'RpbSetBucketResp',
+ 24 => 'RpbMapRedResp',
+ }->{$code};
+}
+
+sub RESP_DECODER {
+ my $code = shift;
+
+ return {
+ 0 => 'RpbErrorResp',
+ 2 => undef,
+ 4 => 'RpbGetClientIdResp',
+ 6 => undef,
+ 8 => 'RpbGetServerInfoResp',
+ 10 => 'RpbGetResp',
+ 12 => 'RpbPutResp',
+ 14 => undef,
+ 16 => 'RpbListBucketsResp',
+ 18 => 'RpbListKeysResp',
+ 20 => 'RpbGetBucketResp',
+ 22 => undef,
+ 24 => 'RpbMapRedResp'
+ }->{$code};
+};
+
+
+sub REQ_CODE {
+ my $class = shift;
+
+ return {
+ RpbPingReq => 1,
+ RpbGetClientIdReq => 3,
+ RpbSetClientIdReq => 5,
+ RpbGetServerInfoReq => 7,
+ RpbGetReq => 9,
+ RpbPutReq => 11,
+ RpbDelReq => 13,
+ RpbListBucketsReq => 15,
+ RpbListKeysReq => 17,
+ RpbGetBucketReq => 19,
+ RpbSetBucketReq => 21,
+ RpbMapRedReq => 23,
+ }->{$class};
+}
+
+1;
View
121 lib/Net/Riak/Transport/PBC/Message.pm
@@ -0,0 +1,121 @@
+package Net::Riak::Transport::PBC::Message;
+
+use Moose;
+use MooseX::Types::Moose qw/Str HashRef Int/;
+use Net::Riak::Types 'Socket';
+use Net::Riak::Transport::PBC::Code qw/
+ REQ_CODE EXPECTED_RESP RESP_CLASS RESP_DECODER/;
+use Net::Riak::Transport::PBC::Transport;
+
+has socket => (
+ is => 'rw',
+ isa => Socket,
+ predicate => 'has_socket',
+);
+
+has request => (
+ isa => 'Str',
+ is => 'ro',
+ lazy_build => 1,
+);
+
+has request_code => (
+ required => 1,
+ isa => Int,
+ is => 'ro',
+ lazy_build => 1,
+);
+
+has message_type => (
+ required => 1,
+ isa => Str,
+ is => 'ro',
+ trigger => sub {
+ $_[0]->{message_type} = 'Rpb'.$_[1];
+ }
+);
+
+has params => (
+ is => 'ro',
+ isa => HashRef,
+);
+
+sub _build_request_code {
+ my $self = shift;
+ return REQ_CODE($self->message_type);
+}
+
+sub _build_request {
+ my $self = shift;
+ $self->_pack_request( $self->request_code, $self->encode );
+}
+
+sub _pack_request {
+ my ($self, $code, $req) = @_;
+ my $h = pack('c', $code) . $req;
+ use bytes;
+ my $len = length $h;
+ return pack('N',$len).$h;
+}
+
+sub encode {
+ my $self = shift;
+ return $self->message_type->can('encode')
+ ? $self->message_type->encode( $self->params )
+ : '';
+}
+
+sub decode {
+ my ($self, $type, $raw_content) = @_;
+ return 'Rpb'.$type->decode($raw_content);
+}
+
+sub send {
+ my ($self, $cb) = @_;
+
+ die "No socket? did you forget to ->connect?" unless $self->has_socket;
+
+ $self->socket->print($self->request);
+
+ my $resp = $self->handle_response;
+
+ return $resp unless $cb;
+
+ $cb->($resp);
+ while (!$resp->done) {
+ $resp = $self->handle_response;
+# use YAML::Syck; warn Dump $resp;
+ $cb->($resp);
+ }
+ return 1;
+}
+
+sub handle_response {
+ my $self = shift;
+ my ($code, $resp) = $self->_unpack_response;
+
+ my $expected_code = EXPECTED_RESP($self->request_code);
+
+ if ($expected_code != $code) {
+ # TODO throw object
+ die "Expecting response type "
+ . RESP_CLASS($expected_code)
+ . " got " . RESP_CLASS($code);
+ }
+
+ return 1 unless RESP_DECODER($code);
+ return RESP_DECODER($code)->decode($resp);
+}
+
+sub _unpack_response {
+ my $self = shift;
+ my ( $len, $code, $msg );
+ $self->socket->read( $len, 4 );
+ $len = unpack( 'N', $len );
+ $self->socket->read( $code, 1 );
+ $code = unpack( 'c', $code );
+ $self->socket->read( $msg, $len - 1 );
+ return ( $code, $msg );
+}
+
+1;
View
483 lib/Net/Riak/Transport/PBC/Transport.pm
@@ -0,0 +1,483 @@
+package Net::Riak::Transport::PBC;
+
+##
+## This file was generated by Google::ProtocolBuffers (0.08)
+## on Mon Dec 13 11:30:34 2010
+##
+use strict;
+use warnings;
+use Google::ProtocolBuffers;
+{
+ unless (RpbSetClientIdReq->can('_pb_fields_list')) {
+ Google::ProtocolBuffers->create_message(
+ 'RpbSetClientIdReq',
+ [
+ [
+ Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'client_id', 1, undef
+ ],
+
+ ],
+ { 'create_accessors' => 1, }
+ );
+ }
+
+ unless (RpbSetBucketReq->can('_pb_fields_list')) {
+ Google::ProtocolBuffers->create_message(
+ 'RpbSetBucketReq',
+ [
+ [
+ Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'bucket', 1, undef
+ ],
+ [
+ Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
+ 'RpbBucketProps',
+ 'props', 2, undef
+ ],
+
+ ],
+ { 'create_accessors' => 1, }
+ );
+ }
+
+ unless (RpbPutReq->can('_pb_fields_list')) {
+ Google::ProtocolBuffers->create_message(
+ 'RpbPutReq',
+ [
+ [
+ Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'bucket', 1, undef
+ ],
+ [
+ Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'key', 2, undef
+ ],
+ [
+ Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'vclock', 3, undef
+ ],
+ [
+ Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
+ 'RpbContent',
+ 'content', 4, undef
+ ],
+ [
+ Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(),
+ Google::ProtocolBuffers::Constants::TYPE_UINT32(),
+ 'w', 5, undef
+ ],
+ [
+ Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(),
+ Google::ProtocolBuffers::Constants::TYPE_UINT32(),
+ 'dw', 6, undef
+ ],
+ [
+ Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(),
+ Google::ProtocolBuffers::Constants::TYPE_BOOL(),
+ 'return_body', 7, undef
+ ],
+
+ ],
+ { 'create_accessors' => 1, }
+ );
+ }
+
+ unless (RpbListBucketsResp->can('_pb_fields_list')) {
+ Google::ProtocolBuffers->create_message(
+ 'RpbListBucketsResp',
+ [
+ [
+ Google::ProtocolBuffers::Constants::LABEL_REPEATED(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'buckets', 1, undef
+ ],
+
+ ],
+ { 'create_accessors' => 1, }
+ );
+ }
+
+ unless (RpbGetBucketResp->can('_pb_fields_list')) {
+ Google::ProtocolBuffers->create_message(
+ 'RpbGetBucketResp',
+ [
+ [
+ Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
+ 'RpbBucketProps',
+ 'props', 1, undef
+ ],
+
+ ],
+ { 'create_accessors' => 1, }
+ );
+ }
+
+ unless (RpbGetReq->can('_pb_fields_list')) {
+ Google::ProtocolBuffers->create_message(
+ 'RpbGetReq',
+ [
+ [
+ Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'bucket', 1, undef
+ ],
+ [
+ Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'key', 2, undef
+ ],
+ [
+ Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(),
+ Google::ProtocolBuffers::Constants::TYPE_UINT32(),
+ 'r', 3, undef
+ ],
+
+ ],
+ { 'create_accessors' => 1, }
+ );
+ }
+
+ unless (RpbGetBucketReq->can('_pb_fields_list')) {
+ Google::ProtocolBuffers->create_message(
+ 'RpbGetBucketReq',
+ [
+ [
+ Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'bucket', 1, undef
+ ],
+
+ ],
+ { 'create_accessors' => 1, }
+ );
+ }
+
+ unless (RpbLink->can('_pb_fields_list')) {
+ Google::ProtocolBuffers->create_message(
+ 'RpbLink',
+ [
+ [
+ Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'bucket', 1, undef
+ ],
+ [
+ Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'key', 2, undef
+ ],
+ [
+ Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'tag', 3, undef
+ ],
+
+ ],
+ { 'create_accessors' => 1, }
+ );
+ }
+
+ unless (RpbGetResp->can('_pb_fields_list')) {
+ Google::ProtocolBuffers->create_message(
+ 'RpbGetResp',
+ [
+ [
+ Google::ProtocolBuffers::Constants::LABEL_REPEATED(),
+ 'RpbContent',
+ 'content', 1, undef
+ ],
+ [
+ Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'vclock', 2, undef
+ ],
+
+ ],
+ { 'create_accessors' => 1, }
+ );
+ }
+
+ unless (RpbPair->can('_pb_fields_list')) {
+ Google::ProtocolBuffers->create_message(
+ 'RpbPair',
+ [
+ [
+ Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'key', 1, undef
+ ],
+ [
+ Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'value', 2, undef
+ ],
+
+ ],
+ { 'create_accessors' => 1, }
+ );
+ }
+
+ unless (RpbPutResp->can('_pb_fields_list')) {
+ Google::ProtocolBuffers->create_message(
+ 'RpbPutResp',
+ [
+ [
+ Google::ProtocolBuffers::Constants::LABEL_REPEATED(),
+ 'RpbContent',
+ 'content', 1, undef
+ ],
+ [
+ Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'vclock', 2, undef
+ ],
+
+ ],
+ { 'create_accessors' => 1, }
+ );
+ }
+
+ unless (RpbDelReq->can('_pb_fields_list')) {
+ Google::ProtocolBuffers->create_message(
+ 'RpbDelReq',
+ [
+ [
+ Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'bucket', 1, undef
+ ],
+ [
+ Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'key', 2, undef
+ ],
+ [
+ Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(),
+ Google::ProtocolBuffers::Constants::TYPE_UINT32(),
+ 'rw', 3, undef
+ ],
+
+ ],
+ { 'create_accessors' => 1, }
+ );
+ }
+
+ unless (RpbMapRedReq->can('_pb_fields_list')) {
+ Google::ProtocolBuffers->create_message(
+ 'RpbMapRedReq',
+ [
+ [
+ Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'request', 1, undef
+ ],
+ [
+ Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'content_type', 2, undef
+ ],
+
+ ],
+ { 'create_accessors' => 1, }
+ );
+ }
+
+ unless (RpbMapRedResp->can('_pb_fields_list')) {
+ Google::ProtocolBuffers->create_message(
+ 'RpbMapRedResp',
+ [
+ [
+ Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(),
+ Google::ProtocolBuffers::Constants::TYPE_UINT32(),
+ 'phase', 1, undef
+ ],
+ [
+ Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'response', 2, undef
+ ],
+ [
+ Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(),
+ Google::ProtocolBuffers::Constants::TYPE_BOOL(),
+ 'done', 3, undef
+ ],
+
+ ],
+ { 'create_accessors' => 1, }
+ );
+ }
+
+ unless (RpbGetClientIdResp->can('_pb_fields_list')) {
+ Google::ProtocolBuffers->create_message(
+ 'RpbGetClientIdResp',
+ [
+ [
+ Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'client_id', 1, undef
+ ],
+
+ ],
+ { 'create_accessors' => 1, }
+ );
+ }
+
+ unless (RpbErrorResp->can('_pb_fields_list')) {
+ Google::ProtocolBuffers->create_message(
+ 'RpbErrorResp',
+ [
+ [
+ Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'errmsg', 1, undef
+ ],
+ [
+ Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
+ Google::ProtocolBuffers::Constants::TYPE_UINT32(),
+ 'errcode', 2, undef
+ ],
+
+ ],
+ { 'create_accessors' => 1, }
+ );
+ }
+
+ unless (RpbBucketProps->can('_pb_fields_list')) {
+ Google::ProtocolBuffers->create_message(
+ 'RpbBucketProps',
+ [
+ [
+ Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(),
+ Google::ProtocolBuffers::Constants::TYPE_UINT32(),
+ 'n_val', 1, undef
+ ],
+ [
+ Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(),
+ Google::ProtocolBuffers::Constants::TYPE_BOOL(),
+ 'allow_mult', 2, undef
+ ],
+
+ ],
+ { 'create_accessors' => 1, }
+ );
+ }
+
+ unless (RpbGetServerInfoResp->can('_pb_fields_list')) {
+ Google::ProtocolBuffers->create_message(
+ 'RpbGetServerInfoResp',
+ [
+ [
+ Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'node', 1, undef
+ ],
+ [
+ Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'server_version', 2, undef
+ ],
+
+ ],
+ { 'create_accessors' => 1, }
+ );
+ }
+
+ unless (RpbListKeysReq->can('_pb_fields_list')) {
+ Google::ProtocolBuffers->create_message(
+ 'RpbListKeysReq',
+ [
+ [
+ Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'bucket', 1, undef