Skip to content
This repository has been archived by the owner on Feb 22, 2020. It is now read-only.

Commit

Permalink
fix(service): fix send/recv for better compatability
Browse files Browse the repository at this point in the history
  • Loading branch information
hanhxiao committed Sep 25, 2019
1 parent c954e25 commit b6f2cda
Showing 1 changed file with 13 additions and 18 deletions.
31 changes: 13 additions & 18 deletions gnes/proto/__init__.py
Expand Up @@ -206,13 +206,13 @@ def extract_bytes_from_msg(msg: 'gnes_pb2.Message') -> Tuple:


def fill_raw_bytes_to_msg(msg: 'gnes_pb2.Message', msg_data: List[bytes]):
doc_byte_type = msg_data[3].decode()
chunk_byte_type = msg_data[4].decode()
doc_bytes_len = int(msg_data[5])
chunk_bytes_len = int(msg_data[6])
doc_byte_type = msg_data[2].decode()
chunk_byte_type = msg_data[3].decode()
doc_bytes_len = int(msg_data[4])
chunk_bytes_len = int(msg_data[5])

doc_bytes = msg_data[7:(7 + doc_bytes_len)]
chunk_bytes = msg_data[(7 + doc_bytes_len):]
doc_bytes = msg_data[6:(6 + doc_bytes_len)]
chunk_bytes = msg_data[(6 + doc_bytes_len):]

if len(chunk_bytes) != chunk_bytes_len:
raise ValueError('"chunk_bytes_len"=%d in message, but the actual length is %d' % (
Expand Down Expand Up @@ -261,16 +261,16 @@ def send_message(sock: 'zmq.Socket', msg: 'gnes_pb2.Message', timeout: int = -1,
sock.setsockopt(zmq.SNDTIMEO, -1)

if not squeeze_pb:
sock.send_multipart([msg.envelope.client_id.encode(), b'0', msg.SerializeToString()])
sock.send_multipart([msg.envelope.client_id.encode(), msg.SerializeToString()])
else:
doc_bytes, doc_byte_type, chunk_bytes, chunk_byte_type = extract_bytes_from_msg(msg)
# now raw_bytes are removed from message, hoping for faster de/serialization
sock.send_multipart(
[msg.envelope.client_id.encode(), # 0
b'1', msg.SerializeToString(), # 1, 2
doc_byte_type, chunk_byte_type, # 3, 4
b'%d' % len(doc_bytes), b'%d' % len(chunk_bytes), # 5, 6
*doc_bytes, *chunk_bytes]) # 7, 8
msg.SerializeToString(), # 1
doc_byte_type, chunk_byte_type, # 2, 3
b'%d' % len(doc_bytes), b'%d' % len(chunk_bytes), # 4, 5
*doc_bytes, *chunk_bytes]) # 6, 7
except zmq.error.Again:
raise TimeoutError(
'cannot send message to sock %s after timeout=%dms, please check the following:'
Expand All @@ -284,7 +284,6 @@ def send_message(sock: 'zmq.Socket', msg: 'gnes_pb2.Message', timeout: int = -1,

def recv_message(sock: 'zmq.Socket', timeout: int = -1, check_version: bool = False, **kwargs) -> Optional[
'gnes_pb2.Message']:
response = []
try:
if timeout > 0:
sock.setsockopt(zmq.RCVTIMEO, timeout)
Expand All @@ -293,19 +292,15 @@ def recv_message(sock: 'zmq.Socket', timeout: int = -1, check_version: bool = Fa

msg = gnes_pb2.Message()
msg_data = sock.recv_multipart()
squeeze_pb = (msg_data[1] == b'1')
msg.ParseFromString(msg_data[2])

msg.ParseFromString(msg_data[1])
if check_version:
check_msg_version(msg)

# now we have a barebone msg, we need to fill in data
if squeeze_pb:
if len(msg_data) > 2:
fill_raw_bytes_to_msg(msg, msg_data)
return msg

except ValueError:
raise ValueError('received a wrongly-formatted request (expected 4 frames, got %d)' % len(response))
except zmq.error.Again:
raise TimeoutError(
'no response from sock %s after timeout=%dms, please check the following:'
Expand Down

0 comments on commit b6f2cda

Please sign in to comment.