Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Added keyspace manipulation methods. Fixed major bug (same keys in Lo…

…adBalancer) in Cassandra::Pool
  • Loading branch information...
commit 3881047d65c5eb250ad9af7237791709ae110eba 1 parent 1e5718f
Filipe Gonçalves authored
7 README
View
@@ -184,6 +184,13 @@ SYNOPSYS
is an HASH containing Column Family Definition options (column_type,
comparator_type, etc.).
+ create_keyspace
+ Usage "create_keyspace($keyspace [, $opt])"
+
+ $opt is an *HASH* and can have the following keys:
+
+ strategy
+
create_index
Usage: "create_index($keyspace, $column_family, $columns,
[$validation_class])"
58 examples/cassandra_simple_test.pm
View
@@ -12,31 +12,49 @@ sub println {
print @_, "\n";
}
-my ( $keyspace, $column_family, $composite_column_family) = qw/simple simple simplecomposite/;
+my ( $keyspace, $column_family, $composite_column_family ) =
+ qw/simple simple simplecomposite/;
+
+my $sys_conn = Cassandra::Simple->new( server_name => hostname() );
+unless ( grep { $_ eq $keyspace } @{ $sys_conn->list_keyspaces() } ) {
+ println "Creating keyspace $keyspace";
+ $sys_conn->create_keyspace($keyspace);
+}
my $conn = Cassandra::Simple->new( server_name => hostname(),
keyspace => $keyspace, );
-
my $present =
- grep { $_ eq $column_family } @{ [ $conn->list_keyspace_cfs($keyspace) ] };
+ grep { $_ eq $column_family } @{ $conn->list_keyspace_cfs($keyspace) };
unless ($present) {
println "Creating $column_family in $keyspace";
- $conn->create_column_family( $keyspace, $column_family,
- {
- comparator_type => 'UTF8Type',
- key_validation_class => 'UTF8Type',
- default_validation_class => 'UTF8Type',
- } );
+ $conn->create_column_family(
+ $keyspace,
+ $column_family,
+ {
+ comparator_type => 'UTF8Type',
+ key_validation_class => 'UTF8Type',
+ default_validation_class => 'UTF8Type',
+ }
+ );
}
$present =
- grep { $_ eq $composite_column_family} @{ [ $conn->list_keyspace_cfs($keyspace) ] };
+ grep { $_ eq $composite_column_family }
+ @{ $conn->list_keyspace_cfs($keyspace) };
unless ($present) {
println "Creating $composite_column_family in $keyspace";
- $conn->create_column_family( $keyspace, $composite_column_family );
+ $conn->create_column_family(
+ $keyspace,
+ $composite_column_family,
+ {
+ comparator_type => 'CompositeType(UTF8Type,UTF8Type)',
+ key_validation_class => 'UTF8Type',
+ default_validation_class => 'UTF8Type',
+ }
+ );
}
#Method to test code here success
@@ -135,11 +153,10 @@ println Dumper $conn->get_indexed_slices( $column_family,
println
"\$conn->get_indexed_slices($column_family, { expression_list => [ [ 'age' , '=' , '12' ] ] })";
println Dumper $conn->get_indexed_slices( $column_family,
- { expression_list => [ [ 'age' , '=' , '12' ] ] } );
+ { expression_list => [ [ 'age', '=', '12' ] ] } );
#Expected result: Rows whisky1, whisky2, whisky4
-
println
"\$conn->remove($column_family, [ 'ChaveA' ], { columns => [ 'ColunaA1' ]})";
println Dumper $conn->remove( $column_family, ['ChaveA'],
@@ -172,11 +189,11 @@ println Dumper $conn->get_range($column_family);
println "\$conn->ring('simple')";
println Dumper $conn->ring('simple');
-
println
"\$conn->insert( $composite_column_family, 'hello', { composite( 'a','en') => 'world' , composite('a','pt') => 'mundo' } )";
println Dumper $conn->insert(
- $composite_column_family, "hello",
+ $composite_column_family,
+ "hello",
{
composite( "a", "en" ) => "world",
composite( "a", "pt" ) => "mundo"
@@ -184,10 +201,15 @@ println Dumper $conn->insert(
);
println
- "\$conn->get( $composite_column_family, 'hello', { columns => [ composite('a', 'pt' ) ] } )";
+"\$conn->get( $composite_column_family, 'hello', { columns => [ composite('a', 'pt' ) ] } )";
my $x = $conn->get( $composite_column_family, "hello",
- { columns => [ composite( "a", "pt" ) ] } );
-println Dumper { map { ( join ':', @{composite_to_array($_)} ) => $x->{$_} } keys % $x };
+ { columns => [ composite( "a", "pt" ) ] } );
+println Dumper {
+ map { ( join ':', @{ composite_to_array($_) } ) => $x->{$_} } keys %$x;
+};
println Dumper "\$conn->remove($composite_column_family)";
println Dumper $conn->remove($composite_column_family);
+
+println Dumper "\$conn->drop_keyspace($keyspace)";
+println Dumper $sys_conn->drop_keyspace($keyspace);
118 lib/Cassandra/Cassandra.pm
View
@@ -6264,17 +6264,17 @@ sub recv_login{
my $x = new TApplicationException();
$x->read($self->{input});
$self->{input}->readMessageEnd();
- die $x;
+ die $x->getMessage();
}
my $result = new Cassandra::Cassandra_login_result();
$result->read($self->{input});
$self->{input}->readMessageEnd();
if (defined $result->{authnx}) {
- die $result->{authnx};
+ die $result->{authnx}->why;
}
if (defined $result->{authzx}) {
- die $result->{authzx};
+ die $result->{authzx}->why;
}
return;
}
@@ -6310,14 +6310,14 @@ sub recv_set_keyspace{
my $x = new TApplicationException();
$x->read($self->{input});
$self->{input}->readMessageEnd();
- die $x;
+ die $x->getMessage();
}
my $result = new Cassandra::Cassandra_set_keyspace_result();
$result->read($self->{input});
$self->{input}->readMessageEnd();
if (defined $result->{ire}) {
- die $result->{ire};
+ die $result->{ire}->why;
}
return;
}
@@ -6359,7 +6359,7 @@ sub recv_get{
my $x = new TApplicationException();
$x->read($self->{input});
$self->{input}->readMessageEnd();
- die $x;
+ die $x->getMessage();
}
my $result = new Cassandra::Cassandra_get_result();
$result->read($self->{input});
@@ -6369,7 +6369,7 @@ sub recv_get{
return $result->{success};
}
if (defined $result->{ire}) {
- die $result->{ire};
+ die $result->{ire}->why;
}
if (defined $result->{nfe}) {
die $result->{nfe};
@@ -6423,7 +6423,7 @@ sub recv_get_slice{
my $x = new TApplicationException();
$x->read($self->{input});
$self->{input}->readMessageEnd();
- die $x;
+ die $x->getMessage();
}
my $result = new Cassandra::Cassandra_get_slice_result();
$result->read($self->{input});
@@ -6433,7 +6433,7 @@ sub recv_get_slice{
return $result->{success};
}
if (defined $result->{ire}) {
- die $result->{ire};
+ die $result->{ire}->why;
}
if (defined $result->{ue}) {
die $result->{ue};
@@ -6484,7 +6484,7 @@ sub recv_get_count{
my $x = new TApplicationException();
$x->read($self->{input});
$self->{input}->readMessageEnd();
- die $x;
+ die $x->getMessage();
}
my $result = new Cassandra::Cassandra_get_count_result();
$result->read($self->{input});
@@ -6494,7 +6494,7 @@ sub recv_get_count{
return $result->{success};
}
if (defined $result->{ire}) {
- die $result->{ire};
+ die $result->{ire}->why;
}
if (defined $result->{ue}) {
die $result->{ue};
@@ -6545,7 +6545,7 @@ sub recv_multiget_slice{
my $x = new TApplicationException();
$x->read($self->{input});
$self->{input}->readMessageEnd();
- die $x;
+ die $x->getMessage();
}
my $result = new Cassandra::Cassandra_multiget_slice_result();
$result->read($self->{input});
@@ -6555,7 +6555,7 @@ sub recv_multiget_slice{
return $result->{success};
}
if (defined $result->{ire}) {
- die $result->{ire};
+ die $result->{ire}->why;
}
if (defined $result->{ue}) {
die $result->{ue};
@@ -6606,7 +6606,7 @@ sub recv_multiget_count{
my $x = new TApplicationException();
$x->read($self->{input});
$self->{input}->readMessageEnd();
- die $x;
+ die $x->getMessage();
}
my $result = new Cassandra::Cassandra_multiget_count_result();
$result->read($self->{input});
@@ -6616,7 +6616,7 @@ sub recv_multiget_count{
return $result->{success};
}
if (defined $result->{ire}) {
- die $result->{ire};
+ die $result->{ire}->why;
}
if (defined $result->{ue}) {
die $result->{ue};
@@ -6667,7 +6667,7 @@ sub recv_get_range_slices{
my $x = new TApplicationException();
$x->read($self->{input});
$self->{input}->readMessageEnd();
- die $x;
+ die $x->getMessage();
}
my $result = new Cassandra::Cassandra_get_range_slices_result();
$result->read($self->{input});
@@ -6677,7 +6677,7 @@ sub recv_get_range_slices{
return $result->{success};
}
if (defined $result->{ire}) {
- die $result->{ire};
+ die $result->{ire}->why;
}
if (defined $result->{ue}) {
die $result->{ue};
@@ -6728,7 +6728,7 @@ sub recv_get_indexed_slices{
my $x = new TApplicationException();
$x->read($self->{input});
$self->{input}->readMessageEnd();
- die $x;
+ die $x->getMessage();
}
my $result = new Cassandra::Cassandra_get_indexed_slices_result();
$result->read($self->{input});
@@ -6738,7 +6738,7 @@ sub recv_get_indexed_slices{
return $result->{success};
}
if (defined $result->{ire}) {
- die $result->{ire};
+ die $result->{ire}->why;
}
if (defined $result->{ue}) {
die $result->{ue};
@@ -6789,14 +6789,14 @@ sub recv_insert{
my $x = new TApplicationException();
$x->read($self->{input});
$self->{input}->readMessageEnd();
- die $x;
+ die $x->getMessage();
}
my $result = new Cassandra::Cassandra_insert_result();
$result->read($self->{input});
$self->{input}->readMessageEnd();
if (defined $result->{ire}) {
- die $result->{ire};
+ die $result->{ire}->why;
}
if (defined $result->{ue}) {
die $result->{ue};
@@ -6847,14 +6847,14 @@ sub recv_add{
my $x = new TApplicationException();
$x->read($self->{input});
$self->{input}->readMessageEnd();
- die $x;
+ die $x->getMessage();
}
my $result = new Cassandra::Cassandra_add_result();
$result->read($self->{input});
$self->{input}->readMessageEnd();
if (defined $result->{ire}) {
- die $result->{ire};
+ die $result->{ire}->why;
}
if (defined $result->{ue}) {
die $result->{ue};
@@ -6905,14 +6905,14 @@ sub recv_remove{
my $x = new TApplicationException();
$x->read($self->{input});
$self->{input}->readMessageEnd();
- die $x;
+ die $x->getMessage();
}
my $result = new Cassandra::Cassandra_remove_result();
$result->read($self->{input});
$self->{input}->readMessageEnd();
if (defined $result->{ire}) {
- die $result->{ire};
+ die $result->{ire}->why;
}
if (defined $result->{ue}) {
die $result->{ue};
@@ -6960,14 +6960,14 @@ sub recv_remove_counter{
my $x = new TApplicationException();
$x->read($self->{input});
$self->{input}->readMessageEnd();
- die $x;
+ die $x->getMessage();
}
my $result = new Cassandra::Cassandra_remove_counter_result();
$result->read($self->{input});
$self->{input}->readMessageEnd();
if (defined $result->{ire}) {
- die $result->{ire};
+ die $result->{ire}->why;
}
if (defined $result->{ue}) {
die $result->{ue};
@@ -7012,14 +7012,14 @@ sub recv_batch_mutate{
my $x = new TApplicationException();
$x->read($self->{input});
$self->{input}->readMessageEnd();
- die $x;
+ die $x->getMessage();
}
my $result = new Cassandra::Cassandra_batch_mutate_result();
$result->read($self->{input});
$self->{input}->readMessageEnd();
if (defined $result->{ire}) {
- die $result->{ire};
+ die $result->{ire}->why;
}
if (defined $result->{ue}) {
die $result->{ue};
@@ -7061,14 +7061,14 @@ sub recv_truncate{
my $x = new TApplicationException();
$x->read($self->{input});
$self->{input}->readMessageEnd();
- die $x;
+ die $x->getMessage();
}
my $result = new Cassandra::Cassandra_truncate_result();
$result->read($self->{input});
$self->{input}->readMessageEnd();
if (defined $result->{ire}) {
- die $result->{ire};
+ die $result->{ire}->why;
}
if (defined $result->{ue}) {
die $result->{ue};
@@ -7104,7 +7104,7 @@ sub recv_describe_schema_versions{
my $x = new TApplicationException();
$x->read($self->{input});
$self->{input}->readMessageEnd();
- die $x;
+ die $x->getMessage();
}
my $result = new Cassandra::Cassandra_describe_schema_versions_result();
$result->read($self->{input});
@@ -7114,7 +7114,7 @@ sub recv_describe_schema_versions{
return $result->{success};
}
if (defined $result->{ire}) {
- die $result->{ire};
+ die $result->{ire}->why;
}
die "describe_schema_versions failed: unknown result";
}
@@ -7147,7 +7147,7 @@ sub recv_describe_keyspaces{
my $x = new TApplicationException();
$x->read($self->{input});
$self->{input}->readMessageEnd();
- die $x;
+ die $x->getMessage();
}
my $result = new Cassandra::Cassandra_describe_keyspaces_result();
$result->read($self->{input});
@@ -7157,7 +7157,7 @@ sub recv_describe_keyspaces{
return $result->{success};
}
if (defined $result->{ire}) {
- die $result->{ire};
+ die $result->{ire}->why;
}
die "describe_keyspaces failed: unknown result";
}
@@ -7190,7 +7190,7 @@ sub recv_describe_cluster_name{
my $x = new TApplicationException();
$x->read($self->{input});
$self->{input}->readMessageEnd();
- die $x;
+ die $x->getMessage();
}
my $result = new Cassandra::Cassandra_describe_cluster_name_result();
$result->read($self->{input});
@@ -7230,7 +7230,7 @@ sub recv_describe_version{
my $x = new TApplicationException();
$x->read($self->{input});
$self->{input}->readMessageEnd();
- die $x;
+ die $x->getMessage();
}
my $result = new Cassandra::Cassandra_describe_version_result();
$result->read($self->{input});
@@ -7273,7 +7273,7 @@ sub recv_describe_ring{
my $x = new TApplicationException();
$x->read($self->{input});
$self->{input}->readMessageEnd();
- die $x;
+ die $x->getMessage();
}
my $result = new Cassandra::Cassandra_describe_ring_result();
$result->read($self->{input});
@@ -7283,7 +7283,7 @@ sub recv_describe_ring{
return $result->{success};
}
if (defined $result->{ire}) {
- die $result->{ire};
+ die $result->{ire}->why;
}
die "describe_ring failed: unknown result";
}
@@ -7316,7 +7316,7 @@ sub recv_describe_partitioner{
my $x = new TApplicationException();
$x->read($self->{input});
$self->{input}->readMessageEnd();
- die $x;
+ die $x->getMessage();
}
my $result = new Cassandra::Cassandra_describe_partitioner_result();
$result->read($self->{input});
@@ -7356,7 +7356,7 @@ sub recv_describe_snitch{
my $x = new TApplicationException();
$x->read($self->{input});
$self->{input}->readMessageEnd();
- die $x;
+ die $x->getMessage();
}
my $result = new Cassandra::Cassandra_describe_snitch_result();
$result->read($self->{input});
@@ -7399,7 +7399,7 @@ sub recv_describe_keyspace{
my $x = new TApplicationException();
$x->read($self->{input});
$self->{input}->readMessageEnd();
- die $x;
+ die $x->getMessage();
}
my $result = new Cassandra::Cassandra_describe_keyspace_result();
$result->read($self->{input});
@@ -7412,7 +7412,7 @@ sub recv_describe_keyspace{
die $result->{nfe};
}
if (defined $result->{ire}) {
- die $result->{ire};
+ die $result->{ire}->why;
}
die "describe_keyspace failed: unknown result";
}
@@ -7457,7 +7457,7 @@ sub recv_describe_splits{
my $x = new TApplicationException();
$x->read($self->{input});
$self->{input}->readMessageEnd();
- die $x;
+ die $x->getMessage();
}
my $result = new Cassandra::Cassandra_describe_splits_result();
$result->read($self->{input});
@@ -7467,7 +7467,7 @@ sub recv_describe_splits{
return $result->{success};
}
if (defined $result->{ire}) {
- die $result->{ire};
+ die $result->{ire}->why;
}
die "describe_splits failed: unknown result";
}
@@ -7503,7 +7503,7 @@ sub recv_system_add_column_family{
my $x = new TApplicationException();
$x->read($self->{input});
$self->{input}->readMessageEnd();
- die $x;
+ die $x->getMessage();
}
my $result = new Cassandra::Cassandra_system_add_column_family_result();
$result->read($self->{input});
@@ -7513,7 +7513,7 @@ sub recv_system_add_column_family{
return $result->{success};
}
if (defined $result->{ire}) {
- die $result->{ire};
+ die $result->{ire}->why;
}
if (defined $result->{sde}) {
die $result->{sde};
@@ -7552,7 +7552,7 @@ sub recv_system_drop_column_family{
my $x = new TApplicationException();
$x->read($self->{input});
$self->{input}->readMessageEnd();
- die $x;
+ die $x->getMessage();
}
my $result = new Cassandra::Cassandra_system_drop_column_family_result();
$result->read($self->{input});
@@ -7562,7 +7562,7 @@ sub recv_system_drop_column_family{
return $result->{success};
}
if (defined $result->{ire}) {
- die $result->{ire};
+ die $result->{ire}->why;
}
if (defined $result->{sde}) {
die $result->{sde};
@@ -7601,7 +7601,7 @@ sub recv_system_add_keyspace{
my $x = new TApplicationException();
$x->read($self->{input});
$self->{input}->readMessageEnd();
- die $x;
+ die $x->getMessage();
}
my $result = new Cassandra::Cassandra_system_add_keyspace_result();
$result->read($self->{input});
@@ -7611,7 +7611,7 @@ sub recv_system_add_keyspace{
return $result->{success};
}
if (defined $result->{ire}) {
- die $result->{ire};
+ die $result->{ire}->why;
}
if (defined $result->{sde}) {
die $result->{sde};
@@ -7650,7 +7650,7 @@ sub recv_system_drop_keyspace{
my $x = new TApplicationException();
$x->read($self->{input});
$self->{input}->readMessageEnd();
- die $x;
+ die $x->getMessage();
}
my $result = new Cassandra::Cassandra_system_drop_keyspace_result();
$result->read($self->{input});
@@ -7660,7 +7660,7 @@ sub recv_system_drop_keyspace{
return $result->{success};
}
if (defined $result->{ire}) {
- die $result->{ire};
+ die $result->{ire}->why;
}
if (defined $result->{sde}) {
die $result->{sde};
@@ -7699,7 +7699,7 @@ sub recv_system_update_keyspace{
my $x = new TApplicationException();
$x->read($self->{input});
$self->{input}->readMessageEnd();
- die $x;
+ die $x->getMessage();
}
my $result = new Cassandra::Cassandra_system_update_keyspace_result();
$result->read($self->{input});
@@ -7709,7 +7709,7 @@ sub recv_system_update_keyspace{
return $result->{success};
}
if (defined $result->{ire}) {
- die $result->{ire};
+ die $result->{ire}->why;
}
if (defined $result->{sde}) {
die $result->{sde};
@@ -7748,7 +7748,7 @@ sub recv_system_update_column_family{
my $x = new TApplicationException();
$x->read($self->{input});
$self->{input}->readMessageEnd();
- die $x;
+ die $x->getMessage();
}
my $result = new Cassandra::Cassandra_system_update_column_family_result();
$result->read($self->{input});
@@ -7758,7 +7758,7 @@ sub recv_system_update_column_family{
return $result->{success};
}
if (defined $result->{ire}) {
- die $result->{ire};
+ die $result->{ire}->why;
}
if (defined $result->{sde}) {
die $result->{sde};
@@ -7800,7 +7800,7 @@ sub recv_execute_cql_query{
my $x = new TApplicationException();
$x->read($self->{input});
$self->{input}->readMessageEnd();
- die $x;
+ die $x->getMessage();
}
my $result = new Cassandra::Cassandra_execute_cql_query_result();
$result->read($self->{input});
@@ -7810,7 +7810,7 @@ sub recv_execute_cql_query{
return $result->{success};
}
if (defined $result->{ire}) {
- die $result->{ire};
+ die $result->{ire}->why;
}
if (defined $result->{ue}) {
die $result->{ue};
58 lib/Cassandra/Pool.pm
View
@@ -38,44 +38,58 @@ sub new {
my $opt = shift;
my $self = {};
-
- $opt->{keyspace} = $keyspace;
+ $opt->{keyspace} = $keyspace if $keyspace;
+
+ my $loadbalancer =
+ ResourcePool::LoadBalancer->new( "cassandra" . int( rand() * 100000 ),
+ MaxTry => 6 ); #TODO try alternative policy methods
+
+ $loadbalancer->add_pool(
+ ResourcePool->new( Cassandra::Pool::CassandraServerFactory->new($opt) )
+ );
+
+ if ($keyspace) {
+ my @nodes = @{ $loadbalancer->get()->describe_ring($keyspace) };
+ foreach (
+ map {
+ map { split( /\//, $_ ) } @{ $_->{rpc_endpoints} }
+ } @nodes
+ )
+ {
+ next if $opt->{server_name} eq $_;
+ my %params = %$opt;
+ $params{server_name} = $_;
+ $loadbalancer->add_pool(
+ ResourcePool->new(
+ Cassandra::Pool::CassandraServerFactory->new( \%params ),
+ PreCreate => 2
+ )
+ );
+ }
+ }
+ $self->{pool} = $loadbalancer;
$self = bless( $self, $class );
- my $loadbalancer = ResourcePool::LoadBalancer->new("cassandra", MaxTry => 6);#TODO try alternative policy methods
-
- $loadbalancer->add_pool(ResourcePool->new(Cassandra::Pool::CassandraServerFactory->new($opt)));
-
-# print Dumper map { split( /\//, $_->{endpoints}->[0]) } @{$first->describe_ring($keyspace)};
-
- foreach ( map { split( /\//, $_->{endpoints}->[0] ) }
- @{ $loadbalancer->get()->describe_ring($keyspace) } )
- {
- next if $opt->{server_name} eq $_;
- my %params = %$opt;
- $params{server_name} = $_;
- $loadbalancer->add_pool(ResourcePool->new(Cassandra::Pool::CassandraServerFactory->new(\%params), PreCreate => 2));
- }
- $self->{pool} = $loadbalancer;
-
return $self;
}
-sub get{
+sub get {
my $self = shift;
- return $self->{pool}->get(@_);
+ my $c = $self->{pool}->get(@_);
+ return $c;
}
-sub put{
+sub put {
my $self = shift;
return $self->{pool}->free(@_);
}
-sub fail{
+sub fail {
my $self = shift;
return $self->{pool}->fail(@_);
}
+
#get, put, fail
1
27 lib/Cassandra/Pool/CassandraServer.pm
View
@@ -31,19 +31,26 @@ sub new {
$self->{client} = Cassandra::CassandraClient->new($protocol);
- $transport->open;
+ eval {
+ $transport->open;
+
+ my $auth = Cassandra::AuthenticationRequest->new;
+ $auth->{credentials} = {
+ username => $opt->{username} || '',
+ password => $opt->{password} || ''
+ };
- my $auth = Cassandra::AuthenticationRequest->new;
- $auth->{credentials} = {
- username => $opt->{username} || '',
- password => $opt->{password} || ''
+ $self->{client}->set_keyspace( $opt->{keyspace} );
+ $self->{client}->login($auth);
};
- $self->{client}->login($auth);
- $self->{client}->set_keyspace( $opt->{keyspace} );
+ if ($!) {
+ my $error = $@->{message} || $!;
- bless( $self, $class );
+ die $error;
+ }
+ bless( $self, $class );
return $self;
}
@@ -126,7 +133,7 @@ Usually this method just stores the parameters somewhere and will use it later c
sub new {
my ( $class, $params ) = @_;
- die 'A keyspace must be provided' unless $params->{keyspace};
+ #die 'A keyspace must be provided' unless $params->{keyspace};
my $self = {};
$self->{params} = $params;
@@ -144,9 +151,7 @@ Returns: a reference to a ResourcePool::Resource object
sub create_resource {
my $self = shift;
-
return new Cassandra::Pool::CassandraServer( $self->{params} );
}
-
1;
86 lib/Cassandra/Simple.pm
View
@@ -52,7 +52,7 @@ This module attempts to abstract the underlying Thrift methods as much as possib
=cut
-our $VERSION="0.1";
+our $VERSION = "0.1";
use strict;
use warnings;
@@ -238,7 +238,7 @@ sub get {
if ( exists $opt->{columns} )
{ #TODO extra case for when only 1 column is requested, use thrift api's get
- $predicate->{column_names} = $opt->{columns} ;
+ $predicate->{column_names} = $opt->{columns};
} else {
my $sliceRange = Cassandra::SliceRange->new($opt);
$sliceRange->{start} = $opt->{column_start} // '';
@@ -549,7 +549,7 @@ sub get_indexed_slices {
my @index_expr = map {
my ( $col, $op, $val ) = @$_;
- ($op, $val) = ($val, $op) unless $val;
+ ( $op, $val ) = ( $val, $op ) unless $val;
Cassandra::IndexExpression->new(
{
column_name => $col,
@@ -961,7 +961,7 @@ sub remove {
=head2 list_keyspace_cfs
-Usage: C<< list_keyspace_cfs($keyspace) >>
+Usage: C<< list_keyspace_cfs() >>
Returns an HASH of C<< { column_family_name => column_family_type } >> where column family type is either C<Standard> or C<Super>
@@ -995,10 +995,12 @@ sub create_column_family {
my $cl = $self->pool->get();
my $res = eval { $cl->system_add_column_family($cfdef) };
- $self->_wait_for_agreement();
if ($@) { print Dumper $@; $self->pool->fail($cl) }
else { $self->pool->put($cl) }
+
+ $self->_wait_for_agreement();
$self->clear_ksdef();
+
return $res;
}
@@ -1019,22 +1021,72 @@ strategy
sub create_keyspace {
my $self = shift;
- my $keyspace = shift;
- my $opt = shift // {};
+ my $keyspace = shift;
+ my $opt = shift // {};
+
+ my $params = {};
+ $params->{strategy_class} =
+ 'org.apache.cassandra.locator.NetworkTopologyStrategy'
+ unless $opt->{strategy};
- $opt->{strategy} = 'org.apache.cassandra.locator.SimpleStrategy' unless $opt->{strategy};
- $opt->{cf_defs} = [];
- $opt->{name} = $keyspace;
-
- my $ksdef = Cassandra::KsDef->new($opt);
+ $params->{strategy_options} = { 'datacenter1' => '1' }
+ unless $opt->{strategy_options};
+ $params->{cf_defs} = [];
+ $params->{name} = $keyspace;
+
+ my $ksdef = Cassandra::KsDef->new($params);
my $cl = $self->pool->get();
my $res = eval { $cl->system_add_keyspace($ksdef) };
+
+ if ($@) { print Dumper $@; $self->pool->fail($cl) }
+ else { $self->pool->put($cl) }
+
$self->_wait_for_agreement();
+
+ return $res;
+}
+
+=head2 list_keyspaces
+
+Usage C<< list_keyspaces() >>
+
+=cut
+
+sub list_keyspaces {
+
+ my $self = shift;
+
+ my $cl = $self->pool->get();
+ my $res = eval { $cl->describe_keyspaces() };
+
+ if ($@) { print Dumper $@; $self->pool->fail($cl) }
+ else { $self->pool->put($cl) }
+
+ $res = [ map { $_->{name} } @$res ];
+
+ return $res;
+}
+
+=head2 drop_keyspace
+
+Usage C<< drop_keyspace($keyspace [, $opt]) >>
+
+=cut
+
+sub drop_keyspace {
+ my $self = shift;
+ my $keyspace = shift;
+
+ my $cl = $self->pool->get();
+ my $res = eval { $cl->system_drop_keyspace($keyspace) };
+
if ($@) { print Dumper $@; $self->pool->fail($cl) }
else { $self->pool->put($cl) }
-
+
+ $self->_wait_for_agreement();
+
return $res;
}
@@ -1110,11 +1162,13 @@ sub ring {
my $self = shift;
- my $keyspace = shift;
- my $cl = $self->pool->get();
+ my $keyspace = shift || $self->keyspace;
+ my $cl = $self->pool->get();
my @result = eval {
- map { $_->{endpoints}->[0] } @{ $cl->describe_ring($keyspace) };
+ map {
+ map { $_ } @{ $_->{rpc_endpoints} }
+ } @{ $cl->describe_ring($keyspace) };
};
if ($@) { print Dumper $@; $self->pool->fail($cl) }
else { $self->pool->put($cl) }
Please sign in to comment.
Something went wrong with that request. Please try again.