This repository has been archived by the owner on Feb 22, 2020. It is now read-only.

Commit

Merge pull request #324 from gnes-ai/feat-grpc-proxy
feat(grpc): add proxy argument to cli
mergify[bot] committed Oct 12, 2019
2 parents e64bc7a + 4055ad8 commit e3ab1aa
Showing 6 changed files with 122 additions and 26 deletions.
10 changes: 7 additions & 3 deletions gnes/cli/parser.py
@@ -185,11 +185,11 @@ def set_service_parser(parser=None):
'dump_interval will be ignored')
parser.add_argument('--parallel_backend', type=str, choices=['thread', 'process'], default='thread',
help='parallel backend of the service')
parser.add_argument('--num_parallel', type=int, default=1,
help='number of parallel services running at the same time, '
parser.add_argument('--num_parallel', '--replicas', type=int, default=1,
help='number of parallel services running at the same time (i.e. replicas), '
'`port_in` and `port_out` will be set to random, '
'and routers will be added automatically when necessary')
parser.add_argument('--parallel_type', type=ParallelType.from_string, choices=list(ParallelType),
parser.add_argument('--parallel_type', '--replica_type', type=ParallelType.from_string, choices=list(ParallelType),
default=ParallelType.PUSH_NONBLOCK,
help='parallel type of the concurrent services')
parser.add_argument('--check_version', action=ActionNoYes, default=True,
@@ -308,6 +308,10 @@ def _set_grpc_parser(parser=None):
help='host port of the grpc service')
parser.add_argument('--max_message_size', type=int, default=-1,
help='maximum send and receive size for grpc server in bytes, -1 means unlimited')
parser.add_argument('--proxy', action=ActionNoYes, default=False,
help='respect the http_proxy and https_proxy environment variables. '
'otherwise, it will unset these proxy variables before starting. '
'gRPC seems to prefer --no_proxy')
return parser


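For context, the new `--proxy` option is declared with GNES' `ActionNoYes` action, so both `--proxy` and `--no_proxy` work on the command line (and the help text above recommends `--no_proxy` for gRPC). Below is a minimal sketch of how such a yes/no action can be written with plain argparse; `ActionNoYesSketch` is an illustrative name, not the actual GNES implementation:

```python
# Hypothetical stand-in for GNES' ActionNoYes: registering `--proxy` also
# registers the negated `--no_proxy`, and whichever flag is given sets the value.
import argparse


class ActionNoYesSketch(argparse.Action):
    def __init__(self, option_strings, dest, default=False, required=False, help=None):
        # derive the negated spelling, e.g. `--proxy` -> `--no_proxy`
        opts = list(option_strings) + [o.replace('--', '--no_', 1) for o in option_strings]
        super().__init__(opts, dest, nargs=0, default=default, required=required, help=help)

    def __call__(self, parser, namespace, values, option_string=None):
        setattr(namespace, self.dest, not option_string.startswith('--no_'))


parser = argparse.ArgumentParser()
parser.add_argument('--proxy', action=ActionNoYesSketch, default=False,
                    help='respect http_proxy/https_proxy; --no_proxy unsets them')

print(parser.parse_args(['--proxy']).proxy)     # True
print(parser.parse_args(['--no_proxy']).proxy)  # False
print(parser.parse_args([]).proxy)              # False (the default above)
```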
4 changes: 4 additions & 0 deletions gnes/client/base.py
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import os
from typing import Tuple, List, Union

import grpc
@@ -120,6 +121,9 @@ class GrpcClient:

def __init__(self, args):
self.args = args
if not args.proxy:
os.unsetenv('http_proxy')
os.unsetenv('https_proxy')
self.logger = set_logger(self.__class__.__name__, self.args.verbose)
self.logger.info('setting up grpc insecure channel...')
# A gRPC channel provides a connection to a remote gRPC server.
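With this change the client honours the new flag: unless `--proxy` is given, `http_proxy`/`https_proxy` are removed from the environment before the insecure channel is opened, since gRPC's C core otherwise routes traffic through those proxies. A standalone sketch of the same guard (not the GNES API; note that `os.unsetenv()` alone does not update `os.environ`, while popping from `os.environ` does both):

```python
# Illustrative helper only: drop proxy variables, then open a plain gRPC channel.
import os

import grpc


def make_channel(host: str, port: int, proxy: bool = False) -> grpc.Channel:
    if not proxy:
        for var in ('http_proxy', 'https_proxy'):
            # os.environ.pop() removes the key from the mapping *and* calls
            # unsetenv() under the hood, so the gRPC core no longer sees it
            os.environ.pop(var, None)
    return grpc.insecure_channel('%s:%d' % (host, port))
```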
101 changes: 80 additions & 21 deletions gnes/flow/__init__.py
@@ -1,5 +1,4 @@
import copy
import os
from collections import OrderedDict, defaultdict
from contextlib import ExitStack
from functools import wraps
@@ -143,30 +142,81 @@ def to_mermaid(self, left_right: bool = True):
Output the mermaid graph for visualization
:param left_right: render the flow in left-to-right manner, otherwise top-down manner.
:return:
:return: a mermaid-formatted string
"""

# fill, stroke
service_color = {
Service.Frontend: ('#FFE0E0', '#000'),
Service.Router: ('#C9E8D2', '#000'),
Service.Encoder: ('#FFDAAF', '#000'),
Service.Preprocessor: ('#CED7EF', '#000'),
Service.Indexer: ('#FFFBC1', '#000'),
}

mermaid_graph = OrderedDict()
for k in self._service_nodes.keys():
mermaid_graph[k] = []
cls_dict = defaultdict(set)
replicas_dict = {}

for k, v in self._service_nodes.items():
mermaid_graph[k] = []
num_replicas = getattr(v['parsed_args'], 'num_parallel', 1)
if num_replicas > 1:
head_router = k + '_HEAD'
tail_router = k + '_TAIL'
replicas_dict[k] = (head_router, tail_router)
cls_dict[Service.Router].add(head_router)
cls_dict[Service.Router].add(tail_router)
p_r = '((%s))'
k_service = v['service']
p_e = '((%s))' if k_service == Service.Router else '(%s)'

mermaid_graph[k].append('subgraph %s["%s (replicas=%d)"]' % (k, k, num_replicas))
for j in range(num_replicas):
r = k + '_%d' % j
cls_dict[k_service].add(r)
mermaid_graph[k].append('\t%s%s-->%s%s' % (head_router, p_r % 'router', r, p_e % r))
mermaid_graph[k].append('\t%s%s-->%s%s' % (r, p_e % r, tail_router, p_r % 'router'))
mermaid_graph[k].append('end')
mermaid_graph[k].append(
'style %s fill:%s,stroke:%s,stroke-width:2px,stroke-dasharray:5,stroke-opacity:0.3,fill-opacity:0.5' % (
k, service_color[k_service][0], service_color[k_service][1]))

for k, ed_type in self._service_edges.items():
start_node, end_node = k.split('-')
cur_node = mermaid_graph[start_node]

s_service = self._service_nodes[start_node]['service']
e_service = self._service_nodes[end_node]['service']

start_node_text = start_node
end_node_text = end_node

# check whether the node belongs to a replica group
if start_node in replicas_dict:
start_node = replicas_dict[start_node][1] # outgoing
s_service = Service.Router
start_node_text = 'router'
if end_node in replicas_dict:
end_node = replicas_dict[end_node][0] # incoming
e_service = Service.Router
end_node_text = 'router'

# always plot frontend at the start and the end
if e_service == Service.Frontend:
end_node_text = end_node
end_node += '_END'

cls_dict[s_service].add(start_node)
cls_dict[e_service].add(end_node)
p_s = '((%s))' if s_service == Service.Router else '(%s)'
p_e = '((%s))' if e_service == Service.Router else '(%s)'
mermaid_graph[start_node].append('\t%s%s-- %s -->%s%s' % (
start_node, p_s % start_node, ed_type,
end_node, p_e % end_node))

style = ['classDef FrontendCLS fill:#FFE0E0,stroke:#FFE0E0,stroke-width:1px;',
'classDef EncoderCLS fill:#FFDAAF,stroke:#FFDAAF,stroke-width:1px;',
'classDef IndexerCLS fill:#FFFBC1,stroke:#FFFBC1,stroke-width:1px;',
'classDef RouterCLS fill:#C9E8D2,stroke:#C9E8D2,stroke-width:1px;',
'classDef PreprocessorCLS fill:#CEEEEF,stroke:#CEEEEF,stroke-width:1px;']
cur_node.append('\t%s%s-- %s -->%s%s' % (
start_node, p_s % start_node_text, ed_type,
end_node, p_e % end_node_text))

style = ['classDef %sCLS fill:%s,stroke:%s,stroke-width:1px;' % (k, v[0], v[1]) for k, v in
service_color.items()]
class_def = ['class %s %sCLS;' % (','.join(v), k) for k, v in cls_dict.items()]
mermaid_str = '\n'.join(
['graph %s' % ('LR' if left_right else 'TD')] + [ss for s in mermaid_graph.values() for ss in
@@ -175,19 +225,30 @@ def to_mermaid(self, left_right: bool = True):
return mermaid_str

@_build_level(BuildLevel.GRAPH)
def to_jpg(self, path: str = 'flow.jpg', left_right: bool = True):
def to_url(self, **kwargs) -> str:
"""
Render the current flow as a URL that points to an SVG; this needs an internet connection
:param kwargs: keyword arguments of :py:meth:`to_mermaid`
:return: the URL pointing to the rendered SVG
"""
import base64
mermaid_str = self.to_mermaid(**kwargs)
encoded_str = base64.b64encode(bytes(mermaid_str, 'utf-8')).decode('utf-8')
return 'https://mermaidjs.github.io/mermaid-live-editor/#/view/%s' % encoded_str

@_build_level(BuildLevel.GRAPH)
def to_jpg(self, path: str = 'flow.jpg', **kwargs):
"""
Render the current flow as a JPG image; this calls :py:meth:`to_mermaid` and needs an internet connection
:param path: the file path of the image
:param left_right: render the flow in left-to-right manner, otherwise top-down manner.
:param kwargs: keyword arguments of :py:meth:`to_mermaid`
:return:
"""
import base64

from urllib.request import Request, urlopen
mermaid_str = self.to_mermaid(left_right)
encoded_str = base64.b64encode(bytes(mermaid_str, 'utf-8')).decode('utf-8')
print('https://mermaidjs.github.io/mermaid-live-editor/#/view/%s' % encoded_str)
encoded_str = self.to_url().replace('https://mermaidjs.github.io/mermaid-live-editor/#/view/', '')
self.logger.info('saving jpg...')
req = Request('https://mermaid.ink/img/%s' % encoded_str, headers={'User-Agent': 'Mozilla/5.0'})
with open(path, 'wb') as fp:
@@ -226,8 +287,6 @@ def query(self, bytes_gen: Iterator[bytes] = None, **kwargs):

@_build_level(BuildLevel.RUNTIME)
def _call_client(self, bytes_gen: Iterator[bytes] = None, **kwargs):
os.unsetenv('http_proxy')
os.unsetenv('https_proxy')
args, p_args = self._get_parsed_args(self, set_client_cli_parser, kwargs)
p_args.grpc_port = self._service_nodes[self._frontend]['parsed_args'].grpc_port
p_args.grpc_host = self._service_nodes[self._frontend]['parsed_args'].grpc_host
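The new `to_url()` above just base64-encodes the mermaid source and points at the mermaid live editor, and `to_jpg()` feeds the same payload to mermaid.ink. A self-contained sketch of that encoding, with illustrative helper names rather than the Flow methods themselves:

```python
# Mirrors the encoding used by Flow.to_url()/Flow.to_jpg() in the diff above.
import base64
from urllib.request import Request, urlopen


def _encode(mermaid_str: str) -> str:
    return base64.b64encode(mermaid_str.encode('utf-8')).decode('utf-8')


def mermaid_to_url(mermaid_str: str) -> str:
    # the live-editor view renders the graph as an SVG in the browser
    return 'https://mermaidjs.github.io/mermaid-live-editor/#/view/%s' % _encode(mermaid_str)


def mermaid_to_jpg(mermaid_str: str, path: str = 'flow.jpg') -> None:
    req = Request('https://mermaid.ink/img/%s' % _encode(mermaid_str),
                  headers={'User-Agent': 'Mozilla/5.0'})
    with open(path, 'wb') as fp:
        fp.write(urlopen(req).read())


# e.g. mermaid_to_url('graph LR\n    prep(prep)-->enc(Encoder)')
```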
2 changes: 2 additions & 0 deletions gnes/service/base.py
@@ -553,11 +553,13 @@ def __init__(self, service_cls, args):
if args.num_parallel > 1:
from .router import RouterService
_head_router = copy.deepcopy(args)
_head_router.yaml_path = resolve_yaml_path('BaseRouter')
_head_router.port_ctrl = self._get_random_port()
port_out = self._get_random_port()
_head_router.port_out = port_out

_tail_router = copy.deepcopy(args)
_tail_router.yaml_path = resolve_yaml_path('BaseRouter')
port_in = self._get_random_port()
_tail_router.port_in = port_in
_tail_router.port_ctrl = self._get_random_port()
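For `num_parallel > 1`, the service wrapper above clones the parsed args into a head router and a tail router (both resolved to a plain `BaseRouter` YAML), gives them random ports, and lets every replica read from the head and write to the tail — the same topology `to_mermaid()` now draws. A rough sketch of that wiring with assumed attribute names (`port_in`, `port_out`), not the GNES implementation:

```python
# Hypothetical wiring helper: head router fans out to N replicas, tail router fans in.
import copy
import random
from types import SimpleNamespace


def _random_port() -> int:
    # ephemeral port range; GNES uses its own _get_random_port() helper
    return random.randint(49152, 65535)


def wire_replicas(args: SimpleNamespace, num_parallel: int):
    head, tail = copy.deepcopy(args), copy.deepcopy(args)
    head.port_out = _random_port()   # replicas pull from here
    tail.port_in = _random_port()    # replicas push to here

    replicas = []
    for _ in range(num_parallel):
        r = copy.deepcopy(args)
        r.port_in, r.port_out = head.port_out, tail.port_in
        replicas.append(r)
    return head, replicas, tail
```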
4 changes: 4 additions & 0 deletions gnes/service/frontend.py
@@ -14,6 +14,7 @@
# limitations under the License.


import os
import threading
from concurrent.futures import ThreadPoolExecutor

@@ -28,6 +29,9 @@
class FrontendService:

def __init__(self, args):
if not args.proxy:
os.unsetenv('http_proxy')
os.unsetenv('https_proxy')
self.logger = set_logger(self.__class__.__name__, args.verbose)
self.server = grpc.server(
ThreadPoolExecutor(max_workers=args.max_concurrency),
27 changes: 25 additions & 2 deletions tests/test_gnes_flow.py
@@ -115,13 +115,26 @@ def test_flow5(self):
print(f.to_mermaid())
f.to_jpg()

def test_flow_replica_plot(self):
f = (Flow(check_version=False, route_table=True)
.add(gfs.Preprocessor, name='prep', yaml_path='SentSplitPreprocessor', replicas=4)
.add(gfs.Encoder, yaml_path='PyTorchTransformers', replicas=3)
.add(gfs.Indexer, name='vec_idx', yaml_path='NumpyIndexer', replicas=2)
.add(gfs.Indexer, name='doc_idx', yaml_path='DictIndexer', service_in='prep', replicas=2)
.add(gfs.Router, name='sync_barrier', yaml_path='BaseReduceRouter',
num_part=2, service_in=['vec_idx', 'doc_idx'])
.build(backend=None))
print(f.to_mermaid())
print(f.to_url(left_right=False))
print(f.to_url(left_right=True))

def _test_index_flow(self, backend):
for k in [self.indexer1_bin, self.indexer2_bin, self.encoder_bin]:
self.assertFalse(os.path.exists(k))

flow = (Flow(check_version=False, route_table=False)
.add(gfs.Preprocessor, name='prep', yaml_path='SentSplitPreprocessor')
.add(gfs.Encoder, yaml_path=os.path.join(self.dirname, 'yaml/flow-transformer.yml'))
.add(gfs.Encoder, yaml_path=os.path.join(self.dirname, 'yaml/flow-transformer.yml'), replicas=3)
.add(gfs.Indexer, name='vec_idx', yaml_path=os.path.join(self.dirname, 'yaml/flow-vecindex.yml'))
.add(gfs.Indexer, name='doc_idx', yaml_path=os.path.join(self.dirname, 'yaml/flow-dictindex.yml'),
service_in='prep')
@@ -137,7 +150,7 @@ def _test_index_flow(self, backend):
def _test_query_flow(self, backend):
flow = (Flow(check_version=False, route_table=False)
.add(gfs.Preprocessor, name='prep', yaml_path='SentSplitPreprocessor')
.add(gfs.Encoder, yaml_path=os.path.join(self.dirname, 'yaml/flow-transformer.yml'))
.add(gfs.Encoder, yaml_path=os.path.join(self.dirname, 'yaml/flow-transformer.yml'), replicas=3)
.add(gfs.Indexer, name='vec_idx', yaml_path=os.path.join(self.dirname, 'yaml/flow-vecindex.yml'))
.add(gfs.Router, name='scorer', yaml_path=os.path.join(self.dirname, 'yaml/flow-score.yml'))
.add(gfs.Indexer, name='doc_idx', yaml_path=os.path.join(self.dirname, 'yaml/flow-dictindex.yml')))
@@ -153,3 +166,13 @@ def test_index_query_flow(self):
def test_index_query_flow_proc(self):
self._test_index_flow('process')
self._test_query_flow('process')

def test_query_flow_plot(self):
flow = (Flow(check_version=False, route_table=False)
.add(gfs.Preprocessor, name='prep', yaml_path='SentSplitPreprocessor', replicas=2)
.add(gfs.Encoder, yaml_path=os.path.join(self.dirname, 'yaml/flow-transformer.yml'), replicas=3)
.add(gfs.Indexer, name='vec_idx', yaml_path=os.path.join(self.dirname, 'yaml/flow-vecindex.yml'),
replicas=4)
.add(gfs.Router, name='scorer', yaml_path=os.path.join(self.dirname, 'yaml/flow-score.yml'))
.add(gfs.Indexer, name='doc_idx', yaml_path=os.path.join(self.dirname, 'yaml/flow-dictindex.yml')))
print(flow.build(backend=None).to_url())
