Permalink
Browse files

取消在select中检测writeable的fd,因为这有可能一直在发生,导致CPU100%

  • Loading branch information...
RitterHou committed Jul 21, 2018
1 parent b041e41 commit adface6868b25ceec1286c1f07e98e4904075a48
Showing with 7 additions and 31 deletions.
  1. +7 −31 dubbo/connection/connections.py
@@ -54,7 +54,7 @@ def get(self, host, request_param, timeout=None):
self.conn_events[invoke_id] = event
# 发送数据
logger.debug('Request has been send for request id {}'.format(invoke_id))
conn.send(request_data)
conn.write(request_data)
event.wait(timeout)
del self.conn_events[invoke_id]

@@ -171,7 +171,7 @@ def _parse_head(self, data, conn):
logger.debug('❤ request -> {}'.format(conn.remote_host()))
msg_id = data[4:12]
heartbeat_response = CLI_HEARTBEAT_RES_HEAD + list(msg_id) + CLI_HEARTBEAT_TAIL
conn.send(bytearray(heartbeat_response))
conn.write(bytearray(heartbeat_response))
return body_length, 3, None if body_length > 0 else DEFAULT_READ_PARAMS
elif heartbeat == 1:
logger.debug('❤ response -> {}'.format(conn.remote_host()))
@@ -263,7 +263,7 @@ def _check_conn(self, host):
self.client_heartbeats[host] += 1
invoke_id = list(bytearray(pack('!q', get_invoke_id())))
req = CLI_HEARTBEAT_REQ_HEAD + invoke_id + CLI_HEARTBEAT_TAIL
conn.send(bytearray(req))
conn.write(bytearray(req))
logger.debug('Head has been send for request id {}'.format(invoke_id))


@@ -280,12 +280,10 @@ def _read_from_server(self):
while 1:
try:
conns = self._connection_pool.values()
readable, writeable, exceptional = select.select(conns, conns, [], self.select_timeout)
readable, writeable, exceptional = select.select(conns, [], [], self.select_timeout)
except select.error as e:
logger.exception(e)
break
for conn in writeable:
conn.write()
for conn in readable:
conn.read(self._callback)

@@ -318,8 +316,6 @@ def __init__(self, host, port):

self.read_length, self.read_type, self.invoke_id = DEFAULT_READ_PARAMS
self.read_buffer = []
self.write_buffer = []
self.write_lock = threading.Lock()

self.last_active = time.time()

@@ -331,33 +327,13 @@ def fileno(self):
"""
return self.__sock.fileno()

def send(self, data):
"""
客户端执行发送操作
:param data:
:return:
"""
self.write_lock.acquire()
try:
self.write_buffer.extend(list(data))
finally:
self.write_lock.release()

def write(self):
def write(self, data):
"""
向远程主机写数据
:return:
"""
if len(self.write_buffer) == 0:
return
self.write_lock.acquire()
try:
if len(self.write_buffer) > 0:
length = self.__sock.send(bytearray(self.write_buffer))
self.write_buffer = self.write_buffer[length:]
self.last_active = time.time()
finally:
self.write_lock.release()
# TODO 这里有可能存在一次无法发送完毕的情况
self.__sock.send(data)

def read(self, callback):
"""

0 comments on commit adface6

Please sign in to comment.