Skip to content

Commit

Permalink
Make Spinach more resilient to connection errors
Browse files Browse the repository at this point in the history
  • Loading branch information
NicolasLM committed Feb 24, 2018
1 parent 9e8ea10 commit e792f2b
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 4 deletions.
26 changes: 24 additions & 2 deletions spinach/brokers/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from logging import getLogger
import math
from os import path
import socket
import threading
from typing import Optional

Expand All @@ -11,6 +12,8 @@
from ..brokers.base import Broker
from ..job import Job, JobStatus
from ..const import FUTURE_JOBS_KEY, NOTIFICATIONS_KEY, RUNNING_JOBS_KEY
from ..utils import run_forever


logger = getLogger('spinach.broker')
here = path.abspath(path.dirname(__file__))
Expand All @@ -26,7 +29,7 @@ class RedisBroker(Broker):

def __init__(self, redis: Optional[StrictRedis]=None):
super().__init__()
self._r = redis if redis else StrictRedis()
self._r = redis if redis else StrictRedis(**recommended_socket_opts)

# Register the lua scripts
self._move_future_jobs = self._load_script('move_future_jobs.lua')
Expand Down Expand Up @@ -133,7 +136,8 @@ def _subscriber_func(self):

def start(self):
self._subscriber_thread = threading.Thread(
target=self._subscriber_func,
target=run_forever,
args=(self._subscriber_func, self._must_stop, logger),
name='{}-broker-subscriber'.format(self.namespace)
)
self._subscriber_thread.start()
Expand All @@ -144,3 +148,21 @@ def stop(self):

def flush(self):
    """Run the broker's flush script for this broker's namespace.

    The script object stored in ``self._flush`` is handed to
    ``self._run_script`` together with ``self.namespace``.
    NOTE(review): presumably this removes everything stored under the
    namespace -- confirm against the script itself.
    """
    run_script = self._run_script
    run_script(self._flush, self.namespace)


# Socket options applied when the broker creates its own Redis client.
recommended_socket_opts = {
    'socket_timeout': 60,
    'socket_connect_timeout': 15
}
try:
    # The keyword arguments are fully evaluated before update() runs, so a
    # missing socket constant leaves the dict untouched: either both
    # keepalive entries are added or none of them.
    recommended_socket_opts.update(
        # These are the values used by Redis itself by default
        socket_keepalive_options={
            socket.TCP_KEEPIDLE: 300,  # Send probes after 300s of inactivity
            socket.TCP_KEEPINTVL: 100,  # Send probes every 100s
            socket.TCP_KEEPCNT: 3  # Send 3 probes before closing
        },
        socket_keepalive=True
    )
except AttributeError:
    # Some non-Linux OS do not have the proper attribute in the socket module
    # for TCP Keepalive
    pass  # noqa
5 changes: 3 additions & 2 deletions spinach/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from typing import Optional

from .task import Task, Tasks, RetryException, exponential_backoff
from .utils import human_duration
from .utils import human_duration, run_forever
from .job import Job, JobStatus
from .brokers.base import Broker
from .const import DEFAULT_QUEUE, DEFAULT_NAMESPACE
Expand Down Expand Up @@ -142,7 +142,8 @@ def start_workers(self, number: int=5, queue=DEFAULT_QUEUE, block=True):

# Start the arbiter
self._arbiter = threading.Thread(
target=self._arbiter_func,
target=run_forever,
args=(self._arbiter_func, self._must_stop, logger),
name='{}-arbiter'.format(self.namespace)
)
self._arbiter.start()
Expand Down
20 changes: 20 additions & 0 deletions spinach/utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,27 @@
from logging import Logger
from threading import Event
import time
from typing import Callable

from redis import ConnectionError, TimeoutError


def human_duration(duration_seconds: float) -> str:
    """Render a duration given in seconds as a short readable string.

    Durations below one millisecond are shown as ``'0 ms'``, durations below
    one second as whole milliseconds and everything else as whole seconds.
    Fractions are truncated, not rounded.
    """
    if duration_seconds < 0.001:
        text = '0 ms'
    elif duration_seconds < 1:
        milliseconds = int(duration_seconds * 1000)
        text = '{} ms'.format(milliseconds)
    else:
        seconds = int(duration_seconds)
        text = '{} s'.format(seconds)
    return text


def run_forever(func: Callable, must_stop: Event, logger: Logger,
                retry_delay: float = 10):
    """Call ``func`` over and over until ``must_stop`` is set.

    When ``func`` returns normally it is called again right away. When it
    raises, the error is logged and the loop backs off before retrying:
    ``ConnectionError`` and ``TimeoutError`` are logged as warnings, any
    other ``Exception`` is logged with its traceback.

    The back-off waits on ``must_stop`` itself instead of sleeping, so a
    stop request made during the back-off ends the loop immediately rather
    than after the full delay.

    :param func: callable taking no arguments
    :param must_stop: event that ends the loop once set
    :param logger: logger receiving the warning/exception records
    :param retry_delay: seconds to wait after a failed call before retrying
    """
    while not must_stop.is_set():
        try:
            func()
        except (ConnectionError, TimeoutError) as e:
            logger.warning('Connection issue: %s', e)
        except Exception:
            logger.exception('Unexpected error')
        else:
            # Successful call: no back-off needed
            continue

        # Interruptible back-off: returns early as soon as must_stop is set
        must_stop.wait(retry_delay)
32 changes: 32 additions & 0 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
import threading
from unittest.mock import Mock, patch, ANY

from redis import ConnectionError

from spinach import utils


Expand All @@ -7,3 +12,30 @@ def test_human_duration():
assert utils.human_duration(0.25) == '250 ms'
assert utils.human_duration(1) == '1 s'
assert utils.human_duration(2500) == '2500 s'


@patch('spinach.utils.time.sleep')
def test_run_forever(_):
    """run_forever keeps calling func through errors until told to stop.

    The scripted callable succeeds on call 1, raises an unexpected error on
    call 2, raises a connection error on call 3 and requests the stop on
    call 4.
    """
    must_stop = threading.Event()
    logger = Mock()
    call_count = 0
    # Call number -> exception raised on that call
    failures = {
        2: RuntimeError('Foo'),
        3: ConnectionError('Bar'),
    }

    def func():
        nonlocal call_count
        call_count += 1

        if call_count == 4:
            must_stop.set()

        failure = failures.get(call_count)
        if failure is not None:
            raise failure

    utils.run_forever(func, must_stop, logger)

    assert must_stop.is_set()
    assert call_count == 4
    logger.exception.assert_called_once_with(ANY)
    logger.warning.assert_called_once_with(ANY, ANY)

0 comments on commit e792f2b

Please sign in to comment.