Permalink
Browse files

Smarter pool management.

  • Loading branch information...
1 parent 451dad3 commit 29570789b9afb6d4642a1d152dbccb37b27a0368 @fmgoncalves committed Sep 6, 2011
Showing with 239 additions and 116 deletions.
  1. +9 −2 examples/cassandra_simple_test.pm
  2. +75 −23 lib/Cassandra/Pool.pm
  3. +155 −91 lib/Cassandra/Simple.pm
@@ -6,13 +6,16 @@ use Data::Dumper;
use Cassandra::Simple;
use Cassandra::Pool;
+use Sys::Hostname qw/hostname/;
+
sub println {
print @_, "\n";
}
my ($keyspace, $column_family) = qw/simple simple/;
my $conn = Cassandra::Simple->new(
+ server_name => '127.0.0.1',
keyspace => $keyspace,
);
@@ -98,12 +101,16 @@ println Dumper $conn->get_range($column_family);
println "\$conn->ring('simple')";
println Dumper $conn->ring('simple');
-
+println "new Cassandra::Pool($keyspace)";
my $pool = new Cassandra::Pool($keyspace);
+println "\$pool->get()";
my $client = $pool->get();
println "\$client->describe_snitch()";
println $client->describe_snitch();
+println "\$client";
+println Dumper $client;
-
+println "\$pool->put(\$client)";
+$pool->put($client);
View
@@ -27,72 +27,124 @@ use Thrift::Socket;
use Data::Dumper;
-
-sub new{
- my $class = shift;
+sub new {
+ my $class = shift;
my $keyspace = shift;
- my $opt = shift;
- my $self={};
+ my $opt = shift;
+ my $self = {};
my $server_name = $opt->{server_name} // '127.0.0.1';
my $server_port = $opt->{server_port} // 9160;
$self->{username} = $opt->{username} // '';
$self->{password} = $opt->{password} // '';
- $self = bless($self, $class);
+ $self = bless( $self, $class );
+
+ $self->{keyspace} = $keyspace;
+
+ my $first = $self->create_client(
+ { server_name => $server_name, server_port => $server_port } );
- my $first = $self->create_client({server_name => $server_name, server_port => $server_port});
- $first->set_keyspace($keyspace);
+ $self->{server_port} = $server_port;
- $self->{pool} = [];
+ $self->{pool} = {};
- push @{$self->{pool}}, $first;
+ $self->{pool}->{$server_name} = [ $first, 0 ];
# print Dumper map { split( /\//, $_->{endpoints}->[0]) } @{$first->describe_ring($keyspace)};
- foreach( map {split( /\//, $_->{endpoints}->[0])} @{$first->describe_ring($keyspace)} ){
+ foreach ( map { split( /\//, $_->{endpoints}->[0] ) }
+ @{ $first->describe_ring($keyspace) } )
+ {
next if $server_name eq $_;
-# print ">> ",$_," ",$keyspace,"\n";
- my $cl = $self->create_client({server_name => $_, server_port => $server_port});
- $cl->set_keyspace($keyspace);
- push @{$self->{pool}}, $cl;
+
+ # print ">> ",$_," ",$keyspace,"\n";
+ $self->{pool}->{$_} = [ undef, 0 ];
}
return $self;
}
-sub create_client{
+sub create_client {
- my $self = shift;
- my $opt = shift // {};
+ my $self = shift;
+ my $opt = shift // {};
my $server_name = $opt->{server_name};
my $server_port = $opt->{server_port} // 9160;
my $sock = Thrift::Socket->new( $server_name, $server_port );
my $transport = Thrift::FramedTransport->new( $sock, 1024, 1024 );
- my $protocol = Thrift::BinaryProtocol->new( $transport );
+ my $protocol = Thrift::BinaryProtocol->new($transport);
- my $client = Cassandra::CassandraClient->new( $protocol );
+ my $client = Cassandra::CassandraClient->new($protocol);
$transport->open;
-# print "»opened transport \n";
+ # print "»opened transport \n";
my $auth = Cassandra::AuthenticationRequest->new;
$auth->{credentials} =
{ username => $self->{username}, password => $self->{password} };
$client->login($auth);
+ $client->set_keyspace( $self->{keyspace} );
return $client;
}
-sub get{
+sub get {
my $self = shift;
- return $self->{pool}->[rand @{$self->{pool}}];
+
+ my @ol =
+ sort { $self->{pool}->{$a}->[1] cmp $self->{pool}->{$b}->[1] } #TODO: obviously not a good solution. should use a priorityqueue or something, not sort the whole pool at every get.
+ keys %{ $self->{pool} };
+ my $server_name = shift @ol;
+
+ my $cl = $self->{pool}->{$server_name}->[0];
+
+ while ( (not defined $cl) and scalar @ol ) {
+ #print ">> undef $server_name ". defined $cl, "\n";
+ eval {
+ $cl = $self->create_client(
+ {
+ server_name => $server_name,
+ server_port => $self->{server_port}
+ }
+ );
+ $self->{pool}->{$server_name}->[0] = $cl;
+ };
+ if ($@) {
+ $self->{pool}->{$server_name}->[1] += 5;
+ $server_name = shift @ol;
+ $cl = $self->{pool}->{$server_name}->[0];
+ }
+ }
+# print "COUNTER for $server_name -> " . $self->{pool}->{$server_name}->[1], "\n";
+ $self->{pool}->{$server_name}->[1] += 1;
+ return $cl;
}
+sub put {
+ my $self = shift;
+ my $client = shift;
+
+ my $server_name = $client->{input}->{trans}->{transport}->{host};
+
+ $self->{pool}->{$server_name}->[1] -= 1;
+}
+
+sub fail {
+ my $self = shift;
+ my $client = shift;
+
+ my $server_name = $client->{input}->{trans}->{transport}->{host};
+
+ $self->{pool}->{$server_name}->[1] += 5;
+ $self->{pool}->{$server_name}->[0] += undef;
+}
+
+
1
Oops, something went wrong.

0 comments on commit 2957078

Please sign in to comment.