diff --git a/.gitignore b/.gitignore index 8151295..dfb794e 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,4 @@ simpleleveldb/leveldb_to_csv simpleleveldb/csv_to_leveldb test_output .sw[op] +*.egg-info/ diff --git a/pubsub/Makefile b/pubsub/Makefile index 128cb52..2bc29d1 100644 --- a/pubsub/Makefile +++ b/pubsub/Makefile @@ -5,7 +5,7 @@ LIBSIMPLEHTTP_INC ?= $(LIBSIMPLEHTTP)/.. LIBSIMPLEHTTP_LIB ?= $(LIBSIMPLEHTTP) CFLAGS = -I. -I$(LIBSIMPLEHTTP_INC) -I$(LIBEVENT)/include -O2 -g -LIBS = -L. -L$(LIBSIMPLEHTTP_LIB) -L$(LIBEVENT)/lib -levent -lsimplehttp -lpcre -lm -lcrypto +LIBS = -L. -L$(LIBSIMPLEHTTP_LIB) -L$(LIBEVENT)/lib -levent -lsimplehttp -lm -lcrypto pubsub: pubsub.c $(CC) $(CFLAGS) -o $@ $< $(LIBS) diff --git a/pysimplehttp/.gitignore b/pysimplehttp/.gitignore new file mode 100644 index 0000000..25aacff --- /dev/null +++ b/pysimplehttp/.gitignore @@ -0,0 +1,3 @@ +build/ +dist/ +*.egg-info/ diff --git a/pysimplehttp/scripts/file_to_sq.py b/pysimplehttp/scripts/file_to_sq.py index 9df5d82..15a8ac2 100644 --- a/pysimplehttp/scripts/file_to_sq.py +++ b/pysimplehttp/scripts/file_to_sq.py @@ -34,4 +34,4 @@ ) file_to_sq.start() - \ No newline at end of file + diff --git a/pysimplehttp/src/BackoffTimer.py b/pysimplehttp/src/BackoffTimer.py new file mode 100644 index 0000000..c3bb0cd --- /dev/null +++ b/pysimplehttp/src/BackoffTimer.py @@ -0,0 +1,72 @@ +from decimal import Decimal + + +def _Decimal(v): + if not isinstance(v, Decimal): + return Decimal(str(v)) + return v + + +class BackoffTimer(object): + """ + This is a timer that is smart about backing off exponentially when there are problems + """ + def __init__(self, min_interval, max_interval, ratio=.25, short_length=10, long_length=250): + assert isinstance(min_interval, (int, float, Decimal)) + assert isinstance(max_interval, (int, float, Decimal)) + + self.min_interval = _Decimal(min_interval) + self.max_interval = _Decimal(max_interval) + + self.max_short_timer = (self.max_interval - 
self.min_interval) * _Decimal(ratio) + self.max_long_timer = (self.max_interval - self.min_interval) * (1 - _Decimal(ratio)) + self.short_unit = self.max_short_timer / _Decimal(short_length) + self.long_unit = self.max_long_timer / _Decimal(long_length) + # print self.short_unit, self.long_unit + # logging.info('short unit %s' % self.short_unit) + # logging.info('long unit %s' % self.long_unit) + + self.short_interval = Decimal(0) + self.long_interval = Decimal(0) + + def success(self): + """Update the timer to reflect a successful call""" + self.short_interval -= self.short_unit + self.long_interval -= self.long_unit + self.short_interval = max(self.short_interval, Decimal(0)) + self.long_interval = max(self.long_interval, Decimal(0)) + + def failure(self): + """Update the timer to reflect a failed call""" + self.short_interval += self.short_unit + self.long_interval += self.long_unit + self.short_interval = min(self.short_interval, self.max_short_timer) + self.long_interval = min(self.long_interval, self.max_long_timer) + + def get_interval(self): + return float(self.min_interval + self.short_interval + self.long_interval) + + +def test_timer(): + timer = BackoffTimer(.1, 120, long_length=1000) + assert timer.get_interval() == .1 + timer.success() + assert timer.get_interval() == .1 + timer.failure() + interval = '%0.2f' % timer.get_interval() + assert interval == '3.19' + assert timer.min_interval == Decimal('.1') + assert timer.short_interval == Decimal('2.9975') + assert timer.long_interval == Decimal('0.089925') + + timer.failure() + interval = '%0.2f' % timer.get_interval() + assert interval == '6.27' + timer.success() + interval = '%0.2f' % timer.get_interval() + assert interval == '3.19' + for i in range(25): + timer.failure() + interval = '%0.2f' % timer.get_interval() + assert interval == '32.41' + diff --git a/pysimplehttp/src/BaseReader.py b/pysimplehttp/src/BaseReader.py new file mode 100644 index 0000000..0e6a8fb --- /dev/null +++ 
b/pysimplehttp/src/BaseReader.py @@ -0,0 +1,290 @@ +""" +Simplequeue base reader class. + +This does a /get on a simplequeue and calls task methods to process that message + +It handles the logic for backing off on retries and giving up on a message + +""" +import datetime +import sys +import logging +import os +import tempfile +import time +import copy +import tornado.options +import signal + +import http +import BackoffTimer +from host_pool import HostPool + +try: + import ujson as json +except ImportError: + import json + +tornado.options.define('heartbeat_file', type=str, default=None, help="path to a file to touch for heartbeats every 10 seconds") + + +class RequeueWithoutBackoff(Exception): + """exception for requeueing a message without incrementing backoff""" + pass + + +class BaseReader(object): + def __init__(self, simplequeue_address, all_tasks, max_tries=5, sleeptime_failed_queue=5, + sleeptime_queue_empty=0.5, sleeptime_requeue=1, requeue_delay=90, mget_items=0, + failed_count=0, queuename=None, preprocess_method=None, validate_method=None, + requeue_giveup=None, failed_message_dir=None): + """ + BaseReader provides a queue that calls each task provided by ``all_tasks`` up to ``max_tries`` + requeueing on any failures with increasing multiples of ``requeue_delay`` between subsequent + tries of each message. + + ``preprocess_method`` defines an optional method that can alter the message data before + other task functions are called. + ``validate_method`` defines an optional method that returns a boolean as to whether or not + this message should be processed. + ``all_tasks`` defines a mapping of tasks and functions that individually will be called + with the message data. 
+ ``requeue_giveup`` defines a callback for when a message has been called ``max_tries`` times + ``failed_message_dir`` defines a directory where failed messages should be written to + """ + assert isinstance(all_tasks, dict) + for key, method in all_tasks.items(): + assert callable(method), "key %s must have a callable value" % key + if preprocess_method: + assert callable(preprocess_method) + if validate_method: + assert callable(validate_method) + assert isinstance(queuename, (str, unicode)) + assert isinstance(mget_items, int) + + if not isinstance(simplequeue_address, HostPool): + if isinstance(simplequeue_address, (str, unicode)): + simplequeue_address = [simplequeue_address] + assert isinstance(simplequeue_address, (list, set, tuple)) + simplequeue_address = HostPool(simplequeue_address) + + self.simplequeue_address = simplequeue_address + self.max_tries = max_tries + self.requeue_giveup = requeue_giveup + self.mget_items = mget_items + self.sleeptime_failed_queue = sleeptime_failed_queue + self.sleeptime_queue_empty = sleeptime_queue_empty + self.sleeptime_requeue = sleeptime_requeue + self.requeue_delay = requeue_delay # seconds + ## max delay time is requeue_delay * (max_tries + max_tries-1 + max_tries-2 ... 
1) + self.failed_message_dir = failed_message_dir or tempfile.mkdtemp() + assert os.access(self.failed_message_dir, os.W_OK) + self.failed_count = failed_count + self.queuename = queuename + self.task_lookup = all_tasks + self.preprocess_method = preprocess_method + self.validate_method = validate_method + self.backoff_timer = dict((k, BackoffTimer.BackoffTimer(0, 120)) for k in self.task_lookup.keys()) + # a default backoff timer + self.backoff_timer['__preprocess'] = BackoffTimer.BackoffTimer(0, 120) + self.quit_flag = False + + def callback(self, queue_message): + # copy the dictionary, dont reference + message = copy.copy(queue_message.get('data', {})) + + try: + if self.preprocess_method: + message = self.preprocess_method(message) + + if self.validate_method and not self.validate_method(message): + self.backoff_timer['__preprocess'].success() + return + except: + logging.exception('caught exception while preprocessing') + self.backoff_timer['__preprocess'].failure() + return self.requeue(queue_message) + + self.backoff_timer['__preprocess'].success() + + for task in copy.copy(queue_message['tasks_left']): + method = self.task_lookup[task] + try: + if method(message): + queue_message['tasks_left'].remove(task) + self.backoff_timer[task].success() + else: + self.backoff_timer[task].failure() + except RequeueWithoutBackoff: + logging.info('RequeueWithoutBackoff') + except: + logging.exception('caught exception while handling %s' % task) + self.backoff_timer[task].failure() + + if queue_message['tasks_left']: + self.requeue(queue_message) + + def simplequeue_get(self): + try: + simplequeue_addr = self.simplequeue_address.get() + if self.mget_items: + msg = http.http_fetch(simplequeue_addr + '/mget?items=' + str(self.mget_items)) + else: + msg = http.http_fetch(simplequeue_addr + '/get') + self.simplequeue_address.success(simplequeue_addr) + return msg + except: + self.simplequeue_address.failed(simplequeue_addr) + raise + + def simplequeue_put(self, data): + try: 
+ simplequeue_addr = self.simplequeue_address.get() + http.http_fetch(simplequeue_addr + '/put', dict(data=data)) + self.simplequeue_address.success(simplequeue_addr) + except: + self.simplequeue_address.failed(simplequeue_addr) + raise + + def dump(self, message): + self.failed_count += 1 + path = os.path.join(self.failed_message_dir, self.queuename) + if not os.path.exists(path): + os.makedirs(path) + date_str = datetime.datetime.utcnow().strftime("%Y%m%d-%H%M%S") + filename = "%s.%d.json" % (date_str, self.failed_count) + f = open(os.path.join(path, filename), 'wb') + if isinstance(message, (str, unicode)): + f.write(message) + else: + f.write(json.dumps(message)) + f.close() + + def requeue(self, message, delay=True, requeue_delay=None): + """ + requeue this message incrementing the retry_on, and incrementing the tries when delay=True + if delay=False, just put it back in the queue as it's not time to run it yet + """ + if message['tries'] > self.max_tries: + logging.warning('giving up on message after max tries %s' % str(message)) + try: + if self.requeue_giveup != None: + self.requeue_giveup(message) + except Exception, e: + logging.exception("Could not call requeue_giveup callback: %s"%e) + self.dump(message) + return + + if delay: + ## delay the next try + if requeue_delay is None: + requeue_delay = self.requeue_delay * message['tries'] + message['retry_on'] = time.time() + requeue_delay + + if message['retry_on']: + next_try_in = message['retry_on'] - time.time() + else: + next_try_in = 0 + + try: + self.simplequeue_put(json.dumps(message)) + logging.info('requeue(%s) next try in %d secs' % (str(message), next_try_in)) + except: + logging.exception('requeue(%s) failed' % str(message)) + time.sleep(self.sleeptime_requeue) + + def handle_message(self, message_bytes): + try: + message = json.loads(message_bytes) + except: + logging.warning('invalid data: %s' % str(message_bytes)) + self.dump(message_bytes) + return + + if not message.get('data'): + # wrap in 
the reader params + message = { + 'data': message, + 'tries': 0, + 'retry_on': None, + 'started': int(time.time()) + } + + # add tasks_left so it's possible for someone else to add a queue entry + # with the metadata wrapper (ie: to queue for replay later) but without + # knowledge of the tasks that need to happen in *this* queue reader + if 'tasks_left' not in message: + message['tasks_left'] = self.task_lookup.keys() + + # should we wait for this? + retry_on = message.get('retry_on') + if retry_on and retry_on > int(time.time()): + self.requeue(message, delay=False) + return + + message['tries'] = message['tries'] + 1 + logging.info('handling %s' % str(message)) + self.callback(message) + self.end_processing_sleep() + + def update_heartbeat(self): + heartbeat_file = tornado.options.options.heartbeat_file + if not heartbeat_file: + return + now = time.time() + heartbeat_update_interval = 10 + # update the heartbeat file every N seconds + if not hasattr(self, '_last_heartbeat_update'): + self._last_heartbeat_update = now - heartbeat_update_interval - 1 + + if self._last_heartbeat_update < now - heartbeat_update_interval: + self._last_heartbeat_update = now + open(heartbeat_file, 'a').close() + os.utime(heartbeat_file, None) + + def end_processing_sleep(self): + interval = max(bt.get_interval() for i, bt in self.backoff_timer.iteritems()) + if interval > 0: + logging.info('backing off for %0.2f seconds' % interval) + time.sleep(interval) + + def handle_term_signal(self, sig_num, frame): + logging.info('TERM Signal handler called with signal %r.' % sig_num) + if self.quit_flag: + # if we call the term signal twice, just exit immediately + logging.warning('already wating for exit flag, so aborting') + sys.exit(1) + self.quit_flag = True + + def run(self): + signal.signal(signal.SIGTERM, self.handle_term_signal) + logging.info("starting %s reader..." 
% self.queuename) + while not self.quit_flag: + try: + self.update_heartbeat() + except: + logging.exception('failed touching heartbeat file') + + try: + message_bytes = self.simplequeue_get() + except: + logging.exception('queue.get() failed') + time.sleep(self.sleeptime_failed_queue) + continue + + if not message_bytes: + time.sleep(self.sleeptime_queue_empty) + continue + + if self.mget_items: + messages = message_bytes.splitlines() + else: + messages = [message_bytes] + + for message in messages: + try: + self.handle_message(message) + except: + logging.exception('failed to handle_message() %r' % message) + return + diff --git a/pysimplehttp/src/__init__.py b/pysimplehttp/src/__init__.py index d1f2e39..1ff2e6d 100644 --- a/pysimplehttp/src/__init__.py +++ b/pysimplehttp/src/__init__.py @@ -1 +1,8 @@ -__version__ = "0.1.1" \ No newline at end of file +__version__ = "0.2.0" + +import file_to_simplequeue +import pubsub_reader +import BackoffTimer +import BaseReader +import http +import formatters diff --git a/pysimplehttp/src/formatters.py b/pysimplehttp/src/formatters.py new file mode 100644 index 0000000..52ccc0c --- /dev/null +++ b/pysimplehttp/src/formatters.py @@ -0,0 +1,97 @@ +import re +import binascii +import calendar + + +def _crc(key): + """crc32 hash a string""" + return binascii.crc32(_utf8(key)) & 0xffffffff + + +def _b32(number): + """convert positive integer to a base32 string""" + assert isinstance(number, (int, long)) + alphabet = '0123456789abcdefghijklmnopqrstuv' + alphabet_len = 32 + + if number == 0: + return alphabet[0] + + base32 = '' + + sign = '' + if number < 0: + sign = '-' + number = -number + + while number != 0: + number, i = divmod(number, alphabet_len) + base32 = alphabet[i] + base32 + + return sign + base32 + + +def _idn(domain): + """idn encode a domain name""" + if not domain: + return domain + if 'xn--' in domain: + return domain.decode('idna') + return _unicode(domain) + + +def _punycode(domain): + """idna encode (punycode) a 
domain name""" + if not domain: + return domain + domain = _unicode(domain) + if re.findall(r'[^-_a-zA-Z0-9\.]', domain): + return domain.encode('idna') + return _utf8(domain) + + +def _unicode(value): + """decode a utf-8 string as unicode""" + if isinstance(value, str): + return value.decode("utf-8") + assert isinstance(value, unicode) + return value + + +def _utf8(s): + """encode a unicode string as utf-8""" + if isinstance(s, unicode): + return s.encode("utf-8") + assert isinstance(s, str) + return s + + +def _utc_ts(dt): + """convert a datetime object into a UNIX epoch timestamp""" + return calendar.timegm(dt.utctimetuple()) + + +def _utf8_params(params): + """encode a dictionary of URL parameters (including iterables) as utf-8""" + assert isinstance(params, dict) + encoded_params = [] + for k, v in params.items(): + if isinstance(v, (list, tuple)): + v = [_utf8(x) for x in v] + else: + v = _utf8(v) + encoded_params.append((k, v)) + return dict(encoded_params) + + +class _O(dict): + """Makes a dictionary behave like an object.""" + def __getattr__(self, name): + try: + return self[name] + except KeyError: + # raise AttributeError(name) + return None + + def __setattr__(self, name, value): + self[name] = value diff --git a/pysimplehttp/src/http.py b/pysimplehttp/src/http.py new file mode 100644 index 0000000..3966332 --- /dev/null +++ b/pysimplehttp/src/http.py @@ -0,0 +1,66 @@ +import tornado.httpclient +import urllib + +from formatters import _utf8, _utf8_params + +try: + import ujson as json +except ImportError: + import json + +_HTTPCLIENT = None +def get_http_client(): + global _HTTPCLIENT + if _HTTPCLIENT is None: + _HTTPCLIENT = tornado.httpclient.HTTPClient() + return _HTTPCLIENT + +def http_fetch(url, params={}, headers=None, method='GET', body=None, timeout=5.0, client_options=None, connect_timeout=None, request_timeout=None): + headers = headers or {} + client_options = client_options or {} + + for key, value in params.items(): + if isinstance(value, (int, 
long, float)): + params[key] = str(value) + if value is None: + del params[key] + + params = _utf8_params(params) + if method in ['GET', 'HEAD'] and params: + url += '?' + urllib.urlencode(params, doseq=1) + body = None + elif params: + body = urllib.urlencode(params, doseq=1) + headers['Content-type'] = 'application/x-www-form-urlencoded' + if body: + body = _utf8(body) + if 'follow_redirects' not in client_options: + client_options['follow_redirects'] = False + req = tornado.httpclient.HTTPRequest(url=_utf8(url), \ + method=method, + body=body, + headers=headers, + connect_timeout=connect_timeout or timeout, + request_timeout=request_timeout or timeout, + validate_cert=False, + **client_options) + + # sync client raises errors on non-200 responses + http = get_http_client() + response = http.fetch(req) + if method == 'HEAD': + return True + return response.body + + +def pubsub_write(endpoint, data): + if isinstance(data, dict): + data = json.dumps(data) + return http_fetch(endpoint + '/pub', body=data, method='POST', timeout=1.5) + +def simplequeue_write(endpoint, data): + assert isinstance(data, dict) + data = json.dumps(data) + result = http_fetch(endpoint + '/put', dict(data=data)) + # simplequeue success is a 200 response w/ an empty response body + return result == '' diff --git a/setup.py b/setup.py index 2a3c2de..2d75fa0 100644 --- a/setup.py +++ b/setup.py @@ -1,9 +1,4 @@ -#!/usr/bin/env python - -from distutils.core import setup - -scripts = ['pysimplehttp/scripts/ps_to_sq.py', - 'pysimplehttp/scripts/file_to_sq.py'] +from setuptools import setup # release steps # --------------- @@ -13,21 +8,32 @@ # upload .tar.gz to github # run python setup.py register to update pypi -version = "0.1.1" -setup(name='pysimplehttp', - version=version, - description='Python libraries for simplehttp', - author='Jehiah Czebotar', - author_email='jehiah@gmail.com', - url='https://github.com/bitly/simplehttp', - classifiers=[ - 'Intended Audience :: Developers', - 
'Programming Language :: Python', - ], - download_url="http://github.com/downloads/bitly/simplehttp/pysimplehttp-%s.tar.gz" % version, - scripts = scripts, - packages=['pysimplehttp'], - package_dir = {'pysimplehttp' : 'pysimplehttp/src'}, - install_requires=['tornado'], - requires=['tornado'], - ) \ No newline at end of file +__version__ = "0.2.0" +scripts = ['pysimplehttp/scripts/ps_to_sq.py', + 'pysimplehttp/scripts/file_to_sq.py'] + +setup( + name='pysimplehttp', + version=__version__, + author='Jehiah Czebotar', + author_email='jehiah@gmail.com', + description='Python libraries for simplehttp', + url='https://github.com/bitly/simplehttp', + classifiers=[ + 'Intended Audience :: Developers', + 'Programming Language :: Python', + ], + download_url="http://github.com/downloads/bitly/simplehttp/pysimplehttp-%s.tar.gz" %__version__, + + packages=['pysimplehttp'], + package_dir = {'pysimplehttp' : 'pysimplehttp/src'}, + + scripts = scripts, + install_requires = [ + 'tornado', + ], + requires = [ + 'ujson', + 'host_pool', + ], +)