Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Use array.array('c') as a type of incomming buffer

A connection incoming buffer is extended using += operator. This leads
to unnecessary memory allocations and significant performance drops when
transferring big amounts of data (i. e. a few megabytes). Using of the array('c')
(an array of bytes) data type enables us to handle incoming data efficiently.
  • Loading branch information...
commit 31188de85d267bf249806b3db00dfb63940e0b1f 1 parent 514eca0
@malor malor authored
View
5 gearman/connection.py
@@ -1,3 +1,4 @@
+import array
import collections
import logging
import socket
@@ -46,7 +47,7 @@ def _reset_connection(self):
self._is_server_side = None
# Reset all our raw data buffers
- self._incoming_buffer = ''
+ self._incoming_buffer = array.array('c')
self._outgoing_buffer = ''
# Toss all commands we may have sent or received
@@ -149,7 +150,7 @@ def read_data_from_socket(self, bytes_to_read=4096):
if len(recv_buffer) == 0:
self.throw_exception(message='remote disconnected')
- self._incoming_buffer += recv_buffer
+ self._incoming_buffer.fromstring(recv_buffer)
return len(self._incoming_buffer)
def _unpack_command(self, given_buffer):
View
3  gearman/protocol.py
@@ -203,6 +203,7 @@ def parse_binary_command(in_buffer, is_response=True):
split_arguments = []
if len(expected_cmd_params) > 0:
+ binary_payload = binary_payload.tostring()
split_arguments = binary_payload.split(NULL_CHAR, len(expected_cmd_params) - 1)
elif binary_payload:
raise ProtocolError('Expected no binary payload: %s' % get_command_name(cmd_type))
@@ -262,7 +263,7 @@ def parse_text_command(in_buffer):
if '\n' not in in_buffer:
return cmd_type, cmd_args, cmd_len
- text_command, in_buffer = in_buffer.split('\n', 1)
+ text_command, in_buffer = in_buffer.tostring().split('\n', 1)
if NULL_CHAR in text_command:
raise ProtocolError('Received unexpected character: %s' % text_command)
View
46 tests/protocol_tests.py
@@ -1,3 +1,4 @@
+import array
import struct
import unittest
@@ -17,26 +18,44 @@ def test_parsing_errors(self):
malformed_command_buffer = "%sAAAABBBBCCCC"
# Raise malformed magic exceptions
- self.assertRaises(ProtocolError, protocol.parse_binary_command, malformed_command_buffer % "DDDD")
- self.assertRaises(ProtocolError, protocol.parse_binary_command, malformed_command_buffer % protocol.MAGIC_RES_STRING, is_response=False)
- self.assertRaises(ProtocolError, protocol.parse_binary_command, malformed_command_buffer % protocol.MAGIC_REQ_STRING, is_response=True)
+ self.assertRaises(
+ ProtocolError,
+ protocol.parse_binary_command,
+ array.array("c", malformed_command_buffer % "DDDD")
+ )
+ self.assertRaises(
+ ProtocolError,
+ protocol.parse_binary_command,
+ array.array("c", malformed_command_buffer % protocol.MAGIC_RES_STRING),
+ is_response=False
+ )
+ self.assertRaises(
+ ProtocolError,
+ protocol.parse_binary_command,
+ array.array("c", malformed_command_buffer % protocol.MAGIC_REQ_STRING),
+ is_response=True
+ )
# Raise unknown command errors
unassigned_gearman_command = 1234
unknown_command_buffer = struct.pack('!4sII', protocol.MAGIC_RES_STRING, unassigned_gearman_command, 0)
+ unknown_command_buffer = array.array("c", unknown_command_buffer)
self.assertRaises(ProtocolError, protocol.parse_binary_command, unknown_command_buffer)
# Raise an error on our imaginary GEARMAN_COMMAND_TEXT_COMMAND
imaginary_command_buffer = struct.pack('!4sII4s', protocol.MAGIC_RES_STRING, protocol.GEARMAN_COMMAND_TEXT_COMMAND, 4, 'ABCD')
+ imaginary_command_buffer = array.array("c", imaginary_command_buffer)
self.assertRaises(ProtocolError, protocol.parse_binary_command, imaginary_command_buffer)
# Raise an error on receiving an unexpected payload
unexpected_payload_command_buffer = struct.pack('!4sII4s', protocol.MAGIC_RES_STRING, protocol.GEARMAN_COMMAND_NOOP, 4, 'ABCD')
+ unexpected_payload_command_buffer = array.array("c", unexpected_payload_command_buffer)
self.assertRaises(ProtocolError, protocol.parse_binary_command, unexpected_payload_command_buffer)
def test_parsing_request(self):
# Test parsing a request for a job (server side parsing)
grab_job_command_buffer = struct.pack('!4sII', protocol.MAGIC_REQ_STRING, protocol.GEARMAN_COMMAND_GRAB_JOB_UNIQ, 0)
+ grab_job_command_buffer = array.array("c", grab_job_command_buffer)
cmd_type, cmd_args, cmd_len = protocol.parse_binary_command(grab_job_command_buffer, is_response=False)
self.assertEquals(cmd_type, protocol.GEARMAN_COMMAND_GRAB_JOB_UNIQ)
self.assertEquals(cmd_args, dict())
@@ -45,6 +64,7 @@ def test_parsing_request(self):
def test_parsing_without_enough_data(self):
# Test that we return with nothing to do... received a partial packet
not_enough_data_command_buffer = struct.pack('!4s', protocol.MAGIC_RES_STRING)
+ not_enough_data_command_buffer = array.array("c", not_enough_data_command_buffer)
cmd_type, cmd_args, cmd_len = protocol.parse_binary_command(not_enough_data_command_buffer)
self.assertEquals(cmd_type, None)
self.assertEquals(cmd_args, None)
@@ -52,6 +72,7 @@ def test_parsing_without_enough_data(self):
# Test that we return with nothing to do... received a partial packet (expected binary payload of size 4, got 0)
not_enough_data_command_buffer = struct.pack('!4sII', protocol.MAGIC_RES_STRING, protocol.GEARMAN_COMMAND_ECHO_RES, 4)
+ not_enough_data_command_buffer = array.array("c", not_enough_data_command_buffer)
cmd_type, cmd_args, cmd_len = protocol.parse_binary_command(not_enough_data_command_buffer)
self.assertEquals(cmd_type, None)
self.assertEquals(cmd_args, None)
@@ -59,6 +80,7 @@ def test_parsing_without_enough_data(self):
def test_parsing_no_args(self):
noop_command_buffer = struct.pack('!4sII', protocol.MAGIC_RES_STRING, protocol.GEARMAN_COMMAND_NOOP, 0)
+ noop_command_buffer = array.array("c", noop_command_buffer)
cmd_type, cmd_args, cmd_len = protocol.parse_binary_command(noop_command_buffer)
self.assertEquals(cmd_type, protocol.GEARMAN_COMMAND_NOOP)
self.assertEquals(cmd_args, dict())
@@ -67,6 +89,7 @@ def test_parsing_no_args(self):
def test_parsing_single_arg(self):
echoed_string = 'abcd'
echo_command_buffer = struct.pack('!4sII4s', protocol.MAGIC_RES_STRING, protocol.GEARMAN_COMMAND_ECHO_RES, 4, echoed_string)
+ echo_command_buffer = array.array("c", echo_command_buffer)
cmd_type, cmd_args, cmd_len = protocol.parse_binary_command(echo_command_buffer)
self.assertEquals(cmd_type, protocol.GEARMAN_COMMAND_ECHO_RES)
self.assertEquals(cmd_args, dict(data=echoed_string))
@@ -77,6 +100,8 @@ def test_parsing_single_arg_with_extra_data(self):
excess_bytes = 5
excess_data = echoed_string + (protocol.NULL_CHAR * excess_bytes)
excess_echo_command_buffer = struct.pack('!4sII9s', protocol.MAGIC_RES_STRING, protocol.GEARMAN_COMMAND_ECHO_RES, 4, excess_data)
+ excess_echo_command_buffer = array.array("c", excess_echo_command_buffer)
+
cmd_type, cmd_args, cmd_len = protocol.parse_binary_command(excess_echo_command_buffer)
self.assertEquals(cmd_type, protocol.GEARMAN_COMMAND_ECHO_RES)
self.assertEquals(cmd_args, dict(data=echoed_string))
@@ -89,6 +114,7 @@ def test_parsing_multiple_args(self):
payload_size = len(binary_payload)
uniq_command_buffer = struct.pack('!4sII%ds' % payload_size, protocol.MAGIC_RES_STRING, protocol.GEARMAN_COMMAND_JOB_ASSIGN_UNIQ, payload_size, binary_payload)
+ uniq_command_buffer = array.array("c", uniq_command_buffer)
cmd_type, cmd_args, cmd_len = protocol.parse_binary_command(uniq_command_buffer)
self.assertEquals(cmd_type, protocol.GEARMAN_COMMAND_JOB_ASSIGN_UNIQ)
self.assertEquals(cmd_args, dict(job_handle='test', task='function', unique='identifier', data=expected_data))
@@ -195,31 +221,31 @@ class ProtocolTextCommandsTest(unittest.TestCase):
# Begin parsing tests #
#######################
def test_parsing_errors(self):
- received_data = "Hello\x00there\n"
+ received_data = array.array("c", "Hello\x00there\n")
self.assertRaises(ProtocolError, protocol.parse_text_command, received_data)
def test_parsing_without_enough_data(self):
- received_data = "Hello there"
+ received_data = array.array("c", "Hello there")
cmd_type, cmd_response, cmd_len = protocol.parse_text_command(received_data)
self.assertEquals(cmd_type, None)
self.assertEquals(cmd_response, None)
self.assertEquals(cmd_len, 0)
def test_parsing_single_line(self):
- received_data = "Hello there\n"
+ received_data = array.array("c", "Hello there\n")
cmd_type, cmd_response, cmd_len = protocol.parse_text_command(received_data)
self.assertEquals(cmd_type, protocol.GEARMAN_COMMAND_TEXT_COMMAND)
- self.assertEquals(cmd_response, dict(raw_text=received_data.strip()))
+ self.assertEquals(cmd_response, dict(raw_text=received_data.tostring().strip()))
self.assertEquals(cmd_len, len(received_data))
def test_parsing_multi_line(self):
- sentence_one = "Hello there\n"
- sentence_two = "My name is bob\n"
+ sentence_one = array.array("c", "Hello there\n")
+ sentence_two = array.array("c", "My name is bob\n")
received_data = sentence_one + sentence_two
cmd_type, cmd_response, cmd_len = protocol.parse_text_command(received_data)
self.assertEquals(cmd_type, protocol.GEARMAN_COMMAND_TEXT_COMMAND)
- self.assertEquals(cmd_response, dict(raw_text=sentence_one.strip()))
+ self.assertEquals(cmd_response, dict(raw_text=sentence_one.tostring().strip()))
self.assertEquals(cmd_len, len(sentence_one))
def test_packing_errors(self):
Please sign in to comment.
Something went wrong with that request. Please try again.