Permalink
Browse files

Version with database/rowset/row/fieldset/field API.

  • Loading branch information...
1 parent eba0ecb commit 95532cfc379fa8dca3e811fb502e58f15a523ec5 Evan Weaver committed Aug 15, 2009
Showing with 255 additions and 266 deletions.
  1. +1 −1 lib/cassandra.rb
  2. +72 −72 lib/cassandra/cassandra.rb
  3. +0 −86 lib/cassandra/columns.rb
  4. +84 −0 lib/cassandra/fields.rb
  5. +37 −37 lib/cassandra/protocol.rb
  6. +61 −70 test/cassandra_test.rb
View
@@ -16,7 +16,7 @@
require 'cassandra/long'
require 'cassandra/safe_client'
require 'cassandra/ordered_hash'
-require 'cassandra/columns'
+require 'cassandra/fields'
require 'cassandra/protocol'
require 'cassandra/cassandra'
require 'cassandra/constants'
View
@@ -1,6 +1,6 @@
=begin rdoc
-Create a new Cassandra client instance. Accepts a keyspace name, and optional host and port.
+Create a new Cassandra client instance. Accepts a database name, and optional host and port.
client = Cassandra.new('twitter', '127.0.0.1', 9160)
@@ -12,8 +12,8 @@
For read methods, valid option parameters are:
<tt>:count</tt>:: How many results to return. Defaults to 100.
-<tt>:start</tt>:: Column name token at which to start iterating, inclusive. Defaults to nil, which means the first column in the collation order.
-<tt>:finish</tt>:: Column name token at which to stop iterating, inclusive. Defaults to nil, which means no boundary.
+<tt>:start</tt>:: Field name token at which to start iterating, inclusive. Defaults to nil, which means the first field in the collation order.
+<tt>:finish</tt>:: Field name token at which to stop iterating, inclusive. Defaults to nil, which means no boundary.
<tt>:reversed</tt>:: Swap the direction of the collation order.
<tt>:consistency</tt>:: The consistency level of the request. Defaults to <tt>Cassandra::Consistency::ONE</tt> (one node must respond). Other valid options are <tt>Cassandra::Consistency::ZERO</tt>, <tt>Cassandra::Consistency::QUORUM</tt>, and <tt>Cassandra::Consistency::ALL</tt>.
@@ -27,7 +27,7 @@
=end rdoc
class Cassandra
- include Columns
+ include Fields
include Protocol
class AccessError < StandardError #:nodoc:
@@ -53,15 +53,15 @@ module Consistency
:consistency => Consistency::ONE
}.freeze
- attr_reader :keyspace, :host, :port, :serializer, :transport, :client, :schema
+ attr_reader :database, :host, :port, :serializer, :transport, :client, :schema
# Instantiate a new Cassandra and open the connection.
- def initialize(keyspace, host = '127.0.0.1', port = 9160)
- @is_super = {}
- @column_name_class = {}
- @sub_column_name_class = {}
+ def initialize(database, host = '127.0.0.1', port = 9160)
+ @is_set = {}
+ @field_class = {}
+ @sub_field_class = {}
- @keyspace = keyspace
+ @database = database
@host = host
@port = port
@@ -71,38 +71,38 @@ def initialize(keyspace, host = '127.0.0.1', port = 9160)
CassandraThrift::Cassandra::Client.new(Thrift::BinaryProtocol.new(@transport)),
@transport)
- keyspaces = @client.get_string_list_property("keyspaces")
- unless keyspaces.include?(@keyspace)
- raise AccessError, "Keyspace #{@keyspace.inspect} not found. Available: #{keyspaces.inspect}"
+ databases = @client.get_string_list_property("keyspaces")
+ unless databases.include?(@database)
+ raise AccessError, "Keyspace #{@database.inspect} not found. Available: #{databases.inspect}"
end
- @schema = @client.describe_keyspace(@keyspace)
+ @schema = @client.describe_keyspace(@database)
end
def inspect
- "#<Cassandra:#{object_id}, @keyspace=#{keyspace.inspect}, @schema={#{
+ "#<Cassandra:#{object_id}, @database=#{database.inspect}, @schema={#{
schema.map {|name, hash| ":#{name} => #{hash['type'].inspect}"}.join(', ')
}}, @host=#{host.inspect}, @port=#{port}>"
end
### Write
- # Insert a row for a key. Pass a flat hash for a regular column family, and
- # a nested hash for a super column family. Supports the <tt>:consistency</tt>
+ # Insert a row for a key. Pass a flat hash for a regular row set, and
+ # a nested hash for a super row set. Supports the <tt>:consistency</tt>
# and <tt>:timestamp</tt> options.
- def insert(column_family, key, hash, options = {})
- column_family, _, _, options = params(column_family, [options], WRITE_DEFAULTS)
+ def insert(row_set, key, hash, options = {})
+ row_set, _, _, options = params(row_set, [options], WRITE_DEFAULTS)
- mutation = if is_super(column_family)
+ mutation = if is_set(row_set)
CassandraThrift::BatchMutationSuper.new(
:key => key,
- :cfmap => {column_family =>
- hash_to_super_columns(column_family, hash, options[:timestamp] || Time.stamp)})
+ :cfmap => {row_set =>
+ hash_to_super_fields(row_set, hash, options[:timestamp] || Time.stamp)})
else
CassandraThrift::BatchMutation.new(
:key => key,
- :cfmap => {column_family =>
- hash_to_columns(column_family, hash, options[:timestamp] || Time.stamp)})
+ :cfmap => {row_set =>
+ hash_to_fields(row_set, hash, options[:timestamp] || Time.stamp)})
end
args = [mutation, options[:consistency]]
@@ -111,102 +111,102 @@ def insert(column_family, key, hash, options = {})
## Delete
- # Remove the element at the column_family:key:[column]:[sub_column]
+ # Remove the element at the row_set:key:[field]:[sub_field]
# path you request. Supports the <tt>:consistency</tt> and <tt>:timestamp</tt>
# options.
- def remove(column_family, key, *columns_and_options)
- column_family, column, sub_column, options = params(column_family, columns_and_options, WRITE_DEFAULTS)
- args = [column_family, key, column, sub_column, options[:consistency], options[:timestamp] || Time.stamp]
+ def remove(row_set, key, *fields_and_options)
+ row_set, field, sub_field, options = params(row_set, fields_and_options, WRITE_DEFAULTS)
+ args = [row_set, key, field, sub_field, options[:consistency], options[:timestamp] || Time.stamp]
@batch ? @batch << args : _remove(*args)
end
- # Remove all rows in the column family you request. Supports options
+ # Remove all rows in the row set you request. Supports options
# <tt>:consistency</tt> and <tt>:timestamp</tt>.
# FIXME May not currently delete all records without multiple calls. Waiting
# for ranged remove support in Cassandra.
- def clear_column_family!(column_family, options = {})
- get_range(column_family).each { |key| remove(column_family, key, options) }
+ def clear_row_set!(row_set, options = {})
+ get_range(row_set).each { |key| remove(row_set, key, options) }
end
- # Remove all rows in the keyspace. Supports options <tt>:consistency</tt> and
+ # Remove all rows in the database. Supports options <tt>:consistency</tt> and
# <tt>:timestamp</tt>.
# FIXME May not currently delete all records without multiple calls. Waiting
# for ranged remove support in Cassandra.
- def clear_keyspace!(options = {})
- @schema.keys.each { |column_family| clear_column_family!(column_family, options) }
+ def clear_database!(options = {})
+ @schema.keys.each { |row_set| clear_row_set!(row_set, options) }
end
### Read
- # Count the elements at the column_family:key:[super_column] path you
+ # Count the elements at the row_set:key:[super_field] path you
# request. Supports options <tt>:count</tt>, <tt>:start</tt>, <tt>:finish</tt>,
# <tt>:reversed</tt>, and <tt>:consistency</tt>.
- def count_columns(column_family, key, *columns_and_options)
- column_family, super_column, _, options = params(column_family, columns_and_options, READ_DEFAULTS)
- _count_columns(column_family, key, super_column, options[:consistency])
+ def count_fields(row_set, key, *fields_and_options)
+ row_set, super_field, _, options = params(row_set, fields_and_options, READ_DEFAULTS)
+ _count_fields(row_set, key, super_field, options[:consistency])
end
- # Multi-key version of Cassandra#count_columns. Supports options <tt>:count</tt>,
+ # Multi-key version of Cassandra#count_fields. Supports options <tt>:count</tt>,
# <tt>:start</tt>, <tt>:finish</tt>, <tt>:reversed</tt>, and <tt>:consistency</tt>.
- def multi_count_columns(column_family, keys, *options)
- OrderedHash[*keys.map { |key| [key, count_columns(column_family, key, *options)] }._flatten_once]
+ def multi_count_fields(row_set, keys, *options)
+ OrderedHash[*keys.map { |key| [key, count_fields(row_set, key, *options)] }._flatten_once]
end
# Return a list of single values for the elements at the
- # column_family:key:column[s]:[sub_columns] path you request. Supports the
+ # row_set:key:field[s]:[sub_fields] path you request. Supports the
# <tt>:consistency</tt> option.
- def get_columns(column_family, key, *columns_and_options)
- column_family, columns, sub_columns, options = params(column_family, columns_and_options, READ_DEFAULTS)
- _get_columns(column_family, key, columns, sub_columns, options[:consistency])
+ def get_fields(row_set, key, *fields_and_options)
+ row_set, fields, sub_fields, options = params(row_set, fields_and_options, READ_DEFAULTS)
+ _get_fields(row_set, key, fields, sub_fields, options[:consistency])
end
- # Multi-key version of Cassandra#get_columns. Supports the <tt>:consistency</tt>
+ # Multi-key version of Cassandra#get_fields. Supports the <tt>:consistency</tt>
# option.
- def multi_get_columns(column_family, keys, *options)
- OrderedHash[*keys.map { |key| [key, get_columns(column_family, key, *options)] }._flatten_once]
+ def multi_get_fields(row_set, keys, *options)
+ OrderedHash[*keys.map { |key| [key, get_fields(row_set, key, *options)] }._flatten_once]
end
# Return a hash (actually, a Cassandra::OrderedHash) or a single value
- # representing the element at the column_family:key:[column]:[sub_column]
+ # representing the element at the row_set:key:[field]:[sub_field]
# path you request. Supports options <tt>:count</tt>, <tt>:start</tt>,
# <tt>:finish</tt>, <tt>:reversed</tt>, and <tt>:consistency</tt>.
- def get(column_family, key, *columns_and_options)
- column_family, column, sub_column, options = params(column_family, columns_and_options, READ_DEFAULTS)
- _get(column_family, key, column, sub_column, options[:count], options[:start], options[:finish], options[:reversed], options[:consistency])
+ def get(row_set, key, *fields_and_options)
+ row_set, field, sub_field, options = params(row_set, fields_and_options, READ_DEFAULTS)
+ _get(row_set, key, field, sub_field, options[:count], options[:start], options[:finish], options[:reversed], options[:consistency])
rescue CassandraThrift::NotFoundException
- is_super(column_family) && !sub_column ? OrderedHash.new : nil
+ is_set(row_set) && !sub_field ? OrderedHash.new : nil
end
# Multi-key version of Cassandra#get. Supports options <tt>:count</tt>,
# <tt>:start</tt>, <tt>:finish</tt>, <tt>:reversed</tt>, and <tt>:consistency</tt>.
- def multi_get(column_family, keys, *options)
- OrderedHash[*keys.map { |key| [key, get(column_family, key, *options)] }._flatten_once]
+ def multi_get(row_set, keys, *options)
+ OrderedHash[*keys.map { |key| [key, get(row_set, key, *options)] }._flatten_once]
end
- # Return true if the column_family:key:[column]:[sub_column] path you
+ # Return true if the row_set:key:[field]:[sub_field] path you
# request exists. Supports the <tt>:consistency</tt> option.
- def exists?(column_family, key, *columns_and_options)
- column_family, column, sub_column, options = params(column_family, columns_and_options, READ_DEFAULTS)
- _get(column_family, key, column, sub_column, 1, nil, nil, nil, options[:consistency])
+ def exists?(row_set, key, *fields_and_options)
+ row_set, field, sub_field, options = params(row_set, fields_and_options, READ_DEFAULTS)
+ _get(row_set, key, field, sub_field, 1, nil, nil, nil, options[:consistency])
true
rescue CassandraThrift::NotFoundException
end
- # Return a list of keys in the column_family you request. Requires the
+ # Return a list of keys in the row_set you request. Requires the
# table to be partitioned with OrderPreservingHash. Supports the
# <tt>:count</tt>, <tt>:start</tt>, <tt>:finish</tt>, and <tt>:consistency</tt>
# options.
- def get_range(column_family, options = {})
- column_family, _, _, options = params(column_family, [options], READ_DEFAULTS)
- _get_range(column_family, options[:start], options[:finish], options[:count], options[:consistency])
+ def get_range(row_set, options = {})
+ row_set, _, _, options = params(row_set, [options], READ_DEFAULTS)
+ _get_range(row_set, options[:start], options[:finish], options[:count], options[:consistency])
end
- # Count all rows in the column_family you request. Requires the table
+ # Count all rows in the row_set you request. Requires the table
# to be partitioned with OrderPreservingHash. Supports the <tt>:start</tt>,
# <tt>:finish</tt>, and <tt>:consistency</tt> options.
# FIXME will count only MAX_INT records
- def count_range(column_family, options = {})
- get_range(column_family, options.merge(:count => MAX_INT)).size
+ def count_range(row_set, options = {})
+ get_range(row_set, options.merge(:count => MAX_INT)).size
end
# Open a batch operation and yield. Inserts and deletes will be queued until
@@ -223,7 +223,7 @@ def batch
private
# Extract and validate options.
- def params(column_family, args, options)
+ def params(row_set, args, options)
if args.last.is_a?(Hash)
if (extras = args.last.keys - options.keys).any?
this = "#{self.class}##{caller[0].split('`').last[0..-2]}"
@@ -232,9 +232,9 @@ def params(column_family, args, options)
options = options.merge(args.pop)
end
- column_family, column, sub_column = column_family.to_s, args[0], args[1]
- assert_column_name_classes(column_family, column, sub_column)
- [column_family, map_to_s(column), map_to_s(sub_column), options]
+ row_set, field, sub_field = row_set.to_s, args[0], args[1]
+ assert_field_classes(row_set, field, sub_field)
+ [row_set, map_to_s(field), map_to_s(sub_field), options]
end
# Convert stuff to strings.
@@ -266,8 +266,8 @@ def compact_mutations
# Do a nested hash merge
if mutation_class = mutations[m.class]
if mutation = mutation_class[m.key]
- if columns = mutation.cfmap[m.cfmap.keys.first]
- columns.concat(m.cfmap.values.first)
+ if fields = mutation.cfmap[m.cfmap.keys.first]
+ fields.concat(m.cfmap.values.first)
else
mutation.cfmap.merge!(m.cfmap)
end
View
@@ -1,86 +0,0 @@
-
-class Cassandra
- # A bunch of crap, mostly related to introspecting on column types
- module Columns #:nodoc:
- private
-
- def is_super(column_family)
- @is_super[column_family] ||= column_family_property(column_family, 'Type') == "Super"
- end
-
- def column_name_class(column_family)
- @column_name_class[column_family] ||= column_name_class_for_key(column_family, "CompareWith")
- end
-
- def sub_column_name_class(column_family)
- @sub_column_name_class[column_family] ||= column_name_class_for_key(column_family, "CompareSubcolumnsWith")
- end
-
- def column_name_class_for_key(column_family, comparator_key)
- property = column_family_property(column_family, comparator_key)
- property =~ /.*\.(.*?)$/
- case $1
- when "LongType" then Long
- when "LexicalUUIDType", "TimeUUIDType" then UUID
- else
- String # UTF8, Ascii, Bytes, anything else
- end
- end
-
- def column_family_property(column_family, key)
- @schema[column_family][key]
- rescue NoMethodError
- raise AccessError, "Invalid column family \"#{column_family}\""
- end
-
- def assert_column_name_classes(column_family, columns, sub_columns = nil)
- {Array(columns) => column_name_class(column_family),
- Array(sub_columns) => sub_column_name_class(column_family)}.each do |columns, klass|
- columns.each do |column|
- raise Comparable::TypeError, "Expected #{column.inspect} to be a #{klass}" if !column.is_a?(klass)
- end
- end
- end
-
- def columns_to_hash(column_family, columns)
- columns_to_hash_for_classes(columns, column_name_class(column_family), sub_column_name_class(column_family))
- end
-
- def sub_columns_to_hash(column_family, columns)
- columns_to_hash_for_classes(columns, sub_column_name_class(column_family))
- end
-
- def columns_to_hash_for_classes(columns, column_name_class, sub_column_name_class = nil)
- hash = OrderedHash.new
- Array(columns).each do |c|
- c = c.super_column || c.column if c.is_a?(CassandraThrift::ColumnOrSuperColumn)
- hash[column_name_class.new(c.name)] = case c
- when CassandraThrift::SuperColumn
- columns_to_hash_for_classes(c.columns, sub_column_name_class) # Pop the class stack, and recurse
- when CassandraThrift::Column
- c.value
- end
- end
- hash
- end
-
- def hash_to_columns(column_family, hash, timestamp)
- assert_column_name_classes(column_family, hash.keys)
- hash_to_columns_without_assertion(column_family, hash, timestamp)
- end
-
- def hash_to_columns_without_assertion(column_family, hash, timestamp)
- hash.map do |column, value|
- CassandraThrift::Column.new(:name => column.to_s, :value => value, :timestamp => timestamp)
- end
- end
-
- def hash_to_super_columns(column_family, hash, timestamp)
- assert_column_name_classes(column_family, hash.keys)
- hash.map do |column, sub_hash|
- assert_column_name_classes(column_family, nil, sub_hash.keys)
- CassandraThrift::SuperColumn.new(:name => column.to_s, :columns => hash_to_columns_without_assertion(column_family, sub_hash, timestamp))
- end
- end
- end
-end
Oops, something went wrong.

0 comments on commit 95532cf

Please sign in to comment.