Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Newer
Older
100755 443 lines (346 sloc) 17.159 kB
c3756a8 @bcui6611 first cut of design
authored
1 #!/usr/bin/env python
2 """
3 Binary memcached test client.
4
5 Copyright (c) 2007 Dustin Sallings <dustin@spy.net>
6 """
7
8 import sys
9 import time
10 import hmac
11 import socket
12 import random
13 import struct
14 import exceptions
15
16 from memcacheConstants import REQ_MAGIC_BYTE, RES_MAGIC_BYTE
17 from memcacheConstants import REQ_PKT_FMT, RES_PKT_FMT, MIN_RECV_PACKET
18 from memcacheConstants import SET_PKT_FMT, DEL_PKT_FMT, INCRDECR_RES_FMT
19 from memcacheConstants import TOUCH_PKT_FMT, GAT_PKT_FMT, GETL_PKT_FMT
20 import memcacheConstants
21
22 class MemcachedError(exceptions.Exception):
23 """Error raised when a command fails."""
24
25 def __init__(self, status, msg):
26 supermsg='Memcached error #' + `status`
27 if msg: supermsg += ": " + msg
28 exceptions.Exception.__init__(self, supermsg)
29
30 self.status=status
31 self.msg=msg
32
33 def __repr__(self):
34 return "<MemcachedError #%d ``%s''>" % (self.status, self.msg)
35
36 class MemcachedClient(object):
37 """Simple memcached client."""
38
39 vbucketId = 0
40
41 def __init__(self, host='127.0.0.1', port=11211):
42 self.host = host
43 self.port = port
44 self.s=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
45 self.s.connect_ex((host, port))
46 self.r=random.Random()
47
48 def close(self):
49 self.s.close()
50
51 def __del__(self):
52 self.close()
53
54 def _sendCmd(self, cmd, key, val, opaque, extraHeader='', cas=0):
55 self._sendMsg(cmd, key, val, opaque, extraHeader=extraHeader, cas=cas,
56 vbucketId=self.vbucketId)
57
58 def _sendMsg(self, cmd, key, val, opaque, extraHeader='', cas=0,
59 dtype=0, vbucketId=0,
60 fmt=REQ_PKT_FMT, magic=REQ_MAGIC_BYTE):
61 msg=struct.pack(fmt, magic,
62 cmd, len(key), len(extraHeader), dtype, vbucketId,
63 len(key) + len(extraHeader) + len(val), opaque, cas)
64 self.s.send(msg + extraHeader + key + val)
65
66 def _recvMsg(self):
67 response = ""
68 while len(response) < MIN_RECV_PACKET:
69 data = self.s.recv(MIN_RECV_PACKET - len(response))
70 if data == '':
71 raise exceptions.EOFError("Got empty data (remote died?).")
72 response += data
73 assert len(response) == MIN_RECV_PACKET
74 magic, cmd, keylen, extralen, dtype, errcode, remaining, opaque, cas=\
75 struct.unpack(RES_PKT_FMT, response)
76
77 rv = ""
78 while remaining > 0:
79 data = self.s.recv(remaining)
80 if data == '':
81 raise exceptions.EOFError("Got empty data (remote died?).")
82 rv += data
83 remaining -= len(data)
84
85 assert (magic in (RES_MAGIC_BYTE, REQ_MAGIC_BYTE)), "Got magic: %d" % magic
86 return cmd, errcode, opaque, cas, keylen, extralen, rv
87
88 def _handleKeyedResponse(self, myopaque):
89 cmd, errcode, opaque, cas, keylen, extralen, rv = self._recvMsg()
90 assert myopaque is None or opaque == myopaque, \
91 "expected opaque %x, got %x" % (myopaque, opaque)
92 if errcode != 0:
93 raise MemcachedError(errcode, rv)
94 return cmd, opaque, cas, keylen, extralen, rv
95
96 def _handleSingleResponse(self, myopaque):
97 cmd, opaque, cas, keylen, extralen, data = self._handleKeyedResponse(myopaque)
98 return opaque, cas, data
99
100 def _doCmd(self, cmd, key, val, extraHeader='', cas=0):
101 """Send a command and await its response."""
102 opaque=self.r.randint(0, 2**32)
103 self._sendCmd(cmd, key, val, opaque, extraHeader, cas)
104 return self._handleSingleResponse(opaque)
105
106 def _mutate(self, cmd, key, exp, flags, cas, val):
107 return self._doCmd(cmd, key, val, struct.pack(SET_PKT_FMT, flags, exp),
108 cas)
109
110 def _cat(self, cmd, key, cas, val):
111 return self._doCmd(cmd, key, val, '', cas)
112
113 def append(self, key, value, cas=0):
114 return self._cat(memcacheConstants.CMD_APPEND, key, cas, value)
115
116 def prepend(self, key, value, cas=0):
117 return self._cat(memcacheConstants.CMD_PREPEND, key, cas, value)
118
119 def __incrdecr(self, cmd, key, amt, init, exp):
120 something, cas, val=self._doCmd(cmd, key, '',
121 struct.pack(memcacheConstants.INCRDECR_PKT_FMT, amt, init, exp))
122 return struct.unpack(INCRDECR_RES_FMT, val)[0], cas
123
124 def incr(self, key, amt=1, init=0, exp=0):
125 """Increment or create the named counter."""
126 return self.__incrdecr(memcacheConstants.CMD_INCR, key, amt, init, exp)
127
128 def decr(self, key, amt=1, init=0, exp=0):
129 """Decrement or create the named counter."""
130 return self.__incrdecr(memcacheConstants.CMD_DECR, key, amt, init, exp)
131
132 def set(self, key, exp, flags, val):
133 """Set a value in the memcached server."""
134 return self._mutate(memcacheConstants.CMD_SET, key, exp, flags, 0, val)
135
136 def add(self, key, exp, flags, val):
137 """Add a value in the memcached server iff it doesn't already exist."""
138 return self._mutate(memcacheConstants.CMD_ADD, key, exp, flags, 0, val)
139
140 def replace(self, key, exp, flags, val):
141 """Replace a value in the memcached server iff it already exists."""
142 return self._mutate(memcacheConstants.CMD_REPLACE, key, exp, flags, 0,
143 val)
144
145 def __parseGet(self, data, klen=0):
146 flags=struct.unpack(memcacheConstants.GET_RES_FMT, data[-1][:4])[0]
147 return flags, data[1], data[-1][4 + klen:]
148
149 def get(self, key):
150 """Get the value for a given key within the memcached server."""
151 parts=self._doCmd(memcacheConstants.CMD_GET, key, '')
152 return self.__parseGet(parts)
153
154 def getl(self, key, exp=15):
155 """Get the value for a given key within the memcached server."""
156 parts=self._doCmd(memcacheConstants.CMD_GET_LOCKED, key, '',
157 struct.pack(memcacheConstants.GETL_PKT_FMT, exp))
158 return self.__parseGet(parts)
159
160 def cas(self, key, exp, flags, oldVal, val):
161 """CAS in a new value for the given key and comparison value."""
162 self._mutate(memcacheConstants.CMD_SET, key, exp, flags,
163 oldVal, val)
164
165 def touch(self, key, exp):
166 """Touch a key in the memcached server."""
167 return self._doCmd(memcacheConstants.CMD_TOUCH, key, '',
168 struct.pack(memcacheConstants.TOUCH_PKT_FMT, exp))
169
170 def gat(self, key, exp):
171 """Get the value for a given key and touch it within the memcached server."""
172 parts=self._doCmd(memcacheConstants.CMD_GAT, key, '',
173 struct.pack(memcacheConstants.GAT_PKT_FMT, exp))
174 return self.__parseGet(parts)
175
176 def version(self):
177 """Get the value for a given key within the memcached server."""
178 return self._doCmd(memcacheConstants.CMD_VERSION, '', '')
179
180 def sasl_mechanisms(self):
181 """Get the supported SASL methods."""
182 return set(self._doCmd(memcacheConstants.CMD_SASL_LIST_MECHS,
183 '', '')[2].split(' '))
184
185 def sasl_auth_start(self, mech, data):
186 """Start a sasl auth session."""
187 return self._doCmd(memcacheConstants.CMD_SASL_AUTH, mech, data)
188
189 def sasl_auth_plain(self, user, password, foruser=''):
190 """Perform plain auth."""
191 return self.sasl_auth_start('PLAIN', '\0'.join([foruser, user, password]))
192
193 def sasl_auth_cram_md5(self, user, password):
194 """Start a plan auth session."""
195 try:
196 self.sasl_auth_start('CRAM-MD5', '')
197 except MemcachedError, e:
198 if e.status != memcacheConstants.ERR_AUTH_CONTINUE:
199 raise
200 challenge = e.msg
201
202 dig = hmac.HMAC(password, challenge).hexdigest()
203 return self._doCmd(memcacheConstants.CMD_SASL_STEP, 'CRAM-MD5',
204 user + ' ' + dig)
205
206 def stop_persistence(self):
207 return self._doCmd(memcacheConstants.CMD_STOP_PERSISTENCE, '', '')
208
209 def start_persistence(self):
210 return self._doCmd(memcacheConstants.CMD_START_PERSISTENCE, '', '')
211
212 def set_flush_param(self, key, val):
213 print "setting flush param:", key, val
214 return self._doCmd(memcacheConstants.CMD_SET_FLUSH_PARAM, key, val)
215
216 def stop_replication(self):
217 return self._doCmd(memcacheConstants.CMD_STOP_REPLICATION, '', '')
218
219 def start_replication(self):
220 return self._doCmd(memcacheConstants.CMD_START_REPLICATION, '', '')
221
222 def start_onlineupdate(self):
223 return self._doCmd(memcacheConstants.CMD_START_ONLINEUPDATE, '', '')
224
225 def complete_onlineupdate(self):
226 return self._doCmd(memcacheConstants.CMD_COMPLETE_ONLINEUPDATE, '', '')
227
228 def revert_onlineupdate(self):
229 return self._doCmd(memcacheConstants.CMD_REVERT_ONLINEUPDATE, '', '')
230
231 def set_tap_param(self, key, val):
232 print "setting tap param:", key, val
233 return self._doCmd(memcacheConstants.CMD_SET_TAP_PARAM, key, val)
234
235 def set_vbucket_state(self, vbucket, stateName):
236 assert isinstance(vbucket, int)
237 self.vbucketId = vbucket
238 state = struct.pack(memcacheConstants.VB_SET_PKT_FMT,
239 memcacheConstants.VB_STATE_NAMES[stateName])
240 return self._doCmd(memcacheConstants.CMD_SET_VBUCKET_STATE, '', '', state)
241
242 def get_vbucket_state(self, vbucket):
243 return self._doCmd(memcacheConstants.CMD_GET_VBUCKET_STATE,
244 str(vbucket), '')
245
246 def delete_vbucket(self, vbucket):
247 assert isinstance(vbucket, int)
248 self.vbucketId = vbucket
249 return self._doCmd(memcacheConstants.CMD_DELETE_VBUCKET, '', '')
250
251 def evict_key(self, key):
252 return self._doCmd(memcacheConstants.CMD_EVICT_KEY, key, '')
253
254 def getMulti(self, keys):
255 """Get values for any available keys in the given iterable.
256
257 Returns a dict of matched keys to their values."""
258 opaqued=dict(enumerate(keys))
259 terminal=len(opaqued)+10
260 # Send all of the keys in quiet
261 for k,v in opaqued.iteritems():
262 self._sendCmd(memcacheConstants.CMD_GETQ, v, '', k)
263
264 self._sendCmd(memcacheConstants.CMD_NOOP, '', '', terminal)
265
266 # Handle the response
267 rv={}
268 done=False
269 while not done:
270 opaque, cas, data=self._handleSingleResponse(None)
271 if opaque != terminal:
272 rv[opaqued[opaque]]=self.__parseGet((opaque, cas, data))
273 else:
274 done=True
275
276 return rv
277
278 def setMulti(self, exp, flags, items):
279 """Multi-set (using setq).
280
281 Give me (key, value) pairs."""
282
283 # If this is a dict, convert it to a pair generator
284 if hasattr(items, 'iteritems'):
285 items = items.iteritems()
286
287 opaqued=dict(enumerate(items))
288 terminal=len(opaqued)+10
289 extra=struct.pack(SET_PKT_FMT, flags, exp)
290
291 # Send all of the keys in quiet
292 for opaque,kv in opaqued.iteritems():
293 self._sendCmd(memcacheConstants.CMD_SETQ, kv[0], kv[1], opaque, extra)
294
295 self._sendCmd(memcacheConstants.CMD_NOOP, '', '', terminal)
296
297 # Handle the response
298 failed = []
299 done=False
300 while not done:
301 try:
302 opaque, cas, data = self._handleSingleResponse(None)
303 done = opaque == terminal
304 except MemcachedError, e:
305 failed.append(e)
306
307 return failed
308
309 def stats(self, sub=''):
310 """Get stats."""
311 opaque=self.r.randint(0, 2**32)
312 self._sendCmd(memcacheConstants.CMD_STAT, sub, '', opaque)
313 done = False
314 rv = {}
315 while not done:
316 cmd, opaque, cas, klen, extralen, data = self._handleKeyedResponse(None)
317 if klen:
318 rv[data[0:klen]] = data[klen:]
319 else:
320 done = True
321 return rv
322
323 def noop(self):
324 """Send a noop command."""
325 return self._doCmd(memcacheConstants.CMD_NOOP, '', '')
326
327 def delete(self, key, cas=0):
328 """Delete the value for a given key within the memcached server."""
329 return self._doCmd(memcacheConstants.CMD_DELETE, key, '', '', cas)
330
331 def flush(self, timebomb=0):
332 """Flush all storage in a memcached instance."""
333 return self._doCmd(memcacheConstants.CMD_FLUSH, '', '',
334 struct.pack(memcacheConstants.FLUSH_PKT_FMT, timebomb))
335
336 def bucket_select(self, name):
337 return self._doCmd(memcacheConstants.CMD_SELECT_BUCKET, name, '')
338
339 def sync_persistence(self, keyspecs):
340 payload = self._build_sync_payload(0x8, keyspecs)
341
342 print "sending sync for persistence command for the following keyspecs:", keyspecs
343 (opaque, cas, data) = self._doCmd(memcacheConstants.CMD_SYNC, "", payload)
344 return (opaque, cas, self._parse_sync_response(data))
345
346 def sync_mutation(self, keyspecs):
347 payload = self._build_sync_payload(0x4, keyspecs)
348
349 print "sending sync for mutation command for the following keyspecs:", keyspecs
350 (opaque, cas, data) = self._doCmd(memcacheConstants.CMD_SYNC, "", payload)
351 return (opaque, cas, self._parse_sync_response(data))
352
353 def sync_replication(self, keyspecs, numReplicas=1):
354 payload = self._build_sync_payload((numReplicas & 0x0f) << 4, keyspecs)
355
356 print "sending sync for replication command for the following keyspecs:", keyspecs
357 (opaque, cas, data) = self._doCmd(memcacheConstants.CMD_SYNC, "", payload)
358 return (opaque, cas, self._parse_sync_response(data))
359
360 def sync_replication_or_persistence(self, keyspecs, numReplicas=1):
361 payload = self._build_sync_payload(((numReplicas & 0x0f) << 4) | 0x8, keyspecs)
362
363 print "sending sync for replication or persistence command for the " \
364 "following keyspecs:", keyspecs
365 (opaque, cas, data) = self._doCmd(memcacheConstants.CMD_SYNC, "", payload)
366 return (opaque, cas, self._parse_sync_response(data))
367
368 def sync_replication_and_persistence(self, keyspecs, numReplicas=1):
369 payload = self._build_sync_payload(((numReplicas & 0x0f) << 4) | 0xA, keyspecs)
370
371 print "sending sync for replication and persistence command for the " \
372 "following keyspecs:", keyspecs
373 (opaque, cas, data) = self._doCmd(memcacheConstants.CMD_SYNC, "", payload)
374 return (opaque, cas, self._parse_sync_response(data))
375
376 def _build_sync_payload(self, flags, keyspecs):
377 payload = struct.pack(">I", flags)
378 payload += struct.pack(">H", len(keyspecs))
379
380 for spec in keyspecs:
381 if not isinstance(spec, dict):
382 raise TypeError("each keyspec must be a dict")
383 if not spec.has_key('vbucket'):
384 raise TypeError("missing vbucket property in keyspec")
385 if not spec.has_key('key'):
386 raise TypeError("missing key property in keyspec")
387
388 payload += struct.pack(">Q", spec.get('cas', 0))
389 payload += struct.pack(">H", spec['vbucket'])
390 payload += struct.pack(">H", len(spec['key']))
391 payload += spec['key']
392
393 return payload
394
395 def _parse_sync_response(self, data):
396 keyspecs = []
397 nkeys = struct.unpack(">H", data[0 : struct.calcsize("H")])[0]
398 offset = struct.calcsize("H")
399
400 for i in xrange(nkeys):
401 spec = {}
402 width = struct.calcsize("QHHB")
403 (spec['cas'], spec['vbucket'], keylen, eventid) = \
404 struct.unpack(">QHHB", data[offset : offset + width])
405 offset += width
406 spec['key'] = data[offset : offset + keylen]
407 offset += keylen
408
409 if eventid == memcacheConstants.CMD_SYNC_EVENT_PERSISTED:
410 spec['event'] = 'persisted'
411 elif eventid == memcacheConstants.CMD_SYNC_EVENT_MODIFED:
412 spec['event'] = 'modified'
413 elif eventid == memcacheConstants.CMD_SYNC_EVENT_DELETED:
414 spec['event'] = 'deleted'
415 elif eventid == memcacheConstants.CMD_SYNC_EVENT_REPLICATED:
416 spec['event'] = 'replicated'
417 elif eventid == memcacheConstants.CMD_SYNC_INVALID_KEY:
418 spec['event'] = 'invalid key'
419 elif spec['event'] == memcacheConstants.CMD_SYNC_INVALID_CAS:
420 spec['event'] = 'invalid cas'
421 else:
422 spec['event'] = eventid
423
424 keyspecs.append(spec)
425
426 return keyspecs
427
428 def restore_file(self, filename):
429 """Initiate restore of a given file."""
430 return self._doCmd(memcacheConstants.CMD_RESTORE_FILE, filename, '', '', 0)
431
432 def restore_complete(self):
433 """Notify the server that we're done restoring."""
434 return self._doCmd(memcacheConstants.CMD_RESTORE_COMPLETE, '', '', '', 0)
435
436 def deregister_tap_client(self, tap_name):
437 """Deregister the TAP client with a given name."""
438 return self._doCmd(memcacheConstants.CMD_DEREGISTER_TAP_CLIENT, tap_name, '', '', 0)
439
440 def reset_replication_chain(self):
441 """Reset the replication chain."""
442 return self._doCmd(memcacheConstants.CMD_RESET_REPLICATION_CHAIN, '', '', '', 0)
Something went wrong with that request. Please try again.