Skip to content

Commit

Permalink
Whenever either or both specify 0 value for heartbeat, pick 0 since i…
Browse files Browse the repository at this point in the history
…t's the one

that consumes the fewest resources and goes along with AMQP 0.9.1 negotiation rules
regarding picking the value that reduces resource consumption.

Update heartbeat negotiation tests for the new heartbeat negotiation logic that favors max heartbeat value.
  • Loading branch information
Vitaly Kruglikov committed Feb 8, 2016
1 parent d4dcecb commit 781c548
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 8 deletions.
56 changes: 51 additions & 5 deletions pika/connection.py
Expand Up @@ -1313,6 +1313,52 @@ def _on_connection_start(self, method_frame):
self._add_connection_tune_callback()
self._send_connection_start_ok(*self._get_credentials(method_frame))


@staticmethod
def _tune_heartbeat_timeout(client_value, server_value):
""" Determine heartbeat timeout per AMQP 0-9-1 rules
Per https://www.rabbitmq.com/resources/specs/amqp0-9-1.pdf,
> Both peers negotiate the limits to the lowest agreed value as follows:
> - The server MUST tell the client what limits it proposes.
> - The client responds and **MAY reduce those limits** for its
connection
When negotiating heartbeat timeout, the reasoning needs to be reversed.
The way I think it makes sense to interpret this rule for heartbeats is
that the consumable resource is the frequency of heartbeats, which is
the inverse of the timeout. The more frequent heartbeats consume more
resources than less frequent heartbeats. So, when both heartbeat
timeouts are non-zero, we should pick the max heartbeat timeout rather
than the min. The heartbeat timeout value 0 (zero) has a special
meaning - it's supposed to disable the timeout. This makes zero a
setting for the least frequent heartbeats (i.e., never); therefore, if
any (or both) of the two is zero, then the above rules would suggest
that negotiation should yield 0 value for heartbeat, effectively turning
it off.
:param client_value: None to accept server_value; otherwise, an integral
number in seconds; 0 (zero) to disable heartbeat.
:param server_value: integral value of the heartbeat timeout proposed by
broker; 0 (zero) to disable heartbeat.
:returns: the value of the heartbeat timeout to use and return to broker
"""
if client_value is None:
# Accept server's limit
timeout = server_value
elif client_value == 0 or server_value == 0:
# 0 has a special meaning "disable heartbeats", which makes it the
# least frequent heartbeat value there is
timeout = 0
else:
# Pick the one with the bigger heartbeat timeout (i.e., the less
# frequent one)
timeout = max(client_value, server_value)

return timeout

def _on_connection_tune(self, method_frame):
"""Once the Broker sends back a Connection.Tune, we will set our tuning
variables that have been returned to us and kick off the Heartbeat
Expand All @@ -1329,11 +1375,11 @@ def _on_connection_tune(self, method_frame):
method_frame.method.channel_max)
self.params.frame_max = self._combine(self.params.frame_max,
method_frame.method.frame_max)
if self.params.heartbeat is None:
self.params.heartbeat = method_frame.method.heartbeat
elif self.params.heartbeat != 0:
self.params.heartbeat = max(self.params.heartbeat,
method_frame.method.heartbeat)

# Negotiate heatbeat timeout
self.params.heartbeat = self._tune_heartbeat_timeout(
client_value=self.params.heartbeat,
server_value=method_frame.method.heartbeat)

# Calculate the maximum pieces for body frames
self._body_max_length = self._get_body_frame_max_length()
Expand Down
54 changes: 51 additions & 3 deletions tests/unit/connection_tests.py
Expand Up @@ -481,7 +481,7 @@ def test_on_connection_tune(self, method, heartbeat_checker):
method_frame.method = mock.Mock()
method_frame.method.channel_max = 40
method_frame.method.frame_max = 10
method_frame.method.heartbeat = 0
method_frame.method.heartbeat = 10
self.connection.params.channel_max = 20
self.connection.params.frame_max = 20
self.connection.params.heartbeat = 20
Expand All @@ -498,8 +498,56 @@ def test_on_connection_tune(self, method, heartbeat_checker):
self.assertEqual(['ab'], list(self.connection.outbound_buffer))
self.assertEqual('hearbeat obj', self.connection.heartbeat)

def test_on_connection_close(self):
"""make sure _on_connection_close terminates connection"""
# Repeat with smaller user heartbeat than broker
method_frame.method.heartbeat = 60
self.connection.params.heartbeat = 20
#Test
self.connection._on_connection_tune(method_frame)
#verfy
self.assertEqual(60, self.connection.params.heartbeat)

# Repeat with user deferring to server's heartbeat timeout
method_frame.method.heartbeat = 500
self.connection.params.heartbeat = None
#Test
self.connection._on_connection_tune(method_frame)
#verfy
self.assertEqual(500, self.connection.params.heartbeat)

# Repeat with user deferring to server's disabled heartbeat value
method_frame.method.heartbeat = 0
self.connection.params.heartbeat = None
#Test
self.connection._on_connection_tune(method_frame)
#verfy
self.assertEqual(0, self.connection.params.heartbeat)

# Repeat with user-disabled heartbeat
method_frame.method.heartbeat = 60
self.connection.params.heartbeat = 0
#Test
self.connection._on_connection_tune(method_frame)
#verfy
self.assertEqual(0, self.connection.params.heartbeat)

# Repeat with server-disabled heartbeat
method_frame.method.heartbeat = 0
self.connection.params.heartbeat = 60
#Test
self.connection._on_connection_tune(method_frame)
#verfy
self.assertEqual(0, self.connection.params.heartbeat)

# Repeat with both user/server disabled heartbeats
method_frame.method.heartbeat = 0
self.connection.params.heartbeat = 0
#Test
self.connection._on_connection_tune(method_frame)
#verfy
self.assertEqual(0, self.connection.params.heartbeat)

def test_on_connection_closed(self):
"""make sure connection close sends correct frames"""
method_frame = mock.Mock()
method_frame.method = mock.Mock(spec=spec.Connection.Close)
method_frame.method.reply_code = 1
Expand Down

0 comments on commit 781c548

Please sign in to comment.