Skip to content
Permalink
Browse files

refactor(preprocessor): rename singleton to unary

  • Loading branch information...
hanxiao committed Jul 25, 2019
1 parent 187606c commit 9973f60065d8127bdc236e547faa2f44c4eb9afd
@@ -1,3 +1,3 @@
!BaseSingletonPreprocessor
!BaseUnaryPreprocessor
parameter:
doc_type: 2
@@ -292,7 +292,7 @@ def build_mermaid(all_layers: List['YamlComposer.Layer'], mermaid_leftright: boo
# if len(last_layer.components) > 1:
# self.mermaid_graph.append('\tend')

style = ['classDef FrontendCLS fill:#ffb347,stroke:#277CE8,stroke-width:1px,stroke-dasharray:5;',
style = ['classDef gRPCFrontendCLS fill:#FFAA04,stroke:#277CE8,stroke-width:1px;',
'classDef EncoderCLS fill:#27E1E8,stroke:#277CE8,stroke-width:1px;',
'classDef IndexerCLS fill:#27E1E8,stroke:#277CE8,stroke-width:1px;',
'classDef RouterCLS fill:#2BFFCB,stroke:#277CE8,stroke-width:1px;',
@@ -26,7 +26,7 @@
'VanillaSlidingPreprocessor': 'image.sliding_window',
'WeightedSlidingPreprocessor': 'image.sliding_window',
'SegmentPreprocessor': 'image.segmentation',
'BaseSingletonPreprocessor': 'base',
'BaseUnaryPreprocessor': 'base',
'BaseVideoPreprocessor': 'video.base',
'FFmpegPreprocessor': 'video.ffmpeg',
'ShotDetectPreprocessor': 'video.shotdetect',
@@ -38,7 +38,7 @@ def apply(self, doc: 'gnes_pb2.Document') -> None:
doc.doc_type = self.doc_type


class BaseSingletonPreprocessor(BasePreprocessor):
class BaseUnaryPreprocessor(BasePreprocessor):

def __init__(self, doc_type: int, *args, **kwargs):
super().__init__(*args, **kwargs)
@@ -6,7 +6,7 @@

from gnes.encoder.image.base import BasePytorchEncoder
from gnes.preprocessor.image.sliding_window import VanillaSlidingPreprocessor
from gnes.preprocessor.base import BaseSingletonPreprocessor
from gnes.preprocessor.base import BaseUnaryPreprocessor
from gnes.proto import gnes_pb2, blob2array


@@ -20,7 +20,7 @@ def img_process_for_test(dirname):
test_img.append(d)

test_img_all_preprocessor = []
for preprocessor in [BaseSingletonPreprocessor(doc_type=gnes_pb2.Document.IMAGE),
for preprocessor in [BaseUnaryPreprocessor(doc_type=gnes_pb2.Document.IMAGE),
VanillaSlidingPreprocessor()]:
test_img_copy = copy.deepcopy(test_img)
for img in test_img_copy:
@@ -12,13 +12,13 @@ class TestProto(unittest.TestCase):

def setUp(self):
self.dirname = os.path.dirname(__file__)
self.singleton_img_pre_yaml = os.path.join(self.dirname, 'yaml', 'base-singleton-image-prep.yml')
self.unary_img_pre_yaml = os.path.join(self.dirname, 'yaml', 'base-unary-image-prep.yml')
self.slidingwindow_img_pre_yaml = os.path.join(self.dirname, 'yaml', 'base-vanilla_sldwin-image-prep.yml')
self.segmentation_img_pre_yaml = os.path.join(self.dirname, 'yaml', 'base-segmentation-image-prep.yml')

def test_singleton_preprocessor_service_empty(self):
def test_unary_preprocessor_service_empty(self):
args = set_preprocessor_service_parser().parse_args([
'--yaml_path', self.singleton_img_pre_yaml
'--yaml_path', self.unary_img_pre_yaml
])
with PreprocessorService(args):
pass
@@ -37,9 +37,9 @@ def test_segmentation_preprocessor_service_empty(self):
with PreprocessorService(args):
pass

def test_singleton_preprocessor_service_echo(self):
def test_unary_preprocessor_service_echo(self):
args = set_preprocessor_service_parser().parse_args([
'--yaml_path', self.singleton_img_pre_yaml
'--yaml_path', self.unary_img_pre_yaml
])
c_args = _set_client_parser().parse_args([
'--port_in', str(args.port_out),
@@ -94,9 +94,9 @@ def test_segmentation_preprocessor_service_echo(self):
r = client.recv_message()
# print(r)

def test_singleton_preprocessor_service_realdata(self):
def test_unary_preprocessor_service_realdata(self):
args = set_preprocessor_service_parser().parse_args([
'--yaml_path', self.singleton_img_pre_yaml
'--yaml_path', self.unary_img_pre_yaml
])
c_args = _set_client_parser().parse_args([
'--port_in', str(args.port_out),
@@ -111,7 +111,7 @@ def test_singleton_preprocessor_service_realdata(self):
msg.request.index.CopyFrom(req.index)
client.send_message(msg)
r = client.recv_message()
self.assertEqual(r.envelope.routes[0].service, 'PreprocessorService:BaseSingletonPreprocessor')
self.assertEqual(r.envelope.routes[0].service, 'PreprocessorService:BaseUnaryPreprocessor')
for d in r.request.index.docs:
self.assertEqual(len(d.chunks), 1)
self.assertEqual(len(blob2array(d.chunks[0].blob).shape), 3)
@@ -1,20 +1,42 @@
import os
import time
import unittest.mock

import grpc

from gnes.cli.parser import set_grpc_frontend_parser, set_router_service_parser
from gnes.helper import TimeContext
from gnes.proto import RequestGenerator, gnes_pb2_grpc
from gnes.service.base import SocketType
from gnes.service.base import SocketType, MessageHandler, BaseService as BS
from gnes.service.grpc import GRPCFrontend
from gnes.service.router import RouterService


class Router1(RouterService):
handler = MessageHandler(BS.handler)

@handler.register(NotImplementedError)
def _handler_default(self, msg: 'gnes_pb2.Message'):
self.logger.info('im doing fancy jobs...')
time.sleep(2)
super()._handler_default(msg)


class Router2(RouterService):
handler = MessageHandler(BS.handler)

@handler.register(NotImplementedError)
def _handler_default(self, msg: 'gnes_pb2.Message'):
self.logger.info('im doing stupid jobs...')
time.sleep(6)
super()._handler_default(msg)


class TestStreamgRPC(unittest.TestCase):

def setUp(self):
self.all_bytes = [b'abc', b'def', b'cde'] * 10
self.all_bytes2 = [b'abc', b'def', b'cde']

@unittest.mock.patch.dict(os.environ, {'http_proxy': '', 'https_proxy': ''})
def test_grpc_frontend(self):
@@ -43,3 +65,35 @@ def test_grpc_frontend(self):
with TimeContext('async call'): # immeidiately returns 0.001 s
resp = stub.RequestStreamCall.future(RequestGenerator.train(self.all_bytes, 1))
self.assertEqual(resp.result().request_id, str(len(self.all_bytes)))

@unittest.mock.patch.dict(os.environ, {'http_proxy': '', 'https_proxy': ''})
def test_async_block(self):
args = set_grpc_frontend_parser().parse_args([
'--grpc_host', '127.0.0.1',
])

p1_args = set_router_service_parser().parse_args([
'--port_in', str(args.port_out),
'--port_out', '8899',
'--socket_in', str(SocketType.PULL_CONNECT),
'--socket_out', str(SocketType.PUSH_CONNECT),
])

p2_args = set_router_service_parser().parse_args([
'--port_in', str(p1_args.port_out),
'--port_out', str(args.port_in),
'--socket_in', str(SocketType.PULL_BIND),
'--socket_out', str(SocketType.PUSH_CONNECT),
])

with Router1(p1_args), Router2(p2_args), GRPCFrontend(args), grpc.insecure_channel(
'%s:%s' % (args.grpc_host, args.grpc_port),
options=[('grpc.max_send_message_length', 70 * 1024 * 1024),
('grpc.max_receive_message_length', 70 * 1024 * 1024)]) as channel:
stub = gnes_pb2_grpc.GnesRPCStub(channel)
with TimeContext('sync call'): # about 5s
resp = stub.RequestStreamCall.future(RequestGenerator.train(self.all_bytes, 1))

self.assertEqual(resp.result().request_id, str(len(self.all_bytes)))

self.assertEqual(resp.request_id, str(len(self.all_bytes2))) # idx start with 0, but +1 for final FLUSH
@@ -1,3 +1,3 @@
!BaseSingletonPreprocessor
!BaseUnaryPreprocessor
parameter:
doc_type: 2
@@ -1,3 +1,3 @@
!BaseSingletonPreprocessor
!BaseUnaryPreprocessor
parameter:
doc_type: 1

0 comments on commit 9973f60

Please sign in to comment.
You can’t perform that action at this time.