Skip to content
This repository has been archived by the owner on Feb 22, 2020. It is now read-only.

Commit

Permalink
feat(frontend): new route time table
Browse files Browse the repository at this point in the history
  • Loading branch information
hanhxiao committed Sep 12, 2019
1 parent c31f21d commit 6c8017c
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 25 deletions.
12 changes: 3 additions & 9 deletions gnes/proto/__init__.py
Expand Up @@ -103,13 +103,7 @@ def array2blob(x: np.ndarray) -> 'gnes_pb2.NdArray':


def router2str(m: 'gnes_pb2.Message') -> str:
route_str = []
for r in m.envelope.routes:
if r.num_replicas and r.num_replicas > 1:
route_str.append('%s%s' % (r.service, colored(' x%d' % r.num_replicas, 'yellow')))
else:
route_str.append(r.service)

route_str = [r.service for r in m.envelope.routes]
return colored('▸', 'green').join(route_str)


Expand All @@ -122,8 +116,8 @@ def add_route(evlp: 'gnes_pb2.Envelope', name: str):
def merge_routes(msg: 'gnes_pb2.Message', prev_msgs: List['gnes_pb2.Message'], idx: int = -1):
r = msg.envelope.routes[idx]
if len(msg.envelope.routes) > 1:
msg.envelope.routes[idx - 1].service = '[%s]' % ', '.join([r.service for r in msg.envelope.routes])
r.num_replicas = len(prev_msgs)
msg.envelope.routes[idx - 1].service = colored('[%s]' % ', '.join(
[m.envelope.routes[idx - 1].service for m in prev_msgs]), 'yellow')
r.first_start_time.CopyFrom(
sorted((m.envelope.routes[idx].start_time for m in prev_msgs),
key=lambda x: (x.seconds, x.nanos))[0])
Expand Down
1 change: 0 additions & 1 deletion gnes/proto/gnes.proto
Expand Up @@ -93,7 +93,6 @@ message Envelope {
google.protobuf.Timestamp end_time = 3;
google.protobuf.Timestamp first_start_time = 4;
google.protobuf.Timestamp last_end_time = 5;
uint32 num_replicas = 6;
}
repeated route routes = 6;

Expand Down
29 changes: 17 additions & 12 deletions gnes/service/frontend.py
Expand Up @@ -95,13 +95,18 @@ def remove_envelope(self, m: 'gnes_pb2.Message'):
route_time.append((k.service, d))
sum_duration += d

route_time.append(('system', total_duration - sum_duration))
route_time.append(('total', total_duration))
route_time.append(('job', sum_duration))

route_table = '\n'.join(
['%40s\t%.3fs\t%2.0f%%' % (k[0], k[1], k[1] / total_duration * 100) for k in
sorted(route_time, key=lambda x: x[1], reverse=True)])
def get_table_str(time_table):
return '\n'.join(
['%40s\t%3.3fs\t%3d%%' % (k[0], k[1], k[1] / total_duration * 100) for k in
sorted(time_table, key=lambda x: x[1], reverse=True)])

summary = [('system', total_duration - sum_duration),
('total', total_duration),
('job', sum_duration)]

route_table = ('\n%s\n' % ('-' * 80)).join(
['%40s\t%-6s\t%3s' % ('Breakdown', 'Time', 'Percent'), get_table_str(route_time),
get_table_str(summary)])
self.logger.info('route table: \n%s' % route_table)

return resp
Expand All @@ -111,12 +116,12 @@ def get_duration(start_time, end_time):
d_s = end_time.seconds - start_time.seconds
d_n = end_time.nanos - start_time.nanos
if d_s < 0 and d_n > 0:
d_s += 1
d_n -= 1e9
d_s = max(d_s + 1, 0)
d_n = max(d_n - 1e9, 0)
elif d_s > 0 and d_n < 0:
d_s -= 1
d_n += 1e9
return d_s + d_n / 1e9
d_s = max(d_s - 1, 0)
d_n = max(d_n + 1e9, 0)
return max(d_s + d_n / 1e9, 0)

def Call(self, request, context):
with self.zmq_context as zmq_client:
Expand Down
6 changes: 4 additions & 2 deletions tests/test_client_cli.py
@@ -1,10 +1,11 @@
import unittest
import os
import unittest

from gnes.cli.parser import set_frontend_parser, set_router_parser, set_client_cli_parser
from gnes.client.cli import CLIClient
from gnes.service.base import SocketType
from gnes.service.frontend import FrontendService
from gnes.service.router import RouterService
from gnes.client.cli import CLIClient


class TestCLI(unittest.TestCase):
Expand All @@ -21,6 +22,7 @@ def test_cli(self):
])

args = set_frontend_parser().parse_args([
'--show_route_table'
])

p_args = set_router_parser().parse_args([
Expand Down
2 changes: 1 addition & 1 deletion tests/test_dict_indexer.py
Expand Up @@ -33,7 +33,7 @@ def setUp(self):
def test_pymode(self):
os.unsetenv('http_proxy')
os.unsetenv('https_proxy')
args = set_frontend_parser().parse_args([])
args = set_frontend_parser().parse_args(['--show_route_table'])

p_args = set_preprocessor_parser().parse_args([
'--port_in', str(args.port_out),
Expand Down
1 change: 1 addition & 0 deletions tests/test_grpc_service.py
Expand Up @@ -56,6 +56,7 @@ def setUp(self):
'--grpc_port', '9999',
'--port_in', str(self.s_args.port_out),
'--port_out', str(self.s_args.port_in),
'--show_route_table'
])

def test_grpc_empty_service(self):
Expand Down
2 changes: 2 additions & 0 deletions tests/test_service_mgr.py
Expand Up @@ -31,6 +31,7 @@ def _test_multiple_router(self, backend='thread', num_parallel=5):
def _test_grpc_multiple_router(self, backend='thread', num_parallel=5):
args = set_frontend_parser().parse_args([
'--grpc_host', '127.0.0.1',
'--show_route_table'
])

p_args = set_router_parser().parse_args([
Expand All @@ -54,6 +55,7 @@ def _test_grpc_multiple_router(self, backend='thread', num_parallel=5):
def _test_grpc_multiple_pub(self, backend='thread', num_parallel=5):
args = set_frontend_parser().parse_args([
'--grpc_host', '127.0.0.1',
'--show_route_table'
])

p_args = set_router_parser().parse_args([
Expand Down

0 comments on commit 6c8017c

Please sign in to comment.