public
Description: A memcache message queue
Homepage: http://coderrr.wordpress.com/2009/01/17/memcachequeue-a-pure-memcached-queue/
Clone URL: git://github.com/coderrr/memcache_queue.git
memcache_queue / lib / memcache_queue.rb
100644 113 lines (96 sloc) 3.03 kb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
require 'rubygems'
require 'memcache'
 
# It's your job to catch any signals relating to process termination
# (INT, TERM, etc.) and call the shutdown method on each worker. If
# this isn't done you run the risk of losing messages.
class MemcacheQueue
  class ShutdownError < StandardError; end
 
  attr_accessor :add_timeout
 
  def initialize(name, *args)
    @name = name or raise ArgumentError, "Must specify a unique name for each worker"
    @client = MemCache.new *args
    @first_key = state(:get, name)
    @add_timeout = 10
  end
 
  def create_queue
    @client.add('latest_added', '0', 0, true)
    @client.add('latest_read', '0', 0, true)
  end
 
  # ! lost message potential !
  # If it takes more than add_timeout seconds from the time incr
  # returns to the time the message is add()ed to memcache the message _could_
  # be lost.
  def add_msg(msg)
    raise ShutdownError if @shutdown
 
    begin
      latest_msg ||= @client.incr('latest_added')
      @client.add(latest_msg.to_s, msg)
      @client.set("added_#{latest_msg}", true)
    rescue Exception
      warn "Error on add: #{$!.inspect}, retrying"
      sleep 1
      retry
    end
  end
 
  # ! lost message potential !
  # If we die between incr and state(:set) the message _will_ be
  # lost. This would have to be due to hard process kill or other like events.
  def get_msg
    raise ShutdownError if @shutdown
 
    if key = @first_key
      @first_key = nil
    else
      key = @client.incr('latest_read').to_s
    end
 
    start_time = Time.now
    loop do
      begin
        if msg = @client.get(key)
          begin
            @client.delete(key)
          rescue Exception
            warn "Error while deleting key #{$!.inspect}"
          end
          return msg
        end
      rescue Exception
        warn "Error on get, #{$!.inspect}, retrying"
      end
      state(:set, key) unless @state_set
      raise ShutdownError if @shutdown
      return nil if (Time.now - start_time) > @add_timeout and failed_add?(key)
      sleep 1
      start_time = Time.now if msgs_left(true) < 0
    end
  ensure
    state(:delete, key) if @state_set and ! @shutdown
  end
 
  def msgs_left(neg_value = false)
    latest_added = @client.get('latest_added', true).to_i
    latest_read = @client.get('latest_read', true).to_i
 
    # check for 64bit boundary crossing
    if latest_added > 2**63 and latest_read < 2**62
      latest_read += 2**64
    elsif latest_read > 2**63 and latest_added < 2**62
      latest_added += 2**64
    end
 
    diff = latest_added - latest_read
    neg_value ? diff : [0, diff].max
  end
 
  def shutdown
    @shutdown = true
  end
 
  private
 
  def failed_add?(key)
    msgs_left(true) >= 0 and ! @client.get("added_#{key}")
  end
 
  def state(action, key = nil)
    return @client.get("state_#@name") if action == :get
    @client.send(action, "state_#@name", key)
    @state_set = (action == :set)
  rescue Exception
    warn "Error on #{action} state, #{$!.inspect}, retrying"
    sleep 1
    retry
  end
end