Permalink
Browse files

Added concurrency to finding over shards.

  • Loading branch information...
treeder committed Mar 10, 2011
1 parent 7a75bce commit 58fb4b410d68c187b9d87717ed29467735e73888
Showing with 99 additions and 45 deletions.
  1. +13 −1 README.markdown
  2. +14 −13 Rakefile
  3. +31 −18 lib/simple_record/sharding.rb
  4. +1 −1 test/my_sharded_model.rb
  5. +2 −2 test/test_base.rb
  6. +38 −10 test/test_shards.rb
View
@@ -366,10 +366,22 @@ The :map function should return which shard name the object should be stored to.
When executing a find() operation, you can explicitly specify the shard(s) you'd like to search. This is
particularly useful if you know in advance which shard the data will be in.
- MyClass.find(:all, :conditions=>....., :shard=>["CA", "FL"])
+ MyShardedClass.find(:all, :conditions=>....., :shard=>["CA", "FL"])
You can see some [example classes here](https://github.com/appoxy/simple_record/blob/master/test/my_sharded_model.rb).
+## Concurrency
+
+**Subject to change**
+
+This was introduced as a way to query across shards in parallel. Not being able to find a good generic concurrency library,
+I ended up rolling my own, called [concur](https://github.com/appoxy/concur).
+
+ MyShardedClass.find(:all, :concurrent=>true)
+
+We may enable a global [Executor](https://github.com/appoxy/concur/blob/master/lib/executor.rb) so you can have a fixed
+thread pool across your app, but for now, a concurrent find will fire up one thread per shard.
+
## Kudos
Special thanks to Garrett Cox for creating Activerecord2sdb, on which SimpleRecord is based:
View
@@ -4,20 +4,21 @@ require 'rubygems'
require './lib/simple_record.rb'
begin
- require 'jeweler'
- Jeweler::Tasks.new do |gemspec|
- gemspec.name = "simple_record"
- gemspec.summary = "ActiveRecord like interface for Amazon SimpleDB. By http://www.appoxy.com"
- gemspec.email = "travis@appoxy.com"
- gemspec.homepage = "http://github.com/appoxy/simple_record/"
- gemspec.description = "ActiveRecord like interface for Amazon SimpleDB. Store, query, shard, etc. By http://www.appoxy.com"
- gemspec.authors = ["Travis Reeder", "Chad Arimura", "RightScale"]
- gemspec.files = FileList['lib/**/*.rb']
- gemspec.add_dependency 'aws'
- end
- Jeweler::GemcutterTasks.new
+ require 'jeweler'
+ Jeweler::Tasks.new do |gemspec|
+ gemspec.name = "simple_record"
+ gemspec.summary = "ActiveRecord like interface for Amazon SimpleDB. By http://www.appoxy.com"
+ gemspec.email = "travis@appoxy.com"
+ gemspec.homepage = "http://github.com/appoxy/simple_record/"
+ gemspec.description = "ActiveRecord like interface for Amazon SimpleDB. Store, query, shard, etc. By http://www.appoxy.com"
+ gemspec.authors = ["Travis Reeder", "Chad Arimura", "RightScale"]
+ gemspec.files = FileList['lib/**/*.rb']
+ gemspec.add_dependency 'aws'
+ gemspec.add_dependency 'concur'
+ end
+ Jeweler::GemcutterTasks.new
rescue LoadError
- puts "Jeweler not available. Install it with: sudo gem install technicalpickles-jeweler -s http://gems.github.com"
+ puts "Jeweler not available. Install it with: sudo gem install technicalpickles-jeweler -s http://gems.github.com"
end
# vim: syntax=Ruby
@@ -1,3 +1,5 @@
+require 'concur'
+
module SimpleRecord
module Sharding
@@ -26,7 +28,7 @@ def find_sharded(*params)
options = params.size > 1 ? params[1] : {}
if options[:shard] # User specified shard.
- shard = options[:shard]
+ shard = options[:shard]
domains = shard.is_a?(Array) ? (shard.collect { |x| prefix_shard_name(x) }) : [prefix_shard_name(shard)]
else
domains = sharded_domains
@@ -45,21 +47,32 @@ def find_sharded(*params)
end
end
+ # todo: should we have a global executor?
+ executor = options[:concurrent] ? Concur::Executor.new_multi_threaded_executor : Concur::Executor.new_single_threaded_executor
results = ShardedResults.new(params)
+ futures = []
domains.each do |d|
- p2 = params.dup
- op2 = options.dup
- op2[:from] = d
+ p2 = params.dup
+ op2 = options.dup
+ op2[:from] = d
op2[:shard_find] = true
- p2[1] = op2
- rs = find(*p2)
+ p2[1] = op2
+
+ futures << executor.execute do
+ puts 'executing... '
+ rs = find(*p2)
+ end
+ end
+ futures.each do |f|
+ puts 'getting future ' + f.inspect
if params.first == :first || single
- return rs if rs
+ return f.get if f.get
else
- results.add_results rs
+ results.add_results f.get
end
end
- puts 'results=' + results.inspect
+ executor.shutdown
+# puts 'results=' + results.inspect
if params.first == :first || single
# Then we found nothing by this point so return nil
return nil
@@ -81,7 +94,7 @@ def prefix_shard_name(s)
def sharded_domains
sharded_domains = []
- shard_names = shards
+ shard_names = shards
shard_names.each do |s|
sharded_domains << prefix_shard_name(s)
end
@@ -91,7 +104,7 @@ def sharded_domains
def sharded_domain
# puts 'getting sharded_domain'
- options = self.class.sharding_options
+ options = self.class.sharding_options
# val = self.send(options[:on])
# puts "val=" + val.inspect
# shards = options[:shards] # is user passed in static array of shards
@@ -107,8 +120,8 @@ class ShardedResults
include Enumerable
def initialize(params)
- @params = params
- @options = params.size > 1 ? params[1] : {}
+ @params = params
+ @options = params.size > 1 ? params[1] : {}
@results_arrays = []
end
@@ -146,9 +159,9 @@ def [](*i)
return element_at(index)
else
offset = i[0]
- rows = i[1]
- ret = []
- x = offset
+ rows = i[1]
+ ret = []
+ x = offset
while x < (offset+rows)
ret << element_at(x)
x+=1
@@ -268,9 +281,9 @@ def self.sdbm_hash(str, len=str.length)
# puts 'sdbm_hash ' + str.inspect
hash = 0
len.times { |i|
- c = str[i]
+ c = str[i]
# puts "c=" + c.class.name + "--" + c.inspect + " -- " + c.ord.inspect
- c = c.ord
+ c = c.ord
hash = c + (hash << 6) + (hash << 16) - hash
}
# puts "hash=" + hash.inspect
View
@@ -9,7 +9,7 @@ class MyShardedModel < SimpleRecord::Base
has_strings :name
def self.num_shards
- 4
+ 10
end
def self.my_shards
View
@@ -39,12 +39,12 @@ def reset_connection(options={})
SimpleRecord::Base.set_domain_prefix("simplerecord_tests_")
SimpleRecord.establish_connection(@config['amazon']['access_key'], @config['amazon']['secret_key'],
- {:connection_mode=>:single}.merge(options))
+ {:connection_mode=>:per_thread}.merge(options))
# Establish AWS connection directly
@@sdb = Aws::SdbInterface.new(@config['amazon']['access_key'], @config['amazon']['secret_key'],
- {:connection_mode => :single}.merge(options))
+ {:connection_mode => :per_thread}.merge(options))
end
View
@@ -12,8 +12,8 @@ class TestShards < TestBase
def setup
super
- delete_all MyShardedModel
- delete_all MyShardedByFieldModel
+# delete_all MyShardedModel
+# delete_all MyShardedByFieldModel
end
def teardown
@@ -25,6 +25,9 @@ def teardown
# be used to select the shard.
def test_id_sharding
+ puts 'test_id_sharding start'
+ ob_count = 1000
+
mm = MyShardedModel.new(:name=>"single")
mm.save
sleep 1
@@ -38,41 +41,66 @@ def test_id_sharding
mm3 = MyShardedModel.find(mm.id)
assert_nil mm3
- puts "saving 20 now"
+ puts "saving #{ob_count} now"
saved = []
- 20.times do |i|
+ ob_count.times do |i|
mm = MyShardedModel.new(:name=>"name #{i}")
mm.save
saved << mm
end
+ sleep 2
# todo: assert that we're actually sharding
- puts "finding them all"
+ puts "finding them all sequentially"
+ start_time = Time.now
found = []
- rs = MyShardedModel.find(:all)
+ rs = MyShardedModel.find(:all, :per_token=>2500)
rs.each do |m|
- p m
+# p m
found << m
end
+ duration = Time.now.to_f - start_time.to_f
+ puts 'Find sequential duration=' + duration.to_s
+ puts 'size=' + found.size.to_s
saved.each do |so|
assert(found.find { |m1| m1.id == so.id })
end
+
+ puts "Now let's try concurrently"
+ start_time = Time.now
+ found = []
+ rs = MyShardedModel.find(:all, :concurrent=>true, :per_token=>2500)
+ rs.each do |m|
+# p m
+ found << m
+ end
+ concurrent_duration = Time.now.to_f - start_time.to_f
+ puts 'Find concurrent duration=' + concurrent_duration .to_s
+ puts 'size=' + found.size.to_s
+ saved.each do |so|
+ assert(found.find { |m1| m1.id == so.id })
+ end
+
+ assert concurrent_duration < duration
+
puts "deleting all of them"
found.each do |fo|
fo.delete
end
+ sleep 2
+
puts "Now ensure that all are deleted"
rs = MyShardedModel.find(:all)
assert rs.size == 0
puts "Testing belongs_to sharding"
-
end
+
def test_field_sharding
states = MyShardedByFieldModel.shards
@@ -105,7 +133,7 @@ def test_field_sharding
puts "finding them all"
found = []
- rs = MyShardedByFieldModel.find(:all)
+ rs = MyShardedByFieldModel.find(:all)
rs.each do |m|
p m
found << m
@@ -124,7 +152,7 @@ def test_field_sharding
end
# Try to find on a specific known shard
- selects = SimpleRecord.stats.selects
+ selects = SimpleRecord.stats.selects
cali_models = MyShardedByFieldModel.find(:all, :shard => "CA")
puts 'cali_models=' + cali_models.inspect
assert_equal(5, cali_models.size)

0 comments on commit 58fb4b4

Please sign in to comment.