Browse files

Refactored pool code to use ResourcePool.

  • Loading branch information...
1 parent f5afe3d commit 8968d0b68eaa8f69ce920f7837a952c57291e885 @fmgoncalves committed Nov 9, 2011
Showing with 237 additions and 105 deletions.
  1. +53 −0 examples/multigettimer.pm
  2. +1 −1 lib/Cassandra/Composite.pm
  3. +24 −103 lib/Cassandra/Pool.pm
  4. +156 −0 lib/Cassandra/Pool/CassandraServer.pm
  5. +3 −1 lib/Cassandra/Simple.pm
View
53 examples/multigettimer.pm
@@ -0,0 +1,53 @@
+use strict;
+use warnings;
+
+use Data::Dumper;
+
+use Cassandra::Simple;
+use Cassandra::Composite qw/composite composite_to_array/;
+
+use Sys::Hostname qw/hostname/;
+use Time::HiRes qw/time/;
+
+sub println {
+ print @_, "\n";
+}
+
+my ( $keyspace, $column_family) = qw/simple simple/;
+
+my $conn = Cassandra::Simple->new( server_name => '127.0.0.1',
+ keyspace => $keyspace, );
+
+my $present =
+ grep { $_ eq $column_family } @{ [ $conn->list_keyspace_cfs($keyspace) ] };
+
+unless ($present) {
+ println "Creating $column_family in $keyspace";
+ $conn->create_column_family( $keyspace, $column_family,
+ {
+ comparator_type => 'UTF8Type',
+ key_validation_class => 'UTF8Type',
+ default_validation_class => 'UTF8Type',
+ } );
+}
+
+my %data= map { "k".$_ => {map{ "c".$_ => "v".$_ } 1..20} } 1..100;
+println "inserting";
+
+$conn->batch_insert($column_family,\%data);
+println "Sleeping 3 seconds to settle and then multigeting";
+sleep 3;
+
+my ($start,$end )= (1,3);
+
+foreach(1..10){
+ my $t0 = time;
+
+ $conn->multiget($column_family, [map { "k".$_ } $start..$end]);
+
+ println "".($end-$start). " \t ". (time-$t0 ) ;
+ $end += 10;
+ sleep 0.1;
+}
+
+$conn->remove($column_family);
View
2 lib/Cassandra/Composite.pm
@@ -10,7 +10,7 @@ our @EXPORT = qw/ composite composite_to_array/;
use Data::Dumper;
-=head2 multiget
+=head2 composite
Usage: C<< composite($component [, $component [, ...]]) >>
View
127 lib/Cassandra/Pool.pm
@@ -25,6 +25,11 @@ use Thrift::BinaryProtocol;
use Thrift::FramedTransport;
use Thrift::Socket;
+use ResourcePool;
+use ResourcePool::LoadBalancer;
+
+use Cassandra::Pool::CassandraServer;
+
use Data::Dumper;
sub new {
@@ -33,128 +38,44 @@ sub new {
my $opt = shift;
my $self = {};
- my $server_name = $opt->{server_name} // '127.0.0.1';
- my $server_port = $opt->{server_port} // 9160;
- $self->{username} = $opt->{username} // '';
- $self->{password} = $opt->{password} // '';
+ $opt->{keyspace} = $keyspace;
$self = bless( $self, $class );
- $self->{keyspace} = $keyspace;
-
- my $first = $self->create_client(
- { server_name => $server_name, server_port => $server_port } );
-
- $self->{server_port} = $server_port;
-
- $self->{pool} = {};
-
- $self->{pool}->{$server_name} = [ $first, 0 ];
-
+ my $loadbalancer = ResourcePool::LoadBalancer->new("cassandra", MaxTry => 6);#TODO try alternative policy methods
+
+ $loadbalancer->add_pool(ResourcePool->new(Cassandra::Pool::CassandraServerFactory->new($opt)));
+
# print Dumper map { split( /\//, $_->{endpoints}->[0]) } @{$first->describe_ring($keyspace)};
foreach ( map { split( /\//, $_->{endpoints}->[0] ) }
- @{ $first->describe_ring($keyspace) } )
+ @{ $loadbalancer->get()->describe_ring($keyspace) } )
{
- next if $server_name eq $_;
-
- # print ">> ",$_," ",$keyspace,"\n";
- $self->{pool}->{$_} = [ undef, 0 ];
+ next if $opt->{server_name} eq $_;
+ my %params = {$opt};
+ $params{server_name} = $_;
+ $loadbalancer->add_pool(ResourcePool->new(Cassandra::Pool::CassandraServerFactory->new(\%params)));
}
-
+ $self->{pool} = $loadbalancer;
+
return $self;
}
-sub create_client {
-
- my $self = shift;
- my $opt = shift // {};
- my $server_name = $opt->{server_name};
- my $server_port = $opt->{server_port} // 9160;
-
- my $sock = Thrift::Socket->new( $server_name, $server_port );
-
- my $transport = Thrift::FramedTransport->new( $sock, 1024, 1024 );
-
- my $protocol = Thrift::BinaryProtocol->new($transport);
-
- my $client = Cassandra::CassandraClient->new($protocol);
-
- $transport->open;
-
- # print "»opened transport \n";
-
- my $auth = Cassandra::AuthenticationRequest->new;
- $auth->{credentials} =
- { username => $self->{username}, password => $self->{password} };
-
- $client->login($auth);
- $client->set_keyspace( $self->{keyspace} );
-
- return $client;
-}
-
-sub get {
+sub get{
my $self = shift;
-
- #TODO: obviously not a good solution. should use a priorityqueue or something, not sort the whole pool at every get.
- my @ol =
- sort { $self->{pool}->{$a}->[1] cmp $self->{pool}->{$b}->[1] }
- keys %{ $self->{pool} };
- my $server_name = shift @ol;
-
- #print Dumper $self->{pool}->{$server_name};
- #print ">Defined long name ". defined $self->{pool}->{$server_name}->[0]."\n";
-
- my $cl = $self->{pool}->{$server_name}->[0];
-
- #print ">Defined short ". defined $cl."\n";
-
- while ( (not defined $cl) and scalar @ol ) {
- #print ">> undef $server_name ". defined $cl, "\n";
- eval {
- $cl = $self->create_client(
- {
- server_name => $server_name,
- server_port => $self->{server_port}
- }
- );
- $self->{pool}->{$server_name}->[0] = $cl;
- };
- if ($@) {
- $self->{pool}->{$server_name}->[1] += 5;
- $server_name = shift @ol;
- $cl = $self->{pool}->{$server_name}->[0];
- }
- }
-# print "COUNTER for $server_name -> " . $self->{pool}->{$server_name}->[1], "\n";
- $self->{pool}->{$server_name}->[1] += 1;
-
- #print "$server_name ->". defined $cl. " |";
-
- return $cl;
+ return $self->{pool}->get(@_);
}
-sub put {
+sub put{
my $self = shift;
- my $client = shift;
-
- my $server_name = $client->{input}->{trans}->{transport}->{host};
-
- $self->{pool}->{$server_name}->[1] -= 1;
+ return $self->{pool}->free(@_);
}
-sub fail {
+sub fail{
my $self = shift;
- my $client = shift;
- my $server_name = $client->{input}->{trans}->{transport}->{host};
-
-# print "FAILED $server_name";
-
- $self->{pool}->{$server_name}->[1] += 5;
- $self->{pool}->{$server_name}->[0] = undef;
+ return $self->{pool}->fail(@_);
}
-
+#get, put, fail
1
View
156 lib/Cassandra/Pool/CassandraServer.pm
@@ -0,0 +1,156 @@
+package Cassandra::Pool::CassandraServer;
+
+use strict;
+use warnings;
+
+use Cassandra::Cassandra;
+use Cassandra::Types;
+use Thrift;
+use Thrift::BinaryProtocol;
+use Thrift::FramedTransport;
+use Thrift::Socket;
+
+use Data::Dumper;
+
+use base qw/ResourcePool::Resource/;
+
+sub new {
+ my $class = shift;
+ my $opt = shift;
+
+ my $self = {};
+
+ my $server_name = $opt->{server_name} || '127.0.0.1';
+ my $server_port = $opt->{server_port} || 9160;
+
+ my $sock = Thrift::Socket->new( $server_name, $server_port );
+
+ my $transport = Thrift::FramedTransport->new( $sock, 1024, 1024 );
+
+ my $protocol = Thrift::BinaryProtocol->new($transport);
+
+ $self->{client} = Cassandra::CassandraClient->new($protocol);
+
+ $transport->open;
+
+ my $auth = Cassandra::AuthenticationRequest->new;
+ $auth->{credentials} = {
+ username => $opt->{username} || '',
+ password => $opt->{password} || ''
+ };
+
+ $self->{client}->login($auth);
+ $self->{client}->set_keyspace( $opt->{keyspace} );
+
+ bless( $self, $class );
+
+ return $self;
+}
+
+=head
+Closes a connection gracefully.
+=cut
+
+sub close {
+ my $self = shift;
+ $self->{client}->{input}->getTransport()->close();
+ eval { $self->{client}->{output}->getTransport()->close(); };
+}
+
+=head
+Closes a failed connection and ignores error (since this connection is known to be broken)
+=cut
+
+sub fail_close {
+ my $self = shift;
+ eval {
+ $self->{client}->{input}->getTransport()->close();
+ $self->{client}->{output}->getTransport()->close();
+ };
+}
+
+=head
+Returns the naked resource which can be used by the client. This is the DBI or Net::LDAP handle for example.
+
+Returns: a reference to a object
+=cut
+
+sub get_plain_resource {
+ my $self = shift;
+ return $self->{client};
+}
+
+=head
+Checks a connection.
+=cut
+
+sub check {
+ my $self = shift;
+ return $self->{client}->{input}->getTransport()->isOpen()
+ && $self->{client}->{output}->getTransport()->isOpen();
+}
+
+=head
+Checks a connection. This method is called by the get() method of the ResourcePool before it returns a connection. The default implementation always returns true.
+
+Returns: true if the connection is valid
+=cut
+
+sub precheck {
+ return check(@_);
+}
+
+=head
+Checks a connection. This method is called by the free() method of the ResourcePool to check if a connection is still valid. The default implementation always returns true.
+
+Returns: true if the connection is valid
+=cut
+
+sub postcheck {
+ return check(@_);
+}
+
+##Factory class
+package Cassandra::Pool::CassandraServerFactory;
+
+use Data::Dumper;
+
+use base qw/ResourcePool::Factory/;
+
+=head
+The new method is called to create a new factory.
+
+Usually this method just stores the parameters somewhere and will use it later create_resource is called.
+=cut
+
+sub new {
+ my ( $class, $params ) = @_;
+
+ die 'A keyspace must be provided' unless $params->{keyspace};
+
+ my $self = {};
+ $self->{params} = $params;
+ bless( $self, $class );
+ return $self;
+}
+
+=head
+This method is used to actually create a resource according to the parameters given to the new method.
+
+You must override this method in order to do something useful.
+
+Returns: a reference to a ResourcePool::Resource object
+=cut
+
+sub create_resource {
+ my $self = shift;
+
+ return new Cassandra::Pool::CassandraServer( $self->{params} );
+}
+
+#TODO implement remaining methods if needed only
+#$factory->info
+#$factory->mk_singleton_key
+#$factory->singleton
+
+1;
View
4 lib/Cassandra/Simple.pm
@@ -284,8 +284,10 @@ sub multiget {
my $level = $self->_consistency_level_read($opt);
my $cl = $self->pool->get();
+ my $t0 = time;
my $result =
eval { $cl->multiget_slice( $keys, $columnParent, $predicate, $level ) };
+
if ($@) { print Dumper $@; $self->pool->fail($cl) }
else { $self->pool->put($cl) }
@@ -890,7 +892,7 @@ sub remove {
if ($keys) {
- $keys = [$keys] unless ref($keys) eq 'ARRAY';
+ $keys = [$keys] unless UNIVERSAL::isa( $keys, 'ARRAY' );
my $deletion =
Cassandra::Deletion->new(

0 comments on commit 8968d0b

Please sign in to comment.