Major webhook rework. (#1780)
* Async webhooks & proper session/pooling.

* Added pool connections & maxsize when overwriting mounts.

* Don't forget to return the session...

* Performance changes.

* Reduced threads to a minimum of one; we can use Locks now.

* Fix logic.
sebastienvercammen committed Jan 28, 2017
1 parent 72e57ce commit 0f93353
Showing 3 changed files with 85 additions and 39 deletions.
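
In outline, the rework replaces a fresh requests.Session per webhook call with a single shared, pooled FuturesSession, so repeated posts to the same host reuse TCP connections and no longer block the queue worker. A minimal before/after sketch of that pattern (names simplified; see the diffs below for the actual code):

import requests
from requests_futures.sessions import FuturesSession

# Before: a new Session, and thus fresh TCP connections, per message.
def send_old(url, data):
    session = requests.Session()
    session.post(url, json=data)

# After: one shared session; post() dispatches to a worker thread and
# returns immediately, reusing pooled connections per host.
shared_session = FuturesSession(max_workers=25)

def send_new(url, data):
    shared_session.post(url, json=data)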
7 changes: 5 additions & 2 deletions pogom/utils.py
@@ -314,13 +314,16 @@ def get_args():
help=('Number of webhook threads; increase if the ' +
'webhook queue falls behind.'),
type=int, default=1)
parser.add_argument('-whc', '--wh-concurrency',
help=('Async requests pool size.'), type=int,
default=25)
parser.add_argument('-whr', '--wh-retries',
help=('Number of times to retry sending webhook ' +
'data on failure.'),
type=int, default=5)
type=int, default=3)
parser.add_argument('-wht', '--wh-timeout',
help='Timeout (in seconds) for webhook requests.',
type=int, default=2)
type=float, default=1.0)
parser.add_argument('-whbf', '--wh-backoff-factor',
help=('Factor (in seconds) by which the delay ' +
'until next retry will increase.'),
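For reference, a self-contained sketch of how the webhook flags touched above parse (the new --wh-concurrency plus the changed defaults), reproducing only what is visible in this hunk; the real parser defines many more options:

import argparse

parser = argparse.ArgumentParser()
parser.add_argument('-whc', '--wh-concurrency', type=int, default=25,
                    help='Async requests pool size.')
parser.add_argument('-whr', '--wh-retries', type=int, default=3,
                    help='Number of times to retry sending webhook data.')
parser.add_argument('-wht', '--wh-timeout', type=float, default=1.0,
                    help='Timeout (in seconds) for webhook requests.')

args = parser.parse_args(['--wh-concurrency', '50'])
print(args.wh_concurrency, args.wh_retries, args.wh_timeout)  # 50 3 1.0
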
116 changes: 79 additions & 37 deletions pogom/webhook.py
@@ -3,41 +3,32 @@

import logging
import requests
from datetime import datetime
from requests_futures.sessions import FuturesSession
import threading
from .utils import get_args
from requests.packages.urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter

log = logging.getLogger(__name__)

# How low do we want the queue size to stay?
wh_warning_threshold = 100
# How long can it be over the threshold, in seconds?
# Default: 5 seconds per 100 in threshold.
wh_threshold_lifetime = int(5 * (wh_warning_threshold / 100.0))
wh_lock = threading.Lock()

def send_to_webhook(message_type, message):

def send_to_webhook(session, message_type, message):
args = get_args()

if not args.webhooks:
# What are you even doing here...
log.warning('Called send_to_webhook() without webhooks.')
return

# Config / arg parser
num_retries = args.wh_retries
req_timeout = args.wh_timeout
backoff_factor = args.wh_backoff_factor

# Use requests & urllib3 to auto-retry.
# If the backoff_factor is 0.1, then sleep() will sleep for [0.1s, 0.2s,
# 0.4s, ...] between retries. It will also force a retry if the status
# code returned is 500, 502, 503 or 504.
session = requests.Session()

# If any regular response is generated, no retry is done. Without using
# the status_forcelist, even a response with status 500 will not be
# retried.
retries = Retry(total=num_retries, backoff_factor=backoff_factor,
status_forcelist=[500, 502, 503, 504])

# Mount handler on both HTTP & HTTPS.
session.mount('http://', HTTPAdapter(max_retries=retries))
session.mount('https://', HTTPAdapter(max_retries=retries))

data = {
'type': message_type,
@@ -46,14 +37,23 @@ def send_to_webhook(message_type, message):

for w in args.webhooks:
try:
session.post(w, json=data, timeout=(None, req_timeout))
session.post(w, json=data, timeout=(None, req_timeout),
background_callback=__wh_completed)
except requests.exceptions.ReadTimeout:
log.exception('Response timeout on webhook endpoint %s.', w)
except requests.exceptions.RequestException as e:
log.exception(repr(e))
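
For context, a minimal sketch of the fire-and-forget pattern used above, assuming the requests-futures 0.9.x API pinned in requirements.txt (the endpoint URL is illustrative). post() returns a Future immediately; exceptions raised on the worker thread, such as ReadTimeout, are stored on the Future and only re-raised if .result() is called.

from requests_futures.sessions import FuturesSession

def on_done(sess, resp):
    # Runs on a worker thread once the request completes.
    resp.close()

session = FuturesSession(max_workers=4)
future = session.post('http://example.com/webhook',  # illustrative endpoint
                      json={'type': 'pokemon', 'message': {}},
                      timeout=(None, 1.0),  # (connect, read) timeouts
                      background_callback=on_done)
# future.result() would block and re-raise any request exception.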


def wh_updater(args, queue, key_cache):
wh_threshold_timer = datetime.now()
wh_over_threshold = False

# Set up one session to use for all requests.
# Requests to the same host will reuse the underlying TCP
# connection, giving a performance increase.
session = __get_requests_session(args)

# The forever loop.
while True:
try:
@@ -68,21 +68,19 @@ def wh_updater(args, queue, key_cache):
}
ident = message.get(ident_fields.get(whtype), None)

# cachetools in Python2.7 isn't thread safe, but adding a Lock
# slows down the queue immensely. Rather than being entirely
# thread safe, we catch the rare exception and re-add to queue.
try:
# cachetools in Python2.7 isn't thread safe, so we add a lock.
with wh_lock:
# Only send if identifier isn't already in cache.
if ident is None:
# We don't know what it is, so let's just log and send
# as-is.
log.debug(
'Sending webhook item of unknown type: %s.', whtype)
send_to_webhook(whtype, message)
send_to_webhook(session, whtype, message)
elif ident not in key_cache:
key_cache[ident] = message
log.debug('Sending %s to webhook: %s.', whtype, ident)
send_to_webhook(whtype, message)
send_to_webhook(session, whtype, message)
else:
# Make sure to call key_cache[ident] in all branches so it
# updates the LFU usage count.
@@ -91,23 +89,32 @@ def wh_updater(args, queue, key_cache):
# data to webhooks.
if __wh_object_changed(whtype, key_cache[ident], message):
key_cache[ident] = message
send_to_webhook(whtype, message)
send_to_webhook(session, whtype, message)
log.debug('Sending updated %s to webhook: %s.',
whtype, ident)
else:
log.debug('Not resending %s to webhook: %s.',
whtype, ident)
except KeyError as ex:
log.debug(
'LFUCache thread unsafe exception: %s. Requeuing.',
repr(ex))
queue.put((whtype, message))

# Webhook queue moving too slow.
if queue.qsize() > 50:
log.warning('Webhook queue is > 50 (@%d); ' +
'try increasing --wh-threads.',
queue.qsize())
if (not wh_over_threshold) and (
queue.qsize() > wh_warning_threshold):
wh_over_threshold = True
wh_threshold_timer = datetime.now()
elif wh_over_threshold:
if queue.qsize() < wh_warning_threshold:
wh_over_threshold = False
else:
timediff = datetime.now() - wh_threshold_timer

if timediff.total_seconds() > wh_threshold_lifetime:
log.warning('Webhook queue has been > %d (@%d)'
' for over %d seconds;'
' try increasing --wh-concurrency'
' or --wh-threads.',
wh_warning_threshold,
queue.qsize(),
wh_threshold_lifetime)

queue.task_done()
except Exception as e:
@@ -116,6 +123,41 @@ def wh_updater(args, queue, key_cache):

# Helpers

# Background handler for completed webhook requests.
# Currently doesn't do anything.
def __wh_completed(sess, resp):
# Note: requests-futures invokes the callback as callback(session,
# response), so the handler must accept both arguments.
pass


def __get_requests_session(args):
# Config / arg parser
num_retries = args.wh_retries
backoff_factor = args.wh_backoff_factor
pool_size = args.wh_concurrency

# Use requests & urllib3 to auto-retry.
# If the backoff_factor is 0.1, then sleep() will sleep for [0.1s, 0.2s,
# 0.4s, ...] between retries. It will also force a retry if the status
# code returned is 500, 502, 503 or 504.
session = FuturesSession(max_workers=pool_size)

# If any regular response is generated, no retry is done. Without using
# the status_forcelist, even a response with status 500 will not be
# retried.
retries = Retry(total=num_retries, backoff_factor=backoff_factor,
status_forcelist=[500, 502, 503, 504])

# Mount handler on both HTTP & HTTPS.
session.mount('http://', HTTPAdapter(max_retries=retries,
pool_connections=pool_size,
pool_maxsize=pool_size))
session.mount('https://', HTTPAdapter(max_retries=retries,
pool_connections=pool_size,
pool_maxsize=pool_size))

return session
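
As a worked example of the backoff comment above (a sketch following urllib3's documented formula; older urllib3 releases skip the sleep before the first retry):

def backoff_delays(backoff_factor, total_retries):
    # urllib3 documents the sleep as backoff_factor * (2 ** (retry - 1)).
    return [backoff_factor * 2 ** (n - 1) for n in range(1, total_retries + 1)]

print(backoff_delays(0.1, 4))  # [0.1, 0.2, 0.4, 0.8]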


def __get_key_fields(whtype):
key_fields = {
# lure_expiration is a UTC timestamp so it's good (Y).
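A note on the de-duplication in wh_updater() above: it leans on LFU eviction, where every read of key_cache[ident] bumps that item's usage count, so frequently seen identifiers survive eviction. A small sketch against the cachetools 2.0.0 API pinned below:

from cachetools import LFUCache

key_cache = LFUCache(maxsize=2)
key_cache['a'] = {'hp': 10}
key_cache['b'] = {'hp': 20}
_ = key_cache['a']            # read: bumps 'a's usage count
key_cache['c'] = {'hp': 30}   # evicts 'b', the least frequently used
print('b' in key_cache)       # False
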
1 change: 1 addition & 0 deletions requirements.txt
@@ -21,6 +21,7 @@ sphinx-autobuild==0.6.0
recommonmark==0.4.0
sphinx_rtd_theme==0.1.9
requests==2.10.0
requests-futures==0.9.7
PySocks==1.5.6
git+https://github.com/maddhatter/Flask-CacheBust.git@38d940cc4f18b5fcb5687746294e0360640a107e#egg=flask_cachebust
cachetools==2.0.0
