Skip to content
This repository has been archived by the owner on Apr 18, 2018. It is now read-only.

Commit

Permalink
Use array.array('c') as a type of incomming buffer
Browse files Browse the repository at this point in the history
A connection incoming buffer is extended using += operator. This leads
to unnecessary memory allocations and significant performance drops when
transferring big amounts of data (i. e. a few megabytes). Using of the array('c')
(an array of bytes) data type enables us to handle incoming data efficiently.
  • Loading branch information
malor committed Feb 14, 2013
1 parent 514eca0 commit 31188de
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 13 deletions.
5 changes: 3 additions & 2 deletions gearman/connection.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import array
import collections
import logging
import socket
Expand Down Expand Up @@ -46,7 +47,7 @@ def _reset_connection(self):
self._is_server_side = None

# Reset all our raw data buffers
self._incoming_buffer = ''
self._incoming_buffer = array.array('c')
self._outgoing_buffer = ''

# Toss all commands we may have sent or received
Expand Down Expand Up @@ -149,7 +150,7 @@ def read_data_from_socket(self, bytes_to_read=4096):
if len(recv_buffer) == 0:
self.throw_exception(message='remote disconnected')

self._incoming_buffer += recv_buffer
self._incoming_buffer.fromstring(recv_buffer)
return len(self._incoming_buffer)

def _unpack_command(self, given_buffer):
Expand Down
3 changes: 2 additions & 1 deletion gearman/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ def parse_binary_command(in_buffer, is_response=True):
split_arguments = []

if len(expected_cmd_params) > 0:
binary_payload = binary_payload.tostring()
split_arguments = binary_payload.split(NULL_CHAR, len(expected_cmd_params) - 1)
elif binary_payload:
raise ProtocolError('Expected no binary payload: %s' % get_command_name(cmd_type))
Expand Down Expand Up @@ -262,7 +263,7 @@ def parse_text_command(in_buffer):
if '\n' not in in_buffer:
return cmd_type, cmd_args, cmd_len

text_command, in_buffer = in_buffer.split('\n', 1)
text_command, in_buffer = in_buffer.tostring().split('\n', 1)
if NULL_CHAR in text_command:
raise ProtocolError('Received unexpected character: %s' % text_command)

Expand Down
46 changes: 36 additions & 10 deletions tests/protocol_tests.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import array
import struct
import unittest

Expand All @@ -17,26 +18,44 @@ def test_parsing_errors(self):
malformed_command_buffer = "%sAAAABBBBCCCC"

# Raise malformed magic exceptions
self.assertRaises(ProtocolError, protocol.parse_binary_command, malformed_command_buffer % "DDDD")
self.assertRaises(ProtocolError, protocol.parse_binary_command, malformed_command_buffer % protocol.MAGIC_RES_STRING, is_response=False)
self.assertRaises(ProtocolError, protocol.parse_binary_command, malformed_command_buffer % protocol.MAGIC_REQ_STRING, is_response=True)
self.assertRaises(
ProtocolError,
protocol.parse_binary_command,
array.array("c", malformed_command_buffer % "DDDD")
)
self.assertRaises(
ProtocolError,
protocol.parse_binary_command,
array.array("c", malformed_command_buffer % protocol.MAGIC_RES_STRING),
is_response=False
)
self.assertRaises(
ProtocolError,
protocol.parse_binary_command,
array.array("c", malformed_command_buffer % protocol.MAGIC_REQ_STRING),
is_response=True
)

# Raise unknown command errors
unassigned_gearman_command = 1234
unknown_command_buffer = struct.pack('!4sII', protocol.MAGIC_RES_STRING, unassigned_gearman_command, 0)
unknown_command_buffer = array.array("c", unknown_command_buffer)
self.assertRaises(ProtocolError, protocol.parse_binary_command, unknown_command_buffer)

# Raise an error on our imaginary GEARMAN_COMMAND_TEXT_COMMAND
imaginary_command_buffer = struct.pack('!4sII4s', protocol.MAGIC_RES_STRING, protocol.GEARMAN_COMMAND_TEXT_COMMAND, 4, 'ABCD')
imaginary_command_buffer = array.array("c", imaginary_command_buffer)
self.assertRaises(ProtocolError, protocol.parse_binary_command, imaginary_command_buffer)

# Raise an error on receiving an unexpected payload
unexpected_payload_command_buffer = struct.pack('!4sII4s', protocol.MAGIC_RES_STRING, protocol.GEARMAN_COMMAND_NOOP, 4, 'ABCD')
unexpected_payload_command_buffer = array.array("c", unexpected_payload_command_buffer)
self.assertRaises(ProtocolError, protocol.parse_binary_command, unexpected_payload_command_buffer)

def test_parsing_request(self):
# Test parsing a request for a job (server side parsing)
grab_job_command_buffer = struct.pack('!4sII', protocol.MAGIC_REQ_STRING, protocol.GEARMAN_COMMAND_GRAB_JOB_UNIQ, 0)
grab_job_command_buffer = array.array("c", grab_job_command_buffer)
cmd_type, cmd_args, cmd_len = protocol.parse_binary_command(grab_job_command_buffer, is_response=False)
self.assertEquals(cmd_type, protocol.GEARMAN_COMMAND_GRAB_JOB_UNIQ)
self.assertEquals(cmd_args, dict())
Expand All @@ -45,20 +64,23 @@ def test_parsing_request(self):
def test_parsing_without_enough_data(self):
# Test that we return with nothing to do... received a partial packet
not_enough_data_command_buffer = struct.pack('!4s', protocol.MAGIC_RES_STRING)
not_enough_data_command_buffer = array.array("c", not_enough_data_command_buffer)
cmd_type, cmd_args, cmd_len = protocol.parse_binary_command(not_enough_data_command_buffer)
self.assertEquals(cmd_type, None)
self.assertEquals(cmd_args, None)
self.assertEquals(cmd_len, 0)

# Test that we return with nothing to do... received a partial packet (expected binary payload of size 4, got 0)
not_enough_data_command_buffer = struct.pack('!4sII', protocol.MAGIC_RES_STRING, protocol.GEARMAN_COMMAND_ECHO_RES, 4)
not_enough_data_command_buffer = array.array("c", not_enough_data_command_buffer)
cmd_type, cmd_args, cmd_len = protocol.parse_binary_command(not_enough_data_command_buffer)
self.assertEquals(cmd_type, None)
self.assertEquals(cmd_args, None)
self.assertEquals(cmd_len, 0)

def test_parsing_no_args(self):
noop_command_buffer = struct.pack('!4sII', protocol.MAGIC_RES_STRING, protocol.GEARMAN_COMMAND_NOOP, 0)
noop_command_buffer = array.array("c", noop_command_buffer)
cmd_type, cmd_args, cmd_len = protocol.parse_binary_command(noop_command_buffer)
self.assertEquals(cmd_type, protocol.GEARMAN_COMMAND_NOOP)
self.assertEquals(cmd_args, dict())
Expand All @@ -67,6 +89,7 @@ def test_parsing_no_args(self):
def test_parsing_single_arg(self):
echoed_string = 'abcd'
echo_command_buffer = struct.pack('!4sII4s', protocol.MAGIC_RES_STRING, protocol.GEARMAN_COMMAND_ECHO_RES, 4, echoed_string)
echo_command_buffer = array.array("c", echo_command_buffer)
cmd_type, cmd_args, cmd_len = protocol.parse_binary_command(echo_command_buffer)
self.assertEquals(cmd_type, protocol.GEARMAN_COMMAND_ECHO_RES)
self.assertEquals(cmd_args, dict(data=echoed_string))
Expand All @@ -77,6 +100,8 @@ def test_parsing_single_arg_with_extra_data(self):
excess_bytes = 5
excess_data = echoed_string + (protocol.NULL_CHAR * excess_bytes)
excess_echo_command_buffer = struct.pack('!4sII9s', protocol.MAGIC_RES_STRING, protocol.GEARMAN_COMMAND_ECHO_RES, 4, excess_data)
excess_echo_command_buffer = array.array("c", excess_echo_command_buffer)

cmd_type, cmd_args, cmd_len = protocol.parse_binary_command(excess_echo_command_buffer)
self.assertEquals(cmd_type, protocol.GEARMAN_COMMAND_ECHO_RES)
self.assertEquals(cmd_args, dict(data=echoed_string))
Expand All @@ -89,6 +114,7 @@ def test_parsing_multiple_args(self):
payload_size = len(binary_payload)

uniq_command_buffer = struct.pack('!4sII%ds' % payload_size, protocol.MAGIC_RES_STRING, protocol.GEARMAN_COMMAND_JOB_ASSIGN_UNIQ, payload_size, binary_payload)
uniq_command_buffer = array.array("c", uniq_command_buffer)
cmd_type, cmd_args, cmd_len = protocol.parse_binary_command(uniq_command_buffer)
self.assertEquals(cmd_type, protocol.GEARMAN_COMMAND_JOB_ASSIGN_UNIQ)
self.assertEquals(cmd_args, dict(job_handle='test', task='function', unique='identifier', data=expected_data))
Expand Down Expand Up @@ -195,31 +221,31 @@ class ProtocolTextCommandsTest(unittest.TestCase):
# Begin parsing tests #
#######################
def test_parsing_errors(self):
received_data = "Hello\x00there\n"
received_data = array.array("c", "Hello\x00there\n")
self.assertRaises(ProtocolError, protocol.parse_text_command, received_data)

def test_parsing_without_enough_data(self):
received_data = "Hello there"
received_data = array.array("c", "Hello there")
cmd_type, cmd_response, cmd_len = protocol.parse_text_command(received_data)
self.assertEquals(cmd_type, None)
self.assertEquals(cmd_response, None)
self.assertEquals(cmd_len, 0)

def test_parsing_single_line(self):
received_data = "Hello there\n"
received_data = array.array("c", "Hello there\n")
cmd_type, cmd_response, cmd_len = protocol.parse_text_command(received_data)
self.assertEquals(cmd_type, protocol.GEARMAN_COMMAND_TEXT_COMMAND)
self.assertEquals(cmd_response, dict(raw_text=received_data.strip()))
self.assertEquals(cmd_response, dict(raw_text=received_data.tostring().strip()))
self.assertEquals(cmd_len, len(received_data))

def test_parsing_multi_line(self):
sentence_one = "Hello there\n"
sentence_two = "My name is bob\n"
sentence_one = array.array("c", "Hello there\n")
sentence_two = array.array("c", "My name is bob\n")
received_data = sentence_one + sentence_two

cmd_type, cmd_response, cmd_len = protocol.parse_text_command(received_data)
self.assertEquals(cmd_type, protocol.GEARMAN_COMMAND_TEXT_COMMAND)
self.assertEquals(cmd_response, dict(raw_text=sentence_one.strip()))
self.assertEquals(cmd_response, dict(raw_text=sentence_one.tostring().strip()))
self.assertEquals(cmd_len, len(sentence_one))

def test_packing_errors(self):
Expand Down

0 comments on commit 31188de

Please sign in to comment.