Permalink
Browse files

Re-factor table vs. column family interface per discussion with jbellis.

  • Loading branch information...
1 parent 0a333b9 commit 8774ecfb68063797f335dd25850a6633a3dd40bf Evan Weaver committed Jul 6, 2009
View
@@ -1,2 +1,4 @@
+v0.2. Re-factor table vs. column family interface per discussion with jbellis.
+
v0.1. First release.
View
@@ -37,25 +37,21 @@ Require the library:
require 'cassandra_client'
-Connect to a server:
+Connect to a server and keyspace:
- client = CassandraClient.new("127.0.0.1")
+ client = CassandraClient.new('Twitter', "127.0.0.1")
-Get a keyspace:
-
- users = client.table('Users')
-
Insert into a column family. You can insert a CassandraClient::OrderedHash, or a regular Hash, if order doesn't matter:
- users.insert("5", :row, {'screen_name' => "buttonscat"})
+ client.insert(:Users, "5", {'screen_name' => "buttonscat"})
Insert into a super column family:
- users.insert("5", :relationships, {"user_timeline" => {"1" => ""}})
+ client.insert(:UserRelationships, "5", {"user_timeline" => {"1" => ""}})
Query a super column:
- timeline = users.get("5", :relationships, "user_timeline")
+ timeline = client.get(:UserRelationships, "5", "user_timeline")
When you query a row or a super column, the returned result is a CassandraClient::OrderedHash; querying a single column returns just that column's value.
View
@@ -20,7 +20,7 @@
<!--======================================================================-->
<!-- Basic Configuration -->
<!--======================================================================-->
- <ClusterName>Up and Running</ClusterName>
+ <ClusterName>Test</ClusterName>
<!-- Tables and ColumnFamilies
Think of a table as a namespace, not a relational table.
@@ -29,22 +29,19 @@
There is an implicit table named 'system' for Cassandra internals.
-->
<Tables>
- <Table Name="Users">
- <ColumnFamily ColumnSort="Name" Name="row" />
- <ColumnFamily ColumnSort="Name" Name="audit" />
- <ColumnFamily ColumnType="Super" ColumnSort="Name" Name="relationships" />
- <ColumnFamily ColumnSort="Time" Name="usernames" />
- </Table>
-
- <Table Name="Statuses">
- <ColumnFamily ColumnSort="Time" Name="row" />
- <ColumnFamily ColumnSort="Name" Name="audit" />
- <ColumnFamily ColumnType="Super" ColumnSort="Name" Name="relationships" />
+ <Table Name="Twitter">
+ <ColumnFamily ColumnSort="Name" Name="Users" />
+ <ColumnFamily ColumnSort="Name" Name="UserAudits" />
+ <!-- Super column family; the name must match the :UserRelationships used by client code. -->
+ <ColumnFamily ColumnType="Super" ColumnSort="Name" Name="UserRelationships" />
+ <ColumnFamily ColumnSort="Time" Name="Usernames" />
+ <ColumnFamily ColumnSort="Time" Name="Statuses" />
+ <ColumnFamily ColumnSort="Name" Name="StatusAudits" />
+ <ColumnFamily ColumnType="Super" ColumnSort="Name" Name="StatusRelationships" />
</Table>
<Table Name="Blogs">
- <ColumnFamily ColumnSort="Time" Name="posts"/>
- <ColumnFamily ColumnSort="Time" Name="comments"/>
+ <ColumnFamily ColumnSort="Time" Name="Posts"/>
+ <ColumnFamily ColumnSort="Time" Name="Comments"/>
</Table>
</Tables>
@@ -5,10 +5,11 @@
HERE = File.expand_path(File.dirname(__FILE__))
-require "#{HERE}/cassandra_client/client"
-require "#{HERE}/cassandra_client/table"
+require "#{HERE}/cassandra_client/helper"
+require "#{HERE}/cassandra_client/safe_client"
require "#{HERE}/cassandra_client/serialization"
require "#{HERE}/cassandra_client/ordered_hash"
+require "#{HERE}/cassandra_client/cassandra_client"
$LOAD_PATH << "#{HERE}/../vendor/gen-rb"
require 'cassandra'
@@ -0,0 +1,170 @@
+class CassandraClient
+ include Helper
+ class AccessError < StandardError; end
+
+ MAX_INT = 2**31 - 1
+
+ attr_reader :keyspace, :host, :port, :quorum, :serialization, :transport, :client, :schema
+
+ # Instantiate a new CassandraClient and open the connection.
+ def initialize(keyspace, host = '127.0.0.1', port = 9160, quorum = 1, serialization = CassandraClient::Serialization::JSON)
+ @keyspace = keyspace
+ @host = host
+ @port = port
+ @quorum = quorum
+ @serialization = serialization
+
+ extend(@serialization)
+
+ @transport = Thrift::BufferedTransport.new(Thrift::Socket.new(@host, @port))
+ @transport.open
+ @client = Cassandra::SafeClient.new(
+ Cassandra::Client.new(Thrift::BinaryProtocol.new(@transport)),
+ @transport)
+
+ keyspaces = @client.getStringListProperty("tables")
+ unless keyspaces.include?(@keyspace)
+ raise AccessError, "Keyspace #{@keyspace.inspect} not found. Available: #{keyspaces.inspect}"
+ end
+
+ @schema = @client.describeTable(@keyspace)
+ end
+
+ def inspect # Summary string: keyspace, each column family's 'type' from the schema, host, port, quorum and serialization module name.
+ "#<CassandraClient:#{object_id}, @keyspace=#{keyspace.inspect}, @schema={
+ #{schema.map {|name, hash| ":#{name} => #{hash['type'].inspect}"}.join(', ')}
+ }, @host=#{host.inspect}, @port=#{port}, @quorum=#{quorum}, @serialization=#{serialization.name}>"
+ end
+
+ ## Write
+
+ # Insert a row for a key. Pass a flat hash for a regular column family, and
+ # a nested hash for a super column family.
+ def insert(column_family, key, hash, timestamp = now)
+ column_family = column_family.to_s
+ insert = is_super(column_family) ? :insert_super : :insert_standard
+ send(insert, column_family, key, hash, timestamp)
+ end
+
+ private
+
+ def insert_standard(column_family, key, hash, timestamp = now)
+ mutation = Batch_mutation_t.new(
+ :table => @keyspace,
+ :key => key,
+ :cfmap => {column_family => hash_to_columns(hash, timestamp)})
+ @client.batch_insert(mutation, @quorum)
+ end
+
+ def insert_super(column_family, key, hash, timestamp = now)
+ mutation = Batch_mutation_super_t.new(
+ :table => @keyspace,
+ :key => key,
+ :cfmap => {column_family => hash_to_super_columns(hash, timestamp)})
+ @client.batch_insert_superColumn(mutation, @quorum)
+ end
+
+ public
+
+ ## Delete
+
+ # Remove the element at the column_family:key:super_column:column
+ # path you request.
+ def remove(column_family, key, super_column = nil, column = nil, timestamp = now)
+ column_family = column_family.to_s
+ column_family += ":#{super_column}" if super_column
+ column_family += ":#{column}" if column
+ @client.remove(@keyspace, key, column_family, timestamp, @quorum)
+ end
+
+ # Remove all rows in the column family you request.
+ def clear_column_family!(column_family)
+ get_key_range(column_family).each do |key|
+ remove(column_family, key)
+ end
+ end
+
+ # Remove all rows in the keyspace
+ def clear_keyspace!
+ @schema.keys.each do |column_family|
+ clear_column_family!(column_family)
+ end
+ end
+
+ ## Read
+
+ # Count the elements at the column_family:key:super_column path you
+ # request.
+ def count_columns(column_family, key, super_column = nil)
+ column_family = column_family.to_s
+ column_family += ":#{super_column}" if super_column
+ @client.get_column_count(@keyspace, key, column_family)
+ end
+
+ # Return a list of single values for the elements at the
+ # column_family:key:super_column:column path you request.
+ def get_columns(column_family, key, super_columns, columns = nil)
+ column_family = column_family.to_s
+ get_slice_by_names = (is_super(column_family) && !columns) ? :get_slice_super_by_names : :get_slice_by_names
+ if super_columns and columns
+ column_family += ":#{super_columns}"
+ columns = Array(columns)
+ else
+ columns = Array(super_columns)
+ end
+
+ hash = columns_to_hash(@client.send(get_slice_by_names, @keyspace, key, column_family, columns))
+ columns.map { |column| hash[column] }
+ end
+
+ # Return a hash (actually, a CassandraClient::OrderedHash) or a single value
+ # representing the element at the column_family:key:super_column:column
+ # path you request.
+ def get(column_family, key, super_column = nil, column = nil, offset = -1, limit = 100)
+ column_family = column_family.to_s
+ column_family += ":#{super_column}" if super_column
+ column_family += ":#{column}" if column
+
+ # You have got to be kidding
+ if is_super(column_family)
+ if column
+ load(@client.get_column(@keyspace, key, column_family).value)
+ elsif super_column
+ columns_to_hash(@client.get_superColumn(@keyspace, key, column_family).columns)
+ else
+ columns_to_hash(@client.get_slice_super(@keyspace, key, "#{column_family}:", offset, limit))
+ end
+ else
+ if super_column
+ load(@client.get_column(@keyspace, key, column_family).value)
+ elsif is_sorted_by_time(column_family)
+ result = columns_to_hash(@client.get_columns_since(@keyspace, key, column_family, 0))
+
+ # FIXME Hack until get_slice on a time-sorted column family works again
+ result = OrderedHash[*flatten_once(result.to_a[offset, limit])] if offset > -1
+ result
+ else
+ columns_to_hash(@client.get_slice(@keyspace, key, "#{column_family}:", offset, limit))
+ end
+ end
+ rescue NotFoundException
+ is_super(column_family) && !column ? {} : nil
+ end
+
+ # FIXME
+ # def get_recent(column_family, key, super_column = nil, column = nil, timestamp = 0)
+ # end
+
+ # Return a list of keys in the column_family you request. Requires the
+ # table to be partitioned with OrderPreservingHash.
+ def get_key_range(column_family, key_range = ''..'', limit = 100)
+ column_families = Array(column_family).map {|c| c.to_s}
+ @client.get_key_range(@keyspace, column_families, key_range.begin, key_range.end, limit)
+ end
+
+ # Count all rows in the column_family you request. Requires the table
+ # to be partitioned with OrderPreservingHash.
+ def count(column_family, key_range = ''..'', limit = MAX_INT)
+ get_key_range(column_family, key_range, limit).size
+ end
+end
@@ -1,65 +0,0 @@
-class CassandraClient
- attr_reader :client, :transport, :tables, :host, :port, :block_for, :serialization
-
- class AccessError < StandardError; end
-
- # Instantiate a new CassandraClient and open the connection.
- def initialize(host = '127.0.0.1', port = 9160, block_for = 1, serialization = CassandraClient::Serialization::JSON)
- @host = host
- @port = port
- @serialization = serialization
- @block_for = block_for
-
- @transport = Thrift::BufferedTransport.new(Thrift::Socket.new(@host, @port))
- @transport.open
-
- @client = SafeClient.new(
- Cassandra::Client.new(Thrift::BinaryProtocol.new(@transport)),
- @transport)
-
- @tables = @client.getStringListProperty("tables").map do |table_name|
- ::CassandraClient::Table.new(table_name, self)
- end
- end
-
- def inspect(full = true)
- string = "#<CassandraClient:#{object_id}, @host=#{host.inspect}, @port=#{@port.inspect}"
- string += ", @block_for=#{block_for.inspect}, @tables=[#{tables.map {|t| t.inspect(false) }.join(', ')}]" if full
- string + ">"
- end
-
- # Return the CassandraClient::Table instance for the table_name you
- # request. You can get an array of all available tables with the #tables
- # method.
- def table(table_name)
- table = @tables.detect {|table| table.name == table_name }
- raise AccessError, "No such table #{table_name.inspect}" unless table
- table
- end
-
- # Remove all rows in all column families in all tables.
- def remove_all
- tables.each do |table|
- table.schema.keys.each do |column_family|
- table.remove_all(column_family)
- end
- end
- end
-
- class SafeClient
- def initialize(client, transport)
- @client = client
- @transport = transport
- end
-
- def method_missing(*args)
- @client.send(*args)
- rescue IOError
- @transport.open
- raise if defined?(once)
- once = true
- retry
- end
- end
-
-end
@@ -0,0 +1,57 @@
+class CassandraClient
+ module Helper
+
+ private
+
+ def is_super(column_family)
+ column_family_property(column_family, 'type') == 'Super'
+ end
+
+ def is_sorted_by_time(column_family)
+ column_family_property(column_family, 'sort') == 'Time'
+ end
+
+ def column_family_property(column_family_or_path, key)
+ column_family = column_family_or_path.to_s.split(':').first
+ @schema[column_family][key]
+ rescue NoMethodError
+ raise AccessError, "Invalid column family \":#{column_family}\""
+ end
+
+ def columns_to_hash(columns)
+ hash = ::CassandraClient::OrderedHash.new
+ Array(columns).each do |c|
+ if c.is_a?(SuperColumn_t)
+ hash[c.name] = columns_to_hash(c.columns)
+ else
+ hash[c.columnName] = load(c.value)
+ end
+ end
+ hash
+ end
+
+ def hash_to_columns(hash, timestamp)
+ hash.map do |column, value|
+ Column_t.new(:columnName => column, :value => dump(value), :timestamp => timestamp)
+ end
+ end
+
+ def hash_to_super_columns(hash, timestamp)
+ hash.map do |super_column, columns|
+ SuperColumn_t.new(:name => super_column, :columns => hash_to_columns(columns, timestamp))
+ end
+ end
+
+ def time_in_microseconds
+ time = Time.now
+ time.to_i * 1_000_000 + time.usec
+ end
+ alias :now :time_in_microseconds
+
+ def flatten_once(array)
+ result = []
+ array.each { |el| result.concat(el) }
+ result
+ end
+ end
+end
Oops, something went wrong.

0 comments on commit 8774ecf

Please sign in to comment.