-
Notifications
You must be signed in to change notification settings - Fork 33
/
memcache_queue_client.rb
82 lines (71 loc) · 2.69 KB
/
memcache_queue_client.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
require 'workling/clients/base'
#
# This client can be used for all Queue Servers that speak Memcached, such as Starling.
#
# Wrapper for the memcache connection. The connection is made using fiveruns-memcache-client,
# falling back to memcache-client when fiveruns-memcache-client is not available. See the
# README for a discussion of the memcache clients.
#
# method_missing delegates all messages through to the underlying memcache connection.
#
module Workling
module Clients
class MemcacheQueueClient < Workling::Clients::Base
  # The class with which the connection is instantiated. Defaults to ::MemCache
  # unless a value has already been assigned.
  cattr_accessor :memcache_client_class
  @@memcache_client_class ||= ::MemCache

  # The urls with which the memcache client expects to reach the queue server,
  # as parsed from Workling.config[:listens_on] by #connect.
  attr_accessor :queueserver_urls

  # The memcache connection object.
  attr_accessor :connection

  #
  # Attempts to connect to the queue server using the configuration options found in
  # Workling.config. This can be configured in config/workling.yml.
  #
  # Workling.config[:listens_on] is read as a comma-separated string of urls; each url
  # is stripped of surrounding whitespace. Workling.config[:memcache_options], when not
  # nil, is passed to the client constructor as a second argument.
  #
  # Raises Workling::QueueserverNotFoundError if the connection check fails.
  #
  def connect
    # String#split never yields nil elements, so every url can be stripped directly.
    @queueserver_urls = Workling.config[:listens_on].split(',').map { |url| url.strip }
    options = [@queueserver_urls, Workling.config[:memcache_options]].compact
    self.connection = MemcacheQueueClient.memcache_client_class.new(*options)
    raise_unless_connected!
  end

  # Closes the memcache connection by calling flush_all and then reset on it.
  #
  # NOTE(review): flush_all is issued before the reset; presumably this clears the
  # data held by the queue server -- confirm that this is intended on every close.
  def close
    connection.flush_all
    connection.reset
  end

  # Implements the client job request: calls set(key, value).
  # `set` is not defined in this class, so it is presumably dispatched to the
  # memcache connection through method_missing.
  def request(key, value)
    set(key, value)
  end

  # Implements the client job retrieval: returns the result of get(key).
  # `get` is not defined in this class, so it is presumably dispatched to the
  # memcache connection through method_missing.
  #
  # Raises Workling::WorklingError if a MemCache::MemCacheError reaches this method.
  def retrieve(key)
    get(key)
  rescue MemCache::MemCacheError => e
    # failed to dequeue, raise a workling error so that it propagates upwards
    raise Workling::WorklingError.new("#{e.class} - #{e.message}")
  end

  private

  # Makes sure we can actually reach the queue server by requesting its stats.
  # Any StandardError raised by that call is replaced with
  # Workling::QueueserverNotFoundError.
  def raise_unless_connected!
    connection.stats
  rescue
    raise Workling::QueueserverNotFoundError.new
  end

  # Delegates directly through to the memcache connection, forwarding the
  # arguments and any block given by the caller so that block-taking methods of
  # the underlying client work through this wrapper.
  #
  # Raises Workling::WorklingConnectionError in place of MemCache::MemCacheError.
  def method_missing(method, *args, &block)
    connection.send(method, *args, &block)
  rescue MemCache::MemCacheError => e
    raise Workling::WorklingConnectionError.new("#{e.class} - #{e.message}")
  end

  # Keeps respond_to? in line with method_missing: reports true for anything the
  # memcache connection responds to.
  def respond_to_missing?(method, include_private = false)
    connection.respond_to?(method, include_private) || super
  end
end
end
end