Skip to content
Permalink
Browse files

clean and format codes

  • Loading branch information...
numb3r3 committed Sep 12, 2019
1 parent f60de64 commit edba197a22edb809d5336ff755b1db623e04bb28
Showing with 22 additions and 11 deletions.
  1. +21 −11 gnes/client/base.py
  2. +1 −0 gnes/client/stream.py
@@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.


import grpc
import zmq
from termcolor import colored
@@ -26,12 +25,14 @@


class ResponseHandler:

def __init__(self, h: 'ResponseHandler' = None):
self.routes = {k: v for k, v in h.routes.items()} if h else {}
self.logger = set_logger(self.__class__.__name__)
self._context = None

def register(self, resp_type: Union[List, Tuple, type]):

def decorator(f):
if isinstance(resp_type, list) or isinstance(resp_type, tuple):
for t in resp_type:
@@ -43,12 +44,16 @@ def decorator(f):
return decorator

def call_routes(self, resp: 'gnes_pb2.Response'):

def get_default_fn(r_type):
self.logger.warning('cant find handler for response type: %s, fall back to the default handler' % r_type)
self.logger.warning(
'cant find handler for response type: %s, fall back to the default handler'
% r_type)
f = self.routes.get(r_type, self.routes[NotImplementedError])
return f

self.logger.info('received a response for request %d' % resp.request_id)
self.logger.info(
'received a response for request %d' % resp.request_id)
if resp.WhichOneof('body'):
body = getattr(resp, resp.WhichOneof('body'))
resp_type = type(body)
@@ -70,14 +75,17 @@ def __init__(self, args):
self.logger = set_logger(self.__class__.__name__, self.args.verbose)
self.ctx = zmq.Context()
self.ctx.setsockopt(zmq.LINGER, 0)
self.receiver, recv_addr = build_socket(self.ctx, self.args.host_in, self.args.port_in, self.args.socket_in,
getattr(self, 'identity', None))
self.sender, send_addr = build_socket(self.ctx, self.args.host_out, self.args.port_out, self.args.socket_out,
self.receiver, recv_addr = build_socket(
self.ctx, self.args.host_in, self.args.port_in,
self.args.socket_in, getattr(self, 'identity', None))
self.sender, send_addr = build_socket(self.ctx, self.args.host_out,
self.args.port_out,
self.args.socket_out,
getattr(self, 'identity', None))
self.logger.info(
'input %s:%s\t output %s:%s' % (
self.args.host_in, colored(self.args.port_in, 'yellow'),
self.args.host_out, colored(self.args.port_out, 'yellow')))
'input %s:%s\t output %s:%s' %
(self.args.host_in, colored(self.args.port_in, 'yellow'),
self.args.host_out, colored(self.args.port_out, 'yellow')))

def __enter__(self):
return self
@@ -95,7 +103,10 @@ def send_message(self, message: "gnes_pb2.Message", timeout: int = -1):
send_message(self.sender, message, timeout=timeout)

def recv_message(self, timeout: int = -1) -> gnes_pb2.Message:
r = recv_message(self.receiver, timeout=timeout, check_version=self.args.check_version)
r = recv_message(
self.receiver,
timeout=timeout,
check_version=self.args.check_version)
self.logger.debug('recv a message: %s' % r.envelope)
return r

@@ -162,4 +173,3 @@ def open(self):
def close(self):
self._channel.close()
self._stub = None
self.total_response = 0
@@ -37,6 +37,7 @@ def close(self):
self._pool.shutdown(wait=True)
super().close()


class StreamingClient(GrpcClient):
handler = ResponseHandler(GrpcClient.handler)

0 comments on commit edba197

Please sign in to comment.
You can’t perform that action at this time.