forked from bmuller/txyam
/
client.py
181 lines (148 loc) · 5.72 KB
/
client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
import cPickle
import zlib
from twisted.internet.defer import inlineCallbacks, DeferredList, returnValue
from twisted.internet import reactor
from twisted.python import failure, log
from txyam.utils import ketama, deferredDict
from txyam.factory import MemCacheClientFactory
class NoServerError(Exception):
    """Raised when no connected server is available to accept a command."""
class InvalidHostPortError(Exception):
    """Raised when a host/port specification cannot be interpreted."""
def _wrap(cmd):
"""
Used to wrap all of the memcache methods (get,set,getMultiple,etc).
"""
def unwrap(result):
return result[None]
def wrapper(self, key, *args, **kwargs):
client = self.getClient(key)
request = {client: (None, cmd, (key,) + args, kwargs)}
return self._issueRequest(request).addCallback(unwrap)
return wrapper
def _issueRequest(request):
    """
    Issue a named request to some clients.

    This exists mostly for the benefit of the tests: wrappers describe what
    they want to call as plain data, which the tests can inspect.

    @param request: A C{dict} mapping each client to a
        C{(resultKey, methodName, args, kwargs)} tuple.

    @return: Whatever C{deferredDict} returns for the C{dict} mapping each
        C{resultKey} to the value returned by the corresponding client call.
    """
    pending = {}
    for client, spec in request.iteritems():
        resultKey, methodName, args, kwargs = spec
        call = getattr(client, methodName)
        pending[resultKey] = call(*args, **kwargs)
    return deferredDict(pending)
class YamClient:
    """
    Memcache client that spreads commands over several servers.

    A key is mapped to one of the currently connected servers with
    C{ketama(key)} modulo the number of active connections.
    """

    _issueRequest = staticmethod(_issueRequest)

    def __init__(self, hosts, connect=True):
        """
        @param hosts: A C{list} of C{tuple}s containing hosts and ports.
            A bare hostname C{str} is accepted too and uses port 11211.

        @param connect: If true, start connecting immediately. The Deferred
            returned by L{connect} is not kept in that case; use
            C{ConnectedYamClient} to wait for the connections.
        """
        self.hosts = hosts
        # Always defined, so the accessors below work (and simply see no
        # connections) on a client created with connect=False.
        self.factories = []
        if connect:
            self.connect()

    def getActiveConnections(self):
        """
        Return the C{client} of every factory that currently has one.
        """
        return [factory.client for factory in self.factories
                if factory.client is not None]

    def getClient(self, key):
        """
        Pick the active connection responsible for C{key}.

        @raise NoServerError: if there is no active connection.
        """
        hosts = self.getActiveConnections()
        log.msg("Using %i active hosts" % len(hosts))
        if len(hosts) == 0:
            raise NoServerError("No connected servers remaining.")
        return hosts[ketama(key) % len(hosts)]

    @inlineCallbacks
    def connect(self):
        """
        Start a TCP connection to every entry of C{self.hosts}.

        @return: A Deferred firing with this client.

        @raise InvalidHostPortError: if an entry is neither a C{tuple} nor a
            C{str}. As this is an C{inlineCallbacks} generator, the error is
            delivered through the returned Deferred.
        """
        self.factories = []
        for hp in self.hosts:
            if isinstance(hp, tuple):
                host, port = hp
            elif isinstance(hp, str):
                host = hp
                port = 11211
            else:
                raise InvalidHostPortError(
                    "Connection info should be either hostnames or "
                    "host/port tuples")
            factory = MemCacheClientFactory()
            reactor.connectTCP(host, port, factory)
            self.factories.append(factory)

        # fire callback when all connections have been established
        yield DeferredList([factory.deferred for factory in self.factories])
        returnValue(self)

    def disconnect(self):
        """
        Tell every factory to stop trying and close all active connections.
        """
        log.msg("Disconnecting from all clients.")
        for factory in self.factories:
            factory.stopTrying()
        for connection in self.getActiveConnections():
            connection.transport.loseConnection()

    def flushAll(self):
        """
        Issue C{flushAll} on every active connection.

        @return: A Deferred firing with a C{list} of C{(success, result)}
            pairs ordered by connection index, where C{success} is false
            when the result is a L{failure.Failure}.
        """
        request = {}
        for e, client in enumerate(self.getActiveConnections()):
            request[client] = e, 'flushAll', (), {}
        log.msg("Flushing %i hosts" % len(request))

        def unwrap(result):
            # Result keys are the connection indexes assigned above.
            return [(not isinstance(b, failure.Failure), b)
                    for a, b in sorted(result.items())]
        return self._issueRequest(request).addCallback(unwrap)

    def _hostRequest(self, cmd):
        """
        Build a request running C{cmd} without arguments on every connected
        factory, with results keyed by a C{"host:port"} string.
        """
        request = {}
        for factory in self.factories:
            if factory.client is None:
                continue
            hp = "%s:%i" % (factory.addr.host, factory.addr.port)
            request[factory.client] = hp, cmd, (), {}
        return request

    def stats(self):
        """
        Issue C{stats} on every connected host; results are keyed by
        C{"host:port"}.
        """
        request = self._hostRequest('stats')
        log.msg("Getting stats on %i hosts" % len(request))
        return self._issueRequest(request)

    def version(self):
        """
        Issue C{version} on every connected host; results are keyed by
        C{"host:port"}.
        """
        request = self._hostRequest('version')
        log.msg("Getting version on %i hosts" % len(request))
        return self._issueRequest(request)

    def pickle(self, value, compress):
        """
        Serialize C{value} with C{cPickle}, zlib-compressing the output when
        C{compress} is true.
        """
        p = cPickle.dumps(value, cPickle.HIGHEST_PROTOCOL)
        if compress:
            p = zlib.compress(p)
        return p

    def unpickle(self, value, uncompress):
        """
        Reverse of L{pickle}: optionally zlib-decompress, then unpickle.
        """
        if uncompress:
            value = zlib.decompress(value)
        # SECURITY: unpickling can execute arbitrary code. Only use this
        # against memcache servers whose contents are trusted.
        return cPickle.loads(value)

    def setPickled(self, key, value, **kwargs):
        """
        Like C{set}, but stores the pickled form of C{value}.

        The optional C{compress} keyword (default C{False}) is consumed here;
        every other keyword is passed on to C{set}.
        """
        value = self.pickle(value, kwargs.pop('compress', False))
        return self.set(key, value, **kwargs)

    def addPickled(self, key, value, **kwargs):
        """
        Like C{add}, but stores the pickled form of C{value}.

        The optional C{compress} keyword (default C{False}) is consumed here;
        every other keyword is passed on to C{add}.
        """
        value = self.pickle(value, kwargs.pop('compress', False))
        return self.add(key, value, **kwargs)

    def getPickled(self, key, **kwargs):
        """
        Like C{get}, but unpickles the value found in the result.

        The optional C{uncompress} keyword (default C{False}) is consumed
        here; every other keyword is passed on to C{get}.

        @return: A Deferred firing with the result of C{get} as a C{tuple}
            whose last element, unless it is C{None}, has been unpickled.
        """
        def handleResult(result, uncompress):
            # The stored value is taken to be the last element of the result.
            result = list(result)
            if result[-1] is not None:
                result[-1] = self.unpickle(result[-1], uncompress)
            return tuple(result)
        uncompress = kwargs.pop('uncompress', False)
        return self.get(key, **kwargs).addCallback(handleResult, uncompress)

    # Following methods can be found at
    # http://twistedmatrix.com/trac/browser/tags/releases/twisted-12.0.0/twisted/protocols/memcache.py
    set = _wrap("set")
    get = _wrap("get")
    increment = _wrap("increment")
    decrement = _wrap("decrement")
    replace = _wrap("replace")
    add = _wrap("add")
    checkAndSet = _wrap("checkAndSet")
    append = _wrap("append")
    prepend = _wrap("prepend")
    # NOTE(review): the wrapper hands its first argument to getClient, so the
    # whole collection of keys is hashed as if it were one key and all keys
    # go to a single server -- confirm this is intended and that ketama
    # accepts such an argument.
    getMultiple = _wrap("getMultiple")
    delete = _wrap("delete")
def ConnectedYamClient(hosts):
    """
    Create a L{YamClient} for C{hosts} and start connecting it.

    @param hosts: Passed unchanged to L{YamClient}.

    @return: The value returned by C{YamClient.connect}.
    """
    client = YamClient(hosts, connect=False)
    return client.connect()