Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Bringing in the more efficient memcached-based deduping backend.

  • Loading branch information...
commit cf6f354bff0fd25bce28fcabcb6bd69343f6fe4d 1 parent d4ddd1a
@gtaylor authored
View
45 bin/emdr-relay
@@ -23,18 +23,32 @@ parser.add_argument(
help="Override default relay sender bindings. This determines how "
"other relays or developer applications can connect to this relay.")
parser.add_argument(
- '--dedupe-buffer', action='store', dest='dedupe_buffer',
- help="Overrides default message de-duplication buffer size "
- "(default: %s messages)" % settings.RELAY_DEDUPE_BUFFER)
-parser.add_argument(
- '--enable-decompress', action='store_true', dest='enable_decompress',
- help="Decompress all outbound messages (default: %s)" % (
- 'Disabled' if settings.RELAY_DECOMPRESS_MESSAGES else 'Enabled',
- ))
-parser.add_argument(
'--loglevel', action='store', dest='loglevel', default='INFO',
help="Overrides default logger level (DEBUG, INFO, WARNING, ERROR) "
"(default: %s)" % settings.LOGGING['loggers']['']['level'])
+parser.add_argument(
+ '--enable-decompress', action='store_true', dest='enable_decompress',
+ help="Decompress all outbound messages (default: %s)" % (
+ 'Disabled' if settings.RELAY_DECOMPRESS_MESSAGES else 'Enabled',
+ ))
+
+dedupe_group = parser.add_argument_group(
+ 'Message Deduplication',
+ "Settings for message de-duplication."
+)
+dedupe_group.add_argument(
+ '--dedupe-backend', action='store', dest='dedupe_backend',
+ help="The dedupe backend to use. One of: deque, memcached "
+ "(default: %s)" % settings.RELAY_DEDUPE_BACKEND)
+dedupe_group.add_argument(
+ '--dedupe-conn', action='store', dest='dedupe_conn',
+ help="Backend-specific connection string, if needed "
+ "(default: %s)" % settings.RELAY_DEDUPE_BACKEND_CONN[0])
+dedupe_group.add_argument(
+ '--dedupe-store-time', action='store', dest='dedupe_store_time',
+ help="For backends that store message hashes, this determines "
+ "how long to do so (in seconds) "
+ "(default: %s)" % settings.RELAY_DEDUPE_STORE_TIME)
parsed = parser.parse_args()
@@ -44,8 +58,10 @@ if parsed.listeners:
settings.RELAY_RECEIVER_BINDINGS = parsed.listeners
if parsed.senders:
settings.RELAY_SENDER_BINDINGS = parsed.senders
-if parsed.dedupe_buffer:
- settings.RELAY_DEDUPE_BUFFER = int(parsed.dedupe_buffer)
+if parsed.dedupe_backend:
+ settings.RELAY_DEDUPE_BACKEND = parsed.dedupe_backend
+if parsed.dedupe_store_time:
+ settings.RELAY_DEDUPE_STORE_TIME = int(parsed.dedupe_store_time)
if parsed.enable_decompress:
settings.RELAY_DECOMPRESS_MESSAGES = True
@@ -60,10 +76,9 @@ print("* Accepting SUB connections on:")
for binding in settings.RELAY_SENDER_BINDINGS:
print(" - %s" % binding)
-if settings.RELAY_DEDUPE_BUFFER:
- print("* De-dupe buffer max size: %s" % settings.RELAY_DEDUPE_BUFFER)
-else:
- print("* De-duping disabled.")
+print("* De-dupe backend: %s" % settings.RELAY_DEDUPE_BACKEND)
+print("* De-dupe backend connection: %s" % settings.RELAY_DEDUPE_BACKEND_CONN)
+print("* De-dupe backend hash store time: %s" % settings.RELAY_DEDUPE_STORE_TIME)
if settings.RELAY_DECOMPRESS_MESSAGES:
print("* Outbound message de-compression enabled.")
View
11 emdr/conf/default_settings.py
@@ -39,9 +39,14 @@
RELAY_SENDER_BINDINGS = ["ipc:///tmp/relay-sender.sock"]
# If True, outbound messages to subscribers are decompressed.
RELAY_DECOMPRESS_MESSAGES = False
-# How big of a simple de-duping buffer to keep. If 0, de-duping is disabled.
-# The higher this is set, the more CPU and RAM that gets used in de-duping.
-RELAY_DEDUPE_BUFFER = 1000
+# Default to memcached, as it's fast.
+RELAY_DEDUPE_BACKEND = "memcached"
+# For dedupe backends that require a connection string of some sort, store it
+# here. We'll default to localhost for now. Use a list of strings.
+RELAY_DEDUPE_BACKEND_CONN = ["127.0.0.1"]
+# For timeout based backends, this determines how long (in seconds) we store
+# the message hashes.
+RELAY_DEDUPE_STORE_TIME = 300
#
## Logging Settings
View
17 emdr/daemons/relay/dedupers/__init__.py
@@ -0,0 +1,17 @@
"""
This module contains a few simple message de-duplication backends. These are
used to de-dupe messages, since relays will be connected to at least two
upstream announcers/relays.

The backend is chosen once at import time from settings, and the selected
``is_message_duped`` callable is re-exported from this package.
"""
from emdr.conf import default_settings as settings

if settings.RELAY_DEDUPE_BACKEND == 'memcached':
    # Memcached backend. This is currently the fastest.
    #noinspection PyUnresolvedReferences
    from emdr.daemons.relay.dedupers.memcached import is_message_duped
elif settings.RELAY_DEDUPE_BACKEND == 'deque':
    # Simple in-process deque backend, mostly useful for development when
    # memcached isn't available.
    #noinspection PyUnresolvedReferences
    from emdr.daemons.relay.dedupers.py_deque import is_message_duped
else:
    # Fail fast, and include the offending value so a typo'd setting is
    # obvious from the traceback. ValueError is an Exception subclass, so
    # any existing "except Exception" handlers still catch this.
    raise ValueError(
        "Unknown deduplication backend: %r" % settings.RELAY_DEDUPE_BACKEND)
View
35 emdr/daemons/relay/dedupers/memcached.py
@@ -0,0 +1,35 @@
"""
A memcached-backed deduper. This is much more efficient than the deque backend,
and should be used in production.
"""
import pylibmc
from emdr.conf import default_settings as settings
from emdr.daemons.relay.dedupers.util import calc_hash_for_message

# The module-level connection to memcached, created once at import time.
# RELAY_DEDUPE_BACKEND_CONN is a list of server address strings.
MC_CLIENT = pylibmc.Client(
    settings.RELAY_DEDUPE_BACKEND_CONN,
    binary=True,
)
+
def is_message_duped(message):
    """
    Given a raw EMDR message string, determine whether we have already recently
    seen the same exact message.

    :rtype: bool
    :returns: ``True`` if this message is a duplicate, ``False`` if not.
    """
    # memcached keys must be strings, so stringify the integer hash.
    msg_key = str(calc_hash_for_message(message))
    # A cache hit means an identical message arrived within the store window.
    is_dupe = MC_CLIENT.get(msg_key) is not None
    # Unconditionally (re-)store the hash, even for dupes, so repeated
    # messages keep refreshing their expiration window.
    MC_CLIENT.set(msg_key, 1, time=settings.RELAY_DEDUPE_STORE_TIME)

    return is_dupe
View
33 emdr/daemons/relay/dedupers/py_deque.py
@@ -0,0 +1,33 @@
"""
A simple, inefficient de-duper using Python's included deque data structure.
Seek time is pretty high, so this is probably only best for developers.
"""
from collections import deque
from emdr.daemons.relay.dedupers.util import calc_hash_for_message

# Fixed-size FIFO of recently seen message hashes. Once full, appending a new
# hash silently evicts the oldest. The size is hardcoded because this backend
# is mostly meant for testing when memcached isn't available.
# Docs: http://docs.python.org/library/collections.html#collections.deque
HASH_DEQUE = deque(maxlen=500)
+
def is_message_duped(message):
    """
    Given a raw EMDR message string, determine whether we have already recently
    seen the same exact message.

    :rtype: bool
    :returns: ``True`` if this message is a duplicate, ``False`` if not.
    """
    # No ``global`` statement is needed here: we only mutate HASH_DEQUE
    # in place, we never rebind the module-level name.

    # Generate a hash for the incoming message.
    message_hash = calc_hash_for_message(message)
    # If the hash is still in the buffer, we saw this exact message recently.
    was_already_seen = message_hash in HASH_DEQUE
    # We always push the message on to the queue, even if it ends up being
    # a dupe, since it "refreshes" the hash: the freshly appended copy is
    # the one that survives longest in the bounded deque.
    HASH_DEQUE.append(message_hash)

    return was_already_seen
View
14 emdr/daemons/relay/dedupers/util.py
@@ -0,0 +1,14 @@
"""
Various utility functions that are common to all or many dedupers.
"""

def calc_hash_for_message(message):
    """
    Given an EMDR message string, calculate the hash.

    :param basestring message: A compressed or uncompressed EMDR message string.
    :rtype: int
    :returns: The hash to use for deduping.

    .. note:: Python's builtin ``hash()`` is fast, but it is only stable
        within a single interpreter process. If multiple relay processes
        ever share one dedupe store (e.g. a common memcached), this should
        be replaced with a deterministic digest (md5/crc32).
    """
    # Use Python's naive integer hashing for now. It's fast and simple.
    return hash(message)
View
16 emdr/daemons/relay/main.py
@@ -4,6 +4,7 @@
"""
# Logging has to be configured first before we do anything.
import logging
+
logger = logging.getLogger(__name__)
import zlib
from collections import deque
@@ -11,6 +12,7 @@
import gevent
import zmq.green as zmq
from emdr.conf import default_settings as settings
+from emdr.daemons.relay.dedupers import is_message_duped
def run():
"""
@@ -30,9 +32,6 @@ def run():
# End users, or other relays, may attach here.
sender.bind(binding)
- # Use Python's builtin deque to store a list of hashes for incoming messages.
- hash_queue = deque(maxlen=settings.RELAY_DEDUPE_BUFFER)
-
def relay_worker(message):
"""
This is the worker function that re-sends the incoming messages out
@@ -40,16 +39,7 @@ def relay_worker(message):
:param str message: A JSON string to re-broadcast.
"""
- # Generate a hash for the incoming message.
- message_hash = hash(message)
- # Look at our queue of hashes to figure out if we've seen this
- # message yet.
- was_already_seen = message_hash in hash_queue
- # We always push the message on to the queue, even if it ends up being
- # a dupe, since it "refreshes" the hash.
- hash_queue.append(message_hash)
-
- if settings.RELAY_DEDUPE_BUFFER and was_already_seen:
+ if is_message_duped(message):
# We've already seen this message recently. Discard it.
return
View
9 requirements.txt
@@ -1,9 +1,12 @@
bottle
ujson
gevent
-nose
requests
python-dateutil<2.0
-sphinx
git+http://github.com/zeromq/pyzmq.git
-pytz
+pytz
+# Only required for developers
+nose
+sphinx
+# Only required for relays
+pylibmc
Please sign in to comment.
Something went wrong with that request. Please try again.