Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Initial commit with the source.

  • Loading branch information...
commit 52ae6895219b08f922a60e6d19bbd0a06ea08d50 1 parent e54c8df
@hrosenhorn authored
View
27 LICENSE
@@ -0,0 +1,27 @@
+Copyright (C) 2010, Håkan Rosenhorn
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without modification,
+are permitted provided that the following conditions are met:
+
+ * Redistributions of source code must retain the above copyright notice,
+ this list of conditions and the following disclaimer.
+
+ * Redistributions in binary form must reproduce the above copyright notice,
+ this list of conditions and the following disclaimer in the documentation
+ and/or other materials provided with the distribution.
+
+ * Neither the name of Håkan Rosenhorn nor the names of its
+ contributors may be used to endorse or promote products derived from this
+ software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
+ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
View
27 LICENSE.concurrence
@@ -0,0 +1,27 @@
+Copyright (C) 2009, Hyves (Startphone Ltd.)
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without modification,
+are permitted provided that the following conditions are met:
+
+ * Redistributions of source code must retain the above copyright notice,
+ this list of conditions and the following disclaimer.
+
+ * Redistributions in binary form must reproduce the above copyright notice,
+ this list of conditions and the following disclaimer in the documentation
+ and/or other materials provided with the distribution.
+
+ * Neither the name of Hyves (Startphone Ltd.) nor the names of its
+ contributors may be used to endorse or promote products derived from this
+ software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
+ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
View
13 examples/simple_query.py
@@ -0,0 +1,13 @@
# Minimal usage example: connect to a single memcached node and
# round-trip one key/value pair.
from geventmemcache.client import Memcache

# List of ((host, port), weight) tuples; the weight is presumably a server
# weight for the distribution behaviour -- TODO confirm against
# ketama.build_continuum.
servers = [(("127.0.0.1", 6070), 100)]

client = Memcache(servers)

# Store an integer (the default codec encodes it with a type flag),
# then read it back.
client.set("TestKey", 123)
result = client.get("TestKey")

# Python 2 print statement; expected output: "Result is 123".
print "Result is", result
View
61 lib/geventmemcache/__init__.py
@@ -0,0 +1,61 @@
+# Copyright (C) 2009, Hyves (Startphone Ltd.)
+#
+# This module is part of the Concurrence Framework and is released under
+# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
+import logging
+logging.TRACE = 5
+
class MemcacheError(Exception):
    """Base exception for all errors raised by the geventmemcache package."""
    pass
+
class MemcacheResult(object):
    """Representation of a memcache result/response code.

    Well-known codes (OK, STORED, ...) are interned once and shared; results
    compare equal by name. CLIENT_ERROR/SERVER_ERROR responses carry the
    server-supplied message text in :attr:`msg`.
    """

    _interned = {}  # response line -> shared MemcacheResult instance

    def __init__(self, name, msg = ''):
        self._name = name
        self._msg = msg

    @property
    def msg(self):
        """Human-readable message (only non-empty for CLIENT/SERVER errors)."""
        return self._msg

    def __repr__(self):
        return "MemcacheResult.%s" % self._name

    def __eq__(self, other):
        return isinstance(other, MemcacheResult) and other._name == self._name

    def __ne__(self, other):
        # Bug fix: Python 2 does not derive != from ==; without this,
        # `a != b` fell back to identity comparison and misbehaved for
        # equal-but-distinct result objects.
        return not self.__eq__(other)

    def __hash__(self):
        # Keep equal results hashing equal (pairs with __eq__).
        return hash(self._name)

    @classmethod
    def get(cls, line):
        """Map a raw response *line* to a (possibly interned) result object.

        Raises MemcacheError for unrecognised responses.
        """
        code = cls._interned.get(line, None)
        if code is None:
            #try client or server error; both prefixes are 12 chars + 1 space
            if line.startswith('CLIENT_ERROR'):
                return MemcacheResult("CLIENT_ERROR", line[13:])
            elif line.startswith('SERVER_ERROR'):
                return MemcacheResult("SERVER_ERROR", line[13:])
            else:
                raise MemcacheError("unknown response: %s" % repr(line))
        else:
            return code

    @classmethod
    def _intern(cls, name):
        # Create and cache the canonical instance for a well-known code.
        cls._interned[name] = MemcacheResult(name)
        return cls._interned[name]
+
# Interned well-known protocol result codes, exposed as class attributes so
# callers can compare with e.g. `result is MemcacheResult.STORED`.
MemcacheResult.OK = MemcacheResult._intern("OK")
MemcacheResult.STORED = MemcacheResult._intern("STORED")
MemcacheResult.NOT_STORED = MemcacheResult._intern("NOT_STORED")
MemcacheResult.EXISTS = MemcacheResult._intern("EXISTS")
MemcacheResult.NOT_FOUND = MemcacheResult._intern("NOT_FOUND")
MemcacheResult.DELETED = MemcacheResult._intern("DELETED")
MemcacheResult.ERROR = MemcacheResult._intern("ERROR")
MemcacheResult.TIMEOUT = MemcacheResult._intern("TIMEOUT")

# Convenience re-exports; imported at the bottom of the module because these
# submodules themselves import names from this package (circular import).
from geventmemcache.client import Memcache, MemcacheConnection, MemcacheConnectionManager
from geventmemcache.behaviour import MemcacheBehaviour
from geventmemcache.protocol import MemcacheProtocol
from geventmemcache.codec import MemcacheCodec
View
35 lib/geventmemcache/behaviour.py
@@ -0,0 +1,35 @@
+from geventmemcache import MemcacheError, ketama
+
class MemcacheBehaviour(object):
    """Strategy for mapping a cache key to a server address."""

    @classmethod
    def create(cls, type_):
        """Coerce *type_* (a behaviour instance or a name) into a behaviour."""
        if isinstance(type_, MemcacheBehaviour):
            # Already a behaviour instance; pass it through untouched.
            return type_
        if type_ == "modulo":
            return MemcacheModuloBehaviour()
        if type_ == "ketama":
            return MemcacheKetamaBehaviour()
        raise MemcacheError("unknown behaviour: %s" % type_)
+
class MemcacheModuloBehaviour(MemcacheBehaviour):
    """Maps keys with `hash(key) % server_count` (no consistent hashing)."""

    def __init__(self):
        pass

    def set_servers(self, servers):
        """Remember the server list used for the modulo mapping."""
        self._servers = servers

    def key_to_addr(self, key):
        """Pick the owning server for *key* from the configured list."""
        server_list = self._servers
        return server_list[hash(key) % len(server_list)]
+
class MemcacheKetamaBehaviour(MemcacheBehaviour):
    """Maps keys to servers using the ketama consistent-hashing continuum."""

    def __init__(self):
        # Continuum is built lazily by set_servers().
        self._continuum = None

    def set_servers(self, servers):
        # Rebuild the consistent-hashing continuum from the server list.
        self._continuum = ketama.build_continuum(servers)

    def key_to_addr(self, key):
        # NOTE(review): behaviour when set_servers() was never called
        # (continuum is None) is unclear -- confirm ketama.get_server
        # handles a None continuum.
        return ketama.get_server(key, self._continuum)
+
+
View
297 lib/geventmemcache/buffered.py
@@ -0,0 +1,297 @@
+# Copyright (C) 2009, Hyves (Startphone Ltd.)
+#
+# This module is part of the Concurrence Framework and is released under
+# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
+
+
+from common import Buffer, BufferOverflowError, BufferUnderflowError, BufferInvalidArgumentError
+from gevent import socket
+
class BufferedReader(object):
    """Buffered reading layer on top of a gevent socket.

    All reads are served from *buffer* (a Buffer from the compiled common
    module); when it runs dry, more bytes are pulled from the underlying
    stream with recv().
    """
    def __init__(self, stream, buffer):
        # stream may be None for pooled readers; it is attached later by
        # BufferedStream._borrowed_reader.
        assert stream is None or isinstance(stream, socket.socket)
        self.stream = stream
        self.buffer = buffer
        #assume no reading from underlying stream was done, so make sure buffer reflects this:
        self.buffer.position = 0
        self.buffer.limit = 0

    #def file(self):
    #    return CompatibleFile(self, None)

    def clear(self):
        self.buffer.clear()

    def _read_more(self):
        """Pull more bytes from the socket into the buffer; raises EOFError on close."""
        #any partially read data will be put in front, otherwise normal clear:
        self.buffer.compact()
        # NOTE(review): recv size is limit - position; assumes compact()
        # leaves limit at capacity -- confirm against Buffer.compact in the
        # compiled common module.
        data = self.stream.recv(self.buffer.limit - self.buffer.position)
        if not data:
            raise EOFError("while reading")
        self.buffer.write_bytes(data)
        self.buffer.flip() #prepare to read from buffer

    def read_lines(self):
        """Generator of lines; note it can't read a line that spans more than one buffer."""
        if self.buffer.remaining == 0:
            self._read_more()
        while True:
            try:
                yield self.buffer.read_line()
            except BufferUnderflowError:
                # incomplete line in the buffer; fetch more and retry
                self._read_more()

    def read_line(self):
        """Read one line; note it can't read a line that spans more than one buffer."""
        if self.buffer.remaining == 0:
            self._read_more()
        while True:
            try:
                return self.buffer.read_line()
            except BufferUnderflowError:
                self._read_more()

    def read_bytes_available(self):
        """Return whatever is currently buffered (reading from the socket only if empty)."""
        if self.buffer.remaining == 0:
            self._read_more()
        return self.buffer.read_bytes(-1)

    def read_bytes(self, n):
        """read exactly n bytes from stream"""
        buffer = self.buffer
        s = []
        while n > 0:
            r = buffer.remaining
            if r > 0:
                s.append(buffer.read_bytes(min(n, r)))
                # subtracting r (not min(n, r)) is safe: when r > n only n
                # bytes were consumed and n goes negative, ending the loop
                n -= r
            else:
                self._read_more()

        return ''.join(s)

    def read_int(self):
        # Retry until the buffer holds enough bytes to decode an int.
        if self.buffer.remaining == 0:
            self._read_more()
        while True:
            try:
                return self.buffer.read_int()
            except BufferUnderflowError:
                self._read_more()

    def read_short(self):
        # Same retry pattern as read_int, for a short.
        if self.buffer.remaining == 0:
            self._read_more()
        while True:
            try:
                return self.buffer.read_short()
            except BufferUnderflowError:
                self._read_more()
+
class BufferedWriter(object):
    """Buffered writing layer on top of a gevent socket.

    Writes accumulate in *buffer* (a Buffer from the compiled common module)
    and are pushed to the stream by flush(), either explicitly or whenever
    the buffer overflows.
    """
    def __init__(self, stream, buffer):
        # stream may be None for pooled writers; it is attached later by
        # BufferedStream._borrowed_writer.
        assert stream is None or isinstance(stream, socket.socket)
        self.stream = stream
        self.buffer = buffer

    #def file(self):
    #    return CompatibleFile(None, self)

    def clear(self):
        self.buffer.clear()

    def write_bytes(self, s):
        # isinstance instead of `type(s) == str`: also accepts str subclasses.
        assert isinstance(s, str), "arg must be a str, got: %s" % type(s)
        try:
            self.buffer.write_bytes(s)
        except BufferOverflowError:
            #we need to send it in parts, flushing as we go
            while s:
                r = self.buffer.remaining
                part, s = s[:r], s[r:]
                self.buffer.write_bytes(part)
                self.flush()

    def write_byte(self, ch):
        assert isinstance(ch, int), "ch arg must be int"
        while True:
            try:
                self.buffer.write_byte(ch)
                return
            except BufferOverflowError:
                # no room left; push buffered bytes out and retry
                self.flush()

    def write_short(self, i):
        while True:
            try:
                self.buffer.write_short(i)
                return
            except BufferOverflowError:
                self.flush()

    def write_int(self, i):
        while True:
            try:
                self.buffer.write_int(i)
                return
            except BufferOverflowError:
                self.flush()

    def flush(self):
        """Send all buffered bytes to the stream and reset the buffer."""
        self.buffer.flip()
        data = self.buffer.read_bytes()  # renamed: previously shadowed the builtin `bytes`
        self.stream.sendall(data)
        self.buffer.clear()
+
class BufferedStream(object):
    """Pairs a socket with lazily-created, pooled BufferedReader/Writer objects.

    Readers/writers whose buffers are fully drained on release are returned
    to class-level pools (keyed by buffer size) and shared between streams;
    ones with pending data stay attached to the stream.
    """

    _reader_pool = {} #buffer_size -> [list of readers]
    _writer_pool = {} #buffer_size -> [list of writers]

    __slots__ = ['_stream', '_writer', '_reader', '_read_buffer_size', '_write_buffer_size']

    def __init__(self, stream, buffer_size = 1024 * 8, read_buffer_size = 0, write_buffer_size = 0):
        self._stream = stream
        self._writer = None
        self._reader = None
        # 0 means "use the common buffer_size default"
        self._read_buffer_size = read_buffer_size or buffer_size
        self._write_buffer_size = write_buffer_size or buffer_size

    def flush(self):
        if self._writer:
            self._writer.flush()

    @property
    def reader(self):
        # Dedicated (non-pooled) reader, created on first use.
        if self._reader is None:
            self._reader = BufferedReader(self._stream, Buffer(self._read_buffer_size))
        return self._reader

    @property
    def writer(self):
        # Dedicated (non-pooled) writer, created on first use.
        if self._writer is None:
            self._writer = BufferedWriter(self._stream, Buffer(self._write_buffer_size))
        return self._writer

    class _borrowed_writer(object):
        """Context manager that borrows a writer from the pool (or the stream)."""
        def __init__(self, stream):
            buffer_size = stream._write_buffer_size
            if stream._writer is None:
                if stream._writer_pool.get(buffer_size, []):
                    writer = stream._writer_pool[buffer_size].pop()
                else:
                    writer = BufferedWriter(None, Buffer(buffer_size))
            else:
                # the stream still holds a writer with unflushed data; reuse it
                writer = stream._writer
            writer.stream = stream._stream
            self._writer = writer
            self._stream = stream

        def __enter__(self):
            return self._writer

        def __exit__(self, type, value, traceback):
            #TODO!!! handle exception case/exit
            if self._writer.buffer.position != 0:
                # unflushed data remains; keep the writer attached to the stream
                self._stream._writer = self._writer
            else:
                # drained: return the writer to the shared pool
                writer_pool = self._stream._writer_pool.setdefault(self._stream._write_buffer_size, [])
                writer_pool.append(self._writer)
                self._stream._writer = None

    class _borrowed_reader(object):
        """Context manager that borrows a reader from the pool (or the stream)."""
        def __init__(self, stream):
            buffer_size = stream._read_buffer_size
            if stream._reader is None:
                if stream._reader_pool.get(buffer_size, []):
                    reader = stream._reader_pool[buffer_size].pop()
                else:
                    reader = BufferedReader(None, Buffer(buffer_size))
            else:
                # the stream still holds a reader with pending buffered data
                reader = stream._reader
            reader.stream = stream._stream
            self._reader = reader
            self._stream = stream

        def __enter__(self):
            return self._reader

        def __exit__(self, type, value, traceback):
            #TODO!!! handle exception case/exit
            if self._reader.buffer.remaining:
                # unread data remains; keep the reader attached to the stream
                self._stream._reader = self._reader
            else:
                # drained: return the reader to the shared pool
                reader_pool = self._stream._reader_pool.setdefault(self._stream._read_buffer_size, [])
                reader_pool.append(self._reader)
                self._stream._reader = None

    def get_writer(self):
        """Borrow a writer for use in a `with` block."""
        return self._borrowed_writer(self)

    def get_reader(self):
        """Borrow a reader for use in a `with` block."""
        return self._borrowed_reader(self)

    def close(self):
        self._stream.close()
        # NOTE(review): del on __slots__ attributes makes any later attribute
        # access raise AttributeError (rather than seeing None) -- confirm
        # callers never touch this stream after close().
        del self._stream
        del self._reader
        del self._writer
+'''
+class CompatibleFile(object):
+ """A wrapper that implements python's file like object semantics on top
+ of concurrence BufferedReader and or BufferedWriter. Don't create
+ this object directly, but use the file() method on BufferedReader or BufferedWriter"""
+ def __init__(self, reader = None, writer = None):
+ self._reader = reader
+ self._writer = writer
+
+ def readlines(self):
+ reader = self._reader
+ buffer = reader.buffer
+ while True:
+ try:
+ yield buffer.read_line(True)
+ except BufferUnderflowError:
+ try:
+ reader._read_more()
+ except EOFError:
+ buffer.flip()
+ yield buffer.read_bytes(-1)
+
+ def readline(self):
+ return self.readlines().next()
+
+ def read(self, n = -1):
+ reader = self._reader
+ buffer = reader.buffer
+ s = []
+ if n == -1: #read all available bytes until EOF
+ while True:
+ s.append(buffer.read_bytes(-1))
+ try:
+ reader._read_more()
+ except EOFError:
+ buffer.flip()
+ break
+ else:
+ while n > 0: #read uptill n avaiable bytes or EOF
+ r = buffer.remaining
+ if r > 0:
+ s.append(buffer.read_bytes(min(n, r)))
+ n -= r
+ else:
+ try:
+ reader._read_more()
+ except EOFError:
+ buffer.flip()
+ break
+ return ''.join(s)
+
+ def write(self, s):
+ self._writer.write_bytes(s)
+
+ def flush(self):
+ self._writer.flush()
+
+
+'''
View
378 lib/geventmemcache/client.py
@@ -0,0 +1,378 @@
+# Copyright (C) 2009, Hyves (Startphone Ltd.)
+#
+# This module is part of the Concurrence Framework and is released under
+# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
+from __future__ import with_statement
+
+import logging
+
+from gevent import Timeout
+from gevent import queue
+from gevent import socket
+
+#from concurrence import DeferredQueue
+from geventmemcache.extra import DeferredQueue
+#from geventmysql.buffered import BufferedStream
+from buffered import BufferedStream
+
+from common import Buffer
+
+
+from geventmemcache import MemcacheError, MemcacheResult
+from geventmemcache.codec import MemcacheCodec
+from geventmemcache.behaviour import MemcacheBehaviour
+from geventmemcache.protocol import MemcacheProtocol
+
+TIMEOUT_CURRENT = -2
+
class TimeoutError(Exception):
    """This exception can be raised by various methods that accept *timeout* parameters."""
    # NOTE(review): on Python 3 this name shadows the builtin TimeoutError;
    # harmless on Python 2, which has no such builtin.
    pass
+
+#TODO:
+
+#linger on close
+#batch operations
+#how to communicate and handle errors (raise error for get/gets?) and or extra stuff like flags?
+#timeout on commands (test tasklet based timeout)
+#statistics
+#gzip support
+#close unused connections
+#proper buffer sizes
+#norepy e.g. no server response to set commands, what is the fastest fill rate agains a single memcached server?
+#stats cmd (+item + item size stats
+
+#what to do with partial multi get failure accross multiple servers?, e.g. return partial keys?
+
+#bundling of multiple requests in 1 flush (autoflush on/off)
+#todo detect timeouts on write/read, and mark host as dead
+#keep some time before retrying host
+#close down node no recv ERROR?
+#UPD support
+#binary support
+#how to handle timouts in the pipelined case?
+#TODO validate keys!, they are 'txt' not random bins!, e.g. some chars not allowed, which ones?
+#CLAMP timestamps at 2**31-1
+#CHECK KEY MAX LEN, VAL MAX VALUE LEN, VALID KEY
+
class ResultChannel(queue.Queue):
    """Hands results from deferred IO greenlets back to blocked callers."""
    # Ported to gevent Queue
    pass
+
class MemcacheConnection(object):
    """A single (pipelined) connection to one memcached server.

    Writes and reads are serialized through two DeferredQueues so multiple
    greenlets can issue commands concurrently; each command's response is
    delivered on its own ResultChannel.
    """
    log = logging.getLogger("MemcacheConnection")

    # Per-operation IO timeouts, in seconds.
    _read_timeout = 2
    _write_timeout = 2

    def __init__(self, address, protocol = "text", codec = "default"):

        self._address = address

        self._stream = None
        self._read_queue = DeferredQueue()
        self._write_queue = DeferredQueue()

        self._protocol = MemcacheProtocol.create(protocol)
        self._protocol.set_codec(MemcacheCodec.create(codec))

    def connect(self):
        self.log.log(logging.TRACE, "Connecting to Memcache %s", self._address)

        self._stream = BufferedStream(socket.create_connection(self._address))

    def disconnect(self):
        if self._stream is None:
            return

        try:
            self._stream.close()
        except Exception:
            # best-effort close; the connection is being dropped anyway
            pass

        self._stream = None

    def is_connected(self):
        return self._stream is not None

    def flush(self):
        self._stream.flush()

    def _write_command(self, cmd, args, flush = True):
        # Serialize the command using the protocol's write_<cmd> method.
        with self._stream.get_writer() as writer:
            getattr(self._protocol, 'write_' + cmd)(writer, *args)
            if flush:
                writer.flush()

    def _read_result(self, cmd):
        # Parse one response using the protocol's read_<cmd> method.
        with self._stream.get_reader() as reader:
            return getattr(self._protocol, 'read_' + cmd)(reader)

    def _defer_command(self, cmd, args, result_channel, error_value = None):
        """Queue *cmd* for writing; the result is delivered on *result_channel*.

        Exactly one item is always delivered to the channel (the protocol
        result, or (ERROR, error_value) / (TIMEOUT, error_value) on failure),
        so callers blocked on the channel cannot hang forever.
        """
        def _read_result():

            timeout = Timeout(self._read_timeout, Timeout)
            timeout.start()
            try:
                result = self._read_result(cmd)
                result_channel.put(result)
            except Timeout:
                # Bug fix: this previously re-raised, leaving the caller
                # blocked on result_channel forever; deliver TIMEOUT instead.
                result_channel.put((MemcacheResult.TIMEOUT, error_value))
                self.log.warn("Timeout reading from Memcache %s, disconnecting", self._address)
                self.disconnect()
            except:
                self.log.exception("read error in defer_command")
                result_channel.put((MemcacheResult.ERROR, error_value))

                self.log.warn("Error communicating with Memcache %s, disconnecting", self._address)
                self.disconnect()
            finally:
                timeout.cancel()

        def _write_command():

            timeout = Timeout(self._write_timeout, Timeout)
            timeout.start()
            try:
                if not self.is_connected():
                    self.connect()
                self._write_command(cmd, args, True)
                # Only after a successful write do we expect a response.
                self._read_queue.defer(_read_result)
            except Timeout:
                # Bug fix: deliver TIMEOUT instead of re-raising (which left
                # the caller blocked on the channel forever).
                result_channel.put((MemcacheResult.TIMEOUT, error_value))
                self.log.warn("Timeout writing to Memcache %s, disconnecting", self._address)
                self.disconnect()
            except:
                result_channel.put((MemcacheResult.ERROR, error_value))

                self.log.warn("Error communicating with Memcache %s, disconnecting", self._address)
                self.disconnect()
            finally:
                timeout.cancel()

        self._write_queue.defer(_write_command)

    def _do_command(self, cmd, args, error_value = None):
        """Execute *cmd* synchronously and return the protocol's result tuple."""
        result_channel = ResultChannel()

        self._defer_command(cmd, args, result_channel, error_value)

        try:
            return result_channel.get()
        except TimeoutError:
            # NOTE(review): a gevent Queue.get() without a timeout never
            # raises TimeoutError; timeouts now arrive as TIMEOUT results
            # through the channel. Kept for backward compatibility.
            return MemcacheResult.TIMEOUT, error_value

    def close(self):
        if self.is_connected():
            self._stream.close()
            self._stream = None

    def delete(self, key, expiration = 0):
        return self._do_command("delete", (key, expiration))[0]

    def set(self, key, data, expiration = 0, flags = 0):
        return self._do_command("set", (key, data, expiration, flags))[0]

    def __setitem__(self, key, data):
        self.set(key, data)

    def add(self, key, data, expiration = 0, flags = 0):
        return self._do_command("add", (key, data, expiration, flags))[0]

    def replace(self, key, data, expiration = 0, flags = 0):
        return self._do_command("replace", (key, data, expiration, flags))[0]

    def append(self, key, data, expiration = 0, flags = 0):
        return self._do_command("append", (key, data, expiration, flags))[0]

    def prepend(self, key, data, expiration = 0, flags = 0):
        return self._do_command("prepend", (key, data, expiration, flags))[0]

    def cas(self, key, data, cas_unique, expiration = 0, flags = 0):
        return self._do_command("cas", (key, data, expiration, flags, cas_unique))[0]

    def incr(self, key, increment):
        return self._do_command("incr", (key, increment))

    def decr(self, key, increment):
        return self._do_command("decr", (key, increment))

    def get(self, key, default = None):
        _, values = self._do_command("get", ([key], ), {})
        return values.get(key, default)

    def __getitem__(self, key):
        return self.get(key)

    def getr(self, key, default = None):
        """Like get() but also returns the MemcacheResult code."""
        result, values = self._do_command("get", ([key], ), {})
        return result, values.get(key, default)

    def gets(self, key, default = None):
        """get with CAS token: returns (result, value, cas_unique)."""
        result, values = self._do_command("gets", ([key], ), {})
        value, cas_unique = values.get(key, (default, None))
        return result, value, cas_unique

    def get_multi(self, keys):
        return self._do_command("get", (keys, ))

    def gets_multi(self, keys):
        return self._do_command("gets", (keys, ))

    def version(self):
        return self._do_command("version", ())

    def stats(self):
        return self._do_command("stats", ())
+
class MemcacheConnectionManager(object):
    """Hands out one shared MemcacheConnection per server address."""

    _instance = None #TODO when we support multiple protocols, we need to have 1 instance per protocol

    def __init__(self):
        self._connections = {} #address -> connection

    def get_connection(self, address, protocol):
        """gets a connection to memcached servers at given address using given protocol."""
        connection = self._connections.get(address)
        if connection is None:
            connection = MemcacheConnection(address, protocol)
            self._connections[address] = connection
        return connection

    def close_all(self):
        """Close every managed connection and forget them all."""
        for connection in self._connections.values():
            connection.close()
        self._connections = {}

    @classmethod
    def create(cls, type_):
        """Coerce *type_* (an instance or the name "default") into a manager."""
        if isinstance(type_, MemcacheConnectionManager):
            return type_
        if type_ == "default":
            # Lazily-created process-wide singleton.
            if cls._instance is None:
                cls._instance = MemcacheConnectionManager()
            return cls._instance
        raise MemcacheError("connection manager: %s" % type_)
+
class Memcache(object):
    """Client that shards keys across multiple memcached servers.

    Key -> server mapping is delegated to a MemcacheBehaviour (ketama by
    default); per-server connections are shared through the connection
    manager, so several Memcache instances may reuse the same sockets.
    """
    def __init__(self, servers = None, codec = "default", behaviour = "ketama", protocol = "text", connection_manager = "default"):

        # NOTE(review): these three timeouts are not read anywhere in this
        # class; MemcacheConnection uses its own class-level timeouts --
        # confirm whether these were meant to be forwarded.
        self.read_timeout = 2
        self.write_timeout = 2
        self.connect_timeout = 2

        self._protocol = MemcacheProtocol.create(protocol)
        # NOTE(review): passes the raw codec name, while MemcacheConnection
        # wraps it in MemcacheCodec.create() -- confirm set_codec accepts
        # plain names as well as codec instances.
        self._protocol.set_codec(codec)

        self._connection_manager = MemcacheConnectionManager.create(connection_manager)

        self._behaviour = MemcacheBehaviour.create(behaviour)
        # Cache the bound mapping function to skip an attribute lookup per key.
        self._key_to_addr = self._behaviour.key_to_addr

        self.set_servers(servers)

    def _get_connection(self, addr):
        return self._connection_manager.get_connection(addr, self._protocol)

    def _get(self, cmd, key, default):
        # Single-key fetch; the protocol's get/gets commands take key lists.
        result_channel = ResultChannel()
        connection = self.connection_for_key(key)
        connection._defer_command(cmd, [[key]], result_channel, {})
        result, values = result_channel.get()
        return result, values.get(key, default)

    def _get_multi(self, cmd, keys):
        """Fetch *keys* from all owning servers in parallel and merge the results."""

        #group keys by address (address->[keys]):
        grouped_addrs = {}
        for key in keys:
            addr = self._key_to_addr(key)
            grouped_addrs.setdefault(addr, []).append(key)

        #n is the number of servers we need to 'get' from
        n = len(grouped_addrs)

        result_channel = ResultChannel()

        # All per-server commands share one channel; order of arrival is
        # irrelevant because results are merged below.
        for address, _keys in grouped_addrs.iteritems():
            connection = self._get_connection(address)
            connection._defer_command(cmd, [_keys], result_channel, {})

        #loop over the results as they come in and aggregate the final result
        values = {}
        result = MemcacheResult.OK

        # Mark FIX-GEVENT
        receivedValues = []
        for i in xrange(0, n):
            receivedValues.append(result_channel.get())

        #for _result, _values in result_channel.receive_n(n):
        for _result, _values in receivedValues:
            if MemcacheResult.OK is _result:
                values.update(_values)
            else:
                result = _result #document that we only return the last not OK result
        return result, values

    def set_servers(self, servers = None):
        # Forward the (possibly updated) server list to the behaviour.
        if servers is not None:
            self._behaviour.set_servers(servers)

    def connection_for_key(self, key):
        """Return the shared connection for the server owning *key*."""
        return self._get_connection(self._key_to_addr(key))

    def delete(self, key, expiration = 0):
        return self.connection_for_key(key)._do_command("delete", (key, expiration))[0]

    def set(self, key, data, expiration = 0, flags = 0):
        return self.connection_for_key(key)._do_command("set", (key, data, expiration, flags))[0]

    def __setitem__(self, key, data):
        self.set(key, data)

    def add(self, key, data, expiration = 0, flags = 0):
        return self.connection_for_key(key)._do_command("add", (key, data, expiration, flags))[0]

    def replace(self, key, data, expiration = 0, flags = 0):
        return self.connection_for_key(key)._do_command("replace", (key, data, expiration, flags))[0]

    def append(self, key, data, expiration = 0, flags = 0):
        return self.connection_for_key(key)._do_command("append", (key, data, expiration, flags))[0]

    def prepend(self, key, data, expiration = 0, flags = 0):
        return self.connection_for_key(key)._do_command("prepend", (key, data, expiration, flags))[0]

    def cas(self, key, data, cas_unique, expiration = 0, flags = 0):
        return self.connection_for_key(key)._do_command("cas", (key, data, expiration, flags, cas_unique))[0]

    def incr(self, key, increment):
        return self.connection_for_key(key)._do_command("incr", (key, increment))

    def decr(self, key, increment):
        return self.connection_for_key(key)._do_command("decr", (key, increment))

    def get(self, key, default = None):
        return self._get("get", key, default)[1]

    def __getitem__(self, key):
        return self.get(key)

    def getr(self, key, default = None):
        """Like get() but also returns the MemcacheResult code."""
        return self._get("get", key, default)

    def gets(self, key, default = None):
        """get with CAS token: returns (result, value, cas_unique)."""
        result, (value, cas_unique) = self._get("gets", key, (default, None))
        return result, value, cas_unique

    def get_multi(self, keys):
        return self._get_multi("get", keys)

    def gets_multi(self, keys):
        return self._get_multi("gets", keys)

    def stats(self, addr):
        connection = self._connection_manager.get_connection(addr, self._protocol)
        return connection._do_command("stats", ())
+
+
View
65 lib/geventmemcache/codec.py
@@ -0,0 +1,65 @@
+import cPickle as pickle
+
+from geventmemcache import MemcacheError
+
class MemcacheCodec(object):
    """Base class for value codecs: (de)serialize values using flag bits."""

    def decode(self, flags, encoded_value):
        """Turn the raw bytes from the server back into a Python value."""
        assert False, "implement" #pragma: no cover

    def encode(self, value, flags):
        """Return (encoded_value, flags) for sending *value* to the server."""
        assert False, "implement" #pragma: no cover

    @classmethod
    def create(cls, type_):
        """Coerce *type_* (a codec instance or a name) into a codec object."""
        # fix: the first parameter of this classmethod was misleadingly
        # named `self`; it receives the class.
        if isinstance(type_, MemcacheCodec):
            return type_
        elif type_ == "default":
            return MemcacheDefaultCodec()
        elif type_ == "raw":
            return MemcacheRawCodec()
        else:
            raise MemcacheError("unknown codec: %s" % type_)
+
class MemcacheDefaultCodec(MemcacheCodec):
    """Default codec: byte strings pass through; ints, longs and unicode are
    tagged with a flag bit; anything else is pickled."""

    _FLAG_PICKLE = 1<<0
    _FLAG_INTEGER = 1<<1
    _FLAG_LONG = 1<<2
    _FLAG_UNICODE = 1<<3

    def decode(self, flags, encoded_value):
        """Invert encode() by inspecting the flag bits (same priority order)."""
        if flags & self._FLAG_INTEGER:
            return int(encoded_value)
        if flags & self._FLAG_LONG:
            return long(encoded_value)
        if flags & self._FLAG_UNICODE:
            return encoded_value.decode('utf-8')
        if flags & self._FLAG_PICKLE:
            return pickle.loads(encoded_value)
        return encoded_value

    def encode(self, value, flags):
        """Serialize *value*, OR-ing the matching type flag into *flags*."""
        if isinstance(value, str):
            return value, flags
        if isinstance(value, int):
            return str(value), flags | self._FLAG_INTEGER
        if isinstance(value, long):
            return str(value), flags | self._FLAG_LONG
        if isinstance(value, unicode):
            return value.encode('utf-8'), flags | self._FLAG_UNICODE
        # Fallback for arbitrary objects: pickle protocol 0.
        # When we support the binary protocol we can change this
        return pickle.dumps(value, 0), flags | self._FLAG_PICKLE
+
class MemcacheRawCodec(MemcacheCodec):
    """Pass-through codec: values are sent and returned as raw strings."""

    def decode(self, flags, encoded_value):
        # No transformation; flags are ignored.
        return encoded_value

    def encode(self, value, flags):
        # Coerce to str, leave flags untouched.
        return str(value), flags
+
View
96 lib/geventmemcache/extra.py
@@ -0,0 +1,96 @@
+import gevent
+from gevent import Timeout
+from gevent import Greenlet, GreenletExit, queue
+import logging
+
class TaskletPool(object):
    """Pool of worker greenlets consuming deferred callables from one queue.

    A background greenlet samples the queue length once per second into an
    exponential moving average and spawns another worker whenever the
    average backlog per worker exceeds TRESHOLD. Workers are only ever
    added, never removed.
    """
    log = logging.getLogger('TaskletPool')

    GAMMA = 0.995       # EMA smoothing factor for the queue-length estimate
    TRESHOLD = 2.0      # backlog-per-worker ratio that triggers a new worker
    INIT_WORKERS = 2

    def __init__(self):
        self._queue = queue.Queue()
        self._workers = []
        for i in range(self.INIT_WORKERS):
            self._add_worker()

        #self._adjuster = Tasklet.interval(1.0, self._adjust, daemon = True)()
        self._queue_len = 0.0

        def _adjuster():
            # gevent replacement for the Concurrence interval tasklet above.
            while True:
                gevent.sleep(1)
                self._adjust()

        gevent.spawn(_adjuster)


    def _add_worker(self):
        self._workers.append(gevent.spawn(self._worker))

    def _adjust(self):
        # Update the moving average of queue size and grow the pool when
        # the average backlog per worker crosses the threshold.
        self._queue_len = (self.GAMMA * self._queue_len) + ((1.0 - self.GAMMA) * self._queue.qsize())
        x = self._queue_len / len(self._workers)
        if x > self.TRESHOLD:
            self._add_worker()

    def _worker(self):
        # Worker loop: run queued callables forever; log and back off on error.
        while True:
            try:
                f, args, kwargs = self._queue.get()

                f(*args, **kwargs)

            except GreenletExit:
                raise
            except:
                self.log.exception("in taskpool worker")
                gevent.sleep(1.0)


    def defer(self, f, *args, **kwargs):
        """Schedule f(*args, **kwargs) to run on some pool worker."""
        self._queue.put((f, args, kwargs))
+
class DeferredQueue(object):
    """Serializes deferred callables: queued functions run one at a time, in order."""
    log = logging.getLogger('DeferredQueue')

    __slots__ = ['_queue', '_working']

    def __init__(self):
        self._queue = queue.Queue()
        # True while a _pump greenlet is draining the queue.
        # NOTE(review): the flag is unlocked; safe under gevent's cooperative
        # scheduling -- confirm no native threads share this object.
        self._working = False

    def _pump(self):
        # Drain the queue; individual errors are logged and do not stop the pump.
        try:
            while not self._queue.empty():
                try:
                    f, args, kwargs = self._queue.get()
                    f(*args, **kwargs)
                except GreenletExit:
                    raise
                except:
                    self.log.exception("in deferred queue")
        finally:
            self._working = False

    def defer(self, f, *args, **kwargs):
        """Queue f(*args, **kwargs); start a pump if none is active."""
        self._queue.put((f, args, kwargs))
        if not self._working:
            self._working = True
            GreenletExtra.defer(self._pump)
+
class GreenletExtra(object):
    """Lazy facade over a process-wide TaskletPool."""
    _tasklet_pool = None

    @classmethod
    def _defer(cls, f, *args, **kwargs):
        # Fast path used once the pool exists.
        cls._tasklet_pool.defer(f, *args, **kwargs)

    @classmethod
    def defer(cls, f, *args, **kwargs):
        #first time init the tasklet pool, next time _defer is used directly
        # (rebinding cls.defer means this initializer body runs only once)
        cls.defer = cls._defer
        cls._tasklet_pool = TaskletPool()
        cls._tasklet_pool.defer(f, *args, **kwargs)
View
7,432 lib/geventmemcache/geventmemcache.common.c
7,432 additions, 0 deletions not shown
View
442 lib/geventmemcache/geventmemcache.common.pyx
@@ -0,0 +1,442 @@
+# Copyright (C) 2009, Hyves (Startphone Ltd.)
+#
+# This module is part of the Concurrence Framework and is released under
+# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
+
+"""
+base aynchronous mysql io library
+"""
+
+import datetime
+import types
+import sys
+
+cdef extern from "string.h":
+ cdef void *memmove(void *, void *, int)
+ cdef void *memcpy(void *, void *, int)
+ cdef void *memchr(void *, int, int)
+
+cdef extern from "stdlib.h":
+ cdef void *calloc(int, int)
+ cdef void free(void *)
+
+
+cdef extern from "Python.h":
+ object PyString_FromStringAndSize(char *, int)
+ object PyString_FromString(char *)
+ int PyString_AsStringAndSize(object obj, char **s, Py_ssize_t *len) except -1
+
# NOTE(review): BufferError shadows the Python builtin of the same name;
# kept as-is for API compatibility with existing callers.
class BufferError(Exception):
    """Base class for all Buffer errors."""
    pass

class BufferOverflowError(BufferError):
    """Raised when a write would pass the buffer's current limit."""
    pass

class BufferUnderflowError(BufferError):
    """Raised when a read would pass the buffer's current limit."""
    pass

class BufferInvalidArgumentError(BufferError):
    """Raised when an index, slice, or value argument is out of range."""
    pass
+
+
cdef class Buffer:
    """Creates a :class:`Buffer` object. The buffer class forms the basis for IO in the Concurrence Framework.
    The buffer class represents a mutable array of bytes of that can be read from and written to using the
    read_XXX and write_XXX methods.
    Operations on the buffer are performed relative to the current :attr:`position` attribute of the buffer.
    A buffer also has a current :attr:`limit` property above which no data may be read or written.
    If an operation tries to read beyond the current :attr:`limit` a BufferUnderflowError is raised. If an operation
    tries to write beyond the current :attr:`limit` a BufferOverflowError is raised.
    The general idea of the :class:`Buffer` was shamelessly copied from java NIO.
    """

    cdef unsigned char * _buff   # backing byte array (owned, unless _parent is set)
    cdef int _position           # current read/write offset
    cdef Buffer _parent          # set on shallow copies; keeps the owning buffer alive
    cdef int _capacity           # total size of _buff in bytes
    cdef int _limit              # read/write fence; always <= _capacity

    def __cinit__(self, int capacity, Buffer parent = None):
        if parent is not None:
            #this is a copy constructor for a shallow
            #copy, e.g. we reference the same data as our parent, but have our
            #own position and limit (use .duplicate method to get the copy)
            self._parent = parent #this incs the refcnt on parent
            self._buff = parent._buff
            self._position = parent._position
            self._limit = parent._limit
            self._capacity = parent._capacity
        else:
            #normal constructor; calloc zero-initializes the storage
            self._parent = None
            self._capacity = capacity
            self._buff = <unsigned char *>(calloc(1, self._capacity))

    def __dealloc__(self):
        # only the owning buffer frees the storage; shallow copies just
        # drop their reference to the parent
        if self._parent is None:
            free(self._buff)
        else:
            self._parent = None #releases our refcnt on parent

    def __init__(self, int capacity, Buffer parent = None):
        """Create a new empty buffer with the given *capacity*."""
        self.clear()


    def duplicate(self):
        """Return a shallow copy of the Buffer, e.g. the copied buffer
        references the same bytes as the original buffer, but has its own
        independent position and limit."""
        return Buffer(0, self)

    def copy(self, Buffer src, int src_start, int dst_start, int length):
        """Copies *length* bytes from buffer *src*, starting at position *src_start*, to this
        buffer at position *dst_start*."""
        if length < 0:
            raise BufferInvalidArgumentError("length must be >= 0")
        if src_start < 0:
            raise BufferInvalidArgumentError("src start must be >= 0")
        if src_start > src._capacity:
            raise BufferInvalidArgumentError("src start must <= src capacity")
        if src_start + length > src._capacity:
            raise BufferInvalidArgumentError("src start + length must <= src capacity")
        if dst_start < 0:
            raise BufferInvalidArgumentError("dst start must be >= 0")
        if dst_start > self._capacity:
            raise BufferInvalidArgumentError("dst start must <= dst capacity")
        if dst_start + length > self._capacity:
            raise BufferInvalidArgumentError("dst start + length must <= dst capacity")
        #now we can safely copy!
        # NOTE(review): memcpy assumes the two ranges do not overlap; a
        # self-copy with overlapping ranges would need memmove — confirm
        # callers never do that
        memcpy(self._buff + dst_start, src._buff + src_start, length)

    def clear(self):
        """Prepares the buffer for relative read operations. The buffers :attr:`limit` will set to the buffers :attr:`capacity` and
        its :attr:`position` will be set to 0."""
        self._limit = self._capacity
        self._position = 0

    def flip(self):
        """Prepares the buffer for relative write operations. The buffers :attr:`limit` will set to the buffers :attr:`position` and
        its :attr:`position` will be set to 0."""
        self._limit = self._position
        self._position = 0

    def rewind(self):
        """Sets the buffers :attr:`position` back to 0."""
        self._position = 0

    cdef int _skip(self, int n) except -1:
        # NOTE(review): reads the `limit` property here where the rest of the
        # class uses the C field `self._limit` directly — same value, but the
        # property goes through Python attribute access
        if self._position + n <= self.limit:
            self._position = self._position + n
            return n
        else:
            raise BufferUnderflowError()

    def skip(self, int n):
        """Updates the buffers position by skipping n bytes. It is not allowed to skip passed the current :attr:`limit`.
        In that case a :exc:`BufferUnderflowError` will be raised and the :attr:`position` will remain the same"""
        return self._skip(n)

    cdef int _remaining(self):
        # number of bytes between position and limit
        return self._limit - self._position


    property capacity:
        def __get__(self):
            return self._capacity

    property remaining:
        def __get__(self):
            return self._limit - self._position

    property limit:
        def __get__(self):
            return self._limit

        def __set__(self, limit):
            # enforce the invariant 0 <= position <= limit <= capacity
            if limit >= 0 and limit <= self._capacity and limit >= self._position:
                self._limit = limit
            else:
                if limit < 0:
                    raise BufferInvalidArgumentError("limit must be >= 0")
                elif limit > self._capacity:
                    raise BufferInvalidArgumentError("limit must be <= capacity")
                elif limit < self._position:
                    raise BufferInvalidArgumentError("limit must be >= position")
                else:
                    raise BufferInvalidArgumentError()

    property position:
        def __get__(self):
            return self._position

        def __set__(self, position):
            # enforce the invariant 0 <= position <= limit <= capacity
            if position >= 0 and position <= self._capacity and position <= self._limit:
                self._position = position
            else:
                if position < 0:
                    raise BufferInvalidArgumentError("position must be >= 0")
                elif position > self._capacity:
                    raise BufferInvalidArgumentError("position must be <= capacity")
                elif position > self._limit:
                    raise BufferInvalidArgumentError("position must be <= limit")
                else:
                    raise BufferInvalidArgumentError()

    cdef int _read_byte(self) except -1:
        cdef int b
        if self._position + 1 <= self._limit:
            b = self._buff[self._position]
            self._position = self._position + 1
            return b
        else:
            raise BufferUnderflowError()

    def read_byte(self):
        """Reads and returns a single byte from the buffer and updates the :attr:`position` by 1."""
        return self._read_byte()

    def recv(self, int fd):
        """Reads as many bytes as will fit up till the :attr:`limit` of the buffer from the filedescriptor *fd*.
        Returns a tuple (bytes_read, bytes_remaining). If *bytes_read* is negative, a IO Error was encountered.
        The :attr:`position` of the buffer will be updated according to the number of bytes read.
        """
        cdef int b
        b = 0
        #TODO
        # NOTE(review): the actual read(2) call is commented out — as written
        # this method is a stub that always reports 0 bytes read
        #b = read(fd, self._buff + self._position, self._limit - self._position)
        if b > 0: self._position = self._position + b
        return b, self._limit - self._position

    def send(self, int fd):
        """Sends as many bytes as possible up till the :attr:`limit` of the buffer to the filedescriptor *fd*.
        Returns a tuple (bytes_written, bytes_remaining). If *bytes_written* is negative, an IO Error was encountered.
        """
        cdef int b
        b = 0
        #TODO
        # NOTE(review): the actual write(2) call is commented out — as written
        # this method is a stub that always reports 0 bytes written
        #b = write(fd, self._buff + self._position, self._limit - self._position)

        if b > 0: self._position = self._position + b
        return b, self._limit - self._position

    def compact(self):
        """Prepares the buffer again for relative reading, but any left over data still present in the buffer (the bytes between
        the current :attr:`position` and current :attr:`limit`) will be copied to the start of the buffer. The position of the buffer
        will be right after the copied data.
        """
        cdef int n
        n = self._limit - self._position
        if n > 0 and self._position > 0:
            # memcpy is safe only when source and destination do not overlap
            # (n < position); otherwise fall back to memmove
            if n < self._position:
                memcpy(self._buff + 0, self._buff + self._position, n)
            else:
                memmove(self._buff + 0, self._buff + self._position, n)
        self._position = n
        self._limit = self._capacity

    def __getitem__(self, object i):
        # absolute access by int index or slice; does not touch position/limit
        cdef int start, end, stride
        if type(i) == types.IntType:
            if i >= 0 and i < self._capacity:
                return self._buff[i]
            else:
                raise BufferInvalidArgumentError("index must be >= 0 and < capacity")
        elif type(i) == types.SliceType:
            # NOTE(review): the slice stride is ignored; only contiguous
            # slices are supported
            start, end, stride = i.indices(self._capacity)
            return PyString_FromStringAndSize(<char *>(self._buff + start), end - start)
        else:
            raise BufferInvalidArgumentError("wrong index type")

    def __setitem__(self, object i, object value):
        # absolute write by int index (single byte) or slice (string of
        # exactly matching length); does not touch position/limit
        cdef int start, end, stride
        cdef char *b
        cdef Py_ssize_t n
        if type(i) == types.IntType:
            if type(value) != types.IntType:
                raise BufferInvalidArgumentError("value must be integer")
            if value < 0 or value > 255:
                raise BufferInvalidArgumentError("value must in range [0..255]")
            if i >= 0 and i < self._capacity:
                self._buff[i] = value
            else:
                raise BufferInvalidArgumentError("index must be >= 0 and < capacity")
        elif type(i) == types.SliceType:
            start, end, stride = i.indices(self._capacity)
            PyString_AsStringAndSize(value, &b, &n)
            if n != (end - start):
                raise BufferInvalidArgumentError("incompatible slice")
            memcpy(self._buff + start, b, n)
        else:
            raise BufferInvalidArgumentError("wrong index type")

    def read_short(self):
        """Read a 2 byte little endian integer from buffer and updates position."""
        cdef int s
        if 2 > (self._limit - self._position):
            raise BufferUnderflowError()
        else:
            s = self._buff[self._position] + (self._buff[self._position + 1] << 8)
            self._position = self._position + 2
            return s

    cdef object _read_bytes(self, int n):
        """reads n bytes from buffer, updates position, and returns bytes as a python string"""
        if n > (self._limit - self._position):
            raise BufferUnderflowError()
        else:
            s = PyString_FromStringAndSize(<char *>(self._buff + self._position), n)
            self._position = self._position + n
            return s

    def read_bytes(self, int n = -1):
        """Reads n bytes from buffer, updates position, and returns bytes as a python string,
        if there are no n bytes available, a :exc:`BufferUnderflowError` is raised.
        With n == -1 (the default) all remaining bytes up to the limit are read."""
        if n == -1:
            return self._read_bytes(self._limit - self._position)
        else:
            return self._read_bytes(n)

    def read_bytes_until(self, int b):
        """Reads bytes until character b is found, or end of buffer is reached in which case it will raise a :exc:`BufferUnderflowError`.
        The terminator byte itself is consumed but not returned."""
        cdef int n, maxlen
        cdef char *zpos, *start
        if b < 0 or b > 255:
            raise BufferInvalidArgumentError("b must in range [0..255]")
        maxlen = self._limit - self._position
        start = <char *>(self._buff + self._position)
        zpos = <char *>(memchr(start, b, maxlen))
        if zpos == NULL:
            raise BufferUnderflowError()
        else:
            n = zpos - start
            s = PyString_FromStringAndSize(start, n)
            self._position = self._position + n + 1 #also skip the terminator byte
            return s

    def read_line(self, int include_separator = 0):
        """Reads a single line of bytes from the buffer where the end of the line is indicated by either 'LF' or 'CRLF'.
        The line will be returned as a string not including the line-separator. Optionally *include_separator* can be specified
        to make the method to also return the line-separator."""
        cdef int n, maxlen
        cdef char *zpos, *start
        maxlen = self._limit - self._position
        start = <char *>(self._buff + self._position)
        zpos = <char *>(memchr(start, 10, maxlen)) #find the LF (10 == '\n')
        if maxlen == 0:
            raise BufferUnderflowError()
        if zpos == NULL:
            raise BufferUnderflowError()
        n = zpos - start
        if self._buff[self._position + n - 1] == 13: #\r\n
            if include_separator:
                s = PyString_FromStringAndSize(start, n + 1)
                self._position = self._position + n + 1
            else:
                #exclude the trailing \r as well as the \n
                s = PyString_FromStringAndSize(start, n - 1)
                self._position = self._position + n + 1
        else: #\n
            if include_separator:
                s = PyString_FromStringAndSize(start, n + 1)
                self._position = self._position + n + 1
            else:
                s = PyString_FromStringAndSize(start, n)
                self._position = self._position + n + 1
        return s

    def write_bytes(self, s):
        """Writes a number of bytes given by the python string s to the buffer and updates position. Raises
        :exc:`BufferOverflowError` if you try to write beyond the current :attr:`limit`. Returns the number
        of bytes written."""
        cdef char *b
        cdef Py_ssize_t n
        PyString_AsStringAndSize(s, &b, &n)
        if n > (self._limit - self._position):
            raise BufferOverflowError()
        else:
            memcpy(self._buff + self._position, b, n)
            self._position = self._position + n
            return n

    def write_buffer(self, Buffer other):
        """writes available bytes from other buffer to this buffer"""
        self.write_bytes(other.read_bytes(-1)) #TODO use copy

    cdef int _write_byte(self, unsigned int b) except -1:
        """writes a single byte to the buffer and updates position"""
        if self._position + 1 <= self._limit:
            self._buff[self._position] = b
            self._position = self._position + 1
            return 1
        else:
            raise BufferOverflowError()

    def write_byte(self, unsigned int b):
        """writes a single byte to the buffer and updates position"""
        return self._write_byte(b)

    def write_int(self, unsigned int i):
        """writes a 32 bit integer to the buffer and updates position (little-endian)"""
        if self._position + 4 <= self._limit:
            self._buff[self._position + 0] = (i >> 0) & 0xFF
            self._buff[self._position + 1] = (i >> 8) & 0xFF
            self._buff[self._position + 2] = (i >> 16) & 0xFF
            self._buff[self._position + 3] = (i >> 24) & 0xFF
            self._position = self._position + 4
            return 4
        else:
            raise BufferOverflowError()

    def write_short(self, unsigned int i):
        """writes a 16 bit integer to the buffer and updates position (little-endian)"""
        if self._position + 2 <= self._limit:
            self._buff[self._position + 0] = (i >> 0) & 0xFF
            self._buff[self._position + 1] = (i >> 8) & 0xFF
            self._position = self._position + 2
            return 2
        else:
            raise BufferOverflowError()

    def hex_dump(self, out = None):
        """Writes a colorized hex dump of the whole buffer to *out* (defaults
        to stdout); bytes before position and before limit are highlighted
        in different colors (ANSI escapes)."""
        highlight1 = "\033[34m"
        highlight2 = "\033[32m"
        default = "\033[0m"

        if out is None: out = sys.stdout

        import string

        out.write('<concurrence.io.Buffer id=%x, position=%d, limit=%d, capacity=%d>\n' % (id(self), self.position, self.limit, self._capacity))
        printable = set(string.printable)
        whitespace = set(string.whitespace)
        x = 0
        s1 = []
        s2 = []
        # NOTE(review): rows are flushed every 16 bytes; a final partial row
        # (capacity not a multiple of 16) is never written
        while x < self._capacity:
            v = self[x]
            if x < self.position:
                s1.append('%s%02x%s' % (highlight1, v, default))
            elif x < self.limit:
                s1.append('%s%02x%s' % (highlight2, v, default))
            else:
                s1.append('%02x' % v)
            c = chr(v)
            if c in printable and not c in whitespace:
                s2.append(c)
            else:
                s2.append('.')
            x += 1
            if x % 16 == 0:
                out.write('%04x' % (x - 16) + ' ' + ' '.join(s1[:8]) + '  ' + ' '.join(s1[8:]) + ' ' + ''.join(s2[:8]) + ' ' + (''.join(s2[8:]) + '\n'))
                s1 = []
                s2 = []
        out.flush()

    def __repr__(self):
        import cStringIO
        sio = cStringIO.StringIO()
        self.hex_dump(sio)
        return sio.getvalue()

    def __str__(self):
        return repr(self)
+
View
88 lib/geventmemcache/ketama.py
@@ -0,0 +1,88 @@
+# Copyright (C) 2009, Hyves (Startphone Ltd.)
+#
+# This module is part of the Concurrence Framework and is released under
+# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
+
+#this is a pretty straightforward pure python implementation of
+#libketama (a consistent hashing implementation used by many memcached clients)
+# http://www.last.fm/user/RJ/journal/2007/04/10/rz_libketama_-_a_consistent_hashing_algo_for_memcache_clients
+# svn://svn.audioscrobbler.net/misc/ketama/
#a good explanation of consistent hashing can be found here:
+# http://www.spiteful.com/2008/03/17/programmers-toolbox-part-3-consistent-hashing/
+#
+#the test server_list below was taken from the libketama distribution and 1 million keys were mapped with
+#both this implementation and the libketama to test this implementations compatibility
+
+import math
+import hashlib
+import bisect
+import unittest
+
def key_to_digest(key):
    """Return the hex MD5 digest of *key* (the ketama hash input)."""
    digest = hashlib.md5(key)
    return digest.hexdigest()
+
def point_from_hex(s):
    """Interpret the first 8 hex chars of *s* as a little-endian 32-bit value.

    The byte pairs are reversed ("aabbccdd" -> 0xddccbbaa) to match
    libketama's point computation.

    Fix: use int() instead of the Python 2-only long(). On Python 2 int()
    auto-promotes to long when needed, so behavior is unchanged there, and
    the function now also works on Python 3.
    """
    return int(s[6:8] + s[4:6] + s[2:4] + s[0:2], 16)
+
def hashi(key):
    """Map *key* to its 32-bit point on the ketama continuum."""
    digest = key_to_digest(key)
    return point_from_hex(digest[:8])
+
def get_server(key, continuum):
    """maps given key to a server in the continuum"""
    point = hashi(str(key))
    idx = bisect.bisect_right(continuum, (point, ()))
    if idx >= len(continuum):
        idx = 0  #wrap around: keys past the last point map to the first server
    return continuum[idx][1]
+
def build_continuum(servers):
    """builds up the 'continuum' from the given list of servers.
    each item in the list is a tuple ((ip_addr, port), weight).
    where ip_addr is an ip-address as a string, e.g. '127.0.0.1'.
    port is the ip-port as an integer and weight is the relative weight
    of this server as an integer.

    Returns a sorted list of (point, (ip_addr, port)) tuples.
    """
    continuum = {}
    total_weight = sum(weight for _, weight in servers) #a.k.a. total memory
    server_count = len(servers)
    for address, weight in servers:
        fraction = float(weight) / total_weight #this server's share
        hash_count = int(math.floor(fraction * 40.0 * server_count))
        for k in range(hash_count):
            # max 40 hashes, 4 numbers per hash = max 160 points per server */
            digest = key_to_digest("%s:%s-%d" % (address[0], address[1], k))
            for h in range(4):
                point = point_from_hex(digest[h * 8: h * 8 + 8])
                assert point not in continuum, "point collission while building continuum"
                continuum[point] = address
    return sorted(continuum.items())
+
+
class TestKetama(unittest.TestCase):
    """Compatibility test: expected values were produced with libketama on
    the same server list, so this checks point and server mapping agree."""

    test_servers = [(('10.0.1.1', 11211), 600),
                    (('10.0.1.2', 11211), 300),
                    (('10.0.1.3', 11211), 200),
                    (('10.0.1.4', 11211), 350),
                    (('10.0.1.5', 11211), 1000),
                    (('10.0.1.6', 11211), 800),
                    (('10.0.1.7', 11211), 950),
                    (('10.0.1.8', 11211), 100)]

    def testKetama(self):
        continuum = build_continuum(self.test_servers)

        # assertEquals is a deprecated alias; use assertEqual
        self.assertEqual((3769287096, ('10.0.1.7', 11211)), (hashi('12936'), get_server('12936', continuum)))
        self.assertEqual((435768809, ('10.0.1.5', 11211)), (hashi('27804'), get_server('27804', continuum)))
        self.assertEqual((1996655674, ('10.0.1.2', 11211)), (hashi('37045'), get_server('37045', continuum)))
        self.assertEqual((2954822664, ('10.0.1.1', 11211)), (hashi('50829'), get_server('50829', continuum)))
        self.assertEqual((1423001712, ('10.0.1.6', 11211)), (hashi('65422'), get_server('65422', continuum)))
        self.assertEqual((3809055594, ('10.0.1.6', 11211)), (hashi('74912'), get_server('74912', continuum)))
+
if __name__ == '__main__':
    # run the ketama/libketama compatibility tests when executed directly
    unittest.main()
+
View
162 lib/geventmemcache/protocol.py
@@ -0,0 +1,162 @@
+from geventmemcache import MemcacheError, MemcacheResult
+from geventmemcache.codec import MemcacheCodec
+
class MemcacheProtocol(object):
    """Base/factory for memcache wire protocols."""

    @classmethod
    def create(cls, type_):
        """Return a protocol instance for *type_*.

        *type_* may already be a MemcacheProtocol instance (returned
        unchanged) or the string 'text'; anything else raises MemcacheError.
        """
        if isinstance(type_, MemcacheProtocol):
            return type_
        if type_ == 'text':
            return MemcacheTextProtocol()
        raise MemcacheError("unknown protocol: %s" % type_)
+
class MemcacheTextProtocol(MemcacheProtocol):
    """memcached text (ASCII) protocol implementation.

    ``write_*`` methods serialize one command to a writer object (anything
    with a ``write_bytes`` method); the matching ``read_*`` methods parse
    the server response from a reader object (``read_line``/``read_bytes``)
    and return a ``(MemcacheResult, value)`` tuple.
    """

    def __init__(self, codec = "default"):
        self.set_codec(codec)

    def set_codec(self, codec):
        """Set the codec used to encode/decode stored values."""
        self._codec = MemcacheCodec.create(codec)

    def write_stats(self, writer):
        """Request server statistics."""
        writer.write_bytes("stats\r\n")

    def read_stats(self, reader):
        """Parse the 'stats' response into a {stat_name: stat_value} dict.

        Fix: the server replies directly with 'STAT <name> <value>' lines
        terminated by 'END'; the previous version read (and discarded) one
        line before entering the loop, which silently dropped the first
        statistic. The unreachable trailing return was removed as well.
        """
        result = {}
        while True:
            response_line = reader.read_line()
            if response_line.startswith('STAT'):
                response_fields = response_line.split(' ')
                key = response_fields[1]
                value = response_fields[2]
                result[key] = value
            elif response_line == 'END':
                return MemcacheResult.OK, result
            else:
                return MemcacheResult.get(response_line), {}

    def _read_result(self, reader, value = None):
        # generic single-line response: map the line to a MemcacheResult
        # constant and pass *value* through
        response_line = reader.read_line()
        return MemcacheResult.get(response_line), value

    def write_version(self, writer):
        """Request the server version."""
        writer.write_bytes("version\r\n")

    def read_version(self, reader):
        """Parse the 'version' response; returns the version string."""
        response_line = reader.read_line()
        if response_line.startswith('VERSION'):
            return MemcacheResult.OK, response_line[8:].strip()
        else:
            return MemcacheResult.get(response_line), None

    def _write_storage(self, writer, cmd, key, value, expiration, flags, cas_unique = None):
        # shared serializer for all storage commands
        # (set/add/replace/append/prepend/cas)
        encoded_value, flags = self._codec.encode(value, flags)
        if cas_unique is not None:
            writer.write_bytes("%s %s %d %d %d %d\r\n%s\r\n" % (cmd, key, flags, expiration, len(encoded_value), cas_unique, encoded_value))
        else:
            writer.write_bytes("%s %s %d %d %d\r\n%s\r\n" % (cmd, key, flags, expiration, len(encoded_value), encoded_value))

    def write_cas(self, writer, key, value, expiration, flags, cas_unique):
        self._write_storage(writer, "cas", key, value, expiration, flags, cas_unique)

    def read_cas(self, reader):
        return self._read_result(reader)

    def _write_incdec(self, writer, cmd, key, value):
        # shared serializer for incr/decr
        writer.write_bytes("%s %s %s\r\n" % (cmd, key, value))

    def _read_incdec(self, reader):
        # incr/decr reply with either the new numeric value or an error line
        response_line = reader.read_line()
        try:
            return MemcacheResult.OK, int(response_line)
        except ValueError:
            return MemcacheResult.get(response_line), None

    def write_incr(self, writer, key, value):
        self._write_incdec(writer, "incr", key, value)

    def read_incr(self, reader):
        return self._read_incdec(reader)

    def write_decr(self, writer, key, value):
        self._write_incdec(writer, "decr", key, value)

    def read_decr(self, reader):
        return self._read_incdec(reader)

    def write_get(self, writer, keys):
        writer.write_bytes("get %s\r\n" % " ".join(keys))

    def write_gets(self, writer, keys):
        writer.write_bytes("gets %s\r\n" % " ".join(keys))

    def read_get(self, reader, with_cas_unique = False):
        """Parse a get/gets response into a {key: value} dict.

        With *with_cas_unique* each value is a (decoded_value, cas_unique)
        tuple (the 'gets' form).
        """
        result = {}
        while True:
            response_line = reader.read_line()
            if response_line.startswith('VALUE'):
                response_fields = response_line.split(' ')
                key = response_fields[1]
                flags = int(response_fields[2])
                n = int(response_fields[3])
                if with_cas_unique:
                    cas_unique = int(response_fields[4])
                encoded_value = reader.read_bytes(n)
                reader.read_line() #consume trailing \r\n after the data block
                if with_cas_unique:
                    result[key] = (self._codec.decode(flags, encoded_value), cas_unique)
                else:
                    result[key] = self._codec.decode(flags, encoded_value)
            elif response_line == 'END':
                return MemcacheResult.OK, result
            else:
                return MemcacheResult.get(response_line), {}

    def read_gets(self, reader):
        return self.read_get(reader, with_cas_unique = True)

    def write_delete(self, writer, key, expiration):
        writer.write_bytes("delete %s %d\r\n" % (key, expiration))

    def read_delete(self, reader):
        return self._read_result(reader)

    def write_set(self, writer, key, value, expiration, flags):
        return self._write_storage(writer, "set", key, value, expiration, flags)

    def read_set(self, reader):
        return self._read_result(reader)

    def write_add(self, writer, key, value, expiration, flags):
        return self._write_storage(writer, "add", key, value, expiration, flags)

    def read_add(self, reader):
        return self._read_result(reader)

    def write_replace(self, writer, key, value, expiration, flags):
        return self._write_storage(writer, "replace", key, value, expiration, flags)

    def read_replace(self, reader):
        return self._read_result(reader)

    def write_append(self, writer, key, value, expiration, flags):
        return self._write_storage(writer, "append", key, value, expiration, flags)

    def read_append(self, reader):
        return self._read_result(reader)

    def write_prepend(self, writer, key, value, expiration, flags):
        return self._write_storage(writer, "prepend", key, value, expiration, flags)

    def read_prepend(self, reader):
        return self._read_result(reader)
View
16 setup.py
@@ -0,0 +1,16 @@
+from distutils.core import setup
+from distutils.extension import Extension
+from Cython.Distutils import build_ext
+
# package version; bump on release
VERSION = '0.0.1'

setup(
    name = "gevent-memcache",
    version = VERSION,
    license = "New BSD",
    description = "A gevent (http://www.gevent.org) adaption of the asynchronous Memcache from the Concurrence framework (http://opensource.hyves.org/concurrence)",
    # build_ext from Cython compiles the .pyx buffer module at build time
    cmdclass = {"build_ext": build_ext},
    # python packages live under lib/
    package_dir = {'':'lib'},
    packages = ['geventmemcache'],
    ext_modules = [Extension("geventmemcache.common", ["lib/geventmemcache/geventmemcache.common.pyx"])]
)
Please sign in to comment.
Something went wrong with that request. Please try again.