Skip to content

Commit

Permalink
Added concurrency to finding over shards.
Browse files Browse the repository at this point in the history
  • Loading branch information
treeder committed Mar 10, 2011
1 parent 7a75bce commit 58fb4b4
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 45 deletions.
14 changes: 13 additions & 1 deletion README.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -366,10 +366,22 @@ The :map function should return which shard name the object should be stored to.
When executing a find() operation, you can explicitly specify the shard(s) you'd like to find on. This is
particularly useful if you know in advance which shard the data will be in.

MyClass.find(:all, :conditions=>....., :shard=>["CA", "FL"])
MyShardedClass.find(:all, :conditions=>....., :shard=>["CA", "FL"])

You can see some [example classes here](https://github.com/appoxy/simple_record/blob/master/test/my_sharded_model.rb).

## Concurrency

**Subject to change**

Concurrency support was added as a way to query across shards in parallel. Unable to find a good generic concurrency library,
I ended up rolling my own, called [concur](https://github.com/appoxy/concur).

MyShardedClass.find(:all, :concurrent=>true)

We may later add a global [Executor](https://github.com/appoxy/concur/blob/master/lib/executor.rb) so you can share a fixed
thread pool across your app, but for now, a concurrent find will fire up one thread per shard.

## Kudos

Special thanks to Garrett Cox for creating Activerecord2sdb which SimpleRecord is based on:
Expand Down
27 changes: 14 additions & 13 deletions Rakefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,21 @@ require 'rubygems'
require './lib/simple_record.rb'

begin
require 'jeweler'
Jeweler::Tasks.new do |gemspec|
gemspec.name = "simple_record"
gemspec.summary = "ActiveRecord like interface for Amazon SimpleDB. By http://www.appoxy.com"
gemspec.email = "travis@appoxy.com"
gemspec.homepage = "http://github.com/appoxy/simple_record/"
gemspec.description = "ActiveRecord like interface for Amazon SimpleDB. Store, query, shard, etc. By http://www.appoxy.com"
gemspec.authors = ["Travis Reeder", "Chad Arimura", "RightScale"]
gemspec.files = FileList['lib/**/*.rb']
gemspec.add_dependency 'aws'
end
Jeweler::GemcutterTasks.new
require 'jeweler'
Jeweler::Tasks.new do |gemspec|
gemspec.name = "simple_record"
gemspec.summary = "ActiveRecord like interface for Amazon SimpleDB. By http://www.appoxy.com"
gemspec.email = "travis@appoxy.com"
gemspec.homepage = "http://github.com/appoxy/simple_record/"
gemspec.description = "ActiveRecord like interface for Amazon SimpleDB. Store, query, shard, etc. By http://www.appoxy.com"
gemspec.authors = ["Travis Reeder", "Chad Arimura", "RightScale"]
gemspec.files = FileList['lib/**/*.rb']
gemspec.add_dependency 'aws'
gemspec.add_dependency 'concur'
end
Jeweler::GemcutterTasks.new
rescue LoadError
puts "Jeweler not available. Install it with: sudo gem install technicalpickles-jeweler -s http://gems.github.com"
puts "Jeweler not available. Install it with: sudo gem install technicalpickles-jeweler -s http://gems.github.com"
end

# vim: syntax=Ruby
49 changes: 31 additions & 18 deletions lib/simple_record/sharding.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
require 'concur'

module SimpleRecord

module Sharding
Expand Down Expand Up @@ -26,7 +28,7 @@ def find_sharded(*params)
options = params.size > 1 ? params[1] : {}

if options[:shard] # User specified shard.
shard = options[:shard]
shard = options[:shard]
domains = shard.is_a?(Array) ? (shard.collect { |x| prefix_shard_name(x) }) : [prefix_shard_name(shard)]
else
domains = sharded_domains
Expand All @@ -45,21 +47,32 @@ def find_sharded(*params)
end
end

# todo: should we have a global executor?
executor = options[:concurrent] ? Concur::Executor.new_multi_threaded_executor : Concur::Executor.new_single_threaded_executor
results = ShardedResults.new(params)
futures = []
domains.each do |d|
p2 = params.dup
op2 = options.dup
op2[:from] = d
p2 = params.dup
op2 = options.dup
op2[:from] = d
op2[:shard_find] = true
p2[1] = op2
rs = find(*p2)
p2[1] = op2

futures << executor.execute do
puts 'executing... '
rs = find(*p2)
end
end
futures.each do |f|
puts 'getting future ' + f.inspect
if params.first == :first || single
return rs if rs
return f.get if f.get
else
results.add_results rs
results.add_results f.get
end
end
puts 'results=' + results.inspect
executor.shutdown
# puts 'results=' + results.inspect
if params.first == :first || single
# Then we found nothing by this point so return nil
return nil
Expand All @@ -81,7 +94,7 @@ def prefix_shard_name(s)

def sharded_domains
sharded_domains = []
shard_names = shards
shard_names = shards
shard_names.each do |s|
sharded_domains << prefix_shard_name(s)
end
Expand All @@ -91,7 +104,7 @@ def sharded_domains

def sharded_domain
# puts 'getting sharded_domain'
options = self.class.sharding_options
options = self.class.sharding_options
# val = self.send(options[:on])
# puts "val=" + val.inspect
# shards = options[:shards] # is user passed in static array of shards
Expand All @@ -107,8 +120,8 @@ class ShardedResults
include Enumerable

def initialize(params)
@params = params
@options = params.size > 1 ? params[1] : {}
@params = params
@options = params.size > 1 ? params[1] : {}
@results_arrays = []
end

Expand Down Expand Up @@ -146,9 +159,9 @@ def [](*i)
return element_at(index)
else
offset = i[0]
rows = i[1]
ret = []
x = offset
rows = i[1]
ret = []
x = offset
while x < (offset+rows)
ret << element_at(x)
x+=1
Expand Down Expand Up @@ -268,9 +281,9 @@ def self.sdbm_hash(str, len=str.length)
# puts 'sdbm_hash ' + str.inspect
hash = 0
len.times { |i|
c = str[i]
c = str[i]
# puts "c=" + c.class.name + "--" + c.inspect + " -- " + c.ord.inspect
c = c.ord
c = c.ord
hash = c + (hash << 6) + (hash << 16) - hash
}
# puts "hash=" + hash.inspect
Expand Down
2 changes: 1 addition & 1 deletion test/my_sharded_model.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ class MyShardedModel < SimpleRecord::Base
has_strings :name

def self.num_shards
4
10
end

def self.my_shards
Expand Down
4 changes: 2 additions & 2 deletions test/test_base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ def reset_connection(options={})

SimpleRecord::Base.set_domain_prefix("simplerecord_tests_")
SimpleRecord.establish_connection(@config['amazon']['access_key'], @config['amazon']['secret_key'],
{:connection_mode=>:single}.merge(options))
{:connection_mode=>:per_thread}.merge(options))


# Establish AWS connection directly
@@sdb = Aws::SdbInterface.new(@config['amazon']['access_key'], @config['amazon']['secret_key'],
{:connection_mode => :single}.merge(options))
{:connection_mode => :per_thread}.merge(options))

end

Expand Down
48 changes: 38 additions & 10 deletions test/test_shards.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ class TestShards < TestBase

def setup
super
delete_all MyShardedModel
delete_all MyShardedByFieldModel
# delete_all MyShardedModel
# delete_all MyShardedByFieldModel
end

def teardown
Expand All @@ -25,6 +25,9 @@ def teardown
# be used to select the shard.
def test_id_sharding

puts 'test_id_sharding start'
ob_count = 1000

mm = MyShardedModel.new(:name=>"single")
mm.save
sleep 1
Expand All @@ -38,41 +41,66 @@ def test_id_sharding
mm3 = MyShardedModel.find(mm.id)
assert_nil mm3

puts "saving 20 now"
puts "saving #{ob_count} now"
saved = []
20.times do |i|
ob_count.times do |i|
mm = MyShardedModel.new(:name=>"name #{i}")
mm.save
saved << mm
end
sleep 2

# todo: assert that we're actually sharding

puts "finding them all"
puts "finding them all sequentially"
start_time = Time.now
found = []
rs = MyShardedModel.find(:all)
rs = MyShardedModel.find(:all, :per_token=>2500)
rs.each do |m|
p m
# p m
found << m
end
duration = Time.now.to_f - start_time.to_f
puts 'Find sequential duration=' + duration.to_s
puts 'size=' + found.size.to_s
saved.each do |so|
assert(found.find { |m1| m1.id == so.id })
end


puts "Now let's try concurrently"
start_time = Time.now
found = []
rs = MyShardedModel.find(:all, :concurrent=>true, :per_token=>2500)
rs.each do |m|
# p m
found << m
end
concurrent_duration = Time.now.to_f - start_time.to_f
puts 'Find concurrent duration=' + concurrent_duration .to_s
puts 'size=' + found.size.to_s
saved.each do |so|
assert(found.find { |m1| m1.id == so.id })
end

assert concurrent_duration < duration

puts "deleting all of them"
found.each do |fo|
fo.delete
end

sleep 2

puts "Now ensure that all are deleted"
rs = MyShardedModel.find(:all)
assert rs.size == 0

puts "Testing belongs_to sharding"


end


def test_field_sharding

states = MyShardedByFieldModel.shards
Expand Down Expand Up @@ -105,7 +133,7 @@ def test_field_sharding

puts "finding them all"
found = []
rs = MyShardedByFieldModel.find(:all)
rs = MyShardedByFieldModel.find(:all)
rs.each do |m|
p m
found << m
Expand All @@ -124,7 +152,7 @@ def test_field_sharding
end

# Try to find on a specific known shard
selects = SimpleRecord.stats.selects
selects = SimpleRecord.stats.selects
cali_models = MyShardedByFieldModel.find(:all, :shard => "CA")
puts 'cali_models=' + cali_models.inspect
assert_equal(5, cali_models.size)
Expand Down

0 comments on commit 58fb4b4

Please sign in to comment.