Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

Slow handling of connection incoming buffer #33

Merged
merged 1 commit into from

3 participants

Roman Podoliaka Eskil Heyn Olsen Kevin Lange
Roman Podoliaka

A connection incoming buffer is extended using += operator. This leads
to unnecessary memory allocations and significant performance drops when
transferring big amounts of data (i. e. a few megabytes). Using of the array('c')
(an array of bytes) data type enables us to handle incoming data efficiently.

I used this dummy code for testing:
worker: http://xsnippet.org/359371/
client: http://xsnippet.org/359372/
binary data: dd if=/dev/urandom of=data.bin bs=512 count=10000

Roman Podoliaka malor Use array.array('c') as a type of incomming buffer
A connection incoming buffer is extended using += operator. This leads
to unnecessary memory allocations and significant performance drops when
transferring big amounts of data (i. e. a few megabytes). Using of the array('c')
(an array of bytes) data type enables us to handle incoming data efficiently.
31188de
Eskil Heyn Olsen

This looks good. Thanks for the fix!

Kevin Lange
Collaborator

Agreed. This looks useful.

Kevin Lange klange merged commit b568ce7 into from
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Feb 14, 2013
  1. Roman Podoliaka

    Use array.array('c') as a type of incomming buffer

    malor authored
    A connection incoming buffer is extended using += operator. This leads
    to unnecessary memory allocations and significant performance drops when
    transferring big amounts of data (i. e. a few megabytes). Using of the array('c')
    (an array of bytes) data type enables us to handle incoming data efficiently.
This page is out of date. Refresh to see the latest.
5 gearman/connection.py
View
@@ -1,3 +1,4 @@
+import array
import collections
import logging
import socket
@@ -46,7 +47,7 @@ def _reset_connection(self):
self._is_server_side = None
# Reset all our raw data buffers
- self._incoming_buffer = ''
+ self._incoming_buffer = array.array('c')
self._outgoing_buffer = ''
# Toss all commands we may have sent or received
@@ -149,7 +150,7 @@ def read_data_from_socket(self, bytes_to_read=4096):
if len(recv_buffer) == 0:
self.throw_exception(message='remote disconnected')
- self._incoming_buffer += recv_buffer
+ self._incoming_buffer.fromstring(recv_buffer)
return len(self._incoming_buffer)
def _unpack_command(self, given_buffer):
3  gearman/protocol.py
View
@@ -203,6 +203,7 @@ def parse_binary_command(in_buffer, is_response=True):
split_arguments = []
if len(expected_cmd_params) > 0:
+ binary_payload = binary_payload.tostring()
split_arguments = binary_payload.split(NULL_CHAR, len(expected_cmd_params) - 1)
elif binary_payload:
raise ProtocolError('Expected no binary payload: %s' % get_command_name(cmd_type))
@@ -262,7 +263,7 @@ def parse_text_command(in_buffer):
if '\n' not in in_buffer:
return cmd_type, cmd_args, cmd_len
- text_command, in_buffer = in_buffer.split('\n', 1)
+ text_command, in_buffer = in_buffer.tostring().split('\n', 1)
if NULL_CHAR in text_command:
raise ProtocolError('Received unexpected character: %s' % text_command)
46 tests/protocol_tests.py
View
@@ -1,3 +1,4 @@
+import array
import struct
import unittest
@@ -17,26 +18,44 @@ def test_parsing_errors(self):
malformed_command_buffer = "%sAAAABBBBCCCC"
# Raise malformed magic exceptions
- self.assertRaises(ProtocolError, protocol.parse_binary_command, malformed_command_buffer % "DDDD")
- self.assertRaises(ProtocolError, protocol.parse_binary_command, malformed_command_buffer % protocol.MAGIC_RES_STRING, is_response=False)
- self.assertRaises(ProtocolError, protocol.parse_binary_command, malformed_command_buffer % protocol.MAGIC_REQ_STRING, is_response=True)
+ self.assertRaises(
+ ProtocolError,
+ protocol.parse_binary_command,
+ array.array("c", malformed_command_buffer % "DDDD")
+ )
+ self.assertRaises(
+ ProtocolError,
+ protocol.parse_binary_command,
+ array.array("c", malformed_command_buffer % protocol.MAGIC_RES_STRING),
+ is_response=False
+ )
+ self.assertRaises(
+ ProtocolError,
+ protocol.parse_binary_command,
+ array.array("c", malformed_command_buffer % protocol.MAGIC_REQ_STRING),
+ is_response=True
+ )
# Raise unknown command errors
unassigned_gearman_command = 1234
unknown_command_buffer = struct.pack('!4sII', protocol.MAGIC_RES_STRING, unassigned_gearman_command, 0)
+ unknown_command_buffer = array.array("c", unknown_command_buffer)
self.assertRaises(ProtocolError, protocol.parse_binary_command, unknown_command_buffer)
# Raise an error on our imaginary GEARMAN_COMMAND_TEXT_COMMAND
imaginary_command_buffer = struct.pack('!4sII4s', protocol.MAGIC_RES_STRING, protocol.GEARMAN_COMMAND_TEXT_COMMAND, 4, 'ABCD')
+ imaginary_command_buffer = array.array("c", imaginary_command_buffer)
self.assertRaises(ProtocolError, protocol.parse_binary_command, imaginary_command_buffer)
# Raise an error on receiving an unexpected payload
unexpected_payload_command_buffer = struct.pack('!4sII4s', protocol.MAGIC_RES_STRING, protocol.GEARMAN_COMMAND_NOOP, 4, 'ABCD')
+ unexpected_payload_command_buffer = array.array("c", unexpected_payload_command_buffer)
self.assertRaises(ProtocolError, protocol.parse_binary_command, unexpected_payload_command_buffer)
def test_parsing_request(self):
# Test parsing a request for a job (server side parsing)
grab_job_command_buffer = struct.pack('!4sII', protocol.MAGIC_REQ_STRING, protocol.GEARMAN_COMMAND_GRAB_JOB_UNIQ, 0)
+ grab_job_command_buffer = array.array("c", grab_job_command_buffer)
cmd_type, cmd_args, cmd_len = protocol.parse_binary_command(grab_job_command_buffer, is_response=False)
self.assertEquals(cmd_type, protocol.GEARMAN_COMMAND_GRAB_JOB_UNIQ)
self.assertEquals(cmd_args, dict())
@@ -45,6 +64,7 @@ def test_parsing_request(self):
def test_parsing_without_enough_data(self):
# Test that we return with nothing to do... received a partial packet
not_enough_data_command_buffer = struct.pack('!4s', protocol.MAGIC_RES_STRING)
+ not_enough_data_command_buffer = array.array("c", not_enough_data_command_buffer)
cmd_type, cmd_args, cmd_len = protocol.parse_binary_command(not_enough_data_command_buffer)
self.assertEquals(cmd_type, None)
self.assertEquals(cmd_args, None)
@@ -52,6 +72,7 @@ def test_parsing_without_enough_data(self):
# Test that we return with nothing to do... received a partial packet (expected binary payload of size 4, got 0)
not_enough_data_command_buffer = struct.pack('!4sII', protocol.MAGIC_RES_STRING, protocol.GEARMAN_COMMAND_ECHO_RES, 4)
+ not_enough_data_command_buffer = array.array("c", not_enough_data_command_buffer)
cmd_type, cmd_args, cmd_len = protocol.parse_binary_command(not_enough_data_command_buffer)
self.assertEquals(cmd_type, None)
self.assertEquals(cmd_args, None)
@@ -59,6 +80,7 @@ def test_parsing_without_enough_data(self):
def test_parsing_no_args(self):
noop_command_buffer = struct.pack('!4sII', protocol.MAGIC_RES_STRING, protocol.GEARMAN_COMMAND_NOOP, 0)
+ noop_command_buffer = array.array("c", noop_command_buffer)
cmd_type, cmd_args, cmd_len = protocol.parse_binary_command(noop_command_buffer)
self.assertEquals(cmd_type, protocol.GEARMAN_COMMAND_NOOP)
self.assertEquals(cmd_args, dict())
@@ -67,6 +89,7 @@ def test_parsing_no_args(self):
def test_parsing_single_arg(self):
echoed_string = 'abcd'
echo_command_buffer = struct.pack('!4sII4s', protocol.MAGIC_RES_STRING, protocol.GEARMAN_COMMAND_ECHO_RES, 4, echoed_string)
+ echo_command_buffer = array.array("c", echo_command_buffer)
cmd_type, cmd_args, cmd_len = protocol.parse_binary_command(echo_command_buffer)
self.assertEquals(cmd_type, protocol.GEARMAN_COMMAND_ECHO_RES)
self.assertEquals(cmd_args, dict(data=echoed_string))
@@ -77,6 +100,8 @@ def test_parsing_single_arg_with_extra_data(self):
excess_bytes = 5
excess_data = echoed_string + (protocol.NULL_CHAR * excess_bytes)
excess_echo_command_buffer = struct.pack('!4sII9s', protocol.MAGIC_RES_STRING, protocol.GEARMAN_COMMAND_ECHO_RES, 4, excess_data)
+ excess_echo_command_buffer = array.array("c", excess_echo_command_buffer)
+
cmd_type, cmd_args, cmd_len = protocol.parse_binary_command(excess_echo_command_buffer)
self.assertEquals(cmd_type, protocol.GEARMAN_COMMAND_ECHO_RES)
self.assertEquals(cmd_args, dict(data=echoed_string))
@@ -89,6 +114,7 @@ def test_parsing_multiple_args(self):
payload_size = len(binary_payload)
uniq_command_buffer = struct.pack('!4sII%ds' % payload_size, protocol.MAGIC_RES_STRING, protocol.GEARMAN_COMMAND_JOB_ASSIGN_UNIQ, payload_size, binary_payload)
+ uniq_command_buffer = array.array("c", uniq_command_buffer)
cmd_type, cmd_args, cmd_len = protocol.parse_binary_command(uniq_command_buffer)
self.assertEquals(cmd_type, protocol.GEARMAN_COMMAND_JOB_ASSIGN_UNIQ)
self.assertEquals(cmd_args, dict(job_handle='test', task='function', unique='identifier', data=expected_data))
@@ -195,31 +221,31 @@ class ProtocolTextCommandsTest(unittest.TestCase):
# Begin parsing tests #
#######################
def test_parsing_errors(self):
- received_data = "Hello\x00there\n"
+ received_data = array.array("c", "Hello\x00there\n")
self.assertRaises(ProtocolError, protocol.parse_text_command, received_data)
def test_parsing_without_enough_data(self):
- received_data = "Hello there"
+ received_data = array.array("c", "Hello there")
cmd_type, cmd_response, cmd_len = protocol.parse_text_command(received_data)
self.assertEquals(cmd_type, None)
self.assertEquals(cmd_response, None)
self.assertEquals(cmd_len, 0)
def test_parsing_single_line(self):
- received_data = "Hello there\n"
+ received_data = array.array("c", "Hello there\n")
cmd_type, cmd_response, cmd_len = protocol.parse_text_command(received_data)
self.assertEquals(cmd_type, protocol.GEARMAN_COMMAND_TEXT_COMMAND)
- self.assertEquals(cmd_response, dict(raw_text=received_data.strip()))
+ self.assertEquals(cmd_response, dict(raw_text=received_data.tostring().strip()))
self.assertEquals(cmd_len, len(received_data))
def test_parsing_multi_line(self):
- sentence_one = "Hello there\n"
- sentence_two = "My name is bob\n"
+ sentence_one = array.array("c", "Hello there\n")
+ sentence_two = array.array("c", "My name is bob\n")
received_data = sentence_one + sentence_two
cmd_type, cmd_response, cmd_len = protocol.parse_text_command(received_data)
self.assertEquals(cmd_type, protocol.GEARMAN_COMMAND_TEXT_COMMAND)
- self.assertEquals(cmd_response, dict(raw_text=sentence_one.strip()))
+ self.assertEquals(cmd_response, dict(raw_text=sentence_one.tostring().strip()))
self.assertEquals(cmd_len, len(sentence_one))
def test_packing_errors(self):
Something went wrong with that request. Please try again.