diff --git a/lib/ruote/redis/storage.rb b/lib/ruote/redis/storage.rb index 1cfaa70..10f84cd 100644 --- a/lib/ruote/redis/storage.rb +++ b/lib/ruote/redis/storage.rb @@ -128,85 +128,108 @@ def put(doc, opts={}) # regular put - rev = doc['_rev'].to_i key = key_for(doc) - - current_rev = @redis.get(key).to_i - - return true if current_rev == 0 && rev > 0 - return do_get(doc, current_rev) if rev != current_rev - - nrev = @redis.incr(key).to_i - - # the setnx here is crucial in multiple workers env... - - r = @redis.setnx( - key_rev_for(doc, nrev), - to_json(doc.merge('_rev' => nrev), opts)) - - return get(doc['type'], doc['_id']) if r == false - - @redis.set(key, nrev) - @redis.del(key_rev_for(doc, rev)) if rev > 0 - - doc['_rev'] = nrev if opts[:update_rev] - - nil + rev = doc['_rev'] + + lock(key) do + + current_doc = do_get(key) + current_rev = current_doc ? current_doc['_rev'] : nil + + if current_rev && rev != current_rev + # + # version in storage is newer than version being put, + # (eturn version in storage) + # + current_doc + + elsif rev && current_rev.nil? + # + # document deleted, put fails (return true) + # + true + + else + # + # put is successful (return nil) + # + nrev = (rev.to_i + 1).to_s + @redis.set(key, to_json(doc.merge('_rev' => nrev))) + doc['_rev'] = nrev if opts[:update_rev] + + nil + end + end end def get(type, key) - return from_json(@redis.get(key_for(type, key))) if MAS.include?(type) - # 'copy' case - - do_get(type, key, @redis.get(key_for(type, key))) + do_get(key_for(type, key)) end def delete(doc) - raise ArgumentError.new( - "can't delete doc without _rev") unless doc['_rev'] + rev = doc['_rev'] - r = put(doc, :delete => true) + raise ArgumentError.new("can't delete doc without _rev") unless rev - return r if r != nil + key = key_for(doc) - @redis.keys_to_a("#{key_for(doc)}*").sort.each { |k| - Thread.pass # lingering a bit... - @redis.del(k) - } - # deleting the key_rev last and making 1 'keys' call preliminarily + lock(key) do - nil + current_doc = do_get(key) + + if current_doc.nil? + # + # document is [already] gone, delete fails (return true) + # + true + + elsif current_doc['_rev'] != rev + # + # version in storage doesn't match version to delete + # (return version in storage) + # + current_doc + + else + # + # delete is successful (return nil) + # + @redis.del(key) + + nil + end + end end def get_many(type, key=nil, opts={}) keys = key ? Array(key) : nil - ids = if type == 'msgs' || type == 'schedules' - - @redis.keys_to_a("#{type}/*") + #ids = if type == 'msgs' || type == 'schedules' + # @redis.keys_to_a("#{type}/*") - elsif keys == nil + ids = if keys == nil - @redis.keys_to_a("#{type}/*/*") + @redis.keys_to_a("#{type}/*") elsif keys.first.is_a?(String) - keys.collect { |k| @redis.keys_to_a("#{type}/*!#{k}/*") }.flatten + keys.collect { |k| @redis.keys_to_a("#{type}/*!#{k}") }.flatten else #if keys.first.is_a?(Regexp) - @redis.keys_to_a("#{type}/*/*").select { |i| + @redis.keys_to_a("#{type}/*").select { |i| - i = i[type.length + 1..i.rindex('/') - 1] - # removing "^type/" and "/\d+$" + i = i[type.length + 1..-1] + # removing "^type/" keys.find { |k| k.match(i) } } end + ids = ids.reject { |i| i.match(LOCK_KEY) } ids = ids.sort ids = ids.reverse if opts[:descending] @@ -228,13 +251,10 @@ def get_many(type, key=nil, opts={}) def ids(type) - @redis.keys_to_a("#{type}/*").inject([]) { |a, k| - - if m = k.match(/^[^\/]+\/([^\/]+)$/) - a << m[1] - end - - a + @redis.keys_to_a("#{type}/*").reject { |i| + i.match(LOCK_KEY) + }.collect { |i| + i.split('/').last }.sort end @@ -271,38 +291,51 @@ def purge_type!(type) protected - # key_for(doc) - # key_for(type, key) - # - def key_for(*args) + LOCK_KEY = /-lock$/ - a = args.first + def lock(key, &block) - (a.is_a?(Hash) ? [ a['type'], a['_id'] ] : args[0, 2]).join('/') + kl = "#{key}-lock" + + #p [ kl, :locking, Thread.current.object_id, Time.now.to_f ] + + #while @redis.setnx(kl, 'null') == false; sleep(0.007); end + loop do + r = @redis.setnx(kl, Time.now.to_f.to_s) + #p [ :setnx, r ] + if r == false + sleep 0.007 + else + break + end + end + + #p [ kl, :locked, Thread.current.object_id, Time.now.to_f ] + + #@redis.expire(kl, 2) + + result = block.call + + @redis.del(kl) + + #p [ kl, :unlocked, Thread.current.object_id, Time.now.to_f ] + + result end - # key_rev_for(doc) - # key_rev_for(doc, rev) - # key_rev_for(type, key, rev) + # key_for(doc) + # key_for(type, key) # - def key_rev_for(*args) + def key_for(*args) - as = nil a = args.first - if a.is_a?(Hash) - as = [ a['type'], a['_id'], a['_rev'] ] if a.is_a?(Hash) - as[2] = args[1] if args[1] - else - as = args[0, 3] - end - - as.join('/') + (a.is_a?(Hash) ? [ a['type'], a['_id'] ] : args[0, 2]).join('/') end - def do_get(*args) + def do_get(key) - from_json(@redis.get(key_rev_for(*args))) + from_json(@redis.get(key)) end def from_json(s) @@ -318,7 +351,7 @@ def to_json(doc, opts={}) # Don't put configuration if it's already in # - #(avoid storages from trashing configuration...) + # (prevent storages from trashing configuration...) # def put_configuration