Skip to content
This repository has been archived by the owner on Dec 13, 2018. It is now read-only.

Commit

Permalink
1.6.5 (#183)
Browse files Browse the repository at this point in the history
* Add reload timeout for old process (#174)

* Add RELOAD_TIMEOUT to configs file

* add timeout to update_helper

* update tests

* Fix bug where 'timer' wasn't defined

* "logger" not "logging"

* Update readme

* bump version & reformat the code
  • Loading branch information
Feng Honglin committed Apr 3, 2017
1 parent 44fdc4a commit ef61092
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 19 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ Settings in this part is immutable, you have to redeploy HAProxy service to make
|MONITOR_PORT| |the port number where monitor_uri should be added to. Use together with `MONITOR_URI`. Possible value: `80`|
|MONITOR_URI| |the exact URI which we want to intercept to return HAProxy's health status instead of forwarding the request.See: http://cbonte.github.io/haproxy-dconv/configuration-1.5.html#4-monitor-uri. Possible value: `/ping`|
|OPTION|redispatch|comma-separated list of HAProxy `option` entries to the `default` section.|
|RELOAD_TIMEOUT|0| When haproxy is reconfigured, a new process starts and attaches to the TCP socket for new connections, leaving the old process to handle existing connections. This timeout specifies how long the old process is permitted to continue running before being killed. <br/> `-1`: Old process is killed immediately<br/> `0`: No timeout, old process will run as long as TCP connections last. This could potentially be quite a while as `http-keep-alives` are enabled which will keep TCP connections open.<br/> `>0`: Timeout in secs after which the process will be killed.
|RSYSLOG_DESTINATION|127.0.0.1|the rsyslog destination to where HAProxy logs are sent|
|SKIP_FORWARDED_PROTO||If set to any value, HAProxy will not add an X-Forwarded- headers. This can be used when combining HAProxy with another load balancer|
|SSL_BIND_CIPHERS| |explicitly set which SSL ciphers will be used for the SSL server. This sets the HAProxy `ssl-default-bind-ciphers` configuration setting.|
Expand Down
2 changes: 1 addition & 1 deletion haproxy/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "1.6.4"
__version__ = "1.6.5"
5 changes: 3 additions & 2 deletions haproxy/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,9 @@ def parse_additional_backend_settings(envvars):
TIMEOUT = os.getenv("TIMEOUT", "connect 5000, client 50000, server 50000")
NBPROC = int(os.getenv("NBPROC", 1))
SWARM_MODE_POLLING_INTERVAL = int(os.getenv("SWARM_MODE_POLLING_INTERVAL", 5))
HAPROXY_USER=os.getenv("HAPROXY_USER", "haproxy")
HAPROXY_GROUP=os.getenv("HAPROXY_GROUP", "haproxy")
HAPROXY_USER = os.getenv("HAPROXY_USER", "haproxy")
HAPROXY_GROUP = os.getenv("HAPROXY_GROUP", "haproxy")
RELOAD_TIMEOUT = os.getenv("RELOAD_TIMEOUT", "0")

# global
RUNNING_MODE = None
Expand Down
66 changes: 58 additions & 8 deletions haproxy/helper/update_helper.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,46 @@
import logging
import subprocess
import thread
import threading
import time

from haproxy.config import HAPROXY_RUN_COMMAND
from haproxy.config import HAPROXY_RUN_COMMAND, RELOAD_TIMEOUT

logger = logging.getLogger("haproxy")


def run_reload(old_process):
# RELOAD_TIMEOUT has the following values and effect:
# -1 : Reload haproxy with "-st" which will immediately kill the previous process
# 0 : Reload haproxy with "-sf" and no timeout. This can potentially leave
# "broken" processes (where the backends have changed) hanging around
# with existing connections.
# > 0 : Reload haproxy with "-sf" but if it takes longer than RELOAD_TIMEOUT then kill it
# This gives existing connections a chance to finish. RELOAD_TIMEOUT should be set to
# the approximate time it takes docker to finish updating services. By this point the
# existing configuration will be invalid, and any connections still using it will
# have invalid backends.
#
def run_reload(old_process, timeout=int(RELOAD_TIMEOUT)):
if old_process:
# Reload haproxy
logger.info("Reloading HAProxy")
new_process = subprocess.Popen(HAPROXY_RUN_COMMAND + ["-sf", str(old_process.pid)])
thread.start_new_thread(wait_pid, (old_process,))
logger.info("HAProxy has been reloaded(PID: %s)", str(new_process.pid))
if timeout == -1:
flag = "-st"
logger.info("Restarting HAProxy immediately")
else:
flag = "-sf"
logger.info("Restarting HAProxy gracefully")

new_process = subprocess.Popen(HAPROXY_RUN_COMMAND + [flag, str(old_process.pid)])
logger.info("HAProxy is reloading (new PID: %s)", str(new_process.pid))

thread = threading.Thread(target=wait_pid, args=[old_process, timeout])
thread.start()

# Block only if we have a timeout. If we don't it could take forever, and so
# returning immediately maintains the original behaviour of no timeout.
if timeout > 0:
thread.join()

else:
# Launch haproxy
logger.info("Launching HAProxy")
Expand All @@ -23,6 +50,29 @@ def run_reload(old_process):
return new_process


def wait_pid(process):
def wait_pid(process, timeout):
start = time.time()

timer = None

if timeout > 0:
timer = threading.Timer(timeout, timeout_handler, [process])
timer.start()

process.wait()
logger.info("HAProxy(PID:%s) has been terminated" % str(process.pid))

if timer is not None:
timer.cancel();

duration = time.time() - start
logger.info("Old HAProxy(PID: %s) ended after %s sec", str(process.pid), str(duration))


def timeout_handler(processs):
if processs.poll() is None:
try:
processs.terminate()
logger.info("Old HAProxy process taking too long to complete - terminating")
except OSError as e:
if e.errno != errno.ESRCH:
raise
68 changes: 60 additions & 8 deletions tests/unit/helper/test_update_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,75 @@


class UpdateHelperTestCase(unittest.TestCase):
class blockingObject(object):
terminated = False
timeout = None

def __init__(self, timeout):
self.timeout = timeout

def wait(self):
startTime = time.time()
# block until waiting == true or we've hit the timeout
while self.terminated == False and time.time() < startTime + self.timeout:
time.sleep(0.5)

def poll(self):
return None

def terminate(self):
self.terminated = True

pass

class Object(object):
pass

@mock.patch("haproxy.helper.update_helper.thread.start_new_thread")
@mock.patch("haproxy.helper.update_helper.subprocess.Popen")
def test_run_reload_with_old_process(self, mock_popen, mock_new_thread):
def test_run_graceful_reload_within_timeout(self, mock_popen):
old_process = UpdateHelperTestCase.blockingObject(2)
old_process.pid = "old_pid"
new_process = UpdateHelperTestCase.Object()
new_process.pid = "new_pid"
mock_popen.return_value = new_process
run_reload(old_process, 5)
self.assertFalse(old_process.terminated)

@mock.patch("haproxy.helper.update_helper.subprocess.Popen")
def test_run_graceful_reload_exceeding_timeout(self, mock_popen):
old_process = UpdateHelperTestCase.blockingObject(10)
old_process.pid = "old_pid"
new_process = UpdateHelperTestCase.Object()
new_process.pid = "new_pid"
mock_popen.return_value = new_process
run_reload(old_process, 5)
self.assertTrue(old_process.terminated)

@mock.patch("haproxy.helper.update_helper.threading.Thread")
@mock.patch("haproxy.helper.update_helper.subprocess.Popen")
def test_run_graceful_reload_with_old_process(self, mock_popen, mock_new_thread):
old_process = UpdateHelperTestCase.Object()
old_process.pid = "old_pid"
new_process = UpdateHelperTestCase.Object()
new_process.pid = "new_pid"
mock_popen.return_value = new_process
run_reload(old_process, 0)
mock_popen.assert_called_with(HAPROXY_RUN_COMMAND + ['-sf', 'old_pid'])
mock_new_thread.assert_called_with(target=wait_pid, args=[old_process, 0])

@mock.patch("haproxy.helper.update_helper.threading.Thread")
@mock.patch("haproxy.helper.update_helper.subprocess.Popen")
def test_run_brutal_reload_with_old_process(self, mock_popen, mock_new_thread):
old_process = UpdateHelperTestCase.Object()
old_process.pid = "pid"
old_process.pid = "old_pid"
new_process = UpdateHelperTestCase.Object()
new_process.pid = "new_pid"
mock_popen.return_value = new_process
mock_popen.return_value = old_process
run_reload(old_process)
mock_popen.assert_called_with(HAPROXY_RUN_COMMAND + ['-sf', 'pid'])
mock_new_thread.caslled_with(wait_pid, (old_process,))
run_reload(old_process, -1)
mock_popen.assert_called_with(HAPROXY_RUN_COMMAND + ['-st', 'old_pid'])
mock_new_thread.assert_called_with(target=wait_pid, args=[old_process, -1])

@mock.patch("haproxy.helper.update_helper.thread.start_new_thread")
@mock.patch("haproxy.helper.update_helper.threading.Thread")
@mock.patch("haproxy.helper.update_helper.subprocess.Popen")
def test_run_reload_with_empty_old_process(self, mock_popen, mock_new_thread):
new_process = UpdateHelperTestCase.Object()
Expand Down

0 comments on commit ef61092

Please sign in to comment.